Skip to main content

jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{
40        FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41        fetch_epoch_stream_with_options, slot_to_epoch,
42    },
43    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44    node_reader::NodeReader,
45    utils,
46};
47
// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
// header, seeking, reading next block). Adjust here to tune stall detection/restart
// aggressiveness.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
// Longer (180 s) variant of the per-operation timeout.
// NOTE(review): the name suggests it is used when the firehose runs sequentially; the call
// sites are outside this part of the file — confirm.
const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180);
// Epochs earlier than this were bincode-encoded in Old Faithful.
// `decode_transaction_status_meta` tries bincode first for epochs below this value and then
// falls back to protobuf; epochs at or above it go straight to protobuf.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57    flag: &Arc<std::sync::atomic::AtomicBool>,
58    receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60    if let Some(rx) = receiver {
61        match rx.try_recv() {
62            Ok(_) | Err(TryRecvError::Lagged(_)) => {
63                flag.store(true, Ordering::SeqCst);
64            }
65            Err(TryRecvError::Closed) => {
66                flag.store(true, Ordering::SeqCst);
67            }
68            Err(TryRecvError::Empty) => {}
69        }
70    }
71    flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76        inner
77            .downcast_ref::<io::Error>()
78            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79            .unwrap_or(false)
80    }
81
82    match err {
83        FirehoseError::BlockHandlerError(inner)
84        | FirehoseError::TransactionHandlerError(inner)
85        | FirehoseError::EntryHandlerError(inner)
86        | FirehoseError::RewardHandlerError(inner)
87        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88        _ => false,
89    }
90}
91
92async fn find_previous_indexed_slot(
93    local_start: u64,
94    epoch_start: u64,
95    log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97    if local_start <= epoch_start {
98        return Ok(None);
99    }
100    let mut candidate = local_start.saturating_sub(1);
101    let mut skipped = 0u64;
102    loop {
103        match slot_to_offset(candidate).await {
104            Ok(_) => {
105                if skipped > 0 {
106                    log::info!(
107                        target: log_target,
108                        "slot {} missing in index; seeking back {} slots to {}",
109                        local_start.saturating_sub(1),
110                        skipped,
111                        candidate
112                    );
113                }
114                return Ok(Some(candidate));
115            }
116            Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117                if candidate <= epoch_start {
118                    break;
119                }
120                skipped += 1;
121                candidate = candidate.saturating_sub(1);
122            }
123            Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124        }
125    }
126    log::warn!(
127        target: log_target,
128        "no indexed slot found before {} (epoch start {}); reading from epoch start",
129        local_start,
130        epoch_start
131    );
132    Ok(None)
133}
134
/// Errors that can occur while streaming the firehose.
///
/// The human-readable message for each variant is produced by the manual [`Display`]
/// implementation in this file.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client error surfaced from `reqwest`.
    Reqwest(reqwest::Error),
    /// Failure while reading the Old Faithful CAR header.
    ReadHeader(SharedError),
    /// Error emitted by the Solana Geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// Transaction notifier could not be acquired from the Geyser service.
    FailedToGetTransactionNotifier,
    /// Failure while reading data until the next block boundary.
    ReadUntilBlockError(SharedError),
    /// Failure while fetching an individual block.
    GetBlockError(SharedError),
    /// Failed to decode a node at the given index.
    NodeDecodingError(usize, SharedError),
    /// Error surfaced when querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure while seeking to a slot within the Old Faithful CAR stream.
    SeekToSlotError(SharedError),
    /// Error surfaced during the plugin `on_load` stage.
    OnLoadError(SharedError),
    /// Error emitted while invoking the stats handler.
    OnStatsHandlerError(SharedError),
    /// Timeout reached while waiting for a firehose operation; the payload names the
    /// operation.
    OperationTimeout(&'static str),
    /// Transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// Entry handler returned an error.
    EntryHandlerError(SharedError),
    /// Reward handler returned an error.
    RewardHandlerError(SharedError),
    /// Block handler returned an error.
    BlockHandlerError(SharedError),
}
172
// SAFETY(review): these impls assert that every payload carried by `FirehoseError` may be
// moved to and shared between threads. The payload types (`SharedError`,
// `GeyserPluginServiceError`, `SlotOffsetIndexError`, `reqwest::Error`) are defined outside
// this file, so that cannot be confirmed from here — TODO verify. If all payloads are
// already `Send + Sync`, the auto traits apply and these impls can be removed; if any is
// not, these impls are unsound.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        match self {
179            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180            FirehoseError::ReadHeader(error) => {
181                write!(f, "Error reading header: {}", error)
182            }
183            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184                f,
185                "Error initializing geyser plugin service: {}",
186                geyser_plugin_service_error
187            ),
188            FirehoseError::FailedToGetTransactionNotifier => write!(
189                f,
190                "Failed to get transaction notifier from GeyserPluginService"
191            ),
192            FirehoseError::ReadUntilBlockError(error) => {
193                write!(f, "Error reading until block: {}", error)
194            }
195            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196            FirehoseError::NodeDecodingError(item_index, error) => {
197                write!(
198                    f,
199                    "Error seeking, reading data from, or decoding data for data node {}: {}",
200                    item_index, error
201                )
202            }
203            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204                f,
205                "Error getting info from slot offset index: {}",
206                slot_offset_index_error
207            ),
208            FirehoseError::SeekToSlotError(error) => {
209                write!(f, "Error seeking to slot: {}", error)
210            }
211            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212            FirehoseError::OnStatsHandlerError(error) => {
213                write!(f, "Stats handler error: {}", error)
214            }
215            FirehoseError::OperationTimeout(op) => {
216                write!(f, "Timeout while waiting for operation: {}", op)
217            }
218            FirehoseError::TransactionHandlerError(error) => {
219                write!(f, "Transaction handler error: {}", error)
220            }
221            FirehoseError::EntryHandlerError(error) => {
222                write!(f, "Entry handler error: {}", error)
223            }
224            FirehoseError::RewardHandlerError(error) => {
225                write!(f, "Reward handler error: {}", error)
226            }
227            FirehoseError::BlockHandlerError(error) => {
228                write!(f, "Block handler error: {}", error)
229            }
230        }
231    }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235    fn from(e: reqwest::Error) -> Self {
236        FirehoseError::Reqwest(e)
237    }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241    fn from(e: GeyserPluginServiceError) -> Self {
242        FirehoseError::GeyserPluginService(e)
243    }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247    fn from(e: SlotOffsetIndexError) -> Self {
248        FirehoseError::SlotOffsetIndexError(e)
249    }
250}
251
/// Per-thread progress information emitted by the firehose runner.
///
/// A clone of this value is embedded in every [`Stats`] built by `maybe_emit_stats`, which
/// also reports `current_slot` alongside a stats-handler failure.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Identifier of the worker thread reporting the stats.
    pub thread_id: usize,
    /// Timestamp captured when the thread began processing.
    pub start_time: std::time::Instant,
    /// Timestamp captured when the thread finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
    pub slot_range: Range<u64>,
    /// Original slot range assigned to the thread (half-open, never modified).
    pub initial_slot_range: Range<u64>,
    /// Latest slot processed by the thread.
    pub current_slot: u64,
    /// Total slots processed by the thread.
    pub slots_processed: u64,
    /// Number of blocks successfully processed.
    pub blocks_processed: u64,
    /// Number of slots skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Total transactions processed.
    pub transactions_processed: u64,
    /// Total entries processed.
    pub entries_processed: u64,
    /// Total rewards processed.
    pub rewards_processed: u64,
}
280
/// Aggregated firehose statistics covering all worker threads.
///
/// NOTE(review): as populated by `maybe_emit_stats` in this file, `start_time`,
/// `finish_time`, `slot_range`, and `rewards_processed` are copied from the emitting
/// thread's [`ThreadStats`] rather than aggregated across threads, and
/// `leader_skipped_slots` is derived as `slots_processed - blocks_processed` (saturating).
/// Confirm whether that is intended before relying on the per-field wording below.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Per-thread statistics for the current update (a clone of the emitting thread's
    /// stats).
    pub thread_stats: ThreadStats,
    /// Timestamp captured when processing began.
    pub start_time: std::time::Instant,
    /// Timestamp captured when all processing finished, if finished.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being processed (half-open [start, end)).
    pub slot_range: Range<u64>,
    /// Aggregate slots processed across all threads.
    pub slots_processed: u64,
    /// Aggregate blocks processed across all threads.
    pub blocks_processed: u64,
    /// Aggregate skipped slots across all threads.
    pub leader_skipped_slots: u64,
    /// Aggregate transactions processed across all threads.
    pub transactions_processed: u64,
    /// Aggregate entries processed across all threads.
    pub entries_processed: u64,
    /// Aggregate rewards processed across all threads.
    pub rewards_processed: u64,
    /// Transactions processed since the previous stats pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed since the previous stats pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots processed since the previous stats pulse.
    pub slots_since_last_pulse: u64,
    /// Elapsed time since the previous stats pulse (`maybe_emit_stats` reports at least
    /// one nanosecond).
    pub time_since_last_pulse: std::time::Duration,
}
313
/// Configuration for periodic stats emission via a [`Handler`] callback.
///
/// `maybe_emit_stats` invokes `on_stats` with the emitting thread's index and a freshly
/// built [`Stats`] value.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked whenever new stats are available.
    pub on_stats: OnStats,
    /// Emits a stats callback when the current slot is a multiple of this interval.
    // NOTE(review): the interval check is not visible in this part of the file — confirm
    // against the caller of `maybe_emit_stats`.
    pub tracking_interval_slots: u64,
}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326    stats_tracking: Option<&StatsTracking<OnStats>>,
327    thread_index: usize,
328    thread_stats: &ThreadStats,
329    overall_slots_processed: &AtomicU64,
330    overall_blocks_processed: &AtomicU64,
331    overall_transactions_processed: &AtomicU64,
332    overall_entries_processed: &AtomicU64,
333    transactions_since_stats: &AtomicU64,
334    blocks_since_stats: &AtomicU64,
335    slots_since_stats: &AtomicU64,
336    last_pulse: &Arc<AtomicU64>,
337    base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339    if let Some(stats_tracker) = stats_tracking {
340        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344        let now_nanos = base_instant.elapsed().as_nanos() as u64;
345        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346        let delta_nanos = now_nanos.saturating_sub(previous);
347        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352        let stats = Stats {
353            thread_stats: thread_stats.clone(),
354            start_time: thread_stats.start_time,
355            finish_time: thread_stats.finish_time,
356            slot_range: thread_stats.slot_range.clone(),
357            slots_processed: total_slots,
358            blocks_processed: total_blocks,
359            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360            transactions_processed: total_transactions,
361            entries_processed: total_entries,
362            rewards_processed: thread_stats.rewards_processed,
363            transactions_since_last_pulse: processed_transactions,
364            blocks_since_last_pulse: processed_blocks,
365            slots_since_last_pulse: processed_slots,
366            time_since_last_pulse,
367        };
368
369        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370            last_pulse.store(previous, Ordering::Relaxed);
371            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374            return Err((
375                FirehoseError::OnStatsHandlerError(e),
376                thread_stats.current_slot,
377            ));
378        }
379    }
380    Ok(())
381}
382
/// Adds `value` to `atomic` (relaxed ordering) when `tracking_enabled` is `true`; does
/// nothing otherwise.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
389
390fn clear_pending_skip(
391    map: &DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
392    thread_id: usize,
393    slot: u64,
394) -> bool {
395    map.get(&thread_id)
396        .map(|set| set.remove(&slot).is_some())
397        .unwrap_or(false)
398}
399
400fn decode_transaction_status_meta_from_frame(
401    slot: u64,
402    reassembled_metadata: Vec<u8>,
403) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
404    if reassembled_metadata.is_empty() {
405        // Early epochs often omit metadata entirely.
406        return Ok(solana_transaction_status::TransactionStatusMeta::default());
407    }
408
409    match utils::decompress_zstd(reassembled_metadata.as_slice()) {
410        Ok(decompressed) => {
411            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
412                Box::new(std::io::Error::other(format!(
413                    "decode transaction metadata (slot {slot}): {err}"
414                ))) as SharedError
415            })
416        }
417        Err(decomp_err) => {
418            // If the frame was not zstd-compressed (common for very early data), try to
419            // decode the raw bytes directly before bailing.
420            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
421                Box::new(std::io::Error::other(format!(
422                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
423                ))) as SharedError
424            })
425        }
426    }
427}
428
/// Rewards decoded from a rewards frame, as returned by `decode_rewards_from_frame` and
/// `decode_rewards_from_bytes`.
#[derive(Debug, Default)]
struct DecodedRewards {
    /// Reward recipients paired with their reward information.
    keyed_rewards: Vec<(Address, RewardInfo)>,
    /// Partition count copied from the decoded rewards message, when it carried one.
    num_partitions: Option<u64>,
}
434
435impl DecodedRewards {
436    fn empty() -> Self {
437        Self {
438            keyed_rewards: Vec::new(),
439            num_partitions: None,
440        }
441    }
442}
443
444fn decode_rewards_from_frame(
445    slot: u64,
446    reassembled_rewards: Vec<u8>,
447) -> Result<DecodedRewards, SharedError> {
448    if reassembled_rewards.is_empty() {
449        // Early epochs sometimes omit rewards payloads entirely.
450        return Ok(DecodedRewards::empty());
451    }
452
453    match utils::decompress_zstd(reassembled_rewards.as_slice()) {
454        Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
455            |err| {
456                Box::new(std::io::Error::other(format!(
457                    "decode rewards (slot {slot}): {err}"
458                ))) as SharedError
459            },
460        ),
461        Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
462            |err| {
463                Box::new(std::io::Error::other(format!(
464                    "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
465                ))) as SharedError
466            },
467        ),
468    }
469}
470
471fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
472    let epoch = slot_to_epoch(slot);
473    let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
474        prost_011::Message::decode(bytes);
475    match proto_attempt {
476        Ok(proto) => {
477            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
478            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
479                Box::new(std::io::Error::other(format!(
480                    "convert rewards proto failed (epoch {epoch}): {err}"
481                ))) as SharedError
482            })?;
483            Ok(DecodedRewards {
484                keyed_rewards,
485                num_partitions,
486            })
487        }
488        Err(proto_err) => {
489            let stored: solana_storage_proto::StoredExtendedRewards =
490                bincode::deserialize(bytes).map_err(|bin_err| {
491                    Box::new(std::io::Error::other(format!(
492                        "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
493                    ))) as SharedError
494                })?;
495            let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
496            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
497            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
498                Box::new(std::io::Error::other(format!(
499                    "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
500                ))) as SharedError
501            })?;
502            Ok(DecodedRewards {
503                keyed_rewards,
504                num_partitions,
505            })
506        }
507    }
508}
509
510fn decode_transaction_status_meta(
511    slot: u64,
512    metadata_bytes: &[u8],
513) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
514    let epoch = slot_to_epoch(slot);
515    let mut bincode_err: Option<String> = None;
516    if epoch < BINCODE_EPOCH_CUTOFF {
517        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
518            metadata_bytes,
519        ) {
520            Ok(stored) => return Ok(stored.into()),
521            Err(err) => {
522                bincode_err = Some(err.to_string());
523            }
524        }
525    }
526
527    let bin_err_for_proto = bincode_err.clone();
528    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
529        prost_011::Message::decode(metadata_bytes).map_err(|err| {
530            // If we already tried bincode, surface both failures for easier debugging.
531            if let Some(ref bin_err) = bin_err_for_proto {
532                Box::new(std::io::Error::other(format!(
533                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
534                ))) as SharedError
535            } else {
536                Box::new(std::io::Error::other(format!(
537                    "protobuf decode transaction metadata: {err}"
538                ))) as SharedError
539            }
540        })?;
541
542    proto.try_into().map_err(|err| {
543        if let Some(ref bin_err) = bincode_err {
544            Box::new(std::io::Error::other(format!(
545                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
546            ))) as SharedError
547        } else {
548            Box::new(std::io::Error::other(format!(
549                "convert transaction metadata proto: {err}"
550            ))) as SharedError
551        }
552    })
553}
554
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Multiplier these tests use to turn an epoch number into a slot inside that epoch.
    const SLOTS_PER_EPOCH: u64 = 432000;

    /// Metadata with a handful of non-default fields, used by the protobuf tests.
    fn sample_meta() -> TransactionStatusMeta {
        TransactionStatusMeta {
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            compute_units_consumed: Some(7),
            cost_units: Some(9),
            loaded_addresses: LoadedAddresses::default(),
            ..TransactionStatusMeta::default()
        }
    }

    /// Encodes `meta` through the generated protobuf type.
    fn proto_bytes(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored).expect("bincode serialize");
        let expected = TransactionStatusMeta::from(stored);
        let actual = decode_transaction_status_meta(0, &encoded).expect("decode");
        assert_eq!(actual, expected);
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = proto_bytes(&expected);
        // First slot of epoch 157: bincode is not attempted at or after the cutoff.
        let actual =
            decode_transaction_status_meta(157 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(actual, expected);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = proto_bytes(&expected);
        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
        let actual =
            decode_transaction_status_meta(100 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(actual, expected);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let actual = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(actual, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        // The frame is handed over uncompressed on purpose.
        let frame = bincode::serialize(&stored).expect("serialize");
        let expected = TransactionStatusMeta::from(stored);
        let actual = decode_transaction_status_meta_from_frame(0, frame).expect("decode fallback");
        assert_eq!(actual, expected);
    }
}
648
#[cfg(test)]
mod rewards_decode_tests {
    use super::decode_rewards_from_bytes;
    use solana_sdk_ids::vote::id as vote_program_id;
    use solana_storage_proto::StoredExtendedRewards;
    use solana_storage_proto::convert::generated;
    use solana_transaction_status::{Reward, RewardType};

    #[test]
    fn decodes_protobuf_rewards() {
        let single_reward = generated::Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 5,
            post_balance: 10,
            reward_type: generated::RewardType::Fee as i32,
            commission: "1".to_string(),
        };
        let message = generated::Rewards {
            rewards: vec![single_reward],
            num_partitions: Some(generated::NumPartitions { num_partitions: 2 }),
        };
        let encoded = prost_011::Message::encode_to_vec(&message);

        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode proto rewards");

        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, Some(2));
    }

    #[test]
    fn decodes_bincode_rewards() {
        let single_reward = Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 7,
            post_balance: 9,
            reward_type: Some(RewardType::Rent),
            commission: Some(3),
        };
        let stored: StoredExtendedRewards = vec![single_reward.into()];
        let encoded = bincode::serialize(&stored).expect("bincode serialize");

        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode bincode rewards");

        // The bincode representation carries no partition count.
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, None);
    }
}
694
/// Firehose transaction payload passed to [`Handler`] callbacks.
///
/// This is the `Data` type of the [`OnTxFn`] handler alias.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot that contains the transaction.
    pub slot: u64,
    /// Index of the transaction within the slot.
    pub transaction_slot_index: usize,
    /// Transaction signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Indicates whether the transaction is a vote.
    pub is_vote: bool,
    /// Status metadata returned by the Solana runtime.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// Fully decoded transaction.
    pub transaction: VersionedTransaction,
}
713
/// Block entry metadata passed to [`Handler`] callbacks.
///
/// This is the `Data` type of the [`OnEntryFn`] handler alias.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot that generated the entry.
    pub slot: u64,
    /// Index of the entry within the slot.
    pub entry_index: usize,
    /// Range of transaction indexes covered by the entry (half-open, as with any `Range`).
    pub transaction_indexes: Range<usize>,
    /// Number of hashes associated with the entry.
    pub num_hashes: u64,
    /// Entry hash.
    pub hash: Hash,
}
728
/// Reward data conveyed to reward [`Handler`] callbacks.
///
/// This is the `Data` type of the [`OnRewardFn`] handler alias.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards correspond to.
    pub slot: u64,
    /// Reward recipients and their associated reward information.
    pub rewards: Vec<(Address, RewardInfo)>,
}
737
/// Block-level data streamed to block handlers.
///
/// This is the `Data` type of the [`OnBlockFn`] handler alias. Unlike the other payload
/// types in this file it derives only `Debug`, not `Clone`.
#[derive(Debug)]
pub enum BlockData {
    /// Fully populated block payload with ledger metadata.
    Block {
        /// Parent slot number.
        parent_slot: u64,
        /// Parent block hash.
        parent_blockhash: Hash,
        /// Current block slot.
        slot: u64,
        /// Current block hash.
        blockhash: Hash,
        /// Rewards keyed by account and partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Optional Unix timestamp for the block.
        block_time: Option<i64>,
        /// Optional ledger block height.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries contained in the block.
        entry_count: u64,
    },
    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
    /// arrive out of order).
    PossibleLeaderSkipped {
        /// Slot number that either lacked a block or may still arrive later.
        slot: u64,
    },
}
769
770impl BlockData {
771    /// Returns the slot associated with this block or skipped slot.
772    #[inline(always)]
773    pub const fn slot(&self) -> u64 {
774        match self {
775            BlockData::Block { slot, .. } => *slot,
776            BlockData::PossibleLeaderSkipped { slot } => *slot,
777        }
778    }
779
780    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
781    #[inline(always)]
782    pub const fn was_skipped(&self) -> bool {
783        matches!(self, BlockData::PossibleLeaderSkipped { .. })
784    }
785
786    /// Returns the optional block time when available.
787    #[inline(always)]
788    pub const fn block_time(&self) -> Option<i64> {
789        match self {
790            BlockData::Block { block_time, .. } => *block_time,
791            BlockData::PossibleLeaderSkipped { .. } => None,
792        }
793    }
794}
795
796type HandlerResult = Result<(), SharedError>;
797type HandlerFuture = BoxFuture<'static, HandlerResult>;
798
799/// Asynchronous callback invoked for each firehose event of type `Data`.
800pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
801
802impl<Data, F> Handler<Data> for F where
803    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
804{
805}
806
807/// Function pointer alias for [`Handler`] callbacks.
808pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
809/// Convenience alias for block handlers accepted by [`firehose`].
810pub type OnBlockFn = HandlerFn<BlockData>;
811/// Convenience alias for transaction handlers accepted by [`firehose`].
812pub type OnTxFn = HandlerFn<TransactionData>;
813/// Convenience alias for entry handlers accepted by [`firehose`].
814pub type OnEntryFn = HandlerFn<EntryData>;
815/// Convenience alias for reward handlers accepted by [`firehose`].
816pub type OnRewardFn = HandlerFn<RewardsData>;
817/// Type alias for [`StatsTracking`] using simple function pointers.
818pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
819/// Convenience alias for firehose error handlers.
820pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
821/// Convenience alias for stats tracking handlers accepted by [`firehose`].
822pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
823
/// Details about a failure raised by a firehose worker.
#[derive(Debug, Clone)]
pub struct FirehoseErrorContext {
    /// Index of the thread on which the error occurred.
    pub thread_id: usize,
    /// Slot being processed by the worker when the error surfaced.
    pub slot: u64,
    /// Epoch computed from the failing slot.
    pub epoch: u64,
    /// The error rendered as a string, suitable for display or logging.
    pub error_message: String,
}
836
837/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
838///
839/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
840/// runner restarts from the last processed slot to maintain coverage.
841///
842/// When `sequential` is `true`, the firehose uses one worker thread and opens epoch streams
843/// with ripget's parallel windowed downloader. In this mode `threads` configures ripget range
844/// concurrency rather than firehose worker partitioning.
845///
846/// `buffer_window_bytes` controls the ripget hot/cold window when `sequential` is enabled.
847/// Pass `None` to use the default (`min(4 GiB, 15% of available RAM)`).
848///
849/// When `reverse` is `true` (sequential mode only), epochs in the requested range are
850/// processed from highest to lowest. Within each epoch slots are still emitted in ascending
851/// order because the underlying CAR archive can only be streamed forward.
852#[inline]
853#[allow(clippy::too_many_arguments)]
854pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
855    threads: u64,
856    sequential: bool,
857    reverse: bool,
858    buffer_window_bytes: Option<u64>,
859    slot_range: Range<u64>,
860    on_block: Option<OnBlock>,
861    on_tx: Option<OnTransaction>,
862    on_entry: Option<OnEntry>,
863    on_rewards: Option<OnRewards>,
864    on_error: Option<OnError>,
865    stats_tracking: Option<StatsTracking<OnStats>>,
866    shutdown_signal: Option<broadcast::Receiver<()>>,
867) -> Result<(), (FirehoseError, u64)>
868where
869    OnBlock: Handler<BlockData>,
870    OnTransaction: Handler<TransactionData>,
871    OnEntry: Handler<EntryData>,
872    OnRewards: Handler<RewardsData>,
873    OnStats: Handler<Stats>,
874    OnError: Handler<FirehoseErrorContext>,
875{
876    if threads == 0 {
877        return Err((
878            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
879            slot_range.start,
880        ));
881    }
882    let client = crate::network::create_http_client();
883    log::info!(target: LOG_MODULE, "starting firehose...");
884    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
885    // Reverse mode implies sequential mode; activate it automatically when caller passed
886    // `reverse: true` without `sequential: true`.
887    let sequential = sequential || reverse;
888    let firehose_threads = if sequential { 1 } else { threads };
889    let sequential_download_threads = std::cmp::max(1, threads as usize);
890    let sequential_buffer_window_bytes = buffer_window_bytes
891        .filter(|value| *value >= 2)
892        .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
893    if sequential {
894        log::info!(
895            target: LOG_MODULE,
896            "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
897            sequential_download_threads,
898            crate::system::format_byte_size(sequential_buffer_window_bytes)
899        );
900    }
901    let reverse_mode = reverse;
902    if reverse_mode {
903        log::info!(
904            target: LOG_MODULE,
905            "reverse mode enabled: epochs processed from highest to lowest"
906        );
907    }
908
909    let slot_range = Arc::new(slot_range);
910
911    // divide slot_range into n subranges
912    let subranges = generate_subranges(&slot_range, firehose_threads);
913    if firehose_threads > 1 {
914        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
915    }
916
917    let firehose_start = std::time::Instant::now();
918    let shutdown_flag = Arc::new(AtomicBool::new(false));
919    if let Some(ref rx) = shutdown_signal {
920        let mut rx = rx.resubscribe();
921        let flag = shutdown_flag.clone();
922        tokio::spawn(async move {
923            if rx.recv().await.is_ok() {
924                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
925                flag.store(true, Ordering::SeqCst);
926            }
927        });
928    }
929
930    // Build a shared ripget HTTP client so TCP connections survive across epoch transitions.
931    let shared_ripget_client: Option<ripget::Client> = if sequential {
932        Some(
933            ripget::build_client(Some(&format!(
934                "jetstreamer-firehose/{}",
935                env!("CARGO_PKG_VERSION")
936            )))
937            .expect("failed to build ripget HTTP client"),
938        )
939    } else {
940        None
941    };
942
943    let mut handles = Vec::new();
944    // Shared per-thread error counters
945    let error_counts: Arc<Vec<AtomicU32>> =
946        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
947
948    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
949    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
950    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
951    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
952    let pending_skipped_slots: Arc<
953        DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
954    > = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
955
956    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
957        let error_counts = error_counts.clone();
958        let client = client.clone();
959        let on_block = on_block.clone();
960        let on_tx = on_tx.clone();
961        let on_entry = on_entry.clone();
962        let on_reward = on_rewards.clone();
963        let on_error = on_error.clone();
964        let overall_slots_processed = overall_slots_processed.clone();
965        let overall_blocks_processed = overall_blocks_processed.clone();
966        let overall_transactions_processed = overall_transactions_processed.clone();
967        let overall_entries_processed = overall_entries_processed.clone();
968        let stats_tracking = stats_tracking.clone();
969        let transactions_since_stats = Arc::new(AtomicU64::new(0));
970        let blocks_since_stats = Arc::new(AtomicU64::new(0));
971        let slots_since_stats = Arc::new(AtomicU64::new(0));
972        let last_pulse = Arc::new(AtomicU64::new(0));
973        let transactions_since_stats_cloned = transactions_since_stats.clone();
974        let blocks_since_stats_cloned = blocks_since_stats.clone();
975        let slots_since_stats_cloned = slots_since_stats.clone();
976        let last_pulse_cloned = last_pulse.clone();
977        let shutdown_flag = shutdown_flag.clone();
978        let pending_skipped_slots = pending_skipped_slots.clone();
979        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
980        let sequential_mode = sequential;
981        let reverse_mode_local = reverse_mode;
982        let ripget_threads = sequential_download_threads;
983        let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
984        let ripget_client = shared_ripget_client.clone();
985
986        let handle = tokio::spawn(async move {
987            let transactions_since_stats = transactions_since_stats_cloned;
988            let blocks_since_stats = blocks_since_stats_cloned;
989            let slots_since_stats = slots_since_stats_cloned;
990            let last_pulse = last_pulse_cloned;
991            let mut shutdown_rx = thread_shutdown_rx;
992            let start_time = firehose_start;
993            last_pulse.store(
994                firehose_start.elapsed().as_nanos() as u64,
995                Ordering::Relaxed,
996            );
997            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
998            let mut skip_until_index = None;
999            let last_emitted_slot = slot_range.start.saturating_sub(1);
1000            let block_enabled = on_block.is_some();
1001            let tx_enabled = on_tx.is_some();
1002            let entry_enabled = on_entry.is_some();
1003            let reward_enabled = on_reward.is_some();
1004            let tracking_enabled = stats_tracking.is_some();
1005            if block_enabled {
1006                pending_skipped_slots
1007                    .entry(thread_index)
1008                    .or_insert_with(|| DashSet::with_hasher(ahash::RandomState::new()));
1009            }
1010            let mut last_counted_slot = slot_range.start.saturating_sub(1);
1011            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
1012            // Reverse-mode state preserved across retries.
1013            let mut reverse_partial_resume: Option<u64> = None;
1014            let mut reverse_highest_remaining_epoch: u64 = if reverse_mode_local {
1015                slot_to_epoch(slot_range.end.saturating_sub(1))
1016            } else {
1017                0
1018            };
1019            let mut thread_stats = if tracking_enabled {
1020                Some(ThreadStats {
1021                    thread_id: thread_index,
1022                    start_time,
1023                    finish_time: None,
1024                    slot_range: slot_range.clone(),
1025                    initial_slot_range: slot_range.clone(),
1026                    current_slot: slot_range.start,
1027                    slots_processed: 0,
1028                    blocks_processed: 0,
1029                    leader_skipped_slots: 0,
1030                    transactions_processed: 0,
1031                    entries_processed: 0,
1032                    rewards_processed: 0,
1033                })
1034            } else {
1035                None
1036            };
1037
1038            // let mut triggered = false;
1039            while let Err((err, slot)) = async {
1040                let mut last_emitted_slot = last_emitted_slot_global;
1041                let op_timeout = if sequential_mode {
1042                    OP_TIMEOUT_SEQUENTIAL
1043                } else {
1044                    OP_TIMEOUT
1045                };
1046                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1047                    log::info!(
1048                        target: &log_target,
1049                        "shutdown requested; terminating firehose thread {}",
1050                        thread_index
1051                    );
1052                    return Ok(());
1053                }
1054                let lowest_epoch = slot_to_epoch(slot_range.start);
1055                let highest_epoch = slot_to_epoch(slot_range.end - 1);
1056                let epoch_range = lowest_epoch..=highest_epoch;
1057                log::info!(
1058                    target: &log_target,
1059                    "slot range: {} (epoch {}) ... {} (epoch {})",
1060                    slot_range.start,
1061                    slot_to_epoch(slot_range.start),
1062                    slot_range.end,
1063                    slot_to_epoch(slot_range.end)
1064                );
1065
1066                log::info!(target: &log_target, "🚒 starting firehose...");
1067
1068                // for each epoch
1069                let mut current_slot: Option<u64> = None;
1070                let epoch_iter: Vec<u64> = if reverse_mode_local {
1071                    if reverse_highest_remaining_epoch < lowest_epoch {
1072                        // All epochs already completed across previous retries.
1073                        return Ok(());
1074                    }
1075                    (lowest_epoch..=reverse_highest_remaining_epoch)
1076                        .rev()
1077                        .collect()
1078                } else {
1079                    epoch_range.clone().collect()
1080                };
1081                for epoch_num in epoch_iter {
1082                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1083                        log::info!(
1084                            target: &log_target,
1085                            "shutdown requested; terminating firehose thread {}",
1086                            thread_index
1087                        );
1088                        return Ok(());
1089                    }
1090                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
1091                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1092                    let local_start = if reverse_mode_local {
1093                        match reverse_partial_resume {
1094                            Some(s) if slot_to_epoch(s) == epoch_num => {
1095                                std::cmp::max(epoch_start, s)
1096                            }
1097                            _ => std::cmp::max(slot_range.start, epoch_start),
1098                        }
1099                    } else {
1100                        std::cmp::max(slot_range.start, epoch_start)
1101                    };
1102                    let local_end_inclusive =
1103                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1104                    if local_start > local_end_inclusive {
1105                        log::debug!(
1106                            target: &log_target,
1107                            "epoch {} has no overlap with thread range ({}..{}), skipping",
1108                            epoch_num,
1109                            slot_range.start,
1110                            slot_range.end
1111                        );
1112                        continue;
1113                    }
1114                    let use_sequential_stream = sequential_mode && local_start == epoch_start;
1115                    let stream = match timeout(op_timeout, async {
1116                        if use_sequential_stream {
1117                            fetch_epoch_stream_with_options(
1118                                epoch_num,
1119                                &client,
1120                                Some(FetchEpochStreamOptions {
1121                                    sequential: true,
1122                                    ripget_threads,
1123                                    buffer_window_bytes: ripget_buffer_window_bytes,
1124                                    ripget_client: ripget_client.clone(),
1125                                }),
1126                            )
1127                            .await
1128                        } else {
1129                            fetch_epoch_stream(epoch_num, &client).await
1130                        }
1131                    })
1132                    .await
1133                    {
1134                        Ok(stream) => stream,
1135                        Err(_) => {
1136                            return Err((
1137                                FirehoseError::OperationTimeout("fetch_epoch_stream"),
1138                                current_slot.unwrap_or(slot_range.start),
1139                            ));
1140                        }
1141                    };
1142                    let mut reader = NodeReader::new(stream);
1143
1144                    let header_fut = reader.read_raw_header();
1145                    let header = match timeout(op_timeout, header_fut).await {
1146                        Ok(res) => res
1147                            .map_err(FirehoseError::ReadHeader)
1148                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1149                        Err(_) => {
1150                            return Err((
1151                                FirehoseError::OperationTimeout("read_raw_header"),
1152                                current_slot.unwrap_or(slot_range.start),
1153                            ));
1154                        }
1155                    };
1156                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1157
1158                    let mut previous_blockhash = Hash::default();
1159                    let mut latest_entry_blockhash = Hash::default();
1160                    // Reset counters to align to the local epoch slice; prevents boundary slots
1161                    // from being treated as already-counted after a restart.
1162                    last_counted_slot = local_start.saturating_sub(1);
1163                    current_slot = None;
1164                    if reverse_mode_local {
1165                        // In reverse mode each epoch is processed forward independently;
1166                        // the cross-epoch monotonic dedup check would otherwise reject every
1167                        // slot below the previously processed (higher) epoch's range.
1168                        last_emitted_slot = local_start.saturating_sub(1);
1169                    }
1170                    if tracking_enabled
1171                        && let Some(ref mut stats) = thread_stats {
1172                            stats.current_slot = local_start;
1173                            stats.slot_range.start = local_start;
1174                        }
1175
1176                    if local_start > epoch_start {
1177                        // Seek to the nearest previous indexed slot so the stream includes all
1178                        // nodes (transactions, entries, rewards) that precede `local_start`.
1179                        let seek_slot = match timeout(
1180                            OP_TIMEOUT,
1181                            find_previous_indexed_slot(local_start, epoch_start, &log_target),
1182                        )
1183                        .await
1184                        {
1185                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1186                            Err(_) => {
1187                                return Err((
1188                                    FirehoseError::OperationTimeout(
1189                                        "seek_to_previous_indexed_slot",
1190                                    ),
1191                                    current_slot.unwrap_or(slot_range.start),
1192                                ));
1193                            }
1194                        };
1195                        if let Some(seek_slot) = seek_slot {
1196                            let seek_fut = reader.seek_to_slot(seek_slot);
1197                            match timeout(op_timeout, seek_fut).await {
1198                                Ok(res) => {
1199                                    res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1200                                }
1201                                Err(_) => {
1202                                    return Err((
1203                                        FirehoseError::OperationTimeout("seek_to_slot"),
1204                                        current_slot.unwrap_or(slot_range.start),
1205                                    ));
1206                                }
1207                            }
1208                        }
1209                    }
1210
1211                    // for each item in each block
1212                    let mut item_index = 0;
1213                    let mut displayed_skip_message = false;
1214                    loop {
1215                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1216                            log::info!(
1217                                target: &log_target,
1218                                "shutdown requested; terminating firehose thread {}",
1219                                thread_index
1220                            );
1221                            return Ok(());
1222                        }
1223                        let read_fut = reader.read_until_block();
1224                        let nodes = match timeout(op_timeout, read_fut).await {
1225                            Ok(result) => result
1226                                .map_err(FirehoseError::ReadUntilBlockError)
1227                                .map_err(|e| {
1228                                    (
1229                                        e,
1230                                        current_slot
1231                                            .map(|slot| slot.saturating_add(1))
1232                                            .unwrap_or(slot_range.start),
1233                                    )
1234                                })?,
1235                            Err(_) => {
1236                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1237                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1238                            }
1239                        };
1240                        if nodes.is_empty() {
1241                            log::info!(
1242                                target: &log_target,
1243                                "reached end of epoch {}",
1244                                epoch_num
1245                            );
1246                            break;
1247                        }
1248                        if let Some(last_node) = nodes.0.last()
1249                            && !last_node.get_node().is_block()
1250                        {
1251                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1252                            break;
1253                        }
1254                        let block = nodes
1255                            .get_block()
1256                            .map_err(FirehoseError::GetBlockError)
1257                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1258                        log::debug!(
1259                            target: &log_target,
1260                            "read {} items from epoch {}, now at slot {}",
1261                            item_index,
1262                            epoch_num,
1263                            block.slot
1264                        );
1265                        let slot = block.slot;
1266                        if slot > local_end_inclusive {
1267                            log::debug!(
1268                                target: &log_target,
1269                                "reached end of local slice at slot {} (epoch {}), stopping",
1270                                slot,
1271                                epoch_num
1272                            );
1273                            break;
1274                        }
1275                        if slot >= slot_range.end {
1276                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1277                            // We use >= because slot_range is half-open [start, end), so any
1278                            // slot equal to end is out-of-range and must not be processed. Do
1279                            // not emit synthetic skipped slots here; another thread may own the
1280                            // boundary. In reverse mode we still have lower epochs to process,
1281                            // so just break out of this epoch's inner loop.
1282                            if reverse_mode_local {
1283                                break;
1284                            }
1285                            if block_enabled {
1286                                pending_skipped_slots.remove(&thread_index);
1287                            }
1288                            return Ok(());
1289                        }
1290                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1291                        if slot < slot_range.start {
1292                            if slot.saturating_add(1) == slot_range.start {
1293                                log::debug!(
1294                                    target: &log_target,
1295                                    "priming reader with preceding slot {}, skipping",
1296                                    slot
1297                                );
1298                            } else {
1299                                log::warn!(
1300                                    target: &log_target,
1301                                    "encountered slot {} before start of range {}, skipping",
1302                                    slot,
1303                                    slot_range.start
1304                                );
1305                            }
1306                            continue;
1307                        }
1308                        current_slot = Some(slot);
1309                        let mut entry_index: usize = 0;
1310                        let mut this_block_executed_transaction_count: u64 = 0;
1311                        let mut this_block_entry_count: u64 = 0;
1312                        let mut this_block_rewards = DecodedRewards::empty();
1313
1314                        for node_with_cid in &nodes.0 {
1315                            item_index += 1;
1316                            if let Some(skip) = skip_until_index {
1317                                if item_index < skip {
1318                                    if !displayed_skip_message {
1319                                        log::info!(
1320                                            target: &log_target,
1321                                            "skipping until index {} (at {})",
1322                                            skip,
1323                                            item_index
1324                                        );
1325                                        displayed_skip_message = true;
1326                                    }
1327                                    continue;
1328                                } else {
1329                                    log::info!(
1330                                        target: &log_target,
1331                                        "reached target index {}, resuming...",
1332                                        skip
1333                                    );
1334                                    skip_until_index = None;
1335                                }
1336                            }
1337                            let node = node_with_cid.get_node();
1338
1339                            if let Some(ref mut stats) = thread_stats {
1340                                stats.current_slot = slot;
1341                            }
1342
1343                            let error_slot = current_slot.unwrap_or(slot_range.start);
1344
1345                            use crate::node::Node::*;
1346                            match node {
1347                                Transaction(tx) => {
1348                                    if tx_enabled
1349                                        && let Some(on_tx_cb) = on_tx.as_ref()
1350                                    {
1351                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1352                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1353                                            (
1354                                                FirehoseError::NodeDecodingError(item_index, err),
1355                                                error_slot,
1356                                            )
1357                                        })?;
1358                                        let reassembled_metadata = nodes
1359                                            .reassemble_dataframes(&tx.metadata)
1360                                            .map_err(|err| {
1361                                                (
1362                                                    FirehoseError::NodeDecodingError(item_index, err),
1363                                                    error_slot,
1364                                                )
1365                                            })?;
1366
1367                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1368                                            block.slot,
1369                                            reassembled_metadata,
1370                                        )
1371                                        .map_err(|err| {
1372                                            (
1373                                                FirehoseError::NodeDecodingError(item_index, err),
1374                                                error_slot,
1375                                            )
1376                                        })?;
1377
1378                                        let message_hash = {
1379                                            #[cfg(feature = "verify-transaction-signatures")]
1380                                            {
1381                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1382                                                    (
1383                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1384                                                        error_slot,
1385                                                    )
1386                                                })?
1387                                            }
1388                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1389                                            {
1390                                                versioned_tx.message.hash()
1391                                            }
1392                                        };
1393                                        let signature = versioned_tx
1394                                            .signatures
1395                                            .first()
1396                                            .ok_or_else(|| {
1397                                                Box::new(std::io::Error::new(
1398                                                    std::io::ErrorKind::InvalidData,
1399                                                    "transaction missing signature",
1400                                                )) as SharedError
1401                                            })
1402                                            .map_err(|err| {
1403                                                (
1404                                                    FirehoseError::NodeDecodingError(
1405                                                        item_index,
1406                                                        err,
1407                                                    ),
1408                                                    error_slot,
1409                                                )
1410                                            })?;
1411                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1412
1413                                        on_tx_cb(
1414                                            thread_index,
1415                                            TransactionData {
1416                                                slot: block.slot,
1417                                                transaction_slot_index: tx.index.unwrap() as usize,
1418                                                signature: *signature,
1419                                                message_hash,
1420                                                is_vote,
1421                                                transaction_status_meta: as_native_metadata,
1422                                                transaction: versioned_tx,
1423                                            },
1424                                        )
1425                                        .await
1426                                        .map_err(|e| {
1427                                            (
1428                                                FirehoseError::TransactionHandlerError(e),
1429                                                error_slot,
1430                                            )
1431                                        })?;
1432                                    }
1433                                    fetch_add_if(
1434                                        tracking_enabled,
1435                                        &overall_transactions_processed,
1436                                        1,
1437                                    );
1438                                    if let Some(ref mut stats) = thread_stats {
1439                                        stats.transactions_processed += 1;
1440                                    }
1441                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1442                                }
1443                                Entry(entry) => {
1444                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1445                                    let entry_transaction_count = entry.transactions.len();
1446                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1447                                    let starting_transaction_index_u64 =
1448                                        this_block_executed_transaction_count;
1449                                    latest_entry_blockhash = entry_hash;
1450                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1451                                    this_block_entry_count += 1;
1452
1453                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1454                                        let starting_transaction_index = usize::try_from(
1455                                            starting_transaction_index_u64,
1456                                        )
1457                                        .map_err(|err| {
1458                                            (
1459                                                FirehoseError::EntryHandlerError(Box::new(err)),
1460                                                error_slot,
1461                                            )
1462                                        })?;
1463                                        let transaction_indexes_end =
1464                                            starting_transaction_index + entry_transaction_count;
1465                                        on_entry_cb(
1466                                            thread_index,
1467                                            EntryData {
1468                                                slot: block.slot,
1469                                                entry_index,
1470                                                transaction_indexes: starting_transaction_index
1471                                                    ..transaction_indexes_end,
1472                                                num_hashes: entry.num_hashes,
1473                                                hash: entry_hash,
1474                                            },
1475                                        )
1476                                        .await
1477                                        .map_err(|e| {
1478                                            (
1479                                                FirehoseError::EntryHandlerError(e),
1480                                                error_slot,
1481                                            )
1482                                        })?;
1483                                    }
1484                                    entry_index += 1;
1485                                    fetch_add_if(
1486                                        tracking_enabled,
1487                                        &overall_entries_processed,
1488                                        1,
1489                                    );
1490                                    if let Some(ref mut stats) = thread_stats {
1491                                        stats.entries_processed += 1;
1492                                    }
1493                                }
1494                                Block(block) => {
1495                                    let prev_last_counted_slot = last_counted_slot;
1496                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1497                                        (
1498                                            stats.slots_processed,
1499                                            stats.blocks_processed,
1500                                            stats.leader_skipped_slots,
1501                                            stats.current_slot,
1502                                        )
1503                                    });
1504
1505                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1506                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1507                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1508
1509                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1510                                    for skipped_slot in skip_start..slot {
1511                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1512                                            break;
1513                                        }
1514                                        log::debug!(
1515                                            target: &log_target,
1516                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1517                                            skipped_slot,
1518                                            prev_last_counted_slot,
1519                                            slot,
1520                                        );
1521                                        if block_enabled {
1522                                            pending_skipped_slots
1523                                                .entry(thread_index)
1524                                                .or_default()
1525                                                .insert(skipped_slot);
1526                                        }
1527                                        if block_enabled
1528                                            && let Some(on_block_cb) = on_block.as_ref()
1529                                            && skipped_slot > last_emitted_slot {
1530                                                last_emitted_slot = skipped_slot;
1531                                                on_block_cb(
1532                                                    thread_index,
1533                                                    BlockData::PossibleLeaderSkipped {
1534                                                        slot: skipped_slot,
1535                                                    },
1536                                                )
1537                                                .await
1538                                                .map_err(|e| {
1539                                                    (
1540                                                        FirehoseError::BlockHandlerError(e),
1541                                                        error_slot,
1542                                                    )
1543                                                })?;
1544                                            }
1545                                        if tracking_enabled {
1546                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1547                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1548                                            if let Some(ref mut stats) = thread_stats {
1549                                                stats.leader_skipped_slots += 1;
1550                                                stats.slots_processed += 1;
1551                                                stats.current_slot = skipped_slot;
1552                                            }
1553                                        }
1554                                        last_counted_slot = skipped_slot;
1555                                    }
1556
1557                                    let cleared_pending_skip = if block_enabled {
1558                                        clear_pending_skip(
1559                                            &pending_skipped_slots,
1560                                            thread_index,
1561                                            slot,
1562                                        )
1563                                    } else {
1564                                        false
1565                                    };
1566
1567                                    if slot <= last_counted_slot && !cleared_pending_skip {
1568                                        log::debug!(
1569                                            target: &log_target,
1570                                            "duplicate block {}, already counted (last_counted={})",
1571                                            slot,
1572                                            last_counted_slot,
1573                                        );
1574                                        this_block_rewards = DecodedRewards::empty();
1575                                        continue;
1576                                    }
1577
1578                                    if block_enabled {
1579                                        if let Some(on_block_cb) = on_block.as_ref() {
1580                                            let DecodedRewards {
1581                                                keyed_rewards,
1582                                                num_partitions,
1583                                            } = std::mem::take(&mut this_block_rewards);
1584                                            if slot > last_emitted_slot {
1585                                                last_emitted_slot = slot;
1586                                                on_block_cb(
1587                                                    thread_index,
1588                                                    BlockData::Block {
1589                                                        parent_slot: block.meta.parent_slot,
1590                                                        parent_blockhash: previous_blockhash,
1591                                                        slot: block.slot,
1592                                                        blockhash: latest_entry_blockhash,
1593                                                        rewards: KeyedRewardsAndNumPartitions {
1594                                                            keyed_rewards,
1595                                                            num_partitions,
1596                                                        },
1597                                                        block_time: Some(block.meta.blocktime as i64),
1598                                                        block_height: block.meta.block_height,
1599                                                        executed_transaction_count:
1600                                                            this_block_executed_transaction_count,
1601                                                        entry_count: this_block_entry_count,
1602                                                    },
1603                                                )
1604                                                .await
1605                                                .map_err(|e| {
1606                                                    (
1607                                                        FirehoseError::BlockHandlerError(e),
1608                                                        error_slot,
1609                                                    )
1610                                                })?;
1611                                            }
1612                                        }
1613                                    } else {
1614                                        this_block_rewards = DecodedRewards::empty();
1615                                    }
1616                                    previous_blockhash = latest_entry_blockhash;
1617
1618                                    if tracking_enabled {
1619                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1620                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1621                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1622                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1623                                        if let Some(ref mut stats) = thread_stats {
1624                                            stats.blocks_processed += 1;
1625                                            stats.slots_processed += 1;
1626                                            stats.current_slot = slot;
1627                                        }
1628
1629                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1630                                            (&stats_tracking, thread_stats.as_mut())
1631                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1632                                                && let Err(err) = maybe_emit_stats(
1633                                                    stats_tracking.as_ref(),
1634                                                    thread_index,
1635                                                    thread_stats_ref,
1636                                                    &overall_slots_processed,
1637                                                    &overall_blocks_processed,
1638                                                    &overall_transactions_processed,
1639                                                    &overall_entries_processed,
1640                                                &transactions_since_stats,
1641                                                &blocks_since_stats,
1642                                                &slots_since_stats,
1643                                                &last_pulse,
1644                                                start_time,
1645                                            )
1646                                            .await
1647                                            {
1648                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1649                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1650                                                    overall_blocks_processed
1651                                                        .fetch_sub(1, Ordering::Relaxed);
1652                                                    overall_slots_processed
1653                                                        .fetch_sub(1, Ordering::Relaxed);
1654                                                    if let Some((
1655                                                        prev_slots_processed,
1656                                                        prev_blocks_processed,
1657                                                        prev_leader_skipped,
1658                                                        prev_current_slot,
1659                                                    )) = thread_stats_snapshot
1660                                                    {
1661                                                        thread_stats_ref.slots_processed =
1662                                                            prev_slots_processed;
1663                                                        thread_stats_ref.blocks_processed =
1664                                                            prev_blocks_processed;
1665                                                        thread_stats_ref.leader_skipped_slots =
1666                                                            prev_leader_skipped;
1667                                                        thread_stats_ref.current_slot =
1668                                                            prev_current_slot;
1669                                                    }
1670                                                    last_counted_slot = prev_last_counted_slot;
1671                                                    return Err(err);
1672                                                }
1673                                    }
1674
1675                                    if slot > last_counted_slot {
1676                                        last_counted_slot = slot;
1677                                    }
1678                                }
1679                                Subset(_subset) => (),
1680                                Epoch(_epoch) => (),
1681                                Rewards(rewards) => {
1682                                    if reward_enabled || block_enabled {
1683                                        let reassembled = nodes
1684                                            .reassemble_dataframes(&rewards.data)
1685                                            .map_err(|err| {
1686                                                (
1687                                                    FirehoseError::NodeDecodingError(item_index, err),
1688                                                    current_slot.unwrap_or(slot_range.start),
1689                                                )
1690                                            })?;
1691                                        if reassembled.is_empty() {
1692                                            this_block_rewards = DecodedRewards::empty();
1693                                            if reward_enabled
1694                                                && let Some(on_reward_cb) = on_reward.as_ref()
1695                                            {
1696                                                on_reward_cb(
1697                                                    thread_index,
1698                                                    RewardsData {
1699                                                        slot: block.slot,
1700                                                        rewards: Vec::new(),
1701                                                    },
1702                                                )
1703                                                .await
1704                                                .map_err(|e| {
1705                                                    (
1706                                                        FirehoseError::RewardHandlerError(e),
1707                                                        error_slot,
1708                                                    )
1709                                                })?;
1710                                            }
1711                                            continue;
1712                                        }
1713
1714                                        let decoded_rewards =
1715                                            decode_rewards_from_frame(block.slot, reassembled)
1716                                                .map_err(|err| {
1717                                                    (
1718                                                        FirehoseError::NodeDecodingError(
1719                                                            item_index,
1720                                                            err,
1721                                                        ),
1722                                                        error_slot,
1723                                                    )
1724                                                })?;
1725                                        if reward_enabled
1726                                            && let Some(on_reward_cb) = on_reward.as_ref()
1727                                        {
1728                                            on_reward_cb(
1729                                                thread_index,
1730                                                RewardsData {
1731                                                    slot: block.slot,
1732                                                    rewards: decoded_rewards.keyed_rewards.clone(),
1733                                                },
1734                                            )
1735                                            .await
1736                                            .map_err(|e| {
1737                                                (
1738                                                    FirehoseError::RewardHandlerError(e),
1739                                                    error_slot,
1740                                                )
1741                                            })?;
1742                                        }
1743                                        this_block_rewards = decoded_rewards;
1744                                        if let Some(ref mut stats) = thread_stats {
1745                                            stats.rewards_processed +=
1746                                                this_block_rewards.keyed_rewards.len() as u64;
1747                                        }
1748                                    }
1749                                }
1750                                DataFrame(_data_frame) => (),
1751                            }
1752                        }
1753                        if !reverse_mode_local && block.slot == slot_range.end - 1 {
1754                            let finish_time = std::time::Instant::now();
1755                            let elapsed = finish_time.duration_since(start_time);
1756                            log::info!(target: &log_target, "processed slot {}", block.slot);
1757                            let elapsed_pretty = human_readable_duration(elapsed);
1758                            log::info!(
1759                                target: &log_target,
1760                                "processed {} slots across {} epochs in {}.",
1761                                slot_range.end - slot_range.start,
1762                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1763                                elapsed_pretty
1764                            );
1765                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1766                            // On completion, report threads with non-zero error counts for
1767                            // visibility.
1768                            let summary: String = error_counts
1769                                .iter()
1770                                .enumerate()
1771                                .filter_map(|(i, c)| {
1772                                    let v = c.load(Ordering::Relaxed);
1773                                    if v > 0 {
1774                                        Some(format!("{:03}({})", i, v))
1775                                    } else {
1776                                        None
1777                                    }
1778                                })
1779                                .collect::<Vec<_>>()
1780                                .join(", ");
1781                            if !summary.is_empty() {
1782                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1783                            }
1784                            return Ok(());
1785                        }
1786                    }
1787                    if reverse_mode_local {
1788                        // Mark this epoch as fully processed so retries skip it.
1789                        if epoch_num == reverse_highest_remaining_epoch {
1790                            reverse_highest_remaining_epoch =
1791                                reverse_highest_remaining_epoch.saturating_sub(1);
1792                        }
1793                        if matches!(
1794                            reverse_partial_resume,
1795                            Some(s) if slot_to_epoch(s) == epoch_num
1796                        ) {
1797                            reverse_partial_resume = None;
1798                        }
1799                    }
1800                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1801                        && last_counted_slot < expected_last_slot
1802                    {
1803                        // Do not synthesize skipped slots during final flush; another thread may
1804                        // cover the remaining range (especially across epoch boundaries).
1805                    }
1806                    if let Some(ref mut stats) = thread_stats {
1807                        stats.finish_time = Some(std::time::Instant::now());
1808                        maybe_emit_stats(
1809                            stats_tracking.as_ref(),
1810                            thread_index,
1811                            stats,
1812                            &overall_slots_processed,
1813                            &overall_blocks_processed,
1814                            &overall_transactions_processed,
1815                            &overall_entries_processed,
1816                            &transactions_since_stats,
1817                            &blocks_since_stats,
1818                            &slots_since_stats,
1819                            &last_pulse,
1820                            start_time,
1821                        )
1822                        .await?;
1823                    }
1824                    if block_enabled {
1825                        pending_skipped_slots.remove(&thread_index);
1826                    }
1827                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1828                    }
1829                    Ok(())
1830            }
1831            .await
1832            {
1833                if is_shutdown_error(&err) {
1834                    log::info!(
1835                        target: &log_target,
1836                        "shutdown requested; terminating firehose thread {}",
1837                        thread_index
1838                    );
1839                    break;
1840                }
1841                let epoch = slot_to_epoch(slot);
1842                let item_index = match &err {
1843                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1844                    _ => 0,
1845                };
1846                let error_message = err.to_string();
1847                log::error!(
1848                    target: &log_target,
1849                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1850                    slot,
1851                    epoch
1852                );
1853                log::error!(target: &log_target, "{}", error_message);
1854                if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1855                    || error_message.contains("Unknown CID version")
1856                {
1857                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index
1858                    // (or a bad seek offset that landed mid-stream).
1859                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1860                }
1861                if let Some(on_error_cb) = on_error.clone() {
1862                    let context = FirehoseErrorContext {
1863                        thread_id: thread_index,
1864                        slot,
1865                        epoch,
1866                        error_message: error_message.clone(),
1867                    };
1868                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1869                        log::error!(
1870                            target: &log_target,
1871                            "on_error handler failed: {}",
1872                            handler_err
1873                        );
1874                    }
1875                }
1876                // Increment this thread's error counter
1877                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1878                log::warn!(
1879                    target: &log_target,
1880                    "restarting from slot {} at index {}",
1881                    slot,
1882                    item_index,
1883                );
1884                // Update slot range to resume from the failed slot, not the original start.
1885                // Reset local tracking so we don't treat the resumed slot range as already counted.
1886                // If we've already counted this slot, resume from the next one to avoid duplicates.
1887                if reverse_mode_local {
1888                    // In reverse mode, completed higher epochs are tracked via
1889                    // reverse_highest_remaining_epoch and the within-epoch resume slot lives in
1890                    // reverse_partial_resume; slot_range stays at its original bounds.
1891                    let resume_slot = if slot <= last_counted_slot {
1892                        last_counted_slot.saturating_add(1)
1893                    } else {
1894                        slot
1895                    };
1896                    reverse_partial_resume = Some(resume_slot);
1897                } else if slot <= last_counted_slot {
1898                    slot_range.start = last_counted_slot.saturating_add(1);
1899                } else {
1900                    slot_range.start = slot;
1901                }
1902                // Reset pulse timer to exclude downtime from next rate calc.
1903                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1904                if tracking_enabled
1905                    && let Some(ref mut stats_ref) = thread_stats {
1906                        stats_ref.slot_range.start = slot_range.start;
1907                        stats_ref.slot_range.end = slot_range.end;
1908                        // initial_slot_range remains unchanged for progress reporting.
1909                    }
1910                if block_enabled {
1911                    pending_skipped_slots.remove(&thread_index);
1912                }
1913                // `skip_until_index` is unsafe across retries because `item_index`
1914                // is reset to 0 each epoch restart. Keeping it can skip large portions
1915                // of the stream and silently drop slots.
1916                skip_until_index = None;
1917                last_emitted_slot_global = last_emitted_slot;
1918            }
1919        });
1920        handles.push(handle);
1921    }
1922
1923    // Wait for all threads to complete
1924    for handle in handles {
1925        handle.await.unwrap();
1926    }
1927    if stats_tracking.is_some() {
1928        let elapsed = firehose_start.elapsed();
1929        let elapsed_secs = elapsed.as_secs_f64();
1930        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1931        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1932        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1933        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1934        let total_errors: u64 = error_counts
1935            .iter()
1936            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1937            .sum();
1938        let overall_tps = if elapsed_secs > 0.0 {
1939            total_transactions as f64 / elapsed_secs
1940        } else {
1941            0.0
1942        };
1943        log::info!(
1944            target: LOG_MODULE,
1945            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1946            elapsed_secs,
1947            total_slots,
1948            total_blocks,
1949            total_leader_skipped,
1950            total_transactions,
1951            overall_tps,
1952            total_errors
1953        );
1954    }
1955    if shutdown_flag.load(Ordering::SeqCst) {
1956        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1957    } else {
1958        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1959    }
1960    Ok(())
1961}
1962
1963#[allow(clippy::result_large_err)]
1964/// Builds a Geyser-backed firehose and returns a slot notification stream.
1965///
1966/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1967/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1968/// half-open interval `[start, end)`, and the thread will restart from the last processed
1969/// slot on recoverable errors to maintain coverage.
1970pub fn firehose_geyser(
1971    rt: Arc<tokio::runtime::Runtime>,
1972    slot_range: Range<u64>,
1973    geyser_config_files: Option<&[PathBuf]>,
1974    index_base_url: &Url,
1975    client: &Client,
1976    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1977    threads: u64,
1978) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1979    if threads == 0 {
1980        return Err((
1981            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1982            slot_range.start,
1983        ));
1984    }
1985    log::info!(target: LOG_MODULE, "starting firehose...");
1986    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1987    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1988    let mut entry_notifier_maybe = None;
1989    let mut block_meta_notifier_maybe = None;
1990    let mut transaction_notifier_maybe = None;
1991    if let Some(geyser_config_files) = geyser_config_files {
1992        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1993
1994        let service =
1995            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1996                confirmed_bank_receiver.clone(),
1997                true,
1998                geyser_config_files,
1999            )
2000            .map_err(|e| (e.into(), slot_range.start))?;
2001
2002        transaction_notifier_maybe = Some(
2003            service
2004                .get_transaction_notifier()
2005                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
2006                .map_err(|e| (e, slot_range.start))?,
2007        );
2008
2009        entry_notifier_maybe = service.get_entry_notifier();
2010        block_meta_notifier_maybe = service.get_block_metadata_notifier();
2011
2012        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
2013    }
2014
2015    if entry_notifier_maybe.is_some() {
2016        log::debug!(target: LOG_MODULE, "entry notifications enabled")
2017    } else {
2018        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
2019    }
2020    log::info!(target: LOG_MODULE, "running on_load...");
2021    rt.spawn(on_load);
2022
2023    let slot_range = Arc::new(slot_range);
2024    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
2025    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
2026    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
2027    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
2028
2029    // divide slot_range into n subranges
2030    let subranges = generate_subranges(&slot_range, threads);
2031    if threads > 1 {
2032        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
2033    }
2034
2035    let mut handles = Vec::new();
2036    // Shared per-thread error counters
2037    let error_counts: Arc<Vec<AtomicU32>> =
2038        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
2039
2040    for (i, slot_range) in subranges.into_iter().enumerate() {
2041        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
2042        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
2043        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
2044        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
2045        let client = client.clone();
2046        let error_counts = error_counts.clone();
2047
2048        let rt_clone = rt.clone();
2049
2050        let handle = std::thread::spawn(move || {
2051            rt_clone.block_on(async {
2052                firehose_geyser_thread(
2053                    slot_range,
2054                    transaction_notifier_maybe,
2055                    entry_notifier_maybe,
2056                    block_meta_notifier_maybe,
2057                    confirmed_bank_sender,
2058                    &client,
2059                    if threads > 1 { Some(i) } else { None },
2060                    error_counts,
2061                )
2062                .await
2063                .unwrap();
2064            });
2065        });
2066        handles.push(handle);
2067    }
2068
2069    // Wait for all threads to complete
2070    for handle in handles {
2071        handle.join().unwrap();
2072    }
2073    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
2074    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
2075        block_meta_notifier.notify_block_metadata(
2076            u64::MAX,
2077            "unload",
2078            u64::MAX,
2079            "unload",
2080            &KeyedRewardsAndNumPartitions {
2081                keyed_rewards: vec![],
2082                num_partitions: None,
2083            },
2084            None,
2085            None,
2086            0,
2087            0,
2088        );
2089    }
2090    Ok(confirmed_bank_receiver)
2091}
2092
2093#[allow(clippy::too_many_arguments)]
2094#[allow(clippy::result_large_err)]
2095async fn firehose_geyser_thread(
2096    mut slot_range: Range<u64>,
2097    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2098    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2099    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2100    confirmed_bank_sender: Sender<SlotNotification>,
2101    client: &Client,
2102    thread_index: Option<usize>,
2103    error_counts: Arc<Vec<AtomicU32>>,
2104) -> Result<(), (FirehoseError, u64)> {
2105    let start_time = std::time::Instant::now();
2106    let log_target = if let Some(thread_index) = thread_index {
2107        format!("{}::T{:03}", LOG_MODULE, thread_index)
2108    } else {
2109        LOG_MODULE.to_string()
2110    };
2111    let initial_slot_range = slot_range.clone();
2112    let mut skip_until_index = None;
2113    let mut last_counted_slot = slot_range.start.saturating_sub(1);
2114    // let mut triggered = false;
2115    while let Err((err, slot)) = async {
2116            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2117            log::info!(
2118                target: &log_target,
2119                "slot range: {} (epoch {}) ... {} (epoch {})",
2120                slot_range.start,
2121                slot_to_epoch(slot_range.start),
2122                slot_range.end,
2123                slot_to_epoch(slot_range.end)
2124            );
2125
2126            log::info!(target: &log_target, "🚒 starting firehose...");
2127
2128            // for each epoch
2129            let mut current_slot: Option<u64> = None;
2130            for epoch_num in epoch_range.clone() {
2131                log::info!(target: &log_target, "entering epoch {}", epoch_num);
2132                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2133                    Ok(stream) => stream,
2134                    Err(_) => {
2135                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2136                    }
2137                };
2138                let mut reader = NodeReader::new(stream);
2139
2140                let header_fut = reader.read_raw_header();
2141                let header = match timeout(OP_TIMEOUT, header_fut).await {
2142                    Ok(res) => res
2143                        .map_err(FirehoseError::ReadHeader)
2144                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2145                    Err(_) => {
2146                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2147                    }
2148                };
2149                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2150
2151                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2152                let local_start = std::cmp::max(slot_range.start, epoch_start);
2153                let local_end_inclusive =
2154                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2155                if local_start > local_end_inclusive {
2156                    log::debug!(
2157                        target: &log_target,
2158                        "epoch {} has no overlap with thread range ({}..{}), skipping",
2159                        epoch_num,
2160                        slot_range.start,
2161                        slot_range.end
2162                    );
2163                    continue;
2164                }
2165
2166                let mut todo_previous_blockhash = Hash::default();
2167                let mut todo_latest_entry_blockhash = Hash::default();
2168                // Reset counters to align to the local epoch slice; prevents boundary slots
2169                // from being treated as already-counted after a restart.
2170                last_counted_slot = local_start.saturating_sub(1);
2171                current_slot = None;
2172
2173                if local_start > epoch_start {
2174                    // Seek to the nearest previous indexed slot so the reader captures the full
2175                    // node set (transactions, entries, rewards) for the target block.
2176                    let seek_slot = match timeout(
2177                        OP_TIMEOUT,
2178                        find_previous_indexed_slot(local_start, epoch_start, &log_target),
2179                    )
2180                    .await
2181                    {
2182                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2183                        Err(_) => {
2184                            return Err((
2185                                FirehoseError::OperationTimeout(
2186                                    "seek_to_previous_indexed_slot",
2187                                ),
2188                                current_slot.unwrap_or(slot_range.start),
2189                            ));
2190                        }
2191                    };
2192                    if let Some(seek_slot) = seek_slot {
2193                        let seek_fut = reader.seek_to_slot(seek_slot);
2194                        match timeout(OP_TIMEOUT, seek_fut).await {
2195                            Ok(res) => {
2196                                res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2197                            }
2198                            Err(_) => {
2199                                return Err((
2200                                    FirehoseError::OperationTimeout("seek_to_slot"),
2201                                    current_slot.unwrap_or(slot_range.start),
2202                                ));
2203                            }
2204                        }
2205                    }
2206                }
2207
2208                // for each item in each block
2209                let mut item_index = 0;
2210                let mut displayed_skip_message = false;
2211                loop {
2212                    let read_fut = reader.read_until_block();
2213                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2214                        Ok(result) => result
2215                            .map_err(FirehoseError::ReadUntilBlockError)
2216                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2217                        Err(_) => {
2218                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2219                            let restart_slot =
2220                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2221                            return Err((
2222                                FirehoseError::OperationTimeout("read_until_block"),
2223                                restart_slot,
2224                            ));
2225                        }
2226                    };
2227                    if nodes.is_empty() {
2228                        log::info!(
2229                            target: &log_target,
2230                            "reached end of epoch {}",
2231                            epoch_num
2232                        );
2233                        break;
2234                    }
2235                    // ignore epoch and subset nodes at end of car file loop { if
2236                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
2237                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
2238                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
2239                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
2240                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
2241                    //         log::info!(target: &log_target, "reached end of epoch {}",
2242                    //             epoch_num); break; }
2243                    if let Some(last_node) = nodes.0.last()
2244                        && !last_node.get_node().is_block() {
2245                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2246                            break;
2247                        }
2248                    let block = nodes
2249                        .get_block()
2250                        .map_err(FirehoseError::GetBlockError)
2251                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2252                    log::debug!(
2253                        target: &log_target,
2254                        "read {} items from epoch {}, now at slot {}",
2255                        item_index,
2256                        epoch_num,
2257                        block.slot
2258                    );
2259                    let slot = block.slot;
2260                    if slot > local_end_inclusive {
2261                        log::debug!(
2262                            target: &log_target,
2263                            "reached end of local slice at slot {} (epoch {}), stopping",
2264                            slot,
2265                            epoch_num
2266                        );
2267                        break;
2268                    }
2269                    if slot >= slot_range.end {
2270                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2271                        // Return early to terminate the firehose thread cleanly. We use >=
2272                        // because slot_range is half-open [start, end), so any slot equal to
2273                        // end is out-of-range and must not be processed.
2274                        return Ok(());
2275                    }
2276                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2277                    if slot < local_start {
2278                        if slot.saturating_add(1) == local_start {
2279                            log::debug!(
2280                                target: &log_target,
2281                                "priming reader with preceding slot {}, skipping",
2282                                slot
2283                            );
2284                        } else {
2285                            log::warn!(
2286                                target: &log_target,
2287                                "encountered slot {} before start of range {}, skipping",
2288                                slot,
2289                                local_start
2290                            );
2291                        }
2292                        continue;
2293                    }
2294                    current_slot = Some(slot);
2295                    let mut entry_index: usize = 0;
2296                    let mut this_block_executed_transaction_count: u64 = 0;
2297                    let mut this_block_entry_count: u64 = 0;
2298                    let mut this_block_rewards = DecodedRewards::empty();
2299
2300                    if slot <= last_counted_slot {
2301                        log::debug!(
2302                            target: &log_target,
2303                            "duplicate block {}, already counted (last_counted={})",
2304                            slot,
2305                            last_counted_slot,
2306                        );
2307                        continue;
2308                    }
2309
2310                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
2311                        item_index += 1;
2312                        // if item_index == 100000 && !triggered { log::info!("simulating
2313                        //     error"); triggered = true; return
2314                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
2315                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
2316                        //         "simulated error", )), ))); }
2317                        if let Some(skip) = skip_until_index {
2318                            if item_index < skip {
2319                                if !displayed_skip_message {
2320                                    log::info!(
2321                                        target: &log_target,
2322                                        "skipping until index {} (at {})",
2323                                        skip,
2324                                        item_index
2325                                    );
2326                                    displayed_skip_message = true;
2327                                }
2328                                return Ok(());
2329                            } else {
2330                                log::info!(
2331                                    target: &log_target,
2332                                    "reached target index {}, resuming...",
2333                                    skip
2334                                );
2335                                skip_until_index = None;
2336                            }
2337                        }
2338                        let node = node_with_cid.get_node();
2339
2340                        use crate::node::Node::*;
2341                        match node {
2342                            Transaction(tx) => {
2343                                let versioned_tx = tx.as_parsed()?;
2344                                let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2345
2346                                let as_native_metadata = decode_transaction_status_meta_from_frame(
2347                                    block.slot,
2348                                    reassembled_metadata,
2349                                )?;
2350
2351                                let message_hash = {
2352                                    #[cfg(feature = "verify-transaction-signatures")]
2353                                    {
2354                                        versioned_tx.verify_and_hash_message()?
2355                                    }
2356                                    #[cfg(not(feature = "verify-transaction-signatures"))]
2357                                    {
2358                                        // Signature verification is optional because it is
2359                                        // extremely expensive at replay scale.
2360                                        versioned_tx.message.hash()
2361                                    }
2362                                };
2363                                let signature = versioned_tx
2364                                    .signatures
2365                                    .first()
2366                                    .ok_or_else(|| {
2367                                        Box::new(std::io::Error::new(
2368                                            std::io::ErrorKind::InvalidData,
2369                                            "transaction missing signature",
2370                                        )) as SharedError
2371                                    })?;
2372                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2373
2374                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2375                                    transaction_notifier.notify_transaction(
2376                                        block.slot,
2377                                        tx.index.unwrap() as usize,
2378                                        signature,
2379                                        &message_hash,
2380                                        is_vote,
2381                                        &as_native_metadata,
2382                                        &versioned_tx,
2383                                    );
2384                                }
2385
2386                            }
2387                            Entry(entry) => {
2388                                let entry_hash = Hash::from(entry.hash.to_bytes());
2389                                let entry_transaction_count = entry.transactions.len();
2390                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2391                                let starting_transaction_index =
2392                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2393                                        Box::new(std::io::Error::other(
2394                                            "transaction index exceeds usize range",
2395                                        )) as SharedError
2396                                    })?;
2397                                todo_latest_entry_blockhash = entry_hash;
2398                                this_block_executed_transaction_count += entry_transaction_count_u64;
2399                                this_block_entry_count += 1;
2400                                if entry_notifier_maybe.is_none() {
2401                                    return Ok(());
2402                                }
2403                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2404                                let entry_summary = solana_entry::entry::EntrySummary {
2405                                    num_hashes: entry.num_hashes,
2406                                    hash: Hash::from(entry.hash.to_bytes()),
2407                                    num_transactions: entry_transaction_count_u64,
2408                                };
2409                                entry_notifier.notify_entry(
2410                                    block.slot,
2411                                    entry_index,
2412                                    &entry_summary,
2413                                    starting_transaction_index,
2414                                );
2415                                entry_index += 1;
2416                            }
2417                            Block(block) => {
2418                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2419                                confirmed_bank_sender.send(notification).unwrap();
2420
2421                                if block_meta_notifier_maybe.is_none() {
2422                                    last_counted_slot = block.slot;
2423                                    return Ok(());
2424                                }
2425                                let DecodedRewards {
2426                                    keyed_rewards,
2427                                    num_partitions,
2428                                } = std::mem::take(&mut this_block_rewards);
2429                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2430                                block_meta_notifier.notify_block_metadata(
2431                                    block.meta.parent_slot,
2432                                    todo_previous_blockhash.to_string().as_str(),
2433                                    block.slot,
2434                                    todo_latest_entry_blockhash.to_string().as_str(),
2435                                    &KeyedRewardsAndNumPartitions {
2436                                        keyed_rewards,
2437                                        num_partitions,
2438                                    },
2439                                    Some(block.meta.blocktime as i64),
2440                                    block.meta.block_height,
2441                                    this_block_executed_transaction_count,
2442                                    this_block_entry_count,
2443                                );
2444                                todo_previous_blockhash = todo_latest_entry_blockhash;
2445                                last_counted_slot = block.slot;
2446                                std::thread::yield_now();
2447                            }
2448                            Subset(_subset) => (),
2449                            Epoch(_epoch) => (),
2450                            Rewards(rewards) => {
2451                                let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2452                                if !reassembled.is_empty() {
2453                                    this_block_rewards = decode_rewards_from_frame(
2454                                        block.slot,
2455                                        reassembled,
2456                                    )?;
2457                                } else {
2458                                    this_block_rewards = DecodedRewards::empty();
2459                                }
2460                            }
2461                            DataFrame(_data_frame) => (),
2462                        }
2463                        Ok(())
2464                    })
2465                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2466                    if block.slot == slot_range.end - 1 {
2467                        let finish_time = std::time::Instant::now();
2468                        let elapsed = finish_time.duration_since(start_time);
2469                        log::info!(target: &log_target, "processed slot {}", block.slot);
2470                        let elapsed_pretty = human_readable_duration(elapsed);
2471                        log::info!(
2472                            target: &log_target,
2473                            "processed {} slots across {} epochs in {}.",
2474                            initial_slot_range.end - initial_slot_range.start,
2475                            slot_to_epoch(initial_slot_range.end)
2476                                + 1
2477                                - slot_to_epoch(initial_slot_range.start),
2478                            elapsed_pretty
2479                        );
2480                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2481                        // On completion, report threads with non-zero error counts for
2482                        // visibility.
2483                        let summary: String = error_counts
2484                            .iter()
2485                            .enumerate()
2486                            .filter_map(|(i, c)| {
2487                                let v = c.load(Ordering::Relaxed);
2488                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2489                            })
2490                            .collect::<Vec<_>>()
2491                            .join(", ");
2492                        if !summary.is_empty() {
2493                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2494                        }
2495                        return Ok(());
2496                    }
2497                }
2498            }
2499            Ok(())
2500}
2501.await
2502{
2503        if is_shutdown_error(&err) {
2504            log::info!(
2505                target: &log_target,
2506                "shutdown requested; terminating firehose thread {:?}",
2507                thread_index
2508            );
2509            return Ok(());
2510        }
2511        log::error!(
2512            target: &log_target,
2513            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2514            slot,
2515            slot_to_epoch(slot)
2516            );
2517            log::error!(target: &log_target, "{}", err);
2518            let error_message = err.to_string();
2519            if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2520                || error_message.contains("Unknown CID version")
2521            {
2522                // Clear cached index data for this epoch to avoid retrying with a bad/partial index
2523                // (or a bad seek offset that landed mid-stream).
2524                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2525            }
2526            let item_index = match err {
2527                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2528                _ => 0,
2529            };
2530            // Increment this thread's error counter
2531            let idx = thread_index.unwrap_or(0);
2532            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2533            log::warn!(
2534                target: &log_target,
2535                "restarting from slot {} at index {}",
2536                slot,
2537                item_index,
2538            );
2539            // Update slot range to resume from the failed slot, not the original start.
2540            // If the failing slot was already fully processed, resume from the next slot.
2541            if slot <= last_counted_slot {
2542                slot_range.start = last_counted_slot.saturating_add(1);
2543            } else {
2544                slot_range.start = slot;
2545            }
2546            // `skip_until_index` is unsafe across retries because `item_index`
2547            // is reset to 0 each epoch restart. Keeping it can skip large portions
2548            // of the stream and silently drop slots.
2549            skip_until_index = None;
2550}
2551    Ok(())
2552}
2553
2554#[inline]
2555fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2556    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2557        return false;
2558    }
2559
2560    if !matches!(
2561        versioned_tx.version(),
2562        solana_transaction::versioned::TransactionVersion::Legacy(_)
2563    ) {
2564        return false;
2565    }
2566
2567    let instructions = versioned_tx.message.instructions();
2568    if instructions.len() != 1 {
2569        return false;
2570    }
2571
2572    let program_index = instructions[0].program_id_index as usize;
2573    versioned_tx
2574        .message
2575        .static_account_keys()
2576        .get(program_index)
2577        .map(|program_id| program_id == &vote_program_id())
2578        .unwrap_or(false)
2579}
2580
2581#[inline(always)]
2582fn convert_proto_rewards(
2583    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2584) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2585    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2586    for proto_reward in proto_rewards.rewards.iter() {
2587        let reward = RewardInfo {
2588            reward_type: match proto_reward.reward_type - 1 {
2589                0 => RewardType::Fee,
2590                1 => RewardType::Rent,
2591                2 => RewardType::Staking,
2592                3 => RewardType::Voting,
2593                typ => {
2594                    return Err(Box::new(std::io::Error::other(format!(
2595                        "unsupported reward type {}",
2596                        typ
2597                    ))));
2598                }
2599            },
2600            lamports: proto_reward.lamports,
2601            post_balance: proto_reward.post_balance,
2602            commission: proto_reward.commission.parse::<u8>().ok(),
2603        };
2604        let pubkey = proto_reward
2605            .pubkey
2606            .parse::<Address>()
2607            .map_err(|err| Box::new(err) as SharedError)?;
2608        keyed_rewards.push((pubkey, reward));
2609    }
2610    Ok(keyed_rewards)
2611}
2612
2613#[inline]
2614/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2615pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2616    let total = slot_range.end - slot_range.start;
2617    let slots_per_thread = total / threads;
2618    let remainder = total % threads;
2619
2620    let ranges: Vec<Range<u64>> = (0..threads)
2621        .map(|i| {
2622            // Distribute remainder slots to the first `remainder` threads
2623            let extra_slot = if i < remainder { 1 } else { 0 };
2624            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2625            let end = start + slots_per_thread + extra_slot;
2626            start..end
2627        })
2628        .collect();
2629
2630    // Verify that ranges cover all slots exactly
2631    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2632    assert_eq!(
2633        total_covered, total,
2634        "Range generation failed: {} threads should cover {} slots but only cover {}",
2635        threads, total, total_covered
2636    );
2637
2638    // Verify no gaps between ranges
2639    for i in 1..ranges.len() {
2640        assert_eq!(
2641            ranges[i - 1].end,
2642            ranges[i].start,
2643            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2644            i - 1,
2645            ranges[i - 1].end,
2646            i,
2647            ranges[i].start
2648        );
2649    }
2650
2651    log::info!(
2652        target: LOG_MODULE,
2653        "Generated {} thread ranges covering {} slots total",
2654        threads,
2655        total_covered
2656    );
2657    ranges
2658}
2659
/// Formats `duration` as a short human-readable string.
///
/// * Zero renders as `"0s"`.
/// * Under one minute: whole seconds render as `"{n}s"` when the sub-second part is below
///   one millisecond; otherwise the value renders with two decimals (e.g. `"5.25s"`).
/// * One minute or more: only the two most significant units are shown, and a zero
///   second unit is omitted (`"1m1s"`, `"1h"`, `"1h1m"`, `"1d"`, `"1d1h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        let is_whole_seconds = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_whole_seconds {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let secs = whole_secs % MINUTE;

    // `whole_secs >= 60` here, so at least one of days/hours/minutes is non-zero.
    match (days, hours, minutes, secs) {
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2705
/// Test-only stats handler that logs a one-line progress summary for `thread_id`.
///
/// Transactions-per-second is derived from the time elapsed since `stats.start_time` and
/// is reported as `0.0` when no time has elapsed. The returned future always resolves to
/// `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        let tps = match elapsed_secs {
            secs if secs > 0.0 => stats.transactions_processed as f64 / secs,
            _ => 0.0,
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2731
2732#[cfg(test)]
2733use futures_util::FutureExt;
2734#[cfg(test)]
2735use serial_test::serial;
2736#[cfg(test)]
2737use std::sync::{Mutex, OnceLock};
2738
/// Test helper: runs a single-threaded firehose over exactly `slot` and asserts that the
/// slot was emitted as a non-skipped block whose executed transaction count is non-zero
/// and at least `min_executed`.
///
/// Non-vote transactions seen for the slot are counted as well, but only logged.
///
/// # Panics
/// Panics when the firehose run fails or when any of the assertions above do not hold.
#[cfg(test)]
async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    };

    let block_seen = Arc::new(AtomicBool::new(false));
    let executed_total = Arc::new(AtomicU64::new(0));
    let non_vote_total = Arc::new(AtomicU64::new(0));

    firehose(
        1,
        false,
        false,
        None,
        slot..(slot + 1),
        Some({
            let block_seen = Arc::clone(&block_seen);
            let executed_total = Arc::clone(&executed_total);
            move |_thread_id: usize, block: BlockData| {
                let block_seen = Arc::clone(&block_seen);
                let executed_total = Arc::clone(&executed_total);
                async move {
                    if block.slot() != slot {
                        return Ok(());
                    }
                    assert!(
                        !block.was_skipped(),
                        "slot {slot} was marked leader skipped",
                    );
                    if let BlockData::Block {
                        executed_transaction_count,
                        ..
                    } = block
                    {
                        block_seen.store(true, Ordering::Relaxed);
                        executed_total.store(executed_transaction_count, Ordering::Relaxed);
                    }
                    Ok(())
                }
                .boxed()
            }
        }),
        Some({
            let non_vote_total = Arc::clone(&non_vote_total);
            move |_thread_id: usize, transaction: TransactionData| {
                let non_vote_total = Arc::clone(&non_vote_total);
                async move {
                    let counts = transaction.slot == slot && !transaction.is_vote;
                    if counts {
                        non_vote_total.fetch_add(1, Ordering::Relaxed);
                    }
                    Ok(())
                }
                .boxed()
            }
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        block_seen.load(Ordering::Relaxed),
        "target slot {slot} was not processed"
    );
    let executed = executed_total.load(Ordering::Relaxed);
    let non_vote = non_vote_total.load(Ordering::Relaxed);
    assert!(
        executed > 0,
        "slot {slot} executed transaction count was zero"
    );
    assert!(
        executed >= min_executed,
        "slot {slot} executed transaction count {executed} is below expected minimum {min_executed}"
    );
    log::info!(
        target: LOG_MODULE,
        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
        executed,
        non_vote
    );
}
2822
/// Test/debug helper: seeks the epoch stream to `slot`, reads nodes up to the block, and
/// logs a per-kind node count summary.
///
/// For `slot > 0` it additionally probes the offset index for up to five preceding slots
/// and logs the nearest one that resolves (or that none did).
///
/// # Errors
/// Returns an error when seeking to `slot` or reading the nodes fails. Failed offset
/// lookups for preceding slots are only logged.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let epoch = slot_to_epoch(slot);
    let client = crate::network::create_http_client();
    let stream = fetch_epoch_stream(epoch, &client).await;
    let mut reader = NodeReader::new(stream);
    if let Err(err) = reader.seek_to_slot(slot).await {
        return Err(Box::new(err) as SharedError);
    }

    let nodes = reader.read_until_block().await?;

    // Kind of the very first node read, or "none" when nothing was read.
    let first_kind = nodes
        .0
        .first()
        .map_or("none", |wrapped| match wrapped.get_node() {
            Node::Transaction(_) => "transaction",
            Node::Entry(_) => "entry",
            Node::Block(_) => "block",
            Node::Subset(_) => "subset",
            Node::Epoch(_) => "epoch",
            Node::Rewards(_) => "rewards",
            Node::DataFrame(_) => "dataframe",
        });

    let (mut tx_nodes, mut entry_nodes, mut entry_tx_total) = (0u64, 0u64, 0u64);
    let (mut dataframe_nodes, mut reward_nodes) = (0u64, 0u64);
    let (mut subset_nodes, mut epoch_nodes) = (0u64, 0u64);
    let mut block_slot = None;
    let mut block_entries = None;

    for wrapped in nodes.0.iter() {
        match wrapped.get_node() {
            Node::Transaction(_) => tx_nodes += 1,
            Node::Entry(entry) => {
                entry_nodes += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Block(block) => {
                block_slot = Some(block.slot);
                block_entries = Some(block.entries.len());
            }
            Node::Subset(_) => subset_nodes += 1,
            Node::Epoch(_) => epoch_nodes += 1,
            Node::Rewards(_) => reward_nodes += 1,
            Node::DataFrame(_) => dataframe_nodes += 1,
        }
    }

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        tx_nodes,
        entry_nodes,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframe_nodes,
        reward_nodes,
        subset_nodes,
        epoch_nodes
    );

    // Slot 0 has no predecessors to probe.
    if slot == 0 {
        return Ok(());
    }

    // Walk backwards one slot at a time and stop at the first slot the index resolves.
    let mut nearest_previous = None;
    for delta in 1..=5 {
        let candidate = slot.saturating_sub(delta);
        match slot_to_offset(candidate).await {
            Ok(offset) => {
                nearest_previous = Some((candidate, offset));
                break;
            }
            Err(err) => {
                log::info!(
                    target: LOG_MODULE,
                    "slot {slot} previous lookup {candidate} failed: {err}"
                );
            }
        }
    }

    match nearest_previous {
        Some((candidate, offset)) => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        }
        None => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2938
2939#[tokio::test(flavor = "multi_thread")]
2940async fn test_firehose_epoch_800() {
2941    use dashmap::DashSet;
2942    use std::sync::atomic::{AtomicU64, Ordering};
2943    solana_logger::setup_with_default("info");
2944    const THREADS: usize = 4;
2945    const NUM_SLOTS_TO_COVER: u64 = 50;
2946    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2947    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2948    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2949    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2950    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2951    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2952    let stats_tracking = StatsTracking {
2953        on_stats: log_stats_handler,
2954        tracking_interval_slots: 10,
2955    };
2956
2957    for prev in PREV_BLOCK.iter() {
2958        prev.store(0, Ordering::Relaxed);
2959    }
2960    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2961    NUM_BLOCKS.store(0, Ordering::Relaxed);
2962    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2963    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2964    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2965
2966    firehose(
2967        THREADS.try_into().unwrap(),
2968        false,
2969        false,
2970        None,
2971        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2972        Some(|thread_id: usize, block: BlockData| {
2973            async move {
2974                let _prev =
2975                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2976                if block.was_skipped() {
2977                    log::info!(
2978                        target: LOG_MODULE,
2979                        "leader skipped block {} on thread {}",
2980                        block.slot(),
2981                        thread_id,
2982                    );
2983                } else {
2984                    /*log::info!(
2985                        target: LOG_MODULE,
2986                        "got block {} on thread {}",
2987                        block.slot(),
2988                        thread_id,
2989                    );*/
2990                }
2991
2992                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2993                if block.was_skipped() {
2994                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2995                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2996                } else if first_time {
2997                    NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2998                    if let BlockData::Block {
2999                        executed_transaction_count,
3000                        ..
3001                    } = &block
3002                    {
3003                        let executed = *executed_transaction_count;
3004                        let _ = MIN_TRANSACTIONS.fetch_update(
3005                            Ordering::Relaxed,
3006                            Ordering::Relaxed,
3007                            |current| {
3008                                if executed < current {
3009                                    Some(executed)
3010                                } else {
3011                                    None
3012                                }
3013                            },
3014                        );
3015                    }
3016                }
3017                Ok(())
3018            }
3019            .boxed()
3020        }),
3021        None::<OnTxFn>,
3022        None::<OnEntryFn>,
3023        None::<OnRewardFn>,
3024        None::<OnErrorFn>,
3025        Some(stats_tracking),
3026        None,
3027    )
3028    .await
3029    .unwrap();
3030    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
3031    assert_eq!(
3032        seen, NUM_SLOTS_TO_COVER,
3033        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
3034    );
3035    let mut skipped: Vec<u64> = SEEN_SKIPPED
3036        .get_or_init(DashSet::new)
3037        .iter()
3038        .map(|v| *v)
3039        .collect();
3040    skipped.sort_unstable();
3041    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
3042    const EXPECTED_SKIPPED: [u64; 6] = [
3043        345_600_004,
3044        345_600_005,
3045        345_600_008,
3046        345_600_009,
3047        345_600_010,
3048        345_600_011,
3049    ];
3050    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
3051    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
3052}
3053
3054#[tokio::test(flavor = "multi_thread")]
3055async fn test_firehose_target_slot_transactions() {
3056    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3057    solana_logger::setup_with_default("info");
3058    const TARGET_SLOT: u64 = 376_273_722;
3059    const SLOT_RADIUS: u64 = 50;
3060    const EXPECTED_TRANSACTIONS: u64 = 1414;
3061    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
3062    static FOUND: AtomicBool = AtomicBool::new(false);
3063    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
3064    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
3065
3066    FOUND.store(false, Ordering::Relaxed);
3067    OBSERVED_TXS.store(0, Ordering::Relaxed);
3068    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
3069
3070    firehose(
3071        4,
3072        false,
3073        false,
3074        None,
3075        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3076        Some(|_thread_id: usize, block: BlockData| {
3077            async move {
3078                if block.slot() == TARGET_SLOT {
3079                    assert!(
3080                        !block.was_skipped(),
3081                        "target slot {TARGET_SLOT} was marked leader skipped",
3082                    );
3083                    if let BlockData::Block {
3084                        executed_transaction_count,
3085                        ..
3086                    } = block
3087                    {
3088                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
3089                        FOUND.store(true, Ordering::Relaxed);
3090                        assert_eq!(
3091                            executed_transaction_count, EXPECTED_TRANSACTIONS,
3092                            "unexpected transaction count for slot {TARGET_SLOT}"
3093                        );
3094                        assert_eq!(
3095                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
3096                            EXPECTED_NON_VOTE_TRANSACTIONS,
3097                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
3098                        );
3099                    }
3100                }
3101                Ok(())
3102            }
3103            .boxed()
3104        }),
3105        Some(|_thread_id: usize, transaction: TransactionData| {
3106            async move {
3107                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
3108                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
3109                }
3110                Ok(())
3111            }
3112            .boxed()
3113        }),
3114        None::<OnEntryFn>,
3115        None::<OnRewardFn>,
3116        None::<OnErrorFn>,
3117        None::<OnStatsTrackingFn>,
3118        None,
3119    )
3120    .await
3121    .unwrap();
3122
3123    assert!(
3124        FOUND.load(Ordering::Relaxed),
3125        "target slot was not processed"
3126    );
3127    assert_eq!(
3128        OBSERVED_TXS.load(Ordering::Relaxed),
3129        EXPECTED_TRANSACTIONS,
3130        "recorded transaction count mismatch"
3131    );
3132}
3133
/// Runs the firehose with the second flag set to `true` (the sequential mode this test is
/// named for) over a window straddling the start of epoch 900, and checks that the slot
/// numbers of delivered transactions never decrease and that at least one transaction is
/// observed.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    const SLOT_COUNT: u64 = 100;
    const THREADS: u64 = 4;
    const TEST_BUFFER_WINDOW: &str = "4GiB";

    solana_logger::setup_with_default("info");

    // Window of SLOT_COUNT slots on each side of the first slot of epoch 900.
    let boundary_slot = epoch_to_slot_range(900).0;
    let slot_range = (boundary_slot - SLOT_COUNT)..(boundary_slot + SLOT_COUNT);

    let highest_tx_slot = Arc::new(Mutex::new(slot_range.start));
    let tx_count = Arc::new(AtomicU64::new(0));
    let stats_tracking = StatsTracking {
        on_stats: log_stats_handler,
        tracking_interval_slots: 100,
    };
    let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
        .expect("valid test buffer window");

    firehose(
        THREADS,
        true,
        false,
        Some(test_buffer_window_bytes),
        slot_range.clone(),
        None::<OnBlockFn>,
        Some({
            let highest_tx_slot = Arc::clone(&highest_tx_slot);
            let tx_count = Arc::clone(&tx_count);
            move |_thread_id: usize, transaction: TransactionData| {
                let highest_tx_slot = Arc::clone(&highest_tx_slot);
                let tx_count = Arc::clone(&tx_count);
                async move {
                    let mut previous = highest_tx_slot.lock().unwrap();
                    // Old Faithful does not include leader-skipped slots, so gaps are
                    // expected. We only enforce monotonic (non-decreasing) tx slot ordering.
                    assert!(
                        transaction.slot >= *previous,
                        "transaction slot regressed: prev={}, current={}",
                        *previous,
                        transaction.slot
                    );
                    *previous = transaction.slot;
                    tx_count.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
                .boxed()
            }
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        Some(stats_tracking),
        None,
    )
    .await
    .unwrap();

    let total_observed = tx_count.load(Ordering::Relaxed);
    assert!(
        total_observed > 0,
        "expected to observe at least one transaction in slots [{}, {})",
        slot_range.start,
        slot_range.end
    );
}
3206
/// Checks that slot 311173980 is processed and reports at least the expected minimum
/// number of executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
    const SLOT: u64 = 311_173_980;
    // Lower bound kept as the original two-term sum.
    const MIN_EXECUTED: u64 = 1_197 + 211;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3214
/// Checks that slot 311225232 is processed and reports at least the expected minimum
/// number of executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
    const SLOT: u64 = 311_225_232;
    // Lower bound kept as the original two-term sum.
    const MIN_EXECUTED: u64 = 888 + 157;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3222
/// Checks that slot 311175860 is processed and reports at least the expected minimum
/// number of executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
    const SLOT: u64 = 311_175_860;
    // Lower bound kept as the original two-term sum.
    const MIN_EXECUTED: u64 = 527 + 110;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3230
/// Checks that slot 311134608 is processed and reports at least the expected minimum
/// number of executed transactions.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
    const SLOT: u64 = 311_134_608;
    // Lower bound kept as the original two-term sum.
    const MIN_EXECUTED: u64 = 1_086 + 169;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3238
/// Ignored-by-default debugging aid: logs a node summary for each slot in `SLOTS`, in
/// order, and panics on the first slot whose summary cannot be produced.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    const SLOTS: &[u64] = &[
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];

    solana_logger::setup_with_default("info");
    for &slot in SLOTS {
        log_slot_node_summary(slot).await.expect("slot summary");
    }
}
3256
3257#[tokio::test(flavor = "multi_thread")]
3258async fn test_firehose_epoch_850_has_logs() {
3259    use std::sync::atomic::{AtomicU64, Ordering};
3260    solana_logger::setup_with_default("info");
3261    const START_SLOT: u64 = 367_200_075; // within epoch 850
3262    const SLOT_COUNT: u64 = 50;
3263    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3264
3265    TOTAL_TXS.store(0, Ordering::Relaxed);
3266
3267    firehose(
3268        4,
3269        false,
3270        false,
3271        None,
3272        START_SLOT..(START_SLOT + SLOT_COUNT),
3273        None::<OnBlockFn>,
3274        Some(|_thread_id: usize, transaction: TransactionData| {
3275            async move {
3276                TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3277                if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3278                    let has_logs = logs.iter().any(|msg| !msg.is_empty());
3279                    assert!(has_logs);
3280                }
3281                Ok(())
3282            }
3283            .boxed()
3284        }),
3285        None::<OnEntryFn>,
3286        None::<OnRewardFn>,
3287        None::<OnErrorFn>,
3288        None::<OnStatsTrackingFn>,
3289        None,
3290    )
3291    .await
3292    .unwrap();
3293
3294    assert!(
3295        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3296        "no transactions observed in epoch 850 range"
3297    );
3298}
3299
3300#[tokio::test(flavor = "multi_thread")]
3301async fn test_firehose_epoch_850_votes_present() {
3302    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3303    solana_logger::setup_with_default("info");
3304    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
3305    const SLOT_RADIUS: u64 = 10;
3306    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3307    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3308    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3309
3310    SEEN_BLOCK.store(false, Ordering::Relaxed);
3311    VOTE_TXS.store(0, Ordering::Relaxed);
3312    TOTAL_TXS.store(0, Ordering::Relaxed);
3313
3314    firehose(
3315        2,
3316        false,
3317        false,
3318        None,
3319        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3320        Some(|_thread_id: usize, block: BlockData| {
3321            async move {
3322                if block.slot() == TARGET_SLOT {
3323                    assert!(
3324                        !block.was_skipped(),
3325                        "target slot {TARGET_SLOT} was marked leader skipped",
3326                    );
3327                    SEEN_BLOCK.store(true, Ordering::Relaxed);
3328                }
3329                Ok(())
3330            }
3331            .boxed()
3332        }),
3333        Some(|_thread_id: usize, transaction: TransactionData| {
3334            async move {
3335                if transaction.slot == TARGET_SLOT {
3336                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3337                    if transaction.is_vote {
3338                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3339                    }
3340                }
3341                Ok(())
3342            }
3343            .boxed()
3344        }),
3345        None::<OnEntryFn>,
3346        None::<OnRewardFn>,
3347        None::<OnErrorFn>,
3348        None::<OnStatsTrackingFn>,
3349        None,
3350    )
3351    .await
3352    .unwrap();
3353
3354    assert!(
3355        SEEN_BLOCK.load(Ordering::Relaxed),
3356        "target slot was not processed"
3357    );
3358    assert!(
3359        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3360        "no transactions counted in target slot"
3361    );
3362    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3363}
3364
/// Injects a single synthetic block-handler failure part-way through a short slot range and
/// then verifies that every slot in the range was still handed to the block handler.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // Per-slot count of successful handler invocations.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    // Set once the synthetic failure has been injected, so it fires a single time only.
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    // Number of non-skipped blocks the handler has accepted so far.
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    /// Returns the shared coverage map, creating it on first use.
    fn coverage_map() -> &'static Mutex<HashMap<u64, u32>> {
        COVERAGE.get_or_init(|| Mutex::new(HashMap::new()))
    }

    solana_logger::setup_with_default("info");

    // Reset all shared state so that a repeated run starts fresh.
    coverage_map().lock().unwrap().clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    let slot_range = START_SLOT..(START_SLOT + NUM_SLOTS);

    firehose(
        THREADS.try_into().unwrap(),
        false,
        false,
        None,
        slot_range.clone(),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let produced = !block.was_skipped();
                // Fail exactly once, and only after at least one block has been seen, so that
                // the restart happens mid-range.
                if produced
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }
                let mut counts = coverage_map().lock().unwrap();
                *counts.entry(block.slot()).or_insert(0) += 1;
                if produced {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Every requested slot must have reached the handler at least once.
    let covered = coverage_map().lock().unwrap();
    for slot in slot_range {
        assert!(
            covered.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
3431
/// Runs firehose over 1000 slots on either side of `GAP_START` and checks that, apart from one
/// known run of leader-skipped slots, a non-skipped block was delivered for every slot.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;
    // Bounds (inclusive) of the known 4-slot leader skipped gap that is tolerated below.
    const KNOWN_SKIPPED_FIRST: u64 = 378864396;
    const KNOWN_SKIPPED_LAST: u64 = 378864399;

    // Slots for which a non-skipped block reached the handler.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    /// Returns the shared set of covered slots, creating it on first use.
    fn covered_slots() -> &'static Mutex<HashSet<u64>> {
        COVERAGE.get_or_init(|| Mutex::new(HashSet::new()))
    }

    solana_logger::setup_with_default("info");
    covered_slots().lock().unwrap().clear();

    firehose(
        THREADS.try_into().unwrap(),
        false,
        false,
        None,
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Skipped slots carry no block and therefore do not count as coverage.
                if !block.was_skipped() {
                    let slot = block.slot();
                    covered_slots().lock().unwrap().insert(slot);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Work on a snapshot so the shared lock is not held during the checks.
    let mut coverage = covered_slots().lock().unwrap().clone();

    // ignore a known 4-slot leader skipped gap
    coverage.extend(KNOWN_SKIPPED_FIRST..=KNOWN_SKIPPED_LAST);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    let preview_len = missing.len().min(10);
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..preview_len]
    );
}
3506
/// Runs firehose with both the sequential and reverse flags set over a range straddling the
/// start of epoch 900, and checks the order in which blocks arrive: epoch 900 first, then
/// epoch 899, with non-decreasing slots inside each epoch.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_sequential_reverse_crosses_epoch_boundary() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    // Number of slots requested on each side of the epoch boundary.
    const SLOT_COUNT: u64 = 100;

    solana_logger::setup_with_default("info");

    let epoch_900_start = epoch_to_slot_range(900).0;
    let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);

    // Slots in the order the block handler received them.
    let observed_blocks = Arc::new(Mutex::new(Vec::<u64>::new()));
    let observed_tx_count = Arc::new(AtomicU64::new(0));

    // Handles moved into the handlers; the originals stay behind for the assertions.
    let block_sink = Arc::clone(&observed_blocks);
    let tx_counter = Arc::clone(&observed_tx_count);

    firehose(
        1,
        true,
        true,
        None,
        slot_range,
        Some(move |_thread_id: usize, block: BlockData| {
            let block_sink = Arc::clone(&block_sink);
            async move {
                block_sink.lock().unwrap().push(block.slot());
                Ok(())
            }
            .boxed()
        }),
        Some(move |_thread_id: usize, _tx: TransactionData| {
            let tx_counter = Arc::clone(&tx_counter);
            async move {
                tx_counter.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let observed = observed_blocks.lock().unwrap().clone();
    assert!(
        !observed.is_empty(),
        "expected to observe at least one block"
    );
    let tx_total = observed_tx_count.load(Ordering::Relaxed);
    assert!(tx_total > 0, "expected to observe at least one transaction");

    // First observed slot must be in the higher epoch (900).
    let first_slot = observed[0];
    let first_epoch = slot_to_epoch(first_slot);
    assert_eq!(
        first_epoch, 900,
        "reverse mode must start with the highest epoch, got slot {} in epoch {}",
        first_slot, first_epoch,
    );

    // Walk the observations: inside an epoch slots may not regress, and whenever the epoch
    // changes it has to move downwards. Count how often that happens.
    let mut transitions = 0u32;
    let mut current_epoch = first_epoch;
    let mut prev_slot_in_epoch: Option<u64> = None;
    for slot in observed.iter().copied() {
        let epoch = slot_to_epoch(slot);
        if epoch == current_epoch {
            if let Some(prev) = prev_slot_in_epoch {
                assert!(
                    slot >= prev,
                    "within epoch {epoch}, slot regressed: prev={prev} now={slot}",
                );
            }
        } else {
            // The first slot of a new epoch has no predecessor to be compared against.
            assert!(
                epoch < current_epoch,
                "epoch did not decrease across boundary: prev={current_epoch} now={epoch}",
            );
            transitions += 1;
            current_epoch = epoch;
        }
        prev_slot_in_epoch = Some(slot);
    }

    assert_eq!(
        transitions, 1,
        "expected exactly one epoch transition for a range crossing one boundary",
    );
    assert_eq!(
        current_epoch, 899,
        "reverse mode should end at the lower epoch (899), got {current_epoch}",
    );
}
3612
/// Requests reverse mode without the sequential flag, using several threads, and checks that
/// the first block delivered still belongs to the highest epoch of the range.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_reverse_implies_sequential() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    // Number of slots requested on each side of the epoch boundary.
    const SLOT_COUNT: u64 = 100;

    solana_logger::setup_with_default("info");

    let epoch_900_start = epoch_to_slot_range(900).0;
    let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);

    // Slots in the order the block handler received them.
    let observed_blocks = Arc::new(Mutex::new(Vec::<u64>::new()));
    let block_sink = Arc::clone(&observed_blocks);
    // Incremented by the transaction handler; this test does not inspect the total.
    let tx_counter = Arc::new(AtomicU64::new(0));

    // sequential = false, reverse = true: firehose should auto-activate sequential mode.
    firehose(
        4,
        false,
        true,
        None,
        slot_range,
        Some(move |_thread_id: usize, block: BlockData| {
            let block_sink = Arc::clone(&block_sink);
            async move {
                block_sink.lock().unwrap().push(block.slot());
                Ok(())
            }
            .boxed()
        }),
        Some(move |_thread_id: usize, _tx: TransactionData| {
            let tx_counter = Arc::clone(&tx_counter);
            async move {
                tx_counter.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let observed = observed_blocks.lock().unwrap().clone();
    assert!(
        !observed.is_empty(),
        "expected to observe at least one block"
    );

    // If sequential were ignored, multiple firehose threads would interleave epochs and the
    // first-observed slot is unlikely to be in epoch 900. The reverse-implies-sequential
    // contract requires the first observed slot to be in the highest epoch.
    let first_slot = observed[0];
    assert_eq!(
        slot_to_epoch(first_slot),
        900,
        "reverse should imply sequential and emit highest epoch first; first slot was {}",
        first_slot,
    );
}