1use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::time::Duration;
9
10use crate::reasoning::conversation::Conversation;
11use crate::reasoning::inference::{ToolDefinition, Usage};
12use crate::types::AgentId;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct Observation {
20 pub source: String,
22 pub content: String,
24 pub is_error: bool,
26 #[serde(default, skip_serializing_if = "Option::is_none")]
30 pub call_id: Option<String>,
31 #[serde(default)]
33 pub metadata: HashMap<String, String>,
34}
35
36impl Observation {
37 pub fn tool_result(tool_name: impl Into<String>, content: impl Into<String>) -> Self {
39 Self {
40 source: tool_name.into(),
41 content: content.into(),
42 is_error: false,
43 call_id: None,
44 metadata: HashMap::new(),
45 }
46 }
47
48 pub fn tool_error(tool_name: impl Into<String>, error: impl Into<String>) -> Self {
50 Self {
51 source: tool_name.into(),
52 content: error.into(),
53 is_error: true,
54 call_id: None,
55 metadata: HashMap::new(),
56 }
57 }
58
59 pub fn policy_denial(reason: impl Into<String>) -> Self {
61 Self {
62 source: "policy_gate".into(),
63 content: reason.into(),
64 is_error: true,
65 call_id: None,
66 metadata: HashMap::new(),
67 }
68 }
69
70 pub fn with_call_id(mut self, call_id: impl Into<String>) -> Self {
72 self.call_id = Some(call_id.into());
73 self
74 }
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum ProposedAction {
80 ToolCall {
82 call_id: String,
84 name: String,
86 arguments: String,
88 },
89 Delegate {
91 target: String,
93 message: String,
95 },
96 Respond {
98 content: String,
100 },
101 Terminate {
103 reason: String,
105 output: String,
107 },
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LoopDecision {
113 Allow,
115 Deny { reason: String },
117 Modify {
119 modified_action: Box<ProposedAction>,
120 reason: String,
121 },
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct LoopState {
127 pub agent_id: AgentId,
129 pub iteration: u32,
131 pub total_usage: Usage,
133 pub conversation: Conversation,
135 pub pending_observations: Vec<Observation>,
137 pub started_at: chrono::DateTime<chrono::Utc>,
139 pub current_phase: String,
141 #[serde(default)]
143 pub metadata: HashMap<String, serde_json::Value>,
144}
145
146impl LoopState {
147 pub fn new(agent_id: AgentId, conversation: Conversation) -> Self {
149 Self {
150 agent_id,
151 iteration: 0,
152 total_usage: Usage::default(),
153 conversation,
154 pending_observations: Vec::new(),
155 started_at: chrono::Utc::now(),
156 current_phase: "initialized".into(),
157 metadata: HashMap::new(),
158 }
159 }
160
161 pub fn add_usage(&mut self, usage: &Usage) {
163 self.total_usage.prompt_tokens += usage.prompt_tokens;
164 self.total_usage.completion_tokens += usage.completion_tokens;
165 self.total_usage.total_tokens += usage.total_tokens;
166 }
167
168 pub fn elapsed(&self) -> chrono::Duration {
170 chrono::Utc::now() - self.started_at
171 }
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct LoopConfig {
177 pub max_iterations: u32,
179 pub max_total_tokens: u32,
181 pub timeout: Duration,
183 pub default_recovery: RecoveryStrategy,
185 pub tool_timeout: Duration,
187 pub max_concurrent_tools: usize,
189 pub context_token_budget: usize,
191 #[serde(default = "default_loop_temperature")]
195 pub temperature: f32,
196 #[serde(default, skip_serializing_if = "Vec::is_empty")]
198 pub tool_definitions: Vec<ToolDefinition>,
199 #[cfg(feature = "orga-adaptive")]
201 #[serde(default, skip_serializing_if = "Option::is_none")]
202 pub tool_profile: Option<crate::reasoning::tool_profile::ToolProfile>,
203 #[cfg(feature = "orga-adaptive")]
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub step_iteration: Option<crate::reasoning::progress_tracker::StepIterationConfig>,
207 #[cfg(feature = "orga-adaptive")]
209 #[serde(default, skip_serializing_if = "Option::is_none")]
210 pub pre_hydration: Option<crate::reasoning::pre_hydrate::PreHydrationConfig>,
211}
212
/// Default value for `LoopConfig::temperature`.
///
/// Referenced by the field's `#[serde(default = ...)]` attribute and by
/// `LoopConfig::default()`.
fn default_loop_temperature() -> f32 {
    const DEFAULT_LOOP_TEMPERATURE: f32 = 0.3;
    DEFAULT_LOOP_TEMPERATURE
}
218
219impl Default for LoopConfig {
220 fn default() -> Self {
221 Self {
222 max_iterations: 25,
223 max_total_tokens: 100_000,
224 timeout: Duration::from_secs(300),
225 default_recovery: RecoveryStrategy::Retry {
226 max_attempts: 2,
227 base_delay: Duration::from_millis(500),
228 },
229 tool_timeout: Duration::from_secs(30),
230 max_concurrent_tools: 5,
231 context_token_budget: 32_000,
232 temperature: default_loop_temperature(),
233 tool_definitions: Vec::new(),
234 #[cfg(feature = "orga-adaptive")]
235 tool_profile: None,
236 #[cfg(feature = "orga-adaptive")]
237 step_iteration: None,
238 #[cfg(feature = "orga-adaptive")]
239 pre_hydration: None,
240 }
241 }
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize)]
249pub enum RecoveryStrategy {
250 Retry {
252 max_attempts: u32,
253 base_delay: Duration,
254 },
255 Fallback { alternatives: Vec<String> },
257 CachedResult { max_staleness: Duration },
259 LlmRecovery { max_recovery_attempts: u32 },
261 Escalate {
263 queue: String,
264 context_snapshot: bool,
265 },
266 DeadLetter,
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct LoopResult {
273 pub output: String,
275 pub iterations: u32,
277 pub total_usage: Usage,
279 pub termination_reason: TerminationReason,
281 pub duration: Duration,
283 pub conversation: Conversation,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub enum TerminationReason {
290 Completed,
292 MaxIterations,
294 MaxTokens,
296 Timeout,
298 PolicyDenial { reason: String },
300 Error { message: String },
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
306pub enum LoopEvent {
307 Started {
309 agent_id: AgentId,
310 config: Box<LoopConfig>,
311 },
312 ReasoningComplete {
314 iteration: u32,
315 actions: Vec<ProposedAction>,
316 usage: Usage,
317 },
318 PolicyEvaluated {
320 iteration: u32,
321 action_count: usize,
322 denied_count: usize,
323 },
324 ToolsDispatched {
326 iteration: u32,
327 tool_count: usize,
328 duration: Duration,
329 },
330 ObservationsCollected {
332 iteration: u32,
333 observation_count: usize,
334 },
335 Terminated {
337 reason: TerminationReason,
338 iterations: u32,
339 total_usage: Usage,
340 duration: Duration,
341 },
342 RecoveryTriggered {
344 iteration: u32,
345 tool_name: String,
346 strategy: RecoveryStrategy,
347 error: String,
348 },
349 #[cfg(feature = "orga-adaptive")]
351 StepLimitReached {
352 step_id: String,
353 attempts: u32,
354 reason: String,
355 },
356 #[cfg(feature = "orga-adaptive")]
358 PreHydrationComplete {
359 references_found: usize,
360 references_resolved: usize,
361 references_failed: usize,
362 total_tokens: usize,
363 },
364}
365
366#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct JournalEntry {
374 pub sequence: u64,
376 pub timestamp: chrono::DateTime<chrono::Utc>,
378 pub agent_id: AgentId,
380 pub iteration: u32,
382 pub event: LoopEvent,
384}
385
386#[async_trait::async_trait]
390pub trait JournalWriter: Send + Sync {
391 async fn append(&self, entry: JournalEntry) -> Result<(), JournalError>;
393 async fn next_sequence(&self) -> u64;
395}
396
397pub struct BufferedJournal {
403 sequence: std::sync::atomic::AtomicU64,
404 capacity: usize,
405 buffer: tokio::sync::Mutex<VecDeque<JournalEntry>>,
406}
407
408impl Default for BufferedJournal {
409 fn default() -> Self {
410 Self::new(1000)
411 }
412}
413
414impl BufferedJournal {
415 pub fn new(capacity: usize) -> Self {
417 Self {
418 sequence: std::sync::atomic::AtomicU64::new(0),
419 capacity,
420 buffer: tokio::sync::Mutex::new(VecDeque::with_capacity(capacity)),
421 }
422 }
423
424 pub async fn entries(&self) -> Vec<JournalEntry> {
426 self.buffer.lock().await.iter().cloned().collect()
427 }
428
429 pub async fn drain(&self) -> Vec<JournalEntry> {
431 self.buffer.lock().await.drain(..).collect()
432 }
433}
434
435#[async_trait::async_trait]
436impl JournalWriter for BufferedJournal {
437 async fn append(&self, entry: JournalEntry) -> Result<(), JournalError> {
438 self.sequence
439 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440 let mut buf = self.buffer.lock().await;
441 if buf.len() >= self.capacity {
442 buf.pop_front();
443 }
444 buf.push_back(entry);
445 Ok(())
446 }
447
448 async fn next_sequence(&self) -> u64 {
449 self.sequence.load(std::sync::atomic::Ordering::Relaxed)
450 }
451}
452
453#[derive(Debug, thiserror::Error)]
455pub enum JournalError {
456 #[error("Journal write failed: {0}")]
457 WriteFailed(String),
458 #[error("Journal read failed: {0}")]
459 ReadFailed(String),
460 #[error("Journal sequence error: expected {expected}, got {actual}")]
461 SequenceError { expected: u64, actual: u64 },
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Each constructor sets `source` and `is_error` as documented.
    #[test]
    fn test_observation_constructors() {
        let tool_result = Observation::tool_result("search", "found 5 results");
        assert_eq!(tool_result.source, "search");
        assert!(!tool_result.is_error);

        let tool_error = Observation::tool_error("search", "timeout");
        assert!(tool_error.is_error);

        let denial = Observation::policy_denial("not authorized");
        assert_eq!(denial.source, "policy_gate");
        assert!(denial.is_error);
    }

    /// A fresh state starts at iteration zero with no usage or observations.
    #[test]
    fn test_loop_state_new() {
        let state = LoopState::new(AgentId::new(), Conversation::with_system("test"));
        assert_eq!(state.iteration, 0);
        assert_eq!(state.total_usage.total_tokens, 0);
        assert!(state.pending_observations.is_empty());
    }

    /// Usage accumulates field by field across calls.
    #[test]
    fn test_loop_state_add_usage() {
        let mut state = LoopState::new(AgentId::new(), Conversation::new());
        state.add_usage(&Usage {
            prompt_tokens: 100,
            completion_tokens: 50,
            total_tokens: 150,
        });
        state.add_usage(&Usage {
            prompt_tokens: 200,
            completion_tokens: 80,
            total_tokens: 280,
        });
        assert_eq!(state.total_usage.prompt_tokens, 300);
        assert_eq!(state.total_usage.completion_tokens, 130);
        assert_eq!(state.total_usage.total_tokens, 430);
    }

    /// Spot-checks the stock configuration values.
    #[test]
    fn test_loop_config_default() {
        let config = LoopConfig::default();
        assert_eq!(config.max_iterations, 25);
        assert_eq!(config.max_total_tokens, 100_000);
        assert_eq!(config.max_concurrent_tools, 5);
    }

    /// The action variants can be constructed and matched.
    #[test]
    fn test_proposed_action_variants() {
        let tc = ProposedAction::ToolCall {
            call_id: "c1".into(),
            name: "search".into(),
            arguments: "{}".into(),
        };
        assert!(matches!(tc, ProposedAction::ToolCall { .. }));

        let respond = ProposedAction::Respond {
            content: "done".into(),
        };
        assert!(matches!(respond, ProposedAction::Respond { .. }));

        let terminate = ProposedAction::Terminate {
            reason: "finished".into(),
            output: "result".into(),
        };
        assert!(matches!(terminate, ProposedAction::Terminate { .. }));
    }

    /// Recovery strategies survive a JSON round trip with their fields intact.
    #[test]
    fn test_recovery_strategy_serde() {
        let retry = RecoveryStrategy::Retry {
            max_attempts: 3,
            base_delay: Duration::from_millis(100),
        };
        let json = serde_json::to_string(&retry).unwrap();
        let restored: RecoveryStrategy = serde_json::from_str(&json).unwrap();
        // Check the restored value itself, not merely that parsing succeeded.
        assert!(matches!(
            restored,
            RecoveryStrategy::Retry {
                max_attempts: 3,
                base_delay,
            } if base_delay == Duration::from_millis(100)
        ));

        let llm = RecoveryStrategy::LlmRecovery {
            max_recovery_attempts: 1,
        };
        let json = serde_json::to_string(&llm).unwrap();
        assert!(json.contains("LlmRecovery"));
    }

    /// Builds a `Started` entry with the given sequence and iteration numbers.
    fn make_journal_entry(seq: u64, iteration: u32) -> JournalEntry {
        JournalEntry {
            sequence: seq,
            timestamp: chrono::Utc::now(),
            agent_id: AgentId::new(),
            iteration,
            event: LoopEvent::Started {
                agent_id: AgentId::new(),
                config: Box::new(LoopConfig::default()),
            },
        }
    }

    /// Appended entries are kept in order and the counter tracks appends.
    #[tokio::test]
    async fn test_buffered_journal_retains_entries() {
        let journal = BufferedJournal::new(100);
        assert_eq!(journal.next_sequence().await, 0);

        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();

        assert_eq!(journal.next_sequence().await, 2);

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }

    /// Overflowing the capacity drops the oldest entries first.
    #[tokio::test]
    async fn test_buffered_journal_overflow_evicts_oldest() {
        let journal = BufferedJournal::new(3);

        for i in 0..5u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 2);
        assert_eq!(entries[1].sequence, 3);
        assert_eq!(entries[2].sequence, 4);
    }

    /// Draining empties the buffer but leaves the sequence counter alone.
    #[tokio::test]
    async fn test_buffered_journal_drain_empties_buffer() {
        let journal = BufferedJournal::new(100);

        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();

        let drained = journal.drain().await;
        assert_eq!(drained.len(), 2);

        let entries = journal.entries().await;
        assert!(entries.is_empty());

        assert_eq!(journal.next_sequence().await, 2);
    }

    /// `entries` returns every buffered entry in insertion order.
    #[tokio::test]
    async fn test_buffered_journal_entries_returns_all() {
        let journal = BufferedJournal::new(100);

        for i in 0..10u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 10);
        for (idx, entry) in entries.iter().enumerate() {
            assert_eq!(entry.sequence, idx as u64);
        }
    }

    /// A `Terminated` event serializes with its variant name present.
    #[test]
    fn test_loop_event_serde() {
        let event = LoopEvent::Terminated {
            reason: TerminationReason::Completed,
            iterations: 5,
            total_usage: Usage {
                prompt_tokens: 1000,
                completion_tokens: 500,
                total_tokens: 1500,
            },
            duration: Duration::from_secs(10),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("Terminated"));
    }
}