Skip to main content

jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{
40        FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41        fetch_epoch_stream_with_options, slot_to_epoch,
42    },
43    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44    node_reader::NodeReader,
45    utils,
46};
47
48// Timeout applied to each asynchronous firehose operation (fetching epoch stream, reading
49// header, seeking, reading next block). Adjust here to tune stall detection/restart
50// aggressiveness.
51const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
52const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180);
53// Epochs earlier than this were bincode-encoded in Old Faithful.
54const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57    flag: &Arc<std::sync::atomic::AtomicBool>,
58    receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60    if let Some(rx) = receiver {
61        match rx.try_recv() {
62            Ok(_) | Err(TryRecvError::Lagged(_)) => {
63                flag.store(true, Ordering::SeqCst);
64            }
65            Err(TryRecvError::Closed) => {
66                flag.store(true, Ordering::SeqCst);
67            }
68            Err(TryRecvError::Empty) => {}
69        }
70    }
71    flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76        inner
77            .downcast_ref::<io::Error>()
78            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79            .unwrap_or(false)
80    }
81
82    match err {
83        FirehoseError::BlockHandlerError(inner)
84        | FirehoseError::TransactionHandlerError(inner)
85        | FirehoseError::EntryHandlerError(inner)
86        | FirehoseError::RewardHandlerError(inner)
87        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88        _ => false,
89    }
90}
91
92async fn find_previous_indexed_slot(
93    local_start: u64,
94    epoch_start: u64,
95    log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97    if local_start <= epoch_start {
98        return Ok(None);
99    }
100    let mut candidate = local_start.saturating_sub(1);
101    let mut skipped = 0u64;
102    loop {
103        match slot_to_offset(candidate).await {
104            Ok(_) => {
105                if skipped > 0 {
106                    log::info!(
107                        target: log_target,
108                        "slot {} missing in index; seeking back {} slots to {}",
109                        local_start.saturating_sub(1),
110                        skipped,
111                        candidate
112                    );
113                }
114                return Ok(Some(candidate));
115            }
116            Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117                if candidate <= epoch_start {
118                    break;
119                }
120                skipped += 1;
121                candidate = candidate.saturating_sub(1);
122            }
123            Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124        }
125    }
126    log::warn!(
127        target: log_target,
128        "no indexed slot found before {} (epoch start {}); reading from epoch start",
129        local_start,
130        epoch_start
131    );
132    Ok(None)
133}
134
135/// Errors that can occur while streaming the firehose. Errors that can occur while streaming
136/// the firehose.
137#[derive(Debug, Error)]
138pub enum FirehoseError {
139    /// HTTP client error surfaced from `reqwest`.
140    Reqwest(reqwest::Error),
141    /// Failure while reading the Old Faithful CAR header.
142    ReadHeader(SharedError),
143    /// Error emitted by the Solana Geyser plugin service.
144    GeyserPluginService(GeyserPluginServiceError),
145    /// Transaction notifier could not be acquired from the Geyser service.
146    FailedToGetTransactionNotifier,
147    /// Failure while reading data until the next block boundary.
148    ReadUntilBlockError(SharedError),
149    /// Failure while fetching an individual block.
150    GetBlockError(SharedError),
151    /// Failed to decode a node at the given index.
152    NodeDecodingError(usize, SharedError),
153    /// Error surfaced when querying the slot offset index.
154    SlotOffsetIndexError(SlotOffsetIndexError),
155    /// Failure while seeking to a slot within the Old Faithful CAR stream.
156    SeekToSlotError(SharedError),
157    /// Error surfaced during the plugin `on_load` stage.
158    OnLoadError(SharedError),
159    /// Error emitted while invoking the stats handler.
160    OnStatsHandlerError(SharedError),
161    /// Timeout reached while waiting for a firehose operation.
162    OperationTimeout(&'static str),
163    /// Transaction handler returned an error.
164    TransactionHandlerError(SharedError),
165    /// Entry handler returned an error.
166    EntryHandlerError(SharedError),
167    /// Reward handler returned an error.
168    RewardHandlerError(SharedError),
169    /// Block handler returned an error.
170    BlockHandlerError(SharedError),
171}
172
173unsafe impl Send for FirehoseError {}
174unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        match self {
179            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180            FirehoseError::ReadHeader(error) => {
181                write!(f, "Error reading header: {}", error)
182            }
183            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184                f,
185                "Error initializing geyser plugin service: {}",
186                geyser_plugin_service_error
187            ),
188            FirehoseError::FailedToGetTransactionNotifier => write!(
189                f,
190                "Failed to get transaction notifier from GeyserPluginService"
191            ),
192            FirehoseError::ReadUntilBlockError(error) => {
193                write!(f, "Error reading until block: {}", error)
194            }
195            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196            FirehoseError::NodeDecodingError(item_index, error) => {
197                write!(
198                    f,
199                    "Error seeking, reading data from, or decoding data for data node {}: {}",
200                    item_index, error
201                )
202            }
203            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204                f,
205                "Error getting info from slot offset index: {}",
206                slot_offset_index_error
207            ),
208            FirehoseError::SeekToSlotError(error) => {
209                write!(f, "Error seeking to slot: {}", error)
210            }
211            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212            FirehoseError::OnStatsHandlerError(error) => {
213                write!(f, "Stats handler error: {}", error)
214            }
215            FirehoseError::OperationTimeout(op) => {
216                write!(f, "Timeout while waiting for operation: {}", op)
217            }
218            FirehoseError::TransactionHandlerError(error) => {
219                write!(f, "Transaction handler error: {}", error)
220            }
221            FirehoseError::EntryHandlerError(error) => {
222                write!(f, "Entry handler error: {}", error)
223            }
224            FirehoseError::RewardHandlerError(error) => {
225                write!(f, "Reward handler error: {}", error)
226            }
227            FirehoseError::BlockHandlerError(error) => {
228                write!(f, "Block handler error: {}", error)
229            }
230        }
231    }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235    fn from(e: reqwest::Error) -> Self {
236        FirehoseError::Reqwest(e)
237    }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241    fn from(e: GeyserPluginServiceError) -> Self {
242        FirehoseError::GeyserPluginService(e)
243    }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247    fn from(e: SlotOffsetIndexError) -> Self {
248        FirehoseError::SlotOffsetIndexError(e)
249    }
250}
251
252/// Per-thread progress information emitted by the firehose runner.
253#[derive(Clone, PartialEq, Eq, Hash, Debug)]
254pub struct ThreadStats {
255    /// Identifier of the worker thread reporting the stats.
256    pub thread_id: usize,
257    /// Timestamp captured when the thread began processing.
258    pub start_time: std::time::Instant,
259    /// Timestamp captured when the thread finished, if finished.
260    pub finish_time: Option<std::time::Instant>,
261    /// Slot range currently assigned to the thread (half-open, may shrink on restart).
262    pub slot_range: Range<u64>,
263    /// Original slot range assigned to the thread (half-open, never modified).
264    pub initial_slot_range: Range<u64>,
265    /// Latest slot processed by the thread.
266    pub current_slot: u64,
267    /// Total slots processed by the thread.
268    pub slots_processed: u64,
269    /// Number of blocks successfully processed.
270    pub blocks_processed: u64,
271    /// Number of slots skipped by the cluster leader.
272    pub leader_skipped_slots: u64,
273    /// Total transactions processed.
274    pub transactions_processed: u64,
275    /// Total entries processed.
276    pub entries_processed: u64,
277    /// Total rewards processed.
278    pub rewards_processed: u64,
279}
280
281/// Aggregated firehose statistics covering all worker threads.
282#[derive(Clone, PartialEq, Eq, Hash, Debug)]
283pub struct Stats {
284    /// Per-thread statistics for the current update.
285    pub thread_stats: ThreadStats,
286    /// Timestamp captured when processing began.
287    pub start_time: std::time::Instant,
288    /// Timestamp captured when all processing finished, if finished.
289    pub finish_time: Option<std::time::Instant>,
290    /// Slot range currently being processed (half-open [start, end)).
291    pub slot_range: Range<u64>,
292    /// Aggregate slots processed across all threads.
293    pub slots_processed: u64,
294    /// Aggregate blocks processed across all threads.
295    pub blocks_processed: u64,
296    /// Aggregate skipped slots across all threads.
297    pub leader_skipped_slots: u64,
298    /// Aggregate transactions processed across all threads.
299    pub transactions_processed: u64,
300    /// Aggregate entries processed across all threads.
301    pub entries_processed: u64,
302    /// Aggregate rewards processed across all threads.
303    pub rewards_processed: u64,
304    /// Transactions processed since the previous stats pulse.
305    pub transactions_since_last_pulse: u64,
306    /// Blocks processed since the previous stats pulse.
307    pub blocks_since_last_pulse: u64,
308    /// Slots processed since the previous stats pulse.
309    pub slots_since_last_pulse: u64,
310    /// Elapsed time since the previous stats pulse.
311    pub time_since_last_pulse: std::time::Duration,
312}
313
314/// Configuration for periodic stats emission via a [`Handler`] callback.
315#[derive(Clone, PartialEq, Eq, Hash, Debug)]
316pub struct StatsTracking<OnStats: Handler<Stats>> {
317    /// Callback invoked whenever new stats are available.
318    pub on_stats: OnStats,
319    /// Emits a stats callback when the current slot is a multiple of this interval.
320    pub tracking_interval_slots: u64,
321}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326    stats_tracking: Option<&StatsTracking<OnStats>>,
327    thread_index: usize,
328    thread_stats: &ThreadStats,
329    overall_slots_processed: &AtomicU64,
330    overall_blocks_processed: &AtomicU64,
331    overall_transactions_processed: &AtomicU64,
332    overall_entries_processed: &AtomicU64,
333    transactions_since_stats: &AtomicU64,
334    blocks_since_stats: &AtomicU64,
335    slots_since_stats: &AtomicU64,
336    last_pulse: &Arc<AtomicU64>,
337    base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339    if let Some(stats_tracker) = stats_tracking {
340        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344        let now_nanos = base_instant.elapsed().as_nanos() as u64;
345        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346        let delta_nanos = now_nanos.saturating_sub(previous);
347        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352        let stats = Stats {
353            thread_stats: thread_stats.clone(),
354            start_time: thread_stats.start_time,
355            finish_time: thread_stats.finish_time,
356            slot_range: thread_stats.slot_range.clone(),
357            slots_processed: total_slots,
358            blocks_processed: total_blocks,
359            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360            transactions_processed: total_transactions,
361            entries_processed: total_entries,
362            rewards_processed: thread_stats.rewards_processed,
363            transactions_since_last_pulse: processed_transactions,
364            blocks_since_last_pulse: processed_blocks,
365            slots_since_last_pulse: processed_slots,
366            time_since_last_pulse,
367        };
368
369        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370            last_pulse.store(previous, Ordering::Relaxed);
371            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374            return Err((
375                FirehoseError::OnStatsHandlerError(e),
376                thread_stats.current_slot,
377            ));
378        }
379    }
380    Ok(())
381}
382
383#[inline(always)]
384fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
385    if tracking_enabled {
386        atomic.fetch_add(value, Ordering::Relaxed);
387    }
388}
389
390fn clear_pending_skip(
391    map: &DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
392    thread_id: usize,
393    slot: u64,
394) -> bool {
395    map.get(&thread_id)
396        .map(|set| set.remove(&slot).is_some())
397        .unwrap_or(false)
398}
399
400fn decode_transaction_status_meta_from_frame(
401    slot: u64,
402    reassembled_metadata: Vec<u8>,
403) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
404    if reassembled_metadata.is_empty() {
405        // Early epochs often omit metadata entirely.
406        return Ok(solana_transaction_status::TransactionStatusMeta::default());
407    }
408
409    match utils::decompress_zstd(reassembled_metadata.as_slice()) {
410        Ok(decompressed) => {
411            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
412                Box::new(std::io::Error::other(format!(
413                    "decode transaction metadata (slot {slot}): {err}"
414                ))) as SharedError
415            })
416        }
417        Err(decomp_err) => {
418            // If the frame was not zstd-compressed (common for very early data), try to
419            // decode the raw bytes directly before bailing.
420            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
421                Box::new(std::io::Error::other(format!(
422                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
423                ))) as SharedError
424            })
425        }
426    }
427}
428
429#[derive(Debug, Default)]
430struct DecodedRewards {
431    keyed_rewards: Vec<(Address, RewardInfo)>,
432    num_partitions: Option<u64>,
433}
434
435impl DecodedRewards {
436    fn empty() -> Self {
437        Self {
438            keyed_rewards: Vec::new(),
439            num_partitions: None,
440        }
441    }
442}
443
444fn decode_rewards_from_frame(
445    slot: u64,
446    reassembled_rewards: Vec<u8>,
447) -> Result<DecodedRewards, SharedError> {
448    if reassembled_rewards.is_empty() {
449        // Early epochs sometimes omit rewards payloads entirely.
450        return Ok(DecodedRewards::empty());
451    }
452
453    match utils::decompress_zstd(reassembled_rewards.as_slice()) {
454        Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
455            |err| {
456                Box::new(std::io::Error::other(format!(
457                    "decode rewards (slot {slot}): {err}"
458                ))) as SharedError
459            },
460        ),
461        Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
462            |err| {
463                Box::new(std::io::Error::other(format!(
464                    "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
465                ))) as SharedError
466            },
467        ),
468    }
469}
470
471fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
472    let epoch = slot_to_epoch(slot);
473    let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
474        prost_011::Message::decode(bytes);
475    match proto_attempt {
476        Ok(proto) => {
477            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
478            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
479                Box::new(std::io::Error::other(format!(
480                    "convert rewards proto failed (epoch {epoch}): {err}"
481                ))) as SharedError
482            })?;
483            Ok(DecodedRewards {
484                keyed_rewards,
485                num_partitions,
486            })
487        }
488        Err(proto_err) => {
489            let stored: solana_storage_proto::StoredExtendedRewards =
490                bincode::deserialize(bytes).map_err(|bin_err| {
491                    Box::new(std::io::Error::other(format!(
492                        "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
493                    ))) as SharedError
494                })?;
495            let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
496            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
497            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
498                Box::new(std::io::Error::other(format!(
499                    "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
500                ))) as SharedError
501            })?;
502            Ok(DecodedRewards {
503                keyed_rewards,
504                num_partitions,
505            })
506        }
507    }
508}
509
510fn decode_transaction_status_meta(
511    slot: u64,
512    metadata_bytes: &[u8],
513) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
514    let epoch = slot_to_epoch(slot);
515    let mut bincode_err: Option<String> = None;
516    if epoch < BINCODE_EPOCH_CUTOFF {
517        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
518            metadata_bytes,
519        ) {
520            Ok(stored) => return Ok(stored.into()),
521            Err(err) => {
522                bincode_err = Some(err.to_string());
523            }
524        }
525    }
526
527    let bin_err_for_proto = bincode_err.clone();
528    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
529        prost_011::Message::decode(metadata_bytes).map_err(|err| {
530            // If we already tried bincode, surface both failures for easier debugging.
531            if let Some(ref bin_err) = bin_err_for_proto {
532                Box::new(std::io::Error::other(format!(
533                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
534                ))) as SharedError
535            } else {
536                Box::new(std::io::Error::other(format!(
537                    "protobuf decode transaction metadata: {err}"
538                ))) as SharedError
539            }
540        })?;
541
542    proto.try_into().map_err(|err| {
543        if let Some(ref bin_err) = bincode_err {
544            Box::new(std::io::Error::other(format!(
545                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
546            ))) as SharedError
547        } else {
548            Box::new(std::io::Error::other(format!(
549                "convert transaction metadata proto: {err}"
550            ))) as SharedError
551        }
552    })
553}
554
555#[cfg(test)]
556mod metadata_decode_tests {
557    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
558    use solana_message::v0::LoadedAddresses;
559    use solana_storage_proto::StoredTransactionStatusMeta;
560    use solana_transaction_status::TransactionStatusMeta;
561
562    fn sample_meta() -> TransactionStatusMeta {
563        let mut meta = TransactionStatusMeta::default();
564        meta.fee = 42;
565        meta.pre_balances = vec![1, 2];
566        meta.post_balances = vec![3, 4];
567        meta.log_messages = Some(vec!["hello".into()]);
568        meta.pre_token_balances = Some(Vec::new());
569        meta.post_token_balances = Some(Vec::new());
570        meta.rewards = Some(Vec::new());
571        meta.compute_units_consumed = Some(7);
572        meta.cost_units = Some(9);
573        meta.loaded_addresses = LoadedAddresses::default();
574        meta
575    }
576
577    #[test]
578    fn decodes_bincode_metadata_for_early_epochs() {
579        let stored = StoredTransactionStatusMeta {
580            status: Ok(()),
581            fee: 42,
582            pre_balances: vec![1, 2],
583            post_balances: vec![3, 4],
584            inner_instructions: None,
585            log_messages: Some(vec!["hello".into()]),
586            pre_token_balances: Some(Vec::new()),
587            post_token_balances: Some(Vec::new()),
588            rewards: Some(Vec::new()),
589            return_data: None,
590            compute_units_consumed: Some(7),
591            cost_units: Some(9),
592        };
593        let bytes = bincode::serialize(&stored).expect("bincode serialize");
594        let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
595        assert_eq!(decoded, TransactionStatusMeta::from(stored));
596    }
597
598    #[test]
599    fn decodes_protobuf_metadata_for_later_epochs() {
600        let meta = sample_meta();
601        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
602            meta.clone().into();
603        let bytes = prost_011::Message::encode_to_vec(&generated);
604        let decoded = decode_transaction_status_meta(157 * 432000, &bytes).expect("decode");
605        assert_eq!(decoded, meta);
606    }
607
608    #[test]
609    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
610        let meta = sample_meta();
611        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
612            meta.clone().into();
613        let bytes = prost_011::Message::encode_to_vec(&generated);
614        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
615        let decoded = decode_transaction_status_meta(100 * 432000, &bytes).expect("decode");
616        assert_eq!(decoded, meta);
617    }
618
619    #[test]
620    fn empty_frame_decodes_to_default() {
621        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
622        assert_eq!(decoded, TransactionStatusMeta::default());
623    }
624
625    #[test]
626    fn raw_bincode_frame_without_zstd_still_decodes() {
627        let stored = StoredTransactionStatusMeta {
628            status: Ok(()),
629            fee: 1,
630            pre_balances: vec![],
631            post_balances: vec![],
632            inner_instructions: None,
633            log_messages: None,
634            pre_token_balances: Some(Vec::new()),
635            post_token_balances: Some(Vec::new()),
636            rewards: Some(Vec::new()),
637            return_data: None,
638            compute_units_consumed: None,
639            cost_units: None,
640        };
641        let raw_bytes = bincode::serialize(&stored).expect("serialize");
642        let decoded =
643            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
644        assert_eq!(decoded, TransactionStatusMeta::from(stored));
645    }
646}
647
648#[cfg(test)]
649mod rewards_decode_tests {
650    use super::decode_rewards_from_bytes;
651    use solana_sdk_ids::vote::id as vote_program_id;
652    use solana_storage_proto::StoredExtendedRewards;
653    use solana_transaction_status::{Reward, RewardType};
654
655    #[test]
656    fn decodes_protobuf_rewards() {
657        let pubkey = vote_program_id().to_string();
658        let proto = solana_storage_proto::convert::generated::Rewards {
659            rewards: vec![solana_storage_proto::convert::generated::Reward {
660                pubkey,
661                lamports: 5,
662                post_balance: 10,
663                reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
664                commission: "1".to_string(),
665            }],
666            num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
667                num_partitions: 2,
668            }),
669        };
670        let bytes = prost_011::Message::encode_to_vec(&proto);
671        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode proto rewards");
672        assert_eq!(decoded.keyed_rewards.len(), 1);
673        assert_eq!(decoded.num_partitions, Some(2));
674    }
675
676    #[test]
677    fn decodes_bincode_rewards() {
678        let pubkey = vote_program_id().to_string();
679        let reward = Reward {
680            pubkey,
681            lamports: 7,
682            post_balance: 9,
683            reward_type: Some(RewardType::Rent),
684            commission: Some(3),
685        };
686        let stored_rewards: StoredExtendedRewards = vec![reward.into()];
687        let bytes = bincode::serialize(&stored_rewards).expect("bincode serialize");
688        let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode bincode rewards");
689        assert_eq!(decoded.keyed_rewards.len(), 1);
690        assert_eq!(decoded.num_partitions, None);
691    }
692}
693
694/// Firehose transaction payload passed to [`Handler`] callbacks.
695#[derive(Debug, Clone)]
696pub struct TransactionData {
697    /// Slot that contains the transaction.
698    pub slot: u64,
699    /// Index of the transaction within the slot.
700    pub transaction_slot_index: usize,
701    /// Transaction signature.
702    pub signature: solana_signature::Signature,
703    /// Hash of the transaction message.
704    pub message_hash: Hash,
705    /// Indicates whether the transaction is a vote.
706    pub is_vote: bool,
707    /// Status metadata returned by the Solana runtime.
708    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
709    /// Fully decoded transaction.
710    pub transaction: VersionedTransaction,
711}
712
713/// Block entry metadata passed to [`Handler`] callbacks.
714#[derive(Debug, Clone)]
715pub struct EntryData {
716    /// Slot that generated the entry.
717    pub slot: u64,
718    /// Index of the entry within the slot.
719    pub entry_index: usize,
720    /// Range of transaction indexes covered by the entry.
721    pub transaction_indexes: Range<usize>,
722    /// Number of hashes associated with the entry.
723    pub num_hashes: u64,
724    /// Entry hash.
725    pub hash: Hash,
726}
727
728/// Reward data conveyed to reward [`Handler`] callbacks.
729#[derive(Debug, Clone)]
730pub struct RewardsData {
731    /// Slot the rewards correspond to.
732    pub slot: u64,
733    /// Reward recipients and their associated reward information.
734    pub rewards: Vec<(Address, RewardInfo)>,
735}
736
737/// Block-level data streamed to block handlers.
738#[derive(Debug)]
739pub enum BlockData {
740    /// Fully populated block payload with ledger metadata.
741    Block {
742        /// Parent slot number.
743        parent_slot: u64,
744        /// Parent block hash.
745        parent_blockhash: Hash,
746        /// Current block slot.
747        slot: u64,
748        /// Current block hash.
749        blockhash: Hash,
750        /// Rewards keyed by account and partition information.
751        rewards: KeyedRewardsAndNumPartitions,
752        /// Optional Unix timestamp for the block.
753        block_time: Option<i64>,
754        /// Optional ledger block height.
755        block_height: Option<u64>,
756        /// Number of executed transactions in the block.
757        executed_transaction_count: u64,
758        /// Number of entries contained in the block.
759        entry_count: u64,
760    },
761    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
762    /// arrive out of order).
763    PossibleLeaderSkipped {
764        /// Slot number that either lacked a block or may still arrive later.
765        slot: u64,
766    },
767}
768
769impl BlockData {
770    /// Returns the slot associated with this block or skipped slot.
771    #[inline(always)]
772    pub const fn slot(&self) -> u64 {
773        match self {
774            BlockData::Block { slot, .. } => *slot,
775            BlockData::PossibleLeaderSkipped { slot } => *slot,
776        }
777    }
778
779    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
780    #[inline(always)]
781    pub const fn was_skipped(&self) -> bool {
782        matches!(self, BlockData::PossibleLeaderSkipped { .. })
783    }
784
785    /// Returns the optional block time when available.
786    #[inline(always)]
787    pub const fn block_time(&self) -> Option<i64> {
788        match self {
789            BlockData::Block { block_time, .. } => *block_time,
790            BlockData::PossibleLeaderSkipped { .. } => None,
791        }
792    }
793}
794
795type HandlerResult = Result<(), SharedError>;
796type HandlerFuture = BoxFuture<'static, HandlerResult>;
797
798/// Asynchronous callback invoked for each firehose event of type `Data`.
799pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
800
801impl<Data, F> Handler<Data> for F where
802    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
803{
804}
805
806/// Function pointer alias for [`Handler`] callbacks.
807pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
808/// Convenience alias for block handlers accepted by [`firehose`].
809pub type OnBlockFn = HandlerFn<BlockData>;
810/// Convenience alias for transaction handlers accepted by [`firehose`].
811pub type OnTxFn = HandlerFn<TransactionData>;
812/// Convenience alias for entry handlers accepted by [`firehose`].
813pub type OnEntryFn = HandlerFn<EntryData>;
814/// Convenience alias for reward handlers accepted by [`firehose`].
815pub type OnRewardFn = HandlerFn<RewardsData>;
816/// Type alias for [`StatsTracking`] using simple function pointers.
817pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
818/// Convenience alias for firehose error handlers.
819pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
820/// Convenience alias for stats tracking handlers accepted by [`firehose`].
821pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
822
823/// Metadata describing a firehose worker failure.
824#[derive(Clone, Debug)]
825pub struct FirehoseErrorContext {
826    /// Thread index that encountered the error.
827    pub thread_id: usize,
828    /// Slot the worker was processing when the error surfaced.
829    pub slot: u64,
830    /// Epoch derived from the failing slot.
831    pub epoch: u64,
832    /// Stringified error payload for display/logging.
833    pub error_message: String,
834}
835
836/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
837///
838/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
839/// runner restarts from the last processed slot to maintain coverage.
840///
841/// When `sequential` is `true`, the firehose uses one worker thread and opens epoch streams
842/// with ripget's parallel windowed downloader. In this mode `threads` configures ripget range
843/// concurrency rather than firehose worker partitioning.
844///
845/// `buffer_window_bytes` controls the ripget hot/cold window when `sequential` is enabled.
846/// Pass `None` to use the default (`min(4 GiB, 15% of available RAM)`).
847#[inline]
848#[allow(clippy::too_many_arguments)]
849pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
850    threads: u64,
851    sequential: bool,
852    buffer_window_bytes: Option<u64>,
853    slot_range: Range<u64>,
854    on_block: Option<OnBlock>,
855    on_tx: Option<OnTransaction>,
856    on_entry: Option<OnEntry>,
857    on_rewards: Option<OnRewards>,
858    on_error: Option<OnError>,
859    stats_tracking: Option<StatsTracking<OnStats>>,
860    shutdown_signal: Option<broadcast::Receiver<()>>,
861) -> Result<(), (FirehoseError, u64)>
862where
863    OnBlock: Handler<BlockData>,
864    OnTransaction: Handler<TransactionData>,
865    OnEntry: Handler<EntryData>,
866    OnRewards: Handler<RewardsData>,
867    OnStats: Handler<Stats>,
868    OnError: Handler<FirehoseErrorContext>,
869{
870    if threads == 0 {
871        return Err((
872            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
873            slot_range.start,
874        ));
875    }
876    let client = crate::network::create_http_client();
877    log::info!(target: LOG_MODULE, "starting firehose...");
878    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
879    let firehose_threads = if sequential { 1 } else { threads };
880    let sequential_download_threads = std::cmp::max(1, threads as usize);
881    let sequential_buffer_window_bytes = buffer_window_bytes
882        .filter(|value| *value >= 2)
883        .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
884    if sequential {
885        log::info!(
886            target: LOG_MODULE,
887            "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
888            sequential_download_threads,
889            crate::system::format_byte_size(sequential_buffer_window_bytes)
890        );
891    }
892
893    let slot_range = Arc::new(slot_range);
894
895    // divide slot_range into n subranges
896    let subranges = generate_subranges(&slot_range, firehose_threads);
897    if firehose_threads > 1 {
898        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
899    }
900
901    let firehose_start = std::time::Instant::now();
902    let shutdown_flag = Arc::new(AtomicBool::new(false));
903    if let Some(ref rx) = shutdown_signal {
904        let mut rx = rx.resubscribe();
905        let flag = shutdown_flag.clone();
906        tokio::spawn(async move {
907            if rx.recv().await.is_ok() {
908                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
909                flag.store(true, Ordering::SeqCst);
910            }
911        });
912    }
913
914    // Build a shared ripget HTTP client so TCP connections survive across epoch transitions.
915    let shared_ripget_client: Option<ripget::Client> = if sequential {
916        Some(
917            ripget::build_client(Some(&format!(
918                "jetstreamer-firehose/{}",
919                env!("CARGO_PKG_VERSION")
920            )))
921            .expect("failed to build ripget HTTP client"),
922        )
923    } else {
924        None
925    };
926
927    let mut handles = Vec::new();
928    // Shared per-thread error counters
929    let error_counts: Arc<Vec<AtomicU32>> =
930        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
931
932    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
933    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
934    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
935    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
936    let pending_skipped_slots: Arc<
937        DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
938    > = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
939
940    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
941        let error_counts = error_counts.clone();
942        let client = client.clone();
943        let on_block = on_block.clone();
944        let on_tx = on_tx.clone();
945        let on_entry = on_entry.clone();
946        let on_reward = on_rewards.clone();
947        let on_error = on_error.clone();
948        let overall_slots_processed = overall_slots_processed.clone();
949        let overall_blocks_processed = overall_blocks_processed.clone();
950        let overall_transactions_processed = overall_transactions_processed.clone();
951        let overall_entries_processed = overall_entries_processed.clone();
952        let stats_tracking = stats_tracking.clone();
953        let transactions_since_stats = Arc::new(AtomicU64::new(0));
954        let blocks_since_stats = Arc::new(AtomicU64::new(0));
955        let slots_since_stats = Arc::new(AtomicU64::new(0));
956        let last_pulse = Arc::new(AtomicU64::new(0));
957        let transactions_since_stats_cloned = transactions_since_stats.clone();
958        let blocks_since_stats_cloned = blocks_since_stats.clone();
959        let slots_since_stats_cloned = slots_since_stats.clone();
960        let last_pulse_cloned = last_pulse.clone();
961        let shutdown_flag = shutdown_flag.clone();
962        let pending_skipped_slots = pending_skipped_slots.clone();
963        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
964        let sequential_mode = sequential;
965        let ripget_threads = sequential_download_threads;
966        let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
967        let ripget_client = shared_ripget_client.clone();
968
969        let handle = tokio::spawn(async move {
970            let transactions_since_stats = transactions_since_stats_cloned;
971            let blocks_since_stats = blocks_since_stats_cloned;
972            let slots_since_stats = slots_since_stats_cloned;
973            let last_pulse = last_pulse_cloned;
974            let mut shutdown_rx = thread_shutdown_rx;
975            let start_time = firehose_start;
976            last_pulse.store(
977                firehose_start.elapsed().as_nanos() as u64,
978                Ordering::Relaxed,
979            );
980            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
981            let mut skip_until_index = None;
982            let last_emitted_slot = slot_range.start.saturating_sub(1);
983            let block_enabled = on_block.is_some();
984            let tx_enabled = on_tx.is_some();
985            let entry_enabled = on_entry.is_some();
986            let reward_enabled = on_reward.is_some();
987            let tracking_enabled = stats_tracking.is_some();
988            if block_enabled {
989                pending_skipped_slots
990                    .entry(thread_index)
991                    .or_insert_with(|| DashSet::with_hasher(ahash::RandomState::new()));
992            }
993            let mut last_counted_slot = slot_range.start.saturating_sub(1);
994            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
995            let mut thread_stats = if tracking_enabled {
996                Some(ThreadStats {
997                    thread_id: thread_index,
998                    start_time,
999                    finish_time: None,
1000                    slot_range: slot_range.clone(),
1001                    initial_slot_range: slot_range.clone(),
1002                    current_slot: slot_range.start,
1003                    slots_processed: 0,
1004                    blocks_processed: 0,
1005                    leader_skipped_slots: 0,
1006                    transactions_processed: 0,
1007                    entries_processed: 0,
1008                    rewards_processed: 0,
1009                })
1010            } else {
1011                None
1012            };
1013
1014            // let mut triggered = false;
1015            while let Err((err, slot)) = async {
1016                let mut last_emitted_slot = last_emitted_slot_global;
1017                let op_timeout = if sequential_mode {
1018                    OP_TIMEOUT_SEQUENTIAL
1019                } else {
1020                    OP_TIMEOUT
1021                };
1022                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1023                    log::info!(
1024                        target: &log_target,
1025                        "shutdown requested; terminating firehose thread {}",
1026                        thread_index
1027                    );
1028                    return Ok(());
1029                }
1030                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1031                log::info!(
1032                    target: &log_target,
1033                    "slot range: {} (epoch {}) ... {} (epoch {})",
1034                    slot_range.start,
1035                    slot_to_epoch(slot_range.start),
1036                    slot_range.end,
1037                    slot_to_epoch(slot_range.end)
1038                );
1039
1040                log::info!(target: &log_target, "🚒 starting firehose...");
1041
1042                // for each epoch
1043                let mut current_slot: Option<u64> = None;
1044                for epoch_num in epoch_range.clone() {
1045                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1046                        log::info!(
1047                            target: &log_target,
1048                            "shutdown requested; terminating firehose thread {}",
1049                            thread_index
1050                        );
1051                        return Ok(());
1052                    }
1053                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
1054                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1055                    let local_start = std::cmp::max(slot_range.start, epoch_start);
1056                    let local_end_inclusive =
1057                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1058                    if local_start > local_end_inclusive {
1059                        log::debug!(
1060                            target: &log_target,
1061                            "epoch {} has no overlap with thread range ({}..{}), skipping",
1062                            epoch_num,
1063                            slot_range.start,
1064                            slot_range.end
1065                        );
1066                        continue;
1067                    }
1068                    let use_sequential_stream = sequential_mode && local_start == epoch_start;
1069                    let stream = match timeout(op_timeout, async {
1070                        if use_sequential_stream {
1071                            fetch_epoch_stream_with_options(
1072                                epoch_num,
1073                                &client,
1074                                Some(FetchEpochStreamOptions {
1075                                    sequential: true,
1076                                    ripget_threads,
1077                                    buffer_window_bytes: ripget_buffer_window_bytes,
1078                                    ripget_client: ripget_client.clone(),
1079                                }),
1080                            )
1081                            .await
1082                        } else {
1083                            fetch_epoch_stream(epoch_num, &client).await
1084                        }
1085                    })
1086                    .await
1087                    {
1088                        Ok(stream) => stream,
1089                        Err(_) => {
1090                            return Err((
1091                                FirehoseError::OperationTimeout("fetch_epoch_stream"),
1092                                current_slot.unwrap_or(slot_range.start),
1093                            ));
1094                        }
1095                    };
1096                    let mut reader = NodeReader::new(stream);
1097
1098                    let header_fut = reader.read_raw_header();
1099                    let header = match timeout(op_timeout, header_fut).await {
1100                        Ok(res) => res
1101                            .map_err(FirehoseError::ReadHeader)
1102                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1103                        Err(_) => {
1104                            return Err((
1105                                FirehoseError::OperationTimeout("read_raw_header"),
1106                                current_slot.unwrap_or(slot_range.start),
1107                            ));
1108                        }
1109                    };
1110                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1111
1112                    let mut previous_blockhash = Hash::default();
1113                    let mut latest_entry_blockhash = Hash::default();
1114                    // Reset counters to align to the local epoch slice; prevents boundary slots
1115                    // from being treated as already-counted after a restart.
1116                    last_counted_slot = local_start.saturating_sub(1);
1117                    current_slot = None;
1118                    if tracking_enabled
1119                        && let Some(ref mut stats) = thread_stats {
1120                            stats.current_slot = local_start;
1121                            stats.slot_range.start = local_start;
1122                        }
1123
1124                    if local_start > epoch_start {
1125                        // Seek to the nearest previous indexed slot so the stream includes all
1126                        // nodes (transactions, entries, rewards) that precede `local_start`.
1127                        let seek_slot = match timeout(
1128                            OP_TIMEOUT,
1129                            find_previous_indexed_slot(local_start, epoch_start, &log_target),
1130                        )
1131                        .await
1132                        {
1133                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1134                            Err(_) => {
1135                                return Err((
1136                                    FirehoseError::OperationTimeout(
1137                                        "seek_to_previous_indexed_slot",
1138                                    ),
1139                                    current_slot.unwrap_or(slot_range.start),
1140                                ));
1141                            }
1142                        };
1143                        if let Some(seek_slot) = seek_slot {
1144                            let seek_fut = reader.seek_to_slot(seek_slot);
1145                            match timeout(op_timeout, seek_fut).await {
1146                                Ok(res) => {
1147                                    res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1148                                }
1149                                Err(_) => {
1150                                    return Err((
1151                                        FirehoseError::OperationTimeout("seek_to_slot"),
1152                                        current_slot.unwrap_or(slot_range.start),
1153                                    ));
1154                                }
1155                            }
1156                        }
1157                    }
1158
1159                    // for each item in each block
1160                    let mut item_index = 0;
1161                    let mut displayed_skip_message = false;
1162                    loop {
1163                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1164                            log::info!(
1165                                target: &log_target,
1166                                "shutdown requested; terminating firehose thread {}",
1167                                thread_index
1168                            );
1169                            return Ok(());
1170                        }
1171                        let read_fut = reader.read_until_block();
1172                        let nodes = match timeout(op_timeout, read_fut).await {
1173                            Ok(result) => result
1174                                .map_err(FirehoseError::ReadUntilBlockError)
1175                                .map_err(|e| {
1176                                    (
1177                                        e,
1178                                        current_slot
1179                                            .map(|slot| slot.saturating_add(1))
1180                                            .unwrap_or(slot_range.start),
1181                                    )
1182                                })?,
1183                            Err(_) => {
1184                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1185                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1186                            }
1187                        };
1188                        if nodes.is_empty() {
1189                            log::info!(
1190                                target: &log_target,
1191                                "reached end of epoch {}",
1192                                epoch_num
1193                            );
1194                            break;
1195                        }
1196                        if let Some(last_node) = nodes.0.last()
1197                            && !last_node.get_node().is_block()
1198                        {
1199                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1200                            break;
1201                        }
1202                        let block = nodes
1203                            .get_block()
1204                            .map_err(FirehoseError::GetBlockError)
1205                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1206                        log::debug!(
1207                            target: &log_target,
1208                            "read {} items from epoch {}, now at slot {}",
1209                            item_index,
1210                            epoch_num,
1211                            block.slot
1212                        );
1213                        let slot = block.slot;
1214                        if slot > local_end_inclusive {
1215                            log::debug!(
1216                                target: &log_target,
1217                                "reached end of local slice at slot {} (epoch {}), stopping",
1218                                slot,
1219                                epoch_num
1220                            );
1221                            break;
1222                        }
1223                        if slot >= slot_range.end {
1224                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1225                            // Return early to terminate the firehose thread cleanly. We use >=
1226                            // because slot_range is half-open [start, end), so any slot equal
1227                            // to end is out-of-range and must not be processed. Do not emit
1228                            // synthetic skipped slots here; another thread may own the boundary.
1229                            if block_enabled {
1230                                pending_skipped_slots.remove(&thread_index);
1231                            }
1232                            return Ok(());
1233                        }
1234                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1235                        if slot < slot_range.start {
1236                            if slot.saturating_add(1) == slot_range.start {
1237                                log::debug!(
1238                                    target: &log_target,
1239                                    "priming reader with preceding slot {}, skipping",
1240                                    slot
1241                                );
1242                            } else {
1243                                log::warn!(
1244                                    target: &log_target,
1245                                    "encountered slot {} before start of range {}, skipping",
1246                                    slot,
1247                                    slot_range.start
1248                                );
1249                            }
1250                            continue;
1251                        }
1252                        current_slot = Some(slot);
1253                        let mut entry_index: usize = 0;
1254                        let mut this_block_executed_transaction_count: u64 = 0;
1255                        let mut this_block_entry_count: u64 = 0;
1256                        let mut this_block_rewards = DecodedRewards::empty();
1257
1258                        for node_with_cid in &nodes.0 {
1259                            item_index += 1;
1260                            if let Some(skip) = skip_until_index {
1261                                if item_index < skip {
1262                                    if !displayed_skip_message {
1263                                        log::info!(
1264                                            target: &log_target,
1265                                            "skipping until index {} (at {})",
1266                                            skip,
1267                                            item_index
1268                                        );
1269                                        displayed_skip_message = true;
1270                                    }
1271                                    continue;
1272                                } else {
1273                                    log::info!(
1274                                        target: &log_target,
1275                                        "reached target index {}, resuming...",
1276                                        skip
1277                                    );
1278                                    skip_until_index = None;
1279                                }
1280                            }
1281                            let node = node_with_cid.get_node();
1282
1283                            if let Some(ref mut stats) = thread_stats {
1284                                stats.current_slot = slot;
1285                            }
1286
1287                            let error_slot = current_slot.unwrap_or(slot_range.start);
1288
1289                            use crate::node::Node::*;
1290                            match node {
1291                                Transaction(tx) => {
1292                                    if tx_enabled
1293                                        && let Some(on_tx_cb) = on_tx.as_ref()
1294                                    {
1295                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1296                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1297                                            (
1298                                                FirehoseError::NodeDecodingError(item_index, err),
1299                                                error_slot,
1300                                            )
1301                                        })?;
1302                                        let reassembled_metadata = nodes
1303                                            .reassemble_dataframes(&tx.metadata)
1304                                            .map_err(|err| {
1305                                                (
1306                                                    FirehoseError::NodeDecodingError(item_index, err),
1307                                                    error_slot,
1308                                                )
1309                                            })?;
1310
1311                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1312                                            block.slot,
1313                                            reassembled_metadata,
1314                                        )
1315                                        .map_err(|err| {
1316                                            (
1317                                                FirehoseError::NodeDecodingError(item_index, err),
1318                                                error_slot,
1319                                            )
1320                                        })?;
1321
1322                                        let message_hash = {
1323                                            #[cfg(feature = "verify-transaction-signatures")]
1324                                            {
1325                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1326                                                    (
1327                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1328                                                        error_slot,
1329                                                    )
1330                                                })?
1331                                            }
1332                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1333                                            {
1334                                                versioned_tx.message.hash()
1335                                            }
1336                                        };
1337                                        let signature = versioned_tx
1338                                            .signatures
1339                                            .first()
1340                                            .ok_or_else(|| {
1341                                                Box::new(std::io::Error::new(
1342                                                    std::io::ErrorKind::InvalidData,
1343                                                    "transaction missing signature",
1344                                                )) as SharedError
1345                                            })
1346                                            .map_err(|err| {
1347                                                (
1348                                                    FirehoseError::NodeDecodingError(
1349                                                        item_index,
1350                                                        err,
1351                                                    ),
1352                                                    error_slot,
1353                                                )
1354                                            })?;
1355                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1356
1357                                        on_tx_cb(
1358                                            thread_index,
1359                                            TransactionData {
1360                                                slot: block.slot,
1361                                                transaction_slot_index: tx.index.unwrap() as usize,
1362                                                signature: *signature,
1363                                                message_hash,
1364                                                is_vote,
1365                                                transaction_status_meta: as_native_metadata,
1366                                                transaction: versioned_tx,
1367                                            },
1368                                        )
1369                                        .await
1370                                        .map_err(|e| {
1371                                            (
1372                                                FirehoseError::TransactionHandlerError(e),
1373                                                error_slot,
1374                                            )
1375                                        })?;
1376                                    }
1377                                    fetch_add_if(
1378                                        tracking_enabled,
1379                                        &overall_transactions_processed,
1380                                        1,
1381                                    );
1382                                    if let Some(ref mut stats) = thread_stats {
1383                                        stats.transactions_processed += 1;
1384                                    }
1385                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1386                                }
1387                                Entry(entry) => {
1388                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1389                                    let entry_transaction_count = entry.transactions.len();
1390                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1391                                    let starting_transaction_index_u64 =
1392                                        this_block_executed_transaction_count;
1393                                    latest_entry_blockhash = entry_hash;
1394                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1395                                    this_block_entry_count += 1;
1396
1397                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1398                                        let starting_transaction_index = usize::try_from(
1399                                            starting_transaction_index_u64,
1400                                        )
1401                                        .map_err(|err| {
1402                                            (
1403                                                FirehoseError::EntryHandlerError(Box::new(err)),
1404                                                error_slot,
1405                                            )
1406                                        })?;
1407                                        let transaction_indexes_end =
1408                                            starting_transaction_index + entry_transaction_count;
1409                                        on_entry_cb(
1410                                            thread_index,
1411                                            EntryData {
1412                                                slot: block.slot,
1413                                                entry_index,
1414                                                transaction_indexes: starting_transaction_index
1415                                                    ..transaction_indexes_end,
1416                                                num_hashes: entry.num_hashes,
1417                                                hash: entry_hash,
1418                                            },
1419                                        )
1420                                        .await
1421                                        .map_err(|e| {
1422                                            (
1423                                                FirehoseError::EntryHandlerError(e),
1424                                                error_slot,
1425                                            )
1426                                        })?;
1427                                    }
1428                                    entry_index += 1;
1429                                    fetch_add_if(
1430                                        tracking_enabled,
1431                                        &overall_entries_processed,
1432                                        1,
1433                                    );
1434                                    if let Some(ref mut stats) = thread_stats {
1435                                        stats.entries_processed += 1;
1436                                    }
1437                                }
1438                                Block(block) => {
1439                                    let prev_last_counted_slot = last_counted_slot;
1440                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1441                                        (
1442                                            stats.slots_processed,
1443                                            stats.blocks_processed,
1444                                            stats.leader_skipped_slots,
1445                                            stats.current_slot,
1446                                        )
1447                                    });
1448
1449                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1450                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1451                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1452
1453                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1454                                    for skipped_slot in skip_start..slot {
1455                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1456                                            break;
1457                                        }
1458                                        log::debug!(
1459                                            target: &log_target,
1460                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1461                                            skipped_slot,
1462                                            prev_last_counted_slot,
1463                                            slot,
1464                                        );
1465                                        if block_enabled {
1466                                            pending_skipped_slots
1467                                                .entry(thread_index)
1468                                                .or_default()
1469                                                .insert(skipped_slot);
1470                                        }
1471                                        if block_enabled
1472                                            && let Some(on_block_cb) = on_block.as_ref()
1473                                            && skipped_slot > last_emitted_slot {
1474                                                last_emitted_slot = skipped_slot;
1475                                                on_block_cb(
1476                                                    thread_index,
1477                                                    BlockData::PossibleLeaderSkipped {
1478                                                        slot: skipped_slot,
1479                                                    },
1480                                                )
1481                                                .await
1482                                                .map_err(|e| {
1483                                                    (
1484                                                        FirehoseError::BlockHandlerError(e),
1485                                                        error_slot,
1486                                                    )
1487                                                })?;
1488                                            }
1489                                        if tracking_enabled {
1490                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1491                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1492                                            if let Some(ref mut stats) = thread_stats {
1493                                                stats.leader_skipped_slots += 1;
1494                                                stats.slots_processed += 1;
1495                                                stats.current_slot = skipped_slot;
1496                                            }
1497                                        }
1498                                        last_counted_slot = skipped_slot;
1499                                    }
1500
1501                                    let cleared_pending_skip = if block_enabled {
1502                                        clear_pending_skip(
1503                                            &pending_skipped_slots,
1504                                            thread_index,
1505                                            slot,
1506                                        )
1507                                    } else {
1508                                        false
1509                                    };
1510
1511                                    if slot <= last_counted_slot && !cleared_pending_skip {
1512                                        log::debug!(
1513                                            target: &log_target,
1514                                            "duplicate block {}, already counted (last_counted={})",
1515                                            slot,
1516                                            last_counted_slot,
1517                                        );
1518                                        this_block_rewards = DecodedRewards::empty();
1519                                        continue;
1520                                    }
1521
1522                                    if block_enabled {
1523                                        if let Some(on_block_cb) = on_block.as_ref() {
1524                                            let DecodedRewards {
1525                                                keyed_rewards,
1526                                                num_partitions,
1527                                            } = std::mem::take(&mut this_block_rewards);
1528                                            if slot > last_emitted_slot {
1529                                                last_emitted_slot = slot;
1530                                                on_block_cb(
1531                                                    thread_index,
1532                                                    BlockData::Block {
1533                                                        parent_slot: block.meta.parent_slot,
1534                                                        parent_blockhash: previous_blockhash,
1535                                                        slot: block.slot,
1536                                                        blockhash: latest_entry_blockhash,
1537                                                        rewards: KeyedRewardsAndNumPartitions {
1538                                                            keyed_rewards,
1539                                                            num_partitions,
1540                                                        },
1541                                                        block_time: Some(block.meta.blocktime as i64),
1542                                                        block_height: block.meta.block_height,
1543                                                        executed_transaction_count:
1544                                                            this_block_executed_transaction_count,
1545                                                        entry_count: this_block_entry_count,
1546                                                    },
1547                                                )
1548                                                .await
1549                                                .map_err(|e| {
1550                                                    (
1551                                                        FirehoseError::BlockHandlerError(e),
1552                                                        error_slot,
1553                                                    )
1554                                                })?;
1555                                            }
1556                                        }
1557                                    } else {
1558                                        this_block_rewards = DecodedRewards::empty();
1559                                    }
1560                                    previous_blockhash = latest_entry_blockhash;
1561
1562                                    if tracking_enabled {
1563                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1564                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1565                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1566                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1567                                        if let Some(ref mut stats) = thread_stats {
1568                                            stats.blocks_processed += 1;
1569                                            stats.slots_processed += 1;
1570                                            stats.current_slot = slot;
1571                                        }
1572
1573                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1574                                            (&stats_tracking, thread_stats.as_mut())
1575                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1576                                                && let Err(err) = maybe_emit_stats(
1577                                                    stats_tracking.as_ref(),
1578                                                    thread_index,
1579                                                    thread_stats_ref,
1580                                                    &overall_slots_processed,
1581                                                    &overall_blocks_processed,
1582                                                    &overall_transactions_processed,
1583                                                    &overall_entries_processed,
1584                                                &transactions_since_stats,
1585                                                &blocks_since_stats,
1586                                                &slots_since_stats,
1587                                                &last_pulse,
1588                                                start_time,
1589                                            )
1590                                            .await
1591                                            {
1592                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1593                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1594                                                    overall_blocks_processed
1595                                                        .fetch_sub(1, Ordering::Relaxed);
1596                                                    overall_slots_processed
1597                                                        .fetch_sub(1, Ordering::Relaxed);
1598                                                    if let Some((
1599                                                        prev_slots_processed,
1600                                                        prev_blocks_processed,
1601                                                        prev_leader_skipped,
1602                                                        prev_current_slot,
1603                                                    )) = thread_stats_snapshot
1604                                                    {
1605                                                        thread_stats_ref.slots_processed =
1606                                                            prev_slots_processed;
1607                                                        thread_stats_ref.blocks_processed =
1608                                                            prev_blocks_processed;
1609                                                        thread_stats_ref.leader_skipped_slots =
1610                                                            prev_leader_skipped;
1611                                                        thread_stats_ref.current_slot =
1612                                                            prev_current_slot;
1613                                                    }
1614                                                    last_counted_slot = prev_last_counted_slot;
1615                                                    return Err(err);
1616                                                }
1617                                    }
1618
1619                                    if slot > last_counted_slot {
1620                                        last_counted_slot = slot;
1621                                    }
1622                                }
1623                                Subset(_subset) => (),
1624                                Epoch(_epoch) => (),
1625                                Rewards(rewards) => {
1626                                    if reward_enabled || block_enabled {
1627                                        let reassembled = nodes
1628                                            .reassemble_dataframes(&rewards.data)
1629                                            .map_err(|err| {
1630                                                (
1631                                                    FirehoseError::NodeDecodingError(item_index, err),
1632                                                    current_slot.unwrap_or(slot_range.start),
1633                                                )
1634                                            })?;
1635                                        if reassembled.is_empty() {
1636                                            this_block_rewards = DecodedRewards::empty();
1637                                            if reward_enabled
1638                                                && let Some(on_reward_cb) = on_reward.as_ref()
1639                                            {
1640                                                on_reward_cb(
1641                                                    thread_index,
1642                                                    RewardsData {
1643                                                        slot: block.slot,
1644                                                        rewards: Vec::new(),
1645                                                    },
1646                                                )
1647                                                .await
1648                                                .map_err(|e| {
1649                                                    (
1650                                                        FirehoseError::RewardHandlerError(e),
1651                                                        error_slot,
1652                                                    )
1653                                                })?;
1654                                            }
1655                                            continue;
1656                                        }
1657
1658                                        let decoded_rewards =
1659                                            decode_rewards_from_frame(block.slot, reassembled)
1660                                                .map_err(|err| {
1661                                                    (
1662                                                        FirehoseError::NodeDecodingError(
1663                                                            item_index,
1664                                                            err,
1665                                                        ),
1666                                                        error_slot,
1667                                                    )
1668                                                })?;
1669                                        if reward_enabled
1670                                            && let Some(on_reward_cb) = on_reward.as_ref()
1671                                        {
1672                                            on_reward_cb(
1673                                                thread_index,
1674                                                RewardsData {
1675                                                    slot: block.slot,
1676                                                    rewards: decoded_rewards.keyed_rewards.clone(),
1677                                                },
1678                                            )
1679                                            .await
1680                                            .map_err(|e| {
1681                                                (
1682                                                    FirehoseError::RewardHandlerError(e),
1683                                                    error_slot,
1684                                                )
1685                                            })?;
1686                                        }
1687                                        this_block_rewards = decoded_rewards;
1688                                        if let Some(ref mut stats) = thread_stats {
1689                                            stats.rewards_processed +=
1690                                                this_block_rewards.keyed_rewards.len() as u64;
1691                                        }
1692                                    }
1693                                }
1694                                DataFrame(_data_frame) => (),
1695                            }
1696                        }
1697                        if block.slot == slot_range.end - 1 {
1698                            let finish_time = std::time::Instant::now();
1699                            let elapsed = finish_time.duration_since(start_time);
1700                            log::info!(target: &log_target, "processed slot {}", block.slot);
1701                            let elapsed_pretty = human_readable_duration(elapsed);
1702                            log::info!(
1703                                target: &log_target,
1704                                "processed {} slots across {} epochs in {}.",
1705                                slot_range.end - slot_range.start,
1706                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1707                                elapsed_pretty
1708                            );
1709                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1710                            // On completion, report threads with non-zero error counts for
1711                            // visibility.
1712                            let summary: String = error_counts
1713                                .iter()
1714                                .enumerate()
1715                                .filter_map(|(i, c)| {
1716                                    let v = c.load(Ordering::Relaxed);
1717                                    if v > 0 {
1718                                        Some(format!("{:03}({})", i, v))
1719                                    } else {
1720                                        None
1721                                    }
1722                                })
1723                                .collect::<Vec<_>>()
1724                                .join(", ");
1725                            if !summary.is_empty() {
1726                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1727                            }
1728                            return Ok(());
1729                        }
1730                    }
1731                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1732                        && last_counted_slot < expected_last_slot
1733                    {
1734                        // Do not synthesize skipped slots during final flush; another thread may
1735                        // cover the remaining range (especially across epoch boundaries).
1736                    }
1737                    if let Some(ref mut stats) = thread_stats {
1738                        stats.finish_time = Some(std::time::Instant::now());
1739                        maybe_emit_stats(
1740                            stats_tracking.as_ref(),
1741                            thread_index,
1742                            stats,
1743                            &overall_slots_processed,
1744                            &overall_blocks_processed,
1745                            &overall_transactions_processed,
1746                            &overall_entries_processed,
1747                            &transactions_since_stats,
1748                            &blocks_since_stats,
1749                            &slots_since_stats,
1750                            &last_pulse,
1751                            start_time,
1752                        )
1753                        .await?;
1754                    }
1755                    if block_enabled {
1756                        pending_skipped_slots.remove(&thread_index);
1757                    }
1758                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1759                    }
1760                    Ok(())
1761            }
1762            .await
1763            {
1764                if is_shutdown_error(&err) {
1765                    log::info!(
1766                        target: &log_target,
1767                        "shutdown requested; terminating firehose thread {}",
1768                        thread_index
1769                    );
1770                    break;
1771                }
1772                let epoch = slot_to_epoch(slot);
1773                let item_index = match &err {
1774                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1775                    _ => 0,
1776                };
1777                let error_message = err.to_string();
1778                log::error!(
1779                    target: &log_target,
1780                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1781                    slot,
1782                    epoch
1783                );
1784                log::error!(target: &log_target, "{}", error_message);
1785                if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1786                    || error_message.contains("Unknown CID version")
1787                {
1788                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index
1789                    // (or a bad seek offset that landed mid-stream).
1790                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1791                }
1792                if let Some(on_error_cb) = on_error.clone() {
1793                    let context = FirehoseErrorContext {
1794                        thread_id: thread_index,
1795                        slot,
1796                        epoch,
1797                        error_message: error_message.clone(),
1798                    };
1799                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1800                        log::error!(
1801                            target: &log_target,
1802                            "on_error handler failed: {}",
1803                            handler_err
1804                        );
1805                    }
1806                }
1807                // Increment this thread's error counter
1808                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1809                log::warn!(
1810                    target: &log_target,
1811                    "restarting from slot {} at index {}",
1812                    slot,
1813                    item_index,
1814                );
1815                // Update slot range to resume from the failed slot, not the original start.
1816                // Reset local tracking so we don't treat the resumed slot range as already counted.
1817                // If we've already counted this slot, resume from the next one to avoid duplicates.
1818                if slot <= last_counted_slot {
1819                    slot_range.start = last_counted_slot.saturating_add(1);
1820                } else {
1821                    slot_range.start = slot;
1822                }
1823                // Reset pulse timer to exclude downtime from next rate calc.
1824                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1825                if tracking_enabled
1826                    && let Some(ref mut stats_ref) = thread_stats {
1827                        stats_ref.slot_range.start = slot_range.start;
1828                        stats_ref.slot_range.end = slot_range.end;
1829                        // initial_slot_range remains unchanged for progress reporting.
1830                    }
1831                if block_enabled {
1832                    pending_skipped_slots.remove(&thread_index);
1833                }
1834                // `skip_until_index` is unsafe across retries because `item_index`
1835                // is reset to 0 each epoch restart. Keeping it can skip large portions
1836                // of the stream and silently drop slots.
1837                skip_until_index = None;
1838                last_emitted_slot_global = last_emitted_slot;
1839            }
1840        });
1841        handles.push(handle);
1842    }
1843
1844    // Wait for all threads to complete
1845    for handle in handles {
1846        handle.await.unwrap();
1847    }
1848    if stats_tracking.is_some() {
1849        let elapsed = firehose_start.elapsed();
1850        let elapsed_secs = elapsed.as_secs_f64();
1851        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1852        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1853        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1854        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1855        let total_errors: u64 = error_counts
1856            .iter()
1857            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1858            .sum();
1859        let overall_tps = if elapsed_secs > 0.0 {
1860            total_transactions as f64 / elapsed_secs
1861        } else {
1862            0.0
1863        };
1864        log::info!(
1865            target: LOG_MODULE,
1866            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1867            elapsed_secs,
1868            total_slots,
1869            total_blocks,
1870            total_leader_skipped,
1871            total_transactions,
1872            overall_tps,
1873            total_errors
1874        );
1875    }
1876    if shutdown_flag.load(Ordering::SeqCst) {
1877        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1878    } else {
1879        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1880    }
1881    Ok(())
1882}
1883
1884#[allow(clippy::result_large_err)]
1885/// Builds a Geyser-backed firehose and returns a slot notification stream.
1886///
1887/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1888/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1889/// half-open interval `[start, end)`, and the thread will restart from the last processed
1890/// slot on recoverable errors to maintain coverage.
1891pub fn firehose_geyser(
1892    rt: Arc<tokio::runtime::Runtime>,
1893    slot_range: Range<u64>,
1894    geyser_config_files: Option<&[PathBuf]>,
1895    index_base_url: &Url,
1896    client: &Client,
1897    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1898    threads: u64,
1899) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1900    if threads == 0 {
1901        return Err((
1902            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1903            slot_range.start,
1904        ));
1905    }
1906    log::info!(target: LOG_MODULE, "starting firehose...");
1907    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1908    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1909    let mut entry_notifier_maybe = None;
1910    let mut block_meta_notifier_maybe = None;
1911    let mut transaction_notifier_maybe = None;
1912    if let Some(geyser_config_files) = geyser_config_files {
1913        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1914
1915        let service =
1916            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1917                confirmed_bank_receiver.clone(),
1918                true,
1919                geyser_config_files,
1920            )
1921            .map_err(|e| (e.into(), slot_range.start))?;
1922
1923        transaction_notifier_maybe = Some(
1924            service
1925                .get_transaction_notifier()
1926                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1927                .map_err(|e| (e, slot_range.start))?,
1928        );
1929
1930        entry_notifier_maybe = service.get_entry_notifier();
1931        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1932
1933        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1934    }
1935
1936    if entry_notifier_maybe.is_some() {
1937        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1938    } else {
1939        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1940    }
1941    log::info!(target: LOG_MODULE, "running on_load...");
1942    rt.spawn(on_load);
1943
1944    let slot_range = Arc::new(slot_range);
1945    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1946    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1947    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1948    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1949
1950    // divide slot_range into n subranges
1951    let subranges = generate_subranges(&slot_range, threads);
1952    if threads > 1 {
1953        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1954    }
1955
1956    let mut handles = Vec::new();
1957    // Shared per-thread error counters
1958    let error_counts: Arc<Vec<AtomicU32>> =
1959        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1960
1961    for (i, slot_range) in subranges.into_iter().enumerate() {
1962        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1963        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1964        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1965        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1966        let client = client.clone();
1967        let error_counts = error_counts.clone();
1968
1969        let rt_clone = rt.clone();
1970
1971        let handle = std::thread::spawn(move || {
1972            rt_clone.block_on(async {
1973                firehose_geyser_thread(
1974                    slot_range,
1975                    transaction_notifier_maybe,
1976                    entry_notifier_maybe,
1977                    block_meta_notifier_maybe,
1978                    confirmed_bank_sender,
1979                    &client,
1980                    if threads > 1 { Some(i) } else { None },
1981                    error_counts,
1982                )
1983                .await
1984                .unwrap();
1985            });
1986        });
1987        handles.push(handle);
1988    }
1989
1990    // Wait for all threads to complete
1991    for handle in handles {
1992        handle.join().unwrap();
1993    }
1994    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1995    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1996        block_meta_notifier.notify_block_metadata(
1997            u64::MAX,
1998            "unload",
1999            u64::MAX,
2000            "unload",
2001            &KeyedRewardsAndNumPartitions {
2002                keyed_rewards: vec![],
2003                num_partitions: None,
2004            },
2005            None,
2006            None,
2007            0,
2008            0,
2009        );
2010    }
2011    Ok(confirmed_bank_receiver)
2012}
2013
2014#[allow(clippy::too_many_arguments)]
2015#[allow(clippy::result_large_err)]
2016async fn firehose_geyser_thread(
2017    mut slot_range: Range<u64>,
2018    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2019    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2020    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2021    confirmed_bank_sender: Sender<SlotNotification>,
2022    client: &Client,
2023    thread_index: Option<usize>,
2024    error_counts: Arc<Vec<AtomicU32>>,
2025) -> Result<(), (FirehoseError, u64)> {
2026    let start_time = std::time::Instant::now();
2027    let log_target = if let Some(thread_index) = thread_index {
2028        format!("{}::T{:03}", LOG_MODULE, thread_index)
2029    } else {
2030        LOG_MODULE.to_string()
2031    };
2032    let initial_slot_range = slot_range.clone();
2033    let mut skip_until_index = None;
2034    let mut last_counted_slot = slot_range.start.saturating_sub(1);
2035    // let mut triggered = false;
2036    while let Err((err, slot)) = async {
2037            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2038            log::info!(
2039                target: &log_target,
2040                "slot range: {} (epoch {}) ... {} (epoch {})",
2041                slot_range.start,
2042                slot_to_epoch(slot_range.start),
2043                slot_range.end,
2044                slot_to_epoch(slot_range.end)
2045            );
2046
2047            log::info!(target: &log_target, "🚒 starting firehose...");
2048
2049            // for each epoch
2050            let mut current_slot: Option<u64> = None;
2051            for epoch_num in epoch_range.clone() {
2052                log::info!(target: &log_target, "entering epoch {}", epoch_num);
2053                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2054                    Ok(stream) => stream,
2055                    Err(_) => {
2056                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2057                    }
2058                };
2059                let mut reader = NodeReader::new(stream);
2060
2061                let header_fut = reader.read_raw_header();
2062                let header = match timeout(OP_TIMEOUT, header_fut).await {
2063                    Ok(res) => res
2064                        .map_err(FirehoseError::ReadHeader)
2065                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2066                    Err(_) => {
2067                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2068                    }
2069                };
2070                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2071
2072                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2073                let local_start = std::cmp::max(slot_range.start, epoch_start);
2074                let local_end_inclusive =
2075                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2076                if local_start > local_end_inclusive {
2077                    log::debug!(
2078                        target: &log_target,
2079                        "epoch {} has no overlap with thread range ({}..{}), skipping",
2080                        epoch_num,
2081                        slot_range.start,
2082                        slot_range.end
2083                    );
2084                    continue;
2085                }
2086
2087                let mut todo_previous_blockhash = Hash::default();
2088                let mut todo_latest_entry_blockhash = Hash::default();
2089                // Reset counters to align to the local epoch slice; prevents boundary slots
2090                // from being treated as already-counted after a restart.
2091                last_counted_slot = local_start.saturating_sub(1);
2092                current_slot = None;
2093
2094                if local_start > epoch_start {
2095                    // Seek to the nearest previous indexed slot so the reader captures the full
2096                    // node set (transactions, entries, rewards) for the target block.
2097                    let seek_slot = match timeout(
2098                        OP_TIMEOUT,
2099                        find_previous_indexed_slot(local_start, epoch_start, &log_target),
2100                    )
2101                    .await
2102                    {
2103                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2104                        Err(_) => {
2105                            return Err((
2106                                FirehoseError::OperationTimeout(
2107                                    "seek_to_previous_indexed_slot",
2108                                ),
2109                                current_slot.unwrap_or(slot_range.start),
2110                            ));
2111                        }
2112                    };
2113                    if let Some(seek_slot) = seek_slot {
2114                        let seek_fut = reader.seek_to_slot(seek_slot);
2115                        match timeout(OP_TIMEOUT, seek_fut).await {
2116                            Ok(res) => {
2117                                res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2118                            }
2119                            Err(_) => {
2120                                return Err((
2121                                    FirehoseError::OperationTimeout("seek_to_slot"),
2122                                    current_slot.unwrap_or(slot_range.start),
2123                                ));
2124                            }
2125                        }
2126                    }
2127                }
2128
2129                // for each item in each block
2130                let mut item_index = 0;
2131                let mut displayed_skip_message = false;
2132                loop {
2133                    let read_fut = reader.read_until_block();
2134                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2135                        Ok(result) => result
2136                            .map_err(FirehoseError::ReadUntilBlockError)
2137                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2138                        Err(_) => {
2139                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2140                            let restart_slot =
2141                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2142                            return Err((
2143                                FirehoseError::OperationTimeout("read_until_block"),
2144                                restart_slot,
2145                            ));
2146                        }
2147                    };
2148                    if nodes.is_empty() {
2149                        log::info!(
2150                            target: &log_target,
2151                            "reached end of epoch {}",
2152                            epoch_num
2153                        );
2154                        break;
2155                    }
2156                    // ignore epoch and subset nodes at end of car file loop { if
2157                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
2158                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
2159                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
2160                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
2161                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
2162                    //         log::info!(target: &log_target, "reached end of epoch {}",
2163                    //             epoch_num); break; }
2164                    if let Some(last_node) = nodes.0.last()
2165                        && !last_node.get_node().is_block() {
2166                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2167                            break;
2168                        }
2169                    let block = nodes
2170                        .get_block()
2171                        .map_err(FirehoseError::GetBlockError)
2172                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2173                    log::debug!(
2174                        target: &log_target,
2175                        "read {} items from epoch {}, now at slot {}",
2176                        item_index,
2177                        epoch_num,
2178                        block.slot
2179                    );
2180                    let slot = block.slot;
2181                    if slot > local_end_inclusive {
2182                        log::debug!(
2183                            target: &log_target,
2184                            "reached end of local slice at slot {} (epoch {}), stopping",
2185                            slot,
2186                            epoch_num
2187                        );
2188                        break;
2189                    }
2190                    if slot >= slot_range.end {
2191                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2192                        // Return early to terminate the firehose thread cleanly. We use >=
2193                        // because slot_range is half-open [start, end), so any slot equal to
2194                        // end is out-of-range and must not be processed.
2195                        return Ok(());
2196                    }
2197                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2198                    if slot < local_start {
2199                        if slot.saturating_add(1) == local_start {
2200                            log::debug!(
2201                                target: &log_target,
2202                                "priming reader with preceding slot {}, skipping",
2203                                slot
2204                            );
2205                        } else {
2206                            log::warn!(
2207                                target: &log_target,
2208                                "encountered slot {} before start of range {}, skipping",
2209                                slot,
2210                                local_start
2211                            );
2212                        }
2213                        continue;
2214                    }
2215                    current_slot = Some(slot);
2216                    let mut entry_index: usize = 0;
2217                    let mut this_block_executed_transaction_count: u64 = 0;
2218                    let mut this_block_entry_count: u64 = 0;
2219                    let mut this_block_rewards = DecodedRewards::empty();
2220
2221                    if slot <= last_counted_slot {
2222                        log::debug!(
2223                            target: &log_target,
2224                            "duplicate block {}, already counted (last_counted={})",
2225                            slot,
2226                            last_counted_slot,
2227                        );
2228                        continue;
2229                    }
2230
2231                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
2232                        item_index += 1;
2233                        // if item_index == 100000 && !triggered { log::info!("simulating
2234                        //     error"); triggered = true; return
2235                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
2236                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
2237                        //         "simulated error", )), ))); }
2238                        if let Some(skip) = skip_until_index {
2239                            if item_index < skip {
2240                                if !displayed_skip_message {
2241                                    log::info!(
2242                                        target: &log_target,
2243                                        "skipping until index {} (at {})",
2244                                        skip,
2245                                        item_index
2246                                    );
2247                                    displayed_skip_message = true;
2248                                }
2249                                return Ok(());
2250                            } else {
2251                                log::info!(
2252                                    target: &log_target,
2253                                    "reached target index {}, resuming...",
2254                                    skip
2255                                );
2256                                skip_until_index = None;
2257                            }
2258                        }
2259                        let node = node_with_cid.get_node();
2260
2261                        use crate::node::Node::*;
2262                        match node {
2263                            Transaction(tx) => {
2264                                let versioned_tx = tx.as_parsed()?;
2265                                let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2266
2267                                let as_native_metadata = decode_transaction_status_meta_from_frame(
2268                                    block.slot,
2269                                    reassembled_metadata,
2270                                )?;
2271
2272                                let message_hash = {
2273                                    #[cfg(feature = "verify-transaction-signatures")]
2274                                    {
2275                                        versioned_tx.verify_and_hash_message()?
2276                                    }
2277                                    #[cfg(not(feature = "verify-transaction-signatures"))]
2278                                    {
2279                                        // Signature verification is optional because it is
2280                                        // extremely expensive at replay scale.
2281                                        versioned_tx.message.hash()
2282                                    }
2283                                };
2284                                let signature = versioned_tx
2285                                    .signatures
2286                                    .first()
2287                                    .ok_or_else(|| {
2288                                        Box::new(std::io::Error::new(
2289                                            std::io::ErrorKind::InvalidData,
2290                                            "transaction missing signature",
2291                                        )) as SharedError
2292                                    })?;
2293                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2294
2295                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2296                                    transaction_notifier.notify_transaction(
2297                                        block.slot,
2298                                        tx.index.unwrap() as usize,
2299                                        signature,
2300                                        &message_hash,
2301                                        is_vote,
2302                                        &as_native_metadata,
2303                                        &versioned_tx,
2304                                    );
2305                                }
2306
2307                            }
2308                            Entry(entry) => {
2309                                let entry_hash = Hash::from(entry.hash.to_bytes());
2310                                let entry_transaction_count = entry.transactions.len();
2311                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2312                                let starting_transaction_index =
2313                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2314                                        Box::new(std::io::Error::other(
2315                                            "transaction index exceeds usize range",
2316                                        )) as SharedError
2317                                    })?;
2318                                todo_latest_entry_blockhash = entry_hash;
2319                                this_block_executed_transaction_count += entry_transaction_count_u64;
2320                                this_block_entry_count += 1;
2321                                if entry_notifier_maybe.is_none() {
2322                                    return Ok(());
2323                                }
2324                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2325                                let entry_summary = solana_entry::entry::EntrySummary {
2326                                    num_hashes: entry.num_hashes,
2327                                    hash: Hash::from(entry.hash.to_bytes()),
2328                                    num_transactions: entry_transaction_count_u64,
2329                                };
2330                                entry_notifier.notify_entry(
2331                                    block.slot,
2332                                    entry_index,
2333                                    &entry_summary,
2334                                    starting_transaction_index,
2335                                );
2336                                entry_index += 1;
2337                            }
2338                            Block(block) => {
2339                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2340                                confirmed_bank_sender.send(notification).unwrap();
2341
2342                                if block_meta_notifier_maybe.is_none() {
2343                                    last_counted_slot = block.slot;
2344                                    return Ok(());
2345                                }
2346                                let DecodedRewards {
2347                                    keyed_rewards,
2348                                    num_partitions,
2349                                } = std::mem::take(&mut this_block_rewards);
2350                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2351                                block_meta_notifier.notify_block_metadata(
2352                                    block.meta.parent_slot,
2353                                    todo_previous_blockhash.to_string().as_str(),
2354                                    block.slot,
2355                                    todo_latest_entry_blockhash.to_string().as_str(),
2356                                    &KeyedRewardsAndNumPartitions {
2357                                        keyed_rewards,
2358                                        num_partitions,
2359                                    },
2360                                    Some(block.meta.blocktime as i64),
2361                                    block.meta.block_height,
2362                                    this_block_executed_transaction_count,
2363                                    this_block_entry_count,
2364                                );
2365                                todo_previous_blockhash = todo_latest_entry_blockhash;
2366                                last_counted_slot = block.slot;
2367                                std::thread::yield_now();
2368                            }
2369                            Subset(_subset) => (),
2370                            Epoch(_epoch) => (),
2371                            Rewards(rewards) => {
2372                                let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2373                                if !reassembled.is_empty() {
2374                                    this_block_rewards = decode_rewards_from_frame(
2375                                        block.slot,
2376                                        reassembled,
2377                                    )?;
2378                                } else {
2379                                    this_block_rewards = DecodedRewards::empty();
2380                                }
2381                            }
2382                            DataFrame(_data_frame) => (),
2383                        }
2384                        Ok(())
2385                    })
2386                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2387                    if block.slot == slot_range.end - 1 {
2388                        let finish_time = std::time::Instant::now();
2389                        let elapsed = finish_time.duration_since(start_time);
2390                        log::info!(target: &log_target, "processed slot {}", block.slot);
2391                        let elapsed_pretty = human_readable_duration(elapsed);
2392                        log::info!(
2393                            target: &log_target,
2394                            "processed {} slots across {} epochs in {}.",
2395                            initial_slot_range.end - initial_slot_range.start,
2396                            slot_to_epoch(initial_slot_range.end)
2397                                + 1
2398                                - slot_to_epoch(initial_slot_range.start),
2399                            elapsed_pretty
2400                        );
2401                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2402                        // On completion, report threads with non-zero error counts for
2403                        // visibility.
2404                        let summary: String = error_counts
2405                            .iter()
2406                            .enumerate()
2407                            .filter_map(|(i, c)| {
2408                                let v = c.load(Ordering::Relaxed);
2409                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2410                            })
2411                            .collect::<Vec<_>>()
2412                            .join(", ");
2413                        if !summary.is_empty() {
2414                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2415                        }
2416                        return Ok(());
2417                    }
2418                }
2419            }
2420            Ok(())
2421}
2422.await
2423{
2424        if is_shutdown_error(&err) {
2425            log::info!(
2426                target: &log_target,
2427                "shutdown requested; terminating firehose thread {:?}",
2428                thread_index
2429            );
2430            return Ok(());
2431        }
2432        log::error!(
2433            target: &log_target,
2434            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2435            slot,
2436            slot_to_epoch(slot)
2437            );
2438            log::error!(target: &log_target, "{}", err);
2439            let error_message = err.to_string();
2440            if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2441                || error_message.contains("Unknown CID version")
2442            {
2443                // Clear cached index data for this epoch to avoid retrying with a bad/partial index
2444                // (or a bad seek offset that landed mid-stream).
2445                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2446            }
2447            let item_index = match err {
2448                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2449                _ => 0,
2450            };
2451            // Increment this thread's error counter
2452            let idx = thread_index.unwrap_or(0);
2453            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2454            log::warn!(
2455                target: &log_target,
2456                "restarting from slot {} at index {}",
2457                slot,
2458                item_index,
2459            );
2460            // Update slot range to resume from the failed slot, not the original start.
2461            // If the failing slot was already fully processed, resume from the next slot.
2462            if slot <= last_counted_slot {
2463                slot_range.start = last_counted_slot.saturating_add(1);
2464            } else {
2465                slot_range.start = slot;
2466            }
2467            // `skip_until_index` is unsafe across retries because `item_index`
2468            // is reset to 0 each epoch restart. Keeping it can skip large portions
2469            // of the stream and silently drop slots.
2470            skip_until_index = None;
2471}
2472    Ok(())
2473}
2474
2475#[inline]
2476fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2477    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2478        return false;
2479    }
2480
2481    if !matches!(
2482        versioned_tx.version(),
2483        solana_transaction::versioned::TransactionVersion::Legacy(_)
2484    ) {
2485        return false;
2486    }
2487
2488    let instructions = versioned_tx.message.instructions();
2489    if instructions.len() != 1 {
2490        return false;
2491    }
2492
2493    let program_index = instructions[0].program_id_index as usize;
2494    versioned_tx
2495        .message
2496        .static_account_keys()
2497        .get(program_index)
2498        .map(|program_id| program_id == &vote_program_id())
2499        .unwrap_or(false)
2500}
2501
2502#[inline(always)]
2503fn convert_proto_rewards(
2504    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2505) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2506    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2507    for proto_reward in proto_rewards.rewards.iter() {
2508        let reward = RewardInfo {
2509            reward_type: match proto_reward.reward_type - 1 {
2510                0 => RewardType::Fee,
2511                1 => RewardType::Rent,
2512                2 => RewardType::Staking,
2513                3 => RewardType::Voting,
2514                typ => {
2515                    return Err(Box::new(std::io::Error::other(format!(
2516                        "unsupported reward type {}",
2517                        typ
2518                    ))));
2519                }
2520            },
2521            lamports: proto_reward.lamports,
2522            post_balance: proto_reward.post_balance,
2523            commission: proto_reward.commission.parse::<u8>().ok(),
2524        };
2525        let pubkey = proto_reward
2526            .pubkey
2527            .parse::<Address>()
2528            .map_err(|err| Box::new(err) as SharedError)?;
2529        keyed_rewards.push((pubkey, reward));
2530    }
2531    Ok(keyed_rewards)
2532}
2533
2534#[inline]
2535/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2536pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2537    let total = slot_range.end - slot_range.start;
2538    let slots_per_thread = total / threads;
2539    let remainder = total % threads;
2540
2541    let ranges: Vec<Range<u64>> = (0..threads)
2542        .map(|i| {
2543            // Distribute remainder slots to the first `remainder` threads
2544            let extra_slot = if i < remainder { 1 } else { 0 };
2545            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2546            let end = start + slots_per_thread + extra_slot;
2547            start..end
2548        })
2549        .collect();
2550
2551    // Verify that ranges cover all slots exactly
2552    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2553    assert_eq!(
2554        total_covered, total,
2555        "Range generation failed: {} threads should cover {} slots but only cover {}",
2556        threads, total, total_covered
2557    );
2558
2559    // Verify no gaps between ranges
2560    for i in 1..ranges.len() {
2561        assert_eq!(
2562            ranges[i - 1].end,
2563            ranges[i].start,
2564            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2565            i - 1,
2566            ranges[i - 1].end,
2567            i,
2568            ranges[i].start
2569        );
2570    }
2571
2572    log::info!(
2573        target: LOG_MODULE,
2574        "Generated {} thread ranges covering {} slots total",
2575        threads,
2576        total_covered
2577    );
2578    ranges
2579}
2580
2581fn human_readable_duration(duration: std::time::Duration) -> String {
2582    if duration.is_zero() {
2583        return "0s".into();
2584    }
2585    let total_secs = duration.as_secs();
2586    if total_secs < 60 {
2587        let secs_f = duration.as_secs_f64();
2588        if total_secs == 0 {
2589            format!("{:.2}s", secs_f)
2590        } else if duration.subsec_millis() == 0 {
2591            format!("{}s", total_secs)
2592        } else {
2593            format!("{:.2}s", secs_f)
2594        }
2595    } else {
2596        let mut secs = total_secs;
2597        let days = secs / 86_400;
2598        secs %= 86_400;
2599        let hours = secs / 3_600;
2600        secs %= 3_600;
2601        let minutes = secs / 60;
2602        secs %= 60;
2603        if days > 0 {
2604            if hours > 0 {
2605                format!("{days}d{hours}h")
2606            } else {
2607                format!("{days}d")
2608            }
2609        } else if hours > 0 {
2610            if minutes > 0 {
2611                format!("{hours}h{minutes}m")
2612            } else {
2613                format!("{hours}h")
2614            }
2615        } else if minutes > 0 {
2616            if secs > 0 {
2617                format!("{minutes}m{secs}s")
2618            } else {
2619                format!("{minutes}m")
2620            }
2621        } else {
2622            format!("{secs}s")
2623        }
2624    }
2625}
2626
2627#[cfg(test)]
2628fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
2629    Box::pin(async move {
2630        let elapsed = stats.start_time.elapsed();
2631        let elapsed_secs = elapsed.as_secs_f64();
2632        let tps = if elapsed_secs > 0.0 {
2633            stats.transactions_processed as f64 / elapsed_secs
2634        } else {
2635            0.0
2636        };
2637        log::info!(
2638            target: LOG_MODULE,
2639            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
2640            stats.thread_stats.current_slot,
2641            stats.slots_processed,
2642            stats.blocks_processed,
2643            stats.transactions_processed,
2644            stats.entries_processed,
2645            stats.rewards_processed,
2646            elapsed_secs,
2647            tps
2648        );
2649        Ok(())
2650    })
2651}
2652
2653#[cfg(test)]
2654use futures_util::FutureExt;
2655#[cfg(test)]
2656use serial_test::serial;
2657#[cfg(test)]
2658use std::sync::{Mutex, OnceLock};
2659
2660#[cfg(test)]
2661async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
2662    use std::sync::Arc;
2663    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2664
2665    let found = Arc::new(AtomicBool::new(false));
2666    let observed_total = Arc::new(AtomicU64::new(0));
2667    let observed_non_vote = Arc::new(AtomicU64::new(0));
2668
2669    let found_block = found.clone();
2670    let observed_total_block = observed_total.clone();
2671    let target_slot_block = slot;
2672    let target_slot_tx = slot;
2673    let observed_non_vote_tx = observed_non_vote.clone();
2674
2675    firehose(
2676        1,
2677        false,
2678        None,
2679        target_slot_block..(target_slot_block + 1),
2680        Some(move |_thread_id: usize, block: BlockData| {
2681            let found_block = found_block.clone();
2682            let observed_total_block = observed_total_block.clone();
2683            async move {
2684                if block.slot() == target_slot_block {
2685                    assert!(
2686                        !block.was_skipped(),
2687                        "slot {target_slot_block} was marked leader skipped",
2688                    );
2689                    if let BlockData::Block {
2690                        executed_transaction_count,
2691                        ..
2692                    } = block
2693                    {
2694                        found_block.store(true, Ordering::Relaxed);
2695                        observed_total_block.store(executed_transaction_count, Ordering::Relaxed);
2696                    }
2697                }
2698                Ok(())
2699            }
2700            .boxed()
2701        }),
2702        Some(move |_thread_id: usize, transaction: TransactionData| {
2703            let observed_non_vote_tx = observed_non_vote_tx.clone();
2704            async move {
2705                if transaction.slot == target_slot_tx && !transaction.is_vote {
2706                    observed_non_vote_tx.fetch_add(1, Ordering::Relaxed);
2707                }
2708                Ok(())
2709            }
2710            .boxed()
2711        }),
2712        None::<OnEntryFn>,
2713        None::<OnRewardFn>,
2714        None::<OnErrorFn>,
2715        None::<OnStatsTrackingFn>,
2716        None,
2717    )
2718    .await
2719    .unwrap();
2720
2721    assert!(
2722        found.load(Ordering::Relaxed),
2723        "target slot {slot} was not processed"
2724    );
2725    let observed_total = observed_total.load(Ordering::Relaxed);
2726    let observed_non_vote = observed_non_vote.load(Ordering::Relaxed);
2727    assert!(
2728        observed_total > 0,
2729        "slot {slot} executed transaction count was zero"
2730    );
2731    assert!(
2732        observed_total >= min_executed,
2733        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
2734    );
2735    log::info!(
2736        target: LOG_MODULE,
2737        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
2738        observed_total,
2739        observed_non_vote
2740    );
2741}
2742
2743#[cfg(test)]
2744async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
2745    use crate::index::slot_to_offset;
2746    use crate::node::Node;
2747
2748    let epoch = slot_to_epoch(slot);
2749    let client = crate::network::create_http_client();
2750    let stream = fetch_epoch_stream(epoch, &client).await;
2751    let mut reader = NodeReader::new(stream);
2752    reader
2753        .seek_to_slot(slot)
2754        .await
2755        .map_err(|err| Box::new(err) as SharedError)?;
2756
2757    let nodes = reader.read_until_block().await?;
2758    let mut transactions = 0u64;
2759    let mut entries = 0u64;
2760    let mut entry_tx_total = 0u64;
2761    let mut dataframes = 0u64;
2762    let mut rewards = 0u64;
2763    let mut subsets = 0u64;
2764    let mut epochs = 0u64;
2765    let mut block_slot = None;
2766    let mut block_entries = None;
2767    let first_kind = nodes
2768        .0
2769        .first()
2770        .map(|node| node.get_node())
2771        .map(|node| match node {
2772            Node::Transaction(_) => "transaction",
2773            Node::Entry(_) => "entry",
2774            Node::Block(_) => "block",
2775            Node::Subset(_) => "subset",
2776            Node::Epoch(_) => "epoch",
2777            Node::Rewards(_) => "rewards",
2778            Node::DataFrame(_) => "dataframe",
2779        })
2780        .unwrap_or("none");
2781
2782    for node in &nodes.0 {
2783        match node.get_node() {
2784            Node::Transaction(_) => {
2785                transactions += 1;
2786            }
2787            Node::Entry(entry) => {
2788                entries += 1;
2789                entry_tx_total += entry.transactions.len() as u64;
2790            }
2791            Node::Block(block) => {
2792                block_slot = Some(block.slot);
2793                block_entries = Some(block.entries.len());
2794            }
2795            Node::Subset(_) => {
2796                subsets += 1;
2797            }
2798            Node::Epoch(_) => {
2799                epochs += 1;
2800            }
2801            Node::Rewards(_) => {
2802                rewards += 1;
2803            }
2804            Node::DataFrame(_) => {
2805                dataframes += 1;
2806            }
2807        }
2808    }
2809
2810    log::info!(
2811        target: LOG_MODULE,
2812        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
2813        nodes.len(),
2814        first_kind,
2815        transactions,
2816        entries,
2817        entry_tx_total,
2818        block_slot,
2819        block_entries,
2820        dataframes,
2821        rewards,
2822        subsets,
2823        epochs
2824    );
2825
2826    if slot > 0 {
2827        let mut found_previous = None;
2828        for delta in 1..=5 {
2829            let candidate = slot.saturating_sub(delta);
2830            match slot_to_offset(candidate).await {
2831                Ok(offset) => {
2832                    found_previous = Some((candidate, offset));
2833                    break;
2834                }
2835                Err(err) => {
2836                    log::info!(
2837                        target: LOG_MODULE,
2838                        "slot {slot} previous lookup {candidate} failed: {err}"
2839                    );
2840                }
2841            }
2842        }
2843        if let Some((candidate, offset)) = found_previous {
2844            log::info!(
2845                target: LOG_MODULE,
2846                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
2847            );
2848        } else {
2849            log::info!(
2850                target: LOG_MODULE,
2851                "slot {slot} no previous offsets found within 5 slots"
2852            );
2853        }
2854    }
2855
2856    Ok(())
2857}
2858
2859#[tokio::test(flavor = "multi_thread")]
2860async fn test_firehose_epoch_800() {
2861    use dashmap::DashSet;
2862    use std::sync::atomic::{AtomicU64, Ordering};
2863    solana_logger::setup_with_default("info");
2864    const THREADS: usize = 4;
2865    const NUM_SLOTS_TO_COVER: u64 = 50;
2866    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2867    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2868    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2869    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2870    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2871    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2872    let stats_tracking = StatsTracking {
2873        on_stats: log_stats_handler,
2874        tracking_interval_slots: 10,
2875    };
2876
2877    for prev in PREV_BLOCK.iter() {
2878        prev.store(0, Ordering::Relaxed);
2879    }
2880    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2881    NUM_BLOCKS.store(0, Ordering::Relaxed);
2882    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2883    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2884    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2885
2886    firehose(
2887        THREADS.try_into().unwrap(),
2888        false,
2889        None,
2890        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2891        Some(|thread_id: usize, block: BlockData| {
2892            async move {
2893                let _prev =
2894                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2895                if block.was_skipped() {
2896                    log::info!(
2897                        target: LOG_MODULE,
2898                        "leader skipped block {} on thread {}",
2899                        block.slot(),
2900                        thread_id,
2901                    );
2902                } else {
2903                    /*log::info!(
2904                        target: LOG_MODULE,
2905                        "got block {} on thread {}",
2906                        block.slot(),
2907                        thread_id,
2908                    );*/
2909                }
2910
2911                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2912                if block.was_skipped() {
2913                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2914                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2915                } else {
2916                    if first_time {
2917                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2918                        if let BlockData::Block {
2919                            executed_transaction_count,
2920                            ..
2921                        } = &block
2922                        {
2923                            let executed = *executed_transaction_count;
2924                            let _ = MIN_TRANSACTIONS.fetch_update(
2925                                Ordering::Relaxed,
2926                                Ordering::Relaxed,
2927                                |current| {
2928                                    if executed < current {
2929                                        Some(executed)
2930                                    } else {
2931                                        None
2932                                    }
2933                                },
2934                            );
2935                        }
2936                    }
2937                }
2938                Ok(())
2939            }
2940            .boxed()
2941        }),
2942        None::<OnTxFn>,
2943        None::<OnEntryFn>,
2944        None::<OnRewardFn>,
2945        None::<OnErrorFn>,
2946        Some(stats_tracking),
2947        None,
2948    )
2949    .await
2950    .unwrap();
2951    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2952    assert_eq!(
2953        seen, NUM_SLOTS_TO_COVER,
2954        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2955    );
2956    let mut skipped: Vec<u64> = SEEN_SKIPPED
2957        .get_or_init(DashSet::new)
2958        .iter()
2959        .map(|v| *v)
2960        .collect();
2961    skipped.sort_unstable();
2962    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2963    const EXPECTED_SKIPPED: [u64; 6] = [
2964        345_600_004,
2965        345_600_005,
2966        345_600_008,
2967        345_600_009,
2968        345_600_010,
2969        345_600_011,
2970    ];
2971    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2972    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2973}
2974
2975#[tokio::test(flavor = "multi_thread")]
2976async fn test_firehose_target_slot_transactions() {
2977    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2978    solana_logger::setup_with_default("info");
2979    const TARGET_SLOT: u64 = 376_273_722;
2980    const SLOT_RADIUS: u64 = 50;
2981    const EXPECTED_TRANSACTIONS: u64 = 1414;
2982    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2983    static FOUND: AtomicBool = AtomicBool::new(false);
2984    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2985    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2986
2987    FOUND.store(false, Ordering::Relaxed);
2988    OBSERVED_TXS.store(0, Ordering::Relaxed);
2989    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2990
2991    firehose(
2992        4,
2993        false,
2994        None,
2995        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2996        Some(|_thread_id: usize, block: BlockData| {
2997            async move {
2998                if block.slot() == TARGET_SLOT {
2999                    assert!(
3000                        !block.was_skipped(),
3001                        "target slot {TARGET_SLOT} was marked leader skipped",
3002                    );
3003                    if let BlockData::Block {
3004                        executed_transaction_count,
3005                        ..
3006                    } = block
3007                    {
3008                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
3009                        FOUND.store(true, Ordering::Relaxed);
3010                        assert_eq!(
3011                            executed_transaction_count, EXPECTED_TRANSACTIONS,
3012                            "unexpected transaction count for slot {TARGET_SLOT}"
3013                        );
3014                        assert_eq!(
3015                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
3016                            EXPECTED_NON_VOTE_TRANSACTIONS,
3017                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
3018                        );
3019                    }
3020                }
3021                Ok(())
3022            }
3023            .boxed()
3024        }),
3025        Some(|_thread_id: usize, transaction: TransactionData| {
3026            async move {
3027                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
3028                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
3029                }
3030                Ok(())
3031            }
3032            .boxed()
3033        }),
3034        None::<OnEntryFn>,
3035        None::<OnRewardFn>,
3036        None::<OnErrorFn>,
3037        None::<OnStatsTrackingFn>,
3038        None,
3039    )
3040    .await
3041    .unwrap();
3042
3043    assert!(
3044        FOUND.load(Ordering::Relaxed),
3045        "target slot was not processed"
3046    );
3047    assert_eq!(
3048        OBSERVED_TXS.load(Ordering::Relaxed),
3049        EXPECTED_TRANSACTIONS,
3050        "recorded transaction count mismatch"
3051    );
3052}
3053
3054#[cfg(test)]
3055#[serial]
3056#[tokio::test(flavor = "multi_thread")]
3057async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
3058    use std::sync::{
3059        Arc, Mutex,
3060        atomic::{AtomicU64, Ordering},
3061    };
3062
3063    solana_logger::setup_with_default("info");
3064    const SLOT_COUNT: u64 = 100;
3065    const THREADS: u64 = 4;
3066    const TEST_BUFFER_WINDOW: &str = "4GiB";
3067
3068    let (epoch_900_start, _) = epoch_to_slot_range(900);
3069    let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);
3070
3071    let last_seen_tx_slot = Arc::new(Mutex::new(slot_range.start));
3072    let observed_txs = Arc::new(AtomicU64::new(0));
3073    let stats_tracking = StatsTracking {
3074        on_stats: log_stats_handler,
3075        tracking_interval_slots: 100,
3076    };
3077    let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
3078        .expect("valid test buffer window");
3079
3080    firehose(
3081        THREADS,
3082        true,
3083        Some(test_buffer_window_bytes),
3084        slot_range.clone(),
3085        None::<OnBlockFn>,
3086        Some({
3087            let last_seen_tx_slot = last_seen_tx_slot.clone();
3088            let observed_txs = observed_txs.clone();
3089            move |_thread_id: usize, transaction: TransactionData| {
3090                let last_seen_tx_slot = last_seen_tx_slot.clone();
3091                let observed_txs = observed_txs.clone();
3092                async move {
3093                    let mut previous = last_seen_tx_slot.lock().unwrap();
3094                    // Old Faithful does not include leader-skipped slots, so gaps are
3095                    // expected. We only enforce monotonic (non-decreasing) tx slot ordering.
3096                    assert!(
3097                        transaction.slot >= *previous,
3098                        "transaction slot regressed: prev={}, current={}",
3099                        *previous,
3100                        transaction.slot
3101                    );
3102                    *previous = transaction.slot;
3103                    observed_txs.fetch_add(1, Ordering::Relaxed);
3104                    Ok(())
3105                }
3106                .boxed()
3107            }
3108        }),
3109        None::<OnEntryFn>,
3110        None::<OnRewardFn>,
3111        None::<OnErrorFn>,
3112        Some(stats_tracking),
3113        None,
3114    )
3115    .await
3116    .unwrap();
3117
3118    assert!(
3119        observed_txs.load(Ordering::Relaxed) > 0,
3120        "expected to observe at least one transaction in slots [{}, {})",
3121        slot_range.start,
3122        slot_range.end
3123    );
3124}
3125
3126#[cfg(test)]
3127#[serial]
3128#[tokio::test(flavor = "multi_thread")]
3129async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
3130    solana_logger::setup_with_default("info");
3131    assert_slot_min_executed_transactions(311_173_980, 1_197 + 211).await;
3132}
3133
3134#[cfg(test)]
3135#[serial]
3136#[tokio::test(flavor = "multi_thread")]
3137async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
3138    solana_logger::setup_with_default("info");
3139    assert_slot_min_executed_transactions(311_225_232, 888 + 157).await;
3140}
3141
3142#[cfg(test)]
3143#[serial]
3144#[tokio::test(flavor = "multi_thread")]
3145async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
3146    solana_logger::setup_with_default("info");
3147    assert_slot_min_executed_transactions(311_175_860, 527 + 110).await;
3148}
3149
3150#[cfg(test)]
3151#[serial]
3152#[tokio::test(flavor = "multi_thread")]
3153async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
3154    solana_logger::setup_with_default("info");
3155    assert_slot_min_executed_transactions(311_134_608, 1_086 + 169).await;
3156}
3157
3158#[cfg(test)]
3159#[ignore]
3160#[serial]
3161#[tokio::test(flavor = "multi_thread")]
3162async fn debug_epoch_720_slot_311173980_node_summary() {
3163    solana_logger::setup_with_default("info");
3164    const SLOTS: &[u64] = &[
3165        311_173_980,
3166        311_225_232,
3167        311_175_860,
3168        311_134_608,
3169        376_273_722,
3170    ];
3171    for slot in SLOTS {
3172        log_slot_node_summary(*slot).await.expect("slot summary");
3173    }
3174}
3175
3176#[tokio::test(flavor = "multi_thread")]
3177async fn test_firehose_epoch_850_has_logs() {
3178    use std::sync::atomic::{AtomicU64, Ordering};
3179    solana_logger::setup_with_default("info");
3180    const START_SLOT: u64 = 367_200_075; // within epoch 850
3181    const SLOT_COUNT: u64 = 50;
3182    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3183
3184    TOTAL_TXS.store(0, Ordering::Relaxed);
3185
3186    firehose(
3187        4,
3188        false,
3189        None,
3190        START_SLOT..(START_SLOT + SLOT_COUNT),
3191        None::<OnBlockFn>,
3192        Some(|_thread_id: usize, transaction: TransactionData| {
3193            async move {
3194                TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3195                if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3196                    let has_logs = logs.iter().any(|msg| !msg.is_empty());
3197                    assert_eq!(has_logs, true);
3198                }
3199                Ok(())
3200            }
3201            .boxed()
3202        }),
3203        None::<OnEntryFn>,
3204        None::<OnRewardFn>,
3205        None::<OnErrorFn>,
3206        None::<OnStatsTrackingFn>,
3207        None,
3208    )
3209    .await
3210    .unwrap();
3211
3212    assert!(
3213        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3214        "no transactions observed in epoch 850 range"
3215    );
3216}
3217
3218#[tokio::test(flavor = "multi_thread")]
3219async fn test_firehose_epoch_850_votes_present() {
3220    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3221    solana_logger::setup_with_default("info");
3222    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
3223    const SLOT_RADIUS: u64 = 10;
3224    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3225    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3226    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3227
3228    SEEN_BLOCK.store(false, Ordering::Relaxed);
3229    VOTE_TXS.store(0, Ordering::Relaxed);
3230    TOTAL_TXS.store(0, Ordering::Relaxed);
3231
3232    firehose(
3233        2,
3234        false,
3235        None,
3236        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3237        Some(|_thread_id: usize, block: BlockData| {
3238            async move {
3239                if block.slot() == TARGET_SLOT {
3240                    assert!(
3241                        !block.was_skipped(),
3242                        "target slot {TARGET_SLOT} was marked leader skipped",
3243                    );
3244                    SEEN_BLOCK.store(true, Ordering::Relaxed);
3245                }
3246                Ok(())
3247            }
3248            .boxed()
3249        }),
3250        Some(|_thread_id: usize, transaction: TransactionData| {
3251            async move {
3252                if transaction.slot == TARGET_SLOT {
3253                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3254                    if transaction.is_vote {
3255                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3256                    }
3257                }
3258                Ok(())
3259            }
3260            .boxed()
3261        }),
3262        None::<OnEntryFn>,
3263        None::<OnRewardFn>,
3264        None::<OnErrorFn>,
3265        None::<OnStatsTrackingFn>,
3266        None,
3267    )
3268    .await
3269    .unwrap();
3270
3271    assert!(
3272        SEEN_BLOCK.load(Ordering::Relaxed),
3273        "target slot was not processed"
3274    );
3275    assert!(
3276        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3277        "no transactions counted in target slot"
3278    );
3279    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3280}
3281
3282#[cfg(test)]
3283#[serial]
3284#[tokio::test(flavor = "multi_thread")]
3285async fn test_firehose_restart_loses_coverage_without_reset() {
3286    use std::collections::HashMap;
3287    solana_logger::setup_with_default("info");
3288    const THREADS: usize = 1;
3289    const START_SLOT: u64 = 345_600_000;
3290    const NUM_SLOTS: u64 = 8;
3291
3292    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
3293    COVERAGE
3294        .get_or_init(|| Mutex::new(HashMap::new()))
3295        .lock()
3296        .unwrap()
3297        .clear();
3298    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
3299    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
3300    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
3301    SEEN_BLOCKS.store(0, Ordering::Relaxed);
3302
3303    firehose(
3304        THREADS.try_into().unwrap(),
3305        false,
3306        None,
3307        START_SLOT..(START_SLOT + NUM_SLOTS),
3308        Some(|_thread_id: usize, block: BlockData| {
3309            async move {
3310                // Force an error after at least one block has been seen so restart happens mid-range.
3311                if !block.was_skipped()
3312                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
3313                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
3314                {
3315                    return Err("synthetic handler failure to exercise restart".into());
3316                }
3317                let mut coverage = COVERAGE
3318                    .get_or_init(|| Mutex::new(HashMap::new()))
3319                    .lock()
3320                    .unwrap();
3321                *coverage.entry(block.slot()).or_insert(0) += 1;
3322                if !block.was_skipped() {
3323                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
3324                }
3325                Ok(())
3326            }
3327            .boxed()
3328        }),
3329        None::<OnTxFn>,
3330        None::<OnEntryFn>,
3331        None::<OnRewardFn>,
3332        None::<OnErrorFn>,
3333        None::<OnStatsTrackingFn>,
3334        None,
3335    )
3336    .await
3337    .unwrap();
3338
3339    let coverage = COVERAGE.get().unwrap().lock().unwrap();
3340    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
3341        assert!(
3342            coverage.contains_key(&slot),
3343            "missing coverage for slot {slot} after restart"
3344        );
3345    }
3346}
3347
3348#[cfg(test)]
3349#[serial]
3350#[tokio::test(flavor = "multi_thread")]
3351async fn test_firehose_gap_coverage_near_known_missing_range() {
3352    use std::collections::HashSet;
3353    solana_logger::setup_with_default("info");
3354    const GAP_START: u64 = 378864000;
3355    const START_SLOT: u64 = GAP_START - 1000;
3356    const END_SLOT: u64 = GAP_START + 1000;
3357    const THREADS: usize = 16;
3358
3359    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
3360    COVERAGE
3361        .get_or_init(|| Mutex::new(HashSet::new()))
3362        .lock()
3363        .unwrap()
3364        .clear();
3365
3366    firehose(
3367        THREADS.try_into().unwrap(),
3368        false,
3369        None,
3370        START_SLOT..(END_SLOT + 1),
3371        Some(|_thread_id: usize, block: BlockData| {
3372            async move {
3373                if block.was_skipped() {
3374                    return Ok(());
3375                }
3376                let slot = block.slot();
3377                COVERAGE
3378                    .get_or_init(|| Mutex::new(HashSet::new()))
3379                    .lock()
3380                    .unwrap()
3381                    .insert(slot);
3382                Ok(())
3383            }
3384            .boxed()
3385        }),
3386        None::<OnTxFn>,
3387        None::<OnEntryFn>,
3388        None::<OnRewardFn>,
3389        None::<OnErrorFn>,
3390        None::<OnStatsTrackingFn>,
3391        None,
3392    )
3393    .await
3394    .unwrap();
3395
3396    let mut coverage = COVERAGE
3397        .get_or_init(|| Mutex::new(HashSet::new()))
3398        .lock()
3399        .unwrap()
3400        .clone();
3401
3402    // ignore a known 4-slot leader skipped gap
3403    coverage.insert(378864396);
3404    coverage.insert(378864397);
3405    coverage.insert(378864398);
3406    coverage.insert(378864399);
3407
3408    let expected: Vec<u64> = (START_SLOT..=END_SLOT).collect();
3409    let missing: Vec<u64> = expected
3410        .iter()
3411        .copied()
3412        .filter(|slot| !coverage.contains(slot))
3413        .collect();
3414    assert!(
3415        missing.is_empty(),
3416        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
3417        missing.len(),
3418        &missing[..missing.len().min(10)]
3419    );
3420}