spg-server 7.11.5

SPG daemon — listens for self-built wire-frame connections and PG-wire (libpq-compatible).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
// HTTP framing here is one-shot per request — minimal hand-rolled
// parser, no streaming, no chunked encoding. The lints we allow
// here would otherwise nag every byte-cast.
#![allow(
    clippy::cast_lossless,
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss,
    clippy::doc_markdown,
    clippy::format_push_string,
    clippy::uninlined_format_args
)]

//! v4.13 observability surface: structured logging + `/healthz` +
//! Prometheus `/metrics`.
//!
//! Logging:
//! - `SPG_LOG_FORMAT=json` → every server event goes out as a
//!   single-line JSON object on stderr instead of human-readable
//!   text. Suitable for loki / cloudwatch / datadog ingestion.
//! - default (env unset or any other value) → existing
//!   `eprintln!`-style text. Backwards-compatible.
//!
//! HTTP listener:
//! - Opt-in via `SPG_HTTP_ADDR=host:port`. When set, a tiny
//!   HTTP/1.1 listener handles two endpoints:
//!     - `GET /healthz` → 200 "ok"
//!     - `GET /metrics` → 200 with Prometheus exposition-format
//!       counters (`spg_connections_active`, `spg_queries_total`,
//!       `spg_errors_total`, `spg_server_info`).
//! - The accept loop runs on a single background thread and hands
//!   each connection to its own short-lived thread — rendering the
//!   metrics page is cheap; even a busy server doesn't need
//!   more.

use std::collections::HashSet;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

use spg_storage::{DataType, TableSchema};

/// v4.35 per-table metrics: cap on how many tables get
/// `spg_table_rows{...}` + `spg_table_bytes{...}` exported when no
/// explicit allowlist is set. Defaults to 50 to keep Prometheus
/// cardinality bounded for tenants with thousands of tables.
/// Operators override it via `SPG_METRICS_TABLE_TOPN`; an unset,
/// unparsable, or zero value falls back to this default.
const DEFAULT_TABLE_METRIC_TOPN: usize = 50;

