forge_runtime/observability/storage.rs

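//! PostgreSQL-backed stores for observability data: metrics ([`MetricsStore`]),
//! logs ([`LogStore`]), and trace spans ([`TraceStore`]). All three batch
//! inserts through `UNNEST` and share the same time-based retention cleanup.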
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use sqlx::Row;
use tokio::sync::RwLock;

use forge_core::observability::{LogEntry, Metric, MetricKind, MetricValue, Span, SpanStatus};
use forge_core::LogLevel;

/// Maximum number of items to insert in a single batch.
const BATCH_SIZE: usize = 1000;

/// Metrics store for persisting metrics to PostgreSQL.
pub struct MetricsStore {
    pool: sqlx::PgPool,
    /// Metrics waiting to be written (buffer for batching).
    pending: Arc<RwLock<Vec<Metric>>>,
}

impl MetricsStore {
    /// Create a new metrics store.
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            pending: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Store a batch of metrics to the database.
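    ///
    /// Writes happen in chunks of `BATCH_SIZE`; an empty input is a no-op.
    /// A minimal usage sketch (not compiled; assumes a connected pool and a
    /// populated `Vec<Metric>` named `metrics`):
    ///
    /// ```ignore
    /// let store = MetricsStore::new(pool);
    /// store.store(metrics).await?;
    /// ```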
    pub async fn store(&self, metrics: Vec<Metric>) -> forge_core::Result<()> {
        if metrics.is_empty() {
            return Ok(());
        }

        // Insert in bounded chunks. Each UNNEST column binds as a single
        // array, so this caps per-statement payload size rather than the
        // number of bind parameters.
        for chunk in metrics.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

    /// Insert a batch of metrics using UNNEST.
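    ///
    /// Each column is bound as one array parameter, so a chunk of any size
    /// costs only five placeholders; batching mainly bounds the payload of an
    /// individual statement.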
    async fn insert_batch(&self, metrics: &[Metric]) -> forge_core::Result<()> {
        let names: Vec<&str> = metrics.iter().map(|m| m.name.as_str()).collect();
        let kinds: Vec<String> = metrics.iter().map(|m| m.kind.to_string()).collect();
        // Metrics without a scalar representation fall back to 0.0.
        let values: Vec<f64> = metrics
            .iter()
            .map(|m| m.value.as_value().unwrap_or(0.0))
            .collect();
        let labels: Vec<serde_json::Value> = metrics
            .iter()
            .map(|m| serde_json::to_value(&m.labels).unwrap_or(serde_json::Value::Null))
            .collect();
        let timestamps: Vec<DateTime<Utc>> = metrics.iter().map(|m| m.timestamp).collect();

        sqlx::query(
            r#"
            INSERT INTO forge_metrics (name, kind, value, labels, timestamp)
            SELECT * FROM UNNEST($1::TEXT[], $2::TEXT[], $3::FLOAT8[], $4::JSONB[], $5::TIMESTAMPTZ[])
            "#,
        )
        .bind(&names)
        .bind(&kinds)
        .bind(&values)
        .bind(&labels)
        .bind(&timestamps)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Map a stored kind string back to a `MetricKind`, defaulting to gauge
    /// for unknown values.
    fn parse_kind(kind: &str) -> MetricKind {
        match kind {
            "counter" => MetricKind::Counter,
            "gauge" => MetricKind::Gauge,
            "histogram" => MetricKind::Histogram,
            "summary" => MetricKind::Summary,
            _ => MetricKind::Gauge,
        }
    }

    /// Query metrics by name and time range.
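    ///
    /// Returns at most the 1000 most recent points in the range, newest first.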
    pub async fn query(
        &self,
        name: &str,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
    ) -> forge_core::Result<Vec<Metric>> {
        let rows = sqlx::query(
            r#"
            SELECT name, kind, value, labels, timestamp
            FROM forge_metrics
            WHERE name = $1 AND timestamp >= $2 AND timestamp <= $3
            ORDER BY timestamp DESC
            LIMIT 1000
            "#,
        )
        .bind(name)
        .bind(from)
        .bind(to)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let metrics = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let kind_str: String = row.get("kind");
                let value: f64 = row.get("value");
                let labels: serde_json::Value = row.get("labels");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                let labels_map: HashMap<String, String> =
                    serde_json::from_value(labels).unwrap_or_default();

                Metric {
                    name,
                    kind: Self::parse_kind(&kind_str),
                    value: MetricValue::Value(value),
                    labels: labels_map,
                    timestamp,
                    description: None,
                }
            })
            .collect();

        Ok(metrics)
    }

    /// Get latest value for each unique metric name.
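    ///
    /// Uses `DISTINCT ON (name)` ordered by `timestamp DESC`, so exactly one
    /// row (the newest) is returned per metric name.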
    pub async fn list_latest(&self) -> forge_core::Result<Vec<Metric>> {
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT ON (name) name, kind, value, labels, timestamp
            FROM forge_metrics
            ORDER BY name, timestamp DESC
            "#,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let metrics = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let kind_str: String = row.get("kind");
                let value: f64 = row.get("value");
                let labels: serde_json::Value = row.get("labels");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                Metric {
                    name,
                    kind: Self::parse_kind(&kind_str),
                    value: MetricValue::Value(value),
                    labels: serde_json::from_value(labels).unwrap_or_default(),
                    timestamp,
                    description: None,
                }
            })
            .collect();

        Ok(metrics)
    }

    /// Number of metrics buffered in memory but not yet flushed.
    pub async fn pending_count(&self) -> usize {
        self.pending.read().await.len()
    }

    /// Run cleanup to remove old metrics.
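    ///
    /// Deletes rows older than `Utc::now() - retention` and returns the number
    /// of rows removed; e.g. `Duration::from_secs(7 * 24 * 3600)` keeps one
    /// week of data.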
    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        let cutoff = Utc::now()
            - chrono::Duration::from_std(retention)
                .expect("retention duration out of range for chrono");

        let result = sqlx::query("DELETE FROM forge_metrics WHERE timestamp < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

/// Log store for persisting logs to PostgreSQL.
pub struct LogStore {
    pool: sqlx::PgPool,
    /// Logs waiting to be written.
    pending: Arc<RwLock<Vec<LogEntry>>>,
}

impl LogStore {
    /// Create a new log store.
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            pending: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Store a batch of logs to the database.
    pub async fn store(&self, logs: Vec<LogEntry>) -> forge_core::Result<()> {
        if logs.is_empty() {
            return Ok(());
        }

        for chunk in logs.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

    /// Insert a batch of logs using UNNEST.
    async fn insert_batch(&self, logs: &[LogEntry]) -> forge_core::Result<()> {
        let levels: Vec<String> = logs.iter().map(|l| l.level.to_string()).collect();
        let messages: Vec<&str> = logs.iter().map(|l| l.message.as_str()).collect();
        let targets: Vec<Option<&str>> = logs.iter().map(|l| l.target.as_deref()).collect();
        let fields: Vec<serde_json::Value> = logs
            .iter()
            .map(|l| serde_json::to_value(&l.fields).unwrap_or(serde_json::Value::Null))
            .collect();
        let trace_ids: Vec<Option<String>> = logs.iter().map(|l| l.trace_id.clone()).collect();
        let span_ids: Vec<Option<String>> = logs.iter().map(|l| l.span_id.clone()).collect();
        let timestamps: Vec<DateTime<Utc>> = logs.iter().map(|l| l.timestamp).collect();

        sqlx::query(
            r#"
            INSERT INTO forge_logs (level, message, target, fields, trace_id, span_id, timestamp)
            SELECT * FROM UNNEST($1::TEXT[], $2::TEXT[], $3::TEXT[], $4::JSONB[], $5::TEXT[], $6::TEXT[], $7::TIMESTAMPTZ[])
            "#,
        )
        .bind(&levels)
        .bind(&messages)
        .bind(&targets)
        .bind(&fields)
        .bind(&trace_ids)
        .bind(&span_ids)
        .bind(&timestamps)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Parse a stored level string, defaulting to `Info` for unknown values.
    fn parse_level(level: &str) -> LogLevel {
        match level.to_lowercase().as_str() {
            "trace" => LogLevel::Trace,
            "debug" => LogLevel::Debug,
            "info" => LogLevel::Info,
            "warn" => LogLevel::Warn,
            "error" => LogLevel::Error,
            _ => LogLevel::Info,
        }
    }

    /// Query logs with filters.
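    ///
    /// Every filter is optional: a `None` level or time bound is skipped by
    /// the corresponding `IS NULL` guard in the SQL. A usage sketch (not
    /// compiled; assumes a connected store):
    ///
    /// ```ignore
    /// // The 100 most recent error-level entries, any time range.
    /// let errors = store.query(Some(LogLevel::Error), None, None, 100).await?;
    /// ```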
    pub async fn query(
        &self,
        level: Option<LogLevel>,
        from: Option<DateTime<Utc>>,
        to: Option<DateTime<Utc>>,
        limit: usize,
    ) -> forge_core::Result<Vec<LogEntry>> {
        let level_filter = level.map(|l| l.to_string());

        let rows = sqlx::query(
            r#"
            SELECT id, level, message, target, fields, trace_id, span_id, timestamp
            FROM forge_logs
            WHERE ($1::TEXT IS NULL OR level = $1)
              AND ($2::TIMESTAMPTZ IS NULL OR timestamp >= $2)
              AND ($3::TIMESTAMPTZ IS NULL OR timestamp <= $3)
            ORDER BY timestamp DESC
            LIMIT $4
            "#,
        )
        .bind(&level_filter)
        .bind(from)
        .bind(to)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let logs = rows
            .into_iter()
            .map(|row| {
                let level_str: String = row.get("level");
                let message: String = row.get("message");
                let target: Option<String> = row.get("target");
                let fields: serde_json::Value = row.get("fields");
                let trace_id: Option<String> = row.get("trace_id");
                let span_id: Option<String> = row.get("span_id");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                LogEntry {
                    level: Self::parse_level(&level_str),
                    message,
                    target,
                    fields: serde_json::from_value(fields).unwrap_or_default(),
                    trace_id,
                    span_id,
                    timestamp,
                    node_id: None,
                }
            })
            .collect();

        Ok(logs)
    }

    /// Search logs by message content.
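    ///
    /// Matches case-insensitively via `ILIKE`. Note that `%` and `_` in
    /// `query` are not escaped, so they act as wildcards in the pattern.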
    pub async fn search(&self, query: &str, limit: usize) -> forge_core::Result<Vec<LogEntry>> {
        let search_pattern = format!("%{}%", query);

        let rows = sqlx::query(
            r#"
            SELECT id, level, message, target, fields, trace_id, span_id, timestamp
            FROM forge_logs
            WHERE message ILIKE $1
            ORDER BY timestamp DESC
            LIMIT $2
            "#,
        )
        .bind(&search_pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let logs = rows
            .into_iter()
            .map(|row| {
                let level_str: String = row.get("level");
                let message: String = row.get("message");
                let target: Option<String> = row.get("target");
                let fields: serde_json::Value = row.get("fields");
                let trace_id: Option<String> = row.get("trace_id");
                let span_id: Option<String> = row.get("span_id");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                LogEntry {
                    level: Self::parse_level(&level_str),
                    message,
                    target,
                    fields: serde_json::from_value(fields).unwrap_or_default(),
                    trace_id,
                    span_id,
                    timestamp,
                    node_id: None,
                }
            })
            .collect();

        Ok(logs)
    }

    /// Number of logs buffered in memory but not yet flushed.
    pub async fn pending_count(&self) -> usize {
        self.pending.read().await.len()
    }

    /// Run cleanup to remove old logs.
    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        let cutoff = Utc::now()
            - chrono::Duration::from_std(retention)
                .expect("retention duration out of range for chrono");

        let result = sqlx::query("DELETE FROM forge_logs WHERE timestamp < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

/// Trace store for persisting traces to PostgreSQL.
pub struct TraceStore {
    pool: sqlx::PgPool,
    /// Traces indexed by trace ID (for in-flight spans).
    traces: Arc<RwLock<HashMap<String, Vec<Span>>>>,
}

impl TraceStore {
    /// Create a new trace store.
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            traces: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Store a batch of spans to the database.
    pub async fn store(&self, spans: Vec<Span>) -> forge_core::Result<()> {
        if spans.is_empty() {
            return Ok(());
        }

        for chunk in spans.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

    /// Insert a batch of spans using UNNEST.
    async fn insert_batch(&self, spans: &[Span]) -> forge_core::Result<()> {
        let ids: Vec<uuid::Uuid> = spans.iter().map(|_| uuid::Uuid::new_v4()).collect();
        let trace_ids: Vec<String> = spans
            .iter()
            .map(|s| s.context.trace_id.to_string())
            .collect();
        let span_ids: Vec<String> = spans
            .iter()
            .map(|s| s.context.span_id.to_string())
            .collect();
        let parent_ids: Vec<Option<String>> = spans
            .iter()
            .map(|s| s.context.parent_span_id.as_ref().map(|id| id.to_string()))
            .collect();
        let names: Vec<&str> = spans.iter().map(|s| s.name.as_str()).collect();
        let kinds: Vec<String> = spans.iter().map(|s| s.kind.to_string()).collect();
        let statuses: Vec<String> = spans.iter().map(|s| s.status.to_string()).collect();
        let attributes: Vec<serde_json::Value> = spans
            .iter()
            .map(|s| serde_json::to_value(&s.attributes).unwrap_or(serde_json::Value::Null))
            .collect();
        let events: Vec<serde_json::Value> = spans
            .iter()
            .map(|s| serde_json::to_value(&s.events).unwrap_or(serde_json::Value::Null))
            .collect();
        let start_times: Vec<DateTime<Utc>> = spans.iter().map(|s| s.start_time).collect();
        let end_times: Vec<Option<DateTime<Utc>>> = spans.iter().map(|s| s.end_time).collect();
        // The column is INT, so durations beyond i32::MAX ms are truncated by the cast.
        let durations: Vec<Option<i32>> = spans
            .iter()
            .map(|s| s.duration_ms().map(|d| d as i32))
            .collect();

        sqlx::query(
            r#"
            INSERT INTO forge_traces (
                id, trace_id, span_id, parent_span_id, name, kind, status,
                attributes, events, started_at, ended_at, duration_ms
            )
            SELECT * FROM UNNEST(
                $1::UUID[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[], $6::TEXT[], $7::TEXT[],
                $8::JSONB[], $9::JSONB[], $10::TIMESTAMPTZ[], $11::TIMESTAMPTZ[], $12::INT[]
            )
            "#,
        )
        .bind(&ids)
        .bind(&trace_ids)
        .bind(&span_ids)
        .bind(&parent_ids)
        .bind(&names)
        .bind(&kinds)
        .bind(&statuses)
        .bind(&attributes)
        .bind(&events)
        .bind(&start_times)
        .bind(&end_times)
        .bind(&durations)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Get a trace by ID.
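    ///
    /// Spans are returned in `started_at` order. Rebuilt spans carry a fresh
    /// context from `Span::new`; the stored trace/span IDs and kind are not
    /// round-tripped.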
    pub async fn get_trace(&self, trace_id: &str) -> forge_core::Result<Vec<Span>> {
        let rows = sqlx::query(
            r#"
            SELECT trace_id, span_id, parent_span_id, name, kind, status,
                   attributes, events, started_at, ended_at, duration_ms
            FROM forge_traces
            WHERE trace_id = $1
            ORDER BY started_at ASC
            "#,
        )
        .bind(trace_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let spans = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let status_str: String = row.get("status");
                let start_time: DateTime<Utc> = row.get("started_at");
                let end_time: Option<DateTime<Utc>> = row.get("ended_at");

                // Span::new generates a fresh context, so the stored trace/span
                // IDs and kind are not restored onto the rebuilt span.
                let mut span = Span::new(&name);
                span.start_time = start_time;
                span.end_time = end_time;
                span.status = match status_str.as_str() {
                    "ok" => SpanStatus::Ok,
                    "error" => SpanStatus::Error,
                    _ => SpanStatus::Unset,
                };
                span.attributes = row
                    .get::<serde_json::Value, _>("attributes")
                    .as_object()
                    .cloned()
                    .map(|m| m.into_iter().collect())
                    .unwrap_or_default();

                span
            })
            .collect();

        Ok(spans)
    }

    /// Query distinct trace IDs by time range.
    pub async fn query(
        &self,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        limit: usize,
    ) -> forge_core::Result<Vec<String>> {
        // DISTINCT restricts ORDER BY to selected columns, so results are
        // ordered by trace_id rather than by recency.
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT trace_id
            FROM forge_traces
            WHERE started_at >= $1 AND started_at <= $2
            ORDER BY trace_id
            LIMIT $3
            "#,
        )
        .bind(from)
        .bind(to)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(rows.into_iter().map(|r| r.get("trace_id")).collect())
    }

    /// List recent traces with summary info.
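    ///
    /// Per trace, `duration_ms` is approximated by the longest single span and
    /// the root span name is taken from the earliest-started span.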
    pub async fn list_recent(&self, limit: usize) -> forge_core::Result<Vec<TraceSummary>> {
        let rows = sqlx::query(
            r#"
            WITH trace_stats AS (
                SELECT
                    trace_id,
                    MIN(started_at) as started_at,
                    MAX(duration_ms) as duration_ms,
                    COUNT(*) as span_count,
                    BOOL_OR(status = 'error') as has_error,
                    (array_agg(name ORDER BY started_at ASC))[1] as root_span_name
                FROM forge_traces
                GROUP BY trace_id
                ORDER BY started_at DESC
                LIMIT $1
            )
            SELECT * FROM trace_stats
            "#,
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let summaries = rows
            .into_iter()
            .map(|row| TraceSummary {
                trace_id: row.get("trace_id"),
                root_span_name: row.get("root_span_name"),
                started_at: row.get("started_at"),
                duration_ms: row.get::<Option<i32>, _>("duration_ms").map(|d| d as u64),
                span_count: row.get::<i64, _>("span_count") as u32,
                has_error: row.get("has_error"),
            })
            .collect();

        Ok(summaries)
    }

    /// Find traces with errors.
    pub async fn find_errors(&self, limit: usize) -> forge_core::Result<Vec<String>> {
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT trace_id
            FROM forge_traces
            WHERE status = 'error'
            ORDER BY trace_id
            LIMIT $1
            "#,
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(rows.into_iter().map(|r| r.get("trace_id")).collect())
    }

    /// Number of traces currently buffered in memory (not persisted rows).
    pub async fn trace_count(&self) -> usize {
        self.traces.read().await.len()
    }

    /// Total number of spans currently buffered in memory (not persisted rows).
    pub async fn span_count(&self) -> usize {
        self.traces.read().await.values().map(|v| v.len()).sum()
    }

    /// Run cleanup to remove old traces.
    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        let cutoff = Utc::now()
            - chrono::Duration::from_std(retention)
                .expect("retention duration out of range for chrono");

        let result = sqlx::query("DELETE FROM forge_traces WHERE started_at < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

/// Summary of a trace for listing.
#[derive(Debug, Clone)]
pub struct TraceSummary {
    pub trace_id: String,
    pub root_span_name: String,
    pub started_at: DateTime<Utc>,
    pub duration_ms: Option<u64>,
    pub span_count: u32,
    pub has_error: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_store_basic() {
        // Test with a lazy pool (doesn't connect).
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = MetricsStore::new(pool);

        // pending_count should work even without a real connection.
        assert_eq!(store.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_log_store_basic() {
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = LogStore::new(pool);

        assert_eq!(store.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_trace_store_basic() {
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = TraceStore::new(pool);

        assert_eq!(store.trace_count().await, 0);
        assert_eq!(store.span_count().await, 0);
    }
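
    // Round-trip sketch against a live database, ignored by default: it needs
    // a reachable PostgreSQL with the forge_metrics table, and the DSN below
    // is a placeholder assumption.
    #[tokio::test]
    #[ignore]
    async fn test_metrics_store_roundtrip() {
        let pool = sqlx::PgPool::connect("postgres://localhost/test")
            .await
            .expect("requires a running PostgreSQL");
        let store = MetricsStore::new(pool);

        let metric = Metric {
            name: "test.requests".to_string(),
            kind: MetricKind::Counter,
            value: MetricValue::Value(1.0),
            labels: HashMap::new(),
            timestamp: Utc::now(),
            description: None,
        };

        store.store(vec![metric]).await.expect("insert failed");

        let from = Utc::now() - chrono::Duration::minutes(5);
        let found = store
            .query("test.requests", from, Utc::now())
            .await
            .expect("query failed");
        assert!(!found.is_empty());
    }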
}