jetstreamer_plugin/
lib.rs

1#![deny(missing_docs)]
2//! Trait-based framework for building structured observers on top of Jetstreamer's firehose.
3//!
4//! # Overview
5//! Plugins let you react to every block, transaction, reward, entry, and stats update emitted
6//! by [`jetstreamer_firehose`](https://crates.io/crates/jetstreamer-firehose). Combined with
7//! the
8//! [`JetstreamerRunner`](https://docs.rs/jetstreamer/latest/jetstreamer/struct.JetstreamerRunner.html),
9//! they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
10//! transactions per second on the right hardware. All events originate from Old Faithful's CAR
11//! archive and are streamed over the network into your local runner.
12//!
13//! The framework offers:
14//! - A [`Plugin`] trait with async hook points for each data type.
15//! - [`PluginRunner`] for coordinating multiple plugins with shared ClickHouse connections
16//!   (used internally by `JetstreamerRunner`).
17//! - Built-in plugins under [`plugins`] that demonstrate common batching strategies and
18//!   metrics.
19//! - See `JetstreamerRunner` in the `jetstreamer` crate for the easiest way to run plugins.
20//!
21//! # ClickHouse Integration
22//! Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances
23//! honor the following environment variables:
24//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): HTTP(S) DSN handed to
25//!   every plugin that requests a database handle.
26//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): toggles the bundled ClickHouse helper.
27//!   Set to `remote` to opt out of spawning the helper while still writing to a cluster,
28//!   `local` to always spawn, or `off` to disable ClickHouse entirely.
29//!
30//! When the mode is `auto`, Jetstreamer inspects the DSN at runtime and only launches the
31//! embedded helper for local endpoints, enabling native clustering workflows out of the box.
32//!
33//! # Batching ClickHouse Writes
34//! ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large
35//! numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a
36//! cadence that matches their workload. The default [`PluginRunner`] configuration triggers
37//! stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
38//! database. The bundled [`plugins::program_tracking::ProgramTrackingPlugin`] mirrors this
39//! approach by accumulating `ProgramEvent` rows per worker thread and issuing a single batch
40//! insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
41//! even under peak throughput.
42//!
43//! # Ordering Guarantees
//! Jetstreamer spawns parallel threads that each process a different subrange of the overall
//! slot range at the same time. Each thread sees a purely sequential view of its own
//! transactions, but downstream services such as databases that consume this data will see
//! writes arrive in a fairly arbitrary order. Design your database tables and shared data
//! structures accordingly.
49//!
50//! # Examples
51//! ## Defining a Plugin
52//! ```no_run
53//! use std::sync::Arc;
54//! use clickhouse::Client;
55//! use futures_util::FutureExt;
56//! use jetstreamer_firehose::firehose::TransactionData;
57//! use jetstreamer_plugin::{Plugin, PluginFuture};
58//!
59//! struct CountingPlugin;
60//!
61//! impl Plugin for CountingPlugin {
62//!     fn name(&self) -> &'static str { "counting" }
63//!
64//!     fn on_transaction<'a>(
65//!         &'a self,
66//!         _thread_id: usize,
67//!         _db: Option<Arc<Client>>,
68//!         transaction: &'a TransactionData,
69//!     ) -> PluginFuture<'a> {
70//!         async move {
71//!             println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
72//!             Ok(())
73//!         }
74//!         .boxed()
75//!     }
76//! }
77//! # let _plugin = CountingPlugin;
78//! ```
79//!
80//! ## Running Plugins with `PluginRunner`
81//! ```no_run
82//! use std::sync::Arc;
83//! use jetstreamer_firehose::epochs;
84//! use jetstreamer_plugin::{Plugin, PluginRunner};
85//!
86//! struct LoggingPlugin;
87//!
88//! impl Plugin for LoggingPlugin {
89//!     fn name(&self) -> &'static str { "logging" }
90//! }
91//!
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let mut runner = PluginRunner::new("http://localhost:8123", 1);
95//!     runner.register(Box::new(LoggingPlugin));
96//!     let runner = Arc::new(runner);
97//!
98//!     let (start, _) = epochs::epoch_to_slot_range(800);
99//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
100//!     runner
101//!         .clone()
102//!         .run(start..(end_inclusive + 1), false)
103//!         .await?;
104//!     Ok(())
105//! }
106//! ```
107
108/// Built-in plugin implementations that ship with Jetstreamer.
109pub mod plugins;
110
// Base `log` target for runner output. The event callbacks in `PluginRunner::run` append
// `::T{thread_id:03}` to it so each firehose thread logs under its own target.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114    fmt::Display,
115    future::Future,
116    hint,
117    ops::Range,
118    pin::Pin,
119    sync::{
120        Arc,
121        atomic::{AtomicBool, AtomicU64, Ordering},
122    },
123    time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130    BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use serde::Serialize;
133use sha2::{Digest, Sha256};
134use thiserror::Error;
135use tokio::{signal, sync::broadcast};
136
137/// Re-exported statistics types produced by [`firehose`].
138pub use jetstreamer_firehose::firehose::{
139    FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
140};
141
// Global totals snapshot used to compute overall TPS/ETA between pulses. Each stats pulse
// reads the previous values, stores its own totals, and reports the difference.
// Total slots processed as of the previous pulse.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
// Total transactions processed as of the previous pulse.
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
// Nanoseconds since the run origin at the previous pulse.
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
// Spin flag (`true` = held) that serializes the read-then-store of the three values above.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
/// Returns the time elapsed since `origin`, in whole nanoseconds.
///
/// The `u128` nanosecond count is narrowed to `u64` with `as`, which keeps only the low
/// 64 bits; that range covers roughly 584 years of elapsed time.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    let elapsed = origin.elapsed();
    elapsed.as_nanos() as u64
}
151
/// Boxed, pinned, `Send` future returned by every [`Plugin`] hook.
///
/// The future resolves to `Ok(())` on success or to a boxed, thread-safe error. The `'a`
/// lifetime lets the future borrow from the plugin and from the event passed to the hook.
pub type PluginFuture<'a> =
    Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>>;
