1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{
40 FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41 fetch_epoch_stream_with_options, slot_to_epoch,
42 },
43 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44 node_reader::NodeReader,
45 utils,
46};
47
/// Default per-operation timeout: bounds fetching an epoch stream and reading its header
/// when the firehose is not in sequential mode, and always bounds the slot-index seek.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(30_000);
/// Per-operation timeout used in place of [`OP_TIMEOUT`] for stream fetches and header
/// reads when the firehose runs in sequential mode.
const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_millis(180_000);
/// Transaction metadata for epochs strictly below this value is first decoded as bincode
/// (falling back to protobuf); from this epoch onward only protobuf is attempted.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57 flag: &Arc<std::sync::atomic::AtomicBool>,
58 receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60 if let Some(rx) = receiver {
61 match rx.try_recv() {
62 Ok(_) | Err(TryRecvError::Lagged(_)) => {
63 flag.store(true, Ordering::SeqCst);
64 }
65 Err(TryRecvError::Closed) => {
66 flag.store(true, Ordering::SeqCst);
67 }
68 Err(TryRecvError::Empty) => {}
69 }
70 }
71 flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76 inner
77 .downcast_ref::<io::Error>()
78 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79 .unwrap_or(false)
80 }
81
82 match err {
83 FirehoseError::BlockHandlerError(inner)
84 | FirehoseError::TransactionHandlerError(inner)
85 | FirehoseError::EntryHandlerError(inner)
86 | FirehoseError::RewardHandlerError(inner)
87 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88 _ => false,
89 }
90}
91
92async fn find_previous_indexed_slot(
93 local_start: u64,
94 epoch_start: u64,
95 log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97 if local_start <= epoch_start {
98 return Ok(None);
99 }
100 let mut candidate = local_start.saturating_sub(1);
101 let mut skipped = 0u64;
102 loop {
103 match slot_to_offset(candidate).await {
104 Ok(_) => {
105 if skipped > 0 {
106 log::info!(
107 target: log_target,
108 "slot {} missing in index; seeking back {} slots to {}",
109 local_start.saturating_sub(1),
110 skipped,
111 candidate
112 );
113 }
114 return Ok(Some(candidate));
115 }
116 Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117 if candidate <= epoch_start {
118 break;
119 }
120 skipped += 1;
121 candidate = candidate.saturating_sub(1);
122 }
123 Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124 }
125 }
126 log::warn!(
127 target: log_target,
128 "no indexed slot found before {} (epoch start {}); reading from epoch start",
129 local_start,
130 epoch_start
131 );
132 Ok(None)
133}
134
/// Errors produced while streaming epoch data through the firehose.
///
/// User-facing text for every variant comes from the hand-written `Display` impl; no
/// variant here carries an `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error from `reqwest` (also produced by `From<reqwest::Error>`).
    Reqwest(reqwest::Error),
    /// Reading the header of an epoch stream failed.
    ReadHeader(SharedError),
    /// Initializing the geyser plugin service failed.
    GeyserPluginService(GeyserPluginServiceError),
    /// The geyser plugin service did not yield a transaction notifier.
    FailedToGetTransactionNotifier,
    /// Reading forward until a block failed.
    ReadUntilBlockError(SharedError),
    /// Getting a block failed.
    GetBlockError(SharedError),
    /// Seeking to, reading, or decoding the data node with the given index failed.
    NodeDecodingError(usize, SharedError),
    /// A lookup in the slot offset index failed.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking to a slot failed.
    SeekToSlotError(SharedError),
    /// A load/setup error; `firehose` uses it, for example, to reject a zero thread count.
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An awaited operation exceeded its timeout; holds the operation's name.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error.
    EntryHandlerError(SharedError),
    /// The rewards handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
172
// NOTE(review): no SAFETY justification is given for these impls.
// - If every field type (`reqwest::Error`, `SharedError`, `GeyserPluginServiceError`,
//   `SlotOffsetIndexError`) is already `Send + Sync`, the compiler implements both auto
//   traits on its own and these impls are redundant.
// - If any field type is not, asserting them here is unsound.
// Confirm against the definition of `SharedError` before relying on these.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180 FirehoseError::ReadHeader(error) => {
181 write!(f, "Error reading header: {}", error)
182 }
183 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184 f,
185 "Error initializing geyser plugin service: {}",
186 geyser_plugin_service_error
187 ),
188 FirehoseError::FailedToGetTransactionNotifier => write!(
189 f,
190 "Failed to get transaction notifier from GeyserPluginService"
191 ),
192 FirehoseError::ReadUntilBlockError(error) => {
193 write!(f, "Error reading until block: {}", error)
194 }
195 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196 FirehoseError::NodeDecodingError(item_index, error) => {
197 write!(
198 f,
199 "Error seeking, reading data from, or decoding data for data node {}: {}",
200 item_index, error
201 )
202 }
203 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204 f,
205 "Error getting info from slot offset index: {}",
206 slot_offset_index_error
207 ),
208 FirehoseError::SeekToSlotError(error) => {
209 write!(f, "Error seeking to slot: {}", error)
210 }
211 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212 FirehoseError::OnStatsHandlerError(error) => {
213 write!(f, "Stats handler error: {}", error)
214 }
215 FirehoseError::OperationTimeout(op) => {
216 write!(f, "Timeout while waiting for operation: {}", op)
217 }
218 FirehoseError::TransactionHandlerError(error) => {
219 write!(f, "Transaction handler error: {}", error)
220 }
221 FirehoseError::EntryHandlerError(error) => {
222 write!(f, "Entry handler error: {}", error)
223 }
224 FirehoseError::RewardHandlerError(error) => {
225 write!(f, "Reward handler error: {}", error)
226 }
227 FirehoseError::BlockHandlerError(error) => {
228 write!(f, "Block handler error: {}", error)
229 }
230 }
231 }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235 fn from(e: reqwest::Error) -> Self {
236 FirehoseError::Reqwest(e)
237 }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241 fn from(e: GeyserPluginServiceError) -> Self {
242 FirehoseError::GeyserPluginService(e)
243 }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247 fn from(e: SlotOffsetIndexError) -> Self {
248 FirehoseError::SlotOffsetIndexError(e)
249 }
250}
251
/// Progress bookkeeping for a single firehose thread, embedded in each [`Stats`] snapshot.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the firehose thread (its position in the generated sub-range list).
    pub thread_id: usize,
    /// Start instant of the whole firehose run; `firehose` gives every thread the same value.
    pub start_time: std::time::Instant,
    /// Initialized to `None`; presumably set when the thread finishes (TODO confirm).
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being worked on; `start` is advanced to each epoch's first
    /// processed slot as the thread moves between epochs.
    pub slot_range: Range<u64>,
    /// Slot range originally assigned to this thread.
    pub initial_slot_range: Range<u64>,
    /// Slot the thread is currently at; returned alongside stats-handler errors.
    pub current_slot: u64,
    /// Slots handled by this thread (update sites not visible here — TODO confirm rules).
    pub slots_processed: u64,
    /// Blocks handled by this thread (update sites not visible here).
    pub blocks_processed: u64,
    /// Slots this thread counted as leader-skipped (update sites not visible here).
    pub leader_skipped_slots: u64,
    /// Transactions handled by this thread (update sites not visible here).
    pub transactions_processed: u64,
    /// Entries handled by this thread (update sites not visible here).
    pub entries_processed: u64,
    /// Rewards handled by this thread (update sites not visible here).
    pub rewards_processed: u64,
}
280
/// Snapshot passed to the stats handler by `maybe_emit_stats`.
///
/// The `*_processed` totals (except `rewards_processed`) are read from counters shared by
/// all firehose threads. The `*_since_last_pulse` values are per-thread deltas.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Copy of the emitting thread's [`ThreadStats`].
    pub thread_stats: ThreadStats,
    /// Same as `thread_stats.start_time`.
    pub start_time: std::time::Instant,
    /// Same as `thread_stats.finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Same as `thread_stats.slot_range`.
    pub slot_range: Range<u64>,
    /// Slots processed across all threads.
    pub slots_processed: u64,
    /// Blocks processed across all threads.
    pub blocks_processed: u64,
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Transactions processed across all threads.
    pub transactions_processed: u64,
    /// Entries processed across all threads.
    pub entries_processed: u64,
    /// Rewards processed by the emitting thread only (taken from `thread_stats`, not a
    /// cross-thread total like the fields above).
    pub rewards_processed: u64,
    /// Transactions counted by this thread since its previous successful emission.
    pub transactions_since_last_pulse: u64,
    /// Blocks counted by this thread since its previous successful emission.
    pub blocks_since_last_pulse: u64,
    /// Slots counted by this thread since its previous successful emission.
    pub slots_since_last_pulse: u64,
    /// Time since this thread's previous emission; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
