jetstreamer_plugin/
lib.rs

1#![deny(missing_docs)]
2//! Trait-based framework for building structured observers on top of Jetstreamer's firehose.
3//!
4//! # Overview
//! Plugins let you react to every block, transaction, reward, and entry emitted by
//! [`jetstreamer_firehose`](https://crates.io/crates/jetstreamer-firehose). Combined with the
//! [`JetstreamerRunner`](https://docs.rs/jetstreamer/latest/jetstreamer/struct.JetstreamerRunner.html),
9//! they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
10//! transactions per second on the right hardware. All events originate from Old Faithful's CAR
11//! archive and are streamed over the network into your local runner.
12//!
13//! The framework offers:
14//! - A [`Plugin`] trait with async hook points for each data type.
15//! - [`PluginRunner`] for coordinating multiple plugins with shared ClickHouse connections
16//!   (used internally by `JetstreamerRunner`).
17//! - Built-in plugins under [`plugins`] that demonstrate common batching strategies and
18//!   metrics.
19//! - See `JetstreamerRunner` in the `jetstreamer` crate for the easiest way to run plugins.
20//!
21//! # ClickHouse Integration
22//! Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances
23//! honor the following environment variables:
24//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): HTTP(S) DSN handed to
25//!   every plugin that requests a database handle.
26//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): toggles the bundled ClickHouse helper.
27//!   Set to `remote` to opt out of spawning the helper while still writing to a cluster,
28//!   `local` to always spawn, or `off` to disable ClickHouse entirely.
29//!
30//! When the mode is `auto`, Jetstreamer inspects the DSN at runtime and only launches the
31//! embedded helper for local endpoints, enabling native clustering workflows out of the box.
32//!
33//! # Batching ClickHouse Writes
34//! ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large
35//! numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a
36//! cadence that matches their workload. The default [`PluginRunner`] configuration triggers
37//! stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
38//! database. The bundled [`plugins::program_tracking::ProgramTrackingPlugin`] mirrors this
39//! approach by accumulating `ProgramEvent` rows per worker thread and issuing a single batch
40//! insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
41//! even under peak throughput.
42//!
43//! # Ordering Guarantees
//! Jetstreamer spawns parallel threads that each process a different subrange of the overall
//! slot range at the same time. Each thread sees a purely sequential view of its own
//! transactions, but downstream consumers such as databases receive writes from all threads
//! interleaved in an effectively arbitrary order. Design your database tables and shared data
//! structures so that they do not depend on insertion order.
49//!
50//! # Examples
51//! ## Defining a Plugin
52//! ```no_run
53//! use std::sync::Arc;
54//! use clickhouse::Client;
55//! use futures_util::FutureExt;
56//! use jetstreamer_firehose::firehose::TransactionData;
57//! use jetstreamer_plugin::{Plugin, PluginFuture};
58//!
59//! struct CountingPlugin;
60//!
61//! impl Plugin for CountingPlugin {
62//!     fn name(&self) -> &'static str { "counting" }
63//!
64//!     fn on_transaction<'a>(
65//!         &'a self,
66//!         _thread_id: usize,
67//!         _db: Option<Arc<Client>>,
68//!         transaction: &'a TransactionData,
69//!     ) -> PluginFuture<'a> {
70//!         async move {
71//!             println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
72//!             Ok(())
73//!         }
74//!         .boxed()
75//!     }
76//! }
77//! # let _plugin = CountingPlugin;
78//! ```
79//!
80//! ## Running Plugins with `PluginRunner`
81//! ```no_run
82//! use std::sync::Arc;
83//! use jetstreamer_firehose::epochs;
84//! use jetstreamer_plugin::{Plugin, PluginRunner};
85//!
86//! struct LoggingPlugin;
87//!
88//! impl Plugin for LoggingPlugin {
89//!     fn name(&self) -> &'static str { "logging" }
90//! }
91//!
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let mut runner = PluginRunner::new("http://localhost:8123", 1);
95//!     runner.register(Box::new(LoggingPlugin));
96//!     let runner = Arc::new(runner);
97//!
98//!     let (start, _) = epochs::epoch_to_slot_range(800);
99//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
100//!     runner
101//!         .clone()
102//!         .run(start..(end_inclusive + 1), false)
103//!         .await?;
104//!     Ok(())
105//! }
106//! ```
107
108/// Built-in plugin implementations that ship with Jetstreamer.
109pub mod plugins;
110
111const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114    fmt::Display,
115    future::Future,
116    ops::Range,
117    pin::Pin,
118    sync::{
119        Arc, Mutex,
120        atomic::{AtomicBool, AtomicU64, Ordering},
121    },
122    time::Duration,
123};
124
125use clickhouse::{Client, Row};
126use dashmap::DashMap;
127use futures_util::FutureExt;
128use jetstreamer_firehose::firehose::{
129    BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
130};
131use serde::Serialize;
132use sha2::{Digest, Sha256};
133use thiserror::Error;
134use tokio::{signal, sync::broadcast};
135
136#[derive(Clone, Copy)]
137struct Snapshot {
138    time: std::time::Instant,
139    slots: u64,
140    txs: u64,
141}
142
143#[derive(Clone, Copy, Default)]
144struct Contribution {
145    slot_delta: u64,
146    tx_delta: u64,
147    dt: f64,
148}
149
150#[derive(Clone, Copy, Default)]
151struct Rates {
152    slot_rate: f64,
153    tx_rate: f64,
154}
155
156#[derive(Default)]
157struct SnapshotWindow {
158    entries: [Option<Contribution>; 5],
159    idx: usize,
160    len: usize,
161    last: Option<Snapshot>,
162}
163
164impl SnapshotWindow {
165    fn update(&mut self, snapshot: Snapshot) -> Option<Rates> {
166        if let Some(prev) = self.last
167            && let Some(dt) = snapshot
168                .time
169                .checked_duration_since(prev.time)
170                .map(|d| d.as_secs_f64())
171            && dt > 0.0
172        {
173            let contrib = Contribution {
174                slot_delta: snapshot.slots.saturating_sub(prev.slots),
175                tx_delta: snapshot.txs.saturating_sub(prev.txs),
176                dt,
177            };
178            self.entries[self.idx] = Some(contrib);
179            self.idx = (self.idx + 1) % self.entries.len();
180            if self.len < self.entries.len() {
181                self.len += 1;
182            }
183        }
184        self.last = Some(snapshot);
185
186        let mut total_slots = 0u64;
187        let mut total_txs = 0u64;
188        let mut total_dt = 0.0;
189        for entry in self.entries.iter().flatten() {
190            total_slots = total_slots.saturating_add(entry.slot_delta);
191            total_txs = total_txs.saturating_add(entry.tx_delta);
192            total_dt += entry.dt;
193        }
194
195        if self.len < self.entries.len() || total_dt <= 0.0 {
196            return None;
197        }
198
199        Some(Rates {
200            slot_rate: total_slots as f64 / total_dt,
201            tx_rate: total_txs as f64 / total_dt,
202        })
203    }
204}
205
206/// Re-exported statistics types produced by [`firehose`].
207pub use jetstreamer_firehose::firehose::{Stats as FirehoseStats, ThreadStats};
208
209/// Convenience alias for the boxed future returned by plugin hooks.
210pub type PluginFuture<'a> = Pin<
211    Box<
212        dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
213            + Send
214            + 'a,
215    >,
216>;
217
218/// Trait implemented by plugins that consume firehose events.
219///
220/// See the crate-level documentation for usage examples.
221pub trait Plugin: Send + Sync + 'static {
222    /// Human-friendly plugin name used in logs and persisted metadata.
223    fn name(&self) -> &'static str;
224
225    /// Semantic version for the plugin; defaults to `1`.
226    fn version(&self) -> u16 {
227        1
228    }
229
230    /// Deterministic identifier derived from [`Plugin::name`].
231    fn id(&self) -> u16 {
232        let hash = Sha256::digest(self.name());
233        let mut res = 1u16;
234        for byte in hash {
235            res = res.wrapping_mul(31).wrapping_add(byte as u16);
236        }
237        res
238    }
239
240    /// Called for every transaction seen by the firehose.
241    fn on_transaction<'a>(
242        &'a self,
243        _thread_id: usize,
244        _db: Option<Arc<Client>>,
245        _transaction: &'a TransactionData,
246    ) -> PluginFuture<'a> {
247        async move { Ok(()) }.boxed()
248    }
249
250    /// Called for every block observed by the firehose.
251    fn on_block<'a>(
252        &'a self,
253        _thread_id: usize,
254        _db: Option<Arc<Client>>,
255        _block: &'a BlockData,
256    ) -> PluginFuture<'a> {
257        async move { Ok(()) }.boxed()
258    }
259
260    /// Called for every entry observed by the firehose when entry notifications are enabled.
261    fn on_entry<'a>(
262        &'a self,
263        _thread_id: usize,
264        _db: Option<Arc<Client>>,
265        _entry: &'a EntryData,
266    ) -> PluginFuture<'a> {
267        async move { Ok(()) }.boxed()
268    }
269
270    /// Called for reward updates associated with processed blocks.
271    fn on_reward<'a>(
272        &'a self,
273        _thread_id: usize,
274        _db: Option<Arc<Client>>,
275        _reward: &'a RewardsData,
276    ) -> PluginFuture<'a> {
277        async move { Ok(()) }.boxed()
278    }
279
280    /// Invoked once before the firehose starts streaming events.
281    fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
282        async move { Ok(()) }.boxed()
283    }
284
285    /// Invoked once after the firehose finishes or shuts down.
286    fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
287        async move { Ok(()) }.boxed()
288    }
289}
290
291/// Coordinates plugin execution and ClickHouse persistence.
292///
293/// See the crate-level documentation for usage examples.
294#[derive(Clone)]
295pub struct PluginRunner {
296    plugins: Arc<Vec<Arc<dyn Plugin>>>,
297    clickhouse_dsn: String,
298    num_threads: usize,
299    db_update_interval_slots: u64,
300}
301
302impl PluginRunner {
303    /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`.
304    pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
305        Self {
306            plugins: Arc::new(Vec::new()),
307            clickhouse_dsn: clickhouse_dsn.to_string(),
308            num_threads: std::cmp::max(1, num_threads),
309            db_update_interval_slots: 100,
310        }
311    }
312
313    /// Registers an additional plugin.
314    pub fn register(&mut self, plugin: Box<dyn Plugin>) {
315        Arc::get_mut(&mut self.plugins)
316            .expect("cannot register plugins after the runner has started")
317            .push(Arc::from(plugin));
318    }
319
320    /// Runs the firehose across the specified slot range, optionally writing to ClickHouse.
321    pub async fn run(
322        self: Arc<Self>,
323        slot_range: Range<u64>,
324        clickhouse_enabled: bool,
325    ) -> Result<(), PluginRunnerError> {
326        let db_update_interval = self.db_update_interval_slots.max(1);
327        let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
328            self.plugins
329                .iter()
330                .cloned()
331                .map(PluginHandle::from)
332                .collect(),
333        );
334
335        let clickhouse = if clickhouse_enabled {
336            let client = Arc::new(
337                Client::default()
338                    .with_url(&self.clickhouse_dsn)
339                    .with_option("async_insert", "1")
340                    .with_option("wait_for_async_insert", "0"),
341            );
342            ensure_clickhouse_tables(client.as_ref()).await?;
343            upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
344            Some(client)
345        } else {
346            None
347        };
348
349        for handle in plugin_handles.iter() {
350            if let Err(error) = handle
351                .plugin
352                .on_load(clickhouse.clone())
353                .await
354                .map_err(|e| e.to_string())
355            {
356                return Err(PluginRunnerError::PluginLifecycle {
357                    plugin: handle.name,
358                    stage: "on_load",
359                    details: error,
360                });
361            }
362        }
363
364        let shutting_down = Arc::new(AtomicBool::new(false));
365        let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
366        let clickhouse_enabled = clickhouse.is_some();
367        let slots_since_flush = Arc::new(AtomicU64::new(0));
368
369        let on_block = {
370            let plugin_handles = plugin_handles.clone();
371            let clickhouse = clickhouse.clone();
372            let slot_buffer = slot_buffer.clone();
373            let slots_since_flush = slots_since_flush.clone();
374            let shutting_down = shutting_down.clone();
375            move |thread_id: usize, block: BlockData| {
376                let plugin_handles = plugin_handles.clone();
377                let clickhouse = clickhouse.clone();
378                let slot_buffer = slot_buffer.clone();
379                let slots_since_flush = slots_since_flush.clone();
380                let shutting_down = shutting_down.clone();
381                async move {
382                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
383                    if plugin_handles.is_empty() {
384                        return Ok(());
385                    }
386                    if shutting_down.load(Ordering::SeqCst) {
387                        log::debug!(
388                            target: &log_target,
389                            "ignoring block while shutdown is in progress"
390                        );
391                        return Ok(());
392                    }
393                    let block = Arc::new(block);
394                    for handle in plugin_handles.iter() {
395                        let db = clickhouse.clone();
396                        if let Err(err) = handle
397                            .plugin
398                            .on_block(thread_id, db.clone(), block.as_ref())
399                            .await
400                        {
401                            log::error!(
402                                target: &log_target,
403                                "plugin {} on_block error: {}",
404                                handle.name,
405                                err
406                            );
407                            continue;
408                        }
409                        if let (Some(db_client), BlockData::Block { slot, .. }) =
410                            (clickhouse.clone(), block.as_ref())
411                        {
412                            if clickhouse_enabled {
413                                slot_buffer
414                                    .entry(handle.id)
415                                    .or_default()
416                                    .push(PluginSlotRow {
417                                        plugin_id: handle.id as u32,
418                                        slot: *slot,
419                                    });
420                            } else if let Err(err) =
421                                record_plugin_slot(db_client, handle.id, *slot).await
422                            {
423                                log::error!(
424                                    target: &log_target,
425                                    "failed to record plugin slot for {}: {}",
426                                    handle.name,
427                                    err
428                                );
429                            }
430                        }
431                    }
432                    if clickhouse_enabled {
433                        let current = slots_since_flush
434                            .fetch_add(1, Ordering::Relaxed)
435                            .wrapping_add(1);
436                        if current.is_multiple_of(db_update_interval)
437                            && let Some(db_client) = clickhouse.clone()
438                        {
439                            let buffer = slot_buffer.clone();
440                            let log_target_clone = log_target.clone();
441                            tokio::spawn(async move {
442                                if let Err(err) = flush_slot_buffer(db_client, buffer).await {
443                                    log::error!(
444                                        target: &log_target_clone,
445                                        "failed to flush buffered plugin slots: {}",
446                                        err
447                                    );
448                                }
449                            });
450                        }
451                    }
452                    if let Some(db_client) = clickhouse.clone() {
453                        match block.as_ref() {
454                            BlockData::Block {
455                                slot,
456                                executed_transaction_count,
457                                ..
458                            } => {
459                                if let Err(err) = record_slot_status(
460                                    db_client,
461                                    *slot,
462                                    thread_id,
463                                    *executed_transaction_count,
464                                )
465                                .await
466                                {
467                                    log::error!(
468                                        target: &log_target,
469                                        "failed to record slot status: {}",
470                                        err
471                                    );
472                                }
473                            }
474                            BlockData::LeaderSkipped { slot } => {
475                                if let Err(err) =
476                                    record_slot_status(db_client, *slot, thread_id, 0).await
477                                {
478                                    log::error!(
479                                        target: &log_target,
480                                        "failed to record slot status: {}",
481                                        err
482                                    );
483                                }
484                            }
485                        }
486                    }
487                    Ok(())
488                }
489                .boxed()
490            }
491        };
492
493        let on_transaction = {
494            let plugin_handles = plugin_handles.clone();
495            let clickhouse = clickhouse.clone();
496            let shutting_down = shutting_down.clone();
497            move |thread_id: usize, transaction: TransactionData| {
498                let plugin_handles = plugin_handles.clone();
499                let clickhouse = clickhouse.clone();
500                let shutting_down = shutting_down.clone();
501                async move {
502                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
503                    if plugin_handles.is_empty() {
504                        return Ok(());
505                    }
506                    if shutting_down.load(Ordering::SeqCst) {
507                        log::debug!(
508                            target: &log_target,
509                            "ignoring transaction while shutdown is in progress"
510                        );
511                        return Ok(());
512                    }
513                    let transaction = Arc::new(transaction);
514                    for handle in plugin_handles.iter() {
515                        if let Err(err) = handle
516                            .plugin
517                            .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
518                            .await
519                        {
520                            log::error!(
521                                target: &log_target,
522                                "plugin {} on_transaction error: {}",
523                                handle.name,
524                                err
525                            );
526                        }
527                    }
528                    Ok(())
529                }
530                .boxed()
531            }
532        };
533
534        let on_entry = {
535            let plugin_handles = plugin_handles.clone();
536            let clickhouse = clickhouse.clone();
537            let shutting_down = shutting_down.clone();
538            move |thread_id: usize, entry: EntryData| {
539                let plugin_handles = plugin_handles.clone();
540                let clickhouse = clickhouse.clone();
541                let shutting_down = shutting_down.clone();
542                async move {
543                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
544                    if plugin_handles.is_empty() {
545                        return Ok(());
546                    }
547                    if shutting_down.load(Ordering::SeqCst) {
548                        log::debug!(
549                            target: &log_target,
550                            "ignoring entry while shutdown is in progress"
551                        );
552                        return Ok(());
553                    }
554                    let entry = Arc::new(entry);
555                    for handle in plugin_handles.iter() {
556                        if let Err(err) = handle
557                            .plugin
558                            .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
559                            .await
560                        {
561                            log::error!(
562                                target: &log_target,
563                                "plugin {} on_entry error: {}",
564                                handle.name,
565                                err
566                            );
567                        }
568                    }
569                    Ok(())
570                }
571                .boxed()
572            }
573        };
574
575        let on_reward = {
576            let plugin_handles = plugin_handles.clone();
577            let clickhouse = clickhouse.clone();
578            let shutting_down = shutting_down.clone();
579            move |thread_id: usize, reward: RewardsData| {
580                let plugin_handles = plugin_handles.clone();
581                let clickhouse = clickhouse.clone();
582                let shutting_down = shutting_down.clone();
583                async move {
584                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
585                    if plugin_handles.is_empty() {
586                        return Ok(());
587                    }
588                    if shutting_down.load(Ordering::SeqCst) {
589                        log::debug!(
590                            target: &log_target,
591                            "ignoring reward while shutdown is in progress"
592                        );
593                        return Ok(());
594                    }
595                    let reward = Arc::new(reward);
596                    for handle in plugin_handles.iter() {
597                        if let Err(err) = handle
598                            .plugin
599                            .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
600                            .await
601                        {
602                            log::error!(
603                                target: &log_target,
604                                "plugin {} on_reward error: {}",
605                                handle.name,
606                                err
607                            );
608                        }
609                    }
610                    Ok(())
611                }
612                .boxed()
613            }
614        };
615
616        let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
617
618        let total_slot_count_capture = total_slot_count;
619        let stats_tracking = clickhouse.clone().map(|_db| {
620            let shutting_down = shutting_down.clone();
621            let last_snapshot: Arc<Mutex<SnapshotWindow>> =
622                Arc::new(Mutex::new(SnapshotWindow::default()));
623            StatsTracking {
624                on_stats: {
625                    let last_snapshot = last_snapshot.clone();
626                    let total_slot_count = total_slot_count_capture;
627                    move |thread_id: usize, stats: Stats| {
628                        let shutting_down = shutting_down.clone();
629                        let last_snapshot = last_snapshot.clone();
630                        async move {
631                            let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
632                            if shutting_down.load(Ordering::SeqCst) {
633                                log::debug!(
634                                    target: &log_target,
635                                    "skipping stats write during shutdown"
636                                );
637                                return Ok(());
638                            }
639                            let finish_at = stats
640                                .finish_time
641                                .unwrap_or_else(std::time::Instant::now);
642                            let elapsed = finish_at.saturating_duration_since(stats.start_time);
643                            let elapsed_secs = elapsed.as_secs_f64();
644                            let mut tps = if elapsed_secs > 0.0 {
645                                stats.transactions_processed as f64 / elapsed_secs
646                            } else {
647                                0.0
648                            };
649                            let thread_stats = &stats.thread_stats;
650                            let processed_slots = stats.slots_processed.min(total_slot_count);
651                            let progress_fraction = if total_slot_count > 0 {
652                                processed_slots as f64 / total_slot_count as f64
653                            } else {
654                                1.0
655                            };
656                            let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
657                            let thread_total_slots = thread_stats
658                                .slot_range
659                                .end
660                                .saturating_sub(thread_stats.slot_range.start);
661                            let thread_progress = if thread_total_slots > 0 {
662                                (thread_stats.slots_processed as f64 / thread_total_slots as f64)
663                                    .clamp(0.0, 1.0)
664                                    * 100.0
665                            } else {
666                                100.0
667                            };
668                            let mut overall_eta = None;
669                            if let Ok(mut window) = last_snapshot.lock()
670                                && let Some(rates) = window.update(Snapshot {
671                                    time: finish_at,
672                                    slots: stats.slots_processed,
673                                    txs: stats.transactions_processed,
674                                }) {
675                                    tps = rates.tx_rate;
676                                    let remaining_slots =
677                                        total_slot_count.saturating_sub(processed_slots);
678                                    if rates.slot_rate > 0.0 && remaining_slots > 0 {
679                                        overall_eta = Some(human_readable_duration(
680                                            remaining_slots as f64 / rates.slot_rate,
681                                        ));
682                                    }
683                                }
684                            if overall_eta.is_none() {
685                                if progress_fraction > 0.0 && progress_fraction < 1.0 {
686                                    if let Some(elapsed_total) = finish_at
687                                        .checked_duration_since(stats.start_time)
688                                        .map(|d| d.as_secs_f64())
689                                        && elapsed_total > 0.0 {
690                                            let remaining_secs =
691                                                elapsed_total * (1.0 / progress_fraction - 1.0);
692                                            overall_eta = Some(human_readable_duration(remaining_secs));
693                                        }
694                                } else if progress_fraction >= 1.0 {
695                                    overall_eta = Some("0s".into());
696                                }
697                            }
698                            let slots_display = human_readable_count(processed_slots);
699                            let blocks_display = human_readable_count(stats.blocks_processed);
700                            let txs_display = human_readable_count(stats.transactions_processed);
701                            let tps_display = human_readable_count(tps.ceil() as u64);
702                            log::info!(
703                                target: &log_target,
704                                "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
705                                overall_eta.unwrap_or_else(|| "n/a".into()),
706                            );
707                            Ok(())
708                        }
709                        .boxed()
710                    }
711                },
712                tracking_interval_slots: 100,
713            }
714        });
715
716        let (shutdown_tx, _) = broadcast::channel::<()>(1);
717
718        let mut firehose_future = Box::pin(firehose(
719            self.num_threads as u64,
720            slot_range,
721            Some(on_block),
722            Some(on_transaction),
723            Some(on_entry),
724            Some(on_reward),
725            stats_tracking,
726            Some(shutdown_tx.subscribe()),
727        ));
728
729        let firehose_result = tokio::select! {
730            res = &mut firehose_future => res,
731            ctrl = signal::ctrl_c() => {
732                match ctrl {
733                    Ok(()) => log::info!(
734                        target: LOG_MODULE,
735                        "CTRL+C received; initiating shutdown"
736                    ),
737                    Err(err) => log::error!(
738                        target: LOG_MODULE,
739                        "failed to listen for CTRL+C: {}",
740                        err
741                    ),
742                }
743                shutting_down.store(true, Ordering::SeqCst);
744                let _ = shutdown_tx.send(());
745                firehose_future.await
746            }
747        };
748
749        if clickhouse_enabled
750            && let Some(db_client) = clickhouse.clone()
751            && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
752        {
753            log::error!(
754                target: LOG_MODULE,
755                "failed to flush buffered plugin slots: {}",
756                err
757            );
758        }
759
760        for handle in plugin_handles.iter() {
761            if let Err(error) = handle
762                .plugin
763                .on_exit(clickhouse.clone())
764                .await
765                .map_err(|e| e.to_string())
766            {
767                log::error!(
768                    target: LOG_MODULE,
769                    "plugin {} on_exit error: {}",
770                    handle.name,
771                    error
772                );
773            }
774        }
775
776        match firehose_result {
777            Ok(()) => Ok(()),
778            Err((error, slot)) => Err(PluginRunnerError::Firehose {
779                details: error.to_string(),
780                slot,
781            }),
782        }
783    }
784}
785
786/// Errors that can arise while running plugins against the firehose.
787#[derive(Debug, Error)]
788pub enum PluginRunnerError {
789    /// ClickHouse client returned an error.
790    #[error("clickhouse error: {0}")]
791    Clickhouse(#[from] clickhouse::error::Error),
792    /// Firehose streaming failed at the specified slot.
793    #[error("firehose error at slot {slot}: {details}")]
794    Firehose {
795        /// Human-readable description of the firehose failure.
796        details: String,
797        /// Slot where the firehose encountered the error.
798        slot: u64,
799    },
800    /// Lifecycle hook on a plugin returned an error.
801    #[error("plugin {plugin} failed during {stage}: {details}")]
802    PluginLifecycle {
803        /// Name of the plugin that failed.
804        plugin: &'static str,
805        /// Lifecycle stage where the failure occurred.
806        stage: &'static str,
807        /// Textual error details.
808        details: String,
809    },
810}
811
812#[derive(Clone)]
813struct PluginHandle {
814    plugin: Arc<dyn Plugin>,
815    id: u16,
816    name: &'static str,
817    version: u16,
818}
819
820impl From<Arc<dyn Plugin>> for PluginHandle {
821    fn from(plugin: Arc<dyn Plugin>) -> Self {
822        let id = plugin.id();
823        let name = plugin.name();
824        let version = plugin.version();
825        Self {
826            plugin,
827            id,
828            name,
829            version,
830        }
831    }
832}
833
/// Row inserted into `jetstreamer_plugins` by `upsert_plugins`, one per registered plugin.
///
/// NOTE(review): the `clickhouse` `Row` derive presumably maps these fields onto table
/// columns by name/order — keep them in sync with the DDL in `ensure_clickhouse_tables`.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    /// Plugin id, widened from the `u16` stored on `PluginHandle`.
    id: u32,
    /// Plugin name, borrowed from the handle instead of being copied.
    name: &'a str,
    /// Plugin version, widened from the `u16` stored on `PluginHandle`.
    version: u32,
}
840
/// Row for `jetstreamer_plugin_slots`, pairing a plugin id with a slot number.
///
/// Written one at a time by `record_plugin_slot` and in batches by `flush_slot_buffer`,
/// which drains a per-plugin `DashMap` buffer of these rows. The table's `indexed_at`
/// column is not part of this row; its DDL gives it `DEFAULT now()`.
///
/// NOTE(review): field names/order presumably drive the `Row` serialization — keep them in
/// sync with the table definition.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    /// Plugin id, widened from the runner's `u16` plugin id.
    plugin_id: u32,
    /// Slot number associated with the plugin.
    slot: u64,
}
846
/// Row for `jetstreamer_slot_status`, written by `record_slot_status`.
///
/// The table's `indexed_at` column is not part of this row; its DDL gives it
/// `DEFAULT now()`.
///
/// NOTE(review): field names/order presumably drive the `Row` serialization — keep them in
/// sync with the table definition.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    /// Slot number.
    slot: u64,
    /// Transaction count for the slot, saturated to `u32::MAX` by `record_slot_status`.
    transaction_count: u32,
    /// Index of the reporting thread (presumably a firehose worker thread), saturated to
    /// `u8::MAX` by `record_slot_status`.
    thread_id: u8,
}
853
854async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
855    db.query(
856        r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
857            slot UInt64,
858            transaction_count UInt32 DEFAULT 0,
859            thread_id UInt8 DEFAULT 0,
860            indexed_at DateTime('UTC') DEFAULT now()
861        ) ENGINE = ReplacingMergeTree
862        ORDER BY (slot, thread_id)"#,
863    )
864    .execute()
865    .await?;
866
867    db.query(
868        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
869            id UInt32,
870            name String,
871            version UInt32
872        ) ENGINE = ReplacingMergeTree
873        ORDER BY id"#,
874    )
875    .execute()
876    .await?;
877
878    db.query(
879        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
880            plugin_id UInt32,
881            slot UInt64,
882            indexed_at DateTime('UTC') DEFAULT now()
883        ) ENGINE = ReplacingMergeTree
884        ORDER BY (plugin_id, slot)"#,
885    )
886    .execute()
887    .await?;
888
889    Ok(())
890}
891
892async fn upsert_plugins(
893    db: &Client,
894    plugins: &[PluginHandle],
895) -> Result<(), clickhouse::error::Error> {
896    if plugins.is_empty() {
897        return Ok(());
898    }
899    let mut insert = db.insert("jetstreamer_plugins")?;
900    for handle in plugins {
901        insert
902            .write(&PluginRow {
903                id: handle.id as u32,
904                name: handle.name,
905                version: handle.version as u32,
906            })
907            .await?;
908    }
909    insert.end().await?;
910    Ok(())
911}
912
913async fn record_plugin_slot(
914    db: Arc<Client>,
915    plugin_id: u16,
916    slot: u64,
917) -> Result<(), clickhouse::error::Error> {
918    let mut insert = db.insert("jetstreamer_plugin_slots")?;
919    insert
920        .write(&PluginSlotRow {
921            plugin_id: plugin_id as u32,
922            slot,
923        })
924        .await?;
925    insert.end().await?;
926    Ok(())
927}
928
929async fn flush_slot_buffer(
930    db: Arc<Client>,
931    buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
932) -> Result<(), clickhouse::error::Error> {
933    let mut rows = Vec::new();
934    buffer.iter_mut().for_each(|mut entry| {
935        if !entry.value().is_empty() {
936            rows.append(entry.value_mut());
937        }
938    });
939
940    if rows.is_empty() {
941        return Ok(());
942    }
943
944    let mut insert = db.insert("jetstreamer_plugin_slots")?;
945    for row in rows {
946        insert.write(&row).await?;
947    }
948    insert.end().await?;
949    Ok(())
950}
951
952async fn record_slot_status(
953    db: Arc<Client>,
954    slot: u64,
955    thread_id: usize,
956    transaction_count: u64,
957) -> Result<(), clickhouse::error::Error> {
958    let mut insert = db.insert("jetstreamer_slot_status")?;
959    insert
960        .write(&SlotStatusRow {
961            slot,
962            transaction_count: transaction_count.min(u32::MAX as u64) as u32,
963            thread_id: thread_id.try_into().unwrap_or(u8::MAX),
964        })
965        .await?;
966    insert.end().await?;
967    Ok(())
968}
969
970// Ensure PluginRunnerError is Send + Sync + 'static
971trait _CanSend: Send + Sync + 'static {}
972impl _CanSend for PluginRunnerError {}
973
/// Renders an unsigned integer in decimal with `,` between each group of three digits,
/// counting from the right (e.g. `1234567` becomes `"1,234,567"`).
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let digits = value.into().to_string();
    let mut remaining = digits.len();
    // At most one separator per three digits; the decimal text is pure ASCII.
    let mut grouped = String::with_capacity(remaining + remaining / 3);
    for ch in digits.chars() {
        grouped.push(ch);
        remaining -= 1;
        // Emit a separator once a whole number of 3-digit groups is still to come.
        if remaining != 0 && remaining % 3 == 0 {
            grouped.push(',');
        }
    }
    grouped
}
987
/// Formats a duration given in (possibly fractional) seconds as a compact string for
/// progress/ETA log lines.
///
/// - Non-finite input (`NaN`, `±inf`) yields `"n/a"`.
/// - Zero or negative input yields `"0s"`.
/// - Durations that display as less than a minute keep one decimal place, e.g. `"42.5s"`.
/// - Longer durations are rounded to whole seconds and shown with their two most
///   significant units: `"{m}m{s}s"`, `"{h}h{m}m"`, or `"{d}d{h}h"` (just `"{d}d"` when
///   the hour component is zero).
fn human_readable_duration(seconds: f64) -> String {
    // Smallest `f64` that `{:.1}` renders as "60.0". The exact decimal tie 59.95 is not
    // representable and its nearest `f64` lies just above it, so every value below this
    // constant prints as at most "59.9". Switching formats here rather than at 60.0 avoids
    // emitting "60.0s" for inputs in [59.95, 60).
    const SUB_MINUTE_LIMIT: f64 = 59.95;

    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    if seconds < SUB_MINUTE_LIMIT {
        return format!("{seconds:.1}s");
    }

    // Float-to-int `as` saturates, so absurdly large ETAs clamp instead of misbehaving.
    let total = seconds.round() as u64;
    let days = total / 86_400;
    let hours = (total % 86_400) / 3_600;
    let minutes = (total % 3_600) / 60;
    let secs = total % 60;
    match (days, hours) {
        (0, 0) => format!("{minutes}m{secs}s"),
        (0, h) => format!("{h}h{minutes}m"),
        (d, 0) => format!("{d}d"),
        (d, h) => format!("{d}d{h}h"),
    }
}