Skip to main content

jetstreamer_plugin/
lib.rs

1#![deny(missing_docs)]
2//! Trait-based framework for building structured observers on top of Jetstreamer's firehose.
3//!
4//! # Overview
5//! Plugins let you react to every block, transaction, reward, entry, and stats update emitted
6//! by [`jetstreamer_firehose`](https://crates.io/crates/jetstreamer-firehose). Combined with
7//! the
8//! [`JetstreamerRunner`](https://docs.rs/jetstreamer/latest/jetstreamer/struct.JetstreamerRunner.html),
9//! they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
10//! transactions per second on the right hardware. All events originate from Old Faithful's CAR
11//! archive and are streamed over the network into your local runner.
12//!
13//! The framework offers:
14//! - A [`Plugin`] trait with async hook points for each data type.
15//! - [`PluginRunner`] for coordinating multiple plugins with shared ClickHouse connections
16//!   (used internally by `JetstreamerRunner`).
17//! - Built-in plugins under [`plugins`] that demonstrate common batching strategies and
18//!   metrics.
19//! - See `JetstreamerRunner` in the `jetstreamer` crate for the easiest way to run plugins.
20//!
21//! # ClickHouse Integration
22//! Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances
23//! honor the following environment variables:
24//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): HTTP(S) DSN handed to
25//!   every plugin that requests a database handle.
26//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): toggles the bundled ClickHouse helper.
27//!   Set to `remote` to opt out of spawning the helper while still writing to a cluster,
28//!   `local` to always spawn, or `off` to disable ClickHouse entirely.
29//!
//! When the mode is `auto`, Jetstreamer inspects the DSN at runtime and only launches the
//! bundled helper when the DSN points at a local endpoint, so remote clusters work out of the box.
32//!
33//! # Batching ClickHouse Writes
34//! ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large
35//! numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a
36//! cadence that matches their workload. The default [`PluginRunner`] configuration triggers
37//! stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
38//! database. The bundled [`plugins::program_tracking::ProgramTrackingPlugin`] mirrors this
39//! approach by accumulating `ProgramEvent` rows per worker thread and issuing a single batch
40//! insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
41//! even under peak throughput.
42//!
43//! # Ordering Guarantees
//! Jetstreamer processes different subranges of the overall slot range on parallel threads at
//! the same time. Each thread sees a purely sequential view of the transactions in its own
//! subrange, but writes from different threads interleave, so downstream services such as
//! databases will receive data in a fairly arbitrary order. Design your database tables and
//! shared data structures so they do not depend on insertion order.
49//!
50//! # Examples
51//! ## Defining a Plugin
52//! ```no_run
53//! use std::sync::Arc;
54//! use clickhouse::Client;
55//! use futures_util::FutureExt;
56//! use jetstreamer_firehose::firehose::TransactionData;
57//! use jetstreamer_plugin::{Plugin, PluginFuture};
58//!
59//! struct CountingPlugin;
60//!
61//! impl Plugin for CountingPlugin {
62//!     fn name(&self) -> &'static str { "counting" }
63//!
64//!     fn on_transaction<'a>(
65//!         &'a self,
66//!         _thread_id: usize,
67//!         _db: Option<Arc<Client>>,
68//!         transaction: &'a TransactionData,
69//!     ) -> PluginFuture<'a> {
70//!         async move {
71//!             println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
72//!             Ok(())
73//!         }
74//!         .boxed()
75//!     }
76//! }
77//! # let _plugin = CountingPlugin;
78//! ```
79//!
80//! ## Running Plugins with `PluginRunner`
81//! ```no_run
82//! use std::sync::Arc;
83//! use jetstreamer_firehose::epochs;
84//! use jetstreamer_plugin::{Plugin, PluginRunner};
85//!
86//! struct LoggingPlugin;
87//!
88//! impl Plugin for LoggingPlugin {
89//!     fn name(&self) -> &'static str { "logging" }
90//! }
91//!
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let mut runner = PluginRunner::new("http://localhost:8123", 1, false, None);
95//!     runner.register(Box::new(LoggingPlugin));
96//!     let runner = Arc::new(runner);
97//!
98//!     let (start, _) = epochs::epoch_to_slot_range(800);
99//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
100//!     runner
101//!         .clone()
102//!         .run(start..(end_inclusive + 1), false)
103//!         .await?;
104//!     Ok(())
105//! }
106//! ```
107
108/// Built-in plugin implementations that ship with Jetstreamer.
109pub mod plugins;
110
/// Root log target for runner messages; per-thread targets are `"{LOG_MODULE}::T{nnn}"`.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114    fmt::Display,
115    future::Future,
116    hint,
117    ops::Range,
118    pin::Pin,
119    sync::{
120        Arc,
121        atomic::{AtomicBool, AtomicU64, Ordering},
122    },
123    time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130    BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
/// Re-exported firehose types: the error context handed to [`Plugin::on_error`] and the
/// statistics types produced by [`firehose`].
140pub use jetstreamer_firehose::firehose::{
141    FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
// Process-wide snapshot of the totals reported at the most recent stats pulse. Each pulse
// diffs its current totals against these values to derive the overall slot rate, TPS and
// ETA, then stores its own totals here for the next pulse to diff against.

/// Total slots processed as of the previous stats pulse.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
/// Total transactions processed as of the previous stats pulse.
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
/// Nanoseconds since the run origin at which the previous stats pulse was recorded.
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
/// Spin-lock flag (`true` while held) serializing the read-then-update of the three
/// snapshot values above, so each pulse diffs against a consistent previous snapshot.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
/// Returns the nanoseconds elapsed on the monotonic clock since `origin`.
///
/// [`Duration::as_nanos`](std::time::Duration::as_nanos) yields a `u128`; the value is
/// saturated to `u64::MAX` (about 584 years) instead of being silently truncated by an
/// `as` cast, so the result can never wrap around to a small number.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    u64::try_from(origin.elapsed().as_nanos()).unwrap_or(u64::MAX)
}
153
/// Pinned, heap-allocated future returned by every [`Plugin`] hook.
///
/// The future is `Send`, may borrow data for `'a`, and resolves to `Ok(())` on success or
/// to a boxed, thread-safe error on failure.
pub type PluginFuture<'a> = Pin<
    Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>> + Send + 'a>,
