jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_ledger::entry_notifier_interface::EntryNotifier;
11use solana_reward_info::RewardInfo;
12use solana_rpc::{
13    optimistically_confirmed_bank_tracker::SlotNotification,
14    transaction_notifier_interface::TransactionNotifier,
15};
16use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
17use solana_sdk::{hash::Hash, pubkey::Pubkey, transaction::VersionedTransaction};
18use solana_vote_program::id as vote_program_id;
19use std::{
20    fmt::Display,
21    future::Future,
22    io,
23    ops::Range,
24    path::PathBuf,
25    sync::{
26        Arc,
27        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
28    },
29};
30use thiserror::Error;
31use tokio::{
32    sync::broadcast::{self, error::TryRecvError},
33    time::timeout,
34};
35
36use crate::{
37    LOG_MODULE,
38    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
39    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
40    node_reader::NodeReader,
41    utils,
42};
43
// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
// header, seeking, reading next block). Adjust here to tune stall detection/restart
// aggressiveness.
//
// Currently 10 seconds. The firehose worker wraps each such operation in
// `timeout(OP_TIMEOUT, ...)` and reports an expiry as `FirehoseError::OperationTimeout`.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
48
49fn poll_shutdown(
50    flag: &Arc<std::sync::atomic::AtomicBool>,
51    receiver: &mut Option<broadcast::Receiver<()>>,
52) -> bool {
53    if let Some(rx) = receiver {
54        match rx.try_recv() {
55            Ok(_) | Err(TryRecvError::Lagged(_)) => {
56                flag.store(true, Ordering::SeqCst);
57            }
58            Err(TryRecvError::Closed) => {
59                flag.store(true, Ordering::SeqCst);
60            }
61            Err(TryRecvError::Empty) => {}
62        }
63    }
64    flag.load(Ordering::SeqCst)
65}
66
67fn is_shutdown_error(err: &FirehoseError) -> bool {
68    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
69        inner
70            .downcast_ref::<io::Error>()
71            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
72            .unwrap_or(false)
73    }
74
75    match err {
76        FirehoseError::BlockHandlerError(inner)
77        | FirehoseError::TransactionHandlerError(inner)
78        | FirehoseError::EntryHandlerError(inner)
79        | FirehoseError::RewardHandlerError(inner)
80        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
81        _ => false,
82    }
83}
84
/// Errors that can occur while streaming the firehose.
///
/// The human-readable message for each variant comes from the hand-written `Display`
/// implementation for this type elsewhere in this file.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(Box<dyn std::error::Error>),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(Box<dyn std::error::Error>),
    /// Failure while fetching an individual block.
    GetBlockError(Box<dyn std::error::Error>),
    /// Failed to decode a node at the given index.
    NodeDecodingError(usize, Box<dyn std::error::Error>),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(Box<dyn std::error::Error>),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(Box<dyn std::error::Error>),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(Box<dyn std::error::Error>),
    /// Timeout reached while waiting for a firehose operation; the payload names the
    /// operation that timed out.
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(Box<dyn std::error::Error>),
    /// Entry handler returned an error.
    EntryHandlerError(Box<dyn std::error::Error>),
    /// Reward handler returned an error.
    RewardHandlerError(Box<dyn std::error::Error>),
    /// Block handler returned an error.
    BlockHandlerError(Box<dyn std::error::Error>),
}
122
123impl Display for FirehoseError {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        match self {
126            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
127            FirehoseError::ReadHeader(error) => {
128                write!(f, "Error reading header: {}", error)
129            }
130            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
131                f,
132                "Error initializing geyser plugin service: {}",
133                geyser_plugin_service_error
134            ),
135            FirehoseError::FailedToGetTransactionNotifier => write!(
136                f,
137                "Failed to get transaction notifier from GeyserPluginService"
138            ),
139            FirehoseError::ReadUntilBlockError(error) => {
140                write!(f, "Error reading until block: {}", error)
141            }
142            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
143            FirehoseError::NodeDecodingError(item_index, error) => {
144                write!(
145                    f,
146                    "Error seeking, reading data from, or decoding data for data node {}: {}",
147                    item_index, error
148                )
149            }
150            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
151                f,
152                "Error getting info from slot offset index: {}",
153                slot_offset_index_error
154            ),
155            FirehoseError::SeekToSlotError(error) => {
156                write!(f, "Error seeking to slot: {}", error)
157            }
158            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
159            FirehoseError::OnStatsHandlerError(error) => {
160                write!(f, "Stats handler error: {}", error)
161            }
162            FirehoseError::OperationTimeout(op) => {
163                write!(f, "Timeout while waiting for operation: {}", op)
164            }
165            FirehoseError::TransactionHandlerError(error) => {
166                write!(f, "Transaction handler error: {}", error)
167            }
168            FirehoseError::EntryHandlerError(error) => {
169                write!(f, "Entry handler error: {}", error)
170            }
171            FirehoseError::RewardHandlerError(error) => {
172                write!(f, "Reward handler error: {}", error)
173            }
174            FirehoseError::BlockHandlerError(error) => {
175                write!(f, "Block handler error: {}", error)
176            }
177        }
178    }
179}
180
181impl From<reqwest::Error> for FirehoseError {
182    fn from(e: reqwest::Error) -> Self {
183        FirehoseError::Reqwest(e)
184    }
185}
186
187impl From<GeyserPluginServiceError> for FirehoseError {
188    fn from(e: GeyserPluginServiceError) -> Self {
189        FirehoseError::GeyserPluginService(e)
190    }
191}
192
193impl From<SlotOffsetIndexError> for FirehoseError {
194    fn from(e: SlotOffsetIndexError) -> Self {
195        FirehoseError::SlotOffsetIndexError(e)
196    }
197}
198
/// Per-thread progress information emitted by the firehose runner.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot range `[start, end)` assigned to the thread; `end` itself is not
    /// processed.
    pub slot_range: Range<u64>,
    /// Latest slot processed by the thread.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
225
/// Aggregated firehose statistics covering all worker threads.
///
/// Values are assembled by `maybe_emit_stats` in this file; the per-field notes below
/// describe how that function fills them.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update.
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began.
    /// NOTE(review): `maybe_emit_stats` copies this from the reporting thread's
    /// `ThreadStats::start_time`.
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    /// NOTE(review): `maybe_emit_stats` copies this from the reporting thread's
    /// `ThreadStats::finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open, `[start, end)`).
    /// NOTE(review): `maybe_emit_stats` copies this from the reporting thread's
    /// `ThreadStats::slot_range`.
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads; computed as `slots_processed` minus
    /// `blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Aggregate rewards processed across all threads.
    /// NOTE(review): `maybe_emit_stats` fills this from the reporting thread's
    /// `ThreadStats::rewards_processed`, so it is a per-thread value there, not a
    /// cross-thread sum — confirm which is intended.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse; `maybe_emit_stats` never reports less
    /// than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
