1use super::checkpoint::WorkflowCheckpoint;
9use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome};
10use crate::agent::AgentEvent;
11use crate::ordered_parallel::run_ordered_parallel_with_limit;
12use crate::store::SessionStore;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_epoch_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        // `as_millis` is a `u128`; the cast keeps its low 64 bits.
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
23
/// One stage of a per-item pipeline, shared behind an [`Arc`].
///
/// The callable receives the outcome of the previous stage for the same item
/// (`None` when no earlier stage has run yet) together with a reference to the
/// item, and returns the [`AgentStepSpec`] to execute next. Returning `None`
/// makes `execute_pipeline` stop that item's chain.
pub type PipelineStage<I> =
    Arc<dyn Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec> + Send + Sync>;
32
33pub async fn execute_pipeline<I>(
47 executor: Arc<dyn AgentExecutor>,
48 items: Vec<I>,
49 stages: Vec<PipelineStage<I>>,
50 event_tx: Option<broadcast::Sender<AgentEvent>>,
51) -> Vec<Option<StepOutcome>>
52where
53 I: Send + 'static,
54{
55 let limit = executor.concurrency_hint();
56 let stages = Arc::new(stages);
57
58 let results = run_ordered_parallel_with_limit(items, limit, move |_idx, item| {
59 let executor = Arc::clone(&executor);
60 let stages = Arc::clone(&stages);
61 let event_tx = event_tx.clone();
62 async move {
63 let mut prev: Option<StepOutcome> = None;
64 for stage in stages.iter() {
65 let Some(spec) = stage(prev.as_ref(), &item) else {
66 break;
67 };
68 let outcome = executor.execute_step(spec, event_tx.clone()).await;
69 let succeeded = outcome.success;
70 prev = Some(outcome);
71 if !succeeded {
72 break;
73 }
74 }
75 prev
76 }
77 })
78 .await;
79
80 results
83 .into_iter()
84 .map(|result| result.output.unwrap_or(None))
85 .collect()
86}
87
88pub async fn execute_steps_parallel_resumable(
103 executor: Arc<dyn AgentExecutor>,
104 specs: Vec<AgentStepSpec>,
105 workflow_id: &str,
106 store: Arc<dyn SessionStore>,
107 event_tx: Option<broadcast::Sender<AgentEvent>>,
108) -> Vec<StepOutcome> {
109 let done: HashMap<String, StepOutcome> = match store.load_workflow_checkpoint(workflow_id).await
115 {
116 Ok(Some(cp)) => cp.completed(),
117 Ok(None) => HashMap::new(),
118 Err(e) => {
119 tracing::warn!(
120 workflow_id = %workflow_id,
121 error = %e,
122 "workflow checkpoint unreadable; re-running the workflow from scratch"
123 );
124 HashMap::new()
125 }
126 };
127
128 let pending: Vec<AgentStepSpec> = specs
129 .iter()
130 .filter(|s| !done.contains_key(&s.task_id))
131 .cloned()
132 .collect();
133 let labels: Vec<(String, String)> = pending
134 .iter()
135 .map(|s| (s.task_id.clone(), s.agent.clone()))
136 .collect();
137
138 let acc = Arc::new(tokio::sync::Mutex::new(done.clone()));
140 let limit = executor.concurrency_hint();
141 let workflow_id_owned = workflow_id.to_string();
142 let store_steps = Arc::clone(&store);
143
144 let results = run_ordered_parallel_with_limit(pending, limit, move |_idx, spec| {
145 let executor = Arc::clone(&executor);
146 let event_tx = event_tx.clone();
147 let acc = Arc::clone(&acc);
148 let store = Arc::clone(&store_steps);
149 let workflow_id = workflow_id_owned.clone();
150 async move {
151 let outcome = executor.execute_step(spec, event_tx).await;
152 if outcome.success {
156 let mut guard = acc.lock().await;
157 guard.insert(outcome.task_id.clone(), outcome.clone());
158 let checkpoint =
159 WorkflowCheckpoint::from_completed(&workflow_id, &guard, now_epoch_ms());
160 if let Err(e) = store
161 .save_workflow_checkpoint(&workflow_id, &checkpoint)
162 .await
163 {
164 tracing::warn!(
166 workflow_id = %workflow_id,
167 error = %e,
168 "workflow checkpoint save failed; run continues"
169 );
170 }
171 }
172 outcome
173 }
174 })
175 .await;
176
177 let mut fresh: HashMap<String, StepOutcome> = HashMap::new();
178 for result in results {
179 match result.output {
180 Ok(outcome) => {
181 fresh.insert(outcome.task_id.clone(), outcome);
182 }
183 Err(error) => {
184 if let Some((task_id, agent)) = labels.get(result.index).cloned() {
185 fresh.insert(
186 task_id.clone(),
187 StepOutcome::failed(task_id, agent, error.to_string()),
188 );
189 }
190 }
191 }
192 }
193
194 let merged: Vec<StepOutcome> = specs
196 .iter()
197 .map(|s| {
198 done.get(&s.task_id)
199 .cloned()
200 .or_else(|| fresh.remove(&s.task_id))
201 .unwrap_or_else(|| {
202 StepOutcome::failed(
203 s.task_id.clone(),
204 s.agent.clone(),
205 "step produced no outcome",
206 )
207 })
208 })
209 .collect();
210
211 if merged.iter().all(|o| o.success) {
212 let _ = store.delete_workflow_checkpoint(workflow_id).await;
213 }
214 merged
215}
216
/// What the caller's predicate asks `execute_loop` to do after a round.
pub enum LoopDecision {
    /// Run another round with these specs. `execute_loop` honours this only
    /// while the iteration cap has not been reached, and an empty list ends
    /// the loop.
    Continue(Vec<AgentStepSpec>),
    /// End the loop; the round just completed is the result.
    Stop,
}
224
225pub async fn execute_loop<F>(
241 executor: Arc<dyn AgentExecutor>,
242 initial: Vec<AgentStepSpec>,
243 max_iterations: usize,
244 event_tx: Option<broadcast::Sender<AgentEvent>>,
245 mut next: F,
246) -> Vec<StepOutcome>
247where
248 F: FnMut(&[StepOutcome]) -> LoopDecision + Send,
249{
250 let cap = max_iterations.max(1);
251 let mut specs = initial;
252 let mut last = Vec::new();
253 let mut iterations = 0;
254
255 while !specs.is_empty() {
256 let round = execute_steps_parallel(
257 Arc::clone(&executor),
258 std::mem::take(&mut specs),
259 event_tx.clone(),
260 )
261 .await;
262 iterations += 1;
263 let decision = next(&round);
264 last = round;
265 match decision {
266 LoopDecision::Continue(more) if iterations < cap => specs = more,
267 _ => break,
268 }
269 }
270
271 last
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277 use async_trait::async_trait;
278 use std::sync::atomic::{AtomicUsize, Ordering};
279 use std::time::Duration;
280
    /// Test executor that echoes each spec's prompt back as the outcome output
    /// while counting how many steps are in flight.
    struct EchoExecutor {
        /// Number of `execute_step` calls currently in progress.
        active: Arc<AtomicUsize>,
        /// Highest value `active` has reached.
        max_active: Arc<AtomicUsize>,
    }
287
288 impl EchoExecutor {
289 fn new() -> Self {
290 Self {
291 active: Arc::new(AtomicUsize::new(0)),
292 max_active: Arc::new(AtomicUsize::new(0)),
293 }
294 }
295 }
296
    #[async_trait]
    impl AgentExecutor for EchoExecutor {
        /// Simulates a step: tracks concurrency, sleeps briefly, then echoes the
        /// spec's prompt as the output.
        ///
        /// The agent name selects the behaviour: `"boom"` panics, `"fail"`
        /// returns an unsuccessful outcome, anything else succeeds.
        async fn execute_step(
            &self,
            spec: AgentStepSpec,
            _event_tx: Option<broadcast::Sender<AgentEvent>>,
        ) -> StepOutcome {
            // Count this call as in flight and record the new peak.
            let now = self.active.fetch_add(1, Ordering::SeqCst) + 1;
            self.max_active.fetch_max(now, Ordering::SeqCst);
            // Suspend for a moment so concurrently scheduled steps can overlap.
            tokio::time::sleep(Duration::from_millis(15)).await;
            self.active.fetch_sub(1, Ordering::SeqCst);
            // Deliberate panic for the "boom" agent, raised only after the
            // in-flight counter has been released.
            assert!(spec.agent != "boom", "boom");
            StepOutcome {
                task_id: spec.task_id.clone(),
                session_id: format!("task-run-{}", spec.task_id),
                agent: spec.agent.clone(),
                output: spec.prompt.clone(),
                success: spec.agent != "fail",
                structured: None,
            }
        }
        /// Reports a concurrency hint of 4.
        fn concurrency_hint(&self) -> usize {
            4
        }
    }
322
323 fn stage<I, F>(f: F) -> PipelineStage<I>
324 where
325 F: Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec> + Send + Sync + 'static,
326 {
327 Arc::new(f)
328 }
329
    /// Two items run through two stages; the second stage builds its prompt
    /// from the first stage's output, so the final output proves the chaining.
    #[tokio::test]
    async fn each_item_chains_through_stages_and_later_stages_see_prior_output() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let stages = vec![
            // Stage 1: the item itself is the prompt, which the echo executor
            // returns as its output.
            stage(|_prev: Option<&StepOutcome>, item: &&str| {
                Some(AgentStepSpec::new("s1", "explore", "d", *item))
            }),
            // Stage 2: embeds the previous output in its own prompt.
            stage(|prev: Option<&StepOutcome>, _item: &&str| {
                let prior = prev.map(|o| o.output.clone()).unwrap_or_default();
                Some(AgentStepSpec::new(
                    "s2",
                    "review",
                    "d",
                    format!("review of: {prior}"),
                ))
            }),
        ];
        let out = execute_pipeline(exec, vec!["alpha", "beta"], stages, None).await;

        assert_eq!(out.len(), 2, "one result per item, order preserved");
        assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha");
        assert_eq!(out[1].as_ref().unwrap().output, "review of: beta");
        assert!(out.iter().all(|o| o.as_ref().unwrap().success));
    }
