// llkv_table/diagnostics.rs
use llkv_storage::pager::{IoStatsSnapshot, PagerDiagnostics};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

15#[derive(Clone, Debug)]
17pub struct TablePagerIngestionSample {
18 pub table: String,
20 pub rows: usize,
22 pub elapsed: Duration,
24 pub delta: IoStatsSnapshot,
26}
27
28impl TablePagerIngestionSample {
29 pub fn overwrite_pct(&self) -> f64 {
31 self.delta.overwrite_pct()
32 }
33
34 pub fn puts_per_batch(&self) -> f64 {
36 self.delta.puts_per_batch()
37 }
38
39 pub fn gets_per_batch(&self) -> f64 {
41 self.delta.gets_per_batch()
42 }
43
44 pub fn fresh_mib(&self) -> f64 {
46 self.delta.fresh_mib()
47 }
48
49 pub fn overwrite_mib(&self) -> f64 {
51 self.delta.overwrite_mib()
52 }
53}
54
55#[derive(Debug)]
57pub struct TablePagerIngestionDiagnostics {
58 pager: Arc<PagerDiagnostics>,
59 table_starts: Mutex<HashMap<String, IoStatsSnapshot>>,
60 completed: Mutex<Vec<TablePagerIngestionSample>>,
61}
62
63impl TablePagerIngestionDiagnostics {
64 pub fn new(pager: Arc<PagerDiagnostics>) -> Self {
66 Self {
67 pager,
68 table_starts: Mutex::new(HashMap::new()),
69 completed: Mutex::new(Vec::new()),
70 }
71 }
72
73 pub fn begin_table(&self, table: impl Into<String>) {
75 let mut starts = self.table_starts.lock().unwrap();
76 starts.insert(table.into(), self.pager.snapshot());
77 }
78
79 pub fn finish_table(
81 &self,
82 table: &str,
83 rows: usize,
84 elapsed: Duration,
85 ) -> TablePagerIngestionSample {
86 let start_snapshot = {
87 let mut starts = self.table_starts.lock().unwrap();
88 starts
89 .remove(table)
90 .unwrap_or_else(|| self.pager.snapshot())
91 };
92 let delta = self.pager.delta_since(&start_snapshot);
93 let entry = TablePagerIngestionSample {
94 table: table.to_string(),
95 rows,
96 elapsed,
97 delta,
98 };
99 self.completed.lock().unwrap().push(entry.clone());
100 entry
101 }
102
103 pub fn completed_tables(&self) -> Vec<TablePagerIngestionSample> {
105 self.completed.lock().unwrap().clone()
106 }
107
108 pub fn totals(&self) -> IoStatsSnapshot {
110 self.pager.totals()
111 }
112}