1#![deny(missing_docs)]
2pub mod plugins;
110
/// Base log target for this module; per-thread targets append `::T{thread_id:03}` to it.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
139pub use jetstreamer_firehose::firehose::{
141 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
/// Cumulative slot count seen by the previous stats callback; used to compute per-interval deltas.
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
/// Cumulative transaction count seen by the previous stats callback.
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
/// Nanoseconds since the run origin at which the previous stats snapshot was stored.
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
/// Spin-lock flag (`true` = held) taken around the read-then-store of the three snapshot values
/// above so that they are updated together.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
/// Returns the number of nanoseconds elapsed between `origin` and now.
///
/// The 128-bit nanosecond count is narrowed to `u64` with an `as` cast.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    let elapsed = std::time::Instant::now().duration_since(origin);
    elapsed.as_nanos() as u64
}
153
/// Boxed, pinned, `Send` future returned by every [`Plugin`] hook.
///
/// It resolves to `Ok(())` or to a boxed `Send + Sync + 'static` error. The lifetime `'a` bounds
/// whatever the future borrows (the plugin itself and the data passed to the hook).
pub type PluginFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
            + Send
            + 'a,
    >,
>;
162
163pub trait Plugin: Send + Sync + 'static {
167 fn name(&self) -> &'static str;
169
170 fn version(&self) -> u16 {
172 1
173 }
174
175 fn id(&self) -> u16 {
177 let hash = Sha256::digest(self.name());
178 let mut res = 1u16;
179 for byte in hash {
180 res = res.wrapping_mul(31).wrapping_add(byte as u16);
181 }
182 res
183 }
184
185 fn on_transaction<'a>(
187 &'a self,
188 _thread_id: usize,
189 _db: Option<Arc<Client>>,
190 _transaction: &'a TransactionData,
191 ) -> PluginFuture<'a> {
192 async move { Ok(()) }.boxed()
193 }
194
195 fn on_block<'a>(
197 &'a self,
198 _thread_id: usize,
199 _db: Option<Arc<Client>>,
200 _block: &'a BlockData,
201 ) -> PluginFuture<'a> {
202 async move { Ok(()) }.boxed()
203 }
204
205 fn on_entry<'a>(
207 &'a self,
208 _thread_id: usize,
209 _db: Option<Arc<Client>>,
210 _entry: &'a EntryData,
211 ) -> PluginFuture<'a> {
212 async move { Ok(()) }.boxed()
213 }
214
215 fn on_reward<'a>(
217 &'a self,
218 _thread_id: usize,
219 _db: Option<Arc<Client>>,
220 _reward: &'a RewardsData,
221 ) -> PluginFuture<'a> {
222 async move { Ok(()) }.boxed()
223 }
224
225 fn on_error<'a>(
227 &'a self,
228 _thread_id: usize,
229 _db: Option<Arc<Client>>,
230 _error: &'a FirehoseErrorContext,
231 ) -> PluginFuture<'a> {
232 async move { Ok(()) }.boxed()
233 }
234
235 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237 async move { Ok(()) }.boxed()
238 }
239
240 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242 async move { Ok(()) }.boxed()
243 }
244}
245
/// Drives a set of registered [`Plugin`]s over a range of slots.
#[derive(Clone)]
pub struct PluginRunner {
    // Registered plugins; shared so that clones of the runner see the same list.
    plugins: Arc<Vec<Arc<dyn Plugin>>>,
    // ClickHouse connection string, used only when `run` is called with ClickHouse enabled.
    clickhouse_dsn: String,
    // Thread count handed to the firehose; the constructor keeps it at 1 or more.
    num_threads: usize,
    // Number of processed blocks between background flushes of buffered plugin-slot rows.
    db_update_interval_slots: u64,
}
256
257impl PluginRunner {
258 pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
260 Self {
261 plugins: Arc::new(Vec::new()),
262 clickhouse_dsn: clickhouse_dsn.to_string(),
263 num_threads: std::cmp::max(1, num_threads),
264 db_update_interval_slots: 100,
265 }
266 }
267
268 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
270 Arc::get_mut(&mut self.plugins)
271 .expect("cannot register plugins after the runner has started")
272 .push(Arc::from(plugin));
273 }
274
275 pub async fn run(
277 self: Arc<Self>,
278 slot_range: Range<u64>,
279 clickhouse_enabled: bool,
280 ) -> Result<(), PluginRunnerError> {
281 let db_update_interval = self.db_update_interval_slots.max(1);
282 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
283 self.plugins
284 .iter()
285 .cloned()
286 .map(PluginHandle::from)
287 .collect(),
288 );
289
290 let clickhouse = if clickhouse_enabled {
291 let client = Arc::new(
292 build_clickhouse_client(&self.clickhouse_dsn)
293 .with_option("async_insert", "1")
294 .with_option("wait_for_async_insert", "0"),
295 );
296 ensure_clickhouse_tables(client.as_ref()).await?;
297 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
298 Some(client)
299 } else {
300 None
301 };
302
303 for handle in plugin_handles.iter() {
304 if let Err(error) = handle
305 .plugin
306 .on_load(clickhouse.clone())
307 .await
308 .map_err(|e| e.to_string())
309 {
310 return Err(PluginRunnerError::PluginLifecycle {
311 plugin: handle.name,
312 stage: "on_load",
313 details: error,
314 });
315 }
316 }
317
318 let shutting_down = Arc::new(AtomicBool::new(false));
319 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
320 let clickhouse_enabled = clickhouse.is_some();
321 let slots_since_flush = Arc::new(AtomicU64::new(0));
322
323 let on_block = {
324 let plugin_handles = plugin_handles.clone();
325 let clickhouse = clickhouse.clone();
326 let slot_buffer = slot_buffer.clone();
327 let slots_since_flush = slots_since_flush.clone();
328 let shutting_down = shutting_down.clone();
329 move |thread_id: usize, block: BlockData| {
330 let plugin_handles = plugin_handles.clone();
331 let clickhouse = clickhouse.clone();
332 let slot_buffer = slot_buffer.clone();
333 let slots_since_flush = slots_since_flush.clone();
334 let shutting_down = shutting_down.clone();
335 async move {
336 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
337 if shutting_down.load(Ordering::SeqCst) {
338 log::debug!(
339 target: &log_target,
340 "ignoring block while shutdown is in progress"
341 );
342 return Ok(());
343 }
344 let block = Arc::new(block);
345 if !plugin_handles.is_empty() {
346 for handle in plugin_handles.iter() {
347 let db = clickhouse.clone();
348 if let Err(err) = handle
349 .plugin
350 .on_block(thread_id, db.clone(), block.as_ref())
351 .await
352 {
353 log::error!(
354 target: &log_target,
355 "plugin {} on_block error: {}",
356 handle.name,
357 err
358 );
359 continue;
360 }
361 if let (Some(db_client), BlockData::Block { slot, .. }) =
362 (clickhouse.clone(), block.as_ref())
363 {
364 if clickhouse_enabled {
365 slot_buffer
366 .entry(handle.id)
367 .or_default()
368 .push(PluginSlotRow {
369 plugin_id: handle.id as u32,
370 slot: *slot,
371 });
372 } else if let Err(err) =
373 record_plugin_slot(db_client, handle.id, *slot).await
374 {
375 log::error!(
376 target: &log_target,
377 "failed to record plugin slot for {}: {}",
378 handle.name,
379 err
380 );
381 }
382 }
383 }
384 if clickhouse_enabled {
385 let current = slots_since_flush
386 .fetch_add(1, Ordering::Relaxed)
387 .wrapping_add(1);
388 if current.is_multiple_of(db_update_interval)
389 && let Some(db_client) = clickhouse.clone()
390 {
391 let buffer = slot_buffer.clone();
392 let log_target_clone = log_target.clone();
393 tokio::spawn(async move {
394 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
395 log::error!(
396 target: &log_target_clone,
397 "failed to flush buffered plugin slots: {}",
398 err
399 );
400 }
401 });
402 }
403 }
404 }
405 if let Some(db_client) = clickhouse.clone() {
406 match block.as_ref() {
407 BlockData::Block {
408 slot,
409 executed_transaction_count,
410 block_time,
411 ..
412 } => {
413 let tally = take_slot_tx_tally(*slot);
414 if let Err(err) = record_slot_status(
415 db_client,
416 *slot,
417 thread_id,
418 *executed_transaction_count,
419 tally.votes,
420 tally.non_votes,
421 *block_time,
422 )
423 .await
424 {
425 log::error!(
426 target: &log_target,
427 "failed to record slot status: {}",
428 err
429 );
430 }
431 }
432 BlockData::PossibleLeaderSkipped { slot } => {
433 take_slot_tx_tally(*slot);
435 }
436 }
437 }
438 Ok(())
439 }
440 .boxed()
441 }
442 };
443
444 let on_transaction = {
445 let plugin_handles = plugin_handles.clone();
446 let clickhouse = clickhouse.clone();
447 let shutting_down = shutting_down.clone();
448 move |thread_id: usize, transaction: TransactionData| {
449 let plugin_handles = plugin_handles.clone();
450 let clickhouse = clickhouse.clone();
451 let shutting_down = shutting_down.clone();
452 async move {
453 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
454 record_slot_vote_tally(transaction.slot, transaction.is_vote);
455 if plugin_handles.is_empty() {
456 return Ok(());
457 }
458 if shutting_down.load(Ordering::SeqCst) {
459 log::debug!(
460 target: &log_target,
461 "ignoring transaction while shutdown is in progress"
462 );
463 return Ok(());
464 }
465 let transaction = Arc::new(transaction);
466 for handle in plugin_handles.iter() {
467 if let Err(err) = handle
468 .plugin
469 .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
470 .await
471 {
472 log::error!(
473 target: &log_target,
474 "plugin {} on_transaction error: {}",
475 handle.name,
476 err
477 );
478 }
479 }
480 Ok(())
481 }
482 .boxed()
483 }
484 };
485
486 let on_entry = {
487 let plugin_handles = plugin_handles.clone();
488 let clickhouse = clickhouse.clone();
489 let shutting_down = shutting_down.clone();
490 move |thread_id: usize, entry: EntryData| {
491 let plugin_handles = plugin_handles.clone();
492 let clickhouse = clickhouse.clone();
493 let shutting_down = shutting_down.clone();
494 async move {
495 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
496 if plugin_handles.is_empty() {
497 return Ok(());
498 }
499 if shutting_down.load(Ordering::SeqCst) {
500 log::debug!(
501 target: &log_target,
502 "ignoring entry while shutdown is in progress"
503 );
504 return Ok(());
505 }
506 let entry = Arc::new(entry);
507 for handle in plugin_handles.iter() {
508 if let Err(err) = handle
509 .plugin
510 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
511 .await
512 {
513 log::error!(
514 target: &log_target,
515 "plugin {} on_entry error: {}",
516 handle.name,
517 err
518 );
519 }
520 }
521 Ok(())
522 }
523 .boxed()
524 }
525 };
526
527 let on_reward = {
528 let plugin_handles = plugin_handles.clone();
529 let clickhouse = clickhouse.clone();
530 let shutting_down = shutting_down.clone();
531 move |thread_id: usize, reward: RewardsData| {
532 let plugin_handles = plugin_handles.clone();
533 let clickhouse = clickhouse.clone();
534 let shutting_down = shutting_down.clone();
535 async move {
536 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
537 if plugin_handles.is_empty() {
538 return Ok(());
539 }
540 if shutting_down.load(Ordering::SeqCst) {
541 log::debug!(
542 target: &log_target,
543 "ignoring reward while shutdown is in progress"
544 );
545 return Ok(());
546 }
547 let reward = Arc::new(reward);
548 for handle in plugin_handles.iter() {
549 if let Err(err) = handle
550 .plugin
551 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
552 .await
553 {
554 log::error!(
555 target: &log_target,
556 "plugin {} on_reward error: {}",
557 handle.name,
558 err
559 );
560 }
561 }
562 Ok(())
563 }
564 .boxed()
565 }
566 };
567
568 let on_error = {
569 let plugin_handles = plugin_handles.clone();
570 let clickhouse = clickhouse.clone();
571 let shutting_down = shutting_down.clone();
572 move |thread_id: usize, context: FirehoseErrorContext| {
573 let plugin_handles = plugin_handles.clone();
574 let clickhouse = clickhouse.clone();
575 let shutting_down = shutting_down.clone();
576 async move {
577 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
578 if plugin_handles.is_empty() {
579 return Ok(());
580 }
581 if shutting_down.load(Ordering::SeqCst) {
582 log::debug!(
583 target: &log_target,
584 "ignoring error callback while shutdown is in progress"
585 );
586 return Ok(());
587 }
588 let context = Arc::new(context);
589 for handle in plugin_handles.iter() {
590 if let Err(err) = handle
591 .plugin
592 .on_error(thread_id, clickhouse.clone(), context.as_ref())
593 .await
594 {
595 log::error!(
596 target: &log_target,
597 "plugin {} on_error error: {}",
598 handle.name,
599 err
600 );
601 }
602 }
603 Ok(())
604 }
605 .boxed()
606 }
607 };
608
609 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
610
611 let total_slot_count_capture = total_slot_count;
612 let run_origin = std::time::Instant::now();
613 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
615 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
616 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
617 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
618 let stats_tracking = clickhouse.clone().map(|_db| {
619 let shutting_down = shutting_down.clone();
620 let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
621 StatsTracking {
622 on_stats: {
623 let thread_progress_max = thread_progress_max.clone();
624 let total_slot_count = total_slot_count_capture;
625 move |thread_id: usize, stats: Stats| {
626 let shutting_down = shutting_down.clone();
627 let thread_progress_max = thread_progress_max.clone();
628 async move {
629 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
630 if shutting_down.load(Ordering::SeqCst) {
631 log::debug!(
632 target: &log_target,
633 "skipping stats write during shutdown"
634 );
635 return Ok(());
636 }
637 let finish_at = stats
638 .finish_time
639 .unwrap_or_else(std::time::Instant::now);
640 let elapsed_since_start = finish_at
641 .saturating_duration_since(stats.start_time)
642 .as_nanos()
643 .max(1) as u64;
644 let total_slots = stats.slots_processed;
645 let total_txs = stats.transactions_processed;
646 let now_ns = monotonic_nanos_since(run_origin);
647 let (delta_slots, delta_txs, delta_time_ns) = {
651 while SNAPSHOT_LOCK
652 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
653 .is_err()
654 {
655 hint::spin_loop();
656 }
657 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
658 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
659 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
660 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
661 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
662 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
663 SNAPSHOT_LOCK.store(false, Ordering::Release);
664 let delta_slots = total_slots.saturating_sub(prev_slots);
665 let delta_txs = total_txs.saturating_sub(prev_txs);
666 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
667 (delta_slots, delta_txs, delta_time_ns)
668 };
669 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
670 let mut slot_rate = delta_slots as f64 / delta_secs;
671 let mut tps = delta_txs as f64 / delta_secs;
672 if slot_rate <= 0.0 && total_slots > 0 {
673 slot_rate =
674 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
675 }
676 if tps <= 0.0 && total_txs > 0 {
677 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
678 }
679 let thread_stats = &stats.thread_stats;
680 let processed_slots = stats.slots_processed.min(total_slot_count);
681 let progress_fraction = if total_slot_count > 0 {
682 processed_slots as f64 / total_slot_count as f64
683 } else {
684 1.0
685 };
686 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
687 let thread_total_slots = thread_stats
688 .initial_slot_range
689 .end
690 .saturating_sub(thread_stats.initial_slot_range.start);
691 let thread_progress_raw = if thread_total_slots > 0 {
692 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
693 .clamp(0.0, 1.0)
694 * 100.0
695 } else {
696 100.0
697 };
698 let thread_progress = *thread_progress_max
699 .entry(thread_id)
700 .and_modify(|max| {
701 if thread_progress_raw > *max {
702 *max = thread_progress_raw;
703 }
704 })
705 .or_insert(thread_progress_raw);
706 let mut overall_eta = None;
707 if slot_rate > 0.0 {
708 let remaining_slots =
709 total_slot_count.saturating_sub(processed_slots);
710 overall_eta = Some(human_readable_duration(
711 remaining_slots as f64 / slot_rate,
712 ));
713 }
714 if overall_eta.is_none() {
715 if progress_fraction > 0.0 && progress_fraction < 1.0 {
716 if let Some(elapsed_total) = finish_at
717 .checked_duration_since(stats.start_time)
718 .map(|d| d.as_secs_f64())
719 && elapsed_total > 0.0 {
720 let remaining_secs =
721 elapsed_total * (1.0 / progress_fraction - 1.0);
722 overall_eta = Some(human_readable_duration(remaining_secs));
723 }
724 } else if progress_fraction >= 1.0 {
725 overall_eta = Some("0s".into());
726 }
727 }
728 let slots_display = human_readable_count(processed_slots);
729 let blocks_display = human_readable_count(stats.blocks_processed);
730 let txs_display = human_readable_count(stats.transactions_processed);
731 let tps_display = human_readable_count(tps.ceil() as u64);
732 log::info!(
733 target: &log_target,
734 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
735 overall_eta.unwrap_or_else(|| "n/a".into()),
736 );
737 Ok(())
738 }
739 .boxed()
740 }
741 },
742 tracking_interval_slots: 100,
743 }
744 });
745
746 let (shutdown_tx, _) = broadcast::channel::<()>(1);
747
748 let mut firehose_future = Box::pin(firehose(
749 self.num_threads as u64,
750 slot_range,
751 Some(on_block),
752 Some(on_transaction),
753 Some(on_entry),
754 Some(on_reward),
755 Some(on_error),
756 stats_tracking,
757 Some(shutdown_tx.subscribe()),
758 ));
759
760 let firehose_result = tokio::select! {
761 res = &mut firehose_future => res,
762 ctrl = signal::ctrl_c() => {
763 match ctrl {
764 Ok(()) => log::info!(
765 target: LOG_MODULE,
766 "CTRL+C received; initiating shutdown"
767 ),
768 Err(err) => log::error!(
769 target: LOG_MODULE,
770 "failed to listen for CTRL+C: {}",
771 err
772 ),
773 }
774 shutting_down.store(true, Ordering::SeqCst);
775 let _ = shutdown_tx.send(());
776 firehose_future.await
777 }
778 };
779
780 if clickhouse_enabled
781 && let Some(db_client) = clickhouse.clone()
782 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
783 {
784 log::error!(
785 target: LOG_MODULE,
786 "failed to flush buffered plugin slots: {}",
787 err
788 );
789 }
790
791 for handle in plugin_handles.iter() {
792 if let Err(error) = handle
793 .plugin
794 .on_exit(clickhouse.clone())
795 .await
796 .map_err(|e| e.to_string())
797 {
798 log::error!(
799 target: LOG_MODULE,
800 "plugin {} on_exit error: {}",
801 handle.name,
802 error
803 );
804 }
805 }
806
807 match firehose_result {
808 Ok(()) => Ok(()),
809 Err((error, slot)) => Err(PluginRunnerError::Firehose {
810 details: error.to_string(),
811 slot,
812 }),
813 }
814 }
815}
816
817fn build_clickhouse_client(dsn: &str) -> Client {
818 let mut client = Client::default();
819 if let Ok(mut url) = Url::parse(dsn) {
820 let username = url.username().to_string();
821 let password = url.password().map(|value| value.to_string());
822 if !username.is_empty() || password.is_some() {
823 let _ = url.set_username("");
824 let _ = url.set_password(None);
825 }
826 client = client.with_url(url.as_str());
827 if !username.is_empty() {
828 client = client.with_user(username);
829 }
830 if let Some(password) = password {
831 client = client.with_password(password);
832 }
833 } else {
834 client = client.with_url(dsn);
835 }
836 client
837}
838
/// Errors returned by [`PluginRunner::run`].
#[derive(Debug, Error)]
pub enum PluginRunnerError {
    /// A ClickHouse operation failed; wraps the underlying client error.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// The firehose returned an error.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// String rendering of the firehose error.
        details: String,
        /// Slot reported by the firehose together with the error.
        slot: u64,
    },
    /// A plugin lifecycle hook failed.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin whose hook failed.
        plugin: &'static str,
        /// Name of the lifecycle stage that failed (for example `"on_load"`).
        stage: &'static str,
        /// String rendering of the error returned by the hook.
        details: String,
    },
}
864
/// A registered plugin together with its id, name and version, read once when the handle is
/// built so the callbacks can use plain fields instead of calling back into the plugin.
#[derive(Clone)]
struct PluginHandle {
    // The plugin whose hooks are invoked.
    plugin: Arc<dyn Plugin>,
    // Value of `Plugin::id` at construction time.
    id: u16,
    // Value of `Plugin::name` at construction time.
    name: &'static str,
    // Value of `Plugin::version` at construction time.
    version: u16,
}
872
873impl From<Arc<dyn Plugin>> for PluginHandle {
874 fn from(plugin: Arc<dyn Plugin>) -> Self {
875 let id = plugin.id();
876 let name = plugin.name();
877 let version = plugin.version();
878 Self {
879 plugin,
880 id,
881 name,
882 version,
883 }
884 }
885}
886
/// Row written to the `jetstreamer_plugins` table.
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    // Plugin id, widened from `u16`.
    id: u32,
    // Plugin name.
    name: &'a str,
    // Plugin version, widened from `u16`.
    version: u32,
}
893
/// Row written to the `jetstreamer_plugin_slots` table: one plugin handled one slot.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    // Plugin id, widened from `u16`.
    plugin_id: u32,
    // Slot whose block the plugin processed.
    slot: u64,
}
899
/// Row written to the `jetstreamer_slot_status` table.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    // Slot number.
    slot: u64,
    // Executed transaction count reported with the block, capped at `u32::MAX`.
    transaction_count: u32,
    // Number of vote transactions tallied for the slot, capped at `u32::MAX`.
    vote_transaction_count: u32,
    // Number of non-vote transactions tallied for the slot, capped at `u32::MAX`.
    non_vote_transaction_count: u32,
    // Id of the thread that handled the block; `u8::MAX` when it does not fit in a `u8`.
    thread_id: u8,
    // Block time clamped into the `u32` range; `0` when the block carried no time.
    block_time: u32,
}
909
/// Per-slot counters of vote and non-vote transactions.
#[derive(Default, Clone, Copy)]
struct SlotTxTally {
    // Transactions flagged as votes.
    votes: u64,
    // Transactions not flagged as votes.
    non_votes: u64,
}
915
/// Process-wide map from slot to its transaction tally; entries are added by
/// `record_slot_vote_tally` and removed by `take_slot_tx_tally`.
static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally>> = Lazy::new(DashMap::new);
917
918async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
919 db.query(
920 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
921 slot UInt64,
922 transaction_count UInt32 DEFAULT 0,
923 vote_transaction_count UInt32 DEFAULT 0,
924 non_vote_transaction_count UInt32 DEFAULT 0,
925 thread_id UInt8 DEFAULT 0,
926 block_time DateTime('UTC') DEFAULT toDateTime(0),
927 indexed_at DateTime('UTC') DEFAULT now()
928 ) ENGINE = ReplacingMergeTree(indexed_at)
929 ORDER BY slot"#,
930 )
931 .execute()
932 .await?;
933
934 db.query(
935 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
936 id UInt32,
937 name String,
938 version UInt32
939 ) ENGINE = ReplacingMergeTree
940 ORDER BY id"#,
941 )
942 .execute()
943 .await?;
944
945 db.query(
946 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
947 plugin_id UInt32,
948 slot UInt64,
949 indexed_at DateTime('UTC') DEFAULT now()
950 ) ENGINE = ReplacingMergeTree
951 ORDER BY (plugin_id, slot)"#,
952 )
953 .execute()
954 .await?;
955
956 Ok(())
957}
958
959async fn upsert_plugins(
960 db: &Client,
961 plugins: &[PluginHandle],
962) -> Result<(), clickhouse::error::Error> {
963 if plugins.is_empty() {
964 return Ok(());
965 }
966 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
967 for handle in plugins {
968 insert
969 .write(&PluginRow {
970 id: handle.id as u32,
971 name: handle.name,
972 version: handle.version as u32,
973 })
974 .await?;
975 }
976 insert.end().await?;
977 Ok(())
978}
979
980async fn record_plugin_slot(
981 db: Arc<Client>,
982 plugin_id: u16,
983 slot: u64,
984) -> Result<(), clickhouse::error::Error> {
985 let mut insert = db
986 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
987 .await?;
988 insert
989 .write(&PluginSlotRow {
990 plugin_id: plugin_id as u32,
991 slot,
992 })
993 .await?;
994 insert.end().await?;
995 Ok(())
996}
997
998async fn flush_slot_buffer(
999 db: Arc<Client>,
1000 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
1001) -> Result<(), clickhouse::error::Error> {
1002 let mut rows = Vec::new();
1003 buffer.iter_mut().for_each(|mut entry| {
1004 if !entry.value().is_empty() {
1005 rows.append(entry.value_mut());
1006 }
1007 });
1008
1009 if rows.is_empty() {
1010 return Ok(());
1011 }
1012
1013 let mut insert = db
1014 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1015 .await?;
1016 for row in rows {
1017 insert.write(&row).await?;
1018 }
1019 insert.end().await?;
1020 Ok(())
1021}
1022
1023async fn record_slot_status(
1024 db: Arc<Client>,
1025 slot: u64,
1026 thread_id: usize,
1027 transaction_count: u64,
1028 vote_transaction_count: u64,
1029 non_vote_transaction_count: u64,
1030 block_time: Option<i64>,
1031) -> Result<(), clickhouse::error::Error> {
1032 let mut insert = db
1033 .insert::<SlotStatusRow>("jetstreamer_slot_status")
1034 .await?;
1035 insert
1036 .write(&SlotStatusRow {
1037 slot,
1038 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1039 vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1040 non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1041 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1042 block_time: clamp_block_time(block_time),
1043 })
1044 .await?;
1045 insert.end().await?;
1046 Ok(())
1047}
1048
/// Clamps an optional signed timestamp into the `u32` range.
///
/// `None` and non-positive values map to `0`; values above `u32::MAX` map to `u32::MAX`.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    // After clamping, the value is within `0..=u32::MAX`, so the cast is lossless.
    block_time.map_or(0, |ts| ts.clamp(0, i64::from(u32::MAX)) as u32)
}
1057
1058fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1059 let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1060 if is_vote {
1061 entry.votes = entry.votes.saturating_add(1);
1062 } else {
1063 entry.non_votes = entry.non_votes.saturating_add(1);
1064 }
1065}
1066
1067fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1068 SLOT_TX_TALLY
1069 .remove(&slot)
1070 .map(|(_, tally)| tally)
1071 .unwrap_or_default()
1072}
1073
// Compile-time assertion: the impl below only type-checks while `PluginRunnerError` is
// `Send + Sync + 'static`, because those are supertraits of `_CanSend`.
trait _CanSend: Send + Sync + 'static {}
impl _CanSend for PluginRunnerError {}
1077
/// Formats an unsigned integer in decimal with `,` between groups of three digits
/// (for example `1234567` becomes `"1,234,567"`).
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let digits = value.into().to_string();
    // Chunk from the right so that only the leading group can be shorter than three digits,
    // then restore left-to-right order.
    let groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| std::str::from_utf8(group).expect("decimal digits are ASCII"))
        .collect();
    groups.join(",")
}
1091
/// Renders a duration given in seconds as a short human-readable string.
///
/// * non-finite input gives `"n/a"`
/// * zero or negative input gives `"0s"`
/// * below one minute: seconds with one decimal, e.g. `"12.3s"`
/// * otherwise the value is rounded to whole seconds and shown with its two largest units:
///   `"{m}m{s}s"`, `"{h}h{m}m"`, or `"{d}d{h}h"` (just `"{d}d"` when the hour part is zero)
fn human_readable_duration(seconds: f64) -> String {
    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    if seconds < 60.0 {
        return format!("{:.1}s", seconds);
    }

    let total = seconds.round() as u64;
    let days = total / 86_400;
    let hours = (total % 86_400) / 3_600;
    let minutes = (total % 3_600) / 60;
    let secs = total % 60;

    match (days, hours) {
        (0, 0) => format!("{}m{}s", minutes, secs),
        (0, h) => format!("{}h{}m", h, minutes),
        (d, 0) => format!("{}d", d),
        (d, h) => format!("{}d{}h", d, h),
    }
}