#![allow(
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::doc_markdown,
clippy::format_push_string,
clippy::uninlined_format_args
)]
use std::collections::HashSet;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use spg_storage::{DataType, TableSchema};
/// Default number of tables (those with the most rows) that `render_table_metrics` exports
/// when no table allowlist is configured; overridable via `SPG_METRICS_TABLE_TOPN`.
const DEFAULT_TABLE_METRIC_TOPN: usize = 50;
/// Counters and gauges held on the server state (`state.metrics`) and exported by the
/// `/metrics` endpoint.
///
/// Every field is an `AtomicU64`; the renderers in this module read them with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct Metrics {
    /// Exported as `spg_queries_total` (counter): total queries dispatched.
    pub queries_total: AtomicU64,
    /// Exported as `spg_errors_total` (counter): total query errors.
    pub errors_total: AtomicU64,
    /// Exported as `spg_cold_segments_total` (gauge): cold-tier segments registered on the
    /// engine catalog.
    pub cold_segments: AtomicU64,
    /// Exported as `spg_flusher_iterations_total` (counter): successful durability-checkpoint
    /// emissions by the async-commit flusher.
    pub flusher_iterations: AtomicU64,
    /// Exported as `spg_flusher_errors_total` (counter): flusher iterations that failed to
    /// append a durability marker.
    pub flusher_errors: AtomicU64,
    /// WAL byte offset already covered by a durability marker. `compute_durability_lag`
    /// reports the current WAL file length minus this value (saturating) as lag bytes.
    pub last_durable_wal_offset: AtomicU64,
    /// Microseconds since the Unix epoch of the flusher's most recent successful sync;
    /// 0 means none recorded, in which case the lag in seconds is reported as 0.
    pub last_fsync_us: AtomicU64,
    /// Exported as `spg_wal_bytes_uncompressed_total`: SQL bytes seen by the WAL encoder.
    pub wal_bytes_uncompressed_in: AtomicU64,
    /// Exported as `spg_wal_bytes_compressed_total`: bytes written to the WAL after compression.
    pub wal_bytes_compressed_out: AtomicU64,
    /// Exported as `spg_segment_bytes_uncompressed_total`: cold-tier segment bytes produced
    /// by the freezer before compression.
    pub segment_bytes_uncompressed_in: AtomicU64,
    /// Exported as `spg_segment_bytes_compressed_total`: bytes actually written to disk for
    /// cold-tier segments.
    pub segment_bytes_compressed_out: AtomicU64,
    /// Exported as `spg_cold_prefetch_hits_total`: cold-segment files loaded by the boot-time
    /// prefetch worker pool.
    pub cold_prefetch_hits: AtomicU64,
}
/// Escapes `s` so it can be embedded between double quotes in a JSON document.
///
/// Quote, backslash, `\n`, `\r` and `\t` get their short escapes; any other control
/// character below U+0020 becomes a `\u00XX` escape. Everything else is copied verbatim.
fn json_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len() + 2);
    s.chars().for_each(|ch| match ch {
        '"' => escaped.push_str("\\\""),
        '\\' => escaped.push_str("\\\\"),
        '\n' => escaped.push_str("\\n"),
        '\r' => escaped.push_str("\\r"),
        '\t' => escaped.push_str("\\t"),
        ctrl if u32::from(ctrl) < 0x20 => {
            escaped.push_str(&format!("\\u{:04x}", u32::from(ctrl)));
        }
        other => escaped.push(other),
    });
    escaped
}
/// Reports whether log lines should be emitted as JSON.
///
/// True when `SPG_LOG_FORMAT` equals `json` (ASCII case-insensitive). The variable is read
/// once, on the first call; later changes to the environment are not observed.
pub fn json_logging_enabled() -> bool {
    static JSON_MODE: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    let enabled = JSON_MODE.get_or_init(|| match std::env::var("SPG_LOG_FORMAT") {
        Ok(format) => format.eq_ignore_ascii_case("json"),
        Err(_) => false,
    });
    *enabled
}
/// Writes one log line to stderr.
///
/// When [`json_logging_enabled`] is true the line is a single JSON object
/// `{"level":…,"msg":…,"<k>":"<v>",…}`. Otherwise it is a plain `spg-server: <msg> k=v …`
/// line. Write failures are ignored: logging is best-effort.
#[allow(dead_code)]
pub fn log_event(level: &str, msg: &str, kvs: &[(&str, &str)]) {
    let mut line = if json_logging_enabled() {
        // Escape every caller-supplied string, including `level` and the keys, so the line
        // is valid JSON whatever the caller passes.
        let mut json = format!(
            "{{\"level\":\"{}\",\"msg\":\"{}\"",
            json_escape(level),
            json_escape(msg)
        );
        for (k, v) in kvs {
            json.push_str(&format!(",\"{}\":\"{}\"", json_escape(k), json_escape(v)));
        }
        json.push('}');
        json
    } else {
        let mut text = format!("spg-server: {msg}");
        for (k, v) in kvs {
            text.push_str(&format!(" {k}={v}"));
        }
        text
    };
    line.push('\n');
    // The whole line is assembled first and handed to a single write_all; errors are ignored.
    let _ = std::io::stderr().write_all(line.as_bytes());
}
/// Binds `addr` and serves the `/metrics` and `/healthz` HTTP endpoints from a background
/// accept thread, handling each connection on its own short-lived thread.
///
/// Returns the bound local address (useful when `addr` requests port 0).
///
/// # Errors
/// Returns an error if binding fails, the local address cannot be read, or the accept
/// thread cannot be spawned.
pub fn spawn_http(
    addr: &str,
    state: Arc<crate::ServerState>,
) -> std::io::Result<std::net::SocketAddr> {
    let listener = TcpListener::bind(addr)?;
    let local = listener.local_addr()?;
    // Builder::spawn reports thread-creation failure as an io::Error instead of panicking.
    thread::Builder::new()
        .name("spg-metrics-http".to_string())
        .spawn(move || {
            for stream in listener.incoming() {
                let stream = match stream {
                    Ok(stream) => stream,
                    Err(_) => {
                        // A persistent accept error (e.g. fd exhaustion) would otherwise spin
                        // this loop at full CPU; back off briefly before retrying.
                        thread::sleep(std::time::Duration::from_millis(10));
                        continue;
                    }
                };
                let s = Arc::clone(&state);
                let spawned = thread::Builder::new().spawn(move || {
                    if let Err(e) = handle_http(stream, &s) {
                        eprintln!("spg-server: /metrics conn error: {e}");
                    }
                });
                if let Err(e) = spawned {
                    // The closure, and with it the connection, is dropped; keep accepting.
                    eprintln!("spg-server: /metrics spawn error: {e}");
                }
            }
        })?;
    Ok(local)
}
/// Serves a single HTTP request on `stream` and writes exactly one response.
///
/// Routes: `GET /healthz` → 200 `ok`, `GET /metrics` → 200 with the Prometheus text
/// exposition, any other `GET` path → 404, any other method → 405. Only the request head is
/// read: up to the first blank line, capped at 4 KiB, with a 5 s read timeout. Any query
/// string is ignored for routing.
///
/// # Errors
/// Returns the I/O error from reading the request or writing the response.
fn handle_http(mut stream: TcpStream, state: &crate::ServerState) -> std::io::Result<()> {
    const MAX_HEAD_BYTES: usize = 4 * 1024;
    // Best-effort: if the timeout cannot be set, the request is still served.
    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
    let mut buf = Vec::with_capacity(4096);
    let mut chunk = [0u8; 1024];
    loop {
        let n = stream.read(&mut chunk)?;
        if n == 0 {
            break;
        }
        buf.extend_from_slice(&chunk[..n]);
        if buf.windows(4).any(|w| w == b"\r\n\r\n") {
            break;
        }
        if buf.len() > MAX_HEAD_BYTES {
            // 431 (RFC 6585) is the status for an oversized header block; 414 is URI-only.
            return write_response(
                &mut stream,
                431,
                "Request Header Fields Too Large",
                "request header too large",
            );
        }
    }
    // Decode only the request line, lossily, so a stray non-UTF-8 byte in a later header
    // cannot make the whole request unparseable.
    let line_end = buf
        .iter()
        .position(|&b| b == b'\r' || b == b'\n')
        .unwrap_or(buf.len());
    let request_line = String::from_utf8_lossy(&buf[..line_end]);
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let target = parts.next().unwrap_or("");
    // Route on the path alone so e.g. `/metrics?format=x` still reaches the metrics handler.
    let path = target.split_once('?').map_or(target, |(p, _)| p);
    match (method, path) {
        ("GET", "/healthz") => write_response(&mut stream, 200, "OK", "ok\n"),
        ("GET", "/metrics") => {
            let body = render_metrics(state);
            write_response(&mut stream, 200, "OK", &body)
        }
        ("GET", _) => write_response(&mut stream, 404, "Not Found", "no such path\n"),
        _ => write_response(&mut stream, 405, "Method Not Allowed", "GET only\n"),
    }
}
fn render_metrics(state: &crate::ServerState) -> String {
let mut out = String::with_capacity(512);
let version = env!("CARGO_PKG_VERSION");
out.push_str("# HELP spg_server_info SPG version build info\n");
out.push_str("# TYPE spg_server_info gauge\n");
out.push_str(&format!("spg_server_info{{version=\"{version}\"}} 1\n"));
out.push_str("# HELP spg_connections_active Current live client connections\n");
out.push_str("# TYPE spg_connections_active gauge\n");
out.push_str(&format!(
"spg_connections_active {}\n",
state.active_connections.load(Ordering::Relaxed)
));
out.push_str("# HELP spg_queries_total Total queries dispatched\n");
out.push_str("# TYPE spg_queries_total counter\n");
out.push_str(&format!(
"spg_queries_total {}\n",
state.metrics.queries_total.load(Ordering::Relaxed)
));
out.push_str("# HELP spg_errors_total Total query errors\n");
out.push_str("# TYPE spg_errors_total counter\n");
out.push_str(&format!(
"spg_errors_total {}\n",
state.metrics.errors_total.load(Ordering::Relaxed)
));
render_table_metrics(state, &mut out);
render_replication_lag(state, &mut out);
render_hot_tier(state, &mut out);
render_cold_tier(state, &mut out);
render_flusher(state, &mut out);
render_durability_lag(state, &mut out);
render_compression(state, &mut out);
out
}
/// Appends the `spg_durability_lag_bytes` and `spg_durability_lag_seconds` gauges.
///
/// Lag is only computed when `crate::synchronous_commit_disabled()` returns true; otherwise
/// both gauges report zero. Seconds are printed with six decimal places.
fn render_durability_lag(state: &crate::ServerState, out: &mut String) {
    let (lag_bytes, lag_seconds) = crate::synchronous_commit_disabled()
        .then(|| compute_durability_lag(state))
        .unwrap_or((0, 0.0));
    let bytes_sample = format!("spg_durability_lag_bytes {lag_bytes}\n");
    let seconds_sample = format!("spg_durability_lag_seconds {lag_seconds:.6}\n");
    out.extend([
        "# HELP spg_durability_lag_bytes WAL bytes written but not yet covered by a durability_checkpoint marker (v5.4.3)\n",
        "# TYPE spg_durability_lag_bytes gauge\n",
        bytes_sample.as_str(),
        "# HELP spg_durability_lag_seconds Seconds since the flusher's most recent successful sync_data (v5.4.3)\n",
        "# TYPE spg_durability_lag_seconds gauge\n",
        seconds_sample.as_str(),
    ]);
}
/// Appends the WAL/segment compression byte counters and the cold-prefetch hit counter,
/// in a fixed order, each as a HELP line, a TYPE line and one sample.
fn render_compression(state: &crate::ServerState, out: &mut String) {
    let m = &state.metrics;
    // (HELP line, TYPE line, sample name, source counter), emitted in this order.
    let series = [
        (
            "# HELP spg_wal_bytes_uncompressed_total Sum of SQL byte counts seen by the WAL encoder since boot (v6.6.3)\n",
            "# TYPE spg_wal_bytes_uncompressed_total counter\n",
            "spg_wal_bytes_uncompressed_total",
            &m.wal_bytes_uncompressed_in,
        ),
        (
            "# HELP spg_wal_bytes_compressed_total Sum of bytes written to the WAL since boot, after compression (v6.6.3)\n",
            "# TYPE spg_wal_bytes_compressed_total counter\n",
            "spg_wal_bytes_compressed_total",
            &m.wal_bytes_compressed_out,
        ),
        (
            "# HELP spg_segment_bytes_uncompressed_total Sum of cold-tier segment v1 bytes the freezer produced (v6.6.3)\n",
            "# TYPE spg_segment_bytes_uncompressed_total counter\n",
            "spg_segment_bytes_uncompressed_total",
            &m.segment_bytes_uncompressed_in,
        ),
        (
            "# HELP spg_segment_bytes_compressed_total Sum of bytes actually written to disk for cold-tier segments (v6.6.3)\n",
            "# TYPE spg_segment_bytes_compressed_total counter\n",
            "spg_segment_bytes_compressed_total",
            &m.segment_bytes_compressed_out,
        ),
        (
            "# HELP spg_cold_prefetch_hits_total Cold-segment files loaded via the boot-time prefetch worker pool (v6.7.6)\n",
            "# TYPE spg_cold_prefetch_hits_total counter\n",
            "spg_cold_prefetch_hits_total",
            &m.cold_prefetch_hits,
        ),
    ];
    for (help, kind, name, counter) in series {
        out.push_str(help);
        out.push_str(kind);
        out.push_str(&format!("{name} {}\n", counter.load(Ordering::Relaxed)));
    }
}
/// Returns `(lag_bytes, lag_seconds)` for the asynchronous-commit durability gauges.
///
/// `lag_bytes` is the current WAL file length minus `last_durable_wal_offset`, saturating at
/// zero. Any failure to read the length (no WAL, poisoned lock, metadata error) counts as a
/// length of 0. `lag_seconds` is the wall-clock time since `last_fsync_us`, or 0.0 when
/// `last_fsync_us` is 0 or the system clock cannot be read.
fn compute_durability_lag(state: &crate::ServerState) -> (u64, f64) {
    let metrics = &state.metrics;
    let durable = metrics.last_durable_wal_offset.load(Ordering::Relaxed);
    let wal_len = match state.wal.as_ref() {
        Some(wal) => match wal.lock() {
            Ok(file) => file.metadata().map_or(0, |md| md.len()),
            Err(_) => 0,
        },
        None => 0,
    };
    let lag_bytes = wal_len.saturating_sub(durable);
    let last_fsync = metrics.last_fsync_us.load(Ordering::Relaxed);
    if last_fsync == 0 {
        return (lag_bytes, 0.0);
    }
    // Falling back to `last_fsync` itself makes an unreadable clock report zero lag.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()
        .and_then(|d| u64::try_from(d.as_micros()).ok())
        .unwrap_or(last_fsync);
    let lag_seconds = now.saturating_sub(last_fsync) as f64 / 1_000_000.0;
    (lag_bytes, lag_seconds)
}
/// Appends the async-commit flusher's success and failure counters.
fn render_flusher(state: &crate::ServerState, out: &mut String) {
    let iterations = state.metrics.flusher_iterations.load(Ordering::Relaxed);
    let iterations_sample = format!("spg_flusher_iterations_total {iterations}\n");
    let errors = state.metrics.flusher_errors.load(Ordering::Relaxed);
    let errors_sample = format!("spg_flusher_errors_total {errors}\n");
    out.extend([
        "# HELP spg_flusher_iterations_total Successful durability_checkpoint emissions by the async-commit flusher (v5.4.1)\n",
        "# TYPE spg_flusher_iterations_total counter\n",
        iterations_sample.as_str(),
        "# HELP spg_flusher_errors_total Flusher iterations that failed to append a durability marker (v5.4.1)\n",
        "# TYPE spg_flusher_errors_total counter\n",
        errors_sample.as_str(),
    ]);
}
/// Appends the `spg_cold_segments_total` gauge (cold-tier segments on the engine catalog).
///
/// NOTE(review): Prometheus naming conventions reserve the `_total` suffix for counters, but
/// this series is typed as a gauge. The name is kept because renaming it would break
/// existing dashboards and alerts.
fn render_cold_tier(state: &crate::ServerState, out: &mut String) {
    let segments = state.metrics.cold_segments.load(Ordering::Relaxed);
    let sample = format!("spg_cold_segments_total {segments}\n");
    out.extend([
        "# HELP spg_cold_segments_total Cold-tier segments registered on the engine catalog (v5.2.2)\n",
        "# TYPE spg_cold_segments_total gauge\n",
        sample.as_str(),
    ]);
}
/// Appends the hot-tier usage gauge (encoded bytes of hot-tier rows) and the configured
/// budget gauge. Emits nothing at all if the engine lock is poisoned.
fn render_hot_tier(state: &crate::ServerState, out: &mut String) {
    // The read guard lives only inside the closure, so the lock is released before formatting.
    let Some(used) = state
        .engine
        .read()
        .ok()
        .map(|engine| engine.catalog().hot_tier_bytes())
    else {
        return;
    };
    let used_sample = format!("spg_hot_tier_bytes_used {used}\n");
    let budget_sample = format!("spg_hot_tier_bytes_budget {}\n", state.hot_tier_byte_budget);
    out.extend([
        "# HELP spg_hot_tier_bytes_used Encoded byte size of hot-tier rows (v5.2.1)\n",
        "# TYPE spg_hot_tier_bytes_used gauge\n",
        used_sample.as_str(),
        "# HELP spg_hot_tier_bytes_budget Hot-tier byte budget configured via SPG_HOT_TIER_BYTES\n",
        "# TYPE spg_hot_tier_bytes_budget gauge\n",
        budget_sample.as_str(),
    ]);
}
/// Appends follower replication-lag gauges derived from `state.lag_state`.
///
/// Nothing is emitted while `primary_wall_time_us` is 0. Presumably this means no primary
/// status frame has been recorded yet (e.g. this node is not a follower); confirm with the
/// writer of `lag_state`. Lag bytes are primary position minus applied position (saturating).
/// Lag seconds are wall-clock time since the primary's stamp.
fn render_replication_lag(state: &crate::ServerState, out: &mut String) {
    let lag = &state.lag_state;
    let primary_pos = lag.primary_pos.load(Ordering::Acquire);
    let primary_wall = lag.primary_wall_time_us.load(Ordering::Acquire);
    if primary_wall == 0 {
        return;
    }
    let applied = lag.follower_applied_pos.load(Ordering::Acquire);
    let lag_bytes = primary_pos.saturating_sub(applied);
    let now_micros = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_micros());
    let now_us = u64::try_from(now_micros).unwrap_or(0);
    #[allow(clippy::cast_precision_loss)]
    let lag_seconds = now_us.saturating_sub(primary_wall) as f64 / 1_000_000.0;
    let bytes_sample = format!("spg_replication_lag_bytes {lag_bytes}\n");
    let seconds_sample = format!("spg_replication_lag_seconds {lag_seconds}\n");
    out.extend([
        "# HELP spg_replication_lag_bytes WAL bytes follower is behind primary (v4.36 status frame)\n",
        "# TYPE spg_replication_lag_bytes gauge\n",
        bytes_sample.as_str(),
        "# HELP spg_replication_lag_seconds Wall-clock seconds since primary's last status frame\n",
        "# TYPE spg_replication_lag_seconds gauge\n",
        seconds_sample.as_str(),
    ]);
}
/// Appends per-table `spg_table_rows` and `spg_table_bytes` gauges to `out`.
///
/// Table selection is controlled by environment variables read on every call:
/// - `SPG_METRICS_TABLE_ALLOWLIST`: comma-separated table names. When it names at least one
///   table, exactly those existing tables are emitted, sorted by name, with no top-N cap.
///   Empty, whitespace-only or comma-only values mean "no allowlist".
/// - `SPG_METRICS_TABLE_TOPN`: without an allowlist, only the N tables with the most rows are
///   emitted (ties broken by name). Missing, zero or unparsable values fall back to
///   `DEFAULT_TABLE_METRIC_TOPN`.
///
/// Byte sizes are `rows × approx_row_bytes(schema)`, an estimate. Emits nothing if the engine
/// lock is poisoned.
fn render_table_metrics(state: &crate::ServerState, out: &mut String) {
    // Read configuration before taking the engine lock to keep the critical section short.
    let allowlist = table_metric_allowlist();
    let topn = std::env::var("SPG_METRICS_TABLE_TOPN")
        .ok()
        .and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_TABLE_METRIC_TOPN);
    let Ok(engine) = state.engine.read() else {
        return;
    };
    let catalog = engine.catalog();
    let mut entries: Vec<(String, u64, u64)> = catalog
        .table_names()
        .into_iter()
        // Filter before the catalog lookup so non-allowlisted tables cost nothing.
        .filter(|name| match &allowlist {
            Some(set) => set.contains(name),
            None => true,
        })
        .filter_map(|name| {
            let table = catalog.get(&name)?;
            let rows = table.row_count() as u64;
            let bytes = rows.saturating_mul(approx_row_bytes(table.schema()));
            Some((name, rows, bytes))
        })
        .collect();
    // Everything below works on owned data; release the engine read lock now.
    drop(engine);
    if allowlist.is_none() {
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        entries.truncate(topn);
    } else {
        entries.sort_by(|a, b| a.0.cmp(&b.0));
    }
    out.push_str("# HELP spg_table_rows Live row count per user table\n");
    out.push_str("# TYPE spg_table_rows gauge\n");
    for (name, rows, _) in &entries {
        out.push_str(&format!(
            "spg_table_rows{{table=\"{}\"}} {rows}\n",
            metric_label_escape(name)
        ));
    }
    out.push_str("# HELP spg_table_bytes Approximate on-disk byte size per user table (rows × schema width)\n");
    out.push_str("# TYPE spg_table_bytes gauge\n");
    for (name, _, bytes) in &entries {
        out.push_str(&format!(
            "spg_table_bytes{{table=\"{}\"}} {bytes}\n",
            metric_label_escape(name)
        ));
    }
}

