1#![deny(missing_docs)]
2pub mod plugins;
110
111const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use serde::Serialize;
133use sha2::{Digest, Sha256};
134use thiserror::Error;
135use tokio::{signal, sync::broadcast};
136
137pub use jetstreamer_firehose::firehose::{
139 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
140};
141
142static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
144static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
145static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
146static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
147#[inline]
148fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
149 origin.elapsed().as_nanos() as u64
150}
151
152pub type PluginFuture<'a> = Pin<
154 Box<
155 dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
156 + Send
157 + 'a,
158 >,
159>;
160
161pub trait Plugin: Send + Sync + 'static {
165 fn name(&self) -> &'static str;
167
168 fn version(&self) -> u16 {
170 1
171 }
172
173 fn id(&self) -> u16 {
175 let hash = Sha256::digest(self.name());
176 let mut res = 1u16;
177 for byte in hash {
178 res = res.wrapping_mul(31).wrapping_add(byte as u16);
179 }
180 res
181 }
182
183 fn on_transaction<'a>(
185 &'a self,
186 _thread_id: usize,
187 _db: Option<Arc<Client>>,
188 _transaction: &'a TransactionData,
189 ) -> PluginFuture<'a> {
190 async move { Ok(()) }.boxed()
191 }
192
193 fn on_block<'a>(
195 &'a self,
196 _thread_id: usize,
197 _db: Option<Arc<Client>>,
198 _block: &'a BlockData,
199 ) -> PluginFuture<'a> {
200 async move { Ok(()) }.boxed()
201 }
202
203 fn on_entry<'a>(
205 &'a self,
206 _thread_id: usize,
207 _db: Option<Arc<Client>>,
208 _entry: &'a EntryData,
209 ) -> PluginFuture<'a> {
210 async move { Ok(()) }.boxed()
211 }
212
213 fn on_reward<'a>(
215 &'a self,
216 _thread_id: usize,
217 _db: Option<Arc<Client>>,
218 _reward: &'a RewardsData,
219 ) -> PluginFuture<'a> {
220 async move { Ok(()) }.boxed()
221 }
222
223 fn on_error<'a>(
225 &'a self,
226 _thread_id: usize,
227 _db: Option<Arc<Client>>,
228 _error: &'a FirehoseErrorContext,
229 ) -> PluginFuture<'a> {
230 async move { Ok(()) }.boxed()
231 }
232
233 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
235 async move { Ok(()) }.boxed()
236 }
237
238 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
240 async move { Ok(()) }.boxed()
241 }
242}
243
244#[derive(Clone)]
248pub struct PluginRunner {
249 plugins: Arc<Vec<Arc<dyn Plugin>>>,
250 clickhouse_dsn: String,
251 num_threads: usize,
252 db_update_interval_slots: u64,
253}
254
255impl PluginRunner {
256 pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
258 Self {
259 plugins: Arc::new(Vec::new()),
260 clickhouse_dsn: clickhouse_dsn.to_string(),
261 num_threads: std::cmp::max(1, num_threads),
262 db_update_interval_slots: 100,
263 }
264 }
265
266 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
268 Arc::get_mut(&mut self.plugins)
269 .expect("cannot register plugins after the runner has started")
270 .push(Arc::from(plugin));
271 }
272
273 pub async fn run(
275 self: Arc<Self>,
276 slot_range: Range<u64>,
277 clickhouse_enabled: bool,
278 ) -> Result<(), PluginRunnerError> {
279 let db_update_interval = self.db_update_interval_slots.max(1);
280 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
281 self.plugins
282 .iter()
283 .cloned()
284 .map(PluginHandle::from)
285 .collect(),
286 );
287
288 let clickhouse = if clickhouse_enabled {
289 let client = Arc::new(
290 Client::default()
291 .with_url(&self.clickhouse_dsn)
292 .with_option("async_insert", "1")
293 .with_option("wait_for_async_insert", "0"),
294 );
295 ensure_clickhouse_tables(client.as_ref()).await?;
296 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
297 Some(client)
298 } else {
299 None
300 };
301
302 for handle in plugin_handles.iter() {
303 if let Err(error) = handle
304 .plugin
305 .on_load(clickhouse.clone())
306 .await
307 .map_err(|e| e.to_string())
308 {
309 return Err(PluginRunnerError::PluginLifecycle {
310 plugin: handle.name,
311 stage: "on_load",
312 details: error,
313 });
314 }
315 }
316
317 let shutting_down = Arc::new(AtomicBool::new(false));
318 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
319 let clickhouse_enabled = clickhouse.is_some();
320 let slots_since_flush = Arc::new(AtomicU64::new(0));
321
322 let on_block = {
323 let plugin_handles = plugin_handles.clone();
324 let clickhouse = clickhouse.clone();
325 let slot_buffer = slot_buffer.clone();
326 let slots_since_flush = slots_since_flush.clone();
327 let shutting_down = shutting_down.clone();
328 move |thread_id: usize, block: BlockData| {
329 let plugin_handles = plugin_handles.clone();
330 let clickhouse = clickhouse.clone();
331 let slot_buffer = slot_buffer.clone();
332 let slots_since_flush = slots_since_flush.clone();
333 let shutting_down = shutting_down.clone();
334 async move {
335 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
336 if plugin_handles.is_empty() {
337 return Ok(());
338 }
339 if shutting_down.load(Ordering::SeqCst) {
340 log::debug!(
341 target: &log_target,
342 "ignoring block while shutdown is in progress"
343 );
344 return Ok(());
345 }
346 let block = Arc::new(block);
347 for handle in plugin_handles.iter() {
348 let db = clickhouse.clone();
349 if let Err(err) = handle
350 .plugin
351 .on_block(thread_id, db.clone(), block.as_ref())
352 .await
353 {
354 log::error!(
355 target: &log_target,
356 "plugin {} on_block error: {}",
357 handle.name,
358 err
359 );
360 continue;
361 }
362 if let (Some(db_client), BlockData::Block { slot, .. }) =
363 (clickhouse.clone(), block.as_ref())
364 {
365 if clickhouse_enabled {
366 slot_buffer
367 .entry(handle.id)
368 .or_default()
369 .push(PluginSlotRow {
370 plugin_id: handle.id as u32,
371 slot: *slot,
372 });
373 } else if let Err(err) =
374 record_plugin_slot(db_client, handle.id, *slot).await
375 {
376 log::error!(
377 target: &log_target,
378 "failed to record plugin slot for {}: {}",
379 handle.name,
380 err
381 );
382 }
383 }
384 }
385 if clickhouse_enabled {
386 let current = slots_since_flush
387 .fetch_add(1, Ordering::Relaxed)
388 .wrapping_add(1);
389 if current.is_multiple_of(db_update_interval)
390 && let Some(db_client) = clickhouse.clone()
391 {
392 let buffer = slot_buffer.clone();
393 let log_target_clone = log_target.clone();
394 tokio::spawn(async move {
395 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
396 log::error!(
397 target: &log_target_clone,
398 "failed to flush buffered plugin slots: {}",
399 err
400 );
401 }
402 });
403 }
404 }
405 if let Some(db_client) = clickhouse.clone() {
406 match block.as_ref() {
407 BlockData::Block {
408 slot,
409 executed_transaction_count,
410 block_time,
411 ..
412 } => {
413 if let Err(err) = record_slot_status(
414 db_client,
415 *slot,
416 thread_id,
417 *executed_transaction_count,
418 *block_time,
419 )
420 .await
421 {
422 log::error!(
423 target: &log_target,
424 "failed to record slot status: {}",
425 err
426 );
427 }
428 }
429 BlockData::PossibleLeaderSkipped { .. } => {}
430 }
431 }
432 Ok(())
433 }
434 .boxed()
435 }
436 };
437
438 let on_transaction = {
439 let plugin_handles = plugin_handles.clone();
440 let clickhouse = clickhouse.clone();
441 let shutting_down = shutting_down.clone();
442 move |thread_id: usize, transaction: TransactionData| {
443 let plugin_handles = plugin_handles.clone();
444 let clickhouse = clickhouse.clone();
445 let shutting_down = shutting_down.clone();
446 async move {
447 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
448 if plugin_handles.is_empty() {
449 return Ok(());
450 }
451 if shutting_down.load(Ordering::SeqCst) {
452 log::debug!(
453 target: &log_target,
454 "ignoring transaction while shutdown is in progress"
455 );
456 return Ok(());
457 }
458 let transaction = Arc::new(transaction);
459 for handle in plugin_handles.iter() {
460 if let Err(err) = handle
461 .plugin
462 .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
463 .await
464 {
465 log::error!(
466 target: &log_target,
467 "plugin {} on_transaction error: {}",
468 handle.name,
469 err
470 );
471 }
472 }
473 Ok(())
474 }
475 .boxed()
476 }
477 };
478
479 let on_entry = {
480 let plugin_handles = plugin_handles.clone();
481 let clickhouse = clickhouse.clone();
482 let shutting_down = shutting_down.clone();
483 move |thread_id: usize, entry: EntryData| {
484 let plugin_handles = plugin_handles.clone();
485 let clickhouse = clickhouse.clone();
486 let shutting_down = shutting_down.clone();
487 async move {
488 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
489 if plugin_handles.is_empty() {
490 return Ok(());
491 }
492 if shutting_down.load(Ordering::SeqCst) {
493 log::debug!(
494 target: &log_target,
495 "ignoring entry while shutdown is in progress"
496 );
497 return Ok(());
498 }
499 let entry = Arc::new(entry);
500 for handle in plugin_handles.iter() {
501 if let Err(err) = handle
502 .plugin
503 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
504 .await
505 {
506 log::error!(
507 target: &log_target,
508 "plugin {} on_entry error: {}",
509 handle.name,
510 err
511 );
512 }
513 }
514 Ok(())
515 }
516 .boxed()
517 }
518 };
519
520 let on_reward = {
521 let plugin_handles = plugin_handles.clone();
522 let clickhouse = clickhouse.clone();
523 let shutting_down = shutting_down.clone();
524 move |thread_id: usize, reward: RewardsData| {
525 let plugin_handles = plugin_handles.clone();
526 let clickhouse = clickhouse.clone();
527 let shutting_down = shutting_down.clone();
528 async move {
529 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
530 if plugin_handles.is_empty() {
531 return Ok(());
532 }
533 if shutting_down.load(Ordering::SeqCst) {
534 log::debug!(
535 target: &log_target,
536 "ignoring reward while shutdown is in progress"
537 );
538 return Ok(());
539 }
540 let reward = Arc::new(reward);
541 for handle in plugin_handles.iter() {
542 if let Err(err) = handle
543 .plugin
544 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
545 .await
546 {
547 log::error!(
548 target: &log_target,
549 "plugin {} on_reward error: {}",
550 handle.name,
551 err
552 );
553 }
554 }
555 Ok(())
556 }
557 .boxed()
558 }
559 };
560
561 let on_error = {
562 let plugin_handles = plugin_handles.clone();
563 let clickhouse = clickhouse.clone();
564 let shutting_down = shutting_down.clone();
565 move |thread_id: usize, context: FirehoseErrorContext| {
566 let plugin_handles = plugin_handles.clone();
567 let clickhouse = clickhouse.clone();
568 let shutting_down = shutting_down.clone();
569 async move {
570 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
571 if plugin_handles.is_empty() {
572 return Ok(());
573 }
574 if shutting_down.load(Ordering::SeqCst) {
575 log::debug!(
576 target: &log_target,
577 "ignoring error callback while shutdown is in progress"
578 );
579 return Ok(());
580 }
581 let context = Arc::new(context);
582 for handle in plugin_handles.iter() {
583 if let Err(err) = handle
584 .plugin
585 .on_error(thread_id, clickhouse.clone(), context.as_ref())
586 .await
587 {
588 log::error!(
589 target: &log_target,
590 "plugin {} on_error error: {}",
591 handle.name,
592 err
593 );
594 }
595 }
596 Ok(())
597 }
598 .boxed()
599 }
600 };
601
602 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
603
604 let total_slot_count_capture = total_slot_count;
605 let run_origin = std::time::Instant::now();
606 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
608 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
609 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
610 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
611 let stats_tracking = clickhouse.clone().map(|_db| {
612 let shutting_down = shutting_down.clone();
613 let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
614 StatsTracking {
615 on_stats: {
616 let thread_progress_max = thread_progress_max.clone();
617 let total_slot_count = total_slot_count_capture;
618 move |thread_id: usize, stats: Stats| {
619 let shutting_down = shutting_down.clone();
620 let thread_progress_max = thread_progress_max.clone();
621 async move {
622 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
623 if shutting_down.load(Ordering::SeqCst) {
624 log::debug!(
625 target: &log_target,
626 "skipping stats write during shutdown"
627 );
628 return Ok(());
629 }
630 let finish_at = stats
631 .finish_time
632 .unwrap_or_else(std::time::Instant::now);
633 let elapsed_since_start = finish_at
634 .saturating_duration_since(stats.start_time)
635 .as_nanos()
636 .max(1) as u64;
637 let total_slots = stats.slots_processed;
638 let total_txs = stats.transactions_processed;
639 let now_ns = monotonic_nanos_since(run_origin);
640 let (delta_slots, delta_txs, delta_time_ns) = {
644 while SNAPSHOT_LOCK
645 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
646 .is_err()
647 {
648 hint::spin_loop();
649 }
650 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
651 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
652 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
653 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
654 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
655 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
656 SNAPSHOT_LOCK.store(false, Ordering::Release);
657 let delta_slots = total_slots.saturating_sub(prev_slots);
658 let delta_txs = total_txs.saturating_sub(prev_txs);
659 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
660 (delta_slots, delta_txs, delta_time_ns)
661 };
662 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
663 let mut slot_rate = delta_slots as f64 / delta_secs;
664 let mut tps = delta_txs as f64 / delta_secs;
665 if slot_rate <= 0.0 && total_slots > 0 {
666 slot_rate =
667 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
668 }
669 if tps <= 0.0 && total_txs > 0 {
670 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
671 }
672 let thread_stats = &stats.thread_stats;
673 let processed_slots = stats.slots_processed.min(total_slot_count);
674 let progress_fraction = if total_slot_count > 0 {
675 processed_slots as f64 / total_slot_count as f64
676 } else {
677 1.0
678 };
679 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
680 let thread_total_slots = thread_stats
681 .initial_slot_range
682 .end
683 .saturating_sub(thread_stats.initial_slot_range.start);
684 let thread_progress_raw = if thread_total_slots > 0 {
685 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
686 .clamp(0.0, 1.0)
687 * 100.0
688 } else {
689 100.0
690 };
691 let thread_progress = *thread_progress_max
692 .entry(thread_id)
693 .and_modify(|max| {
694 if thread_progress_raw > *max {
695 *max = thread_progress_raw;
696 }
697 })
698 .or_insert(thread_progress_raw);
699 let mut overall_eta = None;
700 if slot_rate > 0.0 {
701 let remaining_slots =
702 total_slot_count.saturating_sub(processed_slots);
703 overall_eta = Some(human_readable_duration(
704 remaining_slots as f64 / slot_rate,
705 ));
706 }
707 if overall_eta.is_none() {
708 if progress_fraction > 0.0 && progress_fraction < 1.0 {
709 if let Some(elapsed_total) = finish_at
710 .checked_duration_since(stats.start_time)
711 .map(|d| d.as_secs_f64())
712 && elapsed_total > 0.0 {
713 let remaining_secs =
714 elapsed_total * (1.0 / progress_fraction - 1.0);
715 overall_eta = Some(human_readable_duration(remaining_secs));
716 }
717 } else if progress_fraction >= 1.0 {
718 overall_eta = Some("0s".into());
719 }
720 }
721 let slots_display = human_readable_count(processed_slots);
722 let blocks_display = human_readable_count(stats.blocks_processed);
723 let txs_display = human_readable_count(stats.transactions_processed);
724 let tps_display = human_readable_count(tps.ceil() as u64);
725 log::info!(
726 target: &log_target,
727 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
728 overall_eta.unwrap_or_else(|| "n/a".into()),
729 );
730 Ok(())
731 }
732 .boxed()
733 }
734 },
735 tracking_interval_slots: 100,
736 }
737 });
738
739 let (shutdown_tx, _) = broadcast::channel::<()>(1);
740
741 let mut firehose_future = Box::pin(firehose(
742 self.num_threads as u64,
743 slot_range,
744 Some(on_block),
745 Some(on_transaction),
746 Some(on_entry),
747 Some(on_reward),
748 Some(on_error),
749 stats_tracking,
750 Some(shutdown_tx.subscribe()),
751 ));
752
753 let firehose_result = tokio::select! {
754 res = &mut firehose_future => res,
755 ctrl = signal::ctrl_c() => {
756 match ctrl {
757 Ok(()) => log::info!(
758 target: LOG_MODULE,
759 "CTRL+C received; initiating shutdown"
760 ),
761 Err(err) => log::error!(
762 target: LOG_MODULE,
763 "failed to listen for CTRL+C: {}",
764 err
765 ),
766 }
767 shutting_down.store(true, Ordering::SeqCst);
768 let _ = shutdown_tx.send(());
769 firehose_future.await
770 }
771 };
772
773 if clickhouse_enabled
774 && let Some(db_client) = clickhouse.clone()
775 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
776 {
777 log::error!(
778 target: LOG_MODULE,
779 "failed to flush buffered plugin slots: {}",
780 err
781 );
782 }
783
784 for handle in plugin_handles.iter() {
785 if let Err(error) = handle
786 .plugin
787 .on_exit(clickhouse.clone())
788 .await
789 .map_err(|e| e.to_string())
790 {
791 log::error!(
792 target: LOG_MODULE,
793 "plugin {} on_exit error: {}",
794 handle.name,
795 error
796 );
797 }
798 }
799
800 match firehose_result {
801 Ok(()) => Ok(()),
802 Err((error, slot)) => Err(PluginRunnerError::Firehose {
803 details: error.to_string(),
804 slot,
805 }),
806 }
807 }
808}
809
810#[derive(Debug, Error)]
812pub enum PluginRunnerError {
813 #[error("clickhouse error: {0}")]
815 Clickhouse(#[from] clickhouse::error::Error),
816 #[error("firehose error at slot {slot}: {details}")]
818 Firehose {
819 details: String,
821 slot: u64,
823 },
824 #[error("plugin {plugin} failed during {stage}: {details}")]
826 PluginLifecycle {
827 plugin: &'static str,
829 stage: &'static str,
831 details: String,
833 },
834}
835
836#[derive(Clone)]
837struct PluginHandle {
838 plugin: Arc<dyn Plugin>,
839 id: u16,
840 name: &'static str,
841 version: u16,
842}
843
844impl From<Arc<dyn Plugin>> for PluginHandle {
845 fn from(plugin: Arc<dyn Plugin>) -> Self {
846 let id = plugin.id();
847 let name = plugin.name();
848 let version = plugin.version();
849 Self {
850 plugin,
851 id,
852 name,
853 version,
854 }
855 }
856}
857
858#[derive(Row, Serialize)]
859struct PluginRow<'a> {
860 id: u32,
861 name: &'a str,
862 version: u32,
863}
864
865#[derive(Row, Serialize)]
866struct PluginSlotRow {
867 plugin_id: u32,
868 slot: u64,
869}
870
871#[derive(Row, Serialize)]
872struct SlotStatusRow {
873 slot: u64,
874 transaction_count: u32,
875 thread_id: u8,
876 block_time: u32,
877}
878
879async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
880 db.query(
881 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
882 slot UInt64,
883 transaction_count UInt32 DEFAULT 0,
884 thread_id UInt8 DEFAULT 0,
885 block_time DateTime('UTC') DEFAULT toDateTime(0),
886 indexed_at DateTime('UTC') DEFAULT now()
887 ) ENGINE = ReplacingMergeTree(indexed_at)
888 ORDER BY slot"#,
889 )
890 .execute()
891 .await?;
892
893 db.query(
894 r#"ALTER TABLE jetstreamer_slot_status
895 ADD COLUMN IF NOT EXISTS block_time DateTime('UTC') DEFAULT toDateTime(0)"#,
896 )
897 .execute()
898 .await?;
899
900 db.query(
901 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
902 id UInt32,
903 name String,
904 version UInt32
905 ) ENGINE = ReplacingMergeTree
906 ORDER BY id"#,
907 )
908 .execute()
909 .await?;
910
911 db.query(
912 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
913 plugin_id UInt32,
914 slot UInt64,
915 indexed_at DateTime('UTC') DEFAULT now()
916 ) ENGINE = ReplacingMergeTree
917 ORDER BY (plugin_id, slot)"#,
918 )
919 .execute()
920 .await?;
921
922 Ok(())
923}
924
925async fn upsert_plugins(
926 db: &Client,
927 plugins: &[PluginHandle],
928) -> Result<(), clickhouse::error::Error> {
929 if plugins.is_empty() {
930 return Ok(());
931 }
932 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
933 for handle in plugins {
934 insert
935 .write(&PluginRow {
936 id: handle.id as u32,
937 name: handle.name,
938 version: handle.version as u32,
939 })
940 .await?;
941 }
942 insert.end().await?;
943 Ok(())
944}
945
946async fn record_plugin_slot(
947 db: Arc<Client>,
948 plugin_id: u16,
949 slot: u64,
950) -> Result<(), clickhouse::error::Error> {
951 let mut insert = db
952 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
953 .await?;
954 insert
955 .write(&PluginSlotRow {
956 plugin_id: plugin_id as u32,
957 slot,
958 })
959 .await?;
960 insert.end().await?;
961 Ok(())
962}
963
964async fn flush_slot_buffer(
965 db: Arc<Client>,
966 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
967) -> Result<(), clickhouse::error::Error> {
968 let mut rows = Vec::new();
969 buffer.iter_mut().for_each(|mut entry| {
970 if !entry.value().is_empty() {
971 rows.append(entry.value_mut());
972 }
973 });
974
975 if rows.is_empty() {
976 return Ok(());
977 }
978
979 let mut insert = db
980 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
981 .await?;
982 for row in rows {
983 insert.write(&row).await?;
984 }
985 insert.end().await?;
986 Ok(())
987}
988
989async fn record_slot_status(
990 db: Arc<Client>,
991 slot: u64,
992 thread_id: usize,
993 transaction_count: u64,
994 block_time: Option<i64>,
995) -> Result<(), clickhouse::error::Error> {
996 let mut insert = db
997 .insert::<SlotStatusRow>("jetstreamer_slot_status")
998 .await?;
999 insert
1000 .write(&SlotStatusRow {
1001 slot,
1002 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1003 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1004 block_time: clamp_block_time(block_time),
1005 })
1006 .await?;
1007 insert.end().await?;
1008 Ok(())
1009}
1010
1011fn clamp_block_time(block_time: Option<i64>) -> u32 {
1012 match block_time {
1013 Some(ts) if ts > 0 && ts <= u32::MAX as i64 => ts as u32,
1014 Some(ts) if ts > u32::MAX as i64 => u32::MAX,
1015 Some(ts) if ts < 0 => 0,
1016 _ => 0,
1017 }
1018}
1019
1020trait _CanSend: Send + Sync + 'static {}
1022impl _CanSend for PluginRunnerError {}
1023
1024#[inline]
1025fn human_readable_count(value: impl Into<u128>) -> String {
1026 let digits = value.into().to_string();
1027 let len = digits.len();
1028 let mut formatted = String::with_capacity(len + len / 3);
1029 for (idx, byte) in digits.bytes().enumerate() {
1030 if idx != 0 && (len - idx) % 3 == 0 {
1031 formatted.push(',');
1032 }
1033 formatted.push(char::from(byte));
1034 }
1035 formatted
1036}
1037
1038fn human_readable_duration(seconds: f64) -> String {
1039 if !seconds.is_finite() {
1040 return "n/a".into();
1041 }
1042 if seconds <= 0.0 {
1043 return "0s".into();
1044 }
1045 if seconds < 60.0 {
1046 return format!("{:.1}s", seconds);
1047 }
1048 let duration = Duration::from_secs(seconds.round() as u64);
1049 let secs = duration.as_secs();
1050 let days = secs / 86_400;
1051 let hours = (secs % 86_400) / 3_600;
1052 let minutes = (secs % 3_600) / 60;
1053 let seconds_rem = secs % 60;
1054 if days > 0 {
1055 if hours > 0 {
1056 format!("{}d{}h", days, hours)
1057 } else {
1058 format!("{}d", days)
1059 }
1060 } else if hours > 0 {
1061 format!("{}h{}m", hours, minutes)
1062 } else {
1063 format!("{}m{}s", minutes, seconds_rem)
1064 }
1065}