/// Atomic counters surfaced via the `/metrics` endpoint. Cheap to
/// update from anywhere — every increment is a single Relaxed
/// atomic operation. `Default` starts every counter at 0.
#[derive(Debug, Default)]
pub struct Metrics {
    /// Total queries dispatched. Exposed via `spg_queries_total`.
    pub queries_total: AtomicU64,
    /// Total query errors. Exposed via `spg_errors_total`.
    pub errors_total: AtomicU64,
    /// v5.2.2: number of cold-tier segments currently registered on
    /// the engine catalog (sum across tables). Updated by the
    /// freezer thread after each successful demotion. Exposed via
    /// `spg_cold_segments_total`.
    pub cold_segments: AtomicU64,
    /// v5.4.1: total `durability_checkpoint` markers the flusher
    /// thread has successfully appended in async-commit mode.
    /// Stays at 0 in sync-commit mode (the flusher isn't spawned).
    /// Exposed via `spg_flusher_iterations_total`.
    pub flusher_iterations: AtomicU64,
    /// v5.4.1: total flusher-thread iterations that failed (WAL
    /// quota exceeded, ENOSPC, mutex poisoned). Pairs with
    /// `flusher_iterations`: a rising errors counter against a
    /// flatline iterations counter is the operator's signal that
    /// the WAL volume needs attention. Exposed via
    /// `spg_flusher_errors_total`.
    pub flusher_errors: AtomicU64,
    /// v5.4.3: WAL byte offset confirmed durable by the most
    /// recent `durability_checkpoint` marker the flusher
    /// emitted. Updated by the flusher after `sync_data` returns
    /// `Ok(())`; stays at 0 in sync-commit mode (the flusher
    /// isn't spawned). Combined with the current WAL file length
    /// at `/metrics` render time, derives `spg_durability_lag_bytes`.
    pub last_durable_wal_offset: AtomicU64,
    /// v5.4.3: wall-clock microseconds-since-epoch the most
    /// recent successful flusher `sync_data` completed. Updated
    /// alongside `last_durable_wal_offset`; 0 means "no flush
    /// has happened in this process lifetime" (either sync mode
    /// or the flusher hasn't ticked yet). Derives
    /// `spg_durability_lag_seconds` at render time.
    pub last_fsync_us: AtomicU64,
    /// v6.6.3 — sum of SQL byte counts seen by the WAL encoder
    /// since boot, before compression is applied. Derives the
    /// `spg_wal_bytes_uncompressed_total` series. Pairs with
    /// `wal_bytes_compressed_out` for the ratio computation.
    pub wal_bytes_uncompressed_in: AtomicU64,
    /// v6.6.3 — sum of bytes written to the WAL since boot, after
    /// compression. Derives `spg_wal_bytes_compressed_total`.
    pub wal_bytes_compressed_out: AtomicU64,
    /// v6.6.3 — sum of cold-tier segment v1 bytes the freezer
    /// produced since boot, BEFORE the v2 envelope. Derives
    /// `spg_segment_bytes_uncompressed_total`.
    pub segment_bytes_uncompressed_in: AtomicU64,
    /// v6.6.3 — sum of bytes actually written to disk for cold-tier
    /// segments since boot. Equals `segment_bytes_uncompressed_in`
    /// when SPG_SEGMENT_COMPRESSION=none. Derives
    /// `spg_segment_bytes_compressed_total`.
    pub segment_bytes_compressed_out: AtomicU64,
    /// v6.7.6 — number of cold-segment files the boot-time
    /// prefetch worker pool successfully read off disk (Linux:
    /// also `posix_fadvise(WILLNEED)`'d to seed the page cache).
    /// Increments by 1 per segment regardless of size. Derives
    /// `spg_cold_prefetch_hits_total`. A boot that loads N
    /// manifest-listed cold segments lands `N` hits; reconnects
    /// and CHECKPOINTs don't touch this counter.
    pub cold_prefetch_hits: AtomicU64,
}

/// Escape `s` for embedding inside a JSON string literal (RFC 8259).
///
/// `"` and `\` are backslash-escaped; newline, carriage return, and
/// tab use their short forms; every other character below U+0020 is
/// written as `\u00XX`. All remaining characters pass through
/// unchanged. The logging path uses this so a SQL string containing
/// quotes can't break the log line.
fn json_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len() + 2);
    for ch in s.chars() {
        // Characters with a dedicated two-byte escape sequence.
        let short_form = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(seq) = short_form {
            escaped.push_str(seq);
        } else if u32::from(ch) < 0x20 {
            // Remaining control characters need the generic form.
            escaped.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            escaped.push(ch);
        }
    }
    escaped
}

/// Reports whether log lines should be emitted as JSON, i.e. whether
/// `SPG_LOG_FORMAT` is set to `json` (ASCII case-insensitive).
///
/// The environment is consulted on the first call only; the answer
/// is cached for the rest of the process, so call sites can check it
/// as often as they like.
pub fn json_logging_enabled() -> bool {
    use std::sync::OnceLock;

    static JSON_MODE: OnceLock<bool> = OnceLock::new();
    let enabled = JSON_MODE.get_or_init(|| match std::env::var("SPG_LOG_FORMAT") {
        Ok(value) => value.eq_ignore_ascii_case("json"),
        // Unset (or not valid Unicode) means the text format.
        Err(_) => false,
    });
    *enabled
}

