jetstreamer 0.5.1

High-throughput Solana transaction ledger streaming and plugin framework suitable for research and backfilling
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
#![deny(missing_docs)]
//! Jetstreamer is a high-throughput Solana backfilling and research toolkit designed to stream
//! historical chain data live over the network from Project Yellowstone's [Old
//! Faithful](https://old-faithful.net/) archive, which is a comprehensive open source archive
//! of all Solana blocks and transactions from genesis to the current tip of the chain. Given
//! the right hardware and network connection, Jetstreamer can stream data at over 2.7M TPS to
//! a local Jetstreamer plugin or geyser plugin. Higher speeds are possible with better
//! hardware (in our case 64 core CPU, 30 Gbps+ network for the 2.7M TPS record).
//!
//! ## Components
//! - [`firehose`] exposes the underlying streaming primitives and async helpers for
//!   downloading, compacting, and replaying Old Faithful CAR archives at scale.
//! - [`plugin`] provides a trait-driven framework for building structured firehose data
//!   observers with ClickHouse-friendly batching and runtime metrics.
//! - [`utils`] hosts shared helpers used across the Jetstreamer ecosystem.
//!
//! All of these crates are re-exported from this facade, keeping most applications reliant on
//! a single dependency.
//!
//! # Quick Start
//! Install the CLI by cloning the repository and running the bundled demo runner:
//!
//! ```bash
//! # Replay all transactions in epoch 800 using eight HTTP multiplexing workers.
//! JETSTREAMER_THREADS=8 cargo run --release -- 800
//!
//! # Or replay an explicit slot range (slot ranges may cross epoch boundaries).
//! JETSTREAMER_THREADS=8 cargo run --release -- 358560000:367631999
//! ```
//!
//! The CLI accepts either `<start>:<end>` slot ranges or a single epoch. See
//! [`JetstreamerRunner::parse_cli_args`] for the precise argument grammar.
//!
//! When `JETSTREAMER_CLICKHOUSE_MODE` is `auto` (the default) the runner inspects the DSN to
//! decide whether to launch the bundled ClickHouse helper or connect to an external cluster.
//! You can also manage that helper manually via the crate-level Cargo aliases:
//!
//! ```bash
//! cargo clickhouse-server
//! cargo clickhouse-client
//! ```
//!
//! `cargo clickhouse-server` launches the bundled binary in `bin/`, while `cargo
//! clickhouse-client` opens a client session against the locally spawned helper. You can
//! connect with the client while Jetstreamer is running, or re-launch the helper later to
//! inspect the data persisted in `bin/`. Copying the `bin/` directory between systems is a
//! lightweight way to migrate ClickHouse state generated by the runner.
//!
//! # Environment Variables
//! `JetstreamerRunner` honors several environment variables for runtime tuning:
//! - `JETSTREAMER_THREADS` (default hardware auto-detect via
//!   [`jetstreamer_firehose::system::optimal_firehose_thread_count`]): number of firehose
//!   ingestion threads. Increase this to multiplex Old Faithful HTTP requests across more
//!   cores, or leave it unset to size the pool automatically using CPU and network heuristics.
//! - `JETSTREAMER_SEQUENTIAL` (default `false`): when truthy, firehose uses a single worker
//!   thread and uses ripget's parallel windowed downloader for sequential reads.
//! - `JETSTREAMER_BUFFER_WINDOW` (default lower of 4 GiB and 15% of available RAM): ripget
//!   hot/cold window size used in sequential mode. Accepts raw bytes or suffixes like `512MiB`.
//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): DSN passed to plugin
//!   instances that emit ClickHouse writes.
//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): controls ClickHouse integration. Accepted
//!   values are `auto`, `remote`, `local`, and `off`.
//!
//! Additional firehose-specific knobs such as `JETSTREAMER_COMPACT_INDEX_BASE_URL` and
//! `JETSTREAMER_NETWORK` live in [`jetstreamer_firehose`](crate::firehose).
//!
//! ## Limitations
//!
//! While Jetstreamer is able to play back all blocks, transactions, epochs, and rewards in the
//! history of Solana mainnet, it is limited by what is in Old Faithful. Old Faithful does not
//! contain account updates, so Jetstreamer at the moment also does not have account updates.
//! Transaction logs **are** available in `transaction_status_meta.log_messages` for all epochs.
//!
//! It is worth noting that the way Old Faithful and thus Jetstreamer stores transactions, they
//! are stored in their "already-executed" state as they originally appeared to Geyser when
//! they were first executed. Thus while Jetstreamer can replay ledger data, it is not
//! executing transactions directly, and when we say 2.7M TPS, we mean "2.7M transactions
//! processed by a Jetstreamer or Geyser plugin locally, streamed over the internet from the
//! Old Faithful archive."
//!
//! # Configuration
//!
//! The following configuration ENV vars are available across the Jetstreamer ecosystem:
//!
//! ## JetstreamerRunner Config
//!
//! | Variable | Default | Effect |
//! |----------|---------|--------|
//! | `JETSTREAMER_CLICKHOUSE_DSN` | `http://localhost:8123` | HTTP(S) DSN passed to the embedded plugin runner for ClickHouse writes. Override to target a remote ClickHouse deployment. |
//! | `JETSTREAMER_CLICKHOUSE_MODE` | `auto` | Controls ClickHouse integration. `auto` enables output and spawns the helper only for local DSNs, `remote` enables output without spawning the helper, `local` always requests the helper, and `off` disables ClickHouse entirely. |
//! | `JETSTREAMER_THREADS` | `auto` | Number of firehose ingestion threads. Leave unset to rely on hardware-based sizing or override with an explicit value when you know the ideal concurrency. |
//! | `JETSTREAMER_SEQUENTIAL` | `false` | Enables single-thread firehose processing with ripget-backed sequential streaming. |
//! | `JETSTREAMER_BUFFER_WINDOW` | `min(4 GiB, 15% available RAM)` | Total ripget hot/cold window size used only when `JETSTREAMER_SEQUENTIAL` is enabled. |
//!
//! Helper spawning only occurs when both the mode allows it (`auto`/`local`) **and** the DSN
//! points to `localhost` or `127.0.0.1`.
//!
//! ## Firehose Config (also used by JetstreamerRunner)
//!
//! | Variable | Default | Effect |
//! |----------|---------|--------|
//! | `JETSTREAMER_COMPACT_INDEX_BASE_URL` | `https://files.old-faithful.net` | Base URL for compact CAR index artifacts. Point this at your own mirror to reduce load on the public archive. |
//! | `JETSTREAMER_NETWORK` | `mainnet` | Network suffix appended to cache namespaces and index filenames (e.g., `testnet`). |
//! | `JETSTREAMER_NETWORK_CAPACITY_MB` | `1000` | Assumed network throughput in megabytes per second used when auto-sizing firehose thread counts. |
//!
//! Changing the network automatically segregates cache entries, allowing you to toggle between
//! clusters without purging state.
//!
//! # Epoch Feature Availability
//! Old Faithful snapshots expose different metadata across the network's history. Use the
//! table below to choose replay windows that match your requirements:
//!
//! | Epoch range | Slot range    | Comment |
//! |-------------|---------------|--------------------------------------------------|
//! | 0–156       | 0–?           | Incompatible with modern Geyser plugins          |
//! | 157+        | ?             | Compatible with modern Geyser plugins            |
//! | 0–449       | 0–194184610   | CU tracking not available (reported as `0`)      |
//! | 450+        | 194184611+    | CU tracking fully available                      |
//!
//! Epochs at or above `157` work with the bundled Geyser plugin interface, while compute unit
//! accounting first appears at epoch `450`.

