use std::fs;
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use spg_wire::{build_command_complete, build_error_response};
use crate::wire::write_frame;
use crate::{ServerState, backup, parse_env_u64, write_atomic, write_manifest_alongside};
/// Renders a plain-text status report for the server as `key=value` lines.
///
/// The lines are emitted in this order: `spg_version`, `tables`,
/// `in_transaction`, `audit_entries`, `db_path`, `audit_path`, `wal_path`.
/// An unset `db_path` is rendered as `<in-memory>`; an unset `audit_path` or
/// `wal_path` is rendered as `<disabled>`.
///
/// # Errors
///
/// Returns an `io::Error` when the engine `RwLock` or the audit-log `Mutex`
/// is poisoned.
pub(crate) fn render_stats(state: &ServerState) -> std::io::Result<String> {
    use std::fmt::Write as _;

    // `fmt::Write` for `String` only appends to the buffer, so these writes
    // cannot return an error.
    const INFALLIBLE: &str = "formatting into a String cannot fail";

    // Lock order is engine first, then audit log; both guards are held until
    // the report has been fully rendered.
    let engine = state
        .engine
        .read()
        .map_err(|_| std::io::Error::other("engine rwlock poisoned"))?;
    let audit = state
        .audit_log
        .lock()
        .map_err(|_| std::io::Error::other("audit mutex poisoned"))?;
    let catalog = engine.catalog();

    // The fallback strings are built lazily so nothing is allocated for them
    // when the path is configured.
    let db_path = state
        .db_path
        .as_deref()
        .map_or_else(|| "<in-memory>".to_string(), |p| p.display().to_string());
    let audit_path = state
        .audit_path
        .as_deref()
        .map_or_else(|| "<disabled>".to_string(), |p| p.display().to_string());
    let wal_path = state
        .wal_path
        .as_deref()
        .map_or_else(|| "<disabled>".to_string(), |p| p.display().to_string());

    let mut out = String::new();
    writeln!(out, "spg_version={}", env!("CARGO_PKG_VERSION")).expect(INFALLIBLE);
    writeln!(out, "tables={}", catalog.table_count()).expect(INFALLIBLE);
    writeln!(out, "in_transaction={}", engine.in_transaction()).expect(INFALLIBLE);
    writeln!(out, "audit_entries={}", audit.len()).expect(INFALLIBLE);
    writeln!(out, "db_path={db_path}").expect(INFALLIBLE);
    writeln!(out, "audit_path={audit_path}").expect(INFALLIBLE);
    writeln!(out, "wal_path={wal_path}").expect(INFALLIBLE);
    Ok(out)
}
/// Parses the server-level `BACKUP TO '<path>' [INCREMENTAL SINCE <n>]`
/// statement.
///
/// Keywords are matched ASCII case-insensitively and may be separated by any
/// whitespace (spaces, tabs, newlines), matching how the other admin-command
/// parsers in this file tokenize their input. Surrounding whitespace and a
/// trailing run of `;` are ignored. The path is the text between the opening
/// quote and the next single quote, returned verbatim (case preserved, no
/// `''` unescaping).
///
/// Returns `None` when `sql` is not a well-formed backup statement: missing
/// keyword, missing or unterminated quoted path, unexpected trailing text, or
/// a `SINCE` value that does not parse as `u64`.
pub(crate) fn parse_backup_intent(sql: &str) -> Option<BackupIntent> {
    /// Strips `keyword` (ASCII case-insensitive) plus the whitespace after it.
    /// At least one whitespace character must follow the keyword.
    fn strip_keyword<'a>(input: &'a str, keyword: &str) -> Option<&'a str> {
        // `get` returns `None` on short input and on non-char-boundary cuts,
        // so multi-byte input can never cause a slicing panic here.
        let head = input.get(..keyword.len())?;
        if !head.eq_ignore_ascii_case(keyword) {
            return None;
        }
        let rest = &input[keyword.len()..];
        let trimmed = rest.trim_start();
        (trimmed.len() < rest.len()).then_some(trimmed)
    }

    let statement = sql.trim().trim_end_matches(';').trim_end();
    let after_to = strip_keyword(strip_keyword(statement, "backup")?, "to")?;
    let after_open = after_to.strip_prefix('\'')?;
    let (path, tail) = after_open.split_once('\'')?;
    let tail = tail.trim();
    if tail.is_empty() {
        return Some(BackupIntent::Full {
            path: path.to_string(),
        });
    }
    let since_str = strip_keyword(strip_keyword(tail, "incremental")?, "since")?;
    let since: u64 = since_str.parse().ok()?;
    Some(BackupIntent::Incremental {
        path: path.to_string(),
        since,
    })
}