/// Emit one log line on stderr.
///
/// - When `SPG_LOG_FORMAT=json`, the line is encoded as
///   `{"level":"...","msg":"...","key":"val",...}\n`. The level,
///   the message, and every key and value are JSON-escaped, so the
///   line stays a valid JSON object whatever the caller passes.
/// - Otherwise the line matches the prior
///   `eprintln!("spg-server: <msg> ...")` shape: the message followed
///   by ` key=value` pairs, unescaped. `level` is not printed in this
///   mode.
///
/// Write failures on stderr are deliberately ignored — logging is
/// best effort and must not take the caller down.
#[allow(dead_code)] // wire-up to startup/auth events queued for follow-up commits
pub fn log_event(level: &str, msg: &str, kvs: &[(&str, &str)]) {
    let line = if json_logging_enabled() {
        let mut line = format!(
            "{{\"level\":\"{}\",\"msg\":\"{}\"",
            json_escape(level),
            json_escape(msg)
        );
        for (k, v) in kvs {
            // Keys are caller-supplied too, so escape them like values.
            line.push_str(&format!(",\"{}\":\"{}\"", json_escape(k), json_escape(v)));
        }
        line.push_str("}\n");
        line
    } else {
        let mut line = format!("spg-server: {msg}");
        for (k, v) in kvs {
            line.push_str(&format!(" {k}={v}"));
        }
        line.push('\n');
        line
    };
    // One write per line keeps concurrent log lines from interleaving
    // mid-line as far as stderr allows.
    let _ = std::io::stderr().write_all(line.as_bytes());
}

/// Bind `addr` and serve `/healthz` + `/metrics` in the background.
///
/// One thread runs the accept loop; every accepted connection is
/// handed to its own thread, which logs (but otherwise ignores) any
/// I/O error from the request. Failed accepts are skipped.
///
/// Returns the bound address, which is useful for tests that pass
/// `127.0.0.1:0` to pick a free port.
///
/// # Errors
///
/// Fails if the listener cannot be bound or its local address cannot
/// be read.
pub fn spawn_http(
    addr: &str,
    state: Arc<crate::ServerState>,
) -> std::io::Result<std::net::SocketAddr> {
    let listener = TcpListener::bind(addr)?;
    let bound = listener.local_addr()?;
    let accept_loop = move || {
        // `flatten` drops failed accepts and keeps the loop going.
        for conn in listener.incoming().flatten() {
            let shared = Arc::clone(&state);
            thread::spawn(move || {
                let outcome = handle_http(conn, &shared);
                if let Err(e) = outcome {
                    eprintln!("spg-server: /metrics conn error: {e}");
                }
            });
        }
    };
    thread::spawn(accept_loop);
    Ok(bound)
}

/// Serve exactly one HTTP request on `stream`.
///
/// The request is read in 1 KiB chunks until the blank-line
/// `\r\n\r\n` terminator shows up, the peer closes the connection, or
/// more than 4 KiB has been buffered (answered with 414). Only the
/// request line is interpreted:
///
/// - `GET /healthz` → 200 `ok`
/// - `GET /metrics` → 200 with the rendered metrics page
/// - `GET <anything else>` → 404
/// - any other method (or an unparsable request) → 405
///
/// # Errors
///
/// Propagates read and write errors from the socket.
fn handle_http(mut stream: TcpStream, state: &crate::ServerState) -> std::io::Result<()> {
    const MAX_REQUEST_BYTES: usize = 4 * 1024;
    const HEADER_END: &[u8] = b"\r\n\r\n";

    // Best effort: a socket that refuses the timeout is still served.
    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
    let mut request = Vec::with_capacity(4096);
    let mut scratch = [0u8; 1024];
    loop {
        let got = match stream.read(&mut scratch)? {
            0 => break, // peer closed before finishing the header
            n => n,
        };
        request.extend_from_slice(&scratch[..got]);
        if request.windows(4).any(|w| w == HEADER_END) {
            break;
        }
        if request.len() > MAX_REQUEST_BYTES {
            return write_response(&mut stream, 414, "URI Too Long", "request header too large");
        }
    }

    // Non-UTF-8 input degrades to an empty request line, i.e. a 405.
    let text = std::str::from_utf8(&request).unwrap_or("");
    let first_line = text.lines().next().unwrap_or("");
    let mut tokens = first_line.split_whitespace();
    let (method, path) = (tokens.next().unwrap_or(""), tokens.next().unwrap_or(""));

    if method != "GET" {
        return write_response(&mut stream, 405, "Method Not Allowed", "GET only\n");
    }
    match path {
        "/healthz" => write_response(&mut stream, 200, "OK", "ok\n"),
        "/metrics" => {
            let page = render_metrics(state);
            write_response(&mut stream, 200, "OK", &page)
        }
        _ => write_response(&mut stream, 404, "Not Found", "no such path\n"),
    }
}

