1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
41 node_reader::NodeReader,
42 utils,
43};
44
/// Upper bound passed to `tokio::time::timeout` around each awaited reader/network
/// operation in this file (fetching an epoch stream, reading the header, seeking to a
/// slot, reading until the next block).
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Epochs strictly below this value have their transaction metadata tried as bincode
/// first, before the protobuf decoder is attempted (see `decode_transaction_status_meta`).
const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53 flag: &Arc<std::sync::atomic::AtomicBool>,
54 receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56 if let Some(rx) = receiver {
57 match rx.try_recv() {
58 Ok(_) | Err(TryRecvError::Lagged(_)) => {
59 flag.store(true, Ordering::SeqCst);
60 }
61 Err(TryRecvError::Closed) => {
62 flag.store(true, Ordering::SeqCst);
63 }
64 Err(TryRecvError::Empty) => {}
65 }
66 }
67 flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72 inner
73 .downcast_ref::<io::Error>()
74 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75 .unwrap_or(false)
76 }
77
78 match err {
79 FirehoseError::BlockHandlerError(inner)
80 | FirehoseError::TransactionHandlerError(inner)
81 | FirehoseError::EntryHandlerError(inner)
82 | FirehoseError::RewardHandlerError(inner)
83 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84 _ => false,
85 }
86}
87
/// Errors surfaced by the firehose pipeline.
///
/// The human-readable text for each variant is produced by the manual `Display`
/// implementation in this file.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// An error reported by `reqwest`.
    Reqwest(reqwest::Error),
    /// Reading the raw header of an epoch stream failed.
    ReadHeader(SharedError),
    /// The geyser plugin service reported an error during initialization.
    GeyserPluginService(GeyserPluginServiceError),
    /// No transaction notifier could be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading nodes up to the next block failed.
    ReadUntilBlockError(SharedError),
    /// Extracting the block from a batch of read nodes failed.
    GetBlockError(SharedError),
    /// Seeking to, reading, or decoding a data node failed; carries the item index
    /// of the node and the underlying error.
    NodeDecodingError(usize, SharedError),
    /// The slot offset index reported an error.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking the reader to a slot failed.
    SeekToSlotError(SharedError),
    /// A load-time problem, e.g. `firehose` being asked to run with zero threads.
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An awaited operation exceeded its timeout; carries the operation's name.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error.
    EntryHandlerError(SharedError),
    /// The reward handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
125
// NOTE(review): these manual impls assert that `FirehoseError` may be moved to and shared
// between threads regardless of what its payload types allow. This chunk does not show
// that every payload (`SharedError`, `GeyserPluginServiceError`, `SlotOffsetIndexError`,
// `reqwest::Error`) is itself `Send + Sync`. If they all are, the auto traits already
// apply and these impls can be dropped; if any is not, these impls are unsound.
// TODO confirm and either remove them or document the invariant in a SAFETY comment.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
128
129impl Display for FirehoseError {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
133 FirehoseError::ReadHeader(error) => {
134 write!(f, "Error reading header: {}", error)
135 }
136 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
137 f,
138 "Error initializing geyser plugin service: {}",
139 geyser_plugin_service_error
140 ),
141 FirehoseError::FailedToGetTransactionNotifier => write!(
142 f,
143 "Failed to get transaction notifier from GeyserPluginService"
144 ),
145 FirehoseError::ReadUntilBlockError(error) => {
146 write!(f, "Error reading until block: {}", error)
147 }
148 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
149 FirehoseError::NodeDecodingError(item_index, error) => {
150 write!(
151 f,
152 "Error seeking, reading data from, or decoding data for data node {}: {}",
153 item_index, error
154 )
155 }
156 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
157 f,
158 "Error getting info from slot offset index: {}",
159 slot_offset_index_error
160 ),
161 FirehoseError::SeekToSlotError(error) => {
162 write!(f, "Error seeking to slot: {}", error)
163 }
164 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
165 FirehoseError::OnStatsHandlerError(error) => {
166 write!(f, "Stats handler error: {}", error)
167 }
168 FirehoseError::OperationTimeout(op) => {
169 write!(f, "Timeout while waiting for operation: {}", op)
170 }
171 FirehoseError::TransactionHandlerError(error) => {
172 write!(f, "Transaction handler error: {}", error)
173 }
174 FirehoseError::EntryHandlerError(error) => {
175 write!(f, "Entry handler error: {}", error)
176 }
177 FirehoseError::RewardHandlerError(error) => {
178 write!(f, "Reward handler error: {}", error)
179 }
180 FirehoseError::BlockHandlerError(error) => {
181 write!(f, "Block handler error: {}", error)
182 }
183 }
184 }
185}
186
187impl From<reqwest::Error> for FirehoseError {
188 fn from(e: reqwest::Error) -> Self {
189 FirehoseError::Reqwest(e)
190 }
191}
192
193impl From<GeyserPluginServiceError> for FirehoseError {
194 fn from(e: GeyserPluginServiceError) -> Self {
195 FirehoseError::GeyserPluginService(e)
196 }
197}
198
199impl From<SlotOffsetIndexError> for FirehoseError {
200 fn from(e: SlotOffsetIndexError) -> Self {
201 FirehoseError::SlotOffsetIndexError(e)
202 }
203}
204
/// Progress counters for a single firehose worker.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker these stats belong to.
    pub thread_id: usize,
    /// Instant recorded as the start of the run.
    pub start_time: std::time::Instant,
    /// Instant the worker finished; `None` while it is still running.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range the worker is currently responsible for; `firehose` moves its start
    /// forward as the worker enters each epoch.
    pub slot_range: Range<u64>,
    /// Slot range the worker was given when it was created.
    pub initial_slot_range: Range<u64>,
    /// Slot the worker is currently processing.
    pub current_slot: u64,
    /// Number of slots this worker has processed.
    pub slots_processed: u64,
    /// Number of blocks this worker has processed.
    pub blocks_processed: u64,
    /// Number of slots this worker counted as skipped by their leader.
    pub leader_skipped_slots: u64,
    /// Number of transactions this worker has processed.
    pub transactions_processed: u64,
    /// Number of entries this worker has processed.
    pub entries_processed: u64,
    /// Number of rewards this worker has processed.
    pub rewards_processed: u64,
}
233
/// Snapshot handed to the stats handler by `maybe_emit_stats`.
///
/// Combines the reporting worker's own [`ThreadStats`] with the shared overall counters
/// and the deltas accumulated since the previous snapshot ("pulse").
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Copy of the reporting worker's stats at the time of the snapshot.
    pub thread_stats: ThreadStats,
    /// Start time, copied from the reporting worker's `ThreadStats`.
    pub start_time: std::time::Instant,
    /// Finish time, copied from the reporting worker's `ThreadStats`.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range, copied from the reporting worker's `ThreadStats`.
    pub slot_range: Range<u64>,
    /// Value of the overall slots-processed counter.
    pub slots_processed: u64,
    /// Value of the overall blocks-processed counter.
    pub blocks_processed: u64,
    /// Overall slots processed minus overall blocks processed (saturating at zero).
    pub leader_skipped_slots: u64,
    /// Value of the overall transactions-processed counter.
    pub transactions_processed: u64,
    /// Value of the overall entries-processed counter.
    pub entries_processed: u64,
    /// Rewards processed, taken from the reporting worker's `ThreadStats` (not an
    /// overall counter).
    pub rewards_processed: u64,
    /// Transactions counted since the previous pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks counted since the previous pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots counted since the previous pulse.
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the previous pulse; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
266
/// Configuration for periodic stats reporting.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler invoked with the worker index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Reporting interval, in slots. NOTE(review): the code consuming this field is
    /// outside the visible part of the file — confirm the exact semantics there.
    pub tracking_interval_slots: u64,
}
275
276#[inline(always)]
277#[allow(clippy::too_many_arguments)]
278async fn maybe_emit_stats<OnStats: Handler<Stats>>(
279 stats_tracking: Option<&StatsTracking<OnStats>>,
280 thread_index: usize,
281 thread_stats: &ThreadStats,
282 overall_slots_processed: &AtomicU64,
283 overall_blocks_processed: &AtomicU64,
284 overall_transactions_processed: &AtomicU64,
285 overall_entries_processed: &AtomicU64,
286 transactions_since_stats: &AtomicU64,
287 blocks_since_stats: &AtomicU64,
288 slots_since_stats: &AtomicU64,
289 last_pulse: &Arc<AtomicU64>,
290 base_instant: std::time::Instant,
291) -> Result<(), (FirehoseError, u64)> {
292 if let Some(stats_tracker) = stats_tracking {
293 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
294 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
295 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
296 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
297 let now_nanos = base_instant.elapsed().as_nanos() as u64;
298 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
299 let delta_nanos = now_nanos.saturating_sub(previous);
300 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
301 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
302 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
303 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
304
305 let stats = Stats {
306 thread_stats: thread_stats.clone(),
307 start_time: thread_stats.start_time,
308 finish_time: thread_stats.finish_time,
309 slot_range: thread_stats.slot_range.clone(),
310 slots_processed: total_slots,
311 blocks_processed: total_blocks,
312 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
313 transactions_processed: total_transactions,
314 entries_processed: total_entries,
315 rewards_processed: thread_stats.rewards_processed,
316 transactions_since_last_pulse: processed_transactions,
317 blocks_since_last_pulse: processed_blocks,
318 slots_since_last_pulse: processed_slots,
319 time_since_last_pulse,
320 };
321
322 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
323 last_pulse.store(previous, Ordering::Relaxed);
324 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
325 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
326 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
327 return Err((
328 FirehoseError::OnStatsHandlerError(e),
329 thread_stats.current_slot,
330 ));
331 }
332 }
333 Ok(())
334}
335
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
342
343fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
344 map.get(&thread_id)
345 .map(|set| set.remove(&slot).is_some())
346 .unwrap_or(false)
347}
348
349fn decode_transaction_status_meta_from_frame(
350 slot: u64,
351 reassembled_metadata: Vec<u8>,
352) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
353 if reassembled_metadata.is_empty() {
354 return Ok(solana_transaction_status::TransactionStatusMeta::default());
356 }
357
358 match utils::decompress_zstd(reassembled_metadata.clone()) {
359 Ok(decompressed) => {
360 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
361 Box::new(std::io::Error::other(format!(
362 "decode transaction metadata (slot {slot}): {err}"
363 ))) as SharedError
364 })
365 }
366 Err(decomp_err) => {
367 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
370 Box::new(std::io::Error::other(format!(
371 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
372 ))) as SharedError
373 })
374 }
375 }
376}
377
378fn decode_transaction_status_meta(
379 slot: u64,
380 metadata_bytes: &[u8],
381) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
382 let epoch = slot_to_epoch(slot);
383 let mut bincode_err: Option<String> = None;
384 if epoch < BINCODE_EPOCH_CUTOFF {
385 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
386 metadata_bytes,
387 ) {
388 Ok(stored) => return Ok(stored.into()),
389 Err(err) => {
390 bincode_err = Some(err.to_string());
391 }
392 }
393 }
394
395 let bin_err_for_proto = bincode_err.clone();
396 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
397 prost_011::Message::decode(metadata_bytes).map_err(|err| {
398 if let Some(ref bin_err) = bin_err_for_proto {
400 Box::new(std::io::Error::other(format!(
401 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
402 ))) as SharedError
403 } else {
404 Box::new(std::io::Error::other(format!(
405 "protobuf decode transaction metadata: {err}"
406 ))) as SharedError
407 }
408 })?;
409
410 proto.try_into().map_err(|err| {
411 if let Some(ref bin_err) = bincode_err {
412 Box::new(std::io::Error::other(format!(
413 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
414 ))) as SharedError
415 } else {
416 Box::new(std::io::Error::other(format!(
417 "convert transaction metadata proto: {err}"
418 ))) as SharedError
419 }
420 })
421}
422
/// Unit tests for the transaction-metadata decoding helpers.
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    // Multiplier the tests use to turn an epoch number into a slot.
    // NOTE(review): assumes `slot_to_epoch` divides by this value — confirm in `epochs`.
    const SLOTS_PER_EPOCH: u64 = 432000;
    // An epoch below the bincode cutoff and the first epoch at the cutoff.
    const EARLY_EPOCH: u64 = 100;
    const CUTOFF_EPOCH: u64 = 157;

    /// Native metadata with a handful of non-default fields.
    fn sample_meta() -> TransactionStatusMeta {
        let mut meta = TransactionStatusMeta::default();
        meta.fee = 42;
        meta.pre_balances = vec![1, 2];
        meta.post_balances = vec![3, 4];
        meta.log_messages = Some(vec!["hello".into()]);
        meta.pre_token_balances = Some(Vec::new());
        meta.post_token_balances = Some(Vec::new());
        meta.rewards = Some(Vec::new());
        meta.compute_units_consumed = Some(7);
        meta.cost_units = Some(9);
        meta.loaded_addresses = LoadedAddresses::default();
        meta
    }

    /// Protobuf encoding of `meta`.
    fn proto_bytes(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored).expect("bincode serialize");

        let decoded = decode_transaction_status_meta(0, &encoded).expect("decode");

        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = proto_bytes(&expected);

        let decoded = decode_transaction_status_meta(CUTOFF_EPOCH * SLOTS_PER_EPOCH, &encoded)
            .expect("decode");

        assert_eq!(decoded, expected);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = proto_bytes(&expected);

        let decoded = decode_transaction_status_meta(EARLY_EPOCH * SLOTS_PER_EPOCH, &encoded)
            .expect("decode");

        assert_eq!(decoded, expected);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");

        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let raw_bytes = bincode::serialize(&stored).expect("serialize");

        let decoded =
            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");

        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
