1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3#[cfg(test)]
4extern crate serial_test;
5#[cfg(test)]
6use futures_util::FutureExt;
7use futures_util::future::BoxFuture;
8use reqwest::{Client, Url};
9#[cfg(test)]
10use serial_test::serial;
11use solana_address::Address;
12use solana_geyser_plugin_manager::{
13 block_metadata_notifier_interface::BlockMetadataNotifier,
14 geyser_plugin_service::GeyserPluginServiceError,
15};
16use solana_hash::Hash;
17use solana_ledger::entry_notifier_interface::EntryNotifier;
18use solana_reward_info::RewardInfo;
19use solana_rpc::{
20 optimistically_confirmed_bank_tracker::SlotNotification,
21 transaction_notifier_interface::TransactionNotifier,
22};
23use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
24use solana_sdk_ids::vote::id as vote_program_id;
25use solana_transaction::versioned::VersionedTransaction;
26#[cfg(test)]
27use std::sync::{Mutex, OnceLock};
28use std::{
29 fmt::Display,
30 future::Future,
31 io,
32 ops::Range,
33 path::PathBuf,
34 sync::{
35 Arc,
36 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
37 },
38};
39
40use thiserror::Error;
41use tokio::{
42 sync::broadcast::{self, error::TryRecvError},
43 time::timeout,
44};
45
46use crate::{
47 LOG_MODULE, SharedError,
48 epochs::{epoch_to_slot_range, fetch_epoch_stream, slot_to_epoch},
49 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError},
50 node_reader::NodeReader,
51 utils,
52};
53
/// Maximum time to wait for a single awaited firehose operation (fetching an
/// epoch stream, reading the raw header, seeking to a slot, or reading up to
/// the next block) before the operation is reported as timed out.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
58
59fn poll_shutdown(
60 flag: &Arc<std::sync::atomic::AtomicBool>,
61 receiver: &mut Option<broadcast::Receiver<()>>,
62) -> bool {
63 if let Some(rx) = receiver {
64 match rx.try_recv() {
65 Ok(_) | Err(TryRecvError::Lagged(_)) => {
66 flag.store(true, Ordering::SeqCst);
67 }
68 Err(TryRecvError::Closed) => {
69 flag.store(true, Ordering::SeqCst);
70 }
71 Err(TryRecvError::Empty) => {}
72 }
73 }
74 flag.load(Ordering::SeqCst)
75}
76
77fn is_shutdown_error(err: &FirehoseError) -> bool {
78 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
79 inner
80 .downcast_ref::<io::Error>()
81 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
82 .unwrap_or(false)
83 }
84
85 match err {
86 FirehoseError::BlockHandlerError(inner)
87 | FirehoseError::TransactionHandlerError(inner)
88 | FirehoseError::EntryHandlerError(inner)
89 | FirehoseError::RewardHandlerError(inner)
90 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
91 _ => false,
92 }
93}
94
/// Errors that can be produced while running the firehose.
///
/// The human-readable text for each variant comes from the manual
/// [`Display`] implementation for this type.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// An error reported by the `reqwest` HTTP client.
    Reqwest(reqwest::Error),
    /// Reading the raw header of an epoch stream failed.
    ReadHeader(SharedError),
    /// The geyser plugin service reported an error.
    GeyserPluginService(GeyserPluginServiceError),
    /// No transaction notifier could be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading nodes up to the next block failed.
    ReadUntilBlockError(SharedError),
    /// Extracting the block from a group of read nodes failed.
    GetBlockError(SharedError),
    /// Seeking to, reading, or decoding a data node failed; carries the item
    /// index of the node and the underlying error.
    NodeDecodingError(usize, SharedError),
    /// An error from the slot offset index.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking the reader to a slot failed.
    SeekToSlotError(SharedError),
    /// A start-up error, e.g. a thread count of zero passed to `firehose`.
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An operation did not finish within the timeout; carries the name of
    /// the operation (for example `"read_until_block"`).
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error.
    EntryHandlerError(SharedError),
    /// The reward handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