fn render_metrics(state: &crate::ServerState) -> String {
    let mut out = String::with_capacity(512);
    let version = env!("CARGO_PKG_VERSION");
    out.push_str("# HELP spg_server_info SPG version build info\n");
    out.push_str("# TYPE spg_server_info gauge\n");
    out.push_str(&format!("spg_server_info{{version=\"{version}\"}} 1\n"));
    out.push_str("# HELP spg_connections_active Current live client connections\n");
    out.push_str("# TYPE spg_connections_active gauge\n");
    out.push_str(&format!(
        "spg_connections_active {}\n",
        state.active_connections.load(Ordering::Relaxed)
    ));
    out.push_str("# HELP spg_queries_total Total queries dispatched\n");
    out.push_str("# TYPE spg_queries_total counter\n");
    out.push_str(&format!(
        "spg_queries_total {}\n",
        state.metrics.queries_total.load(Ordering::Relaxed)
    ));
    out.push_str("# HELP spg_errors_total Total query errors\n");
    out.push_str("# TYPE spg_errors_total counter\n");
    out.push_str(&format!(
        "spg_errors_total {}\n",
        state.metrics.errors_total.load(Ordering::Relaxed)
    ));
    render_table_metrics(state, &mut out);
    render_replication_lag(state, &mut out);
    render_hot_tier(state, &mut out);
    render_cold_tier(state, &mut out);
    render_flusher(state, &mut out);
    render_durability_lag(state, &mut out);
    render_compression(state, &mut out);
    out
}

/// v5.4.3 — append the durability-lag gauges
/// (`spg_durability_lag_bytes`, `spg_durability_lag_seconds`).
///
/// In sync-commit mode both series are reported as 0: every write is
/// fsynced before the client ack, so the lag is bounded by one fsync
/// and not worth a per-write atomic store on the hot path. In
/// async-commit mode the values come from `compute_durability_lag`,
/// which reads `last_durable_wal_offset` + `last_fsync_us` (updated
/// by the flusher thread after every successful `sync_data`).
/// Operators can alert on `lag_bytes` growth to detect a stuck
/// flusher even while `flusher_iterations_total` keeps counting.
fn render_durability_lag(state: &crate::ServerState, out: &mut String) {
    // `None` in sync-commit mode, where the lag is reported as zero.
    let measured = crate::synchronous_commit_disabled().then(|| compute_durability_lag(state));
    let (lag_bytes, lag_seconds) = measured.unwrap_or((0u64, 0.0f64));

    *out += "# HELP spg_durability_lag_bytes WAL bytes written but not yet covered by a durability_checkpoint marker (v5.4.3)\n";
    *out += "# TYPE spg_durability_lag_bytes gauge\n";
    *out += &format!("spg_durability_lag_bytes {lag_bytes}\n");
    *out += "# HELP spg_durability_lag_seconds Seconds since the flusher's most recent successful sync_data (v5.4.3)\n";
    *out += "# TYPE spg_durability_lag_seconds gauge\n";
    *out += &format!("spg_durability_lag_seconds {lag_seconds:.6}\n");
}

