Skip to main content

algocline_engine/execution/
registry.rs

1//! `SessionRegistryV2` — the engine-level session lifecycle manager for the v2 path.
2//!
3//! Coexists with the legacy `SessionRegistry` (`session.rs`) without modifying it.
4//! New callers (Subtask 3's `AppService::ExecutionService` impl) use this registry.
5//!
6//! # Design invariants
7//!
8//! - **Invariant 6**: `spawn_v2()` returns the `SessionId` immediately; execution
9//!   runs in the background via `tokio::spawn(driver_loop(...))`.
10//! - **Crux R1**: No `rmcp::*`, `progressToken`, `_meta`, `notifications/*`, or
11//!   `mcp_`-prefixed identifiers appear anywhere in this module.
12//! - **Crux R2**: Cancellation uses `CancellationToken::cancel()`; no
13//!   `JoinHandle::abort()` or process kill path exists.
14//! - **Crux R3**: `observe()` is a sync `fn` that calls `bus_tx.subscribe()` and
15//!   returns a valid handle with zero pre-registered observers.
16//! - **K-4**: The `sessions` `RwLock` is never held across `.await` points; the
17//!   `clone-then-release` pattern is used throughout.
18
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicI64, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23
24use algocline_core::execution::{
25    AwaitError, CancelError, CancelReason, ExecutionState, ExecutionStateTag, ObserveError,
26    ObserverHandle, PauseKind, ProgressEvent, ResumeError, ResumeOutcome, SessionId, SpawnError,
27    StateError, TerminalOutcome,
28};
29use algocline_core::{ExecutionMetrics, ExecutionObserver, QueryId, TokenUsage};
30use tokio::sync::{Mutex, RwLock};
31use tokio_util::sync::CancellationToken;
32
33use super::driver::{build_cancel_info, driver_loop, now_ms, transition_state, DriverContext};
34use super::observer::BroadcastObserverHandle;
35use super::record::{RespTxsMap, SessionRecord};
36use crate::card::FileCardStore;
37use crate::executor::Executor;
38use crate::state::JsonFileStore;
39
40// ---------------------------------------------------------------------------
41// SessionRegistryV2
42// ---------------------------------------------------------------------------
43
/// Registry that manages the lifecycle of v2 execution sessions.
///
/// `Clone` is cheap — the inner `Arc<RwLock<...>>` is reference-counted.
#[derive(Clone)]
pub struct SessionRegistryV2 {
    /// Session table keyed by id.  Each record is shared via `Arc` so lookups
    /// can clone the handle and release the lock before doing async work.
    sessions: Arc<RwLock<HashMap<SessionId, Arc<SessionRecord>>>>,
    /// Executor used by `spawn_v2` to start the per-session VM.
    executor: Arc<Executor>,
    /// State store handed to every session started by `spawn_v2`.
    state_store: Arc<JsonFileStore>,
    /// Card store handed to every session started by `spawn_v2`.
    card_store: Arc<FileCardStore>,
    /// Scenarios directory handed (cloned) to every session started by `spawn_v2`.
    scenarios_dir: std::path::PathBuf,
}
55
56impl SessionRegistryV2 {
57    /// Create a new empty registry backed by `executor`, with the storage paths
58    /// that will be injected into each spawned VM session.
59    ///
60    /// The `state_store` / `card_store` / `scenarios_dir` mirror the legacy
61    /// `AppService` resolution against the `AppConfig::app_dir()` layout, so a
62    /// v2 caller produces the same on-disk side effects as a legacy caller.
63    pub fn new(
64        executor: Arc<Executor>,
65        state_store: Arc<JsonFileStore>,
66        card_store: Arc<FileCardStore>,
67        scenarios_dir: std::path::PathBuf,
68    ) -> Self {
69        Self {
70            sessions: Arc::new(RwLock::new(HashMap::new())),
71            executor,
72            state_store,
73            card_store,
74            scenarios_dir,
75        }
76    }
77
78    // -----------------------------------------------------------------------
79    // spawn_v2
80    // -----------------------------------------------------------------------
81
82    /// Start a new v2 execution session, returning the `SessionId` immediately.
83    ///
84    /// Execution proceeds in the background via `tokio::spawn(driver_loop(...))`.
85    /// The caller receives the `SessionId` without waiting for execution to complete
86    /// or for the first event (Invariant 6 / debt #40955).
87    ///
88    /// Only [`algocline_core::execution::SpecKind::Run`] is supported in this subtask.
89    /// Other variants return [`SpawnError::InvalidSpec`].  Subtask 3 will extend this
90    /// to handle `Advice` and `Eval` through the full `AppService` path.
91    ///
92    /// # Errors
93    /// - [`SpawnError::Engine`] — the executor failed to start the session.
94    /// - [`SpawnError::InvalidSpec`] — the provided spec is malformed or uses an
95    ///   unsupported kind.
96    pub async fn spawn_v2(
97        &self,
98        spec: algocline_core::execution::SessionSpec,
99    ) -> Result<SessionId, SpawnError> {
100        use algocline_core::execution::SpecKind;
101
102        // Extract code from the spec kind.  Only Run is supported here.
103        let code = match spec.kind {
104            SpecKind::Run { code } => code,
105            other => {
106                return Err(SpawnError::InvalidSpec(format!(
107                    "SessionRegistryV2::spawn_v2 only supports SpecKind::Run; got {:?}",
108                    std::mem::discriminant(&other)
109                )));
110            }
111        };
112
113        if code.trim().is_empty() {
114            return Err(SpawnError::InvalidSpec("code must not be empty".into()));
115        }
116
117        let ctx = spec.ctx.unwrap_or_else(|| serde_json::json!({}));
118
119        // Start the per-session VM using the storage paths injected at
120        // registry construction (mirrors legacy AppService::start_and_tick).
121        let session = self
122            .executor
123            .start_session(
124                code,
125                ctx,
126                vec![], // extra_lib_paths — populated by Advice/Eval kinds later
127                vec![], // variant_pkgs   — populated by Advice/Eval kinds later
128                Arc::clone(&self.state_store),
129                Arc::clone(&self.card_store),
130                self.scenarios_dir.clone(),
131            )
132            .await
133            .map_err(SpawnError::Engine)?;
134
135        let (exec_task, llm_rx, vm_driver, metrics) = session.into_driver_parts();
136
137        // Build shared components — all constructed before spawning the task.
138        let state: Arc<Mutex<ExecutionState>> = Arc::new(Mutex::new(ExecutionState::Running));
139        let cancel_token = CancellationToken::new();
140        let resp_txs: RespTxsMap = Arc::new(Mutex::new(HashMap::new()));
141        // Wall-clock ms timestamp for idle-time GC (Crux #3 legacy parity).
142        // Initialised to now_ms() so a session that is evicted before driver_loop
143        // even starts is treated as "just spawned" rather than immediately expired.
144        let last_active: Arc<AtomicI64> = Arc::new(AtomicI64::new(now_ms()));
145        // Wrap metrics in Arc and clone into both DriverContext and SessionRecord
146        // so both can access the same SessionStatus accumulator (K-4 clone-then-release).
147        let metrics_arc: Arc<ExecutionMetrics> = Arc::new(metrics);
148
149        // Crux R3 (sink-free): the receiver returned alongside `bus_tx` is
150        // dropped immediately.  `bus_tx.send()` returns `Err(SendError)` when
151        // 0 observers are subscribed, but every call site in `driver_loop`
152        // uses `let _ = bus_tx.send(...)` to absorb the result — the caller
153        // is never crashed by 0 observers.  See
154        // `record::tests::bus_tx_does_not_crash_caller_with_zero_observers`.
155        let (bus_tx, _) = tokio::sync::broadcast::channel::<ProgressEvent>(256);
156
157        let session_id = SessionId::generate();
158
159        // Bundle shared resources for driver_loop.
160        let ctx = DriverContext {
161            state: Arc::clone(&state),
162            bus_tx: bus_tx.clone(),
163            cancel_token: cancel_token.clone(),
164            resp_txs: Arc::clone(&resp_txs),
165            last_active: Arc::clone(&last_active),
166            metrics: Arc::clone(&metrics_arc),
167        };
168
169        let join_handle = tokio::spawn(async move {
170            // vm_driver must stay alive for the duration of the session.
171            let _keep_driver = vm_driver;
172            driver_loop(ctx, exec_task, llm_rx).await;
173        });
174
175        // Assemble the record with all shared fields.
176        let record = Arc::new(SessionRecord {
177            state,
178            bus_tx,
179            last_active,
180            cancel_token,
181            join_handle: Mutex::new(Some(join_handle)),
182            resp_txs,
183            first_cancel_info: Mutex::new(None),
184            metrics: metrics_arc,
185        });
186
187        // Insert into registry.
188        {
189            let mut map = self.sessions.write().await;
190            map.insert(session_id.clone(), record);
191        }
192        Ok(session_id)
193    }
194
195    // -----------------------------------------------------------------------
196    // state
197    // -----------------------------------------------------------------------
198
199    /// Query the current [`ExecutionState`] of a session.
200    ///
201    /// # Errors
202    /// - [`StateError::NotFound`] — no session with the given id exists.
203    pub async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
204        let record = self
205            .get_record(id)
206            .await
207            .ok_or_else(|| StateError::NotFound(id.clone()))?;
208        let guard = record.state.lock().await;
209        Ok(guard.clone())
210    }
211
212    // -----------------------------------------------------------------------
213    // resume
214    // -----------------------------------------------------------------------
215
216    /// Resume a paused session by delivering LLM responses.
217    ///
218    /// # Errors
219    /// - [`ResumeError::NotFound`] — no session with the given id exists.
220    /// - [`ResumeError::NotPaused`] — the session is not in the `Paused` state.
221    /// - [`ResumeError::AlreadyCancelled`] — the session is already cancelled.
222    pub async fn resume(
223        &self,
224        id: &SessionId,
225        payload: algocline_core::execution::ResumePayload,
226    ) -> Result<ResumeOutcome, ResumeError> {
227        use algocline_core::execution::ResumePayload;
228
229        let record = self
230            .get_record(id)
231            .await
232            .ok_or_else(|| ResumeError::NotFound(id.clone()))?;
233
234        // checkpoint C: at resume entry
235        // If the token is already cancelled, reject the resume immediately.
236        if record.cancel_token.is_cancelled() {
237            return Err(ResumeError::AlreadyCancelled);
238        }
239
240        // Verify the session is Paused (or Cancelled after the token check above).
241        let (actual_tag, pause_kind) = {
242            let guard = record.state.lock().await;
243            let tag = guard.tag();
244            let kind = if let ExecutionState::Paused(ref info) = *guard {
245                info.kind
246            } else {
247                PauseKind::Single
248            };
249            (tag, kind)
250        };
251
252        match actual_tag {
253            ExecutionStateTag::Cancelled => return Err(ResumeError::AlreadyCancelled),
254            ExecutionStateTag::Paused => {} // continue
255            _ => return Err(ResumeError::NotPaused { actual_tag }),
256        }
257
258        // Extract query responses from the payload, preserving per-response usage.
259        let responses: Vec<(String, String, Option<TokenUsage>)> = match payload {
260            ResumePayload::Single {
261                query_id,
262                response,
263                usage,
264            } => vec![(query_id, response, usage)],
265            ResumePayload::Batch(batch) => batch
266                .into_iter()
267                .map(|r| (r.query_id, r.response, r.usage))
268                .collect(),
269        };
270
271        // Deliver responses via the shared resp_txs map.
272        {
273            let mut txs = record.resp_txs.lock().await;
274            for (qid_str, response, _usage) in &responses {
275                let qid = QueryId::parse(qid_str);
276                match txs.remove(&qid) {
277                    Some(tx) => {
278                        if let Err(_e) = tx.send(Ok(response.clone())) {
279                            tracing::debug!(
280                                "registry::resume: oneshot receiver already dropped for query {qid_str}"
281                            );
282                        }
283                    }
284                    None => {
285                        tracing::debug!("registry::resume: no pending tx for query {qid_str}");
286                    }
287                }
288            }
289        }
290
291        // Propagate per-response usage to the metrics observer (Crux 1: same Arc).
292        // Observer call is outside the txs lock scope to keep cancel/lock paths intact.
293        let observer = record.metrics.create_observer();
294        for (qid_str, response, usage) in &responses {
295            let qid = QueryId::parse(qid_str);
296            observer.on_response_fed(&qid, response, usage.as_ref());
297        }
298
299        // Transition state from Paused → Running.
300        {
301            let guard = record.state.lock().await;
302            if guard.tag() == ExecutionStateTag::Paused {
303                drop(guard);
304                transition_state(&record.state, &record.bus_tx, ExecutionState::Running).await;
305                let _ = record.bus_tx.send(ProgressEvent::ResumeAccepted {
306                    payload_kind: pause_kind,
307                    at: now_ms(),
308                });
309            }
310        }
311
312        Ok(ResumeOutcome::Continued)
313    }
314
315    // -----------------------------------------------------------------------
316    // cancel
317    // -----------------------------------------------------------------------
318
319    /// Request cooperative cancellation of a session.
320    ///
321    /// Idempotent: returns `Ok(())` for sessions already in a terminal state.
322    ///
323    /// # Errors
324    /// - [`CancelError::NotFound`] — no session with the given id exists.
325    pub async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
326        let record = self
327            .get_record(id)
328            .await
329            .ok_or_else(|| CancelError::NotFound(id.clone()))?;
330
331        // Idempotency: already terminal → Ok.
332        {
333            let guard = record.state.lock().await;
334            if matches!(
335                guard.tag(),
336                ExecutionStateTag::Done | ExecutionStateTag::Failed | ExecutionStateTag::Cancelled
337            ) {
338                return Ok(());
339            }
340        }
341
342        // Store the first CancelInfo (idempotent: only set once).
343        {
344            let mut first = record.first_cancel_info.lock().await;
345            if first.is_none() {
346                let info = build_cancel_info(&record.state, reason).await;
347                *first = Some(info);
348            }
349        }
350
351        // Signal the driver (Crux R2: cooperative — no abort).
352        record.cancel_token.cancel();
353
354        // For Paused sessions, transition immediately: the driver is blocked
355        // waiting for a resume and won't hit a checkpoint on its own.
356        let should_transition = {
357            let guard = record.state.lock().await;
358            guard.tag() == ExecutionStateTag::Paused
359        };
360        if should_transition {
361            let cancel_info_opt = {
362                let first = record.first_cancel_info.lock().await;
363                first.clone()
364            };
365            if let Some(info) = cancel_info_opt {
366                transition_state(
367                    &record.state,
368                    &record.bus_tx,
369                    ExecutionState::Cancelled(info),
370                )
371                .await;
372            }
373        }
374
375        Ok(())
376    }
377
378    // -----------------------------------------------------------------------
379    // observe  (sync fn — Crux R3)
380    // -----------------------------------------------------------------------
381
382    /// Subscribe to the progress event stream for a session.
383    ///
384    /// This is a **synchronous** `fn`: `broadcast::Sender::subscribe()` is
385    /// synchronous and does not perform I/O.  Multiple concurrent subscribers
386    /// each receive the full event stream independently (Crux R3).
387    ///
388    /// # Errors
389    /// - [`ObserveError::NotFound`] — no session with the given id exists, **or**
390    ///   `try_read()` experienced lock contention (write lock held by `spawn`).
391    ///   The contention path emits `tracing::warn!(target = "session.observe", ...)`;
392    ///   callers cannot distinguish it from a true absent-session result.
393    pub fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
394        // Non-blocking read; the write lock is only held very briefly during spawn.
395        match self.sessions.try_read() {
396            Ok(map) => {
397                let record = map
398                    .get(id)
399                    .ok_or_else(|| ObserveError::NotFound(id.clone()))?;
400                Ok(Box::new(BroadcastObserverHandle::new(&record.bus_tx)))
401            }
402            Err(_) => {
403                tracing::warn!(
404                    target = "session.observe",
405                    session_id = %id,
406                    "try_read contention; surfacing as NotFound"
407                );
408                Err(ObserveError::NotFound(id.clone()))
409            }
410        }
411    }
412
413    // -----------------------------------------------------------------------
414    // await_terminal
415    // -----------------------------------------------------------------------
416
417    /// Await the terminal state of a session.
418    ///
419    /// Polls the shared state until it reaches a terminal variant (`Done`,
420    /// `Cancelled`, or `Failed`).  The `JoinHandle` is never `.abort()`-ed
421    /// (Crux R2).
422    ///
423    /// # Errors
424    /// - [`AwaitError::NotFound`] — no session with the given id exists.
425    pub async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
426        let record = self
427            .get_record(id)
428            .await
429            .ok_or_else(|| AwaitError::NotFound(id.clone()))?;
430
431        // Single-awaiter path: take the JoinHandle and await `driver_loop`
432        // completion directly.  Replaces the previous `yield_now()` polling
433        // loop that occupied a tokio worker slot scheduling-wise even though
434        // it consumed no CPU.  The `driver_loop` guarantees a terminal
435        // `transition_state` before returning, so once `handle.await` resolves
436        // the state is guaranteed terminal.
437        let handle_opt = {
438            let mut guard = record.join_handle.lock().await;
439            guard.take()
440        };
441
442        if let Some(handle) = handle_opt {
443            handle
444                .await
445                .map_err(|e| AwaitError::Joined(format!("driver_loop join error: {e}")))?;
446        }
447        // (None branch: another caller has already taken the handle.  Either
448        // they are still awaiting it — in which case the driver_loop has not
449        // yet transitioned to terminal — or they have already finished, in
450        // which case the state is terminal.  We fall through to a single
451        // state read; the rare concurrent race returns `AwaitError::Joined`.)
452
453        let guard = record.state.lock().await;
454        match &*guard {
455            ExecutionState::Done(result) => Ok(TerminalOutcome::Done(result.clone())),
456            ExecutionState::Cancelled(info) => Ok(TerminalOutcome::Cancelled(info.clone())),
457            ExecutionState::Failed(info) => Ok(TerminalOutcome::Failed(info.clone())),
458            other => Err(AwaitError::Joined(format!(
459                "await_terminal: driver_loop completed but state is {:?} (concurrent awaiter race)",
460                other.tag()
461            ))),
462        }
463    }
464
    // -----------------------------------------------------------------------
    // spawn_gc_task
    // -----------------------------------------------------------------------
