mpl_proxy/
qom_recorder.rs

1//! QoM Event Recording and History
2//!
3//! Tracks QoM evaluations, persists events to disk, and maintains history for trends.
4
5use std::collections::VecDeque;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Duration, Utc};
10use serde::{Deserialize, Serialize};
11use tokio::sync::RwLock;
12use tracing::{debug, warn};
13
14use mpl_core::determinism::{DeterminismChecker, DeterminismConfig, DeterminismResult, RequestSignature};
15use mpl_core::groundedness::{GroundednessChecker, GroundednessConfig, GroundednessResult, SourceDocument};
16use mpl_core::ontology::{Ontology, OntologyChecker, OntologyResult};
17
18/// A single QoM evaluation event
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct QomEvent {
21    /// Unique event ID
22    pub id: String,
23    /// Timestamp
24    pub timestamp: DateTime<Utc>,
25    /// SType being evaluated
26    pub stype: String,
27    /// Profile used for evaluation
28    pub profile: String,
29    /// Whether the evaluation passed
30    pub passed: bool,
31    /// Individual metric scores
32    pub scores: QomScores,
33    /// Failure reason if any
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub failure_reason: Option<String>,
36    /// Request payload hash (for determinism tracking)
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub payload_hash: Option<String>,
39}
40
41/// Individual QoM metric scores
42#[derive(Debug, Clone, Default, Serialize, Deserialize)]
43pub struct QomScores {
44    /// Schema Fidelity (1.0 = valid, 0.0 = invalid)
45    pub sf: Option<f64>,
46    /// Instruction Compliance
47    pub ic: Option<f64>,
48    /// Tool Outcome Correctness
49    pub toc: Option<f64>,
50    /// Groundedness
51    pub g: Option<f64>,
52    /// Determinism Jitter (1 - jitter, so 1.0 = stable)
53    pub dj: Option<f64>,
54    /// Ontology Adherence
55    pub oa: Option<f64>,
56}
57
58/// Aggregated history point for trends
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct QomHistoryPoint {
61    /// Timestamp for this aggregation point
62    pub timestamp: DateTime<Utc>,
63    /// Number of events in this period
64    pub count: usize,
65    /// Average scores
66    pub sf: f64,
67    pub ic: f64,
68    pub toc: f64,
69    pub g: f64,
70    pub dj: f64,
71    pub oa: f64,
72    /// Pass rate
73    pub pass_rate: f64,
74}
75
76/// Summary statistics for QoM metrics
77#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct QomSummary {
79    pub schema_fidelity: MetricSummary,
80    pub instruction_compliance: MetricSummary,
81    pub tool_outcome_correctness: MetricSummary,
82    pub groundedness: MetricSummary,
83    pub determinism_jitter: MetricSummary,
84    pub ontology_adherence: MetricSummary,
85}
86
87/// Summary for a single metric
88#[derive(Debug, Clone, Default, Serialize, Deserialize)]
89pub struct MetricSummary {
90    pub score: Option<f64>,
91    pub samples: usize,
92    pub failures: usize,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub pending: Option<usize>,
95}
96
97/// Configuration for the QoM recorder
98#[derive(Debug, Clone)]
99pub struct QomRecorderConfig {
100    /// Directory to store QoM data
101    pub data_dir: PathBuf,
102    /// Maximum number of events to keep in memory
103    pub max_events_memory: usize,
104    /// Maximum number of events to keep on disk
105    pub max_events_disk: usize,
106    /// History aggregation interval
107    pub history_interval: Duration,
108    /// Whether to enable groundedness checking
109    pub enable_groundedness: bool,
110    /// Whether to enable determinism checking
111    pub enable_determinism: bool,
112    /// Whether to enable ontology checking
113    pub enable_ontology: bool,
114}
115
116impl Default for QomRecorderConfig {
117    fn default() -> Self {
118        Self {
119            data_dir: PathBuf::from(".mpl/qom"),
120            max_events_memory: 1000,
121            max_events_disk: 10000,
122            history_interval: Duration::minutes(5),
123            enable_groundedness: true,
124            enable_determinism: true,
125            enable_ontology: true,
126        }
127    }
128}
129
130/// QoM recorder - tracks events, computes metrics, persists to disk
131pub struct QomRecorder {
132    config: QomRecorderConfig,
133    /// Recent events (in memory)
134    events: Arc<RwLock<VecDeque<QomEvent>>>,
135    /// Running totals for summary
136    totals: Arc<RwLock<QomTotals>>,
137    /// Groundedness checker
138    groundedness_checker: GroundednessChecker,
139    /// Determinism checker (tracks response history)
140    determinism_checker: Arc<RwLock<DeterminismChecker>>,
141    /// Ontology specs by SType
142    ontology_specs: Arc<RwLock<std::collections::HashMap<String, Ontology>>>,
143    /// Event counter for ID generation
144    event_counter: std::sync::atomic::AtomicU64,
145}
146
/// Running per-metric accumulators behind `QomRecorder::get_summary`.
///
/// For each metric:
/// - `*_sum` is the sum of the recorded scores;
/// - `*_count` is the number of samples;
/// - `*_failures` is how many samples fell below that metric's pass threshold.
///
/// `toc_pending` is adjusted through `inc_toc_pending` / `dec_toc_pending`.
/// It presumably tracks tool-outcome evaluations that are still outstanding.
#[derive(Default, Debug)]
struct QomTotals {
    // Schema fidelity
    sf_sum: f64,
    sf_count: usize,
    sf_failures: usize,
    // Instruction compliance
    ic_sum: f64,
    ic_count: usize,
    ic_failures: usize,
    // Tool outcome correctness, plus the pending counter
    toc_sum: f64,
    toc_count: usize,
    toc_failures: usize,
    toc_pending: usize,
    // Groundedness
    g_sum: f64,
    g_count: usize,
    g_failures: usize,
    // Determinism jitter
    dj_sum: f64,
    dj_count: usize,
    dj_failures: usize,
    // Ontology adherence
    oa_sum: f64,
    oa_count: usize,
    oa_failures: usize,
}
170
171impl QomRecorder {
172    /// Create a new QoM recorder
173    pub fn new(config: QomRecorderConfig) -> Self {
174        // Ensure data directory exists
175        if let Err(e) = std::fs::create_dir_all(&config.data_dir) {
176            warn!("Failed to create QoM data directory: {}", e);
177        }
178
179        Self {
180            config,
181            events: Arc::new(RwLock::new(VecDeque::new())),
182            totals: Arc::new(RwLock::new(QomTotals::default())),
183            groundedness_checker: GroundednessChecker::new(GroundednessConfig::default()),
184            determinism_checker: Arc::new(RwLock::new(DeterminismChecker::new(
185                DeterminismConfig::default(),
186            ))),
187            ontology_specs: Arc::new(RwLock::new(std::collections::HashMap::new())),
188            event_counter: std::sync::atomic::AtomicU64::new(0),
189        }
190    }
191
192    /// Generate a unique event ID
193    fn next_event_id(&self) -> String {
194        let count = self.event_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
195        format!("evt_{:016x}", count)
196    }
197
198    /// Record a QoM evaluation event
199    pub async fn record_event(&self, event: QomEvent) {
200        // Update totals
201        {
202            let mut totals = self.totals.write().await;
203            if let Some(sf) = event.scores.sf {
204                totals.sf_sum += sf;
205                totals.sf_count += 1;
206                if sf < 1.0 {
207                    totals.sf_failures += 1;
208                }
209            }
210            if let Some(ic) = event.scores.ic {
211                totals.ic_sum += ic;
212                totals.ic_count += 1;
213                if ic < 0.97 {
214                    totals.ic_failures += 1;
215                }
216            }
217            if let Some(toc) = event.scores.toc {
218                totals.toc_sum += toc;
219                totals.toc_count += 1;
220                if toc < 0.9 {
221                    totals.toc_failures += 1;
222                }
223            }
224            if let Some(g) = event.scores.g {
225                totals.g_sum += g;
226                totals.g_count += 1;
227                if g < 0.8 {
228                    totals.g_failures += 1;
229                }
230            }
231            if let Some(dj) = event.scores.dj {
232                totals.dj_sum += dj;
233                totals.dj_count += 1;
234                if dj < 0.9 {
235                    totals.dj_failures += 1;
236                }
237            }
238            if let Some(oa) = event.scores.oa {
239                totals.oa_sum += oa;
240                totals.oa_count += 1;
241                if oa < 0.95 {
242                    totals.oa_failures += 1;
243                }
244            }
245        }
246
247        // Add to in-memory events
248        {
249            let mut events = self.events.write().await;
250            events.push_back(event.clone());
251            while events.len() > self.config.max_events_memory {
252                events.pop_front();
253            }
254        }
255
256        // Persist to disk (append to events file)
257        self.persist_event(&event).await;
258    }
259
260    /// Persist an event to disk
261    async fn persist_event(&self, event: &QomEvent) {
262        let events_file = self.config.data_dir.join("qom_events.jsonl");
263
264        if let Ok(line) = serde_json::to_string(event) {
265            use tokio::io::AsyncWriteExt;
266            if let Ok(mut file) = tokio::fs::OpenOptions::new()
267                .create(true)
268                .append(true)
269                .open(&events_file)
270                .await
271            {
272                let _ = file.write_all(format!("{}\n", line).as_bytes()).await;
273            }
274        }
275    }
276
277    /// Create a QoM event from validation results
278    pub fn create_event(
279        &self,
280        stype: &str,
281        profile: &str,
282        passed: bool,
283        scores: QomScores,
284        failure_reason: Option<String>,
285        payload_hash: Option<String>,
286    ) -> QomEvent {
287        QomEvent {
288            id: self.next_event_id(),
289            timestamp: Utc::now(),
290            stype: stype.to_string(),
291            profile: profile.to_string(),
292            passed,
293            scores,
294            failure_reason,
295            payload_hash,
296        }
297    }
298
299    /// Check groundedness of a response
300    pub fn check_groundedness(
301        &self,
302        response: &str,
303        sources: &[SourceDocument],
304    ) -> GroundednessResult {
305        if !self.config.enable_groundedness {
306            return GroundednessResult {
307                score: 1.0,
308                total_claims: 0,
309                grounded_claims: 0,
310                ungrounded_claims: 0,
311                needs_review_count: 0,
312                claim_results: vec![],
313                method: mpl_core::groundedness::GroundingMethod::Skipped,
314            };
315        }
316
317        self.groundedness_checker.check(response, sources, None)
318    }
319
320    /// Check determinism of a response
321    pub async fn check_determinism(
322        &self,
323        stype: &str,
324        payload_hash: &str,
325        response: &serde_json::Value,
326    ) -> DeterminismResult {
327        if !self.config.enable_determinism {
328            return DeterminismResult {
329                similarity: 1.0,
330                is_deterministic: true,
331                differences: vec![],
332                comparison_count: 0,
333                average_similarity: 1.0,
334                jitter: 0.0,
335            };
336        }
337
338        let signature = RequestSignature {
339            stype: stype.to_string(),
340            payload_hash: payload_hash.to_string(),
341            tool_name: None,
342        };
343
344        let mut checker = self.determinism_checker.write().await;
345        checker.check_and_record(&signature, response)
346    }
347
348    /// Check ontology adherence
349    pub async fn check_ontology(&self, stype: &str, payload: &serde_json::Value) -> OntologyResult {
350        if !self.config.enable_ontology {
351            return OntologyResult {
352                adheres: true,
353                score: 1.0,
354                violations: vec![],
355                constraints_checked: 0,
356                violation_count: 0,
357                error_count: 0,
358                warning_count: 0,
359            };
360        }
361
362        let specs = self.ontology_specs.read().await;
363        if let Some(spec) = specs.get(stype) {
364            let checker = OntologyChecker::new(spec.clone());
365            return checker.check(payload);
366        }
367
368        // No ontology spec for this SType
369        OntologyResult {
370            adheres: true,
371            score: 1.0,
372            violations: vec![],
373            constraints_checked: 0,
374            violation_count: 0,
375            error_count: 0,
376            warning_count: 0,
377        }
378    }
379
380    /// Load ontology spec for an SType
381    pub async fn load_ontology(&self, stype: &str, spec: Ontology) {
382        let mut specs = self.ontology_specs.write().await;
383        specs.insert(stype.to_string(), spec);
384    }
385
386    /// Get QoM summary statistics
387    pub async fn get_summary(&self) -> QomSummary {
388        let t = self.totals.read().await;
389
390        QomSummary {
391            schema_fidelity: MetricSummary {
392                score: if t.sf_count > 0 {
393                    Some(t.sf_sum / t.sf_count as f64)
394                } else {
395                    None
396                },
397                samples: t.sf_count,
398                failures: t.sf_failures,
399                pending: None,
400            },
401            instruction_compliance: MetricSummary {
402                score: if t.ic_count > 0 {
403                    Some(t.ic_sum / t.ic_count as f64)
404                } else {
405                    None
406                },
407                samples: t.ic_count,
408                failures: t.ic_failures,
409                pending: None,
410            },
411            tool_outcome_correctness: MetricSummary {
412                score: if t.toc_count > 0 {
413                    Some(t.toc_sum / t.toc_count as f64)
414                } else {
415                    None
416                },
417                samples: t.toc_count,
418                failures: t.toc_failures,
419                pending: Some(t.toc_pending),
420            },
421            groundedness: MetricSummary {
422                score: if t.g_count > 0 {
423                    Some(t.g_sum / t.g_count as f64)
424                } else {
425                    None
426                },
427                samples: t.g_count,
428                failures: t.g_failures,
429                pending: None,
430            },
431            determinism_jitter: MetricSummary {
432                score: if t.dj_count > 0 {
433                    Some(t.dj_sum / t.dj_count as f64)
434                } else {
435                    None
436                },
437                samples: t.dj_count,
438                failures: t.dj_failures,
439                pending: None,
440            },
441            ontology_adherence: MetricSummary {
442                score: if t.oa_count > 0 {
443                    Some(t.oa_sum / t.oa_count as f64)
444                } else {
445                    None
446                },
447                samples: t.oa_count,
448                failures: t.oa_failures,
449                pending: None,
450            },
451        }
452    }
453
454    /// Get recent events
455    pub async fn get_events(&self, limit: usize) -> Vec<QomEvent> {
456        let events = self.events.read().await;
457        events.iter().rev().take(limit).cloned().collect()
458    }
459
460    /// Get history for a time period
461    pub async fn get_history(&self, period: &str) -> Vec<QomHistoryPoint> {
462        let now = Utc::now();
463        let (duration, points) = match period {
464            "1h" => (Duration::hours(1), 12),
465            "6h" => (Duration::hours(6), 12),
466            "7d" => (Duration::days(7), 14),
467            _ => (Duration::hours(24), 24), // 24h default
468        };
469
470        // Load history from disk or compute from events
471        let history_file = self.config.data_dir.join("qom_history.json");
472
473        if history_file.exists() {
474            if let Ok(content) = tokio::fs::read_to_string(&history_file).await {
475                if let Ok(history) = serde_json::from_str::<Vec<QomHistoryPoint>>(&content) {
476                    // Filter to requested period
477                    let cutoff = now - duration;
478                    return history
479                        .into_iter()
480                        .filter(|p| p.timestamp > cutoff)
481                        .collect();
482                }
483            }
484        }
485
486        // Generate from in-memory events
487        self.compute_history_from_events(duration, points).await
488    }
489
490    /// Compute history points from in-memory events
491    async fn compute_history_from_events(&self, duration: Duration, points: usize) -> Vec<QomHistoryPoint> {
492        let now = Utc::now();
493        let interval = duration / points as i32;
494        let mut history = Vec::with_capacity(points);
495
496        let events = self.events.read().await;
497
498        for i in 0..points {
499            let point_start = now - duration + interval * i as i32;
500            let point_end = point_start + interval;
501
502            let mut sf_sum = 0.0;
503            let mut ic_sum = 0.0;
504            let mut toc_sum = 0.0;
505            let mut g_sum = 0.0;
506            let mut dj_sum = 0.0;
507            let mut oa_sum = 0.0;
508            let mut pass_count = 0;
509            let mut total_count = 0;
510            let mut sf_count = 0;
511            let mut ic_count = 0;
512            let mut toc_count = 0;
513            let mut g_count = 0;
514            let mut dj_count = 0;
515            let mut oa_count = 0;
516
517            for event in events.iter() {
518                if event.timestamp >= point_start && event.timestamp < point_end {
519                    total_count += 1;
520                    if event.passed {
521                        pass_count += 1;
522                    }
523                    if let Some(sf) = event.scores.sf {
524                        sf_sum += sf;
525                        sf_count += 1;
526                    }
527                    if let Some(ic) = event.scores.ic {
528                        ic_sum += ic;
529                        ic_count += 1;
530                    }
531                    if let Some(toc) = event.scores.toc {
532                        toc_sum += toc;
533                        toc_count += 1;
534                    }
535                    if let Some(g) = event.scores.g {
536                        g_sum += g;
537                        g_count += 1;
538                    }
539                    if let Some(dj) = event.scores.dj {
540                        dj_sum += dj;
541                        dj_count += 1;
542                    }
543                    if let Some(oa) = event.scores.oa {
544                        oa_sum += oa;
545                        oa_count += 1;
546                    }
547                }
548            }
549
550            history.push(QomHistoryPoint {
551                timestamp: point_start,
552                count: total_count,
553                sf: if sf_count > 0 { sf_sum / sf_count as f64 } else { 1.0 },
554                ic: if ic_count > 0 { ic_sum / ic_count as f64 } else { 0.0 },
555                toc: if toc_count > 0 { toc_sum / toc_count as f64 } else { 0.0 },
556                g: if g_count > 0 { g_sum / g_count as f64 } else { 0.0 },
557                dj: if dj_count > 0 { dj_sum / dj_count as f64 } else { 0.0 },
558                oa: if oa_count > 0 { oa_sum / oa_count as f64 } else { 0.0 },
559                pass_rate: if total_count > 0 {
560                    pass_count as f64 / total_count as f64
561                } else {
562                    1.0
563                },
564            });
565        }
566
567        history
568    }
569
570    /// Increment TOC pending count
571    pub async fn inc_toc_pending(&self) {
572        let mut totals = self.totals.write().await;
573        totals.toc_pending += 1;
574    }
575
576    /// Decrement TOC pending count
577    pub async fn dec_toc_pending(&self) {
578        let mut totals = self.totals.write().await;
579        if totals.toc_pending > 0 {
580            totals.toc_pending -= 1;
581        }
582    }
583
584    /// Persist history to disk (should be called periodically)
585    pub async fn persist_history(&self) {
586        let history = self.compute_history_from_events(Duration::days(7), 168).await; // 7 days, hourly
587        let history_file = self.config.data_dir.join("qom_history.json");
588
589        if let Ok(content) = serde_json::to_string_pretty(&history) {
590            if let Err(e) = tokio::fs::write(&history_file, content).await {
591                warn!("Failed to persist QoM history: {}", e);
592            } else {
593                debug!("Persisted {} history points", history.len());
594            }
595        }
596    }
597
598    /// Load events from disk on startup
599    pub async fn load_from_disk(&self) -> anyhow::Result<()> {
600        let events_file = self.config.data_dir.join("qom_events.jsonl");
601
602        if !events_file.exists() {
603            return Ok(());
604        }
605
606        let content = tokio::fs::read_to_string(&events_file).await?;
607        let mut loaded = 0;
608
609        let lines: Vec<&str> = content.lines().collect();
610        let mut events = self.events.write().await;
611
612        for line in lines.iter().rev().take(self.config.max_events_memory) {
613            if let Ok(event) = serde_json::from_str::<QomEvent>(line) {
614                events.push_front(event);
615                loaded += 1;
616            }
617        }
618
619        debug!("Loaded {} QoM events from disk", loaded);
620        Ok(())
621    }
622}
623
624impl Default for QomRecorder {
625    fn default() -> Self {
626        Self::new(QomRecorderConfig::default())
627    }
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh scratch directory under the OS temp dir, unique per process.
    ///
    /// Runs therefore don't share an events log, and the tests also work on
    /// hosts without `/tmp`.
    fn scratch_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!("{}_{}", name, std::process::id()));
        // Best-effort cleanup of leftovers from an earlier run of the same pid.
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    /// Recorder with default settings, writing into its own scratch directory.
    fn recorder_in(name: &str) -> QomRecorder {
        QomRecorder::new(QomRecorderConfig {
            data_dir: scratch_dir(name),
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn test_record_event() {
        let recorder = recorder_in("mpl_test_qom");

        let event = recorder.create_event(
            "org.test.Type.v1",
            "qom-basic",
            true,
            QomScores {
                sf: Some(1.0),
                ic: Some(0.95),
                ..Default::default()
            },
            None,
            None,
        );

        recorder.record_event(event).await;

        let summary = recorder.get_summary().await;
        assert_eq!(summary.schema_fidelity.samples, 1);
        assert_eq!(summary.instruction_compliance.samples, 1);
        // 1.0 meets the schema-fidelity bar; 0.95 is below the 0.97 IC bar.
        assert_eq!(summary.schema_fidelity.failures, 0);
        assert_eq!(summary.instruction_compliance.failures, 1);
        // A metric that was never reported has no samples and no score.
        assert_eq!(summary.groundedness.samples, 0);
        assert!(summary.groundedness.score.is_none());
    }

    #[tokio::test]
    async fn test_get_events() {
        let recorder = recorder_in("mpl_test_qom2");

        for i in 0..5 {
            let event = recorder.create_event(
                &format!("org.test.Type{}.v1", i),
                "qom-basic",
                true,
                QomScores::default(),
                None,
                None,
            );
            recorder.record_event(event).await;
        }

        let events = recorder.get_events(3).await;
        assert_eq!(events.len(), 3);
        // Newest first.
        assert_eq!(events[0].stype, "org.test.Type4.v1");
        assert_eq!(events[2].stype, "org.test.Type2.v1");
    }
}