1use std::{
2 fs,
3 io::ErrorKind,
4 panic,
5 path::{Path, PathBuf},
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc, Mutex,
9 },
10 thread,
11 time::Duration,
12};
13
14use auditaur_collector::{
15 exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
16 receiver::OTelBatch,
17 retention::RetentionLimits,
18};
19use auditaur_core::{
20 discovery::DiscoveryFile,
21 drive_bridge::{
22 DriveBridgeRequest, DriveBridgeResponse, DriveBridgeStatus, DRIVE_BRIDGE_CAPABILITY,
23 DRIVE_BRIDGE_DIR, DRIVE_BRIDGE_IN_FLIGHT_DIR, DRIVE_BRIDGE_PROTOCOL_VERSION,
24 DRIVE_BRIDGE_REQUESTS_DIR, DRIVE_BRIDGE_RESPONSES_DIR, DRIVE_BRIDGE_STALE_FILE_NANOS,
25 DRIVE_BRIDGE_STATUS_FILE,
26 },
27 model::{FrontendError, LogRecord, Session, TelemetrySource},
28 AuditaurConfig,
29};
30use serde_json::json;
31use time::{format_description::well_known::Rfc3339, OffsetDateTime};
32use uuid::Uuid;
33
34use crate::error::AuditaurError;
35
/// Process-wide slot holding the session and store that the panic hook records into.
/// `None` while no enabled state is registered.
static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
/// Flipped to `true` the first time the wrapping panic hook is installed so that
/// later initializations only swap the sink and never stack another hook.
static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
38
/// Destination for panics captured by the process-wide panic hook.
#[derive(Clone)]
struct PanicSink {
    /// Session the recorded panic rows are attributed to; also used to decide
    /// whether a dropped state still owns the sink.
    session_id: String,
    /// Shared handle to the session database the panic is written into.
    store: Arc<Mutex<SqliteStore>>,
}
44
/// Per-process plugin state created by [`AuditaurState::initialize`].
///
/// When Auditaur is disabled every optional field is `None` and the state is inert.
pub struct AuditaurState {
    /// Identifier of the active session; `None` when Auditaur is disabled.
    pub session_id: Option<String>,
    /// Whether telemetry export and the drive bridge are active.
    enabled: bool,
    /// Shared handle to the session's SQLite store.
    store: Option<Arc<Mutex<SqliteStore>>>,
    /// Location of this instance's discovery file; removed again on drop.
    discovery_path: Option<PathBuf>,
    /// Root directory of the file-based drive bridge for this session.
    bridge_dir: Option<PathBuf>,
    /// Flag polled by the heartbeat thread; cleared on drop to stop it.
    heartbeat_alive: Option<Arc<AtomicBool>>,
    /// Set once the bridge notifier has been handed out, so it starts only once.
    bridge_notifier_started: Arc<AtomicBool>,
    /// Lifetime flag handed to the bridge notifier; cleared on drop.
    bridge_notifier_alive: Arc<AtomicBool>,
    /// Passed through to the redaction helper for every exported payload.
    redact_defaults: bool,
    /// Additional keys passed to the redaction helper alongside `redact_defaults`.
    extra_redaction_keys: Vec<String>,
    /// Limits handed to the store's retention pass after each exported batch.
    retention_limits: RetentionLimits,
}
58
59impl AuditaurState {
60 pub fn initialize(
61 config: AuditaurConfig,
62 pid: u32,
63 app_identifier: Option<String>,
64 ) -> Result<Self, AuditaurError> {
65 let enabled = config.enabled.unwrap_or_else(default_enabled);
66 if !enabled {
67 return Ok(Self {
68 session_id: None,
69 enabled: false,
70 store: None,
71 discovery_path: None,
72 bridge_dir: None,
73 heartbeat_alive: None,
74 bridge_notifier_started: Arc::new(AtomicBool::new(false)),
75 bridge_notifier_alive: Arc::new(AtomicBool::new(false)),
76 redact_defaults: config.redact_defaults,
77 extra_redaction_keys: config.extra_redaction_keys,
78 retention_limits: RetentionLimits::default(),
79 });
80 }
81
82 if !cfg!(debug_assertions) && !config.allow_release_builds {
83 return Err(AuditaurError::new(
84 "Auditaur is disabled in release builds unless allow_release_builds is true.",
85 ));
86 }
87
88 let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
89 .map_err(|error| AuditaurError::new(error.to_string()))?;
90 let session_id = Uuid::new_v4().to_string();
91 let instance_id = Uuid::new_v4().to_string();
92 let session_dir = data_dir.join("sessions").join(&session_id);
93 let bridge_dir = session_dir.join(DRIVE_BRIDGE_DIR);
94 let apps_dir = data_dir.join("apps");
95 fs::create_dir_all(&session_dir)?;
96 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR))?;
97 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR))?;
98 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_RESPONSES_DIR))?;
99 fs::create_dir_all(&apps_dir)?;
100
101 let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
102 let store = SqliteStore::open(&database_path)?;
103 store.migrate()?;
104
105 let service_name = config
106 .service_name
107 .clone()
108 .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
109 .unwrap_or_else(|| "tauri-app".to_string());
110 let service_version = config.service_version.clone();
111 let started_at = now_rfc3339()?;
112 store.create_session(&Session {
113 id: session_id.clone(),
114 session_name: config.session_name.clone(),
115 service_name: service_name.clone(),
116 service_version: service_version.clone(),
117 app_identifier: app_identifier.clone(),
118 pid: Some(i64::from(pid)),
119 started_at: started_at.clone(),
120 ended_at: None,
121 schema_version: SQLITE_SCHEMA_VERSION,
122 auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
123 })?;
124
125 let discovery_path = apps_dir.join(format!("{instance_id}.json"));
126 let discovery = DiscoveryFile {
127 schema_version: 1,
128 instance_id,
129 session_id: session_id.clone(),
130 service_name,
131 service_version,
132 app_identifier,
133 pid,
134 started_at,
135 database_path: database_path.to_string_lossy().to_string(),
136 capabilities: vec![
137 "logs".to_string(),
138 "traces".to_string(),
139 "frontend_errors".to_string(),
140 "ipc".to_string(),
141 "events".to_string(),
142 "windows".to_string(),
143 DRIVE_BRIDGE_CAPABILITY.to_string(),
144 ],
145 last_heartbeat_at: now_rfc3339()?,
146 };
147 write_discovery(&discovery_path, &discovery)?;
148
149 let heartbeat_alive = Arc::new(AtomicBool::new(true));
150 let bridge_notifier_started = Arc::new(AtomicBool::new(false));
151 let bridge_notifier_alive = Arc::new(AtomicBool::new(true));
152 start_heartbeat(
153 discovery_path.clone(),
154 discovery,
155 heartbeat_alive.clone(),
156 Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
157 );
158 let store = Arc::new(Mutex::new(store));
159 crate::tracing::install_sink(session_id.clone(), store.clone());
160 install_panic_sink(session_id.clone(), store.clone());
161
162 Ok(Self {
163 session_id: Some(session_id),
164 enabled: true,
165 store: Some(store),
166 discovery_path: Some(discovery_path),
167 bridge_dir: Some(bridge_dir),
168 heartbeat_alive: Some(heartbeat_alive),
169 bridge_notifier_started,
170 bridge_notifier_alive,
171 redact_defaults: config.redact_defaults,
172 extra_redaction_keys: config.extra_redaction_keys,
173 retention_limits: RetentionLimits {
174 max_session_bytes: config.max_session_bytes,
175 ..RetentionLimits::default()
176 },
177 })
178 }
179
180 pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
181 if !self.enabled {
182 return Ok(());
183 }
184
185 let Some(store) = &self.store else {
186 return Err(AuditaurError::new(
187 "Auditaur is enabled without an initialized store.",
188 ));
189 };
190 let session_id = self
191 .session_id
192 .as_deref()
193 .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
194 let store = store
195 .lock()
196 .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
197
198 for mut span in batch.spans {
199 if span.session_id.is_empty() {
200 span.session_id = session_id.to_string();
201 }
202 span.attributes = self.redact_value(&span.attributes);
203 store.insert_span(&span)?;
204 }
205
206 for mut event in batch.span_events {
207 if event.session_id.is_empty() {
208 event.session_id = session_id.to_string();
209 }
210 event.attributes = self.redact_value(&event.attributes);
211 store.insert_span_event(&event)?;
212 }
213
214 for mut log in batch.logs {
215 if log.session_id.is_empty() {
216 log.session_id = session_id.to_string();
217 }
218 log.attributes = self.redact_value(&log.attributes);
219 log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
220 store.insert_log(&log)?;
221 }
222
223 for mut error in batch.frontend_errors {
224 if error.session_id.is_empty() {
225 error.session_id = session_id.to_string();
226 }
227 error.attributes = self.redact_value(&error.attributes);
228 store.insert_frontend_error(&error)?;
229 }
230
231 for mut call in batch.tauri_ipc_calls {
232 if call.session_id.is_empty() {
233 call.session_id = session_id.to_string();
234 }
235 if let Some(args_json) = &call.args_json {
236 let outcome = auditaur_core::redaction::redact_json_with_options(
237 args_json,
238 self.redact_defaults,
239 &self.extra_redaction_keys,
240 );
241 call.args_json = Some(outcome.value);
242 call.args_redacted = outcome.redacted;
243 } else {
244 call.args_redacted = false;
245 }
246 store.insert_tauri_ipc_call(&call)?;
247 }
248
249 for mut event in batch.tauri_events {
250 if event.session_id.is_empty() {
251 event.session_id = session_id.to_string();
252 }
253 if let Some(payload_json) = &event.payload_json {
254 let outcome = auditaur_core::redaction::redact_json_with_options(
255 payload_json,
256 self.redact_defaults,
257 &self.extra_redaction_keys,
258 );
259 event.payload_json = Some(outcome.value);
260 event.payload_redacted = outcome.redacted;
261 } else {
262 event.payload_redacted = false;
263 }
264 store.insert_tauri_event(&event)?;
265 }
266
267 store.enforce_retention(self.retention_limits)?;
268
269 Ok(())
270 }
271
272 pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
273 match (&self.session_id, &self.store) {
274 (Some(session_id), Some(store)) => {
275 crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
276 }
277 _ => crate::tracing::tracing_layer(),
278 }
279 }
280
281 pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
282 self.store.clone()
283 }
284
285 pub(crate) fn bridge_dir_path(&self) -> Option<PathBuf> {
286 self.bridge_dir.clone()
287 }
288
289 pub(crate) fn start_bridge_notifier_if_needed(&self) -> Option<(PathBuf, Arc<AtomicBool>)> {
290 let bridge_dir = self.bridge_dir_path()?;
291 if self.bridge_notifier_started.swap(true, Ordering::SeqCst) {
292 return None;
293 }
294 Some((bridge_dir, self.bridge_notifier_alive.clone()))
295 }
296
297 pub fn register_drive_bridge(
298 &self,
299 window_label: Option<String>,
300 ) -> Result<DriveBridgeStatus, AuditaurError> {
301 let bridge_dir = self.bridge_dir()?;
302 ensure_bridge_dirs(&bridge_dir)?;
303 sweep_stale_bridge_files(&bridge_dir)?;
304 let now = now_unix_nanos();
305 let registered_at_unix_nanos = read_drive_bridge_status(&bridge_dir)
306 .ok()
307 .flatten()
308 .map(|status| status.registered_at_unix_nanos)
309 .unwrap_or(now);
310 let status = DriveBridgeStatus {
311 schema_version: 1,
312 protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
313 active: true,
314 window_label: window_label.clone(),
315 registered_at_unix_nanos,
316 last_heartbeat_unix_nanos: now,
317 targets: vec![auditaur_core::drive_bridge::DriveBridgeTarget {
318 target_id: "auditaur-bridge".to_string(),
319 title: "Auditaur in-app drive bridge".to_string(),
320 window_label,
321 active: true,
322 last_heartbeat_unix_nanos: now,
323 }],
324 };
325 write_drive_bridge_status(&bridge_dir, &status)?;
326 Ok(status)
327 }
328
329 pub fn poll_drive_bridge_request(
330 &self,
331 window_label: Option<String>,
332 ) -> Result<Option<DriveBridgeRequest>, AuditaurError> {
333 let bridge_dir = self.bridge_dir()?;
334 ensure_bridge_dirs(&bridge_dir)?;
335 sweep_stale_bridge_files(&bridge_dir)?;
336 let requests_dir = bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR);
337 let in_flight_dir = bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR);
338 tracing::debug!(
339 window_label = window_label.as_deref(),
340 requests_dir = %requests_dir.display(),
341 "Auditaur drive bridge poll started"
342 );
343 let Some(request_path) =
344 first_matching_request_file(&requests_dir, window_label.as_deref())?
345 else {
346 return Ok(None);
347 };
348 tracing::debug!(
349 window_label = window_label.as_deref(),
350 request_path = %request_path.display(),
351 "Auditaur drive bridge poll matched request"
352 );
353 let file_name = request_path
354 .file_name()
355 .ok_or_else(|| AuditaurError::new("drive bridge request path had no file name"))?;
356 let in_flight_path = in_flight_dir.join(file_name);
357 match fs::rename(&request_path, &in_flight_path) {
358 Ok(()) => {}
359 Err(error) if error.kind() == ErrorKind::NotFound => {
360 tracing::debug!(
361 request_path = %request_path.display(),
362 "Auditaur drive bridge request was already claimed"
363 );
364 return Ok(None);
365 }
366 Err(error) => return Err(error.into()),
367 }
368 let request = fs::read(&in_flight_path)?;
369 let request: DriveBridgeRequest = serde_json::from_slice(&request)?;
370 tracing::debug!(
371 action = request.action.as_str(),
372 request_id = request.request_id.as_str(),
373 selector = request.selector.as_deref(),
374 "Auditaur drive bridge poll returning request"
375 );
376 Ok(Some(request))
377 }
378
379 pub fn complete_drive_bridge_request(
380 &self,
381 response: DriveBridgeResponse,
382 ) -> Result<(), AuditaurError> {
383 let bridge_dir = self.bridge_dir()?;
384 ensure_bridge_dirs(&bridge_dir)?;
385 let response_path = bridge_dir
386 .join(DRIVE_BRIDGE_RESPONSES_DIR)
387 .join(format!("{}.json", safe_bridge_id(&response.request_id)?));
388 atomic_write_json(&response_path, &response)?;
389 let in_flight_path = bridge_dir
390 .join(DRIVE_BRIDGE_IN_FLIGHT_DIR)
391 .join(format!("{}.json", safe_bridge_id(&response.request_id)?));
392 if in_flight_path.exists() {
393 fs::remove_file(in_flight_path)?;
394 }
395 Ok(())
396 }
397
398 fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
399 auditaur_core::redaction::redact_json_with_options(
400 value,
401 self.redact_defaults,
402 &self.extra_redaction_keys,
403 )
404 .value
405 }
406
407 fn bridge_dir(&self) -> Result<PathBuf, AuditaurError> {
408 if !self.enabled {
409 return Err(AuditaurError::new("Auditaur drive bridge is disabled."));
410 }
411 self.bridge_dir
412 .clone()
413 .ok_or_else(|| AuditaurError::new("Auditaur drive bridge has no session directory."))
414 }
415}
416
417impl Drop for AuditaurState {
418 fn drop(&mut self) {
419 if let Some(alive) = &self.heartbeat_alive {
420 alive.store(false, Ordering::SeqCst);
421 }
422 self.bridge_notifier_alive.store(false, Ordering::SeqCst);
423 if let Some(session_id) = &self.session_id {
424 crate::tracing::clear_sink(session_id);
425 clear_panic_sink(session_id);
426 }
427 if let Some(path) = &self.discovery_path {
428 let _ = fs::remove_file(path);
429 }
430 }
431}
432
/// Default for `enabled` when the configuration leaves it unset: on in builds
/// with debug assertions, otherwise only when the `AUDITAUR` environment
/// variable is exactly `1`.
fn default_enabled() -> bool {
    if cfg!(debug_assertions) {
        return true;
    }
    matches!(std::env::var("AUDITAUR").as_deref(), Ok("1"))
}
436
437fn now_rfc3339() -> Result<String, AuditaurError> {
438 OffsetDateTime::now_utc()
439 .format(&Rfc3339)
440 .map_err(|error| AuditaurError::new(error.to_string()))
441}
442
443fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
444 atomic_write_json(path, discovery)?;
445 Ok(())
446}
447
448fn ensure_bridge_dirs(bridge_dir: &Path) -> Result<(), AuditaurError> {
449 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR))?;
450 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR))?;
451 fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_RESPONSES_DIR))?;
452 Ok(())
453}
454
455fn first_matching_request_file(
456 dir: &Path,
457 window_label: Option<&str>,
458) -> Result<Option<PathBuf>, AuditaurError> {
459 let mut paths = fs::read_dir(dir)?
460 .filter_map(Result::ok)
461 .map(|entry| entry.path())
462 .filter(|path| path.extension().and_then(|value| value.to_str()) == Some("json"))
463 .collect::<Vec<_>>();
464 paths.sort();
465 for path in paths {
466 let bytes = match fs::read(&path) {
467 Ok(bytes) => bytes,
468 Err(_) => continue,
469 };
470 let Ok(request) = serde_json::from_slice::<DriveBridgeRequest>(&bytes) else {
471 continue;
472 };
473 if request
474 .window_label
475 .as_deref()
476 .is_none_or(|requested| Some(requested) == window_label)
477 {
478 return Ok(Some(path));
479 }
480 }
481 Ok(None)
482}
483
484fn sweep_stale_bridge_files(bridge_dir: &Path) -> Result<(), AuditaurError> {
485 let now = now_unix_nanos();
486 for dirname in [
487 DRIVE_BRIDGE_REQUESTS_DIR,
488 DRIVE_BRIDGE_IN_FLIGHT_DIR,
489 DRIVE_BRIDGE_RESPONSES_DIR,
490 ] {
491 let dir = bridge_dir.join(dirname);
492 if !dir.exists() {
493 continue;
494 }
495 for entry in fs::read_dir(dir)?.filter_map(Result::ok) {
496 let path = entry.path();
497 if !is_bridge_json_or_temp_file(&path) {
498 continue;
499 }
500 let modified = entry
501 .metadata()
502 .and_then(|metadata| metadata.modified())
503 .ok()
504 .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
505 .and_then(|duration| i64::try_from(duration.as_nanos()).ok())
506 .unwrap_or(now);
507 if now.saturating_sub(modified) > DRIVE_BRIDGE_STALE_FILE_NANOS {
508 let _ = fs::remove_file(path);
509 }
510 }
511 }
512 Ok(())
513}
514
/// Reports whether `path` looks like a bridge payload (`*.json`) or a temp file
/// left by an atomic write (file name containing `.json.` and ending in `.tmp`).
fn is_bridge_json_or_temp_file(path: &Path) -> bool {
    let has_json_extension = matches!(
        path.extension().and_then(|ext| ext.to_str()),
        Some("json")
    );
    if has_json_extension {
        return true;
    }
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.ends_with(".tmp") && name.contains(".json."),
        None => false,
    }
}
523
524fn safe_bridge_id(id: &str) -> Result<&str, AuditaurError> {
525 if id.is_empty()
526 || !id
527 .chars()
528 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
529 {
530 return Err(AuditaurError::new(
531 "drive bridge request id contains invalid characters.",
532 ));
533 }
534 Ok(id)
535}
536
537fn read_drive_bridge_status(bridge_dir: &Path) -> Result<Option<DriveBridgeStatus>, AuditaurError> {
538 let status_path = bridge_dir.join(DRIVE_BRIDGE_STATUS_FILE);
539 if !status_path.exists() {
540 return Ok(None);
541 }
542 let bytes = fs::read(status_path)?;
543 serde_json::from_slice(&bytes).map(Some).map_err(Into::into)
544}
545
546fn write_drive_bridge_status(
547 bridge_dir: &Path,
548 status: &DriveBridgeStatus,
549) -> Result<(), AuditaurError> {
550 atomic_write_json(&bridge_dir.join(DRIVE_BRIDGE_STATUS_FILE), status)?;
551 Ok(())
552}
553
554fn atomic_write_json<T: serde::Serialize>(path: &Path, value: &T) -> Result<(), AuditaurError> {
555 let bytes = serde_json::to_vec_pretty(value)?;
556 atomic_write(path, &bytes)
557}
558
559fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), AuditaurError> {
560 let parent = path
561 .parent()
562 .ok_or_else(|| AuditaurError::new("atomic write target has no parent directory"))?;
563 fs::create_dir_all(parent)?;
564 let file_name = path
565 .file_name()
566 .and_then(|value| value.to_str())
567 .ok_or_else(|| AuditaurError::new("atomic write target has no UTF-8 file name"))?;
568 let temp_path = parent.join(format!(
569 ".{file_name}.{}.tmp",
570 now_unix_nanos().saturating_abs()
571 ));
572 fs::write(&temp_path, bytes)?;
573 fs::rename(temp_path, path)?;
574 Ok(())
575}
576
577fn start_heartbeat(
578 discovery_path: PathBuf,
579 mut discovery: DiscoveryFile,
580 alive: Arc<AtomicBool>,
581 interval: Duration,
582) {
583 thread::spawn(move || {
584 while alive.load(Ordering::SeqCst) {
585 thread::sleep(interval);
586 if !alive.load(Ordering::SeqCst) {
587 break;
588 }
589 if let Ok(timestamp) = now_rfc3339() {
590 discovery.last_heartbeat_at = timestamp;
591 let _ = write_discovery(&discovery_path, &discovery);
592 }
593 }
594 });
595}
596
597fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
598 if let Ok(mut sink) = PANIC_SINK.lock() {
599 *sink = Some(PanicSink { session_id, store });
600 }
601 if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
602 return;
603 }
604 let previous = panic::take_hook();
605 panic::set_hook(Box::new(move |info| {
606 record_panic(info);
607 previous(info);
608 }));
609}
610
611fn clear_panic_sink(session_id: &str) {
612 let Ok(mut sink) = PANIC_SINK.lock() else {
613 return;
614 };
615 if sink
616 .as_ref()
617 .map(|sink| sink.session_id.as_str() == session_id)
618 .unwrap_or(false)
619 {
620 *sink = None;
621 }
622}
623
624fn active_panic_sink() -> Option<PanicSink> {
625 PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
626}
627
628fn record_panic(info: &panic::PanicHookInfo<'_>) {
629 let Some(sink) = active_panic_sink() else {
630 return;
631 };
632 let Ok(store) = sink.store.try_lock() else {
633 return;
634 };
635 let message = panic_message(info);
636 let location = info.location().map(|location| {
637 format!(
638 "{}:{}:{}",
639 location.file(),
640 location.line(),
641 location.column()
642 )
643 });
644 let timestamp = now_unix_nanos();
645 let attributes = json!({
646 "auditaur.source": "panic_hook",
647 "exception.escaped": true,
648 "code.filepath": info.location().map(|location| location.file()),
649 "code.lineno": info.location().map(|location| location.line()),
650 "code.column": info.location().map(|location| location.column()),
651 });
652 let _ = store.insert_log(&LogRecord {
653 session_id: sink.session_id.clone(),
654 timestamp_unix_nanos: timestamp,
655 observed_timestamp_unix_nanos: None,
656 severity_text: Some("ERROR".to_string()),
657 severity_number: Some(17),
658 body: Some(format!("Rust panic: {message}")),
659 body_json: Some(json!({
660 "message": message,
661 "location": location,
662 })),
663 trace_id: None,
664 span_id: None,
665 scope_name: Some("panic".to_string()),
666 scope_version: None,
667 attributes: attributes.clone(),
668 source: TelemetrySource::Plugin,
669 });
670 let _ = store.insert_frontend_error(&FrontendError {
671 session_id: sink.session_id,
672 timestamp_unix_nanos: timestamp,
673 message,
674 stack: location,
675 filename: info.location().map(|location| location.file().to_string()),
676 line_number: info.location().map(|location| i64::from(location.line())),
677 column_number: info.location().map(|location| i64::from(location.column())),
678 error_type: Some("RustPanic".to_string()),
679 trace_id: None,
680 span_id: None,
681 window_label: None,
682 attributes,
683 });
684}
685
/// Extracts a printable message from a panic payload: `&str` and `String`
/// payloads are returned as-is, anything else yields a fixed placeholder.
fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
    let payload = info.payload();
    if let Some(text) = payload.downcast_ref::<&str>() {
        return (*text).to_string();
    }
    if let Some(text) = payload.downcast_ref::<String>() {
        return text.clone();
    }
    "panic payload was not a string".to_string()
}
693
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value too large for `i64`
/// saturates at `i64::MAX`.
fn now_unix_nanos() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    match i64::try_from(since_epoch.as_nanos()) {
        Ok(nanos) => nanos,
        Err(_) => i64::MAX,
    }
}
700
#[cfg(test)]
mod tests {
    //! Tests for `AuditaurState`. Every test holds the crate's global state lock
    //! because initialization touches process-wide sinks.