515
/// Payload passed to the transaction handler for each transaction node.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot of the block containing the transaction.
    pub slot: u64,
    /// Index of the transaction as recorded on its node, converted to `usize`.
    pub transaction_slot_index: usize,
    /// First signature of the transaction.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Result of `is_simple_vote_transaction` for this transaction.
    pub is_vote: bool,
    /// Status metadata decoded from the transaction's reassembled metadata frames.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The parsed transaction itself.
    pub transaction: VersionedTransaction,
}
534
/// Payload passed to the entry handler for each entry node.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot of the block containing the entry.
    pub slot: u64,
    /// Zero-based position of the entry within its block.
    pub entry_index: usize,
    /// Half-open range of transaction positions covered by this entry, counted from the
    /// block's running transaction total before the entry.
    pub transaction_indexes: Range<usize>,
    /// `num_hashes` value carried by the entry node.
    pub num_hashes: u64,
    /// Hash carried by the entry node.
    pub hash: Hash,
}
549
/// Payload passed to the rewards handler.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Reward entries paired with the address they apply to.
    /// NOTE(review): the code that fills this in is outside the visible part of the
    /// file — confirm ordering/filtering there.
    pub rewards: Vec<(Address, RewardInfo)>,
}
558
/// Payload passed to the block handler: either a block that was read, or a slot for
/// which the leader possibly produced no block.
#[derive(Debug)]
pub enum BlockData {
    /// A block was read for `slot`.
    Block {
        /// Slot of the parent block.
        parent_slot: u64,
        /// Blockhash of the parent block.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Blockhash of this block.
        blockhash: Hash,
        /// Keyed rewards (and partition count) associated with the block.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time, when known. NOTE(review): presumably a Unix timestamp — confirm.
        block_time: Option<i64>,
        /// Block height, when known.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries in the block.
        entry_count: u64,
    },
    /// No block was seen for `slot`; it may have been skipped by its leader.
    PossibleLeaderSkipped {
        /// The slot in question.
        slot: u64,
    },
}
590
591impl BlockData {
592 #[inline(always)]
594 pub const fn slot(&self) -> u64 {
595 match self {
596 BlockData::Block { slot, .. } => *slot,
597 BlockData::PossibleLeaderSkipped { slot } => *slot,
598 }
599 }
600
601 #[inline(always)]
603 pub const fn was_skipped(&self) -> bool {
604 matches!(self, BlockData::PossibleLeaderSkipped { .. })
605 }
606
607 #[inline(always)]
609 pub const fn block_time(&self) -> Option<i64> {
610 match self {
611 BlockData::Block { block_time, .. } => *block_time,
612 BlockData::PossibleLeaderSkipped { .. } => None,
613 }
614 }
615}
616
/// Result produced by a handler: success, or a boxed error.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by every handler invocation.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
619
/// Callback invoked by the firehose with a worker index and a `Data` payload.
///
/// It is a trait alias: any `Fn(usize, Data) -> HandlerFuture` that is also
/// `Send + Sync + Clone + 'static` implements it through the blanket impl below.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

