Skip to main content

enact_core/runner/
loop.rs

1//! Agentic Loop implementation
2//!
3//! The core loop that drives long-running executions.
4//! It handles the "Discovery -> Execute -> Repeat" cycle.
5//!
6//! ## Discovery Loop Protocol
7//!
8//! Callables can signal discovered work through special markers in their output:
9//!
10//! 1. **JSON-based discovery**: Output contains a JSON object with `discovered_steps`:
11//!    ```json
12//!    {
13//!      "result": "Analyzed the codebase...",
14//!      "discovered_steps": [
15//!        {"name": "refactor-module", "input": "Refactor auth module", "reason": "Found code duplication"},
16//!        {"name": "add-tests", "input": "Add tests for auth", "reason": "Missing test coverage"}
17//!      ]
18//!    }
19//!    ```
20//!
21//! 2. **Completion signal**: Output ends with `[DONE]` or `[COMPLETE]`, or contains `"status": "complete"` (a parsed JSON `status` of `"done"` is also accepted).
22//!
23//! The loop continues until:
24//! - No more discovered steps
25//! - Completion signal received
26//! - Limits exceeded (max steps, timeout, cost)
27
28use crate::callable::{Callable, CallableInvoker, DynCallable};
29use crate::graph::CheckpointStore;
30use crate::kernel::ids::{StepId, StepSourceType};
31use crate::policy::LongRunningExecutionPolicy;
32use crate::runner::Runner;
33use crate::streaming::StreamEvent;
34use serde::{Deserialize, Serialize};
35use std::collections::VecDeque;
36use std::time::{Duration, Instant};
37
38/// Work queue item for the invoker-based discovery loop (`AgenticLoop::run_with_invoker`).
39/// Tuple layout: `(callable_name, input, depth, triggering_step_id, reason)`.
40type InvokerWorkItem = (String, String, u32, Option<StepId>, Option<String>);
41
42/// A discovered step parsed from callable output (one entry of the `discovered_steps` JSON array)
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct DiscoveredStep {
45    /// Name of the callable to invoke (or use current if None); defaults to `None` when absent from the JSON
46    #[serde(default)]
47    pub name: Option<String>,
48    /// Input to pass to the callable; the only field without a serde default
49    pub input: String,
50    /// Human-readable reason for discovering this step; defaults to `None` when absent from the JSON
51    #[serde(default)]
52    pub reason: Option<String>,
53    /// Priority (higher = execute sooner). NOTE(review): `#[serde(default)]` yields 0 when the key is omitted, whereas `DiscoveredStep::new` starts at 50 - confirm the mismatch is intended.
54    #[serde(default)]
55    pub priority: u8,
56}
57
58impl DiscoveredStep {
59    /// Create a new discovered step
60    pub fn new(input: impl Into<String>) -> Self {
61        Self {
62            name: None,
63            input: input.into(),
64            reason: None,
65            priority: 50,
66        }
67    }
68
69    /// Set the callable name for this step
70    pub fn with_name(mut self, name: impl Into<String>) -> Self {
71        self.name = Some(name.into());
72        self
73    }
74
75    /// Set the reason for discovery
76    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77        self.reason = Some(reason.into());
78        self
79    }
80
81    /// Set priority
82    pub fn with_priority(mut self, priority: u8) -> Self {
83        self.priority = priority;
84        self
85    }
86}
87
88/// Result of parsing callable output for discovery signals (built by `DiscoveryOutput::parse`)
89#[derive(Debug, Clone)]
90pub struct DiscoveryOutput {
91    /// The actual result/content from the callable: the JSON `result`/`output` field when present, otherwise the text with trailing completion markers removed
92    pub result: String,
93    /// Any discovered steps to execute; always empty when the output was not parsed as JSON
94    pub discovered_steps: Vec<DiscoveredStep>,
95    /// Whether the callable explicitly signaled completion (a `[DONE]`/`[COMPLETE]` marker or a `complete`/`done` status)
96    pub is_complete: bool,
97}
98
99impl DiscoveryOutput {
100    /// Parse callable output to extract discovery information
101    pub fn parse(output: &str) -> Self {
102        // Try JSON-based discovery format first
103        if let Some(parsed) = Self::try_parse_json(output) {
104            return parsed;
105        }
106
107        // Check for completion markers
108        let is_complete = output.ends_with("[DONE]")
109            || output.ends_with("[COMPLETE]")
110            || output.contains("\"status\": \"complete\"")
111            || output.contains("\"status\":\"complete\"");
112
113        // Clean output by removing completion markers
114        let result = output
115            .trim_end_matches("[DONE]")
116            .trim_end_matches("[COMPLETE]")
117            .trim()
118            .to_string();
119
120        Self {
121            result,
122            discovered_steps: Vec::new(),
123            is_complete,
124        }
125    }
126
127    /// Try to parse JSON-based discovery format
128    fn try_parse_json(output: &str) -> Option<Self> {
129        // Look for JSON object in output
130        let trimmed = output.trim();
131
132        // Try to find a JSON block
133        let json_str = if trimmed.starts_with('{') {
134            trimmed
135        } else if let Some(start) = trimmed.find("```json") {
136            let content_start = trimmed[start..].find('\n').map(|i| start + i + 1)?;
137            let content_end = trimmed[content_start..].find("```")?;
138            &trimmed[content_start..content_start + content_end]
139        } else if let Some(start) = trimmed.find('{') {
140            // Try to extract JSON object
141            let end = trimmed.rfind('}')?;
142            if end > start {
143                &trimmed[start..=end]
144            } else {
145                return None;
146            }
147        } else {
148            return None;
149        };
150
151        // Try to parse as DiscoveryOutputJson
152        #[derive(Deserialize)]
153        struct DiscoveryOutputJson {
154            #[serde(default)]
155            result: Option<String>,
156            #[serde(default)]
157            output: Option<String>,
158            #[serde(default)]
159            discovered_steps: Vec<DiscoveredStep>,
160            #[serde(default)]
161            status: Option<String>,
162        }
163
164        let parsed: DiscoveryOutputJson = serde_json::from_str(json_str).ok()?;
165
166        let result = parsed
167            .result
168            .or(parsed.output)
169            .unwrap_or_else(|| json_str.to_string());
170
171        let is_complete = parsed
172            .status
173            .map(|s| s == "complete" || s == "done")
174            .unwrap_or(false);
175
176        Some(Self {
177            result,
178            discovered_steps: parsed.discovered_steps,
179            is_complete,
180        })
181    }
182
183    /// Check if there are more steps to execute
184    pub fn has_pending_work(&self) -> bool {
185        !self.discovered_steps.is_empty() && !self.is_complete
186    }
187}
188
189/// Result of agentic loop execution, as returned by `AgenticLoop::run_with_details` and `AgenticLoop::run_with_invoker`
190#[derive(Debug, Clone)]
191pub struct AgenticLoopResult {
192    /// Final output from the execution: the cleaned result of the last successful step (empty if no step ran)
193    pub output: String,
194    /// Total number of steps executed (only successful callable runs are counted)
195    pub steps_executed: u32,
196    /// Total discovered steps processed (dequeued items triggered by an earlier step; the initial input is not counted)
197    pub discovered_steps_processed: u32,
198    /// Maximum discovery depth reached (0 = initial input); updated when an item is dequeued, before limits are checked
199    pub max_depth_reached: u32,
200    /// Whether execution completed normally (queue drained or completion signaled); false when a policy limit stopped the loop
201    pub completed: bool,
202    /// Reason for stopping (if not completed): `"max_discovered_steps"`, `"max_discovery_depth"` or `"idle_timeout"`
203    pub stop_reason: Option<String>,
204    /// Execution history (step outputs): a leading `User: <input>` entry followed by one entry per successful step
205    pub history: Vec<String>,
206}
207
208/// The agentic loop driver
209pub struct AgenticLoop;
210
211impl AgenticLoop {
212    /// Run the agentic loop for a callable
213    ///
214    /// This drives the discovery-execute-repeat cycle for long-running executions.
215    /// The loop continues until:
216    /// - The callable signals completion (no more discovered steps)
217    /// - Policy limits are exceeded
218    /// - An error occurs
219    pub async fn run<S: CheckpointStore>(
220        runner: &mut Runner<S>,
221        callable: DynCallable,
222        input: String,
223        policy: LongRunningExecutionPolicy,
224    ) -> anyhow::Result<String> {
225        let result = Self::run_with_details(runner, callable, input, policy).await?;
226        Ok(result.output)
227    }
228
229    /// Run the agentic loop with detailed result
230    pub async fn run_with_details<S: CheckpointStore>(
231        runner: &mut Runner<S>,
232        callable: DynCallable,
233        input: String,
234        policy: LongRunningExecutionPolicy,
235    ) -> anyhow::Result<AgenticLoopResult> {
236        // Initial setup
237        let start_time = Instant::now();
238        let mut steps_executed: u32 = 0;
239        let mut discovered_steps_processed: u32 = 0;
240        let mut max_depth_reached: u32 = 0;
241        let mut history: Vec<String> = Vec::new();
242
243        // Work queue: (input, depth, triggering_step_id, reason)
244        let mut work_queue: VecDeque<(String, u32, Option<StepId>, Option<String>)> =
245            VecDeque::new();
246        work_queue.push_back((input.clone(), 0, None, None));
247
248        // Track the last output for the final result
249        let mut last_output = String::new();
250
251        // Push initial input to history
252        history.push(format!("User: {}", input));
253
254        // Main discovery loop
255        while let Some((current_input, depth, triggered_by, reason)) = work_queue.pop_front() {
256            if depth > max_depth_reached {
257                max_depth_reached = depth;
258            }
259
260            // === 1. Check Policy Limits ===
261
262            // Check max discovered steps
263            if let Some(max_steps) = policy.max_discovered_steps {
264                if steps_executed >= max_steps {
265                    runner.emitter().emit(StreamEvent::execution_failed(
266                        runner.execution_id(),
267                        crate::kernel::ExecutionError::quota_exceeded(format!(
268                            "Max discovered steps exceeded: {} >= {}",
269                            steps_executed, max_steps
270                        )),
271                    ));
272                    return Ok(AgenticLoopResult {
273                        output: last_output,
274                        steps_executed,
275                        discovered_steps_processed,
276                        max_depth_reached,
277                        completed: false,
278                        stop_reason: Some("max_discovered_steps".to_string()),
279                        history,
280                    });
281                }
282            }
283
284            // Check max discovery depth
285            if let Some(max_depth) = policy.max_discovery_depth {
286                if depth > max_depth {
287                    runner.emitter().emit(StreamEvent::execution_failed(
288                        runner.execution_id(),
289                        crate::kernel::ExecutionError::quota_exceeded(format!(
290                            "Max discovery depth exceeded: {} > {}",
291                            depth, max_depth
292                        )),
293                    ));
294                    return Ok(AgenticLoopResult {
295                        output: last_output,
296                        steps_executed,
297                        discovered_steps_processed,
298                        max_depth_reached,
299                        completed: false,
300                        stop_reason: Some("max_discovery_depth".to_string()),
301                        history,
302                    });
303                }
304            }
305
306            // Check idle timeout
307            if let Some(timeout) = policy.idle_timeout_seconds {
308                if start_time.elapsed() > Duration::from_secs(timeout) {
309                    runner.emitter().emit(StreamEvent::execution_failed(
310                        runner.execution_id(),
311                        crate::kernel::ExecutionError::timeout(format!(
312                            "Idle timeout after {}s",
313                            timeout
314                        )),
315                    ));
316                    return Ok(AgenticLoopResult {
317                        output: last_output,
318                        steps_executed,
319                        discovered_steps_processed,
320                        max_depth_reached,
321                        completed: false,
322                        stop_reason: Some("idle_timeout".to_string()),
323                        history,
324                    });
325                }
326            }
327
328            // === 2. Emit Step Discovered Event (for discovered steps) ===
329            let step_id = StepId::new();
330            if triggered_by.is_some() {
331                runner.emitter().emit(StreamEvent::step_discovered(
332                    runner.execution_id(),
333                    &step_id,
334                    triggered_by.as_ref(),
335                    StepSourceType::Discovered,
336                    reason.as_deref().unwrap_or("Discovered by previous step"),
337                    depth,
338                ));
339                discovered_steps_processed += 1;
340            }
341
342            // === 3. Execute Step ===
343            let result = runner
344                .run_callable(callable.as_ref() as &dyn Callable, &current_input)
345                .await;
346
347            match result {
348                Ok(output) => {
349                    steps_executed += 1;
350                    history.push(format!("Assistant [depth={}]: {}", depth, &output));
351
352                    // === 4. Checkpoint if policy requires ===
353                    let should_checkpoint = policy.checkpointing.on_discovery
354                        || policy
355                            .checkpointing
356                            .interval_steps
357                            .is_some_and(|i| steps_executed.is_multiple_of(i));
358
359                    if should_checkpoint {
360                        let state = crate::graph::NodeState::from_string(&output);
361                        if let Err(e) = runner
362                            .save_checkpoint(state, Some(callable.name()), Some(callable.name()))
363                            .await
364                        {
365                            // Log warning but continue
366                            tracing::warn!("Failed to save checkpoint: {}", e);
367                        }
368                    }
369
370                    // === 5. Parse Output for Discovery ===
371                    let discovery = DiscoveryOutput::parse(&output);
372
373                    // Update last_output to the cleaned result
374                    last_output = discovery.result.clone();
375
376                    // Check for explicit completion
377                    if discovery.is_complete {
378                        tracing::debug!(steps = steps_executed, "Callable signaled completion");
379                        return Ok(AgenticLoopResult {
380                            output: discovery.result,
381                            steps_executed,
382                            discovered_steps_processed,
383                            max_depth_reached,
384                            completed: true,
385                            stop_reason: None,
386                            history,
387                        });
388                    }
389
390                    // === 6. Queue Discovered Steps ===
391                    if !discovery.discovered_steps.is_empty() {
392                        tracing::debug!(
393                            count = discovery.discovered_steps.len(),
394                            "Discovered new steps"
395                        );
396
397                        // Sort by priority (higher first)
398                        let mut sorted_steps = discovery.discovered_steps;
399                        sorted_steps.sort_by(|a, b| b.priority.cmp(&a.priority));
400
401                        for discovered in sorted_steps {
402                            work_queue.push_back((
403                                discovered.input,
404                                depth + 1,
405                                Some(step_id.clone()),
406                                discovered.reason,
407                            ));
408                        }
409                    }
410                }
411                Err(e) => {
412                    // Step failed - emit error and stop
413                    runner.emitter().emit(StreamEvent::execution_failed(
414                        runner.execution_id(),
415                        crate::kernel::ExecutionError::kernel_internal(e.to_string()),
416                    ));
417                    return Err(e);
418                }
419            }
420        }
421
422        // Work queue exhausted - execution complete
423        Ok(AgenticLoopResult {
424            output: last_output,
425            steps_executed,
426            discovered_steps_processed,
427            max_depth_reached,
428            completed: true,
429            stop_reason: None,
430            history,
431        })
432    }
433
434    /// Run with a callable invoker for dynamic callable lookup
435    ///
436    /// This variant allows discovered steps to specify different callables by name.
437    pub async fn run_with_invoker<S: CheckpointStore>(
438        runner: &mut Runner<S>,
439        invoker: &CallableInvoker,
440        initial_callable_name: &str,
441        input: String,
442        policy: LongRunningExecutionPolicy,
443    ) -> anyhow::Result<AgenticLoopResult> {
444        // Initial setup
445        let start_time = Instant::now();
446        let mut steps_executed: u32 = 0;
447        let mut discovered_steps_processed: u32 = 0;
448        let mut max_depth_reached: u32 = 0;
449        let mut history: Vec<String> = Vec::new();
450
451        // Work queue for discovered steps
452        let mut work_queue: VecDeque<InvokerWorkItem> = VecDeque::new();
453        work_queue.push_back((
454            initial_callable_name.to_string(),
455            input.clone(),
456            0,
457            None,
458            None,
459        ));
460
461        let mut last_output = String::new();
462        history.push(format!("User: {}", input));
463
464        while let Some((callable_name, current_input, depth, triggered_by, reason)) =
465            work_queue.pop_front()
466        {
467            if depth > max_depth_reached {
468                max_depth_reached = depth;
469            }
470
471            // Check policy limits
472            if let Some(max_steps) = policy.max_discovered_steps {
473                if steps_executed >= max_steps {
474                    return Ok(AgenticLoopResult {
475                        output: last_output,
476                        steps_executed,
477                        discovered_steps_processed,
478                        max_depth_reached,
479                        completed: false,
480                        stop_reason: Some("max_discovered_steps".to_string()),
481                        history,
482                    });
483                }
484            }
485
486            if let Some(max_depth) = policy.max_discovery_depth {
487                if depth > max_depth {
488                    return Ok(AgenticLoopResult {
489                        output: last_output,
490                        steps_executed,
491                        discovered_steps_processed,
492                        max_depth_reached,
493                        completed: false,
494                        stop_reason: Some("max_discovery_depth".to_string()),
495                        history,
496                    });
497                }
498            }
499
500            if let Some(timeout) = policy.idle_timeout_seconds {
501                if start_time.elapsed() > Duration::from_secs(timeout) {
502                    return Ok(AgenticLoopResult {
503                        output: last_output,
504                        steps_executed,
505                        discovered_steps_processed,
506                        max_depth_reached,
507                        completed: false,
508                        stop_reason: Some("idle_timeout".to_string()),
509                        history,
510                    });
511                }
512            }
513
514            // Emit step discovered event
515            let step_id = StepId::new();
516            if triggered_by.is_some() {
517                runner.emitter().emit(StreamEvent::step_discovered(
518                    runner.execution_id(),
519                    &step_id,
520                    triggered_by.as_ref(),
521                    StepSourceType::Discovered,
522                    reason.as_deref().unwrap_or("Discovered by previous step"),
523                    depth,
524                ));
525                discovered_steps_processed += 1;
526            }
527
528            // Get the callable
529            let callable = invoker.get(&callable_name).ok_or_else(|| {
530                anyhow::anyhow!("Callable '{}' not found in registry", callable_name)
531            })?;
532
533            // Execute
534            let result = runner
535                .run_callable(callable.as_ref() as &dyn Callable, &current_input)
536                .await;
537
538            match result {
539                Ok(output) => {
540                    steps_executed += 1;
541                    history.push(format!("{}[depth={}]: {}", callable_name, depth, &output));
542
543                    // Checkpoint if needed
544                    let should_checkpoint = policy.checkpointing.on_discovery
545                        || policy
546                            .checkpointing
547                            .interval_steps
548                            .is_some_and(|i| steps_executed.is_multiple_of(i));
549
550                    if should_checkpoint {
551                        let state = crate::graph::NodeState::from_string(&output);
552                        if let Err(e) = runner
553                            .save_checkpoint(state, Some(&callable_name), Some(&callable_name))
554                            .await
555                        {
556                            tracing::warn!("Failed to save checkpoint: {}", e);
557                        }
558                    }
559
560                    // Parse for discovery
561                    let discovery = DiscoveryOutput::parse(&output);
562                    last_output = discovery.result.clone();
563
564                    if discovery.is_complete {
565                        return Ok(AgenticLoopResult {
566                            output: discovery.result,
567                            steps_executed,
568                            discovered_steps_processed,
569                            max_depth_reached,
570                            completed: true,
571                            stop_reason: None,
572                            history,
573                        });
574                    }
575
576                    // Queue discovered steps
577                    let mut sorted_steps = discovery.discovered_steps;
578                    sorted_steps.sort_by(|a, b| b.priority.cmp(&a.priority));
579
580                    for discovered in sorted_steps {
581                        let target_callable =
582                            discovered.name.unwrap_or_else(|| callable_name.clone());
583                        work_queue.push_back((
584                            target_callable,
585                            discovered.input,
586                            depth + 1,
587                            Some(step_id.clone()),
588                            discovered.reason,
589                        ));
590                    }
591                }
592                Err(e) => {
593                    runner.emitter().emit(StreamEvent::execution_failed(
594                        runner.execution_id(),
595                        crate::kernel::ExecutionError::kernel_internal(e.to_string()),
596                    ));
597                    return Err(e);
598                }
599            }
600        }
601
602        Ok(AgenticLoopResult {
603            output: last_output,
604            steps_executed,
605            discovered_steps_processed,
606            max_depth_reached,
607            completed: true,
608            stop_reason: None,
609            history,
610        })
611    }
612}
613
614#[cfg(test)]
615mod tests {
616    use super::*;
617    use crate::callable::Callable;
618    use crate::graph::InMemoryCheckpointStore;
619    use async_trait::async_trait;
620    use std::sync::atomic::{AtomicU32, Ordering};
621    use std::sync::Arc;
622
623    /// Mock callable that returns discovery steps
624    struct DiscoveryCallable {
625        name: String,
626        /// Number of times to discover new steps before completing
627        discover_count: AtomicU32,
628    }
629
630    impl DiscoveryCallable {
631        fn new(name: &str, discover_count: u32) -> Self {
632            Self {
633                name: name.to_string(),
634                discover_count: AtomicU32::new(discover_count),
635            }
636        }
637    }
638
639    #[async_trait]
640    impl Callable for DiscoveryCallable {
641        fn name(&self) -> &str {
642            &self.name
643        }
644
645        async fn run(&self, input: &str) -> anyhow::Result<String> {
646            let remaining = self.discover_count.fetch_sub(1, Ordering::SeqCst);
647
648            if remaining > 0 {
649                // Return with discovered steps
650                Ok(format!(
651                    r#"{{
652                        "result": "Processed: {}",
653                        "discovered_steps": [
654                            {{"input": "Follow-up task {}", "reason": "Discovered work"}}
655                        ]
656                    }}"#,
657                    input, remaining
658                ))
659            } else {
660                // Complete
661                Ok(format!("Final result for: {} [DONE]", input))
662            }
663        }
664    }
665
666    /// Mock callable that never discovers steps (single-shot)
667    struct SingleShotCallable {
668        name: String,
669    }
670
671    impl SingleShotCallable {
672        fn new(name: &str) -> Self {
673            Self {
674                name: name.to_string(),
675            }
676        }
677    }
678
679    #[async_trait]
680    impl Callable for SingleShotCallable {
681        fn name(&self) -> &str {
682            &self.name
683        }
684
685        async fn run(&self, input: &str) -> anyhow::Result<String> {
686            Ok(format!("Processed: {}", input))
687        }
688    }
689
690    /// Mock callable that fails after N calls
691    struct FailingCallable {
692        name: String,
693        fail_after: AtomicU32,
694    }
695
696    impl FailingCallable {
697        fn new(name: &str, fail_after: u32) -> Self {
698            Self {
699                name: name.to_string(),
700                fail_after: AtomicU32::new(fail_after),
701            }
702        }
703    }
704
705    #[async_trait]
706    impl Callable for FailingCallable {
707        fn name(&self) -> &str {
708            &self.name
709        }
710
711        async fn run(&self, input: &str) -> anyhow::Result<String> {
712            let remaining = self.fail_after.fetch_sub(1, Ordering::SeqCst);
713            if remaining > 0 {
714                Ok(format!(
715                    r#"{{
716                        "result": "Step {}",
717                        "discovered_steps": [{{"input": "Next step"}}]
718                    }}"#,
719                    input
720                ))
721            } else {
722                anyhow::bail!("Simulated failure")
723            }
724        }
725    }
726
727    // ============ DiscoveryOutput Parsing Tests ============
728
729    #[test]
730    fn test_parse_plain_output() {
731        let output = "Just a simple response";
732        let discovery = DiscoveryOutput::parse(output);
733
734        assert_eq!(discovery.result, "Just a simple response");
735        assert!(discovery.discovered_steps.is_empty());
736        assert!(!discovery.is_complete);
737    }
738
739    #[test]
740    fn test_parse_done_marker() {
741        let output = "Task completed successfully [DONE]";
742        let discovery = DiscoveryOutput::parse(output);
743
744        assert_eq!(discovery.result, "Task completed successfully");
745        assert!(discovery.discovered_steps.is_empty());
746        assert!(discovery.is_complete);
747    }
748
749    #[test]
750    fn test_parse_complete_marker() {
751        let output = "All work finished [COMPLETE]";
752        let discovery = DiscoveryOutput::parse(output);
753
754        assert_eq!(discovery.result, "All work finished");
755        assert!(discovery.is_complete);
756    }
757
758    #[test]
759    fn test_parse_json_with_discovered_steps() {
760        let output = r#"{
761            "result": "Analyzed the system",
762            "discovered_steps": [
763                {"input": "Refactor module A", "reason": "Code smell detected"},
764                {"input": "Add tests for B"}
765            ]
766        }"#;
767
768        let discovery = DiscoveryOutput::parse(output);
769
770        assert_eq!(discovery.result, "Analyzed the system");
771        assert_eq!(discovery.discovered_steps.len(), 2);
772        assert_eq!(discovery.discovered_steps[0].input, "Refactor module A");
773        assert_eq!(
774            discovery.discovered_steps[0].reason,
775            Some("Code smell detected".to_string())
776        );
777        assert_eq!(discovery.discovered_steps[1].input, "Add tests for B");
778        assert!(!discovery.is_complete);
779    }
780
781    #[test]
782    fn test_parse_json_with_status_complete() {
783        let output = r#"{"result": "Done", "status": "complete"}"#;
784        let discovery = DiscoveryOutput::parse(output);
785
786        assert_eq!(discovery.result, "Done");
787        assert!(discovery.is_complete);
788    }
789
790    #[test]
791    fn test_parse_json_embedded_in_text() {
792        let output = r#"Here is the analysis:
793        {
794            "result": "Found issues",
795            "discovered_steps": [{"input": "Fix issue 1"}]
796        }
797        End of response."#;
798
799        let discovery = DiscoveryOutput::parse(output);
800
801        // Should extract the JSON
802        assert!(!discovery.discovered_steps.is_empty());
803        assert_eq!(discovery.discovered_steps[0].input, "Fix issue 1");
804    }
805
806    #[test]
807    fn test_has_pending_work() {
808        // No steps = no pending work
809        let no_steps = DiscoveryOutput {
810            result: "test".to_string(),
811            discovered_steps: vec![],
812            is_complete: false,
813        };
814        assert!(!no_steps.has_pending_work());
815
816        // Steps but complete = no pending work
817        let complete = DiscoveryOutput {
818            result: "test".to_string(),
819            discovered_steps: vec![DiscoveredStep::new("task")],
820            is_complete: true,
821        };
822        assert!(!complete.has_pending_work());
823
824        // Steps and not complete = pending work
825        let pending = DiscoveryOutput {
826            result: "test".to_string(),
827            discovered_steps: vec![DiscoveredStep::new("task")],
828            is_complete: false,
829        };
830        assert!(pending.has_pending_work());
831    }
832
833    // ============ DiscoveredStep Tests ============
834
/// Chains every builder method on `DiscoveredStep` and checks that each
/// value lands in the matching field.
#[test]
fn test_discovered_step_builder() {
    let built = DiscoveredStep::new("Process data")
        .with_name("data-processor")
        .with_reason("Data needs processing")
        .with_priority(80);

    assert_eq!(built.input, "Process data");
    // Optional fields are compared as borrowed string slices.
    assert_eq!(built.name.as_deref(), Some("data-processor"));
    assert_eq!(built.reason.as_deref(), Some("Data needs processing"));
    assert_eq!(built.priority, 80);
}
847
848    // ============ AgenticLoop Tests ============
849
850    #[tokio::test]
851    async fn test_single_shot_execution() {
852        let store = Arc::new(InMemoryCheckpointStore::new());
853        let mut runner = Runner::new(store);
854        let callable: DynCallable = Arc::new(SingleShotCallable::new("single"));
855        let policy = LongRunningExecutionPolicy::standard();
856
857        let result = AgenticLoop::run(&mut runner, callable, "test input".to_string(), policy)
858            .await
859            .unwrap();
860
861        assert!(result.contains("Processed: test input"));
862    }
863
864    #[tokio::test]
865    async fn test_discovery_loop_multiple_steps() {
866        let store = Arc::new(InMemoryCheckpointStore::new());
867        let mut runner = Runner::new(store);
868        // Will discover 3 steps before completing
869        let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 3));
870        let policy = LongRunningExecutionPolicy::standard();
871
872        let result = AgenticLoop::run_with_details(
873            &mut runner,
874            callable,
875            "initial task".to_string(),
876            policy,
877        )
878        .await
879        .unwrap();
880
881        // Should have executed multiple steps
882        assert!(result.steps_executed >= 3);
883        assert!(result.completed);
884        assert!(result.stop_reason.is_none());
885    }
886
887    #[tokio::test]
888    async fn test_max_steps_limit() {
889        let store = Arc::new(InMemoryCheckpointStore::new());
890        let mut runner = Runner::new(store);
891        // Will try to discover 100 steps (more than limit)
892        let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 100));
893
894        let policy = LongRunningExecutionPolicy {
895            max_discovered_steps: Some(5),
896            ..LongRunningExecutionPolicy::standard()
897        };
898
899        let result =
900            AgenticLoop::run_with_details(&mut runner, callable, "task".to_string(), policy)
901                .await
902                .unwrap();
903
904        // Should stop at limit
905        assert!(result.steps_executed <= 5);
906        assert!(!result.completed);
907        assert_eq!(result.stop_reason, Some("max_discovered_steps".to_string()));
908    }
909
910    #[tokio::test]
911    async fn test_max_depth_limit() {
912        let store = Arc::new(InMemoryCheckpointStore::new());
913        let mut runner = Runner::new(store);
914        // Each step discovers one more step, creating a deep chain
915        let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 20));
916
917        let policy = LongRunningExecutionPolicy {
918            max_discovery_depth: Some(3),
919            max_discovered_steps: Some(100), // High limit to not hit this
920            ..LongRunningExecutionPolicy::standard()
921        };
922
923        let result =
924            AgenticLoop::run_with_details(&mut runner, callable, "task".to_string(), policy)
925                .await
926                .unwrap();
927
928        // Should stop at depth limit
929        assert!(result.max_depth_reached <= 4); // 0-indexed, so max 3 means depth 0-3
930        assert!(!result.completed);
931        assert_eq!(result.stop_reason, Some("max_discovery_depth".to_string()));
932    }
933
934    #[tokio::test]
935    async fn test_error_propagation() {
936        let store = Arc::new(InMemoryCheckpointStore::new());
937        let mut runner = Runner::new(store);
938        // Will fail after 2 successful calls
939        let callable: DynCallable = Arc::new(FailingCallable::new("failing", 2));
940        let policy = LongRunningExecutionPolicy::standard();
941
942        let result = AgenticLoop::run(&mut runner, callable, "task".to_string(), policy).await;
943
944        assert!(result.is_err());
945        assert!(result
946            .unwrap_err()
947            .to_string()
948            .contains("Simulated failure"));
949    }
950
951    #[tokio::test]
952    async fn test_history_tracking() {
953        let store = Arc::new(InMemoryCheckpointStore::new());
954        let mut runner = Runner::new(store);
955        let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 2));
956        let policy = LongRunningExecutionPolicy::standard();
957
958        let result =
959            AgenticLoop::run_with_details(&mut runner, callable, "start".to_string(), policy)
960                .await
961                .unwrap();
962
963        // Should have history entries
964        assert!(!result.history.is_empty());
965        assert!(result.history[0].contains("User: start"));
966    }
967
/// Parses three discovered steps with distinct priorities and checks that a
/// descending sort on `priority` puts them in high, medium, low order.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed. It sorts the parsed steps itself rather than driving
/// the loop, so it only covers the comparator, not the loop's own ordering.
#[test]
fn test_priority_ordering() {
    let output = r#"{
            "result": "test",
            "discovered_steps": [
                {"input": "low priority", "priority": 10},
                {"input": "high priority", "priority": 90},
                {"input": "medium priority", "priority": 50}
            ]
        }"#;

    let mut sorted = DiscoveryOutput::parse(output).discovered_steps;
    // Highest priority first.
    sorted.sort_by(|a, b| b.priority.cmp(&a.priority));

    // Compare the whole ordering at once so a missing or extra step is
    // reported as well as a misplaced one.
    let order: Vec<&str> = sorted.iter().map(|step| step.input.as_str()).collect();
    assert_eq!(order, ["high priority", "medium priority", "low priority"]);
}
989
990    #[tokio::test]
991    async fn test_checkpointing_on_discovery() {
992        let store = Arc::new(InMemoryCheckpointStore::new());
993        let mut runner = Runner::new(store.clone());
994        let callable: DynCallable = Arc::new(DiscoveryCallable::new("discover", 2));
995
996        let policy = LongRunningExecutionPolicy {
997            checkpointing: crate::policy::CheckpointPolicy {
998                on_discovery: true,
999                ..Default::default()
1000            },
1001            ..LongRunningExecutionPolicy::standard()
1002        };
1003
1004        let _ = AgenticLoop::run(&mut runner, callable, "task".to_string(), policy).await;
1005
1006        // Should have created checkpoints
1007        let checkpoint = store
1008            .load_latest(runner.execution_id().as_str())
1009            .await
1010            .unwrap();
1011        assert!(checkpoint.is_some());
1012    }
1013}