1#![deny(missing_docs)]
2pub mod plugins;
110
/// Base log target for runner messages. Per-thread callbacks extend it to
/// `"{LOG_MODULE}::T{thread_id:03}"`; runner-level messages use it as-is.
const LOG_MODULE: &str = "jetstreamer::runner";
112
113use std::{
114 fmt::Display,
115 future::Future,
116 ops::Range,
117 pin::Pin,
118 sync::{
119 Arc, Mutex,
120 atomic::{AtomicBool, AtomicU64, Ordering},
121 },
122 time::Duration,
123};
124
125use clickhouse::{Client, Row};
126use dashmap::DashMap;
127use futures_util::FutureExt;
128use jetstreamer_firehose::firehose::{
129 BlockData, EntryData, RewardsData, Stats, StatsTracking, TransactionData, firehose,
130};
131use serde::Serialize;
132use sha2::{Digest, Sha256};
133use thiserror::Error;
134use tokio::{signal, sync::broadcast};
135
/// Cumulative progress counters captured at one point in time.
#[derive(Clone, Copy)]
struct Snapshot {
    /// Instant at which the counters were read.
    time: std::time::Instant,
    /// Cumulative number of slots processed at `time`.
    slots: u64,
    /// Cumulative number of transactions processed at `time`.
    txs: u64,
}

/// Difference between two consecutive [`Snapshot`]s.
#[derive(Clone, Copy, Default)]
struct Contribution {
    /// Slots gained between the two snapshots (saturating at zero).
    slot_delta: u64,
    /// Transactions gained between the two snapshots (saturating at zero).
    tx_delta: u64,
    /// Seconds elapsed between the two snapshots; always strictly positive.
    dt: f64,
}

/// Throughput averaged over a full [`SnapshotWindow`].
#[derive(Clone, Copy, Default)]
struct Rates {
    /// Slots per second.
    slot_rate: f64,
    /// Transactions per second.
    tx_rate: f64,
}

/// Fixed-size ring of the five most recent [`Contribution`]s, used to smooth
/// throughput figures.
#[derive(Default)]
struct SnapshotWindow {
    /// Ring storage; `None` marks a slot that has not been written yet.
    entries: [Option<Contribution>; 5],
    /// Position the next contribution will be written to.
    idx: usize,
    /// Number of contributions recorded so far, capped at the ring capacity.
    len: usize,
    /// Most recent snapshot passed to [`SnapshotWindow::update`].
    last: Option<Snapshot>,
}

impl SnapshotWindow {
    /// Records `snapshot` and returns the rates averaged over the window.
    ///
    /// A contribution is stored only when a previous snapshot exists and
    /// `snapshot.time` is strictly later than it; `snapshot` becomes the new
    /// reference point either way. Returns `None` until the ring holds a full
    /// five contributions (or if the summed elapsed time is not positive).
    fn update(&mut self, snapshot: Snapshot) -> Option<Rates> {
        let capacity = self.entries.len();

        // Swap in the new reference snapshot, keeping the old one for the delta.
        let advanced = self
            .last
            .replace(snapshot)
            .and_then(|prev| {
                snapshot
                    .time
                    .checked_duration_since(prev.time)
                    .map(|gap| (prev, gap.as_secs_f64()))
            })
            .filter(|&(_, dt)| dt > 0.0);

        if let Some((prev, dt)) = advanced {
            self.entries[self.idx] = Some(Contribution {
                slot_delta: snapshot.slots.saturating_sub(prev.slots),
                tx_delta: snapshot.txs.saturating_sub(prev.txs),
                dt,
            });
            self.idx = (self.idx + 1) % capacity;
            self.len = (self.len + 1).min(capacity);
        }

        // Rates are only reported once every ring slot has been filled.
        if self.len < capacity {
            return None;
        }

        let (slot_sum, tx_sum, dt_sum) = self.entries.iter().flatten().fold(
            (0u64, 0u64, 0.0f64),
            |(slots, txs, secs), contribution| {
                (
                    slots.saturating_add(contribution.slot_delta),
                    txs.saturating_add(contribution.tx_delta),
                    secs + contribution.dt,
                )
            },
        );

        if dt_sum <= 0.0 {
            return None;
        }

        Some(Rates {
            slot_rate: slot_sum as f64 / dt_sum,
            tx_rate: tx_sum as f64 / dt_sum,
        })
    }
}
205
206pub use jetstreamer_firehose::firehose::{Stats as FirehoseStats, ThreadStats};
208
/// Boxed, `Send` future returned by every [`Plugin`] hook.
///
/// The future resolves to `Ok(())` on success or to a boxed, thread-safe
/// error on failure. The lifetime `'a` allows the future to borrow from the
/// plugin and from the data handed to the hook.
pub type PluginFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>
            + Send
            + 'a,
    >,
