1use algocline_core::execution::{
19 AwaitError, CancelError, CancelReason, ExecutionService, ExecutionState, ObserveError,
20 ObserverHandle, ResumeError, ResumeOutcome, ResumePayload, SessionId, SessionSpec, SpawnError,
21 StateError, TerminalOutcome,
22};
23use async_trait::async_trait;
24
25use crate::service::AppService;
26
27#[async_trait]
28impl ExecutionService for AppService {
29 async fn spawn(&self, spec: SessionSpec) -> Result<SessionId, SpawnError> {
30 self.execution_registry.spawn_v2(spec).await
31 }
32
33 async fn state(&self, id: &SessionId) -> Result<ExecutionState, StateError> {
34 self.execution_registry.state(id).await
35 }
36
37 async fn resume(
38 &self,
39 id: &SessionId,
40 payload: ResumePayload,
41 ) -> Result<ResumeOutcome, ResumeError> {
42 self.execution_registry.resume(id, payload).await
43 }
44
45 async fn cancel(&self, id: &SessionId, reason: CancelReason) -> Result<(), CancelError> {
46 self.execution_registry.cancel(id, reason).await
47 }
48
49 fn observe(&self, id: &SessionId) -> Result<Box<dyn ObserverHandle>, ObserveError> {
50 self.execution_registry.observe(id)
51 }
52
53 async fn await_terminal(&self, id: &SessionId) -> Result<TerminalOutcome, AwaitError> {
54 self.execution_registry.await_terminal(id).await
55 }
56}
57
58#[cfg(test)]
59mod tests {
60 use algocline_core::execution::{
61 CancelCode, CancelReason, ExecutionService, ExecutionState, ObserverRecvError,
62 ProgressEvent, ResumePayload, SessionSpec, SpecKind, TerminalOutcome,
63 };
64
65 use crate::service::test_support::make_app_service;
66
67 fn simple_spec(code: &str) -> SessionSpec {
68 SessionSpec {
69 kind: SpecKind::Run {
70 code: code.to_owned(),
71 },
72 project_root: None,
73 ctx: None,
74 }
75 }
76
77 fn user_cancel_reason() -> CancelReason {
78 CancelReason {
79 code: CancelCode::User,
80 detail: None,
81 requested_at: std::time::SystemTime::now()
82 .duration_since(std::time::UNIX_EPOCH)
83 .unwrap_or_default()
84 .as_millis() as i64,
85 }
86 }
87
88 #[tokio::test]
94 async fn spawn_returns_session_id_and_running_state() {
95 let svc = make_app_service().await;
96 let sid = svc
98 .spawn(simple_spec(r#"return alc.llm("q")"#))
99 .await
100 .expect("spawn must succeed");
101
102 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
104
105 let state = svc.state(&sid).await.expect("state must succeed");
106 assert!(
107 matches!(state, ExecutionState::Running | ExecutionState::Paused(_)),
108 "state immediately after spawn must be Running or Paused, got: {:?}",
109 state.tag()
110 );
111 }
112
113 #[tokio::test]
119 async fn spawn_run_lua_to_completion() {
120 let svc = make_app_service().await;
121 let sid = svc
122 .spawn(simple_spec("return 42"))
123 .await
124 .expect("spawn must succeed");
125
126 let outcome =
127 tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
128 .await
129 .expect("await_terminal must not timeout")
130 .expect("await_terminal must succeed");
131
132 match outcome {
133 TerminalOutcome::Done(result) => {
134 assert_eq!(
135 result.value,
136 serde_json::json!(42),
137 "Done result value must be 42"
138 );
139 }
140 other => panic!("expected Done, got: {other:?}"),
141 }
142 }
143
144 #[tokio::test]
150 async fn spawn_lua_pause_publishes_progress_event() {
151 let svc = make_app_service().await;
152
153 let sid = svc
154 .spawn(simple_spec(r#"return alc.llm("tell me something")"#))
155 .await
156 .expect("spawn must succeed");
157
158 let mut handle = svc.observe(&sid).expect("observe must succeed");
160
161 let got_pause = tokio::time::timeout(std::time::Duration::from_secs(5), async {
163 loop {
164 match handle.recv().await {
165 Ok(ProgressEvent::PauseRequested { .. }) => return true,
166 Ok(_) => {}
167 Err(ObserverRecvError::Closed) => return false,
168 Err(ObserverRecvError::Lagged(_)) => {}
169 }
170 }
171 })
172 .await
173 .expect("must not timeout waiting for PauseRequested");
174
175 assert!(got_pause, "must receive PauseRequested event");
176 }
177
178 #[tokio::test]
185 async fn resume_continues_paused_session() {
186 use algocline_core::execution::ResumeOutcome;
187
188 let svc = make_app_service().await;
189
190 let sid = svc
192 .spawn(simple_spec(r#"return alc.llm("what is 1+1?")"#))
193 .await
194 .expect("spawn must succeed");
195
196 let query_id = tokio::time::timeout(std::time::Duration::from_secs(5), async {
198 loop {
199 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
200 let state = svc.state(&sid).await.expect("state");
201 if let ExecutionState::Paused(info) = state {
202 if let Some(prompt) = info.prompts.first() {
204 return prompt.query_id.clone();
205 }
206 }
207 }
208 })
209 .await
210 .expect("must reach Paused state within timeout");
211
212 let outcome = svc
213 .resume(
214 &sid,
215 ResumePayload::Single {
216 query_id,
217 response: "2".into(),
218 usage: None,
219 },
220 )
221 .await
222 .expect("resume must succeed");
223
224 assert!(
225 matches!(outcome, ResumeOutcome::Continued),
226 "resume must return Continued, got: {outcome:?}"
227 );
228
229 let terminal =
231 tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
232 .await
233 .expect("await_terminal must not timeout")
234 .expect("await_terminal must succeed");
235
236 assert!(
237 matches!(terminal, TerminalOutcome::Done(_)),
238 "session must complete as Done after resume, got: {terminal:?}"
239 );
240 }
241
242 #[tokio::test]
254 async fn cancel_running_session_transitions_to_cancelled() {
255 let svc = make_app_service().await;
256
257 let sid = svc
259 .spawn(simple_spec(r#"return alc.llm("cancel me")"#))
260 .await
261 .expect("spawn must succeed");
262
263 tokio::time::timeout(std::time::Duration::from_secs(5), async {
265 loop {
266 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
267 let state = svc.state(&sid).await.expect("state");
268 if matches!(state, ExecutionState::Paused(_)) {
269 return;
270 }
271 }
272 })
273 .await
274 .expect("session must reach Paused state within timeout");
275
276 svc.cancel(&sid, user_cancel_reason())
277 .await
278 .expect("cancel must succeed");
279
280 let terminal =
281 tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
282 .await
283 .expect("await_terminal must not timeout")
284 .expect("await_terminal must succeed");
285
286 match terminal {
287 TerminalOutcome::Cancelled(info) => {
288 assert_eq!(
289 info.reason.code,
290 CancelCode::User,
291 "cancel code must be User"
292 );
293 }
294 other => panic!("expected Cancelled, got: {other:?}"),
295 }
296 }
297
298 #[tokio::test]
305 async fn cancel_idempotent_returns_ok() {
306 let svc = make_app_service().await;
307
308 let sid = svc
310 .spawn(simple_spec(r#"return alc.llm("cancel me")"#))
311 .await
312 .expect("spawn must succeed");
313
314 tokio::time::timeout(std::time::Duration::from_secs(5), async {
316 loop {
317 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
318 let state = svc.state(&sid).await.expect("state");
319 if matches!(state, ExecutionState::Paused(_)) {
320 return;
321 }
322 }
323 })
324 .await
325 .expect("session must reach Paused state within timeout");
326
327 svc.cancel(&sid, user_cancel_reason())
328 .await
329 .expect("first cancel must return Ok");
330
331 svc.cancel(&sid, user_cancel_reason())
333 .await
334 .expect("second cancel must return Ok (idempotent)");
335
336 let terminal =
338 tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
339 .await
340 .expect("await_terminal must not timeout")
341 .expect("await_terminal must succeed");
342
343 assert!(
344 matches!(terminal, TerminalOutcome::Cancelled(_)),
345 "session must be Cancelled, got: {terminal:?}"
346 );
347 }
348
349 #[tokio::test]
356 async fn observe_multi_subscriber_fan_out_via_appservice() {
357 let svc = make_app_service().await;
358
359 let sid = svc
360 .spawn(simple_spec("return 99"))
361 .await
362 .expect("spawn must succeed");
363
364 let mut h1 = svc.observe(&sid).expect("observe h1 must succeed");
366 let mut h2 = svc.observe(&sid).expect("observe h2 must succeed");
367
368 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
370 .await
371 .expect("await_terminal must not timeout");
372
373 use std::time::Duration;
385 for (label, handle) in [("h1", &mut h1), ("h2", &mut h2)] {
386 let mut got_transition = false;
387 loop {
388 match tokio::time::timeout(Duration::from_millis(100), handle.recv()).await {
389 Ok(Ok(ProgressEvent::StateTransition { .. })) => got_transition = true,
390 Ok(Ok(_)) => {}
391 Ok(Err(ObserverRecvError::Closed)) => break,
392 Ok(Err(ObserverRecvError::Lagged(_))) => {}
393 Err(_) => break, }
395 }
396 assert!(
397 got_transition,
398 "{label}: must receive at least one StateTransition event"
399 );
400 }
401 }
402
403 #[tokio::test]
412 async fn observe_sink_free_when_no_subscribers() {
413 use algocline_core::execution::ObserveError;
414
415 let svc = make_app_service().await;
416
417 let sid = svc
418 .spawn(simple_spec("return 1"))
419 .await
420 .expect("spawn must succeed");
421
422 let terminal =
425 tokio::time::timeout(std::time::Duration::from_secs(5), svc.await_terminal(&sid))
426 .await
427 .expect("await_terminal must not timeout even with 0 observers")
428 .expect("await_terminal must succeed");
429
430 assert!(
431 matches!(terminal, TerminalOutcome::Done(_)),
432 "session must complete as Done, got: {terminal:?}"
433 );
434
435 match svc.observe(&sid) {
438 Ok(_handle) => {
439 }
441 Err(ObserveError::NotFound(_)) => {
442 }
444 }
445 }
446}