Skip to main content

modde_sources/wabbajack/
diagnostics.rs

1//! Runtime diagnostics for Wabbajack installs: tracks install phase, active
2//! archive batches and patches, and process/cgroup memory, emitting periodic
3//! JSONL heartbeats and aborting on a memory-saturated stall. See
4//! [`WabbajackDiagnostics`].
5
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use anyhow::{Context, Result, bail};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use tokio::io::AsyncWriteExt;
15use tracing::{info, warn};
16
/// Configuration for [`WabbajackDiagnostics`].
#[derive(Debug, Clone)]
pub struct WabbajackDiagnosticsOptions {
    /// Directory that receives the JSONL output (`heartbeat.jsonl`,
    /// `archive-batches.jsonl`); created by `WabbajackDiagnostics::new`.
    pub dir: PathBuf,
    /// Period of the heartbeat loop started by `spawn_heartbeat`.
    pub interval: Duration,
    /// Idle time (time since the last recorded progress event) at or above
    /// which each heartbeat logs a stall warning.
    pub stall_warn: Duration,
    /// Idle time at or above which a heartbeat sets the abort flag, provided
    /// the cgroup memory snapshot is also judged saturated.
    pub stall_abort: Duration,
}
24
/// Handle to the diagnostics state of one install.
///
/// Cloning is cheap: all clones share the same `DiagnosticsInner` through an
/// [`Arc`].
#[derive(Debug, Clone)]
pub struct WabbajackDiagnostics {
    // Shared options, abort flag and mutable state.
    inner: Arc<DiagnosticsInner>,
}
29
/// State shared by every clone of [`WabbajackDiagnostics`].
#[derive(Debug)]
struct DiagnosticsInner {
    // Configuration supplied to `WabbajackDiagnostics::new`.
    options: WabbajackDiagnosticsOptions,
    // Captured in `new`; heartbeats report `uptime_ms` relative to it.
    start: Instant,
    // Set by the heartbeat stall detector, read by `check_abort`.
    abort: AtomicBool,
    // Phase, progress counters and in-flight work, guarded by a mutex.
    state: Mutex<DiagnosticsState>,
}
37
/// Mutable progress bookkeeping protected by `DiagnosticsInner::state`.
#[derive(Debug)]
struct DiagnosticsState {
    // Free-form phase label; starts as "starting" and is replaced by `set_phase`.
    phase: String,
    // Time of the most recent `record_progress` call (or of construction).
    last_progress: Instant,
    // Total number of `record_progress` calls, regardless of event kind.
    progress_events: u64,
    // Count of `ProgressEvent::ArchiveBatchComplete` events.
    completed_archive_batches: u64,
    // Count of `ProgressEvent::CreateBsaComplete` events.
    completed_create_bsa: u64,
    // Batches registered by `start_archive_batch` whose guard is still alive.
    active_archive_batches: Vec<ActiveArchiveBatchState>,
    // Patches registered by `start_patch` whose guard is still alive.
    active_patches: Vec<ActivePatchState>,
}
48
/// Serialized view of one in-flight archive batch, as embedded in a
/// [`HeartbeatRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveArchiveBatch {
    // Archive hash rendered as 16 zero-padded lowercase hex digits.
    archive_hash: String,
    // Counts and size exactly as passed to `start_archive_batch`.
    directive_count: usize,
    patch_count: usize,
    archive_size_bytes: u64,
    // Milliseconds between the batch start and the moment the snapshot was taken.
    started_millis_ago: u128,
}
57
/// Serialized view of one in-flight patch, as embedded in a
/// [`HeartbeatRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivePatch {
    // Value of the `to` argument of `start_patch` (presumably the patch
    // destination path - confirm against the caller).
    to: String,
    // Identifier passed to `start_patch`; the guard uses it to unregister the patch.
    patch_id: String,
    // Sizes exactly as passed to `start_patch`.
    source_bytes: u64,
    expected_output_bytes: u64,
    // Milliseconds between the patch start and the moment the snapshot was taken.
    started_millis_ago: u128,
}
66
/// In-memory record of an in-flight archive batch; converted to
/// [`ActiveArchiveBatch`] when a heartbeat snapshot is taken.
#[derive(Debug, Clone)]
struct ActiveArchiveBatchState {
    // Archive hash rendered as 16 zero-padded lowercase hex digits.
    archive_hash: String,
    directive_count: usize,
    patch_count: usize,
    archive_size_bytes: u64,
    // Monotonic start time; snapshots turn it into `started_millis_ago`.
    started: Instant,
}
75
/// In-memory record of an in-flight patch; converted to [`ActivePatch`] when
/// a heartbeat snapshot is taken.
#[derive(Debug, Clone)]
struct ActivePatchState {
    // Value of the `to` argument of `start_patch`.
    to: String,
    // Identifier used by `PatchGuard` to unregister this entry.
    patch_id: String,
    source_bytes: u64,
    expected_output_bytes: u64,
    // Monotonic start time; snapshots turn it into `started_millis_ago`.
    started: Instant,
}
84
/// One line of `heartbeat.jsonl`: a point-in-time view of install progress
/// and memory usage.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRecord {
    /// Record discriminator; the heartbeat writer always sets `"heartbeat"`.
    pub kind: String,
    /// Wall-clock time of the snapshot in milliseconds since the Unix epoch.
    pub unix_ms: u128,
    /// Milliseconds since the diagnostics handle was created.
    pub uptime_ms: u128,
    /// Phase label current at snapshot time.
    pub phase: String,
    /// Total progress events recorded so far.
    pub progress_events: u64,
    /// Number of archive-batch completion events recorded so far.
    pub completed_archive_batches: u64,
    /// Number of create-BSA completion events recorded so far.
    pub completed_create_bsa: u64,
    /// Milliseconds since the last progress event.
    pub idle_ms: u128,
    /// Archive batches in flight at snapshot time.
    pub active_archive_batches: Vec<ActiveArchiveBatch>,
    /// Patches in flight at snapshot time; defaults to empty when the field
    /// is absent from the input being deserialized.
    #[serde(default)]
    pub active_patches: Vec<ActivePatch>,
    /// Per-process counters read from `/proc/self`.
    pub process: ProcessSnapshot,
    /// Cgroup v2 memory figures, or `None` when no cgroup v2 path was found.
    pub cgroup: Option<CgroupSnapshot>,
    /// Value returned by the caller-supplied byte-cache callback.
    pub byte_cache_used: u64,
    /// Value of the abort flag when the snapshot was taken.
    pub abort_requested: bool,
}
103
/// Counters for the current process read from `/proc/self/status` and
/// `/proc/self/io`. A field is `None` when its file or line is missing or
/// does not parse as an unsigned integer.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ProcessSnapshot {
    /// First number on the `VmRSS:` line of `/proc/self/status`.
    pub vm_rss_kib: Option<u64>,
    /// First number on the `VmSwap:` line of `/proc/self/status`.
    pub vm_swap_kib: Option<u64>,
    /// First number on the `Threads:` line of `/proc/self/status`.
    pub threads: Option<u64>,
    /// `rchar` value from `/proc/self/io`.
    pub rchar: Option<u64>,
    /// `wchar` value from `/proc/self/io`.
    pub wchar: Option<u64>,
    /// `read_bytes` value from `/proc/self/io`.
    pub read_bytes: Option<u64>,
    /// `write_bytes` value from `/proc/self/io`.
    pub write_bytes: Option<u64>,
}
114
/// Memory figures read from the process's cgroup v2 directory. Every numeric
/// field is `None` when its file or entry is unreadable or unparsable.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CgroupSnapshot {
    /// Directory under `/sys/fs/cgroup` the values were read from.
    pub path: String,
    /// Contents of `memory.current`.
    pub memory_current: Option<u64>,
    /// Contents of `memory.high`; `None` also when the file reads `max`.
    pub memory_high: Option<u64>,
    /// Contents of `memory.max`; `None` also when the file reads `max`.
    pub memory_max: Option<u64>,
    /// Contents of `memory.swap.current`.
    pub memory_swap_current: Option<u64>,
    /// Contents of `memory.swap.max`; `None` also when the file reads `max`.
    pub memory_swap_max: Option<u64>,
    /// `high` counter from `memory.events`.
    pub memory_events_high: Option<u64>,
    /// `oom` counter from `memory.events`.
    pub memory_events_oom: Option<u64>,
    /// `oom_kill` counter from `memory.events`.
    pub memory_events_oom_kill: Option<u64>,
}
127
/// One line of `archive-batches.jsonl`, describing a finished archive batch.
///
/// The record is filled in by the caller and written verbatim by
/// `WabbajackDiagnostics::record_archive_batch`. Fields marked
/// `#[serde(default)]` fall back to zero / `false` / `None` when absent from
/// the input being deserialized (presumably so records written before those
/// fields existed still parse - confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveBatchRecord {
    /// Record discriminator chosen by the caller (the in-file test uses `"archive_batch"`).
    pub kind: String,
    /// Wall-clock timestamp in milliseconds since the Unix epoch.
    pub unix_ms: u128,
    /// Archive hash as a string (the in-file test uses 16 hex digits).
    pub archive_hash: String,
    pub archive_size_bytes: u64,
    pub directive_count: usize,
    pub patch_count: usize,
    // Timing breakdown in milliseconds; NOTE(review): whether the parts sum
    // to `elapsed_ms` is not visible from this file.
    pub elapsed_ms: u128,
    #[serde(default)]
    pub trust_check_ms: u128,
    pub extraction_ms: u128,
    pub patch_ms: u128,
    #[serde(default)]
    pub prune_ms: u128,
    pub extracted_patch_source_bytes: u64,
    // Flags and byte counters describing which path the batch took; their
    // exact semantics are defined by the caller.
    #[serde(default)]
    pub sidecar_hit: bool,
    #[serde(default)]
    pub streamed_hash_bytes: u64,
    #[serde(default)]
    pub memory_archive_hit: bool,
    #[serde(default)]
    pub disk_archive_fallback: bool,
    #[serde(default)]
    pub pruned_bytes: u64,
    // Byte-cache usage sampled before and after the batch.
    pub byte_cache_used_before: u64,
    pub byte_cache_used_after: u64,
    // Outcome counters and the first error message, if any.
    pub success_count: usize,
    pub error_count: usize,
    #[serde(default)]
    pub first_error: Option<String>,
    // Process RSS / swap sampled before and after the batch (field names
    // indicate KiB; see `ProcessSnapshot`).
    pub rss_before_kib: Option<u64>,
    pub rss_after_kib: Option<u64>,
    pub swap_before_kib: Option<u64>,
    pub swap_after_kib: Option<u64>,
}
165
166impl WabbajackDiagnostics {
167    pub async fn new(options: WabbajackDiagnosticsOptions) -> Result<Self> {
168        tokio::fs::create_dir_all(&options.dir)
169            .await
170            .with_context(|| {
171                format!(
172                    "failed to create diagnostics directory: {}",
173                    options.dir.display()
174                )
175            })?;
176        Ok(Self {
177            inner: Arc::new(DiagnosticsInner {
178                options,
179                start: Instant::now(),
180                abort: AtomicBool::new(false),
181                state: Mutex::new(DiagnosticsState {
182                    phase: "starting".to_string(),
183                    last_progress: Instant::now(),
184                    progress_events: 0,
185                    completed_archive_batches: 0,
186                    completed_create_bsa: 0,
187                    active_archive_batches: Vec::new(),
188                    active_patches: Vec::new(),
189                }),
190            }),
191        })
192    }
193
194    pub fn spawn_heartbeat(
195        &self,
196        byte_cache_used: Arc<dyn Fn() -> u64 + Send + Sync>,
197    ) -> tokio::task::JoinHandle<()> {
198        let diagnostics = self.clone();
199        tokio::spawn(async move {
200            let mut interval = tokio::time::interval(diagnostics.inner.options.interval);
201            loop {
202                interval.tick().await;
203                if let Err(e) = diagnostics.write_heartbeat(byte_cache_used()).await {
204                    warn!("failed to write Wabbajack diagnostics heartbeat: {e:#}");
205                }
206            }
207        })
208    }
209
210    pub fn set_phase(&self, phase: impl Into<String>) {
211        self.inner.state.lock().phase = phase.into();
212    }
213
214    pub fn record_progress(&self, event: ProgressEvent) {
215        let mut state = self.inner.state.lock();
216        state.last_progress = Instant::now();
217        state.progress_events += 1;
218        match event {
219            ProgressEvent::ArchiveBatchComplete => state.completed_archive_batches += 1,
220            ProgressEvent::CreateBsaComplete => state.completed_create_bsa += 1,
221            ProgressEvent::Other => {}
222        }
223    }
224
225    pub fn start_archive_batch(
226        &self,
227        archive_hash: u64,
228        directive_count: usize,
229        patch_count: usize,
230        archive_size_bytes: u64,
231    ) -> ArchiveBatchGuard {
232        let started = Instant::now();
233        let active = ActiveArchiveBatchState {
234            archive_hash: format!("{archive_hash:016x}"),
235            directive_count,
236            patch_count,
237            archive_size_bytes,
238            started,
239        };
240        self.inner.state.lock().active_archive_batches.push(active);
241        ArchiveBatchGuard {
242            diagnostics: self.clone(),
243            archive_hash,
244            started,
245        }
246    }
247
248    pub fn start_patch(
249        &self,
250        to: impl Into<String>,
251        patch_id: impl Into<String>,
252        source_bytes: u64,
253        expected_output_bytes: u64,
254    ) -> PatchGuard {
255        let started = Instant::now();
256        let active = ActivePatchState {
257            to: to.into(),
258            patch_id: patch_id.into(),
259            source_bytes,
260            expected_output_bytes,
261            started,
262        };
263        let patch_id = active.patch_id.clone();
264        self.inner.state.lock().active_patches.push(active);
265        PatchGuard {
266            diagnostics: self.clone(),
267            patch_id,
268            started,
269        }
270    }
271
272    pub fn check_abort(&self) -> Result<()> {
273        if self.inner.abort.load(Ordering::Relaxed) {
274            bail!("Wabbajack install aborted by diagnostics stall detector");
275        }
276        Ok(())
277    }
278
279    pub async fn record_archive_batch(&self, record: &ArchiveBatchRecord) -> Result<()> {
280        let mut file = tokio::fs::OpenOptions::new()
281            .create(true)
282            .append(true)
283            .open(self.inner.options.dir.join("archive-batches.jsonl"))
284            .await?;
285        file.write_all(&serde_json::to_vec(record)?).await?;
286        file.write_all(b"\n").await?;
287        Ok(())
288    }
289
290    async fn write_heartbeat(&self, byte_cache_used: u64) -> Result<()> {
291        let snapshot = self.snapshot(byte_cache_used);
292        let idle = Duration::from_millis(snapshot.idle_ms as u64);
293        if idle >= self.inner.options.stall_warn {
294            warn!(
295                phase = %snapshot.phase,
296                idle_seconds = idle.as_secs(),
297                active_batches = snapshot.active_archive_batches.len(),
298                "Wabbajack install has not completed a batch or sentinel recently"
299            );
300        }
301        if idle >= self.inner.options.stall_abort && memory_is_saturated(snapshot.cgroup.as_ref()) {
302            self.inner.abort.store(true, Ordering::Relaxed);
303            warn!(
304                phase = %snapshot.phase,
305                idle_seconds = idle.as_secs(),
306                "Wabbajack diagnostics requested abort: cgroup memory is saturated and apply progress stopped"
307            );
308        }
309
310        let mut file = tokio::fs::OpenOptions::new()
311            .create(true)
312            .append(true)
313            .open(self.inner.options.dir.join("heartbeat.jsonl"))
314            .await?;
315        file.write_all(&serde_json::to_vec(&snapshot)?).await?;
316        file.write_all(b"\n").await?;
317        Ok(())
318    }
319
320    fn snapshot(&self, byte_cache_used: u64) -> HeartbeatRecord {
321        let now = Instant::now();
322        let state = self.inner.state.lock();
323        let active_archive_batches = state
324            .active_archive_batches
325            .iter()
326            .map(|active| ActiveArchiveBatch {
327                archive_hash: active.archive_hash.clone(),
328                directive_count: active.directive_count,
329                patch_count: active.patch_count,
330                archive_size_bytes: active.archive_size_bytes,
331                started_millis_ago: now.duration_since(active.started).as_millis(),
332            })
333            .collect();
334        let active_patches = state
335            .active_patches
336            .iter()
337            .map(|active| ActivePatch {
338                to: active.to.clone(),
339                patch_id: active.patch_id.clone(),
340                source_bytes: active.source_bytes,
341                expected_output_bytes: active.expected_output_bytes,
342                started_millis_ago: now.duration_since(active.started).as_millis(),
343            })
344            .collect();
345        HeartbeatRecord {
346            kind: "heartbeat".to_string(),
347            unix_ms: unix_ms(),
348            uptime_ms: now.duration_since(self.inner.start).as_millis(),
349            phase: state.phase.clone(),
350            progress_events: state.progress_events,
351            completed_archive_batches: state.completed_archive_batches,
352            completed_create_bsa: state.completed_create_bsa,
353            idle_ms: now.duration_since(state.last_progress).as_millis(),
354            active_archive_batches,
355            active_patches,
356            process: process_snapshot(),
357            cgroup: cgroup_snapshot(),
358            byte_cache_used,
359            abort_requested: self.inner.abort.load(Ordering::Relaxed),
360        }
361    }
362}
363
/// Kind of progress reported to `WabbajackDiagnostics::record_progress`.
///
/// Every variant resets the idle clock and bumps the total event counter; the
/// first two additionally bump their own completion counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProgressEvent {
    /// An archive batch finished.
    ArchiveBatchComplete,
    /// A create-BSA step finished.
    CreateBsaComplete,
    /// Any other sign of progress; only the generic counters change.
    Other,
}
369
370pub struct ArchiveBatchGuard {
371    diagnostics: WabbajackDiagnostics,
372    archive_hash: u64,
373    started: Instant,
374}
375
376pub struct PatchGuard {
377    diagnostics: WabbajackDiagnostics,
378    patch_id: String,
379    started: Instant,
380}
381
382impl Drop for ArchiveBatchGuard {
383    fn drop(&mut self) {
384        let mut state = self.diagnostics.inner.state.lock();
385        state
386            .active_archive_batches
387            .retain(|batch| batch.archive_hash != format!("{:016x}", self.archive_hash));
388        info!(
389            archive_hash = %format!("{:016x}", self.archive_hash),
390            elapsed_ms = self.started.elapsed().as_millis(),
391            "archive batch finished"
392        );
393    }
394}
395
396impl Drop for PatchGuard {
397    fn drop(&mut self) {
398        let mut state = self.diagnostics.inner.state.lock();
399        state
400            .active_patches
401            .retain(|patch| patch.patch_id != self.patch_id);
402        info!(
403            patch_id = %self.patch_id,
404            elapsed_ms = self.started.elapsed().as_millis(),
405            "patch finished"
406        );
407    }
408}
409
/// Current wall-clock time in milliseconds since the Unix epoch, or `0` when
/// the system clock reports a time before the epoch.
fn unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
416
417fn process_snapshot() -> ProcessSnapshot {
418    let mut snapshot = ProcessSnapshot::default();
419    if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
420        for line in status.lines() {
421            if let Some(value) = line.strip_prefix("VmRSS:") {
422                snapshot.vm_rss_kib = first_number(value);
423            } else if let Some(value) = line.strip_prefix("VmSwap:") {
424                snapshot.vm_swap_kib = first_number(value);
425            } else if let Some(value) = line.strip_prefix("Threads:") {
426                snapshot.threads = first_number(value);
427            }
428        }
429    }
430    if let Ok(io) = std::fs::read_to_string("/proc/self/io") {
431        for line in io.lines() {
432            let Some((key, value)) = line.split_once(':') else {
433                continue;
434            };
435            let parsed = value.trim().parse::<u64>().ok();
436            match key {
437                "rchar" => snapshot.rchar = parsed,
438                "wchar" => snapshot.wchar = parsed,
439                "read_bytes" => snapshot.read_bytes = parsed,
440                "write_bytes" => snapshot.write_bytes = parsed,
441                _ => {}
442            }
443        }
444    }
445    snapshot
446}
447
/// Public entry point for sampling the current process's memory, thread and
/// I/O counters; delegates to the private `process_snapshot`.
pub fn current_process_snapshot() -> ProcessSnapshot {
    process_snapshot()
}
451
452fn cgroup_snapshot() -> Option<CgroupSnapshot> {
453    let path = current_cgroup_v2_path()?;
454    Some(CgroupSnapshot {
455        path: path.display().to_string(),
456        memory_current: read_cgroup_u64(&path, "memory.current"),
457        memory_high: read_cgroup_limit(&path, "memory.high"),
458        memory_max: read_cgroup_limit(&path, "memory.max"),
459        memory_swap_current: read_cgroup_u64(&path, "memory.swap.current"),
460        memory_swap_max: read_cgroup_limit(&path, "memory.swap.max"),
461        memory_events_high: read_memory_event(&path, "high"),
462        memory_events_oom: read_memory_event(&path, "oom"),
463        memory_events_oom_kill: read_memory_event(&path, "oom_kill"),
464    })
465}
466
/// Resolves the cgroup v2 directory of the current process.
///
/// Scans `/proc/self/cgroup` for the first `hierarchy:controllers:path` line
/// whose controller list is empty (the unified v2 hierarchy) and maps its path
/// below `/sys/fs/cgroup`. Returns `None` when the file cannot be read or no
/// such line exists.
fn current_cgroup_v2_path() -> Option<PathBuf> {
    let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?;
    cgroup.lines().find_map(|line| {
        let mut fields = line.splitn(3, ':');
        // `?` here only skips this line: a malformed entry must not hide a
        // valid v2 entry further down the file.
        let _hierarchy = fields.next()?;
        let controllers = fields.next()?;
        let path = fields.next()?;
        controllers
            .is_empty()
            .then(|| Path::new("/sys/fs/cgroup").join(path.trim_start_matches('/')))
    })
}
480
/// Reads `file` inside the cgroup directory `path` and parses its trimmed
/// contents as a `u64`. Returns `None` if the file is unreadable or not a number.
fn read_cgroup_u64(path: &Path, file: &str) -> Option<u64> {
    let raw = std::fs::read_to_string(path.join(file)).ok()?;
    raw.trim().parse().ok()
}
486
/// Reads a cgroup limit file such as `memory.high`.
///
/// Returns the numeric limit, or `None` when the file reads `max` (no limit),
/// is unreadable, or does not contain a number.
fn read_cgroup_limit(path: &Path, file: &str) -> Option<u64> {
    match std::fs::read_to_string(path.join(file)) {
        Ok(raw) => match raw.trim() {
            "max" => None,
            number => number.parse::<u64>().ok(),
        },
        Err(_) => None,
    }
}
496
/// Looks up the counter named `key` in the `memory.events` file of the cgroup
/// directory `path`.
///
/// The file is expected to hold one `name value` pair per line. Lines without
/// a space are skipped rather than ending the search. Returns `None` when the
/// file is unreadable, the key is absent, or its value is not a number.
fn read_memory_event(path: &Path, key: &str) -> Option<u64> {
    let events = std::fs::read_to_string(path.join("memory.events")).ok()?;
    events
        .lines()
        .filter_map(|line| line.split_once(' '))
        .find(|(event, _)| *event == key)
        .and_then(|(_, value)| value.trim().parse::<u64>().ok())
}
507
/// Parses the first whitespace-separated token of `value` as a `u64`.
/// Returns `None` when there is no token or it is not an unsigned integer.
fn first_number(value: &str) -> Option<u64> {
    value
        .split_whitespace()
        .next()
        .and_then(|token| token.parse().ok())
}
511
512pub fn memory_is_saturated(cgroup: Option<&CgroupSnapshot>) -> bool {
513    let Some(cgroup) = cgroup else {
514        return false;
515    };
516    let high_saturated = match (cgroup.memory_current, cgroup.memory_high) {
517        (Some(current), Some(high)) => current >= high,
518        _ => false,
519    };
520    let swap_saturated = match (cgroup.memory_swap_current, cgroup.memory_swap_max) {
521        (Some(current), Some(max)) if max > 0 => current.saturating_mul(100) >= max * 95,
522        _ => false,
523    };
524    high_saturated && swap_saturated
525}
526
527pub fn cgroup_memory_pressure_high(threshold: f64) -> bool {
528    let Some(snapshot) = cgroup_snapshot() else {
529        return false;
530    };
531    let Some(current) = snapshot.memory_current else {
532        return false;
533    };
534    let limit = snapshot.memory_high.or(snapshot.memory_max);
535    let Some(limit) = limit.filter(|limit| *limit > 0) else {
536        return false;
537    };
538    (current as f64 / limit as f64) >= threshold
539}
540
// Unit tests for the saturation predicate and the two JSONL writers.
#[cfg(test)]
mod tests {
    use super::*;