358
    /// A chain ends early both when a step fails (item "x") and when a stage
    /// returns `None` (item "y"); in each case the stage-1 outcome is returned.
    #[tokio::test]
    async fn chain_stops_on_failure_and_on_none_stage() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let stages = vec![
            // Stage 1: item "x" uses the "fail" agent, which the echo executor
            // reports as unsuccessful.
            stage(|_p: Option<&StepOutcome>, item: &&str| {
                let agent = if *item == "x" { "fail" } else { "explore" };
                Some(AgentStepSpec::new("s1", agent, "d", *item))
            }),
            // Stage 2: declines to run for item "y".
            stage(|_p: Option<&StepOutcome>, item: &&str| {
                if *item == "y" {
                    None
                } else {
                    Some(AgentStepSpec::new("s2", "review", "d", "second"))
                }
            }),
        ];
        let out = execute_pipeline(exec, vec!["x", "y"], stages, None).await;

        let first = out[0].as_ref().unwrap();
        assert!(!first.success, "failed stage 1 surfaces");
        assert_eq!(
            first.output, "x",
            "stage 2 did not run after stage 1 failed"
        );

        let second = out[1].as_ref().unwrap();
        assert!(second.success);
        assert_eq!(
            second.output, "y",
            "stage 2 returned None → chain stopped at stage 1"
        );
    }
