1use std::{
2 fs, panic,
3 path::PathBuf,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, Mutex,
7 },
8 thread,
9 time::Duration,
10};
11
12use auditaur_collector::{
13 exporter_sqlite::{SqliteStore, SQLITE_SCHEMA_VERSION, TELEMETRY_DATABASE_FILE},
14 receiver::OTelBatch,
15 retention::RetentionLimits,
16};
17use auditaur_core::{
18 discovery::DiscoveryFile,
19 model::{FrontendError, LogRecord, Session, TelemetrySource},
20 AuditaurConfig,
21};
22use serde_json::json;
23use time::{format_description::well_known::Rfc3339, OffsetDateTime};
24use uuid::Uuid;
25
26use crate::error::AuditaurError;
27
28static PANIC_SINK: Mutex<Option<PanicSink>> = Mutex::new(None);
29static PANIC_HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
30
31#[derive(Clone)]
32struct PanicSink {
33 session_id: String,
34 store: Arc<Mutex<SqliteStore>>,
35}
36
37pub struct AuditaurState {
38 pub session_id: Option<String>,
39 enabled: bool,
40 store: Option<Arc<Mutex<SqliteStore>>>,
41 discovery_path: Option<PathBuf>,
42 heartbeat_alive: Option<Arc<AtomicBool>>,
43 redact_defaults: bool,
44 extra_redaction_keys: Vec<String>,
45 retention_limits: RetentionLimits,
46}
47
48impl AuditaurState {
49 pub fn initialize(
50 config: AuditaurConfig,
51 pid: u32,
52 app_identifier: Option<String>,
53 ) -> Result<Self, AuditaurError> {
54 let enabled = config.enabled.unwrap_or_else(default_enabled);
55 if !enabled {
56 return Ok(Self {
57 session_id: None,
58 enabled: false,
59 store: None,
60 discovery_path: None,
61 heartbeat_alive: None,
62 redact_defaults: config.redact_defaults,
63 extra_redaction_keys: config.extra_redaction_keys,
64 retention_limits: RetentionLimits::default(),
65 });
66 }
67
68 if !cfg!(debug_assertions) && !config.allow_release_builds {
69 return Err(AuditaurError::new(
70 "Auditaur is disabled in release builds unless allow_release_builds is true.",
71 ));
72 }
73
74 let data_dir = auditaur_core::resolve_data_dir(config.data_dir.as_ref())
75 .map_err(|error| AuditaurError::new(error.to_string()))?;
76 let session_id = Uuid::new_v4().to_string();
77 let instance_id = Uuid::new_v4().to_string();
78 let session_dir = data_dir.join("sessions").join(&session_id);
79 let apps_dir = data_dir.join("apps");
80 fs::create_dir_all(&session_dir)?;
81 fs::create_dir_all(&apps_dir)?;
82
83 let database_path = session_dir.join(TELEMETRY_DATABASE_FILE);
84 let store = SqliteStore::open(&database_path)?;
85 store.migrate()?;
86
87 let service_name = config
88 .service_name
89 .clone()
90 .or_else(|| std::env::var("CARGO_PKG_NAME").ok())
91 .unwrap_or_else(|| "tauri-app".to_string());
92 let service_version = config.service_version.clone();
93 let started_at = now_rfc3339()?;
94 store.create_session(&Session {
95 id: session_id.clone(),
96 session_name: config.session_name.clone(),
97 service_name: service_name.clone(),
98 service_version: service_version.clone(),
99 app_identifier: app_identifier.clone(),
100 pid: Some(i64::from(pid)),
101 started_at: started_at.clone(),
102 ended_at: None,
103 schema_version: SQLITE_SCHEMA_VERSION,
104 auditaur_version: Some(env!("CARGO_PKG_VERSION").to_string()),
105 })?;
106
107 let discovery_path = apps_dir.join(format!("{instance_id}.json"));
108 let discovery = DiscoveryFile {
109 schema_version: 1,
110 instance_id,
111 session_id: session_id.clone(),
112 service_name,
113 service_version,
114 app_identifier,
115 pid,
116 started_at,
117 database_path: database_path.to_string_lossy().to_string(),
118 capabilities: vec![
119 "logs".to_string(),
120 "traces".to_string(),
121 "frontend_errors".to_string(),
122 "ipc".to_string(),
123 "events".to_string(),
124 "windows".to_string(),
125 ],
126 last_heartbeat_at: now_rfc3339()?,
127 };
128 write_discovery(&discovery_path, &discovery)?;
129
130 let heartbeat_alive = Arc::new(AtomicBool::new(true));
131 start_heartbeat(
132 discovery_path.clone(),
133 discovery,
134 heartbeat_alive.clone(),
135 Duration::from_millis(config.heartbeat_interval_ms.max(1_000)),
136 );
137 let store = Arc::new(Mutex::new(store));
138 crate::tracing::install_sink(session_id.clone(), store.clone());
139 install_panic_sink(session_id.clone(), store.clone());
140
141 Ok(Self {
142 session_id: Some(session_id),
143 enabled: true,
144 store: Some(store),
145 discovery_path: Some(discovery_path),
146 heartbeat_alive: Some(heartbeat_alive),
147 redact_defaults: config.redact_defaults,
148 extra_redaction_keys: config.extra_redaction_keys,
149 retention_limits: RetentionLimits {
150 max_session_bytes: config.max_session_bytes,
151 ..RetentionLimits::default()
152 },
153 })
154 }
155
156 pub fn export_batch(&self, batch: OTelBatch) -> Result<(), AuditaurError> {
157 if !self.enabled {
158 return Ok(());
159 }
160
161 let Some(store) = &self.store else {
162 return Err(AuditaurError::new(
163 "Auditaur is enabled without an initialized store.",
164 ));
165 };
166 let session_id = self
167 .session_id
168 .as_deref()
169 .ok_or_else(|| AuditaurError::new("Auditaur is enabled without a session id."))?;
170 let store = store
171 .lock()
172 .map_err(|_| AuditaurError::new("Auditaur SQLite store lock was poisoned."))?;
173
174 for mut span in batch.spans {
175 if span.session_id.is_empty() {
176 span.session_id = session_id.to_string();
177 }
178 span.attributes = self.redact_value(&span.attributes);
179 store.insert_span(&span)?;
180 }
181
182 for mut log in batch.logs {
183 if log.session_id.is_empty() {
184 log.session_id = session_id.to_string();
185 }
186 log.attributes = self.redact_value(&log.attributes);
187 log.body_json = log.body_json.as_ref().map(|value| self.redact_value(value));
188 store.insert_log(&log)?;
189 }
190
191 for mut error in batch.frontend_errors {
192 if error.session_id.is_empty() {
193 error.session_id = session_id.to_string();
194 }
195 error.attributes = self.redact_value(&error.attributes);
196 store.insert_frontend_error(&error)?;
197 }
198
199 for mut call in batch.tauri_ipc_calls {
200 if call.session_id.is_empty() {
201 call.session_id = session_id.to_string();
202 }
203 if let Some(args_json) = &call.args_json {
204 let outcome = auditaur_core::redaction::redact_json_with_options(
205 args_json,
206 self.redact_defaults,
207 &self.extra_redaction_keys,
208 );
209 call.args_json = Some(outcome.value);
210 call.args_redacted = outcome.redacted;
211 } else {
212 call.args_redacted = false;
213 }
214 store.insert_tauri_ipc_call(&call)?;
215 }
216
217 for mut event in batch.tauri_events {
218 if event.session_id.is_empty() {
219 event.session_id = session_id.to_string();
220 }
221 if let Some(payload_json) = &event.payload_json {
222 let outcome = auditaur_core::redaction::redact_json_with_options(
223 payload_json,
224 self.redact_defaults,
225 &self.extra_redaction_keys,
226 );
227 event.payload_json = Some(outcome.value);
228 event.payload_redacted = outcome.redacted;
229 } else {
230 event.payload_redacted = false;
231 }
232 store.insert_tauri_event(&event)?;
233 }
234
235 store.enforce_retention(self.retention_limits)?;
236
237 Ok(())
238 }
239
240 pub fn tracing_layer(&self) -> crate::tracing::AuditaurTracingLayer {
241 match (&self.session_id, &self.store) {
242 (Some(session_id), Some(store)) => {
243 crate::tracing::AuditaurTracingLayer::with_sink(session_id.clone(), store.clone())
244 }
245 _ => crate::tracing::tracing_layer(),
246 }
247 }
248
249 pub(crate) fn store(&self) -> Option<Arc<Mutex<SqliteStore>>> {
250 self.store.clone()
251 }
252
253 fn redact_value(&self, value: &serde_json::Value) -> serde_json::Value {
254 auditaur_core::redaction::redact_json_with_options(
255 value,
256 self.redact_defaults,
257 &self.extra_redaction_keys,
258 )
259 .value
260 }
261}
262
263impl Drop for AuditaurState {
264 fn drop(&mut self) {
265 if let Some(alive) = &self.heartbeat_alive {
266 alive.store(false, Ordering::SeqCst);
267 }
268 if let Some(session_id) = &self.session_id {
269 crate::tracing::clear_sink(session_id);
270 clear_panic_sink(session_id);
271 }
272 if let Some(path) = &self.discovery_path {
273 let _ = fs::remove_file(path);
274 }
275 }
276}
277
278fn default_enabled() -> bool {
279 cfg!(debug_assertions) || std::env::var("AUDITAUR").ok().as_deref() == Some("1")
280}
281
282fn now_rfc3339() -> Result<String, AuditaurError> {
283 OffsetDateTime::now_utc()
284 .format(&Rfc3339)
285 .map_err(|error| AuditaurError::new(error.to_string()))
286}
287
288fn write_discovery(path: &PathBuf, discovery: &DiscoveryFile) -> Result<(), AuditaurError> {
289 fs::write(path, serde_json::to_vec_pretty(discovery)?)?;
290 Ok(())
291}
292
293fn start_heartbeat(
294 discovery_path: PathBuf,
295 mut discovery: DiscoveryFile,
296 alive: Arc<AtomicBool>,
297 interval: Duration,
298) {
299 thread::spawn(move || {
300 while alive.load(Ordering::SeqCst) {
301 thread::sleep(interval);
302 if !alive.load(Ordering::SeqCst) {
303 break;
304 }
305 if let Ok(timestamp) = now_rfc3339() {
306 discovery.last_heartbeat_at = timestamp;
307 let _ = write_discovery(&discovery_path, &discovery);
308 }
309 }
310 });
311}
312
313fn install_panic_sink(session_id: String, store: Arc<Mutex<SqliteStore>>) {
314 if let Ok(mut sink) = PANIC_SINK.lock() {
315 *sink = Some(PanicSink { session_id, store });
316 }
317 if PANIC_HOOK_INSTALLED.swap(true, Ordering::SeqCst) {
318 return;
319 }
320 let previous = panic::take_hook();
321 panic::set_hook(Box::new(move |info| {
322 record_panic(info);
323 previous(info);
324 }));
325}
326
327fn clear_panic_sink(session_id: &str) {
328 let Ok(mut sink) = PANIC_SINK.lock() else {
329 return;
330 };
331 if sink
332 .as_ref()
333 .map(|sink| sink.session_id.as_str() == session_id)
334 .unwrap_or(false)
335 {
336 *sink = None;
337 }
338}
339
340fn active_panic_sink() -> Option<PanicSink> {
341 PANIC_SINK.lock().ok().and_then(|sink| sink.clone())
342}
343
344fn record_panic(info: &panic::PanicHookInfo<'_>) {
345 let Some(sink) = active_panic_sink() else {
346 return;
347 };
348 let Ok(store) = sink.store.try_lock() else {
349 return;
350 };
351 let message = panic_message(info);
352 let location = info.location().map(|location| {
353 format!(
354 "{}:{}:{}",
355 location.file(),
356 location.line(),
357 location.column()
358 )
359 });
360 let timestamp = now_unix_nanos();
361 let attributes = json!({
362 "auditaur.source": "panic_hook",
363 "exception.escaped": true,
364 "code.filepath": info.location().map(|location| location.file()),
365 "code.lineno": info.location().map(|location| location.line()),
366 "code.column": info.location().map(|location| location.column()),
367 });
368 let _ = store.insert_log(&LogRecord {
369 session_id: sink.session_id.clone(),
370 timestamp_unix_nanos: timestamp,
371 observed_timestamp_unix_nanos: None,
372 severity_text: Some("ERROR".to_string()),
373 severity_number: Some(17),
374 body: Some(format!("Rust panic: {message}")),
375 body_json: Some(json!({
376 "message": message,
377 "location": location,
378 })),
379 trace_id: None,
380 span_id: None,
381 scope_name: Some("panic".to_string()),
382 scope_version: None,
383 attributes: attributes.clone(),
384 source: TelemetrySource::Plugin,
385 });
386 let _ = store.insert_frontend_error(&FrontendError {
387 session_id: sink.session_id,
388 timestamp_unix_nanos: timestamp,
389 message,
390 stack: location,
391 filename: info.location().map(|location| location.file().to_string()),
392 line_number: info.location().map(|location| i64::from(location.line())),
393 column_number: info.location().map(|location| i64::from(location.column())),
394 error_type: Some("RustPanic".to_string()),
395 trace_id: None,
396 span_id: None,
397 window_label: None,
398 attributes,
399 });
400}
401
402fn panic_message(info: &panic::PanicHookInfo<'_>) -> String {
403 info.payload()
404 .downcast_ref::<&str>()
405 .map(|message| (*message).to_string())
406 .or_else(|| info.payload().downcast_ref::<String>().cloned())
407 .unwrap_or_else(|| "panic payload was not a string".to_string())
408}
409
410fn now_unix_nanos() -> i64 {
411 let now = std::time::SystemTime::now()
412 .duration_since(std::time::UNIX_EPOCH)
413 .unwrap_or_default();
414 i64::try_from(now.as_nanos()).unwrap_or(i64::MAX)
415}
416
417#[cfg(test)]
418mod tests {
419 use super::AuditaurState;
420 use auditaur_collector::{exporter_sqlite::SqliteStore, receiver::OTelBatch};
421 use auditaur_core::{
422 model::{LogRecord, TauriEventRecord, TauriIpcCall},
423 storage::FrontendErrorQuery,
424 AuditaurConfig,
425 };
426 use serde_json::json;
427 use tempfile::TempDir;
428
429 #[test]
430 fn initializes_session_database_and_discovery_file() {
431 let _guard = crate::test_support::global_state_lock();
432 let temp = TempDir::new().unwrap();
433 let state = AuditaurState::initialize(
434 AuditaurConfig {
435 enabled: Some(true),
436 service_name: Some("plugin-test".to_string()),
437 session_name: Some("plugin-session".to_string()),
438 data_dir: Some(temp.path().to_path_buf()),
439 ..AuditaurConfig::default()
440 },
441 123,
442 Some("dev.auditaur.test".to_string()),
443 )
444 .unwrap();
445 if let Some(session_id) = state.session_id.as_deref() {
446 crate::tracing::clear_sink(session_id);
447 }
448
449 assert!(state.session_id.is_some());
450 let db = temp
451 .path()
452 .join("sessions")
453 .join(state.session_id.as_ref().unwrap())
454 .join("telemetry.sqlite");
455 let store = SqliteStore::open(db).unwrap();
456 let session = store
457 .get_session(state.session_id.as_ref().unwrap())
458 .unwrap()
459 .unwrap();
460 assert_eq!(session.session_name.as_deref(), Some("plugin-session"));
461 assert_eq!(
462 temp.path().join("apps").read_dir().unwrap().count(),
463 1,
464 "discovery file should be written"
465 );
466 }
467
468 #[test]
469 fn exports_redacted_batch_to_sqlite() {
470 let _guard = crate::test_support::global_state_lock();
471 let temp = TempDir::new().unwrap();
472 let state = AuditaurState::initialize(
473 AuditaurConfig {
474 enabled: Some(true),
475 service_name: Some("plugin-test".to_string()),
476 data_dir: Some(temp.path().to_path_buf()),
477 ..AuditaurConfig::default()
478 },
479 123,
480 None,
481 )
482 .unwrap();
483 let session_id = state.session_id.clone().unwrap();
484 crate::tracing::clear_sink(&session_id);
485
486 state
487 .export_batch(OTelBatch {
488 logs: vec![LogRecord {
489 session_id: String::new(),
490 timestamp_unix_nanos: 1,
491 observed_timestamp_unix_nanos: None,
492 severity_text: Some("INFO".to_string()),
493 severity_number: Some(9),
494 body: Some("hello".to_string()),
495 body_json: Some(json!({ "token": "secret" })),
496 trace_id: None,
497 span_id: None,
498 scope_name: None,
499 scope_version: None,
500 attributes: json!({ "api_key": "secret" }),
501 source: auditaur_core::model::TelemetrySource::Frontend,
502 }],
503 tauri_ipc_calls: vec![TauriIpcCall {
504 session_id: String::new(),
505 timestamp_unix_nanos: 2,
506 duration_ms: Some(1.0),
507 command: "save".to_string(),
508 status: "OK".to_string(),
509 error_message: None,
510 trace_id: Some("trace".to_string()),
511 span_id: Some("span".to_string()),
512 window_label: Some("main".to_string()),
513 args_json: Some(json!({ "password": "secret" })),
514 args_redacted: true,
515 result_summary: Some("ok".to_string()),
516 }],
517 tauri_events: vec![TauriEventRecord {
518 session_id: String::new(),
519 timestamp_unix_nanos: 3,
520 event_name: "save".to_string(),
521 direction: "emit".to_string(),
522 target: None,
523 trace_id: Some("trace".to_string()),
524 span_id: Some("event-span".to_string()),
525 window_label: Some("main".to_string()),
526 payload_summary: Some("payload".to_string()),
527 payload_json: Some(json!({ "token": "secret" })),
528 payload_redacted: true,
529 }],
530 ..OTelBatch::default()
531 })
532 .unwrap();
533
534 let db = temp
535 .path()
536 .join("sessions")
537 .join(&session_id)
538 .join("telemetry.sqlite");
539 let store = SqliteStore::open(db).unwrap();
540 let logs = store
541 .list_logs(&auditaur_core::storage::LogQuery::default())
542 .unwrap();
543
544 let log = logs
545 .iter()
546 .find(|log| log.body.as_deref() == Some("hello"))
547 .expect("exported frontend log should be present");
548 assert_eq!(log.session_id, session_id);
549 assert_eq!(log.attributes["api_key"], "[REDACTED]");
550 assert_eq!(log.body_json.as_ref().unwrap()["token"], "[REDACTED]");
551
552 let ipc = store
553 .list_tauri_ipc_calls(&auditaur_core::storage::TauriIpcQuery::default())
554 .unwrap();
555 assert_eq!(ipc[0].session_id, session_id);
556 assert_eq!(ipc[0].args_json.as_ref().unwrap()["password"], "[REDACTED]");
557
558 let events = store
559 .list_tauri_events(&auditaur_core::storage::TauriEventQuery::default())
560 .unwrap();
561 assert_eq!(
562 events[0].payload_json.as_ref().unwrap()["token"],
563 "[REDACTED]"
564 );
565 }
566
567 #[test]
568 fn panic_hook_records_and_clears_with_state() {
569 let _guard = crate::test_support::global_state_lock();
570 let temp = TempDir::new().unwrap();
571 let state = AuditaurState::initialize(
572 AuditaurConfig {
573 enabled: Some(true),
574 service_name: Some("panic-test".to_string()),
575 data_dir: Some(temp.path().to_path_buf()),
576 ..AuditaurConfig::default()
577 },
578 123,
579 None,
580 )
581 .unwrap();
582 let session_id = state.session_id.clone().unwrap();
583 crate::tracing::clear_sink(&session_id);
584
585 let _ = std::panic::catch_unwind(|| panic!("intentional auditaur panic test"));
586
587 let store = store_for(&temp, &session_id);
588 let errors = store
589 .list_frontend_errors(&FrontendErrorQuery::default())
590 .unwrap();
591 assert_eq!(errors.len(), 1);
592 assert_eq!(errors[0].error_type.as_deref(), Some("RustPanic"));
593 assert_eq!(errors[0].message, "intentional auditaur panic test");
594
595 drop(state);
596 let _ = std::panic::catch_unwind(|| panic!("after drop"));
597 let errors_after_drop = store
598 .list_frontend_errors(&FrontendErrorQuery::default())
599 .unwrap();
600 assert_eq!(errors_after_drop.len(), 1);
601 }
602
603 fn store_for(temp: &TempDir, session_id: &str) -> SqliteStore {
604 let db = temp
605 .path()
606 .join("sessions")
607 .join(session_id)
608 .join("telemetry.sqlite");
609 SqliteStore::open(db).unwrap()
610 }
611}