Skip to main content

tauri_plugin_auditaur/
state.rs

1use std::{
2    fs,
3    io::ErrorKind,
4    panic,
5    path::{Path, PathBuf},
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc, Mutex,
9    },
10    thread,
11    time::Duration,
12};
13
14use auditaur_collector::{
15    exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
16    receiver::OTelBatch,
17    retention::RetentionLimits,
18};
19use auditaur_core::{
20    discovery::DiscoveryFile,
21    drive_bridge::{
22        DriveBridgeRequest, DriveBridgeResponse, DriveBridgeStatus, DRIVE_BRIDGE_CAPABILITY,
23        DRIVE_BRIDGE_DIR, DRIVE_BRIDGE_IN_FLIGHT_DIR, DRIVE_BRIDGE_PROTOCOL_VERSION,
24        DRIVE_BRIDGE_REQUESTS_DIR, DRIVE_BRIDGE_RESPONSES_DIR, DRIVE_BRIDGE_STALE_FILE_NANOS,
25        DRIVE_BRIDGE_STATUS_FILE,
26    },
27    model::{FrontendError, LogRecord, Session, TelemetrySource},
28    AuditaurConfig,
29};
30use serde_json::json;
31use time::{format_description::well_known::Rfc3339, OffsetDateTime};
32use uuid::Uuid;
33
34use crate::error::AuditaurError;
35
36static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
37static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
38
39#[derive(Clone)]
40struct PanicSink {
41    session_id: String,
42    store: Arc<Mutex<SqliteStore>>,
43}
44
45pub struct AuditaurState {
46    pub session_id: Option<String>,
47    enabled: bool,
48    store: Option<Arc<Mutex<SqliteStore>>>,
49    discovery_path: Option<PathBuf>,
50    bridge_dir: Option<PathBuf>,
51    heartbeat_alive: Option<Arc<AtomicBool>>,
52    bridge_notifier_started: Arc<AtomicBool>,
53    bridge_notifier_alive: Arc<AtomicBool>,
54    redact_defaults: bool,
55    extra_redaction_keys: Vec<String>,
56    retention_limits: RetentionLimits,
57}
58
59impl AuditaurState {
60    pub fn initialize(
61        config: AuditaurConfig,
62        pid: u32,
63        app_identifier: Option<String>,
64    ) -> Result<Self, AuditaurError> {
65        let enabled = config.enabled.unwrap_or_else(default_enabled);
66        if !enabled {
67            return Ok(Self {
68                session_id: None,
69                enabled: false,
70                store: None,
71                discovery_path: None,
72                bridge_dir: None,
73                heartbeat_alive: None,
74                bridge_notifier_started: Arc::new(AtomicBool::new(false)),
75                bridge_notifier_alive: Arc::new(AtomicBool::new(false)),
76                redact_defaults: config.redact_defaults,
77                extra_redaction_keys: config.extra_redaction_keys,
78                retention_limits: RetentionLimits::default(),
79            });
80        }
81
82        if !cfg!(debug_assertions) && !config.allow_release_builds {
83            return Err(AuditaurError::new(
84                "Auditaur is disabled in release builds unless allow_release_builds is true.",
85            ));
86        }
87
88        let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
89            .map_err(|error| AuditaurError::new(error.to_string()))?;
90        let session_id = Uuid::new_v4().to_string();
91        let instance_id = Uuid::new_v4().to_string();
92        let session_dir = data_dir.join("sessions").join(&session_id);
93        let bridge_dir = session_dir.join(DRIVE_BRIDGE_DIR);
94        let apps_dir = data_dir.join("apps");
95        fs::create_dir_all(&session_dir)?;
96        fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR))?;
97        fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR))?;
98        fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_RESPONSES_DIR))?;
99        fs::create_dir_all(&apps_dir)?;
100
101        let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
102        let store = SqliteStore::open(&database_path)?;
103        store.migrate()?;
104
105        let service_name = config
106            .service_name
107            .clone()
108            .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
109            .unwrap_or_else(|| "tauri-app".to_string());
110        let service_version = config.service_version.clone();
111        let started_at = now_rfc3339()?;
112        store.create_session(&Session {
113            id: session_id.clone(),
114            session_name: config.session_name.clone(),
115            service_name: service_name.clone(),
116            service_version: service_version.clone(),
117            app_identifier: app_identifier.clone(),
118            pid: Some(i64::from(pid)),
119            started_at: started_at.clone(),
120            ended_at: None,
121            schema_version: SQLITE_SCHEMA_VERSION,
122            auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
123        })?;
124
125        let discovery_path = apps_dir.join(format!("{instance_id}.json"));
126        let discovery = DiscoveryFile {
127            schema_version: 1,
128            instance_id,
129            session_id: session_id.clone(),
130            service_name,
131            service_version,
132            app_identifier,
133            pid,
134            started_at,
135            database_path: database_path.to_string_lossy().to_string(),
136            capabilities: vec![
137                "logs".to_string(),
138                "traces".to_string(),
139                "frontend_errors".to_string(),
140                "ipc".to_string(),
141                "events".to_string(),
142                "windows".to_string(),
143                DRIVE_BRIDGE_CAPABILITY.to_string(),
144            ],
145            last_heartbeat_at: now_rfc3339()?,
146        };
147        write_discovery(&discovery_path, &discovery)?;
148
149        let heartbeat_alive = Arc::new(AtomicBool::new(true));
150        let bridge_notifier_started = Arc::new(AtomicBool::new(false));
151        let bridge_notifier_alive = Arc::new(AtomicBool::new(true));
152        start_heartbeat(
153            discovery_path.clone(),
154            discovery,
155            heartbeat_alive.clone(),
156            Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
157        );
158        let store = Arc::new(Mutex::new(store));
159        crate::tracing::install_sink(session_id.clone(), store.clone());
160        install_panic_sink(session_id.clone(), store.clone());
161
162        Ok(Self {
163            session_id: Some(session_id),
164            enabled: true,
165            store: Some(store),
166            discovery_path: Some(discovery_path),
167            bridge_dir: Some(bridge_dir),
168            heartbeat_alive: Some(heartbeat_alive),
169            bridge_notifier_started,
170            bridge_notifier_alive,
171            redact_defaults: config.redact_defaults,
172            extra_redaction_keys: config.extra_redaction_keys,
173            retention_limits: RetentionLimits {
174                max_session_bytes: config.max_session_bytes,
175                ..RetentionLimits::default()
176            },
177        })
178    }
179
180    pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
181        if !self.enabled {
182            return Ok(());
183        }
184
185        let Some(store) = &self.store else {
186            return Err(AuditaurError::new(
187                "Auditaur is enabled without an initialized store.",
188            ));
189        };
190        let session_id = self
191            .session_id
192            .as_deref()
193            .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
194        let store = store
195            .lock()
196            .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
197
198        for mut span in batch.spans {
199            if span.session_id.is_empty() {
200                span.session_id = session_id.to_string();
201            }
202            span.attributes = self.redact_value(&span.attributes);
203            store.insert_span(&span)?;
204        }
205
206        for mut event in batch.span_events {
207            if event.session_id.is_empty() {
208                event.session_id = session_id.to_string();
209            }
210            event.attributes = self.redact_value(&event.attributes);
211            store.insert_span_event(&event)?;
212        }
213
214        for mut log in batch.logs {
215            if log.session_id.is_empty() {
216                log.session_id = session_id.to_string();
217            }
218            log.attributes = self.redact_value(&log.attributes);
219            log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
220            store.insert_log(&log)?;
221        }
222
223        for mut error in batch.frontend_errors {
224            if error.session_id.is_empty() {
225                error.session_id = session_id.to_string();
226            }
227            error.attributes = self.redact_value(&error.attributes);
228            store.insert_frontend_error(&error)?;
229        }
230
231        for mut call in batch.tauri_ipc_calls {
232            if call.session_id.is_empty() {
233                call.session_id = session_id.to_string();
234            }
235            if let Some(args_json) = &call.args_json {
236                let outcome = auditaur_core::redaction::redact_json_with_options(
237                    args_json,
238                    self.redact_defaults,
239                    &self.extra_redaction_keys,
240                );
241                call.args_json = Some(outcome.value);
242                call.args_redacted = outcome.redacted;
243            } else {
244                call.args_redacted = false;
245            }
246            store.insert_tauri_ipc_call(&call)?;
247        }
248
249        for mut event in batch.tauri_events {
250            if event.session_id.is_empty() {
251                event.session_id = session_id.to_string();
252            }
253            if let Some(payload_json) = &event.payload_json {
254                let outcome = auditaur_core::redaction::redact_json_with_options(
255                    payload_json,
256                    self.redact_defaults,
257                    &self.extra_redaction_keys,
258                );
259                event.payload_json = Some(outcome.value);
260                event.payload_redacted = outcome.redacted;
261            } else {
262                event.payload_redacted = false;
263            }
264            store.insert_tauri_event(&event)?;
265        }
266
267        store.enforce_retention(self.retention_limits)?;
268
269        Ok(())
270    }
271
272    pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
273        match (&self.session_id, &self.store) {
274            (Some(session_id), Some(store)) => {
275                crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
276            }
277            _ => crate::tracing::tracing_layer(),
278        }
279    }
280
281    pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
282        self.store.clone()
283    }
284
285    pub(crate) fn bridge_dir_path(&self) -> Option<PathBuf> {
286        self.bridge_dir.clone()
287    }
288
289    pub(crate) fn start_bridge_notifier_if_needed(&self) -> Option<(PathBuf, Arc<AtomicBool>)> {
290        let bridge_dir = self.bridge_dir_path()?;
291        if self.bridge_notifier_started.swap(true, Ordering::SeqCst) {
292            return None;
293        }
294        Some((bridge_dir, self.bridge_notifier_alive.clone()))
295    }
296
297    pub fn register_drive_bridge(
298        &self,
299        window_label: Option<String>,
300    ) -> Result<DriveBridgeStatus, AuditaurError> {
301        let bridge_dir = self.bridge_dir()?;
302        ensure_bridge_dirs(&bridge_dir)?;
303        sweep_stale_bridge_files(&bridge_dir)?;
304        let now = now_unix_nanos();
305        let registered_at_unix_nanos = read_drive_bridge_status(&bridge_dir)
306            .ok()
307            .flatten()
308            .map(|status| status.registered_at_unix_nanos)
309            .unwrap_or(now);
310        let status = DriveBridgeStatus {
311            schema_version: 1,
312            protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
313            active: true,
314            window_label: window_label.clone(),
315            registered_at_unix_nanos,
316            last_heartbeat_unix_nanos: now,
317            targets: vec![auditaur_core::drive_bridge::DriveBridgeTarget {
318                target_id: "auditaur-bridge".to_string(),
319                title: "Auditaur in-app drive bridge".to_string(),
320                window_label,
321                active: true,
322                last_heartbeat_unix_nanos: now,
323            }],
324        };
325        write_drive_bridge_status(&bridge_dir, &status)?;
326        Ok(status)
327    }
328
329    pub fn poll_drive_bridge_request(
330        &self,
331        window_label: Option<String>,
332    ) -> Result<Option<DriveBridgeRequest>, AuditaurError> {
333        let bridge_dir = self.bridge_dir()?;
334        ensure_bridge_dirs(&bridge_dir)?;
335        sweep_stale_bridge_files(&bridge_dir)?;
336        let requests_dir = bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR);
337        let in_flight_dir = bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR);
338        tracing::debug!(
339            window_label = window_label.as_deref(),
340            requests_dir = %requests_dir.display(),
341            "Auditaur drive bridge poll started"
342        );
343        let Some(request_path) =
344            first_matching_request_file(&requests_dir, window_label.as_deref())?
345        else {
346            return Ok(None);
347        };
348        tracing::debug!(
349            window_label = window_label.as_deref(),
350            request_path = %request_path.display(),
351            "Auditaur drive bridge poll matched request"
352        );
353        let file_name = request_path
354            .file_name()
355            .ok_or_else(|| AuditaurError::new("drive bridge request path had no file name"))?;
356        let in_flight_path = in_flight_dir.join(file_name);
357        match fs::rename(&request_path, &in_flight_path) {
358            Ok(()) => {}
359            Err(error) if error.kind() == ErrorKind::NotFound => {
360                tracing::debug!(
361                    request_path = %request_path.display(),
362                    "Auditaur drive bridge request was already claimed"
363                );
364                return Ok(None);
365            }
366            Err(error) => return Err(error.into()),
367        }
368        let request = fs::read(&in_flight_path)?;
369        let request: DriveBridgeRequest = serde_json::from_slice(&request)?;
370        tracing::debug!(
371            action = request.action.as_str(),
372            request_id = request.request_id.as_str(),
373            selector = request.selector.as_deref(),
374            "Auditaur drive bridge poll returning request"
375        );
376        Ok(Some(request))
377    }
378
379    pub fn complete_drive_bridge_request(
380        &self,
381        response: DriveBridgeResponse,
382    ) -> Result<(), AuditaurError> {
383        let bridge_dir = self.bridge_dir()?;
384        ensure_bridge_dirs(&bridge_dir)?;
385        let response_path = bridge_dir
386            .join(DRIVE_BRIDGE_RESPONSES_DIR)
387            .join(format!("{}.json", safe_bridge_id(&response.request_id)?));
388        atomic_write_json(&response_path, &response)?;
389        let in_flight_path = bridge_dir
390            .join(DRIVE_BRIDGE_IN_FLIGHT_DIR)
391            .join(format!("{}.json", safe_bridge_id(&response.request_id)?));
392        if in_flight_path.exists() {
393            fs::remove_file(in_flight_path)?;
394        }
395        Ok(())
396    }
397
398    fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
399        auditaur_core::redaction::redact_json_with_options(
400            value,
401            self.redact_defaults,
402            &self.extra_redaction_keys,
403        )
404        .value
405    }
406
407    fn bridge_dir(&self) -> Result<PathBuf, AuditaurError> {
408        if !self.enabled {
409            return Err(AuditaurError::new("Auditaur drive bridge is disabled."));
410        }
411        self.bridge_dir
412            .clone()
413            .ok_or_else(|| AuditaurError::new("Auditaur drive bridge has no session directory."))
414    }
415}
416
417impl Drop for AuditaurState {
418    fn drop(&mut self) {
419        if let Some(alive) = &self.heartbeat_alive {
420            alive.store(false, Ordering::SeqCst);
421        }
422        self.bridge_notifier_alive.store(false, Ordering::SeqCst);
423        if let Some(session_id) = &self.session_id {
424            crate::tracing::clear_sink(session_id);
425            clear_panic_sink(session_id);
426        }
427        if let Some(path) = &self.discovery_path {
428            let _ = fs::remove_file(path);
429        }
430    }
431}
432
433fn default_enabled() -> bool {
434    cfg!(debug_assertions) || std::env::var("AUDITAUR").ok().as_deref() == Some("1")
435}
436
437fn now_rfc3339() -> Result<String, AuditaurError> {
438    OffsetDateTime::now_utc()
439        .format(&Rfc3339)
440        .map_err(|error| AuditaurError::new(error.to_string()))
441}
442
443fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
444    atomic_write_json(path, discovery)?;
445    Ok(())
446}
447
448fn ensure_bridge_dirs(bridge_dir: &Path) -> Result<(), AuditaurError> {
449    fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_REQUESTS_DIR))?;
450    fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_IN_FLIGHT_DIR))?;
451    fs::create_dir_all(bridge_dir.join(DRIVE_BRIDGE_RESPONSES_DIR))?;
452    Ok(())
453}
454
455fn first_matching_request_file(
456    dir: &Path,
457    window_label: Option<&str>,
458) -> Result<Option<PathBuf>, AuditaurError> {
459    let mut paths = fs::read_dir(dir)?
460        .filter_map(Result::ok)
461        .map(|entry| entry.path())
462        .filter(|path| path.extension().and_then(|value| value.to_str()) == Some("json"))
463        .collect::<Vec<_>>();
464    paths.sort();
465    for path in paths {
466        let bytes = match fs::read(&path) {
467            Ok(bytes) => bytes,
468            Err(_) => continue,
469        };
470        let Ok(request) = serde_json::from_slice::<DriveBridgeRequest>(&bytes) else {
471            continue;
472        };
473        if request
474            .window_label
475            .as_deref()
476            .is_none_or(|requested| Some(requested) == window_label)
477        {
478            return Ok(Some(path));
479        }
480    }
481    Ok(None)
482}
483
484fn sweep_stale_bridge_files(bridge_dir: &Path) -> Result<(), AuditaurError> {
485    let now = now_unix_nanos();
486    for dirname in [
487        DRIVE_BRIDGE_REQUESTS_DIR,
488        DRIVE_BRIDGE_IN_FLIGHT_DIR,
489        DRIVE_BRIDGE_RESPONSES_DIR,
490    ] {
491        let dir = bridge_dir.join(dirname);
492        if !dir.exists() {
493            continue;
494        }
495        for entry in fs::read_dir(dir)?.filter_map(Result::ok) {
496            let path = entry.path();
497            if !is_bridge_json_or_temp_file(&path) {
498                continue;
499            }
500            let modified = entry
501                .metadata()
502                .and_then(|metadata| metadata.modified())
503                .ok()
504                .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
505                .and_then(|duration| i64::try_from(duration.as_nanos()).ok())
506                .unwrap_or(now);
507            if now.saturating_sub(modified) > DRIVE_BRIDGE_STALE_FILE_NANOS {
508                let _ = fs::remove_file(path);
509            }
510        }
511    }
512    Ok(())
513}
514
515fn is_bridge_json_or_temp_file(path: &Path) -> bool {
516    if path.extension().and_then(|value| value.to_str()) == Some("json") {
517        return true;
518    }
519    path.file_name()
520        .and_then(|value| value.to_str())
521        .is_some_and(|name| name.contains(".json.") && name.ends_with(".tmp"))
522}
523
524fn safe_bridge_id(id: &str) -> Result<&str, AuditaurError> {
525    if id.is_empty()
526        || !id
527            .chars()
528            .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
529    {
530        return Err(AuditaurError::new(
531            "drive bridge request id contains invalid characters.",
532        ));
533    }
534    Ok(id)
535}
536
537fn read_drive_bridge_status(bridge_dir: &Path) -> Result<Option<DriveBridgeStatus>, AuditaurError> {
538    let status_path = bridge_dir.join(DRIVE_BRIDGE_STATUS_FILE);
539    if !status_path.exists() {
540        return Ok(None);
541    }
542    let bytes = fs::read(status_path)?;
543    serde_json::from_slice(&bytes).map(Some).map_err(Into::into)
544}
545
546fn write_drive_bridge_status(
547    bridge_dir: &Path,
548    status: &DriveBridgeStatus,
549) -> Result<(), AuditaurError> {
550    atomic_write_json(&bridge_dir.join(DRIVE_BRIDGE_STATUS_FILE), status)?;
551    Ok(())
552}
553
554fn atomic_write_json<T: serde::Serialize>(path: &Path, value: &T) -> Result<(), AuditaurError> {
555    let bytes = serde_json::to_vec_pretty(value)?;
556    atomic_write(path, &bytes)
557}
558
559fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), AuditaurError> {
560    let parent = path
561        .parent()
562        .ok_or_else(|| AuditaurError::new("atomic write target has no parent directory"))?;
563    fs::create_dir_all(parent)?;
564    let file_name = path
565        .file_name()
566        .and_then(|value| value.to_str())
567        .ok_or_else(|| AuditaurError::new("atomic write target has no UTF-8 file name"))?;
568    let temp_path = parent.join(format!(
569        ".{file_name}.{}.tmp",
570        now_unix_nanos().saturating_abs()
571    ));
572    fs::write(&temp_path, bytes)?;
573    fs::rename(temp_path, path)?;
574    Ok(())
575}
576
577fn start_heartbeat(
578    discovery_path: PathBuf,
579    mut discovery: DiscoveryFile,
580    alive: Arc<AtomicBool>,
581    interval: Duration,
582) {
583    thread::spawn(move || {
584        while alive.load(Ordering::SeqCst) {
585            thread::sleep(interval);
586            if !alive.load(Ordering::SeqCst) {
587                break;
588            }
589            if let Ok(timestamp) = now_rfc3339() {
590                discovery.last_heartbeat_at = timestamp;
591                let _ = write_discovery(&discovery_path, &discovery);
592            }
593        }
594    });
595}
596
597fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
598    if let Ok(mut sink) = PANIC_SINK.lock() {
599        *sink = Some(PanicSink { session_id, store });
600    }
601    if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
602        return;
603    }
604    let previous = panic::take_hook();
605    panic::set_hook(Box::new(move |info| {
606        record_panic(info);
607        previous(info);
608    }));
609}
610
611fn clear_panic_sink(session_id: &str) {
612    let Ok(mut sink) = PANIC_SINK.lock() else {
613        return;
614    };
615    if sink
616        .as_ref()
617        .map(|sink| sink.session_id.as_str() == session_id)
618        .unwrap_or(false)
619    {
620        *sink = None;
621    }
622}
623
624fn active_panic_sink() -> Option<PanicSink> {
625    PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
626}
627
628fn record_panic(info: &panic::PanicHookInfo<'_>) {
629    let Some(sink) = active_panic_sink() else {
630        return;
631    };
632    let Ok(store) = sink.store.try_lock() else {
633        return;
634    };
635    let message = panic_message(info);
636    let location = info.location().map(|location| {
637        format!(
638            "{}:{}:{}",
639            location.file(),
640            location.line(),
641            location.column()
642        )
643    });
644    let timestamp = now_unix_nanos();
645    let attributes = json!({
646        "auditaur.source": "panic_hook",
647        "exception.escaped": true,
648        "code.filepath": info.location().map(|location| location.file()),
649        "code.lineno": info.location().map(|location| location.line()),
650        "code.column": info.location().map(|location| location.column()),
651    });
652    let _ = store.insert_log(&LogRecord {
653        session_id: sink.session_id.clone(),
654        timestamp_unix_nanos: timestamp,
655        observed_timestamp_unix_nanos: None,
656        severity_text: Some("ERROR".to_string()),
657        severity_number: Some(17),
658        body: Some(format!("Rust panic: {message}")),
659        body_json: Some(json!({
660            "message": message,
661            "location": location,
662        })),
663        trace_id: None,
664        span_id: None,
665        scope_name: Some("panic".to_string()),
666        scope_version: None,
667        attributes: attributes.clone(),
668        source: TelemetrySource::Plugin,
669    });
670    let _ = store.insert_frontend_error(&FrontendError {
671        session_id: sink.session_id,
672        timestamp_unix_nanos: timestamp,
673        message,
674        stack: location,
675        filename: info.location().map(|location| location.file().to_string()),
676        line_number: info.location().map(|location| i64::from(location.line())),
677        column_number: info.location().map(|location| i64::from(location.column())),
678        error_type: Some("RustPanic".to_string()),
679        trace_id: None,
680        span_id: None,
681        window_label: None,
682        attributes,
683    });
684}
685
686fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
687    info.payload()
688        .downcast_ref::<&str>()
689        .map(|message| (*message).to_string())
690        .or_else(|| info.payload().downcast_ref::<String>().cloned())
691        .unwrap_or_else(|| "panic payload was not a string".to_string())
692}
693
694fn now_unix_nanos() -> i64 {
695    let now = std::time::SystemTime::now()
696        .duration_since(std::time::UNIX_EPOCH)
697        .unwrap_or_default();
698    i64::try_from(now.as_nanos()).unwrap_or(i64::MAX)
699}
700
701#[cfg(test)]
702mod tests {
703    use super::AuditaurState;
704    use auditaur_collector::{exporter_sqlite::SqliteStore, receiver::OTelBatch};
705    use auditaur_core::{
706        drive_bridge::{
707            DriveBridgeRequest, DriveBridgeResponse, DRIVE_BRIDGE_IN_FLIGHT_DIR,
708            DRIVE_BRIDGE_PROTOCOL_VERSION, DRIVE_BRIDGE_REQUESTS_DIR, DRIVE_BRIDGE_RESPONSES_DIR,
709            DRIVE_BRIDGE_STALE_FILE_NANOS,
710        },
711        model::{LogRecord, SpanEventRecord, SpanRecord, TauriEventRecord, TauriIpcCall},
712        storage::{FrontendErrorQuery, SpanEventQuery},
713        AuditaurConfig,
714    };
715    use serde_json::json;
716    use std::sync::atomic::Ordering;
717    use std::time::{Duration, SystemTime};
718    use tempfile::TempDir;
719
720    #[test]
721    fn initializes_session_database_and_discovery_file() {
722        let _guard = crate::test_support::global_state_lock();
723        let temp = TempDir::new().unwrap();
724        let state = AuditaurState::initialize(
725            AuditaurConfig {
726                enabled: Some(true),
727                service_name: Some("plugin-test".to_string()),
728                session_name: Some("plugin-session".to_string()),
729                data_dir: Some(temp.path().to_path_buf()),
730                ..AuditaurConfig::default()
731            },
732            123,
733            Some("dev.auditaur.test".to_string()),
734        )
735        .unwrap();
736        if let Some(session_id) = state.session_id.as_deref() {
737            crate::tracing::clear_sink(session_id);
738        }
739
740        assert!(state.session_id.is_some());
741        let db = temp
742            .path()
743            .join("sessions")
744            .join(state.session_id.as_ref().unwrap())
745            .join("telemetry.sqlite");
746        let store = SqliteStore::open(db).unwrap();
747        let session = store
748            .get_session(state.session_id.as_ref().unwrap())
749            .unwrap()
750            .unwrap();
751        assert_eq!(session.session_name.as_deref(), Some("plugin-session"));
752        assert_eq!(
753            temp.path().join("apps").read_dir().unwrap().count(),
754            1,
755            "discovery file should be written"
756        );
757    }
758
759    #[test]
760    fn exports_redacted_batch_to_sqlite() {
761        let _guard = crate::test_support::global_state_lock();
762        let temp = TempDir::new().unwrap();
763        let state = AuditaurState::initialize(
764            AuditaurConfig {
765                enabled: Some(true),
766                service_name: Some("plugin-test".to_string()),
767                data_dir: Some(temp.path().to_path_buf()),
768                ..AuditaurConfig::default()
769            },
770            123,
771            None,
772        )
773        .unwrap();
774        let session_id = state.session_id.clone().unwrap();
775        crate::tracing::clear_sink(&session_id);
776
777        state
778            .export_batch(OTelBatch {
779                logs: vec![LogRecord {
780                    session_id: String::new(),
781                    timestamp_unix_nanos: 1,
782                    observed_timestamp_unix_nanos: None,
783                    severity_text: Some("INFO".to_string()),
784                    severity_number: Some(9),
785                    body: Some("hello".to_string()),
786                    body_json: Some(json!({ "token": "secret" })),
787                    trace_id: None,
788                    span_id: None,
789                    scope_name: None,
790                    scope_version: None,
791                    attributes: json!({ "api_key": "secret" }),
792                    source: auditaur_core::model::TelemetrySource::Frontend,
793                }],
794                spans: vec![SpanRecord {
795                    session_id: String::new(),
796                    trace_id: "trace".to_string(),
797                    span_id: "span".to_string(),
798                    parent_span_id: None,
799                    name: "agentive.run".to_string(),
800                    kind: Some("internal".to_string()),
801                    start_time_unix_nanos: 1,
802                    end_time_unix_nanos: Some(4),
803                    status_code: Some("OK".to_string()),
804                    status_message: None,
805                    scope_name: Some("agentive".to_string()),
806                    scope_version: Some("0.2.1".to_string()),
807                    attributes: json!({ "agentive.run_id": "run", "token": "secret" }),
808                    source: auditaur_core::model::TelemetrySource::ThirdPartyOtel,
809                }],
810                span_events: vec![SpanEventRecord {
811                    session_id: String::new(),
812                    trace_id: "trace".to_string(),
813                    span_id: "span".to_string(),
814                    name: "agent-event".to_string(),
815                    timestamp_unix_nanos: 2,
816                    attributes: json!({ "token": "secret", "summary": "done" }),
817                }],
818                tauri_ipc_calls: vec![TauriIpcCall {
819                    session_id: String::new(),
820                    timestamp_unix_nanos: 2,
821                    duration_ms: Some(1.0),
822                    command: "save".to_string(),
823                    status: "OK".to_string(),
824                    error_message: None,
825                    trace_id: Some("trace".to_string()),
826                    span_id: Some("span".to_string()),
827                    window_label: Some("main".to_string()),
828                    args_json: Some(json!({ "password": "secret" })),
829                    args_redacted: true,
830                    result_summary: Some("ok".to_string()),
831                }],
832                tauri_events: vec![TauriEventRecord {
833                    session_id: String::new(),
834                    timestamp_unix_nanos: 3,
835                    event_name: "save".to_string(),
836                    direction: "emit".to_string(),
837                    target: None,
838                    trace_id: Some("trace".to_string()),
839                    span_id: Some("event-span".to_string()),
840                    window_label: Some("main".to_string()),
841                    payload_summary: Some("payload".to_string()),
842                    payload_json: Some(json!({ "token": "secret" })),
843                    payload_redacted: true,
844                }],
845                ..OTelBatch::default()
846            })
847            .unwrap();
848
849        let db = temp
850            .path()
851            .join("sessions")
852            .join(&session_id)
853            .join("telemetry.sqlite");
854        let store = SqliteStore::open(db).unwrap();
855        let logs = store
856            .list_logs(&auditaur_core::storage::LogQuery::default())
857            .unwrap();
858
859        let log = logs
860            .iter()
861            .find(|log| log.body.as_deref() == Some("hello"))
862            .expect("exported frontend log should be present");
863        assert_eq!(log.session_id, session_id);
864        assert_eq!(log.attributes["api_key"], "[REDACTED]");
865        assert_eq!(log.body_json.as_ref().unwrap()["token"], "[REDACTED]");
866
867        let span_events = store.list_span_events(&SpanEventQuery::default()).unwrap();
868        assert_eq!(span_events[0].session_id, session_id);
869        assert_eq!(span_events[0].attributes["token"], "[REDACTED]");
870
871        let ipc = store
872            .list_tauri_ipc_calls(&auditaur_core::storage::TauriIpcQuery::default())
873            .unwrap();
874        assert_eq!(ipc[0].session_id, session_id);
875        assert_eq!(ipc[0].args_json.as_ref().unwrap()["password"], "[REDACTED]");
876
877        let events = store
878            .list_tauri_events(&auditaur_core::storage::TauriEventQuery::default())
879            .unwrap();
880        assert_eq!(
881            events[0].payload_json.as_ref().unwrap()["token"],
882            "[REDACTED]"
883        );
884    }
885
886    #[test]
887    fn drive_bridge_register_poll_and_complete_round_trip() {
888        let _guard = crate::test_support::global_state_lock();
889        let temp = TempDir::new().unwrap();
890        let state = AuditaurState::initialize(
891            AuditaurConfig {
892                enabled: Some(true),
893                service_name: Some("bridge-test".to_string()),
894                data_dir: Some(temp.path().to_path_buf()),
895                ..AuditaurConfig::default()
896            },
897            123,
898            None,
899        )
900        .unwrap();
901        let session_id = state.session_id.clone().unwrap();
902        crate::tracing::clear_sink(&session_id);
903
904        let status = state
905            .register_drive_bridge(Some("main".to_string()))
906            .unwrap();
907        assert!(status.active);
908        assert_eq!(status.window_label.as_deref(), Some("main"));
909
910        let bridge_dir = state.bridge_dir().unwrap();
911        let request = DriveBridgeRequest {
912            schema_version: 1,
913            protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
914            request_id: "request-1".to_string(),
915            action: "exists".to_string(),
916            selector: Some("#ready".to_string()),
917            value: None,
918            visible_only: true,
919            window_label: Some("main".to_string()),
920            test_id: Some("test".to_string()),
921            step_id: Some("step".to_string()),
922            created_at_unix_nanos: 1,
923        };
924        std::fs::write(
925            bridge_dir
926                .join(DRIVE_BRIDGE_REQUESTS_DIR)
927                .join("request-1.json"),
928            serde_json::to_vec_pretty(&request).unwrap(),
929        )
930        .unwrap();
931
932        let polled = state
933            .poll_drive_bridge_request(Some("main".to_string()))
934            .unwrap()
935            .unwrap();
936        assert_eq!(polled.request_id, "request-1");
937        assert!(!bridge_dir
938            .join(DRIVE_BRIDGE_REQUESTS_DIR)
939            .join("request-1.json")
940            .exists());
941        assert!(bridge_dir
942            .join(DRIVE_BRIDGE_IN_FLIGHT_DIR)
943            .join("request-1.json")
944            .exists());
945
946        state
947            .complete_drive_bridge_request(DriveBridgeResponse {
948                schema_version: 1,
949                protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
950                request_id: "request-1".to_string(),
951                action: "exists".to_string(),
952                selector: Some("#ready".to_string()),
953                visible_only: true,
954                ok: true,
955                payload: json!({ "exists": true }),
956                error: None,
957                completed_at_unix_nanos: 2,
958            })
959            .unwrap();
960
961        assert!(!bridge_dir
962            .join(DRIVE_BRIDGE_IN_FLIGHT_DIR)
963            .join("request-1.json")
964            .exists());
965        let response: DriveBridgeResponse = serde_json::from_slice(
966            &std::fs::read(
967                bridge_dir
968                    .join(DRIVE_BRIDGE_RESPONSES_DIR)
969                    .join("request-1.json"),
970            )
971            .unwrap(),
972        )
973        .unwrap();
974        assert_eq!(response.payload["exists"], true);
975    }
976
977    #[test]
978    fn drive_bridge_rejects_unsafe_response_request_id() {
979        let _guard = crate::test_support::global_state_lock();
980        let temp = TempDir::new().unwrap();
981        let state = AuditaurState::initialize(
982            AuditaurConfig {
983                enabled: Some(true),
984                service_name: Some("bridge-test".to_string()),
985                data_dir: Some(temp.path().to_path_buf()),
986                ..AuditaurConfig::default()
987            },
988            123,
989            None,
990        )
991        .unwrap();
992        let session_id = state.session_id.clone().unwrap();
993        crate::tracing::clear_sink(&session_id);
994
995        let error = state
996            .complete_drive_bridge_request(DriveBridgeResponse {
997                schema_version: 1,
998                protocol_version: DRIVE_BRIDGE_PROTOCOL_VERSION,
999                request_id: "../escape".to_string(),
1000                action: "exists".to_string(),
1001                selector: None,
1002                visible_only: false,
1003                ok: false,
1004                payload: json!({ "ok": false }),
1005                error: Some("bad".to_string()),
1006                completed_at_unix_nanos: 2,
1007            })
1008            .unwrap_err();
1009        assert!(error.to_string().contains("invalid characters"));
1010    }
1011
1012    #[test]
1013    fn drive_bridge_notifier_starts_once_and_stops_with_state() {
1014        let _guard = crate::test_support::global_state_lock();
1015        let temp = TempDir::new().unwrap();
1016        let state = AuditaurState::initialize(
1017            AuditaurConfig {
1018                enabled: Some(true),
1019                service_name: Some("bridge-test".to_string()),
1020                data_dir: Some(temp.path().to_path_buf()),
1021                ..AuditaurConfig::default()
1022            },
1023            123,
1024            None,
1025        )
1026        .unwrap();
1027        let session_id = state.session_id.clone().unwrap();
1028        crate::tracing::clear_sink(&session_id);
1029
1030        state
1031            .register_drive_bridge(Some("main".to_string()))
1032            .unwrap();
1033        let first = state.start_bridge_notifier_if_needed();
1034        assert!(first.is_some(), "first bridge registration starts notifier");
1035        let (_, alive) = first.unwrap();
1036        assert!(alive.load(Ordering::SeqCst));
1037        assert!(
1038            state.start_bridge_notifier_if_needed().is_none(),
1039            "notifier startup is idempotent"
1040        );
1041        drop(state);
1042        assert!(
1043            !alive.load(Ordering::SeqCst),
1044            "notifier lifetime flag stops with plugin state"
1045        );
1046    }
1047
1048    #[test]
1049    fn drive_bridge_sweeps_stale_atomic_temp_files() {
1050        let _guard = crate::test_support::global_state_lock();
1051        let temp = TempDir::new().unwrap();
1052        let state = AuditaurState::initialize(
1053            AuditaurConfig {
1054                enabled: Some(true),
1055                service_name: Some("bridge-test".to_string()),
1056                data_dir: Some(temp.path().to_path_buf()),
1057                ..AuditaurConfig::default()
1058            },
1059            123,
1060            None,
1061        )
1062        .unwrap();
1063        let session_id = state.session_id.clone().unwrap();
1064        crate::tracing::clear_sink(&session_id);
1065
1066        let bridge_dir = state.bridge_dir().unwrap();
1067        let stale_temp = bridge_dir
1068            .join(DRIVE_BRIDGE_REQUESTS_DIR)
1069            .join("request-1.json.123.tmp");
1070        std::fs::write(&stale_temp, b"partial").unwrap();
1071        std::fs::OpenOptions::new()
1072            .write(true)
1073            .open(&stale_temp)
1074            .unwrap()
1075            .set_modified(
1076                SystemTime::now()
1077                    - Duration::from_nanos(
1078                        u64::try_from(DRIVE_BRIDGE_STALE_FILE_NANOS).unwrap() + 1_000_000,
1079                    ),
1080            )
1081            .unwrap();
1082
1083        state
1084            .register_drive_bridge(Some("main".to_string()))
1085            .unwrap();
1086        assert!(
1087            !stale_temp.exists(),
1088            "stale atomic-write temp files should be reclaimed"
1089        );
1090    }
1091
1092    #[test]
1093    fn panic_hook_records_and_clears_with_state() {
1094        let _guard = crate::test_support::global_state_lock();
1095        let temp = TempDir::new().unwrap();
1096        let state = AuditaurState::initialize(
1097            AuditaurConfig {
1098                enabled: Some(true),
1099                service_name: Some("panic-test".to_string()),
1100                data_dir: Some(temp.path().to_path_buf()),
1101                ..AuditaurConfig::default()
1102            },
1103            123,
1104            None,
1105        )
1106        .unwrap();
1107        let session_id = state.session_id.clone().unwrap();
1108        crate::tracing::clear_sink(&session_id);
1109
1110        let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));
1111
1112        let store = store_for(&temp, &session_id);
1113        let errors = store
1114            .list_frontend_errors(&FrontendErrorQuery::default())
1115            .unwrap();
1116        assert_eq!(errors.len(), 1);
1117        assert_eq!(errors[0].error_type.as_deref(), Some("RustPanic"));
1118        assert_eq!(errors[0].message, "intentional auditaur panic test");
1119
1120        drop(state);
1121        let _ = std::panic::catch_unwind(|| panic!("after drop"));
1122        let errors_after_drop = store
1123            .list_frontend_errors(&FrontendErrorQuery::default())
1124            .unwrap();
1125        assert_eq!(errors_after_drop.len(), 1);
1126    }
1127
1128    fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
1129        let db = temp
1130            .path()
1131            .join("sessions")
1132            .join(session_id)
1133            .join("telemetry.sqlite");
1134        SqliteStore::open(db).unwrap()
1135    }
1136}