393
    /// Runs eight two-stage chains and checks that every chain produced an
    /// outcome and that the peak concurrency seen by the executor stays within
    /// its hint of 4.
    ///
    /// NOTE(review): the name mentions "no barrier between stages", but the
    /// assertions only pin the concurrency bound.
    #[tokio::test]
    async fn no_barrier_between_stages_bounded_by_hint() {
        let echo = EchoExecutor::new();
        // Keep a handle to the peak counter before the executor is type-erased.
        let max_active = Arc::clone(&echo.max_active);
        let exec: Arc<dyn AgentExecutor> = Arc::new(echo);
        let stages = vec![
            stage(|_p: Option<&StepOutcome>, item: &usize| {
                Some(AgentStepSpec::new(
                    format!("s1-{item}"),
                    "explore",
                    "d",
                    "p",
                ))
            }),
            stage(|_p: Option<&StepOutcome>, item: &usize| {
                Some(AgentStepSpec::new(format!("s2-{item}"), "review", "d", "p"))
            }),
        ];
        let items: Vec<usize> = (0..8).collect();
        let out = execute_pipeline(exec, items, stages, None).await;
        assert_eq!(out.len(), 8);
        assert!(out.iter().all(|o| o.is_some()));
        assert!(
            max_active.load(Ordering::SeqCst) <= 4,
            "concurrency never exceeds the executor's hint"
        );
    }
