use std::env;
use std::ffi::OsStr;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use spg_storage::{Catalog, DataType, FreezeReport, FreezeSlice, IndexKind};
use crate::{SHUTDOWN_FLAG, ServerState};
/// Tick interval in milliseconds used when `SPG_FREEZER_TICK_MS` is unset,
/// unparsable, or below [`MIN_TICK_MS`].
const DEFAULT_TICK_MS: u64 = 1000;
/// Per-tick row batch size used when `SPG_FREEZER_BATCH_ROWS` is unset,
/// unparsable, or zero.
const DEFAULT_BATCH_ROWS: usize = 1_000;
/// Smallest tick interval (milliseconds) accepted from the environment;
/// smaller values fall back to [`DEFAULT_TICK_MS`] rather than being clamped.
const MIN_TICK_MS: u64 = 10;
/// Runtime settings for the background freezer thread.
///
/// Built from environment variables by `FreezerConfig::from_env`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct FreezerConfig {
/// How long the freezer thread waits between freeze attempts.
pub(crate) tick: Duration,
/// Upper bound on the number of rows frozen in a single tick.
pub(crate) batch_rows: usize,
/// When `true`, `spawn` does not start the freezer thread at all.
pub(crate) disabled: bool,
/// Requested number of worker threads for preparing freeze slices.
pub(crate) workers: usize,
}
impl FreezerConfig {
pub(crate) fn from_env() -> Self {
let disabled = env::var("SPG_FREEZER_DISABLE")
.ok()
.is_some_and(|s| !s.is_empty() && s != "0");
let tick_ms = env::var("SPG_FREEZER_TICK_MS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.filter(|&n| n >= MIN_TICK_MS)
.unwrap_or(DEFAULT_TICK_MS);
let batch_rows = env::var("SPG_FREEZER_BATCH_ROWS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(DEFAULT_BATCH_ROWS);
let workers = env::var("SPG_FREEZER_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or_else(default_worker_count);
Self {
tick: Duration::from_millis(tick_ms),
batch_rows,
disabled,
workers,
}
}
}
/// Worker count used when `SPG_FREEZER_WORKERS` gives no usable value.
///
/// Takes the reported available parallelism (assumed to be 2 when it cannot be
/// determined), leaves two units for the rest of the process, and keeps the
/// result within `1..=16`.
fn default_worker_count() -> usize {
    let cores = match thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 2,
    };
    cores.saturating_sub(2).clamp(1, 16)
}
/// Starts the background freezer thread, unless it is disabled via the environment.
///
/// The configuration is read once, here, through `FreezerConfig::from_env`.
///
/// # Returns
/// `Some(handle)` for the thread named `spg-freezer`, or `None` when the
/// configuration says the freezer is disabled.
///
/// # Panics
/// Panics if the operating system refuses to create the thread.
pub(crate) fn spawn(state: Arc<ServerState>) -> Option<JoinHandle<()>> {
    let config = FreezerConfig::from_env();
    (!config.disabled).then(|| {
        thread::Builder::new()
            .name(String::from("spg-freezer"))
            .spawn(move || run(&state, config))
            .expect("spawn freezer thread")
    })
}
fn run(state: &ServerState, config: FreezerConfig) {
loop {
if SHUTDOWN_FLAG.load(Ordering::Acquire) {
break;
}
thread::sleep(config.tick);
if SHUTDOWN_FLAG.load(Ordering::Acquire) {
break;
}
if let Err(e) = tick(state, config.batch_rows, config.workers) {
eprintln!("spg-freezer: tick error: {e}");
}
}
}
/// The table/index pair chosen by `pick_target` for the next freeze.
#[derive(Debug)]
struct FreezeTarget {
/// Name of the table whose rows will be frozen.
table: String,
/// Name of the index on that table handed to the freeze routines.
index: String,
}
/// Chooses which table (and which of its indices) to freeze next.
///
/// A table qualifies when it has at least one row and at least one B-tree
/// index whose column is `SmallInt`, `Int` or `BigInt`; the first such index
/// in the table's index list is used. Among qualifying tables the one with the
/// most hot bytes wins, and on a tie the table listed first by
/// `Catalog::table_names` is kept.
///
/// Returns `None` when no table qualifies.
fn pick_target(cat: &Catalog) -> Option<FreezeTarget> {
    let mut winner: Option<(u64, FreezeTarget)> = None;

    for table_name in cat.table_names() {
        let table = match cat.get(&table_name) {
            Some(table) if table.row_count() != 0 => table,
            _ => continue,
        };

        let columns = &table.schema().columns;
        let eligible = table.indices().iter().find(|ix| {
            // An index pointing past the column list is simply not eligible.
            let int_keyed = columns.get(ix.column_position).is_some_and(|col| {
                matches!(
                    col.ty,
                    DataType::SmallInt | DataType::Int | DataType::BigInt
                )
            });
            int_keyed && matches!(ix.kind, IndexKind::BTree(_))
        });
        let Some(ix) = eligible else { continue };

        let hot = table.hot_bytes();
        // Strictly greater: an equally hot later table never displaces the current pick.
        let beats_current = winner
            .as_ref()
            .map_or(true, |(best_hot, _)| hot > *best_hot);
        if beats_current {
            winner = Some((
                hot,
                FreezeTarget {
                    table: table_name,
                    index: ix.name.clone(),
                },
            ));
        }
    }

    winner.map(|(_, target)| target)
}
/// Runs one freezer pass: if the hot tier is over budget, freezes up to
/// `batch_rows` rows of the hottest eligible table and publishes the result.
///
/// Steps, all performed while holding the engine write lock:
/// 1. Skip when the engine reports an open transaction.
/// 2. Skip unless some table exceeds its own `hot_tier_bytes` cap or the
///    catalog-wide hot bytes exceed `state.hot_tier_byte_budget`.
/// 3. Pick a target via `pick_target`; skip if none or if it has no rows.
/// 4. Freeze on a clone of the catalog and swap the clone in on success, so a
///    failed freeze leaves the live catalog untouched.
/// 5. Update metrics and, when `state.db_path` is set, persist the segment and
///    record its path in `state.cold_segment_paths`.
///
/// # Parameters
/// * `batch_rows` – maximum rows to freeze in this pass.
/// * `workers` – requested worker threads for slice preparation.
///
/// # Errors
/// Returns an error when the engine lock is poisoned or the freeze itself
/// fails. Failures after the catalog swap (persisting, stat-ing the file,
/// registering the path) are best-effort: they are logged to stderr and do not
/// fail the tick.
fn tick(state: &ServerState, batch_rows: usize, workers: usize) -> std::io::Result<()> {
    let mut engine = state
        .engine
        .write()
        .map_err(|_| std::io::Error::other("engine rwlock poisoned"))?;
    if engine.in_transaction() {
        return Ok(());
    }

    // Pressure check: per-table caps first, then the global budget.
    let any_table_over = engine.catalog().table_names().iter().any(|n| {
        engine
            .catalog()
            .get(n)
            .and_then(|t| t.schema().hot_tier_bytes.map(|cap| t.hot_bytes() > cap))
            .unwrap_or(false)
    });
    let used = engine.catalog().hot_tier_bytes();
    if !any_table_over && used <= state.hot_tier_byte_budget {
        return Ok(());
    }

    let Some(target) = pick_target(engine.catalog()) else {
        return Ok(());
    };
    let row_count = engine
        .catalog()
        .get(&target.table)
        .map_or(0, spg_storage::Table::row_count);
    let to_freeze = batch_rows.min(row_count);
    if to_freeze == 0 {
        return Ok(());
    }

    // Work on a copy so that an error below cannot leave a half-frozen catalog live.
    let mut new_cat = engine.catalog().clone();
    let report = freeze_with_workers(
        &mut new_cat,
        &target.table,
        &target.index,
        to_freeze,
        workers,
    )
    .map_err(|e| std::io::Error::other(format!("freeze: {e}")))?;
    engine.replace_catalog(new_cat);

    state.metrics.cold_segments.store(
        engine.catalog().cold_segment_count() as u64,
        Ordering::Relaxed,
    );

    if let Some(db_path) = state.db_path.as_deref() {
        let raw_size = report.segment_bytes.len() as u64;
        state
            .metrics
            .segment_bytes_uncompressed_in
            .fetch_add(raw_size, Ordering::Relaxed);

        match persist_segment(db_path, &report) {
            Ok(written_path) => {
                // The on-disk size feeds the compressed-bytes counter; a failed
                // stat only costs the metric, so report it and carry on.
                match std::fs::metadata(&written_path) {
                    Ok(written) => {
                        state
                            .metrics
                            .segment_bytes_compressed_out
                            .fetch_add(written.len(), Ordering::Relaxed);
                    }
                    Err(e) => eprintln!(
                        "spg-freezer: could not stat persisted segment {}: {e}",
                        written_path.display(),
                    ),
                }

                // Without this entry the segment file exists but is not recorded
                // in the path registry, so make the failure visible.
                match state.cold_segment_paths.lock() {
                    Ok(mut paths) => {
                        paths.insert(report.segment_id, written_path);
                    }
                    Err(_) => eprintln!(
                        "spg-freezer: cold segment path registry poisoned; seg {} at {} not registered",
                        report.segment_id,
                        written_path.display(),
                    ),
                }
            }
            Err(e) => eprintln!("spg-freezer: segment persist failed: {e}"),
        }
    }

    eprintln!(
        "spg-freezer: froze {} rows from {}.{} into seg {} ({} B freed; hot {} → {})",
        report.frozen_rows,
        target.table,
        target.index,
        report.segment_id,
        report.bytes_freed,
        used,
        engine.catalog().hot_tier_bytes(),
    );
    Ok(())
}
/// Freezes `to_freeze` rows of `table` via `index`, preparing the slices on up
/// to `workers` threads and then committing them in one step.
///
/// The effective worker count is `workers` limited to `1..=max(to_freeze, 1)`.
/// The row range `0..to_freeze` is split by `partition_range`, one sub-range
/// per worker. With a single worker the slice is prepared on the calling
/// thread; otherwise each sub-range is prepared on its own scoped thread, all
/// of which are started before any is joined.
///
/// # Errors
/// Returns the first preparation error in range order, or the error from
/// `commit_freeze_slices`.
///
/// # Panics
/// Panics if a worker thread panicked.
fn freeze_with_workers(
    cat: &mut Catalog,
    table: &str,
    index: &str,
    to_freeze: usize,
    workers: usize,
) -> Result<FreezeReport, spg_storage::StorageError> {
    let worker_count = workers.clamp(1, to_freeze.max(1));
    let ranges = partition_range(to_freeze, worker_count);

    let prepared: Result<Vec<FreezeSlice>, _> = if worker_count > 1 {
        // Preparation only needs shared access, so the workers borrow the catalog immutably.
        let shared: &Catalog = cat;
        std::thread::scope(|scope| {
            let mut pending = Vec::with_capacity(ranges.len());
            for range in ranges {
                pending.push(
                    scope.spawn(move || shared.prepare_freeze_slice(table, index, range)),
                );
            }
            pending
                .into_iter()
                .map(|worker| {
                    worker
                        .join()
                        .expect("prepare_freeze_slice worker panicked")
                })
                .collect()
        })
    } else {
        ranges
            .into_iter()
            .map(|range| cat.prepare_freeze_slice(table, index, range))
            .collect()
    };

    cat.commit_freeze_slices(table, index, prepared?)
}
/// Splits `0..n` into contiguous, non-overlapping ranges whose lengths differ
/// by at most one; the longer ranges come first.
///
/// # Parameters
/// * `n` – length of the range to split.
/// * `parts` – number of ranges to produce. `0` is treated as `1` so the
///   result always covers `0..n` (previously this divided by zero and
///   panicked).
///
/// # Returns
/// Exactly `max(parts, 1)` ranges. When `n < parts` the trailing ranges are
/// empty.
fn partition_range(n: usize, parts: usize) -> Vec<core::ops::Range<usize>> {
    let parts = parts.max(1);
    let base = n / parts;
    let extra = n % parts;

    let mut start = 0;
    (0..parts)
        .map(|i| {
            // The first `extra` ranges absorb the remainder, one element each.
            let len = base + usize::from(i < extra);
            let range = start..start + len;
            start += len;
            range
        })
        .collect()
}
/// Writes the frozen segment in `report` to disk next to the database file and
/// returns the path of the written file.
///
/// The file goes to `<parent>/<stem>.spg/segments/seg_<segment_id>.spg`, where
/// `<parent>` and `<stem>` come from `db_path` (falling back to `.` and `db`).
/// When `segment_compression_enabled()` is true the bytes are first passed
/// through `spg_storage::wrap_v2_envelope`; otherwise they are written as-is.
///
/// The write is crash-safe: the payload is written to a `.tmp` sibling,
/// flushed to stable storage with `sync_all`, and only then renamed over the
/// final name. Without the flush a crash shortly after the rename could leave
/// a truncated or empty segment under the final name. If any step fails the
/// temporary file is removed (best effort) and the error is returned.
///
/// # Errors
/// Any I/O error from creating the directory, writing, syncing or renaming.
fn persist_segment(db_path: &Path, report: &FreezeReport) -> std::io::Result<std::path::PathBuf> {
    use std::io::Write as _;

    let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
    let stem = db_path
        .file_stem()
        .unwrap_or_else(|| OsStr::new("db"))
        .to_string_lossy();
    let seg_dir = parent.join(format!("{stem}.spg")).join("segments");
    std::fs::create_dir_all(&seg_dir)?;

    let final_path = seg_dir.join(format!("seg_{}.spg", report.segment_id));
    let tmp_path = seg_dir.join(format!("seg_{}.spg.tmp", report.segment_id));

    // Only the envelope path needs an owned copy; the raw path borrows the
    // report's bytes instead of cloning the whole segment.
    let enveloped;
    let payload: &[u8] = if segment_compression_enabled() {
        enveloped = spg_storage::wrap_v2_envelope(report.segment_bytes.clone(), true);
        enveloped.as_ref()
    } else {
        report.segment_bytes.as_ref()
    };

    let write_and_publish = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp_path)?;
        file.write_all(payload)?;
        // Data must be durable before the rename makes it visible under the final name.
        file.sync_all()?;
        // Close the handle first; renaming an open file fails on some platforms.
        drop(file);
        std::fs::rename(&tmp_path, &final_path)
    };

    if let Err(e) = write_and_publish() {
        // Do not leave a partial temp file behind; the original error is what matters.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(e);
    }
    Ok(final_path)
}
/// Reports whether persisted segments should be wrapped in the compressed envelope.
///
/// Compression is on unless `SPG_SEGMENT_COMPRESSION` is set to `none`
/// (compared ASCII case-insensitively). An unset or non-Unicode value counts
/// as "on". The environment is consulted once per process; later calls return
/// the cached answer.
fn segment_compression_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("SPG_SEGMENT_COMPRESSION") {
        Ok(value) => !value.eq_ignore_ascii_case("none"),
        Err(_) => true,
    })
}
#[cfg(test)]
mod tests {
use super::*;
// Pins the tuning constants so that changing a default requires a
// deliberate update to this test as well.
#[test]
fn config_defaults_match_v5_2_2_ship_gate() {
assert_eq!(DEFAULT_TICK_MS, 1000);
assert_eq!(DEFAULT_BATCH_ROWS, 1_000);
assert_eq!(MIN_TICK_MS, 10);
}
}