#![deny(missing_docs)]
pub use jetstreamer_firehose as firehose;
pub use jetstreamer_plugin as plugin;
pub use jetstreamer_utils as utils;
use core::ops::Range;
use jetstreamer_firehose::{epochs::slot_to_epoch, index::get_index_base_url};
use jetstreamer_plugin::{
Plugin, PluginRunner, PluginRunnerError,
plugins::{
instruction_tracking::InstructionTrackingPlugin, program_tracking::ProgramTrackingPlugin,
pubkey_stats::PubkeyStatsPlugin,
},
};
use std::sync::Arc;
/// Factor by which the configured firehose thread count is multiplied when sizing the
/// worker pool of the Tokio runtime built in `JetstreamerRunner::run`.
const WORKER_THREAD_MULTIPLIER: usize = 4;
/// Resolved ClickHouse behaviour: whether ClickHouse is used at all and whether a helper
/// process should be spawned for it.
#[derive(Clone, Copy)]
struct ClickhouseSettings {
    /// `true` when ClickHouse integration is turned on.
    enabled: bool,
    /// `true` when a ClickHouse helper process should be started.
    spawn_helper: bool,
}

impl ClickhouseSettings {
    /// Bundles the two flags into a settings value.
    const fn new(enabled: bool, spawn_helper: bool) -> Self {
        ClickhouseSettings { enabled, spawn_helper }
    }
}
/// Operating mode parsed from the `JETSTREAMER_CLICKHOUSE_MODE` environment variable.
#[derive(Clone, Copy)]
enum ClickhouseMode {
/// Use the caller-supplied default settings.
Auto,
/// ClickHouse disabled and no helper spawned.
Disabled,
/// ClickHouse enabled, but never spawn the helper.
RemoteOnly,
/// ClickHouse enabled with the helper spawn requested.
Local,
}
/// Derives the effective ClickHouse settings from `JETSTREAMER_CLICKHOUSE_MODE`.
///
/// When the variable is unset (or unreadable), or holds an unrecognized value, the result is
/// "enabled" with `default_spawn_helper` deciding whether to spawn the helper. An
/// unrecognized value additionally emits a warning.
fn resolve_clickhouse_settings(default_spawn_helper: bool) -> ClickhouseSettings {
    let fallback = ClickhouseSettings::new(true, default_spawn_helper);

    let Ok(raw_mode) = std::env::var("JETSTREAMER_CLICKHOUSE_MODE") else {
        return fallback;
    };

    let Some(mode) = parse_clickhouse_mode(&raw_mode) else {
        log::warn!(
            "Unrecognized JETSTREAMER_CLICKHOUSE_MODE value '{}'; falling back to default settings",
            raw_mode
        );
        return fallback;
    };

    match mode {
        ClickhouseMode::Auto => fallback,
        ClickhouseMode::Disabled => ClickhouseSettings::new(false, false),
        ClickhouseMode::RemoteOnly => ClickhouseSettings::new(true, false),
        ClickhouseMode::Local => ClickhouseSettings::new(true, true),
    }
}
/// Parses a `JETSTREAMER_CLICKHOUSE_MODE` value into a [`ClickhouseMode`].
///
/// Matching ignores surrounding whitespace and ASCII case. An empty value means
/// [`ClickhouseMode::Auto`]. Returns `None` for any spelling that is not recognized.
///
/// Affirmative spellings (`yes`, `enable`, `enabled`) are accepted alongside `on`/`true`/`1`
/// so that they mirror the negative spellings (`no`, `disable`, `disabled`) and the values
/// accepted by `parse_env_bool`, instead of triggering an "unrecognized value" warning.
fn parse_clickhouse_mode(value: &str) -> Option<ClickhouseMode> {
    let normalized = value.trim().to_ascii_lowercase();
    let mode = match normalized.as_str() {
        "" | "auto" | "default" | "on" | "true" | "1" | "yes" | "enable" | "enabled" => {
            ClickhouseMode::Auto
        }
        "off" | "disable" | "disabled" | "0" | "false" | "none" | "no" => {
            ClickhouseMode::Disabled
        }
        "remote" | "external" | "no-spawn" | "no_spawn" | "nospawn" => ClickhouseMode::RemoteOnly,
        "local" | "spawn" | "helper" | "auto-spawn" | "autospawn" => ClickhouseMode::Local,
        _ => return None,
    };
    Some(mode)
}
/// Builder-style entry point that collects logging, plugin, ClickHouse, and firehose
/// settings and executes them with [`JetstreamerRunner::run`].
pub struct JetstreamerRunner {
/// Log filter handed to `solana_logger::setup_with_default`.
log_level: String,
/// Caller-supplied plugins, registered after the built-in ones when running.
plugins: Vec<Box<dyn Plugin>>,
/// ClickHouse connection string passed to the plugin runner.
clickhouse_dsn: String,
/// Firehose and ClickHouse configuration used by `run`.
config: Config,
}
impl Default for JetstreamerRunner {
fn default() -> Self {
let clickhouse_dsn = std::env::var("JETSTREAMER_CLICKHOUSE_DSN")
.unwrap_or_else(|_| "http://localhost:8123".to_string());
let default_spawn = should_spawn_for_dsn(&clickhouse_dsn);
let clickhouse_settings = resolve_clickhouse_settings(default_spawn);
Self {
log_level: "info".to_string(),
plugins: Vec::new(),
clickhouse_dsn,
config: Config {
threads: jetstreamer_firehose::system::optimal_firehose_thread_count(),
sequential: false,
buffer_window_bytes: None,
slot_range: 0..0,
clickhouse_enabled: clickhouse_settings.enabled,
spawn_clickhouse: clickhouse_settings.spawn_helper && clickhouse_settings.enabled,
builtin_plugins: Vec::new(),
},
}
}
}
impl JetstreamerRunner {
/// Creates a runner with the same settings as [`JetstreamerRunner::default`].
pub fn new() -> Self {
Self::default()
}
/// Sets the log level and immediately initializes `solana_logger` with it.
pub fn with_log_level(mut self, log_level: impl Into<String>) -> Self {
self.log_level = log_level.into();
solana_logger::setup_with_default(&self.log_level);
self
}
/// Adds a plugin; it is registered after any built-in plugins when the runner runs.
pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
self.plugins.push(plugin);
self
}
/// Sets the firehose thread count, raising values below one to one.
pub fn with_threads(mut self, threads: usize) -> Self {
self.config.threads = std::cmp::max(1, threads);
self
}
/// Sets the `sequential` flag that is forwarded to the plugin runner.
pub const fn with_sequential(mut self, sequential: bool) -> Self {
self.config.sequential = sequential;
self
}
/// Sets the optional buffer window size in bytes that is forwarded to the plugin runner.
pub const fn with_buffer_window_bytes(mut self, buffer_window_bytes: Option<u64>) -> Self {
self.config.buffer_window_bytes = buffer_window_bytes;
self
}
/// Sets the half-open slot range to process. The range is stored without validation.
pub const fn with_slot_range(mut self, slot_range: Range<u64>) -> Self {
self.config.slot_range = slot_range;
self
}
/// Sets the slot range to `start_slot..end_slot` (end exclusive).
///
/// # Panics
///
/// Panics if `start_slot` is not strictly less than `end_slot`.
pub fn with_slot_range_bounds(mut self, start_slot: u64, end_slot: u64) -> Self {
assert!(
start_slot < end_slot,
"slot range must have a strictly increasing upper bound"
);
self.config.slot_range = start_slot..end_slot;
self
}
/// Overrides the ClickHouse DSN.
pub fn with_clickhouse_dsn(mut self, clickhouse_dsn: impl Into<String>) -> Self {
self.clickhouse_dsn = clickhouse_dsn.into();
self
}
/// Replaces the whole configuration with the one produced by the free function
/// [`parse_cli_args`], discarding values set earlier through the `with_*` methods that
/// write to the configuration.
///
/// # Errors
///
/// Returns the error reported by [`parse_cli_args`].
pub fn parse_cli_args(mut self) -> Result<Self, Box<dyn std::error::Error>> {
self.config = parse_cli_args()?;
Ok(self)
}
/// Runs the registered plugins over the configured slot range, blocking until done.
///
/// Initializes logging, builds a [`PluginRunner`], registers the built-in plugins followed
/// by the caller-supplied ones, and drives the runner on a dedicated multi-threaded Tokio
/// runtime. If ClickHouse is enabled, spawning is allowed by the configuration, and the
/// DSN looks local, a ClickHouse helper is started through `jetstreamer_utils::start`
/// before the run and stopped through `jetstreamer_utils::stop` afterwards.
///
/// # Errors
///
/// Returns [`PluginRunnerError::PluginLifecycle`] when the ClickHouse helper fails to
/// start or its readiness channel closes; otherwise returns whatever error the plugin
/// runner produced.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be built.
pub fn run(self) -> Result<(), PluginRunnerError> {
solana_logger::setup_with_default(&self.log_level);
if let Ok(index_url) = get_index_base_url() {
log::info!("slot index base url: {}", index_url);
}
let threads = std::cmp::max(1, self.config.threads);
let sequential = self.config.sequential;
let buffer_window_bytes = self.config.buffer_window_bytes;
// A blank DSN disables ClickHouse even when the configuration enables it.
let clickhouse_enabled =
self.config.clickhouse_enabled && !self.clickhouse_dsn.trim().is_empty();
let slot_range = self.config.slot_range.clone();
// Spawning needs all three: ClickHouse enabled, spawn requested, and a local-looking DSN.
let spawn_clickhouse = clickhouse_enabled
&& self.config.spawn_clickhouse
&& should_spawn_for_dsn(&self.clickhouse_dsn);
log::info!(
"processing slots [{}..{}) with {} configured threads (sequential={}, buffer_window_bytes={:?}, clickhouse_enabled={})",
slot_range.start,
slot_range.end,
threads,
sequential,
buffer_window_bytes,
clickhouse_enabled
);
let mut runner = PluginRunner::new(
&self.clickhouse_dsn,
threads,
sequential,
buffer_window_bytes,
);
// Built-in plugins are registered first, then the caller-supplied ones.
for plugin in &self.config.builtin_plugins {
runner.register(plugin.instantiate());
}
for plugin in self.plugins {
runner.register(plugin);
}
let runner = Arc::new(runner);
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(std::cmp::max(
1,
threads.saturating_mul(WORKER_THREAD_MULTIPLIER),
))
.enable_all()
.thread_name("jetstreamer")
.build()
.expect("failed to build plugin runtime");
let mut clickhouse_task: Option<tokio::task::JoinHandle<Result<(), ()>>> = None;
if spawn_clickhouse {
// Start the helper and wait for its readiness signal before running any plugin.
clickhouse_task = Some(runtime.block_on(async {
let (mut ready_rx, clickhouse_future) =
jetstreamer_utils::start().await.map_err(|err| {
PluginRunnerError::PluginLifecycle {
plugin: "clickhouse",
stage: "start",
details: err.to_string(),
}
})?;
ready_rx
.recv()
.await
.ok_or_else(|| PluginRunnerError::PluginLifecycle {
plugin: "clickhouse",
stage: "ready",
details: "ClickHouse readiness signal channel closed unexpectedly".into(),
})?;
// Monitor the helper's future in the background and log how it ended.
Ok::<_, PluginRunnerError>(tokio::spawn(async move {
match clickhouse_future.await {
Ok(()) => {
log::info!("ClickHouse process exited gracefully.");
Ok(())
}
Err(()) => {
log::error!("ClickHouse process exited with an error.");
Err(())
}
}
}))
})?);
} else if clickhouse_enabled {
// Explain why no helper was spawned even though ClickHouse is in use.
if !self.config.spawn_clickhouse {
log::info!(
"ClickHouse auto-spawn disabled via configuration; using existing instance at {}",
self.clickhouse_dsn
);
} else {
log::info!(
"ClickHouse DSN {} not recognized as local; skipping embedded ClickHouse spawn",
self.clickhouse_dsn
);
}
}
let result = runtime.block_on(runner.run(slot_range.clone(), clickhouse_enabled));
if spawn_clickhouse {
// Stop the helper regardless of the run's outcome, then join the monitor task.
let handle = clickhouse_task.take();
runtime.block_on(async move {
jetstreamer_utils::stop().await;
if let Some(handle) = handle
&& let Err(err) = handle.await
{
log::warn!("ClickHouse monitor task aborted: {}", err);
}
});
}
match result {
Ok(()) => Ok(()),
Err(err) => {
// Firehose failures get an extra log line that includes the epoch of the failing slot.
if let PluginRunnerError::Firehose { slot, details } = &err {
log::error!(
"firehose failed at slot {} in epoch {}: {}",
slot,
slot_to_epoch(*slot),
details
);
}
Err(err)
}
}
}
}
/// Run configuration for [`JetstreamerRunner`], typically produced by [`parse_cli_args`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Config {
/// Firehose thread count; `JetstreamerRunner::run` treats values below one as one.
pub threads: usize,
/// `sequential` flag forwarded to the plugin runner.
pub sequential: bool,
/// Optional buffer window size in bytes forwarded to the plugin runner.
pub buffer_window_bytes: Option<u64>,
/// Half-open range of slots to process.
pub slot_range: Range<u64>,
/// Whether ClickHouse integration is enabled.
pub clickhouse_enabled: bool,
/// Whether a ClickHouse helper process may be spawned.
pub spawn_clickhouse: bool,
/// Built-in plugins to instantiate and register before caller-supplied plugins.
pub builtin_plugins: Vec<BuiltinPlugin>,
}
/// Plugins bundled with the crate that can be selected with `--with-plugin <name>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BuiltinPlugin {
/// Selected by the name `program-tracking`.
ProgramTracking,
/// Selected by the name `instruction-tracking`.
InstructionTracking,
/// Selected by the name `pubkey-stats`.
PubkeyStats,
}
impl BuiltinPlugin {
fn from_flag(value: &str) -> Option<Self> {
match value {
"program-tracking" => Some(Self::ProgramTracking),
"instruction-tracking" => Some(Self::InstructionTracking),
"pubkey-stats" => Some(Self::PubkeyStats),
_ => None,
}
}
fn instantiate(self) -> Box<dyn Plugin> {
match self {
Self::ProgramTracking => Box::new(ProgramTrackingPlugin::new()),
Self::InstructionTracking => Box::new(InstructionTrackingPlugin::new()),
Self::PubkeyStats => Box::new(PubkeyStatsPlugin::new()),
}
}
}
pub fn parse_cli_args() -> Result<Config, Box<dyn std::error::Error>> {
let mut args = std::env::args();
args.next(); let mut first_arg: Option<String> = None;
let mut builtin_plugins = Vec::new();
let mut no_plugins = false;
let mut sequential_cli = false;
let mut buffer_window_cli: Option<String> = None;
while let Some(arg) = args.next() {
match arg.as_str() {
"--with-plugin" => {
let plugin_name = args
.next()
.ok_or_else(|| "--with-plugin requires a plugin name".to_string())?;
let plugin = BuiltinPlugin::from_flag(&plugin_name).ok_or_else(|| {
format!(
"unknown plugin '{plugin_name}'. expected 'program-tracking', 'instruction-tracking', or 'pubkey-stats'"
)
})?;
builtin_plugins.push(plugin);
}
"--no-plugins" => {
no_plugins = true;
}
"--sequential" => {
sequential_cli = true;
}
"--buffer-window" => {
let raw = args
.next()
.ok_or_else(|| "--buffer-window requires a value like 4GiB".to_string())?;
buffer_window_cli = Some(raw);
}
_ if first_arg.is_none() => first_arg = Some(arg),
other => return Err(format!("unrecognized argument '{other}'").into()),
}
}
let first_arg = first_arg.expect("no first argument given");
if no_plugins && !builtin_plugins.is_empty() {
return Err("--no-plugins cannot be combined with --with-plugin".into());
}
let slot_range = if first_arg.contains(':') {
let (slot_a, slot_b) = first_arg
.split_once(':')
.expect("failed to parse slot range, expected format: <start>:<end> or a single epoch");
let slot_a: u64 = slot_a.parse().expect("failed to parse first slot");
let slot_b: u64 = slot_b.parse().expect("failed to parse second slot");
slot_a..(slot_b + 1)
} else {
let epoch: u64 = first_arg.parse().expect("failed to parse epoch");
log::info!("epoch: {}", epoch);
let (start_slot, end_slot_inclusive) =
jetstreamer_firehose::epochs::epoch_to_slot_range(epoch);
start_slot..(end_slot_inclusive + 1)
};
let clickhouse_settings = resolve_clickhouse_settings(true);
let clickhouse_enabled = clickhouse_settings.enabled;
let threads = std::env::var("JETSTREAMER_THREADS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(jetstreamer_firehose::system::optimal_firehose_thread_count);
let sequential = if sequential_cli {
true
} else {
parse_env_bool("JETSTREAMER_SEQUENTIAL", false)
};
let buffer_window_raw = if let Some(cli) = buffer_window_cli {
Some(cli)
} else {
std::env::var("JETSTREAMER_BUFFER_WINDOW").ok()
};
let buffer_window_bytes = parse_optional_buffer_window_bytes(buffer_window_raw.as_deref())?;
let spawn_clickhouse = clickhouse_settings.spawn_helper && clickhouse_enabled;
let builtin_plugins = if no_plugins {
Vec::new()
} else if builtin_plugins.is_empty() {
vec![BuiltinPlugin::ProgramTracking]
} else {
builtin_plugins
};
Ok(Config {
threads,
sequential,
buffer_window_bytes,
slot_range,
clickhouse_enabled,
spawn_clickhouse,
builtin_plugins,
})
}
/// Parses an optional buffer-window string into a byte count.
///
/// `None` passes through as `Ok(None)`. A present value is handed to
/// `jetstreamer_firehose::system::parse_buffer_window_bytes`; if that rejects it, an error
/// describing the expected format is returned.
fn parse_optional_buffer_window_bytes(
    raw: Option<&str>,
) -> Result<Option<u64>, Box<dyn std::error::Error>> {
    raw.map(|text| {
        jetstreamer_firehose::system::parse_buffer_window_bytes(text).ok_or_else(|| {
            Box::<dyn std::error::Error>::from(format!(
                "invalid buffer window '{}'; expected integer bytes or suffix like 4GiB/512MiB",
                text
            ))
        })
    })
    .transpose()
}
/// Reads the environment variable `key` as a boolean.
///
/// Surrounding whitespace and ASCII case are ignored. `1`/`true`/`yes`/`on` yield `true`,
/// `0`/`false`/`no`/`off` yield `false`. An unset or unreadable variable yields `default`;
/// any other value logs a warning and yields `default`.
fn parse_env_bool(key: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return default;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
        return true;
    }
    if matches!(normalized.as_str(), "0" | "false" | "no" | "off") {
        return false;
    }
    log::warn!(
        "unrecognized boolean value for {}='{}'; using default {}",
        key,
        normalized.as_str(),
        default
    );
    default
}
/// Reports whether `dsn` points at this machine, i.e. its host is `localhost` or `127.0.0.1`.
///
/// Only the host component is inspected: an optional `scheme://` prefix, `user:pass@`
/// credentials, a numeric `:port`, and any path, query, or fragment are ignored. Comparison
/// is ASCII case-insensitive and tolerates surrounding whitespace and a trailing dot on the
/// host. Looking at the host alone keeps remote DSNs that merely contain the text
/// `localhost` (in a sub-domain, credential, or query string) from being treated as local.
fn should_spawn_for_dsn(dsn: &str) -> bool {
    let lowered = dsn.trim().to_ascii_lowercase();

    // Strip a leading `scheme://` only when the prefix really looks like a URL scheme, so a
    // `://` buried in a query string is not mistaken for one.
    let after_scheme = match lowered.split_once("://") {
        Some((scheme, rest))
            if !scheme.is_empty()
                && scheme
                    .bytes()
                    .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'+' | b'-' | b'.')) =>
        {
            rest
        }
        _ => lowered.as_str(),
    };

    // The authority ends at the first path, query, or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");

    // Drop `user:pass@` credentials; the host follows the last `@`.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host_port)| host_port);

    // Drop a trailing numeric `:port`.
    let host = match host_port.rsplit_once(':') {
        Some((host, port)) if port.bytes().all(|b| b.is_ascii_digit()) => host,
        _ => host_port,
    };

    matches!(host.trim_end_matches('.'), "localhost" | "127.0.0.1")
}