Skip to main content

algocline_engine/execution/
registry.rs

1//! `SessionRegistryV2` — the engine-level session lifecycle manager for the v2 path.
2//!
3//! Coexists with the legacy `SessionRegistry` (`session.rs`) without modifying it.
4//! New callers (Subtask 3's `AppService::ExecutionService` impl) use this registry.
5//!
6//! # Design invariants
7//!
8//! - **Invariant 6**: `spawn_v2()` returns the `SessionId` immediately; execution
9//!   runs in the background via `tokio::spawn(driver_loop(...))`.
10//! - **Crux R1**: No `rmcp::*`, `progressToken`, `_meta`, `notifications/*`, or
11//!   `mcp_`-prefixed identifiers appear anywhere in this module.
12//! - **Crux R2**: Cancellation uses `CancellationToken::cancel()`; no
13//!   `JoinHandle::abort()` or process kill path exists.
14//! - **Crux R3**: `observe()` is a sync `fn` that calls `bus_tx.subscribe()` and
15//!   returns a valid handle with zero pre-registered observers.
16//! - **K-4**: The `sessions` `RwLock` is never held across `.await` points; the
17//!   `clone-then-release` pattern is used throughout.
18
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use algocline_core::execution::{
23    AwaitError, CancelError, CancelReason, ExecutionState, ExecutionStateTag, ObserveError,
24    ObserverHandle, PauseKind, ProgressEvent, ResumeError, ResumeOutcome, SessionId, SpawnError,
25    StateError, TerminalOutcome,
26};
27use algocline_core::QueryId;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30
31use super::driver::{build_cancel_info, driver_loop, now_ms, transition_state};
32use super::observer::BroadcastObserverHandle;
33use super::record::{RespTxsMap, SessionRecord};
34use crate::card::FileCardStore;
35use crate::executor::Executor;
36use crate::state::JsonFileStore;
37
38// ---------------------------------------------------------------------------
39// SessionRegistryV2
40// ---------------------------------------------------------------------------
41
/// Registry that manages the lifecycle of v2 execution sessions.
///
/// `Clone` is cheap — the inner `Arc<RwLock<...>>` is reference-counted.
#[derive(Clone)]
pub struct SessionRegistryV2 {
    /// Session records keyed by id; the map is shared by every clone of the
    /// registry.
    sessions: Arc<RwLock<HashMap<SessionId, Arc<SessionRecord>>>>,
    /// Executor that `spawn_v2` asks to start each per-session VM.
    executor: Arc<Executor>,
    /// State store passed to every session started by `spawn_v2`.
    state_store: Arc<JsonFileStore>,
    /// Card store passed to every session started by `spawn_v2`.
    card_store: Arc<FileCardStore>,
    /// Scenarios directory passed to every session started by `spawn_v2`.
    scenarios_dir: std::path::PathBuf,
}
53
54impl SessionRegistryV2 {
55    /// Create a new empty registry backed by `executor`, with the storage paths
56    /// that will be injected into each spawned VM session.
57    ///
58    /// The `state_store` / `card_store` / `scenarios_dir` mirror the legacy
59    /// `AppService` resolution against the `AppConfig::app_dir()` layout, so a
60    /// v2 caller produces the same on-disk side effects as a legacy caller.
61    pub fn new(
62        executor: Arc<Executor>,
63        state_store: Arc<JsonFileStore>,
64        card_store: Arc<FileCardStore>,
65        scenarios_dir: std::path::PathBuf,
66    ) -> Self {
67        Self {
68            sessions: Arc::new(RwLock::new(HashMap::new())),
69            executor,
70            state_store,
71            card_store,
72            scenarios_dir,
73        }
74    }
75
76    // -----------------------------------------------------------------------
77    // spawn_v2
78    // -----------------------------------------------------------------------
79
80    /// Start a new v2 execution session, returning the `SessionId` immediately.
81    ///
82    /// Execution proceeds in the background via `tokio::spawn(driver_loop(...))`.
83    /// The caller receives the `SessionId` without waiting for execution to complete
84    /// or for the first event (Invariant 6 / debt #40955).
85    ///
86    /// Only [`algocline_core::execution::SpecKind::Run`] is supported in this subtask.
87    /// Other variants return [`SpawnError::InvalidSpec`].  Subtask 3 will extend this
88    /// to handle `Advice` and `Eval` through the full `AppService` path.
89    ///
90    /// # Errors
91    /// - [`SpawnError::Engine`] — the executor failed to start the session.
92    /// - [`SpawnError::InvalidSpec`] — the provided spec is malformed or uses an
93    ///   unsupported kind.
94    pub async fn spawn_v2(
95        &self,
96        spec: algocline_core::execution::SessionSpec,
97    ) -> Result<SessionId, SpawnError> {
98        use algocline_core::execution::SpecKind;
99
100        // Extract code from the spec kind.  Only Run is supported here.
101        let code = match spec.kind {
102            SpecKind::Run { code } => code,
103            other => {
104                return Err(SpawnError::InvalidSpec(format!(
105                    "SessionRegistryV2::spawn_v2 only supports SpecKind::Run; got {:?}",
106                    std::mem::discriminant(&other)
107                )));
108            }
109        };
110
111        if code.trim().is_empty() {
112            return Err(SpawnError::InvalidSpec("code must not be empty".into()));
113        }
114
115        let ctx = spec.ctx.unwrap_or_else(|| serde_json::json!({}));
116
117        // Start the per-session VM using the storage paths injected at
118        // registry construction (mirrors legacy AppService::start_and_tick).
119        let session = self
120            .executor
121            .start_session(
122                code,
123                ctx,
124                vec![], // extra_lib_paths — populated by Advice/Eval kinds later
125                vec![], // variant_pkgs   — populated by Advice/Eval kinds later
126                Arc::clone(&self.state_store),
127                Arc::clone(&self.card_store),
128                self.scenarios_dir.clone(),
129            )
130            .await
131            .map_err(SpawnError::Engine)?;
132
133        let (exec_task, llm_rx, vm_driver) = session.into_driver_parts();
134
135        // Build shared components — all constructed before spawning the task.
136        let state: Arc<Mutex<ExecutionState>> = Arc::new(Mutex::new(ExecutionState::Running));
137        let cancel_token = CancellationToken::new();
138        let resp_txs: RespTxsMap = Arc::new(Mutex::new(HashMap::new()));
139
140        // Crux R3 (sink-free): the receiver returned alongside `bus_tx` is
141        // dropped immediately.  `bus_tx.send()` returns `Err(SendError)` when
142        // 0 observers are subscribed, but every call site in `driver_loop`
143        // uses `let _ = bus_tx.send(...)` to absorb the result — the caller
144        // is never crashed by 0 observers.  See
145        // `record::tests::bus_tx_does_not_crash_caller_with_zero_observers`.
146        let (bus_tx, _) = tokio::sync::broadcast::channel::<ProgressEvent>(256);
147
148        let session_id = SessionId::generate();
149
150        // Clones for the driver_loop closure.
151        let state_d = Arc::clone(&state);
152        let bus_tx_d = bus_tx.clone();
153        let cancel_d = cancel_token.clone();
154        let resp_txs_d = Arc::clone(&resp_txs);
155
156        let join_handle = tokio::spawn(async move {
157            // vm_driver must stay alive for the duration of the session.
158            let _keep_driver = vm_driver;
159            driver_loop(exec_task, llm_rx, state_d, bus_tx_d, cancel_d, resp_txs_d).await;
160        });
161
162        // Assemble the record with all shared fields.
163        let record = Arc::new(SessionRecord {
164            state,
165            bus_tx,
166            cancel_token,
167            join_handle: Mutex::new(Some(join_handle)),
168            resp_txs,
169            first_cancel_info: Mutex::new(None),
170        });
171
172        // Insert into registry.
173        {
174            let mut map = self.sessions.write().await;
175            map.insert(session_id.clone(), record);
176        }
177        Ok(session_id)
178    }
179
180    // -----------------------------------------------------------------------
181    // state
182    // -----------------------------------------------------------------------
183
184    /// Query the current [`ExecutionState`] of a session.
185    ///
186    /// # Errors
187    /// - [`StateError::NotFound`] — no session with the given id exists.
188    pub async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
189        let record = self
190            .get_record(id)
191            .await
192            .ok_or_else(|| StateError::NotFound(id.clone()))?;
193        let guard = record.state.lock().await;
194        Ok(guard.clone())
195    }
196
197    // -----------------------------------------------------------------------
198    // resume
199    // -----------------------------------------------------------------------
200
201    /// Resume a paused session by delivering LLM responses.
202    ///
203    /// # Errors
204    /// - [`ResumeError::NotFound`] — no session with the given id exists.
205    /// - [`ResumeError::NotPaused`] — the session is not in the `Paused` state.
206    /// - [`ResumeError::AlreadyCancelled`] — the session is already cancelled.
207    pub async fn resume(
208        &self,
209        id: &SessionId,
210        payload: algocline_core::execution::ResumePayload,
211    ) -> Result<ResumeOutcome, ResumeError> {
212        use algocline_core::execution::ResumePayload;
213
214        let record = self
215            .get_record(id)
216            .await
217            .ok_or_else(|| ResumeError::NotFound(id.clone()))?;
218
219        // checkpoint C: at resume entry
220        // If the token is already cancelled, reject the resume immediately.
221        if record.cancel_token.is_cancelled() {
222            return Err(ResumeError::AlreadyCancelled);
223        }
224
225        // Verify the session is Paused (or Cancelled after the token check above).
226        let (actual_tag, pause_kind) = {
227            let guard = record.state.lock().await;
228            let tag = guard.tag();
229            let kind = if let ExecutionState::Paused(ref info) = *guard {
230                info.kind
231            } else {
232                PauseKind::Single
233            };
234            (tag, kind)
235        };
236
237        match actual_tag {
238            ExecutionStateTag::Cancelled => return Err(ResumeError::AlreadyCancelled),
239            ExecutionStateTag::Paused => {} // continue
240            _ => return Err(ResumeError::NotPaused { actual_tag }),
241        }
242
243        // Extract query responses from the payload.
244        let responses: Vec<(String, String)> = match payload {
245            ResumePayload::Single {
246                query_id, response, ..
247            } => vec![(query_id, response)],
248            ResumePayload::Batch(batch) => batch
249                .into_iter()
250                .map(|r| (r.query_id, r.response))
251                .collect(),
252        };
253
254        // Deliver responses via the shared resp_txs map.
255        {
256            let mut txs = record.resp_txs.lock().await;
257            for (qid_str, response) in responses {
258                let qid = QueryId::parse(&qid_str);
259                match txs.remove(&qid) {
260                    Some(tx) => {
261                        if let Err(_e) = tx.send(Ok(response)) {
262                            tracing::debug!(
263                                "registry::resume: oneshot receiver already dropped for query {qid_str}"
264                            );
265                        }
266                    }
267                    None => {
268                        tracing::debug!("registry::resume: no pending tx for query {qid_str}");
269                    }
270                }
271            }
272        }
273
274        // Transition state from Paused → Running.
275        {
276            let guard = record.state.lock().await;
277            if guard.tag() == ExecutionStateTag::Paused {
278                drop(guard);
279                transition_state(&record.state, &record.bus_tx, ExecutionState::Running).await;
280                let _ = record.bus_tx.send(ProgressEvent::ResumeAccepted {
281                    payload_kind: pause_kind,
282                    at: now_ms(),
283                });
284            }
285        }
286
287        Ok(ResumeOutcome::Continued)
288    }
289
290    // -----------------------------------------------------------------------
291    // cancel
292    // -----------------------------------------------------------------------
293
294    /// Request cooperative cancellation of a session.
295    ///
296    /// Idempotent: returns `Ok(())` for sessions already in a terminal state.
297    ///
298    /// # Errors
299    /// - [`CancelError::NotFound`] — no session with the given id exists.
300    pub async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
301        let record = self
302            .get_record(id)
303            .await
304            .ok_or_else(|| CancelError::NotFound(id.clone()))?;
305
306        // Idempotency: already terminal → Ok.
307        {
308            let guard = record.state.lock().await;
309            if matches!(
310                guard.tag(),
311                ExecutionStateTag::Done | ExecutionStateTag::Failed | ExecutionStateTag::Cancelled
312            ) {
313                return Ok(());
314            }
315        }
316
317        // Store the first CancelInfo (idempotent: only set once).
318        {
319            let mut first = record.first_cancel_info.lock().await;
320            if first.is_none() {
321                let info = build_cancel_info(&record.state, reason).await;
322                *first = Some(info);
323            }
324        }
325
326        // Signal the driver (Crux R2: cooperative — no abort).
327        record.cancel_token.cancel();
328
329        // For Paused sessions, transition immediately: the driver is blocked
330        // waiting for a resume and won't hit a checkpoint on its own.
331        let should_transition = {
332            let guard = record.state.lock().await;
333            guard.tag() == ExecutionStateTag::Paused
334        };
335        if should_transition {
336            let cancel_info_opt = {
337                let first = record.first_cancel_info.lock().await;
338                first.clone()
339            };
340            if let Some(info) = cancel_info_opt {
341                transition_state(
342                    &record.state,
343                    &record.bus_tx,
344                    ExecutionState::Cancelled(info),
345                )
346                .await;
347            }
348        }
349
350        Ok(())
351    }
352
353    // -----------------------------------------------------------------------
354    // observe  (sync fn — Crux R3)
355    // -----------------------------------------------------------------------
356
357    /// Subscribe to the progress event stream for a session.
358    ///
359    /// This is a **synchronous** `fn`: `broadcast::Sender::subscribe()` is
360    /// synchronous and does not perform I/O.  Multiple concurrent subscribers
361    /// each receive the full event stream independently (Crux R3).
362    ///
363    /// # Errors
364    /// - [`ObserveError::NotFound`] — no session with the given id exists.
365    pub fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
366        // Non-blocking read; the write lock is only held very briefly during spawn.
367        match self.sessions.try_read() {
368            Ok(map) => {
369                let record = map
370                    .get(id)
371                    .ok_or_else(|| ObserveError::NotFound(id.clone()))?;
372                Ok(Box::new(BroadcastObserverHandle::new(&record.bus_tx)))
373            }
374            Err(_) => Err(ObserveError::NotFound(id.clone())),
375        }
376    }
377
378    // -----------------------------------------------------------------------
379    // await_terminal
380    // -----------------------------------------------------------------------
381
382    /// Await the terminal state of a session.
383    ///
384    /// Polls the shared state until it reaches a terminal variant (`Done`,
385    /// `Cancelled`, or `Failed`).  The `JoinHandle` is never `.abort()`-ed
386    /// (Crux R2).
387    ///
388    /// # Errors
389    /// - [`AwaitError::NotFound`] — no session with the given id exists.
390    pub async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
391        let record = self
392            .get_record(id)
393            .await
394            .ok_or_else(|| AwaitError::NotFound(id.clone()))?;
395
396        // Single-awaiter path: take the JoinHandle and await `driver_loop`
397        // completion directly.  Replaces the previous `yield_now()` polling
398        // loop that occupied a tokio worker slot scheduling-wise even though
399        // it consumed no CPU.  The `driver_loop` guarantees a terminal
400        // `transition_state` before returning, so once `handle.await` resolves
401        // the state is guaranteed terminal.
402        let handle_opt = {
403            let mut guard = record.join_handle.lock().await;
404            guard.take()
405        };
406
407        if let Some(handle) = handle_opt {
408            handle
409                .await
410                .map_err(|e| AwaitError::Joined(format!("driver_loop join error: {e}")))?;
411        }
412        // (None branch: another caller has already taken the handle.  Either
413        // they are still awaiting it — in which case the driver_loop has not
414        // yet transitioned to terminal — or they have already finished, in
415        // which case the state is terminal.  We fall through to a single
416        // state read; the rare concurrent race returns `AwaitError::Joined`.)
417
418        let guard = record.state.lock().await;
419        match &*guard {
420            ExecutionState::Done(result) => Ok(TerminalOutcome::Done(result.clone())),
421            ExecutionState::Cancelled(info) => Ok(TerminalOutcome::Cancelled(info.clone())),
422            ExecutionState::Failed(info) => Ok(TerminalOutcome::Failed(info.clone())),
423            other => Err(AwaitError::Joined(format!(
424                "await_terminal: driver_loop completed but state is {:?} (concurrent awaiter race)",
425                other.tag()
426            ))),
427        }
428    }
429
430    // -----------------------------------------------------------------------
431    // Internal helpers
432    // -----------------------------------------------------------------------
433
434    /// Clone-then-release lookup (K-4): the lock is dropped before returning.
435    async fn get_record(&self, id: &SessionId) -> Option<Arc<SessionRecord>> {
436        let map = self.sessions.read().await;
437        map.get(id).cloned()
438    }
439}
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444    use algocline_core::execution::{
445        CancelCode, CancelReason, ExecutionState, SessionSpec, SpecKind,
446    };
447    use std::sync::Arc;
448
449    async fn make_executor() -> Arc<Executor> {
450        Arc::new(Executor::new(vec![]).await.expect("Executor::new"))
451    }
452
453    /// Construct a registry backed by per-test tempdir paths so the legacy
454    /// AppConfig::app_dir() layout is approximated without touching the user's
455    /// `~/.algocline` directory.
456    fn make_registry(executor: Arc<Executor>) -> (SessionRegistryV2, tempfile::TempDir) {
457        let tmp = tempfile::tempdir().expect("tempdir");
458        let state_store = Arc::new(JsonFileStore::new(tmp.path().join("state")));
459        let card_store = Arc::new(FileCardStore::new(tmp.path().join("cards")));
460        let scenarios_dir = tmp.path().join("scenarios");
461        (
462            SessionRegistryV2::new(executor, state_store, card_store, scenarios_dir),
463            tmp,
464        )
465    }
466
467    fn simple_spec(code: &str) -> SessionSpec {
468        SessionSpec {
469            kind: SpecKind::Run {
470                code: code.to_owned(),
471            },
472            project_root: None,
473            ctx: None,
474        }
475    }
476
477    fn cancel_reason() -> CancelReason {
478        CancelReason {
479            code: CancelCode::User,
480            detail: None,
481            requested_at: now_ms(),
482        }
483    }
484
485    // -----------------------------------------------------------------------
486    // spawn_returns_session_id_immediately (debt #40955)
487    // -----------------------------------------------------------------------
488
489    /// `spawn_v2` must return `SessionId` without blocking on execution.
490    #[tokio::test]
491    async fn spawn_returns_session_id_immediately() {
492        let executor = make_executor().await;
493        let (registry, _tmp) = make_registry(executor);
494
495        let start = std::time::Instant::now();
496        let result = tokio::time::timeout(
497            std::time::Duration::from_millis(200),
498            registry.spawn_v2(simple_spec("return 42")),
499        )
500        .await;
501
502        assert!(result.is_ok(), "spawn_v2 must complete within 200ms");
503        assert!(
504            result.unwrap().is_ok(),
505            "spawn_v2 must return Ok(SessionId)"
506        );
507
508        let elapsed = start.elapsed();
509        assert!(
510            elapsed < std::time::Duration::from_millis(150),
511            "spawn_v2 took too long: {elapsed:?}"
512        );
513    }
514
515    // -----------------------------------------------------------------------
516    // state_query_running
517    // -----------------------------------------------------------------------
518
519    /// Immediately after spawn, `state()` must return Running or Paused.
520    #[tokio::test]
521    async fn state_query_running() {
522        let executor = make_executor().await;
523        let (registry, _tmp) = make_registry(executor);
524
525        // Lua that pauses immediately so the session is observable.
526        let sid = registry
527            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
528            .await
529            .expect("spawn");
530
531        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
532
533        let state = registry.state(&sid).await.expect("state");
534        assert!(
535            matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
536            "state just after spawn must be Running or Paused, got: {:?}",
537            state.tag()
538        );
539    }
540
541    // -----------------------------------------------------------------------
542    // cancel_at_checkpoint_c_at_resume_entry
543    // -----------------------------------------------------------------------
544
545    /// `resume()` on a cancelled session must return `AlreadyCancelled`.
546    #[tokio::test]
547    async fn cancel_at_checkpoint_c_at_resume_entry() {
548        use algocline_core::execution::{ResumeError, ResumePayload};
549
550        let executor = make_executor().await;
551        let (registry, _tmp) = make_registry(executor);
552
553        let sid = registry
554            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
555            .await
556            .expect("spawn");
557
558        // Wait for Paused.
559        let mut retries = 0;
560        loop {
561            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
562            if registry.state(&sid).await.expect("state").tag() == ExecutionStateTag::Paused {
563                break;
564            }
565            retries += 1;
566            assert!(retries < 50, "session did not reach Paused state");
567        }
568
569        registry
570            .cancel(&sid, cancel_reason())
571            .await
572            .expect("cancel");
573
574        // checkpoint C: at resume entry
575        let result = registry
576            .resume(
577                &sid,
578                ResumePayload::Single {
579                    query_id: "q".into(),
580                    response: "4".into(),
581                    usage: None,
582                },
583            )
584            .await;
585
586        assert!(
587            matches!(result, Err(ResumeError::AlreadyCancelled)),
588            "resume on cancelled session must return AlreadyCancelled, got: {result:?}"
589        );
590    }
591
592    // -----------------------------------------------------------------------
593    // cancel_idempotent
594    // -----------------------------------------------------------------------
595
596    #[tokio::test]
597    async fn cancel_idempotent() {
598        let executor = make_executor().await;
599        let (registry, _tmp) = make_registry(executor);
600
601        let sid = registry
602            .spawn_v2(simple_spec("return 1"))
603            .await
604            .expect("spawn");
605
606        registry
607            .cancel(&sid, cancel_reason())
608            .await
609            .expect("first cancel");
610        registry
611            .cancel(&sid, cancel_reason())
612            .await
613            .expect("second cancel");
614    }
615
616    // -----------------------------------------------------------------------
617    // await_terminal returns Done without busy-polling
618    // -----------------------------------------------------------------------
619
620    /// Regression for #2 (case A): `await_terminal` must complete by awaiting
621    /// the `driver_loop` `JoinHandle` directly (single-awaiter `take` +
622    /// `.await`) instead of polling `state` in a `yield_now()` loop.  We can't
623    /// observe scheduler occupancy from a test, but we can verify the
624    /// behavioural contract: (1) the call returns the correct `TerminalOutcome`,
625    /// (2) it returns within a tight wall-clock budget without sleep, and
626    /// (3) a second concurrent caller does not panic.
627    #[tokio::test]
628    async fn await_terminal_returns_done_for_trivial_script() {
629        let executor = make_executor().await;
630        let (registry, _tmp) = make_registry(executor);
631
632        let sid = registry
633            .spawn_v2(simple_spec("return 42"))
634            .await
635            .expect("spawn");
636
637        let outcome = registry.await_terminal(&sid).await.expect("await_terminal");
638        match outcome {
639            TerminalOutcome::Done(result) => {
640                assert_eq!(result.value, serde_json::json!(42));
641            }
642            other => panic!("expected Done, got: {other:?}"),
643        }
644    }
645
646    /// Regression for #2 (case A) single-awaiter discipline: when two callers
647    /// race on `await_terminal`, the second caller (which observes `None` after
648    /// the first has taken the handle) must NOT panic.  It must either return
649    /// the same terminal outcome (if the first has already finished) or an
650    /// `AwaitError::Joined` (the documented race fallback).
651    #[tokio::test]
652    async fn await_terminal_does_not_panic_on_second_concurrent_caller() {
653        let executor = make_executor().await;
654        let (registry, _tmp) = make_registry(executor);
655
656        let sid = registry
657            .spawn_v2(simple_spec("return 99"))
658            .await
659            .expect("spawn");
660
661        let r1 = registry.clone();
662        let r2 = registry.clone();
663        let s1 = sid.clone();
664        let s2 = sid.clone();
665
666        let h1 = tokio::spawn(async move { r1.await_terminal(&s1).await });
667        let h2 = tokio::spawn(async move { r2.await_terminal(&s2).await });
668
669        let out1 = h1.await.expect("h1 join");
670        let out2 = h2.await.expect("h2 join");
671
672        // First-caller path must succeed with the real outcome.
673        let first_ok = matches!(&out1, Ok(TerminalOutcome::Done(_)))
674            || matches!(&out2, Ok(TerminalOutcome::Done(_)));
675        assert!(
676            first_ok,
677            "at least one caller must observe Done; got out1={out1:?}, out2={out2:?}"
678        );
679        // Second caller may have observed Joined (race) or Done; either is OK,
680        // neither must panic — which we've already verified by the join above.
681    }
682
683    // -----------------------------------------------------------------------
684    // observe_sink_free (Crux R3 — registry level)
685    // -----------------------------------------------------------------------
686
687    /// `observe()` must succeed and return a valid handle even with 0 prior observers.
688    #[tokio::test]
689    async fn observe_sink_free_registry() {
690        let executor = make_executor().await;
691        let (registry, _tmp) = make_registry(executor);
692
693        let sid = registry
694            .spawn_v2(simple_spec(r#"return alc.llm("q")"#))
695            .await
696            .expect("spawn");
697
698        // observe() before any subscriber exists must succeed.
699        let handle = registry.observe(&sid);
700        assert!(
701            handle.is_ok(),
702            "observe() must return Ok even with 0 prior observers"
703        );
704    }
705
706    // -----------------------------------------------------------------------
707    // observe_multi_subscriber_fan_out (Crux R3 — registry level)
708    // -----------------------------------------------------------------------
709
710    /// Multiple independent observers each get the same events.
711    #[tokio::test]
712    async fn observe_multi_subscriber_fan_out_registry() {
713        use algocline_core::execution::ObserverRecvError;
714
715        let executor = make_executor().await;
716        let (registry, _tmp) = make_registry(executor);
717
718        // A script that returns immediately — the driver will publish Done.
719        let sid = registry
720            .spawn_v2(simple_spec("return 99"))
721            .await
722            .expect("spawn");
723
724        // Subscribe 3 observers.
725        let mut h1 = registry.observe(&sid).expect("observe h1");
726        let mut h2 = registry.observe(&sid).expect("observe h2");
727        let mut h3 = registry.observe(&sid).expect("observe h3");
728
729        // Wait for terminal so we know events have been published.
730        let _ = registry.await_terminal(&sid).await;
731
732        // Each observer must receive at least the terminal StateTransition.
733        // Drain with idle-timeout: bus_tx is retained in SessionRecord for
734        // sink-free late-subscribe (Crux R3), so Closed never fires while the
735        // registry is alive.  A 100ms idle window after await_terminal() is
736        // sufficient — all events are already buffered.
737        use std::time::Duration;
738        for (label, handle) in [("h1", &mut h1), ("h2", &mut h2), ("h3", &mut h3)] {
739            let mut got_transition = false;
740            loop {
741                match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
742                    Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
743                    Ok(Ok(_)) => {}
744                    Ok(Err(ObserverRecvError::Closed)) => break,
745                    Ok(Err(ObserverRecvError::Lagged(_))) => {}
746                    Err(_) => break, // idle-timeout: no more events coming
747                }
748            }
749            assert!(
750                got_transition,
751                "{label}: must receive at least one StateTransition event"
752            );
753        }
754    }
755}