// Blanket impl: every function or closure meeting the bounds is a `Handler`.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
627
/// Plain function-pointer form of a handler for `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer handler for [`BlockData`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer handler for [`TransactionData`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer handler for [`EntryData`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer handler for [`RewardsData`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// [`StatsTracking`] configured with a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Function-pointer handler for [`FirehoseErrorContext`].
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Same type as [`StatsTracker`], under a second name.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
644
/// Payload passed to the error handler.
/// NOTE(review): the code that builds this value is outside the visible part of the
/// file — the field descriptions below follow the field names; confirm there.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the worker that hit the error.
    pub thread_id: usize,
    /// Slot associated with the error.
    pub slot: u64,
    /// Epoch associated with the error.
    pub epoch: u64,
    /// Textual description of the error.
    pub error_message: String,
}
657
658#[inline]
663#[allow(clippy::too_many_arguments)]
664pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
665 threads: u64,
666 slot_range: Range<u64>,
667 on_block: Option<OnBlock>,
668 on_tx: Option<OnTransaction>,
669 on_entry: Option<OnEntry>,
670 on_rewards: Option<OnRewards>,
671 on_error: Option<OnError>,
672 stats_tracking: Option<StatsTracking<OnStats>>,
673 shutdown_signal: Option<broadcast::Receiver<()>>,
674) -> Result<(), (FirehoseError, u64)>
675where
676 OnBlock: Handler<BlockData>,
677 OnTransaction: Handler<TransactionData>,
678 OnEntry: Handler<EntryData>,
679 OnRewards: Handler<RewardsData>,
680 OnStats: Handler<Stats>,
681 OnError: Handler<FirehoseErrorContext>,
682{
683 if threads == 0 {
684 return Err((
685 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
686 slot_range.start,
687 ));
688 }
689 let client = crate::network::create_http_client();
690 log::info!(target: LOG_MODULE, "starting firehose...");
691 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
692
693 let slot_range = Arc::new(slot_range);
694
695 let subranges = generate_subranges(&slot_range, threads);
697 if threads > 1 {
698 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
699 }
700
701 let firehose_start = std::time::Instant::now();
702 let shutdown_flag = Arc::new(AtomicBool::new(false));
703 if let Some(ref rx) = shutdown_signal {
704 let mut rx = rx.resubscribe();
705 let flag = shutdown_flag.clone();
706 tokio::spawn(async move {
707 if rx.recv().await.is_ok() {
708 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
709 flag.store(true, Ordering::SeqCst);
710 }
711 });
712 }
713 let mut handles = Vec::new();
714 let error_counts: Arc<Vec<AtomicU32>> =
716 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
717
718 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
719 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
720 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
721 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
722 let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
723
724 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
725 let error_counts = error_counts.clone();
726 let client = client.clone();
727 let on_block = on_block.clone();
728 let on_tx = on_tx.clone();
729 let on_entry = on_entry.clone();
730 let on_reward = on_rewards.clone();
731 let on_error = on_error.clone();
732 let overall_slots_processed = overall_slots_processed.clone();
733 let overall_blocks_processed = overall_blocks_processed.clone();
734 let overall_transactions_processed = overall_transactions_processed.clone();
735 let overall_entries_processed = overall_entries_processed.clone();
736 let stats_tracking = stats_tracking.clone();
737 let transactions_since_stats = Arc::new(AtomicU64::new(0));
738 let blocks_since_stats = Arc::new(AtomicU64::new(0));
739 let slots_since_stats = Arc::new(AtomicU64::new(0));
740 let last_pulse = Arc::new(AtomicU64::new(0));
741 let transactions_since_stats_cloned = transactions_since_stats.clone();
742 let blocks_since_stats_cloned = blocks_since_stats.clone();
743 let slots_since_stats_cloned = slots_since_stats.clone();
744 let last_pulse_cloned = last_pulse.clone();
745 let shutdown_flag = shutdown_flag.clone();
746 let pending_skipped_slots = pending_skipped_slots.clone();
747 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
748
749 let handle = tokio::spawn(async move {
750 let transactions_since_stats = transactions_since_stats_cloned;
751 let blocks_since_stats = blocks_since_stats_cloned;
752 let slots_since_stats = slots_since_stats_cloned;
753 let last_pulse = last_pulse_cloned;
754 let mut shutdown_rx = thread_shutdown_rx;
755 let start_time = firehose_start;
756 last_pulse.store(
757 firehose_start.elapsed().as_nanos() as u64,
758 Ordering::Relaxed,
759 );
760 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
761 let mut skip_until_index = None;
762 let last_emitted_slot = slot_range.start.saturating_sub(1);
763 let block_enabled = on_block.is_some();
764 let tx_enabled = on_tx.is_some();
765 let entry_enabled = on_entry.is_some();
766 let reward_enabled = on_reward.is_some();
767 let tracking_enabled = stats_tracking.is_some();
768 if block_enabled {
769 pending_skipped_slots.entry(thread_index).or_default();
770 }
771 let mut last_counted_slot = slot_range.start.saturating_sub(1);
772 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
773 let mut thread_stats = if tracking_enabled {
774 Some(ThreadStats {
775 thread_id: thread_index,
776 start_time,
777 finish_time: None,
778 slot_range: slot_range.clone(),
779 initial_slot_range: slot_range.clone(),
780 current_slot: slot_range.start,
781 slots_processed: 0,
782 blocks_processed: 0,
783 leader_skipped_slots: 0,
784 transactions_processed: 0,
785 entries_processed: 0,
786 rewards_processed: 0,
787 })
788 } else {
789 None
790 };
791
792 while let Err((err, slot)) = async {
794 let mut last_emitted_slot = last_emitted_slot_global;
795 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
796 log::info!(
797 target: &log_target,
798 "shutdown requested; terminating firehose thread {}",
799 thread_index
800 );
801 return Ok(());
802 }
803 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
804 log::info!(
805 target: &log_target,
806 "slot range: {} (epoch {}) ... {} (epoch {})",
807 slot_range.start,
808 slot_to_epoch(slot_range.start),
809 slot_range.end,
810 slot_to_epoch(slot_range.end)
811 );
812
813 log::info!(target: &log_target, "🚒 starting firehose...");
814
815 let mut current_slot: Option<u64> = None;
817 for epoch_num in epoch_range.clone() {
818 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
819 log::info!(
820 target: &log_target,
821 "shutdown requested; terminating firehose thread {}",
822 thread_index
823 );
824 return Ok(());
825 }
826 log::info!(target: &log_target, "entering epoch {}", epoch_num);
827 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
828 Ok(stream) => stream,
829 Err(_) => {
830 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
831 }
832 };
833 let mut reader = NodeReader::new(stream);
834
835 let header_fut = reader.read_raw_header();
836 let header = match timeout(OP_TIMEOUT, header_fut).await {
837 Ok(res) => res
838 .map_err(FirehoseError::ReadHeader)
839 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
840 Err(_) => {
841 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
842 }
843 };
844 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
845
846 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
847 let local_start = std::cmp::max(slot_range.start, epoch_start);
848 let local_end_inclusive =
849 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
850 if local_start > local_end_inclusive {
851 log::debug!(
852 target: &log_target,
853 "epoch {} has no overlap with thread range ({}..{}), skipping",
854 epoch_num,
855 slot_range.start,
856 slot_range.end
857 );
858 continue;
859 }
860
861 let mut previous_blockhash = Hash::default();
862 let mut latest_entry_blockhash = Hash::default();
863 last_counted_slot = local_start.saturating_sub(1);
866 current_slot = None;
867 if tracking_enabled
868 && let Some(ref mut stats) = thread_stats {
869 stats.current_slot = local_start;
870 stats.slot_range.start = local_start;
871 }
872
873 if local_start > epoch_start {
874 let seek_slot = local_start.saturating_sub(1);
878 let seek_fut = reader.seek_to_slot(seek_slot);
879 match timeout(OP_TIMEOUT, seek_fut).await {
880 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
881 Err(_) => {
882 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
883 }
884 }
885 }
886
887 let mut item_index = 0;
889 let mut displayed_skip_message = false;
890 loop {
891 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
892 log::info!(
893 target: &log_target,
894 "shutdown requested; terminating firehose thread {}",
895 thread_index
896 );
897 return Ok(());
898 }
899 let read_fut = reader.read_until_block();
900 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
901 Ok(result) => result
902 .map_err(FirehoseError::ReadUntilBlockError)
903 .map_err(|e| {
904 (
905 e,
906 current_slot
907 .map(|slot| slot.saturating_add(1))
908 .unwrap_or(slot_range.start),
909 )
910 })?,
911 Err(_) => {
912 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
913 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
914 }
915 };
916 if nodes.is_empty() {
917 log::info!(
918 target: &log_target,
919 "reached end of epoch {}",
920 epoch_num
921 );
922 break;
923 }
924 if let Some(last_node) = nodes.0.last()
925 && !last_node.get_node().is_block()
926 {
927 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
928 break;
929 }
930 let block = nodes
931 .get_block()
932 .map_err(FirehoseError::GetBlockError)
933 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
934 log::debug!(
935 target: &log_target,
936 "read {} items from epoch {}, now at slot {}",
937 item_index,
938 epoch_num,
939 block.slot
940 );
941 let slot = block.slot;
942 if slot > local_end_inclusive {
943 log::debug!(
944 target: &log_target,
945 "reached end of local slice at slot {} (epoch {}), stopping",
946 slot,
947 epoch_num
948 );
949 break;
950 }
951 if slot >= slot_range.end {
952 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
953 if block_enabled {
958 pending_skipped_slots.remove(&thread_index);
959 }
960 return Ok(());
961 }
962 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
963 if slot < slot_range.start {
964 if slot.saturating_add(1) == slot_range.start {
965 log::debug!(
966 target: &log_target,
967 "priming reader with preceding slot {}, skipping",
968 slot
969 );
970 } else {
971 log::warn!(
972 target: &log_target,
973 "encountered slot {} before start of range {}, skipping",
974 slot,
975 slot_range.start
976 );
977 }
978 continue;
979 }
980 current_slot = Some(slot);
981 let mut entry_index: usize = 0;
982 let mut this_block_executed_transaction_count: u64 = 0;
983 let mut this_block_entry_count: u64 = 0;
984 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
985
986 for node_with_cid in &nodes.0 {
987 item_index += 1;
988 if let Some(skip) = skip_until_index {
989 if item_index < skip {
990 if !displayed_skip_message {
991 log::info!(
992 target: &log_target,
993 "skipping until index {} (at {})",
994 skip,
995 item_index
996 );
997 displayed_skip_message = true;
998 }
999 continue;
1000 } else {
1001 log::info!(
1002 target: &log_target,
1003 "reached target index {}, resuming...",
1004 skip
1005 );
1006 skip_until_index = None;
1007 }
1008 }
1009 let node = node_with_cid.get_node();
1010
1011 if let Some(ref mut stats) = thread_stats {
1012 stats.current_slot = slot;
1013 }
1014
1015 let error_slot = current_slot.unwrap_or(slot_range.start);
1016
1017 use crate::node::Node::*;
1018 match node {
1019 Transaction(tx) => {
1020 if tx_enabled
1021 && let Some(on_tx_cb) = on_tx.as_ref()
1022 {
1023 let error_slot = current_slot.unwrap_or(slot_range.start);
1024 let versioned_tx = tx.as_parsed().map_err(|err| {
1025 (
1026 FirehoseError::NodeDecodingError(item_index, err),
1027 error_slot,
1028 )
1029 })?;
1030 let reassembled_metadata = nodes
1031 .reassemble_dataframes(tx.metadata.clone())
1032 .map_err(|err| {
1033 (
1034 FirehoseError::NodeDecodingError(item_index, err),
1035 error_slot,
1036 )
1037 })?;
1038
1039 let as_native_metadata = decode_transaction_status_meta_from_frame(
1040 block.slot,
1041 reassembled_metadata,
1042 )
1043 .map_err(|err| {
1044 (
1045 FirehoseError::NodeDecodingError(item_index, err),
1046 error_slot,
1047 )
1048 })?;
1049
1050 let message_hash = {
1051 #[cfg(feature = "verify-transaction-signatures")]
1052 {
1053 versioned_tx.verify_and_hash_message().map_err(|err| {
1054 (
1055 FirehoseError::TransactionHandlerError(Box::new(err)),
1056 error_slot,
1057 )
1058 })?
1059 }
1060 #[cfg(not(feature = "verify-transaction-signatures"))]
1061 {
1062 versioned_tx.message.hash()
1063 }
1064 };
1065 let signature = versioned_tx
1066 .signatures
1067 .first()
1068 .ok_or_else(|| {
1069 Box::new(std::io::Error::new(
1070 std::io::ErrorKind::InvalidData,
1071 "transaction missing signature",
1072 )) as SharedError
1073 })
1074 .map_err(|err| {
1075 (
1076 FirehoseError::NodeDecodingError(
1077 item_index,
1078 err,
1079 ),
1080 error_slot,
1081 )
1082 })?;
1083 let is_vote = is_simple_vote_transaction(&versioned_tx);
1084
1085 on_tx_cb(
1086 thread_index,
1087 TransactionData {
1088 slot: block.slot,
1089 transaction_slot_index: tx.index.unwrap() as usize,
1090 signature: *signature,
1091 message_hash,
1092 is_vote,
1093 transaction_status_meta: as_native_metadata.clone(),
1094 transaction: versioned_tx.clone(),
1095 },
1096 )
1097 .await
1098 .map_err(|e| {
1099 (
1100 FirehoseError::TransactionHandlerError(e),
1101 error_slot,
1102 )
1103 })?;
1104 }
1105 fetch_add_if(
1106 tracking_enabled,
1107 &overall_transactions_processed,
1108 1,
1109 );
1110 if let Some(ref mut stats) = thread_stats {
1111 stats.transactions_processed += 1;
1112 }
1113 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1114 }
1115 Entry(entry) => {
1116 let entry_hash = Hash::from(entry.hash.to_bytes());
1117 let entry_transaction_count = entry.transactions.len();
1118 let entry_transaction_count_u64 = entry_transaction_count as u64;
1119 let starting_transaction_index_u64 =
1120 this_block_executed_transaction_count;
1121 latest_entry_blockhash = entry_hash;
1122 this_block_executed_transaction_count += entry_transaction_count_u64;
1123 this_block_entry_count += 1;
1124
1125 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1126 let starting_transaction_index = usize::try_from(
1127 starting_transaction_index_u64,
1128 )
1129 .map_err(|err| {
1130 (
1131 FirehoseError::EntryHandlerError(Box::new(err)),
1132 error_slot,
1133 )
1134 })?;
1135 let transaction_indexes_end =
1136 starting_transaction_index + entry_transaction_count;
1137 on_entry_cb(
1138 thread_index,
1139 EntryData {
1140 slot: block.slot,
1141 entry_index,
1142 transaction_indexes: starting_transaction_index
1143 ..transaction_indexes_end,
1144 num_hashes: entry.num_hashes,
1145 hash: entry_hash,
1146 },
1147 )
1148 .await
1149 .map_err(|e| {
1150 (
1151 FirehoseError::EntryHandlerError(e),
1152 error_slot,
1153 )
1154 })?;
1155 }
1156 entry_index += 1;
1157 fetch_add_if(
1158 tracking_enabled,
1159 &overall_entries_processed,
1160 1,
1161 );
1162 if let Some(ref mut stats) = thread_stats {
1163 stats.entries_processed += 1;
1164 }
1165 }
1166 Block(block) => {
1167 let prev_last_counted_slot = last_counted_slot;
1168 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1169 (
1170 stats.slots_processed,
1171 stats.blocks_processed,
1172 stats.leader_skipped_slots,
1173 stats.current_slot,
1174 )
1175 });
1176
1177 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1178 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1179 let skip_start = skip_start_from_previous.max(next_expected_slot);
1180
1181 let skipped_epoch = slot_to_epoch(last_counted_slot);
1182 for skipped_slot in skip_start..slot {
1183 if slot_to_epoch(skipped_slot) != skipped_epoch {
1184 break;
1185 }
1186 log::debug!(
1187 target: &log_target,
1188 "leader skipped slot {} (prev_counted {}, current slot {})",
1189 skipped_slot,
1190 prev_last_counted_slot,
1191 slot,
1192 );
1193 if block_enabled {
1194 pending_skipped_slots
1195 .entry(thread_index)
1196 .or_default()
1197 .insert(skipped_slot);
1198 }
1199 if block_enabled
1200 && let Some(on_block_cb) = on_block.as_ref()
1201 && skipped_slot > last_emitted_slot {
1202 last_emitted_slot = skipped_slot;
1203 on_block_cb(
1204 thread_index,
1205 BlockData::PossibleLeaderSkipped {
1206 slot: skipped_slot,
1207 },
1208 )
1209 .await
1210 .map_err(|e| {
1211 (
1212 FirehoseError::BlockHandlerError(e),
1213 error_slot,
1214 )
1215 })?;
1216 }
1217 if tracking_enabled {
1218 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1219 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1220 if let Some(ref mut stats) = thread_stats {
1221 stats.leader_skipped_slots += 1;
1222 stats.slots_processed += 1;
1223 stats.current_slot = skipped_slot;
1224 }
1225 }
1226 last_counted_slot = skipped_slot;
1227 }
1228
1229 let cleared_pending_skip = if block_enabled {
1230 clear_pending_skip(
1231 &pending_skipped_slots,
1232 thread_index,
1233 slot,
1234 )
1235 } else {
1236 false
1237 };
1238
1239 if slot <= last_counted_slot && !cleared_pending_skip {
1240 log::debug!(
1241 target: &log_target,
1242 "duplicate block {}, already counted (last_counted={})",
1243 slot,
1244 last_counted_slot,
1245 );
1246 this_block_rewards.clear();
1247 continue;
1248 }
1249
1250 if block_enabled {
1251 if let Some(on_block_cb) = on_block.as_ref() {
1252 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1253 if slot > last_emitted_slot {
1254 last_emitted_slot = slot;
1255 on_block_cb(
1256 thread_index,
1257 BlockData::Block {
1258 parent_slot: block.meta.parent_slot,
1259 parent_blockhash: previous_blockhash,
1260 slot: block.slot,
1261 blockhash: latest_entry_blockhash,
1262 rewards: KeyedRewardsAndNumPartitions {
1263 keyed_rewards,
1264 num_partitions: None,
1265 },
1266 block_time: Some(block.meta.blocktime as i64),
1267 block_height: block.meta.block_height,
1268 executed_transaction_count:
1269 this_block_executed_transaction_count,
1270 entry_count: this_block_entry_count,
1271 },
1272 )
1273 .await
1274 .map_err(|e| {
1275 (
1276 FirehoseError::BlockHandlerError(e),
1277 error_slot,
1278 )
1279 })?;
1280 }
1281 }
1282 } else {
1283 this_block_rewards.clear();
1284 }
1285 previous_blockhash = latest_entry_blockhash;
1286
1287 if tracking_enabled {
1288 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1289 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1290 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1291 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1292 if let Some(ref mut stats) = thread_stats {
1293 stats.blocks_processed += 1;
1294 stats.slots_processed += 1;
1295 stats.current_slot = slot;
1296 }
1297
1298 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1299 (&stats_tracking, thread_stats.as_mut())
1300 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1301 && let Err(err) = maybe_emit_stats(
1302 stats_tracking.as_ref(),
1303 thread_index,
1304 thread_stats_ref,
1305 &overall_slots_processed,
1306 &overall_blocks_processed,
1307 &overall_transactions_processed,
1308 &overall_entries_processed,
1309 &transactions_since_stats,
1310 &blocks_since_stats,
1311 &slots_since_stats,
1312 &last_pulse,
1313 start_time,
1314 )
1315 .await
1316 {
1317 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1318 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1319 overall_blocks_processed
1320 .fetch_sub(1, Ordering::Relaxed);
1321 overall_slots_processed
1322 .fetch_sub(1, Ordering::Relaxed);
1323 if let Some((
1324 prev_slots_processed,
1325 prev_blocks_processed,
1326 prev_leader_skipped,
1327 prev_current_slot,
1328 )) = thread_stats_snapshot
1329 {
1330 thread_stats_ref.slots_processed =
1331 prev_slots_processed;
1332 thread_stats_ref.blocks_processed =
1333 prev_blocks_processed;
1334 thread_stats_ref.leader_skipped_slots =
1335 prev_leader_skipped;
1336 thread_stats_ref.current_slot =
1337 prev_current_slot;
1338 }
1339 last_counted_slot = prev_last_counted_slot;
1340 return Err(err);
1341 }
1342 }
1343
1344 if slot > last_counted_slot {
1345 last_counted_slot = slot;
1346 }
1347 }
1348 Subset(_subset) => (),
1349 Epoch(_epoch) => (),
1350 Rewards(rewards) => {
1351 if reward_enabled || block_enabled {
1352 let reassembled = nodes
1353 .reassemble_dataframes(rewards.data.clone())
1354 .map_err(|err| {
1355 (
1356 FirehoseError::NodeDecodingError(item_index, err),
1357 current_slot.unwrap_or(slot_range.start),
1358 )
1359 })?;
1360 if reassembled.is_empty() {
1361 this_block_rewards.clear();
1362 if reward_enabled
1363 && let Some(on_reward_cb) = on_reward.as_ref()
1364 {
1365 on_reward_cb(
1366 thread_index,
1367 RewardsData {
1368 slot: block.slot,
1369 rewards: Vec::new(),
1370 },
1371 )
1372 .await
1373 .map_err(|e| {
1374 (
1375 FirehoseError::RewardHandlerError(e),
1376 error_slot,
1377 )
1378 })?;
1379 }
1380 continue;
1381 }
1382
1383 let decompressed = utils::decompress_zstd(reassembled)
1384 .map_err(|err| {
1385 (
1386 FirehoseError::NodeDecodingError(
1387 item_index,
1388 err,
1389 ),
1390 error_slot,
1391 )
1392 })?;
1393
1394 let decoded =
1395 prost_011::Message::decode(decompressed.as_slice())
1396 .map_err(|err| {
1397 (
1398 FirehoseError::NodeDecodingError(
1399 item_index,
1400 Box::new(err),
1401 ),
1402 error_slot,
1403 )
1404 })?;
1405 let keyed_rewards = convert_proto_rewards(&decoded)
1406 .map_err(|err| {
1407 (
1408 FirehoseError::NodeDecodingError(item_index, err),
1409 error_slot,
1410 )
1411 })?;
1412 if reward_enabled
1413 && let Some(on_reward_cb) = on_reward.as_ref()
1414 {
1415 on_reward_cb(
1416 thread_index,
1417 RewardsData {
1418 slot: block.slot,
1419 rewards: keyed_rewards.clone(),
1420 },
1421 )
1422 .await
1423 .map_err(|e| {
1424 (
1425 FirehoseError::RewardHandlerError(e),
1426 error_slot,
1427 )
1428 })?;
1429 }
1430 this_block_rewards = keyed_rewards;
1431 if let Some(ref mut stats) = thread_stats {
1432 stats.rewards_processed +=
1433 this_block_rewards.len() as u64;
1434 }
1435 }
1436 }
1437 DataFrame(_data_frame) => (),
1438 }
1439 }
1440 if block.slot == slot_range.end - 1 {
1441 let finish_time = std::time::Instant::now();
1442 let elapsed = finish_time.duration_since(start_time);
1443 log::info!(target: &log_target, "processed slot {}", block.slot);
1444 let elapsed_pretty = human_readable_duration(elapsed);
1445 log::info!(
1446 target: &log_target,
1447 "processed {} slots across {} epochs in {}.",
1448 slot_range.end - slot_range.start,
1449 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1450 elapsed_pretty
1451 );
1452 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1453 let summary: String = error_counts
1456 .iter()
1457 .enumerate()
1458 .filter_map(|(i, c)| {
1459 let v = c.load(Ordering::Relaxed);
1460 if v > 0 {
1461 Some(format!("{:03}({})", i, v))
1462 } else {
1463 None
1464 }
1465 })
1466 .collect::<Vec<_>>()
1467 .join(", ");
1468 if !summary.is_empty() {
1469 log::debug!(target: &log_target, "threads with errors: {}", summary);
1470 }
1471 return Ok(());
1472 }
1473 }
1474 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1475 && last_counted_slot < expected_last_slot
1476 {
1477 }
1480 if let Some(ref mut stats) = thread_stats {
1481 stats.finish_time = Some(std::time::Instant::now());
1482 maybe_emit_stats(
1483 stats_tracking.as_ref(),
1484 thread_index,
1485 stats,
1486 &overall_slots_processed,
1487 &overall_blocks_processed,
1488 &overall_transactions_processed,
1489 &overall_entries_processed,
1490 &transactions_since_stats,
1491 &blocks_since_stats,
1492 &slots_since_stats,
1493 &last_pulse,
1494 start_time,
1495 )
1496 .await?;
1497 }
1498 if block_enabled {
1499 pending_skipped_slots.remove(&thread_index);
1500 }
1501 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1502 }
1503 Ok(())
1504 }
1505 .await
1506 {
1507 if is_shutdown_error(&err) {
1508 log::info!(
1509 target: &log_target,
1510 "shutdown requested; terminating firehose thread {}",
1511 thread_index
1512 );
1513 break;
1514 }
1515 let epoch = slot_to_epoch(slot);
1516 let item_index = match &err {
1517 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1518 _ => 0,
1519 };
1520 let error_message = err.to_string();
1521 log::error!(
1522 target: &log_target,
1523 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1524 slot,
1525 epoch
1526 );
1527 log::error!(target: &log_target, "{}", error_message);
1528 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1529 || error_message.contains("Unknown CID version")
1530 {
1531 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1534 }
1535 if let Some(on_error_cb) = on_error.clone() {
1536 let context = FirehoseErrorContext {
1537 thread_id: thread_index,
1538 slot,
1539 epoch,
1540 error_message: error_message.clone(),
1541 };
1542 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1543 log::error!(
1544 target: &log_target,
1545 "on_error handler failed: {}",
1546 handler_err
1547 );
1548 }
1549 }
1550 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1552 log::warn!(
1553 target: &log_target,
1554 "restarting from slot {} at index {}",
1555 slot,
1556 item_index,
1557 );
1558 if slot <= last_counted_slot {
1562 slot_range.start = last_counted_slot.saturating_add(1);
1563 } else {
1564 slot_range.start = slot;
1565 }
1566 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1568 if tracking_enabled
1569 && let Some(ref mut stats_ref) = thread_stats {
1570 stats_ref.slot_range.start = slot_range.start;
1571 stats_ref.slot_range.end = slot_range.end;
1572 }
1574 if block_enabled {
1575 pending_skipped_slots.remove(&thread_index);
1576 }
1577 skip_until_index = None;
1581 last_emitted_slot_global = last_emitted_slot;
1582 }
1583 });
1584 handles.push(handle);
1585 }
1586
1587 for handle in handles {
1589 handle.await.unwrap();
1590 }
1591 if stats_tracking.is_some() {
1592 let elapsed = firehose_start.elapsed();
1593 let elapsed_secs = elapsed.as_secs_f64();
1594 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1595 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1596 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1597 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1598 let total_errors: u64 = error_counts
1599 .iter()
1600 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1601 .sum();
1602 let overall_tps = if elapsed_secs > 0.0 {
1603 total_transactions as f64 / elapsed_secs
1604 } else {
1605 0.0
1606 };
1607 log::info!(
1608 target: LOG_MODULE,
1609 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1610 elapsed_secs,
1611 total_slots,
1612 total_blocks,
1613 total_leader_skipped,
1614 total_transactions,
1615 overall_tps,
1616 total_errors
1617 );
1618 }
1619 if shutdown_flag.load(Ordering::SeqCst) {
1620 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1621 } else {
1622 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1623 }
1624 Ok(())
1625}
1626
1627#[allow(clippy::result_large_err)]
1628pub fn firehose_geyser(
1635 rt: Arc<tokio::runtime::Runtime>,
1636 slot_range: Range<u64>,
1637 geyser_config_files: Option<&[PathBuf]>,
1638 index_base_url: &Url,
1639 client: &Client,
1640 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1641 threads: u64,
1642) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1643 if threads == 0 {
1644 return Err((
1645 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1646 slot_range.start,
1647 ));
1648 }
1649 log::info!(target: LOG_MODULE, "starting firehose...");
1650 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1651 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1652 let mut entry_notifier_maybe = None;
1653 let mut block_meta_notifier_maybe = None;
1654 let mut transaction_notifier_maybe = None;
1655 if let Some(geyser_config_files) = geyser_config_files {
1656 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1657
1658 let service =
1659 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1660 confirmed_bank_receiver.clone(),
1661 true,
1662 geyser_config_files,
1663 )
1664 .map_err(|e| (e.into(), slot_range.start))?;
1665
1666 transaction_notifier_maybe = Some(
1667 service
1668 .get_transaction_notifier()
1669 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1670 .map_err(|e| (e, slot_range.start))?,
1671 );
1672
1673 entry_notifier_maybe = service.get_entry_notifier();
1674 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1675
1676 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1677 }
1678
1679 if entry_notifier_maybe.is_some() {
1680 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1681 } else {
1682 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1683 }
1684 log::info!(target: LOG_MODULE, "running on_load...");
1685 rt.spawn(on_load);
1686
1687 let slot_range = Arc::new(slot_range);
1688 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1689 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1690 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1691 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1692
1693 let subranges = generate_subranges(&slot_range, threads);
1695 if threads > 1 {
1696 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1697 }
1698
1699 let mut handles = Vec::new();
1700 let error_counts: Arc<Vec<AtomicU32>> =
1702 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1703
1704 for (i, slot_range) in subranges.into_iter().enumerate() {
1705 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1706 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1707 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1708 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1709 let client = client.clone();
1710 let error_counts = error_counts.clone();
1711
1712 let rt_clone = rt.clone();
1713
1714 let handle = std::thread::spawn(move || {
1715 rt_clone.block_on(async {
1716 firehose_geyser_thread(
1717 slot_range,
1718 transaction_notifier_maybe,
1719 entry_notifier_maybe,
1720 block_meta_notifier_maybe,
1721 confirmed_bank_sender,
1722 &client,
1723 if threads > 1 { Some(i) } else { None },
1724 error_counts,
1725 )
1726 .await
1727 .unwrap();
1728 });
1729 });
1730 handles.push(handle);
1731 }
1732
1733 for handle in handles {
1735 handle.join().unwrap();
1736 }
1737 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1738 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1739 block_meta_notifier.notify_block_metadata(
1740 u64::MAX,
1741 "unload",
1742 u64::MAX,
1743 "unload",
1744 &KeyedRewardsAndNumPartitions {
1745 keyed_rewards: vec![],
1746 num_partitions: None,
1747 },
1748 None,
1749 None,
1750 0,
1751 0,
1752 );
1753 }
1754 Ok(confirmed_bank_receiver)
1755}
1756
1757#[allow(clippy::too_many_arguments)]
1758#[allow(clippy::result_large_err)]
1759async fn firehose_geyser_thread(
1760 mut slot_range: Range<u64>,
1761 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1762 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1763 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1764 confirmed_bank_sender: Sender<SlotNotification>,
1765 client: &Client,
1766 thread_index: Option<usize>,
1767 error_counts: Arc<Vec<AtomicU32>>,
1768) -> Result<(), (FirehoseError, u64)> {
1769 let start_time = std::time::Instant::now();
1770 let log_target = if let Some(thread_index) = thread_index {
1771 format!("{}::T{:03}", LOG_MODULE, thread_index)
1772 } else {
1773 LOG_MODULE.to_string()
1774 };
1775 let initial_slot_range = slot_range.clone();
1776 let mut skip_until_index = None;
1777 let mut last_counted_slot = slot_range.start.saturating_sub(1);
1778 while let Err((err, slot)) = async {
1780 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1781 log::info!(
1782 target: &log_target,
1783 "slot range: {} (epoch {}) ... {} (epoch {})",
1784 slot_range.start,
1785 slot_to_epoch(slot_range.start),
1786 slot_range.end,
1787 slot_to_epoch(slot_range.end)
1788 );
1789
1790 log::info!(target: &log_target, "🚒 starting firehose...");
1791
1792 let mut current_slot: Option<u64> = None;
1794 for epoch_num in epoch_range.clone() {
1795 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1796 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1797 Ok(stream) => stream,
1798 Err(_) => {
1799 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1800 }
1801 };
1802 let mut reader = NodeReader::new(stream);
1803
1804 let header_fut = reader.read_raw_header();
1805 let header = match timeout(OP_TIMEOUT, header_fut).await {
1806 Ok(res) => res
1807 .map_err(FirehoseError::ReadHeader)
1808 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1809 Err(_) => {
1810 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1811 }
1812 };
1813 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1814
1815 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1816 let local_start = std::cmp::max(slot_range.start, epoch_start);
1817 let local_end_inclusive =
1818 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1819 if local_start > local_end_inclusive {
1820 log::debug!(
1821 target: &log_target,
1822 "epoch {} has no overlap with thread range ({}..{}), skipping",
1823 epoch_num,
1824 slot_range.start,
1825 slot_range.end
1826 );
1827 continue;
1828 }
1829
1830 let mut todo_previous_blockhash = Hash::default();
1831 let mut todo_latest_entry_blockhash = Hash::default();
1832 last_counted_slot = local_start.saturating_sub(1);
1835 current_slot = None;
1836
1837 if local_start > epoch_start {
1838 let seek_slot = local_start.saturating_sub(1);
1842 let seek_fut = reader.seek_to_slot(seek_slot);
1843 match timeout(OP_TIMEOUT, seek_fut).await {
1844 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1845 Err(_) => {
1846 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1847 }
1848 }
1849 }
1850
1851 let mut item_index = 0;
1853 let mut displayed_skip_message = false;
1854 loop {
1855 let read_fut = reader.read_until_block();
1856 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1857 Ok(result) => result
1858 .map_err(FirehoseError::ReadUntilBlockError)
1859 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1860 Err(_) => {
1861 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1862 let restart_slot =
1863 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
1864 return Err((
1865 FirehoseError::OperationTimeout("read_until_block"),
1866 restart_slot,
1867 ));
1868 }
1869 };
1870 if nodes.is_empty() {
1871 log::info!(
1872 target: &log_target,
1873 "reached end of epoch {}",
1874 epoch_num
1875 );
1876 break;
1877 }
1878 if let Some(last_node) = nodes.0.last()
1887 && !last_node.get_node().is_block() {
1888 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1889 break;
1890 }
1891 let block = nodes
1892 .get_block()
1893 .map_err(FirehoseError::GetBlockError)
1894 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1895 log::debug!(
1896 target: &log_target,
1897 "read {} items from epoch {}, now at slot {}",
1898 item_index,
1899 epoch_num,
1900 block.slot
1901 );
1902 let slot = block.slot;
1903 if slot > local_end_inclusive {
1904 log::debug!(
1905 target: &log_target,
1906 "reached end of local slice at slot {} (epoch {}), stopping",
1907 slot,
1908 epoch_num
1909 );
1910 break;
1911 }
1912 if slot >= slot_range.end {
1913 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1914 return Ok(());
1918 }
1919 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1920 if slot < local_start {
1921 if slot.saturating_add(1) == local_start {
1922 log::debug!(
1923 target: &log_target,
1924 "priming reader with preceding slot {}, skipping",
1925 slot
1926 );
1927 } else {
1928 log::warn!(
1929 target: &log_target,
1930 "encountered slot {} before start of range {}, skipping",
1931 slot,
1932 local_start
1933 );
1934 }
1935 continue;
1936 }
1937 current_slot = Some(slot);
1938 let mut entry_index: usize = 0;
1939 let mut this_block_executed_transaction_count: u64 = 0;
1940 let mut this_block_entry_count: u64 = 0;
1941 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1942
1943 if slot <= last_counted_slot {
1944 log::debug!(
1945 target: &log_target,
1946 "duplicate block {}, already counted (last_counted={})",
1947 slot,
1948 last_counted_slot,
1949 );
1950 this_block_rewards.clear();
1951 continue;
1952 }
1953
1954 nodes.each(|node_with_cid| -> Result<(), SharedError> {
1955 item_index += 1;
1956 if let Some(skip) = skip_until_index {
1962 if item_index < skip {
1963 if !displayed_skip_message {
1964 log::info!(
1965 target: &log_target,
1966 "skipping until index {} (at {})",
1967 skip,
1968 item_index
1969 );
1970 displayed_skip_message = true;
1971 }
1972 return Ok(());
1973 } else {
1974 log::info!(
1975 target: &log_target,
1976 "reached target index {}, resuming...",
1977 skip
1978 );
1979 skip_until_index = None;
1980 }
1981 }
1982 let node = node_with_cid.get_node();
1983
1984 use crate::node::Node::*;
1985 match node {
1986 Transaction(tx) => {
1987 let versioned_tx = tx.as_parsed()?;
1988 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1989
1990 let as_native_metadata = decode_transaction_status_meta_from_frame(
1991 block.slot,
1992 reassembled_metadata,
1993 )?;
1994
1995 let message_hash = {
1996 #[cfg(feature = "verify-transaction-signatures")]
1997 {
1998 versioned_tx.verify_and_hash_message()?
1999 }
2000 #[cfg(not(feature = "verify-transaction-signatures"))]
2001 {
2002 versioned_tx.message.hash()
2005 }
2006 };
2007 let signature = versioned_tx
2008 .signatures
2009 .first()
2010 .ok_or_else(|| {
2011 Box::new(std::io::Error::new(
2012 std::io::ErrorKind::InvalidData,
2013 "transaction missing signature",
2014 )) as SharedError
2015 })?;
2016 let is_vote = is_simple_vote_transaction(&versioned_tx);
2017
2018 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2019 transaction_notifier.notify_transaction(
2020 block.slot,
2021 tx.index.unwrap() as usize,
2022 signature,
2023 &message_hash,
2024 is_vote,
2025 &as_native_metadata,
2026 &versioned_tx,
2027 );
2028 }
2029
2030 }
2031 Entry(entry) => {
2032 let entry_hash = Hash::from(entry.hash.to_bytes());
2033 let entry_transaction_count = entry.transactions.len();
2034 let entry_transaction_count_u64 = entry_transaction_count as u64;
2035 let starting_transaction_index =
2036 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2037 Box::new(std::io::Error::other(
2038 "transaction index exceeds usize range",
2039 )) as SharedError
2040 })?;
2041 todo_latest_entry_blockhash = entry_hash;
2042 this_block_executed_transaction_count += entry_transaction_count_u64;
2043 this_block_entry_count += 1;
2044 if entry_notifier_maybe.is_none() {
2045 return Ok(());
2046 }
2047 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2048 let entry_summary = solana_entry::entry::EntrySummary {
2049 num_hashes: entry.num_hashes,
2050 hash: Hash::from(entry.hash.to_bytes()),
2051 num_transactions: entry_transaction_count_u64,
2052 };
2053 entry_notifier.notify_entry(
2054 block.slot,
2055 entry_index,
2056 &entry_summary,
2057 starting_transaction_index,
2058 );
2059 entry_index += 1;
2060 }
2061 Block(block) => {
2062 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2063 confirmed_bank_sender.send(notification).unwrap();
2064
2065 if block_meta_notifier_maybe.is_none() {
2066 last_counted_slot = block.slot;
2067 return Ok(());
2068 }
2069 let keyed_rewards = std::mem::take(&mut this_block_rewards);
2070 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2071 block_meta_notifier.notify_block_metadata(
2072 block.meta.parent_slot,
2073 todo_previous_blockhash.to_string().as_str(),
2074 block.slot,
2075 todo_latest_entry_blockhash.to_string().as_str(),
2076 &KeyedRewardsAndNumPartitions {
2077 keyed_rewards,
2078 num_partitions: None,
2079 },
2080 Some(block.meta.blocktime as i64),
2081 block.meta.block_height,
2082 this_block_executed_transaction_count,
2083 this_block_entry_count,
2084 );
2085 todo_previous_blockhash = todo_latest_entry_blockhash;
2086 last_counted_slot = block.slot;
2087 std::thread::yield_now();
2088 }
2089 Subset(_subset) => (),
2090 Epoch(_epoch) => (),
2091 Rewards(rewards) => {
2092 if !rewards.is_complete() {
2093 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
2094 let decompressed = utils::decompress_zstd(reassembled)?;
2095 let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
2096 Box::new(std::io::Error::other(
2097 std::format!("Error decoding rewards: {:?}", err),
2098 ))
2099 })?;
2100 this_block_rewards = convert_proto_rewards(&decoded)?;
2101 }
2102 }
2103 DataFrame(_data_frame) => (),
2104 }
2105 Ok(())
2106 })
2107 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2108 if block.slot == slot_range.end - 1 {
2109 let finish_time = std::time::Instant::now();
2110 let elapsed = finish_time.duration_since(start_time);
2111 log::info!(target: &log_target, "processed slot {}", block.slot);
2112 let elapsed_pretty = human_readable_duration(elapsed);
2113 log::info!(
2114 target: &log_target,
2115 "processed {} slots across {} epochs in {}.",
2116 initial_slot_range.end - initial_slot_range.start,
2117 slot_to_epoch(initial_slot_range.end)
2118 + 1
2119 - slot_to_epoch(initial_slot_range.start),
2120 elapsed_pretty
2121 );
2122 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2123 let summary: String = error_counts
2126 .iter()
2127 .enumerate()
2128 .filter_map(|(i, c)| {
2129 let v = c.load(Ordering::Relaxed);
2130 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2131 })
2132 .collect::<Vec<_>>()
2133 .join(", ");
2134 if !summary.is_empty() {
2135 log::debug!(target: &log_target, "threads with errors: {}", summary);
2136 }
2137 return Ok(());
2138 }
2139 }
2140 }
2141 Ok(())
2142}
2143.await
2144{
2145 if is_shutdown_error(&err) {
2146 log::info!(
2147 target: &log_target,
2148 "shutdown requested; terminating firehose thread {:?}",
2149 thread_index
2150 );
2151 return Ok(());
2152 }
2153 log::error!(
2154 target: &log_target,
2155 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2156 slot,
2157 slot_to_epoch(slot)
2158 );
2159 log::error!(target: &log_target, "{}", err);
2160 let error_message = err.to_string();
2161 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2162 || error_message.contains("Unknown CID version")
2163 {
2164 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2167 }
2168 let item_index = match err {
2169 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2170 _ => 0,
2171 };
2172 let idx = thread_index.unwrap_or(0);
2174 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2175 log::warn!(
2176 target: &log_target,
2177 "restarting from slot {} at index {}",
2178 slot,
2179 item_index,
2180 );
2181 if slot <= last_counted_slot {
2184 slot_range.start = last_counted_slot.saturating_add(1);
2185 } else {
2186 slot_range.start = slot;
2187 }
2188 skip_until_index = None;
2192}
2193 Ok(())
2194}
2195
2196#[inline]
2197fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2198 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2199 return false;
2200 }
2201
2202 if !matches!(
2203 versioned_tx.version(),
2204 solana_transaction::versioned::TransactionVersion::Legacy(_)
2205 ) {
2206 return false;
2207 }
2208
2209 let instructions = versioned_tx.message.instructions();
2210 if instructions.len() != 1 {
2211 return false;
2212 }
2213
2214 let program_index = instructions[0].program_id_index as usize;
2215 versioned_tx
2216 .message
2217 .static_account_keys()
2218 .get(program_index)
2219 .map(|program_id| program_id == &vote_program_id())
2220 .unwrap_or(false)
2221}
2222
2223#[inline(always)]
2224fn convert_proto_rewards(
2225 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2226) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2227 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2228 for proto_reward in proto_rewards.rewards.iter() {
2229 let reward = RewardInfo {
2230 reward_type: match proto_reward.reward_type - 1 {
2231 0 => RewardType::Fee,
2232 1 => RewardType::Rent,
2233 2 => RewardType::Staking,
2234 3 => RewardType::Voting,
2235 typ => {
2236 return Err(Box::new(std::io::Error::other(format!(
2237 "unsupported reward type {}",
2238 typ
2239 ))));
2240 }
2241 },
2242 lamports: proto_reward.lamports,
2243 post_balance: proto_reward.post_balance,
2244 commission: proto_reward.commission.parse::<u8>().ok(),
2245 };
2246 let pubkey = proto_reward
2247 .pubkey
2248 .parse::<Address>()
2249 .map_err(|err| Box::new(err) as SharedError)?;
2250 keyed_rewards.push((pubkey, reward));
2251 }
2252 Ok(keyed_rewards)
2253}
2254
2255#[inline]
2256pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2258 let total = slot_range.end - slot_range.start;
2259 let slots_per_thread = total / threads;
2260 let remainder = total % threads;
2261
2262 let ranges: Vec<Range<u64>> = (0..threads)
2263 .map(|i| {
2264 let extra_slot = if i < remainder { 1 } else { 0 };
2266 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2267 let end = start + slots_per_thread + extra_slot;
2268 start..end
2269 })
2270 .collect();
2271
2272 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2274 assert_eq!(
2275 total_covered, total,
2276 "Range generation failed: {} threads should cover {} slots but only cover {}",
2277 threads, total, total_covered
2278 );
2279
2280 for i in 1..ranges.len() {
2282 assert_eq!(
2283 ranges[i - 1].end,
2284 ranges[i].start,
2285 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2286 i - 1,
2287 ranges[i - 1].end,
2288 i,
2289 ranges[i].start
2290 );
2291 }
2292
2293 log::info!(
2294 target: LOG_MODULE,
2295 "Generated {} thread ranges covering {} slots total",
2296 threads,
2297 total_covered
2298 );
2299 ranges
2300}
2301
/// Formats `duration` as a compact, human-readable string.
///
/// - A zero duration is `"0s"`.
/// - Under one minute: whole seconds with no millisecond component print as `"5s"`;
///   anything else prints with two decimals, e.g. `"0.50s"` or `"5.25s"`.
/// - One minute and above: only the largest unit and, when non-zero, the unit
///   directly below it are shown (`"1d1h"`, `"1h1m"`, `"1m1s"`). Smaller units are
///   dropped, not rounded.
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        // Only a non-zero whole number of seconds without milliseconds skips decimals.
        return if whole_secs > 0 && duration.subsec_millis() == 0 {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let secs = whole_secs % MINUTE;

    // Arms are ordered from the smallest leading unit to the largest; each arm relies
    // on the earlier arms having consumed the cases where its own unit is zero.
    match (days, hours, minutes, secs) {
        (0, 0, 0, secs) => format!("{secs}s"),
        (0, 0, minutes, 0) => format!("{minutes}m"),
        (0, 0, minutes, secs) => format!("{minutes}m{secs}s"),
        (0, hours, 0, _) => format!("{hours}h"),
        (0, hours, minutes, _) => format!("{hours}h{minutes}m"),
        (days, 0, _, _) => format!("{days}d"),
        (days, hours, _, _) => format!("{days}d{hours}h"),
    }
}
2347
/// Test-only stats callback that logs a one-line progress summary for `thread_id`.
///
/// Throughput is the number of processed transactions divided by the seconds elapsed
/// since `stats.start_time`, or `0.0` when no time has elapsed. The returned future
/// always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let seconds = stats.start_time.elapsed().as_secs_f64();
        let transactions = stats.transactions_processed;
        // Skip the division when no time has elapsed.
        let tps = if seconds > 0.0 {
            transactions as f64 / seconds
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            transactions,
            stats.entries_processed,
            stats.rewards_processed,
            seconds,
            tps
        );
        Ok(())
    })
}
2373
2374#[cfg(test)]
2375use futures_util::FutureExt;
2376#[cfg(test)]
2377use serial_test::serial;
2378#[cfg(test)]
2379use std::sync::{Mutex, OnceLock};
2380
2381#[tokio::test(flavor = "multi_thread")]
2382async fn test_firehose_epoch_800() {
2383 use dashmap::DashSet;
2384 use std::sync::atomic::{AtomicU64, Ordering};
2385 solana_logger::setup_with_default("info");
2386 const THREADS: usize = 4;
2387 const NUM_SLOTS_TO_COVER: u64 = 50;
2388 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2389 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2390 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2391 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2392 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2393 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2394 let stats_tracking = StatsTracking {
2395 on_stats: log_stats_handler,
2396 tracking_interval_slots: 10,
2397 };
2398
2399 for prev in PREV_BLOCK.iter() {
2400 prev.store(0, Ordering::Relaxed);
2401 }
2402 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2403 NUM_BLOCKS.store(0, Ordering::Relaxed);
2404 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2405 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2406 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2407
2408 firehose(
2409 THREADS.try_into().unwrap(),
2410 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2411 Some(|thread_id: usize, block: BlockData| {
2412 async move {
2413 let _prev =
2414 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2415 if block.was_skipped() {
2416 log::info!(
2417 target: LOG_MODULE,
2418 "leader skipped block {} on thread {}",
2419 block.slot(),
2420 thread_id,
2421 );
2422 } else {
2423 }
2430
2431 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2432 if block.was_skipped() {
2433 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2434 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2435 } else {
2436 if first_time {
2437 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2438 if let BlockData::Block {
2439 executed_transaction_count,
2440 ..
2441 } = &block
2442 {
2443 let executed = *executed_transaction_count;
2444 let _ = MIN_TRANSACTIONS.fetch_update(
2445 Ordering::Relaxed,
2446 Ordering::Relaxed,
2447 |current| {
2448 if executed < current {
2449 Some(executed)
2450 } else {
2451 None
2452 }
2453 },
2454 );
2455 }
2456 }
2457 }
2458 Ok(())
2459 }
2460 .boxed()
2461 }),
2462 None::<OnTxFn>,
2463 None::<OnEntryFn>,
2464 None::<OnRewardFn>,
2465 None::<OnErrorFn>,
2466 Some(stats_tracking),
2467 None,
2468 )
2469 .await
2470 .unwrap();
2471 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2472 assert_eq!(
2473 seen, NUM_SLOTS_TO_COVER,
2474 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2475 );
2476 let mut skipped: Vec<u64> = SEEN_SKIPPED
2477 .get_or_init(DashSet::new)
2478 .iter()
2479 .map(|v| *v)
2480 .collect();
2481 skipped.sort_unstable();
2482 const EXPECTED_SKIPPED: [u64; 6] = [
2484 345_600_004,
2485 345_600_005,
2486 345_600_008,
2487 345_600_009,
2488 345_600_010,
2489 345_600_011,
2490 ];
2491 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2492 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2493}
2494
2495#[tokio::test(flavor = "multi_thread")]
2496async fn test_firehose_target_slot_transactions() {
2497 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2498 solana_logger::setup_with_default("info");
2499 const TARGET_SLOT: u64 = 376_273_722;
2500 const SLOT_RADIUS: u64 = 50;
2501 const EXPECTED_TRANSACTIONS: u64 = 1414;
2502 const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2503 static FOUND: AtomicBool = AtomicBool::new(false);
2504 static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2505 static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2506
2507 FOUND.store(false, Ordering::Relaxed);
2508 OBSERVED_TXS.store(0, Ordering::Relaxed);
2509 OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2510
2511 firehose(
2512 4,
2513 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2514 Some(|_thread_id: usize, block: BlockData| {
2515 async move {
2516 if block.slot() == TARGET_SLOT {
2517 assert!(
2518 !block.was_skipped(),
2519 "target slot {TARGET_SLOT} was marked leader skipped",
2520 );
2521 if let BlockData::Block {
2522 executed_transaction_count,
2523 ..
2524 } = block
2525 {
2526 OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2527 FOUND.store(true, Ordering::Relaxed);
2528 assert_eq!(
2529 executed_transaction_count, EXPECTED_TRANSACTIONS,
2530 "unexpected transaction count for slot {TARGET_SLOT}"
2531 );
2532 assert_eq!(
2533 OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2534 EXPECTED_NON_VOTE_TRANSACTIONS,
2535 "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2536 );
2537 }
2538 }
2539 Ok(())
2540 }
2541 .boxed()
2542 }),
2543 Some(|_thread_id: usize, transaction: TransactionData| {
2544 async move {
2545 if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2546 OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2547 }
2548 Ok(())
2549 }
2550 .boxed()
2551 }),
2552 None::<OnEntryFn>,
2553 None::<OnRewardFn>,
2554 None::<OnErrorFn>,
2555 None::<OnStatsTrackingFn>,
2556 None,
2557 )
2558 .await
2559 .unwrap();
2560
2561 assert!(
2562 FOUND.load(Ordering::Relaxed),
2563 "target slot was not processed"
2564 );
2565 assert_eq!(
2566 OBSERVED_TXS.load(Ordering::Relaxed),
2567 EXPECTED_TRANSACTIONS,
2568 "recorded transaction count mismatch"
2569 );
2570}
2571
2572#[tokio::test(flavor = "multi_thread")]
2573async fn test_firehose_epoch_850_has_logs() {
2574 use std::sync::atomic::{AtomicU64, Ordering};
2575 solana_logger::setup_with_default("info");
2576 const START_SLOT: u64 = 367_200_075; const SLOT_COUNT: u64 = 50;
2578 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2579
2580 TOTAL_TXS.store(0, Ordering::Relaxed);
2581
2582 firehose(
2583 4,
2584 START_SLOT..(START_SLOT + SLOT_COUNT),
2585 None::<OnBlockFn>,
2586 Some(|_thread_id: usize, transaction: TransactionData| {
2587 async move {
2588 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2589 if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
2590 let has_logs = logs.iter().any(|msg| !msg.is_empty());
2591 assert_eq!(has_logs, true);
2592 }
2593 Ok(())
2594 }
2595 .boxed()
2596 }),
2597 None::<OnEntryFn>,
2598 None::<OnRewardFn>,
2599 None::<OnErrorFn>,
2600 None::<OnStatsTrackingFn>,
2601 None,
2602 )
2603 .await
2604 .unwrap();
2605
2606 assert!(
2607 TOTAL_TXS.load(Ordering::Relaxed) > 0,
2608 "no transactions observed in epoch 850 range"
2609 );
2610}
2611
2612#[tokio::test(flavor = "multi_thread")]
2613async fn test_firehose_epoch_850_votes_present() {
2614 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2615 solana_logger::setup_with_default("info");
2616 const TARGET_SLOT: u64 = 367_200_100; const SLOT_RADIUS: u64 = 10;
2618 static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
2619 static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
2620 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2621
2622 SEEN_BLOCK.store(false, Ordering::Relaxed);
2623 VOTE_TXS.store(0, Ordering::Relaxed);
2624 TOTAL_TXS.store(0, Ordering::Relaxed);
2625
2626 firehose(
2627 2,
2628 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2629 Some(|_thread_id: usize, block: BlockData| {
2630 async move {
2631 if block.slot() == TARGET_SLOT {
2632 assert!(
2633 !block.was_skipped(),
2634 "target slot {TARGET_SLOT} was marked leader skipped",
2635 );
2636 SEEN_BLOCK.store(true, Ordering::Relaxed);
2637 }
2638 Ok(())
2639 }
2640 .boxed()
2641 }),
2642 Some(|_thread_id: usize, transaction: TransactionData| {
2643 async move {
2644 if transaction.slot == TARGET_SLOT {
2645 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2646 if transaction.is_vote {
2647 VOTE_TXS.fetch_add(1, Ordering::Relaxed);
2648 }
2649 }
2650 Ok(())
2651 }
2652 .boxed()
2653 }),
2654 None::<OnEntryFn>,
2655 None::<OnRewardFn>,
2656 None::<OnErrorFn>,
2657 None::<OnStatsTrackingFn>,
2658 None,
2659 )
2660 .await
2661 .unwrap();
2662
2663 assert!(
2664 SEEN_BLOCK.load(Ordering::Relaxed),
2665 "target slot was not processed"
2666 );
2667 assert!(
2668 TOTAL_TXS.load(Ordering::Relaxed) > 0,
2669 "no transactions counted in target slot"
2670 );
2671 assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
2672}
2673
/// Injects a single synthetic block-handler failure while streaming eight slots on
/// one thread, then checks that every slot in the range was still delivered to the
/// handler at least once.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    solana_logger::setup_with_default("info");

    COVERAGE
        .get_or_init(Mutex::default)
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let produced = !block.was_skipped();
                // Fail exactly once: on a real block, after at least one real block
                // has been recorded. `swap` only runs when the first two conditions
                // hold, and it flips the flag so later calls never fail again.
                let inject_failure = produced
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst);
                if inject_failure {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut deliveries = COVERAGE.get_or_init(Mutex::default).lock().unwrap();
                *deliveries.entry(block.slot()).or_default() += 1;
                if produced {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    let first_uncovered =
        (START_SLOT..(START_SLOT + NUM_SLOTS)).find(|slot| !coverage.contains_key(slot));
    if let Some(slot) = first_uncovered {
        panic!("missing coverage for slot {slot} after restart");
    }
}
2737
/// Streams the 2,001 slots surrounding `GAP_START` across sixteen threads and checks
/// that every slot in the inclusive range was delivered as a real (non-skipped)
/// block, apart from the four slots that are pre-marked as covered.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;
    // NOTE(review): presumably slots known to have no block in the source data — confirm.
    const PRE_MARKED_SLOTS: [u64; 4] = [378864396, 378864397, 378864398, 378864399];

    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    solana_logger::setup_with_default("info");

    COVERAGE
        .get_or_init(Mutex::default)
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Leader-skipped slots do not count towards coverage.
                if !block.was_skipped() {
                    let slot = block.slot();
                    COVERAGE
                        .get_or_init(Mutex::default)
                        .lock()
                        .unwrap()
                        .insert(slot);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Work on a snapshot so the shared lock is released before the checks run.
    let mut coverage = COVERAGE
        .get_or_init(Mutex::default)
        .lock()
        .unwrap()
        .clone();
    coverage.extend(PRE_MARKED_SLOTS);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}