Trait-based framework for building structured observers on top of Jetstreamer’s firehose.
§Overview
Plugins let you react to every block, transaction, reward, entry, and stats update emitted
by jetstreamer_firehose. Combined with the JetstreamerRunner,
they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
transactions per second on the right hardware. All events originate from Old Faithful’s CAR
archive and are streamed over the network into your local runner.
The framework offers:
- A Plugin trait with async hook points for each data type.
- PluginRunner for coordinating multiple plugins with shared ClickHouse connections (used internally by JetstreamerRunner).
- Built-in plugins under plugins that demonstrate common batching strategies and metrics.
- See JetstreamerRunner in the jetstreamer crate for the easiest way to run plugins.
§ClickHouse Integration
Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances honor the following environment variables:
- JETSTREAMER_CLICKHOUSE_DSN (default http://localhost:8123): HTTP(S) DSN handed to every plugin that requests a database handle.
- JETSTREAMER_CLICKHOUSE_MODE (default auto): toggles the bundled ClickHouse helper. Set to remote to opt out of spawning the helper while still writing to a cluster, local to always spawn, or off to disable ClickHouse entirely.
When the mode is auto, Jetstreamer inspects the DSN at runtime and only launches the
embedded helper for local endpoints, enabling native clustering workflows out of the box.
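The mode rules above can be sketched as a small decision function. This is only an illustration under stated assumptions: ClickhouseMode, is_local_dsn, and should_spawn_helper are hypothetical names, and the loopback check (localhost, 127.0.0.1, ::1) is a guess at what "local endpoint" might mean, not Jetstreamer's actual detection logic.

```rust
/// Hypothetical mirror of the JETSTREAMER_CLICKHOUSE_MODE values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClickhouseMode {
    Auto,
    Remote,
    Local,
    Off,
}

/// Hypothetical helper: true when the DSN host looks like a loopback address.
/// Assumption: "local" means localhost, 127.0.0.1, or ::1.
fn is_local_dsn(dsn: &str) -> bool {
    // Strip the scheme ("http://", "https://") if present.
    let rest = dsn.split_once("://").map(|(_, r)| r).unwrap_or(dsn);
    // Keep only the authority part (everything before the first '/', '?' or '#').
    let authority = rest
        .split(|c: char| c == '/' || c == '?' || c == '#')
        .next()
        .unwrap_or("");
    // Drop optional "user:pass@" credentials.
    let host_port = authority
        .rsplit_once('@')
        .map(|(_, h)| h)
        .unwrap_or(authority);
    // Separate host from port, handling bracketed IPv6 literals like "[::1]:8123".
    let host = if let Some(stripped) = host_port.strip_prefix('[') {
        stripped.split(']').next().unwrap_or("")
    } else {
        host_port.split(':').next().unwrap_or("")
    };
    matches!(host, "localhost" | "127.0.0.1" | "::1")
}

/// Decides whether the bundled helper would be spawned for a given mode and DSN.
fn should_spawn_helper(mode: ClickhouseMode, dsn: &str) -> bool {
    match mode {
        // auto: spawn only when the DSN points at a local endpoint.
        ClickhouseMode::Auto => is_local_dsn(dsn),
        // local: always spawn.
        ClickhouseMode::Local => true,
        // remote: write to the cluster without a helper; off: no ClickHouse at all.
        ClickhouseMode::Remote | ClickhouseMode::Off => false,
    }
}
```

Under these assumptions, the default DSN http://localhost:8123 in auto mode would spawn the helper, while a DSN pointing at a remote host would not.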
§Batching ClickHouse Writes
ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large
numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a
cadence that matches their workload. The default PluginRunner configuration triggers
stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
database. The bundled plugins::program_tracking::ProgramTrackingPlugin mirrors this
approach by accumulating ProgramEvent rows per worker thread and issuing a single batch
insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
even under peak throughput.
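The buffering strategy described above can be sketched with a small per-thread accumulator. This is a minimal illustration, not the code used by ProgramTrackingPlugin: Row and SlotBatcher are hypothetical names, and the actual ClickHouse insert is left to the caller.

```rust
/// Hypothetical row type standing in for something like ProgramEvent.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    slot: u64,
    program: String,
    count: u32,
}

/// Per-thread buffer that hands back a batch once enough slots have elapsed.
struct SlotBatcher {
    flush_every_slots: u64,
    last_flush_slot: Option<u64>,
    pending: Vec<Row>,
}

impl SlotBatcher {
    fn new(flush_every_slots: u64) -> Self {
        Self {
            flush_every_slots,
            last_flush_slot: None,
            pending: Vec::new(),
        }
    }

    /// Buffers a row. Returns `Some(batch)` when at least `flush_every_slots`
    /// slots have passed since the last flush (or since the first row seen).
    fn push(&mut self, row: Row) -> Option<Vec<Row>> {
        let slot = row.slot;
        // The first row seen establishes the starting point of the window.
        let window_start = *self.last_flush_slot.get_or_insert(slot);
        self.pending.push(row);
        if slot.saturating_sub(window_start) >= self.flush_every_slots {
            self.last_flush_slot = Some(slot);
            // Hand the whole buffer to the caller and start a fresh one.
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Drains whatever is still buffered, for example at shutdown.
    fn finish(&mut self) -> Vec<Row> {
        std::mem::take(&mut self.pending)
    }
}
```

A plugin would keep one such batcher per worker thread, call push from its hook, and write each returned batch with a single insert. Calling finish when the run ends avoids losing the final partial batch.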
§Ordering Guarantees
Jetstreamer spawns parallel threads that process different subranges of the overall slot range at the same time. Each thread sees a purely sequential view of the transactions in its own subrange, but downstream services such as databases that consume this data will see writes arrive in a fairly arbitrary order. Design your database tables and shared data structures accordingly.
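One way to cope with arbitrary write order is to key every record by slot, so the final state does not depend on arrival order. The sketch below illustrates the idea with an in-memory map; SlotTxCount and SlotKeyedSink are hypothetical names, and a real deployment would express the same idea in the table schema rather than in process memory.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-slot record a plugin might emit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlotTxCount {
    slot: u64,
    tx_count: u64,
}

/// Order-insensitive sink: records are keyed by slot, so the final contents
/// are the same regardless of which worker thread writes first.
#[derive(Default)]
struct SlotKeyedSink {
    by_slot: BTreeMap<u64, u64>,
}

impl SlotKeyedSink {
    /// Insert-or-replace keyed by slot; rewriting the same slot is idempotent.
    fn write(&mut self, rec: SlotTxCount) {
        self.by_slot.insert(rec.slot, rec.tx_count);
    }

    /// Reads the records back in ascending slot order.
    fn ordered(&self) -> Vec<SlotTxCount> {
        self.by_slot
            .iter()
            .map(|(&slot, &tx_count)| SlotTxCount { slot, tx_count })
            .collect()
    }
}
```

In database terms this loosely corresponds to choosing a table key that includes the slot, and querying with an explicit ordering instead of relying on insertion order.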
§Examples
§Defining a Plugin
use std::sync::Arc;

use clickhouse::Client;
use futures_util::FutureExt;
use jetstreamer_firehose::firehose::TransactionData;
use jetstreamer_plugin::{Plugin, PluginFuture};

struct CountingPlugin;

impl Plugin for CountingPlugin {
    fn name(&self) -> &'static str { "counting" }

    fn on_transaction<'a>(
        &'a self,
        _thread_id: usize,
        _db: Option<Arc<Client>>,
        transaction: &'a TransactionData,
    ) -> PluginFuture<'a> {
        async move {
            println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
            Ok(())
        }
        .boxed()
    }
}
§Running Plugins with PluginRunner
use std::sync::Arc;

use jetstreamer_firehose::epochs;
use jetstreamer_plugin::{Plugin, PluginRunner};

struct LoggingPlugin;

impl Plugin for LoggingPlugin {
    fn name(&self) -> &'static str { "logging" }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut runner = PluginRunner::new("http://localhost:8123", 1);
    runner.register(Box::new(LoggingPlugin));
    let runner = Arc::new(runner);

    let (start, _) = epochs::epoch_to_slot_range(800);
    let (_, end_inclusive) = epochs::epoch_to_slot_range(805);

    runner
        .clone()
        .run(start..(end_inclusive + 1), false)
        .await?;
    Ok(())
}
Modules§
- plugins
- Built-in plugin implementations that ship with Jetstreamer.
Structs§
- FirehoseErrorContext
- Re-exported statistics types produced by firehose. Metadata describing a firehose worker failure.
- FirehoseStats
- Re-exported statistics types produced by firehose. Aggregated firehose statistics covering all worker threads.
- PluginRunner
- Coordinates plugin execution and ClickHouse persistence.
- ThreadStats
- Re-exported statistics types produced by firehose. Per-thread progress information emitted by the firehose runner.
Enums§
- PluginRunnerError
- Errors that can arise while running plugins against the firehose.
Traits§
- Plugin
- Trait implemented by plugins that consume firehose events.
Type Aliases§
- PluginFuture
- Convenience alias for the boxed future returned by plugin hooks.