    // Saturation needs BOTH memory.high reached and swap at >= 95% of its limit.
    #[test]
    fn memory_saturation_requires_high_and_swap_pressure() {
        // Both conditions met: exactly at the high mark and exactly at 95% swap.
        let saturated = CgroupSnapshot {
            memory_current: Some(100),
            memory_high: Some(100),
            memory_swap_current: Some(95),
            memory_swap_max: Some(100),
            ..CgroupSnapshot::default()
        };
        assert!(memory_is_saturated(Some(&saturated)));

        // High mark reached but swap is mostly free.
        let no_swap = CgroupSnapshot {
            memory_current: Some(100),
            memory_high: Some(100),
            memory_swap_current: Some(10),
            memory_swap_max: Some(100),
            ..CgroupSnapshot::default()
        };
        assert!(!memory_is_saturated(Some(&no_swap)));

        // Swap is nearly full but memory is just below the high mark.
        let no_high = CgroupSnapshot {
            memory_current: Some(99),
            memory_high: Some(100),
            memory_swap_current: Some(95),
            memory_swap_max: Some(100),
            ..CgroupSnapshot::default()
        };
        assert!(!memory_is_saturated(Some(&no_high)));
    }

    // Smoke test only: the result depends on the host's cgroup setup, so this
    // just checks the call completes without panicking.
    #[test]
    fn cgroup_pressure_without_cgroup_is_false_or_bounded() {
        let _ = cgroup_memory_pressure_high(0.80);
    }