313
/// Stats-reporting configuration for [`firehose`].
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler called by `maybe_emit_stats` with the thread index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Reporting interval expressed in slots (consumed outside the visible code — TODO
    /// confirm exact triggering rule).
    pub tracking_interval_slots: u64,
}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326 stats_tracking: Option<&StatsTracking<OnStats>>,
327 thread_index: usize,
328 thread_stats: &ThreadStats,
329 overall_slots_processed: &AtomicU64,
330 overall_blocks_processed: &AtomicU64,
331 overall_transactions_processed: &AtomicU64,
332 overall_entries_processed: &AtomicU64,
333 transactions_since_stats: &AtomicU64,
334 blocks_since_stats: &AtomicU64,
335 slots_since_stats: &AtomicU64,
336 last_pulse: &Arc<AtomicU64>,
337 base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339 if let Some(stats_tracker) = stats_tracking {
340 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344 let now_nanos = base_instant.elapsed().as_nanos() as u64;
345 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346 let delta_nanos = now_nanos.saturating_sub(previous);
347 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352 let stats = Stats {
353 thread_stats: thread_stats.clone(),
354 start_time: thread_stats.start_time,
355 finish_time: thread_stats.finish_time,
356 slot_range: thread_stats.slot_range.clone(),
357 slots_processed: total_slots,
358 blocks_processed: total_blocks,
359 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360 transactions_processed: total_transactions,
361 entries_processed: total_entries,
362 rewards_processed: thread_stats.rewards_processed,
363 transactions_since_last_pulse: processed_transactions,
364 blocks_since_last_pulse: processed_blocks,
365 slots_since_last_pulse: processed_slots,
366 time_since_last_pulse,
367 };
368
369 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370 last_pulse.store(previous, Ordering::Relaxed);
371 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374 return Err((
375 FirehoseError::OnStatsHandlerError(e),
376 thread_stats.current_slot,
377 ));
378 }
379 }
380 Ok(())
381}
382
/// Adds `value` to `atomic` (relaxed ordering), but only when `tracking_enabled` is set.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
389
390fn clear_pending_skip(
391 map: &DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
392 thread_id: usize,
393 slot: u64,
394) -> bool {
395 map.get(&thread_id)
396 .map(|set| set.remove(&slot).is_some())
397 .unwrap_or(false)
398}
399
400fn decode_transaction_status_meta_from_frame(
401 slot: u64,
402 reassembled_metadata: Vec<u8>,
403) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
404 if reassembled_metadata.is_empty() {
405 return Ok(solana_transaction_status::TransactionStatusMeta::default());
407 }
408
409 match utils::decompress_zstd(reassembled_metadata.as_slice()) {
410 Ok(decompressed) => {
411 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
412 Box::new(std::io::Error::other(format!(
413 "decode transaction metadata (slot {slot}): {err}"
414 ))) as SharedError
415 })
416 }
417 Err(decomp_err) => {
418 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
421 Box::new(std::io::Error::other(format!(
422 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
423 ))) as SharedError
424 })
425 }
426 }
427}
428
/// Rewards decoded from a block's rewards frame.
#[derive(Debug, Default)]
struct DecodedRewards {
    /// Rewards keyed by recipient address, as produced by `convert_proto_rewards`.
    keyed_rewards: Vec<(Address, RewardInfo)>,
    /// Partition count from the protobuf `num_partitions` field, if present.
    num_partitions: Option<u64>,
}
434
435impl DecodedRewards {
436 fn empty() -> Self {
437 Self {
438 keyed_rewards: Vec::new(),
439 num_partitions: None,
440 }
441 }
442}
443
444fn decode_rewards_from_frame(
445 slot: u64,
446 reassembled_rewards: Vec<u8>,
447) -> Result<DecodedRewards, SharedError> {
448 if reassembled_rewards.is_empty() {
449 return Ok(DecodedRewards::empty());
451 }
452
453 match utils::decompress_zstd(reassembled_rewards.as_slice()) {
454 Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
455 |err| {
456 Box::new(std::io::Error::other(format!(
457 "decode rewards (slot {slot}): {err}"
458 ))) as SharedError
459 },
460 ),
461 Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
462 |err| {
463 Box::new(std::io::Error::other(format!(
464 "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
465 ))) as SharedError
466 },
467 ),
468 }
469}
470
471fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
472 let epoch = slot_to_epoch(slot);
473 let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
474 prost_011::Message::decode(bytes);
475 match proto_attempt {
476 Ok(proto) => {
477 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
478 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
479 Box::new(std::io::Error::other(format!(
480 "convert rewards proto failed (epoch {epoch}): {err}"
481 ))) as SharedError
482 })?;
483 Ok(DecodedRewards {
484 keyed_rewards,
485 num_partitions,
486 })
487 }
488 Err(proto_err) => {
489 let stored: solana_storage_proto::StoredExtendedRewards =
490 bincode::deserialize(bytes).map_err(|bin_err| {
491 Box::new(std::io::Error::other(format!(
492 "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
493 ))) as SharedError
494 })?;
495 let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
496 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
497 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
498 Box::new(std::io::Error::other(format!(
499 "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
500 ))) as SharedError
501 })?;
502 Ok(DecodedRewards {
503 keyed_rewards,
504 num_partitions,
505 })
506 }
507 }
508}
509
510fn decode_transaction_status_meta(
511 slot: u64,
512 metadata_bytes: &[u8],
513) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
514 let epoch = slot_to_epoch(slot);
515 let mut bincode_err: Option<String> = None;
516 if epoch < BINCODE_EPOCH_CUTOFF {
517 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
518 metadata_bytes,
519 ) {
520 Ok(stored) => return Ok(stored.into()),
521 Err(err) => {
522 bincode_err = Some(err.to_string());
523 }
524 }
525 }
526
527 let bin_err_for_proto = bincode_err.clone();
528 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
529 prost_011::Message::decode(metadata_bytes).map_err(|err| {
530 if let Some(ref bin_err) = bin_err_for_proto {
532 Box::new(std::io::Error::other(format!(
533 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
534 ))) as SharedError
535 } else {
536 Box::new(std::io::Error::other(format!(
537 "protobuf decode transaction metadata: {err}"
538 ))) as SharedError
539 }
540 })?;
541
542 proto.try_into().map_err(|err| {
543 if let Some(ref bin_err) = bincode_err {
544 Box::new(std::io::Error::other(format!(
545 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
546 ))) as SharedError
547 } else {
548 Box::new(std::io::Error::other(format!(
549 "convert transaction metadata proto: {err}"
550 ))) as SharedError
551 }
552 })
553}
554
#[cfg(test)]
mod metadata_decode_tests {
    use super::{
        BINCODE_EPOCH_CUTOFF, decode_transaction_status_meta,
        decode_transaction_status_meta_from_frame, epoch_to_slot_range,
    };
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// First slot of the first epoch that is decoded as protobuf only.
    fn first_protobuf_only_slot() -> u64 {
        epoch_to_slot_range(BINCODE_EPOCH_CUTOFF).0
    }

    /// First slot of the last epoch for which bincode is attempted before protobuf.
    fn last_bincode_first_slot() -> u64 {
        epoch_to_slot_range(BINCODE_EPOCH_CUTOFF - 1).0
    }

    /// First slot of an arbitrary early (bincode-first) epoch.
    fn early_epoch_slot() -> u64 {
        epoch_to_slot_range(100).0
    }

    /// A populated metadata value used for protobuf round-trips.
    fn sample_meta() -> TransactionStatusMeta {
        TransactionStatusMeta {
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            compute_units_consumed: Some(7),
            cost_units: Some(9),
            loaded_addresses: LoadedAddresses::default(),
            ..TransactionStatusMeta::default()
        }
    }

    /// The bincode-stored counterpart of `sample_meta`.
    fn sample_stored() -> StoredTransactionStatusMeta {
        StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        }
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = sample_stored();
        let bytes = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_bincode_metadata_in_last_bincode_first_epoch() {
        let stored = sample_stored();
        let bytes = bincode::serialize(&stored).expect("bincode serialize");
        let decoded =
            decode_transaction_status_meta(last_bincode_first_slot(), &bytes).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        let decoded =
            decode_transaction_status_meta(first_protobuf_only_slot(), &bytes).expect("decode");
        assert_eq!(decoded, meta);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        let decoded = decode_transaction_status_meta(early_epoch_slot(), &bytes).expect("decode");
        assert_eq!(decoded, meta);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let raw_bytes = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
647
#[cfg(test)]
mod rewards_decode_tests {
    use super::{decode_rewards_from_bytes, decode_rewards_from_frame};
    use solana_sdk_ids::vote::id as vote_program_id;
    use solana_storage_proto::StoredExtendedRewards;
    use solana_transaction_status::{Reward, RewardType};

    /// Bincode-serialized `StoredExtendedRewards` holding a single rent reward.
    fn sample_bincode_rewards_bytes() -> Vec<u8> {
        let reward = Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 7,
            post_balance: 9,
            reward_type: Some(RewardType::Rent),
            commission: Some(3),
        };
        let stored_rewards: StoredExtendedRewards = vec![reward.into()];
        bincode::serialize(&stored_rewards).expect("bincode serialize")
    }

    #[test]
    fn decodes_protobuf_rewards() {
        let pubkey = vote_program_id().to_string();
        let proto = solana_storage_proto::convert::generated::Rewards {
            rewards: vec![solana_storage_proto::convert::generated::Reward {
                pubkey,
                lamports: 5,
                post_balance: 10,
                reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
                commission: "1".to_string(),
            }],
            num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
                num_partitions: 2,
            }),
        };
        let bytes = prost_011::Message::encode_to_vec(&proto);
        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode proto rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, Some(2));
    }

    #[test]
    fn decodes_bincode_rewards() {
        let bytes = sample_bincode_rewards_bytes();
        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode bincode rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, None);
    }

    #[test]
    fn empty_rewards_frame_decodes_to_empty() {
        let decoded = decode_rewards_from_frame(0, Vec::new()).expect("decode empty frame");
        assert!(decoded.keyed_rewards.is_empty());
        assert_eq!(decoded.num_partitions, None);
    }

    #[test]
    fn raw_bincode_rewards_frame_without_zstd_still_decodes() {
        let decoded = decode_rewards_from_frame(0, sample_bincode_rewards_bytes())
            .expect("decode fallback");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, None);
    }
}
693
/// Per-transaction payload delivered to transaction handlers (`Handler<TransactionData>`).
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot this transaction belongs to.
    pub slot: u64,
    /// Position of the transaction within its slot (presumably block order — TODO confirm).
    pub transaction_slot_index: usize,
    /// Transaction signature (which signature is chosen is not visible here — TODO confirm).
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Whether the transaction was classified as a vote (rule not visible here — TODO confirm).
    pub is_vote: bool,
    /// Decoded status metadata. Presumably produced by
    /// `decode_transaction_status_meta_from_frame`, which yields the default value for an
    /// empty frame — TODO confirm.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The transaction itself.
    pub transaction: VersionedTransaction,
}
712
/// Per-entry payload delivered to entry handlers (`Handler<EntryData>`).
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot this entry belongs to.
    pub slot: u64,
    /// Index of the entry within its slot.
    pub entry_index: usize,
    /// Indexes of the transactions covered by this entry. Presumably in the same index
    /// space as `TransactionData::transaction_slot_index` — TODO confirm.
    pub transaction_indexes: Range<usize>,
    /// Number of hashes recorded for this entry.
    pub num_hashes: u64,
    /// The entry's hash.
    pub hash: Hash,
}
727
/// Per-block rewards payload delivered to rewards handlers (`Handler<RewardsData>`).
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Rewards keyed by recipient address (same shape as `DecodedRewards::keyed_rewards`).
    pub rewards: Vec<(Address, RewardInfo)>,
}
736
/// Per-slot payload delivered to block handlers (`Handler<BlockData>`).
#[derive(Debug)]
pub enum BlockData {
    /// A block that was present in the stream.
    Block {
        /// Slot of the parent block.
        parent_slot: u64,
        /// Blockhash of the parent block.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Blockhash of this block.
        blockhash: Hash,
        /// Block rewards with their partition count.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time, if recorded.
        block_time: Option<i64>,
        /// Block height, if recorded.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries in the block.
        entry_count: u64,
    },
    /// A slot for which no block was delivered; per the name, the leader possibly skipped it.
    PossibleLeaderSkipped {
        /// The slot with no block.
        slot: u64,
    },
}
768
769impl BlockData {
770 #[inline(always)]
772 pub const fn slot(&self) -> u64 {
773 match self {
774 BlockData::Block { slot, .. } => *slot,
775 BlockData::PossibleLeaderSkipped { slot } => *slot,
776 }
777 }
778
779 #[inline(always)]
781 pub const fn was_skipped(&self) -> bool {
782 matches!(self, BlockData::PossibleLeaderSkipped { .. })
783 }
784
785 #[inline(always)]
787 pub const fn block_time(&self) -> Option<i64> {
788 match self {
789 BlockData::Block { block_time, .. } => *block_time,
790 BlockData::PossibleLeaderSkipped { .. } => None,
791 }
792 }
793}
794
/// Result returned by every handler callback.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future produced by one handler invocation.
type HandlerFuture = BoxFuture<'static, HandlerResult>;

/// A cloneable, thread-safe callback taking the firehose thread index and a payload.
///
/// Blanket-implemented below for every matching `Fn`, so closures and function pointers
/// qualify automatically.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}

