//! Core data structures and streaming utilities for Jetstreamer firehose processing.
//!
//! # Overview
//! The firehose crate streams data live over the network directly from Project Yellowstone's
//! [Old Faithful](https://old-faithful.net/) archive of CAR files, which hosts the complete
//! history of every Solana transaction. Data only flows outward from Old Faithful to your
//! local consumer; nothing is ever uploaded back to the archive. With sufficient CPU and
//! network headroom the pipeline can exceed 2.7 million transactions per second while decoding
//! the stream for analysis and backfilling workloads.
//!
//! Firehose is the foundation that powers
//! [`jetstreamer`](https://crates.io/crates/jetstreamer) and
//! [`jetstreamer-plugin`](https://crates.io/crates/jetstreamer-plugin), but it can also be
//! consumed directly to build bespoke replay pipelines. The crate exposes:
//! - Async readers for Old Faithful CAR archives via [`firehose`].
//! - Rich data models for blocks, entries, rewards, and transactions.
//! - Epoch helpers for reasoning about slot ranges and availability windows.
//!
//! # Configuration
//! Several environment variables influence how the firehose locates and caches data:
//! - `JETSTREAMER_HTTP_BASE_URL` (default `https://files.old-faithful.net`): base URL or
//! `s3://bucket/prefix` for CAR snapshots. Change this to point at a private mirror.
//! - `JETSTREAMER_COMPACT_INDEX_BASE_URL` (default `https://files.old-faithful.net`): base URL
//! for compact CAR index artifacts. Point this at your own mirror to reduce load on the
//! public Old Faithful deployment. Also supports `s3://` URIs.
//! - `JETSTREAMER_ARCHIVE_BASE`: fallback URL/URI that applies to both CARs and compact
//! indexes when the more specific knobs are unset.
//! - `JETSTREAMER_ARCHIVE_BACKEND` (default `http`): set to `s3` to force the S3 transport even
//! when the resolved URL still points at `https://`.
//! - `JETSTREAMER_S3_BUCKET`, `JETSTREAMER_S3_PREFIX`, `JETSTREAMER_S3_INDEX_PREFIX`,
//! `JETSTREAMER_S3_REGION`, `JETSTREAMER_S3_ENDPOINT`, `JETSTREAMER_S3_ACCESS_KEY`,
//! `JETSTREAMER_S3_SECRET_KEY`, `JETSTREAMER_S3_SESSION_TOKEN`: credentials and overrides
//! applied when the S3 backend is active.
//! - `JETSTREAMER_NETWORK` (default `mainnet`): suffix appended to cache namespaces and index
//! filenames so you can swap between clusters without purging local state.
//! - `JETSTREAMER_NETWORK_CAPACITY_MB` (default `1000`): assumed network throughput in megabytes
//! per second used when sizing the firehose thread pool. Increase or decrease to match your
//! host's effective bandwidth.
//!
//! S3 transports are compiled behind the `s3-backend` Cargo feature. Enable that feature when
//! building if you plan to stream from `s3://` URIs instead of HTTP mirrors.
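//!
//! The precedence described above can be sketched as a small resolver. This is a hypothetical
//! illustration, not the crate's actual configuration code: `ArchiveConfig`, `Backend`, and
//! `resolve` are made-up names. The rule that an `s3://` CAR URL selects the S3 backend on its
//! own is an assumption inferred from the `JETSTREAMER_ARCHIVE_BACKEND` description. Lookups go
//! through a closure so the logic can be exercised without mutating process-wide state.
//!
//! ```rust
//! // Documented default for both the CAR and compact-index base URLs.
//! const DEFAULT_BASE_URL: &str = "https://files.old-faithful.net";
//!
//! #[derive(Debug, PartialEq)]
//! enum Backend {
//!     Http,
//!     S3,
//! }
//!
//! /// Hypothetical resolved view of the environment knobs listed above.
//! #[derive(Debug)]
//! struct ArchiveConfig {
//!     car_base: String,
//!     index_base: String,
//!     backend: Backend,
//!     network: String,
//!     network_capacity_mb: u64,
//! }
//!
//! impl ArchiveConfig {
//!     /// `get` looks up a variable by name, e.g. `|k| std::env::var(k).ok()`.
//!     fn resolve<F: Fn(&str) -> Option<String>>(get: F) -> Self {
//!         // JETSTREAMER_ARCHIVE_BASE only applies when the more specific knob is unset.
//!         let fallback = get("JETSTREAMER_ARCHIVE_BASE");
//!         let car_base = get("JETSTREAMER_HTTP_BASE_URL")
//!             .or_else(|| fallback.clone())
//!             .unwrap_or_else(|| DEFAULT_BASE_URL.to_string());
//!         let index_base = get("JETSTREAMER_COMPACT_INDEX_BASE_URL")
//!             .or(fallback)
//!             .unwrap_or_else(|| DEFAULT_BASE_URL.to_string());
//!         // Assumption: an explicit `s3` backend wins; otherwise an `s3://` CAR URL implies S3.
//!         let backend = match get("JETSTREAMER_ARCHIVE_BACKEND").as_deref() {
//!             Some("s3") => Backend::S3,
//!             _ if car_base.starts_with("s3://") => Backend::S3,
//!             _ => Backend::Http,
//!         };
//!         let network = get("JETSTREAMER_NETWORK").unwrap_or_else(|| "mainnet".to_string());
//!         // Missing or unparseable values fall back to the documented default of 1000 MB/s.
//!         let network_capacity_mb = get("JETSTREAMER_NETWORK_CAPACITY_MB")
//!             .and_then(|v| v.parse::<u64>().ok())
//!             .unwrap_or(1000);
//!         Self { car_base, index_base, backend, network, network_capacity_mb }
//!     }
//! }
//!
//! fn main() {
//!     let config = ArchiveConfig::resolve(|key| std::env::var(key).ok());
//!     println!("{config:?}");
//! }
//! ```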
//!
//! Sequential-mode ripget buffering is configured on the [`firehose::firehose`] call via
//! `buffer_window_bytes`. Pass `None` to use the built-in default (`min(4 GiB, 15% of available
//! RAM)`). If you're using [`jetstreamer`](https://crates.io/crates/jetstreamer)'s binary runner,
//! that layer exposes `JETSTREAMER_BUFFER_WINDOW` and forwards the parsed value here.
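//!
//! A minimal sketch of how the documented default works out, assuming `buffer_window_bytes` is
//! an `Option<u64>` and that `Some(n)` is used as-is. Both helper names are hypothetical, and the
//! available-RAM figure must be supplied by the caller because the standard library has no
//! portable way to query it.
//!
//! ```rust
//! const GIB: u64 = 1024 * 1024 * 1024;
//!
//! /// Hypothetical helper reproducing the documented default: min(4 GiB, 15% of available RAM).
//! fn default_buffer_window(available_ram_bytes: u64) -> u64 {
//!     // Widen to u128 so the multiplication cannot overflow; the result always fits in u64.
//!     let fifteen_percent = (available_ram_bytes as u128 * 15 / 100) as u64;
//!     fifteen_percent.min(4 * GIB)
//! }
//!
//! /// Assumption: `Some(n)` is used verbatim and `None` selects the default.
//! fn effective_buffer_window(requested: Option<u64>, available_ram_bytes: u64) -> u64 {
//!     requested.unwrap_or_else(|| default_buffer_window(available_ram_bytes))
//! }
//!
//! fn main() {
//!     for ram in [8 * GIB, 16 * GIB, 64 * GIB] {
//!         let window = effective_buffer_window(None, ram);
//!         println!("{} GiB RAM -> {:.2} GiB window", ram / GIB, window as f64 / GIB as f64);
//!     }
//! }
//! ```
//!
//! With 8 GiB of RAM the default comes to roughly 1.2 GiB; above about 26.7 GiB of RAM the
//! 4 GiB cap takes over.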
//!
//! # Limitations
//! Old Faithful currently publishes blocks, transactions, epochs, and reward metadata but does
//! not ship account updates. The firehose mirrors that limitation; plan on a separate data
//! source if you require account updates.
//!
//! # Epoch Feature Availability
//! Old Faithful snapshots expose different metadata depending on the epoch, reflecting how the
//! Solana protocol evolved over time. Use the table below to decide which replay windows fit
//! your requirements:
//!
//! | Epoch range | Slot range  | Notes                                                    |
//! |-------------|-------------|----------------------------------------------------------|
//! | 0–156       | 0–?         | Incompatible with modern Geyser plugins                  |
//! | 157+        | ?           | Compatible with modern Geyser plugins                    |
//! | 0–449       | 0–194184610 | Compute-unit (CU) tracking unavailable (reported as `0`) |
//! | 450+        | 194184611+  | Compute-unit (CU) tracking fully available               |
//!
//! Detailed helpers for translating between epochs and slots live in the [`epochs`] module.
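//!
//! As a rough sketch of how the table translates into feature checks, the following assumes the
//! mainnet-beta layout of a fixed 432,000 slots per epoch with epoch 0 starting at slot 0.
//! `slot_range_for_epoch` and the other helpers are hypothetical stand-ins for illustration; prefer
//! the crate's `epochs::epoch_to_slot_range` in real code.
//!
//! ```rust
//! /// Assumption: mainnet-beta layout, 432,000 slots per epoch starting at slot 0.
//! const SLOTS_PER_EPOCH: u64 = 432_000;
//! /// First slot with CU tracking, from the table above.
//! const FIRST_CU_TRACKED_SLOT: u64 = 194_184_611;
//! /// First epoch compatible with modern Geyser plugins, from the table above.
//! const FIRST_GEYSER_COMPATIBLE_EPOCH: u64 = 157;
//!
//! /// Hypothetical stand-in returning (first slot, last slot) of an epoch, both inclusive.
//! fn slot_range_for_epoch(epoch: u64) -> (u64, u64) {
//!     let start = epoch * SLOTS_PER_EPOCH;
//!     (start, start + SLOTS_PER_EPOCH - 1)
//! }
//!
//! fn slot_to_epoch(slot: u64) -> u64 {
//!     slot / SLOTS_PER_EPOCH
//! }
//!
//! fn cu_tracking_available(slot: u64) -> bool {
//!     slot >= FIRST_CU_TRACKED_SLOT
//! }
//!
//! fn geyser_compatible(epoch: u64) -> bool {
//!     epoch >= FIRST_GEYSER_COMPATIBLE_EPOCH
//! }
//!
//! fn main() {
//!     let epoch = 800;
//!     let (start, end_inclusive) = slot_range_for_epoch(epoch);
//!     println!(
//!         "epoch {epoch}: slots {start}..={end_inclusive}, CU tracked: {}, Geyser-compatible: {}",
//!         cu_tracking_available(start),
//!         geyser_compatible(slot_to_epoch(start))
//!     );
//! }
//! ```
//!
//! Under this assumption slot 194,184,611 falls inside epoch 449, so the tail end of epoch 449
//! already carries CU data; gate on the slot rather than the epoch when that precision matters.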
//!
//! # Ordering Guarantees
//! [`firehose`] spawns parallel threads that each process a different subrange of the overall
//! slot range concurrently. Each thread sees a strictly sequential view of the transactions in
//! its own subrange, but downstream services that consume the combined output, such as
//! databases, will see writes from all threads interleaved in an effectively arbitrary order.
//! Design your database tables and shared data structures accordingly. The best pattern is to
//! aggregate data per thread on some interval and periodically flush the aggregated results to
//! the shared downstream service or data structure.
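//!
//! The aggregate-then-flush pattern can be sketched with only the standard library. This is a
//! hypothetical illustration rather than firehose code: plain OS threads stand in for the
//! firehose workers, and a `Mutex<HashMap>` stands in for the shared downstream store. Each
//! worker counts transactions per slot in its own map and merges into the shared store every 64
//! records, plus once more when its subrange ends. Because the merge is a commutative `+=`, the
//! final totals do not depend on the order in which threads flush.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::{Arc, Mutex};
//! use std::thread;
//!
//! /// Stand-in for a shared downstream table: slot -> transaction count.
//! type SharedCounts = Arc<Mutex<HashMap<u64, u64>>>;
//!
//! /// Hypothetical per-thread aggregator that only takes the shared lock when flushing.
//! struct LocalAggregator {
//!     counts: HashMap<u64, u64>,
//!     pending: usize,
//!     flush_every: usize,
//!     shared: SharedCounts,
//! }
//!
//! impl LocalAggregator {
//!     fn new(flush_every: usize, shared: SharedCounts) -> Self {
//!         Self { counts: HashMap::new(), pending: 0, flush_every, shared }
//!     }
//!
//!     fn record(&mut self, slot: u64) {
//!         *self.counts.entry(slot).or_insert(0) += 1;
//!         self.pending += 1;
//!         if self.pending >= self.flush_every {
//!             self.flush();
//!         }
//!     }
//!
//!     /// Merge local counts into the shared store and reset the local buffer.
//!     fn flush(&mut self) {
//!         let mut shared = self.shared.lock().unwrap();
//!         for (slot, n) in self.counts.drain() {
//!             *shared.entry(slot).or_insert(0) += n;
//!         }
//!         self.pending = 0;
//!     }
//! }
//!
//! /// Runs `threads` workers over disjoint slot subranges and returns
//! /// (distinct slots, total transactions) as seen by the shared store.
//! fn run_simulation(threads: u64, slots_per_thread: u64, txs_per_slot: u64) -> (usize, u64) {
//!     let shared: SharedCounts = Arc::new(Mutex::new(HashMap::new()));
//!     let handles: Vec<_> = (0..threads)
//!         .map(|t| {
//!             let shared = Arc::clone(&shared);
//!             thread::spawn(move || {
//!                 let mut agg = LocalAggregator::new(64, shared);
//!                 for slot in t * slots_per_thread..(t + 1) * slots_per_thread {
//!                     for _ in 0..txs_per_slot {
//!                         agg.record(slot);
//!                     }
//!                 }
//!                 agg.flush(); // flush whatever remains when the subrange ends
//!             })
//!         })
//!         .collect();
//!     for handle in handles {
//!         handle.join().unwrap();
//!     }
//!     let totals = shared.lock().unwrap();
//!     let distinct_slots = totals.len();
//!     let total_txs: u64 = totals.values().sum();
//!     (distinct_slots, total_txs)
//! }
//!
//! fn main() {
//!     let (slots, txs) = run_simulation(4, 100, 3);
//!     println!("{slots} slots, {txs} transactions");
//! }
//! ```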
//!
//! # Examples
//! Run the firehose with handlers for every data type:
//! ```no_run
//! use futures_util::FutureExt;
//! use jetstreamer_firehose::{
//!     epochs,
//!     firehose::{self, BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData},
//! };
//!
//! fn block_handler() -> impl firehose::Handler<BlockData> {
//!     move |_thread_id, block| async move {
//!         println!("block slot {}", block.slot());
//!         Ok(())
//!     }
//!     .boxed()
//! }
//!
//! fn tx_handler() -> impl firehose::Handler<TransactionData> {
//!     move |_thread_id, tx| async move {
//!         println!("tx {} in slot {}", tx.signature, tx.slot);
//!         Ok(())
//!     }
//!     .boxed()
//! }
//!
//! fn entry_handler() -> impl firehose::Handler<EntryData> {
//!     move |_thread_id, entry| async move {
//!         println!(
//!             "entry {} covering transactions {:?}",
//!             entry.entry_index, entry.transaction_indexes
//!         );
//!         Ok(())
//!     }
//!     .boxed()
//! }
//!
//! fn reward_handler() -> impl firehose::Handler<RewardsData> {
//!     move |_thread_id, rewards| async move {
//!         println!("rewards in slot {} -> {} accounts", rewards.slot, rewards.rewards.len());
//!         Ok(())
//!     }
//!     .boxed()
//! }
//!
//! fn stats_handler() -> impl firehose::Handler<Stats> {
//!     move |_thread_id, stats| async move {
//!         println!("processed {} slots so far", stats.slots_processed);
//!         Ok(())
//!     }
//!     .boxed()
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let stats = StatsTracking {
//!         on_stats: stats_handler(),
//!         tracking_interval_slots: 100,
//!     };
//!
//!     // Epochs 800 through 805 inclusive, expressed as a half-open slot range.
//!     let (start, _) = epochs::epoch_to_slot_range(800);
//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
//!     let slot_range = start..(end_inclusive + 1);
//!
//!     firehose::firehose(
//!         4,
//!         false,
//!         None,
//!         slot_range,
//!         Some(block_handler()),
//!         Some(tx_handler()),
//!         Some(entry_handler()),
//!         Some(reward_handler()),
//!         None::<firehose::OnErrorFn>,
//!         Some(stats),
//!         None,
//!     )
//!     .await
//!     .map_err(|(err, slot)| -> Box<dyn std::error::Error> {
//!         format!("firehose failed at slot {slot}: {err}").into()
//!     })?;
//!     Ok(())
//! }
//! ```
/// Shared boxed error type propagated across firehose modules.
pub type SharedError = ;
/// Backend configuration for archive mirrors (HTTP and S3).
/// Types for decoding block-level records emitted by the firehose.
/// Encodes and decodes arbitrary binary [`DataFrame`](dataframe::DataFrame) nodes.
/// Parsing and serialization helpers for [`Entry`](entry::Entry) nodes.
/// Structures for the top-level [`Epoch`](epoch::Epoch) node type.
/// Epoch utilities such as [`epoch_to_slot_range`](epochs::epoch_to_slot_range).
/// Streaming interface for fetching and parsing firehose blocks.
/// Slot offset index client for locating blocks in Old Faithful CAR archives.
/// Helpers for working with network metadata and endpoints.
/// Core node tree definitions shared across firehose types.
/// Reader utilities for decoding Old Faithful CAR node streams.
/// Reward decoding primitives and helpers.
/// Utilities for working with subset nodes.
/// System heuristics for sizing the firehose runtime.
/// Transaction decoding and helpers.
/// Shared helpers used throughout the firehose crate.
/// Log target prefix used across the firehose crate.
pub const LOG_MODULE: &str = "jetstreamer::firehose";