jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
41    node_reader::NodeReader,
42    utils,
43};
44
// Timeout (30 seconds) applied to each asynchronous firehose operation (fetching the epoch
// stream, reading the header, seeking, reading the next block). Adjust here to tune stall
// detection/restart aggressiveness.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
// Epochs earlier than this were bincode-encoded in Old Faithful.
// `decode_transaction_status_meta` tries bincode first for those epochs and falls back to
// protobuf when the bincode decode fails.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53    flag: &Arc<std::sync::atomic::AtomicBool>,
54    receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56    if let Some(rx) = receiver {
57        match rx.try_recv() {
58            Ok(_) | Err(TryRecvError::Lagged(_)) => {
59                flag.store(true, Ordering::SeqCst);
60            }
61            Err(TryRecvError::Closed) => {
62                flag.store(true, Ordering::SeqCst);
63            }
64            Err(TryRecvError::Empty) => {}
65        }
66    }
67    flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72        inner
73            .downcast_ref::<io::Error>()
74            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75            .unwrap_or(false)
76    }
77
78    match err {
79        FirehoseError::BlockHandlerError(inner)
80        | FirehoseError::TransactionHandlerError(inner)
81        | FirehoseError::EntryHandlerError(inner)
82        | FirehoseError::RewardHandlerError(inner)
83        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84        _ => false,
85    }
86}
87
/// Errors that can occur while streaming the firehose.
///
/// The user-facing message for each variant is produced by the hand-written
/// [`Display`] implementation for this type rather than by `#[error(...)]` attributes.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(SharedError),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(SharedError),
    /// Failure while fetching an individual block.
    GetBlockError(SharedError),
    /// Failed to seek to, read, or decode the data node at the given index.
    NodeDecodingError(usize, SharedError),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(SharedError),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(SharedError),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(SharedError),
    /// Timeout reached while waiting for a firehose operation; the payload names the operation.
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// Entry handler returned an error.
    EntryHandlerError(SharedError),
    /// Reward handler returned an error.
    RewardHandlerError(SharedError),
    /// Block handler returned an error.
    BlockHandlerError(SharedError),
}
125
// SAFETY: NOTE(review): these impls assert thread-safety by hand instead of letting the compiler
// derive it, which suggests at least one variant payload is not automatically `Send`/`Sync`
// (presumably `GeyserPluginServiceError` — TODO confirm). The soundness argument is not visible
// in this file: verify that every payload is safe to move to and share between threads, or
// convert the offending payload into a `Send + Sync` form so these impls can be removed.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
128
129impl Display for FirehoseError {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        match self {
132            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
133            FirehoseError::ReadHeader(error) => {
134                write!(f, "Error reading header: {}", error)
135            }
136            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
137                f,
138                "Error initializing geyser plugin service: {}",
139                geyser_plugin_service_error
140            ),
141            FirehoseError::FailedToGetTransactionNotifier => write!(
142                f,
143                "Failed to get transaction notifier from GeyserPluginService"
144            ),
145            FirehoseError::ReadUntilBlockError(error) => {
146                write!(f, "Error reading until block: {}", error)
147            }
148            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
149            FirehoseError::NodeDecodingError(item_index, error) => {
150                write!(
151                    f,
152                    "Error seeking, reading data from, or decoding data for data node {}: {}",
153                    item_index, error
154                )
155            }
156            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
157                f,
158                "Error getting info from slot offset index: {}",
159                slot_offset_index_error
160            ),
161            FirehoseError::SeekToSlotError(error) => {
162                write!(f, "Error seeking to slot: {}", error)
163            }
164            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
165            FirehoseError::OnStatsHandlerError(error) => {
166                write!(f, "Stats handler error: {}", error)
167            }
168            FirehoseError::OperationTimeout(op) => {
169                write!(f, "Timeout while waiting for operation: {}", op)
170            }
171            FirehoseError::TransactionHandlerError(error) => {
172                write!(f, "Transaction handler error: {}", error)
173            }
174            FirehoseError::EntryHandlerError(error) => {
175                write!(f, "Entry handler error: {}", error)
176            }
177            FirehoseError::RewardHandlerError(error) => {
178                write!(f, "Reward handler error: {}", error)
179            }
180            FirehoseError::BlockHandlerError(error) => {
181                write!(f, "Block handler error: {}", error)
182            }
183        }
184    }
185}
186
187impl From<reqwest::Error> for FirehoseError {
188    fn from(e: reqwest::Error) -> Self {
189        FirehoseError::Reqwest(e)
190    }
191}
192
193impl From<GeyserPluginServiceError> for FirehoseError {
194    fn from(e: GeyserPluginServiceError) -> Self {
195        FirehoseError::GeyserPluginService(e)
196    }
197}
198
199impl From<SlotOffsetIndexError> for FirehoseError {
200    fn from(e: SlotOffsetIndexError) -> Self {
201        FirehoseError::SlotOffsetIndexError(e)
202    }
203}
204
/// Per-thread progress information emitted by the firehose runner.
///
/// A snapshot of this struct is embedded in every [`Stats`] value handed to the stats handler.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished; `None` while it is still running.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
    pub slot_range: Range<u64>,
    /// Original slot range assigned to the thread (half-open, never modified).
    pub initial_slot_range: Range<u64>,
    /// Latest slot processed by the thread.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
233
/// Aggregated firehose statistics covering all worker threads.
///
/// NOTE(review): as populated by `maybe_emit_stats`, the counters for slots, blocks,
/// transactions, and entries come from the shared cross-thread atomics, while `start_time`,
/// `finish_time`, `slot_range`, and `rewards_processed` are copied from the emitting thread's
/// [`ThreadStats`]. The per-field notes below call this out.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Snapshot of the statistics of the thread that emitted this update.
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began (`maybe_emit_stats` copies the emitting
    /// thread's `start_time`).
    pub start_time: std::time::Instant,
    /// Timestamp captured when processing finished, if finished (`maybe_emit_stats` copies the
    /// emitting thread's `finish_time`).
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open [start, end)); `maybe_emit_stats`
    /// copies the emitting thread's current range.
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads; `maybe_emit_stats` derives this as
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Rewards processed. NOTE(review): `maybe_emit_stats` fills this from the emitting
    /// thread's counter, not from a cross-thread aggregate — confirm which is intended.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse (`maybe_emit_stats` clamps this to at
    /// least one nanosecond).
    pub time_since_last_pulse: std::time::Duration,
}
266
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// Passing `None` for this configuration to `maybe_emit_stats` disables emission entirely.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available; it receives the index of the
    /// emitting thread and the freshly built [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Emits a stats callback when the current slot is a multiple of this interval.
    pub tracking_interval_slots: u64,
}
275
276#[inline(always)]
277#[allow(clippy::too_many_arguments)]
278async fn maybe_emit_stats<OnStats: Handler<Stats>>(
279    stats_tracking: Option<&StatsTracking<OnStats>>,
280    thread_index: usize,
281    thread_stats: &ThreadStats,
282    overall_slots_processed: &AtomicU64,
283    overall_blocks_processed: &AtomicU64,
284    overall_transactions_processed: &AtomicU64,
285    overall_entries_processed: &AtomicU64,
286    transactions_since_stats: &AtomicU64,
287    blocks_since_stats: &AtomicU64,
288    slots_since_stats: &AtomicU64,
289    last_pulse: &Arc<AtomicU64>,
290    base_instant: std::time::Instant,
291) -> Result<(), (FirehoseError, u64)> {
292    if let Some(stats_tracker) = stats_tracking {
293        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
294        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
295        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
296        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
297        let now_nanos = base_instant.elapsed().as_nanos() as u64;
298        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
299        let delta_nanos = now_nanos.saturating_sub(previous);
300        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
301        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
302        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
303        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
304
305        let stats = Stats {
306            thread_stats: thread_stats.clone(),
307            start_time: thread_stats.start_time,
308            finish_time: thread_stats.finish_time,
309            slot_range: thread_stats.slot_range.clone(),
310            slots_processed: total_slots,
311            blocks_processed: total_blocks,
312            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
313            transactions_processed: total_transactions,
314            entries_processed: total_entries,
315            rewards_processed: thread_stats.rewards_processed,
316            transactions_since_last_pulse: processed_transactions,
317            blocks_since_last_pulse: processed_blocks,
318            slots_since_last_pulse: processed_slots,
319            time_since_last_pulse,
320        };
321
322        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
323            last_pulse.store(previous, Ordering::Relaxed);
324            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
325            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
326            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
327            return Err((
328                FirehoseError::OnStatsHandlerError(e),
329                thread_stats.current_slot,
330            ));
331        }
332    }
333    Ok(())
334}
335
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
342
343fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
344    map.get(&thread_id)
345        .map(|set| set.remove(&slot).is_some())
346        .unwrap_or(false)
347}
348
349fn decode_transaction_status_meta_from_frame(
350    slot: u64,
351    reassembled_metadata: Vec<u8>,
352) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
353    if reassembled_metadata.is_empty() {
354        // Early epochs often omit metadata entirely.
355        return Ok(solana_transaction_status::TransactionStatusMeta::default());
356    }
357
358    match utils::decompress_zstd(reassembled_metadata.clone()) {
359        Ok(decompressed) => {
360            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
361                Box::new(std::io::Error::other(format!(
362                    "decode transaction metadata (slot {slot}): {err}"
363                ))) as SharedError
364            })
365        }
366        Err(decomp_err) => {
367            // If the frame was not zstd-compressed (common for very early data), try to
368            // decode the raw bytes directly before bailing.
369            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
370                Box::new(std::io::Error::other(format!(
371                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
372                ))) as SharedError
373            })
374        }
375    }
376}
377
378fn decode_transaction_status_meta(
379    slot: u64,
380    metadata_bytes: &[u8],
381) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
382    let epoch = slot_to_epoch(slot);
383    let mut bincode_err: Option<String> = None;
384    if epoch < BINCODE_EPOCH_CUTOFF {
385        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
386            metadata_bytes,
387        ) {
388            Ok(stored) => return Ok(stored.into()),
389            Err(err) => {
390                bincode_err = Some(err.to_string());
391            }
392        }
393    }
394
395    let bin_err_for_proto = bincode_err.clone();
396    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
397        prost_011::Message::decode(metadata_bytes).map_err(|err| {
398            // If we already tried bincode, surface both failures for easier debugging.
399            if let Some(ref bin_err) = bin_err_for_proto {
400                Box::new(std::io::Error::other(format!(
401                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
402                ))) as SharedError
403            } else {
404                Box::new(std::io::Error::other(format!(
405                    "protobuf decode transaction metadata: {err}"
406                ))) as SharedError
407            }
408        })?;
409
410    proto.try_into().map_err(|err| {
411        if let Some(ref bin_err) = bincode_err {
412            Box::new(std::io::Error::other(format!(
413                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
414            ))) as SharedError
415        } else {
416            Box::new(std::io::Error::other(format!(
417                "convert transaction metadata proto: {err}"
418            ))) as SharedError
419        }
420    })
421}
422
/// Unit tests for the transaction-metadata decoding helpers, covering the bincode path, the
/// protobuf path, the bincode-to-protobuf fallback, and the frame-level (zstd / raw) handling.
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Builds a small, non-default metadata value used by the protobuf round-trip tests.
    fn sample_meta() -> TransactionStatusMeta {
        let mut meta = TransactionStatusMeta::default();
        meta.fee = 42;
        meta.pre_balances = vec![1, 2];
        meta.post_balances = vec![3, 4];
        meta.log_messages = Some(vec!["hello".into()]);
        meta.pre_token_balances = Some(Vec::new());
        meta.post_token_balances = Some(Vec::new());
        meta.rewards = Some(Vec::new());
        meta.compute_units_consumed = Some(7);
        meta.cost_units = Some(9);
        meta.loaded_addresses = LoadedAddresses::default();
        meta
    }

    /// Slot 0 is expected to fall before the bincode cutoff epoch, so bincode bytes must
    /// decode directly.
    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let bytes = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    /// Protobuf bytes at the cutoff epoch must decode without a bincode attempt.
    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        // Presumably the first slot of epoch 157 (assumes 432,000 slots per epoch — confirm
        // against `slot_to_epoch`).
        let decoded = decode_transaction_status_meta(157 * 432000, &bytes).expect("decode");
        assert_eq!(decoded, meta);
    }

    /// Protobuf bytes in a pre-cutoff epoch must still decode via the fallback path.
    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
        let decoded = decode_transaction_status_meta(100 * 432000, &bytes).expect("decode");
        assert_eq!(decoded, meta);
    }

    /// An empty frame short-circuits to the default metadata value.
    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    /// A frame holding raw (not zstd-compressed) bincode bytes must decode through the
    /// raw-bytes fallback.
    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let raw_bytes = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
