jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_address::Address;
7use solana_geyser_plugin_manager::{
8    block_metadata_notifier_interface::BlockMetadataNotifier,
9    geyser_plugin_service::GeyserPluginServiceError,
10};
11use solana_hash::Hash;
12use solana_ledger::entry_notifier_interface::EntryNotifier;
13use solana_reward_info::RewardInfo;
14use solana_rpc::{
15    optimistically_confirmed_bank_tracker::SlotNotification,
16    transaction_notifier_interface::TransactionNotifier,
17};
18use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
19use solana_sdk_ids::vote::id as vote_program_id;
20use solana_transaction::versioned::VersionedTransaction;
21use std::{
22    fmt::Display,
23    future::Future,
24    io,
25    ops::Range,
26    path::PathBuf,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
30    },
31};
32use thiserror::Error;
33use tokio::{
34    sync::broadcast::{self, error::TryRecvError},
35    time::timeout,
36};
37
38use crate::{
39    LOG_MODULE,
40    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
41    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
42    node_reader::NodeReader,
43    utils,
44};
45
// Upper bound (10 seconds) applied to each asynchronous firehose operation: fetching the
// epoch stream, reading the header, seeking to a slot, and reading the next block. An
// operation that exceeds it is reported as `FirehoseError::OperationTimeout`. Adjust here to
// tune stall detection/restart aggressiveness.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
50
51fn poll_shutdown(
52    flag: &Arc<std::sync::atomic::AtomicBool>,
53    receiver: &mut Option<broadcast::Receiver<()>>,
54) -> bool {
55    if let Some(rx) = receiver {
56        match rx.try_recv() {
57            Ok(_) | Err(TryRecvError::Lagged(_)) => {
58                flag.store(true, Ordering::SeqCst);
59            }
60            Err(TryRecvError::Closed) => {
61                flag.store(true, Ordering::SeqCst);
62            }
63            Err(TryRecvError::Empty) => {}
64        }
65    }
66    flag.load(Ordering::SeqCst)
67}
68
69fn is_shutdown_error(err: &FirehoseError) -> bool {
70    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
71        inner
72            .downcast_ref::<io::Error>()
73            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
74            .unwrap_or(false)
75    }
76
77    match err {
78        FirehoseError::BlockHandlerError(inner)
79        | FirehoseError::TransactionHandlerError(inner)
80        | FirehoseError::EntryHandlerError(inner)
81        | FirehoseError::RewardHandlerError(inner)
82        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
83        _ => false,
84    }
85}
86
/// Errors that can occur while streaming the firehose.
///
/// Variants that wrap a `Box<dyn std::error::Error>` carry the underlying failure as an
/// opaque boxed error; the remaining variants wrap a concrete error type or carry no payload.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(Box<dyn std::error::Error>),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(Box<dyn std::error::Error>),
    /// Failure while fetching an individual block.
    GetBlockError(Box<dyn std::error::Error>),
    /// Failed to decode a node at the given index.
    NodeDecodingError(usize, Box<dyn std::error::Error>),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(Box<dyn std::error::Error>),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(Box<dyn std::error::Error>),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(Box<dyn std::error::Error>),
    /// Timeout reached while waiting for a firehose operation; the payload names the operation.
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(Box<dyn std::error::Error>),
    /// Entry handler returned an error.
    EntryHandlerError(Box<dyn std::error::Error>),
    /// Reward handler returned an error.
    RewardHandlerError(Box<dyn std::error::Error>),
    /// Block handler returned an error.
    BlockHandlerError(Box<dyn std::error::Error>),
}
124
125impl Display for FirehoseError {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        match self {
128            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
129            FirehoseError::ReadHeader(error) => {
130                write!(f, "Error reading header: {}", error)
131            }
132            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
133                f,
134                "Error initializing geyser plugin service: {}",
135                geyser_plugin_service_error
136            ),
137            FirehoseError::FailedToGetTransactionNotifier => write!(
138                f,
139                "Failed to get transaction notifier from GeyserPluginService"
140            ),
141            FirehoseError::ReadUntilBlockError(error) => {
142                write!(f, "Error reading until block: {}", error)
143            }
144            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
145            FirehoseError::NodeDecodingError(item_index, error) => {
146                write!(
147                    f,
148                    "Error seeking, reading data from, or decoding data for data node {}: {}",
149                    item_index, error
150                )
151            }
152            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
153                f,
154                "Error getting info from slot offset index: {}",
155                slot_offset_index_error
156            ),
157            FirehoseError::SeekToSlotError(error) => {
158                write!(f, "Error seeking to slot: {}", error)
159            }
160            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
161            FirehoseError::OnStatsHandlerError(error) => {
162                write!(f, "Stats handler error: {}", error)
163            }
164            FirehoseError::OperationTimeout(op) => {
165                write!(f, "Timeout while waiting for operation: {}", op)
166            }
167            FirehoseError::TransactionHandlerError(error) => {
168                write!(f, "Transaction handler error: {}", error)
169            }
170            FirehoseError::EntryHandlerError(error) => {
171                write!(f, "Entry handler error: {}", error)
172            }
173            FirehoseError::RewardHandlerError(error) => {
174                write!(f, "Reward handler error: {}", error)
175            }
176            FirehoseError::BlockHandlerError(error) => {
177                write!(f, "Block handler error: {}", error)
178            }
179        }
180    }
181}
182
183impl From<reqwest::Error> for FirehoseError {
184    fn from(e: reqwest::Error) -> Self {
185        FirehoseError::Reqwest(e)
186    }
187}
188
189impl From<GeyserPluginServiceError> for FirehoseError {
190    fn from(e: GeyserPluginServiceError) -> Self {
191        FirehoseError::GeyserPluginService(e)
192    }
193}
194
195impl From<SlotOffsetIndexError> for FirehoseError {
196    fn from(e: SlotOffsetIndexError) -> Self {
197        FirehoseError::SlotOffsetIndexError(e)
198    }
199}
200
/// Per-thread progress information emitted by the firehose runner.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot range `[start, end)` assigned to the thread; `end` itself is excluded.
    pub slot_range: Range<u64>,
    /// Latest slot processed by the thread.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