/// v6.6.3 — compression ratio series.
fn render_compression(state: &crate::ServerState, out: &mut String) {
    out.push_str(
        "# HELP spg_wal_bytes_uncompressed_total Sum of SQL byte counts seen by the WAL encoder since boot (v6.6.3)\n",
    );
    out.push_str("# TYPE spg_wal_bytes_uncompressed_total counter\n");
    out.push_str(&format!(
        "spg_wal_bytes_uncompressed_total {}\n",
        state.metrics.wal_bytes_uncompressed_in.load(Ordering::Relaxed)
    ));
    out.push_str(
        "# HELP spg_wal_bytes_compressed_total Sum of bytes written to the WAL since boot, after compression (v6.6.3)\n",
    );
    out.push_str("# TYPE spg_wal_bytes_compressed_total counter\n");
    out.push_str(&format!(
        "spg_wal_bytes_compressed_total {}\n",
        state.metrics.wal_bytes_compressed_out.load(Ordering::Relaxed)
    ));
    out.push_str(
        "# HELP spg_segment_bytes_uncompressed_total Sum of cold-tier segment v1 bytes the freezer produced (v6.6.3)\n",
    );
    out.push_str("# TYPE spg_segment_bytes_uncompressed_total counter\n");
    out.push_str(&format!(
        "spg_segment_bytes_uncompressed_total {}\n",
        state.metrics.segment_bytes_uncompressed_in.load(Ordering::Relaxed)
    ));
    out.push_str(
        "# HELP spg_segment_bytes_compressed_total Sum of bytes actually written to disk for cold-tier segments (v6.6.3)\n",
    );
    out.push_str("# TYPE spg_segment_bytes_compressed_total counter\n");
    out.push_str(&format!(
        "spg_segment_bytes_compressed_total {}\n",
        state.metrics.segment_bytes_compressed_out.load(Ordering::Relaxed)
    ));
    out.push_str(
        "# HELP spg_cold_prefetch_hits_total Cold-segment files loaded via the boot-time prefetch worker pool (v6.7.6)\n",
    );
    out.push_str("# TYPE spg_cold_prefetch_hits_total counter\n");
    out.push_str(&format!(
        "spg_cold_prefetch_hits_total {}\n",
        state.metrics.cold_prefetch_hits.load(Ordering::Relaxed)
    ));
}

/// Derive `(lag_bytes, lag_seconds)` from the flusher's bookkeeping.
///
/// - `lag_bytes` is the current WAL file length minus
///   `last_durable_wal_offset`, saturating at 0. The length counts as
///   0 when there is no WAL handle, its mutex is poisoned, or the
///   metadata call fails.
/// - `lag_seconds` is the time elapsed since `last_fsync_us`, in
///   seconds. It is 0.0 while `last_fsync_us` is still 0, and also
///   when the wall clock cannot be read or sits before the recorded
///   fsync time.
fn compute_durability_lag(state: &crate::ServerState) -> (u64, f64) {
    let metrics = &state.metrics;

    let durable_offset = metrics.last_durable_wal_offset.load(Ordering::Relaxed);
    let current_wal_len = match state.wal.as_ref() {
        Some(wal) => match wal.lock() {
            Ok(file) => file.metadata().map_or(0, |md| md.len()),
            Err(_) => 0,
        },
        None => 0,
    };
    let lag_bytes = current_wal_len.saturating_sub(durable_offset);

    let last_us = metrics.last_fsync_us.load(Ordering::Relaxed);
    if last_us == 0 {
        // No flush recorded yet in this process.
        return (lag_bytes, 0.0);
    }
    // An unreadable clock falls back to `last_us`, i.e. zero lag.
    let now_us = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_micros()).unwrap_or(last_us),
        Err(_) => last_us,
    };
    let lag_seconds = now_us.saturating_sub(last_us) as f64 / 1_000_000.0;
    (lag_bytes, lag_seconds)
}

