Skip to main content

jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
41    node_reader::NodeReader,
42    utils,
43};
44
45// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
46// header, seeking, reading next block). Adjust here to tune stall detection/restart
47// aggressiveness.
48const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
49// Epochs earlier than this were bincode-encoded in Old Faithful.
50const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53    flag: &Arc<std::sync::atomic::AtomicBool>,
54    receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56    if let Some(rx) = receiver {
57        match rx.try_recv() {
58            Ok(_) | Err(TryRecvError::Lagged(_)) => {
59                flag.store(true, Ordering::SeqCst);
60            }
61            Err(TryRecvError::Closed) => {
62                flag.store(true, Ordering::SeqCst);
63            }
64            Err(TryRecvError::Empty) => {}
65        }
66    }
67    flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72        inner
73            .downcast_ref::<io::Error>()
74            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75            .unwrap_or(false)
76    }
77
78    match err {
79        FirehoseError::BlockHandlerError(inner)
80        | FirehoseError::TransactionHandlerError(inner)
81        | FirehoseError::EntryHandlerError(inner)
82        | FirehoseError::RewardHandlerError(inner)
83        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84        _ => false,
85    }
86}
87
88async fn find_previous_indexed_slot(
89    local_start: u64,
90    epoch_start: u64,
91    log_target: &str,
92) -> Result<Option<u64>, FirehoseError> {
93    if local_start <= epoch_start {
94        return Ok(None);
95    }
96    let mut candidate = local_start.saturating_sub(1);
97    let mut skipped = 0u64;
98    loop {
99        match slot_to_offset(candidate).await {
100            Ok(_) => {
101                if skipped > 0 {
102                    log::info!(
103                        target: log_target,
104                        "slot {} missing in index; seeking back {} slots to {}",
105                        local_start.saturating_sub(1),
106                        skipped,
107                        candidate
108                    );
109                }
110                return Ok(Some(candidate));
111            }
112            Err(SlotOffsetIndexError::SlotNotFound(..)) => {
113                if candidate <= epoch_start {
114                    break;
115                }
116                skipped += 1;
117                candidate = candidate.saturating_sub(1);
118            }
119            Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
120        }
121    }
122    log::warn!(
123        target: log_target,
124        "no indexed slot found before {} (epoch start {}); reading from epoch start",
125        local_start,
126        epoch_start
127    );
128    Ok(None)
129}
130
131/// Errors that can occur while streaming the firehose. Errors that can occur while streaming
132/// the firehose.
133#[derive(Debug, Error)]
134pub enum FirehoseError {
135    /// HTTP client error surfaced from `reqwest`.
136    Reqwest(reqwest::Error),
137    /// Failure while reading the Old Faithful CAR header.
138    ReadHeader(SharedError),
139    /// Error emitted by the Solana Geyser plugin service.
140    GeyserPluginService(GeyserPluginServiceError),
141    /// Transaction notifier could not be acquired from the Geyser service.
142    FailedToGetTransactionNotifier,
143    /// Failure while reading data until the next block boundary.
144    ReadUntilBlockError(SharedError),
145    /// Failure while fetching an individual block.
146    GetBlockError(SharedError),
147    /// Failed to decode a node at the given index.
148    NodeDecodingError(usize, SharedError),
149    /// Error surfaced when querying the slot offset index.
150    SlotOffsetIndexError(SlotOffsetIndexError),
151    /// Failure while seeking to a slot within the Old Faithful CAR stream.
152    SeekToSlotError(SharedError),
153    /// Error surfaced during the plugin `on_load` stage.
154    OnLoadError(SharedError),
155    /// Error emitted while invoking the stats handler.
156    OnStatsHandlerError(SharedError),
157    /// Timeout reached while waiting for a firehose operation.
158    OperationTimeout(&'static str),
159    /// Transaction handler returned an error.
160    TransactionHandlerError(SharedError),
161    /// Entry handler returned an error.
162    EntryHandlerError(SharedError),
163    /// Reward handler returned an error.
164    RewardHandlerError(SharedError),
165    /// Block handler returned an error.
166    BlockHandlerError(SharedError),
167}
168
169unsafe impl Send for FirehoseError {}
170unsafe impl Sync for FirehoseError {}
171
172impl Display for FirehoseError {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        match self {
175            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
176            FirehoseError::ReadHeader(error) => {
177                write!(f, "Error reading header: {}", error)
178            }
179            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
180                f,
181                "Error initializing geyser plugin service: {}",
182                geyser_plugin_service_error
183            ),
184            FirehoseError::FailedToGetTransactionNotifier => write!(
185                f,
186                "Failed to get transaction notifier from GeyserPluginService"
187            ),
188            FirehoseError::ReadUntilBlockError(error) => {
189                write!(f, "Error reading until block: {}", error)
190            }
191            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
192            FirehoseError::NodeDecodingError(item_index, error) => {
193                write!(
194                    f,
195                    "Error seeking, reading data from, or decoding data for data node {}: {}",
196                    item_index, error
197                )
198            }
199            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
200                f,
201                "Error getting info from slot offset index: {}",
202                slot_offset_index_error
203            ),
204            FirehoseError::SeekToSlotError(error) => {
205                write!(f, "Error seeking to slot: {}", error)
206            }
207            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
208            FirehoseError::OnStatsHandlerError(error) => {
209                write!(f, "Stats handler error: {}", error)
210            }
211            FirehoseError::OperationTimeout(op) => {
212                write!(f, "Timeout while waiting for operation: {}", op)
213            }
214            FirehoseError::TransactionHandlerError(error) => {
215                write!(f, "Transaction handler error: {}", error)
216            }
217            FirehoseError::EntryHandlerError(error) => {
218                write!(f, "Entry handler error: {}", error)
219            }
220            FirehoseError::RewardHandlerError(error) => {
221                write!(f, "Reward handler error: {}", error)
222            }
223            FirehoseError::BlockHandlerError(error) => {
224                write!(f, "Block handler error: {}", error)
225            }
226        }
227    }
228}
229
230impl From<reqwest::Error> for FirehoseError {
231    fn from(e: reqwest::Error) -> Self {
232        FirehoseError::Reqwest(e)
233    }
234}
235
236impl From<GeyserPluginServiceError> for FirehoseError {
237    fn from(e: GeyserPluginServiceError) -> Self {
238        FirehoseError::GeyserPluginService(e)
239    }
240}
241
242impl From<SlotOffsetIndexError> for FirehoseError {
243    fn from(e: SlotOffsetIndexError) -> Self {
244        FirehoseError::SlotOffsetIndexError(e)
245    }
246}
247
248/// Per-thread progress information emitted by the firehose runner.
249#[derive(Clone, PartialEq, Eq, Hash, Debug)]
250pub struct ThreadStats {
251    /// Identifier of the worker thread reporting the stats.
252    pub thread_id: usize,
253    /// Timestamp captured when the thread began processing.
254    pub start_time: std::time::Instant,
255    /// Timestamp captured when the thread finished, if finished.
256    pub finish_time: Option<std::time::Instant>,
257    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
258    pub slot_range: Range<u64>,
259    /// Original slot range assigned to the thread (half-open, never modified).
260    pub initial_slot_range: Range<u64>,
261    /// Latest slot processed by the thread.
262    pub current_slot: u64,
263    /// Total slots processed by the thread.
264    pub slots_processed: u64,
265    /// Number of blocks successfully processed.
266    pub blocks_processed: u64,
267    /// Number of slots skipped by the cluster leader.
268    pub leader_skipped_slots: u64,
269    /// Total transactions processed.
270    pub transactions_processed: u64,
271    /// Total entries processed.
272    pub entries_processed: u64,
273    /// Total rewards processed.
274    pub rewards_processed: u64,
275}
276
277/// Aggregated firehose statistics covering all worker threads.
278#[derive(Clone, PartialEq, Eq, Hash, Debug)]
279pub struct Stats {
280    /// Per-thread statistics for the current update.
281    pub thread_stats: ThreadStats,
282    /// Timestamp captured when processing began.
283    pub start_time: std::time::Instant,
284    /// Timestamp captured when all processing finished, if finished.
285    pub finish_time: Option<std::time::Instant>,
286    /// Slot range currently being processed (half-open [start, end)).
287    pub slot_range: Range<u64>,
288    /// Aggregate slots processed across all threads.
289    pub slots_processed: u64,
290    /// Aggregate blocks processed across all threads.
291    pub blocks_processed: u64,
292    /// Aggregate skipped slots across all threads.
293    pub leader_skipped_slots: u64,
294    /// Aggregate transactions processed across all threads.
295    pub transactions_processed: u64,
296    /// Aggregate entries processed across all threads.
297    pub entries_processed: u64,
298    /// Aggregate rewards processed across all threads.
299    pub rewards_processed: u64,
300    /// Transactions processed since the previous stats pulse.
301    pub transactions_since_last_pulse: u64,
302    /// Blocks processed since the previous stats pulse.
303    pub blocks_since_last_pulse: u64,
304    /// Slots processed since the previous stats pulse.
305    pub slots_since_last_pulse: u64,
306    /// Elapsed time since the previous stats pulse.
307    pub time_since_last_pulse: std::time::Duration,
308}
309
310/// Configuration for periodic stats emission via a [`Handler`] callback.
311#[derive(Clone, PartialEq, Eq, Hash, Debug)]
312pub struct StatsTracking<OnStats: Handler<Stats>> {
313    /// Callback invoked whenever new stats are available.
314    pub on_stats: OnStats,
315    /// Emits a stats callback when the current slot is a multiple of this interval.
316    pub tracking_interval_slots: u64,
317}
318
319#[inline(always)]
320#[allow(clippy::too_many_arguments)]
321async fn maybe_emit_stats<OnStats: Handler<Stats>>(
322    stats_tracking: Option<&StatsTracking<OnStats>>,
323    thread_index: usize,
324    thread_stats: &ThreadStats,
325    overall_slots_processed: &AtomicU64,
326    overall_blocks_processed: &AtomicU64,
327    overall_transactions_processed: &AtomicU64,
328    overall_entries_processed: &AtomicU64,
329    transactions_since_stats: &AtomicU64,
330    blocks_since_stats: &AtomicU64,
331    slots_since_stats: &AtomicU64,
332    last_pulse: &Arc<AtomicU64>,
333    base_instant: std::time::Instant,
334) -> Result<(), (FirehoseError, u64)> {
335    if let Some(stats_tracker) = stats_tracking {
336        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
337        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
338        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
339        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
340        let now_nanos = base_instant.elapsed().as_nanos() as u64;
341        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
342        let delta_nanos = now_nanos.saturating_sub(previous);
343        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
344        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
345        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
346        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
347
348        let stats = Stats {
349            thread_stats: thread_stats.clone(),
350            start_time: thread_stats.start_time,
351            finish_time: thread_stats.finish_time,
352            slot_range: thread_stats.slot_range.clone(),
353            slots_processed: total_slots,
354            blocks_processed: total_blocks,
355            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
356            transactions_processed: total_transactions,
357            entries_processed: total_entries,
358            rewards_processed: thread_stats.rewards_processed,
359            transactions_since_last_pulse: processed_transactions,
360            blocks_since_last_pulse: processed_blocks,
361            slots_since_last_pulse: processed_slots,
362            time_since_last_pulse,
363        };
364
365        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
366            last_pulse.store(previous, Ordering::Relaxed);
367            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
368            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
369            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
370            return Err((
371                FirehoseError::OnStatsHandlerError(e),
372                thread_stats.current_slot,
373            ));
374        }
375    }
376    Ok(())
377}
378
379#[inline(always)]
380fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
381    if tracking_enabled {
382        atomic.fetch_add(value, Ordering::Relaxed);
383    }
384}
385
386fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
387    map.get(&thread_id)
388        .map(|set| set.remove(&slot).is_some())
389        .unwrap_or(false)
390}
391
392fn decode_transaction_status_meta_from_frame(
393    slot: u64,
394    reassembled_metadata: Vec<u8>,
395) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
396    if reassembled_metadata.is_empty() {
397        // Early epochs often omit metadata entirely.
398        return Ok(solana_transaction_status::TransactionStatusMeta::default());
399    }
400
401    match utils::decompress_zstd(reassembled_metadata.clone()) {
402        Ok(decompressed) => {
403            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
404                Box::new(std::io::Error::other(format!(
405                    "decode transaction metadata (slot {slot}): {err}"
406                ))) as SharedError
407            })
408        }
409        Err(decomp_err) => {
410            // If the frame was not zstd-compressed (common for very early data), try to
411            // decode the raw bytes directly before bailing.
412            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
413                Box::new(std::io::Error::other(format!(
414                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
415                ))) as SharedError
416            })
417        }
418    }
419}
420
421#[derive(Debug, Default)]
422struct DecodedRewards {
423    keyed_rewards: Vec<(Address, RewardInfo)>,
424    num_partitions: Option<u64>,
425}
426
427impl DecodedRewards {
428    fn empty() -> Self {
429        Self {
430            keyed_rewards: Vec::new(),
431            num_partitions: None,
432        }
433    }
434}
435
436fn decode_rewards_from_frame(
437    slot: u64,
438    reassembled_rewards: Vec<u8>,
439) -> Result<DecodedRewards, SharedError> {
440    if reassembled_rewards.is_empty() {
441        // Early epochs sometimes omit rewards payloads entirely.
442        return Ok(DecodedRewards::empty());
443    }
444
445    match utils::decompress_zstd(reassembled_rewards.clone()) {
446        Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
447            |err| {
448                Box::new(std::io::Error::other(format!(
449                    "decode rewards (slot {slot}): {err}"
450                ))) as SharedError
451            },
452        ),
453        Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
454            |err| {
455                Box::new(std::io::Error::other(format!(
456                    "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
457                ))) as SharedError
458            },
459        ),
460    }
461}
462
463fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
464    let epoch = slot_to_epoch(slot);
465    let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
466        prost_011::Message::decode(bytes);
467    match proto_attempt {
468        Ok(proto) => {
469            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
470            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
471                Box::new(std::io::Error::other(format!(
472                    "convert rewards proto failed (epoch {epoch}): {err}"
473                ))) as SharedError
474            })?;
475            Ok(DecodedRewards {
476                keyed_rewards,
477                num_partitions,
478            })
479        }
480        Err(proto_err) => {
481            let stored: solana_storage_proto::StoredExtendedRewards =
482                bincode::deserialize(bytes).map_err(|bin_err| {
483                    Box::new(std::io::Error::other(format!(
484                        "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
485                    ))) as SharedError
486                })?;
487            let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
488            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
489            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
490                Box::new(std::io::Error::other(format!(
491                    "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
492                ))) as SharedError
493            })?;
494            Ok(DecodedRewards {
495                keyed_rewards,
496                num_partitions,
497            })
498        }
499    }
500}
501
502fn decode_transaction_status_meta(
503    slot: u64,
504    metadata_bytes: &[u8],
505) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
506    let epoch = slot_to_epoch(slot);
507    let mut bincode_err: Option<String> = None;
508    if epoch < BINCODE_EPOCH_CUTOFF {
509        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
510            metadata_bytes,
511        ) {
512            Ok(stored) => return Ok(stored.into()),
513            Err(err) => {
514                bincode_err = Some(err.to_string());
515            }
516        }
517    }
518
519    let bin_err_for_proto = bincode_err.clone();
520    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
521        prost_011::Message::decode(metadata_bytes).map_err(|err| {
522            // If we already tried bincode, surface both failures for easier debugging.
523            if let Some(ref bin_err) = bin_err_for_proto {
524                Box::new(std::io::Error::other(format!(
525                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
526                ))) as SharedError
527            } else {
528                Box::new(std::io::Error::other(format!(
529                    "protobuf decode transaction metadata: {err}"
530                ))) as SharedError
531            }
532        })?;
533
534    proto.try_into().map_err(|err| {
535        if let Some(ref bin_err) = bincode_err {
536            Box::new(std::io::Error::other(format!(
537                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
538            ))) as SharedError
539        } else {
540            Box::new(std::io::Error::other(format!(
541                "convert transaction metadata proto: {err}"
542            ))) as SharedError
543        }
544    })
545}
546
547#[cfg(test)]
548mod metadata_decode_tests {
549    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
550    use solana_message::v0::LoadedAddresses;
551    use solana_storage_proto::StoredTransactionStatusMeta;
552    use solana_transaction_status::TransactionStatusMeta;
553
554    fn sample_meta() -> TransactionStatusMeta {
555        let mut meta = TransactionStatusMeta::default();
556        meta.fee = 42;
557        meta.pre_balances = vec![1, 2];
558        meta.post_balances = vec![3, 4];
559        meta.log_messages = Some(vec!["hello".into()]);
560        meta.pre_token_balances = Some(Vec::new());
561        meta.post_token_balances = Some(Vec::new());
562        meta.rewards = Some(Vec::new());
563        meta.compute_units_consumed = Some(7);
564        meta.cost_units = Some(9);
565        meta.loaded_addresses = LoadedAddresses::default();
566        meta
567    }
568
569    #[test]
570    fn decodes_bincode_metadata_for_early_epochs() {
571        let stored = StoredTransactionStatusMeta {
572            status: Ok(()),
573            fee: 42,
574            pre_balances: vec![1, 2],
575            post_balances: vec![3, 4],
576            inner_instructions: None,
577            log_messages: Some(vec!["hello".into()]),
578            pre_token_balances: Some(Vec::new()),
579            post_token_balances: Some(Vec::new()),
580            rewards: Some(Vec::new()),
581            return_data: None,
582            compute_units_consumed: Some(7),
583            cost_units: Some(9),
584        };
585        let bytes = bincode::serialize(&stored).expect("bincode serialize");
586        let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
587        assert_eq!(decoded, TransactionStatusMeta::from(stored));
588    }
589
590    #[test]
591    fn decodes_protobuf_metadata_for_later_epochs() {
592        let meta = sample_meta();
593        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
594            meta.clone().into();
595        let bytes = prost_011::Message::encode_to_vec(&generated);
596        let decoded = decode_transaction_status_meta(157 * 432000, &bytes).expect("decode");
597        assert_eq!(decoded, meta);
598    }
599
600    #[test]
601    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
602        let meta = sample_meta();
603        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
604            meta.clone().into();
605        let bytes = prost_011::Message::encode_to_vec(&generated);
606        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
607        let decoded = decode_transaction_status_meta(100 * 432000, &bytes).expect("decode");
608        assert_eq!(decoded, meta);
609    }
610
611    #[test]
612    fn empty_frame_decodes_to_default() {
613        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
614        assert_eq!(decoded, TransactionStatusMeta::default());
615    }
616
617    #[test]
618    fn raw_bincode_frame_without_zstd_still_decodes() {
619        let stored = StoredTransactionStatusMeta {
620            status: Ok(()),
621            fee: 1,
622            pre_balances: vec![],
623            post_balances: vec![],
624            inner_instructions: None,
625            log_messages: None,
626            pre_token_balances: Some(Vec::new()),
627            post_token_balances: Some(Vec::new()),
628            rewards: Some(Vec::new()),
629            return_data: None,
630            compute_units_consumed: None,
631            cost_units: None,
632        };
633        let raw_bytes = bincode::serialize(&stored).expect("serialize");
634        let decoded =
635            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
636        assert_eq!(decoded, TransactionStatusMeta::from(stored));
637    }
638}
639
640#[cfg(test)]
641mod rewards_decode_tests {
642    use super::decode_rewards_from_bytes;
643    use solana_sdk_ids::vote::id as vote_program_id;
644    use solana_storage_proto::StoredExtendedRewards;
645    use solana_transaction_status::{Reward, RewardType};
646
647    #[test]
648    fn decodes_protobuf_rewards() {
649        let pubkey = vote_program_id().to_string();
650        let proto = solana_storage_proto::convert::generated::Rewards {
651            rewards: vec![solana_storage_proto::convert::generated::Reward {
652                pubkey,
653                lamports: 5,
654                post_balance: 10,
655                reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
656                commission: "1".to_string(),
657            }],
658            num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
659                num_partitions: 2,
660            }),
661        };
662        let bytes = prost_011::Message::encode_to_vec(&proto);
663        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode proto rewards");
664        assert_eq!(decoded.keyed_rewards.len(), 1);
665        assert_eq!(decoded.num_partitions, Some(2));
666    }
667
668    #[test]
669    fn decodes_bincode_rewards() {
670        let pubkey = vote_program_id().to_string();
671        let reward = Reward {
672            pubkey,
673            lamports: 7,
674            post_balance: 9,
675            reward_type: Some(RewardType::Rent),
676            commission: Some(3),
677        };
678        let stored_rewards: StoredExtendedRewards = vec![reward.into()];
679        let bytes = bincode::serialize(&stored_rewards).expect("bincode serialize");
680        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode bincode rewards");
681        assert_eq!(decoded.keyed_rewards.len(), 1);
682        assert_eq!(decoded.num_partitions, None);
683    }
684}
685
686/// Firehose transaction payload passed to [`Handler`] callbacks.
687#[derive(Debug, Clone)]
688pub struct TransactionData {
689    /// Slot that contains the transaction.
690    pub slot: u64,
691    /// Index of the transaction within the slot.
692    pub transaction_slot_index: usize,
693    /// Transaction signature.
694    pub signature: solana_signature::Signature,
695    /// Hash of the transaction message.
696    pub message_hash: Hash,
697    /// Indicates whether the transaction is a vote.
698    pub is_vote: bool,
699    /// Status metadata returned by the Solana runtime.
700    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
701    /// Fully decoded transaction.
702    pub transaction: VersionedTransaction,
703}
704
705/// Block entry metadata passed to [`Handler`] callbacks.
706#[derive(Debug, Clone)]
707pub struct EntryData {
708    /// Slot that generated the entry.
709    pub slot: u64,
710    /// Index of the entry within the slot.
711    pub entry_index: usize,
712    /// Range of transaction indexes covered by the entry.
713    pub transaction_indexes: Range<usize>,
714    /// Number of hashes associated with the entry.
715    pub num_hashes: u64,
716    /// Entry hash.
717    pub hash: Hash,
718}
719
720/// Reward data conveyed to reward [`Handler`] callbacks.
721#[derive(Debug, Clone)]
722pub struct RewardsData {
723    /// Slot the rewards correspond to.
724    pub slot: u64,
725    /// Reward recipients and their associated reward information.
726    pub rewards: Vec<(Address, RewardInfo)>,
727}
728
729/// Block-level data streamed to block handlers.
730#[derive(Debug)]
731pub enum BlockData {
732    /// Fully populated block payload with ledger metadata.
733    Block {
734        /// Parent slot number.
735        parent_slot: u64,
736        /// Parent block hash.
737        parent_blockhash: Hash,
738        /// Current block slot.
739        slot: u64,
740        /// Current block hash.
741        blockhash: Hash,
742        /// Rewards keyed by account and partition information.
743        rewards: KeyedRewardsAndNumPartitions,
744        /// Optional Unix timestamp for the block.
745        block_time: Option<i64>,
746        /// Optional ledger block height.
747        block_height: Option<u64>,
748        /// Number of executed transactions in the block.
749        executed_transaction_count: u64,
750        /// Number of entries contained in the block.
751        entry_count: u64,
752    },
753    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
754    /// arrive out of order).
755    PossibleLeaderSkipped {
756        /// Slot number that either lacked a block or may still arrive later.
757        slot: u64,
758    },
759}
760
761impl BlockData {
762    /// Returns the slot associated with this block or skipped slot.
763    #[inline(always)]
764    pub const fn slot(&self) -> u64 {
765        match self {
766            BlockData::Block { slot, .. } => *slot,
767            BlockData::PossibleLeaderSkipped { slot } => *slot,
768        }
769    }
770
771    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
772    #[inline(always)]
773    pub const fn was_skipped(&self) -> bool {
774        matches!(self, BlockData::PossibleLeaderSkipped { .. })
775    }
776
777    /// Returns the optional block time when available.
778    #[inline(always)]
779    pub const fn block_time(&self) -> Option<i64> {
780        match self {
781            BlockData::Block { block_time, .. } => *block_time,
782            BlockData::PossibleLeaderSkipped { .. } => None,
783        }
784    }
785}
786
787type HandlerResult = Result<(), SharedError>;
788type HandlerFuture = BoxFuture<'static, HandlerResult>;
789
790/// Asynchronous callback invoked for each firehose event of type `Data`.
791pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
792
793impl<Data, F> Handler<Data> for F where
794    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
795{
796}
797
798/// Function pointer alias for [`Handler`] callbacks.
799pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
800/// Convenience alias for block handlers accepted by [`firehose`].
801pub type OnBlockFn = HandlerFn<BlockData>;
802/// Convenience alias for transaction handlers accepted by [`firehose`].
803pub type OnTxFn = HandlerFn<TransactionData>;
804/// Convenience alias for entry handlers accepted by [`firehose`].
805pub type OnEntryFn = HandlerFn<EntryData>;
806/// Convenience alias for reward handlers accepted by [`firehose`].
807pub type OnRewardFn = HandlerFn<RewardsData>;
808/// Type alias for [`StatsTracking`] using simple function pointers.
809pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
810/// Convenience alias for firehose error handlers.
811pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
812/// Convenience alias for stats tracking handlers accepted by [`firehose`].
813pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
814
815/// Metadata describing a firehose worker failure.
816#[derive(Clone, Debug)]
817pub struct FirehoseErrorContext {
818    /// Thread index that encountered the error.
819    pub thread_id: usize,
820    /// Slot the worker was processing when the error surfaced.
821    pub slot: u64,
822    /// Epoch derived from the failing slot.
823    pub epoch: u64,
824    /// Stringified error payload for display/logging.
825    pub error_message: String,
826}
827
828/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
829///
830/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
831/// runner restarts from the last processed slot to maintain coverage.
832#[inline]
833#[allow(clippy::too_many_arguments)]
834pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
835    threads: u64,
836    slot_range: Range<u64>,
837    on_block: Option<OnBlock>,
838    on_tx: Option<OnTransaction>,
839    on_entry: Option<OnEntry>,
840    on_rewards: Option<OnRewards>,
841    on_error: Option<OnError>,
842    stats_tracking: Option<StatsTracking<OnStats>>,
843    shutdown_signal: Option<broadcast::Receiver<()>>,
844) -> Result<(), (FirehoseError, u64)>
845where
846    OnBlock: Handler<BlockData>,
847    OnTransaction: Handler<TransactionData>,
848    OnEntry: Handler<EntryData>,
849    OnRewards: Handler<RewardsData>,
850    OnStats: Handler<Stats>,
851    OnError: Handler<FirehoseErrorContext>,
852{
853    if threads == 0 {
854        return Err((
855            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
856            slot_range.start,
857        ));
858    }
859    let client = crate::network::create_http_client();
860    log::info!(target: LOG_MODULE, "starting firehose...");
861    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
862
863    let slot_range = Arc::new(slot_range);
864
865    // divide slot_range into n subranges
866    let subranges = generate_subranges(&slot_range, threads);
867    if threads > 1 {
868        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
869    }
870
871    let firehose_start = std::time::Instant::now();
872    let shutdown_flag = Arc::new(AtomicBool::new(false));
873    if let Some(ref rx) = shutdown_signal {
874        let mut rx = rx.resubscribe();
875        let flag = shutdown_flag.clone();
876        tokio::spawn(async move {
877            if rx.recv().await.is_ok() {
878                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
879                flag.store(true, Ordering::SeqCst);
880            }
881        });
882    }
883    let mut handles = Vec::new();
884    // Shared per-thread error counters
885    let error_counts: Arc<Vec<AtomicU32>> =
886        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
887
888    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
889    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
890    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
891    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
892    let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
893
894    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
895        let error_counts = error_counts.clone();
896        let client = client.clone();
897        let on_block = on_block.clone();
898        let on_tx = on_tx.clone();
899        let on_entry = on_entry.clone();
900        let on_reward = on_rewards.clone();
901        let on_error = on_error.clone();
902        let overall_slots_processed = overall_slots_processed.clone();
903        let overall_blocks_processed = overall_blocks_processed.clone();
904        let overall_transactions_processed = overall_transactions_processed.clone();
905        let overall_entries_processed = overall_entries_processed.clone();
906        let stats_tracking = stats_tracking.clone();
907        let transactions_since_stats = Arc::new(AtomicU64::new(0));
908        let blocks_since_stats = Arc::new(AtomicU64::new(0));
909        let slots_since_stats = Arc::new(AtomicU64::new(0));
910        let last_pulse = Arc::new(AtomicU64::new(0));
911        let transactions_since_stats_cloned = transactions_since_stats.clone();
912        let blocks_since_stats_cloned = blocks_since_stats.clone();
913        let slots_since_stats_cloned = slots_since_stats.clone();
914        let last_pulse_cloned = last_pulse.clone();
915        let shutdown_flag = shutdown_flag.clone();
916        let pending_skipped_slots = pending_skipped_slots.clone();
917        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
918
919        let handle = tokio::spawn(async move {
920            let transactions_since_stats = transactions_since_stats_cloned;
921            let blocks_since_stats = blocks_since_stats_cloned;
922            let slots_since_stats = slots_since_stats_cloned;
923            let last_pulse = last_pulse_cloned;
924            let mut shutdown_rx = thread_shutdown_rx;
925            let start_time = firehose_start;
926            last_pulse.store(
927                firehose_start.elapsed().as_nanos() as u64,
928                Ordering::Relaxed,
929            );
930            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
931            let mut skip_until_index = None;
932            let last_emitted_slot = slot_range.start.saturating_sub(1);
933            let block_enabled = on_block.is_some();
934            let tx_enabled = on_tx.is_some();
935            let entry_enabled = on_entry.is_some();
936            let reward_enabled = on_reward.is_some();
937            let tracking_enabled = stats_tracking.is_some();
938            if block_enabled {
939                pending_skipped_slots.entry(thread_index).or_default();
940            }
941            let mut last_counted_slot = slot_range.start.saturating_sub(1);
942            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
943            let mut thread_stats = if tracking_enabled {
944                Some(ThreadStats {
945                    thread_id: thread_index,
946                    start_time,
947                    finish_time: None,
948                    slot_range: slot_range.clone(),
949                    initial_slot_range: slot_range.clone(),
950                    current_slot: slot_range.start,
951                    slots_processed: 0,
952                    blocks_processed: 0,
953                    leader_skipped_slots: 0,
954                    transactions_processed: 0,
955                    entries_processed: 0,
956                    rewards_processed: 0,
957                })
958            } else {
959                None
960            };
961
962            // let mut triggered = false;
963            while let Err((err, slot)) = async {
964                let mut last_emitted_slot = last_emitted_slot_global;
965                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
966                    log::info!(
967                        target: &log_target,
968                        "shutdown requested; terminating firehose thread {}",
969                        thread_index
970                    );
971                    return Ok(());
972                }
973                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
974                log::info!(
975                    target: &log_target,
976                    "slot range: {} (epoch {}) ... {} (epoch {})",
977                    slot_range.start,
978                    slot_to_epoch(slot_range.start),
979                    slot_range.end,
980                    slot_to_epoch(slot_range.end)
981                );
982
983                log::info!(target: &log_target, "🚒 starting firehose...");
984
985                // for each epoch
986                let mut current_slot: Option<u64> = None;
987                for epoch_num in epoch_range.clone() {
988                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
989                        log::info!(
990                            target: &log_target,
991                            "shutdown requested; terminating firehose thread {}",
992                            thread_index
993                        );
994                        return Ok(());
995                    }
996                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
997                    let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
998                        Ok(stream) => stream,
999                        Err(_) => {
1000                            return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1001                        }
1002                    };
1003                    let mut reader = NodeReader::new(stream);
1004
1005                    let header_fut = reader.read_raw_header();
1006                    let header = match timeout(OP_TIMEOUT, header_fut).await {
1007                        Ok(res) => res
1008                            .map_err(FirehoseError::ReadHeader)
1009                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1010                        Err(_) => {
1011                            return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1012                        }
1013                    };
1014                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1015
1016                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1017                    let local_start = std::cmp::max(slot_range.start, epoch_start);
1018                    let local_end_inclusive =
1019                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1020                    if local_start > local_end_inclusive {
1021                        log::debug!(
1022                            target: &log_target,
1023                            "epoch {} has no overlap with thread range ({}..{}), skipping",
1024                            epoch_num,
1025                            slot_range.start,
1026                            slot_range.end
1027                        );
1028                        continue;
1029                    }
1030
1031                    let mut previous_blockhash = Hash::default();
1032                    let mut latest_entry_blockhash = Hash::default();
1033                    // Reset counters to align to the local epoch slice; prevents boundary slots
1034                    // from being treated as already-counted after a restart.
1035                    last_counted_slot = local_start.saturating_sub(1);
1036                    current_slot = None;
1037                    if tracking_enabled
1038                        && let Some(ref mut stats) = thread_stats {
1039                            stats.current_slot = local_start;
1040                            stats.slot_range.start = local_start;
1041                        }
1042
1043                    if local_start > epoch_start {
1044                        // Seek to the nearest previous indexed slot so the stream includes all
1045                        // nodes (transactions, entries, rewards) that precede `local_start`.
1046                        let seek_slot = match timeout(
1047                            OP_TIMEOUT,
1048                            find_previous_indexed_slot(local_start, epoch_start, &log_target),
1049                        )
1050                        .await
1051                        {
1052                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1053                            Err(_) => {
1054                                return Err((
1055                                    FirehoseError::OperationTimeout(
1056                                        "seek_to_previous_indexed_slot",
1057                                    ),
1058                                    current_slot.unwrap_or(slot_range.start),
1059                                ));
1060                            }
1061                        };
1062                        if let Some(seek_slot) = seek_slot {
1063                            let seek_fut = reader.seek_to_slot(seek_slot);
1064                            match timeout(OP_TIMEOUT, seek_fut).await {
1065                                Ok(res) => {
1066                                    res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1067                                }
1068                                Err(_) => {
1069                                    return Err((
1070                                        FirehoseError::OperationTimeout("seek_to_slot"),
1071                                        current_slot.unwrap_or(slot_range.start),
1072                                    ));
1073                                }
1074                            }
1075                        }
1076                    }
1077
1078                    // for each item in each block
1079                    let mut item_index = 0;
1080                    let mut displayed_skip_message = false;
1081                    loop {
1082                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1083                            log::info!(
1084                                target: &log_target,
1085                                "shutdown requested; terminating firehose thread {}",
1086                                thread_index
1087                            );
1088                            return Ok(());
1089                        }
1090                        let read_fut = reader.read_until_block();
1091                        let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1092                            Ok(result) => result
1093                                .map_err(FirehoseError::ReadUntilBlockError)
1094                                .map_err(|e| {
1095                                    (
1096                                        e,
1097                                        current_slot
1098                                            .map(|slot| slot.saturating_add(1))
1099                                            .unwrap_or(slot_range.start),
1100                                    )
1101                                })?,
1102                            Err(_) => {
1103                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1104                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1105                            }
1106                        };
1107                        if nodes.is_empty() {
1108                            log::info!(
1109                                target: &log_target,
1110                                "reached end of epoch {}",
1111                                epoch_num
1112                            );
1113                            break;
1114                        }
1115                        if let Some(last_node) = nodes.0.last()
1116                            && !last_node.get_node().is_block()
1117                        {
1118                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1119                            break;
1120                        }
1121                        let block = nodes
1122                            .get_block()
1123                            .map_err(FirehoseError::GetBlockError)
1124                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1125                        log::debug!(
1126                            target: &log_target,
1127                            "read {} items from epoch {}, now at slot {}",
1128                            item_index,
1129                            epoch_num,
1130                            block.slot
1131                        );
1132                        let slot = block.slot;
1133                        if slot > local_end_inclusive {
1134                            log::debug!(
1135                                target: &log_target,
1136                                "reached end of local slice at slot {} (epoch {}), stopping",
1137                                slot,
1138                                epoch_num
1139                            );
1140                            break;
1141                        }
1142                        if slot >= slot_range.end {
1143                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1144                            // Return early to terminate the firehose thread cleanly. We use >=
1145                            // because slot_range is half-open [start, end), so any slot equal
1146                            // to end is out-of-range and must not be processed. Do not emit
1147                            // synthetic skipped slots here; another thread may own the boundary.
1148                            if block_enabled {
1149                                pending_skipped_slots.remove(&thread_index);
1150                            }
1151                            return Ok(());
1152                        }
1153                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1154                        if slot < slot_range.start {
1155                            if slot.saturating_add(1) == slot_range.start {
1156                                log::debug!(
1157                                    target: &log_target,
1158                                    "priming reader with preceding slot {}, skipping",
1159                                    slot
1160                                );
1161                            } else {
1162                                log::warn!(
1163                                    target: &log_target,
1164                                    "encountered slot {} before start of range {}, skipping",
1165                                    slot,
1166                                    slot_range.start
1167                                );
1168                            }
1169                            continue;
1170                        }
1171                        current_slot = Some(slot);
1172                        let mut entry_index: usize = 0;
1173                        let mut this_block_executed_transaction_count: u64 = 0;
1174                        let mut this_block_entry_count: u64 = 0;
1175                        let mut this_block_rewards = DecodedRewards::empty();
1176
1177                        for node_with_cid in &nodes.0 {
1178                            item_index += 1;
1179                            if let Some(skip) = skip_until_index {
1180                                if item_index < skip {
1181                                    if !displayed_skip_message {
1182                                        log::info!(
1183                                            target: &log_target,
1184                                            "skipping until index {} (at {})",
1185                                            skip,
1186                                            item_index
1187                                        );
1188                                        displayed_skip_message = true;
1189                                    }
1190                                    continue;
1191                                } else {
1192                                    log::info!(
1193                                        target: &log_target,
1194                                        "reached target index {}, resuming...",
1195                                        skip
1196                                    );
1197                                    skip_until_index = None;
1198                                }
1199                            }
1200                            let node = node_with_cid.get_node();
1201
1202                            if let Some(ref mut stats) = thread_stats {
1203                                stats.current_slot = slot;
1204                            }
1205
1206                            let error_slot = current_slot.unwrap_or(slot_range.start);
1207
1208                            use crate::node::Node::*;
1209                            match node {
1210                                Transaction(tx) => {
1211                                    if tx_enabled
1212                                        && let Some(on_tx_cb) = on_tx.as_ref()
1213                                    {
1214                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1215                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1216                                            (
1217                                                FirehoseError::NodeDecodingError(item_index, err),
1218                                                error_slot,
1219                                            )
1220                                        })?;
1221                                        let reassembled_metadata = nodes
1222                                            .reassemble_dataframes(tx.metadata.clone())
1223                                            .map_err(|err| {
1224                                                (
1225                                                    FirehoseError::NodeDecodingError(item_index, err),
1226                                                    error_slot,
1227                                                )
1228                                            })?;
1229
1230                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1231                                            block.slot,
1232                                            reassembled_metadata,
1233                                        )
1234                                        .map_err(|err| {
1235                                            (
1236                                                FirehoseError::NodeDecodingError(item_index, err),
1237                                                error_slot,
1238                                            )
1239                                        })?;
1240
1241                                        let message_hash = {
1242                                            #[cfg(feature = "verify-transaction-signatures")]
1243                                            {
1244                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1245                                                    (
1246                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1247                                                        error_slot,
1248                                                    )
1249                                                })?
1250                                            }
1251                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1252                                            {
1253                                                versioned_tx.message.hash()
1254                                            }
1255                                        };
1256                                        let signature = versioned_tx
1257                                            .signatures
1258                                            .first()
1259                                            .ok_or_else(|| {
1260                                                Box::new(std::io::Error::new(
1261                                                    std::io::ErrorKind::InvalidData,
1262                                                    "transaction missing signature",
1263                                                )) as SharedError
1264                                            })
1265                                            .map_err(|err| {
1266                                                (
1267                                                    FirehoseError::NodeDecodingError(
1268                                                        item_index,
1269                                                        err,
1270                                                    ),
1271                                                    error_slot,
1272                                                )
1273                                            })?;
1274                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1275
1276                                        on_tx_cb(
1277                                            thread_index,
1278                                            TransactionData {
1279                                                slot: block.slot,
1280                                                transaction_slot_index: tx.index.unwrap() as usize,
1281                                                signature: *signature,
1282                                                message_hash,
1283                                                is_vote,
1284                                                transaction_status_meta: as_native_metadata.clone(),
1285                                                transaction: versioned_tx.clone(),
1286                                            },
1287                                        )
1288                                        .await
1289                                        .map_err(|e| {
1290                                            (
1291                                                FirehoseError::TransactionHandlerError(e),
1292                                                error_slot,
1293                                            )
1294                                        })?;
1295                                    }
1296                                    fetch_add_if(
1297                                        tracking_enabled,
1298                                        &overall_transactions_processed,
1299                                        1,
1300                                    );
1301                                    if let Some(ref mut stats) = thread_stats {
1302                                        stats.transactions_processed += 1;
1303                                    }
1304                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1305                                }
1306                                Entry(entry) => {
1307                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1308                                    let entry_transaction_count = entry.transactions.len();
1309                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1310                                    let starting_transaction_index_u64 =
1311                                        this_block_executed_transaction_count;
1312                                    latest_entry_blockhash = entry_hash;
1313                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1314                                    this_block_entry_count += 1;
1315
1316                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1317                                        let starting_transaction_index = usize::try_from(
1318                                            starting_transaction_index_u64,
1319                                        )
1320                                        .map_err(|err| {
1321                                            (
1322                                                FirehoseError::EntryHandlerError(Box::new(err)),
1323                                                error_slot,
1324                                            )
1325                                        })?;
1326                                        let transaction_indexes_end =
1327                                            starting_transaction_index + entry_transaction_count;
1328                                        on_entry_cb(
1329                                            thread_index,
1330                                            EntryData {
1331                                                slot: block.slot,
1332                                                entry_index,
1333                                                transaction_indexes: starting_transaction_index
1334                                                    ..transaction_indexes_end,
1335                                                num_hashes: entry.num_hashes,
1336                                                hash: entry_hash,
1337                                            },
1338                                        )
1339                                        .await
1340                                        .map_err(|e| {
1341                                            (
1342                                                FirehoseError::EntryHandlerError(e),
1343                                                error_slot,
1344                                            )
1345                                        })?;
1346                                    }
1347                                    entry_index += 1;
1348                                    fetch_add_if(
1349                                        tracking_enabled,
1350                                        &overall_entries_processed,
1351                                        1,
1352                                    );
1353                                    if let Some(ref mut stats) = thread_stats {
1354                                        stats.entries_processed += 1;
1355                                    }
1356                                }
1357                                Block(block) => {
1358                                    let prev_last_counted_slot = last_counted_slot;
1359                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1360                                        (
1361                                            stats.slots_processed,
1362                                            stats.blocks_processed,
1363                                            stats.leader_skipped_slots,
1364                                            stats.current_slot,
1365                                        )
1366                                    });
1367
1368                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1369                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1370                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1371
1372                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1373                                    for skipped_slot in skip_start..slot {
1374                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1375                                            break;
1376                                        }
1377                                        log::debug!(
1378                                            target: &log_target,
1379                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1380                                            skipped_slot,
1381                                            prev_last_counted_slot,
1382                                            slot,
1383                                        );
1384                                        if block_enabled {
1385                                            pending_skipped_slots
1386                                                .entry(thread_index)
1387                                                .or_default()
1388                                                .insert(skipped_slot);
1389                                        }
1390                                        if block_enabled
1391                                            && let Some(on_block_cb) = on_block.as_ref()
1392                                            && skipped_slot > last_emitted_slot {
1393                                                last_emitted_slot = skipped_slot;
1394                                                on_block_cb(
1395                                                    thread_index,
1396                                                    BlockData::PossibleLeaderSkipped {
1397                                                        slot: skipped_slot,
1398                                                    },
1399                                                )
1400                                                .await
1401                                                .map_err(|e| {
1402                                                    (
1403                                                        FirehoseError::BlockHandlerError(e),
1404                                                        error_slot,
1405                                                    )
1406                                                })?;
1407                                            }
1408                                        if tracking_enabled {
1409                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1410                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1411                                            if let Some(ref mut stats) = thread_stats {
1412                                                stats.leader_skipped_slots += 1;
1413                                                stats.slots_processed += 1;
1414                                                stats.current_slot = skipped_slot;
1415                                            }
1416                                        }
1417                                        last_counted_slot = skipped_slot;
1418                                    }
1419
1420                                    let cleared_pending_skip = if block_enabled {
1421                                        clear_pending_skip(
1422                                            &pending_skipped_slots,
1423                                            thread_index,
1424                                            slot,
1425                                        )
1426                                    } else {
1427                                        false
1428                                    };
1429
1430                                    if slot <= last_counted_slot && !cleared_pending_skip {
1431                                        log::debug!(
1432                                            target: &log_target,
1433                                            "duplicate block {}, already counted (last_counted={})",
1434                                            slot,
1435                                            last_counted_slot,
1436                                        );
1437                                        this_block_rewards = DecodedRewards::empty();
1438                                        continue;
1439                                    }
1440
1441                                    if block_enabled {
1442                                        if let Some(on_block_cb) = on_block.as_ref() {
1443                                            let DecodedRewards {
1444                                                keyed_rewards,
1445                                                num_partitions,
1446                                            } = std::mem::take(&mut this_block_rewards);
1447                                            if slot > last_emitted_slot {
1448                                                last_emitted_slot = slot;
1449                                                on_block_cb(
1450                                                    thread_index,
1451                                                    BlockData::Block {
1452                                                        parent_slot: block.meta.parent_slot,
1453                                                        parent_blockhash: previous_blockhash,
1454                                                        slot: block.slot,
1455                                                        blockhash: latest_entry_blockhash,
1456                                                        rewards: KeyedRewardsAndNumPartitions {
1457                                                            keyed_rewards,
1458                                                            num_partitions,
1459                                                        },
1460                                                        block_time: Some(block.meta.blocktime as i64),
1461                                                        block_height: block.meta.block_height,
1462                                                        executed_transaction_count:
1463                                                            this_block_executed_transaction_count,
1464                                                        entry_count: this_block_entry_count,
1465                                                    },
1466                                                )
1467                                                .await
1468                                                .map_err(|e| {
1469                                                    (
1470                                                        FirehoseError::BlockHandlerError(e),
1471                                                        error_slot,
1472                                                    )
1473                                                })?;
1474                                            }
1475                                        }
1476                                    } else {
1477                                        this_block_rewards = DecodedRewards::empty();
1478                                    }
1479                                    previous_blockhash = latest_entry_blockhash;
1480
1481                                    if tracking_enabled {
1482                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1483                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1484                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1485                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1486                                        if let Some(ref mut stats) = thread_stats {
1487                                            stats.blocks_processed += 1;
1488                                            stats.slots_processed += 1;
1489                                            stats.current_slot = slot;
1490                                        }
1491
1492                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1493                                            (&stats_tracking, thread_stats.as_mut())
1494                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1495                                                && let Err(err) = maybe_emit_stats(
1496                                                    stats_tracking.as_ref(),
1497                                                    thread_index,
1498                                                    thread_stats_ref,
1499                                                    &overall_slots_processed,
1500                                                    &overall_blocks_processed,
1501                                                    &overall_transactions_processed,
1502                                                    &overall_entries_processed,
1503                                                &transactions_since_stats,
1504                                                &blocks_since_stats,
1505                                                &slots_since_stats,
1506                                                &last_pulse,
1507                                                start_time,
1508                                            )
1509                                            .await
1510                                            {
1511                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1512                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1513                                                    overall_blocks_processed
1514                                                        .fetch_sub(1, Ordering::Relaxed);
1515                                                    overall_slots_processed
1516                                                        .fetch_sub(1, Ordering::Relaxed);
1517                                                    if let Some((
1518                                                        prev_slots_processed,
1519                                                        prev_blocks_processed,
1520                                                        prev_leader_skipped,
1521                                                        prev_current_slot,
1522                                                    )) = thread_stats_snapshot
1523                                                    {
1524                                                        thread_stats_ref.slots_processed =
1525                                                            prev_slots_processed;
1526                                                        thread_stats_ref.blocks_processed =
1527                                                            prev_blocks_processed;
1528                                                        thread_stats_ref.leader_skipped_slots =
1529                                                            prev_leader_skipped;
1530                                                        thread_stats_ref.current_slot =
1531                                                            prev_current_slot;
1532                                                    }
1533                                                    last_counted_slot = prev_last_counted_slot;
1534                                                    return Err(err);
1535                                                }
1536                                    }
1537
1538                                    if slot > last_counted_slot {
1539                                        last_counted_slot = slot;
1540                                    }
1541                                }
1542                                Subset(_subset) => (),
1543                                Epoch(_epoch) => (),
1544                                Rewards(rewards) => {
1545                                    if reward_enabled || block_enabled {
1546                                        let reassembled = nodes
1547                                            .reassemble_dataframes(rewards.data.clone())
1548                                            .map_err(|err| {
1549                                                (
1550                                                    FirehoseError::NodeDecodingError(item_index, err),
1551                                                    current_slot.unwrap_or(slot_range.start),
1552                                                )
1553                                            })?;
1554                                        if reassembled.is_empty() {
1555                                            this_block_rewards = DecodedRewards::empty();
1556                                            if reward_enabled
1557                                                && let Some(on_reward_cb) = on_reward.as_ref()
1558                                            {
1559                                                on_reward_cb(
1560                                                    thread_index,
1561                                                    RewardsData {
1562                                                        slot: block.slot,
1563                                                        rewards: Vec::new(),
1564                                                    },
1565                                                )
1566                                                .await
1567                                                .map_err(|e| {
1568                                                    (
1569                                                        FirehoseError::RewardHandlerError(e),
1570                                                        error_slot,
1571                                                    )
1572                                                })?;
1573                                            }
1574                                            continue;
1575                                        }
1576
1577                                        let decoded_rewards =
1578                                            decode_rewards_from_frame(block.slot, reassembled)
1579                                                .map_err(|err| {
1580                                                    (
1581                                                        FirehoseError::NodeDecodingError(
1582                                                            item_index,
1583                                                            err,
1584                                                        ),
1585                                                        error_slot,
1586                                                    )
1587                                                })?;
1588                                        if reward_enabled
1589                                            && let Some(on_reward_cb) = on_reward.as_ref()
1590                                        {
1591                                            on_reward_cb(
1592                                                thread_index,
1593                                                RewardsData {
1594                                                    slot: block.slot,
1595                                                    rewards: decoded_rewards.keyed_rewards.clone(),
1596                                                },
1597                                            )
1598                                            .await
1599                                            .map_err(|e| {
1600                                                (
1601                                                    FirehoseError::RewardHandlerError(e),
1602                                                    error_slot,
1603                                                )
1604                                            })?;
1605                                        }
1606                                        this_block_rewards = decoded_rewards;
1607                                        if let Some(ref mut stats) = thread_stats {
1608                                            stats.rewards_processed +=
1609                                                this_block_rewards.keyed_rewards.len() as u64;
1610                                        }
1611                                    }
1612                                }
1613                                DataFrame(_data_frame) => (),
1614                            }
1615                        }
1616                        if block.slot == slot_range.end - 1 {
1617                            let finish_time = std::time::Instant::now();
1618                            let elapsed = finish_time.duration_since(start_time);
1619                            log::info!(target: &log_target, "processed slot {}", block.slot);
1620                            let elapsed_pretty = human_readable_duration(elapsed);
1621                            log::info!(
1622                                target: &log_target,
1623                                "processed {} slots across {} epochs in {}.",
1624                                slot_range.end - slot_range.start,
1625                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1626                                elapsed_pretty
1627                            );
1628                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1629                            // On completion, report threads with non-zero error counts for
1630                            // visibility.
1631                            let summary: String = error_counts
1632                                .iter()
1633                                .enumerate()
1634                                .filter_map(|(i, c)| {
1635                                    let v = c.load(Ordering::Relaxed);
1636                                    if v > 0 {
1637                                        Some(format!("{:03}({})", i, v))
1638                                    } else {
1639                                        None
1640                                    }
1641                                })
1642                                .collect::<Vec<_>>()
1643                                .join(", ");
1644                            if !summary.is_empty() {
1645                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1646                            }
1647                            return Ok(());
1648                        }
1649                    }
1650                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1651                        && last_counted_slot < expected_last_slot
1652                    {
1653                        // Do not synthesize skipped slots during final flush; another thread may
1654                        // cover the remaining range (especially across epoch boundaries).
1655                    }
1656                    if let Some(ref mut stats) = thread_stats {
1657                        stats.finish_time = Some(std::time::Instant::now());
1658                        maybe_emit_stats(
1659                            stats_tracking.as_ref(),
1660                            thread_index,
1661                            stats,
1662                            &overall_slots_processed,
1663                            &overall_blocks_processed,
1664                            &overall_transactions_processed,
1665                            &overall_entries_processed,
1666                            &transactions_since_stats,
1667                            &blocks_since_stats,
1668                            &slots_since_stats,
1669                            &last_pulse,
1670                            start_time,
1671                        )
1672                        .await?;
1673                    }
1674                    if block_enabled {
1675                        pending_skipped_slots.remove(&thread_index);
1676                    }
1677                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1678                    }
1679                    Ok(())
1680            }
1681            .await
1682            {
1683                if is_shutdown_error(&err) {
1684                    log::info!(
1685                        target: &log_target,
1686                        "shutdown requested; terminating firehose thread {}",
1687                        thread_index
1688                    );
1689                    break;
1690                }
1691                let epoch = slot_to_epoch(slot);
1692                let item_index = match &err {
1693                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1694                    _ => 0,
1695                };
1696                let error_message = err.to_string();
1697                log::error!(
1698                    target: &log_target,
1699                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1700                    slot,
1701                    epoch
1702                );
1703                log::error!(target: &log_target, "{}", error_message);
1704                if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1705                    || error_message.contains("Unknown CID version")
1706                {
1707                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index
1708                    // (or a bad seek offset that landed mid-stream).
1709                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1710                }
1711                if let Some(on_error_cb) = on_error.clone() {
1712                    let context = FirehoseErrorContext {
1713                        thread_id: thread_index,
1714                        slot,
1715                        epoch,
1716                        error_message: error_message.clone(),
1717                    };
1718                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1719                        log::error!(
1720                            target: &log_target,
1721                            "on_error handler failed: {}",
1722                            handler_err
1723                        );
1724                    }
1725                }
1726                // Increment this thread's error counter
1727                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1728                log::warn!(
1729                    target: &log_target,
1730                    "restarting from slot {} at index {}",
1731                    slot,
1732                    item_index,
1733                );
1734                // Update slot range to resume from the failed slot, not the original start.
1735                // Reset local tracking so we don't treat the resumed slot range as already counted.
1736                // If we've already counted this slot, resume from the next one to avoid duplicates.
1737                if slot <= last_counted_slot {
1738                    slot_range.start = last_counted_slot.saturating_add(1);
1739                } else {
1740                    slot_range.start = slot;
1741                }
1742                // Reset pulse timer to exclude downtime from next rate calc.
1743                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1744                if tracking_enabled
1745                    && let Some(ref mut stats_ref) = thread_stats {
1746                        stats_ref.slot_range.start = slot_range.start;
1747                        stats_ref.slot_range.end = slot_range.end;
1748                        // initial_slot_range remains unchanged for progress reporting.
1749                    }
1750                if block_enabled {
1751                    pending_skipped_slots.remove(&thread_index);
1752                }
1753                // `skip_until_index` is unsafe across retries because `item_index`
1754                // is reset to 0 each epoch restart. Keeping it can skip large portions
1755                // of the stream and silently drop slots.
1756                skip_until_index = None;
1757                last_emitted_slot_global = last_emitted_slot;
1758            }
1759        });
1760        handles.push(handle);
1761    }
1762
1763    // Wait for all threads to complete
1764    for handle in handles {
1765        handle.await.unwrap();
1766    }
1767    if stats_tracking.is_some() {
1768        let elapsed = firehose_start.elapsed();
1769        let elapsed_secs = elapsed.as_secs_f64();
1770        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1771        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1772        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1773        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1774        let total_errors: u64 = error_counts
1775            .iter()
1776            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1777            .sum();
1778        let overall_tps = if elapsed_secs > 0.0 {
1779            total_transactions as f64 / elapsed_secs
1780        } else {
1781            0.0
1782        };
1783        log::info!(
1784            target: LOG_MODULE,
1785            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1786            elapsed_secs,
1787            total_slots,
1788            total_blocks,
1789            total_leader_skipped,
1790            total_transactions,
1791            overall_tps,
1792            total_errors
1793        );
1794    }
1795    if shutdown_flag.load(Ordering::SeqCst) {
1796        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1797    } else {
1798        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1799    }
1800    Ok(())
1801}
1802
1803#[allow(clippy::result_large_err)]
1804/// Builds a Geyser-backed firehose and returns a slot notification stream.
1805///
1806/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1807/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1808/// half-open interval `[start, end)`, and the thread will restart from the last processed
1809/// slot on recoverable errors to maintain coverage.
1810pub fn firehose_geyser(
1811    rt: Arc<tokio::runtime::Runtime>,
1812    slot_range: Range<u64>,
1813    geyser_config_files: Option<&[PathBuf]>,
1814    index_base_url: &Url,
1815    client: &Client,
1816    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1817    threads: u64,
1818) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1819    if threads == 0 {
1820        return Err((
1821            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1822            slot_range.start,
1823        ));
1824    }
1825    log::info!(target: LOG_MODULE, "starting firehose...");
1826    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1827    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1828    let mut entry_notifier_maybe = None;
1829    let mut block_meta_notifier_maybe = None;
1830    let mut transaction_notifier_maybe = None;
1831    if let Some(geyser_config_files) = geyser_config_files {
1832        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1833
1834        let service =
1835            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1836                confirmed_bank_receiver.clone(),
1837                true,
1838                geyser_config_files,
1839            )
1840            .map_err(|e| (e.into(), slot_range.start))?;
1841
1842        transaction_notifier_maybe = Some(
1843            service
1844                .get_transaction_notifier()
1845                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1846                .map_err(|e| (e, slot_range.start))?,
1847        );
1848
1849        entry_notifier_maybe = service.get_entry_notifier();
1850        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1851
1852        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1853    }
1854
1855    if entry_notifier_maybe.is_some() {
1856        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1857    } else {
1858        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1859    }
1860    log::info!(target: LOG_MODULE, "running on_load...");
1861    rt.spawn(on_load);
1862
1863    let slot_range = Arc::new(slot_range);
1864    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1865    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1866    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1867    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1868
1869    // divide slot_range into n subranges
1870    let subranges = generate_subranges(&slot_range, threads);
1871    if threads > 1 {
1872        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1873    }
1874
1875    let mut handles = Vec::new();
1876    // Shared per-thread error counters
1877    let error_counts: Arc<Vec<AtomicU32>> =
1878        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1879
1880    for (i, slot_range) in subranges.into_iter().enumerate() {
1881        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1882        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1883        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1884        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1885        let client = client.clone();
1886        let error_counts = error_counts.clone();
1887
1888        let rt_clone = rt.clone();
1889
1890        let handle = std::thread::spawn(move || {
1891            rt_clone.block_on(async {
1892                firehose_geyser_thread(
1893                    slot_range,
1894                    transaction_notifier_maybe,
1895                    entry_notifier_maybe,
1896                    block_meta_notifier_maybe,
1897                    confirmed_bank_sender,
1898                    &client,
1899                    if threads > 1 { Some(i) } else { None },
1900                    error_counts,
1901                )
1902                .await
1903                .unwrap();
1904            });
1905        });
1906        handles.push(handle);
1907    }
1908
1909    // Wait for all threads to complete
1910    for handle in handles {
1911        handle.join().unwrap();
1912    }
1913    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1914    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1915        block_meta_notifier.notify_block_metadata(
1916            u64::MAX,
1917            "unload",
1918            u64::MAX,
1919            "unload",
1920            &KeyedRewardsAndNumPartitions {
1921                keyed_rewards: vec![],
1922                num_partitions: None,
1923            },
1924            None,
1925            None,
1926            0,
1927            0,
1928        );
1929    }
1930    Ok(confirmed_bank_receiver)
1931}
1932
1933#[allow(clippy::too_many_arguments)]
1934#[allow(clippy::result_large_err)]
1935async fn firehose_geyser_thread(
1936    mut slot_range: Range<u64>,
1937    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1938    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1939    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1940    confirmed_bank_sender: Sender<SlotNotification>,
1941    client: &Client,
1942    thread_index: Option<usize>,
1943    error_counts: Arc<Vec<AtomicU32>>,
1944) -> Result<(), (FirehoseError, u64)> {
1945    let start_time = std::time::Instant::now();
1946    let log_target = if let Some(thread_index) = thread_index {
1947        format!("{}::T{:03}", LOG_MODULE, thread_index)
1948    } else {
1949        LOG_MODULE.to_string()
1950    };
1951    let initial_slot_range = slot_range.clone();
1952    let mut skip_until_index = None;
1953    let mut last_counted_slot = slot_range.start.saturating_sub(1);
1954    // let mut triggered = false;
1955    while let Err((err, slot)) = async {
1956            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1957            log::info!(
1958                target: &log_target,
1959                "slot range: {} (epoch {}) ... {} (epoch {})",
1960                slot_range.start,
1961                slot_to_epoch(slot_range.start),
1962                slot_range.end,
1963                slot_to_epoch(slot_range.end)
1964            );
1965
1966            log::info!(target: &log_target, "🚒 starting firehose...");
1967
1968            // for each epoch
1969            let mut current_slot: Option<u64> = None;
1970            for epoch_num in epoch_range.clone() {
1971                log::info!(target: &log_target, "entering epoch {}", epoch_num);
1972                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1973                    Ok(stream) => stream,
1974                    Err(_) => {
1975                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1976                    }
1977                };
1978                let mut reader = NodeReader::new(stream);
1979
1980                let header_fut = reader.read_raw_header();
1981                let header = match timeout(OP_TIMEOUT, header_fut).await {
1982                    Ok(res) => res
1983                        .map_err(FirehoseError::ReadHeader)
1984                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1985                    Err(_) => {
1986                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1987                    }
1988                };
1989                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1990
1991                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1992                let local_start = std::cmp::max(slot_range.start, epoch_start);
1993                let local_end_inclusive =
1994                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1995                if local_start > local_end_inclusive {
1996                    log::debug!(
1997                        target: &log_target,
1998                        "epoch {} has no overlap with thread range ({}..{}), skipping",
1999                        epoch_num,
2000                        slot_range.start,
2001                        slot_range.end
2002                    );
2003                    continue;
2004                }
2005
2006                let mut todo_previous_blockhash = Hash::default();
2007                let mut todo_latest_entry_blockhash = Hash::default();
2008                // Reset counters to align to the local epoch slice; prevents boundary slots
2009                // from being treated as already-counted after a restart.
2010                last_counted_slot = local_start.saturating_sub(1);
2011                current_slot = None;
2012
2013                if local_start > epoch_start {
2014                    // Seek to the nearest previous indexed slot so the reader captures the full
2015                    // node set (transactions, entries, rewards) for the target block.
2016                    let seek_slot = match timeout(
2017                        OP_TIMEOUT,
2018                        find_previous_indexed_slot(local_start, epoch_start, &log_target),
2019                    )
2020                    .await
2021                    {
2022                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2023                        Err(_) => {
2024                            return Err((
2025                                FirehoseError::OperationTimeout(
2026                                    "seek_to_previous_indexed_slot",
2027                                ),
2028                                current_slot.unwrap_or(slot_range.start),
2029                            ));
2030                        }
2031                    };
2032                    if let Some(seek_slot) = seek_slot {
2033                        let seek_fut = reader.seek_to_slot(seek_slot);
2034                        match timeout(OP_TIMEOUT, seek_fut).await {
2035                            Ok(res) => {
2036                                res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2037                            }
2038                            Err(_) => {
2039                                return Err((
2040                                    FirehoseError::OperationTimeout("seek_to_slot"),
2041                                    current_slot.unwrap_or(slot_range.start),
2042                                ));
2043                            }
2044                        }
2045                    }
2046                }
2047
2048                // for each item in each block
2049                let mut item_index = 0;
2050                let mut displayed_skip_message = false;
2051                loop {
2052                    let read_fut = reader.read_until_block();
2053                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2054                        Ok(result) => result
2055                            .map_err(FirehoseError::ReadUntilBlockError)
2056                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2057                        Err(_) => {
2058                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2059                            let restart_slot =
2060                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2061                            return Err((
2062                                FirehoseError::OperationTimeout("read_until_block"),
2063                                restart_slot,
2064                            ));
2065                        }
2066                    };
2067                    if nodes.is_empty() {
2068                        log::info!(
2069                            target: &log_target,
2070                            "reached end of epoch {}",
2071                            epoch_num
2072                        );
2073                        break;
2074                    }
2075                    // ignore epoch and subset nodes at end of car file loop { if
2076                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
2077                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
2078                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
2079                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
2080                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
2081                    //         log::info!(target: &log_target, "reached end of epoch {}",
2082                    //             epoch_num); break; }
2083                    if let Some(last_node) = nodes.0.last()
2084                        && !last_node.get_node().is_block() {
2085                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2086                            break;
2087                        }
2088                    let block = nodes
2089                        .get_block()
2090                        .map_err(FirehoseError::GetBlockError)
2091                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2092                    log::debug!(
2093                        target: &log_target,
2094                        "read {} items from epoch {}, now at slot {}",
2095                        item_index,
2096                        epoch_num,
2097                        block.slot
2098                    );
2099                    let slot = block.slot;
2100                    if slot > local_end_inclusive {
2101                        log::debug!(
2102                            target: &log_target,
2103                            "reached end of local slice at slot {} (epoch {}), stopping",
2104                            slot,
2105                            epoch_num
2106                        );
2107                        break;
2108                    }
2109                    if slot >= slot_range.end {
2110                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2111                        // Return early to terminate the firehose thread cleanly. We use >=
2112                        // because slot_range is half-open [start, end), so any slot equal to
2113                        // end is out-of-range and must not be processed.
2114                        return Ok(());
2115                    }
2116                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2117                    if slot < local_start {
2118                        if slot.saturating_add(1) == local_start {
2119                            log::debug!(
2120                                target: &log_target,
2121                                "priming reader with preceding slot {}, skipping",
2122                                slot
2123                            );
2124                        } else {
2125                            log::warn!(
2126                                target: &log_target,
2127                                "encountered slot {} before start of range {}, skipping",
2128                                slot,
2129                                local_start
2130                            );
2131                        }
2132                        continue;
2133                    }
2134                    current_slot = Some(slot);
2135                    let mut entry_index: usize = 0;
2136                    let mut this_block_executed_transaction_count: u64 = 0;
2137                    let mut this_block_entry_count: u64 = 0;
2138                    let mut this_block_rewards = DecodedRewards::empty();
2139
2140                    if slot <= last_counted_slot {
2141                        log::debug!(
2142                            target: &log_target,
2143                            "duplicate block {}, already counted (last_counted={})",
2144                            slot,
2145                            last_counted_slot,
2146                        );
2147                        continue;
2148                    }
2149
2150                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
2151                        item_index += 1;
2152                        // if item_index == 100000 && !triggered { log::info!("simulating
2153                        //     error"); triggered = true; return
2154                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
2155                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
2156                        //         "simulated error", )), ))); }
2157                        if let Some(skip) = skip_until_index {
2158                            if item_index < skip {
2159                                if !displayed_skip_message {
2160                                    log::info!(
2161                                        target: &log_target,
2162                                        "skipping until index {} (at {})",
2163                                        skip,
2164                                        item_index
2165                                    );
2166                                    displayed_skip_message = true;
2167                                }
2168                                return Ok(());
2169                            } else {
2170                                log::info!(
2171                                    target: &log_target,
2172                                    "reached target index {}, resuming...",
2173                                    skip
2174                                );
2175                                skip_until_index = None;
2176                            }
2177                        }
2178                        let node = node_with_cid.get_node();
2179
2180                        use crate::node::Node::*;
2181                        match node {
2182                            Transaction(tx) => {
2183                                let versioned_tx = tx.as_parsed()?;
2184                                let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
2185
2186                                let as_native_metadata = decode_transaction_status_meta_from_frame(
2187                                    block.slot,
2188                                    reassembled_metadata,
2189                                )?;
2190
2191                                let message_hash = {
2192                                    #[cfg(feature = "verify-transaction-signatures")]
2193                                    {
2194                                        versioned_tx.verify_and_hash_message()?
2195                                    }
2196                                    #[cfg(not(feature = "verify-transaction-signatures"))]
2197                                    {
2198                                        // Signature verification is optional because it is
2199                                        // extremely expensive at replay scale.
2200                                        versioned_tx.message.hash()
2201                                    }
2202                                };
2203                                let signature = versioned_tx
2204                                    .signatures
2205                                    .first()
2206                                    .ok_or_else(|| {
2207                                        Box::new(std::io::Error::new(
2208                                            std::io::ErrorKind::InvalidData,
2209                                            "transaction missing signature",
2210                                        )) as SharedError
2211                                    })?;
2212                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2213
2214                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2215                                    transaction_notifier.notify_transaction(
2216                                        block.slot,
2217                                        tx.index.unwrap() as usize,
2218                                        signature,
2219                                        &message_hash,
2220                                        is_vote,
2221                                        &as_native_metadata,
2222                                        &versioned_tx,
2223                                    );
2224                                }
2225
2226                            }
2227                            Entry(entry) => {
2228                                let entry_hash = Hash::from(entry.hash.to_bytes());
2229                                let entry_transaction_count = entry.transactions.len();
2230                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2231                                let starting_transaction_index =
2232                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2233                                        Box::new(std::io::Error::other(
2234                                            "transaction index exceeds usize range",
2235                                        )) as SharedError
2236                                    })?;
2237                                todo_latest_entry_blockhash = entry_hash;
2238                                this_block_executed_transaction_count += entry_transaction_count_u64;
2239                                this_block_entry_count += 1;
2240                                if entry_notifier_maybe.is_none() {
2241                                    return Ok(());
2242                                }
2243                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2244                                let entry_summary = solana_entry::entry::EntrySummary {
2245                                    num_hashes: entry.num_hashes,
2246                                    hash: Hash::from(entry.hash.to_bytes()),
2247                                    num_transactions: entry_transaction_count_u64,
2248                                };
2249                                entry_notifier.notify_entry(
2250                                    block.slot,
2251                                    entry_index,
2252                                    &entry_summary,
2253                                    starting_transaction_index,
2254                                );
2255                                entry_index += 1;
2256                            }
2257                            Block(block) => {
2258                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2259                                confirmed_bank_sender.send(notification).unwrap();
2260
2261                                if block_meta_notifier_maybe.is_none() {
2262                                    last_counted_slot = block.slot;
2263                                    return Ok(());
2264                                }
2265                                let DecodedRewards {
2266                                    keyed_rewards,
2267                                    num_partitions,
2268                                } = std::mem::take(&mut this_block_rewards);
2269                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2270                                block_meta_notifier.notify_block_metadata(
2271                                    block.meta.parent_slot,
2272                                    todo_previous_blockhash.to_string().as_str(),
2273                                    block.slot,
2274                                    todo_latest_entry_blockhash.to_string().as_str(),
2275                                    &KeyedRewardsAndNumPartitions {
2276                                        keyed_rewards,
2277                                        num_partitions,
2278                                    },
2279                                    Some(block.meta.blocktime as i64),
2280                                    block.meta.block_height,
2281                                    this_block_executed_transaction_count,
2282                                    this_block_entry_count,
2283                                );
2284                                todo_previous_blockhash = todo_latest_entry_blockhash;
2285                                last_counted_slot = block.slot;
2286                                std::thread::yield_now();
2287                            }
2288                            Subset(_subset) => (),
2289                            Epoch(_epoch) => (),
2290                            Rewards(rewards) => {
2291                                let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
2292                                if !reassembled.is_empty() {
2293                                    this_block_rewards = decode_rewards_from_frame(
2294                                        block.slot,
2295                                        reassembled,
2296                                    )?;
2297                                } else {
2298                                    this_block_rewards = DecodedRewards::empty();
2299                                }
2300                            }
2301                            DataFrame(_data_frame) => (),
2302                        }
2303                        Ok(())
2304                    })
2305                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2306                    if block.slot == slot_range.end - 1 {
2307                        let finish_time = std::time::Instant::now();
2308                        let elapsed = finish_time.duration_since(start_time);
2309                        log::info!(target: &log_target, "processed slot {}", block.slot);
2310                        let elapsed_pretty = human_readable_duration(elapsed);
2311                        log::info!(
2312                            target: &log_target,
2313                            "processed {} slots across {} epochs in {}.",
2314                            initial_slot_range.end - initial_slot_range.start,
2315                            slot_to_epoch(initial_slot_range.end)
2316                                + 1
2317                                - slot_to_epoch(initial_slot_range.start),
2318                            elapsed_pretty
2319                        );
2320                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2321                        // On completion, report threads with non-zero error counts for
2322                        // visibility.
2323                        let summary: String = error_counts
2324                            .iter()
2325                            .enumerate()
2326                            .filter_map(|(i, c)| {
2327                                let v = c.load(Ordering::Relaxed);
2328                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2329                            })
2330                            .collect::<Vec<_>>()
2331                            .join(", ");
2332                        if !summary.is_empty() {
2333                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2334                        }
2335                        return Ok(());
2336                    }
2337                }
2338            }
2339            Ok(())
2340}
2341.await
2342{
2343        if is_shutdown_error(&err) {
2344            log::info!(
2345                target: &log_target,
2346                "shutdown requested; terminating firehose thread {:?}",
2347                thread_index
2348            );
2349            return Ok(());
2350        }
2351        log::error!(
2352            target: &log_target,
2353            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2354            slot,
2355            slot_to_epoch(slot)
2356            );
2357            log::error!(target: &log_target, "{}", err);
2358            let error_message = err.to_string();
2359            if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2360                || error_message.contains("Unknown CID version")
2361            {
2362                // Clear cached index data for this epoch to avoid retrying with a bad/partial index
2363                // (or a bad seek offset that landed mid-stream).
2364                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2365            }
2366            let item_index = match err {
2367                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2368                _ => 0,
2369            };
2370            // Increment this thread's error counter
2371            let idx = thread_index.unwrap_or(0);
2372            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2373            log::warn!(
2374                target: &log_target,
2375                "restarting from slot {} at index {}",
2376                slot,
2377                item_index,
2378            );
2379            // Update slot range to resume from the failed slot, not the original start.
2380            // If the failing slot was already fully processed, resume from the next slot.
2381            if slot <= last_counted_slot {
2382                slot_range.start = last_counted_slot.saturating_add(1);
2383            } else {
2384                slot_range.start = slot;
2385            }
2386            // `skip_until_index` is unsafe across retries because `item_index`
2387            // is reset to 0 each epoch restart. Keeping it can skip large portions
2388            // of the stream and silently drop slots.
2389            skip_until_index = None;
2390}
2391    Ok(())
2392}
2393
2394#[inline]
2395fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2396    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2397        return false;
2398    }
2399
2400    if !matches!(
2401        versioned_tx.version(),
2402        solana_transaction::versioned::TransactionVersion::Legacy(_)
2403    ) {
2404        return false;
2405    }
2406
2407    let instructions = versioned_tx.message.instructions();
2408    if instructions.len() != 1 {
2409        return false;
2410    }
2411
2412    let program_index = instructions[0].program_id_index as usize;
2413    versioned_tx
2414        .message
2415        .static_account_keys()
2416        .get(program_index)
2417        .map(|program_id| program_id == &vote_program_id())
2418        .unwrap_or(false)
2419}
2420
2421#[inline(always)]
2422fn convert_proto_rewards(
2423    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2424) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2425    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2426    for proto_reward in proto_rewards.rewards.iter() {
2427        let reward = RewardInfo {
2428            reward_type: match proto_reward.reward_type - 1 {
2429                0 => RewardType::Fee,
2430                1 => RewardType::Rent,
2431                2 => RewardType::Staking,
2432                3 => RewardType::Voting,
2433                typ => {
2434                    return Err(Box::new(std::io::Error::other(format!(
2435                        "unsupported reward type {}",
2436                        typ
2437                    ))));
2438                }
2439            },
2440            lamports: proto_reward.lamports,
2441            post_balance: proto_reward.post_balance,
2442            commission: proto_reward.commission.parse::<u8>().ok(),
2443        };
2444        let pubkey = proto_reward
2445            .pubkey
2446            .parse::<Address>()
2447            .map_err(|err| Box::new(err) as SharedError)?;
2448        keyed_rewards.push((pubkey, reward));
2449    }
2450    Ok(keyed_rewards)
2451}
2452
2453#[inline]
2454/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2455pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2456    let total = slot_range.end - slot_range.start;
2457    let slots_per_thread = total / threads;
2458    let remainder = total % threads;
2459
2460    let ranges: Vec<Range<u64>> = (0..threads)
2461        .map(|i| {
2462            // Distribute remainder slots to the first `remainder` threads
2463            let extra_slot = if i < remainder { 1 } else { 0 };
2464            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2465            let end = start + slots_per_thread + extra_slot;
2466            start..end
2467        })
2468        .collect();
2469
2470    // Verify that ranges cover all slots exactly
2471    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2472    assert_eq!(
2473        total_covered, total,
2474        "Range generation failed: {} threads should cover {} slots but only cover {}",
2475        threads, total, total_covered
2476    );
2477
2478    // Verify no gaps between ranges
2479    for i in 1..ranges.len() {
2480        assert_eq!(
2481            ranges[i - 1].end,
2482            ranges[i].start,
2483            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2484            i - 1,
2485            ranges[i - 1].end,
2486            i,
2487            ranges[i].start
2488        );
2489    }
2490
2491    log::info!(
2492        target: LOG_MODULE,
2493        "Generated {} thread ranges covering {} slots total",
2494        threads,
2495        total_covered
2496    );
2497    ranges
2498}
2499
2500fn human_readable_duration(duration: std::time::Duration) -> String {
2501    if duration.is_zero() {
2502        return "0s".into();
2503    }
2504    let total_secs = duration.as_secs();
2505    if total_secs < 60 {
2506        let secs_f = duration.as_secs_f64();
2507        if total_secs == 0 {
2508            format!("{:.2}s", secs_f)
2509        } else if duration.subsec_millis() == 0 {
2510            format!("{}s", total_secs)
2511        } else {
2512            format!("{:.2}s", secs_f)
2513        }
2514    } else {
2515        let mut secs = total_secs;
2516        let days = secs / 86_400;
2517        secs %= 86_400;
2518        let hours = secs / 3_600;
2519        secs %= 3_600;
2520        let minutes = secs / 60;
2521        secs %= 60;
2522        if days > 0 {
2523            if hours > 0 {
2524                format!("{days}d{hours}h")
2525            } else {
2526                format!("{days}d")
2527            }
2528        } else if hours > 0 {
2529            if minutes > 0 {
2530                format!("{hours}h{minutes}m")
2531            } else {
2532                format!("{hours}h")
2533            }
2534        } else if minutes > 0 {
2535            if secs > 0 {
2536                format!("{minutes}m{secs}s")
2537            } else {
2538                format!("{minutes}m")
2539            }
2540        } else {
2541            format!("{secs}s")
2542        }
2543    }
2544}
2545
2546#[cfg(test)]
2547fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
2548    Box::pin(async move {
2549        let elapsed = stats.start_time.elapsed();
2550        let elapsed_secs = elapsed.as_secs_f64();
2551        let tps = if elapsed_secs > 0.0 {
2552            stats.transactions_processed as f64 / elapsed_secs
2553        } else {
2554            0.0
2555        };
2556        log::info!(
2557            target: LOG_MODULE,
2558            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
2559            stats.thread_stats.current_slot,
2560            stats.slots_processed,
2561            stats.blocks_processed,
2562            stats.transactions_processed,
2563            stats.entries_processed,
2564            stats.rewards_processed,
2565            elapsed_secs,
2566            tps
2567        );
2568        Ok(())
2569    })
2570}
2571
2572#[cfg(test)]
2573use futures_util::FutureExt;
2574#[cfg(test)]
2575use serial_test::serial;
2576#[cfg(test)]
2577use std::sync::{Mutex, OnceLock};
2578
2579#[cfg(test)]
2580async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
2581    use std::sync::Arc;
2582    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2583
2584    let found = Arc::new(AtomicBool::new(false));
2585    let observed_total = Arc::new(AtomicU64::new(0));
2586    let observed_non_vote = Arc::new(AtomicU64::new(0));
2587
2588    let found_block = found.clone();
2589    let observed_total_block = observed_total.clone();
2590    let target_slot_block = slot;
2591    let target_slot_tx = slot;
2592    let observed_non_vote_tx = observed_non_vote.clone();
2593
2594    firehose(
2595        1,
2596        target_slot_block..(target_slot_block + 1),
2597        Some(move |_thread_id: usize, block: BlockData| {
2598            let found_block = found_block.clone();
2599            let observed_total_block = observed_total_block.clone();
2600            async move {
2601                if block.slot() == target_slot_block {
2602                    assert!(
2603                        !block.was_skipped(),
2604                        "slot {target_slot_block} was marked leader skipped",
2605                    );
2606                    if let BlockData::Block {
2607                        executed_transaction_count,
2608                        ..
2609                    } = block
2610                    {
2611                        found_block.store(true, Ordering::Relaxed);
2612                        observed_total_block.store(executed_transaction_count, Ordering::Relaxed);
2613                    }
2614                }
2615                Ok(())
2616            }
2617            .boxed()
2618        }),
2619        Some(move |_thread_id: usize, transaction: TransactionData| {
2620            let observed_non_vote_tx = observed_non_vote_tx.clone();
2621            async move {
2622                if transaction.slot == target_slot_tx && !transaction.is_vote {
2623                    observed_non_vote_tx.fetch_add(1, Ordering::Relaxed);
2624                }
2625                Ok(())
2626            }
2627            .boxed()
2628        }),
2629        None::<OnEntryFn>,
2630        None::<OnRewardFn>,
2631        None::<OnErrorFn>,
2632        None::<OnStatsTrackingFn>,
2633        None,
2634    )
2635    .await
2636    .unwrap();
2637
2638    assert!(
2639        found.load(Ordering::Relaxed),
2640        "target slot {slot} was not processed"
2641    );
2642    let observed_total = observed_total.load(Ordering::Relaxed);
2643    let observed_non_vote = observed_non_vote.load(Ordering::Relaxed);
2644    assert!(
2645        observed_total > 0,
2646        "slot {slot} executed transaction count was zero"
2647    );
2648    assert!(
2649        observed_total >= min_executed,
2650        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
2651    );
2652    log::info!(
2653        target: LOG_MODULE,
2654        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
2655        observed_total,
2656        observed_non_vote
2657    );
2658}
2659
2660#[cfg(test)]
2661async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
2662    use crate::index::slot_to_offset;
2663    use crate::node::Node;
2664
2665    let epoch = slot_to_epoch(slot);
2666    let client = crate::network::create_http_client();
2667    let stream = fetch_epoch_stream(epoch, &client).await;
2668    let mut reader = NodeReader::new(stream);
2669    reader
2670        .seek_to_slot(slot)
2671        .await
2672        .map_err(|err| Box::new(err) as SharedError)?;
2673
2674    let nodes = reader.read_until_block().await?;
2675    let mut transactions = 0u64;
2676    let mut entries = 0u64;
2677    let mut entry_tx_total = 0u64;
2678    let mut dataframes = 0u64;
2679    let mut rewards = 0u64;
2680    let mut subsets = 0u64;
2681    let mut epochs = 0u64;
2682    let mut block_slot = None;
2683    let mut block_entries = None;
2684    let first_kind = nodes
2685        .0
2686        .first()
2687        .map(|node| node.get_node())
2688        .map(|node| match node {
2689            Node::Transaction(_) => "transaction",
2690            Node::Entry(_) => "entry",
2691            Node::Block(_) => "block",
2692            Node::Subset(_) => "subset",
2693            Node::Epoch(_) => "epoch",
2694            Node::Rewards(_) => "rewards",
2695            Node::DataFrame(_) => "dataframe",
2696        })
2697        .unwrap_or("none");
2698
2699    for node in &nodes.0 {
2700        match node.get_node() {
2701            Node::Transaction(_) => {
2702                transactions += 1;
2703            }
2704            Node::Entry(entry) => {
2705                entries += 1;
2706                entry_tx_total += entry.transactions.len() as u64;
2707            }
2708            Node::Block(block) => {
2709                block_slot = Some(block.slot);
2710                block_entries = Some(block.entries.len());
2711            }
2712            Node::Subset(_) => {
2713                subsets += 1;
2714            }
2715            Node::Epoch(_) => {
2716                epochs += 1;
2717            }
2718            Node::Rewards(_) => {
2719                rewards += 1;
2720            }
2721            Node::DataFrame(_) => {
2722                dataframes += 1;
2723            }
2724        }
2725    }
2726
2727    log::info!(
2728        target: LOG_MODULE,
2729        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
2730        nodes.len(),
2731        first_kind,
2732        transactions,
2733        entries,
2734        entry_tx_total,
2735        block_slot,
2736        block_entries,
2737        dataframes,
2738        rewards,
2739        subsets,
2740        epochs
2741    );
2742
2743    if slot > 0 {
2744        let mut found_previous = None;
2745        for delta in 1..=5 {
2746            let candidate = slot.saturating_sub(delta);
2747            match slot_to_offset(candidate).await {
2748                Ok(offset) => {
2749                    found_previous = Some((candidate, offset));
2750                    break;
2751                }
2752                Err(err) => {
2753                    log::info!(
2754                        target: LOG_MODULE,
2755                        "slot {slot} previous lookup {candidate} failed: {err}"
2756                    );
2757                }
2758            }
2759        }
2760        if let Some((candidate, offset)) = found_previous {
2761            log::info!(
2762                target: LOG_MODULE,
2763                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
2764            );
2765        } else {
2766            log::info!(
2767                target: LOG_MODULE,
2768                "slot {slot} no previous offsets found within 5 slots"
2769            );
2770        }
2771    }
2772
2773    Ok(())
2774}
2775
2776#[tokio::test(flavor = "multi_thread")]
2777async fn test_firehose_epoch_800() {
2778    use dashmap::DashSet;
2779    use std::sync::atomic::{AtomicU64, Ordering};
2780    solana_logger::setup_with_default("info");
2781    const THREADS: usize = 4;
2782    const NUM_SLOTS_TO_COVER: u64 = 50;
2783    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2784    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2785    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2786    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2787    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2788    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2789    let stats_tracking = StatsTracking {
2790        on_stats: log_stats_handler,
2791        tracking_interval_slots: 10,
2792    };
2793
2794    for prev in PREV_BLOCK.iter() {
2795        prev.store(0, Ordering::Relaxed);
2796    }
2797    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2798    NUM_BLOCKS.store(0, Ordering::Relaxed);
2799    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2800    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2801    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2802
2803    firehose(
2804        THREADS.try_into().unwrap(),
2805        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2806        Some(|thread_id: usize, block: BlockData| {
2807            async move {
2808                let _prev =
2809                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2810                if block.was_skipped() {
2811                    log::info!(
2812                        target: LOG_MODULE,
2813                        "leader skipped block {} on thread {}",
2814                        block.slot(),
2815                        thread_id,
2816                    );
2817                } else {
2818                    /*log::info!(
2819                        target: LOG_MODULE,
2820                        "got block {} on thread {}",
2821                        block.slot(),
2822                        thread_id,
2823                    );*/
2824                }
2825
2826                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2827                if block.was_skipped() {
2828                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2829                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2830                } else {
2831                    if first_time {
2832                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2833                        if let BlockData::Block {
2834                            executed_transaction_count,
2835                            ..
2836                        } = &block
2837                        {
2838                            let executed = *executed_transaction_count;
2839                            let _ = MIN_TRANSACTIONS.fetch_update(
2840                                Ordering::Relaxed,
2841                                Ordering::Relaxed,
2842                                |current| {
2843                                    if executed < current {
2844                                        Some(executed)
2845                                    } else {
2846                                        None
2847                                    }
2848                                },
2849                            );
2850                        }
2851                    }
2852                }
2853                Ok(())
2854            }
2855            .boxed()
2856        }),
2857        None::<OnTxFn>,
2858        None::<OnEntryFn>,
2859        None::<OnRewardFn>,
2860        None::<OnErrorFn>,
2861        Some(stats_tracking),
2862        None,
2863    )
2864    .await
2865    .unwrap();
2866    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2867    assert_eq!(
2868        seen, NUM_SLOTS_TO_COVER,
2869        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2870    );
2871    let mut skipped: Vec<u64> = SEEN_SKIPPED
2872        .get_or_init(DashSet::new)
2873        .iter()
2874        .map(|v| *v)
2875        .collect();
2876    skipped.sort_unstable();
2877    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2878    const EXPECTED_SKIPPED: [u64; 6] = [
2879        345_600_004,
2880        345_600_005,
2881        345_600_008,
2882        345_600_009,
2883        345_600_010,
2884        345_600_011,
2885    ];
2886    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2887    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2888}
2889
2890#[tokio::test(flavor = "multi_thread")]
2891async fn test_firehose_target_slot_transactions() {
2892    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2893    solana_logger::setup_with_default("info");
2894    const TARGET_SLOT: u64 = 376_273_722;
2895    const SLOT_RADIUS: u64 = 50;
2896    const EXPECTED_TRANSACTIONS: u64 = 1414;
2897    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2898    static FOUND: AtomicBool = AtomicBool::new(false);
2899    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2900    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2901
2902    FOUND.store(false, Ordering::Relaxed);
2903    OBSERVED_TXS.store(0, Ordering::Relaxed);
2904    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2905
2906    firehose(
2907        4,
2908        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2909        Some(|_thread_id: usize, block: BlockData| {
2910            async move {
2911                if block.slot() == TARGET_SLOT {
2912                    assert!(
2913                        !block.was_skipped(),
2914                        "target slot {TARGET_SLOT} was marked leader skipped",
2915                    );
2916                    if let BlockData::Block {
2917                        executed_transaction_count,
2918                        ..
2919                    } = block
2920                    {
2921                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2922                        FOUND.store(true, Ordering::Relaxed);
2923                        assert_eq!(
2924                            executed_transaction_count, EXPECTED_TRANSACTIONS,
2925                            "unexpected transaction count for slot {TARGET_SLOT}"
2926                        );
2927                        assert_eq!(
2928                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2929                            EXPECTED_NON_VOTE_TRANSACTIONS,
2930                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2931                        );
2932                    }
2933                }
2934                Ok(())
2935            }
2936            .boxed()
2937        }),
2938        Some(|_thread_id: usize, transaction: TransactionData| {
2939            async move {
2940                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2941                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2942                }
2943                Ok(())
2944            }
2945            .boxed()
2946        }),
2947        None::<OnEntryFn>,
2948        None::<OnRewardFn>,
2949        None::<OnErrorFn>,
2950        None::<OnStatsTrackingFn>,
2951        None,
2952    )
2953    .await
2954    .unwrap();
2955
2956    assert!(
2957        FOUND.load(Ordering::Relaxed),
2958        "target slot was not processed"
2959    );
2960    assert_eq!(
2961        OBSERVED_TXS.load(Ordering::Relaxed),
2962        EXPECTED_TRANSACTIONS,
2963        "recorded transaction count mismatch"
2964    );
2965}
2966
2967#[cfg(test)]
2968#[serial]
2969#[tokio::test(flavor = "multi_thread")]
2970async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
2971    solana_logger::setup_with_default("info");
2972    assert_slot_min_executed_transactions(311_173_980, 1_197 + 211).await;
2973}
2974
2975#[cfg(test)]
2976#[serial]
2977#[tokio::test(flavor = "multi_thread")]
2978async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
2979    solana_logger::setup_with_default("info");
2980    assert_slot_min_executed_transactions(311_225_232, 888 + 157).await;
2981}
2982
2983#[cfg(test)]
2984#[serial]
2985#[tokio::test(flavor = "multi_thread")]
2986async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
2987    solana_logger::setup_with_default("info");
2988    assert_slot_min_executed_transactions(311_175_860, 527 + 110).await;
2989}
2990
2991#[cfg(test)]
2992#[serial]
2993#[tokio::test(flavor = "multi_thread")]
2994async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
2995    solana_logger::setup_with_default("info");
2996    assert_slot_min_executed_transactions(311_134_608, 1_086 + 169).await;
2997}
2998
2999#[cfg(test)]
3000#[ignore]
3001#[serial]
3002#[tokio::test(flavor = "multi_thread")]
3003async fn debug_epoch_720_slot_311173980_node_summary() {
3004    solana_logger::setup_with_default("info");
3005    const SLOTS: &[u64] = &[
3006        311_173_980,
3007        311_225_232,
3008        311_175_860,
3009        311_134_608,
3010        376_273_722,
3011    ];
3012    for slot in SLOTS {
3013        log_slot_node_summary(*slot).await.expect("slot summary");
3014    }
3015}
3016
3017#[tokio::test(flavor = "multi_thread")]
3018async fn test_firehose_epoch_850_has_logs() {
3019    use std::sync::atomic::{AtomicU64, Ordering};
3020    solana_logger::setup_with_default("info");
3021    const START_SLOT: u64 = 367_200_075; // within epoch 850
3022    const SLOT_COUNT: u64 = 50;
3023    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3024
3025    TOTAL_TXS.store(0, Ordering::Relaxed);
3026
3027    firehose(
3028        4,
3029        START_SLOT..(START_SLOT + SLOT_COUNT),
3030        None::<OnBlockFn>,
3031        Some(|_thread_id: usize, transaction: TransactionData| {
3032            async move {
3033                TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3034                if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3035                    let has_logs = logs.iter().any(|msg| !msg.is_empty());
3036                    assert_eq!(has_logs, true);
3037                }
3038                Ok(())
3039            }
3040            .boxed()
3041        }),
3042        None::<OnEntryFn>,
3043        None::<OnRewardFn>,
3044        None::<OnErrorFn>,
3045        None::<OnStatsTrackingFn>,
3046        None,
3047    )
3048    .await
3049    .unwrap();
3050
3051    assert!(
3052        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3053        "no transactions observed in epoch 850 range"
3054    );
3055}
3056
3057#[tokio::test(flavor = "multi_thread")]
3058async fn test_firehose_epoch_850_votes_present() {
3059    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3060    solana_logger::setup_with_default("info");
3061    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
3062    const SLOT_RADIUS: u64 = 10;
3063    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3064    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3065    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3066
3067    SEEN_BLOCK.store(false, Ordering::Relaxed);
3068    VOTE_TXS.store(0, Ordering::Relaxed);
3069    TOTAL_TXS.store(0, Ordering::Relaxed);
3070
3071    firehose(
3072        2,
3073        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3074        Some(|_thread_id: usize, block: BlockData| {
3075            async move {
3076                if block.slot() == TARGET_SLOT {
3077                    assert!(
3078                        !block.was_skipped(),
3079                        "target slot {TARGET_SLOT} was marked leader skipped",
3080                    );
3081                    SEEN_BLOCK.store(true, Ordering::Relaxed);
3082                }
3083                Ok(())
3084            }
3085            .boxed()
3086        }),
3087        Some(|_thread_id: usize, transaction: TransactionData| {
3088            async move {
3089                if transaction.slot == TARGET_SLOT {
3090                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3091                    if transaction.is_vote {
3092                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3093                    }
3094                }
3095                Ok(())
3096            }
3097            .boxed()
3098        }),
3099        None::<OnEntryFn>,
3100        None::<OnRewardFn>,
3101        None::<OnErrorFn>,
3102        None::<OnStatsTrackingFn>,
3103        None,
3104    )
3105    .await
3106    .unwrap();
3107
3108    assert!(
3109        SEEN_BLOCK.load(Ordering::Relaxed),
3110        "target slot was not processed"
3111    );
3112    assert!(
3113        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3114        "no transactions counted in target slot"
3115    );
3116    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3117}
3118
3119#[cfg(test)]
3120#[serial]
3121#[tokio::test(flavor = "multi_thread")]
3122async fn test_firehose_restart_loses_coverage_without_reset() {
3123    use std::collections::HashMap;
3124    solana_logger::setup_with_default("info");
3125    const THREADS: usize = 1;
3126    const START_SLOT: u64 = 345_600_000;
3127    const NUM_SLOTS: u64 = 8;
3128
3129    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
3130    COVERAGE
3131        .get_or_init(|| Mutex::new(HashMap::new()))
3132        .lock()
3133        .unwrap()
3134        .clear();
3135    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
3136    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
3137    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
3138    SEEN_BLOCKS.store(0, Ordering::Relaxed);
3139
3140    firehose(
3141        THREADS.try_into().unwrap(),
3142        START_SLOT..(START_SLOT + NUM_SLOTS),
3143        Some(|_thread_id: usize, block: BlockData| {
3144            async move {
3145                // Force an error after at least one block has been seen so restart happens mid-range.
3146                if !block.was_skipped()
3147                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
3148                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
3149                {
3150                    return Err("synthetic handler failure to exercise restart".into());
3151                }
3152                let mut coverage = COVERAGE
3153                    .get_or_init(|| Mutex::new(HashMap::new()))
3154                    .lock()
3155                    .unwrap();
3156                *coverage.entry(block.slot()).or_insert(0) += 1;
3157                if !block.was_skipped() {
3158                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
3159                }
3160                Ok(())
3161            }
3162            .boxed()
3163        }),
3164        None::<OnTxFn>,
3165        None::<OnEntryFn>,
3166        None::<OnRewardFn>,
3167        None::<OnErrorFn>,
3168        None::<OnStatsTrackingFn>,
3169        None,
3170    )
3171    .await
3172    .unwrap();
3173
3174    let coverage = COVERAGE.get().unwrap().lock().unwrap();
3175    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
3176        assert!(
3177            coverage.contains_key(&slot),
3178            "missing coverage for slot {slot} after restart"
3179        );
3180    }
3181}
3182
3183#[cfg(test)]
3184#[serial]
3185#[tokio::test(flavor = "multi_thread")]
3186async fn test_firehose_gap_coverage_near_known_missing_range() {
3187    use std::collections::HashSet;
3188    solana_logger::setup_with_default("info");
3189    const GAP_START: u64 = 378864000;
3190    const START_SLOT: u64 = GAP_START - 1000;
3191    const END_SLOT: u64 = GAP_START + 1000;
3192    const THREADS: usize = 16;
3193
3194    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
3195    COVERAGE
3196        .get_or_init(|| Mutex::new(HashSet::new()))
3197        .lock()
3198        .unwrap()
3199        .clear();
3200
3201    firehose(
3202        THREADS.try_into().unwrap(),
3203        START_SLOT..(END_SLOT + 1),
3204        Some(|_thread_id: usize, block: BlockData| {
3205            async move {
3206                if block.was_skipped() {
3207                    return Ok(());
3208                }
3209                let slot = block.slot();
3210                COVERAGE
3211                    .get_or_init(|| Mutex::new(HashSet::new()))
3212                    .lock()
3213                    .unwrap()
3214                    .insert(slot);
3215                Ok(())
3216            }
3217            .boxed()
3218        }),
3219        None::<OnTxFn>,
3220        None::<OnEntryFn>,
3221        None::<OnRewardFn>,
3222        None::<OnErrorFn>,
3223        None::<OnStatsTrackingFn>,
3224        None,
3225    )
3226    .await
3227    .unwrap();
3228
3229    let mut coverage = COVERAGE
3230        .get_or_init(|| Mutex::new(HashSet::new()))
3231        .lock()
3232        .unwrap()
3233        .clone();
3234
3235    // ignore a known 4-slot leader skipped gap
3236    coverage.insert(378864396);
3237    coverage.insert(378864397);
3238    coverage.insert(378864398);
3239    coverage.insert(378864399);
3240
3241    let expected: Vec<u64> = (START_SLOT..=END_SLOT).collect();
3242    let missing: Vec<u64> = expected
3243        .iter()
3244        .copied()
3245        .filter(|slot| !coverage.contains(slot))
3246        .collect();
3247    assert!(
3248        missing.is_empty(),
3249        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
3250        missing.len(),
3251        &missing[..missing.len().min(10)]
3252    );
3253}