Skip to main content

jetstreamer_firehose/
firehose.rs

1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7    block_metadata_notifier_interface::BlockMetadataNotifier,
8    geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14    optimistically_confirmed_bank_tracker::SlotNotification,
15    transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21    fmt::Display,
22    future::Future,
23    io,
24    ops::Range,
25    path::PathBuf,
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29    },
30};
31use thiserror::Error;
32use tokio::{
33    sync::broadcast::{self, error::TryRecvError},
34    time::timeout,
35};
36
37use crate::{
38    LOG_MODULE, SharedError,
39    epochs::{
40        FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41        fetch_epoch_stream_with_options, slot_to_epoch,
42    },
43    index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44    node_reader::NodeReader,
45    utils,
46};
47
/// Upper bound on a single asynchronous firehose operation (fetching the epoch stream,
/// reading the header, seeking, reading the next block). Change it here to make stall
/// detection and restarts more or less aggressive.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Longer variant of [`OP_TIMEOUT`].
/// NOTE(review): the name suggests it is used for sequential runs; confirm at the call
/// sites, which are not visible here.
const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180);
/// Old Faithful data for epochs strictly below this value was bincode-encoded.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57    flag: &Arc<std::sync::atomic::AtomicBool>,
58    receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60    if let Some(rx) = receiver {
61        match rx.try_recv() {
62            Ok(_) | Err(TryRecvError::Lagged(_)) => {
63                flag.store(true, Ordering::SeqCst);
64            }
65            Err(TryRecvError::Closed) => {
66                flag.store(true, Ordering::SeqCst);
67            }
68            Err(TryRecvError::Empty) => {}
69        }
70    }
71    flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75    fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76        inner
77            .downcast_ref::<io::Error>()
78            .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79            .unwrap_or(false)
80    }
81
82    match err {
83        FirehoseError::BlockHandlerError(inner)
84        | FirehoseError::TransactionHandlerError(inner)
85        | FirehoseError::EntryHandlerError(inner)
86        | FirehoseError::RewardHandlerError(inner)
87        | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88        _ => false,
89    }
90}
91
92async fn find_previous_indexed_slot(
93    local_start: u64,
94    epoch_start: u64,
95    log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97    if local_start <= epoch_start {
98        return Ok(None);
99    }
100    let mut candidate = local_start.saturating_sub(1);
101    let mut skipped = 0u64;
102    loop {
103        match slot_to_offset(candidate).await {
104            Ok(_) => {
105                if skipped > 0 {
106                    log::info!(
107                        target: log_target,
108                        "slot {} missing in index; seeking back {} slots to {}",
109                        local_start.saturating_sub(1),
110                        skipped,
111                        candidate
112                    );
113                }
114                return Ok(Some(candidate));
115            }
116            Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117                if candidate <= epoch_start {
118                    break;
119                }
120                skipped += 1;
121                candidate = candidate.saturating_sub(1);
122            }
123            Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124        }
125    }
126    log::warn!(
127        target: log_target,
128        "no indexed slot found before {} (epoch start {}); reading from epoch start",
129        local_start,
130        epoch_start
131    );
132    Ok(None)
133}
134
135/// Errors that can occur while streaming the firehose. Errors that can occur while streaming
136/// the firehose.
137#[derive(Debug, Error)]
138pub enum FirehoseError {
139    /// HTTP client error surfaced from `reqwest`.
140    Reqwest(reqwest::Error),
141    /// Failure while reading the Old Faithful CAR header.
142    ReadHeader(SharedError),
143    /// Error emitted by the Solana Geyser plugin service.
144    GeyserPluginService(GeyserPluginServiceError),
145    /// Transaction notifier could not be acquired from the Geyser service.
146    FailedToGetTransactionNotifier,
147    /// Failure while reading data until the next block boundary.
148    ReadUntilBlockError(SharedError),
149    /// Failure while fetching an individual block.
150    GetBlockError(SharedError),
151    /// Failed to decode a node at the given index.
152    NodeDecodingError(usize, SharedError),
153    /// Error surfaced when querying the slot offset index.
154    SlotOffsetIndexError(SlotOffsetIndexError),
155    /// Failure while seeking to a slot within the Old Faithful CAR stream.
156    SeekToSlotError(SharedError),
157    /// Error surfaced during the plugin `on_load` stage.
158    OnLoadError(SharedError),
159    /// Error emitted while invoking the stats handler.
160    OnStatsHandlerError(SharedError),
161    /// Timeout reached while waiting for a firehose operation.
162    OperationTimeout(&'static str),
163    /// Transaction handler returned an error.
164    TransactionHandlerError(SharedError),
165    /// Entry handler returned an error.
166    EntryHandlerError(SharedError),
167    /// Reward handler returned an error.
168    RewardHandlerError(SharedError),
169    /// Block handler returned an error.
170    BlockHandlerError(SharedError),
171}
172
173unsafe impl Send for FirehoseError {}
174unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        match self {
179            FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180            FirehoseError::ReadHeader(error) => {
181                write!(f, "Error reading header: {}", error)
182            }
183            FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184                f,
185                "Error initializing geyser plugin service: {}",
186                geyser_plugin_service_error
187            ),
188            FirehoseError::FailedToGetTransactionNotifier => write!(
189                f,
190                "Failed to get transaction notifier from GeyserPluginService"
191            ),
192            FirehoseError::ReadUntilBlockError(error) => {
193                write!(f, "Error reading until block: {}", error)
194            }
195            FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196            FirehoseError::NodeDecodingError(item_index, error) => {
197                write!(
198                    f,
199                    "Error seeking, reading data from, or decoding data for data node {}: {}",
200                    item_index, error
201                )
202            }
203            FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204                f,
205                "Error getting info from slot offset index: {}",
206                slot_offset_index_error
207            ),
208            FirehoseError::SeekToSlotError(error) => {
209                write!(f, "Error seeking to slot: {}", error)
210            }
211            FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212            FirehoseError::OnStatsHandlerError(error) => {
213                write!(f, "Stats handler error: {}", error)
214            }
215            FirehoseError::OperationTimeout(op) => {
216                write!(f, "Timeout while waiting for operation: {}", op)
217            }
218            FirehoseError::TransactionHandlerError(error) => {
219                write!(f, "Transaction handler error: {}", error)
220            }
221            FirehoseError::EntryHandlerError(error) => {
222                write!(f, "Entry handler error: {}", error)
223            }
224            FirehoseError::RewardHandlerError(error) => {
225                write!(f, "Reward handler error: {}", error)
226            }
227            FirehoseError::BlockHandlerError(error) => {
228                write!(f, "Block handler error: {}", error)
229            }
230        }
231    }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235    fn from(e: reqwest::Error) -> Self {
236        FirehoseError::Reqwest(e)
237    }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241    fn from(e: GeyserPluginServiceError) -> Self {
242        FirehoseError::GeyserPluginService(e)
243    }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247    fn from(e: SlotOffsetIndexError) -> Self {
248        FirehoseError::SlotOffsetIndexError(e)
249    }
250}
251
/// Progress counters reported by a single firehose worker thread.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Which worker thread these stats belong to.
    pub thread_id: usize,
    /// When the thread started processing.
    pub start_time: std::time::Instant,
    /// When the thread finished; `None` while it is still running.
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot range currently assigned to the thread; may shrink on restart.
    pub slot_range: Range<u64>,
    /// Half-open slot range the thread was originally given; never modified.
    pub initial_slot_range: Range<u64>,
    /// Most recent slot the thread has processed.
    pub current_slot: u64,
    /// Count of slots the thread has processed.
    pub slots_processed: u64,
    /// Count of blocks the thread has processed successfully.
    pub blocks_processed: u64,
    /// Count of slots that were skipped by the cluster leader.
    pub leader_skipped_slots: u64,
    /// Count of transactions the thread has processed.
    pub transactions_processed: u64,
    /// Count of entries the thread has processed.
    pub entries_processed: u64,
    /// Count of rewards the thread has processed.
    pub rewards_processed: u64,
}
280
281/// Aggregated firehose statistics covering all worker threads.
282#[derive(Clone, PartialEq, Eq, Hash, Debug)]
283pub struct Stats {
284    /// Per-thread statistics for the current update.
285    pub thread_stats: ThreadStats,
286    /// Timestamp captured when processing began.
287    pub start_time: std::time::Instant,
288    /// Timestamp captured when all processing finished, if finished.
289    pub finish_time: Option<std::time::Instant>,
290    /// Slot range currently being processed (half-open [start, end)).
291    pub slot_range: Range<u64>,
292    /// Aggregate slots processed across all threads.
293    pub slots_processed: u64,
294    /// Aggregate blocks processed across all threads.
295    pub blocks_processed: u64,
296    /// Aggregate skipped slots across all threads.
297    pub leader_skipped_slots: u64,
298    /// Aggregate transactions processed across all threads.
299    pub transactions_processed: u64,
300    /// Aggregate entries processed across all threads.
301    pub entries_processed: u64,
302    /// Aggregate rewards processed across all threads.
303    pub rewards_processed: u64,
304    /// Transactions processed since the previous stats pulse.
305    pub transactions_since_last_pulse: u64,
306    /// Blocks processed since the previous stats pulse.
307    pub blocks_since_last_pulse: u64,
308    /// Slots processed since the previous stats pulse.
309    pub slots_since_last_pulse: u64,
310    /// Elapsed time since the previous stats pulse.
311    pub time_since_last_pulse: std::time::Duration,
312}
313
314/// Configuration for periodic stats emission via a [`Handler`] callback.
315#[derive(Clone, PartialEq, Eq, Hash, Debug)]
316pub struct StatsTracking<OnStats: Handler<Stats>> {
317    /// Callback invoked whenever new stats are available.
318    pub on_stats: OnStats,
319    /// Emits a stats callback when the current slot is a multiple of this interval.
320    pub tracking_interval_slots: u64,
321}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326    stats_tracking: Option<&StatsTracking<OnStats>>,
327    thread_index: usize,
328    thread_stats: &ThreadStats,
329    overall_slots_processed: &AtomicU64,
330    overall_blocks_processed: &AtomicU64,
331    overall_transactions_processed: &AtomicU64,
332    overall_entries_processed: &AtomicU64,
333    transactions_since_stats: &AtomicU64,
334    blocks_since_stats: &AtomicU64,
335    slots_since_stats: &AtomicU64,
336    last_pulse: &Arc<AtomicU64>,
337    base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339    if let Some(stats_tracker) = stats_tracking {
340        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343        let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344        let now_nanos = base_instant.elapsed().as_nanos() as u64;
345        let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346        let delta_nanos = now_nanos.saturating_sub(previous);
347        let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348        let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349        let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350        let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352        let stats = Stats {
353            thread_stats: thread_stats.clone(),
354            start_time: thread_stats.start_time,
355            finish_time: thread_stats.finish_time,
356            slot_range: thread_stats.slot_range.clone(),
357            slots_processed: total_slots,
358            blocks_processed: total_blocks,
359            leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360            transactions_processed: total_transactions,
361            entries_processed: total_entries,
362            rewards_processed: thread_stats.rewards_processed,
363            transactions_since_last_pulse: processed_transactions,
364            blocks_since_last_pulse: processed_blocks,
365            slots_since_last_pulse: processed_slots,
366            time_since_last_pulse,
367        };
368
369        if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370            last_pulse.store(previous, Ordering::Relaxed);
371            transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372            blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373            slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374            return Err((
375                FirehoseError::OnStatsHandlerError(e),
376                thread_stats.current_slot,
377            ));
378        }
379    }
380    Ok(())
381}
382
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
389
390fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
391    map.get(&thread_id)
392        .map(|set| set.remove(&slot).is_some())
393        .unwrap_or(false)
394}
395
396fn decode_transaction_status_meta_from_frame(
397    slot: u64,
398    reassembled_metadata: Vec<u8>,
399) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
400    if reassembled_metadata.is_empty() {
401        // Early epochs often omit metadata entirely.
402        return Ok(solana_transaction_status::TransactionStatusMeta::default());
403    }
404
405    match utils::decompress_zstd(reassembled_metadata.as_slice()) {
406        Ok(decompressed) => {
407            decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
408                Box::new(std::io::Error::other(format!(
409                    "decode transaction metadata (slot {slot}): {err}"
410                ))) as SharedError
411            })
412        }
413        Err(decomp_err) => {
414            // If the frame was not zstd-compressed (common for very early data), try to
415            // decode the raw bytes directly before bailing.
416            decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
417                Box::new(std::io::Error::other(format!(
418                    "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
419                ))) as SharedError
420            })
421        }
422    }
423}
424
425#[derive(Debug, Default)]
426struct DecodedRewards {
427    keyed_rewards: Vec<(Address, RewardInfo)>,
428    num_partitions: Option<u64>,
429}
430
431impl DecodedRewards {
432    fn empty() -> Self {
433        Self {
434            keyed_rewards: Vec::new(),
435            num_partitions: None,
436        }
437    }
438}
439
440fn decode_rewards_from_frame(
441    slot: u64,
442    reassembled_rewards: Vec<u8>,
443) -> Result<DecodedRewards, SharedError> {
444    if reassembled_rewards.is_empty() {
445        // Early epochs sometimes omit rewards payloads entirely.
446        return Ok(DecodedRewards::empty());
447    }
448
449    match utils::decompress_zstd(reassembled_rewards.as_slice()) {
450        Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
451            |err| {
452                Box::new(std::io::Error::other(format!(
453                    "decode rewards (slot {slot}): {err}"
454                ))) as SharedError
455            },
456        ),
457        Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
458            |err| {
459                Box::new(std::io::Error::other(format!(
460                    "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
461                ))) as SharedError
462            },
463        ),
464    }
465}
466
467fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
468    let epoch = slot_to_epoch(slot);
469    let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
470        prost_011::Message::decode(bytes);
471    match proto_attempt {
472        Ok(proto) => {
473            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
474            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
475                Box::new(std::io::Error::other(format!(
476                    "convert rewards proto failed (epoch {epoch}): {err}"
477                ))) as SharedError
478            })?;
479            Ok(DecodedRewards {
480                keyed_rewards,
481                num_partitions,
482            })
483        }
484        Err(proto_err) => {
485            let stored: solana_storage_proto::StoredExtendedRewards =
486                bincode::deserialize(bytes).map_err(|bin_err| {
487                    Box::new(std::io::Error::other(format!(
488                        "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
489                    ))) as SharedError
490                })?;
491            let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
492            let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
493            let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
494                Box::new(std::io::Error::other(format!(
495                    "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
496                ))) as SharedError
497            })?;
498            Ok(DecodedRewards {
499                keyed_rewards,
500                num_partitions,
501            })
502        }
503    }
504}
505
506fn decode_transaction_status_meta(
507    slot: u64,
508    metadata_bytes: &[u8],
509) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
510    let epoch = slot_to_epoch(slot);
511    let mut bincode_err: Option<String> = None;
512    if epoch < BINCODE_EPOCH_CUTOFF {
513        match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
514            metadata_bytes,
515        ) {
516            Ok(stored) => return Ok(stored.into()),
517            Err(err) => {
518                bincode_err = Some(err.to_string());
519            }
520        }
521    }
522
523    let bin_err_for_proto = bincode_err.clone();
524    let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
525        prost_011::Message::decode(metadata_bytes).map_err(|err| {
526            // If we already tried bincode, surface both failures for easier debugging.
527            if let Some(ref bin_err) = bin_err_for_proto {
528                Box::new(std::io::Error::other(format!(
529                    "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
530                ))) as SharedError
531            } else {
532                Box::new(std::io::Error::other(format!(
533                    "protobuf decode transaction metadata: {err}"
534                ))) as SharedError
535            }
536        })?;
537
538    proto.try_into().map_err(|err| {
539        if let Some(ref bin_err) = bincode_err {
540            Box::new(std::io::Error::other(format!(
541                "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
542            ))) as SharedError
543        } else {
544            Box::new(std::io::Error::other(format!(
545                "convert transaction metadata proto: {err}"
546            ))) as SharedError
547        }
548    })
549}
550
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Slots-per-epoch figure these tests use to turn an epoch number into a slot.
    const SLOTS_PER_EPOCH: u64 = 432000;

    /// Metadata fixture with a handful of non-default fields.
    fn sample_meta() -> TransactionStatusMeta {
        let mut fixture = TransactionStatusMeta::default();
        fixture.fee = 42;
        fixture.pre_balances = vec![1, 2];
        fixture.post_balances = vec![3, 4];
        fixture.log_messages = Some(vec!["hello".into()]);
        fixture.pre_token_balances = Some(Vec::new());
        fixture.post_token_balances = Some(Vec::new());
        fixture.rewards = Some(Vec::new());
        fixture.compute_units_consumed = Some(7);
        fixture.cost_units = Some(9);
        fixture.loaded_addresses = LoadedAddresses::default();
        fixture
    }

    /// Encodes `meta` through the generated protobuf type.
    fn to_proto_bytes(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &encoded).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = to_proto_bytes(&expected);
        let decoded =
            decode_transaction_status_meta(157 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = to_proto_bytes(&expected);
        // Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
        let decoded =
            decode_transaction_status_meta(100 * SLOTS_PER_EPOCH, &encoded).expect("decode");
        assert_eq!(decoded, expected);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let uncompressed = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, uncompressed).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
