solana_core/
completed_data_sets_service.rs

//! [`CompletedDataSetsService`] is a hub that runs different operations when a "completed data
//! set", also known as a [`Vec<Entry>`], is received by the validator.
//!
//! Currently, `WindowService` sends [`CompletedDataSetInfo`]s via a `completed_sets_receiver`
//! provided to the [`CompletedDataSetsService`].
6
7use {
8    crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
9    solana_entry::entry::Entry,
10    solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo},
11    solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
12    solana_signature::Signature,
13    std::{
14        sync::{
15            atomic::{AtomicBool, Ordering},
16            Arc,
17        },
18        thread::{self, Builder, JoinHandle},
19        time::Duration,
20    },
21};
22
/// Receiving side of the crossbeam channel whose messages are batches (`Vec`) of
/// [`CompletedDataSetInfo`]; this is the type consumed by `CompletedDataSetsService::new`.
23pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
/// Sending side of the crossbeam channel whose messages are batches (`Vec`) of
/// [`CompletedDataSetInfo`].
24pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;
25
26pub struct CompletedDataSetsService {
27    thread_hdl: JoinHandle<()>,
28}
29
30impl CompletedDataSetsService {
31    pub fn new(
32        completed_sets_receiver: CompletedDataSetsReceiver,
33        blockstore: Arc<Blockstore>,
34        rpc_subscriptions: Arc<RpcSubscriptions>,
35        exit: Arc<AtomicBool>,
36        max_slots: Arc<MaxSlots>,
37    ) -> Self {
38        let thread_hdl = Builder::new()
39            .name("solComplDataSet".to_string())
40            .spawn(move || {
41                info!("CompletedDataSetsService has started");
42                loop {
43                    if exit.load(Ordering::Relaxed) {
44                        break;
45                    }
46                    if let Err(RecvTimeoutError::Disconnected) = Self::recv_completed_data_sets(
47                        &completed_sets_receiver,
48                        &blockstore,
49                        &rpc_subscriptions,
50                        &max_slots,
51                    ) {
52                        break;
53                    }
54                }
55                info!("CompletedDataSetsService has stopped");
56            })
57            .unwrap();
58        Self { thread_hdl }
59    }
60
61    fn recv_completed_data_sets(
62        completed_sets_receiver: &CompletedDataSetsReceiver,
63        blockstore: &Blockstore,
64        rpc_subscriptions: &RpcSubscriptions,
65        max_slots: &Arc<MaxSlots>,
66    ) -> Result<(), RecvTimeoutError> {
67        const RECV_TIMEOUT: Duration = Duration::from_secs(1);
68        let handle_completed_data_set_info = |completed_data_set_info| {
69            let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
70            match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) {
71                Ok(entries) => {
72                    let transactions = Self::get_transaction_signatures(entries);
73                    if !transactions.is_empty() {
74                        rpc_subscriptions.notify_signatures_received((slot, transactions));
75                    }
76                }
77                Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
78            }
79            slot
80        };
81        let slots = completed_sets_receiver
82            .recv_timeout(RECV_TIMEOUT)
83            .map(std::iter::once)?
84            .chain(completed_sets_receiver.try_iter())
85            .flatten()
86            .map(handle_completed_data_set_info);
87        if let Some(slot) = slots.max() {
88            max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed);
89        }
90        Ok(())
91    }
92
93    fn get_transaction_signatures(entries: Vec<Entry>) -> Vec<Signature> {
94        entries
95            .into_iter()
96            .flat_map(|e| {
97                e.transactions
98                    .into_iter()
99                    .filter_map(|mut t| t.signatures.drain(..).next())
100            })
101            .collect::<Vec<Signature>>()
102    }
103
104    pub fn join(self) -> thread::Result<()> {
105        self.thread_hdl.join()
106    }
107}
108
#[cfg(test)]
pub mod test {
    use {
        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
        solana_transaction::Transaction,
    };

    /// Builds an entry on top of the default hash, with one hash iteration, holding
    /// `transactions`.
    fn entry_with(transactions: Vec<Transaction>) -> Entry {
        Entry::new(&Hash::default(), 1, transactions)
    }

    /// A transaction created without payer or instructions must not yield any signature.
    #[test]
    fn test_zero_signatures() {
        let unsigned_tx = Transaction::new_with_payer(&[], None);
        let entries = vec![entry_with(vec![unsigned_tx])];

        let collected = CompletedDataSetsService::get_transaction_signatures(entries);

        assert!(collected.is_empty());
    }

    /// Every signed transaction contributes exactly one signature, across one or several
    /// entries.
    #[test]
    fn test_multi_signatures() {
        let payer = Keypair::new();
        let signed_tx = Transaction::new_signed_with_payer(
            &[],
            Some(&payer.pubkey()),
            &[&payer],
            Hash::default(),
        );

        // One entry holding one transaction.
        let single = vec![entry_with(vec![signed_tx.clone()])];
        let collected = CompletedDataSetsService::get_transaction_signatures(single);
        assert_eq!(collected.len(), 1);

        // Two entries holding two and one transactions respectively.
        let several = vec![
            entry_with(vec![signed_tx.clone(), signed_tx.clone()]),
            entry_with(vec![signed_tx]),
        ];
        let collected = CompletedDataSetsService::get_transaction_signatures(several);
        assert_eq!(collected.len(), 3);
    }
}