1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_ledger::entry_notifier_interface::EntryNotifier;
11use solana_reward_info::RewardInfo;
12use solana_rpc::{
13 optimistically_confirmed_bank_tracker::SlotNotification,
14 transaction_notifier_interface::TransactionNotifier,
15};
16use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
17use solana_sdk::{hash::Hash, pubkey::Pubkey, transaction::VersionedTransaction};
18use solana_vote_program::id as vote_program_id;
19use std::{
20 fmt::Display,
21 future::Future,
22 io,
23 ops::Range,
24 path::PathBuf,
25 sync::{
26 Arc,
27 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
28 },
29};
30use thiserror::Error;
31use tokio::{
32 sync::broadcast::{self, error::TryRecvError},
33 time::timeout,
34};
35
36use crate::{
37 LOG_MODULE,
38 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
39 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
40 node_reader::NodeReader,
41 utils,
42};
43
/// Upper bound applied (via `tokio::time::timeout`) to each individual stream operation in
/// the firehose loop: fetching an epoch stream, reading its header, seeking to a slot and
/// reading the next block. Exceeding it yields `FirehoseError::OperationTimeout`.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10_000);
48
49fn poll_shutdown(
50 flag: &Arc<std::sync::atomic::AtomicBool>,
51 receiver: &mut Option<broadcast::Receiver<()>>,
52) -> bool {
53 if let Some(rx) = receiver {
54 match rx.try_recv() {
55 Ok(_) | Err(TryRecvError::Lagged(_)) => {
56 flag.store(true, Ordering::SeqCst);
57 }
58 Err(TryRecvError::Closed) => {
59 flag.store(true, Ordering::SeqCst);
60 }
61 Err(TryRecvError::Empty) => {}
62 }
63 }
64 flag.load(Ordering::SeqCst)
65}
66
67fn is_shutdown_error(err: &FirehoseError) -> bool {
68 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
69 inner
70 .downcast_ref::<io::Error>()
71 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
72 .unwrap_or(false)
73 }
74
75 match err {
76 FirehoseError::BlockHandlerError(inner)
77 | FirehoseError::TransactionHandlerError(inner)
78 | FirehoseError::EntryHandlerError(inner)
79 | FirehoseError::RewardHandlerError(inner)
80 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
81 _ => false,
82 }
83}
84
/// Errors produced while streaming ledger data through the firehose.
///
/// The human-readable text comes from a hand-written `Display` impl rather than `#[error]`
/// attributes. Most call sites pair an error with the slot being processed when it occurred.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// HTTP client failure.
    Reqwest(reqwest::Error),
    /// Reading the raw header of an epoch stream failed.
    ReadHeader(Box<dyn std::error::Error>),
    /// Initializing the geyser plugin service failed.
    GeyserPluginService(GeyserPluginServiceError),
    /// The transaction notifier could not be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading nodes up to the next block failed.
    ReadUntilBlockError(Box<dyn std::error::Error>),
    /// Extracting the block node from a batch of read nodes failed.
    GetBlockError(Box<dyn std::error::Error>),
    /// Seeking, reading or decoding a data node failed; holds the item index and the cause.
    NodeDecodingError(usize, Box<dyn std::error::Error>),
    /// Looking up information in the slot offset index failed.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking to a slot within an epoch stream failed.
    SeekToSlotError(Box<dyn std::error::Error>),
    /// Setup error, e.g. `firehose` called with zero threads.
    OnLoadError(Box<dyn std::error::Error>),
    /// The stats handler returned an error.
    OnStatsHandlerError(Box<dyn std::error::Error>),
    /// The named operation did not complete within `OP_TIMEOUT`.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error. Also used for a signature-verification
    /// failure when the `verify-transaction-signatures` feature is enabled.
    TransactionHandlerError(Box<dyn std::error::Error>),
    /// The entry handler returned an error, or an entry's transaction index did not fit in
    /// `usize`.
    EntryHandlerError(Box<dyn std::error::Error>),
    /// The rewards handler returned an error.
    RewardHandlerError(Box<dyn std::error::Error>),
    /// The block handler returned an error.
    BlockHandlerError(Box<dyn std::error::Error>),
}
122
123impl Display for FirehoseError {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 match self {
126 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
127 FirehoseError::ReadHeader(error) => {
128 write!(f, "Error reading header: {}", error)
129 }
130 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
131 f,
132 "Error initializing geyser plugin service: {}",
133 geyser_plugin_service_error
134 ),
135 FirehoseError::FailedToGetTransactionNotifier => write!(
136 f,
137 "Failed to get transaction notifier from GeyserPluginService"
138 ),
139 FirehoseError::ReadUntilBlockError(error) => {
140 write!(f, "Error reading until block: {}", error)
141 }
142 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
143 FirehoseError::NodeDecodingError(item_index, error) => {
144 write!(
145 f,
146 "Error seeking, reading data from, or decoding data for data node {}: {}",
147 item_index, error
148 )
149 }
150 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
151 f,
152 "Error getting info from slot offset index: {}",
153 slot_offset_index_error
154 ),
155 FirehoseError::SeekToSlotError(error) => {
156 write!(f, "Error seeking to slot: {}", error)
157 }
158 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
159 FirehoseError::OnStatsHandlerError(error) => {
160 write!(f, "Stats handler error: {}", error)
161 }
162 FirehoseError::OperationTimeout(op) => {
163 write!(f, "Timeout while waiting for operation: {}", op)
164 }
165 FirehoseError::TransactionHandlerError(error) => {
166 write!(f, "Transaction handler error: {}", error)
167 }
168 FirehoseError::EntryHandlerError(error) => {
169 write!(f, "Entry handler error: {}", error)
170 }
171 FirehoseError::RewardHandlerError(error) => {
172 write!(f, "Reward handler error: {}", error)
173 }
174 FirehoseError::BlockHandlerError(error) => {
175 write!(f, "Block handler error: {}", error)
176 }
177 }
178 }
179}
180
181impl From<reqwest::Error> for FirehoseError {
182 fn from(e: reqwest::Error) -> Self {
183 FirehoseError::Reqwest(e)
184 }
185}
186
187impl From<GeyserPluginServiceError> for FirehoseError {
188 fn from(e: GeyserPluginServiceError) -> Self {
189 FirehoseError::GeyserPluginService(e)
190 }
191}
192
193impl From<SlotOffsetIndexError> for FirehoseError {
194 fn from(e: SlotOffsetIndexError) -> Self {
195 FirehoseError::SlotOffsetIndexError(e)
196 }
197}
198
/// Progress counters for a single firehose worker thread.
///
/// Built by the `firehose` worker loop when stats tracking is enabled. A clone is embedded
/// in every [`Stats`] snapshot the thread emits.
///
/// NOTE(review): the visible `firehose` loop re-creates this struct at the start of every
/// epoch, so its counters appear to reset per epoch rather than accumulate for the whole
/// thread — confirm whether that is intended.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker thread (its position among the generated slot sub-ranges).
    pub thread_id: usize,
    /// Instant at which the worker task started.
    pub start_time: std::time::Instant,
    /// Completion instant. It is `None` at construction; presumably it is filled in when the
    /// worker finishes (not visible here).
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot sub-range `[start, end)` assigned to this worker.
    pub slot_range: Range<u64>,
    /// Most recent slot the worker has processed or accounted for as skipped.
    pub current_slot: u64,
    /// Slots accounted for, including leader-skipped slots.
    pub slots_processed: u64,
    /// Slots for which a block was produced and processed.
    pub blocks_processed: u64,
    /// Slots in the range for which no block was produced (leader skipped).
    pub leader_skipped_slots: u64,
    /// Transactions seen by this worker.
    pub transactions_processed: u64,
    /// Entries seen by this worker.
    pub entries_processed: u64,
    /// Individual reward records decoded (sum of the per-block reward list lengths).
    pub rewards_processed: u64,
}
225
/// Snapshot handed to the stats handler each time a worker thread emits stats.
///
/// Combines firehose-wide totals (summed across all threads through shared atomics) with
/// data belonging to the emitting thread. Each field states which kind it is.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Full counters of the emitting thread.
    pub thread_stats: ThreadStats,
    /// Start instant of the emitting thread (not of the whole firehose run).
    pub start_time: std::time::Instant,
    /// Finish instant of the emitting thread, if set.
    pub finish_time: Option<std::time::Instant>,
    /// Slot sub-range of the emitting thread.
    pub slot_range: Range<u64>,
    /// Firehose-wide slots accounted for, including leader-skipped slots.
    pub slots_processed: u64,
    /// Firehose-wide blocks processed.
    pub blocks_processed: u64,
    /// Firehose-wide slots without a block, derived as `slots_processed - blocks_processed`.
    pub leader_skipped_slots: u64,
    /// Firehose-wide transactions processed.
    pub transactions_processed: u64,
    /// Firehose-wide entries processed.
    pub entries_processed: u64,
    /// Reward records decoded by the emitting thread only.
    /// NOTE(review): unlike the fields above, this is not a firehose-wide total.
    pub rewards_processed: u64,
    /// Transactions processed by the emitting thread since its previous emission.
    pub transactions_since_last_pulse: u64,
    /// Blocks processed by the emitting thread since its previous emission.
    pub blocks_since_last_pulse: u64,
    /// Slots accounted for by the emitting thread since its previous emission.
    pub slots_since_last_pulse: u64,
    /// Wall time since the emitting thread's previous emission (since thread start for the
    /// first one). Never zero; the minimum is one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
