1use std::{
2 fs, panic,
3 path::PathBuf,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, Mutex,
7 },
8 thread,
9 time::Duration,
10};
11
12use auditaur_collector::{
13 exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
14 receiver::OTelBatch,
15 retention::RetentionLimits,
16};
17use auditaur_core::{
18 discovery::DiscoveryFile,
19 model::{FrontendError, LogRecord, Session, TelemetrySource},
20 AuditaurConfig,
21};
22use serde_json::json;
23use time::{format_description::well_known::Rfc3339, OffsetDateTime};
24use uuid::Uuid;
25
26use crate::error::AuditaurError;
27
/// Process-wide slot for the sink that the panic hook records into.
///
/// Filled by `install_panic_sink`, emptied by `clear_panic_sink`, and read by
/// `record_panic`. `None` means panics are not persisted anywhere.
static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
/// Flipped to `true` the first time the wrapping panic hook is installed so
/// that later calls to `install_panic_sink` do not stack another hook.
static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
30
/// Destination for panics captured by the process panic hook.
#[derive(Clone)]
struct PanicSink {
    /// Session the recorded panic rows are attributed to; also used by
    /// `clear_panic_sink` to decide whether this sink belongs to the caller.
    session_id: String,
    /// Shared handle to the session's SQLite store.
    store: Arc<Mutex<SqliteStore>>,
}
36
/// Runtime state of the Auditaur plugin for one application run.
///
/// Built by `AuditaurState::initialize`. When telemetry is disabled every
/// optional field is `None` and `export_batch` is a no-op.
pub struct AuditaurState {
    /// Identifier of the active session, or `None` when telemetry is disabled.
    pub session_id: Option<String>,
    /// Whether telemetry is being collected at all.
    enabled: bool,
    /// Shared SQLite store for the session; `None` when disabled.
    store: Option<Arc<Mutex<SqliteStore>>>,
    /// Location of the discovery file, removed again when the state is dropped.
    discovery_path: Option<PathBuf>,
    /// Flag polled by the heartbeat thread; set to `false` on drop to stop it.
    heartbeat_alive: Option<Arc<AtomicBool>>,
    /// Passed to `redact_json_with_options` on every export.
    redact_defaults: bool,
    /// Additional keys passed to `redact_json_with_options` on every export.
    extra_redaction_keys: Vec<String>,
    /// Limits handed to `enforce_retention` after each exported batch.
    retention_limits: RetentionLimits,
}
47
48impl AuditaurState {
49 pub fn initialize(
50 config: AuditaurConfig,
51 pid: u32,
52 app_identifier: Option<String>,
53 ) -> Result<Self, AuditaurError> {
54 let enabled = config.enabled.unwrap_or_else(default_enabled);
55 if !enabled {
56 return Ok(Self {
57 session_id: None,
58 enabled: false,
59 store: None,
60 discovery_path: None,
61 heartbeat_alive: None,
62 redact_defaults: config.redact_defaults,
63 extra_redaction_keys: config.extra_redaction_keys,
64 retention_limits: RetentionLimits::default(),
65 });
66 }
67
68 if !cfg!(debug_assertions) && !config.allow_release_builds {
69 return Err(AuditaurError::new(
70 "Auditaur is disabled in release builds unless allow_release_builds is true.",
71 ));
72 }
73
74 let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
75 .map_err(|error| AuditaurError::new(error.to_string()))?;
76 let session_id = Uuid::new_v4().to_string();
77 let instance_id = Uuid::new_v4().to_string();
78 let session_dir = data_dir.join("sessions").join(&session_id);
79 let apps_dir = data_dir.join("apps");
80 fs::create_dir_all(&session_dir)?;
81 fs::create_dir_all(&apps_dir)?;
82
83 let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
84 let store = SqliteStore::open(&database_path)?;
85 store.migrate()?;
86
87 let service_name = config
88 .service_name
89 .clone()
90 .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
91 .unwrap_or_else(|| "tauri-app".to_string());
92 let service_version = config.service_version.clone();
93 let started_at = now_rfc3339()?;
94 store.create_session(&Session {
95 id: session_id.clone(),
96 session_name: config.session_name.clone(),
97 service_name: service_name.clone(),
98 service_version: service_version.clone(),
99 app_identifier: app_identifier.clone(),
100 pid: Some(i64::from(pid)),
101 started_at: started_at.clone(),
102 ended_at: None,
103 schema_version: SQLITE_SCHEMA_VERSION,
104 auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
105 })?;
106
107 let discovery_path = apps_dir.join(format!("{instance_id}.json"));
108 let discovery = DiscoveryFile {
109 schema_version: 1,
110 instance_id,
111 session_id: session_id.clone(),
112 service_name,
113 service_version,
114 app_identifier,
115 pid,
116 started_at,
117 database_path: database_path.to_string_lossy().to_string(),
118 capabilities: vec![
119 "logs".to_string(),
120 "traces".to_string(),
121 "frontend_errors".to_string(),
122 "ipc".to_string(),
123 "events".to_string(),
124 "windows".to_string(),
125 ],
126 last_heartbeat_at: now_rfc3339()?,
127 };
128 write_discovery(&discovery_path, &discovery)?;
129
130 let heartbeat_alive = Arc::new(AtomicBool::new(true));
131 start_heartbeat(
132 discovery_path.clone(),
133 discovery,
134 heartbeat_alive.clone(),
135 Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
136 );
137 let store = Arc::new(Mutex::new(store));
138 crate::tracing::install_sink(session_id.clone(), store.clone());
139 install_panic_sink(session_id.clone(), store.clone());
140
141 Ok(Self {
142 session_id: Some(session_id),
143 enabled: true,
144 store: Some(store),
145 discovery_path: Some(discovery_path),
146 heartbeat_alive: Some(heartbeat_alive),
147 redact_defaults: config.redact_defaults,
148 extra_redaction_keys: config.extra_redaction_keys,
149 retention_limits: RetentionLimits {
150 max_session_bytes: config.max_session_bytes,
151 ..RetentionLimits::default()
152 },
153 })
154 }
155
156 pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
157 if !self.enabled {
158 return Ok(());
159 }
160
161 let Some(store) = &self.store else {
162 return Err(AuditaurError::new(
163 "Auditaur is enabled without an initialized store.",
164 ));
165 };
166 let session_id = self
167 .session_id
168 .as_deref()
169 .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
170 let store = store
171 .lock()
172 .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
173
174 for mut span in batch.spans {
175 if span.session_id.is_empty() {
176 span.session_id = session_id.to_string();
177 }
178 span.attributes = self.redact_value(&span.attributes);
179 store.insert_span(&span)?;
180 }
181
182 for mut event in batch.span_events {
183 if event.session_id.is_empty() {
184 event.session_id = session_id.to_string();
185 }
186 event.attributes = self.redact_value(&event.attributes);
187 store.insert_span_event(&event)?;
188 }
189
190 for mut log in batch.logs {
191 if log.session_id.is_empty() {
192 log.session_id = session_id.to_string();
193 }
194 log.attributes = self.redact_value(&log.attributes);
195 log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
196 store.insert_log(&log)?;
197 }
198
199 for mut error in batch.frontend_errors {
200 if error.session_id.is_empty() {
201 error.session_id = session_id.to_string();
202 }
203 error.attributes = self.redact_value(&error.attributes);
204 store.insert_frontend_error(&error)?;
205 }
206
207 for mut call in batch.tauri_ipc_calls {
208 if call.session_id.is_empty() {
209 call.session_id = session_id.to_string();
210 }
211 if let Some(args_json) = &call.args_json {
212 let outcome = auditaur_core::redaction::redact_json_with_options(
213 args_json,
214 self.redact_defaults,
215 &self.extra_redaction_keys,
216 );
217 call.args_json = Some(outcome.value);
218 call.args_redacted = outcome.redacted;
219 } else {
220 call.args_redacted = false;
221 }
222 store.insert_tauri_ipc_call(&call)?;
223 }
224
225 for mut event in batch.tauri_events {
226 if event.session_id.is_empty() {
227 event.session_id = session_id.to_string();
228 }
229 if let Some(payload_json) = &event.payload_json {
230 let outcome = auditaur_core::redaction::redact_json_with_options(
231 payload_json,
232 self.redact_defaults,
233 &self.extra_redaction_keys,
234 );
235 event.payload_json = Some(outcome.value);
236 event.payload_redacted = outcome.redacted;
237 } else {
238 event.payload_redacted = false;
239 }
240 store.insert_tauri_event(&event)?;
241 }
242
243 store.enforce_retention(self.retention_limits)?;
244
245 Ok(())
246 }
247
248 pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
249 match (&self.session_id, &self.store) {
250 (Some(session_id), Some(store)) => {
251 crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
252 }
253 _ => crate::tracing::tracing_layer(),
254 }
255 }
256
257 pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
258 self.store.clone()
259 }
260
261 fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
262 auditaur_core::redaction::redact_json_with_options(
263 value,
264 self.redact_defaults,
265 &self.extra_redaction_keys,
266 )
267 .value
268 }
269}
270
271impl Drop for AuditaurState {
272 fn drop(&mut self) {
273 if let Some(alive) = &self.heartbeat_alive {
274 alive.store(false, Ordering::SeqCst);
275 }
276 if let Some(session_id) = &self.session_id {
277 crate::tracing::clear_sink(session_id);
278 clear_panic_sink(session_id);
279 }
280 if let Some(path) = &self.discovery_path {
281 let _ = fs::remove_file(path);
282 }
283 }
284}
285
/// Default for `enabled` when the configuration leaves it unset: on in builds
/// with debug assertions, otherwise only when the `AUDITAUR` environment
/// variable is exactly `1`.
fn default_enabled() -> bool {
    if cfg!(debug_assertions) {
        return true;
    }
    matches!(std::env::var("AUDITAUR").as_deref(), Ok("1"))
}
289
290fn now_rfc3339() -> Result<String, AuditaurError> {
291 OffsetDateTime::now_utc()
292 .format(&Rfc3339)
293 .map_err(|error| AuditaurError::new(error.to_string()))
294}
295
296fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
297 fs::write(path, serde_json::to_vec_pretty(discovery)?)?;
298 Ok(())
299}
300
301fn start_heartbeat(
302 discovery_path: PathBuf,
303 mut discovery: DiscoveryFile,
304 alive: Arc<AtomicBool>,
305 interval: Duration,
306) {
307 thread::spawn(move || {
308 while alive.load(Ordering::SeqCst) {
309 thread::sleep(interval);
310 if !alive.load(Ordering::SeqCst) {
311 break;
312 }
313 if let Ok(timestamp) = now_rfc3339() {
314 discovery.last_heartbeat_at = timestamp;
315 let _ = write_discovery(&discovery_path, &discovery);
316 }
317 }
318 });
319}
320
321fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
322 if let Ok(mut sink) = PANIC_SINK.lock() {
323 *sink = Some(PanicSink { session_id, store });
324 }
325 if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
326 return;
327 }
328 let previous = panic::take_hook();
329 panic::set_hook(Box::new(move |info| {
330 record_panic(info);
331 previous(info);
332 }));
333}
334
335fn clear_panic_sink(session_id: &str) {
336 let Ok(mut sink) = PANIC_SINK.lock() else {
337 return;
338 };
339 if sink
340 .as_ref()
341 .map(|sink| sink.session_id.as_str() == session_id)
342 .unwrap_or(false)
343 {
344 *sink = None;
345 }
346}
347
348fn active_panic_sink() -> Option<PanicSink> {
349 PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
350}
351
352fn record_panic(info: &panic::PanicHookInfo<'_>) {
353 let Some(sink) = active_panic_sink() else {
354 return;
355 };
356 let Ok(store) = sink.store.try_lock() else {
357 return;
358 };
359 let message = panic_message(info);
360 let location = info.location().map(|location| {
361 format!(
362 "{}:{}:{}",
363 location.file(),
364 location.line(),
365 location.column()
366 )
367 });
368 let timestamp = now_unix_nanos();
369 let attributes = json!({
370 "auditaur.source": "panic_hook",
371 "exception.escaped": true,
372 "code.filepath": info.location().map(|location| location.file()),
373 "code.lineno": info.location().map(|location| location.line()),
374 "code.column": info.location().map(|location| location.column()),
375 });
376 let _ = store.insert_log(&LogRecord {
377 session_id: sink.session_id.clone(),
378 timestamp_unix_nanos: timestamp,
379 observed_timestamp_unix_nanos: None,
380 severity_text: Some("ERROR".to_string()),
381 severity_number: Some(17),
382 body: Some(format!("Rust panic: {message}")),
383 body_json: Some(json!({
384 "message": message,
385 "location": location,
386 })),
387 trace_id: None,
388 span_id: None,
389 scope_name: Some("panic".to_string()),
390 scope_version: None,
391 attributes: attributes.clone(),
392 source: TelemetrySource::Plugin,
393 });
394 let _ = store.insert_frontend_error(&FrontendError {
395 session_id: sink.session_id,
396 timestamp_unix_nanos: timestamp,
397 message,
398 stack: location,
399 filename: info.location().map(|location| location.file().to_string()),
400 line_number: info.location().map(|location| i64::from(location.line())),
401 column_number: info.location().map(|location| i64::from(location.column())),
402 error_type: Some("RustPanic".to_string()),
403 trace_id: None,
404 span_id: None,
405 window_label: None,
406 attributes,
407 });
408}
409
/// Extracts a human-readable message from a panic payload.
///
/// Handles the two payload types produced by `panic!` (`&str` and `String`);
/// any other payload yields a fixed placeholder message.
fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
    let payload = info.payload();
    if let Some(text) = payload.downcast_ref::<&str>() {
        (*text).to_string()
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.clone()
    } else {
        "panic payload was not a string".to_string()
    }
}
417
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch and saturates
/// at `i64::MAX` if the nanosecond count does not fit in an `i64`.
fn now_unix_nanos() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
424
#[cfg(test)]
mod tests {
    use super::AuditaurState;
    use auditaur_collector::{exporter_sqlite::SqliteStore, receiver::OTelBatch};
    use auditaur_core::{
        model::{
            LogRecord, SpanEventRecord, SpanRecord, TauriEventRecord, TauriIpcCall,
            TelemetrySource,
        },
        storage::{FrontendErrorQuery, LogQuery, SpanEventQuery, TauriEventQuery, TauriIpcQuery},
        AuditaurConfig,
    };
    use serde_json::json;
    use tempfile::TempDir;