515
/// Firehose transaction payload passed to [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime; frames with no metadata bytes decode
    /// to the default value (see `decode_transaction_status_meta_from_frame`).
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
534
/// Block entry metadata passed to [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Half-open range of transaction indexes covered by the entry.
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
549
/// Reward data conveyed to reward [`Handler`] callbacks.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients paired with their associated reward information.
    pub rewards: Vec<(Address, RewardInfo)>,
}
558
/// Block-level data streamed to block handlers.
///
/// Unlike the other handler payloads in this file, this type derives only `Debug` (not
/// `Clone`).
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
    /// arrive out of order).
    PossibleLeaderSkipped {
        /// Slot number that either lacked a block or may still arrive later.
        slot: u64,
    },
}
590
591impl BlockData {
592    /// Returns the slot associated with this block or skipped slot.
593    #[inline(always)]
594    pub const fn slot(&self) -> u64 {
595        match self {
596            BlockData::Block { slot, .. } => *slot,
597            BlockData::PossibleLeaderSkipped { slot } => *slot,
598        }
599    }
600
601    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
602    #[inline(always)]
603    pub const fn was_skipped(&self) -> bool {
604        matches!(self, BlockData::PossibleLeaderSkipped { .. })
605    }
606
607    /// Returns the optional block time when available.
608    #[inline(always)]
609    pub const fn block_time(&self) -> Option<i64> {
610        match self {
611            BlockData::Block { block_time, .. } => *block_time,
612            BlockData::PossibleLeaderSkipped { .. } => None,
613        }
614    }
615}
616
/// Outcome of a single handler invocation: `Ok(())` on success, otherwise a [`SharedError`].
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by every handler call.
type HandlerFuture = BoxFuture<'static, HandlerResult>;

