Skip to main content

algocline_app/service/
execution_service_impl.rs

1//! `impl ExecutionService for AppService` — thin delegation to [`SessionRegistryV2`].
2//!
3//! All six verbs (`spawn`, `state`, `resume`, `cancel`, `observe`, `await_terminal`)
4//! delegate directly to `self.execution_registry` with no additional logic.
5//!
6//! # Crux constraints honoured
7//!
8//! - **R1 (Wire-concept exclusion)**: This file imports nothing from `rmcp`, contains
9//!   no `progressToken`, `_meta`, `notifications/`, or `mcp_`-prefixed identifiers.
10//! - **R2 (Cooperative cancellation)**: `cancel` calls
11//!   `execution_registry.cancel(id, reason)` only; no `JoinHandle::abort()` or
12//!   process-kill path exists in this file.
13//! - **R3 (Sink-free broadcast fan-out)**: `observe` is a sync `fn` that delegates
14//!   to `execution_registry.observe(id)`.  Multi-subscriber behaviour is verified by
15//!   `observe_multi_subscriber_fan_out_via_appservice` and
16//!   `observe_sink_free_when_no_subscribers` in the tests below.
17
18use algocline_core::execution::{
19    AwaitError, CancelError, CancelReason, ExecutionService, ExecutionState, ObserveError,
20    ObserverHandle, ResumeError, ResumeOutcome, ResumePayload, SessionId, SessionSpec, SpawnError,
21    StateError, TerminalOutcome,
22};
23use async_trait::async_trait;
24
25use crate::service::AppService;
26
27#[async_trait]
28impl ExecutionService for AppService {
29    async fn spawn(&self, spec: SessionSpec) -> Result<SessionId, SpawnError> {
30        self.execution_registry.spawn_v2(spec).await
31    }
32
33    async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
34        self.execution_registry.state(id).await
35    }
36
37    async fn resume(
38        &self,
39        id: &SessionId,
40        payload: ResumePayload,
41    ) -> Result<ResumeOutcome, ResumeError> {
42        self.execution_registry.resume(id, payload).await
43    }
44
45    async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
46        self.execution_registry.cancel(id, reason).await
47    }
48
49    fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
50        self.execution_registry.observe(id)
51    }
52
53    async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
54        self.execution_registry.await_terminal(id).await
55    }
56}
57
58#[cfg(test)]
59mod tests {
60    use algocline_core::execution::{
61        CancelCode, CancelReason, ExecutionService, ExecutionState, ObserverRecvError,
62        ProgressEvent, ResumePayload, SessionSpec, SpecKind, TerminalOutcome,
63    };
64
65    use crate::service::test_support::make_app_service;
66
67    fn simple_spec(code: &str) -> SessionSpec {
68        SessionSpec {
69            kind: SpecKind::Run {
70                code: code.to_owned(),
71            },
72            project_root: None,
73            ctx: None,
74        }
75    }
76
77    fn user_cancel_reason() -> CancelReason {
78        CancelReason {
79            code: CancelCode::User,
80            detail: None,
81            requested_at: std::time::SystemTime::now()
82                .duration_since(std::time::UNIX_EPOCH)
83                .unwrap_or_default()
84                .as_millis() as i64,
85        }
86    }
87
88    // -----------------------------------------------------------------------
89    // spawn_returns_session_id_and_running_state
90    // -----------------------------------------------------------------------
91
92    /// `spawn()` → `state()` immediately returns Running (or Paused for LLM-touching scripts).
93    #[tokio::test]
94    async fn spawn_returns_session_id_and_running_state() {
95        let svc = make_app_service().await;
96        // Use a Lua script that pauses so we can observe the state before completion.
97        let sid = svc
98            .spawn(simple_spec(r#"return alc.llm("q")"#))
99            .await
100            .expect("spawn must succeed");
101
102        // Wait a short time for the driver to start.
103        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
104
105        let state = svc.state(&sid).await.expect("state must succeed");
106        assert!(
107            matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
108            "state immediately after spawn must be Running or Paused, got: {:?}",
109            state.tag()
110        );
111    }
112
113    // -----------------------------------------------------------------------
114    // spawn_run_lua_to_completion
115    // -----------------------------------------------------------------------
116
117    /// `SpecKind::Run { code: "return 42" }` → `await_terminal()` → `Terminal(Done(value=42))`.
118    #[tokio::test]
119    async fn spawn_run_lua_to_completion() {
120        let svc = make_app_service().await;
121        let sid = svc
122            .spawn(simple_spec("return 42"))
123            .await
124            .expect("spawn must succeed");
125
126        let outcome =
127            tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
128                .await
129                .expect("await_terminal must not timeout")
130                .expect("await_terminal must succeed");
131
132        match outcome {
133            TerminalOutcome::Done(result) => {
134                assert_eq!(
135                    result.value,
136                    serde_json::json!(42),
137                    "Done result value must be 42"
138                );
139            }
140            other => panic!("expected Done, got: {other:?}"),
141        }
142    }
143
144    // -----------------------------------------------------------------------
145    // spawn_lua_pause_publishes_progress_event
146    // -----------------------------------------------------------------------
147
148    /// Lua `alc.llm(...)` causes a pause → `observe()` receiver gets `ProgressEvent::PauseRequested`.
149    #[tokio::test]
150    async fn spawn_lua_pause_publishes_progress_event() {
151        let svc = make_app_service().await;
152
153        let sid = svc
154            .spawn(simple_spec(r#"return alc.llm("tell me something")"#))
155            .await
156            .expect("spawn must succeed");
157
158        // Subscribe before the pause event is published.
159        let mut handle = svc.observe(&sid).expect("observe must succeed");
160
161        // Wait for the PauseRequested event (with timeout).
162        let got_pause = tokio::time::timeout(std::time::Duration::from_secs(5), async {
163            loop {
164                match handle.recv().await {
165                    Ok(ProgressEvent::PauseRequested { .. }) => return true,
166                    Ok(_) => {}
167                    Err(ObserverRecvError::Closed) => return false,
168                    Err(ObserverRecvError::Lagged(_)) => {}
169                }
170            }
171        })
172        .await
173        .expect("must not timeout waiting for PauseRequested");
174
175        assert!(got_pause, "must receive PauseRequested event");
176    }
177
178    // -----------------------------------------------------------------------
179    // resume_continues_paused_session
180    // -----------------------------------------------------------------------
181
182    /// Pause → resume(`ResumePayload::Single { response, ... }`) → `ResumeOutcome::Continued`
183    /// and session eventually reaches `Done`.
184    #[tokio::test]
185    async fn resume_continues_paused_session() {
186        use algocline_core::execution::ResumeOutcome;
187
188        let svc = make_app_service().await;
189
190        // Strategy that calls alc.llm once and returns the response.
191        let sid = svc
192            .spawn(simple_spec(r#"return alc.llm("what is 1+1?")"#))
193            .await
194            .expect("spawn must succeed");
195
196        // Wait until the session reaches Paused, capturing the query_id.
197        let query_id = tokio::time::timeout(std::time::Duration::from_secs(5), async {
198            loop {
199                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
200                let state = svc.state(&sid).await.expect("state");
201                if let ExecutionState::Paused(info) = state {
202                    // The query id is the first pending prompt's id.
203                    if let Some(prompt) = info.prompts.first() {
204                        return prompt.query_id.clone();
205                    }
206                }
207            }
208        })
209        .await
210        .expect("must reach Paused state within timeout");
211
212        let outcome = svc
213            .resume(
214                &sid,
215                ResumePayload::Single {
216                    query_id,
217                    response: "2".into(),
218                    usage: None,
219                },
220            )
221            .await
222            .expect("resume must succeed");
223
224        assert!(
225            matches!(outcome, ResumeOutcome::Continued),
226            "resume must return Continued, got: {outcome:?}"
227        );
228
229        // The session should now complete.
230        let terminal =
231            tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
232                .await
233                .expect("await_terminal must not timeout")
234                .expect("await_terminal must succeed");
235
236        assert!(
237            matches!(terminal, TerminalOutcome::Done(_)),
238            "session must complete as Done after resume, got: {terminal:?}"
239        );
240    }
241
242    // -----------------------------------------------------------------------
243    // cancel_running_session_transitions_to_cancelled
244    // -----------------------------------------------------------------------
245
246    /// Paused Lua session → `cancel()` → `await_terminal()` → `Cancelled(info)`
247    /// with `info.reason.code == CancelCode::User`.
248    ///
249    /// Uses `alc.llm(...)` to enter Paused state so that `cancel()` can exercise
250    /// the direct Paused→Cancelled transition in `registry::cancel()` without
251    /// requiring the driver to reach a cooperative-cancellation checkpoint
252    /// (which a tight CPU loop would never yield to).
253    #[tokio::test]
254    async fn cancel_running_session_transitions_to_cancelled() {
255        let svc = make_app_service().await;
256
257        // Spawn a session that pauses waiting for an LLM response.
258        let sid = svc
259            .spawn(simple_spec(r#"return alc.llm("cancel me")"#))
260            .await
261            .expect("spawn must succeed");
262
263        // Wait until the session reaches Paused state.
264        tokio::time::timeout(std::time::Duration::from_secs(5), async {
265            loop {
266                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
267                let state = svc.state(&sid).await.expect("state");
268                if matches!(state, ExecutionState::Paused(_)) {
269                    return;
270                }
271            }
272        })
273        .await
274        .expect("session must reach Paused state within timeout");
275
276        svc.cancel(&sid, user_cancel_reason())
277            .await
278            .expect("cancel must succeed");
279
280        let terminal =
281            tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
282                .await
283                .expect("await_terminal must not timeout")
284                .expect("await_terminal must succeed");
285
286        match terminal {
287            TerminalOutcome::Cancelled(info) => {
288                assert_eq!(
289                    info.reason.code,
290                    CancelCode::User,
291                    "cancel code must be User"
292                );
293            }
294            other => panic!("expected Cancelled, got: {other:?}"),
295        }
296    }
297
298    // -----------------------------------------------------------------------
299    // cancel_idempotent_returns_ok  (Crux R2)
300    // -----------------------------------------------------------------------
301
302    /// Calling `cancel()` twice on the same paused session must both return `Ok(())`.
303    /// The first `CancelInfo` must remain in the terminal state.
304    #[tokio::test]
305    async fn cancel_idempotent_returns_ok() {
306        let svc = make_app_service().await;
307
308        // Spawn a session that pauses waiting for an LLM response.
309        let sid = svc
310            .spawn(simple_spec(r#"return alc.llm("cancel me")"#))
311            .await
312            .expect("spawn must succeed");
313
314        // Wait until the session reaches Paused state.
315        tokio::time::timeout(std::time::Duration::from_secs(5), async {
316            loop {
317                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
318                let state = svc.state(&sid).await.expect("state");
319                if matches!(state, ExecutionState::Paused(_)) {
320                    return;
321                }
322            }
323        })
324        .await
325        .expect("session must reach Paused state within timeout");
326
327        svc.cancel(&sid, user_cancel_reason())
328            .await
329            .expect("first cancel must return Ok");
330
331        // Second cancel on the same session must also return Ok.
332        svc.cancel(&sid, user_cancel_reason())
333            .await
334            .expect("second cancel must return Ok (idempotent)");
335
336        // State must reflect the first cancel's info.
337        let terminal =
338            tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
339                .await
340                .expect("await_terminal must not timeout")
341                .expect("await_terminal must succeed");
342
343        assert!(
344            matches!(terminal, TerminalOutcome::Cancelled(_)),
345            "session must be Cancelled, got: {terminal:?}"
346        );
347    }
348
349    // -----------------------------------------------------------------------
350    // observe_multi_subscriber_fan_out_via_appservice  (Crux R3)
351    // -----------------------------------------------------------------------
352
353    /// Two independent `observe()` calls must each receive the full event stream.
354    /// Neither subscriber affects the other.
355    #[tokio::test]
356    async fn observe_multi_subscriber_fan_out_via_appservice() {
357        let svc = make_app_service().await;
358
359        let sid = svc
360            .spawn(simple_spec("return 99"))
361            .await
362            .expect("spawn must succeed");
363
364        // Subscribe two independent handles before the session terminates.
365        let mut h1 = svc.observe(&sid).expect("observe h1 must succeed");
366        let mut h2 = svc.observe(&sid).expect("observe h2 must succeed");
367
368        // Wait for terminal so events have been published.
369        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
370            .await
371            .expect("await_terminal must not timeout");
372
373        // Each handle must receive at least one StateTransition event.
374        //
375        // The recv loop is bounded by a 100ms idle-timeout per receive:
376        // `SessionRegistryV2` is sink-free (registry retains `bus_tx` so late
377        // `observe()` calls can still subscribe), so `bus_tx` does NOT drop
378        // when the session reaches a terminal state — `Closed` would never
379        // fire and a bare `loop { recv().await }` blocks forever (the
380        // observed 7-min worktree hang). Timeout exit is correct: after
381        // `await_terminal()` succeeded, all events have been published, so
382        // a 100ms idle window past the last event proves nothing more is
383        // coming.
384        use std::time::Duration;
385        for (label, handle) in [("h1", &mut h1), ("h2", &mut h2)] {
386            let mut got_transition = false;
387            loop {
388                match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
389                    Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
390                    Ok(Ok(_)) => {}
391                    Ok(Err(ObserverRecvError::Closed)) => break,
392                    Ok(Err(ObserverRecvError::Lagged(_))) => {}
393                    Err(_) => break, // idle-timeout: no more events coming
394                }
395            }
396            assert!(
397                got_transition,
398                "{label}: must receive at least one StateTransition event"
399            );
400        }
401    }
402
403    // -----------------------------------------------------------------------
404    // observe_sink_free_when_no_subscribers  (Crux R3)
405    // -----------------------------------------------------------------------
406
407    /// Spawn a session without calling `observe()` first.  `await_terminal()` must
408    /// complete normally (sink-free: 0 observers do not stall execution).
409    /// After terminal, `observe()` must either return a valid handle
410    /// (if session still in registry) or `ObserveError::NotFound` (if GC'd).
411    #[tokio::test]
412    async fn observe_sink_free_when_no_subscribers() {
413        use algocline_core::execution::ObserveError;
414
415        let svc = make_app_service().await;
416
417        let sid = svc
418            .spawn(simple_spec("return 1"))
419            .await
420            .expect("spawn must succeed");
421
422        // Do NOT observe — no subscribers at all.
423
424        let terminal =
425            tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
426                .await
427                .expect("await_terminal must not timeout even with 0 observers")
428                .expect("await_terminal must succeed");
429
430        assert!(
431            matches!(terminal, TerminalOutcome::Done(_)),
432            "session must complete as Done, got: {terminal:?}"
433        );
434
435        // After terminal, observe either returns a valid handle (session still alive)
436        // or NotFound (session GC'd) — both are acceptable per the spec.
437        match svc.observe(&sid) {
438            Ok(_handle) => {
439                // Session still in registry — valid.
440            }
441            Err(ObserveError::NotFound(_)) => {
442                // Session GC'd — also valid.
443            }
444        }
445    }
446}