algocline_engine/execution/registry.rs
1//! `SessionRegistryV2` — the engine-level session lifecycle manager for the v2 path.
2//!
3//! Coexists with the legacy `SessionRegistry` (`session.rs`) without modifying it.
4//! New callers (Subtask 3's `AppService::ExecutionService` impl) use this registry.
5//!
6//! # Design invariants
7//!
8//! - **Invariant 6**: `spawn_v2()` returns the `SessionId` immediately; execution
9//! runs in the background via `tokio::spawn(driver_loop(...))`.
10//! - **Crux R1**: No `rmcp::*`, `progressToken`, `_meta`, `notifications/*`, or
11//! `mcp_`-prefixed identifiers appear anywhere in this module.
12//! - **Crux R2**: Cancellation uses `CancellationToken::cancel()`; no
13//! `JoinHandle::abort()` or process kill path exists.
14//! - **Crux R3**: `observe()` is a sync `fn` that calls `bus_tx.subscribe()` and
15//! returns a valid handle with zero pre-registered observers.
16//! - **K-4**: The `sessions` `RwLock` is never held across `.await` points; the
17//! `clone-then-release` pattern is used throughout.
18
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicI64, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23
24use algocline_core::execution::{
25 AwaitError, CancelError, CancelReason, ExecutionState, ExecutionStateTag, ObserveError,
26 ObserverHandle, PauseKind, ProgressEvent, ResumeError, ResumeOutcome, SessionId, SpawnError,
27 StateError, TerminalOutcome,
28};
29use algocline_core::{ExecutionMetrics, ExecutionObserver, QueryId, TokenUsage};
30use tokio::sync::{Mutex, RwLock};
31use tokio_util::sync::CancellationToken;
32
33use super::driver::{build_cancel_info, driver_loop, now_ms, transition_state, DriverContext};
34use super::observer::BroadcastObserverHandle;
35use super::record::{RespTxsMap, SessionRecord};
36use crate::card::FileCardStore;
37use crate::executor::Executor;
38use crate::state::JsonFileStore;
39
40// ---------------------------------------------------------------------------
41// SessionRegistryV2
42// ---------------------------------------------------------------------------
43
44/// Registry that manages the lifecycle of v2 execution sessions.
45///
46/// `Clone` is cheap — the inner `Arc<RwLock<...>>` is reference-counted.
47#[derive(Clone)]
48pub struct SessionRegistryV2 {
49 sessions: Arc<RwLock<HashMap<SessionId, Arc<SessionRecord>>>>,
50 executor: Arc<Executor>,
51 state_store: Arc<JsonFileStore>,
52 card_store: Arc<FileCardStore>,
53 scenarios_dir: std::path::PathBuf,
54}
55
56impl SessionRegistryV2 {
57 /// Create a new empty registry backed by `executor`, with the storage paths
58 /// that will be injected into each spawned VM session.
59 ///
60 /// The `state_store` / `card_store` / `scenarios_dir` mirror the legacy
61 /// `AppService` resolution against the `AppConfig::app_dir()` layout, so a
62 /// v2 caller produces the same on-disk side effects as a legacy caller.
63 pub fn new(
64 executor: Arc<Executor>,
65 state_store: Arc<JsonFileStore>,
66 card_store: Arc<FileCardStore>,
67 scenarios_dir: std::path::PathBuf,
68 ) -> Self {
69 Self {
70 sessions: Arc::new(RwLock::new(HashMap::new())),
71 executor,
72 state_store,
73 card_store,
74 scenarios_dir,
75 }
76 }
77
78 // -----------------------------------------------------------------------
79 // spawn_v2
80 // -----------------------------------------------------------------------
81
82 /// Start a new v2 execution session, returning the `SessionId` immediately.
83 ///
84 /// Execution proceeds in the background via `tokio::spawn(driver_loop(...))`.
85 /// The caller receives the `SessionId` without waiting for execution to complete
86 /// or for the first event (Invariant 6 / debt #40955).
87 ///
88 /// Only [`algocline_core::execution::SpecKind::Run`] is supported in this subtask.
89 /// Other variants return [`SpawnError::InvalidSpec`]. Subtask 3 will extend this
90 /// to handle `Advice` and `Eval` through the full `AppService` path.
91 ///
92 /// # Errors
93 /// - [`SpawnError::Engine`] — the executor failed to start the session.
94 /// - [`SpawnError::InvalidSpec`] — the provided spec is malformed or uses an
95 /// unsupported kind.
96 pub async fn spawn_v2(
97 &self,
98 spec: algocline_core::execution::SessionSpec,
99 ) -> Result<SessionId, SpawnError> {
100 use algocline_core::execution::SpecKind;
101
102 // Extract code from the spec kind. Only Run is supported here.
103 let code = match spec.kind {
104 SpecKind::Run { code } => code,
105 other => {
106 return Err(SpawnError::InvalidSpec(format!(
107 "SessionRegistryV2::spawn_v2 only supports SpecKind::Run; got {:?}",
108 std::mem::discriminant(&other)
109 )));
110 }
111 };
112
113 if code.trim().is_empty() {
114 return Err(SpawnError::InvalidSpec("code must not be empty".into()));
115 }
116
117 let ctx = spec.ctx.unwrap_or_else(|| serde_json::json!({}));
118
119 // Start the per-session VM using the storage paths injected at
120 // registry construction (mirrors legacy AppService::start_and_tick).
121 let session = self
122 .executor
123 .start_session(
124 code,
125 ctx,
126 vec![], // extra_lib_paths — populated by Advice/Eval kinds later
127 vec![], // variant_pkgs — populated by Advice/Eval kinds later
128 Arc::clone(&self.state_store),
129 Arc::clone(&self.card_store),
130 self.scenarios_dir.clone(),
131 )
132 .await
133 .map_err(SpawnError::Engine)?;
134
135 let (exec_task, llm_rx, vm_driver, metrics) = session.into_driver_parts();
136
137 // Build shared components — all constructed before spawning the task.
138 let state: Arc<Mutex<ExecutionState>> = Arc::new(Mutex::new(ExecutionState::Running));
139 let cancel_token = CancellationToken::new();
140 let resp_txs: RespTxsMap = Arc::new(Mutex::new(HashMap::new()));
141 // Wall-clock ms timestamp for idle-time GC (Crux #3 legacy parity).
142 // Initialised to now_ms() so a session that is evicted before driver_loop
143 // even starts is treated as "just spawned" rather than immediately expired.
144 let last_active: Arc<AtomicI64> = Arc::new(AtomicI64::new(now_ms()));
145 // Wrap metrics in Arc and clone into both DriverContext and SessionRecord
146 // so both can access the same SessionStatus accumulator (K-4 clone-then-release).
147 let metrics_arc: Arc<ExecutionMetrics> = Arc::new(metrics);
148
149 // Crux R3 (sink-free): the receiver returned alongside `bus_tx` is
150 // dropped immediately. `bus_tx.send()` returns `Err(SendError)` when
151 // 0 observers are subscribed, but every call site in `driver_loop`
152 // uses `let _ = bus_tx.send(...)` to absorb the result — the caller
153 // is never crashed by 0 observers. See
154 // `record::tests::bus_tx_does_not_crash_caller_with_zero_observers`.
155 let (bus_tx, _) = tokio::sync::broadcast::channel::<ProgressEvent>(256);
156
157 let session_id = SessionId::generate();
158
159 // Bundle shared resources for driver_loop.
160 let ctx = DriverContext {
161 state: Arc::clone(&state),
162 bus_tx: bus_tx.clone(),
163 cancel_token: cancel_token.clone(),
164 resp_txs: Arc::clone(&resp_txs),
165 last_active: Arc::clone(&last_active),
166 metrics: Arc::clone(&metrics_arc),
167 };
168
169 let join_handle = tokio::spawn(async move {
170 // vm_driver must stay alive for the duration of the session.
171 let _keep_driver = vm_driver;
172 driver_loop(ctx, exec_task, llm_rx).await;
173 });
174
175 // Assemble the record with all shared fields.
176 let record = Arc::new(SessionRecord {
177 state,
178 bus_tx,
179 last_active,
180 cancel_token,
181 join_handle: Mutex::new(Some(join_handle)),
182 resp_txs,
183 first_cancel_info: Mutex::new(None),
184 metrics: metrics_arc,
185 });
186
187 // Insert into registry.
188 {
189 let mut map = self.sessions.write().await;
190 map.insert(session_id.clone(), record);
191 }
192 Ok(session_id)
193 }
194
195 // -----------------------------------------------------------------------
196 // state
197 // -----------------------------------------------------------------------
198
199 /// Query the current [`ExecutionState`] of a session.
200 ///
201 /// # Errors
202 /// - [`StateError::NotFound`] — no session with the given id exists.
203 pub async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
204 let record = self
205 .get_record(id)
206 .await
207 .ok_or_else(|| StateError::NotFound(id.clone()))?;
208 let guard = record.state.lock().await;
209 Ok(guard.clone())
210 }
211
212 // -----------------------------------------------------------------------
213 // resume
214 // -----------------------------------------------------------------------
215
216 /// Resume a paused session by delivering LLM responses.
217 ///
218 /// # Errors
219 /// - [`ResumeError::NotFound`] — no session with the given id exists.
220 /// - [`ResumeError::NotPaused`] — the session is not in the `Paused` state.
221 /// - [`ResumeError::AlreadyCancelled`] — the session is already cancelled.
222 pub async fn resume(
223 &self,
224 id: &SessionId,
225 payload: algocline_core::execution::ResumePayload,
226 ) -> Result<ResumeOutcome, ResumeError> {
227 use algocline_core::execution::ResumePayload;
228
229 let record = self
230 .get_record(id)
231 .await
232 .ok_or_else(|| ResumeError::NotFound(id.clone()))?;
233
234 // checkpoint C: at resume entry
235 // If the token is already cancelled, reject the resume immediately.
236 if record.cancel_token.is_cancelled() {
237 return Err(ResumeError::AlreadyCancelled);
238 }
239
240 // Verify the session is Paused (or Cancelled after the token check above).
241 let (actual_tag, pause_kind) = {
242 let guard = record.state.lock().await;
243 let tag = guard.tag();
244 let kind = if let ExecutionState::Paused(ref info) = *guard {
245 info.kind
246 } else {
247 PauseKind::Single
248 };
249 (tag, kind)
250 };
251
252 match actual_tag {
253 ExecutionStateTag::Cancelled => return Err(ResumeError::AlreadyCancelled),
254 ExecutionStateTag::Paused => {} // continue
255 _ => return Err(ResumeError::NotPaused { actual_tag }),
256 }
257
258 // Extract query responses from the payload, preserving per-response usage.
259 let responses: Vec<(String, String, Option<TokenUsage>)> = match payload {
260 ResumePayload::Single {
261 query_id,
262 response,
263 usage,
264 } => vec![(query_id, response, usage)],
265 ResumePayload::Batch(batch) => batch
266 .into_iter()
267 .map(|r| (r.query_id, r.response, r.usage))
268 .collect(),
269 };
270
271 // Deliver responses via the shared resp_txs map.
272 {
273 let mut txs = record.resp_txs.lock().await;
274 for (qid_str, response, _usage) in &responses {
275 let qid = QueryId::parse(qid_str);
276 match txs.remove(&qid) {
277 Some(tx) => {
278 if let Err(_e) = tx.send(Ok(response.clone())) {
279 tracing::debug!(
280 "registry::resume: oneshot receiver already dropped for query {qid_str}"
281 );
282 }
283 }
284 None => {
285 tracing::debug!("registry::resume: no pending tx for query {qid_str}");
286 }
287 }
288 }
289 }
290
291 // Propagate per-response usage to the metrics observer (Crux 1: same Arc).
292 // Observer call is outside the txs lock scope to keep cancel/lock paths intact.
293 let observer = record.metrics.create_observer();
294 for (qid_str, response, usage) in &responses {
295 let qid = QueryId::parse(qid_str);
296 observer.on_response_fed(&qid, response, usage.as_ref());
297 }
298
299 // Transition state from Paused → Running.
300 {
301 let guard = record.state.lock().await;
302 if guard.tag() == ExecutionStateTag::Paused {
303 drop(guard);
304 transition_state(&record.state, &record.bus_tx, ExecutionState::Running).await;
305 let _ = record.bus_tx.send(ProgressEvent::ResumeAccepted {
306 payload_kind: pause_kind,
307 at: now_ms(),
308 });
309 }
310 }
311
312 Ok(ResumeOutcome::Continued)
313 }
314
315 // -----------------------------------------------------------------------
316 // cancel
317 // -----------------------------------------------------------------------
318
319 /// Request cooperative cancellation of a session.
320 ///
321 /// Idempotent: returns `Ok(())` for sessions already in a terminal state.
322 ///
323 /// # Errors
324 /// - [`CancelError::NotFound`] — no session with the given id exists.
325 pub async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
326 let record = self
327 .get_record(id)
328 .await
329 .ok_or_else(|| CancelError::NotFound(id.clone()))?;
330
331 // Idempotency: already terminal → Ok.
332 {
333 let guard = record.state.lock().await;
334 if matches!(
335 guard.tag(),
336 ExecutionStateTag::Done | ExecutionStateTag::Failed | ExecutionStateTag::Cancelled
337 ) {
338 return Ok(());
339 }
340 }
341
342 // Store the first CancelInfo (idempotent: only set once).
343 {
344 let mut first = record.first_cancel_info.lock().await;
345 if first.is_none() {
346 let info = build_cancel_info(&record.state, reason).await;
347 *first = Some(info);
348 }
349 }
350
351 // Signal the driver (Crux R2: cooperative — no abort).
352 record.cancel_token.cancel();
353
354 // For Paused sessions, transition immediately: the driver is blocked
355 // waiting for a resume and won't hit a checkpoint on its own.
356 let should_transition = {
357 let guard = record.state.lock().await;
358 guard.tag() == ExecutionStateTag::Paused
359 };
360 if should_transition {
361 let cancel_info_opt = {
362 let first = record.first_cancel_info.lock().await;
363 first.clone()
364 };
365 if let Some(info) = cancel_info_opt {
366 transition_state(
367 &record.state,
368 &record.bus_tx,
369 ExecutionState::Cancelled(info),
370 )
371 .await;
372 }
373 }
374
375 Ok(())
376 }
377
378 // -----------------------------------------------------------------------
379 // observe (sync fn — Crux R3)
380 // -----------------------------------------------------------------------
381
382 /// Subscribe to the progress event stream for a session.
383 ///
384 /// This is a **synchronous** `fn`: `broadcast::Sender::subscribe()` is
385 /// synchronous and does not perform I/O. Multiple concurrent subscribers
386 /// each receive the full event stream independently (Crux R3).
387 ///
388 /// # Errors
389 /// - [`ObserveError::NotFound`] — no session with the given id exists, **or**
390 /// `try_read()` experienced lock contention (write lock held by `spawn`).
391 /// The contention path emits `tracing::warn!(target = "session.observe", ...)`;
392 /// callers cannot distinguish it from a true absent-session result.
393 pub fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
394 // Non-blocking read; the write lock is only held very briefly during spawn.
395 match self.sessions.try_read() {
396 Ok(map) => {
397 let record = map
398 .get(id)
399 .ok_or_else(|| ObserveError::NotFound(id.clone()))?;
400 Ok(Box::new(BroadcastObserverHandle::new(&record.bus_tx)))
401 }
402 Err(_) => {
403 tracing::warn!(
404 target = "session.observe",
405 session_id = %id,
406 "try_read contention; surfacing as NotFound"
407 );
408 Err(ObserveError::NotFound(id.clone()))
409 }
410 }
411 }
412
413 // -----------------------------------------------------------------------
414 // await_terminal
415 // -----------------------------------------------------------------------
416
417 /// Await the terminal state of a session.
418 ///
419 /// Polls the shared state until it reaches a terminal variant (`Done`,
420 /// `Cancelled`, or `Failed`). The `JoinHandle` is never `.abort()`-ed
421 /// (Crux R2).
422 ///
423 /// # Errors
424 /// - [`AwaitError::NotFound`] — no session with the given id exists.
425 pub async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
426 let record = self
427 .get_record(id)
428 .await
429 .ok_or_else(|| AwaitError::NotFound(id.clone()))?;
430
431 // Single-awaiter path: take the JoinHandle and await `driver_loop`
432 // completion directly. Replaces the previous `yield_now()` polling
433 // loop that occupied a tokio worker slot scheduling-wise even though
434 // it consumed no CPU. The `driver_loop` guarantees a terminal
435 // `transition_state` before returning, so once `handle.await` resolves
436 // the state is guaranteed terminal.
437 let handle_opt = {
438 let mut guard = record.join_handle.lock().await;
439 guard.take()
440 };
441
442 if let Some(handle) = handle_opt {
443 handle
444 .await
445 .map_err(|e| AwaitError::Joined(format!("driver_loop join error: {e}")))?;
446 }
447 // (None branch: another caller has already taken the handle. Either
448 // they are still awaiting it — in which case the driver_loop has not
449 // yet transitioned to terminal — or they have already finished, in
450 // which case the state is terminal. We fall through to a single
451 // state read; the rare concurrent race returns `AwaitError::Joined`.)
452
453 let guard = record.state.lock().await;
454 match &*guard {
455 ExecutionState::Done(result) => Ok(TerminalOutcome::Done(result.clone())),
456 ExecutionState::Cancelled(info) => Ok(TerminalOutcome::Cancelled(info.clone())),
457 ExecutionState::Failed(info) => Ok(TerminalOutcome::Failed(info.clone())),
458 other => Err(AwaitError::Joined(format!(
459 "await_terminal: driver_loop completed but state is {:?} (concurrent awaiter race)",
460 other.tag()
461 ))),
462 }
463 }
464
465 // -----------------------------------------------------------------------
466 // Internal helpers
467 // -----------------------------------------------------------------------
468
469 // -----------------------------------------------------------------------
470 // spawn_gc_task
471 // -----------------------------------------------------------------------
472
473 /// Spawn a background GC task that periodically evicts idle, terminal sessions.
474 ///
475 /// Mirrors the legacy `SessionRegistry::spawn_gc_task` contract (Crux #3 legacy
476 /// parity) with two extensions:
477 ///
478 /// 1. **Subscriber-count gate** (Crux #1): a session is only evicted when
479 /// `bus_tx.receiver_count() == 0` at the moment the write guard is held,
480 /// ensuring no use-after-eviction for active observers.
481 /// 2. **Parameterised `interval`** (Crux #2): callers can supply a sub-second
482 /// interval for test determinism without requiring `tokio::time::pause`.
483 ///
484 /// The `JoinHandle` returned by `tokio::spawn` is intentionally dropped —
485 /// the task runs until process exit (legacy fire-and-forget contract).
486 ///
487 /// # K-4 invariant
488 ///
489 /// The `sessions` write guard is acquired once per GC tick. All operations
490 /// inside the guard (`receiver_count()`, `AtomicI64::load`, `HashMap::remove`)
491 /// are **synchronous** — no `.await` is called while the guard is held.
492 pub fn spawn_gc_task(&self, ttl: Duration, interval: Duration) {
493 let sessions = Arc::clone(&self.sessions);
494 tokio::spawn(async move {
495 let mut ticker = tokio::time::interval(interval);
496 loop {
497 ticker.tick().await;
498 // Acquire the write guard once per tick. All reads and removes
499 // within this block are sync — no `.await` inside the guard (K-4).
500 let mut map = sessions.write().await;
501 let mut to_evict: Vec<SessionId> = Vec::new();
502 for (id, record) in map.iter() {
503 // Crux #1: check subscriber count atomically with the guard held.
504 // `receiver_count()` is sync (no lock required on its own), but
505 // holding the write guard here means `observe()` cannot attach a
506 // new subscriber via `try_read()` concurrently — TOCTOU excluded.
507 let no_subscribers = record.bus_tx.receiver_count() == 0;
508 let last_ms = record.last_active.load(Ordering::Relaxed);
509 if no_subscribers && is_expired_v2(last_ms, ttl) {
510 to_evict.push(id.clone());
511 }
512 }
513 for id in &to_evict {
514 tracing::info!(session_id = %id, "GC: reaping expired v2 session");
515 map.remove(id);
516 }
517 }
518 });
519 }
520
521 // -----------------------------------------------------------------------
522 // Internal helpers
523 // -----------------------------------------------------------------------
524
525 /// Clone-then-release lookup (K-4): the lock is dropped before returning.
526 async fn get_record(&self, id: &SessionId) -> Option<Arc<SessionRecord>> {
527 let map = self.sessions.read().await;
528 map.get(id).cloned()
529 }
530}
531
532// ---------------------------------------------------------------------------
533// GC helpers (module-private)
534// ---------------------------------------------------------------------------
535
536/// Returns `true` when the session has been idle for at least `ttl`.
537///
538/// Uses wall-clock milliseconds matching the legacy `is_expired_impl` semantics:
539/// `now_ms() - last_active_ms >= ttl.as_millis()`.
540///
541/// The legacy implementation uses `Instant` (monotonic) whereas this uses
542/// `SystemTime` (wall-clock) — identical to the `now_ms()` helper in `driver.rs`
543/// and to `Session.last_activity_ms` in the legacy codebase (Crux #3 parity).
544fn is_expired_v2(last_active_ms: i64, ttl: Duration) -> bool {
545 let now = super::driver::now_ms();
546 let elapsed_ms = now.saturating_sub(last_active_ms);
547 elapsed_ms >= ttl.as_millis() as i64
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use algocline_core::execution::{
554 CancelCode, CancelReason, ExecutionState, SessionSpec, SpecKind,
555 };
556 use std::sync::Arc;
557
558 async fn make_executor() -> Arc<Executor> {
559 Arc::new(Executor::new(vec![]).await.expect("Executor::new"))
560 }
561
562 /// Construct a registry backed by per-test tempdir paths so the legacy
563 /// AppConfig::app_dir() layout is approximated without touching the user's
564 /// `~/.algocline` directory.
565 fn make_registry(executor: Arc<Executor>) -> (SessionRegistryV2, tempfile::TempDir) {
566 let tmp = tempfile::tempdir().expect("tempdir");
567 let state_store = Arc::new(JsonFileStore::new(tmp.path().join("state")));
568 let card_store = Arc::new(FileCardStore::new(tmp.path().join("cards")));
569 let scenarios_dir = tmp.path().join("scenarios");
570 (
571 SessionRegistryV2::new(executor, state_store, card_store, scenarios_dir),
572 tmp,
573 )
574 }
575
576 fn simple_spec(code: &str) -> SessionSpec {
577 SessionSpec {
578 kind: SpecKind::Run {
579 code: code.to_owned(),
580 },
581 project_root: None,
582 ctx: None,
583 }
584 }
585
586 fn cancel_reason() -> CancelReason {
587 CancelReason {
588 code: CancelCode::User,
589 detail: None,
590 requested_at: now_ms(),
591 }
592 }
593
594 // -----------------------------------------------------------------------
595 // spawn_returns_session_id_immediately (debt #40955)
596 // -----------------------------------------------------------------------
597
598 /// `spawn_v2` must return `SessionId` without blocking on execution.
599 #[tokio::test]
600 async fn spawn_returns_session_id_immediately() {
601 let executor = make_executor().await;
602 let (registry, _tmp) = make_registry(executor);
603
604 let start = std::time::Instant::now();
605 let result = tokio::time::timeout(
606 std::time::Duration::from_millis(200),
607 registry.spawn_v2(simple_spec("return 42")),
608 )
609 .await;
610
611 assert!(result.is_ok(), "spawn_v2 must complete within 200ms");
612 assert!(
613 result.unwrap().is_ok(),
614 "spawn_v2 must return Ok(SessionId)"
615 );
616
617 let elapsed = start.elapsed();
618 assert!(
619 elapsed < std::time::Duration::from_millis(150),
620 "spawn_v2 took too long: {elapsed:?}"
621 );
622 }
623
624 // -----------------------------------------------------------------------
625 // state_query_running
626 // -----------------------------------------------------------------------
627
628 /// Immediately after spawn, `state()` must return Running or Paused.
629 #[tokio::test]
630 async fn state_query_running() {
631 let executor = make_executor().await;
632 let (registry, _tmp) = make_registry(executor);
633
634 // Lua that pauses immediately so the session is observable.
635 let sid = registry
636 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
637 .await
638 .expect("spawn");
639
640 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
641
642 let state = registry.state(&sid).await.expect("state");
643 assert!(
644 matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
645 "state just after spawn must be Running or Paused, got: {:?}",
646 state.tag()
647 );
648 }
649
650 // -----------------------------------------------------------------------
651 // cancel_at_checkpoint_c_at_resume_entry
652 // -----------------------------------------------------------------------
653
654 /// `resume()` on a cancelled session must return `AlreadyCancelled`.
655 #[tokio::test]
656 async fn cancel_at_checkpoint_c_at_resume_entry() {
657 use algocline_core::execution::{ResumeError, ResumePayload};
658
659 let executor = make_executor().await;
660 let (registry, _tmp) = make_registry(executor);
661
662 let sid = registry
663 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
664 .await
665 .expect("spawn");
666
667 // Wait for Paused.
668 let mut retries = 0;
669 loop {
670 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
671 if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
672 break;
673 }
674 retries += 1;
675 assert!(retries < 50, "session did not reach Paused state");
676 }
677
678 registry
679 .cancel(&sid, cancel_reason())
680 .await
681 .expect("cancel");
682
683 // checkpoint C: at resume entry
684 let result = registry
685 .resume(
686 &sid,
687 ResumePayload::Single {
688 query_id: "q".into(),
689 response: "4".into(),
690 usage: None,
691 },
692 )
693 .await;
694
695 assert!(
696 matches!(result, Err(ResumeError::AlreadyCancelled)),
697 "resume on cancelled session must return AlreadyCancelled, got: {result:?}"
698 );
699 }
700
701 // -----------------------------------------------------------------------
702 // cancel_idempotent
703 // -----------------------------------------------------------------------
704
705 #[tokio::test]
706 async fn cancel_idempotent() {
707 let executor = make_executor().await;
708 let (registry, _tmp) = make_registry(executor);
709
710 let sid = registry
711 .spawn_v2(simple_spec("return 1"))
712 .await
713 .expect("spawn");
714
715 registry
716 .cancel(&sid, cancel_reason())
717 .await
718 .expect("first cancel");
719 registry
720 .cancel(&sid, cancel_reason())
721 .await
722 .expect("second cancel");
723 }
724
725 // -----------------------------------------------------------------------
726 // await_terminal returns Done without busy-polling
727 // -----------------------------------------------------------------------
728
729 /// Regression for #2 (case A): `await_terminal` must complete by awaiting
730 /// the `driver_loop` `JoinHandle` directly (single-awaiter `take` +
731 /// `.await`) instead of polling `state` in a `yield_now()` loop. We can't
732 /// observe scheduler occupancy from a test, but we can verify the
733 /// behavioural contract: (1) the call returns the correct `TerminalOutcome`,
734 /// (2) it returns within a tight wall-clock budget without sleep, and
735 /// (3) a second concurrent caller does not panic.
736 #[tokio::test]
737 async fn await_terminal_returns_done_for_trivial_script() {
738 let executor = make_executor().await;
739 let (registry, _tmp) = make_registry(executor);
740
741 let sid = registry
742 .spawn_v2(simple_spec("return 42"))
743 .await
744 .expect("spawn");
745
746 let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
747 match outcome {
748 TerminalOutcome::Done(result) => {
749 assert_eq!(result.value, serde_json::json!(42));
750 }
751 other => panic!("expected Done, got: {other:?}"),
752 }
753 }
754
755 /// Regression for #2 (case A) single-awaiter discipline: when two callers
756 /// race on `await_terminal`, the second caller (which observes `None` after
757 /// the first has taken the handle) must NOT panic. It must either return
758 /// the same terminal outcome (if the first has already finished) or an
759 /// `AwaitError::Joined` (the documented race fallback).
760 #[tokio::test]
761 async fn await_terminal_does_not_panic_on_second_concurrent_caller() {
762 let executor = make_executor().await;
763 let (registry, _tmp) = make_registry(executor);
764
765 let sid = registry
766 .spawn_v2(simple_spec("return 99"))
767 .await
768 .expect("spawn");
769
770 let r1 = registry.clone();
771 let r2 = registry.clone();
772 let s1 = sid.clone();
773 let s2 = sid.clone();
774
775 let h1 = tokio::spawn(async move { r1.await_terminal(&s1).await });
776 let h2 = tokio::spawn(async move { r2.await_terminal(&s2).await });
777
778 let out1 = h1.await.expect("h1 join");
779 let out2 = h2.await.expect("h2 join");
780
781 // First-caller path must succeed with the real outcome.
782 let first_ok = matches!(&out1, Ok(TerminalOutcome::Done(_)))
783 || matches!(&out2, Ok(TerminalOutcome::Done(_)));
784 assert!(
785 first_ok,
786 "at least one caller must observe Done; got out1={out1:?}, out2={out2:?}"
787 );
788 // Second caller may have observed Joined (race) or Done; either is OK,
789 // neither must panic — which we've already verified by the join above.
790 }
791
792 // -----------------------------------------------------------------------
793 // observe_sink_free (Crux R3 — registry level)
794 // -----------------------------------------------------------------------
795
796 /// `observe()` must succeed and return a valid handle even with 0 prior observers.
797 #[tokio::test]
798 async fn observe_sink_free_registry() {
799 let executor = make_executor().await;
800 let (registry, _tmp) = make_registry(executor);
801
802 let sid = registry
803 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
804 .await
805 .expect("spawn");
806
807 // observe() before any subscriber exists must succeed.
808 let handle = registry.observe(&sid);
809 assert!(
810 handle.is_ok(),
811 "observe() must return Ok even with 0 prior observers"
812 );
813 }
814
815 // -----------------------------------------------------------------------
816 // observe_multi_subscriber_fan_out (Crux R3 — registry level)
817 // -----------------------------------------------------------------------
818
819 /// Multiple independent observers each get the same events.
820 #[tokio::test]
821 async fn observe_multi_subscriber_fan_out_registry() {
822 use algocline_core::execution::ObserverRecvError;
823
824 let executor = make_executor().await;
825 let (registry, _tmp) = make_registry(executor);
826
827 // A script that returns immediately — the driver will publish Done.
828 let sid = registry
829 .spawn_v2(simple_spec("return 99"))
830 .await
831 .expect("spawn");
832
833 // Subscribe 3 observers.
834 let mut h1 = registry.observe(&sid).expect("observe h1");
835 let mut h2 = registry.observe(&sid).expect("observe h2");
836 let mut h3 = registry.observe(&sid).expect("observe h3");
837
838 // Wait for terminal so we know events have been published.
839 let _ = registry.await_terminal(&sid).await;
840
841 // Each observer must receive at least the terminal StateTransition.
842 // Drain with idle-timeout: bus_tx is retained in SessionRecord for
843 // sink-free late-subscribe (Crux R3), so Closed never fires while the
844 // registry is alive. A 100ms idle window after await_terminal() is
845 // sufficient — all events are already buffered.
846 use std::time::Duration;
847 for (label, handle) in [("h1", &mut h1), ("h2", &mut h2), ("h3", &mut h3)] {
848 let mut got_transition = false;
849 loop {
850 match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
851 Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
852 Ok(Ok(_)) => {}
853 Ok(Err(ObserverRecvError::Closed)) => break,
854 Ok(Err(ObserverRecvError::Lagged(_))) => {}
855 Err(_) => break, // idle-timeout: no more events coming
856 }
857 }
858 assert!(
859 got_transition,
860 "{label}: must receive at least one StateTransition event"
861 );
862 }
863 }
864
865 // -----------------------------------------------------------------------
866 // AC#5a — gc_evicts_terminal_session_after_ttl
867 // -----------------------------------------------------------------------
868
869 /// GC must remove a terminal session (no subscribers) after TTL has elapsed
870 /// and one full interval tick has fired.
871 ///
872 /// Covers: `tokio::time::interval` + `AtomicI64::load` + `RwLock::write` +
873 /// `receiver_count == 0` (Crux #1 / concurrency-analysis §2 5a).
874 #[tokio::test]
875 async fn gc_evicts_terminal_session_after_ttl() {
876 use algocline_core::execution::ObserveError;
877 use std::time::Duration;
878
879 let executor = make_executor().await;
880 let (registry, _tmp) = make_registry(executor);
881
882 let ttl = Duration::from_millis(100);
883 let interval = Duration::from_millis(50);
884
885 let sid = registry
886 .spawn_v2(simple_spec("return 1"))
887 .await
888 .expect("spawn");
889
890 // Wait for the session to complete (terminal, no subscribers).
891 registry.await_terminal(&sid).await.expect("await_terminal");
892
893 // Sleep beyond one full GC interval + TTL + slack so the GC has had at
894 // least one opportunity to evict (R4 fallback: interval + ttl + 50ms).
895 tokio::time::sleep(interval + ttl + Duration::from_millis(50)).await;
896
897 registry.spawn_gc_task(ttl, interval);
898
899 // Sleep again to let the newly spawned GC run at least one tick.
900 tokio::time::sleep(interval + Duration::from_millis(50)).await;
901
902 // The session must now be gone.
903 assert!(
904 matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
905 "session must be evicted after TTL + interval"
906 );
907 }
908
909 // -----------------------------------------------------------------------
910 // AC#5b — gc_does_not_evict_session_with_active_subscriber
911 // -----------------------------------------------------------------------
912
913 /// GC must NOT evict a session that still has active subscribers, even after
914 /// TTL has elapsed. Once the subscriber is dropped, subsequent GC ticks must
915 /// evict the session.
916 ///
917 /// Covers: `broadcast::Sender::receiver_count` > 0 path (Crux #1 /
918 /// concurrency-analysis §2 5b).
919 #[tokio::test]
920 async fn gc_does_not_evict_session_with_active_subscriber() {
921 use algocline_core::execution::ObserveError;
922 use std::time::Duration;
923
924 let executor = make_executor().await;
925 let (registry, _tmp) = make_registry(executor);
926
927 let ttl = Duration::from_millis(100);
928 let interval = Duration::from_millis(50);
929
930 let sid = registry
931 .spawn_v2(simple_spec("return 2"))
932 .await
933 .expect("spawn");
934
935 // Acquire a subscriber *before* the session reaches terminal.
936 let _handle = registry.observe(&sid).expect("observe");
937
938 // Wait for terminal while subscriber is still held.
939 registry.await_terminal(&sid).await.expect("await_terminal");
940
941 // Start GC — session has receiver_count > 0, must NOT be evicted.
942 registry.spawn_gc_task(ttl, interval);
943
944 // Sleep well beyond TTL + interval.
945 tokio::time::sleep(interval + ttl + Duration::from_millis(50)).await;
946
947 // Session must still be present (subscriber is alive).
948 assert!(
949 registry.observe(&sid).is_ok(),
950 "session must NOT be evicted while a subscriber is held"
951 );
952
953 // Drop the subscriber — now eviction is permitted.
954 drop(_handle);
955
956 // Sleep for another interval + slack so GC ticks again after the drop.
957 tokio::time::sleep(interval + Duration::from_millis(50)).await;
958
959 // Now the session should be evicted.
960 assert!(
961 matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
962 "session must be evicted after subscriber is dropped and GC ticks"
963 );
964 }
965
966 // -----------------------------------------------------------------------
967 // AC#5c — gc_respects_interval_no_immediate_eviction
968 // -----------------------------------------------------------------------
969
970 /// GC must NOT evict a terminal session before the interval has fired,
971 /// even if TTL has already elapsed.
972 ///
973 /// Covers: `tokio::time::interval MissedTickBehavior::Burst` guard
974 /// (R4 / concurrency-analysis §2 5c).
975 #[tokio::test]
976 async fn gc_respects_interval_no_immediate_eviction() {
977 use std::time::Duration;
978
979 let executor = make_executor().await;
980 let (registry, _tmp) = make_registry(executor);
981
982 // Use a long interval so we can assert the session is still present
983 // after TTL has elapsed but before an interval tick fires.
984 let ttl = Duration::from_millis(20);
985 let interval = Duration::from_millis(500);
986
987 let sid = registry
988 .spawn_v2(simple_spec("return 3"))
989 .await
990 .expect("spawn");
991
992 registry.await_terminal(&sid).await.expect("await_terminal");
993
994 // Start GC after TTL has elapsed — the first tick fires up to `interval`
995 // from now, so we check immediately (well before the first tick).
996 tokio::time::sleep(ttl + Duration::from_millis(10)).await;
997 registry.spawn_gc_task(ttl, interval);
998
999 // Check immediately — no tick has fired yet.
1000 assert!(
1001 registry.observe(&sid).is_ok(),
1002 "session must NOT be evicted before first GC tick fires"
1003 );
1004 }
1005
1006 // -----------------------------------------------------------------------
1007 // AC#5d — test_atomic_last_active_updated_by_driver_loop
1008 // -----------------------------------------------------------------------
1009
1010 /// Concurrent writer (store) and reader (load) on `last_active` with
1011 /// Relaxed ordering must not panic or cause UB; final value must be > 0.
1012 ///
1013 /// Covers: `AtomicI64::store` + `AtomicI64::load` Relaxed ordering safety
1014 /// under concurrent access (concurrency-analysis §2 5d / Crux #3 invariant).
1015 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1016 async fn test_atomic_last_active_updated_by_driver_loop() {
1017 use std::sync::atomic::{AtomicI64, Ordering};
1018 use std::sync::Arc;
1019
1020 let last_active = Arc::new(AtomicI64::new(0));
1021
1022 let writer_la = Arc::clone(&last_active);
1023 let writer = tokio::spawn(async move {
1024 for _ in 0..1000 {
1025 writer_la.store(now_ms(), Ordering::Relaxed);
1026 tokio::task::yield_now().await;
1027 }
1028 });
1029
1030 let reader_la = Arc::clone(&last_active);
1031 let reader = tokio::spawn(async move {
1032 for _ in 0..1000 {
1033 let _ = reader_la.load(Ordering::Relaxed);
1034 tokio::task::yield_now().await;
1035 }
1036 });
1037
1038 writer.await.expect("writer task must not panic");
1039 reader.await.expect("reader task must not panic");
1040
1041 // After 1000 stores of now_ms() the value must be > 0.
1042 assert!(
1043 last_active.load(Ordering::Relaxed) > 0,
1044 "last_active must be updated to a non-zero wall-clock value"
1045 );
1046 }
1047
1048 // -----------------------------------------------------------------------
1049 // AC#5e — test_concurrent_observe_during_gc_tick
1050 // -----------------------------------------------------------------------
1051
1052 /// 8 concurrent tasks each calling `observe()` 100 times while GC is running
1053 /// must produce only `Ok` or `Err(NotFound)` — never a panic.
1054 ///
1055 /// Covers: `RwLock::try_read` vs `RwLock::write` mutual exclusion +
1056 /// `Arc<RwLock<HashMap>>` clone safety (concurrency-analysis §2 5e).
1057 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1058 async fn test_concurrent_observe_during_gc_tick() {
1059 use algocline_core::execution::ObserveError;
1060 use std::sync::Arc;
1061 use std::time::Duration;
1062
1063 let executor = make_executor().await;
1064 let (registry, _tmp) = make_registry(executor);
1065 let registry = Arc::new(registry);
1066
1067 let ttl = Duration::from_millis(10);
1068 let interval = Duration::from_millis(5);
1069
1070 let sid = registry
1071 .spawn_v2(simple_spec("return 42"))
1072 .await
1073 .expect("spawn");
1074
1075 registry.await_terminal(&sid).await.expect("await_terminal");
1076 registry.spawn_gc_task(ttl, interval);
1077
1078 let mut handles = Vec::new();
1079 for _ in 0..8 {
1080 let reg = Arc::clone(®istry);
1081 let id = sid.clone();
1082 handles.push(tokio::spawn(async move {
1083 for _ in 0..100 {
1084 match reg.observe(&id) {
1085 Ok(_) | Err(ObserveError::NotFound(_)) => {}
1086 }
1087 tokio::task::yield_now().await;
1088 }
1089 }));
1090 }
1091
1092 for h in handles {
1093 h.await.expect("concurrent observe task must not panic");
1094 }
1095 }
1096
1097 // -----------------------------------------------------------------------
1098 // AC#5f — test_gc_task_spawn_survives_handle_drop
1099 // -----------------------------------------------------------------------
1100
1101 /// `spawn_gc_task` internally drops the `JoinHandle` (legacy fire-and-forget).
1102 /// Verify the GC loop continues running after `spawn_gc_task()` returns by
1103 /// asserting eviction occurs after the expected window.
1104 ///
1105 /// Covers: `tokio::task::spawn` "JoinHandle drop ≠ task abort" contract
1106 /// (concurrency-analysis §2 5f / Crux #2 legacy parity).
1107 #[tokio::test]
1108 async fn test_gc_task_spawn_survives_handle_drop() {
1109 use algocline_core::execution::ObserveError;
1110 use std::time::Duration;
1111
1112 let executor = make_executor().await;
1113 let (registry, _tmp) = make_registry(executor);
1114
1115 let ttl = Duration::from_millis(100);
1116 let interval = Duration::from_millis(50);
1117
1118 // Start GC first — handle is immediately dropped inside spawn_gc_task.
1119 registry.spawn_gc_task(ttl, interval);
1120
1121 let sid = registry
1122 .spawn_v2(simple_spec("return 99"))
1123 .await
1124 .expect("spawn");
1125
1126 registry.await_terminal(&sid).await.expect("await_terminal");
1127
1128 // Sleep long enough for at least 2 GC ticks after TTL.
1129 tokio::time::sleep(ttl + interval * 2 + Duration::from_millis(50)).await;
1130
1131 // The GC task (whose JoinHandle was dropped) must have continued running
1132 // and evicted the session.
1133 assert!(
1134 matches!(registry.observe(&sid), Err(ObserveError::NotFound(_))),
1135 "session must be evicted by the GC task even after its JoinHandle was dropped"
1136 );
1137 }
1138
1139 // -----------------------------------------------------------------------
1140 // AC#5g — test_arc_rwlock_hashmap_shared_across_clones
1141 // -----------------------------------------------------------------------
1142
1143 /// `SessionRegistryV2: Clone` shares the same underlying
1144 /// `Arc<RwLock<HashMap>>`. A session spawned via one clone must be visible
1145 /// from another clone, and GC started on one clone must evict sessions
1146 /// visible from the other.
1147 ///
1148 /// Covers: `Arc<RwLock<HashMap>>` Send + Sync + Clone shared-state contract
1149 /// (concurrency-analysis §2 5g).
1150 #[tokio::test]
1151 async fn test_arc_rwlock_hashmap_shared_across_clones() {
1152 use algocline_core::execution::ObserveError;
1153 use std::time::Duration;
1154
1155 let executor = make_executor().await;
1156 let (registry_a, _tmp) = make_registry(executor);
1157 let registry_b = registry_a.clone();
1158
1159 let ttl = Duration::from_millis(100);
1160 let interval = Duration::from_millis(50);
1161
1162 // Spawn via registry_a.
1163 let sid = registry_a
1164 .spawn_v2(simple_spec("return 7"))
1165 .await
1166 .expect("spawn via registry_a");
1167
1168 // Session must be visible from registry_b (shared Arc<RwLock<HashMap>>).
1169 assert!(
1170 registry_b.observe(&sid).is_ok(),
1171 "session spawned via registry_a must be visible from registry_b"
1172 );
1173
1174 // Wait for terminal.
1175 registry_a
1176 .await_terminal(&sid)
1177 .await
1178 .expect("await_terminal");
1179
1180 // Start GC via registry_b.
1181 registry_b.spawn_gc_task(ttl, interval);
1182
1183 // Sleep long enough for eviction.
1184 tokio::time::sleep(ttl + interval + Duration::from_millis(50)).await;
1185
1186 // Session evicted via registry_b's GC must be invisible from registry_a too.
1187 assert!(
1188 matches!(registry_a.observe(&sid), Err(ObserveError::NotFound(_))),
1189 "session evicted by registry_b GC must be gone from registry_a too"
1190 );
1191 }
1192
1193 // -----------------------------------------------------------------------
1194 // usage_aggregate_none_for_run_without_llm_calls (test (b))
1195 // -----------------------------------------------------------------------
1196
1197 /// When no `alc.llm` call occurs, `Done.usage` must be `None`.
1198 /// Verifies that the `on_paused` wiring does not falsely activate when no
1199 /// LLM call occurs, and that `usage_aggregate()` gates on `llm_calls > 0`.
1200 #[tokio::test]
1201 async fn usage_aggregate_none_for_run_without_llm_calls() {
1202 use algocline_core::execution::TerminalOutcome;
1203
1204 let executor = make_executor().await;
1205 let (registry, _tmp) = make_registry(executor);
1206
1207 let sid = registry
1208 .spawn_v2(simple_spec("return 42"))
1209 .await
1210 .expect("spawn");
1211
1212 let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1213 match outcome {
1214 TerminalOutcome::Done(result) => {
1215 assert_eq!(
1216 result.usage, None,
1217 "Done.usage must be None when no alc.llm call occurred"
1218 );
1219 }
1220 other => panic!("expected Done, got: {other:?}"),
1221 }
1222 }
1223
1224 // -----------------------------------------------------------------------
1225 // usage_aggregate_some_for_run_with_llm_call (test (a))
1226 // -----------------------------------------------------------------------
1227
1228 /// When `alc.llm` is called and resumed with host-reported usage,
1229 /// `Done.usage` must be `Some(TokenUsage { prompt_tokens: Some(10), completion_tokens: Some(5) })`.
1230 /// Verifies both `on_paused` wiring and `on_response_fed` propagation.
1231 #[tokio::test]
1232 async fn usage_aggregate_some_for_run_with_llm_call() {
1233 use algocline_core::execution::{ResumePayload, TerminalOutcome};
1234 use algocline_core::TokenUsage;
1235
1236 let executor = make_executor().await;
1237 let (registry, _tmp) = make_registry(executor);
1238
1239 let sid = registry
1240 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
1241 .await
1242 .expect("spawn");
1243
1244 // Wait for Paused state.
1245 let mut retries = 0;
1246 loop {
1247 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1248 if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
1249 break;
1250 }
1251 retries += 1;
1252 assert!(retries < 500, "session did not reach Paused state");
1253 }
1254
1255 // Resume with host-reported usage.
1256 registry
1257 .resume(
1258 &sid,
1259 ResumePayload::Single {
1260 query_id: "q-0".into(),
1261 response: "answer".into(),
1262 usage: Some(TokenUsage {
1263 prompt_tokens: Some(10),
1264 completion_tokens: Some(5),
1265 }),
1266 },
1267 )
1268 .await
1269 .expect("resume");
1270
1271 let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1272 match outcome {
1273 TerminalOutcome::Done(result) => {
1274 assert_eq!(
1275 result.usage,
1276 Some(TokenUsage {
1277 prompt_tokens: Some(10),
1278 completion_tokens: Some(5),
1279 }),
1280 "Done.usage must reflect host-reported token counts"
1281 );
1282 }
1283 other => panic!("expected Done, got: {other:?}"),
1284 }
1285 }
1286
1287 // -----------------------------------------------------------------------
1288 // usage_aggregate_uses_estimates_when_usage_omitted (test (d))
1289 // -----------------------------------------------------------------------
1290
1291 /// When `alc.llm` is called but resumed with `usage: None`, `Done.usage`
1292 /// must be `Some` with non-zero estimated values (Estimated source from
1293 /// prompt length heuristic in `MetricsObserver::on_paused`).
1294 #[tokio::test]
1295 async fn usage_aggregate_uses_estimates_when_usage_omitted() {
1296 use algocline_core::execution::{ResumePayload, TerminalOutcome};
1297
1298 let executor = make_executor().await;
1299 let (registry, _tmp) = make_registry(executor);
1300
1301 let sid = registry
1302 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
1303 .await
1304 .expect("spawn");
1305
1306 // Wait for Paused state.
1307 let mut retries = 0;
1308 loop {
1309 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1310 if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
1311 break;
1312 }
1313 retries += 1;
1314 assert!(retries < 500, "session did not reach Paused state");
1315 }
1316
1317 // Resume without host-reported usage (observer uses Estimated values).
1318 registry
1319 .resume(
1320 &sid,
1321 ResumePayload::Single {
1322 query_id: "q-0".into(),
1323 response: "answer".into(),
1324 usage: None,
1325 },
1326 )
1327 .await
1328 .expect("resume");
1329
1330 let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
1331 match outcome {
1332 TerminalOutcome::Done(result) => {
1333 let usage = result
1334 .usage
1335 .expect("Done.usage must be Some when alc.llm was called");
1336 assert!(
1337 usage.prompt_tokens.unwrap_or(0) > 0
1338 || usage.completion_tokens.unwrap_or(0) > 0,
1339 "Done.usage must have non-zero estimated tokens, got: {usage:?}"
1340 );
1341 }
1342 other => panic!("expected Done, got: {other:?}"),
1343 }
1344 }
1345}