Skip to main content

jetstreamer_plugin/
lib.rs

1#![deny(missing_docs)]
2//! Trait-based framework for building structured observers on top of Jetstreamer's firehose.
3//!
4//! # Overview
//! Plugins let you react to every block, transaction, reward, entry, and stats update emitted
//! by [`jetstreamer_firehose`](https://crates.io/crates/jetstreamer-firehose). Combined with the
//! [`JetstreamerRunner`](https://docs.rs/jetstreamer/latest/jetstreamer/struct.JetstreamerRunner.html),
//! they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
//! transactions per second on suitable hardware. All events originate from Old Faithful's CAR
//! archive and are streamed over the network into your local runner.
12//!
13//! The framework offers:
14//! - A [`Plugin`] trait with async hook points for each data type.
15//! - [`PluginRunner`] for coordinating multiple plugins with shared ClickHouse connections
16//!   (used internally by `JetstreamerRunner`).
17//! - Built-in plugins under [`plugins`] that demonstrate common batching strategies and
18//!   metrics.
19//! - See `JetstreamerRunner` in the `jetstreamer` crate for the easiest way to run plugins.
20//!
21//! # ClickHouse Integration
22//! Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances
23//! honor the following environment variables:
24//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): HTTP(S) DSN handed to
25//!   every plugin that requests a database handle.
26//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): toggles the bundled ClickHouse helper.
27//!   Set to `remote` to opt out of spawning the helper while still writing to a cluster,
28//!   `local` to always spawn, or `off` to disable ClickHouse entirely.
29//!
30//! When the mode is `auto`, Jetstreamer inspects the DSN at runtime and only launches the
31//! embedded helper for local endpoints, enabling native clustering workflows out of the box.
32//!
33//! # Batching ClickHouse Writes
//! ClickHouse (and any other sink you call from hook handlers) can apply backpressure when it
//! receives large numbers of tiny inserts. Plugins should buffer work locally and flush in
//! batches on a cadence that matches their workload. The default [`PluginRunner`]
//! configuration triggers
37//! stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
38//! database. The bundled [`plugins::program_tracking::ProgramTrackingPlugin`] mirrors this
39//! approach by accumulating `ProgramEvent` rows per worker thread and issuing a single batch
40//! insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
41//! even under peak throughput.
42//!
43//! # Ordering Guarantees
//! Jetstreamer spawns parallel threads that process different subranges of the overall slot
//! range at the same time. Each thread sees a purely sequential view of its own transactions,
//! but downstream services that consume this data (such as databases) will observe writes from
//! different threads in a fairly arbitrary order. Design your database tables and shared data
//! structures accordingly; for example, do not assume that rows arrive in slot order.
49//!
50//! # Examples
51//! ## Defining a Plugin
52//! ```no_run
53//! use std::sync::Arc;
54//! use clickhouse::Client;
55//! use futures_util::FutureExt;
56//! use jetstreamer_firehose::firehose::TransactionData;
57//! use jetstreamer_plugin::{Plugin, PluginFuture};
58//!
59//! struct CountingPlugin;
60//!
61//! impl Plugin for CountingPlugin {
62//!     fn name(&self) -> &'static str { "counting" }
63//!
64//!     fn on_transaction<'a>(
65//!         &'a self,
66//!         _thread_id: usize,
67//!         _db: Option<Arc<Client>>,
68//!         transaction: &'a TransactionData,
69//!     ) -> PluginFuture<'a> {
70//!         async move {
71//!             println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
72//!             Ok(())
73//!         }
74//!         .boxed()
75//!     }
76//! }
77//! # let _plugin = CountingPlugin;
78//! ```
79//!
80//! ## Running Plugins with `PluginRunner`
81//! ```no_run
82//! use std::sync::Arc;
83//! use jetstreamer_firehose::epochs;
84//! use jetstreamer_plugin::{Plugin, PluginRunner};
85//!
86//! struct LoggingPlugin;
87//!
88//! impl Plugin for LoggingPlugin {
89//!     fn name(&self) -> &'static str { "logging" }
90//! }
91//!
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let mut runner = PluginRunner::new("http://localhost:8123", 1);
95//!     runner.register(Box::new(LoggingPlugin));
96//!     let runner = Arc::new(runner);
97//!
98//!     let (start, _) = epochs::epoch_to_slot_range(800);
99//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
100//!     runner
101//!         .clone()
102//!         .run(start..(end_inclusive + 1), false)
103//!         .await?;
104//!     Ok(())
105//! }
106//! ```
107
/// Built-in plugin implementations that ship with Jetstreamer.
pub mod plugins;

