1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
41 node_reader::NodeReader,
42 utils,
43};
44
/// Upper bound applied (through `tokio::time::timeout`) to individual awaited operations in
/// this file, e.g. fetching an epoch stream, reading the header, seeking, and reading blocks.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Epochs strictly below this value have their transaction metadata tried as bincode first
/// (falling back to protobuf); epochs at or above it go straight to protobuf.
/// See `decode_transaction_status_meta`.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53 flag: &Arc<std::sync::atomic::AtomicBool>,
54 receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56 if let Some(rx) = receiver {
57 match rx.try_recv() {
58 Ok(_) | Err(TryRecvError::Lagged(_)) => {
59 flag.store(true, Ordering::SeqCst);
60 }
61 Err(TryRecvError::Closed) => {
62 flag.store(true, Ordering::SeqCst);
63 }
64 Err(TryRecvError::Empty) => {}
65 }
66 }
67 flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72 inner
73 .downcast_ref::<io::Error>()
74 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75 .unwrap_or(false)
76 }
77
78 match err {
79 FirehoseError::BlockHandlerError(inner)
80 | FirehoseError::TransactionHandlerError(inner)
81 | FirehoseError::EntryHandlerError(inner)
82 | FirehoseError::RewardHandlerError(inner)
83 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84 _ => false,
85 }
86}
87
88async fn find_previous_indexed_slot(
89 local_start: u64,
90 epoch_start: u64,
91 log_target: &str,
92) -> Result<Option<u64>, FirehoseError> {
93 if local_start <= epoch_start {
94 return Ok(None);
95 }
96 let mut candidate = local_start.saturating_sub(1);
97 let mut skipped = 0u64;
98 loop {
99 match slot_to_offset(candidate).await {
100 Ok(_) => {
101 if skipped > 0 {
102 log::info!(
103 target: log_target,
104 "slot {} missing in index; seeking back {} slots to {}",
105 local_start.saturating_sub(1),
106 skipped,
107 candidate
108 );
109 }
110 return Ok(Some(candidate));
111 }
112 Err(SlotOffsetIndexError::SlotNotFound(..)) => {
113 if candidate <= epoch_start {
114 break;
115 }
116 skipped += 1;
117 candidate = candidate.saturating_sub(1);
118 }
119 Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
120 }
121 }
122 log::warn!(
123 target: log_target,
124 "no indexed slot found before {} (epoch start {}); reading from epoch start",
125 local_start,
126 epoch_start
127 );
128 Ok(None)
129}
130
/// Errors surfaced by the firehose pipeline.
///
/// `Display` is implemented by hand elsewhere in this file; each variant is rendered as a
/// short stage label followed by the wrapped cause, where there is one.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// An HTTP client error from `reqwest`.
    Reqwest(reqwest::Error),
    /// Reading the raw header of an epoch stream failed.
    ReadHeader(SharedError),
    /// The geyser plugin service reported an error (rendered as an initialization failure).
    GeyserPluginService(GeyserPluginServiceError),
    /// No transaction notifier could be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading nodes up to the next block failed.
    ReadUntilBlockError(SharedError),
    /// Extracting the block from a batch of read nodes failed.
    GetBlockError(SharedError),
    /// Seeking to, reading, or decoding a data node failed; the `usize` is the node index.
    NodeDecodingError(usize, SharedError),
    /// A lookup in the slot offset index failed.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking the reader to a slot failed.
    SeekToSlotError(SharedError),
    /// A failure "on load"; also used to reject a thread count of zero.
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An awaited operation exceeded its timeout; the string names the operation.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error.
    EntryHandlerError(SharedError),
    /// The reward handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