160
161/// Trait implemented by plugins that consume firehose events.
162///
163/// See the crate-level documentation for usage examples.
164pub trait Plugin: Send + Sync + 'static {
165    /// Human-friendly plugin name used in logs and persisted metadata.
166    fn name(&self) -> &'static str;
167
168    /// Semantic version for the plugin; defaults to `1`.
169    fn version(&self) -> u16 {
170        1
171    }
172
173    /// Deterministic identifier derived from [`Plugin::name`].
174    fn id(&self) -> u16 {
175        let hash = Sha256::digest(self.name());
176        let mut res = 1u16;
177        for byte in hash {
178            res = res.wrapping_mul(31).wrapping_add(byte as u16);
179        }
180        res
181    }
182
183    /// Called for every transaction seen by the firehose.
184    fn on_transaction<'a>(
185        &'a self,
186        _thread_id: usize,
187        _db: Option<Arc<Client>>,
188        _transaction: &'a TransactionData,
189    ) -> PluginFuture<'a> {
190        async move { Ok(()) }.boxed()
191    }
192
193    /// Called for every block observed by the firehose.
194    fn on_block<'a>(
195        &'a self,
196        _thread_id: usize,
197        _db: Option<Arc<Client>>,
198        _block: &'a BlockData,
199    ) -> PluginFuture<'a> {
200        async move { Ok(()) }.boxed()
201    }
202
203    /// Called for every entry observed by the firehose when entry notifications are enabled.
204    fn on_entry<'a>(
205        &'a self,
206        _thread_id: usize,
207        _db: Option<Arc<Client>>,
208        _entry: &'a EntryData,
209    ) -> PluginFuture<'a> {
210        async move { Ok(()) }.boxed()
211    }
212
213    /// Called for reward updates associated with processed blocks.
214    fn on_reward<'a>(
215        &'a self,
216        _thread_id: usize,
217        _db: Option<Arc<Client>>,
218        _reward: &'a RewardsData,
219    ) -> PluginFuture<'a> {
220        async move { Ok(()) }.boxed()
221    }
222
223    /// Called whenever a firehose thread encounters an error before restarting.
224    fn on_error<'a>(
225        &'a self,
226        _thread_id: usize,
227        _db: Option<Arc<Client>>,
228        _error: &'a FirehoseErrorContext,
229    ) -> PluginFuture<'a> {
230        async move { Ok(()) }.boxed()
231    }
232
233    /// Invoked once before the firehose starts streaming events.
234    fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
235        async move { Ok(()) }.boxed()
236    }
237
238    /// Invoked once after the firehose finishes or shuts down.
239    fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
240        async move { Ok(()) }.boxed()
241    }
242}
243
/// Coordinates plugin execution and ClickHouse persistence.
///
/// See the crate-level documentation for usage examples.
#[derive(Clone)]
pub struct PluginRunner {
    // Registered plugins. `register` mutates this through `Arc::get_mut`, so it only
    // succeeds while this `Arc` has a single owner.
    plugins: Arc<Vec<Arc<dyn Plugin>>>,
    // ClickHouse URL handed to `Client::with_url` when `run` is called with ClickHouse enabled.
    clickhouse_dsn: String,
    // Thread count requested at construction; `new` clamps it to at least 1.
    num_threads: usize,
    // Number of processed blocks between flushes of the buffered plugin-slot rows in `run`;
    // `new` sets it to 100.
    db_update_interval_slots: u64,
}
254
255impl PluginRunner {
256    /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`.
257    pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
258        Self {
259            plugins: Arc::new(Vec::new()),
260            clickhouse_dsn: clickhouse_dsn.to_string(),
261            num_threads: std::cmp::max(1, num_threads),
262            db_update_interval_slots: 100,
263        }
264    }
265
266    /// Registers an additional plugin.
267    pub fn register(&mut self, plugin: Box<dyn Plugin>) {
268        Arc::get_mut(&mut self.plugins)
269            .expect("cannot register plugins after the runner has started")
270            .push(Arc::from(plugin));
271    }
272
273    /// Runs the firehose across the specified slot range, optionally writing to ClickHouse.
274    pub async fn run(
275        self: Arc<Self>,
276        slot_range: Range<u64>,
277        clickhouse_enabled: bool,
278    ) -> Result<(), PluginRunnerError> {
279        let db_update_interval = self.db_update_interval_slots.max(1);
280        let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
281            self.plugins
282                .iter()
283                .cloned()
284                .map(PluginHandle::from)
285                .collect(),
286        );
287
288        let clickhouse = if clickhouse_enabled {
289            let client = Arc::new(
290                Client::default()
291                    .with_url(&self.clickhouse_dsn)
292                    .with_option("async_insert", "1")
293                    .with_option("wait_for_async_insert", "0"),
294            );
295            ensure_clickhouse_tables(client.as_ref()).await?;
296            upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
297            Some(client)
298        } else {
299            None
300        };
301
302        for handle in plugin_handles.iter() {
303            if let Err(error) = handle
304                .plugin
305                .on_load(clickhouse.clone())
306                .await
307                .map_err(|e| e.to_string())
308            {
309                return Err(PluginRunnerError::PluginLifecycle {
310                    plugin: handle.name,
311                    stage: "on_load",
312                    details: error,
313                });
314            }
315        }
316
317        let shutting_down = Arc::new(AtomicBool::new(false));
318        let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
319        let clickhouse_enabled = clickhouse.is_some();
320        let slots_since_flush = Arc::new(AtomicU64::new(0));
321
322        let on_block = {
323            let plugin_handles = plugin_handles.clone();
324            let clickhouse = clickhouse.clone();
325            let slot_buffer = slot_buffer.clone();
326            let slots_since_flush = slots_since_flush.clone();
327            let shutting_down = shutting_down.clone();
328            move |thread_id: usize, block: BlockData| {
329                let plugin_handles = plugin_handles.clone();
330                let clickhouse = clickhouse.clone();
331                let slot_buffer = slot_buffer.clone();
332                let slots_since_flush = slots_since_flush.clone();
333                let shutting_down = shutting_down.clone();
334                async move {
335                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
336                    if plugin_handles.is_empty() {
337                        return Ok(());
338                    }
339                    if shutting_down.load(Ordering::SeqCst) {
340                        log::debug!(
341                            target: &log_target,
342                            "ignoring block while shutdown is in progress"
343                        );
344                        return Ok(());
345                    }
346                    let block = Arc::new(block);
347                    for handle in plugin_handles.iter() {
348                        let db = clickhouse.clone();
349                        if let Err(err) = handle
350                            .plugin
351                            .on_block(thread_id, db.clone(), block.as_ref())
352                            .await
353                        {
354                            log::error!(
355                                target: &log_target,
356                                "plugin {} on_block error: {}",
357                                handle.name,
358                                err
359                            );
360                            continue;
361                        }
362                        if let (Some(db_client), BlockData::Block { slot, .. }) =
363                            (clickhouse.clone(), block.as_ref())
364                        {
365                            if clickhouse_enabled {
366                                slot_buffer
367                                    .entry(handle.id)
368                                    .or_default()
369                                    .push(PluginSlotRow {
370                                        plugin_id: handle.id as u32,
371                                        slot: *slot,
372                                    });
373                            } else if let Err(err) =
374                                record_plugin_slot(db_client, handle.id, *slot).await
375                            {
376                                log::error!(
377                                    target: &log_target,
378                                    "failed to record plugin slot for {}: {}",
379                                    handle.name,
380                                    err
381                                );
382                            }
383                        }
384                    }
385                    if clickhouse_enabled {
386                        let current = slots_since_flush
387                            .fetch_add(1, Ordering::Relaxed)
388                            .wrapping_add(1);
389                        if current.is_multiple_of(db_update_interval)
390                            && let Some(db_client) = clickhouse.clone()
391                        {
392                            let buffer = slot_buffer.clone();
393                            let log_target_clone = log_target.clone();
394                            tokio::spawn(async move {
395                                if let Err(err) = flush_slot_buffer(db_client, buffer).await {
396                                    log::error!(
397                                        target: &log_target_clone,
398                                        "failed to flush buffered plugin slots: {}",
399                                        err
400                                    );
401                                }
402                            });
403                        }
404                    }
405                    if let Some(db_client) = clickhouse.clone() {
406                        match block.as_ref() {
407                            BlockData::Block {
408                                slot,
409                                executed_transaction_count,
410                                block_time,
411                                ..
412                            } => {
413                                if let Err(err) = record_slot_status(
414                                    db_client,
415                                    *slot,
416                                    thread_id,
417                                    *executed_transaction_count,
418                                    *block_time,
419                                )
420                                .await
421                                {
422                                    log::error!(
423                                        target: &log_target,
424                                        "failed to record slot status: {}",
425                                        err
426                                    );
427                                }
428                            }
429                            BlockData::PossibleLeaderSkipped { .. } => {}
430                        }
431                    }
432                    Ok(())
433                }
434                .boxed()
435            }
436        };
437
438        let on_transaction = {
439            let plugin_handles = plugin_handles.clone();
440            let clickhouse = clickhouse.clone();
441            let shutting_down = shutting_down.clone();
442            move |thread_id: usize, transaction: TransactionData| {
443                let plugin_handles = plugin_handles.clone();
444                let clickhouse = clickhouse.clone();
445                let shutting_down = shutting_down.clone();
446                async move {
447                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
448                    if plugin_handles.is_empty() {
449                        return Ok(());
450                    }
451                    if shutting_down.load(Ordering::SeqCst) {
452                        log::debug!(
453                            target: &log_target,
454                            "ignoring transaction while shutdown is in progress"
455                        );
456                        return Ok(());
457                    }
458                    let transaction = Arc::new(transaction);
459                    for handle in plugin_handles.iter() {
460                        if let Err(err) = handle
461                            .plugin
462                            .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
463                            .await
464                        {
465                            log::error!(
466                                target: &log_target,
467                                "plugin {} on_transaction error: {}",
468                                handle.name,
469                                err
470                            );
471                        }
472                    }
473                    Ok(())
474                }
475                .boxed()
476            }
477        };
478
479        let on_entry = {
480            let plugin_handles = plugin_handles.clone();
481            let clickhouse = clickhouse.clone();
482            let shutting_down = shutting_down.clone();
483            move |thread_id: usize, entry: EntryData| {
484                let plugin_handles = plugin_handles.clone();
485                let clickhouse = clickhouse.clone();
486                let shutting_down = shutting_down.clone();
487                async move {
488                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
489                    if plugin_handles.is_empty() {
490                        return Ok(());
491                    }
492                    if shutting_down.load(Ordering::SeqCst) {
493                        log::debug!(
494                            target: &log_target,
495                            "ignoring entry while shutdown is in progress"
496                        );
497                        return Ok(());
498                    }
499                    let entry = Arc::new(entry);
500                    for handle in plugin_handles.iter() {
501                        if let Err(err) = handle
502                            .plugin
503                            .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
504                            .await
505                        {
506                            log::error!(
507                                target: &log_target,
508                                "plugin {} on_entry error: {}",
509                                handle.name,
510                                err
511                            );
512                        }
513                    }
514                    Ok(())
515                }
516                .boxed()
517            }
518        };
519
520        let on_reward = {
521            let plugin_handles = plugin_handles.clone();
522            let clickhouse = clickhouse.clone();
523            let shutting_down = shutting_down.clone();
524            move |thread_id: usize, reward: RewardsData| {
525                let plugin_handles = plugin_handles.clone();
526                let clickhouse = clickhouse.clone();
527                let shutting_down = shutting_down.clone();
528                async move {
529                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
530                    if plugin_handles.is_empty() {
531                        return Ok(());
532                    }
533                    if shutting_down.load(Ordering::SeqCst) {
534                        log::debug!(
535                            target: &log_target,
536                            "ignoring reward while shutdown is in progress"
537                        );
538                        return Ok(());
539                    }
540                    let reward = Arc::new(reward);
541                    for handle in plugin_handles.iter() {
542                        if let Err(err) = handle
543                            .plugin
544                            .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
545                            .await
546                        {
547                            log::error!(
548                                target: &log_target,
549                                "plugin {} on_reward error: {}",
550                                handle.name,
551                                err
552                            );
553                        }
554                    }
555                    Ok(())
556                }
557                .boxed()
558            }
559        };
560
561        let on_error = {
562            let plugin_handles = plugin_handles.clone();
563            let clickhouse = clickhouse.clone();
564            let shutting_down = shutting_down.clone();
565            move |thread_id: usize, context: FirehoseErrorContext| {
566                let plugin_handles = plugin_handles.clone();
567                let clickhouse = clickhouse.clone();
568                let shutting_down = shutting_down.clone();
569                async move {
570                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
571                    if plugin_handles.is_empty() {
572                        return Ok(());
573                    }
574                    if shutting_down.load(Ordering::SeqCst) {
575                        log::debug!(
576                            target: &log_target,
577                            "ignoring error callback while shutdown is in progress"
578                        );
579                        return Ok(());
580                    }
581                    let context = Arc::new(context);
582                    for handle in plugin_handles.iter() {
583                        if let Err(err) = handle
584                            .plugin
585                            .on_error(thread_id, clickhouse.clone(), context.as_ref())
586                            .await
587                        {
588                            log::error!(
589                                target: &log_target,
590                                "plugin {} on_error error: {}",
591                                handle.name,
592                                err
593                            );
594                        }
595                    }
596                    Ok(())
597                }
598                .boxed()
599            }
600        };
601
602        let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
603
604        let total_slot_count_capture = total_slot_count;
605        let run_origin = std::time::Instant::now();
606        // Reset global rate snapshot for a new run.
607        SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
608        LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
609        LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
610        LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
611        let stats_tracking = clickhouse.clone().map(|_db| {
612            let shutting_down = shutting_down.clone();
613            let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
614            StatsTracking {
615        on_stats: {
616            let thread_progress_max = thread_progress_max.clone();
617            let total_slot_count = total_slot_count_capture;
618            move |thread_id: usize, stats: Stats| {
619                let shutting_down = shutting_down.clone();
620                let thread_progress_max = thread_progress_max.clone();
621                async move {
622                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
623                    if shutting_down.load(Ordering::SeqCst) {
624                                log::debug!(
625                                    target: &log_target,
626                                    "skipping stats write during shutdown"
627                                );
628                                return Ok(());
629                            }
630                            let finish_at = stats
631                                .finish_time
632                                .unwrap_or_else(std::time::Instant::now);
633                            let elapsed_since_start = finish_at
634                                .saturating_duration_since(stats.start_time)
635                                .as_nanos()
636                                .max(1) as u64;
637                            let total_slots = stats.slots_processed;
638                            let total_txs = stats.transactions_processed;
639                            let now_ns = monotonic_nanos_since(run_origin);
640                            // Serialize snapshot updates so every pulse measures deltas from the
641                            // previous pulse (regardless of which thread emitted it) using a
642                            // monotonic clock shared across threads.
643                            let (delta_slots, delta_txs, delta_time_ns) = {
644                                while SNAPSHOT_LOCK
645                                    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
646                                    .is_err()
647                                {
648                                    hint::spin_loop();
649                                }
650                                let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
651                                let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
652                                let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
653                                LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
654                                LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
655                                LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
656                                SNAPSHOT_LOCK.store(false, Ordering::Release);
657                                let delta_slots = total_slots.saturating_sub(prev_slots);
658                                let delta_txs = total_txs.saturating_sub(prev_txs);
659                                let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
660                                (delta_slots, delta_txs, delta_time_ns)
661                            };
662                            let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
663                            let mut slot_rate = delta_slots as f64 / delta_secs;
664                            let mut tps = delta_txs as f64 / delta_secs;
665                            if slot_rate <= 0.0 && total_slots > 0 {
666                                slot_rate =
667                                    total_slots as f64 / (elapsed_since_start as f64 / 1e9);
668                            }
669                            if tps <= 0.0 && total_txs > 0 {
670                                tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
671                            }
672                            let thread_stats = &stats.thread_stats;
673                            let processed_slots = stats.slots_processed.min(total_slot_count);
674                            let progress_fraction = if total_slot_count > 0 {
675                                processed_slots as f64 / total_slot_count as f64
676                            } else {
677                                1.0
678                            };
679                            let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
680                            let thread_total_slots = thread_stats
681                                .initial_slot_range
682                                .end
683                                .saturating_sub(thread_stats.initial_slot_range.start);
684                            let thread_progress_raw = if thread_total_slots > 0 {
685                                (thread_stats.slots_processed as f64 / thread_total_slots as f64)
686                                    .clamp(0.0, 1.0)
687                                    * 100.0
688                            } else {
689                                100.0
690                            };
691                            let thread_progress = *thread_progress_max
692                                .entry(thread_id)
693                                .and_modify(|max| {
694                                    if thread_progress_raw > *max {
695                                        *max = thread_progress_raw;
696                                    }
697                                })
698                                .or_insert(thread_progress_raw);
699                            let mut overall_eta = None;
700                            if slot_rate > 0.0 {
701                                let remaining_slots =
702                                    total_slot_count.saturating_sub(processed_slots);
703                                overall_eta = Some(human_readable_duration(
704                                    remaining_slots as f64 / slot_rate,
705                                ));
706                            }
707                            if overall_eta.is_none() {
708                                if progress_fraction > 0.0 && progress_fraction < 1.0 {
709                                    if let Some(elapsed_total) = finish_at
710                                        .checked_duration_since(stats.start_time)
711                                        .map(|d| d.as_secs_f64())
712                                        && elapsed_total > 0.0 {
713                                            let remaining_secs =
714                                                elapsed_total * (1.0 / progress_fraction - 1.0);
715                                            overall_eta = Some(human_readable_duration(remaining_secs));
716                                        }
717                                } else if progress_fraction >= 1.0 {
718                                    overall_eta = Some("0s".into());
719                                }
720                            }
721                            let slots_display = human_readable_count(processed_slots);
722                            let blocks_display = human_readable_count(stats.blocks_processed);
723                            let txs_display = human_readable_count(stats.transactions_processed);
724                            let tps_display = human_readable_count(tps.ceil() as u64);
725                            log::info!(
726                                target: &log_target,
727                                "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
728                                overall_eta.unwrap_or_else(|| "n/a".into()),
729                            );
730                            Ok(())
731                        }
732                        .boxed()
733                    }
734                },
735                tracking_interval_slots: 100,
736            }
737        });
738
739        let (shutdown_tx, _) = broadcast::channel::<()>(1);
740
741        let mut firehose_future = Box::pin(firehose(
742            self.num_threads as u64,
743            slot_range,
744            Some(on_block),
745            Some(on_transaction),
746            Some(on_entry),
747            Some(on_reward),
748            Some(on_error),
749            stats_tracking,
750            Some(shutdown_tx.subscribe()),
751        ));
752
753        let firehose_result = tokio::select! {
754            res = &mut firehose_future => res,
755            ctrl = signal::ctrl_c() => {
756                match ctrl {
757                    Ok(()) => log::info!(
758                        target: LOG_MODULE,
759                        "CTRL+C received; initiating shutdown"
760                    ),
761                    Err(err) => log::error!(
762                        target: LOG_MODULE,
763                        "failed to listen for CTRL+C: {}",
764                        err
765                    ),
766                }
767                shutting_down.store(true, Ordering::SeqCst);
768                let _ = shutdown_tx.send(());
769                firehose_future.await
770            }
771        };
772
773        if clickhouse_enabled
774            && let Some(db_client) = clickhouse.clone()
775            && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
776        {
777            log::error!(
778                target: LOG_MODULE,
779                "failed to flush buffered plugin slots: {}",
780                err
781            );
782        }
783
784        for handle in plugin_handles.iter() {
785            if let Err(error) = handle
786                .plugin
787                .on_exit(clickhouse.clone())
788                .await
789                .map_err(|e| e.to_string())
790            {
791                log::error!(
792                    target: LOG_MODULE,
793                    "plugin {} on_exit error: {}",
794                    handle.name,
795                    error
796                );
797            }
798        }
799
800        match firehose_result {
801            Ok(()) => Ok(()),
802            Err((error, slot)) => Err(PluginRunnerError::Firehose {
803                details: error.to_string(),
804                slot,
805            }),
806        }
807    }
808}
809
/// Errors that can arise while running plugins against the firehose.
///
/// Each variant carries enough context (slot, plugin name, lifecycle stage) to be logged
/// directly through its `Display` implementation.
#[derive(Debug, Error)]
pub enum PluginRunnerError {
    /// ClickHouse client returned an error.
    ///
    /// Thanks to `#[from]`, a `clickhouse::error::Error` converts into this variant
    /// automatically when propagated with `?`.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// Firehose streaming failed at the specified slot.
    ///
    /// The runner builds this from the firehose's `(error, slot)` failure pair once
    /// streaming has stopped.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// Human-readable description of the firehose failure (the stringified error).
        details: String,
        /// Slot where the firehose encountered the error.
        slot: u64,
    },
    /// Lifecycle hook on a plugin returned an error.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin that failed.
        plugin: &'static str,
        /// Lifecycle stage where the failure occurred.
        stage: &'static str,
        /// Textual error details.
        details: String,
    },
}
835
/// A shared plugin instance bundled with a snapshot of its identifying metadata.
///
/// The `id`, `name`, and `version` values are read from the plugin once, when the handle
/// is built via `From<Arc<dyn Plugin>>`, so later code can use them without calling back
/// into the trait object.
#[derive(Clone)]
struct PluginHandle {
    /// The plugin itself; cloning the handle only bumps the `Arc` reference count.
    plugin: Arc<dyn Plugin>,
    /// Value returned by `Plugin::id` at construction time.
    id: u16,
    /// Value returned by `Plugin::name` at construction time.
    name: &'static str,
    /// Value returned by `Plugin::version` at construction time.
    version: u16,
}
843
844impl From<Arc<dyn Plugin>> for PluginHandle {
845    fn from(plugin: Arc<dyn Plugin>) -> Self {
846        let id = plugin.id();
847        let name = plugin.name();
848        let version = plugin.version();
849        Self {
850            plugin,
851            id,
852            name,
853            version,
854        }
855    }
856}
857
/// Row written to the `jetstreamer_plugins` table, describing one registered plugin.
///
/// The `u16` id and version carried by a plugin handle are widened to `u32` here,
/// matching the `UInt32` columns of that table.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    /// Plugin identifier.
    id: u32,
    /// Plugin name, borrowed for the duration of the insert.
    name: &'a str,
    /// Plugin version.
    version: u32,
}
864
/// Row written to the `jetstreamer_plugin_slots` table, pairing a plugin with a slot.
///
/// The table's `indexed_at` column is not part of this struct; the table definition
/// gives it a `DEFAULT now()`.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    /// Identifier of the plugin the slot is recorded for.
    plugin_id: u32,
    /// Slot number being recorded.
    slot: u64,
}
870
/// Row written to the `jetstreamer_slot_status` table, summarising one slot.
///
/// The table's `indexed_at` column is not part of this struct; the table definition
/// gives it a `DEFAULT now()`.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    /// Slot number.
    slot: u64,
    /// Transaction count for the slot, saturated to `u32::MAX` by the writer.
    transaction_count: u32,
    /// Identifier of the thread that reported the slot; out-of-range ids are stored as
    /// `u8::MAX` by the writer.
    thread_id: u8,
    /// Block timestamp clamped into `0..=u32::MAX`; `0` when unknown or negative.
    /// Presumably Unix seconds, since the column is `DateTime('UTC')` — confirm upstream.
    block_time: u32,
}
878
879async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
880    db.query(
881        r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
882            slot UInt64,
883            transaction_count UInt32 DEFAULT 0,
884            thread_id UInt8 DEFAULT 0,
885            block_time DateTime('UTC') DEFAULT toDateTime(0),
886            indexed_at DateTime('UTC') DEFAULT now()
887        ) ENGINE = ReplacingMergeTree(indexed_at)
888        ORDER BY slot"#,
889    )
890    .execute()
891    .await?;
892
893    db.query(
894        r#"ALTER TABLE jetstreamer_slot_status
895           ADD COLUMN IF NOT EXISTS block_time DateTime('UTC') DEFAULT toDateTime(0)"#,
896    )
897    .execute()
898    .await?;
899
900    db.query(
901        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
902            id UInt32,
903            name String,
904            version UInt32
905        ) ENGINE = ReplacingMergeTree
906        ORDER BY id"#,
907    )
908    .execute()
909    .await?;
910
911    db.query(
912        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
913            plugin_id UInt32,
914            slot UInt64,
915            indexed_at DateTime('UTC') DEFAULT now()
916        ) ENGINE = ReplacingMergeTree
917        ORDER BY (plugin_id, slot)"#,
918    )
919    .execute()
920    .await?;
921
922    Ok(())
923}
924
925async fn upsert_plugins(
926    db: &Client,
927    plugins: &[PluginHandle],
928) -> Result<(), clickhouse::error::Error> {
929    if plugins.is_empty() {
930        return Ok(());
931    }
932    let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
933    for handle in plugins {
934        insert
935            .write(&PluginRow {
936                id: handle.id as u32,
937                name: handle.name,
938                version: handle.version as u32,
939            })
940            .await?;
941    }
942    insert.end().await?;
943    Ok(())
944}
945
946async fn record_plugin_slot(
947    db: Arc<Client>,
948    plugin_id: u16,
949    slot: u64,
950) -> Result<(), clickhouse::error::Error> {
951    let mut insert = db
952        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
953        .await?;
954    insert
955        .write(&PluginSlotRow {
956            plugin_id: plugin_id as u32,
957            slot,
958        })
959        .await?;
960    insert.end().await?;
961    Ok(())
962}
963
964async fn flush_slot_buffer(
965    db: Arc<Client>,
966    buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
967) -> Result<(), clickhouse::error::Error> {
968    let mut rows = Vec::new();
969    buffer.iter_mut().for_each(|mut entry| {
970        if !entry.value().is_empty() {
971            rows.append(entry.value_mut());
972        }
973    });
974
975    if rows.is_empty() {
976        return Ok(());
977    }
978
979    let mut insert = db
980        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
981        .await?;
982    for row in rows {
983        insert.write(&row).await?;
984    }
985    insert.end().await?;
986    Ok(())
987}
988
989async fn record_slot_status(
990    db: Arc<Client>,
991    slot: u64,
992    thread_id: usize,
993    transaction_count: u64,
994    block_time: Option<i64>,
995) -> Result<(), clickhouse::error::Error> {
996    let mut insert = db
997        .insert::<SlotStatusRow>("jetstreamer_slot_status")
998        .await?;
999    insert
1000        .write(&SlotStatusRow {
1001            slot,
1002            transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1003            thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1004            block_time: clamp_block_time(block_time),
1005        })
1006        .await?;
1007    insert.end().await?;
1008    Ok(())
1009}
1010
/// Squeezes an optional signed timestamp into the `u32` range.
///
/// `None`, zero, and negative values map to `0`; values above `u32::MAX` map to
/// `u32::MAX`; everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    let Some(ts) = block_time else {
        return 0;
    };
    // After clamping the value is guaranteed to fit; the fallback is never taken.
    u32::try_from(ts.clamp(0, i64::from(u32::MAX))).unwrap_or(u32::MAX)
}
1019
// Compile-time assertion that PluginRunnerError is Send + Sync + 'static.
// The trait has those bounds as supertraits, so the impl below only type-checks while
// the error type satisfies all of them; it has no runtime effect.
trait _CanSend: Send + Sync + 'static {}
impl _CanSend for PluginRunnerError {}
1023
/// Renders an unsigned integer in decimal with `,` between groups of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let digits = value.into().to_string();
    let total = digits.len();

    // The leading group holds whatever is left over after full groups of three
    // (or a full three when the length divides evenly). `total` is always >= 1.
    let head_len = match total % 3 {
        0 => 3,
        partial => partial,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut formatted = String::with_capacity(total + total / 3);
    formatted.push_str(head);
    // `tail` is pure ASCII digits whose length is a multiple of three.
    for group in tail.as_bytes().chunks(3) {
        formatted.push(',');
        formatted.extend(group.iter().map(|&byte| char::from(byte)));
    }
    formatted
}
1037
/// Formats a duration given in seconds as a short, human-friendly string.
///
/// - Non-finite input (NaN, ±infinity) yields `"n/a"`.
/// - Zero or negative input yields `"0s"`.
/// - Durations that display as less than a minute use one decimal place, e.g. `"12.3s"`.
/// - Anything longer is rounded to whole seconds and shown with its two most
///   significant units: `"1m5s"`, `"2h3m"`, `"1d4h"` (or just `"1d"` when the hour
///   component is zero).
fn human_readable_duration(seconds: f64) -> String {
    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    // Values in [59.95, 60) would round up to the nonsensical "60.0s" when printed with
    // one decimal, so only use the sub-minute form below that threshold and let the
    // rest round to a full minute.
    if seconds < 59.95 {
        return format!("{seconds:.1}s");
    }

    // Float-to-integer `as` casts saturate, so absurdly large inputs clamp to u64::MAX
    // rather than wrapping.
    let secs = seconds.round() as u64;
    let days = secs / 86_400;
    let hours = (secs % 86_400) / 3_600;
    let minutes = (secs % 3_600) / 60;
    let seconds_rem = secs % 60;

    match (days, hours) {
        (0, 0) => format!("{minutes}m{seconds_rem}s"),
        (0, _) => format!("{hours}h{minutes}m"),
        (_, 0) => format!("{days}d"),
        _ => format!("{days}d{hours}h"),
    }
}