/// Function-pointer form of a [`Handler`].
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer block handler.
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer transaction handler.
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer entry handler.
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer rewards handler.
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Stats tracking configuration with a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Function-pointer error handler.
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Same type as `StatsTracker`: both alias `StatsTracking<HandlerFn<Stats>>`.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
822
/// Context passed to the `on_error` handler of [`firehose`] (`Handler<FirehoseErrorContext>`).
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the firehose thread that hit the error.
    pub thread_id: usize,
    /// Slot associated with the error.
    pub slot: u64,
    /// Epoch associated with the error.
    pub epoch: u64,
    /// Rendered error text (presumably the `Display` form of the `FirehoseError` — TODO confirm).
    pub error_message: String,
}
835
836#[inline]
848#[allow(clippy::too_many_arguments)]
849pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
850 threads: u64,
851 sequential: bool,
852 buffer_window_bytes: Option<u64>,
853 slot_range: Range<u64>,
854 on_block: Option<OnBlock>,
855 on_tx: Option<OnTransaction>,
856 on_entry: Option<OnEntry>,
857 on_rewards: Option<OnRewards>,
858 on_error: Option<OnError>,
859 stats_tracking: Option<StatsTracking<OnStats>>,
860 shutdown_signal: Option<broadcast::Receiver<()>>,
861) -> Result<(), (FirehoseError, u64)>
862where
863 OnBlock: Handler<BlockData>,
864 OnTransaction: Handler<TransactionData>,
865 OnEntry: Handler<EntryData>,
866 OnRewards: Handler<RewardsData>,
867 OnStats: Handler<Stats>,
868 OnError: Handler<FirehoseErrorContext>,
869{
870 if threads == 0 {
871 return Err((
872 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
873 slot_range.start,
874 ));
875 }
876 let client = crate::network::create_http_client();
877 log::info!(target: LOG_MODULE, "starting firehose...");
878 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
879 let firehose_threads = if sequential { 1 } else { threads };
880 let sequential_download_threads = std::cmp::max(1, threads as usize);
881 let sequential_buffer_window_bytes = buffer_window_bytes
882 .filter(|value| *value >= 2)
883 .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
884 if sequential {
885 log::info!(
886 target: LOG_MODULE,
887 "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
888 sequential_download_threads,
889 crate::system::format_byte_size(sequential_buffer_window_bytes)
890 );
891 }
892
893 let slot_range = Arc::new(slot_range);
894
895 let subranges = generate_subranges(&slot_range, firehose_threads);
897 if firehose_threads > 1 {
898 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
899 }
900
901 let firehose_start = std::time::Instant::now();
902 let shutdown_flag = Arc::new(AtomicBool::new(false));
903 if let Some(ref rx) = shutdown_signal {
904 let mut rx = rx.resubscribe();
905 let flag = shutdown_flag.clone();
906 tokio::spawn(async move {
907 if rx.recv().await.is_ok() {
908 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
909 flag.store(true, Ordering::SeqCst);
910 }
911 });
912 }
913
914 let shared_ripget_client: Option<ripget::Client> = if sequential {
916 Some(
917 ripget::build_client(Some(&format!(
918 "jetstreamer-firehose/{}",
919 env!("CARGO_PKG_VERSION")
920 )))
921 .expect("failed to build ripget HTTP client"),
922 )
923 } else {
924 None
925 };
926
927 let mut handles = Vec::new();
928 let error_counts: Arc<Vec<AtomicU32>> =
930 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
931
932 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
933 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
934 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
935 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
936 let pending_skipped_slots: Arc<
937 DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
938 > = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
939
940 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
941 let error_counts = error_counts.clone();
942 let client = client.clone();
943 let on_block = on_block.clone();
944 let on_tx = on_tx.clone();
945 let on_entry = on_entry.clone();
946 let on_reward = on_rewards.clone();
947 let on_error = on_error.clone();
948 let overall_slots_processed = overall_slots_processed.clone();
949 let overall_blocks_processed = overall_blocks_processed.clone();
950 let overall_transactions_processed = overall_transactions_processed.clone();
951 let overall_entries_processed = overall_entries_processed.clone();
952 let stats_tracking = stats_tracking.clone();
953 let transactions_since_stats = Arc::new(AtomicU64::new(0));
954 let blocks_since_stats = Arc::new(AtomicU64::new(0));
955 let slots_since_stats = Arc::new(AtomicU64::new(0));
956 let last_pulse = Arc::new(AtomicU64::new(0));
957 let transactions_since_stats_cloned = transactions_since_stats.clone();
958 let blocks_since_stats_cloned = blocks_since_stats.clone();
959 let slots_since_stats_cloned = slots_since_stats.clone();
960 let last_pulse_cloned = last_pulse.clone();
961 let shutdown_flag = shutdown_flag.clone();
962 let pending_skipped_slots = pending_skipped_slots.clone();
963 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
964 let sequential_mode = sequential;
965 let ripget_threads = sequential_download_threads;
966 let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
967 let ripget_client = shared_ripget_client.clone();
968
969 let handle = tokio::spawn(async move {
970 let transactions_since_stats = transactions_since_stats_cloned;
971 let blocks_since_stats = blocks_since_stats_cloned;
972 let slots_since_stats = slots_since_stats_cloned;
973 let last_pulse = last_pulse_cloned;
974 let mut shutdown_rx = thread_shutdown_rx;
975 let start_time = firehose_start;
976 last_pulse.store(
977 firehose_start.elapsed().as_nanos() as u64,
978 Ordering::Relaxed,
979 );
980 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
981 let mut skip_until_index = None;
982 let last_emitted_slot = slot_range.start.saturating_sub(1);
983 let block_enabled = on_block.is_some();
984 let tx_enabled = on_tx.is_some();
985 let entry_enabled = on_entry.is_some();
986 let reward_enabled = on_reward.is_some();
987 let tracking_enabled = stats_tracking.is_some();
988 if block_enabled {
989 pending_skipped_slots
990 .entry(thread_index)
991 .or_insert_with(|| DashSet::with_hasher(ahash::RandomState::new()));
992 }
993 let mut last_counted_slot = slot_range.start.saturating_sub(1);
994 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
995 let mut thread_stats = if tracking_enabled {
996 Some(ThreadStats {
997 thread_id: thread_index,
998 start_time,
999 finish_time: None,
1000 slot_range: slot_range.clone(),
1001 initial_slot_range: slot_range.clone(),
1002 current_slot: slot_range.start,
1003 slots_processed: 0,
1004 blocks_processed: 0,
1005 leader_skipped_slots: 0,
1006 transactions_processed: 0,
1007 entries_processed: 0,
1008 rewards_processed: 0,
1009 })
1010 } else {
1011 None
1012 };
1013
1014 while let Err((err, slot)) = async {
1016 let mut last_emitted_slot = last_emitted_slot_global;
1017 let op_timeout = if sequential_mode {
1018 OP_TIMEOUT_SEQUENTIAL
1019 } else {
1020 OP_TIMEOUT
1021 };
1022 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1023 log::info!(
1024 target: &log_target,
1025 "shutdown requested; terminating firehose thread {}",
1026 thread_index
1027 );
1028 return Ok(());
1029 }
1030 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1031 log::info!(
1032 target: &log_target,
1033 "slot range: {} (epoch {}) ... {} (epoch {})",
1034 slot_range.start,
1035 slot_to_epoch(slot_range.start),
1036 slot_range.end,
1037 slot_to_epoch(slot_range.end)
1038 );
1039
1040 log::info!(target: &log_target, "🚒 starting firehose...");
1041
1042 let mut current_slot: Option<u64> = None;
1044 for epoch_num in epoch_range.clone() {
1045 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1046 log::info!(
1047 target: &log_target,
1048 "shutdown requested; terminating firehose thread {}",
1049 thread_index
1050 );
1051 return Ok(());
1052 }
1053 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1054 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1055 let local_start = std::cmp::max(slot_range.start, epoch_start);
1056 let local_end_inclusive =
1057 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1058 if local_start > local_end_inclusive {
1059 log::debug!(
1060 target: &log_target,
1061 "epoch {} has no overlap with thread range ({}..{}), skipping",
1062 epoch_num,
1063 slot_range.start,
1064 slot_range.end
1065 );
1066 continue;
1067 }
1068 let use_sequential_stream = sequential_mode && local_start == epoch_start;
1069 let stream = match timeout(op_timeout, async {
1070 if use_sequential_stream {
1071 fetch_epoch_stream_with_options(
1072 epoch_num,
1073 &client,
1074 Some(FetchEpochStreamOptions {
1075 sequential: true,
1076 ripget_threads,
1077 buffer_window_bytes: ripget_buffer_window_bytes,
1078 ripget_client: ripget_client.clone(),
1079 }),
1080 )
1081 .await
1082 } else {
1083 fetch_epoch_stream(epoch_num, &client).await
1084 }
1085 })
1086 .await
1087 {
1088 Ok(stream) => stream,
1089 Err(_) => {
1090 return Err((
1091 FirehoseError::OperationTimeout("fetch_epoch_stream"),
1092 current_slot.unwrap_or(slot_range.start),
1093 ));
1094 }
1095 };
1096 let mut reader = NodeReader::new(stream);
1097
1098 let header_fut = reader.read_raw_header();
1099 let header = match timeout(op_timeout, header_fut).await {
1100 Ok(res) => res
1101 .map_err(FirehoseError::ReadHeader)
1102 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1103 Err(_) => {
1104 return Err((
1105 FirehoseError::OperationTimeout("read_raw_header"),
1106 current_slot.unwrap_or(slot_range.start),
1107 ));
1108 }
1109 };
1110 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1111
1112 let mut previous_blockhash = Hash::default();
1113 let mut latest_entry_blockhash = Hash::default();
1114 last_counted_slot = local_start.saturating_sub(1);
1117 current_slot = None;
1118 if tracking_enabled
1119 && let Some(ref mut stats) = thread_stats {
1120 stats.current_slot = local_start;
1121 stats.slot_range.start = local_start;
1122 }
1123
1124 if local_start > epoch_start {
1125 let seek_slot = match timeout(
1128 OP_TIMEOUT,
1129 find_previous_indexed_slot(local_start, epoch_start, &log_target),
1130 )
1131 .await
1132 {
1133 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1134 Err(_) => {
1135 return Err((
1136 FirehoseError::OperationTimeout(
1137 "seek_to_previous_indexed_slot",
1138 ),
1139 current_slot.unwrap_or(slot_range.start),
1140 ));
1141 }
1142 };
1143 if let Some(seek_slot) = seek_slot {
1144 let seek_fut = reader.seek_to_slot(seek_slot);
1145 match timeout(op_timeout, seek_fut).await {
1146 Ok(res) => {
1147 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1148 }
1149 Err(_) => {
1150 return Err((
1151 FirehoseError::OperationTimeout("seek_to_slot"),
1152 current_slot.unwrap_or(slot_range.start),
1153 ));
1154 }
1155 }
1156 }
1157 }
1158
1159 let mut item_index = 0;
1161 let mut displayed_skip_message = false;
1162 loop {
1163 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1164 log::info!(
1165 target: &log_target,
1166 "shutdown requested; terminating firehose thread {}",
1167 thread_index
1168 );
1169 return Ok(());
1170 }
1171 let read_fut = reader.read_until_block();
1172 let nodes = match timeout(op_timeout, read_fut).await {
1173 Ok(result) => result
1174 .map_err(FirehoseError::ReadUntilBlockError)
1175 .map_err(|e| {
1176 (
1177 e,
1178 current_slot
1179 .map(|slot| slot.saturating_add(1))
1180 .unwrap_or(slot_range.start),
1181 )
1182 })?,
1183 Err(_) => {
1184 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1185 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1186 }
1187 };
1188 if nodes.is_empty() {
1189 log::info!(
1190 target: &log_target,
1191 "reached end of epoch {}",
1192 epoch_num
1193 );
1194 break;
1195 }
1196 if let Some(last_node) = nodes.0.last()
1197 && !last_node.get_node().is_block()
1198 {
1199 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1200 break;
1201 }
1202 let block = nodes
1203 .get_block()
1204 .map_err(FirehoseError::GetBlockError)
1205 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1206 log::debug!(
1207 target: &log_target,
1208 "read {} items from epoch {}, now at slot {}",
1209 item_index,
1210 epoch_num,
1211 block.slot
1212 );
1213 let slot = block.slot;
1214 if slot > local_end_inclusive {
1215 log::debug!(
1216 target: &log_target,
1217 "reached end of local slice at slot {} (epoch {}), stopping",
1218 slot,
1219 epoch_num
1220 );
1221 break;
1222 }
1223 if slot >= slot_range.end {
1224 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1225 if block_enabled {
1230 pending_skipped_slots.remove(&thread_index);
1231 }
1232 return Ok(());
1233 }
1234 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1235 if slot < slot_range.start {
1236 if slot.saturating_add(1) == slot_range.start {
1237 log::debug!(
1238 target: &log_target,
1239 "priming reader with preceding slot {}, skipping",
1240 slot
1241 );
1242 } else {
1243 log::warn!(
1244 target: &log_target,
1245 "encountered slot {} before start of range {}, skipping",
1246 slot,
1247 slot_range.start
1248 );
1249 }
1250 continue;
1251 }
1252 current_slot = Some(slot);
1253 let mut entry_index: usize = 0;
1254 let mut this_block_executed_transaction_count: u64 = 0;
1255 let mut this_block_entry_count: u64 = 0;
1256 let mut this_block_rewards = DecodedRewards::empty();
1257
1258 for node_with_cid in &nodes.0 {
1259 item_index += 1;
1260 if let Some(skip) = skip_until_index {
1261 if item_index < skip {
1262 if !displayed_skip_message {
1263 log::info!(
1264 target: &log_target,
1265 "skipping until index {} (at {})",
1266 skip,
1267 item_index
1268 );
1269 displayed_skip_message = true;
1270 }
1271 continue;
1272 } else {
1273 log::info!(
1274 target: &log_target,
1275 "reached target index {}, resuming...",
1276 skip
1277 );
1278 skip_until_index = None;
1279 }
1280 }
1281 let node = node_with_cid.get_node();
1282
1283 if let Some(ref mut stats) = thread_stats {
1284 stats.current_slot = slot;
1285 }
1286
1287 let error_slot = current_slot.unwrap_or(slot_range.start);
1288
1289 use crate::node::Node::*;
1290 match node {
1291 Transaction(tx) => {
1292 if tx_enabled
1293 && let Some(on_tx_cb) = on_tx.as_ref()
1294 {
1295 let error_slot = current_slot.unwrap_or(slot_range.start);
1296 let versioned_tx = tx.as_parsed().map_err(|err| {
1297 (
1298 FirehoseError::NodeDecodingError(item_index, err),
1299 error_slot,
1300 )
1301 })?;
1302 let reassembled_metadata = nodes
1303 .reassemble_dataframes(&tx.metadata)
1304 .map_err(|err| {
1305 (
1306 FirehoseError::NodeDecodingError(item_index, err),
1307 error_slot,
1308 )
1309 })?;
1310
1311 let as_native_metadata = decode_transaction_status_meta_from_frame(
1312 block.slot,
1313 reassembled_metadata,
1314 )
1315 .map_err(|err| {
1316 (
1317 FirehoseError::NodeDecodingError(item_index, err),
1318 error_slot,
1319 )
1320 })?;
1321
1322 let message_hash = {
1323 #[cfg(feature = "verify-transaction-signatures")]
1324 {
1325 versioned_tx.verify_and_hash_message().map_err(|err| {
1326 (
1327 FirehoseError::TransactionHandlerError(Box::new(err)),
1328 error_slot,
1329 )
1330 })?
1331 }
1332 #[cfg(not(feature = "verify-transaction-signatures"))]
1333 {
1334 versioned_tx.message.hash()
1335 }
1336 };
1337 let signature = versioned_tx
1338 .signatures
1339 .first()
1340 .ok_or_else(|| {
1341 Box::new(std::io::Error::new(
1342 std::io::ErrorKind::InvalidData,
1343 "transaction missing signature",
1344 )) as SharedError
1345 })
1346 .map_err(|err| {
1347 (
1348 FirehoseError::NodeDecodingError(
1349 item_index,
1350 err,
1351 ),
1352 error_slot,
1353 )
1354 })?;
1355 let is_vote = is_simple_vote_transaction(&versioned_tx);
1356
1357 on_tx_cb(
1358 thread_index,
1359 TransactionData {
1360 slot: block.slot,
1361 transaction_slot_index: tx.index.unwrap() as usize,
1362 signature: *signature,
1363 message_hash,
1364 is_vote,
1365 transaction_status_meta: as_native_metadata,
1366 transaction: versioned_tx,
1367 },
1368 )
1369 .await
1370 .map_err(|e| {
1371 (
1372 FirehoseError::TransactionHandlerError(e),
1373 error_slot,
1374 )
1375 })?;
1376 }
1377 fetch_add_if(
1378 tracking_enabled,
1379 &overall_transactions_processed,
1380 1,
1381 );
1382 if let Some(ref mut stats) = thread_stats {
1383 stats.transactions_processed += 1;
1384 }
1385 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1386 }
1387 Entry(entry) => {
1388 let entry_hash = Hash::from(entry.hash.to_bytes());
1389 let entry_transaction_count = entry.transactions.len();
1390 let entry_transaction_count_u64 = entry_transaction_count as u64;
1391 let starting_transaction_index_u64 =
1392 this_block_executed_transaction_count;
1393 latest_entry_blockhash = entry_hash;
1394 this_block_executed_transaction_count += entry_transaction_count_u64;
1395 this_block_entry_count += 1;
1396
1397 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1398 let starting_transaction_index = usize::try_from(
1399 starting_transaction_index_u64,
1400 )
1401 .map_err(|err| {
1402 (
1403 FirehoseError::EntryHandlerError(Box::new(err)),
1404 error_slot,
1405 )
1406 })?;
1407 let transaction_indexes_end =
1408 starting_transaction_index + entry_transaction_count;
1409 on_entry_cb(
1410 thread_index,
1411 EntryData {
1412 slot: block.slot,
1413 entry_index,
1414 transaction_indexes: starting_transaction_index
1415 ..transaction_indexes_end,
1416 num_hashes: entry.num_hashes,
1417 hash: entry_hash,
1418 },
1419 )
1420 .await
1421 .map_err(|e| {
1422 (
1423 FirehoseError::EntryHandlerError(e),
1424 error_slot,
1425 )
1426 })?;
1427 }
1428 entry_index += 1;
1429 fetch_add_if(
1430 tracking_enabled,
1431 &overall_entries_processed,
1432 1,
1433 );
1434 if let Some(ref mut stats) = thread_stats {
1435 stats.entries_processed += 1;
1436 }
1437 }
1438 Block(block) => {
1439 let prev_last_counted_slot = last_counted_slot;
1440 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1441 (
1442 stats.slots_processed,
1443 stats.blocks_processed,
1444 stats.leader_skipped_slots,
1445 stats.current_slot,
1446 )
1447 });
1448
1449 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1450 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1451 let skip_start = skip_start_from_previous.max(next_expected_slot);
1452
1453 let skipped_epoch = slot_to_epoch(last_counted_slot);
1454 for skipped_slot in skip_start..slot {
1455 if slot_to_epoch(skipped_slot) != skipped_epoch {
1456 break;
1457 }
1458 log::debug!(
1459 target: &log_target,
1460 "leader skipped slot {} (prev_counted {}, current slot {})",
1461 skipped_slot,
1462 prev_last_counted_slot,
1463 slot,
1464 );
1465 if block_enabled {
1466 pending_skipped_slots
1467 .entry(thread_index)
1468 .or_default()
1469 .insert(skipped_slot);
1470 }
1471 if block_enabled
1472 && let Some(on_block_cb) = on_block.as_ref()
1473 && skipped_slot > last_emitted_slot {
1474 last_emitted_slot = skipped_slot;
1475 on_block_cb(
1476 thread_index,
1477 BlockData::PossibleLeaderSkipped {
1478 slot: skipped_slot,
1479 },
1480 )
1481 .await
1482 .map_err(|e| {
1483 (
1484 FirehoseError::BlockHandlerError(e),
1485 error_slot,
1486 )
1487 })?;
1488 }
1489 if tracking_enabled {
1490 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1491 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1492 if let Some(ref mut stats) = thread_stats {
1493 stats.leader_skipped_slots += 1;
1494 stats.slots_processed += 1;
1495 stats.current_slot = skipped_slot;
1496 }
1497 }
1498 last_counted_slot = skipped_slot;
1499 }
1500
1501 let cleared_pending_skip = if block_enabled {
1502 clear_pending_skip(
1503 &pending_skipped_slots,
1504 thread_index,
1505 slot,
1506 )
1507 } else {
1508 false
1509 };
1510
1511 if slot <= last_counted_slot && !cleared_pending_skip {
1512 log::debug!(
1513 target: &log_target,
1514 "duplicate block {}, already counted (last_counted={})",
1515 slot,
1516 last_counted_slot,
1517 );
1518 this_block_rewards = DecodedRewards::empty();
1519 continue;
1520 }
1521
1522 if block_enabled {
1523 if let Some(on_block_cb) = on_block.as_ref() {
1524 let DecodedRewards {
1525 keyed_rewards,
1526 num_partitions,
1527 } = std::mem::take(&mut this_block_rewards);
1528 if slot > last_emitted_slot {
1529 last_emitted_slot = slot;
1530 on_block_cb(
1531 thread_index,
1532 BlockData::Block {
1533 parent_slot: block.meta.parent_slot,
1534 parent_blockhash: previous_blockhash,
1535 slot: block.slot,
1536 blockhash: latest_entry_blockhash,
1537 rewards: KeyedRewardsAndNumPartitions {
1538 keyed_rewards,
1539 num_partitions,
1540 },
1541 block_time: Some(block.meta.blocktime as i64),
1542 block_height: block.meta.block_height,
1543 executed_transaction_count:
1544 this_block_executed_transaction_count,
1545 entry_count: this_block_entry_count,
1546 },
1547 )
1548 .await
1549 .map_err(|e| {
1550 (
1551 FirehoseError::BlockHandlerError(e),
1552 error_slot,
1553 )
1554 })?;
1555 }
1556 }
1557 } else {
1558 this_block_rewards = DecodedRewards::empty();
1559 }
1560 previous_blockhash = latest_entry_blockhash;
1561
1562 if tracking_enabled {
1563 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1564 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1565 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1566 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1567 if let Some(ref mut stats) = thread_stats {
1568 stats.blocks_processed += 1;
1569 stats.slots_processed += 1;
1570 stats.current_slot = slot;
1571 }
1572
1573 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1574 (&stats_tracking, thread_stats.as_mut())
1575 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1576 && let Err(err) = maybe_emit_stats(
1577 stats_tracking.as_ref(),
1578 thread_index,
1579 thread_stats_ref,
1580 &overall_slots_processed,
1581 &overall_blocks_processed,
1582 &overall_transactions_processed,
1583 &overall_entries_processed,
1584 &transactions_since_stats,
1585 &blocks_since_stats,
1586 &slots_since_stats,
1587 &last_pulse,
1588 start_time,
1589 )
1590 .await
1591 {
1592 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1593 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1594 overall_blocks_processed
1595 .fetch_sub(1, Ordering::Relaxed);
1596 overall_slots_processed
1597 .fetch_sub(1, Ordering::Relaxed);
1598 if let Some((
1599 prev_slots_processed,
1600 prev_blocks_processed,
1601 prev_leader_skipped,
1602 prev_current_slot,
1603 )) = thread_stats_snapshot
1604 {
1605 thread_stats_ref.slots_processed =
1606 prev_slots_processed;
1607 thread_stats_ref.blocks_processed =
1608 prev_blocks_processed;
1609 thread_stats_ref.leader_skipped_slots =
1610 prev_leader_skipped;
1611 thread_stats_ref.current_slot =
1612 prev_current_slot;
1613 }
1614 last_counted_slot = prev_last_counted_slot;
1615 return Err(err);
1616 }
1617 }
1618
1619 if slot > last_counted_slot {
1620 last_counted_slot = slot;
1621 }
1622 }
1623 Subset(_subset) => (),
1624 Epoch(_epoch) => (),
1625 Rewards(rewards) => {
1626 if reward_enabled || block_enabled {
1627 let reassembled = nodes
1628 .reassemble_dataframes(&rewards.data)
1629 .map_err(|err| {
1630 (
1631 FirehoseError::NodeDecodingError(item_index, err),
1632 current_slot.unwrap_or(slot_range.start),
1633 )
1634 })?;
1635 if reassembled.is_empty() {
1636 this_block_rewards = DecodedRewards::empty();
1637 if reward_enabled
1638 && let Some(on_reward_cb) = on_reward.as_ref()
1639 {
1640 on_reward_cb(
1641 thread_index,
1642 RewardsData {
1643 slot: block.slot,
1644 rewards: Vec::new(),
1645 },
1646 )
1647 .await
1648 .map_err(|e| {
1649 (
1650 FirehoseError::RewardHandlerError(e),
1651 error_slot,
1652 )
1653 })?;
1654 }
1655 continue;
1656 }
1657
1658 let decoded_rewards =
1659 decode_rewards_from_frame(block.slot, reassembled)
1660 .map_err(|err| {
1661 (
1662 FirehoseError::NodeDecodingError(
1663 item_index,
1664 err,
1665 ),
1666 error_slot,
1667 )
1668 })?;
1669 if reward_enabled
1670 && let Some(on_reward_cb) = on_reward.as_ref()
1671 {
1672 on_reward_cb(
1673 thread_index,
1674 RewardsData {
1675 slot: block.slot,
1676 rewards: decoded_rewards.keyed_rewards.clone(),
1677 },
1678 )
1679 .await
1680 .map_err(|e| {
1681 (
1682 FirehoseError::RewardHandlerError(e),
1683 error_slot,
1684 )
1685 })?;
1686 }
1687 this_block_rewards = decoded_rewards;
1688 if let Some(ref mut stats) = thread_stats {
1689 stats.rewards_processed +=
1690 this_block_rewards.keyed_rewards.len() as u64;
1691 }
1692 }
1693 }
1694 DataFrame(_data_frame) => (),
1695 }
1696 }
1697 if block.slot == slot_range.end - 1 {
1698 let finish_time = std::time::Instant::now();
1699 let elapsed = finish_time.duration_since(start_time);
1700 log::info!(target: &log_target, "processed slot {}", block.slot);
1701 let elapsed_pretty = human_readable_duration(elapsed);
1702 log::info!(
1703 target: &log_target,
1704 "processed {} slots across {} epochs in {}.",
1705 slot_range.end - slot_range.start,
1706 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1707 elapsed_pretty
1708 );
1709 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1710 let summary: String = error_counts
1713 .iter()
1714 .enumerate()
1715 .filter_map(|(i, c)| {
1716 let v = c.load(Ordering::Relaxed);
1717 if v > 0 {
1718 Some(format!("{:03}({})", i, v))
1719 } else {
1720 None
1721 }
1722 })
1723 .collect::<Vec<_>>()
1724 .join(", ");
1725 if !summary.is_empty() {
1726 log::debug!(target: &log_target, "threads with errors: {}", summary);
1727 }
1728 return Ok(());
1729 }
1730 }
1731 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1732 && last_counted_slot < expected_last_slot
1733 {
1734 }
1737 if let Some(ref mut stats) = thread_stats {
1738 stats.finish_time = Some(std::time::Instant::now());
1739 maybe_emit_stats(
1740 stats_tracking.as_ref(),
1741 thread_index,
1742 stats,
1743 &overall_slots_processed,
1744 &overall_blocks_processed,
1745 &overall_transactions_processed,
1746 &overall_entries_processed,
1747 &transactions_since_stats,
1748 &blocks_since_stats,
1749 &slots_since_stats,
1750 &last_pulse,
1751 start_time,
1752 )
1753 .await?;
1754 }
1755 if block_enabled {
1756 pending_skipped_slots.remove(&thread_index);
1757 }
1758 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1759 }
1760 Ok(())
1761 }
1762 .await
1763 {
1764 if is_shutdown_error(&err) {
1765 log::info!(
1766 target: &log_target,
1767 "shutdown requested; terminating firehose thread {}",
1768 thread_index
1769 );
1770 break;
1771 }
1772 let epoch = slot_to_epoch(slot);
1773 let item_index = match &err {
1774 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1775 _ => 0,
1776 };
1777 let error_message = err.to_string();
1778 log::error!(
1779 target: &log_target,
1780 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1781 slot,
1782 epoch
1783 );
1784 log::error!(target: &log_target, "{}", error_message);
1785 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1786 || error_message.contains("Unknown CID version")
1787 {
1788 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1791 }
1792 if let Some(on_error_cb) = on_error.clone() {
1793 let context = FirehoseErrorContext {
1794 thread_id: thread_index,
1795 slot,
1796 epoch,
1797 error_message: error_message.clone(),
1798 };
1799 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1800 log::error!(
1801 target: &log_target,
1802 "on_error handler failed: {}",
1803 handler_err
1804 );
1805 }
1806 }
1807 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1809 log::warn!(
1810 target: &log_target,
1811 "restarting from slot {} at index {}",
1812 slot,
1813 item_index,
1814 );
1815 if slot <= last_counted_slot {
1819 slot_range.start = last_counted_slot.saturating_add(1);
1820 } else {
1821 slot_range.start = slot;
1822 }
1823 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1825 if tracking_enabled
1826 && let Some(ref mut stats_ref) = thread_stats {
1827 stats_ref.slot_range.start = slot_range.start;
1828 stats_ref.slot_range.end = slot_range.end;
1829 }
1831 if block_enabled {
1832 pending_skipped_slots.remove(&thread_index);
1833 }
1834 skip_until_index = None;
1838 last_emitted_slot_global = last_emitted_slot;
1839 }
1840 });
1841 handles.push(handle);
1842 }
1843
1844 for handle in handles {
1846 handle.await.unwrap();
1847 }
1848 if stats_tracking.is_some() {
1849 let elapsed = firehose_start.elapsed();
1850 let elapsed_secs = elapsed.as_secs_f64();
1851 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1852 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1853 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1854 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1855 let total_errors: u64 = error_counts
1856 .iter()
1857 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1858 .sum();
1859 let overall_tps = if elapsed_secs > 0.0 {
1860 total_transactions as f64 / elapsed_secs
1861 } else {
1862 0.0
1863 };
1864 log::info!(
1865 target: LOG_MODULE,
1866 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1867 elapsed_secs,
1868 total_slots,
1869 total_blocks,
1870 total_leader_skipped,
1871 total_transactions,
1872 overall_tps,
1873 total_errors
1874 );
1875 }
1876 if shutdown_flag.load(Ordering::SeqCst) {
1877 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1878 } else {
1879 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1880 }
1881 Ok(())
1882}
1883
1884#[allow(clippy::result_large_err)]
1885pub fn firehose_geyser(
1892 rt: Arc<tokio::runtime::Runtime>,
1893 slot_range: Range<u64>,
1894 geyser_config_files: Option<&[PathBuf]>,
1895 index_base_url: &Url,
1896 client: &Client,
1897 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1898 threads: u64,
1899) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1900 if threads == 0 {
1901 return Err((
1902 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1903 slot_range.start,
1904 ));
1905 }
1906 log::info!(target: LOG_MODULE, "starting firehose...");
1907 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1908 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1909 let mut entry_notifier_maybe = None;
1910 let mut block_meta_notifier_maybe = None;
1911 let mut transaction_notifier_maybe = None;
1912 if let Some(geyser_config_files) = geyser_config_files {
1913 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1914
1915 let service =
1916 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1917 confirmed_bank_receiver.clone(),
1918 true,
1919 geyser_config_files,
1920 )
1921 .map_err(|e| (e.into(), slot_range.start))?;
1922
1923 transaction_notifier_maybe = Some(
1924 service
1925 .get_transaction_notifier()
1926 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1927 .map_err(|e| (e, slot_range.start))?,
1928 );
1929
1930 entry_notifier_maybe = service.get_entry_notifier();
1931 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1932
1933 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1934 }
1935
1936 if entry_notifier_maybe.is_some() {
1937 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1938 } else {
1939 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1940 }
1941 log::info!(target: LOG_MODULE, "running on_load...");
1942 rt.spawn(on_load);
1943
1944 let slot_range = Arc::new(slot_range);
1945 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1946 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1947 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1948 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1949
1950 let subranges = generate_subranges(&slot_range, threads);
1952 if threads > 1 {
1953 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1954 }
1955
1956 let mut handles = Vec::new();
1957 let error_counts: Arc<Vec<AtomicU32>> =
1959 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1960
1961 for (i, slot_range) in subranges.into_iter().enumerate() {
1962 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1963 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1964 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1965 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1966 let client = client.clone();
1967 let error_counts = error_counts.clone();
1968
1969 let rt_clone = rt.clone();
1970
1971 let handle = std::thread::spawn(move || {
1972 rt_clone.block_on(async {
1973 firehose_geyser_thread(
1974 slot_range,
1975 transaction_notifier_maybe,
1976 entry_notifier_maybe,
1977 block_meta_notifier_maybe,
1978 confirmed_bank_sender,
1979 &client,
1980 if threads > 1 { Some(i) } else { None },
1981 error_counts,
1982 )
1983 .await
1984 .unwrap();
1985 });
1986 });
1987 handles.push(handle);
1988 }
1989
1990 for handle in handles {
1992 handle.join().unwrap();
1993 }
1994 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1995 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1996 block_meta_notifier.notify_block_metadata(
1997 u64::MAX,
1998 "unload",
1999 u64::MAX,
2000 "unload",
2001 &KeyedRewardsAndNumPartitions {
2002 keyed_rewards: vec![],
2003 num_partitions: None,
2004 },
2005 None,
2006 None,
2007 0,
2008 0,
2009 );
2010 }
2011 Ok(confirmed_bank_receiver)
2012}
2013
2014#[allow(clippy::too_many_arguments)]
2015#[allow(clippy::result_large_err)]
2016async fn firehose_geyser_thread(
2017 mut slot_range: Range<u64>,
2018 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2019 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2020 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2021 confirmed_bank_sender: Sender<SlotNotification>,
2022 client: &Client,
2023 thread_index: Option<usize>,
2024 error_counts: Arc<Vec<AtomicU32>>,
2025) -> Result<(), (FirehoseError, u64)> {
2026 let start_time = std::time::Instant::now();
2027 let log_target = if let Some(thread_index) = thread_index {
2028 format!("{}::T{:03}", LOG_MODULE, thread_index)
2029 } else {
2030 LOG_MODULE.to_string()
2031 };
2032 let initial_slot_range = slot_range.clone();
2033 let mut skip_until_index = None;
2034 let mut last_counted_slot = slot_range.start.saturating_sub(1);
2035 while let Err((err, slot)) = async {
2037 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2038 log::info!(
2039 target: &log_target,
2040 "slot range: {} (epoch {}) ... {} (epoch {})",
2041 slot_range.start,
2042 slot_to_epoch(slot_range.start),
2043 slot_range.end,
2044 slot_to_epoch(slot_range.end)
2045 );
2046
2047 log::info!(target: &log_target, "🚒 starting firehose...");
2048
2049 let mut current_slot: Option<u64> = None;
2051 for epoch_num in epoch_range.clone() {
2052 log::info!(target: &log_target, "entering epoch {}", epoch_num);
2053 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2054 Ok(stream) => stream,
2055 Err(_) => {
2056 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2057 }
2058 };
2059 let mut reader = NodeReader::new(stream);
2060
2061 let header_fut = reader.read_raw_header();
2062 let header = match timeout(OP_TIMEOUT, header_fut).await {
2063 Ok(res) => res
2064 .map_err(FirehoseError::ReadHeader)
2065 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2066 Err(_) => {
2067 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2068 }
2069 };
2070 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2071
2072 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2073 let local_start = std::cmp::max(slot_range.start, epoch_start);
2074 let local_end_inclusive =
2075 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2076 if local_start > local_end_inclusive {
2077 log::debug!(
2078 target: &log_target,
2079 "epoch {} has no overlap with thread range ({}..{}), skipping",
2080 epoch_num,
2081 slot_range.start,
2082 slot_range.end
2083 );
2084 continue;
2085 }
2086
2087 let mut todo_previous_blockhash = Hash::default();
2088 let mut todo_latest_entry_blockhash = Hash::default();
2089 last_counted_slot = local_start.saturating_sub(1);
2092 current_slot = None;
2093
2094 if local_start > epoch_start {
2095 let seek_slot = match timeout(
2098 OP_TIMEOUT,
2099 find_previous_indexed_slot(local_start, epoch_start, &log_target),
2100 )
2101 .await
2102 {
2103 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2104 Err(_) => {
2105 return Err((
2106 FirehoseError::OperationTimeout(
2107 "seek_to_previous_indexed_slot",
2108 ),
2109 current_slot.unwrap_or(slot_range.start),
2110 ));
2111 }
2112 };
2113 if let Some(seek_slot) = seek_slot {
2114 let seek_fut = reader.seek_to_slot(seek_slot);
2115 match timeout(OP_TIMEOUT, seek_fut).await {
2116 Ok(res) => {
2117 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2118 }
2119 Err(_) => {
2120 return Err((
2121 FirehoseError::OperationTimeout("seek_to_slot"),
2122 current_slot.unwrap_or(slot_range.start),
2123 ));
2124 }
2125 }
2126 }
2127 }
2128
2129 let mut item_index = 0;
2131 let mut displayed_skip_message = false;
2132 loop {
2133 let read_fut = reader.read_until_block();
2134 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2135 Ok(result) => result
2136 .map_err(FirehoseError::ReadUntilBlockError)
2137 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2138 Err(_) => {
2139 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2140 let restart_slot =
2141 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2142 return Err((
2143 FirehoseError::OperationTimeout("read_until_block"),
2144 restart_slot,
2145 ));
2146 }
2147 };
2148 if nodes.is_empty() {
2149 log::info!(
2150 target: &log_target,
2151 "reached end of epoch {}",
2152 epoch_num
2153 );
2154 break;
2155 }
2156 if let Some(last_node) = nodes.0.last()
2165 && !last_node.get_node().is_block() {
2166 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2167 break;
2168 }
2169 let block = nodes
2170 .get_block()
2171 .map_err(FirehoseError::GetBlockError)
2172 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2173 log::debug!(
2174 target: &log_target,
2175 "read {} items from epoch {}, now at slot {}",
2176 item_index,
2177 epoch_num,
2178 block.slot
2179 );
2180 let slot = block.slot;
2181 if slot > local_end_inclusive {
2182 log::debug!(
2183 target: &log_target,
2184 "reached end of local slice at slot {} (epoch {}), stopping",
2185 slot,
2186 epoch_num
2187 );
2188 break;
2189 }
2190 if slot >= slot_range.end {
2191 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2192 return Ok(());
2196 }
2197 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2198 if slot < local_start {
2199 if slot.saturating_add(1) == local_start {
2200 log::debug!(
2201 target: &log_target,
2202 "priming reader with preceding slot {}, skipping",
2203 slot
2204 );
2205 } else {
2206 log::warn!(
2207 target: &log_target,
2208 "encountered slot {} before start of range {}, skipping",
2209 slot,
2210 local_start
2211 );
2212 }
2213 continue;
2214 }
2215 current_slot = Some(slot);
2216 let mut entry_index: usize = 0;
2217 let mut this_block_executed_transaction_count: u64 = 0;
2218 let mut this_block_entry_count: u64 = 0;
2219 let mut this_block_rewards = DecodedRewards::empty();
2220
2221 if slot <= last_counted_slot {
2222 log::debug!(
2223 target: &log_target,
2224 "duplicate block {}, already counted (last_counted={})",
2225 slot,
2226 last_counted_slot,
2227 );
2228 continue;
2229 }
2230
2231 nodes.each(|node_with_cid| -> Result<(), SharedError> {
2232 item_index += 1;
2233 if let Some(skip) = skip_until_index {
2239 if item_index < skip {
2240 if !displayed_skip_message {
2241 log::info!(
2242 target: &log_target,
2243 "skipping until index {} (at {})",
2244 skip,
2245 item_index
2246 );
2247 displayed_skip_message = true;
2248 }
2249 return Ok(());
2250 } else {
2251 log::info!(
2252 target: &log_target,
2253 "reached target index {}, resuming...",
2254 skip
2255 );
2256 skip_until_index = None;
2257 }
2258 }
2259 let node = node_with_cid.get_node();
2260
2261 use crate::node::Node::*;
2262 match node {
2263 Transaction(tx) => {
2264 let versioned_tx = tx.as_parsed()?;
2265 let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2266
2267 let as_native_metadata = decode_transaction_status_meta_from_frame(
2268 block.slot,
2269 reassembled_metadata,
2270 )?;
2271
2272 let message_hash = {
2273 #[cfg(feature = "verify-transaction-signatures")]
2274 {
2275 versioned_tx.verify_and_hash_message()?
2276 }
2277 #[cfg(not(feature = "verify-transaction-signatures"))]
2278 {
2279 versioned_tx.message.hash()
2282 }
2283 };
2284 let signature = versioned_tx
2285 .signatures
2286 .first()
2287 .ok_or_else(|| {
2288 Box::new(std::io::Error::new(
2289 std::io::ErrorKind::InvalidData,
2290 "transaction missing signature",
2291 )) as SharedError
2292 })?;
2293 let is_vote = is_simple_vote_transaction(&versioned_tx);
2294
2295 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2296 transaction_notifier.notify_transaction(
2297 block.slot,
2298 tx.index.unwrap() as usize,
2299 signature,
2300 &message_hash,
2301 is_vote,
2302 &as_native_metadata,
2303 &versioned_tx,
2304 );
2305 }
2306
2307 }
2308 Entry(entry) => {
2309 let entry_hash = Hash::from(entry.hash.to_bytes());
2310 let entry_transaction_count = entry.transactions.len();
2311 let entry_transaction_count_u64 = entry_transaction_count as u64;
2312 let starting_transaction_index =
2313 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2314 Box::new(std::io::Error::other(
2315 "transaction index exceeds usize range",
2316 )) as SharedError
2317 })?;
2318 todo_latest_entry_blockhash = entry_hash;
2319 this_block_executed_transaction_count += entry_transaction_count_u64;
2320 this_block_entry_count += 1;
2321 if entry_notifier_maybe.is_none() {
2322 return Ok(());
2323 }
2324 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2325 let entry_summary = solana_entry::entry::EntrySummary {
2326 num_hashes: entry.num_hashes,
2327 hash: Hash::from(entry.hash.to_bytes()),
2328 num_transactions: entry_transaction_count_u64,
2329 };
2330 entry_notifier.notify_entry(
2331 block.slot,
2332 entry_index,
2333 &entry_summary,
2334 starting_transaction_index,
2335 );
2336 entry_index += 1;
2337 }
2338 Block(block) => {
2339 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2340 confirmed_bank_sender.send(notification).unwrap();
2341
2342 if block_meta_notifier_maybe.is_none() {
2343 last_counted_slot = block.slot;
2344 return Ok(());
2345 }
2346 let DecodedRewards {
2347 keyed_rewards,
2348 num_partitions,
2349 } = std::mem::take(&mut this_block_rewards);
2350 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2351 block_meta_notifier.notify_block_metadata(
2352 block.meta.parent_slot,
2353 todo_previous_blockhash.to_string().as_str(),
2354 block.slot,
2355 todo_latest_entry_blockhash.to_string().as_str(),
2356 &KeyedRewardsAndNumPartitions {
2357 keyed_rewards,
2358 num_partitions,
2359 },
2360 Some(block.meta.blocktime as i64),
2361 block.meta.block_height,
2362 this_block_executed_transaction_count,
2363 this_block_entry_count,
2364 );
2365 todo_previous_blockhash = todo_latest_entry_blockhash;
2366 last_counted_slot = block.slot;
2367 std::thread::yield_now();
2368 }
2369 Subset(_subset) => (),
2370 Epoch(_epoch) => (),
2371 Rewards(rewards) => {
2372 let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2373 if !reassembled.is_empty() {
2374 this_block_rewards = decode_rewards_from_frame(
2375 block.slot,
2376 reassembled,
2377 )?;
2378 } else {
2379 this_block_rewards = DecodedRewards::empty();
2380 }
2381 }
2382 DataFrame(_data_frame) => (),
2383 }
2384 Ok(())
2385 })
2386 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2387 if block.slot == slot_range.end - 1 {
2388 let finish_time = std::time::Instant::now();
2389 let elapsed = finish_time.duration_since(start_time);
2390 log::info!(target: &log_target, "processed slot {}", block.slot);
2391 let elapsed_pretty = human_readable_duration(elapsed);
2392 log::info!(
2393 target: &log_target,
2394 "processed {} slots across {} epochs in {}.",
2395 initial_slot_range.end - initial_slot_range.start,
2396 slot_to_epoch(initial_slot_range.end)
2397 + 1
2398 - slot_to_epoch(initial_slot_range.start),
2399 elapsed_pretty
2400 );
2401 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2402 let summary: String = error_counts
2405 .iter()
2406 .enumerate()
2407 .filter_map(|(i, c)| {
2408 let v = c.load(Ordering::Relaxed);
2409 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2410 })
2411 .collect::<Vec<_>>()
2412 .join(", ");
2413 if !summary.is_empty() {
2414 log::debug!(target: &log_target, "threads with errors: {}", summary);
2415 }
2416 return Ok(());
2417 }
2418 }
2419 }
2420 Ok(())
2421}
2422.await
2423{
2424 if is_shutdown_error(&err) {
2425 log::info!(
2426 target: &log_target,
2427 "shutdown requested; terminating firehose thread {:?}",
2428 thread_index
2429 );
2430 return Ok(());
2431 }
2432 log::error!(
2433 target: &log_target,
2434 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2435 slot,
2436 slot_to_epoch(slot)
2437 );
2438 log::error!(target: &log_target, "{}", err);
2439 let error_message = err.to_string();
2440 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2441 || error_message.contains("Unknown CID version")
2442 {
2443 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2446 }
2447 let item_index = match err {
2448 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2449 _ => 0,
2450 };
2451 let idx = thread_index.unwrap_or(0);
2453 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2454 log::warn!(
2455 target: &log_target,
2456 "restarting from slot {} at index {}",
2457 slot,
2458 item_index,
2459 );
2460 if slot <= last_counted_slot {
2463 slot_range.start = last_counted_slot.saturating_add(1);
2464 } else {
2465 slot_range.start = slot;
2466 }
2467 skip_until_index = None;
2471}
2472 Ok(())
2473}
2474
2475#[inline]
2476fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2477 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2478 return false;
2479 }
2480
2481 if !matches!(
2482 versioned_tx.version(),
2483 solana_transaction::versioned::TransactionVersion::Legacy(_)
2484 ) {
2485 return false;
2486 }
2487
2488 let instructions = versioned_tx.message.instructions();
2489 if instructions.len() != 1 {
2490 return false;
2491 }
2492
2493 let program_index = instructions[0].program_id_index as usize;
2494 versioned_tx
2495 .message
2496 .static_account_keys()
2497 .get(program_index)
2498 .map(|program_id| program_id == &vote_program_id())
2499 .unwrap_or(false)
2500}
2501
2502#[inline(always)]
2503fn convert_proto_rewards(
2504 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2505) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2506 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2507 for proto_reward in proto_rewards.rewards.iter() {
2508 let reward = RewardInfo {
2509 reward_type: match proto_reward.reward_type - 1 {
2510 0 => RewardType::Fee,
2511 1 => RewardType::Rent,
2512 2 => RewardType::Staking,
2513 3 => RewardType::Voting,
2514 typ => {
2515 return Err(Box::new(std::io::Error::other(format!(
2516 "unsupported reward type {}",
2517 typ
2518 ))));
2519 }
2520 },
2521 lamports: proto_reward.lamports,
2522 post_balance: proto_reward.post_balance,
2523 commission: proto_reward.commission.parse::<u8>().ok(),
2524 };
2525 let pubkey = proto_reward
2526 .pubkey
2527 .parse::<Address>()
2528 .map_err(|err| Box::new(err) as SharedError)?;
2529 keyed_rewards.push((pubkey, reward));
2530 }
2531 Ok(keyed_rewards)
2532}
2533
2534#[inline]
2535pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2537 let total = slot_range.end - slot_range.start;
2538 let slots_per_thread = total / threads;
2539 let remainder = total % threads;
2540
2541 let ranges: Vec<Range<u64>> = (0..threads)
2542 .map(|i| {
2543 let extra_slot = if i < remainder { 1 } else { 0 };
2545 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2546 let end = start + slots_per_thread + extra_slot;
2547 start..end
2548 })
2549 .collect();
2550
2551 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2553 assert_eq!(
2554 total_covered, total,
2555 "Range generation failed: {} threads should cover {} slots but only cover {}",
2556 threads, total, total_covered
2557 );
2558
2559 for i in 1..ranges.len() {
2561 assert_eq!(
2562 ranges[i - 1].end,
2563 ranges[i].start,
2564 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2565 i - 1,
2566 ranges[i - 1].end,
2567 i,
2568 ranges[i].start
2569 );
2570 }
2571
2572 log::info!(
2573 target: LOG_MODULE,
2574 "Generated {} thread ranges covering {} slots total",
2575 threads,
2576 total_covered
2577 );
2578 ranges
2579}
2580
/// Renders `duration` as a short human-readable string.
///
/// - zero → `"0s"`
/// - under one second → seconds with two decimals (`"0.50s"`)
/// - under one minute → whole seconds when there are no leftover milliseconds
///   (`"5s"`), otherwise seconds with two decimals (`"5.25s"`)
/// - one minute or more → the most significant non-zero unit, followed by the
///   next unit down only when that unit is non-zero (`"1d2h"`, `"3h"`,
///   `"4m7s"`); anything smaller is dropped
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < 60 {
        return if whole_secs > 0 && duration.subsec_millis() == 0 {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / 86_400;
    let hours = whole_secs % 86_400 / 3_600;
    let minutes = whole_secs % 3_600 / 60;
    let secs = whole_secs % 60;

    match (days, hours, minutes, secs) {
        (0, 0, 0, _) => format!("{secs}s"),
        (0, 0, _, 0) => format!("{minutes}m"),
        (0, 0, _, _) => format!("{minutes}m{secs}s"),
        (0, _, 0, _) => format!("{hours}h"),
        (0, _, _, _) => format!("{hours}h{minutes}m"),
        (_, 0, _, _) => format!("{days}d"),
        _ => format!("{days}d{hours}h"),
    }
}
2626
/// Test-only stats callback: logs a one-line progress summary for worker
/// `thread_id`, including the transactions-per-second rate measured since
/// `stats.start_time`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Report a rate of 0 rather than dividing by a zero elapsed time.
        let tps = match elapsed_secs {
            secs if secs > 0.0 => stats.transactions_processed as f64 / secs,
            _ => 0.0,
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2652
2653#[cfg(test)]
2654use futures_util::FutureExt;
2655#[cfg(test)]
2656use serial_test::serial;
2657#[cfg(test)]
2658use std::sync::{Mutex, OnceLock};
2659
/// Test helper: runs a single-threaded firehose over the one-slot range
/// `slot..slot + 1` and asserts that the block for `slot`:
/// - was delivered as a `BlockData::Block`;
/// - was not marked leader-skipped;
/// - reported an executed-transaction count that is non-zero and at least
///   `min_executed`.
///
/// Non-vote transactions seen in `slot` are counted and logged next to the
/// executed count, but they are not asserted on.
///
/// # Panics
///
/// Panics if `firehose` returns an error or if any of the checks above fail.
#[cfg(test)]
async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    // Written by the firehose callbacks, read back once `firehose` returns.
    let found = Arc::new(AtomicBool::new(false));
    let observed_total = Arc::new(AtomicU64::new(0));
    let observed_non_vote = Arc::new(AtomicU64::new(0));

    // Handles moved into the block / transaction callbacks respectively.
    let found_block = found.clone();
    let observed_total_block = observed_total.clone();
    let target_slot_block = slot;
    let target_slot_tx = slot;
    let observed_non_vote_tx = observed_non_vote.clone();

    firehose(
        1,
        false,
        None,
        target_slot_block..(target_slot_block + 1),
        Some(move |_thread_id: usize, block: BlockData| {
            // Clone per invocation so the returned future owns its own handles.
            let found_block = found_block.clone();
            let observed_total_block = observed_total_block.clone();
            async move {
                if block.slot() == target_slot_block {
                    assert!(
                        !block.was_skipped(),
                        "slot {target_slot_block} was marked leader skipped",
                    );
                    if let BlockData::Block {
                        executed_transaction_count,
                        ..
                    } = block
                    {
                        found_block.store(true, Ordering::Relaxed);
                        observed_total_block.store(executed_transaction_count, Ordering::Relaxed);
                    }
                }
                Ok(())
            }
            .boxed()
        }),
        Some(move |_thread_id: usize, transaction: TransactionData| {
            let observed_non_vote_tx = observed_non_vote_tx.clone();
            async move {
                if transaction.slot == target_slot_tx && !transaction.is_vote {
                    observed_non_vote_tx.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        found.load(Ordering::Relaxed),
        "target slot {slot} was not processed"
    );
    let observed_total = observed_total.load(Ordering::Relaxed);
    let observed_non_vote = observed_non_vote.load(Ordering::Relaxed);
    // Checked separately so that `min_executed == 0` still rejects an empty block.
    assert!(
        observed_total > 0,
        "slot {slot} executed transaction count was zero"
    );
    assert!(
        observed_total >= min_executed,
        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
    );
    log::info!(
        target: LOG_MODULE,
        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
        observed_total,
        observed_non_vote
    );
}
2742
/// Debug helper: reads the nodes returned by `read_until_block` after seeking to
/// `slot` in its epoch stream, and logs how many nodes of each kind were seen.
///
/// It then looks up to five slots back (never below slot 0) for the nearest
/// slot that `slot_to_offset` can resolve, and logs the outcome.
///
/// # Errors
///
/// Returns an error if seeking to `slot` or reading its nodes fails. Failed
/// offset lookups for earlier slots are only logged.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let epoch = slot_to_epoch(slot);
    let client = crate::network::create_http_client();
    let stream = fetch_epoch_stream(epoch, &client).await;
    let mut reader = NodeReader::new(stream);
    reader
        .seek_to_slot(slot)
        .await
        .map_err(|err| Box::new(err) as SharedError)?;

    let nodes = reader.read_until_block().await?;
    let mut transactions = 0u64;
    let mut entries = 0u64;
    let mut entry_tx_total = 0u64;
    let mut dataframes = 0u64;
    let mut rewards = 0u64;
    let mut subsets = 0u64;
    let mut epochs = 0u64;
    let mut block_slot = None;
    let mut block_entries = None;
    let first_kind = nodes
        .0
        .first()
        .map(|node| node.get_node())
        .map(|node| match node {
            Node::Transaction(_) => "transaction",
            Node::Entry(_) => "entry",
            Node::Block(_) => "block",
            Node::Subset(_) => "subset",
            Node::Epoch(_) => "epoch",
            Node::Rewards(_) => "rewards",
            Node::DataFrame(_) => "dataframe",
        })
        .unwrap_or("none");

    for node in &nodes.0 {
        match node.get_node() {
            Node::Transaction(_) => {
                transactions += 1;
            }
            Node::Entry(entry) => {
                entries += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Block(block) => {
                block_slot = Some(block.slot);
                block_entries = Some(block.entries.len());
            }
            Node::Subset(_) => {
                subsets += 1;
            }
            Node::Epoch(_) => {
                epochs += 1;
            }
            Node::Rewards(_) => {
                rewards += 1;
            }
            Node::DataFrame(_) => {
                dataframes += 1;
            }
        }
    }

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        transactions,
        entries,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframes,
        rewards,
        subsets,
        epochs
    );

    if slot > 0 {
        let mut found_previous = None;
        // Bound the window by `slot` itself: with a saturating subtraction,
        // slots 1..=4 would repeatedly look up (and log failures for) slot 0.
        for delta in 1..=slot.min(5) {
            let candidate = slot - delta;
            match slot_to_offset(candidate).await {
                Ok(offset) => {
                    found_previous = Some((candidate, offset));
                    break;
                }
                Err(err) => {
                    log::info!(
                        target: LOG_MODULE,
                        "slot {slot} previous lookup {candidate} failed: {err}"
                    );
                }
            }
        }
        if let Some((candidate, offset)) = found_previous {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        } else {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2858
2859#[tokio::test(flavor = "multi_thread")]
2860async fn test_firehose_epoch_800() {
2861 use dashmap::DashSet;
2862 use std::sync::atomic::{AtomicU64, Ordering};
2863 solana_logger::setup_with_default("info");
2864 const THREADS: usize = 4;
2865 const NUM_SLOTS_TO_COVER: u64 = 50;
2866 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2867 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2868 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2869 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2870 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2871 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2872 let stats_tracking = StatsTracking {
2873 on_stats: log_stats_handler,
2874 tracking_interval_slots: 10,
2875 };
2876
2877 for prev in PREV_BLOCK.iter() {
2878 prev.store(0, Ordering::Relaxed);
2879 }
2880 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2881 NUM_BLOCKS.store(0, Ordering::Relaxed);
2882 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2883 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2884 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2885
2886 firehose(
2887 THREADS.try_into().unwrap(),
2888 false,
2889 None,
2890 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2891 Some(|thread_id: usize, block: BlockData| {
2892 async move {
2893 let _prev =
2894 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2895 if block.was_skipped() {
2896 log::info!(
2897 target: LOG_MODULE,
2898 "leader skipped block {} on thread {}",
2899 block.slot(),
2900 thread_id,
2901 );
2902 } else {
2903 }
2910
2911 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2912 if block.was_skipped() {
2913 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2914 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2915 } else {
2916 if first_time {
2917 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2918 if let BlockData::Block {
2919 executed_transaction_count,
2920 ..
2921 } = &block
2922 {
2923 let executed = *executed_transaction_count;
2924 let _ = MIN_TRANSACTIONS.fetch_update(
2925 Ordering::Relaxed,
2926 Ordering::Relaxed,
2927 |current| {
2928 if executed < current {
2929 Some(executed)
2930 } else {
2931 None
2932 }
2933 },
2934 );
2935 }
2936 }
2937 }
2938 Ok(())
2939 }
2940 .boxed()
2941 }),
2942 None::<OnTxFn>,
2943 None::<OnEntryFn>,
2944 None::<OnRewardFn>,
2945 None::<OnErrorFn>,
2946 Some(stats_tracking),
2947 None,
2948 )
2949 .await
2950 .unwrap();
2951 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2952 assert_eq!(
2953 seen, NUM_SLOTS_TO_COVER,
2954 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2955 );
2956 let mut skipped: Vec<u64> = SEEN_SKIPPED
2957 .get_or_init(DashSet::new)
2958 .iter()
2959 .map(|v| *v)
2960 .collect();
2961 skipped.sort_unstable();
2962 const EXPECTED_SKIPPED: [u64; 6] = [
2964 345_600_004,
2965 345_600_005,
2966 345_600_008,
2967 345_600_009,
2968 345_600_010,
2969 345_600_011,
2970 ];
2971 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2972 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2973}
2974
/// Streams the 100 slots around `TARGET_SLOT` with 4 threads and checks the
/// target block against known values:
/// - its executed-transaction count;
/// - the number of non-vote transactions delivered for it.
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_target_slot_transactions() {
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    solana_logger::setup_with_default("info");
    const TARGET_SLOT: u64 = 376_273_722;
    const SLOT_RADIUS: u64 = 50;
    const EXPECTED_TRANSACTIONS: u64 = 1414;
    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
    static FOUND: AtomicBool = AtomicBool::new(false);
    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);

    // Statics persist across runs in the same process; reset them first.
    FOUND.store(false, Ordering::Relaxed);
    OBSERVED_TXS.store(0, Ordering::Relaxed);
    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);

    firehose(
        4,
        false,
        None,
        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if block.slot() == TARGET_SLOT {
                    assert!(
                        !block.was_skipped(),
                        "target slot {TARGET_SLOT} was marked leader skipped",
                    );
                    if let BlockData::Block {
                        executed_transaction_count,
                        ..
                    } = block
                    {
                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
                        FOUND.store(true, Ordering::Relaxed);
                        assert_eq!(
                            executed_transaction_count, EXPECTED_TRANSACTIONS,
                            "unexpected transaction count for slot {TARGET_SLOT}"
                        );
                        // NOTE(review): this reads the non-vote counter when the block
                        // callback fires, so it assumes every transaction callback for
                        // TARGET_SLOT has already completed by then — confirm firehose
                        // guarantees that ordering.
                        assert_eq!(
                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
                            EXPECTED_NON_VOTE_TRANSACTIONS,
                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
                        );
                    }
                }
                Ok(())
            }
            .boxed()
        }),
        Some(|_thread_id: usize, transaction: TransactionData| {
            async move {
                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        FOUND.load(Ordering::Relaxed),
        "target slot was not processed"
    );
    assert_eq!(
        OBSERVED_TXS.load(Ordering::Relaxed),
        EXPECTED_TRANSACTIONS,
        "recorded transaction count mismatch"
    );
}
3053
/// Runs a 4-thread firehose over the 200 slots straddling the first slot of
/// epoch 900. The run passes `true` as the second `firehose` argument
/// (presumably sequential mode, per the test name — confirm) and uses a 4 GiB
/// buffer window.
///
/// It asserts that transaction callbacks arrive in non-decreasing slot order
/// and that at least one transaction is observed.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    solana_logger::setup_with_default("info");
    const SLOT_COUNT: u64 = 100;
    const THREADS: u64 = 4;
    const TEST_BUFFER_WINDOW: &str = "4GiB";

    // SLOT_COUNT slots on each side of the first slot of epoch 900.
    let (epoch_900_start, _) = epoch_to_slot_range(900);
    let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);

    // Slot of the most recently observed transaction; used to detect regressions.
    let last_seen_tx_slot = Arc::new(Mutex::new(slot_range.start));
    let observed_txs = Arc::new(AtomicU64::new(0));
    let stats_tracking = StatsTracking {
        on_stats: log_stats_handler,
        tracking_interval_slots: 100,
    };
    let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
        .expect("valid test buffer window");

    firehose(
        THREADS,
        true,
        Some(test_buffer_window_bytes),
        slot_range.clone(),
        None::<OnBlockFn>,
        Some({
            let last_seen_tx_slot = last_seen_tx_slot.clone();
            let observed_txs = observed_txs.clone();
            move |_thread_id: usize, transaction: TransactionData| {
                let last_seen_tx_slot = last_seen_tx_slot.clone();
                let observed_txs = observed_txs.clone();
                async move {
                    // The std mutex guard is dropped before the future completes;
                    // there is no `.await` while it is held.
                    let mut previous = last_seen_tx_slot.lock().unwrap();
                    assert!(
                        transaction.slot >= *previous,
                        "transaction slot regressed: prev={}, current={}",
                        *previous,
                        transaction.slot
                    );
                    *previous = transaction.slot;
                    observed_txs.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
                .boxed()
            }
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        Some(stats_tracking),
        None,
    )
    .await
    .unwrap();

    assert!(
        observed_txs.load(Ordering::Relaxed) > 0,
        "expected to observe at least one transaction in slots [{}, {})",
        slot_range.start,
        slot_range.end
    );
}
3125
/// Asserts that slot 311_173_980 reports at least `1_197 + 211` executed
/// transactions. Per the test name the bound comes from Solscan counts for this
/// slot (presumably a non-vote/vote split — confirm against Solscan).
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
    const SLOT: u64 = 311_173_980;
    const MIN_EXECUTED: u64 = 1_197 + 211;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3133
/// Asserts that slot 311_225_232 reports at least `888 + 157` executed
/// transactions. Per the test name the bound comes from Solscan counts for this
/// slot (presumably a non-vote/vote split — confirm against Solscan).
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
    const SLOT: u64 = 311_225_232;
    const MIN_EXECUTED: u64 = 888 + 157;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3141
/// Asserts that slot 311_175_860 reports at least `527 + 110` executed
/// transactions. Per the test name the bound comes from Solscan counts for this
/// slot (presumably a non-vote/vote split — confirm against Solscan).
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
    const SLOT: u64 = 311_175_860;
    const MIN_EXECUTED: u64 = 527 + 110;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3149
/// Asserts that slot 311_134_608 reports at least `1_086 + 169` executed
/// transactions. Per the test name the bound comes from Solscan counts for this
/// slot (presumably a non-vote/vote split — confirm against Solscan).
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
    const SLOT: u64 = 311_134_608;
    const MIN_EXECUTED: u64 = 1_086 + 169;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3157
/// Manual diagnostic, ignored by default: logs a node summary for each slot in a
/// fixed list of slots that are also exercised by the transaction-count tests.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    const SLOTS: [u64; 5] = [
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];

    solana_logger::setup_with_default("info");
    for &slot in SLOTS.iter() {
        log_slot_node_summary(slot).await.expect("slot summary");
    }
}
3175
3176#[tokio::test(flavor = "multi_thread")]
3177async fn test_firehose_epoch_850_has_logs() {
3178 use std::sync::atomic::{AtomicU64, Ordering};
3179 solana_logger::setup_with_default("info");
3180 const START_SLOT: u64 = 367_200_075; const SLOT_COUNT: u64 = 50;
3182 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3183
3184 TOTAL_TXS.store(0, Ordering::Relaxed);
3185
3186 firehose(
3187 4,
3188 false,
3189 None,
3190 START_SLOT..(START_SLOT + SLOT_COUNT),
3191 None::<OnBlockFn>,
3192 Some(|_thread_id: usize, transaction: TransactionData| {
3193 async move {
3194 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3195 if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3196 let has_logs = logs.iter().any(|msg| !msg.is_empty());
3197 assert_eq!(has_logs, true);
3198 }
3199 Ok(())
3200 }
3201 .boxed()
3202 }),
3203 None::<OnEntryFn>,
3204 None::<OnRewardFn>,
3205 None::<OnErrorFn>,
3206 None::<OnStatsTrackingFn>,
3207 None,
3208 )
3209 .await
3210 .unwrap();
3211
3212 assert!(
3213 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3214 "no transactions observed in epoch 850 range"
3215 );
3216}
3217
/// Streams the 20 slots around `TARGET_SLOT` (epoch 850 per the test name) with
/// 2 threads and checks that:
/// - the target block is delivered and not leader-skipped;
/// - transactions are delivered for it;
/// - exactly 991 of them are classified as votes.
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_850_votes_present() {
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    solana_logger::setup_with_default("info");
    const TARGET_SLOT: u64 = 367_200_100;
    const SLOT_RADIUS: u64 = 10;
    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);

    // Statics persist across runs in the same process; reset them first.
    SEEN_BLOCK.store(false, Ordering::Relaxed);
    VOTE_TXS.store(0, Ordering::Relaxed);
    TOTAL_TXS.store(0, Ordering::Relaxed);

    firehose(
        2,
        false,
        None,
        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if block.slot() == TARGET_SLOT {
                    assert!(
                        !block.was_skipped(),
                        "target slot {TARGET_SLOT} was marked leader skipped",
                    );
                    SEEN_BLOCK.store(true, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        Some(|_thread_id: usize, transaction: TransactionData| {
            async move {
                // Only transactions in the target slot are counted.
                if transaction.slot == TARGET_SLOT {
                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
                    if transaction.is_vote {
                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
                    }
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        SEEN_BLOCK.load(Ordering::Relaxed),
        "target slot was not processed"
    );
    assert!(
        TOTAL_TXS.load(Ordering::Relaxed) > 0,
        "no transactions counted in target slot"
    );
    // Known vote-transaction count for TARGET_SLOT (presumably taken from a
    // reference explorer — confirm).
    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
}
3281
/// Restart-coverage regression test.
///
/// The block handler returns an error exactly once: on the first non-skipped
/// block that arrives after at least one non-skipped block has been recorded.
/// Per the error message, this is meant to exercise the firehose restart path.
/// After the run, every slot in the 8-slot range must have reached the handler
/// at least once.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;
    solana_logger::setup_with_default("info");
    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // Number of successful handler deliveries per slot.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // One-shot failure: the swap is only reached once the earlier
                // conditions hold, and it flips FAIL_TRIGGERED so later calls
                // skip this branch. The failing slot is not recorded here.
                if !block.was_skipped()
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }
                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *coverage.entry(block.slot()).or_insert(0) += 1;
                if !block.was_skipped() {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Only presence is checked; redeliveries after the restart are allowed.
    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
3347
/// Streams the 2001 slots `GAP_START - 1000 ..= GAP_START + 1000` with 16 threads
/// and asserts that every slot was delivered as a non-skipped block. The only
/// exceptions are four hard-coded slots, which are inserted into the coverage
/// set by hand.
///
/// NOTE(review): skipped blocks are never recorded, so any leader-skipped slot
/// other than the four hard-coded ones is reported as missing — confirm this is
/// the intended definition of "coverage".
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;
    solana_logger::setup_with_default("info");
    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    // Slots delivered as non-skipped blocks.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        // Exclusive end, so END_SLOT itself is included.
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if block.was_skipped() {
                    return Ok(());
                }
                let slot = block.slot();
                COVERAGE
                    .get_or_init(|| Mutex::new(HashSet::new()))
                    .lock()
                    .unwrap()
                    .insert(slot);
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut coverage = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();

    // Known-missing slots that are exempt from the coverage check.
    coverage.insert(378864396);
    coverage.insert(378864397);
    coverage.insert(378864398);
    coverage.insert(378864399);

    let expected: Vec<u64> = (START_SLOT..=END_SLOT).collect();
    let missing: Vec<u64> = expected
        .iter()
        .copied()
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}
3420}