1use std::sync::{Arc, Mutex};
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use crate::budget::Budget;
5use crate::observer::ExecutionObserver;
6use crate::progress::ProgressInfo;
7use crate::recent_log::{LogEntry, LogSink};
8use crate::tokens::{estimate_tokens, TokenCount, TokenSource};
9use crate::{BudgetHandle, CustomMetrics, CustomMetricsHandle, LlmQuery, ProgressHandle, QueryId};
10
11struct TranscriptEntry {
20 query_id: String,
21 prompt: String,
22 system: Option<String>,
23 response: Option<String>,
24 prompt_tokens: u64,
26 prompt_source: TokenSource,
27 response_tokens: u64,
30 response_source: TokenSource,
31 started_at_ms: i64,
33 completed_at_ms: Option<i64>,
36}
37
38impl TranscriptEntry {
39 fn to_json(&self) -> serde_json::Value {
40 serde_json::json!({
41 "query_id": self.query_id,
42 "prompt": self.prompt,
43 "system": self.system,
44 "response": self.response,
45 })
46 }
47
48 fn to_history_json(&self) -> serde_json::Value {
57 serde_json::json!({
58 "query_id": self.query_id,
59 "prompt": self.prompt,
60 "response": self.response,
61 "prompt_tokens": self.prompt_tokens,
62 "response_tokens": self.response_tokens,
63 "started_at": self.started_at_ms,
64 "completed_at": self.completed_at_ms,
65 })
66 }
67}
68
69pub(crate) struct SessionStatus {
123 started_at: Instant,
124 ended_at: Option<Instant>,
125 pub(crate) llm_calls: u64,
126 pauses: u64,
127 rounds: u64,
128 total_prompt_chars: u64,
129 total_response_chars: u64,
130 transcript: Vec<TranscriptEntry>,
131 pub(crate) budget: Option<Budget>,
132 pub(crate) progress: Option<ProgressInfo>,
133}
134
135impl SessionStatus {
136 fn new() -> Self {
137 Self {
138 started_at: Instant::now(),
139 ended_at: None,
140 llm_calls: 0,
141 pauses: 0,
142 rounds: 0,
143 total_prompt_chars: 0,
144 total_response_chars: 0,
145 transcript: Vec::new(),
146 budget: None,
147 progress: None,
148 }
149 }
150
151 fn prompt_token_count(&self) -> TokenCount {
153 let mut tc = TokenCount::new(TokenSource::Definite);
154 for e in &self.transcript {
155 tc.accumulate(e.prompt_tokens, e.prompt_source);
156 }
157 tc
158 }
159
160 fn response_token_count(&self) -> TokenCount {
162 let mut tc = TokenCount::new(TokenSource::Definite);
163 for e in &self.transcript {
164 tc.accumulate(e.response_tokens, e.response_source);
165 }
166 tc
167 }
168
169 fn total_tokens(&self) -> u64 {
171 self.transcript
172 .iter()
173 .map(|e| e.prompt_tokens + e.response_tokens)
174 .sum()
175 }
176
177 fn elapsed_ms(&self) -> u64 {
179 self.ended_at
180 .map(|end| end.duration_since(self.started_at).as_millis() as u64)
181 .unwrap_or_else(|| self.started_at.elapsed().as_millis() as u64)
182 }
183
184 fn to_json(&self) -> serde_json::Value {
185 let prompt_tc = self.prompt_token_count();
186 let response_tc = self.response_token_count();
187 let total_tc = TokenCount {
188 tokens: prompt_tc.tokens + response_tc.tokens,
189 source: prompt_tc.source.weaker(response_tc.source),
190 };
191 let mut json = serde_json::json!({
192 "elapsed_ms": self.elapsed_ms(),
193 "llm_calls": self.llm_calls,
194 "pauses": self.pauses,
195 "rounds": self.rounds,
196 "total_prompt_chars": self.total_prompt_chars,
197 "total_response_chars": self.total_response_chars,
198 "prompt_tokens": prompt_tc.to_json(),
199 "response_tokens": response_tc.to_json(),
200 "total_tokens": total_tc.to_json(),
201 });
202 if let Some(ref b) = self.budget {
203 json["budget"] = b.to_json();
204 }
205 json
206 }
207
208 pub(crate) fn check_budget(&self) -> Result<(), String> {
209 match self.budget {
210 Some(ref b) => b.check(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
211 None => Ok(()),
212 }
213 }
214
215 fn snapshot(&self, include_history: bool, log_sink: &LogSink) -> serde_json::Value {
235 let prompt_tc = self.prompt_token_count();
237 let response_tc = self.response_token_count();
238 let total_tokens = prompt_tc.tokens + response_tc.tokens;
239
240 let current_query = self.transcript.last().and_then(|e| {
243 if e.response.is_none() {
244 Some(serde_json::json!({
245 "query_id": e.query_id,
246 "prompt_tokens": e.prompt_tokens,
247 "started_waiting_at": e.started_at_ms,
248 }))
249 } else {
250 None
251 }
252 });
253
254 let mut json = serde_json::json!({
255 "elapsed_ms": self.elapsed_ms(),
256 "llm_calls": self.llm_calls,
257 "rounds": self.rounds,
258 "tokens": {
259 "prompt_total": prompt_tc.tokens,
260 "response_total": response_tc.tokens,
261 "total": total_tokens,
262 "current_query": current_query,
263 },
264 "recent_logs": log_sink.to_json(),
265 });
266
267 if let Some(ref p) = self.progress {
268 json["progress"] = serde_json::json!({
269 "step": p.step,
270 "total": p.total,
271 "message": p.message,
272 });
273 }
274
275 if let Some(ref b) = self.budget {
276 json["budget_remaining"] =
277 b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens());
278 }
279
280 if include_history {
281 let start = self.transcript.len().saturating_sub(10);
283 let history: Vec<serde_json::Value> = self.transcript[start..]
284 .iter()
285 .map(|e| e.to_history_json())
286 .collect();
287 json["conversation_history"] = serde_json::Value::Array(history);
288 }
289
290 json
291 }
292
293 pub(crate) fn budget_remaining(&self) -> serde_json::Value {
294 match self.budget {
295 None => serde_json::Value::Null,
296 Some(ref b) => b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
297 }
298 }
299}
300
301pub struct ExecutionMetrics {
314 auto: Arc<Mutex<SessionStatus>>,
315 custom: Arc<Mutex<CustomMetrics>>,
316 log_sink: LogSink,
317}
318
319impl ExecutionMetrics {
320 pub fn new() -> Self {
321 Self {
322 auto: Arc::new(Mutex::new(SessionStatus::new())),
323 custom: Arc::new(Mutex::new(CustomMetrics::new())),
324 log_sink: LogSink::new(),
325 }
326 }
327
328 pub fn to_json(&self) -> serde_json::Value {
330 let auto_json = self
331 .auto
332 .lock()
333 .map(|m| m.to_json())
334 .unwrap_or(serde_json::Value::Null);
335
336 let custom_json = self
337 .custom
338 .lock()
339 .map(|m| m.to_json())
340 .unwrap_or(serde_json::Value::Null);
341
342 serde_json::json!({
343 "auto": auto_json,
344 "custom": custom_json,
345 })
346 }
347
348 pub fn transcript_to_json(&self) -> Vec<serde_json::Value> {
350 self.auto
351 .lock()
352 .map(|m| m.transcript.iter().map(|e| e.to_json()).collect())
353 .unwrap_or_default()
354 }
355
356 pub fn custom_metrics_handle(&self) -> CustomMetricsHandle {
358 CustomMetricsHandle::new(Arc::clone(&self.custom))
359 }
360
361 pub fn set_budget(&self, budget: Budget) {
363 if let Ok(mut m) = self.auto.lock() {
364 m.budget = Some(budget);
365 }
366 }
367
368 pub fn budget_handle(&self) -> BudgetHandle {
370 BudgetHandle::new(Arc::clone(&self.auto))
371 }
372
373 pub fn progress_handle(&self) -> ProgressHandle {
375 ProgressHandle::new(Arc::clone(&self.auto))
376 }
377
378 pub fn snapshot(&self, include_history: bool) -> serde_json::Value {
393 self.auto
394 .lock()
395 .map(|m| m.snapshot(include_history, &self.log_sink))
396 .unwrap_or(serde_json::Value::Null)
397 }
398
399 pub fn create_observer(&self) -> MetricsObserver {
400 MetricsObserver::new(Arc::clone(&self.auto), self.log_sink.clone())
401 }
402
403 pub fn log_sink_handle(&self) -> LogSink {
413 self.log_sink.clone()
414 }
415}
416
417impl Default for ExecutionMetrics {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423impl serde::Serialize for ExecutionMetrics {
424 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425 self.to_json().serialize(serializer)
426 }
427}
428
429pub struct MetricsObserver {
431 auto: Arc<Mutex<SessionStatus>>,
432 log_sink: LogSink,
433}
434
435impl MetricsObserver {
436 pub(crate) fn new(auto: Arc<Mutex<SessionStatus>>, log_sink: LogSink) -> Self {
437 Self { auto, log_sink }
438 }
439}
440
441impl ExecutionObserver for MetricsObserver {
442 fn on_paused(&self, queries: &[LlmQuery]) {
443 let now_ms = SystemTime::now()
446 .duration_since(UNIX_EPOCH)
447 .unwrap_or_default()
448 .as_millis() as i64;
449 if let Ok(mut m) = self.auto.lock() {
450 m.pauses += 1;
451 m.llm_calls += queries.len() as u64;
452 for q in queries {
453 m.total_prompt_chars += q.prompt.len() as u64;
454 let mut est = estimate_tokens(&q.prompt);
455 if let Some(ref sys) = q.system {
456 m.total_prompt_chars += sys.len() as u64;
457 est += estimate_tokens(sys);
458 }
459 m.transcript.push(TranscriptEntry {
460 query_id: q.id.as_str().to_string(),
461 prompt: q.prompt.clone(),
462 system: q.system.clone(),
463 response: None,
464 prompt_tokens: est,
465 prompt_source: TokenSource::Estimated,
466 response_tokens: 0,
467 response_source: TokenSource::Estimated,
468 started_at_ms: now_ms,
469 completed_at_ms: None,
470 });
471 }
472 }
473 }
474
475 fn on_response_fed(
476 &self,
477 query_id: &QueryId,
478 response: &str,
479 usage: Option<&crate::TokenUsage>,
480 ) {
481 let now_ms = SystemTime::now()
484 .duration_since(UNIX_EPOCH)
485 .unwrap_or_default()
486 .as_millis() as i64;
487 if let Ok(mut m) = self.auto.lock() {
488 m.total_response_chars += response.len() as u64;
489
490 if let Some(entry) = m
491 .transcript
492 .iter_mut()
493 .rev()
494 .find(|e| e.query_id == query_id.as_str())
495 {
496 entry.response = Some(response.to_string());
497 entry.completed_at_ms = Some(now_ms);
498
499 if let Some(pt) = usage.and_then(|u| u.prompt_tokens) {
501 entry.prompt_tokens = pt;
502 entry.prompt_source = TokenSource::Provided;
503 }
504
505 match usage.and_then(|u| u.completion_tokens) {
507 Some(ct) => {
508 entry.response_tokens = ct;
509 entry.response_source = TokenSource::Provided;
510 }
511 None => {
512 entry.response_tokens = estimate_tokens(response);
513 entry.response_source = TokenSource::Estimated;
514 }
515 }
516 }
517 }
518 }
519
520 fn on_log(&self, entry: &LogEntry) {
521 self.log_sink.push(entry.clone());
522 }
523
524 fn on_resumed(&self) {
525 if let Ok(mut m) = self.auto.lock() {
526 m.rounds += 1;
527 }
528 }
529
530 fn on_completed(&self, _result: &serde_json::Value) {
531 if let Ok(mut m) = self.auto.lock() {
532 m.ended_at = Some(Instant::now());
533 }
534 }
535
536 fn on_failed(&self, _error: &str) {
537 if let Ok(mut m) = self.auto.lock() {
538 m.ended_at = Some(Instant::now());
539 }
540 }
541
542 fn on_cancelled(&self) {
543 if let Ok(mut m) = self.auto.lock() {
544 m.ended_at = Some(Instant::now());
545 }
546 }
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{LlmQuery, QueryId};

    /// Baseline query for these tests: no system prompt, `max_tokens` of 10,
    /// neither grounded nor underspecified. Tests needing other values
    /// override them with struct-update syntax.
    fn query(id: QueryId, prompt: impl Into<String>) -> LlmQuery {
        LlmQuery {
            id,
            prompt: prompt.into(),
            system: None,
            max_tokens: 10,
            grounded: false,
            underspecified: false,
        }
    }

    #[test]
    fn metrics_to_json_has_auto_and_custom() {
        let json = ExecutionMetrics::new().to_json();
        assert!(json.get("auto").is_some());
        assert!(json.get("custom").is_some());
    }

    #[test]
    fn custom_handle_shares_state() {
        let metrics = ExecutionMetrics::new();
        let handle = metrics.custom_metrics_handle();
        handle.record("key".into(), serde_json::json!("value"));

        let json = metrics.to_json();
        assert_eq!(json["custom"]["key"], "value");
    }

    #[test]
    fn observer_updates_auto_metrics() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[LlmQuery {
            max_tokens: 100,
            ..query(QueryId::batch(0), "test")
        }]);
        observer.on_completed(&serde_json::json!(null));

        let json = metrics.to_json();
        let auto = &json["auto"];
        assert_eq!(auto["llm_calls"], 1);
        assert_eq!(auto["pauses"], 1);
        assert_eq!(auto["rounds"], 0);
        assert_eq!(auto["total_prompt_chars"], 4);
        assert_eq!(auto["total_response_chars"], 0);
    }

    #[test]
    fn observer_tracks_prompt_and_response_chars() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let queries = [
            LlmQuery {
                system: Some("sys".into()),
                max_tokens: 100,
                ..query(QueryId::batch(0), "hello")
            },
            LlmQuery {
                max_tokens: 100,
                ..query(QueryId::batch(1), "world")
            },
        ];

        observer.on_paused(&queries);
        observer.on_response_fed(&QueryId::batch(0), &"x".repeat(42), None);
        observer.on_response_fed(&QueryId::batch(1), &"y".repeat(58), None);
        observer.on_resumed();
        observer.on_completed(&serde_json::json!(null));

        let json = metrics.to_json();
        let auto = &json["auto"];
        // "hello" + "sys" + "world" = 13 bytes; 42 + 58 = 100 bytes.
        assert_eq!(auto["total_prompt_chars"], 13);
        assert_eq!(auto["total_response_chars"], 100);
        assert_eq!(auto["rounds"], 1);
    }

    #[test]
    fn observer_tracks_multiple_rounds() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        let single = [query(QueryId::single(), "p")];

        for (fill, len) in [("x", 10), ("y", 20), ("z", 30)] {
            observer.on_paused(&single);
            observer.on_response_fed(&QueryId::single(), &fill.repeat(len), None);
            observer.on_resumed();
        }
        observer.on_completed(&serde_json::json!(null));

        let json = metrics.to_json();
        let auto = &json["auto"];
        assert_eq!(auto["rounds"], 3);
        assert_eq!(auto["pauses"], 3);
        assert_eq!(auto["llm_calls"], 3);
        assert_eq!(auto["total_prompt_chars"], 3);
        assert_eq!(auto["total_response_chars"], 60);
    }

    #[test]
    fn transcript_records_prompt_response_pairs() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[LlmQuery {
            system: Some("You are a calculator.".into()),
            max_tokens: 50,
            ..query(QueryId::single(), "What is 2+2?")
        }]);
        observer.on_response_fed(&QueryId::single(), "4", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::json!(null));

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 1);
        let entry = &transcript[0];
        assert_eq!(entry["query_id"], "q-0");
        assert_eq!(entry["prompt"], "What is 2+2?");
        assert_eq!(entry["system"], "You are a calculator.");
        assert_eq!(entry["response"], "4");
    }

    #[test]
    fn transcript_not_in_stats() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query(QueryId::single(), "p")]);
        observer.on_response_fed(&QueryId::single(), "r", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::json!(null));

        let json = metrics.to_json();
        assert!(json["auto"].get("transcript").is_none());
    }

    #[test]
    fn transcript_multi_round() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[LlmQuery {
            max_tokens: 100,
            ..query(QueryId::single(), "step1")
        }]);
        observer.on_response_fed(&QueryId::single(), "answer1", None);
        observer.on_resumed();

        observer.on_paused(&[LlmQuery {
            system: Some("expert".into()),
            max_tokens: 100,
            ..query(QueryId::single(), "step2")
        }]);
        observer.on_response_fed(&QueryId::single(), "answer2", None);
        observer.on_resumed();

        observer.on_completed(&serde_json::json!(null));

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 2);

        let first = &transcript[0];
        assert_eq!(first["prompt"], "step1");
        assert!(first["system"].is_null());
        assert_eq!(first["response"], "answer1");

        let second = &transcript[1];
        assert_eq!(second["prompt"], "step2");
        assert_eq!(second["system"], "expert");
        assert_eq!(second["response"], "answer2");
    }

    #[test]
    fn transcript_batch_queries() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let queries = [
            LlmQuery {
                max_tokens: 50,
                ..query(QueryId::batch(0), "q0")
            },
            LlmQuery {
                max_tokens: 50,
                ..query(QueryId::batch(1), "q1")
            },
        ];

        observer.on_paused(&queries);
        observer.on_response_fed(&QueryId::batch(0), "r0", None);
        observer.on_response_fed(&QueryId::batch(1), "r1", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::json!(null));

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 2);
        assert_eq!(transcript[0]["query_id"], "q-0");
        assert_eq!(transcript[0]["response"], "r0");
        assert_eq!(transcript[1]["query_id"], "q-1");
        assert_eq!(transcript[1]["response"], "r1");
    }

    #[test]
    fn on_log_routes_to_log_sink() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_log(&crate::LogEntry::new("info", "engine", "hello"));
        observer.on_log(&crate::LogEntry::new("warn", "alc.log", "world"));

        let sink = metrics.log_sink_handle();
        let entries = sink.entries();
        assert_eq!(entries.len(), 2);

        let first = &entries[0];
        assert_eq!(first.level, "info");
        assert_eq!(first.source, "engine");
        assert_eq!(first.message, "hello");

        let second = &entries[1];
        assert_eq!(second.level, "warn");
        assert_eq!(second.message, "world");
    }

    #[test]
    fn on_log_cap_enforcement_via_observer() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        for i in 0..=20u32 {
            observer.on_log(&crate::LogEntry::new("info", "engine", format!("msg-{i}")));
        }

        let sink = metrics.log_sink_handle();
        let entries = sink.entries();
        let cap = crate::recent_log::LOG_SINK_CAP;
        assert_eq!(entries.len(), cap);
        assert_eq!(entries[0].message, "msg-1");
        assert_eq!(entries[cap - 1].message, "msg-20");
    }

    #[test]
    fn transcript_timestamps_recorded() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        let epoch_ms = || {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as i64
        };

        let before = epoch_ms();
        observer.on_paused(&[query(QueryId::single(), "ts-test")]);
        observer.on_response_fed(&QueryId::single(), "response", None);
        let after_fed = epoch_ms();

        let snap = metrics.snapshot(true);
        let history = snap["conversation_history"]
            .as_array()
            .expect("conversation_history must be array");
        assert_eq!(history.len(), 1);

        let started_at = history[0]["started_at"]
            .as_i64()
            .expect("started_at must be i64");
        let completed_at = history[0]["completed_at"]
            .as_i64()
            .expect("completed_at must be i64 (not null)");

        assert!(
            started_at >= before,
            "started_at ({started_at}) should be >= before ({before})"
        );
        assert!(
            completed_at >= started_at,
            "completed_at ({completed_at}) should be >= started_at ({started_at})"
        );
        assert!(
            completed_at <= after_fed,
            "completed_at ({completed_at}) should be <= after_fed ({after_fed})"
        );
    }

    #[test]
    fn snapshot_current_query_while_paused() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query(QueryId::single(), "in-flight")]);

        let snap = metrics.snapshot(false);
        let tokens = snap.get("tokens").expect("tokens field must be present");
        let current_query = tokens
            .get("current_query")
            .expect("current_query must be present");
        assert!(
            !current_query.is_null(),
            "current_query should be non-null while paused"
        );
        assert_eq!(current_query["query_id"], "q-0");
        assert!(
            snap.get("conversation_history").is_none(),
            "conversation_history must be absent when include_history=false"
        );
    }

    #[test]
    fn snapshot_current_query_null_after_response() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query(QueryId::single(), "done")]);
        observer.on_response_fed(&QueryId::single(), "answer", None);

        let snap = metrics.snapshot(false);
        let tokens = snap.get("tokens").expect("tokens must be present");
        assert!(
            tokens["current_query"].is_null(),
            "current_query should be null after response is fed"
        );
    }

    #[test]
    fn snapshot_conversation_history_opt_in() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[LlmQuery {
            max_tokens: 50,
            ..query(QueryId::single(), "hello")
        }]);
        observer.on_response_fed(&QueryId::single(), "world", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::json!(null));

        let without_history = metrics.snapshot(false);
        assert!(
            without_history.get("conversation_history").is_none(),
            "conversation_history must be absent with include_history=false"
        );

        let with_history = metrics.snapshot(true);
        let arr = with_history
            .get("conversation_history")
            .expect("conversation_history must be present with include_history=true")
            .as_array()
            .expect("conversation_history must be an array");
        assert_eq!(arr.len(), 1);

        let entry = &arr[0];
        assert_eq!(entry["query_id"], "q-0");
        assert_eq!(entry["prompt"], "hello");
        assert_eq!(entry["response"], "world");
        assert!(entry.get("started_at").is_some());
        assert!(entry.get("completed_at").is_some());
    }

    #[test]
    fn snapshot_conversation_history_capped_at_10() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        for i in 0..15u32 {
            observer.on_paused(&[query(QueryId::single(), format!("prompt-{i}"))]);
            observer.on_response_fed(&QueryId::single(), &format!("resp-{i}"), None);
            observer.on_resumed();
        }

        let snap = metrics.snapshot(true);
        let history = snap["conversation_history"]
            .as_array()
            .expect("must be array");
        assert_eq!(history.len(), 10, "capped at 10 entries");
        assert_eq!(history[0]["prompt"], "prompt-5");
        assert_eq!(history[9]["prompt"], "prompt-14");
    }

    #[test]
    fn snapshot_includes_recent_logs() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        observer.on_log(&crate::LogEntry::new("info", "engine", "test-log"));

        let snap = metrics.snapshot(false);
        let arr = snap
            .get("recent_logs")
            .expect("recent_logs must be in snapshot")
            .as_array()
            .expect("recent_logs must be array");
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["message"], "test-log");
    }

    #[test]
    fn snapshot_tokens_aggregate() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[LlmQuery {
            max_tokens: 50,
            ..query(QueryId::single(), "x".repeat(100))
        }]);
        observer.on_response_fed(&QueryId::single(), &"y".repeat(50), None);
        observer.on_resumed();

        let snap = metrics.snapshot(false);
        let tokens = snap.get("tokens").expect("tokens must be in snapshot");
        let prompt_total = tokens["prompt_total"]
            .as_u64()
            .expect("prompt_total must be u64");
        let response_total = tokens["response_total"]
            .as_u64()
            .expect("response_total must be u64");
        let total = tokens["total"].as_u64().expect("total must be u64");

        assert!(prompt_total > 0, "prompt_total must be positive");
        assert!(response_total > 0, "response_total must be positive");
        assert_eq!(total, prompt_total + response_total);
    }
}
1077}