Skip to main content

tauri_plugin_auditaur/
state.rs

1use std::{
2    fs, panic,
3    path::PathBuf,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, Mutex,
7    },
8    thread,
9    time::Duration,
10};
11
12use auditaur_collector::{
13    exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
14    receiver::OTelBatch,
15    retention::RetentionLimits,
16};
17use auditaur_core::{
18    discovery::DiscoveryFile,
19    model::{FrontendError, LogRecord, Session, TelemetrySource},
20    AuditaurConfig,
21};
22use serde_json::json;
23use time::{format_description::well_known::Rfc3339, OffsetDateTime};
24use uuid::Uuid;
25
26use crate::error::AuditaurError;
27
28static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
29static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
30
31#[derive(Clone)]
32struct PanicSink {
33    session_id: String,
34    store: Arc<Mutex<SqliteStore>>,
35}
36
37pub struct AuditaurState {
38    pub session_id: Option<String>,
39    enabled: bool,
40    store: Option<Arc<Mutex<SqliteStore>>>,
41    discovery_path: Option<PathBuf>,
42    heartbeat_alive: Option<Arc<AtomicBool>>,
43    redact_defaults: bool,
44    extra_redaction_keys: Vec<String>,
45    retention_limits: RetentionLimits,
46}
47
48impl AuditaurState {
49    pub fn initialize(
50        config: AuditaurConfig,
51        pid: u32,
52        app_identifier: Option<String>,
53    ) -> Result<Self, AuditaurError> {
54        let enabled = config.enabled.unwrap_or_else(default_enabled);
55        if !enabled {
56            return Ok(Self {
57                session_id: None,
58                enabled: false,
59                store: None,
60                discovery_path: None,
61                heartbeat_alive: None,
62                redact_defaults: config.redact_defaults,
63                extra_redaction_keys: config.extra_redaction_keys,
64                retention_limits: RetentionLimits::default(),
65            });
66        }
67
68        if !cfg!(debug_assertions) && !config.allow_release_builds {
69            return Err(AuditaurError::new(
70                "Auditaur is disabled in release builds unless allow_release_builds is true.",
71            ));
72        }
73
74        let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
75            .map_err(|error| AuditaurError::new(error.to_string()))?;
76        let session_id = Uuid::new_v4().to_string();
77        let instance_id = Uuid::new_v4().to_string();
78        let session_dir = data_dir.join("sessions").join(&session_id);
79        let apps_dir = data_dir.join("apps");
80        fs::create_dir_all(&session_dir)?;
81        fs::create_dir_all(&apps_dir)?;
82
83        let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
84        let store = SqliteStore::open(&database_path)?;
85        store.migrate()?;
86
87        let service_name = config
88            .service_name
89            .clone()
90            .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
91            .unwrap_or_else(|| "tauri-app".to_string());
92        let service_version = config.service_version.clone();
93        let started_at = now_rfc3339()?;
94        store.create_session(&Session {
95            id: session_id.clone(),
96            session_name: config.session_name.clone(),
97            service_name: service_name.clone(),
98            service_version: service_version.clone(),
99            app_identifier: app_identifier.clone(),
100            pid: Some(i64::from(pid)),
101            started_at: started_at.clone(),
102            ended_at: None,
103            schema_version: SQLITE_SCHEMA_VERSION,
104            auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
105        })?;
106
107        let discovery_path = apps_dir.join(format!("{instance_id}.json"));
108        let discovery = DiscoveryFile {
109            schema_version: 1,
110            instance_id,
111            session_id: session_id.clone(),
112            service_name,
113            service_version,
114            app_identifier,
115            pid,
116            started_at,
117            database_path: database_path.to_string_lossy().to_string(),
118            capabilities: vec![
119                "logs".to_string(),
120                "traces".to_string(),
121                "frontend_errors".to_string(),
122                "ipc".to_string(),
123                "events".to_string(),
124                "windows".to_string(),
125            ],
126            last_heartbeat_at: now_rfc3339()?,
127        };
128        write_discovery(&discovery_path, &discovery)?;
129
130        let heartbeat_alive = Arc::new(AtomicBool::new(true));
131        start_heartbeat(
132            discovery_path.clone(),
133            discovery,
134            heartbeat_alive.clone(),
135            Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
136        );
137        let store = Arc::new(Mutex::new(store));
138        crate::tracing::install_sink(session_id.clone(), store.clone());
139        install_panic_sink(session_id.clone(), store.clone());
140
141        Ok(Self {
142            session_id: Some(session_id),
143            enabled: true,
144            store: Some(store),
145            discovery_path: Some(discovery_path),
146            heartbeat_alive: Some(heartbeat_alive),
147            redact_defaults: config.redact_defaults,
148            extra_redaction_keys: config.extra_redaction_keys,
149            retention_limits: RetentionLimits {
150                max_session_bytes: config.max_session_bytes,
151                ..RetentionLimits::default()
152            },
153        })
154    }
155
156    pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
157        if !self.enabled {
158            return Ok(());
159        }
160
161        let Some(store) = &self.store else {
162            return Err(AuditaurError::new(
163                "Auditaur is enabled without an initialized store.",
164            ));
165        };
166        let session_id = self
167            .session_id
168            .as_deref()
169            .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
170        let store = store
171            .lock()
172            .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
173
174        for mut span in batch.spans {
175            if span.session_id.is_empty() {
176                span.session_id = session_id.to_string();
177            }
178            span.attributes = self.redact_value(&span.attributes);
179            store.insert_span(&span)?;
180        }
181
182        for mut event in batch.span_events {
183            if event.session_id.is_empty() {
184                event.session_id = session_id.to_string();
185            }
186            event.attributes = self.redact_value(&event.attributes);
187            store.insert_span_event(&event)?;
188        }
189
190        for mut log in batch.logs {
191            if log.session_id.is_empty() {
192                log.session_id = session_id.to_string();
193            }
194            log.attributes = self.redact_value(&log.attributes);
195            log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
196            store.insert_log(&log)?;
197        }
198
199        for mut error in batch.frontend_errors {
200            if error.session_id.is_empty() {
201                error.session_id = session_id.to_string();
202            }
203            error.attributes = self.redact_value(&error.attributes);
204            store.insert_frontend_error(&error)?;
205        }
206
207        for mut call in batch.tauri_ipc_calls {
208            if call.session_id.is_empty() {
209                call.session_id = session_id.to_string();
210            }
211            if let Some(args_json) = &call.args_json {
212                let outcome = auditaur_core::redaction::redact_json_with_options(
213                    args_json,
214                    self.redact_defaults,
215                    &self.extra_redaction_keys,
216                );
217                call.args_json = Some(outcome.value);
218                call.args_redacted = outcome.redacted;
219            } else {
220                call.args_redacted = false;
221            }
222            store.insert_tauri_ipc_call(&call)?;
223        }
224
225        for mut event in batch.tauri_events {
226            if event.session_id.is_empty() {
227                event.session_id = session_id.to_string();
228            }
229            if let Some(payload_json) = &event.payload_json {
230                let outcome = auditaur_core::redaction::redact_json_with_options(
231                    payload_json,
232                    self.redact_defaults,
233                    &self.extra_redaction_keys,
234                );
235                event.payload_json = Some(outcome.value);
236                event.payload_redacted = outcome.redacted;
237            } else {
238                event.payload_redacted = false;
239            }
240            store.insert_tauri_event(&event)?;
241        }
242
243        store.enforce_retention(self.retention_limits)?;
244
245        Ok(())
246    }
247
248    pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
249        match (&self.session_id, &self.store) {
250            (Some(session_id), Some(store)) => {
251                crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
252            }
253            _ => crate::tracing::tracing_layer(),
254        }
255    }
256
257    pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
258        self.store.clone()
259    }
260
261    fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
262        auditaur_core::redaction::redact_json_with_options(
263            value,
264            self.redact_defaults,
265            &self.extra_redaction_keys,
266        )
267        .value
268    }
269}
270
271impl Drop for AuditaurState {
272    fn drop(&mut self) {
273        if let Some(alive) = &self.heartbeat_alive {
274            alive.store(false, Ordering::SeqCst);
275        }
276        if let Some(session_id) = &self.session_id {
277            crate::tracing::clear_sink(session_id);
278            clear_panic_sink(session_id);
279        }
280        if let Some(path) = &self.discovery_path {
281            let _ = fs::remove_file(path);
282        }
283    }
284}
285
286fn default_enabled() -> bool {
287    cfg!(debug_assertions) || std::env::var("AUDITAUR").ok().as_deref() == Some("1")
288}
289
290fn now_rfc3339() -> Result<String, AuditaurError> {
291    OffsetDateTime::now_utc()
292        .format(&Rfc3339)
293        .map_err(|error| AuditaurError::new(error.to_string()))
294}
295
296fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
297    fs::write(path, serde_json::to_vec_pretty(discovery)?)?;
298    Ok(())
299}
300
301fn start_heartbeat(
302    discovery_path: PathBuf,
303    mut discovery: DiscoveryFile,
304    alive: Arc<AtomicBool>,
305    interval: Duration,
306) {
307    thread::spawn(move || {
308        while alive.load(Ordering::SeqCst) {
309            thread::sleep(interval);
310            if !alive.load(Ordering::SeqCst) {
311                break;
312            }
313            if let Ok(timestamp) = now_rfc3339() {
314                discovery.last_heartbeat_at = timestamp;
315                let _ = write_discovery(&discovery_path, &discovery);
316            }
317        }
318    });
319}
320
321fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
322    if let Ok(mut sink) = PANIC_SINK.lock() {
323        *sink = Some(PanicSink { session_id, store });
324    }
325    if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
326        return;
327    }
328    let previous = panic::take_hook();
329    panic::set_hook(Box::new(move |info| {
330        record_panic(info);
331        previous(info);
332    }));
333}
334
335fn clear_panic_sink(session_id: &str) {
336    let Ok(mut sink) = PANIC_SINK.lock() else {
337        return;
338    };
339    if sink
340        .as_ref()
341        .map(|sink| sink.session_id.as_str() == session_id)
342        .unwrap_or(false)
343    {
344        *sink = None;
345    }
346}
347
348fn active_panic_sink() -> Option<PanicSink> {
349    PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
350}
351
352fn record_panic(info: &panic::PanicHookInfo<'_>) {
353    let Some(sink) = active_panic_sink() else {
354        return;
355    };
356    let Ok(store) = sink.store.try_lock() else {
357        return;
358    };
359    let message = panic_message(info);
360    let location = info.location().map(|location| {
361        format!(
362            "{}:{}:{}",
363            location.file(),
364            location.line(),
365            location.column()
366        )
367    });
368    let timestamp = now_unix_nanos();
369    let attributes = json!({
370        "auditaur.source": "panic_hook",
371        "exception.escaped": true,
372        "code.filepath": info.location().map(|location| location.file()),
373        "code.lineno": info.location().map(|location| location.line()),
374        "code.column": info.location().map(|location| location.column()),
375    });
376    let _ = store.insert_log(&LogRecord {
377        session_id: sink.session_id.clone(),
378        timestamp_unix_nanos: timestamp,
379        observed_timestamp_unix_nanos: None,
380        severity_text: Some("ERROR".to_string()),
381        severity_number: Some(17),
382        body: Some(format!("Rust panic: {message}")),
383        body_json: Some(json!({
384            "message": message,
385            "location": location,
386        })),
387        trace_id: None,
388        span_id: None,
389        scope_name: Some("panic".to_string()),
390        scope_version: None,
391        attributes: attributes.clone(),
392        source: TelemetrySource::Plugin,
393    });
394    let _ = store.insert_frontend_error(&FrontendError {
395        session_id: sink.session_id,
396        timestamp_unix_nanos: timestamp,
397        message,
398        stack: location,
399        filename: info.location().map(|location| location.file().to_string()),
400        line_number: info.location().map(|location| i64::from(location.line())),
401        column_number: info.location().map(|location| i64::from(location.column())),
402        error_type: Some("RustPanic".to_string()),
403        trace_id: None,
404        span_id: None,
405        window_label: None,
406        attributes,
407    });
408}
409
410fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
411    info.payload()
412        .downcast_ref::<&str>()
413        .map(|message| (*message).to_string())
414        .or_else(|| info.payload().downcast_ref::<String>().cloned())
415        .unwrap_or_else(|| "panic payload was not a string".to_string())
416}
417
418fn now_unix_nanos() -> i64 {
419    let now = std::time::SystemTime::now()
420        .duration_since(std::time::UNIX_EPOCH)
421        .unwrap_or_default();
422    i64::try_from(now.as_nanos()).unwrap_or(i64::MAX)
423}
424
425#[cfg(test)]
426mod tests {
427    use super::AuditaurState;
428    use auditaur_collector::{exporter_sqlite::SqliteStore, receiver::OTelBatch};
429    use auditaur_core::{
430        model::{LogRecord, SpanEventRecord, SpanRecord, TauriEventRecord, TauriIpcCall},
431        storage::{FrontendErrorQuery, SpanEventQuery},
432        AuditaurConfig,
433    };
434    use serde_json::json;
435    use tempfile::TempDir;
436
437    #[test]
438    fn initializes_session_database_and_discovery_file() {
439        let _guard = crate::test_support::global_state_lock();
440        let temp = TempDir::new().unwrap();
441        let state = AuditaurState::initialize(
442            AuditaurConfig {
443                enabled: Some(true),
444                service_name: Some("plugin-test".to_string()),
445                session_name: Some("plugin-session".to_string()),
446                data_dir: Some(temp.path().to_path_buf()),
447                ..AuditaurConfig::default()
448            },
449            123,
450            Some("dev.auditaur.test".to_string()),
451        )
452        .unwrap();
453        if let Some(session_id) = state.session_id.as_deref() {
454            crate::tracing::clear_sink(session_id);
455        }
456
457        assert!(state.session_id.is_some());
458        let db = temp
459            .path()
460            .join("sessions")
461            .join(state.session_id.as_ref().unwrap())
462            .join("telemetry.sqlite");
463        let store = SqliteStore::open(db).unwrap();
464        let session = store
465            .get_session(state.session_id.as_ref().unwrap())
466            .unwrap()
467            .unwrap();
468        assert_eq!(session.session_name.as_deref(), Some("plugin-session"));
469        assert_eq!(
470            temp.path().join("apps").read_dir().unwrap().count(),
471            1,
472            "discovery file should be written"
473        );
474    }
475
476    #[test]
477    fn exports_redacted_batch_to_sqlite() {
478        let _guard = crate::test_support::global_state_lock();
479        let temp = TempDir::new().unwrap();
480        let state = AuditaurState::initialize(
481            AuditaurConfig {
482                enabled: Some(true),
483                service_name: Some("plugin-test".to_string()),
484                data_dir: Some(temp.path().to_path_buf()),
485                ..AuditaurConfig::default()
486            },
487            123,
488            None,
489        )
490        .unwrap();
491        let session_id = state.session_id.clone().unwrap();
492        crate::tracing::clear_sink(&session_id);
493
494        state
495            .export_batch(OTelBatch {
496                logs: vec![LogRecord {
497                    session_id: String::new(),
498                    timestamp_unix_nanos: 1,
499                    observed_timestamp_unix_nanos: None,
500                    severity_text: Some("INFO".to_string()),
501                    severity_number: Some(9),
502                    body: Some("hello".to_string()),
503                    body_json: Some(json!({ "token": "secret" })),
504                    trace_id: None,
505                    span_id: None,
506                    scope_name: None,
507                    scope_version: None,
508                    attributes: json!({ "api_key": "secret" }),
509                    source: auditaur_core::model::TelemetrySource::Frontend,
510                }],
511                spans: vec![SpanRecord {
512                    session_id: String::new(),
513                    trace_id: "trace".to_string(),
514                    span_id: "span".to_string(),
515                    parent_span_id: None,
516                    name: "agentive.run".to_string(),
517                    kind: Some("internal".to_string()),
518                    start_time_unix_nanos: 1,
519                    end_time_unix_nanos: Some(4),
520                    status_code: Some("OK".to_string()),
521                    status_message: None,
522                    scope_name: Some("agentive".to_string()),
523                    scope_version: Some("0.2.1".to_string()),
524                    attributes: json!({ "agentive.run_id": "run", "token": "secret" }),
525                    source: auditaur_core::model::TelemetrySource::ThirdPartyOtel,
526                }],
527                span_events: vec![SpanEventRecord {
528                    session_id: String::new(),
529                    trace_id: "trace".to_string(),
530                    span_id: "span".to_string(),
531                    name: "agent-event".to_string(),
532                    timestamp_unix_nanos: 2,
533                    attributes: json!({ "token": "secret", "summary": "done" }),
534                }],
535                tauri_ipc_calls: vec![TauriIpcCall {
536                    session_id: String::new(),
537                    timestamp_unix_nanos: 2,
538                    duration_ms: Some(1.0),
539                    command: "save".to_string(),
540                    status: "OK".to_string(),
541                    error_message: None,
542                    trace_id: Some("trace".to_string()),
543                    span_id: Some("span".to_string()),
544                    window_label: Some("main".to_string()),
545                    args_json: Some(json!({ "password": "secret" })),
546                    args_redacted: true,
547                    result_summary: Some("ok".to_string()),
548                }],
549                tauri_events: vec![TauriEventRecord {
550                    session_id: String::new(),
551                    timestamp_unix_nanos: 3,
552                    event_name: "save".to_string(),
553                    direction: "emit".to_string(),
554                    target: None,
555                    trace_id: Some("trace".to_string()),
556                    span_id: Some("event-span".to_string()),
557                    window_label: Some("main".to_string()),
558                    payload_summary: Some("payload".to_string()),
559                    payload_json: Some(json!({ "token": "secret" })),
560                    payload_redacted: true,
561                }],
562                ..OTelBatch::default()
563            })
564            .unwrap();
565
566        let db = temp
567            .path()
568            .join("sessions")
569            .join(&session_id)
570            .join("telemetry.sqlite");
571        let store = SqliteStore::open(db).unwrap();
572        let logs = store
573            .list_logs(&auditaur_core::storage::LogQuery::default())
574            .unwrap();
575
576        let log = logs
577            .iter()
578            .find(|log| log.body.as_deref() == Some("hello"))
579            .expect("exported frontend log should be present");
580        assert_eq!(log.session_id, session_id);
581        assert_eq!(log.attributes["api_key"], "[REDACTED]");
582        assert_eq!(log.body_json.as_ref().unwrap()["token"], "[REDACTED]");
583
584        let span_events = store.list_span_events(&SpanEventQuery::default()).unwrap();
585        assert_eq!(span_events[0].session_id, session_id);
586        assert_eq!(span_events[0].attributes["token"], "[REDACTED]");
587
588        let ipc = store
589            .list_tauri_ipc_calls(&auditaur_core::storage::TauriIpcQuery::default())
590            .unwrap();
591        assert_eq!(ipc[0].session_id, session_id);
592        assert_eq!(ipc[0].args_json.as_ref().unwrap()["password"], "[REDACTED]");
593
594        let events = store
595            .list_tauri_events(&auditaur_core::storage::TauriEventQuery::default())
596            .unwrap();
597        assert_eq!(
598            events[0].payload_json.as_ref().unwrap()["token"],
599            "[REDACTED]"
600        );
601    }
602
603    #[test]
604    fn panic_hook_records_and_clears_with_state() {
605        let _guard = crate::test_support::global_state_lock();
606        let temp = TempDir::new().unwrap();
607        let state = AuditaurState::initialize(
608            AuditaurConfig {
609                enabled: Some(true),
610                service_name: Some("panic-test".to_string()),
611                data_dir: Some(temp.path().to_path_buf()),
612                ..AuditaurConfig::default()
613            },
614            123,
615            None,
616        )
617        .unwrap();
618        let session_id = state.session_id.clone().unwrap();
619        crate::tracing::clear_sink(&session_id);
620
621        let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));
622
623        let store = store_for(&temp, &session_id);
624        let errors = store
625            .list_frontend_errors(&FrontendErrorQuery::default())
626            .unwrap();
627        assert_eq!(errors.len(), 1);
628        assert_eq!(errors[0].error_type.as_deref(), Some("RustPanic"));
629        assert_eq!(errors[0].message, "intentional auditaur panic test");
630
631        drop(state);
632        let _ = std::panic::catch_unwind(|| panic!("after drop"));
633        let errors_after_drop = store
634            .list_frontend_errors(&FrontendErrorQuery::default())
635            .unwrap();
636        assert_eq!(errors_after_drop.len(), 1);
637    }
638
639    fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
640        let db = temp
641            .path()
642            .join("sessions")
643            .join(session_id)
644            .join("telemetry.sqlite");
645        SqliteStore::open(db).unwrap()
646    }
647}