pub use jetstreamer_firehose as firehose;
pub use jetstreamer_plugin as plugin;
pub use jetstreamer_utils as utils;

use core::ops::Range;
use jetstreamer_firehose::{epochs::slot_to_epoch, index::get_index_base_url};
use jetstreamer_plugin::{
    Plugin, PluginRunner, PluginRunnerError,
    plugins::{
        instruction_tracking::InstructionTrackingPlugin, program_tracking::ProgramTrackingPlugin,
        pubkey_stats::PubkeyStatsPlugin,
    },
};
use std::sync::Arc;

/// Multiplier applied to the configured firehose thread count when
/// `JetstreamerRunner::run` sizes the Tokio runtime's worker-thread pool.
const WORKER_THREAD_MULTIPLIER: usize = 4; // each plugin thread gets 4 worker threads

/// Resolved ClickHouse behavior: whether plugin output is enabled at all and whether the
/// bundled helper process should be requested.
#[derive(Copy, Clone)]
struct ClickhouseSettings {
    /// `true` when plugins should be given a ClickHouse connection.
    enabled: bool,
    /// `true` when the local ClickHouse helper should be spawned.
    spawn_helper: bool,
}

impl ClickhouseSettings {
    /// Bundles the two flags into a settings value; usable in `const` contexts.
    const fn new(enabled: bool, spawn_helper: bool) -> Self {
        Self { enabled, spawn_helper }
    }
}

// Parsed form of the `JETSTREAMER_CLICKHOUSE_MODE` environment variable.
#[derive(Clone, Copy)]
enum ClickhouseMode {
    // Keep the caller-provided default settings (output enabled, helper spawn decided by
    // the caller's default).
    Auto,
    // ClickHouse output off and no helper spawn.
    Disabled,
    // ClickHouse output on, helper never spawned.
    RemoteOnly,
    // ClickHouse output on and the helper requested.
    Local,
}

