1use std::sync::{Arc, Mutex};
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use crate::budget::Budget;
5use crate::observer::ExecutionObserver;
6use crate::progress::ProgressInfo;
7use crate::recent_log::{LogEntry, LogSink};
8use crate::tokens::{estimate_tokens, TokenCount, TokenSource};
9use crate::{BudgetHandle, CustomMetrics, CustomMetricsHandle, LlmQuery, ProgressHandle, QueryId};
10
11struct TranscriptEntry {
20 query_id: String,
21 prompt: String,
22 system: Option<String>,
23 response: Option<String>,
24 prompt_tokens: u64,
26 prompt_source: TokenSource,
27 response_tokens: u64,
30 response_source: TokenSource,
31 started_at_ms: i64,
33 completed_at_ms: Option<i64>,
36}
37
38impl TranscriptEntry {
39 fn to_json(&self) -> serde_json::Value {
40 serde_json::json!({
41 "query_id": self.query_id,
42 "prompt": self.prompt,
43 "system": self.system,
44 "response": self.response,
45 })
46 }
47
48 fn to_history_json(&self) -> serde_json::Value {
57 serde_json::json!({
58 "query_id": self.query_id,
59 "prompt": self.prompt,
60 "response": self.response,
61 "prompt_tokens": self.prompt_tokens,
62 "response_tokens": self.response_tokens,
63 "started_at": self.started_at_ms,
64 "completed_at": self.completed_at_ms,
65 })
66 }
67}
68
69pub(crate) struct SessionStatus {
123 started_at: Instant,
124 ended_at: Option<Instant>,
125 pub(crate) llm_calls: u64,
126 pauses: u64,
127 rounds: u64,
128 total_prompt_chars: u64,
129 total_response_chars: u64,
130 transcript: Vec<TranscriptEntry>,
131 pub(crate) budget: Option<Budget>,
132 pub(crate) progress: Option<ProgressInfo>,
133}
134
135impl SessionStatus {
136 fn new() -> Self {
137 Self {
138 started_at: Instant::now(),
139 ended_at: None,
140 llm_calls: 0,
141 pauses: 0,
142 rounds: 0,
143 total_prompt_chars: 0,
144 total_response_chars: 0,
145 transcript: Vec::new(),
146 budget: None,
147 progress: None,
148 }
149 }
150
151 fn prompt_token_count(&self) -> TokenCount {
153 let mut tc = TokenCount::new(TokenSource::Definite);
154 for e in &self.transcript {
155 tc.accumulate(e.prompt_tokens, e.prompt_source);
156 }
157 tc
158 }
159
160 fn response_token_count(&self) -> TokenCount {
162 let mut tc = TokenCount::new(TokenSource::Definite);
163 for e in &self.transcript {
164 tc.accumulate(e.response_tokens, e.response_source);
165 }
166 tc
167 }
168
169 fn total_tokens(&self) -> u64 {
171 self.transcript
172 .iter()
173 .map(|e| e.prompt_tokens + e.response_tokens)
174 .sum()
175 }
176
177 fn elapsed_ms(&self) -> u64 {
179 self.ended_at
180 .map(|end| end.duration_since(self.started_at).as_millis() as u64)
181 .unwrap_or_else(|| self.started_at.elapsed().as_millis() as u64)
182 }
183
184 fn to_json(&self) -> serde_json::Value {
185 let prompt_tc = self.prompt_token_count();
186 let response_tc = self.response_token_count();
187 let total_tc = TokenCount {
188 tokens: prompt_tc.tokens + response_tc.tokens,
189 source: prompt_tc.source.weaker(response_tc.source),
190 };
191 let mut json = serde_json::json!({
192 "elapsed_ms": self.elapsed_ms(),
193 "llm_calls": self.llm_calls,
194 "pauses": self.pauses,
195 "rounds": self.rounds,
196 "total_prompt_chars": self.total_prompt_chars,
197 "total_response_chars": self.total_response_chars,
198 "prompt_tokens": prompt_tc.to_json(),
199 "response_tokens": response_tc.to_json(),
200 "total_tokens": total_tc.to_json(),
201 });
202 if let Some(ref b) = self.budget {
203 json["budget"] = b.to_json();
204 }
205 json
206 }
207
208 pub(crate) fn check_budget(&self) -> Result<(), String> {
209 match self.budget {
210 Some(ref b) => b.check(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
211 None => Ok(()),
212 }
213 }
214
215 fn snapshot(&self, include_history: bool, log_sink: &LogSink) -> serde_json::Value {
235 let prompt_tc = self.prompt_token_count();
237 let response_tc = self.response_token_count();
238 let total_tokens = prompt_tc.tokens + response_tc.tokens;
239
240 let current_query = self.transcript.last().and_then(|e| {
243 if e.response.is_none() {
244 Some(serde_json::json!({
245 "query_id": e.query_id,
246 "prompt_tokens": e.prompt_tokens,
247 "started_waiting_at": e.started_at_ms,
248 }))
249 } else {
250 None
251 }
252 });
253
254 let mut json = serde_json::json!({
255 "elapsed_ms": self.elapsed_ms(),
256 "llm_calls": self.llm_calls,
257 "rounds": self.rounds,
258 "tokens": {
259 "prompt_total": prompt_tc.tokens,
260 "response_total": response_tc.tokens,
261 "total": total_tokens,
262 "current_query": current_query,
263 },
264 "recent_logs": log_sink.to_json(),
265 });
266
267 if let Some(ref p) = self.progress {
268 json["progress"] = serde_json::json!({
269 "step": p.step,
270 "total": p.total,
271 "message": p.message,
272 });
273 }
274
275 if let Some(ref b) = self.budget {
276 json["budget_remaining"] =
277 b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens());
278 }
279
280 if include_history {
281 let start = self.transcript.len().saturating_sub(10);
283 let history: Vec<serde_json::Value> = self.transcript[start..]
284 .iter()
285 .map(|e| e.to_history_json())
286 .collect();
287 json["conversation_history"] = serde_json::Value::Array(history);
288 }
289
290 json
291 }
292
293 pub(crate) fn budget_remaining(&self) -> serde_json::Value {
294 match self.budget {
295 None => serde_json::Value::Null,
296 Some(ref b) => b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
297 }
298 }
299}
300
301pub struct ExecutionMetrics {
314 auto: Arc<Mutex<SessionStatus>>,
315 custom: Arc<Mutex<CustomMetrics>>,
316 log_sink: LogSink,
317}
318
319impl ExecutionMetrics {
320 pub fn new() -> Self {
321 Self {
322 auto: Arc::new(Mutex::new(SessionStatus::new())),
323 custom: Arc::new(Mutex::new(CustomMetrics::new())),
324 log_sink: LogSink::new(),
325 }
326 }
327
328 pub fn to_json(&self) -> serde_json::Value {
330 let auto_json = self
331 .auto
332 .lock()
333 .map(|m| m.to_json())
334 .unwrap_or(serde_json::Value::Null);
335
336 let custom_json = self
337 .custom
338 .lock()
339 .map(|m| m.to_json())
340 .unwrap_or(serde_json::Value::Null);
341
342 serde_json::json!({
343 "auto": auto_json,
344 "custom": custom_json,
345 })
346 }
347
348 pub fn transcript_to_json(&self) -> Vec<serde_json::Value> {
350 self.auto
351 .lock()
352 .map(|m| m.transcript.iter().map(|e| e.to_json()).collect())
353 .unwrap_or_default()
354 }
355
356 pub fn custom_metrics_handle(&self) -> CustomMetricsHandle {
358 CustomMetricsHandle::new(Arc::clone(&self.custom))
359 }
360
361 pub fn set_budget(&self, budget: Budget) {
363 if let Ok(mut m) = self.auto.lock() {
364 m.budget = Some(budget);
365 }
366 }
367
368 pub fn budget_handle(&self) -> BudgetHandle {
370 BudgetHandle::new(Arc::clone(&self.auto))
371 }
372
373 pub fn progress_handle(&self) -> ProgressHandle {
375 ProgressHandle::new(Arc::clone(&self.auto))
376 }
377
378 pub fn snapshot(&self, include_history: bool) -> serde_json::Value {
393 self.auto
394 .lock()
395 .map(|m| m.snapshot(include_history, &self.log_sink))
396 .unwrap_or(serde_json::Value::Null)
397 }
398
399 pub fn create_observer(&self) -> MetricsObserver {
400 MetricsObserver::new(Arc::clone(&self.auto), self.log_sink.clone())
401 }
402
403 pub fn log_sink_handle(&self) -> LogSink {
413 self.log_sink.clone()
414 }
415
416 pub fn stats_handle(&self) -> StatsHandle {
419 StatsHandle::new(Arc::clone(&self.auto))
420 }
421
422 pub fn usage_aggregate(&self) -> Option<crate::TokenUsage> {
432 let m = self.auto.lock().ok()?;
433 if m.llm_calls == 0 {
434 return None;
435 }
436 Some(crate::TokenUsage {
437 prompt_tokens: Some(m.prompt_token_count().tokens),
438 completion_tokens: Some(m.response_token_count().tokens),
439 })
440 }
441}
442
443#[derive(Clone)]
456pub struct StatsHandle {
457 auto: Arc<Mutex<SessionStatus>>,
458}
459
460impl StatsHandle {
461 pub(crate) fn new(auto: Arc<Mutex<SessionStatus>>) -> Self {
462 Self { auto }
463 }
464
465 pub fn llm_calls(&self) -> u64 {
473 self.auto.lock().map(|m| m.llm_calls).unwrap_or(0)
474 }
475}
476
477impl Default for ExecutionMetrics {
478 fn default() -> Self {
479 Self::new()
480 }
481}
482
483impl serde::Serialize for ExecutionMetrics {
484 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
485 self.to_json().serialize(serializer)
486 }
487}
488
489pub struct MetricsObserver {
491 auto: Arc<Mutex<SessionStatus>>,
492 log_sink: LogSink,
493}
494
495impl MetricsObserver {
496 pub(crate) fn new(auto: Arc<Mutex<SessionStatus>>, log_sink: LogSink) -> Self {
497 Self { auto, log_sink }
498 }
499}
500
501impl ExecutionObserver for MetricsObserver {
502 fn on_paused(&self, queries: &[LlmQuery]) {
503 let now_ms = SystemTime::now()
506 .duration_since(UNIX_EPOCH)
507 .unwrap_or_default()
508 .as_millis() as i64;
509 if let Ok(mut m) = self.auto.lock() {
510 m.pauses += 1;
511 m.llm_calls += queries.len() as u64;
512 for q in queries {
513 m.total_prompt_chars += q.prompt.len() as u64;
514 let mut est = estimate_tokens(&q.prompt);
515 if let Some(ref sys) = q.system {
516 m.total_prompt_chars += sys.len() as u64;
517 est += estimate_tokens(sys);
518 }
519 m.transcript.push(TranscriptEntry {
520 query_id: q.id.as_str().to_string(),
521 prompt: q.prompt.clone(),
522 system: q.system.clone(),
523 response: None,
524 prompt_tokens: est,
525 prompt_source: TokenSource::Estimated,
526 response_tokens: 0,
527 response_source: TokenSource::Estimated,
528 started_at_ms: now_ms,
529 completed_at_ms: None,
530 });
531 }
532 }
533 }
534
535 fn on_response_fed(
536 &self,
537 query_id: &QueryId,
538 response: &str,
539 usage: Option<&crate::TokenUsage>,
540 ) {
541 let now_ms = SystemTime::now()
544 .duration_since(UNIX_EPOCH)
545 .unwrap_or_default()
546 .as_millis() as i64;
547 if let Ok(mut m) = self.auto.lock() {
548 m.total_response_chars += response.len() as u64;
549
550 if let Some(entry) = m
551 .transcript
552 .iter_mut()
553 .rev()
554 .find(|e| e.query_id == query_id.as_str())
555 {
556 entry.response = Some(response.to_string());
557 entry.completed_at_ms = Some(now_ms);
558
559 if let Some(pt) = usage.and_then(|u| u.prompt_tokens) {
561 entry.prompt_tokens = pt;
562 entry.prompt_source = TokenSource::Provided;
563 }
564
565 match usage.and_then(|u| u.completion_tokens) {
567 Some(ct) => {
568 entry.response_tokens = ct;
569 entry.response_source = TokenSource::Provided;
570 }
571 None => {
572 entry.response_tokens = estimate_tokens(response);
573 entry.response_source = TokenSource::Estimated;
574 }
575 }
576 }
577 }
578 }
579
580 fn on_log(&self, entry: &LogEntry) {
581 self.log_sink.push(entry.clone());
582 }
583
584 fn on_resumed(&self) {
585 if let Ok(mut m) = self.auto.lock() {
586 m.rounds += 1;
587 }
588 }
589
590 fn on_completed(&self, _result: &serde_json::Value) {
591 if let Ok(mut m) = self.auto.lock() {
592 m.ended_at = Some(Instant::now());
593 }
594 }
595
596 fn on_failed(&self, _error: &str) {
597 if let Ok(mut m) = self.auto.lock() {
598 m.ended_at = Some(Instant::now());
599 }
600 }
601
602 fn on_cancelled(&self) {
603 if let Ok(mut m) = self.auto.lock() {
604 m.ended_at = Some(Instant::now());
605 }
606 }
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{LlmQuery, QueryId};

    // Builds an `LlmQuery` with the `grounded` / `underspecified` flags cleared;
    // the observer only reads `id`, `prompt` and `system`.
    macro_rules! query {
        ($id:expr, $prompt:expr, $system:expr, $max_tokens:expr) => {
            LlmQuery {
                id: $id,
                prompt: $prompt.into(),
                system: $system,
                max_tokens: $max_tokens,
                grounded: false,
                underspecified: false,
            }
        };
    }

    // The `auto` section of `metrics.to_json()`.
    fn auto_section(metrics: &ExecutionMetrics) -> serde_json::Value {
        metrics
            .to_json()
            .get("auto")
            .cloned()
            .expect("auto section must be present")
    }

    // Current time in milliseconds since the Unix epoch.
    fn now_epoch_ms() -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64
    }

    #[test]
    fn metrics_to_json_has_auto_and_custom() {
        let json = ExecutionMetrics::new().to_json();
        for section in ["auto", "custom"] {
            assert!(json.get(section).is_some(), "missing section {section}");
        }
    }

    #[test]
    fn custom_handle_shares_state() {
        let metrics = ExecutionMetrics::new();
        metrics
            .custom_metrics_handle()
            .record("key".into(), serde_json::json!("value"));

        let json = metrics.to_json();
        assert_eq!(json["custom"].get("key").unwrap(), "value");
    }

    #[test]
    fn observer_updates_auto_metrics() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::batch(0), "test", None, 100)]);
        observer.on_completed(&serde_json::Value::Null);

        let auto = auto_section(&metrics);
        assert_eq!(auto["llm_calls"], 1);
        assert_eq!(auto["pauses"], 1);
        assert_eq!(auto["rounds"], 0);
        assert_eq!(auto["total_prompt_chars"], 4);
        assert_eq!(auto["total_response_chars"], 0);
    }

    #[test]
    fn observer_tracks_prompt_and_response_chars() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let batch = [
            query!(QueryId::batch(0), "hello", Some("sys".into()), 100),
            query!(QueryId::batch(1), "world", None, 100),
        ];
        observer.on_paused(&batch);
        observer.on_response_fed(&QueryId::batch(0), &"x".repeat(42), None);
        observer.on_response_fed(&QueryId::batch(1), &"y".repeat(58), None);
        observer.on_resumed();
        observer.on_completed(&serde_json::Value::Null);

        let auto = auto_section(&metrics);
        // "hello" + "sys" + "world"
        assert_eq!(auto["total_prompt_chars"], 13);
        // 42 + 58
        assert_eq!(auto["total_response_chars"], 100);
        assert_eq!(auto["rounds"], 1);
    }

    #[test]
    fn observer_tracks_multiple_rounds() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let single = [query!(QueryId::single(), "p", None, 10)];
        for (fill, len) in [("x", 10), ("y", 20), ("z", 30)] {
            observer.on_paused(&single);
            observer.on_response_fed(&QueryId::single(), &fill.repeat(len), None);
            observer.on_resumed();
        }
        observer.on_completed(&serde_json::Value::Null);

        let auto = auto_section(&metrics);
        assert_eq!(auto["rounds"], 3);
        assert_eq!(auto["pauses"], 3);
        assert_eq!(auto["llm_calls"], 3);
        assert_eq!(auto["total_prompt_chars"], 3);
        assert_eq!(auto["total_response_chars"], 60);
    }

    #[test]
    fn transcript_records_prompt_response_pairs() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(
            QueryId::single(),
            "What is 2+2?",
            Some("You are a calculator.".into()),
            50
        )]);
        observer.on_response_fed(&QueryId::single(), "4", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::Value::Null);

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 1);
        let entry = &transcript[0];
        assert_eq!(entry["query_id"], "q-0");
        assert_eq!(entry["prompt"], "What is 2+2?");
        assert_eq!(entry["system"], "You are a calculator.");
        assert_eq!(entry["response"], "4");
    }

    #[test]
    fn transcript_not_in_stats() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "p", None, 10)]);
        observer.on_response_fed(&QueryId::single(), "r", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::Value::Null);

        let json = metrics.to_json();
        assert!(json["auto"].get("transcript").is_none());
    }

    #[test]
    fn transcript_multi_round() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let rounds = [
            ("step1", None, "answer1"),
            ("step2", Some("expert"), "answer2"),
        ];
        for (prompt, system, answer) in rounds {
            observer.on_paused(&[query!(
                QueryId::single(),
                prompt,
                system.map(String::from),
                100
            )]);
            observer.on_response_fed(&QueryId::single(), answer, None);
            observer.on_resumed();
        }
        observer.on_completed(&serde_json::Value::Null);

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 2);

        let first = &transcript[0];
        assert_eq!(first["prompt"], "step1");
        assert!(first["system"].is_null());
        assert_eq!(first["response"], "answer1");

        let second = &transcript[1];
        assert_eq!(second["prompt"], "step2");
        assert_eq!(second["system"], "expert");
        assert_eq!(second["response"], "answer2");
    }

    #[test]
    fn transcript_batch_queries() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let batch = [
            query!(QueryId::batch(0), "q0", None, 50),
            query!(QueryId::batch(1), "q1", None, 50),
        ];
        observer.on_paused(&batch);
        observer.on_response_fed(&QueryId::batch(0), "r0", None);
        observer.on_response_fed(&QueryId::batch(1), "r1", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::Value::Null);

        let transcript = metrics.transcript_to_json();
        assert_eq!(transcript.len(), 2);
        for (i, entry) in transcript.iter().enumerate() {
            assert_eq!(entry["query_id"], format!("q-{i}"));
            assert_eq!(entry["response"], format!("r{i}"));
        }
    }

    #[test]
    fn on_log_routes_to_log_sink() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_log(&crate::LogEntry::new("info", "engine", "hello"));
        observer.on_log(&crate::LogEntry::new("warn", "alc.log", "world"));

        let entries = metrics.log_sink_handle().entries();
        assert_eq!(entries.len(), 2);

        let first = &entries[0];
        assert_eq!(first.level, "info");
        assert_eq!(first.source, "engine");
        assert_eq!(first.message, "hello");

        let second = &entries[1];
        assert_eq!(second.level, "warn");
        assert_eq!(second.message, "world");
    }

    #[test]
    fn on_log_cap_enforcement_via_observer() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        // One more entry than the cap, so the oldest ("msg-0") must be evicted.
        for i in 0..=20u32 {
            observer.on_log(&crate::LogEntry::new("info", "engine", format!("msg-{i}")));
        }

        let entries = metrics.log_sink_handle().entries();
        let cap = crate::recent_log::LOG_SINK_CAP;
        assert_eq!(entries.len(), cap);
        assert_eq!(entries[0].message, "msg-1");
        assert_eq!(entries[cap - 1].message, "msg-20");
    }

    #[test]
    fn transcript_timestamps_recorded() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        let before = now_epoch_ms();
        observer.on_paused(&[query!(QueryId::single(), "ts-test", None, 10)]);
        observer.on_response_fed(&QueryId::single(), "response", None);
        let after_fed = now_epoch_ms();

        let snap = metrics.snapshot(true);
        let history = snap["conversation_history"]
            .as_array()
            .expect("conversation_history must be array");
        assert_eq!(history.len(), 1);

        let started_at = history[0]["started_at"]
            .as_i64()
            .expect("started_at must be i64");
        let completed_at = history[0]["completed_at"]
            .as_i64()
            .expect("completed_at must be i64 (not null)");

        assert!(
            started_at >= before,
            "started_at ({started_at}) should be >= before ({before})"
        );
        assert!(
            completed_at >= started_at,
            "completed_at ({completed_at}) should be >= started_at ({started_at})"
        );
        assert!(
            completed_at <= after_fed,
            "completed_at ({completed_at}) should be <= after_fed ({after_fed})"
        );
    }

    #[test]
    fn snapshot_current_query_while_paused() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "in-flight", None, 10)]);

        let snap = metrics.snapshot(false);
        let current_query = snap
            .get("tokens")
            .expect("tokens field must be present")
            .get("current_query")
            .expect("current_query must be present");
        assert!(
            !current_query.is_null(),
            "current_query should be non-null while paused"
        );
        assert_eq!(current_query["query_id"], "q-0");
        assert!(
            snap.get("conversation_history").is_none(),
            "conversation_history must be absent when include_history=false"
        );
    }

    #[test]
    fn snapshot_current_query_null_after_response() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "done", None, 10)]);
        observer.on_response_fed(&QueryId::single(), "answer", None);

        let snap = metrics.snapshot(false);
        let tokens = snap.get("tokens").expect("tokens must be present");
        assert!(
            tokens["current_query"].is_null(),
            "current_query should be null after response is fed"
        );
    }

    #[test]
    fn snapshot_conversation_history_opt_in() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "hello", None, 50)]);
        observer.on_response_fed(&QueryId::single(), "world", None);
        observer.on_resumed();
        observer.on_completed(&serde_json::Value::Null);

        assert!(
            metrics
                .snapshot(false)
                .get("conversation_history")
                .is_none(),
            "conversation_history must be absent with include_history=false"
        );

        let snap_true = metrics.snapshot(true);
        let arr = snap_true
            .get("conversation_history")
            .expect("conversation_history must be present with include_history=true")
            .as_array()
            .expect("conversation_history must be an array");
        assert_eq!(arr.len(), 1);

        let entry = &arr[0];
        assert_eq!(entry["query_id"], "q-0");
        assert_eq!(entry["prompt"], "hello");
        assert_eq!(entry["response"], "world");
        assert!(entry.get("started_at").is_some());
        assert!(entry.get("completed_at").is_some());
    }

    #[test]
    fn snapshot_conversation_history_capped_at_10() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        for i in 0..15u32 {
            observer.on_paused(&[query!(QueryId::single(), format!("prompt-{i}"), None, 10)]);
            observer.on_response_fed(&QueryId::single(), &format!("resp-{i}"), None);
            observer.on_resumed();
        }

        let snap = metrics.snapshot(true);
        let history = snap["conversation_history"]
            .as_array()
            .expect("must be array");
        assert_eq!(history.len(), 10, "capped at 10 entries");
        // Only the 10 most recent rounds (5..=14) survive.
        assert_eq!(history[0]["prompt"], "prompt-5");
        assert_eq!(history[9]["prompt"], "prompt-14");
    }

    #[test]
    fn snapshot_includes_recent_logs() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        observer.on_log(&crate::LogEntry::new("info", "engine", "test-log"));

        let snap = metrics.snapshot(false);
        let arr = snap
            .get("recent_logs")
            .expect("recent_logs must be in snapshot")
            .as_array()
            .expect("recent_logs must be array");
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["message"], "test-log");
    }

    #[test]
    fn snapshot_tokens_aggregate() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "x".repeat(100), None, 50)]);
        observer.on_response_fed(&QueryId::single(), &"y".repeat(50), None);
        observer.on_resumed();

        let snap = metrics.snapshot(false);
        let tokens = snap.get("tokens").expect("tokens must be in snapshot");
        let read = |key: &str| {
            tokens[key]
                .as_u64()
                .unwrap_or_else(|| panic!("{key} must be u64"))
        };
        let prompt_total = read("prompt_total");
        let response_total = read("response_total");
        let total = read("total");

        assert!(prompt_total > 0, "prompt_total must be positive");
        assert!(response_total > 0, "response_total must be positive");
        assert_eq!(total, prompt_total + response_total);
    }

    #[test]
    fn usage_aggregate_none_when_no_llm_calls() {
        assert!(
            ExecutionMetrics::new().usage_aggregate().is_none(),
            "fresh metrics with no LLM calls must return None"
        );
    }

    #[test]
    fn usage_aggregate_some_when_llm_calls_recorded() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::batch(0), "test", None, 100)]);
        let provided = crate::TokenUsage {
            prompt_tokens: Some(10),
            completion_tokens: Some(5),
        };
        observer.on_response_fed(&QueryId::batch(0), "ans", Some(&provided));

        let usage = metrics
            .usage_aggregate()
            .expect("usage_aggregate must return Some after LLM call");
        assert_eq!(
            usage.prompt_tokens,
            Some(10),
            "prompt_tokens must match provided value"
        );
        assert_eq!(
            usage.completion_tokens,
            Some(5),
            "completion_tokens must match provided value"
        );
    }

    #[test]
    fn usage_aggregate_some_with_estimated_path() {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();

        observer.on_paused(&[query!(QueryId::single(), "hello world", None, 50)]);
        observer.on_response_fed(&QueryId::single(), "response text", None);

        let usage = metrics
            .usage_aggregate()
            .expect("usage_aggregate must return Some even when token counts are estimated");
        assert!(
            usage.prompt_tokens.is_some(),
            "prompt_tokens must be Some (estimated)"
        );
        assert!(
            usage.completion_tokens.is_some(),
            "completion_tokens must be Some (estimated)"
        );
    }
}