168
// SAFETY / NOTE(review): these impls assert thread-safety by hand instead of letting the
// compiler derive it from the variant payloads. Nothing visible in this file proves that every
// payload (`reqwest::Error`, `GeyserPluginServiceError`, `SlotOffsetIndexError`, `SharedError`)
// is itself `Send + Sync` — TODO confirm. If they all are, the auto traits already apply and
// these impls can be removed; if one is not, these impls are unsound.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
171
172impl Display for FirehoseError {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 match self {
175 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
176 FirehoseError::ReadHeader(error) => {
177 write!(f, "Error reading header: {}", error)
178 }
179 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
180 f,
181 "Error initializing geyser plugin service: {}",
182 geyser_plugin_service_error
183 ),
184 FirehoseError::FailedToGetTransactionNotifier => write!(
185 f,
186 "Failed to get transaction notifier from GeyserPluginService"
187 ),
188 FirehoseError::ReadUntilBlockError(error) => {
189 write!(f, "Error reading until block: {}", error)
190 }
191 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
192 FirehoseError::NodeDecodingError(item_index, error) => {
193 write!(
194 f,
195 "Error seeking, reading data from, or decoding data for data node {}: {}",
196 item_index, error
197 )
198 }
199 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
200 f,
201 "Error getting info from slot offset index: {}",
202 slot_offset_index_error
203 ),
204 FirehoseError::SeekToSlotError(error) => {
205 write!(f, "Error seeking to slot: {}", error)
206 }
207 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
208 FirehoseError::OnStatsHandlerError(error) => {
209 write!(f, "Stats handler error: {}", error)
210 }
211 FirehoseError::OperationTimeout(op) => {
212 write!(f, "Timeout while waiting for operation: {}", op)
213 }
214 FirehoseError::TransactionHandlerError(error) => {
215 write!(f, "Transaction handler error: {}", error)
216 }
217 FirehoseError::EntryHandlerError(error) => {
218 write!(f, "Entry handler error: {}", error)
219 }
220 FirehoseError::RewardHandlerError(error) => {
221 write!(f, "Reward handler error: {}", error)
222 }
223 FirehoseError::BlockHandlerError(error) => {
224 write!(f, "Block handler error: {}", error)
225 }
226 }
227 }
228}
229
230impl From<reqwest::Error> for FirehoseError {
231 fn from(e: reqwest::Error) -> Self {
232 FirehoseError::Reqwest(e)
233 }
234}
235
236impl From<GeyserPluginServiceError> for FirehoseError {
237 fn from(e: GeyserPluginServiceError) -> Self {
238 FirehoseError::GeyserPluginService(e)
239 }
240}
241
242impl From<SlotOffsetIndexError> for FirehoseError {
243 fn from(e: SlotOffsetIndexError) -> Self {
244 FirehoseError::SlotOffsetIndexError(e)
245 }
246}
247
/// Progress counters for a single firehose worker.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker these stats belong to.
    pub thread_id: usize,
    /// Instant recorded as the start of processing (initialized from the firehose start time).
    pub start_time: std::time::Instant,
    /// Completion instant; `None` when the stats are first created.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range of this worker; its `start` is moved forward to the local start slot when
    /// the worker enters an epoch.
    pub slot_range: Range<u64>,
    /// Copy of the slot range taken when the stats were created.
    pub initial_slot_range: Range<u64>,
    /// Slot the worker is currently positioned at (starts at the range start).
    pub current_slot: u64,
    /// Number of slots processed by this worker.
    pub slots_processed: u64,
    /// Number of blocks processed by this worker.
    pub blocks_processed: u64,
    /// Number of slots counted as leader-skipped by this worker.
    pub leader_skipped_slots: u64,
    /// Number of transactions processed by this worker.
    pub transactions_processed: u64,
    /// Number of entries processed by this worker.
    pub entries_processed: u64,
    /// Number of rewards processed by this worker.
    pub rewards_processed: u64,
}
276
/// Snapshot handed to the stats handler; built by `maybe_emit_stats`.
///
/// NOTE(review): several fields are copied from the emitting worker's [`ThreadStats`] rather
/// than aggregated across workers (`start_time`, `finish_time`, `slot_range`,
/// `rewards_processed`) — confirm this is intended by consumers.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Stats of the worker that emitted this snapshot.
    pub thread_stats: ThreadStats,
    /// Copied from the emitting worker's `start_time`.
    pub start_time: std::time::Instant,
    /// Copied from the emitting worker's `finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Copied from the emitting worker's `slot_range`.
    pub slot_range: Range<u64>,
    /// Value of the shared "slots processed" counter at emission time.
    pub slots_processed: u64,
    /// Value of the shared "blocks processed" counter at emission time.
    pub blocks_processed: u64,
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Value of the shared "transactions processed" counter at emission time.
    pub transactions_processed: u64,
    /// Value of the shared "entries processed" counter at emission time.
    pub entries_processed: u64,
    /// Copied from the emitting worker's `rewards_processed`.
    pub rewards_processed: u64,
    /// Transactions counted since the previous emission (the counter is reset on emission).
    pub transactions_since_last_pulse: u64,
    /// Blocks counted since the previous emission (the counter is reset on emission).
    pub blocks_since_last_pulse: u64,
    /// Slots counted since the previous emission (the counter is reset on emission).
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the previous emission; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
309
/// Configuration for periodic stats reporting: the handler to invoke plus its cadence.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler invoked with the worker index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Reporting interval expressed in slots.
    /// NOTE(review): the code that consumes this value is outside the visible part of the
    /// file — confirm the exact semantics there.
    pub tracking_interval_slots: u64,
}
318
319#[inline(always)]
320#[allow(clippy::too_many_arguments)]
321async fn maybe_emit_stats<OnStats: Handler<Stats>>(
322 stats_tracking: Option<&StatsTracking<OnStats>>,
323 thread_index: usize,
324 thread_stats: &ThreadStats,
325 overall_slots_processed: &AtomicU64,
326 overall_blocks_processed: &AtomicU64,
327 overall_transactions_processed: &AtomicU64,
328 overall_entries_processed: &AtomicU64,
329 transactions_since_stats: &AtomicU64,
330 blocks_since_stats: &AtomicU64,
331 slots_since_stats: &AtomicU64,
332 last_pulse: &Arc<AtomicU64>,
333 base_instant: std::time::Instant,
334) -> Result<(), (FirehoseError, u64)> {
335 if let Some(stats_tracker) = stats_tracking {
336 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
337 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
338 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
339 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
340 let now_nanos = base_instant.elapsed().as_nanos() as u64;
341 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
342 let delta_nanos = now_nanos.saturating_sub(previous);
343 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
344 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
345 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
346 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
347
348 let stats = Stats {
349 thread_stats: thread_stats.clone(),
350 start_time: thread_stats.start_time,
351 finish_time: thread_stats.finish_time,
352 slot_range: thread_stats.slot_range.clone(),
353 slots_processed: total_slots,
354 blocks_processed: total_blocks,
355 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
356 transactions_processed: total_transactions,
357 entries_processed: total_entries,
358 rewards_processed: thread_stats.rewards_processed,
359 transactions_since_last_pulse: processed_transactions,
360 blocks_since_last_pulse: processed_blocks,
361 slots_since_last_pulse: processed_slots,
362 time_since_last_pulse,
363 };
364
365 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
366 last_pulse.store(previous, Ordering::Relaxed);
367 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
368 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
369 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
370 return Err((
371 FirehoseError::OnStatsHandlerError(e),
372 thread_stats.current_slot,
373 ));
374 }
375 }
376 Ok(())
377}
378
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is set;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
385
386fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
387 map.get(&thread_id)
388 .map(|set| set.remove(&slot).is_some())
389 .unwrap_or(false)
390}
391
392fn decode_transaction_status_meta_from_frame(
393 slot: u64,
394 reassembled_metadata: Vec<u8>,
395) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
396 if reassembled_metadata.is_empty() {
397 return Ok(solana_transaction_status::TransactionStatusMeta::default());
399 }
400
401 match utils::decompress_zstd(reassembled_metadata.clone()) {
402 Ok(decompressed) => {
403 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
404 Box::new(std::io::Error::other(format!(
405 "decode transaction metadata (slot {slot}): {err}"
406 ))) as SharedError
407 })
408 }
409 Err(decomp_err) => {
410 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
413 Box::new(std::io::Error::other(format!(
414 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
415 ))) as SharedError
416 })
417 }
418 }
419}
420
/// Result of decoding a rewards payload.
#[derive(Debug, Default)]
struct DecodedRewards {
    /// Rewards converted from the decoded message, each paired with its address.
    keyed_rewards: Vec<(Address, RewardInfo)>,
    /// Partition count taken from the decoded message, when it carries one.
    num_partitions: Option<u64>,
}
426
427impl DecodedRewards {
428 fn empty() -> Self {
429 Self {
430 keyed_rewards: Vec::new(),
431 num_partitions: None,
432 }
433 }
434}
435
436fn decode_rewards_from_frame(
437 slot: u64,
438 reassembled_rewards: Vec<u8>,
439) -> Result<DecodedRewards, SharedError> {
440 if reassembled_rewards.is_empty() {
441 return Ok(DecodedRewards::empty());
443 }
444
445 match utils::decompress_zstd(reassembled_rewards.clone()) {
446 Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
447 |err| {
448 Box::new(std::io::Error::other(format!(
449 "decode rewards (slot {slot}): {err}"
450 ))) as SharedError
451 },
452 ),
453 Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
454 |err| {
455 Box::new(std::io::Error::other(format!(
456 "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
457 ))) as SharedError
458 },
459 ),
460 }
461}
462
463fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
464 let epoch = slot_to_epoch(slot);
465 let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
466 prost_011::Message::decode(bytes);
467 match proto_attempt {
468 Ok(proto) => {
469 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
470 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
471 Box::new(std::io::Error::other(format!(
472 "convert rewards proto failed (epoch {epoch}): {err}"
473 ))) as SharedError
474 })?;
475 Ok(DecodedRewards {
476 keyed_rewards,
477 num_partitions,
478 })
479 }
480 Err(proto_err) => {
481 let stored: solana_storage_proto::StoredExtendedRewards =
482 bincode::deserialize(bytes).map_err(|bin_err| {
483 Box::new(std::io::Error::other(format!(
484 "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
485 ))) as SharedError
486 })?;
487 let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
488 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
489 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
490 Box::new(std::io::Error::other(format!(
491 "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
492 ))) as SharedError
493 })?;
494 Ok(DecodedRewards {
495 keyed_rewards,
496 num_partitions,
497 })
498 }
499 }
500}
501
502fn decode_transaction_status_meta(
503 slot: u64,
504 metadata_bytes: &[u8],
505) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
506 let epoch = slot_to_epoch(slot);
507 let mut bincode_err: Option<String> = None;
508 if epoch < BINCODE_EPOCH_CUTOFF {
509 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
510 metadata_bytes,
511 ) {
512 Ok(stored) => return Ok(stored.into()),
513 Err(err) => {
514 bincode_err = Some(err.to_string());
515 }
516 }
517 }
518
519 let bin_err_for_proto = bincode_err.clone();
520 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
521 prost_011::Message::decode(metadata_bytes).map_err(|err| {
522 if let Some(ref bin_err) = bin_err_for_proto {
524 Box::new(std::io::Error::other(format!(
525 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
526 ))) as SharedError
527 } else {
528 Box::new(std::io::Error::other(format!(
529 "protobuf decode transaction metadata: {err}"
530 ))) as SharedError
531 }
532 })?;
533
534 proto.try_into().map_err(|err| {
535 if let Some(ref bin_err) = bincode_err {
536 Box::new(std::io::Error::other(format!(
537 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
538 ))) as SharedError
539 } else {
540 Box::new(std::io::Error::other(format!(
541 "convert transaction metadata proto: {err}"
542 ))) as SharedError
543 }
544 })
545}
546
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Metadata with a handful of non-default fields, used by the protobuf round-trip tests.
    fn sample_meta() -> TransactionStatusMeta {
        TransactionStatusMeta {
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            compute_units_consumed: Some(7),
            cost_units: Some(9),
            loaded_addresses: LoadedAddresses::default(),
            ..TransactionStatusMeta::default()
        }
    }

    /// Encodes `meta` as the protobuf wire form of the generated metadata message.
    fn encode_as_proto(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    /// Slot 0 is in an early epoch, so a bincode payload decodes directly.
    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &encoded).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    /// A protobuf payload decodes for a slot in a later epoch.
    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        let decoded = decode_transaction_status_meta(157 * 432000, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    /// An early-epoch slot whose payload is actually protobuf still decodes.
    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        let decoded = decode_transaction_status_meta(100 * 432000, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    /// An empty frame maps to the default metadata.
    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    /// A frame holding raw (non-zstd) bincode bytes decodes through the fallback path.
    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let frame = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, frame).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
639
#[cfg(test)]
mod rewards_decode_tests {
    use super::decode_rewards_from_bytes;
    use solana_sdk_ids::vote::id as vote_program_id;
    use solana_storage_proto::{StoredExtendedRewards, convert::generated};
    use solana_transaction_status::{Reward, RewardType};

    /// A protobuf-encoded rewards message yields its reward and partition count.
    #[test]
    fn decodes_protobuf_rewards() {
        let fee_reward = generated::Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 5,
            post_balance: 10,
            reward_type: generated::RewardType::Fee as i32,
            commission: "1".to_string(),
        };
        let message = generated::Rewards {
            rewards: vec![fee_reward],
            num_partitions: Some(generated::NumPartitions { num_partitions: 2 }),
        };
        let encoded = prost_011::Message::encode_to_vec(&message);
        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode proto rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, Some(2));
    }

    /// A bincode-encoded rewards list decodes through the fallback, without a partition count.
    #[test]
    fn decodes_bincode_rewards() {
        let rent_reward = Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 7,
            post_balance: 9,
            reward_type: Some(RewardType::Rent),
            commission: Some(3),
        };
        let stored_rewards: StoredExtendedRewards = vec![rent_reward.into()];
        let encoded = bincode::serialize(&stored_rewards).expect("bincode serialize");
        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode bincode rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, None);
    }
}
685
/// Payload delivered to transaction handlers.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot the transaction belongs to.
    pub slot: u64,
    /// Index of the transaction; presumably its position within the slot — TODO confirm
    /// against the producer, which is outside the visible part of the file.
    pub transaction_slot_index: usize,
    /// Signature associated with the transaction.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Whether the transaction is classified as a vote transaction.
    pub is_vote: bool,
    /// Decoded status metadata for the transaction.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The transaction itself.
    pub transaction: VersionedTransaction,
}
704
/// Payload delivered to entry handlers.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot the entry belongs to.
    pub slot: u64,
    /// Index of the entry; presumably its position within the slot — TODO confirm.
    pub entry_index: usize,
    /// Range of transaction indexes covered by this entry; presumably indexes within the
    /// slot, matching `TransactionData::transaction_slot_index` — TODO confirm.
    pub transaction_indexes: Range<usize>,
    /// Number of hashes recorded for the entry.
    pub num_hashes: u64,
    /// Hash of the entry.
    pub hash: Hash,
}
719
/// Payload delivered to reward handlers.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Rewards, each paired with the address it applies to.
    pub rewards: Vec<(Address, RewardInfo)>,
}
728
/// Payload delivered to block handlers: either a decoded block or a notice that a slot
/// had no block and was possibly skipped by its leader.
#[derive(Debug)]
pub enum BlockData {
    /// A block that was read and decoded.
    Block {
        /// Slot of the parent block.
        parent_slot: u64,
        /// Blockhash of the parent block.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Blockhash of this block.
        blockhash: Hash,
        /// Rewards attached to this block, with the optional partition count.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time, when known.
        block_time: Option<i64>,
        /// Block height, when known.
        block_height: Option<u64>,
        /// Number of executed transactions in this block.
        executed_transaction_count: u64,
        /// Number of entries in this block.
        entry_count: u64,
    },
    /// A slot for which no block was found; the leader may have skipped it.
    PossibleLeaderSkipped {
        /// The slot in question.
        slot: u64,
    },
}
760
761impl BlockData {
762 #[inline(always)]
764 pub const fn slot(&self) -> u64 {
765 match self {
766 BlockData::Block { slot, .. } => *slot,
767 BlockData::PossibleLeaderSkipped { slot } => *slot,
768 }
769 }
770
771 #[inline(always)]
773 pub const fn was_skipped(&self) -> bool {
774 matches!(self, BlockData::PossibleLeaderSkipped { .. })
775 }
776
777 #[inline(always)]
779 pub const fn block_time(&self) -> Option<i64> {
780 match self {
781 BlockData::Block { block_time, .. } => *block_time,
782 BlockData::PossibleLeaderSkipped { .. } => None,
783 }
784 }
785}
786
/// Outcome of a handler invocation: success, or a shared boxed error.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future every handler returns.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
789
/// Callback invoked with a `usize` (the worker/thread index at the call sites visible in this
/// file) and a payload of type `Data`, returning a boxed future that resolves to
/// `Result<(), SharedError>`.
///
/// This is a marker trait with no methods; it bundles the bounds a handler must satisfy
/// (`Fn`, `Send`, `Sync`, `Clone`, `'static`).
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