/// Derives the effective [`ClickhouseSettings`] from `JETSTREAMER_CLICKHOUSE_MODE`.
///
/// `default_spawn_helper` decides whether the helper is spawned when the variable is unset,
/// unreadable, set to an `auto`-style value, or set to something unrecognized (the latter
/// also logs a warning).
fn resolve_clickhouse_settings(default_spawn_helper: bool) -> ClickhouseSettings {
    let fallback = ClickhouseSettings::new(true, default_spawn_helper);

    // Unset (or non-UTF-8) variable: nothing to interpret.
    let Ok(raw_mode) = std::env::var("JETSTREAMER_CLICKHOUSE_MODE") else {
        return fallback;
    };

    let Some(mode) = parse_clickhouse_mode(&raw_mode) else {
        log::warn!(
            "Unrecognized JETSTREAMER_CLICKHOUSE_MODE value '{}'; falling back to default settings",
            raw_mode
        );
        return fallback;
    };

    match mode {
        ClickhouseMode::Auto => fallback,
        ClickhouseMode::Disabled => ClickhouseSettings::new(false, false),
        ClickhouseMode::RemoteOnly => ClickhouseSettings::new(true, false),
        ClickhouseMode::Local => ClickhouseSettings::new(true, true),
    }
}

/// Maps a raw `JETSTREAMER_CLICKHOUSE_MODE` string onto a [`ClickhouseMode`].
///
/// Matching ignores surrounding whitespace and ASCII case. An empty (or whitespace-only)
/// value is treated as `auto`; any spelling not listed below yields `None`.
fn parse_clickhouse_mode(value: &str) -> Option<ClickhouseMode> {
    let normalized = value.trim().to_ascii_lowercase();

    let mode = match normalized.as_str() {
        "" | "auto" | "default" | "on" | "true" | "1" => ClickhouseMode::Auto,
        "off" | "disable" | "disabled" | "0" | "false" | "none" | "no" => {
            ClickhouseMode::Disabled
        }
        "remote" | "external" | "no-spawn" | "no_spawn" | "nospawn" => {
            ClickhouseMode::RemoteOnly
        }
        "local" | "spawn" | "helper" | "auto-spawn" | "autospawn" => ClickhouseMode::Local,
        _ => return None,
    };

    Some(mode)
}

