Skip to main content

algocline_core/
metrics.rs

1use std::sync::{Arc, Mutex};
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use crate::budget::Budget;
5use crate::observer::ExecutionObserver;
6use crate::progress::ProgressInfo;
7use crate::recent_log::{LogEntry, LogSink};
8use crate::tokens::{estimate_tokens, TokenCount, TokenSource};
9use crate::{BudgetHandle, CustomMetrics, CustomMetricsHandle, LlmQuery, ProgressHandle, QueryId};
10
11// ─── Transcript ─────────────────────────────────────────────
12
13/// A single prompt/response exchange in the transcript.
14///
15/// Each entry is the authoritative token record for one LLM call.
16/// Token counts start as character-based estimates (`on_paused`) and
17/// are upgraded to host-provided values when available (`on_response_fed`).
18/// Session-level totals are computed by summing across all entries.
19struct TranscriptEntry {
20    query_id: String,
21    prompt: String,
22    system: Option<String>,
23    response: Option<String>,
24    /// Prompt token count for this query (Estimated or Provided).
25    prompt_tokens: u64,
26    prompt_source: TokenSource,
27    /// Response token count for this query (Estimated or Provided).
28    /// Zero until `on_response_fed` is called.
29    response_tokens: u64,
30    response_source: TokenSource,
31    /// Unix millisecond timestamp when the LLM request was issued (on_paused).
32    started_at_ms: i64,
33    /// Unix millisecond timestamp when the LLM response was received (on_response_fed).
34    /// None until the response arrives.
35    completed_at_ms: Option<i64>,
36}
37
38impl TranscriptEntry {
39    fn to_json(&self) -> serde_json::Value {
40        serde_json::json!({
41            "query_id": self.query_id,
42            "prompt": self.prompt,
43            "system": self.system,
44            "response": self.response,
45        })
46    }
47
48    /// Project this entry into the `conversation_history` JSON shape for
49    /// `alc_status` output (opt-in via `include_history=true`).
50    ///
51    /// # Returns
52    ///
53    /// A `serde_json::Value` with fields:
54    /// `query_id`, `prompt`, `response`, `prompt_tokens`, `response_tokens`,
55    /// `started_at` (unix ms), `completed_at` (unix ms or null).
56    fn to_history_json(&self) -> serde_json::Value {
57        serde_json::json!({
58            "query_id": self.query_id,
59            "prompt": self.prompt,
60            "response": self.response,
61            "prompt_tokens": self.prompt_tokens,
62            "response_tokens": self.response_tokens,
63            "started_at": self.started_at_ms,
64            "completed_at": self.completed_at_ms,
65        })
66    }
67}
68
69/// Metrics automatically derived from the execution lifecycle.
70///
71/// # Locking design
72///
73/// `SessionStatus` is wrapped in `Arc<std::sync::Mutex>` and shared across:
74///
75/// | Consumer | Thread | Access | Via |
76/// |---|---|---|---|
77/// | `MetricsObserver` | tokio async task | write (on_paused, on_response_fed, etc.) | `Arc<Mutex<SessionStatus>>` |
78/// | `BudgetHandle` | Lua OS thread | read (check, remaining) | `Arc<Mutex<SessionStatus>>` |
79/// | `ProgressHandle` | Lua OS thread | write (set) | `Arc<Mutex<SessionStatus>>` |
80/// | `ExecutionMetrics` | tokio async task | read (to_json, snapshot, transcript_to_json) | `Arc<Mutex<SessionStatus>>` |
81///
82/// ## Why `std::sync::Mutex` (not `tokio::sync::Mutex`)
83///
84/// All lock holders complete within microseconds (field reads, arithmetic,
85/// small JSON construction) and **never hold the lock across `.await` points**.
86/// Per tokio guidance, `std::sync::Mutex` is preferred when the critical
87/// section is short and synchronous.
88///
89/// ## Lock ordering
90///
91/// When nested with `SessionRegistry`'s `tokio::sync::Mutex` (lock **C**),
92/// the invariant is always **C → A** (registry lock acquired first).
93/// No code path acquires A then C, so deadlock is structurally impossible.
94///
95/// ## Contention analysis
96///
97/// Each session creates its own `ExecutionMetrics` instance (see
98/// `Executor::start_session`), so the `SessionStatus` mutex is **not shared
99/// across sessions**. Within a single session, the Lua thread and the
100/// tokio async task alternate via mpsc channel handoff:
101///
102/// 1. Lua calls `alc.llm()` → `BudgetHandle::check()` locks A (Lua thread)
103/// 2. Lock released, then `tx.send(LlmRequest)` (mpsc)
104/// 3. `Session::wait_event()` receives request → `on_paused()` locks A (async task)
105///
106/// Steps 1 and 3 are sequenced by the mpsc channel, so they never contend.
107/// The only true contention is `snapshot()` (from `alc_status`) vs. observer
108/// methods, which is harmless given microsecond hold times.
109///
110/// ## Poison policy
111///
112/// Poison can only occur if a thread panics while holding this lock.
113/// The only panic-capable code under the lock is `Vec::push` and
114/// `serde_json::json!` (both panic only on OOM). On OOM the process
115/// is unrecoverable, so poison handling is academic.
116///
117/// Policy: `BudgetHandle::check()` propagates poison as `Err` (because
118/// it gates Lua control flow). All other consumers silently skip on
119/// poison (observation/recording — degraded but non-fatal).
120/// If you encounter a poison error in production, it indicates either
121/// OOM or a bug in code executed under the lock.
122pub(crate) struct SessionStatus {
123    started_at: Instant,
124    ended_at: Option<Instant>,
125    pub(crate) llm_calls: u64,
126    pauses: u64,
127    rounds: u64,
128    total_prompt_chars: u64,
129    total_response_chars: u64,
130    transcript: Vec<TranscriptEntry>,
131    pub(crate) budget: Option<Budget>,
132    pub(crate) progress: Option<ProgressInfo>,
133}
134
135impl SessionStatus {
136    fn new() -> Self {
137        Self {
138            started_at: Instant::now(),
139            ended_at: None,
140            llm_calls: 0,
141            pauses: 0,
142            rounds: 0,
143            total_prompt_chars: 0,
144            total_response_chars: 0,
145            transcript: Vec::new(),
146            budget: None,
147            progress: None,
148        }
149    }
150
151    /// Aggregate prompt tokens from all transcript entries.
152    fn prompt_token_count(&self) -> TokenCount {
153        let mut tc = TokenCount::new(TokenSource::Definite);
154        for e in &self.transcript {
155            tc.accumulate(e.prompt_tokens, e.prompt_source);
156        }
157        tc
158    }
159
160    /// Aggregate response tokens from all transcript entries.
161    fn response_token_count(&self) -> TokenCount {
162        let mut tc = TokenCount::new(TokenSource::Definite);
163        for e in &self.transcript {
164            tc.accumulate(e.response_tokens, e.response_source);
165        }
166        tc
167    }
168
169    /// Total tokens (prompt + response) across all transcript entries.
170    fn total_tokens(&self) -> u64 {
171        self.transcript
172            .iter()
173            .map(|e| e.prompt_tokens + e.response_tokens)
174            .sum()
175    }
176
177    /// Wall-clock elapsed milliseconds since session start.
178    fn elapsed_ms(&self) -> u64 {
179        self.ended_at
180            .map(|end| end.duration_since(self.started_at).as_millis() as u64)
181            .unwrap_or_else(|| self.started_at.elapsed().as_millis() as u64)
182    }
183
184    fn to_json(&self) -> serde_json::Value {
185        let prompt_tc = self.prompt_token_count();
186        let response_tc = self.response_token_count();
187        let total_tc = TokenCount {
188            tokens: prompt_tc.tokens + response_tc.tokens,
189            source: prompt_tc.source.weaker(response_tc.source),
190        };
191        let mut json = serde_json::json!({
192            "elapsed_ms": self.elapsed_ms(),
193            "llm_calls": self.llm_calls,
194            "pauses": self.pauses,
195            "rounds": self.rounds,
196            "total_prompt_chars": self.total_prompt_chars,
197            "total_response_chars": self.total_response_chars,
198            "prompt_tokens": prompt_tc.to_json(),
199            "response_tokens": response_tc.to_json(),
200            "total_tokens": total_tc.to_json(),
201        });
202        if let Some(ref b) = self.budget {
203            json["budget"] = b.to_json();
204        }
205        json
206    }
207
208    pub(crate) fn check_budget(&self) -> Result<(), String> {
209        match self.budget {
210            Some(ref b) => b.check(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
211            None => Ok(()),
212        }
213    }
214
215    /// Lightweight snapshot for external observation (alc_status).
216    ///
217    /// Returns running metrics with additive v2 fields:
218    /// - `tokens` — cumulative prompt/response/total counts plus `current_query`
219    ///   for the in-flight request (if any).  Always included.
220    /// - `recent_logs` — capped ring buffer (≤20) of recent log entries.
221    ///   Always included.
222    /// - `conversation_history` — last ≤10 transcript entries.
223    ///   Included **only** when `include_history=true` to protect high-frequency
224    ///   polling callers from wire-size inflation (see design: wf-sim
225    ///   restructure_shape verdict and metrics.rs doc "without transcript which
226    ///   can be large").
227    ///
228    /// # Arguments
229    ///
230    /// - `include_history` — When `true`, `conversation_history` (≤10 entries)
231    ///   is appended to the output JSON.  When `false`, the key is absent.
232    /// - `log_sink` — The session's [`LogSink`] from which `recent_logs` is
233    ///   populated (held at `ExecutionMetrics` level to allow lock-free cloning).
234    fn snapshot(&self, include_history: bool, log_sink: &LogSink) -> serde_json::Value {
235        // Build token aggregates.
236        let prompt_tc = self.prompt_token_count();
237        let response_tc = self.response_token_count();
238        let total_tokens = prompt_tc.tokens + response_tc.tokens;
239
240        // Determine in-flight query (last transcript entry without a response,
241        // only meaningful while paused).
242        let current_query = self.transcript.last().and_then(|e| {
243            if e.response.is_none() {
244                Some(serde_json::json!({
245                    "query_id": e.query_id,
246                    "prompt_tokens": e.prompt_tokens,
247                    "started_waiting_at": e.started_at_ms,
248                }))
249            } else {
250                None
251            }
252        });
253
254        let mut json = serde_json::json!({
255            "elapsed_ms": self.elapsed_ms(),
256            "llm_calls": self.llm_calls,
257            "rounds": self.rounds,
258            "tokens": {
259                "prompt_total": prompt_tc.tokens,
260                "response_total": response_tc.tokens,
261                "total": total_tokens,
262                "current_query": current_query,
263            },
264            "recent_logs": log_sink.to_json(),
265        });
266
267        if let Some(ref p) = self.progress {
268            json["progress"] = serde_json::json!({
269                "step": p.step,
270                "total": p.total,
271                "message": p.message,
272            });
273        }
274
275        if let Some(ref b) = self.budget {
276            json["budget_remaining"] =
277                b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens());
278        }
279
280        if include_history {
281            // Emit the last ≤10 transcript entries in chronological order.
282            let start = self.transcript.len().saturating_sub(10);
283            let history: Vec<serde_json::Value> = self.transcript[start..]
284                .iter()
285                .map(|e| e.to_history_json())
286                .collect();
287            json["conversation_history"] = serde_json::Value::Array(history);
288        }
289
290        json
291    }
292
293    pub(crate) fn budget_remaining(&self) -> serde_json::Value {
294        match self.budget {
295            None => serde_json::Value::Null,
296            Some(ref b) => b.remaining_json(self.llm_calls, self.elapsed_ms(), self.total_tokens()),
297        }
298    }
299}
300
301/// Measurement data for a single execution.
302///
303/// Created per-session in `Executor::start_session()`. The `auto` and
304/// `custom` mutexes are **not shared across sessions** — each session
305/// gets independent instances. Handles (`BudgetHandle`, `ProgressHandle`,
306/// `MetricsObserver`) are cloned from the same `Arc` and handed to the
307/// Lua bridge and observer respectively.
308///
309/// The `log_sink` is a separate `Arc`-backed ring buffer for per-session log
310/// capture. It is shared with the Lua bridge (via `log_sink_handle()`) so that
311/// `print()` and `alc.log()` output is routed directly without acquiring the
312/// `SessionStatus` mutex.
313pub struct ExecutionMetrics {
314    auto: Arc<Mutex<SessionStatus>>,
315    custom: Arc<Mutex<CustomMetrics>>,
316    log_sink: LogSink,
317}
318
319impl ExecutionMetrics {
320    pub fn new() -> Self {
321        Self {
322            auto: Arc::new(Mutex::new(SessionStatus::new())),
323            custom: Arc::new(Mutex::new(CustomMetrics::new())),
324            log_sink: LogSink::new(),
325        }
326    }
327
328    /// JSON snapshot combining auto and custom metrics.
329    pub fn to_json(&self) -> serde_json::Value {
330        let auto_json = self
331            .auto
332            .lock()
333            .map(|m| m.to_json())
334            .unwrap_or(serde_json::Value::Null);
335
336        let custom_json = self
337            .custom
338            .lock()
339            .map(|m| m.to_json())
340            .unwrap_or(serde_json::Value::Null);
341
342        serde_json::json!({
343            "auto": auto_json,
344            "custom": custom_json,
345        })
346    }
347
348    /// Transcript entries as JSON array.
349    pub fn transcript_to_json(&self) -> Vec<serde_json::Value> {
350        self.auto
351            .lock()
352            .map(|m| m.transcript.iter().map(|e| e.to_json()).collect())
353            .unwrap_or_default()
354    }
355
356    /// Handle for custom metrics, passed to the Lua bridge.
357    pub fn custom_metrics_handle(&self) -> CustomMetricsHandle {
358        CustomMetricsHandle::new(Arc::clone(&self.custom))
359    }
360
361    /// Set session budget limits.
362    pub fn set_budget(&self, budget: Budget) {
363        if let Ok(mut m) = self.auto.lock() {
364            m.budget = Some(budget);
365        }
366    }
367
368    /// Create a budget handle for the Lua bridge to check limits.
369    pub fn budget_handle(&self) -> BudgetHandle {
370        BudgetHandle::new(Arc::clone(&self.auto))
371    }
372
373    /// Create a progress handle for the Lua bridge to report progress.
374    pub fn progress_handle(&self) -> ProgressHandle {
375        ProgressHandle::new(Arc::clone(&self.auto))
376    }
377
378    /// Lightweight snapshot for external observation (alc_status).
379    ///
380    /// Returns metrics without transcript by default; pass `include_history=true`
381    /// to additionally include the last ≤10 conversation exchanges.
382    ///
383    /// # Arguments
384    ///
385    /// - `include_history` — When `true`, `conversation_history` (≤10 entries)
386    ///   is included in the JSON output.  When `false` (default), the key is absent.
387    ///
388    /// # Returns
389    ///
390    /// A `serde_json::Value` snapshot, or `Value::Null` if the internal mutex
391    /// is poisoned (only possible on OOM-induced panic — degraded but non-fatal).
392    pub fn snapshot(&self, include_history: bool) -> serde_json::Value {
393        self.auto
394            .lock()
395            .map(|m| m.snapshot(include_history, &self.log_sink))
396            .unwrap_or(serde_json::Value::Null)
397    }
398
399    pub fn create_observer(&self) -> MetricsObserver {
400        MetricsObserver::new(Arc::clone(&self.auto), self.log_sink.clone())
401    }
402
403    /// Return a cloned handle to the session's log-capture ring buffer.
404    ///
405    /// The returned [`LogSink`] shares the same underlying `Arc<Mutex<VecDeque>>`
406    /// as the observer.  Pass this to the Lua bridge so that `print()` /
407    /// `alc.log()` output is routed into the per-session ring buffer.
408    ///
409    /// # Returns
410    ///
411    /// A cloned [`LogSink`] that shares state with the observer's sink.
412    pub fn log_sink_handle(&self) -> LogSink {
413        self.log_sink.clone()
414    }
415
416    /// Create a stats handle for the Lua bridge to read auto-counted
417    /// session metrics (e.g. `alc.stats.llm_calls()`).
418    pub fn stats_handle(&self) -> StatsHandle {
419        StatsHandle::new(Arc::clone(&self.auto))
420    }
421
422    /// Aggregate token usage for this execution.
423    ///
424    /// Returns `None` when no LLM calls have been observed (`llm_calls == 0`),
425    /// preserving the wire-shape invariant: `None` means "aggregation path did
426    /// not run" and `Some(TokenUsage{…})` means "at least one LLM call occurred"
427    /// (even when the host did not supply exact counts).
428    ///
429    /// Returns `None` on mutex poison (OOM-class failure; silent skip per existing
430    /// observation/recording policy — process unrecoverable in that case).
431    pub fn usage_aggregate(&self) -> Option<crate::TokenUsage> {
432        let m = self.auto.lock().ok()?;
433        if m.llm_calls == 0 {
434            return None;
435        }
436        Some(crate::TokenUsage {
437            prompt_tokens: Some(m.prompt_token_count().tokens),
438            completion_tokens: Some(m.response_token_count().tokens),
439        })
440    }
441}
442
443/// Read-only handle exposing auto-counted [`SessionStatus`] metrics to
444/// the Lua bridge (e.g. `alc.stats.llm_calls()`).
445///
446/// Cloned per session and per fork-child VM. Each holds an
447/// `Arc<Mutex<SessionStatus>>` shared with the observer that writes to
448/// `llm_calls` on every paused-cycle complete.
449///
450/// # Poison policy
451///
452/// Read methods return `0` (or sensible defaults) on mutex poison —
453/// they are observational and non-fatal, mirroring `BudgetHandle::remaining`.
454/// Reads do **not** mutate `SessionStatus`.
455#[derive(Clone)]
456pub struct StatsHandle {
457    auto: Arc<Mutex<SessionStatus>>,
458}
459
460impl StatsHandle {
461    pub(crate) fn new(auto: Arc<Mutex<SessionStatus>>) -> Self {
462        Self { auto }
463    }
464
465    /// Total LLM calls observed in the current session so far.
466    ///
467    /// Returns `0` on mutex poison (observational; matches the
468    /// `Null` fallback used by `BudgetHandle::remaining`). Within a
469    /// single session the Lua thread is the only writer path, so
470    /// poison only occurs when an observer callback panicked under
471    /// the lock — an unrecoverable state where `0` is acceptable.
472    pub fn llm_calls(&self) -> u64 {
473        self.auto.lock().map(|m| m.llm_calls).unwrap_or(0)
474    }
475}
476
477impl Default for ExecutionMetrics {
478    fn default() -> Self {
479        Self::new()
480    }
481}
482
483impl serde::Serialize for ExecutionMetrics {
484    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
485        self.to_json().serialize(serializer)
486    }
487}
488
489/// Updates SessionStatus via the ExecutionObserver trait.
490pub struct MetricsObserver {
491    auto: Arc<Mutex<SessionStatus>>,
492    log_sink: LogSink,
493}
494
495impl MetricsObserver {
496    pub(crate) fn new(auto: Arc<Mutex<SessionStatus>>, log_sink: LogSink) -> Self {
497        Self { auto, log_sink }
498    }
499}
500
501impl ExecutionObserver for MetricsObserver {
502    fn on_paused(&self, queries: &[LlmQuery]) {
503        // Note: duration_since fails only if wall clock is before UNIX_EPOCH
504        // (broken system clock). Saturating to zero is harmless for timestamps.
505        let now_ms = SystemTime::now()
506            .duration_since(UNIX_EPOCH)
507            .unwrap_or_default()
508            .as_millis() as i64;
509        if let Ok(mut m) = self.auto.lock() {
510            m.pauses += 1;
511            m.llm_calls += queries.len() as u64;
512            for q in queries {
513                m.total_prompt_chars += q.prompt.len() as u64;
514                let mut est = estimate_tokens(&q.prompt);
515                if let Some(ref sys) = q.system {
516                    m.total_prompt_chars += sys.len() as u64;
517                    est += estimate_tokens(sys);
518                }
519                m.transcript.push(TranscriptEntry {
520                    query_id: q.id.as_str().to_string(),
521                    prompt: q.prompt.clone(),
522                    system: q.system.clone(),
523                    response: None,
524                    prompt_tokens: est,
525                    prompt_source: TokenSource::Estimated,
526                    response_tokens: 0,
527                    response_source: TokenSource::Estimated,
528                    started_at_ms: now_ms,
529                    completed_at_ms: None,
530                });
531            }
532        }
533    }
534
535    fn on_response_fed(
536        &self,
537        query_id: &QueryId,
538        response: &str,
539        usage: Option<&crate::TokenUsage>,
540    ) {
541        // Note: duration_since fails only if wall clock is before UNIX_EPOCH
542        // (broken system clock). Saturating to zero is harmless for timestamps.
543        let now_ms = SystemTime::now()
544            .duration_since(UNIX_EPOCH)
545            .unwrap_or_default()
546            .as_millis() as i64;
547        if let Ok(mut m) = self.auto.lock() {
548            m.total_response_chars += response.len() as u64;
549
550            if let Some(entry) = m
551                .transcript
552                .iter_mut()
553                .rev()
554                .find(|e| e.query_id == query_id.as_str())
555            {
556                entry.response = Some(response.to_string());
557                entry.completed_at_ms = Some(now_ms);
558
559                // Prompt tokens: upgrade to Provided if host reported them.
560                if let Some(pt) = usage.and_then(|u| u.prompt_tokens) {
561                    entry.prompt_tokens = pt;
562                    entry.prompt_source = TokenSource::Provided;
563                }
564
565                // Response tokens: Provided if available, else Estimated.
566                match usage.and_then(|u| u.completion_tokens) {
567                    Some(ct) => {
568                        entry.response_tokens = ct;
569                        entry.response_source = TokenSource::Provided;
570                    }
571                    None => {
572                        entry.response_tokens = estimate_tokens(response);
573                        entry.response_source = TokenSource::Estimated;
574                    }
575                }
576            }
577        }
578    }
579
580    fn on_log(&self, entry: &LogEntry) {
581        self.log_sink.push(entry.clone());
582    }
583
584    fn on_resumed(&self) {
585        if let Ok(mut m) = self.auto.lock() {
586            m.rounds += 1;
587        }
588    }
589
590    fn on_completed(&self, _result: &serde_json::Value) {
591        if let Ok(mut m) = self.auto.lock() {
592            m.ended_at = Some(Instant::now());
593        }
594    }
595
596    fn on_failed(&self, _error: &str) {
597        if let Ok(mut m) = self.auto.lock() {
598            m.ended_at = Some(Instant::now());
599        }
600    }
601
602    fn on_cancelled(&self) {
603        if let Ok(mut m) = self.auto.lock() {
604            m.ended_at = Some(Instant::now());
605        }
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use crate::{LlmQuery, QueryId};
613
614    #[test]
615    fn metrics_to_json_has_auto_and_custom() {
616        let metrics = ExecutionMetrics::new();
617        let json = metrics.to_json();
618        assert!(json.get("auto").is_some());
619        assert!(json.get("custom").is_some());
620    }
621
622    #[test]
623    fn custom_handle_shares_state() {
624        let metrics = ExecutionMetrics::new();
625        let handle = metrics.custom_metrics_handle();
626
627        handle.record("key".into(), serde_json::json!("value"));
628
629        let json = metrics.to_json();
630        let custom = json.get("custom").unwrap();
631        assert_eq!(custom.get("key").unwrap(), "value");
632    }
633
634    #[test]
635    fn observer_updates_auto_metrics() {
636        let metrics = ExecutionMetrics::new();
637        let observer = metrics.create_observer();
638
639        let queries = vec![LlmQuery {
640            id: QueryId::batch(0),
641            prompt: "test".into(),
642            system: None,
643            max_tokens: 100,
644            grounded: false,
645            underspecified: false,
646        }];
647
648        observer.on_paused(&queries);
649        observer.on_completed(&serde_json::json!(null));
650
651        let json = metrics.to_json();
652        let auto = json.get("auto").unwrap();
653        assert_eq!(auto.get("llm_calls").unwrap(), 1);
654        assert_eq!(auto.get("pauses").unwrap(), 1);
655        assert_eq!(auto.get("rounds").unwrap(), 0);
656        assert_eq!(auto.get("total_prompt_chars").unwrap(), 4); // "test" = 4 chars
657        assert_eq!(auto.get("total_response_chars").unwrap(), 0);
658    }
659
660    #[test]
661    fn observer_tracks_prompt_and_response_chars() {
662        let metrics = ExecutionMetrics::new();
663        let observer = metrics.create_observer();
664
665        let queries = vec![
666            LlmQuery {
667                id: QueryId::batch(0),
668                prompt: "hello".into(),     // 5 chars
669                system: Some("sys".into()), // 3 chars
670                max_tokens: 100,
671                grounded: false,
672                underspecified: false,
673            },
674            LlmQuery {
675                id: QueryId::batch(1),
676                prompt: "world".into(), // 5 chars
677                system: None,
678                max_tokens: 100,
679                grounded: false,
680                underspecified: false,
681            },
682        ];
683
684        observer.on_paused(&queries);
685        observer.on_response_fed(&QueryId::batch(0), &"x".repeat(42), None);
686        observer.on_response_fed(&QueryId::batch(1), &"y".repeat(58), None);
687        observer.on_resumed();
688        observer.on_completed(&serde_json::json!(null));
689
690        let json = metrics.to_json();
691        let auto = json.get("auto").unwrap();
692        assert_eq!(auto.get("total_prompt_chars").unwrap(), 13); // 5+3+5
693        assert_eq!(auto.get("total_response_chars").unwrap(), 100); // 42+58
694        assert_eq!(auto.get("rounds").unwrap(), 1);
695    }
696
697    #[test]
698    fn observer_tracks_multiple_rounds() {
699        let metrics = ExecutionMetrics::new();
700        let observer = metrics.create_observer();
701
702        let q = vec![LlmQuery {
703            id: QueryId::single(),
704            prompt: "p".into(),
705            system: None,
706            max_tokens: 10,
707            grounded: false,
708            underspecified: false,
709        }];
710
711        // Round 1
712        observer.on_paused(&q);
713        observer.on_response_fed(&QueryId::single(), &"x".repeat(10), None);
714        observer.on_resumed();
715        // Round 2
716        observer.on_paused(&q);
717        observer.on_response_fed(&QueryId::single(), &"y".repeat(20), None);
718        observer.on_resumed();
719        // Round 3
720        observer.on_paused(&q);
721        observer.on_response_fed(&QueryId::single(), &"z".repeat(30), None);
722        observer.on_resumed();
723
724        observer.on_completed(&serde_json::json!(null));
725
726        let json = metrics.to_json();
727        let auto = json.get("auto").unwrap();
728        assert_eq!(auto.get("rounds").unwrap(), 3);
729        assert_eq!(auto.get("pauses").unwrap(), 3);
730        assert_eq!(auto.get("llm_calls").unwrap(), 3);
731        assert_eq!(auto.get("total_prompt_chars").unwrap(), 3); // "p" x 3
732        assert_eq!(auto.get("total_response_chars").unwrap(), 60); // 10+20+30
733    }
734
735    #[test]
736    fn transcript_records_prompt_response_pairs() {
737        let metrics = ExecutionMetrics::new();
738        let observer = metrics.create_observer();
739
740        let queries = vec![LlmQuery {
741            id: QueryId::single(),
742            prompt: "What is 2+2?".into(),
743            system: Some("You are a calculator.".into()),
744            max_tokens: 50,
745            grounded: false,
746            underspecified: false,
747        }];
748
749        observer.on_paused(&queries);
750        observer.on_response_fed(&QueryId::single(), "4", None);
751        observer.on_resumed();
752        observer.on_completed(&serde_json::json!(null));
753
754        let transcript = metrics.transcript_to_json();
755        assert_eq!(transcript.len(), 1);
756        assert_eq!(transcript[0]["query_id"], "q-0");
757        assert_eq!(transcript[0]["prompt"], "What is 2+2?");
758        assert_eq!(transcript[0]["system"], "You are a calculator.");
759        assert_eq!(transcript[0]["response"], "4");
760    }
761
762    #[test]
763    fn transcript_not_in_stats() {
764        let metrics = ExecutionMetrics::new();
765        let observer = metrics.create_observer();
766        observer.on_paused(&[LlmQuery {
767            id: QueryId::single(),
768            prompt: "p".into(),
769            system: None,
770            max_tokens: 10,
771            grounded: false,
772            underspecified: false,
773        }]);
774        observer.on_response_fed(&QueryId::single(), "r", None);
775        observer.on_resumed();
776        observer.on_completed(&serde_json::json!(null));
777
778        let json = metrics.to_json();
779        assert!(json["auto"].get("transcript").is_none());
780    }
781
782    #[test]
783    fn transcript_multi_round() {
784        let metrics = ExecutionMetrics::new();
785        let observer = metrics.create_observer();
786
787        // Round 1
788        observer.on_paused(&[LlmQuery {
789            id: QueryId::single(),
790            prompt: "step1".into(),
791            system: None,
792            max_tokens: 100,
793            grounded: false,
794            underspecified: false,
795        }]);
796        observer.on_response_fed(&QueryId::single(), "answer1", None);
797        observer.on_resumed();
798
799        // Round 2
800        observer.on_paused(&[LlmQuery {
801            id: QueryId::single(),
802            prompt: "step2".into(),
803            system: Some("expert".into()),
804            max_tokens: 100,
805            grounded: false,
806            underspecified: false,
807        }]);
808        observer.on_response_fed(&QueryId::single(), "answer2", None);
809        observer.on_resumed();
810
811        observer.on_completed(&serde_json::json!(null));
812
813        let transcript = metrics.transcript_to_json();
814        assert_eq!(transcript.len(), 2);
815
816        assert_eq!(transcript[0]["prompt"], "step1");
817        assert!(transcript[0]["system"].is_null());
818        assert_eq!(transcript[0]["response"], "answer1");
819
820        assert_eq!(transcript[1]["prompt"], "step2");
821        assert_eq!(transcript[1]["system"], "expert");
822        assert_eq!(transcript[1]["response"], "answer2");
823    }
824
825    #[test]
826    fn transcript_batch_queries() {
827        let metrics = ExecutionMetrics::new();
828        let observer = metrics.create_observer();
829
830        let queries = vec![
831            LlmQuery {
832                id: QueryId::batch(0),
833                prompt: "q0".into(),
834                system: None,
835                max_tokens: 50,
836                grounded: false,
837                underspecified: false,
838            },
839            LlmQuery {
840                id: QueryId::batch(1),
841                prompt: "q1".into(),
842                system: None,
843                max_tokens: 50,
844                grounded: false,
845                underspecified: false,
846            },
847        ];
848
849        observer.on_paused(&queries);
850        observer.on_response_fed(&QueryId::batch(0), "r0", None);
851        observer.on_response_fed(&QueryId::batch(1), "r1", None);
852        observer.on_resumed();
853        observer.on_completed(&serde_json::json!(null));
854
855        let transcript = metrics.transcript_to_json();
856        assert_eq!(transcript.len(), 2);
857        assert_eq!(transcript[0]["query_id"], "q-0");
858        assert_eq!(transcript[0]["response"], "r0");
859        assert_eq!(transcript[1]["query_id"], "q-1");
860        assert_eq!(transcript[1]["response"], "r1");
861    }
862
863    // ── v2 tests ────────────────────────────────────────────────
864
865    // T1: on_log routes entries into the LogSink shared with metrics
866    #[test]
867    fn on_log_routes_to_log_sink() {
868        let metrics = ExecutionMetrics::new();
869        let observer = metrics.create_observer();
870
871        observer.on_log(&crate::LogEntry::new("info", "engine", "hello"));
872        observer.on_log(&crate::LogEntry::new("warn", "alc.log", "world"));
873
874        let sink = metrics.log_sink_handle();
875        let entries = sink.entries();
876        assert_eq!(entries.len(), 2);
877        assert_eq!(entries[0].level, "info");
878        assert_eq!(entries[0].source, "engine");
879        assert_eq!(entries[0].message, "hello");
880        assert_eq!(entries[1].level, "warn");
881        assert_eq!(entries[1].message, "world");
882    }
883
884    // T2: boundary — on_log cap=20 enforcement via observer
885    #[test]
886    fn on_log_cap_enforcement_via_observer() {
887        let metrics = ExecutionMetrics::new();
888        let observer = metrics.create_observer();
889
890        for i in 0..=20u32 {
891            observer.on_log(&crate::LogEntry::new("info", "engine", format!("msg-{i}")));
892        }
893
894        let sink = metrics.log_sink_handle();
895        let entries = sink.entries();
896        assert_eq!(entries.len(), crate::recent_log::LOG_SINK_CAP);
897        assert_eq!(entries[0].message, "msg-1");
898        assert_eq!(
899            entries[crate::recent_log::LOG_SINK_CAP - 1].message,
900            "msg-20"
901        );
902    }
903
904    // T1: on_paused records started_at_ms; on_response_fed sets completed_at_ms
905    // Verified via snapshot(true) which projects TranscriptEntry timestamps into JSON.
906    #[test]
907    fn transcript_timestamps_recorded() {
908        let metrics = ExecutionMetrics::new();
909        let observer = metrics.create_observer();
910
911        let before = std::time::SystemTime::now()
912            .duration_since(std::time::UNIX_EPOCH)
913            .unwrap_or_default()
914            .as_millis() as i64;
915
916        observer.on_paused(&[LlmQuery {
917            id: QueryId::single(),
918            prompt: "ts-test".into(),
919            system: None,
920            max_tokens: 10,
921            grounded: false,
922            underspecified: false,
923        }]);
924
925        observer.on_response_fed(&QueryId::single(), "response", None);
926
927        let after_fed = std::time::SystemTime::now()
928            .duration_since(std::time::UNIX_EPOCH)
929            .unwrap_or_default()
930            .as_millis() as i64;
931
932        // Use snapshot(true) to expose timestamps from transcript projection.
933        let snap = metrics.snapshot(true);
934        let history = snap["conversation_history"]
935            .as_array()
936            .expect("conversation_history must be array");
937        assert_eq!(history.len(), 1);
938
939        let started_at = history[0]["started_at"]
940            .as_i64()
941            .expect("started_at must be i64");
942        let completed_at = history[0]["completed_at"]
943            .as_i64()
944            .expect("completed_at must be i64 (not null)");
945
946        assert!(
947            started_at >= before,
948            "started_at ({started_at}) should be >= before ({before})"
949        );
950        assert!(
951            completed_at >= started_at,
952            "completed_at ({completed_at}) should be >= started_at ({started_at})"
953        );
954        assert!(
955            completed_at <= after_fed,
956            "completed_at ({completed_at}) should be <= after_fed ({after_fed})"
957        );
958    }
959
960    // T1: paused state shows current_query in snapshot (include_history=false)
961    #[test]
962    fn snapshot_current_query_while_paused() {
963        let metrics = ExecutionMetrics::new();
964        let observer = metrics.create_observer();
965
966        observer.on_paused(&[LlmQuery {
967            id: QueryId::single(),
968            prompt: "in-flight".into(),
969            system: None,
970            max_tokens: 10,
971            grounded: false,
972            underspecified: false,
973        }]);
974
975        // Snapshot without completing the response — last entry has response=None
976        let snap = metrics.snapshot(false);
977
978        let tokens = snap.get("tokens").expect("tokens field must be present");
979        let current_query = tokens
980            .get("current_query")
981            .expect("current_query must be present");
982        assert!(
983            !current_query.is_null(),
984            "current_query should be non-null while paused"
985        );
986        assert_eq!(current_query["query_id"], "q-0");
987        // conversation_history must be absent with include_history=false
988        assert!(
989            snap.get("conversation_history").is_none(),
990            "conversation_history must be absent when include_history=false"
991        );
992    }
993
994    // T2: after response is fed, current_query becomes null
995    #[test]
996    fn snapshot_current_query_null_after_response() {
997        let metrics = ExecutionMetrics::new();
998        let observer = metrics.create_observer();
999
1000        observer.on_paused(&[LlmQuery {
1001            id: QueryId::single(),
1002            prompt: "done".into(),
1003            system: None,
1004            max_tokens: 10,
1005            grounded: false,
1006            underspecified: false,
1007        }]);
1008        observer.on_response_fed(&QueryId::single(), "answer", None);
1009
1010        let snap = metrics.snapshot(false);
1011        let tokens = snap.get("tokens").expect("tokens must be present");
1012        let current_query = &tokens["current_query"];
1013        assert!(
1014            current_query.is_null(),
1015            "current_query should be null after response is fed"
1016        );
1017    }
1018
1019    // T1/T3: conversation_history only when include_history=true
1020    #[test]
1021    fn snapshot_conversation_history_opt_in() {
1022        let metrics = ExecutionMetrics::new();
1023        let observer = metrics.create_observer();
1024
1025        observer.on_paused(&[LlmQuery {
1026            id: QueryId::single(),
1027            prompt: "hello".into(),
1028            system: None,
1029            max_tokens: 50,
1030            grounded: false,
1031            underspecified: false,
1032        }]);
1033        observer.on_response_fed(&QueryId::single(), "world", None);
1034        observer.on_resumed();
1035        observer.on_completed(&serde_json::json!(null));
1036
1037        // false: conversation_history key must be absent
1038        let snap_false = metrics.snapshot(false);
1039        assert!(
1040            snap_false.get("conversation_history").is_none(),
1041            "conversation_history must be absent with include_history=false"
1042        );
1043
1044        // true: conversation_history key must be present
1045        let snap_true = metrics.snapshot(true);
1046        let history = snap_true
1047            .get("conversation_history")
1048            .expect("conversation_history must be present with include_history=true");
1049        let arr = history
1050            .as_array()
1051            .expect("conversation_history must be an array");
1052        assert_eq!(arr.len(), 1);
1053        assert_eq!(arr[0]["query_id"], "q-0");
1054        assert_eq!(arr[0]["prompt"], "hello");
1055        assert_eq!(arr[0]["response"], "world");
1056        // started_at and completed_at must be present
1057        assert!(arr[0].get("started_at").is_some());
1058        assert!(arr[0].get("completed_at").is_some());
1059    }
1060
1061    // T2: conversation_history capped at 10 entries
1062    #[test]
1063    fn snapshot_conversation_history_capped_at_10() {
1064        let metrics = ExecutionMetrics::new();
1065        let observer = metrics.create_observer();
1066
1067        for i in 0..15u32 {
1068            observer.on_paused(&[LlmQuery {
1069                id: QueryId::single(),
1070                prompt: format!("prompt-{i}"),
1071                system: None,
1072                max_tokens: 10,
1073                grounded: false,
1074                underspecified: false,
1075            }]);
1076            observer.on_response_fed(&QueryId::single(), &format!("resp-{i}"), None);
1077            observer.on_resumed();
1078        }
1079
1080        let snap = metrics.snapshot(true);
1081        let history = snap["conversation_history"]
1082            .as_array()
1083            .expect("must be array");
1084        assert_eq!(history.len(), 10, "capped at 10 entries");
1085        // Should be the last 10: prompt-5 through prompt-14
1086        assert_eq!(history[0]["prompt"], "prompt-5");
1087        assert_eq!(history[9]["prompt"], "prompt-14");
1088    }
1089
1090    // T1: recent_logs appears in snapshot output
1091    #[test]
1092    fn snapshot_includes_recent_logs() {
1093        let metrics = ExecutionMetrics::new();
1094        let observer = metrics.create_observer();
1095        observer.on_log(&crate::LogEntry::new("info", "engine", "test-log"));
1096
1097        let snap = metrics.snapshot(false);
1098        let logs = snap
1099            .get("recent_logs")
1100            .expect("recent_logs must be in snapshot");
1101        let arr = logs.as_array().expect("recent_logs must be array");
1102        assert_eq!(arr.len(), 1);
1103        assert_eq!(arr[0]["message"], "test-log");
1104    }
1105
1106    // T1: tokens aggregate is correct in snapshot
1107    #[test]
1108    fn snapshot_tokens_aggregate() {
1109        let metrics = ExecutionMetrics::new();
1110        let observer = metrics.create_observer();
1111
1112        observer.on_paused(&[LlmQuery {
1113            id: QueryId::single(),
1114            prompt: "x".repeat(100),
1115            system: None,
1116            max_tokens: 50,
1117            grounded: false,
1118            underspecified: false,
1119        }]);
1120        observer.on_response_fed(&QueryId::single(), &"y".repeat(50), None);
1121        observer.on_resumed();
1122
1123        let snap = metrics.snapshot(false);
1124        let tokens = snap.get("tokens").expect("tokens must be in snapshot");
1125        let prompt_total = tokens["prompt_total"]
1126            .as_u64()
1127            .expect("prompt_total must be u64");
1128        let response_total = tokens["response_total"]
1129            .as_u64()
1130            .expect("response_total must be u64");
1131        let total = tokens["total"].as_u64().expect("total must be u64");
1132        // Estimates: 100 chars / 4 ≈ 25, 50 chars / 4 ≈ 12 (estimate_tokens rounding)
1133        assert!(prompt_total > 0, "prompt_total must be positive");
1134        assert!(response_total > 0, "response_total must be positive");
1135        assert_eq!(total, prompt_total + response_total);
1136    }
1137
1138    // -----------------------------------------------------------------------
1139    // usage_aggregate tests
1140    // -----------------------------------------------------------------------
1141
1142    #[test]
1143    fn usage_aggregate_none_when_no_llm_calls() {
1144        let metrics = ExecutionMetrics::new();
1145        assert!(
1146            metrics.usage_aggregate().is_none(),
1147            "fresh metrics with no LLM calls must return None"
1148        );
1149    }
1150
1151    #[test]
1152    fn usage_aggregate_some_when_llm_calls_recorded() {
1153        use crate::TokenUsage;
1154
1155        let metrics = ExecutionMetrics::new();
1156        let observer = metrics.create_observer();
1157
1158        let queries = vec![LlmQuery {
1159            id: QueryId::batch(0),
1160            prompt: "test".into(),
1161            system: None,
1162            max_tokens: 100,
1163            grounded: false,
1164            underspecified: false,
1165        }];
1166        observer.on_paused(&queries);
1167        observer.on_response_fed(
1168            &QueryId::batch(0),
1169            "ans",
1170            Some(&TokenUsage {
1171                prompt_tokens: Some(10),
1172                completion_tokens: Some(5),
1173            }),
1174        );
1175
1176        let result = metrics.usage_aggregate();
1177        assert!(
1178            result.is_some(),
1179            "usage_aggregate must return Some after LLM call"
1180        );
1181        let usage = result.unwrap();
1182        assert_eq!(
1183            usage.prompt_tokens,
1184            Some(10),
1185            "prompt_tokens must match provided value"
1186        );
1187        assert_eq!(
1188            usage.completion_tokens,
1189            Some(5),
1190            "completion_tokens must match provided value"
1191        );
1192    }
1193
1194    #[test]
1195    fn usage_aggregate_some_with_estimated_path() {
1196        // on_paused + on_response_fed with no host-provided usage (estimated path).
1197        // Even with no explicit TokenUsage from host, usage_aggregate returns Some
1198        // because at least one LLM call occurred (llm_calls > 0).
1199        let metrics = ExecutionMetrics::new();
1200        let observer = metrics.create_observer();
1201
1202        let queries = vec![LlmQuery {
1203            id: QueryId::single(),
1204            prompt: "hello world".into(),
1205            system: None,
1206            max_tokens: 50,
1207            grounded: false,
1208            underspecified: false,
1209        }];
1210        observer.on_paused(&queries);
1211        // No host-supplied usage — token counts are estimated from character counts.
1212        observer.on_response_fed(&QueryId::single(), "response text", None);
1213
1214        let result = metrics.usage_aggregate();
1215        assert!(
1216            result.is_some(),
1217            "usage_aggregate must return Some even when token counts are estimated"
1218        );
1219        let usage = result.unwrap();
1220        assert!(
1221            usage.prompt_tokens.is_some(),
1222            "prompt_tokens must be Some (estimated)"
1223        );
1224        assert!(
1225            usage.completion_tokens.is_some(),
1226            "completion_tokens must be Some (estimated)"
1227        );
1228    }
1229}