1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{
40 FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41 fetch_epoch_stream_with_options, slot_to_epoch,
42 },
43 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44 node_reader::NodeReader,
45 utils,
46};
47
/// Default per-operation timeout; `firehose` uses it for awaited operations when
/// sequential mode is off.
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Per-operation timeout that `firehose` selects instead of [`OP_TIMEOUT`] when
/// sequential mode is enabled.
const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180);
/// Epochs strictly below this value have their transaction metadata decoded with
/// bincode first (falling back to protobuf); from this epoch on only protobuf is tried.
const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57 flag: &Arc<std::sync::atomic::AtomicBool>,
58 receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60 if let Some(rx) = receiver {
61 match rx.try_recv() {
62 Ok(_) | Err(TryRecvError::Lagged(_)) => {
63 flag.store(true, Ordering::SeqCst);
64 }
65 Err(TryRecvError::Closed) => {
66 flag.store(true, Ordering::SeqCst);
67 }
68 Err(TryRecvError::Empty) => {}
69 }
70 }
71 flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76 inner
77 .downcast_ref::<io::Error>()
78 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79 .unwrap_or(false)
80 }
81
82 match err {
83 FirehoseError::BlockHandlerError(inner)
84 | FirehoseError::TransactionHandlerError(inner)
85 | FirehoseError::EntryHandlerError(inner)
86 | FirehoseError::RewardHandlerError(inner)
87 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88 _ => false,
89 }
90}
91
92async fn find_previous_indexed_slot(
93 local_start: u64,
94 epoch_start: u64,
95 log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97 if local_start <= epoch_start {
98 return Ok(None);
99 }
100 let mut candidate = local_start.saturating_sub(1);
101 let mut skipped = 0u64;
102 loop {
103 match slot_to_offset(candidate).await {
104 Ok(_) => {
105 if skipped > 0 {
106 log::info!(
107 target: log_target,
108 "slot {} missing in index; seeking back {} slots to {}",
109 local_start.saturating_sub(1),
110 skipped,
111 candidate
112 );
113 }
114 return Ok(Some(candidate));
115 }
116 Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117 if candidate <= epoch_start {
118 break;
119 }
120 skipped += 1;
121 candidate = candidate.saturating_sub(1);
122 }
123 Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124 }
125 }
126 log::warn!(
127 target: log_target,
128 "no indexed slot found before {} (epoch start {}); reading from epoch start",
129 local_start,
130 epoch_start
131 );
132 Ok(None)
133}
134
/// Errors produced by the firehose pipeline.
///
/// `Display` is implemented by hand further down in this file; the per-variant
/// descriptions below mirror the messages that implementation renders.
#[derive(Debug, Error)]
pub enum FirehoseError {
    /// An error reported by `reqwest`.
    Reqwest(reqwest::Error),
    /// Reading a header failed.
    ReadHeader(SharedError),
    /// Initializing the geyser plugin service failed.
    GeyserPluginService(GeyserPluginServiceError),
    /// No transaction notifier could be obtained from the geyser plugin service.
    FailedToGetTransactionNotifier,
    /// Reading until a block failed.
    ReadUntilBlockError(SharedError),
    /// Getting a block failed.
    GetBlockError(SharedError),
    /// Seeking to, reading, or decoding a data node failed; the `usize` is the
    /// index of the data node reported in the message.
    NodeDecodingError(usize, SharedError),
    /// Looking up information in the slot offset index failed.
    SlotOffsetIndexError(SlotOffsetIndexError),
    /// Seeking to a slot failed.
    SeekToSlotError(SharedError),
    /// An error raised on load (for example an invalid thread count of zero).
    OnLoadError(SharedError),
    /// The stats handler returned an error.
    OnStatsHandlerError(SharedError),
    /// An awaited operation timed out; the string names the operation.
    OperationTimeout(&'static str),
    /// The transaction handler returned an error.
    TransactionHandlerError(SharedError),
    /// The entry handler returned an error.
    EntryHandlerError(SharedError),
    /// The reward handler returned an error.
    RewardHandlerError(SharedError),
    /// The block handler returned an error.
    BlockHandlerError(SharedError),
}
172
// SAFETY / NOTE(review): these impls assert, without any compiler verification, that
// every payload carried by `FirehoseError` may be moved to and shared between threads.
// That is only sound if `SharedError`, `reqwest::Error`, `GeyserPluginServiceError` and
// `SlotOffsetIndexError` are themselves `Send + Sync` — TODO confirm. If they are, the
// auto traits would apply anyway and these impls could be removed.
unsafe impl Send for FirehoseError {}
unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180 FirehoseError::ReadHeader(error) => {
181 write!(f, "Error reading header: {}", error)
182 }
183 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184 f,
185 "Error initializing geyser plugin service: {}",
186 geyser_plugin_service_error
187 ),
188 FirehoseError::FailedToGetTransactionNotifier => write!(
189 f,
190 "Failed to get transaction notifier from GeyserPluginService"
191 ),
192 FirehoseError::ReadUntilBlockError(error) => {
193 write!(f, "Error reading until block: {}", error)
194 }
195 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196 FirehoseError::NodeDecodingError(item_index, error) => {
197 write!(
198 f,
199 "Error seeking, reading data from, or decoding data for data node {}: {}",
200 item_index, error
201 )
202 }
203 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204 f,
205 "Error getting info from slot offset index: {}",
206 slot_offset_index_error
207 ),
208 FirehoseError::SeekToSlotError(error) => {
209 write!(f, "Error seeking to slot: {}", error)
210 }
211 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212 FirehoseError::OnStatsHandlerError(error) => {
213 write!(f, "Stats handler error: {}", error)
214 }
215 FirehoseError::OperationTimeout(op) => {
216 write!(f, "Timeout while waiting for operation: {}", op)
217 }
218 FirehoseError::TransactionHandlerError(error) => {
219 write!(f, "Transaction handler error: {}", error)
220 }
221 FirehoseError::EntryHandlerError(error) => {
222 write!(f, "Entry handler error: {}", error)
223 }
224 FirehoseError::RewardHandlerError(error) => {
225 write!(f, "Reward handler error: {}", error)
226 }
227 FirehoseError::BlockHandlerError(error) => {
228 write!(f, "Block handler error: {}", error)
229 }
230 }
231 }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235 fn from(e: reqwest::Error) -> Self {
236 FirehoseError::Reqwest(e)
237 }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241 fn from(e: GeyserPluginServiceError) -> Self {
242 FirehoseError::GeyserPluginService(e)
243 }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247 fn from(e: SlotOffsetIndexError) -> Self {
248 FirehoseError::SlotOffsetIndexError(e)
249 }
250}
251
/// Progress counters kept for a single firehose thread.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ThreadStats {
    /// Index of the firehose thread these counters belong to.
    pub thread_id: usize,
    /// Instant recorded as the start time for this thread's stats.
    pub start_time: std::time::Instant,
    /// Finish instant; initialised to `None` when the thread starts.
    pub finish_time: Option<std::time::Instant>,
    /// Slot range tracked for the thread; `firehose` moves its `start` forward
    /// as the thread enters each epoch.
    pub slot_range: Range<u64>,
    /// Copy of the slot range the thread was initially given.
    pub initial_slot_range: Range<u64>,
    /// Slot the thread is currently positioned at.
    pub current_slot: u64,
    /// Number of slots processed by this thread.
    pub slots_processed: u64,
    /// Number of blocks processed by this thread.
    pub blocks_processed: u64,
    /// Number of slots this thread counted as leader-skipped.
    pub leader_skipped_slots: u64,
    /// Number of transactions processed by this thread.
    pub transactions_processed: u64,
    /// Number of entries processed by this thread.
    pub entries_processed: u64,
    /// Number of rewards processed by this thread.
    pub rewards_processed: u64,
}
280
/// Snapshot handed to the stats handler by `maybe_emit_stats`.
///
/// It combines the reporting thread's own counters with totals read from the
/// shared "overall" atomics and with per-pulse deltas.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Stats {
    /// Clone of the reporting thread's counters.
    pub thread_stats: ThreadStats,
    /// Copied from the reporting thread's `start_time`.
    pub start_time: std::time::Instant,
    /// Copied from the reporting thread's `finish_time`.
    pub finish_time: Option<std::time::Instant>,
    /// Clone of the reporting thread's `slot_range`.
    pub slot_range: Range<u64>,
    /// Value of the overall slots-processed counter at emission time.
    pub slots_processed: u64,
    /// Value of the overall blocks-processed counter at emission time.
    pub blocks_processed: u64,
    /// `slots_processed - blocks_processed`, saturating at zero.
    pub leader_skipped_slots: u64,
    /// Value of the overall transactions-processed counter at emission time.
    pub transactions_processed: u64,
    /// Value of the overall entries-processed counter at emission time.
    pub entries_processed: u64,
    /// Copied from the reporting thread's `rewards_processed` (not an overall total).
    pub rewards_processed: u64,
    /// Transactions counted since the previous pulse (the counter is reset on emission).
    pub transactions_since_last_pulse: u64,
    /// Blocks counted since the previous pulse (the counter is reset on emission).
    pub blocks_since_last_pulse: u64,
    /// Slots counted since the previous pulse (the counter is reset on emission).
    pub slots_since_last_pulse: u64,
    /// Time elapsed since the previous pulse; never less than one nanosecond.
    pub time_since_last_pulse: std::time::Duration,
}
313
/// Configuration for periodic stats reporting.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct StatsTracking<OnStats: Handler<Stats>> {
    /// Handler invoked with the thread index and a [`Stats`] snapshot.
    pub on_stats: OnStats,
    /// NOTE(review): presumably the number of slots between stats emissions;
    /// its use is not visible in this part of the file — confirm.
    pub tracking_interval_slots: u64,
}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326 stats_tracking: Option<&StatsTracking<OnStats>>,
327 thread_index: usize,
328 thread_stats: &ThreadStats,
329 overall_slots_processed: &AtomicU64,
330 overall_blocks_processed: &AtomicU64,
331 overall_transactions_processed: &AtomicU64,
332 overall_entries_processed: &AtomicU64,
333 transactions_since_stats: &AtomicU64,
334 blocks_since_stats: &AtomicU64,
335 slots_since_stats: &AtomicU64,
336 last_pulse: &Arc<AtomicU64>,
337 base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339 if let Some(stats_tracker) = stats_tracking {
340 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344 let now_nanos = base_instant.elapsed().as_nanos() as u64;
345 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346 let delta_nanos = now_nanos.saturating_sub(previous);
347 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352 let stats = Stats {
353 thread_stats: thread_stats.clone(),
354 start_time: thread_stats.start_time,
355 finish_time: thread_stats.finish_time,
356 slot_range: thread_stats.slot_range.clone(),
357 slots_processed: total_slots,
358 blocks_processed: total_blocks,
359 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360 transactions_processed: total_transactions,
361 entries_processed: total_entries,
362 rewards_processed: thread_stats.rewards_processed,
363 transactions_since_last_pulse: processed_transactions,
364 blocks_since_last_pulse: processed_blocks,
365 slots_since_last_pulse: processed_slots,
366 time_since_last_pulse,
367 };
368
369 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370 last_pulse.store(previous, Ordering::Relaxed);
371 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374 return Err((
375 FirehoseError::OnStatsHandlerError(e),
376 thread_stats.current_slot,
377 ));
378 }
379 }
380 Ok(())
381}
382
/// Adds `value` to `atomic` with relaxed ordering, but only when
/// `tracking_enabled` is `true`; otherwise the counter is left untouched.
#[inline(always)]
fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
    if !tracking_enabled {
        return;
    }
    atomic.fetch_add(value, Ordering::Relaxed);
}
389
390fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot: u64) -> bool {
391 map.get(&thread_id)
392 .map(|set| set.remove(&slot).is_some())
393 .unwrap_or(false)
394}
395
396fn decode_transaction_status_meta_from_frame(
397 slot: u64,
398 reassembled_metadata: Vec<u8>,
399) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
400 if reassembled_metadata.is_empty() {
401 return Ok(solana_transaction_status::TransactionStatusMeta::default());
403 }
404
405 match utils::decompress_zstd(reassembled_metadata.as_slice()) {
406 Ok(decompressed) => {
407 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
408 Box::new(std::io::Error::other(format!(
409 "decode transaction metadata (slot {slot}): {err}"
410 ))) as SharedError
411 })
412 }
413 Err(decomp_err) => {
414 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
417 Box::new(std::io::Error::other(format!(
418 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
419 ))) as SharedError
420 })
421 }
422 }
423}
424
/// Rewards decoded from a rewards frame.
#[derive(Debug, Default)]
struct DecodedRewards {
    /// Reward entries paired with the address they apply to.
    keyed_rewards: Vec<(Address, RewardInfo)>,
    /// Partition count taken from the decoded rewards message, when it carries one.
    num_partitions: Option<u64>,
}
430
431impl DecodedRewards {
432 fn empty() -> Self {
433 Self {
434 keyed_rewards: Vec::new(),
435 num_partitions: None,
436 }
437 }
438}
439
440fn decode_rewards_from_frame(
441 slot: u64,
442 reassembled_rewards: Vec<u8>,
443) -> Result<DecodedRewards, SharedError> {
444 if reassembled_rewards.is_empty() {
445 return Ok(DecodedRewards::empty());
447 }
448
449 match utils::decompress_zstd(reassembled_rewards.as_slice()) {
450 Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
451 |err| {
452 Box::new(std::io::Error::other(format!(
453 "decode rewards (slot {slot}): {err}"
454 ))) as SharedError
455 },
456 ),
457 Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
458 |err| {
459 Box::new(std::io::Error::other(format!(
460 "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
461 ))) as SharedError
462 },
463 ),
464 }
465}
466
467fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
468 let epoch = slot_to_epoch(slot);
469 let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
470 prost_011::Message::decode(bytes);
471 match proto_attempt {
472 Ok(proto) => {
473 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
474 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
475 Box::new(std::io::Error::other(format!(
476 "convert rewards proto failed (epoch {epoch}): {err}"
477 ))) as SharedError
478 })?;
479 Ok(DecodedRewards {
480 keyed_rewards,
481 num_partitions,
482 })
483 }
484 Err(proto_err) => {
485 let stored: solana_storage_proto::StoredExtendedRewards =
486 bincode::deserialize(bytes).map_err(|bin_err| {
487 Box::new(std::io::Error::other(format!(
488 "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
489 ))) as SharedError
490 })?;
491 let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
492 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
493 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
494 Box::new(std::io::Error::other(format!(
495 "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
496 ))) as SharedError
497 })?;
498 Ok(DecodedRewards {
499 keyed_rewards,
500 num_partitions,
501 })
502 }
503 }
504}
505
506fn decode_transaction_status_meta(
507 slot: u64,
508 metadata_bytes: &[u8],
509) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
510 let epoch = slot_to_epoch(slot);
511 let mut bincode_err: Option<String> = None;
512 if epoch < BINCODE_EPOCH_CUTOFF {
513 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
514 metadata_bytes,
515 ) {
516 Ok(stored) => return Ok(stored.into()),
517 Err(err) => {
518 bincode_err = Some(err.to_string());
519 }
520 }
521 }
522
523 let bin_err_for_proto = bincode_err.clone();
524 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
525 prost_011::Message::decode(metadata_bytes).map_err(|err| {
526 if let Some(ref bin_err) = bin_err_for_proto {
528 Box::new(std::io::Error::other(format!(
529 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
530 ))) as SharedError
531 } else {
532 Box::new(std::io::Error::other(format!(
533 "protobuf decode transaction metadata: {err}"
534 ))) as SharedError
535 }
536 })?;
537
538 proto.try_into().map_err(|err| {
539 if let Some(ref bin_err) = bincode_err {
540 Box::new(std::io::Error::other(format!(
541 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
542 ))) as SharedError
543 } else {
544 Box::new(std::io::Error::other(format!(
545 "convert transaction metadata proto: {err}"
546 ))) as SharedError
547 }
548 })
549}
550
#[cfg(test)]
mod metadata_decode_tests {
    use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
    use solana_message::v0::LoadedAddresses;
    use solana_storage_proto::StoredTransactionStatusMeta;
    use solana_transaction_status::TransactionStatusMeta;