/// Coordinates plugin execution against the firehose.
///
/// Configure the runner with the builder-style methods and finish by calling
/// [`JetstreamerRunner::run`]. The runner also honors the process-level environment variables
/// documented at the module level.
///
/// ### Environment variables
///
/// [`JetstreamerRunner`] inspects a handful of environment variables at startup to fine-tune
/// runtime behavior:
///
/// - `JETSTREAMER_THREADS`: Number of firehose ingestion threads. When unset the value is
///   derived from [`jetstreamer_firehose::system::optimal_firehose_thread_count`].
/// - `JETSTREAMER_SEQUENTIAL`: When true, runs a single firehose worker and uses ripget's
///   parallel windowed downloader for sequential reads.
/// - `JETSTREAMER_CLICKHOUSE_DSN`: DSN for ClickHouse ingestion; defaults to
///   `http://localhost:8123`.
/// - `JETSTREAMER_CLICKHOUSE_MODE`: Controls ClickHouse integration. Accepted values are
///   `auto` (default: enable output and spawn the helper only for local DSNs), `remote`
///   (enable output but never spawn the helper), `local` (always request the helper), and
///   `off` (disable ClickHouse entirely).
///
/// ### Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use clickhouse::Client;
/// use jetstreamer::{
///     JetstreamerRunner,
///     firehose::{
///         epochs,
///         firehose::{BlockData, TransactionData},
///     },
///     plugin::{Plugin, PluginFuture},
/// };
///
/// struct Dummy;
///
/// impl Plugin for Dummy {
///     fn name(&self) -> &'static str {
///         "dummy"
///     }
///
///     fn on_transaction<'a>(
///         &'a self,
///         _thread_id: usize,
///         _db: Option<Arc<Client>>,
///         tx: &'a TransactionData,
///     ) -> PluginFuture<'a> {
///         Box::pin(async move {
///             println!("tx {} landed in slot {}", tx.signature, tx.slot);
///             Ok(())
///         })
///     }
///
///     fn on_block<'a>(
///         &'a self,
///         _thread_id: usize,
///         _db: Option<Arc<Client>>,
///         block: &'a BlockData,
///     ) -> PluginFuture<'a> {
///         Box::pin(async move {
///             if block.was_skipped() {
///                 println!("slot {} was skipped", block.slot());
///             } else {
///                 println!("processed block at slot {}", block.slot());
///             }
///             Ok(())
///         })
///     }
/// }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (start_slot, end_inclusive) = epochs::epoch_to_slot_range(800);
///
/// JetstreamerRunner::new()
///     .with_plugin(Box::new(Dummy))
///     .with_threads(4)
///     .with_slot_range_bounds(start_slot, end_inclusive + 1)
///     .with_clickhouse_dsn("https://clickhouse.example.com")
///     .run()
///     .expect("runner execution");
/// # Ok(())
/// # }
/// ```
///
/// ## Multiplexing and Throughput
///
/// When `JETSTREAMER_THREADS` is unset and you do not call
/// [`JetstreamerRunner::with_threads`], the runner defers to
/// [`jetstreamer_firehose::system::optimal_firehose_thread_count`] to size the ingestion pool
/// automatically. Set the environment variable (or call [`JetstreamerRunner::with_threads`])
/// to override the heuristic with an explicit value. Multiplexing works by allowing multiple
/// threads to connect to different subsections of the underlying slot range being streamed
/// from Old Faithful, processing each slice in parallel. This yields embarrassingly parallel
/// speedups up to the limits of your CPU and network. A good rule of thumb is to expect about
/// 250 Mbps of bandwidth and significant one-core compute per thread. On a 16 core system with
/// a 1 Gbps network connection, the heuristic typically lands between 4-5 threads; overriding
/// `JETSTREAMER_THREADS` to a nearby value is a fine-tuning knob if you know your workload
/// well. If the automatic sizing feels off, adjust `JETSTREAMER_NETWORK_CAPACITY_MB` so the
/// heuristic reflects your actual network budget before reaching for manual thread counts.
///
/// To achieve 2M TPS+, you will need a 20+ Gbps network connection and at least a 64 core CPU.
/// On our benchmark hardware we currently have a 100 Gbps connection and 64 cores, which has
/// led to a record of 2.7M TPS over the course of a 12-hour run using 255 threads.
pub struct JetstreamerRunner {
    // Log filter handed to `solana_logger::setup_with_default`.
    log_level: String,
    // Caller-supplied plugins collected through `with_plugin`.
    plugins: Vec<Box<dyn Plugin>>,
    // ClickHouse DSN forwarded to `PluginRunner::new`; also inspected to decide whether the
    // local helper may be spawned.
    clickhouse_dsn: String,
    // Threading, slot-range, ClickHouse, and built-in plugin options.
    config: Config,
}

impl Default for JetstreamerRunner {
    fn default() -> Self {
        let clickhouse_dsn = std::env::var("JETSTREAMER_CLICKHOUSE_DSN")
            .unwrap_or_else(|_| "http://localhost:8123".to_string());
        let default_spawn = should_spawn_for_dsn(&clickhouse_dsn);
        let clickhouse_settings = resolve_clickhouse_settings(default_spawn);
        Self {
            log_level: "info".to_string(),
            plugins: Vec::new(),
            clickhouse_dsn,
            config: Config {
                threads: jetstreamer_firehose::system::optimal_firehose_thread_count(),
                sequential: false,
                buffer_window_bytes: None,
                slot_range: 0..0,
                clickhouse_enabled: clickhouse_settings.enabled,
                spawn_clickhouse: clickhouse_settings.spawn_helper && clickhouse_settings.enabled,
                builtin_plugins: Vec::new(),
            },
        }
    }
}

impl JetstreamerRunner {
    /// Creates a [`JetstreamerRunner`] with default configuration.
    ///
    /// Equivalent to [`JetstreamerRunner::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Overrides the log level used when initializing `solana_logger`.
    ///
    /// Besides storing the level for [`JetstreamerRunner::run`], this also calls
    /// `solana_logger::setup_with_default` immediately with the new level.
    pub fn with_log_level(mut self, log_level: impl Into<String>) -> Self {
        self.log_level = log_level.into();
        solana_logger::setup_with_default(&self.log_level);
        self
    }

