use crate::environment::build_environment_stats;
use noxu_dbi::EnvironmentImpl;
use noxu_log::LogManager;
use noxu_sync::Mutex;
use noxu_util::daemon::DaemonThread;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
const STATS_FILE_STEM: &str = "noxu.stat";
const STATS_FILE_EXT: &str = "csv";
const CSV_HEADER: &str = "time_ms,cache_size,cache_usage,n_databases,\
n_log_fsyncs,n_fsync_requests,n_group_commits,\
lock_n_requests,lock_n_waits,lock_n_total_locks,\
txn_n_begins,txn_n_commits,txn_n_aborts,txn_n_active\n";
pub struct StatsFileDumper {
daemon: DaemonThread,
}
impl StatsFileDumper {
pub fn start(
env_impl: Arc<Mutex<EnvironmentImpl>>,
log_manager: Option<Arc<LogManager>>,
cache_size: u64,
dir: PathBuf,
interval: Duration,
row_count: u32,
max_files: u32,
) -> Self {
let writer = Arc::new(Mutex::new(RotatingWriter::new(
dir,
row_count.max(1),
max_files.max(1),
)));
writer.lock().seed_next_index();
let seq = Arc::new(AtomicU64::new(0));
let daemon =
DaemonThread::spawn("noxu-stats-file", interval, move || {
let stats = {
let guard = match env_impl.try_lock() {
Some(g) => g,
None => return true,
};
build_environment_stats(
&guard,
log_manager.as_deref(),
cache_size,
)
};
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let _ = seq.fetch_add(1, Ordering::Relaxed);
let row = format_row(now_ms, &stats);
if let Err(e) = writer.lock().write_row(&row) {
log::warn!("stats-file dump failed: {e}");
}
true
});
StatsFileDumper { daemon }
}
pub fn stop(self) {
self.daemon.shutdown();
}
}
fn format_row(now_ms: u64, s: &noxu_engine::EnvironmentStats) -> String {
format!(
"{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n",
now_ms,
s.cache_size,
s.cache_usage,
s.n_databases,
s.log.n_log_fsyncs,
s.log.n_fsync_requests,
s.log.n_group_commits,
s.lock.n_requests,
s.lock.n_waits,
s.lock.n_total_locks,
s.txn.n_begins,
s.txn.n_commits,
s.txn.n_aborts,
s.txn.n_active,
)
}
struct RotatingWriter {
dir: PathBuf,
row_count: u32,
max_files: u32,
file_index: u64,
rows_in_current: u32,
current: Option<File>,
}
impl RotatingWriter {
fn new(dir: PathBuf, row_count: u32, max_files: u32) -> Self {
RotatingWriter {
dir,
row_count,
max_files,
file_index: 0,
rows_in_current: 0,
current: None,
}
}
fn path_for(&self, index: u64) -> PathBuf {
self.dir.join(format!("{STATS_FILE_STEM}.{index}.{STATS_FILE_EXT}"))
}
fn seed_next_index(&mut self) {
let mut max_seen: Option<u64> = None;
if let Ok(entries) = fs::read_dir(&self.dir) {
for e in entries.flatten() {
if let Some(idx) = parse_stats_index(&e.path()) {
max_seen = Some(max_seen.map_or(idx, |m| m.max(idx)));
}
}
}
if let Some(m) = max_seen {
self.file_index = m + 1;
}
}
fn open_current(&mut self) -> std::io::Result<()> {
fs::create_dir_all(&self.dir)?;
let path = self.path_for(self.file_index);
let mut f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)?;
f.write_all(CSV_HEADER.as_bytes())?;
self.current = Some(f);
self.rows_in_current = 0;
Ok(())
}
fn write_row(&mut self, row: &str) -> std::io::Result<()> {
if self.current.is_none() {
self.open_current()?;
} else if self.rows_in_current >= self.row_count {
self.file_index += 1;
self.open_current()?;
self.prune_old_files();
}
if let Some(f) = self.current.as_mut() {
f.write_all(row.as_bytes())?;
f.flush()?;
self.rows_in_current += 1;
}
Ok(())
}
fn prune_old_files(&self) {
let mut indices: Vec<u64> = Vec::new();
if let Ok(entries) = fs::read_dir(&self.dir) {
for e in entries.flatten() {
if let Some(idx) = parse_stats_index(&e.path()) {
indices.push(idx);
}
}
}
indices.sort_unstable();
let keep = self.max_files as usize;
if indices.len() > keep {
for idx in &indices[..indices.len() - keep] {
let _ = fs::remove_file(self.path_for(*idx));
}
}
}
}
fn parse_stats_index(path: &Path) -> Option<u64> {
let name = path.file_name()?.to_str()?;
let rest = name.strip_prefix(&format!("{STATS_FILE_STEM}."))?;
let idx_str = rest.strip_suffix(&format!(".{STATS_FILE_EXT}"))?;
idx_str.parse::<u64>().ok()
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn parse_index_roundtrip() {
let p = PathBuf::from("/x/noxu.stat.7.csv");
assert_eq!(parse_stats_index(&p), Some(7));
assert_eq!(parse_stats_index(&PathBuf::from("/x/other.csv")), None);
assert_eq!(
parse_stats_index(&PathBuf::from("/x/noxu.stat.notanum.csv")),
None
);
}
#[test]
fn rotates_after_row_count_and_prunes() {
let tmp = TempDir::new().unwrap();
let mut w = RotatingWriter::new(tmp.path().to_path_buf(), 2, 2);
for i in 0..6 {
w.write_row(&format!("{i},0,0,0,0,0,0,0,0,0,0,0,0,0\n")).unwrap();
}
let mut files: Vec<u64> = fs::read_dir(tmp.path())
.unwrap()
.flatten()
.filter_map(|e| parse_stats_index(&e.path()))
.collect();
files.sort_unstable();
assert_eq!(files, vec![1, 2], "should retain only the newest 2 files");
let last =
fs::read_to_string(tmp.path().join("noxu.stat.2.csv")).unwrap();
assert!(last.starts_with("time_ms,"), "header present");
assert_eq!(
last.lines().count(),
1 + 2, "last file: header + 2 rows"
);
}
#[test]
fn seed_next_index_resumes_past_existing() {
let tmp = TempDir::new().unwrap();
fs::write(tmp.path().join("noxu.stat.4.csv"), b"x").unwrap();
let mut w = RotatingWriter::new(tmp.path().to_path_buf(), 10, 10);
w.seed_next_index();
assert_eq!(w.file_index, 5, "resume past highest existing index");
}
}