    // A heartbeat written directly (no background task) lands in
    // heartbeat.jsonl as one JSON object carrying the phase and cache usage.
    #[tokio::test]
    async fn heartbeat_writes_json_line() {
        let temp = tempfile::tempdir().unwrap();
        let diagnostics = WabbajackDiagnostics::new(WabbajackDiagnosticsOptions {
            dir: temp.path().to_path_buf(),
            interval: Duration::from_secs(1),
            stall_warn: Duration::from_mins(1),
            stall_abort: Duration::from_mins(2),
        })
        .await
        .unwrap();

        diagnostics.set_phase("apply-archive-batches");
        diagnostics.record_progress(ProgressEvent::Other);
        diagnostics.write_heartbeat(1234).await.unwrap();

        let heartbeat = std::fs::read_to_string(temp.path().join("heartbeat.jsonl")).unwrap();
        let record: serde_json::Value = serde_json::from_str(heartbeat.trim()).unwrap();
        assert_eq!(record["kind"], "heartbeat");
        assert_eq!(record["phase"], "apply-archive-batches");
        assert_eq!(record["byte_cache_used"], 1234);
    }

    // An archive-batch record round-trips through archive-batches.jsonl.
    #[tokio::test]
    async fn archive_batch_record_writes_json_line() {
        let temp = tempfile::tempdir().unwrap();
        let diagnostics = WabbajackDiagnostics::new(WabbajackDiagnosticsOptions {
            dir: temp.path().to_path_buf(),
            interval: Duration::from_secs(1),
            stall_warn: Duration::from_mins(1),
            stall_abort: Duration::from_mins(2),
        })
        .await
        .unwrap();
        // Distinct small values per field so a mixed-up field would be visible.
        diagnostics
            .record_archive_batch(&ArchiveBatchRecord {
                kind: "archive_batch".to_string(),
                unix_ms: 1,
                archive_hash: "0000000000000001".to_string(),
                archive_size_bytes: 2,
                directive_count: 3,
                patch_count: 1,
                elapsed_ms: 4,
                trust_check_ms: 0,
                extraction_ms: 5,
                patch_ms: 6,
                prune_ms: 0,
                extracted_patch_source_bytes: 7,
                sidecar_hit: false,
                streamed_hash_bytes: 0,
                memory_archive_hit: false,
                disk_archive_fallback: false,
                pruned_bytes: 0,
                byte_cache_used_before: 8,
                byte_cache_used_after: 9,
                success_count: 10,
                error_count: 0,
                first_error: None,
                rss_before_kib: Some(11),
                rss_after_kib: Some(12),
                swap_before_kib: Some(13),
                swap_after_kib: Some(14),
            })
            .await
            .unwrap();
        let records = std::fs::read_to_string(temp.path().join("archive-batches.jsonl")).unwrap();
        let record: ArchiveBatchRecord = serde_json::from_str(records.trim()).unwrap();
        assert_eq!(record.archive_hash, "0000000000000001");
        assert_eq!(record.extracted_patch_source_bytes, 7);
    }
}