    /// Registers an additional [`Plugin`] to receive firehose events.
    ///
    /// Plugins registered here are added to the runner after any built-in plugins listed in
    /// the [`Config`].
    pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
        self.plugins.push(plugin);
        self
    }

    /// Sets the number of firehose ingestion threads.
    ///
    /// A value of `0` is clamped to `1`.
    ///
    /// When sequential mode is enabled, this value is used as ripget parallel range
    /// concurrency while firehose itself runs one worker.
    pub fn with_threads(mut self, threads: usize) -> Self {
        self.config.threads = std::cmp::max(1, threads);
        self
    }

    /// Toggles sequential firehose mode.
    ///
    /// When enabled, firehose uses one worker thread while retaining `JETSTREAMER_THREADS` as
    /// the ripget parallel range count for sequential windowed downloads.
    pub const fn with_sequential(mut self, sequential: bool) -> Self {
        self.config.sequential = sequential;
        self
    }

    /// Sets the ripget sequential download window size in bytes.
    ///
    /// This value is only used when sequential mode is enabled.
    pub const fn with_buffer_window_bytes(mut self, buffer_window_bytes: Option<u64>) -> Self {
        self.config.buffer_window_bytes = buffer_window_bytes;
        self
    }

    /// Restricts [`JetstreamerRunner::run`] to a specific slot range.
    ///
    /// The range is stored as given; unlike [`JetstreamerRunner::with_slot_range_bounds`] no
    /// validation is performed here.
    pub const fn with_slot_range(mut self, slot_range: Range<u64>) -> Self {
        self.config.slot_range = slot_range;
        self
    }

    /// Configures the slot range using an explicit start (inclusive) and end (exclusive).
    ///
    /// # Panics
    ///
    /// Panics if `start_slot >= end_slot`.
    pub fn with_slot_range_bounds(mut self, start_slot: u64, end_slot: u64) -> Self {
        assert!(
            start_slot < end_slot,
            "slot range must have a strictly increasing upper bound"
        );
        self.config.slot_range = start_slot..end_slot;
        self
    }

    /// Sets the ClickHouse DSN passed to [`PluginRunner::new`].
    ///
    /// [`JetstreamerRunner::run`] re-checks this DSN before spawning the local helper, so
    /// pointing it at a non-local host suppresses the spawn.
    pub fn with_clickhouse_dsn(mut self, clickhouse_dsn: impl Into<String>) -> Self {
        self.clickhouse_dsn = clickhouse_dsn.into();
        self
    }

    /// Replaces the current [`Config`] with values parsed from CLI arguments and the
    /// environment.
    ///
    /// The whole configuration is overwritten, including anything set by earlier builder
    /// calls such as [`JetstreamerRunner::with_threads`]. The log level, registered plugins,
    /// and ClickHouse DSN are left untouched.
    ///
    /// # Errors
    ///
    /// Returns whatever error the free function [`parse_cli_args`] reports.
    pub fn parse_cli_args(mut self) -> Result<Self, Box<dyn std::error::Error>> {
        self.config = parse_cli_args()?;
        Ok(self)
    }

    /// Builds the plugin runtime and streams blocks through every registered [`Plugin`].
    ///
    /// This call blocks the current thread until the plugin runner finishes. If the local
    /// ClickHouse helper was started, it is stopped before returning, whether or not the
    /// run succeeded.
    ///
    /// # Errors
    ///
    /// Returns [`PluginRunnerError::PluginLifecycle`] when the ClickHouse helper fails to
    /// start or its readiness channel closes, and otherwise forwards any error produced by
    /// the plugin runner.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built.
    pub fn run(self) -> Result<(), PluginRunnerError> {
        solana_logger::setup_with_default(&self.log_level);

        if let Ok(index_url) = get_index_base_url() {
            log::info!("slot index base url: {}", index_url);
        }

        let threads = std::cmp::max(1, self.config.threads);
        let sequential = self.config.sequential;
        let buffer_window_bytes = self.config.buffer_window_bytes;
        // ClickHouse output needs both the config flag and a non-blank DSN.
        let clickhouse_enabled =
            self.config.clickhouse_enabled && !self.clickhouse_dsn.trim().is_empty();
        let slot_range = self.config.slot_range.clone();
        // The helper is spawned only when output is enabled, the config asks for it, and the
        // DSN currently set on the runner looks local.
        let spawn_clickhouse = clickhouse_enabled
            && self.config.spawn_clickhouse
            && should_spawn_for_dsn(&self.clickhouse_dsn);

        log::info!(
            "processing slots [{}..{}) with {} configured threads (sequential={}, buffer_window_bytes={:?}, clickhouse_enabled={})",
            slot_range.start,
            slot_range.end,
            threads,
            sequential,
            buffer_window_bytes,
            clickhouse_enabled
        );

        let mut runner = PluginRunner::new(
            &self.clickhouse_dsn,
            threads,
            sequential,
            buffer_window_bytes,
        );
        // Built-in plugins selected in the config are registered first...
        for plugin in &self.config.builtin_plugins {
            runner.register(plugin.instantiate());
        }

        // ...followed by plugins supplied through `with_plugin`.
        for plugin in self.plugins {
            runner.register(plugin);
        }

        let runner = Arc::new(runner);
        // Worker pool is sized as `threads * WORKER_THREAD_MULTIPLIER`, never below one.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(std::cmp::max(
                1,
                threads.saturating_mul(WORKER_THREAD_MULTIPLIER),
            ))
            .enable_all()
            .thread_name("jetstreamer")
            .build()
            .expect("failed to build plugin runtime");

        let mut clickhouse_task: Option<tokio::task::JoinHandle<Result<(), ()>>> = None;

        if spawn_clickhouse {
            clickhouse_task = Some(runtime.block_on(async {
                let (mut ready_rx, clickhouse_future) =
                    jetstreamer_utils::start().await.map_err(|err| {
                        PluginRunnerError::PluginLifecycle {
                            plugin: "clickhouse",
                            stage: "start",
                            details: err.to_string(),
                        }
                    })?;

                // Wait for a message on the readiness channel before streaming begins; a
                // closed channel is reported as a lifecycle error.
                ready_rx
                    .recv()
                    .await
                    .ok_or_else(|| PluginRunnerError::PluginLifecycle {
                        plugin: "clickhouse",
                        stage: "ready",
                        details: "ClickHouse readiness signal channel closed unexpectedly".into(),
                    })?;

                // Drive the helper's future on a background task that logs how it ended.
                Ok::<_, PluginRunnerError>(tokio::spawn(async move {
                    match clickhouse_future.await {
                        Ok(()) => {
                            log::info!("ClickHouse process exited gracefully.");
                            Ok(())
                        }
                        Err(()) => {
                            log::error!("ClickHouse process exited with an error.");
                            Err(())
                        }
                    }
                }))
            })?);
        } else if clickhouse_enabled {
            // Output is enabled but no helper is started: explain which condition ruled it out.
            if !self.config.spawn_clickhouse {
                log::info!(
                    "ClickHouse auto-spawn disabled via configuration; using existing instance at {}",
                    self.clickhouse_dsn
                );
            } else {
                log::info!(
                    "ClickHouse DSN {} not recognized as local; skipping embedded ClickHouse spawn",
                    self.clickhouse_dsn
                );
            }
        }

        let result = runtime.block_on(runner.run(slot_range.clone(), clickhouse_enabled));

        // Stop the helper before inspecting `result`, so it is shut down on both success
        // and failure.
        if spawn_clickhouse {
            let handle = clickhouse_task.take();
            runtime.block_on(async move {
                jetstreamer_utils::stop().await;
                if let Some(handle) = handle
                    && let Err(err) = handle.await
                {
                    log::warn!("ClickHouse monitor task aborted: {}", err);
                }
            });
        }

        match result {
            Ok(()) => Ok(()),
            Err(err) => {
                // Firehose errors get an extra log line that includes the epoch of the
                // failing slot; every error is returned to the caller unchanged.
                if let PluginRunnerError::Firehose { slot, details } = &err {
                    log::error!(
                        "firehose failed at slot {} in epoch {}: {}",
                        slot,
                        slot_to_epoch(*slot),
                        details
                    );
                }
                Err(err)
            }
        }
    }
}