258
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// `OnStats` is the callback type; it receives the reporting thread's index together with
/// a [`Stats`] snapshot.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available.
    pub on_stats: OnStats,
    /// Minimum number of slots processed before triggering the callback.
    pub tracking_interval_slots: u64,
}
267
268#[inline(always)]
269#[allow(clippy::too_many_arguments)]
270async fn maybe_emit_stats<OnStats: Handler<Stats>>(
271    stats_tracking: Option<&StatsTracking<OnStats>>,
272    thread_index: usize,
273    thread_stats: &ThreadStats,
274    overall_slots_processed: &AtomicU64,
275    overall_blocks_processed: &AtomicU64,
276    overall_transactions_processed: &AtomicU64,
277    overall_entries_processed: &AtomicU64,
278    transactions_since_stats: &AtomicU64,
279    blocks_since_stats: &AtomicU64,
280    slots_since_stats: &AtomicU64,
281    last_pulse: &Arc<AtomicU64>,
282    base_instant: std::time::Instant,
283) -> Result<(), (FirehoseError, u64)> {
284    if let Some(stats_tracker) = stats_tracking {
285        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
286        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
287        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
288        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
289        let now_nanos = base_instant.elapsed().as_nanos() as u64;
290        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
291        let delta_nanos = now_nanos.saturating_sub(previous);
292        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
293        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
294        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
295        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
296
297        let stats = Stats {
298            thread_stats: thread_stats.clone(),
299            start_time: thread_stats.start_time,
300            finish_time: thread_stats.finish_time,
301            slot_range: thread_stats.slot_range.clone(),
302            slots_processed: total_slots,
303            blocks_processed: total_blocks,
304            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
305            transactions_processed: total_transactions,
306            entries_processed: total_entries,
307            rewards_processed: thread_stats.rewards_processed,
308            transactions_since_last_pulse: processed_transactions,
309            blocks_since_last_pulse: processed_blocks,
310            slots_since_last_pulse: processed_slots,
311            time_since_last_pulse,
312        };
313
314        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
315            last_pulse.store(previous, Ordering::Relaxed);
316            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
317            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
318            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
319            return Err((
320                FirehoseError::OnStatsHandlerError(e),
321                thread_stats.current_slot,
322            ));
323        }
324    }
325    Ok(())
326}
327
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
334
/// Firehose transaction payload passed to [`Handler`] callbacks.
///
/// One value describes a single transaction together with the slot it belongs to.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_sdk::signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
353
/// Block entry metadata passed to [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Range of transaction indexes covered by the entry (a half-open `Range`, so `end`
    /// is exclusive).
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
368
/// Reward data conveyed to reward [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients and their associated reward information, as
    /// `(recipient, reward)` pairs.
    pub rewards: Vec<(Pubkey, RewardInfo)>,
}
377
/// Block-level data streamed to block handlers.
///
/// A slot is reported either as a full [`BlockData::Block`] or, when no block exists for
/// it, as a [`BlockData::LeaderSkipped`] marker. Both variants carry the slot number.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot was skipped by the leader.
    LeaderSkipped {
        /// Skipped slot number.
        slot: u64,
    },
}
408
409impl BlockData {
410    /// Returns the slot associated with this block or skipped slot.
411    #[inline(always)]
412    pub const fn slot(&self) -> u64 {
413        match self {
414            BlockData::Block { slot, .. } => *slot,
415            BlockData::LeaderSkipped { slot } => *slot,
416        }
417    }
418
419    /// Returns `true` if the slot was skipped by the leader.
420    #[inline(always)]
421    pub const fn was_skipped(&self) -> bool {
422        matches!(self, BlockData::LeaderSkipped { .. })
423    }
424}
425
/// Outcome of one handler invocation: `Ok(())` on success, otherwise a boxed error that is
/// `Send + 'static`.
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
/// Boxed future (`BoxFuture<'static, _>`) that resolves to a [`HandlerResult`]; this is
/// what every handler callback returns.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
428
/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// The first argument is a `usize`; the firehose passes the index of the worker thread
/// that produced the event. The second argument is the event payload. The callback must
/// be `Send + Sync + Clone + 'static`.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket implementation: any closure or function with the right signature and bounds is
// a `Handler`, so callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
436
/// Function pointer alias for [`Handler`] callbacks: a plain `fn(usize, Data)` returning
/// the boxed handler future.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
449
450/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
451#[inline]
452#[allow(clippy::too_many_arguments)]
453pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
454    threads: u64,
455    slot_range: Range<u64>,
456    on_block: Option<OnBlock>,
457    on_tx: Option<OnTransaction>,
458    on_entry: Option<OnEntry>,
459    on_rewards: Option<OnRewards>,
460    stats_tracking: Option<StatsTracking<OnStats>>,
461    shutdown_signal: Option<broadcast::Receiver<()>>,
462) -> Result<(), (FirehoseError, u64)>
463where
464    OnBlock: Handler<BlockData>,
465    OnTransaction: Handler<TransactionData>,
466    OnEntry: Handler<EntryData>,
467    OnRewards: Handler<RewardsData>,
468    OnStats: Handler<Stats>,
469{
470    if threads == 0 {
471        return Err((
472            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
473            slot_range.start,
474        ));
475    }
476    let client = Client::new();
477    log::info!(target: LOG_MODULE, "starting firehose...");
478    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
479
480    let slot_range = Arc::new(slot_range);
481
482    // divide slot_range into n subranges
483    let subranges = generate_subranges(&slot_range, threads);
484    if threads > 1 {
485        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
486    }
487
488    let firehose_start = std::time::Instant::now();
489    let shutdown_flag = Arc::new(AtomicBool::new(false));
490    if let Some(ref rx) = shutdown_signal {
491        let mut rx = rx.resubscribe();
492        let flag = shutdown_flag.clone();
493        tokio::spawn(async move {
494            if rx.recv().await.is_ok() {
495                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
496                flag.store(true, Ordering::SeqCst);
497            }
498        });
499    }
500    let mut handles = Vec::new();
501    // Shared per-thread error counters
502    let error_counts: Arc<Vec<AtomicU32>> =
503        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
504
505    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
506    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
507    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509
510    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
511        let error_counts = error_counts.clone();
512        let client = client.clone();
513        let on_block = on_block.clone();
514        let on_tx = on_tx.clone();
515        let on_entry = on_entry.clone();
516        let on_reward = on_rewards.clone();
517        let overall_slots_processed = overall_slots_processed.clone();
518        let overall_blocks_processed = overall_blocks_processed.clone();
519        let overall_transactions_processed = overall_transactions_processed.clone();
520        let overall_entries_processed = overall_entries_processed.clone();
521        let stats_tracking = stats_tracking.clone();
522        let transactions_since_stats = Arc::new(AtomicU64::new(0));
523        let blocks_since_stats = Arc::new(AtomicU64::new(0));
524        let slots_since_stats = Arc::new(AtomicU64::new(0));
525        let last_pulse = Arc::new(AtomicU64::new(0));
526        let transactions_since_stats_cloned = transactions_since_stats.clone();
527        let blocks_since_stats_cloned = blocks_since_stats.clone();
528        let slots_since_stats_cloned = slots_since_stats.clone();
529        let last_pulse_cloned = last_pulse.clone();
530        let shutdown_flag = shutdown_flag.clone();
531        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
532
533        let handle = tokio::spawn(async move {
534            let transactions_since_stats = transactions_since_stats_cloned;
535            let blocks_since_stats = blocks_since_stats_cloned;
536            let slots_since_stats = slots_since_stats_cloned;
537            let last_pulse = last_pulse_cloned;
538            let mut shutdown_rx = thread_shutdown_rx;
539            let start_time = std::time::Instant::now();
540            last_pulse.store(0, Ordering::Relaxed);
541            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
542            let mut skip_until_index = None;
543            let block_enabled = on_block.is_some();
544            let tx_enabled = on_tx.is_some();
545            let entry_enabled = on_entry.is_some();
546            let reward_enabled = on_reward.is_some();
547            let tracking_enabled = stats_tracking.is_some();
548            let mut last_counted_slot = slot_range.start.saturating_sub(1);
549
550            // let mut triggered = false;
551            while let Err((err, slot)) = async {
552                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
553                    log::info!(
554                        target: &log_target,
555                        "shutdown requested; terminating firehose thread {}",
556                        thread_index
557                    );
558                    return Ok(());
559                }
560                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
561                log::info!(
562                    target: &log_target,
563                    "slot range: {} (epoch {}) ... {} (epoch {})",
564                    slot_range.start,
565                    slot_to_epoch(slot_range.start),
566                    slot_range.end,
567                    slot_to_epoch(slot_range.end)
568                );
569
570                log::info!(target: &log_target, "🚒 starting firehose...");
571
572                // for each epoch
573                let mut current_slot: Option<u64> = None;
574                let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
575                for epoch_num in epoch_range.clone() {
576                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
577                        log::info!(
578                            target: &log_target,
579                            "shutdown requested; terminating firehose thread {}",
580                            thread_index
581                        );
582                        return Ok(());
583                    }
584                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
585                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
586                        Ok(stream) => stream,
587                        Err(_) => {
588                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
589                        }
590                    };
591                    let mut reader = NodeReader::new(stream);
592
593                    let header_fut = reader.read_raw_header();
594                    let header = match timeout(OP_TIMEOUT, header_fut).await {
595                        Ok(res) => res
596                            .map_err(FirehoseError::ReadHeader)
597                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
598                        Err(_) => {
599                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
600                        }
601                    };
602                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
603
604                    let mut previous_blockhash = Hash::default();
605                    let mut latest_entry_blockhash = Hash::default();
606
607                    let mut thread_stats = if tracking_enabled {
608                        Some(ThreadStats {
609                            thread_id: thread_index,
610                            start_time,
611                            finish_time: None,
612                            slot_range: slot_range.clone(),
613                            current_slot: slot_range.start,
614                            slots_processed: 0,
615                            blocks_processed: 0,
616                            leader_skipped_slots: 0,
617                            transactions_processed: 0,
618                            entries_processed: 0,
619                            rewards_processed: 0,
620                        })
621                    } else {
622                        None
623                    };
624
625                    if slot_range.start > epoch_to_slot_range(epoch_num).0 {
626                        let seek_fut = reader.seek_to_slot(slot_range.start);
627                        match timeout(OP_TIMEOUT, seek_fut).await {
628                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
629                            Err(_) => {
630                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
631                            }
632                        }
633                    }
634
635                    // for each item in each block
636                    let mut item_index = 0;
637                    let mut displayed_skip_message = false;
638                    loop {
639                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
640                            log::info!(
641                                target: &log_target,
642                                "shutdown requested; terminating firehose thread {}",
643                                thread_index
644                            );
645                            return Ok(());
646                        }
647                        let read_fut = reader.read_until_block();
648                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
649                            Ok(result) => result
650                                .map_err(FirehoseError::ReadUntilBlockError)
651                                .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
652                            Err(_) => {
653                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
654                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
655                            }
656                        };
657                        if nodes.is_empty() {
658                            log::info!(
659                                target: &log_target,
660                                "reached end of epoch {}",
661                                epoch_num
662                            );
663                            break;
664                        }
665                        if let Some(last_node) = nodes.0.last()
666                            && !last_node.get_node().is_block()
667                        {
668                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
669                            break;
670                        }
671                        let block = nodes
672                            .get_block()
673                            .map_err(FirehoseError::GetBlockError)
674                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
675                        log::debug!(
676                            target: &log_target,
677                            "read {} items from epoch {}, now at slot {}",
678                            item_index,
679                            epoch_num,
680                            block.slot
681                        );
682                        let slot = block.slot;
683                        if slot >= slot_range.end {
684                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
685                            // Return early to terminate the firehose thread cleanly. We use >=
686                            // because slot_range is half-open [start, end), so any slot equal
687                            // to end is out-of-range and must not be processed.
688
689                            let target_last_slot = slot_range.end.saturating_sub(1);
690                            let skip_start_from_previous = previous_slot
691                                .map(|s| s.saturating_add(1))
692                                .unwrap_or(slot_range.start);
693                            let first_untracked_slot = last_counted_slot.saturating_add(1);
694                            let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
695
696                            if skip_start <= target_last_slot {
697                                if block_enabled
698                                    && let Some(on_block_cb) = on_block.as_ref() {
699                                        for skipped_slot in skip_start..=target_last_slot {
700                                            log::debug!(
701                                                target: &log_target,
702                                                "leader skipped slot {} (prev_counted {}, target {})",
703                                                skipped_slot,
704                                                last_counted_slot,
705                                                target_last_slot,
706                                            );
707                                            on_block_cb(
708                                                thread_index,
709                                                BlockData::LeaderSkipped { slot: skipped_slot },
710                                            )
711                                            .await
712                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
713                                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
714                                        }
715                                    }
716
717                                let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
718                                if tracking_enabled {
719                                    if let Some(ref mut stats) = thread_stats {
720                                        stats.leader_skipped_slots += missing_slots;
721                                        stats.slots_processed += missing_slots;
722                                        stats.current_slot = target_last_slot;
723                                    }
724                                    overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
725                                    slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
726                                }
727                                last_counted_slot = target_last_slot;
728                            }
729
730                            return Ok(());
731                        }
732                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
733                        if slot < slot_range.start {
734                            log::warn!(
735                                target: &log_target,
736                                "encountered slot {} before start of range {}, skipping",
737                                slot,
738                                slot_range.start
739                            );
740                            continue;
741                        }
742                        if current_slot.is_some() {
743                            previous_slot = current_slot;
744                        }
745                        current_slot = Some(slot);
746                        let mut entry_index: usize = 0;
747                        let mut this_block_executed_transaction_count: u64 = 0;
748                        let mut this_block_entry_count: u64 = 0;
749                        let mut this_block_rewards: Vec<(Pubkey, RewardInfo)> = Vec::new();
750
751                        for node_with_cid in &nodes.0 {
752                            item_index += 1;
753                            if let Some(skip) = skip_until_index {
754                                if item_index < skip {
755                                    if !displayed_skip_message {
756                                        log::info!(
757                                            target: &log_target,
758                                            "skipping until index {} (at {})",
759                                            skip,
760                                            item_index
761                                        );
762                                        displayed_skip_message = true;
763                                    }
764                                    continue;
765                                } else {
766                                    log::info!(
767                                        target: &log_target,
768                                        "reached target index {}, resuming...",
769                                        skip
770                                    );
771                                    skip_until_index = None;
772                                }
773                            }
774                            let node = node_with_cid.get_node();
775
776                            if let Some(ref mut stats) = thread_stats {
777                                stats.current_slot = slot;
778                            }
779
780                            let error_slot = current_slot.unwrap_or(slot_range.start);
781
782                            use crate::node::Node::*;
783                            match node {
784                                Transaction(tx) => {
785                                    if tx_enabled
786                                        && let Some(on_tx_cb) = on_tx.as_ref()
787                                    {
788                                        let error_slot = current_slot.unwrap_or(slot_range.start);
789                                        let versioned_tx = tx.as_parsed().map_err(|err| {
790                                            (
791                                                FirehoseError::NodeDecodingError(item_index, err),
792                                                error_slot,
793                                            )
794                                        })?;
795                                        let reassembled_metadata = nodes
796                                            .reassemble_dataframes(tx.metadata.clone())
797                                            .map_err(|err| {
798                                                (
799                                                    FirehoseError::NodeDecodingError(item_index, err),
800                                                    error_slot,
801                                                )
802                                            })?;
803
804                                        let decompressed =
805                                            utils::decompress_zstd(reassembled_metadata.clone())
806                                                .map_err(|err| {
807                                                    (
808                                                        FirehoseError::NodeDecodingError(
809                                                            item_index,
810                                                            err,
811                                                        ),
812                                                        error_slot,
813                                                    )
814                                                })?;
815
816                                        let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
817                                            prost_011::Message::decode(decompressed.as_slice())
818                                                .map_err(|err| {
819                                                    (
820                                                        FirehoseError::NodeDecodingError(
821                                                            item_index,
822                                                            Box::new(err),
823                                                        ),
824                                                        error_slot,
825                                                    )
826                                                })?;
827
828                                        let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
829                                            metadata.try_into().map_err(|err| {
830                                                (
831                                                    FirehoseError::NodeDecodingError(
832                                                        item_index,
833                                                        Box::new(err),
834                                                    ),
835                                                    error_slot,
836                                                )
837                                            })?;
838
839                                        let message_hash = {
840                                            #[cfg(feature = "verify-transaction-signatures")]
841                                            {
842                                                versioned_tx.verify_and_hash_message().map_err(|err| {
843                                                    (
844                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
845                                                        error_slot,
846                                                    )
847                                                })?
848                                            }
849                                            #[cfg(not(feature = "verify-transaction-signatures"))]
850                                            {
851                                                versioned_tx.message.hash()
852                                            }
853                                        };
854                                        let signature = versioned_tx
855                                            .signatures
856                                            .first()
857                                            .ok_or_else(|| {
858                                                Box::new(std::io::Error::new(
859                                                    std::io::ErrorKind::InvalidData,
860                                                    "transaction missing signature",
861                                                )) as Box<dyn std::error::Error>
862                                            })
863                                            .map_err(|err| {
864                                                (
865                                                    FirehoseError::NodeDecodingError(
866                                                        item_index,
867                                                        err,
868                                                    ),
869                                                    error_slot,
870                                                )
871                                            })?;
872                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
873
874                                        on_tx_cb(
875                                            thread_index,
876                                            TransactionData {
877                                                slot: block.slot,
878                                                transaction_slot_index: tx.index.unwrap() as usize,
879                                                signature: *signature,
880                                                message_hash,
881                                                is_vote,
882                                                transaction_status_meta: as_native_metadata.clone(),
883                                                transaction: versioned_tx.clone(),
884                                            },
885                                        )
886                                        .await
887                                        .map_err(|e| {
888                                            (
889                                                FirehoseError::TransactionHandlerError(e),
890                                                error_slot,
891                                            )
892                                        })?;
893                                    }
894                                    fetch_add_if(
895                                        tracking_enabled,
896                                        &overall_transactions_processed,
897                                        1,
898                                    );
899                                    if let Some(ref mut stats) = thread_stats {
900                                        stats.transactions_processed += 1;
901                                    }
902                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
903                                }
904                                Entry(entry) => {
905                                    let entry_hash = Hash::from(entry.hash.to_bytes());
906                                    let entry_transaction_count = entry.transactions.len();
907                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
908                                    let starting_transaction_index_u64 =
909                                        this_block_executed_transaction_count;
910                                    latest_entry_blockhash = entry_hash;
911                                    this_block_executed_transaction_count += entry_transaction_count_u64;
912                                    this_block_entry_count += 1;
913
914                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
915                                        let starting_transaction_index = usize::try_from(
916                                            starting_transaction_index_u64,
917                                        )
918                                        .map_err(|err| {
919                                            (
920                                                FirehoseError::EntryHandlerError(Box::new(err)),
921                                                error_slot,
922                                            )
923                                        })?;
924                                        let transaction_indexes_end =
925                                            starting_transaction_index + entry_transaction_count;
926                                        on_entry_cb(
927                                            thread_index,
928                                            EntryData {
929                                                slot: block.slot,
930                                                entry_index,
931                                                transaction_indexes: starting_transaction_index
932                                                    ..transaction_indexes_end,
933                                                num_hashes: entry.num_hashes,
934                                                hash: entry_hash,
935                                            },
936                                        )
937                                        .await
938                                        .map_err(|e| {
939                                            (
940                                                FirehoseError::EntryHandlerError(e),
941                                                error_slot,
942                                            )
943                                        })?;
944                                    }
945                                    entry_index += 1;
946                                    fetch_add_if(
947                                        tracking_enabled,
948                                        &overall_entries_processed,
949                                        1,
950                                    );
951                                    if let Some(ref mut stats) = thread_stats {
952                                        stats.entries_processed += 1;
953                                    }
954                                }
955                                Block(block) => {
956                                    let prev_last_counted_slot = last_counted_slot;
957                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
958                                        (
959                                            stats.slots_processed,
960                                            stats.blocks_processed,
961                                            stats.leader_skipped_slots,
962                                            stats.current_slot,
963                                        )
964                                    });
965
966                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
967                                    let skip_start_from_previous = previous_slot
968                                        .map(|s| s.saturating_add(1))
969                                        .unwrap_or(next_expected_slot);
970                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
971
972                                    for skipped_slot in skip_start..slot {
973                                        log::debug!(
974                                            target: &log_target,
975                                            "leader skipped slot {} (prev_counted {}, current slot {})",
976                                            skipped_slot,
977                                            prev_last_counted_slot,
978                                            slot,
979                                        );
980                                        if block_enabled
981                                            && let Some(on_block_cb) = on_block.as_ref() {
982                                                on_block_cb(
983                                                    thread_index,
984                                                    BlockData::LeaderSkipped {
985                                                        slot: skipped_slot,
986                                                    },
987                                                )
988                                                .await
989                                                .map_err(|e| {
990                                                    (
991                                                        FirehoseError::BlockHandlerError(e),
992                                                        error_slot,
993                                                    )
994                                                })?;
995                                            }
996                                        if tracking_enabled {
997                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
998                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
999                                            if let Some(ref mut stats) = thread_stats {
1000                                                stats.leader_skipped_slots += 1;
1001                                                stats.slots_processed += 1;
1002                                                stats.current_slot = skipped_slot;
1003                                            }
1004                                        }
1005                                        last_counted_slot = skipped_slot;
1006                                    }
1007
1008                                    if slot <= last_counted_slot {
1009                                        log::debug!(
1010                                            target: &log_target,
1011                                            "duplicate block {}, already counted (last_counted={})",
1012                                            slot,
1013                                            last_counted_slot,
1014                                        );
1015                                        this_block_rewards.clear();
1016                                        continue;
1017                                    }
1018
1019                                    if block_enabled {
1020                                        if let Some(on_block_cb) = on_block.as_ref() {
1021                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1022                                            on_block_cb(
1023                                                thread_index,
1024                                                BlockData::Block {
1025                                                    parent_slot: block.meta.parent_slot,
1026                                                    parent_blockhash: previous_blockhash,
1027                                                    slot: block.slot,
1028                                                    blockhash: latest_entry_blockhash,
1029                                                    rewards: KeyedRewardsAndNumPartitions {
1030                                                        keyed_rewards,
1031                                                        num_partitions: None,
1032                                                    },
1033                                                    block_time: Some(block.meta.blocktime as i64),
1034                                                    block_height: block.meta.block_height,
1035                                                    executed_transaction_count:
1036                                                        this_block_executed_transaction_count,
1037                                                    entry_count: this_block_entry_count,
1038                                                },
1039                                            )
1040                                            .await
1041                                            .map_err(|e| {
1042                                                (
1043                                                    FirehoseError::BlockHandlerError(e),
1044                                                    error_slot,
1045                                                )
1046                                            })?;
1047                                        }
1048                                    } else {
1049                                        this_block_rewards.clear();
1050                                    }
1051                                    previous_blockhash = latest_entry_blockhash;
1052
1053                                    if tracking_enabled {
1054                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1055                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1056                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1057                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1058                                        if let Some(ref mut stats) = thread_stats {
1059                                            stats.blocks_processed += 1;
1060                                            stats.slots_processed += 1;
1061                                            stats.current_slot = slot;
1062                                        }
1063
1064                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1065                                            (&stats_tracking, thread_stats.as_mut())
1066                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1067                                                && let Err(err) = maybe_emit_stats(
1068                                                    stats_tracking.as_ref(),
1069                                                    thread_index,
1070                                                    thread_stats_ref,
1071                                                    &overall_slots_processed,
1072                                                    &overall_blocks_processed,
1073                                                    &overall_transactions_processed,
1074                                                    &overall_entries_processed,
1075                                                    &transactions_since_stats,
1076                                                    &blocks_since_stats,
1077                                                    &slots_since_stats,
1078                                                    &last_pulse,
1079                                                    start_time,
1080                                                )
1081                                                .await
1082                                                {
1083                                                    blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1084                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1085                                                    overall_blocks_processed
1086                                                        .fetch_sub(1, Ordering::Relaxed);
1087                                                    overall_slots_processed
1088                                                        .fetch_sub(1, Ordering::Relaxed);
1089                                                    if let Some((
1090                                                        prev_slots_processed,
1091                                                        prev_blocks_processed,
1092                                                        prev_leader_skipped,
1093                                                        prev_current_slot,
1094                                                    )) = thread_stats_snapshot
1095                                                    {
1096                                                        thread_stats_ref.slots_processed =
1097                                                            prev_slots_processed;
1098                                                        thread_stats_ref.blocks_processed =
1099                                                            prev_blocks_processed;
1100                                                        thread_stats_ref.leader_skipped_slots =
1101                                                            prev_leader_skipped;
1102                                                        thread_stats_ref.current_slot =
1103                                                            prev_current_slot;
1104                                                    }
1105                                                    last_counted_slot = prev_last_counted_slot;
1106                                                    return Err(err);
1107                                                }
1108                                    }
1109
1110                                    last_counted_slot = slot;
1111                                }
1112                                Subset(_subset) => (),
1113                                Epoch(_epoch) => (),
1114                                Rewards(rewards) => {
1115                                    if reward_enabled || block_enabled {
1116                                        let reassembled = nodes
1117                                            .reassemble_dataframes(rewards.data.clone())
1118                                            .map_err(|err| {
1119                                                (
1120                                                    FirehoseError::NodeDecodingError(item_index, err),
1121                                                    current_slot.unwrap_or(slot_range.start),
1122                                                )
1123                                            })?;
1124                                        if reassembled.is_empty() {
1125                                            this_block_rewards.clear();
1126                                            if reward_enabled
1127                                                && let Some(on_reward_cb) = on_reward.as_ref()
1128                                            {
1129                                                on_reward_cb(
1130                                                    thread_index,
1131                                                    RewardsData {
1132                                                        slot: block.slot,
1133                                                        rewards: Vec::new(),
1134                                                    },
1135                                                )
1136                                                .await
1137                                                .map_err(|e| {
1138                                                    (
1139                                                        FirehoseError::RewardHandlerError(e),
1140                                                        error_slot,
1141                                                    )
1142                                                })?;
1143                                            }
1144                                            continue;
1145                                        }
1146
1147                                        let decompressed = utils::decompress_zstd(reassembled)
1148                                            .map_err(|err| {
1149                                                (
1150                                                    FirehoseError::NodeDecodingError(
1151                                                        item_index,
1152                                                        err,
1153                                                    ),
1154                                                    error_slot,
1155                                                )
1156                                            })?;
1157
1158                                        let decoded =
1159                                            prost_011::Message::decode(decompressed.as_slice())
1160                                                .map_err(|err| {
1161                                                    (
1162                                                        FirehoseError::NodeDecodingError(
1163                                                            item_index,
1164                                                            Box::new(err),
1165                                                        ),
1166                                                        error_slot,
1167                                                    )
1168                                                })?;
1169                                        let keyed_rewards = convert_proto_rewards(&decoded)
1170                                            .map_err(|err| {
1171                                                (
1172                                                    FirehoseError::NodeDecodingError(item_index, err),
1173                                                    error_slot,
1174                                                )
1175                                            })?;
1176                                        if reward_enabled
1177                                            && let Some(on_reward_cb) = on_reward.as_ref()
1178                                        {
1179                                            on_reward_cb(
1180                                                thread_index,
1181                                                RewardsData {
1182                                                    slot: block.slot,
1183                                                    rewards: keyed_rewards.clone(),
1184                                                },
1185                                            )
1186                                            .await
1187                                            .map_err(|e| {
1188                                                (
1189                                                    FirehoseError::RewardHandlerError(e),
1190                                                    error_slot,
1191                                                )
1192                                            })?;
1193                                        }
1194                                        this_block_rewards = keyed_rewards;
1195                                        if let Some(ref mut stats) = thread_stats {
1196                                            stats.rewards_processed +=
1197                                                this_block_rewards.len() as u64;
1198                                        }
1199                                    }
1200                                }
1201                                DataFrame(_data_frame) => (),
1202                            }
1203                        }
1204                        if block.slot == slot_range.end - 1 {
1205                            let finish_time = std::time::Instant::now();
1206                            let elapsed = finish_time.duration_since(start_time);
1207                            log::info!(target: &log_target, "processed slot {}", block.slot);
1208                            let elapsed_pretty = human_readable_duration(elapsed);
1209                            log::info!(
1210                                target: &log_target,
1211                                "processed {} slots across {} epochs in {}.",
1212                                slot_range.end - slot_range.start,
1213                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1214                                elapsed_pretty
1215                            );
1216                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1217                            // On completion, report threads with non-zero error counts for
1218                            // visibility.
1219                            let summary: String = error_counts
1220                                .iter()
1221                                .enumerate()
1222                                .filter_map(|(i, c)| {
1223                                    let v = c.load(Ordering::Relaxed);
1224                                    if v > 0 {
1225                                        Some(format!("{:03}({})", i, v))
1226                                    } else {
1227                                        None
1228                                    }
1229                                })
1230                                .collect::<Vec<_>>()
1231                                .join(", ");
1232                            if !summary.is_empty() {
1233                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1234                            }
1235                            return Ok(());
1236                        }
1237                    }
1238                    if tracking_enabled
1239                        && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1240                            && last_counted_slot < expected_last_slot {
1241                                let flush_start = last_counted_slot.saturating_add(1);
1242                                if block_enabled
1243                                    && let Some(on_block_cb) = on_block.as_ref() {
1244                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1245                                        for skipped_slot in flush_start..=expected_last_slot {
1246                                            log::debug!(
1247                                                target: &log_target,
1248                                                "leader skipped slot {} during final flush (prev_counted {})",
1249                                                skipped_slot,
1250                                                last_counted_slot,
1251                                            );
1252                                            on_block_cb(
1253                                                thread_index,
1254                                                BlockData::LeaderSkipped { slot: skipped_slot },
1255                                            )
1256                                            .await
1257                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
1258                                            .map_err(|e| (e, error_slot))?;
1259                                        }
1260                                    }
1261                                let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1262                                if let Some(stats_ref) = thread_stats.as_mut() {
1263                                    stats_ref.leader_skipped_slots += missing_slots;
1264                                    stats_ref.slots_processed += missing_slots;
1265                                    stats_ref.current_slot = expected_last_slot;
1266                                }
1267                                overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1268                                slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1269                                last_counted_slot = expected_last_slot;
1270                            }
1271                    if let Some(ref mut stats) = thread_stats {
1272                        stats.finish_time = Some(std::time::Instant::now());
1273                        maybe_emit_stats(
1274                            stats_tracking.as_ref(),
1275                            thread_index,
1276                            stats,
1277                            &overall_slots_processed,
1278                            &overall_blocks_processed,
1279                            &overall_transactions_processed,
1280                            &overall_entries_processed,
1281                            &transactions_since_stats,
1282                            &blocks_since_stats,
1283                            &slots_since_stats,
1284                            &last_pulse,
1285                            start_time,
1286                        )
1287                        .await?;
1288                    }
1289                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1290                    }
1291                    Ok(())
1292            }
1293            .await
1294            {
1295                if is_shutdown_error(&err) {
1296                    log::info!(
1297                        target: &log_target,
1298                        "shutdown requested; terminating firehose thread {}",
1299                        thread_index
1300                    );
1301                    break;
1302                }
1303                log::error!(
1304                    target: &log_target,
1305                    "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1306                    slot,
1307                    slot_to_epoch(slot)
1308                );
1309                log::error!(target: &log_target, "{}", err);
1310                let item_index = match err {
1311                    FirehoseError::NodeDecodingError(item_index, _) => item_index,
1312                    _ => 0,
1313                };
1314                // Increment this thread's error counter
1315                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1316                log::warn!(
1317                    target: &log_target,
1318                    "restarting from slot {} at index {}",
1319                    slot,
1320                    item_index,
1321                );
1322                // Update slot range to resume from the failed slot, not the original start
1323                slot_range.start = slot;
1324                skip_until_index = Some(item_index);
1325            }
1326        });
1327        handles.push(handle);
1328    }
1329
1330    // Wait for all threads to complete
1331    for handle in handles {
1332        handle.await.unwrap();
1333    }
1334    if stats_tracking.is_some() {
1335        let elapsed = firehose_start.elapsed();
1336        let elapsed_secs = elapsed.as_secs_f64();
1337        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1338        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1339        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1340        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1341        let total_errors: u64 = error_counts
1342            .iter()
1343            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1344            .sum();
1345        let overall_tps = if elapsed_secs > 0.0 {
1346            total_transactions as f64 / elapsed_secs
1347        } else {
1348            0.0
1349        };
1350        log::info!(
1351            target: LOG_MODULE,
1352            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1353            elapsed_secs,
1354            total_slots,
1355            total_blocks,
1356            total_leader_skipped,
1357            total_transactions,
1358            overall_tps,
1359            total_errors
1360        );
1361    }
1362    if shutdown_flag.load(Ordering::SeqCst) {
1363        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1364    } else {
1365        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1366    }
1367    Ok(())
1368}
1369
1370#[allow(clippy::result_large_err)]
1371/// Builds a Geyser-backed firehose and returns a slot notification stream.
1372///
1373/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1374/// rather than relying solely on remote streams.
1375pub fn firehose_geyser(
1376    rt: Arc<tokio::runtime::Runtime>,
1377    slot_range: Range<u64>,
1378    geyser_config_files: Option<&[PathBuf]>,
1379    index_base_url: &Url,
1380    client: &Client,
1381    on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1382    + Send
1383    + 'static,
1384    threads: u64,
1385) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1386    if threads == 0 {
1387        return Err((
1388            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1389            slot_range.start,
1390        ));
1391    }
1392    log::info!(target: LOG_MODULE, "starting firehose...");
1393    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1394    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1395    let mut entry_notifier_maybe = None;
1396    let mut block_meta_notifier_maybe = None;
1397    let mut transaction_notifier_maybe = None;
1398    if let Some(geyser_config_files) = geyser_config_files {
1399        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1400
1401        let service =
1402            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1403                confirmed_bank_receiver.clone(),
1404                true,
1405                geyser_config_files,
1406            )
1407            .map_err(|e| (e.into(), slot_range.start))?;
1408
1409        transaction_notifier_maybe = Some(
1410            service
1411                .get_transaction_notifier()
1412                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1413                .map_err(|e| (e, slot_range.start))?,
1414        );
1415
1416        entry_notifier_maybe = service.get_entry_notifier();
1417        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1418
1419        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1420    }
1421
1422    if entry_notifier_maybe.is_some() {
1423        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1424    } else {
1425        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1426    }
1427    log::info!(target: LOG_MODULE, "running on_load...");
1428    rt.spawn(on_load);
1429
1430    let slot_range = Arc::new(slot_range);
1431    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1432    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1433    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1434    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1435
1436    // divide slot_range into n subranges
1437    let subranges = generate_subranges(&slot_range, threads);
1438    if threads > 1 {
1439        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1440    }
1441
1442    let mut handles = Vec::new();
1443    // Shared per-thread error counters
1444    let error_counts: Arc<Vec<AtomicU32>> =
1445        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1446
1447    for (i, slot_range) in subranges.into_iter().enumerate() {
1448        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1449        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1450        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1451        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1452        let client = client.clone();
1453        let error_counts = error_counts.clone();
1454
1455        let rt_clone = rt.clone();
1456
1457        let handle = std::thread::spawn(move || {
1458            rt_clone.block_on(async {
1459                firehose_geyser_thread(
1460                    slot_range,
1461                    transaction_notifier_maybe,
1462                    entry_notifier_maybe,
1463                    block_meta_notifier_maybe,
1464                    confirmed_bank_sender,
1465                    &client,
1466                    if threads > 1 { Some(i) } else { None },
1467                    error_counts,
1468                )
1469                .await
1470                .unwrap();
1471            });
1472        });
1473        handles.push(handle);
1474    }
1475
1476    // Wait for all threads to complete
1477    for handle in handles {
1478        handle.join().unwrap();
1479    }
1480    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1481    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1482        block_meta_notifier.notify_block_metadata(
1483            u64::MAX,
1484            "unload",
1485            u64::MAX,
1486            "unload",
1487            &KeyedRewardsAndNumPartitions {
1488                keyed_rewards: vec![],
1489                num_partitions: None,
1490            },
1491            None,
1492            None,
1493            0,
1494            0,
1495        );
1496    }
1497    Ok(confirmed_bank_receiver)
1498}
1499
1500#[allow(clippy::too_many_arguments)]
1501#[allow(clippy::result_large_err)]
1502async fn firehose_geyser_thread(
1503    mut slot_range: Range<u64>,
1504    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1505    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1506    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1507    confirmed_bank_sender: Sender<SlotNotification>,
1508    client: &Client,
1509    thread_index: Option<usize>,
1510    error_counts: Arc<Vec<AtomicU32>>,
1511) -> Result<(), (FirehoseError, u64)> {
1512    let start_time = std::time::Instant::now();
1513    let log_target = if let Some(thread_index) = thread_index {
1514        format!("{}::T{:03}", LOG_MODULE, thread_index)
1515    } else {
1516        LOG_MODULE.to_string()
1517    };
1518    let mut skip_until_index = None;
1519    // let mut triggered = false;
1520    while let Err((err, slot)) = async {
1521            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1522            log::info!(
1523                target: &log_target,
1524                "slot range: {} (epoch {}) ... {} (epoch {})",
1525                slot_range.start,
1526                slot_to_epoch(slot_range.start),
1527                slot_range.end,
1528                slot_to_epoch(slot_range.end)
1529            );
1530
1531            log::info!(target: &log_target, "🚒 starting firehose...");
1532
1533            // for each epoch
1534            let mut current_slot: Option<u64> = None;
1535            let mut previous_slot: Option<u64> = None;
1536            for epoch_num in epoch_range.clone() {
1537                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1538                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1539                    Ok(stream) => stream,
1540                    Err(_) => {
1541                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1542                    }
1543                };
1544                let mut reader = NodeReader::new(stream);
1545
1546                let header_fut = reader.read_raw_header();
1547                let header = match timeout(OP_TIMEOUT, header_fut).await {
1548                    Ok(res) => res
1549                        .map_err(FirehoseError::ReadHeader)
1550                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1551                    Err(_) => {
1552                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1553                    }
1554                };
1555                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1556
1557                let mut todo_previous_blockhash = Hash::default();
1558                let mut todo_latest_entry_blockhash = Hash::default();
1559
1560                if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1561                    let seek_fut = reader.seek_to_slot(slot_range.start);
1562                    match timeout(OP_TIMEOUT, seek_fut).await {
1563                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1564                        Err(_) => {
1565                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1566                        }
1567                    }
1568                }
1569
1570                // for each item in each block
1571                let mut item_index = 0;
1572                let mut displayed_skip_message = false;
1573                loop {
1574                    let read_fut = reader.read_until_block();
1575                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1576                        Ok(result) => result
1577                            .map_err(FirehoseError::ReadUntilBlockError)
1578                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1579                        Err(_) => {
1580                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1581                            return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1582                        }
1583                    };
1584                    if nodes.is_empty() {
1585                        log::info!(
1586                            target: &log_target,
1587                            "reached end of epoch {}",
1588                            epoch_num
1589                        );
1590                        break;
1591                    }
1592                    // ignore epoch and subset nodes at end of car file loop { if
1593                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1594                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1595                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1596                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1597                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1598                    //         log::info!(target: &log_target, "reached end of epoch {}",
1599                    //             epoch_num); break; }
1600                    if let Some(last_node) = nodes.0.last()
1601                        && !last_node.get_node().is_block() {
1602                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1603                            break;
1604                        }
1605                    let block = nodes
1606                        .get_block()
1607                        .map_err(FirehoseError::GetBlockError)
1608                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1609                    log::debug!(
1610                        target: &log_target,
1611                        "read {} items from epoch {}, now at slot {}",
1612                        item_index,
1613                        epoch_num,
1614                        block.slot
1615                    );
1616                    let slot = block.slot;
1617                    if slot >= slot_range.end {
1618                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1619                        // Return early to terminate the firehose thread cleanly. We use >=
1620                        // because slot_range is half-open [start, end), so any slot equal to
1621                        // end is out-of-range and must not be processed.
1622                        return Ok(());
1623                    }
1624                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1625                    if slot < slot_range.start {
1626                        log::warn!(
1627                            target: &log_target,
1628                            "encountered slot {} before start of range {}, skipping",
1629                            slot,
1630                            slot_range.start
1631                        );
1632                        continue;
1633                    }
1634                    if let Some(previous_slot) = previous_slot
1635                        && slot != previous_slot + 1 {
1636                            // log::warn!(target: &log_target, "non-consecutive slots: {}
1637                            // followed by {}", previous_slot, slot);
1638                        }
1639                    previous_slot = current_slot;
1640                    current_slot = Some(slot);
1641                    let mut entry_index: usize = 0;
1642                    let mut this_block_executed_transaction_count: u64 = 0;
1643                    let mut this_block_entry_count: u64 = 0;
1644                    let mut this_block_rewards: Vec<(Pubkey, RewardInfo)> = Vec::new();
1645
1646                    nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1647                        item_index += 1;
1648                        // if item_index == 100000 && !triggered { log::info!("simulating
1649                        //     error"); triggered = true; return
1650                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1651                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1652                        //         "simulated error", )), ))); }
1653                        if let Some(skip) = skip_until_index {
1654                            if item_index < skip {
1655                                if !displayed_skip_message {
1656                                    log::info!(
1657                                        target: &log_target,
1658                                        "skipping until index {} (at {})",
1659                                        skip,
1660                                        item_index
1661                                    );
1662                                    displayed_skip_message = true;
1663                                }
1664                                return Ok(());
1665                            } else {
1666                                log::info!(
1667                                    target: &log_target,
1668                                    "reached target index {}, resuming...",
1669                                    skip
1670                                );
1671                                skip_until_index = None;
1672                            }
1673                        }
1674                        let node = node_with_cid.get_node();
1675
1676                        use crate::node::Node::*;
1677                        match node {
1678                            Transaction(tx) => {
1679                                let versioned_tx = tx.as_parsed()?;
1680                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1681
1682                                let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1683
1684                                let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1685                                    prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1686                                        Box::new(std::io::Error::other(
1687                                            std::format!("Error decoding metadata: {:?}", err),
1688                                        ))
1689                                    })?;
1690
1691                                let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1692                                    metadata.try_into()?;
1693
1694                                let message_hash = {
1695                                    #[cfg(feature = "verify-transaction-signatures")]
1696                                    {
1697                                        versioned_tx.verify_and_hash_message()?
1698                                    }
1699                                    #[cfg(not(feature = "verify-transaction-signatures"))]
1700                                    {
1701                                        // Signature verification is optional because it is
1702                                        // extremely expensive at replay scale.
1703                                        versioned_tx.message.hash()
1704                                    }
1705                                };
1706                                let signature = versioned_tx
1707                                    .signatures
1708                                    .first()
1709                                    .ok_or_else(|| {
1710                                        Box::new(std::io::Error::new(
1711                                            std::io::ErrorKind::InvalidData,
1712                                            "transaction missing signature",
1713                                        )) as Box<dyn std::error::Error>
1714                                    })?;
1715                                let is_vote = is_simple_vote_transaction(&versioned_tx);
1716
1717                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1718                                    transaction_notifier.notify_transaction(
1719                                        block.slot,
1720                                        tx.index.unwrap() as usize,
1721                                        signature,
1722                                        &message_hash,
1723                                        is_vote,
1724                                        &as_native_metadata,
1725                                        &versioned_tx,
1726                                    );
1727                                }
1728
1729                            }
1730                            Entry(entry) => {
1731                                let entry_hash = Hash::from(entry.hash.to_bytes());
1732                                let entry_transaction_count = entry.transactions.len();
1733                                let entry_transaction_count_u64 = entry_transaction_count as u64;
1734                                let starting_transaction_index =
1735                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1736                                        Box::new(std::io::Error::other(
1737                                            "transaction index exceeds usize range",
1738                                        )) as Box<dyn std::error::Error>
1739                                    })?;
1740                                todo_latest_entry_blockhash = entry_hash;
1741                                this_block_executed_transaction_count += entry_transaction_count_u64;
1742                                this_block_entry_count += 1;
1743                                if entry_notifier_maybe.is_none() {
1744                                    return Ok(());
1745                                }
1746                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1747                                let entry_summary = solana_entry::entry::EntrySummary {
1748                                    num_hashes: entry.num_hashes,
1749                                    hash: Hash::from(entry.hash.to_bytes()),
1750                                    num_transactions: entry_transaction_count_u64,
1751                                };
1752                                entry_notifier.notify_entry(
1753                                    block.slot,
1754                                    entry_index,
1755                                    &entry_summary,
1756                                    starting_transaction_index,
1757                                );
1758                                entry_index += 1;
1759                            }
1760                            Block(block) => {
1761                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1762                                confirmed_bank_sender.send(notification).unwrap();
1763
1764                                if block_meta_notifier_maybe.is_none() {
1765                                    return Ok(());
1766                                }
1767                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
1768                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1769                                block_meta_notifier.notify_block_metadata(
1770                                    block.meta.parent_slot,
1771                                    todo_previous_blockhash.to_string().as_str(),
1772                                    block.slot,
1773                                    todo_latest_entry_blockhash.to_string().as_str(),
1774                                    &KeyedRewardsAndNumPartitions {
1775                                        keyed_rewards,
1776                                        num_partitions: None,
1777                                    },
1778                                    Some(block.meta.blocktime as i64),
1779                                    block.meta.block_height,
1780                                    this_block_executed_transaction_count,
1781                                    this_block_entry_count,
1782                                );
1783                                todo_previous_blockhash = todo_latest_entry_blockhash;
1784                                std::thread::yield_now();
1785                            }
1786                            Subset(_subset) => (),
1787                            Epoch(_epoch) => (),
1788                            Rewards(rewards) => {
1789                                if !rewards.is_complete() {
1790                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1791                                    let decompressed = utils::decompress_zstd(reassembled)?;
1792                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1793                                        Box::new(std::io::Error::other(
1794                                            std::format!("Error decoding rewards: {:?}", err),
1795                                        ))
1796                                    })?;
1797                                    this_block_rewards = convert_proto_rewards(&decoded)?;
1798                                }
1799                            }
1800                            DataFrame(_data_frame) => (),
1801                        }
1802                        Ok(())
1803                    })
1804                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1805                    if block.slot == slot_range.end - 1 {
1806                        let finish_time = std::time::Instant::now();
1807                        let elapsed = finish_time.duration_since(start_time);
1808                        log::info!(target: &log_target, "processed slot {}", block.slot);
1809                        let elapsed_pretty = human_readable_duration(elapsed);
1810                        log::info!(
1811                            target: &log_target,
1812                            "processed {} slots across {} epochs in {}.",
1813                            slot_range.end - slot_range.start,
1814                            slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1815                            elapsed_pretty
1816                        );
1817                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1818                        // On completion, report threads with non-zero error counts for
1819                        // visibility.
1820                        let summary: String = error_counts
1821                            .iter()
1822                            .enumerate()
1823                            .filter_map(|(i, c)| {
1824                                let v = c.load(Ordering::Relaxed);
1825                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1826                            })
1827                            .collect::<Vec<_>>()
1828                            .join(", ");
1829                        if !summary.is_empty() {
1830                            log::debug!(target: &log_target, "threads with errors: {}", summary);
1831                        }
1832                        return Ok(());
1833                    }
1834                }
1835            }
1836            Ok(())
1837}
1838.await
1839{
1840        if is_shutdown_error(&err) {
1841            log::info!(
1842                target: &log_target,
1843                "shutdown requested; terminating firehose thread {:?}",
1844                thread_index
1845            );
1846            return Ok(());
1847        }
1848        log::error!(
1849            target: &log_target,
1850            "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1851            slot,
1852            slot_to_epoch(slot)
1853            );
1854            log::error!(target: &log_target, "{}", err);
1855            let item_index = match err {
1856                FirehoseError::NodeDecodingError(item_index, _) => item_index,
1857                _ => 0,
1858            };
1859            // Increment this thread's error counter
1860            let idx = thread_index.unwrap_or(0);
1861            error_counts[idx].fetch_add(1, Ordering::Relaxed);
1862            log::warn!(
1863                target: &log_target,
1864                "restarting from slot {} at index {}",
1865                slot,
1866                item_index,
1867            );
1868            // Update slot range to resume from the failed slot, not the original start
1869            slot_range.start = slot;
1870            skip_until_index = Some(item_index);
1871    }
1872    Ok(())
1873}
1874
1875#[inline]
1876fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1877    if !(1..=2).contains(&versioned_tx.signatures.len()) {
1878        return false;
1879    }
1880
1881    if !matches!(
1882        versioned_tx.version(),
1883        solana_sdk::transaction::TransactionVersion::Legacy(_)
1884    ) {
1885        return false;
1886    }
1887
1888    let instructions = versioned_tx.message.instructions();
1889    if instructions.len() != 1 {
1890        return false;
1891    }
1892
1893    let program_index = instructions[0].program_id_index as usize;
1894    versioned_tx
1895        .message
1896        .static_account_keys()
1897        .get(program_index)
1898        .map(|program_id| program_id == &vote_program_id())
1899        .unwrap_or(false)
1900}
1901
1902#[inline(always)]
1903fn convert_proto_rewards(
1904    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1905) -> Result<Vec<(Pubkey, RewardInfo)>, Box<dyn std::error::Error>> {
1906    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1907    for proto_reward in proto_rewards.rewards.iter() {
1908        let reward = RewardInfo {
1909            reward_type: match proto_reward.reward_type - 1 {
1910                0 => RewardType::Fee,
1911                1 => RewardType::Rent,
1912                2 => RewardType::Staking,
1913                3 => RewardType::Voting,
1914                typ => {
1915                    return Err(Box::new(std::io::Error::other(format!(
1916                        "unsupported reward type {}",
1917                        typ
1918                    ))));
1919                }
1920            },
1921            lamports: proto_reward.lamports,
1922            post_balance: proto_reward.post_balance,
1923            commission: proto_reward.commission.parse::<u8>().ok(),
1924        };
1925        let pubkey = proto_reward
1926            .pubkey
1927            .parse::<Pubkey>()
1928            .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1929        keyed_rewards.push((pubkey, reward));
1930    }
1931    Ok(keyed_rewards)
1932}
1933
1934#[inline]
1935/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
1936pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1937    let total = slot_range.end - slot_range.start;
1938    let slots_per_thread = total / threads;
1939    let remainder = total % threads;
1940
1941    let ranges: Vec<Range<u64>> = (0..threads)
1942        .map(|i| {
1943            // Distribute remainder slots to the first `remainder` threads
1944            let extra_slot = if i < remainder { 1 } else { 0 };
1945            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1946            let end = start + slots_per_thread + extra_slot;
1947            start..end
1948        })
1949        .collect();
1950
1951    // Verify that ranges cover all slots exactly
1952    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1953    assert_eq!(
1954        total_covered, total,
1955        "Range generation failed: {} threads should cover {} slots but only cover {}",
1956        threads, total, total_covered
1957    );
1958
1959    // Verify no gaps between ranges
1960    for i in 1..ranges.len() {
1961        assert_eq!(
1962            ranges[i - 1].end,
1963            ranges[i].start,
1964            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1965            i - 1,
1966            ranges[i - 1].end,
1967            i,
1968            ranges[i].start
1969        );
1970    }
1971
1972    log::info!(
1973        target: LOG_MODULE,
1974        "Generated {} thread ranges covering {} slots total",
1975        threads,
1976        total_covered
1977    );
1978    ranges
1979}
1980
/// Formats `duration` as a short human-readable string.
///
/// - A zero duration is `"0s"`.
/// - Under one minute: whole seconds (e.g. `"5s"`) when at least one second has elapsed
///   and the millisecond part is zero, otherwise seconds with two decimals (e.g.
///   `"1.50s"`). A value that would round up to `"60.00s"` is reported as `"1m"`.
/// - One minute or more: the largest non-zero unit among days, hours and minutes,
///   followed by the next smaller unit when that one is non-zero (e.g. `"1m1s"`, `"1h"`,
///   `"2h1m"`, `"1d1h"`). Anything smaller is truncated.
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let total_secs = duration.as_secs();
    if total_secs < 60 {
        if total_secs > 0 && duration.subsec_millis() == 0 {
            return format!("{total_secs}s");
        }
        let text = format!("{:.2}s", duration.as_secs_f64());
        // Two-decimal rounding can push the last few milliseconds before a minute up to
        // "60.00s"; roll that over so the output never shows sixty seconds.
        return if text == "60.00s" { "1m".into() } else { text };
    }

    let days = total_secs / 86_400;
    let hours = total_secs % 86_400 / 3_600;
    let minutes = total_secs % 3_600 / 60;
    let secs = total_secs % 60;

    // Arms are ordered from the smallest leading unit to the largest; each later arm is
    // only reached when its leading unit is non-zero because earlier arms consumed the
    // zero cases. `total_secs >= 60` guarantees at least one of d/h/m is non-zero.
    match (days, hours, minutes, secs) {
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2026
/// Test-only stats callback that logs a one-line progress summary for `thread_id`.
///
/// Throughput is the number of processed transactions divided by the seconds elapsed
/// since `stats.start_time`; it is reported as `0.0` when no time has elapsed. The
/// returned future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so a zero elapsed time does not produce inf/NaN.
        let tps = (elapsed_secs > 0.0)
            .then(|| stats.transactions_processed as f64 / elapsed_secs)
            .unwrap_or(0.0);
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2052
2053#[tokio::test(flavor = "multi_thread")]
2054async fn test_firehose_epoch_800() {
2055    use std::sync::atomic::{AtomicU64, Ordering};
2056    solana_logger::setup_with_default("info");
2057    const THREADS: usize = 4;
2058    const NUM_SLOTS_TO_COVER: u64 = 50;
2059    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2060    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2061    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2062    let stats_tracking = StatsTracking {
2063        on_stats: log_stats_handler,
2064        tracking_interval_slots: 10,
2065    };
2066
2067    firehose(
2068        THREADS.try_into().unwrap(),
2069        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2070        Some(|thread_id: usize, block: BlockData| {
2071            async move {
2072                let prev =
2073                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2074                if block.was_skipped() {
2075                    log::info!(
2076                        target: LOG_MODULE,
2077                        "leader skipped block {} on thread {}",
2078                        block.slot(),
2079                        thread_id,
2080                    );
2081                } else {
2082                    /*log::info!(
2083                        target: LOG_MODULE,
2084                        "got block {} on thread {}",
2085                        block.slot(),
2086                        thread_id,
2087                    );*/
2088                }
2089
2090                if prev > 0 {
2091                    assert_eq!(prev + 1, block.slot());
2092                }
2093                if block.was_skipped() {
2094                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2095                } else {
2096                    NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2097                }
2098                Ok(())
2099            }
2100            .boxed()
2101        }),
2102        None::<OnTxFn>,
2103        None::<OnEntryFn>,
2104        None::<OnRewardFn>,
2105        Some(stats_tracking),
2106        None,
2107    )
2108    .await
2109    .unwrap();
2110    assert_eq!(
2111        NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2112        NUM_SLOTS_TO_COVER
2113    );
2114    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2115}