llkv_table/
diagnostics.rs

1//! Table-level pager diagnostics built on top of pager statistics.
2//!
3//! - [`llkv_storage::pager::PagerDiagnostics`] captures raw [`IoStatsSnapshot`] values.
4//! - This module wraps that handle so table loaders can bracket ingest work with
5//!   [`TablePagerIngestionDiagnostics::begin_table`] / [`TablePagerIngestionDiagnostics::finish_table`] and receive
6//!   a [`TablePagerIngestionSample`] (table-scoped snapshot delta).
7//! - Consumers format `TablePagerIngestionSample` for reporting, while [`TablePagerIngestionDiagnostics`]
8//!   keeps the mutex bookkeeping out of the application layer.
9
10use llkv_storage::pager::{IoStatsSnapshot, PagerDiagnostics};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
15/// Captures pager I/O metrics for a single table ingest span.
16#[derive(Clone, Debug)]
17pub struct TablePagerIngestionSample {
18    /// Logical table name for the recorded span.
19    pub table: String,
20    /// Rows written while the table ingest was active.
21    pub rows: usize,
22    /// Wall-clock duration of the ingest span.
23    pub elapsed: Duration,
24    /// Pager I/O deltas collected during the ingest span.
25    pub delta: IoStatsSnapshot,
26}
27
28impl TablePagerIngestionSample {
29    /// Overwrite percentage (0-100) derived from [`IoStatsSnapshot::overwritten_put_bytes`].
30    pub fn overwrite_pct(&self) -> f64 {
31        self.delta.overwrite_pct()
32    }
33
34    /// Average physical put operations per batch for this table ingest.
35    pub fn puts_per_batch(&self) -> f64 {
36        self.delta.puts_per_batch()
37    }
38
39    /// Average physical get operations per batch for this table ingest.
40    pub fn gets_per_batch(&self) -> f64 {
41        self.delta.gets_per_batch()
42    }
43
44    /// Fresh bytes written during this ingest converted to mebibytes.
45    pub fn fresh_mib(&self) -> f64 {
46        self.delta.fresh_mib()
47    }
48
49    /// Overwrite bytes written during this ingest converted to mebibytes.
50    pub fn overwrite_mib(&self) -> f64 {
51        self.delta.overwrite_mib()
52    }
53}
54
55/// Tracks per-table pager spans over the course of a loader run.
56#[derive(Debug)]
57pub struct TablePagerIngestionDiagnostics {
58    pager: Arc<PagerDiagnostics>,
59    table_starts: Mutex<HashMap<String, IoStatsSnapshot>>,
60    completed: Mutex<Vec<TablePagerIngestionSample>>,
61}
62
63impl TablePagerIngestionDiagnostics {
64    /// Create diagnostics bound to the provided pager statistics helper.
65    pub fn new(pager: Arc<PagerDiagnostics>) -> Self {
66        Self {
67            pager,
68            table_starts: Mutex::new(HashMap::new()),
69            completed: Mutex::new(Vec::new()),
70        }
71    }
72
73    /// Record the start snapshot for a table ingest span.
74    pub fn begin_table(&self, table: impl Into<String>) {
75        let mut starts = self.table_starts.lock().unwrap();
76        starts.insert(table.into(), self.pager.snapshot());
77    }
78
79    /// Complete a table ingest span and store the resulting diagnostics entry.
80    pub fn finish_table(
81        &self,
82        table: &str,
83        rows: usize,
84        elapsed: Duration,
85    ) -> TablePagerIngestionSample {
86        let start_snapshot = {
87            let mut starts = self.table_starts.lock().unwrap();
88            starts
89                .remove(table)
90                .unwrap_or_else(|| self.pager.snapshot())
91        };
92        let delta = self.pager.delta_since(&start_snapshot);
93        let entry = TablePagerIngestionSample {
94            table: table.to_string(),
95            rows,
96            elapsed,
97            delta,
98        };
99        self.completed.lock().unwrap().push(entry.clone());
100        entry
101    }
102
103    /// Inspect the completed per-table diagnostics.
104    pub fn completed_tables(&self) -> Vec<TablePagerIngestionSample> {
105        self.completed.lock().unwrap().clone()
106    }
107
108    /// Return the cumulative pager totals captured during the run.
109    pub fn totals(&self) -> IoStatsSnapshot {
110        self.pager.totals()
111    }
112}