/// v5.4.1 — append the async-commit flusher counters
/// (`spg_flusher_iterations_total`, `spg_flusher_errors_total`).
///
/// Both series are always rendered, including in sync-commit mode
/// where they stay at 0, so a Prometheus dashboard tracking them
/// needs no conditional queries; the zeros are themselves the
/// "sync mode confirmed" signal.
fn render_flusher(state: &crate::ServerState, out: &mut String) {
    let metrics = &state.metrics;

    *out += "# HELP spg_flusher_iterations_total Successful durability_checkpoint emissions by the async-commit flusher (v5.4.1)\n";
    *out += "# TYPE spg_flusher_iterations_total counter\n";
    let iterations = metrics.flusher_iterations.load(Ordering::Relaxed);
    *out += &format!("spg_flusher_iterations_total {iterations}\n");

    *out += "# HELP spg_flusher_errors_total Flusher iterations that failed to append a durability marker (v5.4.1)\n";
    *out += "# TYPE spg_flusher_errors_total counter\n";
    let failures = metrics.flusher_errors.load(Ordering::Relaxed);
    *out += &format!("spg_flusher_errors_total {failures}\n");
}

/// v5.2.2 — append the cold-tier segment count
/// (`spg_cold_segments_total`, a gauge read from
/// `Metrics::cold_segments`).
///
/// Kept apart from `render_table_metrics` because segments are
/// catalog-global (`Catalog::cold_segments`) rather than per-table,
/// so no cardinality concern applies.
fn render_cold_tier(state: &crate::ServerState, out: &mut String) {
    let segments = state.metrics.cold_segments.load(Ordering::Relaxed);
    *out += "# HELP spg_cold_segments_total Cold-tier segments registered on the engine catalog (v5.2.2)\n";
    *out += "# TYPE spg_cold_segments_total gauge\n";
    *out += &format!("spg_cold_segments_total {segments}\n");
}

/// v5.2.1 — append the hot-tier byte gauges.
///
/// - `spg_hot_tier_bytes_used` is the catalog's `hot_tier_bytes()`:
///   the catalog-wide sum of every table's `hot_bytes()` (the encoded
///   byte size of currently in-RAM rows).
/// - `spg_hot_tier_bytes_budget` is the configured cap
///   (`SPG_HOT_TIER_BYTES`, default 4 GiB).
///
/// v5.2.1 ships these as measurement only — v5.2.2 will use the same
/// `used / budget` comparison to wake the freezer thread. If the
/// engine lock is poisoned, neither series is written.
fn render_hot_tier(state: &crate::ServerState, out: &mut String) {
    let Ok(engine) = state.engine.read() else {
        return;
    };
    let used = engine.catalog().hot_tier_bytes();
    // Release the engine lock before formatting.
    drop(engine);

    let budget = state.hot_tier_byte_budget;
    *out += "# HELP spg_hot_tier_bytes_used Encoded byte size of hot-tier rows (v5.2.1)\n";
    *out += "# TYPE spg_hot_tier_bytes_used gauge\n";
    *out += &format!("spg_hot_tier_bytes_used {used}\n");
    *out += "# HELP spg_hot_tier_bytes_budget Hot-tier byte budget configured via SPG_HOT_TIER_BYTES\n";
    *out += "# TYPE spg_hot_tier_bytes_budget gauge\n";
    *out += &format!("spg_hot_tier_bytes_budget {budget}\n");
}

