trace_weft_recorder/
lib.rs1use anyhow::Result;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::fs::{self, OpenOptions};
5use tokio::io::AsyncWriteExt;
6use tokio::sync::Mutex;
7use trace_weft_core::{CapturePolicy, EventRecord, SpanRecord};
8
9#[cfg(feature = "sqlite")]
10pub mod sqlite;
11#[cfg(feature = "sqlite")]
12use sqlite::SqliteRecorder;
13
14#[derive(Debug, Clone)]
15pub struct LocalConfig {
16 pub database_path: PathBuf,
17 pub sqlite_db_path: PathBuf,
18 pub blob_dir: PathBuf,
19 pub capture_content: CapturePolicy,
20}
21
22#[async_trait::async_trait]
23pub trait TraceStore: Send + Sync {
24 async fn record_span(&self, span: SpanRecord) -> Result<()>;
25
26 async fn record_event(&self, _event: EventRecord) -> Result<()> {
29 Ok(())
30 }
31}
32
33#[derive(Debug, Clone, Copy, Default)]
37pub struct NullStore;
38
39#[async_trait::async_trait]
40impl TraceStore for NullStore {
41 async fn record_span(&self, _span: SpanRecord) -> Result<()> {
42 Ok(())
43 }
44}
45
46pub struct DualRecorder {
47 jsonl_file: Arc<Mutex<tokio::fs::File>>,
48 events_file: Arc<Mutex<tokio::fs::File>>,
49 #[cfg(feature = "sqlite")]
50 sqlite: SqliteRecorder,
51}
52
53impl DualRecorder {
54 pub async fn new(config: LocalConfig) -> Result<Self> {
55 if let Some(parent) = config.database_path.parent() {
56 fs::create_dir_all(parent).await?;
57 }
58 fs::create_dir_all(&config.blob_dir).await?;
59
60 let file = OpenOptions::new()
61 .create(true)
62 .append(true)
63 .open(&config.database_path)
64 .await?;
65
66 let events_path = config.database_path.with_extension("events.jsonl");
69 let events_file = OpenOptions::new()
70 .create(true)
71 .append(true)
72 .open(&events_path)
73 .await?;
74
75 #[cfg(feature = "sqlite")]
76 let sqlite = SqliteRecorder::new(config.sqlite_db_path).await?;
77
78 Ok(Self {
79 jsonl_file: Arc::new(Mutex::new(file)),
80 events_file: Arc::new(Mutex::new(events_file)),
81 #[cfg(feature = "sqlite")]
82 sqlite,
83 })
84 }
85}
86
87async fn append_jsonl<T: serde::Serialize>(file: &Mutex<tokio::fs::File>, value: &T) -> Result<()> {
88 let json = serde_json::to_string(value)?;
89 let mut file = file.lock().await;
90 file.write_all(json.as_bytes()).await?;
91 file.write_all(b"\n").await?;
92 file.flush().await?;
93 Ok(())
94}
95
96#[async_trait::async_trait]
97impl TraceStore for DualRecorder {
98 async fn record_span(&self, span: SpanRecord) -> Result<()> {
99 append_jsonl(&self.jsonl_file, &span).await?;
100 #[cfg(feature = "sqlite")]
101 self.sqlite.record_span(span).await?;
102 Ok(())
103 }
104
105 async fn record_event(&self, event: EventRecord) -> Result<()> {
106 append_jsonl(&self.events_file, &event).await?;
107 #[cfg(feature = "sqlite")]
108 self.sqlite.record_event(event).await?;
109 Ok(())
110 }
111}
112
113pub type LocalRecorder = DualRecorder;