jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_address::Address;
7use solana_geyser_plugin_manager::{
8    block_metadata_notifier_interface::BlockMetadataNotifier,
9    geyser_plugin_service::GeyserPluginServiceError,
10};
11use solana_hash::Hash;
12use solana_ledger::entry_notifier_interface::EntryNotifier;
13use solana_reward_info::RewardInfo;
14use solana_rpc::{
15    optimistically_confirmed_bank_tracker::SlotNotification,
16    transaction_notifier_interface::TransactionNotifier,
17};
18use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
19use solana_sdk_ids::vote::id as vote_program_id;
20use solana_transaction::versioned::VersionedTransaction;
21use std::{
22    fmt::Display,
23    future::Future,
24    io,
25    ops::Range,
26    path::PathBuf,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
30    },
31};
32use thiserror::Error;
33use tokio::{
34    sync::broadcast::{self, error::TryRecvError},
35    time::timeout,
36};
37
38use crate::{
39    LOG_MODULE,
40    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
41    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
42    node_reader::NodeReader,
43    utils,
44};
45
// Timeout applied to each asynchronous firehose operation (fetching the epoch stream,
// reading the header, seeking to a slot, reading the next block). When it elapses, the
// operation is reported as `FirehoseError::OperationTimeout` carrying the operation's
// name. Adjust here to tune stall detection/restart aggressiveness.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
50
51fn poll_shutdown(
52    flag: &Arc<std::sync::atomic::AtomicBool>,
53    receiver: &mut Option<broadcast::Receiver<()>>,
54) -> bool {
55    if let Some(rx) = receiver {
56        match rx.try_recv() {
57            Ok(_) | Err(TryRecvError::Lagged(_)) => {
58                flag.store(true, Ordering::SeqCst);
59            }
60            Err(TryRecvError::Closed) => {
61                flag.store(true, Ordering::SeqCst);
62            }
63            Err(TryRecvError::Empty) => {}
64        }
65    }
66    flag.load(Ordering::SeqCst)
67}
68
69fn is_shutdown_error(err: &FirehoseError) -> bool {
70    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
71        inner
72            .downcast_ref::<io::Error>()
73            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
74            .unwrap_or(false)
75    }
76
77    match err {
78        FirehoseError::BlockHandlerError(inner)
79        | FirehoseError::TransactionHandlerError(inner)
80        | FirehoseError::EntryHandlerError(inner)
81        | FirehoseError::RewardHandlerError(inner)
82        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
83        _ => false,
84    }
85}
86
/// Errors that can occur while streaming the firehose.
///
/// The user-facing message for each variant comes from the hand-written [`Display`]
/// implementation for this type.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(Box<dyn std::error::Error>),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(Box<dyn std::error::Error>),
    /// Failure while fetching an individual block.
    GetBlockError(Box<dyn std::error::Error>),
    /// Failed to seek to, read, or decode a data node; the `usize` is the node's item
    /// index and the boxed value is the underlying error.
    NodeDecodingError(usize, Box<dyn std::error::Error>),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(Box<dyn std::error::Error>),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(Box<dyn std::error::Error>),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(Box<dyn std::error::Error>),
    /// Timeout reached while waiting for a firehose operation; the string names the
    /// operation that timed out (for example `"fetch_epoch_stream"`).
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(Box<dyn std::error::Error>),
    /// Entry handler returned an error.
    EntryHandlerError(Box<dyn std::error::Error>),
    /// Reward handler returned an error.
    RewardHandlerError(Box<dyn std::error::Error>),
    /// Block handler returned an error.
    BlockHandlerError(Box<dyn std::error::Error>),
}
124
125impl Display for FirehoseError {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        match self {
128            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
129            FirehoseError::ReadHeader(error) => {
130                write!(f, "Error reading header: {}", error)
131            }
132            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
133                f,
134                "Error initializing geyser plugin service: {}",
135                geyser_plugin_service_error
136            ),
137            FirehoseError::FailedToGetTransactionNotifier => write!(
138                f,
139                "Failed to get transaction notifier from GeyserPluginService"
140            ),
141            FirehoseError::ReadUntilBlockError(error) => {
142                write!(f, "Error reading until block: {}", error)
143            }
144            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
145            FirehoseError::NodeDecodingError(item_index, error) => {
146                write!(
147                    f,
148                    "Error seeking, reading data from, or decoding data for data node {}: {}",
149                    item_index, error
150                )
151            }
152            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
153                f,
154                "Error getting info from slot offset index: {}",
155                slot_offset_index_error
156            ),
157            FirehoseError::SeekToSlotError(error) => {
158                write!(f, "Error seeking to slot: {}", error)
159            }
160            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
161            FirehoseError::OnStatsHandlerError(error) => {
162                write!(f, "Stats handler error: {}", error)
163            }
164            FirehoseError::OperationTimeout(op) => {
165                write!(f, "Timeout while waiting for operation: {}", op)
166            }
167            FirehoseError::TransactionHandlerError(error) => {
168                write!(f, "Transaction handler error: {}", error)
169            }
170            FirehoseError::EntryHandlerError(error) => {
171                write!(f, "Entry handler error: {}", error)
172            }
173            FirehoseError::RewardHandlerError(error) => {
174                write!(f, "Reward handler error: {}", error)
175            }
176            FirehoseError::BlockHandlerError(error) => {
177                write!(f, "Block handler error: {}", error)
178            }
179        }
180    }
181}
182
183impl From<reqwest::Error> for FirehoseError {
184    fn from(e: reqwest::Error) -> Self {
185        FirehoseError::Reqwest(e)
186    }
187}
188
189impl From<GeyserPluginServiceError> for FirehoseError {
190    fn from(e: GeyserPluginServiceError) -> Self {
191        FirehoseError::GeyserPluginService(e)
192    }
193}
194
195impl From<SlotOffsetIndexError> for FirehoseError {
196    fn from(e: SlotOffsetIndexError) -> Self {
197        FirehoseError::SlotOffsetIndexError(e)
198    }
199}
200
/// Per-thread progress information emitted by the firehose runner.
///
/// A copy of this struct is embedded in every [`Stats`] snapshot as
/// [`Stats::thread_stats`].
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot range `[start, end)` assigned to the thread; `end` itself is not
    /// processed.
    pub slot_range: Range<u64>,
    /// Latest slot processed by the thread.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