/// v4.36 follower-side replication lag. Appends two gauges:
///
/// - `spg_replication_lag_bytes` — `primary_pos − follower_applied_pos`
///   from the master's most recent status frame, saturating at 0.
/// - `spg_replication_lag_seconds` — `now − master_wall_time_us`,
///   converted to floating seconds, saturating at 0.
///
/// Both series are omitted while no status frame has been recorded
/// (`primary_wall_time_us == 0`, i.e. a primary or a v1-only
/// follower), so Prometheus doesn't reify a misleading 0 lag.
fn render_replication_lag(state: &crate::ServerState, out: &mut String) {
    let lag = &state.lag_state;
    let primary_pos = lag.primary_pos.load(Ordering::Acquire);
    let primary_wall = lag.primary_wall_time_us.load(Ordering::Acquire);
    if primary_wall == 0 {
        // No status frame seen yet — primary or v1 follower. Skip.
        return;
    }
    let applied = lag.follower_applied_pos.load(Ordering::Acquire);
    let lag_bytes = primary_pos.saturating_sub(applied);

    // An unreadable or out-of-range clock counts as 0, which the
    // saturating subtraction below turns into zero lag.
    let now_us = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()
        .and_then(|since_epoch| u64::try_from(since_epoch.as_micros()).ok())
        .unwrap_or(0);
    // Wall-time deltas can go briefly negative under NTP slew, so
    // clamp at 0 rather than wrapping.
    let lag_us = now_us.saturating_sub(primary_wall);
    // The microseconds-to-seconds cast is precision-safe for any lag
    // smaller than ~285 years — the f64 mantissa covers it.
    #[allow(clippy::cast_precision_loss)]
    let lag_seconds = (lag_us as f64) / 1_000_000.0;

    *out += "# HELP spg_replication_lag_bytes WAL bytes follower is behind primary (v4.36 status frame)\n";
    *out += "# TYPE spg_replication_lag_bytes gauge\n";
    *out += &format!("spg_replication_lag_bytes {lag_bytes}\n");
    *out += "# HELP spg_replication_lag_seconds Wall-clock seconds since primary's last status frame\n";
    *out += "# TYPE spg_replication_lag_seconds gauge\n";
    *out += &format!("spg_replication_lag_seconds {lag_seconds}\n");
}

/// v4.35: per-table `spg_table_rows{table="..."}` +
/// `spg_table_bytes{table="..."}` series. Operators bound
/// cardinality via either:
///
/// - `SPG_METRICS_TABLE_ALLOWLIST=t1,t2,...` — exact, comma-separated
///   list (entries are trimmed, empty entries ignored). Only listed
///   tables that exist in the catalog are exported, sorted by name.
/// - `SPG_METRICS_TABLE_TOPN=N` — without an allowlist, only the
///   N largest tables (by row count, ties broken by name) are
///   exported (default 50 — the `DEFAULT_TABLE_METRIC_TOPN`
///   constant; unparsable or zero values use the default).
///
/// Reads the engine catalog under `engine.read()`. Cost is one
/// pass over `Catalog::table_names()` + per-table `row_count()` +
/// schema width. Both environment variables are re-read on every
/// call.
fn render_table_metrics(state: &crate::ServerState, out: &mut String) {
    let engine = match state.engine.read() {
        Ok(guard) => guard,
        // Engine lock poisoned — leave the per-table series off so
        // /metrics still serves the rest of the page.
        Err(_) => return,
    };
    let catalog = engine.catalog();

    let allowlist: Option<HashSet<String>> = match std::env::var("SPG_METRICS_TABLE_ALLOWLIST") {
        Ok(raw) if !raw.is_empty() => Some(
            raw.split(',')
                .map(str::trim)
                .filter(|entry| !entry.is_empty())
                .map(String::from)
                .collect(),
        ),
        _ => None,
    };
    let topn = match std::env::var("SPG_METRICS_TABLE_TOPN") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(n) if n > 0 => n,
            _ => DEFAULT_TABLE_METRIC_TOPN,
        },
        Err(_) => DEFAULT_TABLE_METRIC_TOPN,
    };

    // (table name, row count, approximate bytes) per exported table.
    let mut entries: Vec<(String, u64, u64)> = Vec::new();
    for name in catalog.table_names() {
        let Some(table) = catalog.get(&name) else {
            continue;
        };
        let rows = table.row_count() as u64;
        let bytes = rows.saturating_mul(approx_row_bytes(table.schema()));
        let wanted = allowlist.as_ref().map_or(true, |set| set.contains(&name));
        if wanted {
            entries.push((name, rows, bytes));
        }
    }

    if allowlist.is_some() {
        // Allowlisted tables are emitted in name order so the output
        // is deterministic.
        entries.sort_by(|(left, _, _), (right, _, _)| left.cmp(right));
    } else {
        // Top-N by row count, tiebreak by name for stable output.
        entries.sort_by(|(l_name, l_rows, _), (r_name, r_rows, _)| {
            r_rows.cmp(l_rows).then_with(|| l_name.cmp(r_name))
        });
        entries.truncate(topn);
    }

    *out += "# HELP spg_table_rows Live row count per user table\n";
    *out += "# TYPE spg_table_rows gauge\n";
    for (name, rows, _) in &entries {
        let label = metric_label_escape(name);
        *out += &format!("spg_table_rows{{table=\"{label}\"}} {rows}\n");
    }
    *out += "# HELP spg_table_bytes Approximate on-disk byte size per user table (rows × schema width)\n";
    *out += "# TYPE spg_table_bytes gauge\n";
    for (name, _, bytes) in &entries {
        let label = metric_label_escape(name);
        *out += &format!("spg_table_bytes{{table=\"{label}\"}} {bytes}\n");
    }
}

