1use crossbeam_channel::{Receiver, Sender, unbounded};
2use dashmap::{DashMap, DashSet};
3use futures_util::future::BoxFuture;
4use reqwest::{Client, Url};
5use solana_address::Address;
6use solana_geyser_plugin_manager::{
7 block_metadata_notifier_interface::BlockMetadataNotifier,
8 geyser_plugin_service::GeyserPluginServiceError,
9};
10use solana_hash::Hash;
11use solana_ledger::entry_notifier_interface::EntryNotifier;
12use solana_reward_info::RewardInfo;
13use solana_rpc::{
14 optimistically_confirmed_bank_tracker::SlotNotification,
15 transaction_notifier_interface::TransactionNotifier,
16};
17use solana_runtime::bank::{KeyedRewardsAndNumPartitions, RewardType};
18use solana_sdk_ids::vote::id as vote_program_id;
19use solana_transaction::versioned::VersionedTransaction;
20use std::{
21 fmt::Display,
22 future::Future,
23 io,
24 ops::Range,
25 path::PathBuf,
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
29 },
30};
31use thiserror::Error;
32use tokio::{
33 sync::broadcast::{self, error::TryRecvError},
34 time::timeout,
35};
36
37use crate::{
38 LOG_MODULE, SharedError,
39 epochs::{
40 FetchEpochStreamOptions, epoch_to_slot_range, fetch_epoch_stream,
41 fetch_epoch_stream_with_options, slot_to_epoch,
42 },
43 index::{SLOT_OFFSET_INDEX, SlotOffsetIndexError, slot_to_offset},
44 node_reader::NodeReader,
45 utils,
46};
47
48const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
52const OP_TIMEOUT_SEQUENTIAL: std::time::Duration = std::time::Duration::from_secs(180);
53const BINCODE_EPOCH_CUTOFF: u64 = 157;
55
56fn poll_shutdown(
57 flag: &Arc<std::sync::atomic::AtomicBool>,
58 receiver: &mut Option<broadcast::Receiver<()>>,
59) -> bool {
60 if let Some(rx) = receiver {
61 match rx.try_recv() {
62 Ok(_) | Err(TryRecvError::Lagged(_)) => {
63 flag.store(true, Ordering::SeqCst);
64 }
65 Err(TryRecvError::Closed) => {
66 flag.store(true, Ordering::SeqCst);
67 }
68 Err(TryRecvError::Empty) => {}
69 }
70 }
71 flag.load(Ordering::SeqCst)
72}
73
74fn is_shutdown_error(err: &FirehoseError) -> bool {
75 fn is_interrupted(inner: &(dyn std::error::Error + 'static)) -> bool {
76 inner
77 .downcast_ref::<io::Error>()
78 .map(|io_err| io_err.kind() == io::ErrorKind::Interrupted)
79 .unwrap_or(false)
80 }
81
82 match err {
83 FirehoseError::BlockHandlerError(inner)
84 | FirehoseError::TransactionHandlerError(inner)
85 | FirehoseError::EntryHandlerError(inner)
86 | FirehoseError::RewardHandlerError(inner)
87 | FirehoseError::OnStatsHandlerError(inner) => is_interrupted(inner.as_ref()),
88 _ => false,
89 }
90}
91
92async fn find_previous_indexed_slot(
93 local_start: u64,
94 epoch_start: u64,
95 log_target: &str,
96) -> Result<Option<u64>, FirehoseError> {
97 if local_start <= epoch_start {
98 return Ok(None);
99 }
100 let mut candidate = local_start.saturating_sub(1);
101 let mut skipped = 0u64;
102 loop {
103 match slot_to_offset(candidate).await {
104 Ok(_) => {
105 if skipped > 0 {
106 log::info!(
107 target: log_target,
108 "slot {} missing in index; seeking back {} slots to {}",
109 local_start.saturating_sub(1),
110 skipped,
111 candidate
112 );
113 }
114 return Ok(Some(candidate));
115 }
116 Err(SlotOffsetIndexError::SlotNotFound(..)) => {
117 if candidate <= epoch_start {
118 break;
119 }
120 skipped += 1;
121 candidate = candidate.saturating_sub(1);
122 }
123 Err(err) => return Err(FirehoseError::SlotOffsetIndexError(err)),
124 }
125 }
126 log::warn!(
127 target: log_target,
128 "no indexed slot found before {} (epoch start {}); reading from epoch start",
129 local_start,
130 epoch_start
131 );
132 Ok(None)
133}
134
135#[derive(Debug, Error)]
138pub enum FirehoseError {
139 Reqwest(reqwest::Error),
141 ReadHeader(SharedError),
143 GeyserPluginService(GeyserPluginServiceError),
145 FailedToGetTransactionNotifier,
147 ReadUntilBlockError(SharedError),
149 GetBlockError(SharedError),
151 NodeDecodingError(usize, SharedError),
153 SlotOffsetIndexError(SlotOffsetIndexError),
155 SeekToSlotError(SharedError),
157 OnLoadError(SharedError),
159 OnStatsHandlerError(SharedError),
161 OperationTimeout(&'static str),
163 TransactionHandlerError(SharedError),
165 EntryHandlerError(SharedError),
167 RewardHandlerError(SharedError),
169 BlockHandlerError(SharedError),
171}
172
173unsafe impl Send for FirehoseError {}
174unsafe impl Sync for FirehoseError {}
175
176impl Display for FirehoseError {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 FirehoseError::Reqwest(e) => write!(f, "Reqwest error: {}", e),
180 FirehoseError::ReadHeader(error) => {
181 write!(f, "Error reading header: {}", error)
182 }
183 FirehoseError::GeyserPluginService(geyser_plugin_service_error) => write!(
184 f,
185 "Error initializing geyser plugin service: {}",
186 geyser_plugin_service_error
187 ),
188 FirehoseError::FailedToGetTransactionNotifier => write!(
189 f,
190 "Failed to get transaction notifier from GeyserPluginService"
191 ),
192 FirehoseError::ReadUntilBlockError(error) => {
193 write!(f, "Error reading until block: {}", error)
194 }
195 FirehoseError::GetBlockError(error) => write!(f, "Error getting block: {}", error),
196 FirehoseError::NodeDecodingError(item_index, error) => {
197 write!(
198 f,
199 "Error seeking, reading data from, or decoding data for data node {}: {}",
200 item_index, error
201 )
202 }
203 FirehoseError::SlotOffsetIndexError(slot_offset_index_error) => write!(
204 f,
205 "Error getting info from slot offset index: {}",
206 slot_offset_index_error
207 ),
208 FirehoseError::SeekToSlotError(error) => {
209 write!(f, "Error seeking to slot: {}", error)
210 }
211 FirehoseError::OnLoadError(error) => write!(f, "Error on load: {}", error),
212 FirehoseError::OnStatsHandlerError(error) => {
213 write!(f, "Stats handler error: {}", error)
214 }
215 FirehoseError::OperationTimeout(op) => {
216 write!(f, "Timeout while waiting for operation: {}", op)
217 }
218 FirehoseError::TransactionHandlerError(error) => {
219 write!(f, "Transaction handler error: {}", error)
220 }
221 FirehoseError::EntryHandlerError(error) => {
222 write!(f, "Entry handler error: {}", error)
223 }
224 FirehoseError::RewardHandlerError(error) => {
225 write!(f, "Reward handler error: {}", error)
226 }
227 FirehoseError::BlockHandlerError(error) => {
228 write!(f, "Block handler error: {}", error)
229 }
230 }
231 }
232}
233
234impl From<reqwest::Error> for FirehoseError {
235 fn from(e: reqwest::Error) -> Self {
236 FirehoseError::Reqwest(e)
237 }
238}
239
240impl From<GeyserPluginServiceError> for FirehoseError {
241 fn from(e: GeyserPluginServiceError) -> Self {
242 FirehoseError::GeyserPluginService(e)
243 }
244}
245
246impl From<SlotOffsetIndexError> for FirehoseError {
247 fn from(e: SlotOffsetIndexError) -> Self {
248 FirehoseError::SlotOffsetIndexError(e)
249 }
250}
251
252#[derive(Clone, PartialEq, Eq, Hash, Debug)]
254pub struct ThreadStats {
255 pub thread_id: usize,
257 pub start_time: std::time::Instant,
259 pub finish_time: Option<std::time::Instant>,
261 pub slot_range: Range<u64>,
263 pub initial_slot_range: Range<u64>,
265 pub current_slot: u64,
267 pub slots_processed: u64,
269 pub blocks_processed: u64,
271 pub leader_skipped_slots: u64,
273 pub transactions_processed: u64,
275 pub entries_processed: u64,
277 pub rewards_processed: u64,
279}
280
281#[derive(Clone, PartialEq, Eq, Hash, Debug)]
283pub struct Stats {
284 pub thread_stats: ThreadStats,
286 pub start_time: std::time::Instant,
288 pub finish_time: Option<std::time::Instant>,
290 pub slot_range: Range<u64>,
292 pub slots_processed: u64,
294 pub blocks_processed: u64,
296 pub leader_skipped_slots: u64,
298 pub transactions_processed: u64,
300 pub entries_processed: u64,
302 pub rewards_processed: u64,
304 pub transactions_since_last_pulse: u64,
306 pub blocks_since_last_pulse: u64,
308 pub slots_since_last_pulse: u64,
310 pub time_since_last_pulse: std::time::Duration,
312}
313
314#[derive(Clone, PartialEq, Eq, Hash, Debug)]
316pub struct StatsTracking<OnStats: Handler<Stats>> {
317 pub on_stats: OnStats,
319 pub tracking_interval_slots: u64,
321}
322
323#[inline(always)]
324#[allow(clippy::too_many_arguments)]
325async fn maybe_emit_stats<OnStats: Handler<Stats>>(
326 stats_tracking: Option<&StatsTracking<OnStats>>,
327 thread_index: usize,
328 thread_stats: &ThreadStats,
329 overall_slots_processed: &AtomicU64,
330 overall_blocks_processed: &AtomicU64,
331 overall_transactions_processed: &AtomicU64,
332 overall_entries_processed: &AtomicU64,
333 transactions_since_stats: &AtomicU64,
334 blocks_since_stats: &AtomicU64,
335 slots_since_stats: &AtomicU64,
336 last_pulse: &Arc<AtomicU64>,
337 base_instant: std::time::Instant,
338) -> Result<(), (FirehoseError, u64)> {
339 if let Some(stats_tracker) = stats_tracking {
340 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
341 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
342 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
343 let total_entries = overall_entries_processed.load(Ordering::Relaxed);
344 let now_nanos = base_instant.elapsed().as_nanos() as u64;
345 let previous = last_pulse.swap(now_nanos, Ordering::Relaxed);
346 let delta_nanos = now_nanos.saturating_sub(previous);
347 let time_since_last_pulse = std::time::Duration::from_nanos(delta_nanos.max(1));
348 let processed_transactions = transactions_since_stats.swap(0, Ordering::Relaxed);
349 let processed_blocks = blocks_since_stats.swap(0, Ordering::Relaxed);
350 let processed_slots = slots_since_stats.swap(0, Ordering::Relaxed);
351
352 let stats = Stats {
353 thread_stats: thread_stats.clone(),
354 start_time: thread_stats.start_time,
355 finish_time: thread_stats.finish_time,
356 slot_range: thread_stats.slot_range.clone(),
357 slots_processed: total_slots,
358 blocks_processed: total_blocks,
359 leader_skipped_slots: total_slots.saturating_sub(total_blocks),
360 transactions_processed: total_transactions,
361 entries_processed: total_entries,
362 rewards_processed: thread_stats.rewards_processed,
363 transactions_since_last_pulse: processed_transactions,
364 blocks_since_last_pulse: processed_blocks,
365 slots_since_last_pulse: processed_slots,
366 time_since_last_pulse,
367 };
368
369 if let Err(e) = (stats_tracker.on_stats)(thread_index, stats).await {
370 last_pulse.store(previous, Ordering::Relaxed);
371 transactions_since_stats.fetch_add(processed_transactions, Ordering::Relaxed);
372 blocks_since_stats.fetch_add(processed_blocks, Ordering::Relaxed);
373 slots_since_stats.fetch_add(processed_slots, Ordering::Relaxed);
374 return Err((
375 FirehoseError::OnStatsHandlerError(e),
376 thread_stats.current_slot,
377 ));
378 }
379 }
380 Ok(())
381}
382
383#[inline(always)]
384fn fetch_add_if(tracking_enabled: bool, atomic: &AtomicU64, value: u64) {
385 if tracking_enabled {
386 atomic.fetch_add(value, Ordering::Relaxed);
387 }
388}
389
390fn clear_pending_skip(
391 map: &DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
392 thread_id: usize,
393 slot: u64,
394) -> bool {
395 map.get(&thread_id)
396 .map(|set| set.remove(&slot).is_some())
397 .unwrap_or(false)
398}
399
400fn decode_transaction_status_meta_from_frame(
401 slot: u64,
402 reassembled_metadata: Vec<u8>,
403) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
404 if reassembled_metadata.is_empty() {
405 return Ok(solana_transaction_status::TransactionStatusMeta::default());
407 }
408
409 match utils::decompress_zstd(reassembled_metadata.as_slice()) {
410 Ok(decompressed) => {
411 decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
412 Box::new(std::io::Error::other(format!(
413 "decode transaction metadata (slot {slot}): {err}"
414 ))) as SharedError
415 })
416 }
417 Err(decomp_err) => {
418 decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
421 Box::new(std::io::Error::other(format!(
422 "transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
423 ))) as SharedError
424 })
425 }
426 }
427}
428
429#[derive(Debug, Default)]
430struct DecodedRewards {
431 keyed_rewards: Vec<(Address, RewardInfo)>,
432 num_partitions: Option<u64>,
433}
434
435impl DecodedRewards {
436 fn empty() -> Self {
437 Self {
438 keyed_rewards: Vec::new(),
439 num_partitions: None,
440 }
441 }
442}
443
444fn decode_rewards_from_frame(
445 slot: u64,
446 reassembled_rewards: Vec<u8>,
447) -> Result<DecodedRewards, SharedError> {
448 if reassembled_rewards.is_empty() {
449 return Ok(DecodedRewards::empty());
451 }
452
453 match utils::decompress_zstd(reassembled_rewards.as_slice()) {
454 Ok(decompressed) => decode_rewards_from_bytes(slot, decompressed.as_slice()).map_err(
455 |err| {
456 Box::new(std::io::Error::other(format!(
457 "decode rewards (slot {slot}): {err}"
458 ))) as SharedError
459 },
460 ),
461 Err(decomp_err) => decode_rewards_from_bytes(slot, reassembled_rewards.as_slice()).map_err(
462 |err| {
463 Box::new(std::io::Error::other(format!(
464 "rewards not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
465 ))) as SharedError
466 },
467 ),
468 }
469}
470
471fn decode_rewards_from_bytes(slot: u64, bytes: &[u8]) -> Result<DecodedRewards, SharedError> {
472 let epoch = slot_to_epoch(slot);
473 let proto_attempt: Result<solana_storage_proto::convert::generated::Rewards, _> =
474 prost_011::Message::decode(bytes);
475 match proto_attempt {
476 Ok(proto) => {
477 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
478 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
479 Box::new(std::io::Error::other(format!(
480 "convert rewards proto failed (epoch {epoch}): {err}"
481 ))) as SharedError
482 })?;
483 Ok(DecodedRewards {
484 keyed_rewards,
485 num_partitions,
486 })
487 }
488 Err(proto_err) => {
489 let stored: solana_storage_proto::StoredExtendedRewards =
490 bincode::deserialize(bytes).map_err(|bin_err| {
491 Box::new(std::io::Error::other(format!(
492 "protobuf decode rewards failed (epoch {epoch}); bincode failed too: {bin_err}; protobuf error: {proto_err}"
493 ))) as SharedError
494 })?;
495 let proto: solana_storage_proto::convert::generated::Rewards = stored.into();
496 let num_partitions = proto.num_partitions.as_ref().map(|p| p.num_partitions);
497 let keyed_rewards = convert_proto_rewards(&proto).map_err(|err| {
498 Box::new(std::io::Error::other(format!(
499 "convert rewards bincode fallback failed (epoch {epoch}); protobuf error: {proto_err}; conversion error: {err}"
500 ))) as SharedError
501 })?;
502 Ok(DecodedRewards {
503 keyed_rewards,
504 num_partitions,
505 })
506 }
507 }
508}
509
510fn decode_transaction_status_meta(
511 slot: u64,
512 metadata_bytes: &[u8],
513) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
514 let epoch = slot_to_epoch(slot);
515 let mut bincode_err: Option<String> = None;
516 if epoch < BINCODE_EPOCH_CUTOFF {
517 match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
518 metadata_bytes,
519 ) {
520 Ok(stored) => return Ok(stored.into()),
521 Err(err) => {
522 bincode_err = Some(err.to_string());
523 }
524 }
525 }
526
527 let bin_err_for_proto = bincode_err.clone();
528 let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
529 prost_011::Message::decode(metadata_bytes).map_err(|err| {
530 if let Some(ref bin_err) = bin_err_for_proto {
532 Box::new(std::io::Error::other(format!(
533 "protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
534 ))) as SharedError
535 } else {
536 Box::new(std::io::Error::other(format!(
537 "protobuf decode transaction metadata: {err}"
538 ))) as SharedError
539 }
540 })?;
541
542 proto.try_into().map_err(|err| {
543 if let Some(ref bin_err) = bincode_err {
544 Box::new(std::io::Error::other(format!(
545 "convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
546 ))) as SharedError
547 } else {
548 Box::new(std::io::Error::other(format!(
549 "convert transaction metadata proto: {err}"
550 ))) as SharedError
551 }
552 })
553}
554
555#[cfg(test)]
556mod metadata_decode_tests {
557 use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
558 use solana_message::v0::LoadedAddresses;
559 use solana_storage_proto::StoredTransactionStatusMeta;
560 use solana_transaction_status::TransactionStatusMeta;
561
562 fn sample_meta() -> TransactionStatusMeta {
563 TransactionStatusMeta {
564 fee: 42,
565 pre_balances: vec![1, 2],
566 post_balances: vec![3, 4],
567 log_messages: Some(vec!["hello".into()]),
568 pre_token_balances: Some(Vec::new()),
569 post_token_balances: Some(Vec::new()),
570 rewards: Some(Vec::new()),
571 compute_units_consumed: Some(7),
572 cost_units: Some(9),
573 loaded_addresses: LoadedAddresses::default(),
574 ..TransactionStatusMeta::default()
575 }
576 }
577
578 #[test]
579 fn decodes_bincode_metadata_for_early_epochs() {
580 let stored = StoredTransactionStatusMeta {
581 status: Ok(()),
582 fee: 42,
583 pre_balances: vec![1, 2],
584 post_balances: vec![3, 4],
585 inner_instructions: None,
586 log_messages: Some(vec!["hello".into()]),
587 pre_token_balances: Some(Vec::new()),
588 post_token_balances: Some(Vec::new()),
589 rewards: Some(Vec::new()),
590 return_data: None,
591 compute_units_consumed: Some(7),
592 cost_units: Some(9),
593 };
594 let bytes = bincode::serialize(&stored).expect("bincode serialize");
595 let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
596 assert_eq!(decoded, TransactionStatusMeta::from(stored));
597 }
598
599 #[test]
600 fn decodes_protobuf_metadata_for_later_epochs() {
601 let meta = sample_meta();
602 let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
603 meta.clone().into();
604 let bytes = prost_011::Message::encode_to_vec(&generated);
605 let decoded = decode_transaction_status_meta(157 * 432000, &bytes).expect("decode");
606 assert_eq!(decoded, meta);
607 }
608
609 #[test]
610 fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
611 let meta = sample_meta();
612 let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
613 meta.clone().into();
614 let bytes = prost_011::Message::encode_to_vec(&generated);
615 let decoded = decode_transaction_status_meta(100 * 432000, &bytes).expect("decode");
617 assert_eq!(decoded, meta);
618 }
619
620 #[test]
621 fn empty_frame_decodes_to_default() {
622 let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
623 assert_eq!(decoded, TransactionStatusMeta::default());
624 }
625
626 #[test]
627 fn raw_bincode_frame_without_zstd_still_decodes() {
628 let stored = StoredTransactionStatusMeta {
629 status: Ok(()),
630 fee: 1,
631 pre_balances: vec![],
632 post_balances: vec![],
633 inner_instructions: None,
634 log_messages: None,
635 pre_token_balances: Some(Vec::new()),
636 post_token_balances: Some(Vec::new()),
637 rewards: Some(Vec::new()),
638 return_data: None,
639 compute_units_consumed: None,
640 cost_units: None,
641 };
642 let raw_bytes = bincode::serialize(&stored).expect("serialize");
643 let decoded =
644 decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
645 assert_eq!(decoded, TransactionStatusMeta::from(stored));
646 }
647}
648
649#[cfg(test)]
650mod rewards_decode_tests {
651 use super::decode_rewards_from_bytes;
652 use solana_sdk_ids::vote::id as vote_program_id;
653 use solana_storage_proto::StoredExtendedRewards;
654 use solana_transaction_status::{Reward, RewardType};
655
656 #[test]
657 fn decodes_protobuf_rewards() {
658 let pubkey = vote_program_id().to_string();
659 let proto = solana_storage_proto::convert::generated::Rewards {
660 rewards: vec![solana_storage_proto::convert::generated::Reward {
661 pubkey,
662 lamports: 5,
663 post_balance: 10,
664 reward_type: solana_storage_proto::convert::generated::RewardType::Fee as i32,
665 commission: "1".to_string(),
666 }],
667 num_partitions: Some(solana_storage_proto::convert::generated::NumPartitions {
668 num_partitions: 2,
669 }),
670 };
671 let bytes = prost_011::Message::encode_to_vec(&proto);
672 let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode proto rewards");
673 assert_eq!(decoded.keyed_rewards.len(), 1);
674 assert_eq!(decoded.num_partitions, Some(2));
675 }
676
677 #[test]
678 fn decodes_bincode_rewards() {
679 let pubkey = vote_program_id().to_string();
680 let reward = Reward {
681 pubkey,
682 lamports: 7,
683 post_balance: 9,
684 reward_type: Some(RewardType::Rent),
685 commission: Some(3),
686 };
687 let stored_rewards: StoredExtendedRewards = vec![reward.into()];
688 let bytes = bincode::serialize(&stored_rewards).expect("bincode serialize");
689 let decoded = decode_rewards_from_bytes(0, &bytes).expect("decode bincode rewards");
690 assert_eq!(decoded.keyed_rewards.len(), 1);
691 assert_eq!(decoded.num_partitions, None);
692 }
693}
694
695#[derive(Debug, Clone)]
697pub struct TransactionData {
698 pub slot: u64,
700 pub transaction_slot_index: usize,
702 pub signature: solana_signature::Signature,
704 pub message_hash: Hash,
706 pub is_vote: bool,
708 pub transaction_status_meta: solana_transaction_status::TransactionStatusMeta,
710 pub transaction: VersionedTransaction,
712}
713
714#[derive(Debug, Clone)]
716pub struct EntryData {
717 pub slot: u64,
719 pub entry_index: usize,
721 pub transaction_indexes: Range<usize>,
723 pub num_hashes: u64,
725 pub hash: Hash,
727}
728
729#[derive(Debug, Clone)]
731pub struct RewardsData {
732 pub slot: u64,
734 pub rewards: Vec<(Address, RewardInfo)>,
736}
737
738#[derive(Debug)]
740pub enum BlockData {
741 Block {
743 parent_slot: u64,
745 parent_blockhash: Hash,
747 slot: u64,
749 blockhash: Hash,
751 rewards: KeyedRewardsAndNumPartitions,
753 block_time: Option<i64>,
755 block_height: Option<u64>,
757 executed_transaction_count: u64,
759 entry_count: u64,
761 },
762 PossibleLeaderSkipped {
765 slot: u64,
767 },
768}
769
770impl BlockData {
771 #[inline(always)]
773 pub const fn slot(&self) -> u64 {
774 match self {
775 BlockData::Block { slot, .. } => *slot,
776 BlockData::PossibleLeaderSkipped { slot } => *slot,
777 }
778 }
779
780 #[inline(always)]
782 pub const fn was_skipped(&self) -> bool {
783 matches!(self, BlockData::PossibleLeaderSkipped { .. })
784 }
785
786 #[inline(always)]
788 pub const fn block_time(&self) -> Option<i64> {
789 match self {
790 BlockData::Block { block_time, .. } => *block_time,
791 BlockData::PossibleLeaderSkipped { .. } => None,
792 }
793 }
794}
795
796type HandlerResult = Result<(), SharedError>;
797type HandlerFuture = BoxFuture<'static, HandlerResult>;
798
799pub trait Handler<Data>: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static {}
801
802impl<Data, F> Handler<Data> for F where
803 F: Fn(usize, Data) -> HandlerFuture + Send + Sync + Clone + 'static
804{
805}
806
807pub type HandlerFn<Data> = fn(usize, Data) -> HandlerFuture;
809pub type OnBlockFn = HandlerFn<BlockData>;
811pub type OnTxFn = HandlerFn<TransactionData>;
813pub type OnEntryFn = HandlerFn<EntryData>;
815pub type OnRewardFn = HandlerFn<RewardsData>;
817pub type StatsTracker = StatsTracking<HandlerFn<Stats>>;
819pub type OnErrorFn = HandlerFn<FirehoseErrorContext>;
821pub type OnStatsTrackingFn = StatsTracking<HandlerFn<Stats>>;
823
824#[derive(Clone, Debug)]
826pub struct FirehoseErrorContext {
827 pub thread_id: usize,
829 pub slot: u64,
831 pub epoch: u64,
833 pub error_message: String,
835}
836
837#[inline]
853#[allow(clippy::too_many_arguments)]
854pub async fn firehose<OnBlock, OnTransaction, OnEntry, OnRewards, OnStats, OnError>(
855 threads: u64,
856 sequential: bool,
857 reverse: bool,
858 buffer_window_bytes: Option<u64>,
859 slot_range: Range<u64>,
860 on_block: Option<OnBlock>,
861 on_tx: Option<OnTransaction>,
862 on_entry: Option<OnEntry>,
863 on_rewards: Option<OnRewards>,
864 on_error: Option<OnError>,
865 stats_tracking: Option<StatsTracking<OnStats>>,
866 shutdown_signal: Option<broadcast::Receiver<()>>,
867) -> Result<(), (FirehoseError, u64)>
868where
869 OnBlock: Handler<BlockData>,
870 OnTransaction: Handler<TransactionData>,
871 OnEntry: Handler<EntryData>,
872 OnRewards: Handler<RewardsData>,
873 OnStats: Handler<Stats>,
874 OnError: Handler<FirehoseErrorContext>,
875{
876 if threads == 0 {
877 return Err((
878 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
879 slot_range.start,
880 ));
881 }
882 let client = crate::network::create_http_client();
883 log::info!(target: LOG_MODULE, "starting firehose...");
884 log::info!(target: LOG_MODULE, "index base url: {}", SLOT_OFFSET_INDEX.base_url());
885 let sequential = sequential || reverse;
888 let firehose_threads = if sequential { 1 } else { threads };
889 let sequential_download_threads = std::cmp::max(1, threads as usize);
890 let sequential_buffer_window_bytes = buffer_window_bytes
891 .filter(|value| *value >= 2)
892 .unwrap_or_else(crate::system::default_firehose_buffer_window_bytes);
893 if sequential {
894 log::info!(
895 target: LOG_MODULE,
896 "sequential mode enabled: firehose_threads=1, ripget_threads={}, ripget_window={}",
897 sequential_download_threads,
898 crate::system::format_byte_size(sequential_buffer_window_bytes)
899 );
900 }
901 let reverse_mode = reverse;
902 if reverse_mode {
903 log::info!(
904 target: LOG_MODULE,
905 "reverse mode enabled: epochs processed from highest to lowest"
906 );
907 }
908
909 let slot_range = Arc::new(slot_range);
910
911 let subranges = generate_subranges(&slot_range, firehose_threads);
913 if firehose_threads > 1 {
914 log::debug!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
915 }
916
917 let firehose_start = std::time::Instant::now();
918 let shutdown_flag = Arc::new(AtomicBool::new(false));
919 if let Some(ref rx) = shutdown_signal {
920 let mut rx = rx.resubscribe();
921 let flag = shutdown_flag.clone();
922 tokio::spawn(async move {
923 if rx.recv().await.is_ok() {
924 log::info!(target: LOG_MODULE, "shutdown signal received; notifying firehose threads");
925 flag.store(true, Ordering::SeqCst);
926 }
927 });
928 }
929
930 let shared_ripget_client: Option<ripget::Client> = if sequential {
932 Some(
933 ripget::build_client(Some(&format!(
934 "jetstreamer-firehose/{}",
935 env!("CARGO_PKG_VERSION")
936 )))
937 .expect("failed to build ripget HTTP client"),
938 )
939 } else {
940 None
941 };
942
943 let mut handles = Vec::new();
944 let error_counts: Arc<Vec<AtomicU32>> =
946 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
947
948 let overall_slots_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
949 let overall_blocks_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
950 let overall_transactions_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
951 let overall_entries_processed: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
952 let pending_skipped_slots: Arc<
953 DashMap<usize, DashSet<u64, ahash::RandomState>, ahash::RandomState>,
954 > = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
955
956 for (thread_index, mut slot_range) in subranges.into_iter().enumerate() {
957 let error_counts = error_counts.clone();
958 let client = client.clone();
959 let on_block = on_block.clone();
960 let on_tx = on_tx.clone();
961 let on_entry = on_entry.clone();
962 let on_reward = on_rewards.clone();
963 let on_error = on_error.clone();
964 let overall_slots_processed = overall_slots_processed.clone();
965 let overall_blocks_processed = overall_blocks_processed.clone();
966 let overall_transactions_processed = overall_transactions_processed.clone();
967 let overall_entries_processed = overall_entries_processed.clone();
968 let stats_tracking = stats_tracking.clone();
969 let transactions_since_stats = Arc::new(AtomicU64::new(0));
970 let blocks_since_stats = Arc::new(AtomicU64::new(0));
971 let slots_since_stats = Arc::new(AtomicU64::new(0));
972 let last_pulse = Arc::new(AtomicU64::new(0));
973 let transactions_since_stats_cloned = transactions_since_stats.clone();
974 let blocks_since_stats_cloned = blocks_since_stats.clone();
975 let slots_since_stats_cloned = slots_since_stats.clone();
976 let last_pulse_cloned = last_pulse.clone();
977 let shutdown_flag = shutdown_flag.clone();
978 let pending_skipped_slots = pending_skipped_slots.clone();
979 let thread_shutdown_rx = shutdown_signal.as_ref().map(|rx| rx.resubscribe());
980 let sequential_mode = sequential;
981 let reverse_mode_local = reverse_mode;
982 let ripget_threads = sequential_download_threads;
983 let ripget_buffer_window_bytes = sequential_buffer_window_bytes;
984 let ripget_client = shared_ripget_client.clone();
985
986 let handle = tokio::spawn(async move {
987 let transactions_since_stats = transactions_since_stats_cloned;
988 let blocks_since_stats = blocks_since_stats_cloned;
989 let slots_since_stats = slots_since_stats_cloned;
990 let last_pulse = last_pulse_cloned;
991 let mut shutdown_rx = thread_shutdown_rx;
992 let start_time = firehose_start;
993 last_pulse.store(
994 firehose_start.elapsed().as_nanos() as u64,
995 Ordering::Relaxed,
996 );
997 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_index);
998 let mut skip_until_index = None;
999 let last_emitted_slot = slot_range.start.saturating_sub(1);
1000 let block_enabled = on_block.is_some();
1001 let tx_enabled = on_tx.is_some();
1002 let entry_enabled = on_entry.is_some();
1003 let reward_enabled = on_reward.is_some();
1004 let tracking_enabled = stats_tracking.is_some();
1005 if block_enabled {
1006 pending_skipped_slots
1007 .entry(thread_index)
1008 .or_insert_with(|| DashSet::with_hasher(ahash::RandomState::new()));
1009 }
1010 let mut last_counted_slot = slot_range.start.saturating_sub(1);
1011 let mut last_emitted_slot_global = slot_range.start.saturating_sub(1);
1012 let mut reverse_partial_resume: Option<u64> = None;
1014 let mut reverse_highest_remaining_epoch: u64 = if reverse_mode_local {
1015 slot_to_epoch(slot_range.end.saturating_sub(1))
1016 } else {
1017 0
1018 };
1019 let mut thread_stats = if tracking_enabled {
1020 Some(ThreadStats {
1021 thread_id: thread_index,
1022 start_time,
1023 finish_time: None,
1024 slot_range: slot_range.clone(),
1025 initial_slot_range: slot_range.clone(),
1026 current_slot: slot_range.start,
1027 slots_processed: 0,
1028 blocks_processed: 0,
1029 leader_skipped_slots: 0,
1030 transactions_processed: 0,
1031 entries_processed: 0,
1032 rewards_processed: 0,
1033 })
1034 } else {
1035 None
1036 };
1037
1038 while let Err((err, slot)) = async {
1040 let mut last_emitted_slot = last_emitted_slot_global;
1041 let op_timeout = if sequential_mode {
1042 OP_TIMEOUT_SEQUENTIAL
1043 } else {
1044 OP_TIMEOUT
1045 };
1046 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1047 log::info!(
1048 target: &log_target,
1049 "shutdown requested; terminating firehose thread {}",
1050 thread_index
1051 );
1052 return Ok(());
1053 }
1054 let lowest_epoch = slot_to_epoch(slot_range.start);
1055 let highest_epoch = slot_to_epoch(slot_range.end - 1);
1056 let epoch_range = lowest_epoch..=highest_epoch;
1057 log::info!(
1058 target: &log_target,
1059 "slot range: {} (epoch {}) ... {} (epoch {})",
1060 slot_range.start,
1061 slot_to_epoch(slot_range.start),
1062 slot_range.end,
1063 slot_to_epoch(slot_range.end)
1064 );
1065
1066 log::info!(target: &log_target, "🚒 starting firehose...");
1067
1068 let mut current_slot: Option<u64> = None;
1070 let epoch_iter: Vec<u64> = if reverse_mode_local {
1071 if reverse_highest_remaining_epoch < lowest_epoch {
1072 return Ok(());
1074 }
1075 (lowest_epoch..=reverse_highest_remaining_epoch)
1076 .rev()
1077 .collect()
1078 } else {
1079 epoch_range.clone().collect()
1080 };
1081 for epoch_num in epoch_iter {
1082 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1083 log::info!(
1084 target: &log_target,
1085 "shutdown requested; terminating firehose thread {}",
1086 thread_index
1087 );
1088 return Ok(());
1089 }
1090 log::info!(target: &log_target, "entering epoch {}", epoch_num);
1091 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
1092 let local_start = if reverse_mode_local {
1093 match reverse_partial_resume {
1094 Some(s) if slot_to_epoch(s) == epoch_num => {
1095 std::cmp::max(epoch_start, s)
1096 }
1097 _ => std::cmp::max(slot_range.start, epoch_start),
1098 }
1099 } else {
1100 std::cmp::max(slot_range.start, epoch_start)
1101 };
1102 let local_end_inclusive =
1103 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
1104 if local_start > local_end_inclusive {
1105 log::debug!(
1106 target: &log_target,
1107 "epoch {} has no overlap with thread range ({}..{}), skipping",
1108 epoch_num,
1109 slot_range.start,
1110 slot_range.end
1111 );
1112 continue;
1113 }
1114 let use_sequential_stream = sequential_mode && local_start == epoch_start;
1115 let stream = match timeout(op_timeout, async {
1116 if use_sequential_stream {
1117 fetch_epoch_stream_with_options(
1118 epoch_num,
1119 &client,
1120 Some(FetchEpochStreamOptions {
1121 sequential: true,
1122 ripget_threads,
1123 buffer_window_bytes: ripget_buffer_window_bytes,
1124 ripget_client: ripget_client.clone(),
1125 }),
1126 )
1127 .await
1128 } else {
1129 fetch_epoch_stream(epoch_num, &client).await
1130 }
1131 })
1132 .await
1133 {
1134 Ok(stream) => stream,
1135 Err(_) => {
1136 return Err((
1137 FirehoseError::OperationTimeout("fetch_epoch_stream"),
1138 current_slot.unwrap_or(slot_range.start),
1139 ));
1140 }
1141 };
1142 let mut reader = NodeReader::new(stream);
1143
1144 let header_fut = reader.read_raw_header();
1145 let header = match timeout(op_timeout, header_fut).await {
1146 Ok(res) => res
1147 .map_err(FirehoseError::ReadHeader)
1148 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1149 Err(_) => {
1150 return Err((
1151 FirehoseError::OperationTimeout("read_raw_header"),
1152 current_slot.unwrap_or(slot_range.start),
1153 ));
1154 }
1155 };
1156 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
1157
1158 let mut previous_blockhash = Hash::default();
1159 let mut latest_entry_blockhash = Hash::default();
1160 last_counted_slot = local_start.saturating_sub(1);
1163 current_slot = None;
1164 if reverse_mode_local {
1165 last_emitted_slot = local_start.saturating_sub(1);
1169 }
1170 if tracking_enabled
1171 && let Some(ref mut stats) = thread_stats {
1172 stats.current_slot = local_start;
1173 stats.slot_range.start = local_start;
1174 }
1175
1176 if local_start > epoch_start {
1177 let seek_slot = match timeout(
1180 OP_TIMEOUT,
1181 find_previous_indexed_slot(local_start, epoch_start, &log_target),
1182 )
1183 .await
1184 {
1185 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
1186 Err(_) => {
1187 return Err((
1188 FirehoseError::OperationTimeout(
1189 "seek_to_previous_indexed_slot",
1190 ),
1191 current_slot.unwrap_or(slot_range.start),
1192 ));
1193 }
1194 };
1195 if let Some(seek_slot) = seek_slot {
1196 let seek_fut = reader.seek_to_slot(seek_slot);
1197 match timeout(op_timeout, seek_fut).await {
1198 Ok(res) => {
1199 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
1200 }
1201 Err(_) => {
1202 return Err((
1203 FirehoseError::OperationTimeout("seek_to_slot"),
1204 current_slot.unwrap_or(slot_range.start),
1205 ));
1206 }
1207 }
1208 }
1209 }
1210
1211 let mut item_index = 0;
1213 let mut displayed_skip_message = false;
1214 loop {
1215 if poll_shutdown(&shutdown_flag, &mut shutdown_rx) {
1216 log::info!(
1217 target: &log_target,
1218 "shutdown requested; terminating firehose thread {}",
1219 thread_index
1220 );
1221 return Ok(());
1222 }
1223 let read_fut = reader.read_until_block();
1224 let nodes = match timeout(op_timeout, read_fut).await {
1225 Ok(result) => result
1226 .map_err(FirehoseError::ReadUntilBlockError)
1227 .map_err(|e| {
1228 (
1229 e,
1230 current_slot
1231 .map(|slot| slot.saturating_add(1))
1232 .unwrap_or(slot_range.start),
1233 )
1234 })?,
1235 Err(_) => {
1236 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
1237 return Err((FirehoseError::OperationTimeout("read_until_block"), current_slot.map(|s| s + 1).unwrap_or(slot_range.start)));
1238 }
1239 };
1240 if nodes.is_empty() {
1241 log::info!(
1242 target: &log_target,
1243 "reached end of epoch {}",
1244 epoch_num
1245 );
1246 break;
1247 }
1248 if let Some(last_node) = nodes.0.last()
1249 && !last_node.get_node().is_block()
1250 {
1251 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
1252 break;
1253 }
1254 let block = nodes
1255 .get_block()
1256 .map_err(FirehoseError::GetBlockError)
1257 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
1258 log::debug!(
1259 target: &log_target,
1260 "read {} items from epoch {}, now at slot {}",
1261 item_index,
1262 epoch_num,
1263 block.slot
1264 );
1265 let slot = block.slot;
1266 if slot > local_end_inclusive {
1267 log::debug!(
1268 target: &log_target,
1269 "reached end of local slice at slot {} (epoch {}), stopping",
1270 slot,
1271 epoch_num
1272 );
1273 break;
1274 }
1275 if slot >= slot_range.end {
1276 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
1277 if reverse_mode_local {
1283 break;
1284 }
1285 if block_enabled {
1286 pending_skipped_slots.remove(&thread_index);
1287 }
1288 return Ok(());
1289 }
1290 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
1291 if slot < slot_range.start {
1292 if slot.saturating_add(1) == slot_range.start {
1293 log::debug!(
1294 target: &log_target,
1295 "priming reader with preceding slot {}, skipping",
1296 slot
1297 );
1298 } else {
1299 log::warn!(
1300 target: &log_target,
1301 "encountered slot {} before start of range {}, skipping",
1302 slot,
1303 slot_range.start
1304 );
1305 }
1306 continue;
1307 }
1308 current_slot = Some(slot);
1309 let mut entry_index: usize = 0;
1310 let mut this_block_executed_transaction_count: u64 = 0;
1311 let mut this_block_entry_count: u64 = 0;
1312 let mut this_block_rewards = DecodedRewards::empty();
1313
1314 for node_with_cid in &nodes.0 {
1315 item_index += 1;
1316 if let Some(skip) = skip_until_index {
1317 if item_index < skip {
1318 if !displayed_skip_message {
1319 log::info!(
1320 target: &log_target,
1321 "skipping until index {} (at {})",
1322 skip,
1323 item_index
1324 );
1325 displayed_skip_message = true;
1326 }
1327 continue;
1328 } else {
1329 log::info!(
1330 target: &log_target,
1331 "reached target index {}, resuming...",
1332 skip
1333 );
1334 skip_until_index = None;
1335 }
1336 }
1337 let node = node_with_cid.get_node();
1338
1339 if let Some(ref mut stats) = thread_stats {
1340 stats.current_slot = slot;
1341 }
1342
1343 let error_slot = current_slot.unwrap_or(slot_range.start);
1344
1345 use crate::node::Node::*;
1346 match node {
1347 Transaction(tx) => {
1348 if tx_enabled
1349 && let Some(on_tx_cb) = on_tx.as_ref()
1350 {
1351 let error_slot = current_slot.unwrap_or(slot_range.start);
1352 let versioned_tx = tx.as_parsed().map_err(|err| {
1353 (
1354 FirehoseError::NodeDecodingError(item_index, err),
1355 error_slot,
1356 )
1357 })?;
1358 let reassembled_metadata = nodes
1359 .reassemble_dataframes(&tx.metadata)
1360 .map_err(|err| {
1361 (
1362 FirehoseError::NodeDecodingError(item_index, err),
1363 error_slot,
1364 )
1365 })?;
1366
1367 let as_native_metadata = decode_transaction_status_meta_from_frame(
1368 block.slot,
1369 reassembled_metadata,
1370 )
1371 .map_err(|err| {
1372 (
1373 FirehoseError::NodeDecodingError(item_index, err),
1374 error_slot,
1375 )
1376 })?;
1377
1378 let message_hash = {
1379 #[cfg(feature = "verify-transaction-signatures")]
1380 {
1381 versioned_tx.verify_and_hash_message().map_err(|err| {
1382 (
1383 FirehoseError::TransactionHandlerError(Box::new(err)),
1384 error_slot,
1385 )
1386 })?
1387 }
1388 #[cfg(not(feature = "verify-transaction-signatures"))]
1389 {
1390 versioned_tx.message.hash()
1391 }
1392 };
1393 let signature = versioned_tx
1394 .signatures
1395 .first()
1396 .ok_or_else(|| {
1397 Box::new(std::io::Error::new(
1398 std::io::ErrorKind::InvalidData,
1399 "transaction missing signature",
1400 )) as SharedError
1401 })
1402 .map_err(|err| {
1403 (
1404 FirehoseError::NodeDecodingError(
1405 item_index,
1406 err,
1407 ),
1408 error_slot,
1409 )
1410 })?;
1411 let is_vote = is_simple_vote_transaction(&versioned_tx);
1412
1413 on_tx_cb(
1414 thread_index,
1415 TransactionData {
1416 slot: block.slot,
1417 transaction_slot_index: tx.index.unwrap() as usize,
1418 signature: *signature,
1419 message_hash,
1420 is_vote,
1421 transaction_status_meta: as_native_metadata,
1422 transaction: versioned_tx,
1423 },
1424 )
1425 .await
1426 .map_err(|e| {
1427 (
1428 FirehoseError::TransactionHandlerError(e),
1429 error_slot,
1430 )
1431 })?;
1432 }
1433 fetch_add_if(
1434 tracking_enabled,
1435 &overall_transactions_processed,
1436 1,
1437 );
1438 if let Some(ref mut stats) = thread_stats {
1439 stats.transactions_processed += 1;
1440 }
1441 transactions_since_stats.fetch_add(1, Ordering::Relaxed);
1442 }
1443 Entry(entry) => {
1444 let entry_hash = Hash::from(entry.hash.to_bytes());
1445 let entry_transaction_count = entry.transactions.len();
1446 let entry_transaction_count_u64 = entry_transaction_count as u64;
1447 let starting_transaction_index_u64 =
1448 this_block_executed_transaction_count;
1449 latest_entry_blockhash = entry_hash;
1450 this_block_executed_transaction_count += entry_transaction_count_u64;
1451 this_block_entry_count += 1;
1452
1453 if entry_enabled && let Some(on_entry_cb) = on_entry.as_ref() {
1454 let starting_transaction_index = usize::try_from(
1455 starting_transaction_index_u64,
1456 )
1457 .map_err(|err| {
1458 (
1459 FirehoseError::EntryHandlerError(Box::new(err)),
1460 error_slot,
1461 )
1462 })?;
1463 let transaction_indexes_end =
1464 starting_transaction_index + entry_transaction_count;
1465 on_entry_cb(
1466 thread_index,
1467 EntryData {
1468 slot: block.slot,
1469 entry_index,
1470 transaction_indexes: starting_transaction_index
1471 ..transaction_indexes_end,
1472 num_hashes: entry.num_hashes,
1473 hash: entry_hash,
1474 },
1475 )
1476 .await
1477 .map_err(|e| {
1478 (
1479 FirehoseError::EntryHandlerError(e),
1480 error_slot,
1481 )
1482 })?;
1483 }
1484 entry_index += 1;
1485 fetch_add_if(
1486 tracking_enabled,
1487 &overall_entries_processed,
1488 1,
1489 );
1490 if let Some(ref mut stats) = thread_stats {
1491 stats.entries_processed += 1;
1492 }
1493 }
1494 Block(block) => {
1495 let prev_last_counted_slot = last_counted_slot;
1496 let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
1497 (
1498 stats.slots_processed,
1499 stats.blocks_processed,
1500 stats.leader_skipped_slots,
1501 stats.current_slot,
1502 )
1503 });
1504
1505 let next_expected_slot = prev_last_counted_slot.saturating_add(1);
1506 let skip_start_from_previous = last_counted_slot.saturating_add(1);
1507 let skip_start = skip_start_from_previous.max(next_expected_slot);
1508
1509 let skipped_epoch = slot_to_epoch(last_counted_slot);
1510 for skipped_slot in skip_start..slot {
1511 if slot_to_epoch(skipped_slot) != skipped_epoch {
1512 break;
1513 }
1514 log::debug!(
1515 target: &log_target,
1516 "leader skipped slot {} (prev_counted {}, current slot {})",
1517 skipped_slot,
1518 prev_last_counted_slot,
1519 slot,
1520 );
1521 if block_enabled {
1522 pending_skipped_slots
1523 .entry(thread_index)
1524 .or_default()
1525 .insert(skipped_slot);
1526 }
1527 if block_enabled
1528 && let Some(on_block_cb) = on_block.as_ref()
1529 && skipped_slot > last_emitted_slot {
1530 last_emitted_slot = skipped_slot;
1531 on_block_cb(
1532 thread_index,
1533 BlockData::PossibleLeaderSkipped {
1534 slot: skipped_slot,
1535 },
1536 )
1537 .await
1538 .map_err(|e| {
1539 (
1540 FirehoseError::BlockHandlerError(e),
1541 error_slot,
1542 )
1543 })?;
1544 }
1545 if tracking_enabled {
1546 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1547 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1548 if let Some(ref mut stats) = thread_stats {
1549 stats.leader_skipped_slots += 1;
1550 stats.slots_processed += 1;
1551 stats.current_slot = skipped_slot;
1552 }
1553 }
1554 last_counted_slot = skipped_slot;
1555 }
1556
1557 let cleared_pending_skip = if block_enabled {
1558 clear_pending_skip(
1559 &pending_skipped_slots,
1560 thread_index,
1561 slot,
1562 )
1563 } else {
1564 false
1565 };
1566
1567 if slot <= last_counted_slot && !cleared_pending_skip {
1568 log::debug!(
1569 target: &log_target,
1570 "duplicate block {}, already counted (last_counted={})",
1571 slot,
1572 last_counted_slot,
1573 );
1574 this_block_rewards = DecodedRewards::empty();
1575 continue;
1576 }
1577
1578 if block_enabled {
1579 if let Some(on_block_cb) = on_block.as_ref() {
1580 let DecodedRewards {
1581 keyed_rewards,
1582 num_partitions,
1583 } = std::mem::take(&mut this_block_rewards);
1584 if slot > last_emitted_slot {
1585 last_emitted_slot = slot;
1586 on_block_cb(
1587 thread_index,
1588 BlockData::Block {
1589 parent_slot: block.meta.parent_slot,
1590 parent_blockhash: previous_blockhash,
1591 slot: block.slot,
1592 blockhash: latest_entry_blockhash,
1593 rewards: KeyedRewardsAndNumPartitions {
1594 keyed_rewards,
1595 num_partitions,
1596 },
1597 block_time: Some(block.meta.blocktime as i64),
1598 block_height: block.meta.block_height,
1599 executed_transaction_count:
1600 this_block_executed_transaction_count,
1601 entry_count: this_block_entry_count,
1602 },
1603 )
1604 .await
1605 .map_err(|e| {
1606 (
1607 FirehoseError::BlockHandlerError(e),
1608 error_slot,
1609 )
1610 })?;
1611 }
1612 }
1613 } else {
1614 this_block_rewards = DecodedRewards::empty();
1615 }
1616 previous_blockhash = latest_entry_blockhash;
1617
1618 if tracking_enabled {
1619 overall_slots_processed.fetch_add(1, Ordering::Relaxed);
1620 overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1621 slots_since_stats.fetch_add(1, Ordering::Relaxed);
1622 blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1623 if let Some(ref mut stats) = thread_stats {
1624 stats.blocks_processed += 1;
1625 stats.slots_processed += 1;
1626 stats.current_slot = slot;
1627 }
1628
1629 if let (Some(stats_tracking_cfg), Some(thread_stats_ref)) =
1630 (&stats_tracking, thread_stats.as_mut())
1631 && slot % stats_tracking_cfg.tracking_interval_slots == 0
1632 && let Err(err) = maybe_emit_stats(
1633 stats_tracking.as_ref(),
1634 thread_index,
1635 thread_stats_ref,
1636 &overall_slots_processed,
1637 &overall_blocks_processed,
1638 &overall_transactions_processed,
1639 &overall_entries_processed,
1640 &transactions_since_stats,
1641 &blocks_since_stats,
1642 &slots_since_stats,
1643 &last_pulse,
1644 start_time,
1645 )
1646 .await
1647 {
1648 blocks_since_stats.fetch_sub(1, Ordering::Relaxed);
1649 slots_since_stats.fetch_sub(1, Ordering::Relaxed);
1650 overall_blocks_processed
1651 .fetch_sub(1, Ordering::Relaxed);
1652 overall_slots_processed
1653 .fetch_sub(1, Ordering::Relaxed);
1654 if let Some((
1655 prev_slots_processed,
1656 prev_blocks_processed,
1657 prev_leader_skipped,
1658 prev_current_slot,
1659 )) = thread_stats_snapshot
1660 {
1661 thread_stats_ref.slots_processed =
1662 prev_slots_processed;
1663 thread_stats_ref.blocks_processed =
1664 prev_blocks_processed;
1665 thread_stats_ref.leader_skipped_slots =
1666 prev_leader_skipped;
1667 thread_stats_ref.current_slot =
1668 prev_current_slot;
1669 }
1670 last_counted_slot = prev_last_counted_slot;
1671 return Err(err);
1672 }
1673 }
1674
1675 if slot > last_counted_slot {
1676 last_counted_slot = slot;
1677 }
1678 }
1679 Subset(_subset) => (),
1680 Epoch(_epoch) => (),
1681 Rewards(rewards) => {
1682 if reward_enabled || block_enabled {
1683 let reassembled = nodes
1684 .reassemble_dataframes(&rewards.data)
1685 .map_err(|err| {
1686 (
1687 FirehoseError::NodeDecodingError(item_index, err),
1688 current_slot.unwrap_or(slot_range.start),
1689 )
1690 })?;
1691 if reassembled.is_empty() {
1692 this_block_rewards = DecodedRewards::empty();
1693 if reward_enabled
1694 && let Some(on_reward_cb) = on_reward.as_ref()
1695 {
1696 on_reward_cb(
1697 thread_index,
1698 RewardsData {
1699 slot: block.slot,
1700 rewards: Vec::new(),
1701 },
1702 )
1703 .await
1704 .map_err(|e| {
1705 (
1706 FirehoseError::RewardHandlerError(e),
1707 error_slot,
1708 )
1709 })?;
1710 }
1711 continue;
1712 }
1713
1714 let decoded_rewards =
1715 decode_rewards_from_frame(block.slot, reassembled)
1716 .map_err(|err| {
1717 (
1718 FirehoseError::NodeDecodingError(
1719 item_index,
1720 err,
1721 ),
1722 error_slot,
1723 )
1724 })?;
1725 if reward_enabled
1726 && let Some(on_reward_cb) = on_reward.as_ref()
1727 {
1728 on_reward_cb(
1729 thread_index,
1730 RewardsData {
1731 slot: block.slot,
1732 rewards: decoded_rewards.keyed_rewards.clone(),
1733 },
1734 )
1735 .await
1736 .map_err(|e| {
1737 (
1738 FirehoseError::RewardHandlerError(e),
1739 error_slot,
1740 )
1741 })?;
1742 }
1743 this_block_rewards = decoded_rewards;
1744 if let Some(ref mut stats) = thread_stats {
1745 stats.rewards_processed +=
1746 this_block_rewards.keyed_rewards.len() as u64;
1747 }
1748 }
1749 }
1750 DataFrame(_data_frame) => (),
1751 }
1752 }
1753 if !reverse_mode_local && block.slot == slot_range.end - 1 {
1754 let finish_time = std::time::Instant::now();
1755 let elapsed = finish_time.duration_since(start_time);
1756 log::info!(target: &log_target, "processed slot {}", block.slot);
1757 let elapsed_pretty = human_readable_duration(elapsed);
1758 log::info!(
1759 target: &log_target,
1760 "processed {} slots across {} epochs in {}.",
1761 slot_range.end - slot_range.start,
1762 slot_to_epoch(slot_range.end) + 1 - slot_to_epoch(slot_range.start),
1763 elapsed_pretty
1764 );
1765 log::info!(target: &log_target, "a 🚒 firehose thread completed its work.");
1766 let summary: String = error_counts
1769 .iter()
1770 .enumerate()
1771 .filter_map(|(i, c)| {
1772 let v = c.load(Ordering::Relaxed);
1773 if v > 0 {
1774 Some(format!("{:03}({})", i, v))
1775 } else {
1776 None
1777 }
1778 })
1779 .collect::<Vec<_>>()
1780 .join(", ");
1781 if !summary.is_empty() {
1782 log::debug!(target: &log_target, "threads with errors: {}", summary);
1783 }
1784 return Ok(());
1785 }
1786 }
1787 if reverse_mode_local {
1788 if epoch_num == reverse_highest_remaining_epoch {
1790 reverse_highest_remaining_epoch =
1791 reverse_highest_remaining_epoch.saturating_sub(1);
1792 }
1793 if matches!(
1794 reverse_partial_resume,
1795 Some(s) if slot_to_epoch(s) == epoch_num
1796 ) {
1797 reverse_partial_resume = None;
1798 }
1799 }
1800 if let Some(expected_last_slot) = slot_range.end.checked_sub(1)
1801 && last_counted_slot < expected_last_slot
1802 {
1803 }
1806 if let Some(ref mut stats) = thread_stats {
1807 stats.finish_time = Some(std::time::Instant::now());
1808 maybe_emit_stats(
1809 stats_tracking.as_ref(),
1810 thread_index,
1811 stats,
1812 &overall_slots_processed,
1813 &overall_blocks_processed,
1814 &overall_transactions_processed,
1815 &overall_entries_processed,
1816 &transactions_since_stats,
1817 &blocks_since_stats,
1818 &slots_since_stats,
1819 &last_pulse,
1820 start_time,
1821 )
1822 .await?;
1823 }
1824 if block_enabled {
1825 pending_skipped_slots.remove(&thread_index);
1826 }
1827 log::info!(target: &log_target, "thread {} has finished its work", thread_index);
1828 }
1829 Ok(())
1830 }
1831 .await
1832 {
1833 if is_shutdown_error(&err) {
1834 log::info!(
1835 target: &log_target,
1836 "shutdown requested; terminating firehose thread {}",
1837 thread_index
1838 );
1839 break;
1840 }
1841 let epoch = slot_to_epoch(slot);
1842 let item_index = match &err {
1843 FirehoseError::NodeDecodingError(item_index, _) => *item_index,
1844 _ => 0,
1845 };
1846 let error_message = err.to_string();
1847 log::error!(
1848 target: &log_target,
1849 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
1850 slot,
1851 epoch
1852 );
1853 log::error!(target: &log_target, "{}", error_message);
1854 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
1855 || error_message.contains("Unknown CID version")
1856 {
1857 SLOT_OFFSET_INDEX.invalidate_epoch(epoch);
1860 }
1861 if let Some(on_error_cb) = on_error.clone() {
1862 let context = FirehoseErrorContext {
1863 thread_id: thread_index,
1864 slot,
1865 epoch,
1866 error_message: error_message.clone(),
1867 };
1868 if let Err(handler_err) = on_error_cb(thread_index, context).await {
1869 log::error!(
1870 target: &log_target,
1871 "on_error handler failed: {}",
1872 handler_err
1873 );
1874 }
1875 }
1876 error_counts[thread_index].fetch_add(1, Ordering::Relaxed);
1878 log::warn!(
1879 target: &log_target,
1880 "restarting from slot {} at index {}",
1881 slot,
1882 item_index,
1883 );
1884 if reverse_mode_local {
1888 let resume_slot = if slot <= last_counted_slot {
1892 last_counted_slot.saturating_add(1)
1893 } else {
1894 slot
1895 };
1896 reverse_partial_resume = Some(resume_slot);
1897 } else if slot <= last_counted_slot {
1898 slot_range.start = last_counted_slot.saturating_add(1);
1899 } else {
1900 slot_range.start = slot;
1901 }
1902 last_pulse.store(start_time.elapsed().as_nanos() as u64, Ordering::Relaxed);
1904 if tracking_enabled
1905 && let Some(ref mut stats_ref) = thread_stats {
1906 stats_ref.slot_range.start = slot_range.start;
1907 stats_ref.slot_range.end = slot_range.end;
1908 }
1910 if block_enabled {
1911 pending_skipped_slots.remove(&thread_index);
1912 }
1913 skip_until_index = None;
1917 last_emitted_slot_global = last_emitted_slot;
1918 }
1919 });
1920 handles.push(handle);
1921 }
1922
1923 for handle in handles {
1925 handle.await.unwrap();
1926 }
1927 if stats_tracking.is_some() {
1928 let elapsed = firehose_start.elapsed();
1929 let elapsed_secs = elapsed.as_secs_f64();
1930 let total_slots = overall_slots_processed.load(Ordering::Relaxed);
1931 let total_blocks = overall_blocks_processed.load(Ordering::Relaxed);
1932 let total_transactions = overall_transactions_processed.load(Ordering::Relaxed);
1933 let total_leader_skipped = total_slots.saturating_sub(total_blocks);
1934 let total_errors: u64 = error_counts
1935 .iter()
1936 .map(|counter| counter.load(Ordering::Relaxed) as u64)
1937 .sum();
1938 let overall_tps = if elapsed_secs > 0.0 {
1939 total_transactions as f64 / elapsed_secs
1940 } else {
1941 0.0
1942 };
1943 log::info!(
1944 target: LOG_MODULE,
1945 "firehose summary: elapsed={:.2}s, slots={}, blocks={}, leader_skipped={}, transactions={}, overall_tps={:.2}, total_errors={}",
1946 elapsed_secs,
1947 total_slots,
1948 total_blocks,
1949 total_leader_skipped,
1950 total_transactions,
1951 overall_tps,
1952 total_errors
1953 );
1954 }
1955 if shutdown_flag.load(Ordering::SeqCst) {
1956 log::info!(target: LOG_MODULE, "firehose shutdown complete; all threads exited cleanly.");
1957 } else {
1958 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
1959 }
1960 Ok(())
1961}
1962
1963#[allow(clippy::result_large_err)]
1964pub fn firehose_geyser(
1971 rt: Arc<tokio::runtime::Runtime>,
1972 slot_range: Range<u64>,
1973 geyser_config_files: Option<&[PathBuf]>,
1974 index_base_url: &Url,
1975 client: &Client,
1976 on_load: impl Future<Output = Result<(), SharedError>> + Send + 'static,
1977 threads: u64,
1978) -> Result<Receiver<SlotNotification>, (FirehoseError, u64)> {
1979 if threads == 0 {
1980 return Err((
1981 FirehoseError::OnLoadError("Number of threads must be greater than 0".into()),
1982 slot_range.start,
1983 ));
1984 }
1985 log::info!(target: LOG_MODULE, "starting firehose...");
1986 log::info!(target: LOG_MODULE, "index base url: {}", index_base_url);
1987 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
1988 let mut entry_notifier_maybe = None;
1989 let mut block_meta_notifier_maybe = None;
1990 let mut transaction_notifier_maybe = None;
1991 if let Some(geyser_config_files) = geyser_config_files {
1992 log::debug!(target: LOG_MODULE, "geyser config files: {:?}", geyser_config_files);
1993
1994 let service =
1995 solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService::new(
1996 confirmed_bank_receiver.clone(),
1997 true,
1998 geyser_config_files,
1999 )
2000 .map_err(|e| (e.into(), slot_range.start))?;
2001
2002 transaction_notifier_maybe = Some(
2003 service
2004 .get_transaction_notifier()
2005 .ok_or(FirehoseError::FailedToGetTransactionNotifier)
2006 .map_err(|e| (e, slot_range.start))?,
2007 );
2008
2009 entry_notifier_maybe = service.get_entry_notifier();
2010 block_meta_notifier_maybe = service.get_block_metadata_notifier();
2011
2012 log::debug!(target: LOG_MODULE, "geyser plugin service initialized.");
2013 }
2014
2015 if entry_notifier_maybe.is_some() {
2016 log::debug!(target: LOG_MODULE, "entry notifications enabled")
2017 } else {
2018 log::debug!(target: LOG_MODULE, "none of the plugins have enabled entry notifications")
2019 }
2020 log::info!(target: LOG_MODULE, "running on_load...");
2021 rt.spawn(on_load);
2022
2023 let slot_range = Arc::new(slot_range);
2024 let transaction_notifier_maybe = Arc::new(transaction_notifier_maybe);
2025 let entry_notifier_maybe = Arc::new(entry_notifier_maybe);
2026 let block_meta_notifier_maybe = Arc::new(block_meta_notifier_maybe);
2027 let confirmed_bank_sender = Arc::new(confirmed_bank_sender);
2028
2029 let subranges = generate_subranges(&slot_range, threads);
2031 if threads > 1 {
2032 log::info!(target: LOG_MODULE, "âš¡ thread sub-ranges: {:?}", subranges);
2033 }
2034
2035 let mut handles = Vec::new();
2036 let error_counts: Arc<Vec<AtomicU32>> =
2038 Arc::new((0..subranges.len()).map(|_| AtomicU32::new(0)).collect());
2039
2040 for (i, slot_range) in subranges.into_iter().enumerate() {
2041 let transaction_notifier_maybe = (*transaction_notifier_maybe).clone();
2042 let entry_notifier_maybe = (*entry_notifier_maybe).clone();
2043 let block_meta_notifier_maybe = (*block_meta_notifier_maybe).clone();
2044 let confirmed_bank_sender = (*confirmed_bank_sender).clone();
2045 let client = client.clone();
2046 let error_counts = error_counts.clone();
2047
2048 let rt_clone = rt.clone();
2049
2050 let handle = std::thread::spawn(move || {
2051 rt_clone.block_on(async {
2052 firehose_geyser_thread(
2053 slot_range,
2054 transaction_notifier_maybe,
2055 entry_notifier_maybe,
2056 block_meta_notifier_maybe,
2057 confirmed_bank_sender,
2058 &client,
2059 if threads > 1 { Some(i) } else { None },
2060 error_counts,
2061 )
2062 .await
2063 .unwrap();
2064 });
2065 });
2066 handles.push(handle);
2067 }
2068
2069 for handle in handles {
2071 handle.join().unwrap();
2072 }
2073 log::info!(target: LOG_MODULE, "🚒 firehose finished successfully.");
2074 if let Some(block_meta_notifier) = block_meta_notifier_maybe.as_ref() {
2075 block_meta_notifier.notify_block_metadata(
2076 u64::MAX,
2077 "unload",
2078 u64::MAX,
2079 "unload",
2080 &KeyedRewardsAndNumPartitions {
2081 keyed_rewards: vec![],
2082 num_partitions: None,
2083 },
2084 None,
2085 None,
2086 0,
2087 0,
2088 );
2089 }
2090 Ok(confirmed_bank_receiver)
2091}
2092
2093#[allow(clippy::too_many_arguments)]
2094#[allow(clippy::result_large_err)]
2095async fn firehose_geyser_thread(
2096 mut slot_range: Range<u64>,
2097 transaction_notifier_maybe: Option<Arc<dyn TransactionNotifier + Send + Sync + 'static>>,
2098 entry_notifier_maybe: Option<Arc<dyn EntryNotifier + Send + Sync + 'static>>,
2099 block_meta_notifier_maybe: Option<Arc<dyn BlockMetadataNotifier + Send + Sync + 'static>>,
2100 confirmed_bank_sender: Sender<SlotNotification>,
2101 client: &Client,
2102 thread_index: Option<usize>,
2103 error_counts: Arc<Vec<AtomicU32>>,
2104) -> Result<(), (FirehoseError, u64)> {
2105 let start_time = std::time::Instant::now();
2106 let log_target = if let Some(thread_index) = thread_index {
2107 format!("{}::T{:03}", LOG_MODULE, thread_index)
2108 } else {
2109 LOG_MODULE.to_string()
2110 };
2111 let initial_slot_range = slot_range.clone();
2112 let mut skip_until_index = None;
2113 let mut last_counted_slot = slot_range.start.saturating_sub(1);
2114 while let Err((err, slot)) = async {
2116 let epoch_range = slot_to_epoch(slot_range.start)..=slot_to_epoch(slot_range.end - 1);
2117 log::info!(
2118 target: &log_target,
2119 "slot range: {} (epoch {}) ... {} (epoch {})",
2120 slot_range.start,
2121 slot_to_epoch(slot_range.start),
2122 slot_range.end,
2123 slot_to_epoch(slot_range.end)
2124 );
2125
2126 log::info!(target: &log_target, "🚒 starting firehose...");
2127
2128 let mut current_slot: Option<u64> = None;
2130 for epoch_num in epoch_range.clone() {
2131 log::info!(target: &log_target, "entering epoch {}", epoch_num);
2132 let stream = match timeout(OP_TIMEOUT, fetch_epoch_stream(epoch_num, client)).await {
2133 Ok(stream) => stream,
2134 Err(_) => {
2135 return Err((FirehoseError::OperationTimeout("fetch_epoch_stream"), current_slot.unwrap_or(slot_range.start)));
2136 }
2137 };
2138 let mut reader = NodeReader::new(stream);
2139
2140 let header_fut = reader.read_raw_header();
2141 let header = match timeout(OP_TIMEOUT, header_fut).await {
2142 Ok(res) => res
2143 .map_err(FirehoseError::ReadHeader)
2144 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2145 Err(_) => {
2146 return Err((FirehoseError::OperationTimeout("read_raw_header"), current_slot.unwrap_or(slot_range.start)));
2147 }
2148 };
2149 log::debug!(target: &log_target, "read epoch {} header: {:?}", epoch_num, header);
2150
2151 let (epoch_start, epoch_end_inclusive) = epoch_to_slot_range(epoch_num);
2152 let local_start = std::cmp::max(slot_range.start, epoch_start);
2153 let local_end_inclusive =
2154 std::cmp::min(slot_range.end.saturating_sub(1), epoch_end_inclusive);
2155 if local_start > local_end_inclusive {
2156 log::debug!(
2157 target: &log_target,
2158 "epoch {} has no overlap with thread range ({}..{}), skipping",
2159 epoch_num,
2160 slot_range.start,
2161 slot_range.end
2162 );
2163 continue;
2164 }
2165
2166 let mut todo_previous_blockhash = Hash::default();
2167 let mut todo_latest_entry_blockhash = Hash::default();
2168 last_counted_slot = local_start.saturating_sub(1);
2171 current_slot = None;
2172
2173 if local_start > epoch_start {
2174 let seek_slot = match timeout(
2177 OP_TIMEOUT,
2178 find_previous_indexed_slot(local_start, epoch_start, &log_target),
2179 )
2180 .await
2181 {
2182 Ok(res) => res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2183 Err(_) => {
2184 return Err((
2185 FirehoseError::OperationTimeout(
2186 "seek_to_previous_indexed_slot",
2187 ),
2188 current_slot.unwrap_or(slot_range.start),
2189 ));
2190 }
2191 };
2192 if let Some(seek_slot) = seek_slot {
2193 let seek_fut = reader.seek_to_slot(seek_slot);
2194 match timeout(OP_TIMEOUT, seek_fut).await {
2195 Ok(res) => {
2196 res.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?
2197 }
2198 Err(_) => {
2199 return Err((
2200 FirehoseError::OperationTimeout("seek_to_slot"),
2201 current_slot.unwrap_or(slot_range.start),
2202 ));
2203 }
2204 }
2205 }
2206 }
2207
2208 let mut item_index = 0;
2210 let mut displayed_skip_message = false;
2211 loop {
2212 let read_fut = reader.read_until_block();
2213 let nodes = match timeout(OP_TIMEOUT, read_fut).await {
2214 Ok(result) => result
2215 .map_err(FirehoseError::ReadUntilBlockError)
2216 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?,
2217 Err(_) => {
2218 log::warn!(target: &log_target, "timeout reading next block, retrying (will restart)...");
2219 let restart_slot =
2220 current_slot.map(|s| s + 1).unwrap_or(slot_range.start);
2221 return Err((
2222 FirehoseError::OperationTimeout("read_until_block"),
2223 restart_slot,
2224 ));
2225 }
2226 };
2227 if nodes.is_empty() {
2228 log::info!(
2229 target: &log_target,
2230 "reached end of epoch {}",
2231 epoch_num
2232 );
2233 break;
2234 }
2235 if let Some(last_node) = nodes.0.last()
2244 && !last_node.get_node().is_block() {
2245 log::info!(target: &log_target, "reached end of epoch {}", epoch_num);
2246 break;
2247 }
2248 let block = nodes
2249 .get_block()
2250 .map_err(FirehoseError::GetBlockError)
2251 .map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2252 log::debug!(
2253 target: &log_target,
2254 "read {} items from epoch {}, now at slot {}",
2255 item_index,
2256 epoch_num,
2257 block.slot
2258 );
2259 let slot = block.slot;
2260 if slot > local_end_inclusive {
2261 log::debug!(
2262 target: &log_target,
2263 "reached end of local slice at slot {} (epoch {}), stopping",
2264 slot,
2265 epoch_num
2266 );
2267 break;
2268 }
2269 if slot >= slot_range.end {
2270 log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
2271 return Ok(());
2275 }
2276 debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
2277 if slot < local_start {
2278 if slot.saturating_add(1) == local_start {
2279 log::debug!(
2280 target: &log_target,
2281 "priming reader with preceding slot {}, skipping",
2282 slot
2283 );
2284 } else {
2285 log::warn!(
2286 target: &log_target,
2287 "encountered slot {} before start of range {}, skipping",
2288 slot,
2289 local_start
2290 );
2291 }
2292 continue;
2293 }
2294 current_slot = Some(slot);
2295 let mut entry_index: usize = 0;
2296 let mut this_block_executed_transaction_count: u64 = 0;
2297 let mut this_block_entry_count: u64 = 0;
2298 let mut this_block_rewards = DecodedRewards::empty();
2299
2300 if slot <= last_counted_slot {
2301 log::debug!(
2302 target: &log_target,
2303 "duplicate block {}, already counted (last_counted={})",
2304 slot,
2305 last_counted_slot,
2306 );
2307 continue;
2308 }
2309
2310 nodes.each(|node_with_cid| -> Result<(), SharedError> {
2311 item_index += 1;
2312 if let Some(skip) = skip_until_index {
2318 if item_index < skip {
2319 if !displayed_skip_message {
2320 log::info!(
2321 target: &log_target,
2322 "skipping until index {} (at {})",
2323 skip,
2324 item_index
2325 );
2326 displayed_skip_message = true;
2327 }
2328 return Ok(());
2329 } else {
2330 log::info!(
2331 target: &log_target,
2332 "reached target index {}, resuming...",
2333 skip
2334 );
2335 skip_until_index = None;
2336 }
2337 }
2338 let node = node_with_cid.get_node();
2339
2340 use crate::node::Node::*;
2341 match node {
2342 Transaction(tx) => {
2343 let versioned_tx = tx.as_parsed()?;
2344 let reassembled_metadata = nodes.reassemble_dataframes(&tx.metadata)?;
2345
2346 let as_native_metadata = decode_transaction_status_meta_from_frame(
2347 block.slot,
2348 reassembled_metadata,
2349 )?;
2350
2351 let message_hash = {
2352 #[cfg(feature = "verify-transaction-signatures")]
2353 {
2354 versioned_tx.verify_and_hash_message()?
2355 }
2356 #[cfg(not(feature = "verify-transaction-signatures"))]
2357 {
2358 versioned_tx.message.hash()
2361 }
2362 };
2363 let signature = versioned_tx
2364 .signatures
2365 .first()
2366 .ok_or_else(|| {
2367 Box::new(std::io::Error::new(
2368 std::io::ErrorKind::InvalidData,
2369 "transaction missing signature",
2370 )) as SharedError
2371 })?;
2372 let is_vote = is_simple_vote_transaction(&versioned_tx);
2373
2374 if let Some(transaction_notifier) = transaction_notifier_maybe.as_ref() {
2375 transaction_notifier.notify_transaction(
2376 block.slot,
2377 tx.index.unwrap() as usize,
2378 signature,
2379 &message_hash,
2380 is_vote,
2381 &as_native_metadata,
2382 &versioned_tx,
2383 );
2384 }
2385
2386 }
2387 Entry(entry) => {
2388 let entry_hash = Hash::from(entry.hash.to_bytes());
2389 let entry_transaction_count = entry.transactions.len();
2390 let entry_transaction_count_u64 = entry_transaction_count as u64;
2391 let starting_transaction_index =
2392 usize::try_from(this_block_executed_transaction_count).map_err(|_| {
2393 Box::new(std::io::Error::other(
2394 "transaction index exceeds usize range",
2395 )) as SharedError
2396 })?;
2397 todo_latest_entry_blockhash = entry_hash;
2398 this_block_executed_transaction_count += entry_transaction_count_u64;
2399 this_block_entry_count += 1;
2400 if entry_notifier_maybe.is_none() {
2401 return Ok(());
2402 }
2403 let entry_notifier = entry_notifier_maybe.as_ref().unwrap();
2404 let entry_summary = solana_entry::entry::EntrySummary {
2405 num_hashes: entry.num_hashes,
2406 hash: Hash::from(entry.hash.to_bytes()),
2407 num_transactions: entry_transaction_count_u64,
2408 };
2409 entry_notifier.notify_entry(
2410 block.slot,
2411 entry_index,
2412 &entry_summary,
2413 starting_transaction_index,
2414 );
2415 entry_index += 1;
2416 }
2417 Block(block) => {
2418 let notification = SlotNotification::Root((block.slot, block.meta.parent_slot));
2419 confirmed_bank_sender.send(notification).unwrap();
2420
2421 if block_meta_notifier_maybe.is_none() {
2422 last_counted_slot = block.slot;
2423 return Ok(());
2424 }
2425 let DecodedRewards {
2426 keyed_rewards,
2427 num_partitions,
2428 } = std::mem::take(&mut this_block_rewards);
2429 let block_meta_notifier = block_meta_notifier_maybe.as_ref().unwrap();
2430 block_meta_notifier.notify_block_metadata(
2431 block.meta.parent_slot,
2432 todo_previous_blockhash.to_string().as_str(),
2433 block.slot,
2434 todo_latest_entry_blockhash.to_string().as_str(),
2435 &KeyedRewardsAndNumPartitions {
2436 keyed_rewards,
2437 num_partitions,
2438 },
2439 Some(block.meta.blocktime as i64),
2440 block.meta.block_height,
2441 this_block_executed_transaction_count,
2442 this_block_entry_count,
2443 );
2444 todo_previous_blockhash = todo_latest_entry_blockhash;
2445 last_counted_slot = block.slot;
2446 std::thread::yield_now();
2447 }
2448 Subset(_subset) => (),
2449 Epoch(_epoch) => (),
2450 Rewards(rewards) => {
2451 let reassembled = nodes.reassemble_dataframes(&rewards.data)?;
2452 if !reassembled.is_empty() {
2453 this_block_rewards = decode_rewards_from_frame(
2454 block.slot,
2455 reassembled,
2456 )?;
2457 } else {
2458 this_block_rewards = DecodedRewards::empty();
2459 }
2460 }
2461 DataFrame(_data_frame) => (),
2462 }
2463 Ok(())
2464 })
2465 .map_err(|e| FirehoseError::NodeDecodingError(item_index, e)).map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
2466 if block.slot == slot_range.end - 1 {
2467 let finish_time = std::time::Instant::now();
2468 let elapsed = finish_time.duration_since(start_time);
2469 log::info!(target: &log_target, "processed slot {}", block.slot);
2470 let elapsed_pretty = human_readable_duration(elapsed);
2471 log::info!(
2472 target: &log_target,
2473 "processed {} slots across {} epochs in {}.",
2474 initial_slot_range.end - initial_slot_range.start,
2475 slot_to_epoch(initial_slot_range.end)
2476 + 1
2477 - slot_to_epoch(initial_slot_range.start),
2478 elapsed_pretty
2479 );
2480 log::info!(target: &log_target, "a 🚒 firehose thread finished completed its work.");
2481 let summary: String = error_counts
2484 .iter()
2485 .enumerate()
2486 .filter_map(|(i, c)| {
2487 let v = c.load(Ordering::Relaxed);
2488 if v > 0 { Some(format!("{:03}({})", i, v)) } else { None }
2489 })
2490 .collect::<Vec<_>>()
2491 .join(", ");
2492 if !summary.is_empty() {
2493 log::debug!(target: &log_target, "threads with errors: {}", summary);
2494 }
2495 return Ok(());
2496 }
2497 }
2498 }
2499 Ok(())
2500}
2501.await
2502{
2503 if is_shutdown_error(&err) {
2504 log::info!(
2505 target: &log_target,
2506 "shutdown requested; terminating firehose thread {:?}",
2507 thread_index
2508 );
2509 return Ok(());
2510 }
2511 log::error!(
2512 target: &log_target,
2513 "🧯💦🔥 firehose encountered an error at slot {} in epoch {} and will roll back one slot and retry:",
2514 slot,
2515 slot_to_epoch(slot)
2516 );
2517 log::error!(target: &log_target, "{}", err);
2518 let error_message = err.to_string();
2519 if matches!(err, FirehoseError::SlotOffsetIndexError(_))
2520 || error_message.contains("Unknown CID version")
2521 {
2522 SLOT_OFFSET_INDEX.invalidate_epoch(slot_to_epoch(slot));
2525 }
2526 let item_index = match err {
2527 FirehoseError::NodeDecodingError(item_index, _) => item_index,
2528 _ => 0,
2529 };
2530 let idx = thread_index.unwrap_or(0);
2532 error_counts[idx].fetch_add(1, Ordering::Relaxed);
2533 log::warn!(
2534 target: &log_target,
2535 "restarting from slot {} at index {}",
2536 slot,
2537 item_index,
2538 );
2539 if slot <= last_counted_slot {
2542 slot_range.start = last_counted_slot.saturating_add(1);
2543 } else {
2544 slot_range.start = slot;
2545 }
2546 skip_until_index = None;
2550}
2551 Ok(())
2552}
2553
2554#[inline]
2555fn is_simple_vote_transaction(versioned_tx: &VersionedTransaction) -> bool {
2556 if !(1..=2).contains(&versioned_tx.signatures.len()) {
2557 return false;
2558 }
2559
2560 if !matches!(
2561 versioned_tx.version(),
2562 solana_transaction::versioned::TransactionVersion::Legacy(_)
2563 ) {
2564 return false;
2565 }
2566
2567 let instructions = versioned_tx.message.instructions();
2568 if instructions.len() != 1 {
2569 return false;
2570 }
2571
2572 let program_index = instructions[0].program_id_index as usize;
2573 versioned_tx
2574 .message
2575 .static_account_keys()
2576 .get(program_index)
2577 .map(|program_id| program_id == &vote_program_id())
2578 .unwrap_or(false)
2579}
2580
2581#[inline(always)]
2582fn convert_proto_rewards(
2583 proto_rewards: &solana_storage_proto::convert::generated::Rewards,
2584) -> Result<Vec<(Address, RewardInfo)>, SharedError> {
2585 let mut keyed_rewards = Vec::with_capacity(proto_rewards.rewards.len());
2586 for proto_reward in proto_rewards.rewards.iter() {
2587 let reward = RewardInfo {
2588 reward_type: match proto_reward.reward_type - 1 {
2589 0 => RewardType::Fee,
2590 1 => RewardType::Rent,
2591 2 => RewardType::Staking,
2592 3 => RewardType::Voting,
2593 typ => {
2594 return Err(Box::new(std::io::Error::other(format!(
2595 "unsupported reward type {}",
2596 typ
2597 ))));
2598 }
2599 },
2600 lamports: proto_reward.lamports,
2601 post_balance: proto_reward.post_balance,
2602 commission: proto_reward.commission.parse::<u8>().ok(),
2603 };
2604 let pubkey = proto_reward
2605 .pubkey
2606 .parse::<Address>()
2607 .map_err(|err| Box::new(err) as SharedError)?;
2608 keyed_rewards.push((pubkey, reward));
2609 }
2610 Ok(keyed_rewards)
2611}
2612
2613#[inline]
2614pub fn generate_subranges(slot_range: &Range<u64>, threads: u64) -> Vec<Range<u64>> {
2616 let total = slot_range.end - slot_range.start;
2617 let slots_per_thread = total / threads;
2618 let remainder = total % threads;
2619
2620 let ranges: Vec<Range<u64>> = (0..threads)
2621 .map(|i| {
2622 let extra_slot = if i < remainder { 1 } else { 0 };
2624 let start = slot_range.start + i * slots_per_thread + i.min(remainder);
2625 let end = start + slots_per_thread + extra_slot;
2626 start..end
2627 })
2628 .collect();
2629
2630 let total_covered: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2632 assert_eq!(
2633 total_covered, total,
2634 "Range generation failed: {} threads should cover {} slots but only cover {}",
2635 threads, total, total_covered
2636 );
2637
2638 for i in 1..ranges.len() {
2640 assert_eq!(
2641 ranges[i - 1].end,
2642 ranges[i].start,
2643 "Gap found between thread {} (ends at {}) and thread {} (starts at {})",
2644 i - 1,
2645 ranges[i - 1].end,
2646 i,
2647 ranges[i].start
2648 );
2649 }
2650
2651 log::info!(
2652 target: LOG_MODULE,
2653 "Generated {} thread ranges covering {} slots total",
2654 threads,
2655 total_covered
2656 );
2657 ranges
2658}
2659
2660fn human_readable_duration(duration: std::time::Duration) -> String {
2661 if duration.is_zero() {
2662 return "0s".into();
2663 }
2664 let total_secs = duration.as_secs();
2665 if total_secs < 60 {
2666 let secs_f = duration.as_secs_f64();
2667 if total_secs == 0 {
2668 format!("{:.2}s", secs_f)
2669 } else if duration.subsec_millis() == 0 {
2670 format!("{}s", total_secs)
2671 } else {
2672 format!("{:.2}s", secs_f)
2673 }
2674 } else {
2675 let mut secs = total_secs;
2676 let days = secs / 86_400;
2677 secs %= 86_400;
2678 let hours = secs / 3_600;
2679 secs %= 3_600;
2680 let minutes = secs / 60;
2681 secs %= 60;
2682 if days > 0 {
2683 if hours > 0 {
2684 format!("{days}d{hours}h")
2685 } else {
2686 format!("{days}d")
2687 }
2688 } else if hours > 0 {
2689 if minutes > 0 {
2690 format!("{hours}h{minutes}m")
2691 } else {
2692 format!("{hours}h")
2693 }
2694 } else if minutes > 0 {
2695 if secs > 0 {
2696 format!("{minutes}m{secs}s")
2697 } else {
2698 format!("{minutes}m")
2699 }
2700 } else {
2701 format!("{secs}s")
2702 }
2703 }
2704}
2705
2706#[cfg(test)]
2707fn log_stats_handler(thread_id: usize, stats: Stats) -> HandlerFuture {
2708 Box::pin(async move {
2709 let elapsed = stats.start_time.elapsed();
2710 let elapsed_secs = elapsed.as_secs_f64();
2711 let tps = if elapsed_secs > 0.0 {
2712 stats.transactions_processed as f64 / elapsed_secs
2713 } else {
2714 0.0
2715 };
2716 log::info!(
2717 target: LOG_MODULE,
2718 "thread {thread_id} stats: current_slot={}, slots_processed={}, blocks_processed={}, txs={}, entries={}, rewards={}, elapsed_s={:.2}, tps={:.2}",
2719 stats.thread_stats.current_slot,
2720 stats.slots_processed,
2721 stats.blocks_processed,
2722 stats.transactions_processed,
2723 stats.entries_processed,
2724 stats.rewards_processed,
2725 elapsed_secs,
2726 tps
2727 );
2728 Ok(())
2729 })
2730}
2731
2732#[cfg(test)]
2733use futures_util::FutureExt;
2734#[cfg(test)]
2735use serial_test::serial;
2736#[cfg(test)]
2737use std::sync::{Mutex, OnceLock};
2738
2739#[cfg(test)]
2740async fn assert_slot_min_executed_transactions(slot: u64, min_executed: u64) {
2741 use std::sync::Arc;
2742 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2743
2744 let found = Arc::new(AtomicBool::new(false));
2745 let observed_total = Arc::new(AtomicU64::new(0));
2746 let observed_non_vote = Arc::new(AtomicU64::new(0));
2747
2748 let found_block = found.clone();
2749 let observed_total_block = observed_total.clone();
2750 let target_slot_block = slot;
2751 let target_slot_tx = slot;
2752 let observed_non_vote_tx = observed_non_vote.clone();
2753
2754 firehose(
2755 1,
2756 false,
2757 false,
2758 None,
2759 target_slot_block..(target_slot_block + 1),
2760 Some(move |_thread_id: usize, block: BlockData| {
2761 let found_block = found_block.clone();
2762 let observed_total_block = observed_total_block.clone();
2763 async move {
2764 if block.slot() == target_slot_block {
2765 assert!(
2766 !block.was_skipped(),
2767 "slot {target_slot_block} was marked leader skipped",
2768 );
2769 if let BlockData::Block {
2770 executed_transaction_count,
2771 ..
2772 } = block
2773 {
2774 found_block.store(true, Ordering::Relaxed);
2775 observed_total_block.store(executed_transaction_count, Ordering::Relaxed);
2776 }
2777 }
2778 Ok(())
2779 }
2780 .boxed()
2781 }),
2782 Some(move |_thread_id: usize, transaction: TransactionData| {
2783 let observed_non_vote_tx = observed_non_vote_tx.clone();
2784 async move {
2785 if transaction.slot == target_slot_tx && !transaction.is_vote {
2786 observed_non_vote_tx.fetch_add(1, Ordering::Relaxed);
2787 }
2788 Ok(())
2789 }
2790 .boxed()
2791 }),
2792 None::<OnEntryFn>,
2793 None::<OnRewardFn>,
2794 None::<OnErrorFn>,
2795 None::<OnStatsTrackingFn>,
2796 None,
2797 )
2798 .await
2799 .unwrap();
2800
2801 assert!(
2802 found.load(Ordering::Relaxed),
2803 "target slot {slot} was not processed"
2804 );
2805 let observed_total = observed_total.load(Ordering::Relaxed);
2806 let observed_non_vote = observed_non_vote.load(Ordering::Relaxed);
2807 assert!(
2808 observed_total > 0,
2809 "slot {slot} executed transaction count was zero"
2810 );
2811 assert!(
2812 observed_total >= min_executed,
2813 "slot {slot} executed transaction count {observed_total} is below expected minimum {min_executed}"
2814 );
2815 log::info!(
2816 target: LOG_MODULE,
2817 "slot {slot} executed_tx_count={}, non_vote_tx_count={}",
2818 observed_total,
2819 observed_non_vote
2820 );
2821}
2822
2823#[cfg(test)]
2824async fn log_slot_node_summary(slot: u64) -> Result<(), SharedError> {
2825 use crate::index::slot_to_offset;
2826 use crate::node::Node;
2827
2828 let epoch = slot_to_epoch(slot);
2829 let client = crate::network::create_http_client();
2830 let stream = fetch_epoch_stream(epoch, &client).await;
2831 let mut reader = NodeReader::new(stream);
2832 reader
2833 .seek_to_slot(slot)
2834 .await
2835 .map_err(|err| Box::new(err) as SharedError)?;
2836
2837 let nodes = reader.read_until_block().await?;
2838 let mut transactions = 0u64;
2839 let mut entries = 0u64;
2840 let mut entry_tx_total = 0u64;
2841 let mut dataframes = 0u64;
2842 let mut rewards = 0u64;
2843 let mut subsets = 0u64;
2844 let mut epochs = 0u64;
2845 let mut block_slot = None;
2846 let mut block_entries = None;
2847 let first_kind = nodes
2848 .0
2849 .first()
2850 .map(|node| node.get_node())
2851 .map(|node| match node {
2852 Node::Transaction(_) => "transaction",
2853 Node::Entry(_) => "entry",
2854 Node::Block(_) => "block",
2855 Node::Subset(_) => "subset",
2856 Node::Epoch(_) => "epoch",
2857 Node::Rewards(_) => "rewards",
2858 Node::DataFrame(_) => "dataframe",
2859 })
2860 .unwrap_or("none");
2861
2862 for node in &nodes.0 {
2863 match node.get_node() {
2864 Node::Transaction(_) => {
2865 transactions += 1;
2866 }
2867 Node::Entry(entry) => {
2868 entries += 1;
2869 entry_tx_total += entry.transactions.len() as u64;
2870 }
2871 Node::Block(block) => {
2872 block_slot = Some(block.slot);
2873 block_entries = Some(block.entries.len());
2874 }
2875 Node::Subset(_) => {
2876 subsets += 1;
2877 }
2878 Node::Epoch(_) => {
2879 epochs += 1;
2880 }
2881 Node::Rewards(_) => {
2882 rewards += 1;
2883 }
2884 Node::DataFrame(_) => {
2885 dataframes += 1;
2886 }
2887 }
2888 }
2889
2890 log::info!(
2891 target: LOG_MODULE,
2892 "slot {slot} node summary: total_nodes={}, first_kind={}, tx_nodes={}, entry_nodes={}, entry_tx_total={}, block_slot={:?}, block_entries={:?}, dataframes={}, rewards={}, subsets={}, epochs={}",
2893 nodes.len(),
2894 first_kind,
2895 transactions,
2896 entries,
2897 entry_tx_total,
2898 block_slot,
2899 block_entries,
2900 dataframes,
2901 rewards,
2902 subsets,
2903 epochs
2904 );
2905
2906 if slot > 0 {
2907 let mut found_previous = None;
2908 for delta in 1..=5 {
2909 let candidate = slot.saturating_sub(delta);
2910 match slot_to_offset(candidate).await {
2911 Ok(offset) => {
2912 found_previous = Some((candidate, offset));
2913 break;
2914 }
2915 Err(err) => {
2916 log::info!(
2917 target: LOG_MODULE,
2918 "slot {slot} previous lookup {candidate} failed: {err}"
2919 );
2920 }
2921 }
2922 }
2923 if let Some((candidate, offset)) = found_previous {
2924 log::info!(
2925 target: LOG_MODULE,
2926 "slot {slot} nearest previous offset within 5 slots: slot {candidate} @ {offset}"
2927 );
2928 } else {
2929 log::info!(
2930 target: LOG_MODULE,
2931 "slot {slot} no previous offsets found within 5 slots"
2932 );
2933 }
2934 }
2935
2936 Ok(())
2937}
2938
2939#[tokio::test(flavor = "multi_thread")]
2940async fn test_firehose_epoch_800() {
2941 use dashmap::DashSet;
2942 use std::sync::atomic::{AtomicU64, Ordering};
2943 solana_logger::setup_with_default("info");
2944 const THREADS: usize = 4;
2945 const NUM_SLOTS_TO_COVER: u64 = 50;
2946 static PREV_BLOCK: [AtomicU64; THREADS] = [const { AtomicU64::new(0) }; THREADS];
2947 static NUM_SKIPPED_BLOCKS: AtomicU64 = AtomicU64::new(0);
2948 static NUM_BLOCKS: AtomicU64 = AtomicU64::new(0);
2949 static SEEN_SKIPPED: OnceLock<DashSet<u64>> = OnceLock::new();
2950 static SEEN_SLOTS: OnceLock<DashSet<u64>> = OnceLock::new();
2951 static MIN_TRANSACTIONS: AtomicU64 = AtomicU64::new(u64::MAX);
2952 let stats_tracking = StatsTracking {
2953 on_stats: log_stats_handler,
2954 tracking_interval_slots: 10,
2955 };
2956
2957 for prev in PREV_BLOCK.iter() {
2958 prev.store(0, Ordering::Relaxed);
2959 }
2960 NUM_SKIPPED_BLOCKS.store(0, Ordering::Relaxed);
2961 NUM_BLOCKS.store(0, Ordering::Relaxed);
2962 MIN_TRANSACTIONS.store(u64::MAX, Ordering::Relaxed);
2963 SEEN_SLOTS.get_or_init(DashSet::new).clear();
2964 SEEN_SKIPPED.get_or_init(DashSet::new).clear();
2965
2966 firehose(
2967 THREADS.try_into().unwrap(),
2968 false,
2969 false,
2970 None,
2971 (345600000 - NUM_SLOTS_TO_COVER / 2)..(345600000 + NUM_SLOTS_TO_COVER / 2),
2972 Some(|thread_id: usize, block: BlockData| {
2973 async move {
2974 let _prev =
2975 PREV_BLOCK[thread_id % PREV_BLOCK.len()].swap(block.slot(), Ordering::Relaxed);
2976 if block.was_skipped() {
2977 log::info!(
2978 target: LOG_MODULE,
2979 "leader skipped block {} on thread {}",
2980 block.slot(),
2981 thread_id,
2982 );
2983 } else {
2984 }
2991
2992 let first_time = SEEN_SLOTS.get_or_init(DashSet::new).insert(block.slot());
2993 if block.was_skipped() {
2994 NUM_SKIPPED_BLOCKS.fetch_add(1, Ordering::Relaxed);
2995 SEEN_SKIPPED.get_or_init(DashSet::new).insert(block.slot());
2996 } else if first_time {
2997 NUM_BLOCKS.fetch_add(1, Ordering::Relaxed);
2998 if let BlockData::Block {
2999 executed_transaction_count,
3000 ..
3001 } = &block
3002 {
3003 let executed = *executed_transaction_count;
3004 let _ = MIN_TRANSACTIONS.fetch_update(
3005 Ordering::Relaxed,
3006 Ordering::Relaxed,
3007 |current| {
3008 if executed < current {
3009 Some(executed)
3010 } else {
3011 None
3012 }
3013 },
3014 );
3015 }
3016 }
3017 Ok(())
3018 }
3019 .boxed()
3020 }),
3021 None::<OnTxFn>,
3022 None::<OnEntryFn>,
3023 None::<OnRewardFn>,
3024 None::<OnErrorFn>,
3025 Some(stats_tracking),
3026 None,
3027 )
3028 .await
3029 .unwrap();
3030 let seen = SEEN_SLOTS.get_or_init(DashSet::new).len() as u64;
3031 assert_eq!(
3032 seen, NUM_SLOTS_TO_COVER,
3033 "expected to see exactly {NUM_SLOTS_TO_COVER} unique slots, saw {seen}"
3034 );
3035 let mut skipped: Vec<u64> = SEEN_SKIPPED
3036 .get_or_init(DashSet::new)
3037 .iter()
3038 .map(|v| *v)
3039 .collect();
3040 skipped.sort_unstable();
3041 const EXPECTED_SKIPPED: [u64; 6] = [
3043 345_600_004,
3044 345_600_005,
3045 345_600_008,
3046 345_600_009,
3047 345_600_010,
3048 345_600_011,
3049 ];
3050 assert_eq!(skipped, EXPECTED_SKIPPED, "unexpected skipped slots");
3051 assert!(NUM_BLOCKS.load(Ordering::Relaxed) > 0);
3052}
3053
3054#[tokio::test(flavor = "multi_thread")]
3055async fn test_firehose_target_slot_transactions() {
3056 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3057 solana_logger::setup_with_default("info");
3058 const TARGET_SLOT: u64 = 376_273_722;
3059 const SLOT_RADIUS: u64 = 50;
3060 const EXPECTED_TRANSACTIONS: u64 = 1414;
3061 const EXPECTED_NON_VOTE_TRANSACTIONS: u64 = 511;
3062 static FOUND: AtomicBool = AtomicBool::new(false);
3063 static OBSERVED_TXS: AtomicU64 = AtomicU64::new(0);
3064 static OBSERVED_NON_VOTE: AtomicU64 = AtomicU64::new(0);
3065
3066 FOUND.store(false, Ordering::Relaxed);
3067 OBSERVED_TXS.store(0, Ordering::Relaxed);
3068 OBSERVED_NON_VOTE.store(0, Ordering::Relaxed);
3069
3070 firehose(
3071 4,
3072 false,
3073 false,
3074 None,
3075 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3076 Some(|_thread_id: usize, block: BlockData| {
3077 async move {
3078 if block.slot() == TARGET_SLOT {
3079 assert!(
3080 !block.was_skipped(),
3081 "target slot {TARGET_SLOT} was marked leader skipped",
3082 );
3083 if let BlockData::Block {
3084 executed_transaction_count,
3085 ..
3086 } = block
3087 {
3088 OBSERVED_TXS.store(executed_transaction_count, Ordering::Relaxed);
3089 FOUND.store(true, Ordering::Relaxed);
3090 assert_eq!(
3091 executed_transaction_count, EXPECTED_TRANSACTIONS,
3092 "unexpected transaction count for slot {TARGET_SLOT}"
3093 );
3094 assert_eq!(
3095 OBSERVED_NON_VOTE.load(Ordering::Relaxed),
3096 EXPECTED_NON_VOTE_TRANSACTIONS,
3097 "unexpected non-vote transaction count for slot {TARGET_SLOT}"
3098 );
3099 }
3100 }
3101 Ok(())
3102 }
3103 .boxed()
3104 }),
3105 Some(|_thread_id: usize, transaction: TransactionData| {
3106 async move {
3107 if transaction.slot == TARGET_SLOT && !transaction.is_vote {
3108 OBSERVED_NON_VOTE.fetch_add(1, Ordering::Relaxed);
3109 }
3110 Ok(())
3111 }
3112 .boxed()
3113 }),
3114 None::<OnEntryFn>,
3115 None::<OnRewardFn>,
3116 None::<OnErrorFn>,
3117 None::<OnStatsTrackingFn>,
3118 None,
3119 )
3120 .await
3121 .unwrap();
3122
3123 assert!(
3124 FOUND.load(Ordering::Relaxed),
3125 "target slot was not processed"
3126 );
3127 assert_eq!(
3128 OBSERVED_TXS.load(Ordering::Relaxed),
3129 EXPECTED_TRANSACTIONS,
3130 "recorded transaction count mismatch"
3131 );
3132}
3133
3134#[cfg(test)]
3135#[serial]
3136#[tokio::test(flavor = "multi_thread")]
3137async fn test_firehose_epoch_900_boundary_window_sequential_monotonic_transactions() {
3138 use std::sync::{
3139 Arc, Mutex,
3140 atomic::{AtomicU64, Ordering},
3141 };
3142
3143 solana_logger::setup_with_default("info");
3144 const SLOT_COUNT: u64 = 100;
3145 const THREADS: u64 = 4;
3146 const TEST_BUFFER_WINDOW: &str = "4GiB";
3147
3148 let (epoch_900_start, _) = epoch_to_slot_range(900);
3149 let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);
3150
3151 let last_seen_tx_slot = Arc::new(Mutex::new(slot_range.start));
3152 let observed_txs = Arc::new(AtomicU64::new(0));
3153 let stats_tracking = StatsTracking {
3154 on_stats: log_stats_handler,
3155 tracking_interval_slots: 100,
3156 };
3157 let test_buffer_window_bytes = crate::system::parse_buffer_window_bytes(TEST_BUFFER_WINDOW)
3158 .expect("valid test buffer window");
3159
3160 firehose(
3161 THREADS,
3162 true,
3163 false,
3164 Some(test_buffer_window_bytes),
3165 slot_range.clone(),
3166 None::<OnBlockFn>,
3167 Some({
3168 let last_seen_tx_slot = last_seen_tx_slot.clone();
3169 let observed_txs = observed_txs.clone();
3170 move |_thread_id: usize, transaction: TransactionData| {
3171 let last_seen_tx_slot = last_seen_tx_slot.clone();
3172 let observed_txs = observed_txs.clone();
3173 async move {
3174 let mut previous = last_seen_tx_slot.lock().unwrap();
3175 assert!(
3178 transaction.slot >= *previous,
3179 "transaction slot regressed: prev={}, current={}",
3180 *previous,
3181 transaction.slot
3182 );
3183 *previous = transaction.slot;
3184 observed_txs.fetch_add(1, Ordering::Relaxed);
3185 Ok(())
3186 }
3187 .boxed()
3188 }
3189 }),
3190 None::<OnEntryFn>,
3191 None::<OnRewardFn>,
3192 None::<OnErrorFn>,
3193 Some(stats_tracking),
3194 None,
3195 )
3196 .await
3197 .unwrap();
3198
3199 assert!(
3200 observed_txs.load(Ordering::Relaxed) > 0,
3201 "expected to observe at least one transaction in slots [{}, {})",
3202 slot_range.start,
3203 slot_range.end
3204 );
3205}
3206
3207#[cfg(test)]
3208#[serial]
3209#[tokio::test(flavor = "multi_thread")]
3210async fn test_firehose_epoch_720_slot_311173980_solscan_non_vote_counts() {
3211 solana_logger::setup_with_default("info");
3212 assert_slot_min_executed_transactions(311_173_980, 1_197 + 211).await;
3213}
3214
3215#[cfg(test)]
3216#[serial]
3217#[tokio::test(flavor = "multi_thread")]
3218async fn test_firehose_epoch_720_slot_311225232_solscan_non_vote_counts() {
3219 solana_logger::setup_with_default("info");
3220 assert_slot_min_executed_transactions(311_225_232, 888 + 157).await;
3221}
3222
3223#[cfg(test)]
3224#[serial]
3225#[tokio::test(flavor = "multi_thread")]
3226async fn test_firehose_epoch_720_slot_311175860_solscan_non_vote_counts() {
3227 solana_logger::setup_with_default("info");
3228 assert_slot_min_executed_transactions(311_175_860, 527 + 110).await;
3229}
3230
3231#[cfg(test)]
3232#[serial]
3233#[tokio::test(flavor = "multi_thread")]
3234async fn test_firehose_epoch_720_slot_311134608_solscan_non_vote_counts() {
3235 solana_logger::setup_with_default("info");
3236 assert_slot_min_executed_transactions(311_134_608, 1_086 + 169).await;
3237}
3238
3239#[cfg(test)]
3240#[ignore]
3241#[serial]
3242#[tokio::test(flavor = "multi_thread")]
3243async fn debug_epoch_720_slot_311173980_node_summary() {
3244 solana_logger::setup_with_default("info");
3245 const SLOTS: &[u64] = &[
3246 311_173_980,
3247 311_225_232,
3248 311_175_860,
3249 311_134_608,
3250 376_273_722,
3251 ];
3252 for slot in SLOTS {
3253 log_slot_node_summary(*slot).await.expect("slot summary");
3254 }
3255}
3256
3257#[tokio::test(flavor = "multi_thread")]
3258async fn test_firehose_epoch_850_has_logs() {
3259 use std::sync::atomic::{AtomicU64, Ordering};
3260 solana_logger::setup_with_default("info");
3261 const START_SLOT: u64 = 367_200_075; const SLOT_COUNT: u64 = 50;
3263 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3264
3265 TOTAL_TXS.store(0, Ordering::Relaxed);
3266
3267 firehose(
3268 4,
3269 false,
3270 false,
3271 None,
3272 START_SLOT..(START_SLOT + SLOT_COUNT),
3273 None::<OnBlockFn>,
3274 Some(|_thread_id: usize, transaction: TransactionData| {
3275 async move {
3276 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3277 if let Some(logs) = transaction.transaction_status_meta.log_messages.as_ref() {
3278 let has_logs = logs.iter().any(|msg| !msg.is_empty());
3279 assert!(has_logs);
3280 }
3281 Ok(())
3282 }
3283 .boxed()
3284 }),
3285 None::<OnEntryFn>,
3286 None::<OnRewardFn>,
3287 None::<OnErrorFn>,
3288 None::<OnStatsTrackingFn>,
3289 None,
3290 )
3291 .await
3292 .unwrap();
3293
3294 assert!(
3295 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3296 "no transactions observed in epoch 850 range"
3297 );
3298}
3299
3300#[tokio::test(flavor = "multi_thread")]
3301async fn test_firehose_epoch_850_votes_present() {
3302 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3303 solana_logger::setup_with_default("info");
3304 const TARGET_SLOT: u64 = 367_200_100; const SLOT_RADIUS: u64 = 10;
3306 static SEEN_BLOCK: AtomicBool = AtomicBool::new(false);
3307 static VOTE_TXS: AtomicU64 = AtomicU64::new(0);
3308 static TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
3309
3310 SEEN_BLOCK.store(false, Ordering::Relaxed);
3311 VOTE_TXS.store(0, Ordering::Relaxed);
3312 TOTAL_TXS.store(0, Ordering::Relaxed);
3313
3314 firehose(
3315 2,
3316 false,
3317 false,
3318 None,
3319 (TARGET_SLOT - SLOT_RADIUS)..(TARGET_SLOT + SLOT_RADIUS),
3320 Some(|_thread_id: usize, block: BlockData| {
3321 async move {
3322 if block.slot() == TARGET_SLOT {
3323 assert!(
3324 !block.was_skipped(),
3325 "target slot {TARGET_SLOT} was marked leader skipped",
3326 );
3327 SEEN_BLOCK.store(true, Ordering::Relaxed);
3328 }
3329 Ok(())
3330 }
3331 .boxed()
3332 }),
3333 Some(|_thread_id: usize, transaction: TransactionData| {
3334 async move {
3335 if transaction.slot == TARGET_SLOT {
3336 TOTAL_TXS.fetch_add(1, Ordering::Relaxed);
3337 if transaction.is_vote {
3338 VOTE_TXS.fetch_add(1, Ordering::Relaxed);
3339 }
3340 }
3341 Ok(())
3342 }
3343 .boxed()
3344 }),
3345 None::<OnEntryFn>,
3346 None::<OnRewardFn>,
3347 None::<OnErrorFn>,
3348 None::<OnStatsTrackingFn>,
3349 None,
3350 )
3351 .await
3352 .unwrap();
3353
3354 assert!(
3355 SEEN_BLOCK.load(Ordering::Relaxed),
3356 "target slot was not processed"
3357 );
3358 assert!(
3359 TOTAL_TXS.load(Ordering::Relaxed) > 0,
3360 "no transactions counted in target slot"
3361 );
3362 assert_eq!(VOTE_TXS.load(Ordering::Relaxed), 991);
3363}
3364
3365#[cfg(test)]
3366#[serial]
3367#[tokio::test(flavor = "multi_thread")]
3368async fn test_firehose_restart_loses_coverage_without_reset() {
3369 use std::collections::HashMap;
3370 solana_logger::setup_with_default("info");
3371 const THREADS: usize = 1;
3372 const START_SLOT: u64 = 345_600_000;
3373 const NUM_SLOTS: u64 = 8;
3374
3375 static COVERAGE: OnceLock<Mutex<HashMap<u64, u32>>> = OnceLock::new();
3376 COVERAGE
3377 .get_or_init(|| Mutex::new(HashMap::new()))
3378 .lock()
3379 .unwrap()
3380 .clear();
3381 static FAIL_TRIGGERED: AtomicBool = AtomicBool::new(false);
3382 static SEEN_BLOCKS: AtomicU64 = AtomicU64::new(0);
3383 FAIL_TRIGGERED.store(false, Ordering::Relaxed);
3384 SEEN_BLOCKS.store(0, Ordering::Relaxed);
3385
3386 firehose(
3387 THREADS.try_into().unwrap(),
3388 false,
3389 false,
3390 None,
3391 START_SLOT..(START_SLOT + NUM_SLOTS),
3392 Some(|_thread_id: usize, block: BlockData| {
3393 async move {
3394 if !block.was_skipped()
3396 && SEEN_BLOCKS.load(Ordering::Relaxed) > 0
3397 && !FAIL_TRIGGERED.swap(true, Ordering::SeqCst)
3398 {
3399 return Err("synthetic handler failure to exercise restart".into());
3400 }
3401 let mut coverage = COVERAGE
3402 .get_or_init(|| Mutex::new(HashMap::new()))
3403 .lock()
3404 .unwrap();
3405 *coverage.entry(block.slot()).or_insert(0) += 1;
3406 if !block.was_skipped() {
3407 SEEN_BLOCKS.fetch_add(1, Ordering::Relaxed);
3408 }
3409 Ok(())
3410 }
3411 .boxed()
3412 }),
3413 None::<OnTxFn>,
3414 None::<OnEntryFn>,
3415 None::<OnRewardFn>,
3416 None::<OnErrorFn>,
3417 None::<OnStatsTrackingFn>,
3418 None,
3419 )
3420 .await
3421 .unwrap();
3422
3423 let coverage = COVERAGE.get().unwrap().lock().unwrap();
3424 for slot in START_SLOT..(START_SLOT + NUM_SLOTS) {
3425 assert!(
3426 coverage.contains_key(&slot),
3427 "missing coverage for slot {slot} after restart"
3428 );
3429 }
3430}
3431
3432#[cfg(test)]
3433#[serial]
3434#[tokio::test(flavor = "multi_thread")]
3435async fn test_firehose_gap_coverage_near_known_missing_range() {
3436 use std::collections::HashSet;
3437 solana_logger::setup_with_default("info");
3438 const GAP_START: u64 = 378864000;
3439 const START_SLOT: u64 = GAP_START - 1000;
3440 const END_SLOT: u64 = GAP_START + 1000;
3441 const THREADS: usize = 16;
3442
3443 static COVERAGE: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
3444 COVERAGE
3445 .get_or_init(|| Mutex::new(HashSet::new()))
3446 .lock()
3447 .unwrap()
3448 .clear();
3449
3450 firehose(
3451 THREADS.try_into().unwrap(),
3452 false,
3453 false,
3454 None,
3455 START_SLOT..(END_SLOT + 1),
3456 Some(|_thread_id: usize, block: BlockData| {
3457 async move {
3458 if block.was_skipped() {
3459 return Ok(());
3460 }
3461 let slot = block.slot();
3462 COVERAGE
3463 .get_or_init(|| Mutex::new(HashSet::new()))
3464 .lock()
3465 .unwrap()
3466 .insert(slot);
3467 Ok(())
3468 }
3469 .boxed()
3470 }),
3471 None::<OnTxFn>,
3472 None::<OnEntryFn>,
3473 None::<OnRewardFn>,
3474 None::<OnErrorFn>,
3475 None::<OnStatsTrackingFn>,
3476 None,
3477 )
3478 .await
3479 .unwrap();
3480
3481 let mut coverage = COVERAGE
3482 .get_or_init(|| Mutex::new(HashSet::new()))
3483 .lock()
3484 .unwrap()
3485 .clone();
3486
3487 coverage.insert(378864396);
3489 coverage.insert(378864397);
3490 coverage.insert(378864398);
3491 coverage.insert(378864399);
3492
3493 let expected: Vec<u64> = (START_SLOT..=END_SLOT).collect();
3494 let missing: Vec<u64> = expected
3495 .iter()
3496 .copied()
3497 .filter(|slot| !coverage.contains(slot))
3498 .collect();
3499 assert!(
3500 missing.is_empty(),
3501 "missing slots in {START_SLOT}..={END_SLOT}; count={}, first few={:?}",
3502 missing.len(),
3503 &missing[..missing.len().min(10)]
3504 );
3505}
3506
3507#[cfg(test)]
3508#[serial]
3509#[tokio::test(flavor = "multi_thread")]
3510async fn test_firehose_sequential_reverse_crosses_epoch_boundary() {
3511 use std::sync::{
3512 Arc, Mutex,
3513 atomic::{AtomicU64, Ordering},
3514 };
3515
3516 solana_logger::setup_with_default("info");
3517 const SLOT_COUNT: u64 = 100;
3518
3519 let (epoch_900_start, _) = epoch_to_slot_range(900);
3520 let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);
3521
3522 let observed_blocks: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
3523 let observed_tx_count = Arc::new(AtomicU64::new(0));
3524
3525 firehose(
3526 1,
3527 true,
3528 true,
3529 None,
3530 slot_range.clone(),
3531 Some({
3532 let observed_blocks = observed_blocks.clone();
3533 move |_thread_id: usize, block: BlockData| {
3534 let observed_blocks = observed_blocks.clone();
3535 async move {
3536 observed_blocks.lock().unwrap().push(block.slot());
3537 Ok(())
3538 }
3539 .boxed()
3540 }
3541 }),
3542 Some({
3543 let observed_tx_count = observed_tx_count.clone();
3544 move |_thread_id: usize, _tx: TransactionData| {
3545 let observed_tx_count = observed_tx_count.clone();
3546 async move {
3547 observed_tx_count.fetch_add(1, Ordering::Relaxed);
3548 Ok(())
3549 }
3550 .boxed()
3551 }
3552 }),
3553 None::<OnEntryFn>,
3554 None::<OnRewardFn>,
3555 None::<OnErrorFn>,
3556 None::<OnStatsTrackingFn>,
3557 None,
3558 )
3559 .await
3560 .unwrap();
3561
3562 let observed = observed_blocks.lock().unwrap().clone();
3563 assert!(
3564 !observed.is_empty(),
3565 "expected to observe at least one block"
3566 );
3567 assert!(
3568 observed_tx_count.load(Ordering::Relaxed) > 0,
3569 "expected to observe at least one transaction"
3570 );
3571
3572 let first_epoch = slot_to_epoch(observed[0]);
3574 assert_eq!(
3575 first_epoch, 900,
3576 "reverse mode must start with the highest epoch, got slot {} in epoch {}",
3577 observed[0], first_epoch,
3578 );
3579
3580 let mut transitions = 0u32;
3582 let mut current_epoch = first_epoch;
3583 let mut prev_slot_in_epoch: Option<u64> = None;
3584 for &slot in &observed {
3585 let epoch = slot_to_epoch(slot);
3586 if epoch != current_epoch {
3587 assert!(
3588 epoch < current_epoch,
3589 "epoch did not decrease across boundary: prev={current_epoch} now={epoch}",
3590 );
3591 transitions += 1;
3592 current_epoch = epoch;
3593 prev_slot_in_epoch = None;
3594 }
3595 if let Some(prev) = prev_slot_in_epoch {
3596 assert!(
3597 slot >= prev,
3598 "within epoch {epoch}, slot regressed: prev={prev} now={slot}",
3599 );
3600 }
3601 prev_slot_in_epoch = Some(slot);
3602 }
3603 assert_eq!(
3604 transitions, 1,
3605 "expected exactly one epoch transition for a range crossing one boundary",
3606 );
3607 assert_eq!(
3608 current_epoch, 899,
3609 "reverse mode should end at the lower epoch (899), got {current_epoch}",
3610 );
3611}
3612
3613#[cfg(test)]
3614#[serial]
3615#[tokio::test(flavor = "multi_thread")]
3616async fn test_firehose_reverse_implies_sequential() {
3617 use std::sync::{
3618 Arc, Mutex,
3619 atomic::{AtomicU64, Ordering},
3620 };
3621
3622 solana_logger::setup_with_default("info");
3623 const SLOT_COUNT: u64 = 100;
3624
3625 let (epoch_900_start, _) = epoch_to_slot_range(900);
3626 let slot_range = (epoch_900_start - SLOT_COUNT)..(epoch_900_start + SLOT_COUNT);
3627
3628 let observed_blocks: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
3629 let observed_tx_count = Arc::new(AtomicU64::new(0));
3630
3631 firehose(
3633 4,
3634 false,
3635 true,
3636 None,
3637 slot_range.clone(),
3638 Some({
3639 let observed_blocks = observed_blocks.clone();
3640 move |_thread_id: usize, block: BlockData| {
3641 let observed_blocks = observed_blocks.clone();
3642 async move {
3643 observed_blocks.lock().unwrap().push(block.slot());
3644 Ok(())
3645 }
3646 .boxed()
3647 }
3648 }),
3649 Some({
3650 let observed_tx_count = observed_tx_count.clone();
3651 move |_thread_id: usize, _tx: TransactionData| {
3652 let observed_tx_count = observed_tx_count.clone();
3653 async move {
3654 observed_tx_count.fetch_add(1, Ordering::Relaxed);
3655 Ok(())
3656 }
3657 .boxed()
3658 }
3659 }),
3660 None::<OnEntryFn>,
3661 None::<OnRewardFn>,
3662 None::<OnErrorFn>,
3663 None::<OnStatsTrackingFn>,
3664 None,
3665 )
3666 .await
3667 .unwrap();
3668
3669 let observed = observed_blocks.lock().unwrap().clone();
3670 assert!(
3671 !observed.is_empty(),
3672 "expected to observe at least one block"
3673 );
3674 assert_eq!(
3678 slot_to_epoch(observed[0]),
3679 900,
3680 "reverse should imply sequential and emit highest epoch first; first slot was {}",
3681 observed[0],
3682 );
3683}