Skip to main content

symbi_runtime/reasoning/
loop_types.rs

1//! Core types for the reasoning loop
2//!
3//! Defines observations, proposed actions, loop configuration, state,
4//! and recovery strategies for the observe-reason-gate-act cycle.
5
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::time::Duration;
9
10use crate::reasoning::conversation::Conversation;
11use crate::reasoning::inference::{ToolDefinition, Usage};
12use crate::types::AgentId;
13
14/// An observation that feeds into the reasoning step.
15///
16/// Observations come from tool results, environment state, policy feedback,
17/// or external events. The loop collects them before each reasoning step.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct Observation {
20    /// Source of the observation (e.g., tool name, "policy_gate", "environment").
21    pub source: String,
22    /// The observation content.
23    pub content: String,
24    /// Whether this observation indicates an error.
25    pub is_error: bool,
26    /// The tool call ID that produced this observation.
27    /// When present, used as the `tool_use_id` in conversation messages
28    /// to correlate tool results with the originating tool_use block.
29    #[serde(default, skip_serializing_if = "Option::is_none")]
30    pub call_id: Option<String>,
31    /// Metadata for logging and auditing.
32    #[serde(default)]
33    pub metadata: HashMap<String, String>,
34}
35
36impl Observation {
37    /// Create a tool result observation.
38    pub fn tool_result(tool_name: impl Into<String>, content: impl Into<String>) -> Self {
39        Self {
40            source: tool_name.into(),
41            content: content.into(),
42            is_error: false,
43            call_id: None,
44            metadata: HashMap::new(),
45        }
46    }
47
48    /// Create a tool error observation.
49    pub fn tool_error(tool_name: impl Into<String>, error: impl Into<String>) -> Self {
50        Self {
51            source: tool_name.into(),
52            content: error.into(),
53            is_error: true,
54            call_id: None,
55            metadata: HashMap::new(),
56        }
57    }
58
59    /// Create a policy denial observation.
60    pub fn policy_denial(reason: impl Into<String>) -> Self {
61        Self {
62            source: "policy_gate".into(),
63            content: reason.into(),
64            is_error: true,
65            call_id: None,
66            metadata: HashMap::new(),
67        }
68    }
69
70    /// Attach a tool call ID to this observation.
71    pub fn with_call_id(mut self, call_id: impl Into<String>) -> Self {
72        self.call_id = Some(call_id.into());
73        self
74    }
75}
76
77/// An action proposed by the reasoning step, pending policy evaluation.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum ProposedAction {
80    /// Call a tool with the given name and arguments.
81    ToolCall {
82        /// Unique call identifier.
83        call_id: String,
84        /// Tool name.
85        name: String,
86        /// JSON-encoded arguments.
87        arguments: String,
88    },
89    /// Delegate work to another agent.
90    Delegate {
91        /// Target agent identifier or name.
92        target: String,
93        /// Message to send.
94        message: String,
95    },
96    /// Respond to the user/caller with content (text or structured).
97    Respond {
98        /// Response content.
99        content: String,
100    },
101    /// Terminate the loop (agent has finished its task).
102    Terminate {
103        /// Reason for termination.
104        reason: String,
105        /// Final output.
106        output: String,
107    },
108}
109
110/// The policy gate's decision for a proposed action.
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LoopDecision {
113    /// Allow the action to proceed.
114    Allow,
115    /// Deny the action with a reason (fed back as observation).
116    Deny { reason: String },
117    /// Allow with modifications (e.g., parameter redaction).
118    Modify {
119        modified_action: Box<ProposedAction>,
120        reason: String,
121    },
122}
123
124/// Runtime state of the reasoning loop.
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct LoopState {
127    /// Agent identity.
128    pub agent_id: AgentId,
129    /// Current iteration number (0-indexed).
130    pub iteration: u32,
131    /// Cumulative token usage.
132    pub total_usage: Usage,
133    /// The conversation so far.
134    pub conversation: Conversation,
135    /// Pending observations to process.
136    pub pending_observations: Vec<Observation>,
137    /// Timestamp when the loop started.
138    pub started_at: chrono::DateTime<chrono::Utc>,
139    /// Current loop phase for logging.
140    pub current_phase: String,
141    /// Arbitrary metadata carried across iterations.
142    #[serde(default)]
143    pub metadata: HashMap<String, serde_json::Value>,
144}
145
146impl LoopState {
147    /// Create initial state for a new loop.
148    pub fn new(agent_id: AgentId, conversation: Conversation) -> Self {
149        Self {
150            agent_id,
151            iteration: 0,
152            total_usage: Usage::default(),
153            conversation,
154            pending_observations: Vec::new(),
155            started_at: chrono::Utc::now(),
156            current_phase: "initialized".into(),
157            metadata: HashMap::new(),
158        }
159    }
160
161    /// Accumulate token usage from an inference response.
162    pub fn add_usage(&mut self, usage: &Usage) {
163        self.total_usage.prompt_tokens += usage.prompt_tokens;
164        self.total_usage.completion_tokens += usage.completion_tokens;
165        self.total_usage.total_tokens += usage.total_tokens;
166    }
167
168    /// Get elapsed time since loop start.
169    pub fn elapsed(&self) -> chrono::Duration {
170        chrono::Utc::now() - self.started_at
171    }
172}
173
174/// Configuration for a reasoning loop instance.
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct LoopConfig {
177    /// Maximum iterations before forced termination.
178    pub max_iterations: u32,
179    /// Maximum total tokens before forced termination.
180    pub max_total_tokens: u32,
181    /// Maximum wall-clock time for the entire loop.
182    pub timeout: Duration,
183    /// Default recovery strategy for tool failures.
184    pub default_recovery: RecoveryStrategy,
185    /// Per-tool timeout for individual tool calls.
186    pub tool_timeout: Duration,
187    /// Maximum concurrent tool calls during parallel dispatch.
188    pub max_concurrent_tools: usize,
189    /// Token budget for context window management.
190    pub context_token_budget: usize,
191    /// Sampling temperature applied to inference calls. 0.0 = deterministic.
192    /// Defaults to the same value as `InferenceOptions::default()` to preserve
193    /// existing behavior for callers that don't set this explicitly.
194    #[serde(default = "default_loop_temperature")]
195    pub temperature: f32,
196    /// Tool definitions available during this loop run.
197    #[serde(default, skip_serializing_if = "Vec::is_empty")]
198    pub tool_definitions: Vec<ToolDefinition>,
199    /// Tool profile for filtering tools visible to the LLM.
200    #[cfg(feature = "orga-adaptive")]
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub tool_profile: Option<crate::reasoning::tool_profile::ToolProfile>,
203    /// Per-step iteration limits for stuck loop detection.
204    #[cfg(feature = "orga-adaptive")]
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub step_iteration: Option<crate::reasoning::progress_tracker::StepIterationConfig>,
207    /// Pre-hydration configuration for deterministic context pre-fetch.
208    #[cfg(feature = "orga-adaptive")]
209    #[serde(default, skip_serializing_if = "Option::is_none")]
210    pub pre_hydration: Option<crate::reasoning::pre_hydrate::PreHydrationConfig>,
211}
212
/// Fallback sampling temperature for `LoopConfig::temperature`.
///
/// The value is intended to match the `InferenceOptions` default so callers
/// that never override it keep their existing behavior. Eval and test
/// workloads should set `0.0` explicitly.
fn default_loop_temperature() -> f32 {
    const DEFAULT_TEMPERATURE: f32 = 0.3;
    DEFAULT_TEMPERATURE
}
218
219impl Default for LoopConfig {
220    fn default() -> Self {
221        Self {
222            max_iterations: 25,
223            max_total_tokens: 100_000,
224            timeout: Duration::from_secs(300),
225            default_recovery: RecoveryStrategy::Retry {
226                max_attempts: 2,
227                base_delay: Duration::from_millis(500),
228            },
229            tool_timeout: Duration::from_secs(30),
230            max_concurrent_tools: 5,
231            context_token_budget: 32_000,
232            temperature: default_loop_temperature(),
233            tool_definitions: Vec::new(),
234            #[cfg(feature = "orga-adaptive")]
235            tool_profile: None,
236            #[cfg(feature = "orga-adaptive")]
237            step_iteration: None,
238            #[cfg(feature = "orga-adaptive")]
239            pre_hydration: None,
240        }
241    }
242}
243
244/// Strategy for recovering from tool execution failures.
245///
246/// By default, recovery is deterministic (retry, fallback, cache, dead letter).
247/// LLM-driven recovery is opt-in and rate-limited to prevent failure amplification.
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub enum RecoveryStrategy {
250    /// Retry with exponential backoff.
251    Retry {
252        max_attempts: u32,
253        base_delay: Duration,
254    },
255    /// Try alternative tools in order.
256    Fallback { alternatives: Vec<String> },
257    /// Return a cached result if available and not too stale.
258    CachedResult { max_staleness: Duration },
259    /// Ask the LLM to propose an alternative approach (opt-in, rate-limited).
260    LlmRecovery { max_recovery_attempts: u32 },
261    /// Escalate to a human or external system.
262    Escalate {
263        queue: String,
264        context_snapshot: bool,
265    },
266    /// Send to dead letter queue and continue without the result.
267    DeadLetter,
268}
269
270/// The final result of a completed reasoning loop.
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct LoopResult {
273    /// The final response content.
274    pub output: String,
275    /// Total iterations executed.
276    pub iterations: u32,
277    /// Total token usage.
278    pub total_usage: Usage,
279    /// How the loop terminated.
280    pub termination_reason: TerminationReason,
281    /// Wall-clock duration.
282    pub duration: Duration,
283    /// The full conversation history.
284    pub conversation: Conversation,
285}
286
287/// Why the loop terminated.
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub enum TerminationReason {
290    /// Agent produced a final response (normal completion).
291    Completed,
292    /// Hit the maximum iteration limit.
293    MaxIterations,
294    /// Hit the token budget limit.
295    MaxTokens,
296    /// Hit the timeout.
297    Timeout,
298    /// Policy denied a critical action with no recovery path.
299    PolicyDenial { reason: String },
300    /// An unrecoverable error occurred.
301    Error { message: String },
302}
303
304/// Events emitted during loop execution for observability.
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub enum LoopEvent {
307    /// Loop started.
308    Started {
309        agent_id: AgentId,
310        config: Box<LoopConfig>,
311    },
312    /// Reasoning step completed.
313    ReasoningComplete {
314        iteration: u32,
315        actions: Vec<ProposedAction>,
316        usage: Usage,
317    },
318    /// Policy evaluation completed.
319    PolicyEvaluated {
320        iteration: u32,
321        action_count: usize,
322        denied_count: usize,
323    },
324    /// Tool dispatch completed.
325    ToolsDispatched {
326        iteration: u32,
327        tool_count: usize,
328        duration: Duration,
329    },
330    /// Observations collected.
331    ObservationsCollected {
332        iteration: u32,
333        observation_count: usize,
334    },
335    /// Loop terminated.
336    Terminated {
337        reason: TerminationReason,
338        iterations: u32,
339        total_usage: Usage,
340        duration: Duration,
341    },
342    /// Error recovery triggered.
343    RecoveryTriggered {
344        iteration: u32,
345        tool_name: String,
346        strategy: RecoveryStrategy,
347        error: String,
348    },
349    /// A step hit its reattempt limit (emitted by coordinators).
350    #[cfg(feature = "orga-adaptive")]
351    StepLimitReached {
352        step_id: String,
353        attempts: u32,
354        reason: String,
355    },
356    /// Pre-hydration phase completed.
357    #[cfg(feature = "orga-adaptive")]
358    PreHydrationComplete {
359        references_found: usize,
360        references_resolved: usize,
361        references_failed: usize,
362        total_tokens: usize,
363    },
364}
365
366/// A journal entry for durable execution.
367///
368/// In Phase 2, entries are emitted via the `JournalWriter` trait.
369/// The default implementation is `BufferedJournal` which retains entries
370/// in a bounded in-memory ring buffer for observability.
371/// Phase 5 adds `DurableJournal` backed by SQLite/Postgres.
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct JournalEntry {
374    /// Monotonically increasing sequence number.
375    pub sequence: u64,
376    /// When this entry was created.
377    pub timestamp: chrono::DateTime<chrono::Utc>,
378    /// The agent this entry belongs to.
379    pub agent_id: AgentId,
380    /// The iteration this entry was created in.
381    pub iteration: u32,
382    /// The event recorded.
383    pub event: LoopEvent,
384}
385
386/// Trait for writing journal entries.
387///
388/// Default implementation is `BufferedJournal`. Phase 5 provides `DurableJournal`.
389#[async_trait::async_trait]
390pub trait JournalWriter: Send + Sync {
391    /// Append an entry to the journal.
392    async fn append(&self, entry: JournalEntry) -> Result<(), JournalError>;
393    /// Get the next sequence number.
394    async fn next_sequence(&self) -> u64;
395}
396
397/// In-memory journal that retains entries in a bounded ring buffer.
398///
399/// Provides observability without requiring durable storage. Events are
400/// queryable via `entries()` and consumable via `drain()`. When the buffer
401/// reaches capacity, the oldest entries are evicted.
402pub struct BufferedJournal {
403    sequence: std::sync::atomic::AtomicU64,
404    capacity: usize,
405    buffer: tokio::sync::Mutex<VecDeque<JournalEntry>>,
406}
407
408impl Default for BufferedJournal {
409    fn default() -> Self {
410        Self::new(1000)
411    }
412}
413
414impl BufferedJournal {
415    /// Create a new buffered journal with the given capacity.
416    pub fn new(capacity: usize) -> Self {
417        Self {
418            sequence: std::sync::atomic::AtomicU64::new(0),
419            capacity,
420            buffer: tokio::sync::Mutex::new(VecDeque::with_capacity(capacity)),
421        }
422    }
423
424    /// Return all currently buffered entries (oldest first).
425    pub async fn entries(&self) -> Vec<JournalEntry> {
426        self.buffer.lock().await.iter().cloned().collect()
427    }
428
429    /// Consume and return all buffered entries, emptying the buffer.
430    pub async fn drain(&self) -> Vec<JournalEntry> {
431        self.buffer.lock().await.drain(..).collect()
432    }
433}
434
435#[async_trait::async_trait]
436impl JournalWriter for BufferedJournal {
437    async fn append(&self, entry: JournalEntry) -> Result<(), JournalError> {
438        self.sequence
439            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440        let mut buf = self.buffer.lock().await;
441        if buf.len() >= self.capacity {
442            buf.pop_front();
443        }
444        buf.push_back(entry);
445        Ok(())
446    }
447
448    async fn next_sequence(&self) -> u64 {
449        self.sequence.load(std::sync::atomic::Ordering::Relaxed)
450    }
451}
452
453/// Errors from the journal system.
454#[derive(Debug, thiserror::Error)]
455pub enum JournalError {
456    #[error("Journal write failed: {0}")]
457    WriteFailed(String),
458    #[error("Journal read failed: {0}")]
459    ReadFailed(String),
460    #[error("Journal sequence error: expected {expected}, got {actual}")]
461    SequenceError { expected: u64, actual: u64 },
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Each constructor sets source, content, and the error flag, and leaves
    /// the call ID and metadata empty.
    #[test]
    fn test_observation_constructors() {
        let tool_result = Observation::tool_result("search", "found 5 results");
        assert_eq!(tool_result.source, "search");
        assert_eq!(tool_result.content, "found 5 results");
        assert!(!tool_result.is_error);
        assert!(tool_result.call_id.is_none());
        assert!(tool_result.metadata.is_empty());

        let tool_error = Observation::tool_error("search", "timeout");
        assert_eq!(tool_error.source, "search");
        assert_eq!(tool_error.content, "timeout");
        assert!(tool_error.is_error);

        let denial = Observation::policy_denial("not authorized");
        assert_eq!(denial.source, "policy_gate");
        assert_eq!(denial.content, "not authorized");
        assert!(denial.is_error);
    }

    /// `with_call_id` sets the call ID and leaves the other fields alone.
    #[test]
    fn test_observation_with_call_id() {
        let obs = Observation::tool_result("search", "ok").with_call_id("call-1");
        assert_eq!(obs.call_id.as_deref(), Some("call-1"));
        assert_eq!(obs.source, "search");
        assert_eq!(obs.content, "ok");
        assert!(!obs.is_error);
    }

    #[test]
    fn test_loop_state_new() {
        let state = LoopState::new(AgentId::new(), Conversation::with_system("test"));
        assert_eq!(state.iteration, 0);
        assert_eq!(state.total_usage.total_tokens, 0);
        assert!(state.pending_observations.is_empty());
        assert_eq!(state.current_phase, "initialized");
    }

    #[test]
    fn test_loop_state_add_usage() {
        let mut state = LoopState::new(AgentId::new(), Conversation::new());
        state.add_usage(&Usage {
            prompt_tokens: 100,
            completion_tokens: 50,
            total_tokens: 150,
        });
        state.add_usage(&Usage {
            prompt_tokens: 200,
            completion_tokens: 80,
            total_tokens: 280,
        });
        assert_eq!(state.total_usage.prompt_tokens, 300);
        assert_eq!(state.total_usage.completion_tokens, 130);
        assert_eq!(state.total_usage.total_tokens, 430);
    }

    #[test]
    fn test_loop_config_default() {
        let config = LoopConfig::default();
        assert_eq!(config.max_iterations, 25);
        assert_eq!(config.max_total_tokens, 100_000);
        assert_eq!(config.max_concurrent_tools, 5);
        assert_eq!(config.temperature, default_loop_temperature());
        assert!(config.tool_definitions.is_empty());
    }

    #[test]
    fn test_proposed_action_variants() {
        let tc = ProposedAction::ToolCall {
            call_id: "c1".into(),
            name: "search".into(),
            arguments: "{}".into(),
        };
        assert!(matches!(tc, ProposedAction::ToolCall { .. }));

        let respond = ProposedAction::Respond {
            content: "done".into(),
        };
        assert!(matches!(respond, ProposedAction::Respond { .. }));

        let terminate = ProposedAction::Terminate {
            reason: "finished".into(),
            output: "result".into(),
        };
        assert!(matches!(terminate, ProposedAction::Terminate { .. }));
    }

    /// A strategy survives a JSON round trip with its payload intact.
    #[test]
    fn test_recovery_strategy_serde() {
        let retry = RecoveryStrategy::Retry {
            max_attempts: 3,
            base_delay: Duration::from_millis(100),
        };
        let json = serde_json::to_string(&retry).unwrap();
        let restored: RecoveryStrategy = serde_json::from_str(&json).unwrap();
        // Check the decoded payload, not merely that decoding succeeded.
        assert!(matches!(
            restored,
            RecoveryStrategy::Retry {
                max_attempts: 3,
                base_delay,
            } if base_delay == Duration::from_millis(100)
        ));

        let llm = RecoveryStrategy::LlmRecovery {
            max_recovery_attempts: 1,
        };
        let json = serde_json::to_string(&llm).unwrap();
        assert!(json.contains("LlmRecovery"));
    }

    /// Build a `Started` journal entry with the given sequence and iteration.
    fn make_journal_entry(seq: u64, iteration: u32) -> JournalEntry {
        JournalEntry {
            sequence: seq,
            timestamp: chrono::Utc::now(),
            agent_id: AgentId::new(),
            iteration,
            event: LoopEvent::Started {
                agent_id: AgentId::new(),
                config: Box::new(LoopConfig::default()),
            },
        }
    }

    #[tokio::test]
    async fn test_buffered_journal_retains_entries() {
        let journal = BufferedJournal::new(100);
        assert_eq!(journal.next_sequence().await, 0);

        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();

        assert_eq!(journal.next_sequence().await, 2);

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }

    #[tokio::test]
    async fn test_buffered_journal_overflow_evicts_oldest() {
        let journal = BufferedJournal::new(3);

        for i in 0..5u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 3);
        // Oldest two (seq 0, 1) were evicted
        assert_eq!(entries[0].sequence, 2);
        assert_eq!(entries[1].sequence, 3);
        assert_eq!(entries[2].sequence, 4);
    }

    #[tokio::test]
    async fn test_buffered_journal_drain_empties_buffer() {
        let journal = BufferedJournal::new(100);

        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();

        let drained = journal.drain().await;
        assert_eq!(drained.len(), 2);
        // Drained entries come back oldest first.
        assert_eq!(drained[0].sequence, 0);
        assert_eq!(drained[1].sequence, 1);

        // Buffer is now empty
        let entries = journal.entries().await;
        assert!(entries.is_empty());

        // Sequence counter is not reset by drain
        assert_eq!(journal.next_sequence().await, 2);
    }

    #[tokio::test]
    async fn test_buffered_journal_entries_returns_all() {
        let journal = BufferedJournal::new(100);

        for i in 0..10u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 10);
        for (idx, entry) in entries.iter().enumerate() {
            assert_eq!(entry.sequence, idx as u64);
        }
    }

    #[test]
    fn test_loop_event_serde() {
        let event = LoopEvent::Terminated {
            reason: TerminationReason::Completed,
            iterations: 5,
            total_usage: Usage {
                prompt_tokens: 1000,
                completion_tokens: 500,
                total_tokens: 1500,
            },
            duration: Duration::from_secs(10),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("Terminated"));
    }
}