/// v4.35: average row width estimate from the schema, in bytes. Used
/// for `spg_table_bytes`.
///
/// Returns the sum of a per-column width: fixed-width types use their
/// storage size, `CHAR(n)` uses `n`, `VARCHAR(n)` is assumed half
/// full (but never narrower than one byte), and the remaining
/// variable-width types pick a defensible upper-ish bound. Operators
/// who care about exact disk usage inspect the snapshot file
/// directly.
fn approx_row_bytes(schema: &TableSchema) -> u64 {
    schema
        .columns
        .iter()
        .map(|c| -> u64 {
            match c.ty {
                DataType::SmallInt => 2,
                DataType::Int => 4,
                DataType::BigInt
                | DataType::Date
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Float => 8,
                DataType::Bool => 1,
                DataType::Char(n) => u64::from(n),
                // Average a half-full VARCHAR; the exact value is
                // operator-knowable but not in the catalog. Halve
                // first, then clamp: clamping before the division
                // (the previous form) never changed the result, so
                // VARCHAR(0) and VARCHAR(1) were sized as 0 bytes.
                DataType::Varchar(n) => (u64::from(n) / 2).max(1),
                DataType::Text | DataType::Json | DataType::Jsonb => 64,
                // v7.10.4 — same rough sizing as Text. Exact value
                // is operator-knowable; this is a snapshot heuristic.
                DataType::Bytes => 64,
                // v7.10.9 — TEXT[] sized like a small array of TEXT
                // cells; rough heuristic (~4 elements × 16 chars).
                DataType::TextArray => 64,
                DataType::Numeric { .. } | DataType::Interval => 16,
                // f32 per vector dimension.
                DataType::Vector { dim, .. } => u64::from(dim).saturating_mul(4),
            }
        })
        .sum()
}

/// Escape a label value for the Prometheus text exposition format:
/// backslash, double quote, and line feed are written as `\\`, `\"`,
/// and `\n`. Every other character passes through unchanged.
fn metric_label_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                other => escaped.push(other),
            }
            escaped
        })
}

/// Write a complete `text/plain` HTTP/1.1 response to `stream`.
///
/// The status line is built from `code` and `reason`; `body` is sent
/// verbatim with a matching `Content-Length` (its byte length) and
/// `Connection: close`. Header and body go out in a single
/// `write_all` call.
///
/// # Errors
///
/// Returns the I/O error if the socket write fails.
fn write_response(
    stream: &mut TcpStream,
    code: u16,
    reason: &str,
    body: &str,
) -> std::io::Result<()> {
    let head = format!(
        "HTTP/1.1 {code} {reason}\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n",
        length = body.len()
    );
    let mut payload = Vec::with_capacity(head.len() + body.len());
    payload.extend_from_slice(head.as_bytes());
    payload.extend_from_slice(body.as_bytes());
    stream.write_all(&payload)
}