hefa_core/tracing/
mod.rs

1use std::path::{Path, PathBuf};
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::{Connection, params};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use thiserror::Error;
9use tokio::task;
10use uuid::Uuid;
11
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub struct SpanContext {
14    pub id: String,
15    pub parent_id: Option<String>,
16    pub name: String,
17    pub attributes: Value,
18    pub start_time: DateTime<Utc>,
19}
20
21impl SpanContext {
22    pub fn new(name: &str, parent_id: Option<String>, attributes: Value) -> Self {
23        Self {
24            id: Uuid::new_v4().to_string(),
25            parent_id,
26            name: name.to_string(),
27            attributes,
28            start_time: Utc::now(),
29        }
30    }
31}
32
/// Outcome reported when a span is closed via [`Tracer::end_span`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SpanStatus {
    /// The traced operation completed successfully.
    Ok,
    /// The traced operation failed; carries a human-readable error description.
    Error(String),
}
38
39#[derive(Debug, Error)]
40pub enum TraceError {
41    #[error("database error: {0}")]
42    Database(#[from] rusqlite::Error),
43    #[error("io error: {0}")]
44    Io(#[from] std::io::Error),
45    #[error("task error: {0}")]
46    Join(#[from] tokio::task::JoinError),
47    #[error("serialization error: {0}")]
48    Serialization(String),
49}
50
51#[async_trait]
52pub trait Tracer: Send + Sync {
53    async fn start_span(
54        &self,
55        name: &str,
56        attributes: Value,
57        parent: Option<String>,
58    ) -> Result<SpanContext, TraceError>;
59
60    async fn start_trace(&self, name: &str, attributes: Value) -> Result<SpanContext, TraceError> {
61        self.start_span(name, attributes, None).await
62    }
63
64    async fn start_child_span(
65        &self,
66        parent: &SpanContext,
67        name: &str,
68        attributes: Value,
69    ) -> Result<SpanContext, TraceError> {
70        self.start_span(name, attributes, Some(parent.id.clone()))
71            .await
72    }
73
74    async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError>;
75
76    async fn record_event(
77        &self,
78        span_id: &str,
79        name: &str,
80        attributes: Value,
81    ) -> Result<(), TraceError>;
82}
83
/// Stateless [`Tracer`] that prints every span and event to standard output.
#[derive(Clone, Debug, Default)]
pub struct StdoutTracer;
86
87#[async_trait]
88impl Tracer for StdoutTracer {
89    async fn start_span(
90        &self,
91        name: &str,
92        attributes: Value,
93        parent: Option<String>,
94    ) -> Result<SpanContext, TraceError> {
95        let ctx = SpanContext::new(name, parent, attributes);
96        println!(
97            "[TRACE] start span {} name={} attrs={}",
98            ctx.id, ctx.name, ctx.attributes
99        );
100        Ok(ctx)
101    }
102
103    async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError> {
104        println!(
105            "[TRACE] end span {} status={:?}",
106            span.id,
107            match status {
108                SpanStatus::Ok => "ok".to_string(),
109                SpanStatus::Error(msg) => format!("error:{msg}"),
110            }
111        );
112        Ok(())
113    }
114
115    async fn record_event(
116        &self,
117        span_id: &str,
118        name: &str,
119        attributes: Value,
120    ) -> Result<(), TraceError> {
121        println!(
122            "[TRACE] event span={} name={} attrs={}",
123            span_id, name, attributes
124        );
125        Ok(())
126    }
127}
128
/// [`Tracer`] that persists spans and events to a SQLite database file.
///
/// Only the database path is kept; every operation opens its own connection.
#[derive(Clone, Debug)]
pub struct SqliteTracer {
    /// Location of the SQLite database file.
    path: PathBuf,
}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
135pub struct StoredSpan {
136    pub id: String,
137    pub name: String,
138    pub parent_id: Option<String>,
139    pub status: Option<String>,
140    pub error: Option<String>,
141    pub start_time: DateTime<Utc>,
142    pub end_time: Option<DateTime<Utc>>,
143    pub attributes: Value,
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct StoredEvent {
148    pub span_id: String,
149    pub name: String,
150    pub attributes: Value,
151    pub time: DateTime<Utc>,
152}
153
154impl SqliteTracer {
155    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, TraceError> {
156        let tracer = Self {
157            path: path.as_ref().to_path_buf(),
158        };
159        tracer.init_schema()?;
160        Ok(tracer)
161    }
162
163    pub async fn list_recent_spans(&self, limit: usize) -> Result<Vec<StoredSpan>, TraceError> {
164        self.run(move |conn| {
165            let mut stmt = conn.prepare(
166                "SELECT id, parent_id, name, status, error, attributes, start_time, end_time
167                 FROM spans ORDER BY start_time DESC LIMIT ?1",
168            )?;
169            let rows = stmt.query_map(params![limit as i64], |row| {
170                let start_time: String = row.get(6)?;
171                let end_time: Option<String> = row.get(7)?;
172                Ok(StoredSpan {
173                    id: row.get(0)?,
174                    parent_id: row.get(1)?,
175                    name: row.get(2)?,
176                    status: row.get(3)?,
177                    error: row.get(4)?,
178                    attributes: serde_json::from_str::<Value>(&row.get::<_, String>(5)?)
179                        .unwrap_or(Value::Null),
180                    start_time: DateTime::parse_from_rfc3339(&start_time)
181                        .map(|dt| dt.with_timezone(&Utc))
182                        .unwrap_or_else(|_| Utc::now()),
183                    end_time: end_time.and_then(|ts| {
184                        DateTime::parse_from_rfc3339(&ts)
185                            .ok()
186                            .map(|dt| dt.with_timezone(&Utc))
187                    }),
188                })
189            })?;
190            let spans = rows.collect::<Result<Vec<_>, _>>()?;
191            Ok(spans)
192        })
193        .await
194    }
195
196    pub async fn list_events(&self, span_id: &str) -> Result<Vec<StoredEvent>, TraceError> {
197        let span_id = span_id.to_string();
198        self.run(move |conn| {
199            let mut stmt = conn.prepare(
200                "SELECT span_id, name, attributes, time FROM events WHERE span_id = ?1 ORDER BY time ASC",
201            )?;
202            let events = stmt
203                .query_map(params![span_id], |row| {
204                    let time: String = row.get(3)?;
205                    Ok(StoredEvent {
206                        span_id: row.get(0)?,
207                        name: row.get(1)?,
208                        attributes: serde_json::from_str::<Value>(&row.get::<_, String>(2)?)
209                            .unwrap_or(Value::Null),
210                        time: DateTime::parse_from_rfc3339(&time)
211                            .map(|dt| dt.with_timezone(&Utc))
212                            .unwrap_or_else(|_| Utc::now()),
213                    })
214                })?
215                .collect::<Result<Vec<_>, _>>()?;
216            Ok(events)
217        })
218        .await
219    }
220
221    fn init_schema(&self) -> Result<(), TraceError> {
222        let conn = Connection::open(&self.path)?;
223        conn.execute_batch(
224            "
225            CREATE TABLE IF NOT EXISTS spans (
226                id TEXT PRIMARY KEY,
227                parent_id TEXT,
228                name TEXT NOT NULL,
229                status TEXT,
230                error TEXT,
231                attributes TEXT,
232                start_time TEXT NOT NULL,
233                end_time TEXT
234            );
235            CREATE TABLE IF NOT EXISTS events (
236                id INTEGER PRIMARY KEY AUTOINCREMENT,
237                span_id TEXT NOT NULL,
238                name TEXT NOT NULL,
239                attributes TEXT,
240                time TEXT NOT NULL
241            );
242        ",
243        )?;
244        Ok(())
245    }
246
247    async fn run<R, F>(&self, f: F) -> Result<R, TraceError>
248    where
249        R: Send + 'static,
250        F: FnOnce(&Connection) -> Result<R, rusqlite::Error> + Send + 'static,
251    {
252        let path = self.path.clone();
253        task::spawn_blocking(move || {
254            let conn = Connection::open(path)?;
255            f(&conn)
256        })
257        .await?
258        .map_err(TraceError::from)
259    }
260}
261
262#[async_trait]
263impl Tracer for SqliteTracer {
264    async fn start_span(
265        &self,
266        name: &str,
267        attributes: Value,
268        parent: Option<String>,
269    ) -> Result<SpanContext, TraceError> {
270        let ctx = SpanContext::new(name, parent, attributes);
271        let ctx_clone = ctx.clone();
272        self.run(move |conn| {
273            conn.execute(
274                "INSERT INTO spans (id, parent_id, name, attributes, start_time) VALUES (?1, ?2, ?3, ?4, ?5)",
275                params![
276                    ctx_clone.id,
277                    ctx_clone.parent_id,
278                    ctx_clone.name,
279                    ctx_clone.attributes.to_string(),
280                    ctx_clone.start_time.to_rfc3339(),
281                ],
282            )?;
283            Ok(())
284        })
285        .await?;
286        Ok(ctx)
287    }
288
289    async fn end_span(&self, span: SpanContext, status: SpanStatus) -> Result<(), TraceError> {
290        let status_text = match &status {
291            SpanStatus::Ok => "ok".to_string(),
292            SpanStatus::Error(_) => "error".to_string(),
293        };
294        let error_text = match status {
295            SpanStatus::Ok => None,
296            SpanStatus::Error(msg) => Some(msg),
297        };
298        self.run(move |conn| {
299            conn.execute(
300                "UPDATE spans SET status = ?1, error = ?2, end_time = ?3 WHERE id = ?4",
301                params![status_text, error_text, Utc::now().to_rfc3339(), span.id,],
302            )?;
303            Ok(())
304        })
305        .await?;
306        Ok(())
307    }
308
309    async fn record_event(
310        &self,
311        span_id: &str,
312        name: &str,
313        attributes: Value,
314    ) -> Result<(), TraceError> {
315        let span_id = span_id.to_string();
316        let name = name.to_string();
317        self.run(move |conn| {
318            conn.execute(
319                "INSERT INTO events (span_id, name, attributes, time) VALUES (?1, ?2, ?3, ?4)",
320                params![
321                    span_id,
322                    name,
323                    attributes.to_string(),
324                    Utc::now().to_rfc3339()
325                ],
326            )?;
327            Ok(())
328        })
329        .await?;
330        Ok(())
331    }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::NamedTempFile;

    /// A single root span with one event round-trips through SQLite intact.
    #[tokio::test]
    async fn sqlite_tracer_records_spans_and_events() {
        let file = NamedTempFile::new().unwrap();
        let tracer = SqliteTracer::new(file.path()).unwrap();
        let span = tracer
            .start_span("test", json!({"key": "value"}), None)
            .await
            .unwrap();
        tracer
            .record_event(&span.id, "step", json!({"state": 1}))
            .await
            .unwrap();
        tracer.end_span(span.clone(), SpanStatus::Ok).await.unwrap();

        let spans = tracer.list_recent_spans(10).await.unwrap();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].name, "test");
        assert_eq!(spans[0].status.as_deref(), Some("ok"));
        assert_eq!(spans[0].error, None);
        assert_eq!(spans[0].attributes, json!({"key": "value"}));
        assert!(spans[0].end_time.is_some());

        let events = tracer.list_events(&span.id).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "step");
        assert_eq!(events[0].span_id, span.id);
        assert_eq!(events[0].attributes, json!({"state": 1}));
    }

    /// Child spans keep their parent link; error statuses keep their message; LIMIT applies.
    #[tokio::test]
    async fn sqlite_tracer_records_child_spans_and_errors() {
        let file = NamedTempFile::new().unwrap();
        let tracer = SqliteTracer::new(file.path()).unwrap();
        let root = tracer.start_trace("root", json!({})).await.unwrap();
        let child = tracer
            .start_child_span(&root, "child", json!({"n": 2}))
            .await
            .unwrap();
        tracer
            .end_span(child.clone(), SpanStatus::Error("boom".to_string()))
            .await
            .unwrap();
        tracer.end_span(root.clone(), SpanStatus::Ok).await.unwrap();

        // Look spans up by id: both may share a start timestamp on coarse clocks.
        let spans = tracer.list_recent_spans(10).await.unwrap();
        assert_eq!(spans.len(), 2);
        let stored_child = spans
            .iter()
            .find(|s| s.id == child.id)
            .expect("child span stored");
        assert_eq!(stored_child.parent_id.as_deref(), Some(root.id.as_str()));
        assert_eq!(stored_child.status.as_deref(), Some("error"));
        assert_eq!(stored_child.error.as_deref(), Some("boom"));
        let stored_root = spans
            .iter()
            .find(|s| s.id == root.id)
            .expect("root span stored");
        assert_eq!(stored_root.parent_id, None);
        assert_eq!(stored_root.status.as_deref(), Some("ok"));

        assert_eq!(tracer.list_recent_spans(1).await.unwrap().len(), 1);
        assert!(tracer.list_events(&root.id).await.unwrap().is_empty());
    }
}
361}