1#![deny(missing_docs)]
2pub mod plugins;
110
/// Base log target for messages emitted by the runner; per-thread targets append
/// a `::T<thread id>` suffix to this value.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 hint,
117 ops::Range,
118 pin::Pin,
119 sync::{
120 Arc,
121 atomic::{AtomicBool, AtomicU64, Ordering},
122 },
123 time::Duration,
124};
125
126use clickhouse::{Client, Row};
127use dashmap::DashMap;
128use futures_util::FutureExt;
129use jetstreamer_firehose::firehose::{
130 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
131};
132use once_cell::sync::Lazy;
133use serde::Serialize;
134use sha2::{Digest, Sha256};
135use thiserror::Error;
136use tokio::{signal, sync::broadcast};
137use url::Url;
138
139pub use jetstreamer_firehose::firehose::{
141 FirehoseErrorContext, Stats as FirehoseStats, ThreadStats,
142};
143
/// Total slot count observed by the most recent stats callback (used to compute deltas).
static LAST_TOTAL_SLOTS: AtomicU64 = AtomicU64::new(0);
/// Total transaction count observed by the most recent stats callback.
static LAST_TOTAL_TXS: AtomicU64 = AtomicU64::new(0);
/// Nanoseconds since the run origin at which the previous stats snapshot was taken.
static LAST_TOTAL_TIME_NS: AtomicU64 = AtomicU64::new(0);
/// Spin-lock flag taken while the three `LAST_TOTAL_*` values are read and replaced,
/// so that one stats callback swaps all of them together.
static SNAPSHOT_LOCK: AtomicBool = AtomicBool::new(false);
/// Returns how many nanoseconds have passed since `origin`, cast down to `u64`.
#[inline]
fn monotonic_nanos_since(origin: std::time::Instant) -> u64 {
    let elapsed = std::time::Instant::now().duration_since(origin);
    elapsed.as_nanos() as u64
}
153
/// Boxed, pinned, `Send` future returned by every [`Plugin`] hook.
///
/// It resolves to `Ok(())` on success or to a boxed, thread-safe error otherwise, and
/// may borrow from the plugin and the event for the lifetime `'a`.
pub type PluginFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
            + Send
            + 'a,
    >,