    /// Opens the session database created under `temp` for `session_id`.
    fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
        let database = temp
            .path()
            .join("sessions")
            .join(session_id)
            .join("telemetry.sqlite");
        SqliteStore::open(database).unwrap()
    }

    /// Configuration with telemetry forced on and all data kept inside `temp`.
    fn enabled_config(temp: &TempDir, service_name: &str) -> AuditaurConfig {
        AuditaurConfig {
            enabled: Some(true),
            service_name: Some(service_name.to_string()),
            data_dir: Some(temp.path().to_path_buf()),
            ..AuditaurConfig::default()
        }
    }

    /// One record of each exported kind, every one carrying a sensitive key
    /// and an empty session id.
    fn batch_with_secrets() -> OTelBatch {
        OTelBatch {
            logs: vec![LogRecord {
                session_id: String::new(),
                timestamp_unix_nanos: 1,
                observed_timestamp_unix_nanos: None,
                severity_text: Some("INFO".to_string()),
                severity_number: Some(9),
                body: Some("hello".to_string()),
                body_json: Some(json!({ "token": "secret" })),
                trace_id: None,
                span_id: None,
                scope_name: None,
                scope_version: None,
                attributes: json!({ "api_key": "secret" }),
                source: TelemetrySource::Frontend,
            }],
            spans: vec![SpanRecord {
                session_id: String::new(),
                trace_id: "trace".to_string(),
                span_id: "span".to_string(),
                parent_span_id: None,
                name: "agentive.run".to_string(),
                kind: Some("internal".to_string()),
                start_time_unix_nanos: 1,
                end_time_unix_nanos: Some(4),
                status_code: Some("OK".to_string()),
                status_message: None,
                scope_name: Some("agentive".to_string()),
                scope_version: Some("0.2.1".to_string()),
                attributes: json!({ "agentive.run_id": "run", "token": "secret" }),
                source: TelemetrySource::ThirdPartyOtel,
            }],
            span_events: vec![SpanEventRecord {
                session_id: String::new(),
                trace_id: "trace".to_string(),
                span_id: "span".to_string(),
                name: "agent-event".to_string(),
                timestamp_unix_nanos: 2,
                attributes: json!({ "token": "secret", "summary": "done" }),
            }],
            tauri_ipc_calls: vec![TauriIpcCall {
                session_id: String::new(),
                timestamp_unix_nanos: 2,
                duration_ms: Some(1.0),
                command: "save".to_string(),
                status: "OK".to_string(),
                error_message: None,
                trace_id: Some("trace".to_string()),
                span_id: Some("span".to_string()),
                window_label: Some("main".to_string()),
                args_json: Some(json!({ "password": "secret" })),
                args_redacted: true,
                result_summary: Some("ok".to_string()),
            }],
            tauri_events: vec![TauriEventRecord {
                session_id: String::new(),
                timestamp_unix_nanos: 3,
                event_name: "save".to_string(),
                direction: "emit".to_string(),
                target: None,
                trace_id: Some("trace".to_string()),
                span_id: Some("event-span".to_string()),
                window_label: Some("main".to_string()),
                payload_summary: Some("payload".to_string()),
                payload_json: Some(json!({ "token": "secret" })),
                payload_redacted: true,
            }],
            ..OTelBatch::default()
        }
    }

    #[test]
    fn initializes_session_database_and_discovery_file() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let config = AuditaurConfig {
            session_name: Some("plugin-session".to_string()),
            ..enabled_config(&temp, "plugin-test")
        };
        let state =
            AuditaurState::initialize(config, 123, Some("dev.auditaur.test".to_string())).unwrap();
        if let Some(id) = state.session_id.as_deref() {
            crate::tracing::clear_sink(id);
        }

        assert!(state.session_id.is_some());
        let session_id = state.session_id.as_ref().unwrap();
        let store = store_for(&temp, session_id);
        let session = store.get_session(session_id).unwrap().unwrap();
        assert_eq!(session.session_name.as_deref(), Some("plugin-session"));

        let discovery_files = temp.path().join("apps").read_dir().unwrap().count();
        assert_eq!(discovery_files, 1, "discovery file should be written");
    }

    #[test]
    fn exports_redacted_batch_to_sqlite() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let state =
            AuditaurState::initialize(enabled_config(&temp, "plugin-test"), 123, None).unwrap();
        let session_id = state.session_id.clone().unwrap();
        crate::tracing::clear_sink(&session_id);

        state.export_batch(batch_with_secrets()).unwrap();

        let store = store_for(&temp, &session_id);

        let logs = store.list_logs(&LogQuery::default()).unwrap();
        let exported_log = logs
            .iter()
            .find(|log| log.body.as_deref() == Some("hello"))
            .expect("exported frontend log should be present");
        assert_eq!(exported_log.session_id, session_id);
        assert_eq!(exported_log.attributes["api_key"], "[REDACTED]");
        assert_eq!(
            exported_log.body_json.as_ref().unwrap()["token"],
            "[REDACTED]"
        );

        let span_events = store.list_span_events(&SpanEventQuery::default()).unwrap();
        let first_span_event = &span_events[0];
        assert_eq!(first_span_event.session_id, session_id);
        assert_eq!(first_span_event.attributes["token"], "[REDACTED]");

        let ipc_calls = store
            .list_tauri_ipc_calls(&TauriIpcQuery::default())
            .unwrap();
        let first_call = &ipc_calls[0];
        assert_eq!(first_call.session_id, session_id);
        assert_eq!(
            first_call.args_json.as_ref().unwrap()["password"],
            "[REDACTED]"
        );

        let tauri_events = store
            .list_tauri_events(&TauriEventQuery::default())
            .unwrap();
        assert_eq!(
            tauri_events[0].payload_json.as_ref().unwrap()["token"],
            "[REDACTED]"
        );
    }

    #[test]
    fn panic_hook_records_and_clears_with_state() {
        let _guard = crate::test_support::global_state_lock();
        let temp = TempDir::new().unwrap();
        let state =
            AuditaurState::initialize(enabled_config(&temp, "panic-test"), 123, None).unwrap();
        let session_id = state.session_id.clone().unwrap();
        crate::tracing::clear_sink(&session_id);

        let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));

        let store = store_for(&temp, &session_id);
        let recorded = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].error_type.as_deref(), Some("RustPanic"));
        assert_eq!(recorded[0].message, "intentional auditaur panic test");

        // Once the state is gone the hook must no longer write to its store.
        drop(state);
        let _ = std::panic::catch_unwind(|| panic!("after drop"));
        let recorded_after_drop = store
            .list_frontend_errors(&FrontendErrorQuery::default())
            .unwrap();
        assert_eq!(recorded_after_drop.len(), 1);
    }
}