jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3#[cfg(test)]
4extern crate serial_test;
5#[cfg(test)]
6use futures_util::FutureExt;
7use futures_util::future::BoxFuture;
8use reqwest::{Client, Url};
9#[cfg(test)]
10use serial_test::serial;
11use solana_address::Address;
12use solana_geyser_plugin_manager::{
13    block_metadata_notifier_interface::BlockMetadataNotifier,
14    geyser_plugin_service::GeyserPluginServiceError,
15};
16use solana_hash::Hash;
17use solana_ledger::entry_notifier_interface::EntryNotifier;
18use solana_reward_info::RewardInfo;
19use solana_rpc::{
20    optimistically_confirmed_bank_tracker::SlotNotification,
21    transaction_notifier_interface::TransactionNotifier,
22};
23use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
24use solana_sdk_ids::vote::id as vote_program_id;
25use solana_transaction::versioned::VersionedTransaction;
26#[cfg(test)]
27use std::sync::{Mutex, OnceLock};
28use std::{
29    fmt::Display,
30    future::Future,
31    io,
32    ops::Range,
33    path::PathBuf,
34    sync::{
35        Arc,
36        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
37    },
38};
39
40use thiserror::Error;
41use tokio::{
42    sync::broadcast::{self, error::TryRecvError},
43    time::timeout,
44};
45
46use crate::{
47    LOG_MODULE, SharedError,
48    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
49    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
50    node_reader::NodeReader,
51    utils,
52};
53
// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
// header, seeking, reading next block). Adjust here to tune stall detection/restart
// aggressiveness.
//
// Each of those operations is wrapped in `tokio::time::timeout(OP_TIMEOUT, ...)`; when the
// limit elapses the worker returns `FirehoseError::OperationTimeout` carrying the name of the
// operation that stalled.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
58
59fn poll_shutdown(
60    flag: &Arc<std::sync::atomic::AtomicBool>,
61    receiver: &mut Option<broadcast::Receiver<()>>,
62) -> bool {
63    if let Some(rx) = receiver {
64        match rx.try_recv() {
65            Ok(_) | Err(TryRecvError::Lagged(_)) => {
66                flag.store(true, Ordering::SeqCst);
67            }
68            Err(TryRecvError::Closed) => {
69                flag.store(true, Ordering::SeqCst);
70            }
71            Err(TryRecvError::Empty) => {}
72        }
73    }
74    flag.load(Ordering::SeqCst)
75}
76
77fn is_shutdown_error(err: &FirehoseError) -> bool {
78    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
79        inner
80            .downcast_ref::<io::Error>()
81            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
82            .unwrap_or(false)
83    }
84
85    match err {
86        FirehoseError::BlockHandlerError(inner)
87        | FirehoseError::TransactionHandlerError(inner)
88        | FirehoseError::EntryHandlerError(inner)
89        | FirehoseError::RewardHandlerError(inner)
90        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
91        _ => false,
92    }
93}
94
/// Errors that can occur while streaming the firehose.
///
/// `Display` is implemented by hand further down in this file rather than through
/// `#[error(...)]` attributes, so a new variant needs a matching arm there.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(SharedError),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(SharedError),
    /// Failure while fetching an individual block.
    GetBlockError(SharedError),
    /// Failed to decode a node at the given index (first field), with the underlying cause
    /// (second field).
    NodeDecodingError(usize, SharedError),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(SharedError),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(SharedError),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(SharedError),
    /// Timeout reached while waiting for a firehose operation; the payload names the
    /// operation that timed out (for example `"read_until_block"`).
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// Entry handler returned an error.
    EntryHandlerError(SharedError),
    /// Reward handler returned an error.
    RewardHandlerError(SharedError),
    /// Block handler returned an error.
    BlockHandlerError(SharedError),
}
132
// SAFETY(review): these impls assert that every payload carried by `FirehoseError` may be
// moved to and shared between threads. That cannot be confirmed from this file alone: if all
// variant payloads are already `Send + Sync` the impls are redundant, and if any is not they
// are unsound. TODO confirm against `SharedError`, `GeyserPluginServiceError`, and
// `SlotOffsetIndexError` before relying on this.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
135
136impl Display for FirehoseError {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        match self {
139            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
140            FirehoseError::ReadHeader(error) => {
141                write!(f, "Error reading header: {}", error)
142            }
143            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
144                f,
145                "Error initializing geyser plugin service: {}",
146                geyser_plugin_service_error
147            ),
148            FirehoseError::FailedToGetTransactionNotifier => write!(
149                f,
150                "Failed to get transaction notifier from GeyserPluginService"
151            ),
152            FirehoseError::ReadUntilBlockError(error) => {
153                write!(f, "Error reading until block: {}", error)
154            }
155            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
156            FirehoseError::NodeDecodingError(item_index, error) => {
157                write!(
158                    f,
159                    "Error seeking, reading data from, or decoding data for data node {}: {}",
160                    item_index, error
161                )
162            }
163            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
164                f,
165                "Error getting info from slot offset index: {}",
166                slot_offset_index_error
167            ),
168            FirehoseError::SeekToSlotError(error) => {
169                write!(f, "Error seeking to slot: {}", error)
170            }
171            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
172            FirehoseError::OnStatsHandlerError(error) => {
173                write!(f, "Stats handler error: {}", error)
174            }
175            FirehoseError::OperationTimeout(op) => {
176                write!(f, "Timeout while waiting for operation: {}", op)
177            }
178            FirehoseError::TransactionHandlerError(error) => {
179                write!(f, "Transaction handler error: {}", error)
180            }
181            FirehoseError::EntryHandlerError(error) => {
182                write!(f, "Entry handler error: {}", error)
183            }
184            FirehoseError::RewardHandlerError(error) => {
185                write!(f, "Reward handler error: {}", error)
186            }
187            FirehoseError::BlockHandlerError(error) => {
188                write!(f, "Block handler error: {}", error)
189            }
190        }
191    }
192}
193
194impl From<reqwest::Error> for FirehoseError {
195    fn from(e: reqwest::Error) -> Self {
196        FirehoseError::Reqwest(e)
197    }
198}
199
200impl From<GeyserPluginServiceError> for FirehoseError {
201    fn from(e: GeyserPluginServiceError) -> Self {
202        FirehoseError::GeyserPluginService(e)
203    }
204}
205
206impl From<SlotOffsetIndexError> for FirehoseError {
207    fn from(e: SlotOffsetIndexError) -> Self {
208        FirehoseError::SlotOffsetIndexError(e)
209    }
210}
211
/// Per-thread progress information emitted by the firehose runner.
///
/// A snapshot of this struct is cloned into every [`Stats`] value handed to the stats
/// handler.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing. The runner initialises this from
    /// the instant taken once before any worker is spawned, so all workers share it.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
    /// The runner also moves `start` forward to the first slot of the epoch slice it is
    /// about to stream.
    pub slot_range: Range<u64>,
    /// Original slot range assigned to the thread (half-open, never modified).
    pub initial_slot_range: Range<u64>,
    /// Latest slot processed by the thread. Starts at `slot_range.start`.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
