Skip to main content

jetstreamer_plugin/
lib.rs

1#![deny(missing_docs)]
2//! Trait-based framework for building structured observers on top of Jetstreamer's firehose.
3//!
4//! # Overview
//! Plugins let you react to every block, transaction, reward, and entry emitted by
//! [`jetstreamer_firehose`](https://crates.io/crates/jetstreamer-firehose), as well as to
//! firehose errors and to runner start-up and shutdown. Combined with the
//! [`JetstreamerRunner`](https://docs.rs/jetstreamer/latest/jetstreamer/struct.JetstreamerRunner.html),
//! they provide a high-throughput analytics pipeline capable of exceeding 2.7 million
//! transactions per second on the right hardware. All events originate from Old Faithful's CAR
//! archive and are streamed over the network into your local runner.
12//!
13//! The framework offers:
14//! - A [`Plugin`] trait with async hook points for each data type.
15//! - [`PluginRunner`] for coordinating multiple plugins with shared ClickHouse connections
16//!   (used internally by `JetstreamerRunner`).
17//! - Built-in plugins under [`plugins`] that demonstrate common batching strategies and
18//!   metrics.
19//! - See `JetstreamerRunner` in the `jetstreamer` crate for the easiest way to run plugins.
20//!
21//! # ClickHouse Integration
22//! Jetstreamer plugins are typically paired with ClickHouse for persistence. Runner instances
23//! honor the following environment variables:
24//! - `JETSTREAMER_CLICKHOUSE_DSN` (default `http://localhost:8123`): HTTP(S) DSN handed to
25//!   every plugin that requests a database handle.
26//! - `JETSTREAMER_CLICKHOUSE_MODE` (default `auto`): toggles the bundled ClickHouse helper.
27//!   Set to `remote` to opt out of spawning the helper while still writing to a cluster,
28//!   `local` to always spawn, or `off` to disable ClickHouse entirely.
29//!
30//! When the mode is `auto`, Jetstreamer inspects the DSN at runtime and only launches the
31//! embedded helper for local endpoints, enabling native clustering workflows out of the box.
32//!
33//! # Batching ClickHouse Writes
34//! ClickHouse (and any sinks you invoke inside hook handlers) can apply backpressure on large
35//! numbers of tiny inserts. Plugins should buffer work locally and flush in batches on a
36//! cadence that matches their workload. The default [`PluginRunner`] configuration triggers
37//! stats pulses every 100 slots, which offers a reasonable heartbeat without thrashing the
38//! database. The bundled [`plugins::program_tracking::ProgramTrackingPlugin`] mirrors this
39//! approach by accumulating `ProgramEvent` rows per worker thread and issuing a single batch
40//! insert every 1,000 slots. Adopting a similar strategy keeps long-running replays responsive
41//! even under peak throughput.
42//!
43//! # Ordering Guarantees
//! Jetstreamer spawns parallel threads that each process a different subrange of the overall
//! slot range at the same time. Each thread sees a purely sequential view of its own
//! transactions, but downstream services that consume this data (such as databases) will
//! observe writes from different threads interleaved in a fairly arbitrary order. Design your
//! database tables and shared data structures accordingly — for example, key rows by slot and
//! signature rather than relying on insertion order.
49//!
50//! # Examples
51//! ## Defining a Plugin
52//! ```no_run
53//! use std::sync::Arc;
54//! use clickhouse::Client;
55//! use futures_util::FutureExt;
56//! use jetstreamer_firehose::firehose::TransactionData;
57//! use jetstreamer_plugin::{Plugin, PluginFuture};
58//!
59//! struct CountingPlugin;
60//!
61//! impl Plugin for CountingPlugin {
62//!     fn name(&self) -> &'static str { "counting" }
63//!
64//!     fn on_transaction<'a>(
65//!         &'a self,
66//!         _thread_id: usize,
67//!         _db: Option<Arc<Client>>,
68//!         transaction: &'a TransactionData,
69//!     ) -> PluginFuture<'a> {
70//!         async move {
71//!             println!("saw tx {} in slot {}", transaction.signature, transaction.slot);
72//!             Ok(())
73//!         }
74//!         .boxed()
75//!     }
76//! }
77//! # let _plugin = CountingPlugin;
78//! ```
79//!
80//! ## Running Plugins with `PluginRunner`
81//! ```no_run
82//! use std::sync::Arc;
83//! use jetstreamer_firehose::epochs;
84//! use jetstreamer_plugin::{Plugin, PluginRunner};
85//!
86//! struct LoggingPlugin;
87//!
88//! impl Plugin for LoggingPlugin {
89//!     fn name(&self) -> &'static str { "logging" }
90//! }
91//!
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let mut runner = PluginRunner::new("http://localhost:8123", 1, false, false, None);
95//!     runner.register(Box::new(LoggingPlugin));
96//!     let runner = Arc::new(runner);
97//!
98//!     let (start, _) = epochs::epoch_to_slot_range(800);
99//!     let (_, end_inclusive) = epochs::epoch_to_slot_range(805);
100//!     runner
101//!         .clone()
102//!         .run(start..(end_inclusive + 1), false)
103//!         .await?;
104//!     Ok(())
105//! }
106//! ```
107
108/// Built-in plugin implementations that ship with Jetstreamer.
109pub mod plugins;
110
/// Base `log` target for runner messages; per-thread targets append `::T{thread_id:03}`.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114    fmt::Display,
115    future::Future,
116    hint,
117    ops::Range,
118    pin::Pin,
119    sync::{
120        Arc,
121        atomic::{AtomicBool, AtomicU64, Ordering},
122    },
123    time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130    BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
139/// Re-exported statistics types produced by [`firehose`].
140pub use jetstreamer_firehose::firehose::{
141    FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
// Global totals snapshot used to compute overall TPS/ETA between pulses.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
// Spin-lock flag: holders read and rewrite the three snapshot values above as one unit so
// concurrent stats pulses never observe a half-updated snapshot.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);

/// Returns the nanoseconds elapsed since `origin` on the monotonic clock.
///
/// `Duration::as_nanos` yields a `u128`; the value is saturated at `u64::MAX` rather than
/// truncated with `as`, so an (absurdly) long-running process can never wrap to a small value
/// and corrupt the rate deltas computed from it.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    u64::try_from(origin.elapsed().as_nanos()).unwrap_or(u64::MAX)
}
153
/// Boxed, `Send` future returned by every [`Plugin`] hook.
///
/// It resolves to `Ok(())` on success, or to a boxed, thread-safe error on failure.
pub type PluginFuture<'a> = Pin<
    Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
