1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
40 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
41 node_reader::NodeReader,
42 utils,
43};
44
/// Upper bound applied with `tokio::time::timeout` to individual stream/reader operations
/// (fetching an epoch stream, reading its header, seeking, reading the next block).
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(30_000);
/// Epochs strictly below this value first try to decode transaction metadata as bincode
/// before falling back to protobuf.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
51
52fn poll_shutdown(
53 flag: &Arc<std::sync::atomic::AtomicBool>,
54 receiver: &mut Option<broadcast::Receiver<()>>,
55) -> bool {
56 if let Some(rx) = receiver {
57 match rx.try_recv() {
58 Ok(_) | Err(TryRecvError::Lagged(_)) => {
59 flag.store(true, Ordering::SeqCst);
60 }
61 Err(TryRecvError::Closed) => {
62 flag.store(true, Ordering::SeqCst);
63 }
64 Err(TryRecvError::Empty) => {}
65 }
66 }
67 flag.load(Ordering::SeqCst)
68}
69
70fn is_shutdown_error(err: &FirehoseError) -> bool {
71 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
72 inner
73 .downcast_ref::<io::Error>()
74 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
75 .unwrap_or(false)
76 }
77
78 match err {
79 FirehoseError::BlockHandlerError(inner)
80 | FirehoseError::TransactionHandlerError(inner)
81 | FirehoseError::EntryHandlerError(inner)
82 | FirehoseError::RewardHandlerError(inner)
83 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
84 _ => false,
85 }
86}
87
/// Errors produced while streaming and processing archived blocks.
///
/// `Display` is implemented by hand (no `#[error]` attributes are used). Wrapped errors are
/// rendered inline in the message.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client failure (converted via `From<reqwest::Error>`).
    Reqwest(reqwest::Error),
    /// Failure while reading an epoch stream's header.
    ReadHeader(SharedError),
    /// Failure initializing the geyser plugin service.
    GeyserPluginService(GeyserPluginServiceError),
    /// The geyser plugin service did not provide a transaction notifier.
    FailedToGetTransactionNotifier,
    /// Failure while reading nodes up to the next block.
    ReadUntilBlockError(SharedError),
    /// Failure extracting the block node from the nodes that were read.
    GetBlockError(SharedError),
    /// Failure seeking to, reading, or decoding the data node at the given item index.
    NodeDecodingError(usize, SharedError),
    /// Failure querying the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Failure seeking to a slot.
    SeekToSlotError(SharedError),
    /// Failure during start-up or argument validation (e.g. a thread count of zero).
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An operation exceeded `OP_TIMEOUT`; carries the operation's name.
    OperationTimeout(&'static str),
    /// The transaction handler (or, with `verify-transaction-signatures`, signature
    /// verification) returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error, or an entry's transaction index overflowed `usize`.
    EntryHandlerError(SharedError),
    /// The reward handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
125
// SAFETY: NOTE(review): no soundness argument is recorded for these impls.
// - If every field type (notably `SharedError`, `GeyserPluginServiceError` and
//   `SlotOffsetIndexError`) is already `Send + Sync`, the compiler derives both auto traits
//   and these impls are redundant.
// - If any field is NOT `Send`/`Sync` (e.g. `SharedError` = `Box<dyn Error>` without
//   `+ Send + Sync`), these impls are unsound.
// Confirm the definition of `SharedError` and delete these impls if possible.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
128
129impl Display for FirehoseError {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
133 FirehoseError::ReadHeader(error) => {
134 write!(f, "Error reading header: {}", error)
135 }
136 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
137 f,
138 "Error initializing geyser plugin service: {}",
139 geyser_plugin_service_error
140 ),
141 FirehoseError::FailedToGetTransactionNotifier => write!(
142 f,
143 "Failed to get transaction notifier from GeyserPluginService"
144 ),
145 FirehoseError::ReadUntilBlockError(error) => {
146 write!(f, "Error reading until block: {}", error)
147 }
148 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
149 FirehoseError::NodeDecodingError(item_index, error) => {
150 write!(
151 f,
152 "Error seeking, reading data from, or decoding data for data node {}: {}",
153 item_index, error
154 )
155 }
156 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
157 f,
158 "Error getting info from slot offset index: {}",
159 slot_offset_index_error
160 ),
161 FirehoseError::SeekToSlotError(error) => {
162 write!(f, "Error seeking to slot: {}", error)
163 }
164 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
165 FirehoseError::OnStatsHandlerError(error) => {
166 write!(f, "Stats handler error: {}", error)
167 }
168 FirehoseError::OperationTimeout(op) => {
169 write!(f, "Timeout while waiting for operation: {}", op)
170 }
171 FirehoseError::TransactionHandlerError(error) => {
172 write!(f, "Transaction handler error: {}", error)
173 }
174 FirehoseError::EntryHandlerError(error) => {
175 write!(f, "Entry handler error: {}", error)
176 }
177 FirehoseError::RewardHandlerError(error) => {
178 write!(f, "Reward handler error: {}", error)
179 }
180 FirehoseError::BlockHandlerError(error) => {
181 write!(f, "Block handler error: {}", error)
182 }
183 }
184 }
185}
186
187impl From<reqwest::Error> for FirehoseError {
188 fn from(e: reqwest::Error) -> Self {
189 FirehoseError::Reqwest(e)
190 }
191}
192
193impl From<GeyserPluginServiceError> for FirehoseError {
194 fn from(e: GeyserPluginServiceError) -> Self {
195 FirehoseError::GeyserPluginService(e)
196 }
197}
198
199impl From<SlotOffsetIndexError> for FirehoseError {
200 fn from(e: SlotOffsetIndexError) -> Self {
201 FirehoseError::SlotOffsetIndexError(e)
202 }
203}
204
/// Progress counters for a single firehose worker thread.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker thread these stats belong to.
    pub thread_id: usize,
    /// Instant the overall firehose run started (the same value for every thread).
    pub start_time: std::time::Instant,
    /// Instant the thread finished; initialized to `None` when the thread starts.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range currently being worked on; its `start` is moved forward to the first
    /// in-range slot of each epoch the thread enters.
    pub slot_range: Range<u64>,
    /// Slot range originally assigned to the thread.
    pub initial_slot_range: Range<u64>,
    /// Slot of the block currently being processed.
    pub current_slot: u64,
    /// Slots processed by this thread.
    pub slots_processed: u64,
    /// Blocks processed by this thread.
    pub blocks_processed: u64,
    /// Slots this thread recorded as leader-skipped.
    pub leader_skipped_slots: u64,
    /// Transactions processed by this thread.
    pub transactions_processed: u64,
    /// Entries processed by this thread.
    pub entries_processed: u64,
    /// Rewards processed by this thread.
    pub rewards_processed: u64,
}
233
/// Snapshot passed to the stats handler on each pulse.
///
/// The `*_processed` totals (except `rewards_processed`) are shared across all firehose
/// threads. The `*_since_last_pulse` counters are specific to the emitting thread.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Stats of the thread that emitted this snapshot.
    pub thread_stats: ThreadStats,
    /// Copied from `thread_stats.start_time`.
    pub start_time: std::time::Instant,
    /// Copied from `thread_stats.finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Copied from `thread_stats.slot_range`.
    pub slot_range: Range<u64>,
    /// Slots processed across all threads.
    pub slots_processed: u64,
    /// Blocks processed across all threads.
    pub blocks_processed: u64,
    /// `slots_processed - blocks_processed` (saturating at zero).
    pub leader_skipped_slots: u64,
    /// Transactions processed across all threads.
    pub transactions_processed: u64,
    /// Entries processed across all threads.
    pub entries_processed: u64,
    /// Rewards processed by the emitting thread only.
    pub rewards_processed: u64,
    /// Transactions the emitting thread processed since its previous pulse.
    pub transactions_since_last_pulse: u64,
    /// Blocks the emitting thread processed since its previous pulse.
    pub blocks_since_last_pulse: u64,
    /// Slots the emitting thread processed since its previous pulse.
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the emitting thread's previous pulse (never less than 1ns).
    pub time_since_last_pulse: std::time::Duration,
}
266
/// Configuration for periodic stats reporting.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler invoked with each [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Presumably the number of slots between stats pulses.
    /// NOTE(review): this field is not read in the visible code; confirm its use.
    pub tracking_interval_slots: u64,
}
275
276#[inline(always)]
277#[allow(clippy::too_many_arguments)]
278async fn maybe_emit_stats<OnStats: Handler<Stats>>(
279 stats_tracking: Option<&StatsTracking<OnStats>>,
280 thread_index: usize,
281 thread_stats: &ThreadStats,
282 overall_slots_processed: &AtomicU64,
283 overall_blocks_processed: &AtomicU64,
284 overall_transactions_processed: &AtomicU64,
285 overall_entries_processed: &AtomicU64,
286 transactions_since_stats: &AtomicU64,
287 blocks_since_stats: &AtomicU64,
288 slots_since_stats: &AtomicU64,
289 last_pulse: &Arc<AtomicU64>,
290 base_instant: std::time::Instant,
291) -> Result<(), (FirehoseError, u64)> {
292 if let Some(stats_tracker) = stats_tracking {
293 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
294 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
295 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
296 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
297 let now_nanos = base_instant.elapsed().as_nanos() as u64;
298 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
299 let delta_nanos = now_nanos.saturating_sub(previous);
300 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
301 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
302 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
303 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
304
305 let stats = Stats {
306 thread_stats: thread_stats.clone(),
307 start_time: thread_stats.start_time,
308 finish_time: thread_stats.finish_time,
309 slot_range: thread_stats.slot_range.clone(),
310 slots_processed: total_slots,
311 blocks_processed: total_blocks,
312 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
313 transactions_processed: total_transactions,
314 entries_processed: total_entries,
315 rewards_processed: thread_stats.rewards_processed,
316 transactions_since_last_pulse: processed_transactions,
317 blocks_since_last_pulse: processed_blocks,
318 slots_since_last_pulse: processed_slots,
319 time_since_last_pulse,
320 };
321
322 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
323 last_pulse.store(previous, Ordering::Relaxed);
324 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
325 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
326 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
327 return Err((
328 FirehoseError::OnStatsHandlerError(e),
329 thread_stats.current_slot,
330 ));
331 }
332 }
333 Ok(())
334}
335
/// Adds `value` to `atomic` (relaxed ordering), but only when `tracking_enabled` is set.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
342
343fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
344 map.get(&thread_id)
345 .map(|set| set.remove(&slot).is_some())
346 .unwrap_or(false)
347}
348
349fn decode_transaction_status_meta_from_frame(
350 slot: u64,
351 reassembled_metadata: Vec<u8>,
352) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
353 if reassembled_metadata.is_empty() {
354 return Ok(solana_transaction_status::TransactionStatusMeta::default());
356 }
357
358 match utils::decompress_zstd(reassembled_metadata.clone()) {
359 Ok(decompressed) => {
360 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
361 Box::new(std::io::Error::other(format!(
362 "decode transaction metadata (slot {slot}): {err}"
363 ))) as SharedError
364 })
365 }
366 Err(decomp_err) => {
367 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
370 Box::new(std::io::Error::other(format!(
371 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
372 ))) as SharedError
373 })
374 }
375 }
376}
377
378fn decode_transaction_status_meta(
379 slot: u64,
380 metadata_bytes: &[u8],
381) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
382 let epoch = slot_to_epoch(slot);
383 let mut bincode_err: Option<String> = None;
384 if epoch < BINCODE_EPOCH_CUTOFF {
385 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
386 metadata_bytes,
387 ) {
388 Ok(stored) => return Ok(stored.into()),
389 Err(err) => {
390 bincode_err = Some(err.to_string());
391 }
392 }
393 }
394
395 let bin_err_for_proto = bincode_err.clone();
396 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
397 prost_011::Message::decode(metadata_bytes).map_err(|err| {
398 if let Some(ref bin_err) = bin_err_for_proto {
400 Box::new(std::io::Error::other(format!(
401 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
402 ))) as SharedError
403 } else {
404 Box::new(std::io::Error::other(format!(
405 "protobuf decode transaction metadata: {err}"
406 ))) as SharedError
407 }
408 })?;
409
410 proto.try_into().map_err(|err| {
411 if let Some(ref bin_err) = bincode_err {
412 Box::new(std::io::Error::other(format!(
413 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
414 ))) as SharedError
415 } else {
416 Box::new(std::io::Error::other(format!(
417 "convert transaction metadata proto: {err}"
418 ))) as SharedError
419 }
420 })
421}
422
/// Unit tests for transaction-metadata decoding on both sides of `BINCODE_EPOCH_CUTOFF`.
///
/// Slot numbers come from the crate's own epoch math (`epoch_to_slot_range`) and the
/// cutoff constant, rather than hard-coded `epoch * 432000` products. If either changes,
/// the tests keep exercising the intended side of the cutoff.
#[cfg(test)]
mod metadata_decode_tests {
    use super::{
        BINCODE_EPOCH_CUTOFF, decode_transaction_status_meta,
        decode_transaction_status_meta_from_frame, epoch_to_slot_range,
    };
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// First slot of `epoch`.
    fn first_slot_of(epoch: u64) -> u64 {
        epoch_to_slot_range(epoch).0
    }