>;
217
218pub trait Plugin: Send + Sync + 'static {
222 fn name(&self) -> &'static str;
224
225 fn version(&self) -> u16 {
227 1
228 }
229
230 fn id(&self) -> u16 {
232 let hash = Sha256::digest(self.name());
233 let mut res = 1u16;
234 for byte in hash {
235 res = res.wrapping_mul(31).wrapping_add(byte as u16);
236 }
237 res
238 }
239
240 fn on_transaction<'a>(
242 &'a self,
243 _thread_id: usize,
244 _db: Option<Arc<Client>>,
245 _transaction: &'a TransactionData,
246 ) -> PluginFuture<'a> {
247 async move { Ok(()) }.boxed()
248 }
249
250 fn on_block<'a>(
252 &'a self,
253 _thread_id: usize,
254 _db: Option<Arc<Client>>,
255 _block: &'a BlockData,
256 ) -> PluginFuture<'a> {
257 async move { Ok(()) }.boxed()
258 }
259
260 fn on_entry<'a>(
262 &'a self,
263 _thread_id: usize,
264 _db: Option<Arc<Client>>,
265 _entry: &'a EntryData,
266 ) -> PluginFuture<'a> {
267 async move { Ok(()) }.boxed()
268 }
269
270 fn on_reward<'a>(
272 &'a self,
273 _thread_id: usize,
274 _db: Option<Arc<Client>>,
275 _reward: &'a RewardsData,
276 ) -> PluginFuture<'a> {
277 async move { Ok(()) }.boxed()
278 }
279
280 fn on_load(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
282 async move { Ok(()) }.boxed()
283 }
284
285 fn on_exit(&self, _db: Option<Arc<Client>>) -> PluginFuture<'_> {
287 async move { Ok(()) }.boxed()
288 }
289}
290
291#[derive(Clone)]
295pub struct PluginRunner {
296 plugins: Arc<Vec<Arc<dyn Plugin>>>,
297 clickhouse_dsn: String,
298 num_threads: usize,
299 db_update_interval_slots: u64,
300}
301
302impl PluginRunner {
303 pub fn new(clickhouse_dsn: impl Display, num_threads: usize) -> Self {
305 Self {
306 plugins: Arc::new(Vec::new()),
307 clickhouse_dsn: clickhouse_dsn.to_string(),
308 num_threads: std::cmp::max(1, num_threads),
309 db_update_interval_slots: 100,
310 }
311 }
312
313 pub fn register(&mut self, plugin: Box<dyn Plugin>) {
315 Arc::get_mut(&mut self.plugins)
316 .expect("cannot register plugins after the runner has started")
317 .push(Arc::from(plugin));
318 }
319
320 pub async fn run(
322 self: Arc<Self>,
323 slot_range: Range<u64>,
324 clickhouse_enabled: bool,
325 ) -> Result<(), PluginRunnerError> {
326 let db_update_interval = self.db_update_interval_slots.max(1);
327 let plugin_handles: Arc<Vec<PluginHandle>> = Arc::new(
328 self.plugins
329 .iter()
330 .cloned()
331 .map(PluginHandle::from)
332 .collect(),
333 );
334
335 let clickhouse = if clickhouse_enabled {
336 let client = Arc::new(
337 Client::default()
338 .with_url(&self.clickhouse_dsn)
339 .with_option("async_insert", "1")
340 .with_option("wait_for_async_insert", "0"),
341 );
342 ensure_clickhouse_tables(client.as_ref()).await?;
343 upsert_plugins(client.as_ref(), plugin_handles.as_ref()).await?;
344 Some(client)
345 } else {
346 None
347 };
348
349 for handle in plugin_handles.iter() {
350 if let Err(error) = handle
351 .plugin
352 .on_load(clickhouse.clone())
353 .await
354 .map_err(|e| e.to_string())
355 {
356 return Err(PluginRunnerError::PluginLifecycle {
357 plugin: handle.name,
358 stage: "on_load",
359 details: error,
360 });
361 }
362 }
363
364 let shutting_down = Arc::new(AtomicBool::new(false));
365 let slot_buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>> = Arc::new(DashMap::new());
366 let clickhouse_enabled = clickhouse.is_some();
367 let slots_since_flush = Arc::new(AtomicU64::new(0));
368
369 let on_block = {
370 let plugin_handles = plugin_handles.clone();
371 let clickhouse = clickhouse.clone();
372 let slot_buffer = slot_buffer.clone();
373 let slots_since_flush = slots_since_flush.clone();
374 let shutting_down = shutting_down.clone();
375 move |thread_id: usize, block: BlockData| {
376 let plugin_handles = plugin_handles.clone();
377 let clickhouse = clickhouse.clone();
378 let slot_buffer = slot_buffer.clone();
379 let slots_since_flush = slots_since_flush.clone();
380 let shutting_down = shutting_down.clone();
381 async move {
382 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
383 if plugin_handles.is_empty() {
384 return Ok(());
385 }
386 if shutting_down.load(Ordering::SeqCst) {
387 log::debug!(
388 target: &log_target,
389 "ignoring block while shutdown is in progress"
390 );
391 return Ok(());
392 }
393 let block = Arc::new(block);
394 for handle in plugin_handles.iter() {
395 let db = clickhouse.clone();
396 if let Err(err) = handle
397 .plugin
398 .on_block(thread_id, db.clone(), block.as_ref())
399 .await
400 {
401 log::error!(
402 target: &log_target,
403 "plugin {} on_block error: {}",
404 handle.name,
405 err
406 );
407 continue;
408 }
409 if let (Some(db_client), BlockData::Block { slot, .. }) =
410 (clickhouse.clone(), block.as_ref())
411 {
412 if clickhouse_enabled {
413 slot_buffer
414 .entry(handle.id)
415 .or_default()
416 .push(PluginSlotRow {
417 plugin_id: handle.id as u32,
418 slot: *slot,
419 });
420 } else if let Err(err) =
421 record_plugin_slot(db_client, handle.id, *slot).await
422 {
423 log::error!(
424 target: &log_target,
425 "failed to record plugin slot for {}: {}",
426 handle.name,
427 err
428 );
429 }
430 }
431 }
432 if clickhouse_enabled {
433 let current = slots_since_flush
434 .fetch_add(1, Ordering::Relaxed)
435 .wrapping_add(1);
436 if current.is_multiple_of(db_update_interval)
437 && let Some(db_client) = clickhouse.clone()
438 {
439 let buffer = slot_buffer.clone();
440 let log_target_clone = log_target.clone();
441 tokio::spawn(async move {
442 if let Err(err) = flush_slot_buffer(db_client, buffer).await {
443 log::error!(
444 target: &log_target_clone,
445 "failed to flush buffered plugin slots: {}",
446 err
447 );
448 }
449 });
450 }
451 }
452 if let Some(db_client) = clickhouse.clone() {
453 match block.as_ref() {
454 BlockData::Block {
455 slot,
456 executed_transaction_count,
457 ..
458 } => {
459 if let Err(err) = record_slot_status(
460 db_client,
461 *slot,
462 thread_id,
463 *executed_transaction_count,
464 )
465 .await
466 {
467 log::error!(
468 target: &log_target,
469 "failed to record slot status: {}",
470 err
471 );
472 }
473 }
474 BlockData::LeaderSkipped { slot } => {
475 if let Err(err) =
476 record_slot_status(db_client, *slot, thread_id, 0).await
477 {
478 log::error!(
479 target: &log_target,
480 "failed to record slot status: {}",
481 err
482 );
483 }
484 }
485 }
486 }
487 Ok(())
488 }
489 .boxed()
490 }
491 };
492
493 let on_transaction = {
494 let plugin_handles = plugin_handles.clone();
495 let clickhouse = clickhouse.clone();
496 let shutting_down = shutting_down.clone();
497 move |thread_id: usize, transaction: TransactionData| {
498 let plugin_handles = plugin_handles.clone();
499 let clickhouse = clickhouse.clone();
500 let shutting_down = shutting_down.clone();
501 async move {
502 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
503 if plugin_handles.is_empty() {
504 return Ok(());
505 }
506 if shutting_down.load(Ordering::SeqCst) {
507 log::debug!(
508 target: &log_target,
509 "ignoring transaction while shutdown is in progress"
510 );
511 return Ok(());
512 }
513 let transaction = Arc::new(transaction);
514 for handle in plugin_handles.iter() {
515 if let Err(err) = handle
516 .plugin
517 .on_transaction(thread_id, clickhouse.clone(), transaction.as_ref())
518 .await
519 {
520 log::error!(
521 target: &log_target,
522 "plugin {} on_transaction error: {}",
523 handle.name,
524 err
525 );
526 }
527 }
528 Ok(())
529 }
530 .boxed()
531 }
532 };
533
534 let on_entry = {
535 let plugin_handles = plugin_handles.clone();
536 let clickhouse = clickhouse.clone();
537 let shutting_down = shutting_down.clone();
538 move |thread_id: usize, entry: EntryData| {
539 let plugin_handles = plugin_handles.clone();
540 let clickhouse = clickhouse.clone();
541 let shutting_down = shutting_down.clone();
542 async move {
543 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
544 if plugin_handles.is_empty() {
545 return Ok(());
546 }
547 if shutting_down.load(Ordering::SeqCst) {
548 log::debug!(
549 target: &log_target,
550 "ignoring entry while shutdown is in progress"
551 );
552 return Ok(());
553 }
554 let entry = Arc::new(entry);
555 for handle in plugin_handles.iter() {
556 if let Err(err) = handle
557 .plugin
558 .on_entry(thread_id, clickhouse.clone(), entry.as_ref())
559 .await
560 {
561 log::error!(
562 target: &log_target,
563 "plugin {} on_entry error: {}",
564 handle.name,
565 err
566 );
567 }
568 }
569 Ok(())
570 }
571 .boxed()
572 }
573 };
574
575 let on_reward = {
576 let plugin_handles = plugin_handles.clone();
577 let clickhouse = clickhouse.clone();
578 let shutting_down = shutting_down.clone();
579 move |thread_id: usize, reward: RewardsData| {
580 let plugin_handles = plugin_handles.clone();
581 let clickhouse = clickhouse.clone();
582 let shutting_down = shutting_down.clone();
583 async move {
584 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
585 if plugin_handles.is_empty() {
586 return Ok(());
587 }
588 if shutting_down.load(Ordering::SeqCst) {
589 log::debug!(
590 target: &log_target,
591 "ignoring reward while shutdown is in progress"
592 );
593 return Ok(());
594 }
595 let reward = Arc::new(reward);
596 for handle in plugin_handles.iter() {
597 if let Err(err) = handle
598 .plugin
599 .on_reward(thread_id, clickhouse.clone(), reward.as_ref())
600 .await
601 {
602 log::error!(
603 target: &log_target,
604 "plugin {} on_reward error: {}",
605 handle.name,
606 err
607 );
608 }
609 }
610 Ok(())
611 }
612 .boxed()
613 }
614 };
615
616 let total_slot_count = slot_range.end.saturating_sub(slot_range.start);
617
618 let total_slot_count_capture = total_slot_count;
619 let stats_tracking = clickhouse.clone().map(|_db| {
620 let shutting_down = shutting_down.clone();
621 let last_snapshot: Arc<Mutex<SnapshotWindow>> =
622 Arc::new(Mutex::new(SnapshotWindow::default()));
623 StatsTracking {
624 on_stats: {
625 let last_snapshot = last_snapshot.clone();
626 let total_slot_count = total_slot_count_capture;
627 move |thread_id: usize, stats: Stats| {
628 let shutting_down = shutting_down.clone();
629 let last_snapshot = last_snapshot.clone();
630 async move {
631 let log_target = format!("{}::T{:03}", LOG_MODULE, thread_id);
632 if shutting_down.load(Ordering::SeqCst) {
633 log::debug!(
634 target: &log_target,
635 "skipping stats write during shutdown"
636 );
637 return Ok(());
638 }
639 let finish_at = stats
640 .finish_time
641 .unwrap_or_else(std::time::Instant::now);
642 let elapsed = finish_at.saturating_duration_since(stats.start_time);
643 let elapsed_secs = elapsed.as_secs_f64();
644 let mut tps = if elapsed_secs > 0.0 {
645 stats.transactions_processed as f64 / elapsed_secs
646 } else {
647 0.0
648 };
649 let thread_stats = &stats.thread_stats;
650 let processed_slots = stats.slots_processed.min(total_slot_count);
651 let progress_fraction = if total_slot_count > 0 {
652 processed_slots as f64 / total_slot_count as f64
653 } else {
654 1.0
655 };
656 let overall_progress = (progress_fraction * 100.0).clamp(0.0, 100.0);
657 let thread_total_slots = thread_stats
658 .slot_range
659 .end
660 .saturating_sub(thread_stats.slot_range.start);
661 let thread_progress = if thread_total_slots > 0 {
662 (thread_stats.slots_processed as f64 / thread_total_slots as f64)
663 .clamp(0.0, 1.0)
664 * 100.0
665 } else {
666 100.0
667 };
668 let mut overall_eta = None;
669 if let Ok(mut window) = last_snapshot.lock()
670 && let Some(rates) = window.update(Snapshot {
671 time: finish_at,
672 slots: stats.slots_processed,
673 txs: stats.transactions_processed,
674 }) {
675 tps = rates.tx_rate;
676 let remaining_slots =
677 total_slot_count.saturating_sub(processed_slots);
678 if rates.slot_rate > 0.0 && remaining_slots > 0 {
679 overall_eta = Some(human_readable_duration(
680 remaining_slots as f64 / rates.slot_rate,
681 ));
682 }
683 }
684 if overall_eta.is_none() {
685 if progress_fraction > 0.0 && progress_fraction < 1.0 {
686 if let Some(elapsed_total) = finish_at
687 .checked_duration_since(stats.start_time)
688 .map(|d| d.as_secs_f64())
689 && elapsed_total > 0.0 {
690 let remaining_secs =
691 elapsed_total * (1.0 / progress_fraction - 1.0);
692 overall_eta = Some(human_readable_duration(remaining_secs));
693 }
694 } else if progress_fraction >= 1.0 {
695 overall_eta = Some("0s".into());
696 }
697 }
698 let slots_display = human_readable_count(processed_slots);
699 let blocks_display = human_readable_count(stats.blocks_processed);
700 let txs_display = human_readable_count(stats.transactions_processed);
701 let tps_display = human_readable_count(tps.ceil() as u64);
702 log::info!(
703 target: &log_target,
704 "{overall_progress:.1}% | ETA: {} | {tps_display} TPS | {slots_display} slots | {blocks_display} blocks | {txs_display} txs | thread: {thread_progress:.1}%",
705 overall_eta.unwrap_or_else(|| "n/a".into()),
706 );
707 Ok(())
708 }
709 .boxed()
710 }
711 },
712 tracking_interval_slots: 100,
713 }
714 });
715
716 let (shutdown_tx, _) = broadcast::channel::<()>(1);
717
718 let mut firehose_future = Box::pin(firehose(
719 self.num_threads as u64,
720 slot_range,
721 Some(on_block),
722 Some(on_transaction),
723 Some(on_entry),
724 Some(on_reward),
725 stats_tracking,
726 Some(shutdown_tx.subscribe()),
727 ));
728
729 let firehose_result = tokio::select! {
730 res = &mut firehose_future => res,
731 ctrl = signal::ctrl_c() => {
732 match ctrl {
733 Ok(()) => log::info!(
734 target: LOG_MODULE,
735 "CTRL+C received; initiating shutdown"
736 ),
737 Err(err) => log::error!(
738 target: LOG_MODULE,
739 "failed to listen for CTRL+C: {}",
740 err
741 ),
742 }
743 shutting_down.store(true, Ordering::SeqCst);
744 let _ = shutdown_tx.send(());
745 firehose_future.await
746 }
747 };
748
749 if clickhouse_enabled
750 && let Some(db_client) = clickhouse.clone()
751 && let Err(err) = flush_slot_buffer(db_client, slot_buffer.clone()).await
752 {
753 log::error!(
754 target: LOG_MODULE,
755 "failed to flush buffered plugin slots: {}",
756 err
757 );
758 }
759
760 for handle in plugin_handles.iter() {
761 if let Err(error) = handle
762 .plugin
763 .on_exit(clickhouse.clone())
764 .await
765 .map_err(|e| e.to_string())
766 {
767 log::error!(
768 target: LOG_MODULE,
769 "plugin {} on_exit error: {}",
770 handle.name,
771 error
772 );
773 }
774 }
775
776 match firehose_result {
777 Ok(()) => Ok(()),
778 Err((error, slot)) => Err(PluginRunnerError::Firehose {
779 details: error.to_string(),
780 slot,
781 }),
782 }
783 }
784}
785
786#[derive(Debug, Error)]
788pub enum PluginRunnerError {
789 #[error("clickhouse error: {0}")]
791 Clickhouse(#[from] clickhouse::error::Error),
792 #[error("firehose error at slot {slot}: {details}")]
794 Firehose {
795 details: String,
797 slot: u64,
799 },
800 #[error("plugin {plugin} failed during {stage}: {details}")]
802 PluginLifecycle {
803 plugin: &'static str,
805 stage: &'static str,
807 details: String,
809 },
810}
811
812#[derive(Clone)]
813struct PluginHandle {
814 plugin: Arc<dyn Plugin>,
815 id: u16,
816 name: &'static str,
817 version: u16,
818}
819
820impl From<Arc<dyn Plugin>> for PluginHandle {
821 fn from(plugin: Arc<dyn Plugin>) -> Self {
822 let id = plugin.id();
823 let name = plugin.name();
824 let version = plugin.version();
825 Self {
826 plugin,
827 id,
828 name,
829 version,
830 }
831 }
832}
833
/// Row written to the `jetstreamer_plugins` table.
///
/// Field order is kept in line with the table's column order
/// (`id`, `name`, `version`).
#[derive(Row, Serialize)]
struct PluginRow<'a> {
    /// Plugin id, widened from `u16`.
    id: u32,
    /// Plugin name.
    name: &'a str,
    /// Plugin version, widened from `u16`.
    version: u32,
}
840
/// Row written to the `jetstreamer_plugin_slots` table, marking that a plugin
/// handled a slot. The table's `indexed_at` column is left to its default.
#[derive(Row, Serialize)]
struct PluginSlotRow {
    /// Plugin id, widened from `u16`.
    plugin_id: u32,
    /// Slot the plugin handled.
    slot: u64,
}
846
/// Row written to the `jetstreamer_slot_status` table. The table's
/// `indexed_at` column is left to its default.
#[derive(Row, Serialize)]
struct SlotStatusRow {
    /// Slot number.
    slot: u64,
    /// Transaction count for the slot, clamped to `u32::MAX` by the writer.
    transaction_count: u32,
    /// Firehose thread that processed the slot, clamped to `u8::MAX` by the writer.
    thread_id: u8,
}
853
854async fn ensure_clickhouse_tables(db: &Client) -> Result<(), clickhouse::error::Error> {
855 db.query(
856 r#"CREATE TABLE IF NOT EXISTS jetstreamer_slot_status (
857 slot UInt64,
858 transaction_count UInt32 DEFAULT 0,
859 thread_id UInt8 DEFAULT 0,
860 indexed_at DateTime('UTC') DEFAULT now()
861 ) ENGINE = ReplacingMergeTree
862 ORDER BY (slot, thread_id)"#,
863 )
864 .execute()
865 .await?;
866
867 db.query(
868 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugins (
869 id UInt32,
870 name String,
871 version UInt32
872 ) ENGINE = ReplacingMergeTree
873 ORDER BY id"#,
874 )
875 .execute()
876 .await?;
877
878 db.query(
879 r#"CREATE TABLE IF NOT EXISTS jetstreamer_plugin_slots (
880 plugin_id UInt32,
881 slot UInt64,
882 indexed_at DateTime('UTC') DEFAULT now()
883 ) ENGINE = ReplacingMergeTree
884 ORDER BY (plugin_id, slot)"#,
885 )
886 .execute()
887 .await?;
888
889 Ok(())
890}
891
892async fn upsert_plugins(
893 db: &Client,
894 plugins: &[PluginHandle],
895) -> Result<(), clickhouse::error::Error> {
896 if plugins.is_empty() {
897 return Ok(());
898 }
899 let mut insert = db.insert("jetstreamer_plugins")?;
900 for handle in plugins {
901 insert
902 .write(&PluginRow {
903 id: handle.id as u32,
904 name: handle.name,
905 version: handle.version as u32,
906 })
907 .await?;
908 }
909 insert.end().await?;
910 Ok(())
911}
912
913async fn record_plugin_slot(
914 db: Arc<Client>,
915 plugin_id: u16,
916 slot: u64,
917) -> Result<(), clickhouse::error::Error> {
918 let mut insert = db.insert("jetstreamer_plugin_slots")?;
919 insert
920 .write(&PluginSlotRow {
921 plugin_id: plugin_id as u32,
922 slot,
923 })
924 .await?;
925 insert.end().await?;
926 Ok(())
927}
928
929async fn flush_slot_buffer(
930 db: Arc<Client>,
931 buffer: Arc<DashMap<u16, Vec<PluginSlotRow>>>,
932) -> Result<(), clickhouse::error::Error> {
933 let mut rows = Vec::new();
934 buffer.iter_mut().for_each(|mut entry| {
935 if !entry.value().is_empty() {
936 rows.append(entry.value_mut());
937 }
938 });
939
940 if rows.is_empty() {
941 return Ok(());
942 }
943
944 let mut insert = db.insert("jetstreamer_plugin_slots")?;
945 for row in rows {
946 insert.write(&row).await?;
947 }
948 insert.end().await?;
949 Ok(())
950}
951
952async fn record_slot_status(
953 db: Arc<Client>,
954 slot: u64,
955 thread_id: usize,
956 transaction_count: u64,
957) -> Result<(), clickhouse::error::Error> {
958 let mut insert = db.insert("jetstreamer_slot_status")?;
959 insert
960 .write(&SlotStatusRow {
961 slot,
962 transaction_count: transaction_count.min(u32::MAX as u64) as u32,
963 thread_id: thread_id.try_into().unwrap_or(u8::MAX),
964 })
965 .await?;
966 insert.end().await?;
967 Ok(())
968}
969
// Compile-time check: the impl below only type-checks while
// `PluginRunnerError` is `Send + Sync + 'static`.
trait _CanSend: Send + Sync + 'static {}
impl _CanSend for PluginRunnerError {}
973
/// Formats an unsigned integer with `,` between groups of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
#[inline]
fn human_readable_count(value: impl Into<u128>) -> String {
    let plain = value.into().to_string();
    let width = plain.len();

    // The leading group holds 1–3 digits so the rest splits evenly into threes.
    let lead = match width % 3 {
        0 => 3,
        partial => partial,
    };
    let (head, tail) = plain.split_at(lead);

    let mut grouped = String::with_capacity(width + width / 3);
    grouped.push_str(head);
    for triple in tail.as_bytes().chunks(3) {
        grouped.push(',');
        grouped.extend(triple.iter().copied().map(char::from));
    }
    grouped
}
987
/// Formats a duration given in seconds as a short human-readable string.
///
/// * non-finite input → `"n/a"`
/// * zero or negative → `"0s"`
/// * under a minute → seconds with one decimal, e.g. `"12.3s"`
/// * otherwise the two most significant units after rounding to whole
///   seconds: `"{m}m{s}s"`, `"{h}h{m}m"`, or `"{d}d{h}h"` (just `"{d}d"` when
///   the hour part is zero)
fn human_readable_duration(seconds: f64) -> String {
    if !seconds.is_finite() {
        return "n/a".into();
    }
    if seconds <= 0.0 {
        return "0s".into();
    }
    // Values in [59.95, 60) would print as "60.0s" once rounded to one
    // decimal, so they are left to roll over into the minutes format below.
    if seconds < 59.95 {
        return format!("{:.1}s", seconds);
    }
    let duration = Duration::from_secs(seconds.round() as u64);
    let secs = duration.as_secs();
    let days = secs / 86_400;
    let hours = (secs % 86_400) / 3_600;
    let minutes = (secs % 3_600) / 60;
    let seconds_rem = secs % 60;
    if days > 0 {
        if hours > 0 {
            format!("{}d{}h", days, hours)
        } else {
            format!("{}d", days)
        }
    } else if hours > 0 {
        format!("{}h{}m", hours, minutes)
    } else {
        format!("{}m{}s", minutes, seconds_rem)
    }
}