422
    /// A panic while executing one item's step turns only that item's result
    /// into `None`; the other items still succeed and the result count is kept.
    #[tokio::test]
    async fn panicking_stage_isolates_to_its_chain() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        // The item doubles as the agent name, so "boom" triggers the echo
        // executor's deliberate panic.
        let stages = vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
            Some(AgentStepSpec::new("s1", *item, "d", "p"))
        })];
        let out = execute_pipeline(exec, vec!["explore", "boom", "review"], stages, None).await;
        assert_eq!(out.len(), 3);
        assert!(out[0].as_ref().unwrap().success);
        assert!(out[1].is_none(), "panicked chain becomes None, not a drop");
        assert!(out[2].as_ref().unwrap().success, "later chains unaffected");
    }
436
    /// Test executor that records which task ids it was asked to run.
    struct RecordingExecutor {
        /// Task ids in the order `execute_step` was entered.
        ran: Arc<tokio::sync::Mutex<Vec<String>>>,
    }
441
442 #[async_trait]
443 impl AgentExecutor for RecordingExecutor {
444 async fn execute_step(
445 &self,
446 spec: AgentStepSpec,
447 _event_tx: Option<broadcast::Sender<AgentEvent>>,
448 ) -> StepOutcome {
449 self.ran.lock().await.push(spec.task_id.clone());
450 StepOutcome {
451 task_id: spec.task_id.clone(),
452 session_id: format!("task-run-{}", spec.task_id),
453 agent: spec.agent.clone(),
454 output: format!("ran:{}", spec.task_id),
455 success: true,
456 structured: None,
457 }
458 }
459 fn concurrency_hint(&self) -> usize {
460 4
461 }
462 }
463
    /// With step "a" already checkpointed, only "b" is executed, "a" returns
    /// its cached outcome, and the checkpoint is removed once all steps succeed.
    #[tokio::test]
    async fn resumable_skips_completed_then_clears_on_success() {
        use crate::store::MemorySessionStore;
        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

        // Seed a checkpoint that already records step "a" as completed.
        let mut done = std::collections::HashMap::new();
        done.insert(
            "a".to_string(),
            StepOutcome {
                task_id: "a".into(),
                session_id: "task-run-a".into(),
                agent: "explore".into(),
                output: "cached-a".into(),
                success: true,
                structured: None,
            },
        );
        store
            .save_workflow_checkpoint(
                "wf-1",
                &WorkflowCheckpoint::from_completed("wf-1", &done, 1),
            )
            .await
            .unwrap();

        // The recording executor exposes which steps actually ran.
        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
            ran: Arc::clone(&ran),
        });
        let specs = vec![
            AgentStepSpec::new("a", "explore", "d", "pa"),
            AgentStepSpec::new("b", "review", "d", "pb"),
        ];

        let out =
            execute_steps_parallel_resumable(exec, specs, "wf-1", Arc::clone(&store), None).await;

        assert_eq!(
            *ran.lock().await,
            vec!["b".to_string()],
            "only the not-yet-completed step runs"
        );
        assert_eq!(out.len(), 2);
        assert_eq!(out[0].task_id, "a");
        assert_eq!(
            out[0].output, "cached-a",
            "completed step returns its cached outcome, unchanged"
        );
        assert_eq!(out[1].task_id, "b");
        assert!(out.iter().all(|o| o.success));
        assert!(
            store
                .load_workflow_checkpoint("wf-1")
                .await
                .unwrap()
                .is_none(),
            "a fully-succeeded workflow clears its checkpoint"
        );
    }
527
    /// When one step fails, the checkpoint is kept and lists only the step
    /// that succeeded.
    #[tokio::test]
    async fn resumable_retains_checkpoint_recording_only_successes_on_partial_failure() {
        use crate::store::MemorySessionStore;
        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        // The "fail" agent makes the echo executor report an unsuccessful outcome.
        let specs = vec![
            AgentStepSpec::new("ok", "explore", "d", "p"),
            AgentStepSpec::new("bad", "fail", "d", "p"),
        ];

        let out =
            execute_steps_parallel_resumable(exec, specs, "wf-2", Arc::clone(&store), None).await;
        assert!(out[0].success);
        assert!(!out[1].success);

        let cp = store
            .load_workflow_checkpoint("wf-2")
            .await
            .unwrap()
            .expect("checkpoint retained on partial failure");
        let completed = cp.completed();
        assert!(completed.contains_key("ok"), "succeeded step is recorded");
        assert!(
            !completed.contains_key("bad"),
            "failed step is NOT recorded → it retries on resume"
        );
    }