    /// Last (inclusive) slot of `epoch`.
    fn last_slot_of(epoch: u64) -> u64 {
        epoch_to_slot_range(epoch).1
    }

    /// Native metadata fixture used for protobuf round-trips.
    fn sample_meta() -> TransactionStatusMeta {
        let mut meta = TransactionStatusMeta::default();
        meta.fee = 42;
        meta.pre_balances = vec![1, 2];
        meta.post_balances = vec![3, 4];
        meta.log_messages = Some(vec!["hello".into()]);
        meta.pre_token_balances = Some(Vec::new());
        meta.post_token_balances = Some(Vec::new());
        meta.rewards = Some(Vec::new());
        meta.compute_units_consumed = Some(7);
        meta.cost_units = Some(9);
        meta.loaded_addresses = LoadedAddresses::default();
        meta
    }

    /// Bincode-serializable fixture used by the pre-cutoff tests.
    fn sample_stored_meta() -> StoredTransactionStatusMeta {
        StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        }
    }

    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored = sample_stored_meta();
        let bytes = bincode::serialize(&stored).expect("bincode serialize");
        let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    /// The very last slot before the cutoff epoch must still take the bincode path.
    #[test]
    fn decodes_bincode_metadata_in_last_epoch_before_cutoff() {
        let stored = sample_stored_meta();
        let bytes = bincode::serialize(&stored).expect("bincode serialize");
        let slot = last_slot_of(BINCODE_EPOCH_CUTOFF - 1);
        let decoded = decode_transaction_status_meta(slot, &bytes).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }

    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        let decoded = decode_transaction_status_meta(first_slot_of(BINCODE_EPOCH_CUTOFF), &bytes)
            .expect("decode");
        assert_eq!(decoded, meta);
    }

    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let meta = sample_meta();
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        let bytes = prost_011::Message::encode_to_vec(&generated);
        let decoded = decode_transaction_status_meta(first_slot_of(100), &bytes).expect("decode");
        assert_eq!(decoded, meta);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(decoded, TransactionStatusMeta::default());
    }

    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let raw_bytes = bincode::serialize(&stored).expect("serialize");
        let decoded =
            decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
        assert_eq!(decoded, TransactionStatusMeta::from(stored));
    }
}
515
/// Payload delivered to the transaction handler for each transaction in a block.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot of the block containing the transaction.
    pub slot: u64,
    /// Position of the transaction within its block (taken from the transaction node's index).
    pub transaction_slot_index: usize,
    /// The transaction's first signature.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message. It is computed after signature verification when
    /// the `verify-transaction-signatures` feature is enabled.
    pub message_hash: Hash,
    /// Whether the transaction was classified as a simple vote transaction.
    pub is_vote: bool,
    /// Decoded transaction status metadata (default when the archived frame was empty).
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The parsed transaction.
    pub transaction: VersionedTransaction,
}
534
/// Payload delivered to the entry handler for each entry in a block.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot of the block containing the entry.
    pub slot: u64,
    /// Zero-based index of the entry within its block.
    pub entry_index: usize,
    /// Block-level transaction indexes covered by this entry. The range starts at the
    /// number of transactions in all preceding entries of the block.
    pub transaction_indexes: Range<usize>,
    /// The entry's `num_hashes` value.
    pub num_hashes: u64,
    /// The entry's hash.
    pub hash: Hash,
}
549
/// Payload delivered to the reward handler.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Rewards as `(recipient address, reward info)` pairs.
    /// NOTE(review): presumably all rewards recorded for the block at `slot`; confirm at the
    /// construction site.
    pub rewards: Vec<(Address, RewardInfo)>,
}
558
/// Payload delivered to the block handler (see `OnBlockFn`).
#[derive(Debug)]
pub enum BlockData {
    /// A block read from the archive.
    Block {
        /// Slot of the parent block.
        parent_slot: u64,
        /// Blockhash of the parent block.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Blockhash of this block.
        blockhash: Hash,
        /// Rewards for this block together with their partition information.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block timestamp, if recorded (presumably Unix seconds — confirm).
        block_time: Option<i64>,
        /// Block height, if recorded.
        block_height: Option<u64>,
        /// Number of transactions in the block.
        /// NOTE(review): the firehose sums `entry.transactions.len()` over the block's
        /// entries into a counter of this name; confirm that is the value passed here.
        executed_transaction_count: u64,
        /// Number of entries in the block.
        entry_count: u64,
    },
    /// A slot in the requested range for which no block was found; the leader may have
    /// skipped it.
    PossibleLeaderSkipped {
        /// The slot without a block.
        slot: u64,
    },
}
590
591impl BlockData {
592 #[inline(always)]
594 pub const fn slot(&self) -> u64 {
595 match self {
596 BlockData::Block { slot, .. } => *slot,
597 BlockData::PossibleLeaderSkipped { slot } => *slot,
598 }
599 }
600
601 #[inline(always)]
603 pub const fn was_skipped(&self) -> bool {
604 matches!(self, BlockData::PossibleLeaderSkipped { .. })
605 }
606
607 #[inline(always)]
609 pub const fn block_time(&self) -> Option<i64> {
610 match self {
611 BlockData::Block { block_time, .. } => *block_time,
612 BlockData::PossibleLeaderSkipped { .. } => None,
613 }
614 }
615}
616
/// Result returned by every handler callback.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future a handler callback returns.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
619
/// Async callback invoked with the worker thread index and a payload of type `Data`.
///
/// The trait has no methods of its own. It names the bound
/// `Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static`, and the blanket impl
/// below makes every function or closure satisfying that bound a `Handler`.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