    /// Metadata fixture with a handful of non-default fields.
    fn sample_meta() -> TransactionStatusMeta {
        TransactionStatusMeta {
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            compute_units_consumed: Some(7),
            cost_units: Some(9),
            loaded_addresses: LoadedAddresses::default(),
            ..TransactionStatusMeta::default()
        }
    }

    /// Encodes `meta` as protobuf bytes.
    fn encode_as_proto(meta: &TransactionStatusMeta) -> Vec<u8> {
        let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
            meta.clone().into();
        prost_011::Message::encode_to_vec(&generated)
    }

    // Slot 0 falls in an epoch below the cutoff, so bincode is tried first.
    #[test]
    fn decodes_bincode_metadata_for_early_epochs() {
        let stored_meta = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 42,
            pre_balances: vec![1, 2],
            post_balances: vec![3, 4],
            inner_instructions: None,
            log_messages: Some(vec!["hello".into()]),
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: Some(7),
            cost_units: Some(9),
        };
        let encoded = bincode::serialize(&stored_meta).expect("bincode serialize");
        let actual = decode_transaction_status_meta(0, &encoded).expect("decode");
        assert_eq!(actual, TransactionStatusMeta::from(stored_meta));
    }

    // A slot at the cutoff epoch goes straight to protobuf.
    #[test]
    fn decodes_protobuf_metadata_for_later_epochs() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        let actual = decode_transaction_status_meta(157 * 432000, &encoded).expect("decode");
        assert_eq!(actual, expected);
    }

    // Protobuf bytes in an early epoch must still decode via the fallback path.
    #[test]
    fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
        let expected = sample_meta();
        let encoded = encode_as_proto(&expected);
        let actual = decode_transaction_status_meta(100 * 432000, &encoded).expect("decode");
        assert_eq!(actual, expected);
    }

    #[test]
    fn empty_frame_decodes_to_default() {
        let actual = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
        assert_eq!(actual, TransactionStatusMeta::default());
    }

    // Frames that are not zstd-compressed are decoded from their raw bytes.
    #[test]
    fn raw_bincode_frame_without_zstd_still_decodes() {
        let stored_meta = StoredTransactionStatusMeta {
            status: Ok(()),
            fee: 1,
            pre_balances: vec![],
            post_balances: vec![],
            inner_instructions: None,
            log_messages: None,
            pre_token_balances: Some(Vec::new()),
            post_token_balances: Some(Vec::new()),
            rewards: Some(Vec::new()),
            return_data: None,
            compute_units_consumed: None,
            cost_units: None,
        };
        let uncompressed = bincode::serialize(&stored_meta).expect("serialize");
        let actual = decode_transaction_status_meta_from_frame(0, uncompressed)
            .expect("decode fallback");
        assert_eq!(actual, TransactionStatusMeta::from(stored_meta));
    }
}
643
#[cfg(test)]
mod rewards_decode_tests {
    use super::decode_rewards_from_bytes;
    use solana_sdk_ids::vote::id as vote_program_id;
    use solana_storage_proto::StoredExtendedRewards;
    use solana_transaction_status::{Reward, RewardType};

    // A protobuf payload decodes directly and keeps its partition count.
    #[test]
    fn decodes_protobuf_rewards() {
        let single_reward = solana_storage_proto::convert::generated::Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 5,
            post_balance: 10,
            reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
            commission: "1".to_string(),
        };
        let message = solana_storage_proto::convert::generated::Rewards {
            rewards: vec![single_reward],
            num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
                num_partitions: 2,
            }),
        };
        let encoded = prost_011::Message::encode_to_vec(&message);
        let actual = decode_rewards_from_bytes(0, &encoded).expect("decode proto rewards");
        assert_eq!(actual.keyed_rewards.len(), 1);
        assert_eq!(actual.num_partitions, Some(2));
    }

    // A bincode payload decodes through the fallback and has no partition count.
    #[test]
    fn decodes_bincode_rewards() {
        let single_reward = Reward {
            pubkey: vote_program_id().to_string(),
            lamports: 7,
            post_balance: 9,
            reward_type: Some(RewardType::Rent),
            commission: Some(3),
        };
        let stored: StoredExtendedRewards = vec![single_reward.into()];
        let encoded = bincode::serialize(&stored).expect("bincode serialize");
        let actual = decode_rewards_from_bytes(0, &encoded).expect("decode bincode rewards");
        assert_eq!(actual.keyed_rewards.len(), 1);
        assert_eq!(actual.num_partitions, None);
    }
}
689
/// Payload passed to the transaction handler.
#[derive(Debug, Clone)]
pub struct TransactionData {
    /// Slot the transaction belongs to.
    pub slot: u64,
    /// NOTE(review): presumably the transaction's position within its slot — confirm.
    pub transaction_slot_index: usize,
    /// Signature associated with the transaction.
    pub signature: solana_signature::Signature,
    /// Hash of the transaction message.
    pub message_hash: Hash,
    /// Whether the transaction is flagged as a vote transaction.
    pub is_vote: bool,
    /// Status metadata decoded for the transaction.
    pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
    /// The versioned transaction itself.
    pub transaction: VersionedTransaction,
}
708
/// Payload passed to the entry handler.
#[derive(Debug, Clone)]
pub struct EntryData {
    /// Slot the entry belongs to.
    pub slot: u64,
    /// NOTE(review): presumably the entry's position within its slot — confirm.
    pub entry_index: usize,
    /// Range of transaction indexes covered by this entry.
    pub transaction_indexes: Range<usize>,
    /// Hash count recorded for the entry.
    pub num_hashes: u64,
    /// Hash recorded for the entry.
    pub hash: Hash,
}
723
/// Payload passed to the rewards handler.
#[derive(Debug, Clone)]
pub struct RewardsData {
    /// Slot the rewards belong to.
    pub slot: u64,
    /// Reward entries paired with the address they apply to.
    pub rewards: Vec<(Address, RewardInfo)>,
}
732
/// Payload passed to the block handler: either a real block or a marker for a
/// slot that produced none.
#[derive(Debug)]
pub enum BlockData {
    /// A block that was read for `slot`.
    Block {
        /// Slot of the parent block.
        parent_slot: u64,
        /// Blockhash of the parent block.
        parent_blockhash: Hash,
        /// Slot of this block.
        slot: u64,
        /// Blockhash of this block.
        blockhash: Hash,
        /// Keyed rewards (with partition count) attached to the block.
        rewards: KeyedRewardsAndNumPartitions,
        /// Block time, when known.
        block_time: Option<i64>,
        /// Block height, when known.
        block_height: Option<u64>,
        /// Number of executed transactions in the block.
        executed_transaction_count: u64,
        /// Number of entries in the block.
        entry_count: u64,
    },
    /// A slot for which no block was produced.
    /// NOTE(review): the name suggests the leader may have skipped it — confirm
    /// against the code that emits this variant.
    PossibleLeaderSkipped {
        /// The slot in question.
        slot: u64,
    },
}
764
765impl BlockData {
766 #[inline(always)]
768 pub const fn slot(&self) -> u64 {
769 match self {
770 BlockData::Block { slot, .. } => *slot,
771 BlockData::PossibleLeaderSkipped { slot } => *slot,
772 }
773 }
774
775 #[inline(always)]
777 pub const fn was_skipped(&self) -> bool {
778 matches!(self, BlockData::PossibleLeaderSkipped { .. })
779 }
780
781 #[inline(always)]
783 pub const fn block_time(&self) -> Option<i64> {
784 match self {
785 BlockData::Block { block_time, .. } => *block_time,
786 BlockData::PossibleLeaderSkipped { .. } => None,
787 }
788 }
789}
790
/// Result produced by a handler: success, or a shared (boxed) error.
type HandlerResult = Result<(), SharedError>;
/// Boxed `'static` future returned by every handler invocation.
type HandlerFuture = BoxFuture<'static, HandlerResult>;
793
/// Callback invoked with a `usize` and a `Data` payload, returning a
/// [`HandlerFuture`]. Handlers must be `Send + Sync + Clone + 'static`.
///
/// In `maybe_emit_stats` the `usize` argument is the thread index.
pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}

