// Source file: ai_session/observability/mod.rs

1//! Observability and debugging features for AI sessions
2
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11/// Observability layer for AI workflows
12pub struct ObservabilityLayer {
13    /// Semantic tracer
14    pub tracer: SemanticTracer,
15    /// Decision tracker
16    pub decision_tracker: DecisionTracker,
17    /// Performance profiler
18    pub profiler: AIProfiler,
19    /// Anomaly detector
20    pub anomaly_detector: AnomalyDetector,
21}
22
23impl Default for ObservabilityLayer {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl ObservabilityLayer {
30    /// Create a new observability layer
31    pub fn new() -> Self {
32        Self {
33            tracer: SemanticTracer::new(),
34            decision_tracker: DecisionTracker::new(),
35            profiler: AIProfiler::new(),
36            anomaly_detector: AnomalyDetector::new(),
37        }
38    }
39
40    /// Record a trace event
41    pub async fn trace(&self, event: TraceEvent) -> Result<()> {
42        self.tracer.record(event).await
43    }
44
45    /// Track a decision
46    pub async fn track_decision(&self, decision: Decision) -> Result<()> {
47        self.decision_tracker.track(decision).await
48    }
49
50    /// Profile performance
51    pub async fn profile<F, R>(&self, name: &str, f: F) -> Result<R>
52    where
53        F: std::future::Future<Output = R>,
54    {
55        let start = std::time::Instant::now();
56        let result = f.await;
57        let duration = start.elapsed();
58
59        self.profiler.record_timing(name, duration).await?;
60        Ok(result)
61    }
62
63    /// Detect anomalies
64    pub async fn check_anomalies(&self) -> Vec<Anomaly> {
65        self.anomaly_detector.detect().await
66    }
67}
68
69/// Semantic tracer for understanding execution flow
70pub struct SemanticTracer {
71    /// Trace storage
72    traces: Arc<RwLock<Vec<TraceEvent>>>,
73    /// Span stack
74    span_stack: Arc<RwLock<Vec<SpanId>>>,
75}
76
77impl Default for SemanticTracer {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl SemanticTracer {
84    /// Create a new tracer
85    pub fn new() -> Self {
86        Self {
87            traces: Arc::new(RwLock::new(Vec::new())),
88            span_stack: Arc::new(RwLock::new(Vec::new())),
89        }
90    }
91
92    /// Record a trace event
93    pub async fn record(&self, event: TraceEvent) -> Result<()> {
94        self.traces.write().await.push(event);
95        Ok(())
96    }
97
98    /// Start a new span
99    pub async fn start_span(&self, name: &str, metadata: HashMap<String, String>) -> SpanId {
100        let span_id = SpanId::new();
101        let event = TraceEvent {
102            id: Uuid::new_v4(),
103            span_id: span_id.clone(),
104            timestamp: Utc::now(),
105            event_type: TraceEventType::SpanStart,
106            name: name.to_string(),
107            metadata,
108        };
109
110        self.record(event).await.ok();
111        self.span_stack.write().await.push(span_id.clone());
112        span_id
113    }
114
115    /// End a span
116    pub async fn end_span(&self, span_id: SpanId) -> Result<()> {
117        let event = TraceEvent {
118            id: Uuid::new_v4(),
119            span_id: span_id.clone(),
120            timestamp: Utc::now(),
121            event_type: TraceEventType::SpanEnd,
122            name: "span_end".to_string(),
123            metadata: HashMap::new(),
124        };
125
126        self.record(event).await?;
127        self.span_stack.write().await.retain(|id| id != &span_id);
128        Ok(())
129    }
130
131    /// Get current span
132    pub async fn current_span(&self) -> Option<SpanId> {
133        self.span_stack.read().await.last().cloned()
134    }
135
136    /// Get all traces
137    pub async fn get_traces(&self) -> Vec<TraceEvent> {
138        self.traces.read().await.clone()
139    }
140}
141
142/// Span identifier
143#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
144pub struct SpanId(Uuid);
145
146impl Default for SpanId {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl SpanId {
153    /// Create a new span ID
154    pub fn new() -> Self {
155        Self(Uuid::new_v4())
156    }
157}
158
159/// Trace event
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct TraceEvent {
162    /// Event ID
163    pub id: Uuid,
164    /// Associated span
165    pub span_id: SpanId,
166    /// Timestamp
167    pub timestamp: DateTime<Utc>,
168    /// Event type
169    pub event_type: TraceEventType,
170    /// Event name
171    pub name: String,
172    /// Metadata
173    pub metadata: HashMap<String, String>,
174}
175
176/// Trace event types
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub enum TraceEventType {
179    SpanStart,
180    SpanEnd,
181    Log,
182    Error,
183    Decision,
184    StateChange,
185}
186
187/// Decision tracker
188pub struct DecisionTracker {
189    /// Decision history
190    decisions: Arc<RwLock<Vec<Decision>>>,
191    /// Decision rationales
192    rationales: Arc<RwLock<HashMap<DecisionId, Rationale>>>,
193    /// Decision outcomes
194    outcomes: Arc<RwLock<HashMap<DecisionId, Outcome>>>,
195}
196
197impl Default for DecisionTracker {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203impl DecisionTracker {
204    /// Create a new decision tracker
205    pub fn new() -> Self {
206        Self {
207            decisions: Arc::new(RwLock::new(Vec::new())),
208            rationales: Arc::new(RwLock::new(HashMap::new())),
209            outcomes: Arc::new(RwLock::new(HashMap::new())),
210        }
211    }
212
213    /// Track a decision
214    pub async fn track(&self, decision: Decision) -> Result<()> {
215        self.decisions.write().await.push(decision);
216        Ok(())
217    }
218
219    /// Add rationale for a decision
220    pub async fn add_rationale(&self, decision_id: DecisionId, rationale: Rationale) -> Result<()> {
221        self.rationales.write().await.insert(decision_id, rationale);
222        Ok(())
223    }
224
225    /// Record outcome of a decision
226    pub async fn record_outcome(&self, decision_id: DecisionId, outcome: Outcome) -> Result<()> {
227        self.outcomes.write().await.insert(decision_id, outcome);
228        Ok(())
229    }
230
231    /// Get decision history
232    pub async fn get_decisions(&self) -> Vec<Decision> {
233        self.decisions.read().await.clone()
234    }
235
236    /// Analyze decision patterns
237    pub async fn analyze_patterns(&self) -> DecisionAnalysis {
238        let decisions = self.decisions.read().await;
239        let outcomes = self.outcomes.read().await;
240
241        let total = decisions.len();
242        let with_outcomes = outcomes.len();
243        let successful = outcomes.values().filter(|o| o.success).count();
244
245        DecisionAnalysis {
246            total_decisions: total,
247            decisions_with_outcomes: with_outcomes,
248            success_rate: if with_outcomes > 0 {
249                successful as f64 / with_outcomes as f64
250            } else {
251                0.0
252            },
253            common_patterns: Vec::new(), // Would analyze patterns in real implementation
254        }
255    }
256}
257
258/// Decision ID
259#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
260pub struct DecisionId(Uuid);
261
262impl Default for DecisionId {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268impl DecisionId {
269    /// Create a new decision ID
270    pub fn new() -> Self {
271        Self(Uuid::new_v4())
272    }
273}
274
275/// Decision record
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct Decision {
278    /// Decision ID
279    pub id: DecisionId,
280    /// Decision type
281    pub decision_type: DecisionType,
282    /// Options considered
283    pub options: Vec<String>,
284    /// Selected option
285    pub selected: String,
286    /// Confidence score
287    pub confidence: f64,
288    /// Timestamp
289    pub timestamp: DateTime<Utc>,
290}
291
292/// Decision types
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub enum DecisionType {
295    TaskAssignment,
296    ResourceAllocation,
297    StrategySelection,
298    ErrorHandling,
299    Optimization,
300}
301
302/// Decision rationale
303#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct Rationale {
305    /// Reasoning steps
306    pub reasoning: Vec<String>,
307    /// Factors considered
308    pub factors: HashMap<String, f64>,
309    /// Constraints
310    pub constraints: Vec<String>,
311}
312
313/// Decision outcome
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct Outcome {
316    /// Was successful
317    pub success: bool,
318    /// Result description
319    pub result: String,
320    /// Metrics
321    pub metrics: HashMap<String, f64>,
322    /// Lessons learned
323    pub lessons: Vec<String>,
324}
325
326/// Decision analysis results
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct DecisionAnalysis {
329    /// Total decisions made
330    pub total_decisions: usize,
331    /// Decisions with recorded outcomes
332    pub decisions_with_outcomes: usize,
333    /// Success rate
334    pub success_rate: f64,
335    /// Common patterns
336    pub common_patterns: Vec<DecisionPattern>,
337}
338
339/// Decision pattern
340#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct DecisionPattern {
342    /// Pattern name
343    pub name: String,
344    /// Frequency
345    pub frequency: usize,
346    /// Average success rate
347    pub success_rate: f64,
348}
349
350/// AI performance profiler
351pub struct AIProfiler {
352    /// Timing records
353    timings: Arc<RwLock<HashMap<String, Vec<std::time::Duration>>>>,
354    /// Memory usage records
355    memory_usage: Arc<RwLock<Vec<MemorySnapshot>>>,
356    /// Token usage
357    token_usage: Arc<RwLock<TokenUsage>>,
358}
359
360impl AIProfiler {
361    /// Create a new profiler
362    pub fn new() -> Self {
363        Self {
364            timings: Arc::new(RwLock::new(HashMap::new())),
365            memory_usage: Arc::new(RwLock::new(Vec::new())),
366            token_usage: Arc::new(RwLock::new(TokenUsage::default())),
367        }
368    }
369
370    /// Record timing
371    pub async fn record_timing(&self, name: &str, duration: std::time::Duration) -> Result<()> {
372        self.timings
373            .write()
374            .await
375            .entry(name.to_string())
376            .or_insert_with(Vec::new)
377            .push(duration);
378        Ok(())
379    }
380
381    /// Record memory usage
382    pub async fn record_memory(&self, snapshot: MemorySnapshot) -> Result<()> {
383        self.memory_usage.write().await.push(snapshot);
384        Ok(())
385    }
386
387    /// Update token usage
388    pub async fn update_token_usage(
389        &self,
390        input_tokens: usize,
391        output_tokens: usize,
392    ) -> Result<()> {
393        let mut usage = self.token_usage.write().await;
394        usage.input_tokens += input_tokens;
395        usage.output_tokens += output_tokens;
396        usage.total_tokens += input_tokens + output_tokens;
397        Ok(())
398    }
399
400    /// Get performance summary
401    pub async fn get_summary(&self) -> PerformanceSummary {
402        let timings = self.timings.read().await;
403        let memory = self.memory_usage.read().await;
404        let tokens = self.token_usage.read().await;
405
406        let mut timing_stats = HashMap::new();
407        for (name, durations) in timings.iter() {
408            if !durations.is_empty() {
409                let total: std::time::Duration = durations.iter().sum();
410                let avg = total / durations.len() as u32;
411                timing_stats.insert(
412                    name.clone(),
413                    TimingStats {
414                        count: durations.len(),
415                        total,
416                        average: avg,
417                        min: *durations.iter().min().unwrap(),
418                        max: *durations.iter().max().unwrap(),
419                    },
420                );
421            }
422        }
423
424        PerformanceSummary {
425            timing_stats,
426            peak_memory: memory.iter().map(|s| s.used_bytes).max().unwrap_or(0),
427            token_usage: tokens.clone(),
428        }
429    }
430}
431
432impl Default for AIProfiler {
433    fn default() -> Self {
434        Self::new()
435    }
436}
437
438/// Memory snapshot
439#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct MemorySnapshot {
441    /// Timestamp
442    pub timestamp: DateTime<Utc>,
443    /// Used memory in bytes
444    pub used_bytes: usize,
445    /// Context
446    pub context: String,
447}
448
449/// Token usage statistics
450#[derive(Debug, Clone, Default, Serialize, Deserialize)]
451pub struct TokenUsage {
452    /// Input tokens
453    pub input_tokens: usize,
454    /// Output tokens
455    pub output_tokens: usize,
456    /// Total tokens
457    pub total_tokens: usize,
458}
459
460/// Timing statistics
461#[derive(Debug, Clone, Serialize, Deserialize)]
462pub struct TimingStats {
463    /// Number of recordings
464    pub count: usize,
465    /// Total time
466    pub total: std::time::Duration,
467    /// Average time
468    pub average: std::time::Duration,
469    /// Minimum time
470    pub min: std::time::Duration,
471    /// Maximum time
472    pub max: std::time::Duration,
473}
474
475/// Performance summary
476#[derive(Debug, Clone, Serialize, Deserialize)]
477pub struct PerformanceSummary {
478    /// Timing statistics by operation
479    pub timing_stats: HashMap<String, TimingStats>,
480    /// Peak memory usage
481    pub peak_memory: usize,
482    /// Token usage
483    pub token_usage: TokenUsage,
484}
485
486/// Anomaly detector
487pub struct AnomalyDetector {
488    /// Anomaly history
489    anomalies: Arc<RwLock<Vec<Anomaly>>>,
490    /// Detection rules
491    _rules: Arc<RwLock<Vec<DetectionRule>>>,
492}
493
494impl AnomalyDetector {
495    /// Create a new anomaly detector
496    pub fn new() -> Self {
497        Self {
498            anomalies: Arc::new(RwLock::new(Vec::new())),
499            _rules: Arc::new(RwLock::new(Self::default_rules())),
500        }
501    }
502
503    /// Default detection rules
504    fn default_rules() -> Vec<DetectionRule> {
505        vec![
506            DetectionRule {
507                name: "High Error Rate".to_string(),
508                condition: RuleCondition::ErrorRate { threshold: 0.1 },
509                severity: Severity::Warning,
510            },
511            DetectionRule {
512                name: "Slow Response".to_string(),
513                condition: RuleCondition::ResponseTime {
514                    threshold: std::time::Duration::from_secs(30),
515                },
516                severity: Severity::Warning,
517            },
518        ]
519    }
520
521    /// Detect anomalies
522    pub async fn detect(&self) -> Vec<Anomaly> {
523        // Simplified implementation
524        // In reality, this would analyze metrics and patterns
525        self.anomalies.read().await.clone()
526    }
527
528    /// Record an anomaly
529    pub async fn record_anomaly(&self, anomaly: Anomaly) -> Result<()> {
530        self.anomalies.write().await.push(anomaly);
531        Ok(())
532    }
533}
534
535/// Anomaly
536#[derive(Debug, Clone, Serialize, Deserialize)]
537pub struct Anomaly {
538    /// Anomaly ID
539    pub id: Uuid,
540    /// Anomaly type
541    pub anomaly_type: AnomalyType,
542    /// Description
543    pub description: String,
544    /// Severity
545    pub severity: Severity,
546    /// Detected at
547    pub detected_at: DateTime<Utc>,
548    /// Context
549    pub context: HashMap<String, serde_json::Value>,
550}
551
552impl Default for AnomalyDetector {
553    fn default() -> Self {
554        Self::new()
555    }
556}
557
558/// Anomaly types
559#[derive(Debug, Clone, Serialize, Deserialize)]
560pub enum AnomalyType {
561    Performance,
562    Error,
563    Security,
564    Resource,
565    Behavioral,
566}
567
568/// Severity levels
569#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
570pub enum Severity {
571    Info,
572    Warning,
573    Error,
574    Critical,
575}
576
577/// Detection rule
578#[derive(Debug, Clone)]
579pub struct DetectionRule {
580    /// Rule name
581    pub name: String,
582    /// Condition
583    pub condition: RuleCondition,
584    /// Severity if triggered
585    pub severity: Severity,
586}
587
/// Condition of a detection rule, each variant carrying its threshold.
///
/// `Debug` and `Clone` are derived: the derives produce exactly what the
/// previous hand-written impls did (`ErrorRate { threshold: 0.1 }` etc.).
/// Every payload is `Copy`, so the enum is `Copy` too, and `PartialEq` allows
/// comparing conditions (`Eq` is not possible because of the `f64`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RuleCondition {
    /// Error-rate condition with its threshold value.
    ErrorRate { threshold: f64 },
    /// Response-time condition with its threshold duration.
    ResponseTime { threshold: std::time::Duration },
    /// Memory-usage condition with its threshold value.
    MemoryUsage { threshold: usize },
}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening and closing a span logs a start and an end event for that
    /// span and leaves the span stack empty.
    #[tokio::test]
    async fn test_semantic_tracer() {
        let tracer = SemanticTracer::new();

        let span_id = tracer.start_span("test_operation", HashMap::new()).await;
        assert_eq!(tracer.current_span().await, Some(span_id.clone()));

        tracer.end_span(span_id.clone()).await.unwrap();
        assert_eq!(tracer.current_span().await, None);

        let traces = tracer.get_traces().await;
        assert_eq!(traces.len(), 2); // Start and end events
        assert!(matches!(traces[0].event_type, TraceEventType::SpanStart));
        assert_eq!(traces[0].name, "test_operation");
        assert!(matches!(traces[1].event_type, TraceEventType::SpanEnd));
        assert!(traces.iter().all(|event| event.span_id == span_id));
    }

    /// A tracked decision is returned unchanged by `get_decisions`.
    #[tokio::test]
    async fn test_decision_tracker() {
        let tracker = DecisionTracker::new();

        let decision = Decision {
            id: DecisionId::new(),
            decision_type: DecisionType::TaskAssignment,
            options: vec!["Option A".to_string(), "Option B".to_string()],
            selected: "Option A".to_string(),
            confidence: 0.85,
            timestamp: Utc::now(),
        };

        tracker.track(decision.clone()).await.unwrap();

        let decisions = tracker.get_decisions().await;
        assert_eq!(decisions.len(), 1);
        assert_eq!(decisions[0].id, decision.id);
        assert_eq!(decisions[0].selected, "Option A");
    }

    /// The analysis reports zeros when empty and a full success rate once the
    /// only tracked decision has a successful outcome.
    #[tokio::test]
    async fn test_decision_analysis() {
        let tracker = DecisionTracker::new();

        let empty = tracker.analyze_patterns().await;
        assert_eq!(empty.total_decisions, 0);
        assert_eq!(empty.decisions_with_outcomes, 0);
        assert!(empty.success_rate.abs() < f64::EPSILON);

        let id = DecisionId::new();
        let decision = Decision {
            id: id.clone(),
            decision_type: DecisionType::StrategySelection,
            options: vec!["Option A".to_string()],
            selected: "Option A".to_string(),
            confidence: 0.5,
            timestamp: Utc::now(),
        };
        tracker.track(decision).await.unwrap();

        let outcome = Outcome {
            success: true,
            result: "done".to_string(),
            metrics: HashMap::new(),
            lessons: Vec::new(),
        };
        tracker.record_outcome(id, outcome).await.unwrap();

        let analysis = tracker.analyze_patterns().await;
        assert_eq!(analysis.total_decisions, 1);
        assert_eq!(analysis.decisions_with_outcomes, 1);
        assert!((analysis.success_rate - 1.0).abs() < f64::EPSILON);
    }

    /// The profiler summary aggregates timings per operation and sums tokens.
    #[tokio::test]
    async fn test_profiler_summary() {
        use std::time::Duration;

        let profiler = AIProfiler::new();
        profiler
            .record_timing("op", Duration::from_millis(10))
            .await
            .unwrap();
        profiler
            .record_timing("op", Duration::from_millis(30))
            .await
            .unwrap();
        profiler.update_token_usage(3, 4).await.unwrap();

        let summary = profiler.get_summary().await;
        let stats = &summary.timing_stats["op"];
        assert_eq!(stats.count, 2);
        assert_eq!(stats.total, Duration::from_millis(40));
        assert_eq!(stats.average, Duration::from_millis(20));
        assert_eq!(stats.min, Duration::from_millis(10));
        assert_eq!(stats.max, Duration::from_millis(30));
        assert_eq!(summary.peak_memory, 0);
        assert_eq!(summary.token_usage.total_tokens, 7);
    }
}