/// Blanket implementation: any callable with the required signature and bounds is a
/// [`Handler`].
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
627
628pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
630pub type OnBlockFn = HandlerFn<BlockData>;
632pub type OnTxFn = HandlerFn<TransactionData>;
634pub type OnEntryFn = HandlerFn<EntryData>;
636pub type OnRewardFn = HandlerFn<RewardsData>;
638pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
640pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
642pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
644
/// Description of a firehose failure. It is the payload type of the error handler
/// (see `OnErrorFn`).
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the firehose thread that hit the error.
    pub thread_id: usize,
    /// Slot associated with the error (presumably the slot being processed or resumed
    /// from — confirm at the construction site).
    pub slot: u64,
    /// Epoch associated with `slot` (presumably derived from it — confirm).
    pub epoch: u64,
    /// Human-readable description of the error.
    pub error_message: String,
}
657
658#[inline]
663#[allow(clippy::too_many_arguments)]
664pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
665 threads: u64,
666 slot_range: Range<u64>,
667 on_block: Option<OnBlock>,
668 on_tx: Option<OnTransaction>,
669 on_entry: Option<OnEntry>,
670 on_rewards: Option<OnRewards>,
671 on_error: Option<OnError>,
672 stats_tracking: Option<StatsTracking<OnStats>>,
673 shutdown_signal: Option<broadcast::Receiver<()>>,
674) -> Result<(), (FirehoseError, u64)>
675where
676 OnBlock: Handler<BlockData>,
677 OnTransaction: Handler<TransactionData>,
678 OnEntry: Handler<EntryData>,
679 OnRewards: Handler<RewardsData>,
680 OnStats: Handler<Stats>,
681 OnError: Handler<FirehoseErrorContext>,
682{
683 if threads == 0 {
684 return Err((
685 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
686 slot_range.start,
687 ));
688 }
689 let client = Client::new();
690 log::info!(target: LOG_MODULE, "starting firehose...");
691 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
692
693 let slot_range = Arc::new(slot_range);
694
695 let subranges = generate_subranges(&slot_range, threads);
697 if threads > 1 {
698 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
699 }
700
701 let firehose_start = std::time::Instant::now();
702 let shutdown_flag = Arc::new(AtomicBool::new(false));
703 if let Some(ref rx) = shutdown_signal {
704 let mut rx = rx.resubscribe();
705 let flag = shutdown_flag.clone();
706 tokio::spawn(async move {
707 if rx.recv().await.is_ok() {
708 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
709 flag.store(true, Ordering::SeqCst);
710 }
711 });
712 }
713 let mut handles = Vec::new();
714 let error_counts: Arc<Vec<AtomicU32>> =
716 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
717
718 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
719 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
720 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
721 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
722 let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
723
724 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
725 let error_counts = error_counts.clone();
726 let client = client.clone();
727 let on_block = on_block.clone();
728 let on_tx = on_tx.clone();
729 let on_entry = on_entry.clone();
730 let on_reward = on_rewards.clone();
731 let on_error = on_error.clone();
732 let overall_slots_processed = overall_slots_processed.clone();
733 let overall_blocks_processed = overall_blocks_processed.clone();
734 let overall_transactions_processed = overall_transactions_processed.clone();
735 let overall_entries_processed = overall_entries_processed.clone();
736 let stats_tracking = stats_tracking.clone();
737 let transactions_since_stats = Arc::new(AtomicU64::new(0));
738 let blocks_since_stats = Arc::new(AtomicU64::new(0));
739 let slots_since_stats = Arc::new(AtomicU64::new(0));
740 let last_pulse = Arc::new(AtomicU64::new(0));
741 let transactions_since_stats_cloned = transactions_since_stats.clone();
742 let blocks_since_stats_cloned = blocks_since_stats.clone();
743 let slots_since_stats_cloned = slots_since_stats.clone();
744 let last_pulse_cloned = last_pulse.clone();
745 let shutdown_flag = shutdown_flag.clone();
746 let pending_skipped_slots = pending_skipped_slots.clone();
747 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
748
749 let handle = tokio::spawn(async move {
750 let transactions_since_stats = transactions_since_stats_cloned;
751 let blocks_since_stats = blocks_since_stats_cloned;
752 let slots_since_stats = slots_since_stats_cloned;
753 let last_pulse = last_pulse_cloned;
754 let mut shutdown_rx = thread_shutdown_rx;
755 let start_time = firehose_start;
756 last_pulse.store(
757 firehose_start.elapsed().as_nanos() as u64,
758 Ordering::Relaxed,
759 );
760 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
761 let mut skip_until_index = None;
762 let last_emitted_slot = slot_range.start.saturating_sub(1);
763 let block_enabled = on_block.is_some();
764 let tx_enabled = on_tx.is_some();
765 let entry_enabled = on_entry.is_some();
766 let reward_enabled = on_reward.is_some();
767 let tracking_enabled = stats_tracking.is_some();
768 if block_enabled {
769 pending_skipped_slots.entry(thread_index).or_default();
770 }
771 let mut last_counted_slot = slot_range.start.saturating_sub(1);
772 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
773 let mut thread_stats = if tracking_enabled {
774 Some(ThreadStats {
775 thread_id: thread_index,
776 start_time,
777 finish_time: None,
778 slot_range: slot_range.clone(),
779 initial_slot_range: slot_range.clone(),
780 current_slot: slot_range.start,
781 slots_processed: 0,
782 blocks_processed: 0,
783 leader_skipped_slots: 0,
784 transactions_processed: 0,
785 entries_processed: 0,
786 rewards_processed: 0,
787 })
788 } else {
789 None
790 };
791
792 while let Err((err, slot)) = async {
794 let mut last_emitted_slot = last_emitted_slot_global;
795 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
796 log::info!(
797 target: &log_target,
798 "shutdown requested; terminating firehose thread {}",
799 thread_index
800 );
801 return Ok(());
802 }
803 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
804 log::info!(
805 target: &log_target,
806 "slot range: {} (epoch {}) ... {} (epoch {})",
807 slot_range.start,
808 slot_to_epoch(slot_range.start),
809 slot_range.end,
810 slot_to_epoch(slot_range.end)
811 );
812
813 log::info!(target: &log_target, "🚒 starting firehose...");
814
815 let mut current_slot: Option<u64> = None;
817 for epoch_num in epoch_range.clone() {
818 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
819 log::info!(
820 target: &log_target,
821 "shutdown requested; terminating firehose thread {}",
822 thread_index
823 );
824 return Ok(());
825 }
826 log::info!(target: &log_target, "entering epoch {}", epoch_num);
827 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
828 Ok(stream) => stream,
829 Err(_) => {
830 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
831 }
832 };
833 let mut reader = NodeReader::new(stream);
834
835 let header_fut = reader.read_raw_header();
836 let header = match timeout(OP_TIMEOUT, header_fut).await {
837 Ok(res) => res
838 .map_err(FirehoseError::ReadHeader)
839 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
840 Err(_) => {
841 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
842 }
843 };
844 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
845
846 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
847 let local_start = std::cmp::max(slot_range.start, epoch_start);
848 let local_end_inclusive =
849 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
850 if local_start > local_end_inclusive {
851 log::debug!(
852 target: &log_target,
853 "epoch {} has no overlap with thread range ({}..{}), skipping",
854 epoch_num,
855 slot_range.start,
856 slot_range.end
857 );
858 continue;
859 }
860
861 let mut previous_blockhash = Hash::default();
862 let mut latest_entry_blockhash = Hash::default();
863 last_counted_slot = local_start.saturating_sub(1);
866 current_slot = None;
867 if tracking_enabled
868 && let Some(ref mut stats) = thread_stats {
869 stats.current_slot = local_start;
870 stats.slot_range.start = local_start;
871 }
872
873 if local_start > epoch_start {
874 let seek_slot = local_start.saturating_sub(1);
878 let seek_fut = reader.seek_to_slot(seek_slot);
879 match timeout(OP_TIMEOUT, seek_fut).await {
880 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
881 Err(_) => {
882 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
883 }
884 }
885 }
886
887 let mut item_index = 0;
889 let mut displayed_skip_message = false;
890 loop {
891 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
892 log::info!(
893 target: &log_target,
894 "shutdown requested; terminating firehose thread {}",
895 thread_index
896 );
897 return Ok(());
898 }
899 let read_fut = reader.read_until_block();
900 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
901 Ok(result) => result
902 .map_err(FirehoseError::ReadUntilBlockError)
903 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
904 Err(_) => {
905 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
906 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
907 }
908 };
909 if nodes.is_empty() {
910 log::info!(
911 target: &log_target,
912 "reached end of epoch {}",
913 epoch_num
914 );
915 break;
916 }
917 if let Some(last_node) = nodes.0.last()
918 && !last_node.get_node().is_block()
919 {
920 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
921 break;
922 }
923 let block = nodes
924 .get_block()
925 .map_err(FirehoseError::GetBlockError)
926 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
927 log::debug!(
928 target: &log_target,
929 "read {} items from epoch {}, now at slot {}",
930 item_index,
931 epoch_num,
932 block.slot
933 );
934 let slot = block.slot;
935 if slot > local_end_inclusive {
936 log::debug!(
937 target: &log_target,
938 "reached end of local slice at slot {} (epoch {}), stopping",
939 slot,
940 epoch_num
941 );
942 break;
943 }
944 if slot >= slot_range.end {
945 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
946 if block_enabled {
951 pending_skipped_slots.remove(&thread_index);
952 }
953 return Ok(());
954 }
955 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
956 if slot < slot_range.start {
957 if slot.saturating_add(1) == slot_range.start {
958 log::debug!(
959 target: &log_target,
960 "priming reader with preceding slot {}, skipping",
961 slot
962 );
963 } else {
964 log::warn!(
965 target: &log_target,
966 "encountered slot {} before start of range {}, skipping",
967 slot,
968 slot_range.start
969 );
970 }
971 continue;
972 }
973 current_slot = Some(slot);
974 let mut entry_index: usize = 0;
975 let mut this_block_executed_transaction_count: u64 = 0;
976 let mut this_block_entry_count: u64 = 0;
977 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
978
979 for node_with_cid in &nodes.0 {
980 item_index += 1;
981 if let Some(skip) = skip_until_index {
982 if item_index < skip {
983 if !displayed_skip_message {
984 log::info!(
985 target: &log_target,
986 "skipping until index {} (at {})",
987 skip,
988 item_index
989 );
990 displayed_skip_message = true;
991 }
992 continue;
993 } else {
994 log::info!(
995 target: &log_target,
996 "reached target index {}, resuming...",
997 skip
998 );
999 skip_until_index = None;
1000 }
1001 }
1002 let node = node_with_cid.get_node();
1003
1004 if let Some(ref mut stats) = thread_stats {
1005 stats.current_slot = slot;
1006 }
1007
1008 let error_slot = current_slot.unwrap_or(slot_range.start);
1009
1010 use crate::node::Node::*;
1011 match node {
1012 Transaction(tx) => {
1013 if tx_enabled
1014 && let Some(on_tx_cb) = on_tx.as_ref()
1015 {
1016 let error_slot = current_slot.unwrap_or(slot_range.start);
1017 let versioned_tx = tx.as_parsed().map_err(|err| {
1018 (
1019 FirehoseError::NodeDecodingError(item_index, err),
1020 error_slot,
1021 )
1022 })?;
1023 let reassembled_metadata = nodes
1024 .reassemble_dataframes(tx.metadata.clone())
1025 .map_err(|err| {
1026 (
1027 FirehoseError::NodeDecodingError(item_index, err),
1028 error_slot,
1029 )
1030 })?;
1031
1032 let as_native_metadata = decode_transaction_status_meta_from_frame(
1033 block.slot,
1034 reassembled_metadata,
1035 )
1036 .map_err(|err| {
1037 (
1038 FirehoseError::NodeDecodingError(item_index, err),
1039 error_slot,
1040 )
1041 })?;
1042
1043 let message_hash = {
1044 #[cfg(feature = "verify-transaction-signatures")]
1045 {
1046 versioned_tx.verify_and_hash_message().map_err(|err| {
1047 (
1048 FirehoseError::TransactionHandlerError(Box::new(err)),
1049 error_slot,
1050 )
1051 })?
1052 }
1053 #[cfg(not(feature = "verify-transaction-signatures"))]
1054 {
1055 versioned_tx.message.hash()
1056 }
1057 };
1058 let signature = versioned_tx
1059 .signatures
1060 .first()
1061 .ok_or_else(|| {
1062 Box::new(std::io::Error::new(
1063 std::io::ErrorKind::InvalidData,
1064 "transaction missing signature",
1065 )) as SharedError
1066 })
1067 .map_err(|err| {
1068 (
1069 FirehoseError::NodeDecodingError(
1070 item_index,
1071 err,
1072 ),
1073 error_slot,
1074 )
1075 })?;
1076 let is_vote = is_simple_vote_transaction(&versioned_tx);
1077
1078 on_tx_cb(
1079 thread_index,
1080 TransactionData {
1081 slot: block.slot,
1082 transaction_slot_index: tx.index.unwrap() as usize,
1083 signature: *signature,
1084 message_hash,
1085 is_vote,
1086 transaction_status_meta: as_native_metadata.clone(),
1087 transaction: versioned_tx.clone(),
1088 },
1089 )
1090 .await
1091 .map_err(|e| {
1092 (
1093 FirehoseError::TransactionHandlerError(e),
1094 error_slot,
1095 )
1096 })?;
1097 }
1098 fetch_add_if(
1099 tracking_enabled,
1100 &overall_transactions_processed,
1101 1,
1102 );
1103 if let Some(ref mut stats) = thread_stats {
1104 stats.transactions_processed += 1;
1105 }
1106 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1107 }
1108 Entry(entry) => {
1109 let entry_hash = Hash::from(entry.hash.to_bytes());
1110 let entry_transaction_count = entry.transactions.len();
1111 let entry_transaction_count_u64 = entry_transaction_count as u64;
1112 let starting_transaction_index_u64 =
1113 this_block_executed_transaction_count;
1114 latest_entry_blockhash = entry_hash;
1115 this_block_executed_transaction_count += entry_transaction_count_u64;
1116 this_block_entry_count += 1;
1117
1118 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1119 let starting_transaction_index = usize::try_from(
1120 starting_transaction_index_u64,
1121 )
1122 .map_err(|err| {
1123 (
1124 FirehoseError::EntryHandlerError(Box::new(err)),
1125 error_slot,
1126 )
1127 })?;
1128 let transaction_indexes_end =
1129 starting_transaction_index + entry_transaction_count;
1130 on_entry_cb(
1131 thread_index,
1132 EntryData {
1133 slot: block.slot,
1134 entry_index,
1135 transaction_indexes: starting_transaction_index
1136 ..transaction_indexes_end,
1137 num_hashes: entry.num_hashes,
1138 hash: entry_hash,
1139 },
1140 )
1141 .await
1142 .map_err(|e| {
1143 (
1144 FirehoseError::EntryHandlerError(e),
1145 error_slot,
1146 )
1147 })?;
1148 }
1149 entry_index += 1;
1150 fetch_add_if(
1151 tracking_enabled,
1152 &overall_entries_processed,
1153 1,
1154 );
1155 if let Some(ref mut stats) = thread_stats {
1156 stats.entries_processed += 1;
1157 }
1158 }
1159 Block(block) => {
1160 let prev_last_counted_slot = last_counted_slot;
1161 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1162 (
1163 stats.slots_processed,
1164 stats.blocks_processed,
1165 stats.leader_skipped_slots,
1166 stats.current_slot,
1167 )
1168 });
1169
1170 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1171 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1172 let skip_start = skip_start_from_previous.max(next_expected_slot);
1173
1174 let skipped_epoch = slot_to_epoch(last_counted_slot);
1175 for skipped_slot in skip_start..slot {
1176 if slot_to_epoch(skipped_slot) != skipped_epoch {
1177 break;
1178 }
1179 log::debug!(
1180 target: &log_target,
1181 "leader skipped slot {} (prev_counted {}, current slot {})",
1182 skipped_slot,
1183 prev_last_counted_slot,
1184 slot,
1185 );
1186 if block_enabled {
1187 pending_skipped_slots
1188 .entry(thread_index)
1189 .or_default()
1190 .insert(skipped_slot);
1191 }
1192 if block_enabled
1193 && let Some(on_block_cb) = on_block.as_ref()
1194 && skipped_slot > last_emitted_slot {
1195 last_emitted_slot = skipped_slot;
1196 on_block_cb(
1197 thread_index,
1198 BlockData::PossibleLeaderSkipped {
1199 slot: skipped_slot,
1200 },
1201 )
1202 .await
1203 .map_err(|e| {
1204 (
1205 FirehoseError::BlockHandlerError(e),
1206 error_slot,
1207 )
1208 })?;
1209 }
1210 if tracking_enabled {
1211 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1212 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1213 if let Some(ref mut stats) = thread_stats {
1214 stats.leader_skipped_slots += 1;
1215 stats.slots_processed += 1;
1216 stats.current_slot = skipped_slot;
1217 }
1218 }
1219 last_counted_slot = skipped_slot;
1220 }
1221
1222 let cleared_pending_skip = if block_enabled {
1223 clear_pending_skip(
1224 &pending_skipped_slots,
1225 thread_index,
1226 slot,
1227 )
1228 } else {
1229 false
1230 };
1231
1232 if slot <= last_counted_slot && !cleared_pending_skip {
1233 log::debug!(
1234 target: &log_target,
1235 "duplicate block {}, already counted (last_counted={})",
1236 slot,
1237 last_counted_slot,
1238 );
1239 this_block_rewards.clear();
1240 continue;
1241 }
1242
1243 if block_enabled {
1244 if let Some(on_block_cb) = on_block.as_ref() {
1245 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1246 if slot > last_emitted_slot {
1247 last_emitted_slot = slot;
1248 on_block_cb(
1249 thread_index,
1250 BlockData::Block {
1251 parent_slot: block.meta.parent_slot,
1252 parent_blockhash: previous_blockhash,
1253 slot: block.slot,
1254 blockhash: latest_entry_blockhash,
1255 rewards: KeyedRewardsAndNumPartitions {
1256 keyed_rewards,
1257 num_partitions: None,
1258 },
1259 block_time: Some(block.meta.blocktime as i64),
1260 block_height: block.meta.block_height,
1261 executed_transaction_count:
1262 this_block_executed_transaction_count,
1263 entry_count: this_block_entry_count,
1264 },
1265 )
1266 .await
1267 .map_err(|e| {
1268 (
1269 FirehoseError::BlockHandlerError(e),
1270 error_slot,
1271 )
1272 })?;
1273 }
1274 }
1275 } else {
1276 this_block_rewards.clear();
1277 }
1278 previous_blockhash = latest_entry_blockhash;
1279
1280 if tracking_enabled {
1281 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1282 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1283 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1284 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1285 if let Some(ref mut stats) = thread_stats {
1286 stats.blocks_processed += 1;
1287 stats.slots_processed += 1;
1288 stats.current_slot = slot;
1289 }
1290
1291 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1292 (&stats_tracking, thread_stats.as_mut())
1293 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1294 && let Err(err) = maybe_emit_stats(
1295 stats_tracking.as_ref(),
1296 thread_index,
1297 thread_stats_ref,
1298 &overall_slots_processed,
1299 &overall_blocks_processed,
1300 &overall_transactions_processed,
1301 &overall_entries_processed,
1302 &transactions_since_stats,
1303 &blocks_since_stats,
1304 &slots_since_stats,
1305 &last_pulse,
1306 start_time,
1307 )
1308 .await
1309 {
1310 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1311 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1312 overall_blocks_processed
1313 .fetch_sub(1, Ordering::Relaxed);
1314 overall_slots_processed
1315 .fetch_sub(1, Ordering::Relaxed);
1316 if let Some((
1317 prev_slots_processed,
1318 prev_blocks_processed,
1319 prev_leader_skipped,
1320 prev_current_slot,
1321 )) = thread_stats_snapshot
1322 {
1323 thread_stats_ref.slots_processed =
1324 prev_slots_processed;
1325 thread_stats_ref.blocks_processed =
1326 prev_blocks_processed;
1327 thread_stats_ref.leader_skipped_slots =
1328 prev_leader_skipped;
1329 thread_stats_ref.current_slot =
1330 prev_current_slot;
1331 }
1332 last_counted_slot = prev_last_counted_slot;
1333 return Err(err);
1334 }
1335 }
1336
1337 if slot > last_counted_slot {
1338 last_counted_slot = slot;
1339 }
1340 }
1341 Subset(_subset) => (),
1342 Epoch(_epoch) => (),
1343 Rewards(rewards) => {
1344 if reward_enabled || block_enabled {
1345 let reassembled = nodes
1346 .reassemble_dataframes(rewards.data.clone())
1347 .map_err(|err| {
1348 (
1349 FirehoseError::NodeDecodingError(item_index, err),
1350 current_slot.unwrap_or(slot_range.start),
1351 )
1352 })?;
1353 if reassembled.is_empty() {
1354 this_block_rewards.clear();
1355 if reward_enabled
1356 && let Some(on_reward_cb) = on_reward.as_ref()
1357 {
1358 on_reward_cb(
1359 thread_index,
1360 RewardsData {
1361 slot: block.slot,
1362 rewards: Vec::new(),
1363 },
1364 )
1365 .await
1366 .map_err(|e| {
1367 (
1368 FirehoseError::RewardHandlerError(e),
1369 error_slot,
1370 )
1371 })?;
1372 }
1373 continue;
1374 }
1375
1376 let decompressed = utils::decompress_zstd(reassembled)
1377 .map_err(|err| {
1378 (
1379 FirehoseError::NodeDecodingError(
1380 item_index,
1381 err,
1382 ),
1383 error_slot,
1384 )
1385 })?;
1386
1387 let decoded =
1388 prost_011::Message::decode(decompressed.as_slice())
1389 .map_err(|err| {
1390 (
1391 FirehoseError::NodeDecodingError(
1392 item_index,
1393 Box::new(err),
1394 ),
1395 error_slot,
1396 )
1397 })?;
1398 let keyed_rewards = convert_proto_rewards(&decoded)
1399 .map_err(|err| {
1400 (
1401 FirehoseError::NodeDecodingError(item_index, err),
1402 error_slot,
1403 )
1404 })?;
1405 if reward_enabled
1406 && let Some(on_reward_cb) = on_reward.as_ref()
1407 {
1408 on_reward_cb(
1409 thread_index,
1410 RewardsData {
1411 slot: block.slot,
1412 rewards: keyed_rewards.clone(),
1413 },
1414 )
1415 .await
1416 .map_err(|e| {
1417 (
1418 FirehoseError::RewardHandlerError(e),
1419 error_slot,
1420 )
1421 })?;
1422 }
1423 this_block_rewards = keyed_rewards;
1424 if let Some(ref mut stats) = thread_stats {
1425 stats.rewards_processed +=
1426 this_block_rewards.len() as u64;
1427 }
1428 }
1429 }
1430 DataFrame(_data_frame) => (),
1431 }
1432 }
1433 if block.slot == slot_range.end - 1 {
1434 let finish_time = std::time::Instant::now();
1435 let elapsed = finish_time.duration_since(start_time);
1436 log::info!(target: &log_target, "processed slot {}", block.slot);
1437 let elapsed_pretty = human_readable_duration(elapsed);
1438 log::info!(
1439 target: &log_target,
1440 "processed {} slots across {} epochs in {}.",
1441 slot_range.end - slot_range.start,
1442 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1443 elapsed_pretty
1444 );
1445 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1446 let summary: String = error_counts
1449 .iter()
1450 .enumerate()
1451 .filter_map(|(i, c)| {
1452 let v = c.load(Ordering::Relaxed);
1453 if v > 0 {
1454 Some(format!("{:03}({})", i, v))
1455 } else {
1456 None
1457 }
1458 })
1459 .collect::<Vec<_>>()
1460 .join(", ");
1461 if !summary.is_empty() {
1462 log::debug!(target: &log_target, "threads with errors: {}", summary);
1463 }
1464 return Ok(());
1465 }
1466 }
1467 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1468 && last_counted_slot < expected_last_slot
1469 {
1470 }
1473 if let Some(ref mut stats) = thread_stats {
1474 stats.finish_time = Some(std::time::Instant::now());
1475 maybe_emit_stats(
1476 stats_tracking.as_ref(),
1477 thread_index,
1478 stats,
1479 &overall_slots_processed,
1480 &overall_blocks_processed,
1481 &overall_transactions_processed,
1482 &overall_entries_processed,
1483 &transactions_since_stats,
1484 &blocks_since_stats,
1485 &slots_since_stats,
1486 &last_pulse,
1487 start_time,
1488 )
1489 .await?;
1490 }
1491 if block_enabled {
1492 pending_skipped_slots.remove(&thread_index);
1493 }
1494 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1495 }
1496 Ok(())
1497 }
1498 .await
1499 {
1500 if is_shutdown_error(&err) {
1501 log::info!(
1502 target: &log_target,
1503 "shutdown requested; terminating firehose thread {}",
1504 thread_index
1505 );
1506 break;
1507 }
1508 let epoch = slot_to_epoch(slot);
1509 let item_index = match &err {
1510 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1511 _ => 0,
1512 };
1513 let error_message = err.to_string();
1514 log::error!(
1515 target: &log_target,
1516 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1517 slot,
1518 epoch
1519 );
1520 log::error!(target: &log_target, "{}", error_message);
1521 if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
1522 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1524 }
1525 if let Some(on_error_cb) = on_error.clone() {
1526 let context = FirehoseErrorContext {
1527 thread_id: thread_index,
1528 slot,
1529 epoch,
1530 error_message: error_message.clone(),
1531 };
1532 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1533 log::error!(
1534 target: &log_target,
1535 "on_error handler failed: {}",
1536 handler_err
1537 );
1538 }
1539 }
1540 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1542 log::warn!(
1543 target: &log_target,
1544 "restarting from slot {} at index {}",
1545 slot,
1546 item_index,
1547 );
1548 if slot <= last_counted_slot {
1552 slot_range.start = last_counted_slot.saturating_add(1);
1553 } else {
1554 slot_range.start = slot;
1555 }
1556 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1558 if tracking_enabled
1559 && let Some(ref mut stats_ref) = thread_stats {
1560 stats_ref.slot_range.start = slot_range.start;
1561 stats_ref.slot_range.end = slot_range.end;
1562 }
1564 if block_enabled {
1565 pending_skipped_slots.remove(&thread_index);
1566 }
1567 skip_until_index = Some(item_index);
1568 last_emitted_slot_global = last_emitted_slot;
1569 }
1570 });
1571 handles.push(handle);
1572 }
1573
1574 for handle in handles {
1576 handle.await.unwrap();
1577 }
1578 if stats_tracking.is_some() {
1579 let elapsed = firehose_start.elapsed();
1580 let elapsed_secs = elapsed.as_secs_f64();
1581 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1582 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1583 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1584 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1585 let total_errors: u64 = error_counts
1586 .iter()
1587 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1588 .sum();
1589 let overall_tps = if elapsed_secs > 0.0 {
1590 total_transactions as f64 / elapsed_secs
1591 } else {
1592 0.0
1593 };
1594 log::info!(
1595 target: LOG_MODULE,
1596 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1597 elapsed_secs,
1598 total_slots,
1599 total_blocks,
1600 total_leader_skipped,
1601 total_transactions,
1602 overall_tps,
1603 total_errors
1604 );
1605 }
1606 if shutdown_flag.load(Ordering::SeqCst) {
1607 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1608 } else {
1609 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1610 }
1611 Ok(())
1612}
1613
1614#[allow(clippy::result_large_err)]
1615pub fn firehose_geyser(
1622 rt: Arc<tokio::runtime::Runtime>,
1623 slot_range: Range<u64>,
1624 geyser_config_files: Option<&[PathBuf]>,
1625 index_base_url: &Url,
1626 client: &Client,
1627 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1628 threads: u64,
1629) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1630 if threads == 0 {
1631 return Err((
1632 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1633 slot_range.start,
1634 ));
1635 }
1636 log::info!(target: LOG_MODULE, "starting firehose...");
1637 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1638 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1639 let mut entry_notifier_maybe = None;
1640 let mut block_meta_notifier_maybe = None;
1641 let mut transaction_notifier_maybe = None;
1642 if let Some(geyser_config_files) = geyser_config_files {
1643 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1644
1645 let service =
1646 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1647 confirmed_bank_receiver.clone(),
1648 true,
1649 geyser_config_files,
1650 )
1651 .map_err(|e| (e.into(), slot_range.start))?;
1652
1653 transaction_notifier_maybe = Some(
1654 service
1655 .get_transaction_notifier()
1656 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1657 .map_err(|e| (e, slot_range.start))?,
1658 );
1659
1660 entry_notifier_maybe = service.get_entry_notifier();
1661 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1662
1663 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1664 }
1665
1666 if entry_notifier_maybe.is_some() {
1667 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1668 } else {
1669 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1670 }
1671 log::info!(target: LOG_MODULE, "running on_load...");
1672 rt.spawn(on_load);
1673
1674 let slot_range = Arc::new(slot_range);
1675 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1676 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1677 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1678 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1679
1680 let subranges = generate_subranges(&slot_range, threads);
1682 if threads > 1 {
1683 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1684 }
1685
1686 let mut handles = Vec::new();
1687 let error_counts: Arc<Vec<AtomicU32>> =
1689 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1690
1691 for (i, slot_range) in subranges.into_iter().enumerate() {
1692 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1693 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1694 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1695 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1696 let client = client.clone();
1697 let error_counts = error_counts.clone();
1698
1699 let rt_clone = rt.clone();
1700
1701 let handle = std::thread::spawn(move || {
1702 rt_clone.block_on(async {
1703 firehose_geyser_thread(
1704 slot_range,
1705 transaction_notifier_maybe,
1706 entry_notifier_maybe,
1707 block_meta_notifier_maybe,
1708 confirmed_bank_sender,
1709 &client,
1710 if threads > 1 { Some(i) } else { None },
1711 error_counts,
1712 )
1713 .await
1714 .unwrap();
1715 });
1716 });
1717 handles.push(handle);
1718 }
1719
1720 for handle in handles {
1722 handle.join().unwrap();
1723 }
1724 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1725 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1726 block_meta_notifier.notify_block_metadata(
1727 u64::MAX,
1728 "unload",
1729 u64::MAX,
1730 "unload",
1731 &KeyedRewardsAndNumPartitions {
1732 keyed_rewards: vec![],
1733 num_partitions: None,
1734 },
1735 None,
1736 None,
1737 0,
1738 0,
1739 );
1740 }
1741 Ok(confirmed_bank_receiver)
1742}
1743
/// Worker body used by `firehose_geyser`: streams one contiguous `slot_range` and
/// forwards what it decodes to whichever Geyser notifiers are present.
///
/// For every epoch overlapping `slot_range`, it does the following:
/// - fetches the epoch stream and reads its header (each step bounded by `OP_TIMEOUT`);
/// - seeks to the slot just before the local start when needed;
/// - reads the stream block by block.
///
/// Inside each new block it dispatches on node type:
/// - transactions go to `transaction_notifier_maybe`;
/// - entries go to `entry_notifier_maybe`;
/// - rewards are decoded and attached to the block's metadata notification;
/// - the block node sends `SlotNotification::Root((slot, parent_slot))` on
///   `confirmed_bank_sender` and, if present, calls `block_meta_notifier_maybe`.
///
/// Errors never escape: a failed attempt is logged and counted in
/// `error_counts[thread_index.unwrap_or(0)]`, then retried from the failing slot (or
/// from just after the last fully processed slot). A shutdown error ends the worker
/// with `Ok(())`. As written, every return path yields `Ok(())`.
///
/// # Panics
///
/// Panics if `confirmed_bank_sender.send` fails (crossbeam only fails when every
/// receiver is gone). Panics if a transaction node has no `index`.
/// NOTE(review): `slot_range.end - 1` below underflows if `slot_range` is `0..0`.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::result_large_err)]
async fn firehose_geyser_thread(
    mut slot_range: Range<u64>,
    transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
    entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
    block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
    confirmed_bank_sender: Sender<SlotNotification>,
    client: &Client,
    thread_index: Option<usize>,
    error_counts: Arc<Vec<AtomicU32>>,
) -> Result<(), (FirehoseError, u64)> {
    let start_time = std::time::Instant::now();
    let log_target = if let Some(thread_index) = thread_index {
        format!("{}::T{:03}", LOG_MODULE, thread_index)
    } else {
        LOG_MODULE.to_string()
    };
    // Kept for the final summary log; `slot_range.start` moves forward on retries.
    let initial_slot_range = slot_range.clone();
    // After a NodeDecodingError, the node index (within the retried pass) to resume from.
    let mut skip_until_index = None;
    // Highest slot whose block node has been handled; used to drop duplicate blocks.
    let mut last_counted_slot = slot_range.start.saturating_sub(1);
    // Each pass of this `while let` is one attempt over the current `slot_range`. The async
    // block borrows `slot_range`, `last_counted_slot` and `skip_until_index`; when an attempt
    // fails, the loop body adjusts them and the next pass retries. A successful attempt
    // ends the loop.
    while let Err((err, slot)) = async {
        let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
        log::info!(
            target: &log_target,
            "slot range: {} (epoch {}) ... {} (epoch {})",
            slot_range.start,
            slot_to_epoch(slot_range.start),
            slot_range.end,
            slot_to_epoch(slot_range.end)
        );

        log::info!(target: &log_target, "🚒 starting firehose...");

        // Slot of the block currently being processed; reported as the restart point on error.
        let mut current_slot: Option<u64> = None;
        for epoch_num in epoch_range.clone() {
            log::info!(target: &log_target, "entering epoch {}", epoch_num);
            // NOTE(review): the stream is fetched and its header read before the overlap
            // check below, so a non-overlapping epoch still costs a fetch.
            let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
                Ok(stream) => stream,
                Err(_) => {
                    return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
                }
            };
            let mut reader = NodeReader::new(stream);

            let header_fut = reader.read_raw_header();
            let header = match timeout(OP_TIMEOUT, header_fut).await {
                Ok(res) => res
                    .map_err(FirehoseError::ReadHeader)
                    .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
                Err(_) => {
                    return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
                }
            };
            log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);

            // Intersect this worker's range with the epoch's slots.
            let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
            let local_start = std::cmp::max(slot_range.start, epoch_start);
            let local_end_inclusive =
                std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
            if local_start > local_end_inclusive {
                log::debug!(
                    target: &log_target,
                    "epoch {} has no overlap with thread range ({}..{}), skipping",
                    epoch_num,
                    slot_range.start,
                    slot_range.end
                );
                continue;
            }

            // Blockhash chaining restarts at Hash::default() for every epoch (and every retry),
            // so the first block notified here reports a default parent blockhash.
            let mut todo_previous_blockhash = Hash::default();
            let mut todo_latest_entry_blockhash = Hash::default();
            last_counted_slot = local_start.saturating_sub(1);
            current_slot = None;

            // Seek to the slot just before `local_start`; that preceding block is read and
            // skipped below ("priming reader").
            if local_start > epoch_start {
                let seek_slot = local_start.saturating_sub(1);
                let seek_fut = reader.seek_to_slot(seek_slot);
                match timeout(OP_TIMEOUT, seek_fut).await {
                    Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
                    Err(_) => {
                        return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
                    }
                }
            }

            // Counts nodes visited across *all* blocks of this epoch pass (it is not reset
            // per block). NOTE(review): after a retry that restarts at a later slot, the
            // counter restarts from 0 at that slot while `skip_until_index` still holds the
            // old cumulative value. Nodes, including block nodes, may then be skipped beyond
            // the intended resume point — confirm.
            let mut item_index = 0;
            let mut displayed_skip_message = false;
            loop {
                let read_fut = reader.read_until_block();
                let nodes = match timeout(OP_TIMEOUT, read_fut).await {
                    Ok(result) => result
                        .map_err(FirehoseError::ReadUntilBlockError)
                        .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
                    Err(_) => {
                        log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
                        let restart_slot =
                            current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
                        return Err((
                            FirehoseError::OperationTimeout("read_until_block"),
                            restart_slot,
                        ));
                    }
                };
                if nodes.is_empty() {
                    log::info!(
                        target: &log_target,
                        "reached end of epoch {}",
                        epoch_num
                    );
                    break;
                }
                // A batch that does not end with a block node is treated as the end of the epoch.
                if let Some(last_node) = nodes.0.last()
                    && !last_node.get_node().is_block() {
                    log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
                    break;
                }
                let block = nodes
                    .get_block()
                    .map_err(FirehoseError::GetBlockError)
                    .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
                log::debug!(
                    target: &log_target,
                    "read {} items from epoch {}, now at slot {}",
                    item_index,
                    epoch_num,
                    block.slot
                );
                let slot = block.slot;
                if slot > local_end_inclusive {
                    log::debug!(
                        target: &log_target,
                        "reached end of local slice at slot {} (epoch {}), stopping",
                        slot,
                        epoch_num
                    );
                    break;
                }
                // NOTE(review): local_end_inclusive <= slot_range.end - 1, so the check above
                // already catches every slot >= slot_range.end; this branch appears unreachable.
                if slot >= slot_range.end {
                    log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
                    return Ok(());
                }
                debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
                if slot < local_start {
                    if slot.saturating_add(1) == local_start {
                        log::debug!(
                            target: &log_target,
                            "priming reader with preceding slot {}, skipping",
                            slot
                        );
                    } else {
                        log::warn!(
                            target: &log_target,
                            "encountered slot {} before start of range {}, skipping",
                            slot,
                            local_start
                        );
                    }
                    continue;
                }
                current_slot = Some(slot);
                // Per-block accumulators, filled by the entry/rewards nodes and consumed by
                // the block node's metadata notification.
                let mut entry_index: usize = 0;
                let mut this_block_executed_transaction_count: u64 = 0;
                let mut this_block_entry_count: u64 = 0;
                let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();

                if slot <= last_counted_slot {
                    log::debug!(
                        target: &log_target,
                        "duplicate block {}, already counted (last_counted={})",
                        slot,
                        last_counted_slot,
                    );
                    // No-op: the vector was just created empty.
                    this_block_rewards.clear();
                    continue;
                }

                nodes.each(|node_with_cid| -> Result<(), SharedError> {
                    item_index += 1;
                    // Resume logic after a NodeDecodingError: ignore nodes before the recorded
                    // index. The node at that index itself is processed again.
                    if let Some(skip) = skip_until_index {
                        if item_index < skip {
                            if !displayed_skip_message {
                                log::info!(
                                    target: &log_target,
                                    "skipping until index {} (at {})",
                                    skip,
                                    item_index
                                );
                                displayed_skip_message = true;
                            }
                            return Ok(());
                        } else {
                            log::info!(
                                target: &log_target,
                                "reached target index {}, resuming...",
                                skip
                            );
                            skip_until_index = None;
                        }
                    }
                    let node = node_with_cid.get_node();

                    use crate::node::Node::*;
                    match node {
                        Transaction(tx) => {
                            let versioned_tx = tx.as_parsed()?;
                            let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;

                            let as_native_metadata = decode_transaction_status_meta_from_frame(
                                block.slot,
                                reassembled_metadata,
                            )?;

                            // With the feature enabled, signatures are verified while hashing.
                            let message_hash = {
                                #[cfg(feature = "verify-transaction-signatures")]
                                {
                                    versioned_tx.verify_and_hash_message()?
                                }
                                #[cfg(not(feature = "verify-transaction-signatures"))]
                                {
                                    versioned_tx.message.hash()
                                }
                            };
                            let signature = versioned_tx
                                .signatures
                                .first()
                                .ok_or_else(|| {
                                    Box::new(std::io::Error::new(
                                        std::io::ErrorKind::InvalidData,
                                        "transaction missing signature",
                                    )) as SharedError
                                })?;
                            let is_vote = is_simple_vote_transaction(&versioned_tx);

                            if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
                                // `tx.index.unwrap()` panics if the node carries no index.
                                transaction_notifier.notify_transaction(
                                    block.slot,
                                    tx.index.unwrap() as usize,
                                    signature,
                                    &message_hash,
                                    is_vote,
                                    &as_native_metadata,
                                    &versioned_tx,
                                );
                            }

                        }
                        Entry(entry) => {
                            let entry_hash = Hash::from(entry.hash.to_bytes());
                            let entry_transaction_count = entry.transactions.len();
                            let entry_transaction_count_u64 = entry_transaction_count as u64;
                            // Index of this entry's first transaction within the block.
                            let starting_transaction_index =
                                usize::try_from(this_block_executed_transaction_count).map_err(|_| {
                                    Box::new(std::io::Error::other(
                                        "transaction index exceeds usize range",
                                    )) as SharedError
                                })?;
                            // The last entry's hash becomes the block's blockhash.
                            todo_latest_entry_blockhash = entry_hash;
                            this_block_executed_transaction_count += entry_transaction_count_u64;
                            this_block_entry_count += 1;
                            if entry_notifier_maybe.is_none() {
                                return Ok(());
                            }
                            let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
                            let entry_summary = solana_entry::entry::EntrySummary {
                                num_hashes: entry.num_hashes,
                                hash: Hash::from(entry.hash.to_bytes()),
                                num_transactions: entry_transaction_count_u64,
                            };
                            entry_notifier.notify_entry(
                                block.slot,
                                entry_index,
                                &entry_summary,
                                starting_transaction_index,
                            );
                            entry_index += 1;
                        }
                        Block(block) => {
                            // Panics if every receiver of the channel has been dropped.
                            let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
                            confirmed_bank_sender.send(notification).unwrap();

                            if block_meta_notifier_maybe.is_none() {
                                last_counted_slot = block.slot;
                                return Ok(());
                            }
                            let keyed_rewards = std::mem::take(&mut this_block_rewards);
                            let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
                            block_meta_notifier.notify_block_metadata(
                                block.meta.parent_slot,
                                todo_previous_blockhash.to_string().as_str(),
                                block.slot,
                                todo_latest_entry_blockhash.to_string().as_str(),
                                &KeyedRewardsAndNumPartitions {
                                    keyed_rewards,
                                    num_partitions: None,
                                },
                                Some(block.meta.blocktime as i64),
                                block.meta.block_height,
                                this_block_executed_transaction_count,
                                this_block_entry_count,
                            );
                            todo_previous_blockhash = todo_latest_entry_blockhash;
                            last_counted_slot = block.slot;
                            // Yields the OS thread (not the tokio task).
                            std::thread::yield_now();
                        }
                        Subset(_subset) => (),
                        Epoch(_epoch) => (),
                        Rewards(rewards) => {
                            // NOTE(review): rewards are decoded only when `is_complete()` is
                            // false. The non-Geyser firehose path in this file reassembles
                            // reward frames unconditionally. Confirm the meaning of
                            // `is_complete()`; otherwise single-frame rewards may be dropped.
                            if !rewards.is_complete() {
                                let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
                                let decompressed = utils::decompress_zstd(reassembled)?;
                                let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
                                    Box::new(std::io::Error::other(
                                        std::format!("Error decoding rewards: {:?}", err),
                                    ))
                                })?;
                                this_block_rewards = convert_proto_rewards(&decoded)?;
                            }
                        }
                        DataFrame(_data_frame) => (),
                    }
                    Ok(())
                })
                .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
                // The last slot of this worker's range finishes the worker.
                if block.slot == slot_range.end - 1 {
                    let finish_time = std::time::Instant::now();
                    let elapsed = finish_time.duration_since(start_time);
                    log::info!(target: &log_target, "processed slot {}", block.slot);
                    let elapsed_pretty = human_readable_duration(elapsed);
                    log::info!(
                        target: &log_target,
                        "processed {} slots across {} epochs in {}.",
                        initial_slot_range.end - initial_slot_range.start,
                        slot_to_epoch(initial_slot_range.end)
                            + 1
                            - slot_to_epoch(initial_slot_range.start),
                        elapsed_pretty
                    );
                    log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
                    // Per-worker error counts, formatted "IDX(count)" for workers with errors.
                    let summary: String = error_counts
                        .iter()
                        .enumerate()
                        .filter_map(|(i, c)| {
                            let v = c.load(Ordering::Relaxed);
                            if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
                        })
                        .collect::<Vec<_>>()
                        .join(", ");
                    if !summary.is_empty() {
                        log::debug!(target: &log_target, "threads with errors: {}", summary);
                    }
                    return Ok(());
                }
            }
        }
        Ok(())
    }
    .await
    {
        // A shutdown is not retried: the worker exits cleanly.
        if is_shutdown_error(&err) {
            log::info!(
                target: &log_target,
                "shutdown requested; terminating firehose thread {:?}",
                thread_index
            );
            return Ok(());
        }
        log::error!(
            target: &log_target,
            "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
            slot,
            slot_to_epoch(slot)
        );
        log::error!(target: &log_target, "{}", err);
        // Drop cached slot-offset data for this epoch so the retry rebuilds it.
        if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
            SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
        }
        // Only decoding errors carry a node index; other errors resume at index 0 (no skip).
        let item_index = match err {
            FirehoseError::NodeDecodingError(item_index, _) => item_index,
            _ => 0,
        };
        let idx = thread_index.unwrap_or(0);
        error_counts[idx].fetch_add(1, Ordering::Relaxed);
        log::warn!(
            target: &log_target,
            "restarting from slot {} at index {}",
            slot,
            item_index,
        );
        // Never restart at or before a slot whose block node was already handled.
        if slot <= last_counted_slot {
            slot_range.start = last_counted_slot.saturating_add(1);
        } else {
            slot_range.start = slot;
        }
        skip_until_index = Some(item_index);
    }
    Ok(())
}
2175
2176#[inline]
2177fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2178 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2179 return false;
2180 }
2181
2182 if !matches!(
2183 versioned_tx.version(),
2184 solana_transaction::versioned::TransactionVersion::Legacy(_)
2185 ) {
2186 return false;
2187 }
2188
2189 let instructions = versioned_tx.message.instructions();
2190 if instructions.len() != 1 {
2191 return false;
2192 }
2193
2194 let program_index = instructions[0].program_id_index as usize;
2195 versioned_tx
2196 .message
2197 .static_account_keys()
2198 .get(program_index)
2199 .map(|program_id| program_id == &vote_program_id())
2200 .unwrap_or(false)
2201}
2202
2203#[inline(always)]
2204fn convert_proto_rewards(
2205 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2206) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2207 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2208 for proto_reward in proto_rewards.rewards.iter() {
2209 let reward = RewardInfo {
2210 reward_type: match proto_reward.reward_type - 1 {
2211 0 => RewardType::Fee,
2212 1 => RewardType::Rent,
2213 2 => RewardType::Staking,
2214 3 => RewardType::Voting,
2215 typ => {
2216 return Err(Box::new(std::io::Error::other(format!(
2217 "unsupported reward type {}",
2218 typ
2219 ))));
2220 }
2221 },
2222 lamports: proto_reward.lamports,
2223 post_balance: proto_reward.post_balance,
2224 commission: proto_reward.commission.parse::<u8>().ok(),
2225 };
2226 let pubkey = proto_reward
2227 .pubkey
2228 .parse::<Address>()
2229 .map_err(|err| Box::new(err) as SharedError)?;
2230 keyed_rewards.push((pubkey, reward));
2231 }
2232 Ok(keyed_rewards)
2233}
2234
2235#[inline]
2236pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2238 let total = slot_range.end - slot_range.start;
2239 let slots_per_thread = total / threads;
2240 let remainder = total % threads;
2241
2242 let ranges: Vec<Range<u64>> = (0..threads)
2243 .map(|i| {
2244 let extra_slot = if i < remainder { 1 } else { 0 };
2246 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2247 let end = start + slots_per_thread + extra_slot;
2248 start..end
2249 })
2250 .collect();
2251
2252 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2254 assert_eq!(
2255 total_covered, total,
2256 "Range generation failed: {} threads should cover {} slots but only cover {}",
2257 threads, total, total_covered
2258 );
2259
2260 for i in 1..ranges.len() {
2262 assert_eq!(
2263 ranges[i - 1].end,
2264 ranges[i].start,
2265 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2266 i - 1,
2267 ranges[i - 1].end,
2268 i,
2269 ranges[i].start
2270 );
2271 }
2272
2273 log::info!(
2274 target: LOG_MODULE,
2275 "Generated {} thread ranges covering {} slots total",
2276 threads,
2277 total_covered
2278 );
2279 ranges
2280}
2281
/// Renders `duration` as a compact, human-readable string.
///
/// - A zero duration renders as `"0s"`.
/// - Under one minute, the output is whole seconds (`"5s"`) when at least one
///   second has elapsed and there is no millisecond remainder. Otherwise it is
///   seconds with two decimals (`"0.50s"`, `"5.25s"`).
/// - At one minute or more, the output is the most significant non-zero unit
///   among days/hours/minutes, followed by the next smaller unit only when that
///   unit is non-zero (`"1d2h"`, `"3h"`, `"4m5s"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let total_secs = duration.as_secs();
    if total_secs < 60 {
        let exact_whole_seconds = total_secs != 0 && duration.subsec_millis() == 0;
        return if exact_whole_seconds {
            format!("{}s", total_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = total_secs / 86_400;
    let hours = total_secs % 86_400 / 3_600;
    let minutes = total_secs % 3_600 / 60;
    let secs = total_secs % 60;

    match (days, hours, minutes, secs) {
        (0, 0, 0, _) => format!("{secs}s"),
        (0, 0, _, 0) => format!("{minutes}m"),
        (0, 0, _, _) => format!("{minutes}m{secs}s"),
        (0, _, 0, _) => format!("{hours}h"),
        (0, _, _, _) => format!("{hours}h{minutes}m"),
        (_, 0, _, _) => format!("{days}d"),
        _ => format!("{days}d{hours}h"),
    }
}
2327
/// Test-only stats callback that logs a one-line progress summary for
/// `thread_id`.
///
/// Throughput (`tps`) is `transactions_processed` divided by the seconds
/// elapsed since `stats.start_time`, or `0.0` when no time has elapsed yet.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        // Avoid dividing by zero when invoked immediately after start.
        let tps = if elapsed_secs <= 0.0 {
            0.0
        } else {
            stats.transactions_processed as f64 / elapsed_secs
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2353
2354#[cfg(test)]
2355use futures_util::FutureExt;
2356#[cfg(test)]
2357use serial_test::serial;
2358#[cfg(test)]
2359use std::sync::{Mutex, OnceLock};
2360
2361#[tokio::test(flavor = "multi_thread")]
2362async fn test_firehose_epoch_800() {
2363 use dashmap::DashSet;
2364 use std::sync::atomic::{AtomicU64, Ordering};
2365 solana_logger::setup_with_default("info");
2366 const THREADS: usize = 4;
2367 const NUM_SLOTS_TO_COVER: u64 = 50;
2368 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2369 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2370 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2371 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2372 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2373 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2374 let stats_tracking = StatsTracking {
2375 on_stats: log_stats_handler,
2376 tracking_interval_slots: 10,
2377 };
2378
2379 for prev in PREV_BLOCK.iter() {
2380 prev.store(0, Ordering::Relaxed);
2381 }
2382 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2383 NUM_BLOCKS.store(0, Ordering::Relaxed);
2384 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2385 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2386 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2387
2388 firehose(
2389 THREADS.try_into().unwrap(),
2390 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2391 Some(|thread_id: usize, block: BlockData| {
2392 async move {
2393 let _prev =
2394 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2395 if block.was_skipped() {
2396 log::info!(
2397 target: LOG_MODULE,
2398 "leader skipped block {} on thread {}",
2399 block.slot(),
2400 thread_id,
2401 );
2402 } else {
2403 }
2410
2411 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2412 if block.was_skipped() {
2413 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2414 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2415 } else {
2416 if first_time {
2417 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2418 if let BlockData::Block {
2419 executed_transaction_count,
2420 ..
2421 } = &block
2422 {
2423 let executed = *executed_transaction_count;
2424 let _ = MIN_TRANSACTIONS.fetch_update(
2425 Ordering::Relaxed,
2426 Ordering::Relaxed,
2427 |current| {
2428 if executed < current {
2429 Some(executed)
2430 } else {
2431 None
2432 }
2433 },
2434 );
2435 }
2436 }
2437 }
2438 Ok(())
2439 }
2440 .boxed()
2441 }),
2442 None::<OnTxFn>,
2443 None::<OnEntryFn>,
2444 None::<OnRewardFn>,
2445 None::<OnErrorFn>,
2446 Some(stats_tracking),
2447 None,
2448 )
2449 .await
2450 .unwrap();
2451 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2452 assert_eq!(
2453 seen, NUM_SLOTS_TO_COVER,
2454 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2455 );
2456 let mut skipped: Vec<u64> = SEEN_SKIPPED
2457 .get_or_init(DashSet::new)
2458 .iter()
2459 .map(|v| *v)
2460 .collect();
2461 skipped.sort_unstable();
2462 const EXPECTED_SKIPPED: [u64; 6] = [
2464 345_600_004,
2465 345_600_005,
2466 345_600_008,
2467 345_600_009,
2468 345_600_010,
2469 345_600_011,
2470 ];
2471 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2472 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2473}
2474
/// Streams the window `TARGET_SLOT ± SLOT_RADIUS` on 4 threads and checks two
/// counts for `TARGET_SLOT`:
/// - the block's `executed_transaction_count`, and
/// - the number of non-vote transactions delivered to the transaction callback.
///
/// NOTE(review): the expected counts are presumably pinned from real ledger
/// data fetched by `firehose`, so this test likely needs network access.
/// Confirm this.
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_target_slot_transactions() {
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    solana_logger::setup_with_default("info");
    const TARGET_SLOT: u64 = 376_273_722;
    const SLOT_RADIUS: u64 = 50;
    const EXPECTED_TRANSACTIONS: u64 = 1414;
    const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
    // Shared between the handler futures and the final assertions below.
    static FOUND: AtomicBool = AtomicBool::new(false);
    static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
    static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);

    // Statics live for the whole test process; reset them before streaming.
    FOUND.store(false, Ordering::Relaxed);
    OBSERVED_TXS.store(0, Ordering::Relaxed);
    OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);

    firehose(
        4,
        (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
        // Block handler: inspect only TARGET_SLOT.
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if block.slot() == TARGET_SLOT {
                    assert!(
                        !block.was_skipped(),
                        "target slot {TARGET_SLOT} was marked leader skipped",
                    );
                    if let BlockData::Block {
                        executed_transaction_count,
                        ..
                    } = block
                    {
                        OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
                        FOUND.store(true, Ordering::Relaxed);
                        assert_eq!(
                            executed_transaction_count, EXPECTED_TRANSACTIONS,
                            "unexpected transaction count for slot {TARGET_SLOT}"
                        );
                        // NOTE(review): this assumes every transaction callback for
                        // TARGET_SLOT has completed before its block callback runs.
                        // Confirm `firehose` guarantees that ordering.
                        assert_eq!(
                            OBSERVED_NON_VOTE.load(Ordering::Relaxed),
                            EXPECTED_NON_VOTE_TRANSACTIONS,
                            "unexpected non-vote transaction count for slot {TARGET_SLOT}"
                        );
                    }
                }
                Ok(())
            }
            .boxed()
        }),
        // Transaction handler: count non-vote transactions in TARGET_SLOT.
        Some(|_thread_id: usize, transaction: TransactionData| {
            async move {
                if transaction.slot == TARGET_SLOT && !transaction.is_vote {
                    OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // The block handler must actually have seen a non-skipped TARGET_SLOT.
    assert!(
        FOUND.load(Ordering::Relaxed),
        "target slot was not processed"
    );
    assert_eq!(
        OBSERVED_TXS.load(Ordering::Relaxed),
        EXPECTED_TRANSACTIONS,
        "recorded transaction count mismatch"
    );
}
2551
2552#[tokio::test(flavor = "multi_thread")]
2553async fn test_firehose_epoch_850_votes_present() {
2554 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2555 solana_logger::setup_with_default("info");
2556 const TARGET_SLOT: u64 = 367_200_100; const SLOT_RADIUS: u64 = 10;
2558 static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
2559 static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
2560 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
2561
2562 SEEN_BLOCK.store(false, Ordering::Relaxed);
2563 VOTE_TXS.store(0, Ordering::Relaxed);
2564 TOTAL_TXS.store(0, Ordering::Relaxed);
2565
2566 firehose(
2567 2,
2568 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2569 Some(|_thread_id: usize, block: BlockData| {
2570 async move {
2571 if block.slot() == TARGET_SLOT {
2572 assert!(
2573 !block.was_skipped(),
2574 "target slot {TARGET_SLOT} was marked leader skipped",
2575 );
2576 SEEN_BLOCK.store(true, Ordering::Relaxed);
2577 }
2578 Ok(())
2579 }
2580 .boxed()
2581 }),
2582 Some(|_thread_id: usize, transaction: TransactionData| {
2583 async move {
2584 if transaction.slot == TARGET_SLOT {
2585 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
2586 if transaction.is_vote {
2587 VOTE_TXS.fetch_add(1, Ordering::Relaxed);
2588 }
2589 }
2590 Ok(())
2591 }
2592 .boxed()
2593 }),
2594 None::<OnEntryFn>,
2595 None::<OnRewardFn>,
2596 None::<OnErrorFn>,
2597 None::<OnStatsTrackingFn>,
2598 None,
2599 )
2600 .await
2601 .unwrap();
2602
2603 assert!(
2604 SEEN_BLOCK.load(Ordering::Relaxed),
2605 "target slot was not processed"
2606 );
2607 assert!(
2608 TOTAL_TXS.load(Ordering::Relaxed) > 0,
2609 "no transactions counted in target slot"
2610 );
2611 assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
2612}
2613
/// Injects one synthetic block-handler error and then asserts that every slot
/// in `START_SLOT..START_SLOT + NUM_SLOTS` still reaches the block handler.
///
/// The error fires on the first non-skipped block that arrives after at least
/// one non-skipped block has already been counted.
///
/// NOTE(review): the handler error presumably makes `firehose` restart from
/// the failing slot. That is what lets the slot rejected here be recorded on
/// the retry. Confirm against `firehose`'s error path.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;
    solana_logger::setup_with_default("info");
    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // slot -> number of times the block handler recorded it.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    // Ensures the synthetic failure is injected at most once per run.
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // `swap` both checks and sets the flag, so only one call can fail.
                // The failing slot is returned before it is recorded in COVERAGE.
                if !block.was_skipped()
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
                {
                    return Err("synthetic handler failure to exercise restart".into());
                }
                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *coverage.entry(block.slot()).or_insert(0) += 1;
                if !block.was_skipped() {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Every slot (skipped or not) must have been recorded at least once.
    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
2677
/// Streams the inclusive window `GAP_START ± 1000` on 16 threads.
///
/// Asserts that every slot in the window reaches the block handler as a
/// non-skipped block, apart from four slots that are explicitly whitelisted
/// before the check.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;
    solana_logger::setup_with_default("info");
    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    /// Lazily initialised set of non-skipped slots seen by the block handler.
    fn coverage_set() -> &'static Mutex<HashSet<u64>> {
        COVERAGE.get_or_init(|| Mutex::new(HashSet::new()))
    }

    coverage_set().lock().unwrap().clear();

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                if !block.was_skipped() {
                    coverage_set().lock().unwrap().insert(block.slot());
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut coverage = coverage_set().lock().unwrap().clone();

    // NOTE(review): these four slots are whitelisted rather than streamed.
    // Presumably they are legitimately absent upstream. Confirm the reason.
    coverage.extend(378_864_396..=378_864_399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}