/// A parsed `BACKUP TO ...` request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BackupIntent {
    /// `BACKUP TO '<path>'`.
    Full { path: String },
    /// `BACKUP TO '<path>' INCREMENTAL SINCE <since>`.
    Incremental { path: String, since: u64 },
}
/// Reports whether `sql` is the bare `CHECKPOINT` command.
///
/// Surrounding whitespace and a trailing run of `;` are ignored; the keyword
/// itself is compared ASCII case-insensitively.
pub(crate) fn parse_checkpoint_intent(sql: &str) -> bool {
    let without_terminator = sql.trim().trim_end_matches(';');
    // The front is already trimmed; only whitespace that sat before the `;`
    // run can remain, so trimming the end is sufficient.
    "checkpoint".eq_ignore_ascii_case(without_terminator.trim_end())
}
/// Handles the `CHECKPOINT` command and writes the reply frame to `stream`.
///
/// Steps, in order: take an engine snapshot, write it to `db_path` with
/// `write_atomic`, call `write_manifest_alongside`, then (when a WAL is
/// configured) truncate the WAL to zero length and sync it.
///
/// Command-level failures (no `db_path`, an open transaction, a failed
/// snapshot write, a failed WAL truncate or sync) are reported to the client
/// as an error-response frame; in those cases the return value is the result
/// of writing that frame. On success a command-complete frame with `0` is sent.
///
/// # Errors
///
/// Returns an `io::Error` when the engine lock or the WAL mutex is poisoned,
/// or when writing the reply frame fails.
pub(crate) fn run_checkpoint_command(
stream: &mut TcpStream,
state: &ServerState,
) -> std::io::Result<()> {
// Without a database file there is nowhere to write the snapshot.
let Some(db_path) = state.db_path.as_deref() else {
return write_frame(
stream,
&build_error_response("CHECKPOINT requires a db_path (server started without one)"),
);
};
// The snapshot is taken under the engine's exclusive lock; the guard is
// released before any file I/O below.
let snapshot_bytes = {
let engine = state
.engine
.write()
.map_err(|_| std::io::Error::other("engine rwlock poisoned"))?;
if engine.in_transaction() {
return write_frame(
stream,
&build_error_response("CHECKPOINT refused: an open transaction is in flight"),
);
}
let bytes = engine.snapshot();
drop(engine);
bytes
};
// NOTE(review): the engine lock is no longer held from here on. If another
// connection can commit and append to the WAL before the truncate below,
// those records would be discarded without being part of this snapshot.
// Confirm that callers serialize against this window.
if let Err(e) = write_atomic(db_path, &snapshot_bytes) {
return write_frame(
stream,
&build_error_response(&format!("CHECKPOINT snapshot write failed: {e}")),
);
}
// A poisoned `cold_segment_paths` mutex does not fail the checkpoint; the
// default value of the collection is used instead.
let cold_paths = state
.cold_segment_paths
.lock()
.map(|g| g.clone())
.unwrap_or_default();
// The meaning of the trailing `0` is defined by `write_manifest_alongside`,
// which is not visible in this file section.
write_manifest_alongside(db_path, &snapshot_bytes, &cold_paths, 0);
// The WAL is only touched after the snapshot write has succeeded.
if let Some(wal_mutex) = state.wal.as_ref() {
let wal_lock = wal_mutex
.lock()
.map_err(|_| std::io::Error::other("WAL mutex poisoned"))?;
if let Err(e) = wal_lock.set_len(0) {
return write_frame(
stream,
&build_error_response(&format!("CHECKPOINT WAL truncate failed: {e}")),
);
}
if let Err(e) = wal_lock.sync_data() {
return write_frame(
stream,
&build_error_response(&format!("CHECKPOINT WAL sync failed: {e}")),
);
}
drop(wal_lock);
}
write_frame(stream, &build_command_complete(0))
}
/// Reports whether `sql` is exactly the `COMPACT COLD SEGMENTS` command.
///
/// The three keywords are compared ASCII case-insensitively and may be
/// separated by any amount of whitespace. Surrounding whitespace and a
/// trailing run of `;` are ignored; any extra word makes the match fail.
pub(crate) fn parse_compact_cold_segments_intent(sql: &str) -> bool {
    const KEYWORDS: [&str; 3] = ["compact", "cold", "segments"];

    // `split_whitespace` already skips leading and trailing whitespace, so no
    // further trimming is needed once the `;` run has been removed.
    let mut words = sql.trim().trim_end_matches(';').split_whitespace();
    let keywords_match = KEYWORDS.iter().all(|keyword| {
        words
            .next()
            .is_some_and(|word| word.eq_ignore_ascii_case(keyword))
    });
    keywords_match && words.next().is_none()
}
/// Returns the target segment size, in bytes, used for cold-segment compaction.
///
/// The value is resolved once per process: it comes from
/// `parse_env_u64("SPG_COMPACTION_TARGET_SEGMENT_BYTES")` when that yields a
/// value, otherwise from `spg_engine::COMPACTION_TARGET_DEFAULT_BYTES`. Later
/// calls return the cached result.
pub(crate) fn compaction_target_bytes() -> u64 {
    use std::sync::OnceLock;

    /// Performs the one-time lookup backing the cached value.
    fn resolve() -> u64 {
        parse_env_u64("SPG_COMPACTION_TARGET_SEGMENT_BYTES")
            .unwrap_or(spg_engine::COMPACTION_TARGET_DEFAULT_BYTES)
    }

    static TARGET: OnceLock<u64> = OnceLock::new();
    *TARGET.get_or_init(resolve)
}
/// Handles the `COMPACT COLD SEGMENTS` command and writes the reply frame to
/// `stream`.
///
/// Compaction runs under the engine's write lock with the size returned by
/// `compaction_target_bytes()`. When the server has a `db_path`, every report
/// that carries a merged segment id is persisted with
/// `persist_compact_merged_segment`, the `cold_segment_paths` map is updated
/// (sources removed, merged segment inserted), and the `cold_segments` metric
/// is refreshed from the catalog.
///
/// A refused or failed compaction is reported to the client as an
/// error-response frame. A failure to persist an individual merged segment is
/// only logged to stderr; the reply still carries the number of reports.
///
/// # Errors
///
/// Returns an `io::Error` when the engine lock is poisoned or when writing the
/// reply frame fails.
pub(crate) fn run_compact_cold_segments_command(
    stream: &mut TcpStream,
    state: &ServerState,
) -> std::io::Result<()> {
    let target_bytes = compaction_target_bytes();

    // The write guard lives only for this block; persistence below runs
    // without the engine lock held.
    let reports = {
        let mut guard = state
            .engine
            .write()
            .map_err(|_| std::io::Error::other("engine rwlock poisoned"))?;
        if guard.in_transaction() {
            let refusal = build_error_response(
                "COMPACT COLD SEGMENTS refused: an open transaction is in flight",
            );
            return write_frame(stream, &refusal);
        }
        match guard.compact_cold_segments_with_target(target_bytes) {
            Ok(done) => done,
            Err(e) => {
                let failure =
                    build_error_response(&format!("COMPACT COLD SEGMENTS failed: {e:?}"));
                return write_frame(stream, &failure);
            }
        }
    };

    let merged_count = reports.len();

    if let Some(db_path) = state.db_path.as_deref() {
        for (_table, _index, report) in &reports {
            // Reports without a merged segment id have nothing to persist.
            if let Some(merged_id) = report.merged_segment_id {
                let persisted = persist_compact_merged_segment(
                    db_path,
                    merged_id,
                    &report.merged_segment_bytes,
                );
                match persisted {
                    Err(e) => {
                        eprintln!(
                            "spg-server: COMPACT persist of merged segment {merged_id} failed: {e}"
                        );
                    }
                    Ok(merged_path) => {
                        // A poisoned path map is skipped rather than treated
                        // as a command failure.
                        if let Ok(mut paths) = state.cold_segment_paths.lock() {
                            for src in &report.sources {
                                paths.remove(src);
                            }
                            paths.insert(merged_id, merged_path);
                        }
                    }
                }
            }
        }

        // A poisoned engine lock reports zero segments instead of failing.
        let live_segments = match state.engine.read() {
            Ok(engine) => engine.catalog().cold_segment_count() as u64,
            Err(_) => 0,
        };
        state
            .metrics
            .cold_segments
            .store(live_segments, std::sync::atomic::Ordering::Relaxed);
    }

    write_frame(stream, &build_command_complete(merged_count as u64))
}
/// Writes a compacted (merged) cold segment to disk and returns its path.
///
/// The segment is stored at `<parent>/<stem>.spg/segments/seg_<merged_id>.spg`,
/// where `<parent>` and `<stem>` are taken from `db_path` (falling back to `.`
/// and `db` when `db_path` has no parent or file stem). The directory is
/// created if needed.
///
/// Unless the `SPG_SEGMENT_COMPRESSION` environment variable is set to `none`
/// (ASCII case-insensitive), the bytes are passed through
/// `spg_storage::wrap_v2_envelope(.., true)` before being written; otherwise
/// they are written as given.
///
/// The payload is written to a `.tmp` sibling, flushed to stable storage with
/// `sync_all`, and only then renamed over the final name, so a crash cannot
/// leave a partially written file under the final name. If the write or the
/// rename fails, the temporary file is removed on a best-effort basis.
///
/// # Errors
///
/// Returns the underlying `io::Error` when creating the directory, writing or
/// syncing the temporary file, or renaming it fails.
pub(crate) fn persist_compact_merged_segment(
    db_path: &Path,
    merged_id: u32,
    merged_segment_bytes: &[u8],
) -> std::io::Result<PathBuf> {
    use std::borrow::Cow;
    use std::io::Write as _;

    let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
    let stem = db_path
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("db"))
        .to_string_lossy();
    let seg_dir = parent.join(format!("{stem}.spg")).join("segments");
    fs::create_dir_all(&seg_dir)?;
    let final_path = seg_dir.join(format!("seg_{merged_id}.spg"));
    let tmp_path = seg_dir.join(format!("seg_{merged_id}.spg.tmp"));

    // Compression is on by default; only an explicit `none` disables it.
    let compress = std::env::var("SPG_SEGMENT_COMPRESSION")
        .map_or(true, |v| !v.eq_ignore_ascii_case("none"));
    // Borrow the input in the uncompressed case instead of copying it.
    let payload: Cow<'_, [u8]> = if compress {
        Cow::Owned(spg_storage::wrap_v2_envelope(
            merged_segment_bytes.to_vec(),
            true,
        ))
    } else {
        Cow::Borrowed(merged_segment_bytes)
    };

    let publish = || -> std::io::Result<()> {
        let mut tmp_file = fs::File::create(&tmp_path)?;
        tmp_file.write_all(&payload)?;
        // Make the contents durable before the rename exposes the file.
        tmp_file.sync_all()?;
        drop(tmp_file);
        fs::rename(&tmp_path, &final_path)
    };
    if let Err(e) = publish() {
        // Best effort: do not leave a stale temp file behind; the original
        // error is the one worth reporting.
        let _ = fs::remove_file(&tmp_path);
        return Err(e);
    }
    Ok(final_path)
}
/// Executes a parsed `BACKUP` request and writes the reply frame to `stream`.
///
/// `BackupIntent::Full` is dispatched to `backup::take_full_backup` and
/// `BackupIntent::Incremental` to `backup::take_incremental_backup`. On
/// success the value returned by the backup routine is sent in a
/// command-complete frame; on failure an error-response frame with the
/// message `backup failed: <error>` is sent.
///
/// # Errors
///
/// Returns an `io::Error` only when writing the reply frame fails.
pub(crate) fn run_backup_command(
    stream: &mut TcpStream,
    state: &ServerState,
    intent: &BackupIntent,
) -> std::io::Result<()> {
    let outcome = match intent {
        BackupIntent::Incremental { path, since } => {
            backup::take_incremental_backup(state, Path::new(path), *since)
        }
        BackupIntent::Full { path } => backup::take_full_backup(state, Path::new(path)),
    };

    // Failure path first: report the error to the client and stop.
    let wal_pos = match outcome {
        Ok(position) => position,
        Err(e) => {
            let failure = build_error_response(&format!("backup failed: {e}"));
            return write_frame(stream, &failure);
        }
    };
    write_frame(stream, &build_command_complete(wal_pos))
}