/// Asynchronous callback invoked for each firehose event of type `Data`.
///
/// The first argument is a `usize` index; `maybe_emit_stats` passes the emitting thread's
/// index (presumably the same convention applies to the other handlers — confirm at the call
/// sites). Handlers must be cloneable, thread-safe, and `'static`.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket impl: any closure or function with the matching signature and bounds is a `Handler`.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
627
/// Function pointer alias for [`Handler`] callbacks.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Convenience alias for block handlers accepted by [`firehose`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Convenience alias for transaction handlers accepted by [`firehose`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Convenience alias for entry handlers accepted by [`firehose`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Convenience alias for reward handlers accepted by [`firehose`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Type alias for [`StatsTracking`] using simple function pointers.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Convenience alias for firehose error handlers.
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Convenience alias for stats tracking handlers accepted by [`firehose`].
///
/// Expands to exactly the same type as [`StatsTracker`].
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
644
/// Metadata describing a firehose worker failure, delivered to error handlers
/// (see [`OnErrorFn`]).
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Thread index that encountered the error.
    pub thread_id: usize,
    /// Slot the worker was processing when the error surfaced.
    pub slot: u64,
    /// Epoch derived from the failing slot.
    pub epoch: u64,
    /// Stringified error payload for display/logging.
    pub error_message: String,
}
657
658/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
659///
660/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
661/// runner restarts from the last processed slot to maintain coverage.
662#[inline]
663#[allow(clippy::too_many_arguments)]
664pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
665    threads: u64,
666    slot_range: Range<u64>,
667    on_block: Option<OnBlock>,
668    on_tx: Option<OnTransaction>,
669    on_entry: Option<OnEntry>,
670    on_rewards: Option<OnRewards>,
671    on_error: Option<OnError>,
672    stats_tracking: Option<StatsTracking<OnStats>>,
673    shutdown_signal: Option<broadcast::Receiver<()>>,
674) -> Result<(), (FirehoseError, u64)>
675where
676    OnBlock: Handler<BlockData>,
677    OnTransaction: Handler<TransactionData>,
678    OnEntry: Handler<EntryData>,
679    OnRewards: Handler<RewardsData>,
680    OnStats: Handler<Stats>,
681    OnError: Handler<FirehoseErrorContext>,
682{
683    if threads == 0 {
684        return Err((
685            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
686            slot_range.start,
687        ));
688    }
689    let client = crate::network::create_http_client();
690    log::info!(target: LOG_MODULE, "starting firehose...");
691    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
692
693    let slot_range = Arc::new(slot_range);
694
695    // divide slot_range into n subranges
696    let subranges = generate_subranges(&slot_range, threads);
697    if threads > 1 {
698        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
699    }
700
701    let firehose_start = std::time::Instant::now();
702    let shutdown_flag = Arc::new(AtomicBool::new(false));
703    if let Some(ref rx) = shutdown_signal {
704        let mut rx = rx.resubscribe();
705        let flag = shutdown_flag.clone();
706        tokio::spawn(async move {
707            if rx.recv().await.is_ok() {
708                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
709                flag.store(true, Ordering::SeqCst);
710            }
711        });
712    }
713    let mut handles = Vec::new();
714    // Shared per-thread error counters
715    let error_counts: Arc<Vec<AtomicU32>> =
716        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
717
718    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
719    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
720    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
721    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
722    let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
723
724    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
725        let error_counts = error_counts.clone();
726        let client = client.clone();
727        let on_block = on_block.clone();
728        let on_tx = on_tx.clone();
729        let on_entry = on_entry.clone();
730        let on_reward = on_rewards.clone();
731        let on_error = on_error.clone();
732        let overall_slots_processed = overall_slots_processed.clone();
733        let overall_blocks_processed = overall_blocks_processed.clone();
734        let overall_transactions_processed = overall_transactions_processed.clone();
735        let overall_entries_processed = overall_entries_processed.clone();
736        let stats_tracking = stats_tracking.clone();
737        let transactions_since_stats = Arc::new(AtomicU64::new(0));
738        let blocks_since_stats = Arc::new(AtomicU64::new(0));
739        let slots_since_stats = Arc::new(AtomicU64::new(0));
740        let last_pulse = Arc::new(AtomicU64::new(0));
741        let transactions_since_stats_cloned = transactions_since_stats.clone();
742        let blocks_since_stats_cloned = blocks_since_stats.clone();
743        let slots_since_stats_cloned = slots_since_stats.clone();
744        let last_pulse_cloned = last_pulse.clone();
745        let shutdown_flag = shutdown_flag.clone();
746        let pending_skipped_slots = pending_skipped_slots.clone();
747        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
748
749        let handle = tokio::spawn(async move {
750            let transactions_since_stats = transactions_since_stats_cloned;
751            let blocks_since_stats = blocks_since_stats_cloned;
752            let slots_since_stats = slots_since_stats_cloned;
753            let last_pulse = last_pulse_cloned;
754            let mut shutdown_rx = thread_shutdown_rx;
755            let start_time = firehose_start;
756            last_pulse.store(
757                firehose_start.elapsed().as_nanos() as u64,
758                Ordering::Relaxed,
759            );
760            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
761            let mut skip_until_index = None;
762            let last_emitted_slot = slot_range.start.saturating_sub(1);
763            let block_enabled = on_block.is_some();
764            let tx_enabled = on_tx.is_some();
765            let entry_enabled = on_entry.is_some();
766            let reward_enabled = on_reward.is_some();
767            let tracking_enabled = stats_tracking.is_some();
768            if block_enabled {
769                pending_skipped_slots.entry(thread_index).or_default();
770            }
771            let mut last_counted_slot = slot_range.start.saturating_sub(1);
772            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
773            let mut thread_stats = if tracking_enabled {
774                Some(ThreadStats {
775                    thread_id: thread_index,
776                    start_time,
777                    finish_time: None,
778                    slot_range: slot_range.clone(),
779                    initial_slot_range: slot_range.clone(),
780                    current_slot: slot_range.start,
781                    slots_processed: 0,
782                    blocks_processed: 0,
783                    leader_skipped_slots: 0,
784                    transactions_processed: 0,
785                    entries_processed: 0,
786                    rewards_processed: 0,
787                })
788            } else {
789                None
790            };
791
792            // let mut triggered = false;
793            while let Err((err, slot)) = async {
794                let mut last_emitted_slot = last_emitted_slot_global;
795                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
796                    log::info!(
797                        target: &log_target,
798                        "shutdown requested; terminating firehose thread {}",
799                        thread_index
800                    );
801                    return Ok(());
802                }
803                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
804                log::info!(
805                    target: &log_target,
806                    "slot range: {} (epoch {}) ... {} (epoch {})",
807                    slot_range.start,
808                    slot_to_epoch(slot_range.start),
809                    slot_range.end,
810                    slot_to_epoch(slot_range.end)
811                );
812
813                log::info!(target: &log_target, "🚒 starting firehose...");
814
815                // for each epoch
816                let mut current_slot: Option<u64> = None;
817                for epoch_num in epoch_range.clone() {
818                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
819                        log::info!(
820                            target: &log_target,
821                            "shutdown requested; terminating firehose thread {}",
822                            thread_index
823                        );
824                        return Ok(());
825                    }
826                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
827                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
828                        Ok(stream) => stream,
829                        Err(_) => {
830                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
831                        }
832                    };
833                    let mut reader = NodeReader::new(stream);
834
835                    let header_fut = reader.read_raw_header();
836                    let header = match timeout(OP_TIMEOUT, header_fut).await {
837                        Ok(res) => res
838                            .map_err(FirehoseError::ReadHeader)
839                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
840                        Err(_) => {
841                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
842                        }
843                    };
844                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
845
846                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
847                    let local_start = std::cmp::max(slot_range.start, epoch_start);
848                    let local_end_inclusive =
849                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
850                    if local_start > local_end_inclusive {
851                        log::debug!(
852                            target: &log_target,
853                            "epoch {} has no overlap with thread range ({}..{}), skipping",
854                            epoch_num,
855                            slot_range.start,
856                            slot_range.end
857                        );
858                        continue;
859                    }
860
861                    let mut previous_blockhash = Hash::default();
862                    let mut latest_entry_blockhash = Hash::default();
863                    // Reset counters to align to the local epoch slice; prevents boundary slots
864                    // from being treated as already-counted after a restart.
865                    last_counted_slot = local_start.saturating_sub(1);
866                    current_slot = None;
867                    if tracking_enabled
868                        && let Some(ref mut stats) = thread_stats {
869                            stats.current_slot = local_start;
870                            stats.slot_range.start = local_start;
871                        }
872
873                    if local_start > epoch_start {
874                        // Seek to the previous slot so the stream includes all nodes
875                        // (transactions, entries, rewards) that precede the block payload for
876                        // `local_start`.
877                        let seek_slot = local_start.saturating_sub(1);
878                        let seek_fut = reader.seek_to_slot(seek_slot);
879                        match timeout(OP_TIMEOUT, seek_fut).await {
880                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
881                            Err(_) => {
882                                return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
883                            }
884                        }
885                    }
886
887                    // for each item in each block
888                    let mut item_index = 0;
889                    let mut displayed_skip_message = false;
890                    loop {
891                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
892                            log::info!(
893                                target: &log_target,
894                                "shutdown requested; terminating firehose thread {}",
895                                thread_index
896                            );
897                            return Ok(());
898                        }
899                        let read_fut = reader.read_until_block();
900                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
901                            Ok(result) => result
902                                .map_err(FirehoseError::ReadUntilBlockError)
903                                .map_err(|e| {
904                                    (
905                                        e,
906                                        current_slot
907                                            .map(|slot| slot.saturating_add(1))
908                                            .unwrap_or(slot_range.start),
909                                    )
910                                })?,
911                            Err(_) => {
912                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
913                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
914                            }
915                        };
916                        if nodes.is_empty() {
917                            log::info!(
918                                target: &log_target,
919                                "reached end of epoch {}",
920                                epoch_num
921                            );
922                            break;
923                        }
924                        if let Some(last_node) = nodes.0.last()
925                            && !last_node.get_node().is_block()
926                        {
927                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
928                            break;
929                        }
930                        let block = nodes
931                            .get_block()
932                            .map_err(FirehoseError::GetBlockError)
933                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
934                        log::debug!(
935                            target: &log_target,
936                            "read {} items from epoch {}, now at slot {}",
937                            item_index,
938                            epoch_num,
939                            block.slot
940                        );
941                        let slot = block.slot;
942                        if slot > local_end_inclusive {
943                            log::debug!(
944                                target: &log_target,
945                                "reached end of local slice at slot {} (epoch {}), stopping",
946                                slot,
947                                epoch_num
948                            );
949                            break;
950                        }
951                        if slot >= slot_range.end {
952                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
953                            // Return early to terminate the firehose thread cleanly. We use >=
954                            // because slot_range is half-open [start, end), so any slot equal
955                            // to end is out-of-range and must not be processed. Do not emit
956                            // synthetic skipped slots here; another thread may own the boundary.
957                            if block_enabled {
958                                pending_skipped_slots.remove(&thread_index);
959                            }
960                            return Ok(());
961                        }
962                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
963                        if slot < slot_range.start {
964                            if slot.saturating_add(1) == slot_range.start {
965                                log::debug!(
966                                    target: &log_target,
967                                    "priming reader with preceding slot {}, skipping",
968                                    slot
969                                );
970                            } else {
971                                log::warn!(
972                                    target: &log_target,
973                                    "encountered slot {} before start of range {}, skipping",
974                                    slot,
975                                    slot_range.start
976                                );
977                            }
978                            continue;
979                        }
980                        current_slot = Some(slot);
981                        let mut entry_index: usize = 0;
982                        let mut this_block_executed_transaction_count: u64 = 0;
983                        let mut this_block_entry_count: u64 = 0;
984                        let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
985
986                        for node_with_cid in &nodes.0 {
987                            item_index += 1;
988                            if let Some(skip) = skip_until_index {
989                                if item_index < skip {
990                                    if !displayed_skip_message {
991                                        log::info!(
992                                            target: &log_target,
993                                            "skipping until index {} (at {})",
994                                            skip,
995                                            item_index
996                                        );
997                                        displayed_skip_message = true;
998                                    }
999                                    continue;
1000                                } else {
1001                                    log::info!(
1002                                        target: &log_target,
1003                                        "reached target index {}, resuming...",
1004                                        skip
1005                                    );
1006                                    skip_until_index = None;
1007                                }
1008                            }
1009                            let node = node_with_cid.get_node();
1010
1011                            if let Some(ref mut stats) = thread_stats {
1012                                stats.current_slot = slot;
1013                            }
1014
1015                            let error_slot = current_slot.unwrap_or(slot_range.start);
1016
1017                            use crate::node::Node::*;
1018                            match node {
1019                                Transaction(tx) => {
1020                                    if tx_enabled
1021                                        && let Some(on_tx_cb) = on_tx.as_ref()
1022                                    {
1023                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1024                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1025                                            (
1026                                                FirehoseError::NodeDecodingError(item_index, err),
1027                                                error_slot,
1028                                            )
1029                                        })?;
1030                                        let reassembled_metadata = nodes
1031                                            .reassemble_dataframes(tx.metadata.clone())
1032                                            .map_err(|err| {
1033                                                (
1034                                                    FirehoseError::NodeDecodingError(item_index, err),
1035                                                    error_slot,
1036                                                )
1037                                            })?;
1038
1039                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1040                                            block.slot,
1041                                            reassembled_metadata,
1042                                        )
1043                                        .map_err(|err| {
1044                                            (
1045                                                FirehoseError::NodeDecodingError(item_index, err),
1046                                                error_slot,
1047                                            )
1048                                        })?;
1049
1050                                        let message_hash = {
1051                                            #[cfg(feature = "verify-transaction-signatures")]
1052                                            {
1053                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1054                                                    (
1055                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1056                                                        error_slot,
1057                                                    )
1058                                                })?
1059                                            }
1060                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1061                                            {
1062                                                versioned_tx.message.hash()
1063                                            }
1064                                        };
1065                                        let signature = versioned_tx
1066                                            .signatures
1067                                            .first()
1068                                            .ok_or_else(|| {
1069                                                Box::new(std::io::Error::new(
1070                                                    std::io::ErrorKind::InvalidData,
1071                                                    "transaction missing signature",
1072                                                )) as SharedError
1073                                            })
1074                                            .map_err(|err| {
1075                                                (
1076                                                    FirehoseError::NodeDecodingError(
1077                                                        item_index,
1078                                                        err,
1079                                                    ),
1080                                                    error_slot,
1081                                                )
1082                                            })?;
1083                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1084
1085                                        on_tx_cb(
1086                                            thread_index,
1087                                            TransactionData {
1088                                                slot: block.slot,
1089                                                transaction_slot_index: tx.index.unwrap() as usize,
1090                                                signature: *signature,
1091                                                message_hash,
1092                                                is_vote,
1093                                                transaction_status_meta: as_native_metadata.clone(),
1094                                                transaction: versioned_tx.clone(),
1095                                            },
1096                                        )
1097                                        .await
1098                                        .map_err(|e| {
1099                                            (
1100                                                FirehoseError::TransactionHandlerError(e),
1101                                                error_slot,
1102                                            )
1103                                        })?;
1104                                    }
1105                                    fetch_add_if(
1106                                        tracking_enabled,
1107                                        &overall_transactions_processed,
1108                                        1,
1109                                    );
1110                                    if let Some(ref mut stats) = thread_stats {
1111                                        stats.transactions_processed += 1;
1112                                    }
1113                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1114                                }
1115                                Entry(entry) => {
1116                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1117                                    let entry_transaction_count = entry.transactions.len();
1118                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1119                                    let starting_transaction_index_u64 =
1120                                        this_block_executed_transaction_count;
1121                                    latest_entry_blockhash = entry_hash;
1122                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1123                                    this_block_entry_count += 1;
1124
1125                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1126                                        let starting_transaction_index = usize::try_from(
1127                                            starting_transaction_index_u64,
1128                                        )
1129                                        .map_err(|err| {
1130                                            (
1131                                                FirehoseError::EntryHandlerError(Box::new(err)),
1132                                                error_slot,
1133                                            )
1134                                        })?;
1135                                        let transaction_indexes_end =
1136                                            starting_transaction_index + entry_transaction_count;
1137                                        on_entry_cb(
1138                                            thread_index,
1139                                            EntryData {
1140                                                slot: block.slot,
1141                                                entry_index,
1142                                                transaction_indexes: starting_transaction_index
1143                                                    ..transaction_indexes_end,
1144                                                num_hashes: entry.num_hashes,
1145                                                hash: entry_hash,
1146                                            },
1147                                        )
1148                                        .await
1149                                        .map_err(|e| {
1150                                            (
1151                                                FirehoseError::EntryHandlerError(e),
1152                                                error_slot,
1153                                            )
1154                                        })?;
1155                                    }
1156                                    entry_index += 1;
1157                                    fetch_add_if(
1158                                        tracking_enabled,
1159                                        &overall_entries_processed,
1160                                        1,
1161                                    );
1162                                    if let Some(ref mut stats) = thread_stats {
1163                                        stats.entries_processed += 1;
1164                                    }
1165                                }
1166                                Block(block) => {
1167                                    let prev_last_counted_slot = last_counted_slot;
1168                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1169                                        (
1170                                            stats.slots_processed,
1171                                            stats.blocks_processed,
1172                                            stats.leader_skipped_slots,
1173                                            stats.current_slot,
1174                                        )
1175                                    });
1176
1177                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1178                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1179                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1180
1181                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1182                                    for skipped_slot in skip_start..slot {
1183                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1184                                            break;
1185                                        }
1186                                        log::debug!(
1187                                            target: &log_target,
1188                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1189                                            skipped_slot,
1190                                            prev_last_counted_slot,
1191                                            slot,
1192                                        );
1193                                        if block_enabled {
1194                                            pending_skipped_slots
1195                                                .entry(thread_index)
1196                                                .or_default()
1197                                                .insert(skipped_slot);
1198                                        }
1199                                        if block_enabled
1200                                            && let Some(on_block_cb) = on_block.as_ref()
1201                                            && skipped_slot > last_emitted_slot {
1202                                                last_emitted_slot = skipped_slot;
1203                                                on_block_cb(
1204                                                    thread_index,
1205                                                    BlockData::PossibleLeaderSkipped {
1206                                                        slot: skipped_slot,
1207                                                    },
1208                                                )
1209                                                .await
1210                                                .map_err(|e| {
1211                                                    (
1212                                                        FirehoseError::BlockHandlerError(e),
1213                                                        error_slot,
1214                                                    )
1215                                                })?;
1216                                            }
1217                                        if tracking_enabled {
1218                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1219                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1220                                            if let Some(ref mut stats) = thread_stats {
1221                                                stats.leader_skipped_slots += 1;
1222                                                stats.slots_processed += 1;
1223                                                stats.current_slot = skipped_slot;
1224                                            }
1225                                        }
1226                                        last_counted_slot = skipped_slot;
1227                                    }
1228
1229                                    let cleared_pending_skip = if block_enabled {
1230                                        clear_pending_skip(
1231                                            &pending_skipped_slots,
1232                                            thread_index,
1233                                            slot,
1234                                        )
1235                                    } else {
1236                                        false
1237                                    };
1238
1239                                    if slot <= last_counted_slot && !cleared_pending_skip {
1240                                        log::debug!(
1241                                            target: &log_target,
1242                                            "duplicate block {}, already counted (last_counted={})",
1243                                            slot,
1244                                            last_counted_slot,
1245                                        );
1246                                        this_block_rewards.clear();
1247                                        continue;
1248                                    }
1249
1250                                    if block_enabled {
1251                                        if let Some(on_block_cb) = on_block.as_ref() {
1252                                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
1253                                            if slot > last_emitted_slot {
1254                                                last_emitted_slot = slot;
1255                                                on_block_cb(
1256                                                    thread_index,
1257                                                    BlockData::Block {
1258                                                        parent_slot: block.meta.parent_slot,
1259                                                        parent_blockhash: previous_blockhash,
1260                                                        slot: block.slot,
1261                                                        blockhash: latest_entry_blockhash,
1262                                                        rewards: KeyedRewardsAndNumPartitions {
1263                                                            keyed_rewards,
1264                                                            num_partitions: None,
1265                                                        },
1266                                                        block_time: Some(block.meta.blocktime as i64),
1267                                                        block_height: block.meta.block_height,
1268                                                        executed_transaction_count:
1269                                                            this_block_executed_transaction_count,
1270                                                        entry_count: this_block_entry_count,
1271                                                    },
1272                                                )
1273                                                .await
1274                                                .map_err(|e| {
1275                                                    (
1276                                                        FirehoseError::BlockHandlerError(e),
1277                                                        error_slot,
1278                                                    )
1279                                                })?;
1280                                            }
1281                                        }
1282                                    } else {
1283                                        this_block_rewards.clear();
1284                                    }
1285                                    previous_blockhash = latest_entry_blockhash;
1286
1287                                    if tracking_enabled {
1288                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1289                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1290                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1291                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1292                                        if let Some(ref mut stats) = thread_stats {
1293                                            stats.blocks_processed += 1;
1294                                            stats.slots_processed += 1;
1295                                            stats.current_slot = slot;
1296                                        }
1297
1298                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1299                                            (&stats_tracking, thread_stats.as_mut())
1300                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1301                                                && let Err(err) = maybe_emit_stats(
1302                                                    stats_tracking.as_ref(),
1303                                                    thread_index,
1304                                                    thread_stats_ref,
1305                                                    &overall_slots_processed,
1306                                                    &overall_blocks_processed,
1307                                                    &overall_transactions_processed,
1308                                                    &overall_entries_processed,
1309                                                &transactions_since_stats,
1310                                                &blocks_since_stats,
1311                                                &slots_since_stats,
1312                                                &last_pulse,
1313                                                start_time,
1314                                            )
1315                                            .await
1316                                            {
1317                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1318                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1319                                                    overall_blocks_processed
1320                                                        .fetch_sub(1, Ordering::Relaxed);
1321                                                    overall_slots_processed
1322                                                        .fetch_sub(1, Ordering::Relaxed);
1323                                                    if let Some((
1324                                                        prev_slots_processed,
1325                                                        prev_blocks_processed,
1326                                                        prev_leader_skipped,
1327                                                        prev_current_slot,
1328                                                    )) = thread_stats_snapshot
1329                                                    {
1330                                                        thread_stats_ref.slots_processed =
1331                                                            prev_slots_processed;
1332                                                        thread_stats_ref.blocks_processed =
1333                                                            prev_blocks_processed;
1334                                                        thread_stats_ref.leader_skipped_slots =
1335                                                            prev_leader_skipped;
1336                                                        thread_stats_ref.current_slot =
1337                                                            prev_current_slot;
1338                                                    }
1339                                                    last_counted_slot = prev_last_counted_slot;
1340                                                    return Err(err);
1341                                                }
1342                                    }
1343
1344                                    if slot > last_counted_slot {
1345                                        last_counted_slot = slot;
1346                                    }
1347                                }
1348                                Subset(_subset) => (),
1349                                Epoch(_epoch) => (),
1350                                Rewards(rewards) => {
1351                                    if reward_enabled || block_enabled {
1352                                        let reassembled = nodes
1353                                            .reassemble_dataframes(rewards.data.clone())
1354                                            .map_err(|err| {
1355                                                (
1356                                                    FirehoseError::NodeDecodingError(item_index, err),
1357                                                    current_slot.unwrap_or(slot_range.start),
1358                                                )
1359                                            })?;
1360                                        if reassembled.is_empty() {
1361                                            this_block_rewards.clear();
1362                                            if reward_enabled
1363                                                && let Some(on_reward_cb) = on_reward.as_ref()
1364                                            {
1365                                                on_reward_cb(
1366                                                    thread_index,
1367                                                    RewardsData {
1368                                                        slot: block.slot,
1369                                                        rewards: Vec::new(),
1370                                                    },
1371                                                )
1372                                                .await
1373                                                .map_err(|e| {
1374                                                    (
1375                                                        FirehoseError::RewardHandlerError(e),
1376                                                        error_slot,
1377                                                    )
1378                                                })?;
1379                                            }
1380                                            continue;
1381                                        }
1382
1383                                        let decompressed = utils::decompress_zstd(reassembled)
1384                                            .map_err(|err| {
1385                                                (
1386                                                    FirehoseError::NodeDecodingError(
1387                                                        item_index,
1388                                                        err,
1389                                                    ),
1390                                                    error_slot,
1391                                                )
1392                                            })?;
1393
1394                                        let decoded =
1395                                            prost_011::Message::decode(decompressed.as_slice())
1396                                                .map_err(|err| {
1397                                                    (
1398                                                        FirehoseError::NodeDecodingError(
1399                                                            item_index,
1400                                                            Box::new(err),
1401                                                        ),
1402                                                        error_slot,
1403                                                    )
1404                                                })?;
1405                                        let keyed_rewards = convert_proto_rewards(&decoded)
1406                                            .map_err(|err| {
1407                                                (
1408                                                    FirehoseError::NodeDecodingError(item_index, err),
1409                                                    error_slot,
1410                                                )
1411                                            })?;
1412                                        if reward_enabled
1413                                            && let Some(on_reward_cb) = on_reward.as_ref()
1414                                        {
1415                                            on_reward_cb(
1416                                                thread_index,
1417                                                RewardsData {
1418                                                    slot: block.slot,
1419                                                    rewards: keyed_rewards.clone(),
1420                                                },
1421                                            )
1422                                            .await
1423                                            .map_err(|e| {
1424                                                (
1425                                                    FirehoseError::RewardHandlerError(e),
1426                                                    error_slot,
1427                                                )
1428                                            })?;
1429                                        }
1430                                        this_block_rewards = keyed_rewards;
1431                                        if let Some(ref mut stats) = thread_stats {
1432                                            stats.rewards_processed +=
1433                                                this_block_rewards.len() as u64;
1434                                        }
1435                                    }
1436                                }
1437                                DataFrame(_data_frame) => (),
1438                            }
1439                        }
1440                        if block.slot == slot_range.end - 1 {
1441                            let finish_time = std::time::Instant::now();
1442                            let elapsed = finish_time.duration_since(start_time);
1443                            log::info!(target: &log_target, "processed slot {}", block.slot);
1444                            let elapsed_pretty = human_readable_duration(elapsed);
1445                            log::info!(
1446                                target: &log_target,
1447                                "processed {} slots across {} epochs in {}.",
1448                                slot_range.end - slot_range.start,
1449                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1450                                elapsed_pretty
1451                            );
1452                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1453                            // On completion, report threads with non-zero error counts for
1454                            // visibility.
1455                            let summary: String = error_counts
1456                                .iter()
1457                                .enumerate()
1458                                .filter_map(|(i, c)| {
1459                                    let v = c.load(Ordering::Relaxed);
1460                                    if v > 0 {
1461                                        Some(format!("{:03}({})", i, v))
1462                                    } else {
1463                                        None
1464                                    }
1465                                })
1466                                .collect::<Vec<_>>()
1467                                .join(", ");
1468                            if !summary.is_empty() {
1469                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1470                            }
1471                            return Ok(());
1472                        }
1473                    }
1474                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1475                        && last_counted_slot < expected_last_slot
1476                    {
1477                        // Do not synthesize skipped slots during final flush; another thread may
1478                        // cover the remaining range (especially across epoch boundaries).
1479                    }
1480                    if let Some(ref mut stats) = thread_stats {
1481                        stats.finish_time = Some(std::time::Instant::now());
1482                        maybe_emit_stats(
1483                            stats_tracking.as_ref(),
1484                            thread_index,
1485                            stats,
1486                            &overall_slots_processed,
1487                            &overall_blocks_processed,
1488                            &overall_transactions_processed,
1489                            &overall_entries_processed,
1490                            &transactions_since_stats,
1491                            &blocks_since_stats,
1492                            &slots_since_stats,
1493                            &last_pulse,
1494                            start_time,
1495                        )
1496                        .await?;
1497                    }
1498                    if block_enabled {
1499                        pending_skipped_slots.remove(&thread_index);
1500                    }
1501                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1502                    }
1503                    Ok(())
1504            }
1505            .await
1506            {
1507                if is_shutdown_error(&err) {
1508                    log::info!(
1509                        target: &log_target,
1510                        "shutdown requested; terminating firehose thread {}",
1511                        thread_index
1512                    );
1513                    break;
1514                }
1515                let epoch = slot_to_epoch(slot);
1516                let item_index = match &err {
1517                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1518                    _ => 0,
1519                };
1520                let error_message = err.to_string();
1521                log::error!(
1522                    target: &log_target,
1523                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1524                    slot,
1525                    epoch
1526                );
1527                log::error!(target: &log_target, "{}", error_message);
1528                if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1529                    || error_message.contains("Unknown CID version")
1530                {
1531                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index
1532                    // (or a bad seek offset that landed mid-stream).
1533                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1534                }
1535                if let Some(on_error_cb) = on_error.clone() {
1536                    let context = FirehoseErrorContext {
1537                        thread_id: thread_index,
1538                        slot,
1539                        epoch,
1540                        error_message: error_message.clone(),
1541                    };
1542                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1543                        log::error!(
1544                            target: &log_target,
1545                            "on_error handler failed: {}",
1546                            handler_err
1547                        );
1548                    }
1549                }
1550                // Increment this thread's error counter
1551                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1552                log::warn!(
1553                    target: &log_target,
1554                    "restarting from slot {} at index {}",
1555                    slot,
1556                    item_index,
1557                );
1558                // Update slot range to resume from the failed slot, not the original start.
1559                // Reset local tracking so we don't treat the resumed slot range as already counted.
1560                // If we've already counted this slot, resume from the next one to avoid duplicates.
1561                if slot <= last_counted_slot {
1562                    slot_range.start = last_counted_slot.saturating_add(1);
1563                } else {
1564                    slot_range.start = slot;
1565                }
1566                // Reset pulse timer to exclude downtime from next rate calc.
1567                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1568                if tracking_enabled
1569                    && let Some(ref mut stats_ref) = thread_stats {
1570                        stats_ref.slot_range.start = slot_range.start;
1571                        stats_ref.slot_range.end = slot_range.end;
1572                        // initial_slot_range remains unchanged for progress reporting.
1573                    }
1574                if block_enabled {
1575                    pending_skipped_slots.remove(&thread_index);
1576                }
1577                // `skip_until_index` is unsafe across retries because `item_index`
1578                // is reset to 0 each epoch restart. Keeping it can skip large portions
1579                // of the stream and silently drop slots.
1580                skip_until_index = None;
1581                last_emitted_slot_global = last_emitted_slot;
1582            }
1583        });
1584        handles.push(handle);
1585    }
1586
1587    // Wait for all threads to complete
1588    for handle in handles {
1589        handle.await.unwrap();
1590    }
1591    if stats_tracking.is_some() {
1592        let elapsed = firehose_start.elapsed();
1593        let elapsed_secs = elapsed.as_secs_f64();
1594        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1595        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1596        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1597        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1598        let total_errors: u64 = error_counts
1599            .iter()
1600            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1601            .sum();
1602        let overall_tps = if elapsed_secs > 0.0 {
1603            total_transactions as f64 / elapsed_secs
1604        } else {
1605            0.0
1606        };
1607        log::info!(
1608            target: LOG_MODULE,
1609            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1610            elapsed_secs,
1611            total_slots,
1612            total_blocks,
1613            total_leader_skipped,
1614            total_transactions,
1615            overall_tps,
1616            total_errors
1617        );
1618    }
1619    if shutdown_flag.load(Ordering::SeqCst) {
1620        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1621    } else {
1622        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1623    }
1624    Ok(())
1625}
1626
1627#[allow(clippy::result_large_err)]
1628/// Builds a Geyser-backed firehose and returns a slot notification stream.
1629///
1630/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1631/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1632/// half-open interval `[start, end)`, and the thread will restart from the last processed
1633/// slot on recoverable errors to maintain coverage.
1634pub fn firehose_geyser(
1635    rt: Arc<tokio::runtime::Runtime>,
1636    slot_range: Range<u64>,
1637    geyser_config_files: Option<&[PathBuf]>,
1638    index_base_url: &Url,
1639    client: &Client,
1640    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1641    threads: u64,
1642) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1643    if threads == 0 {
1644        return Err((
1645            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1646            slot_range.start,
1647        ));
1648    }
1649    log::info!(target: LOG_MODULE, "starting firehose...");
1650    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1651    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1652    let mut entry_notifier_maybe = None;
1653    let mut block_meta_notifier_maybe = None;
1654    let mut transaction_notifier_maybe = None;
1655    if let Some(geyser_config_files) = geyser_config_files {
1656        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1657
1658        let service =
1659            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1660                confirmed_bank_receiver.clone(),
1661                true,
1662                geyser_config_files,
1663            )
1664            .map_err(|e| (e.into(), slot_range.start))?;
1665
1666        transaction_notifier_maybe = Some(
1667            service
1668                .get_transaction_notifier()
1669                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1670                .map_err(|e| (e, slot_range.start))?,
1671        );
1672
1673        entry_notifier_maybe = service.get_entry_notifier();
1674        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1675
1676        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1677    }
1678
1679    if entry_notifier_maybe.is_some() {
1680        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1681    } else {
1682        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1683    }
1684    log::info!(target: LOG_MODULE, "running on_load...");
1685    rt.spawn(on_load);
1686
1687    let slot_range = Arc::new(slot_range);
1688    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1689    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1690    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1691    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1692
1693    // divide slot_range into n subranges
1694    let subranges = generate_subranges(&slot_range, threads);
1695    if threads > 1 {
1696        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1697    }
1698
1699    let mut handles = Vec::new();
1700    // Shared per-thread error counters
1701    let error_counts: Arc<Vec<AtomicU32>> =
1702        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1703
1704    for (i, slot_range) in subranges.into_iter().enumerate() {
1705        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1706        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1707        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1708        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1709        let client = client.clone();
1710        let error_counts = error_counts.clone();
1711
1712        let rt_clone = rt.clone();
1713
1714        let handle = std::thread::spawn(move || {
1715            rt_clone.block_on(async {
1716                firehose_geyser_thread(
1717                    slot_range,
1718                    transaction_notifier_maybe,
1719                    entry_notifier_maybe,
1720                    block_meta_notifier_maybe,
1721                    confirmed_bank_sender,
1722                    &client,
1723                    if threads > 1 { Some(i) } else { None },
1724                    error_counts,
1725                )
1726                .await
1727                .unwrap();
1728            });
1729        });
1730        handles.push(handle);
1731    }
1732
1733    // Wait for all threads to complete
1734    for handle in handles {
1735        handle.join().unwrap();
1736    }
1737    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1738    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1739        block_meta_notifier.notify_block_metadata(
1740            u64::MAX,
1741            "unload",
1742            u64::MAX,
1743            "unload",
1744            &KeyedRewardsAndNumPartitions {
1745                keyed_rewards: vec![],
1746                num_partitions: None,
1747            },
1748            None,
1749            None,
1750            0,
1751            0,
1752        );
1753    }
1754    Ok(confirmed_bank_receiver)
1755}
1756
1757#[allow(clippy::too_many_arguments)]
1758#[allow(clippy::result_large_err)]
1759async fn firehose_geyser_thread(
1760    mut slot_range: Range<u64>,
1761    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1762    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1763    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1764    confirmed_bank_sender: Sender<SlotNotification>,
1765    client: &Client,
1766    thread_index: Option<usize>,
1767    error_counts: Arc<Vec<AtomicU32>>,
1768) -> Result<(), (FirehoseError, u64)> {
1769    let start_time = std::time::Instant::now();
1770    let log_target = if let Some(thread_index) = thread_index {
1771        format!("{}::T{:03}", LOG_MODULE, thread_index)
1772    } else {
1773        LOG_MODULE.to_string()
1774    };
1775    let initial_slot_range = slot_range.clone();
1776    let mut skip_until_index = None;
1777    let mut last_counted_slot = slot_range.start.saturating_sub(1);
1778    // let mut triggered = false;
1779    while let Err((err, slot)) = async {
1780            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1781            log::info!(
1782                target: &log_target,
1783                "slot range: {} (epoch {}) ... {} (epoch {})",
1784                slot_range.start,
1785                slot_to_epoch(slot_range.start),
1786                slot_range.end,
1787                slot_to_epoch(slot_range.end)
1788            );
1789
1790            log::info!(target: &log_target, "🚒 starting firehose...");
1791
1792            // for each epoch
1793            let mut current_slot: Option<u64> = None;
1794            for epoch_num in epoch_range.clone() {
1795                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1796                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1797                    Ok(stream) => stream,
1798                    Err(_) => {
1799                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1800                    }
1801                };
1802                let mut reader = NodeReader::new(stream);
1803
1804                let header_fut = reader.read_raw_header();
1805                let header = match timeout(OP_TIMEOUT, header_fut).await {
1806                    Ok(res) => res
1807                        .map_err(FirehoseError::ReadHeader)
1808                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1809                    Err(_) => {
1810                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1811                    }
1812                };
1813                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1814
1815                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1816                let local_start = std::cmp::max(slot_range.start, epoch_start);
1817                let local_end_inclusive =
1818                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1819                if local_start > local_end_inclusive {
1820                    log::debug!(
1821                        target: &log_target,
1822                        "epoch {} has no overlap with thread range ({}..{}), skipping",
1823                        epoch_num,
1824                        slot_range.start,
1825                        slot_range.end
1826                    );
1827                    continue;
1828                }
1829
1830                let mut todo_previous_blockhash = Hash::default();
1831                let mut todo_latest_entry_blockhash = Hash::default();
1832                // Reset counters to align to the local epoch slice; prevents boundary slots
1833                // from being treated as already-counted after a restart.
1834                last_counted_slot = local_start.saturating_sub(1);
1835                current_slot = None;
1836
1837                if local_start > epoch_start {
1838                    // Seek to the slot immediately preceding the requested range so the reader
1839                    // captures the full node set (transactions, entries, rewards) for the
1840                    // target block on the next iteration.
1841                    let seek_slot = local_start.saturating_sub(1);
1842                    let seek_fut = reader.seek_to_slot(seek_slot);
1843                    match timeout(OP_TIMEOUT, seek_fut).await {
1844                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1845                        Err(_) => {
1846                            return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1847                        }
1848                    }
1849                }
1850
1851                // for each item in each block
1852                let mut item_index = 0;
1853                let mut displayed_skip_message = false;
1854                loop {
1855                    let read_fut = reader.read_until_block();
1856                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1857                        Ok(result) => result
1858                            .map_err(FirehoseError::ReadUntilBlockError)
1859                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1860                        Err(_) => {
1861                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1862                            let restart_slot =
1863                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
1864                            return Err((
1865                                FirehoseError::OperationTimeout("read_until_block"),
1866                                restart_slot,
1867                            ));
1868                        }
1869                    };
1870                    if nodes.is_empty() {
1871                        log::info!(
1872                            target: &log_target,
1873                            "reached end of epoch {}",
1874                            epoch_num
1875                        );
1876                        break;
1877                    }
1878                    // ignore epoch and subset nodes at end of car file loop { if
1879                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
1880                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
1881                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
1882                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
1883                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
1884                    //         log::info!(target: &log_target, "reached end of epoch {}",
1885                    //             epoch_num); break; }
1886                    if let Some(last_node) = nodes.0.last()
1887                        && !last_node.get_node().is_block() {
1888                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1889                            break;
1890                        }
1891                    let block = nodes
1892                        .get_block()
1893                        .map_err(FirehoseError::GetBlockError)
1894                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1895                    log::debug!(
1896                        target: &log_target,
1897                        "read {} items from epoch {}, now at slot {}",
1898                        item_index,
1899                        epoch_num,
1900                        block.slot
1901                    );
1902                    let slot = block.slot;
1903                    if slot > local_end_inclusive {
1904                        log::debug!(
1905                            target: &log_target,
1906                            "reached end of local slice at slot {} (epoch {}), stopping",
1907                            slot,
1908                            epoch_num
1909                        );
1910                        break;
1911                    }
1912                    if slot >= slot_range.end {
1913                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1914                        // Return early to terminate the firehose thread cleanly. We use >=
1915                        // because slot_range is half-open [start, end), so any slot equal to
1916                        // end is out-of-range and must not be processed.
1917                        return Ok(());
1918                    }
1919                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1920                    if slot < local_start {
1921                        if slot.saturating_add(1) == local_start {
1922                            log::debug!(
1923                                target: &log_target,
1924                                "priming reader with preceding slot {}, skipping",
1925                                slot
1926                            );
1927                        } else {
1928                            log::warn!(
1929                                target: &log_target,
1930                                "encountered slot {} before start of range {}, skipping",
1931                                slot,
1932                                local_start
1933                            );
1934                        }
1935                        continue;
1936                    }
1937                    current_slot = Some(slot);
1938                    let mut entry_index: usize = 0;
1939                    let mut this_block_executed_transaction_count: u64 = 0;
1940                    let mut this_block_entry_count: u64 = 0;
1941                    let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1942
1943                    if slot <= last_counted_slot {
1944                        log::debug!(
1945                            target: &log_target,
1946                            "duplicate block {}, already counted (last_counted={})",
1947                            slot,
1948                            last_counted_slot,
1949                        );
1950                        this_block_rewards.clear();
1951                        continue;
1952                    }
1953
1954                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
1955                        item_index += 1;
1956                        // if item_index == 100000 && !triggered { log::info!("simulating
1957                        //     error"); triggered = true; return
1958                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
1959                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
1960                        //         "simulated error", )), ))); }
1961                        if let Some(skip) = skip_until_index {
1962                            if item_index < skip {
1963                                if !displayed_skip_message {
1964                                    log::info!(
1965                                        target: &log_target,
1966                                        "skipping until index {} (at {})",
1967                                        skip,
1968                                        item_index
1969                                    );
1970                                    displayed_skip_message = true;
1971                                }
1972                                return Ok(());
1973                            } else {
1974                                log::info!(
1975                                    target: &log_target,
1976                                    "reached target index {}, resuming...",
1977                                    skip
1978                                );
1979                                skip_until_index = None;
1980                            }
1981                        }
1982                        let node = node_with_cid.get_node();
1983
1984                        use crate::node::Node::*;
1985                        match node {
1986                            Transaction(tx) => {
1987                                let versioned_tx = tx.as_parsed()?;
1988                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1989
1990                                let as_native_metadata = decode_transaction_status_meta_from_frame(
1991                                    block.slot,
1992                                    reassembled_metadata,
1993                                )?;
1994
1995                                let message_hash = {
1996                                    #[cfg(feature = "verify-transaction-signatures")]
1997                                    {
1998                                        versioned_tx.verify_and_hash_message()?
1999                                    }
2000                                    #[cfg(not(feature = "verify-transaction-signatures"))]
2001                                    {
2002                                        // Signature verification is optional because it is
2003                                        // extremely expensive at replay scale.
2004                                        versioned_tx.message.hash()
2005                                    }
2006                                };
2007                                let signature = versioned_tx
2008                                    .signatures
2009                                    .first()
2010                                    .ok_or_else(|| {
2011                                        Box::new(std::io::Error::new(
2012                                            std::io::ErrorKind::InvalidData,
2013                                            "transaction missing signature",
2014                                        )) as SharedError
2015                                    })?;
2016                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2017
2018                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2019                                    transaction_notifier.notify_transaction(
2020                                        block.slot,
2021                                        tx.index.unwrap() as usize,
2022                                        signature,
2023                                        &message_hash,
2024                                        is_vote,
2025                                        &as_native_metadata,
2026                                        &versioned_tx,
2027                                    );
2028                                }
2029
2030                            }
2031                            Entry(entry) => {
2032                                let entry_hash = Hash::from(entry.hash.to_bytes());
2033                                let entry_transaction_count = entry.transactions.len();
2034                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2035                                let starting_transaction_index =
2036                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2037                                        Box::new(std::io::Error::other(
2038                                            "transaction index exceeds usize range",
2039                                        )) as SharedError
2040                                    })?;
2041                                todo_latest_entry_blockhash = entry_hash;
2042                                this_block_executed_transaction_count += entry_transaction_count_u64;
2043                                this_block_entry_count += 1;
2044                                if entry_notifier_maybe.is_none() {
2045                                    return Ok(());
2046                                }
2047                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2048                                let entry_summary = solana_entry::entry::EntrySummary {
2049                                    num_hashes: entry.num_hashes,
2050                                    hash: Hash::from(entry.hash.to_bytes()),
2051                                    num_transactions: entry_transaction_count_u64,
2052                                };
2053                                entry_notifier.notify_entry(
2054                                    block.slot,
2055                                    entry_index,
2056                                    &entry_summary,
2057                                    starting_transaction_index,
2058                                );
2059                                entry_index += 1;
2060                            }
2061                            Block(block) => {
2062                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2063                                confirmed_bank_sender.send(notification).unwrap();
2064
2065                                if block_meta_notifier_maybe.is_none() {
2066                                    last_counted_slot = block.slot;
2067                                    return Ok(());
2068                                }
2069                                let keyed_rewards = std::mem::take(&mut this_block_rewards);
2070                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2071                                block_meta_notifier.notify_block_metadata(
2072                                    block.meta.parent_slot,
2073                                    todo_previous_blockhash.to_string().as_str(),
2074                                    block.slot,
2075                                    todo_latest_entry_blockhash.to_string().as_str(),
2076                                    &KeyedRewardsAndNumPartitions {
2077                                        keyed_rewards,
2078                                        num_partitions: None,
2079                                    },
2080                                    Some(block.meta.blocktime as i64),
2081                                    block.meta.block_height,
2082                                    this_block_executed_transaction_count,
2083                                    this_block_entry_count,
2084                                );
2085                                todo_previous_blockhash = todo_latest_entry_blockhash;
2086                                last_counted_slot = block.slot;
2087                                std::thread::yield_now();
2088                            }
2089                            Subset(_subset) => (),
2090                            Epoch(_epoch) => (),
2091                            Rewards(rewards) => {
2092                                if !rewards.is_complete() {
2093                                    let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
2094                                    let decompressed = utils::decompress_zstd(reassembled)?;
2095                                    let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
2096                                        Box::new(std::io::Error::other(
2097                                            std::format!("Error decoding rewards: {:?}", err),
2098                                        ))
2099                                    })?;
2100                                    this_block_rewards = convert_proto_rewards(&decoded)?;
2101                                }
2102                            }
2103                            DataFrame(_data_frame) => (),
2104                        }
2105                        Ok(())
2106                    })
2107                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2108                    if block.slot == slot_range.end - 1 {
2109                        let finish_time = std::time::Instant::now();
2110                        let elapsed = finish_time.duration_since(start_time);
2111                        log::info!(target: &log_target, "processed slot {}", block.slot);
2112                        let elapsed_pretty = human_readable_duration(elapsed);
2113                        log::info!(
2114                            target: &log_target,
2115                            "processed {} slots across {} epochs in {}.",
2116                            initial_slot_range.end - initial_slot_range.start,
2117                            slot_to_epoch(initial_slot_range.end)
2118                                + 1
2119                                - slot_to_epoch(initial_slot_range.start),
2120                            elapsed_pretty
2121                        );
2122                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2123                        // On completion, report threads with non-zero error counts for
2124                        // visibility.
2125                        let summary: String = error_counts
2126                            .iter()
2127                            .enumerate()
2128                            .filter_map(|(i, c)| {
2129                                let v = c.load(Ordering::Relaxed);
2130                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2131                            })
2132                            .collect::<Vec<_>>()
2133                            .join(", ");
2134                        if !summary.is_empty() {
2135                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2136                        }
2137                        return Ok(());
2138                    }
2139                }
2140            }
2141            Ok(())
2142}
2143.await
2144{
2145        if is_shutdown_error(&err) {
2146            log::info!(
2147                target: &log_target,
2148                "shutdown requested; terminating firehose thread {:?}",
2149                thread_index
2150            );
2151            return Ok(());
2152        }
2153        log::error!(
2154            target: &log_target,
2155            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2156            slot,
2157            slot_to_epoch(slot)
2158            );
2159            log::error!(target: &log_target, "{}", err);
2160            let error_message = err.to_string();
2161            if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2162                || error_message.contains("Unknown CID version")
2163            {
2164                // Clear cached index data for this epoch to avoid retrying with a bad/partial index
2165                // (or a bad seek offset that landed mid-stream).
2166                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2167            }
2168            let item_index = match err {
2169                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2170                _ => 0,
2171            };
2172            // Increment this thread's error counter
2173            let idx = thread_index.unwrap_or(0);
2174            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2175            log::warn!(
2176                target: &log_target,
2177                "restarting from slot {} at index {}",
2178                slot,
2179                item_index,
2180            );
2181            // Update slot range to resume from the failed slot, not the original start.
2182            // If the failing slot was already fully processed, resume from the next slot.
2183            if slot <= last_counted_slot {
2184                slot_range.start = last_counted_slot.saturating_add(1);
2185            } else {
2186                slot_range.start = slot;
2187            }
2188            // `skip_until_index` is unsafe across retries because `item_index`
2189            // is reset to 0 each epoch restart. Keeping it can skip large portions
2190            // of the stream and silently drop slots.
2191            skip_until_index = None;
2192}
2193    Ok(())
2194}
2195
2196#[inline]
2197fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2198    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2199        return false;
2200    }
2201
2202    if !matches!(
2203        versioned_tx.version(),
2204        solana_transaction::versioned::TransactionVersion::Legacy(_)
2205    ) {
2206        return false;
2207    }
2208
2209    let instructions = versioned_tx.message.instructions();
2210    if instructions.len() != 1 {
2211        return false;
2212    }
2213
2214    let program_index = instructions[0].program_id_index as usize;
2215    versioned_tx
2216        .message
2217        .static_account_keys()
2218        .get(program_index)
2219        .map(|program_id| program_id == &vote_program_id())
2220        .unwrap_or(false)
2221}
2222
2223#[inline(always)]
2224fn convert_proto_rewards(
2225    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2226) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2227    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2228    for proto_reward in proto_rewards.rewards.iter() {
2229        let reward = RewardInfo {
2230            reward_type: match proto_reward.reward_type - 1 {
2231                0 => RewardType::Fee,
2232                1 => RewardType::Rent,
2233                2 => RewardType::Staking,
2234                3 => RewardType::Voting,
2235                typ => {
2236                    return Err(Box::new(std::io::Error::other(format!(
2237                        "unsupported reward type {}",
2238                        typ
2239                    ))));
2240                }
2241            },
2242            lamports: proto_reward.lamports,
2243            post_balance: proto_reward.post_balance,
2244            commission: proto_reward.commission.parse::<u8>().ok(),
2245        };
2246        let pubkey = proto_reward
2247            .pubkey
2248            .parse::<Address>()
2249            .map_err(|err| Box::new(err) as SharedError)?;
2250        keyed_rewards.push((pubkey, reward));
2251    }
2252    Ok(keyed_rewards)
2253}
2254
2255#[inline]
2256/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2257pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2258    let total = slot_range.end - slot_range.start;
2259    let slots_per_thread = total / threads;
2260    let remainder = total % threads;
2261
2262    let ranges: Vec<Range<u64>> = (0..threads)
2263        .map(|i| {
2264            // Distribute remainder slots to the first `remainder` threads
2265            let extra_slot = if i < remainder { 1 } else { 0 };
2266            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2267            let end = start + slots_per_thread + extra_slot;
2268            start..end
2269        })
2270        .collect();
2271
2272    // Verify that ranges cover all slots exactly
2273    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2274    assert_eq!(
2275        total_covered, total,
2276        "Range generation failed: {} threads should cover {} slots but only cover {}",
2277        threads, total, total_covered
2278    );
2279
2280    // Verify no gaps between ranges
2281    for i in 1..ranges.len() {
2282        assert_eq!(
2283            ranges[i - 1].end,
2284            ranges[i].start,
2285            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2286            i - 1,
2287            ranges[i - 1].end,
2288            i,
2289            ranges[i].start
2290        );
2291    }
2292
2293    log::info!(
2294        target: LOG_MODULE,
2295        "Generated {} thread ranges covering {} slots total",
2296        threads,
2297        total_covered
2298    );
2299    ranges
2300}
2301
/// Renders `duration` as a short human-readable string.
///
/// - A zero duration is `"0s"`.
/// - Under one minute: at least one whole second with a zero millisecond component
///   prints as an integer (`"5s"`); anything else prints with two decimals (`"5.25s"`).
/// - One minute or more: only the two most significant units are shown, and the smaller
///   one is omitted when it is zero (`"1d2h"`, `"3h"`, `"4m30s"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return String::from("0s");
    }

    let whole_secs = duration.as_secs();
    if whole_secs < 60 {
        let is_whole_number = whole_secs != 0 && duration.subsec_millis() == 0;
        return if is_whole_number {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / 86_400;
    let hours = whole_secs % 86_400 / 3_600;
    let minutes = whole_secs % 3_600 / 60;
    let secs = whole_secs % 60;

    // Pick the largest non-zero unit plus the unit directly beneath it. Since at least
    // 60 seconds have elapsed, `minutes` is non-zero whenever days and hours are both zero.
    let (major, major_unit, minor, minor_unit) = if days > 0 {
        (days, 'd', hours, 'h')
    } else if hours > 0 {
        (hours, 'h', minutes, 'm')
    } else {
        (minutes, 'm', secs, 's')
    };

    if minor > 0 {
        format!("{major}{major_unit}{minor}{minor_unit}")
    } else {
        format!("{major}{major_unit}")
    }
}
2347
#[cfg(test)]
/// Stats callback used by the tests in this file: logs a one-line summary of `stats`
/// for `thread_id`.
///
/// The elapsed time is measured from `stats.start_time` when the returned future is
/// polled. Transactions per second is reported as `0.0` when no time has elapsed.
/// The future always resolves to `Ok(())`.
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        let transactions = stats.transactions_processed;
        // Report zero throughput rather than dividing by a zero elapsed time.
        let tps = if elapsed_secs > 0.0 {
            transactions as f64 / elapsed_secs
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            transactions,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2373
2374#[cfg(test)]
2375use futures_util::FutureExt;
2376#[cfg(test)]
2377use serial_test::serial;
2378#[cfg(test)]
2379use std::sync::{Mutex, OnceLock};
2380
2381#[tokio::test(flavor = "multi_thread")]
2382async fn test_firehose_epoch_800() {
2383    use dashmap::DashSet;
2384    use std::sync::atomic::{AtomicU64, Ordering};
2385    solana_logger::setup_with_default("info");
2386    const THREADS: usize = 4;
2387    const NUM_SLOTS_TO_COVER: u64 = 50;
2388    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2389    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2390    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2391    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2392    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2393    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2394    let stats_tracking = StatsTracking {
2395        on_stats: log_stats_handler,
2396        tracking_interval_slots: 10,
2397    };
2398
2399    for prev in PREV_BLOCK.iter() {
2400        prev.store(0, Ordering::Relaxed);
2401    }
2402    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2403    NUM_BLOCKS.store(0, Ordering::Relaxed);
2404    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2405    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2406    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2407
2408    firehose(
2409        THREADS.try_into().unwrap(),
2410        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2411        Some(|thread_id: usize, block: BlockData| {
2412            async move {
2413                let _prev =
2414                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2415                if block.was_skipped() {
2416                    log::info!(
2417                        target: LOG_MODULE,
2418                        "leader skipped block {} on thread {}",
2419                        block.slot(),
2420                        thread_id,
2421                    );
2422                } else {
2423                    /*log::info!(
2424                        target: LOG_MODULE,
2425                        "got block {} on thread {}",
2426                        block.slot(),
2427                        thread_id,
2428                    );*/
2429                }
2430
2431                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2432                if block.was_skipped() {
2433                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2434                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2435                } else {
2436                    if first_time {
2437                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2438                        if let BlockData::Block {
2439                            executed_transaction_count,
2440                            ..
2441                        } = &block
2442                        {
2443                            let executed = *executed_transaction_count;
2444                            let _ = MIN_TRANSACTIONS.fetch_update(
2445                                Ordering::Relaxed,
2446                                Ordering::Relaxed,
2447                                |current| {
2448                                    if executed < current {
2449                                        Some(executed)
2450                                    } else {
2451                                        None
2452                                    }
2453                                },
2454                            );
2455                        }
2456                    }
2457                }
2458                Ok(())
2459            }
2460            .boxed()
2461        }),
2462        None::<OnTxFn>,
2463        None::<OnEntryFn>,
2464        None::<OnRewardFn>,
2465        None::<OnErrorFn>,
2466        Some(stats_tracking),
2467        None,
2468    )
2469    .await
2470    .unwrap();
2471    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2472    assert_eq!(
2473        seen, NUM_SLOTS_TO_COVER,
2474        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2475    );
2476    let mut skipped: Vec<u64> = SEEN_SKIPPED
2477        .get_or_init(DashSet::new)
2478        .iter()
2479        .map(|v| *v)
2480        .collect();
2481    skipped.sort_unstable();
2482    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2483    const EXPECTED_SKIPPED: [u64; 6] = [
2484        345_600_004,
2485        345_600_005,
2486        345_600_008,
2487        345_600_009,
2488        345_600_010,
2489        345_600_011,
2490    ];
2491    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2492    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2493}
2494
2495#[tokio::test(flavor = "multi_thread")]
2496async fn test_firehose_target_slot_transactions() {
2497    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2498    solana_logger::setup_with_default("info");
2499    const TARGET_SLOT: u64 = 376_273_722;
2500    const SLOT_RADIUS: u64 = 50;
2501    const EXPECTED_TRANSACTIONS: u64 = 1414;
2502    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2503    static FOUND: AtomicBool = AtomicBool::new(false);
2504    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2505    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2506
2507    FOUND.store(false, Ordering::Relaxed);
2508    OBSERVED_TXS.store(0, Ordering::Relaxed);
2509    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2510
2511    firehose(
2512        4,
2513        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2514        Some(|_thread_id: usize, block: BlockData| {
2515            async move {
2516                if block.slot() == TARGET_SLOT {
2517                    assert!(
2518                        !block.was_skipped(),
2519                        "target slot {TARGET_SLOT} was marked leader skipped",
2520                    );
2521                    if let BlockData::Block {
2522                        executed_transaction_count,
2523                        ..
2524                    } = block
2525                    {
2526                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2527                        FOUND.store(true, Ordering::Relaxed);
2528                        assert_eq!(
2529                            executed_transaction_count, EXPECTED_TRANSACTIONS,
2530                            "unexpected transaction count for slot {TARGET_SLOT}"
2531                        );
2532                        assert_eq!(
2533                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2534                            EXPECTED_NON_VOTE_TRANSACTIONS,
2535                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2536                        );
2537                    }
2538                }
2539                Ok(())
2540            }
2541            .boxed()
2542        }),
2543        Some(|_thread_id: usize, transaction: TransactionData| {
2544            async move {
2545                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2546                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2547                }
2548                Ok(())
2549            }
2550            .boxed()
2551        }),
2552        None::<OnEntryFn>,
2553        None::<OnRewardFn>,
2554        None::<OnErrorFn>,
2555        None::<OnStatsTrackingFn>,
2556        None,
2557    )
2558    .await
2559    .unwrap();
2560
2561    assert!(
2562        FOUND.load(Ordering::Relaxed),
2563        "target slot was not processed"
2564    );
2565    assert_eq!(
2566        OBSERVED_TXS.load(Ordering::Relaxed),
2567        EXPECTED_TRANSACTIONS,
2568        "recorded transaction count mismatch"
2569    );
2570}
2571
2572#[tokio::test(flavor = "multi_thread")]
2573async fn test_firehose_epoch_850_has_logs() {
2574    use std::sync::atomic::{AtomicU64, Ordering};
2575    solana_logger::setup_with_default("info");
2576    const START_SLOT: u64 = 367_200_075; // within epoch 850
2577    const SLOT_COUNT: u64 = 50;
2578    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2579
2580    TOTAL_TXS.store(0, Ordering::Relaxed);
2581
2582    firehose(
2583        4,
2584        START_SLOT..(START_SLOT + SLOT_COUNT),
2585        None::<OnBlockFn>,
2586        Some(|_thread_id: usize, transaction: TransactionData| {
2587            async move {
2588                TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2589                if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
2590                    let has_logs = logs.iter().any(|msg| !msg.is_empty());
2591                    assert_eq!(has_logs, true);
2592                }
2593                Ok(())
2594            }
2595            .boxed()
2596        }),
2597        None::<OnEntryFn>,
2598        None::<OnRewardFn>,
2599        None::<OnErrorFn>,
2600        None::<OnStatsTrackingFn>,
2601        None,
2602    )
2603    .await
2604    .unwrap();
2605
2606    assert!(
2607        TOTAL_TXS.load(Ordering::Relaxed) > 0,
2608        "no transactions observed in epoch 850 range"
2609    );
2610}
2611
2612#[tokio::test(flavor = "multi_thread")]
2613async fn test_firehose_epoch_850_votes_present() {
2614    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2615    solana_logger::setup_with_default("info");
2616    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
2617    const SLOT_RADIUS: u64 = 10;
2618    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
2619    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
2620    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2621
2622    SEEN_BLOCK.store(false, Ordering::Relaxed);
2623    VOTE_TXS.store(0, Ordering::Relaxed);
2624    TOTAL_TXS.store(0, Ordering::Relaxed);
2625
2626    firehose(
2627        2,
2628        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2629        Some(|_thread_id: usize, block: BlockData| {
2630            async move {
2631                if block.slot() == TARGET_SLOT {
2632                    assert!(
2633                        !block.was_skipped(),
2634                        "target slot {TARGET_SLOT} was marked leader skipped",
2635                    );
2636                    SEEN_BLOCK.store(true, Ordering::Relaxed);
2637                }
2638                Ok(())
2639            }
2640            .boxed()
2641        }),
2642        Some(|_thread_id: usize, transaction: TransactionData| {
2643            async move {
2644                if transaction.slot == TARGET_SLOT {
2645                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2646                    if transaction.is_vote {
2647                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
2648                    }
2649                }
2650                Ok(())
2651            }
2652            .boxed()
2653        }),
2654        None::<OnEntryFn>,
2655        None::<OnRewardFn>,
2656        None::<OnErrorFn>,
2657        None::<OnStatsTrackingFn>,
2658        None,
2659    )
2660    .await
2661    .unwrap();
2662
2663    assert!(
2664        SEEN_BLOCK.load(Ordering::Relaxed),
2665        "target slot was not processed"
2666    );
2667    assert!(
2668        TOTAL_TXS.load(Ordering::Relaxed) > 0,
2669        "no transactions counted in target slot"
2670    );
2671    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
2672}
2673
// Runs a single-threaded firehose over `NUM_SLOTS` slots and makes the block handler
// return one synthetic error after at least one non-skipped block has been recorded.
// Afterwards every slot in the range must still appear in the coverage map.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // slot -> number of times the block handler recorded it.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    // Set once the synthetic failure has been injected, so it fires only once.
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    // Count of non-skipped blocks recorded so far.
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    solana_logger::setup_with_default("info");

    // Start every run from a clean slate.
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    let outcome = firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let is_real_block = !block.was_skipped();

                // Force an error after at least one block has been seen so restart
                // happens mid-range. The `swap` comes last so the flag is only
                // consumed when the other conditions already hold.
                let inject_failure = is_real_block
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst);
                if inject_failure {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut hits_by_slot = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *hits_by_slot.entry(block.slot()).or_default() += 1;
                if is_real_block {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await;
    outcome.unwrap();

    // Every slot in the requested range must have been recorded at least once.
    let hits_by_slot = COVERAGE.get().unwrap().lock().unwrap();
    let end_slot = START_SLOT + NUM_SLOTS;
    (START_SLOT..end_slot).for_each(|slot| {
        assert!(
            hits_by_slot.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    });
}
2737
// Streams the 2001 slots centred on `GAP_START` with 16 threads, records every
// non-skipped block's slot, and asserts that no slot in the inclusive range is
// missing once four known leader-skipped slots are treated as covered.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378_864_000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;
    // A known 4-slot leader skipped gap; these slots are ignored by the check.
    const KNOWN_SKIPPED: [u64; 4] = [378_864_396, 378_864_397, 378_864_398, 378_864_399];

    // Slots for which a non-skipped block was delivered.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    solana_logger::setup_with_default("info");

    // Start every run from a clean slate.
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    let outcome = firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Skipped slots carry no block, so they do not count as coverage.
                if !block.was_skipped() {
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(block.slot());
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await;
    outcome.unwrap();

    // Work on a snapshot so the lock is not held during the checks below.
    let mut covered = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();
    covered.extend(KNOWN_SKIPPED);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !covered.contains(slot))
        .collect();
    let preview = &missing[..missing.len().min(10)];
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        preview
    );
}