Skip to main content

loong_kernel/
audit.rs

1use std::fs::{self, File, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6// Re-export data types from contracts
7pub use loong_contracts::{AuditEvent, AuditEventKind, ExecutionPlane, PlaneTier};
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10
11use crate::errors::AuditError;
12
/// A destination that audit events can be recorded into.
///
/// Implementors must be `Send + Sync`.
13pub trait AuditSink: Send + Sync {
    /// Records `event`, returning an `AuditError` when the sink cannot accept it.
14    fn record(&self, event: AuditEvent) -> Result<(), AuditError>;
15}
16
17#[derive(Debug, Default)]
18pub struct NoopAuditSink;
19
20impl AuditSink for NoopAuditSink {
21    fn record(&self, _event: AuditEvent) -> Result<(), AuditError> {
22        Ok(())
23    }
24}
25
26#[derive(Debug, Default, Clone)]
27pub struct InMemoryAuditSink {
28    events: Arc<Mutex<Vec<AuditEvent>>>,
29}
30
31impl InMemoryAuditSink {
32    #[must_use]
33    pub fn snapshot(&self) -> Vec<AuditEvent> {
34        self.events
35            .lock()
36            .map_or_else(|_| Vec::new(), |guard| guard.clone())
37    }
38}
39
40impl AuditSink for InMemoryAuditSink {
41    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
42        let mut guard = self
43            .events
44            .lock()
45            .map_err(|_err| AuditError::Sink("audit mutex poisoned".to_owned()))?;
46        guard.push(event);
47        Ok(())
48    }
49}
50
/// Mutable journal state that `JsonlAuditSink` keeps behind its mutex.
51#[derive(Debug)]
52struct JsonlAuditJournalState {
    /// Open handle to the journal file that events are appended to.
53    file: File,
    /// `entry_hash` of the most recently appended entry; it becomes the next
    /// entry's `prev_hash`. `None` when no hashed entry is known yet.
54    last_entry_hash: Option<String>,
55}
56
/// Audit sink that appends events to a JSON-lines journal file, sealing each
/// entry with a SHA-256 hash chained to the previous entry.
57#[derive(Debug)]
58pub struct JsonlAuditSink {
    /// Location of the journal; used for hashing helpers and error messages.
59    path: PathBuf,
    /// Open journal file plus cached tail hash, serialized through a mutex.
60    journal: Mutex<JsonlAuditJournalState>,
61}
62
/// Result of walking an audit journal with `verify_jsonl_audit_journal`.
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct AuditVerificationReport {
    /// Number of non-empty journal lines examined, including the failing one if any.
65    pub total_events: usize,
    /// Number of entries whose integrity envelope passed every check.
66    pub verified_events: usize,
    /// `true` when no integrity violation was found.
67    pub valid: bool,
    /// `entry_hash` of the last entry that verified successfully, if any.
68    pub last_entry_hash: Option<String>,
    /// 1-based line number of the first entry that failed verification.
69    pub first_invalid_line: Option<usize>,
    /// Human-readable description of the failure, set together with `first_invalid_line`.
70    pub reason: Option<String>,
71}
72
/// Integrity envelope stored next to each journal entry.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74struct PersistedAuditIntegrity {
    /// Name of the hash algorithm; this file only writes and accepts `"sha256"`.
75    algorithm: String,
    /// `entry_hash` of the preceding sealed entry, or `None` for the first one.
76    prev_hash: Option<String>,
    /// Hex-encoded hash of this entry's chain material.
