jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
41    node_reader::NodeReader,
42    utils,
43};
44
// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
// header, seeking, reading next block). Adjust here to tune stall detection/restart
// aggressiveness. Currently 30 seconds.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
// Epochs earlier than this were bincode-encoded in Old Faithful. For those epochs
// `decode_transaction_status_meta` tries bincode first and falls back to protobuf when
// bincode fails; from this epoch onwards only protobuf is attempted.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53    flag: &Arc<std::sync::atomic::AtomicBool>,
54    receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56    if let Some(rx) = receiver {
57        match rx.try_recv() {
58            Ok(_) | Err(TryRecvError::Lagged(_)) => {
59                flag.store(true, Ordering::SeqCst);
60            }
61            Err(TryRecvError::Closed) => {
62                flag.store(true, Ordering::SeqCst);
63            }
64            Err(TryRecvError::Empty) => {}
65        }
66    }
67    flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72        inner
73            .downcast_ref::<io::Error>()
74            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75            .unwrap_or(false)
76    }
77
78    match err {
79        FirehoseError::BlockHandlerError(inner)
80        | FirehoseError::TransactionHandlerError(inner)
81        | FirehoseError::EntryHandlerError(inner)
82        | FirehoseError::RewardHandlerError(inner)
83        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84        _ => false,
85    }
86}
87
/// Errors that can occur while streaming the firehose.
///
/// The `Error` derive comes from `thiserror`; the user-facing text for each variant is
/// written by hand in the `Display` implementation for this type.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(SharedError),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(SharedError),
    /// Failure while fetching an individual block.
    GetBlockError(SharedError),
    /// Failure seeking to, reading, or decoding the data node at the given index.
    NodeDecodingError(usize, SharedError),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(SharedError),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(SharedError),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(SharedError),
    /// Timeout reached while waiting for a firehose operation; the payload names the
    /// operation.
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// Entry handler returned an error.
    EntryHandlerError(SharedError),
    /// Reward handler returned an error.
    RewardHandlerError(SharedError),
    /// Block handler returned an error.
    BlockHandlerError(SharedError),
}
125
// NOTE(review): these impls assert thread-safety without any compiler verification. `Send`
// and `Sync` are auto traits, so if every variant payload (`reqwest::Error`, `SharedError`,
// `GeyserPluginServiceError`, `SlotOffsetIndexError`) is already `Send + Sync`, these two
// lines are redundant and can be deleted; if any payload is not, the impls are unsound.
// Confirm which case applies and either remove them or replace this note with a
// `// SAFETY:` justification.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
128
129impl Display for FirehoseError {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        match self {
132            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
133            FirehoseError::ReadHeader(error) => {
134                write!(f, "Error reading header: {}", error)
135            }
136            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
137                f,
138                "Error initializing geyser plugin service: {}",
139                geyser_plugin_service_error
140            ),
141            FirehoseError::FailedToGetTransactionNotifier => write!(
142                f,
143                "Failed to get transaction notifier from GeyserPluginService"
144            ),
145            FirehoseError::ReadUntilBlockError(error) => {
146                write!(f, "Error reading until block: {}", error)
147            }
148            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
149            FirehoseError::NodeDecodingError(item_index, error) => {
150                write!(
151                    f,
152                    "Error seeking, reading data from, or decoding data for data node {}: {}",
153                    item_index, error
154                )
155            }
156            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
157                f,
158                "Error getting info from slot offset index: {}",
159                slot_offset_index_error
160            ),
161            FirehoseError::SeekToSlotError(error) => {
162                write!(f, "Error seeking to slot: {}", error)
163            }
164            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
165            FirehoseError::OnStatsHandlerError(error) => {
166                write!(f, "Stats handler error: {}", error)
167            }
168            FirehoseError::OperationTimeout(op) => {
169                write!(f, "Timeout while waiting for operation: {}", op)
170            }
171            FirehoseError::TransactionHandlerError(error) => {
172                write!(f, "Transaction handler error: {}", error)
173            }
174            FirehoseError::EntryHandlerError(error) => {
175                write!(f, "Entry handler error: {}", error)
176            }
177            FirehoseError::RewardHandlerError(error) => {
178                write!(f, "Reward handler error: {}", error)
179            }
180            FirehoseError::BlockHandlerError(error) => {
181                write!(f, "Block handler error: {}", error)
182            }
183        }
184    }
185}
186
187impl From<reqwest::Error> for FirehoseError {
188    fn from(e: reqwest::Error) -> Self {
189        FirehoseError::Reqwest(e)
190    }
191}
192
193impl From<GeyserPluginServiceError> for FirehoseError {
194    fn from(e: GeyserPluginServiceError) -> Self {
195        FirehoseError::GeyserPluginService(e)
196    }
197}
198
199impl From<SlotOffsetIndexError> for FirehoseError {
200    fn from(e: SlotOffsetIndexError) -> Self {
201        FirehoseError::SlotOffsetIndexError(e)
202    }
203}
204
/// Per-thread progress information emitted by the firehose runner.
///
/// A clone of this struct is embedded in every [`Stats`] snapshot handed to the stats
/// handler.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    // NOTE(review): `firehose` initialises this from the shared `firehose_start` instant
    // rather than a per-thread timestamp; confirm that is intended.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
    pub slot_range: Range<u64>,
    /// Original slot range assigned to the thread (half-open, never modified).
    pub initial_slot_range: Range<u64>,
    /// Latest slot processed by the thread; starts at the beginning of the assigned range.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
233
/// Aggregated firehose statistics covering all worker threads.
///
/// Snapshots are assembled by `maybe_emit_stats`; the per-field notes below describe how
/// that function fills each value.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update (a clone of the reporting thread's
    /// [`ThreadStats`]).
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began (copied from the reporting thread).
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    // NOTE(review): copied from the reporting thread's `finish_time`, so it reflects that
    // thread rather than the whole run.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open [start, end)).
    // NOTE(review): copied from the reporting thread's `slot_range`, not the overall
    // requested range.
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads, computed as slots processed minus
    /// blocks processed (saturating at zero).
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Aggregate rewards processed across all threads.
    // NOTE(review): currently filled from the reporting thread's `rewards_processed`, so
    // it is a per-thread count rather than a cross-thread aggregate.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