258
/// Configuration for periodic stats reporting during a firehose run.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Callback invoked as `on_stats(thread_index, stats)`. An error it returns is surfaced
    /// as `FirehoseError::OnStatsHandlerError`.
    pub on_stats: OnStats,
    /// Stats are emitted after processing a block whose slot is a multiple of this value.
    /// Must be non-zero: the firehose loop evaluates `slot % tracking_interval_slots`, which
    /// panics on a zero divisor.
    pub tracking_interval_slots: u64,
}
267
268#[inline(always)]
269#[allow(clippy::too_many_arguments)]
270async fn maybe_emit_stats<OnStats: Handler<Stats>>(
271 stats_tracking: Option<&StatsTracking<OnStats>>,
272 thread_index: usize,
273 thread_stats: &ThreadStats,
274 overall_slots_processed: &AtomicU64,
275 overall_blocks_processed: &AtomicU64,
276 overall_transactions_processed: &AtomicU64,
277 overall_entries_processed: &AtomicU64,
278 transactions_since_stats: &AtomicU64,
279 blocks_since_stats: &AtomicU64,
280 slots_since_stats: &AtomicU64,
281 last_pulse: &Arc<AtomicU64>,
282 base_instant: std::time::Instant,
283) -> Result<(), (FirehoseError, u64)> {
284 if let Some(stats_tracker) = stats_tracking {
285 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
286 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
287 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
288 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
289 let now_nanos = base_instant.elapsed().as_nanos() as u64;
290 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
291 let delta_nanos = now_nanos.saturating_sub(previous);
292 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
293 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
294 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
295 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
296
297 let stats = Stats {
298 thread_stats: thread_stats.clone(),
299 start_time: thread_stats.start_time,
300 finish_time: thread_stats.finish_time,
301 slot_range: thread_stats.slot_range.clone(),
302 slots_processed: total_slots,
303 blocks_processed: total_blocks,
304 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
305 transactions_processed: total_transactions,
306 entries_processed: total_entries,
307 rewards_processed: thread_stats.rewards_processed,
308 transactions_since_last_pulse: processed_transactions,
309 blocks_since_last_pulse: processed_blocks,
310 slots_since_last_pulse: processed_slots,
311 time_since_last_pulse,
312 };
313
314 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
315 last_pulse.store(previous, Ordering::Relaxed);
316 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
317 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
318 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
319 return Err((
320 FirehoseError::OnStatsHandlerError(e),
321 thread_stats.current_slot,
322 ));
323 }
324 }
325 Ok(())
326}
327
/// Adds `value` to `atomic` (relaxed ordering) only when `tracking_enabled` is set, and does
/// nothing otherwise. The firehose loop uses it to bump the shared overall counters, which
/// only matter when stats tracking is on.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
334
/// A decoded transaction, delivered to the transaction handler.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot of the block containing the transaction.
    pub slot: u64,
    /// Index recorded in the transaction node (`tx.index`); presumably the transaction's
    /// position within its block — TODO confirm.
    pub transaction_slot_index: usize,
    /// First signature of the transaction.
    pub signature: solana_sdk::signature::Signature,
    /// Hash of the transaction message. With the `verify-transaction-signatures` feature it
    /// comes from `verify_and_hash_message`, so the signatures have also been verified.
    pub message_hash: Hash,
    /// Whether `is_simple_vote_transaction` classified the transaction as a simple vote.
    pub is_vote: bool,
    /// Status metadata, decoded from the zstd-compressed protobuf stored with the transaction.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The transaction itself.
    pub transaction: VersionedTransaction,
}
352}
353
/// A ledger entry, delivered to the entry handler.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot of the block containing the entry.
    pub slot: u64,
    /// Zero-based position of the entry within its block.
    pub entry_index: usize,
    /// Range of block-level transaction indexes covered by this entry. It is computed from the
    /// running transaction count of the preceding entries in the same block.
    pub transaction_indexes: Range<usize>,
    /// `num_hashes` value recorded in the entry.
    pub num_hashes: u64,
    /// Hash of the entry.
    pub hash: Hash,
}
367}
368
/// Decoded rewards for one block, delivered to the rewards handler.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot of the block the rewards belong to.
    pub slot: u64,
    /// Rewards keyed by recipient; empty when the block's reward payload was empty.
    pub rewards: Vec<(Pubkey, RewardInfo)>,
}
376}
377
/// Per-slot notification delivered to the block handler: either a produced block or a slot
/// the leader skipped.
#[derive(Debug)]
pub enum BlockData {
    /// A block that was produced for `slot`.
    Block {
        /// Parent slot recorded in the block metadata.
        parent_slot: u64,
        /// Blockhash of the previously processed block in the same epoch stream.
        /// `Hash::default()` for the first block a worker processes in an epoch.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Hash of the last entry in this block.
        blockhash: Hash,
        /// Rewards decoded from the block's rewards node (`num_partitions` is always `None`).
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time taken from the block metadata.
        block_time: Option<i64>,
        /// Block height taken from the block metadata, when present.
        block_height: Option<u64>,
        /// Sum of the transaction counts of all entries in this block.
        executed_transaction_count: u64,
        /// Number of entries in this block.
        entry_count: u64,
    },
    /// A slot within the requested range for which no block exists.
    LeaderSkipped {
        /// The skipped slot.
        slot: u64,
    },
}
407}
408
409impl BlockData {
410 #[inline(always)]
412 pub const fn slot(&self) -> u64 {
413 match self {
414 BlockData::Block { slot, .. } => *slot,
415 BlockData::LeaderSkipped { slot } => *slot,
416 }
417 }
418
419 #[inline(always)]
421 pub const fn was_skipped(&self) -> bool {
422 matches!(self, BlockData::LeaderSkipped { .. })
423 }
424}
425
/// Result every handler future resolves to. Errors are boxed and must be `Send`.
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
/// Boxed, `'static` future returned by every handler.
type HandlerFuture = BoxFuture<'static, HandlerResult>;

