Crate jetstreamer_plugin

Trait-based framework for building structured observers on top of Jetstreamer’s firehose.

Overview

Plugins let you react to every block, transaction, reward, entry, and stats update emitted by jetstreamer_firehose. Combined with the JetstreamerRunner, they provide a high-throughput analytics pipeline capable of exceeding 2.7 million transactions per second on the right hardware. All events originate from Old Faithful’s CAR archive and are streamed over the network into your local runner.

The framework offers:

  • A Plugin trait with async hook points for each data type.
  • PluginRunner, which coordinates multiple plugins over shared ClickHouse connections (used internally by JetstreamerRunner).
  • Built-in plugins in the plugins module that demonstrate common batching strategies and metrics.

For the easiest way to run plugins, see JetstreamerRunner in the jetstreamer crate.

ClickHouse Integration

Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances honor the following environment variables:

  • JETSTREAMER_CLICKHOUSE_DSN (default http://localhost:8123): HTTP(S) DSN handed to every plugin that requests a database handle.
  • JETSTREAMER_CLICKHOUSE_MODE (default auto): controls the bundled ClickHouse helper. Set it to remote to skip spawning the helper while still writing to an existing cluster, local to always spawn it, or off to disable ClickHouse entirely.

When the mode is auto, Jetstreamer inspects the DSN at runtime and launches the bundled helper only for local endpoints, so pointing the DSN at an existing ClickHouse cluster works out of the box without changing the mode.
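The mode selection described above can be sketched in plain Rust. This is a hypothetical illustration, not Jetstreamer's implementation: the ClickHouseMode enum, the helper functions, the set of hosts treated as local (localhost, 127.0.0.1, [::1]), and the fallback to auto for unrecognized values are all assumptions. Only the two environment variable names and their defaults come from the text above.

```rust
use std::env;

/// Hypothetical mirror of the documented JETSTREAMER_CLICKHOUSE_MODE values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClickHouseMode {
    Auto,
    Remote,
    Local,
    Off,
}

impl ClickHouseMode {
    /// Case-insensitive parse; returns None for unrecognized values.
    fn parse(value: &str) -> Option<Self> {
        match value.trim().to_ascii_lowercase().as_str() {
            "auto" => Some(Self::Auto),
            "remote" => Some(Self::Remote),
            "local" => Some(Self::Local),
            "off" => Some(Self::Off),
            _ => None,
        }
    }
}

/// Assumption: a DSN is "local" if its host is localhost or a loopback literal.
fn is_local_dsn(dsn: &str) -> bool {
    // Drop the scheme ("http://"), then everything after the first '/'.
    let rest = dsn.split_once("://").map_or(dsn, |(_, rest)| rest);
    let authority = rest.split('/').next().unwrap_or("");
    // Drop any "user:password@" prefix.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, hp)| hp);
    // Drop the port, keeping bracketed IPv6 literals such as "[::1]" intact.
    let host = match host_port.find(']') {
        Some(end) => &host_port[..=end],
        None => host_port.split(':').next().unwrap_or(""),
    };
    matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}

/// Whether the bundled helper should be spawned for this mode and DSN.
fn should_spawn_helper(mode: ClickHouseMode, dsn: &str) -> bool {
    match mode {
        ClickHouseMode::Local => true,
        ClickHouseMode::Remote | ClickHouseMode::Off => false,
        ClickHouseMode::Auto => is_local_dsn(dsn),
    }
}

fn main() {
    // Defaults taken from the documented environment variables.
    let dsn = env::var("JETSTREAMER_CLICKHOUSE_DSN")
        .unwrap_or_else(|_| "http://localhost:8123".to_string());
    // Assumption: unrecognized values fall back to auto.
    let mode = env::var("JETSTREAMER_CLICKHOUSE_MODE")
        .ok()
        .and_then(|v| ClickHouseMode::parse(&v))
        .unwrap_or(ClickHouseMode::Auto);
    println!(
        "mode={mode:?} dsn={dsn} spawn_helper={}",
        should_spawn_helper(mode, &dsn)
    );
}
```

Keeping the decision in a pure function of (mode, DSN) makes the auto behavior easy to test without touching the environment.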

Batching ClickHouse Writes

ClickHouse (and any other sink you call from hook handlers) can apply backpressure when it receives many tiny inserts. Plugins should buffer work locally and flush in batches on a cadence that matches their workload. The default PluginRunner configuration triggers stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the database. The bundled plugins::program_tracking::ProgramTrackingPlugin follows the same approach: it accumulates ProgramEvent rows per worker thread and issues a single batch insert every 1,000 slots. A similar strategy keeps long-running replays responsive even at peak throughput.
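A minimal, std-only sketch of slot-cadence batching in the spirit of the per-thread buffering described above. SlotBatcher is a hypothetical type, not part of this crate; the row type is generic where a real plugin would buffer something like ProgramEvent, and the actual ClickHouse insert is left out. Because each firehose thread sees its slots in sequential order, tracking the distance since the last flush per thread is enough to decide when to emit a batch.

```rust
/// Hypothetical per-thread buffer that hands back a batch once `flush_every`
/// slots have passed since the previous flush. `T` stands in for a row type.
struct SlotBatcher<T> {
    rows: Vec<T>,
    flush_every: u64,
    last_flush_slot: Option<u64>,
}

impl<T> SlotBatcher<T> {
    fn new(flush_every: u64) -> Self {
        Self { rows: Vec::new(), flush_every, last_flush_slot: None }
    }

    /// Buffers `row` (observed at `slot`) and returns a batch when one is due.
    /// Assumes slots arrive in non-decreasing order, as they do within one thread.
    fn push(&mut self, slot: u64, row: T) -> Option<Vec<T>> {
        self.rows.push(row);
        // The first slot seen becomes the starting point of the cadence.
        let anchor = *self.last_flush_slot.get_or_insert(slot);
        if slot.saturating_sub(anchor) >= self.flush_every {
            self.last_flush_slot = Some(slot);
            // Hand the buffered rows to the caller (e.g. for one batch insert).
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }

    /// Returns any rows still buffered, e.g. when a thread finishes its subrange.
    fn finish(&mut self) -> Vec<T> {
        std::mem::take(&mut self.rows)
    }
}

fn main() {
    // One placeholder row per slot, flushing every 1,000 slots.
    let mut batcher = SlotBatcher::new(1_000);
    let mut batch_sizes = Vec::new();
    for slot in 0..2_500u64 {
        if let Some(batch) = batcher.push(slot, slot) {
            batch_sizes.push(batch.len());
        }
    }
    batch_sizes.push(batcher.finish().len());
    println!("{batch_sizes:?}"); // prints "[1001, 1000, 499]"
}
```

In an actual plugin, hooks receive &self, so a buffer like this would presumably sit behind interior mutability (for example, one Mutex<SlotBatcher<_>> per thread_id), and leftover rows would still need a final flush once a thread's work ends.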

Ordering Guarantees

Jetstreamer spawns parallel threads that each process a different subrange of the overall slot range at the same time. Each thread sees a strictly sequential view of its own transactions, but downstream services such as databases receive writes from all threads interleaved in an essentially arbitrary order. Design database tables and shared data structures so that they do not depend on insertion order.
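The std-only sketch below illustrates why insertion order should not matter. The thread layout only loosely imitates a parallel replay (four threads over contiguous subranges), the per-slot values are placeholders, and parallel_replay and aggregate_by_slot are hypothetical names. Each thread pushes its slots in order, yet the shared sink ends up interleaved; keying rows by slot and combining them with a commutative sum makes the result identical for any arrival order.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical row: (slot, value observed for that slot).
type Row = (u64, u64);

/// Keys rows by slot and sums their values. Addition is commutative, so the
/// result is the same whatever order the rows arrived in.
fn aggregate_by_slot(rows: &[Row]) -> BTreeMap<u64, u64> {
    let mut totals = BTreeMap::new();
    for &(slot, value) in rows {
        *totals.entry(slot).or_insert(0) += value;
    }
    totals
}

/// Splits `start..end` into `threads` contiguous subranges (requires
/// start <= end and threads > 0). Each worker walks its subrange in order
/// and pushes rows into one shared sink.
fn parallel_replay(start: u64, end: u64, threads: u64) -> Vec<Row> {
    let sink: Arc<Mutex<Vec<Row>>> = Arc::new(Mutex::new(Vec::new()));
    let chunk = (end - start + threads - 1) / threads; // ceiling division
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let sink = Arc::clone(&sink);
            let lo = (start + t * chunk).min(end);
            let hi = (lo + chunk).min(end);
            thread::spawn(move || {
                for slot in lo..hi {
                    // Placeholder value standing in for, e.g., a per-slot transaction count.
                    sink.lock().unwrap().push((slot, slot % 7));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // All worker clones are gone after join, so the Arc can be unwrapped.
    Arc::try_unwrap(sink).unwrap().into_inner().unwrap()
}

fn main() {
    let rows = parallel_replay(0, 1_000, 4);
    // The interleaving of `rows` can differ between runs; the aggregate does not.
    let mut reversed = rows.clone();
    reversed.reverse();
    assert_eq!(aggregate_by_slot(&rows), aggregate_by_slot(&reversed));
    println!("aggregated {} slots", aggregate_by_slot(&rows).len());
}
```

The same idea carries over to ClickHouse: key or sort tables by slot (for example through a MergeTree table's ORDER BY clause) rather than relying on the order in which rows were inserted.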

Examples

Defining a Plugin

use std::sync::Arc;
use clickhouse::Client;
use futures_util::FutureExt;
use jetstreamer_firehose::firehose::TransactionData;
use jetstreamer_plugin::{Plugin, PluginFuture};

// Prints the signature and slot of each transaction it receives.
struct CountingPlugin;

impl Plugin for CountingPlugin {
    fn name(&self) -> &'static str { "counting" }

    fn on_transaction<'a>(
        &'a self,
        _thread_id: usize,
        _db: Option<Arc<Client>>,
        transaction: &'a TransactionData,
    ) -> PluginFuture<'a> {
        async move {
            println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
            Ok(())
        }
        .boxed()
    }
}
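Despite its name, CountingPlugin only prints. Because on_transaction receives &'a self, and a single registered plugin is presumably shared across the firehose threads (each call carries a thread_id), keeping an actual count requires interior mutability. The std-only sketch below shows the pattern with an AtomicU64; TxCounter and its methods are hypothetical, and the spawned threads merely simulate concurrent hook calls.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a plugin's state. Hooks only get `&self`,
/// so shared counters need interior mutability such as an atomic.
#[derive(Default)]
struct TxCounter {
    seen: AtomicU64,
}

impl TxCounter {
    /// What an `on_transaction` body might do instead of printing.
    fn record(&self) {
        // Relaxed suffices for a standalone counter: no other data is published through it.
        self.seen.fetch_add(1, Ordering::Relaxed);
    }

    fn total(&self) -> u64 {
        self.seen.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = Arc::new(TxCounter::default());
    // Simulate four worker threads, each delivering 250 transactions.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..250 {
                    counter.record();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    println!("{}", counter.total()); // prints 1000
}
```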

Running Plugins with PluginRunner

use std::sync::Arc;
use jetstreamer_firehose::epochs;
use jetstreamer_plugin::{Plugin, PluginRunner};

struct LoggingPlugin;

// Only `name` is implemented; every hook keeps its default implementation.
impl Plugin for LoggingPlugin {
    fn name(&self) -> &'static str { "logging" }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
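    // Register plugins while the runner is still uniquely owned, then share it via Arc.
    let mut runner = PluginRunner::new("http://localhost:8123", 1);
    runner.register(Box::new(LoggingPlugin));
    let runner = Arc::new(runner);

    // Epochs 800 through 805, passed to `run` below as a half-open slot range.
    let (start, _) = epochs::epoch_to_slot_range(800);
    let (_, end_inclusive) = epochs::epoch_to_slot_range(805);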
    runner
        .clone()
        .run(start..(end_inclusive + 1), false)
        .await?;
    Ok(())
}
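The range passed to run is a half-open Rust Range, which is why the example adds 1 to the inclusive end of epoch 805. The sketch below reproduces that arithmetic under a stated assumption: a fixed 432,000 slots per epoch, as on Solana mainnet-beta. epoch_slots_inclusive and epochs_to_half_open are hypothetical stand-ins; real code should keep using epochs::epoch_to_slot_range from jetstreamer_firehose.

```rust
use std::ops::Range;

/// Assumption: a fixed epoch length of 432,000 slots, as on Solana mainnet-beta.
const SLOTS_PER_EPOCH: u64 = 432_000;

/// Hypothetical stand-in for `epochs::epoch_to_slot_range`: (first, last) slot, both inclusive.
fn epoch_slots_inclusive(epoch: u64) -> (u64, u64) {
    let first = epoch * SLOTS_PER_EPOCH;
    (first, first + SLOTS_PER_EPOCH - 1)
}

/// Half-open slot range covering epochs `first_epoch..=last_epoch`,
/// built the same way as `start..(end_inclusive + 1)` in the example above.
fn epochs_to_half_open(first_epoch: u64, last_epoch: u64) -> Range<u64> {
    let (start, _) = epoch_slots_inclusive(first_epoch);
    let (_, end_inclusive) = epoch_slots_inclusive(last_epoch);
    start..(end_inclusive + 1)
}

fn main() {
    let range = epochs_to_half_open(800, 805);
    // Six epochs of 432,000 slots each.
    println!("{range:?} spans {} slots", range.end - range.start);
    // prints "345600000..348192000 spans 2592000 slots"
}
```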

Modules

plugins
Built-in plugin implementations that ship with Jetstreamer.

Structs

FirehoseErrorContext
Metadata describing a firehose worker failure (re-exported firehose statistics type).
FirehoseStats
Aggregated firehose statistics covering all worker threads (re-exported firehose statistics type).
PluginRunner
Coordinates plugin execution and ClickHouse persistence.
ThreadStats
Per-thread progress information emitted by the firehose runner (re-exported firehose statistics type).

Enums

PluginRunnerError
Errors that can arise while running plugins against the firehose.

Traits

Plugin
Trait implemented by plugins that consume firehose events.

Type Aliases

PluginFuture
Convenience alias for the boxed future returned by plugin hooks.