1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_address::Address;
7use solana_geyser_plugin_manager::{
8 block_metadata_notifier_interface::BlockMetadataNotifier,
9 geyser_plugin_service::GeyserPluginServiceError,
10};
11use solana_hash::Hash;
12use solana_ledger::entry_notifier_interface::EntryNotifier;
13use solana_reward_info::RewardInfo;
14use solana_rpc::{
15 optimistically_confirmed_bank_tracker::SlotNotification,
16 transaction_notifier_interface::TransactionNotifier,
17};
18use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
19use solana_sdk_ids::vote::id as vote_program_id;
20use solana_transaction::versioned::VersionedTransaction;
21use std::{
22 fmt::Display,
23 future::Future,
24 io,
25 ops::Range,
26 path::PathBuf,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
30 },
31};
32use thiserror::Error;
33use tokio::{
34 sync::broadcast::{self, error::TryRecvError},
35 time::timeout,
36};
37
38use crate::{
39 LOG_MODULE,
40 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
41 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
42 node_reader::NodeReader,
43 utils,
44};
45
/// Upper bound (10 seconds) passed to `timeout(...)` around the individual awaited
/// stream operations in this module: fetching an epoch stream, reading the header,
/// seeking to a slot, and reading the next block.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
50
51fn poll_shutdown(
52 flag: &Arc<std::sync::atomic::AtomicBool>,
53 receiver: &mut Option<broadcast::Receiver<()>>,
54) -> bool {
55 if let Some(rx) = receiver {
56 match rx.try_recv() {
57 Ok(_) | Err(TryRecvError::Lagged(_)) => {
58 flag.store(true, Ordering::SeqCst);
59 }
60 Err(TryRecvError::Closed) => {
61 flag.store(true, Ordering::SeqCst);
62 }
63 Err(TryRecvError::Empty) => {}
64 }
65 }
66 flag.load(Ordering::SeqCst)
67}
68
69fn is_shutdown_error(err: &FirehoseError) -> bool {
70 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
71 inner
72 .downcast_ref::<io::Error>()
73 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
74 .unwrap_or(false)
75 }
76
77 match err {
78 FirehoseError::BlockHandlerError(inner)
79 | FirehoseError::TransactionHandlerError(inner)
80 | FirehoseError::EntryHandlerError(inner)
81 | FirehoseError::RewardHandlerError(inner)
82 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
83 _ => false,
84 }
85}
86
/// Errors reported by the firehose.
///
/// The user-facing text of each variant comes from the hand-written
/// [`Display`] implementation for this type.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// An error from `reqwest`; produced by the `From<reqwest::Error>` conversion.
    Reqwest(reqwest::Error),
    /// Reading a header failed (displayed as "Error reading header").
    ReadHeader(Box<dyn std::error::Error>),
    /// An error from the geyser plugin service; produced by the
    /// `From<GeyserPluginServiceError>` conversion.
    GeyserPluginService(GeyserPluginServiceError),
    /// No transaction notifier could be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading nodes up to the next block failed.
    ReadUntilBlockError(Box<dyn std::error::Error>),
    /// Extracting the block from a group of read nodes failed.
    GetBlockError(Box<dyn std::error::Error>),
    /// Seeking to, reading, or decoding a data node failed; the `usize` is the
    /// index of the node that is shown in the error message.
    NodeDecodingError(usize, Box<dyn std::error::Error>),
    /// An error from the slot offset index; produced by the
    /// `From<SlotOffsetIndexError>` conversion.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking to a slot failed.
    SeekToSlotError(Box<dyn std::error::Error>),
    /// A failure "on load" (for example, `firehose` returns this when it is asked
    /// to run with zero threads).
    OnLoadError(Box<dyn std::error::Error>),
    /// The stats handler returned an error.
    OnStatsHandlerError(Box<dyn std::error::Error>),
    /// An awaited operation did not finish in time; the string names the operation.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(Box<dyn std::error::Error>),
    /// The entry handler returned an error.
    EntryHandlerError(Box<dyn std::error::Error>),
    /// The reward handler returned an error.
    RewardHandlerError(Box<dyn std::error::Error>),
    /// The block handler returned an error.
    BlockHandlerError(Box<dyn std::error::Error>),
}
124
125impl Display for FirehoseError {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 match self {
128 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
129 FirehoseError::ReadHeader(error) => {
130 write!(f, "Error reading header: {}", error)
131 }
132 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
133 f,
134 "Error initializing geyser plugin service: {}",
135 geyser_plugin_service_error
136 ),
137 FirehoseError::FailedToGetTransactionNotifier => write!(
138 f,
139 "Failed to get transaction notifier from GeyserPluginService"
140 ),
141 FirehoseError::ReadUntilBlockError(error) => {
142 write!(f, "Error reading until block: {}", error)
143 }
144 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
145 FirehoseError::NodeDecodingError(item_index, error) => {
146 write!(
147 f,
148 "Error seeking, reading data from, or decoding data for data node {}: {}",
149 item_index, error
150 )
151 }
152 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
153 f,
154 "Error getting info from slot offset index: {}",
155 slot_offset_index_error
156 ),
157 FirehoseError::SeekToSlotError(error) => {
158 write!(f, "Error seeking to slot: {}", error)
159 }
160 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
161 FirehoseError::OnStatsHandlerError(error) => {
162 write!(f, "Stats handler error: {}", error)
163 }
164 FirehoseError::OperationTimeout(op) => {
165 write!(f, "Timeout while waiting for operation: {}", op)
166 }
167 FirehoseError::TransactionHandlerError(error) => {
168 write!(f, "Transaction handler error: {}", error)
169 }
170 FirehoseError::EntryHandlerError(error) => {
171 write!(f, "Entry handler error: {}", error)
172 }
173 FirehoseError::RewardHandlerError(error) => {
174 write!(f, "Reward handler error: {}", error)
175 }
176 FirehoseError::BlockHandlerError(error) => {
177 write!(f, "Block handler error: {}", error)
178 }
179 }
180 }
181}
182
183impl From<reqwest::Error> for FirehoseError {
184 fn from(e: reqwest::Error) -> Self {
185 FirehoseError::Reqwest(e)
186 }
187}
188
189impl From<GeyserPluginServiceError> for FirehoseError {
190 fn from(e: GeyserPluginServiceError) -> Self {
191 FirehoseError::GeyserPluginService(e)
192 }
193}
194
195impl From<SlotOffsetIndexError> for FirehoseError {
196 fn from(e: SlotOffsetIndexError) -> Self {
197 FirehoseError::SlotOffsetIndexError(e)
198 }
199}
200
/// Progress counters kept by a single firehose worker.
///
/// The field notes describe how the visible part of the `firehose` worker loop
/// fills and updates them.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker these stats belong to.
    pub thread_id: usize,
    /// Instant captured when the worker task started.
    pub start_time: std::time::Instant,
    /// `None` when the stats are created.
    /// NOTE(review): where this gets set is not visible in this part of the file.
    pub finish_time: Option<std::time::Instant>,
    /// Slot sub-range assigned to the worker.
    pub slot_range: Range<u64>,
    /// Slot most recently recorded by the worker; starts at `slot_range.start`.
    pub current_slot: u64,
    /// Number of slots counted so far, including leader-skipped slots.
    pub slots_processed: u64,
    /// Number of blocks counted so far.
    pub blocks_processed: u64,
    /// Number of slots counted as skipped by their leader.
    pub leader_skipped_slots: u64,
    /// Number of transaction nodes counted so far.
    pub transactions_processed: u64,
    /// Number of entry nodes counted so far.
    pub entries_processed: u64,
    /// Number of individual rewards counted so far (each decoded rewards node
    /// adds the length of its reward list).
    pub rewards_processed: u64,
}
227
/// Snapshot handed to the stats handler.
///
/// The field notes describe how `maybe_emit_stats` populates the snapshot.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Copy of the emitting worker's [`ThreadStats`].
    pub thread_stats: ThreadStats,
    /// Copied from the emitting worker's `start_time`.
    pub start_time: std::time::Instant,
    /// Copied from the emitting worker's `finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Copied from the emitting worker's `slot_range`.
    pub slot_range: Range<u64>,
    /// Value of the shared slot counter when the snapshot was taken.
    pub slots_processed: u64,
    /// Value of the shared block counter when the snapshot was taken.
    pub blocks_processed: u64,
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Value of the shared transaction counter when the snapshot was taken.
    pub transactions_processed: u64,
    /// Value of the shared entry counter when the snapshot was taken.
    pub entries_processed: u64,
    /// Copied from the emitting worker's `rewards_processed` (not a shared total).
    pub rewards_processed: u64,
    /// Transactions counted since the previous snapshot; the counter is reset to
    /// zero when the snapshot is taken.
    pub transactions_since_last_pulse: u64,
    /// Blocks counted since the previous snapshot; reset when the snapshot is taken.
    pub blocks_since_last_pulse: u64,
    /// Slots counted since the previous snapshot; reset when the snapshot is taken.
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the previous snapshot, clamped to at least one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
260
/// Configuration for periodic stats reporting.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler called with the worker index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Reporting interval in slots. The visible worker loop emits stats for a block
    /// when `slot % tracking_interval_slots == 0`, so this must be non-zero
    /// (a zero value would make that remainder operation panic).
    pub tracking_interval_slots: u64,
}
269
270#[inline(always)]
271#[allow(clippy::too_many_arguments)]
272async fn maybe_emit_stats<OnStats: Handler<Stats>>(
273 stats_tracking: Option<&StatsTracking<OnStats>>,
274 thread_index: usize,
275 thread_stats: &ThreadStats,
276 overall_slots_processed: &AtomicU64,
277 overall_blocks_processed: &AtomicU64,
278 overall_transactions_processed: &AtomicU64,
279 overall_entries_processed: &AtomicU64,
280 transactions_since_stats: &AtomicU64,
281 blocks_since_stats: &AtomicU64,
282 slots_since_stats: &AtomicU64,
283 last_pulse: &Arc<AtomicU64>,
284 base_instant: std::time::Instant,
285) -> Result<(), (FirehoseError, u64)> {
286 if let Some(stats_tracker) = stats_tracking {
287 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
288 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
289 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
290 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
291 let now_nanos = base_instant.elapsed().as_nanos() as u64;
292 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
293 let delta_nanos = now_nanos.saturating_sub(previous);
294 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
295 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
296 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
297 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
298
299 let stats = Stats {
300 thread_stats: thread_stats.clone(),
301 start_time: thread_stats.start_time,
302 finish_time: thread_stats.finish_time,
303 slot_range: thread_stats.slot_range.clone(),
304 slots_processed: total_slots,
305 blocks_processed: total_blocks,
306 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
307 transactions_processed: total_transactions,
308 entries_processed: total_entries,
309 rewards_processed: thread_stats.rewards_processed,
310 transactions_since_last_pulse: processed_transactions,
311 blocks_since_last_pulse: processed_blocks,
312 slots_since_last_pulse: processed_slots,
313 time_since_last_pulse,
314 };
315
316 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
317 last_pulse.store(previous, Ordering::Relaxed);
318 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
319 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
320 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
321 return Err((
322 FirehoseError::OnStatsHandlerError(e),
323 thread_stats.current_slot,
324 ));
325 }
326 }
327 Ok(())
328}
329
/// Adds `value` to `atomic` (relaxed ordering) when `tracking_enabled` is `true`;
/// otherwise leaves the counter untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
336
/// Payload passed to the transaction handler for one transaction.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot of the block the transaction was read from.
    pub slot: u64,
    /// Index recorded on the transaction node, converted to `usize`.
    pub transaction_slot_index: usize,
    /// First signature of the transaction.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Result of `is_simple_vote_transaction` for this transaction.
    pub is_vote: bool,
    /// Status metadata decoded from the transaction's stored metadata.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The parsed transaction itself.
    pub transaction: VersionedTransaction,
}
355
/// Payload passed to the entry handler for one entry.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot of the block the entry was read from.
    pub slot: u64,
    /// Zero-based position of the entry among the entries read for its block.
    pub entry_index: usize,
    /// Range of transaction positions covered by the entry: it starts at the
    /// number of transactions in the block's earlier entries and spans the entry's
    /// own transaction count.
    pub transaction_indexes: Range<usize>,
    /// `num_hashes` value taken from the entry node.
    pub num_hashes: u64,
    /// Hash of the entry.
    pub hash: Hash,
}
370
/// Payload passed to the rewards handler for one rewards node.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot of the block the rewards were read from.
    pub slot: u64,
    /// Decoded rewards as `(address, reward info)` pairs; empty when the rewards
    /// node carried no data.
    pub rewards: Vec<(Address, RewardInfo)>,
}
379
/// Payload passed to the block handler: either a block that was read, or a slot
/// for which no block was found.
#[derive(Debug)]
pub enum BlockData {
    /// A block read from the stream.
    Block {
        /// Parent slot taken from the block's metadata.
        parent_slot: u64,
        /// Blockhash recorded for the block emitted before this one (the default
        /// hash when none has been emitted yet).
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Hash of the last entry read before the block node.
        blockhash: Hash,
        /// Rewards collected for the block (the visible caller always sets
        /// `num_partitions` to `None`).
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time taken from the block's metadata.
        block_time: Option<i64>,
        /// Block height taken from the block's metadata.
        block_height: Option<u64>,
        /// Sum of the transaction counts of the block's entries.
        executed_transaction_count: u64,
        /// Number of entries read for the block.
        entry_count: u64,
    },
    /// A slot between processed blocks (or before the end of the range) for which
    /// no block was read.
    LeaderSkipped {
        /// The skipped slot.
        slot: u64,
    },
}
410
411impl BlockData {
412 #[inline(always)]
414 pub const fn slot(&self) -> u64 {
415 match self {
416 BlockData::Block { slot, .. } => *slot,
417 BlockData::LeaderSkipped { slot } => *slot,
418 }
419 }
420
421 #[inline(always)]
423 pub const fn was_skipped(&self) -> bool {
424 matches!(self, BlockData::LeaderSkipped { .. })
425 }
426}
427
/// Outcome of a handler invocation: `Ok(())` on success, or a boxed error that is
/// `Send` so it can cross task boundaries.
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
/// Boxed, `'static` future returned by every handler, resolving to a [`HandlerResult`].
type HandlerFuture = BoxFuture<'static, HandlerResult>;
430
/// A callback that receives a `usize` (the worker index at the visible call sites)
/// and a `Data` value, and returns a boxed future resolving to `Ok(())` or an error.
///
/// Implementors must also be `Send + Sync + Clone + 'static`.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