227
/// Aggregated firehose statistics covering all worker threads.
///
/// `maybe_emit_stats` builds one of these for each stats callback invocation, combining the
/// shared atomic counters with the reporting thread's [`ThreadStats`].
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update.
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began.
    ///
    /// NOTE(review): `maybe_emit_stats` copies this from the reporting thread's
    /// `ThreadStats::start_time`, so it is that thread's start time — confirm this is intended.
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    ///
    /// NOTE(review): `maybe_emit_stats` copies this from the reporting thread's
    /// `ThreadStats::finish_time` — confirm this is intended.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open, `[start, end)`).
    ///
    /// NOTE(review): `maybe_emit_stats` fills this with the reporting thread's sub-range.
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads.
    ///
    /// `maybe_emit_stats` derives this as `slots_processed - blocks_processed` (saturating).
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Aggregate rewards processed across all threads.
    ///
    /// NOTE(review): `maybe_emit_stats` fills this from the reporting thread's
    /// `ThreadStats::rewards_processed`, not from a cross-thread total — confirm.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse.
    pub time_since_last_pulse: std::time::Duration,
}
260
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// `OnStats` is the concrete callback type; it must implement [`Handler<Stats>`], i.e. accept
/// a thread index and a [`Stats`] snapshot.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available.
    pub on_stats: OnStats,
    /// Minimum number of slots processed before triggering the callback.
    pub tracking_interval_slots: u64,
}
269
270#[inline(always)]
271#[allow(clippy::too_many_arguments)]
272async fn maybe_emit_stats<OnStats: Handler<Stats>>(
273    stats_tracking: Option<&StatsTracking<OnStats>>,
274    thread_index: usize,
275    thread_stats: &ThreadStats,
276    overall_slots_processed: &AtomicU64,
277    overall_blocks_processed: &AtomicU64,
278    overall_transactions_processed: &AtomicU64,
279    overall_entries_processed: &AtomicU64,
280    transactions_since_stats: &AtomicU64,
281    blocks_since_stats: &AtomicU64,
282    slots_since_stats: &AtomicU64,
283    last_pulse: &Arc<AtomicU64>,
284    base_instant: std::time::Instant,
285) -> Result<(), (FirehoseError, u64)> {
286    if let Some(stats_tracker) = stats_tracking {
287        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
288        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
289        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
290        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
291        let now_nanos = base_instant.elapsed().as_nanos() as u64;
292        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
293        let delta_nanos = now_nanos.saturating_sub(previous);
294        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
295        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
296        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
297        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
298
299        let stats = Stats {
300            thread_stats: thread_stats.clone(),
301            start_time: thread_stats.start_time,
302            finish_time: thread_stats.finish_time,
303            slot_range: thread_stats.slot_range.clone(),
304            slots_processed: total_slots,
305            blocks_processed: total_blocks,
306            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
307            transactions_processed: total_transactions,
308            entries_processed: total_entries,
309            rewards_processed: thread_stats.rewards_processed,
310            transactions_since_last_pulse: processed_transactions,
311            blocks_since_last_pulse: processed_blocks,
312            slots_since_last_pulse: processed_slots,
313            time_since_last_pulse,
314        };
315
316        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
317            last_pulse.store(previous, Ordering::Relaxed);
318            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
319            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
320            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
321            return Err((
322                FirehoseError::OnStatsHandlerError(e),
323                thread_stats.current_slot,
324            ));
325        }
326    }
327    Ok(())
328}
329
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
336
/// Firehose transaction payload passed to [`Handler`] callbacks.
///
/// This is the `Data` type of transaction handlers (see [`OnTxFn`]).
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
355
/// Block entry metadata passed to [`Handler`] callbacks.
///
/// This is the `Data` type of entry handlers (see [`OnEntryFn`]).
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Range of transaction indexes covered by the entry (half-open, as with any `Range`).
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
370
/// Reward data conveyed to reward [`Handler`] callbacks.
///
/// This is the `Data` type of reward handlers (see [`OnRewardFn`]).
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients and their associated reward information, as `(address, info)` pairs.
    pub rewards: Vec<(Address, RewardInfo)>,
}
379
/// Block-level data streamed to block handlers.
///
/// Every slot delivered to a block handler is either a populated [`BlockData::Block`] or a
/// [`BlockData::LeaderSkipped`] marker; use [`BlockData::slot`] to read the slot number
/// without matching on the variant.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot was skipped by the leader.
    LeaderSkipped {
        /// Skipped slot number.
        slot: u64,
    },
}
410
411impl BlockData {
412    /// Returns the slot associated with this block or skipped slot.
413    #[inline(always)]
414    pub const fn slot(&self) -> u64 {
415        match self {
416            BlockData::Block { slot, .. } => *slot,
417            BlockData::LeaderSkipped { slot } => *slot,
418        }
419    }
420
421    /// Returns `true` if the slot was skipped by the leader.
422    #[inline(always)]
423    pub const fn was_skipped(&self) -> bool {
424        matches!(self, BlockData::LeaderSkipped { .. })
425    }
426}
427
/// Outcome of a single handler invocation: `Ok(())` on success, otherwise a boxed error that
/// is `Send` so it can cross task boundaries.
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
/// Boxed, `'static` future returned by every handler; resolves to a [`HandlerResult`].
type HandlerFuture = BoxFuture<'static, HandlerResult>;
430
/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// A handler is any `Fn(usize, Data) -> HandlerFuture` that is also `Send + Sync + Clone +
/// 'static`. The `usize` argument is the index of the firehose worker thread making the call.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket implementation: every closure or function meeting the bounds above is a `Handler`,
// so callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
438
/// Function pointer alias for [`Handler`] callbacks: takes the worker thread index and the
/// event payload, and returns the boxed handler future.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
451
452/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
453#[inline]
454#[allow(clippy::too_many_arguments)]
455pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
456    threads: u64,
457    slot_range: Range<u64>,
458    on_block: Option<OnBlock>,
459    on_tx: Option<OnTransaction>,
460    on_entry: Option<OnEntry>,
461    on_rewards: Option<OnRewards>,
462    stats_tracking: Option<StatsTracking<OnStats>>,
463    shutdown_signal: Option<broadcast::Receiver<()>>,
464) -> Result<(), (FirehoseError, u64)>
465where
466    OnBlock: Handler<BlockData>,
467    OnTransaction: Handler<TransactionData>,
468    OnEntry: Handler<EntryData>,
469    OnRewards: Handler<RewardsData>,
470    OnStats: Handler<Stats>,
471{
472    if threads == 0 {
473        return Err((
474            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
475            slot_range.start,
476        ));
477    }
478    let client = Client::new();
479    log::info!(target: LOG_MODULE, "starting firehose...");
480    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
481
482    let slot_range = Arc::new(slot_range);
483
484    // divide slot_range into n subranges
485    let subranges = generate_subranges(&slot_range, threads);
486    if threads > 1 {
487        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
488    }
489
490    let firehose_start = std::time::Instant::now();
491    let shutdown_flag = Arc::new(AtomicBool::new(false));
492    if let Some(ref rx) = shutdown_signal {
493        let mut rx = rx.resubscribe();
494        let flag = shutdown_flag.clone();
495        tokio::spawn(async move {
496            if rx.recv().await.is_ok() {
497                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
498                flag.store(true, Ordering::SeqCst);
499            }
500        });
501    }
502    let mut handles = Vec::new();
503    // Shared per-thread error counters
504    let error_counts: Arc<Vec<AtomicU32>> =
505        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
506
507    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
510    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
511
512    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
513        let error_counts = error_counts.clone();
514        let client = client.clone();
515        let on_block = on_block.clone();
516        let on_tx = on_tx.clone();
517        let on_entry = on_entry.clone();
518        let on_reward = on_rewards.clone();
519        let overall_slots_processed = overall_slots_processed.clone();
520        let overall_blocks_processed = overall_blocks_processed.clone();
521        let overall_transactions_processed = overall_transactions_processed.clone();
522        let overall_entries_processed = overall_entries_processed.clone();
523        let stats_tracking = stats_tracking.clone();
524        let transactions_since_stats = Arc::new(AtomicU64::new(0));
525        let blocks_since_stats = Arc::new(AtomicU64::new(0));
526        let slots_since_stats = Arc::new(AtomicU64::new(0));
527        let last_pulse = Arc::new(AtomicU64::new(0));
528        let transactions_since_stats_cloned = transactions_since_stats.clone();
529        let blocks_since_stats_cloned = blocks_since_stats.clone();
530        let slots_since_stats_cloned = slots_since_stats.clone();
531        let last_pulse_cloned = last_pulse.clone();
532        let shutdown_flag = shutdown_flag.clone();
533        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
534
535        let handle = tokio::spawn(async move {
536            let transactions_since_stats = transactions_since_stats_cloned;
537            let blocks_since_stats = blocks_since_stats_cloned;
538            let slots_since_stats = slots_since_stats_cloned;
539            let last_pulse = last_pulse_cloned;
540            let mut shutdown_rx = thread_shutdown_rx;
541            let start_time = std::time::Instant::now();
542            last_pulse.store(0, Ordering::Relaxed);
543            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
544            let mut skip_until_index = None;
545            let block_enabled = on_block.is_some();
546            let tx_enabled = on_tx.is_some();
547            let entry_enabled = on_entry.is_some();
548            let reward_enabled = on_reward.is_some();
549            let tracking_enabled = stats_tracking.is_some();
550            let mut last_counted_slot = slot_range.start.saturating_sub(1);
551
552            // let mut triggered = false;
553            while let Err((err, slot)) = async {
554                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
555                    log::info!(
556                        target: &log_target,
557                        "shutdown requested; terminating firehose thread {}",
558                        thread_index
559                    );
560                    return Ok(());
561                }
562                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
563                log::info!(
564                    target: &log_target,
565                    "slot range: {} (epoch {}) ... {} (epoch {})",
566                    slot_range.start,
567                    slot_to_epoch(slot_range.start),
568                    slot_range.end,
569                    slot_to_epoch(slot_range.end)
570                );
571
572                log::info!(target: &log_target, "🚒 starting firehose...");
573
574                // for each epoch
575                let mut current_slot: Option<u64> = None;
576                let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
577                for epoch_num in epoch_range.clone() {
578                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
579                        log::info!(
580                            target: &log_target,
581                            "shutdown requested; terminating firehose thread {}",
582                            thread_index
583                        );
584                        return Ok(());
585                    }
586                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
587                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
588                        Ok(stream) => stream,
589                        Err(_) => {
590                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
591                        }
592                    };
593                    let mut reader = NodeReader::new(stream);
594
595                    let header_fut = reader.read_raw_header();
596                    let header = match timeout(OP_TIMEOUT, header_fut).await {
597                        Ok(res) => res
598                            .map_err(FirehoseError::ReadHeader)
599                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
600                        Err(_) => {
601                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
602                        }
603                    };
604                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
605
606                    let mut previous_blockhash = Hash::default();
607                    let mut latest_entry_blockhash = Hash::default();
608
609                    let mut thread_stats = if tracking_enabled {
610                        Some(ThreadStats {
611                            thread_id: thread_index,
612                            start_time,
613                            finish_time: None,
614                            slot_range: slot_range.clone(),
615                            current_slot: slot_range.start,
616                            slots_processed: 0,
617                            blocks_processed: 0,
618                            leader_skipped_slots: 0,
619                            transactions_processed: 0,
620                            entries_processed: 0,
621                            rewards_processed: 0,
622                        })
623                    } else {
624                        None
625                    };
626
627                    if slot_range.start > epoch_to_slot_range(epoch_num).0 {
628                        // Seek to the previous slot so the stream includes all nodes (transactions,
629                        // entries, rewards) that precede the block payload for `slot_range.start`.
630                        let seek_slot = slot_range.start.saturating_sub(1);
631                        let seek_fut = reader.seek_to_slot(seek_slot);
632                        match timeout(OP_TIMEOUT, seek_fut).await {
633                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
634                            Err(_) => {
635                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
636                            }
637                        }
638                    }
639
640                    // for each item in each block
641                    let mut item_index = 0;
642                    let mut displayed_skip_message = false;
643                    loop {
644                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
645                            log::info!(
646                                target: &log_target,
647                                "shutdown requested; terminating firehose thread {}",
648                                thread_index
649                            );
650                            return Ok(());
651                        }
652                        let read_fut = reader.read_until_block();
653                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
654                            Ok(result) => result
655                                .map_err(FirehoseError::ReadUntilBlockError)
656                                .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
657                            Err(_) => {
658                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
659                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
660                            }
661                        };
662                        if nodes.is_empty() {
663                            log::info!(
664                                target: &log_target,
665                                "reached end of epoch {}",
666                                epoch_num
667                            );
668                            break;
669                        }
670                        if let Some(last_node) = nodes.0.last()
671                            && !last_node.get_node().is_block()
672                        {
673                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
674                            break;
675                        }
676                        let block = nodes
677                            .get_block()
678                            .map_err(FirehoseError::GetBlockError)
679                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
680                        log::debug!(
681                            target: &log_target,
682                            "read {} items from epoch {}, now at slot {}",
683                            item_index,
684                            epoch_num,
685                            block.slot
686                        );
687                        let slot = block.slot;
688                        if slot >= slot_range.end {
689                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
690                            // Return early to terminate the firehose thread cleanly. We use >=
691                            // because slot_range is half-open [start, end), so any slot equal
692                            // to end is out-of-range and must not be processed.
693
694                            let target_last_slot = slot_range.end.saturating_sub(1);
695                            let skip_start_from_previous = previous_slot
696                                .map(|s| s.saturating_add(1))
697                                .unwrap_or(slot_range.start);
698                            let first_untracked_slot = last_counted_slot.saturating_add(1);
699                            let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
700
701                            if skip_start <= target_last_slot {
702                                if block_enabled
703                                    && let Some(on_block_cb) = on_block.as_ref() {
704                                        for skipped_slot in skip_start..=target_last_slot {
705                                            log::debug!(
706                                                target: &log_target,
707                                                "leader skipped slot {} (prev_counted {}, target {})",
708                                                skipped_slot,
709                                                last_counted_slot,
710                                                target_last_slot,
711                                            );
712                                            on_block_cb(
713                                                thread_index,
714                                                BlockData::LeaderSkipped { slot: skipped_slot },
715                                            )
716                                            .await
717                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
718                                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
719                                        }
720                                    }
721
722                                let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
723                                if tracking_enabled {
724                                    if let Some(ref mut stats) = thread_stats {
725                                        stats.leader_skipped_slots += missing_slots;
726                                        stats.slots_processed += missing_slots;
727                                        stats.current_slot = target_last_slot;
728                                    }
729                                    overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
730                                    slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
731                                }
732                                last_counted_slot = target_last_slot;
733                            }
734
735                            return Ok(());
736                        }
737                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
738                        if slot < slot_range.start {
739                            if slot.saturating_add(1) == slot_range.start {
740                                log::debug!(
741                                    target: &log_target,
742                                    "priming reader with preceding slot {}, skipping",
743                                    slot
744                                );
745                            } else {
746                                log::warn!(
747                                    target: &log_target,
748                                    "encountered slot {} before start of range {}, skipping",
749                                    slot,
750                                    slot_range.start
751                                );
752                            }
753                            continue;
754                        }
755                        if current_slot.is_some() {
756                            previous_slot = current_slot;
757                        }
758                        current_slot = Some(slot);
759                        let mut entry_index: usize = 0;
760                        let mut this_block_executed_transaction_count: u64 = 0;
761                        let mut this_block_entry_count: u64 = 0;
762                        let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
763
764                        for node_with_cid in &nodes.0 {
765                            item_index += 1;
766                            if let Some(skip) = skip_until_index {
767                                if item_index < skip {
768                                    if !displayed_skip_message {
769                                        log::info!(
770                                            target: &log_target,
771                                            "skipping until index {} (at {})",
772                                            skip,
773                                            item_index
774                                        );
775                                        displayed_skip_message = true;
776                                    }
777                                    continue;
778                                } else {
779                                    log::info!(
780                                        target: &log_target,
781                                        "reached target index {}, resuming...",
782                                        skip
783                                    );
784                                    skip_until_index = None;
785                                }
786                            }
787                            let node = node_with_cid.get_node();
788
789                            if let Some(ref mut stats) = thread_stats {
790                                stats.current_slot = slot;
791                            }
792
793                            let error_slot = current_slot.unwrap_or(slot_range.start);
794
795                            use crate::node::Node::*;
796                            match node {
797                                Transaction(tx) => {
798                                    if tx_enabled
799                                        && let Some(on_tx_cb) = on_tx.as_ref()
800                                    {
801                                        let error_slot = current_slot.unwrap_or(slot_range.start);
802                                        let versioned_tx = tx.as_parsed().map_err(|err| {
803                                            (
804                                                FirehoseError::NodeDecodingError(item_index, err),
805                                                error_slot,
806                                            )
807                                        })?;
808                                        let reassembled_metadata = nodes
809                                            .reassemble_dataframes(tx.metadata.clone())
810                                            .map_err(|err| {
811                                                (
812                                                    FirehoseError::NodeDecodingError(item_index, err),
813                                                    error_slot,
814                                                )
815                                            })?;
816
817                                        let decompressed =
818                                            utils::decompress_zstd(reassembled_metadata.clone())
819                                                .map_err(|err| {
820                                                    (
821                                                        FirehoseError::NodeDecodingError(
822                                                            item_index,
823                                                            err,
824                                                        ),
825                                                        error_slot,
826                                                    )
827                                                })?;
828
829                                        let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
830                                            prost_011::Message::decode(decompressed.as_slice())
831                                                .map_err(|err| {
832                                                    (
833                                                        FirehoseError::NodeDecodingError(
834                                                            item_index,
835                                                            Box::new(err),
836                                                        ),
837                                                        error_slot,
838                                                    )
839                                                })?;
840
841                                        let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
842                                            metadata.try_into().map_err(|err| {
843                                                (
844                                                    FirehoseError::NodeDecodingError(
845                                                        item_index,
846                                                        Box::new(err),
847                                                    ),
848                                                    error_slot,
849                                                )
850                                            })?;
851
852                                        let message_hash = {
853                                            #[cfg(feature = "verify-transaction-signatures")]
854                                            {
855                                                versioned_tx.verify_and_hash_message().map_err(|err| {
856                                                    (
857                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
858                                                        error_slot,
859                                                    )
860                                                })?
861                                            }
862                                            #[cfg(not(feature = "verify-transaction-signatures"))]
863                                            {
864                                                versioned_tx.message.hash()
865                                            }
866                                        };
867                                        let signature = versioned_tx
868                                            .signatures
869                                            .first()
870                                            .ok_or_else(|| {
871                                                Box::new(std::io::Error::new(
872                                                    std::io::ErrorKind::InvalidData,
873                                                    "transaction missing signature",
874                                                )) as Box<dyn std::error::Error>
875                                            })
876                                            .map_err(|err| {
877                                                (
878                                                    FirehoseError::NodeDecodingError(
879                                                        item_index,
880                                                        err,
881                                                    ),
882                                                    error_slot,
883                                                )
884                                            })?;
885                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
886
887                                        on_tx_cb(
888                                            thread_index,
889                                            TransactionData {
890                                                slot: block.slot,
891                                                transaction_slot_index: tx.index.unwrap() as usize,
892                                                signature: *signature,
893                                                message_hash,
894                                                is_vote,
895                                                transaction_status_meta: as_native_metadata.clone(),
896                                                transaction: versioned_tx.clone(),
897                                            },
898                                        )
899                                        .await
900                                        .map_err(|e| {
901                                            (
902                                                FirehoseError::TransactionHandlerError(e),
903                                                error_slot,
904                                            )
905                                        })?;
906                                    }
907                                    fetch_add_if(
908                                        tracking_enabled,
909                                        &overall_transactions_processed,
910                                        1,
911                                    );
912                                    if let Some(ref mut stats) = thread_stats {
913                                        stats.transactions_processed += 1;
914                                    }
915                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
916                                }
917                                Entry(entry) => {
918                                    let entry_hash = Hash::from(entry.hash.to_bytes());
919                                    let entry_transaction_count = entry.transactions.len();
920                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
921                                    let starting_transaction_index_u64 =
922                                        this_block_executed_transaction_count;
923                                    latest_entry_blockhash = entry_hash;
924                                    this_block_executed_transaction_count += entry_transaction_count_u64;
925                                    this_block_entry_count += 1;
926
927                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
928                                        let starting_transaction_index = usize::try_from(
929                                            starting_transaction_index_u64,
930                                        )
931                                        .map_err(|err| {
932                                            (
933                                                FirehoseError::EntryHandlerError(Box::new(err)),
934                                                error_slot,
935                                            )
936                                        })?;
937                                        let transaction_indexes_end =
938                                            starting_transaction_index + entry_transaction_count;
939                                        on_entry_cb(
940                                            thread_index,
941                                            EntryData {
942                                                slot: block.slot,
943                                                entry_index,
944                                                transaction_indexes: starting_transaction_index
945                                                    ..transaction_indexes_end,
946                                                num_hashes: entry.num_hashes,
947                                                hash: entry_hash,
948                                            },
949                                        )
950                                        .await
951                                        .map_err(|e| {
952                                            (
953                                                FirehoseError::EntryHandlerError(e),
954                                                error_slot,
955                                            )
956                                        })?;
957                                    }
958                                    entry_index += 1;
959                                    fetch_add_if(
960                                        tracking_enabled,
961                                        &overall_entries_processed,
962                                        1,
963                                    );
964                                    if let Some(ref mut stats) = thread_stats {
965                                        stats.entries_processed += 1;
966                                    }
967                                }
968                                Block(block) => {
969                                    let prev_last_counted_slot = last_counted_slot;
970                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
971                                        (
972                                            stats.slots_processed,
973                                            stats.blocks_processed,
974                                            stats.leader_skipped_slots,
975                                            stats.current_slot,
976                                        )
977                                    });
978
979                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
980                                    let skip_start_from_previous = previous_slot
981                                        .map(|s| s.saturating_add(1))
982                                        .unwrap_or(next_expected_slot);
983                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
984
985                                    for skipped_slot in skip_start..slot {
986                                        log::debug!(
987                                            target: &log_target,
988                                            "leader skipped slot {} (prev_counted {}, current slot {})",
989                                            skipped_slot,
990                                            prev_last_counted_slot,
991                                            slot,
992                                        );
993                                        if block_enabled
994                                            && let Some(on_block_cb) = on_block.as_ref() {
995                                                on_block_cb(
996                                                    thread_index,
997                                                    BlockData::LeaderSkipped {
998                                                        slot: skipped_slot,
999                                                    },
1000                                                )
1001                                                .await
1002                                                .map_err(|e| {
1003                                                    (
1004                                                        FirehoseError::BlockHandlerError(e),
1005                                                        error_slot,
1006                                                    )
1007                                                })?;
1008                                            }
1009                                        if tracking_enabled {
1010                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1011                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1012                                            if let Some(ref mut stats) = thread_stats {
1013                                                stats.leader_skipped_slots += 1;
1014                                                stats.slots_processed += 1;
1015                                                stats.current_slot = skipped_slot;
1016                                            }
1017                                        }
1018                                        last_counted_slot = skipped_slot;
1019                                    }
1020
1021                                    if slot <= last_counted_slot {
1022                                        log::debug!(
1023                                            target: &log_target,
1024                                            "duplicate block {}, already counted (last_counted={})",
1025                                            slot,
1026                                            last_counted_slot,
1027                                        );
1028                                        this_block_rewards.clear();
1029                                        continue;
1030                                    }
1031
1032                                    if block_enabled {
1033                                        if let Some(on_block_cb) = on_block.as_ref() {
1034                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1035                                            on_block_cb(
1036                                                thread_index,
1037                                                BlockData::Block {
1038                                                    parent_slot: block.meta.parent_slot,
1039                                                    parent_blockhash: previous_blockhash,
1040                                                    slot: block.slot,
1041                                                    blockhash: latest_entry_blockhash,
1042                                                    rewards: KeyedRewardsAndNumPartitions {
1043                                                        keyed_rewards,
1044                                                        num_partitions: None,
1045                                                    },
1046                                                    block_time: Some(block.meta.blocktime as i64),
1047                                                    block_height: block.meta.block_height,
1048                                                    executed_transaction_count:
1049                                                        this_block_executed_transaction_count,
1050                                                    entry_count: this_block_entry_count,
1051                                                },
1052                                            )
1053                                            .await
1054                                            .map_err(|e| {
1055                                                (
1056                                                    FirehoseError::BlockHandlerError(e),
1057                                                    error_slot,
1058                                                )
1059                                            })?;
1060                                        }
1061                                    } else {
1062                                        this_block_rewards.clear();
1063                                    }
1064                                    previous_blockhash = latest_entry_blockhash;
1065
1066                                    if tracking_enabled {
1067                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1068                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1069                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1070                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1071                                        if let Some(ref mut stats) = thread_stats {
1072                                            stats.blocks_processed += 1;
1073                                            stats.slots_processed += 1;
1074                                            stats.current_slot = slot;
1075                                        }
1076
1077                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1078                                            (&stats_tracking, thread_stats.as_mut())
1079                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1080                                                && let Err(err) = maybe_emit_stats(
1081                                                    stats_tracking.as_ref(),
1082                                                    thread_index,
1083                                                    thread_stats_ref,
1084                                                    &overall_slots_processed,
1085                                                    &overall_blocks_processed,
1086                                                    &overall_transactions_processed,
1087                                                    &overall_entries_processed,
1088                                                    &transactions_since_stats,
1089                                                    &blocks_since_stats,
1090                                                    &slots_since_stats,
1091                                                    &last_pulse,
1092                                                    start_time,
1093                                                )
1094                                                .await
1095                                                {
1096                                                    blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1097                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1098                                                    overall_blocks_processed
1099                                                        .fetch_sub(1, Ordering::Relaxed);
1100                                                    overall_slots_processed
1101                                                        .fetch_sub(1, Ordering::Relaxed);
1102                                                    if let Some((
1103                                                        prev_slots_processed,
1104                                                        prev_blocks_processed,
1105                                                        prev_leader_skipped,
1106                                                        prev_current_slot,
1107                                                    )) = thread_stats_snapshot
1108                                                    {
1109                                                        thread_stats_ref.slots_processed =
1110                                                            prev_slots_processed;
1111                                                        thread_stats_ref.blocks_processed =
1112                                                            prev_blocks_processed;
1113                                                        thread_stats_ref.leader_skipped_slots =
1114                                                            prev_leader_skipped;
1115                                                        thread_stats_ref.current_slot =
1116                                                            prev_current_slot;
1117                                                    }
1118                                                    last_counted_slot = prev_last_counted_slot;
1119                                                    return Err(err);
1120                                                }
1121                                    }
1122
1123                                    last_counted_slot = slot;
1124                                }
1125                                Subset(_subset) => (),
1126                                Epoch(_epoch) => (),
1127                                Rewards(rewards) => {
1128                                    if reward_enabled || block_enabled {
1129                                        let reassembled = nodes
1130                                            .reassemble_dataframes(rewards.data.clone())
1131                                            .map_err(|err| {
1132                                                (
1133                                                    FirehoseError::NodeDecodingError(item_index, err),
1134                                                    current_slot.unwrap_or(slot_range.start),
1135                                                )
1136                                            })?;
1137                                        if reassembled.is_empty() {
1138                                            this_block_rewards.clear();
1139                                            if reward_enabled
1140                                                && let Some(on_reward_cb) = on_reward.as_ref()
1141                                            {
1142                                                on_reward_cb(
1143                                                    thread_index,
1144                                                    RewardsData {
1145                                                        slot: block.slot,
1146                                                        rewards: Vec::new(),
1147                                                    },
1148                                                )
1149                                                .await
1150                                                .map_err(|e| {
1151                                                    (
1152                                                        FirehoseError::RewardHandlerError(e),
1153                                                        error_slot,
1154                                                    )
1155                                                })?;
1156                                            }
1157                                            continue;
1158                                        }
1159
1160                                        let decompressed = utils::decompress_zstd(reassembled)
1161                                            .map_err(|err| {
1162                                                (
1163                                                    FirehoseError::NodeDecodingError(
1164                                                        item_index,
1165                                                        err,
1166                                                    ),
1167                                                    error_slot,
1168                                                )
1169                                            })?;
1170
1171                                        let decoded =
1172                                            prost_011::Message::decode(decompressed.as_slice())
1173                                                .map_err(|err| {
1174                                                    (
1175                                                        FirehoseError::NodeDecodingError(
1176                                                            item_index,
1177                                                            Box::new(err),
1178                                                        ),
1179                                                        error_slot,
1180                                                    )
1181                                                })?;
1182                                        let keyed_rewards = convert_proto_rewards(&decoded)
1183                                            .map_err(|err| {
1184                                                (
1185                                                    FirehoseError::NodeDecodingError(item_index, err),
1186                                                    error_slot,
1187                                                )
1188                                            })?;
1189                                        if reward_enabled
1190                                            && let Some(on_reward_cb) = on_reward.as_ref()
1191                                        {
1192                                            on_reward_cb(
1193                                                thread_index,
1194                                                RewardsData {
1195                                                    slot: block.slot,
1196                                                    rewards: keyed_rewards.clone(),
1197                                                },
1198                                            )
1199                                            .await
1200                                            .map_err(|e| {
1201                                                (
1202                                                    FirehoseError::RewardHandlerError(e),
1203                                                    error_slot,
1204                                                )
1205                                            })?;
1206                                        }
1207                                        this_block_rewards = keyed_rewards;
1208                                        if let Some(ref mut stats) = thread_stats {
1209                                            stats.rewards_processed +=
1210                                                this_block_rewards.len() as u64;
1211                                        }
1212                                    }
1213                                }
1214                                DataFrame(_data_frame) => (),
1215                            }
1216                        }
1217                        if block.slot == slot_range.end - 1 {
1218                            let finish_time = std::time::Instant::now();
1219                            let elapsed = finish_time.duration_since(start_time);
1220                            log::info!(target: &log_target, "processed slot {}", block.slot);
1221                            let elapsed_pretty = human_readable_duration(elapsed);
1222                            log::info!(
1223                                target: &log_target,
1224                                "processed {} slots across {} epochs in {}.",
1225                                slot_range.end - slot_range.start,
1226                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1227                                elapsed_pretty
1228                            );
1229                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1230                            // On completion, report threads with non-zero error counts for
1231                            // visibility.
1232                            let summary: String = error_counts
1233                                .iter()
1234                                .enumerate()
1235                                .filter_map(|(i, c)| {
1236                                    let v = c.load(Ordering::Relaxed);
1237                                    if v > 0 {
1238                                        Some(format!("{:03}({})", i, v))
1239                                    } else {
1240                                        None
1241                                    }
1242                                })
1243                                .collect::<Vec<_>>()
1244                                .join(", ");
1245                            if !summary.is_empty() {
1246                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1247                            }
1248                            return Ok(());
1249                        }
1250                    }
1251                    if tracking_enabled
1252                        && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1253                            && last_counted_slot < expected_last_slot {
1254                                let flush_start = last_counted_slot.saturating_add(1);
1255                                if block_enabled
1256                                    && let Some(on_block_cb) = on_block.as_ref() {
1257                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1258                                        for skipped_slot in flush_start..=expected_last_slot {
1259                                            log::debug!(
1260                                                target: &log_target,
1261                                                "leader skipped slot {} during final flush (prev_counted {})",
1262                                                skipped_slot,
1263                                                last_counted_slot,
1264                                            );
1265                                            on_block_cb(
1266                                                thread_index,
1267                                                BlockData::LeaderSkipped { slot: skipped_slot },
1268                                            )
1269                                            .await
1270                                            .map_err(|e| FirehoseError::BlockHandlerError(e))
1271                                            .map_err(|e| (e, error_slot))?;
1272                                        }
1273                                    }
1274                                let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1275                                if let Some(stats_ref) = thread_stats.as_mut() {
1276                                    stats_ref.leader_skipped_slots += missing_slots;
1277                                    stats_ref.slots_processed += missing_slots;
1278                                    stats_ref.current_slot = expected_last_slot;
1279                                }
1280                                overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1281                                slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1282                                last_counted_slot = expected_last_slot;
1283                            }
1284                    if let Some(ref mut stats) = thread_stats {
1285                        stats.finish_time = Some(std::time::Instant::now());
1286                        maybe_emit_stats(
1287                            stats_tracking.as_ref(),
1288                            thread_index,
1289                            stats,
1290                            &overall_slots_processed,
1291                            &overall_blocks_processed,
1292                            &overall_transactions_processed,
1293                            &overall_entries_processed,
1294                            &transactions_since_stats,
1295                            &blocks_since_stats,
1296                            &slots_since_stats,
1297                            &last_pulse,
1298                            start_time,
1299                        )
1300                        .await?;
1301                    }
1302                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1303                    }
1304                    Ok(())
1305            }
1306            .await
1307            {
1308                if is_shutdown_error(&err) {
1309                    log::info!(
1310                        target: &log_target,
1311                        "shutdown requested; terminating firehose thread {}",
1312                        thread_index
1313                    );
1314                    break;
1315                }
1316                log::error!(
1317                    target: &log_target,
1318                    "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1319                    slot,
1320                    slot_to_epoch(slot)
1321                );
1322                log::error!(target: &log_target, "{}", err);
1323                let item_index = match err {
1324                    FirehoseError::NodeDecodingError(item_index, _) => item_index,
1325                    _ => 0,
1326                };
1327                // Increment this thread's error counter
1328                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1329                log::warn!(
1330                    target: &log_target,
1331                    "restarting from slot {} at index {}",
1332                    slot,
1333                    item_index,
1334                );
1335                // Update slot range to resume from the failed slot, not the original start
1336                slot_range.start = slot;
1337                skip_until_index = Some(item_index);
1338            }
1339        });
1340        handles.push(handle);
1341    }
1342
1343    // Wait for all threads to complete
1344    for handle in handles {
1345        handle.await.unwrap();
1346    }
1347    if stats_tracking.is_some() {
1348        let elapsed = firehose_start.elapsed();
1349        let elapsed_secs = elapsed.as_secs_f64();
1350        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1351        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1352        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1353        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1354        let total_errors: u64 = error_counts
1355            .iter()
1356            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1357            .sum();
1358        let overall_tps = if elapsed_secs > 0.0 {
1359            total_transactions as f64 / elapsed_secs
1360        } else {
1361            0.0
1362        };
1363        log::info!(
1364            target: LOG_MODULE,
1365            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1366            elapsed_secs,
1367            total_slots,
1368            total_blocks,
1369            total_leader_skipped,
1370            total_transactions,
1371            overall_tps,
1372            total_errors
1373        );
1374    }
1375    if shutdown_flag.load(Ordering::SeqCst) {
1376        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1377    } else {
1378        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1379    }
1380    Ok(())
1381}
1382
1383#[allow(clippy::result_large_err)]
1384/// Builds a Geyser-backed firehose and returns a slot notification stream.
1385///
1386/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1387/// rather than relying solely on remote streams.
1388pub fn firehose_geyser(
1389    rt: Arc<tokio::runtime::Runtime>,
1390    slot_range: Range<u64>,
1391    geyser_config_files: Option<&[PathBuf]>,
1392    index_base_url: &Url,
1393    client: &Client,
1394    on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1395    + Send
1396    + 'static,
1397    threads: u64,
1398) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1399    if threads == 0 {
1400        return Err((
1401            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1402            slot_range.start,
1403        ));
1404    }
1405    log::info!(target: LOG_MODULE, "starting firehose...");
1406    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1407    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1408    let mut entry_notifier_maybe = None;
1409    let mut block_meta_notifier_maybe = None;
1410    let mut transaction_notifier_maybe = None;
1411    if let Some(geyser_config_files) = geyser_config_files {
1412        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1413
1414        let service =
1415            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1416                confirmed_bank_receiver.clone(),
1417                true,
1418                geyser_config_files,
1419            )
1420            .map_err(|e| (e.into(), slot_range.start))?;
1421
1422        transaction_notifier_maybe = Some(
1423            service
1424                .get_transaction_notifier()
1425                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1426                .map_err(|e| (e, slot_range.start))?,
1427        );
1428
1429        entry_notifier_maybe = service.get_entry_notifier();
1430        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1431
1432        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1433    }
1434
1435    if entry_notifier_maybe.is_some() {
1436        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1437    } else {
1438        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1439    }
1440    log::info!(target: LOG_MODULE, "running on_load...");
1441    rt.spawn(on_load);
1442
1443    let slot_range = Arc::new(slot_range);
1444    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1445    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1446    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1447    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1448
1449    // divide slot_range into n subranges
1450    let subranges = generate_subranges(&slot_range, threads);
1451    if threads > 1 {
1452        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1453    }
1454
1455    let mut handles = Vec::new();
1456    // Shared per-thread error counters
1457    let error_counts: Arc<Vec<AtomicU32>> =
1458        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1459
1460    for (i, slot_range) in subranges.into_iter().enumerate() {
1461        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1462        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1463        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1464        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1465        let client = client.clone();
1466        let error_counts = error_counts.clone();
1467
1468        let rt_clone = rt.clone();
1469
1470        let handle = std::thread::spawn(move || {
1471            rt_clone.block_on(async {
1472                firehose_geyser_thread(
1473                    slot_range,
1474                    transaction_notifier_maybe,
1475                    entry_notifier_maybe,
1476                    block_meta_notifier_maybe,
1477                    confirmed_bank_sender,
1478                    &client,
1479                    if threads > 1 { Some(i) } else { None },
1480                    error_counts,
1481                )
1482                .await
1483                .unwrap();
1484            });
1485        });
1486        handles.push(handle);
1487    }
1488
1489    // Wait for all threads to complete
1490    for handle in handles {
1491        handle.join().unwrap();
1492    }
1493    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1494    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1495        block_meta_notifier.notify_block_metadata(
1496            u64::MAX,
1497            "unload",
1498            u64::MAX,
1499            "unload",
1500            &KeyedRewardsAndNumPartitions {
1501                keyed_rewards: vec![],
1502                num_partitions: None,
1503            },
1504            None,
1505            None,
1506            0,
1507            0,
1508        );
1509    }
1510    Ok(confirmed_bank_receiver)
1511}
1512
1513#[allow(clippy::too_many_arguments)]
1514#[allow(clippy::result_large_err)]
1515async fn firehose_geyser_thread(
1516    mut slot_range: Range<u64>,
1517    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1518    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1519    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1520    confirmed_bank_sender: Sender<SlotNotification>,
1521    client: &Client,
1522    thread_index: Option<usize>,
1523    error_counts: Arc<Vec<AtomicU32>>,
1524) -> Result<(), (FirehoseError, u64)> {
1525    let start_time = std::time::Instant::now();
1526    let log_target = if let Some(thread_index) = thread_index {
1527        format!("{}::T{:03}", LOG_MODULE, thread_index)
1528    } else {
1529        LOG_MODULE.to_string()
1530    };
1531    let mut skip_until_index = None;
1532    // let mut triggered = false;
1533    while let Err((err, slot)) = async {
1534            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1535            log::info!(
1536                target: &log_target,
1537                "slot range: {} (epoch {}) ... {} (epoch {})",
1538                slot_range.start,
1539                slot_to_epoch(slot_range.start),
1540                slot_range.end,
1541                slot_to_epoch(slot_range.end)
1542            );
1543
1544            log::info!(target: &log_target, "🚒 starting firehose...");
1545
1546            // for each epoch
1547            let mut current_slot: Option<u64> = None;
1548            let mut previous_slot: Option<u64> = None;
1549            for epoch_num in epoch_range.clone() {
1550                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1551                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1552                    Ok(stream) => stream,
1553                    Err(_) => {
1554                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1555                    }
1556                };
1557                let mut reader = NodeReader::new(stream);
1558
1559                let header_fut = reader.read_raw_header();
1560                let header = match timeout(OP_TIMEOUT, header_fut).await {
1561                    Ok(res) => res
1562                        .map_err(FirehoseError::ReadHeader)
1563                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1564                    Err(_) => {
1565                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1566                    }
1567                };
1568                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1569
1570                let mut todo_previous_blockhash = Hash::default();
1571                let mut todo_latest_entry_blockhash = Hash::default();
1572
1573                if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1574                    // Seek to the slot immediately preceding the requested range so the reader
1575                    // captures the full node set (transactions, entries, rewards) for the target
1576                    // block on the next iteration.
1577                    let seek_slot = slot_range.start.saturating_sub(1);
1578                    let seek_fut = reader.seek_to_slot(seek_slot);
1579                    match timeout(OP_TIMEOUT, seek_fut).await {
1580                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1581                        Err(_) => {
1582                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1583                        }
1584                    }
1585                }
1586
1587                // for each item in each block
1588                let mut item_index = 0;
1589                let mut displayed_skip_message = false;
1590                loop {
1591                    let read_fut = reader.read_until_block();
1592                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1593                        Ok(result) => result
1594                            .map_err(FirehoseError::ReadUntilBlockError)
1595                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1596                        Err(_) => {
1597                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1598                            return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1599                        }
1600                    };
1601                    if nodes.is_empty() {
1602                        log::info!(
1603                            target: &log_target,
1604                            "reached end of epoch {}",
1605                            epoch_num
1606                        );
1607                        break;
1608                    }
1609                    // ignore epoch and subset nodes at end of car file loop { if
1610                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1611                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1612                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1613                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1614                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1615                    //         log::info!(target: &log_target, "reached end of epoch {}",
1616                    //             epoch_num); break; }
1617                    if let Some(last_node) = nodes.0.last()
1618                        && !last_node.get_node().is_block() {
1619                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1620                            break;
1621                        }
1622                    let block = nodes
1623                        .get_block()
1624                        .map_err(FirehoseError::GetBlockError)
1625                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1626                    log::debug!(
1627                        target: &log_target,
1628                        "read {} items from epoch {}, now at slot {}",
1629                        item_index,
1630                        epoch_num,
1631                        block.slot
1632                    );
1633                    let slot = block.slot;
1634                    if slot >= slot_range.end {
1635                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1636                        // Return early to terminate the firehose thread cleanly. We use >=
1637                        // because slot_range is half-open [start, end), so any slot equal to
1638                        // end is out-of-range and must not be processed.
1639                        return Ok(());
1640                    }
1641                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1642                    if slot < slot_range.start {
1643                        if slot.saturating_add(1) == slot_range.start {
1644                            log::debug!(
1645                                target: &log_target,
1646                                "priming reader with preceding slot {}, skipping",
1647                                slot
1648                            );
1649                        } else {
1650                            log::warn!(
1651                                target: &log_target,
1652                                "encountered slot {} before start of range {}, skipping",
1653                                slot,
1654                                slot_range.start
1655                            );
1656                        }
1657                        continue;
1658                    }
1659                    if let Some(previous_slot) = previous_slot
1660                        && slot != previous_slot + 1 {
1661                            // log::warn!(target: &log_target, "non-consecutive slots: {}
1662                            // followed by {}", previous_slot, slot);
1663                        }
1664                    previous_slot = current_slot;
1665                    current_slot = Some(slot);
1666                    let mut entry_index: usize = 0;
1667                    let mut this_block_executed_transaction_count: u64 = 0;
1668                    let mut this_block_entry_count: u64 = 0;
1669                    let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1670
1671                    nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1672                        item_index += 1;
1673                        // if item_index == 100000 && !triggered { log::info!("simulating
1674                        //     error"); triggered = true; return
1675                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1676                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1677                        //         "simulated error", )), ))); }
1678                        if let Some(skip) = skip_until_index {
1679                            if item_index < skip {
1680                                if !displayed_skip_message {
1681                                    log::info!(
1682                                        target: &log_target,
1683                                        "skipping until index {} (at {})",
1684                                        skip,
1685                                        item_index
1686                                    );
1687                                    displayed_skip_message = true;
1688                                }
1689                                return Ok(());
1690                            } else {
1691                                log::info!(
1692                                    target: &log_target,
1693                                    "reached target index {}, resuming...",
1694                                    skip
1695                                );
1696                                skip_until_index = None;
1697                            }
1698                        }
1699                        let node = node_with_cid.get_node();
1700
1701                        use crate::node::Node::*;
1702                        match node {
1703                            Transaction(tx) => {
1704                                let versioned_tx = tx.as_parsed()?;
1705                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1706
1707                                let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1708
1709                                let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1710                                    prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1711                                        Box::new(std::io::Error::other(
1712                                            std::format!("Error decoding metadata: {:?}", err),
1713                                        ))
1714                                    })?;
1715
1716                                let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1717                                    metadata.try_into()?;
1718
1719                                let message_hash = {
1720                                    #[cfg(feature = "verify-transaction-signatures")]
1721                                    {
1722                                        versioned_tx.verify_and_hash_message()?
1723                                    }
1724                                    #[cfg(not(feature = "verify-transaction-signatures"))]
1725                                    {
1726                                        // Signature verification is optional because it is
1727                                        // extremely expensive at replay scale.
1728                                        versioned_tx.message.hash()
1729                                    }
1730                                };
1731                                let signature = versioned_tx
1732                                    .signatures
1733                                    .first()
1734                                    .ok_or_else(|| {
1735                                        Box::new(std::io::Error::new(
1736                                            std::io::ErrorKind::InvalidData,
1737                                            "transaction missing signature",
1738                                        )) as Box<dyn std::error::Error>
1739                                    })?;
1740                                let is_vote = is_simple_vote_transaction(&versioned_tx);
1741
1742                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1743                                    transaction_notifier.notify_transaction(
1744                                        block.slot,
1745                                        tx.index.unwrap() as usize,
1746                                        signature,
1747                                        &message_hash,
1748                                        is_vote,
1749                                        &as_native_metadata,
1750                                        &versioned_tx,
1751                                    );
1752                                }
1753
1754                            }
1755                            Entry(entry) => {
1756                                let entry_hash = Hash::from(entry.hash.to_bytes());
1757                                let entry_transaction_count = entry.transactions.len();
1758                                let entry_transaction_count_u64 = entry_transaction_count as u64;
1759                                let starting_transaction_index =
1760                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1761                                        Box::new(std::io::Error::other(
1762                                            "transaction index exceeds usize range",
1763                                        )) as Box<dyn std::error::Error>
1764                                    })?;
1765                                todo_latest_entry_blockhash = entry_hash;
1766                                this_block_executed_transaction_count += entry_transaction_count_u64;
1767                                this_block_entry_count += 1;
1768                                if entry_notifier_maybe.is_none() {
1769                                    return Ok(());
1770                                }
1771                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1772                                let entry_summary = solana_entry::entry::EntrySummary {
1773                                    num_hashes: entry.num_hashes,
1774                                    hash: Hash::from(entry.hash.to_bytes()),
1775                                    num_transactions: entry_transaction_count_u64,
1776                                };
1777                                entry_notifier.notify_entry(
1778                                    block.slot,
1779                                    entry_index,
1780                                    &entry_summary,
1781                                    starting_transaction_index,
1782                                );
1783                                entry_index += 1;
1784                            }
1785                            Block(block) => {
1786                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1787                                confirmed_bank_sender.send(notification).unwrap();
1788
1789                                if block_meta_notifier_maybe.is_none() {
1790                                    return Ok(());
1791                                }
1792                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
1793                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1794                                block_meta_notifier.notify_block_metadata(
1795                                    block.meta.parent_slot,
1796                                    todo_previous_blockhash.to_string().as_str(),
1797                                    block.slot,
1798                                    todo_latest_entry_blockhash.to_string().as_str(),
1799                                    &KeyedRewardsAndNumPartitions {
1800                                        keyed_rewards,
1801                                        num_partitions: None,
1802                                    },
1803                                    Some(block.meta.blocktime as i64),
1804                                    block.meta.block_height,
1805                                    this_block_executed_transaction_count,
1806                                    this_block_entry_count,
1807                                );
1808                                todo_previous_blockhash = todo_latest_entry_blockhash;
1809                                std::thread::yield_now();
1810                            }
1811                            Subset(_subset) => (),
1812                            Epoch(_epoch) => (),
1813                            Rewards(rewards) => {
1814                                if !rewards.is_complete() {
1815                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1816                                    let decompressed = utils::decompress_zstd(reassembled)?;
1817                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1818                                        Box::new(std::io::Error::other(
1819                                            std::format!("Error decoding rewards: {:?}", err),
1820                                        ))
1821                                    })?;
1822                                    this_block_rewards = convert_proto_rewards(&decoded)?;
1823                                }
1824                            }
1825                            DataFrame(_data_frame) => (),
1826                        }
1827                        Ok(())
1828                    })
1829                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1830                    if block.slot == slot_range.end - 1 {
1831                        let finish_time = std::time::Instant::now();
1832                        let elapsed = finish_time.duration_since(start_time);
1833                        log::info!(target: &log_target, "processed slot {}", block.slot);
1834                        let elapsed_pretty = human_readable_duration(elapsed);
1835                        log::info!(
1836                            target: &log_target,
1837                            "processed {} slots across {} epochs in {}.",
1838                            slot_range.end - slot_range.start,
1839                            slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1840                            elapsed_pretty
1841                        );
1842                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1843                        // On completion, report threads with non-zero error counts for
1844                        // visibility.
1845                        let summary: String = error_counts
1846                            .iter()
1847                            .enumerate()
1848                            .filter_map(|(i, c)| {
1849                                let v = c.load(Ordering::Relaxed);
1850                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1851                            })
1852                            .collect::<Vec<_>>()
1853                            .join(", ");
1854                        if !summary.is_empty() {
1855                            log::debug!(target: &log_target, "threads with errors: {}", summary);
1856                        }
1857                        return Ok(());
1858                    }
1859                }
1860            }
1861            Ok(())
1862}
1863.await
1864{
1865        if is_shutdown_error(&err) {
1866            log::info!(
1867                target: &log_target,
1868                "shutdown requested; terminating firehose thread {:?}",
1869                thread_index
1870            );
1871            return Ok(());
1872        }
1873        log::error!(
1874            target: &log_target,
1875            "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1876            slot,
1877            slot_to_epoch(slot)
1878            );
1879            log::error!(target: &log_target, "{}", err);
1880            let item_index = match err {
1881                FirehoseError::NodeDecodingError(item_index, _) => item_index,
1882                _ => 0,
1883            };
1884            // Increment this thread's error counter
1885            let idx = thread_index.unwrap_or(0);
1886            error_counts[idx].fetch_add(1, Ordering::Relaxed);
1887            log::warn!(
1888                target: &log_target,
1889                "restarting from slot {} at index {}",
1890                slot,
1891                item_index,
1892            );
1893            // Update slot range to resume from the failed slot, not the original start
1894            slot_range.start = slot;
1895            skip_until_index = Some(item_index);
1896    }
1897    Ok(())
1898}
1899
1900#[inline]
1901fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1902    if !(1..=2).contains(&versioned_tx.signatures.len()) {
1903        return false;
1904    }
1905
1906    if !matches!(
1907        versioned_tx.version(),
1908        solana_transaction::versioned::TransactionVersion::Legacy(_)
1909    ) {
1910        return false;
1911    }
1912
1913    let instructions = versioned_tx.message.instructions();
1914    if instructions.len() != 1 {
1915        return false;
1916    }
1917
1918    let program_index = instructions[0].program_id_index as usize;
1919    versioned_tx
1920        .message
1921        .static_account_keys()
1922        .get(program_index)
1923        .map(|program_id| program_id == &vote_program_id())
1924        .unwrap_or(false)
1925}
1926
1927#[inline(always)]
1928fn convert_proto_rewards(
1929    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1930) -> Result<Vec<(Address, RewardInfo)>, Box<dyn std::error::Error>> {
1931    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1932    for proto_reward in proto_rewards.rewards.iter() {
1933        let reward = RewardInfo {
1934            reward_type: match proto_reward.reward_type - 1 {
1935                0 => RewardType::Fee,
1936                1 => RewardType::Rent,
1937                2 => RewardType::Staking,
1938                3 => RewardType::Voting,
1939                typ => {
1940                    return Err(Box::new(std::io::Error::other(format!(
1941                        "unsupported reward type {}",
1942                        typ
1943                    ))));
1944                }
1945            },
1946            lamports: proto_reward.lamports,
1947            post_balance: proto_reward.post_balance,
1948            commission: proto_reward.commission.parse::<u8>().ok(),
1949        };
1950        let pubkey = proto_reward
1951            .pubkey
1952            .parse::<Address>()
1953            .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1954        keyed_rewards.push((pubkey, reward));
1955    }
1956    Ok(keyed_rewards)
1957}
1958
1959#[inline]
1960/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
1961pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1962    let total = slot_range.end - slot_range.start;
1963    let slots_per_thread = total / threads;
1964    let remainder = total % threads;
1965
1966    let ranges: Vec<Range<u64>> = (0..threads)
1967        .map(|i| {
1968            // Distribute remainder slots to the first `remainder` threads
1969            let extra_slot = if i < remainder { 1 } else { 0 };
1970            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1971            let end = start + slots_per_thread + extra_slot;
1972            start..end
1973        })
1974        .collect();
1975
1976    // Verify that ranges cover all slots exactly
1977    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1978    assert_eq!(
1979        total_covered, total,
1980        "Range generation failed: {} threads should cover {} slots but only cover {}",
1981        threads, total, total_covered
1982    );
1983
1984    // Verify no gaps between ranges
1985    for i in 1..ranges.len() {
1986        assert_eq!(
1987            ranges[i - 1].end,
1988            ranges[i].start,
1989            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1990            i - 1,
1991            ranges[i - 1].end,
1992            i,
1993            ranges[i].start
1994        );
1995    }
1996
1997    log::info!(
1998        target: LOG_MODULE,
1999        "Generated {} thread ranges covering {} slots total",
2000        threads,
2001        total_covered
2002    );
2003    ranges
2004}
2005
/// Formats `duration` as a short human-readable string.
///
/// - A zero duration renders as `"0s"`.
/// - Below one minute: whole seconds render as `"{n}s"` when the sub-second part is under a
///   millisecond; everything else renders as seconds with two decimals (e.g. `"5.25s"`).
/// - From one minute up, only the two most significant units are shown and a zero second unit
///   is omitted: `"{d}d{h}h"`, `"{h}h{m}m"`, or `"{m}m{s}s"`.
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < 60 {
        let is_clean_second_count = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_clean_second_count {
            format!("{whole_secs}s")
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / 86_400;
    let hours = whole_secs % 86_400 / 3_600;
    let minutes = whole_secs % 3_600 / 60;
    let secs = whole_secs % 60;

    // Arms are ordered from the smallest leading unit to the largest; each arm relies on the
    // earlier ones having already consumed the all-zero cases for its leading unit.
    match (days, hours, minutes, secs) {
        (0, 0, 0, s) => format!("{s}s"),
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2051
/// Test-only stats callback that logs one summary line for `thread_id`.
///
/// The line contains the counters carried by `stats`, the time elapsed since
/// `stats.start_time`, and a transactions-per-second figure derived from the two (reported as
/// `0.0` when no time has elapsed). The returned future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let seconds = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so a zero elapsed time reports 0 tps instead of infinity.
        let throughput = if seconds > 0.0 {
            stats.transactions_processed as f64 / seconds
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            seconds,
            throughput
        );
        Ok(())
    })
}
2077
2078#[tokio::test(flavor = "multi_thread")]
2079async fn test_firehose_epoch_800() {
2080    use std::sync::atomic::{AtomicU64, Ordering};
2081    solana_logger::setup_with_default("info");
2082    const THREADS: usize = 4;
2083    const NUM_SLOTS_TO_COVER: u64 = 50;
2084    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2085    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2086    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2087    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2088    let stats_tracking = StatsTracking {
2089        on_stats: log_stats_handler,
2090        tracking_interval_slots: 10,
2091    };
2092
2093    for prev in PREV_BLOCK.iter() {
2094        prev.store(0, Ordering::Relaxed);
2095    }
2096    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2097    NUM_BLOCKS.store(0, Ordering::Relaxed);
2098    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2099
2100    firehose(
2101        THREADS.try_into().unwrap(),
2102        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2103        Some(|thread_id: usize, block: BlockData| {
2104            async move {
2105                let prev =
2106                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2107                if block.was_skipped() {
2108                    log::info!(
2109                        target: LOG_MODULE,
2110                        "leader skipped block {} on thread {}",
2111                        block.slot(),
2112                        thread_id,
2113                    );
2114                } else {
2115                    /*log::info!(
2116                        target: LOG_MODULE,
2117                        "got block {} on thread {}",
2118                        block.slot(),
2119                        thread_id,
2120                    );*/
2121                }
2122
2123                if prev > 0 {
2124                    assert_eq!(prev + 1, block.slot());
2125                }
2126                if block.was_skipped() {
2127                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2128                } else {
2129                    NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2130                    if let BlockData::Block {
2131                        executed_transaction_count,
2132                        ..
2133                    } = &block
2134                    {
2135                        let executed = *executed_transaction_count;
2136                        let _ = MIN_TRANSACTIONS.fetch_update(
2137                            Ordering::Relaxed,
2138                            Ordering::Relaxed,
2139                            |current| {
2140                                if executed < current {
2141                                    Some(executed)
2142                                } else {
2143                                    None
2144                                }
2145                            },
2146                        );
2147                    }
2148                }
2149                Ok(())
2150            }
2151            .boxed()
2152        }),
2153        None::<OnTxFn>,
2154        None::<OnEntryFn>,
2155        None::<OnRewardFn>,
2156        Some(stats_tracking),
2157        None,
2158    )
2159    .await
2160    .unwrap();
2161    assert_eq!(
2162        NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2163        NUM_SLOTS_TO_COVER
2164    );
2165    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2166    let min_transactions = MIN_TRANSACTIONS.load(Ordering::Relaxed);
2167    assert!(
2168        min_transactions >= 10,
2169        "expected at least 10 transactions in every block, minimum observed {min_transactions}"
2170    );
2171}