/// A firehose callback, invoked as `handler(thread_index, data)`, that returns a boxed future.
///
/// The blanket impl below makes every `Fn(usize, Data) -> HandlerFuture` that is also
/// `Send + Sync + Clone + 'static` a `Handler`. That includes closures meeting those bounds
/// and plain `fn` items.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}

/// Function-pointer form of a [`Handler`], useful for naming a concrete handler type.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer block handler.
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer transaction handler.
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer entry handler.
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer rewards handler.
pub type OnRewardFn = HandlerFn<RewardsData>;
/// Stats tracking configuration with a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
447pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
449
450#[inline]
452#[allow(clippy::too_many_arguments)]
453pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
454 threads: u64,
455 slot_range: Range<u64>,
456 on_block: Option<OnBlock>,
457 on_tx: Option<OnTransaction>,
458 on_entry: Option<OnEntry>,
459 on_rewards: Option<OnRewards>,
460 stats_tracking: Option<StatsTracking<OnStats>>,
461 shutdown_signal: Option<broadcast::Receiver<()>>,
462) -> Result<(), (FirehoseError, u64)>
463where
464 OnBlock: Handler<BlockData>,
465 OnTransaction: Handler<TransactionData>,
466 OnEntry: Handler<EntryData>,
467 OnRewards: Handler<RewardsData>,
468 OnStats: Handler<Stats>,
469{
470 if threads == 0 {
471 return Err((
472 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
473 slot_range.start,
474 ));
475 }
476 let client = Client::new();
477 log::info!(target: LOG_MODULE, "starting firehose...");
478 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
479
480 let slot_range = Arc::new(slot_range);
481
482 let subranges = generate_subranges(&slot_range, threads);
484 if threads > 1 {
485 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
486 }
487
488 let firehose_start = std::time::Instant::now();
489 let shutdown_flag = Arc::new(AtomicBool::new(false));
490 if let Some(ref rx) = shutdown_signal {
491 let mut rx = rx.resubscribe();
492 let flag = shutdown_flag.clone();
493 tokio::spawn(async move {
494 if rx.recv().await.is_ok() {
495 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
496 flag.store(true, Ordering::SeqCst);
497 }
498 });
499 }
500 let mut handles = Vec::new();
501 let error_counts: Arc<Vec<AtomicU32>> =
503 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
504
505 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
506 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
507 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509
510 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
511 let error_counts = error_counts.clone();
512 let client = client.clone();
513 let on_block = on_block.clone();
514 let on_tx = on_tx.clone();
515 let on_entry = on_entry.clone();
516 let on_reward = on_rewards.clone();
517 let overall_slots_processed = overall_slots_processed.clone();
518 let overall_blocks_processed = overall_blocks_processed.clone();
519 let overall_transactions_processed = overall_transactions_processed.clone();
520 let overall_entries_processed = overall_entries_processed.clone();
521 let stats_tracking = stats_tracking.clone();
522 let transactions_since_stats = Arc::new(AtomicU64::new(0));
523 let blocks_since_stats = Arc::new(AtomicU64::new(0));
524 let slots_since_stats = Arc::new(AtomicU64::new(0));
525 let last_pulse = Arc::new(AtomicU64::new(0));
526 let transactions_since_stats_cloned = transactions_since_stats.clone();
527 let blocks_since_stats_cloned = blocks_since_stats.clone();
528 let slots_since_stats_cloned = slots_since_stats.clone();
529 let last_pulse_cloned = last_pulse.clone();
530 let shutdown_flag = shutdown_flag.clone();
531 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
532
533 let handle = tokio::spawn(async move {
534 let transactions_since_stats = transactions_since_stats_cloned;
535 let blocks_since_stats = blocks_since_stats_cloned;
536 let slots_since_stats = slots_since_stats_cloned;
537 let last_pulse = last_pulse_cloned;
538 let mut shutdown_rx = thread_shutdown_rx;
539 let start_time = std::time::Instant::now();
540 last_pulse.store(0, Ordering::Relaxed);
541 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
542 let mut skip_until_index = None;
543 let block_enabled = on_block.is_some();
544 let tx_enabled = on_tx.is_some();
545 let entry_enabled = on_entry.is_some();
546 let reward_enabled = on_reward.is_some();
547 let tracking_enabled = stats_tracking.is_some();
548 let mut last_counted_slot = slot_range.start.saturating_sub(1);
549
550 while let Err((err, slot)) = async {
552 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
553 log::info!(
554 target: &log_target,
555 "shutdown requested; terminating firehose thread {}",
556 thread_index
557 );
558 return Ok(());
559 }
560 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
561 log::info!(
562 target: &log_target,
563 "slot range: {} (epoch {}) ... {} (epoch {})",
564 slot_range.start,
565 slot_to_epoch(slot_range.start),
566 slot_range.end,
567 slot_to_epoch(slot_range.end)
568 );
569
570 log::info!(target: &log_target, "🚒 starting firehose...");
571
572 let mut current_slot: Option<u64> = None;
574 let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
575 for epoch_num in epoch_range.clone() {
576 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
577 log::info!(
578 target: &log_target,
579 "shutdown requested; terminating firehose thread {}",
580 thread_index
581 );
582 return Ok(());
583 }
584 log::info!(target: &log_target, "entering epoch {}", epoch_num);
585 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
586 Ok(stream) => stream,
587 Err(_) => {
588 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
589 }
590 };
591 let mut reader = NodeReader::new(stream);
592
593 let header_fut = reader.read_raw_header();
594 let header = match timeout(OP_TIMEOUT, header_fut).await {
595 Ok(res) => res
596 .map_err(FirehoseError::ReadHeader)
597 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
598 Err(_) => {
599 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
600 }
601 };
602 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
603
604 let mut previous_blockhash = Hash::default();
605 let mut latest_entry_blockhash = Hash::default();
606
607 let mut thread_stats = if tracking_enabled {
608 Some(ThreadStats {
609 thread_id: thread_index,
610 start_time,
611 finish_time: None,
612 slot_range: slot_range.clone(),
613 current_slot: slot_range.start,
614 slots_processed: 0,
615 blocks_processed: 0,
616 leader_skipped_slots: 0,
617 transactions_processed: 0,
618 entries_processed: 0,
619 rewards_processed: 0,
620 })
621 } else {
622 None
623 };
624
625 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
626 let seek_fut = reader.seek_to_slot(slot_range.start);
627 match timeout(OP_TIMEOUT, seek_fut).await {
628 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
629 Err(_) => {
630 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
631 }
632 }
633 }
634
635 let mut item_index = 0;
637 let mut displayed_skip_message = false;
638 loop {
639 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
640 log::info!(
641 target: &log_target,
642 "shutdown requested; terminating firehose thread {}",
643 thread_index
644 );
645 return Ok(());
646 }
647 let read_fut = reader.read_until_block();
648 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
649 Ok(result) => result
650 .map_err(FirehoseError::ReadUntilBlockError)
651 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
652 Err(_) => {
653 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
654 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
655 }
656 };
657 if nodes.is_empty() {
658 log::info!(
659 target: &log_target,
660 "reached end of epoch {}",
661 epoch_num
662 );
663 break;
664 }
665 if let Some(last_node) = nodes.0.last()
666 && !last_node.get_node().is_block()
667 {
668 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
669 break;
670 }
671 let block = nodes
672 .get_block()
673 .map_err(FirehoseError::GetBlockError)
674 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
675 log::debug!(
676 target: &log_target,
677 "read {} items from epoch {}, now at slot {}",
678 item_index,
679 epoch_num,
680 block.slot
681 );
682 let slot = block.slot;
683 if slot >= slot_range.end {
684 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
685 let target_last_slot = slot_range.end.saturating_sub(1);
690 let skip_start_from_previous = previous_slot
691 .map(|s| s.saturating_add(1))
692 .unwrap_or(slot_range.start);
693 let first_untracked_slot = last_counted_slot.saturating_add(1);
694 let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
695
696 if skip_start <= target_last_slot {
697 if block_enabled
698 && let Some(on_block_cb) = on_block.as_ref() {
699 for skipped_slot in skip_start..=target_last_slot {
700 log::debug!(
701 target: &log_target,
702 "leader skipped slot {} (prev_counted {}, target {})",
703 skipped_slot,
704 last_counted_slot,
705 target_last_slot,
706 );
707 on_block_cb(
708 thread_index,
709 BlockData::LeaderSkipped { slot: skipped_slot },
710 )
711 .await
712 .map_err(|e| FirehoseError::BlockHandlerError(e))
713 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
714 }
715 }
716
717 let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
718 if tracking_enabled {
719 if let Some(ref mut stats) = thread_stats {
720 stats.leader_skipped_slots += missing_slots;
721 stats.slots_processed += missing_slots;
722 stats.current_slot = target_last_slot;
723 }
724 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
725 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
726 }
727 last_counted_slot = target_last_slot;
728 }
729
730 return Ok(());
731 }
732 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
733 if slot < slot_range.start {
734 log::warn!(
735 target: &log_target,
736 "encountered slot {} before start of range {}, skipping",
737 slot,
738 slot_range.start
739 );
740 continue;
741 }
742 if current_slot.is_some() {
743 previous_slot = current_slot;
744 }
745 current_slot = Some(slot);
746 let mut entry_index: usize = 0;
747 let mut this_block_executed_transaction_count: u64 = 0;
748 let mut this_block_entry_count: u64 = 0;
749 let mut this_block_rewards: Vec<(Pubkey, RewardInfo)> = Vec::new();
750
751 for node_with_cid in &nodes.0 {
752 item_index += 1;
753 if let Some(skip) = skip_until_index {
754 if item_index < skip {
755 if !displayed_skip_message {
756 log::info!(
757 target: &log_target,
758 "skipping until index {} (at {})",
759 skip,
760 item_index
761 );
762 displayed_skip_message = true;
763 }
764 continue;
765 } else {
766 log::info!(
767 target: &log_target,
768 "reached target index {}, resuming...",
769 skip
770 );
771 skip_until_index = None;
772 }
773 }
774 let node = node_with_cid.get_node();
775
776 if let Some(ref mut stats) = thread_stats {
777 stats.current_slot = slot;
778 }
779
780 let error_slot = current_slot.unwrap_or(slot_range.start);
781
782 use crate::node::Node::*;
783 match node {
784 Transaction(tx) => {
785 if tx_enabled
786 && let Some(on_tx_cb) = on_tx.as_ref()
787 {
788 let error_slot = current_slot.unwrap_or(slot_range.start);
789 let versioned_tx = tx.as_parsed().map_err(|err| {
790 (
791 FirehoseError::NodeDecodingError(item_index, err),
792 error_slot,
793 )
794 })?;
795 let reassembled_metadata = nodes
796 .reassemble_dataframes(tx.metadata.clone())
797 .map_err(|err| {
798 (
799 FirehoseError::NodeDecodingError(item_index, err),
800 error_slot,
801 )
802 })?;
803
804 let decompressed =
805 utils::decompress_zstd(reassembled_metadata.clone())
806 .map_err(|err| {
807 (
808 FirehoseError::NodeDecodingError(
809 item_index,
810 err,
811 ),
812 error_slot,
813 )
814 })?;
815
816 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
817 prost_011::Message::decode(decompressed.as_slice())
818 .map_err(|err| {
819 (
820 FirehoseError::NodeDecodingError(
821 item_index,
822 Box::new(err),
823 ),
824 error_slot,
825 )
826 })?;
827
828 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
829 metadata.try_into().map_err(|err| {
830 (
831 FirehoseError::NodeDecodingError(
832 item_index,
833 Box::new(err),
834 ),
835 error_slot,
836 )
837 })?;
838
839 let message_hash = {
840 #[cfg(feature = "verify-transaction-signatures")]
841 {
842 versioned_tx.verify_and_hash_message().map_err(|err| {
843 (
844 FirehoseError::TransactionHandlerError(Box::new(err)),
845 error_slot,
846 )
847 })?
848 }
849 #[cfg(not(feature = "verify-transaction-signatures"))]
850 {
851 versioned_tx.message.hash()
852 }
853 };
854 let signature = versioned_tx
855 .signatures
856 .first()
857 .ok_or_else(|| {
858 Box::new(std::io::Error::new(
859 std::io::ErrorKind::InvalidData,
860 "transaction missing signature",
861 )) as Box<dyn std::error::Error>
862 })
863 .map_err(|err| {
864 (
865 FirehoseError::NodeDecodingError(
866 item_index,
867 err,
868 ),
869 error_slot,
870 )
871 })?;
872 let is_vote = is_simple_vote_transaction(&versioned_tx);
873
874 on_tx_cb(
875 thread_index,
876 TransactionData {
877 slot: block.slot,
878 transaction_slot_index: tx.index.unwrap() as usize,
879 signature: *signature,
880 message_hash,
881 is_vote,
882 transaction_status_meta: as_native_metadata.clone(),
883 transaction: versioned_tx.clone(),
884 },
885 )
886 .await
887 .map_err(|e| {
888 (
889 FirehoseError::TransactionHandlerError(e),
890 error_slot,
891 )
892 })?;
893 }
894 fetch_add_if(
895 tracking_enabled,
896 &overall_transactions_processed,
897 1,
898 );
899 if let Some(ref mut stats) = thread_stats {
900 stats.transactions_processed += 1;
901 }
902 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
903 }
904 Entry(entry) => {
905 let entry_hash = Hash::from(entry.hash.to_bytes());
906 let entry_transaction_count = entry.transactions.len();
907 let entry_transaction_count_u64 = entry_transaction_count as u64;
908 let starting_transaction_index_u64 =
909 this_block_executed_transaction_count;
910 latest_entry_blockhash = entry_hash;
911 this_block_executed_transaction_count += entry_transaction_count_u64;
912 this_block_entry_count += 1;
913
914 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
915 let starting_transaction_index = usize::try_from(
916 starting_transaction_index_u64,
917 )
918 .map_err(|err| {
919 (
920 FirehoseError::EntryHandlerError(Box::new(err)),
921 error_slot,
922 )
923 })?;
924 let transaction_indexes_end =
925 starting_transaction_index + entry_transaction_count;
926 on_entry_cb(
927 thread_index,
928 EntryData {
929 slot: block.slot,
930 entry_index,
931 transaction_indexes: starting_transaction_index
932 ..transaction_indexes_end,
933 num_hashes: entry.num_hashes,
934 hash: entry_hash,
935 },
936 )
937 .await
938 .map_err(|e| {
939 (
940 FirehoseError::EntryHandlerError(e),
941 error_slot,
942 )
943 })?;
944 }
945 entry_index += 1;
946 fetch_add_if(
947 tracking_enabled,
948 &overall_entries_processed,
949 1,
950 );
951 if let Some(ref mut stats) = thread_stats {
952 stats.entries_processed += 1;
953 }
954 }
955 Block(block) => {
956 let prev_last_counted_slot = last_counted_slot;
957 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
958 (
959 stats.slots_processed,
960 stats.blocks_processed,
961 stats.leader_skipped_slots,
962 stats.current_slot,
963 )
964 });
965
966 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
967 let skip_start_from_previous = previous_slot
968 .map(|s| s.saturating_add(1))
969 .unwrap_or(next_expected_slot);
970 let skip_start = skip_start_from_previous.max(next_expected_slot);
971
972 for skipped_slot in skip_start..slot {
973 log::debug!(
974 target: &log_target,
975 "leader skipped slot {} (prev_counted {}, current slot {})",
976 skipped_slot,
977 prev_last_counted_slot,
978 slot,
979 );
980 if block_enabled
981 && let Some(on_block_cb) = on_block.as_ref() {
982 on_block_cb(
983 thread_index,
984 BlockData::LeaderSkipped {
985 slot: skipped_slot,
986 },
987 )
988 .await
989 .map_err(|e| {
990 (
991 FirehoseError::BlockHandlerError(e),
992 error_slot,
993 )
994 })?;
995 }
996 if tracking_enabled {
997 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
998 slots_since_stats.fetch_add(1, Ordering::Relaxed);
999 if let Some(ref mut stats) = thread_stats {
1000 stats.leader_skipped_slots += 1;
1001 stats.slots_processed += 1;
1002 stats.current_slot = skipped_slot;
1003 }
1004 }
1005 last_counted_slot = skipped_slot;
1006 }
1007
1008 if slot <= last_counted_slot {
1009 log::debug!(
1010 target: &log_target,
1011 "duplicate block {}, already counted (last_counted={})",
1012 slot,
1013 last_counted_slot,
1014 );
1015 this_block_rewards.clear();
1016 continue;
1017 }
1018
1019 if block_enabled {
1020 if let Some(on_block_cb) = on_block.as_ref() {
1021 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1022 on_block_cb(
1023 thread_index,
1024 BlockData::Block {
1025 parent_slot: block.meta.parent_slot,
1026 parent_blockhash: previous_blockhash,
1027 slot: block.slot,
1028 blockhash: latest_entry_blockhash,
1029 rewards: KeyedRewardsAndNumPartitions {
1030 keyed_rewards,
1031 num_partitions: None,
1032 },
1033 block_time: Some(block.meta.blocktime as i64),
1034 block_height: block.meta.block_height,
1035 executed_transaction_count:
1036 this_block_executed_transaction_count,
1037 entry_count: this_block_entry_count,
1038 },
1039 )
1040 .await
1041 .map_err(|e| {
1042 (
1043 FirehoseError::BlockHandlerError(e),
1044 error_slot,
1045 )
1046 })?;
1047 }
1048 } else {
1049 this_block_rewards.clear();
1050 }
1051 previous_blockhash = latest_entry_blockhash;
1052
1053 if tracking_enabled {
1054 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1055 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1056 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1057 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1058 if let Some(ref mut stats) = thread_stats {
1059 stats.blocks_processed += 1;
1060 stats.slots_processed += 1;
1061 stats.current_slot = slot;
1062 }
1063
1064 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1065 (&stats_tracking, thread_stats.as_mut())
1066 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1067 && let Err(err) = maybe_emit_stats(
1068 stats_tracking.as_ref(),
1069 thread_index,
1070 thread_stats_ref,
1071 &overall_slots_processed,
1072 &overall_blocks_processed,
1073 &overall_transactions_processed,
1074 &overall_entries_processed,
1075 &transactions_since_stats,
1076 &blocks_since_stats,
1077 &slots_since_stats,
1078 &last_pulse,
1079 start_time,
1080 )
1081 .await
1082 {
1083 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1084 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1085 overall_blocks_processed
1086 .fetch_sub(1, Ordering::Relaxed);
1087 overall_slots_processed
1088 .fetch_sub(1, Ordering::Relaxed);
1089 if let Some((
1090 prev_slots_processed,
1091 prev_blocks_processed,
1092 prev_leader_skipped,
1093 prev_current_slot,
1094 )) = thread_stats_snapshot
1095 {
1096 thread_stats_ref.slots_processed =
1097 prev_slots_processed;
1098 thread_stats_ref.blocks_processed =
1099 prev_blocks_processed;
1100 thread_stats_ref.leader_skipped_slots =
1101 prev_leader_skipped;
1102 thread_stats_ref.current_slot =
1103 prev_current_slot;
1104 }
1105 last_counted_slot = prev_last_counted_slot;
1106 return Err(err);
1107 }
1108 }
1109
1110 last_counted_slot = slot;
1111 }
1112 Subset(_subset) => (),
1113 Epoch(_epoch) => (),
1114 Rewards(rewards) => {
1115 if reward_enabled || block_enabled {
1116 let reassembled = nodes
1117 .reassemble_dataframes(rewards.data.clone())
1118 .map_err(|err| {
1119 (
1120 FirehoseError::NodeDecodingError(item_index, err),
1121 current_slot.unwrap_or(slot_range.start),
1122 )
1123 })?;
1124 if reassembled.is_empty() {
1125 this_block_rewards.clear();
1126 if reward_enabled
1127 && let Some(on_reward_cb) = on_reward.as_ref()
1128 {
1129 on_reward_cb(
1130 thread_index,
1131 RewardsData {
1132 slot: block.slot,
1133 rewards: Vec::new(),
1134 },
1135 )
1136 .await
1137 .map_err(|e| {
1138 (
1139 FirehoseError::RewardHandlerError(e),
1140 error_slot,
1141 )
1142 })?;
1143 }
1144 continue;
1145 }
1146
1147 let decompressed = utils::decompress_zstd(reassembled)
1148 .map_err(|err| {
1149 (
1150 FirehoseError::NodeDecodingError(
1151 item_index,
1152 err,
1153 ),
1154 error_slot,
1155 )
1156 })?;
1157
1158 let decoded =
1159 prost_011::Message::decode(decompressed.as_slice())
1160 .map_err(|err| {
1161 (
1162 FirehoseError::NodeDecodingError(
1163 item_index,
1164 Box::new(err),
1165 ),
1166 error_slot,
1167 )
1168 })?;
1169 let keyed_rewards = convert_proto_rewards(&decoded)
1170 .map_err(|err| {
1171 (
1172 FirehoseError::NodeDecodingError(item_index, err),
1173 error_slot,
1174 )
1175 })?;
1176 if reward_enabled
1177 && let Some(on_reward_cb) = on_reward.as_ref()
1178 {
1179 on_reward_cb(
1180 thread_index,
1181 RewardsData {
1182 slot: block.slot,
1183 rewards: keyed_rewards.clone(),
1184 },
1185 )
1186 .await
1187 .map_err(|e| {
1188 (
1189 FirehoseError::RewardHandlerError(e),
1190 error_slot,
1191 )
1192 })?;
1193 }
1194 this_block_rewards = keyed_rewards;
1195 if let Some(ref mut stats) = thread_stats {
1196 stats.rewards_processed +=
1197 this_block_rewards.len() as u64;
1198 }
1199 }
1200 }
1201 DataFrame(_data_frame) => (),
1202 }
1203 }
1204 if block.slot == slot_range.end - 1 {
1205 let finish_time = std::time::Instant::now();
1206 let elapsed = finish_time.duration_since(start_time);
1207 log::info!(target: &log_target, "processed slot {}", block.slot);
1208 let elapsed_pretty = human_readable_duration(elapsed);
1209 log::info!(
1210 target: &log_target,
1211 "processed {} slots across {} epochs in {}.",
1212 slot_range.end - slot_range.start,
1213 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1214 elapsed_pretty
1215 );
1216 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1217 let summary: String = error_counts
1220 .iter()
1221 .enumerate()
1222 .filter_map(|(i, c)| {
1223 let v = c.load(Ordering::Relaxed);
1224 if v > 0 {
1225 Some(format!("{:03}({})", i, v))
1226 } else {
1227 None
1228 }
1229 })
1230 .collect::<Vec<_>>()
1231 .join(", ");
1232 if !summary.is_empty() {
1233 log::debug!(target: &log_target, "threads with errors: {}", summary);
1234 }
1235 return Ok(());
1236 }
1237 }
1238 if tracking_enabled
1239 && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1240 && last_counted_slot < expected_last_slot {
1241 let flush_start = last_counted_slot.saturating_add(1);
1242 if block_enabled
1243 && let Some(on_block_cb) = on_block.as_ref() {
1244 let error_slot = current_slot.unwrap_or(slot_range.start);
1245 for skipped_slot in flush_start..=expected_last_slot {
1246 log::debug!(
1247 target: &log_target,
1248 "leader skipped slot {} during final flush (prev_counted {})",
1249 skipped_slot,
1250 last_counted_slot,
1251 );
1252 on_block_cb(
1253 thread_index,
1254 BlockData::LeaderSkipped { slot: skipped_slot },
1255 )
1256 .await
1257 .map_err(|e| FirehoseError::BlockHandlerError(e))
1258 .map_err(|e| (e, error_slot))?;
1259 }
1260 }
1261 let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1262 if let Some(stats_ref) = thread_stats.as_mut() {
1263 stats_ref.leader_skipped_slots += missing_slots;
1264 stats_ref.slots_processed += missing_slots;
1265 stats_ref.current_slot = expected_last_slot;
1266 }
1267 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1268 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1269 last_counted_slot = expected_last_slot;
1270 }
1271 if let Some(ref mut stats) = thread_stats {
1272 stats.finish_time = Some(std::time::Instant::now());
1273 maybe_emit_stats(
1274 stats_tracking.as_ref(),
1275 thread_index,
1276 stats,
1277 &overall_slots_processed,
1278 &overall_blocks_processed,
1279 &overall_transactions_processed,
1280 &overall_entries_processed,
1281 &transactions_since_stats,
1282 &blocks_since_stats,
1283 &slots_since_stats,
1284 &last_pulse,
1285 start_time,
1286 )
1287 .await?;
1288 }
1289 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1290 }
1291 Ok(())
1292 }
1293 .await
1294 {
1295 if is_shutdown_error(&err) {
1296 log::info!(
1297 target: &log_target,
1298 "shutdown requested; terminating firehose thread {}",
1299 thread_index
1300 );
1301 break;
1302 }
1303 log::error!(
1304 target: &log_target,
1305 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1306 slot,
1307 slot_to_epoch(slot)
1308 );
1309 log::error!(target: &log_target, "{}", err);
1310 let item_index = match err {
1311 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1312 _ => 0,
1313 };
1314 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1316 log::warn!(
1317 target: &log_target,
1318 "restarting from slot {} at index {}",
1319 slot,
1320 item_index,
1321 );
1322 slot_range.start = slot;
1324 skip_until_index = Some(item_index);
1325 }
1326 });
1327 handles.push(handle);
1328 }
1329
1330 for handle in handles {
1332 handle.await.unwrap();
1333 }
1334 if stats_tracking.is_some() {
1335 let elapsed = firehose_start.elapsed();
1336 let elapsed_secs = elapsed.as_secs_f64();
1337 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1338 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1339 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1340 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1341 let total_errors: u64 = error_counts
1342 .iter()
1343 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1344 .sum();
1345 let overall_tps = if elapsed_secs > 0.0 {
1346 total_transactions as f64 / elapsed_secs
1347 } else {
1348 0.0
1349 };
1350 log::info!(
1351 target: LOG_MODULE,
1352 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1353 elapsed_secs,
1354 total_slots,
1355 total_blocks,
1356 total_leader_skipped,
1357 total_transactions,
1358 overall_tps,
1359 total_errors
1360 );
1361 }
1362 if shutdown_flag.load(Ordering::SeqCst) {
1363 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1364 } else {
1365 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1366 }
1367 Ok(())
1368}
1369
1370#[allow(clippy::result_large_err)]
1371pub fn firehose_geyser(
1376 rt: Arc<tokio::runtime::Runtime>,
1377 slot_range: Range<u64>,
1378 geyser_config_files: Option<&[PathBuf]>,
1379 index_base_url: &Url,
1380 client: &Client,
1381 on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1382 + Send
1383 + 'static,
1384 threads: u64,
1385) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1386 if threads == 0 {
1387 return Err((
1388 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1389 slot_range.start,
1390 ));
1391 }
1392 log::info!(target: LOG_MODULE, "starting firehose...");
1393 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1394 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1395 let mut entry_notifier_maybe = None;
1396 let mut block_meta_notifier_maybe = None;
1397 let mut transaction_notifier_maybe = None;
1398 if let Some(geyser_config_files) = geyser_config_files {
1399 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1400
1401 let service =
1402 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1403 confirmed_bank_receiver.clone(),
1404 true,
1405 geyser_config_files,
1406 )
1407 .map_err(|e| (e.into(), slot_range.start))?;
1408
1409 transaction_notifier_maybe = Some(
1410 service
1411 .get_transaction_notifier()
1412 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1413 .map_err(|e| (e, slot_range.start))?,
1414 );
1415
1416 entry_notifier_maybe = service.get_entry_notifier();
1417 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1418
1419 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1420 }
1421
1422 if entry_notifier_maybe.is_some() {
1423 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1424 } else {
1425 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1426 }
1427 log::info!(target: LOG_MODULE, "running on_load...");
1428 rt.spawn(on_load);
1429
1430 let slot_range = Arc::new(slot_range);
1431 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1432 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1433 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1434 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1435
1436 let subranges = generate_subranges(&slot_range, threads);
1438 if threads > 1 {
1439 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1440 }
1441
1442 let mut handles = Vec::new();
1443 let error_counts: Arc<Vec<AtomicU32>> =
1445 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1446
1447 for (i, slot_range) in subranges.into_iter().enumerate() {
1448 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1449 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1450 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1451 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1452 let client = client.clone();
1453 let error_counts = error_counts.clone();
1454
1455 let rt_clone = rt.clone();
1456
1457 let handle = std::thread::spawn(move || {
1458 rt_clone.block_on(async {
1459 firehose_geyser_thread(
1460 slot_range,
1461 transaction_notifier_maybe,
1462 entry_notifier_maybe,
1463 block_meta_notifier_maybe,
1464 confirmed_bank_sender,
1465 &client,
1466 if threads > 1 { Some(i) } else { None },
1467 error_counts,
1468 )
1469 .await
1470 .unwrap();
1471 });
1472 });
1473 handles.push(handle);
1474 }
1475
1476 for handle in handles {
1478 handle.join().unwrap();
1479 }
1480 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1481 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1482 block_meta_notifier.notify_block_metadata(
1483 u64::MAX,
1484 "unload",
1485 u64::MAX,
1486 "unload",
1487 &KeyedRewardsAndNumPartitions {
1488 keyed_rewards: vec![],
1489 num_partitions: None,
1490 },
1491 None,
1492 None,
1493 0,
1494 0,
1495 );
1496 }
1497 Ok(confirmed_bank_receiver)
1498}
1499
1500#[allow(clippy::too_many_arguments)]
1501#[allow(clippy::result_large_err)]
1502async fn firehose_geyser_thread(
1503 mut slot_range: Range<u64>,
1504 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1505 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1506 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1507 confirmed_bank_sender: Sender<SlotNotification>,
1508 client: &Client,
1509 thread_index: Option<usize>,
1510 error_counts: Arc<Vec<AtomicU32>>,
1511) -> Result<(), (FirehoseError, u64)> {
1512 let start_time = std::time::Instant::now();
1513 let log_target = if let Some(thread_index) = thread_index {
1514 format!("{}::T{:03}", LOG_MODULE, thread_index)
1515 } else {
1516 LOG_MODULE.to_string()
1517 };
1518 let mut skip_until_index = None;
1519 while let Err((err, slot)) = async {
1521 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1522 log::info!(
1523 target: &log_target,
1524 "slot range: {} (epoch {}) ... {} (epoch {})",
1525 slot_range.start,
1526 slot_to_epoch(slot_range.start),
1527 slot_range.end,
1528 slot_to_epoch(slot_range.end)
1529 );
1530
1531 log::info!(target: &log_target, "🚒 starting firehose...");
1532
1533 let mut current_slot: Option<u64> = None;
1535 let mut previous_slot: Option<u64> = None;
1536 for epoch_num in epoch_range.clone() {
1537 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1538 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1539 Ok(stream) => stream,
1540 Err(_) => {
1541 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1542 }
1543 };
1544 let mut reader = NodeReader::new(stream);
1545
1546 let header_fut = reader.read_raw_header();
1547 let header = match timeout(OP_TIMEOUT, header_fut).await {
1548 Ok(res) => res
1549 .map_err(FirehoseError::ReadHeader)
1550 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1551 Err(_) => {
1552 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1553 }
1554 };
1555 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1556
1557 let mut todo_previous_blockhash = Hash::default();
1558 let mut todo_latest_entry_blockhash = Hash::default();
1559
1560 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1561 let seek_fut = reader.seek_to_slot(slot_range.start);
1562 match timeout(OP_TIMEOUT, seek_fut).await {
1563 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1564 Err(_) => {
1565 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1566 }
1567 }
1568 }
1569
1570 let mut item_index = 0;
1572 let mut displayed_skip_message = false;
1573 loop {
1574 let read_fut = reader.read_until_block();
1575 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1576 Ok(result) => result
1577 .map_err(FirehoseError::ReadUntilBlockError)
1578 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1579 Err(_) => {
1580 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1581 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1582 }
1583 };
1584 if nodes.is_empty() {
1585 log::info!(
1586 target: &log_target,
1587 "reached end of epoch {}",
1588 epoch_num
1589 );
1590 break;
1591 }
1592 if let Some(last_node) = nodes.0.last()
1601 && !last_node.get_node().is_block() {
1602 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1603 break;
1604 }
1605 let block = nodes
1606 .get_block()
1607 .map_err(FirehoseError::GetBlockError)
1608 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1609 log::debug!(
1610 target: &log_target,
1611 "read {} items from epoch {}, now at slot {}",
1612 item_index,
1613 epoch_num,
1614 block.slot
1615 );
1616 let slot = block.slot;
1617 if slot >= slot_range.end {
1618 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1619 return Ok(());
1623 }
1624 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1625 if slot < slot_range.start {
1626 log::warn!(
1627 target: &log_target,
1628 "encountered slot {} before start of range {}, skipping",
1629 slot,
1630 slot_range.start
1631 );
1632 continue;
1633 }
1634 if let Some(previous_slot) = previous_slot
1635 && slot != previous_slot + 1 {
1636 }
1639 previous_slot = current_slot;
1640 current_slot = Some(slot);
1641 let mut entry_index: usize = 0;
1642 let mut this_block_executed_transaction_count: u64 = 0;
1643 let mut this_block_entry_count: u64 = 0;
1644 let mut this_block_rewards: Vec<(Pubkey, RewardInfo)> = Vec::new();
1645
1646 nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1647 item_index += 1;
1648 if let Some(skip) = skip_until_index {
1654 if item_index < skip {
1655 if !displayed_skip_message {
1656 log::info!(
1657 target: &log_target,
1658 "skipping until index {} (at {})",
1659 skip,
1660 item_index
1661 );
1662 displayed_skip_message = true;
1663 }
1664 return Ok(());
1665 } else {
1666 log::info!(
1667 target: &log_target,
1668 "reached target index {}, resuming...",
1669 skip
1670 );
1671 skip_until_index = None;
1672 }
1673 }
1674 let node = node_with_cid.get_node();
1675
1676 use crate::node::Node::*;
1677 match node {
1678 Transaction(tx) => {
1679 let versioned_tx = tx.as_parsed()?;
1680 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1681
1682 let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1683
1684 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1685 prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1686 Box::new(std::io::Error::other(
1687 std::format!("Error decoding metadata: {:?}", err),
1688 ))
1689 })?;
1690
1691 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1692 metadata.try_into()?;
1693
1694 let message_hash = {
1695 #[cfg(feature = "verify-transaction-signatures")]
1696 {
1697 versioned_tx.verify_and_hash_message()?
1698 }
1699 #[cfg(not(feature = "verify-transaction-signatures"))]
1700 {
1701 versioned_tx.message.hash()
1704 }
1705 };
1706 let signature = versioned_tx
1707 .signatures
1708 .first()
1709 .ok_or_else(|| {
1710 Box::new(std::io::Error::new(
1711 std::io::ErrorKind::InvalidData,
1712 "transaction missing signature",
1713 )) as Box<dyn std::error::Error>
1714 })?;
1715 let is_vote = is_simple_vote_transaction(&versioned_tx);
1716
1717 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1718 transaction_notifier.notify_transaction(
1719 block.slot,
1720 tx.index.unwrap() as usize,
1721 signature,
1722 &message_hash,
1723 is_vote,
1724 &as_native_metadata,
1725 &versioned_tx,
1726 );
1727 }
1728
1729 }
1730 Entry(entry) => {
1731 let entry_hash = Hash::from(entry.hash.to_bytes());
1732 let entry_transaction_count = entry.transactions.len();
1733 let entry_transaction_count_u64 = entry_transaction_count as u64;
1734 let starting_transaction_index =
1735 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1736 Box::new(std::io::Error::other(
1737 "transaction index exceeds usize range",
1738 )) as Box<dyn std::error::Error>
1739 })?;
1740 todo_latest_entry_blockhash = entry_hash;
1741 this_block_executed_transaction_count += entry_transaction_count_u64;
1742 this_block_entry_count += 1;
1743 if entry_notifier_maybe.is_none() {
1744 return Ok(());
1745 }
1746 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1747 let entry_summary = solana_entry::entry::EntrySummary {
1748 num_hashes: entry.num_hashes,
1749 hash: Hash::from(entry.hash.to_bytes()),
1750 num_transactions: entry_transaction_count_u64,
1751 };
1752 entry_notifier.notify_entry(
1753 block.slot,
1754 entry_index,
1755 &entry_summary,
1756 starting_transaction_index,
1757 );
1758 entry_index += 1;
1759 }
1760 Block(block) => {
1761 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1762 confirmed_bank_sender.send(notification).unwrap();
1763
1764 if block_meta_notifier_maybe.is_none() {
1765 return Ok(());
1766 }
1767 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1768 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1769 block_meta_notifier.notify_block_metadata(
1770 block.meta.parent_slot,
1771 todo_previous_blockhash.to_string().as_str(),
1772 block.slot,
1773 todo_latest_entry_blockhash.to_string().as_str(),
1774 &KeyedRewardsAndNumPartitions {
1775 keyed_rewards,
1776 num_partitions: None,
1777 },
1778 Some(block.meta.blocktime as i64),
1779 block.meta.block_height,
1780 this_block_executed_transaction_count,
1781 this_block_entry_count,
1782 );
1783 todo_previous_blockhash = todo_latest_entry_blockhash;
1784 std::thread::yield_now();
1785 }
1786 Subset(_subset) => (),
1787 Epoch(_epoch) => (),
1788 Rewards(rewards) => {
1789 if !rewards.is_complete() {
1790 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1791 let decompressed = utils::decompress_zstd(reassembled)?;
1792 let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1793 Box::new(std::io::Error::other(
1794 std::format!("Error decoding rewards: {:?}", err),
1795 ))
1796 })?;
1797 this_block_rewards = convert_proto_rewards(&decoded)?;
1798 }
1799 }
1800 DataFrame(_data_frame) => (),
1801 }
1802 Ok(())
1803 })
1804 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1805 if block.slot == slot_range.end - 1 {
1806 let finish_time = std::time::Instant::now();
1807 let elapsed = finish_time.duration_since(start_time);
1808 log::info!(target: &log_target, "processed slot {}", block.slot);
1809 let elapsed_pretty = human_readable_duration(elapsed);
1810 log::info!(
1811 target: &log_target,
1812 "processed {} slots across {} epochs in {}.",
1813 slot_range.end - slot_range.start,
1814 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1815 elapsed_pretty
1816 );
1817 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1818 let summary: String = error_counts
1821 .iter()
1822 .enumerate()
1823 .filter_map(|(i, c)| {
1824 let v = c.load(Ordering::Relaxed);
1825 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1826 })
1827 .collect::<Vec<_>>()
1828 .join(", ");
1829 if !summary.is_empty() {
1830 log::debug!(target: &log_target, "threads with errors: {}", summary);
1831 }
1832 return Ok(());
1833 }
1834 }
1835 }
1836 Ok(())
1837}
1838.await
1839{
1840 if is_shutdown_error(&err) {
1841 log::info!(
1842 target: &log_target,
1843 "shutdown requested; terminating firehose thread {:?}",
1844 thread_index
1845 );
1846 return Ok(());
1847 }
1848 log::error!(
1849 target: &log_target,
1850 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1851 slot,
1852 slot_to_epoch(slot)
1853 );
1854 log::error!(target: &log_target, "{}", err);
1855 let item_index = match err {
1856 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1857 _ => 0,
1858 };
1859 let idx = thread_index.unwrap_or(0);
1861 error_counts[idx].fetch_add(1, Ordering::Relaxed);
1862 log::warn!(
1863 target: &log_target,
1864 "restarting from slot {} at index {}",
1865 slot,
1866 item_index,
1867 );
1868 slot_range.start = slot;
1870 skip_until_index = Some(item_index);
1871 }
1872 Ok(())
1873}
1874
1875#[inline]
1876fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1877 if !(1..=2).contains(&versioned_tx.signatures.len()) {
1878 return false;
1879 }
1880
1881 if !matches!(
1882 versioned_tx.version(),
1883 solana_sdk::transaction::TransactionVersion::Legacy(_)
1884 ) {
1885 return false;
1886 }
1887
1888 let instructions = versioned_tx.message.instructions();
1889 if instructions.len() != 1 {
1890 return false;
1891 }
1892
1893 let program_index = instructions[0].program_id_index as usize;
1894 versioned_tx
1895 .message
1896 .static_account_keys()
1897 .get(program_index)
1898 .map(|program_id| program_id == &vote_program_id())
1899 .unwrap_or(false)
1900}
1901
1902#[inline(always)]
1903fn convert_proto_rewards(
1904 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1905) -> Result<Vec<(Pubkey, RewardInfo)>, Box<dyn std::error::Error>> {
1906 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1907 for proto_reward in proto_rewards.rewards.iter() {
1908 let reward = RewardInfo {
1909 reward_type: match proto_reward.reward_type - 1 {
1910 0 => RewardType::Fee,
1911 1 => RewardType::Rent,
1912 2 => RewardType::Staking,
1913 3 => RewardType::Voting,
1914 typ => {
1915 return Err(Box::new(std::io::Error::other(format!(
1916 "unsupported reward type {}",
1917 typ
1918 ))));
1919 }
1920 },
1921 lamports: proto_reward.lamports,
1922 post_balance: proto_reward.post_balance,
1923 commission: proto_reward.commission.parse::<u8>().ok(),
1924 };
1925 let pubkey = proto_reward
1926 .pubkey
1927 .parse::<Pubkey>()
1928 .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1929 keyed_rewards.push((pubkey, reward));
1930 }
1931 Ok(keyed_rewards)
1932}
1933
1934#[inline]
1935pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1937 let total = slot_range.end - slot_range.start;
1938 let slots_per_thread = total / threads;
1939 let remainder = total % threads;
1940
1941 let ranges: Vec<Range<u64>> = (0..threads)
1942 .map(|i| {
1943 let extra_slot = if i < remainder { 1 } else { 0 };
1945 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1946 let end = start + slots_per_thread + extra_slot;
1947 start..end
1948 })
1949 .collect();
1950
1951 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1953 assert_eq!(
1954 total_covered, total,
1955 "Range generation failed: {} threads should cover {} slots but only cover {}",
1956 threads, total, total_covered
1957 );
1958
1959 for i in 1..ranges.len() {
1961 assert_eq!(
1962 ranges[i - 1].end,
1963 ranges[i].start,
1964 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1965 i - 1,
1966 ranges[i - 1].end,
1967 i,
1968 ranges[i].start
1969 );
1970 }
1971
1972 log::info!(
1973 target: LOG_MODULE,
1974 "Generated {} thread ranges covering {} slots total",
1975 threads,
1976 total_covered
1977 );
1978 ranges
1979}
1980
/// Formats `duration` compactly for log output.
///
/// Durations under a minute print as seconds: with two decimals, unless the
/// value has at least one whole second and no millisecond remainder, in
/// which case the whole seconds are printed. Longer durations print their
/// two most significant units (`d`/`h`, `h`/`m` or `m`/`s`), omitting the
/// smaller unit when it is zero. A zero duration prints as `"0s"`.
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < 60 {
        return if whole_secs > 0 && duration.subsec_millis() == 0 {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / 86_400;
    let hours = (whole_secs % 86_400) / 3_600;
    let minutes = (whole_secs % 3_600) / 60;
    let seconds = whole_secs % 60;

    // Pick the leading unit and the one directly below it.
    let ((major, major_unit), (minor, minor_unit)) = if days > 0 {
        ((days, 'd'), (hours, 'h'))
    } else if hours > 0 {
        ((hours, 'h'), (minutes, 'm'))
    } else {
        ((minutes, 'm'), (seconds, 's'))
    };

    if minor > 0 {
        format!("{major}{major_unit}{minor}{minor_unit}")
    } else {
        format!("{major}{major_unit}")
    }
}
2026
/// Test-only stats callback: logs one summary line for `thread_id`, including
/// throughput in transactions per second since `stats.start_time`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        let transactions = stats.transactions_processed;
        // Avoid dividing by zero when a report fires immediately.
        let tps = if elapsed_secs > 0.0 {
            transactions as f64 / elapsed_secs
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            transactions,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2052
2053#[tokio::test(flavor = "multi_thread")]
2054async fn test_firehose_epoch_800() {
2055 use std::sync::atomic::{AtomicU64, Ordering};
2056 solana_logger::setup_with_default("info");
2057 const THREADS: usize = 4;
2058 const NUM_SLOTS_TO_COVER: u64 = 50;
2059 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2060 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2061 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2062 let stats_tracking = StatsTracking {
2063 on_stats: log_stats_handler,
2064 tracking_interval_slots: 10,
2065 };
2066
2067 firehose(
2068 THREADS.try_into().unwrap(),
2069 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2070 Some(|thread_id: usize, block: BlockData| {
2071 async move {
2072 let prev =
2073 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2074 if block.was_skipped() {
2075 log::info!(
2076 target: LOG_MODULE,
2077 "leader skipped block {} on thread {}",
2078 block.slot(),
2079 thread_id,
2080 );
2081 } else {
2082 }
2089
2090 if prev > 0 {
2091 assert_eq!(prev + 1, block.slot());
2092 }
2093 if block.was_skipped() {
2094 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2095 } else {
2096 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2097 }
2098 Ok(())
2099 }
2100 .boxed()
2101 }),
2102 None::<OnTxFn>,
2103 None::<OnEntryFn>,
2104 None::<OnRewardFn>,
2105 Some(stats_tracking),
2106 None,
2107 )
2108 .await
2109 .unwrap();
2110 assert_eq!(
2111 NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2112 NUM_SLOTS_TO_COVER
2113 );
2114 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2115}