1use std::collections::VecDeque;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Duration, Utc};
10use serde::{Deserialize, Serialize};
11use tokio::sync::RwLock;
12use tracing::{debug, warn};
13
14use mpl_core::determinism::{DeterminismChecker, DeterminismConfig, DeterminismResult, RequestSignature};
15use mpl_core::groundedness::{GroundednessChecker, GroundednessConfig, GroundednessResult, SourceDocument};
16use mpl_core::ontology::{Ontology, OntologyChecker, OntologyResult};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct QomEvent {
21 pub id: String,
23 pub timestamp: DateTime<Utc>,
25 pub stype: String,
27 pub profile: String,
29 pub passed: bool,
31 pub scores: QomScores,
33 #[serde(skip_serializing_if = "Option::is_none")]
35 pub failure_reason: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub payload_hash: Option<String>,
39}
40
41#[derive(Debug, Clone, Default, Serialize, Deserialize)]
43pub struct QomScores {
44 pub sf: Option<f64>,
46 pub ic: Option<f64>,
48 pub toc: Option<f64>,
50 pub g: Option<f64>,
52 pub dj: Option<f64>,
54 pub oa: Option<f64>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct QomHistoryPoint {
61 pub timestamp: DateTime<Utc>,
63 pub count: usize,
65 pub sf: f64,
67 pub ic: f64,
68 pub toc: f64,
69 pub g: f64,
70 pub dj: f64,
71 pub oa: f64,
72 pub pass_rate: f64,
74}
75
76#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct QomSummary {
79 pub schema_fidelity: MetricSummary,
80 pub instruction_compliance: MetricSummary,
81 pub tool_outcome_correctness: MetricSummary,
82 pub groundedness: MetricSummary,
83 pub determinism_jitter: MetricSummary,
84 pub ontology_adherence: MetricSummary,
85}
86
87#[derive(Debug, Clone, Default, Serialize, Deserialize)]
89pub struct MetricSummary {
90 pub score: Option<f64>,
91 pub samples: usize,
92 pub failures: usize,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub pending: Option<usize>,
95}
96
97#[derive(Debug, Clone)]
99pub struct QomRecorderConfig {
100 pub data_dir: PathBuf,
102 pub max_events_memory: usize,
104 pub max_events_disk: usize,
106 pub history_interval: Duration,
108 pub enable_groundedness: bool,
110 pub enable_determinism: bool,
112 pub enable_ontology: bool,
114}
115
116impl Default for QomRecorderConfig {
117 fn default() -> Self {
118 Self {
119 data_dir: PathBuf::from(".mpl/qom"),
120 max_events_memory: 1000,
121 max_events_disk: 10000,
122 history_interval: Duration::minutes(5),
123 enable_groundedness: true,
124 enable_determinism: true,
125 enable_ontology: true,
126 }
127 }
128}
129
130pub struct QomRecorder {
132 config: QomRecorderConfig,
133 events: Arc<RwLock<VecDeque<QomEvent>>>,
135 totals: Arc<RwLock<QomTotals>>,
137 groundedness_checker: GroundednessChecker,
139 determinism_checker: Arc<RwLock<DeterminismChecker>>,
141 ontology_specs: Arc<RwLock<std::collections::HashMap<String, Ontology>>>,
143 event_counter: std::sync::atomic::AtomicU64,
145}
146
/// Running totals behind `QomSummary`.
///
/// For every metric: `*_sum` is the sum of all recorded scores, `*_count` the
/// number of recorded scores, and `*_failures` the number of scores that fell
/// below that metric's threshold when recorded.
#[derive(Debug, Default)]
struct QomTotals {
    // Schema fidelity.
    sf_sum: f64,
    sf_count: usize,
    sf_failures: usize,
    // Instruction compliance.
    ic_sum: f64,
    ic_count: usize,
    ic_failures: usize,
    // Tool outcome correctness; `toc_pending` is adjusted separately through
    // `inc_toc_pending` / `dec_toc_pending`.
    toc_sum: f64,
    toc_count: usize,
    toc_failures: usize,
    toc_pending: usize,
    // Groundedness.
    g_sum: f64,
    g_count: usize,
    g_failures: usize,
    // Determinism.
    dj_sum: f64,
    dj_count: usize,
    dj_failures: usize,
    // Ontology adherence.
    oa_sum: f64,
    oa_count: usize,
    oa_failures: usize,
}
170
171impl QomRecorder {
172 pub fn new(config: QomRecorderConfig) -> Self {
174 if let Err(e) = std::fs::create_dir_all(&config.data_dir) {
176 warn!("Failed to create QoM data directory: {}", e);
177 }
178
179 Self {
180 config,
181 events: Arc::new(RwLock::new(VecDeque::new())),
182 totals: Arc::new(RwLock::new(QomTotals::default())),
183 groundedness_checker: GroundednessChecker::new(GroundednessConfig::default()),
184 determinism_checker: Arc::new(RwLock::new(DeterminismChecker::new(
185 DeterminismConfig::default(),
186 ))),
187 ontology_specs: Arc::new(RwLock::new(std::collections::HashMap::new())),
188 event_counter: std::sync::atomic::AtomicU64::new(0),
189 }
190 }
191
192 fn next_event_id(&self) -> String {
194 let count = self.event_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
195 format!("evt_{:016x}", count)
196 }
197
198 pub async fn record_event(&self, event: QomEvent) {
200 {
202 let mut totals = self.totals.write().await;
203 if let Some(sf) = event.scores.sf {
204 totals.sf_sum += sf;
205 totals.sf_count += 1;
206 if sf < 1.0 {
207 totals.sf_failures += 1;
208 }
209 }
210 if let Some(ic) = event.scores.ic {
211 totals.ic_sum += ic;
212 totals.ic_count += 1;
213 if ic < 0.97 {
214 totals.ic_failures += 1;
215 }
216 }
217 if let Some(toc) = event.scores.toc {
218 totals.toc_sum += toc;
219 totals.toc_count += 1;
220 if toc < 0.9 {
221 totals.toc_failures += 1;
222 }
223 }
224 if let Some(g) = event.scores.g {
225 totals.g_sum += g;
226 totals.g_count += 1;
227 if g < 0.8 {
228 totals.g_failures += 1;
229 }
230 }
231 if let Some(dj) = event.scores.dj {
232 totals.dj_sum += dj;
233 totals.dj_count += 1;
234 if dj < 0.9 {
235 totals.dj_failures += 1;
236 }
237 }
238 if let Some(oa) = event.scores.oa {
239 totals.oa_sum += oa;
240 totals.oa_count += 1;
241 if oa < 0.95 {
242 totals.oa_failures += 1;
243 }
244 }
245 }
246
247 {
249 let mut events = self.events.write().await;
250 events.push_back(event.clone());
251 while events.len() > self.config.max_events_memory {
252 events.pop_front();
253 }
254 }
255
256 self.persist_event(&event).await;
258 }
259
260 async fn persist_event(&self, event: &QomEvent) {
262 let events_file = self.config.data_dir.join("qom_events.jsonl");
263
264 if let Ok(line) = serde_json::to_string(event) {
265 use tokio::io::AsyncWriteExt;
266 if let Ok(mut file) = tokio::fs::OpenOptions::new()
267 .create(true)
268 .append(true)
269 .open(&events_file)
270 .await
271 {
272 let _ = file.write_all(format!("{}\n", line).as_bytes()).await;
273 }
274 }
275 }
276
277 pub fn create_event(
279 &self,
280 stype: &str,
281 profile: &str,
282 passed: bool,
283 scores: QomScores,
284 failure_reason: Option<String>,
285 payload_hash: Option<String>,
286 ) -> QomEvent {
287 QomEvent {
288 id: self.next_event_id(),
289 timestamp: Utc::now(),
290 stype: stype.to_string(),
291 profile: profile.to_string(),
292 passed,
293 scores,
294 failure_reason,
295 payload_hash,
296 }
297 }
298
299 pub fn check_groundedness(
301 &self,
302 response: &str,
303 sources: &[SourceDocument],
304 ) -> GroundednessResult {
305 if !self.config.enable_groundedness {
306 return GroundednessResult {
307 score: 1.0,
308 total_claims: 0,
309 grounded_claims: 0,
310 ungrounded_claims: 0,
311 needs_review_count: 0,
312 claim_results: vec![],
313 method: mpl_core::groundedness::GroundingMethod::Skipped,
314 };
315 }
316
317 self.groundedness_checker.check(response, sources, None)
318 }
319
320 pub async fn check_determinism(
322 &self,
323 stype: &str,
324 payload_hash: &str,
325 response: &serde_json::Value,
326 ) -> DeterminismResult {
327 if !self.config.enable_determinism {
328 return DeterminismResult {
329 similarity: 1.0,
330 is_deterministic: true,
331 differences: vec![],
332 comparison_count: 0,
333 average_similarity: 1.0,
334 jitter: 0.0,
335 };
336 }
337
338 let signature = RequestSignature {
339 stype: stype.to_string(),
340 payload_hash: payload_hash.to_string(),
341 tool_name: None,
342 };
343
344 let mut checker = self.determinism_checker.write().await;
345 checker.check_and_record(&signature, response)
346 }
347
348 pub async fn check_ontology(&self, stype: &str, payload: &serde_json::Value) -> OntologyResult {
350 if !self.config.enable_ontology {
351 return OntologyResult {
352 adheres: true,
353 score: 1.0,
354 violations: vec![],
355 constraints_checked: 0,
356 violation_count: 0,
357 error_count: 0,
358 warning_count: 0,
359 };
360 }
361
362 let specs = self.ontology_specs.read().await;
363 if let Some(spec) = specs.get(stype) {
364 let checker = OntologyChecker::new(spec.clone());
365 return checker.check(payload);
366 }
367
368 OntologyResult {
370 adheres: true,
371 score: 1.0,
372 violations: vec![],
373 constraints_checked: 0,
374 violation_count: 0,
375 error_count: 0,
376 warning_count: 0,
377 }
378 }
379
380 pub async fn load_ontology(&self, stype: &str, spec: Ontology) {
382 let mut specs = self.ontology_specs.write().await;
383 specs.insert(stype.to_string(), spec);
384 }
385
386 pub async fn get_summary(&self) -> QomSummary {
388 let t = self.totals.read().await;
389
390 QomSummary {
391 schema_fidelity: MetricSummary {
392 score: if t.sf_count > 0 {
393 Some(t.sf_sum / t.sf_count as f64)
394 } else {
395 None
396 },
397 samples: t.sf_count,
398 failures: t.sf_failures,
399 pending: None,
400 },
401 instruction_compliance: MetricSummary {
402 score: if t.ic_count > 0 {
403 Some(t.ic_sum / t.ic_count as f64)
404 } else {
405 None
406 },
407 samples: t.ic_count,
408 failures: t.ic_failures,
409 pending: None,
410 },
411 tool_outcome_correctness: MetricSummary {
412 score: if t.toc_count > 0 {
413 Some(t.toc_sum / t.toc_count as f64)
414 } else {
415 None
416 },
417 samples: t.toc_count,
418 failures: t.toc_failures,
419 pending: Some(t.toc_pending),
420 },
421 groundedness: MetricSummary {
422 score: if t.g_count > 0 {
423 Some(t.g_sum / t.g_count as f64)
424 } else {
425 None
426 },
427 samples: t.g_count,
428 failures: t.g_failures,
429 pending: None,
430 },
431 determinism_jitter: MetricSummary {
432 score: if t.dj_count > 0 {
433 Some(t.dj_sum / t.dj_count as f64)
434 } else {
435 None
436 },
437 samples: t.dj_count,
438 failures: t.dj_failures,
439 pending: None,
440 },
441 ontology_adherence: MetricSummary {
442 score: if t.oa_count > 0 {
443 Some(t.oa_sum / t.oa_count as f64)
444 } else {
445 None
446 },
447 samples: t.oa_count,
448 failures: t.oa_failures,
449 pending: None,
450 },
451 }
452 }
453
454 pub async fn get_events(&self, limit: usize) -> Vec<QomEvent> {
456 let events = self.events.read().await;
457 events.iter().rev().take(limit).cloned().collect()
458 }
459
460 pub async fn get_history(&self, period: &str) -> Vec<QomHistoryPoint> {
462 let now = Utc::now();
463 let (duration, points) = match period {
464 "1h" => (Duration::hours(1), 12),
465 "6h" => (Duration::hours(6), 12),
466 "7d" => (Duration::days(7), 14),
467 _ => (Duration::hours(24), 24), };
469
470 let history_file = self.config.data_dir.join("qom_history.json");
472
473 if history_file.exists() {
474 if let Ok(content) = tokio::fs::read_to_string(&history_file).await {
475 if let Ok(history) = serde_json::from_str::<Vec<QomHistoryPoint>>(&content) {
476 let cutoff = now - duration;
478 return history
479 .into_iter()
480 .filter(|p| p.timestamp > cutoff)
481 .collect();
482 }
483 }
484 }
485
486 self.compute_history_from_events(duration, points).await
488 }
489
490 async fn compute_history_from_events(&self, duration: Duration, points: usize) -> Vec<QomHistoryPoint> {
492 let now = Utc::now();
493 let interval = duration / points as i32;
494 let mut history = Vec::with_capacity(points);
495
496 let events = self.events.read().await;
497
498 for i in 0..points {
499 let point_start = now - duration + interval * i as i32;
500 let point_end = point_start + interval;
501
502 let mut sf_sum = 0.0;
503 let mut ic_sum = 0.0;
504 let mut toc_sum = 0.0;
505 let mut g_sum = 0.0;
506 let mut dj_sum = 0.0;
507 let mut oa_sum = 0.0;
508 let mut pass_count = 0;
509 let mut total_count = 0;
510 let mut sf_count = 0;
511 let mut ic_count = 0;
512 let mut toc_count = 0;
513 let mut g_count = 0;
514 let mut dj_count = 0;
515 let mut oa_count = 0;
516
517 for event in events.iter() {
518 if event.timestamp >= point_start && event.timestamp < point_end {
519 total_count += 1;
520 if event.passed {
521 pass_count += 1;
522 }
523 if let Some(sf) = event.scores.sf {
524 sf_sum += sf;
525 sf_count += 1;
526 }
527 if let Some(ic) = event.scores.ic {
528 ic_sum += ic;
529 ic_count += 1;
530 }
531 if let Some(toc) = event.scores.toc {
532 toc_sum += toc;
533 toc_count += 1;
534 }
535 if let Some(g) = event.scores.g {
536 g_sum += g;
537 g_count += 1;
538 }
539 if let Some(dj) = event.scores.dj {
540 dj_sum += dj;
541 dj_count += 1;
542 }
543 if let Some(oa) = event.scores.oa {
544 oa_sum += oa;
545 oa_count += 1;
546 }
547 }
548 }
549
550 history.push(QomHistoryPoint {
551 timestamp: point_start,
552 count: total_count,
553 sf: if sf_count > 0 { sf_sum / sf_count as f64 } else { 1.0 },
554 ic: if ic_count > 0 { ic_sum / ic_count as f64 } else { 0.0 },
555 toc: if toc_count > 0 { toc_sum / toc_count as f64 } else { 0.0 },
556 g: if g_count > 0 { g_sum / g_count as f64 } else { 0.0 },
557 dj: if dj_count > 0 { dj_sum / dj_count as f64 } else { 0.0 },
558 oa: if oa_count > 0 { oa_sum / oa_count as f64 } else { 0.0 },
559 pass_rate: if total_count > 0 {
560 pass_count as f64 / total_count as f64
561 } else {
562 1.0
563 },
564 });
565 }
566
567 history
568 }
569
570 pub async fn inc_toc_pending(&self) {
572 let mut totals = self.totals.write().await;
573 totals.toc_pending += 1;
574 }
575
576 pub async fn dec_toc_pending(&self) {
578 let mut totals = self.totals.write().await;
579 if totals.toc_pending > 0 {
580 totals.toc_pending -= 1;
581 }
582 }
583
584 pub async fn persist_history(&self) {
586 let history = self.compute_history_from_events(Duration::days(7), 168).await; let history_file = self.config.data_dir.join("qom_history.json");
588
589 if let Ok(content) = serde_json::to_string_pretty(&history) {
590 if let Err(e) = tokio::fs::write(&history_file, content).await {
591 warn!("Failed to persist QoM history: {}", e);
592 } else {
593 debug!("Persisted {} history points", history.len());
594 }
595 }
596 }
597
598 pub async fn load_from_disk(&self) -> anyhow::Result<()> {
600 let events_file = self.config.data_dir.join("qom_events.jsonl");
601
602 if !events_file.exists() {
603 return Ok(());
604 }
605
606 let content = tokio::fs::read_to_string(&events_file).await?;
607 let mut loaded = 0;
608
609 let lines: Vec<&str> = content.lines().collect();
610 let mut events = self.events.write().await;
611
612 for line in lines.iter().rev().take(self.config.max_events_memory) {
613 if let Ok(event) = serde_json::from_str::<QomEvent>(line) {
614 events.push_front(event);
615 loaded += 1;
616 }
617 }
618
619 debug!("Loaded {} QoM events from disk", loaded);
620 Ok(())
621 }
622}
623
624impl Default for QomRecorder {
625 fn default() -> Self {
626 Self::new(QomRecorderConfig::default())
627 }
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a recorder whose data directory lives under the platform temp
    /// directory and is unique to this test and process, so runs neither
    /// depend on `/tmp` existing nor share state with earlier runs.
    fn temp_recorder(label: &str) -> QomRecorder {
        let data_dir = std::env::temp_dir().join(format!(
            "mpl_test_qom_{}_{}",
            label,
            std::process::id()
        ));
        QomRecorder::new(QomRecorderConfig {
            data_dir,
            ..Default::default()
        })
    }

    /// Best-effort removal of the recorder's data directory.
    fn cleanup(recorder: &QomRecorder) {
        let _ = std::fs::remove_dir_all(&recorder.config.data_dir);
    }

    #[tokio::test]
    async fn test_record_event() {
        let recorder = temp_recorder("record_event");

        let event = recorder.create_event(
            "org.test.Type.v1",
            "qom-basic",
            true,
            QomScores {
                sf: Some(1.0),
                ic: Some(0.95),
                ..Default::default()
            },
            None,
            None,
        );

        recorder.record_event(event).await;

        let summary = recorder.get_summary().await;
        assert_eq!(summary.schema_fidelity.samples, 1);
        assert_eq!(summary.schema_fidelity.failures, 0);
        assert_eq!(summary.schema_fidelity.score, Some(1.0));
        assert_eq!(summary.instruction_compliance.samples, 1);
        // 0.95 is below the 0.97 instruction-compliance threshold.
        assert_eq!(summary.instruction_compliance.failures, 1);
        // Metrics without a score on the event must stay untouched.
        assert_eq!(summary.groundedness.samples, 0);
        assert_eq!(summary.groundedness.score, None);

        cleanup(&recorder);
    }

    #[tokio::test]
    async fn test_get_events() {
        let recorder = temp_recorder("get_events");

        for i in 0..5 {
            let event = recorder.create_event(
                &format!("org.test.Type{}.v1", i),
                "qom-basic",
                true,
                QomScores::default(),
                None,
                None,
            );
            recorder.record_event(event).await;
        }

        let events = recorder.get_events(3).await;
        assert_eq!(events.len(), 3);
        // Newest first: the last recorded event leads the result.
        assert_eq!(events[0].stype, "org.test.Type4.v1");
        assert_eq!(events[2].stype, "org.test.Type2.v1");

        cleanup(&recorder);
    }
}