1use crossbeam_channel::{Receiver, Sender, unbounded};
2#[cfg(test)]
3use futures_util::FutureExt;
4use futures_util::future::BoxFuture;
5use reqwest::{Client, Url};
6use solana_address::Address;
7use solana_geyser_plugin_manager::{
8 block_metadata_notifier_interface::BlockMetadataNotifier,
9 geyser_plugin_service::GeyserPluginServiceError,
10};
11use solana_hash::Hash;
12use solana_ledger::entry_notifier_interface::EntryNotifier;
13use solana_reward_info::RewardInfo;
14use solana_rpc::{
15 optimistically_confirmed_bank_tracker::SlotNotification,
16 transaction_notifier_interface::TransactionNotifier,
17};
18use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
19use solana_sdk_ids::vote::id as vote_program_id;
20use solana_transaction::versioned::VersionedTransaction;
21use std::{
22 fmt::Display,
23 future::Future,
24 io,
25 ops::Range,
26 path::PathBuf,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
30 },
31};
32use thiserror::Error;
33use tokio::{
34 sync::broadcast::{self, error::TryRecvError},
35 time::timeout,
36};
37
38use crate::{
39 LOG_MODULE,
40 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
41 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
42 node_reader::NodeReader,
43 utils,
44};
45
/// Upper bound on each individual stream/network step the firehose awaits (fetching an epoch
/// stream, reading its header, seeking to a slot, reading the next block). Exceeding it turns
/// into `FirehoseError::OperationTimeout`.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10_000);
50
51fn poll_shutdown(
52 flag: &Arc<std::sync::atomic::AtomicBool>,
53 receiver: &mut Option<broadcast::Receiver<()>>,
54) -> bool {
55 if let Some(rx) = receiver {
56 match rx.try_recv() {
57 Ok(_) | Err(TryRecvError::Lagged(_)) => {
58 flag.store(true, Ordering::SeqCst);
59 }
60 Err(TryRecvError::Closed) => {
61 flag.store(true, Ordering::SeqCst);
62 }
63 Err(TryRecvError::Empty) => {}
64 }
65 }
66 flag.load(Ordering::SeqCst)
67}
68
69fn is_shutdown_error(err: &FirehoseError) -> bool {
70 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
71 inner
72 .downcast_ref::<io::Error>()
73 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
74 .unwrap_or(false)
75 }
76
77 match err {
78 FirehoseError::BlockHandlerError(inner)
79 | FirehoseError::TransactionHandlerError(inner)
80 | FirehoseError::EntryHandlerError(inner)
81 | FirehoseError::RewardHandlerError(inner)
82 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
83 _ => false,
84 }
85}
86
87#[derive(Debug, Error)]
90pub enum FirehoseError {
91 Reqwest(reqwest::Error),
93 ReadHeader(Box<dyn std::error::Error>),
95 GeyserPluginService(GeyserPluginServiceError),
97 FailedToGetTransactionNotifier,
99 ReadUntilBlockError(Box<dyn std::error::Error>),
101 GetBlockError(Box<dyn std::error::Error>),
103 NodeDecodingError(usize, Box<dyn std::error::Error>),
105 SlotOffsetIndexError(SlotOffsetIndexError),
107 SeekToSlotError(Box<dyn std::error::Error>),
109 OnLoadError(Box<dyn std::error::Error>),
111 OnStatsHandlerError(Box<dyn std::error::Error>),
113 OperationTimeout(&'static str),
115 TransactionHandlerError(Box<dyn std::error::Error>),
117 EntryHandlerError(Box<dyn std::error::Error>),
119 RewardHandlerError(Box<dyn std::error::Error>),
121 BlockHandlerError(Box<dyn std::error::Error>),
123}
124
125impl Display for FirehoseError {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 match self {
128 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
129 FirehoseError::ReadHeader(error) => {
130 write!(f, "Error reading header: {}", error)
131 }
132 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
133 f,
134 "Error initializing geyser plugin service: {}",
135 geyser_plugin_service_error
136 ),
137 FirehoseError::FailedToGetTransactionNotifier => write!(
138 f,
139 "Failed to get transaction notifier from GeyserPluginService"
140 ),
141 FirehoseError::ReadUntilBlockError(error) => {
142 write!(f, "Error reading until block: {}", error)
143 }
144 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
145 FirehoseError::NodeDecodingError(item_index, error) => {
146 write!(
147 f,
148 "Error seeking, reading data from, or decoding data for data node {}: {}",
149 item_index, error
150 )
151 }
152 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
153 f,
154 "Error getting info from slot offset index: {}",
155 slot_offset_index_error
156 ),
157 FirehoseError::SeekToSlotError(error) => {
158 write!(f, "Error seeking to slot: {}", error)
159 }
160 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
161 FirehoseError::OnStatsHandlerError(error) => {
162 write!(f, "Stats handler error: {}", error)
163 }
164 FirehoseError::OperationTimeout(op) => {
165 write!(f, "Timeout while waiting for operation: {}", op)
166 }
167 FirehoseError::TransactionHandlerError(error) => {
168 write!(f, "Transaction handler error: {}", error)
169 }
170 FirehoseError::EntryHandlerError(error) => {
171 write!(f, "Entry handler error: {}", error)
172 }
173 FirehoseError::RewardHandlerError(error) => {
174 write!(f, "Reward handler error: {}", error)
175 }
176 FirehoseError::BlockHandlerError(error) => {
177 write!(f, "Block handler error: {}", error)
178 }
179 }
180 }
181}
182
183impl From<reqwest::Error> for FirehoseError {
184 fn from(e: reqwest::Error) -> Self {
185 FirehoseError::Reqwest(e)
186 }
187}
188
189impl From<GeyserPluginServiceError> for FirehoseError {
190 fn from(e: GeyserPluginServiceError) -> Self {
191 FirehoseError::GeyserPluginService(e)
192 }
193}
194
195impl From<SlotOffsetIndexError> for FirehoseError {
196 fn from(e: SlotOffsetIndexError) -> Self {
197 FirehoseError::SlotOffsetIndexError(e)
198 }
199}
200
/// Progress counters for a single firehose worker thread.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ThreadStats {
    /// Index of the worker thread these counters belong to.
    pub thread_id: usize,
    /// When the worker thread started.
    pub start_time: std::time::Instant,
    /// When the worker thread finished; `None` until a finish time is recorded.
    pub finish_time: Option<std::time::Instant>,
    /// Half-open slot range assigned to this thread.
    pub slot_range: Range<u64>,
    /// Most recent slot the thread has worked on.
    pub current_slot: u64,
    /// Slots processed so far, including leader-skipped slots.
    pub slots_processed: u64,
    /// Blocks processed so far.
    pub blocks_processed: u64,
    /// Slots processed so far for which no block was produced.
    pub leader_skipped_slots: u64,
    /// Transactions processed so far.
    pub transactions_processed: u64,
    /// Entries processed so far.
    pub entries_processed: u64,
    /// Individual reward records processed so far.
    pub rewards_processed: u64,
}
227
228#[derive(Clone, PartialEq, Eq, Hash, Debug)]
230pub struct Stats {
231 pub thread_stats: ThreadStats,
233 pub start_time: std::time::Instant,
235 pub finish_time: Option<std::time::Instant>,
237 pub slot_range: Range<u64>,
239 pub slots_processed: u64,
241 pub blocks_processed: u64,
243 pub leader_skipped_slots: u64,
245 pub transactions_processed: u64,
247 pub entries_processed: u64,
249 pub rewards_processed: u64,
251 pub transactions_since_last_pulse: u64,
253 pub blocks_since_last_pulse: u64,
255 pub slots_since_last_pulse: u64,
257 pub time_since_last_pulse: std::time::Duration,
259}
260
261#[derive(Clone, PartialEq, Eq, Hash, Debug)]
263pub struct StatsTracking<OnStats: Handler<Stats>> {
264 pub on_stats: OnStats,
266 pub tracking_interval_slots: u64,
268}
269
270#[inline(always)]
271#[allow(clippy::too_many_arguments)]
272async fn maybe_emit_stats<OnStats: Handler<Stats>>(
273 stats_tracking: Option<&StatsTracking<OnStats>>,
274 thread_index: usize,
275 thread_stats: &ThreadStats,
276 overall_slots_processed: &AtomicU64,
277 overall_blocks_processed: &AtomicU64,
278 overall_transactions_processed: &AtomicU64,
279 overall_entries_processed: &AtomicU64,
280 transactions_since_stats: &AtomicU64,
281 blocks_since_stats: &AtomicU64,
282 slots_since_stats: &AtomicU64,
283 last_pulse: &Arc<AtomicU64>,
284 base_instant: std::time::Instant,
285) -> Result<(), (FirehoseError, u64)> {
286 if let Some(stats_tracker) = stats_tracking {
287 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
288 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
289 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
290 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
291 let now_nanos = base_instant.elapsed().as_nanos() as u64;
292 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
293 let delta_nanos = now_nanos.saturating_sub(previous);
294 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
295 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
296 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
297 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
298
299 let stats = Stats {
300 thread_stats: thread_stats.clone(),
301 start_time: thread_stats.start_time,
302 finish_time: thread_stats.finish_time,
303 slot_range: thread_stats.slot_range.clone(),
304 slots_processed: total_slots,
305 blocks_processed: total_blocks,
306 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
307 transactions_processed: total_transactions,
308 entries_processed: total_entries,
309 rewards_processed: thread_stats.rewards_processed,
310 transactions_since_last_pulse: processed_transactions,
311 blocks_since_last_pulse: processed_blocks,
312 slots_since_last_pulse: processed_slots,
313 time_since_last_pulse,
314 };
315
316 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
317 last_pulse.store(previous, Ordering::Relaxed);
318 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
319 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
320 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
321 return Err((
322 FirehoseError::OnStatsHandlerError(e),
323 thread_stats.current_slot,
324 ));
325 }
326 }
327 Ok(())
328}
329
/// Adds `value` to `atomic` (relaxed ordering), but only when `tracking_enabled` is set.
/// Otherwise this is a no-op.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
336
337#[derive(Debug, Clone)]
339pub struct TransactionData {
340 pub slot: u64,
342 pub transaction_slot_index: usize,
344 pub signature: solana_signature::Signature,
346 pub message_hash: Hash,
348 pub is_vote: bool,
350 pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
352 pub transaction: VersionedTransaction,
354}
355
356#[derive(Debug, Clone)]
358pub struct EntryData {
359 pub slot: u64,
361 pub entry_index: usize,
363 pub transaction_indexes: Range<usize>,
365 pub num_hashes: u64,
367 pub hash: Hash,
369}
370
371#[derive(Debug, Clone)]
373pub struct RewardsData {
374 pub slot: u64,
376 pub rewards: Vec<(Address, RewardInfo)>,
378}
379
380#[derive(Debug)]
382pub enum BlockData {
383 Block {
385 parent_slot: u64,
387 parent_blockhash: Hash,
389 slot: u64,
391 blockhash: Hash,
393 rewards: KeyedRewardsAndNumPartitions,
395 block_time: Option<i64>,
397 block_height: Option<u64>,
399 executed_transaction_count: u64,
401 entry_count: u64,
403 },
404 LeaderSkipped {
406 slot: u64,
408 },
409}
410
411impl BlockData {
412 #[inline(always)]
414 pub const fn slot(&self) -> u64 {
415 match self {
416 BlockData::Block { slot, .. } => *slot,
417 BlockData::LeaderSkipped { slot } => *slot,
418 }
419 }
420
421 #[inline(always)]
423 pub const fn was_skipped(&self) -> bool {
424 matches!(self, BlockData::LeaderSkipped { .. })
425 }
426}
427
428type HandlerResult = Result<(), Box<dyn std::error::Error + Send + 'static>>;
429type HandlerFuture = BoxFuture<'static, HandlerResult>;
430
431pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
433
434impl<Data, F> Handler<Data> for F where
435 F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
436{
437}
438
439pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
441pub type OnBlockFn = HandlerFn<BlockData>;
443pub type OnTxFn = HandlerFn<TransactionData>;
445pub type OnEntryFn = HandlerFn<EntryData>;
447pub type OnRewardFn = HandlerFn<RewardsData>;
449pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
451
452#[inline]
454#[allow(clippy::too_many_arguments)]
455pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats>(
456 threads: u64,
457 slot_range: Range<u64>,
458 on_block: Option<OnBlock>,
459 on_tx: Option<OnTransaction>,
460 on_entry: Option<OnEntry>,
461 on_rewards: Option<OnRewards>,
462 stats_tracking: Option<StatsTracking<OnStats>>,
463 shutdown_signal: Option<broadcast::Receiver<()>>,
464) -> Result<(), (FirehoseError, u64)>
465where
466 OnBlock: Handler<BlockData>,
467 OnTransaction: Handler<TransactionData>,
468 OnEntry: Handler<EntryData>,
469 OnRewards: Handler<RewardsData>,
470 OnStats: Handler<Stats>,
471{
472 if threads == 0 {
473 return Err((
474 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
475 slot_range.start,
476 ));
477 }
478 let client = Client::new();
479 log::info!(target: LOG_MODULE, "starting firehose...");
480 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
481
482 let slot_range = Arc::new(slot_range);
483
484 let subranges = generate_subranges(&slot_range, threads);
486 if threads > 1 {
487 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
488 }
489
490 let firehose_start = std::time::Instant::now();
491 let shutdown_flag = Arc::new(AtomicBool::new(false));
492 if let Some(ref rx) = shutdown_signal {
493 let mut rx = rx.resubscribe();
494 let flag = shutdown_flag.clone();
495 tokio::spawn(async move {
496 if rx.recv().await.is_ok() {
497 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
498 flag.store(true, Ordering::SeqCst);
499 }
500 });
501 }
502 let mut handles = Vec::new();
503 let error_counts: Arc<Vec<AtomicU32>> =
505 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
506
507 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
508 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
509 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
510 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
511
512 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
513 let error_counts = error_counts.clone();
514 let client = client.clone();
515 let on_block = on_block.clone();
516 let on_tx = on_tx.clone();
517 let on_entry = on_entry.clone();
518 let on_reward = on_rewards.clone();
519 let overall_slots_processed = overall_slots_processed.clone();
520 let overall_blocks_processed = overall_blocks_processed.clone();
521 let overall_transactions_processed = overall_transactions_processed.clone();
522 let overall_entries_processed = overall_entries_processed.clone();
523 let stats_tracking = stats_tracking.clone();
524 let transactions_since_stats = Arc::new(AtomicU64::new(0));
525 let blocks_since_stats = Arc::new(AtomicU64::new(0));
526 let slots_since_stats = Arc::new(AtomicU64::new(0));
527 let last_pulse = Arc::new(AtomicU64::new(0));
528 let transactions_since_stats_cloned = transactions_since_stats.clone();
529 let blocks_since_stats_cloned = blocks_since_stats.clone();
530 let slots_since_stats_cloned = slots_since_stats.clone();
531 let last_pulse_cloned = last_pulse.clone();
532 let shutdown_flag = shutdown_flag.clone();
533 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
534
535 let handle = tokio::spawn(async move {
536 let transactions_since_stats = transactions_since_stats_cloned;
537 let blocks_since_stats = blocks_since_stats_cloned;
538 let slots_since_stats = slots_since_stats_cloned;
539 let last_pulse = last_pulse_cloned;
540 let mut shutdown_rx = thread_shutdown_rx;
541 let start_time = std::time::Instant::now();
542 last_pulse.store(0, Ordering::Relaxed);
543 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
544 let mut skip_until_index = None;
545 let block_enabled = on_block.is_some();
546 let tx_enabled = on_tx.is_some();
547 let entry_enabled = on_entry.is_some();
548 let reward_enabled = on_reward.is_some();
549 let tracking_enabled = stats_tracking.is_some();
550 let mut last_counted_slot = slot_range.start.saturating_sub(1);
551
552 while let Err((err, slot)) = async {
554 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
555 log::info!(
556 target: &log_target,
557 "shutdown requested; terminating firehose thread {}",
558 thread_index
559 );
560 return Ok(());
561 }
562 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
563 log::info!(
564 target: &log_target,
565 "slot range: {} (epoch {}) ... {} (epoch {})",
566 slot_range.start,
567 slot_to_epoch(slot_range.start),
568 slot_range.end,
569 slot_to_epoch(slot_range.end)
570 );
571
572 log::info!(target: &log_target, "🚒 starting firehose...");
573
574 let mut current_slot: Option<u64> = None;
576 let mut previous_slot: Option<u64> = Some(slot_range.start.saturating_sub(1));
577 for epoch_num in epoch_range.clone() {
578 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
579 log::info!(
580 target: &log_target,
581 "shutdown requested; terminating firehose thread {}",
582 thread_index
583 );
584 return Ok(());
585 }
586 log::info!(target: &log_target, "entering epoch {}", epoch_num);
587 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
588 Ok(stream) => stream,
589 Err(_) => {
590 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
591 }
592 };
593 let mut reader = NodeReader::new(stream);
594
595 let header_fut = reader.read_raw_header();
596 let header = match timeout(OP_TIMEOUT, header_fut).await {
597 Ok(res) => res
598 .map_err(FirehoseError::ReadHeader)
599 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
600 Err(_) => {
601 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
602 }
603 };
604 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
605
606 let mut previous_blockhash = Hash::default();
607 let mut latest_entry_blockhash = Hash::default();
608
609 let mut thread_stats = if tracking_enabled {
610 Some(ThreadStats {
611 thread_id: thread_index,
612 start_time,
613 finish_time: None,
614 slot_range: slot_range.clone(),
615 current_slot: slot_range.start,
616 slots_processed: 0,
617 blocks_processed: 0,
618 leader_skipped_slots: 0,
619 transactions_processed: 0,
620 entries_processed: 0,
621 rewards_processed: 0,
622 })
623 } else {
624 None
625 };
626
627 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
628 let seek_slot = slot_range.start.saturating_sub(1);
631 let seek_fut = reader.seek_to_slot(seek_slot);
632 match timeout(OP_TIMEOUT, seek_fut).await {
633 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
634 Err(_) => {
635 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
636 }
637 }
638 }
639
640 let mut item_index = 0;
642 let mut displayed_skip_message = false;
643 loop {
644 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
645 log::info!(
646 target: &log_target,
647 "shutdown requested; terminating firehose thread {}",
648 thread_index
649 );
650 return Ok(());
651 }
652 let read_fut = reader.read_until_block();
653 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
654 Ok(result) => result
655 .map_err(FirehoseError::ReadUntilBlockError)
656 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
657 Err(_) => {
658 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
659 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
660 }
661 };
662 if nodes.is_empty() {
663 log::info!(
664 target: &log_target,
665 "reached end of epoch {}",
666 epoch_num
667 );
668 break;
669 }
670 if let Some(last_node) = nodes.0.last()
671 && !last_node.get_node().is_block()
672 {
673 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
674 break;
675 }
676 let block = nodes
677 .get_block()
678 .map_err(FirehoseError::GetBlockError)
679 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
680 log::debug!(
681 target: &log_target,
682 "read {} items from epoch {}, now at slot {}",
683 item_index,
684 epoch_num,
685 block.slot
686 );
687 let slot = block.slot;
688 if slot >= slot_range.end {
689 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
690 let target_last_slot = slot_range.end.saturating_sub(1);
695 let skip_start_from_previous = previous_slot
696 .map(|s| s.saturating_add(1))
697 .unwrap_or(slot_range.start);
698 let first_untracked_slot = last_counted_slot.saturating_add(1);
699 let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
700
701 if skip_start <= target_last_slot {
702 if block_enabled
703 && let Some(on_block_cb) = on_block.as_ref() {
704 for skipped_slot in skip_start..=target_last_slot {
705 log::debug!(
706 target: &log_target,
707 "leader skipped slot {} (prev_counted {}, target {})",
708 skipped_slot,
709 last_counted_slot,
710 target_last_slot,
711 );
712 on_block_cb(
713 thread_index,
714 BlockData::LeaderSkipped { slot: skipped_slot },
715 )
716 .await
717 .map_err(|e| FirehoseError::BlockHandlerError(e))
718 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
719 }
720 }
721
722 let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
723 if tracking_enabled {
724 if let Some(ref mut stats) = thread_stats {
725 stats.leader_skipped_slots += missing_slots;
726 stats.slots_processed += missing_slots;
727 stats.current_slot = target_last_slot;
728 }
729 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
730 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
731 }
732 last_counted_slot = target_last_slot;
733 }
734
735 return Ok(());
736 }
737 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
738 if slot < slot_range.start {
739 if slot.saturating_add(1) == slot_range.start {
740 log::debug!(
741 target: &log_target,
742 "priming reader with preceding slot {}, skipping",
743 slot
744 );
745 } else {
746 log::warn!(
747 target: &log_target,
748 "encountered slot {} before start of range {}, skipping",
749 slot,
750 slot_range.start
751 );
752 }
753 continue;
754 }
755 if current_slot.is_some() {
756 previous_slot = current_slot;
757 }
758 current_slot = Some(slot);
759 let mut entry_index: usize = 0;
760 let mut this_block_executed_transaction_count: u64 = 0;
761 let mut this_block_entry_count: u64 = 0;
762 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
763
764 for node_with_cid in &nodes.0 {
765 item_index += 1;
766 if let Some(skip) = skip_until_index {
767 if item_index < skip {
768 if !displayed_skip_message {
769 log::info!(
770 target: &log_target,
771 "skipping until index {} (at {})",
772 skip,
773 item_index
774 );
775 displayed_skip_message = true;
776 }
777 continue;
778 } else {
779 log::info!(
780 target: &log_target,
781 "reached target index {}, resuming...",
782 skip
783 );
784 skip_until_index = None;
785 }
786 }
787 let node = node_with_cid.get_node();
788
789 if let Some(ref mut stats) = thread_stats {
790 stats.current_slot = slot;
791 }
792
793 let error_slot = current_slot.unwrap_or(slot_range.start);
794
795 use crate::node::Node::*;
796 match node {
797 Transaction(tx) => {
798 if tx_enabled
799 && let Some(on_tx_cb) = on_tx.as_ref()
800 {
801 let error_slot = current_slot.unwrap_or(slot_range.start);
802 let versioned_tx = tx.as_parsed().map_err(|err| {
803 (
804 FirehoseError::NodeDecodingError(item_index, err),
805 error_slot,
806 )
807 })?;
808 let reassembled_metadata = nodes
809 .reassemble_dataframes(tx.metadata.clone())
810 .map_err(|err| {
811 (
812 FirehoseError::NodeDecodingError(item_index, err),
813 error_slot,
814 )
815 })?;
816
817 let decompressed =
818 utils::decompress_zstd(reassembled_metadata.clone())
819 .map_err(|err| {
820 (
821 FirehoseError::NodeDecodingError(
822 item_index,
823 err,
824 ),
825 error_slot,
826 )
827 })?;
828
829 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
830 prost_011::Message::decode(decompressed.as_slice())
831 .map_err(|err| {
832 (
833 FirehoseError::NodeDecodingError(
834 item_index,
835 Box::new(err),
836 ),
837 error_slot,
838 )
839 })?;
840
841 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
842 metadata.try_into().map_err(|err| {
843 (
844 FirehoseError::NodeDecodingError(
845 item_index,
846 Box::new(err),
847 ),
848 error_slot,
849 )
850 })?;
851
852 let message_hash = {
853 #[cfg(feature = "verify-transaction-signatures")]
854 {
855 versioned_tx.verify_and_hash_message().map_err(|err| {
856 (
857 FirehoseError::TransactionHandlerError(Box::new(err)),
858 error_slot,
859 )
860 })?
861 }
862 #[cfg(not(feature = "verify-transaction-signatures"))]
863 {
864 versioned_tx.message.hash()
865 }
866 };
867 let signature = versioned_tx
868 .signatures
869 .first()
870 .ok_or_else(|| {
871 Box::new(std::io::Error::new(
872 std::io::ErrorKind::InvalidData,
873 "transaction missing signature",
874 )) as Box<dyn std::error::Error>
875 })
876 .map_err(|err| {
877 (
878 FirehoseError::NodeDecodingError(
879 item_index,
880 err,
881 ),
882 error_slot,
883 )
884 })?;
885 let is_vote = is_simple_vote_transaction(&versioned_tx);
886
887 on_tx_cb(
888 thread_index,
889 TransactionData {
890 slot: block.slot,
891 transaction_slot_index: tx.index.unwrap() as usize,
892 signature: *signature,
893 message_hash,
894 is_vote,
895 transaction_status_meta: as_native_metadata.clone(),
896 transaction: versioned_tx.clone(),
897 },
898 )
899 .await
900 .map_err(|e| {
901 (
902 FirehoseError::TransactionHandlerError(e),
903 error_slot,
904 )
905 })?;
906 }
907 fetch_add_if(
908 tracking_enabled,
909 &overall_transactions_processed,
910 1,
911 );
912 if let Some(ref mut stats) = thread_stats {
913 stats.transactions_processed += 1;
914 }
915 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
916 }
917 Entry(entry) => {
918 let entry_hash = Hash::from(entry.hash.to_bytes());
919 let entry_transaction_count = entry.transactions.len();
920 let entry_transaction_count_u64 = entry_transaction_count as u64;
921 let starting_transaction_index_u64 =
922 this_block_executed_transaction_count;
923 latest_entry_blockhash = entry_hash;
924 this_block_executed_transaction_count += entry_transaction_count_u64;
925 this_block_entry_count += 1;
926
927 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
928 let starting_transaction_index = usize::try_from(
929 starting_transaction_index_u64,
930 )
931 .map_err(|err| {
932 (
933 FirehoseError::EntryHandlerError(Box::new(err)),
934 error_slot,
935 )
936 })?;
937 let transaction_indexes_end =
938 starting_transaction_index + entry_transaction_count;
939 on_entry_cb(
940 thread_index,
941 EntryData {
942 slot: block.slot,
943 entry_index,
944 transaction_indexes: starting_transaction_index
945 ..transaction_indexes_end,
946 num_hashes: entry.num_hashes,
947 hash: entry_hash,
948 },
949 )
950 .await
951 .map_err(|e| {
952 (
953 FirehoseError::EntryHandlerError(e),
954 error_slot,
955 )
956 })?;
957 }
958 entry_index += 1;
959 fetch_add_if(
960 tracking_enabled,
961 &overall_entries_processed,
962 1,
963 );
964 if let Some(ref mut stats) = thread_stats {
965 stats.entries_processed += 1;
966 }
967 }
968 Block(block) => {
969 let prev_last_counted_slot = last_counted_slot;
970 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
971 (
972 stats.slots_processed,
973 stats.blocks_processed,
974 stats.leader_skipped_slots,
975 stats.current_slot,
976 )
977 });
978
979 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
980 let skip_start_from_previous = previous_slot
981 .map(|s| s.saturating_add(1))
982 .unwrap_or(next_expected_slot);
983 let skip_start = skip_start_from_previous.max(next_expected_slot);
984
985 for skipped_slot in skip_start..slot {
986 log::debug!(
987 target: &log_target,
988 "leader skipped slot {} (prev_counted {}, current slot {})",
989 skipped_slot,
990 prev_last_counted_slot,
991 slot,
992 );
993 if block_enabled
994 && let Some(on_block_cb) = on_block.as_ref() {
995 on_block_cb(
996 thread_index,
997 BlockData::LeaderSkipped {
998 slot: skipped_slot,
999 },
1000 )
1001 .await
1002 .map_err(|e| {
1003 (
1004 FirehoseError::BlockHandlerError(e),
1005 error_slot,
1006 )
1007 })?;
1008 }
1009 if tracking_enabled {
1010 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1011 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1012 if let Some(ref mut stats) = thread_stats {
1013 stats.leader_skipped_slots += 1;
1014 stats.slots_processed += 1;
1015 stats.current_slot = skipped_slot;
1016 }
1017 }
1018 last_counted_slot = skipped_slot;
1019 }
1020
1021 if slot <= last_counted_slot {
1022 log::debug!(
1023 target: &log_target,
1024 "duplicate block {}, already counted (last_counted={})",
1025 slot,
1026 last_counted_slot,
1027 );
1028 this_block_rewards.clear();
1029 continue;
1030 }
1031
1032 if block_enabled {
1033 if let Some(on_block_cb) = on_block.as_ref() {
1034 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1035 on_block_cb(
1036 thread_index,
1037 BlockData::Block {
1038 parent_slot: block.meta.parent_slot,
1039 parent_blockhash: previous_blockhash,
1040 slot: block.slot,
1041 blockhash: latest_entry_blockhash,
1042 rewards: KeyedRewardsAndNumPartitions {
1043 keyed_rewards,
1044 num_partitions: None,
1045 },
1046 block_time: Some(block.meta.blocktime as i64),
1047 block_height: block.meta.block_height,
1048 executed_transaction_count:
1049 this_block_executed_transaction_count,
1050 entry_count: this_block_entry_count,
1051 },
1052 )
1053 .await
1054 .map_err(|e| {
1055 (
1056 FirehoseError::BlockHandlerError(e),
1057 error_slot,
1058 )
1059 })?;
1060 }
1061 } else {
1062 this_block_rewards.clear();
1063 }
1064 previous_blockhash = latest_entry_blockhash;
1065
1066 if tracking_enabled {
1067 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1068 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1069 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1070 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1071 if let Some(ref mut stats) = thread_stats {
1072 stats.blocks_processed += 1;
1073 stats.slots_processed += 1;
1074 stats.current_slot = slot;
1075 }
1076
1077 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1078 (&stats_tracking, thread_stats.as_mut())
1079 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1080 && let Err(err) = maybe_emit_stats(
1081 stats_tracking.as_ref(),
1082 thread_index,
1083 thread_stats_ref,
1084 &overall_slots_processed,
1085 &overall_blocks_processed,
1086 &overall_transactions_processed,
1087 &overall_entries_processed,
1088 &transactions_since_stats,
1089 &blocks_since_stats,
1090 &slots_since_stats,
1091 &last_pulse,
1092 start_time,
1093 )
1094 .await
1095 {
1096 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1097 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1098 overall_blocks_processed
1099 .fetch_sub(1, Ordering::Relaxed);
1100 overall_slots_processed
1101 .fetch_sub(1, Ordering::Relaxed);
1102 if let Some((
1103 prev_slots_processed,
1104 prev_blocks_processed,
1105 prev_leader_skipped,
1106 prev_current_slot,
1107 )) = thread_stats_snapshot
1108 {
1109 thread_stats_ref.slots_processed =
1110 prev_slots_processed;
1111 thread_stats_ref.blocks_processed =
1112 prev_blocks_processed;
1113 thread_stats_ref.leader_skipped_slots =
1114 prev_leader_skipped;
1115 thread_stats_ref.current_slot =
1116 prev_current_slot;
1117 }
1118 last_counted_slot = prev_last_counted_slot;
1119 return Err(err);
1120 }
1121 }
1122
1123 last_counted_slot = slot;
1124 }
1125 Subset(_subset) => (),
1126 Epoch(_epoch) => (),
1127 Rewards(rewards) => {
1128 if reward_enabled || block_enabled {
1129 let reassembled = nodes
1130 .reassemble_dataframes(rewards.data.clone())
1131 .map_err(|err| {
1132 (
1133 FirehoseError::NodeDecodingError(item_index, err),
1134 current_slot.unwrap_or(slot_range.start),
1135 )
1136 })?;
1137 if reassembled.is_empty() {
1138 this_block_rewards.clear();
1139 if reward_enabled
1140 && let Some(on_reward_cb) = on_reward.as_ref()
1141 {
1142 on_reward_cb(
1143 thread_index,
1144 RewardsData {
1145 slot: block.slot,
1146 rewards: Vec::new(),
1147 },
1148 )
1149 .await
1150 .map_err(|e| {
1151 (
1152 FirehoseError::RewardHandlerError(e),
1153 error_slot,
1154 )
1155 })?;
1156 }
1157 continue;
1158 }
1159
1160 let decompressed = utils::decompress_zstd(reassembled)
1161 .map_err(|err| {
1162 (
1163 FirehoseError::NodeDecodingError(
1164 item_index,
1165 err,
1166 ),
1167 error_slot,
1168 )
1169 })?;
1170
1171 let decoded =
1172 prost_011::Message::decode(decompressed.as_slice())
1173 .map_err(|err| {
1174 (
1175 FirehoseError::NodeDecodingError(
1176 item_index,
1177 Box::new(err),
1178 ),
1179 error_slot,
1180 )
1181 })?;
1182 let keyed_rewards = convert_proto_rewards(&decoded)
1183 .map_err(|err| {
1184 (
1185 FirehoseError::NodeDecodingError(item_index, err),
1186 error_slot,
1187 )
1188 })?;
1189 if reward_enabled
1190 && let Some(on_reward_cb) = on_reward.as_ref()
1191 {
1192 on_reward_cb(
1193 thread_index,
1194 RewardsData {
1195 slot: block.slot,
1196 rewards: keyed_rewards.clone(),
1197 },
1198 )
1199 .await
1200 .map_err(|e| {
1201 (
1202 FirehoseError::RewardHandlerError(e),
1203 error_slot,
1204 )
1205 })?;
1206 }
1207 this_block_rewards = keyed_rewards;
1208 if let Some(ref mut stats) = thread_stats {
1209 stats.rewards_processed +=
1210 this_block_rewards.len() as u64;
1211 }
1212 }
1213 }
1214 DataFrame(_data_frame) => (),
1215 }
1216 }
1217 if block.slot == slot_range.end - 1 {
1218 let finish_time = std::time::Instant::now();
1219 let elapsed = finish_time.duration_since(start_time);
1220 log::info!(target: &log_target, "processed slot {}", block.slot);
1221 let elapsed_pretty = human_readable_duration(elapsed);
1222 log::info!(
1223 target: &log_target,
1224 "processed {} slots across {} epochs in {}.",
1225 slot_range.end - slot_range.start,
1226 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1227 elapsed_pretty
1228 );
1229 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1230 let summary: String = error_counts
1233 .iter()
1234 .enumerate()
1235 .filter_map(|(i, c)| {
1236 let v = c.load(Ordering::Relaxed);
1237 if v > 0 {
1238 Some(format!("{:03}({})", i, v))
1239 } else {
1240 None
1241 }
1242 })
1243 .collect::<Vec<_>>()
1244 .join(", ");
1245 if !summary.is_empty() {
1246 log::debug!(target: &log_target, "threads with errors: {}", summary);
1247 }
1248 return Ok(());
1249 }
1250 }
1251 if tracking_enabled
1252 && let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1253 && last_counted_slot < expected_last_slot {
1254 let flush_start = last_counted_slot.saturating_add(1);
1255 if block_enabled
1256 && let Some(on_block_cb) = on_block.as_ref() {
1257 let error_slot = current_slot.unwrap_or(slot_range.start);
1258 for skipped_slot in flush_start..=expected_last_slot {
1259 log::debug!(
1260 target: &log_target,
1261 "leader skipped slot {} during final flush (prev_counted {})",
1262 skipped_slot,
1263 last_counted_slot,
1264 );
1265 on_block_cb(
1266 thread_index,
1267 BlockData::LeaderSkipped { slot: skipped_slot },
1268 )
1269 .await
1270 .map_err(|e| FirehoseError::BlockHandlerError(e))
1271 .map_err(|e| (e, error_slot))?;
1272 }
1273 }
1274 let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1275 if let Some(stats_ref) = thread_stats.as_mut() {
1276 stats_ref.leader_skipped_slots += missing_slots;
1277 stats_ref.slots_processed += missing_slots;
1278 stats_ref.current_slot = expected_last_slot;
1279 }
1280 overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1281 slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1282 last_counted_slot = expected_last_slot;
1283 }
1284 if let Some(ref mut stats) = thread_stats {
1285 stats.finish_time = Some(std::time::Instant::now());
1286 maybe_emit_stats(
1287 stats_tracking.as_ref(),
1288 thread_index,
1289 stats,
1290 &overall_slots_processed,
1291 &overall_blocks_processed,
1292 &overall_transactions_processed,
1293 &overall_entries_processed,
1294 &transactions_since_stats,
1295 &blocks_since_stats,
1296 &slots_since_stats,
1297 &last_pulse,
1298 start_time,
1299 )
1300 .await?;
1301 }
1302 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1303 }
1304 Ok(())
1305 }
1306 .await
1307 {
1308 if is_shutdown_error(&err) {
1309 log::info!(
1310 target: &log_target,
1311 "shutdown requested; terminating firehose thread {}",
1312 thread_index
1313 );
1314 break;
1315 }
1316 log::error!(
1317 target: &log_target,
1318 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1319 slot,
1320 slot_to_epoch(slot)
1321 );
1322 log::error!(target: &log_target, "{}", err);
1323 let item_index = match err {
1324 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1325 _ => 0,
1326 };
1327 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1329 log::warn!(
1330 target: &log_target,
1331 "restarting from slot {} at index {}",
1332 slot,
1333 item_index,
1334 );
1335 slot_range.start = slot;
1337 skip_until_index = Some(item_index);
1338 }
1339 });
1340 handles.push(handle);
1341 }
1342
1343 for handle in handles {
1345 handle.await.unwrap();
1346 }
1347 if stats_tracking.is_some() {
1348 let elapsed = firehose_start.elapsed();
1349 let elapsed_secs = elapsed.as_secs_f64();
1350 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1351 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1352 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1353 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1354 let total_errors: u64 = error_counts
1355 .iter()
1356 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1357 .sum();
1358 let overall_tps = if elapsed_secs > 0.0 {
1359 total_transactions as f64 / elapsed_secs
1360 } else {
1361 0.0
1362 };
1363 log::info!(
1364 target: LOG_MODULE,
1365 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1366 elapsed_secs,
1367 total_slots,
1368 total_blocks,
1369 total_leader_skipped,
1370 total_transactions,
1371 overall_tps,
1372 total_errors
1373 );
1374 }
1375 if shutdown_flag.load(Ordering::SeqCst) {
1376 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1377 } else {
1378 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1379 }
1380 Ok(())
1381}
1382
1383#[allow(clippy::result_large_err)]
1384pub fn firehose_geyser(
1389 rt: Arc<tokio::runtime::Runtime>,
1390 slot_range: Range<u64>,
1391 geyser_config_files: Option<&[PathBuf]>,
1392 index_base_url: &Url,
1393 client: &Client,
1394 on_load: impl Future<Output = Result<(), Box<dyn std::error::Error + Send + 'static>>>
1395 + Send
1396 + 'static,
1397 threads: u64,
1398) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1399 if threads == 0 {
1400 return Err((
1401 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1402 slot_range.start,
1403 ));
1404 }
1405 log::info!(target: LOG_MODULE, "starting firehose...");
1406 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1407 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1408 let mut entry_notifier_maybe = None;
1409 let mut block_meta_notifier_maybe = None;
1410 let mut transaction_notifier_maybe = None;
1411 if let Some(geyser_config_files) = geyser_config_files {
1412 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1413
1414 let service =
1415 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1416 confirmed_bank_receiver.clone(),
1417 true,
1418 geyser_config_files,
1419 )
1420 .map_err(|e| (e.into(), slot_range.start))?;
1421
1422 transaction_notifier_maybe = Some(
1423 service
1424 .get_transaction_notifier()
1425 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1426 .map_err(|e| (e, slot_range.start))?,
1427 );
1428
1429 entry_notifier_maybe = service.get_entry_notifier();
1430 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1431
1432 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1433 }
1434
1435 if entry_notifier_maybe.is_some() {
1436 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1437 } else {
1438 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1439 }
1440 log::info!(target: LOG_MODULE, "running on_load...");
1441 rt.spawn(on_load);
1442
1443 let slot_range = Arc::new(slot_range);
1444 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1445 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1446 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1447 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1448
1449 let subranges = generate_subranges(&slot_range, threads);
1451 if threads > 1 {
1452 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1453 }
1454
1455 let mut handles = Vec::new();
1456 let error_counts: Arc<Vec<AtomicU32>> =
1458 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1459
1460 for (i, slot_range) in subranges.into_iter().enumerate() {
1461 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1462 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1463 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1464 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1465 let client = client.clone();
1466 let error_counts = error_counts.clone();
1467
1468 let rt_clone = rt.clone();
1469
1470 let handle = std::thread::spawn(move || {
1471 rt_clone.block_on(async {
1472 firehose_geyser_thread(
1473 slot_range,
1474 transaction_notifier_maybe,
1475 entry_notifier_maybe,
1476 block_meta_notifier_maybe,
1477 confirmed_bank_sender,
1478 &client,
1479 if threads > 1 { Some(i) } else { None },
1480 error_counts,
1481 )
1482 .await
1483 .unwrap();
1484 });
1485 });
1486 handles.push(handle);
1487 }
1488
1489 for handle in handles {
1491 handle.join().unwrap();
1492 }
1493 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1494 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1495 block_meta_notifier.notify_block_metadata(
1496 u64::MAX,
1497 "unload",
1498 u64::MAX,
1499 "unload",
1500 &KeyedRewardsAndNumPartitions {
1501 keyed_rewards: vec![],
1502 num_partitions: None,
1503 },
1504 None,
1505 None,
1506 0,
1507 0,
1508 );
1509 }
1510 Ok(confirmed_bank_receiver)
1511}
1512
1513#[allow(clippy::too_many_arguments)]
1514#[allow(clippy::result_large_err)]
1515async fn firehose_geyser_thread(
1516 mut slot_range: Range<u64>,
1517 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1518 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1519 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1520 confirmed_bank_sender: Sender<SlotNotification>,
1521 client: &Client,
1522 thread_index: Option<usize>,
1523 error_counts: Arc<Vec<AtomicU32>>,
1524) -> Result<(), (FirehoseError, u64)> {
1525 let start_time = std::time::Instant::now();
1526 let log_target = if let Some(thread_index) = thread_index {
1527 format!("{}::T{:03}", LOG_MODULE, thread_index)
1528 } else {
1529 LOG_MODULE.to_string()
1530 };
1531 let mut skip_until_index = None;
1532 while let Err((err, slot)) = async {
1534 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1535 log::info!(
1536 target: &log_target,
1537 "slot range: {} (epoch {}) ... {} (epoch {})",
1538 slot_range.start,
1539 slot_to_epoch(slot_range.start),
1540 slot_range.end,
1541 slot_to_epoch(slot_range.end)
1542 );
1543
1544 log::info!(target: &log_target, "🚒 starting firehose...");
1545
1546 let mut current_slot: Option<u64> = None;
1548 let mut previous_slot: Option<u64> = None;
1549 for epoch_num in epoch_range.clone() {
1550 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1551 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1552 Ok(stream) => stream,
1553 Err(_) => {
1554 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1555 }
1556 };
1557 let mut reader = NodeReader::new(stream);
1558
1559 let header_fut = reader.read_raw_header();
1560 let header = match timeout(OP_TIMEOUT, header_fut).await {
1561 Ok(res) => res
1562 .map_err(FirehoseError::ReadHeader)
1563 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1564 Err(_) => {
1565 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1566 }
1567 };
1568 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1569
1570 let mut todo_previous_blockhash = Hash::default();
1571 let mut todo_latest_entry_blockhash = Hash::default();
1572
1573 if slot_range.start > epoch_to_slot_range(epoch_num).0 {
1574 let seek_slot = slot_range.start.saturating_sub(1);
1578 let seek_fut = reader.seek_to_slot(seek_slot);
1579 match timeout(OP_TIMEOUT, seek_fut).await {
1580 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1581 Err(_) => {
1582 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1583 }
1584 }
1585 }
1586
1587 let mut item_index = 0;
1589 let mut displayed_skip_message = false;
1590 loop {
1591 let read_fut = reader.read_until_block();
1592 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1593 Ok(result) => result
1594 .map_err(FirehoseError::ReadUntilBlockError)
1595 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1596 Err(_) => {
1597 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1598 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.unwrap_or(slot_range.start)));
1599 }
1600 };
1601 if nodes.is_empty() {
1602 log::info!(
1603 target: &log_target,
1604 "reached end of epoch {}",
1605 epoch_num
1606 );
1607 break;
1608 }
1609 if let Some(last_node) = nodes.0.last()
1618 && !last_node.get_node().is_block() {
1619 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1620 break;
1621 }
1622 let block = nodes
1623 .get_block()
1624 .map_err(FirehoseError::GetBlockError)
1625 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1626 log::debug!(
1627 target: &log_target,
1628 "read {} items from epoch {}, now at slot {}",
1629 item_index,
1630 epoch_num,
1631 block.slot
1632 );
1633 let slot = block.slot;
1634 if slot >= slot_range.end {
1635 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1636 return Ok(());
1640 }
1641 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1642 if slot < slot_range.start {
1643 if slot.saturating_add(1) == slot_range.start {
1644 log::debug!(
1645 target: &log_target,
1646 "priming reader with preceding slot {}, skipping",
1647 slot
1648 );
1649 } else {
1650 log::warn!(
1651 target: &log_target,
1652 "encountered slot {} before start of range {}, skipping",
1653 slot,
1654 slot_range.start
1655 );
1656 }
1657 continue;
1658 }
1659 if let Some(previous_slot) = previous_slot
1660 && slot != previous_slot + 1 {
1661 }
1664 previous_slot = current_slot;
1665 current_slot = Some(slot);
1666 let mut entry_index: usize = 0;
1667 let mut this_block_executed_transaction_count: u64 = 0;
1668 let mut this_block_entry_count: u64 = 0;
1669 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1670
1671 nodes.each(|node_with_cid| -> Result<(), Box<dyn std::error::Error>> {
1672 item_index += 1;
1673 if let Some(skip) = skip_until_index {
1679 if item_index < skip {
1680 if !displayed_skip_message {
1681 log::info!(
1682 target: &log_target,
1683 "skipping until index {} (at {})",
1684 skip,
1685 item_index
1686 );
1687 displayed_skip_message = true;
1688 }
1689 return Ok(());
1690 } else {
1691 log::info!(
1692 target: &log_target,
1693 "reached target index {}, resuming...",
1694 skip
1695 );
1696 skip_until_index = None;
1697 }
1698 }
1699 let node = node_with_cid.get_node();
1700
1701 use crate::node::Node::*;
1702 match node {
1703 Transaction(tx) => {
1704 let versioned_tx = tx.as_parsed()?;
1705 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1706
1707 let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1708
1709 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1710 prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1711 Box::new(std::io::Error::other(
1712 std::format!("Error decoding metadata: {:?}", err),
1713 ))
1714 })?;
1715
1716 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1717 metadata.try_into()?;
1718
1719 let message_hash = {
1720 #[cfg(feature = "verify-transaction-signatures")]
1721 {
1722 versioned_tx.verify_and_hash_message()?
1723 }
1724 #[cfg(not(feature = "verify-transaction-signatures"))]
1725 {
1726 versioned_tx.message.hash()
1729 }
1730 };
1731 let signature = versioned_tx
1732 .signatures
1733 .first()
1734 .ok_or_else(|| {
1735 Box::new(std::io::Error::new(
1736 std::io::ErrorKind::InvalidData,
1737 "transaction missing signature",
1738 )) as Box<dyn std::error::Error>
1739 })?;
1740 let is_vote = is_simple_vote_transaction(&versioned_tx);
1741
1742 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1743 transaction_notifier.notify_transaction(
1744 block.slot,
1745 tx.index.unwrap() as usize,
1746 signature,
1747 &message_hash,
1748 is_vote,
1749 &as_native_metadata,
1750 &versioned_tx,
1751 );
1752 }
1753
1754 }
1755 Entry(entry) => {
1756 let entry_hash = Hash::from(entry.hash.to_bytes());
1757 let entry_transaction_count = entry.transactions.len();
1758 let entry_transaction_count_u64 = entry_transaction_count as u64;
1759 let starting_transaction_index =
1760 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1761 Box::new(std::io::Error::other(
1762 "transaction index exceeds usize range",
1763 )) as Box<dyn std::error::Error>
1764 })?;
1765 todo_latest_entry_blockhash = entry_hash;
1766 this_block_executed_transaction_count += entry_transaction_count_u64;
1767 this_block_entry_count += 1;
1768 if entry_notifier_maybe.is_none() {
1769 return Ok(());
1770 }
1771 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1772 let entry_summary = solana_entry::entry::EntrySummary {
1773 num_hashes: entry.num_hashes,
1774 hash: Hash::from(entry.hash.to_bytes()),
1775 num_transactions: entry_transaction_count_u64,
1776 };
1777 entry_notifier.notify_entry(
1778 block.slot,
1779 entry_index,
1780 &entry_summary,
1781 starting_transaction_index,
1782 );
1783 entry_index += 1;
1784 }
1785 Block(block) => {
1786 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1787 confirmed_bank_sender.send(notification).unwrap();
1788
1789 if block_meta_notifier_maybe.is_none() {
1790 return Ok(());
1791 }
1792 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1793 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1794 block_meta_notifier.notify_block_metadata(
1795 block.meta.parent_slot,
1796 todo_previous_blockhash.to_string().as_str(),
1797 block.slot,
1798 todo_latest_entry_blockhash.to_string().as_str(),
1799 &KeyedRewardsAndNumPartitions {
1800 keyed_rewards,
1801 num_partitions: None,
1802 },
1803 Some(block.meta.blocktime as i64),
1804 block.meta.block_height,
1805 this_block_executed_transaction_count,
1806 this_block_entry_count,
1807 );
1808 todo_previous_blockhash = todo_latest_entry_blockhash;
1809 std::thread::yield_now();
1810 }
1811 Subset(_subset) => (),
1812 Epoch(_epoch) => (),
1813 Rewards(rewards) => {
1814 if !rewards.is_complete() {
1815 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1816 let decompressed = utils::decompress_zstd(reassembled)?;
1817 let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1818 Box::new(std::io::Error::other(
1819 std::format!("Error decoding rewards: {:?}", err),
1820 ))
1821 })?;
1822 this_block_rewards = convert_proto_rewards(&decoded)?;
1823 }
1824 }
1825 DataFrame(_data_frame) => (),
1826 }
1827 Ok(())
1828 })
1829 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1830 if block.slot == slot_range.end - 1 {
1831 let finish_time = std::time::Instant::now();
1832 let elapsed = finish_time.duration_since(start_time);
1833 log::info!(target: &log_target, "processed slot {}", block.slot);
1834 let elapsed_pretty = human_readable_duration(elapsed);
1835 log::info!(
1836 target: &log_target,
1837 "processed {} slots across {} epochs in {}.",
1838 slot_range.end - slot_range.start,
1839 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1840 elapsed_pretty
1841 );
1842 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1843 let summary: String = error_counts
1846 .iter()
1847 .enumerate()
1848 .filter_map(|(i, c)| {
1849 let v = c.load(Ordering::Relaxed);
1850 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1851 })
1852 .collect::<Vec<_>>()
1853 .join(", ");
1854 if !summary.is_empty() {
1855 log::debug!(target: &log_target, "threads with errors: {}", summary);
1856 }
1857 return Ok(());
1858 }
1859 }
1860 }
1861 Ok(())
1862}
1863.await
1864{
1865 if is_shutdown_error(&err) {
1866 log::info!(
1867 target: &log_target,
1868 "shutdown requested; terminating firehose thread {:?}",
1869 thread_index
1870 );
1871 return Ok(());
1872 }
1873 log::error!(
1874 target: &log_target,
1875 "🔥🔥🔥 firehose encountered an error at slot {} in epoch {}:",
1876 slot,
1877 slot_to_epoch(slot)
1878 );
1879 log::error!(target: &log_target, "{}", err);
1880 let item_index = match err {
1881 FirehoseError::NodeDecodingError(item_index, _) => item_index,
1882 _ => 0,
1883 };
1884 let idx = thread_index.unwrap_or(0);
1886 error_counts[idx].fetch_add(1, Ordering::Relaxed);
1887 log::warn!(
1888 target: &log_target,
1889 "restarting from slot {} at index {}",
1890 slot,
1891 item_index,
1892 );
1893 slot_range.start = slot;
1895 skip_until_index = Some(item_index);
1896 }
1897 Ok(())
1898}
1899
1900#[inline]
1901fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
1902 if !(1..=2).contains(&versioned_tx.signatures.len()) {
1903 return false;
1904 }
1905
1906 if !matches!(
1907 versioned_tx.version(),
1908 solana_transaction::versioned::TransactionVersion::Legacy(_)
1909 ) {
1910 return false;
1911 }
1912
1913 let instructions = versioned_tx.message.instructions();
1914 if instructions.len() != 1 {
1915 return false;
1916 }
1917
1918 let program_index = instructions[0].program_id_index as usize;
1919 versioned_tx
1920 .message
1921 .static_account_keys()
1922 .get(program_index)
1923 .map(|program_id| program_id == &vote_program_id())
1924 .unwrap_or(false)
1925}
1926
1927#[inline(always)]
1928fn convert_proto_rewards(
1929 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
1930) -> Result<Vec<(Address, RewardInfo)>, Box<dyn std::error::Error>> {
1931 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
1932 for proto_reward in proto_rewards.rewards.iter() {
1933 let reward = RewardInfo {
1934 reward_type: match proto_reward.reward_type - 1 {
1935 0 => RewardType::Fee,
1936 1 => RewardType::Rent,
1937 2 => RewardType::Staking,
1938 3 => RewardType::Voting,
1939 typ => {
1940 return Err(Box::new(std::io::Error::other(format!(
1941 "unsupported reward type {}",
1942 typ
1943 ))));
1944 }
1945 },
1946 lamports: proto_reward.lamports,
1947 post_balance: proto_reward.post_balance,
1948 commission: proto_reward.commission.parse::<u8>().ok(),
1949 };
1950 let pubkey = proto_reward
1951 .pubkey
1952 .parse::<Address>()
1953 .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
1954 keyed_rewards.push((pubkey, reward));
1955 }
1956 Ok(keyed_rewards)
1957}
1958
1959#[inline]
1960pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
1962 let total = slot_range.end - slot_range.start;
1963 let slots_per_thread = total / threads;
1964 let remainder = total % threads;
1965
1966 let ranges: Vec<Range<u64>> = (0..threads)
1967 .map(|i| {
1968 let extra_slot = if i < remainder { 1 } else { 0 };
1970 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
1971 let end = start + slots_per_thread + extra_slot;
1972 start..end
1973 })
1974 .collect();
1975
1976 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1978 assert_eq!(
1979 total_covered, total,
1980 "Range generation failed: {} threads should cover {} slots but only cover {}",
1981 threads, total, total_covered
1982 );
1983
1984 for i in 1..ranges.len() {
1986 assert_eq!(
1987 ranges[i - 1].end,
1988 ranges[i].start,
1989 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
1990 i - 1,
1991 ranges[i - 1].end,
1992 i,
1993 ranges[i].start
1994 );
1995 }
1996
1997 log::info!(
1998 target: LOG_MODULE,
1999 "Generated {} thread ranges covering {} slots total",
2000 threads,
2001 total_covered
2002 );
2003 ranges
2004}
2005
/// Formats `duration` as a short human-readable string.
///
/// Durations under a minute are shown in seconds: whole seconds without decimals (`"5s"`),
/// anything else with two decimals (`"0.50s"`, `"5.25s"`). Longer durations show the largest
/// non-zero unit plus the next unit when that one is non-zero (`"1d2h"`, `"3h"`, `"2m5s"`).
/// A zero duration is `"0s"`.
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if duration.is_zero() {
        return String::from("0s");
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        return match (whole_secs, duration.subsec_millis()) {
            (1.., 0) => format!("{}s", whole_secs),
            _ => format!("{:.2}s", duration.as_secs_f64()),
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let seconds = whole_secs % MINUTE;

    let (major, major_unit, minor, minor_unit) = if days > 0 {
        (days, 'd', hours, 'h')
    } else if hours > 0 {
        (hours, 'h', minutes, 'm')
    } else if minutes > 0 {
        (minutes, 'm', seconds, 's')
    } else {
        (seconds, 's', 0, 's')
    };

    if minor > 0 {
        format!("{major}{major_unit}{minor}{minor_unit}")
    } else {
        format!("{major}{major_unit}")
    }
}
2051
/// Test-only `on_stats` handler that logs a one-line progress summary for a thread.
///
/// Throughput (`tps`) is `transactions_processed` divided by the seconds elapsed since
/// `stats.start_time`, or `0.0` when no measurable time has elapsed.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        let tps = match elapsed_secs {
            secs if secs > 0.0 => stats.transactions_processed as f64 / secs,
            _ => 0.0,
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2077
2078#[tokio::test(flavor = "multi_thread")]
2079async fn test_firehose_epoch_800() {
2080 use std::sync::atomic::{AtomicU64, Ordering};
2081 solana_logger::setup_with_default("info");
2082 const THREADS: usize = 4;
2083 const NUM_SLOTS_TO_COVER: u64 = 50;
2084 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2085 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2086 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2087 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2088 let stats_tracking = StatsTracking {
2089 on_stats: log_stats_handler,
2090 tracking_interval_slots: 10,
2091 };
2092
2093 for prev in PREV_BLOCK.iter() {
2094 prev.store(0, Ordering::Relaxed);
2095 }
2096 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2097 NUM_BLOCKS.store(0, Ordering::Relaxed);
2098 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2099
2100 firehose(
2101 THREADS.try_into().unwrap(),
2102 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2103 Some(|thread_id: usize, block: BlockData| {
2104 async move {
2105 let prev =
2106 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2107 if block.was_skipped() {
2108 log::info!(
2109 target: LOG_MODULE,
2110 "leader skipped block {} on thread {}",
2111 block.slot(),
2112 thread_id,
2113 );
2114 } else {
2115 }
2122
2123 if prev > 0 {
2124 assert_eq!(prev + 1, block.slot());
2125 }
2126 if block.was_skipped() {
2127 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2128 } else {
2129 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2130 if let BlockData::Block {
2131 executed_transaction_count,
2132 ..
2133 } = &block
2134 {
2135 let executed = *executed_transaction_count;
2136 let _ = MIN_TRANSACTIONS.fetch_update(
2137 Ordering::Relaxed,
2138 Ordering::Relaxed,
2139 |current| {
2140 if executed < current {
2141 Some(executed)
2142 } else {
2143 None
2144 }
2145 },
2146 );
2147 }
2148 }
2149 Ok(())
2150 }
2151 .boxed()
2152 }),
2153 None::<OnTxFn>,
2154 None::<OnEntryFn>,
2155 None::<OnRewardFn>,
2156 Some(stats_tracking),
2157 None,
2158 )
2159 .await
2160 .unwrap();
2161 assert_eq!(
2162 NUM_BLOCKS.load(Ordering::Relaxed) + NUM_SKIPPED_BLOCKS.load(Ordering::Relaxed),
2163 NUM_SLOTS_TO_COVER
2164 );
2165 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2166 let min_transactions = MIN_TRANSACTIONS.load(Ordering::Relaxed);
2167 assert!(
2168 min_transactions >= 10,
2169 "expected at least 10 transactions in every block, minimum observed {min_transactions}"
2170 );
2171}