algocline_engine/execution/registry.rs
1//! `SessionRegistryV2` — the engine-level session lifecycle manager for the v2 path.
2//!
3//! Coexists with the legacy `SessionRegistry` (`session.rs`) without modifying it.
4//! New callers (Subtask 3's `AppService::ExecutionService` impl) use this registry.
5//!
6//! # Design invariants
7//!
8//! - **Invariant 6**: `spawn_v2()` returns the `SessionId` immediately; execution
9//! runs in the background via `tokio::spawn(driver_loop(...))`.
10//! - **Crux R1**: No `rmcp::*`, `progressToken`, `_meta`, `notifications/*`, or
11//! `mcp_`-prefixed identifiers appear anywhere in this module.
12//! - **Crux R2**: Cancellation uses `CancellationToken::cancel()`; no
13//! `JoinHandle::abort()` or process kill path exists.
14//! - **Crux R3**: `observe()` is a sync `fn` that calls `bus_tx.subscribe()` and
15//! returns a valid handle with zero pre-registered observers.
16//! - **K-4**: The `sessions` `RwLock` is never held across `.await` points; the
17//! `clone-then-release` pattern is used throughout.
18
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use algocline_core::execution::{
23 AwaitError, CancelError, CancelReason, ExecutionState, ExecutionStateTag, ObserveError,
24 ObserverHandle, PauseKind, ProgressEvent, ResumeError, ResumeOutcome, SessionId, SpawnError,
25 StateError, TerminalOutcome,
26};
27use algocline_core::QueryId;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30
31use super::driver::{build_cancel_info, driver_loop, now_ms, transition_state};
32use super::observer::BroadcastObserverHandle;
33use super::record::{RespTxsMap, SessionRecord};
34use crate::card::FileCardStore;
35use crate::executor::Executor;
36use crate::state::JsonFileStore;
37
38// ---------------------------------------------------------------------------
39// SessionRegistryV2
40// ---------------------------------------------------------------------------
41
/// Registry that manages the lifecycle of v2 execution sessions.
///
/// `Clone` is cheap — the session map and the stores sit behind `Arc`s, so
/// clones share them; only `scenarios_dir` is copied.
#[derive(Clone)]
pub struct SessionRegistryV2 {
    /// Session records keyed by id, shared by every clone of the registry.
    sessions: Arc<RwLock<HashMap<SessionId, Arc<SessionRecord>>>>,
    /// Executor used by `spawn_v2` to start each session.
    executor: Arc<Executor>,
    /// State store handed to every session started through this registry.
    state_store: Arc<JsonFileStore>,
    /// Card store handed to every session started through this registry.
    card_store: Arc<FileCardStore>,
    /// Scenarios directory handed to every session started through this registry.
    scenarios_dir: std::path::PathBuf,
}
53
54impl SessionRegistryV2 {
55 /// Create a new empty registry backed by `executor`, with the storage paths
56 /// that will be injected into each spawned VM session.
57 ///
58 /// The `state_store` / `card_store` / `scenarios_dir` mirror the legacy
59 /// `AppService` resolution against the `AppConfig::app_dir()` layout, so a
60 /// v2 caller produces the same on-disk side effects as a legacy caller.
61 pub fn new(
62 executor: Arc<Executor>,
63 state_store: Arc<JsonFileStore>,
64 card_store: Arc<FileCardStore>,
65 scenarios_dir: std::path::PathBuf,
66 ) -> Self {
67 Self {
68 sessions: Arc::new(RwLock::new(HashMap::new())),
69 executor,
70 state_store,
71 card_store,
72 scenarios_dir,
73 }
74 }
75
76 // -----------------------------------------------------------------------
77 // spawn_v2
78 // -----------------------------------------------------------------------
79
80 /// Start a new v2 execution session, returning the `SessionId` immediately.
81 ///
82 /// Execution proceeds in the background via `tokio::spawn(driver_loop(...))`.
83 /// The caller receives the `SessionId` without waiting for execution to complete
84 /// or for the first event (Invariant 6 / debt #40955).
85 ///
86 /// Only [`algocline_core::execution::SpecKind::Run`] is supported in this subtask.
87 /// Other variants return [`SpawnError::InvalidSpec`]. Subtask 3 will extend this
88 /// to handle `Advice` and `Eval` through the full `AppService` path.
89 ///
90 /// # Errors
91 /// - [`SpawnError::Engine`] — the executor failed to start the session.
92 /// - [`SpawnError::InvalidSpec`] — the provided spec is malformed or uses an
93 /// unsupported kind.
94 pub async fn spawn_v2(
95 &self,
96 spec: algocline_core::execution::SessionSpec,
97 ) -> Result<SessionId, SpawnError> {
98 use algocline_core::execution::SpecKind;
99
100 // Extract code from the spec kind. Only Run is supported here.
101 let code = match spec.kind {
102 SpecKind::Run { code } => code,
103 other => {
104 return Err(SpawnError::InvalidSpec(format!(
105 "SessionRegistryV2::spawn_v2 only supports SpecKind::Run; got {:?}",
106 std::mem::discriminant(&other)
107 )));
108 }
109 };
110
111 if code.trim().is_empty() {
112 return Err(SpawnError::InvalidSpec("code must not be empty".into()));
113 }
114
115 let ctx = spec.ctx.unwrap_or_else(|| serde_json::json!({}));
116
117 // Start the per-session VM using the storage paths injected at
118 // registry construction (mirrors legacy AppService::start_and_tick).
119 let session = self
120 .executor
121 .start_session(
122 code,
123 ctx,
124 vec![], // extra_lib_paths — populated by Advice/Eval kinds later
125 vec![], // variant_pkgs — populated by Advice/Eval kinds later
126 Arc::clone(&self.state_store),
127 Arc::clone(&self.card_store),
128 self.scenarios_dir.clone(),
129 )
130 .await
131 .map_err(SpawnError::Engine)?;
132
133 let (exec_task, llm_rx, vm_driver) = session.into_driver_parts();
134
135 // Build shared components — all constructed before spawning the task.
136 let state: Arc<Mutex<ExecutionState>> = Arc::new(Mutex::new(ExecutionState::Running));
137 let cancel_token = CancellationToken::new();
138 let resp_txs: RespTxsMap = Arc::new(Mutex::new(HashMap::new()));
139
140 // Crux R3 (sink-free): the receiver returned alongside `bus_tx` is
141 // dropped immediately. `bus_tx.send()` returns `Err(SendError)` when
142 // 0 observers are subscribed, but every call site in `driver_loop`
143 // uses `let _ = bus_tx.send(...)` to absorb the result — the caller
144 // is never crashed by 0 observers. See
145 // `record::tests::bus_tx_does_not_crash_caller_with_zero_observers`.
146 let (bus_tx, _) = tokio::sync::broadcast::channel::<ProgressEvent>(256);
147
148 let session_id = SessionId::generate();
149
150 // Clones for the driver_loop closure.
151 let state_d = Arc::clone(&state);
152 let bus_tx_d = bus_tx.clone();
153 let cancel_d = cancel_token.clone();
154 let resp_txs_d = Arc::clone(&resp_txs);
155
156 let join_handle = tokio::spawn(async move {
157 // vm_driver must stay alive for the duration of the session.
158 let _keep_driver = vm_driver;
159 driver_loop(exec_task, llm_rx, state_d, bus_tx_d, cancel_d, resp_txs_d).await;
160 });
161
162 // Assemble the record with all shared fields.
163 let record = Arc::new(SessionRecord {
164 state,
165 bus_tx,
166 cancel_token,
167 join_handle: Mutex::new(Some(join_handle)),
168 resp_txs,
169 first_cancel_info: Mutex::new(None),
170 });
171
172 // Insert into registry.
173 {
174 let mut map = self.sessions.write().await;
175 map.insert(session_id.clone(), record);
176 }
177 Ok(session_id)
178 }
179
180 // -----------------------------------------------------------------------
181 // state
182 // -----------------------------------------------------------------------
183
184 /// Query the current [`ExecutionState`] of a session.
185 ///
186 /// # Errors
187 /// - [`StateError::NotFound`] — no session with the given id exists.
188 pub async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
189 let record = self
190 .get_record(id)
191 .await
192 .ok_or_else(|| StateError::NotFound(id.clone()))?;
193 let guard = record.state.lock().await;
194 Ok(guard.clone())
195 }
196
197 // -----------------------------------------------------------------------
198 // resume
199 // -----------------------------------------------------------------------
200
201 /// Resume a paused session by delivering LLM responses.
202 ///
203 /// # Errors
204 /// - [`ResumeError::NotFound`] — no session with the given id exists.
205 /// - [`ResumeError::NotPaused`] — the session is not in the `Paused` state.
206 /// - [`ResumeError::AlreadyCancelled`] — the session is already cancelled.
207 pub async fn resume(
208 &self,
209 id: &SessionId,
210 payload: algocline_core::execution::ResumePayload,
211 ) -> Result<ResumeOutcome, ResumeError> {
212 use algocline_core::execution::ResumePayload;
213
214 let record = self
215 .get_record(id)
216 .await
217 .ok_or_else(|| ResumeError::NotFound(id.clone()))?;
218
219 // checkpoint C: at resume entry
220 // If the token is already cancelled, reject the resume immediately.
221 if record.cancel_token.is_cancelled() {
222 return Err(ResumeError::AlreadyCancelled);
223 }
224
225 // Verify the session is Paused (or Cancelled after the token check above).
226 let (actual_tag, pause_kind) = {
227 let guard = record.state.lock().await;
228 let tag = guard.tag();
229 let kind = if let ExecutionState::Paused(ref info) = *guard {
230 info.kind
231 } else {
232 PauseKind::Single
233 };
234 (tag, kind)
235 };
236
237 match actual_tag {
238 ExecutionStateTag::Cancelled => return Err(ResumeError::AlreadyCancelled),
239 ExecutionStateTag::Paused => {} // continue
240 _ => return Err(ResumeError::NotPaused { actual_tag }),
241 }
242
243 // Extract query responses from the payload.
244 let responses: Vec<(String, String)> = match payload {
245 ResumePayload::Single {
246 query_id, response, ..
247 } => vec![(query_id, response)],
248 ResumePayload::Batch(batch) => batch
249 .into_iter()
250 .map(|r| (r.query_id, r.response))
251 .collect(),
252 };
253
254 // Deliver responses via the shared resp_txs map.
255 {
256 let mut txs = record.resp_txs.lock().await;
257 for (qid_str, response) in responses {
258 let qid = QueryId::parse(&qid_str);
259 match txs.remove(&qid) {
260 Some(tx) => {
261 if let Err(_e) = tx.send(Ok(response)) {
262 tracing::debug!(
263 "registry::resume: oneshot receiver already dropped for query {qid_str}"
264 );
265 }
266 }
267 None => {
268 tracing::debug!("registry::resume: no pending tx for query {qid_str}");
269 }
270 }
271 }
272 }
273
274 // Transition state from Paused → Running.
275 {
276 let guard = record.state.lock().await;
277 if guard.tag() == ExecutionStateTag::Paused {
278 drop(guard);
279 transition_state(&record.state, &record.bus_tx, ExecutionState::Running).await;
280 let _ = record.bus_tx.send(ProgressEvent::ResumeAccepted {
281 payload_kind: pause_kind,
282 at: now_ms(),
283 });
284 }
285 }
286
287 Ok(ResumeOutcome::Continued)
288 }
289
290 // -----------------------------------------------------------------------
291 // cancel
292 // -----------------------------------------------------------------------
293
294 /// Request cooperative cancellation of a session.
295 ///
296 /// Idempotent: returns `Ok(())` for sessions already in a terminal state.
297 ///
298 /// # Errors
299 /// - [`CancelError::NotFound`] — no session with the given id exists.
300 pub async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
301 let record = self
302 .get_record(id)
303 .await
304 .ok_or_else(|| CancelError::NotFound(id.clone()))?;
305
306 // Idempotency: already terminal → Ok.
307 {
308 let guard = record.state.lock().await;
309 if matches!(
310 guard.tag(),
311 ExecutionStateTag::Done | ExecutionStateTag::Failed | ExecutionStateTag::Cancelled
312 ) {
313 return Ok(());
314 }
315 }
316
317 // Store the first CancelInfo (idempotent: only set once).
318 {
319 let mut first = record.first_cancel_info.lock().await;
320 if first.is_none() {
321 let info = build_cancel_info(&record.state, reason).await;
322 *first = Some(info);
323 }
324 }
325
326 // Signal the driver (Crux R2: cooperative — no abort).
327 record.cancel_token.cancel();
328
329 // For Paused sessions, transition immediately: the driver is blocked
330 // waiting for a resume and won't hit a checkpoint on its own.
331 let should_transition = {
332 let guard = record.state.lock().await;
333 guard.tag() == ExecutionStateTag::Paused
334 };
335 if should_transition {
336 let cancel_info_opt = {
337 let first = record.first_cancel_info.lock().await;
338 first.clone()
339 };
340 if let Some(info) = cancel_info_opt {
341 transition_state(
342 &record.state,
343 &record.bus_tx,
344 ExecutionState::Cancelled(info),
345 )
346 .await;
347 }
348 }
349
350 Ok(())
351 }
352
353 // -----------------------------------------------------------------------
354 // observe (sync fn — Crux R3)
355 // -----------------------------------------------------------------------
356
357 /// Subscribe to the progress event stream for a session.
358 ///
359 /// This is a **synchronous** `fn`: `broadcast::Sender::subscribe()` is
360 /// synchronous and does not perform I/O. Multiple concurrent subscribers
361 /// each receive the full event stream independently (Crux R3).
362 ///
363 /// # Errors
364 /// - [`ObserveError::NotFound`] — no session with the given id exists.
365 pub fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
366 // Non-blocking read; the write lock is only held very briefly during spawn.
367 match self.sessions.try_read() {
368 Ok(map) => {
369 let record = map
370 .get(id)
371 .ok_or_else(|| ObserveError::NotFound(id.clone()))?;
372 Ok(Box::new(BroadcastObserverHandle::new(&record.bus_tx)))
373 }
374 Err(_) => Err(ObserveError::NotFound(id.clone())),
375 }
376 }
377
378 // -----------------------------------------------------------------------
379 // await_terminal
380 // -----------------------------------------------------------------------
381
382 /// Await the terminal state of a session.
383 ///
384 /// Polls the shared state until it reaches a terminal variant (`Done`,
385 /// `Cancelled`, or `Failed`). The `JoinHandle` is never `.abort()`-ed
386 /// (Crux R2).
387 ///
388 /// # Errors
389 /// - [`AwaitError::NotFound`] — no session with the given id exists.
390 pub async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
391 let record = self
392 .get_record(id)
393 .await
394 .ok_or_else(|| AwaitError::NotFound(id.clone()))?;
395
396 // Single-awaiter path: take the JoinHandle and await `driver_loop`
397 // completion directly. Replaces the previous `yield_now()` polling
398 // loop that occupied a tokio worker slot scheduling-wise even though
399 // it consumed no CPU. The `driver_loop` guarantees a terminal
400 // `transition_state` before returning, so once `handle.await` resolves
401 // the state is guaranteed terminal.
402 let handle_opt = {
403 let mut guard = record.join_handle.lock().await;
404 guard.take()
405 };
406
407 if let Some(handle) = handle_opt {
408 handle
409 .await
410 .map_err(|e| AwaitError::Joined(format!("driver_loop join error: {e}")))?;
411 }
412 // (None branch: another caller has already taken the handle. Either
413 // they are still awaiting it — in which case the driver_loop has not
414 // yet transitioned to terminal — or they have already finished, in
415 // which case the state is terminal. We fall through to a single
416 // state read; the rare concurrent race returns `AwaitError::Joined`.)
417
418 let guard = record.state.lock().await;
419 match &*guard {
420 ExecutionState::Done(result) => Ok(TerminalOutcome::Done(result.clone())),
421 ExecutionState::Cancelled(info) => Ok(TerminalOutcome::Cancelled(info.clone())),
422 ExecutionState::Failed(info) => Ok(TerminalOutcome::Failed(info.clone())),
423 other => Err(AwaitError::Joined(format!(
424 "await_terminal: driver_loop completed but state is {:?} (concurrent awaiter race)",
425 other.tag()
426 ))),
427 }
428 }
429
430 // -----------------------------------------------------------------------
431 // Internal helpers
432 // -----------------------------------------------------------------------
433
434 /// Clone-then-release lookup (K-4): the lock is dropped before returning.
435 async fn get_record(&self, id: &SessionId) -> Option<Arc<SessionRecord>> {
436 let map = self.sessions.read().await;
437 map.get(id).cloned()
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444 use algocline_core::execution::{
445 CancelCode, CancelReason, ExecutionState, SessionSpec, SpecKind,
446 };
447 use std::sync::Arc;
448
449 async fn make_executor() -> Arc<Executor> {
450 Arc::new(Executor::new(vec![]).await.expect("Executor::new"))
451 }
452
453 /// Construct a registry backed by per-test tempdir paths so the legacy
454 /// AppConfig::app_dir() layout is approximated without touching the user's
455 /// `~/.algocline` directory.
456 fn make_registry(executor: Arc<Executor>) -> (SessionRegistryV2, tempfile::TempDir) {
457 let tmp = tempfile::tempdir().expect("tempdir");
458 let state_store = Arc::new(JsonFileStore::new(tmp.path().join("state")));
459 let card_store = Arc::new(FileCardStore::new(tmp.path().join("cards")));
460 let scenarios_dir = tmp.path().join("scenarios");
461 (
462 SessionRegistryV2::new(executor, state_store, card_store, scenarios_dir),
463 tmp,
464 )
465 }
466
467 fn simple_spec(code: &str) -> SessionSpec {
468 SessionSpec {
469 kind: SpecKind::Run {
470 code: code.to_owned(),
471 },
472 project_root: None,
473 ctx: None,
474 }
475 }
476
477 fn cancel_reason() -> CancelReason {
478 CancelReason {
479 code: CancelCode::User,
480 detail: None,
481 requested_at: now_ms(),
482 }
483 }
484
485 // -----------------------------------------------------------------------
486 // spawn_returns_session_id_immediately (debt #40955)
487 // -----------------------------------------------------------------------
488
489 /// `spawn_v2` must return `SessionId` without blocking on execution.
490 #[tokio::test]
491 async fn spawn_returns_session_id_immediately() {
492 let executor = make_executor().await;
493 let (registry, _tmp) = make_registry(executor);
494
495 let start = std::time::Instant::now();
496 let result = tokio::time::timeout(
497 std::time::Duration::from_millis(200),
498 registry.spawn_v2(simple_spec("return 42")),
499 )
500 .await;
501
502 assert!(result.is_ok(), "spawn_v2 must complete within 200ms");
503 assert!(
504 result.unwrap().is_ok(),
505 "spawn_v2 must return Ok(SessionId)"
506 );
507
508 let elapsed = start.elapsed();
509 assert!(
510 elapsed < std::time::Duration::from_millis(150),
511 "spawn_v2 took too long: {elapsed:?}"
512 );
513 }
514
515 // -----------------------------------------------------------------------
516 // state_query_running
517 // -----------------------------------------------------------------------
518
519 /// Immediately after spawn, `state()` must return Running or Paused.
520 #[tokio::test]
521 async fn state_query_running() {
522 let executor = make_executor().await;
523 let (registry, _tmp) = make_registry(executor);
524
525 // Lua that pauses immediately so the session is observable.
526 let sid = registry
527 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
528 .await
529 .expect("spawn");
530
531 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
532
533 let state = registry.state(&sid).await.expect("state");
534 assert!(
535 matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
536 "state just after spawn must be Running or Paused, got: {:?}",
537 state.tag()
538 );
539 }
540
541 // -----------------------------------------------------------------------
542 // cancel_at_checkpoint_c_at_resume_entry
543 // -----------------------------------------------------------------------
544
545 /// `resume()` on a cancelled session must return `AlreadyCancelled`.
546 #[tokio::test]
547 async fn cancel_at_checkpoint_c_at_resume_entry() {
548 use algocline_core::execution::{ResumeError, ResumePayload};
549
550 let executor = make_executor().await;
551 let (registry, _tmp) = make_registry(executor);
552
553 let sid = registry
554 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
555 .await
556 .expect("spawn");
557
558 // Wait for Paused.
559 let mut retries = 0;
560 loop {
561 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
562 if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
563 break;
564 }
565 retries += 1;
566 assert!(retries < 50, "session did not reach Paused state");
567 }
568
569 registry
570 .cancel(&sid, cancel_reason())
571 .await
572 .expect("cancel");
573
574 // checkpoint C: at resume entry
575 let result = registry
576 .resume(
577 &sid,
578 ResumePayload::Single {
579 query_id: "q".into(),
580 response: "4".into(),
581 usage: None,
582 },
583 )
584 .await;
585
586 assert!(
587 matches!(result, Err(ResumeError::AlreadyCancelled)),
588 "resume on cancelled session must return AlreadyCancelled, got: {result:?}"
589 );
590 }
591
592 // -----------------------------------------------------------------------
593 // cancel_idempotent
594 // -----------------------------------------------------------------------
595
596 #[tokio::test]
597 async fn cancel_idempotent() {
598 let executor = make_executor().await;
599 let (registry, _tmp) = make_registry(executor);
600
601 let sid = registry
602 .spawn_v2(simple_spec("return 1"))
603 .await
604 .expect("spawn");
605
606 registry
607 .cancel(&sid, cancel_reason())
608 .await
609 .expect("first cancel");
610 registry
611 .cancel(&sid, cancel_reason())
612 .await
613 .expect("second cancel");
614 }
615
616 // -----------------------------------------------------------------------
617 // await_terminal returns Done without busy-polling
618 // -----------------------------------------------------------------------
619
620 /// Regression for #2 (case A): `await_terminal` must complete by awaiting
621 /// the `driver_loop` `JoinHandle` directly (single-awaiter `take` +
622 /// `.await`) instead of polling `state` in a `yield_now()` loop. We can't
623 /// observe scheduler occupancy from a test, but we can verify the
624 /// behavioural contract: (1) the call returns the correct `TerminalOutcome`,
625 /// (2) it returns within a tight wall-clock budget without sleep, and
626 /// (3) a second concurrent caller does not panic.
627 #[tokio::test]
628 async fn await_terminal_returns_done_for_trivial_script() {
629 let executor = make_executor().await;
630 let (registry, _tmp) = make_registry(executor);
631
632 let sid = registry
633 .spawn_v2(simple_spec("return 42"))
634 .await
635 .expect("spawn");
636
637 let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
638 match outcome {
639 TerminalOutcome::Done(result) => {
640 assert_eq!(result.value, serde_json::json!(42));
641 }
642 other => panic!("expected Done, got: {other:?}"),
643 }
644 }
645
646 /// Regression for #2 (case A) single-awaiter discipline: when two callers
647 /// race on `await_terminal`, the second caller (which observes `None` after
648 /// the first has taken the handle) must NOT panic. It must either return
649 /// the same terminal outcome (if the first has already finished) or an
650 /// `AwaitError::Joined` (the documented race fallback).
651 #[tokio::test]
652 async fn await_terminal_does_not_panic_on_second_concurrent_caller() {
653 let executor = make_executor().await;
654 let (registry, _tmp) = make_registry(executor);
655
656 let sid = registry
657 .spawn_v2(simple_spec("return 99"))
658 .await
659 .expect("spawn");
660
661 let r1 = registry.clone();
662 let r2 = registry.clone();
663 let s1 = sid.clone();
664 let s2 = sid.clone();
665
666 let h1 = tokio::spawn(async move { r1.await_terminal(&s1).await });
667 let h2 = tokio::spawn(async move { r2.await_terminal(&s2).await });
668
669 let out1 = h1.await.expect("h1 join");
670 let out2 = h2.await.expect("h2 join");
671
672 // First-caller path must succeed with the real outcome.
673 let first_ok = matches!(&out1, Ok(TerminalOutcome::Done(_)))
674 || matches!(&out2, Ok(TerminalOutcome::Done(_)));
675 assert!(
676 first_ok,
677 "at least one caller must observe Done; got out1={out1:?}, out2={out2:?}"
678 );
679 // Second caller may have observed Joined (race) or Done; either is OK,
680 // neither must panic — which we've already verified by the join above.
681 }
682
683 // -----------------------------------------------------------------------
684 // observe_sink_free (Crux R3 — registry level)
685 // -----------------------------------------------------------------------
686
687 /// `observe()` must succeed and return a valid handle even with 0 prior observers.
688 #[tokio::test]
689 async fn observe_sink_free_registry() {
690 let executor = make_executor().await;
691 let (registry, _tmp) = make_registry(executor);
692
693 let sid = registry
694 .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
695 .await
696 .expect("spawn");
697
698 // observe() before any subscriber exists must succeed.
699 let handle = registry.observe(&sid);
700 assert!(
701 handle.is_ok(),
702 "observe() must return Ok even with 0 prior observers"
703 );
704 }
705
706 // -----------------------------------------------------------------------
707 // observe_multi_subscriber_fan_out (Crux R3 — registry level)
708 // -----------------------------------------------------------------------
709
710 /// Multiple independent observers each get the same events.
711 #[tokio::test]
712 async fn observe_multi_subscriber_fan_out_registry() {
713 use algocline_core::execution::ObserverRecvError;
714
715 let executor = make_executor().await;
716 let (registry, _tmp) = make_registry(executor);
717
718 // A script that returns immediately — the driver will publish Done.
719 let sid = registry
720 .spawn_v2(simple_spec("return 99"))
721 .await
722 .expect("spawn");
723
724 // Subscribe 3 observers.
725 let mut h1 = registry.observe(&sid).expect("observe h1");
726 let mut h2 = registry.observe(&sid).expect("observe h2");
727 let mut h3 = registry.observe(&sid).expect("observe h3");
728
729 // Wait for terminal so we know events have been published.
730 let _ = registry.await_terminal(&sid).await;
731
732 // Each observer must receive at least the terminal StateTransition.
733 // Drain with idle-timeout: bus_tx is retained in SessionRecord for
734 // sink-free late-subscribe (Crux R3), so Closed never fires while the
735 // registry is alive. A 100ms idle window after await_terminal() is
736 // sufficient — all events are already buffered.
737 use std::time::Duration;
738 for (label, handle) in [("h1", &mut h1), ("h2", &mut h2), ("h3", &mut h3)] {
739 let mut got_transition = false;
740 loop {
741 match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
742 Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
743 Ok(Ok(_)) => {}
744 Ok(Err(ObserverRecvError::Closed)) => break,
745 Ok(Err(ObserverRecvError::Lagged(_))) => {}
746 Err(_) => break, // idle-timeout: no more events coming
747 }
748 }
749 assert!(
750 got_transition,
751 "{label}: must receive at least one StateTransition event"
752 );
753 }
754 }
755}