jetstreamer-plugin 0.1.0

Support crate for Jetstreamer containing plugin framework abstractions and utilities
Documentation

Trait-based framework for building structured observers on top of Jetstreamer's firehose.

Overview

Plugins let you react to every block, transaction, reward, entry, and stats update emitted by jetstreamer_firehose. Combined with the JetstreamerRunner, they provide a high-throughput analytics pipeline capable of exceeding 2.7 million transactions per second on the right hardware. All events originate from Old Faithful's CAR archive and are streamed over the network into your local runner.

The framework offers:

  • A [Plugin] trait with async hook points for each data type.
  • [PluginRunner] for coordinating multiple plugins with shared ClickHouse connections (used internally by JetstreamerRunner).
  • Built-in plugins under [plugins] that demonstrate common batching strategies and metrics.

See JetstreamerRunner in the jetstreamer crate for the easiest way to run plugins.

ClickHouse Integration

Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances honor the following environment variables:

  • JETSTREAMER_CLICKHOUSE_DSN (default http://localhost:8123): HTTP(S) DSN handed to every plugin that requests a database handle.
  • JETSTREAMER_CLICKHOUSE_MODE (default auto): toggles the bundled ClickHouse helper. Set to remote to opt out of spawning the helper while still writing to a cluster, local to always spawn, or off to disable ClickHouse entirely.

When the mode is auto, Jetstreamer inspects the DSN at runtime and only launches the embedded helper for local endpoints, enabling native clustering workflows out of the box.
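
The mode rules above can be sketched as a small decision function. This is only an illustration of the documented behavior, not the crate's implementation: the ClickhouseMode enum, parse_mode, dsn_is_local, and should_spawn_helper are hypothetical names. The loopback check (localhost, 127.0.0.1, ::1) and the fallback to auto for unrecognized values are assumptions about what "local endpoint" and the default mean in practice.

```rust
use std::env;

/// Hypothetical mirror of the documented modes; not a type exported by the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClickhouseMode {
    Auto,
    Remote,
    Local,
    Off,
}

/// Parse a JETSTREAMER_CLICKHOUSE_MODE value (case-insensitive).
fn parse_mode(raw: &str) -> Option<ClickhouseMode> {
    match raw.trim().to_ascii_lowercase().as_str() {
        "auto" => Some(ClickhouseMode::Auto),
        "remote" => Some(ClickhouseMode::Remote),
        "local" => Some(ClickhouseMode::Local),
        "off" => Some(ClickhouseMode::Off),
        _ => None,
    }
}

/// Assumed heuristic: a DSN is "local" when its host is a loopback name or address.
fn dsn_is_local(dsn: &str) -> bool {
    // Drop the scheme ("http://", "https://") if present.
    let rest = dsn.split_once("://").map(|(_, r)| r).unwrap_or(dsn);
    // Keep only the authority part (before any path or query).
    let authority = rest.split(|c: char| c == '/' || c == '?').next().unwrap_or("");
    // Drop optional "user:password@" credentials.
    let host_port = authority.rsplit_once('@').map(|(_, h)| h).unwrap_or(authority);
    // Drop the port, handling bracketed IPv6 literals such as "[::1]:8123".
    let host = if let Some(stripped) = host_port.strip_prefix('[') {
        stripped.split(']').next().unwrap_or("")
    } else {
        host_port.split(':').next().unwrap_or("")
    };
    matches!(host, "localhost" | "127.0.0.1" | "::1")
}

/// Decide whether the bundled helper would be spawned under the documented rules.
fn should_spawn_helper(mode: ClickhouseMode, dsn: &str) -> bool {
    match mode {
        ClickhouseMode::Auto => dsn_is_local(dsn),
        ClickhouseMode::Local => true,
        ClickhouseMode::Remote | ClickhouseMode::Off => false,
    }
}

/// Read both variables, applying the documented defaults when they are unset.
fn spawn_decision_from_env() -> bool {
    let dsn = env::var("JETSTREAMER_CLICKHOUSE_DSN")
        .unwrap_or_else(|_| "http://localhost:8123".to_string());
    let mode = env::var("JETSTREAMER_CLICKHOUSE_MODE")
        .ok()
        .and_then(|raw| parse_mode(&raw))
        .unwrap_or(ClickhouseMode::Auto); // assumption: unknown values fall back to auto
    should_spawn_helper(mode, &dsn)
}
```

Under these assumptions, the default DSN in auto mode would spawn the helper, while pointing the DSN at a remote cluster would not.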

Batching ClickHouse Writes

ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a cadence that matches their workload. The default [PluginRunner] configuration triggers stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the database. The bundled [plugins::program_tracking::ProgramTrackingPlugin] mirrors this approach by accumulating ProgramEvent rows per worker thread and issuing a single batch insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive even under peak throughput.
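
A slot-based buffer along these lines might look like the sketch below. It uses only the standard library and does not reproduce ProgramTrackingPlugin: Row and SlotBatcher are hypothetical names, and the actual insert is left to the caller, who receives a full batch whenever the slot window closes.

```rust
/// Hypothetical row type standing in for something like a ProgramEvent row.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    slot: u64,
    program: String,
}

/// Buffers rows and hands back one batch per window of `flush_every_slots` slots.
/// Intended to be owned by a single worker thread (one batcher per thread).
struct SlotBatcher {
    flush_every_slots: u64,
    window_start: Option<u64>,
    pending: Vec<Row>,
}

impl SlotBatcher {
    fn new(flush_every_slots: u64) -> Self {
        Self {
            flush_every_slots,
            window_start: None,
            pending: Vec::new(),
        }
    }

    /// Buffer a row. Returns `Some(batch)` once the buffered rows span
    /// `flush_every_slots` slots, counted from the first slot in the window.
    fn push(&mut self, row: Row) -> Option<Vec<Row>> {
        let slot = row.slot;
        let start = *self.window_start.get_or_insert(slot);
        self.pending.push(row);
        if slot.saturating_sub(start) + 1 >= self.flush_every_slots {
            Some(self.drain())
        } else {
            None
        }
    }

    /// Take whatever is buffered, e.g. when the replay finishes mid-window.
    fn drain(&mut self) -> Vec<Row> {
        self.window_start = None;
        std::mem::take(&mut self.pending)
    }
}
```

A hook handler would call push for each row and issue a single batch insert whenever it returns Some, then call drain once at shutdown so the final partial window is not lost.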

Ordering Guarantees

Jetstreamer spawns parallel threads that each process a different subrange of the overall slot range at the same time. Each thread sees a purely sequential view of its own transactions, but downstream consumers such as databases will see writes from all threads interleaved in a fairly arbitrary order. Design your database tables and shared data structures accordingly, for example by keying rows on slot rather than relying on insertion order.
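
To illustrate why order-independent storage helps, the following standard-library sketch simulates several workers replaying disjoint slot subranges and a sink that stores results keyed by slot. Everything here is hypothetical (summarize and replay_in_parallel are illustrative names, and the BTreeMap stands in for a table keyed on slot). It is not how Jetstreamer schedules its threads.

```rust
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::mpsc;
use std::thread;

/// Placeholder for whatever a worker derives from a slot.
fn summarize(slot: u64) -> u64 {
    slot * 2
}

/// Replay each subrange on its own thread and collect results keyed by slot.
/// Rows arrive at the sink in an arbitrary interleaving, but because inserts
/// are keyed (and idempotent), the final table does not depend on that order.
fn replay_in_parallel(ranges: Vec<Range<u64>>) -> BTreeMap<u64, u64> {
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = ranges
        .into_iter()
        .map(|range| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Within one worker, slots are visited strictly in order.
                for slot in range {
                    tx.send((slot, summarize(slot))).expect("receiver alive");
                }
            })
        })
        .collect();
    // Drop the original sender so the receive loop ends when workers finish.
    drop(tx);

    let mut table = BTreeMap::new();
    for (slot, value) in rx {
        table.insert(slot, value);
    }
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    table
}
```

In ClickHouse, a comparable effect might be achieved by choosing a sorting key that includes the slot, or by using an engine such as ReplacingMergeTree when the same row may be written more than once. Whether that fits depends on your schema.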

Examples

Defining a Plugin

use std::sync::Arc;
use clickhouse::Client;
use futures_util::FutureExt;
use jetstreamer_firehose::firehose::TransactionData;
use jetstreamer_plugin::{Plugin, PluginFuture};

struct CountingPlugin;

impl Plugin for CountingPlugin {
    fn name(&self) -> &'static str { "counting" }

    fn on_transaction<'a>(
        &'a self,
        _thread_id: usize,
        _db: Option<Arc<Client>>,
        transaction: &'a TransactionData,
    ) -> PluginFuture<'a> {
        async move {
            println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
            Ok(())
        }
        .boxed()
    }
}

Running Plugins with PluginRunner

use std::sync::Arc;
use jetstreamer_firehose::epochs;
use jetstreamer_plugin::{Plugin, PluginRunner};

struct LoggingPlugin;

impl Plugin for LoggingPlugin {
    fn name(&self) -> &'static str { "logging" }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut runner = PluginRunner::new("http://localhost:8123", 1);
    runner.register(Box::new(LoggingPlugin));
    let runner = Arc::new(runner);

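    // Replay epochs 800 through 805: take the first slot of epoch 800 and the
    // last (inclusive) slot of epoch 805, then build a half-open slot range.
    let (start, _) = epochs::epoch_to_slot_range(800);
    let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
    runner
        .clone()
        .run(start..(end_inclusive + 1), false)
        .await?;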
    Ok(())
}