77    entry_hash: String,
78}
79
/// On-disk shape of one journal line: the event fields plus an optional
/// integrity envelope.
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81struct PersistedAuditEvent {
    /// The audit event; its fields are flattened into the top-level JSON object.
82    #[serde(flatten)]
83    event: AuditEvent,
    /// Hash-chain envelope. Lines without it deserialize to `None`, and `None`
    /// is omitted when serializing.
84    #[serde(default, skip_serializing_if = "Option::is_none")]
85    integrity: Option<PersistedAuditIntegrity>,
86}
87
88fn prepare_audit_journal_parent(path: &Path) -> Result<(), AuditError> {
89    if let Some(parent) = path.parent()
90        && !parent.as_os_str().is_empty()
91    {
92        fs::create_dir_all(parent).map_err(|error| {
93            AuditError::Sink(format!(
94                "failed to prepare audit journal parent directory `{}`: {error}",
95                parent.display()
96            ))
97        })?;
98    }
99
100    Ok(())
101}
102
103fn open_jsonl_audit_journal(path: &Path) -> Result<File, AuditError> {
104    prepare_audit_journal_parent(path)?;
105
106    OpenOptions::new()
107        .create(true)
108        .read(true)
109        .append(true)
110        .open(path)
111        .map_err(|error| {
112            AuditError::Sink(format!(
113                "failed to open audit journal `{}`: {error}",
114                path.display()
115            ))
116        })
117}
118
119fn lock_audit_journal(journal: &File, path: &Path) -> Result<(), AuditError> {
120    journal.lock().map_err(|error| {
121        AuditError::Sink(format!(
122            "failed to lock audit journal `{}`: {error}",
123            path.display()
124        ))
125    })
126}
127
128fn unlock_audit_journal(journal: &File, path: &Path) -> Result<(), AuditError> {
129    journal.unlock().map_err(|error| {
130        AuditError::Sink(format!(
131            "failed to unlock audit journal `{}`: {error}",
132            path.display()
133        ))
134    })
135}
136
137/// Exercise the same open + lock + unlock path that production audit writes use.
138pub fn probe_jsonl_audit_journal_runtime_ready(path: &Path) -> Result<(), AuditError> {
139    let journal = open_jsonl_audit_journal(path)?;
140    lock_audit_journal(&journal, path)?;
141    unlock_audit_journal(&journal, path)
142}
143
144impl JsonlAuditSink {
145    pub fn new(path: PathBuf) -> Result<Self, AuditError> {
146        let journal = open_jsonl_audit_journal(&path)?;
147        let last_entry_hash = load_last_audit_entry_hash(&path)?;
148
149        Ok(Self {
150            path,
151            journal: Mutex::new(JsonlAuditJournalState {
152                file: journal,
153                last_entry_hash,
154            }),
155        })
156    }
157}
158
159fn serialize_audit_event_chain_material(
160    event: &AuditEvent,
161    prev_hash: Option<&str>,
162    journal_path: &Path,
163) -> Result<Vec<u8>, AuditError> {
164    serde_json::to_vec(&serde_json::json!({
165        "event_id": event.event_id,
166        "timestamp_epoch_s": event.timestamp_epoch_s,
167        "agent_id": event.agent_id,
168        "kind": event.kind,
169        "prev_hash": prev_hash,
170    }))
171    .map_err(|error| {
172        AuditError::Sink(format!(
173            "failed to serialize audit chain material for `{}`: {error}",
174            journal_path.display()
175        ))
176    })
177}
178
179fn compute_audit_event_entry_hash(
180    event: &AuditEvent,
181    prev_hash: Option<&str>,
182    journal_path: &Path,
183) -> Result<String, AuditError> {
184    let material = serialize_audit_event_chain_material(event, prev_hash, journal_path)?;
185    let digest = Sha256::digest(material);
186    let encoded = hex::encode(digest);
187    Ok(encoded)
188}
189
190fn event_with_integrity(
191    event: AuditEvent,
192    prev_hash: Option<String>,
193    entry_hash: String,
194) -> PersistedAuditEvent {
195    let integrity = PersistedAuditIntegrity {
196        algorithm: "sha256".to_owned(),
197        prev_hash,
198        entry_hash,
199    };
200
201    PersistedAuditEvent {
202        event,
203        integrity: Some(integrity),
204    }
205}
206
207fn decode_persisted_audit_event_line(
208    line: &str,
209    journal_path: &Path,
210    line_number: &str,
211) -> Result<PersistedAuditEvent, AuditError> {
212    serde_json::from_str::<PersistedAuditEvent>(line).map_err(|error| {
213        AuditError::Sink(format!(
214            "failed to decode audit journal `{}` at {}: {error}",
215            journal_path.display(),
216            line_number
217        ))
218    })
219}
220
221fn load_last_audit_entry_hash(path: &Path) -> Result<Option<String>, AuditError> {
222    if !path.exists() {
223        return Ok(None);
224    }
225
226    let file = File::open(path).map_err(|error| {
227        AuditError::Sink(format!(
228            "failed to inspect audit journal `{}`: {error}",
229            path.display()
230        ))
231    })?;
232    let reader = BufReader::new(file);
233    let mut last_non_empty_line = None;
234
235    for line_result in reader.lines() {
236        let line = line_result.map_err(|error| {
237            AuditError::Sink(format!(
238                "failed to read audit journal `{}` while loading tail hash: {error}",
239                path.display()
240            ))
241        })?;
242        if !line.trim().is_empty() {
243            last_non_empty_line = Some(line);
244        }
245    }
246
247    let Some(line) = last_non_empty_line else {
248        return Ok(None);
249    };
250
251    let persisted_event = decode_persisted_audit_event_line(&line, path, "tail line")?;
252    let last_hash = persisted_event.integrity.and_then(|value| {
253        let hash = value.entry_hash;
254        let trimmed_hash = hash.trim();
255        if trimmed_hash.is_empty() {
256            return None;
257        }
258        Some(hash)
259    });
260
261    Ok(last_hash)
262}
263
264pub fn verify_jsonl_audit_journal(path: &Path) -> Result<AuditVerificationReport, AuditError> {
265    if !path.exists() {
266        return Ok(AuditVerificationReport {
267            total_events: 0,
268            verified_events: 0,
269            valid: true,
270            last_entry_hash: None,
271            first_invalid_line: None,
272            reason: None,
273        });
274    }
275
276    let file = File::open(path).map_err(|error| {
277        AuditError::Sink(format!(
278            "failed to open audit journal `{}` for verification: {error}",
279            path.display()
280        ))
281    })?;
282    let reader = BufReader::new(file);
283    let mut total_events = 0usize;
284    let mut verified_events = 0usize;
285    let mut previous_hash: Option<String> = None;
286    let mut protected_chain_started = false;
287
288    for (index, line_result) in reader.lines().enumerate() {
289        let line_number = index + 1;
290        let line = line_result.map_err(|error| {
291            AuditError::Sink(format!(
292                "failed to read audit journal `{}` at line {}: {error}",
293                path.display(),
294                line_number
295            ))
296        })?;
297        if line.trim().is_empty() {
298            continue;
299        }
300
301        total_events += 1;
302        let line_label = format!("line {line_number}");
303        let persisted_event = decode_persisted_audit_event_line(&line, path, &line_label)?;
304        let event = persisted_event.event;
305        let Some(integrity) = persisted_event.integrity.as_ref() else {
306            if protected_chain_started {
307                return Ok(AuditVerificationReport {
308                    total_events,
309                    verified_events,
310                    valid: false,
311                    last_entry_hash: previous_hash,
312                    first_invalid_line: Some(line_number),
313                    reason: Some("missing integrity envelope".to_owned()),
314                });
315            }
316
317            continue;
318        };
319
320        if integrity.algorithm.trim() != "sha256" {
321            return Ok(AuditVerificationReport {
322                total_events,
323                verified_events,
324                valid: false,
325                last_entry_hash: previous_hash,
326                first_invalid_line: Some(line_number),
327                reason: Some(format!(
328                    "unsupported integrity algorithm `{}`",
329                    integrity.algorithm
330                )),
331            });
332        }
333
334        protected_chain_started = true;
335
336        if integrity.prev_hash != previous_hash {
337            return Ok(AuditVerificationReport {
338                total_events,
339                verified_events,
340                valid: false,
341                last_entry_hash: previous_hash,
342                first_invalid_line: Some(line_number),
343                reason: Some("prev_hash mismatch".to_owned()),
344            });
345        }
346
347        let expected_hash =
348            compute_audit_event_entry_hash(&event, integrity.prev_hash.as_deref(), path)?;
349
350        if integrity.entry_hash != expected_hash {
351            return Ok(AuditVerificationReport {
352                total_events,
353                verified_events,
354                valid: false,
355                last_entry_hash: previous_hash,
356                first_invalid_line: Some(line_number),
357                reason: Some("entry_hash mismatch".to_owned()),
358            });
359        }
360
361        previous_hash = Some(integrity.entry_hash.clone());
362        verified_events += 1;
363    }
364
365    Ok(AuditVerificationReport {
366        total_events,
367        verified_events,
368        valid: true,
369        last_entry_hash: previous_hash,
370        first_invalid_line: None,
371        reason: None,
372    })
373}
374
/// Overall result of a `repair_jsonl_audit_journal` run.
375#[derive(Debug, Clone, PartialEq, Eq)]
376pub enum AuditRepairOutcome {
    /// Nothing needed repairing (or the journal does not exist); the file was not rewritten.
377    Healthy,
    /// At least one entry was sealed or re-sealed and the journal was replaced.
378    Repaired,
    /// Repair stopped at 1-based `line` for `reason`; the journal was left untouched.
379    Refused { line: usize, reason: String },
380}
381
/// Counters and outcome produced by `repair_jsonl_audit_journal`.
382#[derive(Debug, Clone, PartialEq, Eq)]
383pub struct AuditRepairReport {
    /// Number of non-empty journal lines examined before the run finished or stopped.
384    pub total_events: usize,
    /// Number of entries that were given a new integrity envelope.
385    pub repaired_events: usize,
    /// Number of entries whose existing envelope was kept unchanged.
386    pub already_valid_events: usize,
    /// Whether the journal was healthy, repaired, or the repair was refused.
387    pub outcome: AuditRepairOutcome,
388}
389
390/// Repair legacy journal entries that are missing integrity envelopes.
391///
392/// **Must be run while the daemon is stopped.** A running `JsonlAuditSink` holds
393/// an open file handle and cached tail hash that would be invalidated by the
394/// atomic rename.
395pub fn repair_jsonl_audit_journal(path: &Path) -> Result<AuditRepairReport, AuditError> {
396    if !path.exists() {
397        return Ok(AuditRepairReport {
398            total_events: 0,
399            repaired_events: 0,
400            already_valid_events: 0,
401            outcome: AuditRepairOutcome::Healthy,
402        });
403    }
404
405    let file = File::open(path).map_err(|error| {
406        AuditError::Sink(format!(
407            "failed to open audit journal `{}` for repair: {error}",
408            path.display()
409        ))
410    })?;
411    let original_metadata = fs::metadata(path).map_err(|error| {
412        AuditError::Sink(format!(
413            "failed to read audit journal metadata `{}` before repair: {error}",
414            path.display()
415        ))
416    })?;
417    let original_permissions = original_metadata.permissions();
418
419    let reader = BufReader::new(file);
420    let mut repaired_lines: Vec<Vec<u8>> = Vec::new();
421    let mut rebuilt_previous_hash: Option<String> = None;
422    let mut source_previous_hash: Option<String> = None;
423    let mut protected_chain_started = false;
424    let mut total_events = 0usize;
425    let mut repaired_events = 0usize;
426    let mut already_valid_events = 0usize;
427
428    for (index, line_result) in reader.lines().enumerate() {
429        let line_number = index + 1;
430        let line = line_result.map_err(|error| {
431            AuditError::Sink(format!(
432                "failed to read audit journal `{}` at line {line_number}: {error}",
433                path.display()
434            ))
435        })?;
436        if line.trim().is_empty() {
437            repaired_lines.push(b"\n".to_vec());
438            continue;
439        }
440
441        total_events += 1;
442        let line_label = format!("line {line_number}");
443        let persisted = decode_persisted_audit_event_line(&line, path, &line_label)?;
444        let event = persisted.event;
445
446        if let Some(integrity) = persisted.integrity.as_ref() {
447            if integrity.algorithm.trim() != "sha256" {
448                return Ok(AuditRepairReport {
449                    total_events,
450                    repaired_events,
451                    already_valid_events,
452                    outcome: AuditRepairOutcome::Refused {
453                        line: line_number,
454                        reason: format!(
455                            "unsupported integrity algorithm `{}`",
456                            integrity.algorithm
457                        ),
458                    },
459                });
460            }
461
462            // Validate source chain: prev_hash must match the previous
463            // source entry_hash (mirrors verify_jsonl_audit_journal).
464            if integrity.prev_hash != source_previous_hash {
465                return Ok(AuditRepairReport {
466                    total_events,
467                    repaired_events,
468                    already_valid_events,
469                    outcome: AuditRepairOutcome::Refused {
470                        line: line_number,
471                        reason: "prev_hash mismatch in source chain".to_owned(),
472                    },
473                });
474            }
475
476            // Check self-consistency: does entry_hash match the event data?
477            let self_consistent_hash =
478                compute_audit_event_entry_hash(&event, integrity.prev_hash.as_deref(), path)?;
479            if integrity.entry_hash != self_consistent_hash {
480                return Ok(AuditRepairReport {
481                    total_events,
482                    repaired_events,
483                    already_valid_events,
484                    outcome: AuditRepairOutcome::Refused {
485                        line: line_number,
486                        reason: "entry_hash mismatch — event data may be tampered".to_owned(),
487                    },
488                });
489            }
490
491            protected_chain_started = true;
492            source_previous_hash = Some(integrity.entry_hash.clone());
493
494            if repaired_events == 0 && integrity.prev_hash == rebuilt_previous_hash {
495                // No prior repairs and chain is continuous — keep as-is.
496                rebuilt_previous_hash = Some(integrity.entry_hash.clone());
497                already_valid_events += 1;
498                let mut encoded = line.into_bytes();
499                encoded.push(b'\n');
500                repaired_lines.push(encoded);
501            } else {
502                // Prior legacy entries were repaired, so the chain position
503                // changed. Re-seal this entry with the rebuilt prev_hash.
504                let entry_hash =
505                    compute_audit_event_entry_hash(&event, rebuilt_previous_hash.as_deref(), path)?;
506                let resealed =
507                    event_with_integrity(event, rebuilt_previous_hash.clone(), entry_hash.clone());
508                let encoded = serialize_audit_event_line(&resealed, path)?;
509                repaired_lines.push(encoded);
510                rebuilt_previous_hash = Some(entry_hash);
511                repaired_events += 1;
512            }
513        } else {
514            if protected_chain_started {
515                return Ok(AuditRepairReport {
516                    total_events,
517                    repaired_events,
518                    already_valid_events,
519                    outcome: AuditRepairOutcome::Refused {
520                        line: line_number,
521                        reason: "missing integrity envelope after protected chain started"
522                            .to_owned(),
523                    },
524                });
525            }
526            let entry_hash =
527                compute_audit_event_entry_hash(&event, rebuilt_previous_hash.as_deref(), path)?;
528            let repaired =
529                event_with_integrity(event, rebuilt_previous_hash.clone(), entry_hash.clone());
530            let encoded = serialize_audit_event_line(&repaired, path)?;
531            repaired_lines.push(encoded);
532            rebuilt_previous_hash = Some(entry_hash);
533            repaired_events += 1;
534        }
535    }
536
537    if repaired_events == 0 {
538        return Ok(AuditRepairReport {
539            total_events,
540            repaired_events: 0,
541            already_valid_events,
542            outcome: AuditRepairOutcome::Healthy,
543        });
544    }
545
546    let temp_path = path.with_extension("jsonl.repair-tmp");
547    let write_result = (|| {
548        let mut temp_file = File::create(&temp_path).map_err(|error| {
549            AuditError::Sink(format!(
550                "failed to create repair temp file `{}`: {error}",
551                temp_path.display()
552            ))
553        })?;
554        fs::set_permissions(&temp_path, original_permissions.clone()).map_err(|error| {
555            AuditError::Sink(format!(
556                "failed to apply original permissions to repair temp file `{}`: {error}",
557                temp_path.display()
558            ))
559        })?;
560        for line_bytes in &repaired_lines {
561            temp_file.write_all(line_bytes).map_err(|error| {
562                AuditError::Sink(format!(
563                    "failed to write repair temp file `{}`: {error}",
564                    temp_path.display()
565                ))
566            })?;
567        }
568        temp_file.flush().map_err(|error| {
569            AuditError::Sink(format!(
570                "failed to flush repair temp file `{}`: {error}",
571                temp_path.display()
572            ))
573        })?;
574        temp_file.sync_all().map_err(|error| {
575            AuditError::Sink(format!(
576                "failed to sync repair temp file `{}`: {error}",
577                temp_path.display()
578            ))
579        })?;
580        drop(temp_file);
581        fs::rename(&temp_path, path).map_err(|error| {
582            AuditError::Sink(format!(
583                "failed to replace journal with repaired file `{}`: {error}",
584                path.display()
585            ))
586        })
587    })();
588
589    if write_result.is_err() {
590        let _ = fs::remove_file(&temp_path);
591    }
592    write_result?;
593
594    Ok(AuditRepairReport {
595        total_events,
596        repaired_events,
597        already_valid_events,
598        outcome: AuditRepairOutcome::Repaired,
599    })
600}
601
602fn serialize_audit_event_line(
603    event: &PersistedAuditEvent,
604    journal_path: &Path,
605) -> Result<Vec<u8>, AuditError> {
606    let mut encoded = serde_json::to_vec(event).map_err(|error| {
607        AuditError::Sink(format!(
608            "failed to serialize audit event for `{}`: {error}",
609            journal_path.display()
610        ))
611    })?;
612    encoded.push(b'\n');
613    Ok(encoded)
614}
615
616impl AuditSink for JsonlAuditSink {
617    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
618        let mut guard = self
619            .journal
620            .lock()
621            .map_err(|_error| AuditError::Sink("audit journal mutex poisoned".to_owned()))?;
622        let previous_hash = guard.last_entry_hash.clone();
623        let entry_hash =
624            compute_audit_event_entry_hash(&event, previous_hash.as_deref(), &self.path)?;
625        let persisted_event = event_with_integrity(event, previous_hash, entry_hash.clone());
626        let encoded = serialize_audit_event_line(&persisted_event, &self.path)?;
627
628        lock_audit_journal(&guard.file, &self.path)?;
629
630        let write_result = guard
631            .file
632            .write_all(&encoded)
633            .map_err(|error| {
634                AuditError::Sink(format!(
635                    "failed to append audit event to `{}`: {error}",
636                    self.path.display()
637                ))
638            })
639            .and_then(|()| {
640                guard.file.flush().map_err(|error| {
641                    AuditError::Sink(format!(
642                        "failed to flush audit journal `{}`: {error}",
643                        self.path.display()
644                    ))
645                })
646            });
647
648        let unlock_result = unlock_audit_journal(&guard.file, &self.path);
649
650        match (write_result, unlock_result) {
651            (Err(error), _) => Err(error),
652            (Ok(()), Err(error)) => Err(error),
653            (Ok(()), Ok(())) => {
654                guard.last_entry_hash = Some(entry_hash);
655                Ok(())
656            }
657        }
658    }
659}
660
661pub struct FanoutAuditSink {
662    children: Vec<Arc<dyn AuditSink>>,
663}
664
665impl FanoutAuditSink {
666    #[must_use]
667    pub fn new(children: Vec<Arc<dyn AuditSink>>) -> Self {
668        assert!(
669            !children.is_empty(),
670            "fanout audit sink requires at least one child"
671        );
672        Self { children }
673    }
674}
675
676impl AuditSink for FanoutAuditSink {
677    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
678        if let Some((last, rest)) = self.children.split_last() {
679            for sink in rest {
680                sink.record(event.clone())?;
681            }
682            last.record(event)?;
683        }
684        Ok(())
685    }
686}