1use std::path::{Path, PathBuf};
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::{Connection, params};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use thiserror::Error;
9use tokio::task;
10use uuid::Uuid;
11
/// Identity and metadata of a span that has been started.
///
/// Values are handed back by [`Tracer::start_span`] and later passed to
/// [`Tracer::end_span`] to close the span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpanContext {
    /// Identifier of the span (a UUID v4 string when built via `SpanContext::new`).
    pub id: String,
    /// Identifier of the parent span, or `None` when the span has no parent.
    pub parent_id: Option<String>,
    /// Name supplied by the caller when the span was started.
    pub name: String,
    /// JSON attributes supplied by the caller when the span was started.
    pub attributes: Value,
    /// UTC time at which the context was created.
    pub start_time: DateTime<Utc>,
}
20
21impl SpanContext {
22 pub fn new(name: &str, parent_id: Option<String>, attributes: Value) -> Self {
23 Self {
24 id: Uuid::new_v4().to_string(),
25 parent_id,
26 name: name.to_string(),
27 attributes,
28 start_time: Utc::now(),
29 }
30 }
31}
32
/// Outcome reported by the caller when a span is ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// The span finished without an error.
    Ok,
    /// The span finished with an error; the payload is the error message.
    Error(String),
}
38
/// Errors that tracer operations can return.
#[derive(Debug, Error)]
pub enum TraceError {
    /// A SQLite operation failed (converted automatically from `rusqlite::Error`).
    #[error("database error: {0}")]
    Database(#[from] rusqlite::Error),
    /// An I/O operation failed (converted automatically from `std::io::Error`).
    // NOTE(review): nothing in this file constructs this variant — presumably used elsewhere.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A spawned Tokio task could not be joined (converted automatically from `JoinError`).
    #[error("task error: {0}")]
    Join(#[from] tokio::task::JoinError),
    /// A value could not be serialized; the payload is a description of the failure.
    // NOTE(review): nothing in this file constructs this variant — presumably used elsewhere.
    #[error("serialization error: {0}")]
    Serialization(String),
}
50
51#[async_trait]
52pub trait Tracer: Send + Sync {
53 async fn start_span(
54 &self,
55 name: &str,
56 attributes: Value,
57 parent: Option<String>,
58 ) -> Result<SpanContext, TraceError>;
59
60 async fn start_trace(&self, name: &str, attributes: Value) -> Result<SpanContext, TraceError> {
61 self.start_span(name, attributes, None).await
62 }
63
64 async fn start_child_span(
65 &self,
66 parent: &SpanContext,
67 name: &str,
68 attributes: Value,
69 ) -> Result<SpanContext, TraceError> {
70 self.start_span(name, attributes, Some(parent.id.clone()))
71 .await
72 }
73
74 async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError>;
75
76 async fn record_event(
77 &self,
78 span_id: &str,
79 name: &str,
80 attributes: Value,
81 ) -> Result<(), TraceError>;
82}
83
/// Stateless tracer whose [`Tracer`] implementation prints `[TRACE]` lines to
/// standard output.
#[derive(Debug, Clone, Default)]
pub struct StdoutTracer;
86
87#[async_trait]
88impl Tracer for StdoutTracer {
89 async fn start_span(
90 &self,
91 name: &str,
92 attributes: Value,
93 parent: Option<String>,
94 ) -> Result<SpanContext, TraceError> {
95 let ctx = SpanContext::new(name, parent, attributes);
96 println!(
97 "[TRACE] start span {} name={} attrs={}",
98 ctx.id, ctx.name, ctx.attributes
99 );
100 Ok(ctx)
101 }
102
103 async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError> {
104 println!(
105 "[TRACE] end span {} status={:?}",
106 span.id,
107 match status {
108 SpanStatus::Ok => "ok".to_string(),
109 SpanStatus::Error(msg) => format!("error:{msg}"),
110 }
111 );
112 Ok(())
113 }
114
115 async fn record_event(
116 &self,
117 span_id: &str,
118 name: &str,
119 attributes: Value,
120 ) -> Result<(), TraceError> {
121 println!(
122 "[TRACE] event span={} name={} attrs={}",
123 span_id, name, attributes
124 );
125 Ok(())
126 }
127}
128
/// Tracer that persists spans and events in a SQLite database file.
///
/// Only the path is stored; a connection is opened from it for each operation.
#[derive(Debug, Clone)]
pub struct SqliteTracer {
    /// Location of the SQLite database file.
    path: PathBuf,
}
133
/// A span row as read back from the `spans` table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoredSpan {
    /// Identifier of the span (primary key of the row).
    pub id: String,
    /// Name of the span.
    pub name: String,
    /// Identifier of the parent span, if any.
    pub parent_id: Option<String>,
    /// Status text; `SqliteTracer::end_span` writes `"ok"` or `"error"`, and
    /// the column is left unset when the span is inserted.
    pub status: Option<String>,
    /// Error message written by `SqliteTracer::end_span` for an error status.
    pub error: Option<String>,
    /// Time at which the span started.
    pub start_time: DateTime<Utc>,
    /// Time at which the span ended, or `None` if no end time is stored.
    pub end_time: Option<DateTime<Utc>>,
    /// JSON attributes attached to the span.
    pub attributes: Value,
}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct StoredEvent {
148 pub span_id: String,
149 pub name: String,
150 pub attributes: Value,
151 pub time: DateTime<Utc>,
152}
153
154impl SqliteTracer {
155 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, TraceError> {
156 let tracer = Self {
157 path: path.as_ref().to_path_buf(),
158 };
159 tracer.init_schema()?;
160 Ok(tracer)
161 }
162
163 pub async fn list_recent_spans(&self, limit: usize) -> Result<Vec<StoredSpan>, TraceError> {
164 self.run(move |conn| {
165 let mut stmt = conn.prepare(
166 "SELECT id, parent_id, name, status, error, attributes, start_time, end_time
167 FROM spans ORDER BY start_time DESC LIMIT ?1",
168 )?;
169 let rows = stmt.query_map(params![limit as i64], |row| {
170 let start_time: String = row.get(6)?;
171 let end_time: Option<String> = row.get(7)?;
172 Ok(StoredSpan {
173 id: row.get(0)?,
174 parent_id: row.get(1)?,
175 name: row.get(2)?,
176 status: row.get(3)?,
177 error: row.get(4)?,
178 attributes: serde_json::from_str::<Value>(&row.get::<_, String>(5)?)
179 .unwrap_or(Value::Null),
180 start_time: DateTime::parse_from_rfc3339(&start_time)
181 .map(|dt| dt.with_timezone(&Utc))
182 .unwrap_or_else(|_| Utc::now()),
183 end_time: end_time.and_then(|ts| {
184 DateTime::parse_from_rfc3339(&ts)
185 .ok()
186 .map(|dt| dt.with_timezone(&Utc))
187 }),
188 })
189 })?;
190 let spans = rows.collect::<Result<Vec<_>, _>>()?;
191 Ok(spans)
192 })
193 .await
194 }
195
196 pub async fn list_events(&self, span_id: &str) -> Result<Vec<StoredEvent>, TraceError> {
197 let span_id = span_id.to_string();
198 self.run(move |conn| {
199 let mut stmt = conn.prepare(
200 "SELECT span_id, name, attributes, time FROM events WHERE span_id = ?1 ORDER BY time ASC",
201 )?;
202 let events = stmt
203 .query_map(params![span_id], |row| {
204 let time: String = row.get(3)?;
205 Ok(StoredEvent {
206 span_id: row.get(0)?,
207 name: row.get(1)?,
208 attributes: serde_json::from_str::<Value>(&row.get::<_, String>(2)?)
209 .unwrap_or(Value::Null),
210 time: DateTime::parse_from_rfc3339(&time)
211 .map(|dt| dt.with_timezone(&Utc))
212 .unwrap_or_else(|_| Utc::now()),
213 })
214 })?
215 .collect::<Result<Vec<_>, _>>()?;
216 Ok(events)
217 })
218 .await
219 }
220
221 fn init_schema(&self) -> Result<(), TraceError> {
222 let conn = Connection::open(&self.path)?;
223 conn.execute_batch(
224 "
225 CREATE TABLE IF NOT EXISTS spans (
226 id TEXT PRIMARY KEY,
227 parent_id TEXT,
228 name TEXT NOT NULL,
229 status TEXT,
230 error TEXT,
231 attributes TEXT,
232 start_time TEXT NOT NULL,
233 end_time TEXT
234 );
235 CREATE TABLE IF NOT EXISTS events (
236 id INTEGER PRIMARY KEY AUTOINCREMENT,
237 span_id TEXT NOT NULL,
238 name TEXT NOT NULL,
239 attributes TEXT,
240 time TEXT NOT NULL
241 );
242 ",
243 )?;
244 Ok(())
245 }
246
247 async fn run<R, F>(&self, f: F) -> Result<R, TraceError>
248 where
249 R: Send + 'static,
250 F: FnOnce(&Connection) -> Result<R, rusqlite::Error> + Send + 'static,
251 {
252 let path = self.path.clone();
253 task::spawn_blocking(move || {
254 let conn = Connection::open(path)?;
255 f(&conn)
256 })
257 .await?
258 .map_err(TraceError::from)
259 }
260}
261
262#[async_trait]
263impl Tracer for SqliteTracer {
264 async fn start_span(
265 &self,
266 name: &str,
267 attributes: Value,
268 parent: Option<String>,
269 ) -> Result<SpanContext, TraceError> {
270 let ctx = SpanContext::new(name, parent, attributes);
271 let ctx_clone = ctx.clone();
272 self.run(move |conn| {
273 conn.execute(
274 "INSERT INTO spans (id, parent_id, name, attributes, start_time) VALUES (?1, ?2, ?3, ?4, ?5)",
275 params![
276 ctx_clone.id,
277 ctx_clone.parent_id,
278 ctx_clone.name,
279 ctx_clone.attributes.to_string(),
280 ctx_clone.start_time.to_rfc3339(),
281 ],
282 )?;
283 Ok(())
284 })
285 .await?;
286 Ok(ctx)
287 }
288
289 async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError> {
290 let status_text = match &status {
291 SpanStatus::Ok => "ok".to_string(),
292 SpanStatus::Error(_) => "error".to_string(),
293 };
294 let error_text = match status {
295 SpanStatus::Ok => None,
296 SpanStatus::Error(msg) => Some(msg),
297 };
298 self.run(move |conn| {
299 conn.execute(
300 "UPDATE spans SET status = ?1, error = ?2, end_time = ?3 WHERE id = ?4",
301 params![status_text, error_text, Utc::now().to_rfc3339(), span.id,],
302 )?;
303 Ok(())
304 })
305 .await?;
306 Ok(())
307 }
308
309 async fn record_event(
310 &self,
311 span_id: &str,
312 name: &str,
313 attributes: Value,
314 ) -> Result<(), TraceError> {
315 let span_id = span_id.to_string();
316 let name = name.to_string();
317 self.run(move |conn| {
318 conn.execute(
319 "INSERT INTO events (span_id, name, attributes, time) VALUES (?1, ?2, ?3, ?4)",
320 params![
321 span_id,
322 name,
323 attributes.to_string(),
324 Utc::now().to_rfc3339()
325 ],
326 )?;
327 Ok(())
328 })
329 .await?;
330 Ok(())
331 }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::NamedTempFile;

    /// Round trip through a temporary database: start a span, record one
    /// event, end the span, then read both back.
    #[tokio::test]
    async fn sqlite_tracer_records_spans_and_events() {
        // The temp file must stay alive for the whole test; dropping it
        // removes the database from disk.
        let db_file = NamedTempFile::new().unwrap();
        let tracer = SqliteTracer::new(db_file.path()).unwrap();

        let started = tracer
            .start_span("test", json!({"key": "value"}), None)
            .await
            .unwrap();
        let span_id = started.id.clone();

        tracer
            .record_event(&span_id, "step", json!({"state": 1}))
            .await
            .unwrap();
        tracer.end_span(started, SpanStatus::Ok).await.unwrap();

        let stored_spans = tracer.list_recent_spans(10).await.unwrap();
        assert_eq!(stored_spans.len(), 1);
        let stored = &stored_spans[0];
        assert_eq!(stored.name, "test");
        assert_eq!(stored.status.as_deref(), Some("ok"));

        let stored_events = tracer.list_events(&span_id).await.unwrap();
        assert_eq!(stored_events.len(), 1);
        assert_eq!(stored_events[0].name, "step");
    }
}