>;
162
163/// Trait implemented by plugins that consume firehose events.
164///
165/// See the crate-level documentation for usage examples.
166pub trait Plugin: Send + Sync + 'static {
167    /// Human-friendly plugin name used in logs and persisted metadata.
168    fn name(&self) -> &'static str;
169
170    /// Semantic version for the plugin; defaults to `1`.
171    fn version(&self) -> u16 {
172        1
173    }
174
175    /// Deterministic identifier derived from [`Plugin::name`].
176    fn id(&self) -> u16 {
177        let hash = Sha256::digest(self.name());
178        let mut res = 1u16;
179        for byte in hash {
180            res = res.wrapping_mul(31).wrapping_add(byte as u16);
181        }
182        res
183    }
184
185    /// Called for every transaction seen by the firehose.
186    fn on_transaction<'a>(
187        &'a self,
188        _thread_id: usize,
189        _db: Option<Arc<Client>>,
190        _transaction: &'a TransactionData,
191    ) -> PluginFuture<'a> {
192        async move { Ok(()) }.boxed()
193    }
194
195    /// Called for every block observed by the firehose.
196    fn on_block<'a>(
197        &'a self,
198        _thread_id: usize,
199        _db: Option<Arc<Client>>,
200        _block: &'a BlockData,
201    ) -> PluginFuture<'a> {
202        async move { Ok(()) }.boxed()
203    }
204
205    /// Called for every entry observed by the firehose when entry notifications are enabled.
206    fn on_entry<'a>(
207        &'a self,
208        _thread_id: usize,
209        _db: Option<Arc<Client>>,
210        _entry: &'a EntryData,
211    ) -> PluginFuture<'a> {
212        async move { Ok(()) }.boxed()
213    }
214
215    /// Called for reward updates associated with processed blocks.
216    fn on_reward<'a>(
217        &'a self,
218        _thread_id: usize,
219        _db: Option<Arc<Client>>,
220        _reward: &'a RewardsData,
221    ) -> PluginFuture<'a> {
222        async move { Ok(()) }.boxed()
223    }
224
225    /// Called whenever a firehose thread encounters an error before restarting.
226    fn on_error<'a>(
227        &'a self,
228        _thread_id: usize,
229        _db: Option<Arc<Client>>,
230        _error: &'a FirehoseErrorContext,
231    ) -> PluginFuture<'a> {
232        async move { Ok(()) }.boxed()
233    }
234
235    /// Invoked once before the firehose starts streaming events.
236    fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237        async move { Ok(()) }.boxed()
238    }
239
240    /// Invoked once after the firehose finishes or shuts down.
241    fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242        async move { Ok(()) }.boxed()
243    }
244}
245
246/// Coordinates plugin execution and ClickHouse persistence.
247///
248/// See the crate-level documentation for usage examples.
249#[derive(Clone)]
250pub struct PluginRunner {
251    plugins: Arc<Vec<Arc<dyn Plugin>>>,
252    clickhouse_dsn: String,
253    num_threads: usize,
254    sequential: bool,
255    reverse: bool,
256    buffer_window_bytes: Option<u64>,
257    db_update_interval_slots: u64,
258}
259
260impl PluginRunner {
261    /// Creates a new runner that writes to `clickhouse_dsn` using `num_threads`.
262    ///
263    /// When `sequential` is `true`, firehose runs with one worker and `num_threads` is used as
264    /// ripget parallel download concurrency. When `reverse` is `true`, epochs in the slot range
265    /// are streamed from highest to lowest; this implies sequential mode and activates it
266    /// automatically if not already set.
267    pub fn new(
268        clickhouse_dsn: impl Display,
269        num_threads: usize,
270        sequential: bool,
271        reverse: bool,
272        buffer_window_bytes: Option<u64>,
273    ) -> Self {
274        Self {
275            plugins: Arc::new(Vec::new()),
276            clickhouse_dsn: clickhouse_dsn.to_string(),
277            num_threads: std::cmp::max(1, num_threads),
278            sequential,
279            reverse,
280            buffer_window_bytes,
281            db_update_interval_slots: 100,
282        }
283    }
284
285    /// Registers an additional plugin.
286    pub fn register(&mut self, plugin: Box<dyn Plugin>) {
287        Arc::get_mut(&mut self.plugins)
288            .expect("cannot register plugins after the runner has started")
289            .push(Arc::from(plugin));
290    }
291
292    /// Runs the firehose across the specified slot range, optionally writing to ClickHouse.
293    pub async fn run(
294        self: Arc<Self>,
295        slot_range: Range<u64>,
296        clickhouse_enabled: bool,
297    ) -> Result<(), PluginRunnerError> {
298        let db_update_interval = self.db_update_interval_slots.max(1);
299        let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
300            self.plugins
301                .iter()
302                .cloned()
303                .map(PluginHandle::from)
304                .collect(),
305        );
306
307        let clickhouse = if clickhouse_enabled {
308            let client = Arc::new(
309                build_clickhouse_client(&self.clickhouse_dsn)
310                    .with_option("async_insert", "1")
311                    .with_option("wait_for_async_insert", "0"),
312            );
313            ensure_clickhouse_tables(client.as_ref()).await?;
314            upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
315            Some(client)
316        } else {
317            None
318        };
319
320        for handle in plugin_handles.iter() {
321            if let Err(error) = handle
322                .plugin
323                .on_load(clickhouse.clone())
324                .await
325                .map_err(|e| e.to_string())
326            {
327                return Err(PluginRunnerError::PluginLifecycle {
328                    plugin: handle.name,
329                    stage: "on_load",
330                    details: error,
331                });
332            }
333        }
334
335        let shutting_down = Arc::new(AtomicBool::new(false));
336        let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>> =
337            Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
338        let clickhouse_enabled = clickhouse.is_some();
339        let slots_since_flush = Arc::new(AtomicU64::new(0));
340
341        let on_block = {
342            let plugin_handles = plugin_handles.clone();
343            let clickhouse = clickhouse.clone();
344            let slot_buffer = slot_buffer.clone();
345            let slots_since_flush = slots_since_flush.clone();
346            let shutting_down = shutting_down.clone();
347            move |thread_id: usize, block: BlockData| {
348                let plugin_handles = plugin_handles.clone();
349                let clickhouse = clickhouse.clone();
350                let slot_buffer = slot_buffer.clone();
351                let slots_since_flush = slots_since_flush.clone();
352                let shutting_down = shutting_down.clone();
353                async move {
354                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
355                    if shutting_down.load(Ordering::SeqCst) {
356                        log::debug!(
357                            target: &log_target,
358                            "ignoring block while shutdown is in progress"
359                        );
360                        return Ok(());
361                    }
362                    let block = Arc::new(block);
363                    if !plugin_handles.is_empty() {
364                        for handle in plugin_handles.iter() {
365                            let db = clickhouse.clone();
366                            if let Err(err) = handle
367                                .plugin
368                                .on_block(thread_id, db.clone(), block.as_ref())
369                                .await
370                            {
371                                log::error!(
372                                    target: &log_target,
373                                    "plugin {} on_block error: {}",
374                                    handle.name,
375                                    err
376                                );
377                                continue;
378                            }
379                            if let (Some(db_client), BlockData::Block { slot, .. }) =
380                                (clickhouse.clone(), block.as_ref())
381                            {
382                                if clickhouse_enabled {
383                                    slot_buffer
384                                        .entry(handle.id)
385                                        .or_default()
386                                        .push(PluginSlotRow {
387                                            plugin_id: handle.id as u32,
388                                            slot: *slot,
389                                        });
390                                } else if let Err(err) =
391                                    record_plugin_slot(db_client, handle.id, *slot).await
392                                {
393                                    log::error!(
394                                        target: &log_target,
395                                        "failed to record plugin slot for {}: {}",
396                                        handle.name,
397                                        err
398                                    );
399                                }
400                            }
401                        }
402                        if clickhouse_enabled {
403                            let current = slots_since_flush
404                                .fetch_add(1, Ordering::Relaxed)
405                                .wrapping_add(1);
406                            if current.is_multiple_of(db_update_interval)
407                                && let Some(db_client) = clickhouse.clone()
408                            {
409                                let buffer = slot_buffer.clone();
410                                let log_target_clone = log_target.clone();
411                                tokio::spawn(async move {
412                                    if let Err(err) = flush_slot_buffer(db_client, buffer).await {
413                                        log::error!(
414                                            target: &log_target_clone,
415                                            "failed to flush buffered plugin slots: {}",
416                                            err
417                                        );
418                                    }
419                                });
420                            }
421                        }
422                    }
423                    if let Some(db_client) = clickhouse.clone() {
424                        match block.as_ref() {
425                            BlockData::Block {
426                                slot,
427                                executed_transaction_count,
428                                block_time,
429                                ..
430                            } => {
431                                let tally = take_slot_tx_tally(*slot);
432                                let slot = *slot;
433                                let executed_transaction_count = *executed_transaction_count;
434                                let block_time = *block_time;
435                                let log_target_clone = log_target.clone();
436                                tokio::spawn(async move {
437                                    if let Err(err) = record_slot_status(
438                                        db_client,
439                                        slot,
440                                        thread_id,
441                                        executed_transaction_count,
442                                        tally.votes,
443                                        tally.non_votes,
444                                        block_time,
445                                    )
446                                    .await
447                                    {
448                                        log::error!(
449                                            target: &log_target_clone,
450                                            "failed to record slot status: {}",
451                                            err
452                                        );
453                                    }
454                                });
455                            }
456                            BlockData::PossibleLeaderSkipped { slot } => {
457                                // Drop any tallies that may exist for skipped slots.
458                                take_slot_tx_tally(*slot);
459                            }
460                        }
461                    }
462                    Ok(())
463                }
464                .boxed()
465            }
466        };
467
468        let on_transaction = {
469            let plugin_handles = plugin_handles.clone();
470            let clickhouse = clickhouse.clone();
471            let shutting_down = shutting_down.clone();
472            move |thread_id: usize, transaction: TransactionData| {
473                let plugin_handles = plugin_handles.clone();
474                let clickhouse = clickhouse.clone();
475                let shutting_down = shutting_down.clone();
476                async move {
477                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
478                    record_slot_vote_tally(transaction.slot, transaction.is_vote);
479                    if plugin_handles.is_empty() {
480                        return Ok(());
481                    }
482                    if shutting_down.load(Ordering::SeqCst) {
483                        log::debug!(
484                            target: &log_target,
485                            "ignoring transaction while shutdown is in progress"
486                        );
487                        return Ok(());
488                    }
489                    for handle in plugin_handles.iter() {
490                        if let Err(err) = handle
491                            .plugin
492                            .on_transaction(thread_id, clickhouse.clone(), &transaction)
493                            .await
494                        {
495                            log::error!(
496                                target: &log_target,
497                                "plugin {} on_transaction error: {}",
498                                handle.name,
499                                err
500                            );
501                        }
502                    }
503                    Ok(())
504                }
505                .boxed()
506            }
507        };
508
509        let on_entry = {
510            let plugin_handles = plugin_handles.clone();
511            let clickhouse = clickhouse.clone();
512            let shutting_down = shutting_down.clone();
513            move |thread_id: usize, entry: EntryData| {
514                let plugin_handles = plugin_handles.clone();
515                let clickhouse = clickhouse.clone();
516                let shutting_down = shutting_down.clone();
517                async move {
518                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
519                    if plugin_handles.is_empty() {
520                        return Ok(());
521                    }
522                    if shutting_down.load(Ordering::SeqCst) {
523                        log::debug!(
524                            target: &log_target,
525                            "ignoring entry while shutdown is in progress"
526                        );
527                        return Ok(());
528                    }
529                    let entry = Arc::new(entry);
530                    for handle in plugin_handles.iter() {
531                        if let Err(err) = handle
532                            .plugin
533                            .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
534                            .await
535                        {
536                            log::error!(
537                                target: &log_target,
538                                "plugin {} on_entry error: {}",
539                                handle.name,
540                                err
541                            );
542                        }
543                    }
544                    Ok(())
545                }
546                .boxed()
547            }
548        };
549
550        let on_reward = {
551            let plugin_handles = plugin_handles.clone();
552            let clickhouse = clickhouse.clone();
553            let shutting_down = shutting_down.clone();
554            move |thread_id: usize, reward: RewardsData| {
555                let plugin_handles = plugin_handles.clone();
556                let clickhouse = clickhouse.clone();
557                let shutting_down = shutting_down.clone();
558                async move {
559                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
560                    if plugin_handles.is_empty() {
561                        return Ok(());
562                    }
563                    if shutting_down.load(Ordering::SeqCst) {
564                        log::debug!(
565                            target: &log_target,
566                            "ignoring reward while shutdown is in progress"
567                        );
568                        return Ok(());
569                    }
570                    let reward = Arc::new(reward);
571                    for handle in plugin_handles.iter() {
572                        if let Err(err) = handle
573                            .plugin
574                            .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
575                            .await
576                        {
577                            log::error!(
578                                target: &log_target,
579                                "plugin {} on_reward error: {}",
580                                handle.name,
581                                err
582                            );
583                        }
584                    }
585                    Ok(())
586                }
587                .boxed()
588            }
589        };
590
591        let on_error = {
592            let plugin_handles = plugin_handles.clone();
593            let clickhouse = clickhouse.clone();
594            let shutting_down = shutting_down.clone();
595            move |thread_id: usize, context: FirehoseErrorContext| {
596                let plugin_handles = plugin_handles.clone();
597                let clickhouse = clickhouse.clone();
598                let shutting_down = shutting_down.clone();
599                async move {
600                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
601                    if plugin_handles.is_empty() {
602                        return Ok(());
603                    }
604                    if shutting_down.load(Ordering::SeqCst) {
605                        log::debug!(
606                            target: &log_target,
607                            "ignoring error callback while shutdown is in progress"
608                        );
609                        return Ok(());
610                    }
611                    let context = Arc::new(context);
612                    for handle in plugin_handles.iter() {
613                        if let Err(err) = handle
614                            .plugin
615                            .on_error(thread_id, clickhouse.clone(), context.as_ref())
616                            .await
617                        {
618                            log::error!(
619                                target: &log_target,
620                                "plugin {} on_error error: {}",
621                                handle.name,
622                                err
623                            );
624                        }
625                    }
626                    Ok(())
627                }
628                .boxed()
629            }
630        };
631
632        let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
633
634        let total_slot_count_capture = total_slot_count;
635        let run_origin = std::time::Instant::now();
636        // Reset global rate snapshot for a new run.
637        SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
638        LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
639        LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
640        LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
641        let stats_tracking = clickhouse.clone().map(|_db| {
642            let shutting_down = shutting_down.clone();
643            let thread_progress_max: Arc<DashMap<usize, f64, ahash::RandomState>> = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
644            StatsTracking {
645        on_stats: {
646            let thread_progress_max = thread_progress_max.clone();
647            let total_slot_count = total_slot_count_capture;
648            move |thread_id: usize, stats: Stats| {
649                let shutting_down = shutting_down.clone();
650                let thread_progress_max = thread_progress_max.clone();
651                async move {
652                    let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
653                    if shutting_down.load(Ordering::SeqCst) {
654                                log::debug!(
655                                    target: &log_target,
656                                    "skipping stats write during shutdown"
657                                );
658                                return Ok(());
659                            }
660                            let finish_at = stats
661                                .finish_time
662                                .unwrap_or_else(std::time::Instant::now);
663                            let elapsed_since_start = finish_at
664                                .saturating_duration_since(stats.start_time)
665                                .as_nanos()
666                                .max(1) as u64;
667                            let total_slots = stats.slots_processed;
668                            let total_txs = stats.transactions_processed;
669                            let now_ns = monotonic_nanos_since(run_origin);
670                            // Serialize snapshot updates so every pulse measures deltas from the
671                            // previous pulse (regardless of which thread emitted it) using a
672                            // monotonic clock shared across threads.
673                            let (delta_slots, delta_txs, delta_time_ns) = {
674                                while SNAPSHOT_LOCK
675                                    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
676                                    .is_err()
677                                {
678                                    hint::spin_loop();
679                                }
680                                let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
681                                let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
682                                let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
683                                LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
684                                LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
685                                LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
686                                SNAPSHOT_LOCK.store(false, Ordering::Release);
687                                let delta_slots = total_slots.saturating_sub(prev_slots);
688                                let delta_txs = total_txs.saturating_sub(prev_txs);
689                                let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
690                                (delta_slots, delta_txs, delta_time_ns)
691                            };
692                            let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
693                            let mut slot_rate = delta_slots as f64 / delta_secs;
694                            let mut tps = delta_txs as f64 / delta_secs;
695                            if slot_rate <= 0.0 && total_slots > 0 {
696                                slot_rate =
697                                    total_slots as f64 / (elapsed_since_start as f64 / 1e9);
698                            }
699                            if tps <= 0.0 && total_txs > 0 {
700                                tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
701                            }
702                            let thread_stats = &stats.thread_stats;
703                            let processed_slots = stats.slots_processed.min(total_slot_count);
704                            let progress_fraction = if total_slot_count > 0 {
705                                processed_slots as f64 / total_slot_count as f64
706                            } else {
707                                1.0
708                            };
709                            let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
710                            let thread_total_slots = thread_stats
711                                .initial_slot_range
712                                .end
713                                .saturating_sub(thread_stats.initial_slot_range.start);
714                            let thread_progress_raw = if thread_total_slots > 0 {
715                                (thread_stats.slots_processed as f64 / thread_total_slots as f64)
716                                    .clamp(0.0, 1.0)
717                                    * 100.0
718                            } else {
719                                100.0
720                            };
721                            let thread_progress = *thread_progress_max
722                                .entry(thread_id)
723                                .and_modify(|max| {
724                                    if thread_progress_raw > *max {
725                                        *max = thread_progress_raw;
726                                    }
727                                })
728                                .or_insert(thread_progress_raw);
729                            let mut overall_eta = None;
730                            if slot_rate > 0.0 {
731                                let remaining_slots =
732                                    total_slot_count.saturating_sub(processed_slots);
733                                overall_eta = Some(human_readable_duration(
734                                    remaining_slots as f64 / slot_rate,
735                                ));
736                            }
737                            if overall_eta.is_none() {
738                                if progress_fraction > 0.0 && progress_fraction < 1.0 {
739                                    if let Some(elapsed_total) = finish_at
740                                        .checked_duration_since(stats.start_time)
741                                        .map(|d| d.as_secs_f64())
742                                        && elapsed_total > 0.0 {
743                                            let remaining_secs =
744                                                elapsed_total * (1.0 / progress_fraction - 1.0);
745                                            overall_eta = Some(human_readable_duration(remaining_secs));
746                                        }
747                                } else if progress_fraction >= 1.0 {
748                                    overall_eta = Some("0s".into());
749                                }
750                            }
751                            let slots_display = human_readable_count(processed_slots);
752                            let blocks_display = human_readable_count(stats.blocks_processed);
753                            let txs_display = human_readable_count(stats.transactions_processed);
754                            let tps_display = human_readable_count(tps.ceil() as u64);
755                            log::info!(
756                                target: &log_target,
757                                "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
758                                overall_eta.unwrap_or_else(|| "n/a".into()),
759                            );
760                            Ok(())
761                        }
762                        .boxed()
763                    }
764                },
765                tracking_interval_slots: 100,
766            }
767        });
768
769        let (shutdown_tx, _) = broadcast::channel::<()>(1);
770
771        let mut firehose_future = Box::pin(firehose(
772            self.num_threads as u64,
773            self.sequential,
774            self.reverse,
775            self.buffer_window_bytes,
776            slot_range,
777            Some(on_block),
778            Some(on_transaction),
779            Some(on_entry),
780            Some(on_reward),
781            Some(on_error),
782            stats_tracking,
783            Some(shutdown_tx.subscribe()),
784        ));
785
786        let firehose_result = tokio::select! {
787            res = &mut firehose_future => res,
788            ctrl = signal::ctrl_c() => {
789                match ctrl {
790                    Ok(()) => log::info!(
791                        target: LOG_MODULE,
792                        "CTRL+C received; initiating shutdown"
793                    ),
794                    Err(err) => log::error!(
795                        target: LOG_MODULE,
796                        "failed to listen for CTRL+C: {}",
797                        err
798                    ),
799                }
800                shutting_down.store(true, Ordering::SeqCst);
801                let _ = shutdown_tx.send(());
802                firehose_future.await
803            }
804        };
805
806        if clickhouse_enabled
807            && let Some(db_client) = clickhouse.clone()
808            && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
809        {
810            log::error!(
811                target: LOG_MODULE,
812                "failed to flush buffered plugin slots: {}",
813                err
814            );
815        }
816
817        for handle in plugin_handles.iter() {
818            if let Err(error) = handle
819                .plugin
820                .on_exit(clickhouse.clone())
821                .await
822                .map_err(|e| e.to_string())
823            {
824                log::error!(
825                    target: LOG_MODULE,
826                    "plugin {} on_exit error: {}",
827                    handle.name,
828                    error
829                );
830            }
831        }
832
833        match firehose_result {
834            Ok(()) => Ok(()),
835            Err((error, slot)) => Err(PluginRunnerError::Firehose {
836                details: error.to_string(),
837                slot,
838            }),
839        }
840    }
841}
842
843fn build_clickhouse_client(dsn: &str) -> Client {
844    let mut client = Client::default();
845    if let Ok(mut url) = Url::parse(dsn) {
846        let username = url.username().to_string();
847        let password = url.password().map(|value| value.to_string());
848        if !username.is_empty() || password.is_some() {
849            let _ = url.set_username("");
850            let _ = url.set_password(None);
851        }
852        client = client.with_url(url.as_str());
853        if !username.is_empty() {
854            client = client.with_user(username);
855        }
856        if let Some(password) = password {
857            client = client.with_password(password);
858        }
859    } else {
860        client = client.with_url(dsn);
861    }
862    client
863}
864
/// Errors that can arise while running plugins against the firehose.
///
/// The `_CanSend` assertion in this module requires this type to stay
/// `Send + Sync + 'static`, so it can be returned across tasks and threads.
#[derive(Debug, Error)]
pub enum PluginRunnerError {
    /// ClickHouse client returned an error.
    ///
    /// Converted automatically from [`clickhouse::error::Error`] via `#[from]`, so `?`
    /// can be used on ClickHouse calls in functions returning this error.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// Firehose streaming failed at the specified slot.
    ///
    /// The runner builds this variant from the firehose's `(error, slot)` failure,
    /// flattening the underlying error into `details` with `to_string()`.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// Human-readable description of the firehose failure.
        details: String,
        /// Slot where the firehose encountered the error.
        slot: u64,
    },
    /// Lifecycle hook on a plugin returned an error.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin that failed (as reported by the plugin's `name()`).
        plugin: &'static str,
        /// Lifecycle stage where the failure occurred.
        stage: &'static str,
        /// Textual error details from the failing hook.
        details: String,
    },
}
890
/// A registered [`Plugin`] together with metadata captured from it at registration.
///
/// `id`, `name` and `version` are read once from the plugin (see the
/// `From<Arc<dyn Plugin>>` impl). They are reused for logging (`handle.name`) and for the
/// rows written by `upsert_plugins`.
#[derive(Clone)]
struct PluginHandle {
    // Shared plugin instance whose hooks (e.g. `on_exit`) the runner invokes.
    plugin: Arc<dyn Plugin>,
    // Value returned by the plugin's `id()`; widened to `u32` when written to ClickHouse.
    id: u16,
    // Value returned by the plugin's `name()`; used in log messages.
    name: &'static str,
    // Value returned by the plugin's `version()`; widened to `u32` when written to ClickHouse.
    version: u16,
}
898
899impl From<Arc<dyn Plugin>> for PluginHandle {
900    fn from(plugin: Arc<dyn Plugin>) -> Self {
901        let id = plugin.id();
902        let name = plugin.name();
903        let version = plugin.version();
904        Self {
905            plugin,
906            id,
907            name,
908            version,
909        }
910    }
911}
912
/// One row of the `jetstreamer_plugins` table, as written by `upsert_plugins`.
///
/// The fields mirror that table's `id UInt32`, `name String` and `version UInt32` columns.
/// The plugin's `u16` id and version are widened to `u32` where the row is built.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    // Plugin identifier (from `PluginHandle::id`).
    id: u32,
    // Borrowed plugin name; no owned copy is made for the insert.
    name: &'a str,
    // Plugin version (from `PluginHandle::version`).
    version: u32,
}
919
/// One `(plugin_id, slot)` row for the `jetstreamer_plugin_slots` table.
///
/// Written directly by `record_plugin_slot`, and in batches by `flush_slot_buffer`. The
/// table's `indexed_at` column has no field here; presumably it is filled by its
/// `DEFAULT now()`.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    // Plugin identifier, widened from the plugin's `u16` id.
    plugin_id: u32,
    // Slot number associated with this plugin.
    slot: u64,
}
925
/// One row of the `jetstreamer_slot_status` table, as written by `record_slot_status`.
///
/// All counters are saturated into their narrower column types by the writer. The table's
/// `indexed_at` column has no field here; presumably it is filled by its `DEFAULT now()`.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    // Slot number (the table's `ORDER BY` key).
    slot: u64,
    // Total transactions in the slot, capped at `u32::MAX`.
    transaction_count: u32,
    // Vote transactions in the slot, capped at `u32::MAX`.
    vote_transaction_count: u32,
    // Non-vote transactions in the slot, capped at `u32::MAX`.
    non_vote_transaction_count: u32,
    // Thread index passed to `record_slot_status`, capped at `u8::MAX`.
    thread_id: u8,
    // Block time in seconds, clamped into `u32` by `clamp_block_time` (0 when unknown);
    // stored in a `DateTime('UTC')` column.
    block_time: u32,
}
935
/// Running vote / non-vote transaction counts for a single slot.
///
/// Updated by `record_slot_vote_tally` and read back with `take_slot_tx_tally`.
#[derive(Default, Clone, Copy)]
struct SlotTxTally {
    // Vote transactions recorded for the slot (saturating increment).
    votes: u64,
    // Non-vote transactions recorded for the slot (saturating increment).
    non_votes: u64,
}
941
/// Process-wide per-slot transaction tallies, keyed by slot number.
///
/// Entries are created and incremented by `record_slot_vote_tally`, and removed (returning
/// the final tally) by `take_slot_tx_tally`. The map uses `ahash` hashing rather than the
/// default SipHash, and is initialised lazily on first access.
static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally, ahash::RandomState>> =
    Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
