Skip to main content

tauri_plugin_auditaur/
state.rs

1use std::{
2    fs, panic,
3    path::PathBuf,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, Mutex,
7    },
8    thread,
9    time::Duration,
10};
11
12use auditaur_collector::{
13    exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
14    receiver::OTelBatch,
15    retention::RetentionLimits,
16};
17use auditaur_core::{
18    discovery::DiscoveryFile,
19    model::{FrontendError, LogRecord, Session, TelemetrySource},
20    AuditaurConfig,
21};
22use serde_json::json;
23use time::{format_description::well_known::Rfc3339, OffsetDateTime};
24use uuid::Uuid;
25
26use crate::error::AuditaurError;
27
28static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
29static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
30
31#[derive(Clone)]
32struct PanicSink {
33    session_id: String,
34    store: Arc<Mutex<SqliteStore>>,
35}
36
37pub struct AuditaurState {
38    pub session_id: Option<String>,
39    enabled: bool,
40    store: Option<Arc<Mutex<SqliteStore>>>,
41    discovery_path: Option<PathBuf>,
42    heartbeat_alive: Option<Arc<AtomicBool>>,
43    redact_defaults: bool,
44    extra_redaction_keys: Vec<String>,
45    retention_limits: RetentionLimits,
46}
47
48impl AuditaurState {
49    pub fn initialize(
50        config: AuditaurConfig,
51        pid: u32,
52        app_identifier: Option<String>,
53    ) -> Result<Self, AuditaurError> {
54        let enabled = config.enabled.unwrap_or_else(default_enabled);
55        if !enabled {
56            return Ok(Self {
57                session_id: None,
58                enabled: false,
59                store: None,
60                discovery_path: None,
61                heartbeat_alive: None,
62                redact_defaults: config.redact_defaults,
63                extra_redaction_keys: config.extra_redaction_keys,
64                retention_limits: RetentionLimits::default(),
65            });
66        }
67
68        if !cfg!(debug_assertions) && !config.allow_release_builds {
69            return Err(AuditaurError::new(
70                "Auditaur is disabled in release builds unless allow_release_builds is true.",
71            ));
72        }
73
74        let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
75            .map_err(|error| AuditaurError::new(error.to_string()))?;
76        let session_id = Uuid::new_v4().to_string();
77        let instance_id = Uuid::new_v4().to_string();
78        let session_dir = data_dir.join("sessions").join(&session_id);
79        let apps_dir = data_dir.join("apps");
80        fs::create_dir_all(&session_dir)?;
81        fs::create_dir_all(&apps_dir)?;
82
83        let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
84        let store = SqliteStore::open(&database_path)?;
85        store.migrate()?;
86
87        let service_name = config
88            .service_name
89            .clone()
90            .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
91            .unwrap_or_else(|| "tauri-app".to_string());
92        let service_version = config.service_version.clone();
93        let started_at = now_rfc3339()?;
94        store.create_session(&Session {
95            id: session_id.clone(),
96            session_name: config.session_name.clone(),
97            service_name: service_name.clone(),
98            service_version: service_version.clone(),
99            app_identifier: app_identifier.clone(),
100            pid: Some(i64::from(pid)),
101            started_at: started_at.clone(),
102            ended_at: None,
103            schema_version: SQLITE_SCHEMA_VERSION,
104            auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
105        })?;
106
107        let discovery_path = apps_dir.join(format!("{instance_id}.json"));
108        let discovery = DiscoveryFile {
109            schema_version: 1,
110            instance_id,
111            session_id: session_id.clone(),
112            service_name,
113            service_version,
114            app_identifier,
115            pid,
116            started_at,
117            database_path: database_path.to_string_lossy().to_string(),
118            capabilities: vec![
119                "logs".to_string(),
120                "traces".to_string(),
121                "frontend_errors".to_string(),
122                "ipc".to_string(),
123                "events".to_string(),
124                "windows".to_string(),
125            ],
126            last_heartbeat_at: now_rfc3339()?,
127        };
128        write_discovery(&discovery_path, &discovery)?;
129
130        let heartbeat_alive = Arc::new(AtomicBool::new(true));
131        start_heartbeat(
132            discovery_path.clone(),
133            discovery,
134            heartbeat_alive.clone(),
135            Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
136        );
137        let store = Arc::new(Mutex::new(store));
138        crate::tracing::install_sink(session_id.clone(), store.clone());
139        install_panic_sink(session_id.clone(), store.clone());
140
141        Ok(Self {
142            session_id: Some(session_id),
143            enabled: true,
144            store: Some(store),
145            discovery_path: Some(discovery_path),
146            heartbeat_alive: Some(heartbeat_alive),
147            redact_defaults: config.redact_defaults,
148            extra_redaction_keys: config.extra_redaction_keys,
149            retention_limits: RetentionLimits {
150                max_session_bytes: config.max_session_bytes,
151                ..RetentionLimits::default()
152            },
153        })
154    }
155
156    pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
157        if !self.enabled {
158            return Ok(());
159        }
160
161        let Some(store) = &self.store else {
162            return Err(AuditaurError::new(
163                "Auditaur is enabled without an initialized store.",
164            ));
165        };
166        let session_id = self
167            .session_id
168            .as_deref()
169            .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
170        let store = store
171            .lock()
172            .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
173
174        for mut span in batch.spans {
175            if span.session_id.is_empty() {
176                span.session_id = session_id.to_string();
177            }
178            span.attributes = self.redact_value(&span.attributes);
179            store.insert_span(&span)?;
180        }
181
182        for mut log in batch.logs {
183            if log.session_id.is_empty() {
184                log.session_id = session_id.to_string();
185            }
186            log.attributes = self.redact_value(&log.attributes);
187            log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
188            store.insert_log(&log)?;
189        }
190
191        for mut error in batch.frontend_errors {
192            if error.session_id.is_empty() {
193                error.session_id = session_id.to_string();
194            }
195            error.attributes = self.redact_value(&error.attributes);
196            store.insert_frontend_error(&error)?;
197        }
198
199        for mut call in batch.tauri_ipc_calls {
200            if call.session_id.is_empty() {
201                call.session_id = session_id.to_string();
202            }
203            if let Some(args_json) = &call.args_json {
204                let outcome = auditaur_core::redaction::redact_json_with_options(
205                    args_json,
206                    self.redact_defaults,
207                    &self.extra_redaction_keys,
208                );
209                call.args_json = Some(outcome.value);
210                call.args_redacted = outcome.redacted;
211            } else {
212                call.args_redacted = false;
213            }
214            store.insert_tauri_ipc_call(&call)?;
215        }
216
217        for mut event in batch.tauri_events {
218            if event.session_id.is_empty() {
219                event.session_id = session_id.to_string();
220            }
221            if let Some(payload_json) = &event.payload_json {
222                let outcome = auditaur_core::redaction::redact_json_with_options(
223                    payload_json,
224                    self.redact_defaults,
225                    &self.extra_redaction_keys,
226                );
227                event.payload_json = Some(outcome.value);
228                event.payload_redacted = outcome.redacted;
229            } else {
230                event.payload_redacted = false;
231            }
232            store.insert_tauri_event(&event)?;
233        }
234
235        store.enforce_retention(self.retention_limits)?;
236
237        Ok(())
238    }
239
240    pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
241        match (&self.session_id, &self.store) {
242            (Some(session_id), Some(store)) => {
243                crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
244            }
245            _ => crate::tracing::tracing_layer(),
246        }
247    }
248
249    pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
250        self.store.clone()
251    }
252
253    fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
254        auditaur_core::redaction::redact_json_with_options(
255            value,
256            self.redact_defaults,
257            &self.extra_redaction_keys,
258        )
259        .value
260    }
261}
262
263impl Drop for AuditaurState {
264    fn drop(&mut self) {
265        if let Some(alive) = &self.heartbeat_alive {
266            alive.store(false, Ordering::SeqCst);
267        }
268        if let Some(session_id) = &self.session_id {
269            crate::tracing::clear_sink(session_id);
270            clear_panic_sink(session_id);
271        }
272        if let Some(path) = &self.discovery_path {
273            let _ = fs::remove_file(path);
274        }
275    }
276}
277
/// Decides whether the plugin is on when the configuration does not say:
/// always in builds with debug assertions, otherwise only when the
/// `AUDITAUR` environment variable is exactly `1`.
fn default_enabled() -> bool {
    if cfg!(debug_assertions) {
        return true;
    }
    matches!(std::env::var("AUDITAUR").as_deref(), Ok("1"))
}
281
282fn now_rfc3339() -> Result<String, AuditaurError> {
283    OffsetDateTime::now_utc()
284        .format(&Rfc3339)
285        .map_err(|error| AuditaurError::new(error.to_string()))
286}
287
288fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
289    fs::write(path, serde_json::to_vec_pretty(discovery)?)?;
290    Ok(())
291}
292
293fn start_heartbeat(
294    discovery_path: PathBuf,
295    mut discovery: DiscoveryFile,
296    alive: Arc<AtomicBool>,
297    interval: Duration,
298) {
299    thread::spawn(move || {
300        while alive.load(Ordering::SeqCst) {
301            thread::sleep(interval);
302            if !alive.load(Ordering::SeqCst) {
303                break;
304            }
305            if let Ok(timestamp) = now_rfc3339() {
306                discovery.last_heartbeat_at = timestamp;
307                let _ = write_discovery(&discovery_path, &discovery);
308            }
309        }
310    });
311}
312
313fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
314    if let Ok(mut sink) = PANIC_SINK.lock() {
315        *sink = Some(PanicSink { session_id, store });
316    }
317    if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
318        return;
319    }
320    let previous = panic::take_hook();
321    panic::set_hook(Box::new(move |info| {
322        record_panic(info);
323        previous(info);
324    }));
325}
326
327fn clear_panic_sink(session_id: &str) {
328    let Ok(mut sink) = PANIC_SINK.lock() else {
329        return;
330    };
331    if sink
332        .as_ref()
333        .map(|sink| sink.session_id.as_str() == session_id)
334        .unwrap_or(false)
335    {
336        *sink = None;
337    }
338}
339
340fn active_panic_sink() -> Option<PanicSink> {
341    PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
342}
343
344fn record_panic(info: &panic::PanicHookInfo<'_>) {
345    let Some(sink) = active_panic_sink() else {
346        return;
347    };
348    let Ok(store) = sink.store.try_lock() else {
349        return;
350    };
351    let message = panic_message(info);
352    let location = info.location().map(|location| {
353        format!(
354            "{}:{}:{}",
355            location.file(),
356            location.line(),
357            location.column()
358        )
359    });
360    let timestamp = now_unix_nanos();
361    let attributes = json!({
362        "auditaur.source": "panic_hook",
363        "exception.escaped": true,
364        "code.filepath": info.location().map(|location| location.file()),
365        "code.lineno": info.location().map(|location| location.line()),
366        "code.column": info.location().map(|location| location.column()),
367    });
368    let _ = store.insert_log(&LogRecord {
369        session_id: sink.session_id.clone(),
370        timestamp_unix_nanos: timestamp,
371        observed_timestamp_unix_nanos: None,
372        severity_text: Some("ERROR".to_string()),
373        severity_number: Some(17),
374        body: Some(format!("Rust panic: {message}")),
375        body_json: Some(json!({
376            "message": message,
377            "location": location,
378        })),
379        trace_id: None,
380        span_id: None,
381        scope_name: Some("panic".to_string()),
382        scope_version: None,
383        attributes: attributes.clone(),
384        source: TelemetrySource::Plugin,
385    });
386    let _ = store.insert_frontend_error(&FrontendError {
387        session_id: sink.session_id,
388        timestamp_unix_nanos: timestamp,
389        message,
390        stack: location,
391        filename: info.location().map(|location| location.file().to_string()),
392        line_number: info.location().map(|location| i64::from(location.line())),
393        column_number: info.location().map(|location| i64::from(location.column())),
394        error_type: Some("RustPanic".to_string()),
395        trace_id: None,
396        span_id: None,
397        window_label: None,
398        attributes,
399    });
400}
401
/// Extracts the human-readable panic message from `info`.
///
/// Handles the two payload types a string panic carries, `&str` and
/// `String`; any other payload yields a fixed placeholder.
fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
    let payload = info.payload();
    if let Some(text) = payload.downcast_ref::<&str>() {
        return (*text).to_string();
    }
    if let Some(text) = payload.downcast_ref::<String>() {
        return text.clone();
    }
    "panic payload was not a string".to_string()
}
409
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value too large for `i64`
/// saturates at `i64::MAX`.
fn now_unix_nanos() -> i64 {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    i64::try_from(nanos).unwrap_or(i64::MAX)
}
416
#[cfg(test)]
mod tests {
    use super::AuditaurState;
    use auditaur_collector::{exporter_sqlite::SqliteStore, receiver::OTelBatch};
    use auditaur_core::{
        model::{LogRecord, TauriEventRecord, TauriIpcCall},
        storage::FrontendErrorQuery,
        AuditaurConfig,
    };
    use serde_json::json;
    use tempfile::TempDir;