// Base `log` target for runner messages. Per-thread hooks append a zero-padded thread suffix
// via `format!("{}::T{:03}", LOG_MODULE, thread_id)`, e.g. `jetstreamer::runner::T007`.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114    fmt::Display,
115    future::Future,
116    hint,
117    ops::Range,
118    pin::Pin,
119    sync::{
120        Arc,
121        atomic::{AtomicBool, AtomicU64, Ordering},
122    },
123    time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130    BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
/// Re-exported firehose statistics and error-context types produced by [`firehose`].
140pub use jetstreamer_firehose::firehose::{
141    FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
// Global totals snapshot used to compute overall TPS/ETA between pulses.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
// Spin-lock flag: a stats pulse sets it (compare_exchange false -> true) before reading and
// replacing the three snapshot values above, then clears it.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);

/// Returns the number of nanoseconds elapsed since `origin`, saturating at `u64::MAX`.
///
/// `Duration::as_nanos` yields a `u128`; a plain `as u64` cast would silently truncate (and
/// therefore jump backwards) once the count exceeds `u64::MAX` (~584 years). Clamping keeps
/// the result monotonic for any input.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    origin.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64
}
153
/// Boxed, type-erased future returned by every [`Plugin`] hook.
///
/// The future is `Send` (so it can be driven on any runtime worker), may borrow for `'a`, and
/// resolves to `Ok(())` on success or to a boxed `Send + Sync` error on failure.
pub type PluginFuture<'a> = Pin<
    Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