/// Runtime configuration for [`JetstreamerRunner`].
///
/// Produced by [`parse_cli_args`] or populated through the runner's builder methods.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Config {
    /// Number of simultaneous firehose streams to spawn. [`JetstreamerRunner::run`] treats
    /// `0` as `1`.
    pub threads: usize,
    /// Whether to process with a single firehose worker and ripget-backed sequential streaming.
    pub sequential: bool,
    /// Optional override for ripget sequential window size in bytes.
    pub buffer_window_bytes: Option<u64>,
    /// The range of slots to process, inclusive of the start and exclusive of the end slot.
    pub slot_range: Range<u64>,
    /// Whether to connect to ClickHouse for plugin output. [`JetstreamerRunner::run`]
    /// additionally requires a non-blank DSN.
    pub clickhouse_enabled: bool,
    /// Whether to spawn a local ClickHouse instance automatically. Only honored by
    /// [`JetstreamerRunner::run`] when ClickHouse output is enabled and the DSN contains
    /// `localhost` or `127.0.0.1`.
    pub spawn_clickhouse: bool,
    /// Built-in plugins requested via CLI flags.
    pub builtin_plugins: Vec<BuiltinPlugin>,
}

/// Built-in plugins that can be toggled via CLI flags.
///
/// Each variant corresponds to one `--with-plugin <name>` value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BuiltinPlugin {
    /// The program tracking plugin, selected with `--with-plugin program-tracking`.
    ProgramTracking,
    /// The instruction tracking plugin, selected with `--with-plugin instruction-tracking`.
    InstructionTracking,
    /// The pubkey stats plugin, selected with `--with-plugin pubkey-stats`.
    PubkeyStats,
}

