1#![deny(missing_docs)]
2pub mod plugins;
110
/// Base `log` target for this module; per-thread targets are derived from it by
/// appending `::T<thread id>`.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
139pub use jetstreamer_firehose::firehose::{
141 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
// Snapshot of the cumulative totals seen by the most recent stats callback. The stats
// callback subtracts these from the current totals to obtain per-interval deltas.

/// Cumulative slot count recorded by the previous stats callback.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
/// Cumulative transaction count recorded by the previous stats callback.
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
/// Nanoseconds since the run origin at which the previous stats callback ran.
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
/// Spin flag taken (false -> true) while the three `LAST_TOTAL_*` values are read and
/// replaced, so that they are always updated as one consistent group.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
/// Returns the number of nanoseconds elapsed since `origin`.
///
/// `Duration::as_nanos` yields a `u128`; the value is saturated to `u64::MAX` instead of
/// being silently truncated should it ever exceed the `u64` range.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    u64::try_from(origin.elapsed().as_nanos()).unwrap_or(u64::MAX)
}
153
/// Boxed, pinned, `Send` future returned by every [`Plugin`] hook.
///
/// The future resolves to `Ok(())` on success or to a boxed, thread-safe error. The
/// lifetime `'a` lets the future borrow from the plugin and from the data passed to the
/// hook.
pub type PluginFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
            + Send
            + 'a,
    >,