>;
162
163/// Trait implemented by plugins that consume firehose events.
164///
165/// See the crate-level documentation for usage examples.
166pub trait Plugin: Send + Sync + 'static {
167    /// Human-friendly plugin name used in logs and persisted metadata.
168    fn name(&self) -> &'static str;
169
170    /// Semantic version for the plugin; defaults to `1`.
171    fn version(&self) -> u16 {
172        1
173    }
174
175    /// Deterministic identifier derived from [`Plugin::name`].
176    fn id(&self) -> u16 {
177        let hash = Sha256::digest(self.name());
178        let mut res = 1u16;
179        for byte in hash {
180            res = res.wrapping_mul(31).wrapping_add(byte as u16);
181        }
182        res
183    }
184
185    /// Called for every transaction seen by the firehose.
186    fn on_transaction<'a>(
187        &'a self,
188        _thread_id: usize,
189        _db: Option<Arc<Client>>,
190        _transaction: &'a TransactionData,
191    ) -> PluginFuture<'a> {
192        async move { Ok(()) }.boxed()
193    }
194
195    /// Called for every block observed by the firehose.
196    fn on_block<'a>(
197        &'a self,
198        _thread_id: usize,
199        _db: Option<Arc<Client>>,
200        _block: &'a BlockData,
201    ) -> PluginFuture<'a> {
202        async move { Ok(()) }.boxed()
203    }
204
205    /// Called for every entry observed by the firehose when entry notifications are enabled.
206    fn on_entry<'a>(
207        &'a self,
208        _thread_id: usize,
209        _db: Option<Arc<Client>>,
210        _entry: &'a EntryData,
211    ) -> PluginFuture<'a> {
212        async move { Ok(()) }.boxed()
213    }
214
215    /// Called for reward updates associated with processed blocks.
216    fn on_reward<'a>(
217        &'a self,
218        _thread_id: usize,
219        _db: Option<Arc<Client>>,
220        _reward: &'a RewardsData,
221    ) -> PluginFuture<'a> {
222        async move { Ok(()) }.boxed()
223    }
224
225    /// Called whenever a firehose thread encounters an error before restarting.
226    fn on_error<'a>(
227        &'a self,
228        _thread_id: usize,
229        _db: Option<Arc<Client>>,
230        _error: &'a FirehoseErrorContext,
231    ) -> PluginFuture<'a> {
232        async move { Ok(()) }.boxed()
233    }
234
235    /// Invoked once before the firehose starts streaming events.
236    fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237        async move { Ok(()) }.boxed()
238    }
239
240    /// Invoked once after the firehose finishes or shuts down.
241    fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242        async move { Ok(()) }.boxed()
243    }
244}
245
246/// Coordinates plugin execution and ClickHouse persistence.
247///
248/// See the crate-level documentation for usage examples.
249#[derive(Clone)]
250pub struct PluginRunner {
251    plugins: Arc<Vec<Arc<dyn Plugin>>>,
252    clickhouse_dsn: String,
253    num_threads: usize,
254    sequential: bool,
255    buffer_window_bytes: Option<u64>,
256    db_update_interval_slots: u64,
257}
258
259impl PluginRunner {
260    /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`.
261    ///
262    /// When `sequential` is `true`, firehose runs with one worker and `num_threads` is used as
263    /// ripget parallel download concurrency.
264    pub fn new(
265        clickhouse_dsn: impl Display,
266        num_threads: usize,
267        sequential: bool,
268        buffer_window_bytes: Option<u64>,
269    ) -> Self {
270        Self {
271            plugins: Arc::new(Vec::new()),
272            clickhouse_dsn: clickhouse_dsn.to_string(),
273            num_threads: std::cmp::max(1, num_threads),
274            sequential,
275            buffer_window_bytes,
276            db_update_interval_slots: 100,
277        }
278    }
279
280    /// Registers an additional plugin.
281    pub fn register(&mut self, plugin: Box<dyn Plugin>) {
282        Arc::get_mut(&mut self.plugins)
283            .expect("cannot register plugins after the runner has started")
284            .push(Arc::from(plugin));
285    }
286
287    /// Runs the firehose across the specified slot range, optionally writing to ClickHouse.
288    pub async fn run(
289        self: Arc<Self>,
290        slot_range: Range<u64>,
291        clickhouse_enabled: bool,
292    ) -> Result<(), PluginRunnerError> {
293        let db_update_interval = self.db_update_interval_slots.max(1);
294        let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
295            self.plugins
296                .iter()
297                .cloned()
298                .map(PluginHandle::from)
299                .collect(),
300        );
301
302        let clickhouse = if clickhouse_enabled {
303            let client = Arc::new(
304                build_clickhouse_client(&self.clickhouse_dsn)
305                    .with_option("async_insert", "1")
306                    .with_option("wait_for_async_insert", "0"),
307            );
308            ensure_clickhouse_tables(client.as_ref()).await?;
309            upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
310            Some(client)
311        } else {
312            None
313        };
314
315        for handle in plugin_handles.iter() {
316            if let Err(error) = handle
317                .plugin
318                .on_load(clickhouse.clone())
319                .await
320                .map_err(|e| e.to_string())
321            {
322                return Err(PluginRunnerError::PluginLifecycle {
323                    plugin: handle.name,
324                    stage: "on_load",
325                    details: error,
326                });
327            }
328        }
329
330        let shutting_down = Arc::new(AtomicBool::new(false));
331        let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
332        let clickhouse_enabled = clickhouse.is_some();
333        let slots_since_flush = Arc::new(AtomicU64::new(0));
334
335        let on_block = {
336            let plugin_handles = plugin_handles.clone();
337            let clickhouse = clickhouse.clone();
338            let slot_buffer = slot_buffer.clone();
339            let slots_since_flush = slots_since_flush.clone();
340            let shutting_down = shutting_down.clone();
341            move |thread_id: usize, block: BlockData| {
342                let plugin_handles = plugin_handles.clone();
343                let clickhouse = clickhouse.clone();
344                let slot_buffer = slot_buffer.clone();
345                let slots_since_flush = slots_since_flush.clone();
346                let shutting_down = shutting_down.clone();
347                async move {
348                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
349                    if shutting_down.load(Ordering::SeqCst) {
350                        log::debug!(
351                            target: &log_target,
352                            "ignoring block while shutdown is in progress"
353                        );
354                        return Ok(());
355                    }
356                    let block = Arc::new(block);
357                    if !plugin_handles.is_empty() {
358                        for handle in plugin_handles.iter() {
359                            let db = clickhouse.clone();
360                            if let Err(err) = handle
361                                .plugin
362                                .on_block(thread_id, db.clone(), block.as_ref())
363                                .await
364                            {
365                                log::error!(
366                                    target: &log_target,
367                                    "plugin {} on_block error: {}",
368                                    handle.name,
369                                    err
370                                );
371                                continue;
372                            }
373                            if let (Some(db_client), BlockData::Block { slot, .. }) =
374                                (clickhouse.clone(), block.as_ref())
375                            {
376                                if clickhouse_enabled {
377                                    slot_buffer
378                                        .entry(handle.id)
379                                        .or_default()
380                                        .push(PluginSlotRow {
381                                            plugin_id: handle.id as u32,
382                                            slot: *slot,
383                                        });
384                                } else if let Err(err) =
385                                    record_plugin_slot(db_client, handle.id, *slot).await
386                                {
387                                    log::error!(
388                                        target: &log_target,
389                                        "failed to record plugin slot for {}: {}",
390                                        handle.name,
391                                        err
392                                    );
393                                }
394                            }
395                        }
396                        if clickhouse_enabled {
397                            let current = slots_since_flush
398                                .fetch_add(1, Ordering::Relaxed)
399                                .wrapping_add(1);
400                            if current.is_multiple_of(db_update_interval)
401                                && let Some(db_client) = clickhouse.clone()
402                            {
403                                let buffer = slot_buffer.clone();
404                                let log_target_clone = log_target.clone();
405                                tokio::spawn(async move {
406                                    if let Err(err) = flush_slot_buffer(db_client, buffer).await {
407                                        log::error!(
408                                            target: &log_target_clone,
409                                            "failed to flush buffered plugin slots: {}",
410                                            err
411                                        );
412                                    }
413                                });
414                            }
415                        }
416                    }
417                    if let Some(db_client) = clickhouse.clone() {
418                        match block.as_ref() {
419                            BlockData::Block {
420                                slot,
421                                executed_transaction_count,
422                                block_time,
423                                ..
424                            } => {
425                                let tally = take_slot_tx_tally(*slot);
426                                let slot = *slot;
427                                let executed_transaction_count = *executed_transaction_count;
428                                let block_time = *block_time;
429                                let log_target_clone = log_target.clone();
430                                tokio::spawn(async move {
431                                    if let Err(err) = record_slot_status(
432                                        db_client,
433                                        slot,
434                                        thread_id,
435                                        executed_transaction_count,
436                                        tally.votes,
437                                        tally.non_votes,
438                                        block_time,
439                                    )
440                                    .await
441                                    {
442                                        log::error!(
443                                            target: &log_target_clone,
444                                            "failed to record slot status: {}",
445                                            err
446                                        );
447                                    }
448                                });
449                            }
450                            BlockData::PossibleLeaderSkipped { slot } => {
451                                // Drop any tallies that may exist for skipped slots.
452                                take_slot_tx_tally(*slot);
453                            }
454                        }
455                    }
456                    Ok(())
457                }
458                .boxed()
459            }
460        };
461
462        let on_transaction = {
463            let plugin_handles = plugin_handles.clone();
464            let clickhouse = clickhouse.clone();
465            let shutting_down = shutting_down.clone();
466            move |thread_id: usize, transaction: TransactionData| {
467                let plugin_handles = plugin_handles.clone();
468                let clickhouse = clickhouse.clone();
469                let shutting_down = shutting_down.clone();
470                async move {
471                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
472                    record_slot_vote_tally(transaction.slot, transaction.is_vote);
473                    if plugin_handles.is_empty() {
474                        return Ok(());
475                    }
476                    if shutting_down.load(Ordering::SeqCst) {
477                        log::debug!(
478                            target: &log_target,
479                            "ignoring transaction while shutdown is in progress"
480                        );
481                        return Ok(());
482                    }
483                    for handle in plugin_handles.iter() {
484                        if let Err(err) = handle
485                            .plugin
486                            .on_transaction(thread_id, clickhouse.clone(), &transaction)
487                            .await
488                        {
489                            log::error!(
490                                target: &log_target,
491                                "plugin {} on_transaction error: {}",
492                                handle.name,
493                                err
494                            );
495                        }
496                    }
497                    Ok(())
498                }
499                .boxed()
500            }
501        };
502
503        let on_entry = {
504            let plugin_handles = plugin_handles.clone();
505            let clickhouse = clickhouse.clone();
506            let shutting_down = shutting_down.clone();
507            move |thread_id: usize, entry: EntryData| {
508                let plugin_handles = plugin_handles.clone();
509                let clickhouse = clickhouse.clone();
510                let shutting_down = shutting_down.clone();
511                async move {
512                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
513                    if plugin_handles.is_empty() {
514                        return Ok(());
515                    }
516                    if shutting_down.load(Ordering::SeqCst) {
517                        log::debug!(
518                            target: &log_target,
519                            "ignoring entry while shutdown is in progress"
520                        );
521                        return Ok(());
522                    }
523                    let entry = Arc::new(entry);
524                    for handle in plugin_handles.iter() {
525                        if let Err(err) = handle
526                            .plugin
527                            .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
528                            .await
529                        {
530                            log::error!(
531                                target: &log_target,
532                                "plugin {} on_entry error: {}",
533                                handle.name,
534                                err
535                            );
536                        }
537                    }
538                    Ok(())
539                }
540                .boxed()
541            }
542        };
543
544        let on_reward = {
545            let plugin_handles = plugin_handles.clone();
546            let clickhouse = clickhouse.clone();
547            let shutting_down = shutting_down.clone();
548            move |thread_id: usize, reward: RewardsData| {
549                let plugin_handles = plugin_handles.clone();
550                let clickhouse = clickhouse.clone();
551                let shutting_down = shutting_down.clone();
552                async move {
553                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
554                    if plugin_handles.is_empty() {
555                        return Ok(());
556                    }
557                    if shutting_down.load(Ordering::SeqCst) {
558                        log::debug!(
559                            target: &log_target,
560                            "ignoring reward while shutdown is in progress"
561                        );
562                        return Ok(());
563                    }
564                    let reward = Arc::new(reward);
565                    for handle in plugin_handles.iter() {
566                        if let Err(err) = handle
567                            .plugin
568                            .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
569                            .await
570                        {
571                            log::error!(
572                                target: &log_target,
573                                "plugin {} on_reward error: {}",
574                                handle.name,
575                                err
576                            );
577                        }
578                    }
579                    Ok(())
580                }
581                .boxed()
582            }
583        };
584
585        let on_error = {
586            let plugin_handles = plugin_handles.clone();
587            let clickhouse = clickhouse.clone();
588            let shutting_down = shutting_down.clone();
589            move |thread_id: usize, context: FirehoseErrorContext| {
590                let plugin_handles = plugin_handles.clone();
591                let clickhouse = clickhouse.clone();
592                let shutting_down = shutting_down.clone();
593                async move {
594                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
595                    if plugin_handles.is_empty() {
596                        return Ok(());
597                    }
598                    if shutting_down.load(Ordering::SeqCst) {
599                        log::debug!(
600                            target: &log_target,
601                            "ignoring error callback while shutdown is in progress"
602                        );
603                        return Ok(());
604                    }
605                    let context = Arc::new(context);
606                    for handle in plugin_handles.iter() {
607                        if let Err(err) = handle
608                            .plugin
609                            .on_error(thread_id, clickhouse.clone(), context.as_ref())
610                            .await
611                        {
612                            log::error!(
613                                target: &log_target,
614                                "plugin {} on_error error: {}",
615                                handle.name,
616                                err
617                            );
618                        }
619                    }
620                    Ok(())
621                }
622                .boxed()
623            }
624        };
625
626        let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
627
628        let total_slot_count_capture = total_slot_count;
629        let run_origin = std::time::Instant::now();
630        // Reset global rate snapshot for a new run.
631        SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
632        LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
633        LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
634        LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
635        let stats_tracking = clickhouse.clone().map(|_db| {
636            let shutting_down = shutting_down.clone();
637            let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
638            StatsTracking {
639        on_stats: {
640            let thread_progress_max = thread_progress_max.clone();
641            let total_slot_count = total_slot_count_capture;
642            move |thread_id: usize, stats: Stats| {
643                let shutting_down = shutting_down.clone();
644                let thread_progress_max = thread_progress_max.clone();
645                async move {
646                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
647                    if shutting_down.load(Ordering::SeqCst) {
648                                log::debug!(
649                                    target: &log_target,
650                                    "skipping stats write during shutdown"
651                                );
652                                return Ok(());
653                            }
654                            let finish_at = stats
655                                .finish_time
656                                .unwrap_or_else(std::time::Instant::now);
657                            let elapsed_since_start = finish_at
658                                .saturating_duration_since(stats.start_time)
659                                .as_nanos()
660                                .max(1) as u64;
661                            let total_slots = stats.slots_processed;
662                            let total_txs = stats.transactions_processed;
663                            let now_ns = monotonic_nanos_since(run_origin);
664                            // Serialize snapshot updates so every pulse measures deltas from the
665                            // previous pulse (regardless of which thread emitted it) using a
666                            // monotonic clock shared across threads.
667                            let (delta_slots, delta_txs, delta_time_ns) = {
668                                while SNAPSHOT_LOCK
669                                    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
670                                    .is_err()
671                                {
672                                    hint::spin_loop();
673                                }
674                                let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
675                                let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
676                                let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
677                                LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
678                                LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
679                                LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
680                                SNAPSHOT_LOCK.store(false, Ordering::Release);
681                                let delta_slots = total_slots.saturating_sub(prev_slots);
682                                let delta_txs = total_txs.saturating_sub(prev_txs);
683                                let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
684                                (delta_slots, delta_txs, delta_time_ns)
685                            };
686                            let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
687                            let mut slot_rate = delta_slots as f64 / delta_secs;
688                            let mut tps = delta_txs as f64 / delta_secs;
689                            if slot_rate <= 0.0 && total_slots > 0 {
690                                slot_rate =
691                                    total_slots as f64 / (elapsed_since_start as f64 / 1e9);
692                            }
693                            if tps <= 0.0 && total_txs > 0 {
694                                tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
695                            }
696                            let thread_stats = &stats.thread_stats;
697                            let processed_slots = stats.slots_processed.min(total_slot_count);
698                            let progress_fraction = if total_slot_count > 0 {
699                                processed_slots as f64 / total_slot_count as f64
700                            } else {
701                                1.0
702                            };
703                            let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
704                            let thread_total_slots = thread_stats
705                                .initial_slot_range
706                                .end
707                                .saturating_sub(thread_stats.initial_slot_range.start);
708                            let thread_progress_raw = if thread_total_slots > 0 {
709                                (thread_stats.slots_processed as f64 / thread_total_slots as f64)
710                                    .clamp(0.0, 1.0)
711                                    * 100.0
712                            } else {
713                                100.0
714                            };
715                            let thread_progress = *thread_progress_max
716                                .entry(thread_id)
717                                .and_modify(|max| {
718                                    if thread_progress_raw > *max {
719                                        *max = thread_progress_raw;
720                                    }
721                                })
722                                .or_insert(thread_progress_raw);
723                            let mut overall_eta = None;
724                            if slot_rate > 0.0 {
725                                let remaining_slots =
726                                    total_slot_count.saturating_sub(processed_slots);
727                                overall_eta = Some(human_readable_duration(
728                                    remaining_slots as f64 / slot_rate,
729                                ));
730                            }
731                            if overall_eta.is_none() {
732                                if progress_fraction > 0.0 && progress_fraction < 1.0 {
733                                    if let Some(elapsed_total) = finish_at
734                                        .checked_duration_since(stats.start_time)
735                                        .map(|d| d.as_secs_f64())
736                                        && elapsed_total > 0.0 {
737                                            let remaining_secs =
738                                                elapsed_total * (1.0 / progress_fraction - 1.0);
739                                            overall_eta = Some(human_readable_duration(remaining_secs));
740                                        }
741                                } else if progress_fraction >= 1.0 {
742                                    overall_eta = Some("0s".into());
743                                }
744                            }
745                            let slots_display = human_readable_count(processed_slots);
746                            let blocks_display = human_readable_count(stats.blocks_processed);
747                            let txs_display = human_readable_count(stats.transactions_processed);
748                            let tps_display = human_readable_count(tps.ceil() as u64);
749                            log::info!(
750                                target: &log_target,
751                                "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
752                                overall_eta.unwrap_or_else(|| "n/a".into()),
753                            );
754                            Ok(())
755                        }
756                        .boxed()
757                    }
758                },
759                tracking_interval_slots: 100,
760            }
761        });
762
763        let (shutdown_tx, _) = broadcast::channel::<()>(1);
764
765        let mut firehose_future = Box::pin(firehose(
766            self.num_threads as u64,
767            self.sequential,
768            self.buffer_window_bytes,
769            slot_range,
770            Some(on_block),
771            Some(on_transaction),
772            Some(on_entry),
773            Some(on_reward),
774            Some(on_error),
775            stats_tracking,
776            Some(shutdown_tx.subscribe()),
777        ));
778
779        let firehose_result = tokio::select! {
780            res = &mut firehose_future => res,
781            ctrl = signal::ctrl_c() => {
782                match ctrl {
783                    Ok(()) => log::info!(
784                        target: LOG_MODULE,
785                        "CTRL+C received; initiating shutdown"
786                    ),
787                    Err(err) => log::error!(
788                        target: LOG_MODULE,
789                        "failed to listen for CTRL+C: {}",
790                        err
791                    ),
792                }
793                shutting_down.store(true, Ordering::SeqCst);
794                let _ = shutdown_tx.send(());
795                firehose_future.await
796            }
797        };
798
799        if clickhouse_enabled
800            && let Some(db_client) = clickhouse.clone()
801            && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
802        {
803            log::error!(
804                target: LOG_MODULE,
805                "failed to flush buffered plugin slots: {}",
806                err
807            );
808        }
809
810        for handle in plugin_handles.iter() {
811            if let Err(error) = handle
812                .plugin
813                .on_exit(clickhouse.clone())
814                .await
815                .map_err(|e| e.to_string())
816            {
817                log::error!(
818                    target: LOG_MODULE,
819                    "plugin {} on_exit error: {}",
820                    handle.name,
821                    error
822                );
823            }
824        }
825
826        match firehose_result {
827            Ok(()) => Ok(()),
828            Err((error, slot)) => Err(PluginRunnerError::Firehose {
829                details: error.to_string(),
830                slot,
831            }),
832        }
833    }
834}
835
836fn build_clickhouse_client(dsn: &str) -> Client {
837    let mut client = Client::default();
838    if let Ok(mut url) = Url::parse(dsn) {
839        let username = url.username().to_string();
840        let password = url.password().map(|value| value.to_string());
841        if !username.is_empty() || password.is_some() {
842            let _ = url.set_username("");
843            let _ = url.set_password(None);
844        }
845        client = client.with_url(url.as_str());
846        if !username.is_empty() {
847            client = client.with_user(username);
848        }
849        if let Some(password) = password {
850            client = client.with_password(password);
851        }
852    } else {
853        client = client.with_url(dsn);
854    }
855    client
856}
857
858/// Errors that can arise while running plugins against the firehose.
859#[derive(Debug, Error)]
860pub enum PluginRunnerError {
861    /// ClickHouse client returned an error.
862    #[error("clickhouse error: {0}")]
863    Clickhouse(#[from] clickhouse::error::Error),
864    /// Firehose streaming failed at the specified slot.
865    #[error("firehose error at slot {slot}: {details}")]
866    Firehose {
867        /// Human-readable description of the firehose failure.
868        details: String,
869        /// Slot where the firehose encountered the error.
870        slot: u64,
871    },
872    /// Lifecycle hook on a plugin returned an error.
873    #[error("plugin {plugin} failed during {stage}: {details}")]
874    PluginLifecycle {
875        /// Name of the plugin that failed.
876        plugin: &'static str,
877        /// Lifecycle stage where the failure occurred.
878        stage: &'static str,
879        /// Textual error details.
880        details: String,
881    },
882}
883
884#[derive(Clone)]
885struct PluginHandle {
886    plugin: Arc<dyn Plugin>,
887    id: u16,
888    name: &'static str,
889    version: u16,
890}
891
892impl From<Arc<dyn Plugin>> for PluginHandle {
893    fn from(plugin: Arc<dyn Plugin>) -> Self {
894        let id = plugin.id();
895        let name = plugin.name();
896        let version = plugin.version();
897        Self {
898            plugin,
899            id,
900            name,
901            version,
902        }
903    }
904}
905
906#[derive(Row, Serialize)]
907struct PluginRow<'a> {
908    id: u32,
909    name: &'a str,
910    version: u32,
911}
912
913#[derive(Row, Serialize)]
914struct PluginSlotRow {
915    plugin_id: u32,
916    slot: u64,
917}
918
919#[derive(Row, Serialize)]
920struct SlotStatusRow {
921    slot: u64,
922    transaction_count: u32,
923    vote_transaction_count: u32,
924    non_vote_transaction_count: u32,
925    thread_id: u8,
926    block_time: u32,
927}
928
929#[derive(Default, Clone, Copy)]
930struct SlotTxTally {
931    votes: u64,
932    non_votes: u64,
933}
934
935static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally>> = Lazy::new(DashMap::new);
936
937async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
938    db.query(
939        r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
940            slot UInt64,
941            transaction_count UInt32 DEFAULT 0,
942            vote_transaction_count UInt32 DEFAULT 0,
943            non_vote_transaction_count UInt32 DEFAULT 0,
944            thread_id UInt8 DEFAULT 0,
945            block_time DateTime('UTC') DEFAULT toDateTime(0),
946            indexed_at DateTime('UTC') DEFAULT now()
947        ) ENGINE = ReplacingMergeTree(indexed_at)
948        ORDER BY slot"#,
949    )
950    .execute()
951    .await?;
952
953    db.query(
954        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
955            id UInt32,
956            name String,
957            version UInt32
958        ) ENGINE = ReplacingMergeTree
959        ORDER BY id"#,
960    )
961    .execute()
962    .await?;
963
964    db.query(
965        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
966            plugin_id UInt32,
967            slot UInt64,
968            indexed_at DateTime('UTC') DEFAULT now()
969        ) ENGINE = ReplacingMergeTree
970        ORDER BY (plugin_id, slot)"#,
971    )
972    .execute()
973    .await?;
974
975    Ok(())
976}
977
978async fn upsert_plugins(
979    db: &Client,
980    plugins: &[PluginHandle],
981) -> Result<(), clickhouse::error::Error> {
982    if plugins.is_empty() {
983        return Ok(());
984    }
985    let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
986    for handle in plugins {
987        insert
988            .write(&PluginRow {
989                id: handle.id as u32,
990                name: handle.name,
991                version: handle.version as u32,
992            })
993            .await?;
994    }
995    insert.end().await?;
996    Ok(())
997}
998
999async fn record_plugin_slot(
1000    db: Arc<Client>,
1001    plugin_id: u16,
1002    slot: u64,
1003) -> Result<(), clickhouse::error::Error> {
1004    let mut insert = db
1005        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1006        .await?;
1007    insert
1008        .write(&PluginSlotRow {
1009            plugin_id: plugin_id as u32,
1010            slot,
1011        })
1012        .await?;
1013    insert.end().await?;
1014    Ok(())
1015}
1016
1017async fn flush_slot_buffer(
1018    db: Arc<Client>,
1019    buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
1020) -> Result<(), clickhouse::error::Error> {
1021    let mut rows = Vec::new();
1022    buffer.iter_mut().for_each(|mut entry| {
1023        if !entry.value().is_empty() {
1024            rows.append(entry.value_mut());
1025        }
1026    });
1027
1028    if rows.is_empty() {
1029        return Ok(());
1030    }
1031
1032    let mut insert = db
1033        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1034        .await?;
1035    for row in rows {
1036        insert.write(&row).await?;
1037    }
1038    insert.end().await?;
1039    Ok(())
1040}
1041
1042async fn record_slot_status(
1043    db: Arc<Client>,
1044    slot: u64,
1045    thread_id: usize,
1046    transaction_count: u64,
1047    vote_transaction_count: u64,
1048    non_vote_transaction_count: u64,
1049    block_time: Option<i64>,
1050) -> Result<(), clickhouse::error::Error> {
1051    let mut insert = db
1052        .insert::<SlotStatusRow>("jetstreamer_slot_status")
1053        .await?;
1054    insert
1055        .write(&SlotStatusRow {
1056            slot,
1057            transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1058            vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1059            non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1060            thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1061            block_time: clamp_block_time(block_time),
1062        })
1063        .await?;
1064    insert.end().await?;
1065    Ok(())
1066}
1067
/// Converts an optional Unix timestamp into the `u32` stored in the `block_time` column.
///
/// `None` and non-positive timestamps map to `0`; values above `u32::MAX` saturate.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |ts| {
        // After clamping, `ts` is within `0..=u32::MAX`, so the cast cannot truncate.
        ts.clamp(0, i64::from(u32::MAX)) as u32
    })
}
1076
1077fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1078    let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1079    if is_vote {
1080        entry.votes = entry.votes.saturating_add(1);
1081    } else {
1082        entry.non_votes = entry.non_votes.saturating_add(1);
1083    }
1084}
1085
1086fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1087    SLOT_TX_TALLY
1088        .remove(&slot)
1089        .map(|(_, tally)| tally)
1090        .unwrap_or_default()
1091}
1092
1093// Ensure PluginRunnerError is Send + Sync + 'static
1094trait _CanSend: Send + Sync + 'static {}
1095impl _CanSend for PluginRunnerError {}
1096
/// Renders an unsigned integer in decimal with `,` separating each group of three digits
/// (for example `1234567` becomes `"1,234,567"`).
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let number: u128 = value.into();
    let text = number.to_string();
    // Walk the digits right-to-left so separators land every three places from the end.
    let mut reversed: Vec<char> = Vec::with_capacity(text.len() + text.len() / 3);
    for (position, digit) in text.chars().rev().enumerate() {
        if position > 0 && position % 3 == 0 {
            reversed.push(',');
        }
        reversed.push(digit);
    }
    reversed.into_iter().rev().collect()
}
1110
/// Formats a duration given in seconds as a compact, human-readable string.
///
/// - Non-finite input (NaN or infinity) yields `"n/a"`.
/// - Zero or negative input yields `"0s"`.
/// - Values that round to under a minute use one decimal place, e.g. `"12.3s"`.
/// - Longer values are rounded to whole seconds and shown with their two most significant
///   units: `"{m}m{s}s"`, `"{h}h{m}m"`, `"{d}d{h}h"`, or just `"{d}d"` when the hour part is zero.
fn human_readable_duration(seconds: f64) -> String {
    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    // `{:.1}` rounds to the nearest tenth, so inputs in [59.95, 60) would print as "60.0s".
    // Route them to the minute formatting instead, where they round to "1m0s".
    if seconds < 59.95 {
        return format!("{:.1}s", seconds);
    }
    // `as u64` saturates for very large finite inputs rather than wrapping.
    let duration = Duration::from_secs(seconds.round() as u64);
    let secs = duration.as_secs();
    let days = secs / 86_400;
    let hours = (secs % 86_400) / 3_600;
    let minutes = (secs % 3_600) / 60;
    let seconds_rem = secs % 60;
    if days > 0 {
        if hours > 0 {
            format!("{}d{}h", days, hours)
        } else {
            format!("{}d", days)
        }
    } else if hours > 0 {
        format!("{}h{}m", hours, minutes)
    } else {
        format!("{}m{}s", minutes, seconds_rem)
    }
}