impl BuiltinPlugin {
    /// Looks up the built-in plugin named by a `--with-plugin` value.
    ///
    /// The comparison is exact (case-sensitive); unknown names yield `None`.
    fn from_flag(value: &str) -> Option<Self> {
        let known_flags = [
            ("program-tracking", Self::ProgramTracking),
            ("instruction-tracking", Self::InstructionTracking),
            ("pubkey-stats", Self::PubkeyStats),
        ];

        known_flags
            .into_iter()
            .find_map(|(flag, plugin)| (flag == value).then_some(plugin))
    }

    /// Constructs a fresh boxed instance of the plugin this variant stands for.
    fn instantiate(self) -> Box<dyn Plugin> {
        let plugin: Box<dyn Plugin> = match self {
            Self::ProgramTracking => Box::new(ProgramTrackingPlugin::new()),
            Self::InstructionTracking => Box::new(InstructionTrackingPlugin::new()),
            Self::PubkeyStats => Box::new(PubkeyStatsPlugin::new()),
        };
        plugin
    }
}

/// Parses command-line arguments and environment variables into a [`Config`].
///
/// Exactly one positional argument is required: either a `<start>:<end>` slot range (both
/// ends inclusive) or a single epoch number.
///
/// The following environment variables are inspected:
/// - `JETSTREAMER_CLICKHOUSE_MODE`: Controls ClickHouse integration. Accepts `auto`, `remote`,
///   `local`, or `off`.
/// - `JETSTREAMER_THREADS`: Number of firehose ingestion threads. Unset or unparseable values
///   fall back to the hardware-based heuristic.
/// - `JETSTREAMER_SEQUENTIAL`: Enables single-thread sequential firehose mode when truthy.
/// - `JETSTREAMER_BUFFER_WINDOW`: Optional ripget sequential window size (for example `4GiB`).
///
/// CLI flags:
/// - `--with-plugin <name>`: Adds one of the built-in plugins (`program-tracking`,
///   `instruction-tracking`, or `pubkey-stats`). When omitted, the CLI defaults to `program-tracking`.
/// - `--no-plugins`: Disables all built-in plugins, including the default. It cannot be
///   combined with `--with-plugin`.
/// - `--sequential`: Enables single-thread sequential firehose mode.
/// - `--buffer-window <size>`: Overrides ripget sequential window size (for example `4GiB`).
///   Takes precedence over `JETSTREAMER_BUFFER_WINDOW`.
///
/// # Errors
///
/// Returns an error when:
/// - the positional slot-range/epoch argument is missing or cannot be parsed as `u64`,
/// - the inclusive end slot is `u64::MAX` (the exclusive bound would overflow),
/// - a flag is missing its value, names an unknown plugin, or is not recognized,
/// - `--no-plugins` is combined with `--with-plugin`,
/// - the buffer window value is not a valid size.
///
/// # Examples
///
/// ```no_run
/// # use jetstreamer::parse_cli_args;
/// # unsafe {
/// #     std::env::set_var("JETSTREAMER_THREADS", "3");
/// #     std::env::set_var("JETSTREAMER_CLICKHOUSE_MODE", "off");
/// # }
/// let config = parse_cli_args().expect("env and CLI parsed");
/// assert_eq!(config.threads, 3);
/// assert!(!config.clickhouse_enabled);
/// ```
pub fn parse_cli_args() -> Result<Config, Box<dyn std::error::Error>> {
    let mut args = std::env::args();
    args.next(); // binary name
    let mut first_arg: Option<String> = None;
    let mut builtin_plugins = Vec::new();
    let mut no_plugins = false;
    let mut sequential_cli = false;
    let mut buffer_window_cli: Option<String> = None;
    // `while let` (not `for`) because flag handlers pull their value from the same iterator.
    while let Some(arg) = args.next() {
        match arg.as_str() {
            "--with-plugin" => {
                let plugin_name = args
                    .next()
                    .ok_or_else(|| "--with-plugin requires a plugin name".to_string())?;
                let plugin = BuiltinPlugin::from_flag(&plugin_name).ok_or_else(|| {
                    format!(
                        "unknown plugin '{plugin_name}'. expected 'program-tracking', 'instruction-tracking', or 'pubkey-stats'"
                    )
                })?;
                builtin_plugins.push(plugin);
            }
            "--no-plugins" => {
                no_plugins = true;
            }
            "--sequential" => {
                sequential_cli = true;
            }
            "--buffer-window" => {
                let raw = args
                    .next()
                    .ok_or_else(|| "--buffer-window requires a value like 4GiB".to_string())?;
                buffer_window_cli = Some(raw);
            }
            _ if first_arg.is_none() => first_arg = Some(arg),
            other => return Err(format!("unrecognized argument '{other}'").into()),
        }
    }

    // User input problems are reported as errors rather than panics, since this function
    // already returns `Result`.
    let first_arg = first_arg.ok_or_else(|| {
        "no first argument given; expected format: <start>:<end> or a single epoch".to_string()
    })?;
    if no_plugins && !builtin_plugins.is_empty() {
        return Err("--no-plugins cannot be combined with --with-plugin".into());
    }

    let slot_range = if let Some((start_raw, end_raw)) = first_arg.split_once(':') {
        let start_slot: u64 = start_raw
            .parse()
            .map_err(|err| format!("failed to parse first slot '{start_raw}': {err}"))?;
        let end_slot_inclusive: u64 = end_raw
            .parse()
            .map_err(|err| format!("failed to parse second slot '{end_raw}': {err}"))?;
        // The CLI end is inclusive while `Config::slot_range` is half-open; guard the `+ 1`.
        let end_slot_exclusive = end_slot_inclusive.checked_add(1).ok_or_else(|| {
            format!("second slot {end_slot_inclusive} is too large to use as an inclusive bound")
        })?;
        start_slot..end_slot_exclusive
    } else {
        let epoch: u64 = first_arg
            .parse()
            .map_err(|err| format!("failed to parse epoch '{first_arg}': {err}"))?;
        log::info!("epoch: {}", epoch);
        let (start_slot, end_slot_inclusive) =
            jetstreamer_firehose::epochs::epoch_to_slot_range(epoch);
        start_slot..(end_slot_inclusive + 1)
    };

    let clickhouse_settings = resolve_clickhouse_settings(true);
    let clickhouse_enabled = clickhouse_settings.enabled;

    let threads = std::env::var("JETSTREAMER_THREADS")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or_else(jetstreamer_firehose::system::optimal_firehose_thread_count);
    // The CLI flag wins; the environment is only consulted when the flag is absent.
    let sequential = sequential_cli || parse_env_bool("JETSTREAMER_SEQUENTIAL", false);
    let buffer_window_raw =
        buffer_window_cli.or_else(|| std::env::var("JETSTREAMER_BUFFER_WINDOW").ok());
    let buffer_window_bytes = parse_optional_buffer_window_bytes(buffer_window_raw.as_deref())?;

    let spawn_clickhouse = clickhouse_settings.spawn_helper && clickhouse_enabled;

    let builtin_plugins = if no_plugins {
        Vec::new()
    } else if builtin_plugins.is_empty() {
        vec![BuiltinPlugin::ProgramTracking]
    } else {
        builtin_plugins
    };

    Ok(Config {
        threads,
        sequential,
        buffer_window_bytes,
        slot_range,
        clickhouse_enabled,
        spawn_clickhouse,
        builtin_plugins,
    })
}

