use llkv_storage::pager::{IoStatsSnapshot, PagerDiagnostics};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct TablePagerIngestionSample {
pub table: String,
pub rows: usize,
pub elapsed: Duration,
pub delta: IoStatsSnapshot,
}
impl TablePagerIngestionSample {
pub fn overwrite_pct(&self) -> f64 {
self.delta.overwrite_pct()
}
pub fn puts_per_batch(&self) -> f64 {
self.delta.puts_per_batch()
}
pub fn gets_per_batch(&self) -> f64 {
self.delta.gets_per_batch()
}
pub fn fresh_mib(&self) -> f64 {
self.delta.fresh_mib()
}
pub fn overwrite_mib(&self) -> f64 {
self.delta.overwrite_mib()
}
}
#[derive(Debug)]
pub struct TablePagerIngestionDiagnostics {
pager: Arc<PagerDiagnostics>,
table_starts: Mutex<HashMap<String, IoStatsSnapshot>>,
completed: Mutex<Vec<TablePagerIngestionSample>>,
}
impl TablePagerIngestionDiagnostics {
pub fn new(pager: Arc<PagerDiagnostics>) -> Self {
Self {
pager,
table_starts: Mutex::new(HashMap::new()),
completed: Mutex::new(Vec::new()),
}
}
pub fn begin_table(&self, table: impl Into<String>) {
let mut starts = self.table_starts.lock().unwrap();
starts.insert(table.into(), self.pager.snapshot());
}
pub fn finish_table(
&self,
table: &str,
rows: usize,
elapsed: Duration,
) -> TablePagerIngestionSample {
let start_snapshot = {
let mut starts = self.table_starts.lock().unwrap();
starts
.remove(table)
.unwrap_or_else(|| self.pager.snapshot())
};
let delta = self.pager.delta_since(&start_snapshot);
let entry = TablePagerIngestionSample {
table: table.to_string(),
rows,
elapsed,
delta,
};
self.completed.lock().unwrap().push(entry.clone());
entry
}
pub fn completed_tables(&self) -> Vec<TablePagerIngestionSample> {
self.completed.lock().unwrap().clone()
}
pub fn totals(&self) -> IoStatsSnapshot {
self.pager.totals()
}
}