>;
162
163/// Trait implemented by plugins that consume firehose events.
164///
165/// See the crate-level documentation for usage examples.
166pub trait Plugin: Send + Sync + 'static {
167    /// Human-friendly plugin name used in logs and persisted metadata.
168    fn name(&self) -> &'static str;
169
170    /// Semantic version for the plugin; defaults to `1`.
171    fn version(&self) -> u16 {
172        1
173    }
174
175    /// Deterministic identifier derived from [`Plugin::name`].
176    fn id(&self) -> u16 {
177        let hash = Sha256::digest(self.name());
178        let mut res = 1u16;
179        for byte in hash {
180            res = res.wrapping_mul(31).wrapping_add(byte as u16);
181        }
182        res
183    }
184
185    /// Called for every transaction seen by the firehose.
186    fn on_transaction<'a>(
187        &'a self,
188        _thread_id: usize,
189        _db: Option<Arc<Client>>,
190        _transaction: &'a TransactionData,
191    ) -> PluginFuture<'a> {
192        async move { Ok(()) }.boxed()
193    }
194
195    /// Called for every block observed by the firehose.
196    fn on_block<'a>(
197        &'a self,
198        _thread_id: usize,
199        _db: Option<Arc<Client>>,
200        _block: &'a BlockData,
201    ) -> PluginFuture<'a> {
202        async move { Ok(()) }.boxed()
203    }
204
205    /// Called for every entry observed by the firehose when entry notifications are enabled.
206    fn on_entry<'a>(
207        &'a self,
208        _thread_id: usize,
209        _db: Option<Arc<Client>>,
210        _entry: &'a EntryData,
211    ) -> PluginFuture<'a> {
212        async move { Ok(()) }.boxed()
213    }
214
215    /// Called for reward updates associated with processed blocks.
216    fn on_reward<'a>(
217        &'a self,
218        _thread_id: usize,
219        _db: Option<Arc<Client>>,
220        _reward: &'a RewardsData,
221    ) -> PluginFuture<'a> {
222        async move { Ok(()) }.boxed()
223    }
224
225    /// Called whenever a firehose thread encounters an error before restarting.
226    fn on_error<'a>(
227        &'a self,
228        _thread_id: usize,
229        _db: Option<Arc<Client>>,
230        _error: &'a FirehoseErrorContext,
231    ) -> PluginFuture<'a> {
232        async move { Ok(()) }.boxed()
233    }
234
235    /// Invoked once before the firehose starts streaming events.
236    fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237        async move { Ok(()) }.boxed()
238    }
239
240    /// Invoked once after the firehose finishes or shuts down.
241    fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242        async move { Ok(()) }.boxed()
243    }
244}
245
246/// Coordinates plugin execution and ClickHouse persistence.
247///
248/// See the crate-level documentation for usage examples.
249#[derive(Clone)]
250pub struct PluginRunner {
251    plugins: Arc<Vec<Arc<dyn Plugin>>>,
252    clickhouse_dsn: String,
253    num_threads: usize,
254    db_update_interval_slots: u64,
255}
256
257impl PluginRunner {
258    /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`.
259    pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
260        Self {
261            plugins: Arc::new(Vec::new()),
262            clickhouse_dsn: clickhouse_dsn.to_string(),
263            num_threads: std::cmp::max(1, num_threads),
264            db_update_interval_slots: 100,
265        }
266    }
267
268    /// Registers an additional plugin.
269    pub fn register(&mut self, plugin: Box<dyn Plugin>) {
270        Arc::get_mut(&mut self.plugins)
271            .expect("cannot register plugins after the runner has started")
272            .push(Arc::from(plugin));
273    }
274
275    /// Runs the firehose across the specified slot range, optionally writing to ClickHouse.
276    pub async fn run(
277        self: Arc<Self>,
278        slot_range: Range<u64>,
279        clickhouse_enabled: bool,
280    ) -> Result<(), PluginRunnerError> {
281        let db_update_interval = self.db_update_interval_slots.max(1);
282        let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
283            self.plugins
284                .iter()
285                .cloned()
286                .map(PluginHandle::from)
287                .collect(),
288        );
289
290        let clickhouse = if clickhouse_enabled {
291            let client = Arc::new(
292                build_clickhouse_client(&self.clickhouse_dsn)
293                    .with_option("async_insert", "1")
294                    .with_option("wait_for_async_insert", "0"),
295            );
296            ensure_clickhouse_tables(client.as_ref()).await?;
297            upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
298            Some(client)
299        } else {
300            None
301        };
302
303        for handle in plugin_handles.iter() {
304            if let Err(error) = handle
305                .plugin
306                .on_load(clickhouse.clone())
307                .await
308                .map_err(|e| e.to_string())
309            {
310                return Err(PluginRunnerError::PluginLifecycle {
311                    plugin: handle.name,
312                    stage: "on_load",
313                    details: error,
314                });
315            }
316        }
317
318        let shutting_down = Arc::new(AtomicBool::new(false));
319        let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
320        let clickhouse_enabled = clickhouse.is_some();
321        let slots_since_flush = Arc::new(AtomicU64::new(0));
322
323        let on_block = {
324            let plugin_handles = plugin_handles.clone();
325            let clickhouse = clickhouse.clone();
326            let slot_buffer = slot_buffer.clone();
327            let slots_since_flush = slots_since_flush.clone();
328            let shutting_down = shutting_down.clone();
329            move |thread_id: usize, block: BlockData| {
330                let plugin_handles = plugin_handles.clone();
331                let clickhouse = clickhouse.clone();
332                let slot_buffer = slot_buffer.clone();
333                let slots_since_flush = slots_since_flush.clone();
334                let shutting_down = shutting_down.clone();
335                async move {
336                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
337                    if shutting_down.load(Ordering::SeqCst) {
338                        log::debug!(
339                            target: &log_target,
340                            "ignoring block while shutdown is in progress"
341                        );
342                        return Ok(());
343                    }
344                    let block = Arc::new(block);
345                    if !plugin_handles.is_empty() {
346                        for handle in plugin_handles.iter() {
347                            let db = clickhouse.clone();
348                            if let Err(err) = handle
349                                .plugin
350                                .on_block(thread_id, db.clone(), block.as_ref())
351                                .await
352                            {
353                                log::error!(
354                                    target: &log_target,
355                                    "plugin {} on_block error: {}",
356                                    handle.name,
357                                    err
358                                );
359                                continue;
360                            }
361                            if let (Some(db_client), BlockData::Block { slot, .. }) =
362                                (clickhouse.clone(), block.as_ref())
363                            {
364                                if clickhouse_enabled {
365                                    slot_buffer
366                                        .entry(handle.id)
367                                        .or_default()
368                                        .push(PluginSlotRow {
369                                            plugin_id: handle.id as u32,
370                                            slot: *slot,
371                                        });
372                                } else if let Err(err) =
373                                    record_plugin_slot(db_client, handle.id, *slot).await
374                                {
375                                    log::error!(
376                                        target: &log_target,
377                                        "failed to record plugin slot for {}: {}",
378                                        handle.name,
379                                        err
380                                    );
381                                }
382                            }
383                        }
384                        if clickhouse_enabled {
385                            let current = slots_since_flush
386                                .fetch_add(1, Ordering::Relaxed)
387                                .wrapping_add(1);
388                            if current.is_multiple_of(db_update_interval)
389                                && let Some(db_client) = clickhouse.clone()
390                            {
391                                let buffer = slot_buffer.clone();
392                                let log_target_clone = log_target.clone();
393                                tokio::spawn(async move {
394                                    if let Err(err) = flush_slot_buffer(db_client, buffer).await {
395                                        log::error!(
396                                            target: &log_target_clone,
397                                            "failed to flush buffered plugin slots: {}",
398                                            err
399                                        );
400                                    }
401                                });
402                            }
403                        }
404                    }
405                    if let Some(db_client) = clickhouse.clone() {
406                        match block.as_ref() {
407                            BlockData::Block {
408                                slot,
409                                executed_transaction_count,
410                                block_time,
411                                ..
412                            } => {
413                                let tally = take_slot_tx_tally(*slot);
414                                if let Err(err) = record_slot_status(
415                                    db_client,
416                                    *slot,
417                                    thread_id,
418                                    *executed_transaction_count,
419                                    tally.votes,
420                                    tally.non_votes,
421                                    *block_time,
422                                )
423                                .await
424                                {
425                                    log::error!(
426                                        target: &log_target,
427                                        "failed to record slot status: {}",
428                                        err
429                                    );
430                                }
431                            }
432                            BlockData::PossibleLeaderSkipped { slot } => {
433                                // Drop any tallies that may exist for skipped slots.
434                                take_slot_tx_tally(*slot);
435                            }
436                        }
437                    }
438                    Ok(())
439                }
440                .boxed()
441            }
442        };
443
444        let on_transaction = {
445            let plugin_handles = plugin_handles.clone();
446            let clickhouse = clickhouse.clone();
447            let shutting_down = shutting_down.clone();
448            move |thread_id: usize, transaction: TransactionData| {
449                let plugin_handles = plugin_handles.clone();
450                let clickhouse = clickhouse.clone();
451                let shutting_down = shutting_down.clone();
452                async move {
453                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
454                    record_slot_vote_tally(transaction.slot, transaction.is_vote);
455                    if plugin_handles.is_empty() {
456                        return Ok(());
457                    }
458                    if shutting_down.load(Ordering::SeqCst) {
459                        log::debug!(
460                            target: &log_target,
461                            "ignoring transaction while shutdown is in progress"
462                        );
463                        return Ok(());
464                    }
465                    let transaction = Arc::new(transaction);
466                    for handle in plugin_handles.iter() {
467                        if let Err(err) = handle
468                            .plugin
469                            .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
470                            .await
471                        {
472                            log::error!(
473                                target: &log_target,
474                                "plugin {} on_transaction error: {}",
475                                handle.name,
476                                err
477                            );
478                        }
479                    }
480                    Ok(())
481                }
482                .boxed()
483            }
484        };
485
486        let on_entry = {
487            let plugin_handles = plugin_handles.clone();
488            let clickhouse = clickhouse.clone();
489            let shutting_down = shutting_down.clone();
490            move |thread_id: usize, entry: EntryData| {
491                let plugin_handles = plugin_handles.clone();
492                let clickhouse = clickhouse.clone();
493                let shutting_down = shutting_down.clone();
494                async move {
495                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
496                    if plugin_handles.is_empty() {
497                        return Ok(());
498                    }
499                    if shutting_down.load(Ordering::SeqCst) {
500                        log::debug!(
501                            target: &log_target,
502                            "ignoring entry while shutdown is in progress"
503                        );
504                        return Ok(());
505                    }
506                    let entry = Arc::new(entry);
507                    for handle in plugin_handles.iter() {
508                        if let Err(err) = handle
509                            .plugin
510                            .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
511                            .await
512                        {
513                            log::error!(
514                                target: &log_target,
515                                "plugin {} on_entry error: {}",
516                                handle.name,
517                                err
518                            );
519                        }
520                    }
521                    Ok(())
522                }
523                .boxed()
524            }
525        };
526
527        let on_reward = {
528            let plugin_handles = plugin_handles.clone();
529            let clickhouse = clickhouse.clone();
530            let shutting_down = shutting_down.clone();
531            move |thread_id: usize, reward: RewardsData| {
532                let plugin_handles = plugin_handles.clone();
533                let clickhouse = clickhouse.clone();
534                let shutting_down = shutting_down.clone();
535                async move {
536                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
537                    if plugin_handles.is_empty() {
538                        return Ok(());
539                    }
540                    if shutting_down.load(Ordering::SeqCst) {
541                        log::debug!(
542                            target: &log_target,
543                            "ignoring reward while shutdown is in progress"
544                        );
545                        return Ok(());
546                    }
547                    let reward = Arc::new(reward);
548                    for handle in plugin_handles.iter() {
549                        if let Err(err) = handle
550                            .plugin
551                            .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
552                            .await
553                        {
554                            log::error!(
555                                target: &log_target,
556                                "plugin {} on_reward error: {}",
557                                handle.name,
558                                err
559                            );
560                        }
561                    }
562                    Ok(())
563                }
564                .boxed()
565            }
566        };
567
568        let on_error = {
569            let plugin_handles = plugin_handles.clone();
570            let clickhouse = clickhouse.clone();
571            let shutting_down = shutting_down.clone();
572            move |thread_id: usize, context: FirehoseErrorContext| {
573                let plugin_handles = plugin_handles.clone();
574                let clickhouse = clickhouse.clone();
575                let shutting_down = shutting_down.clone();
576                async move {
577                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
578                    if plugin_handles.is_empty() {
579                        return Ok(());
580                    }
581                    if shutting_down.load(Ordering::SeqCst) {
582                        log::debug!(
583                            target: &log_target,
584                            "ignoring error callback while shutdown is in progress"
585                        );
586                        return Ok(());
587                    }
588                    let context = Arc::new(context);
589                    for handle in plugin_handles.iter() {
590                        if let Err(err) = handle
591                            .plugin
592                            .on_error(thread_id, clickhouse.clone(), context.as_ref())
593                            .await
594                        {
595                            log::error!(
596                                target: &log_target,
597                                "plugin {} on_error error: {}",
598                                handle.name,
599                                err
600                            );
601                        }
602                    }
603                    Ok(())
604                }
605                .boxed()
606            }
607        };
608
609        let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
610
611        let total_slot_count_capture = total_slot_count;
612        let run_origin = std::time::Instant::now();
613        // Reset global rate snapshot for a new run.
614        SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
615        LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
616        LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
617        LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
618        let stats_tracking = clickhouse.clone().map(|_db| {
619            let shutting_down = shutting_down.clone();
620            let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
621            StatsTracking {
622        on_stats: {
623            let thread_progress_max = thread_progress_max.clone();
624            let total_slot_count = total_slot_count_capture;
625            move |thread_id: usize, stats: Stats| {
626                let shutting_down = shutting_down.clone();
627                let thread_progress_max = thread_progress_max.clone();
628                async move {
629                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
630                    if shutting_down.load(Ordering::SeqCst) {
631                                log::debug!(
632                                    target: &log_target,
633                                    "skipping stats write during shutdown"
634                                );
635                                return Ok(());
636                            }
637                            let finish_at = stats
638                                .finish_time
639                                .unwrap_or_else(std::time::Instant::now);
640                            let elapsed_since_start = finish_at
641                                .saturating_duration_since(stats.start_time)
642                                .as_nanos()
643                                .max(1) as u64;
644                            let total_slots = stats.slots_processed;
645                            let total_txs = stats.transactions_processed;
646                            let now_ns = monotonic_nanos_since(run_origin);
647                            // Serialize snapshot updates so every pulse measures deltas from the
648                            // previous pulse (regardless of which thread emitted it) using a
649                            // monotonic clock shared across threads.
650                            let (delta_slots, delta_txs, delta_time_ns) = {
651                                while SNAPSHOT_LOCK
652                                    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
653                                    .is_err()
654                                {
655                                    hint::spin_loop();
656                                }
657                                let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
658                                let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
659                                let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
660                                LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
661                                LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
662                                LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
663                                SNAPSHOT_LOCK.store(false, Ordering::Release);
664                                let delta_slots = total_slots.saturating_sub(prev_slots);
665                                let delta_txs = total_txs.saturating_sub(prev_txs);
666                                let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
667                                (delta_slots, delta_txs, delta_time_ns)
668                            };
669                            let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
670                            let mut slot_rate = delta_slots as f64 / delta_secs;
671                            let mut tps = delta_txs as f64 / delta_secs;
672                            if slot_rate <= 0.0 && total_slots > 0 {
673                                slot_rate =
674                                    total_slots as f64 / (elapsed_since_start as f64 / 1e9);
675                            }
676                            if tps <= 0.0 && total_txs > 0 {
677                                tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
678                            }
679                            let thread_stats = &stats.thread_stats;
680                            let processed_slots = stats.slots_processed.min(total_slot_count);
681                            let progress_fraction = if total_slot_count > 0 {
682                                processed_slots as f64 / total_slot_count as f64
683                            } else {
684                                1.0
685                            };
686                            let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
687                            let thread_total_slots = thread_stats
688                                .initial_slot_range
689                                .end
690                                .saturating_sub(thread_stats.initial_slot_range.start);
691                            let thread_progress_raw = if thread_total_slots > 0 {
692                                (thread_stats.slots_processed as f64 / thread_total_slots as f64)
693                                    .clamp(0.0, 1.0)
694                                    * 100.0
695                            } else {
696                                100.0
697                            };
698                            let thread_progress = *thread_progress_max
699                                .entry(thread_id)
700                                .and_modify(|max| {
701                                    if thread_progress_raw > *max {
702                                        *max = thread_progress_raw;
703                                    }
704                                })
705                                .or_insert(thread_progress_raw);
706                            let mut overall_eta = None;
707                            if slot_rate > 0.0 {
708                                let remaining_slots =
709                                    total_slot_count.saturating_sub(processed_slots);
710                                overall_eta = Some(human_readable_duration(
711                                    remaining_slots as f64 / slot_rate,
712                                ));
713                            }
714                            if overall_eta.is_none() {
715                                if progress_fraction > 0.0 && progress_fraction < 1.0 {
716                                    if let Some(elapsed_total) = finish_at
717                                        .checked_duration_since(stats.start_time)
718                                        .map(|d| d.as_secs_f64())
719                                        && elapsed_total > 0.0 {
720                                            let remaining_secs =
721                                                elapsed_total * (1.0 / progress_fraction - 1.0);
722                                            overall_eta = Some(human_readable_duration(remaining_secs));
723                                        }
724                                } else if progress_fraction >= 1.0 {
725                                    overall_eta = Some("0s".into());
726                                }
727                            }
728                            let slots_display = human_readable_count(processed_slots);
729                            let blocks_display = human_readable_count(stats.blocks_processed);
730                            let txs_display = human_readable_count(stats.transactions_processed);
731                            let tps_display = human_readable_count(tps.ceil() as u64);
732                            log::info!(
733                                target: &log_target,
734                                "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
735                                overall_eta.unwrap_or_else(|| "n/a".into()),
736                            );
737                            Ok(())
738                        }
739                        .boxed()
740                    }
741                },
742                tracking_interval_slots: 100,
743            }
744        });
745
746        let (shutdown_tx, _) = broadcast::channel::<()>(1);
747
748        let mut firehose_future = Box::pin(firehose(
749            self.num_threads as u64,
750            slot_range,
751            Some(on_block),
752            Some(on_transaction),
753            Some(on_entry),
754            Some(on_reward),
755            Some(on_error),
756            stats_tracking,
757            Some(shutdown_tx.subscribe()),
758        ));
759
760        let firehose_result = tokio::select! {
761            res = &mut firehose_future => res,
762            ctrl = signal::ctrl_c() => {
763                match ctrl {
764                    Ok(()) => log::info!(
765                        target: LOG_MODULE,
766                        "CTRL+C received; initiating shutdown"
767                    ),
768                    Err(err) => log::error!(
769                        target: LOG_MODULE,
770                        "failed to listen for CTRL+C: {}",
771                        err
772                    ),
773                }
774                shutting_down.store(true, Ordering::SeqCst);
775                let _ = shutdown_tx.send(());
776                firehose_future.await
777            }
778        };
779
780        if clickhouse_enabled
781            && let Some(db_client) = clickhouse.clone()
782            && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
783        {
784            log::error!(
785                target: LOG_MODULE,
786                "failed to flush buffered plugin slots: {}",
787                err
788            );
789        }
790
791        for handle in plugin_handles.iter() {
792            if let Err(error) = handle
793                .plugin
794                .on_exit(clickhouse.clone())
795                .await
796                .map_err(|e| e.to_string())
797            {
798                log::error!(
799                    target: LOG_MODULE,
800                    "plugin {} on_exit error: {}",
801                    handle.name,
802                    error
803                );
804            }
805        }
806
807        match firehose_result {
808            Ok(()) => Ok(()),
809            Err((error, slot)) => Err(PluginRunnerError::Firehose {
810                details: error.to_string(),
811                slot,
812            }),
813        }
814    }
815}
816
817fn build_clickhouse_client(dsn: &str) -> Client {
818    let mut client = Client::default();
819    if let Ok(mut url) = Url::parse(dsn) {
820        let username = url.username().to_string();
821        let password = url.password().map(|value| value.to_string());
822        if !username.is_empty() || password.is_some() {
823            let _ = url.set_username("");
824            let _ = url.set_password(None);
825        }
826        client = client.with_url(url.as_str());
827        if !username.is_empty() {
828            client = client.with_user(username);
829        }
830        if let Some(password) = password {
831            client = client.with_password(password);
832        }
833    } else {
834        client = client.with_url(dsn);
835    }
836    client
837}
838
/// Errors that can arise while running plugins against the firehose.
#[derive(Debug, Error)]
pub enum PluginRunnerError {
    /// ClickHouse client returned an error.
    ///
    /// Thanks to `#[from]`, any `clickhouse::error::Error` propagated with `?` inside the
    /// runner is converted into this variant automatically.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// Firehose streaming failed at the specified slot.
    ///
    /// The runner builds this from the `(error, slot)` pair returned by the firehose future,
    /// storing the stringified firehose error in `details`.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// Human-readable description of the firehose failure.
        details: String,
        /// Slot where the firehose encountered the error.
        slot: u64,
    },
    /// Lifecycle hook on a plugin returned an error.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin that failed.
        plugin: &'static str,
        /// Lifecycle stage where the failure occurred (a static label; presumably the
        /// hook name — confirm at the construction site).
        stage: &'static str,
        /// Textual error details.
        details: String,
    },
}
864
/// Runner-side handle for a registered plugin.
///
/// Caches the plugin's `id()`, `name()` and `version()` when the handle is built, so those
/// values can be read without calling back into the plugin object.
#[derive(Clone)]
struct PluginHandle {
    // The plugin instance; shared through `Arc`, so cloning a handle only bumps a refcount.
    plugin: Arc<dyn Plugin>,
    // Cached `Plugin::id()`; widened to `u32` when written to ClickHouse.
    id: u16,
    // Cached `Plugin::name()`; used in runner log messages.
    name: &'static str,
    // Cached `Plugin::version()`; widened to `u32` when written to ClickHouse.
    version: u16,
}
872
873impl From<Arc<dyn Plugin>> for PluginHandle {
874    fn from(plugin: Arc<dyn Plugin>) -> Self {
875        let id = plugin.id();
876        let name = plugin.name();
877        let version = plugin.version();
878        Self {
879            plugin,
880            id,
881            name,
882            version,
883        }
884    }
885}
886
/// Row written to the `jetstreamer_plugins` table, one per registered plugin.
///
/// `id` and `version` are `u32` to match the table's `UInt32` columns; they are widened from
/// the handle's `u16` values when the row is built.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    // Plugin id (from `PluginHandle::id`).
    id: u32,
    // Plugin name, borrowed from the handle.
    name: &'a str,
    // Plugin version (from `PluginHandle::version`).
    version: u32,
}
893
/// Row written to `jetstreamer_plugin_slots`, pairing a plugin id with a slot.
///
/// The table's `indexed_at` column is not part of the row, so ClickHouse fills it from its
/// `DEFAULT now()` expression.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    // Plugin id, widened from the plugin's `u16` id.
    plugin_id: u32,
    slot: u64,
}
899
/// Row written to `jetstreamer_slot_status`, keyed by slot.
///
/// `record_slot_status` saturates the counts into `u32` and `thread_id` into `u8`, and clamps
/// `block_time` into `u32` to fit the table's `UInt32` / `UInt8` / `DateTime` columns. The
/// table's `indexed_at` column is omitted so its `DEFAULT now()` applies.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    slot: u64,
    transaction_count: u32,
    vote_transaction_count: u32,
    non_vote_transaction_count: u32,
    thread_id: u8,
    // Stored into a `DateTime('UTC')` column; 0 when the block time is unknown or negative.
    block_time: u32,
}
909
/// Running vote / non-vote transaction counts for a single slot.
///
/// Incremented by `record_slot_vote_tally` and removed (returned) by `take_slot_tx_tally`.
#[derive(Default, Clone, Copy)]
struct SlotTxTally {
    // Number of vote transactions seen for the slot (saturating).
    votes: u64,
    // Number of non-vote transactions seen for the slot (saturating).
    non_votes: u64,
}
915
// Process-wide per-slot transaction tallies, keyed by slot number and created lazily on first use.
// A `DashMap` lets callers update entries concurrently without a single global lock
// (presumably the firehose worker threads — confirm at call sites). Entries are added by
// `record_slot_vote_tally` and removed by `take_slot_tx_tally`; a slot that is never taken
// remains in the map.
static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally>> = Lazy::new(DashMap::new);
917
918async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
919    db.query(
920        r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
921            slot UInt64,
922            transaction_count UInt32 DEFAULT 0,
923            vote_transaction_count UInt32 DEFAULT 0,
924            non_vote_transaction_count UInt32 DEFAULT 0,
925            thread_id UInt8 DEFAULT 0,
926            block_time DateTime('UTC') DEFAULT toDateTime(0),
927            indexed_at DateTime('UTC') DEFAULT now()
928        ) ENGINE = ReplacingMergeTree(indexed_at)
929        ORDER BY slot"#,
930    )
931    .execute()
932    .await?;
933
934    db.query(
935        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
936            id UInt32,
937            name String,
938            version UInt32
939        ) ENGINE = ReplacingMergeTree
940        ORDER BY id"#,
941    )
942    .execute()
943    .await?;
944
945    db.query(
946        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
947            plugin_id UInt32,
948            slot UInt64,
949            indexed_at DateTime('UTC') DEFAULT now()
950        ) ENGINE = ReplacingMergeTree
951        ORDER BY (plugin_id, slot)"#,
952    )
953    .execute()
954    .await?;
955
956    Ok(())
957}
958
959async fn upsert_plugins(
960    db: &Client,
961    plugins: &[PluginHandle],
962) -> Result<(), clickhouse::error::Error> {
963    if plugins.is_empty() {
964        return Ok(());
965    }
966    let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
967    for handle in plugins {
968        insert
969            .write(&PluginRow {
970                id: handle.id as u32,
971                name: handle.name,
972                version: handle.version as u32,
973            })
974            .await?;
975    }
976    insert.end().await?;
977    Ok(())
978}
979
980async fn record_plugin_slot(
981    db: Arc<Client>,
982    plugin_id: u16,
983    slot: u64,
984) -> Result<(), clickhouse::error::Error> {
985    let mut insert = db
986        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
987        .await?;
988    insert
989        .write(&PluginSlotRow {
990            plugin_id: plugin_id as u32,
991            slot,
992        })
993        .await?;
994    insert.end().await?;
995    Ok(())
996}
997
998async fn flush_slot_buffer(
999    db: Arc<Client>,
1000    buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
1001) -> Result<(), clickhouse::error::Error> {
1002    let mut rows = Vec::new();
1003    buffer.iter_mut().for_each(|mut entry| {
1004        if !entry.value().is_empty() {
1005            rows.append(entry.value_mut());
1006        }
1007    });
1008
1009    if rows.is_empty() {
1010        return Ok(());
1011    }
1012
1013    let mut insert = db
1014        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1015        .await?;
1016    for row in rows {
1017        insert.write(&row).await?;
1018    }
1019    insert.end().await?;
1020    Ok(())
1021}
1022
1023async fn record_slot_status(
1024    db: Arc<Client>,
1025    slot: u64,
1026    thread_id: usize,
1027    transaction_count: u64,
1028    vote_transaction_count: u64,
1029    non_vote_transaction_count: u64,
1030    block_time: Option<i64>,
1031) -> Result<(), clickhouse::error::Error> {
1032    let mut insert = db
1033        .insert::<SlotStatusRow>("jetstreamer_slot_status")
1034        .await?;
1035    insert
1036        .write(&SlotStatusRow {
1037            slot,
1038            transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1039            vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1040            non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1041            thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1042            block_time: clamp_block_time(block_time),
1043        })
1044        .await?;
1045    insert.end().await?;
1046    Ok(())
1047}
1048
/// Converts an optional Unix timestamp into the `u32` stored in a ClickHouse `DateTime`.
///
/// `None`, zero and negative values map to `0`; values above `u32::MAX` saturate to
/// `u32::MAX`; everything else is passed through unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |ts| u32::try_from(ts.max(0)).unwrap_or(u32::MAX))
}
1057
1058fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1059    let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1060    if is_vote {
1061        entry.votes = entry.votes.saturating_add(1);
1062    } else {
1063        entry.non_votes = entry.non_votes.saturating_add(1);
1064    }
1065}
1066
1067fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1068    SLOT_TX_TALLY
1069        .remove(&slot)
1070        .map(|(_, tally)| tally)
1071        .unwrap_or_default()
1072}
1073
// Compile-time assertion that PluginRunnerError is Send + Sync + 'static.
// `_CanSend` has those as supertraits, so the impl below stops compiling if any variant ever
// stores a non-thread-safe or borrowed type. This matters, for example, when the error is
// moved across async tasks or boxed as `dyn Error + Send + Sync`.
trait _CanSend: Send + Sync + 'static {}
impl _CanSend for PluginRunnerError {}
1077
#[inline]
/// Formats an unsigned integer with `,` thousands separators, e.g. `1234567` -> `"1,234,567"`.
fn human_readable_count(value: impl Into<u128>) -> String {
    let raw = value.into().to_string();
    // A separator goes before every digit whose index has the same remainder mod 3 as the
    // total length, i.e. wherever a multiple of three digits remains to its right.
    let offset = raw.len() % 3;
    let mut grouped = String::with_capacity(raw.len() + raw.len() / 3);
    for (pos, digit) in raw.chars().enumerate() {
        if pos > 0 && pos % 3 == offset {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
1091
/// Renders a duration in seconds as a compact human-readable string.
///
/// Returns `"n/a"` for NaN or infinite input and `"0s"` for zero or negative input. Values
/// under a minute use one decimal place (`"12.3s"`). Longer values are rounded to whole
/// seconds and shown as the two most significant units: `"XmYs"`, `"XhYm"`, `"Xd"` or `"XdYh"`.
fn human_readable_duration(seconds: f64) -> String {
    match seconds {
        s if !s.is_finite() => "n/a".into(),
        s if s <= 0.0 => "0s".into(),
        s if s < 60.0 => format!("{:.1}s", s),
        s => {
            let total = Duration::from_secs(s.round() as u64).as_secs();
            let (days, within_day) = (total / 86_400, total % 86_400);
            let (hours, within_hour) = (within_day / 3_600, within_day % 3_600);
            let (minutes, secs) = (within_hour / 60, within_hour % 60);
            match (days, hours) {
                (0, 0) => format!("{}m{}s", minutes, secs),
                (0, h) => format!("{}h{}m", h, minutes),
                (d, 0) => format!("{}d", d),
                (d, h) => format!("{}d{}h", d, h),
            }
        }
    }
}