643
#[cfg(test)]
mod rewards_decode_tests {
    use super::decode_rewards_from_bytes;
    use solana_sdk_ids::vote::id as vote_program_id;
    use solana_storage_proto::StoredExtendedRewards;
    use solana_transaction_status::{Reward, RewardType};

    /// A protobuf-encoded payload with one reward and a partition count decodes to both.
    #[test]
    fn decodes_protobuf_rewards() {
        let single_reward = solana_storage_proto::convert::generated::Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 5,
            post_balance: 10,
            reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
            commission: "1".to_string(),
        };
        let payload = solana_storage_proto::convert::generated::Rewards {
            rewards: vec![single_reward],
            num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
                num_partitions: 2,
            }),
        };
        let encoded = prost_011::Message::encode_to_vec(&payload);
        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode proto rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, Some(2));
    }

    /// A bincode-encoded payload decodes through the fallback path, without partitions.
    #[test]
    fn decodes_bincode_rewards() {
        let single_reward = Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 7,
            post_balance: 9,
            reward_type: Some(RewardType::Rent),
            commission: Some(3),
        };
        let payload: StoredExtendedRewards = vec![single_reward.into()];
        let encoded = bincode::serialize(&payload).expect("bincode serialize");
        let decoded = decode_rewards_from_bytes(0, &encoded).expect("decode bincode rewards");
        assert_eq!(decoded.keyed_rewards.len(), 1);
        assert_eq!(decoded.num_partitions, None);
    }
}
689
690/// Firehose transaction payload passed to [`Handler`] callbacks.
691#[derive(Debug, Clone)]
692pub struct TransactionData {
693    /// Slot that contains the transaction.
694    pub slot: u64,
695    /// Index of the transaction within the slot.
696    pub transaction_slot_index: usize,
697    /// Transaction signature.
698    pub signature: solana_signature::Signature,
699    /// Hash of the transaction message.
700    pub message_hash: Hash,
701    /// Indicates whether the transaction is a vote.
702    pub is_vote: bool,
703    /// Status metadata returned by the Solana runtime.
704    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
705    /// Fully decoded transaction.
706    pub transaction: VersionedTransaction,
707}
708
709/// Block entry metadata passed to [`Handler`] callbacks.
710#[derive(Debug, Clone)]
711pub struct EntryData {
712    /// Slot that generated the entry.
713    pub slot: u64,
714    /// Index of the entry within the slot.
715    pub entry_index: usize,
716    /// Range of transaction indexes covered by the entry.
717    pub transaction_indexes: Range<usize>,
718    /// Number of hashes associated with the entry.
719    pub num_hashes: u64,
720    /// Entry hash.
721    pub hash: Hash,
722}
723
724/// Reward data conveyed to reward [`Handler`] callbacks.
725#[derive(Debug, Clone)]
726pub struct RewardsData {
727    /// Slot the rewards correspond to.
728    pub slot: u64,
729    /// Reward recipients and their associated reward information.
730    pub rewards: Vec<(Address, RewardInfo)>,
731}
732
733/// Block-level data streamed to block handlers.
734#[derive(Debug)]
735pub enum BlockData {
736    /// Fully populated block payload with ledger metadata.
737    Block {
738        /// Parent slot number.
739        parent_slot: u64,
740        /// Parent block hash.
741        parent_blockhash: Hash,
742        /// Current block slot.
743        slot: u64,
744        /// Current block hash.
745        blockhash: Hash,
746        /// Rewards keyed by account and partition information.
747        rewards: KeyedRewardsAndNumPartitions,
748        /// Optional Unix timestamp for the block.
749        block_time: Option<i64>,
750        /// Optional ledger block height.
751        block_height: Option<u64>,
752        /// Number of executed transactions in the block.
753        executed_transaction_count: u64,
754        /// Number of entries contained in the block.
755        entry_count: u64,
756    },
757    /// Marker indicating the slot appears skipped (either truly skipped or it is late and will
758    /// arrive out of order).
759    PossibleLeaderSkipped {
760        /// Slot number that either lacked a block or may still arrive later.
761        slot: u64,
762    },
763}
764
765impl BlockData {
766    /// Returns the slot associated with this block or skipped slot.
767    #[inline(always)]
768    pub const fn slot(&self) -> u64 {
769        match self {
770            BlockData::Block { slot, .. } => *slot,
771            BlockData::PossibleLeaderSkipped { slot } => *slot,
772        }
773    }
774
775    /// Returns `true` if this record currently represents a missing/possibly skipped slot.
776    #[inline(always)]
777    pub const fn was_skipped(&self) -> bool {
778        matches!(self, BlockData::PossibleLeaderSkipped { .. })
779    }
780
781    /// Returns the optional block time when available.
782    #[inline(always)]
783    pub const fn block_time(&self) -> Option<i64> {
784        match self {
785            BlockData::Block { block_time, .. } => *block_time,
786            BlockData::PossibleLeaderSkipped { .. } => None,
787        }
788    }
789}
790
791type HandlerResult = Result<(), SharedError>;
792type HandlerFuture = BoxFuture<'static, HandlerResult>;
793
794/// Asynchronous callback invoked for each firehose event of type `Data`.
795pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
796
797impl<Data, F> Handler<Data> for F where
798    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
799{
800}
801
802/// Function pointer alias for [`Handler`] callbacks.
803pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
804/// Convenience alias for block handlers accepted by [`firehose`].
805pub type OnBlockFn = HandlerFn<BlockData>;
806/// Convenience alias for transaction handlers accepted by [`firehose`].
807pub type OnTxFn = HandlerFn<TransactionData>;
808/// Convenience alias for entry handlers accepted by [`firehose`].
809pub type OnEntryFn = HandlerFn<EntryData>;
810/// Convenience alias for reward handlers accepted by [`firehose`].
811pub type OnRewardFn = HandlerFn<RewardsData>;
812/// Type alias for [`StatsTracking`] using simple function pointers.
813pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
814/// Convenience alias for firehose error handlers.
815pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
816/// Convenience alias for stats tracking handlers accepted by [`firehose`].
817pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
818
/// Details about a failure hit by a firehose worker.
#[derive(Debug, Clone)]
pub struct FirehoseErrorContext {
    /// Index of the thread on which the error occurred.
    pub thread_id: usize,
    /// Slot that worker was processing when the error came up.
    pub slot: u64,
    /// Epoch computed from the failing slot.
    pub epoch: u64,
    /// The error rendered as text, suitable for display or logging.
    pub error_message: String,
}
831
832/// Streams blocks, transactions, entries, rewards, and stats to user-provided handlers.
833///
834/// The requested `slot_range` is half-open `[start, end)`; on recoverable errors the
835/// runner restarts from the last processed slot to maintain coverage.
836///
837/// When `sequential` is `true`, the firehose uses one worker thread and opens epoch streams
838/// with ripget's parallel windowed downloader. In this mode `threads` configures ripget range
839/// concurrency rather than firehose worker partitioning.
840///
841/// `buffer_window_bytes` controls the ripget hot/cold window when `sequential` is enabled.
842/// Pass `None` to use the default (`min(4 GiB, 15% of available RAM)`).
843#[inline]
844#[allow(clippy::too_many_arguments)]
845pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
846    threads: u64,
847    sequential: bool,
848    buffer_window_bytes: Option<u64>,
849    slot_range: Range<u64>,
850    on_block: Option<OnBlock>,
851    on_tx: Option<OnTransaction>,
852    on_entry: Option<OnEntry>,
853    on_rewards: Option<OnRewards>,
854    on_error: Option<OnError>,
855    stats_tracking: Option<StatsTracking<OnStats>>,
856    shutdown_signal: Option<broadcast::Receiver<()>>,
857) -> Result<(), (FirehoseError, u64)>
858where
859    OnBlock: Handler<BlockData>,
860    OnTransaction: Handler<TransactionData>,
861    OnEntry: Handler<EntryData>,
862    OnRewards: Handler<RewardsData>,
863    OnStats: Handler<Stats>,
864    OnError: Handler<FirehoseErrorContext>,
865{
866    if threads == 0 {
867        return Err((
868            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
869            slot_range.start,
870        ));
871    }
872    let client = crate::network::create_http_client();
873    log::info!(target: LOG_MODULE, "starting firehose...");
874    log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
875    let firehose_threads = if sequential { 1 } else { threads };
876    let sequential_download_threads = std::cmp::max(1, threads as usize);
877    let sequential_buffer_window_bytes = buffer_window_bytes
878        .filter(|value| *value >= 2)
879        .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
880    if sequential {
881        log::info!(
882            target: LOG_MODULE,
883            "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
884            sequential_download_threads,
885            crate::system::format_byte_size(sequential_buffer_window_bytes)
886        );
887    }
888
889    let slot_range = Arc::new(slot_range);
890
891    // divide slot_range into n subranges
892    let subranges = generate_subranges(&slot_range, firehose_threads);
893    if firehose_threads > 1 {
894        log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
895    }
896
897    let firehose_start = std::time::Instant::now();
898    let shutdown_flag = Arc::new(AtomicBool::new(false));
899    if let Some(ref rx) = shutdown_signal {
900        let mut rx = rx.resubscribe();
901        let flag = shutdown_flag.clone();
902        tokio::spawn(async move {
903            if rx.recv().await.is_ok() {
904                log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
905                flag.store(true, Ordering::SeqCst);
906            }
907        });
908    }
909
910    // Build a shared ripget HTTP client so TCP connections survive across epoch transitions.
911    let shared_ripget_client: Option<ripget::Client> = if sequential {
912        Some(
913            ripget::build_client(Some(&format!(
914                "jetstreamer-firehose/{}",
915                env!("CARGO_PKG_VERSION")
916            )))
917            .expect("failed to build ripget HTTP client"),
918        )
919    } else {
920        None
921    };
922
923    let mut handles = Vec::new();
924    // Shared per-thread error counters
925    let error_counts: Arc<Vec<AtomicU32>> =
926        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
927
928    let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
929    let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
930    let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
931    let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
932    let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
933
934    for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
935        let error_counts = error_counts.clone();
936        let client = client.clone();
937        let on_block = on_block.clone();
938        let on_tx = on_tx.clone();
939        let on_entry = on_entry.clone();
940        let on_reward = on_rewards.clone();
941        let on_error = on_error.clone();
942        let overall_slots_processed = overall_slots_processed.clone();
943        let overall_blocks_processed = overall_blocks_processed.clone();
944        let overall_transactions_processed = overall_transactions_processed.clone();
945        let overall_entries_processed = overall_entries_processed.clone();
946        let stats_tracking = stats_tracking.clone();
947        let transactions_since_stats = Arc::new(AtomicU64::new(0));
948        let blocks_since_stats = Arc::new(AtomicU64::new(0));
949        let slots_since_stats = Arc::new(AtomicU64::new(0));
950        let last_pulse = Arc::new(AtomicU64::new(0));
951        let transactions_since_stats_cloned = transactions_since_stats.clone();
952        let blocks_since_stats_cloned = blocks_since_stats.clone();
953        let slots_since_stats_cloned = slots_since_stats.clone();
954        let last_pulse_cloned = last_pulse.clone();
955        let shutdown_flag = shutdown_flag.clone();
956        let pending_skipped_slots = pending_skipped_slots.clone();
957        let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
958        let sequential_mode = sequential;
959        let ripget_threads = sequential_download_threads;
960        let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
961        let ripget_client = shared_ripget_client.clone();
962
963        let handle = tokio::spawn(async move {
964            let transactions_since_stats = transactions_since_stats_cloned;
965            let blocks_since_stats = blocks_since_stats_cloned;
966            let slots_since_stats = slots_since_stats_cloned;
967            let last_pulse = last_pulse_cloned;
968            let mut shutdown_rx = thread_shutdown_rx;
969            let start_time = firehose_start;
970            last_pulse.store(
971                firehose_start.elapsed().as_nanos() as u64,
972                Ordering::Relaxed,
973            );
974            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
975            let mut skip_until_index = None;
976            let last_emitted_slot = slot_range.start.saturating_sub(1);
977            let block_enabled = on_block.is_some();
978            let tx_enabled = on_tx.is_some();
979            let entry_enabled = on_entry.is_some();
980            let reward_enabled = on_reward.is_some();
981            let tracking_enabled = stats_tracking.is_some();
982            if block_enabled {
983                pending_skipped_slots.entry(thread_index).or_default();
984            }
985            let mut last_counted_slot = slot_range.start.saturating_sub(1);
986            let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
987            let mut thread_stats = if tracking_enabled {
988                Some(ThreadStats {
989                    thread_id: thread_index,
990                    start_time,
991                    finish_time: None,
992                    slot_range: slot_range.clone(),
993                    initial_slot_range: slot_range.clone(),
994                    current_slot: slot_range.start,
995                    slots_processed: 0,
996                    blocks_processed: 0,
997                    leader_skipped_slots: 0,
998                    transactions_processed: 0,
999                    entries_processed: 0,
1000                    rewards_processed: 0,
1001                })
1002            } else {
1003                None
1004            };
1005
1006            // let mut triggered = false;
1007            while let Err((err, slot)) = async {
1008                let mut last_emitted_slot = last_emitted_slot_global;
1009                let op_timeout = if sequential_mode {
1010                    OP_TIMEOUT_SEQUENTIAL
1011                } else {
1012                    OP_TIMEOUT
1013                };
1014                if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1015                    log::info!(
1016                        target: &log_target,
1017                        "shutdown requested; terminating firehose thread {}",
1018                        thread_index
1019                    );
1020                    return Ok(());
1021                }
1022                let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1023                log::info!(
1024                    target: &log_target,
1025                    "slot range: {} (epoch {}) ... {} (epoch {})",
1026                    slot_range.start,
1027                    slot_to_epoch(slot_range.start),
1028                    slot_range.end,
1029                    slot_to_epoch(slot_range.end)
1030                );
1031
1032                log::info!(target: &log_target, "🚒 starting firehose...");
1033
1034                // for each epoch
1035                let mut current_slot: Option<u64> = None;
1036                for epoch_num in epoch_range.clone() {
1037                    if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1038                        log::info!(
1039                            target: &log_target,
1040                            "shutdown requested; terminating firehose thread {}",
1041                            thread_index
1042                        );
1043                        return Ok(());
1044                    }
1045                    log::info!(target: &log_target, "entering epoch {}", epoch_num);
1046                    let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1047                    let local_start = std::cmp::max(slot_range.start, epoch_start);
1048                    let local_end_inclusive =
1049                        std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1050                    if local_start > local_end_inclusive {
1051                        log::debug!(
1052                            target: &log_target,
1053                            "epoch {} has no overlap with thread range ({}..{}), skipping",
1054                            epoch_num,
1055                            slot_range.start,
1056                            slot_range.end
1057                        );
1058                        continue;
1059                    }
1060                    let use_sequential_stream = sequential_mode && local_start == epoch_start;
1061                    let stream = match timeout(op_timeout, async {
1062                        if use_sequential_stream {
1063                            fetch_epoch_stream_with_options(
1064                                epoch_num,
1065                                &client,
1066                                Some(FetchEpochStreamOptions {
1067                                    sequential: true,
1068                                    ripget_threads,
1069                                    buffer_window_bytes: ripget_buffer_window_bytes,
1070                                    ripget_client: ripget_client.clone(),
1071                                }),
1072                            )
1073                            .await
1074                        } else {
1075                            fetch_epoch_stream(epoch_num, &client).await
1076                        }
1077                    })
1078                    .await
1079                    {
1080                        Ok(stream) => stream,
1081                        Err(_) => {
1082                            return Err((
1083                                FirehoseError::OperationTimeout("fetch_epoch_stream"),
1084                                current_slot.unwrap_or(slot_range.start),
1085                            ));
1086                        }
1087                    };
1088                    let mut reader = NodeReader::new(stream);
1089
1090                    let header_fut = reader.read_raw_header();
1091                    let header = match timeout(op_timeout, header_fut).await {
1092                        Ok(res) => res
1093                            .map_err(FirehoseError::ReadHeader)
1094                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1095                        Err(_) => {
1096                            return Err((
1097                                FirehoseError::OperationTimeout("read_raw_header"),
1098                                current_slot.unwrap_or(slot_range.start),
1099                            ));
1100                        }
1101                    };
1102                    log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1103
1104                    let mut previous_blockhash = Hash::default();
1105                    let mut latest_entry_blockhash = Hash::default();
1106                    // Reset counters to align to the local epoch slice; prevents boundary slots
1107                    // from being treated as already-counted after a restart.
1108                    last_counted_slot = local_start.saturating_sub(1);
1109                    current_slot = None;
1110                    if tracking_enabled
1111                        && let Some(ref mut stats) = thread_stats {
1112                            stats.current_slot = local_start;
1113                            stats.slot_range.start = local_start;
1114                        }
1115
1116                    if local_start > epoch_start {
1117                        // Seek to the nearest previous indexed slot so the stream includes all
1118                        // nodes (transactions, entries, rewards) that precede `local_start`.
1119                        let seek_slot = match timeout(
1120                            OP_TIMEOUT,
1121                            find_previous_indexed_slot(local_start, epoch_start, &log_target),
1122                        )
1123                        .await
1124                        {
1125                            Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1126                            Err(_) => {
1127                                return Err((
1128                                    FirehoseError::OperationTimeout(
1129                                        "seek_to_previous_indexed_slot",
1130                                    ),
1131                                    current_slot.unwrap_or(slot_range.start),
1132                                ));
1133                            }
1134                        };
1135                        if let Some(seek_slot) = seek_slot {
1136                            let seek_fut = reader.seek_to_slot(seek_slot);
1137                            match timeout(op_timeout, seek_fut).await {
1138                                Ok(res) => {
1139                                    res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1140                                }
1141                                Err(_) => {
1142                                    return Err((
1143                                        FirehoseError::OperationTimeout("seek_to_slot"),
1144                                        current_slot.unwrap_or(slot_range.start),
1145                                    ));
1146                                }
1147                            }
1148                        }
1149                    }
1150
1151                    // for each item in each block
1152                    let mut item_index = 0;
1153                    let mut displayed_skip_message = false;
1154                    loop {
1155                        if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1156                            log::info!(
1157                                target: &log_target,
1158                                "shutdown requested; terminating firehose thread {}",
1159                                thread_index
1160                            );
1161                            return Ok(());
1162                        }
1163                        let read_fut = reader.read_until_block();
1164                        let nodes = match timeout(op_timeout, read_fut).await {
1165                            Ok(result) => result
1166                                .map_err(FirehoseError::ReadUntilBlockError)
1167                                .map_err(|e| {
1168                                    (
1169                                        e,
1170                                        current_slot
1171                                            .map(|slot| slot.saturating_add(1))
1172                                            .unwrap_or(slot_range.start),
1173                                    )
1174                                })?,
1175                            Err(_) => {
1176                                log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1177                                return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1178                            }
1179                        };
1180                        if nodes.is_empty() {
1181                            log::info!(
1182                                target: &log_target,
1183                                "reached end of epoch {}",
1184                                epoch_num
1185                            );
1186                            break;
1187                        }
1188                        if let Some(last_node) = nodes.0.last()
1189                            && !last_node.get_node().is_block()
1190                        {
1191                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1192                            break;
1193                        }
1194                        let block = nodes
1195                            .get_block()
1196                            .map_err(FirehoseError::GetBlockError)
1197                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1198                        log::debug!(
1199                            target: &log_target,
1200                            "read {} items from epoch {}, now at slot {}",
1201                            item_index,
1202                            epoch_num,
1203                            block.slot
1204                        );
1205                        let slot = block.slot;
1206                        if slot > local_end_inclusive {
1207                            log::debug!(
1208                                target: &log_target,
1209                                "reached end of local slice at slot {} (epoch {}), stopping",
1210                                slot,
1211                                epoch_num
1212                            );
1213                            break;
1214                        }
1215                        if slot >= slot_range.end {
1216                            log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1217                            // Return early to terminate the firehose thread cleanly. We use >=
1218                            // because slot_range is half-open [start, end), so any slot equal
1219                            // to end is out-of-range and must not be processed. Do not emit
1220                            // synthetic skipped slots here; another thread may own the boundary.
1221                            if block_enabled {
1222                                pending_skipped_slots.remove(&thread_index);
1223                            }
1224                            return Ok(());
1225                        }
1226                        debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1227                        if slot < slot_range.start {
1228                            if slot.saturating_add(1) == slot_range.start {
1229                                log::debug!(
1230                                    target: &log_target,
1231                                    "priming reader with preceding slot {}, skipping",
1232                                    slot
1233                                );
1234                            } else {
1235                                log::warn!(
1236                                    target: &log_target,
1237                                    "encountered slot {} before start of range {}, skipping",
1238                                    slot,
1239                                    slot_range.start
1240                                );
1241                            }
1242                            continue;
1243                        }
1244                        current_slot = Some(slot);
1245                        let mut entry_index: usize = 0;
1246                        let mut this_block_executed_transaction_count: u64 = 0;
1247                        let mut this_block_entry_count: u64 = 0;
1248                        let mut this_block_rewards = DecodedRewards::empty();
1249
1250                        for node_with_cid in &nodes.0 {
1251                            item_index += 1;
1252                            if let Some(skip) = skip_until_index {
1253                                if item_index < skip {
1254                                    if !displayed_skip_message {
1255                                        log::info!(
1256                                            target: &log_target,
1257                                            "skipping until index {} (at {})",
1258                                            skip,
1259                                            item_index
1260                                        );
1261                                        displayed_skip_message = true;
1262                                    }
1263                                    continue;
1264                                } else {
1265                                    log::info!(
1266                                        target: &log_target,
1267                                        "reached target index {}, resuming...",
1268                                        skip
1269                                    );
1270                                    skip_until_index = None;
1271                                }
1272                            }
1273                            let node = node_with_cid.get_node();
1274
1275                            if let Some(ref mut stats) = thread_stats {
1276                                stats.current_slot = slot;
1277                            }
1278
1279                            let error_slot = current_slot.unwrap_or(slot_range.start);
1280
1281                            use crate::node::Node::*;
1282                            match node {
1283                                Transaction(tx) => {
1284                                    if tx_enabled
1285                                        && let Some(on_tx_cb) = on_tx.as_ref()
1286                                    {
1287                                        let error_slot = current_slot.unwrap_or(slot_range.start);
1288                                        let versioned_tx = tx.as_parsed().map_err(|err| {
1289                                            (
1290                                                FirehoseError::NodeDecodingError(item_index, err),
1291                                                error_slot,
1292                                            )
1293                                        })?;
1294                                        let reassembled_metadata = nodes
1295                                            .reassemble_dataframes(&tx.metadata)
1296                                            .map_err(|err| {
1297                                                (
1298                                                    FirehoseError::NodeDecodingError(item_index, err),
1299                                                    error_slot,
1300                                                )
1301                                            })?;
1302
1303                                        let as_native_metadata = decode_transaction_status_meta_from_frame(
1304                                            block.slot,
1305                                            reassembled_metadata,
1306                                        )
1307                                        .map_err(|err| {
1308                                            (
1309                                                FirehoseError::NodeDecodingError(item_index, err),
1310                                                error_slot,
1311                                            )
1312                                        })?;
1313
1314                                        let message_hash = {
1315                                            #[cfg(feature = "verify-transaction-signatures")]
1316                                            {
1317                                                versioned_tx.verify_and_hash_message().map_err(|err| {
1318                                                    (
1319                                                        FirehoseError::TransactionHandlerError(Box::new(err)),
1320                                                        error_slot,
1321                                                    )
1322                                                })?
1323                                            }
1324                                            #[cfg(not(feature = "verify-transaction-signatures"))]
1325                                            {
1326                                                versioned_tx.message.hash()
1327                                            }
1328                                        };
1329                                        let signature = versioned_tx
1330                                            .signatures
1331                                            .first()
1332                                            .ok_or_else(|| {
1333                                                Box::new(std::io::Error::new(
1334                                                    std::io::ErrorKind::InvalidData,
1335                                                    "transaction missing signature",
1336                                                )) as SharedError
1337                                            })
1338                                            .map_err(|err| {
1339                                                (
1340                                                    FirehoseError::NodeDecodingError(
1341                                                        item_index,
1342                                                        err,
1343                                                    ),
1344                                                    error_slot,
1345                                                )
1346                                            })?;
1347                                        let is_vote = is_simple_vote_transaction(&versioned_tx);
1348
1349                                        on_tx_cb(
1350                                            thread_index,
1351                                            TransactionData {
1352                                                slot: block.slot,
1353                                                transaction_slot_index: tx.index.unwrap() as usize,
1354                                                signature: *signature,
1355                                                message_hash,
1356                                                is_vote,
1357                                                transaction_status_meta: as_native_metadata,
1358                                                transaction: versioned_tx,
1359                                            },
1360                                        )
1361                                        .await
1362                                        .map_err(|e| {
1363                                            (
1364                                                FirehoseError::TransactionHandlerError(e),
1365                                                error_slot,
1366                                            )
1367                                        })?;
1368                                    }
1369                                    fetch_add_if(
1370                                        tracking_enabled,
1371                                        &overall_transactions_processed,
1372                                        1,
1373                                    );
1374                                    if let Some(ref mut stats) = thread_stats {
1375                                        stats.transactions_processed += 1;
1376                                    }
1377                                    transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1378                                }
1379                                Entry(entry) => {
1380                                    let entry_hash = Hash::from(entry.hash.to_bytes());
1381                                    let entry_transaction_count = entry.transactions.len();
1382                                    let entry_transaction_count_u64 = entry_transaction_count as u64;
1383                                    let starting_transaction_index_u64 =
1384                                        this_block_executed_transaction_count;
1385                                    latest_entry_blockhash = entry_hash;
1386                                    this_block_executed_transaction_count += entry_transaction_count_u64;
1387                                    this_block_entry_count += 1;
1388
1389                                    if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1390                                        let starting_transaction_index = usize::try_from(
1391                                            starting_transaction_index_u64,
1392                                        )
1393                                        .map_err(|err| {
1394                                            (
1395                                                FirehoseError::EntryHandlerError(Box::new(err)),
1396                                                error_slot,
1397                                            )
1398                                        })?;
1399                                        let transaction_indexes_end =
1400                                            starting_transaction_index + entry_transaction_count;
1401                                        on_entry_cb(
1402                                            thread_index,
1403                                            EntryData {
1404                                                slot: block.slot,
1405                                                entry_index,
1406                                                transaction_indexes: starting_transaction_index
1407                                                    ..transaction_indexes_end,
1408                                                num_hashes: entry.num_hashes,
1409                                                hash: entry_hash,
1410                                            },
1411                                        )
1412                                        .await
1413                                        .map_err(|e| {
1414                                            (
1415                                                FirehoseError::EntryHandlerError(e),
1416                                                error_slot,
1417                                            )
1418                                        })?;
1419                                    }
1420                                    entry_index += 1;
1421                                    fetch_add_if(
1422                                        tracking_enabled,
1423                                        &overall_entries_processed,
1424                                        1,
1425                                    );
1426                                    if let Some(ref mut stats) = thread_stats {
1427                                        stats.entries_processed += 1;
1428                                    }
1429                                }
1430                                Block(block) => {
1431                                    let prev_last_counted_slot = last_counted_slot;
1432                                    let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1433                                        (
1434                                            stats.slots_processed,
1435                                            stats.blocks_processed,
1436                                            stats.leader_skipped_slots,
1437                                            stats.current_slot,
1438                                        )
1439                                    });
1440
1441                                    let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1442                                    let skip_start_from_previous = last_counted_slot.saturating_add(1);
1443                                    let skip_start = skip_start_from_previous.max(next_expected_slot);
1444
1445                                    let skipped_epoch = slot_to_epoch(last_counted_slot);
1446                                    for skipped_slot in skip_start..slot {
1447                                        if slot_to_epoch(skipped_slot) != skipped_epoch {
1448                                            break;
1449                                        }
1450                                        log::debug!(
1451                                            target: &log_target,
1452                                            "leader skipped slot {} (prev_counted {}, current slot {})",
1453                                            skipped_slot,
1454                                            prev_last_counted_slot,
1455                                            slot,
1456                                        );
1457                                        if block_enabled {
1458                                            pending_skipped_slots
1459                                                .entry(thread_index)
1460                                                .or_default()
1461                                                .insert(skipped_slot);
1462                                        }
1463                                        if block_enabled
1464                                            && let Some(on_block_cb) = on_block.as_ref()
1465                                            && skipped_slot > last_emitted_slot {
1466                                                last_emitted_slot = skipped_slot;
1467                                                on_block_cb(
1468                                                    thread_index,
1469                                                    BlockData::PossibleLeaderSkipped {
1470                                                        slot: skipped_slot,
1471                                                    },
1472                                                )
1473                                                .await
1474                                                .map_err(|e| {
1475                                                    (
1476                                                        FirehoseError::BlockHandlerError(e),
1477                                                        error_slot,
1478                                                    )
1479                                                })?;
1480                                            }
1481                                        if tracking_enabled {
1482                                            overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1483                                            slots_since_stats.fetch_add(1, Ordering::Relaxed);
1484                                            if let Some(ref mut stats) = thread_stats {
1485                                                stats.leader_skipped_slots += 1;
1486                                                stats.slots_processed += 1;
1487                                                stats.current_slot = skipped_slot;
1488                                            }
1489                                        }
1490                                        last_counted_slot = skipped_slot;
1491                                    }
1492
1493                                    let cleared_pending_skip = if block_enabled {
1494                                        clear_pending_skip(
1495                                            &pending_skipped_slots,
1496                                            thread_index,
1497                                            slot,
1498                                        )
1499                                    } else {
1500                                        false
1501                                    };
1502
1503                                    if slot <= last_counted_slot && !cleared_pending_skip {
1504                                        log::debug!(
1505                                            target: &log_target,
1506                                            "duplicate block {}, already counted (last_counted={})",
1507                                            slot,
1508                                            last_counted_slot,
1509                                        );
1510                                        this_block_rewards = DecodedRewards::empty();
1511                                        continue;
1512                                    }
1513
1514                                    if block_enabled {
1515                                        if let Some(on_block_cb) = on_block.as_ref() {
1516                                            let DecodedRewards {
1517                                                keyed_rewards,
1518                                                num_partitions,
1519                                            } = std::mem::take(&mut this_block_rewards);
1520                                            if slot > last_emitted_slot {
1521                                                last_emitted_slot = slot;
1522                                                on_block_cb(
1523                                                    thread_index,
1524                                                    BlockData::Block {
1525                                                        parent_slot: block.meta.parent_slot,
1526                                                        parent_blockhash: previous_blockhash,
1527                                                        slot: block.slot,
1528                                                        blockhash: latest_entry_blockhash,
1529                                                        rewards: KeyedRewardsAndNumPartitions {
1530                                                            keyed_rewards,
1531                                                            num_partitions,
1532                                                        },
1533                                                        block_time: Some(block.meta.blocktime as i64),
1534                                                        block_height: block.meta.block_height,
1535                                                        executed_transaction_count:
1536                                                            this_block_executed_transaction_count,
1537                                                        entry_count: this_block_entry_count,
1538                                                    },
1539                                                )
1540                                                .await
1541                                                .map_err(|e| {
1542                                                    (
1543                                                        FirehoseError::BlockHandlerError(e),
1544                                                        error_slot,
1545                                                    )
1546                                                })?;
1547                                            }
1548                                        }
1549                                    } else {
1550                                        this_block_rewards = DecodedRewards::empty();
1551                                    }
1552                                    previous_blockhash = latest_entry_blockhash;
1553
1554                                    if tracking_enabled {
1555                                        overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1556                                        overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1557                                        slots_since_stats.fetch_add(1, Ordering::Relaxed);
1558                                        blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1559                                        if let Some(ref mut stats) = thread_stats {
1560                                            stats.blocks_processed += 1;
1561                                            stats.slots_processed += 1;
1562                                            stats.current_slot = slot;
1563                                        }
1564
1565                                        if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1566                                            (&stats_tracking, thread_stats.as_mut())
1567                                            && slot % stats_tracking_cfg.tracking_interval_slots == 0
1568                                                && let Err(err) = maybe_emit_stats(
1569                                                    stats_tracking.as_ref(),
1570                                                    thread_index,
1571                                                    thread_stats_ref,
1572                                                    &overall_slots_processed,
1573                                                    &overall_blocks_processed,
1574                                                    &overall_transactions_processed,
1575                                                    &overall_entries_processed,
1576                                                &transactions_since_stats,
1577                                                &blocks_since_stats,
1578                                                &slots_since_stats,
1579                                                &last_pulse,
1580                                                start_time,
1581                                            )
1582                                            .await
1583                                            {
1584                                                blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1585                                                    slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1586                                                    overall_blocks_processed
1587                                                        .fetch_sub(1, Ordering::Relaxed);
1588                                                    overall_slots_processed
1589                                                        .fetch_sub(1, Ordering::Relaxed);
1590                                                    if let Some((
1591                                                        prev_slots_processed,
1592                                                        prev_blocks_processed,
1593                                                        prev_leader_skipped,
1594                                                        prev_current_slot,
1595                                                    )) = thread_stats_snapshot
1596                                                    {
1597                                                        thread_stats_ref.slots_processed =
1598                                                            prev_slots_processed;
1599                                                        thread_stats_ref.blocks_processed =
1600                                                            prev_blocks_processed;
1601                                                        thread_stats_ref.leader_skipped_slots =
1602                                                            prev_leader_skipped;
1603                                                        thread_stats_ref.current_slot =
1604                                                            prev_current_slot;
1605                                                    }
1606                                                    last_counted_slot = prev_last_counted_slot;
1607                                                    return Err(err);
1608                                                }
1609                                    }
1610
1611                                    if slot > last_counted_slot {
1612                                        last_counted_slot = slot;
1613                                    }
1614                                }
1615                                Subset(_subset) => (),
1616                                Epoch(_epoch) => (),
1617                                Rewards(rewards) => {
1618                                    if reward_enabled || block_enabled {
1619                                        let reassembled = nodes
1620                                            .reassemble_dataframes(&rewards.data)
1621                                            .map_err(|err| {
1622                                                (
1623                                                    FirehoseError::NodeDecodingError(item_index, err),
1624                                                    current_slot.unwrap_or(slot_range.start),
1625                                                )
1626                                            })?;
1627                                        if reassembled.is_empty() {
1628                                            this_block_rewards = DecodedRewards::empty();
1629                                            if reward_enabled
1630                                                && let Some(on_reward_cb) = on_reward.as_ref()
1631                                            {
1632                                                on_reward_cb(
1633                                                    thread_index,
1634                                                    RewardsData {
1635                                                        slot: block.slot,
1636                                                        rewards: Vec::new(),
1637                                                    },
1638                                                )
1639                                                .await
1640                                                .map_err(|e| {
1641                                                    (
1642                                                        FirehoseError::RewardHandlerError(e),
1643                                                        error_slot,
1644                                                    )
1645                                                })?;
1646                                            }
1647                                            continue;
1648                                        }
1649
1650                                        let decoded_rewards =
1651                                            decode_rewards_from_frame(block.slot, reassembled)
1652                                                .map_err(|err| {
1653                                                    (
1654                                                        FirehoseError::NodeDecodingError(
1655                                                            item_index,
1656                                                            err,
1657                                                        ),
1658                                                        error_slot,
1659                                                    )
1660                                                })?;
1661                                        if reward_enabled
1662                                            && let Some(on_reward_cb) = on_reward.as_ref()
1663                                        {
1664                                            on_reward_cb(
1665                                                thread_index,
1666                                                RewardsData {
1667                                                    slot: block.slot,
1668                                                    rewards: decoded_rewards.keyed_rewards.clone(),
1669                                                },
1670                                            )
1671                                            .await
1672                                            .map_err(|e| {
1673                                                (
1674                                                    FirehoseError::RewardHandlerError(e),
1675                                                    error_slot,
1676                                                )
1677                                            })?;
1678                                        }
1679                                        this_block_rewards = decoded_rewards;
1680                                        if let Some(ref mut stats) = thread_stats {
1681                                            stats.rewards_processed +=
1682                                                this_block_rewards.keyed_rewards.len() as u64;
1683                                        }
1684                                    }
1685                                }
1686                                DataFrame(_data_frame) => (),
1687                            }
1688                        }
1689                        if block.slot == slot_range.end - 1 {
1690                            let finish_time = std::time::Instant::now();
1691                            let elapsed = finish_time.duration_since(start_time);
1692                            log::info!(target: &log_target, "processed slot {}", block.slot);
1693                            let elapsed_pretty = human_readable_duration(elapsed);
1694                            log::info!(
1695                                target: &log_target,
1696                                "processed {} slots across {} epochs in {}.",
1697                                slot_range.end - slot_range.start,
1698                                slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1699                                elapsed_pretty
1700                            );
1701                            log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1702                            // On completion, report threads with non-zero error counts for
1703                            // visibility.
1704                            let summary: String = error_counts
1705                                .iter()
1706                                .enumerate()
1707                                .filter_map(|(i, c)| {
1708                                    let v = c.load(Ordering::Relaxed);
1709                                    if v > 0 {
1710                                        Some(format!("{:03}({})", i, v))
1711                                    } else {
1712                                        None
1713                                    }
1714                                })
1715                                .collect::<Vec<_>>()
1716                                .join(", ");
1717                            if !summary.is_empty() {
1718                                log::debug!(target: &log_target, "threads with errors: {}", summary);
1719                            }
1720                            return Ok(());
1721                        }
1722                    }
1723                    if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1724                        && last_counted_slot < expected_last_slot
1725                    {
1726                        // Do not synthesize skipped slots during final flush; another thread may
1727                        // cover the remaining range (especially across epoch boundaries).
1728                    }
1729                    if let Some(ref mut stats) = thread_stats {
1730                        stats.finish_time = Some(std::time::Instant::now());
1731                        maybe_emit_stats(
1732                            stats_tracking.as_ref(),
1733                            thread_index,
1734                            stats,
1735                            &overall_slots_processed,
1736                            &overall_blocks_processed,
1737                            &overall_transactions_processed,
1738                            &overall_entries_processed,
1739                            &transactions_since_stats,
1740                            &blocks_since_stats,
1741                            &slots_since_stats,
1742                            &last_pulse,
1743                            start_time,
1744                        )
1745                        .await?;
1746                    }
1747                    if block_enabled {
1748                        pending_skipped_slots.remove(&thread_index);
1749                    }
1750                    log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1751                    }
1752                    Ok(())
1753            }
1754            .await
1755            {
1756                if is_shutdown_error(&err) {
1757                    log::info!(
1758                        target: &log_target,
1759                        "shutdown requested; terminating firehose thread {}",
1760                        thread_index
1761                    );
1762                    break;
1763                }
1764                let epoch = slot_to_epoch(slot);
1765                let item_index = match &err {
1766                    FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1767                    _ => 0,
1768                };
1769                let error_message = err.to_string();
1770                log::error!(
1771                    target: &log_target,
1772                    "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1773                    slot,
1774                    epoch
1775                );
1776                log::error!(target: &log_target, "{}", error_message);
1777                if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1778                    || error_message.contains("Unknown CID version")
1779                {
1780                    // Clear cached index data for this epoch to avoid retrying with a bad/partial index
1781                    // (or a bad seek offset that landed mid-stream).
1782                    SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1783                }
1784                if let Some(on_error_cb) = on_error.clone() {
1785                    let context = FirehoseErrorContext {
1786                        thread_id: thread_index,
1787                        slot,
1788                        epoch,
1789                        error_message: error_message.clone(),
1790                    };
1791                    if let Err(handler_err) = on_error_cb(thread_index, context).await {
1792                        log::error!(
1793                            target: &log_target,
1794                            "on_error handler failed: {}",
1795                            handler_err
1796                        );
1797                    }
1798                }
1799                // Increment this thread's error counter
1800                error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1801                log::warn!(
1802                    target: &log_target,
1803                    "restarting from slot {} at index {}",
1804                    slot,
1805                    item_index,
1806                );
1807                // Update slot range to resume from the failed slot, not the original start.
1808                // Reset local tracking so we don't treat the resumed slot range as already counted.
1809                // If we've already counted this slot, resume from the next one to avoid duplicates.
1810                if slot <= last_counted_slot {
1811                    slot_range.start = last_counted_slot.saturating_add(1);
1812                } else {
1813                    slot_range.start = slot;
1814                }
1815                // Reset pulse timer to exclude downtime from next rate calc.
1816                last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1817                if tracking_enabled
1818                    && let Some(ref mut stats_ref) = thread_stats {
1819                        stats_ref.slot_range.start = slot_range.start;
1820                        stats_ref.slot_range.end = slot_range.end;
1821                        // initial_slot_range remains unchanged for progress reporting.
1822                    }
1823                if block_enabled {
1824                    pending_skipped_slots.remove(&thread_index);
1825                }
1826                // `skip_until_index` is unsafe across retries because `item_index`
1827                // is reset to 0 each epoch restart. Keeping it can skip large portions
1828                // of the stream and silently drop slots.
1829                skip_until_index = None;
1830                last_emitted_slot_global = last_emitted_slot;
1831            }
1832        });
1833        handles.push(handle);
1834    }
1835
1836    // Wait for all threads to complete
1837    for handle in handles {
1838        handle.await.unwrap();
1839    }
1840    if stats_tracking.is_some() {
1841        let elapsed = firehose_start.elapsed();
1842        let elapsed_secs = elapsed.as_secs_f64();
1843        let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1844        let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1845        let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1846        let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1847        let total_errors: u64 = error_counts
1848            .iter()
1849            .map(|counter| counter.load(Ordering::Relaxed) as u64)
1850            .sum();
1851        let overall_tps = if elapsed_secs > 0.0 {
1852            total_transactions as f64 / elapsed_secs
1853        } else {
1854            0.0
1855        };
1856        log::info!(
1857            target: LOG_MODULE,
1858            "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1859            elapsed_secs,
1860            total_slots,
1861            total_blocks,
1862            total_leader_skipped,
1863            total_transactions,
1864            overall_tps,
1865            total_errors
1866        );
1867    }
1868    if shutdown_flag.load(Ordering::SeqCst) {
1869        log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1870    } else {
1871        log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1872    }
1873    Ok(())
1874}
1875
1876#[allow(clippy::result_large_err)]
1877/// Builds a Geyser-backed firehose and returns a slot notification stream.
1878///
1879/// This helper is used by [`firehose`] when Geyser plugins need to be stood up in-process
1880/// rather than relying solely on remote streams. The provided `slot_range` is treated as a
1881/// half-open interval `[start, end)`, and the thread will restart from the last processed
1882/// slot on recoverable errors to maintain coverage.
1883pub fn firehose_geyser(
1884    rt: Arc<tokio::runtime::Runtime>,
1885    slot_range: Range<u64>,
1886    geyser_config_files: Option<&[PathBuf]>,
1887    index_base_url: &Url,
1888    client: &Client,
1889    on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1890    threads: u64,
1891) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1892    if threads == 0 {
1893        return Err((
1894            FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1895            slot_range.start,
1896        ));
1897    }
1898    log::info!(target: LOG_MODULE, "starting firehose...");
1899    log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1900    let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1901    let mut entry_notifier_maybe = None;
1902    let mut block_meta_notifier_maybe = None;
1903    let mut transaction_notifier_maybe = None;
1904    if let Some(geyser_config_files) = geyser_config_files {
1905        log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1906
1907        let service =
1908            solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1909                confirmed_bank_receiver.clone(),
1910                true,
1911                geyser_config_files,
1912            )
1913            .map_err(|e| (e.into(), slot_range.start))?;
1914
1915        transaction_notifier_maybe = Some(
1916            service
1917                .get_transaction_notifier()
1918                .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1919                .map_err(|e| (e, slot_range.start))?,
1920        );
1921
1922        entry_notifier_maybe = service.get_entry_notifier();
1923        block_meta_notifier_maybe = service.get_block_metadata_notifier();
1924
1925        log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1926    }
1927
1928    if entry_notifier_maybe.is_some() {
1929        log::debug!(target: LOG_MODULE, "entry notifications enabled")
1930    } else {
1931        log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1932    }
1933    log::info!(target: LOG_MODULE, "running on_load...");
1934    rt.spawn(on_load);
1935
1936    let slot_range = Arc::new(slot_range);
1937    let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1938    let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1939    let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1940    let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1941
1942    // divide slot_range into n subranges
1943    let subranges = generate_subranges(&slot_range, threads);
1944    if threads > 1 {
1945        log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1946    }
1947
1948    let mut handles = Vec::new();
1949    // Shared per-thread error counters
1950    let error_counts: Arc<Vec<AtomicU32>> =
1951        Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1952
1953    for (i, slot_range) in subranges.into_iter().enumerate() {
1954        let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1955        let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1956        let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1957        let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1958        let client = client.clone();
1959        let error_counts = error_counts.clone();
1960
1961        let rt_clone = rt.clone();
1962
1963        let handle = std::thread::spawn(move || {
1964            rt_clone.block_on(async {
1965                firehose_geyser_thread(
1966                    slot_range,
1967                    transaction_notifier_maybe,
1968                    entry_notifier_maybe,
1969                    block_meta_notifier_maybe,
1970                    confirmed_bank_sender,
1971                    &client,
1972                    if threads > 1 { Some(i) } else { None },
1973                    error_counts,
1974                )
1975                .await
1976                .unwrap();
1977            });
1978        });
1979        handles.push(handle);
1980    }
1981
1982    // Wait for all threads to complete
1983    for handle in handles {
1984        handle.join().unwrap();
1985    }
1986    log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1987    if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1988        block_meta_notifier.notify_block_metadata(
1989            u64::MAX,
1990            "unload",
1991            u64::MAX,
1992            "unload",
1993            &KeyedRewardsAndNumPartitions {
1994                keyed_rewards: vec![],
1995                num_partitions: None,
1996            },
1997            None,
1998            None,
1999            0,
2000            0,
2001        );
2002    }
2003    Ok(confirmed_bank_receiver)
2004}
2005
2006#[allow(clippy::too_many_arguments)]
2007#[allow(clippy::result_large_err)]
2008async fn firehose_geyser_thread(
2009    mut slot_range: Range<u64>,
2010    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2011    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2012    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2013    confirmed_bank_sender: Sender<SlotNotification>,
2014    client: &Client,
2015    thread_index: Option<usize>,
2016    error_counts: Arc<Vec<AtomicU32>>,
2017) -> Result<(), (FirehoseError, u64)> {
2018    let start_time = std::time::Instant::now();
2019    let log_target = if let Some(thread_index) = thread_index {
2020        format!("{}::T{:03}", LOG_MODULE, thread_index)
2021    } else {
2022        LOG_MODULE.to_string()
2023    };
2024    let initial_slot_range = slot_range.clone();
2025    let mut skip_until_index = None;
2026    let mut last_counted_slot = slot_range.start.saturating_sub(1);
2027    // let mut triggered = false;
2028    while let Err((err, slot)) = async {
2029            let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2030            log::info!(
2031                target: &log_target,
2032                "slot range: {} (epoch {}) ... {} (epoch {})",
2033                slot_range.start,
2034                slot_to_epoch(slot_range.start),
2035                slot_range.end,
2036                slot_to_epoch(slot_range.end)
2037            );
2038
2039            log::info!(target: &log_target, "🚒 starting firehose...");
2040
2041            // for each epoch
2042            let mut current_slot: Option<u64> = None;
2043            for epoch_num in epoch_range.clone() {
2044                log::info!(target: &log_target, "entering epoch {}", epoch_num);
2045                let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2046                    Ok(stream) => stream,
2047                    Err(_) => {
2048                        return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2049                    }
2050                };
2051                let mut reader = NodeReader::new(stream);
2052
2053                let header_fut = reader.read_raw_header();
2054                let header = match timeout(OP_TIMEOUT, header_fut).await {
2055                    Ok(res) => res
2056                        .map_err(FirehoseError::ReadHeader)
2057                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2058                    Err(_) => {
2059                        return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2060                    }
2061                };
2062                log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2063
2064                let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2065                let local_start = std::cmp::max(slot_range.start, epoch_start);
2066                let local_end_inclusive =
2067                    std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2068                if local_start > local_end_inclusive {
2069                    log::debug!(
2070                        target: &log_target,
2071                        "epoch {} has no overlap with thread range ({}..{}), skipping",
2072                        epoch_num,
2073                        slot_range.start,
2074                        slot_range.end
2075                    );
2076                    continue;
2077                }
2078
2079                let mut todo_previous_blockhash = Hash::default();
2080                let mut todo_latest_entry_blockhash = Hash::default();
2081                // Reset counters to align to the local epoch slice; prevents boundary slots
2082                // from being treated as already-counted after a restart.
2083                last_counted_slot = local_start.saturating_sub(1);
2084                current_slot = None;
2085
2086                if local_start > epoch_start {
2087                    // Seek to the nearest previous indexed slot so the reader captures the full
2088                    // node set (transactions, entries, rewards) for the target block.
2089                    let seek_slot = match timeout(
2090                        OP_TIMEOUT,
2091                        find_previous_indexed_slot(local_start, epoch_start, &log_target),
2092                    )
2093                    .await
2094                    {
2095                        Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2096                        Err(_) => {
2097                            return Err((
2098                                FirehoseError::OperationTimeout(
2099                                    "seek_to_previous_indexed_slot",
2100                                ),
2101                                current_slot.unwrap_or(slot_range.start),
2102                            ));
2103                        }
2104                    };
2105                    if let Some(seek_slot) = seek_slot {
2106                        let seek_fut = reader.seek_to_slot(seek_slot);
2107                        match timeout(OP_TIMEOUT, seek_fut).await {
2108                            Ok(res) => {
2109                                res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2110                            }
2111                            Err(_) => {
2112                                return Err((
2113                                    FirehoseError::OperationTimeout("seek_to_slot"),
2114                                    current_slot.unwrap_or(slot_range.start),
2115                                ));
2116                            }
2117                        }
2118                    }
2119                }
2120
2121                // for each item in each block
2122                let mut item_index = 0;
2123                let mut displayed_skip_message = false;
2124                loop {
2125                    let read_fut = reader.read_until_block();
2126                    let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2127                        Ok(result) => result
2128                            .map_err(FirehoseError::ReadUntilBlockError)
2129                            .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2130                        Err(_) => {
2131                            log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2132                            let restart_slot =
2133                                current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2134                            return Err((
2135                                FirehoseError::OperationTimeout("read_until_block"),
2136                                restart_slot,
2137                            ));
2138                        }
2139                    };
2140                    if nodes.is_empty() {
2141                        log::info!(
2142                            target: &log_target,
2143                            "reached end of epoch {}",
2144                            epoch_num
2145                        );
2146                        break;
2147                    }
2148                    // ignore epoch and subset nodes at end of car file loop { if
2149                    // nodes.0.is_empty() { break; } if let Some(node) = nodes.0.last() { if
2150                    //     node.get_node().is_epoch() { log::debug!(target: &log_target,
2151                    //         "skipping epoch node for epoch {}", epoch_num); nodes.0.pop(); }
2152                    //     else if node.get_node().is_subset() { nodes.0.pop(); } else if
2153                    //     node.get_node().is_block() { break; } } } if nodes.0.is_empty() {
2154                    //         log::info!(target: &log_target, "reached end of epoch {}",
2155                    //             epoch_num); break; }
2156                    if let Some(last_node) = nodes.0.last()
2157                        && !last_node.get_node().is_block() {
2158                            log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2159                            break;
2160                        }
2161                    let block = nodes
2162                        .get_block()
2163                        .map_err(FirehoseError::GetBlockError)
2164                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2165                    log::debug!(
2166                        target: &log_target,
2167                        "read {} items from epoch {}, now at slot {}",
2168                        item_index,
2169                        epoch_num,
2170                        block.slot
2171                    );
2172                    let slot = block.slot;
2173                    if slot > local_end_inclusive {
2174                        log::debug!(
2175                            target: &log_target,
2176                            "reached end of local slice at slot {} (epoch {}), stopping",
2177                            slot,
2178                            epoch_num
2179                        );
2180                        break;
2181                    }
2182                    if slot >= slot_range.end {
2183                        log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2184                        // Return early to terminate the firehose thread cleanly. We use >=
2185                        // because slot_range is half-open [start, end), so any slot equal to
2186                        // end is out-of-range and must not be processed.
2187                        return Ok(());
2188                    }
2189                    debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2190                    if slot < local_start {
2191                        if slot.saturating_add(1) == local_start {
2192                            log::debug!(
2193                                target: &log_target,
2194                                "priming reader with preceding slot {}, skipping",
2195                                slot
2196                            );
2197                        } else {
2198                            log::warn!(
2199                                target: &log_target,
2200                                "encountered slot {} before start of range {}, skipping",
2201                                slot,
2202                                local_start
2203                            );
2204                        }
2205                        continue;
2206                    }
2207                    current_slot = Some(slot);
2208                    let mut entry_index: usize = 0;
2209                    let mut this_block_executed_transaction_count: u64 = 0;
2210                    let mut this_block_entry_count: u64 = 0;
2211                    let mut this_block_rewards = DecodedRewards::empty();
2212
2213                    if slot <= last_counted_slot {
2214                        log::debug!(
2215                            target: &log_target,
2216                            "duplicate block {}, already counted (last_counted={})",
2217                            slot,
2218                            last_counted_slot,
2219                        );
2220                        continue;
2221                    }
2222
2223                    nodes.each(|node_with_cid| -> Result<(), SharedError> {
2224                        item_index += 1;
2225                        // if item_index == 100000 && !triggered { log::info!("simulating
2226                        //     error"); triggered = true; return
2227                        //     Err(Box::new(GeyserReplayError::NodeDecodingError(item_index,
2228                        //     Box::new(std::io::Error::new( std::io::ErrorKind::Other,
2229                        //         "simulated error", )), ))); }
2230                        if let Some(skip) = skip_until_index {
2231                            if item_index < skip {
2232                                if !displayed_skip_message {
2233                                    log::info!(
2234                                        target: &log_target,
2235                                        "skipping until index {} (at {})",
2236                                        skip,
2237                                        item_index
2238                                    );
2239                                    displayed_skip_message = true;
2240                                }
2241                                return Ok(());
2242                            } else {
2243                                log::info!(
2244                                    target: &log_target,
2245                                    "reached target index {}, resuming...",
2246                                    skip
2247                                );
2248                                skip_until_index = None;
2249                            }
2250                        }
2251                        let node = node_with_cid.get_node();
2252
2253                        use crate::node::Node::*;
2254                        match node {
2255                            Transaction(tx) => {
2256                                let versioned_tx = tx.as_parsed()?;
2257                                let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2258
2259                                let as_native_metadata = decode_transaction_status_meta_from_frame(
2260                                    block.slot,
2261                                    reassembled_metadata,
2262                                )?;
2263
2264                                let message_hash = {
2265                                    #[cfg(feature = "verify-transaction-signatures")]
2266                                    {
2267                                        versioned_tx.verify_and_hash_message()?
2268                                    }
2269                                    #[cfg(not(feature = "verify-transaction-signatures"))]
2270                                    {
2271                                        // Signature verification is optional because it is
2272                                        // extremely expensive at replay scale.
2273                                        versioned_tx.message.hash()
2274                                    }
2275                                };
2276                                let signature = versioned_tx
2277                                    .signatures
2278                                    .first()
2279                                    .ok_or_else(|| {
2280                                        Box::new(std::io::Error::new(
2281                                            std::io::ErrorKind::InvalidData,
2282                                            "transaction missing signature",
2283                                        )) as SharedError
2284                                    })?;
2285                                let is_vote = is_simple_vote_transaction(&versioned_tx);
2286
2287                                if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2288                                    transaction_notifier.notify_transaction(
2289                                        block.slot,
2290                                        tx.index.unwrap() as usize,
2291                                        signature,
2292                                        &message_hash,
2293                                        is_vote,
2294                                        &as_native_metadata,
2295                                        &versioned_tx,
2296                                    );
2297                                }
2298
2299                            }
2300                            Entry(entry) => {
2301                                let entry_hash = Hash::from(entry.hash.to_bytes());
2302                                let entry_transaction_count = entry.transactions.len();
2303                                let entry_transaction_count_u64 = entry_transaction_count as u64;
2304                                let starting_transaction_index =
2305                                    usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2306                                        Box::new(std::io::Error::other(
2307                                            "transaction index exceeds usize range",
2308                                        )) as SharedError
2309                                    })?;
2310                                todo_latest_entry_blockhash = entry_hash;
2311                                this_block_executed_transaction_count += entry_transaction_count_u64;
2312                                this_block_entry_count += 1;
2313                                if entry_notifier_maybe.is_none() {
2314                                    return Ok(());
2315                                }
2316                                let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2317                                let entry_summary = solana_entry::entry::EntrySummary {
2318                                    num_hashes: entry.num_hashes,
2319                                    hash: Hash::from(entry.hash.to_bytes()),
2320                                    num_transactions: entry_transaction_count_u64,
2321                                };
2322                                entry_notifier.notify_entry(
2323                                    block.slot,
2324                                    entry_index,
2325                                    &entry_summary,
2326                                    starting_transaction_index,
2327                                );
2328                                entry_index += 1;
2329                            }
2330                            Block(block) => {
2331                                let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2332                                confirmed_bank_sender.send(notification).unwrap();
2333
2334                                if block_meta_notifier_maybe.is_none() {
2335                                    last_counted_slot = block.slot;
2336                                    return Ok(());
2337                                }
2338                                let DecodedRewards {
2339                                    keyed_rewards,
2340                                    num_partitions,
2341                                } = std::mem::take(&mut this_block_rewards);
2342                                let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2343                                block_meta_notifier.notify_block_metadata(
2344                                    block.meta.parent_slot,
2345                                    todo_previous_blockhash.to_string().as_str(),
2346                                    block.slot,
2347                                    todo_latest_entry_blockhash.to_string().as_str(),
2348                                    &KeyedRewardsAndNumPartitions {
2349                                        keyed_rewards,
2350                                        num_partitions,
2351                                    },
2352                                    Some(block.meta.blocktime as i64),
2353                                    block.meta.block_height,
2354                                    this_block_executed_transaction_count,
2355                                    this_block_entry_count,
2356                                );
2357                                todo_previous_blockhash = todo_latest_entry_blockhash;
2358                                last_counted_slot = block.slot;
2359                                std::thread::yield_now();
2360                            }
2361                            Subset(_subset) => (),
2362                            Epoch(_epoch) => (),
2363                            Rewards(rewards) => {
2364                                let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2365                                if !reassembled.is_empty() {
2366                                    this_block_rewards = decode_rewards_from_frame(
2367                                        block.slot,
2368                                        reassembled,
2369                                    )?;
2370                                } else {
2371                                    this_block_rewards = DecodedRewards::empty();
2372                                }
2373                            }
2374                            DataFrame(_data_frame) => (),
2375                        }
2376                        Ok(())
2377                    })
2378                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2379                    if block.slot == slot_range.end - 1 {
2380                        let finish_time = std::time::Instant::now();
2381                        let elapsed = finish_time.duration_since(start_time);
2382                        log::info!(target: &log_target, "processed slot {}", block.slot);
2383                        let elapsed_pretty = human_readable_duration(elapsed);
2384                        log::info!(
2385                            target: &log_target,
2386                            "processed {} slots across {} epochs in {}.",
2387                            initial_slot_range.end - initial_slot_range.start,
2388                            slot_to_epoch(initial_slot_range.end)
2389                                + 1
2390                                - slot_to_epoch(initial_slot_range.start),
2391                            elapsed_pretty
2392                        );
2393                        log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2394                        // On completion, report threads with non-zero error counts for
2395                        // visibility.
2396                        let summary: String = error_counts
2397                            .iter()
2398                            .enumerate()
2399                            .filter_map(|(i, c)| {
2400                                let v = c.load(Ordering::Relaxed);
2401                                if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2402                            })
2403                            .collect::<Vec<_>>()
2404                            .join(", ");
2405                        if !summary.is_empty() {
2406                            log::debug!(target: &log_target, "threads with errors: {}", summary);
2407                        }
2408                        return Ok(());
2409                    }
2410                }
2411            }
2412            Ok(())
2413}
2414.await
2415{
2416        if is_shutdown_error(&err) {
2417            log::info!(
2418                target: &log_target,
2419                "shutdown requested; terminating firehose thread {:?}",
2420                thread_index
2421            );
2422            return Ok(());
2423        }
2424        log::error!(
2425            target: &log_target,
2426            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2427            slot,
2428            slot_to_epoch(slot)
2429            );
2430            log::error!(target: &log_target, "{}", err);
2431            let error_message = err.to_string();
2432            if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2433                || error_message.contains("Unknown CID version")
2434            {
2435                // Clear cached index data for this epoch to avoid retrying with a bad/partial index
2436                // (or a bad seek offset that landed mid-stream).
2437                SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2438            }
2439            let item_index = match err {
2440                FirehoseError::NodeDecodingError(item_index, _) => item_index,
2441                _ => 0,
2442            };
2443            // Increment this thread's error counter
2444            let idx = thread_index.unwrap_or(0);
2445            error_counts[idx].fetch_add(1, Ordering::Relaxed);
2446            log::warn!(
2447                target: &log_target,
2448                "restarting from slot {} at index {}",
2449                slot,
2450                item_index,
2451            );
2452            // Update slot range to resume from the failed slot, not the original start.
2453            // If the failing slot was already fully processed, resume from the next slot.
2454            if slot <= last_counted_slot {
2455                slot_range.start = last_counted_slot.saturating_add(1);
2456            } else {
2457                slot_range.start = slot;
2458            }
2459            // `skip_until_index` is unsafe across retries because `item_index`
2460            // is reset to 0 each epoch restart. Keeping it can skip large portions
2461            // of the stream and silently drop slots.
2462            skip_until_index = None;
2463}
2464    Ok(())
2465}
2466
2467#[inline]
2468fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2469    if !(1..=2).contains(&versioned_tx.signatures.len()) {
2470        return false;
2471    }
2472
2473    if !matches!(
2474        versioned_tx.version(),
2475        solana_transaction::versioned::TransactionVersion::Legacy(_)
2476    ) {
2477        return false;
2478    }
2479
2480    let instructions = versioned_tx.message.instructions();
2481    if instructions.len() != 1 {
2482        return false;
2483    }
2484
2485    let program_index = instructions[0].program_id_index as usize;
2486    versioned_tx
2487        .message
2488        .static_account_keys()
2489        .get(program_index)
2490        .map(|program_id| program_id == &vote_program_id())
2491        .unwrap_or(false)
2492}
2493
2494#[inline(always)]
2495fn convert_proto_rewards(
2496    proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2497) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2498    let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2499    for proto_reward in proto_rewards.rewards.iter() {
2500        let reward = RewardInfo {
2501            reward_type: match proto_reward.reward_type - 1 {
2502                0 => RewardType::Fee,
2503                1 => RewardType::Rent,
2504                2 => RewardType::Staking,
2505                3 => RewardType::Voting,
2506                typ => {
2507                    return Err(Box::new(std::io::Error::other(format!(
2508                        "unsupported reward type {}",
2509                        typ
2510                    ))));
2511                }
2512            },
2513            lamports: proto_reward.lamports,
2514            post_balance: proto_reward.post_balance,
2515            commission: proto_reward.commission.parse::<u8>().ok(),
2516        };
2517        let pubkey = proto_reward
2518            .pubkey
2519            .parse::<Address>()
2520            .map_err(|err| Box::new(err) as SharedError)?;
2521        keyed_rewards.push((pubkey, reward));
2522    }
2523    Ok(keyed_rewards)
2524}
2525
2526#[inline]
2527/// Splits `slot_range` into nearly-even sub-ranges for the given thread count.
2528pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2529    let total = slot_range.end - slot_range.start;
2530    let slots_per_thread = total / threads;
2531    let remainder = total % threads;
2532
2533    let ranges: Vec<Range<u64>> = (0..threads)
2534        .map(|i| {
2535            // Distribute remainder slots to the first `remainder` threads
2536            let extra_slot = if i < remainder { 1 } else { 0 };
2537            let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2538            let end = start + slots_per_thread + extra_slot;
2539            start..end
2540        })
2541        .collect();
2542
2543    // Verify that ranges cover all slots exactly
2544    let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2545    assert_eq!(
2546        total_covered, total,
2547        "Range generation failed: {} threads should cover {} slots but only cover {}",
2548        threads, total, total_covered
2549    );
2550
2551    // Verify no gaps between ranges
2552    for i in 1..ranges.len() {
2553        assert_eq!(
2554            ranges[i - 1].end,
2555            ranges[i].start,
2556            "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2557            i - 1,
2558            ranges[i - 1].end,
2559            i,
2560            ranges[i].start
2561        );
2562    }
2563
2564    log::info!(
2565        target: LOG_MODULE,
2566        "Generated {} thread ranges covering {} slots total",
2567        threads,
2568        total_covered
2569    );
2570    ranges
2571}
2572
/// Formats `duration` as a short human-readable string.
///
/// * Zero is rendered as `"0s"`.
/// * Under one minute: whole seconds (`"5s"`) when at least one full second has elapsed and
///   the millisecond component is zero; otherwise seconds with two decimals (`"0.50s"`).
/// * One minute or more: the largest non-zero unit, followed by the next smaller unit when
///   that one is non-zero (`"1m1s"`, `"1h1m"`, `"1d1h"`). Finer units are dropped.
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < 60 {
        let is_whole = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_whole {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / 86_400;
    let hours = (whole_secs % 86_400) / 3_600;
    let minutes = (whole_secs % 3_600) / 60;
    let secs = whole_secs % 60;

    // Arms are ordered from the smallest leading unit to the largest; each later arm can
    // rely on the earlier ones having consumed the all-zero prefixes.
    match (days, hours, minutes, secs) {
        (0, 0, 0, secs) => format!("{secs}s"),
        (0, 0, minutes, 0) => format!("{minutes}m"),
        (0, 0, minutes, secs) => format!("{minutes}m{secs}s"),
        (0, hours, 0, _) => format!("{hours}h"),
        (0, hours, minutes, _) => format!("{hours}h{minutes}m"),
        (days, 0, _, _) => format!("{days}d"),
        (days, hours, _, _) => format!("{days}d{hours}h"),
    }
}
2618
/// Test-only stats callback that logs one summary line for `thread_id`.
///
/// Transactions-per-second is the processed transaction count divided by the time elapsed
/// since `stats.start_time`; it is reported as `0.0` when no time has elapsed. The returned
/// future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Avoid dividing by a zero (or non-positive) elapsed time.
        let tps = match elapsed_secs {
            secs if secs > 0.0 => stats.transactions_processed as f64 / secs,
            _ => 0.0,
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2644
2645#[cfg(test)]
2646use futures_util::FutureExt;
2647#[cfg(test)]
2648use serial_test::serial;
2649#[cfg(test)]
2650use std::sync::{Mutex, OnceLock};
2651
/// Test helper: runs the firehose over the single-slot range `slot..slot + 1` on one thread
/// and asserts that the slot was emitted as a non-skipped block whose executed transaction
/// count is non-zero and at least `min_executed`.
///
/// The number of non-vote transactions observed for the slot is tallied as well, but it is
/// only logged, never asserted.
#[cfg(test)]
async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    let block_seen = Arc::new(AtomicBool::new(false));
    let executed_total = Arc::new(AtomicU64::new(0));
    let non_vote_total = Arc::new(AtomicU64::new(0));

    firehose(
        1,
        false,
        None,
        slot..(slot + 1),
        Some({
            let block_seen = Arc::clone(&block_seen);
            let executed_total = Arc::clone(&executed_total);
            move |_thread_id: usize, block: BlockData| {
                let block_seen = Arc::clone(&block_seen);
                let executed_total = Arc::clone(&executed_total);
                async move {
                    if block.slot() != slot {
                        return Ok(());
                    }
                    assert!(
                        !block.was_skipped(),
                        "slot {slot} was marked leader skipped",
                    );
                    if let BlockData::Block {
                        executed_transaction_count,
                        ..
                    } = block
                    {
                        block_seen.store(true, Ordering::Relaxed);
                        executed_total.store(executed_transaction_count, Ordering::Relaxed);
                    }
                    Ok(())
                }
                .boxed()
            }
        }),
        Some({
            let non_vote_total = Arc::clone(&non_vote_total);
            move |_thread_id: usize, transaction: TransactionData| {
                let non_vote_total = Arc::clone(&non_vote_total);
                async move {
                    let counts = transaction.slot == slot && !transaction.is_vote;
                    if counts {
                        non_vote_total.fetch_add(1, Ordering::Relaxed);
                    }
                    Ok(())
                }
                .boxed()
            }
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        block_seen.load(Ordering::Relaxed),
        "target slot {slot} was not processed"
    );
    let observed_total = executed_total.load(Ordering::Relaxed);
    let observed_non_vote = non_vote_total.load(Ordering::Relaxed);
    assert!(
        observed_total > 0,
        "slot {slot} executed transaction count was zero"
    );
    assert!(
        observed_total >= min_executed,
        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
    );
    log::info!(
        target: LOG_MODULE,
        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
        observed_total,
        observed_non_vote
    );
}
2734
/// Test-only diagnostic: seeks the epoch stream to `slot`, reads nodes up to the block, and
/// logs how many nodes of each kind were read.
///
/// For `slot > 0` it then probes up to five immediately preceding slots with
/// `slot_to_offset`, stopping at the first one that resolves, and logs the outcome.
///
/// # Errors
///
/// Returns an error when seeking to `slot` fails or when reading the nodes fails. Failed
/// offset lookups for preceding slots are only logged.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let epoch = slot_to_epoch(slot);
    let client = crate::network::create_http_client();
    let stream = fetch_epoch_stream(epoch, &client).await;
    let mut reader = NodeReader::new(stream);
    reader
        .seek_to_slot(slot)
        .await
        .map_err(|err| Box::new(err) as SharedError)?;

    let nodes = reader.read_until_block().await?;
    let mut transactions = 0u64;
    let mut entries = 0u64;
    let mut entry_tx_total = 0u64;
    let mut dataframes = 0u64;
    let mut rewards = 0u64;
    let mut subsets = 0u64;
    let mut epochs = 0u64;
    let mut block_slot = None;
    let mut block_entries = None;
    // Kind of the very first node read, or "none" when nothing was read.
    let first_kind = nodes
        .0
        .first()
        .map(|node| node.get_node())
        .map(|node| match node {
            Node::Transaction(_) => "transaction",
            Node::Entry(_) => "entry",
            Node::Block(_) => "block",
            Node::Subset(_) => "subset",
            Node::Epoch(_) => "epoch",
            Node::Rewards(_) => "rewards",
            Node::DataFrame(_) => "dataframe",
        })
        .unwrap_or("none");

    // Tally every node by kind; block nodes also record their slot and entry count.
    for node in &nodes.0 {
        match node.get_node() {
            Node::Transaction(_) => transactions += 1,
            Node::Entry(entry) => {
                entries += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Block(block) => {
                block_slot = Some(block.slot);
                block_entries = Some(block.entries.len());
            }
            Node::Subset(_) => subsets += 1,
            Node::Epoch(_) => epochs += 1,
            Node::Rewards(_) => rewards += 1,
            Node::DataFrame(_) => dataframes += 1,
        }
    }

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        transactions,
        entries,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframes,
        rewards,
        subsets,
        epochs
    );

    if slot > 0 {
        let mut found_previous = None;
        for delta in 1..=5 {
            // Stop once we would step below slot 0; a saturating subtraction here would
            // look up slot 0 again on every remaining iteration.
            let Some(candidate) = slot.checked_sub(delta) else {
                break;
            };
            match slot_to_offset(candidate).await {
                Ok(offset) => {
                    found_previous = Some((candidate, offset));
                    break;
                }
                Err(err) => {
                    log::info!(
                        target: LOG_MODULE,
                        "slot {slot} previous lookup {candidate} failed: {err}"
                    );
                }
            }
        }
        if let Some((candidate, offset)) = found_previous {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        } else {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2850
2851#[tokio::test(flavor = "multi_thread")]
2852async fn test_firehose_epoch_800() {
2853    use dashmap::DashSet;
2854    use std::sync::atomic::{AtomicU64, Ordering};
2855    solana_logger::setup_with_default("info");
2856    const THREADS: usize = 4;
2857    const NUM_SLOTS_TO_COVER: u64 = 50;
2858    static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2859    static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2860    static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2861    static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2862    static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2863    static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2864    let stats_tracking = StatsTracking {
2865        on_stats: log_stats_handler,
2866        tracking_interval_slots: 10,
2867    };
2868
2869    for prev in PREV_BLOCK.iter() {
2870        prev.store(0, Ordering::Relaxed);
2871    }
2872    NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2873    NUM_BLOCKS.store(0, Ordering::Relaxed);
2874    MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2875    SEEN_SLOTS.get_or_init(DashSet::new).clear();
2876    SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2877
2878    firehose(
2879        THREADS.try_into().unwrap(),
2880        false,
2881        None,
2882        (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2883        Some(|thread_id: usize, block: BlockData| {
2884            async move {
2885                let _prev =
2886                    PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2887                if block.was_skipped() {
2888                    log::info!(
2889                        target: LOG_MODULE,
2890                        "leader skipped block {} on thread {}",
2891                        block.slot(),
2892                        thread_id,
2893                    );
2894                } else {
2895                    /*log::info!(
2896                        target: LOG_MODULE,
2897                        "got block {} on thread {}",
2898                        block.slot(),
2899                        thread_id,
2900                    );*/
2901                }
2902
2903                let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2904                if block.was_skipped() {
2905                    NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2906                    SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2907                } else {
2908                    if first_time {
2909                        NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2910                        if let BlockData::Block {
2911                            executed_transaction_count,
2912                            ..
2913                        } = &block
2914                        {
2915                            let executed = *executed_transaction_count;
2916                            let _ = MIN_TRANSACTIONS.fetch_update(
2917                                Ordering::Relaxed,
2918                                Ordering::Relaxed,
2919                                |current| {
2920                                    if executed < current {
2921                                        Some(executed)
2922                                    } else {
2923                                        None
2924                                    }
2925                                },
2926                            );
2927                        }
2928                    }
2929                }
2930                Ok(())
2931            }
2932            .boxed()
2933        }),
2934        None::<OnTxFn>,
2935        None::<OnEntryFn>,
2936        None::<OnRewardFn>,
2937        None::<OnErrorFn>,
2938        Some(stats_tracking),
2939        None,
2940    )
2941    .await
2942    .unwrap();
2943    let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2944    assert_eq!(
2945        seen, NUM_SLOTS_TO_COVER,
2946        "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2947    );
2948    let mut skipped: Vec<u64> = SEEN_SKIPPED
2949        .get_or_init(DashSet::new)
2950        .iter()
2951        .map(|v| *v)
2952        .collect();
2953    skipped.sort_unstable();
2954    // 345600000 is present but empty; still emitted as a block. Skip set should not include it.
2955    const EXPECTED_SKIPPED: [u64; 6] = [
2956        345_600_004,
2957        345_600_005,
2958        345_600_008,
2959        345_600_009,
2960        345_600_010,
2961        345_600_011,
2962    ];
2963    assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2964    assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2965}
2966
2967#[tokio::test(flavor = "multi_thread")]
2968async fn test_firehose_target_slot_transactions() {
2969    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2970    solana_logger::setup_with_default("info");
2971    const TARGET_SLOT: u64 = 376_273_722;
2972    const SLOT_RADIUS: u64 = 50;
2973    const EXPECTED_TRANSACTIONS: u64 = 1414;
2974    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2975    static FOUND: AtomicBool = AtomicBool::new(false);
2976    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2977    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2978
2979    FOUND.store(false, Ordering::Relaxed);
2980    OBSERVED_TXS.store(0, Ordering::Relaxed);
2981    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2982
2983    firehose(
2984        4,
2985        false,
2986        None,
2987        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2988        Some(|_thread_id: usize, block: BlockData| {
2989            async move {
2990                if block.slot() == TARGET_SLOT {
2991                    assert!(
2992                        !block.was_skipped(),
2993                        "target slot {TARGET_SLOT} was marked leader skipped",
2994                    );
2995                    if let BlockData::Block {
2996                        executed_transaction_count,
2997                        ..
2998                    } = block
2999                    {
3000                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
3001                        FOUND.store(true, Ordering::Relaxed);
3002                        assert_eq!(
3003                            executed_transaction_count, EXPECTED_TRANSACTIONS,
3004                            "unexpected transaction count for slot {TARGET_SLOT}"
3005                        );
3006                        assert_eq!(
3007                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
3008                            EXPECTED_NON_VOTE_TRANSACTIONS,
3009                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
3010                        );
3011                    }
3012                }
3013                Ok(())
3014            }
3015            .boxed()
3016        }),
3017        Some(|_thread_id: usize, transaction: TransactionData| {
3018            async move {
3019                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
3020                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
3021                }
3022                Ok(())
3023            }
3024            .boxed()
3025        }),
3026        None::<OnEntryFn>,
3027        None::<OnRewardFn>,
3028        None::<OnErrorFn>,
3029        None::<OnStatsTrackingFn>,
3030        None,
3031    )
3032    .await
3033    .unwrap();
3034
3035    assert!(
3036        FOUND.load(Ordering::Relaxed),
3037        "target slot was not processed"
3038    );
3039    assert_eq!(
3040        OBSERVED_TXS.load(Ordering::Relaxed),
3041        EXPECTED_TRANSACTIONS,
3042        "recorded transaction count mismatch"
3043    );
3044}
3045
/// Runs the firehose with the sequential flag set (`true`), four threads and a 4GiB buffer
/// window over the 200 slots straddling the start of epoch 900. It asserts that transaction
/// slots never decrease from one callback to the next and that at least one transaction is
/// observed.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    solana_logger::setup_with_default("info");
    const SLOT_COUNT: u64 = 100;
    const THREADS: u64 = 4;
    const TEST_BUFFER_WINDOW: &str = "4GiB";

    let (epoch_900_start, _) = epoch_to_slot_range(900);
    let window_start = epoch_900_start - SLOT_COUNT;
    let window_end = epoch_900_start + SLOT_COUNT;

    // Highest transaction slot seen so far, seeded with the start of the window.
    let highest_tx_slot = Arc::new(Mutex::new(window_start));
    let tx_counter = Arc::new(AtomicU64::new(0));
    let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
        .expect("valid test buffer window");
    let stats_tracking = StatsTracking {
        on_stats: log_stats_handler,
        tracking_interval_slots: 100,
    };

    firehose(
        THREADS,
        true,
        Some(test_buffer_window_bytes),
        window_start..window_end,
        None::<OnBlockFn>,
        Some({
            let highest_tx_slot = Arc::clone(&highest_tx_slot);
            let tx_counter = Arc::clone(&tx_counter);
            move |_thread_id: usize, transaction: TransactionData| {
                let highest_tx_slot = Arc::clone(&highest_tx_slot);
                let tx_counter = Arc::clone(&tx_counter);
                async move {
                    {
                        let mut previous = highest_tx_slot.lock().unwrap();
                        // Old Faithful does not include leader-skipped slots, so gaps are
                        // expected. We only enforce monotonic (non-decreasing) tx slot
                        // ordering.
                        assert!(
                            transaction.slot >= *previous,
                            "transaction slot regressed: prev={}, current={}",
                            *previous,
                            transaction.slot
                        );
                        *previous = transaction.slot;
                    }
                    tx_counter.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
                .boxed()
            }
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        Some(stats_tracking),
        None,
    )
    .await
    .unwrap();

    let total_txs = tx_counter.load(Ordering::Relaxed);
    assert!(
        total_txs > 0,
        "expected to observe at least one transaction in slots [{}, {})",
        window_start,
        window_end
    );
}
3117
/// Checks that slot 311,173,980 is emitted as a block with at least `1_197 + 211` executed
/// transactions.
// NOTE(review): the two addends presumably come from Solscan (per the test name); confirm
// which count each one represents.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
    const SLOT: u64 = 311_173_980;
    const MIN_EXECUTED: u64 = 1_197 + 211;
    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3125
/// Checks that slot 311,225,232 is emitted as a block with at least `888 + 157` executed
/// transactions.
// NOTE(review): the two addends presumably come from Solscan (per the test name); confirm
// which count each one represents.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
    const SLOT: u64 = 311_225_232;
    const MIN_EXECUTED: u64 = 888 + 157;
    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3133
/// Checks that slot 311,175,860 is emitted as a block with at least `527 + 110` executed
/// transactions.
// NOTE(review): the two addends presumably come from Solscan (per the test name); confirm
// which count each one represents.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
    const SLOT: u64 = 311_175_860;
    const MIN_EXECUTED: u64 = 527 + 110;
    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3141
/// Checks that slot 311,134,608 is emitted as a block with at least `1_086 + 169` executed
/// transactions.
// NOTE(review): the two addends presumably come from Solscan (per the test name); confirm
// which count each one represents.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
    const SLOT: u64 = 311_134_608;
    const MIN_EXECUTED: u64 = 1_086 + 169;
    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3149
/// Ignored-by-default debugging aid: logs a node summary for each slot in a fixed list,
/// panicking on the first slot whose summary fails.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    solana_logger::setup_with_default("info");
    const SLOTS: &[u64] = &[
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];
    for &slot in SLOTS {
        let summary = log_slot_node_summary(slot).await;
        summary.expect("slot summary");
    }
}
3167
3168#[tokio::test(flavor = "multi_thread")]
3169async fn test_firehose_epoch_850_has_logs() {
3170    use std::sync::atomic::{AtomicU64, Ordering};
3171    solana_logger::setup_with_default("info");
3172    const START_SLOT: u64 = 367_200_075; // within epoch 850
3173    const SLOT_COUNT: u64 = 50;
3174    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3175
3176    TOTAL_TXS.store(0, Ordering::Relaxed);
3177
3178    firehose(
3179        4,
3180        false,
3181        None,
3182        START_SLOT..(START_SLOT + SLOT_COUNT),
3183        None::<OnBlockFn>,
3184        Some(|_thread_id: usize, transaction: TransactionData| {
3185            async move {
3186                TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3187                if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3188                    let has_logs = logs.iter().any(|msg| !msg.is_empty());
3189                    assert_eq!(has_logs, true);
3190                }
3191                Ok(())
3192            }
3193            .boxed()
3194        }),
3195        None::<OnEntryFn>,
3196        None::<OnRewardFn>,
3197        None::<OnErrorFn>,
3198        None::<OnStatsTrackingFn>,
3199        None,
3200    )
3201    .await
3202    .unwrap();
3203
3204    assert!(
3205        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3206        "no transactions observed in epoch 850 range"
3207    );
3208}
3209
3210#[tokio::test(flavor = "multi_thread")]
3211async fn test_firehose_epoch_850_votes_present() {
3212    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3213    solana_logger::setup_with_default("info");
3214    const TARGET_SLOT: u64 = 367_200_100; // epoch 850
3215    const SLOT_RADIUS: u64 = 10;
3216    static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3217    static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3218    static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3219
3220    SEEN_BLOCK.store(false, Ordering::Relaxed);
3221    VOTE_TXS.store(0, Ordering::Relaxed);
3222    TOTAL_TXS.store(0, Ordering::Relaxed);
3223
3224    firehose(
3225        2,
3226        false,
3227        None,
3228        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3229        Some(|_thread_id: usize, block: BlockData| {
3230            async move {
3231                if block.slot() == TARGET_SLOT {
3232                    assert!(
3233                        !block.was_skipped(),
3234                        "target slot {TARGET_SLOT} was marked leader skipped",
3235                    );
3236                    SEEN_BLOCK.store(true, Ordering::Relaxed);
3237                }
3238                Ok(())
3239            }
3240            .boxed()
3241        }),
3242        Some(|_thread_id: usize, transaction: TransactionData| {
3243            async move {
3244                if transaction.slot == TARGET_SLOT {
3245                    TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3246                    if transaction.is_vote {
3247                        VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3248                    }
3249                }
3250                Ok(())
3251            }
3252            .boxed()
3253        }),
3254        None::<OnEntryFn>,
3255        None::<OnRewardFn>,
3256        None::<OnErrorFn>,
3257        None::<OnStatsTrackingFn>,
3258        None,
3259    )
3260    .await
3261    .unwrap();
3262
3263    assert!(
3264        SEEN_BLOCK.load(Ordering::Relaxed),
3265        "target slot was not processed"
3266    );
3267    assert!(
3268        TOTAL_TXS.load(Ordering::Relaxed) > 0,
3269        "no transactions counted in target slot"
3270    );
3271    assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3272}
3273
/// Injects a single synthetic block-handler failure part-way through an 8-slot range (one
/// thread) and checks that, once the firehose completes, every slot in the range has still
/// been delivered to the block handler at least once.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;
    solana_logger::setup_with_default("info");
    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // Per-slot count of successful block-handler invocations.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    let slots = START_SLOT..(START_SLOT + NUM_SLOTS);

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        slots.clone(),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let is_real_block = !block.was_skipped();
                // Force an error after at least one block has been seen so restart happens
                // mid-range. The swap comes last so the flag is only consumed when the
                // failure is actually injected.
                let inject_failure = is_real_block
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst);
                if inject_failure {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *coverage.entry(block.slot()).or_default() += 1;
                if is_real_block {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    for slot in slots {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
3339
/// Streams the 2001 slots centred on slot 378,864,000 across sixteen threads and checks
/// that every slot is delivered as a non-skipped block, apart from four slots that are
/// treated as a known leader-skipped gap.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;
    solana_logger::setup_with_default("info");
    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    // Slots that were delivered as real (non-skipped) blocks.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if !block.was_skipped() {
                    let mut covered = COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap();
                    covered.insert(block.slot());
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut coverage = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();

    // ignore a known 4-slot leader skipped gap
    for known_skipped in 378864396..=378864399u64 {
        coverage.insert(known_skipped);
    }

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}