solana_core/
completed_data_sets_service.rs1use {
8 crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
9 solana_entry::entry::Entry,
10 solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo},
11 solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
12 solana_signature::Signature,
13 std::{
14 sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17 },
18 thread::{self, Builder, JoinHandle},
19 time::Duration,
20 },
21};
22
23pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
24pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;
25
/// Service wrapper that owns the join handle of its worker thread.
pub struct CompletedDataSetsService {
    /// Handle of the spawned worker thread; the thread returns `()`.
    thread_hdl: JoinHandle<()>,
}
29
30impl CompletedDataSetsService {
31 pub fn new(
32 completed_sets_receiver: CompletedDataSetsReceiver,
33 blockstore: Arc<Blockstore>,
34 rpc_subscriptions: Arc<RpcSubscriptions>,
35 exit: Arc<AtomicBool>,
36 max_slots: Arc<MaxSlots>,
37 ) -> Self {
38 let thread_hdl = Builder::new()
39 .name("solComplDataSet".to_string())
40 .spawn(move || {
41 info!("CompletedDataSetsService has started");
42 loop {
43 if exit.load(Ordering::Relaxed) {
44 break;
45 }
46 if let Err(RecvTimeoutError::Disconnected) = Self::recv_completed_data_sets(
47 &completed_sets_receiver,
48 &blockstore,
49 &rpc_subscriptions,
50 &max_slots,
51 ) {
52 break;
53 }
54 }
55 info!("CompletedDataSetsService has stopped");
56 })
57 .unwrap();
58 Self { thread_hdl }
59 }
60
61 fn recv_completed_data_sets(
62 completed_sets_receiver: &CompletedDataSetsReceiver,
63 blockstore: &Blockstore,
64 rpc_subscriptions: &RpcSubscriptions,
65 max_slots: &Arc<MaxSlots>,
66 ) -> Result<(), RecvTimeoutError> {
67 const RECV_TIMEOUT: Duration = Duration::from_secs(1);
68 let handle_completed_data_set_info = |completed_data_set_info| {
69 let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
70 match blockstore.get_entries_in_data_block(slot, indices, None) {
71 Ok(entries) => {
72 let transactions = Self::get_transaction_signatures(entries);
73 if !transactions.is_empty() {
74 rpc_subscriptions.notify_signatures_received((slot, transactions));
75 }
76 }
77 Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
78 }
79 slot
80 };
81 let slots = completed_sets_receiver
82 .recv_timeout(RECV_TIMEOUT)
83 .map(std::iter::once)?
84 .chain(completed_sets_receiver.try_iter())
85 .flatten()
86 .map(handle_completed_data_set_info);
87 if let Some(slot) = slots.max() {
88 max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed);
89 }
90 Ok(())
91 }
92
93 fn get_transaction_signatures(entries: Vec<Entry>) -> Vec<Signature> {
94 entries
95 .into_iter()
96 .flat_map(|e| {
97 e.transactions
98 .into_iter()
99 .filter_map(|mut t| t.signatures.drain(..).next())
100 })
101 .collect::<Vec<Signature>>()
102 }
103
104 pub fn join(self) -> thread::Result<()> {
105 self.thread_hdl.join()
106 }
107}
108
#[cfg(test)]
pub mod test {
    use {
        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
        solana_transaction::Transaction,
    };

    /// Shorthand for the function under test.
    fn signatures_of(entries: Vec<Entry>) -> Vec<Signature> {
        CompletedDataSetsService::get_transaction_signatures(entries)
    }

    /// A transaction built without a payer yields no signatures.
    #[test]
    fn test_zero_signatures() {
        let unsigned_tx = Transaction::new_with_payer(&[], None);
        let entry = Entry::new(&Hash::default(), 1, vec![unsigned_tx]);
        assert!(signatures_of(vec![entry]).is_empty());
    }

    /// Exactly one signature is reported per signed transaction, across all entries.
    #[test]
    fn test_multi_signatures() {
        let payer = Keypair::new();
        let signed_tx = Transaction::new_signed_with_payer(
            &[],
            Some(&payer.pubkey()),
            &[&payer],
            Hash::default(),
        );

        // One entry holding one transaction.
        let single = vec![Entry::new(&Hash::default(), 1, vec![signed_tx.clone()])];
        assert_eq!(signatures_of(single).len(), 1);

        // Two entries holding three transactions in total.
        let multiple = vec![
            Entry::new(
                &Hash::default(),
                1,
                vec![signed_tx.clone(), signed_tx.clone()],
            ),
            Entry::new(&Hash::default(), 1, vec![signed_tx]),
        ];
        assert_eq!(signatures_of(multiple).len(), 3);
    }
}