use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use sqlx::Row;
use tokio::sync::RwLock;

use forge_core::observability::{LogEntry, Metric, MetricKind, MetricValue, Span, SpanStatus};
use forge_core::LogLevel;

const BATCH_SIZE: usize = 1000;

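/// Batched Postgres persistence for metrics.
///
/// A minimal usage sketch (the connection string is hypothetical; assumes the
/// `forge_metrics` table already exists):
///
/// ```ignore
/// let pool = sqlx::PgPool::connect("postgres://localhost/forge").await?;
/// let store = MetricsStore::new(pool);
/// store.store(collected_metrics).await?;
/// ```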
pub struct MetricsStore {
    pool: sqlx::PgPool,
    pending: Arc<RwLock<Vec<Metric>>>,
}

impl MetricsStore {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            pending: Arc::new(RwLock::new(Vec::new())),
        }
    }

    pub async fn store(&self, metrics: Vec<Metric>) -> forge_core::Result<()> {
        if metrics.is_empty() {
            return Ok(());
        }

        for chunk in metrics.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

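    // Bulk insert via UNNEST: one column array per bind parameter, so an
    // entire chunk lands in a single round trip instead of one INSERT per row.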
    async fn insert_batch(&self, metrics: &[Metric]) -> forge_core::Result<()> {
        let names: Vec<&str> = metrics.iter().map(|m| m.name.as_str()).collect();
        let kinds: Vec<String> = metrics.iter().map(|m| m.kind.to_string()).collect();
        let values: Vec<f64> = metrics
            .iter()
            .map(|m| m.value.as_value().unwrap_or(0.0))
            .collect();
        let labels: Vec<serde_json::Value> = metrics
            .iter()
            .map(|m| serde_json::to_value(&m.labels).unwrap_or(serde_json::Value::Null))
            .collect();
        let timestamps: Vec<DateTime<Utc>> = metrics.iter().map(|m| m.timestamp).collect();

        sqlx::query(
            r#"
            INSERT INTO forge_metrics (name, kind, value, labels, timestamp)
            SELECT * FROM UNNEST($1::TEXT[], $2::TEXT[], $3::FLOAT8[], $4::JSONB[], $5::TIMESTAMPTZ[])
            "#,
        )
        .bind(&names)
        .bind(&kinds)
        .bind(&values)
        .bind(&labels)
        .bind(&timestamps)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

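    /// Fetches up to 1000 points for `name` within `[from, to]`, newest first.
    ///
    /// A hedged call sketch (the metric name and time bounds are illustrative):
    ///
    /// ```ignore
    /// let last_hour = store
    ///     .query("http_requests_total", Utc::now() - chrono::Duration::hours(1), Utc::now())
    ///     .await?;
    /// ```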
    pub async fn query(
        &self,
        name: &str,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
    ) -> forge_core::Result<Vec<Metric>> {
        let rows = sqlx::query(
            r#"
            SELECT name, kind, value, labels, timestamp
            FROM forge_metrics
            WHERE name = $1 AND timestamp >= $2 AND timestamp <= $3
            ORDER BY timestamp DESC
            LIMIT 1000
            "#,
        )
        .bind(name)
        .bind(from)
        .bind(to)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let metrics = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let kind_str: String = row.get("kind");
                let value: f64 = row.get("value");
                let labels: serde_json::Value = row.get("labels");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                let kind = match kind_str.as_str() {
                    "counter" => MetricKind::Counter,
                    "gauge" => MetricKind::Gauge,
                    "histogram" => MetricKind::Histogram,
                    "summary" => MetricKind::Summary,
                    _ => MetricKind::Gauge,
                };

                let labels_map: HashMap<String, String> =
                    serde_json::from_value(labels).unwrap_or_default();

                Metric {
                    name,
                    kind,
                    value: MetricValue::Value(value),
                    labels: labels_map,
                    timestamp,
                    description: None,
                }
            })
            .collect();

        Ok(metrics)
    }

    pub async fn list_latest(&self) -> forge_core::Result<Vec<Metric>> {
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT ON (name) name, kind, value, labels, timestamp
            FROM forge_metrics
            ORDER BY name, timestamp DESC
            "#,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let metrics = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let kind_str: String = row.get("kind");
                let value: f64 = row.get("value");
                let labels: serde_json::Value = row.get("labels");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                let kind = match kind_str.as_str() {
                    "counter" => MetricKind::Counter,
                    "gauge" => MetricKind::Gauge,
                    "histogram" => MetricKind::Histogram,
                    "summary" => MetricKind::Summary,
                    _ => MetricKind::Gauge,
                };

                Metric {
                    name,
                    kind,
                    value: MetricValue::Value(value),
                    labels: serde_json::from_value(labels).unwrap_or_default(),
                    timestamp,
                    description: None,
                }
            })
            .collect();

        Ok(metrics)
    }

    pub async fn pending_count(&self) -> usize {
        self.pending.read().await.len()
    }

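    /// Deletes metrics older than `retention`.
    ///
    /// Illustrative call, e.g. a seven-day retention window:
    ///
    /// ```ignore
    /// let deleted = store.cleanup(Duration::from_secs(7 * 24 * 60 * 60)).await?;
    /// ```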
    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        // from_std only fails for durations outside chrono's representable range.
        let cutoff = Utc::now() - chrono::Duration::from_std(retention).unwrap();

        let result = sqlx::query("DELETE FROM forge_metrics WHERE timestamp < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

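/// Batched Postgres persistence for structured log entries.
///
/// Mirrors `MetricsStore`: entries are written in `BATCH_SIZE` chunks, one
/// UNNEST insert per chunk. A minimal sketch (hypothetical DSN):
///
/// ```ignore
/// let store = LogStore::new(sqlx::PgPool::connect("postgres://localhost/forge").await?);
/// store.store(entries).await?;
/// ```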
pub struct LogStore {
    pool: sqlx::PgPool,
    pending: Arc<RwLock<Vec<LogEntry>>>,
}

impl LogStore {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            pending: Arc::new(RwLock::new(Vec::new())),
        }
    }

    pub async fn store(&self, logs: Vec<LogEntry>) -> forge_core::Result<()> {
        if logs.is_empty() {
            return Ok(());
        }

        for chunk in logs.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

    async fn insert_batch(&self, logs: &[LogEntry]) -> forge_core::Result<()> {
        let levels: Vec<String> = logs.iter().map(|l| l.level.to_string()).collect();
        let messages: Vec<&str> = logs.iter().map(|l| l.message.as_str()).collect();
        let targets: Vec<Option<&str>> = logs.iter().map(|l| l.target.as_deref()).collect();
        let fields: Vec<serde_json::Value> = logs
            .iter()
            .map(|l| serde_json::to_value(&l.fields).unwrap_or(serde_json::Value::Null))
            .collect();
        let trace_ids: Vec<Option<String>> = logs.iter().map(|l| l.trace_id.clone()).collect();
        let span_ids: Vec<Option<String>> = logs.iter().map(|l| l.span_id.clone()).collect();
        let timestamps: Vec<DateTime<Utc>> = logs.iter().map(|l| l.timestamp).collect();

        sqlx::query(
            r#"
            INSERT INTO forge_logs (level, message, target, fields, trace_id, span_id, timestamp)
            SELECT * FROM UNNEST($1::TEXT[], $2::TEXT[], $3::TEXT[], $4::JSONB[], $5::TEXT[], $6::TEXT[], $7::TIMESTAMPTZ[])
            "#,
        )
        .bind(&levels)
        .bind(&messages)
        .bind(&targets)
        .bind(&fields)
        .bind(&trace_ids)
        .bind(&span_ids)
        .bind(&timestamps)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

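    /// Queries logs with optional level and time-range filters.
    ///
    /// Each `($n IS NULL OR col ...)` clause makes its filter optional, so one
    /// prepared statement serves every filter combination. Hedged example,
    /// newest 100 errors:
    ///
    /// ```ignore
    /// let errors = store.query(Some(LogLevel::Error), None, None, 100).await?;
    /// ```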
    pub async fn query(
        &self,
        level: Option<LogLevel>,
        from: Option<DateTime<Utc>>,
        to: Option<DateTime<Utc>>,
        limit: usize,
    ) -> forge_core::Result<Vec<LogEntry>> {
        let level_filter = level.map(|l| l.to_string());

        let rows = sqlx::query(
            r#"
            SELECT id, level, message, target, fields, trace_id, span_id, timestamp
            FROM forge_logs
            WHERE ($1::TEXT IS NULL OR level = $1)
              AND ($2::TIMESTAMPTZ IS NULL OR timestamp >= $2)
              AND ($3::TIMESTAMPTZ IS NULL OR timestamp <= $3)
            ORDER BY timestamp DESC
            LIMIT $4
            "#,
        )
        .bind(&level_filter)
        .bind(from)
        .bind(to)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let logs = rows
            .into_iter()
            .map(|row| {
                let level_str: String = row.get("level");
                let message: String = row.get("message");
                let target: Option<String> = row.get("target");
                let fields: serde_json::Value = row.get("fields");
                let trace_id: Option<String> = row.get("trace_id");
                let span_id: Option<String> = row.get("span_id");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                let level = match level_str.to_lowercase().as_str() {
                    "trace" => LogLevel::Trace,
                    "debug" => LogLevel::Debug,
                    "info" => LogLevel::Info,
                    "warn" => LogLevel::Warn,
                    "error" => LogLevel::Error,
                    _ => LogLevel::Info,
                };

                LogEntry {
                    level,
                    message,
                    target,
                    fields: serde_json::from_value(fields).unwrap_or_default(),
                    trace_id,
                    span_id,
                    timestamp,
                    node_id: None,
                }
            })
            .collect();

        Ok(logs)
    }

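    /// Case-insensitive substring search over log messages via ILIKE.
    ///
    /// Note: `%` and `_` in `query` are not escaped, so they act as SQL
    /// wildcards rather than literal characters.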
    pub async fn search(&self, query: &str, limit: usize) -> forge_core::Result<Vec<LogEntry>> {
        let search_pattern = format!("%{}%", query);

        let rows = sqlx::query(
            r#"
            SELECT id, level, message, target, fields, trace_id, span_id, timestamp
            FROM forge_logs
            WHERE message ILIKE $1
            ORDER BY timestamp DESC
            LIMIT $2
            "#,
        )
        .bind(&search_pattern)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let logs = rows
            .into_iter()
            .map(|row| {
                let level_str: String = row.get("level");
                let message: String = row.get("message");
                let target: Option<String> = row.get("target");
                let fields: serde_json::Value = row.get("fields");
                let trace_id: Option<String> = row.get("trace_id");
                let span_id: Option<String> = row.get("span_id");
                let timestamp: DateTime<Utc> = row.get("timestamp");

                LogEntry {
                    level: level_str.parse().unwrap_or_default(),
                    message,
                    target,
                    fields: serde_json::from_value(fields).unwrap_or_default(),
                    trace_id,
                    span_id,
                    timestamp,
                    node_id: None,
                }
            })
            .collect();

        Ok(logs)
    }

    pub async fn pending_count(&self) -> usize {
        self.pending.read().await.len()
    }

    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        let cutoff = Utc::now() - chrono::Duration::from_std(retention).unwrap();

        let result = sqlx::query("DELETE FROM forge_logs WHERE timestamp < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

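/// Batched Postgres persistence for trace spans, plus an in-memory trace map
/// backing `trace_count` / `span_count`.
///
/// Minimal sketch (hypothetical DSN):
///
/// ```ignore
/// let store = TraceStore::new(sqlx::PgPool::connect("postgres://localhost/forge").await?);
/// store.store(finished_spans).await?;
/// ```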
pub struct TraceStore {
    pool: sqlx::PgPool,
    traces: Arc<RwLock<HashMap<String, Vec<Span>>>>,
}

impl TraceStore {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            traces: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn store(&self, spans: Vec<Span>) -> forge_core::Result<()> {
        if spans.is_empty() {
            return Ok(());
        }

        for chunk in spans.chunks(BATCH_SIZE) {
            self.insert_batch(chunk).await?;
        }

        Ok(())
    }

    async fn insert_batch(&self, spans: &[Span]) -> forge_core::Result<()> {
        let ids: Vec<uuid::Uuid> = spans.iter().map(|_| uuid::Uuid::new_v4()).collect();
        let trace_ids: Vec<String> = spans
            .iter()
            .map(|s| s.context.trace_id.to_string())
            .collect();
        let span_ids: Vec<String> = spans
            .iter()
            .map(|s| s.context.span_id.to_string())
            .collect();
        let parent_ids: Vec<Option<String>> = spans
            .iter()
            .map(|s| s.context.parent_span_id.as_ref().map(|id| id.to_string()))
            .collect();
        let names: Vec<&str> = spans.iter().map(|s| s.name.as_str()).collect();
        let kinds: Vec<String> = spans.iter().map(|s| s.kind.to_string()).collect();
        let statuses: Vec<String> = spans.iter().map(|s| s.status.to_string()).collect();
        let attributes: Vec<serde_json::Value> = spans
            .iter()
            .map(|s| serde_json::to_value(&s.attributes).unwrap_or(serde_json::Value::Null))
            .collect();
        let events: Vec<serde_json::Value> = spans
            .iter()
            .map(|s| serde_json::to_value(&s.events).unwrap_or(serde_json::Value::Null))
            .collect();
        let start_times: Vec<DateTime<Utc>> = spans.iter().map(|s| s.start_time).collect();
        let end_times: Vec<Option<DateTime<Utc>>> = spans.iter().map(|s| s.end_time).collect();
        let durations: Vec<Option<i32>> = spans
            .iter()
            .map(|s| s.duration_ms().map(|d| d as i32))
            .collect();

        sqlx::query(
            r#"
            INSERT INTO forge_traces (
                id, trace_id, span_id, parent_span_id, name, kind, status,
                attributes, events, started_at, ended_at, duration_ms
            )
            SELECT * FROM UNNEST(
                $1::UUID[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[], $6::TEXT[], $7::TEXT[],
                $8::JSONB[], $9::JSONB[], $10::TIMESTAMPTZ[], $11::TIMESTAMPTZ[], $12::INT[]
            )
            "#,
        )
        .bind(&ids)
        .bind(&trace_ids)
        .bind(&span_ids)
        .bind(&parent_ids)
        .bind(&names)
        .bind(&kinds)
        .bind(&statuses)
        .bind(&attributes)
        .bind(&events)
        .bind(&start_times)
        .bind(&end_times)
        .bind(&durations)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

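    /// Loads every span recorded for `trace_id`, ordered by start time.
    ///
    /// Reconstruction is lossy: spans are rebuilt via `Span::new`, so the
    /// stored ids, kind, and events are not restored onto the returned spans.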
    pub async fn get_trace(&self, trace_id: &str) -> forge_core::Result<Vec<Span>> {
        let rows = sqlx::query(
            r#"
            SELECT trace_id, span_id, parent_span_id, name, kind, status,
                   attributes, events, started_at, ended_at, duration_ms
            FROM forge_traces
            WHERE trace_id = $1
            ORDER BY started_at ASC
            "#,
        )
        .bind(trace_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let spans = rows
            .into_iter()
            .map(|row| {
                let name: String = row.get("name");
                let _kind_str: String = row.get("kind");
                let status_str: String = row.get("status");
                let start_time: DateTime<Utc> = row.get("started_at");
                let end_time: Option<DateTime<Utc>> = row.get("ended_at");

                let mut span = Span::new(&name);
                span.start_time = start_time;
                span.end_time = end_time;
                span.status = match status_str.as_str() {
                    "ok" => SpanStatus::Ok,
                    "error" => SpanStatus::Error,
                    _ => SpanStatus::Unset,
                };
                span.attributes = row
                    .get::<serde_json::Value, _>("attributes")
                    .as_object()
                    .cloned()
                    .map(|m| m.into_iter().collect())
                    .unwrap_or_default();

                span
            })
            .collect();

        Ok(spans)
    }

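    /// Returns distinct trace ids whose spans started within `[from, to]`.
    ///
    /// Ordering is by `trace_id`, not recency: with `SELECT DISTINCT`,
    /// Postgres only allows ORDER BY expressions that appear in the select
    /// list, so ordering by `started_at` here would be rejected.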
    pub async fn query(
        &self,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        limit: usize,
    ) -> forge_core::Result<Vec<String>> {
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT trace_id
            FROM forge_traces
            WHERE started_at >= $1 AND started_at <= $2
            ORDER BY trace_id
            LIMIT $3
            "#,
        )
        .bind(from)
        .bind(to)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(rows.into_iter().map(|r| r.get("trace_id")).collect())
    }

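    /// Summarizes the most recent traces in one pass: the CTE groups spans by
    /// trace_id, takes the earliest start as the trace start, the longest
    /// single-span duration as a proxy for trace duration, and flags traces
    /// containing any span with an 'error' status. The first span by start
    /// time is treated as the root.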
    pub async fn list_recent(&self, limit: usize) -> forge_core::Result<Vec<TraceSummary>> {
        let rows = sqlx::query(
            r#"
            WITH trace_stats AS (
                SELECT
                    trace_id,
                    MIN(started_at) as started_at,
                    MAX(duration_ms) as duration_ms,
                    COUNT(*) as span_count,
                    BOOL_OR(status = 'error') as has_error,
                    (array_agg(name ORDER BY started_at ASC))[1] as root_span_name
                FROM forge_traces
                GROUP BY trace_id
                ORDER BY started_at DESC
                LIMIT $1
            )
            SELECT * FROM trace_stats
            "#,
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let summaries = rows
            .into_iter()
            .map(|row| TraceSummary {
                trace_id: row.get("trace_id"),
                root_span_name: row.get("root_span_name"),
                started_at: row.get("started_at"),
                duration_ms: row.get::<Option<i32>, _>("duration_ms").map(|d| d as u64),
                span_count: row.get::<i64, _>("span_count") as u32,
                has_error: row.get("has_error"),
            })
            .collect();

        Ok(summaries)
    }

    pub async fn find_errors(&self, limit: usize) -> forge_core::Result<Vec<String>> {
        let rows = sqlx::query(
            r#"
            SELECT DISTINCT trace_id
            FROM forge_traces
            WHERE status = 'error'
            ORDER BY trace_id
            LIMIT $1
            "#,
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(rows.into_iter().map(|r| r.get("trace_id")).collect())
    }

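    // The two counters below report only the in-memory `traces` map; spans
    // persisted to Postgres are not counted here.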
    pub async fn trace_count(&self) -> usize {
        self.traces.read().await.len()
    }

    pub async fn span_count(&self) -> usize {
        self.traces.read().await.values().map(|v| v.len()).sum()
    }

    pub async fn cleanup(&self, retention: Duration) -> forge_core::Result<u64> {
        let cutoff = Utc::now() - chrono::Duration::from_std(retention).unwrap();

        let result = sqlx::query("DELETE FROM forge_traces WHERE started_at < $1")
            .bind(cutoff)
            .execute(&self.pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(result.rows_affected())
    }
}

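/// Per-trace aggregate produced by `TraceStore::list_recent`.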
#[derive(Debug, Clone)]
pub struct TraceSummary {
    pub trace_id: String,
    pub root_span_name: String,
    pub started_at: DateTime<Utc>,
    pub duration_ms: Option<u64>,
    pub span_count: u32,
    pub has_error: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

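    // connect_lazy defers the actual connection until first use, so these
    // constructor tests run without a live Postgres instance.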
    #[tokio::test]
    async fn test_metrics_store_basic() {
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = MetricsStore::new(pool);

        assert_eq!(store.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_log_store_basic() {
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = LogStore::new(pool);

        assert_eq!(store.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_trace_store_basic() {
        let pool = sqlx::PgPool::connect_lazy("postgres://localhost/test").unwrap();
        let store = TraceStore::new(pool);

        assert_eq!(store.trace_count().await, 0);
        assert_eq!(store.span_count().await, 0);
    }
}