    use super::AuditaurState;
    use auditaur_collector::{
        exporter_sqlite::{SqliteStore, TELEMETRY_DATABASE_FILE},
        receiver::OTelBatch,
    };
    use auditaur_core::{
        drive_bridge::{
            DriveBridgeRequest, DriveBridgeResponse, DRIVE_BRIDGE_IN_FLIGHT_DIR,
            DRIVE_BRIDGE_PROTOCOL_VERSION, DRIVE_BRIDGE_REQUESTS_DIR, DRIVE_BRIDGE_RESPONSES_DIR,
            DRIVE_BRIDGE_STALE_FILE_NANOS,
        },
        model::{LogRecord, SpanEventRecord, SpanRecord, TauriEventRecord, TauriIpcCall},
        storage::{FrontendErrorQuery, SpanEventQuery},
        AuditaurConfig,
    };
    use serde_json::json;
    use std::sync::atomic::Ordering;
    use std::time::{Duration, SystemTime};
    use tempfile::TempDir;

    /// Initializes an enabled state rooted at `temp` with the given service name
    /// and detaches the tracing sink that initialization installed. Returns the
    /// state together with its session id.
    fn init_state(temp: &TempDir, service_name: &str) -> (AuditaurState, String) {
        let state = AuditaurState::initialize(
            AuditaurConfig {
                enabled: Some(true),
                service_name: Some(service_name.to_string()),
                data_dir: Some(temp.path().to_path_buf()),
                ..AuditaurConfig::default()
            },
            123,
            None,
        )
        .unwrap();
        let session_id = state.session_id.clone().unwrap();
        crate::tracing::clear_sink(&session_id);
        (state, session_id)
    }

