1use crate::callable::{Callable, CallableInvoker, DynCallable};
29use crate::graph::CheckpointStore;
30use crate::kernel::ids::{StepId, StepSourceType};
31use crate::policy::LongRunningExecutionPolicy;
32use crate::runner::Runner;
33use crate::streaming::StreamEvent;
34use serde::{Deserialize, Serialize};
35use std::collections::VecDeque;
36use std::time::{Duration, Instant};
37
38type InvokerWorkItem = (String, String, u32, Option<StepId>, Option<String>);
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct DiscoveredStep {
45 #[serde(default)]
47 pub name: Option<String>,
48 pub input: String,
50 #[serde(default)]
52 pub reason: Option<String>,
53 #[serde(default)]
55 pub priority: u8,
56}
57
58impl DiscoveredStep {
59 pub fn new(input: impl Into<String>) -> Self {
61 Self {
62 name: None,
63 input: input.into(),
64 reason: None,
65 priority: 50,
66 }
67 }
68
69 pub fn with_name(mut self, name: impl Into<String>) -> Self {
71 self.name = Some(name.into());
72 self
73 }
74
75 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77 self.reason = Some(reason.into());
78 self
79 }
80
81 pub fn with_priority(mut self, priority: u8) -> Self {
83 self.priority = priority;
84 self
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct DiscoveryOutput {
91 pub result: String,
93 pub discovered_steps: Vec<DiscoveredStep>,
95 pub is_complete: bool,
97}
98
99impl DiscoveryOutput {
100 pub fn parse(output: &str) -> Self {
102 if let Some(parsed) = Self::try_parse_json(output) {
104 return parsed;
105 }
106
107 let is_complete = output.ends_with("[DONE]")
109 || output.ends_with("[COMPLETE]")
110 || output.contains("\"status\": \"complete\"")
111 || output.contains("\"status\":\"complete\"");
112
113 let result = output
115 .trim_end_matches("[DONE]")
116 .trim_end_matches("[COMPLETE]")
117 .trim()
118 .to_string();
119
120 Self {
121 result,
122 discovered_steps: Vec::new(),
123 is_complete,
124 }
125 }
126
127 fn try_parse_json(output: &str) -> Option<Self> {
129 let trimmed = output.trim();
131
132 let json_str = if trimmed.starts_with('{') {
134 trimmed
135 } else if let Some(start) = trimmed.find("```json") {
136 let content_start = trimmed[start..].find('\n').map(|i| start + i + 1)?;
137 let content_end = trimmed[content_start..].find("```")?;
138 &trimmed[content_start..content_start + content_end]
139 } else if let Some(start) = trimmed.find('{') {
140 let end = trimmed.rfind('}')?;
142 if end > start {
143 &trimmed[start..=end]
144 } else {
145 return None;
146 }
147 } else {
148 return None;
149 };
150
151 #[derive(Deserialize)]
153 struct DiscoveryOutputJson {
154 #[serde(default)]
155 result: Option<String>,
156 #[serde(default)]
157 output: Option<String>,
158 #[serde(default)]
159 discovered_steps: Vec<DiscoveredStep>,
160 #[serde(default)]
161 status: Option<String>,
162 }
163
164 let parsed: DiscoveryOutputJson = serde_json::from_str(json_str).ok()?;
165
166 let result = parsed
167 .result
168 .or(parsed.output)
169 .unwrap_or_else(|| json_str.to_string());
170
171 let is_complete = parsed
172 .status
173 .map(|s| s == "complete" || s == "done")
174 .unwrap_or(false);
175
176 Some(Self {
177 result,
178 discovered_steps: parsed.discovered_steps,
179 is_complete,
180 })
181 }
182
183 pub fn has_pending_work(&self) -> bool {
185 !self.discovered_steps.is_empty() && !self.is_complete
186 }
187}
188
189#[derive(Debug, Clone)]
191pub struct AgenticLoopResult {
192 pub output: String,
194 pub steps_executed: u32,
196 pub discovered_steps_processed: u32,
198 pub max_depth_reached: u32,
200 pub completed: bool,
202 pub stop_reason: Option<String>,
204 pub history: Vec<String>,
206}
207
208pub struct AgenticLoop;
210
211impl AgenticLoop {
212 pub async fn run<S: CheckpointStore>(
220 runner: &mut Runner<S>,
221 callable: DynCallable,
222 input: String,
223 policy: LongRunningExecutionPolicy,
224 ) -> anyhow::Result<String> {
225 let result = Self::run_with_details(runner, callable, input, policy).await?;
226 Ok(result.output)
227 }
228
229 pub async fn run_with_details<S: CheckpointStore>(
231 runner: &mut Runner<S>,
232 callable: DynCallable,
233 input: String,
234 policy: LongRunningExecutionPolicy,
235 ) -> anyhow::Result<AgenticLoopResult> {
236 let start_time = Instant::now();
238 let mut steps_executed: u32 = 0;
239 let mut discovered_steps_processed: u32 = 0;
240 let mut max_depth_reached: u32 = 0;
241 let mut history: Vec<String> = Vec::new();
242
243 let mut work_queue: VecDeque<(String, u32, Option<StepId>, Option<String>)> =
245 VecDeque::new();
246 work_queue.push_back((input.clone(), 0, None, None));
247
248 let mut last_output = String::new();
250
251 history.push(format!("User: {}", input));
253
254 while let Some((current_input, depth, triggered_by, reason)) = work_queue.pop_front() {
256 if depth > max_depth_reached {
257 max_depth_reached = depth;
258 }
259
260 if let Some(max_steps) = policy.max_discovered_steps {
264 if steps_executed >= max_steps {
265 runner.emitter().emit(StreamEvent::execution_failed(
266 runner.execution_id(),
267 crate::kernel::ExecutionError::quota_exceeded(format!(
268 "Max discovered steps exceeded: {} >= {}",
269 steps_executed, max_steps
270 )),
271 ));
272 return Ok(AgenticLoopResult {
273 output: last_output,
274 steps_executed,
275 discovered_steps_processed,
276 max_depth_reached,
277 completed: false,
278 stop_reason: Some("max_discovered_steps".to_string()),
279 history,
280 });
281 }
282 }
283
284 if let Some(max_depth) = policy.max_discovery_depth {
286 if depth > max_depth {
287 runner.emitter().emit(StreamEvent::execution_failed(
288 runner.execution_id(),
289 crate::kernel::ExecutionError::quota_exceeded(format!(
290 "Max discovery depth exceeded: {} > {}",
291 depth, max_depth
292 )),
293 ));
294 return Ok(AgenticLoopResult {
295 output: last_output,
296 steps_executed,
297 discovered_steps_processed,
298 max_depth_reached,
299 completed: false,
300 stop_reason: Some("max_discovery_depth".to_string()),
301 history,
302 });
303 }
304 }
305
306 if let Some(timeout) = policy.idle_timeout_seconds {
308 if start_time.elapsed() > Duration::from_secs(timeout) {
309 runner.emitter().emit(StreamEvent::execution_failed(
310 runner.execution_id(),
311 crate::kernel::ExecutionError::timeout(format!(
312 "Idle timeout after {}s",
313 timeout
314 )),
315 ));
316 return Ok(AgenticLoopResult {
317 output: last_output,
318 steps_executed,
319 discovered_steps_processed,
320 max_depth_reached,
321 completed: false,
322 stop_reason: Some("idle_timeout".to_string()),
323 history,
324 });
325 }
326 }
327
328 let step_id = StepId::new();
330 if triggered_by.is_some() {
331 runner.emitter().emit(StreamEvent::step_discovered(
332 runner.execution_id(),
333 &step_id,
334 triggered_by.as_ref(),
335 StepSourceType::Discovered,
336 reason.as_deref().unwrap_or("Discovered by previous step"),
337 depth,
338 ));
339 discovered_steps_processed += 1;
340 }
341
342 let result = runner
344 .run_callable(callable.as_ref() as &dyn Callable, ¤t_input)
345 .await;
346
347 match result {
348 Ok(output) => {
349 steps_executed += 1;
350 history.push(format!("Assistant [depth={}]: {}", depth, &output));
351
352 let should_checkpoint = policy.checkpointing.on_discovery
354 || policy
355 .checkpointing
356 .interval_steps
357 .is_some_and(|i| steps_executed.is_multiple_of(i));
358
359 if should_checkpoint {
360 let state = crate::graph::NodeState::from_string(&output);
361 if let Err(e) = runner
362 .save_checkpoint(state, Some(callable.name()), Some(callable.name()))
363 .await
364 {
365 tracing::warn!("Failed to save checkpoint: {}", e);
367 }
368 }
369
370 let discovery = DiscoveryOutput::parse(&output);
372
373 last_output = discovery.result.clone();
375
376 if discovery.is_complete {
378 tracing::debug!(steps = steps_executed, "Callable signaled completion");
379 return Ok(AgenticLoopResult {
380 output: discovery.result,
381 steps_executed,
382 discovered_steps_processed,
383 max_depth_reached,
384 completed: true,
385 stop_reason: None,
386 history,
387 });
388 }
389
390 if !discovery.discovered_steps.is_empty() {
392 tracing::debug!(
393 count = discovery.discovered_steps.len(),
394 "Discovered new steps"
395 );
396
397 let mut sorted_steps = discovery.discovered_steps;
399 sorted_steps.sort_by(|a, b| b.priority.cmp(&a.priority));
400
401 for discovered in sorted_steps {
402 work_queue.push_back((
403 discovered.input,
404 depth + 1,
405 Some(step_id.clone()),
406 discovered.reason,
407 ));
408 }
409 }
410 }
411 Err(e) => {
412 runner.emitter().emit(StreamEvent::execution_failed(
414 runner.execution_id(),
415 crate::kernel::ExecutionError::kernel_internal(e.to_string()),
416 ));
417 return Err(e);
418 }
419 }
420 }
421
422 Ok(AgenticLoopResult {
424 output: last_output,
425 steps_executed,
426 discovered_steps_processed,
427 max_depth_reached,
428 completed: true,
429 stop_reason: None,
430 history,
431 })
432 }
433
434 pub async fn run_with_invoker<S: CheckpointStore>(
438 runner: &mut Runner<S>,
439 invoker: &CallableInvoker,
440 initial_callable_name: &str,
441 input: String,
442 policy: LongRunningExecutionPolicy,
443 ) -> anyhow::Result<AgenticLoopResult> {
444 let start_time = Instant::now();
446 let mut steps_executed: u32 = 0;
447 let mut discovered_steps_processed: u32 = 0;
448 let mut max_depth_reached: u32 = 0;
449 let mut history: Vec<String> = Vec::new();
450
451 let mut work_queue: VecDeque<InvokerWorkItem> = VecDeque::new();
453 work_queue.push_back((
454 initial_callable_name.to_string(),
455 input.clone(),
456 0,
457 None,
458 None,
459 ));
460
461 let mut last_output = String::new();
462 history.push(format!("User: {}", input));
463
464 while let Some((callable_name, current_input, depth, triggered_by, reason)) =
465 work_queue.pop_front()
466 {
467 if depth > max_depth_reached {
468 max_depth_reached = depth;
469 }
470
471 if let Some(max_steps) = policy.max_discovered_steps {
473 if steps_executed >= max_steps {
474 return Ok(AgenticLoopResult {
475 output: last_output,
476 steps_executed,
477 discovered_steps_processed,
478 max_depth_reached,
479 completed: false,
480 stop_reason: Some("max_discovered_steps".to_string()),
481 history,
482 });
483 }
484 }
485
486 if let Some(max_depth) = policy.max_discovery_depth {
487 if depth > max_depth {
488 return Ok(AgenticLoopResult {
489 output: last_output,
490 steps_executed,
491 discovered_steps_processed,
492 max_depth_reached,
493 completed: false,
494 stop_reason: Some("max_discovery_depth".to_string()),
495 history,
496 });
497 }
498 }
499
500 if let Some(timeout) = policy.idle_timeout_seconds {
501 if start_time.elapsed() > Duration::from_secs(timeout) {
502 return Ok(AgenticLoopResult {
503 output: last_output,
504 steps_executed,
505 discovered_steps_processed,
506 max_depth_reached,
507 completed: false,
508 stop_reason: Some("idle_timeout".to_string()),
509 history,
510 });
511 }
512 }
513
514 let step_id = StepId::new();
516 if triggered_by.is_some() {
517 runner.emitter().emit(StreamEvent::step_discovered(
518 runner.execution_id(),
519 &step_id,
520 triggered_by.as_ref(),
521 StepSourceType::Discovered,
522 reason.as_deref().unwrap_or("Discovered by previous step"),
523 depth,
524 ));
525 discovered_steps_processed += 1;
526 }
527
528 let callable = invoker.get(&callable_name).ok_or_else(|| {
530 anyhow::anyhow!("Callable '{}' not found in registry", callable_name)
531 })?;
532
533 let result = runner
535 .run_callable(callable.as_ref() as &dyn Callable, ¤t_input)
536 .await;
537
538 match result {
539 Ok(output) => {
540 steps_executed += 1;
541 history.push(format!("{}[depth={}]: {}", callable_name, depth, &output));
542
543 let should_checkpoint = policy.checkpointing.on_discovery
545 || policy
546 .checkpointing
547 .interval_steps
548 .is_some_and(|i| steps_executed.is_multiple_of(i));
549
550 if should_checkpoint {
551 let state = crate::graph::NodeState::from_string(&output);
552 if let Err(e) = runner
553 .save_checkpoint(state, Some(&callable_name), Some(&callable_name))
554 .await
555 {
556 tracing::warn!("Failed to save checkpoint: {}", e);
557 }
558 }
559
560 let discovery = DiscoveryOutput::parse(&output);
562 last_output = discovery.result.clone();
563
564 if discovery.is_complete {
565 return Ok(AgenticLoopResult {
566 output: discovery.result,
567 steps_executed,
568 discovered_steps_processed,
569 max_depth_reached,
570 completed: true,
571 stop_reason: None,
572 history,
573 });
574 }
575
576 let mut sorted_steps = discovery.discovered_steps;
578 sorted_steps.sort_by(|a, b| b.priority.cmp(&a.priority));
579
580 for discovered in sorted_steps {
581 let target_callable =
582 discovered.name.unwrap_or_else(|| callable_name.clone());
583 work_queue.push_back((
584 target_callable,
585 discovered.input,
586 depth + 1,
587 Some(step_id.clone()),
588 discovered.reason,
589 ));
590 }
591 }
592 Err(e) => {
593 runner.emitter().emit(StreamEvent::execution_failed(
594 runner.execution_id(),
595 crate::kernel::ExecutionError::kernel_internal(e.to_string()),
596 ));
597 return Err(e);
598 }
599 }
600 }
601
602 Ok(AgenticLoopResult {
603 output: last_output,
604 steps_executed,
605 discovered_steps_processed,
606 max_depth_reached,
607 completed: true,
608 stop_reason: None,
609 history,
610 })
611 }
612}
613
614#[cfg(test)]
615mod tests {
616 use super::*;
617 use crate::callable::Callable;
618 use crate::graph::InMemoryCheckpointStore;
619 use async_trait::async_trait;
620 use std::sync::atomic::{AtomicU32, Ordering};
621 use std::sync::Arc;
622
623 struct DiscoveryCallable {
625 name: String,
626 discover_count: AtomicU32,
628 }
629
630 impl DiscoveryCallable {
631 fn new(name: &str, discover_count: u32) -> Self {
632 Self {
633 name: name.to_string(),
634 discover_count: AtomicU32::new(discover_count),
635 }
636 }
637 }
638
639 #[async_trait]
640 impl Callable for DiscoveryCallable {
641 fn name(&self) -> &str {
642 &self.name
643 }
644
645 async fn run(&self, input: &str) -> anyhow::Result<String> {
646 let remaining = self.discover_count.fetch_sub(1, Ordering::SeqCst);
647
648 if remaining > 0 {
649 Ok(format!(
651 r#"{{
652 "result": "Processed: {}",
653 "discovered_steps": [
654 {{"input": "Follow-up task {}", "reason": "Discovered work"}}
655 ]
656 }}"#,
657 input, remaining
658 ))
659 } else {
660 Ok(format!("Final result for: {} [DONE]", input))
662 }
663 }
664 }
665
666 struct SingleShotCallable {
668 name: String,
669 }
670
671 impl SingleShotCallable {
672 fn new(name: &str) -> Self {
673 Self {
674 name: name.to_string(),
675 }
676 }
677 }
678
679 #[async_trait]
680 impl Callable for SingleShotCallable {
681 fn name(&self) -> &str {
682 &self.name
683 }
684
685 async fn run(&self, input: &str) -> anyhow::Result<String> {
686 Ok(format!("Processed: {}", input))
687 }
688 }
689
690 struct FailingCallable {
692 name: String,
693 fail_after: AtomicU32,
694 }
695
696 impl FailingCallable {
697 fn new(name: &str, fail_after: u32) -> Self {
698 Self {
699 name: name.to_string(),
700 fail_after: AtomicU32::new(fail_after),
701 }
702 }
703 }
704
705 #[async_trait]
706 impl Callable for FailingCallable {
707 fn name(&self) -> &str {
708 &self.name
709 }
710
711 async fn run(&self, input: &str) -> anyhow::Result<String> {
712 let remaining = self.fail_after.fetch_sub(1, Ordering::SeqCst);
713 if remaining > 0 {
714 Ok(format!(
715 r#"{{
716 "result": "Step {}",
717 "discovered_steps": [{{"input": "Next step"}}]
718 }}"#,
719 input
720 ))
721 } else {
722 anyhow::bail!("Simulated failure")
723 }
724 }
725 }
726
727 #[test]
730 fn test_parse_plain_output() {
731 let output = "Just a simple response";
732 let discovery = DiscoveryOutput::parse(output);
733
734 assert_eq!(discovery.result, "Just a simple response");
735 assert!(discovery.discovered_steps.is_empty());
736 assert!(!discovery.is_complete);
737 }
738
739 #[test]
740 fn test_parse_done_marker() {
741 let output = "Task completed successfully [DONE]";
742 let discovery = DiscoveryOutput::parse(output);
743
744 assert_eq!(discovery.result, "Task completed successfully");
745 assert!(discovery.discovered_steps.is_empty());
746 assert!(discovery.is_complete);
747 }
748
749 #[test]
750 fn test_parse_complete_marker() {
751 let output = "All work finished [COMPLETE]";
752 let discovery = DiscoveryOutput::parse(output);
753
754 assert_eq!(discovery.result, "All work finished");
755 assert!(discovery.is_complete);
756 }
757
758 #[test]
759 fn test_parse_json_with_discovered_steps() {
760 let output = r#"{
761 "result": "Analyzed the system",
762 "discovered_steps": [
763 {"input": "Refactor module A", "reason": "Code smell detected"},
764 {"input": "Add tests for B"}
765 ]
766 }"#;
767
768 let discovery = DiscoveryOutput::parse(output);
769
770 assert_eq!(discovery.result, "Analyzed the system");
771 assert_eq!(discovery.discovered_steps.len(), 2);
772 assert_eq!(discovery.discovered_steps[0].input, "Refactor module A");
773 assert_eq!(
774 discovery.discovered_steps[0].reason,
775 Some("Code smell detected".to_string())
776 );
777 assert_eq!(discovery.discovered_steps[1].input, "Add tests for B");
778 assert!(!discovery.is_complete);
779 }
780
781 #[test]
782 fn test_parse_json_with_status_complete() {
783 let output = r#"{"result": "Done", "status": "complete"}"#;
784 let discovery = DiscoveryOutput::parse(output);
785
786 assert_eq!(discovery.result, "Done");
787 assert!(discovery.is_complete);
788 }
789
790 #[test]
791 fn test_parse_json_embedded_in_text() {
792 let output = r#"Here is the analysis:
793 {
794 "result": "Found issues",
795 "discovered_steps": [{"input": "Fix issue 1"}]
796 }
797 End of response."#;
798
799 let discovery = DiscoveryOutput::parse(output);
800
801 assert!(!discovery.discovered_steps.is_empty());
803 assert_eq!(discovery.discovered_steps[0].input, "Fix issue 1");
804 }
805
806 #[test]
807 fn test_has_pending_work() {
808 let no_steps = DiscoveryOutput {
810 result: "test".to_string(),
811 discovered_steps: vec![],
812 is_complete: false,
813 };
814 assert!(!no_steps.has_pending_work());
815
816 let complete = DiscoveryOutput {
818 result: "test".to_string(),
819 discovered_steps: vec![DiscoveredStep::new("task")],
820 is_complete: true,
821 };
822 assert!(!complete.has_pending_work());
823
824 let pending = DiscoveryOutput {
826 result: "test".to_string(),
827 discovered_steps: vec![DiscoveredStep::new("task")],
828 is_complete: false,
829 };
830 assert!(pending.has_pending_work());
831 }
832
833 #[test]
836 fn test_discovered_step_builder() {
837 let step = DiscoveredStep::new("Process data")
838 .with_name("data-processor")
839 .with_reason("Data needs processing")
840 .with_priority(80);
841
842 assert_eq!(step.input, "Process data");
843 assert_eq!(step.name, Some("data-processor".to_string()));
844 assert_eq!(step.reason, Some("Data needs processing".to_string()));
845 assert_eq!(step.priority, 80);
846 }
847
848 #[tokio::test]
851 async fn test_single_shot_execution() {
852 let store = Arc::new(InMemoryCheckpointStore::new());
853 let mut runner = Runner::new(store);
854 let callable: DynCallable = Arc::new(SingleShotCallable::new("single"));
855 let policy = LongRunningExecutionPolicy::standard();
856
857 let result = AgenticLoop::run(&mut runner, callable, "test input".to_string(), policy)
858 .await
859 .unwrap();
860
861 assert!(result.contains("Processed: test input"));
862 }
863
864 #[tokio::test]
865 async fn test_discovery_loop_multiple_steps() {
866 let store = Arc::new(InMemoryCheckpointStore::new());
867 let mut runner = Runner::new(store);
868 let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 3));
870 let policy = LongRunningExecutionPolicy::standard();
871
872 let result = AgenticLoop::run_with_details(
873 &mut runner,
874 callable,
875 "initial task".to_string(),
876 policy,
877 )
878 .await
879 .unwrap();
880
881 assert!(result.steps_executed >= 3);
883 assert!(result.completed);
884 assert!(result.stop_reason.is_none());
885 }
886
887 #[tokio::test]
888 async fn test_max_steps_limit() {
889 let store = Arc::new(InMemoryCheckpointStore::new());
890 let mut runner = Runner::new(store);
891 let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 100));
893
894 let policy = LongRunningExecutionPolicy {
895 max_discovered_steps: Some(5),
896 ..LongRunningExecutionPolicy::standard()
897 };
898
899 let result =
900 AgenticLoop::run_with_details(&mut runner, callable, "task".to_string(), policy)
901 .await
902 .unwrap();
903
904 assert!(result.steps_executed <= 5);
906 assert!(!result.completed);
907 assert_eq!(result.stop_reason, Some("max_discovered_steps".to_string()));
908 }
909
910 #[tokio::test]
911 async fn test_max_depth_limit() {
912 let store = Arc::new(InMemoryCheckpointStore::new());
913 let mut runner = Runner::new(store);
914 let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 20));
916
917 let policy = LongRunningExecutionPolicy {
918 max_discovery_depth: Some(3),
919 max_discovered_steps: Some(100), ..LongRunningExecutionPolicy::standard()
921 };
922
923 let result =
924 AgenticLoop::run_with_details(&mut runner, callable, "task".to_string(), policy)
925 .await
926 .unwrap();
927
928 assert!(result.max_depth_reached <= 4); assert!(!result.completed);
931 assert_eq!(result.stop_reason, Some("max_discovery_depth".to_string()));
932 }
933
934 #[tokio::test]
935 async fn test_error_propagation() {
936 let store = Arc::new(InMemoryCheckpointStore::new());
937 let mut runner = Runner::new(store);
938 let callable: DynCallable = Arc::new(FailingCallable::new("failing", 2));
940 let policy = LongRunningExecutionPolicy::standard();
941
942 let result = AgenticLoop::run(&mut runner, callable, "task".to_string(), policy).await;
943
944 assert!(result.is_err());
945 assert!(result
946 .unwrap_err()
947 .to_string()
948 .contains("Simulated failure"));
949 }
950
951 #[tokio::test]
952 async fn test_history_tracking() {
953 let store = Arc::new(InMemoryCheckpointStore::new());
954 let mut runner = Runner::new(store);
955 let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 2));
956 let policy = LongRunningExecutionPolicy::standard();
957
958 let result =
959 AgenticLoop::run_with_details(&mut runner, callable, "start".to_string(), policy)
960 .await
961 .unwrap();
962
963 assert!(!result.history.is_empty());
965 assert!(result.history[0].contains("User: start"));
966 }
967
968 #[tokio::test]
969 async fn test_priority_ordering() {
970 let output = r#"{
973 "result": "test",
974 "discovered_steps": [
975 {"input": "low priority", "priority": 10},
976 {"input": "high priority", "priority": 90},
977 {"input": "medium priority", "priority": 50}
978 ]
979 }"#;
980
981 let discovery = DiscoveryOutput::parse(output);
982 let mut sorted = discovery.discovered_steps;
983 sorted.sort_by(|a, b| b.priority.cmp(&a.priority));
984
985 assert_eq!(sorted[0].input, "high priority");
986 assert_eq!(sorted[1].input, "medium priority");
987 assert_eq!(sorted[2].input, "low priority");
988 }
989
990 #[tokio::test]
991 async fn test_checkpointing_on_discovery() {
992 let store = Arc::new(InMemoryCheckpointStore::new());
993 let mut runner = Runner::new(store.clone());
994 let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 2));
995
996 let policy = LongRunningExecutionPolicy {
997 checkpointing: crate::policy::CheckpointPolicy {
998 on_discovery: true,
999 ..Default::default()
1000 },
1001 ..LongRunningExecutionPolicy::standard()
1002 };
1003
1004 let _ = AgenticLoop::run(&mut runner, callable, "task".to_string(), policy).await;
1005
1006 let checkpoint = store
1008 .load_latest(runner.execution_id().as_str())
1009 .await
1010 .unwrap();
1011 assert!(checkpoint.is_some());
1012 }
1013}