/// Parses `SPG_METRICS_TABLE_ALLOWLIST` into a set of trimmed table names.
///
/// Returns `None` (no allowlist) when the variable is unset or names no table at all
/// (e.g. `""`, `" "`, `","`), so a blank value cannot silently hide every table.
fn table_metric_allowlist() -> Option<HashSet<String>> {
    let raw = std::env::var("SPG_METRICS_TABLE_ALLOWLIST").ok()?;
    let set: HashSet<String> = raw
        .split(',')
        .map(str::trim)
        .filter(|t| !t.is_empty())
        .map(String::from)
        .collect();
    (!set.is_empty()).then_some(set)
}
/// Rough per-row width in bytes for `schema`, used to estimate `spg_table_bytes`.
///
/// Fixed-width types use their natural width. Variable-width types use heuristics:
/// - `VARCHAR(n)` counts half its declared length, but at least 1 byte.
/// - `TEXT`/`JSON`/`JSONB`/`BYTES` count a flat 64 bytes.
/// - `VECTOR(dim)` counts 4 bytes per element.
///
/// This is an estimate, not a measurement of stored size. The sum saturates at `u64::MAX`.
fn approx_row_bytes(schema: &TableSchema) -> u64 {
    schema
        .columns
        .iter()
        .map(|c| -> u64 {
            match c.ty {
                DataType::Bool => 1,
                DataType::SmallInt => 2,
                DataType::Int => 4,
                DataType::BigInt
                | DataType::Date
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Float => 8,
                DataType::Numeric { .. } | DataType::Interval => 16,
                DataType::Char(n) => u64::from(n),
                // Halve first, then clamp, so VARCHAR(0) and VARCHAR(1) still count one byte.
                DataType::Varchar(n) => (u64::from(n) / 2).max(1),
                DataType::Text | DataType::Json | DataType::Jsonb | DataType::Bytes => 64,
                DataType::Vector { dim, .. } => u64::from(dim).saturating_mul(4),
            }
        })
        // Saturate instead of overflowing (a panic in debug builds) on absurdly wide schemas.
        .fold(0, u64::saturating_add)
}
/// Escapes `s` for use inside a double-quoted Prometheus label value.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`; every other character
/// is copied unchanged.
fn metric_label_escape(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        match ch {
            '\\' => acc.push_str("\\\\"),
            '"' => acc.push_str("\\\""),
            '\n' => acc.push_str("\\n"),
            _ => acc.push(ch),
        }
        acc
    })
}
/// Writes a complete HTTP/1.1 `text/plain; charset=utf-8` response with status
/// `code`/`reason` and `body`, advertising `Connection: close`.
///
/// The status line, headers and body are assembled in memory and sent with a single
/// `write_all`. `Content-Length` is the body's length in bytes.
///
/// # Errors
/// Propagates any error from writing to `stream`.
fn write_response(
    stream: &mut TcpStream,
    code: u16,
    reason: &str,
    body: &str,
) -> std::io::Result<()> {
    let head = format!(
        "HTTP/1.1 {} {}\r\nContent-Type: text/plain; charset=utf-8\r\n\
         Content-Length: {}\r\nConnection: close\r\n\r\n",
        code,
        reason,
        body.len()
    );
    let mut response = String::with_capacity(head.len() + body.len());
    response.push_str(&head);
    response.push_str(body);
    stream.write_all(response.as_bytes())
}