227
/// Aggregated firehose statistics covering all worker threads.
///
/// NOTE(review): as populated by `maybe_emit_stats`, the `start_time`, `finish_time`,
/// `slot_range`, and `rewards_processed` fields are copied from the reporting thread's
/// [`ThreadStats`] rather than aggregated across threads — confirm whether that is
/// intended.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update.
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began.
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open, like any [`Range`]).
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads; `maybe_emit_stats` derives this as
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Aggregate rewards processed across all threads.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse; `maybe_emit_stats` never reports
    /// less than one nanosecond here.
    pub time_since_last_pulse: std::time::Duration,
}
260
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// The callback is invoked with the index of the reporting thread and a [`Stats`]
/// snapshot.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available.
    pub on_stats: OnStats,
    /// Minimum number of slots processed before triggering the callback.
    pub tracking_interval_slots: u64,
}
269
270#[inline(always)]
271#[allow(clippy::too_many_arguments)]
272async fn maybe_emit_stats<OnStats: Handler<Stats>>(
273    stats_tracking: Option<&StatsTracking<OnStats>>,
274    thread_index: usize,
275    thread_stats: &ThreadStats,
276    overall_slots_processed: &AtomicU64,
277    overall_blocks_processed: &AtomicU64,
278    overall_transactions_processed: &AtomicU64,
279    overall_entries_processed: &AtomicU64,
280    transactions_since_stats: &AtomicU64,
281    blocks_since_stats: &AtomicU64,
282    slots_since_stats: &AtomicU64,
283    last_pulse: &Arc<AtomicU64>,
284    base_instant: std::time::Instant,
285) -> Result<(), (FirehoseError, u64)> {
286    if let Some(stats_tracker) = stats_tracking {
287        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
288        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
289        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
290        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
291        let now_nanos = base_instant.elapsed().as_nanos() as u64;
292        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
293        let delta_nanos = now_nanos.saturating_sub(previous);
294        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
295        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
296        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
297        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
298
299        let stats = Stats {
300            thread_stats: thread_stats.clone(),
301            start_time: thread_stats.start_time,
302            finish_time: thread_stats.finish_time,
303            slot_range: thread_stats.slot_range.clone(),
304            slots_processed: total_slots,
305            blocks_processed: total_blocks,
306            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
307            transactions_processed: total_transactions,
308            entries_processed: total_entries,
309            rewards_processed: thread_stats.rewards_processed,
310            transactions_since_last_pulse: processed_transactions,
311            blocks_since_last_pulse: processed_blocks,
312            slots_since_last_pulse: processed_slots,
313            time_since_last_pulse,
314        };
315
316        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
317            last_pulse.store(previous, Ordering::Relaxed);
318            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
319            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
320            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
321            return Err((
322                FirehoseError::OnStatsHandlerError(e),
323                thread_stats.current_slot,
324            ));
325        }
326    }
327    Ok(())
328}
329
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
336
/// Firehose transaction payload passed to [`Handler`] callbacks (see [`OnTxFn`]).
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
355
/// Block entry metadata passed to [`Handler`] callbacks (see [`OnEntryFn`]).
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Half-open range of transaction indexes covered by the entry.
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
370
/// Reward data conveyed to reward [`Handler`] callbacks (see [`OnRewardFn`]).
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients, each paired with its reward information.
    pub rewards: Vec<(Address, RewardInfo)>,
}
379
/// Block-level data streamed to block handlers (see [`OnBlockFn`]).
///
/// A handler receives either a populated [`BlockData::Block`] or a
/// [`BlockData::LeaderSkipped`] marker for a slot the leader skipped.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot was skipped by the leader.
    LeaderSkipped {
        /// Skipped slot number.
        slot: u64,
    },
}
410
411impl BlockData {
412    /// Returns the slot associated with this block or skipped slot.
413    #[inline(always)]
414    pub const fn slot(&self) -> u64 {
415        match self {
416            BlockData::Block { slot, .. } => *slot,
417            BlockData::LeaderSkipped { slot } => *slot,
418        }
419    }
420
421    /// Returns `true` if the slot was skipped by the leader.
422    #[inline(always)]
423    pub const fn was_skipped(&self) -> bool {
424        matches!(self, BlockData::LeaderSkipped { .. })
425    }
426}
427
/// Outcome of a single handler invocation: `Ok(())` on success, or a boxed error that
/// is `Send + 'static` on failure.
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
/// Boxed `'static` future returned by every handler, resolving to a [`HandlerResult`].
type HandlerFuture = BoxFuture<'static, HandlerResult>;
430
/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// The first argument is the index of the firehose thread making the call and the
/// second is the event payload. The callback returns a boxed future that resolves to
/// `Ok(())` or a boxed error. Implementors must be `Send + Sync + Clone + 'static`.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket implementation: any function or closure with the matching signature and
// bounds is a `Handler`, so callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
438
/// Function pointer alias for [`Handler`] callbacks, taking the thread index and the
/// event payload and returning the handler's boxed future.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
451
452/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
453#[inline]
454#[allow(clippy::too_many_arguments)]
455pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
456    threads: u64,
457    slot_range: Range<u64>,
458    on_block: Option<OnBlock>,
459    on_tx: Option<OnTransaction>,
460    on_entry: Option<OnEntry>,
461    on_rewards: Option<OnRewards>,
462    stats_tracking: Option<StatsTracking<OnStats>>,
463    shutdown_signal: Option<broadcast::Receiver<()>>,
464) -> Result<(), (FirehoseError, u64)>
465where
466    OnBlock: Handler<BlockData>,
467    OnTransaction: Handler<TransactionData>,
468    OnEntry: Handler<EntryData>,
469    OnRewards: Handler<RewardsData>,
470    OnStats: Handler<Stats>,
471{
472    if threads == 0 {
473        return Err((
474            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
475            slot_range.start,
476        ));
477    }
478    let client = Client::new();
479    log::info!(target: LOG_MODULE, "starting firehose...");
480    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
481
482    let slot_range = Arc::new(slot_range);
483
484    // divide slot_range into n subranges
485    let subranges = generate_subranges(&slot_range, threads);
486    if threads > 1 {
487        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
488    }
489
490    let firehose_start = std::time::Instant::now();
491    let shutdown_flag = Arc::new(AtomicBool::new(false));
492    if let Some(ref rx) = shutdown_signal {
493        let mut rx = rx.resubscribe();
494        let flag = shutdown_flag.clone();
495        tokio::spawn(async move {
496            if rx.recv().await.is_ok() {
497                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
498                flag.store(true, Ordering::SeqCst);
499            }
500        });
501    }
502    let mut handles = Vec::new();
503    // Shared per-thread error counters
504    let error_counts: Arc<Vec<AtomicU32>> =
505        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
506
507    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
510    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
511
512    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
513        let error_counts = error_counts.clone();
514        let client = client.clone();
515        let on_block = on_block.clone();
516        let on_tx = on_tx.clone();
517        let on_entry = on_entry.clone();
518        let on_reward = on_rewards.clone();
519        let overall_slots_processed = overall_slots_processed.clone();
520        let overall_blocks_processed = overall_blocks_processed.clone();
521        let overall_transactions_processed = overall_transactions_processed.clone();
522        let overall_entries_processed = overall_entries_processed.clone();
523        let stats_tracking = stats_tracking.clone();
524        let transactions_since_stats = Arc::new(AtomicU64::new(0));
525        let blocks_since_stats = Arc::new(AtomicU64::new(0));
526        let slots_since_stats = Arc::new(AtomicU64::new(0));
527        let last_pulse = Arc::new(AtomicU64::new(0));
528        let transactions_since_stats_cloned = transactions_since_stats.clone();
529        let blocks_since_stats_cloned = blocks_since_stats.clone();
530        let slots_since_stats_cloned = slots_since_stats.clone();
531        let last_pulse_cloned = last_pulse.clone();
532        let shutdown_flag = shutdown_flag.clone();
533        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
534
535        let handle = tokio::spawn(async move {
536            let transactions_since_stats = transactions_since_stats_cloned;
537            let blocks_since_stats = blocks_since_stats_cloned;
538            let slots_since_stats = slots_since_stats_cloned;
539            let last_pulse = last_pulse_cloned;
540            let mut shutdown_rx = thread_shutdown_rx;
541            let start_time = std::time::Instant::now();
542            last_pulse.store(0, Ordering::Relaxed);
543            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
544            let mut skip_until_index = None;
545            let block_enabled = on_block.is_some();
546            let tx_enabled = on_tx.is_some();
547            let entry_enabled = on_entry.is_some();
548            let reward_enabled = on_reward.is_some();
549            let tracking_enabled = stats_tracking.is_some();
550            let mut last_counted_slot = slot_range.start.saturating_sub(1);
551
552            // let mut triggered = false;
553            while let Err((err, slot)) = async {
554                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
555                    log::info!(
556                        target: &log_target,
557                        "shutdown requested; terminating firehose thread {}",
558                        thread_index
559                    );
560                    return Ok(());
561                }
562                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
563                log::info!(
564                    target: &log_target,
565                    "slot range: {} (epoch {}) ... {} (epoch {})",
566                    slot_range.start,
567                    slot_to_epoch(slot_range.start),
568                    slot_range.end,
569                    slot_to_epoch(slot_range.end)
570                );
571
572                log::info!(target: &log_target, "🚒 starting firehose...");
573
574                // for each epoch
575                let mut current_slot: Option<u64> = None;
576                let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
577                for epoch_num in epoch_range.clone() {
578                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
579                        log::info!(
580                            target: &log_target,
581                            "shutdown requested; terminating firehose thread {}",
582                            thread_index
583                        );
584                        return Ok(());
585                    }
586                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
587                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
588                        Ok(stream) => stream,
589                        Err(_) => {
590                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
591                        }
592                    };
593                    let mut reader = NodeReader::new(stream);
594
595                    let header_fut = reader.read_raw_header();
596                    let header = match timeout(OP_TIMEOUT, header_fut).await {
597                        Ok(res) => res
598                            .map_err(FirehoseError::ReadHeader)
599                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
600                        Err(_) => {
601                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
602                        }
603                    };
604                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
605
606                    let mut previous_blockhash = Hash::default();
607                    let mut latest_entry_blockhash = Hash::default();
608
609                    let mut thread_stats = if tracking_enabled {
610                        Some(ThreadStats {
611                            thread_id: thread_index,
612                            start_time,
613                            finish_time: None,
614                            slot_range: slot_range.clone(),
615                            current_slot: slot_range.start,
616                            slots_processed: 0,
617                            blocks_processed: 0,
618                            leader_skipped_slots: 0,
619                            transactions_processed: 0,
620                            entries_processed: 0,
621                            rewards_processed: 0,
622                        })
623                    } else {
624                        None
625                    };
626
627                    if slot_range.start > epoch_to_slot_range(epoch_num).0 {
628                        let seek_fut = reader.seek_to_slot(slot_range.start);
629                        match timeout(OP_TIMEOUT, seek_fut).await {
630                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
631                            Err(_) => {
632                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
633                            }
634                        }
635                    }
636
637                    // for each item in each block
638                    let mut item_index = 0;
639                    let mut displayed_skip_message = false;
640                    loop {
641                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
642                            log::info!(
643                                target: &log_target,
644                                "shutdown requested; terminating firehose thread {}",
645                                thread_index
646                            );
647                            return Ok(());
648                        }
649                        let read_fut = reader.read_until_block();
650                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
651                            Ok(result) => result
652                                .map_err(FirehoseError::ReadUntilBlockError)
653                                .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
654                            Err(_) => {
655                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
656                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
657                            }
658                        };
659                        if nodes.is_empty() {
660                            log::info!(
661                                target: &log_target,
662                                "reached end of epoch {}",
663                                epoch_num
664                            );
665                            break;
666                        }
667                        if let Some(last_node) = nodes.0.last()
668                            && !last_node.get_node().is_block()
669                        {
670                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
671                            break;
672                        }
673                        let block = nodes
674                            .get_block()
675                            .map_err(FirehoseError::GetBlockError)
676                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
677                        log::debug!(
678                            target: &log_target,
679                            "read {} items from epoch {}, now at slot {}",
680                            item_index,
681                            epoch_num,
682                            block.slot
683                        );
684                        let slot = block.slot;
685                        if slot >= slot_range.end {
686                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
687                            // Return early to terminate the firehose thread cleanly. We use >=
688                            // because slot_range is half-open [start, end), so any slot equal
689                            // to end is out-of-range and must not be processed.
690
691                            let target_last_slot = slot_range.end.saturating_sub(1);
692                            let skip_start_from_previous = previous_slot
693                                .map(|s| s.saturating_add(1))
694                                .unwrap_or(slot_range.start);
695                            let first_untracked_slot = last_counted_slot.saturating_add(1);
696                            let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
697
698                            if skip_start <= target_last_slot {
699                                if block_enabled
700                                    && let Some(on_block_cb) = on_block.as_ref() {
701                                        for skipped_slot in skip_start..=target_last_slot {
702                                            log::debug!(
703                                                target: &log_target,
704                                                "leader skipped slot {} (prev_counted {}, target {})",
705                                                skipped_slot,
706                                                last_counted_slot,
707                                                target_last_slot,
708                                            );
709                                            on_block_cb(
710                                                thread_index,
711                                                BlockData::LeaderSkipped { slot: skipped_slot },
712                                            )
713                                            .await
714                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
715                                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
716                                        }
717                                    }
718
719                                let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
720                                if tracking_enabled {
721                                    if let Some(ref mut stats) = thread_stats {
722                                        stats.leader_skipped_slots += missing_slots;
723                                        stats.slots_processed += missing_slots;
724                                        stats.current_slot = target_last_slot;
725                                    }
726                                    overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
727                                    slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
728                                }
729                                last_counted_slot = target_last_slot;
730                            }
731
732                            return Ok(());
733                        }
734                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
735                        if slot < slot_range.start {
736                            log::warn!(
737                                target: &log_target,
738                                "encountered slot {} before start of range {}, skipping",
739                                slot,
740                                slot_range.start
741                            );
742                            continue;
743                        }
744                        if current_slot.is_some() {
745                            previous_slot = current_slot;
746                        }
747                        current_slot = Some(slot);
748                        let mut entry_index: usize = 0;
749                        let mut this_block_executed_transaction_count: u64 = 0;
750                        let mut this_block_entry_count: u64 = 0;
751                        let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
752
753                        for node_with_cid in &nodes.0 {
754                            item_index += 1;
755                            if let Some(skip) = skip_until_index {
756                                if item_index < skip {
757                                    if !displayed_skip_message {
758                                        log::info!(
759                                            target: &log_target,
760                                            "skipping until index {} (at {})",
761                                            skip,
762                                            item_index
763                                        );
764                                        displayed_skip_message = true;
765                                    }
766                                    continue;
767                                } else {
768                                    log::info!(
769                                        target: &log_target,
770                                        "reached target index {}, resuming...",
771                                        skip
772                                    );
773                                    skip_until_index = None;
774                                }
775                            }
776                            let node = node_with_cid.get_node();
777
778                            if let Some(ref mut stats) = thread_stats {
779                                stats.current_slot = slot;
780                            }
781
782                            let error_slot = current_slot.unwrap_or(slot_range.start);
783
784                            use crate::node::Node::*;
785                            match node {
786                                Transaction(tx) => {
787                                    if tx_enabled
788                                        && let Some(on_tx_cb) = on_tx.as_ref()
789                                    {
790                                        let error_slot = current_slot.unwrap_or(slot_range.start);
791                                        let versioned_tx = tx.as_parsed().map_err(|err| {
792                                            (
793                                                FirehoseError::NodeDecodingError(item_index, err),
794                                                error_slot,
795                                            )
796                                        })?;
797                                        let reassembled_metadata = nodes
798                                            .reassemble_dataframes(tx.metadata.clone())
799                                            .map_err(|err| {
800                                                (
801                                                    FirehoseError::NodeDecodingError(item_index, err),
802                                                    error_slot,
803                                                )
804                                            })?;
805
806                                        let decompressed =
807                                            utils::decompress_zstd(reassembled_metadata.clone())
808                                                .map_err(|err| {
809                                                    (
810                                                        FirehoseError::NodeDecodingError(
811                                                            item_index,
812                                                            err,
813                                                        ),
814                                                        error_slot,
815                                                    )
816                                                })?;
817
818                                        let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
819                                            prost_011::Message::decode(decompressed.as_slice())
820                                                .map_err(|err| {
821                                                    (
822                                                        FirehoseError::NodeDecodingError(
823                                                            item_index,
824                                                            Box::new(err),
825                                                        ),
826                                                        error_slot,
827                                                    )
828                                                })?;
829
830                                        let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
831                                            metadata.try_into().map_err(|err| {
832                                                (
833                                                    FirehoseError::NodeDecodingError(
834                                                        item_index,
835                                                        Box::new(err),
836                                                    ),
837                                                    error_slot,
838                                                )
839                                            })?;
840
841                                        let message_hash = {
842                                            #[cfg(feature = "verify-transaction-signatures")]
843                                            {
844                                                versioned_tx.verify_and_hash_message().map_err(|err| {
845                                                    (
846                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
847                                                        error_slot,
848                                                    )
849                                                })?
850                                            }
851                                            #[cfg(not(feature = "verify-transaction-signatures"))]
852                                            {
853                                                versioned_tx.message.hash()
854                                            }
855                                        };
856                                        let signature = versioned_tx
857                                            .signatures
858                                            .first()
859                                            .ok_or_else(|| {
860                                                Box::new(std::io::Error::new(
861                                                    std::io::ErrorKind::InvalidData,
862                                                    "transaction missing signature",
863                                                )) as Box<dyn std::error::Error>
864                                            })
865                                            .map_err(|err| {
866                                                (
867                                                    FirehoseError::NodeDecodingError(
868                                                        item_index,
869                                                        err,
870                                                    ),
871                                                    error_slot,
872                                                )
873                                            })?;
874                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
875
876                                        on_tx_cb(
877                                            thread_index,
878                                            TransactionData {
879                                                slot: block.slot,
880                                                transaction_slot_index: tx.index.unwrap() as usize,
881                                                signature: *signature,
882                                                message_hash,
883                                                is_vote,
884                                                transaction_status_meta: as_native_metadata.clone(),
885                                                transaction: versioned_tx.clone(),
886                                            },
887                                        )
888                                        .await
889                                        .map_err(|e| {
890                                            (
891                                                FirehoseError::TransactionHandlerError(e),
892                                                error_slot,
893                                            )
894                                        })?;
895                                    }
896                                    fetch_add_if(
897                                        tracking_enabled,
898                                        &overall_transactions_processed,
899                                        1,
900                                    );
901                                    if let Some(ref mut stats) = thread_stats {
902                                        stats.transactions_processed += 1;
903                                    }
904                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
905                                }
906                                Entry(entry) => {
907                                    let entry_hash = Hash::from(entry.hash.to_bytes());
908                                    let entry_transaction_count = entry.transactions.len();
909                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
910                                    let starting_transaction_index_u64 =
911                                        this_block_executed_transaction_count;
912                                    latest_entry_blockhash = entry_hash;
913                                    this_block_executed_transaction_count += entry_transaction_count_u64;
914                                    this_block_entry_count += 1;
915
916                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
917                                        let starting_transaction_index = usize::try_from(
918                                            starting_transaction_index_u64,
919                                        )
920                                        .map_err(|err| {
921                                            (
922                                                FirehoseError::EntryHandlerError(Box::new(err)),
923                                                error_slot,
924                                            )
925                                        })?;
926                                        let transaction_indexes_end =
927                                            starting_transaction_index + entry_transaction_count;
928                                        on_entry_cb(
929                                            thread_index,
930                                            EntryData {
931                                                slot: block.slot,
932                                                entry_index,
933                                                transaction_indexes: starting_transaction_index
934                                                    ..transaction_indexes_end,
935                                                num_hashes: entry.num_hashes,
936                                                hash: entry_hash,
937                                            },
938                                        )
939                                        .await
940                                        .map_err(|e| {
941                                            (
942                                                FirehoseError::EntryHandlerError(e),
943                                                error_slot,
944                                            )
945                                        })?;
946                                    }
947                                    entry_index += 1;
948                                    fetch_add_if(
949                                        tracking_enabled,
950                                        &overall_entries_processed,
951                                        1,
952                                    );
953                                    if let Some(ref mut stats) = thread_stats {
954                                        stats.entries_processed += 1;
955                                    }
956                                }
957                                Block(block) => {
958                                    let prev_last_counted_slot = last_counted_slot;
959                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
960                                        (
961                                            stats.slots_processed,
962                                            stats.blocks_processed,
963                                            stats.leader_skipped_slots,
964                                            stats.current_slot,
965                                        )
966                                    });
967
968                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
969                                    let skip_start_from_previous = previous_slot
970                                        .map(|s| s.saturating_add(1))
971                                        .unwrap_or(next_expected_slot);
972                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
973
974                                    for skipped_slot in skip_start..slot {
975                                        log::debug!(
976                                            target: &log_target,
977                                            "leader skipped slot {} (prev_counted {}, current slot {})",
978                                            skipped_slot,
979                                            prev_last_counted_slot,
980                                            slot,
981                                        );
982                                        if block_enabled
983                                            && let Some(on_block_cb) = on_block.as_ref() {
984                                                on_block_cb(
985                                                    thread_index,
986                                                    BlockData::LeaderSkipped {
987                                                        slot: skipped_slot,
988                                                    },
989                                                )
990                                                .await
991                                                .map_err(|e| {
992                                                    (
993                                                        FirehoseError::BlockHandlerError(e),
994                                                        error_slot,
995                                                    )
996                                                })?;
997                                            }
998                                        if tracking_enabled {
999                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1000                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1001                                            if let Some(ref mut stats) = thread_stats {
1002                                                stats.leader_skipped_slots += 1;
1003                                                stats.slots_processed += 1;
1004                                                stats.current_slot = skipped_slot;
1005                                            }
1006                                        }
1007                                        last_counted_slot = skipped_slot;
1008                                    }
1009
1010                                    if slot <= last_counted_slot {
1011                                        log::debug!(
1012                                            target: &log_target,
1013                                            "duplicate block {}, already counted (last_counted={})",
1014                                            slot,
1015                                            last_counted_slot,
1016                                        );
1017                                        this_block_rewards.clear();
1018                                        continue;
1019                                    }
1020
1021                                    if block_enabled {
1022                                        if let Some(on_block_cb) = on_block.as_ref() {
1023                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1024                                            on_block_cb(
1025                                                thread_index,
1026                                                BlockData::Block {
1027                                                    parent_slot: block.meta.parent_slot,
1028                                                    parent_blockhash: previous_blockhash,
1029                                                    slot: block.slot,
1030                                                    blockhash: latest_entry_blockhash,
1031                                                    rewards: KeyedRewardsAndNumPartitions {
1032                                                        keyed_rewards,
1033                                                        num_partitions: None,
1034                                                    },
1035                                                    block_time: Some(block.meta.blocktime as i64),
1036                                                    block_height: block.meta.block_height,
1037                                                    executed_transaction_count:
1038                                                        this_block_executed_transaction_count,
1039                                                    entry_count: this_block_entry_count,
1040                                                },
1041                                            )
1042                                            .await
1043                                            .map_err(|e| {
1044                                                (
1045                                                    FirehoseError::BlockHandlerError(e),
1046                                                    error_slot,
1047                                                )
1048                                            })?;
1049                                        }
1050                                    } else {
1051                                        this_block_rewards.clear();
1052                                    }
1053                                    previous_blockhash = latest_entry_blockhash;
1054
1055                                    if tracking_enabled {
1056                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1057                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1058                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1059                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1060                                        if let Some(ref mut stats) = thread_stats {
1061                                            stats.blocks_processed += 1;
1062                                            stats.slots_processed += 1;
1063                                            stats.current_slot = slot;
1064                                        }
1065
1066                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1067                                            (&stats_tracking, thread_stats.as_mut())
1068                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1069                                                && let Err(err) = maybe_emit_stats(
1070                                                    stats_tracking.as_ref(),
1071                                                    thread_index,
1072                                                    thread_stats_ref,
1073                                                    &overall_slots_processed,
1074                                                    &overall_blocks_processed,
1075                                                    &overall_transactions_processed,
1076                                                    &overall_entries_processed,
1077                                                    &transactions_since_stats,
1078                                                    &blocks_since_stats,
1079                                                    &slots_since_stats,
1080                                                    &last_pulse,
1081                                                    start_time,
1082                                                )
1083                                                .await
1084                                                {
1085                                                    blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1086                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1087                                                    overall_blocks_processed
1088                                                        .fetch_sub(1, Ordering::Relaxed);
1089                                                    overall_slots_processed
1090                                                        .fetch_sub(1, Ordering::Relaxed);
1091                                                    if let Some((
1092                                                        prev_slots_processed,
1093                                                        prev_blocks_processed,
1094                                                        prev_leader_skipped,
1095                                                        prev_current_slot,
1096                                                    )) = thread_stats_snapshot
1097                                                    {
1098                                                        thread_stats_ref.slots_processed =
1099                                                            prev_slots_processed;
1100                                                        thread_stats_ref.blocks_processed =
1101                                                            prev_blocks_processed;
1102                                                        thread_stats_ref.leader_skipped_slots =
1103                                                            prev_leader_skipped;
1104                                                        thread_stats_ref.current_slot =
1105                                                            prev_current_slot;
1106                                                    }
1107                                                    last_counted_slot = prev_last_counted_slot;
1108                                                    return Err(err);
1109                                                }
1110                                    }
1111
1112                                    last_counted_slot = slot;
1113                                }
1114                                Subset(_subset) => (),
1115                                Epoch(_epoch) => (),
1116                                Rewards(rewards) => {
1117                                    if reward_enabled || block_enabled {
1118                                        let reassembled = nodes
1119                                            .reassemble_dataframes(rewards.data.clone())
1120                                            .map_err(|err| {
1121                                                (
1122                                                    FirehoseError::NodeDecodingError(item_index, err),
1123                                                    current_slot.unwrap_or(slot_range.start),
1124                                                )
1125                                            })?;
1126                                        if reassembled.is_empty() {
1127                                            this_block_rewards.clear();
1128                                            if reward_enabled
1129                                                && let Some(on_reward_cb) = on_reward.as_ref()
1130                                            {
1131                                                on_reward_cb(
1132                                                    thread_index,
1133                                                    RewardsData {
1134                                                        slot: block.slot,
1135                                                        rewards: Vec::new(),
1136                                                    },
1137                                                )
1138                                                .await
1139                                                .map_err(|e| {
1140                                                    (
1141                                                        FirehoseError::RewardHandlerError(e),
1142                                                        error_slot,
1143                                                    )
1144                                                })?;
1145                                            }
1146                                            continue;
1147                                        }
1148
1149                                        let decompressed = utils::decompress_zstd(reassembled)
1150                                            .map_err(|err| {
1151                                                (
1152                                                    FirehoseError::NodeDecodingError(
1153                                                        item_index,
1154                                                        err,
1155                                                    ),
1156                                                    error_slot,
1157                                                )
1158                                            })?;
1159
1160                                        let decoded =
1161                                            prost_011::Message::decode(decompressed.as_slice())
1162                                                .map_err(|err| {
1163                                                    (
1164                                                        FirehoseError::NodeDecodingError(
1165                                                            item_index,
1166                                                            Box::new(err),
1167                                                        ),
1168                                                        error_slot,
1169                                                    )
1170                                                })?;
1171                                        let keyed_rewards = convert_proto_rewards(&decoded)
1172                                            .map_err(|err| {
1173                                                (
1174                                                    FirehoseError::NodeDecodingError(item_index, err),
1175                                                    error_slot,
1176                                                )
1177                                            })?;
1178                                        if reward_enabled
1179                                            && let Some(on_reward_cb) = on_reward.as_ref()
1180                                        {
1181                                            on_reward_cb(
1182                                                thread_index,
1183                                                RewardsData {
1184                                                    slot: block.slot,
1185                                                    rewards: keyed_rewards.clone(),
1186                                                },
1187                                            )
1188                                            .await
1189                                            .map_err(|e| {
1190                                                (
1191                                                    FirehoseError::RewardHandlerError(e),
1192                                                    error_slot,
1193                                                )
1194                                            })?;
1195                                        }
1196                                        this_block_rewards = keyed_rewards;
1197                                        if let Some(ref mut stats) = thread_stats {
1198                                            stats.rewards_processed +=
1199                                                this_block_rewards.len() as u64;
1200                                        }
1201                                    }
1202                                }
1203                                DataFrame(_data_frame) => (),
1204                            }
1205                        }
1206                        if block.slot == slot_range.end - 1 {
1207                            let finish_time = std::time::Instant::now();
1208                            let elapsed = finish_time.duration_since(start_time);
1209                            log::info!(target: &log_target, "processed slot {}", block.slot);
1210                            let elapsed_pretty = human_readable_duration(elapsed);
1211                            log::info!(
1212                                target: &log_target,
1213                                "processed {} slots across {} epochs in {}.",
1214                                slot_range.end - slot_range.start,
1215                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1216                                elapsed_pretty
1217                            );
1218                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1219                            // On completion, report threads with non-zero error counts for
1220                            // visibility.
1221                            let summary: String = error_counts
1222                                .iter()
1223                                .enumerate()
1224                                .filter_map(|(i, c)| {
1225                                    let v = c.load(Ordering::Relaxed);
1226                                    if v > 0 {
1227                                        Some(format!("{:03}({})", i, v))
1228                                    } else {
1229                                        None
1230                                    }
1231                                })
1232                                .collect::<Vec<_>>()
1233                                .join(", ");
1234                            if !summary.is_empty() {
1235                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1236                            }
1237                            return Ok(());
1238                        }
1239                    }
1240                    if tracking_enabled
1241                        && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1242                            && last_counted_slot < expected_last_slot {
1243                                let flush_start = last_counted_slot.saturating_add(1);
1244                                if block_enabled
1245                                    && let Some(on_block_cb) = on_block.as_ref() {
1246                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1247                                        for skipped_slot in flush_start..=expected_last_slot {
1248                                            log::debug!(
1249                                                target: &log_target,
1250                                                "leader skipped slot {} during final flush (prev_counted {})",
1251                                                skipped_slot,
1252                                                last_counted_slot,
1253                                            );
1254                                            on_block_cb(
1255                                                thread_index,
1256                                                BlockData::LeaderSkipped { slot: skipped_slot },
1257                                            )
1258                                            .await
1259                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
1260                                            .map_err(|e| (e, error_slot))?;
1261                                        }
1262                                    }
1263                                let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1264                                if let Some(stats_ref) = thread_stats.as_mut() {
1265                                    stats_ref.leader_skipped_slots += missing_slots;
1266                                    stats_ref.slots_processed += missing_slots;
1267                                    stats_ref.current_slot = expected_last_slot;
1268                                }
1269                                overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1270                                slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1271                                last_counted_slot = expected_last_slot;
1272                            }
1273                    if let Some(ref mut stats) = thread_stats {
1274                        stats.finish_time = Some(std::time::Instant::now());
1275                        maybe_emit_stats(
1276                            stats_tracking.as_ref(),
1277                            thread_index,
1278                            stats,
1279                            &overall_slots_processed,
1280                            &overall_blocks_processed,
1281                            &overall_transactions_processed,
1282                            &overall_entries_processed,
1283                            &transactions_since_stats,
1284                            &blocks_since_stats,
1285                            &slots_since_stats,
1286                            &last_pulse,
1287                            start_time,
1288                        )
1289                        .await?;
1290                    }
1291                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1292                    }
1293                    Ok(())
1294            }
1295            .await
1296            {
1297                if is_shutdown_error(&err) {
1298                    log::info!(
1299                        target: &log_target,
1300                        "shutdown requested; terminating firehose thread {}",
1301                        thread_index
1302                    );
1303                    break;
1304                }
1305                log::error!(
1306                    target: &log_target,
1307                    "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1308                    slot,
1309                    slot_to_epoch(slot)
1310                );
1311                log::error!(target: &log_target, "{}", err);
1312                let item_index = match err {
1313                    FirehoseError::NodeDecodingError(item_index, _) => item_index,
1314                    _ => 0,
1315                };
1316                // Increment this thread's error counter
1317                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1318                log::warn!(
1319                    target: &log_target,
1320                    "restarting from slot {} at index {}",
1321                    slot,
1322                    item_index,
1323                );
1324                // Update slot range to resume from the failed slot, not the original start
1325                slot_range.start = slot;
1326                skip_until_index = Some(item_index);
1327            }
1328        });
1329        handles.push(handle);
1330    }
1331
1332    // Wait for all threads to complete
1333    for handle in handles {
1334        handle.await.unwrap();
1335    }
1336    if stats_tracking.is_some() {
1337        let elapsed = firehose_start.elapsed();
1338        let elapsed_secs = elapsed.as_secs_f64();
1339        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1340        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1341        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1342        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1343        let total_errors: u64 = error_counts
1344            .iter()
1345            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1346            .sum();
1347        let overall_tps = if elapsed_secs > 0.0 {
1348            total_transactions as f64 / elapsed_secs
1349        } else {
1350            0.0
1351        };
1352        log::info!(
1353            target: LOG_MODULE,
1354            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1355            elapsed_secs,
1356            total_slots,
1357            total_blocks,
1358            total_leader_skipped,
1359            total_transactions,
1360            overall_tps,
1361            total_errors
1362        );
1363    }
1364    if shutdown_flag.load(Ordering::SeqCst) {
1365        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1366    } else {
1367        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1368    }
1369    Ok(())
1370}
1371
1372#[allow(clippy::result_large_err)]
1373/// Builds a Geyser-backed firehose and returns a slot notification stream.
1374///
1375/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1376/// rather than relying solely on remote streams.
1377pub fn firehose_geyser(
1378    rt: Arc<tokio::runtime::Runtime>,
1379    slot_range: Range<u64>,
1380    geyser_config_files: Option<&[PathBuf]>,
1381    index_base_url: &Url,
1382    client: &Client,
1383    on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1384    + Send
1385    + 'static,
1386    threads: u64,
1387) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1388    if threads == 0 {
1389        return Err((
1390            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1391            slot_range.start,
1392        ));
1393    }
1394    log::info!(target: LOG_MODULE, "starting firehose...");
1395    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1396    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1397    let mut entry_notifier_maybe = None;
1398    let mut block_meta_notifier_maybe = None;
1399    let mut transaction_notifier_maybe = None;
1400    if let Some(geyser_config_files) = geyser_config_files {
1401        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1402
1403        let service =
1404            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1405                confirmed_bank_receiver.clone(),
1406                true,
1407                geyser_config_files,
1408            )
1409            .map_err(|e| (e.into(), slot_range.start))?;
1410
1411        transaction_notifier_maybe = Some(
1412            service
1413                .get_transaction_notifier()
1414                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1415                .map_err(|e| (e, slot_range.start))?,
1416        );
1417
1418        entry_notifier_maybe = service.get_entry_notifier();
1419        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1420
1421        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1422    }
1423
1424    if entry_notifier_maybe.is_some() {
1425        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1426    } else {
1427        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1428    }
1429    log::info!(target: LOG_MODULE, "running on_load...");
1430    rt.spawn(on_load);
1431
1432    let slot_range = Arc::new(slot_range);
1433    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1434    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1435    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1436    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1437
1438    // divide slot_range into n subranges
1439    let subranges = generate_subranges(&slot_range, threads);
1440    if threads > 1 {
1441        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1442    }
1443
1444    let mut handles = Vec::new();
1445    // Shared per-thread error counters
1446    let error_counts: Arc<Vec<AtomicU32>> =
1447        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1448
1449    for (i, slot_range) in subranges.into_iter().enumerate() {
1450        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1451        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1452        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1453        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1454        let client = client.clone();
1455        let error_counts = error_counts.clone();
1456
1457        let rt_clone = rt.clone();
1458
1459        let handle = std::thread::spawn(move || {
1460            rt_clone.block_on(async {
1461                firehose_geyser_thread(
1462                    slot_range,
1463                    transaction_notifier_maybe,
1464                    entry_notifier_maybe,
1465                    block_meta_notifier_maybe,
1466                    confirmed_bank_sender,
1467                    &client,
1468                    if threads > 1 { Some(i) } else { None },
1469                    error_counts,
1470                )
1471                .await
1472                .unwrap();
1473            });
1474        });
1475        handles.push(handle);
1476    }
1477
1478    // Wait for all threads to complete
1479    for handle in handles {
1480        handle.join().unwrap();
1481    }
1482    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1483    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1484        block_meta_notifier.notify_block_metadata(
1485            u64::MAX,
1486            "unload",
1487            u64::MAX,
1488            "unload",
1489            &KeyedRewardsAndNumPartitions {
1490                keyed_rewards: vec![],
1491                num_partitions: None,
1492            },
1493            None,
1494            None,
1495            0,
1496            0,
1497        );
1498    }
1499    Ok(confirmed_bank_receiver)
1500}
1501
1502#[allow(clippy::too_many_arguments)]
1503#[allow(clippy::result_large_err)]
1504async fn firehose_geyser_thread(
1505    mut slot_range: Range<u64>,
1506    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1507    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1508    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1509    confirmed_bank_sender: Sender<SlotNotification>,
1510    client: &Client,
1511    thread_index: Option<usize>,
1512    error_counts: Arc<Vec<AtomicU32>>,
1513) -> Result<(), (FirehoseError, u64)> {
1514    let start_time = std::time::Instant::now();
1515    let log_target = if let Some(thread_index) = thread_index {
1516        format!("{}::T{:03}", LOG_MODULE, thread_index)
1517    } else {
1518        LOG_MODULE.to_string()
1519    };
1520    let mut skip_until_index = None;
1521    // let mut triggered = false;
1522    while let Err((err, slot)) = async {
1523            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1524            log::info!(
1525                target: &log_target,
1526                "slot range: {} (epoch {}) ... {} (epoch {})",
1527                slot_range.start,
1528                slot_to_epoch(slot_range.start),
1529                slot_range.end,
1530                slot_to_epoch(slot_range.end)
1531            );
1532
1533            log::info!(target: &log_target, "🚒 starting firehose...");
1534
1535            // for each epoch
1536            let mut current_slot: Option<u64> = None;
1537            let mut previous_slot: Option<u64> = None;
1538            for epoch_num in epoch_range.clone() {
1539                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1540                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1541                    Ok(stream) => stream,
1542                    Err(_) => {
1543                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1544                    }
1545                };
1546                let mut reader = NodeReader::new(stream);
1547
1548                let header_fut = reader.read_raw_header();
1549                let header = match timeout(OP_TIMEOUT, header_fut).await {
1550                    Ok(res) => res
1551                        .map_err(FirehoseError::ReadHeader)
1552                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1553                    Err(_) => {
1554                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1555                    }
1556                };
1557                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1558
1559                let mut todo_previous_blockhash = Hash::default();
1560                let mut todo_latest_entry_blockhash = Hash::default();
1561
1562                if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1563                    let seek_fut = reader.seek_to_slot(slot_range.start);
1564                    match timeout(OP_TIMEOUT, seek_fut).await {
1565                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1566                        Err(_) => {
1567                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1568                        }
1569                    }
1570                }
1571
1572                // for each item in each block
1573                let mut item_index = 0;
1574                let mut displayed_skip_message = false;
1575                loop {
1576                    let read_fut = reader.read_until_block();
1577                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1578                        Ok(result) => result
1579                            .map_err(FirehoseError::ReadUntilBlockError)
1580                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1581                        Err(_) => {
1582                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1583                            return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1584                        }
1585                    };
1586                    if nodes.is_empty() {
1587                        log::info!(
1588                            target: &log_target,
1589                            "reached end of epoch {}",
1590                            epoch_num
1591                        );
1592                        break;
1593                    }
1594                    // ignore epoch and subset nodes at end of car file loop { if
1595                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1596                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1597                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1598                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1599                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1600                    //         log::info!(target: &log_target, "reached end of epoch {}",
1601                    //             epoch_num); break; }
1602                    if let Some(last_node) = nodes.0.last()
1603                        && !last_node.get_node().is_block() {
1604                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1605                            break;
1606                        }
1607                    let block = nodes
1608                        .get_block()
1609                        .map_err(FirehoseError::GetBlockError)
1610                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1611                    log::debug!(
1612                        target: &log_target,
1613                        "read {} items from epoch {}, now at slot {}",
1614                        item_index,
1615                        epoch_num,
1616                        block.slot
1617                    );
1618                    let slot = block.slot;
1619                    if slot >= slot_range.end {
1620                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1621                        // Return early to terminate the firehose thread cleanly. We use >=
1622                        // because slot_range is half-open [start, end), so any slot equal to
1623                        // end is out-of-range and must not be processed.
1624                        return Ok(());
1625                    }
1626                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1627                    if slot < slot_range.start {
1628                        log::warn!(
1629                            target: &log_target,
1630                            "encountered slot {} before start of range {}, skipping",
1631                            slot,
1632                            slot_range.start
1633                        );
1634                        continue;
1635                    }
1636                    if let Some(previous_slot) = previous_slot
1637                        && slot != previous_slot + 1 {
1638                            // log::warn!(target: &log_target, "non-consecutive slots: {}
1639                            // followed by {}", previous_slot, slot);
1640                        }
1641                    previous_slot = current_slot;
1642                    current_slot = Some(slot);
1643                    let mut entry_index: usize = 0;
1644                    let mut this_block_executed_transaction_count: u64 = 0;
1645                    let mut this_block_entry_count: u64 = 0;
1646                    let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1647
1648                    nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1649                        item_index += 1;
1650                        // if item_index == 100000 && !triggered { log::info!("simulating
1651                        //     error"); triggered = true; return
1652                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1653                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1654                        //         "simulated error", )), ))); }
1655                        if let Some(skip) = skip_until_index {
1656                            if item_index < skip {
1657                                if !displayed_skip_message {
1658                                    log::info!(
1659                                        target: &log_target,
1660                                        "skipping until index {} (at {})",
1661                                        skip,
1662                                        item_index
1663                                    );
1664                                    displayed_skip_message = true;
1665                                }
1666                                return Ok(());
1667                            } else {
1668                                log::info!(
1669                                    target: &log_target,
1670                                    "reached target index {}, resuming...",
1671                                    skip
1672                                );
1673                                skip_until_index = None;
1674                            }
1675                        }
1676                        let node = node_with_cid.get_node();
1677
1678                        use crate::node::Node::*;
1679                        match node {
1680                            Transaction(tx) => {
1681                                let versioned_tx = tx.as_parsed()?;
1682                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1683
1684                                let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1685
1686                                let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1687                                    prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1688                                        Box::new(std::io::Error::other(
1689                                            std::format!("Error decoding metadata: {:?}", err),
1690                                        ))
1691                                    })?;
1692
1693                                let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1694                                    metadata.try_into()?;
1695
1696                                let message_hash = {
1697                                    #[cfg(feature = "verify-transaction-signatures")]
1698                                    {
1699                                        versioned_tx.verify_and_hash_message()?
1700                                    }
1701                                    #[cfg(not(feature = "verify-transaction-signatures"))]
1702                                    {
1703                                        // Signature verification is optional because it is
1704                                        // extremely expensive at replay scale.
1705                                        versioned_tx.message.hash()
1706                                    }
1707                                };
1708                                let signature = versioned_tx
1709                                    .signatures
1710                                    .first()
1711                                    .ok_or_else(|| {
1712                                        Box::new(std::io::Error::new(
1713                                            std::io::ErrorKind::InvalidData,
1714                                            "transaction missing signature",
1715                                        )) as Box<dyn std::error::Error>
1716                                    })?;
1717                                let is_vote = is_simple_vote_transaction(&versioned_tx);
1718
1719                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1720                                    transaction_notifier.notify_transaction(
1721                                        block.slot,
1722                                        tx.index.unwrap() as usize,
1723                                        signature,
1724                                        &message_hash,
1725                                        is_vote,
1726                                        &as_native_metadata,
1727                                        &versioned_tx,
1728                                    );
1729                                }
1730
1731                            }
1732                            Entry(entry) => {
1733                                let entry_hash = Hash::from(entry.hash.to_bytes());
1734                                let entry_transaction_count = entry.transactions.len();
1735                                let entry_transaction_count_u64 = entry_transaction_count as u64;
1736                                let starting_transaction_index =
1737                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1738                                        Box::new(std::io::Error::other(
1739                                            "transaction index exceeds usize range",
1740                                        )) as Box<dyn std::error::Error>
1741                                    })?;
1742                                todo_latest_entry_blockhash = entry_hash;
1743                                this_block_executed_transaction_count += entry_transaction_count_u64;
1744                                this_block_entry_count += 1;
1745                                if entry_notifier_maybe.is_none() {
1746                                    return Ok(());
1747                                }
1748                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1749                                let entry_summary = solana_entry::entry::EntrySummary {
1750                                    num_hashes: entry.num_hashes,
1751                                    hash: Hash::from(entry.hash.to_bytes()),
1752                                    num_transactions: entry_transaction_count_u64,
1753                                };
1754                                entry_notifier.notify_entry(
1755                                    block.slot,
1756                                    entry_index,
1757                                    &entry_summary,
1758                                    starting_transaction_index,
1759                                );
1760                                entry_index += 1;
1761                            }
1762                            Block(block) => {
1763                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1764                                confirmed_bank_sender.send(notification).unwrap();
1765
1766                                if block_meta_notifier_maybe.is_none() {
1767                                    return Ok(());
1768                                }
1769                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
1770                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1771                                block_meta_notifier.notify_block_metadata(
1772                                    block.meta.parent_slot,
1773                                    todo_previous_blockhash.to_string().as_str(),
1774                                    block.slot,
1775                                    todo_latest_entry_blockhash.to_string().as_str(),
1776                                    &KeyedRewardsAndNumPartitions {
1777                                        keyed_rewards,
1778                                        num_partitions: None,
1779                                    },
1780                                    Some(block.meta.blocktime as i64),
1781                                    block.meta.block_height,
1782                                    this_block_executed_transaction_count,
1783                                    this_block_entry_count,
1784                                );
1785                                todo_previous_blockhash = todo_latest_entry_blockhash;
1786                                std::thread::yield_now();
1787                            }
1788                            Subset(_subset) => (),
1789                            Epoch(_epoch) => (),
1790                            Rewards(rewards) => {
1791                                if !rewards.is_complete() {
1792                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1793                                    let decompressed = utils::decompress_zstd(reassembled)?;
1794                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1795                                        Box::new(std::io::Error::other(
1796                                            std::format!("Error decoding rewards: {:?}", err),
1797                                        ))
1798                                    })?;
1799                                    this_block_rewards = convert_proto_rewards(&decoded)?;
1800                                }
1801                            }
1802                            DataFrame(_data_frame) => (),
1803                        }
1804                        Ok(())
1805                    })
1806                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1807                    if block.slot == slot_range.end - 1 {
1808                        let finish_time = std::time::Instant::now();
1809                        let elapsed = finish_time.duration_since(start_time);
1810                        log::info!(target: &log_target, "processed slot {}", block.slot);
1811                        let elapsed_pretty = human_readable_duration(elapsed);
1812                        log::info!(
1813                            target: &log_target,
1814                            "processed {} slots across {} epochs in {}.",
1815                            slot_range.end - slot_range.start,
1816                            slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1817                            elapsed_pretty
1818                        );
1819                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1820                        // On completion, report threads with non-zero error counts for
1821                        // visibility.
1822                        let summary: String = error_counts
1823                            .iter()
1824                            .enumerate()
1825                            .filter_map(|(i, c)| {
1826                                let v = c.load(Ordering::Relaxed);
1827                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1828                            })
1829                            .collect::<Vec<_>>()
1830                            .join(", ");
1831                        if !summary.is_empty() {
1832                            log::debug!(target: &log_target, "threads with errors: {}", summary);
1833                        }
1834                        return Ok(());
1835                    }
1836                }
1837            }
1838            Ok(())
1839}
1840.await
1841{
1842        if is_shutdown_error(&err) {
1843            log::info!(
1844                target: &log_target,
1845                "shutdown requested; terminating firehose thread {:?}",
1846                thread_index
1847            );
1848            return Ok(());
1849        }
1850        log::error!(
1851            target: &log_target,
1852            "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1853            slot,
1854            slot_to_epoch(slot)
1855            );
1856            log::error!(target: &log_target, "{}", err);
1857            let item_index = match err {
1858                FirehoseError::NodeDecodingError(item_index, _) => item_index,
1859                _ => 0,
1860            };
1861            // Increment this thread's error counter
1862            let idx = thread_index.unwrap_or(0);
1863            error_counts[idx].fetch_add(1, Ordering::Relaxed);
1864            log::warn!(
1865                target: &log_target,
1866                "restarting from slot {} at index {}",
1867                slot,
1868                item_index,
1869            );
1870            // Update slot range to resume from the failed slot, not the original start
1871            slot_range.start = slot;
1872            skip_until_index = Some(item_index);
1873    }
1874    Ok(())
1875}
1876
1877#[inline]
1878fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1879    if !(1..=2).contains(&versioned_tx.signatures.len()) {
1880        return false;
1881    }
1882
1883    if !matches!(
1884        versioned_tx.version(),
1885        solana_transaction::versioned::TransactionVersion::Legacy(_)
1886    ) {
1887        return false;
1888    }
1889
1890    let instructions = versioned_tx.message.instructions();
1891    if instructions.len() != 1 {
1892        return false;
1893    }
1894
1895    let program_index = instructions[0].program_id_index as usize;
1896    versioned_tx
1897        .message
1898        .static_account_keys()
1899        .get(program_index)
1900        .map(|program_id| program_id == &vote_program_id())
1901        .unwrap_or(false)
1902}
1903
1904#[inline(always)]
1905fn convert_proto_rewards(
1906    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1907) -> Result<Vec<(Address, RewardInfo)>, Box<dyn std::error::Error>> {
1908    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1909    for proto_reward in proto_rewards.rewards.iter() {
1910        let reward = RewardInfo {
1911            reward_type: match proto_reward.reward_type - 1 {
1912                0 => RewardType::Fee,
1913                1 => RewardType::Rent,
1914                2 => RewardType::Staking,
1915                3 => RewardType::Voting,
1916                typ => {
1917                    return Err(Box::new(std::io::Error::other(format!(
1918                        "unsupported reward type {}",
1919                        typ
1920                    ))));
1921                }
1922            },
1923            lamports: proto_reward.lamports,
1924            post_balance: proto_reward.post_balance,
1925            commission: proto_reward.commission.parse::<u8>().ok(),
1926        };
1927        let pubkey = proto_reward
1928            .pubkey
1929            .parse::<Address>()
1930            .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1931        keyed_rewards.push((pubkey, reward));
1932    }
1933    Ok(keyed_rewards)
1934}
1935
1936#[inline]
1937/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
1938pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1939    let total = slot_range.end - slot_range.start;
1940    let slots_per_thread = total / threads;
1941    let remainder = total % threads;
1942
1943    let ranges: Vec<Range<u64>> = (0..threads)
1944        .map(|i| {
1945            // Distribute remainder slots to the first `remainder` threads
1946            let extra_slot = if i < remainder { 1 } else { 0 };
1947            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1948            let end = start + slots_per_thread + extra_slot;
1949            start..end
1950        })
1951        .collect();
1952
1953    // Verify that ranges cover all slots exactly
1954    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1955    assert_eq!(
1956        total_covered, total,
1957        "Range generation failed: {} threads should cover {} slots but only cover {}",
1958        threads, total, total_covered
1959    );
1960
1961    // Verify no gaps between ranges
1962    for i in 1..ranges.len() {
1963        assert_eq!(
1964            ranges[i - 1].end,
1965            ranges[i].start,
1966            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1967            i - 1,
1968            ranges[i - 1].end,
1969            i,
1970            ranges[i].start
1971        );
1972    }
1973
1974    log::info!(
1975        target: LOG_MODULE,
1976        "Generated {} thread ranges covering {} slots total",
1977        threads,
1978        total_covered
1979    );
1980    ranges
1981}
1982
/// Formats `duration` as a short human-readable string.
///
/// - A zero duration renders as `"0s"`.
/// - Below one minute, a whole number of seconds (to millisecond resolution)
///   renders as `"<n>s"`; anything else renders with two decimals, e.g.
///   `"0.50s"` or `"5.25s"`.
/// - From one minute up, only the largest unit and, when non-zero, the unit
///   directly below it are shown: `"1m1s"`, `"1h1m"`, `"1d1h"`. Smaller units
///   are dropped, so 1h0m1s renders as `"1h"`.
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let whole_seconds = duration.as_secs();
    if whole_seconds < 60 {
        let is_whole = whole_seconds > 0 && duration.subsec_millis() == 0;
        return if is_whole {
            format!("{}s", whole_seconds)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_seconds / 86_400;
    let hours = whole_seconds % 86_400 / 3_600;
    let minutes = whole_seconds % 3_600 / 60;
    let seconds = whole_seconds % 60;

    // Arms are ordered from the smallest leading unit to the largest; each arm
    // relies on the earlier ones having consumed the zero cases.
    match (days, hours, minutes, seconds) {
        (0, 0, 0, s) => format!("{s}s"),
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2028
/// Test-only stats callback that logs a one-line progress summary.
///
/// Computes transactions per second from `stats.transactions_processed` and
/// the time elapsed since `stats.start_time` (0.0 when no time has elapsed),
/// logs the counters at `info` level, and always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let seconds = stats.start_time.elapsed().as_secs_f64();
        let tx_count = stats.transactions_processed;
        // Guard the division so a zero elapsed time reports 0 tps.
        let throughput = if seconds > 0.0 {
            tx_count as f64 / seconds
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            tx_count,
            stats.entries_processed,
            stats.rewards_processed,
            seconds,
            throughput
        );
        Ok(())
    })
}
2054
2055#[tokio::test(flavor = "multi_thread")]
2056async fn test_firehose_epoch_800() {
2057    use std::sync::atomic::{AtomicU64, Ordering};
2058    solana_logger::setup_with_default("info");
2059    const THREADS: usize = 4;
2060    const NUM_SLOTS_TO_COVER: u64 = 50;
2061    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2062    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2063    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2064    let stats_tracking = StatsTracking {
2065        on_stats: log_stats_handler,
2066        tracking_interval_slots: 10,
2067    };
2068
2069    firehose(
2070        THREADS.try_into().unwrap(),
2071        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2072        Some(|thread_id: usize, block: BlockData| {
2073            async move {
2074                let prev =
2075                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2076                if block.was_skipped() {
2077                    log::info!(
2078                        target: LOG_MODULE,
2079                        "leader skipped block {} on thread {}",
2080                        block.slot(),
2081                        thread_id,
2082                    );
2083                } else {
2084                    /*log::info!(
2085                        target: LOG_MODULE,
2086                        "got block {} on thread {}",
2087                        block.slot(),
2088                        thread_id,
2089                    );*/
2090                }
2091
2092                if prev > 0 {
2093                    assert_eq!(prev + 1, block.slot());
2094                }
2095                if block.was_skipped() {
2096                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2097                } else {
2098                    NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2099                }
2100                Ok(())
2101            }
2102            .boxed()
2103        }),
2104        None::<OnTxFn>,
2105        None::<OnEntryFn>,
2106        None::<OnRewardFn>,
2107        Some(stats_tracking),
2108        None,
2109    )
2110    .await
2111    .unwrap();
2112    assert_eq!(
2113        NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2114        NUM_SLOTS_TO_COVER
2115    );
2116    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2117}