/// Blanket implementation: every closure or function with the required signature
/// and bounds is a [`Handler`].
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
438
/// Plain function-pointer form of a handler for `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer handler for [`BlockData`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer handler for [`TransactionData`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer handler for [`EntryData`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer handler for [`RewardsData`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// [`StatsTracking`] whose stats handler is a plain function pointer.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
451
452#[inline]
454#[allow(clippy::too_many_arguments)]
455pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
456 threads: u64,
457 slot_range: Range<u64>,
458 on_block: Option<OnBlock>,
459 on_tx: Option<OnTransaction>,
460 on_entry: Option<OnEntry>,
461 on_rewards: Option<OnRewards>,
462 stats_tracking: Option<StatsTracking<OnStats>>,
463 shutdown_signal: Option<broadcast::Receiver<()>>,
464) -> Result<(), (FirehoseError, u64)>
465where
466 OnBlock: Handler<BlockData>,
467 OnTransaction: Handler<TransactionData>,
468 OnEntry: Handler<EntryData>,
469 OnRewards: Handler<RewardsData>,
470 OnStats: Handler<Stats>,
471{
472 if threads == 0 {
473 return Err((
474 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
475 slot_range.start,
476 ));
477 }
478 let client = Client::new();
479 log::info!(target: LOG_MODULE, "starting firehose...");
480 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
481
482 let slot_range = Arc::new(slot_range);
483
484 let subranges = generate_subranges(&slot_range, threads);
486 if threads > 1 {
487 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
488 }
489
490 let firehose_start = std::time::Instant::now();
491 let shutdown_flag = Arc::new(AtomicBool::new(false));
492 if let Some(ref rx) = shutdown_signal {
493 let mut rx = rx.resubscribe();
494 let flag = shutdown_flag.clone();
495 tokio::spawn(async move {
496 if rx.recv().await.is_ok() {
497 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
498 flag.store(true, Ordering::SeqCst);
499 }
500 });
501 }
502 let mut handles = Vec::new();
503 let error_counts: Arc<Vec<AtomicU32>> =
505 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
506
507 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
510 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
511
512 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
513 let error_counts = error_counts.clone();
514 let client = client.clone();
515 let on_block = on_block.clone();
516 let on_tx = on_tx.clone();
517 let on_entry = on_entry.clone();
518 let on_reward = on_rewards.clone();
519 let overall_slots_processed = overall_slots_processed.clone();
520 let overall_blocks_processed = overall_blocks_processed.clone();
521 let overall_transactions_processed = overall_transactions_processed.clone();
522 let overall_entries_processed = overall_entries_processed.clone();
523 let stats_tracking = stats_tracking.clone();
524 let transactions_since_stats = Arc::new(AtomicU64::new(0));
525 let blocks_since_stats = Arc::new(AtomicU64::new(0));
526 let slots_since_stats = Arc::new(AtomicU64::new(0));
527 let last_pulse = Arc::new(AtomicU64::new(0));
528 let transactions_since_stats_cloned = transactions_since_stats.clone();
529 let blocks_since_stats_cloned = blocks_since_stats.clone();
530 let slots_since_stats_cloned = slots_since_stats.clone();
531 let last_pulse_cloned = last_pulse.clone();
532 let shutdown_flag = shutdown_flag.clone();
533 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
534
535 let handle = tokio::spawn(async move {
536 let transactions_since_stats = transactions_since_stats_cloned;
537 let blocks_since_stats = blocks_since_stats_cloned;
538 let slots_since_stats = slots_since_stats_cloned;
539 let last_pulse = last_pulse_cloned;
540 let mut shutdown_rx = thread_shutdown_rx;
541 let start_time = std::time::Instant::now();
542 last_pulse.store(0, Ordering::Relaxed);
543 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
544 let mut skip_until_index = None;
545 let block_enabled = on_block.is_some();
546 let tx_enabled = on_tx.is_some();
547 let entry_enabled = on_entry.is_some();
548 let reward_enabled = on_reward.is_some();
549 let tracking_enabled = stats_tracking.is_some();
550 let mut last_counted_slot = slot_range.start.saturating_sub(1);
551
552 while let Err((err, slot)) = async {
554 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
555 log::info!(
556 target: &log_target,
557 "shutdown requested; terminating firehose thread {}",
558 thread_index
559 );
560 return Ok(());
561 }
562 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
563 log::info!(
564 target: &log_target,
565 "slot range: {} (epoch {}) ... {} (epoch {})",
566 slot_range.start,
567 slot_to_epoch(slot_range.start),
568 slot_range.end,
569 slot_to_epoch(slot_range.end)
570 );
571
572 log::info!(target: &log_target, "🚒 starting firehose...");
573
574 let mut current_slot: Option<u64> = None;
576 let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
577 for epoch_num in epoch_range.clone() {
578 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
579 log::info!(
580 target: &log_target,
581 "shutdown requested; terminating firehose thread {}",
582 thread_index
583 );
584 return Ok(());
585 }
586 log::info!(target: &log_target, "entering epoch {}", epoch_num);
587 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
588 Ok(stream) => stream,
589 Err(_) => {
590 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
591 }
592 };
593 let mut reader = NodeReader::new(stream);
594
595 let header_fut = reader.read_raw_header();
596 let header = match timeout(OP_TIMEOUT, header_fut).await {
597 Ok(res) => res
598 .map_err(FirehoseError::ReadHeader)
599 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
600 Err(_) => {
601 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
602 }
603 };
604 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
605
606 let mut previous_blockhash = Hash::default();
607 let mut latest_entry_blockhash = Hash::default();
608
609 let mut thread_stats = if tracking_enabled {
610 Some(ThreadStats {
611 thread_id: thread_index,
612 start_time,
613 finish_time: None,
614 slot_range: slot_range.clone(),
615 current_slot: slot_range.start,
616 slots_processed: 0,
617 blocks_processed: 0,
618 leader_skipped_slots: 0,
619 transactions_processed: 0,
620 entries_processed: 0,
621 rewards_processed: 0,
622 })
623 } else {
624 None
625 };
626
627 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
628 let seek_fut = reader.seek_to_slot(slot_range.start);
629 match timeout(OP_TIMEOUT, seek_fut).await {
630 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
631 Err(_) => {
632 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
633 }
634 }
635 }
636
637 let mut item_index = 0;
639 let mut displayed_skip_message = false;
640 loop {
641 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
642 log::info!(
643 target: &log_target,
644 "shutdown requested; terminating firehose thread {}",
645 thread_index
646 );
647 return Ok(());
648 }
649 let read_fut = reader.read_until_block();
650 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
651 Ok(result) => result
652 .map_err(FirehoseError::ReadUntilBlockError)
653 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
654 Err(_) => {
655 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
656 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
657 }
658 };
659 if nodes.is_empty() {
660 log::info!(
661 target: &log_target,
662 "reached end of epoch {}",
663 epoch_num
664 );
665 break;
666 }
667 if let Some(last_node) = nodes.0.last()
668 && !last_node.get_node().is_block()
669 {
670 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
671 break;
672 }
673 let block = nodes
674 .get_block()
675 .map_err(FirehoseError::GetBlockError)
676 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
677 log::debug!(
678 target: &log_target,
679 "read {} items from epoch {}, now at slot {}",
680 item_index,
681 epoch_num,
682 block.slot
683 );
684 let slot = block.slot;
685 if slot >= slot_range.end {
686 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
687 let target_last_slot = slot_range.end.saturating_sub(1);
692 let skip_start_from_previous = previous_slot
693 .map(|s| s.saturating_add(1))
694 .unwrap_or(slot_range.start);
695 let first_untracked_slot = last_counted_slot.saturating_add(1);
696 let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
697
698 if skip_start <= target_last_slot {
699 if block_enabled
700 && let Some(on_block_cb) = on_block.as_ref() {
701 for skipped_slot in skip_start..=target_last_slot {
702 log::debug!(
703 target: &log_target,
704 "leader skipped slot {} (prev_counted {}, target {})",
705 skipped_slot,
706 last_counted_slot,
707 target_last_slot,
708 );
709 on_block_cb(
710 thread_index,
711 BlockData::LeaderSkipped { slot: skipped_slot },
712 )
713 .await
714 .map_err(|e| FirehoseError::BlockHandlerError(e))
715 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
716 }
717 }
718
719 let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
720 if tracking_enabled {
721 if let Some(ref mut stats) = thread_stats {
722 stats.leader_skipped_slots += missing_slots;
723 stats.slots_processed += missing_slots;
724 stats.current_slot = target_last_slot;
725 }
726 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
727 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
728 }
729 last_counted_slot = target_last_slot;
730 }
731
732 return Ok(());
733 }
734 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
735 if slot < slot_range.start {
736 log::warn!(
737 target: &log_target,
738 "encountered slot {} before start of range {}, skipping",
739 slot,
740 slot_range.start
741 );
742 continue;
743 }
744 if current_slot.is_some() {
745 previous_slot = current_slot;
746 }
747 current_slot = Some(slot);
748 let mut entry_index: usize = 0;
749 let mut this_block_executed_transaction_count: u64 = 0;
750 let mut this_block_entry_count: u64 = 0;
751 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
752
753 for node_with_cid in &nodes.0 {
754 item_index += 1;
755 if let Some(skip) = skip_until_index {
756 if item_index < skip {
757 if !displayed_skip_message {
758 log::info!(
759 target: &log_target,
760 "skipping until index {} (at {})",
761 skip,
762 item_index
763 );
764 displayed_skip_message = true;
765 }
766 continue;
767 } else {
768 log::info!(
769 target: &log_target,
770 "reached target index {}, resuming...",
771 skip
772 );
773 skip_until_index = None;
774 }
775 }
776 let node = node_with_cid.get_node();
777
778 if let Some(ref mut stats) = thread_stats {
779 stats.current_slot = slot;
780 }
781
782 let error_slot = current_slot.unwrap_or(slot_range.start);
783
784 use crate::node::Node::*;
785 match node {
786 Transaction(tx) => {
787 if tx_enabled
788 && let Some(on_tx_cb) = on_tx.as_ref()
789 {
790 let error_slot = current_slot.unwrap_or(slot_range.start);
791 let versioned_tx = tx.as_parsed().map_err(|err| {
792 (
793 FirehoseError::NodeDecodingError(item_index, err),
794 error_slot,
795 )
796 })?;
797 let reassembled_metadata = nodes
798 .reassemble_dataframes(tx.metadata.clone())
799 .map_err(|err| {
800 (
801 FirehoseError::NodeDecodingError(item_index, err),
802 error_slot,
803 )
804 })?;
805
806 let decompressed =
807 utils::decompress_zstd(reassembled_metadata.clone())
808 .map_err(|err| {
809 (
810 FirehoseError::NodeDecodingError(
811 item_index,
812 err,
813 ),
814 error_slot,
815 )
816 })?;
817
818 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
819 prost_011::Message::decode(decompressed.as_slice())
820 .map_err(|err| {
821 (
822 FirehoseError::NodeDecodingError(
823 item_index,
824 Box::new(err),
825 ),
826 error_slot,
827 )
828 })?;
829
830 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
831 metadata.try_into().map_err(|err| {
832 (
833 FirehoseError::NodeDecodingError(
834 item_index,
835 Box::new(err),
836 ),
837 error_slot,
838 )
839 })?;
840
841 let message_hash = {
842 #[cfg(feature = "verify-transaction-signatures")]
843 {
844 versioned_tx.verify_and_hash_message().map_err(|err| {
845 (
846 FirehoseError::TransactionHandlerError(Box::new(err)),
847 error_slot,
848 )
849 })?
850 }
851 #[cfg(not(feature = "verify-transaction-signatures"))]
852 {
853 versioned_tx.message.hash()
854 }
855 };
856 let signature = versioned_tx
857 .signatures
858 .first()
859 .ok_or_else(|| {
860 Box::new(std::io::Error::new(
861 std::io::ErrorKind::InvalidData,
862 "transaction missing signature",
863 )) as Box<dyn std::error::Error>
864 })
865 .map_err(|err| {
866 (
867 FirehoseError::NodeDecodingError(
868 item_index,
869 err,
870 ),
871 error_slot,
872 )
873 })?;
874 let is_vote = is_simple_vote_transaction(&versioned_tx);
875
876 on_tx_cb(
877 thread_index,
878 TransactionData {
879 slot: block.slot,
880 transaction_slot_index: tx.index.unwrap() as usize,
881 signature: *signature,
882 message_hash,
883 is_vote,
884 transaction_status_meta: as_native_metadata.clone(),
885 transaction: versioned_tx.clone(),
886 },
887 )
888 .await
889 .map_err(|e| {
890 (
891 FirehoseError::TransactionHandlerError(e),
892 error_slot,
893 )
894 })?;
895 }
896 fetch_add_if(
897 tracking_enabled,
898 &overall_transactions_processed,
899 1,
900 );
901 if let Some(ref mut stats) = thread_stats {
902 stats.transactions_processed += 1;
903 }
904 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
905 }
906 Entry(entry) => {
907 let entry_hash = Hash::from(entry.hash.to_bytes());
908 let entry_transaction_count = entry.transactions.len();
909 let entry_transaction_count_u64 = entry_transaction_count as u64;
910 let starting_transaction_index_u64 =
911 this_block_executed_transaction_count;
912 latest_entry_blockhash = entry_hash;
913 this_block_executed_transaction_count += entry_transaction_count_u64;
914 this_block_entry_count += 1;
915
916 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
917 let starting_transaction_index = usize::try_from(
918 starting_transaction_index_u64,
919 )
920 .map_err(|err| {
921 (
922 FirehoseError::EntryHandlerError(Box::new(err)),
923 error_slot,
924 )
925 })?;
926 let transaction_indexes_end =
927 starting_transaction_index + entry_transaction_count;
928 on_entry_cb(
929 thread_index,
930 EntryData {
931 slot: block.slot,
932 entry_index,
933 transaction_indexes: starting_transaction_index
934 ..transaction_indexes_end,
935 num_hashes: entry.num_hashes,
936 hash: entry_hash,
937 },
938 )
939 .await
940 .map_err(|e| {
941 (
942 FirehoseError::EntryHandlerError(e),
943 error_slot,
944 )
945 })?;
946 }
947 entry_index += 1;
948 fetch_add_if(
949 tracking_enabled,
950 &overall_entries_processed,
951 1,
952 );
953 if let Some(ref mut stats) = thread_stats {
954 stats.entries_processed += 1;
955 }
956 }
957 Block(block) => {
958 let prev_last_counted_slot = last_counted_slot;
959 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
960 (
961 stats.slots_processed,
962 stats.blocks_processed,
963 stats.leader_skipped_slots,
964 stats.current_slot,
965 )
966 });
967
968 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
969 let skip_start_from_previous = previous_slot
970 .map(|s| s.saturating_add(1))
971 .unwrap_or(next_expected_slot);
972 let skip_start = skip_start_from_previous.max(next_expected_slot);
973
974 for skipped_slot in skip_start..slot {
975 log::debug!(
976 target: &log_target,
977 "leader skipped slot {} (prev_counted {}, current slot {})",
978 skipped_slot,
979 prev_last_counted_slot,
980 slot,
981 );
982 if block_enabled
983 && let Some(on_block_cb) = on_block.as_ref() {
984 on_block_cb(
985 thread_index,
986 BlockData::LeaderSkipped {
987 slot: skipped_slot,
988 },
989 )
990 .await
991 .map_err(|e| {
992 (
993 FirehoseError::BlockHandlerError(e),
994 error_slot,
995 )
996 })?;
997 }
998 if tracking_enabled {
999 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1000 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1001 if let Some(ref mut stats) = thread_stats {
1002 stats.leader_skipped_slots += 1;
1003 stats.slots_processed += 1;
1004 stats.current_slot = skipped_slot;
1005 }
1006 }
1007 last_counted_slot = skipped_slot;
1008 }
1009
1010 if slot <= last_counted_slot {
1011 log::debug!(
1012 target: &log_target,
1013 "duplicate block {}, already counted (last_counted={})",
1014 slot,
1015 last_counted_slot,
1016 );
1017 this_block_rewards.clear();
1018 continue;
1019 }
1020
1021 if block_enabled {
1022 if let Some(on_block_cb) = on_block.as_ref() {
1023 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1024 on_block_cb(
1025 thread_index,
1026 BlockData::Block {
1027 parent_slot: block.meta.parent_slot,
1028 parent_blockhash: previous_blockhash,
1029 slot: block.slot,
1030 blockhash: latest_entry_blockhash,
1031 rewards: KeyedRewardsAndNumPartitions {
1032 keyed_rewards,
1033 num_partitions: None,
1034 },
1035 block_time: Some(block.meta.blocktime as i64),
1036 block_height: block.meta.block_height,
1037 executed_transaction_count:
1038 this_block_executed_transaction_count,
1039 entry_count: this_block_entry_count,
1040 },
1041 )
1042 .await
1043 .map_err(|e| {
1044 (
1045 FirehoseError::BlockHandlerError(e),
1046 error_slot,
1047 )
1048 })?;
1049 }
1050 } else {
1051 this_block_rewards.clear();
1052 }
1053 previous_blockhash = latest_entry_blockhash;
1054
1055 if tracking_enabled {
1056 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1057 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1058 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1059 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1060 if let Some(ref mut stats) = thread_stats {
1061 stats.blocks_processed += 1;
1062 stats.slots_processed += 1;
1063 stats.current_slot = slot;
1064 }
1065
1066 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1067 (&stats_tracking, thread_stats.as_mut())
1068 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1069 && let Err(err) = maybe_emit_stats(
1070 stats_tracking.as_ref(),
1071 thread_index,
1072 thread_stats_ref,
1073 &overall_slots_processed,
1074 &overall_blocks_processed,
1075 &overall_transactions_processed,
1076 &overall_entries_processed,
1077 &transactions_since_stats,
1078 &blocks_since_stats,
1079 &slots_since_stats,
1080 &last_pulse,
1081 start_time,
1082 )
1083 .await
1084 {
1085 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1086 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1087 overall_blocks_processed
1088 .fetch_sub(1, Ordering::Relaxed);
1089 overall_slots_processed
1090 .fetch_sub(1, Ordering::Relaxed);
1091 if let Some((
1092 prev_slots_processed,
1093 prev_blocks_processed,
1094 prev_leader_skipped,
1095 prev_current_slot,
1096 )) = thread_stats_snapshot
1097 {
1098 thread_stats_ref.slots_processed =
1099 prev_slots_processed;
1100 thread_stats_ref.blocks_processed =
1101 prev_blocks_processed;
1102 thread_stats_ref.leader_skipped_slots =
1103 prev_leader_skipped;
1104 thread_stats_ref.current_slot =
1105 prev_current_slot;
1106 }
1107 last_counted_slot = prev_last_counted_slot;
1108 return Err(err);
1109 }
1110 }
1111
1112 last_counted_slot = slot;
1113 }
1114 Subset(_subset) => (),
1115 Epoch(_epoch) => (),
1116 Rewards(rewards) => {
1117 if reward_enabled || block_enabled {
1118 let reassembled = nodes
1119 .reassemble_dataframes(rewards.data.clone())
1120 .map_err(|err| {
1121 (
1122 FirehoseError::NodeDecodingError(item_index, err),
1123 current_slot.unwrap_or(slot_range.start),
1124 )
1125 })?;
1126 if reassembled.is_empty() {
1127 this_block_rewards.clear();
1128 if reward_enabled
1129 && let Some(on_reward_cb) = on_reward.as_ref()
1130 {
1131 on_reward_cb(
1132 thread_index,
1133 RewardsData {
1134 slot: block.slot,
1135 rewards: Vec::new(),
1136 },
1137 )
1138 .await
1139 .map_err(|e| {
1140 (
1141 FirehoseError::RewardHandlerError(e),
1142 error_slot,
1143 )
1144 })?;
1145 }
1146 continue;
1147 }
1148
1149 let decompressed = utils::decompress_zstd(reassembled)
1150 .map_err(|err| {
1151 (
1152 FirehoseError::NodeDecodingError(
1153 item_index,
1154 err,
1155 ),
1156 error_slot,
1157 )
1158 })?;
1159
1160 let decoded =
1161 prost_011::Message::decode(decompressed.as_slice())
1162 .map_err(|err| {
1163 (
1164 FirehoseError::NodeDecodingError(
1165 item_index,
1166 Box::new(err),
1167 ),
1168 error_slot,
1169 )
1170 })?;
1171 let keyed_rewards = convert_proto_rewards(&decoded)
1172 .map_err(|err| {
1173 (
1174 FirehoseError::NodeDecodingError(item_index, err),
1175 error_slot,
1176 )
1177 })?;
1178 if reward_enabled
1179 && let Some(on_reward_cb) = on_reward.as_ref()
1180 {
1181 on_reward_cb(
1182 thread_index,
1183 RewardsData {
1184 slot: block.slot,
1185 rewards: keyed_rewards.clone(),
1186 },
1187 )
1188 .await
1189 .map_err(|e| {
1190 (
1191 FirehoseError::RewardHandlerError(e),
1192 error_slot,
1193 )
1194 })?;
1195 }
1196 this_block_rewards = keyed_rewards;
1197 if let Some(ref mut stats) = thread_stats {
1198 stats.rewards_processed +=
1199 this_block_rewards.len() as u64;
1200 }
1201 }
1202 }
1203 DataFrame(_data_frame) => (),
1204 }
1205 }
1206 if block.slot == slot_range.end - 1 {
1207 let finish_time = std::time::Instant::now();
1208 let elapsed = finish_time.duration_since(start_time);
1209 log::info!(target: &log_target, "processed slot {}", block.slot);
1210 let elapsed_pretty = human_readable_duration(elapsed);
1211 log::info!(
1212 target: &log_target,
1213 "processed {} slots across {} epochs in {}.",
1214 slot_range.end - slot_range.start,
1215 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1216 elapsed_pretty
1217 );
1218 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1219 let summary: String = error_counts
1222 .iter()
1223 .enumerate()
1224 .filter_map(|(i, c)| {
1225 let v = c.load(Ordering::Relaxed);
1226 if v > 0 {
1227 Some(format!("{:03}({})", i, v))
1228 } else {
1229 None
1230 }
1231 })
1232 .collect::<Vec<_>>()
1233 .join(", ");
1234 if !summary.is_empty() {
1235 log::debug!(target: &log_target, "threads with errors: {}", summary);
1236 }
1237 return Ok(());
1238 }
1239 }
1240 if tracking_enabled
1241 && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1242 && last_counted_slot < expected_last_slot {
1243 let flush_start = last_counted_slot.saturating_add(1);
1244 if block_enabled
1245 && let Some(on_block_cb) = on_block.as_ref() {
1246 let error_slot = current_slot.unwrap_or(slot_range.start);
1247 for skipped_slot in flush_start..=expected_last_slot {
1248 log::debug!(
1249 target: &log_target,
1250 "leader skipped slot {} during final flush (prev_counted {})",
1251 skipped_slot,
1252 last_counted_slot,
1253 );
1254 on_block_cb(
1255 thread_index,
1256 BlockData::LeaderSkipped { slot: skipped_slot },
1257 )
1258 .await
1259 .map_err(|e| FirehoseError::BlockHandlerError(e))
1260 .map_err(|e| (e, error_slot))?;
1261 }
1262 }
1263 let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1264 if let Some(stats_ref) = thread_stats.as_mut() {
1265 stats_ref.leader_skipped_slots += missing_slots;
1266 stats_ref.slots_processed += missing_slots;
1267 stats_ref.current_slot = expected_last_slot;
1268 }
1269 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1270 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1271 last_counted_slot = expected_last_slot;
1272 }
1273 if let Some(ref mut stats) = thread_stats {
1274 stats.finish_time = Some(std::time::Instant::now());
1275 maybe_emit_stats(
1276 stats_tracking.as_ref(),
1277 thread_index,
1278 stats,
1279 &overall_slots_processed,
1280 &overall_blocks_processed,
1281 &overall_transactions_processed,
1282 &overall_entries_processed,
1283 &transactions_since_stats,
1284 &blocks_since_stats,
1285 &slots_since_stats,
1286 &last_pulse,
1287 start_time,
1288 )
1289 .await?;
1290 }
1291 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1292 }
1293 Ok(())
1294 }
1295 .await
1296 {
1297 if is_shutdown_error(&err) {
1298 log::info!(
1299 target: &log_target,
1300 "shutdown requested; terminating firehose thread {}",
1301 thread_index
1302 );
1303 break;
1304 }
1305 log::error!(
1306 target: &log_target,
1307 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1308 slot,
1309 slot_to_epoch(slot)
1310 );
1311 log::error!(target: &log_target, "{}", err);
1312 let item_index = match err {
1313 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1314 _ => 0,
1315 };
1316 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1318 log::warn!(
1319 target: &log_target,
1320 "restarting from slot {} at index {}",
1321 slot,
1322 item_index,
1323 );
1324 slot_range.start = slot;
1326 skip_until_index = Some(item_index);
1327 }
1328 });
1329 handles.push(handle);
1330 }
1331
1332 for handle in handles {
1334 handle.await.unwrap();
1335 }
1336 if stats_tracking.is_some() {
1337 let elapsed = firehose_start.elapsed();
1338 let elapsed_secs = elapsed.as_secs_f64();
1339 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1340 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1341 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1342 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1343 let total_errors: u64 = error_counts
1344 .iter()
1345 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1346 .sum();
1347 let overall_tps = if elapsed_secs > 0.0 {
1348 total_transactions as f64 / elapsed_secs
1349 } else {
1350 0.0
1351 };
1352 log::info!(
1353 target: LOG_MODULE,
1354 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1355 elapsed_secs,
1356 total_slots,
1357 total_blocks,
1358 total_leader_skipped,
1359 total_transactions,
1360 overall_tps,
1361 total_errors
1362 );
1363 }
1364 if shutdown_flag.load(Ordering::SeqCst) {
1365 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1366 } else {
1367 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1368 }
1369 Ok(())
1370}
1371
1372#[allow(clippy::result_large_err)]
1373pub fn firehose_geyser(
1378 rt: Arc<tokio::runtime::Runtime>,
1379 slot_range: Range<u64>,
1380 geyser_config_files: Option<&[PathBuf]>,
1381 index_base_url: &Url,
1382 client: &Client,
1383 on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1384 + Send
1385 + 'static,
1386 threads: u64,
1387) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1388 if threads == 0 {
1389 return Err((
1390 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1391 slot_range.start,
1392 ));
1393 }
1394 log::info!(target: LOG_MODULE, "starting firehose...");
1395 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1396 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1397 let mut entry_notifier_maybe = None;
1398 let mut block_meta_notifier_maybe = None;
1399 let mut transaction_notifier_maybe = None;
1400 if let Some(geyser_config_files) = geyser_config_files {
1401 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1402
1403 let service =
1404 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1405 confirmed_bank_receiver.clone(),
1406 true,
1407 geyser_config_files,
1408 )
1409 .map_err(|e| (e.into(), slot_range.start))?;
1410
1411 transaction_notifier_maybe = Some(
1412 service
1413 .get_transaction_notifier()
1414 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1415 .map_err(|e| (e, slot_range.start))?,
1416 );
1417
1418 entry_notifier_maybe = service.get_entry_notifier();
1419 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1420
1421 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1422 }
1423
1424 if entry_notifier_maybe.is_some() {
1425 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1426 } else {
1427 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1428 }
1429 log::info!(target: LOG_MODULE, "running on_load...");
1430 rt.spawn(on_load);
1431
1432 let slot_range = Arc::new(slot_range);
1433 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1434 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1435 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1436 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1437
1438 let subranges = generate_subranges(&slot_range, threads);
1440 if threads > 1 {
1441 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1442 }
1443
1444 let mut handles = Vec::new();
1445 let error_counts: Arc<Vec<AtomicU32>> =
1447 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1448
1449 for (i, slot_range) in subranges.into_iter().enumerate() {
1450 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1451 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1452 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1453 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1454 let client = client.clone();
1455 let error_counts = error_counts.clone();
1456
1457 let rt_clone = rt.clone();
1458
1459 let handle = std::thread::spawn(move || {
1460 rt_clone.block_on(async {
1461 firehose_geyser_thread(
1462 slot_range,
1463 transaction_notifier_maybe,
1464 entry_notifier_maybe,
1465 block_meta_notifier_maybe,
1466 confirmed_bank_sender,
1467 &client,
1468 if threads > 1 { Some(i) } else { None },
1469 error_counts,
1470 )
1471 .await
1472 .unwrap();
1473 });
1474 });
1475 handles.push(handle);
1476 }
1477
1478 for handle in handles {
1480 handle.join().unwrap();
1481 }
1482 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1483 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1484 block_meta_notifier.notify_block_metadata(
1485 u64::MAX,
1486 "unload",
1487 u64::MAX,
1488 "unload",
1489 &KeyedRewardsAndNumPartitions {
1490 keyed_rewards: vec![],
1491 num_partitions: None,
1492 },
1493 None,
1494 None,
1495 0,
1496 0,
1497 );
1498 }
1499 Ok(confirmed_bank_receiver)
1500}
1501
1502#[allow(clippy::too_many_arguments)]
1503#[allow(clippy::result_large_err)]
1504async fn firehose_geyser_thread(
1505 mut slot_range: Range<u64>,
1506 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1507 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1508 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1509 confirmed_bank_sender: Sender<SlotNotification>,
1510 client: &Client,
1511 thread_index: Option<usize>,
1512 error_counts: Arc<Vec<AtomicU32>>,
1513) -> Result<(), (FirehoseError, u64)> {
1514 let start_time = std::time::Instant::now();
1515 let log_target = if let Some(thread_index) = thread_index {
1516 format!("{}::T{:03}", LOG_MODULE, thread_index)
1517 } else {
1518 LOG_MODULE.to_string()
1519 };
1520 let mut skip_until_index = None;
1521 while let Err((err, slot)) = async {
1523 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1524 log::info!(
1525 target: &log_target,
1526 "slot range: {} (epoch {}) ... {} (epoch {})",
1527 slot_range.start,
1528 slot_to_epoch(slot_range.start),
1529 slot_range.end,
1530 slot_to_epoch(slot_range.end)
1531 );
1532
1533 log::info!(target: &log_target, "🚒 starting firehose...");
1534
1535 let mut current_slot: Option<u64> = None;
1537 let mut previous_slot: Option<u64> = None;
1538 for epoch_num in epoch_range.clone() {
1539 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1540 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1541 Ok(stream) => stream,
1542 Err(_) => {
1543 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1544 }
1545 };
1546 let mut reader = NodeReader::new(stream);
1547
1548 let header_fut = reader.read_raw_header();
1549 let header = match timeout(OP_TIMEOUT, header_fut).await {
1550 Ok(res) => res
1551 .map_err(FirehoseError::ReadHeader)
1552 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1553 Err(_) => {
1554 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1555 }
1556 };
1557 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1558
1559 let mut todo_previous_blockhash = Hash::default();
1560 let mut todo_latest_entry_blockhash = Hash::default();
1561
1562 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1563 let seek_fut = reader.seek_to_slot(slot_range.start);
1564 match timeout(OP_TIMEOUT, seek_fut).await {
1565 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1566 Err(_) => {
1567 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1568 }
1569 }
1570 }
1571
1572 let mut item_index = 0;
1574 let mut displayed_skip_message = false;
1575 loop {
1576 let read_fut = reader.read_until_block();
1577 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1578 Ok(result) => result
1579 .map_err(FirehoseError::ReadUntilBlockError)
1580 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1581 Err(_) => {
1582 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1583 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1584 }
1585 };
1586 if nodes.is_empty() {
1587 log::info!(
1588 target: &log_target,
1589 "reached end of epoch {}",
1590 epoch_num
1591 );
1592 break;
1593 }
1594 if let Some(last_node) = nodes.0.last()
1603 && !last_node.get_node().is_block() {
1604 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1605 break;
1606 }
1607 let block = nodes
1608 .get_block()
1609 .map_err(FirehoseError::GetBlockError)
1610 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1611 log::debug!(
1612 target: &log_target,
1613 "read {} items from epoch {}, now at slot {}",
1614 item_index,
1615 epoch_num,
1616 block.slot
1617 );
1618 let slot = block.slot;
1619 if slot >= slot_range.end {
1620 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1621 return Ok(());
1625 }
1626 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1627 if slot < slot_range.start {
1628 log::warn!(
1629 target: &log_target,
1630 "encountered slot {} before start of range {}, skipping",
1631 slot,
1632 slot_range.start
1633 );
1634 continue;
1635 }
1636 if let Some(previous_slot) = previous_slot
1637 && slot != previous_slot + 1 {
1638 }
1641 previous_slot = current_slot;
1642 current_slot = Some(slot);
1643 let mut entry_index: usize = 0;
1644 let mut this_block_executed_transaction_count: u64 = 0;
1645 let mut this_block_entry_count: u64 = 0;
1646 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1647
1648 nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1649 item_index += 1;
1650 if let Some(skip) = skip_until_index {
1656 if item_index < skip {
1657 if !displayed_skip_message {
1658 log::info!(
1659 target: &log_target,
1660 "skipping until index {} (at {})",
1661 skip,
1662 item_index
1663 );
1664 displayed_skip_message = true;
1665 }
1666 return Ok(());
1667 } else {
1668 log::info!(
1669 target: &log_target,
1670 "reached target index {}, resuming...",
1671 skip
1672 );
1673 skip_until_index = None;
1674 }
1675 }
1676 let node = node_with_cid.get_node();
1677
1678 use crate::node::Node::*;
1679 match node {
1680 Transaction(tx) => {
1681 let versioned_tx = tx.as_parsed()?;
1682 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1683
1684 let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1685
1686 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1687 prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1688 Box::new(std::io::Error::other(
1689 std::format!("Error decoding metadata: {:?}", err),
1690 ))
1691 })?;
1692
1693 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1694 metadata.try_into()?;
1695
1696 let message_hash = {
1697 #[cfg(feature = "verify-transaction-signatures")]
1698 {
1699 versioned_tx.verify_and_hash_message()?
1700 }
1701 #[cfg(not(feature = "verify-transaction-signatures"))]
1702 {
1703 versioned_tx.message.hash()
1706 }
1707 };
1708 let signature = versioned_tx
1709 .signatures
1710 .first()
1711 .ok_or_else(|| {
1712 Box::new(std::io::Error::new(
1713 std::io::ErrorKind::InvalidData,
1714 "transaction missing signature",
1715 )) as Box<dyn std::error::Error>
1716 })?;
1717 let is_vote = is_simple_vote_transaction(&versioned_tx);
1718
1719 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1720 transaction_notifier.notify_transaction(
1721 block.slot,
1722 tx.index.unwrap() as usize,
1723 signature,
1724 &message_hash,
1725 is_vote,
1726 &as_native_metadata,
1727 &versioned_tx,
1728 );
1729 }
1730
1731 }
1732 Entry(entry) => {
1733 let entry_hash = Hash::from(entry.hash.to_bytes());
1734 let entry_transaction_count = entry.transactions.len();
1735 let entry_transaction_count_u64 = entry_transaction_count as u64;
1736 let starting_transaction_index =
1737 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1738 Box::new(std::io::Error::other(
1739 "transaction index exceeds usize range",
1740 )) as Box<dyn std::error::Error>
1741 })?;
1742 todo_latest_entry_blockhash = entry_hash;
1743 this_block_executed_transaction_count += entry_transaction_count_u64;
1744 this_block_entry_count += 1;
1745 if entry_notifier_maybe.is_none() {
1746 return Ok(());
1747 }
1748 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1749 let entry_summary = solana_entry::entry::EntrySummary {
1750 num_hashes: entry.num_hashes,
1751 hash: Hash::from(entry.hash.to_bytes()),
1752 num_transactions: entry_transaction_count_u64,
1753 };
1754 entry_notifier.notify_entry(
1755 block.slot,
1756 entry_index,
1757 &entry_summary,
1758 starting_transaction_index,
1759 );
1760 entry_index += 1;
1761 }
1762 Block(block) => {
1763 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1764 confirmed_bank_sender.send(notification).unwrap();
1765
1766 if block_meta_notifier_maybe.is_none() {
1767 return Ok(());
1768 }
1769 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1770 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1771 block_meta_notifier.notify_block_metadata(
1772 block.meta.parent_slot,
1773 todo_previous_blockhash.to_string().as_str(),
1774 block.slot,
1775 todo_latest_entry_blockhash.to_string().as_str(),
1776 &KeyedRewardsAndNumPartitions {
1777 keyed_rewards,
1778 num_partitions: None,
1779 },
1780 Some(block.meta.blocktime as i64),
1781 block.meta.block_height,
1782 this_block_executed_transaction_count,
1783 this_block_entry_count,
1784 );
1785 todo_previous_blockhash = todo_latest_entry_blockhash;
1786 std::thread::yield_now();
1787 }
1788 Subset(_subset) => (),
1789 Epoch(_epoch) => (),
1790 Rewards(rewards) => {
1791 if !rewards.is_complete() {
1792 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1793 let decompressed = utils::decompress_zstd(reassembled)?;
1794 let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1795 Box::new(std::io::Error::other(
1796 std::format!("Error decoding rewards: {:?}", err),
1797 ))
1798 })?;
1799 this_block_rewards = convert_proto_rewards(&decoded)?;
1800 }
1801 }
1802 DataFrame(_data_frame) => (),
1803 }
1804 Ok(())
1805 })
1806 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1807 if block.slot == slot_range.end - 1 {
1808 let finish_time = std::time::Instant::now();
1809 let elapsed = finish_time.duration_since(start_time);
1810 log::info!(target: &log_target, "processed slot {}", block.slot);
1811 let elapsed_pretty = human_readable_duration(elapsed);
1812 log::info!(
1813 target: &log_target,
1814 "processed {} slots across {} epochs in {}.",
1815 slot_range.end - slot_range.start,
1816 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1817 elapsed_pretty
1818 );
1819 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1820 let summary: String = error_counts
1823 .iter()
1824 .enumerate()
1825 .filter_map(|(i, c)| {
1826 let v = c.load(Ordering::Relaxed);
1827 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1828 })
1829 .collect::<Vec<_>>()
1830 .join(", ");
1831 if !summary.is_empty() {
1832 log::debug!(target: &log_target, "threads with errors: {}", summary);
1833 }
1834 return Ok(());
1835 }
1836 }
1837 }
1838 Ok(())
1839}
1840.await
1841{
1842 if is_shutdown_error(&err) {
1843 log::info!(
1844 target: &log_target,
1845 "shutdown requested; terminating firehose thread {:?}",
1846 thread_index
1847 );
1848 return Ok(());
1849 }
1850 log::error!(
1851 target: &log_target,
1852 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1853 slot,
1854 slot_to_epoch(slot)
1855 );
1856 log::error!(target: &log_target, "{}", err);
1857 let item_index = match err {
1858 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1859 _ => 0,
1860 };
1861 let idx = thread_index.unwrap_or(0);
1863 error_counts[idx].fetch_add(1, Ordering::Relaxed);
1864 log::warn!(
1865 target: &log_target,
1866 "restarting from slot {} at index {}",
1867 slot,
1868 item_index,
1869 );
1870 slot_range.start = slot;
1872 skip_until_index = Some(item_index);
1873 }
1874 Ok(())
1875}
1876
1877#[inline]
1878fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1879 if !(1..=2).contains(&versioned_tx.signatures.len()) {
1880 return false;
1881 }
1882
1883 if !matches!(
1884 versioned_tx.version(),
1885 solana_transaction::versioned::TransactionVersion::Legacy(_)
1886 ) {
1887 return false;
1888 }
1889
1890 let instructions = versioned_tx.message.instructions();
1891 if instructions.len() != 1 {
1892 return false;
1893 }
1894
1895 let program_index = instructions[0].program_id_index as usize;
1896 versioned_tx
1897 .message
1898 .static_account_keys()
1899 .get(program_index)
1900 .map(|program_id| program_id == &vote_program_id())
1901 .unwrap_or(false)
1902}
1903
1904#[inline(always)]
1905fn convert_proto_rewards(
1906 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1907) -> Result<Vec<(Address, RewardInfo)>, Box<dyn std::error::Error>> {
1908 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1909 for proto_reward in proto_rewards.rewards.iter() {
1910 let reward = RewardInfo {
1911 reward_type: match proto_reward.reward_type - 1 {
1912 0 => RewardType::Fee,
1913 1 => RewardType::Rent,
1914 2 => RewardType::Staking,
1915 3 => RewardType::Voting,
1916 typ => {
1917 return Err(Box::new(std::io::Error::other(format!(
1918 "unsupported reward type {}",
1919 typ
1920 ))));
1921 }
1922 },
1923 lamports: proto_reward.lamports,
1924 post_balance: proto_reward.post_balance,
1925 commission: proto_reward.commission.parse::<u8>().ok(),
1926 };
1927 let pubkey = proto_reward
1928 .pubkey
1929 .parse::<Address>()
1930 .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1931 keyed_rewards.push((pubkey, reward));
1932 }
1933 Ok(keyed_rewards)
1934}
1935
1936#[inline]
1937pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1939 let total = slot_range.end - slot_range.start;
1940 let slots_per_thread = total / threads;
1941 let remainder = total % threads;
1942
1943 let ranges: Vec<Range<u64>> = (0..threads)
1944 .map(|i| {
1945 let extra_slot = if i < remainder { 1 } else { 0 };
1947 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1948 let end = start + slots_per_thread + extra_slot;
1949 start..end
1950 })
1951 .collect();
1952
1953 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1955 assert_eq!(
1956 total_covered, total,
1957 "Range generation failed: {} threads should cover {} slots but only cover {}",
1958 threads, total, total_covered
1959 );
1960
1961 for i in 1..ranges.len() {
1963 assert_eq!(
1964 ranges[i - 1].end,
1965 ranges[i].start,
1966 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1967 i - 1,
1968 ranges[i - 1].end,
1969 i,
1970 ranges[i].start
1971 );
1972 }
1973
1974 log::info!(
1975 target: LOG_MODULE,
1976 "Generated {} thread ranges covering {} slots total",
1977 threads,
1978 total_covered
1979 );
1980 ranges
1981}
1982
/// Formats `duration` as a short human-readable string.
///
/// * Zero prints as `0s`.
/// * Below one minute: whole seconds with no millisecond remainder print as an integer
///   (`5s`); everything else prints with two decimals (`0.25s`, `5.50s`).
/// * From one minute up, only the two most significant units are shown, and the smaller one is
///   omitted when it is zero: `1m1s`, `1h`, `1h1m`, `1d`, `1d1h`.
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if duration.is_zero() {
        return String::from("0s");
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        let is_exact_second = whole_secs > 0 && duration.subsec_millis() == 0;
        return if is_exact_second {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let secs = whole_secs % MINUTE;

    // Arms are ordered from the smallest leading unit to the largest; each later arm can rely
    // on the earlier ones having consumed the cases where its leading unit is zero.
    match (days, hours, minutes, secs) {
        (0, 0, 0, s) => format!("{s}s"),
        (0, 0, m, 0) => format!("{m}m"),
        (0, 0, m, s) => format!("{m}m{s}s"),
        (0, h, 0, _) => format!("{h}h"),
        (0, h, m, _) => format!("{h}h{m}m"),
        (d, 0, _, _) => format!("{d}d"),
        (d, h, _, _) => format!("{d}d{h}h"),
    }
}
2028
/// Test-only stats callback: logs a one-line progress summary for `thread_id`.
///
/// The transactions-per-second figure is derived from `stats.transactions_processed` and the
/// time elapsed since `stats.start_time`; it is reported as `0.0` when no time has elapsed.
/// The returned future always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let seconds_running = stats.start_time.elapsed().as_secs_f64();
        // Guard the division so a zero-length interval does not produce inf/NaN.
        let throughput = (seconds_running > 0.0)
            .then(|| stats.transactions_processed as f64 / seconds_running)
            .unwrap_or(0.0);
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            seconds_running,
            throughput
        );
        Ok(())
    })
}
2054
2055#[tokio::test(flavor = "multi_thread")]
2056async fn test_firehose_epoch_800() {
2057 use std::sync::atomic::{AtomicU64, Ordering};
2058 solana_logger::setup_with_default("info");
2059 const THREADS: usize = 4;
2060 const NUM_SLOTS_TO_COVER: u64 = 50;
2061 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2062 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2063 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2064 let stats_tracking = StatsTracking {
2065 on_stats: log_stats_handler,
2066 tracking_interval_slots: 10,
2067 };
2068
2069 firehose(
2070 THREADS.try_into().unwrap(),
2071 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2072 Some(|thread_id: usize, block: BlockData| {
2073 async move {
2074 let prev =
2075 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2076 if block.was_skipped() {
2077 log::info!(
2078 target: LOG_MODULE,
2079 "leader skipped block {} on thread {}",
2080 block.slot(),
2081 thread_id,
2082 );
2083 } else {
2084 }
2091
2092 if prev > 0 {
2093 assert_eq!(prev + 1, block.slot());
2094 }
2095 if block.was_skipped() {
2096 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2097 } else {
2098 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2099 }
2100 Ok(())
2101 }
2102 .boxed()
2103 }),
2104 None::<OnTxFn>,
2105 None::<OnEntryFn>,
2106 None::<OnRewardFn>,
2107 Some(stats_tracking),
2108 None,
2109 )
2110 .await
2111 .unwrap();
2112 assert_eq!(
2113 NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2114 NUM_SLOTS_TO_COVER
2115 );
2116 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2117}