266
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// `OnStats` is the handler type; it receives the worker thread index and a [`Stats`]
/// snapshot on every emission.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available.
    pub on_stats: OnStats,
    /// Emits a stats callback when the current slot is a multiple of this interval.
    pub tracking_interval_slots: u64,
}
275
276#[inline(always)]
277#[allow(clippy::too_many_arguments)]
278async fn maybe_emit_stats<OnStats: Handler<Stats>>(
279    stats_tracking: Option<&StatsTracking<OnStats>>,
280    thread_index: usize,
281    thread_stats: &ThreadStats,
282    overall_slots_processed: &AtomicU64,
283    overall_blocks_processed: &AtomicU64,
284    overall_transactions_processed: &AtomicU64,
285    overall_entries_processed: &AtomicU64,
286    transactions_since_stats: &AtomicU64,
287    blocks_since_stats: &AtomicU64,
288    slots_since_stats: &AtomicU64,
289    last_pulse: &Arc<AtomicU64>,
290    base_instant: std::time::Instant,
291) -> Result<(), (FirehoseError, u64)> {
292    if let Some(stats_tracker) = stats_tracking {
293        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
294        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
295        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
296        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
297        let now_nanos = base_instant.elapsed().as_nanos() as u64;
298        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
299        let delta_nanos = now_nanos.saturating_sub(previous);
300        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
301        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
302        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
303        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
304
305        let stats = Stats {
306            thread_stats: thread_stats.clone(),
307            start_time: thread_stats.start_time,
308            finish_time: thread_stats.finish_time,
309            slot_range: thread_stats.slot_range.clone(),
310            slots_processed: total_slots,
311            blocks_processed: total_blocks,
312            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
313            transactions_processed: total_transactions,
314            entries_processed: total_entries,
315            rewards_processed: thread_stats.rewards_processed,
316            transactions_since_last_pulse: processed_transactions,
317            blocks_since_last_pulse: processed_blocks,
318            slots_since_last_pulse: processed_slots,
319            time_since_last_pulse,
320        };
321
322        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
323            last_pulse.store(previous, Ordering::Relaxed);
324            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
325            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
326            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
327            return Err((
328                FirehoseError::OnStatsHandlerError(e),
329                thread_stats.current_slot,
330            ));
331        }
332    }
333    Ok(())
334}
335
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
342
343fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
344    map.get(&thread_id)
345        .map(|set| set.remove(&slot).is_some())
346        .unwrap_or(false)
347}
348
349fn decode_transaction_status_meta_from_frame(
350    slot: u64,
351    reassembled_metadata: Vec<u8>,
352) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
353    if reassembled_metadata.is_empty() {
354        // Early epochs often omit metadata entirely.
355        return Ok(solana_transaction_status::TransactionStatusMeta::default());
356    }
357
358    match utils::decompress_zstd(reassembled_metadata.clone()) {
359        Ok(decompressed) => {
360            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
361                Box::new(std::io::Error::other(format!(
362                    "decode transaction metadata (slot {slot}): {err}"
363                ))) as SharedError
364            })
365        }
366        Err(decomp_err) => {
367            // If the frame was not zstd-compressed (common for very early data), try to
368            // decode the raw bytes directly before bailing.
369            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
370                Box::new(std::io::Error::other(format!(
371                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
372                ))) as SharedError
373            })
374        }
375    }
376}
377
378fn decode_transaction_status_meta(
379    slot: u64,
380    metadata_bytes: &[u8],
381) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
382    let epoch = slot_to_epoch(slot);
383    let mut bincode_err: Option<String> = None;
384    if epoch < BINCODE_EPOCH_CUTOFF {
385        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
386            metadata_bytes,
387        ) {
388            Ok(stored) => return Ok(stored.into()),
389            Err(err) => {
390                bincode_err = Some(err.to_string());
391            }
392        }
393    }
394
395    let bin_err_for_proto = bincode_err.clone();
396    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
397        prost_011::Message::decode(metadata_bytes).map_err(|err| {
398            // If we already tried bincode, surface both failures for easier debugging.
399            if let Some(ref bin_err) = bin_err_for_proto {
400                Box::new(std::io::Error::other(format!(
401                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
402                ))) as SharedError
403            } else {
404                Box::new(std::io::Error::other(format!(
405                    "protobuf decode transaction metadata: {err}"
406                ))) as SharedError
407            }
408        })?;
409
410    proto.try_into().map_err(|err| {
411        if let Some(ref bin_err) = bincode_err {
412            Box::new(std::io::Error::other(format!(
413                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
414            ))) as SharedError
415        } else {
416            Box::new(std::io::Error::other(format!(
417                "convert transaction metadata proto: {err}"
418            ))) as SharedError
419        }
420    })
421}
422
/// Tests for the transaction metadata decoding helpers.
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Factor these tests multiply an epoch number by to obtain a slot in that epoch.
    const SLOTS_PER_EPOCH: u64 = 432000;

    /// Metadata with a handful of non-default fields, used for protobuf round trips.
    fn sample_meta() -> TransactionStatusMeta {
        TransactionStatusMeta {
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            compute_units_consumed: Some(7),
            cost_units: Some(9),
            loaded_addresses: LoadedAddresses::default(),
            ..TransactionStatusMeta::default()
        }
    }

    /// Encodes `meta` with the generated protobuf message type.
    fn encode_as_proto(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &encoded).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        let decoded =
            decode_transaction_status_meta(157 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
        let decoded =
            decode_transaction_status_meta(100 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let frame = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, frame).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
515
/// Firehose transaction payload passed to [`Handler`] callbacks.
///
/// One value describes a single transaction together with its decoded status metadata.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
534
/// Block entry metadata passed to [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Range of transaction indexes covered by the entry (a half-open `Range`).
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
549
/// Reward data conveyed to reward [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients, each paired with its associated reward information.
    pub rewards: Vec<(Address, RewardInfo)>,
}
558
/// Block-level data streamed to block handlers.
///
/// Either a fully populated block or a marker for a slot that currently has no block. The
/// accessors on this type (`slot`, `was_skipped`, `block_time`) work on both variants.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
    /// arrive out of order).
    PossibleLeaderSkipped {
        /// Slot number that either lacked a block or may still arrive later.
        slot: u64,
    },
}
590
591impl BlockData {
592    /// Returns the slot associated with this block or skipped slot.
593    #[inline(always)]
594    pub const fn slot(&self) -> u64 {
595        match self {
596            BlockData::Block { slot, .. } => *slot,
597            BlockData::PossibleLeaderSkipped { slot } => *slot,
598        }
599    }
600
601    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
602    #[inline(always)]
603    pub const fn was_skipped(&self) -> bool {
604        matches!(self, BlockData::PossibleLeaderSkipped { .. })
605    }
606
607    /// Returns the optional block time when available.
608    #[inline(always)]
609    pub const fn block_time(&self) -> Option<i64> {
610        match self {
611            BlockData::Block { block_time, .. } => *block_time,
612            BlockData::PossibleLeaderSkipped { .. } => None,
613        }
614    }
615}
616
/// Outcome of a single handler invocation: `Ok(())` on success, otherwise a [`SharedError`].
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by every handler, resolving to a [`HandlerResult`].
type HandlerFuture = BoxFuture<'static, HandlerResult>;
619
/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// A handler is any `Fn(usize, Data) -> HandlerFuture` that is also `Send + Sync + Clone +
/// 'static`. The `usize` argument is the worker thread index (this is what
/// `maybe_emit_stats` passes when it calls the stats handler).
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket implementation: every closure or function meeting the bounds is a `Handler`, so
// callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
627
/// Function pointer alias for [`Handler`] callbacks taking the thread index and `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Convenience alias for firehose error handlers.
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Convenience alias for stats tracking handlers accepted by [`firehose`].
///
/// This expands to the same type as [`StatsTracker`].
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
644
/// Metadata describing a firehose worker failure.
///
/// This is the payload type of the error handler alias [`OnErrorFn`]. The error itself is
/// carried as a string rather than as a [`FirehoseError`] value.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Thread index that encountered the error.
    pub thread_id: usize,
    /// Slot the worker was processing when the error surfaced.
    pub slot: u64,
    /// Epoch derived from the failing slot.
    pub epoch: u64,
    /// Stringified error payload for display/logging.
    pub error_message: String,
}
657
658/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
659///
660/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
661/// runner restarts from the last processed slot to maintain coverage.
662#[inline]
663#[allow(clippy::too_many_arguments)]
664pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
665    threads: u64,
666    slot_range: Range<u64>,
667    on_block: Option<OnBlock>,
668    on_tx: Option<OnTransaction>,
669    on_entry: Option<OnEntry>,
670    on_rewards: Option<OnRewards>,
671    on_error: Option<OnError>,
672    stats_tracking: Option<StatsTracking<OnStats>>,
673    shutdown_signal: Option<broadcast::Receiver<()>>,
674) -> Result<(), (FirehoseError, u64)>
675where
676    OnBlock: Handler<BlockData>,
677    OnTransaction: Handler<TransactionData>,
678    OnEntry: Handler<EntryData>,
679    OnRewards: Handler<RewardsData>,
680    OnStats: Handler<Stats>,
681    OnError: Handler<FirehoseErrorContext>,
682{
683    if threads == 0 {
684        return Err((
685            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
686            slot_range.start,
687        ));
688    }
689    let client = Client::new();
690    log::info!(target: LOG_MODULE, "starting firehose...");
691    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
692
693    let slot_range = Arc::new(slot_range);
694
695    // divide slot_range into n subranges
696    let subranges = generate_subranges(&slot_range, threads);
697    if threads > 1 {
698        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
699    }
700
701    let firehose_start = std::time::Instant::now();
702    let shutdown_flag = Arc::new(AtomicBool::new(false));
703    if let Some(ref rx) = shutdown_signal {
704        let mut rx = rx.resubscribe();
705        let flag = shutdown_flag.clone();
706        tokio::spawn(async move {
707            if rx.recv().await.is_ok() {
708                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
709                flag.store(true, Ordering::SeqCst);
710            }
711        });
712    }
713    let mut handles = Vec::new();
714    // Shared per-thread error counters
715    let error_counts: Arc<Vec<AtomicU32>> =
716        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
717
718    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
719    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
720    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
721    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
722    let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
723
724    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
725        let error_counts = error_counts.clone();
726        let client = client.clone();
727        let on_block = on_block.clone();
728        let on_tx = on_tx.clone();
729        let on_entry = on_entry.clone();
730        let on_reward = on_rewards.clone();
731        let on_error = on_error.clone();
732        let overall_slots_processed = overall_slots_processed.clone();
733        let overall_blocks_processed = overall_blocks_processed.clone();
734        let overall_transactions_processed = overall_transactions_processed.clone();
735        let overall_entries_processed = overall_entries_processed.clone();
736        let stats_tracking = stats_tracking.clone();
737        let transactions_since_stats = Arc::new(AtomicU64::new(0));
738        let blocks_since_stats = Arc::new(AtomicU64::new(0));
739        let slots_since_stats = Arc::new(AtomicU64::new(0));
740        let last_pulse = Arc::new(AtomicU64::new(0));
741        let transactions_since_stats_cloned = transactions_since_stats.clone();
742        let blocks_since_stats_cloned = blocks_since_stats.clone();
743        let slots_since_stats_cloned = slots_since_stats.clone();
744        let last_pulse_cloned = last_pulse.clone();
745        let shutdown_flag = shutdown_flag.clone();
746        let pending_skipped_slots = pending_skipped_slots.clone();
747        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
748
749        let handle = tokio::spawn(async move {
750            let transactions_since_stats = transactions_since_stats_cloned;
751            let blocks_since_stats = blocks_since_stats_cloned;
752            let slots_since_stats = slots_since_stats_cloned;
753            let last_pulse = last_pulse_cloned;
754            let mut shutdown_rx = thread_shutdown_rx;
755            let start_time = firehose_start;
756            last_pulse.store(
757                firehose_start.elapsed().as_nanos() as u64,
758                Ordering::Relaxed,
759            );
760            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
761            let mut skip_until_index = None;
762            let last_emitted_slot = slot_range.start.saturating_sub(1);
763            let block_enabled = on_block.is_some();
764            let tx_enabled = on_tx.is_some();
765            let entry_enabled = on_entry.is_some();
766            let reward_enabled = on_reward.is_some();
767            let tracking_enabled = stats_tracking.is_some();
768            if block_enabled {
769                pending_skipped_slots.entry(thread_index).or_default();
770            }
771            let mut last_counted_slot = slot_range.start.saturating_sub(1);
772            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
773            let mut thread_stats = if tracking_enabled {
774                Some(ThreadStats {
775                    thread_id: thread_index,
776                    start_time,
777                    finish_time: None,
778                    slot_range: slot_range.clone(),
779                    initial_slot_range: slot_range.clone(),
780                    current_slot: slot_range.start,
781                    slots_processed: 0,
782                    blocks_processed: 0,
783                    leader_skipped_slots: 0,
784                    transactions_processed: 0,
785                    entries_processed: 0,
786                    rewards_processed: 0,
787                })
788            } else {
789                None
790            };
791
792            // let mut triggered = false;
793            while let Err((err, slot)) = async {
794                let mut last_emitted_slot = last_emitted_slot_global;
795                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
796                    log::info!(
797                        target: &log_target,
798                        "shutdown requested; terminating firehose thread {}",
799                        thread_index
800                    );
801                    return Ok(());
802                }
803                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
804                log::info!(
805                    target: &log_target,
806                    "slot range: {} (epoch {}) ... {} (epoch {})",
807                    slot_range.start,
808                    slot_to_epoch(slot_range.start),
809                    slot_range.end,
810                    slot_to_epoch(slot_range.end)
811                );
812
813                log::info!(target: &log_target, "🚒 starting firehose...");
814
815                // for each epoch
816                let mut current_slot: Option<u64> = None;
817                for epoch_num in epoch_range.clone() {
818                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
819                        log::info!(
820                            target: &log_target,
821                            "shutdown requested; terminating firehose thread {}",
822                            thread_index
823                        );
824                        return Ok(());
825                    }
826                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
827                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
828                        Ok(stream) => stream,
829                        Err(_) => {
830                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
831                        }
832                    };
833                    let mut reader = NodeReader::new(stream);
834
835                    let header_fut = reader.read_raw_header();
836                    let header = match timeout(OP_TIMEOUT, header_fut).await {
837                        Ok(res) => res
838                            .map_err(FirehoseError::ReadHeader)
839                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
840                        Err(_) => {
841                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
842                        }
843                    };
844                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
845
846                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
847                    let local_start = std::cmp::max(slot_range.start, epoch_start);
848                    let local_end_inclusive =
849                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
850                    if local_start > local_end_inclusive {
851                        log::debug!(
852                            target: &log_target,
853                            "epoch {} has no overlap with thread range ({}..{}), skipping",
854                            epoch_num,
855                            slot_range.start,
856                            slot_range.end
857                        );
858                        continue;
859                    }
860
861                    let mut previous_blockhash = Hash::default();
862                    let mut latest_entry_blockhash = Hash::default();
863                    // Reset counters to align to the local epoch slice; prevents boundary slots
864                    // from being treated as already-counted after a restart.
865                    last_counted_slot = local_start.saturating_sub(1);
866                    current_slot = None;
867                    if tracking_enabled
868                        && let Some(ref mut stats) = thread_stats {
869                            stats.current_slot = local_start;
870                            stats.slot_range.start = local_start;
871                        }
872
873                    if local_start > epoch_start {
874                        // Seek to the previous slot so the stream includes all nodes
875                        // (transactions, entries, rewards) that precede the block payload for
876                        // `local_start`.
877                        let seek_slot = local_start.saturating_sub(1);
878                        let seek_fut = reader.seek_to_slot(seek_slot);
879                        match timeout(OP_TIMEOUT, seek_fut).await {
880                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
881                            Err(_) => {
882                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
883                            }
884                        }
885                    }
886
887                    // for each item in each block
888                    let mut item_index = 0;
889                    let mut displayed_skip_message = false;
890                    loop {
891                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
892                            log::info!(
893                                target: &log_target,
894                                "shutdown requested; terminating firehose thread {}",
895                                thread_index
896                            );
897                            return Ok(());
898                        }
899                        let read_fut = reader.read_until_block();
900                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
901                            Ok(result) => result
902                                .map_err(FirehoseError::ReadUntilBlockError)
903                                .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
904                            Err(_) => {
905                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
906                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
907                            }
908                        };
909                        if nodes.is_empty() {
910                            log::info!(
911                                target: &log_target,
912                                "reached end of epoch {}",
913                                epoch_num
914                            );
915                            break;
916                        }
917                        if let Some(last_node) = nodes.0.last()
918                            && !last_node.get_node().is_block()
919                        {
920                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
921                            break;
922                        }
923                        let block = nodes
924                            .get_block()
925                            .map_err(FirehoseError::GetBlockError)
926                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
927                        log::debug!(
928                            target: &log_target,
929                            "read {} items from epoch {}, now at slot {}",
930                            item_index,
931                            epoch_num,
932                            block.slot
933                        );
934                        let slot = block.slot;
935                        if slot > local_end_inclusive {
936                            log::debug!(
937                                target: &log_target,
938                                "reached end of local slice at slot {} (epoch {}), stopping",
939                                slot,
940                                epoch_num
941                            );
942                            break;
943                        }
944                        if slot >= slot_range.end {
945                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
946                            // Return early to terminate the firehose thread cleanly. We use >=
947                            // because slot_range is half-open [start, end), so any slot equal
948                            // to end is out-of-range and must not be processed. Do not emit
949                            // synthetic skipped slots here; another thread may own the boundary.
950                            if block_enabled {
951                                pending_skipped_slots.remove(&thread_index);
952                            }
953                            return Ok(());
954                        }
955                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
956                        if slot < slot_range.start {
957                            if slot.saturating_add(1) == slot_range.start {
958                                log::debug!(
959                                    target: &log_target,
960                                    "priming reader with preceding slot {}, skipping",
961                                    slot
962                                );
963                            } else {
964                                log::warn!(
965                                    target: &log_target,
966                                    "encountered slot {} before start of range {}, skipping",
967                                    slot,
968                                    slot_range.start
969                                );
970                            }
971                            continue;
972                        }
973                        current_slot = Some(slot);
974                        let mut entry_index: usize = 0;
975                        let mut this_block_executed_transaction_count: u64 = 0;
976                        let mut this_block_entry_count: u64 = 0;
977                        let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
978
979                        for node_with_cid in &nodes.0 {
980                            item_index += 1;
981                            if let Some(skip) = skip_until_index {
982                                if item_index < skip {
983                                    if !displayed_skip_message {
984                                        log::info!(
985                                            target: &log_target,
986                                            "skipping until index {} (at {})",
987                                            skip,
988                                            item_index
989                                        );
990                                        displayed_skip_message = true;
991                                    }
992                                    continue;
993                                } else {
994                                    log::info!(
995                                        target: &log_target,
996                                        "reached target index {}, resuming...",
997                                        skip
998                                    );
999                                    skip_until_index = None;
1000                                }
1001                            }
1002                            let node = node_with_cid.get_node();
1003
1004                            if let Some(ref mut stats) = thread_stats {
1005                                stats.current_slot = slot;
1006                            }
1007
1008                            let error_slot = current_slot.unwrap_or(slot_range.start);
1009
1010                            use crate::node::Node::*;
1011                            match node {
1012                                Transaction(tx) => {
1013                                    if tx_enabled
1014                                        && let Some(on_tx_cb) = on_tx.as_ref()
1015                                    {
1016                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1017                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1018                                            (
1019                                                FirehoseError::NodeDecodingError(item_index, err),
1020                                                error_slot,
1021                                            )
1022                                        })?;
1023                                        let reassembled_metadata = nodes
1024                                            .reassemble_dataframes(tx.metadata.clone())
1025                                            .map_err(|err| {
1026                                                (
1027                                                    FirehoseError::NodeDecodingError(item_index, err),
1028                                                    error_slot,
1029                                                )
1030                                            })?;
1031
1032                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1033                                            block.slot,
1034                                            reassembled_metadata,
1035                                        )
1036                                        .map_err(|err| {
1037                                            (
1038                                                FirehoseError::NodeDecodingError(item_index, err),
1039                                                error_slot,
1040                                            )
1041                                        })?;
1042
1043                                        let message_hash = {
1044                                            #[cfg(feature = "verify-transaction-signatures")]
1045                                            {
1046                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1047                                                    (
1048                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1049                                                        error_slot,
1050                                                    )
1051                                                })?
1052                                            }
1053                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1054                                            {
1055                                                versioned_tx.message.hash()
1056                                            }
1057                                        };
1058                                        let signature = versioned_tx
1059                                            .signatures
1060                                            .first()
1061                                            .ok_or_else(|| {
1062                                                Box::new(std::io::Error::new(
1063                                                    std::io::ErrorKind::InvalidData,
1064                                                    "transaction missing signature",
1065                                                )) as SharedError
1066                                            })
1067                                            .map_err(|err| {
1068                                                (
1069                                                    FirehoseError::NodeDecodingError(
1070                                                        item_index,
1071                                                        err,
1072                                                    ),
1073                                                    error_slot,
1074                                                )
1075                                            })?;
1076                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1077
1078                                        on_tx_cb(
1079                                            thread_index,
1080                                            TransactionData {
1081                                                slot: block.slot,
1082                                                transaction_slot_index: tx.index.unwrap() as usize,
1083                                                signature: *signature,
1084                                                message_hash,
1085                                                is_vote,
1086                                                transaction_status_meta: as_native_metadata.clone(),
1087                                                transaction: versioned_tx.clone(),
1088                                            },
1089                                        )
1090                                        .await
1091                                        .map_err(|e| {
1092                                            (
1093                                                FirehoseError::TransactionHandlerError(e),
1094                                                error_slot,
1095                                            )
1096                                        })?;
1097                                    }
1098                                    fetch_add_if(
1099                                        tracking_enabled,
1100                                        &overall_transactions_processed,
1101                                        1,
1102                                    );
1103                                    if let Some(ref mut stats) = thread_stats {
1104                                        stats.transactions_processed += 1;
1105                                    }
1106                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1107                                }
1108                                Entry(entry) => {
1109                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1110                                    let entry_transaction_count = entry.transactions.len();
1111                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1112                                    let starting_transaction_index_u64 =
1113                                        this_block_executed_transaction_count;
1114                                    latest_entry_blockhash = entry_hash;
1115                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1116                                    this_block_entry_count += 1;
1117
1118                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1119                                        let starting_transaction_index = usize::try_from(
1120                                            starting_transaction_index_u64,
1121                                        )
1122                                        .map_err(|err| {
1123                                            (
1124                                                FirehoseError::EntryHandlerError(Box::new(err)),
1125                                                error_slot,
1126                                            )
1127                                        })?;
1128                                        let transaction_indexes_end =
1129                                            starting_transaction_index + entry_transaction_count;
1130                                        on_entry_cb(
1131                                            thread_index,
1132                                            EntryData {
1133                                                slot: block.slot,
1134                                                entry_index,
1135                                                transaction_indexes: starting_transaction_index
1136                                                    ..transaction_indexes_end,
1137                                                num_hashes: entry.num_hashes,
1138                                                hash: entry_hash,
1139                                            },
1140                                        )
1141                                        .await
1142                                        .map_err(|e| {
1143                                            (
1144                                                FirehoseError::EntryHandlerError(e),
1145                                                error_slot,
1146                                            )
1147                                        })?;
1148                                    }
1149                                    entry_index += 1;
1150                                    fetch_add_if(
1151                                        tracking_enabled,
1152                                        &overall_entries_processed,
1153                                        1,
1154                                    );
1155                                    if let Some(ref mut stats) = thread_stats {
1156                                        stats.entries_processed += 1;
1157                                    }
1158                                }
1159                                Block(block) => {
1160                                    let prev_last_counted_slot = last_counted_slot;
1161                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1162                                        (
1163                                            stats.slots_processed,
1164                                            stats.blocks_processed,
1165                                            stats.leader_skipped_slots,
1166                                            stats.current_slot,
1167                                        )
1168                                    });
1169
1170                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1171                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1172                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1173
1174                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1175                                    for skipped_slot in skip_start..slot {
1176                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1177                                            break;
1178                                        }
1179                                        log::debug!(
1180                                            target: &log_target,
1181                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1182                                            skipped_slot,
1183                                            prev_last_counted_slot,
1184                                            slot,
1185                                        );
1186                                        if block_enabled {
1187                                            pending_skipped_slots
1188                                                .entry(thread_index)
1189                                                .or_default()
1190                                                .insert(skipped_slot);
1191                                        }
1192                                        if block_enabled
1193                                            && let Some(on_block_cb) = on_block.as_ref()
1194                                            && skipped_slot > last_emitted_slot {
1195                                                last_emitted_slot = skipped_slot;
1196                                                on_block_cb(
1197                                                    thread_index,
1198                                                    BlockData::PossibleLeaderSkipped {
1199                                                        slot: skipped_slot,
1200                                                    },
1201                                                )
1202                                                .await
1203                                                .map_err(|e| {
1204                                                    (
1205                                                        FirehoseError::BlockHandlerError(e),
1206                                                        error_slot,
1207                                                    )
1208                                                })?;
1209                                            }
1210                                        if tracking_enabled {
1211                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1212                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1213                                            if let Some(ref mut stats) = thread_stats {
1214                                                stats.leader_skipped_slots += 1;
1215                                                stats.slots_processed += 1;
1216                                                stats.current_slot = skipped_slot;
1217                                            }
1218                                        }
1219                                        last_counted_slot = skipped_slot;
1220                                    }
1221
1222                                    let cleared_pending_skip = if block_enabled {
1223                                        clear_pending_skip(
1224                                            &pending_skipped_slots,
1225                                            thread_index,
1226                                            slot,
1227                                        )
1228                                    } else {
1229                                        false
1230                                    };
1231
1232                                    if slot <= last_counted_slot && !cleared_pending_skip {
1233                                        log::debug!(
1234                                            target: &log_target,
1235                                            "duplicate block {}, already counted (last_counted={})",
1236                                            slot,
1237                                            last_counted_slot,
1238                                        );
1239                                        this_block_rewards.clear();
1240                                        continue;
1241                                    }
1242
1243                                    if block_enabled {
1244                                        if let Some(on_block_cb) = on_block.as_ref() {
1245                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1246                                            if slot > last_emitted_slot {
1247                                                last_emitted_slot = slot;
1248                                                on_block_cb(
1249                                                    thread_index,
1250                                                    BlockData::Block {
1251                                                        parent_slot: block.meta.parent_slot,
1252                                                        parent_blockhash: previous_blockhash,
1253                                                        slot: block.slot,
1254                                                        blockhash: latest_entry_blockhash,
1255                                                        rewards: KeyedRewardsAndNumPartitions {
1256                                                            keyed_rewards,
1257                                                            num_partitions: None,
1258                                                        },
1259                                                        block_time: Some(block.meta.blocktime as i64),
1260                                                        block_height: block.meta.block_height,
1261                                                        executed_transaction_count:
1262                                                            this_block_executed_transaction_count,
1263                                                        entry_count: this_block_entry_count,
1264                                                    },
1265                                                )
1266                                                .await
1267                                                .map_err(|e| {
1268                                                    (
1269                                                        FirehoseError::BlockHandlerError(e),
1270                                                        error_slot,
1271                                                    )
1272                                                })?;
1273                                            }
1274                                        }
1275                                    } else {
1276                                        this_block_rewards.clear();
1277                                    }
1278                                    previous_blockhash = latest_entry_blockhash;
1279
1280                                    if tracking_enabled {
1281                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1282                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1283                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1284                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1285                                        if let Some(ref mut stats) = thread_stats {
1286                                            stats.blocks_processed += 1;
1287                                            stats.slots_processed += 1;
1288                                            stats.current_slot = slot;
1289                                        }
1290
1291                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1292                                            (&stats_tracking, thread_stats.as_mut())
1293                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1294                                                && let Err(err) = maybe_emit_stats(
1295                                                    stats_tracking.as_ref(),
1296                                                    thread_index,
1297                                                    thread_stats_ref,
1298                                                    &overall_slots_processed,
1299                                                    &overall_blocks_processed,
1300                                                    &overall_transactions_processed,
1301                                                    &overall_entries_processed,
1302                                                &transactions_since_stats,
1303                                                &blocks_since_stats,
1304                                                &slots_since_stats,
1305                                                &last_pulse,
1306                                                start_time,
1307                                            )
1308                                            .await
1309                                            {
1310                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1311                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1312                                                    overall_blocks_processed
1313                                                        .fetch_sub(1, Ordering::Relaxed);
1314                                                    overall_slots_processed
1315                                                        .fetch_sub(1, Ordering::Relaxed);
1316                                                    if let Some((
1317                                                        prev_slots_processed,
1318                                                        prev_blocks_processed,
1319                                                        prev_leader_skipped,
1320                                                        prev_current_slot,
1321                                                    )) = thread_stats_snapshot
1322                                                    {
1323                                                        thread_stats_ref.slots_processed =
1324                                                            prev_slots_processed;
1325                                                        thread_stats_ref.blocks_processed =
1326                                                            prev_blocks_processed;
1327                                                        thread_stats_ref.leader_skipped_slots =
1328                                                            prev_leader_skipped;
1329                                                        thread_stats_ref.current_slot =
1330                                                            prev_current_slot;
1331                                                    }
1332                                                    last_counted_slot = prev_last_counted_slot;
1333                                                    return Err(err);
1334                                                }
1335                                    }
1336
1337                                    if slot > last_counted_slot {
1338                                        last_counted_slot = slot;
1339                                    }
1340                                }
1341                                Subset(_subset) => (),
1342                                Epoch(_epoch) => (),
1343                                Rewards(rewards) => {
1344                                    if reward_enabled || block_enabled {
1345                                        let reassembled = nodes
1346                                            .reassemble_dataframes(rewards.data.clone())
1347                                            .map_err(|err| {
1348                                                (
1349                                                    FirehoseError::NodeDecodingError(item_index, err),
1350                                                    current_slot.unwrap_or(slot_range.start),
1351                                                )
1352                                            })?;
1353                                        if reassembled.is_empty() {
1354                                            this_block_rewards.clear();
1355                                            if reward_enabled
1356                                                && let Some(on_reward_cb) = on_reward.as_ref()
1357                                            {
1358                                                on_reward_cb(
1359                                                    thread_index,
1360                                                    RewardsData {
1361                                                        slot: block.slot,
1362                                                        rewards: Vec::new(),
1363                                                    },
1364                                                )
1365                                                .await
1366                                                .map_err(|e| {
1367                                                    (
1368                                                        FirehoseError::RewardHandlerError(e),
1369                                                        error_slot,
1370                                                    )
1371                                                })?;
1372                                            }
1373                                            continue;
1374                                        }
1375
1376                                        let decompressed = utils::decompress_zstd(reassembled)
1377                                            .map_err(|err| {
1378                                                (
1379                                                    FirehoseError::NodeDecodingError(
1380                                                        item_index,
1381                                                        err,
1382                                                    ),
1383                                                    error_slot,
1384                                                )
1385                                            })?;
1386
1387                                        let decoded =
1388                                            prost_011::Message::decode(decompressed.as_slice())
1389                                                .map_err(|err| {
1390                                                    (
1391                                                        FirehoseError::NodeDecodingError(
1392                                                            item_index,
1393                                                            Box::new(err),
1394                                                        ),
1395                                                        error_slot,
1396                                                    )
1397                                                })?;
1398                                        let keyed_rewards = convert_proto_rewards(&decoded)
1399                                            .map_err(|err| {
1400                                                (
1401                                                    FirehoseError::NodeDecodingError(item_index, err),
1402                                                    error_slot,
1403                                                )
1404                                            })?;
1405                                        if reward_enabled
1406                                            && let Some(on_reward_cb) = on_reward.as_ref()
1407                                        {
1408                                            on_reward_cb(
1409                                                thread_index,
1410                                                RewardsData {
1411                                                    slot: block.slot,
1412                                                    rewards: keyed_rewards.clone(),
1413                                                },
1414                                            )
1415                                            .await
1416                                            .map_err(|e| {
1417                                                (
1418                                                    FirehoseError::RewardHandlerError(e),
1419                                                    error_slot,
1420                                                )
1421                                            })?;
1422                                        }
1423                                        this_block_rewards = keyed_rewards;
1424                                        if let Some(ref mut stats) = thread_stats {
1425                                            stats.rewards_processed +=
1426                                                this_block_rewards.len() as u64;
1427                                        }
1428                                    }
1429                                }
1430                                DataFrame(_data_frame) => (),
1431                            }
1432                        }
1433                        if block.slot == slot_range.end - 1 {
1434                            let finish_time = std::time::Instant::now();
1435                            let elapsed = finish_time.duration_since(start_time);
1436                            log::info!(target: &log_target, "processed slot {}", block.slot);
1437                            let elapsed_pretty = human_readable_duration(elapsed);
1438                            log::info!(
1439                                target: &log_target,
1440                                "processed {} slots across {} epochs in {}.",
1441                                slot_range.end - slot_range.start,
1442                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1443                                elapsed_pretty
1444                            );
1445                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1446                            // On completion, report threads with non-zero error counts for
1447                            // visibility.
1448                            let summary: String = error_counts
1449                                .iter()
1450                                .enumerate()
1451                                .filter_map(|(i, c)| {
1452                                    let v = c.load(Ordering::Relaxed);
1453                                    if v > 0 {
1454                                        Some(format!("{:03}({})", i, v))
1455                                    } else {
1456                                        None
1457                                    }
1458                                })
1459                                .collect::<Vec<_>>()
1460                                .join(", ");
1461                            if !summary.is_empty() {
1462                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1463                            }
1464                            return Ok(());
1465                        }
1466                    }
1467                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1468                        && last_counted_slot < expected_last_slot
1469                    {
1470                        // Do not synthesize skipped slots during final flush; another thread may
1471                        // cover the remaining range (especially across epoch boundaries).
1472                    }
1473                    if let Some(ref mut stats) = thread_stats {
1474                        stats.finish_time = Some(std::time::Instant::now());
1475                        maybe_emit_stats(
1476                            stats_tracking.as_ref(),
1477                            thread_index,
1478                            stats,
1479                            &overall_slots_processed,
1480                            &overall_blocks_processed,
1481                            &overall_transactions_processed,
1482                            &overall_entries_processed,
1483                            &transactions_since_stats,
1484                            &blocks_since_stats,
1485                            &slots_since_stats,
1486                            &last_pulse,
1487                            start_time,
1488                        )
1489                        .await?;
1490                    }
1491                    if block_enabled {
1492                        pending_skipped_slots.remove(&thread_index);
1493                    }
1494                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1495                    }
1496                    Ok(())
1497            }
1498            .await
1499            {
1500                if is_shutdown_error(&err) {
1501                    log::info!(
1502                        target: &log_target,
1503                        "shutdown requested; terminating firehose thread {}",
1504                        thread_index
1505                    );
1506                    break;
1507                }
1508                let epoch = slot_to_epoch(slot);
1509                let item_index = match &err {
1510                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1511                    _ => 0,
1512                };
1513                let error_message = err.to_string();
1514                log::error!(
1515                    target: &log_target,
1516                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1517                    slot,
1518                    epoch
1519                );
1520                log::error!(target: &log_target, "{}", error_message);
1521                if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
1522                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index.
1523                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1524                }
1525                if let Some(on_error_cb) = on_error.clone() {
1526                    let context = FirehoseErrorContext {
1527                        thread_id: thread_index,
1528                        slot,
1529                        epoch,
1530                        error_message: error_message.clone(),
1531                    };
1532                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1533                        log::error!(
1534                            target: &log_target,
1535                            "on_error handler failed: {}",
1536                            handler_err
1537                        );
1538                    }
1539                }
1540                // Increment this thread's error counter
1541                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1542                log::warn!(
1543                    target: &log_target,
1544                    "restarting from slot {} at index {}",
1545                    slot,
1546                    item_index,
1547                );
1548                // Update slot range to resume from the failed slot, not the original start.
1549                // Reset local tracking so we don't treat the resumed slot range as already counted.
1550                // If we've already counted this slot, resume from the next one to avoid duplicates.
1551                if slot <= last_counted_slot {
1552                    slot_range.start = last_counted_slot.saturating_add(1);
1553                } else {
1554                    slot_range.start = slot;
1555                }
1556                // Reset pulse timer to exclude downtime from next rate calc.
1557                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1558                if tracking_enabled
1559                    && let Some(ref mut stats_ref) = thread_stats {
1560                        stats_ref.slot_range.start = slot_range.start;
1561                        stats_ref.slot_range.end = slot_range.end;
1562                        // initial_slot_range remains unchanged for progress reporting.
1563                    }
1564                if block_enabled {
1565                    pending_skipped_slots.remove(&thread_index);
1566                }
1567                skip_until_index = Some(item_index);
1568                last_emitted_slot_global = last_emitted_slot;
1569            }
1570        });
1571        handles.push(handle);
1572    }
1573
1574    // Wait for all threads to complete
1575    for handle in handles {
1576        handle.await.unwrap();
1577    }
1578    if stats_tracking.is_some() {
1579        let elapsed = firehose_start.elapsed();
1580        let elapsed_secs = elapsed.as_secs_f64();
1581        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1582        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1583        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1584        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1585        let total_errors: u64 = error_counts
1586            .iter()
1587            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1588            .sum();
1589        let overall_tps = if elapsed_secs > 0.0 {
1590            total_transactions as f64 / elapsed_secs
1591        } else {
1592            0.0
1593        };
1594        log::info!(
1595            target: LOG_MODULE,
1596            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1597            elapsed_secs,
1598            total_slots,
1599            total_blocks,
1600            total_leader_skipped,
1601            total_transactions,
1602            overall_tps,
1603            total_errors
1604        );
1605    }
1606    if shutdown_flag.load(Ordering::SeqCst) {
1607        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1608    } else {
1609        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1610    }
1611    Ok(())
1612}
1613
1614#[allow(clippy::result_large_err)]
1615/// Builds a Geyser-backed firehose and returns a slot notification stream.
1616///
1617/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1618/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1619/// half-open interval `[start, end)`, and the thread will restart from the last processed
1620/// slot on recoverable errors to maintain coverage.
1621pub fn firehose_geyser(
1622    rt: Arc<tokio::runtime::Runtime>,
1623    slot_range: Range<u64>,
1624    geyser_config_files: Option<&[PathBuf]>,
1625    index_base_url: &Url,
1626    client: &Client,
1627    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1628    threads: u64,
1629) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1630    if threads == 0 {
1631        return Err((
1632            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1633            slot_range.start,
1634        ));
1635    }
1636    log::info!(target: LOG_MODULE, "starting firehose...");
1637    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1638    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1639    let mut entry_notifier_maybe = None;
1640    let mut block_meta_notifier_maybe = None;
1641    let mut transaction_notifier_maybe = None;
1642    if let Some(geyser_config_files) = geyser_config_files {
1643        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1644
1645        let service =
1646            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1647                confirmed_bank_receiver.clone(),
1648                true,
1649                geyser_config_files,
1650            )
1651            .map_err(|e| (e.into(), slot_range.start))?;
1652
1653        transaction_notifier_maybe = Some(
1654            service
1655                .get_transaction_notifier()
1656                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1657                .map_err(|e| (e, slot_range.start))?,
1658        );
1659
1660        entry_notifier_maybe = service.get_entry_notifier();
1661        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1662
1663        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1664    }
1665
1666    if entry_notifier_maybe.is_some() {
1667        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1668    } else {
1669        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1670    }
1671    log::info!(target: LOG_MODULE, "running on_load...");
1672    rt.spawn(on_load);
1673
1674    let slot_range = Arc::new(slot_range);
1675    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1676    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1677    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1678    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1679
1680    // divide slot_range into n subranges
1681    let subranges = generate_subranges(&slot_range, threads);
1682    if threads > 1 {
1683        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1684    }
1685
1686    let mut handles = Vec::new();
1687    // Shared per-thread error counters
1688    let error_counts: Arc<Vec<AtomicU32>> =
1689        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1690
1691    for (i, slot_range) in subranges.into_iter().enumerate() {
1692        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1693        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1694        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1695        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1696        let client = client.clone();
1697        let error_counts = error_counts.clone();
1698
1699        let rt_clone = rt.clone();
1700
1701        let handle = std::thread::spawn(move || {
1702            rt_clone.block_on(async {
1703                firehose_geyser_thread(
1704                    slot_range,
1705                    transaction_notifier_maybe,
1706                    entry_notifier_maybe,
1707                    block_meta_notifier_maybe,
1708                    confirmed_bank_sender,
1709                    &client,
1710                    if threads > 1 { Some(i) } else { None },
1711                    error_counts,
1712                )
1713                .await
1714                .unwrap();
1715            });
1716        });
1717        handles.push(handle);
1718    }
1719
1720    // Wait for all threads to complete
1721    for handle in handles {
1722        handle.join().unwrap();
1723    }
1724    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1725    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1726        block_meta_notifier.notify_block_metadata(
1727            u64::MAX,
1728            "unload",
1729            u64::MAX,
1730            "unload",
1731            &KeyedRewardsAndNumPartitions {
1732                keyed_rewards: vec![],
1733                num_partitions: None,
1734            },
1735            None,
1736            None,
1737            0,
1738            0,
1739        );
1740    }
1741    Ok(confirmed_bank_receiver)
1742}
1743
1744#[allow(clippy::too_many_arguments)]
1745#[allow(clippy::result_large_err)]
1746async fn firehose_geyser_thread(
1747    mut slot_range: Range<u64>,
1748    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1749    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1750    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1751    confirmed_bank_sender: Sender<SlotNotification>,
1752    client: &Client,
1753    thread_index: Option<usize>,
1754    error_counts: Arc<Vec<AtomicU32>>,
1755) -> Result<(), (FirehoseError, u64)> {
1756    let start_time = std::time::Instant::now();
1757    let log_target = if let Some(thread_index) = thread_index {
1758        format!("{}::T{:03}", LOG_MODULE, thread_index)
1759    } else {
1760        LOG_MODULE.to_string()
1761    };
1762    let initial_slot_range = slot_range.clone();
1763    let mut skip_until_index = None;
1764    let mut last_counted_slot = slot_range.start.saturating_sub(1);
1765    // let mut triggered = false;
1766    while let Err((err, slot)) = async {
1767            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1768            log::info!(
1769                target: &log_target,
1770                "slot range: {} (epoch {}) ... {} (epoch {})",
1771                slot_range.start,
1772                slot_to_epoch(slot_range.start),
1773                slot_range.end,
1774                slot_to_epoch(slot_range.end)
1775            );
1776
1777            log::info!(target: &log_target, "🚒 starting firehose...");
1778
1779            // for each epoch
1780            let mut current_slot: Option<u64> = None;
1781            for epoch_num in epoch_range.clone() {
1782                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1783                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1784                    Ok(stream) => stream,
1785                    Err(_) => {
1786                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1787                    }
1788                };
1789                let mut reader = NodeReader::new(stream);
1790
1791                let header_fut = reader.read_raw_header();
1792                let header = match timeout(OP_TIMEOUT, header_fut).await {
1793                    Ok(res) => res
1794                        .map_err(FirehoseError::ReadHeader)
1795                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1796                    Err(_) => {
1797                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1798                    }
1799                };
1800                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1801
1802                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1803                let local_start = std::cmp::max(slot_range.start, epoch_start);
1804                let local_end_inclusive =
1805                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1806                if local_start > local_end_inclusive {
1807                    log::debug!(
1808                        target: &log_target,
1809                        "epoch {} has no overlap with thread range ({}..{}), skipping",
1810                        epoch_num,
1811                        slot_range.start,
1812                        slot_range.end
1813                    );
1814                    continue;
1815                }
1816
1817                let mut todo_previous_blockhash = Hash::default();
1818                let mut todo_latest_entry_blockhash = Hash::default();
1819                // Reset counters to align to the local epoch slice; prevents boundary slots
1820                // from being treated as already-counted after a restart.
1821                last_counted_slot = local_start.saturating_sub(1);
1822                current_slot = None;
1823
1824                if local_start > epoch_start {
1825                    // Seek to the slot immediately preceding the requested range so the reader
1826                    // captures the full node set (transactions, entries, rewards) for the
1827                    // target block on the next iteration.
1828                    let seek_slot = local_start.saturating_sub(1);
1829                    let seek_fut = reader.seek_to_slot(seek_slot);
1830                    match timeout(OP_TIMEOUT, seek_fut).await {
1831                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1832                        Err(_) => {
1833                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1834                        }
1835                    }
1836                }
1837
1838                // for each item in each block
1839                let mut item_index = 0;
1840                let mut displayed_skip_message = false;
1841                loop {
1842                    let read_fut = reader.read_until_block();
1843                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1844                        Ok(result) => result
1845                            .map_err(FirehoseError::ReadUntilBlockError)
1846                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1847                        Err(_) => {
1848                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1849                            let restart_slot =
1850                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
1851                            return Err((
1852                                FirehoseError::OperationTimeout("read_until_block"),
1853                                restart_slot,
1854                            ));
1855                        }
1856                    };
1857                    if nodes.is_empty() {
1858                        log::info!(
1859                            target: &log_target,
1860                            "reached end of epoch {}",
1861                            epoch_num
1862                        );
1863                        break;
1864                    }
1865                    // ignore epoch and subset nodes at end of car file loop { if
1866                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1867                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1868                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1869                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1870                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1871                    //         log::info!(target: &log_target, "reached end of epoch {}",
1872                    //             epoch_num); break; }
1873                    if let Some(last_node) = nodes.0.last()
1874                        && !last_node.get_node().is_block() {
1875                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1876                            break;
1877                        }
1878                    let block = nodes
1879                        .get_block()
1880                        .map_err(FirehoseError::GetBlockError)
1881                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1882                    log::debug!(
1883                        target: &log_target,
1884                        "read {} items from epoch {}, now at slot {}",
1885                        item_index,
1886                        epoch_num,
1887                        block.slot
1888                    );
1889                    let slot = block.slot;
1890                    if slot > local_end_inclusive {
1891                        log::debug!(
1892                            target: &log_target,
1893                            "reached end of local slice at slot {} (epoch {}), stopping",
1894                            slot,
1895                            epoch_num
1896                        );
1897                        break;
1898                    }
1899                    if slot >= slot_range.end {
1900                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1901                        // Return early to terminate the firehose thread cleanly. We use >=
1902                        // because slot_range is half-open [start, end), so any slot equal to
1903                        // end is out-of-range and must not be processed.
1904                        return Ok(());
1905                    }
1906                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1907                    if slot < local_start {
1908                        if slot.saturating_add(1) == local_start {
1909                            log::debug!(
1910                                target: &log_target,
1911                                "priming reader with preceding slot {}, skipping",
1912                                slot
1913                            );
1914                        } else {
1915                            log::warn!(
1916                                target: &log_target,
1917                                "encountered slot {} before start of range {}, skipping",
1918                                slot,
1919                                local_start
1920                            );
1921                        }
1922                        continue;
1923                    }
1924                    current_slot = Some(slot);
1925                    let mut entry_index: usize = 0;
1926                    let mut this_block_executed_transaction_count: u64 = 0;
1927                    let mut this_block_entry_count: u64 = 0;
1928                    let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1929
1930                    if slot <= last_counted_slot {
1931                        log::debug!(
1932                            target: &log_target,
1933                            "duplicate block {}, already counted (last_counted={})",
1934                            slot,
1935                            last_counted_slot,
1936                        );
1937                        this_block_rewards.clear();
1938                        continue;
1939                    }
1940
1941                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
1942                        item_index += 1;
1943                        // if item_index == 100000 && !triggered { log::info!("simulating
1944                        //     error"); triggered = true; return
1945                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1946                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1947                        //         "simulated error", )), ))); }
1948                        if let Some(skip) = skip_until_index {
1949                            if item_index < skip {
1950                                if !displayed_skip_message {
1951                                    log::info!(
1952                                        target: &log_target,
1953                                        "skipping until index {} (at {})",
1954                                        skip,
1955                                        item_index
1956                                    );
1957                                    displayed_skip_message = true;
1958                                }
1959                                return Ok(());
1960                            } else {
1961                                log::info!(
1962                                    target: &log_target,
1963                                    "reached target index {}, resuming...",
1964                                    skip
1965                                );
1966                                skip_until_index = None;
1967                            }
1968                        }
1969                        let node = node_with_cid.get_node();
1970
1971                        use crate::node::Node::*;
1972                        match node {
1973                            Transaction(tx) => {
1974                                let versioned_tx = tx.as_parsed()?;
1975                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1976
1977                                let as_native_metadata = decode_transaction_status_meta_from_frame(
1978                                    block.slot,
1979                                    reassembled_metadata,
1980                                )?;
1981
1982                                let message_hash = {
1983                                    #[cfg(feature = "verify-transaction-signatures")]
1984                                    {
1985                                        versioned_tx.verify_and_hash_message()?
1986                                    }
1987                                    #[cfg(not(feature = "verify-transaction-signatures"))]
1988                                    {
1989                                        // Signature verification is optional because it is
1990                                        // extremely expensive at replay scale.
1991                                        versioned_tx.message.hash()
1992                                    }
1993                                };
1994                                let signature = versioned_tx
1995                                    .signatures
1996                                    .first()
1997                                    .ok_or_else(|| {
1998                                        Box::new(std::io::Error::new(
1999                                            std::io::ErrorKind::InvalidData,
2000                                            "transaction missing signature",
2001                                        )) as SharedError
2002                                    })?;
2003                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2004
2005                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2006                                    transaction_notifier.notify_transaction(
2007                                        block.slot,
2008                                        tx.index.unwrap() as usize,
2009                                        signature,
2010                                        &message_hash,
2011                                        is_vote,
2012                                        &as_native_metadata,
2013                                        &versioned_tx,
2014                                    );
2015                                }
2016
2017                            }
2018                            Entry(entry) => {
2019                                let entry_hash = Hash::from(entry.hash.to_bytes());
2020                                let entry_transaction_count = entry.transactions.len();
2021                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2022                                let starting_transaction_index =
2023                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2024                                        Box::new(std::io::Error::other(
2025                                            "transaction index exceeds usize range",
2026                                        )) as SharedError
2027                                    })?;
2028                                todo_latest_entry_blockhash = entry_hash;
2029                                this_block_executed_transaction_count += entry_transaction_count_u64;
2030                                this_block_entry_count += 1;
2031                                if entry_notifier_maybe.is_none() {
2032                                    return Ok(());
2033                                }
2034                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2035                                let entry_summary = solana_entry::entry::EntrySummary {
2036                                    num_hashes: entry.num_hashes,
2037                                    hash: Hash::from(entry.hash.to_bytes()),
2038                                    num_transactions: entry_transaction_count_u64,
2039                                };
2040                                entry_notifier.notify_entry(
2041                                    block.slot,
2042                                    entry_index,
2043                                    &entry_summary,
2044                                    starting_transaction_index,
2045                                );
2046                                entry_index += 1;
2047                            }
2048                            Block(block) => {
2049                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2050                                confirmed_bank_sender.send(notification).unwrap();
2051
2052                                if block_meta_notifier_maybe.is_none() {
2053                                    last_counted_slot = block.slot;
2054                                    return Ok(());
2055                                }
2056                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
2057                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2058                                block_meta_notifier.notify_block_metadata(
2059                                    block.meta.parent_slot,
2060                                    todo_previous_blockhash.to_string().as_str(),
2061                                    block.slot,
2062                                    todo_latest_entry_blockhash.to_string().as_str(),
2063                                    &KeyedRewardsAndNumPartitions {
2064                                        keyed_rewards,
2065                                        num_partitions: None,
2066                                    },
2067                                    Some(block.meta.blocktime as i64),
2068                                    block.meta.block_height,
2069                                    this_block_executed_transaction_count,
2070                                    this_block_entry_count,
2071                                );
2072                                todo_previous_blockhash = todo_latest_entry_blockhash;
2073                                last_counted_slot = block.slot;
2074                                std::thread::yield_now();
2075                            }
2076                            Subset(_subset) => (),
2077                            Epoch(_epoch) => (),
2078                            Rewards(rewards) => {
2079                                if !rewards.is_complete() {
2080                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
2081                                    let decompressed = utils::decompress_zstd(reassembled)?;
2082                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
2083                                        Box::new(std::io::Error::other(
2084                                            std::format!("Error decoding rewards: {:?}", err),
2085                                        ))
2086                                    })?;
2087                                    this_block_rewards = convert_proto_rewards(&decoded)?;
2088                                }
2089                            }
2090                            DataFrame(_data_frame) => (),
2091                        }
2092                        Ok(())
2093                    })
2094                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2095                    if block.slot == slot_range.end - 1 {
2096                        let finish_time = std::time::Instant::now();
2097                        let elapsed = finish_time.duration_since(start_time);
2098                        log::info!(target: &log_target, "processed slot {}", block.slot);
2099                        let elapsed_pretty = human_readable_duration(elapsed);
2100                        log::info!(
2101                            target: &log_target,
2102                            "processed {} slots across {} epochs in {}.",
2103                            initial_slot_range.end - initial_slot_range.start,
2104                            slot_to_epoch(initial_slot_range.end)
2105                                + 1
2106                                - slot_to_epoch(initial_slot_range.start),
2107                            elapsed_pretty
2108                        );
2109                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2110                        // On completion, report threads with non-zero error counts for
2111                        // visibility.
2112                        let summary: String = error_counts
2113                            .iter()
2114                            .enumerate()
2115                            .filter_map(|(i, c)| {
2116                                let v = c.load(Ordering::Relaxed);
2117                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2118                            })
2119                            .collect::<Vec<_>>()
2120                            .join(", ");
2121                        if !summary.is_empty() {
2122                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2123                        }
2124                        return Ok(());
2125                    }
2126                }
2127            }
2128            Ok(())
2129}
2130.await
2131{
2132        if is_shutdown_error(&err) {
2133            log::info!(
2134                target: &log_target,
2135                "shutdown requested; terminating firehose thread {:?}",
2136                thread_index
2137            );
2138            return Ok(());
2139        }
2140        log::error!(
2141            target: &log_target,
2142            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2143            slot,
2144            slot_to_epoch(slot)
2145            );
2146            log::error!(target: &log_target, "{}", err);
2147            if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
2148                // Clear cached index data for this epoch to avoid retrying with a bad/partial index.
2149                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2150            }
2151            let item_index = match err {
2152                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2153                _ => 0,
2154            };
2155            // Increment this thread's error counter
2156            let idx = thread_index.unwrap_or(0);
2157            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2158            log::warn!(
2159                target: &log_target,
2160                "restarting from slot {} at index {}",
2161                slot,
2162                item_index,
2163            );
2164            // Update slot range to resume from the failed slot, not the original start.
2165            // If the failing slot was already fully processed, resume from the next slot.
2166            if slot <= last_counted_slot {
2167                slot_range.start = last_counted_slot.saturating_add(1);
2168            } else {
2169                slot_range.start = slot;
2170            }
2171            skip_until_index = Some(item_index);
2172}
2173    Ok(())
2174}
2175
2176#[inline]
2177fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2178    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2179        return false;
2180    }
2181
2182    if !matches!(
2183        versioned_tx.version(),
2184        solana_transaction::versioned::TransactionVersion::Legacy(_)
2185    ) {
2186        return false;
2187    }
2188
2189    let instructions = versioned_tx.message.instructions();
2190    if instructions.len() != 1 {
2191        return false;
2192    }
2193
2194    let program_index = instructions[0].program_id_index as usize;
2195    versioned_tx
2196        .message
2197        .static_account_keys()
2198        .get(program_index)
2199        .map(|program_id| program_id == &vote_program_id())
2200        .unwrap_or(false)
2201}
2202
2203#[inline(always)]
2204fn convert_proto_rewards(
2205    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2206) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2207    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2208    for proto_reward in proto_rewards.rewards.iter() {
2209        let reward = RewardInfo {
2210            reward_type: match proto_reward.reward_type - 1 {
2211                0 => RewardType::Fee,
2212                1 => RewardType::Rent,
2213                2 => RewardType::Staking,
2214                3 => RewardType::Voting,
2215                typ => {
2216                    return Err(Box::new(std::io::Error::other(format!(
2217                        "unsupported reward type {}",
2218                        typ
2219                    ))));
2220                }
2221            },
2222            lamports: proto_reward.lamports,
2223            post_balance: proto_reward.post_balance,
2224            commission: proto_reward.commission.parse::<u8>().ok(),
2225        };
2226        let pubkey = proto_reward
2227            .pubkey
2228            .parse::<Address>()
2229            .map_err(|err| Box::new(err) as SharedError)?;
2230        keyed_rewards.push((pubkey, reward));
2231    }
2232    Ok(keyed_rewards)
2233}
2234
2235#[inline]
2236/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2237pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2238    let total = slot_range.end - slot_range.start;
2239    let slots_per_thread = total / threads;
2240    let remainder = total % threads;
2241
2242    let ranges: Vec<Range<u64>> = (0..threads)
2243        .map(|i| {
2244            // Distribute remainder slots to the first `remainder` threads
2245            let extra_slot = if i < remainder { 1 } else { 0 };
2246            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2247            let end = start + slots_per_thread + extra_slot;
2248            start..end
2249        })
2250        .collect();
2251
2252    // Verify that ranges cover all slots exactly
2253    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2254    assert_eq!(
2255        total_covered, total,
2256        "Range generation failed: {} threads should cover {} slots but only cover {}",
2257        threads, total, total_covered
2258    );
2259
2260    // Verify no gaps between ranges
2261    for i in 1..ranges.len() {
2262        assert_eq!(
2263            ranges[i - 1].end,
2264            ranges[i].start,
2265            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2266            i - 1,
2267            ranges[i - 1].end,
2268            i,
2269            ranges[i].start
2270        );
2271    }
2272
2273    log::info!(
2274        target: LOG_MODULE,
2275        "Generated {} thread ranges covering {} slots total",
2276        threads,
2277        total_covered
2278    );
2279    ranges
2280}
2281
/// Formats `duration` as a short human-readable string.
///
/// - A zero duration renders as `"0s"`.
/// - Below one minute the value is shown in seconds: whole seconds with no millisecond
///   component print as an integer (`"5s"`), everything else with two decimals
///   (`"0.50s"`, `"5.25s"`).
/// - From one minute upward only the two most significant units are shown, and the
///   smaller of the two is omitted when it is zero (`"1m30s"`, `"1h"`, `"1d1h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        // Sub-second values and values with a millisecond part keep two decimals.
        let show_fraction = whole_secs == 0 || duration.subsec_millis() != 0;
        return if show_fraction {
            format!("{:.2}s", duration.as_secs_f64())
        } else {
            format!("{}s", whole_secs)
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let secs = whole_secs % MINUTE;

    // The leading non-zero unit selects the arm; the next unit is appended if non-zero.
    match (days, hours, minutes, secs) {
        (0, 0, 0, s) => format!("{s}s"),
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2327
/// Test-only stats callback: logs a one-line progress summary for `thread_id`.
///
/// Throughput is `transactions_processed` divided by the seconds elapsed since
/// `stats.start_time`, and is reported as `0.0` when no time has elapsed. The returned
/// future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let seconds = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so an immediate callback does not divide by zero.
        let throughput = (seconds > 0.0)
            .then(|| stats.transactions_processed as f64 / seconds)
            .unwrap_or(0.0);
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            seconds,
            throughput
        );
        Ok(())
    })
}
2353
2354#[cfg(test)]
2355use futures_util::FutureExt;
2356#[cfg(test)]
2357use serial_test::serial;
2358#[cfg(test)]
2359use std::sync::{Mutex, OnceLock};
2360
2361#[tokio::test(flavor = "multi_thread")]
2362async fn test_firehose_epoch_800() {
2363    use dashmap::DashSet;
2364    use std::sync::atomic::{AtomicU64, Ordering};
2365    solana_logger::setup_with_default("info");
2366    const THREADS: usize = 4;
2367    const NUM_SLOTS_TO_COVER: u64 = 50;
2368    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2369    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2370    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2371    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2372    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2373    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2374    let stats_tracking = StatsTracking {
2375        on_stats: log_stats_handler,
2376        tracking_interval_slots: 10,
2377    };
2378
2379    for prev in PREV_BLOCK.iter() {
2380        prev.store(0, Ordering::Relaxed);
2381    }
2382    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2383    NUM_BLOCKS.store(0, Ordering::Relaxed);
2384    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2385    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2386    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2387
2388    firehose(
2389        THREADS.try_into().unwrap(),
2390        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2391        Some(|thread_id: usize, block: BlockData| {
2392            async move {
2393                let _prev =
2394                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2395                if block.was_skipped() {
2396                    log::info!(
2397                        target: LOG_MODULE,
2398                        "leader skipped block {} on thread {}",
2399                        block.slot(),
2400                        thread_id,
2401                    );
2402                } else {
2403                    /*log::info!(
2404                        target: LOG_MODULE,
2405                        "got block {} on thread {}",
2406                        block.slot(),
2407                        thread_id,
2408                    );*/
2409                }
2410
2411                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2412                if block.was_skipped() {
2413                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2414                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2415                } else {
2416                    if first_time {
2417                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2418                        if let BlockData::Block {
2419                            executed_transaction_count,
2420                            ..
2421                        } = &block
2422                        {
2423                            let executed = *executed_transaction_count;
2424                            let _ = MIN_TRANSACTIONS.fetch_update(
2425                                Ordering::Relaxed,
2426                                Ordering::Relaxed,
2427                                |current| {
2428                                    if executed < current {
2429                                        Some(executed)
2430                                    } else {
2431                                        None
2432                                    }
2433                                },
2434                            );
2435                        }
2436                    }
2437                }
2438                Ok(())
2439            }
2440            .boxed()
2441        }),
2442        None::<OnTxFn>,
2443        None::<OnEntryFn>,
2444        None::<OnRewardFn>,
2445        None::<OnErrorFn>,
2446        Some(stats_tracking),
2447        None,
2448    )
2449    .await
2450    .unwrap();
2451    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2452    assert_eq!(
2453        seen, NUM_SLOTS_TO_COVER,
2454        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2455    );
2456    let mut skipped: Vec<u64> = SEEN_SKIPPED
2457        .get_or_init(DashSet::new)
2458        .iter()
2459        .map(|v| *v)
2460        .collect();
2461    skipped.sort_unstable();
2462    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2463    const EXPECTED_SKIPPED: [u64; 6] = [
2464        345_600_004,
2465        345_600_005,
2466        345_600_008,
2467        345_600_009,
2468        345_600_010,
2469        345_600_011,
2470    ];
2471    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2472    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2473}
2474
2475#[tokio::test(flavor = "multi_thread")]
2476async fn test_firehose_target_slot_transactions() {
2477    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2478    solana_logger::setup_with_default("info");
2479    const TARGET_SLOT: u64 = 376_273_722;
2480    const SLOT_RADIUS: u64 = 50;
2481    const EXPECTED_TRANSACTIONS: u64 = 1414;
2482    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2483    static FOUND: AtomicBool = AtomicBool::new(false);
2484    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2485    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2486
2487    FOUND.store(false, Ordering::Relaxed);
2488    OBSERVED_TXS.store(0, Ordering::Relaxed);
2489    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2490
2491    firehose(
2492        4,
2493        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2494        Some(|_thread_id: usize, block: BlockData| {
2495            async move {
2496                if block.slot() == TARGET_SLOT {
2497                    assert!(
2498                        !block.was_skipped(),
2499                        "target slot {TARGET_SLOT} was marked leader skipped",
2500                    );
2501                    if let BlockData::Block {
2502                        executed_transaction_count,
2503                        ..
2504                    } = block
2505                    {
2506                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2507                        FOUND.store(true, Ordering::Relaxed);
2508                        assert_eq!(
2509                            executed_transaction_count, EXPECTED_TRANSACTIONS,
2510                            "unexpected transaction count for slot {TARGET_SLOT}"
2511                        );
2512                        assert_eq!(
2513                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2514                            EXPECTED_NON_VOTE_TRANSACTIONS,
2515                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2516                        );
2517                    }
2518                }
2519                Ok(())
2520            }
2521            .boxed()
2522        }),
2523        Some(|_thread_id: usize, transaction: TransactionData| {
2524            async move {
2525                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2526                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2527                }
2528                Ok(())
2529            }
2530            .boxed()
2531        }),
2532        None::<OnEntryFn>,
2533        None::<OnRewardFn>,
2534        None::<OnErrorFn>,
2535        None::<OnStatsTrackingFn>,
2536        None,
2537    )
2538    .await
2539    .unwrap();
2540
2541    assert!(
2542        FOUND.load(Ordering::Relaxed),
2543        "target slot was not processed"
2544    );
2545    assert_eq!(
2546        OBSERVED_TXS.load(Ordering::Relaxed),
2547        EXPECTED_TRANSACTIONS,
2548        "recorded transaction count mismatch"
2549    );
2550}
2551
2552#[tokio::test(flavor = "multi_thread")]
2553async fn test_firehose_epoch_850_votes_present() {
2554    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2555    solana_logger::setup_with_default("info");
2556    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
2557    const SLOT_RADIUS: u64 = 10;
2558    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
2559    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
2560    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2561
2562    SEEN_BLOCK.store(false, Ordering::Relaxed);
2563    VOTE_TXS.store(0, Ordering::Relaxed);
2564    TOTAL_TXS.store(0, Ordering::Relaxed);
2565
2566    firehose(
2567        2,
2568        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2569        Some(|_thread_id: usize, block: BlockData| {
2570            async move {
2571                if block.slot() == TARGET_SLOT {
2572                    assert!(
2573                        !block.was_skipped(),
2574                        "target slot {TARGET_SLOT} was marked leader skipped",
2575                    );
2576                    SEEN_BLOCK.store(true, Ordering::Relaxed);
2577                }
2578                Ok(())
2579            }
2580            .boxed()
2581        }),
2582        Some(|_thread_id: usize, transaction: TransactionData| {
2583            async move {
2584                if transaction.slot == TARGET_SLOT {
2585                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2586                    if transaction.is_vote {
2587                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
2588                    }
2589                }
2590                Ok(())
2591            }
2592            .boxed()
2593        }),
2594        None::<OnEntryFn>,
2595        None::<OnRewardFn>,
2596        None::<OnErrorFn>,
2597        None::<OnStatsTrackingFn>,
2598        None,
2599    )
2600    .await
2601    .unwrap();
2602
2603    assert!(
2604        SEEN_BLOCK.load(Ordering::Relaxed),
2605        "target slot was not processed"
2606    );
2607    assert!(
2608        TOTAL_TXS.load(Ordering::Relaxed) > 0,
2609        "no transactions counted in target slot"
2610    );
2611    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
2612}
2613
/// Verifies that every slot in the requested range is still delivered to the block
/// handler when the handler fails once mid-range.
///
/// The block handler returns a synthetic error exactly once: on the first non-skipped
/// block that arrives after at least one non-skipped block has already been recorded.
/// All other deliveries are counted per slot. After `firehose` returns, the test checks
/// that the synthetic failure really fired and that each slot in
/// `START_SLOT..START_SLOT + NUM_SLOTS` was recorded at least once.
///
/// NOTE(review): presumably `firehose` recovers from the handler error by restarting the
/// affected range — confirm against the `firehose` implementation.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;
    solana_logger::setup_with_default("info");
    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // Per-slot delivery counts. Statics persist across invocations, so reset them first.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Force an error after at least one block has been seen so restart happens mid-range.
                // `swap` makes the failure one-shot: only the first caller observes `false`.
                if !block.was_skipped()
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }
                // The guard is never held across an `.await`, so a std mutex is fine here.
                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *coverage.entry(block.slot()).or_insert(0) += 1;
                if !block.was_skipped() {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Guard against a vacuous pass: if the synthetic failure never fired, the error path
    // this test exists to exercise was not reached and the coverage check proves nothing.
    assert!(
        FAIL_TRIGGERED.load(Ordering::SeqCst),
        "synthetic handler failure never fired; error path was not exercised"
    );

    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
2677
/// Checks block coverage for the 2001-slot window centred on `GAP_START`.
///
/// Every non-skipped block delivered by `firehose` has its slot recorded in a shared set.
/// Once streaming finishes, four slots listed as a known leader-skipped gap are added to
/// that set, and the test fails if any slot in `START_SLOT..=END_SLOT` is still absent.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;
    solana_logger::setup_with_default("info");
    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    // Slots of all non-skipped blocks seen so far; cleared because statics persist
    // between invocations.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
    let seen_slots = COVERAGE.get_or_init(|| Mutex::new(HashSet::new()));
    seen_slots.lock().unwrap().clear();

    let slot_window = START_SLOT..(END_SLOT + 1);

    firehose(
        THREADS.try_into().unwrap(),
        slot_window,
        Some(|_worker: usize, block: BlockData| {
            async move {
                // Skipped slots are deliberately not recorded.
                if !block.was_skipped() {
                    let slot = block.slot();
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(slot);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Work on a snapshot so the lock is released before the comparison below.
    let mut covered: HashSet<u64> = seen_slots.lock().unwrap().clone();

    // ignore a known 4-slot leader skipped gap
    covered.extend(378864396..=378864399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !covered.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        missing.iter().take(10).collect::<Vec<_>>()
    );
}