/// Converts an optional buffer-window string into a byte count.
///
/// `None` passes through as `Ok(None)`. A present value is handed to
/// `jetstreamer_firehose::system::parse_buffer_window_bytes`; if that rejects it, an error
/// describing the expected format is returned.
fn parse_optional_buffer_window_bytes(
    raw: Option<&str>,
) -> Result<Option<u64>, Box<dyn std::error::Error>> {
    match raw {
        None => Ok(None),
        Some(text) => match jetstreamer_firehose::system::parse_buffer_window_bytes(text) {
            Some(bytes) => Ok(Some(bytes)),
            None => Err(format!(
                "invalid buffer window '{}'; expected integer bytes or suffix like 4GiB/512MiB",
                text
            )
            .into()),
        },
    }
}

/// Reads environment variable `key` as a boolean, returning `default` when it is unset,
/// unreadable, or not a recognized spelling.
///
/// Surrounding whitespace and ASCII case are ignored. Truthy spellings are `1`, `true`,
/// `yes`, `on`; falsy spellings are `0`, `false`, `no`, `off`. Anything else logs a warning.
fn parse_env_bool(key: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return default;
    };

    let normalized = raw.trim().to_ascii_lowercase();
    if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
        return true;
    }
    if matches!(normalized.as_str(), "0" | "false" | "no" | "off") {
        return false;
    }

    log::warn!(
        "unrecognized boolean value for {}='{}'; using default {}",
        key,
        normalized,
        default
    );
    default
}

/// Reports whether `dsn` looks like it points at the local machine.
///
/// This is a case-insensitive substring check for `localhost` or `127.0.0.1` anywhere in
/// the DSN; the DSN is not parsed as a URL.
fn should_spawn_for_dsn(dsn: &str) -> bool {
    let normalized = dsn.to_ascii_lowercase();
    ["localhost", "127.0.0.1"]
        .iter()
        .any(|marker| normalized.contains(*marker))
}