Skip to main content

codetether_agent/telemetry/
rss_watchdog.rs

//! RSS watchdog: samples process memory periodically, logs once when RSS
//! crosses a warning threshold, and writes a synthetic crash report when RSS
//! crosses a critical level so the existing crash-report flusher ships it
//! through the same pipeline on next startup. Thresholds and the sampling
//! interval can be tuned through environment variables.
5
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::Duration;
9
10use chrono::Utc;
11use serde_json::json;
12
13use super::memory::MemorySnapshot;
14
/// Environment variable overriding the warning threshold, in MiB.
pub const ENV_RSS_WARN_MIB: &str = "CODETETHER_RSS_WARN_MIB";
/// Environment variable overriding the critical (pre-OOM report) threshold, in MiB.
pub const ENV_RSS_CRITICAL_MIB: &str = "CODETETHER_RSS_CRITICAL_MIB";
/// Environment variable overriding the sampling interval, in seconds.
pub const ENV_RSS_SAMPLE_SECS: &str = "CODETETHER_RSS_SAMPLE_SECS";

/// Fallback warning threshold (2 GiB, expressed in MiB): a single `warn!`
/// is logged the first time RSS reaches it.
const DEFAULT_WARN_MIB: u64 = 2 * 1024;
/// Fallback critical threshold (8 GiB, expressed in MiB): a synthetic
/// crash report is written the first time RSS reaches it.
const DEFAULT_CRITICAL_MIB: u64 = 8 * 1024;
/// Fallback delay between memory samples, in seconds.
const DEFAULT_SAMPLE_SECS: u64 = 5;
26
27/// Spawn the watchdog in the background. Idempotent — calling it more
28/// than once in the same process is a no-op.
29///
30/// `spool_dir` is the crash-report spool directory. The synthetic report
31/// is written there using the same JSON shape as panic-derived reports so
32/// the existing flusher picks it up on next startup.
33pub fn spawn(spool_dir: PathBuf) {
34    static STARTED: AtomicBool = AtomicBool::new(false);
35    if STARTED.swap(true, Ordering::SeqCst) {
36        return;
37    }
38    let warn_mib = read_env_u64(ENV_RSS_WARN_MIB).unwrap_or(DEFAULT_WARN_MIB);
39    let critical_mib = read_env_u64(ENV_RSS_CRITICAL_MIB).unwrap_or(DEFAULT_CRITICAL_MIB);
40    let sample_secs = read_env_u64(ENV_RSS_SAMPLE_SECS).unwrap_or(DEFAULT_SAMPLE_SECS);
41
42    tokio::spawn(async move {
43        run_loop(spool_dir, warn_mib, critical_mib, sample_secs).await;
44    });
45}
46
47async fn run_loop(spool_dir: PathBuf, warn_mib: u64, critical_mib: u64, sample_secs: u64) {
48    let mut warned = false;
49    let mut critical_written = false;
50    let interval = Duration::from_secs(sample_secs.max(1));
51    loop {
52        tokio::time::sleep(interval).await;
53        let snap = MemorySnapshot::capture();
54        let Some(rss_mib) = snap.rss_mib() else {
55            continue;
56        };
57
58        if !warned && rss_mib >= warn_mib {
59            warned = true;
60            tracing::warn!(
61                rss_mib,
62                threads = snap.threads.unwrap_or(0),
63                "RSS crossed warning threshold"
64            );
65        }
66
67        if !critical_written && rss_mib >= critical_mib {
68            critical_written = true;
69            tracing::error!(
70                rss_mib,
71                threads = snap.threads.unwrap_or(0),
72                "RSS crossed critical threshold; writing pre-OOM crash report"
73            );
74            let message = format!(
75                "pre_oom: RSS {} MiB crossed critical threshold {} MiB (threads={})",
76                rss_mib,
77                critical_mib,
78                snap.threads.unwrap_or(0)
79            );
80            if let Err(err) = write_pre_oom_report(&spool_dir, &message, &snap) {
81                tracing::warn!(error = %err, "Failed to write pre-OOM crash report");
82            }
83        }
84    }
85}
86
87/// Write a crash-report-shaped JSON document into `spool_dir` so the
88/// existing crash-report flusher ships it on next startup. The shape
89/// matches `crash::CrashReport` so no custom server handling is needed.
90fn write_pre_oom_report(
91    spool_dir: &Path,
92    message: &str,
93    memory: &MemorySnapshot,
94) -> std::io::Result<()> {
95    std::fs::create_dir_all(spool_dir)?;
96    let now = Utc::now();
97    let report_id = uuid::Uuid::new_v4().to_string();
98    let thread_name = std::thread::current()
99        .name()
100        .map(str::to_string)
101        .unwrap_or_else(|| "unnamed".to_string());
102    let command_line = std::env::args().collect::<Vec<_>>().join(" ");
103    let report = json!({
104        "report_version": 1,
105        "report_id": report_id,
106        "occurred_at": now.to_rfc3339(),
107        "app_version": env!("CARGO_PKG_VERSION"),
108        "command_line": command_line,
109        "os": std::env::consts::OS,
110        "arch": std::env::consts::ARCH,
111        "process_id": std::process::id(),
112        "thread_name": thread_name,
113        "panic_message": message,
114        "panic_location": null,
115        "backtrace": "",
116        "memory": memory,
117    });
118    let file_name = format!(
119        "{}-{}.json",
120        now.format("%Y%m%dT%H%M%S%.3fZ"),
121        report_id
122    );
123    let path = spool_dir.join(file_name);
124    let bytes = serde_json::to_vec_pretty(&report)
125        .map_err(|e| std::io::Error::other(e.to_string()))?;
126    std::fs::write(path, bytes)
127}
128
/// Read the environment variable `var` and parse it as a `u64`.
///
/// Surrounding whitespace is ignored. A value such as `" 4096"`, or
/// `"4096\r"` picked up from a CRLF-terminated `.env` file, still parses
/// instead of silently falling back to a default. Returns `None` when the
/// variable is unset, is not valid Unicode, or does not parse as a `u64`
/// (for example negative numbers or non-numeric text).
fn read_env_u64(var: &str) -> Option<u64> {
    std::env::var(var).ok()?.trim().parse().ok()
}