472
473    /// Spawn a background GC task that periodically evicts idle, terminal sessions.
474    ///
475    /// Mirrors the legacy `SessionRegistry::spawn_gc_task` contract (Crux #3 legacy
476    /// parity) with two extensions:
477    ///
478    /// 1. **Subscriber-count gate** (Crux #1): a session is only evicted when
479    ///    `bus_tx.receiver_count() == 0` at the moment the write guard is held,
480    ///    ensuring no use-after-eviction for active observers.
481    /// 2. **Parameterised `interval`** (Crux #2): callers can supply a sub-second
482    ///    interval for test determinism without requiring `tokio::time::pause`.
483    ///
484    /// The `JoinHandle` returned by `tokio::spawn` is intentionally dropped —
485    /// the task runs until process exit (legacy fire-and-forget contract).
486    ///
487    /// # K-4 invariant
488    ///
489    /// The `sessions` write guard is acquired once per GC tick.  All operations
490    /// inside the guard (`receiver_count()`, `AtomicI64::load`, `HashMap::remove`)
491    /// are **synchronous** — no `.await` is called while the guard is held.
492    pub fn spawn_gc_task(&self, ttl: Duration, interval: Duration) {
493        let sessions = Arc::clone(&self.sessions);
494        tokio::spawn(async move {
495            let mut ticker = tokio::time::interval(interval);
496            loop {
497                ticker.tick().await;
498                // Acquire the write guard once per tick.  All reads and removes
499                // within this block are sync — no `.await` inside the guard (K-4).
500                let mut map = sessions.write().await;
501                let mut to_evict: Vec<SessionId> = Vec::new();
502                for (id, record) in map.iter() {
503                    // Crux #1: check subscriber count atomically with the guard held.
504                    // `receiver_count()` is sync (no lock required on its own), but
505                    // holding the write guard here means `observe()` cannot attach a
506                    // new subscriber via `try_read()` concurrently — TOCTOU excluded.
507                    let no_subscribers = record.bus_tx.receiver_count() == 0;
508                    let last_ms = record.last_active.load(Ordering::Relaxed);
509                    if no_subscribers && is_expired_v2(last_ms, ttl) {
510                        to_evict.push(id.clone());
511                    }
512                }
513                for id in &to_evict {
514                    tracing::info!(session_id = %id, "GC: reaping expired v2 session");
515                    map.remove(id);
516                }
517            }
518        });
519    }
520
521    // -----------------------------------------------------------------------
522    // Internal helpers
523    // -----------------------------------------------------------------------
524
525    /// Clone-then-release lookup (K-4): the lock is dropped before returning.
526    async fn get_record(&self, id: &SessionId) -> Option<Arc<SessionRecord>> {
527        let map = self.sessions.read().await;
528        map.get(id).cloned()
529    }
530}
531
532// ---------------------------------------------------------------------------
533// GC helpers (module-private)
534// ---------------------------------------------------------------------------
535
536/// Returns `true` when the session has been idle for at least `ttl`.
537///
538/// Uses wall-clock milliseconds matching the legacy `is_expired_impl` semantics:
539/// `now_ms() - last_active_ms >= ttl.as_millis()`.
540///
541/// The legacy implementation uses `Instant` (monotonic) whereas this uses
542/// `SystemTime` (wall-clock) — identical to the `now_ms()` helper in `driver.rs`
543/// and to `Session.last_activity_ms` in the legacy codebase (Crux #3 parity).
544fn is_expired_v2(last_active_ms: i64, ttl: Duration) -> bool {
545    let now = super::driver::now_ms();
546    let elapsed_ms = now.saturating_sub(last_active_ms);
547    elapsed_ms >= ttl.as_millis() as i64
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553    use algocline_core::execution::{
554        CancelCode, CancelReason, ExecutionState, SessionSpec, SpecKind,
555    };
556    use std::sync::Arc;
557
558    async fn make_executor() -> Arc<Executor> {
559        Arc::new(Executor::new(vec![]).await.expect("Executor::new"))
560    }
561
562    /// Construct a registry backed by per-test tempdir paths so the legacy
563    /// AppConfig::app_dir() layout is approximated without touching the user's
564    /// `~/.algocline` directory.
565    fn make_registry(executor: Arc<Executor>) -> (SessionRegistryV2, tempfile::TempDir) {
566        let tmp = tempfile::tempdir().expect("tempdir");
567        let state_store = Arc::new(JsonFileStore::new(tmp.path().join("state")));
568        let card_store = Arc::new(FileCardStore::new(tmp.path().join("cards")));
569        let scenarios_dir = tmp.path().join("scenarios");
570        (
571            SessionRegistryV2::new(executor, state_store, card_store, scenarios_dir),
572            tmp,
573        )
574    }
575
576    fn simple_spec(code: &str) -> SessionSpec {
577        SessionSpec {
578            kind: SpecKind::Run {
579                code: code.to_owned(),
580            },
581            project_root: None,
582            ctx: None,
583        }
584    }
585
586    fn cancel_reason() -> CancelReason {
587        CancelReason {
588            code: CancelCode::User,
589            detail: None,
590            requested_at: now_ms(),
591        }
592    }
593
594    // -----------------------------------------------------------------------
595    // spawn_returns_session_id_immediately (debt #40955)
596    // -----------------------------------------------------------------------
597
598    /// `spawn_v2` must return `SessionId` without blocking on execution.
599    #[tokio::test]
600    async fn spawn_returns_session_id_immediately() {
601        let executor = make_executor().await;
602        let (registry, _tmp) = make_registry(executor);
603
604        let start = std::time::Instant::now();
605        let result = tokio::time::timeout(
606            std::time::Duration::from_millis(200),
607            registry.spawn_v2(simple_spec("return 42")),
608        )
609        .await;
610
611        assert!(result.is_ok(), "spawn_v2 must complete within 200ms");
612        assert!(
613            result.unwrap().is_ok(),
614            "spawn_v2 must return Ok(SessionId)"
615        );
616
617        let elapsed = start.elapsed();
618        assert!(
619            elapsed < std::time::Duration::from_millis(150),
620            "spawn_v2 took too long: {elapsed:?}"
621        );
622    }
623
624    // -----------------------------------------------------------------------
625    // state_query_running
626    // -----------------------------------------------------------------------
627
628    /// Immediately after spawn, `state()` must return Running or Paused.
629    #[tokio::test]
630    async fn state_query_running() {
631        let executor = make_executor().await;
632        let (registry, _tmp) = make_registry(executor);
633
634        // Lua that pauses immediately so the session is observable.
635        let sid = registry
636            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
637            .await
638            .expect("spawn");
639
640        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
641
642        let state = registry.state(&sid).await.expect("state");
643        assert!(
644            matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
645            "state just after spawn must be Running or Paused, got: {:?}",
646            state.tag()
647        );
648    }
649
650    // -----------------------------------------------------------------------
651    // cancel_at_checkpoint_c_at_resume_entry
652    // -----------------------------------------------------------------------
653
654    /// `resume()` on a cancelled session must return `AlreadyCancelled`.
655    #[tokio::test]
656    async fn cancel_at_checkpoint_c_at_resume_entry() {
657        use algocline_core::execution::{ResumeError, ResumePayload};
658
659        let executor = make_executor().await;
660        let (registry, _tmp) = make_registry(executor);
661
662        let sid = registry
663            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
664            .await
665            .expect("spawn");
666
667        // Wait for Paused.
668        let mut retries = 0;
669        loop {
670            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
671            if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
672                break;
673            }
674            retries += 1;
675            assert!(retries < 50, "session did not reach Paused state");
676        }
677
678        registry
679            .cancel(&sid, cancel_reason())
680            .await
681            .expect("cancel");
682
683        // checkpoint C: at resume entry
684        let result = registry
685            .resume(
686                &sid,
687                ResumePayload::Single {
688                    query_id: "q".into(),
689                    response: "4".into(),
690                    usage: None,
691                },
692            )
693            .await;
694
695        assert!(
696            matches!(result, Err(ResumeError::AlreadyCancelled)),
697            "resume on cancelled session must return AlreadyCancelled, got: {result:?}"
698        );
699    }
700
701    // -----------------------------------------------------------------------
702    // cancel_idempotent
703    // -----------------------------------------------------------------------
704
705    #[tokio::test]
706    async fn cancel_idempotent() {
707        let executor = make_executor().await;
708        let (registry, _tmp) = make_registry(executor);
709
710        let sid = registry
711            .spawn_v2(simple_spec("return 1"))
712            .await
713            .expect("spawn");
714
715        registry
716            .cancel(&sid, cancel_reason())
717            .await
718            .expect("first cancel");
719        registry
720            .cancel(&sid, cancel_reason())
721            .await
722            .expect("second cancel");
723    }
724
725    // -----------------------------------------------------------------------
726    // await_terminal returns Done without busy-polling
727    // -----------------------------------------------------------------------
728
729    /// Regression for #2 (case A): `await_terminal` must complete by awaiting
730    /// the `driver_loop` `JoinHandle` directly (single-awaiter `take` +
731    /// `.await`) instead of polling `state` in a `yield_now()` loop.  We can't
732    /// observe scheduler occupancy from a test, but we can verify the
733    /// behavioural contract: (1) the call returns the correct `TerminalOutcome`,
734    /// (2) it returns within a tight wall-clock budget without sleep, and
735    /// (3) a second concurrent caller does not panic.
736    #[tokio::test]
737    async fn await_terminal_returns_done_for_trivial_script() {
738        let executor = make_executor().await;
739        let (registry, _tmp) = make_registry(executor);
740
741        let sid = registry
742            .spawn_v2(simple_spec("return 42"))
743            .await
744            .expect("spawn");
745
746        let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
747        match outcome {
748            TerminalOutcome::Done(result) => {
749                assert_eq!(result.value, serde_json::json!(42));
750            }
751            other => panic!("expected Done, got: {other:?}"),
752        }
753    }
754
755    /// Regression for #2 (case A) single-awaiter discipline: when two callers
756    /// race on `await_terminal`, the second caller (which observes `None` after
757    /// the first has taken the handle) must NOT panic.  It must either return
758    /// the same terminal outcome (if the first has already finished) or an
759    /// `AwaitError::Joined` (the documented race fallback).
760    #[tokio::test]
761    async fn await_terminal_does_not_panic_on_second_concurrent_caller() {
762        let executor = make_executor().await;
763        let (registry, _tmp) = make_registry(executor);
764
765        let sid = registry
766            .spawn_v2(simple_spec("return 99"))
767            .await
768            .expect("spawn");
769
770        let r1 = registry.clone();
771        let r2 = registry.clone();
772        let s1 = sid.clone();
773        let s2 = sid.clone();
774
775        let h1 = tokio::spawn(async move { r1.await_terminal(&s1).await });
776        let h2 = tokio::spawn(async move { r2.await_terminal(&s2).await });
777
778        let out1 = h1.await.expect("h1 join");
779        let out2 = h2.await.expect("h2 join");
780
781        // First-caller path must succeed with the real outcome.
782        let first_ok = matches!(&out1, Ok(TerminalOutcome::Done(_)))
783            || matches!(&out2, Ok(TerminalOutcome::Done(_)));
784        assert!(
785            first_ok,
786            "at least one caller must observe Done; got out1={out1:?}, out2={out2:?}"
787        );
788        // Second caller may have observed Joined (race) or Done; either is OK,
789        // neither must panic — which we've already verified by the join above.
790    }
791
792    // -----------------------------------------------------------------------
793    // observe_sink_free (Crux R3 — registry level)
794    // -----------------------------------------------------------------------
795
796    /// `observe()` must succeed and return a valid handle even with 0 prior observers.
797    #[tokio::test]
798    async fn observe_sink_free_registry() {
799        let executor = make_executor().await;
800        let (registry, _tmp) = make_registry(executor);
801
802        let sid = registry
803            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
804            .await
805            .expect("spawn");
806
807        // observe() before any subscriber exists must succeed.
808        let handle = registry.observe(&sid);
809        assert!(
810            handle.is_ok(),
811            "observe() must return Ok even with 0 prior observers"
812        );
813    }
814
815    // -----------------------------------------------------------------------
816    // observe_multi_subscriber_fan_out (Crux R3 — registry level)
817    // -----------------------------------------------------------------------
818
819    /// Multiple independent observers each get the same events.
820    #[tokio::test]
821    async fn observe_multi_subscriber_fan_out_registry() {
822        use algocline_core::execution::ObserverRecvError;
823
824        let executor = make_executor().await;
825        let (registry, _tmp) = make_registry(executor);
826
827        // A script that returns immediately — the driver will publish Done.
828        let sid = registry
829            .spawn_v2(simple_spec("return 99"))
830            .await
831            .expect("spawn");
832
833        // Subscribe 3 observers.
834        let mut h1 = registry.observe(&sid).expect("observe h1");
835        let mut h2 = registry.observe(&sid).expect("observe h2");
836        let mut h3 = registry.observe(&sid).expect("observe h3");
837
838        // Wait for terminal so we know events have been published.
839        let _ = registry.await_terminal(&sid).await;
840
841        // Each observer must receive at least the terminal StateTransition.
842        // Drain with idle-timeout: bus_tx is retained in SessionRecord for
843        // sink-free late-subscribe (Crux R3), so Closed never fires while the
844        // registry is alive.  A 100ms idle window after await_terminal() is
845        // sufficient — all events are already buffered.
846        use std::time::Duration;
847        for (label, handle) in [("h1", &mut h1), ("h2", &mut h2), ("h3", &mut h3)] {
848            let mut got_transition = false;
849            loop {
850                match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
851                    Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
852                    Ok(Ok(_)) => {}
853                    Ok(Err(ObserverRecvError::Closed)) => break,
854                    Ok(Err(ObserverRecvError::Lagged(_))) => {}
855                    Err(_) => break, // idle-timeout: no more events coming
856                }
857            }
858            assert!(
859                got_transition,
860                "{label}: must receive at least one StateTransition event"
861            );
862        }
863    }
864
865    // -----------------------------------------------------------------------
866    // AC#5a — gc_evicts_terminal_session_after_ttl
867    // -----------------------------------------------------------------------
868
869    /// GC must remove a terminal session (no subscribers) after TTL has elapsed
870    /// and one full interval tick has fired.
871    ///
872    /// Covers: `tokio::time::interval` + `AtomicI64::load` + `RwLock::write` +
873    /// `receiver_count == 0` (Crux #1 / concurrency-analysis §2 5a).
874    #[tokio::test]
875    async fn gc_evicts_terminal_session_after_ttl() {
876        use algocline_core::execution::ObserveError;
877        use std::time::Duration;
878
879        let executor = make_executor().await;
880        let (registry, _tmp) = make_registry(executor);
881
882        let ttl = Duration::from_millis(100);
883        let interval = Duration::from_millis(50);
884
885        let sid = registry
886            .spawn_v2(simple_spec("return 1"))
887            .await
888            .expect("spawn");
889
890        // Wait for the session to complete (terminal, no subscribers).
891        registry.await_terminal(&sid).await.expect("await_terminal");
892
893        // Sleep beyond one full GC interval + TTL + slack so the GC has had at
894        // least one opportunity to evict (R4 fallback: interval + ttl + 50ms).
895        tokio::time::sleep(interval + ttl + Duration::from_millis(50)).await;
896
897        registry.spawn_gc_task(ttl, interval);
898
899        // Sleep again to let the newly spawned GC run at least one tick.
900        tokio::time::sleep(interval + Duration::from_millis(50)).await;
901
902        // The session must now be gone.
903        assert!(
904            matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
905            "session must be evicted after TTL + interval"
906        );
907    }
908
909    // -----------------------------------------------------------------------
910    // AC#5b — gc_does_not_evict_session_with_active_subscriber
911    // -----------------------------------------------------------------------
912
913    /// GC must NOT evict a session that still has active subscribers, even after
914    /// TTL has elapsed.  Once the subscriber is dropped, subsequent GC ticks must
915    /// evict the session.
916    ///
917    /// Covers: `broadcast::Sender::receiver_count` > 0 path (Crux #1 /
918    /// concurrency-analysis §2 5b).
919    #[tokio::test]
920    async fn gc_does_not_evict_session_with_active_subscriber() {
921        use algocline_core::execution::ObserveError;
922        use std::time::Duration;
923
924        let executor = make_executor().await;
925        let (registry, _tmp) = make_registry(executor);
926
927        let ttl = Duration::from_millis(100);
928        let interval = Duration::from_millis(50);
929
930        let sid = registry
931            .spawn_v2(simple_spec("return 2"))
932            .await
933            .expect("spawn");
934
935        // Acquire a subscriber *before* the session reaches terminal.
936        let _handle = registry.observe(&sid).expect("observe");
937
938        // Wait for terminal while subscriber is still held.
939        registry.await_terminal(&sid).await.expect("await_terminal");
940
941        // Start GC — session has receiver_count > 0, must NOT be evicted.
942        registry.spawn_gc_task(ttl, interval);
943
944        // Sleep well beyond TTL + interval.
945        tokio::time::sleep(interval + ttl + Duration::from_millis(50)).await;
946
947        // Session must still be present (subscriber is alive).
948        assert!(
949            registry.observe(&sid).is_ok(),
950            "session must NOT be evicted while a subscriber is held"
951        );
952
953        // Drop the subscriber — now eviction is permitted.
954        drop(_handle);
955
956        // Sleep for another interval + slack so GC ticks again after the drop.
957        tokio::time::sleep(interval + Duration::from_millis(50)).await;
958
959        // Now the session should be evicted.
960        assert!(
961            matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
962            "session must be evicted after subscriber is dropped and GC ticks"
963        );
964    }
965
966    // -----------------------------------------------------------------------
967    // AC#5c — gc_respects_interval_no_immediate_eviction
968    // -----------------------------------------------------------------------
969
970    /// GC must NOT evict a terminal session before the interval has fired,
971    /// even if TTL has already elapsed.
972    ///
973    /// Covers: `tokio::time::interval MissedTickBehavior::Burst` guard
974    /// (R4 / concurrency-analysis §2 5c).
975    #[tokio::test]
976    async fn gc_respects_interval_no_immediate_eviction() {
977        use std::time::Duration;
978
979        let executor = make_executor().await;
980        let (registry, _tmp) = make_registry(executor);
981
982        // Use a long interval so we can assert the session is still present
983        // after TTL has elapsed but before an interval tick fires.
984        let ttl = Duration::from_millis(20);
985        let interval = Duration::from_millis(500);
986
987        let sid = registry
988            .spawn_v2(simple_spec("return 3"))
989            .await
990            .expect("spawn");
991
992        registry.await_terminal(&sid).await.expect("await_terminal");
993
994        // Start GC after TTL has elapsed — the first tick fires up to `interval`
995        // from now, so we check immediately (well before the first tick).
996        tokio::time::sleep(ttl + Duration::from_millis(10)).await;
997        registry.spawn_gc_task(ttl, interval);
998
999        // Check immediately — no tick has fired yet.
1000        assert!(
1001            registry.observe(&sid).is_ok(),
1002            "session must NOT be evicted before first GC tick fires"
1003        );
1004    }
1005
1006    // -----------------------------------------------------------------------
1007    // AC#5d — test_atomic_last_active_updated_by_driver_loop
1008    // -----------------------------------------------------------------------
1009
1010    /// Concurrent writer (store) and reader (load) on `last_active` with
1011    /// Relaxed ordering must not panic or cause UB; final value must be > 0.
1012    ///
1013    /// Covers: `AtomicI64::store` + `AtomicI64::load` Relaxed ordering safety
1014    /// under concurrent access (concurrency-analysis §2 5d / Crux #3 invariant).
1015    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1016    async fn test_atomic_last_active_updated_by_driver_loop() {
1017        use std::sync::atomic::{AtomicI64, Ordering};
1018        use std::sync::Arc;
1019
1020        let last_active = Arc::new(AtomicI64::new(0));
1021
1022        let writer_la = Arc::clone(&last_active);
1023        let writer = tokio::spawn(async move {
1024            for _ in 0..1000 {
1025                writer_la.store(now_ms(), Ordering::Relaxed);
1026                tokio::task::yield_now().await;
1027            }
1028        });
1029
1030        let reader_la = Arc::clone(&last_active);
1031        let reader = tokio::spawn(async move {
1032            for _ in 0..1000 {
1033                let _ = reader_la.load(Ordering::Relaxed);
1034                tokio::task::yield_now().await;
1035            }
1036        });
1037
1038        writer.await.expect("writer task must not panic");
1039        reader.await.expect("reader task must not panic");
1040
1041        // After 1000 stores of now_ms() the value must be > 0.
1042        assert!(
1043            last_active.load(Ordering::Relaxed) > 0,
1044            "last_active must be updated to a non-zero wall-clock value"
1045        );
1046    }
1047
1048    // -----------------------------------------------------------------------
1049    // AC#5e — test_concurrent_observe_during_gc_tick
1050    // -----------------------------------------------------------------------
1051
1052    /// 8 concurrent tasks each calling `observe()` 100 times while GC is running
1053    /// must produce only `Ok` or `Err(NotFound)` — never a panic.
1054    ///
1055    /// Covers: `RwLock::try_read` vs `RwLock::write` mutual exclusion +
1056    /// `Arc<RwLock<HashMap>>` clone safety (concurrency-analysis §2 5e).
1057    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1058    async fn test_concurrent_observe_during_gc_tick() {
1059        use algocline_core::execution::ObserveError;
1060        use std::sync::Arc;
1061        use std::time::Duration;
1062
1063        let executor = make_executor().await;
1064        let (registry, _tmp) = make_registry(executor);
1065        let registry = Arc::new(registry);
1066
1067        let ttl = Duration::from_millis(10);
1068        let interval = Duration::from_millis(5);
1069
1070        let sid = registry
1071            .spawn_v2(simple_spec("return 42"))
1072            .await
1073            .expect("spawn");
1074
1075        registry.await_terminal(&sid).await.expect("await_terminal");
1076        registry.spawn_gc_task(ttl, interval);
1077
1078        let mut handles = Vec::new();
1079        for _ in 0..8 {
1080            let reg = Arc::clone(&registry);
1081            let id = sid.clone();
1082            handles.push(tokio::spawn(async move {
1083                for _ in 0..100 {
1084                    match reg.observe(&id) {
1085                        Ok(_) | Err(ObserveError::NotFound(_)) => {}
1086                    }
1087                    tokio::task::yield_now().await;
1088                }
1089            }));
1090        }
1091
1092        for h in handles {
1093            h.await.expect("concurrent observe task must not panic");
1094        }
1095    }
1096
1097    // -----------------------------------------------------------------------
1098    // AC#5f — test_gc_task_spawn_survives_handle_drop
1099    // -----------------------------------------------------------------------
1100
1101    /// `spawn_gc_task` internally drops the `JoinHandle` (legacy fire-and-forget).
1102    /// Verify the GC loop continues running after `spawn_gc_task()` returns by
1103    /// asserting eviction occurs after the expected window.
1104    ///
1105    /// Covers: `tokio::task::spawn` "JoinHandle drop ≠ task abort" contract
1106    /// (concurrency-analysis §2 5f / Crux #2 legacy parity).
1107    #[tokio::test]
1108    async fn test_gc_task_spawn_survives_handle_drop() {
1109        use algocline_core::execution::ObserveError;
1110        use std::time::Duration;
1111
1112        let executor = make_executor().await;
1113        let (registry, _tmp) = make_registry(executor);
1114
1115        let ttl = Duration::from_millis(100);
1116        let interval = Duration::from_millis(50);
1117
1118        // Start GC first — handle is immediately dropped inside spawn_gc_task.
1119        registry.spawn_gc_task(ttl, interval);
1120
1121        let sid = registry
1122            .spawn_v2(simple_spec("return 99"))
1123            .await
1124            .expect("spawn");
1125
1126        registry.await_terminal(&sid).await.expect("await_terminal");
1127
1128        // Sleep long enough for at least 2 GC ticks after TTL.
1129        tokio::time::sleep(ttl + interval * 2 + Duration::from_millis(50)).await;
1130
1131        // The GC task (whose JoinHandle was dropped) must have continued running
1132        // and evicted the session.
1133        assert!(
1134            matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
1135            "session must be evicted by the GC task even after its JoinHandle was dropped"
1136        );
1137    }
1138
1139    // -----------------------------------------------------------------------
1140    // AC#5g — test_arc_rwlock_hashmap_shared_across_clones
1141    // -----------------------------------------------------------------------
1142
1143    /// `SessionRegistryV2: Clone` shares the same underlying
1144    /// `Arc<RwLock<HashMap>>`.  A session spawned via one clone must be visible
1145    /// from another clone, and GC started on one clone must evict sessions
1146    /// visible from the other.
1147    ///
1148    /// Covers: `Arc<RwLock<HashMap>>` Send + Sync + Clone shared-state contract
1149    /// (concurrency-analysis §2 5g).
1150    #[tokio::test]
1151    async fn test_arc_rwlock_hashmap_shared_across_clones() {
1152        use algocline_core::execution::ObserveError;
1153        use std::time::Duration;
1154
1155        let executor = make_executor().await;
1156        let (registry_a, _tmp) = make_registry(executor);
1157        let registry_b = registry_a.clone();
1158
1159        let ttl = Duration::from_millis(100);
1160        let interval = Duration::from_millis(50);
1161
1162        // Spawn via registry_a.
1163        let sid = registry_a
1164            .spawn_v2(simple_spec("return 7"))
1165            .await
1166            .expect("spawn via registry_a");
1167
1168        // Session must be visible from registry_b (shared Arc<RwLock<HashMap>>).
1169        assert!(
1170            registry_b.observe(&sid).is_ok(),
1171            "session spawned via registry_a must be visible from registry_b"
1172        );
1173
1174        // Wait for terminal.
1175        registry_a
1176            .await_terminal(&sid)
1177            .await
1178            .expect("await_terminal");
1179
1180        // Start GC via registry_b.
1181        registry_b.spawn_gc_task(ttl, interval);
1182
1183        // Sleep long enough for eviction.
1184        tokio::time::sleep(ttl + interval + Duration::from_millis(50)).await;
1185
1186        // Session evicted via registry_b's GC must be invisible from registry_a too.
1187        assert!(
1188            matches!(registry_a.observe(&sid), Err(ObserveError::NotFound(_))),
1189            "session evicted by registry_b GC must be gone from registry_a too"
1190        );
1191    }
1192
1193    // -----------------------------------------------------------------------
1194    // usage_aggregate_none_for_run_without_llm_calls (test (b))
1195    // -----------------------------------------------------------------------
1196
1197    /// When no `alc.llm` call occurs, `Done.usage` must be `None`.
1198    /// Verifies that the `on_paused` wiring does not falsely activate when no
1199    /// LLM call occurs, and that `usage_aggregate()` gates on `llm_calls > 0`.
1200    #[tokio::test]
1201    async fn usage_aggregate_none_for_run_without_llm_calls() {
1202        use algocline_core::execution::TerminalOutcome;
1203
1204        let executor = make_executor().await;
1205        let (registry, _tmp) = make_registry(executor);
1206
1207        let sid = registry
1208            .spawn_v2(simple_spec("return 42"))
1209            .await
1210            .expect("spawn");
1211
1212        let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1213        match outcome {
1214            TerminalOutcome::Done(result) => {
1215                assert_eq!(
1216                    result.usage, None,
1217                    "Done.usage must be None when no alc.llm call occurred"
1218                );
1219            }
1220            other => panic!("expected Done, got: {other:?}"),
1221        }
1222    }
1223
1224    // -----------------------------------------------------------------------
1225    // usage_aggregate_some_for_run_with_llm_call (test (a))
1226    // -----------------------------------------------------------------------
1227
1228    /// When `alc.llm` is called and resumed with host-reported usage,
1229    /// `Done.usage` must be `Some(TokenUsage { prompt_tokens: Some(10), completion_tokens: Some(5) })`.
1230    /// Verifies both `on_paused` wiring and `on_response_fed` propagation.
1231    #[tokio::test]
1232    async fn usage_aggregate_some_for_run_with_llm_call() {
1233        use algocline_core::execution::{ResumePayload, TerminalOutcome};
1234        use algocline_core::TokenUsage;
1235
1236        let executor = make_executor().await;
1237        let (registry, _tmp) = make_registry(executor);
1238
1239        let sid = registry
1240            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
1241            .await
1242            .expect("spawn");
1243
1244        // Wait for Paused state.
1245        let mut retries = 0;
1246        loop {
1247            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1248            if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
1249                break;
1250            }
1251            retries += 1;
1252            assert!(retries < 500, "session did not reach Paused state");
1253        }
1254
1255        // Resume with host-reported usage.
1256        registry
1257            .resume(
1258                &sid,
1259                ResumePayload::Single {
1260                    query_id: "q-0".into(),
1261                    response: "answer".into(),
1262                    usage: Some(TokenUsage {
1263                        prompt_tokens: Some(10),
1264                        completion_tokens: Some(5),
1265                    }),
1266                },
1267            )
1268            .await
1269            .expect("resume");
1270
1271        let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1272        match outcome {
1273            TerminalOutcome::Done(result) => {
1274                assert_eq!(
1275                    result.usage,
1276                    Some(TokenUsage {
1277                        prompt_tokens: Some(10),
1278                        completion_tokens: Some(5),
1279                    }),
1280                    "Done.usage must reflect host-reported token counts"
1281                );
1282            }
1283            other => panic!("expected Done, got: {other:?}"),
1284        }
1285    }
1286
1287    // -----------------------------------------------------------------------
1288    // usage_aggregate_uses_estimates_when_usage_omitted (test (d))
1289    // -----------------------------------------------------------------------
1290
1291    /// When `alc.llm` is called but resumed with `usage: None`, `Done.usage`
1292    /// must be `Some` with non-zero estimated values (Estimated source from
1293    /// prompt length heuristic in `MetricsObserver::on_paused`).
1294    #[tokio::test]
1295    async fn usage_aggregate_uses_estimates_when_usage_omitted() {
1296        use algocline_core::execution::{ResumePayload, TerminalOutcome};
1297
1298        let executor = make_executor().await;
1299        let (registry, _tmp) = make_registry(executor);
1300
1301        let sid = registry
1302            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
1303            .await
1304            .expect("spawn");
1305
1306        // Wait for Paused state.
1307        let mut retries = 0;
1308        loop {
1309            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1310            if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
1311                break;
1312            }
1313            retries += 1;
1314            assert!(retries < 500, "session did not reach Paused state");
1315        }
1316
1317        // Resume without host-reported usage (observer uses Estimated values).
1318        registry
1319            .resume(
1320                &sid,
1321                ResumePayload::Single {
1322                    query_id: "q-0".into(),
1323                    response: "answer".into(),
1324                    usage: None,
1325                },
1326            )
1327            .await
1328            .expect("resume");
1329
1330        let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1331        match outcome {
1332            TerminalOutcome::Done(result) => {
1333                let usage = result
1334                    .usage
1335                    .expect("Done.usage must be Some when alc.llm was called");
1336                assert!(
1337                    usage.prompt_tokens.unwrap_or(0) > 0
1338                        || usage.completion_tokens.unwrap_or(0) > 0,
1339                    "Done.usage must have non-zero estimated tokens, got: {usage:?}"
1340                );
1341            }
1342            other => panic!("expected Done, got: {other:?}"),
1343        }
1344    }
1345}