/// Blanket implementation: any closure or function with the matching signature
/// and bounds is a [`Handler`].
impl<Data, F> Handler<Data> for F where
    F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
{
}
801
/// Plain function-pointer form of a handler for `Data`.
pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
/// Function-pointer handler for [`BlockData`].
pub type OnBlockFn = HandlerFn<BlockData>;
/// Function-pointer handler for [`TransactionData`].
pub type OnTxFn = HandlerFn<TransactionData>;
/// Function-pointer handler for [`EntryData`].
pub type OnEntryFn = HandlerFn<EntryData>;
/// Function-pointer handler for [`RewardsData`].
pub type OnRewardFn = HandlerFn<RewardsData>;
/// [`StatsTracking`] specialised to a function-pointer stats handler.
pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
/// Function-pointer handler for [`FirehoseErrorContext`].
pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
/// Same type as [`StatsTracker`], under a second name.
pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
818
/// Context passed to the error handler when a firehose thread reports an error.
#[derive(Clone, Debug)]
pub struct FirehoseErrorContext {
    /// Index of the thread that reported the error.
    pub thread_id: usize,
    /// Slot associated with the error.
    pub slot: u64,
    /// Epoch associated with the error.
    pub epoch: u64,
    /// Error description as text.
    pub error_message: String,
}
831
832#[inline]
844#[allow(clippy::too_many_arguments)]
845pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
846 threads: u64,
847 sequential: bool,
848 buffer_window_bytes: Option<u64>,
849 slot_range: Range<u64>,
850 on_block: Option<OnBlock>,
851 on_tx: Option<OnTransaction>,
852 on_entry: Option<OnEntry>,
853 on_rewards: Option<OnRewards>,
854 on_error: Option<OnError>,
855 stats_tracking: Option<StatsTracking<OnStats>>,
856 shutdown_signal: Option<broadcast::Receiver<()>>,
857) -> Result<(), (FirehoseError, u64)>
858where
859 OnBlock: Handler<BlockData>,
860 OnTransaction: Handler<TransactionData>,
861 OnEntry: Handler<EntryData>,
862 OnRewards: Handler<RewardsData>,
863 OnStats: Handler<Stats>,
864 OnError: Handler<FirehoseErrorContext>,
865{
866 if threads == 0 {
867 return Err((
868 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
869 slot_range.start,
870 ));
871 }
872 let client = crate::network::create_http_client();
873 log::info!(target: LOG_MODULE, "starting firehose...");
874 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
875 let firehose_threads = if sequential { 1 } else { threads };
876 let sequential_download_threads = std::cmp::max(1, threads as usize);
877 let sequential_buffer_window_bytes = buffer_window_bytes
878 .filter(|value| *value >= 2)
879 .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
880 if sequential {
881 log::info!(
882 target: LOG_MODULE,
883 "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
884 sequential_download_threads,
885 crate::system::format_byte_size(sequential_buffer_window_bytes)
886 );
887 }
888
889 let slot_range = Arc::new(slot_range);
890
891 let subranges = generate_subranges(&slot_range, firehose_threads);
893 if firehose_threads > 1 {
894 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
895 }
896
897 let firehose_start = std::time::Instant::now();
898 let shutdown_flag = Arc::new(AtomicBool::new(false));
899 if let Some(ref rx) = shutdown_signal {
900 let mut rx = rx.resubscribe();
901 let flag = shutdown_flag.clone();
902 tokio::spawn(async move {
903 if rx.recv().await.is_ok() {
904 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
905 flag.store(true, Ordering::SeqCst);
906 }
907 });
908 }
909
910 let shared_ripget_client: Option<ripget::Client> = if sequential {
912 Some(
913 ripget::build_client(Some(&format!(
914 "jetstreamer-firehose/{}",
915 env!("CARGO_PKG_VERSION")
916 )))
917 .expect("failed to build ripget HTTP client"),
918 )
919 } else {
920 None
921 };
922
923 let mut handles = Vec::new();
924 let error_counts: Arc<Vec<AtomicU32>> =
926 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
927
928 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
929 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
930 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
931 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
932 let pending_skipped_slots: Arc<DashMap<usize, DashSet<u64>>> = Arc::new(DashMap::new());
933
934 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
935 let error_counts = error_counts.clone();
936 let client = client.clone();
937 let on_block = on_block.clone();
938 let on_tx = on_tx.clone();
939 let on_entry = on_entry.clone();
940 let on_reward = on_rewards.clone();
941 let on_error = on_error.clone();
942 let overall_slots_processed = overall_slots_processed.clone();
943 let overall_blocks_processed = overall_blocks_processed.clone();
944 let overall_transactions_processed = overall_transactions_processed.clone();
945 let overall_entries_processed = overall_entries_processed.clone();
946 let stats_tracking = stats_tracking.clone();
947 let transactions_since_stats = Arc::new(AtomicU64::new(0));
948 let blocks_since_stats = Arc::new(AtomicU64::new(0));
949 let slots_since_stats = Arc::new(AtomicU64::new(0));
950 let last_pulse = Arc::new(AtomicU64::new(0));
951 let transactions_since_stats_cloned = transactions_since_stats.clone();
952 let blocks_since_stats_cloned = blocks_since_stats.clone();
953 let slots_since_stats_cloned = slots_since_stats.clone();
954 let last_pulse_cloned = last_pulse.clone();
955 let shutdown_flag = shutdown_flag.clone();
956 let pending_skipped_slots = pending_skipped_slots.clone();
957 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
958 let sequential_mode = sequential;
959 let ripget_threads = sequential_download_threads;
960 let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
961 let ripget_client = shared_ripget_client.clone();
962
963 let handle = tokio::spawn(async move {
964 let transactions_since_stats = transactions_since_stats_cloned;
965 let blocks_since_stats = blocks_since_stats_cloned;
966 let slots_since_stats = slots_since_stats_cloned;
967 let last_pulse = last_pulse_cloned;
968 let mut shutdown_rx = thread_shutdown_rx;
969 let start_time = firehose_start;
970 last_pulse.store(
971 firehose_start.elapsed().as_nanos() as u64,
972 Ordering::Relaxed,
973 );
974 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
975 let mut skip_until_index = None;
976 let last_emitted_slot = slot_range.start.saturating_sub(1);
977 let block_enabled = on_block.is_some();
978 let tx_enabled = on_tx.is_some();
979 let entry_enabled = on_entry.is_some();
980 let reward_enabled = on_reward.is_some();
981 let tracking_enabled = stats_tracking.is_some();
982 if block_enabled {
983 pending_skipped_slots.entry(thread_index).or_default();
984 }
985 let mut last_counted_slot = slot_range.start.saturating_sub(1);
986 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
987 let mut thread_stats = if tracking_enabled {
988 Some(ThreadStats {
989 thread_id: thread_index,
990 start_time,
991 finish_time: None,
992 slot_range: slot_range.clone(),
993 initial_slot_range: slot_range.clone(),
994 current_slot: slot_range.start,
995 slots_processed: 0,
996 blocks_processed: 0,
997 leader_skipped_slots: 0,
998 transactions_processed: 0,
999 entries_processed: 0,
1000 rewards_processed: 0,
1001 })
1002 } else {
1003 None
1004 };
1005
1006 while let Err((err, slot)) = async {
1008 let mut last_emitted_slot = last_emitted_slot_global;
1009 let op_timeout = if sequential_mode {
1010 OP_TIMEOUT_SEQUENTIAL
1011 } else {
1012 OP_TIMEOUT
1013 };
1014 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1015 log::info!(
1016 target: &log_target,
1017 "shutdown requested; terminating firehose thread {}",
1018 thread_index
1019 );
1020 return Ok(());
1021 }
1022 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
1023 log::info!(
1024 target: &log_target,
1025 "slot range: {} (epoch {}) ... {} (epoch {})",
1026 slot_range.start,
1027 slot_to_epoch(slot_range.start),
1028 slot_range.end,
1029 slot_to_epoch(slot_range.end)
1030 );
1031
1032 log::info!(target: &log_target, "🚒 starting firehose...");
1033
1034 let mut current_slot: Option<u64> = None;
1036 for epoch_num in epoch_range.clone() {
1037 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1038 log::info!(
1039 target: &log_target,
1040 "shutdown requested; terminating firehose thread {}",
1041 thread_index
1042 );
1043 return Ok(());
1044 }
1045 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1046 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1047 let local_start = std::cmp::max(slot_range.start, epoch_start);
1048 let local_end_inclusive =
1049 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1050 if local_start > local_end_inclusive {
1051 log::debug!(
1052 target: &log_target,
1053 "epoch {} has no overlap with thread range ({}..{}), skipping",
1054 epoch_num,
1055 slot_range.start,
1056 slot_range.end
1057 );
1058 continue;
1059 }
1060 let use_sequential_stream = sequential_mode && local_start == epoch_start;
1061 let stream = match timeout(op_timeout, async {
1062 if use_sequential_stream {
1063 fetch_epoch_stream_with_options(
1064 epoch_num,
1065 &client,
1066 Some(FetchEpochStreamOptions {
1067 sequential: true,
1068 ripget_threads,
1069 buffer_window_bytes: ripget_buffer_window_bytes,
1070 ripget_client: ripget_client.clone(),
1071 }),
1072 )
1073 .await
1074 } else {
1075 fetch_epoch_stream(epoch_num, &client).await
1076 }
1077 })
1078 .await
1079 {
1080 Ok(stream) => stream,
1081 Err(_) => {
1082 return Err((
1083 FirehoseError::OperationTimeout("fetch_epoch_stream"),
1084 current_slot.unwrap_or(slot_range.start),
1085 ));
1086 }
1087 };
1088 let mut reader = NodeReader::new(stream);
1089
1090 let header_fut = reader.read_raw_header();
1091 let header = match timeout(op_timeout, header_fut).await {
1092 Ok(res) => res
1093 .map_err(FirehoseError::ReadHeader)
1094 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1095 Err(_) => {
1096 return Err((
1097 FirehoseError::OperationTimeout("read_raw_header"),
1098 current_slot.unwrap_or(slot_range.start),
1099 ));
1100 }
1101 };
1102 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1103
1104 let mut previous_blockhash = Hash::default();
1105 let mut latest_entry_blockhash = Hash::default();
1106 last_counted_slot = local_start.saturating_sub(1);
1109 current_slot = None;
1110 if tracking_enabled
1111 && let Some(ref mut stats) = thread_stats {
1112 stats.current_slot = local_start;
1113 stats.slot_range.start = local_start;
1114 }
1115
1116 if local_start > epoch_start {
1117 let seek_slot = match timeout(
1120 OP_TIMEOUT,
1121 find_previous_indexed_slot(local_start, epoch_start, &log_target),
1122 )
1123 .await
1124 {
1125 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1126 Err(_) => {
1127 return Err((
1128 FirehoseError::OperationTimeout(
1129 "seek_to_previous_indexed_slot",
1130 ),
1131 current_slot.unwrap_or(slot_range.start),
1132 ));
1133 }
1134 };
1135 if let Some(seek_slot) = seek_slot {
1136 let seek_fut = reader.seek_to_slot(seek_slot);
1137 match timeout(op_timeout, seek_fut).await {
1138 Ok(res) => {
1139 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1140 }
1141 Err(_) => {
1142 return Err((
1143 FirehoseError::OperationTimeout("seek_to_slot"),
1144 current_slot.unwrap_or(slot_range.start),
1145 ));
1146 }
1147 }
1148 }
1149 }
1150
1151 let mut item_index = 0;
1153 let mut displayed_skip_message = false;
1154 loop {
1155 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1156 log::info!(
1157 target: &log_target,
1158 "shutdown requested; terminating firehose thread {}",
1159 thread_index
1160 );
1161 return Ok(());
1162 }
1163 let read_fut = reader.read_until_block();
1164 let nodes = match timeout(op_timeout, read_fut).await {
1165 Ok(result) => result
1166 .map_err(FirehoseError::ReadUntilBlockError)
1167 .map_err(|e| {
1168 (
1169 e,
1170 current_slot
1171 .map(|slot| slot.saturating_add(1))
1172 .unwrap_or(slot_range.start),
1173 )
1174 })?,
1175 Err(_) => {
1176 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1177 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1178 }
1179 };
1180 if nodes.is_empty() {
1181 log::info!(
1182 target: &log_target,
1183 "reached end of epoch {}",
1184 epoch_num
1185 );
1186 break;
1187 }
1188 if let Some(last_node) = nodes.0.last()
1189 && !last_node.get_node().is_block()
1190 {
1191 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1192 break;
1193 }
1194 let block = nodes
1195 .get_block()
1196 .map_err(FirehoseError::GetBlockError)
1197 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1198 log::debug!(
1199 target: &log_target,
1200 "read {} items from epoch {}, now at slot {}",
1201 item_index,
1202 epoch_num,
1203 block.slot
1204 );
1205 let slot = block.slot;
1206 if slot > local_end_inclusive {
1207 log::debug!(
1208 target: &log_target,
1209 "reached end of local slice at slot {} (epoch {}), stopping",
1210 slot,
1211 epoch_num
1212 );
1213 break;
1214 }
1215 if slot >= slot_range.end {
1216 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1217 if block_enabled {
1222 pending_skipped_slots.remove(&thread_index);
1223 }
1224 return Ok(());
1225 }
1226 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1227 if slot < slot_range.start {
1228 if slot.saturating_add(1) == slot_range.start {
1229 log::debug!(
1230 target: &log_target,
1231 "priming reader with preceding slot {}, skipping",
1232 slot
1233 );
1234 } else {
1235 log::warn!(
1236 target: &log_target,
1237 "encountered slot {} before start of range {}, skipping",
1238 slot,
1239 slot_range.start
1240 );
1241 }
1242 continue;
1243 }
1244 current_slot = Some(slot);
1245 let mut entry_index: usize = 0;
1246 let mut this_block_executed_transaction_count: u64 = 0;
1247 let mut this_block_entry_count: u64 = 0;
1248 let mut this_block_rewards = DecodedRewards::empty();
1249
1250 for node_with_cid in &nodes.0 {
1251 item_index += 1;
1252 if let Some(skip) = skip_until_index {
1253 if item_index < skip {
1254 if !displayed_skip_message {
1255 log::info!(
1256 target: &log_target,
1257 "skipping until index {} (at {})",
1258 skip,
1259 item_index
1260 );
1261 displayed_skip_message = true;
1262 }
1263 continue;
1264 } else {
1265 log::info!(
1266 target: &log_target,
1267 "reached target index {}, resuming...",
1268 skip
1269 );
1270 skip_until_index = None;
1271 }
1272 }
1273 let node = node_with_cid.get_node();
1274
1275 if let Some(ref mut stats) = thread_stats {
1276 stats.current_slot = slot;
1277 }
1278
1279 let error_slot = current_slot.unwrap_or(slot_range.start);
1280
1281 use crate::node::Node::*;
1282 match node {
1283 Transaction(tx) => {
1284 if tx_enabled
1285 && let Some(on_tx_cb) = on_tx.as_ref()
1286 {
1287 let error_slot = current_slot.unwrap_or(slot_range.start);
1288 let versioned_tx = tx.as_parsed().map_err(|err| {
1289 (
1290 FirehoseError::NodeDecodingError(item_index, err),
1291 error_slot,
1292 )
1293 })?;
1294 let reassembled_metadata = nodes
1295 .reassemble_dataframes(&tx.metadata)
1296 .map_err(|err| {
1297 (
1298 FirehoseError::NodeDecodingError(item_index, err),
1299 error_slot,
1300 )
1301 })?;
1302
1303 let as_native_metadata = decode_transaction_status_meta_from_frame(
1304 block.slot,
1305 reassembled_metadata,
1306 )
1307 .map_err(|err| {
1308 (
1309 FirehoseError::NodeDecodingError(item_index, err),
1310 error_slot,
1311 )
1312 })?;
1313
1314 let message_hash = {
1315 #[cfg(feature = "verify-transaction-signatures")]
1316 {
1317 versioned_tx.verify_and_hash_message().map_err(|err| {
1318 (
1319 FirehoseError::TransactionHandlerError(Box::new(err)),
1320 error_slot,
1321 )
1322 })?
1323 }
1324 #[cfg(not(feature = "verify-transaction-signatures"))]
1325 {
1326 versioned_tx.message.hash()
1327 }
1328 };
1329 let signature = versioned_tx
1330 .signatures
1331 .first()
1332 .ok_or_else(|| {
1333 Box::new(std::io::Error::new(
1334 std::io::ErrorKind::InvalidData,
1335 "transaction missing signature",
1336 )) as SharedError
1337 })
1338 .map_err(|err| {
1339 (
1340 FirehoseError::NodeDecodingError(
1341 item_index,
1342 err,
1343 ),
1344 error_slot,
1345 )
1346 })?;
1347 let is_vote = is_simple_vote_transaction(&versioned_tx);
1348
1349 on_tx_cb(
1350 thread_index,
1351 TransactionData {
1352 slot: block.slot,
1353 transaction_slot_index: tx.index.unwrap() as usize,
1354 signature: *signature,
1355 message_hash,
1356 is_vote,
1357 transaction_status_meta: as_native_metadata,
1358 transaction: versioned_tx,
1359 },
1360 )
1361 .await
1362 .map_err(|e| {
1363 (
1364 FirehoseError::TransactionHandlerError(e),
1365 error_slot,
1366 )
1367 })?;
1368 }
1369 fetch_add_if(
1370 tracking_enabled,
1371 &overall_transactions_processed,
1372 1,
1373 );
1374 if let Some(ref mut stats) = thread_stats {
1375 stats.transactions_processed += 1;
1376 }
1377 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1378 }
1379 Entry(entry) => {
1380 let entry_hash = Hash::from(entry.hash.to_bytes());
1381 let entry_transaction_count = entry.transactions.len();
1382 let entry_transaction_count_u64 = entry_transaction_count as u64;
1383 let starting_transaction_index_u64 =
1384 this_block_executed_transaction_count;
1385 latest_entry_blockhash = entry_hash;
1386 this_block_executed_transaction_count += entry_transaction_count_u64;
1387 this_block_entry_count += 1;
1388
1389 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1390 let starting_transaction_index = usize::try_from(
1391 starting_transaction_index_u64,
1392 )
1393 .map_err(|err| {
1394 (
1395 FirehoseError::EntryHandlerError(Box::new(err)),
1396 error_slot,
1397 )
1398 })?;
1399 let transaction_indexes_end =
1400 starting_transaction_index + entry_transaction_count;
1401 on_entry_cb(
1402 thread_index,
1403 EntryData {
1404 slot: block.slot,
1405 entry_index,
1406 transaction_indexes: starting_transaction_index
1407 ..transaction_indexes_end,
1408 num_hashes: entry.num_hashes,
1409 hash: entry_hash,
1410 },
1411 )
1412 .await
1413 .map_err(|e| {
1414 (
1415 FirehoseError::EntryHandlerError(e),
1416 error_slot,
1417 )
1418 })?;
1419 }
1420 entry_index += 1;
1421 fetch_add_if(
1422 tracking_enabled,
1423 &overall_entries_processed,
1424 1,
1425 );
1426 if let Some(ref mut stats) = thread_stats {
1427 stats.entries_processed += 1;
1428 }
1429 }
1430 Block(block) => {
1431 let prev_last_counted_slot = last_counted_slot;
1432 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1433 (
1434 stats.slots_processed,
1435 stats.blocks_processed,
1436 stats.leader_skipped_slots,
1437 stats.current_slot,
1438 )
1439 });
1440
1441 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1442 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1443 let skip_start = skip_start_from_previous.max(next_expected_slot);
1444
1445 let skipped_epoch = slot_to_epoch(last_counted_slot);
1446 for skipped_slot in skip_start..slot {
1447 if slot_to_epoch(skipped_slot) != skipped_epoch {
1448 break;
1449 }
1450 log::debug!(
1451 target: &log_target,
1452 "leader skipped slot {} (prev_counted {}, current slot {})",
1453 skipped_slot,
1454 prev_last_counted_slot,
1455 slot,
1456 );
1457 if block_enabled {
1458 pending_skipped_slots
1459 .entry(thread_index)
1460 .or_default()
1461 .insert(skipped_slot);
1462 }
1463 if block_enabled
1464 && let Some(on_block_cb) = on_block.as_ref()
1465 && skipped_slot > last_emitted_slot {
1466 last_emitted_slot = skipped_slot;
1467 on_block_cb(
1468 thread_index,
1469 BlockData::PossibleLeaderSkipped {
1470 slot: skipped_slot,
1471 },
1472 )
1473 .await
1474 .map_err(|e| {
1475 (
1476 FirehoseError::BlockHandlerError(e),
1477 error_slot,
1478 )
1479 })?;
1480 }
1481 if tracking_enabled {
1482 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1483 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1484 if let Some(ref mut stats) = thread_stats {
1485 stats.leader_skipped_slots += 1;
1486 stats.slots_processed += 1;
1487 stats.current_slot = skipped_slot;
1488 }
1489 }
1490 last_counted_slot = skipped_slot;
1491 }
1492
1493 let cleared_pending_skip = if block_enabled {
1494 clear_pending_skip(
1495 &pending_skipped_slots,
1496 thread_index,
1497 slot,
1498 )
1499 } else {
1500 false
1501 };
1502
1503 if slot <= last_counted_slot && !cleared_pending_skip {
1504 log::debug!(
1505 target: &log_target,
1506 "duplicate block {}, already counted (last_counted={})",
1507 slot,
1508 last_counted_slot,
1509 );
1510 this_block_rewards = DecodedRewards::empty();
1511 continue;
1512 }
1513
1514 if block_enabled {
1515 if let Some(on_block_cb) = on_block.as_ref() {
1516 let DecodedRewards {
1517 keyed_rewards,
1518 num_partitions,
1519 } = std::mem::take(&mut this_block_rewards);
1520 if slot > last_emitted_slot {
1521 last_emitted_slot = slot;
1522 on_block_cb(
1523 thread_index,
1524 BlockData::Block {
1525 parent_slot: block.meta.parent_slot,
1526 parent_blockhash: previous_blockhash,
1527 slot: block.slot,
1528 blockhash: latest_entry_blockhash,
1529 rewards: KeyedRewardsAndNumPartitions {
1530 keyed_rewards,
1531 num_partitions,
1532 },
1533 block_time: Some(block.meta.blocktime as i64),
1534 block_height: block.meta.block_height,
1535 executed_transaction_count:
1536 this_block_executed_transaction_count,
1537 entry_count: this_block_entry_count,
1538 },
1539 )
1540 .await
1541 .map_err(|e| {
1542 (
1543 FirehoseError::BlockHandlerError(e),
1544 error_slot,
1545 )
1546 })?;
1547 }
1548 }
1549 } else {
1550 this_block_rewards = DecodedRewards::empty();
1551 }
1552 previous_blockhash = latest_entry_blockhash;
1553
1554 if tracking_enabled {
1555 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1556 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1557 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1558 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1559 if let Some(ref mut stats) = thread_stats {
1560 stats.blocks_processed += 1;
1561 stats.slots_processed += 1;
1562 stats.current_slot = slot;
1563 }
1564
1565 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1566 (&stats_tracking, thread_stats.as_mut())
1567 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1568 && let Err(err) = maybe_emit_stats(
1569 stats_tracking.as_ref(),
1570 thread_index,
1571 thread_stats_ref,
1572 &overall_slots_processed,
1573 &overall_blocks_processed,
1574 &overall_transactions_processed,
1575 &overall_entries_processed,
1576 &transactions_since_stats,
1577 &blocks_since_stats,
1578 &slots_since_stats,
1579 &last_pulse,
1580 start_time,
1581 )
1582 .await
1583 {
1584 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1585 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1586 overall_blocks_processed
1587 .fetch_sub(1, Ordering::Relaxed);
1588 overall_slots_processed
1589 .fetch_sub(1, Ordering::Relaxed);
1590 if let Some((
1591 prev_slots_processed,
1592 prev_blocks_processed,
1593 prev_leader_skipped,
1594 prev_current_slot,
1595 )) = thread_stats_snapshot
1596 {
1597 thread_stats_ref.slots_processed =
1598 prev_slots_processed;
1599 thread_stats_ref.blocks_processed =
1600 prev_blocks_processed;
1601 thread_stats_ref.leader_skipped_slots =
1602 prev_leader_skipped;
1603 thread_stats_ref.current_slot =
1604 prev_current_slot;
1605 }
1606 last_counted_slot = prev_last_counted_slot;
1607 return Err(err);
1608 }
1609 }
1610
1611 if slot > last_counted_slot {
1612 last_counted_slot = slot;
1613 }
1614 }
1615 Subset(_subset) => (),
1616 Epoch(_epoch) => (),
1617 Rewards(rewards) => {
1618 if reward_enabled || block_enabled {
1619 let reassembled = nodes
1620 .reassemble_dataframes(&rewards.data)
1621 .map_err(|err| {
1622 (
1623 FirehoseError::NodeDecodingError(item_index, err),
1624 current_slot.unwrap_or(slot_range.start),
1625 )
1626 })?;
1627 if reassembled.is_empty() {
1628 this_block_rewards = DecodedRewards::empty();
1629 if reward_enabled
1630 && let Some(on_reward_cb) = on_reward.as_ref()
1631 {
1632 on_reward_cb(
1633 thread_index,
1634 RewardsData {
1635 slot: block.slot,
1636 rewards: Vec::new(),
1637 },
1638 )
1639 .await
1640 .map_err(|e| {
1641 (
1642 FirehoseError::RewardHandlerError(e),
1643 error_slot,
1644 )
1645 })?;
1646 }
1647 continue;
1648 }
1649
1650 let decoded_rewards =
1651 decode_rewards_from_frame(block.slot, reassembled)
1652 .map_err(|err| {
1653 (
1654 FirehoseError::NodeDecodingError(
1655 item_index,
1656 err,
1657 ),
1658 error_slot,
1659 )
1660 })?;
1661 if reward_enabled
1662 && let Some(on_reward_cb) = on_reward.as_ref()
1663 {
1664 on_reward_cb(
1665 thread_index,
1666 RewardsData {
1667 slot: block.slot,
1668 rewards: decoded_rewards.keyed_rewards.clone(),
1669 },
1670 )
1671 .await
1672 .map_err(|e| {
1673 (
1674 FirehoseError::RewardHandlerError(e),
1675 error_slot,
1676 )
1677 })?;
1678 }
1679 this_block_rewards = decoded_rewards;
1680 if let Some(ref mut stats) = thread_stats {
1681 stats.rewards_processed +=
1682 this_block_rewards.keyed_rewards.len() as u64;
1683 }
1684 }
1685 }
1686 DataFrame(_data_frame) => (),
1687 }
1688 }
1689 if block.slot == slot_range.end - 1 {
1690 let finish_time = std::time::Instant::now();
1691 let elapsed = finish_time.duration_since(start_time);
1692 log::info!(target: &log_target, "processed slot {}", block.slot);
1693 let elapsed_pretty = human_readable_duration(elapsed);
1694 log::info!(
1695 target: &log_target,
1696 "processed {} slots across {} epochs in {}.",
1697 slot_range.end - slot_range.start,
1698 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1699 elapsed_pretty
1700 );
1701 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1702 let summary: String = error_counts
1705 .iter()
1706 .enumerate()
1707 .filter_map(|(i, c)| {
1708 let v = c.load(Ordering::Relaxed);
1709 if v > 0 {
1710 Some(format!("{:03}({})", i, v))
1711 } else {
1712 None
1713 }
1714 })
1715 .collect::<Vec<_>>()
1716 .join(", ");
1717 if !summary.is_empty() {
1718 log::debug!(target: &log_target, "threads with errors: {}", summary);
1719 }
1720 return Ok(());
1721 }
1722 }
1723 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1724 && last_counted_slot < expected_last_slot
1725 {
1726 }
1729 if let Some(ref mut stats) = thread_stats {
1730 stats.finish_time = Some(std::time::Instant::now());
1731 maybe_emit_stats(
1732 stats_tracking.as_ref(),
1733 thread_index,
1734 stats,
1735 &overall_slots_processed,
1736 &overall_blocks_processed,
1737 &overall_transactions_processed,
1738 &overall_entries_processed,
1739 &transactions_since_stats,
1740 &blocks_since_stats,
1741 &slots_since_stats,
1742 &last_pulse,
1743 start_time,
1744 )
1745 .await?;
1746 }
1747 if block_enabled {
1748 pending_skipped_slots.remove(&thread_index);
1749 }
1750 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1751 }
1752 Ok(())
1753 }
1754 .await
1755 {
1756 if is_shutdown_error(&err) {
1757 log::info!(
1758 target: &log_target,
1759 "shutdown requested; terminating firehose thread {}",
1760 thread_index
1761 );
1762 break;
1763 }
1764 let epoch = slot_to_epoch(slot);
1765 let item_index = match &err {
1766 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1767 _ => 0,
1768 };
1769 let error_message = err.to_string();
1770 log::error!(
1771 target: &log_target,
1772 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1773 slot,
1774 epoch
1775 );
1776 log::error!(target: &log_target, "{}", error_message);
1777 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1778 || error_message.contains("Unknown CID version")
1779 {
1780 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1783 }
1784 if let Some(on_error_cb) = on_error.clone() {
1785 let context = FirehoseErrorContext {
1786 thread_id: thread_index,
1787 slot,
1788 epoch,
1789 error_message: error_message.clone(),
1790 };
1791 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1792 log::error!(
1793 target: &log_target,
1794 "on_error handler failed: {}",
1795 handler_err
1796 );
1797 }
1798 }
1799 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1801 log::warn!(
1802 target: &log_target,
1803 "restarting from slot {} at index {}",
1804 slot,
1805 item_index,
1806 );
1807 if slot <= last_counted_slot {
1811 slot_range.start = last_counted_slot.saturating_add(1);
1812 } else {
1813 slot_range.start = slot;
1814 }
1815 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1817 if tracking_enabled
1818 && let Some(ref mut stats_ref) = thread_stats {
1819 stats_ref.slot_range.start = slot_range.start;
1820 stats_ref.slot_range.end = slot_range.end;
1821 }
1823 if block_enabled {
1824 pending_skipped_slots.remove(&thread_index);
1825 }
1826 skip_until_index = None;
1830 last_emitted_slot_global = last_emitted_slot;
1831 }
1832 });
1833 handles.push(handle);
1834 }
1835
1836 for handle in handles {
1838 handle.await.unwrap();
1839 }
1840 if stats_tracking.is_some() {
1841 let elapsed = firehose_start.elapsed();
1842 let elapsed_secs = elapsed.as_secs_f64();
1843 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1844 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1845 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1846 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1847 let total_errors: u64 = error_counts
1848 .iter()
1849 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1850 .sum();
1851 let overall_tps = if elapsed_secs > 0.0 {
1852 total_transactions as f64 / elapsed_secs
1853 } else {
1854 0.0
1855 };
1856 log::info!(
1857 target: LOG_MODULE,
1858 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1859 elapsed_secs,
1860 total_slots,
1861 total_blocks,
1862 total_leader_skipped,
1863 total_transactions,
1864 overall_tps,
1865 total_errors
1866 );
1867 }
1868 if shutdown_flag.load(Ordering::SeqCst) {
1869 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1870 } else {
1871 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1872 }
1873 Ok(())
1874}
1875
1876#[allow(clippy::result_large_err)]
1877pub fn firehose_geyser(
1884 rt: Arc<tokio::runtime::Runtime>,
1885 slot_range: Range<u64>,
1886 geyser_config_files: Option<&[PathBuf]>,
1887 index_base_url: &Url,
1888 client: &Client,
1889 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1890 threads: u64,
1891) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1892 if threads == 0 {
1893 return Err((
1894 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1895 slot_range.start,
1896 ));
1897 }
1898 log::info!(target: LOG_MODULE, "starting firehose...");
1899 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1900 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1901 let mut entry_notifier_maybe = None;
1902 let mut block_meta_notifier_maybe = None;
1903 let mut transaction_notifier_maybe = None;
1904 if let Some(geyser_config_files) = geyser_config_files {
1905 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1906
1907 let service =
1908 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1909 confirmed_bank_receiver.clone(),
1910 true,
1911 geyser_config_files,
1912 )
1913 .map_err(|e| (e.into(), slot_range.start))?;
1914
1915 transaction_notifier_maybe = Some(
1916 service
1917 .get_transaction_notifier()
1918 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
1919 .map_err(|e| (e, slot_range.start))?,
1920 );
1921
1922 entry_notifier_maybe = service.get_entry_notifier();
1923 block_meta_notifier_maybe = service.get_block_metadata_notifier();
1924
1925 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
1926 }
1927
1928 if entry_notifier_maybe.is_some() {
1929 log::debug!(target: LOG_MODULE, "entry notifications enabled")
1930 } else {
1931 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
1932 }
1933 log::info!(target: LOG_MODULE, "running on_load...");
1934 rt.spawn(on_load);
1935
1936 let slot_range = Arc::new(slot_range);
1937 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
1938 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
1939 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
1940 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
1941
1942 let subranges = generate_subranges(&slot_range, threads);
1944 if threads > 1 {
1945 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
1946 }
1947
1948 let mut handles = Vec::new();
1949 let error_counts: Arc<Vec<AtomicU32>> =
1951 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
1952
1953 for (i, slot_range) in subranges.into_iter().enumerate() {
1954 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
1955 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
1956 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
1957 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
1958 let client = client.clone();
1959 let error_counts = error_counts.clone();
1960
1961 let rt_clone = rt.clone();
1962
1963 let handle = std::thread::spawn(move || {
1964 rt_clone.block_on(async {
1965 firehose_geyser_thread(
1966 slot_range,
1967 transaction_notifier_maybe,
1968 entry_notifier_maybe,
1969 block_meta_notifier_maybe,
1970 confirmed_bank_sender,
1971 &client,
1972 if threads > 1 { Some(i) } else { None },
1973 error_counts,
1974 )
1975 .await
1976 .unwrap();
1977 });
1978 });
1979 handles.push(handle);
1980 }
1981
1982 for handle in handles {
1984 handle.join().unwrap();
1985 }
1986 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1987 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
1988 block_meta_notifier.notify_block_metadata(
1989 u64::MAX,
1990 "unload",
1991 u64::MAX,
1992 "unload",
1993 &KeyedRewardsAndNumPartitions {
1994 keyed_rewards: vec![],
1995 num_partitions: None,
1996 },
1997 None,
1998 None,
1999 0,
2000 0,
2001 );
2002 }
2003 Ok(confirmed_bank_receiver)
2004}
2005
2006#[allow(clippy::too_many_arguments)]
2007#[allow(clippy::result_large_err)]
2008async fn firehose_geyser_thread(
2009 mut slot_range: Range<u64>,
2010 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2011 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2012 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2013 confirmed_bank_sender: Sender<SlotNotification>,
2014 client: &Client,
2015 thread_index: Option<usize>,
2016 error_counts: Arc<Vec<AtomicU32>>,
2017) -> Result<(), (FirehoseError, u64)> {
2018 let start_time = std::time::Instant::now();
2019 let log_target = if let Some(thread_index) = thread_index {
2020 format!("{}::T{:03}", LOG_MODULE, thread_index)
2021 } else {
2022 LOG_MODULE.to_string()
2023 };
2024 let initial_slot_range = slot_range.clone();
2025 let mut skip_until_index = None;
2026 let mut last_counted_slot = slot_range.start.saturating_sub(1);
2027 while let Err((err, slot)) = async {
2029 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2030 log::info!(
2031 target: &log_target,
2032 "slot range: {} (epoch {}) ... {} (epoch {})",
2033 slot_range.start,
2034 slot_to_epoch(slot_range.start),
2035 slot_range.end,
2036 slot_to_epoch(slot_range.end)
2037 );
2038
2039 log::info!(target: &log_target, "🚒 starting firehose...");
2040
2041 let mut current_slot: Option<u64> = None;
2043 for epoch_num in epoch_range.clone() {
2044 log::info!(target: &log_target, "entering epoch {}", epoch_num);
2045 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2046 Ok(stream) => stream,
2047 Err(_) => {
2048 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2049 }
2050 };
2051 let mut reader = NodeReader::new(stream);
2052
2053 let header_fut = reader.read_raw_header();
2054 let header = match timeout(OP_TIMEOUT, header_fut).await {
2055 Ok(res) => res
2056 .map_err(FirehoseError::ReadHeader)
2057 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2058 Err(_) => {
2059 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2060 }
2061 };
2062 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2063
2064 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2065 let local_start = std::cmp::max(slot_range.start, epoch_start);
2066 let local_end_inclusive =
2067 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2068 if local_start > local_end_inclusive {
2069 log::debug!(
2070 target: &log_target,
2071 "epoch {} has no overlap with thread range ({}..{}), skipping",
2072 epoch_num,
2073 slot_range.start,
2074 slot_range.end
2075 );
2076 continue;
2077 }
2078
2079 let mut todo_previous_blockhash = Hash::default();
2080 let mut todo_latest_entry_blockhash = Hash::default();
2081 last_counted_slot = local_start.saturating_sub(1);
2084 current_slot = None;
2085
2086 if local_start > epoch_start {
2087 let seek_slot = match timeout(
2090 OP_TIMEOUT,
2091 find_previous_indexed_slot(local_start, epoch_start, &log_target),
2092 )
2093 .await
2094 {
2095 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2096 Err(_) => {
2097 return Err((
2098 FirehoseError::OperationTimeout(
2099 "seek_to_previous_indexed_slot",
2100 ),
2101 current_slot.unwrap_or(slot_range.start),
2102 ));
2103 }
2104 };
2105 if let Some(seek_slot) = seek_slot {
2106 let seek_fut = reader.seek_to_slot(seek_slot);
2107 match timeout(OP_TIMEOUT, seek_fut).await {
2108 Ok(res) => {
2109 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2110 }
2111 Err(_) => {
2112 return Err((
2113 FirehoseError::OperationTimeout("seek_to_slot"),
2114 current_slot.unwrap_or(slot_range.start),
2115 ));
2116 }
2117 }
2118 }
2119 }
2120
2121 let mut item_index = 0;
2123 let mut displayed_skip_message = false;
2124 loop {
2125 let read_fut = reader.read_until_block();
2126 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2127 Ok(result) => result
2128 .map_err(FirehoseError::ReadUntilBlockError)
2129 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2130 Err(_) => {
2131 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2132 let restart_slot =
2133 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2134 return Err((
2135 FirehoseError::OperationTimeout("read_until_block"),
2136 restart_slot,
2137 ));
2138 }
2139 };
2140 if nodes.is_empty() {
2141 log::info!(
2142 target: &log_target,
2143 "reached end of epoch {}",
2144 epoch_num
2145 );
2146 break;
2147 }
2148 if let Some(last_node) = nodes.0.last()
2157 && !last_node.get_node().is_block() {
2158 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2159 break;
2160 }
2161 let block = nodes
2162 .get_block()
2163 .map_err(FirehoseError::GetBlockError)
2164 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2165 log::debug!(
2166 target: &log_target,
2167 "read {} items from epoch {}, now at slot {}",
2168 item_index,
2169 epoch_num,
2170 block.slot
2171 );
2172 let slot = block.slot;
2173 if slot > local_end_inclusive {
2174 log::debug!(
2175 target: &log_target,
2176 "reached end of local slice at slot {} (epoch {}), stopping",
2177 slot,
2178 epoch_num
2179 );
2180 break;
2181 }
2182 if slot >= slot_range.end {
2183 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2184 return Ok(());
2188 }
2189 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2190 if slot < local_start {
2191 if slot.saturating_add(1) == local_start {
2192 log::debug!(
2193 target: &log_target,
2194 "priming reader with preceding slot {}, skipping",
2195 slot
2196 );
2197 } else {
2198 log::warn!(
2199 target: &log_target,
2200 "encountered slot {} before start of range {}, skipping",
2201 slot,
2202 local_start
2203 );
2204 }
2205 continue;
2206 }
2207 current_slot = Some(slot);
2208 let mut entry_index: usize = 0;
2209 let mut this_block_executed_transaction_count: u64 = 0;
2210 let mut this_block_entry_count: u64 = 0;
2211 let mut this_block_rewards = DecodedRewards::empty();
2212
2213 if slot <= last_counted_slot {
2214 log::debug!(
2215 target: &log_target,
2216 "duplicate block {}, already counted (last_counted={})",
2217 slot,
2218 last_counted_slot,
2219 );
2220 continue;
2221 }
2222
2223 nodes.each(|node_with_cid| -> Result<(), SharedError> {
2224 item_index += 1;
2225 if let Some(skip) = skip_until_index {
2231 if item_index < skip {
2232 if !displayed_skip_message {
2233 log::info!(
2234 target: &log_target,
2235 "skipping until index {} (at {})",
2236 skip,
2237 item_index
2238 );
2239 displayed_skip_message = true;
2240 }
2241 return Ok(());
2242 } else {
2243 log::info!(
2244 target: &log_target,
2245 "reached target index {}, resuming...",
2246 skip
2247 );
2248 skip_until_index = None;
2249 }
2250 }
2251 let node = node_with_cid.get_node();
2252
2253 use crate::node::Node::*;
2254 match node {
2255 Transaction(tx) => {
2256 let versioned_tx = tx.as_parsed()?;
2257 let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2258
2259 let as_native_metadata = decode_transaction_status_meta_from_frame(
2260 block.slot,
2261 reassembled_metadata,
2262 )?;
2263
2264 let message_hash = {
2265 #[cfg(feature = "verify-transaction-signatures")]
2266 {
2267 versioned_tx.verify_and_hash_message()?
2268 }
2269 #[cfg(not(feature = "verify-transaction-signatures"))]
2270 {
2271 versioned_tx.message.hash()
2274 }
2275 };
2276 let signature = versioned_tx
2277 .signatures
2278 .first()
2279 .ok_or_else(|| {
2280 Box::new(std::io::Error::new(
2281 std::io::ErrorKind::InvalidData,
2282 "transaction missing signature",
2283 )) as SharedError
2284 })?;
2285 let is_vote = is_simple_vote_transaction(&versioned_tx);
2286
2287 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2288 transaction_notifier.notify_transaction(
2289 block.slot,
2290 tx.index.unwrap() as usize,
2291 signature,
2292 &message_hash,
2293 is_vote,
2294 &as_native_metadata,
2295 &versioned_tx,
2296 );
2297 }
2298
2299 }
2300 Entry(entry) => {
2301 let entry_hash = Hash::from(entry.hash.to_bytes());
2302 let entry_transaction_count = entry.transactions.len();
2303 let entry_transaction_count_u64 = entry_transaction_count as u64;
2304 let starting_transaction_index =
2305 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2306 Box::new(std::io::Error::other(
2307 "transaction index exceeds usize range",
2308 )) as SharedError
2309 })?;
2310 todo_latest_entry_blockhash = entry_hash;
2311 this_block_executed_transaction_count += entry_transaction_count_u64;
2312 this_block_entry_count += 1;
2313 if entry_notifier_maybe.is_none() {
2314 return Ok(());
2315 }
2316 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2317 let entry_summary = solana_entry::entry::EntrySummary {
2318 num_hashes: entry.num_hashes,
2319 hash: Hash::from(entry.hash.to_bytes()),
2320 num_transactions: entry_transaction_count_u64,
2321 };
2322 entry_notifier.notify_entry(
2323 block.slot,
2324 entry_index,
2325 &entry_summary,
2326 starting_transaction_index,
2327 );
2328 entry_index += 1;
2329 }
2330 Block(block) => {
2331 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2332 confirmed_bank_sender.send(notification).unwrap();
2333
2334 if block_meta_notifier_maybe.is_none() {
2335 last_counted_slot = block.slot;
2336 return Ok(());
2337 }
2338 let DecodedRewards {
2339 keyed_rewards,
2340 num_partitions,
2341 } = std::mem::take(&mut this_block_rewards);
2342 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2343 block_meta_notifier.notify_block_metadata(
2344 block.meta.parent_slot,
2345 todo_previous_blockhash.to_string().as_str(),
2346 block.slot,
2347 todo_latest_entry_blockhash.to_string().as_str(),
2348 &KeyedRewardsAndNumPartitions {
2349 keyed_rewards,
2350 num_partitions,
2351 },
2352 Some(block.meta.blocktime as i64),
2353 block.meta.block_height,
2354 this_block_executed_transaction_count,
2355 this_block_entry_count,
2356 );
2357 todo_previous_blockhash = todo_latest_entry_blockhash;
2358 last_counted_slot = block.slot;
2359 std::thread::yield_now();
2360 }
2361 Subset(_subset) => (),
2362 Epoch(_epoch) => (),
2363 Rewards(rewards) => {
2364 let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2365 if !reassembled.is_empty() {
2366 this_block_rewards = decode_rewards_from_frame(
2367 block.slot,
2368 reassembled,
2369 )?;
2370 } else {
2371 this_block_rewards = DecodedRewards::empty();
2372 }
2373 }
2374 DataFrame(_data_frame) => (),
2375 }
2376 Ok(())
2377 })
2378 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2379 if block.slot == slot_range.end - 1 {
2380 let finish_time = std::time::Instant::now();
2381 let elapsed = finish_time.duration_since(start_time);
2382 log::info!(target: &log_target, "processed slot {}", block.slot);
2383 let elapsed_pretty = human_readable_duration(elapsed);
2384 log::info!(
2385 target: &log_target,
2386 "processed {} slots across {} epochs in {}.",
2387 initial_slot_range.end - initial_slot_range.start,
2388 slot_to_epoch(initial_slot_range.end)
2389 + 1
2390 - slot_to_epoch(initial_slot_range.start),
2391 elapsed_pretty
2392 );
2393 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2394 let summary: String = error_counts
2397 .iter()
2398 .enumerate()
2399 .filter_map(|(i, c)| {
2400 let v = c.load(Ordering::Relaxed);
2401 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2402 })
2403 .collect::<Vec<_>>()
2404 .join(", ");
2405 if !summary.is_empty() {
2406 log::debug!(target: &log_target, "threads with errors: {}", summary);
2407 }
2408 return Ok(());
2409 }
2410 }
2411 }
2412 Ok(())
2413}
2414.await
2415{
2416 if is_shutdown_error(&err) {
2417 log::info!(
2418 target: &log_target,
2419 "shutdown requested; terminating firehose thread {:?}",
2420 thread_index
2421 );
2422 return Ok(());
2423 }
2424 log::error!(
2425 target: &log_target,
2426 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2427 slot,
2428 slot_to_epoch(slot)
2429 );
2430 log::error!(target: &log_target, "{}", err);
2431 let error_message = err.to_string();
2432 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2433 || error_message.contains("Unknown CID version")
2434 {
2435 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2438 }
2439 let item_index = match err {
2440 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2441 _ => 0,
2442 };
2443 let idx = thread_index.unwrap_or(0);
2445 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2446 log::warn!(
2447 target: &log_target,
2448 "restarting from slot {} at index {}",
2449 slot,
2450 item_index,
2451 );
2452 if slot <= last_counted_slot {
2455 slot_range.start = last_counted_slot.saturating_add(1);
2456 } else {
2457 slot_range.start = slot;
2458 }
2459 skip_until_index = None;
2463}
2464 Ok(())
2465}
2466
2467#[inline]
2468fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2469 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2470 return false;
2471 }
2472
2473 if !matches!(
2474 versioned_tx.version(),
2475 solana_transaction::versioned::TransactionVersion::Legacy(_)
2476 ) {
2477 return false;
2478 }
2479
2480 let instructions = versioned_tx.message.instructions();
2481 if instructions.len() != 1 {
2482 return false;
2483 }
2484
2485 let program_index = instructions[0].program_id_index as usize;
2486 versioned_tx
2487 .message
2488 .static_account_keys()
2489 .get(program_index)
2490 .map(|program_id| program_id == &vote_program_id())
2491 .unwrap_or(false)
2492}
2493
2494#[inline(always)]
2495fn convert_proto_rewards(
2496 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2497) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2498 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2499 for proto_reward in proto_rewards.rewards.iter() {
2500 let reward = RewardInfo {
2501 reward_type: match proto_reward.reward_type - 1 {
2502 0 => RewardType::Fee,
2503 1 => RewardType::Rent,
2504 2 => RewardType::Staking,
2505 3 => RewardType::Voting,
2506 typ => {
2507 return Err(Box::new(std::io::Error::other(format!(
2508 "unsupported reward type {}",
2509 typ
2510 ))));
2511 }
2512 },
2513 lamports: proto_reward.lamports,
2514 post_balance: proto_reward.post_balance,
2515 commission: proto_reward.commission.parse::<u8>().ok(),
2516 };
2517 let pubkey = proto_reward
2518 .pubkey
2519 .parse::<Address>()
2520 .map_err(|err| Box::new(err) as SharedError)?;
2521 keyed_rewards.push((pubkey, reward));
2522 }
2523 Ok(keyed_rewards)
2524}
2525
2526#[inline]
2527pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2529 let total = slot_range.end - slot_range.start;
2530 let slots_per_thread = total / threads;
2531 let remainder = total % threads;
2532
2533 let ranges: Vec<Range<u64>> = (0..threads)
2534 .map(|i| {
2535 let extra_slot = if i < remainder { 1 } else { 0 };
2537 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2538 let end = start + slots_per_thread + extra_slot;
2539 start..end
2540 })
2541 .collect();
2542
2543 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2545 assert_eq!(
2546 total_covered, total,
2547 "Range generation failed: {} threads should cover {} slots but only cover {}",
2548 threads, total, total_covered
2549 );
2550
2551 for i in 1..ranges.len() {
2553 assert_eq!(
2554 ranges[i - 1].end,
2555 ranges[i].start,
2556 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2557 i - 1,
2558 ranges[i - 1].end,
2559 i,
2560 ranges[i].start
2561 );
2562 }
2563
2564 log::info!(
2565 target: LOG_MODULE,
2566 "Generated {} thread ranges covering {} slots total",
2567 threads,
2568 total_covered
2569 );
2570 ranges
2571}
2572
/// Formats `duration` as a short human-readable string.
///
/// - Zero renders as `"0s"`.
/// - Under one minute: whole seconds render as `"{n}s"`; sub-second durations and
///   durations with a non-zero millisecond part render with two decimals (`"1.50s"`).
/// - One minute and above: only the two most significant units are shown
///   (`"1d2h"`, `"3h4m"`, `"5m6s"`), and a zero second unit is omitted (`"1d"`, `"3h"`).
fn human_readable_duration(duration: std::time::Duration) -> String {
    if duration.is_zero() {
        return String::from("0s");
    }

    let total_secs = duration.as_secs();
    if total_secs < 60 {
        // Fractions are shown for sub-second values and whenever milliseconds are present.
        let show_fraction = total_secs == 0 || duration.subsec_millis() != 0;
        return if show_fraction {
            format!("{:.2}s", duration.as_secs_f64())
        } else {
            format!("{}s", total_secs)
        };
    }

    let days = total_secs / 86_400;
    let hours = total_secs % 86_400 / 3_600;
    let minutes = total_secs % 3_600 / 60;
    let secs = total_secs % 60;

    match (days, hours, minutes, secs) {
        (0, 0, 0, secs) => format!("{secs}s"),
        (0, 0, minutes, 0) => format!("{minutes}m"),
        (0, 0, minutes, secs) => format!("{minutes}m{secs}s"),
        (0, hours, 0, _) => format!("{hours}h"),
        (0, hours, minutes, _) => format!("{hours}h{minutes}m"),
        (days, 0, _, _) => format!("{days}d"),
        (days, hours, _, _) => format!("{days}d{hours}h"),
    }
}
2618
/// Stats callback used by the tests: logs a one-line progress summary for `thread_id`.
///
/// Throughput is `transactions_processed` divided by the seconds elapsed since
/// `stats.start_time`, reported as `0.0` when no time has elapsed. The returned future
/// always resolves to `Ok(())`.
#[cfg(test)]
fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
    Box::pin(async move {
        let elapsed_secs = stats.start_time.elapsed().as_secs_f64();
        let processed = stats.transactions_processed;
        // Guard the division so a zero elapsed time reports zero throughput.
        let tps = if elapsed_secs > 0.0 {
            processed as f64 / elapsed_secs
        } else {
            0.0
        };
        log::info!(
            target: LOG_MODULE,
            "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
            stats.thread_stats.current_slot,
            stats.slots_processed,
            stats.blocks_processed,
            processed,
            stats.entries_processed,
            stats.rewards_processed,
            elapsed_secs,
            tps
        );
        Ok(())
    })
}
2644
2645#[cfg(test)]
2646use futures_util::FutureExt;
2647#[cfg(test)]
2648use serial_test::serial;
2649#[cfg(test)]
2650use std::sync::{Mutex, OnceLock};
2651
/// Test helper: streams the single slot `slot` through `firehose` and checks its
/// executed-transaction count.
///
/// Asserts that a non-skipped `BlockData::Block` for `slot` was delivered and that its
/// `executed_transaction_count` is non-zero and at least `min_executed`. The number of
/// non-vote transactions observed for the slot is logged but not asserted.
#[cfg(test)]
async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    };

    // Shared cells written by the handlers and read back once the run completes.
    let block_seen = Arc::new(AtomicBool::new(false));
    let executed_total = Arc::new(AtomicU64::new(0));
    let non_vote_total = Arc::new(AtomicU64::new(0));

    let block_seen_writer = Arc::clone(&block_seen);
    let executed_total_writer = Arc::clone(&executed_total);
    let non_vote_total_writer = Arc::clone(&non_vote_total);
    let target_slot_block = slot;

    firehose(
        1,
        false,
        None,
        slot..(slot + 1),
        Some(move |_thread_id: usize, block: BlockData| {
            let block_seen_writer = Arc::clone(&block_seen_writer);
            let executed_total_writer = Arc::clone(&executed_total_writer);
            async move {
                if block.slot() != target_slot_block {
                    return Ok(());
                }
                assert!(
                    !block.was_skipped(),
                    "slot {target_slot_block} was marked leader skipped",
                );
                if let BlockData::Block {
                    executed_transaction_count,
                    ..
                } = block
                {
                    block_seen_writer.store(true, Ordering::Relaxed);
                    executed_total_writer.store(executed_transaction_count, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        Some(move |_thread_id: usize, transaction: TransactionData| {
            let non_vote_total_writer = Arc::clone(&non_vote_total_writer);
            async move {
                // Only non-vote transactions belonging to the target slot are counted.
                let counts = transaction.slot == slot && !transaction.is_vote;
                if counts {
                    non_vote_total_writer.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    assert!(
        block_seen.load(Ordering::Relaxed),
        "target slot {slot} was not processed"
    );
    let observed_total = executed_total.load(Ordering::Relaxed);
    let observed_non_vote = non_vote_total.load(Ordering::Relaxed);
    assert!(
        observed_total > 0,
        "slot {slot} executed transaction count was zero"
    );
    assert!(
        observed_total >= min_executed,
        "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
    );
    log::info!(
        target: LOG_MODULE,
        "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
        observed_total,
        observed_non_vote
    );
}
2734
/// Debug helper: reads the nodes for `slot` and logs a per-kind summary.
///
/// Seeks an epoch reader to `slot`, reads nodes up to the next block, and logs how many
/// nodes of each kind were seen along with the kind of the first node. For a non-zero
/// `slot` it then looks up offsets for up to five preceding slots, logging each failed
/// lookup, and reports the nearest one that resolved (or that none did).
///
/// # Errors
///
/// Returns an error if seeking to the slot or reading the nodes fails.
#[cfg(test)]
async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
    use crate::index::slot_to_offset;
    use crate::node::Node;

    let client = crate::network::create_http_client();
    let stream = fetch_epoch_stream(slot_to_epoch(slot), &client).await;
    let mut reader = NodeReader::new(stream);
    reader
        .seek_to_slot(slot)
        .await
        .map_err(|err| Box::new(err) as SharedError)?;
    let nodes = reader.read_until_block().await?;

    let first_kind = nodes.0.first().map_or("none", |first| match first.get_node() {
        Node::Transaction(_) => "transaction",
        Node::Entry(_) => "entry",
        Node::Block(_) => "block",
        Node::Subset(_) => "subset",
        Node::Epoch(_) => "epoch",
        Node::Rewards(_) => "rewards",
        Node::DataFrame(_) => "dataframe",
    });

    // Per-kind tallies, plus details taken from the block node when one is present.
    let mut tx_nodes = 0u64;
    let mut entry_nodes = 0u64;
    let mut entry_tx_total = 0u64;
    let mut dataframe_nodes = 0u64;
    let mut reward_nodes = 0u64;
    let mut subset_nodes = 0u64;
    let mut epoch_nodes = 0u64;
    let mut block_slot = None;
    let mut block_entries = None;

    for item in &nodes.0 {
        match item.get_node() {
            Node::Transaction(_) => tx_nodes += 1,
            Node::Entry(entry) => {
                entry_nodes += 1;
                entry_tx_total += entry.transactions.len() as u64;
            }
            Node::Block(block) => {
                block_slot = Some(block.slot);
                block_entries = Some(block.entries.len());
            }
            Node::Subset(_) => subset_nodes += 1,
            Node::Epoch(_) => epoch_nodes += 1,
            Node::Rewards(_) => reward_nodes += 1,
            Node::DataFrame(_) => dataframe_nodes += 1,
        }
    }

    log::info!(
        target: LOG_MODULE,
        "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
        nodes.len(),
        first_kind,
        tx_nodes,
        entry_nodes,
        entry_tx_total,
        block_slot,
        block_entries,
        dataframe_nodes,
        reward_nodes,
        subset_nodes,
        epoch_nodes
    );

    // Slot zero has no predecessors to probe.
    if slot == 0 {
        return Ok(());
    }

    // Walk backwards one slot at a time, stopping at the first offset that resolves.
    let mut found_previous = None;
    let mut delta: u64 = 1;
    while delta <= 5 && found_previous.is_none() {
        let candidate = slot.saturating_sub(delta);
        match slot_to_offset(candidate).await {
            Ok(offset) => found_previous = Some((candidate, offset)),
            Err(err) => {
                log::info!(
                    target: LOG_MODULE,
                    "slot {slot} previous lookup {candidate} failed: {err}"
                );
            }
        }
        delta += 1;
    }

    match found_previous {
        Some((candidate, offset)) => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
            );
        }
        None => {
            log::info!(
                target: LOG_MODULE,
                "slot {slot} no previous offsets found within 5 slots"
            );
        }
    }

    Ok(())
}
2850
2851#[tokio::test(flavor = "multi_thread")]
2852async fn test_firehose_epoch_800() {
2853 use dashmap::DashSet;
2854 use std::sync::atomic::{AtomicU64, Ordering};
2855 solana_logger::setup_with_default("info");
2856 const THREADS: usize = 4;
2857 const NUM_SLOTS_TO_COVER: u64 = 50;
2858 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2859 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2860 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2861 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2862 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2863 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2864 let stats_tracking = StatsTracking {
2865 on_stats: log_stats_handler,
2866 tracking_interval_slots: 10,
2867 };
2868
2869 for prev in PREV_BLOCK.iter() {
2870 prev.store(0, Ordering::Relaxed);
2871 }
2872 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2873 NUM_BLOCKS.store(0, Ordering::Relaxed);
2874 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2875 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2876 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2877
2878 firehose(
2879 THREADS.try_into().unwrap(),
2880 false,
2881 None,
2882 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2883 Some(|thread_id: usize, block: BlockData| {
2884 async move {
2885 let _prev =
2886 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2887 if block.was_skipped() {
2888 log::info!(
2889 target: LOG_MODULE,
2890 "leader skipped block {} on thread {}",
2891 block.slot(),
2892 thread_id,
2893 );
2894 } else {
2895 }
2902
2903 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2904 if block.was_skipped() {
2905 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2906 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2907 } else {
2908 if first_time {
2909 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2910 if let BlockData::Block {
2911 executed_transaction_count,
2912 ..
2913 } = &block
2914 {
2915 let executed = *executed_transaction_count;
2916 let _ = MIN_TRANSACTIONS.fetch_update(
2917 Ordering::Relaxed,
2918 Ordering::Relaxed,
2919 |current| {
2920 if executed < current {
2921 Some(executed)
2922 } else {
2923 None
2924 }
2925 },
2926 );
2927 }
2928 }
2929 }
2930 Ok(())
2931 }
2932 .boxed()
2933 }),
2934 None::<OnTxFn>,
2935 None::<OnEntryFn>,
2936 None::<OnRewardFn>,
2937 None::<OnErrorFn>,
2938 Some(stats_tracking),
2939 None,
2940 )
2941 .await
2942 .unwrap();
2943 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
2944 assert_eq!(
2945 seen, NUM_SLOTS_TO_COVER,
2946 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
2947 );
2948 let mut skipped: Vec<u64> = SEEN_SKIPPED
2949 .get_or_init(DashSet::new)
2950 .iter()
2951 .map(|v| *v)
2952 .collect();
2953 skipped.sort_unstable();
2954 const EXPECTED_SKIPPED: [u64; 6] = [
2956 345_600_004,
2957 345_600_005,
2958 345_600_008,
2959 345_600_009,
2960 345_600_010,
2961 345_600_011,
2962 ];
2963 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
2964 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
2965}
2966
2967#[tokio::test(flavor = "multi_thread")]
2968async fn test_firehose_target_slot_transactions() {
2969 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2970 solana_logger::setup_with_default("info");
2971 const TARGET_SLOT: u64 = 376_273_722;
2972 const SLOT_RADIUS: u64 = 50;
2973 const EXPECTED_TRANSACTIONS: u64 = 1414;
2974 const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
2975 static FOUND: AtomicBool = AtomicBool::new(false);
2976 static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
2977 static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
2978
2979 FOUND.store(false, Ordering::Relaxed);
2980 OBSERVED_TXS.store(0, Ordering::Relaxed);
2981 OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
2982
2983 firehose(
2984 4,
2985 false,
2986 None,
2987 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
2988 Some(|_thread_id: usize, block: BlockData| {
2989 async move {
2990 if block.slot() == TARGET_SLOT {
2991 assert!(
2992 !block.was_skipped(),
2993 "target slot {TARGET_SLOT} was marked leader skipped",
2994 );
2995 if let BlockData::Block {
2996 executed_transaction_count,
2997 ..
2998 } = block
2999 {
3000 OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
3001 FOUND.store(true, Ordering::Relaxed);
3002 assert_eq!(
3003 executed_transaction_count, EXPECTED_TRANSACTIONS,
3004 "unexpected transaction count for slot {TARGET_SLOT}"
3005 );
3006 assert_eq!(
3007 OBSERVED_NON_VOTE.load(Ordering::Relaxed),
3008 EXPECTED_NON_VOTE_TRANSACTIONS,
3009 "unexpected non-vote transaction count for slot {TARGET_SLOT}"
3010 );
3011 }
3012 }
3013 Ok(())
3014 }
3015 .boxed()
3016 }),
3017 Some(|_thread_id: usize, transaction: TransactionData| {
3018 async move {
3019 if transaction.slot == TARGET_SLOT && !transaction.is_vote {
3020 OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
3021 }
3022 Ok(())
3023 }
3024 .boxed()
3025 }),
3026 None::<OnEntryFn>,
3027 None::<OnRewardFn>,
3028 None::<OnErrorFn>,
3029 None::<OnStatsTrackingFn>,
3030 None,
3031 )
3032 .await
3033 .unwrap();
3034
3035 assert!(
3036 FOUND.load(Ordering::Relaxed),
3037 "target slot was not processed"
3038 );
3039 assert_eq!(
3040 OBSERVED_TXS.load(Ordering::Relaxed),
3041 EXPECTED_TRANSACTIONS,
3042 "recorded transaction count mismatch"
3043 );
3044}
3045
/// Streams `SLOT_COUNT` slots either side of the first slot of epoch 900 and checks that
/// transaction slots never go backwards.
///
/// The transaction handler keeps the last observed slot behind a mutex and asserts that
/// each new transaction's slot is not lower than it. After the run the test asserts that
/// at least one transaction was observed.
///
/// NOTE(review): the `true` flag and the 4GiB buffer window presumably select the
/// sequential mode named in the test — confirm against `firehose`'s signature.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    };

    solana_logger::setup_with_default("info");
    const SLOT_COUNT: u64 = 100;
    const THREADS: u64 = 4;
    const TEST_BUFFER_WINDOW: &str = "4GiB";

    let epoch_900_start = epoch_to_slot_range(900).0;
    let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);

    let last_seen_tx_slot = Arc::new(Mutex::new(slot_range.start));
    let observed_txs = Arc::new(AtomicU64::new(0));
    let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
        .expect("valid test buffer window");
    let stats_tracking = StatsTracking {
        on_stats: log_stats_handler,
        tracking_interval_slots: 100,
    };

    // Handles owned by the transaction handler; the originals are read after the run.
    let slot_tracker = Arc::clone(&last_seen_tx_slot);
    let tx_counter = Arc::clone(&observed_txs);

    firehose(
        THREADS,
        true,
        Some(test_buffer_window_bytes),
        slot_range.clone(),
        None::<OnBlockFn>,
        Some(move |_thread_id: usize, transaction: TransactionData| {
            let slot_tracker = Arc::clone(&slot_tracker);
            let tx_counter = Arc::clone(&tx_counter);
            async move {
                // The guard is held for the whole handler body; nothing is awaited here.
                let mut previous = slot_tracker.lock().unwrap();
                assert!(
                    transaction.slot >= *previous,
                    "transaction slot regressed: prev={}, current={}",
                    *previous,
                    transaction.slot
                );
                *previous = transaction.slot;
                tx_counter.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            .boxed()
        }),
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        Some(stats_tracking),
        None,
    )
    .await
    .unwrap();

    let total_observed = observed_txs.load(Ordering::Relaxed);
    assert!(
        total_observed > 0,
        "expected to observe at least one transaction in slots [{}, {})",
        slot_range.start,
        slot_range.end
    );
}
3117
/// Checks that slot 311,173,980 reports at least `1_197 + 211` executed transactions.
///
/// NOTE(review): the two addends presumably come from an external explorer (the test name
/// mentions solscan) — confirm their meaning before changing them.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
    const SLOT: u64 = 311_173_980;
    const MIN_EXECUTED: u64 = 1_197 + 211;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3125
/// Checks that slot 311,225,232 reports at least `888 + 157` executed transactions.
///
/// NOTE(review): the two addends presumably come from an external explorer (the test name
/// mentions solscan) — confirm their meaning before changing them.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
    const SLOT: u64 = 311_225_232;
    const MIN_EXECUTED: u64 = 888 + 157;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3133
/// Checks that slot 311,175,860 reports at least `527 + 110` executed transactions.
///
/// NOTE(review): the two addends presumably come from an external explorer (the test name
/// mentions solscan) — confirm their meaning before changing them.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
    const SLOT: u64 = 311_175_860;
    const MIN_EXECUTED: u64 = 527 + 110;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3141
/// Checks that slot 311,134,608 reports at least `1_086 + 169` executed transactions.
///
/// NOTE(review): the two addends presumably come from an external explorer (the test name
/// mentions solscan) — confirm their meaning before changing them.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
    const SLOT: u64 = 311_134_608;
    const MIN_EXECUTED: u64 = 1_086 + 169;

    solana_logger::setup_with_default("info");
    assert_slot_min_executed_transactions(SLOT, MIN_EXECUTED).await;
}
3149
/// Ignored-by-default debugging aid: logs a node summary for each slot in `SLOTS`.
///
/// Panics with "slot summary" as soon as a summary fails.
#[cfg(test)]
#[ignore]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn debug_epoch_720_slot_311173980_node_summary() {
    const SLOTS: &[u64] = &[
        311_173_980,
        311_225_232,
        311_175_860,
        311_134_608,
        376_273_722,
    ];

    solana_logger::setup_with_default("info");
    for &slot in SLOTS {
        log_slot_node_summary(slot).await.expect("slot summary");
    }
}
3167
3168#[tokio::test(flavor = "multi_thread")]
3169async fn test_firehose_epoch_850_has_logs() {
3170 use std::sync::atomic::{AtomicU64, Ordering};
3171 solana_logger::setup_with_default("info");
3172 const START_SLOT: u64 = 367_200_075; const SLOT_COUNT: u64 = 50;
3174 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3175
3176 TOTAL_TXS.store(0, Ordering::Relaxed);
3177
3178 firehose(
3179 4,
3180 false,
3181 None,
3182 START_SLOT..(START_SLOT + SLOT_COUNT),
3183 None::<OnBlockFn>,
3184 Some(|_thread_id: usize, transaction: TransactionData| {
3185 async move {
3186 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3187 if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3188 let has_logs = logs.iter().any(|msg| !msg.is_empty());
3189 assert_eq!(has_logs, true);
3190 }
3191 Ok(())
3192 }
3193 .boxed()
3194 }),
3195 None::<OnEntryFn>,
3196 None::<OnRewardFn>,
3197 None::<OnErrorFn>,
3198 None::<OnStatsTrackingFn>,
3199 None,
3200 )
3201 .await
3202 .unwrap();
3203
3204 assert!(
3205 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3206 "no transactions observed in epoch 850 range"
3207 );
3208}
3209
3210#[tokio::test(flavor = "multi_thread")]
3211async fn test_firehose_epoch_850_votes_present() {
3212 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3213 solana_logger::setup_with_default("info");
3214 const TARGET_SLOT: u64 = 367_200_100; const SLOT_RADIUS: u64 = 10;
3216 static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3217 static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3218 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3219
3220 SEEN_BLOCK.store(false, Ordering::Relaxed);
3221 VOTE_TXS.store(0, Ordering::Relaxed);
3222 TOTAL_TXS.store(0, Ordering::Relaxed);
3223
3224 firehose(
3225 2,
3226 false,
3227 None,
3228 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3229 Some(|_thread_id: usize, block: BlockData| {
3230 async move {
3231 if block.slot() == TARGET_SLOT {
3232 assert!(
3233 !block.was_skipped(),
3234 "target slot {TARGET_SLOT} was marked leader skipped",
3235 );
3236 SEEN_BLOCK.store(true, Ordering::Relaxed);
3237 }
3238 Ok(())
3239 }
3240 .boxed()
3241 }),
3242 Some(|_thread_id: usize, transaction: TransactionData| {
3243 async move {
3244 if transaction.slot == TARGET_SLOT {
3245 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3246 if transaction.is_vote {
3247 VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3248 }
3249 }
3250 Ok(())
3251 }
3252 .boxed()
3253 }),
3254 None::<OnEntryFn>,
3255 None::<OnRewardFn>,
3256 None::<OnErrorFn>,
3257 None::<OnStatsTrackingFn>,
3258 None,
3259 )
3260 .await
3261 .unwrap();
3262
3263 assert!(
3264 SEEN_BLOCK.load(Ordering::Relaxed),
3265 "target slot was not processed"
3266 );
3267 assert!(
3268 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3269 "no transactions counted in target slot"
3270 );
3271 assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3272}
3273
/// Injects one synthetic block-handler failure and checks that every slot is still
/// recorded.
///
/// The block handler returns an error exactly once: on the first non-skipped block that
/// arrives after at least one non-skipped block has already been recorded. Every other
/// invocation increments that slot's entry in `COVERAGE`. After the run the test asserts
/// that each of the `NUM_SLOTS` slots starting at `START_SLOT` has an entry.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_restart_loses_coverage_without_reset() {
    use std::collections::HashMap;
    solana_logger::setup_with_default("info");

    const THREADS: usize = 1;
    const START_SLOT: u64 = 345_600_000;
    const NUM_SLOTS: u64 = 8;

    static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
    static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
    static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);

    // The statics outlive a single invocation, so reset them before running.
    COVERAGE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap()
        .clear();
    FAIL_TRIGGERED.store(false, Ordering::Relaxed);
    SEEN_BLOCKS.store(0, Ordering::Relaxed);

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        START_SLOT..(START_SLOT + NUM_SLOTS),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                let skipped = block.was_skipped();
                // Short-circuit order matters: the swap must only run once the first two
                // conditions hold, so the failure fires exactly once.
                let should_fail = !skipped
                    && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
                    && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst);
                if should_fail {
                    return Err("synthetic handler failure to exercise restart".into());
                }

                let mut coverage = COVERAGE
                    .get_or_init(|| Mutex::new(HashMap::new()))
                    .lock()
                    .unwrap();
                coverage
                    .entry(block.slot())
                    .and_modify(|hits| *hits += 1)
                    .or_insert(1);
                if !skipped {
                    SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let coverage = COVERAGE.get().unwrap().lock().unwrap();
    for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
        assert!(
            coverage.contains_key(&slot),
            "missing coverage for slot {slot} after restart"
        );
    }
}
3339
/// Streams the inclusive window `START_SLOT..=END_SLOT` across sixteen threads and checks
/// that no slot is missing from the non-skipped blocks delivered.
///
/// Four slots (378864396 through 378864399) are added to the observed set before the
/// check, so their absence does not fail the test.
///
/// NOTE(review): those four slots are presumably the known-missing range named in the
/// test — confirm before changing them.
#[cfg(test)]
#[serial]
#[tokio::test(flavor = "multi_thread")]
async fn test_firehose_gap_coverage_near_known_missing_range() {
    use std::collections::HashSet;
    solana_logger::setup_with_default("info");

    const GAP_START: u64 = 378864000;
    const START_SLOT: u64 = GAP_START - 1000;
    const END_SLOT: u64 = GAP_START + 1000;
    const THREADS: usize = 16;

    static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
    // The static outlives a single invocation, so reset it before running.
    COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clear();

    firehose(
        THREADS.try_into().unwrap(),
        false,
        None,
        START_SLOT..(END_SLOT + 1),
        Some(|_thread_id: usize, block: BlockData| {
            async move {
                // Only non-skipped blocks count towards coverage.
                if !block.was_skipped() {
                    let slot = block.slot();
                    COVERAGE
                        .get_or_init(|| Mutex::new(HashSet::new()))
                        .lock()
                        .unwrap()
                        .insert(slot);
                }
                Ok(())
            }
            .boxed()
        }),
        None::<OnTxFn>,
        None::<OnEntryFn>,
        None::<OnRewardFn>,
        None::<OnErrorFn>,
        None::<OnStatsTrackingFn>,
        None,
    )
    .await
    .unwrap();

    let mut coverage = COVERAGE
        .get_or_init(|| Mutex::new(HashSet::new()))
        .lock()
        .unwrap()
        .clone();
    coverage.extend(378864396..=378864399);

    let missing: Vec<u64> = (START_SLOT..=END_SLOT)
        .filter(|slot| !coverage.contains(slot))
        .collect();
    assert!(
        missing.is_empty(),
        "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
        missing.len(),
        &missing[..missing.len().min(10)]
    );
}