240
/// Aggregated firehose statistics covering all worker threads.
///
/// NOTE(review): as populated by `maybe_emit_stats` in this file, only the slot, block,
/// transaction, and entry totals (and the skipped-slot figure derived from them) come from
/// counters shared by all workers. `start_time`, `finish_time`, `slot_range`, and
/// `rewards_processed` are copied from the reporting thread's [`ThreadStats`], and the
/// `*_since_last_pulse` fields come from counters created per worker.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update.
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began.
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    /// (Currently mirrors the reporting thread's `finish_time`.)
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open [start, end)).
    /// (Currently mirrors the reporting thread's `slot_range`.)
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads, computed as
    /// `slots_processed - blocks_processed` (saturating at zero).
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Rewards processed. (Currently the reporting thread's count, not a cross-thread sum.)
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse; never less than one nanosecond, so it
    /// is safe to divide by.
    pub time_since_last_pulse: std::time::Duration,
}
273
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// The derived `PartialEq`, `Eq`, `Hash`, and `Debug` impls are only available when `OnStats`
/// itself implements them (function pointers do; most closures do not).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available. It receives the reporting worker's
    /// thread index and the [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Emits a stats callback when the current slot is a multiple of this interval.
    // NOTE(review): the code that applies this interval is outside the visible part of the
    // file — confirm the exact trigger condition there.
    pub tracking_interval_slots: u64,
}
282
283#[inline(always)]
284#[allow(clippy::too_many_arguments)]
285async fn maybe_emit_stats<OnStats: Handler<Stats>>(
286    stats_tracking: Option<&StatsTracking<OnStats>>,
287    thread_index: usize,
288    thread_stats: &ThreadStats,
289    overall_slots_processed: &AtomicU64,
290    overall_blocks_processed: &AtomicU64,
291    overall_transactions_processed: &AtomicU64,
292    overall_entries_processed: &AtomicU64,
293    transactions_since_stats: &AtomicU64,
294    blocks_since_stats: &AtomicU64,
295    slots_since_stats: &AtomicU64,
296    last_pulse: &Arc<AtomicU64>,
297    base_instant: std::time::Instant,
298) -> Result<(), (FirehoseError, u64)> {
299    if let Some(stats_tracker) = stats_tracking {
300        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
301        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
302        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
303        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
304        let now_nanos = base_instant.elapsed().as_nanos() as u64;
305        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
306        let delta_nanos = now_nanos.saturating_sub(previous);
307        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
308        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
309        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
310        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
311
312        let stats = Stats {
313            thread_stats: thread_stats.clone(),
314            start_time: thread_stats.start_time,
315            finish_time: thread_stats.finish_time,
316            slot_range: thread_stats.slot_range.clone(),
317            slots_processed: total_slots,
318            blocks_processed: total_blocks,
319            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
320            transactions_processed: total_transactions,
321            entries_processed: total_entries,
322            rewards_processed: thread_stats.rewards_processed,
323            transactions_since_last_pulse: processed_transactions,
324            blocks_since_last_pulse: processed_blocks,
325            slots_since_last_pulse: processed_slots,
326            time_since_last_pulse,
327        };
328
329        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
330            last_pulse.store(previous, Ordering::Relaxed);
331            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
332            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
333            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
334            return Err((
335                FirehoseError::OnStatsHandlerError(e),
336                thread_stats.current_slot,
337            ));
338        }
339    }
340    Ok(())
341}
342
/// Adds `value` to `atomic` (relaxed ordering, wrapping on overflow) only when
/// `tracking_enabled` is set; otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
349
350fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
351    map.get(&thread_id)
352        .map(|set| set.remove(&slot).is_some())
353        .unwrap_or(false)
354}
355
/// Firehose transaction payload passed to [`Handler`] callbacks.
///
/// Handlers receive this value by ownership, one per transaction.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
374
/// Block entry metadata passed to [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Range of transaction indexes covered by the entry (a half-open `start..end` range,
    /// so an empty range means the entry carries no transactions).
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
389
/// Reward data conveyed to reward [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients and their associated reward information, as
    /// `(recipient address, reward)` pairs.
    pub rewards: Vec<(Address, RewardInfo)>,
}
398
/// Block-level data streamed to block handlers.
///
/// Use [`BlockData::slot`], [`BlockData::was_skipped`], and [`BlockData::block_time`] to
/// read the common fields without matching on the variant.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
    /// arrive out of order).
    PossibleLeaderSkipped {
        /// Slot number that either lacked a block or may still arrive later.
        slot: u64,
    },
}
430
431impl BlockData {
432    /// Returns the slot associated with this block or skipped slot.
433    #[inline(always)]
434    pub const fn slot(&self) -> u64 {
435        match self {
436            BlockData::Block { slot, .. } => *slot,
437            BlockData::PossibleLeaderSkipped { slot } => *slot,
438        }
439    }
440
441    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
442    #[inline(always)]
443    pub const fn was_skipped(&self) -> bool {
444        matches!(self, BlockData::PossibleLeaderSkipped { .. })
445    }
446
447    /// Returns the optional block time when available.
448    #[inline(always)]
449    pub const fn block_time(&self) -> Option<i64> {
450        match self {
451            BlockData::Block { block_time, .. } => *block_time,
452            BlockData::PossibleLeaderSkipped { .. } => None,
453        }
454    }
455}
456
/// Outcome of a handler invocation: `Ok(())` on success, otherwise the handler's error.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by every handler.
type HandlerFuture = BoxFuture<'static, HandlerResult>;