    /// Opens the telemetry database that `initialize` created for
    /// `session_id` below the temporary data directory.
    fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
        let database = temp
            .path()
            .join("sessions")
            .join(session_id)
            .join("telemetry.sqlite");
        SqliteStore::open(database).unwrap()
    }

    /// Initializing an enabled state creates the session row and exactly one
    /// discovery file.
    #[test]
    fn initializes_session_database_and_discovery_file() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let config = AuditaurConfig {
            enabled: Some(true),
            service_name: Some("plugin-test".to_string()),
            session_name: Some("plugin-session".to_string()),
            data_dir: Some(temp.path().to_path_buf()),
            ..AuditaurConfig::default()
        };
        // `state` stays alive for the whole test: dropping it would remove
        // the discovery file counted below.
        let state =
            AuditaurState::initialize(config, 123, Some("dev.auditaur.test".to_string())).unwrap();
        if let Some(id) = state.session_id.as_deref() {
            crate::tracing::clear_sink(id);
        }

        assert!(state.session_id.is_some());
        let session_id = state.session_id.as_deref().unwrap();
        let store = store_for(&temp, session_id);
        let session = store.get_session(session_id).unwrap().unwrap();
        assert_eq!(session.session_name.as_deref(), Some("plugin-session"));

        let discovery_files = temp.path().join("apps").read_dir().unwrap().count();
        assert_eq!(discovery_files, 1, "discovery file should be written");
    }

    /// Exported records adopt the active session and are stored redacted.
    #[test]
    fn exports_redacted_batch_to_sqlite() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let config = AuditaurConfig {
            enabled: Some(true),
            service_name: Some("plugin-test".to_string()),
            data_dir: Some(temp.path().to_path_buf()),
            ..AuditaurConfig::default()
        };
        let state = AuditaurState::initialize(config, 123, None).unwrap();
        let session_id = state.session_id.clone().unwrap();
        crate::tracing::clear_sink(&session_id);

        let log = LogRecord {
            session_id: String::new(),
            timestamp_unix_nanos: 1,
            observed_timestamp_unix_nanos: None,
            severity_text: Some("INFO".to_string()),
            severity_number: Some(9),
            body: Some("hello".to_string()),
            body_json: Some(json!({ "token": "secret" })),
            trace_id: None,
            span_id: None,
            scope_name: None,
            scope_version: None,
            attributes: json!({ "api_key": "secret" }),
            source: auditaur_core::model::TelemetrySource::Frontend,
        };
        let ipc_call = TauriIpcCall {
            session_id: String::new(),
            timestamp_unix_nanos: 2,
            duration_ms: Some(1.0),
            command: "save".to_string(),
            status: "OK".to_string(),
            error_message: None,
            trace_id: Some("trace".to_string()),
            span_id: Some("span".to_string()),
            window_label: Some("main".to_string()),
            args_json: Some(json!({ "password": "secret" })),
            args_redacted: true,
            result_summary: Some("ok".to_string()),
        };
        let event = TauriEventRecord {
            session_id: String::new(),
            timestamp_unix_nanos: 3,
            event_name: "save".to_string(),
            direction: "emit".to_string(),
            target: None,
            trace_id: Some("trace".to_string()),
            span_id: Some("event-span".to_string()),
            window_label: Some("main".to_string()),
            payload_summary: Some("payload".to_string()),
            payload_json: Some(json!({ "token": "secret" })),
            payload_redacted: true,
        };
        state
            .export_batch(OTelBatch {
                logs: vec![log],
                tauri_ipc_calls: vec![ipc_call],
                tauri_events: vec![event],
                ..OTelBatch::default()
            })
            .unwrap();

        let store = store_for(&temp, &session_id);

        let logs = store
            .list_logs(&auditaur_core::storage::LogQuery::default())
            .unwrap();
        let stored_log = logs
            .iter()
            .find(|candidate| candidate.body.as_deref() == Some("hello"))
            .expect("exported frontend log should be present");
        assert_eq!(stored_log.session_id, session_id);
        assert_eq!(stored_log.attributes["api_key"], "[REDACTED]");
        assert_eq!(
            stored_log.body_json.as_ref().unwrap()["token"],
            "[REDACTED]"
        );

        let ipc = store
            .list_tauri_ipc_calls(&auditaur_core::storage::TauriIpcQuery::default())
            .unwrap();
        assert_eq!(ipc[0].session_id, session_id);
        assert_eq!(ipc[0].args_json.as_ref().unwrap()["password"], "[REDACTED]");

        let events = store
            .list_tauri_events(&auditaur_core::storage::TauriEventQuery::default())
            .unwrap();
        assert_eq!(
            events[0].payload_json.as_ref().unwrap()["token"],
            "[REDACTED]"
        );
    }

    /// A panic is recorded while the state is alive and ignored once the
    /// state has been dropped.
    #[test]
    fn panic_hook_records_and_clears_with_state() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let config = AuditaurConfig {
            enabled: Some(true),
            service_name: Some("panic-test".to_string()),
            data_dir: Some(temp.path().to_path_buf()),
            ..AuditaurConfig::default()
        };
        let state = AuditaurState::initialize(config, 123, None).unwrap();
        let session_id = state.session_id.clone().unwrap();
        crate::tracing::clear_sink(&session_id);

        let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));

        let store = store_for(&temp, &session_id);
        let recorded = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].error_type.as_deref(), Some("RustPanic"));
        assert_eq!(recorded[0].message, "intentional auditaur panic test");

        drop(state);
        let _ = std::panic::catch_unwind(|| panic!("after drop"));
        let recorded_after_drop = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(recorded_after_drop.len(), 1);
    }
}