>;
162
163pub trait Plugin: Send + Sync + 'static {
167 fn name(&self) -> &'static str;
169
170 fn version(&self) -> u16 {
172 1
173 }
174
175 fn id(&self) -> u16 {
177 let hash = Sha256::digest(self.name());
178 let mut res = 1u16;
179 for byte in hash {
180 res = res.wrapping_mul(31).wrapping_add(byte as u16);
181 }
182 res
183 }
184
185 fn on_transaction<'a>(
187 &'a self,
188 _thread_id: usize,
189 _db: Option<Arc<Client>>,
190 _transaction: &'a TransactionData,
191 ) -> PluginFuture<'a> {
192 async move { Ok(()) }.boxed()
193 }
194
195 fn on_block<'a>(
197 &'a self,
198 _thread_id: usize,
199 _db: Option<Arc<Client>>,
200 _block: &'a BlockData,
201 ) -> PluginFuture<'a> {
202 async move { Ok(()) }.boxed()
203 }
204
205 fn on_entry<'a>(
207 &'a self,
208 _thread_id: usize,
209 _db: Option<Arc<Client>>,
210 _entry: &'a EntryData,
211 ) -> PluginFuture<'a> {
212 async move { Ok(()) }.boxed()
213 }
214
215 fn on_reward<'a>(
217 &'a self,
218 _thread_id: usize,
219 _db: Option<Arc<Client>>,
220 _reward: &'a RewardsData,
221 ) -> PluginFuture<'a> {
222 async move { Ok(()) }.boxed()
223 }
224
225 fn on_error<'a>(
227 &'a self,
228 _thread_id: usize,
229 _db: Option<Arc<Client>>,
230 _error: &'a FirehoseErrorContext,
231 ) -> PluginFuture<'a> {
232 async move { Ok(()) }.boxed()
233 }
234
235 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
237 async move { Ok(()) }.boxed()
238 }
239
240 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
242 async move { Ok(()) }.boxed()
243 }
244}
245
/// Drives a set of registered [`Plugin`]s over a slot range streamed by the firehose.
///
/// Cloning shares the same plugin list (`Arc`), so a cloned runner cannot register
/// further plugins while another clone is alive.
#[derive(Clone)]
pub struct PluginRunner {
    // Registered plugins, shared between clones of the runner.
    plugins: Arc<Vec<Arc<dyn Plugin>>>,
    // Connection string handed to `build_clickhouse_client` when ClickHouse is enabled.
    clickhouse_dsn: String,
    // Number of firehose threads; `new` forces this to be at least 1.
    num_threads: usize,
    // Forwarded unchanged to the firehose.
    sequential: bool,
    // Forwarded unchanged to the firehose.
    buffer_window_bytes: Option<u64>,
    // Number of processed blocks between background flushes of buffered plugin slot rows.
    db_update_interval_slots: u64,
}
258
259impl PluginRunner {
260 pub fn new(
265 clickhouse_dsn: impl Display,
266 num_threads: usize,
267 sequential: bool,
268 buffer_window_bytes: Option<u64>,
269 ) -> Self {
270 Self {
271 plugins: Arc::new(Vec::new()),
272 clickhouse_dsn: clickhouse_dsn.to_string(),
273 num_threads: std::cmp::max(1, num_threads),
274 sequential,
275 buffer_window_bytes,
276 db_update_interval_slots: 100,
277 }
278 }
279
280 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
282 Arc::get_mut(&mut self.plugins)
283 .expect("cannot register plugins after the runner has started")
284 .push(Arc::from(plugin));
285 }
286
287 pub async fn run(
289 self: Arc<Self>,
290 slot_range: Range<u64>,
291 clickhouse_enabled: bool,
292 ) -> Result<(), PluginRunnerError> {
293 let db_update_interval = self.db_update_interval_slots.max(1);
294 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
295 self.plugins
296 .iter()
297 .cloned()
298 .map(PluginHandle::from)
299 .collect(),
300 );
301
302 let clickhouse = if clickhouse_enabled {
303 let client = Arc::new(
304 build_clickhouse_client(&self.clickhouse_dsn)
305 .with_option("async_insert", "1")
306 .with_option("wait_for_async_insert", "0"),
307 );
308 ensure_clickhouse_tables(client.as_ref()).await?;
309 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
310 Some(client)
311 } else {
312 None
313 };
314
315 for handle in plugin_handles.iter() {
316 if let Err(error) = handle
317 .plugin
318 .on_load(clickhouse.clone())
319 .await
320 .map_err(|e| e.to_string())
321 {
322 return Err(PluginRunnerError::PluginLifecycle {
323 plugin: handle.name,
324 stage: "on_load",
325 details: error,
326 });
327 }
328 }
329
330 let shutting_down = Arc::new(AtomicBool::new(false));
331 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
332 let clickhouse_enabled = clickhouse.is_some();
333 let slots_since_flush = Arc::new(AtomicU64::new(0));
334
335 let on_block = {
336 let plugin_handles = plugin_handles.clone();
337 let clickhouse = clickhouse.clone();
338 let slot_buffer = slot_buffer.clone();
339 let slots_since_flush = slots_since_flush.clone();
340 let shutting_down = shutting_down.clone();
341 move |thread_id: usize, block: BlockData| {
342 let plugin_handles = plugin_handles.clone();
343 let clickhouse = clickhouse.clone();
344 let slot_buffer = slot_buffer.clone();
345 let slots_since_flush = slots_since_flush.clone();
346 let shutting_down = shutting_down.clone();
347 async move {
348 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
349 if shutting_down.load(Ordering::SeqCst) {
350 log::debug!(
351 target: &log_target,
352 "ignoring block while shutdown is in progress"
353 );
354 return Ok(());
355 }
356 let block = Arc::new(block);
357 if !plugin_handles.is_empty() {
358 for handle in plugin_handles.iter() {
359 let db = clickhouse.clone();
360 if let Err(err) = handle
361 .plugin
362 .on_block(thread_id, db.clone(), block.as_ref())
363 .await
364 {
365 log::error!(
366 target: &log_target,
367 "plugin {} on_block error: {}",
368 handle.name,
369 err
370 );
371 continue;
372 }
373 if let (Some(db_client), BlockData::Block { slot, .. }) =
374 (clickhouse.clone(), block.as_ref())
375 {
376 if clickhouse_enabled {
377 slot_buffer
378 .entry(handle.id)
379 .or_default()
380 .push(PluginSlotRow {
381 plugin_id: handle.id as u32,
382 slot: *slot,
383 });
384 } else if let Err(err) =
385 record_plugin_slot(db_client, handle.id, *slot).await
386 {
387 log::error!(
388 target: &log_target,
389 "failed to record plugin slot for {}: {}",
390 handle.name,
391 err
392 );
393 }
394 }
395 }
396 if clickhouse_enabled {
397 let current = slots_since_flush
398 .fetch_add(1, Ordering::Relaxed)
399 .wrapping_add(1);
400 if current.is_multiple_of(db_update_interval)
401 && let Some(db_client) = clickhouse.clone()
402 {
403 let buffer = slot_buffer.clone();
404 let log_target_clone = log_target.clone();
405 tokio::spawn(async move {
406 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
407 log::error!(
408 target: &log_target_clone,
409 "failed to flush buffered plugin slots: {}",
410 err
411 );
412 }
413 });
414 }
415 }
416 }
417 if let Some(db_client) = clickhouse.clone() {
418 match block.as_ref() {
419 BlockData::Block {
420 slot,
421 executed_transaction_count,
422 block_time,
423 ..
424 } => {
425 let tally = take_slot_tx_tally(*slot);
426 let slot = *slot;
427 let executed_transaction_count = *executed_transaction_count;
428 let block_time = *block_time;
429 let log_target_clone = log_target.clone();
430 tokio::spawn(async move {
431 if let Err(err) = record_slot_status(
432 db_client,
433 slot,
434 thread_id,
435 executed_transaction_count,
436 tally.votes,
437 tally.non_votes,
438 block_time,
439 )
440 .await
441 {
442 log::error!(
443 target: &log_target_clone,
444 "failed to record slot status: {}",
445 err
446 );
447 }
448 });
449 }
450 BlockData::PossibleLeaderSkipped { slot } => {
451 take_slot_tx_tally(*slot);
453 }
454 }
455 }
456 Ok(())
457 }
458 .boxed()
459 }
460 };
461
462 let on_transaction = {
463 let plugin_handles = plugin_handles.clone();
464 let clickhouse = clickhouse.clone();
465 let shutting_down = shutting_down.clone();
466 move |thread_id: usize, transaction: TransactionData| {
467 let plugin_handles = plugin_handles.clone();
468 let clickhouse = clickhouse.clone();
469 let shutting_down = shutting_down.clone();
470 async move {
471 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
472 record_slot_vote_tally(transaction.slot, transaction.is_vote);
473 if plugin_handles.is_empty() {
474 return Ok(());
475 }
476 if shutting_down.load(Ordering::SeqCst) {
477 log::debug!(
478 target: &log_target,
479 "ignoring transaction while shutdown is in progress"
480 );
481 return Ok(());
482 }
483 for handle in plugin_handles.iter() {
484 if let Err(err) = handle
485 .plugin
486 .on_transaction(thread_id, clickhouse.clone(), &transaction)
487 .await
488 {
489 log::error!(
490 target: &log_target,
491 "plugin {} on_transaction error: {}",
492 handle.name,
493 err
494 );
495 }
496 }
497 Ok(())
498 }
499 .boxed()
500 }
501 };
502
503 let on_entry = {
504 let plugin_handles = plugin_handles.clone();
505 let clickhouse = clickhouse.clone();
506 let shutting_down = shutting_down.clone();
507 move |thread_id: usize, entry: EntryData| {
508 let plugin_handles = plugin_handles.clone();
509 let clickhouse = clickhouse.clone();
510 let shutting_down = shutting_down.clone();
511 async move {
512 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
513 if plugin_handles.is_empty() {
514 return Ok(());
515 }
516 if shutting_down.load(Ordering::SeqCst) {
517 log::debug!(
518 target: &log_target,
519 "ignoring entry while shutdown is in progress"
520 );
521 return Ok(());
522 }
523 let entry = Arc::new(entry);
524 for handle in plugin_handles.iter() {
525 if let Err(err) = handle
526 .plugin
527 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
528 .await
529 {
530 log::error!(
531 target: &log_target,
532 "plugin {} on_entry error: {}",
533 handle.name,
534 err
535 );
536 }
537 }
538 Ok(())
539 }
540 .boxed()
541 }
542 };
543
544 let on_reward = {
545 let plugin_handles = plugin_handles.clone();
546 let clickhouse = clickhouse.clone();
547 let shutting_down = shutting_down.clone();
548 move |thread_id: usize, reward: RewardsData| {
549 let plugin_handles = plugin_handles.clone();
550 let clickhouse = clickhouse.clone();
551 let shutting_down = shutting_down.clone();
552 async move {
553 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
554 if plugin_handles.is_empty() {
555 return Ok(());
556 }
557 if shutting_down.load(Ordering::SeqCst) {
558 log::debug!(
559 target: &log_target,
560 "ignoring reward while shutdown is in progress"
561 );
562 return Ok(());
563 }
564 let reward = Arc::new(reward);
565 for handle in plugin_handles.iter() {
566 if let Err(err) = handle
567 .plugin
568 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
569 .await
570 {
571 log::error!(
572 target: &log_target,
573 "plugin {} on_reward error: {}",
574 handle.name,
575 err
576 );
577 }
578 }
579 Ok(())
580 }
581 .boxed()
582 }
583 };
584
585 let on_error = {
586 let plugin_handles = plugin_handles.clone();
587 let clickhouse = clickhouse.clone();
588 let shutting_down = shutting_down.clone();
589 move |thread_id: usize, context: FirehoseErrorContext| {
590 let plugin_handles = plugin_handles.clone();
591 let clickhouse = clickhouse.clone();
592 let shutting_down = shutting_down.clone();
593 async move {
594 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
595 if plugin_handles.is_empty() {
596 return Ok(());
597 }
598 if shutting_down.load(Ordering::SeqCst) {
599 log::debug!(
600 target: &log_target,
601 "ignoring error callback while shutdown is in progress"
602 );
603 return Ok(());
604 }
605 let context = Arc::new(context);
606 for handle in plugin_handles.iter() {
607 if let Err(err) = handle
608 .plugin
609 .on_error(thread_id, clickhouse.clone(), context.as_ref())
610 .await
611 {
612 log::error!(
613 target: &log_target,
614 "plugin {} on_error error: {}",
615 handle.name,
616 err
617 );
618 }
619 }
620 Ok(())
621 }
622 .boxed()
623 }
624 };
625
626 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
627
628 let total_slot_count_capture = total_slot_count;
629 let run_origin = std::time::Instant::now();
630 SNAPSHOT_LOCK.store(false, Ordering::Relaxed);
632 LAST_TOTAL_SLOTS.store(0, Ordering::Relaxed);
633 LAST_TOTAL_TXS.store(0, Ordering::Relaxed);
634 LAST_TOTAL_TIME_NS.store(monotonic_nanos_since(run_origin), Ordering::Relaxed);
635 let stats_tracking = clickhouse.clone().map(|_db| {
636 let shutting_down = shutting_down.clone();
637 let thread_progress_max: Arc<DashMap<usize, f64>> = Arc::new(DashMap::new());
638 StatsTracking {
639 on_stats: {
640 let thread_progress_max = thread_progress_max.clone();
641 let total_slot_count = total_slot_count_capture;
642 move |thread_id: usize, stats: Stats| {
643 let shutting_down = shutting_down.clone();
644 let thread_progress_max = thread_progress_max.clone();
645 async move {
646 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
647 if shutting_down.load(Ordering::SeqCst) {
648 log::debug!(
649 target: &log_target,
650 "skipping stats write during shutdown"
651 );
652 return Ok(());
653 }
654 let finish_at = stats
655 .finish_time
656 .unwrap_or_else(std::time::Instant::now);
657 let elapsed_since_start = finish_at
658 .saturating_duration_since(stats.start_time)
659 .as_nanos()
660 .max(1) as u64;
661 let total_slots = stats.slots_processed;
662 let total_txs = stats.transactions_processed;
663 let now_ns = monotonic_nanos_since(run_origin);
664 let (delta_slots, delta_txs, delta_time_ns) = {
668 while SNAPSHOT_LOCK
669 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
670 .is_err()
671 {
672 hint::spin_loop();
673 }
674 let prev_slots = LAST_TOTAL_SLOTS.load(Ordering::Relaxed);
675 let prev_txs = LAST_TOTAL_TXS.load(Ordering::Relaxed);
676 let prev_time_ns = LAST_TOTAL_TIME_NS.load(Ordering::Relaxed);
677 LAST_TOTAL_SLOTS.store(total_slots, Ordering::Relaxed);
678 LAST_TOTAL_TXS.store(total_txs, Ordering::Relaxed);
679 LAST_TOTAL_TIME_NS.store(now_ns, Ordering::Relaxed);
680 SNAPSHOT_LOCK.store(false, Ordering::Release);
681 let delta_slots = total_slots.saturating_sub(prev_slots);
682 let delta_txs = total_txs.saturating_sub(prev_txs);
683 let delta_time_ns = now_ns.saturating_sub(prev_time_ns).max(1);
684 (delta_slots, delta_txs, delta_time_ns)
685 };
686 let delta_secs = (delta_time_ns as f64 / 1e9).max(1e-9);
687 let mut slot_rate = delta_slots as f64 / delta_secs;
688 let mut tps = delta_txs as f64 / delta_secs;
689 if slot_rate <= 0.0 && total_slots > 0 {
690 slot_rate =
691 total_slots as f64 / (elapsed_since_start as f64 / 1e9);
692 }
693 if tps <= 0.0 && total_txs > 0 {
694 tps = total_txs as f64 / (elapsed_since_start as f64 / 1e9);
695 }
696 let thread_stats = &stats.thread_stats;
697 let processed_slots = stats.slots_processed.min(total_slot_count);
698 let progress_fraction = if total_slot_count > 0 {
699 processed_slots as f64 / total_slot_count as f64
700 } else {
701 1.0
702 };
703 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
704 let thread_total_slots = thread_stats
705 .initial_slot_range
706 .end
707 .saturating_sub(thread_stats.initial_slot_range.start);
708 let thread_progress_raw = if thread_total_slots > 0 {
709 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
710 .clamp(0.0, 1.0)
711 * 100.0
712 } else {
713 100.0
714 };
715 let thread_progress = *thread_progress_max
716 .entry(thread_id)
717 .and_modify(|max| {
718 if thread_progress_raw > *max {
719 *max = thread_progress_raw;
720 }
721 })
722 .or_insert(thread_progress_raw);
723 let mut overall_eta = None;
724 if slot_rate > 0.0 {
725 let remaining_slots =
726 total_slot_count.saturating_sub(processed_slots);
727 overall_eta = Some(human_readable_duration(
728 remaining_slots as f64 / slot_rate,
729 ));
730 }
731 if overall_eta.is_none() {
732 if progress_fraction > 0.0 && progress_fraction < 1.0 {
733 if let Some(elapsed_total) = finish_at
734 .checked_duration_since(stats.start_time)
735 .map(|d| d.as_secs_f64())
736 && elapsed_total > 0.0 {
737 let remaining_secs =
738 elapsed_total * (1.0 / progress_fraction - 1.0);
739 overall_eta = Some(human_readable_duration(remaining_secs));
740 }
741 } else if progress_fraction >= 1.0 {
742 overall_eta = Some("0s".into());
743 }
744 }
745 let slots_display = human_readable_count(processed_slots);
746 let blocks_display = human_readable_count(stats.blocks_processed);
747 let txs_display = human_readable_count(stats.transactions_processed);
748 let tps_display = human_readable_count(tps.ceil() as u64);
749 log::info!(
750 target: &log_target,
751 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
752 overall_eta.unwrap_or_else(|| "n/a".into()),
753 );
754 Ok(())
755 }
756 .boxed()
757 }
758 },
759 tracking_interval_slots: 100,
760 }
761 });
762
763 let (shutdown_tx, _) = broadcast::channel::<()>(1);
764
765 let mut firehose_future = Box::pin(firehose(
766 self.num_threads as u64,
767 self.sequential,
768 self.buffer_window_bytes,
769 slot_range,
770 Some(on_block),
771 Some(on_transaction),
772 Some(on_entry),
773 Some(on_reward),
774 Some(on_error),
775 stats_tracking,
776 Some(shutdown_tx.subscribe()),
777 ));
778
779 let firehose_result = tokio::select! {
780 res = &mut firehose_future => res,
781 ctrl = signal::ctrl_c() => {
782 match ctrl {
783 Ok(()) => log::info!(
784 target: LOG_MODULE,
785 "CTRL+C received; initiating shutdown"
786 ),
787 Err(err) => log::error!(
788 target: LOG_MODULE,
789 "failed to listen for CTRL+C: {}",
790 err
791 ),
792 }
793 shutting_down.store(true, Ordering::SeqCst);
794 let _ = shutdown_tx.send(());
795 firehose_future.await
796 }
797 };
798
799 if clickhouse_enabled
800 && let Some(db_client) = clickhouse.clone()
801 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
802 {
803 log::error!(
804 target: LOG_MODULE,
805 "failed to flush buffered plugin slots: {}",
806 err
807 );
808 }
809
810 for handle in plugin_handles.iter() {
811 if let Err(error) = handle
812 .plugin
813 .on_exit(clickhouse.clone())
814 .await
815 .map_err(|e| e.to_string())
816 {
817 log::error!(
818 target: LOG_MODULE,
819 "plugin {} on_exit error: {}",
820 handle.name,
821 error
822 );
823 }
824 }
825
826 match firehose_result {
827 Ok(()) => Ok(()),
828 Err((error, slot)) => Err(PluginRunnerError::Firehose {
829 details: error.to_string(),
830 slot,
831 }),
832 }
833 }
834}
835
836fn build_clickhouse_client(dsn: &str) -> Client {
837 let mut client = Client::default();
838 if let Ok(mut url) = Url::parse(dsn) {
839 let username = url.username().to_string();
840 let password = url.password().map(|value| value.to_string());
841 if !username.is_empty() || password.is_some() {
842 let _ = url.set_username("");
843 let _ = url.set_password(None);
844 }
845 client = client.with_url(url.as_str());
846 if !username.is_empty() {
847 client = client.with_user(username);
848 }
849 if let Some(password) = password {
850 client = client.with_password(password);
851 }
852 } else {
853 client = client.with_url(dsn);
854 }
855 client
856}
857
/// Errors returned by [`PluginRunner::run`].
#[derive(Debug, Error)]
pub enum PluginRunnerError {
    /// A ClickHouse operation failed.
    #[error("clickhouse error: {0}")]
    Clickhouse(#[from] clickhouse::error::Error),
    /// The firehose terminated with an error.
    #[error("firehose error at slot {slot}: {details}")]
    Firehose {
        /// String rendering of the underlying firehose error.
        details: String,
        /// Slot reported by the firehose together with the error.
        slot: u64,
    },
    /// A plugin lifecycle hook failed.
    #[error("plugin {plugin} failed during {stage}: {details}")]
    PluginLifecycle {
        /// Name of the plugin whose hook failed.
        plugin: &'static str,
        /// Name of the lifecycle stage that failed (for example `"on_load"`).
        stage: &'static str,
        /// String rendering of the error returned by the plugin.
        details: String,
    },
}
883
/// A plugin together with its identity values, captured once so the hot paths do not
/// call back into the plugin for them.
#[derive(Clone)]
struct PluginHandle {
    // The plugin implementation itself.
    plugin: Arc<dyn Plugin>,
    // Result of `Plugin::id` at handle creation time.
    id: u16,
    // Result of `Plugin::name` at handle creation time.
    name: &'static str,
    // Result of `Plugin::version` at handle creation time.
    version: u16,
}
891
892impl From<Arc<dyn Plugin>> for PluginHandle {
893 fn from(plugin: Arc<dyn Plugin>) -> Self {
894 let id = plugin.id();
895 let name = plugin.name();
896 let version = plugin.version();
897 Self {
898 plugin,
899 id,
900 name,
901 version,
902 }
903 }
904}
905
/// Row inserted into the `jetstreamer_plugins` table (one per registered plugin).
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    // Plugin id, widened from `u16` to match the `UInt32` column.
    id: u32,
    // Plugin name.
    name: &'a str,
    // Plugin version, widened from `u16` to match the `UInt32` column.
    version: u32,
}
912
/// Row inserted into the `jetstreamer_plugin_slots` table, recording that a plugin
/// handled the block of a given slot.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    // Id of the plugin that processed the slot.
    plugin_id: u32,
    // The processed slot.
    slot: u64,
}
918
/// Row inserted into the `jetstreamer_slot_status` table for each processed block.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    // Slot the block belongs to.
    slot: u64,
    // Executed transaction count reported with the block, saturated to `u32`.
    transaction_count: u32,
    // Vote transactions tallied for the slot, saturated to `u32`.
    vote_transaction_count: u32,
    // Non-vote transactions tallied for the slot, saturated to `u32`.
    non_vote_transaction_count: u32,
    // Firehose thread that processed the block; `u8::MAX` when the id does not fit.
    thread_id: u8,
    // Block time clamped into `u32` (0 when absent or negative).
    // NOTE(review): presumably seconds since the Unix epoch, matching the DateTime column — confirm.
    block_time: u32,
}
928
/// Per-slot count of vote and non-vote transactions seen so far.
#[derive(Default, Clone, Copy)]
struct SlotTxTally {
    // Number of transactions flagged as votes.
    votes: u64,
    // Number of transactions not flagged as votes.
    non_votes: u64,
}
934
/// Process-wide map from slot to its running transaction tally; entries are added by
/// `record_slot_vote_tally` and removed by `take_slot_tx_tally`.
static SLOT_TX_TALLY: Lazy<DashMap<u64, SlotTxTally>> = Lazy::new(DashMap::new);
936
937async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
938 db.query(
939 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
940 slot UInt64,
941 transaction_count UInt32 DEFAULT 0,
942 vote_transaction_count UInt32 DEFAULT 0,
943 non_vote_transaction_count UInt32 DEFAULT 0,
944 thread_id UInt8 DEFAULT 0,
945 block_time DateTime('UTC') DEFAULT toDateTime(0),
946 indexed_at DateTime('UTC') DEFAULT now()
947 ) ENGINE = ReplacingMergeTree(indexed_at)
948 ORDER BY slot"#,
949 )
950 .execute()
951 .await?;
952
953 db.query(
954 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
955 id UInt32,
956 name String,
957 version UInt32
958 ) ENGINE = ReplacingMergeTree
959 ORDER BY id"#,
960 )
961 .execute()
962 .await?;
963
964 db.query(
965 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
966 plugin_id UInt32,
967 slot UInt64,
968 indexed_at DateTime('UTC') DEFAULT now()
969 ) ENGINE = ReplacingMergeTree
970 ORDER BY (plugin_id, slot)"#,
971 )
972 .execute()
973 .await?;
974
975 Ok(())
976}
977
978async fn upsert_plugins(
979 db: &Client,
980 plugins: &[PluginHandle],
981) -> Result<(), clickhouse::error::Error> {
982 if plugins.is_empty() {
983 return Ok(());
984 }
985 let mut insert = db.insert::<PluginRow>("jetstreamer_plugins").await?;
986 for handle in plugins {
987 insert
988 .write(&PluginRow {
989 id: handle.id as u32,
990 name: handle.name,
991 version: handle.version as u32,
992 })
993 .await?;
994 }
995 insert.end().await?;
996 Ok(())
997}
998
999async fn record_plugin_slot(
1000 db: Arc<Client>,
1001 plugin_id: u16,
1002 slot: u64,
1003) -> Result<(), clickhouse::error::Error> {
1004 let mut insert = db
1005 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1006 .await?;
1007 insert
1008 .write(&PluginSlotRow {
1009 plugin_id: plugin_id as u32,
1010 slot,
1011 })
1012 .await?;
1013 insert.end().await?;
1014 Ok(())
1015}
1016
1017async fn flush_slot_buffer(
1018 db: Arc<Client>,
1019 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
1020) -> Result<(), clickhouse::error::Error> {
1021 let mut rows = Vec::new();
1022 buffer.iter_mut().for_each(|mut entry| {
1023 if !entry.value().is_empty() {
1024 rows.append(entry.value_mut());
1025 }
1026 });
1027
1028 if rows.is_empty() {
1029 return Ok(());
1030 }
1031
1032 let mut insert = db
1033 .insert::<PluginSlotRow>("jetstreamer_plugin_slots")
1034 .await?;
1035 for row in rows {
1036 insert.write(&row).await?;
1037 }
1038 insert.end().await?;
1039 Ok(())
1040}
1041
1042async fn record_slot_status(
1043 db: Arc<Client>,
1044 slot: u64,
1045 thread_id: usize,
1046 transaction_count: u64,
1047 vote_transaction_count: u64,
1048 non_vote_transaction_count: u64,
1049 block_time: Option<i64>,
1050) -> Result<(), clickhouse::error::Error> {
1051 let mut insert = db
1052 .insert::<SlotStatusRow>("jetstreamer_slot_status")
1053 .await?;
1054 insert
1055 .write(&SlotStatusRow {
1056 slot,
1057 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
1058 vote_transaction_count: vote_transaction_count.min(u32::MAX as u64) as u32,
1059 non_vote_transaction_count: non_vote_transaction_count.min(u32::MAX as u64) as u32,
1060 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
1061 block_time: clamp_block_time(block_time),
1062 })
1063 .await?;
1064 insert.end().await?;
1065 Ok(())
1066}
1067
/// Converts an optional signed timestamp into a `u32`.
///
/// `None` and negative values become `0`; values above `u32::MAX` become `u32::MAX`.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |ts| ts.clamp(0, i64::from(u32::MAX)) as u32)
}
1076
1077fn record_slot_vote_tally(slot: u64, is_vote: bool) {
1078 let mut entry = SLOT_TX_TALLY.entry(slot).or_default();
1079 if is_vote {
1080 entry.votes = entry.votes.saturating_add(1);
1081 } else {
1082 entry.non_votes = entry.non_votes.saturating_add(1);
1083 }
1084}
1085
1086fn take_slot_tx_tally(slot: u64) -> SlotTxTally {
1087 SLOT_TX_TALLY
1088 .remove(&slot)
1089 .map(|(_, tally)| tally)
1090 .unwrap_or_default()
1091}
1092
/// Marker trait whose only purpose is a compile-time check that an implementing
/// type is `Send + Sync + 'static`.
trait _CanSend: Send + Sync + 'static {}
// Stops compiling if `PluginRunnerError` ever loses `Send + Sync + 'static`.
impl _CanSend for PluginRunnerError {}
1096
/// Formats an unsigned integer with `,` between each group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let digits = value.into().to_string();
    // Length of the leading group: 1-3 digits. `digits` is never empty, so a
    // remainder of zero means the first group is a full three digits.
    let head_len = match digits.len() % 3 {
        0 => 3,
        rem => rem,
    };
    let (head, mut rest) = digits.split_at(head_len);

    let mut formatted = String::with_capacity(digits.len() + digits.len() / 3);
    formatted.push_str(head);
    // What remains is always a whole number of three-digit groups.
    while !rest.is_empty() {
        let (group, tail) = rest.split_at(3);
        formatted.push(',');
        formatted.push_str(group);
        rest = tail;
    }
    formatted
}
1110
/// Renders a duration given in seconds as a short human-readable string.
///
/// * non-finite input gives `"n/a"`
/// * zero or negative input gives `"0s"`
/// * under a minute gives one decimal place, e.g. `"12.3s"`
/// * otherwise the value is rounded to whole seconds and shown using its two
///   largest units: `"5m7s"`, `"2h15m"`, `"3d4h"` (or just `"3d"` when the hour
///   part is zero)
fn human_readable_duration(seconds: f64) -> String {
    if seconds.is_nan() || seconds.is_infinite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    if seconds < 60.0 {
        return format!("{:.1}s", seconds);
    }

    let total = Duration::from_secs(seconds.round() as u64).as_secs();
    let days = total / 86_400;
    let hours = total % 86_400 / 3_600;
    let minutes = total % 3_600 / 60;
    let secs = total % 60;

    match (days, hours) {
        (0, 0) => format!("{}m{}s", minutes, secs),
        (0, _) => format!("{}h{}m", hours, minutes),
        (_, 0) => format!("{}d", days),
        (_, _) => format!("{}d{}h", days, hours),
    }
}