1#![deny(missing_docs)]
2pub mod plugins;
110
111const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
139pub use jetstreamer_firehose::firehose::{
141 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
144static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
146static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
147static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
148static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
149#[inline]
150fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
151 origin.elapsed().as_nanos() as u64
152}
153
154pub type PluginFuture<'a> = Pin<
156 Box<
157 dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
158 + Send
159 + 'a,
160 >,
161>;
162
163pub trait Plugin: Send + Sync + 'static {
167 fn name(&self) -> &'static str;
169
170 fn version(&self) -> u16 {
172 1
173 }
174
175 fn id(&self) -> u16 {
177 let hash = Sha256::digest(self.name());
178 let mut res = 1u16;
179 for byte in hash {
180 res = res.wrapping_mul(31).wrapping_add(byte as u16);
181 }
182 res
183 }
184
185 fn on_transaction<'a>(
187 &'a self,
188 _thread_id: usize,
189 _db: Option<Arc<Client>>,
190 _transaction: &'a TransactionData,
191 ) -> PluginFuture<'a> {
192 async move { Ok(()) }.boxed()
193 }
194
195 fn on_block<'a>(
197 &'a self,
198 _thread_id: usize,
199 _db: Option<Arc<Client>>,
200 _block: &'a BlockData,
201 ) -> PluginFuture<'a> {
202 async move { Ok(()) }.boxed()
203 }
204
205 fn on_entry<'a>(
207 &'a self,
208 _thread_id: usize,
209 _db: Option<Arc<Client>>,
210 _entry: &'a EntryData,
211 ) -> PluginFuture<'a> {
212 async move { Ok(()) }.boxed()
213 }
214
215 fn on_reward<'a>(
217 &'a self,
218 _thread_id: usize,
219 _db: Option<Arc<Client>>,
220 _reward: &'a RewardsData,
221 ) -> PluginFuture<'a> {
222 async move { Ok(()) }.boxed()
223 }
224
225 fn on_error<'a>(
227 &'a self,
228 _thread_id: usize,
229 _db: Option<Arc<Client>>,
230 _error: &'a FirehoseErrorContext,
231 ) -> PluginFuture<'a> {
232 async move { Ok(()) }.boxed()
233 }
234
235 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237 async move { Ok(()) }.boxed()
238 }
239
240 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242 async move { Ok(()) }.boxed()
243 }
244}
245
246#[derive(Clone)]
250pub struct PluginRunner {
251 plugins: Arc<Vec<Arc<dyn Plugin>>>,
252 clickhouse_dsn: String,
253 num_threads: usize,
254 sequential: bool,
255 reverse: bool,
256 buffer_window_bytes: Option<u64>,
257 db_update_interval_slots: u64,
258}
259
260impl PluginRunner {
261 pub fn new(
268 clickhouse_dsn: impl Display,
269 num_threads: usize,
270 sequential: bool,
271 reverse: bool,
272 buffer_window_bytes: Option<u64>,
273 ) -> Self {
274 Self {
275 plugins: Arc::new(Vec::new()),
276 clickhouse_dsn: clickhouse_dsn.to_string(),
277 num_threads: std::cmp::max(1, num_threads),
278 sequential,
279 reverse,
280 buffer_window_bytes,
281 db_update_interval_slots: 100,
282 }
283 }
284
285 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
287 Arc::get_mut(&mut self.plugins)
288 .expect("cannot register plugins after the runner has started")
289 .push(Arc::from(plugin));
290 }
291
292 pub async fn run(
294 self: Arc<Self>,
295 slot_range: Range<u64>,
296 clickhouse_enabled: bool,
297 ) -> Result<(), PluginRunnerError> {
298 let db_update_interval = self.db_update_interval_slots.max(1);
299 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
300 self.plugins
301 .iter()
302 .cloned()
303 .map(PluginHandle::from)
304 .collect(),
305 );
306
307 let clickhouse = if clickhouse_enabled {
308 let client = Arc::new(
309 build_clickhouse_client(&self.clickhouse_dsn)
310 .with_option("async_insert", "1")
311 .with_option("wait_for_async_insert", "0"),
312 );
313 ensure_clickhouse_tables(client.as_ref()).await?;
314 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
315 Some(client)
316 } else {
317 None
318 };
319
320 for handle in plugin_handles.iter() {
321 if let Err(error) = handle
322 .plugin
323 .on_load(clickhouse.clone())
324 .await
325 .map_err(|e| e.to_string())
326 {
327 return Err(PluginRunnerError::PluginLifecycle {
328 plugin: handle.name,
329 stage: "on_load",
330 details: error,
331 });
332 }
333 }
334
335 let shutting_down = Arc::new(AtomicBool::new(false));
336 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>> =
337 Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
338 let clickhouse_enabled = clickhouse.is_some();
339 let slots_since_flush = Arc::new(AtomicU64::new(0));
340
341 let on_block = {
342 let plugin_handles = plugin_handles.clone();
343 let clickhouse = clickhouse.clone();
344 let slot_buffer = slot_buffer.clone();
345 let slots_since_flush = slots_since_flush.clone();
346 let shutting_down = shutting_down.clone();
347 move |thread_id: usize, block: BlockData| {
348 let plugin_handles = plugin_handles.clone();
349 let clickhouse = clickhouse.clone();
350 let slot_buffer = slot_buffer.clone();
351 let slots_since_flush = slots_since_flush.clone();
352 let shutting_down = shutting_down.clone();
353 async move {
354 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
355 if shutting_down.load(Ordering::SeqCst) {
356 log::debug!(
357 target: &log_target,
358 "ignoring block while shutdown is in progress"
359 );
360 return Ok(());
361 }
362 let block = Arc::new(block);
363 if !plugin_handles.is_empty() {
364 for handle in plugin_handles.iter() {
365 let db = clickhouse.clone();
366 if let Err(err) = handle
367 .plugin
368 .on_block(thread_id, db.clone(), block.as_ref())
369 .await
370 {
371 log::error!(
372 target: &log_target,
373 "plugin {} on_block error: {}",
374 handle.name,
375 err
376 );
377 continue;
378 }
379 if let (Some(db_client), BlockData::Block { slot, .. }) =
380 (clickhouse.clone(), block.as_ref())
381 {
382 if clickhouse_enabled {
383 slot_buffer
384 .entry(handle.id)
385 .or_default()
386 .push(PluginSlotRow {
387 plugin_id: handle.id as u32,
388 slot: *slot,
389 });
390 } else if let Err(err) =
391 record_plugin_slot(db_client, handle.id, *slot).await
392 {
393 log::error!(
394 target: &log_target,
395 "failed to record plugin slot for {}: {}",
396 handle.name,
397 err
398 );
399 }
400 }
401 }
402 if clickhouse_enabled {
403 let current = slots_since_flush
404 .fetch_add(1, Ordering::Relaxed)
405 .wrapping_add(1);
406 if current.is_multiple_of(db_update_interval)
407 && let Some(db_client) = clickhouse.clone()
408 {
409 let buffer = slot_buffer.clone();
410 let log_target_clone = log_target.clone();
411 tokio::spawn(async move {
412 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
413 log::error!(
414 target: &log_target_clone,
415 "failed to flush buffered plugin slots: {}",
416 err
417 );
418 }
419 });
420 }
421 }
422 }
423 if let Some(db_client) = clickhouse.clone() {
424 match block.as_ref() {
425 BlockData::Block {
426 slot,
427 executed_transaction_count,
428 block_time,
429 ..
430 } => {
431 let tally = take_slot_tx_tally(*slot);
432 let slot = *slot;
433 let executed_transaction_count = *executed_transaction_count;
434 let block_time = *block_time;
435 let log_target_clone = log_target.clone();
436 tokio::spawn(async move {
437 if let Err(err) = record_slot_status(
438 db_client,
439 slot,
440 thread_id,
441 executed_transaction_count,
442 tally.votes,
443 tally.non_votes,
444 block_time,
445 )
446 .await
447 {
448 log::error!(
449 target: &log_target_clone,
450 "failed to record slot status: {}",
451 err
452 );
453 }
454 });
455 }
456 BlockData::PossibleLeaderSkipped { slot } => {
457 take_slot_tx_tally(*slot);
459 }
460 }
461 }
462 Ok(())
463 }
464 .boxed()
465 }
466 };
467
468 let on_transaction = {
469 let plugin_handles = plugin_handles.clone();
470 let clickhouse = clickhouse.clone();
471 let shutting_down = shutting_down.clone();
472 move |thread_id: usize, transaction: TransactionData| {
473 let plugin_handles = plugin_handles.clone();
474 let clickhouse = clickhouse.clone();
475 let shutting_down = shutting_down.clone();
476 async move {
477 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
478 record_slot_vote_tally(transaction.slot, transaction.is_vote);
479 if plugin_handles.is_empty() {
480 return Ok(());
481 }
482 if shutting_down.load(Ordering::SeqCst) {
483 log::debug!(
484 target: &log_target,
485 "ignoring transaction while shutdown is in progress"
486 );
487 return Ok(());
488 }
489 for handle in plugin_handles.iter() {
490 if let Err(err) = handle
491 .plugin
492 .on_transaction(thread_id, clickhouse.clone(), &transaction)
493 .await
494 {
495 log::error!(
496 target: &log_target,
497 "plugin {} on_transaction error: {}",
498 handle.name,
499 err
500 );
501 }
502 }
503 Ok(())
504 }
505 .boxed()
506 }
507 };
508
509 let on_entry = {
510 let plugin_handles = plugin_handles.clone();
511 let clickhouse = clickhouse.clone();
512 let shutting_down = shutting_down.clone();
513 move |thread_id: usize, entry: EntryData| {
514 let plugin_handles = plugin_handles.clone();
515 let clickhouse = clickhouse.clone();
516 let shutting_down = shutting_down.clone();
517 async move {
518 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
519 if plugin_handles.is_empty() {
520 return Ok(());
521 }
522 if shutting_down.load(Ordering::SeqCst) {
523 log::debug!(
524 target: &log_target,
525 "ignoring entry while shutdown is in progress"
526 );
527 return Ok(());
528 }
529 let entry = Arc::new(entry);
530 for handle in plugin_handles.iter() {
531 if let Err(err) = handle
532 .plugin
533 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
534 .await
535 {
536 log::error!(
537 target: &log_target,
538 "plugin {} on_entry error: {}",
539 handle.name,
540 err
541 );
542 }
543 }
544 Ok(())
545 }
546 .boxed()
547 }
548 };
549
550 let on_reward = {
551 let plugin_handles = plugin_handles.clone();
552 let clickhouse = clickhouse.clone();
553 let shutting_down = shutting_down.clone();
554 move |thread_id: usize, reward: RewardsData| {
555 let plugin_handles = plugin_handles.clone();
556 let clickhouse = clickhouse.clone();
557 let shutting_down = shutting_down.clone();
558 async move {
559 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
560 if plugin_handles.is_empty() {
561 return Ok(());
562 }
563 if shutting_down.load(Ordering::SeqCst) {
564 log::debug!(
565 target: &log_target,
566 "ignoring reward while shutdown is in progress"
567 );
568 return Ok(());
569 }
570 let reward = Arc::new(reward);
571 for handle in plugin_handles.iter() {
572 if let Err(err) = handle
573 .plugin
574 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
575 .await
576 {
577 log::error!(
578 target: &log_target,
579 "plugin {} on_reward error: {}",
580 handle.name,
581 err
582 );
583 }
584 }
585 Ok(())
586 }
587 .boxed()
588 }
589 };
590
591 let on_error = {
592 let plugin_handles = plugin_handles.clone();
593 let clickhouse = clickhouse.clone();
594 let shutting_down = shutting_down.clone();
595 move |thread_id: usize, context: FirehoseErrorContext| {
596 let plugin_handles = plugin_handles.clone();
597 let clickhouse = clickhouse.clone();
598 let shutting_down = shutting_down.clone();
599 async move {
600 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
601 if plugin_handles.is_empty() {
602 return Ok(());
603 }
604 if shutting_down.load(Ordering::SeqCst) {
605 log::debug!(
606 target: &log_target,
607 "ignoring error callback while shutdown is in progress"
608 );
609 return Ok(());
610 }
611 let context = Arc::new(context);
612 for handle in plugin_handles.iter() {
613 if let Err(err) = handle
614 .plugin
615 .on_error(thread_id, clickhouse.clone(), context.as_ref())
616 .await
617 {
618 log::error!(
619 target: &log_target,
620 "plugin {} on_error error: {}",
621 handle.name,
622 err
623 );
624 }
625 }
626 Ok(())
627 }
628 .boxed()
629 }
630 };
631
632 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
633
634 let total_slot_count_capture = total_slot_count;
635 let run_origin = std::time::Instant::now();
636 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
638 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
639 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
640 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
641 let stats_tracking = clickhouse.clone().map(|_db| {
642 let shutting_down = shutting_down.clone();
643 let thread_progress_max: Arc<DashMap<usize, f64, ahash::RandomState>> = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
644 StatsTracking {
645 on_stats: {
646 let thread_progress_max = thread_progress_max.clone();
647 let total_slot_count = total_slot_count_capture;
648 move |thread_id: usize, stats: Stats| {
649 let shutting_down = shutting_down.clone();
650 let thread_progress_max = thread_progress_max.clone();
651 async move {
652 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
653 if shutting_down.load(Ordering::SeqCst) {
654 log::debug!(
655 target: &log_target,
656 "skipping stats write during shutdown"
657 );
658 return Ok(());
659 }
660 let finish_at = stats
661 .finish_time
662 .unwrap_or_else(std::time::Instant::now);
663 let elapsed_since_start = finish_at
664 .saturating_duration_since(stats.start_time)
665 .as_nanos()
666 .max(1) as u64;
667 let total_slots = stats.slots_processed;
668 let total_txs = stats.transactions_processed;
669 let now_ns = monotonic_nanos_since(run_origin);
670 let (delta_slots, delta_txs, delta_time_ns) = {
674 while SNAPSHOT_LOCK
675 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
676 .is_err()
677 {
678 hint::spin_loop();
679 }
680 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
681 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
682 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
683 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
684 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
685 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
686 SNAPSHOT_LOCK.store(false, Ordering::Release);
687 let delta_slots = total_slots.saturating_sub(prev_slots);
688 let delta_txs = total_txs.saturating_sub(prev_txs);
689 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
690 (delta_slots, delta_txs, delta_time_ns)
691 };
692 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
693 let mut slot_rate = delta_slots as f64 / delta_secs;
694 let mut tps = delta_txs as f64 / delta_secs;
695 if slot_rate <= 0.0 && total_slots > 0 {
696 slot_rate =
697 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
698 }
699 if tps <= 0.0 && total_txs > 0 {
700 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
701 }
702 let thread_stats = &stats.thread_stats;
703 let processed_slots = stats.slots_processed.min(total_slot_count);
704 let progress_fraction = if total_slot_count > 0 {
705 processed_slots as f64 / total_slot_count as f64
706 } else {
707 1.0
708 };
709 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
710 let thread_total_slots = thread_stats
711 .initial_slot_range
712 .end
713 .saturating_sub(thread_stats.initial_slot_range.start);
714 let thread_progress_raw = if thread_total_slots > 0 {
715 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
716 .clamp(0.0, 1.0)
717 * 100.0
718 } else {
719 100.0
720 };
721 let thread_progress = *thread_progress_max
722 .entry(thread_id)
723 .and_modify(|max| {
724 if thread_progress_raw > *max {
725 *max = thread_progress_raw;
726 }
727 })
728 .or_insert(thread_progress_raw);
729 let mut overall_eta = None;
730 if slot_rate > 0.0 {
731 let remaining_slots =
732 total_slot_count.saturating_sub(processed_slots);
733 overall_eta = Some(human_readable_duration(
734 remaining_slots as f64 / slot_rate,
735 ));
736 }
737 if overall_eta.is_none() {
738 if progress_fraction > 0.0 && progress_fraction < 1.0 {
739 if let Some(elapsed_total) = finish_at
740 .checked_duration_since(stats.start_time)
741 .map(|d| d.as_secs_f64())
742 && elapsed_total > 0.0 {
743 let remaining_secs =
744 elapsed_total * (1.0 / progress_fraction - 1.0);
745 overall_eta = Some(human_readable_duration(remaining_secs));
746 }
747 } else if progress_fraction >= 1.0 {
748 overall_eta = Some("0s".into());
749 }
750 }
751 let slots_display = human_readable_count(processed_slots);
752 let blocks_display = human_readable_count(stats.blocks_processed);
753 let txs_display = human_readable_count(stats.transactions_processed);
754 let tps_display = human_readable_count(tps.ceil() as u64);
755 log::info!(
756 target: &log_target,
757 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
758 overall_eta.unwrap_or_else(|| "n/a".into()),
759 );
760 Ok(())
761 }
762 .boxed()
763 }
764 },
765 tracking_interval_slots: 100,
766 }
767 });
768
769 let (shutdown_tx, _) = broadcast::channel::<()>(1);
770
771 let mut firehose_future = Box::pin(firehose(
772 self.num_threads as u64,
773 self.sequential,
774 self.reverse,
775 self.buffer_window_bytes,
776 slot_range,
777 Some(on_block),
778 Some(on_transaction),
779 Some(on_entry),
780 Some(on_reward),
781 Some(on_error),
782 stats_tracking,
783 Some(shutdown_tx.subscribe()),
784 ));
785
786 let firehose_result = tokio::select! {
787 res = &mut firehose_future => res,
788 ctrl = signal::ctrl_c() => {
789 match ctrl {
790 Ok(()) => log::info!(
791 target: LOG_MODULE,
792 "CTRL+C received; initiating shutdown"
793 ),
794 Err(err) => log::error!(
795 target: LOG_MODULE,
796 "failed to listen for CTRL+C: {}",
797 err
798 ),
799 }
800 shutting_down.store(true, Ordering::SeqCst);
801 let _ = shutdown_tx.send(());
802 firehose_future.await
803 }
804 };
805
806 if clickhouse_enabled
807 && let Some(db_client) = clickhouse.clone()
808 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
809 {
810 log::error!(
811 target: LOG_MODULE,
812 "failed to flush buffered plugin slots: {}",
813 err
814 );
815 }
816
817 for handle in plugin_handles.iter() {
818 if let Err(error) = handle
819 .plugin
820 .on_exit(clickhouse.clone())
821 .await
822 .map_err(|e| e.to_string())
823 {
824 log::error!(
825 target: LOG_MODULE,
826 "plugin {} on_exit error: {}",
827 handle.name,
828 error
829 );
830 }
831 }
832
833 match firehose_result {
834 Ok(()) => Ok(()),
835 Err((error, slot)) => Err(PluginRunnerError::Firehose {
836 details: error.to_string(),
837 slot,
838 }),
839 }
840 }
841}
842
843fn build_clickhouse_client(dsn: &str) -> Client {
844 let mut client = Client::default();
845 if let Ok(mut url) = Url::parse(dsn) {
846 let username = url.username().to_string();
847 let password = url.password().map(|value| value.to_string());
848 if !username.is_empty() || password.is_some() {
849 let _ = url.set_username("");
850 let _ = url.set_password(None);
851 }
852 client = client.with_url(url.as_str());
853 if !username.is_empty() {
854 client = client.with_user(username);
855 }
856 if let Some(password) = password {
857 client = client.with_password(password);
858 }
859 } else {
860 client = client.with_url(dsn);
861 }
862 client
863}
864
865#[derive(Debug, Error)]
867pub enum PluginRunnerError {
868 #[error("clickhouse error: {0}")]
870 Clickhouse(#[from] clickhouse::error::Error),
871 #[error("firehose error at slot {slot}: {details}")]
873 Firehose {
874 details: String,
876 slot: u64,
878 },
879 #[error("plugin {plugin} failed during {stage}: {details}")]
881 PluginLifecycle {
882 plugin: &'static str,
884 stage: &'static str,
886 details: String,
888 },
889}
890
891#[derive(Clone)]
892struct PluginHandle {
893 plugin: Arc<dyn Plugin>,
894 id: u16,
895 name: &'static str,
896 version: u16,
897}
898
899impl From<Arc<dyn Plugin>> for PluginHandle {
900 fn from(plugin: Arc<dyn Plugin>) -> Self {
901 let id = plugin.id();
902 let name = plugin.name();
903 let version = plugin.version();
904 Self {
905 plugin,
906 id,
907 name,
908 version,
909 }
910 }
911}
912
913#[derive(Row, Serialize)]
914struct PluginRow<'a> {
915 id: u32,
916 name: &'a str,
917 version: u32,
918}
919
920#[derive(Row, Serialize)]
921struct PluginSlotRow {
922 plugin_id: u32,
923 slot: u64,
924}
925
926#[derive(Row, Serialize)]
927struct SlotStatusRow {
928 slot: u64,
929 transaction_count: u32,
930 vote_transaction_count: u32,
931 non_vote_transaction_count: u32,
932 thread_id: u8,
933 block_time: u32,
934}
935
936#[derive(Default, Clone, Copy)]
937struct SlotTxTally {
938 votes: u64,
939 non_votes: u64,
940}
941
942static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally, ahash::RandomState>> =
943 Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
944
945async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
946 db.query(
947 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
948 slot UInt64,
949 transaction_count UInt32 DEFAULT 0,
950 vote_transaction_count UInt32 DEFAULT 0,
951 non_vote_transaction_count UInt32 DEFAULT 0,
952 thread_id UInt8 DEFAULT 0,
953 block_time DateTime('UTC') DEFAULT toDateTime(0),
954 indexed_at DateTime('UTC') DEFAULT now()
955 ) ENGINE = ReplacingMergeTree(indexed_at)
956 ORDER BY slot"#,
957 )
958 .execute()
959 .await?;
960
961 db.query(
962 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
963 id UInt32,
964 name String,
965 version UInt32
966 ) ENGINE = ReplacingMergeTree
967 ORDER BY id"#,
968 )
969 .execute()
970 .await?;
971
972 db.query(
973 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
974 plugin_id UInt32,
975 slot UInt64,
976 indexed_at DateTime('UTC') DEFAULT now()
977 ) ENGINE = ReplacingMergeTree
978 ORDER BY (plugin_id, slot)"#,
979 )
980 .execute()
981 .await?;
982
983 Ok(())
984}
985
986async fn upsert_plugins(
987 db: &Client,
988 plugins: &[PluginHandle],
989) -> Result<(), clickhouse::error::Error> {
990 if plugins.is_empty() {
991 return Ok(());
992 }
993 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
994 for handle in plugins {
995 insert
996 .write(&PluginRow {
997 id: handle.id as u32,
998 name: handle.name,
999 version: handle.version as u32,
1000 })
1001 .await?;
1002 }
1003 insert.end().await?;
1004 Ok(())
1005}
1006
1007async fn record_plugin_slot(
1008 db: Arc<Client>,
1009 plugin_id: u16,
1010 slot: u64,
1011) -> Result<(), clickhouse::error::Error> {
1012 let mut insert = db
1013 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1014 .await?;
1015 insert
1016 .write(&PluginSlotRow {
1017 plugin_id: plugin_id as u32,
1018 slot,
1019 })
1020 .await?;
1021 insert.end().await?;
1022 Ok(())
1023}
1024
1025async fn flush_slot_buffer(
1026 db: Arc<Client>,
1027 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>, ahash::RandomState>>,
1028) -> Result<(), clickhouse::error::Error> {
1029 let mut rows = Vec::new();
1030 buffer.iter_mut().for_each(|mut entry| {
1031 if !entry.value().is_empty() {
1032 rows.append(entry.value_mut());
1033 }
1034 });
1035
1036 if rows.is_empty() {
1037 return Ok(());
1038 }
1039
1040 let mut insert = db
1041 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1042 .await?;
1043 for row in rows {
1044 insert.write(&row).await?;
1045 }
1046 insert.end().await?;
1047 Ok(())
1048}
1049
1050async fn record_slot_status(
1051 db: Arc<Client>,
1052 slot: u64,
1053 thread_id: usize,
1054 transaction_count: u64,
1055 vote_transaction_count: u64,
1056 non_vote_transaction_count: u64,
1057 block_time: Option<i64>,
1058) -> Result<(), clickhouse::error::Error> {
1059 let mut insert = db
1060 .insert::<SlotStatusRow>("jetstreamer_slot_status")
1061 .await?;
1062 insert
1063 .write(&SlotStatusRow {
1064 slot,
1065 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1066 vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1067 non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1068 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1069 block_time: clamp_block_time(block_time),
1070 })
1071 .await?;
1072 insert.end().await?;
1073 Ok(())
1074}
1075
1076fn clamp_block_time(block_time: Option<i64>) -> u32 {
1077 match block_time {
1078 Some(ts) if ts > 0 && ts <= u32::MAX as i64 => ts as u32,
1079 Some(ts) if ts > u32::MAX as i64 => u32::MAX,
1080 Some(ts) if ts < 0 => 0,
1081 _ => 0,
1082 }
1083}
1084
1085fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1086 let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1087 if is_vote {
1088 entry.votes = entry.votes.saturating_add(1);
1089 } else {
1090 entry.non_votes = entry.non_votes.saturating_add(1);
1091 }
1092}
1093
1094fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1095 SLOT_TX_TALLY
1096 .remove(&slot)
1097 .map(|(_, tally)| tally)
1098 .unwrap_or_default()
1099}
1100
1101trait _CanSend: Send + Sync + 'static {}
1103impl _CanSend for PluginRunnerError {}
1104
1105#[inline]
1106fn human_readable_count(value: impl Into<u128>) -> String {
1107 let digits = value.into().to_string();
1108 let len = digits.len();
1109 let mut formatted = String::with_capacity(len + len / 3);
1110 for (idx, byte) in digits.bytes().enumerate() {
1111 if idx != 0 && (len - idx) % 3 == 0 {
1112 formatted.push(',');
1113 }
1114 formatted.push(char::from(byte));
1115 }
1116 formatted
1117}
1118
1119fn human_readable_duration(seconds: f64) -> String {
1120 if !seconds.is_finite() {
1121 return "n/a".into();
1122 }
1123 if seconds <= 0.0 {
1124 return "0s".into();
1125 }
1126 if seconds < 60.0 {
1127 return format!("{:.1}s", seconds);
1128 }
1129 let duration = Duration::from_secs(seconds.round() as u64);
1130 let secs = duration.as_secs();
1131 let days = secs / 86_400;
1132 let hours = (secs % 86_400) / 3_600;
1133 let minutes = (secs % 3_600) / 60;
1134 let seconds_rem = secs % 60;
1135 if days > 0 {
1136 if hours > 0 {
1137 format!("{}d{}h", days, hours)
1138 } else {
1139 format!("{}d", days)
1140 }
1141 } else if hours > 0 {
1142 format!("{}h{}m", hours, minutes)
1143 } else {
1144 format!("{}m{}s", minutes, seconds_rem)
1145 }
1146}