/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// A handler is any `Fn(usize, Data) -> HandlerFuture` that is also `Send + Sync + Clone +
/// 'static`; the runner clones it once per worker. The `usize` argument is the index of the
/// worker thread making the call, and `Data` is passed by value.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket impl: every closure or function with the right signature and bounds is a `Handler`,
// so callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
467
/// Function pointer alias for [`Handler`] callbacks.
///
/// Plain `fn` pointers satisfy every bound required by [`Handler`], so these aliases are a
/// convenient way to name the generic parameters of [`firehose`] when no closure state is
/// needed.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Convenience alias for firehose error handlers.
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Convenience alias for stats tracking handlers accepted by [`firehose`].
///
/// This is the same type as [`StatsTracker`]; both names are kept.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
484
/// Metadata describing a firehose worker failure.
///
/// This is the payload type of the `on_error` handler accepted by [`firehose`]. The error is
/// carried as text rather than as a [`FirehoseError`] value.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Thread index that encountered the error.
    pub thread_id: usize,
    /// Slot the worker was processing when the error surfaced.
    pub slot: u64,
    /// Epoch derived from the failing slot.
    pub epoch: u64,
    /// Stringified error payload for display/logging.
    pub error_message: String,
}
497
498/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
499///
500/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
501/// runner restarts from the last processed slot to maintain coverage.
502#[inline]
503#[allow(clippy::too_many_arguments)]
504pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
505    threads: u64,
506    slot_range: Range<u64>,
507    on_block: Option<OnBlock>,
508    on_tx: Option<OnTransaction>,
509    on_entry: Option<OnEntry>,
510    on_rewards: Option<OnRewards>,
511    on_error: Option<OnError>,
512    stats_tracking: Option<StatsTracking<OnStats>>,
513    shutdown_signal: Option<broadcast::Receiver<()>>,
514) -> Result<(), (FirehoseError, u64)>
515where
516    OnBlock: Handler<BlockData>,
517    OnTransaction: Handler<TransactionData>,
518    OnEntry: Handler<EntryData>,
519    OnRewards: Handler<RewardsData>,
520    OnStats: Handler<Stats>,
521    OnError: Handler<FirehoseErrorContext>,
522{
523    if threads == 0 {
524        return Err((
525            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
526            slot_range.start,
527        ));
528    }
529    let client = Client::new();
530    log::info!(target: LOG_MODULE, "starting firehose...");
531    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
532
533    let slot_range = Arc::new(slot_range);
534
535    // divide slot_range into n subranges
536    let subranges = generate_subranges(&slot_range, threads);
537    if threads > 1 {
538        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
539    }
540
541    let firehose_start = std::time::Instant::now();
542    let shutdown_flag = Arc::new(AtomicBool::new(false));
543    if let Some(ref rx) = shutdown_signal {
544        let mut rx = rx.resubscribe();
545        let flag = shutdown_flag.clone();
546        tokio::spawn(async move {
547            if rx.recv().await.is_ok() {
548                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
549                flag.store(true, Ordering::SeqCst);
550            }
551        });
552    }
553    let mut handles = Vec::new();
554    // Shared per-thread error counters
555    let error_counts: Arc<Vec<AtomicU32>> =
556        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
557
558    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
559    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
560    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
561    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
562    let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
563
564    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
565        let error_counts = error_counts.clone();
566        let client = client.clone();
567        let on_block = on_block.clone();
568        let on_tx = on_tx.clone();
569        let on_entry = on_entry.clone();
570        let on_reward = on_rewards.clone();
571        let on_error = on_error.clone();
572        let overall_slots_processed = overall_slots_processed.clone();
573        let overall_blocks_processed = overall_blocks_processed.clone();
574        let overall_transactions_processed = overall_transactions_processed.clone();
575        let overall_entries_processed = overall_entries_processed.clone();
576        let stats_tracking = stats_tracking.clone();
577        let transactions_since_stats = Arc::new(AtomicU64::new(0));
578        let blocks_since_stats = Arc::new(AtomicU64::new(0));
579        let slots_since_stats = Arc::new(AtomicU64::new(0));
580        let last_pulse = Arc::new(AtomicU64::new(0));
581        let transactions_since_stats_cloned = transactions_since_stats.clone();
582        let blocks_since_stats_cloned = blocks_since_stats.clone();
583        let slots_since_stats_cloned = slots_since_stats.clone();
584        let last_pulse_cloned = last_pulse.clone();
585        let shutdown_flag = shutdown_flag.clone();
586        let pending_skipped_slots = pending_skipped_slots.clone();
587        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
588
589        let handle = tokio::spawn(async move {
590            let transactions_since_stats = transactions_since_stats_cloned;
591            let blocks_since_stats = blocks_since_stats_cloned;
592            let slots_since_stats = slots_since_stats_cloned;
593            let last_pulse = last_pulse_cloned;
594            let mut shutdown_rx = thread_shutdown_rx;
595            let start_time = firehose_start;
596            last_pulse.store(
597                firehose_start.elapsed().as_nanos() as u64,
598                Ordering::Relaxed,
599            );
600            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
601            let mut skip_until_index = None;
602            let last_emitted_slot = slot_range.start.saturating_sub(1);
603            let block_enabled = on_block.is_some();
604            let tx_enabled = on_tx.is_some();
605            let entry_enabled = on_entry.is_some();
606            let reward_enabled = on_reward.is_some();
607            let tracking_enabled = stats_tracking.is_some();
608            if block_enabled {
609                pending_skipped_slots.entry(thread_index).or_default();
610            }
611            let mut last_counted_slot = slot_range.start.saturating_sub(1);
612            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
613            let mut thread_stats = if tracking_enabled {
614                Some(ThreadStats {
615                    thread_id: thread_index,
616                    start_time,
617                    finish_time: None,
618                    slot_range: slot_range.clone(),
619                    initial_slot_range: slot_range.clone(),
620                    current_slot: slot_range.start,
621                    slots_processed: 0,
622                    blocks_processed: 0,
623                    leader_skipped_slots: 0,
624                    transactions_processed: 0,
625                    entries_processed: 0,
626                    rewards_processed: 0,
627                })
628            } else {
629                None
630            };
631
632            // let mut triggered = false;
633            while let Err((err, slot)) = async {
634                let mut last_emitted_slot = last_emitted_slot_global;
635                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
636                    log::info!(
637                        target: &log_target,
638                        "shutdown requested; terminating firehose thread {}",
639                        thread_index
640                    );
641                    return Ok(());
642                }
643                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
644                log::info!(
645                    target: &log_target,
646                    "slot range: {} (epoch {}) ... {} (epoch {})",
647                    slot_range.start,
648                    slot_to_epoch(slot_range.start),
649                    slot_range.end,
650                    slot_to_epoch(slot_range.end)
651                );
652
653                log::info!(target: &log_target, "🚒 starting firehose...");
654
655                // for each epoch
656                let mut current_slot: Option<u64> = None;
657                for epoch_num in epoch_range.clone() {
658                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
659                        log::info!(
660                            target: &log_target,
661                            "shutdown requested; terminating firehose thread {}",
662                            thread_index
663                        );
664                        return Ok(());
665                    }
666                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
667                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
668                        Ok(stream) => stream,
669                        Err(_) => {
670                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
671                        }
672                    };
673                    let mut reader = NodeReader::new(stream);
674
675                    let header_fut = reader.read_raw_header();
676                    let header = match timeout(OP_TIMEOUT, header_fut).await {
677                        Ok(res) => res
678                            .map_err(FirehoseError::ReadHeader)
679                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
680                        Err(_) => {
681                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
682                        }
683                    };
684                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
685
686                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
687                    let local_start = std::cmp::max(slot_range.start, epoch_start);
688                    let local_end_inclusive =
689                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
690                    if local_start > local_end_inclusive {
691                        log::debug!(
692                            target: &log_target,
693                            "epoch {} has no overlap with thread range ({}..{}), skipping",
694                            epoch_num,
695                            slot_range.start,
696                            slot_range.end
697                        );
698                        continue;
699                    }
700
701                    let mut previous_blockhash = Hash::default();
702                    let mut latest_entry_blockhash = Hash::default();
703                    // Reset counters to align to the local epoch slice; prevents boundary slots
704                    // from being treated as already-counted after a restart.
705                    last_counted_slot = local_start.saturating_sub(1);
706                    current_slot = None;
707                    if tracking_enabled
708                        && let Some(ref mut stats) = thread_stats {
709                            stats.current_slot = local_start;
710                            stats.slot_range.start = local_start;
711                        }
712
713                    if local_start > epoch_start {
714                        // Seek to the previous slot so the stream includes all nodes
715                        // (transactions, entries, rewards) that precede the block payload for
716                        // `local_start`.
717                        let seek_slot = local_start.saturating_sub(1);
718                        let seek_fut = reader.seek_to_slot(seek_slot);
719                        match timeout(OP_TIMEOUT, seek_fut).await {
720                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
721                            Err(_) => {
722                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
723                            }
724                        }
725                    }
726
727                    // for each item in each block
728                    let mut item_index = 0;
729                    let mut displayed_skip_message = false;
730                    loop {
731                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
732                            log::info!(
733                                target: &log_target,
734                                "shutdown requested; terminating firehose thread {}",
735                                thread_index
736                            );
737                            return Ok(());
738                        }
739                        let read_fut = reader.read_until_block();
740                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
741                            Ok(result) => result
742                                .map_err(FirehoseError::ReadUntilBlockError)
743                                .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
744                            Err(_) => {
745                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
746                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
747                            }
748                        };
749                        if nodes.is_empty() {
750                            log::info!(
751                                target: &log_target,
752                                "reached end of epoch {}",
753                                epoch_num
754                            );
755                            break;
756                        }
757                        if let Some(last_node) = nodes.0.last()
758                            && !last_node.get_node().is_block()
759                        {
760                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
761                            break;
762                        }
763                        let block = nodes
764                            .get_block()
765                            .map_err(FirehoseError::GetBlockError)
766                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
767                        log::debug!(
768                            target: &log_target,
769                            "read {} items from epoch {}, now at slot {}",
770                            item_index,
771                            epoch_num,
772                            block.slot
773                        );
774                        let slot = block.slot;
775                        if slot > local_end_inclusive {
776                            log::debug!(
777                                target: &log_target,
778                                "reached end of local slice at slot {} (epoch {}), stopping",
779                                slot,
780                                epoch_num
781                            );
782                            break;
783                        }
784                        if slot >= slot_range.end {
785                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
786                            // Return early to terminate the firehose thread cleanly. We use >=
787                            // because slot_range is half-open [start, end), so any slot equal
788                            // to end is out-of-range and must not be processed. Do not emit
789                            // synthetic skipped slots here; another thread may own the boundary.
790                            if block_enabled {
791                                pending_skipped_slots.remove(&thread_index);
792                            }
793                            return Ok(());
794                        }
795                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
796                        if slot < slot_range.start {
797                            if slot.saturating_add(1) == slot_range.start {
798                                log::debug!(
799                                    target: &log_target,
800                                    "priming reader with preceding slot {}, skipping",
801                                    slot
802                                );
803                            } else {
804                                log::warn!(
805                                    target: &log_target,
806                                    "encountered slot {} before start of range {}, skipping",
807                                    slot,
808                                    slot_range.start
809                                );
810                            }
811                            continue;
812                        }
813                        current_slot = Some(slot);
814                        let mut entry_index: usize = 0;
815                        let mut this_block_executed_transaction_count: u64 = 0;
816                        let mut this_block_entry_count: u64 = 0;
817                        let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
818
819                        for node_with_cid in &nodes.0 {
820                            item_index += 1;
821                            if let Some(skip) = skip_until_index {
822                                if item_index < skip {
823                                    if !displayed_skip_message {
824                                        log::info!(
825                                            target: &log_target,
826                                            "skipping until index {} (at {})",
827                                            skip,
828                                            item_index
829                                        );
830                                        displayed_skip_message = true;
831                                    }
832                                    continue;
833                                } else {
834                                    log::info!(
835                                        target: &log_target,
836                                        "reached target index {}, resuming...",
837                                        skip
838                                    );
839                                    skip_until_index = None;
840                                }
841                            }
842                            let node = node_with_cid.get_node();
843
844                            if let Some(ref mut stats) = thread_stats {
845                                stats.current_slot = slot;
846                            }
847
848                            let error_slot = current_slot.unwrap_or(slot_range.start);
849
850                            use crate::node::Node::*;
851                            match node {
852                                Transaction(tx) => {
853                                    if tx_enabled
854                                        && let Some(on_tx_cb) = on_tx.as_ref()
855                                    {
856                                        let error_slot = current_slot.unwrap_or(slot_range.start);
857                                        let versioned_tx = tx.as_parsed().map_err(|err| {
858                                            (
859                                                FirehoseError::NodeDecodingError(item_index, err),
860                                                error_slot,
861                                            )
862                                        })?;
863                                        let reassembled_metadata = nodes
864                                            .reassemble_dataframes(tx.metadata.clone())
865                                            .map_err(|err| {
866                                                (
867                                                    FirehoseError::NodeDecodingError(item_index, err),
868                                                    error_slot,
869                                                )
870                                            })?;
871
872                                        let decompressed =
873                                            utils::decompress_zstd(reassembled_metadata.clone())
874                                                .map_err(|err| {
875                                                    (
876                                                        FirehoseError::NodeDecodingError(
877                                                            item_index,
878                                                            err,
879                                                        ),
880                                                        error_slot,
881                                                    )
882                                                })?;
883
884                                        let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
885                                            prost_011::Message::decode(decompressed.as_slice())
886                                                .map_err(|err| {
887                                                    (
888                                                        FirehoseError::NodeDecodingError(
889                                                            item_index,
890                                                            Box::new(err),
891                                                        ),
892                                                        error_slot,
893                                                    )
894                                                })?;
895
896                                        let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
897                                            metadata.try_into().map_err(|err| {
898                                                (
899                                                    FirehoseError::NodeDecodingError(
900                                                        item_index,
901                                                        Box::new(err),
902                                                    ),
903                                                    error_slot,
904                                                )
905                                            })?;
906
907                                        let message_hash = {
908                                            #[cfg(feature = "verify-transaction-signatures")]
909                                            {
910                                                versioned_tx.verify_and_hash_message().map_err(|err| {
911                                                    (
912                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
913                                                        error_slot,
914                                                    )
915                                                })?
916                                            }
917                                            #[cfg(not(feature = "verify-transaction-signatures"))]
918                                            {
919                                                versioned_tx.message.hash()
920                                            }
921                                        };
922                                        let signature = versioned_tx
923                                            .signatures
924                                            .first()
925                                            .ok_or_else(|| {
926                                                Box::new(std::io::Error::new(
927                                                    std::io::ErrorKind::InvalidData,
928                                                    "transaction missing signature",
929                                                )) as SharedError
930                                            })
931                                            .map_err(|err| {
932                                                (
933                                                    FirehoseError::NodeDecodingError(
934                                                        item_index,
935                                                        err,
936                                                    ),
937                                                    error_slot,
938                                                )
939                                            })?;
940                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
941
942                                        on_tx_cb(
943                                            thread_index,
944                                            TransactionData {
945                                                slot: block.slot,
946                                                transaction_slot_index: tx.index.unwrap() as usize,
947                                                signature: *signature,
948                                                message_hash,
949                                                is_vote,
950                                                transaction_status_meta: as_native_metadata.clone(),
951                                                transaction: versioned_tx.clone(),
952                                            },
953                                        )
954                                        .await
955                                        .map_err(|e| {
956                                            (
957                                                FirehoseError::TransactionHandlerError(e),
958                                                error_slot,
959                                            )
960                                        })?;
961                                    }
962                                    fetch_add_if(
963                                        tracking_enabled,
964                                        &overall_transactions_processed,
965                                        1,
966                                    );
967                                    if let Some(ref mut stats) = thread_stats {
968                                        stats.transactions_processed += 1;
969                                    }
970                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
971                                }
972                                Entry(entry) => {
973                                    let entry_hash = Hash::from(entry.hash.to_bytes());
974                                    let entry_transaction_count = entry.transactions.len();
975                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
976                                    let starting_transaction_index_u64 =
977                                        this_block_executed_transaction_count;
978                                    latest_entry_blockhash = entry_hash;
979                                    this_block_executed_transaction_count += entry_transaction_count_u64;
980                                    this_block_entry_count += 1;
981
982                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
983                                        let starting_transaction_index = usize::try_from(
984                                            starting_transaction_index_u64,
985                                        )
986                                        .map_err(|err| {
987                                            (
988                                                FirehoseError::EntryHandlerError(Box::new(err)),
989                                                error_slot,
990                                            )
991                                        })?;
992                                        let transaction_indexes_end =
993                                            starting_transaction_index + entry_transaction_count;
994                                        on_entry_cb(
995                                            thread_index,
996                                            EntryData {
997                                                slot: block.slot,
998                                                entry_index,
999                                                transaction_indexes: starting_transaction_index
1000                                                    ..transaction_indexes_end,
1001                                                num_hashes: entry.num_hashes,
1002                                                hash: entry_hash,
1003                                            },
1004                                        )
1005                                        .await
1006                                        .map_err(|e| {
1007                                            (
1008                                                FirehoseError::EntryHandlerError(e),
1009                                                error_slot,
1010                                            )
1011                                        })?;
1012                                    }
1013                                    entry_index += 1;
1014                                    fetch_add_if(
1015                                        tracking_enabled,
1016                                        &overall_entries_processed,
1017                                        1,
1018                                    );
1019                                    if let Some(ref mut stats) = thread_stats {
1020                                        stats.entries_processed += 1;
1021                                    }
1022                                }
1023                                Block(block) => {
1024                                    let prev_last_counted_slot = last_counted_slot;
1025                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1026                                        (
1027                                            stats.slots_processed,
1028                                            stats.blocks_processed,
1029                                            stats.leader_skipped_slots,
1030                                            stats.current_slot,
1031                                        )
1032                                    });
1033
1034                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1035                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1036                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1037
1038                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1039                                    for skipped_slot in skip_start..slot {
1040                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1041                                            break;
1042                                        }
1043                                        log::debug!(
1044                                            target: &log_target,
1045                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1046                                            skipped_slot,
1047                                            prev_last_counted_slot,
1048                                            slot,
1049                                        );
1050                                        if block_enabled {
1051                                            pending_skipped_slots
1052                                                .entry(thread_index)
1053                                                .or_default()
1054                                                .insert(skipped_slot);
1055                                        }
1056                                        if block_enabled
1057                                            && let Some(on_block_cb) = on_block.as_ref()
1058                                            && skipped_slot > last_emitted_slot {
1059                                                last_emitted_slot = skipped_slot;
1060                                                on_block_cb(
1061                                                    thread_index,
1062                                                    BlockData::PossibleLeaderSkipped {
1063                                                        slot: skipped_slot,
1064                                                    },
1065                                                )
1066                                                .await
1067                                                .map_err(|e| {
1068                                                    (
1069                                                        FirehoseError::BlockHandlerError(e),
1070                                                        error_slot,
1071                                                    )
1072                                                })?;
1073                                            }
1074                                        if tracking_enabled {
1075                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1076                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1077                                            if let Some(ref mut stats) = thread_stats {
1078                                                stats.leader_skipped_slots += 1;
1079                                                stats.slots_processed += 1;
1080                                                stats.current_slot = skipped_slot;
1081                                            }
1082                                        }
1083                                        last_counted_slot = skipped_slot;
1084                                    }
1085
1086                                    let cleared_pending_skip = if block_enabled {
1087                                        clear_pending_skip(
1088                                            &pending_skipped_slots,
1089                                            thread_index,
1090                                            slot,
1091                                        )
1092                                    } else {
1093                                        false
1094                                    };
1095
1096                                    if slot <= last_counted_slot && !cleared_pending_skip {
1097                                        log::debug!(
1098                                            target: &log_target,
1099                                            "duplicate block {}, already counted (last_counted={})",
1100                                            slot,
1101                                            last_counted_slot,
1102                                        );
1103                                        this_block_rewards.clear();
1104                                        continue;
1105                                    }
1106
1107                                    if block_enabled {
1108                                        if let Some(on_block_cb) = on_block.as_ref() {
1109                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1110                                            if slot > last_emitted_slot {
1111                                                last_emitted_slot = slot;
1112                                                on_block_cb(
1113                                                    thread_index,
1114                                                    BlockData::Block {
1115                                                        parent_slot: block.meta.parent_slot,
1116                                                        parent_blockhash: previous_blockhash,
1117                                                        slot: block.slot,
1118                                                        blockhash: latest_entry_blockhash,
1119                                                        rewards: KeyedRewardsAndNumPartitions {
1120                                                            keyed_rewards,
1121                                                            num_partitions: None,
1122                                                        },
1123                                                        block_time: Some(block.meta.blocktime as i64),
1124                                                        block_height: block.meta.block_height,
1125                                                        executed_transaction_count:
1126                                                            this_block_executed_transaction_count,
1127                                                        entry_count: this_block_entry_count,
1128                                                    },
1129                                                )
1130                                                .await
1131                                                .map_err(|e| {
1132                                                    (
1133                                                        FirehoseError::BlockHandlerError(e),
1134                                                        error_slot,
1135                                                    )
1136                                                })?;
1137                                            }
1138                                        }
1139                                    } else {
1140                                        this_block_rewards.clear();
1141                                    }
1142                                    previous_blockhash = latest_entry_blockhash;
1143
1144                                    if tracking_enabled {
1145                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1146                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1147                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1148                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1149                                        if let Some(ref mut stats) = thread_stats {
1150                                            stats.blocks_processed += 1;
1151                                            stats.slots_processed += 1;
1152                                            stats.current_slot = slot;
1153                                        }
1154
1155                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1156                                            (&stats_tracking, thread_stats.as_mut())
1157                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1158                                                && let Err(err) = maybe_emit_stats(
1159                                                    stats_tracking.as_ref(),
1160                                                    thread_index,
1161                                                    thread_stats_ref,
1162                                                    &overall_slots_processed,
1163                                                    &overall_blocks_processed,
1164                                                    &overall_transactions_processed,
1165                                                    &overall_entries_processed,
1166                                                &transactions_since_stats,
1167                                                &blocks_since_stats,
1168                                                &slots_since_stats,
1169                                                &last_pulse,
1170                                                start_time,
1171                                            )
1172                                            .await
1173                                            {
1174                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1175                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1176                                                    overall_blocks_processed
1177                                                        .fetch_sub(1, Ordering::Relaxed);
1178                                                    overall_slots_processed
1179                                                        .fetch_sub(1, Ordering::Relaxed);
1180                                                    if let Some((
1181                                                        prev_slots_processed,
1182                                                        prev_blocks_processed,
1183                                                        prev_leader_skipped,
1184                                                        prev_current_slot,
1185                                                    )) = thread_stats_snapshot
1186                                                    {
1187                                                        thread_stats_ref.slots_processed =
1188                                                            prev_slots_processed;
1189                                                        thread_stats_ref.blocks_processed =
1190                                                            prev_blocks_processed;
1191                                                        thread_stats_ref.leader_skipped_slots =
1192                                                            prev_leader_skipped;
1193                                                        thread_stats_ref.current_slot =
1194                                                            prev_current_slot;
1195                                                    }
1196                                                    last_counted_slot = prev_last_counted_slot;
1197                                                    return Err(err);
1198                                                }
1199                                    }
1200
1201                                    if slot > last_counted_slot {
1202                                        last_counted_slot = slot;
1203                                    }
1204                                }
1205                                Subset(_subset) => (),
1206                                Epoch(_epoch) => (),
1207                                Rewards(rewards) => {
1208                                    if reward_enabled || block_enabled {
1209                                        let reassembled = nodes
1210                                            .reassemble_dataframes(rewards.data.clone())
1211                                            .map_err(|err| {
1212                                                (
1213                                                    FirehoseError::NodeDecodingError(item_index, err),
1214                                                    current_slot.unwrap_or(slot_range.start),
1215                                                )
1216                                            })?;
1217                                        if reassembled.is_empty() {
1218                                            this_block_rewards.clear();
1219                                            if reward_enabled
1220                                                && let Some(on_reward_cb) = on_reward.as_ref()
1221                                            {
1222                                                on_reward_cb(
1223                                                    thread_index,
1224                                                    RewardsData {
1225                                                        slot: block.slot,
1226                                                        rewards: Vec::new(),
1227                                                    },
1228                                                )
1229                                                .await
1230                                                .map_err(|e| {
1231                                                    (
1232                                                        FirehoseError::RewardHandlerError(e),
1233                                                        error_slot,
1234                                                    )
1235                                                })?;
1236                                            }
1237                                            continue;
1238                                        }
1239
1240                                        let decompressed = utils::decompress_zstd(reassembled)
1241                                            .map_err(|err| {
1242                                                (
1243                                                    FirehoseError::NodeDecodingError(
1244                                                        item_index,
1245                                                        err,
1246                                                    ),
1247                                                    error_slot,
1248                                                )
1249                                            })?;
1250
1251                                        let decoded =
1252                                            prost_011::Message::decode(decompressed.as_slice())
1253                                                .map_err(|err| {
1254                                                    (
1255                                                        FirehoseError::NodeDecodingError(
1256                                                            item_index,
1257                                                            Box::new(err),
1258                                                        ),
1259                                                        error_slot,
1260                                                    )
1261                                                })?;
1262                                        let keyed_rewards = convert_proto_rewards(&decoded)
1263                                            .map_err(|err| {
1264                                                (
1265                                                    FirehoseError::NodeDecodingError(item_index, err),
1266                                                    error_slot,
1267                                                )
1268                                            })?;
1269                                        if reward_enabled
1270                                            && let Some(on_reward_cb) = on_reward.as_ref()
1271                                        {
1272                                            on_reward_cb(
1273                                                thread_index,
1274                                                RewardsData {
1275                                                    slot: block.slot,
1276                                                    rewards: keyed_rewards.clone(),
1277                                                },
1278                                            )
1279                                            .await
1280                                            .map_err(|e| {
1281                                                (
1282                                                    FirehoseError::RewardHandlerError(e),
1283                                                    error_slot,
1284                                                )
1285                                            })?;
1286                                        }
1287                                        this_block_rewards = keyed_rewards;
1288                                        if let Some(ref mut stats) = thread_stats {
1289                                            stats.rewards_processed +=
1290                                                this_block_rewards.len() as u64;
1291                                        }
1292                                    }
1293                                }
1294                                DataFrame(_data_frame) => (),
1295                            }
1296                        }
1297                        if block.slot == slot_range.end - 1 {
1298                            let finish_time = std::time::Instant::now();
1299                            let elapsed = finish_time.duration_since(start_time);
1300                            log::info!(target: &log_target, "processed slot {}", block.slot);
1301                            let elapsed_pretty = human_readable_duration(elapsed);
1302                            log::info!(
1303                                target: &log_target,
1304                                "processed {} slots across {} epochs in {}.",
1305                                slot_range.end - slot_range.start,
1306                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1307                                elapsed_pretty
1308                            );
1309                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1310                            // On completion, report threads with non-zero error counts for
1311                            // visibility.
1312                            let summary: String = error_counts
1313                                .iter()
1314                                .enumerate()
1315                                .filter_map(|(i, c)| {
1316                                    let v = c.load(Ordering::Relaxed);
1317                                    if v > 0 {
1318                                        Some(format!("{:03}({})", i, v))
1319                                    } else {
1320                                        None
1321                                    }
1322                                })
1323                                .collect::<Vec<_>>()
1324                                .join(", ");
1325                            if !summary.is_empty() {
1326                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1327                            }
1328                            return Ok(());
1329                        }
1330                    }
1331                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1332                        && last_counted_slot < expected_last_slot
1333                    {
1334                        // Do not synthesize skipped slots during final flush; another thread may
1335                        // cover the remaining range (especially across epoch boundaries).
1336                    }
1337                    if let Some(ref mut stats) = thread_stats {
1338                        stats.finish_time = Some(std::time::Instant::now());
1339                        maybe_emit_stats(
1340                            stats_tracking.as_ref(),
1341                            thread_index,
1342                            stats,
1343                            &overall_slots_processed,
1344                            &overall_blocks_processed,
1345                            &overall_transactions_processed,
1346                            &overall_entries_processed,
1347                            &transactions_since_stats,
1348                            &blocks_since_stats,
1349                            &slots_since_stats,
1350                            &last_pulse,
1351                            start_time,
1352                        )
1353                        .await?;
1354                    }
1355                    if block_enabled {
1356                        pending_skipped_slots.remove(&thread_index);
1357                    }
1358                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1359                    }
1360                    Ok(())
1361            }
1362            .await
1363            {
1364                if is_shutdown_error(&err) {
1365                    log::info!(
1366                        target: &log_target,
1367                        "shutdown requested; terminating firehose thread {}",
1368                        thread_index
1369                    );
1370                    break;
1371                }
1372                let epoch = slot_to_epoch(slot);
1373                let item_index = match &err {
1374                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1375                    _ => 0,
1376                };
1377                let error_message = err.to_string();
1378                log::error!(
1379                    target: &log_target,
1380                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1381                    slot,
1382                    epoch
1383                );
1384                log::error!(target: &log_target, "{}", error_message);
1385                if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
1386                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index.
1387                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1388                }
1389                if let Some(on_error_cb) = on_error.clone() {
1390                    let context = FirehoseErrorContext {
1391                        thread_id: thread_index,
1392                        slot,
1393                        epoch,
1394                        error_message: error_message.clone(),
1395                    };
1396                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1397                        log::error!(
1398                            target: &log_target,
1399                            "on_error handler failed: {}",
1400                            handler_err
1401                        );
1402                    }
1403                }
1404                // Increment this thread's error counter
1405                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1406                log::warn!(
1407                    target: &log_target,
1408                    "restarting from slot {} at index {}",
1409                    slot,
1410                    item_index,
1411                );
1412                // Update slot range to resume from the failed slot, not the original start.
1413                // Reset local tracking so we don't treat the resumed slot range as already counted.
1414                // If we've already counted this slot, resume from the next one to avoid duplicates.
1415                if slot <= last_counted_slot {
1416                    slot_range.start = last_counted_slot.saturating_add(1);
1417                } else {
1418                    slot_range.start = slot;
1419                }
1420                // Reset pulse timer to exclude downtime from next rate calc.
1421                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1422                if tracking_enabled
1423                    && let Some(ref mut stats_ref) = thread_stats {
1424                        stats_ref.slot_range.start = slot_range.start;
1425                        stats_ref.slot_range.end = slot_range.end;
1426                        // initial_slot_range remains unchanged for progress reporting.
1427                    }
1428                if block_enabled {
1429                    pending_skipped_slots.remove(&thread_index);
1430                }
1431                skip_until_index = Some(item_index);
1432                last_emitted_slot_global = last_emitted_slot;
1433            }
1434        });
1435        handles.push(handle);
1436    }
1437
1438    // Wait for all threads to complete
1439    for handle in handles {
1440        handle.await.unwrap();
1441    }
1442    if stats_tracking.is_some() {
1443        let elapsed = firehose_start.elapsed();
1444        let elapsed_secs = elapsed.as_secs_f64();
1445        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1446        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1447        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1448        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1449        let total_errors: u64 = error_counts
1450            .iter()
1451            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1452            .sum();
1453        let overall_tps = if elapsed_secs > 0.0 {
1454            total_transactions as f64 / elapsed_secs
1455        } else {
1456            0.0
1457        };
1458        log::info!(
1459            target: LOG_MODULE,
1460            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1461            elapsed_secs,
1462            total_slots,
1463            total_blocks,
1464            total_leader_skipped,
1465            total_transactions,
1466            overall_tps,
1467            total_errors
1468        );
1469    }
1470    if shutdown_flag.load(Ordering::SeqCst) {
1471        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1472    } else {
1473        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1474    }
1475    Ok(())
1476}
1477
1478#[allow(clippy::result_large_err)]
1479/// Builds a Geyser-backed firehose and returns a slot notification stream.
1480///
1481/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1482/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1483/// half-open interval `[start, end)`, and the thread will restart from the last processed
1484/// slot on recoverable errors to maintain coverage.
1485pub fn firehose_geyser(
1486    rt: Arc<tokio::runtime::Runtime>,
1487    slot_range: Range<u64>,
1488    geyser_config_files: Option<&[PathBuf]>,
1489    index_base_url: &Url,
1490    client: &Client,
1491    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1492    threads: u64,
1493) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1494    if threads == 0 {
1495        return Err((
1496            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1497            slot_range.start,
1498        ));
1499    }
1500    log::info!(target: LOG_MODULE, "starting firehose...");
1501    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1502    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1503    let mut entry_notifier_maybe = None;
1504    let mut block_meta_notifier_maybe = None;
1505    let mut transaction_notifier_maybe = None;
1506    if let Some(geyser_config_files) = geyser_config_files {
1507        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1508
1509        let service =
1510            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1511                confirmed_bank_receiver.clone(),
1512                true,
1513                geyser_config_files,
1514            )
1515            .map_err(|e| (e.into(), slot_range.start))?;
1516
1517        transaction_notifier_maybe = Some(
1518            service
1519                .get_transaction_notifier()
1520                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1521                .map_err(|e| (e, slot_range.start))?,
1522        );
1523
1524        entry_notifier_maybe = service.get_entry_notifier();
1525        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1526
1527        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1528    }
1529
1530    if entry_notifier_maybe.is_some() {
1531        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1532    } else {
1533        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1534    }
1535    log::info!(target: LOG_MODULE, "running on_load...");
1536    rt.spawn(on_load);
1537
1538    let slot_range = Arc::new(slot_range);
1539    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1540    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1541    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1542    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1543
1544    // divide slot_range into n subranges
1545    let subranges = generate_subranges(&slot_range, threads);
1546    if threads > 1 {
1547        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1548    }
1549
1550    let mut handles = Vec::new();
1551    // Shared per-thread error counters
1552    let error_counts: Arc<Vec<AtomicU32>> =
1553        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1554
1555    for (i, slot_range) in subranges.into_iter().enumerate() {
1556        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1557        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1558        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1559        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1560        let client = client.clone();
1561        let error_counts = error_counts.clone();
1562
1563        let rt_clone = rt.clone();
1564
1565        let handle = std::thread::spawn(move || {
1566            rt_clone.block_on(async {
1567                firehose_geyser_thread(
1568                    slot_range,
1569                    transaction_notifier_maybe,
1570                    entry_notifier_maybe,
1571                    block_meta_notifier_maybe,
1572                    confirmed_bank_sender,
1573                    &client,
1574                    if threads > 1 { Some(i) } else { None },
1575                    error_counts,
1576                )
1577                .await
1578                .unwrap();
1579            });
1580        });
1581        handles.push(handle);
1582    }
1583
1584    // Wait for all threads to complete
1585    for handle in handles {
1586        handle.join().unwrap();
1587    }
1588    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1589    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1590        block_meta_notifier.notify_block_metadata(
1591            u64::MAX,
1592            "unload",
1593            u64::MAX,
1594            "unload",
1595            &KeyedRewardsAndNumPartitions {
1596                keyed_rewards: vec![],
1597                num_partitions: None,
1598            },
1599            None,
1600            None,
1601            0,
1602            0,
1603        );
1604    }
1605    Ok(confirmed_bank_receiver)
1606}
1607
1608#[allow(clippy::too_many_arguments)]
1609#[allow(clippy::result_large_err)]
1610async fn firehose_geyser_thread(
1611    mut slot_range: Range<u64>,
1612    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1613    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1614    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1615    confirmed_bank_sender: Sender<SlotNotification>,
1616    client: &Client,
1617    thread_index: Option<usize>,
1618    error_counts: Arc<Vec<AtomicU32>>,
1619) -> Result<(), (FirehoseError, u64)> {
1620    let start_time = std::time::Instant::now();
1621    let log_target = if let Some(thread_index) = thread_index {
1622        format!("{}::T{:03}", LOG_MODULE, thread_index)
1623    } else {
1624        LOG_MODULE.to_string()
1625    };
1626    let initial_slot_range = slot_range.clone();
1627    let mut skip_until_index = None;
1628    let mut last_counted_slot = slot_range.start.saturating_sub(1);
1629    // let mut triggered = false;
1630    while let Err((err, slot)) = async {
1631            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1632            log::info!(
1633                target: &log_target,
1634                "slot range: {} (epoch {}) ... {} (epoch {})",
1635                slot_range.start,
1636                slot_to_epoch(slot_range.start),
1637                slot_range.end,
1638                slot_to_epoch(slot_range.end)
1639            );
1640
1641            log::info!(target: &log_target, "🚒 starting firehose...");
1642
1643            // for each epoch
1644            let mut current_slot: Option<u64> = None;
1645            for epoch_num in epoch_range.clone() {
1646                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1647                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1648                    Ok(stream) => stream,
1649                    Err(_) => {
1650                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1651                    }
1652                };
1653                let mut reader = NodeReader::new(stream);
1654
1655                let header_fut = reader.read_raw_header();
1656                let header = match timeout(OP_TIMEOUT, header_fut).await {
1657                    Ok(res) => res
1658                        .map_err(FirehoseError::ReadHeader)
1659                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1660                    Err(_) => {
1661                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1662                    }
1663                };
1664                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1665
1666                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1667                let local_start = std::cmp::max(slot_range.start, epoch_start);
1668                let local_end_inclusive =
1669                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1670                if local_start > local_end_inclusive {
1671                    log::debug!(
1672                        target: &log_target,
1673                        "epoch {} has no overlap with thread range ({}..{}), skipping",
1674                        epoch_num,
1675                        slot_range.start,
1676                        slot_range.end
1677                    );
1678                    continue;
1679                }
1680
1681                let mut todo_previous_blockhash = Hash::default();
1682                let mut todo_latest_entry_blockhash = Hash::default();
1683                // Reset counters to align to the local epoch slice; prevents boundary slots
1684                // from being treated as already-counted after a restart.
1685                last_counted_slot = local_start.saturating_sub(1);
1686                current_slot = None;
1687
1688                if local_start > epoch_start {
1689                    // Seek to the slot immediately preceding the requested range so the reader
1690                    // captures the full node set (transactions, entries, rewards) for the
1691                    // target block on the next iteration.
1692                    let seek_slot = local_start.saturating_sub(1);
1693                    let seek_fut = reader.seek_to_slot(seek_slot);
1694                    match timeout(OP_TIMEOUT, seek_fut).await {
1695                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1696                        Err(_) => {
1697                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1698                        }
1699                    }
1700                }
1701
1702                // for each item in each block
1703                let mut item_index = 0;
1704                let mut displayed_skip_message = false;
1705                loop {
1706                    let read_fut = reader.read_until_block();
1707                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1708                        Ok(result) => result
1709                            .map_err(FirehoseError::ReadUntilBlockError)
1710                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1711                        Err(_) => {
1712                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1713                            let restart_slot =
1714                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
1715                            return Err((
1716                                FirehoseError::OperationTimeout("read_until_block"),
1717                                restart_slot,
1718                            ));
1719                        }
1720                    };
1721                    if nodes.is_empty() {
1722                        log::info!(
1723                            target: &log_target,
1724                            "reached end of epoch {}",
1725                            epoch_num
1726                        );
1727                        break;
1728                    }
1729                    // ignore epoch and subset nodes at end of car file loop { if
1730                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1731                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1732                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1733                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1734                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1735                    //         log::info!(target: &log_target, "reached end of epoch {}",
1736                    //             epoch_num); break; }
1737                    if let Some(last_node) = nodes.0.last()
1738                        && !last_node.get_node().is_block() {
1739                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1740                            break;
1741                        }
1742                    let block = nodes
1743                        .get_block()
1744                        .map_err(FirehoseError::GetBlockError)
1745                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1746                    log::debug!(
1747                        target: &log_target,
1748                        "read {} items from epoch {}, now at slot {}",
1749                        item_index,
1750                        epoch_num,
1751                        block.slot
1752                    );
1753                    let slot = block.slot;
1754                    if slot > local_end_inclusive {
1755                        log::debug!(
1756                            target: &log_target,
1757                            "reached end of local slice at slot {} (epoch {}), stopping",
1758                            slot,
1759                            epoch_num
1760                        );
1761                        break;
1762                    }
1763                    if slot >= slot_range.end {
1764                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1765                        // Return early to terminate the firehose thread cleanly. We use >=
1766                        // because slot_range is half-open [start, end), so any slot equal to
1767                        // end is out-of-range and must not be processed.
1768                        return Ok(());
1769                    }
1770                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1771                    if slot < local_start {
1772                        if slot.saturating_add(1) == local_start {
1773                            log::debug!(
1774                                target: &log_target,
1775                                "priming reader with preceding slot {}, skipping",
1776                                slot
1777                            );
1778                        } else {
1779                            log::warn!(
1780                                target: &log_target,
1781                                "encountered slot {} before start of range {}, skipping",
1782                                slot,
1783                                local_start
1784                            );
1785                        }
1786                        continue;
1787                    }
1788                    current_slot = Some(slot);
1789                    let mut entry_index: usize = 0;
1790                    let mut this_block_executed_transaction_count: u64 = 0;
1791                    let mut this_block_entry_count: u64 = 0;
1792                    let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1793
1794                    if slot <= last_counted_slot {
1795                        log::debug!(
1796                            target: &log_target,
1797                            "duplicate block {}, already counted (last_counted={})",
1798                            slot,
1799                            last_counted_slot,
1800                        );
1801                        this_block_rewards.clear();
1802                        continue;
1803                    }
1804
1805                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
1806                        item_index += 1;
1807                        // if item_index == 100000 && !triggered { log::info!("simulating
1808                        //     error"); triggered = true; return
1809                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1810                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1811                        //         "simulated error", )), ))); }
1812                        if let Some(skip) = skip_until_index {
1813                            if item_index < skip {
1814                                if !displayed_skip_message {
1815                                    log::info!(
1816                                        target: &log_target,
1817                                        "skipping until index {} (at {})",
1818                                        skip,
1819                                        item_index
1820                                    );
1821                                    displayed_skip_message = true;
1822                                }
1823                                return Ok(());
1824                            } else {
1825                                log::info!(
1826                                    target: &log_target,
1827                                    "reached target index {}, resuming...",
1828                                    skip
1829                                );
1830                                skip_until_index = None;
1831                            }
1832                        }
1833                        let node = node_with_cid.get_node();
1834
1835                        use crate::node::Node::*;
1836                        match node {
1837                            Transaction(tx) => {
1838                                let versioned_tx = tx.as_parsed()?;
1839                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1840
1841                                let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1842
1843                                let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1844                                    prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1845                                        Box::new(std::io::Error::other(
1846                                            std::format!("Error decoding metadata: {:?}", err),
1847                                        ))
1848                                    })?;
1849
1850                                let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1851                                    metadata.try_into()?;
1852
1853                                let message_hash = {
1854                                    #[cfg(feature = "verify-transaction-signatures")]
1855                                    {
1856                                        versioned_tx.verify_and_hash_message()?
1857                                    }
1858                                    #[cfg(not(feature = "verify-transaction-signatures"))]
1859                                    {
1860                                        // Signature verification is optional because it is
1861                                        // extremely expensive at replay scale.
1862                                        versioned_tx.message.hash()
1863                                    }
1864                                };
1865                                let signature = versioned_tx
1866                                    .signatures
1867                                    .first()
1868                                    .ok_or_else(|| {
1869                                        Box::new(std::io::Error::new(
1870                                            std::io::ErrorKind::InvalidData,
1871                                            "transaction missing signature",
1872                                        )) as SharedError
1873                                    })?;
1874                                let is_vote = is_simple_vote_transaction(&versioned_tx);
1875
1876                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1877                                    transaction_notifier.notify_transaction(
1878                                        block.slot,
1879                                        tx.index.unwrap() as usize,
1880                                        signature,
1881                                        &message_hash,
1882                                        is_vote,
1883                                        &as_native_metadata,
1884                                        &versioned_tx,
1885                                    );
1886                                }
1887
1888                            }
1889                            Entry(entry) => {
1890                                let entry_hash = Hash::from(entry.hash.to_bytes());
1891                                let entry_transaction_count = entry.transactions.len();
1892                                let entry_transaction_count_u64 = entry_transaction_count as u64;
1893                                let starting_transaction_index =
1894                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1895                                        Box::new(std::io::Error::other(
1896                                            "transaction index exceeds usize range",
1897                                        )) as SharedError
1898                                    })?;
1899                                todo_latest_entry_blockhash = entry_hash;
1900                                this_block_executed_transaction_count += entry_transaction_count_u64;
1901                                this_block_entry_count += 1;
1902                                if entry_notifier_maybe.is_none() {
1903                                    return Ok(());
1904                                }
1905                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1906                                let entry_summary = solana_entry::entry::EntrySummary {
1907                                    num_hashes: entry.num_hashes,
1908                                    hash: Hash::from(entry.hash.to_bytes()),
1909                                    num_transactions: entry_transaction_count_u64,
1910                                };
1911                                entry_notifier.notify_entry(
1912                                    block.slot,
1913                                    entry_index,
1914                                    &entry_summary,
1915                                    starting_transaction_index,
1916                                );
1917                                entry_index += 1;
1918                            }
1919                            Block(block) => {
1920                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1921                                confirmed_bank_sender.send(notification).unwrap();
1922
1923                                if block_meta_notifier_maybe.is_none() {
1924                                    last_counted_slot = block.slot;
1925                                    return Ok(());
1926                                }
1927                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
1928                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1929                                block_meta_notifier.notify_block_metadata(
1930                                    block.meta.parent_slot,
1931                                    todo_previous_blockhash.to_string().as_str(),
1932                                    block.slot,
1933                                    todo_latest_entry_blockhash.to_string().as_str(),
1934                                    &KeyedRewardsAndNumPartitions {
1935                                        keyed_rewards,
1936                                        num_partitions: None,
1937                                    },
1938                                    Some(block.meta.blocktime as i64),
1939                                    block.meta.block_height,
1940                                    this_block_executed_transaction_count,
1941                                    this_block_entry_count,
1942                                );
1943                                todo_previous_blockhash = todo_latest_entry_blockhash;
1944                                last_counted_slot = block.slot;
1945                                std::thread::yield_now();
1946                            }
1947                            Subset(_subset) => (),
1948                            Epoch(_epoch) => (),
1949                            Rewards(rewards) => {
1950                                if !rewards.is_complete() {
1951                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1952                                    let decompressed = utils::decompress_zstd(reassembled)?;
1953                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1954                                        Box::new(std::io::Error::other(
1955                                            std::format!("Error decoding rewards: {:?}", err),
1956                                        ))
1957                                    })?;
1958                                    this_block_rewards = convert_proto_rewards(&decoded)?;
1959                                }
1960                            }
1961                            DataFrame(_data_frame) => (),
1962                        }
1963                        Ok(())
1964                    })
1965                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1966                    if block.slot == slot_range.end - 1 {
1967                        let finish_time = std::time::Instant::now();
1968                        let elapsed = finish_time.duration_since(start_time);
1969                        log::info!(target: &log_target, "processed slot {}", block.slot);
1970                        let elapsed_pretty = human_readable_duration(elapsed);
1971                        log::info!(
1972                            target: &log_target,
1973                            "processed {} slots across {} epochs in {}.",
1974                            initial_slot_range.end - initial_slot_range.start,
1975                            slot_to_epoch(initial_slot_range.end)
1976                                + 1
1977                                - slot_to_epoch(initial_slot_range.start),
1978                            elapsed_pretty
1979                        );
1980                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1981                        // On completion, report threads with non-zero error counts for
1982                        // visibility.
1983                        let summary: String = error_counts
1984                            .iter()
1985                            .enumerate()
1986                            .filter_map(|(i, c)| {
1987                                let v = c.load(Ordering::Relaxed);
1988                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1989                            })
1990                            .collect::<Vec<_>>()
1991                            .join(", ");
1992                        if !summary.is_empty() {
1993                            log::debug!(target: &log_target, "threads with errors: {}", summary);
1994                        }
1995                        return Ok(());
1996                    }
1997                }
1998            }
1999            Ok(())
2000}
2001.await
2002{
2003        if is_shutdown_error(&err) {
2004            log::info!(
2005                target: &log_target,
2006                "shutdown requested; terminating firehose thread {:?}",
2007                thread_index
2008            );
2009            return Ok(());
2010        }
2011        log::error!(
2012            target: &log_target,
2013            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2014            slot,
2015            slot_to_epoch(slot)
2016            );
2017            log::error!(target: &log_target, "{}", err);
2018            if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
2019                // Clear cached index data for this epoch to avoid retrying with a bad/partial index.
2020                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2021            }
2022            let item_index = match err {
2023                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2024                _ => 0,
2025            };
2026            // Increment this thread's error counter
2027            let idx = thread_index.unwrap_or(0);
2028            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2029            log::warn!(
2030                target: &log_target,
2031                "restarting from slot {} at index {}",
2032                slot,
2033                item_index,
2034            );
2035            // Update slot range to resume from the failed slot, not the original start.
2036            // If the failing slot was already fully processed, resume from the next slot.
2037            if slot <= last_counted_slot {
2038                slot_range.start = last_counted_slot.saturating_add(1);
2039            } else {
2040                slot_range.start = slot;
2041            }
2042            skip_until_index = Some(item_index);
2043}
2044    Ok(())
2045}
2046
2047#[inline]
2048fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2049    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2050        return false;
2051    }
2052
2053    if !matches!(
2054        versioned_tx.version(),
2055        solana_transaction::versioned::TransactionVersion::Legacy(_)
2056    ) {
2057        return false;
2058    }
2059
2060    let instructions = versioned_tx.message.instructions();
2061    if instructions.len() != 1 {
2062        return false;
2063    }
2064
2065    let program_index = instructions[0].program_id_index as usize;
2066    versioned_tx
2067        .message
2068        .static_account_keys()
2069        .get(program_index)
2070        .map(|program_id| program_id == &vote_program_id())
2071        .unwrap_or(false)
2072}
2073
2074#[inline(always)]
2075fn convert_proto_rewards(
2076    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2077) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2078    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2079    for proto_reward in proto_rewards.rewards.iter() {
2080        let reward = RewardInfo {
2081            reward_type: match proto_reward.reward_type - 1 {
2082                0 => RewardType::Fee,
2083                1 => RewardType::Rent,
2084                2 => RewardType::Staking,
2085                3 => RewardType::Voting,
2086                typ => {
2087                    return Err(Box::new(std::io::Error::other(format!(
2088                        "unsupported reward type {}",
2089                        typ
2090                    ))));
2091                }
2092            },
2093            lamports: proto_reward.lamports,
2094            post_balance: proto_reward.post_balance,
2095            commission: proto_reward.commission.parse::<u8>().ok(),
2096        };
2097        let pubkey = proto_reward
2098            .pubkey
2099            .parse::<Address>()
2100            .map_err(|err| Box::new(err) as SharedError)?;
2101        keyed_rewards.push((pubkey, reward));
2102    }
2103    Ok(keyed_rewards)
2104}
2105
2106#[inline]
2107/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2108pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2109    let total = slot_range.end - slot_range.start;
2110    let slots_per_thread = total / threads;
2111    let remainder = total % threads;
2112
2113    let ranges: Vec<Range<u64>> = (0..threads)
2114        .map(|i| {
2115            // Distribute remainder slots to the first `remainder` threads
2116            let extra_slot = if i < remainder { 1 } else { 0 };
2117            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2118            let end = start + slots_per_thread + extra_slot;
2119            start..end
2120        })
2121        .collect();
2122
2123    // Verify that ranges cover all slots exactly
2124    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2125    assert_eq!(
2126        total_covered, total,
2127        "Range generation failed: {} threads should cover {} slots but only cover {}",
2128        threads, total, total_covered
2129    );
2130
2131    // Verify no gaps between ranges
2132    for i in 1..ranges.len() {
2133        assert_eq!(
2134            ranges[i - 1].end,
2135            ranges[i].start,
2136            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2137            i - 1,
2138            ranges[i - 1].end,
2139            i,
2140            ranges[i].start
2141        );
2142    }
2143
2144    log::info!(
2145        target: LOG_MODULE,
2146        "Generated {} thread ranges covering {} slots total",
2147        threads,
2148        total_covered
2149    );
2150    ranges
2151}
2152
/// Formats `duration` as a compact string for log output.
///
/// * A zero duration renders as `"0s"`.
/// * Below one minute: whole seconds (`"5s"`) when at least one second has elapsed
///   and the millisecond component is zero; otherwise seconds with two decimals
///   (`"1.50s"`).
/// * One minute or more: the largest non-zero unit among days, hours and minutes,
///   followed by the next smaller unit when that one is non-zero
///   (`"1m30s"`, `"2h"`, `"1d1h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        let is_exact = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_exact {
            format!("{whole_secs}s")
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    // Renders "<major><unit>" and appends "<minor><unit>" only when minor is non-zero.
    let render = |major: u64, major_unit: &str, minor: u64, minor_unit: &str| -> String {
        if minor > 0 {
            format!("{major}{major_unit}{minor}{minor_unit}")
        } else {
            format!("{major}{major_unit}")
        }
    };

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let seconds = whole_secs % MINUTE;

    if days > 0 {
        render(days, "d", hours, "h")
    } else if hours > 0 {
        render(hours, "h", minutes, "m")
    } else {
        // At least one full minute has elapsed here, so `minutes` is non-zero.
        render(minutes, "m", seconds, "s")
    }
}
2198
/// Test-only stats handler: logs one summary line for `thread_id` built from `stats`.
///
/// The line includes the elapsed time since `stats.start_time` and a
/// transactions-per-second figure, which is reported as `0.0` when no time has elapsed.
/// The returned future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so a zero elapsed time does not yield inf/NaN.
        let tps = match elapsed_secs {
            secs if secs > 0.0 => stats.transactions_processed as f64 / secs,
            _ => 0.0,
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2224
2225#[tokio::test(flavor = "multi_thread")]
2226async fn test_firehose_epoch_800() {
2227    use dashmap::DashSet;
2228    use std::sync::atomic::{AtomicU64, Ordering};
2229    solana_logger::setup_with_default("info");
2230    const THREADS: usize = 4;
2231    const NUM_SLOTS_TO_COVER: u64 = 50;
2232    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2233    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2234    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2235    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2236    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2237    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2238    let stats_tracking = StatsTracking {
2239        on_stats: log_stats_handler,
2240        tracking_interval_slots: 10,
2241    };
2242
2243    for prev in PREV_BLOCK.iter() {
2244        prev.store(0, Ordering::Relaxed);
2245    }
2246    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2247    NUM_BLOCKS.store(0, Ordering::Relaxed);
2248    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2249    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2250    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2251
2252    firehose(
2253        THREADS.try_into().unwrap(),
2254        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2255        Some(|thread_id: usize, block: BlockData| {
2256            async move {
2257                let _prev =
2258                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2259                if block.was_skipped() {
2260                    log::info!(
2261                        target: LOG_MODULE,
2262                        "leader skipped block {} on thread {}",
2263                        block.slot(),
2264                        thread_id,
2265                    );
2266                } else {
2267                    /*log::info!(
2268                        target: LOG_MODULE,
2269                        "got block {} on thread {}",
2270                        block.slot(),
2271                        thread_id,
2272                    );*/
2273                }
2274
2275                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2276                if block.was_skipped() {
2277                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2278                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2279                } else {
2280                    if first_time {
2281                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2282                        if let BlockData::Block {
2283                            executed_transaction_count,
2284                            ..
2285                        } = &block
2286                        {
2287                            let executed = *executed_transaction_count;
2288                            let _ = MIN_TRANSACTIONS.fetch_update(
2289                                Ordering::Relaxed,
2290                                Ordering::Relaxed,
2291                                |current| {
2292                                    if executed < current {
2293                                        Some(executed)
2294                                    } else {
2295                                        None
2296                                    }
2297                                },
2298                            );
2299                        }
2300                    }
2301                }
2302                Ok(())
2303            }
2304            .boxed()
2305        }),
2306        None::<OnTxFn>,
2307        None::<OnEntryFn>,
2308        None::<OnRewardFn>,
2309        None::<OnErrorFn>,
2310        Some(stats_tracking),
2311        None,
2312    )
2313    .await
2314    .unwrap();
2315    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2316    assert_eq!(
2317        seen, NUM_SLOTS_TO_COVER,
2318        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2319    );
2320    let mut skipped: Vec<u64> = SEEN_SKIPPED
2321        .get_or_init(DashSet::new)
2322        .iter()
2323        .map(|v| *v)
2324        .collect();
2325    skipped.sort_unstable();
2326    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2327    const EXPECTED_SKIPPED: [u64; 6] = [
2328        345_600_004,
2329        345_600_005,
2330        345_600_008,
2331        345_600_009,
2332        345_600_010,
2333        345_600_011,
2334    ];
2335    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2336    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2337}
2338
2339#[tokio::test(flavor = "multi_thread")]
2340async fn test_firehose_target_slot_transactions() {
2341    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2342    solana_logger::setup_with_default("info");
2343    const TARGET_SLOT: u64 = 376_273_722;
2344    const SLOT_RADIUS: u64 = 50;
2345    const EXPECTED_TRANSACTIONS: u64 = 1414;
2346    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2347    static FOUND: AtomicBool = AtomicBool::new(false);
2348    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2349    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2350
2351    FOUND.store(false, Ordering::Relaxed);
2352    OBSERVED_TXS.store(0, Ordering::Relaxed);
2353    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2354
2355    firehose(
2356        4,
2357        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2358        Some(|_thread_id: usize, block: BlockData| {
2359            async move {
2360                if block.slot() == TARGET_SLOT {
2361                    assert!(
2362                        !block.was_skipped(),
2363                        "target slot {TARGET_SLOT} was marked leader skipped",
2364                    );
2365                    if let BlockData::Block {
2366                        executed_transaction_count,
2367                        ..
2368                    } = block
2369                    {
2370                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2371                        FOUND.store(true, Ordering::Relaxed);
2372                        assert_eq!(
2373                            executed_transaction_count, EXPECTED_TRANSACTIONS,
2374                            "unexpected transaction count for slot {TARGET_SLOT}"
2375                        );
2376                        assert_eq!(
2377                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2378                            EXPECTED_NON_VOTE_TRANSACTIONS,
2379                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2380                        );
2381                    }
2382                }
2383                Ok(())
2384            }
2385            .boxed()
2386        }),
2387        Some(|_thread_id: usize, transaction: TransactionData| {
2388            async move {
2389                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2390                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2391                }
2392                Ok(())
2393            }
2394            .boxed()
2395        }),
2396        None::<OnEntryFn>,
2397        None::<OnRewardFn>,
2398        None::<OnErrorFn>,
2399        None::<OnStatsTrackingFn>,
2400        None,
2401    )
2402    .await
2403    .unwrap();
2404
2405    assert!(
2406        FOUND.load(Ordering::Relaxed),
2407        "target slot was not processed"
2408    );
2409    assert_eq!(
2410        OBSERVED_TXS.load(Ordering::Relaxed),
2411        EXPECTED_TRANSACTIONS,
2412        "recorded transaction count mismatch"
2413    );
2414}
2415
/// Injects a single synthetic block-handler failure once at least one real block has
/// been recorded, then asserts that every slot of the eight-slot range was still
/// delivered to the handler at least once.
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    solana_logger::setup_with_default("info");

    // Reset the static state before the run.
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let is_real_block = !block.was_skipped();

                // Force an error after at least one block has been seen so restart happens mid-range.
                // The swap comes last so the trigger is only consumed when the failure fires.
                if is_real_block
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *coverage.entry(block.slot()).or_default() += 1;
                if is_real_block {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    let requested_slots = START_SLOT..(START_SLOT + NUM_SLOTS);
    for slot in requested_slots {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
2479
/// Runs the firehose with sixteen threads over the 2,001 slots surrounding
/// `GAP_START` and asserts that every slot was delivered as a non-skipped block,
/// apart from a known run of four leader-skipped slots.
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    solana_logger::setup_with_default("info");

    // Reset the static state before the run.
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Only real blocks count towards coverage.
                if !block.was_skipped() {
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(block.slot());
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut coverage = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();

    // ignore a known 4-slot leader skipped gap
    coverage.extend(378864396..=378864399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}