558
559 struct ZeroHintExecutor;
560 #[async_trait]
561 impl AgentExecutor for ZeroHintExecutor {
562 async fn execute_step(
563 &self,
564 spec: AgentStepSpec,
565 _event_tx: Option<broadcast::Sender<AgentEvent>>,
566 ) -> StepOutcome {
567 StepOutcome {
568 task_id: spec.task_id.clone(),
569 session_id: format!("task-run-{}", spec.task_id),
570 agent: spec.agent.clone(),
571 output: "ok".to_string(),
572 success: true,
573 structured: None,
574 }
575 }
576 fn concurrency_hint(&self) -> usize {
577 0
578 }
579 }
580
    /// Both the plain parallel runner and the pipeline return an empty vector
    /// when given no specs or no items.
    #[tokio::test]
    async fn empty_inputs_return_empty() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        assert!(
            crate::orchestration::execute_steps_parallel(Arc::clone(&exec), vec![], None)
                .await
                .is_empty()
        );
        // A stage is supplied, but with no items it is never consulted.
        let stages: Vec<PipelineStage<&str>> =
            vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
                Some(AgentStepSpec::new("s", "explore", "d", *item))
            })];
        assert!(execute_pipeline(exec, Vec::<&str>::new(), stages, None)
            .await
            .is_empty());
    }
597
    /// An executor reporting a concurrency hint of 0 must not stall the
    /// parallel runner: all three steps complete, in input order.
    #[tokio::test]
    async fn zero_concurrency_hint_still_makes_progress() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(ZeroHintExecutor);
        let specs = vec![
            AgentStepSpec::new("a", "explore", "d", "p"),
            AgentStepSpec::new("b", "explore", "d", "p"),
            AgentStepSpec::new("c", "explore", "d", "p"),
        ];
        let out = crate::orchestration::execute_steps_parallel(exec, specs, None).await;
        assert_eq!(
            out.iter().map(|o| o.task_id.as_str()).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
        assert!(out.iter().all(|o| o.success));
    }
615
    /// When the very first stage returns `None` for an item, that item's
    /// result is `None`, while other items still run normally.
    #[tokio::test]
    async fn pipeline_first_stage_none_yields_none_outcome() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let stages: Vec<PipelineStage<&str>> =
            vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
                if *item == "skip" {
                    None
                } else {
                    Some(AgentStepSpec::new("s", "explore", "d", *item))
                }
            })];
        let out = execute_pipeline(exec, vec!["skip", "run"], stages, None).await;
        assert!(
            out[0].is_none(),
            "a first-stage None yields a None outcome (chain never started)"
        );
        assert!(out[1].as_ref().unwrap().success);
    }
634
635 fn cached(task_id: &str, agent: &str, output: &str) -> StepOutcome {
636 StepOutcome {
637 task_id: task_id.to_string(),
638 session_id: format!("task-run-{task_id}"),
639 agent: agent.to_string(),
640 output: output.to_string(),
641 success: true,
642 structured: None,
643 }
644 }
645
    /// A checkpoint that cannot be loaded is ignored: every step runs again,
    /// including "a", which the unreadable checkpoint listed as completed.
    #[tokio::test]
    async fn resumable_reruns_all_when_checkpoint_load_errors() {
        use crate::store::MemorySessionStore;
        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

        let mut done = std::collections::HashMap::new();
        done.insert("a".to_string(), cached("a", "explore", "old"));
        let mut cp = WorkflowCheckpoint::from_completed("wf-err", &done, 1);
        // Bump the schema version past the current one; the test expects
        // loading this checkpoint to fail (presumably the version is rejected
        // on load — confirm against the checkpoint/store code).
        cp.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1;
        store.save_workflow_checkpoint("wf-err", &cp).await.unwrap();

        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
            ran: Arc::clone(&ran),
        });
        let specs = vec![
            AgentStepSpec::new("a", "explore", "d", "pa"),
            AgentStepSpec::new("b", "review", "d", "pb"),
        ];
        let out =
            execute_steps_parallel_resumable(exec, specs, "wf-err", Arc::clone(&store), None).await;

        // Steps run in parallel, so sort before comparing.
        let mut ran_ids = ran.lock().await.clone();
        ran_ids.sort();
        assert_eq!(
            ran_ids,
            vec!["a".to_string(), "b".to_string()],
            "an unreadable (future-version) checkpoint is ignored → all steps re-run"
        );
        assert_eq!(out.len(), 2);
        assert!(out.iter().all(|o| o.success));
    }