/// Blanket implementation: every closure or function meeting the bounds is a [`Handler`].
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
797
/// Plain function-pointer form of a handler for payload type `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer handler for [`BlockData`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer handler for [`TransactionData`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer handler for [`EntryData`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer handler for [`RewardsData`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// [`StatsTracking`] configured with a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Function-pointer handler for [`FirehoseErrorContext`].
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Same type as [`StatsTracker`] under a second name.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
814
/// Payload delivered to error handlers.
///
/// NOTE(review): the code that builds this value is outside the visible part of the file;
/// the field descriptions below follow the field names and types — confirm against the
/// producer.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the worker that hit the error.
    pub thread_id: usize,
    /// Slot associated with the error.
    pub slot: u64,
    /// Epoch associated with the error.
    pub epoch: u64,
    /// Text describing the error.
    pub error_message: String,
}
827
828#[inline]
833#[allow(clippy::too_many_arguments)]
834pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
835 threads: u64,
836 slot_range: Range<u64>,
837 on_block: Option<OnBlock>,
838 on_tx: Option<OnTransaction>,
839 on_entry: Option<OnEntry>,
840 on_rewards: Option<OnRewards>,
841 on_error: Option<OnError>,
842 stats_tracking: Option<StatsTracking<OnStats>>,
843 shutdown_signal: Option<broadcast::Receiver<()>>,
844) -> Result<(), (FirehoseError, u64)>
845where
846 OnBlock: Handler<BlockData>,
847 OnTransaction: Handler<TransactionData>,
848 OnEntry: Handler<EntryData>,
849 OnRewards: Handler<RewardsData>,
850 OnStats: Handler<Stats>,
851 OnError: Handler<FirehoseErrorContext>,
852{
853 if threads == 0 {
854 return Err((
855 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
856 slot_range.start,
857 ));
858 }
859 let client = crate::network::create_http_client();
860 log::info!(target: LOG_MODULE, "starting firehose...");
861 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
862
863 let slot_range = Arc::new(slot_range);
864
865 let subranges = generate_subranges(&slot_range, threads);
867 if threads > 1 {
868 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
869 }
870
871 let firehose_start = std::time::Instant::now();
872 let shutdown_flag = Arc::new(AtomicBool::new(false));
873 if let Some(ref rx) = shutdown_signal {
874 let mut rx = rx.resubscribe();
875 let flag = shutdown_flag.clone();
876 tokio::spawn(async move {
877 if rx.recv().await.is_ok() {
878 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
879 flag.store(true, Ordering::SeqCst);
880 }
881 });
882 }
883 let mut handles = Vec::new();
884 let error_counts: Arc<Vec<AtomicU32>> =
886 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
887
888 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
889 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
890 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
891 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
892 let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
893
894 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
895 let error_counts = error_counts.clone();
896 let client = client.clone();
897 let on_block = on_block.clone();
898 let on_tx = on_tx.clone();
899 let on_entry = on_entry.clone();
900 let on_reward = on_rewards.clone();
901 let on_error = on_error.clone();
902 let overall_slots_processed = overall_slots_processed.clone();
903 let overall_blocks_processed = overall_blocks_processed.clone();
904 let overall_transactions_processed = overall_transactions_processed.clone();
905 let overall_entries_processed = overall_entries_processed.clone();
906 let stats_tracking = stats_tracking.clone();
907 let transactions_since_stats = Arc::new(AtomicU64::new(0));
908 let blocks_since_stats = Arc::new(AtomicU64::new(0));
909 let slots_since_stats = Arc::new(AtomicU64::new(0));
910 let last_pulse = Arc::new(AtomicU64::new(0));
911 let transactions_since_stats_cloned = transactions_since_stats.clone();
912 let blocks_since_stats_cloned = blocks_since_stats.clone();
913 let slots_since_stats_cloned = slots_since_stats.clone();
914 let last_pulse_cloned = last_pulse.clone();
915 let shutdown_flag = shutdown_flag.clone();
916 let pending_skipped_slots = pending_skipped_slots.clone();
917 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
918
919 let handle = tokio::spawn(async move {
920 let transactions_since_stats = transactions_since_stats_cloned;
921 let blocks_since_stats = blocks_since_stats_cloned;
922 let slots_since_stats = slots_since_stats_cloned;
923 let last_pulse = last_pulse_cloned;
924 let mut shutdown_rx = thread_shutdown_rx;
925 let start_time = firehose_start;
926 last_pulse.store(
927 firehose_start.elapsed().as_nanos() as u64,
928 Ordering::Relaxed,
929 );
930 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
931 let mut skip_until_index = None;
932 let last_emitted_slot = slot_range.start.saturating_sub(1);
933 let block_enabled = on_block.is_some();
934 let tx_enabled = on_tx.is_some();
935 let entry_enabled = on_entry.is_some();
936 let reward_enabled = on_reward.is_some();
937 let tracking_enabled = stats_tracking.is_some();
938 if block_enabled {
939 pending_skipped_slots.entry(thread_index).or_default();
940 }
941 let mut last_counted_slot = slot_range.start.saturating_sub(1);
942 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
943 let mut thread_stats = if tracking_enabled {
944 Some(ThreadStats {
945 thread_id: thread_index,
946 start_time,
947 finish_time: None,
948 slot_range: slot_range.clone(),
949 initial_slot_range: slot_range.clone(),
950 current_slot: slot_range.start,
951 slots_processed: 0,
952 blocks_processed: 0,
953 leader_skipped_slots: 0,
954 transactions_processed: 0,
955 entries_processed: 0,
956 rewards_processed: 0,
957 })
958 } else {
959 None
960 };
961
962 while let Err((err, slot)) = async {
964 let mut last_emitted_slot = last_emitted_slot_global;
965 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
966 log::info!(
967 target: &log_target,
968 "shutdown requested; terminating firehose thread {}",
969 thread_index
970 );
971 return Ok(());
972 }
973 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
974 log::info!(
975 target: &log_target,
976 "slot range: {} (epoch {}) ... {} (epoch {})",
977 slot_range.start,
978 slot_to_epoch(slot_range.start),
979 slot_range.end,
980 slot_to_epoch(slot_range.end)
981 );
982
983 log::info!(target: &log_target, "🚒 starting firehose...");
984
985 let mut current_slot: Option<u64> = None;
987 for epoch_num in epoch_range.clone() {
988 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
989 log::info!(
990 target: &log_target,
991 "shutdown requested; terminating firehose thread {}",
992 thread_index
993 );
994 return Ok(());
995 }
996 log::info!(target: &log_target, "entering epoch {}", epoch_num);
997 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
998 Ok(stream) => stream,
999 Err(_) => {
1000 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1001 }
1002 };
1003 let mut reader = NodeReader::new(stream);
1004
1005 let header_fut = reader.read_raw_header();
1006 let header = match timeout(OP_TIMEOUT, header_fut).await {
1007 Ok(res) => res
1008 .map_err(FirehoseError::ReadHeader)
1009 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1010 Err(_) => {
1011 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1012 }
1013 };
1014 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1015
1016 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1017 let local_start = std::cmp::max(slot_range.start, epoch_start);
1018 let local_end_inclusive =
1019 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1020 if local_start > local_end_inclusive {
1021 log::debug!(
1022 target: &log_target,
1023 "epoch {} has no overlap with thread range ({}..{}), skipping",
1024 epoch_num,
1025 slot_range.start,
1026 slot_range.end
1027 );
1028 continue;
1029 }
1030
1031 let mut previous_blockhash = Hash::default();
1032 let mut latest_entry_blockhash = Hash::default();
1033 last_counted_slot = local_start.saturating_sub(1);
1036 current_slot = None;
1037 if tracking_enabled
1038 && let Some(ref mut stats) = thread_stats {
1039 stats.current_slot = local_start;
1040 stats.slot_range.start = local_start;
1041 }
1042
1043 if local_start > epoch_start {
1044 let seek_slot = match timeout(
1047 OP_TIMEOUT,
1048 find_previous_indexed_slot(local_start, epoch_start, &log_target),
1049 )
1050 .await
1051 {
1052 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1053 Err(_) => {
1054 return Err((
1055 FirehoseError::OperationTimeout(
1056 "seek_to_previous_indexed_slot",
1057 ),
1058 current_slot.unwrap_or(slot_range.start),
1059 ));
1060 }
1061 };
1062 if let Some(seek_slot) = seek_slot {
1063 let seek_fut = reader.seek_to_slot(seek_slot);
1064 match timeout(OP_TIMEOUT, seek_fut).await {
1065 Ok(res) => {
1066 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1067 }
1068 Err(_) => {
1069 return Err((
1070 FirehoseError::OperationTimeout("seek_to_slot"),
1071 current_slot.unwrap_or(slot_range.start),
1072 ));
1073 }
1074 }
1075 }
1076 }
1077
1078 let mut item_index = 0;
1080 let mut displayed_skip_message = false;
1081 loop {
1082 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1083 log::info!(
1084 target: &log_target,
1085 "shutdown requested; terminating firehose thread {}",
1086 thread_index
1087 );
1088 return Ok(());
1089 }
1090 let read_fut = reader.read_until_block();
1091 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1092 Ok(result) => result
1093 .map_err(FirehoseError::ReadUntilBlockError)
1094 .map_err(|e| {
1095 (
1096 e,
1097 current_slot
1098 .map(|slot| slot.saturating_add(1))
1099 .unwrap_or(slot_range.start),
1100 )
1101 })?,
1102 Err(_) => {
1103 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1104 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1105 }
1106 };
1107 if nodes.is_empty() {
1108 log::info!(
1109 target: &log_target,
1110 "reached end of epoch {}",
1111 epoch_num
1112 );
1113 break;
1114 }
1115 if let Some(last_node) = nodes.0.last()
1116 && !last_node.get_node().is_block()
1117 {
1118 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1119 break;
1120 }
1121 let block = nodes
1122 .get_block()
1123 .map_err(FirehoseError::GetBlockError)
1124 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1125 log::debug!(
1126 target: &log_target,
1127 "read {} items from epoch {}, now at slot {}",
1128 item_index,
1129 epoch_num,
1130 block.slot
1131 );
1132 let slot = block.slot;
1133 if slot > local_end_inclusive {
1134 log::debug!(
1135 target: &log_target,
1136 "reached end of local slice at slot {} (epoch {}), stopping",
1137 slot,
1138 epoch_num
1139 );
1140 break;
1141 }
1142 if slot >= slot_range.end {
1143 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1144 if block_enabled {
1149 pending_skipped_slots.remove(&thread_index);
1150 }
1151 return Ok(());
1152 }
1153 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1154 if slot < slot_range.start {
1155 if slot.saturating_add(1) == slot_range.start {
1156 log::debug!(
1157 target: &log_target,
1158 "priming reader with preceding slot {}, skipping",
1159 slot
1160 );
1161 } else {
1162 log::warn!(
1163 target: &log_target,
1164 "encountered slot {} before start of range {}, skipping",
1165 slot,
1166 slot_range.start
1167 );
1168 }
1169 continue;
1170 }
1171 current_slot = Some(slot);
1172 let mut entry_index: usize = 0;
1173 let mut this_block_executed_transaction_count: u64 = 0;
1174 let mut this_block_entry_count: u64 = 0;
1175 let mut this_block_rewards = DecodedRewards::empty();
1176
1177 for node_with_cid in &nodes.0 {
1178 item_index += 1;
1179 if let Some(skip) = skip_until_index {
1180 if item_index < skip {
1181 if !displayed_skip_message {
1182 log::info!(
1183 target: &log_target,
1184 "skipping until index {} (at {})",
1185 skip,
1186 item_index
1187 );
1188 displayed_skip_message = true;
1189 }
1190 continue;
1191 } else {
1192 log::info!(
1193 target: &log_target,
1194 "reached target index {}, resuming...",
1195 skip
1196 );
1197 skip_until_index = None;
1198 }
1199 }
1200 let node = node_with_cid.get_node();
1201
1202 if let Some(ref mut stats) = thread_stats {
1203 stats.current_slot = slot;
1204 }
1205
1206 let error_slot = current_slot.unwrap_or(slot_range.start);
1207
1208 use crate::node::Node::*;
1209 match node {
1210 Transaction(tx) => {
1211 if tx_enabled
1212 && let Some(on_tx_cb) = on_tx.as_ref()
1213 {
1214 let error_slot = current_slot.unwrap_or(slot_range.start);
1215 let versioned_tx = tx.as_parsed().map_err(|err| {
1216 (
1217 FirehoseError::NodeDecodingError(item_index, err),
1218 error_slot,
1219 )
1220 })?;
1221 let reassembled_metadata = nodes
1222 .reassemble_dataframes(tx.metadata.clone())
1223 .map_err(|err| {
1224 (
1225 FirehoseError::NodeDecodingError(item_index, err),
1226 error_slot,
1227 )
1228 })?;
1229
1230 let as_native_metadata = decode_transaction_status_meta_from_frame(
1231 block.slot,
1232 reassembled_metadata,
1233 )
1234 .map_err(|err| {
1235 (
1236 FirehoseError::NodeDecodingError(item_index, err),
1237 error_slot,
1238 )
1239 })?;
1240
1241 let message_hash = {
1242 #[cfg(feature = "verify-transaction-signatures")]
1243 {
1244 versioned_tx.verify_and_hash_message().map_err(|err| {
1245 (
1246 FirehoseError::TransactionHandlerError(Box::new(err)),
1247 error_slot,
1248 )
1249 })?
1250 }
1251 #[cfg(not(feature = "verify-transaction-signatures"))]
1252 {
1253 versioned_tx.message.hash()
1254 }
1255 };
1256 let signature = versioned_tx
1257 .signatures
1258 .first()
1259 .ok_or_else(|| {
1260 Box::new(std::io::Error::new(
1261 std::io::ErrorKind::InvalidData,
1262 "transaction missing signature",
1263 )) as SharedError
1264 })
1265 .map_err(|err| {
1266 (
1267 FirehoseError::NodeDecodingError(
1268 item_index,
1269 err,
1270 ),
1271 error_slot,
1272 )
1273 })?;
1274 let is_vote = is_simple_vote_transaction(&versioned_tx);
1275
1276 on_tx_cb(
1277 thread_index,
1278 TransactionData {
1279 slot: block.slot,
1280 transaction_slot_index: tx.index.unwrap() as usize,
1281 signature: *signature,
1282 message_hash,
1283 is_vote,
1284 transaction_status_meta: as_native_metadata.clone(),
1285 transaction: versioned_tx.clone(),
1286 },
1287 )
1288 .await
1289 .map_err(|e| {
1290 (
1291 FirehoseError::TransactionHandlerError(e),
1292 error_slot,
1293 )
1294 })?;
1295 }
1296 fetch_add_if(
1297 tracking_enabled,
1298 &overall_transactions_processed,
1299 1,
1300 );
1301 if let Some(ref mut stats) = thread_stats {
1302 stats.transactions_processed += 1;
1303 }
1304 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1305 }
1306 Entry(entry) => {
1307 let entry_hash = Hash::from(entry.hash.to_bytes());
1308 let entry_transaction_count = entry.transactions.len();
1309 let entry_transaction_count_u64 = entry_transaction_count as u64;
1310 let starting_transaction_index_u64 =
1311 this_block_executed_transaction_count;
1312 latest_entry_blockhash = entry_hash;
1313 this_block_executed_transaction_count += entry_transaction_count_u64;
1314 this_block_entry_count += 1;
1315
1316 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1317 let starting_transaction_index = usize::try_from(
1318 starting_transaction_index_u64,
1319 )
1320 .map_err(|err| {
1321 (
1322 FirehoseError::EntryHandlerError(Box::new(err)),
1323 error_slot,
1324 )
1325 })?;
1326 let transaction_indexes_end =
1327 starting_transaction_index + entry_transaction_count;
1328 on_entry_cb(
1329 thread_index,
1330 EntryData {
1331 slot: block.slot,
1332 entry_index,
1333 transaction_indexes: starting_transaction_index
1334 ..transaction_indexes_end,
1335 num_hashes: entry.num_hashes,
1336 hash: entry_hash,
1337 },
1338 )
1339 .await
1340 .map_err(|e| {
1341 (
1342 FirehoseError::EntryHandlerError(e),
1343 error_slot,
1344 )
1345 })?;
1346 }
1347 entry_index += 1;
1348 fetch_add_if(
1349 tracking_enabled,
1350 &overall_entries_processed,
1351 1,
1352 );
1353 if let Some(ref mut stats) = thread_stats {
1354 stats.entries_processed += 1;
1355 }
1356 }
1357 Block(block) => {
1358 let prev_last_counted_slot = last_counted_slot;
1359 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1360 (
1361 stats.slots_processed,
1362 stats.blocks_processed,
1363 stats.leader_skipped_slots,
1364 stats.current_slot,
1365 )
1366 });
1367
1368 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1369 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1370 let skip_start = skip_start_from_previous.max(next_expected_slot);
1371
1372 let skipped_epoch = slot_to_epoch(last_counted_slot);
1373 for skipped_slot in skip_start..slot {
1374 if slot_to_epoch(skipped_slot) != skipped_epoch {
1375 break;
1376 }
1377 log::debug!(
1378 target: &log_target,
1379 "leader skipped slot {} (prev_counted {}, current slot {})",
1380 skipped_slot,
1381 prev_last_counted_slot,
1382 slot,
1383 );
1384 if block_enabled {
1385 pending_skipped_slots
1386 .entry(thread_index)
1387 .or_default()
1388 .insert(skipped_slot);
1389 }
1390 if block_enabled
1391 && let Some(on_block_cb) = on_block.as_ref()
1392 && skipped_slot > last_emitted_slot {
1393 last_emitted_slot = skipped_slot;
1394 on_block_cb(
1395 thread_index,
1396 BlockData::PossibleLeaderSkipped {
1397 slot: skipped_slot,
1398 },
1399 )
1400 .await
1401 .map_err(|e| {
1402 (
1403 FirehoseError::BlockHandlerError(e),
1404 error_slot,
1405 )
1406 })?;
1407 }
1408 if tracking_enabled {
1409 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1410 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1411 if let Some(ref mut stats) = thread_stats {
1412 stats.leader_skipped_slots += 1;
1413 stats.slots_processed += 1;
1414 stats.current_slot = skipped_slot;
1415 }
1416 }
1417 last_counted_slot = skipped_slot;
1418 }
1419
1420 let cleared_pending_skip = if block_enabled {
1421 clear_pending_skip(
1422 &pending_skipped_slots,
1423 thread_index,
1424 slot,
1425 )
1426 } else {
1427 false
1428 };
1429
1430 if slot <= last_counted_slot && !cleared_pending_skip {
1431 log::debug!(
1432 target: &log_target,
1433 "duplicate block {}, already counted (last_counted={})",
1434 slot,
1435 last_counted_slot,
1436 );
1437 this_block_rewards = DecodedRewards::empty();
1438 continue;
1439 }
1440
1441 if block_enabled {
1442 if let Some(on_block_cb) = on_block.as_ref() {
1443 let DecodedRewards {
1444 keyed_rewards,
1445 num_partitions,
1446 } = std::mem::take(&mut this_block_rewards);
1447 if slot > last_emitted_slot {
1448 last_emitted_slot = slot;
1449 on_block_cb(
1450 thread_index,
1451 BlockData::Block {
1452 parent_slot: block.meta.parent_slot,
1453 parent_blockhash: previous_blockhash,
1454 slot: block.slot,
1455 blockhash: latest_entry_blockhash,
1456 rewards: KeyedRewardsAndNumPartitions {
1457 keyed_rewards,
1458 num_partitions,
1459 },
1460 block_time: Some(block.meta.blocktime as i64),
1461 block_height: block.meta.block_height,
1462 executed_transaction_count:
1463 this_block_executed_transaction_count,
1464 entry_count: this_block_entry_count,
1465 },
1466 )
1467 .await
1468 .map_err(|e| {
1469 (
1470 FirehoseError::BlockHandlerError(e),
1471 error_slot,
1472 )
1473 })?;
1474 }
1475 }
1476 } else {
1477 this_block_rewards = DecodedRewards::empty();
1478 }
1479 previous_blockhash = latest_entry_blockhash;
1480
1481 if tracking_enabled {
1482 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1483 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1484 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1485 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1486 if let Some(ref mut stats) = thread_stats {
1487 stats.blocks_processed += 1;
1488 stats.slots_processed += 1;
1489 stats.current_slot = slot;
1490 }
1491
1492 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1493 (&stats_tracking, thread_stats.as_mut())
1494 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1495 && let Err(err) = maybe_emit_stats(
1496 stats_tracking.as_ref(),
1497 thread_index,
1498 thread_stats_ref,
1499 &overall_slots_processed,
1500 &overall_blocks_processed,
1501 &overall_transactions_processed,
1502 &overall_entries_processed,
1503 &transactions_since_stats,
1504 &blocks_since_stats,
1505 &slots_since_stats,
1506 &last_pulse,
1507 start_time,
1508 )
1509 .await
1510 {
1511 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1512 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1513 overall_blocks_processed
1514 .fetch_sub(1, Ordering::Relaxed);
1515 overall_slots_processed
1516 .fetch_sub(1, Ordering::Relaxed);
1517 if let Some((
1518 prev_slots_processed,
1519 prev_blocks_processed,
1520 prev_leader_skipped,
1521 prev_current_slot,
1522 )) = thread_stats_snapshot
1523 {
1524 thread_stats_ref.slots_processed =
1525 prev_slots_processed;
1526 thread_stats_ref.blocks_processed =
1527 prev_blocks_processed;
1528 thread_stats_ref.leader_skipped_slots =
1529 prev_leader_skipped;
1530 thread_stats_ref.current_slot =
1531 prev_current_slot;
1532 }
1533 last_counted_slot = prev_last_counted_slot;
1534 return Err(err);
1535 }
1536 }
1537
1538 if slot > last_counted_slot {
1539 last_counted_slot = slot;
1540 }
1541 }
1542 Subset(_subset) => (),
1543 Epoch(_epoch) => (),
1544 Rewards(rewards) => {
1545 if reward_enabled || block_enabled {
1546 let reassembled = nodes
1547 .reassemble_dataframes(rewards.data.clone())
1548 .map_err(|err| {
1549 (
1550 FirehoseError::NodeDecodingError(item_index, err),
1551 current_slot.unwrap_or(slot_range.start),
1552 )
1553 })?;
1554 if reassembled.is_empty() {
1555 this_block_rewards = DecodedRewards::empty();
1556 if reward_enabled
1557 && let Some(on_reward_cb) = on_reward.as_ref()
1558 {
1559 on_reward_cb(
1560 thread_index,
1561 RewardsData {
1562 slot: block.slot,
1563 rewards: Vec::new(),
1564 },
1565 )
1566 .await
1567 .map_err(|e| {
1568 (
1569 FirehoseError::RewardHandlerError(e),
1570 error_slot,
1571 )
1572 })?;
1573 }
1574 continue;
1575 }
1576
1577 let decoded_rewards =
1578 decode_rewards_from_frame(block.slot, reassembled)
1579 .map_err(|err| {
1580 (
1581 FirehoseError::NodeDecodingError(
1582 item_index,
1583 err,
1584 ),
1585 error_slot,
1586 )
1587 })?;
1588 if reward_enabled
1589 && let Some(on_reward_cb) = on_reward.as_ref()
1590 {
1591 on_reward_cb(
1592 thread_index,
1593 RewardsData {
1594 slot: block.slot,
1595 rewards: decoded_rewards.keyed_rewards.clone(),
1596 },
1597 )
1598 .await
1599 .map_err(|e| {
1600 (
1601 FirehoseError::RewardHandlerError(e),
1602 error_slot,
1603 )
1604 })?;
1605 }
1606 this_block_rewards = decoded_rewards;
1607 if let Some(ref mut stats) = thread_stats {
1608 stats.rewards_processed +=
1609 this_block_rewards.keyed_rewards.len() as u64;
1610 }
1611 }
1612 }
1613 DataFrame(_data_frame) => (),
1614 }
1615 }
1616 if block.slot == slot_range.end - 1 {
1617 let finish_time = std::time::Instant::now();
1618 let elapsed = finish_time.duration_since(start_time);
1619 log::info!(target: &log_target, "processed slot {}", block.slot);
1620 let elapsed_pretty = human_readable_duration(elapsed);
1621 log::info!(
1622 target: &log_target,
1623 "processed {} slots across {} epochs in {}.",
1624 slot_range.end - slot_range.start,
1625 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1626 elapsed_pretty
1627 );
1628 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1629 let summary: String = error_counts
1632 .iter()
1633 .enumerate()
1634 .filter_map(|(i, c)| {
1635 let v = c.load(Ordering::Relaxed);
1636 if v > 0 {
1637 Some(format!("{:03}({})", i, v))
1638 } else {
1639 None
1640 }
1641 })
1642 .collect::<Vec<_>>()
1643 .join(", ");
1644 if !summary.is_empty() {
1645 log::debug!(target: &log_target, "threads with errors: {}", summary);
1646 }
1647 return Ok(());
1648 }
1649 }
1650 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1651 && last_counted_slot < expected_last_slot
1652 {
1653 }
1656 if let Some(ref mut stats) = thread_stats {
1657 stats.finish_time = Some(std::time::Instant::now());
1658 maybe_emit_stats(
1659 stats_tracking.as_ref(),
1660 thread_index,
1661 stats,
1662 &overall_slots_processed,
1663 &overall_blocks_processed,
1664 &overall_transactions_processed,
1665 &overall_entries_processed,
1666 &transactions_since_stats,
1667 &blocks_since_stats,
1668 &slots_since_stats,
1669 &last_pulse,
1670 start_time,
1671 )
1672 .await?;
1673 }
1674 if block_enabled {
1675 pending_skipped_slots.remove(&thread_index);
1676 }
1677 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1678 }
1679 Ok(())
1680 }
1681 .await
1682 {
1683 if is_shutdown_error(&err) {
1684 log::info!(
1685 target: &log_target,
1686 "shutdown requested; terminating firehose thread {}",
1687 thread_index
1688 );
1689 break;
1690 }
1691 let epoch = slot_to_epoch(slot);
1692 let item_index = match &err {
1693 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1694 _ => 0,
1695 };
1696 let error_message = err.to_string();
1697 log::error!(
1698 target: &log_target,
1699 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1700 slot,
1701 epoch
1702 );
1703 log::error!(target: &log_target, "{}", error_message);
1704 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1705 || error_message.contains("Unknown CID version")
1706 {
1707 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1710 }
1711 if let Some(on_error_cb) = on_error.clone() {
1712 let context = FirehoseErrorContext {
1713 thread_id: thread_index,
1714 slot,
1715 epoch,
1716 error_message: error_message.clone(),
1717 };
1718 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1719 log::error!(
1720 target: &log_target,
1721 "on_error handler failed: {}",
1722 handler_err
1723 );
1724 }
1725 }
1726 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1728 log::warn!(
1729 target: &log_target,
1730 "restarting from slot {} at index {}",
1731 slot,
1732 item_index,
1733 );
1734 if slot <= last_counted_slot {
1738 slot_range.start = last_counted_slot.saturating_add(1);
1739 } else {
1740 slot_range.start = slot;
1741 }
1742 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1744 if tracking_enabled
1745 && let Some(ref mut stats_ref) = thread_stats {
1746 stats_ref.slot_range.start = slot_range.start;
1747 stats_ref.slot_range.end = slot_range.end;
1748 }
1750 if block_enabled {
1751 pending_skipped_slots.remove(&thread_index);
1752 }
1753 skip_until_index = None;
1757 last_emitted_slot_global = last_emitted_slot;
1758 }
1759 });
1760 handles.push(handle);
1761 }
1762
1763 for handle in handles {
1765 handle.await.unwrap();
1766 }
1767 if stats_tracking.is_some() {
1768 let elapsed = firehose_start.elapsed();
1769 let elapsed_secs = elapsed.as_secs_f64();
1770 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1771 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1772 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1773 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1774 let total_errors: u64 = error_counts
1775 .iter()
1776 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1777 .sum();
1778 let overall_tps = if elapsed_secs > 0.0 {
1779 total_transactions as f64 / elapsed_secs
1780 } else {
1781 0.0
1782 };
1783 log::info!(
1784 target: LOG_MODULE,
1785 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1786 elapsed_secs,
1787 total_slots,
1788 total_blocks,
1789 total_leader_skipped,
1790 total_transactions,
1791 overall_tps,
1792 total_errors
1793 );
1794 }
1795 if shutdown_flag.load(Ordering::SeqCst) {
1796 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1797 } else {
1798 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1799 }
1800 Ok(())
1801}
1802
1803#[allow(clippy::result_large_err)]
1804pub fn firehose_geyser(
1811 rt: Arc<tokio::runtime::Runtime>,
1812 slot_range: Range<u64>,
1813 geyser_config_files: Option<&[PathBuf]>,
1814 index_base_url: &Url,
1815 client: &Client,
1816 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1817 threads: u64,
1818) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1819 if threads == 0 {
1820 return Err((
1821 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1822 slot_range.start,
1823 ));
1824 }
1825 log::info!(target: LOG_MODULE, "starting firehose...");
1826 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1827 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1828 let mut entry_notifier_maybe = None;
1829 let mut block_meta_notifier_maybe = None;
1830 let mut transaction_notifier_maybe = None;
1831 if let Some(geyser_config_files) = geyser_config_files {
1832 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1833
1834 let service =
1835 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1836 confirmed_bank_receiver.clone(),
1837 true,
1838 geyser_config_files,
1839 )
1840 .map_err(|e| (e.into(), slot_range.start))?;
1841
1842 transaction_notifier_maybe = Some(
1843 service
1844 .get_transaction_notifier()
1845 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1846 .map_err(|e| (e, slot_range.start))?,
1847 );
1848
1849 entry_notifier_maybe = service.get_entry_notifier();
1850 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1851
1852 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1853 }
1854
1855 if entry_notifier_maybe.is_some() {
1856 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1857 } else {
1858 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1859 }
1860 log::info!(target: LOG_MODULE, "running on_load...");
1861 rt.spawn(on_load);
1862
1863 let slot_range = Arc::new(slot_range);
1864 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1865 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1866 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1867 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1868
1869 let subranges = generate_subranges(&slot_range, threads);
1871 if threads > 1 {
1872 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1873 }
1874
1875 let mut handles = Vec::new();
1876 let error_counts: Arc<Vec<AtomicU32>> =
1878 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1879
1880 for (i, slot_range) in subranges.into_iter().enumerate() {
1881 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1882 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1883 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1884 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1885 let client = client.clone();
1886 let error_counts = error_counts.clone();
1887
1888 let rt_clone = rt.clone();
1889
1890 let handle = std::thread::spawn(move || {
1891 rt_clone.block_on(async {
1892 firehose_geyser_thread(
1893 slot_range,
1894 transaction_notifier_maybe,
1895 entry_notifier_maybe,
1896 block_meta_notifier_maybe,
1897 confirmed_bank_sender,
1898 &client,
1899 if threads > 1 { Some(i) } else { None },
1900 error_counts,
1901 )
1902 .await
1903 .unwrap();
1904 });
1905 });
1906 handles.push(handle);
1907 }
1908
1909 for handle in handles {
1911 handle.join().unwrap();
1912 }
1913 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1914 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1915 block_meta_notifier.notify_block_metadata(
1916 u64::MAX,
1917 "unload",
1918 u64::MAX,
1919 "unload",
1920 &KeyedRewardsAndNumPartitions {
1921 keyed_rewards: vec![],
1922 num_partitions: None,
1923 },
1924 None,
1925 None,
1926 0,
1927 0,
1928 );
1929 }
1930 Ok(confirmed_bank_receiver)
1931}
1932
1933#[allow(clippy::too_many_arguments)]
1934#[allow(clippy::result_large_err)]
1935async fn firehose_geyser_thread(
1936 mut slot_range: Range<u64>,
1937 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1938 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1939 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1940 confirmed_bank_sender: Sender<SlotNotification>,
1941 client: &Client,
1942 thread_index: Option<usize>,
1943 error_counts: Arc<Vec<AtomicU32>>,
1944) -> Result<(), (FirehoseError, u64)> {
1945 let start_time = std::time::Instant::now();
1946 let log_target = if let Some(thread_index) = thread_index {
1947 format!("{}::T{:03}", LOG_MODULE, thread_index)
1948 } else {
1949 LOG_MODULE.to_string()
1950 };
1951 let initial_slot_range = slot_range.clone();
1952 let mut skip_until_index = None;
1953 let mut last_counted_slot = slot_range.start.saturating_sub(1);
1954 while let Err((err, slot)) = async {
1956 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1957 log::info!(
1958 target: &log_target,
1959 "slot range: {} (epoch {}) ... {} (epoch {})",
1960 slot_range.start,
1961 slot_to_epoch(slot_range.start),
1962 slot_range.end,
1963 slot_to_epoch(slot_range.end)
1964 );
1965
1966 log::info!(target: &log_target, "🚒 starting firehose...");
1967
1968 let mut current_slot: Option<u64> = None;
1970 for epoch_num in epoch_range.clone() {
1971 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1972 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1973 Ok(stream) => stream,
1974 Err(_) => {
1975 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1976 }
1977 };
1978 let mut reader = NodeReader::new(stream);
1979
1980 let header_fut = reader.read_raw_header();
1981 let header = match timeout(OP_TIMEOUT, header_fut).await {
1982 Ok(res) => res
1983 .map_err(FirehoseError::ReadHeader)
1984 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1985 Err(_) => {
1986 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1987 }
1988 };
1989 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1990
1991 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1992 let local_start = std::cmp::max(slot_range.start, epoch_start);
1993 let local_end_inclusive =
1994 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1995 if local_start > local_end_inclusive {
1996 log::debug!(
1997 target: &log_target,
1998 "epoch {} has no overlap with thread range ({}..{}), skipping",
1999 epoch_num,
2000 slot_range.start,
2001 slot_range.end
2002 );
2003 continue;
2004 }
2005
2006 let mut todo_previous_blockhash = Hash::default();
2007 let mut todo_latest_entry_blockhash = Hash::default();
2008 last_counted_slot = local_start.saturating_sub(1);
2011 current_slot = None;
2012
2013 if local_start > epoch_start {
2014 let seek_slot = match timeout(
2017 OP_TIMEOUT,
2018 find_previous_indexed_slot(local_start, epoch_start, &log_target),
2019 )
2020 .await
2021 {
2022 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2023 Err(_) => {
2024 return Err((
2025 FirehoseError::OperationTimeout(
2026 "seek_to_previous_indexed_slot",
2027 ),
2028 current_slot.unwrap_or(slot_range.start),
2029 ));
2030 }
2031 };
2032 if let Some(seek_slot) = seek_slot {
2033 let seek_fut = reader.seek_to_slot(seek_slot);
2034 match timeout(OP_TIMEOUT, seek_fut).await {
2035 Ok(res) => {
2036 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2037 }
2038 Err(_) => {
2039 return Err((
2040 FirehoseError::OperationTimeout("seek_to_slot"),
2041 current_slot.unwrap_or(slot_range.start),
2042 ));
2043 }
2044 }
2045 }
2046 }
2047
2048 let mut item_index = 0;
2050 let mut displayed_skip_message = false;
2051 loop {
2052 let read_fut = reader.read_until_block();
2053 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2054 Ok(result) => result
2055 .map_err(FirehoseError::ReadUntilBlockError)
2056 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2057 Err(_) => {
2058 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2059 let restart_slot =
2060 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2061 return Err((
2062 FirehoseError::OperationTimeout("read_until_block"),
2063 restart_slot,
2064 ));
2065 }
2066 };
2067 if nodes.is_empty() {
2068 log::info!(
2069 target: &log_target,
2070 "reached end of epoch {}",
2071 epoch_num
2072 );
2073 break;
2074 }
2075 if let Some(last_node) = nodes.0.last()
2084 && !last_node.get_node().is_block() {
2085 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2086 break;
2087 }
2088 let block = nodes
2089 .get_block()
2090 .map_err(FirehoseError::GetBlockError)
2091 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2092 log::debug!(
2093 target: &log_target,
2094 "read {} items from epoch {}, now at slot {}",
2095 item_index,
2096 epoch_num,
2097 block.slot
2098 );
2099 let slot = block.slot;
2100 if slot > local_end_inclusive {
2101 log::debug!(
2102 target: &log_target,
2103 "reached end of local slice at slot {} (epoch {}), stopping",
2104 slot,
2105 epoch_num
2106 );
2107 break;
2108 }
2109 if slot >= slot_range.end {
2110 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2111 return Ok(());
2115 }
2116 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2117 if slot < local_start {
2118 if slot.saturating_add(1) == local_start {
2119 log::debug!(
2120 target: &log_target,
2121 "priming reader with preceding slot {}, skipping",
2122 slot
2123 );
2124 } else {
2125 log::warn!(
2126 target: &log_target,
2127 "encountered slot {} before start of range {}, skipping",
2128 slot,
2129 local_start
2130 );
2131 }
2132 continue;
2133 }
2134 current_slot = Some(slot);
2135 let mut entry_index: usize = 0;
2136 let mut this_block_executed_transaction_count: u64 = 0;
2137 let mut this_block_entry_count: u64 = 0;
2138 let mut this_block_rewards = DecodedRewards::empty();
2139
2140 if slot <= last_counted_slot {
2141 log::debug!(
2142 target: &log_target,
2143 "duplicate block {}, already counted (last_counted={})",
2144 slot,
2145 last_counted_slot,
2146 );
2147 continue;
2148 }
2149
2150 nodes.each(|node_with_cid| -> Result<(), SharedError> {
2151 item_index += 1;
2152 if let Some(skip) = skip_until_index {
2158 if item_index < skip {
2159 if !displayed_skip_message {
2160 log::info!(
2161 target: &log_target,
2162 "skipping until index {} (at {})",
2163 skip,
2164 item_index
2165 );
2166 displayed_skip_message = true;
2167 }
2168 return Ok(());
2169 } else {
2170 log::info!(
2171 target: &log_target,
2172 "reached target index {}, resuming...",
2173 skip
2174 );
2175 skip_until_index = None;
2176 }
2177 }
2178 let node = node_with_cid.get_node();
2179
2180 use crate::node::Node::*;
2181 match node {
2182 Transaction(tx) => {
2183 let versioned_tx = tx.as_parsed()?;
2184 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
2185
2186 let as_native_metadata = decode_transaction_status_meta_from_frame(
2187 block.slot,
2188 reassembled_metadata,
2189 )?;
2190
2191 let message_hash = {
2192 #[cfg(feature = "verify-transaction-signatures")]
2193 {
2194 versioned_tx.verify_and_hash_message()?
2195 }
2196 #[cfg(not(feature = "verify-transaction-signatures"))]
2197 {
2198 versioned_tx.message.hash()
2201 }
2202 };
2203 let signature = versioned_tx
2204 .signatures
2205 .first()
2206 .ok_or_else(|| {
2207 Box::new(std::io::Error::new(
2208 std::io::ErrorKind::InvalidData,
2209 "transaction missing signature",
2210 )) as SharedError
2211 })?;
2212 let is_vote = is_simple_vote_transaction(&versioned_tx);
2213
2214 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2215 transaction_notifier.notify_transaction(
2216 block.slot,
2217 tx.index.unwrap() as usize,
2218 signature,
2219 &message_hash,
2220 is_vote,
2221 &as_native_metadata,
2222 &versioned_tx,
2223 );
2224 }
2225
2226 }
2227 Entry(entry) => {
2228 let entry_hash = Hash::from(entry.hash.to_bytes());
2229 let entry_transaction_count = entry.transactions.len();
2230 let entry_transaction_count_u64 = entry_transaction_count as u64;
2231 let starting_transaction_index =
2232 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2233 Box::new(std::io::Error::other(
2234 "transaction index exceeds usize range",
2235 )) as SharedError
2236 })?;
2237 todo_latest_entry_blockhash = entry_hash;
2238 this_block_executed_transaction_count += entry_transaction_count_u64;
2239 this_block_entry_count += 1;
2240 if entry_notifier_maybe.is_none() {
2241 return Ok(());
2242 }
2243 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2244 let entry_summary = solana_entry::entry::EntrySummary {
2245 num_hashes: entry.num_hashes,
2246 hash: Hash::from(entry.hash.to_bytes()),
2247 num_transactions: entry_transaction_count_u64,
2248 };
2249 entry_notifier.notify_entry(
2250 block.slot,
2251 entry_index,
2252 &entry_summary,
2253 starting_transaction_index,
2254 );
2255 entry_index += 1;
2256 }
2257 Block(block) => {
2258 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2259 confirmed_bank_sender.send(notification).unwrap();
2260
2261 if block_meta_notifier_maybe.is_none() {
2262 last_counted_slot = block.slot;
2263 return Ok(());
2264 }
2265 let DecodedRewards {
2266 keyed_rewards,
2267 num_partitions,
2268 } = std::mem::take(&mut this_block_rewards);
2269 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2270 block_meta_notifier.notify_block_metadata(
2271 block.meta.parent_slot,
2272 todo_previous_blockhash.to_string().as_str(),
2273 block.slot,
2274 todo_latest_entry_blockhash.to_string().as_str(),
2275 &KeyedRewardsAndNumPartitions {
2276 keyed_rewards,
2277 num_partitions,
2278 },
2279 Some(block.meta.blocktime as i64),
2280 block.meta.block_height,
2281 this_block_executed_transaction_count,
2282 this_block_entry_count,
2283 );
2284 todo_previous_blockhash = todo_latest_entry_blockhash;
2285 last_counted_slot = block.slot;
2286 std::thread::yield_now();
2287 }
2288 Subset(_subset) => (),
2289 Epoch(_epoch) => (),
2290 Rewards(rewards) => {
2291 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
2292 if !reassembled.is_empty() {
2293 this_block_rewards = decode_rewards_from_frame(
2294 block.slot,
2295 reassembled,
2296 )?;
2297 } else {
2298 this_block_rewards = DecodedRewards::empty();
2299 }
2300 }
2301 DataFrame(_data_frame) => (),
2302 }
2303 Ok(())
2304 })
2305 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2306 if block.slot == slot_range.end - 1 {
2307 let finish_time = std::time::Instant::now();
2308 let elapsed = finish_time.duration_since(start_time);
2309 log::info!(target: &log_target, "processed slot {}", block.slot);
2310 let elapsed_pretty = human_readable_duration(elapsed);
2311 log::info!(
2312 target: &log_target,
2313 "processed {} slots across {} epochs in {}.",
2314 initial_slot_range.end - initial_slot_range.start,
2315 slot_to_epoch(initial_slot_range.end)
2316 + 1
2317 - slot_to_epoch(initial_slot_range.start),
2318 elapsed_pretty
2319 );
2320 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2321 let summary: String = error_counts
2324 .iter()
2325 .enumerate()
2326 .filter_map(|(i, c)| {
2327 let v = c.load(Ordering::Relaxed);
2328 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2329 })
2330 .collect::<Vec<_>>()
2331 .join(", ");
2332 if !summary.is_empty() {
2333 log::debug!(target: &log_target, "threads with errors: {}", summary);
2334 }
2335 return Ok(());
2336 }
2337 }
2338 }
2339 Ok(())
2340}
2341.await
2342{
2343 if is_shutdown_error(&err) {
2344 log::info!(
2345 target: &log_target,
2346 "shutdown requested; terminating firehose thread {:?}",
2347 thread_index
2348 );
2349 return Ok(());
2350 }
2351 log::error!(
2352 target: &log_target,
2353 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2354 slot,
2355 slot_to_epoch(slot)
2356 );
2357 log::error!(target: &log_target, "{}", err);
2358 let error_message = err.to_string();
2359 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2360 || error_message.contains("Unknown CID version")
2361 {
2362 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2365 }
2366 let item_index = match err {
2367 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2368 _ => 0,
2369 };
2370 let idx = thread_index.unwrap_or(0);
2372 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2373 log::warn!(
2374 target: &log_target,
2375 "restarting from slot {} at index {}",
2376 slot,
2377 item_index,
2378 );
2379 if slot <= last_counted_slot {
2382 slot_range.start = last_counted_slot.saturating_add(1);
2383 } else {
2384 slot_range.start = slot;
2385 }
2386 skip_until_index = None;
2390}
2391 Ok(())
2392}
2393
2394#[inline]
2395fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2396 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2397 return false;
2398 }
2399
2400 if !matches!(
2401 versioned_tx.version(),
2402 solana_transaction::versioned::TransactionVersion::Legacy(_)
2403 ) {
2404 return false;
2405 }
2406
2407 let instructions = versioned_tx.message.instructions();
2408 if instructions.len() != 1 {
2409 return false;
2410 }
2411
2412 let program_index = instructions[0].program_id_index as usize;
2413 versioned_tx
2414 .message
2415 .static_account_keys()
2416 .get(program_index)
2417 .map(|program_id| program_id == &vote_program_id())
2418 .unwrap_or(false)
2419}
2420
2421#[inline(always)]
2422fn convert_proto_rewards(
2423 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2424) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2425 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2426 for proto_reward in proto_rewards.rewards.iter() {
2427 let reward = RewardInfo {
2428 reward_type: match proto_reward.reward_type - 1 {
2429 0 => RewardType::Fee,
2430 1 => RewardType::Rent,
2431 2 => RewardType::Staking,
2432 3 => RewardType::Voting,
2433 typ => {
2434 return Err(Box::new(std::io::Error::other(format!(
2435 "unsupported reward type {}",
2436 typ
2437 ))));
2438 }
2439 },
2440 lamports: proto_reward.lamports,
2441 post_balance: proto_reward.post_balance,
2442 commission: proto_reward.commission.parse::<u8>().ok(),
2443 };
2444 let pubkey = proto_reward
2445 .pubkey
2446 .parse::<Address>()
2447 .map_err(|err| Box::new(err) as SharedError)?;
2448 keyed_rewards.push((pubkey, reward));
2449 }
2450 Ok(keyed_rewards)
2451}
2452
2453#[inline]
2454pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2456 let total = slot_range.end - slot_range.start;
2457 let slots_per_thread = total / threads;
2458 let remainder = total % threads;
2459
2460 let ranges: Vec<Range<u64>> = (0..threads)
2461 .map(|i| {
2462 let extra_slot = if i < remainder { 1 } else { 0 };
2464 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2465 let end = start + slots_per_thread + extra_slot;
2466 start..end
2467 })
2468 .collect();
2469
2470 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2472 assert_eq!(
2473 total_covered, total,
2474 "Range generation failed: {} threads should cover {} slots but only cover {}",
2475 threads, total, total_covered
2476 );
2477
2478 for i in 1..ranges.len() {
2480 assert_eq!(
2481 ranges[i - 1].end,
2482 ranges[i].start,
2483 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2484 i - 1,
2485 ranges[i - 1].end,
2486 i,
2487 ranges[i].start
2488 );
2489 }
2490
2491 log::info!(
2492 target: LOG_MODULE,
2493 "Generated {} thread ranges covering {} slots total",
2494 threads,
2495 total_covered
2496 );
2497 ranges
2498}
2499
/// Formats `duration` as a short human-readable string.
///
/// - zero is rendered as `"0s"`;
/// - under a minute: whole seconds (`"5s"`) when there is at least one full
///   second and no millisecond remainder, otherwise two decimals (`"5.25s"`);
/// - a minute or more: only the two most significant units are shown, and the
///   smaller of the two is omitted when it is zero (`"1m"`, `"1m1s"`,
///   `"1h1m"`, `"1d1h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        let is_exact_second = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_exact_second {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    // From here on at least one of days/hours/minutes is non-zero.
    let breakdown = (
        whole_secs / DAY,
        whole_secs % DAY / HOUR,
        whole_secs % HOUR / MINUTE,
        whole_secs % MINUTE,
    );
    match breakdown {
        (0, 0, minutes, 0) => format!("{minutes}m"),
        (0, 0, minutes, secs) => format!("{minutes}m{secs}s"),
        (0, hours, 0, _) => format!("{hours}h"),
        (0, hours, minutes, _) => format!("{hours}h{minutes}m"),
        (days, 0, _, _) => format!("{days}d"),
        (days, hours, _, _) => format!("{days}d{hours}h"),
    }
}
2545
/// Test-only stats callback: logs one summary line for `thread_id` with the
/// counters carried by `stats` plus a transactions-per-second figure derived
/// from the time elapsed since `stats.start_time`. Always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so a zero elapsed time reports 0 tps instead of inf/NaN.
        let tps = if elapsed_secs <= 0.0 {
            0.0
        } else {
            stats.transactions_processed as f64 / elapsed_secs
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2571
2572#[cfg(test)]
2573use futures_util::FutureExt;
2574#[cfg(test)]
2575use serial_test::serial;
2576#[cfg(test)]
2577use std::sync::{Mutex, OnceLock};
2578
/// Test helper: runs a single-threaded firehose over exactly `slot` and asserts
/// that the slot was delivered as a non-skipped block whose executed
/// transaction count is non-zero and at least `min_executed`.
///
/// The number of non-vote transactions seen for the slot is also tallied, but
/// only logged, never asserted.
#[cfg(test)]
async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    // Shared cells written by the handlers and read back once the run completes.
    let block_seen = Arc::new(AtomicBool::new(false));
    let executed_total = Arc::new(AtomicU64::new(0));
    let non_vote_total = Arc::new(AtomicU64::new(0));

    let block_seen_handle = Arc::clone(&block_seen);
    let executed_total_handle = Arc::clone(&executed_total);
    let non_vote_total_handle = Arc::clone(&non_vote_total);
    let target_slot_block = slot;

    firehose(
        1,
        slot..(slot + 1),
        Some(move |_thread_id: usize, block: BlockData| {
            // Each invocation needs its own handles to move into the future.
            let block_seen_handle = Arc::clone(&block_seen_handle);
            let executed_total_handle = Arc::clone(&executed_total_handle);
            async move {
                if block.slot() != target_slot_block {
                    return Ok(());
                }
                assert!(
                    !block.was_skipped(),
                    "slot {target_slot_block} was marked leader skipped",
                );
                let BlockData::Block {
                    executed_transaction_count,
                    ..
                } = block
                else {
                    return Ok(());
                };
                block_seen_handle.store(true, Ordering::Relaxed);
                executed_total_handle.store(executed_transaction_count, Ordering::Relaxed);
                Ok(())
            }
            .boxed()
        }),
        Some(move |_thread_id: usize, transaction: TransactionData| {
            let non_vote_total_handle = Arc::clone(&non_vote_total_handle);
            async move {
                let counts_as_non_vote = transaction.slot == slot && !transaction.is_vote;
                if counts_as_non_vote {
                    non_vote_total_handle.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        block_seen.load(Ordering::Relaxed),
        "target slot {slot} was not processed"
    );
    let observed_total = executed_total.load(Ordering::Relaxed);
    let observed_non_vote = non_vote_total.load(Ordering::Relaxed);
    assert!(
        observed_total > 0,
        "slot {slot} executed transaction count was zero"
    );
    assert!(
        observed_total >= min_executed,
        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
    );
    log::info!(
        target: LOG_MODULE,
        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
        observed_total,
        observed_non_vote
    );
}
2659
/// Test/debug helper: reads the nodes for `slot` (via `read_until_block` after
/// seeking to the slot) from its epoch stream and logs a per-kind node tally.
///
/// For any slot other than 0 it additionally probes up to five preceding slots
/// with `slot_to_offset` and logs the first one that resolves, or that none did.
///
/// # Errors
///
/// Propagates failures from seeking to the slot and from reading its nodes.
/// Failed offset lookups for preceding slots are only logged.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let epoch = slot_to_epoch(slot);
    let client = crate::network::create_http_client();
    let mut reader = NodeReader::new(fetch_epoch_stream(epoch, &client).await);
    reader
        .seek_to_slot(slot)
        .await
        .map_err(|err| Box::new(err) as SharedError)?;
    let nodes = reader.read_until_block().await?;

    // Kind of the very first node, or "none" when nothing was read.
    let first_kind = match nodes.0.first() {
        None => "none",
        Some(first) => match first.get_node() {
            Node::Transaction(_) => "transaction",
            Node::Entry(_) => "entry",
            Node::Block(_) => "block",
            Node::Subset(_) => "subset",
            Node::Epoch(_) => "epoch",
            Node::Rewards(_) => "rewards",
            Node::DataFrame(_) => "dataframe",
        },
    };

    // Per-kind tallies; the block fields record the last block node seen.
    let mut tx_nodes = 0u64;
    let mut entry_nodes = 0u64;
    let mut entry_tx_total = 0u64;
    let mut dataframe_nodes = 0u64;
    let mut reward_nodes = 0u64;
    let mut subset_nodes = 0u64;
    let mut epoch_nodes = 0u64;
    let mut block_slot = None;
    let mut block_entries = None;

    for item in &nodes.0 {
        match item.get_node() {
            Node::Transaction(_) => tx_nodes += 1,
            Node::Entry(entry) => {
                entry_nodes += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Block(block) => {
                block_slot = Some(block.slot);
                block_entries = Some(block.entries.len());
            }
            Node::Subset(_) => subset_nodes += 1,
            Node::Epoch(_) => epoch_nodes += 1,
            Node::Rewards(_) => reward_nodes += 1,
            Node::DataFrame(_) => dataframe_nodes += 1,
        }
    }

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        tx_nodes,
        entry_nodes,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframe_nodes,
        reward_nodes,
        subset_nodes,
        epoch_nodes
    );

    // Slot 0 has no predecessor to probe.
    if slot == 0 {
        return Ok(());
    }

    // Walk backwards one slot at a time and stop at the first resolvable offset.
    let mut nearest_previous = None;
    for delta in 1..=5 {
        let candidate = slot.saturating_sub(delta);
        match slot_to_offset(candidate).await {
            Ok(offset) => {
                nearest_previous = Some((candidate, offset));
                break;
            }
            Err(err) => {
                log::info!(
                    target: LOG_MODULE,
                    "slot {slot} previous lookup {candidate} failed: {err}"
                );
            }
        }
    }

    match nearest_previous {
        Some((candidate, offset)) => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        }
        None => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2775
2776#[tokio::test(flavor = "multi_thread")]
2777async fn test_firehose_epoch_800() {
2778 use dashmap::DashSet;
2779 use std::sync::atomic::{AtomicU64, Ordering};
2780 solana_logger::setup_with_default("info");
2781 const THREADS: usize = 4;
2782 const NUM_SLOTS_TO_COVER: u64 = 50;
2783 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2784 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2785 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2786 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2787 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2788 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2789 let stats_tracking = StatsTracking {
2790 on_stats: log_stats_handler,
2791 tracking_interval_slots: 10,
2792 };
2793
2794 for prev in PREV_BLOCK.iter() {
2795 prev.store(0, Ordering::Relaxed);
2796 }
2797 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2798 NUM_BLOCKS.store(0, Ordering::Relaxed);
2799 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2800 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2801 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2802
2803 firehose(
2804 THREADS.try_into().unwrap(),
2805 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2806 Some(|thread_id: usize, block: BlockData| {
2807 async move {
2808 let _prev =
2809 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2810 if block.was_skipped() {
2811 log::info!(
2812 target: LOG_MODULE,
2813 "leader skipped block {} on thread {}",
2814 block.slot(),
2815 thread_id,
2816 );
2817 } else {
2818 }
2825
2826 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2827 if block.was_skipped() {
2828 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2829 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2830 } else {
2831 if first_time {
2832 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2833 if let BlockData::Block {
2834 executed_transaction_count,
2835 ..
2836 } = &block
2837 {
2838 let executed = *executed_transaction_count;
2839 let _ = MIN_TRANSACTIONS.fetch_update(
2840 Ordering::Relaxed,
2841 Ordering::Relaxed,
2842 |current| {
2843 if executed < current {
2844 Some(executed)
2845 } else {
2846 None
2847 }
2848 },
2849 );
2850 }
2851 }
2852 }
2853 Ok(())
2854 }
2855 .boxed()
2856 }),
2857 None::<OnTxFn>,
2858 None::<OnEntryFn>,
2859 None::<OnRewardFn>,
2860 None::<OnErrorFn>,
2861 Some(stats_tracking),
2862 None,
2863 )
2864 .await
2865 .unwrap();
2866 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2867 assert_eq!(
2868 seen, NUM_SLOTS_TO_COVER,
2869 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2870 );
2871 let mut skipped: Vec<u64> = SEEN_SKIPPED
2872 .get_or_init(DashSet::new)
2873 .iter()
2874 .map(|v| *v)
2875 .collect();
2876 skipped.sort_unstable();
2877 const EXPECTED_SKIPPED: [u64; 6] = [
2879 345_600_004,
2880 345_600_005,
2881 345_600_008,
2882 345_600_009,
2883 345_600_010,
2884 345_600_011,
2885 ];
2886 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2887 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2888}
2889
2890#[tokio::test(flavor = "multi_thread")]
2891async fn test_firehose_target_slot_transactions() {
2892 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2893 solana_logger::setup_with_default("info");
2894 const TARGET_SLOT: u64 = 376_273_722;
2895 const SLOT_RADIUS: u64 = 50;
2896 const EXPECTED_TRANSACTIONS: u64 = 1414;
2897 const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2898 static FOUND: AtomicBool = AtomicBool::new(false);
2899 static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2900 static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2901
2902 FOUND.store(false, Ordering::Relaxed);
2903 OBSERVED_TXS.store(0, Ordering::Relaxed);
2904 OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2905
2906 firehose(
2907 4,
2908 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2909 Some(|_thread_id: usize, block: BlockData| {
2910 async move {
2911 if block.slot() == TARGET_SLOT {
2912 assert!(
2913 !block.was_skipped(),
2914 "target slot {TARGET_SLOT} was marked leader skipped",
2915 );
2916 if let BlockData::Block {
2917 executed_transaction_count,
2918 ..
2919 } = block
2920 {
2921 OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2922 FOUND.store(true, Ordering::Relaxed);
2923 assert_eq!(
2924 executed_transaction_count, EXPECTED_TRANSACTIONS,
2925 "unexpected transaction count for slot {TARGET_SLOT}"
2926 );
2927 assert_eq!(
2928 OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2929 EXPECTED_NON_VOTE_TRANSACTIONS,
2930 "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2931 );
2932 }
2933 }
2934 Ok(())
2935 }
2936 .boxed()
2937 }),
2938 Some(|_thread_id: usize, transaction: TransactionData| {
2939 async move {
2940 if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2941 OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2942 }
2943 Ok(())
2944 }
2945 .boxed()
2946 }),
2947 None::<OnEntryFn>,
2948 None::<OnRewardFn>,
2949 None::<OnErrorFn>,
2950 None::<OnStatsTrackingFn>,
2951 None,
2952 )
2953 .await
2954 .unwrap();
2955
2956 assert!(
2957 FOUND.load(Ordering::Relaxed),
2958 "target slot was not processed"
2959 );
2960 assert_eq!(
2961 OBSERVED_TXS.load(Ordering::Relaxed),
2962 EXPECTED_TRANSACTIONS,
2963 "recorded transaction count mismatch"
2964 );
2965}
2966
/// Slot 311,173,980 must report at least `1_197 + 211` executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
    const SLOT: u64 = 311_173_980;
    // Lower bound kept as the sum of its two components.
    const MIN_EXECUTED: u64 = 1_197 + 211;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
2974
/// Slot 311,225,232 must report at least `888 + 157` executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
    const SLOT: u64 = 311_225_232;
    // Lower bound kept as the sum of its two components.
    const MIN_EXECUTED: u64 = 888 + 157;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
2982
/// Slot 311,175,860 must report at least `527 + 110` executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
    const SLOT: u64 = 311_175_860;
    // Lower bound kept as the sum of its two components.
    const MIN_EXECUTED: u64 = 527 + 110;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
2990
/// Slot 311,134,608 must report at least `1_086 + 169` executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
    const SLOT: u64 = 311_134_608;
    // Lower bound kept as the sum of its two components.
    const MIN_EXECUTED: u64 = 1_086 + 169;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
2998
/// Ignored-by-default debugging aid: logs a node summary for each slot in a
/// fixed list, panicking on the first slot whose summary cannot be produced.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    const SLOTS: &[u64] = &[
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];

    solana_logger::setup_with_default("info");
    for &slot in SLOTS {
        log_slot_node_summary(slot).await.expect("slot summary");
    }
}
3016
3017#[tokio::test(flavor = "multi_thread")]
3018async fn test_firehose_epoch_850_has_logs() {
3019 use std::sync::atomic::{AtomicU64, Ordering};
3020 solana_logger::setup_with_default("info");
3021 const START_SLOT: u64 = 367_200_075; const SLOT_COUNT: u64 = 50;
3023 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3024
3025 TOTAL_TXS.store(0, Ordering::Relaxed);
3026
3027 firehose(
3028 4,
3029 START_SLOT..(START_SLOT + SLOT_COUNT),
3030 None::<OnBlockFn>,
3031 Some(|_thread_id: usize, transaction: TransactionData| {
3032 async move {
3033 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3034 if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3035 let has_logs = logs.iter().any(|msg| !msg.is_empty());
3036 assert_eq!(has_logs, true);
3037 }
3038 Ok(())
3039 }
3040 .boxed()
3041 }),
3042 None::<OnEntryFn>,
3043 None::<OnRewardFn>,
3044 None::<OnErrorFn>,
3045 None::<OnStatsTrackingFn>,
3046 None,
3047 )
3048 .await
3049 .unwrap();
3050
3051 assert!(
3052 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3053 "no transactions observed in epoch 850 range"
3054 );
3055}
3056
3057#[tokio::test(flavor = "multi_thread")]
3058async fn test_firehose_epoch_850_votes_present() {
3059 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3060 solana_logger::setup_with_default("info");
3061 const TARGET_SLOT: u64 = 367_200_100; const SLOT_RADIUS: u64 = 10;
3063 static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3064 static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3065 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3066
3067 SEEN_BLOCK.store(false, Ordering::Relaxed);
3068 VOTE_TXS.store(0, Ordering::Relaxed);
3069 TOTAL_TXS.store(0, Ordering::Relaxed);
3070
3071 firehose(
3072 2,
3073 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3074 Some(|_thread_id: usize, block: BlockData| {
3075 async move {
3076 if block.slot() == TARGET_SLOT {
3077 assert!(
3078 !block.was_skipped(),
3079 "target slot {TARGET_SLOT} was marked leader skipped",
3080 );
3081 SEEN_BLOCK.store(true, Ordering::Relaxed);
3082 }
3083 Ok(())
3084 }
3085 .boxed()
3086 }),
3087 Some(|_thread_id: usize, transaction: TransactionData| {
3088 async move {
3089 if transaction.slot == TARGET_SLOT {
3090 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3091 if transaction.is_vote {
3092 VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3093 }
3094 }
3095 Ok(())
3096 }
3097 .boxed()
3098 }),
3099 None::<OnEntryFn>,
3100 None::<OnRewardFn>,
3101 None::<OnErrorFn>,
3102 None::<OnStatsTrackingFn>,
3103 None,
3104 )
3105 .await
3106 .unwrap();
3107
3108 assert!(
3109 SEEN_BLOCK.load(Ordering::Relaxed),
3110 "target slot was not processed"
3111 );
3112 assert!(
3113 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3114 "no transactions counted in target slot"
3115 );
3116 assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3117}
3118
/// Injects a single synthetic block-handler failure into a one-thread firehose
/// run over eight slots and checks that every slot in the range is still
/// reported to the handler at least once.
///
/// The failure fires on the first non-skipped block that arrives after at
/// least one non-skipped block has already been recorded.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // slot -> number of times the handler accepted it.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    solana_logger::setup_with_default("info");

    // Statics outlive a single run, so reset them before starting.
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let is_real_block = !block.was_skipped();

                // `swap` is evaluated last so the flag only flips when the
                // failure actually fires, and it fires at most once.
                if is_real_block
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                {
                    let mut hits = COVERAGE
                        .get_or_init(|| Mutex::new(HashMap::new()))
                        .lock()
                        .unwrap();
                    *hits.entry(block.slot()).or_default() += 1;
                }
                if is_real_block {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let hits = COVERAGE.get().unwrap().lock().unwrap();
    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
        assert!(
            hits.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
3182
/// Runs a sixteen-thread firehose over the 2,001 slots centred on slot
/// 378,864,000 and checks that every slot was delivered as a non-skipped
/// block, apart from four slots (378,864,396..=378,864,399) that the test
/// treats as covered unconditionally.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    solana_logger::setup_with_default("info");

    // The static outlives a single run, so reset it before starting.
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Only real blocks count towards coverage.
                if !block.was_skipped() {
                    let slot = block.slot();
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(slot);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut covered = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();

    // These four slots are exempted from the check by marking them as covered.
    covered.extend(378_864_396..=378_864_399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !covered.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}