1use std::fs::{self, File, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6pub use loong_contracts::{AuditEvent, AuditEventKind, ExecutionPlane, PlaneTier};
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10
11use crate::errors::AuditError;
12
13pub trait AuditSink: Send + Sync {
14 fn record(&self, event: AuditEvent) -> Result<(), AuditError>;
15}
16
17#[derive(Debug, Default)]
18pub struct NoopAuditSink;
19
20impl AuditSink for NoopAuditSink {
21 fn record(&self, _event: AuditEvent) -> Result<(), AuditError> {
22 Ok(())
23 }
24}
25
26#[derive(Debug, Default, Clone)]
27pub struct InMemoryAuditSink {
28 events: Arc<Mutex<Vec<AuditEvent>>>,
29}
30
31impl InMemoryAuditSink {
32 #[must_use]
33 pub fn snapshot(&self) -> Vec<AuditEvent> {
34 self.events
35 .lock()
36 .map_or_else(|_| Vec::new(), |guard| guard.clone())
37 }
38}
39
40impl AuditSink for InMemoryAuditSink {
41 fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
42 let mut guard = self
43 .events
44 .lock()
45 .map_err(|_err| AuditError::Sink("audit mutex poisoned".to_owned()))?;
46 guard.push(event);
47 Ok(())
48 }
49}
50
51#[derive(Debug)]
52struct JsonlAuditJournalState {
53 file: File,
54 last_entry_hash: Option<String>,
55}
56
57#[derive(Debug)]
58pub struct JsonlAuditSink {
59 path: PathBuf,
60 journal: Mutex<JsonlAuditJournalState>,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct AuditVerificationReport {
65 pub total_events: usize,
66 pub verified_events: usize,
67 pub valid: bool,
68 pub last_entry_hash: Option<String>,
69 pub first_invalid_line: Option<usize>,
70 pub reason: Option<String>,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74struct PersistedAuditIntegrity {
75 algorithm: String,
76 prev_hash: Option<String>,
77 entry_hash: String,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81struct PersistedAuditEvent {
82 #[serde(flatten)]
83 event: AuditEvent,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
85 integrity: Option<PersistedAuditIntegrity>,
86}
87
88fn prepare_audit_journal_parent(path: &Path) -> Result<(), AuditError> {
89 if let Some(parent) = path.parent()
90 && !parent.as_os_str().is_empty()
91 {
92 fs::create_dir_all(parent).map_err(|error| {
93 AuditError::Sink(format!(
94 "failed to prepare audit journal parent directory `{}`: {error}",
95 parent.display()
96 ))
97 })?;
98 }
99
100 Ok(())
101}
102
103fn open_jsonl_audit_journal(path: &Path) -> Result<File, AuditError> {
104 prepare_audit_journal_parent(path)?;
105
106 OpenOptions::new()
107 .create(true)
108 .read(true)
109 .append(true)
110 .open(path)
111 .map_err(|error| {
112 AuditError::Sink(format!(
113 "failed to open audit journal `{}`: {error}",
114 path.display()
115 ))
116 })
117}
118
119fn lock_audit_journal(journal: &File, path: &Path) -> Result<(), AuditError> {
120 journal.lock().map_err(|error| {
121 AuditError::Sink(format!(
122 "failed to lock audit journal `{}`: {error}",
123 path.display()
124 ))
125 })
126}
127
128fn unlock_audit_journal(journal: &File, path: &Path) -> Result<(), AuditError> {
129 journal.unlock().map_err(|error| {
130 AuditError::Sink(format!(
131 "failed to unlock audit journal `{}`: {error}",
132 path.display()
133 ))
134 })
135}
136
137pub fn probe_jsonl_audit_journal_runtime_ready(path: &Path) -> Result<(), AuditError> {
139 let journal = open_jsonl_audit_journal(path)?;
140 lock_audit_journal(&journal, path)?;
141 unlock_audit_journal(&journal, path)
142}
143
144impl JsonlAuditSink {
145 pub fn new(path: PathBuf) -> Result<Self, AuditError> {
146 let journal = open_jsonl_audit_journal(&path)?;
147 let last_entry_hash = load_last_audit_entry_hash(&path)?;
148
149 Ok(Self {
150 path,
151 journal: Mutex::new(JsonlAuditJournalState {
152 file: journal,
153 last_entry_hash,
154 }),
155 })
156 }
157}
158
159fn serialize_audit_event_chain_material(
160 event: &AuditEvent,
161 prev_hash: Option<&str>,
162 journal_path: &Path,
163) -> Result<Vec<u8>, AuditError> {
164 serde_json::to_vec(&serde_json::json!({
165 "event_id": event.event_id,
166 "timestamp_epoch_s": event.timestamp_epoch_s,
167 "agent_id": event.agent_id,
168 "kind": event.kind,
169 "prev_hash": prev_hash,
170 }))
171 .map_err(|error| {
172 AuditError::Sink(format!(
173 "failed to serialize audit chain material for `{}`: {error}",
174 journal_path.display()
175 ))
176 })
177}
178
179fn compute_audit_event_entry_hash(
180 event: &AuditEvent,
181 prev_hash: Option<&str>,
182 journal_path: &Path,
183) -> Result<String, AuditError> {
184 let material = serialize_audit_event_chain_material(event, prev_hash, journal_path)?;
185 let digest = Sha256::digest(material);
186 let encoded = hex::encode(digest);
187 Ok(encoded)
188}
189
190fn event_with_integrity(
191 event: AuditEvent,
192 prev_hash: Option<String>,
193 entry_hash: String,
194) -> PersistedAuditEvent {
195 let integrity = PersistedAuditIntegrity {
196 algorithm: "sha256".to_owned(),
197 prev_hash,
198 entry_hash,
199 };
200
201 PersistedAuditEvent {
202 event,
203 integrity: Some(integrity),
204 }
205}
206
207fn decode_persisted_audit_event_line(
208 line: &str,
209 journal_path: &Path,
210 line_number: &str,
211) -> Result<PersistedAuditEvent, AuditError> {
212 serde_json::from_str::<PersistedAuditEvent>(line).map_err(|error| {
213 AuditError::Sink(format!(
214 "failed to decode audit journal `{}` at {}: {error}",
215 journal_path.display(),
216 line_number
217 ))
218 })
219}
220
221fn load_last_audit_entry_hash(path: &Path) -> Result<Option<String>, AuditError> {
222 if !path.exists() {
223 return Ok(None);
224 }
225
226 let file = File::open(path).map_err(|error| {
227 AuditError::Sink(format!(
228 "failed to inspect audit journal `{}`: {error}",
229 path.display()
230 ))
231 })?;
232 let reader = BufReader::new(file);
233 let mut last_non_empty_line = None;
234
235 for line_result in reader.lines() {
236 let line = line_result.map_err(|error| {
237 AuditError::Sink(format!(
238 "failed to read audit journal `{}` while loading tail hash: {error}",
239 path.display()
240 ))
241 })?;
242 if !line.trim().is_empty() {
243 last_non_empty_line = Some(line);
244 }
245 }
246
247 let Some(line) = last_non_empty_line else {
248 return Ok(None);
249 };
250
251 let persisted_event = decode_persisted_audit_event_line(&line, path, "tail line")?;
252 let last_hash = persisted_event.integrity.and_then(|value| {
253 let hash = value.entry_hash;
254 let trimmed_hash = hash.trim();
255 if trimmed_hash.is_empty() {
256 return None;
257 }
258 Some(hash)
259 });
260
261 Ok(last_hash)
262}
263
264pub fn verify_jsonl_audit_journal(path: &Path) -> Result<AuditVerificationReport, AuditError> {
265 if !path.exists() {
266 return Ok(AuditVerificationReport {
267 total_events: 0,
268 verified_events: 0,
269 valid: true,
270 last_entry_hash: None,
271 first_invalid_line: None,
272 reason: None,
273 });
274 }
275
276 let file = File::open(path).map_err(|error| {
277 AuditError::Sink(format!(
278 "failed to open audit journal `{}` for verification: {error}",
279 path.display()
280 ))
281 })?;
282 let reader = BufReader::new(file);
283 let mut total_events = 0usize;
284 let mut verified_events = 0usize;
285 let mut previous_hash: Option<String> = None;
286 let mut protected_chain_started = false;
287
288 for (index, line_result) in reader.lines().enumerate() {
289 let line_number = index + 1;
290 let line = line_result.map_err(|error| {
291 AuditError::Sink(format!(
292 "failed to read audit journal `{}` at line {}: {error}",
293 path.display(),
294 line_number
295 ))
296 })?;
297 if line.trim().is_empty() {
298 continue;
299 }
300
301 total_events += 1;
302 let line_label = format!("line {line_number}");
303 let persisted_event = decode_persisted_audit_event_line(&line, path, &line_label)?;
304 let event = persisted_event.event;
305 let Some(integrity) = persisted_event.integrity.as_ref() else {
306 if protected_chain_started {
307 return Ok(AuditVerificationReport {
308 total_events,
309 verified_events,
310 valid: false,
311 last_entry_hash: previous_hash,
312 first_invalid_line: Some(line_number),
313 reason: Some("missing integrity envelope".to_owned()),
314 });
315 }
316
317 continue;
318 };
319
320 if integrity.algorithm.trim() != "sha256" {
321 return Ok(AuditVerificationReport {
322 total_events,
323 verified_events,
324 valid: false,
325 last_entry_hash: previous_hash,
326 first_invalid_line: Some(line_number),
327 reason: Some(format!(
328 "unsupported integrity algorithm `{}`",
329 integrity.algorithm
330 )),
331 });
332 }
333
334 protected_chain_started = true;
335
336 if integrity.prev_hash != previous_hash {
337 return Ok(AuditVerificationReport {
338 total_events,
339 verified_events,
340 valid: false,
341 last_entry_hash: previous_hash,
342 first_invalid_line: Some(line_number),
343 reason: Some("prev_hash mismatch".to_owned()),
344 });
345 }
346
347 let expected_hash =
348 compute_audit_event_entry_hash(&event, integrity.prev_hash.as_deref(), path)?;
349
350 if integrity.entry_hash != expected_hash {
351 return Ok(AuditVerificationReport {
352 total_events,
353 verified_events,
354 valid: false,
355 last_entry_hash: previous_hash,
356 first_invalid_line: Some(line_number),
357 reason: Some("entry_hash mismatch".to_owned()),
358 });
359 }
360
361 previous_hash = Some(integrity.entry_hash.clone());
362 verified_events += 1;
363 }
364
365 Ok(AuditVerificationReport {
366 total_events,
367 verified_events,
368 valid: true,
369 last_entry_hash: previous_hash,
370 first_invalid_line: None,
371 reason: None,
372 })
373}
374
375#[derive(Debug, Clone, PartialEq, Eq)]
376pub enum AuditRepairOutcome {
377 Healthy,
378 Repaired,
379 Refused { line: usize, reason: String },
380}
381
382#[derive(Debug, Clone, PartialEq, Eq)]
383pub struct AuditRepairReport {
384 pub total_events: usize,
385 pub repaired_events: usize,
386 pub already_valid_events: usize,
387 pub outcome: AuditRepairOutcome,
388}
389
390pub fn repair_jsonl_audit_journal(path: &Path) -> Result<AuditRepairReport, AuditError> {
396 if !path.exists() {
397 return Ok(AuditRepairReport {
398 total_events: 0,
399 repaired_events: 0,
400 already_valid_events: 0,
401 outcome: AuditRepairOutcome::Healthy,
402 });
403 }
404
405 let file = File::open(path).map_err(|error| {
406 AuditError::Sink(format!(
407 "failed to open audit journal `{}` for repair: {error}",
408 path.display()
409 ))
410 })?;
411 let original_metadata = fs::metadata(path).map_err(|error| {
412 AuditError::Sink(format!(
413 "failed to read audit journal metadata `{}` before repair: {error}",
414 path.display()
415 ))
416 })?;
417 let original_permissions = original_metadata.permissions();
418
419 let reader = BufReader::new(file);
420 let mut repaired_lines: Vec<Vec<u8>> = Vec::new();
421 let mut rebuilt_previous_hash: Option<String> = None;
422 let mut source_previous_hash: Option<String> = None;
423 let mut protected_chain_started = false;
424 let mut total_events = 0usize;
425 let mut repaired_events = 0usize;
426 let mut already_valid_events = 0usize;
427
428 for (index, line_result) in reader.lines().enumerate() {
429 let line_number = index + 1;
430 let line = line_result.map_err(|error| {
431 AuditError::Sink(format!(
432 "failed to read audit journal `{}` at line {line_number}: {error}",
433 path.display()
434 ))
435 })?;
436 if line.trim().is_empty() {
437 repaired_lines.push(b"\n".to_vec());
438 continue;
439 }
440
441 total_events += 1;
442 let line_label = format!("line {line_number}");
443 let persisted = decode_persisted_audit_event_line(&line, path, &line_label)?;
444 let event = persisted.event;
445
446 if let Some(integrity) = persisted.integrity.as_ref() {
447 if integrity.algorithm.trim() != "sha256" {
448 return Ok(AuditRepairReport {
449 total_events,
450 repaired_events,
451 already_valid_events,
452 outcome: AuditRepairOutcome::Refused {
453 line: line_number,
454 reason: format!(
455 "unsupported integrity algorithm `{}`",
456 integrity.algorithm
457 ),
458 },
459 });
460 }
461
462 if integrity.prev_hash != source_previous_hash {
465 return Ok(AuditRepairReport {
466 total_events,
467 repaired_events,
468 already_valid_events,
469 outcome: AuditRepairOutcome::Refused {
470 line: line_number,
471 reason: "prev_hash mismatch in source chain".to_owned(),
472 },
473 });
474 }
475
476 let self_consistent_hash =
478 compute_audit_event_entry_hash(&event, integrity.prev_hash.as_deref(), path)?;
479 if integrity.entry_hash != self_consistent_hash {
480 return Ok(AuditRepairReport {
481 total_events,
482 repaired_events,
483 already_valid_events,
484 outcome: AuditRepairOutcome::Refused {
485 line: line_number,
486 reason: "entry_hash mismatch — event data may be tampered".to_owned(),
487 },
488 });
489 }
490
491 protected_chain_started = true;
492 source_previous_hash = Some(integrity.entry_hash.clone());
493
494 if repaired_events == 0 && integrity.prev_hash == rebuilt_previous_hash {
495 rebuilt_previous_hash = Some(integrity.entry_hash.clone());
497 already_valid_events += 1;
498 let mut encoded = line.into_bytes();
499 encoded.push(b'\n');
500 repaired_lines.push(encoded);
501 } else {
502 let entry_hash =
505 compute_audit_event_entry_hash(&event, rebuilt_previous_hash.as_deref(), path)?;
506 let resealed =
507 event_with_integrity(event, rebuilt_previous_hash.clone(), entry_hash.clone());
508 let encoded = serialize_audit_event_line(&resealed, path)?;
509 repaired_lines.push(encoded);
510 rebuilt_previous_hash = Some(entry_hash);
511 repaired_events += 1;
512 }
513 } else {
514 if protected_chain_started {
515 return Ok(AuditRepairReport {
516 total_events,
517 repaired_events,
518 already_valid_events,
519 outcome: AuditRepairOutcome::Refused {
520 line: line_number,
521 reason: "missing integrity envelope after protected chain started"
522 .to_owned(),
523 },
524 });
525 }
526 let entry_hash =
527 compute_audit_event_entry_hash(&event, rebuilt_previous_hash.as_deref(), path)?;
528 let repaired =
529 event_with_integrity(event, rebuilt_previous_hash.clone(), entry_hash.clone());
530 let encoded = serialize_audit_event_line(&repaired, path)?;
531 repaired_lines.push(encoded);
532 rebuilt_previous_hash = Some(entry_hash);
533 repaired_events += 1;
534 }
535 }
536
537 if repaired_events == 0 {
538 return Ok(AuditRepairReport {
539 total_events,
540 repaired_events: 0,
541 already_valid_events,
542 outcome: AuditRepairOutcome::Healthy,
543 });
544 }
545
546 let temp_path = path.with_extension("jsonl.repair-tmp");
547 let write_result = (|| {
548 let mut temp_file = File::create(&temp_path).map_err(|error| {
549 AuditError::Sink(format!(
550 "failed to create repair temp file `{}`: {error}",
551 temp_path.display()
552 ))
553 })?;
554 fs::set_permissions(&temp_path, original_permissions.clone()).map_err(|error| {
555 AuditError::Sink(format!(
556 "failed to apply original permissions to repair temp file `{}`: {error}",
557 temp_path.display()
558 ))
559 })?;
560 for line_bytes in &repaired_lines {
561 temp_file.write_all(line_bytes).map_err(|error| {
562 AuditError::Sink(format!(
563 "failed to write repair temp file `{}`: {error}",
564 temp_path.display()
565 ))
566 })?;
567 }
568 temp_file.flush().map_err(|error| {
569 AuditError::Sink(format!(
570 "failed to flush repair temp file `{}`: {error}",
571 temp_path.display()
572 ))
573 })?;
574 temp_file.sync_all().map_err(|error| {
575 AuditError::Sink(format!(
576 "failed to sync repair temp file `{}`: {error}",
577 temp_path.display()
578 ))
579 })?;
580 drop(temp_file);
581 fs::rename(&temp_path, path).map_err(|error| {
582 AuditError::Sink(format!(
583 "failed to replace journal with repaired file `{}`: {error}",
584 path.display()
585 ))
586 })
587 })();
588
589 if write_result.is_err() {
590 let _ = fs::remove_file(&temp_path);
591 }
592 write_result?;
593
594 Ok(AuditRepairReport {
595 total_events,
596 repaired_events,
597 already_valid_events,
598 outcome: AuditRepairOutcome::Repaired,
599 })
600}
601
602fn serialize_audit_event_line(
603 event: &PersistedAuditEvent,
604 journal_path: &Path,
605) -> Result<Vec<u8>, AuditError> {
606 let mut encoded = serde_json::to_vec(event).map_err(|error| {
607 AuditError::Sink(format!(
608 "failed to serialize audit event for `{}`: {error}",
609 journal_path.display()
610 ))
611 })?;
612 encoded.push(b'\n');
613 Ok(encoded)
614}
615
616impl AuditSink for JsonlAuditSink {
617 fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
618 let mut guard = self
619 .journal
620 .lock()
621 .map_err(|_error| AuditError::Sink("audit journal mutex poisoned".to_owned()))?;
622 let previous_hash = guard.last_entry_hash.clone();
623 let entry_hash =
624 compute_audit_event_entry_hash(&event, previous_hash.as_deref(), &self.path)?;
625 let persisted_event = event_with_integrity(event, previous_hash, entry_hash.clone());
626 let encoded = serialize_audit_event_line(&persisted_event, &self.path)?;
627
628 lock_audit_journal(&guard.file, &self.path)?;
629
630 let write_result = guard
631 .file
632 .write_all(&encoded)
633 .map_err(|error| {
634 AuditError::Sink(format!(
635 "failed to append audit event to `{}`: {error}",
636 self.path.display()
637 ))
638 })
639 .and_then(|()| {
640 guard.file.flush().map_err(|error| {
641 AuditError::Sink(format!(
642 "failed to flush audit journal `{}`: {error}",
643 self.path.display()
644 ))
645 })
646 });
647
648 let unlock_result = unlock_audit_journal(&guard.file, &self.path);
649
650 match (write_result, unlock_result) {
651 (Err(error), _) => Err(error),
652 (Ok(()), Err(error)) => Err(error),
653 (Ok(()), Ok(())) => {
654 guard.last_entry_hash = Some(entry_hash);
655 Ok(())
656 }
657 }
658 }
659}
660
661pub struct FanoutAuditSink {
662 children: Vec<Arc<dyn AuditSink>>,
663}
664
665impl FanoutAuditSink {
666 #[must_use]
667 pub fn new(children: Vec<Arc<dyn AuditSink>>) -> Self {
668 assert!(
669 !children.is_empty(),
670 "fanout audit sink requires at least one child"
671 );
672 Self { children }
673 }
674}
675
676impl AuditSink for FanoutAuditSink {
677 fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
678 if let Some((last, rest)) = self.children.split_last() {
679 for sink in rest {
680 sink.record(event.clone())?;
681 }
682 last.record(event)?;
683 }
684 Ok(())
685 }
686}