1#![deny(missing_docs)]
2pub mod plugins;
110
111const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137
138pub use jetstreamer_firehose::firehose::{
140 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
141};
142
143static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
145static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
146static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
147static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
148#[inline]
149fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
150 origin.elapsed().as_nanos() as u64
151}
152
153pub type PluginFuture<'a> = Pin<
155 Box<
156 dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
157 + Send
158 + 'a,
159 >,
160>;
161
162pub trait Plugin: Send + Sync + 'static {
166 fn name(&self) -> &'static str;
168
169 fn version(&self) -> u16 {
171 1
172 }
173
174 fn id(&self) -> u16 {
176 let hash = Sha256::digest(self.name());
177 let mut res = 1u16;
178 for byte in hash {
179 res = res.wrapping_mul(31).wrapping_add(byte as u16);
180 }
181 res
182 }
183
184 fn on_transaction<'a>(
186 &'a self,
187 _thread_id: usize,
188 _db: Option<Arc<Client>>,
189 _transaction: &'a TransactionData,
190 ) -> PluginFuture<'a> {
191 async move { Ok(()) }.boxed()
192 }
193
194 fn on_block<'a>(
196 &'a self,
197 _thread_id: usize,
198 _db: Option<Arc<Client>>,
199 _block: &'a BlockData,
200 ) -> PluginFuture<'a> {
201 async move { Ok(()) }.boxed()
202 }
203
204 fn on_entry<'a>(
206 &'a self,
207 _thread_id: usize,
208 _db: Option<Arc<Client>>,
209 _entry: &'a EntryData,
210 ) -> PluginFuture<'a> {
211 async move { Ok(()) }.boxed()
212 }
213
214 fn on_reward<'a>(
216 &'a self,
217 _thread_id: usize,
218 _db: Option<Arc<Client>>,
219 _reward: &'a RewardsData,
220 ) -> PluginFuture<'a> {
221 async move { Ok(()) }.boxed()
222 }
223
224 fn on_error<'a>(
226 &'a self,
227 _thread_id: usize,
228 _db: Option<Arc<Client>>,
229 _error: &'a FirehoseErrorContext,
230 ) -> PluginFuture<'a> {
231 async move { Ok(()) }.boxed()
232 }
233
234 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
236 async move { Ok(()) }.boxed()
237 }
238
239 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
241 async move { Ok(()) }.boxed()
242 }
243}
244
245#[derive(Clone)]
249pub struct PluginRunner {
250 plugins: Arc<Vec<Arc<dyn Plugin>>>,
251 clickhouse_dsn: String,
252 num_threads: usize,
253 db_update_interval_slots: u64,
254}
255
256impl PluginRunner {
257 pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
259 Self {
260 plugins: Arc::new(Vec::new()),
261 clickhouse_dsn: clickhouse_dsn.to_string(),
262 num_threads: std::cmp::max(1, num_threads),
263 db_update_interval_slots: 100,
264 }
265 }
266
267 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
269 Arc::get_mut(&mut self.plugins)
270 .expect("cannot register plugins after the runner has started")
271 .push(Arc::from(plugin));
272 }
273
274 pub async fn run(
276 self: Arc<Self>,
277 slot_range: Range<u64>,
278 clickhouse_enabled: bool,
279 ) -> Result<(), PluginRunnerError> {
280 let db_update_interval = self.db_update_interval_slots.max(1);
281 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
282 self.plugins
283 .iter()
284 .cloned()
285 .map(PluginHandle::from)
286 .collect(),
287 );
288
289 let clickhouse = if clickhouse_enabled {
290 let client = Arc::new(
291 Client::default()
292 .with_url(&self.clickhouse_dsn)
293 .with_option("async_insert", "1")
294 .with_option("wait_for_async_insert", "0"),
295 );
296 ensure_clickhouse_tables(client.as_ref()).await?;
297 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
298 Some(client)
299 } else {
300 None
301 };
302
303 for handle in plugin_handles.iter() {
304 if let Err(error) = handle
305 .plugin
306 .on_load(clickhouse.clone())
307 .await
308 .map_err(|e| e.to_string())
309 {
310 return Err(PluginRunnerError::PluginLifecycle {
311 plugin: handle.name,
312 stage: "on_load",
313 details: error,
314 });
315 }
316 }
317
318 let shutting_down = Arc::new(AtomicBool::new(false));
319 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
320 let clickhouse_enabled = clickhouse.is_some();
321 let slots_since_flush = Arc::new(AtomicU64::new(0));
322
323 let on_block = {
324 let plugin_handles = plugin_handles.clone();
325 let clickhouse = clickhouse.clone();
326 let slot_buffer = slot_buffer.clone();
327 let slots_since_flush = slots_since_flush.clone();
328 let shutting_down = shutting_down.clone();
329 move |thread_id: usize, block: BlockData| {
330 let plugin_handles = plugin_handles.clone();
331 let clickhouse = clickhouse.clone();
332 let slot_buffer = slot_buffer.clone();
333 let slots_since_flush = slots_since_flush.clone();
334 let shutting_down = shutting_down.clone();
335 async move {
336 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
337 if shutting_down.load(Ordering::SeqCst) {
338 log::debug!(
339 target: &log_target,
340 "ignoring block while shutdown is in progress"
341 );
342 return Ok(());
343 }
344 let block = Arc::new(block);
345 if !plugin_handles.is_empty() {
346 for handle in plugin_handles.iter() {
347 let db = clickhouse.clone();
348 if let Err(err) = handle
349 .plugin
350 .on_block(thread_id, db.clone(), block.as_ref())
351 .await
352 {
353 log::error!(
354 target: &log_target,
355 "plugin {} on_block error: {}",
356 handle.name,
357 err
358 );
359 continue;
360 }
361 if let (Some(db_client), BlockData::Block { slot, .. }) =
362 (clickhouse.clone(), block.as_ref())
363 {
364 if clickhouse_enabled {
365 slot_buffer
366 .entry(handle.id)
367 .or_default()
368 .push(PluginSlotRow {
369 plugin_id: handle.id as u32,
370 slot: *slot,
371 });
372 } else if let Err(err) =
373 record_plugin_slot(db_client, handle.id, *slot).await
374 {
375 log::error!(
376 target: &log_target,
377 "failed to record plugin slot for {}: {}",
378 handle.name,
379 err
380 );
381 }
382 }
383 }
384 if clickhouse_enabled {
385 let current = slots_since_flush
386 .fetch_add(1, Ordering::Relaxed)
387 .wrapping_add(1);
388 if current.is_multiple_of(db_update_interval)
389 && let Some(db_client) = clickhouse.clone()
390 {
391 let buffer = slot_buffer.clone();
392 let log_target_clone = log_target.clone();
393 tokio::spawn(async move {
394 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
395 log::error!(
396 target: &log_target_clone,
397 "failed to flush buffered plugin slots: {}",
398 err
399 );
400 }
401 });
402 }
403 }
404 }
405 if let Some(db_client) = clickhouse.clone() {
406 match block.as_ref() {
407 BlockData::Block {
408 slot,
409 executed_transaction_count,
410 block_time,
411 ..
412 } => {
413 let tally = take_slot_tx_tally(*slot);
414 if let Err(err) = record_slot_status(
415 db_client,
416 *slot,
417 thread_id,
418 *executed_transaction_count,
419 tally.votes,
420 tally.non_votes,
421 *block_time,
422 )
423 .await
424 {
425 log::error!(
426 target: &log_target,
427 "failed to record slot status: {}",
428 err
429 );
430 }
431 }
432 BlockData::PossibleLeaderSkipped { slot } => {
433 take_slot_tx_tally(*slot);
435 }
436 }
437 }
438 Ok(())
439 }
440 .boxed()
441 }
442 };
443
444 let on_transaction = {
445 let plugin_handles = plugin_handles.clone();
446 let clickhouse = clickhouse.clone();
447 let shutting_down = shutting_down.clone();
448 move |thread_id: usize, transaction: TransactionData| {
449 let plugin_handles = plugin_handles.clone();
450 let clickhouse = clickhouse.clone();
451 let shutting_down = shutting_down.clone();
452 async move {
453 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
454 record_slot_vote_tally(transaction.slot, transaction.is_vote);
455 if plugin_handles.is_empty() {
456 return Ok(());
457 }
458 if shutting_down.load(Ordering::SeqCst) {
459 log::debug!(
460 target: &log_target,
461 "ignoring transaction while shutdown is in progress"
462 );
463 return Ok(());
464 }
465 let transaction = Arc::new(transaction);
466 for handle in plugin_handles.iter() {
467 if let Err(err) = handle
468 .plugin
469 .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
470 .await
471 {
472 log::error!(
473 target: &log_target,
474 "plugin {} on_transaction error: {}",
475 handle.name,
476 err
477 );
478 }
479 }
480 Ok(())
481 }
482 .boxed()
483 }
484 };
485
486 let on_entry = {
487 let plugin_handles = plugin_handles.clone();
488 let clickhouse = clickhouse.clone();
489 let shutting_down = shutting_down.clone();
490 move |thread_id: usize, entry: EntryData| {
491 let plugin_handles = plugin_handles.clone();
492 let clickhouse = clickhouse.clone();
493 let shutting_down = shutting_down.clone();
494 async move {
495 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
496 if plugin_handles.is_empty() {
497 return Ok(());
498 }
499 if shutting_down.load(Ordering::SeqCst) {
500 log::debug!(
501 target: &log_target,
502 "ignoring entry while shutdown is in progress"
503 );
504 return Ok(());
505 }
506 let entry = Arc::new(entry);
507 for handle in plugin_handles.iter() {
508 if let Err(err) = handle
509 .plugin
510 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
511 .await
512 {
513 log::error!(
514 target: &log_target,
515 "plugin {} on_entry error: {}",
516 handle.name,
517 err
518 );
519 }
520 }
521 Ok(())
522 }
523 .boxed()
524 }
525 };
526
527 let on_reward = {
528 let plugin_handles = plugin_handles.clone();
529 let clickhouse = clickhouse.clone();
530 let shutting_down = shutting_down.clone();
531 move |thread_id: usize, reward: RewardsData| {
532 let plugin_handles = plugin_handles.clone();
533 let clickhouse = clickhouse.clone();
534 let shutting_down = shutting_down.clone();
535 async move {
536 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
537 if plugin_handles.is_empty() {
538 return Ok(());
539 }
540 if shutting_down.load(Ordering::SeqCst) {
541 log::debug!(
542 target: &log_target,
543 "ignoring reward while shutdown is in progress"
544 );
545 return Ok(());
546 }
547 let reward = Arc::new(reward);
548 for handle in plugin_handles.iter() {
549 if let Err(err) = handle
550 .plugin
551 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
552 .await
553 {
554 log::error!(
555 target: &log_target,
556 "plugin {} on_reward error: {}",
557 handle.name,
558 err
559 );
560 }
561 }
562 Ok(())
563 }
564 .boxed()
565 }
566 };
567
568 let on_error = {
569 let plugin_handles = plugin_handles.clone();
570 let clickhouse = clickhouse.clone();
571 let shutting_down = shutting_down.clone();
572 move |thread_id: usize, context: FirehoseErrorContext| {
573 let plugin_handles = plugin_handles.clone();
574 let clickhouse = clickhouse.clone();
575 let shutting_down = shutting_down.clone();
576 async move {
577 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
578 if plugin_handles.is_empty() {
579 return Ok(());
580 }
581 if shutting_down.load(Ordering::SeqCst) {
582 log::debug!(
583 target: &log_target,
584 "ignoring error callback while shutdown is in progress"
585 );
586 return Ok(());
587 }
588 let context = Arc::new(context);
589 for handle in plugin_handles.iter() {
590 if let Err(err) = handle
591 .plugin
592 .on_error(thread_id, clickhouse.clone(), context.as_ref())
593 .await
594 {
595 log::error!(
596 target: &log_target,
597 "plugin {} on_error error: {}",
598 handle.name,
599 err
600 );
601 }
602 }
603 Ok(())
604 }
605 .boxed()
606 }
607 };
608
609 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
610
611 let total_slot_count_capture = total_slot_count;
612 let run_origin = std::time::Instant::now();
613 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
615 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
616 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
617 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
618 let stats_tracking = clickhouse.clone().map(|_db| {
619 let shutting_down = shutting_down.clone();
620 let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
621 StatsTracking {
622 on_stats: {
623 let thread_progress_max = thread_progress_max.clone();
624 let total_slot_count = total_slot_count_capture;
625 move |thread_id: usize, stats: Stats| {
626 let shutting_down = shutting_down.clone();
627 let thread_progress_max = thread_progress_max.clone();
628 async move {
629 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
630 if shutting_down.load(Ordering::SeqCst) {
631 log::debug!(
632 target: &log_target,
633 "skipping stats write during shutdown"
634 );
635 return Ok(());
636 }
637 let finish_at = stats
638 .finish_time
639 .unwrap_or_else(std::time::Instant::now);
640 let elapsed_since_start = finish_at
641 .saturating_duration_since(stats.start_time)
642 .as_nanos()
643 .max(1) as u64;
644 let total_slots = stats.slots_processed;
645 let total_txs = stats.transactions_processed;
646 let now_ns = monotonic_nanos_since(run_origin);
647 let (delta_slots, delta_txs, delta_time_ns) = {
651 while SNAPSHOT_LOCK
652 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
653 .is_err()
654 {
655 hint::spin_loop();
656 }
657 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
658 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
659 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
660 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
661 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
662 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
663 SNAPSHOT_LOCK.store(false, Ordering::Release);
664 let delta_slots = total_slots.saturating_sub(prev_slots);
665 let delta_txs = total_txs.saturating_sub(prev_txs);
666 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
667 (delta_slots, delta_txs, delta_time_ns)
668 };
669 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
670 let mut slot_rate = delta_slots as f64 / delta_secs;
671 let mut tps = delta_txs as f64 / delta_secs;
672 if slot_rate <= 0.0 && total_slots > 0 {
673 slot_rate =
674 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
675 }
676 if tps <= 0.0 && total_txs > 0 {
677 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
678 }
679 let thread_stats = &stats.thread_stats;
680 let processed_slots = stats.slots_processed.min(total_slot_count);
681 let progress_fraction = if total_slot_count > 0 {
682 processed_slots as f64 / total_slot_count as f64
683 } else {
684 1.0
685 };
686 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
687 let thread_total_slots = thread_stats
688 .initial_slot_range
689 .end
690 .saturating_sub(thread_stats.initial_slot_range.start);
691 let thread_progress_raw = if thread_total_slots > 0 {
692 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
693 .clamp(0.0, 1.0)
694 * 100.0
695 } else {
696 100.0
697 };
698 let thread_progress = *thread_progress_max
699 .entry(thread_id)
700 .and_modify(|max| {
701 if thread_progress_raw > *max {
702 *max = thread_progress_raw;
703 }
704 })
705 .or_insert(thread_progress_raw);
706 let mut overall_eta = None;
707 if slot_rate > 0.0 {
708 let remaining_slots =
709 total_slot_count.saturating_sub(processed_slots);
710 overall_eta = Some(human_readable_duration(
711 remaining_slots as f64 / slot_rate,
712 ));
713 }
714 if overall_eta.is_none() {
715 if progress_fraction > 0.0 && progress_fraction < 1.0 {
716 if let Some(elapsed_total) = finish_at
717 .checked_duration_since(stats.start_time)
718 .map(|d| d.as_secs_f64())
719 && elapsed_total > 0.0 {
720 let remaining_secs =
721 elapsed_total * (1.0 / progress_fraction - 1.0);
722 overall_eta = Some(human_readable_duration(remaining_secs));
723 }
724 } else if progress_fraction >= 1.0 {
725 overall_eta = Some("0s".into());
726 }
727 }
728 let slots_display = human_readable_count(processed_slots);
729 let blocks_display = human_readable_count(stats.blocks_processed);
730 let txs_display = human_readable_count(stats.transactions_processed);
731 let tps_display = human_readable_count(tps.ceil() as u64);
732 log::info!(
733 target: &log_target,
734 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
735 overall_eta.unwrap_or_else(|| "n/a".into()),
736 );
737 Ok(())
738 }
739 .boxed()
740 }
741 },
742 tracking_interval_slots: 100,
743 }
744 });
745
746 let (shutdown_tx, _) = broadcast::channel::<()>(1);
747
748 let mut firehose_future = Box::pin(firehose(
749 self.num_threads as u64,
750 slot_range,
751 Some(on_block),
752 Some(on_transaction),
753 Some(on_entry),
754 Some(on_reward),
755 Some(on_error),
756 stats_tracking,
757 Some(shutdown_tx.subscribe()),
758 ));
759
760 let firehose_result = tokio::select! {
761 res = &mut firehose_future => res,
762 ctrl = signal::ctrl_c() => {
763 match ctrl {
764 Ok(()) => log::info!(
765 target: LOG_MODULE,
766 "CTRL+C received; initiating shutdown"
767 ),
768 Err(err) => log::error!(
769 target: LOG_MODULE,
770 "failed to listen for CTRL+C: {}",
771 err
772 ),
773 }
774 shutting_down.store(true, Ordering::SeqCst);
775 let _ = shutdown_tx.send(());
776 firehose_future.await
777 }
778 };
779
780 if clickhouse_enabled
781 && let Some(db_client) = clickhouse.clone()
782 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
783 {
784 log::error!(
785 target: LOG_MODULE,
786 "failed to flush buffered plugin slots: {}",
787 err
788 );
789 }
790
791 for handle in plugin_handles.iter() {
792 if let Err(error) = handle
793 .plugin
794 .on_exit(clickhouse.clone())
795 .await
796 .map_err(|e| e.to_string())
797 {
798 log::error!(
799 target: LOG_MODULE,
800 "plugin {} on_exit error: {}",
801 handle.name,
802 error
803 );
804 }
805 }
806
807 match firehose_result {
808 Ok(()) => Ok(()),
809 Err((error, slot)) => Err(PluginRunnerError::Firehose {
810 details: error.to_string(),
811 slot,
812 }),
813 }
814 }
815}
816
817#[derive(Debug, Error)]
819pub enum PluginRunnerError {
820 #[error("clickhouse error: {0}")]
822 Clickhouse(#[from] clickhouse::error::Error),
823 #[error("firehose error at slot {slot}: {details}")]
825 Firehose {
826 details: String,
828 slot: u64,
830 },
831 #[error("plugin {plugin} failed during {stage}: {details}")]
833 PluginLifecycle {
834 plugin: &'static str,
836 stage: &'static str,
838 details: String,
840 },
841}
842
843#[derive(Clone)]
844struct PluginHandle {
845 plugin: Arc<dyn Plugin>,
846 id: u16,
847 name: &'static str,
848 version: u16,
849}
850
851impl From<Arc<dyn Plugin>> for PluginHandle {
852 fn from(plugin: Arc<dyn Plugin>) -> Self {
853 let id = plugin.id();
854 let name = plugin.name();
855 let version = plugin.version();
856 Self {
857 plugin,
858 id,
859 name,
860 version,
861 }
862 }
863}
864
865#[derive(Row, Serialize)]
866struct PluginRow<'a> {
867 id: u32,
868 name: &'a str,
869 version: u32,
870}
871
872#[derive(Row, Serialize)]
873struct PluginSlotRow {
874 plugin_id: u32,
875 slot: u64,
876}
877
878#[derive(Row, Serialize)]
879struct SlotStatusRow {
880 slot: u64,
881 transaction_count: u32,
882 vote_transaction_count: u32,
883 non_vote_transaction_count: u32,
884 thread_id: u8,
885 block_time: u32,
886}
887
888#[derive(Default, Clone, Copy)]
889struct SlotTxTally {
890 votes: u64,
891 non_votes: u64,
892}
893
894static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally>> = Lazy::new(DashMap::new);
895
896async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
897 db.query(
898 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
899 slot UInt64,
900 transaction_count UInt32 DEFAULT 0,
901 vote_transaction_count UInt32 DEFAULT 0,
902 non_vote_transaction_count UInt32 DEFAULT 0,
903 thread_id UInt8 DEFAULT 0,
904 block_time DateTime('UTC') DEFAULT toDateTime(0),
905 indexed_at DateTime('UTC') DEFAULT now()
906 ) ENGINE = ReplacingMergeTree(indexed_at)
907 ORDER BY slot"#,
908 )
909 .execute()
910 .await?;
911
912 db.query(
913 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
914 id UInt32,
915 name String,
916 version UInt32
917 ) ENGINE = ReplacingMergeTree
918 ORDER BY id"#,
919 )
920 .execute()
921 .await?;
922
923 db.query(
924 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
925 plugin_id UInt32,
926 slot UInt64,
927 indexed_at DateTime('UTC') DEFAULT now()
928 ) ENGINE = ReplacingMergeTree
929 ORDER BY (plugin_id, slot)"#,
930 )
931 .execute()
932 .await?;
933
934 Ok(())
935}
936
937async fn upsert_plugins(
938 db: &Client,
939 plugins: &[PluginHandle],
940) -> Result<(), clickhouse::error::Error> {
941 if plugins.is_empty() {
942 return Ok(());
943 }
944 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
945 for handle in plugins {
946 insert
947 .write(&PluginRow {
948 id: handle.id as u32,
949 name: handle.name,
950 version: handle.version as u32,
951 })
952 .await?;
953 }
954 insert.end().await?;
955 Ok(())
956}
957
958async fn record_plugin_slot(
959 db: Arc<Client>,
960 plugin_id: u16,
961 slot: u64,
962) -> Result<(), clickhouse::error::Error> {
963 let mut insert = db
964 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
965 .await?;
966 insert
967 .write(&PluginSlotRow {
968 plugin_id: plugin_id as u32,
969 slot,
970 })
971 .await?;
972 insert.end().await?;
973 Ok(())
974}
975
976async fn flush_slot_buffer(
977 db: Arc<Client>,
978 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
979) -> Result<(), clickhouse::error::Error> {
980 let mut rows = Vec::new();
981 buffer.iter_mut().for_each(|mut entry| {
982 if !entry.value().is_empty() {
983 rows.append(entry.value_mut());
984 }
985 });
986
987 if rows.is_empty() {
988 return Ok(());
989 }
990
991 let mut insert = db
992 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
993 .await?;
994 for row in rows {
995 insert.write(&row).await?;
996 }
997 insert.end().await?;
998 Ok(())
999}
1000
1001async fn record_slot_status(
1002 db: Arc<Client>,
1003 slot: u64,
1004 thread_id: usize,
1005 transaction_count: u64,
1006 vote_transaction_count: u64,
1007 non_vote_transaction_count: u64,
1008 block_time: Option<i64>,
1009) -> Result<(), clickhouse::error::Error> {
1010 let mut insert = db
1011 .insert::<SlotStatusRow>("jetstreamer_slot_status")
1012 .await?;
1013 insert
1014 .write(&SlotStatusRow {
1015 slot,
1016 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1017 vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1018 non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1019 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1020 block_time: clamp_block_time(block_time),
1021 })
1022 .await?;
1023 insert.end().await?;
1024 Ok(())
1025}
1026
1027fn clamp_block_time(block_time: Option<i64>) -> u32 {
1028 match block_time {
1029 Some(ts) if ts > 0 && ts <= u32::MAX as i64 => ts as u32,
1030 Some(ts) if ts > u32::MAX as i64 => u32::MAX,
1031 Some(ts) if ts < 0 => 0,
1032 _ => 0,
1033 }
1034}
1035
1036fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1037 let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1038 if is_vote {
1039 entry.votes = entry.votes.saturating_add(1);
1040 } else {
1041 entry.non_votes = entry.non_votes.saturating_add(1);
1042 }
1043}
1044
1045fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1046 SLOT_TX_TALLY
1047 .remove(&slot)
1048 .map(|(_, tally)| tally)
1049 .unwrap_or_default()
1050}
1051
1052trait _CanSend: Send + Sync + 'static {}
1054impl _CanSend for PluginRunnerError {}
1055
1056#[inline]
1057fn human_readable_count(value: impl Into<u128>) -> String {
1058 let digits = value.into().to_string();
1059 let len = digits.len();
1060 let mut formatted = String::with_capacity(len + len / 3);
1061 for (idx, byte) in digits.bytes().enumerate() {
1062 if idx != 0 && (len - idx) % 3 == 0 {
1063 formatted.push(',');
1064 }
1065 formatted.push(char::from(byte));
1066 }
1067 formatted
1068}
1069
1070fn human_readable_duration(seconds: f64) -> String {
1071 if !seconds.is_finite() {
1072 return "n/a".into();
1073 }
1074 if seconds <= 0.0 {
1075 return "0s".into();
1076 }
1077 if seconds < 60.0 {
1078 return format!("{:.1}s", seconds);
1079 }
1080 let duration = Duration::from_secs(seconds.round() as u64);
1081 let secs = duration.as_secs();
1082 let days = secs / 86_400;
1083 let hours = (secs % 86_400) / 3_600;
1084 let minutes = (secs % 3_600) / 60;
1085 let seconds_rem = secs % 60;
1086 if days > 0 {
1087 if hours > 0 {
1088 format!("{}d{}h", days, hours)
1089 } else {
1090 format!("{}d", days)
1091 }
1092 } else if hours > 0 {
1093 format!("{}h{}m", hours, minutes)
1094 } else {
1095 format!("{}m{}s", minutes, seconds_rem)
1096 }
1097}