1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use anyhow::{Context, Result, bail};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use tokio::io::AsyncWriteExt;
15use tracing::{info, warn};
16
/// Configuration for [`WabbajackDiagnostics`].
#[derive(Debug, Clone)]
pub struct WabbajackDiagnosticsOptions {
    /// Directory that receives `heartbeat.jsonl` and `archive-batches.jsonl`;
    /// created by `WabbajackDiagnostics::new` if it does not exist.
    pub dir: PathBuf,
    /// Period between heartbeats written by `spawn_heartbeat`.
    pub interval: Duration,
    /// Idle time (since the last `record_progress` call) at which each
    /// heartbeat logs a stall warning.
    pub stall_warn: Duration,
    /// Idle time at which, if cgroup memory is also saturated, a heartbeat sets
    /// the abort flag so that `check_abort` starts returning an error.
    pub stall_abort: Duration,
}
24
/// Cloneable handle to shared install diagnostics state.
///
/// Clones share the same state through an `Arc`, so progress recorded through
/// any clone is visible to the heartbeat task spawned from another.
#[derive(Debug, Clone)]
pub struct WabbajackDiagnostics {
    inner: Arc<DiagnosticsInner>,
}
29
/// State shared by every clone of a [`WabbajackDiagnostics`] handle.
#[derive(Debug)]
struct DiagnosticsInner {
    /// Options the handle was created with.
    options: WabbajackDiagnosticsOptions,
    /// Creation time; heartbeat `uptime_ms` is measured from here.
    start: Instant,
    /// Set by the heartbeat's stall detector and read by `check_abort`.
    abort: AtomicBool,
    /// Mutable progress bookkeeping, guarded by a `parking_lot` mutex.
    state: Mutex<DiagnosticsState>,
}
37
/// Mutable progress bookkeeping behind [`DiagnosticsInner::state`].
#[derive(Debug)]
struct DiagnosticsState {
    /// Label set via `set_phase`; starts as `"starting"`.
    phase: String,
    /// Time of the last `record_progress` call (initially creation time);
    /// heartbeat idle time is measured from here.
    last_progress: Instant,
    /// Total number of `record_progress` calls.
    progress_events: u64,
    /// Number of `ProgressEvent::ArchiveBatchComplete` events recorded.
    completed_archive_batches: u64,
    /// Number of `ProgressEvent::CreateBsaComplete` events recorded.
    completed_create_bsa: u64,
    /// Entries pushed by `start_archive_batch`, removed when the guard drops.
    active_archive_batches: Vec<ActiveArchiveBatchState>,
    /// Entries pushed by `start_patch`, removed when the guard drops.
    active_patches: Vec<ActivePatchState>,
}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ActiveArchiveBatch {
51 archive_hash: String,
52 directive_count: usize,
53 patch_count: usize,
54 archive_size_bytes: u64,
55 started_millis_ago: u128,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ActivePatch {
60 to: String,
61 patch_id: String,
62 source_bytes: u64,
63 expected_output_bytes: u64,
64 started_millis_ago: u128,
65}
66
/// Internal record of an in-flight archive batch; converted to
/// [`ActiveArchiveBatch`] when a heartbeat snapshot is taken.
#[derive(Debug, Clone)]
struct ActiveArchiveBatchState {
    /// Hash formatted as `{:016x}` by `start_archive_batch`.
    archive_hash: String,
    directive_count: usize,
    patch_count: usize,
    archive_size_bytes: u64,
    /// When `start_archive_batch` was called; also stored in the guard.
    started: Instant,
}
75
/// Internal record of an in-flight patch; converted to [`ActivePatch`] when a
/// heartbeat snapshot is taken.
#[derive(Debug, Clone)]
struct ActivePatchState {
    to: String,
    /// Identifier also stored in the matching `PatchGuard`.
    patch_id: String,
    source_bytes: u64,
    expected_output_bytes: u64,
    /// When `start_patch` was called; also stored in the guard.
    started: Instant,
}
84
/// One line of `heartbeat.jsonl`, produced by `WabbajackDiagnostics::snapshot`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRecord {
    /// Record discriminator; always `"heartbeat"` for records built by this module.
    pub kind: String,
    /// Wall-clock time of the snapshot, in milliseconds since the Unix epoch.
    pub unix_ms: u128,
    /// Milliseconds since the diagnostics handle was created.
    pub uptime_ms: u128,
    /// Phase label last set via `set_phase`.
    pub phase: String,
    /// Total `record_progress` calls so far.
    pub progress_events: u64,
    /// `ArchiveBatchComplete` progress events recorded so far.
    pub completed_archive_batches: u64,
    /// `CreateBsaComplete` progress events recorded so far.
    pub completed_create_bsa: u64,
    /// Milliseconds since the last `record_progress` call (or since creation);
    /// this is the idle time the stall detector compares against.
    pub idle_ms: u128,
    /// Archive batches whose guards were still alive at snapshot time.
    pub active_archive_batches: Vec<ActiveArchiveBatch>,
    /// Patches whose guards were still alive at snapshot time; deserializes
    /// to an empty list when absent from a line.
    #[serde(default)]
    pub active_patches: Vec<ActivePatch>,
    /// procfs sample of this process.
    pub process: ProcessSnapshot,
    /// cgroup v2 memory sample, or `None` if no cgroup v2 entry was found.
    pub cgroup: Option<CgroupSnapshot>,
    /// Value passed to `write_heartbeat` (in production, the result of the
    /// callback given to `spawn_heartbeat`).
    pub byte_cache_used: u64,
    /// Whether the stall detector had requested an abort.
    pub abort_requested: bool,
}
103
/// Per-process counters sampled from procfs by `process_snapshot`.
///
/// Each field is `None` when its source file or line could not be read or
/// parsed (for example on platforms without `/proc`).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ProcessSnapshot {
    /// `VmRSS` from `/proc/self/status` (reported by the kernel in kB).
    pub vm_rss_kib: Option<u64>,
    /// `VmSwap` from `/proc/self/status` (reported by the kernel in kB).
    pub vm_swap_kib: Option<u64>,
    /// `Threads` from `/proc/self/status`.
    pub threads: Option<u64>,
    /// `rchar` from `/proc/self/io`.
    pub rchar: Option<u64>,
    /// `wchar` from `/proc/self/io`.
    pub wchar: Option<u64>,
    /// `read_bytes` from `/proc/self/io`.
    pub read_bytes: Option<u64>,
    /// `write_bytes` from `/proc/self/io`.
    pub write_bytes: Option<u64>,
}
114
/// Memory accounting for this process's cgroup v2 directory, sampled by
/// `cgroup_snapshot`.
///
/// Numeric fields are `None` when the file is missing or unparsable; limit
/// fields are also `None` when the file contains `max` (no limit).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CgroupSnapshot {
    /// Directory under `/sys/fs/cgroup` the values were read from.
    pub path: String,
    /// Contents of `memory.current`.
    pub memory_current: Option<u64>,
    /// Contents of `memory.high`.
    pub memory_high: Option<u64>,
    /// Contents of `memory.max`.
    pub memory_max: Option<u64>,
    /// Contents of `memory.swap.current`.
    pub memory_swap_current: Option<u64>,
    /// Contents of `memory.swap.max`.
    pub memory_swap_max: Option<u64>,
    /// `high` counter from `memory.events`.
    pub memory_events_high: Option<u64>,
    /// `oom` counter from `memory.events`.
    pub memory_events_oom: Option<u64>,
    /// `oom_kill` counter from `memory.events`.
    pub memory_events_oom_kill: Option<u64>,
}
127
/// One line of `archive-batches.jsonl`, written by
/// `WabbajackDiagnostics::record_archive_batch`.
///
/// Every field is filled in by the caller; the meanings below follow the field
/// names and should be confirmed at the call site. Fields marked
/// `#[serde(default)]` deserialize to their default value when absent from a line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveBatchRecord {
    /// Record discriminator (the tests use `"archive_batch"`).
    pub kind: String,
    /// Wall-clock timestamp in milliseconds since the Unix epoch (presumably).
    pub unix_ms: u128,
    /// Archive hash as a hex string.
    pub archive_hash: String,
    pub archive_size_bytes: u64,
    pub directive_count: usize,
    pub patch_count: usize,
    // Timing breakdown in milliseconds — presumably per-stage durations of the
    // batch; confirm against the caller.
    pub elapsed_ms: u128,
    #[serde(default)]
    pub trust_check_ms: u128,
    pub extraction_ms: u128,
    pub patch_ms: u128,
    #[serde(default)]
    pub prune_ms: u128,
    pub extracted_patch_source_bytes: u64,
    // Cache / fallback indicators — semantics defined by the caller.
    #[serde(default)]
    pub sidecar_hit: bool,
    #[serde(default)]
    pub streamed_hash_bytes: u64,
    #[serde(default)]
    pub memory_archive_hit: bool,
    #[serde(default)]
    pub disk_archive_fallback: bool,
    #[serde(default)]
    pub pruned_bytes: u64,
    pub byte_cache_used_before: u64,
    pub byte_cache_used_after: u64,
    /// Outcome counts for the batch's directives (presumably).
    pub success_count: usize,
    pub error_count: usize,
    /// First error message encountered, if any.
    #[serde(default)]
    pub first_error: Option<String>,
    // Process memory before/after the batch, likely taken from
    // `ProcessSnapshot::vm_rss_kib` / `vm_swap_kib` — confirm at the call site.
    pub rss_before_kib: Option<u64>,
    pub rss_after_kib: Option<u64>,
    pub swap_before_kib: Option<u64>,
    pub swap_after_kib: Option<u64>,
}
165
166impl WabbajackDiagnostics {
167 pub async fn new(options: WabbajackDiagnosticsOptions) -> Result<Self> {
168 tokio::fs::create_dir_all(&options.dir)
169 .await
170 .with_context(|| {
171 format!(
172 "failed to create diagnostics directory: {}",
173 options.dir.display()
174 )
175 })?;
176 Ok(Self {
177 inner: Arc::new(DiagnosticsInner {
178 options,
179 start: Instant::now(),
180 abort: AtomicBool::new(false),
181 state: Mutex::new(DiagnosticsState {
182 phase: "starting".to_string(),
183 last_progress: Instant::now(),
184 progress_events: 0,
185 completed_archive_batches: 0,
186 completed_create_bsa: 0,
187 active_archive_batches: Vec::new(),
188 active_patches: Vec::new(),
189 }),
190 }),
191 })
192 }
193
194 pub fn spawn_heartbeat(
195 &self,
196 byte_cache_used: Arc<dyn Fn() -> u64 + Send + Sync>,
197 ) -> tokio::task::JoinHandle<()> {
198 let diagnostics = self.clone();
199 tokio::spawn(async move {
200 let mut interval = tokio::time::interval(diagnostics.inner.options.interval);
201 loop {
202 interval.tick().await;
203 if let Err(e) = diagnostics.write_heartbeat(byte_cache_used()).await {
204 warn!("failed to write Wabbajack diagnostics heartbeat: {e:#}");
205 }
206 }
207 })
208 }
209
210 pub fn set_phase(&self, phase: impl Into<String>) {
211 self.inner.state.lock().phase = phase.into();
212 }
213
214 pub fn record_progress(&self, event: ProgressEvent) {
215 let mut state = self.inner.state.lock();
216 state.last_progress = Instant::now();
217 state.progress_events += 1;
218 match event {
219 ProgressEvent::ArchiveBatchComplete => state.completed_archive_batches += 1,
220 ProgressEvent::CreateBsaComplete => state.completed_create_bsa += 1,
221 ProgressEvent::Other => {}
222 }
223 }
224
225 pub fn start_archive_batch(
226 &self,
227 archive_hash: u64,
228 directive_count: usize,
229 patch_count: usize,
230 archive_size_bytes: u64,
231 ) -> ArchiveBatchGuard {
232 let started = Instant::now();
233 let active = ActiveArchiveBatchState {
234 archive_hash: format!("{archive_hash:016x}"),
235 directive_count,
236 patch_count,
237 archive_size_bytes,
238 started,
239 };
240 self.inner.state.lock().active_archive_batches.push(active);
241 ArchiveBatchGuard {
242 diagnostics: self.clone(),
243 archive_hash,
244 started,
245 }
246 }
247
248 pub fn start_patch(
249 &self,
250 to: impl Into<String>,
251 patch_id: impl Into<String>,
252 source_bytes: u64,
253 expected_output_bytes: u64,
254 ) -> PatchGuard {
255 let started = Instant::now();
256 let active = ActivePatchState {
257 to: to.into(),
258 patch_id: patch_id.into(),
259 source_bytes,
260 expected_output_bytes,
261 started,
262 };
263 let patch_id = active.patch_id.clone();
264 self.inner.state.lock().active_patches.push(active);
265 PatchGuard {
266 diagnostics: self.clone(),
267 patch_id,
268 started,
269 }
270 }
271
272 pub fn check_abort(&self) -> Result<()> {
273 if self.inner.abort.load(Ordering::Relaxed) {
274 bail!("Wabbajack install aborted by diagnostics stall detector");
275 }
276 Ok(())
277 }
278
279 pub async fn record_archive_batch(&self, record: &ArchiveBatchRecord) -> Result<()> {
280 let mut file = tokio::fs::OpenOptions::new()
281 .create(true)
282 .append(true)
283 .open(self.inner.options.dir.join("archive-batches.jsonl"))
284 .await?;
285 file.write_all(&serde_json::to_vec(record)?).await?;
286 file.write_all(b"\n").await?;
287 Ok(())
288 }
289
290 async fn write_heartbeat(&self, byte_cache_used: u64) -> Result<()> {
291 let snapshot = self.snapshot(byte_cache_used);
292 let idle = Duration::from_millis(snapshot.idle_ms as u64);
293 if idle >= self.inner.options.stall_warn {
294 warn!(
295 phase = %snapshot.phase,
296 idle_seconds = idle.as_secs(),
297 active_batches = snapshot.active_archive_batches.len(),
298 "Wabbajack install has not completed a batch or sentinel recently"
299 );
300 }
301 if idle >= self.inner.options.stall_abort && memory_is_saturated(snapshot.cgroup.as_ref()) {
302 self.inner.abort.store(true, Ordering::Relaxed);
303 warn!(
304 phase = %snapshot.phase,
305 idle_seconds = idle.as_secs(),
306 "Wabbajack diagnostics requested abort: cgroup memory is saturated and apply progress stopped"
307 );
308 }
309
310 let mut file = tokio::fs::OpenOptions::new()
311 .create(true)
312 .append(true)
313 .open(self.inner.options.dir.join("heartbeat.jsonl"))
314 .await?;
315 file.write_all(&serde_json::to_vec(&snapshot)?).await?;
316 file.write_all(b"\n").await?;
317 Ok(())
318 }
319
320 fn snapshot(&self, byte_cache_used: u64) -> HeartbeatRecord {
321 let now = Instant::now();
322 let state = self.inner.state.lock();
323 let active_archive_batches = state
324 .active_archive_batches
325 .iter()
326 .map(|active| ActiveArchiveBatch {
327 archive_hash: active.archive_hash.clone(),
328 directive_count: active.directive_count,
329 patch_count: active.patch_count,
330 archive_size_bytes: active.archive_size_bytes,
331 started_millis_ago: now.duration_since(active.started).as_millis(),
332 })
333 .collect();
334 let active_patches = state
335 .active_patches
336 .iter()
337 .map(|active| ActivePatch {
338 to: active.to.clone(),
339 patch_id: active.patch_id.clone(),
340 source_bytes: active.source_bytes,
341 expected_output_bytes: active.expected_output_bytes,
342 started_millis_ago: now.duration_since(active.started).as_millis(),
343 })
344 .collect();
345 HeartbeatRecord {
346 kind: "heartbeat".to_string(),
347 unix_ms: unix_ms(),
348 uptime_ms: now.duration_since(self.inner.start).as_millis(),
349 phase: state.phase.clone(),
350 progress_events: state.progress_events,
351 completed_archive_batches: state.completed_archive_batches,
352 completed_create_bsa: state.completed_create_bsa,
353 idle_ms: now.duration_since(state.last_progress).as_millis(),
354 active_archive_batches,
355 active_patches,
356 process: process_snapshot(),
357 cgroup: cgroup_snapshot(),
358 byte_cache_used,
359 abort_requested: self.inner.abort.load(Ordering::Relaxed),
360 }
361 }
362}
363
/// Kind of progress reported to `WabbajackDiagnostics::record_progress`.
///
/// Every variant resets the stall detector's idle timer; the two completion
/// variants also bump their dedicated counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProgressEvent {
    /// An archive batch finished.
    ArchiveBatchComplete,
    /// A create-BSA step finished.
    CreateBsaComplete,
    /// Any other forward progress.
    Other,
}
369
370pub struct ArchiveBatchGuard {
371 diagnostics: WabbajackDiagnostics,
372 archive_hash: u64,
373 started: Instant,
374}
375
376pub struct PatchGuard {
377 diagnostics: WabbajackDiagnostics,
378 patch_id: String,
379 started: Instant,
380}
381
382impl Drop for ArchiveBatchGuard {
383 fn drop(&mut self) {
384 let mut state = self.diagnostics.inner.state.lock();
385 state
386 .active_archive_batches
387 .retain(|batch| batch.archive_hash != format!("{:016x}", self.archive_hash));
388 info!(
389 archive_hash = %format!("{:016x}", self.archive_hash),
390 elapsed_ms = self.started.elapsed().as_millis(),
391 "archive batch finished"
392 );
393 }
394}
395
396impl Drop for PatchGuard {
397 fn drop(&mut self) {
398 let mut state = self.diagnostics.inner.state.lock();
399 state
400 .active_patches
401 .retain(|patch| patch.patch_id != self.patch_id);
402 info!(
403 patch_id = %self.patch_id,
404 elapsed_ms = self.started.elapsed().as_millis(),
405 "patch finished"
406 );
407 }
408}
409
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch.
fn unix_ms() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map(|elapsed| elapsed.as_millis()).unwrap_or(0)
}
416
417fn process_snapshot() -> ProcessSnapshot {
418 let mut snapshot = ProcessSnapshot::default();
419 if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
420 for line in status.lines() {
421 if let Some(value) = line.strip_prefix("VmRSS:") {
422 snapshot.vm_rss_kib = first_number(value);
423 } else if let Some(value) = line.strip_prefix("VmSwap:") {
424 snapshot.vm_swap_kib = first_number(value);
425 } else if let Some(value) = line.strip_prefix("Threads:") {
426 snapshot.threads = first_number(value);
427 }
428 }
429 }
430 if let Ok(io) = std::fs::read_to_string("/proc/self/io") {
431 for line in io.lines() {
432 let Some((key, value)) = line.split_once(':') else {
433 continue;
434 };
435 let parsed = value.trim().parse::<u64>().ok();
436 match key {
437 "rchar" => snapshot.rchar = parsed,
438 "wchar" => snapshot.wchar = parsed,
439 "read_bytes" => snapshot.read_bytes = parsed,
440 "write_bytes" => snapshot.write_bytes = parsed,
441 _ => {}
442 }
443 }
444 }
445 snapshot
446}
447
448pub fn current_process_snapshot() -> ProcessSnapshot {
449 process_snapshot()
450}
451
452fn cgroup_snapshot() -> Option<CgroupSnapshot> {
453 let path = current_cgroup_v2_path()?;
454 Some(CgroupSnapshot {
455 path: path.display().to_string(),
456 memory_current: read_cgroup_u64(&path, "memory.current"),
457 memory_high: read_cgroup_limit(&path, "memory.high"),
458 memory_max: read_cgroup_limit(&path, "memory.max"),
459 memory_swap_current: read_cgroup_u64(&path, "memory.swap.current"),
460 memory_swap_max: read_cgroup_limit(&path, "memory.swap.max"),
461 memory_events_high: read_memory_event(&path, "high"),
462 memory_events_oom: read_memory_event(&path, "oom"),
463 memory_events_oom_kill: read_memory_event(&path, "oom_kill"),
464 })
465}
466
/// Locates this process's cgroup v2 directory under `/sys/fs/cgroup`.
///
/// Scans `/proc/self/cgroup` for the unified-hierarchy entry, which has the
/// form `0::$PATH` (hierarchy id `0`, empty controller list). Malformed lines
/// are skipped rather than ending the scan. Returns `None` if the file cannot
/// be read or has no such entry.
///
/// NOTE(review): on hybrid v1/v2 hosts the unified hierarchy is typically
/// mounted at `/sys/fs/cgroup/unified`, so the returned directory may not
/// exist there; the readers then simply yield `None`.
fn current_cgroup_v2_path() -> Option<PathBuf> {
    let contents = std::fs::read_to_string("/proc/self/cgroup").ok()?;
    contents.lines().find_map(|line| {
        let mut fields = line.splitn(3, ':');
        // `?` here skips just this line (the closure returns `None`).
        let hierarchy = fields.next()?;
        let controllers = fields.next()?;
        let path = fields.next()?;
        (hierarchy == "0" && controllers.is_empty())
            .then(|| Path::new("/sys/fs/cgroup").join(path.trim_start_matches('/')))
    })
}
480
/// Reads `path/file` and parses its trimmed contents as a `u64`.
///
/// Returns `None` if the file is unreadable or does not hold a plain integer.
fn read_cgroup_u64(path: &Path, file: &str) -> Option<u64> {
    let raw = std::fs::read_to_string(path.join(file)).ok()?;
    raw.trim().parse().ok()
}
486
/// Reads a cgroup limit file such as `memory.max`.
///
/// The literal `max` (no limit), an unreadable file and unparsable contents
/// all yield `None`; otherwise the limit is returned as a `u64`.
fn read_cgroup_limit(path: &Path, file: &str) -> Option<u64> {
    let raw = std::fs::read_to_string(path.join(file)).ok()?;
    match raw.trim() {
        "max" => None,
        limit => limit.parse().ok(),
    }
}
496
/// Returns the counter named `key` from `path/memory.events`.
///
/// Each line is `<name> <count>`; fields may be separated by any whitespace.
/// Lines that do not have two fields are skipped instead of aborting the scan.
/// Returns `None` if the file is unreadable, the key is absent, or its value
/// does not parse as `u64`.
fn read_memory_event(path: &Path, key: &str) -> Option<u64> {
    let events = std::fs::read_to_string(path.join("memory.events")).ok()?;
    events
        .lines()
        .find_map(|line| {
            let mut fields = line.split_whitespace();
            // `?` skips a malformed line (the closure returns `None`).
            let event = fields.next()?;
            let value = fields.next()?;
            // Stop at the first matching key, even if its value is unparsable.
            (event == key).then(|| value.parse::<u64>().ok())
        })
        .flatten()
}
507
/// Parses the first whitespace-separated token of `value` as a `u64`
/// (e.g. `"\t  1234 kB"` → `Some(1234)`); `None` if absent or not numeric.
fn first_number(value: &str) -> Option<u64> {
    value
        .split_whitespace()
        .next()
        .and_then(|token| token.parse::<u64>().ok())
}
511
512pub fn memory_is_saturated(cgroup: Option<&CgroupSnapshot>) -> bool {
513 let Some(cgroup) = cgroup else {
514 return false;
515 };
516 let high_saturated = match (cgroup.memory_current, cgroup.memory_high) {
517 (Some(current), Some(high)) => current >= high,
518 _ => false,
519 };
520 let swap_saturated = match (cgroup.memory_swap_current, cgroup.memory_swap_max) {
521 (Some(current), Some(max)) if max > 0 => current.saturating_mul(100) >= max * 95,
522 _ => false,
523 };
524 high_saturated && swap_saturated
525}
526
527pub fn cgroup_memory_pressure_high(threshold: f64) -> bool {
528 let Some(snapshot) = cgroup_snapshot() else {
529 return false;
530 };
531 let Some(current) = snapshot.memory_current else {
532 return false;
533 };
534 let limit = snapshot.memory_high.or(snapshot.memory_max);
535 let Some(limit) = limit.filter(|limit| *limit > 0) else {
536 return false;
537 };
538 (current as f64 / limit as f64) >= threshold
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a diagnostics handle rooted at `dir` with the timings shared by
    /// every async test in this module.
    async fn diagnostics_in(dir: &Path) -> WabbajackDiagnostics {
        WabbajackDiagnostics::new(WabbajackDiagnosticsOptions {
            dir: dir.to_path_buf(),
            interval: Duration::from_secs(1),
            stall_warn: Duration::from_mins(1),
            stall_abort: Duration::from_mins(2),
        })
        .await
        .unwrap()
    }

    /// Cgroup snapshot carrying only the four fields `memory_is_saturated` reads.
    fn cgroup_with(current: u64, high: u64, swap_current: u64, swap_max: u64) -> CgroupSnapshot {
        CgroupSnapshot {
            memory_current: Some(current),
            memory_high: Some(high),
            memory_swap_current: Some(swap_current),
            memory_swap_max: Some(swap_max),
            ..CgroupSnapshot::default()
        }
    }

    #[test]
    fn memory_saturation_requires_high_and_swap_pressure() {
        // At memory.high with swap at 95%: saturated.
        assert!(memory_is_saturated(Some(&cgroup_with(100, 100, 95, 100))));
        // At memory.high but swap mostly free: not saturated.
        assert!(!memory_is_saturated(Some(&cgroup_with(100, 100, 10, 100))));
        // Swap nearly full but still below memory.high: not saturated.
        assert!(!memory_is_saturated(Some(&cgroup_with(99, 100, 95, 100))));
    }

    #[test]
    fn cgroup_pressure_without_cgroup_is_false_or_bounded() {
        // Only checks that probing the live cgroup never panics.
        let _ = cgroup_memory_pressure_high(0.80);
    }

    #[tokio::test]
    async fn heartbeat_writes_json_line() {
        let temp = tempfile::tempdir().unwrap();
        let diagnostics = diagnostics_in(temp.path()).await;

        diagnostics.set_phase("apply-archive-batches");
        diagnostics.record_progress(ProgressEvent::Other);
        diagnostics.write_heartbeat(1234).await.unwrap();

        let contents = std::fs::read_to_string(temp.path().join("heartbeat.jsonl")).unwrap();
        let line: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(line["kind"], "heartbeat");
        assert_eq!(line["phase"], "apply-archive-batches");
        assert_eq!(line["byte_cache_used"], 1234);
    }

    #[tokio::test]
    async fn archive_batch_record_writes_json_line() {
        let temp = tempfile::tempdir().unwrap();
        let diagnostics = diagnostics_in(temp.path()).await;
        let written = ArchiveBatchRecord {
            kind: "archive_batch".to_string(),
            unix_ms: 1,
            archive_hash: "0000000000000001".to_string(),
            archive_size_bytes: 2,
            directive_count: 3,
            patch_count: 1,
            elapsed_ms: 4,
            trust_check_ms: 0,
            extraction_ms: 5,
            patch_ms: 6,
            prune_ms: 0,
            extracted_patch_source_bytes: 7,
            sidecar_hit: false,
            streamed_hash_bytes: 0,
            memory_archive_hit: false,
            disk_archive_fallback: false,
            pruned_bytes: 0,
            byte_cache_used_before: 8,
            byte_cache_used_after: 9,
            success_count: 10,
            error_count: 0,
            first_error: None,
            rss_before_kib: Some(11),
            rss_after_kib: Some(12),
            swap_before_kib: Some(13),
            swap_after_kib: Some(14),
        };
        diagnostics.record_archive_batch(&written).await.unwrap();

        let contents =
            std::fs::read_to_string(temp.path().join("archive-batches.jsonl")).unwrap();
        let read_back: ArchiveBatchRecord = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(read_back.archive_hash, "0000000000000001");
        assert_eq!(read_back.extracted_patch_source_bytes, 7);
    }
}