682
    /// A checkpoint entry whose step is no longer in `specs` does not appear
    /// in the output: cached "b" is reused, stale "a" is left out, and only
    /// the new step "c" executes.
    #[tokio::test]
    async fn resumable_ignores_checkpointed_steps_absent_from_new_specs() {
        use crate::store::MemorySessionStore;
        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

        // The checkpoint knows "a" and "b"; the new run asks for "b" and "c".
        let mut done = std::collections::HashMap::new();
        done.insert("a".to_string(), cached("a", "explore", "cached-a"));
        done.insert("b".to_string(), cached("b", "review", "cached-b"));
        store
            .save_workflow_checkpoint(
                "wf-x",
                &WorkflowCheckpoint::from_completed("wf-x", &done, 1),
            )
            .await
            .unwrap();

        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
            ran: Arc::clone(&ran),
        });
        let specs = vec![
            AgentStepSpec::new("b", "review", "d", "pb"),
            AgentStepSpec::new("c", "plan", "d", "pc"),
        ];
        let out =
            execute_steps_parallel_resumable(exec, specs, "wf-x", Arc::clone(&store), None).await;

        assert_eq!(
            *ran.lock().await,
            vec!["c".to_string()],
            "cached b reused, stale a dropped, only new c runs"
        );
        assert_eq!(out.len(), 2);
        assert_eq!(out[0].task_id, "b");
        assert_eq!(out[0].output, "cached-b");
        assert_eq!(out[1].task_id, "c");
        assert!(out.iter().all(|o| o.success));
    }
724
    /// The predicate asks for two more rounds and then stops; the loop runs
    /// exactly three rounds and returns the last round's outcomes.
    #[tokio::test]
    async fn loop_stops_when_predicate_says_stop() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let mut rounds = 0;
        let out = crate::orchestration::execute_loop(
            exec,
            vec![AgentStepSpec::new("r0", "explore", "d", "p")],
            10,
            None,
            |outcomes| {
                rounds += 1;
                if rounds < 3 {
                    // Feed the previous round's output in as the next prompt.
                    LoopDecision::Continue(vec![AgentStepSpec::new(
                        format!("r{rounds}"),
                        "explore",
                        "d",
                        outcomes[0].output.clone(),
                    )])
                } else {
                    LoopDecision::Stop
                }
            },
        )
        .await;
        assert_eq!(rounds, 3, "predicate saw exactly three rounds");
        assert_eq!(out.len(), 1, "returns the last round's outcomes");
        assert!(out[0].success);
    }
754
    /// A predicate that always asks to continue is still cut off after
    /// `max_iterations` (3) rounds.
    #[tokio::test]
    async fn loop_is_hard_capped_by_max_iterations() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let mut rounds = 0;
        // The outcomes are irrelevant here; only the round count is checked.
        let _ = crate::orchestration::execute_loop(
            exec,
            vec![AgentStepSpec::new("r", "explore", "d", "p")],
            3,
            None,
            |_outcomes| {
                rounds += 1;
                LoopDecision::Continue(vec![AgentStepSpec::new("r", "explore", "d", "p")])
            },
        )
        .await;
        assert_eq!(
            rounds, 3,
            "max_iterations is a hard cap on a never-stopping predicate"
        );
    }
776
    /// With no initial specs the loop returns an empty result and never calls
    /// the predicate.
    #[tokio::test]
    async fn loop_with_empty_initial_runs_nothing() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let mut called = false;
        let out = crate::orchestration::execute_loop(exec, vec![], 5, None, |_| {
            called = true;
            LoopDecision::Stop
        })
        .await;
        assert!(out.is_empty());
        assert!(!called, "predicate is not invoked when there is no work");
    }
789
    /// `Continue` with an empty spec list ends the loop after the current
    /// round, and that round's outcomes are still returned.
    #[tokio::test]
    async fn loop_stops_when_predicate_requests_no_further_specs() {
        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
        let mut rounds = 0;
        let out = crate::orchestration::execute_loop(
            exec,
            vec![AgentStepSpec::new("r0", "explore", "d", "p")],
            10,
            None,
            |_| {
                rounds += 1;
                // Asking to continue with nothing to run leaves no work.
                LoopDecision::Continue(vec![])
            },
        )
        .await;
        assert_eq!(rounds, 1);
        assert_eq!(out.len(), 1, "the completed round is still returned");
    }
809}