>;
162
163pub trait Plugin: Send + Sync + 'static {
167 fn name(&self) -> &'static str;
169
170 fn version(&self) -> u16 {
172 1
173 }
174
175 fn id(&self) -> u16 {
177 let hash = Sha256::digest(self.name());
178 let mut res = 1u16;
179 for byte in hash {
180 res = res.wrapping_mul(31).wrapping_add(byte as u16);
181 }
182 res
183 }
184
185 fn on_transaction<'a>(
187 &'a self,
188 _thread_id: usize,
189 _db: Option<Arc<Client>>,
190 _transaction: &'a TransactionData,
191 ) -> PluginFuture<'a> {
192 async move { Ok(()) }.boxed()
193 }
194
195 fn on_block<'a>(
197 &'a self,
198 _thread_id: usize,
199 _db: Option<Arc<Client>>,
200 _block: &'a BlockData,
201 ) -> PluginFuture<'a> {
202 async move { Ok(()) }.boxed()
203 }
204
205 fn on_entry<'a>(
207 &'a self,
208 _thread_id: usize,
209 _db: Option<Arc<Client>>,
210 _entry: &'a EntryData,
211 ) -> PluginFuture<'a> {
212 async move { Ok(()) }.boxed()
213 }
214
215 fn on_reward<'a>(
217 &'a self,
218 _thread_id: usize,
219 _db: Option<Arc<Client>>,
220 _reward: &'a RewardsData,
221 ) -> PluginFuture<'a> {
222 async move { Ok(()) }.boxed()
223 }
224
225 fn on_error<'a>(
227 &'a self,
228 _thread_id: usize,
229 _db: Option<Arc<Client>>,
230 _error: &'a FirehoseErrorContext,
231 ) -> PluginFuture<'a> {
232 async move { Ok(()) }.boxed()
233 }
234
235 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237 async move { Ok(()) }.boxed()
238 }
239
240 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242 async move { Ok(()) }.boxed()
243 }
244}
245
#[derive(Clone)]
/// Drives a set of registered [`Plugin`]s over a slot range streamed by the firehose,
/// optionally recording progress in ClickHouse.
pub struct PluginRunner {
    // Registered plugins. `register` requires this `Arc` to be uniquely owned.
    plugins: Arc<Vec<Arc<dyn Plugin>>>,
    // Connection string used to build the ClickHouse client when ClickHouse is enabled.
    clickhouse_dsn: String,
    // Thread count handed to the firehose; `new` clamps it to at least 1.
    num_threads: usize,
    // Forwarded unchanged to the firehose.
    sequential: bool,
    // Forwarded unchanged to the firehose.
    buffer_window_bytes: Option<u64>,
    // Number of handled blocks between flushes of the buffered plugin-slot rows.
    db_update_interval_slots: u64,
}
258
259impl PluginRunner {
260 pub fn new(
265 clickhouse_dsn: impl Display,
266 num_threads: usize,
267 sequential: bool,
268 buffer_window_bytes: Option<u64>,
269 ) -> Self {
270 Self {
271 plugins: Arc::new(Vec::new()),
272 clickhouse_dsn: clickhouse_dsn.to_string(),
273 num_threads: std::cmp::max(1, num_threads),
274 sequential,
275 buffer_window_bytes,
276 db_update_interval_slots: 100,
277 }
278 }
279
280 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
282 Arc::get_mut(&mut self.plugins)
283 .expect("cannot register plugins after the runner has started")
284 .push(Arc::from(plugin));
285 }
286
287 pub async fn run(
289 self: Arc<Self>,
290 slot_range: Range<u64>,
291 clickhouse_enabled: bool,
292 ) -> Result<(), PluginRunnerError> {
293 let db_update_interval = self.db_update_interval_slots.max(1);
294 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
295 self.plugins
296 .iter()
297 .cloned()
298 .map(PluginHandle::from)
299 .collect(),
300 );
301
302 let clickhouse = if clickhouse_enabled {
303 let client = Arc::new(
304 build_clickhouse_client(&self.clickhouse_dsn)
305 .with_option("async_insert", "1")
306 .with_option("wait_for_async_insert", "0"),
307 );
308 ensure_clickhouse_tables(client.as_ref()).await?;
309 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
310 Some(client)
311 } else {
312 None
313 };
314
315 for handle in plugin_handles.iter() {
316 if let Err(error) = handle
317 .plugin
318 .on_load(clickhouse.clone())
319 .await
320 .map_err(|e| e.to_string())
321 {
322 return Err(PluginRunnerError::PluginLifecycle {
323 plugin: handle.name,
324 stage: "on_load",
325 details: error,
326 });
327 }
328 }
329
330 let shutting_down = Arc::new(AtomicBool::new(false));
331 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>> =
332 Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
333 let clickhouse_enabled = clickhouse.is_some();
334 let slots_since_flush = Arc::new(AtomicU64::new(0));
335
336 let on_block = {
337 let plugin_handles = plugin_handles.clone();
338 let clickhouse = clickhouse.clone();
339 let slot_buffer = slot_buffer.clone();
340 let slots_since_flush = slots_since_flush.clone();
341 let shutting_down = shutting_down.clone();
342 move |thread_id: usize, block: BlockData| {
343 let plugin_handles = plugin_handles.clone();
344 let clickhouse = clickhouse.clone();
345 let slot_buffer = slot_buffer.clone();
346 let slots_since_flush = slots_since_flush.clone();
347 let shutting_down = shutting_down.clone();
348 async move {
349 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
350 if shutting_down.load(Ordering::SeqCst) {
351 log::debug!(
352 target: &log_target,
353 "ignoring block while shutdown is in progress"
354 );
355 return Ok(());
356 }
357 let block = Arc::new(block);
358 if !plugin_handles.is_empty() {
359 for handle in plugin_handles.iter() {
360 let db = clickhouse.clone();
361 if let Err(err) = handle
362 .plugin
363 .on_block(thread_id, db.clone(), block.as_ref())
364 .await
365 {
366 log::error!(
367 target: &log_target,
368 "plugin {} on_block error: {}",
369 handle.name,
370 err
371 );
372 continue;
373 }
374 if let (Some(db_client), BlockData::Block { slot, .. }) =
375 (clickhouse.clone(), block.as_ref())
376 {
377 if clickhouse_enabled {
378 slot_buffer
379 .entry(handle.id)
380 .or_default()
381 .push(PluginSlotRow {
382 plugin_id: handle.id as u32,
383 slot: *slot,
384 });
385 } else if let Err(err) =
386 record_plugin_slot(db_client, handle.id, *slot).await
387 {
388 log::error!(
389 target: &log_target,
390 "failed to record plugin slot for {}: {}",
391 handle.name,
392 err
393 );
394 }
395 }
396 }
397 if clickhouse_enabled {
398 let current = slots_since_flush
399 .fetch_add(1, Ordering::Relaxed)
400 .wrapping_add(1);
401 if current.is_multiple_of(db_update_interval)
402 && let Some(db_client) = clickhouse.clone()
403 {
404 let buffer = slot_buffer.clone();
405 let log_target_clone = log_target.clone();
406 tokio::spawn(async move {
407 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
408 log::error!(
409 target: &log_target_clone,
410 "failed to flush buffered plugin slots: {}",
411 err
412 );
413 }
414 });
415 }
416 }
417 }
418 if let Some(db_client) = clickhouse.clone() {
419 match block.as_ref() {
420 BlockData::Block {
421 slot,
422 executed_transaction_count,
423 block_time,
424 ..
425 } => {
426 let tally = take_slot_tx_tally(*slot);
427 let slot = *slot;
428 let executed_transaction_count = *executed_transaction_count;
429 let block_time = *block_time;
430 let log_target_clone = log_target.clone();
431 tokio::spawn(async move {
432 if let Err(err) = record_slot_status(
433 db_client,
434 slot,
435 thread_id,
436 executed_transaction_count,
437 tally.votes,
438 tally.non_votes,
439 block_time,
440 )
441 .await
442 {
443 log::error!(
444 target: &log_target_clone,
445 "failed to record slot status: {}",
446 err
447 );
448 }
449 });
450 }
451 BlockData::PossibleLeaderSkipped { slot } => {
452 take_slot_tx_tally(*slot);
454 }
455 }
456 }
457 Ok(())
458 }
459 .boxed()
460 }
461 };
462
463 let on_transaction = {
464 let plugin_handles = plugin_handles.clone();
465 let clickhouse = clickhouse.clone();
466 let shutting_down = shutting_down.clone();
467 move |thread_id: usize, transaction: TransactionData| {
468 let plugin_handles = plugin_handles.clone();
469 let clickhouse = clickhouse.clone();
470 let shutting_down = shutting_down.clone();
471 async move {
472 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
473 record_slot_vote_tally(transaction.slot, transaction.is_vote);
474 if plugin_handles.is_empty() {
475 return Ok(());
476 }
477 if shutting_down.load(Ordering::SeqCst) {
478 log::debug!(
479 target: &log_target,
480 "ignoring transaction while shutdown is in progress"
481 );
482 return Ok(());
483 }
484 for handle in plugin_handles.iter() {
485 if let Err(err) = handle
486 .plugin
487 .on_transaction(thread_id, clickhouse.clone(), &transaction)
488 .await
489 {
490 log::error!(
491 target: &log_target,
492 "plugin {} on_transaction error: {}",
493 handle.name,
494 err
495 );
496 }
497 }
498 Ok(())
499 }
500 .boxed()
501 }
502 };
503
504 let on_entry = {
505 let plugin_handles = plugin_handles.clone();
506 let clickhouse = clickhouse.clone();
507 let shutting_down = shutting_down.clone();
508 move |thread_id: usize, entry: EntryData| {
509 let plugin_handles = plugin_handles.clone();
510 let clickhouse = clickhouse.clone();
511 let shutting_down = shutting_down.clone();
512 async move {
513 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
514 if plugin_handles.is_empty() {
515 return Ok(());
516 }
517 if shutting_down.load(Ordering::SeqCst) {
518 log::debug!(
519 target: &log_target,
520 "ignoring entry while shutdown is in progress"
521 );
522 return Ok(());
523 }
524 let entry = Arc::new(entry);
525 for handle in plugin_handles.iter() {
526 if let Err(err) = handle
527 .plugin
528 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
529 .await
530 {
531 log::error!(
532 target: &log_target,
533 "plugin {} on_entry error: {}",
534 handle.name,
535 err
536 );
537 }
538 }
539 Ok(())
540 }
541 .boxed()
542 }
543 };
544
545 let on_reward = {
546 let plugin_handles = plugin_handles.clone();
547 let clickhouse = clickhouse.clone();
548 let shutting_down = shutting_down.clone();
549 move |thread_id: usize, reward: RewardsData| {
550 let plugin_handles = plugin_handles.clone();
551 let clickhouse = clickhouse.clone();
552 let shutting_down = shutting_down.clone();
553 async move {
554 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
555 if plugin_handles.is_empty() {
556 return Ok(());
557 }
558 if shutting_down.load(Ordering::SeqCst) {
559 log::debug!(
560 target: &log_target,
561 "ignoring reward while shutdown is in progress"
562 );
563 return Ok(());
564 }
565 let reward = Arc::new(reward);
566 for handle in plugin_handles.iter() {
567 if let Err(err) = handle
568 .plugin
569 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
570 .await
571 {
572 log::error!(
573 target: &log_target,
574 "plugin {} on_reward error: {}",
575 handle.name,
576 err
577 );
578 }
579 }
580 Ok(())
581 }
582 .boxed()
583 }
584 };
585
586 let on_error = {
587 let plugin_handles = plugin_handles.clone();
588 let clickhouse = clickhouse.clone();
589 let shutting_down = shutting_down.clone();
590 move |thread_id: usize, context: FirehoseErrorContext| {
591 let plugin_handles = plugin_handles.clone();
592 let clickhouse = clickhouse.clone();
593 let shutting_down = shutting_down.clone();
594 async move {
595 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
596 if plugin_handles.is_empty() {
597 return Ok(());
598 }
599 if shutting_down.load(Ordering::SeqCst) {
600 log::debug!(
601 target: &log_target,
602 "ignoring error callback while shutdown is in progress"
603 );
604 return Ok(());
605 }
606 let context = Arc::new(context);
607 for handle in plugin_handles.iter() {
608 if let Err(err) = handle
609 .plugin
610 .on_error(thread_id, clickhouse.clone(), context.as_ref())
611 .await
612 {
613 log::error!(
614 target: &log_target,
615 "plugin {} on_error error: {}",
616 handle.name,
617 err
618 );
619 }
620 }
621 Ok(())
622 }
623 .boxed()
624 }
625 };
626
627 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
628
629 let total_slot_count_capture = total_slot_count;
630 let run_origin = std::time::Instant::now();
631 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
633 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
634 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
635 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
636 let stats_tracking = clickhouse.clone().map(|_db| {
637 let shutting_down = shutting_down.clone();
638 let thread_progress_max: Arc<DashMap<usize, f64, ahash::RandomState>> = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
639 StatsTracking {
640 on_stats: {
641 let thread_progress_max = thread_progress_max.clone();
642 let total_slot_count = total_slot_count_capture;
643 move |thread_id: usize, stats: Stats| {
644 let shutting_down = shutting_down.clone();
645 let thread_progress_max = thread_progress_max.clone();
646 async move {
647 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
648 if shutting_down.load(Ordering::SeqCst) {
649 log::debug!(
650 target: &log_target,
651 "skipping stats write during shutdown"
652 );
653 return Ok(());
654 }
655 let finish_at = stats
656 .finish_time
657 .unwrap_or_else(std::time::Instant::now);
658 let elapsed_since_start = finish_at
659 .saturating_duration_since(stats.start_time)
660 .as_nanos()
661 .max(1) as u64;
662 let total_slots = stats.slots_processed;
663 let total_txs = stats.transactions_processed;
664 let now_ns = monotonic_nanos_since(run_origin);
665 let (delta_slots, delta_txs, delta_time_ns) = {
669 while SNAPSHOT_LOCK
670 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
671 .is_err()
672 {
673 hint::spin_loop();
674 }
675 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
676 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
677 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
678 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
679 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
680 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
681 SNAPSHOT_LOCK.store(false, Ordering::Release);
682 let delta_slots = total_slots.saturating_sub(prev_slots);
683 let delta_txs = total_txs.saturating_sub(prev_txs);
684 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
685 (delta_slots, delta_txs, delta_time_ns)
686 };
687 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
688 let mut slot_rate = delta_slots as f64 / delta_secs;
689 let mut tps = delta_txs as f64 / delta_secs;
690 if slot_rate <= 0.0 && total_slots > 0 {
691 slot_rate =
692 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
693 }
694 if tps <= 0.0 && total_txs > 0 {
695 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
696 }
697 let thread_stats = &stats.thread_stats;
698 let processed_slots = stats.slots_processed.min(total_slot_count);
699 let progress_fraction = if total_slot_count > 0 {
700 processed_slots as f64 / total_slot_count as f64
701 } else {
702 1.0
703 };
704 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
705 let thread_total_slots = thread_stats
706 .initial_slot_range
707 .end
708 .saturating_sub(thread_stats.initial_slot_range.start);
709 let thread_progress_raw = if thread_total_slots > 0 {
710 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
711 .clamp(0.0, 1.0)
712 * 100.0
713 } else {
714 100.0
715 };
716 let thread_progress = *thread_progress_max
717 .entry(thread_id)
718 .and_modify(|max| {
719 if thread_progress_raw > *max {
720 *max = thread_progress_raw;
721 }
722 })
723 .or_insert(thread_progress_raw);
724 let mut overall_eta = None;
725 if slot_rate > 0.0 {
726 let remaining_slots =
727 total_slot_count.saturating_sub(processed_slots);
728 overall_eta = Some(human_readable_duration(
729 remaining_slots as f64 / slot_rate,
730 ));
731 }
732 if overall_eta.is_none() {
733 if progress_fraction > 0.0 && progress_fraction < 1.0 {
734 if let Some(elapsed_total) = finish_at
735 .checked_duration_since(stats.start_time)
736 .map(|d| d.as_secs_f64())
737 && elapsed_total > 0.0 {
738 let remaining_secs =
739 elapsed_total * (1.0 / progress_fraction - 1.0);
740 overall_eta = Some(human_readable_duration(remaining_secs));
741 }
742 } else if progress_fraction >= 1.0 {
743 overall_eta = Some("0s".into());
744 }
745 }
746 let slots_display = human_readable_count(processed_slots);
747 let blocks_display = human_readable_count(stats.blocks_processed);
748 let txs_display = human_readable_count(stats.transactions_processed);
749 let tps_display = human_readable_count(tps.ceil() as u64);
750 log::info!(
751 target: &log_target,
752 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
753 overall_eta.unwrap_or_else(|| "n/a".into()),
754 );
755 Ok(())
756 }
757 .boxed()
758 }
759 },
760 tracking_interval_slots: 100,
761 }
762 });
763
764 let (shutdown_tx, _) = broadcast::channel::<()>(1);
765
766 let mut firehose_future = Box::pin(firehose(
767 self.num_threads as u64,
768 self.sequential,
769 self.buffer_window_bytes,
770 slot_range,
771 Some(on_block),
772 Some(on_transaction),
773 Some(on_entry),
774 Some(on_reward),
775 Some(on_error),
776 stats_tracking,
777 Some(shutdown_tx.subscribe()),
778 ));
779
780 let firehose_result = tokio::select! {
781 res = &mut firehose_future => res,
782 ctrl = signal::ctrl_c() => {
783 match ctrl {
784 Ok(()) => log::info!(
785 target: LOG_MODULE,
786 "CTRL+C received; initiating shutdown"
787 ),
788 Err(err) => log::error!(
789 target: LOG_MODULE,
790 "failed to listen for CTRL+C: {}",
791 err
792 ),
793 }
794 shutting_down.store(true, Ordering::SeqCst);
795 let _ = shutdown_tx.send(());
796 firehose_future.await
797 }
798 };
799
800 if clickhouse_enabled
801 && let Some(db_client) = clickhouse.clone()
802 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
803 {
804 log::error!(
805 target: LOG_MODULE,
806 "failed to flush buffered plugin slots: {}",
807 err
808 );
809 }
810
811 for handle in plugin_handles.iter() {
812 if let Err(error) = handle
813 .plugin
814 .on_exit(clickhouse.clone())
815 .await
816 .map_err(|e| e.to_string())
817 {
818 log::error!(
819 target: LOG_MODULE,
820 "plugin {} on_exit error: {}",
821 handle.name,
822 error
823 );
824 }
825 }
826
827 match firehose_result {
828 Ok(()) => Ok(()),
829 Err((error, slot)) => Err(PluginRunnerError::Firehose {
830 details: error.to_string(),
831 slot,
832 }),
833 }
834 }
835}
836
837fn build_clickhouse_client(dsn: &str) -> Client {
838 let mut client = Client::default();
839 if let Ok(mut url) = Url::parse(dsn) {
840 let username = url.username().to_string();
841 let password = url.password().map(|value| value.to_string());
842 if !username.is_empty() || password.is_some() {
843 let _ = url.set_username("");
844 let _ = url.set_password(None);
845 }
846 client = client.with_url(url.as_str());
847 if !username.is_empty() {
848 client = client.with_user(username);
849 }
850 if let Some(password) = password {
851 client = client.with_password(password);
852 }
853 } else {
854 client = client.with_url(dsn);
855 }
856 client
857}
858
#[derive(Debug, Error)]
/// Errors returned by [`PluginRunner::run`].
pub enum PluginRunnerError {
    /// A ClickHouse operation failed.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// The firehose returned an error.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// String rendering of the underlying firehose error.
        details: String,
        /// Slot reported by the firehose together with the error.
        slot: u64,
    },
    /// A plugin lifecycle hook failed.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin whose hook failed.
        plugin: &'static str,
        /// Name of the lifecycle stage that failed (e.g. `on_load`).
        stage: &'static str,
        /// String rendering of the error returned by the hook.
        details: String,
    },
}
884
/// A plugin together with its id, name and version, captured once when the handle is
/// built so they are not recomputed on every event.
#[derive(Clone)]
struct PluginHandle {
    // The plugin itself, shared between all callbacks.
    plugin: Arc<dyn Plugin>,
    // Value of `Plugin::id()` at handle creation.
    id: u16,
    // Value of `Plugin::name()` at handle creation.
    name: &'static str,
    // Value of `Plugin::version()` at handle creation.
    version: u16,
}
892
893impl From<Arc<dyn Plugin>> for PluginHandle {
894 fn from(plugin: Arc<dyn Plugin>) -> Self {
895 let id = plugin.id();
896 let name = plugin.name();
897 let version = plugin.version();
898 Self {
899 plugin,
900 id,
901 name,
902 version,
903 }
904 }
905}
906
/// Row written to the `jetstreamer_plugins` table.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    // Plugin id, widened from `u16`.
    id: u32,
    // Plugin name.
    name: &'a str,
    // Plugin version, widened from `u16`.
    version: u32,
}
913
/// Row written to the `jetstreamer_plugin_slots` table: records that a plugin's
/// `on_block` hook completed for a slot.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    // Plugin id, widened from `u16`.
    plugin_id: u32,
    // Slot number of the block.
    slot: u64,
}
919
/// Row written to the `jetstreamer_slot_status` table.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    // Slot number of the block.
    slot: u64,
    // Executed transaction count reported with the block, saturated to `u32`.
    transaction_count: u32,
    // Vote transactions tallied for the slot, saturated to `u32`.
    vote_transaction_count: u32,
    // Non-vote transactions tallied for the slot, saturated to `u32`.
    non_vote_transaction_count: u32,
    // Firehose thread id, saturated to `u8`.
    thread_id: u8,
    // Block time clamped into the `u32` range; 0 when absent or not positive.
    block_time: u32,
}
929
/// Running count of vote and non-vote transactions seen for one slot.
#[derive(Default, Clone, Copy)]
struct SlotTxTally {
    // Number of transactions flagged as votes.
    votes: u64,
    // Number of transactions not flagged as votes.
    non_votes: u64,
}
935
/// Process-wide map from slot number to its transaction tally. Entries are created by
/// `record_slot_vote_tally` and removed by `take_slot_tx_tally`.
static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally, ahash::RandomState>> =
    Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
