Trait-based framework for building structured observers on top of Jetstreamer's firehose.
Overview
Plugins let you react to every block, transaction, reward, entry, and stats update emitted
by jetstreamer_firehose. Combined with the JetstreamerRunner, they provide a high-throughput
analytics pipeline capable of exceeding 2.7 million transactions per second on the right
hardware. All events originate from Old Faithful's CAR archive and are streamed over the
network into your local runner.
The framework offers:
- A [Plugin] trait with async hook points for each data type.
- [PluginRunner] for coordinating multiple plugins with shared ClickHouse connections (used internally by JetstreamerRunner).
- Built-in plugins under [plugins] that demonstrate common batching strategies and metrics.
- See JetstreamerRunner in the jetstreamer crate for the easiest way to run plugins.
ClickHouse Integration
Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances honor the following environment variables:
- JETSTREAMER_CLICKHOUSE_DSN (default http://localhost:8123): HTTP(S) DSN handed to every plugin that requests a database handle.
- JETSTREAMER_CLICKHOUSE_MODE (default auto): toggles the bundled ClickHouse helper. Set to remote to opt out of spawning the helper while still writing to a cluster, local to always spawn, or off to disable ClickHouse entirely.
When the mode is auto, Jetstreamer inspects the DSN at runtime and only launches the
embedded helper for local endpoints, enabling native clustering workflows out of the box.
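For illustration, a host binary can resolve that DSN itself before constructing a runner. The snippet below is a minimal sketch, not the runner's own resolution logic: the build_runner helper and the worker count are hypothetical, the default value comes from the list above, and PluginRunner::new is used as in the example later on this page.
use std::env;
use jetstreamer_plugin::PluginRunner;

fn build_runner() -> PluginRunner {
    // Fall back to the documented default when the variable is unset.
    let dsn = env::var("JETSTREAMER_CLICKHOUSE_DSN")
        .unwrap_or_else(|_| "http://localhost:8123".to_string());

    // With JETSTREAMER_CLICKHOUSE_MODE=auto, a local DSN like this one is the
    // case where the embedded ClickHouse helper gets spawned.
    PluginRunner::new(dsn.as_str(), 1)
}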
Batching ClickHouse Writes
ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure when
flooded with tiny inserts. Plugins should buffer work locally and flush in batches on a
cadence that matches their workload. The default [PluginRunner] configuration triggers
stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
database. The bundled [plugins::program_tracking::ProgramTrackingPlugin] mirrors this
approach by accumulating ProgramEvent rows per worker thread and issuing a single batch
insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
even under peak throughput.
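A minimal sketch of that buffering pattern is shown below. It is not the ProgramTrackingPlugin itself: the ProgramEventRow type and program_events table are hypothetical, the on_transaction signature mirrors the CountingPlugin example below, the 1,000-slot flush interval matches the cadence described above, and the batch insert assumes the clickhouse crate's Client::insert API with a serde-serializable row.
use std::sync::{Arc, Mutex};
use clickhouse::Client;
use futures_util::FutureExt;
use jetstreamer_firehose::firehose::TransactionData;
use jetstreamer_plugin::{Plugin, PluginFuture};

// Hypothetical row type; adjust the columns to match your own table.
#[derive(clickhouse::Row, serde::Serialize)]
struct ProgramEventRow {
    slot: u64,
    signature: String,
}

struct BatchingPlugin {
    buffer: Mutex<Vec<ProgramEventRow>>,
    last_flush_slot: Mutex<u64>,
}

impl Plugin for BatchingPlugin {
    fn name(&self) -> &'static str { "batching" }

    fn on_transaction<'a>(
        &'a self,
        _thread_id: usize,
        db: Option<Arc<Client>>,
        transaction: &'a TransactionData,
    ) -> PluginFuture<'a> {
        async move {
            // Buffer the row locally; only drain the buffer every ~1,000 slots.
            let pending = {
                let mut buffer = self.buffer.lock().unwrap();
                buffer.push(ProgramEventRow {
                    slot: transaction.slot,
                    signature: transaction.signature.to_string(),
                });
                let mut last = self.last_flush_slot.lock().unwrap();
                if transaction.slot >= *last + 1_000 {
                    *last = transaction.slot;
                    Some(std::mem::take(&mut *buffer))
                } else {
                    None
                }
            };

            // One batched insert instead of thousands of tiny ones.
            if let (Some(rows), Some(db)) = (pending, db) {
                match db.insert::<ProgramEventRow>("program_events") {
                    Ok(mut insert) => {
                        for row in &rows {
                            if let Err(err) = insert.write(row).await {
                                eprintln!("batch write failed: {err}");
                                return Ok(());
                            }
                        }
                        if let Err(err) = insert.end().await {
                            eprintln!("batch flush failed: {err}");
                        }
                    }
                    Err(err) => eprintln!("insert setup failed: {err}"),
                }
            }
            Ok(())
        }
        .boxed()
    }
}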
Ordering Guarantees
Jetstreamer spawns parallel threads that each process a different subrange of the overall slot range at the same time. Within a thread the view of transactions is purely sequential, but downstream services such as databases will see writes from different threads arrive in a fairly arbitrary order. Design your database tables and shared data structures accordingly, as in the sketch below.
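One concrete pattern is to key tables on slot (plus signature), so rows stay queryable in slot order no matter which worker thread inserted them first. The snippet below is only a sketch: the tx_by_slot table and its columns are hypothetical, and it assumes the clickhouse crate's query/execute API for running DDL.
use clickhouse::Client;

async fn create_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
    // MergeTree sorts data by the ORDER BY key at merge time, so inserts may
    // arrive from Jetstreamer's worker threads in any order.
    db.query(
        "CREATE TABLE IF NOT EXISTS tx_by_slot (
            slot UInt64,
            signature String
        )
        ENGINE = MergeTree
        ORDER BY (slot, signature)",
    )
    .execute()
    .await
}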
Examples
Defining a Plugin
use std::sync::Arc;
use clickhouse::Client;
use futures_util::FutureExt;
use jetstreamer_firehose::firehose::TransactionData;
use jetstreamer_plugin::{Plugin, PluginFuture};

struct CountingPlugin;

impl Plugin for CountingPlugin {
    fn name(&self) -> &'static str { "counting" }

    // Invoked for every transaction streamed by the firehose.
    fn on_transaction<'a>(
        &'a self,
        _thread_id: usize,
        _db: Option<Arc<Client>>,
        transaction: &'a TransactionData,
    ) -> PluginFuture<'a> {
        async move {
            println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
            Ok(())
        }
        .boxed()
    }
}
Running Plugins with PluginRunner
use std::sync::Arc;
use jetstreamer_firehose::epochs;
use jetstreamer_plugin::{Plugin, PluginRunner};

struct LoggingPlugin;

impl Plugin for LoggingPlugin {
    fn name(&self) -> &'static str { "logging" }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the runner at a ClickHouse endpoint (the default local DSN here).
    let mut runner = PluginRunner::new("http://localhost:8123", 1);
    runner.register(Box::new(LoggingPlugin));
    let runner = Arc::new(runner);

    // Replay epochs 800 through 805; `run` takes a half-open slot range, so
    // add 1 to the inclusive end slot of the final epoch.
    let (start, _) = epochs::epoch_to_slot_range(800);
    let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
    runner.clone().run(start..(end_inclusive + 1), false).await?;
    Ok(())
}