use std::env;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crate::{SHUTDOWN_FLAG, ServerState, append_durability_marker};
/// Flush interval, in microseconds, used by `FlusherConfig::from_env` when
/// `SPG_FLUSHER_INTERVAL_US` is unset, does not parse as a `u64`, or is below
/// `MIN_INTERVAL_US`.
pub(crate) const DEFAULT_INTERVAL_US: u64 = 200;
/// Smallest interval, in microseconds, accepted from `SPG_FLUSHER_INTERVAL_US`.
/// Smaller values are discarded in favour of `DEFAULT_INTERVAL_US`; they are
/// not clamped up to this minimum.
const MIN_INTERVAL_US: u64 = 10;
/// Upper bound on a single sleep inside the flusher loop, so `SHUTDOWN_FLAG`
/// is re-checked at least this often even when the configured interval is long.
const SHUTDOWN_POLL_CAP: Duration = Duration::from_millis(50);
/// Settings for the background flusher thread, normally built with
/// `FlusherConfig::from_env`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct FlusherConfig {
/// Minimum time between two `append_durability_marker` calls in the
/// flusher loop.
pub(crate) interval: Duration,
/// Result of `crate::synchronous_commit_disabled()`. When `false`, `spawn`
/// returns `None` and no flusher thread is started.
pub(crate) async_mode: bool,
}
impl FlusherConfig {
    /// Builds the flusher configuration from the process environment.
    ///
    /// `async_mode` is whatever `crate::synchronous_commit_disabled()` reports.
    ///
    /// `interval` is read from `SPG_FLUSHER_INTERVAL_US` as a whole number of
    /// microseconds. If the variable cannot be read, does not parse as a `u64`,
    /// or is smaller than `MIN_INTERVAL_US`, `DEFAULT_INTERVAL_US` is used
    /// instead (too-small values are replaced, not clamped).
    pub(crate) fn from_env() -> Self {
        let async_mode = crate::synchronous_commit_disabled();

        let interval_us = match env::var("SPG_FLUSHER_INTERVAL_US") {
            Ok(raw) => match raw.parse::<u64>() {
                Ok(micros) if micros >= MIN_INTERVAL_US => micros,
                // Unparsable or below the accepted minimum.
                _ => DEFAULT_INTERVAL_US,
            },
            Err(_) => DEFAULT_INTERVAL_US,
        };

        Self {
            interval: Duration::from_micros(interval_us),
            async_mode,
        }
    }
}
/// Starts the background flusher thread if asynchronous mode is enabled.
///
/// The configuration is read with `FlusherConfig::from_env`. When its
/// `async_mode` is `false`, no thread is created and `None` is returned.
/// Otherwise a thread named `spg-flusher` is started running the flusher loop
/// against `state`, and its join handle is returned.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub(crate) fn spawn(state: Arc<ServerState>) -> Option<JoinHandle<()>> {
    let config = FlusherConfig::from_env();

    config.async_mode.then(|| {
        thread::Builder::new()
            .name(String::from("spg-flusher"))
            .spawn(move || run(&state, config))
            .expect("spawn flusher thread")
    })
}
/// Body of the flusher thread: appends a durability marker roughly once per
/// `config.interval` until `SHUTDOWN_FLAG` is observed set.
///
/// After each successful append it records, in `state.metrics`, the offset
/// just past the marker, the wall-clock time in microseconds since the Unix
/// epoch, and one more completed iteration. A failed append is reported on
/// stderr and counted in `flusher_errors`; the loop then carries on.
fn run(state: &ServerState, config: FlusherConfig) {
    // Added to the offset returned by `append_durability_marker` to obtain the
    // offset after the marker. Presumably the encoded size of one marker
    // frame — confirm against the WAL writer.
    const MARKER_FRAME_BYTES: u64 = 17;

    let metrics = &state.metrics;
    let mut last_tick = Instant::now();

    while !SHUTDOWN_FLAG.load(Ordering::Acquire) {
        let since_tick = last_tick.elapsed();
        if since_tick < config.interval {
            // Sleep out the rest of the interval, but never longer than the
            // poll cap so a shutdown request is noticed promptly.
            let nap = config
                .interval
                .saturating_sub(since_tick)
                .min(SHUTDOWN_POLL_CAP);
            thread::sleep(nap);
            continue;
        }

        // The next interval is measured from just before the append starts.
        last_tick = Instant::now();

        let pre_marker_offset = match append_durability_marker(state) {
            Ok(offset) => offset,
            Err(e) => {
                // NOTE(review): a persistent failure logs once per interval;
                // consider throttling if that proves noisy.
                eprintln!("spg-flusher: append_durability_marker failed: {e}");
                metrics.flusher_errors.fetch_add(1, Ordering::Relaxed);
                continue;
            }
        };

        let post_marker_offset = pre_marker_offset.saturating_add(MARKER_FRAME_BYTES);
        metrics
            .last_durable_wal_offset
            .store(post_marker_offset, Ordering::Relaxed);

        // `u64::MAX` is recorded when the clock reads before the Unix epoch or
        // the microsecond count does not fit in a `u64`.
        let now_us = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(since_epoch) => u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX),
            Err(_) => u64::MAX,
        };
        metrics.last_fsync_us.store(now_us, Ordering::Relaxed);

        metrics.flusher_iterations.fetch_add(1, Ordering::Relaxed);
    }
}