938
939async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
940 db.query(
941 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
942 slot UInt64,
943 transaction_count UInt32 DEFAULT 0,
944 vote_transaction_count UInt32 DEFAULT 0,
945 non_vote_transaction_count UInt32 DEFAULT 0,
946 thread_id UInt8 DEFAULT 0,
947 block_time DateTime('UTC') DEFAULT toDateTime(0),
948 indexed_at DateTime('UTC') DEFAULT now()
949 ) ENGINE = ReplacingMergeTree(indexed_at)
950 ORDER BY slot"#,
951 )
952 .execute()
953 .await?;
954
955 db.query(
956 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
957 id UInt32,
958 name String,
959 version UInt32
960 ) ENGINE = ReplacingMergeTree
961 ORDER BY id"#,
962 )
963 .execute()
964 .await?;
965
966 db.query(
967 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
968 plugin_id UInt32,
969 slot UInt64,
970 indexed_at DateTime('UTC') DEFAULT now()
971 ) ENGINE = ReplacingMergeTree
972 ORDER BY (plugin_id, slot)"#,
973 )
974 .execute()
975 .await?;
976
977 Ok(())
978}
979
980async fn upsert_plugins(
981 db: &Client,
982 plugins: &[PluginHandle],
983) -> Result<(), clickhouse::error::Error> {
984 if plugins.is_empty() {
985 return Ok(());
986 }
987 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
988 for handle in plugins {
989 insert
990 .write(&PluginRow {
991 id: handle.id as u32,
992 name: handle.name,
993 version: handle.version as u32,
994 })
995 .await?;
996 }
997 insert.end().await?;
998 Ok(())
999}
1000
1001async fn record_plugin_slot(
1002 db: Arc<Client>,
1003 plugin_id: u16,
1004 slot: u64,
1005) -> Result<(), clickhouse::error::Error> {
1006 let mut insert = db
1007 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1008 .await?;
1009 insert
1010 .write(&PluginSlotRow {
1011 plugin_id: plugin_id as u32,
1012 slot,
1013 })
1014 .await?;
1015 insert.end().await?;
1016 Ok(())
1017}
1018
1019async fn flush_slot_buffer(
1020 db: Arc<Client>,
1021 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>>,
1022) -> Result<(), clickhouse::error::Error> {
1023 let mut rows = Vec::new();
1024 buffer.iter_mut().for_each(|mut entry| {
1025 if !entry.value().is_empty() {
1026 rows.append(entry.value_mut());
1027 }
1028 });
1029
1030 if rows.is_empty() {
1031 return Ok(());
1032 }
1033
1034 let mut insert = db
1035 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1036 .await?;
1037 for row in rows {
1038 insert.write(&row).await?;
1039 }
1040 insert.end().await?;
1041 Ok(())
1042}
1043
1044async fn record_slot_status(
1045 db: Arc<Client>,
1046 slot: u64,
1047 thread_id: usize,
1048 transaction_count: u64,
1049 vote_transaction_count: u64,
1050 non_vote_transaction_count: u64,
1051 block_time: Option<i64>,
1052) -> Result<(), clickhouse::error::Error> {
1053 let mut insert = db
1054 .insert::<SlotStatusRow>("jetstreamer_slot_status")
1055 .await?;
1056 insert
1057 .write(&SlotStatusRow {
1058 slot,
1059 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1060 vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1061 non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1062 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1063 block_time: clamp_block_time(block_time),
1064 })
1065 .await?;
1066 insert.end().await?;
1067 Ok(())
1068}
1069
/// Converts an optional signed block time into a `u32`.
///
/// `None`, zero and negative values map to `0`; values above `u32::MAX` map to
/// `u32::MAX`; everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        // A positive value that does not fit in `u32` can only be too large.
        Some(ts) if ts > 0 => u32::try_from(ts).unwrap_or(u32::MAX),
        _ => 0,
    }
}
1078
1079fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1080 let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1081 if is_vote {
1082 entry.votes = entry.votes.saturating_add(1);
1083 } else {
1084 entry.non_votes = entry.non_votes.saturating_add(1);
1085 }
1086}
1087
1088fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1089 SLOT_TX_TALLY
1090 .remove(&slot)
1091 .map(|(_, tally)| tally)
1092 .unwrap_or_default()
1093}
1094
// Compile-time check: this impl only type-checks while `PluginRunnerError` is
// `Send + Sync + 'static`.
trait _CanSend: Send + Sync + 'static {}
impl _CanSend for PluginRunnerError {}
1098
/// Formats an unsigned integer with `,` as thousands separator, e.g. `1234567` becomes
/// `"1,234,567"`.
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let digits = value.into().to_string();
    let total = digits.len();

    // The leading group holds 1 to 3 digits; every following group holds exactly 3.
    let head_len = match total % 3 {
        0 => 3,
        partial => partial,
    };
    let (head, rest) = digits.split_at(head_len.min(total));

    let mut grouped = String::with_capacity(total + total / 3);
    grouped.push_str(head);
    for triple in rest.as_bytes().chunks(3) {
        grouped.push(',');
        grouped.extend(triple.iter().map(|&digit| char::from(digit)));
    }
    grouped
}
1112
/// Renders a number of seconds as a short human-readable duration.
///
/// * non-finite input gives `"n/a"`; zero or negative input gives `"0s"`
/// * below one minute: seconds with one decimal, e.g. `"12.5s"`
/// * otherwise the value is rounded to whole seconds and shown using its two most
///   significant units: `"2m5s"`, `"1h1m"`, `"1d1h"` (or just `"1d"` when the hour part
///   is zero)
fn human_readable_duration(seconds: f64) -> String {
    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    if seconds < 60.0 {
        return format!("{:.1}s", seconds);
    }

    let whole = Duration::from_secs(seconds.round() as u64).as_secs();
    let days = whole / 86_400;
    let hours = (whole % 86_400) / 3_600;
    let minutes = (whole % 3_600) / 60;
    let seconds_rem = whole % 60;

    match (days, hours) {
        (0, 0) => format!("{}m{}s", minutes, seconds_rem),
        (0, _) => format!("{}h{}m", hours, minutes),
        (_, 0) => format!("{}d", days),
        _ => format!("{}d{}h", days, hours),
    }
}