132
// SAFETY: NOTE(review): these manual impls assert that `FirehoseError` may be
// moved to and shared between threads regardless of what its payload types
// implement. That is only sound if every wrapped error (`SharedError`,
// `reqwest::Error`, `GeyserPluginServiceError`, `SlotOffsetIndexError`) is
// itself `Send + Sync`, which nothing in view establishes — TODO confirm, and
// drop the `unsafe impl`s if the auto traits already apply.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
135
136impl Display for FirehoseError {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 match self {
139 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
140 FirehoseError::ReadHeader(error) => {
141 write!(f, "Error reading header: {}", error)
142 }
143 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
144 f,
145 "Error initializing geyser plugin service: {}",
146 geyser_plugin_service_error
147 ),
148 FirehoseError::FailedToGetTransactionNotifier => write!(
149 f,
150 "Failed to get transaction notifier from GeyserPluginService"
151 ),
152 FirehoseError::ReadUntilBlockError(error) => {
153 write!(f, "Error reading until block: {}", error)
154 }
155 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
156 FirehoseError::NodeDecodingError(item_index, error) => {
157 write!(
158 f,
159 "Error seeking, reading data from, or decoding data for data node {}: {}",
160 item_index, error
161 )
162 }
163 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
164 f,
165 "Error getting info from slot offset index: {}",
166 slot_offset_index_error
167 ),
168 FirehoseError::SeekToSlotError(error) => {
169 write!(f, "Error seeking to slot: {}", error)
170 }
171 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
172 FirehoseError::OnStatsHandlerError(error) => {
173 write!(f, "Stats handler error: {}", error)
174 }
175 FirehoseError::OperationTimeout(op) => {
176 write!(f, "Timeout while waiting for operation: {}", op)
177 }
178 FirehoseError::TransactionHandlerError(error) => {
179 write!(f, "Transaction handler error: {}", error)
180 }
181 FirehoseError::EntryHandlerError(error) => {
182 write!(f, "Entry handler error: {}", error)
183 }
184 FirehoseError::RewardHandlerError(error) => {
185 write!(f, "Reward handler error: {}", error)
186 }
187 FirehoseError::BlockHandlerError(error) => {
188 write!(f, "Block handler error: {}", error)
189 }
190 }
191 }
192}
193
194impl From<reqwest::Error> for FirehoseError {
195 fn from(e: reqwest::Error) -> Self {
196 FirehoseError::Reqwest(e)
197 }
198}
199
200impl From<GeyserPluginServiceError> for FirehoseError {
201 fn from(e: GeyserPluginServiceError) -> Self {
202 FirehoseError::GeyserPluginService(e)
203 }
204}
205
206impl From<SlotOffsetIndexError> for FirehoseError {
207 fn from(e: SlotOffsetIndexError) -> Self {
208 FirehoseError::SlotOffsetIndexError(e)
209 }
210}
211
/// Progress counters for a single firehose worker thread.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the worker thread these stats belong to.
    pub thread_id: usize,
    /// Instant used as the start of the run; `firehose` initialises it from
    /// the instant captured before any worker is spawned.
    pub start_time: std::time::Instant,
    /// Instant at which the thread finished; `None` when the stats are created.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range the thread is working on. Its `start` is moved forward to
    /// the first in-range slot of each epoch as the thread enters that epoch.
    pub slot_range: Range<u64>,
    /// Slot range the thread was given when its stats were created.
    pub initial_slot_range: Range<u64>,
    /// Slot the thread most recently recorded as being processed.
    pub current_slot: u64,
    /// Slots accounted for so far: processed blocks plus leader-skipped slots.
    pub slots_processed: u64,
    /// Blocks processed so far.
    pub blocks_processed: u64,
    /// Slots recorded as skipped by their leader (gaps between blocks).
    pub leader_skipped_slots: u64,
    /// Transaction nodes processed so far.
    pub transactions_processed: u64,
    /// Entry nodes processed so far.
    pub entries_processed: u64,
    /// Rewards processed so far; starts at zero.
    pub rewards_processed: u64,
}
240
/// Snapshot handed to the stats handler each time stats are emitted.
///
/// It combines the emitting thread's own [`ThreadStats`] with the overall
/// counters and the amounts accumulated since the previous emission.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Copy of the emitting thread's stats at emission time.
    pub thread_stats: ThreadStats,
    /// Start instant, copied from the emitting thread's stats.
    pub start_time: std::time::Instant,
    /// Finish instant, copied from the emitting thread's stats.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range, copied from the emitting thread's stats.
    pub slot_range: Range<u64>,
    /// Value of the overall slots-processed counter.
    pub slots_processed: u64,
    /// Value of the overall blocks-processed counter.
    pub blocks_processed: u64,
    /// Overall slots processed minus overall blocks processed (saturating).
    pub leader_skipped_slots: u64,
    /// Value of the overall transactions-processed counter.
    pub transactions_processed: u64,
    /// Value of the overall entries-processed counter.
    pub entries_processed: u64,
    /// Rewards processed, taken from the emitting thread's stats only.
    pub rewards_processed: u64,
    /// Transactions counted since the previous emission (counter is reset on emit).
    pub transactions_since_last_pulse: u64,
    /// Blocks counted since the previous emission (counter is reset on emit).
    pub blocks_since_last_pulse: u64,
    /// Slots counted since the previous emission (counter is reset on emit).
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the previous emission; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
273
/// Configuration for periodic stats reporting.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler invoked with the thread index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// Stats are emitted for blocks whose slot is a multiple of this value.
    /// It is used as the right-hand side of `%`, so it must not be zero.
    pub tracking_interval_slots: u64,
}
282
283#[inline(always)]
284#[allow(clippy::too_many_arguments)]
285async fn maybe_emit_stats<OnStats: Handler<Stats>>(
286 stats_tracking: Option<&StatsTracking<OnStats>>,
287 thread_index: usize,
288 thread_stats: &ThreadStats,
289 overall_slots_processed: &AtomicU64,
290 overall_blocks_processed: &AtomicU64,
291 overall_transactions_processed: &AtomicU64,
292 overall_entries_processed: &AtomicU64,
293 transactions_since_stats: &AtomicU64,
294 blocks_since_stats: &AtomicU64,
295 slots_since_stats: &AtomicU64,
296 last_pulse: &Arc<AtomicU64>,
297 base_instant: std::time::Instant,
298) -> Result<(), (FirehoseError, u64)> {
299 if let Some(stats_tracker) = stats_tracking {
300 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
301 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
302 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
303 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
304 let now_nanos = base_instant.elapsed().as_nanos() as u64;
305 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
306 let delta_nanos = now_nanos.saturating_sub(previous);
307 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
308 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
309 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
310 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
311
312 let stats = Stats {
313 thread_stats: thread_stats.clone(),
314 start_time: thread_stats.start_time,
315 finish_time: thread_stats.finish_time,
316 slot_range: thread_stats.slot_range.clone(),
317 slots_processed: total_slots,
318 blocks_processed: total_blocks,
319 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
320 transactions_processed: total_transactions,
321 entries_processed: total_entries,
322 rewards_processed: thread_stats.rewards_processed,
323 transactions_since_last_pulse: processed_transactions,
324 blocks_since_last_pulse: processed_blocks,
325 slots_since_last_pulse: processed_slots,
326 time_since_last_pulse,
327 };
328
329 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
330 last_pulse.store(previous, Ordering::Relaxed);
331 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
332 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
333 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
334 return Err((
335 FirehoseError::OnStatsHandlerError(e),
336 thread_stats.current_slot,
337 ));
338 }
339 }
340 Ok(())
341}
342
/// Adds `value` to `atomic` with relaxed ordering, but only when
/// `tracking_enabled` is `true`; otherwise the counter is left untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
349
350fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
351 map.get(&thread_id)
352 .map(|set| set.remove(&slot).is_some())
353 .unwrap_or(false)
354}
355
/// A single decoded transaction, as passed to the transaction handler.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot of the block that contains the transaction.
    pub slot: u64,
    /// Index recorded on the transaction node — presumably the transaction's
    /// position within its slot; TODO confirm.
    pub transaction_slot_index: usize,
    /// First signature of the transaction.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Whether the transaction was classified as a simple vote transaction.
    pub is_vote: bool,
    /// Transaction status metadata decoded from the node's metadata frames.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The parsed versioned transaction.
    pub transaction: VersionedTransaction,
}
374
/// A single ledger entry, as passed to the entry handler.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot of the block that contains the entry.
    pub slot: u64,
    /// Zero-based position of the entry among the entries seen for its block.
    pub entry_index: usize,
    /// Half-open range of transaction positions within the block covered by
    /// this entry; it starts at the number of transactions in the block's
    /// preceding entries.
    pub transaction_indexes: Range<usize>,
    /// The entry's `num_hashes` value.
    pub num_hashes: u64,
    /// The entry's hash.
    pub hash: Hash,
}
389
/// Rewards for a slot, as passed to the rewards handler.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Reward entries keyed by address; empty when the rewards node carried no data.
    pub rewards: Vec<(Address, RewardInfo)>,
}
398
/// Per-slot notification passed to the block handler: either a block that was
/// read, or a slot for which no block was found.
#[derive(Debug)]
pub enum BlockData {
    /// A block that was read from the stream.
    Block {
        /// Parent slot taken from the block's metadata.
        parent_slot: u64,
        /// Block hash carried over from the previously handled block in the
        /// current epoch stream; the default hash if there was none.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Hash of the most recent entry seen before the block node.
        blockhash: Hash,
        /// Rewards collected for the block.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time taken from the block's metadata.
        block_time: Option<i64>,
        /// Block height taken from the block's metadata.
        block_height: Option<u64>,
        /// Sum of the transaction counts of the block's entries.
        executed_transaction_count: u64,
        /// Number of entries seen for the block.
        entry_count: u64,
    },
    /// A slot in the gap between two consecutive blocks, for which no block
    /// was seen; presumably skipped by its leader.
    PossibleLeaderSkipped {
        /// The slot without a block.
        slot: u64,
    },
}
430
431impl BlockData {
432 #[inline(always)]
434 pub const fn slot(&self) -> u64 {
435 match self {
436 BlockData::Block { slot, .. } => *slot,
437 BlockData::PossibleLeaderSkipped { slot } => *slot,
438 }
439 }
440
441 #[inline(always)]
443 pub const fn was_skipped(&self) -> bool {
444 matches!(self, BlockData::PossibleLeaderSkipped { .. })
445 }
446
447 #[inline(always)]
449 pub const fn block_time(&self) -> Option<i64> {
450 match self {
451 BlockData::Block { block_time, .. } => *block_time,
452 BlockData::PossibleLeaderSkipped { .. } => None,
453 }
454 }
455}
456
/// Result produced by a handler: `Ok(())` on success or a [`SharedError`].
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by a handler invocation.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
459
/// Callback invoked by the firehose with a thread index and a `Data` payload,
/// returning a boxed future that resolves to `Ok(())` or a [`SharedError`].
///
/// The trait only names the required bound set (`Fn` + `Send` + `Sync` +
/// `Clone` + `'static`) and has no methods of its own.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
462
/// Blanket implementation: every function or closure satisfying the bounds is
/// a [`Handler`], so callers never implement the trait by hand.
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
467
/// Plain function-pointer form of a handler for `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer handler receiving [`BlockData`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer handler receiving [`TransactionData`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer handler receiving [`EntryData`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer handler receiving [`RewardsData`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// [`StatsTracking`] configured with a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Function-pointer handler receiving [`FirehoseErrorContext`].
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Same type as [`StatsTracker`], under a second name.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
484
/// Payload passed to the error handler (see [`OnErrorFn`]).
///
/// NOTE(review): the code that fills this in is outside the visible part of
/// the file; the field descriptions below follow the field names and types —
/// confirm against the call site.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the firehose thread associated with the error.
    pub thread_id: usize,
    /// Slot associated with the error.
    pub slot: u64,
    /// Epoch associated with the error.
    pub epoch: u64,
    /// Error description as text.
    pub error_message: String,
}
497
498#[inline]
503#[allow(clippy::too_many_arguments)]
504pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
505 threads: u64,
506 slot_range: Range<u64>,
507 on_block: Option<OnBlock>,
508 on_tx: Option<OnTransaction>,
509 on_entry: Option<OnEntry>,
510 on_rewards: Option<OnRewards>,
511 on_error: Option<OnError>,
512 stats_tracking: Option<StatsTracking<OnStats>>,
513 shutdown_signal: Option<broadcast::Receiver<()>>,
514) -> Result<(), (FirehoseError, u64)>
515where
516 OnBlock: Handler<BlockData>,
517 OnTransaction: Handler<TransactionData>,
518 OnEntry: Handler<EntryData>,
519 OnRewards: Handler<RewardsData>,
520 OnStats: Handler<Stats>,
521 OnError: Handler<FirehoseErrorContext>,
522{
523 if threads == 0 {
524 return Err((
525 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
526 slot_range.start,
527 ));
528 }
529 let client = Client::new();
530 log::info!(target: LOG_MODULE, "starting firehose...");
531 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
532
533 let slot_range = Arc::new(slot_range);
534
535 let subranges = generate_subranges(&slot_range, threads);
537 if threads > 1 {
538 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
539 }
540
541 let firehose_start = std::time::Instant::now();
542 let shutdown_flag = Arc::new(AtomicBool::new(false));
543 if let Some(ref rx) = shutdown_signal {
544 let mut rx = rx.resubscribe();
545 let flag = shutdown_flag.clone();
546 tokio::spawn(async move {
547 if rx.recv().await.is_ok() {
548 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
549 flag.store(true, Ordering::SeqCst);
550 }
551 });
552 }
553 let mut handles = Vec::new();
554 let error_counts: Arc<Vec<AtomicU32>> =
556 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
557
558 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
559 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
560 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
561 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
562 let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
563
564 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
565 let error_counts = error_counts.clone();
566 let client = client.clone();
567 let on_block = on_block.clone();
568 let on_tx = on_tx.clone();
569 let on_entry = on_entry.clone();
570 let on_reward = on_rewards.clone();
571 let on_error = on_error.clone();
572 let overall_slots_processed = overall_slots_processed.clone();
573 let overall_blocks_processed = overall_blocks_processed.clone();
574 let overall_transactions_processed = overall_transactions_processed.clone();
575 let overall_entries_processed = overall_entries_processed.clone();
576 let stats_tracking = stats_tracking.clone();
577 let transactions_since_stats = Arc::new(AtomicU64::new(0));
578 let blocks_since_stats = Arc::new(AtomicU64::new(0));
579 let slots_since_stats = Arc::new(AtomicU64::new(0));
580 let last_pulse = Arc::new(AtomicU64::new(0));
581 let transactions_since_stats_cloned = transactions_since_stats.clone();
582 let blocks_since_stats_cloned = blocks_since_stats.clone();
583 let slots_since_stats_cloned = slots_since_stats.clone();
584 let last_pulse_cloned = last_pulse.clone();
585 let shutdown_flag = shutdown_flag.clone();
586 let pending_skipped_slots = pending_skipped_slots.clone();
587 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
588
589 let handle = tokio::spawn(async move {
590 let transactions_since_stats = transactions_since_stats_cloned;
591 let blocks_since_stats = blocks_since_stats_cloned;
592 let slots_since_stats = slots_since_stats_cloned;
593 let last_pulse = last_pulse_cloned;
594 let mut shutdown_rx = thread_shutdown_rx;
595 let start_time = firehose_start;
596 last_pulse.store(
597 firehose_start.elapsed().as_nanos() as u64,
598 Ordering::Relaxed,
599 );
600 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
601 let mut skip_until_index = None;
602 let last_emitted_slot = slot_range.start.saturating_sub(1);
603 let block_enabled = on_block.is_some();
604 let tx_enabled = on_tx.is_some();
605 let entry_enabled = on_entry.is_some();
606 let reward_enabled = on_reward.is_some();
607 let tracking_enabled = stats_tracking.is_some();
608 if block_enabled {
609 pending_skipped_slots.entry(thread_index).or_default();
610 }
611 let mut last_counted_slot = slot_range.start.saturating_sub(1);
612 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
613 let mut thread_stats = if tracking_enabled {
614 Some(ThreadStats {
615 thread_id: thread_index,
616 start_time,
617 finish_time: None,
618 slot_range: slot_range.clone(),
619 initial_slot_range: slot_range.clone(),
620 current_slot: slot_range.start,
621 slots_processed: 0,
622 blocks_processed: 0,
623 leader_skipped_slots: 0,
624 transactions_processed: 0,
625 entries_processed: 0,
626 rewards_processed: 0,
627 })
628 } else {
629 None
630 };
631
632 while let Err((err, slot)) = async {
634 let mut last_emitted_slot = last_emitted_slot_global;
635 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
636 log::info!(
637 target: &log_target,
638 "shutdown requested; terminating firehose thread {}",
639 thread_index
640 );
641 return Ok(());
642 }
643 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
644 log::info!(
645 target: &log_target,
646 "slot range: {} (epoch {}) ... {} (epoch {})",
647 slot_range.start,
648 slot_to_epoch(slot_range.start),
649 slot_range.end,
650 slot_to_epoch(slot_range.end)
651 );
652
653 log::info!(target: &log_target, "🚒 starting firehose...");
654
655 let mut current_slot: Option<u64> = None;
657 for epoch_num in epoch_range.clone() {
658 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
659 log::info!(
660 target: &log_target,
661 "shutdown requested; terminating firehose thread {}",
662 thread_index
663 );
664 return Ok(());
665 }
666 log::info!(target: &log_target, "entering epoch {}", epoch_num);
667 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, &client)).await {
668 Ok(stream) => stream,
669 Err(_) => {
670 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
671 }
672 };
673 let mut reader = NodeReader::new(stream);
674
675 let header_fut = reader.read_raw_header();
676 let header = match timeout(OP_TIMEOUT, header_fut).await {
677 Ok(res) => res
678 .map_err(FirehoseError::ReadHeader)
679 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
680 Err(_) => {
681 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
682 }
683 };
684 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
685
686 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
687 let local_start = std::cmp::max(slot_range.start, epoch_start);
688 let local_end_inclusive =
689 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
690 if local_start > local_end_inclusive {
691 log::debug!(
692 target: &log_target,
693 "epoch {} has no overlap with thread range ({}..{}), skipping",
694 epoch_num,
695 slot_range.start,
696 slot_range.end
697 );
698 continue;
699 }
700
701 let mut previous_blockhash = Hash::default();
702 let mut latest_entry_blockhash = Hash::default();
703 last_counted_slot = local_start.saturating_sub(1);
706 current_slot = None;
707 if tracking_enabled
708 && let Some(ref mut stats) = thread_stats {
709 stats.current_slot = local_start;
710 stats.slot_range.start = local_start;
711 }
712
713 if local_start > epoch_start {
714 let seek_slot = local_start.saturating_sub(1);
718 let seek_fut = reader.seek_to_slot(seek_slot);
719 match timeout(OP_TIMEOUT, seek_fut).await {
720 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
721 Err(_) => {
722 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
723 }
724 }
725 }
726
727 let mut item_index = 0;
729 let mut displayed_skip_message = false;
730 loop {
731 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
732 log::info!(
733 target: &log_target,
734 "shutdown requested; terminating firehose thread {}",
735 thread_index
736 );
737 return Ok(());
738 }
739 let read_fut = reader.read_until_block();
740 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
741 Ok(result) => result
742 .map_err(FirehoseError::ReadUntilBlockError)
743 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
744 Err(_) => {
745 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
746 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
747 }
748 };
749 if nodes.is_empty() {
750 log::info!(
751 target: &log_target,
752 "reached end of epoch {}",
753 epoch_num
754 );
755 break;
756 }
757 if let Some(last_node) = nodes.0.last()
758 && !last_node.get_node().is_block()
759 {
760 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
761 break;
762 }
763 let block = nodes
764 .get_block()
765 .map_err(FirehoseError::GetBlockError)
766 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
767 log::debug!(
768 target: &log_target,
769 "read {} items from epoch {}, now at slot {}",
770 item_index,
771 epoch_num,
772 block.slot
773 );
774 let slot = block.slot;
775 if slot > local_end_inclusive {
776 log::debug!(
777 target: &log_target,
778 "reached end of local slice at slot {} (epoch {}), stopping",
779 slot,
780 epoch_num
781 );
782 break;
783 }
784 if slot >= slot_range.end {
785 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
786 if block_enabled {
791 pending_skipped_slots.remove(&thread_index);
792 }
793 return Ok(());
794 }
795 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
796 if slot < slot_range.start {
797 if slot.saturating_add(1) == slot_range.start {
798 log::debug!(
799 target: &log_target,
800 "priming reader with preceding slot {}, skipping",
801 slot
802 );
803 } else {
804 log::warn!(
805 target: &log_target,
806 "encountered slot {} before start of range {}, skipping",
807 slot,
808 slot_range.start
809 );
810 }
811 continue;
812 }
813 current_slot = Some(slot);
814 let mut entry_index: usize = 0;
815 let mut this_block_executed_transaction_count: u64 = 0;
816 let mut this_block_entry_count: u64 = 0;
817 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
818
819 for node_with_cid in &nodes.0 {
820 item_index += 1;
821 if let Some(skip) = skip_until_index {
822 if item_index < skip {
823 if !displayed_skip_message {
824 log::info!(
825 target: &log_target,
826 "skipping until index {} (at {})",
827 skip,
828 item_index
829 );
830 displayed_skip_message = true;
831 }
832 continue;
833 } else {
834 log::info!(
835 target: &log_target,
836 "reached target index {}, resuming...",
837 skip
838 );
839 skip_until_index = None;
840 }
841 }
842 let node = node_with_cid.get_node();
843
844 if let Some(ref mut stats) = thread_stats {
845 stats.current_slot = slot;
846 }
847
848 let error_slot = current_slot.unwrap_or(slot_range.start);
849
850 use crate::node::Node::*;
851 match node {
852 Transaction(tx) => {
853 if tx_enabled
854 && let Some(on_tx_cb) = on_tx.as_ref()
855 {
856 let error_slot = current_slot.unwrap_or(slot_range.start);
857 let versioned_tx = tx.as_parsed().map_err(|err| {
858 (
859 FirehoseError::NodeDecodingError(item_index, err),
860 error_slot,
861 )
862 })?;
863 let reassembled_metadata = nodes
864 .reassemble_dataframes(tx.metadata.clone())
865 .map_err(|err| {
866 (
867 FirehoseError::NodeDecodingError(item_index, err),
868 error_slot,
869 )
870 })?;
871
872 let decompressed =
873 utils::decompress_zstd(reassembled_metadata.clone())
874 .map_err(|err| {
875 (
876 FirehoseError::NodeDecodingError(
877 item_index,
878 err,
879 ),
880 error_slot,
881 )
882 })?;
883
884 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
885 prost_011::Message::decode(decompressed.as_slice())
886 .map_err(|err| {
887 (
888 FirehoseError::NodeDecodingError(
889 item_index,
890 Box::new(err),
891 ),
892 error_slot,
893 )
894 })?;
895
896 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
897 metadata.try_into().map_err(|err| {
898 (
899 FirehoseError::NodeDecodingError(
900 item_index,
901 Box::new(err),
902 ),
903 error_slot,
904 )
905 })?;
906
907 let message_hash = {
908 #[cfg(feature = "verify-transaction-signatures")]
909 {
910 versioned_tx.verify_and_hash_message().map_err(|err| {
911 (
912 FirehoseError::TransactionHandlerError(Box::new(err)),
913 error_slot,
914 )
915 })?
916 }
917 #[cfg(not(feature = "verify-transaction-signatures"))]
918 {
919 versioned_tx.message.hash()
920 }
921 };
922 let signature = versioned_tx
923 .signatures
924 .first()
925 .ok_or_else(|| {
926 Box::new(std::io::Error::new(
927 std::io::ErrorKind::InvalidData,
928 "transaction missing signature",
929 )) as SharedError
930 })
931 .map_err(|err| {
932 (
933 FirehoseError::NodeDecodingError(
934 item_index,
935 err,
936 ),
937 error_slot,
938 )
939 })?;
940 let is_vote = is_simple_vote_transaction(&versioned_tx);
941
942 on_tx_cb(
943 thread_index,
944 TransactionData {
945 slot: block.slot,
946 transaction_slot_index: tx.index.unwrap() as usize,
947 signature: *signature,
948 message_hash,
949 is_vote,
950 transaction_status_meta: as_native_metadata.clone(),
951 transaction: versioned_tx.clone(),
952 },
953 )
954 .await
955 .map_err(|e| {
956 (
957 FirehoseError::TransactionHandlerError(e),
958 error_slot,
959 )
960 })?;
961 }
962 fetch_add_if(
963 tracking_enabled,
964 &overall_transactions_processed,
965 1,
966 );
967 if let Some(ref mut stats) = thread_stats {
968 stats.transactions_processed += 1;
969 }
970 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
971 }
972 Entry(entry) => {
973 let entry_hash = Hash::from(entry.hash.to_bytes());
974 let entry_transaction_count = entry.transactions.len();
975 let entry_transaction_count_u64 = entry_transaction_count as u64;
976 let starting_transaction_index_u64 =
977 this_block_executed_transaction_count;
978 latest_entry_blockhash = entry_hash;
979 this_block_executed_transaction_count += entry_transaction_count_u64;
980 this_block_entry_count += 1;
981
982 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
983 let starting_transaction_index = usize::try_from(
984 starting_transaction_index_u64,
985 )
986 .map_err(|err| {
987 (
988 FirehoseError::EntryHandlerError(Box::new(err)),
989 error_slot,
990 )
991 })?;
992 let transaction_indexes_end =
993 starting_transaction_index + entry_transaction_count;
994 on_entry_cb(
995 thread_index,
996 EntryData {
997 slot: block.slot,
998 entry_index,
999 transaction_indexes: starting_transaction_index
1000 ..transaction_indexes_end,
1001 num_hashes: entry.num_hashes,
1002 hash: entry_hash,
1003 },
1004 )
1005 .await
1006 .map_err(|e| {
1007 (
1008 FirehoseError::EntryHandlerError(e),
1009 error_slot,
1010 )
1011 })?;
1012 }
1013 entry_index += 1;
1014 fetch_add_if(
1015 tracking_enabled,
1016 &overall_entries_processed,
1017 1,
1018 );
1019 if let Some(ref mut stats) = thread_stats {
1020 stats.entries_processed += 1;
1021 }
1022 }
1023 Block(block) => {
1024 let prev_last_counted_slot = last_counted_slot;
1025 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1026 (
1027 stats.slots_processed,
1028 stats.blocks_processed,
1029 stats.leader_skipped_slots,
1030 stats.current_slot,
1031 )
1032 });
1033
1034 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1035 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1036 let skip_start = skip_start_from_previous.max(next_expected_slot);
1037
1038 let skipped_epoch = slot_to_epoch(last_counted_slot);
1039 for skipped_slot in skip_start..slot {
1040 if slot_to_epoch(skipped_slot) != skipped_epoch {
1041 break;
1042 }
1043 log::debug!(
1044 target: &log_target,
1045 "leader skipped slot {} (prev_counted {}, current slot {})",
1046 skipped_slot,
1047 prev_last_counted_slot,
1048 slot,
1049 );
1050 if block_enabled {
1051 pending_skipped_slots
1052 .entry(thread_index)
1053 .or_default()
1054 .insert(skipped_slot);
1055 }
1056 if block_enabled
1057 && let Some(on_block_cb) = on_block.as_ref()
1058 && skipped_slot > last_emitted_slot {
1059 last_emitted_slot = skipped_slot;
1060 on_block_cb(
1061 thread_index,
1062 BlockData::PossibleLeaderSkipped {
1063 slot: skipped_slot,
1064 },
1065 )
1066 .await
1067 .map_err(|e| {
1068 (
1069 FirehoseError::BlockHandlerError(e),
1070 error_slot,
1071 )
1072 })?;
1073 }
1074 if tracking_enabled {
1075 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1076 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1077 if let Some(ref mut stats) = thread_stats {
1078 stats.leader_skipped_slots += 1;
1079 stats.slots_processed += 1;
1080 stats.current_slot = skipped_slot;
1081 }
1082 }
1083 last_counted_slot = skipped_slot;
1084 }
1085
1086 let cleared_pending_skip = if block_enabled {
1087 clear_pending_skip(
1088 &pending_skipped_slots,
1089 thread_index,
1090 slot,
1091 )
1092 } else {
1093 false
1094 };
1095
1096 if slot <= last_counted_slot && !cleared_pending_skip {
1097 log::debug!(
1098 target: &log_target,
1099 "duplicate block {}, already counted (last_counted={})",
1100 slot,
1101 last_counted_slot,
1102 );
1103 this_block_rewards.clear();
1104 continue;
1105 }
1106
1107 if block_enabled {
1108 if let Some(on_block_cb) = on_block.as_ref() {
1109 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1110 if slot > last_emitted_slot {
1111 last_emitted_slot = slot;
1112 on_block_cb(
1113 thread_index,
1114 BlockData::Block {
1115 parent_slot: block.meta.parent_slot,
1116 parent_blockhash: previous_blockhash,
1117 slot: block.slot,
1118 blockhash: latest_entry_blockhash,
1119 rewards: KeyedRewardsAndNumPartitions {
1120 keyed_rewards,
1121 num_partitions: None,
1122 },
1123 block_time: Some(block.meta.blocktime as i64),
1124 block_height: block.meta.block_height,
1125 executed_transaction_count:
1126 this_block_executed_transaction_count,
1127 entry_count: this_block_entry_count,
1128 },
1129 )
1130 .await
1131 .map_err(|e| {
1132 (
1133 FirehoseError::BlockHandlerError(e),
1134 error_slot,
1135 )
1136 })?;
1137 }
1138 }
1139 } else {
1140 this_block_rewards.clear();
1141 }
1142 previous_blockhash = latest_entry_blockhash;
1143
1144 if tracking_enabled {
1145 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1146 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1147 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1148 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1149 if let Some(ref mut stats) = thread_stats {
1150 stats.blocks_processed += 1;
1151 stats.slots_processed += 1;
1152 stats.current_slot = slot;
1153 }
1154
1155 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1156 (&stats_tracking, thread_stats.as_mut())
1157 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1158 && let Err(err) = maybe_emit_stats(
1159 stats_tracking.as_ref(),
1160 thread_index,
1161 thread_stats_ref,
1162 &overall_slots_processed,
1163 &overall_blocks_processed,
1164 &overall_transactions_processed,
1165 &overall_entries_processed,
1166 &transactions_since_stats,
1167 &blocks_since_stats,
1168 &slots_since_stats,
1169 &last_pulse,
1170 start_time,
1171 )
1172 .await
1173 {
1174 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1175 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1176 overall_blocks_processed
1177 .fetch_sub(1, Ordering::Relaxed);
1178 overall_slots_processed
1179 .fetch_sub(1, Ordering::Relaxed);
1180 if let Some((
1181 prev_slots_processed,
1182 prev_blocks_processed,
1183 prev_leader_skipped,
1184 prev_current_slot,
1185 )) = thread_stats_snapshot
1186 {
1187 thread_stats_ref.slots_processed =
1188 prev_slots_processed;
1189 thread_stats_ref.blocks_processed =
1190 prev_blocks_processed;
1191 thread_stats_ref.leader_skipped_slots =
1192 prev_leader_skipped;
1193 thread_stats_ref.current_slot =
1194 prev_current_slot;
1195 }
1196 last_counted_slot = prev_last_counted_slot;
1197 return Err(err);
1198 }
1199 }
1200
1201 if slot > last_counted_slot {
1202 last_counted_slot = slot;
1203 }
1204 }
1205 Subset(_subset) => (),
1206 Epoch(_epoch) => (),
1207 Rewards(rewards) => {
1208 if reward_enabled || block_enabled {
1209 let reassembled = nodes
1210 .reassemble_dataframes(rewards.data.clone())
1211 .map_err(|err| {
1212 (
1213 FirehoseError::NodeDecodingError(item_index, err),
1214 current_slot.unwrap_or(slot_range.start),
1215 )
1216 })?;
1217 if reassembled.is_empty() {
1218 this_block_rewards.clear();
1219 if reward_enabled
1220 && let Some(on_reward_cb) = on_reward.as_ref()
1221 {
1222 on_reward_cb(
1223 thread_index,
1224 RewardsData {
1225 slot: block.slot,
1226 rewards: Vec::new(),
1227 },
1228 )
1229 .await
1230 .map_err(|e| {
1231 (
1232 FirehoseError::RewardHandlerError(e),
1233 error_slot,
1234 )
1235 })?;
1236 }
1237 continue;
1238 }
1239
1240 let decompressed = utils::decompress_zstd(reassembled)
1241 .map_err(|err| {
1242 (
1243 FirehoseError::NodeDecodingError(
1244 item_index,
1245 err,
1246 ),
1247 error_slot,
1248 )
1249 })?;
1250
1251 let decoded =
1252 prost_011::Message::decode(decompressed.as_slice())
1253 .map_err(|err| {
1254 (
1255 FirehoseError::NodeDecodingError(
1256 item_index,
1257 Box::new(err),
1258 ),
1259 error_slot,
1260 )
1261 })?;
1262 let keyed_rewards = convert_proto_rewards(&decoded)
1263 .map_err(|err| {
1264 (
1265 FirehoseError::NodeDecodingError(item_index, err),
1266 error_slot,
1267 )
1268 })?;
1269 if reward_enabled
1270 && let Some(on_reward_cb) = on_reward.as_ref()
1271 {
1272 on_reward_cb(
1273 thread_index,
1274 RewardsData {
1275 slot: block.slot,
1276 rewards: keyed_rewards.clone(),
1277 },
1278 )
1279 .await
1280 .map_err(|e| {
1281 (
1282 FirehoseError::RewardHandlerError(e),
1283 error_slot,
1284 )
1285 })?;
1286 }
1287 this_block_rewards = keyed_rewards;
1288 if let Some(ref mut stats) = thread_stats {
1289 stats.rewards_processed +=
1290 this_block_rewards.len() as u64;
1291 }
1292 }
1293 }
1294 DataFrame(_data_frame) => (),
1295 }
1296 }
1297 if block.slot == slot_range.end - 1 {
1298 let finish_time = std::time::Instant::now();
1299 let elapsed = finish_time.duration_since(start_time);
1300 log::info!(target: &log_target, "processed slot {}", block.slot);
1301 let elapsed_pretty = human_readable_duration(elapsed);
1302 log::info!(
1303 target: &log_target,
1304 "processed {} slots across {} epochs in {}.",
1305 slot_range.end - slot_range.start,
1306 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1307 elapsed_pretty
1308 );
1309 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1310 let summary: String = error_counts
1313 .iter()
1314 .enumerate()
1315 .filter_map(|(i, c)| {
1316 let v = c.load(Ordering::Relaxed);
1317 if v > 0 {
1318 Some(format!("{:03}({})", i, v))
1319 } else {
1320 None
1321 }
1322 })
1323 .collect::<Vec<_>>()
1324 .join(", ");
1325 if !summary.is_empty() {
1326 log::debug!(target: &log_target, "threads with errors: {}", summary);
1327 }
1328 return Ok(());
1329 }
1330 }
1331 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1332 && last_counted_slot < expected_last_slot
1333 {
1334 }
1337 if let Some(ref mut stats) = thread_stats {
1338 stats.finish_time = Some(std::time::Instant::now());
1339 maybe_emit_stats(
1340 stats_tracking.as_ref(),
1341 thread_index,
1342 stats,
1343 &overall_slots_processed,
1344 &overall_blocks_processed,
1345 &overall_transactions_processed,
1346 &overall_entries_processed,
1347 &transactions_since_stats,
1348 &blocks_since_stats,
1349 &slots_since_stats,
1350 &last_pulse,
1351 start_time,
1352 )
1353 .await?;
1354 }
1355 if block_enabled {
1356 pending_skipped_slots.remove(&thread_index);
1357 }
1358 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1359 }
1360 Ok(())
1361 }
1362 .await
1363 {
1364 if is_shutdown_error(&err) {
1365 log::info!(
1366 target: &log_target,
1367 "shutdown requested; terminating firehose thread {}",
1368 thread_index
1369 );
1370 break;
1371 }
1372 let epoch = slot_to_epoch(slot);
1373 let item_index = match &err {
1374 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1375 _ => 0,
1376 };
1377 let error_message = err.to_string();
1378 log::error!(
1379 target: &log_target,
1380 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1381 slot,
1382 epoch
1383 );
1384 log::error!(target: &log_target, "{}", error_message);
1385 if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
1386 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1388 }
1389 if let Some(on_error_cb) = on_error.clone() {
1390 let context = FirehoseErrorContext {
1391 thread_id: thread_index,
1392 slot,
1393 epoch,
1394 error_message: error_message.clone(),
1395 };
1396 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1397 log::error!(
1398 target: &log_target,
1399 "on_error handler failed: {}",
1400 handler_err
1401 );
1402 }
1403 }
1404 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1406 log::warn!(
1407 target: &log_target,
1408 "restarting from slot {} at index {}",
1409 slot,
1410 item_index,
1411 );
1412 if slot <= last_counted_slot {
1416 slot_range.start = last_counted_slot.saturating_add(1);
1417 } else {
1418 slot_range.start = slot;
1419 }
1420 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1422 if tracking_enabled
1423 && let Some(ref mut stats_ref) = thread_stats {
1424 stats_ref.slot_range.start = slot_range.start;
1425 stats_ref.slot_range.end = slot_range.end;
1426 }
1428 if block_enabled {
1429 pending_skipped_slots.remove(&thread_index);
1430 }
1431 skip_until_index = Some(item_index);
1432 last_emitted_slot_global = last_emitted_slot;
1433 }
1434 });
1435 handles.push(handle);
1436 }
1437
1438 for handle in handles {
1440 handle.await.unwrap();
1441 }
1442 if stats_tracking.is_some() {
1443 let elapsed = firehose_start.elapsed();
1444 let elapsed_secs = elapsed.as_secs_f64();
1445 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1446 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1447 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1448 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1449 let total_errors: u64 = error_counts
1450 .iter()
1451 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1452 .sum();
1453 let overall_tps = if elapsed_secs > 0.0 {
1454 total_transactions as f64 / elapsed_secs
1455 } else {
1456 0.0
1457 };
1458 log::info!(
1459 target: LOG_MODULE,
1460 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1461 elapsed_secs,
1462 total_slots,
1463 total_blocks,
1464 total_leader_skipped,
1465 total_transactions,
1466 overall_tps,
1467 total_errors
1468 );
1469 }
1470 if shutdown_flag.load(Ordering::SeqCst) {
1471 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1472 } else {
1473 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1474 }
1475 Ok(())
1476}
1477
1478#[allow(clippy::result_large_err)]
1479pub fn firehose_geyser(
1486 rt: Arc<tokio::runtime::Runtime>,
1487 slot_range: Range<u64>,
1488 geyser_config_files: Option<&[PathBuf]>,
1489 index_base_url: &Url,
1490 client: &Client,
1491 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1492 threads: u64,
1493) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1494 if threads == 0 {
1495 return Err((
1496 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1497 slot_range.start,
1498 ));
1499 }
1500 log::info!(target: LOG_MODULE, "starting firehose...");
1501 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1502 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1503 let mut entry_notifier_maybe = None;
1504 let mut block_meta_notifier_maybe = None;
1505 let mut transaction_notifier_maybe = None;
1506 if let Some(geyser_config_files) = geyser_config_files {
1507 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1508
1509 let service =
1510 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1511 confirmed_bank_receiver.clone(),
1512 true,
1513 geyser_config_files,
1514 )
1515 .map_err(|e| (e.into(), slot_range.start))?;
1516
1517 transaction_notifier_maybe = Some(
1518 service
1519 .get_transaction_notifier()
1520 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1521 .map_err(|e| (e, slot_range.start))?,
1522 );
1523
1524 entry_notifier_maybe = service.get_entry_notifier();
1525 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1526
1527 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1528 }
1529
1530 if entry_notifier_maybe.is_some() {
1531 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1532 } else {
1533 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1534 }
1535 log::info!(target: LOG_MODULE, "running on_load...");
1536 rt.spawn(on_load);
1537
1538 let slot_range = Arc::new(slot_range);
1539 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1540 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1541 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1542 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1543
1544 let subranges = generate_subranges(&slot_range, threads);
1546 if threads > 1 {
1547 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1548 }
1549
1550 let mut handles = Vec::new();
1551 let error_counts: Arc<Vec<AtomicU32>> =
1553 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1554
1555 for (i, slot_range) in subranges.into_iter().enumerate() {
1556 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1557 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1558 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1559 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1560 let client = client.clone();
1561 let error_counts = error_counts.clone();
1562
1563 let rt_clone = rt.clone();
1564
1565 let handle = std::thread::spawn(move || {
1566 rt_clone.block_on(async {
1567 firehose_geyser_thread(
1568 slot_range,
1569 transaction_notifier_maybe,
1570 entry_notifier_maybe,
1571 block_meta_notifier_maybe,
1572 confirmed_bank_sender,
1573 &client,
1574 if threads > 1 { Some(i) } else { None },
1575 error_counts,
1576 )
1577 .await
1578 .unwrap();
1579 });
1580 });
1581 handles.push(handle);
1582 }
1583
1584 for handle in handles {
1586 handle.join().unwrap();
1587 }
1588 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1589 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1590 block_meta_notifier.notify_block_metadata(
1591 u64::MAX,
1592 "unload",
1593 u64::MAX,
1594 "unload",
1595 &KeyedRewardsAndNumPartitions {
1596 keyed_rewards: vec![],
1597 num_partitions: None,
1598 },
1599 None,
1600 None,
1601 0,
1602 0,
1603 );
1604 }
1605 Ok(confirmed_bank_receiver)
1606}
1607
1608#[allow(clippy::too_many_arguments)]
1609#[allow(clippy::result_large_err)]
1610async fn firehose_geyser_thread(
1611 mut slot_range: Range<u64>,
1612 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
1613 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
1614 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
1615 confirmed_bank_sender: Sender<SlotNotification>,
1616 client: &Client,
1617 thread_index: Option<usize>,
1618 error_counts: Arc<Vec<AtomicU32>>,
1619) -> Result<(), (FirehoseError, u64)> {
1620 let start_time = std::time::Instant::now();
1621 let log_target = if let Some(thread_index) = thread_index {
1622 format!("{}::T{:03}", LOG_MODULE, thread_index)
1623 } else {
1624 LOG_MODULE.to_string()
1625 };
1626 let initial_slot_range = slot_range.clone();
1627 let mut skip_until_index = None;
1628 let mut last_counted_slot = slot_range.start.saturating_sub(1);
1629 while let Err((err, slot)) = async {
1631 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1632 log::info!(
1633 target: &log_target,
1634 "slot range: {} (epoch {}) ... {} (epoch {})",
1635 slot_range.start,
1636 slot_to_epoch(slot_range.start),
1637 slot_range.end,
1638 slot_to_epoch(slot_range.end)
1639 );
1640
1641 log::info!(target: &log_target, "🚒 starting firehose...");
1642
1643 let mut current_slot: Option<u64> = None;
1645 for epoch_num in epoch_range.clone() {
1646 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1647 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
1648 Ok(stream) => stream,
1649 Err(_) => {
1650 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
1651 }
1652 };
1653 let mut reader = NodeReader::new(stream);
1654
1655 let header_fut = reader.read_raw_header();
1656 let header = match timeout(OP_TIMEOUT, header_fut).await {
1657 Ok(res) => res
1658 .map_err(FirehoseError::ReadHeader)
1659 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1660 Err(_) => {
1661 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
1662 }
1663 };
1664 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1665
1666 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1667 let local_start = std::cmp::max(slot_range.start, epoch_start);
1668 let local_end_inclusive =
1669 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1670 if local_start > local_end_inclusive {
1671 log::debug!(
1672 target: &log_target,
1673 "epoch {} has no overlap with thread range ({}..{}), skipping",
1674 epoch_num,
1675 slot_range.start,
1676 slot_range.end
1677 );
1678 continue;
1679 }
1680
1681 let mut todo_previous_blockhash = Hash::default();
1682 let mut todo_latest_entry_blockhash = Hash::default();
1683 last_counted_slot = local_start.saturating_sub(1);
1686 current_slot = None;
1687
1688 if local_start > epoch_start {
1689 let seek_slot = local_start.saturating_sub(1);
1693 let seek_fut = reader.seek_to_slot(seek_slot);
1694 match timeout(OP_TIMEOUT, seek_fut).await {
1695 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1696 Err(_) => {
1697 return Err((FirehoseError::OperationTimeout("seek_to_slot"), current_slot.unwrap_or(slot_range.start)));
1698 }
1699 }
1700 }
1701
1702 let mut item_index = 0;
1704 let mut displayed_skip_message = false;
1705 loop {
1706 let read_fut = reader.read_until_block();
1707 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
1708 Ok(result) => result
1709 .map_err(FirehoseError::ReadUntilBlockError)
1710 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1711 Err(_) => {
1712 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1713 let restart_slot =
1714 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
1715 return Err((
1716 FirehoseError::OperationTimeout("read_until_block"),
1717 restart_slot,
1718 ));
1719 }
1720 };
1721 if nodes.is_empty() {
1722 log::info!(
1723 target: &log_target,
1724 "reached end of epoch {}",
1725 epoch_num
1726 );
1727 break;
1728 }
1729 if let Some(last_node) = nodes.0.last()
1738 && !last_node.get_node().is_block() {
1739 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1740 break;
1741 }
1742 let block = nodes
1743 .get_block()
1744 .map_err(FirehoseError::GetBlockError)
1745 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1746 log::debug!(
1747 target: &log_target,
1748 "read {} items from epoch {}, now at slot {}",
1749 item_index,
1750 epoch_num,
1751 block.slot
1752 );
1753 let slot = block.slot;
1754 if slot > local_end_inclusive {
1755 log::debug!(
1756 target: &log_target,
1757 "reached end of local slice at slot {} (epoch {}), stopping",
1758 slot,
1759 epoch_num
1760 );
1761 break;
1762 }
1763 if slot >= slot_range.end {
1764 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1765 return Ok(());
1769 }
1770 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1771 if slot < local_start {
1772 if slot.saturating_add(1) == local_start {
1773 log::debug!(
1774 target: &log_target,
1775 "priming reader with preceding slot {}, skipping",
1776 slot
1777 );
1778 } else {
1779 log::warn!(
1780 target: &log_target,
1781 "encountered slot {} before start of range {}, skipping",
1782 slot,
1783 local_start
1784 );
1785 }
1786 continue;
1787 }
1788 current_slot = Some(slot);
1789 let mut entry_index: usize = 0;
1790 let mut this_block_executed_transaction_count: u64 = 0;
1791 let mut this_block_entry_count: u64 = 0;
1792 let mut this_block_rewards: Vec<(Address, RewardInfo)> = Vec::new();
1793
1794 if slot <= last_counted_slot {
1795 log::debug!(
1796 target: &log_target,
1797 "duplicate block {}, already counted (last_counted={})",
1798 slot,
1799 last_counted_slot,
1800 );
1801 this_block_rewards.clear();
1802 continue;
1803 }
1804
1805 nodes.each(|node_with_cid| -> Result<(), SharedError> {
1806 item_index += 1;
1807 if let Some(skip) = skip_until_index {
1813 if item_index < skip {
1814 if !displayed_skip_message {
1815 log::info!(
1816 target: &log_target,
1817 "skipping until index {} (at {})",
1818 skip,
1819 item_index
1820 );
1821 displayed_skip_message = true;
1822 }
1823 return Ok(());
1824 } else {
1825 log::info!(
1826 target: &log_target,
1827 "reached target index {}, resuming...",
1828 skip
1829 );
1830 skip_until_index = None;
1831 }
1832 }
1833 let node = node_with_cid.get_node();
1834
1835 use crate::node::Node::*;
1836 match node {
1837 Transaction(tx) => {
1838 let versioned_tx = tx.as_parsed()?;
1839 let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
1840
1841 let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1842
1843 let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1844 prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1845 Box::new(std::io::Error::other(
1846 std::format!("Error decoding metadata: {:?}", err),
1847 ))
1848 })?;
1849
1850 let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1851 metadata.try_into()?;
1852
1853 let message_hash = {
1854 #[cfg(feature = "verify-transaction-signatures")]
1855 {
1856 versioned_tx.verify_and_hash_message()?
1857 }
1858 #[cfg(not(feature = "verify-transaction-signatures"))]
1859 {
1860 versioned_tx.message.hash()
1863 }
1864 };
1865 let signature = versioned_tx
1866 .signatures
1867 .first()
1868 .ok_or_else(|| {
1869 Box::new(std::io::Error::new(
1870 std::io::ErrorKind::InvalidData,
1871 "transaction missing signature",
1872 )) as SharedError
1873 })?;
1874 let is_vote = is_simple_vote_transaction(&versioned_tx);
1875
1876 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
1877 transaction_notifier.notify_transaction(
1878 block.slot,
1879 tx.index.unwrap() as usize,
1880 signature,
1881 &message_hash,
1882 is_vote,
1883 &as_native_metadata,
1884 &versioned_tx,
1885 );
1886 }
1887
1888 }
1889 Entry(entry) => {
1890 let entry_hash = Hash::from(entry.hash.to_bytes());
1891 let entry_transaction_count = entry.transactions.len();
1892 let entry_transaction_count_u64 = entry_transaction_count as u64;
1893 let starting_transaction_index =
1894 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
1895 Box::new(std::io::Error::other(
1896 "transaction index exceeds usize range",
1897 )) as SharedError
1898 })?;
1899 todo_latest_entry_blockhash = entry_hash;
1900 this_block_executed_transaction_count += entry_transaction_count_u64;
1901 this_block_entry_count += 1;
1902 if entry_notifier_maybe.is_none() {
1903 return Ok(());
1904 }
1905 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
1906 let entry_summary = solana_entry::entry::EntrySummary {
1907 num_hashes: entry.num_hashes,
1908 hash: Hash::from(entry.hash.to_bytes()),
1909 num_transactions: entry_transaction_count_u64,
1910 };
1911 entry_notifier.notify_entry(
1912 block.slot,
1913 entry_index,
1914 &entry_summary,
1915 starting_transaction_index,
1916 );
1917 entry_index += 1;
1918 }
1919 Block(block) => {
1920 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
1921 confirmed_bank_sender.send(notification).unwrap();
1922
1923 if block_meta_notifier_maybe.is_none() {
1924 last_counted_slot = block.slot;
1925 return Ok(());
1926 }
1927 let keyed_rewards = std::mem::take(&mut this_block_rewards);
1928 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
1929 block_meta_notifier.notify_block_metadata(
1930 block.meta.parent_slot,
1931 todo_previous_blockhash.to_string().as_str(),
1932 block.slot,
1933 todo_latest_entry_blockhash.to_string().as_str(),
1934 &KeyedRewardsAndNumPartitions {
1935 keyed_rewards,
1936 num_partitions: None,
1937 },
1938 Some(block.meta.blocktime as i64),
1939 block.meta.block_height,
1940 this_block_executed_transaction_count,
1941 this_block_entry_count,
1942 );
1943 todo_previous_blockhash = todo_latest_entry_blockhash;
1944 last_counted_slot = block.slot;
1945 std::thread::yield_now();
1946 }
1947 Subset(_subset) => (),
1948 Epoch(_epoch) => (),
1949 Rewards(rewards) => {
1950 if !rewards.is_complete() {
1951 let reassembled = nodes.reassemble_dataframes(rewards.data.clone())?;
1952 let decompressed = utils::decompress_zstd(reassembled)?;
1953 let decoded = prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1954 Box::new(std::io::Error::other(
1955 std::format!("Error decoding rewards: {:?}", err),
1956 ))
1957 })?;
1958 this_block_rewards = convert_proto_rewards(&decoded)?;
1959 }
1960 }
1961 DataFrame(_data_frame) => (),
1962 }
1963 Ok(())
1964 })
1965 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1966 if block.slot == slot_range.end - 1 {
1967 let finish_time = std::time::Instant::now();
1968 let elapsed = finish_time.duration_since(start_time);
1969 log::info!(target: &log_target, "processed slot {}", block.slot);
1970 let elapsed_pretty = human_readable_duration(elapsed);
1971 log::info!(
1972 target: &log_target,
1973 "processed {} slots across {} epochs in {}.",
1974 initial_slot_range.end - initial_slot_range.start,
1975 slot_to_epoch(initial_slot_range.end)
1976 + 1
1977 - slot_to_epoch(initial_slot_range.start),
1978 elapsed_pretty
1979 );
1980 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
1981 let summary: String = error_counts
1984 .iter()
1985 .enumerate()
1986 .filter_map(|(i, c)| {
1987 let v = c.load(Ordering::Relaxed);
1988 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
1989 })
1990 .collect::<Vec<_>>()
1991 .join(", ");
1992 if !summary.is_empty() {
1993 log::debug!(target: &log_target, "threads with errors: {}", summary);
1994 }
1995 return Ok(());
1996 }
1997 }
1998 }
1999 Ok(())
2000}
2001.await
2002{
2003 if is_shutdown_error(&err) {
2004 log::info!(
2005 target: &log_target,
2006 "shutdown requested; terminating firehose thread {:?}",
2007 thread_index
2008 );
2009 return Ok(());
2010 }
2011 log::error!(
2012 target: &log_target,
2013 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2014 slot,
2015 slot_to_epoch(slot)
2016 );
2017 log::error!(target: &log_target, "{}", err);
2018 if matches!(err, FirehoseError::SlotOffsetIndexError(_)) {
2019 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2021 }
2022 let item_index = match err {
2023 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2024 _ => 0,
2025 };
2026 let idx = thread_index.unwrap_or(0);
2028 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2029 log::warn!(
2030 target: &log_target,
2031 "restarting from slot {} at index {}",
2032 slot,
2033 item_index,
2034 );
2035 if slot <= last_counted_slot {
2038 slot_range.start = last_counted_slot.saturating_add(1);
2039 } else {
2040 slot_range.start = slot;
2041 }
2042 skip_until_index = Some(item_index);
2043}
2044 Ok(())
2045}
2046
2047#[inline]
2048fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2049 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2050 return false;
2051 }
2052
2053 if !matches!(
2054 versioned_tx.version(),
2055 solana_transaction::versioned::TransactionVersion::Legacy(_)
2056 ) {
2057 return false;
2058 }
2059
2060 let instructions = versioned_tx.message.instructions();
2061 if instructions.len() != 1 {
2062 return false;
2063 }
2064
2065 let program_index = instructions[0].program_id_index as usize;
2066 versioned_tx
2067 .message
2068 .static_account_keys()
2069 .get(program_index)
2070 .map(|program_id| program_id == &vote_program_id())
2071 .unwrap_or(false)
2072}
2073
2074#[inline(always)]
2075fn convert_proto_rewards(
2076 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2077) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2078 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2079 for proto_reward in proto_rewards.rewards.iter() {
2080 let reward = RewardInfo {
2081 reward_type: match proto_reward.reward_type - 1 {
2082 0 => RewardType::Fee,
2083 1 => RewardType::Rent,
2084 2 => RewardType::Staking,
2085 3 => RewardType::Voting,
2086 typ => {
2087 return Err(Box::new(std::io::Error::other(format!(
2088 "unsupported reward type {}",
2089 typ
2090 ))));
2091 }
2092 },
2093 lamports: proto_reward.lamports,
2094 post_balance: proto_reward.post_balance,
2095 commission: proto_reward.commission.parse::<u8>().ok(),
2096 };
2097 let pubkey = proto_reward
2098 .pubkey
2099 .parse::<Address>()
2100 .map_err(|err| Box::new(err) as SharedError)?;
2101 keyed_rewards.push((pubkey, reward));
2102 }
2103 Ok(keyed_rewards)
2104}
2105
2106#[inline]
2107pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2109 let total = slot_range.end - slot_range.start;
2110 let slots_per_thread = total / threads;
2111 let remainder = total % threads;
2112
2113 let ranges: Vec<Range<u64>> = (0..threads)
2114 .map(|i| {
2115 let extra_slot = if i < remainder { 1 } else { 0 };
2117 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2118 let end = start + slots_per_thread + extra_slot;
2119 start..end
2120 })
2121 .collect();
2122
2123 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2125 assert_eq!(
2126 total_covered, total,
2127 "Range generation failed: {} threads should cover {} slots but only cover {}",
2128 threads, total, total_covered
2129 );
2130
2131 for i in 1..ranges.len() {
2133 assert_eq!(
2134 ranges[i - 1].end,
2135 ranges[i].start,
2136 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2137 i - 1,
2138 ranges[i - 1].end,
2139 i,
2140 ranges[i].start
2141 );
2142 }
2143
2144 log::info!(
2145 target: LOG_MODULE,
2146 "Generated {} thread ranges covering {} slots total",
2147 threads,
2148 total_covered
2149 );
2150 ranges
2151}
2152
/// Formats `duration` as a short human-readable string.
///
/// * Zero is rendered as `"0s"`.
/// * Below one minute: whole seconds with no millisecond remainder are rendered as an
///   integer (`"5s"`); everything else uses two decimals (`"0.50s"`, `"5.25s"`).
/// * From one minute up, only the two largest units are shown, and the smaller of the two
///   is omitted when it is zero (`"1m1s"`, `"1h"`, `"1d1h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if duration.is_zero() {
        return "0s".into();
    }

    let whole_secs = duration.as_secs();
    if whole_secs < MINUTE {
        let is_exact_second = whole_secs != 0 && duration.subsec_millis() == 0;
        return if is_exact_second {
            format!("{}s", whole_secs)
        } else {
            format!("{:.2}s", duration.as_secs_f64())
        };
    }

    let days = whole_secs / DAY;
    let hours = whole_secs % DAY / HOUR;
    let minutes = whole_secs % HOUR / MINUTE;
    let secs = whole_secs % MINUTE;

    // Arms are ordered from the smallest leading unit to the largest; each arm relies on
    // the ones above it having consumed the zero cases.
    match (days, hours, minutes, secs) {
        (0, 0, 0, _) => format!("{secs}s"),
        (0, 0, _, 0) => format!("{minutes}m"),
        (0, 0, _, _) => format!("{minutes}m{secs}s"),
        (0, _, 0, _) => format!("{hours}h"),
        (0, _, _, _) => format!("{hours}h{minutes}m"),
        (_, 0, _, _) => format!("{days}d"),
        (_, _, _, _) => format!("{days}d{hours}h"),
    }
}
2198
/// Test-only stats callback: logs one summary line for `thread_id` and always succeeds.
///
/// The reported TPS is `transactions_processed` divided by the seconds elapsed since
/// `stats.start_time`, or `0.0` when no time has elapsed.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    let report = async move {
        let seconds = stats.start_time.elapsed().as_secs_f64();
        // Guard against dividing by a zero elapsed time.
        let mut throughput = 0.0;
        if seconds > 0.0 {
            throughput = stats.transactions_processed as f64 / seconds;
        }
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            stats.transactions_processed,
            stats.entries_processed,
            stats.rewards_processed,
            seconds,
            throughput
        );
        Ok(())
    };
    Box::pin(report)
}
2224
2225#[tokio::test(flavor = "multi_thread")]
2226async fn test_firehose_epoch_800() {
2227 use dashmap::DashSet;
2228 use std::sync::atomic::{AtomicU64, Ordering};
2229 solana_logger::setup_with_default("info");
2230 const THREADS: usize = 4;
2231 const NUM_SLOTS_TO_COVER: u64 = 50;
2232 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2233 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2234 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2235 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2236 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2237 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2238 let stats_tracking = StatsTracking {
2239 on_stats: log_stats_handler,
2240 tracking_interval_slots: 10,
2241 };
2242
2243 for prev in PREV_BLOCK.iter() {
2244 prev.store(0, Ordering::Relaxed);
2245 }
2246 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2247 NUM_BLOCKS.store(0, Ordering::Relaxed);
2248 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2249 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2250 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2251
2252 firehose(
2253 THREADS.try_into().unwrap(),
2254 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2255 Some(|thread_id: usize, block: BlockData| {
2256 async move {
2257 let _prev =
2258 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2259 if block.was_skipped() {
2260 log::info!(
2261 target: LOG_MODULE,
2262 "leader skipped block {} on thread {}",
2263 block.slot(),
2264 thread_id,
2265 );
2266 } else {
2267 }
2274
2275 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2276 if block.was_skipped() {
2277 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2278 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2279 } else {
2280 if first_time {
2281 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2282 if let BlockData::Block {
2283 executed_transaction_count,
2284 ..
2285 } = &block
2286 {
2287 let executed = *executed_transaction_count;
2288 let _ = MIN_TRANSACTIONS.fetch_update(
2289 Ordering::Relaxed,
2290 Ordering::Relaxed,
2291 |current| {
2292 if executed < current {
2293 Some(executed)
2294 } else {
2295 None
2296 }
2297 },
2298 );
2299 }
2300 }
2301 }
2302 Ok(())
2303 }
2304 .boxed()
2305 }),
2306 None::<OnTxFn>,
2307 None::<OnEntryFn>,
2308 None::<OnRewardFn>,
2309 None::<OnErrorFn>,
2310 Some(stats_tracking),
2311 None,
2312 )
2313 .await
2314 .unwrap();
2315 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2316 assert_eq!(
2317 seen, NUM_SLOTS_TO_COVER,
2318 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2319 );
2320 let mut skipped: Vec<u64> = SEEN_SKIPPED
2321 .get_or_init(DashSet::new)
2322 .iter()
2323 .map(|v| *v)
2324 .collect();
2325 skipped.sort_unstable();
2326 const EXPECTED_SKIPPED: [u64; 6] = [
2328 345_600_004,
2329 345_600_005,
2330 345_600_008,
2331 345_600_009,
2332 345_600_010,
2333 345_600_011,
2334 ];
2335 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2336 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2337}
2338
2339#[tokio::test(flavor = "multi_thread")]
2340async fn test_firehose_target_slot_transactions() {
2341 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2342 solana_logger::setup_with_default("info");
2343 const TARGET_SLOT: u64 = 376_273_722;
2344 const SLOT_RADIUS: u64 = 50;
2345 const EXPECTED_TRANSACTIONS: u64 = 1414;
2346 const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2347 static FOUND: AtomicBool = AtomicBool::new(false);
2348 static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2349 static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2350
2351 FOUND.store(false, Ordering::Relaxed);
2352 OBSERVED_TXS.store(0, Ordering::Relaxed);
2353 OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2354
2355 firehose(
2356 4,
2357 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2358 Some(|_thread_id: usize, block: BlockData| {
2359 async move {
2360 if block.slot() == TARGET_SLOT {
2361 assert!(
2362 !block.was_skipped(),
2363 "target slot {TARGET_SLOT} was marked leader skipped",
2364 );
2365 if let BlockData::Block {
2366 executed_transaction_count,
2367 ..
2368 } = block
2369 {
2370 OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
2371 FOUND.store(true, Ordering::Relaxed);
2372 assert_eq!(
2373 executed_transaction_count, EXPECTED_TRANSACTIONS,
2374 "unexpected transaction count for slot {TARGET_SLOT}"
2375 );
2376 assert_eq!(
2377 OBSERVED_NON_VOTE.load(Ordering::Relaxed),
2378 EXPECTED_NON_VOTE_TRANSACTIONS,
2379 "unexpected non-vote transaction count for slot {TARGET_SLOT}"
2380 );
2381 }
2382 }
2383 Ok(())
2384 }
2385 .boxed()
2386 }),
2387 Some(|_thread_id: usize, transaction: TransactionData| {
2388 async move {
2389 if transaction.slot == TARGET_SLOT && !transaction.is_vote {
2390 OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
2391 }
2392 Ok(())
2393 }
2394 .boxed()
2395 }),
2396 None::<OnEntryFn>,
2397 None::<OnRewardFn>,
2398 None::<OnErrorFn>,
2399 None::<OnStatsTrackingFn>,
2400 None,
2401 )
2402 .await
2403 .unwrap();
2404
2405 assert!(
2406 FOUND.load(Ordering::Relaxed),
2407 "target slot was not processed"
2408 );
2409 assert_eq!(
2410 OBSERVED_TXS.load(Ordering::Relaxed),
2411 EXPECTED_TRANSACTIONS,
2412 "recorded transaction count mismatch"
2413 );
2414}
2415
/// Injects exactly one handler error into a single-threaded `firehose` run over eight
/// slots and asserts that every slot in the range still ends up recorded in the coverage
/// map.
///
/// The error is returned for the first non-skipped block that arrives after at least one
/// non-skipped block has already been recorded.
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    // Slot -> number of times the handler accepted it.
    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    // Set once the synthetic failure has been injected.
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    // Number of non-skipped blocks accepted so far.
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    solana_logger::setup_with_default("info");

    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let is_real_block = !block.was_skipped();

                // Short-circuit order matters: the flag is only swapped once a real block
                // arrives after an earlier real block was recorded.
                let inject_failure = is_real_block
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst);
                if inject_failure {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut hits = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                *hits.entry(block.slot()).or_default() += 1;
                if is_real_block {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Every slot in the requested range must have been accepted at least once.
    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    (START_SLOT..(START_SLOT + NUM_SLOTS)).for_each(|slot| {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    });
}
2479
/// Runs `firehose` on sixteen threads over the slots from 1000 below to 1000 above
/// `GAP_START` (inclusive) and asserts that no slot in that window is left uncovered.
///
/// A slot counts as covered when the block handler received a non-skipped block for it.
/// Slots 378864396 through 378864399 are added to the covered set by hand before the
/// check, so they are never reported as missing.
#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    // Slots for which a non-skipped block was delivered.
    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();

    solana_logger::setup_with_default("info");

    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Skipped slots are deliberately left out of the covered set.
                if !block.was_skipped() {
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(block.slot());
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    // Work on a private copy so the lock is released before the checks run.
    let mut coverage = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();

    // NOTE(review): presumably these four slots are known to have no block (the test name
    // refers to a "known missing range") — confirm against the data source.
    coverage.extend(378864396..=378864399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    let preview_len = missing.len().min(10);
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..preview_len]
    );
}