    /// Opens the session database that initialization created under `temp`.
    fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
        let db = temp
            .path()
            .join("sessions")
            .join(session_id)
            .join(TELEMETRY_DATABASE_FILE);
        SqliteStore::open(db).unwrap()
    }

    #[test]
    fn initializes_session_database_and_discovery_file() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        // Built by hand: this test also sets a session name and app identifier.
        let state = AuditaurState::initialize(
            AuditaurConfig {
                enabled: Some(true),
                service_name: Some("plugin-test".to_string()),
                session_name: Some("plugin-session".to_string()),
                data_dir: Some(temp.path().to_path_buf()),
                ..AuditaurConfig::default()
            },
            123,
            Some("dev.auditaur.test".to_string()),
        )
        .unwrap();
        if let Some(session_id) = state.session_id.as_deref() {
            crate::tracing::clear_sink(session_id);
        }

        assert!(state.session_id.is_some());
        let session_id = state.session_id.as_deref().unwrap();
        let store = store_for(&temp, session_id);
        let session = store.get_session(session_id).unwrap().unwrap();
        assert_eq!(session.session_name.as_deref(), Some("plugin-session"));
        assert_eq!(
            temp.path().join("apps").read_dir().unwrap().count(),
            1,
            "discovery file should be written"
        );
    }

    #[test]
    fn exports_redacted_batch_to_sqlite() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, session_id) = init_state(&temp, "plugin-test");

        state
            .export_batch(OTelBatch {
                logs: vec![LogRecord {
                    session_id: String::new(),
                    timestamp_unix_nanos: 1,
                    observed_timestamp_unix_nanos: None,
                    severity_text: Some("INFO".to_string()),
                    severity_number: Some(9),
                    body: Some("hello".to_string()),
                    body_json: Some(json!({ "token": "secret" })),
                    trace_id: None,
                    span_id: None,
                    scope_name: None,
                    scope_version: None,
                    attributes: json!({ "api_key": "secret" }),
                    source: auditaur_core::model::TelemetrySource::Frontend,
                }],
                spans: vec![SpanRecord {
                    session_id: String::new(),
                    trace_id: "trace".to_string(),
                    span_id: "span".to_string(),
                    parent_span_id: None,
                    name: "agentive.run".to_string(),
                    kind: Some("internal".to_string()),
                    start_time_unix_nanos: 1,
                    end_time_unix_nanos: Some(4),
                    status_code: Some("OK".to_string()),
                    status_message: None,
                    scope_name: Some("agentive".to_string()),
                    scope_version: Some("0.2.1".to_string()),
                    attributes: json!({ "agentive.run_id": "run", "token": "secret" }),
                    source: auditaur_core::model::TelemetrySource::ThirdPartyOtel,
                }],
                span_events: vec![SpanEventRecord {
                    session_id: String::new(),
                    trace_id: "trace".to_string(),
                    span_id: "span".to_string(),
                    name: "agent-event".to_string(),
                    timestamp_unix_nanos: 2,
                    attributes: json!({ "token": "secret", "summary": "done" }),
                }],
                tauri_ipc_calls: vec![TauriIpcCall {
                    session_id: String::new(),
                    timestamp_unix_nanos: 2,
                    duration_ms: Some(1.0),
                    command: "save".to_string(),
                    status: "OK".to_string(),
                    error_message: None,
                    trace_id: Some("trace".to_string()),
                    span_id: Some("span".to_string()),
                    window_label: Some("main".to_string()),
                    args_json: Some(json!({ "password": "secret" })),
                    args_redacted: true,
                    result_summary: Some("ok".to_string()),
                }],
                tauri_events: vec![TauriEventRecord {
                    session_id: String::new(),
                    timestamp_unix_nanos: 3,
                    event_name: "save".to_string(),
                    direction: "emit".to_string(),
                    target: None,
                    trace_id: Some("trace".to_string()),
                    span_id: Some("event-span".to_string()),
                    window_label: Some("main".to_string()),
                    payload_summary: Some("payload".to_string()),
                    payload_json: Some(json!({ "token": "secret" })),
                    payload_redacted: true,
                }],
                ..OTelBatch::default()
            })
            .unwrap();

        let store = store_for(&temp, &session_id);
        let logs = store
            .list_logs(&auditaur_core::storage::LogQuery::default())
            .unwrap();

        let log = logs
            .iter()
            .find(|log| log.body.as_deref() == Some("hello"))
            .expect("exported frontend log should be present");
        assert_eq!(log.session_id, session_id);
        assert_eq!(log.attributes["api_key"], "[REDACTED]");
        assert_eq!(log.body_json.as_ref().unwrap()["token"], "[REDACTED]");

        let span_events = store.list_span_events(&SpanEventQuery::default()).unwrap();
        assert_eq!(span_events[0].session_id, session_id);
        assert_eq!(span_events[0].attributes["token"], "[REDACTED]");

        let ipc = store
            .list_tauri_ipc_calls(&auditaur_core::storage::TauriIpcQuery::default())
            .unwrap();
        assert_eq!(ipc[0].session_id, session_id);
        assert_eq!(ipc[0].args_json.as_ref().unwrap()["password"], "[REDACTED]");

        let events = store
            .list_tauri_events(&auditaur_core::storage::TauriEventQuery::default())
            .unwrap();
        assert_eq!(
            events[0].payload_json.as_ref().unwrap()["token"],
            "[REDACTED]"
        );
    }

    #[test]
    fn drive_bridge_register_poll_and_complete_round_trip() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, _session_id) = init_state(&temp, "bridge-test");

        let status = state
            .register_drive_bridge(Some("main".to_string()))
            .unwrap();
        assert!(status.active);
        assert_eq!(status.window_label.as_deref(), Some("main"));

        let bridge_dir = state.bridge_dir().unwrap();
        let request_file = bridge_dir
            .join(DRIVE_BRIDGE_REQUESTS_DIR)
            .join("request-1.json");
        let in_flight_file = bridge_dir
            .join(DRIVE_BRIDGE_IN_FLIGHT_DIR)
            .join("request-1.json");
        let response_file = bridge_dir
            .join(DRIVE_BRIDGE_RESPONSES_DIR)
            .join("request-1.json");

        let request = DriveBridgeRequest {
            schema_version: 1,
            protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
            request_id: "request-1".to_string(),
            action: "exists".to_string(),
            selector: Some("#ready".to_string()),
            value: None,
            visible_only: true,
            window_label: Some("main".to_string()),
            test_id: Some("test".to_string()),
            step_id: Some("step".to_string()),
            created_at_unix_nanos: 1,
        };
        std::fs::write(&request_file, serde_json::to_vec_pretty(&request).unwrap()).unwrap();

        let polled = state
            .poll_drive_bridge_request(Some("main".to_string()))
            .unwrap()
            .unwrap();
        assert_eq!(polled.request_id, "request-1");
        assert!(!request_file.exists());
        assert!(in_flight_file.exists());

        state
            .complete_drive_bridge_request(DriveBridgeResponse {
                schema_version: 1,
                protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
                request_id: "request-1".to_string(),
                action: "exists".to_string(),
                selector: Some("#ready".to_string()),
                visible_only: true,
                ok: true,
                payload: json!({ "exists": true }),
                error: None,
                completed_at_unix_nanos: 2,
            })
            .unwrap();

        assert!(!in_flight_file.exists());
        let response: DriveBridgeResponse =
            serde_json::from_slice(&std::fs::read(&response_file).unwrap()).unwrap();
        assert_eq!(response.payload["exists"], true);
    }

    #[test]
    fn drive_bridge_rejects_unsafe_response_request_id() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, _session_id) = init_state(&temp, "bridge-test");

        let error = state
            .complete_drive_bridge_request(DriveBridgeResponse {
                schema_version: 1,
                protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
                request_id: "../escape".to_string(),
                action: "exists".to_string(),
                selector: None,
                visible_only: false,
                ok: false,
                payload: json!({ "ok": false }),
                error: Some("bad".to_string()),
                completed_at_unix_nanos: 2,
            })
            .unwrap_err();
        assert!(error.to_string().contains("invalid characters"));
    }

    #[test]
    fn drive_bridge_notifier_starts_once_and_stops_with_state() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, _session_id) = init_state(&temp, "bridge-test");

        state
            .register_drive_bridge(Some("main".to_string()))
            .unwrap();
        let first = state.start_bridge_notifier_if_needed();
        assert!(first.is_some(), "first bridge registration starts notifier");
        let (_, alive) = first.unwrap();
        assert!(alive.load(Ordering::SeqCst));
        assert!(
            state.start_bridge_notifier_if_needed().is_none(),
            "notifier startup is idempotent"
        );
        drop(state);
        assert!(
            !alive.load(Ordering::SeqCst),
            "notifier lifetime flag stops with plugin state"
        );
    }

    #[test]
    fn drive_bridge_sweeps_stale_atomic_temp_files() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, _session_id) = init_state(&temp, "bridge-test");

        let bridge_dir = state.bridge_dir().unwrap();
        let stale_temp = bridge_dir
            .join(DRIVE_BRIDGE_REQUESTS_DIR)
            .join("request-1.json.123.tmp");
        std::fs::write(&stale_temp, b"partial").unwrap();
        // Backdate the file to just past the staleness threshold.
        std::fs::OpenOptions::new()
            .write(true)
            .open(&stale_temp)
            .unwrap()
            .set_modified(
                SystemTime::now()
                    - Duration::from_nanos(
                        u64::try_from(DRIVE_BRIDGE_STALE_FILE_NANOS).unwrap() + 1_000_000,
                    ),
            )
            .unwrap();

        state
            .register_drive_bridge(Some("main".to_string()))
            .unwrap();
        assert!(
            !stale_temp.exists(),
            "stale atomic-write temp files should be reclaimed"
        );
    }

    #[test]
    fn panic_hook_records_and_clears_with_state() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let (state, session_id) = init_state(&temp, "panic-test");

        let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));

        let store = store_for(&temp, &session_id);
        let errors = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].error_type.as_deref(), Some("RustPanic"));
        assert_eq!(errors[0].message, "intentional auditaur panic test");

        // Once the state is gone the sink is cleared, so nothing new is recorded.
        drop(state);
        let _ = std::panic::catch_unwind(|| panic!("after drop"));
        let errors_after_drop = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(errors_after_drop.len(), 1);
    }
}