944
945async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
946    db.query(
947        r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
948            slot UInt64,
949            transaction_count UInt32 DEFAULT 0,
950            vote_transaction_count UInt32 DEFAULT 0,
951            non_vote_transaction_count UInt32 DEFAULT 0,
952            thread_id UInt8 DEFAULT 0,
953            block_time DateTime('UTC') DEFAULT toDateTime(0),
954            indexed_at DateTime('UTC') DEFAULT now()
955        ) ENGINE = ReplacingMergeTree(indexed_at)
956        ORDER BY slot"#,
957    )
958    .execute()
959    .await?;
960
961    db.query(
962        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
963            id UInt32,
964            name String,
965            version UInt32
966        ) ENGINE = ReplacingMergeTree
967        ORDER BY id"#,
968    )
969    .execute()
970    .await?;
971
972    db.query(
973        r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
974            plugin_id UInt32,
975            slot UInt64,
976            indexed_at DateTime('UTC') DEFAULT now()
977        ) ENGINE = ReplacingMergeTree
978        ORDER BY (plugin_id, slot)"#,
979    )
980    .execute()
981    .await?;
982
983    Ok(())
984}
985
986async fn upsert_plugins(
987    db: &Client,
988    plugins: &[PluginHandle],
989) -> Result<(), clickhouse::error::Error> {
990    if plugins.is_empty() {
991        return Ok(());
992    }
993    let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
994    for handle in plugins {
995        insert
996            .write(&PluginRow {
997                id: handle.id as u32,
998                name: handle.name,
999                version: handle.version as u32,
1000            })
1001            .await?;
1002    }
1003    insert.end().await?;
1004    Ok(())
1005}
1006
1007async fn record_plugin_slot(
1008    db: Arc<Client>,
1009    plugin_id: u16,
1010    slot: u64,
1011) -> Result<(), clickhouse::error::Error> {
1012    let mut insert = db
1013        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1014        .await?;
1015    insert
1016        .write(&PluginSlotRow {
1017            plugin_id: plugin_id as u32,
1018            slot,
1019        })
1020        .await?;
1021    insert.end().await?;
1022    Ok(())
1023}
1024
1025async fn flush_slot_buffer(
1026    db: Arc<Client>,
1027    buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>>,
1028) -> Result<(), clickhouse::error::Error> {
1029    let mut rows = Vec::new();
1030    buffer.iter_mut().for_each(|mut entry| {
1031        if !entry.value().is_empty() {
1032            rows.append(entry.value_mut());
1033        }
1034    });
1035
1036    if rows.is_empty() {
1037        return Ok(());
1038    }
1039
1040    let mut insert = db
1041        .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1042        .await?;
1043    for row in rows {
1044        insert.write(&row).await?;
1045    }
1046    insert.end().await?;
1047    Ok(())
1048}
1049
1050async fn record_slot_status(
1051    db: Arc<Client>,
1052    slot: u64,
1053    thread_id: usize,
1054    transaction_count: u64,
1055    vote_transaction_count: u64,
1056    non_vote_transaction_count: u64,
1057    block_time: Option<i64>,
1058) -> Result<(), clickhouse::error::Error> {
1059    let mut insert = db
1060        .insert::<SlotStatusRow>("jetstreamer_slot_status")
1061        .await?;
1062    insert
1063        .write(&SlotStatusRow {
1064            slot,
1065            transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1066            vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1067            non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1068            thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1069            block_time: clamp_block_time(block_time),
1070        })
1071        .await?;
1072    insert.end().await?;
1073    Ok(())
1074}
1075
/// Converts an optional signed timestamp into the `u32` stored in the `block_time` column.
///
/// `None` and negative values map to 0. Values above `u32::MAX` saturate to `u32::MAX`.
/// Everything else passes through unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        None => 0,
        Some(ts) => u32::try_from(ts.max(0)).unwrap_or(u32::MAX),
    }
}
1084
1085fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1086    let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1087    if is_vote {
1088        entry.votes = entry.votes.saturating_add(1);
1089    } else {
1090        entry.non_votes = entry.non_votes.saturating_add(1);
1091    }
1092}
1093
1094fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1095    SLOT_TX_TALLY
1096        .remove(&slot)
1097        .map(|(_, tally)| tally)
1098        .unwrap_or_default()
1099}
1100
1101// Ensure PluginRunnerError is Send + Sync + 'static
1102trait _CanSend: Send + Sync + 'static {}
1103impl _CanSend for PluginRunnerError {}
1104
/// Formats an unsigned integer with `,` thousands separators (e.g. `1234567` → `"1,234,567"`).
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let raw = value.into().to_string();
    // The leading group holds 1..=3 digits; every following group holds exactly 3.
    let head_len = match raw.len() % 3 {
        0 => 3,
        rem => rem,
    };
    let (head, tail) = raw.split_at(head_len.min(raw.len()));

    let mut out = String::with_capacity(raw.len() + raw.len() / 3);
    out.push_str(head);
    for (pos, digit) in tail.chars().enumerate() {
        if pos % 3 == 0 {
            out.push(',');
        }
        out.push(digit);
    }
    out
}
1118
/// Renders a duration in seconds as a compact string for progress/ETA logging.
///
/// - Non-finite input gives `"n/a"`; zero or negative input gives `"0s"`.
/// - Under a minute, one decimal place is shown (`"12.3s"`).
/// - Otherwise the value is rounded to whole seconds and shown as the two most significant
///   units: `"XmYs"`, `"XhYm"`, or `"XdYh"`. The hour part of the day form is dropped
///   when it is zero (`"Xd"`).
fn human_readable_duration(seconds: f64) -> String {
    match seconds {
        s if !s.is_finite() => "n/a".into(),
        s if s <= 0.0 => "0s".into(),
        s if s < 60.0 => format!("{:.1}s", s),
        s => {
            let total = s.round() as u64;
            let (days, rem) = (total / 86_400, total % 86_400);
            let (hours, rem) = (rem / 3_600, rem % 3_600);
            let (minutes, secs) = (rem / 60, rem % 60);
            match (days, hours) {
                (0, 0) => format!("{}m{}s", minutes, secs),
                (0, h) => format!("{}h{}m", h, minutes),
                (d, 0) => format!("{}d", d),
                (d, h) => format!("{}d{}h", d, h),
            }
        }
    }
}