use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use crate::reasoning::conversation::Conversation;
use crate::reasoning::inference::{ToolDefinition, Usage};
use crate::types::AgentId;
/// A piece of feedback recorded for the loop (see `LoopState::pending_observations`).
///
/// Instances are normally built with the `tool_result`, `tool_error` and
/// `policy_denial` constructors and optionally tagged via `with_call_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Observation {
/// Origin of the observation: the tool name for tool results and tool errors,
/// or the literal `"policy_gate"` for policy denials.
pub source: String,
/// Payload text: the tool output, the error text, or the denial reason.
pub content: String,
/// `true` for tool errors and policy denials, `false` for successful tool results.
pub is_error: bool,
/// Optional call identifier, attached with `with_call_id`. Falls back to `None`
/// when absent from the input and is left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub call_id: Option<String>,
/// String key/value annotations; falls back to an empty map when absent from the input.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
impl Observation {
    /// Shared constructor body: every public constructor starts without a call id
    /// and with empty metadata, differing only in source, content and error flag.
    fn from_parts(source: String, content: String, is_error: bool) -> Self {
        Self {
            source,
            content,
            is_error,
            call_id: None,
            metadata: HashMap::new(),
        }
    }

    /// Builds a non-error observation whose source is `tool_name`.
    pub fn tool_result(tool_name: impl Into<String>, content: impl Into<String>) -> Self {
        Self::from_parts(tool_name.into(), content.into(), false)
    }

    /// Builds an error observation whose source is `tool_name` and whose content is `error`.
    pub fn tool_error(tool_name: impl Into<String>, error: impl Into<String>) -> Self {
        Self::from_parts(tool_name.into(), error.into(), true)
    }

    /// Builds an error observation sourced from `"policy_gate"` carrying the denial `reason`.
    pub fn policy_denial(reason: impl Into<String>) -> Self {
        Self::from_parts(String::from("policy_gate"), reason.into(), true)
    }

    /// Consumes the observation and returns it with `call_id` set, replacing any previous id.
    pub fn with_call_id(self, call_id: impl Into<String>) -> Self {
        Self {
            call_id: Some(call_id.into()),
            ..self
        }
    }
}
/// An action put forward during a loop iteration.
///
/// `LoopEvent::ReasoningComplete` carries a list of these, and
/// `LoopDecision::Modify` can substitute one for another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProposedAction {
/// Invoke the tool called `name`.
/// NOTE(review): `call_id` presumably matches `Observation::call_id` on the
/// resulting observation, and `arguments` looks like serialized JSON (the
/// tests pass `"{}"`) — confirm both against the executor.
ToolCall {
call_id: String,
name: String,
arguments: String,
},
/// Hand `message` over to `target`.
/// NOTE(review): `target` is presumably another agent's identifier — confirm.
Delegate {
target: String,
message: String,
},
/// Reply with `content`.
Respond {
content: String,
},
/// End the run, giving a `reason` and the final `output`.
Terminate {
reason: String,
output: String,
},
}
/// Verdict on a proposed action.
/// NOTE(review): presumably produced by the policy gate for each
/// `ProposedAction`; the producer is not visible in this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoopDecision {
/// The action may proceed unchanged.
Allow,
/// The action is rejected; `reason` explains why.
Deny { reason: String },
/// The action is replaced by `modified_action` (boxed); `reason` explains the change.
Modify {
modified_action: Box<ProposedAction>,
reason: String,
},
}
/// Serializable snapshot of a loop run in progress.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopState {
/// Agent this run belongs to.
pub agent_id: AgentId,
/// Iteration counter; `LoopState::new` starts it at 0.
pub iteration: u32,
/// Token usage accumulated through `add_usage`.
pub total_usage: Usage,
/// Conversation the run operates on.
pub conversation: Conversation,
/// Observations waiting to be consumed; empty on creation.
pub pending_observations: Vec<Observation>,
/// UTC creation time, set by `LoopState::new`; `elapsed` measures from it.
pub started_at: chrono::DateTime<chrono::Utc>,
/// Free-form phase label; `"initialized"` on creation.
pub current_phase: String,
/// Arbitrary JSON annotations; falls back to an empty map when absent from the input.
#[serde(default)]
pub metadata: HashMap<String, serde_json::Value>,
}
impl LoopState {
    /// Creates the state for a fresh run: iteration 0, zeroed usage, no pending
    /// observations, phase `"initialized"`, and `started_at` set to the current UTC time.
    pub fn new(agent_id: AgentId, conversation: Conversation) -> Self {
        Self {
            agent_id,
            iteration: 0,
            total_usage: Usage::default(),
            conversation,
            pending_observations: Vec::new(),
            started_at: chrono::Utc::now(),
            current_phase: "initialized".into(),
            metadata: HashMap::new(),
        }
    }

    /// Adds `usage` to the running totals.
    ///
    /// Each counter saturates at its maximum instead of overflowing, so a very
    /// large total can neither panic (debug builds) nor wrap back towards zero
    /// (release builds) and make the run look cheaper than it was.
    pub fn add_usage(&mut self, usage: &Usage) {
        let running = &mut self.total_usage;
        running.prompt_tokens = running.prompt_tokens.saturating_add(usage.prompt_tokens);
        running.completion_tokens = running
            .completion_tokens
            .saturating_add(usage.completion_tokens);
        running.total_tokens = running.total_tokens.saturating_add(usage.total_tokens);
    }

    /// Returns the time between `started_at` and now, per the UTC wall clock.
    pub fn elapsed(&self) -> chrono::Duration {
        chrono::Utc::now() - self.started_at
    }
}
/// Limits and options for a loop run.
///
/// The defaults quoted on each field are the values produced by the `Default` impl.
/// NOTE(review): the limits are only declared here; where and how they are
/// enforced is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopConfig {
/// Iteration limit (default 25).
pub max_iterations: u32,
/// Token limit (default 100_000).
pub max_total_tokens: u32,
/// Time limit (default 300 s). NOTE(review): presumably for the whole run — confirm.
pub timeout: Duration,
/// Recovery strategy used by default (default: `Retry` with 2 attempts and a 500 ms base delay).
pub default_recovery: RecoveryStrategy,
/// Time limit for tools (default 30 s). NOTE(review): presumably per tool call — confirm.
pub tool_timeout: Duration,
/// Limit on concurrently running tools (default 5).
pub max_concurrent_tools: usize,
/// Token budget for context (default 32_000).
pub context_token_budget: usize,
/// Tool definitions; falls back to empty when absent from the input and is
/// left out of the serialized form when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tool_definitions: Vec<ToolDefinition>,
/// Optional tool profile; only compiled with the `orga-adaptive` feature.
#[cfg(feature = "orga-adaptive")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_profile: Option<crate::reasoning::tool_profile::ToolProfile>,
/// Optional step-iteration settings; only compiled with the `orga-adaptive` feature.
#[cfg(feature = "orga-adaptive")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub step_iteration: Option<crate::reasoning::progress_tracker::StepIterationConfig>,
/// Optional pre-hydration settings; only compiled with the `orga-adaptive` feature.
#[cfg(feature = "orga-adaptive")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pre_hydration: Option<crate::reasoning::pre_hydrate::PreHydrationConfig>,
}
impl Default for LoopConfig {
fn default() -> Self {
Self {
max_iterations: 25,
max_total_tokens: 100_000,
timeout: Duration::from_secs(300),
default_recovery: RecoveryStrategy::Retry {
max_attempts: 2,
base_delay: Duration::from_millis(500),
},
tool_timeout: Duration::from_secs(30),
max_concurrent_tools: 5,
context_token_budget: 32_000,
tool_definitions: Vec::new(),
#[cfg(feature = "orga-adaptive")]
tool_profile: None,
#[cfg(feature = "orga-adaptive")]
step_iteration: None,
#[cfg(feature = "orga-adaptive")]
pre_hydration: None,
}
}
}
/// Ways of reacting to a failure; `LoopConfig::default_recovery` holds one and
/// `LoopEvent::RecoveryTriggered` records which one was applied to a tool.
/// NOTE(review): the variants only carry parameters; the code that executes
/// them is not visible in this file, so the per-variant notes below describe
/// the data, not verified behaviour.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryStrategy {
/// Retry up to `max_attempts` times starting from `base_delay`.
/// NOTE(review): "base" suggests the delay grows between attempts — confirm.
Retry {
max_attempts: u32,
base_delay: Duration,
},
/// Use one of the named `alternatives` instead.
Fallback { alternatives: Vec<String> },
/// Use a cached result no older than `max_staleness`.
CachedResult { max_staleness: Duration },
/// Let the LLM attempt recovery, at most `max_recovery_attempts` times.
LlmRecovery { max_recovery_attempts: u32 },
/// Escalate to the named `queue`; `context_snapshot` presumably controls
/// whether a snapshot of the context is attached — confirm.
Escalate {
queue: String,
context_snapshot: bool,
},
/// Route the failure to a dead-letter destination.
DeadLetter,
}
/// Final outcome of a loop run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopResult {
/// Output text of the run.
pub output: String,
/// Number of iterations reported for the run.
pub iterations: u32,
/// Token usage reported for the run.
pub total_usage: Usage,
/// Why the run ended.
pub termination_reason: TerminationReason,
/// Duration reported for the run.
pub duration: Duration,
/// Conversation as it stood when the run ended.
pub conversation: Conversation,
}
/// Why a loop run ended; reported in `LoopResult` and `LoopEvent::Terminated`.
/// NOTE(review): the limit-related variants presumably correspond to
/// `LoopConfig::max_iterations`, `max_total_tokens` and `timeout` — confirm
/// against the loop driver.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TerminationReason {
/// The run finished normally.
Completed,
/// The iteration limit was hit.
MaxIterations,
/// The token limit was hit.
MaxTokens,
/// The time limit was hit.
Timeout,
/// A policy decision ended the run; `reason` explains it.
PolicyDenial { reason: String },
/// An error ended the run; `message` describes it.
Error { message: String },
}
/// Events describing the progress of a loop run; `JournalEntry` wraps one per record.
/// NOTE(review): the emitting code is not visible in this file, so the
/// per-variant notes describe the payload rather than exactly when it is emitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LoopEvent {
/// A run started for `agent_id` with `config` (boxed, presumably to keep the enum small).
Started {
agent_id: AgentId,
config: Box<LoopConfig>,
},
/// Reasoning for `iteration` produced `actions` at a cost of `usage`.
ReasoningComplete {
iteration: u32,
actions: Vec<ProposedAction>,
usage: Usage,
},
/// Policy evaluation for `iteration` covered `action_count` actions, `denied_count` of them denied.
PolicyEvaluated {
iteration: u32,
action_count: usize,
denied_count: usize,
},
/// `tool_count` tools were dispatched in `iteration`; `duration` is the time recorded for the dispatch.
ToolsDispatched {
iteration: u32,
tool_count: usize,
duration: Duration,
},
/// `observation_count` observations were collected in `iteration`.
ObservationsCollected {
iteration: u32,
observation_count: usize,
},
/// The run ended for `reason` after `iterations` iterations, with the given usage and duration.
Terminated {
reason: TerminationReason,
iterations: u32,
total_usage: Usage,
duration: Duration,
},
/// Recovery `strategy` was triggered for `tool_name` in `iteration` because of `error`.
RecoveryTriggered {
iteration: u32,
tool_name: String,
strategy: RecoveryStrategy,
error: String,
},
/// Step `step_id` hit its limit after `attempts` attempts; only compiled with `orga-adaptive`.
#[cfg(feature = "orga-adaptive")]
StepLimitReached {
step_id: String,
attempts: u32,
reason: String,
},
/// Pre-hydration finished with the given reference counts and token total;
/// only compiled with `orga-adaptive`.
#[cfg(feature = "orga-adaptive")]
PreHydrationComplete {
references_found: usize,
references_resolved: usize,
references_failed: usize,
total_tokens: usize,
},
}
/// One journal record: a `LoopEvent` plus the metadata needed to order and attribute it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
/// Sequence number supplied by whoever builds the entry.
/// NOTE(review): presumably obtained from `JournalWriter::next_sequence`;
/// `BufferedJournal::append` stores it without checking — confirm.
pub sequence: u64,
/// UTC time attached to the entry.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Agent the event belongs to.
pub agent_id: AgentId,
/// Loop iteration the event belongs to.
pub iteration: u32,
/// The recorded event.
pub event: LoopEvent,
}
/// Asynchronous sink for `JournalEntry` records; implementations must be `Send + Sync`.
#[async_trait::async_trait]
pub trait JournalWriter: Send + Sync {
/// Appends one entry, reporting failure as a `JournalError`.
async fn append(&self, entry: JournalEntry) -> Result<(), JournalError>;
/// Returns the writer's current sequence number.
/// NOTE(review): `BufferedJournal` returns the number of `append` calls made
/// so far; whether callers must stamp this value onto `JournalEntry::sequence`
/// is not visible here — confirm.
async fn next_sequence(&self) -> u64;
}
/// In-memory `JournalWriter` that evicts its oldest entry once `capacity` entries are held.
pub struct BufferedJournal {
/// Number of `append` calls so far; reported by `next_sequence`.
sequence: std::sync::atomic::AtomicU64,
/// Entry count at which `append` starts evicting from the front.
capacity: usize,
/// Retained entries, oldest at the front, behind an async mutex.
buffer: tokio::sync::Mutex<VecDeque<JournalEntry>>,
}
impl Default for BufferedJournal {
fn default() -> Self {
Self::new(1000)
}
}
impl BufferedJournal {
    /// Creates an empty journal with `capacity` as its eviction threshold.
    ///
    /// The backing deque is pre-allocated for `capacity` entries and the
    /// sequence counter starts at 0.
    pub fn new(capacity: usize) -> Self {
        use std::sync::atomic::AtomicU64;
        use tokio::sync::Mutex;

        let ring = VecDeque::with_capacity(capacity);
        Self {
            sequence: AtomicU64::new(0),
            capacity,
            buffer: Mutex::new(ring),
        }
    }

    /// Returns clones of the retained entries, oldest first, leaving the buffer untouched.
    pub async fn entries(&self) -> Vec<JournalEntry> {
        let guard = self.buffer.lock().await;
        guard.iter().cloned().collect()
    }

    /// Removes and returns every retained entry, oldest first.
    ///
    /// The sequence counter is not touched.
    pub async fn drain(&self) -> Vec<JournalEntry> {
        let mut guard = self.buffer.lock().await;
        guard.drain(..).collect()
    }
}
#[async_trait::async_trait]
impl JournalWriter for BufferedJournal {
    /// Records `entry`, evicting the oldest retained entry when the buffer is full.
    ///
    /// The sequence counter is bumped for every call, including when the entry
    /// itself is not retained. This implementation never returns an error.
    async fn append(&self, entry: JournalEntry) -> Result<(), JournalError> {
        use std::sync::atomic::Ordering;

        // Relaxed suffices: the counter is a standalone tally and is not used
        // to publish other memory.
        self.sequence.fetch_add(1, Ordering::Relaxed);

        // A zero-capacity journal retains nothing. Without this guard the
        // evict-then-push sequence below would always keep one entry.
        if self.capacity == 0 {
            return Ok(());
        }

        let mut buf = self.buffer.lock().await;
        if buf.len() >= self.capacity {
            buf.pop_front();
        }
        buf.push_back(entry);
        Ok(())
    }

    /// Returns the number of `append` calls made so far.
    async fn next_sequence(&self) -> u64 {
        self.sequence.load(std::sync::atomic::Ordering::Relaxed)
    }
}
/// Errors a `JournalWriter` can report.
/// NOTE(review): none of these variants is constructed in the code visible
/// here (`BufferedJournal::append` always succeeds); they are presumably used
/// by other writer implementations — confirm.
#[derive(Debug, thiserror::Error)]
pub enum JournalError {
/// Writing an entry failed; the payload is the detail text.
#[error("Journal write failed: {0}")]
WriteFailed(String),
/// Reading entries failed; the payload is the detail text.
#[error("Journal read failed: {0}")]
ReadFailed(String),
/// A sequence number did not match: `expected` was required but `actual` was seen.
#[error("Journal sequence error: expected {expected}, got {actual}")]
SequenceError { expected: u64, actual: u64 },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each constructor sets source, content and error flag, and starts without
    /// a call id or metadata.
    #[test]
    fn test_observation_constructors() {
        let tool_result = Observation::tool_result("search", "found 5 results");
        assert_eq!(tool_result.source, "search");
        assert_eq!(tool_result.content, "found 5 results");
        assert!(!tool_result.is_error);
        assert!(tool_result.call_id.is_none());
        assert!(tool_result.metadata.is_empty());

        let tool_error = Observation::tool_error("search", "timeout");
        assert_eq!(tool_error.source, "search");
        assert_eq!(tool_error.content, "timeout");
        assert!(tool_error.is_error);

        let denial = Observation::policy_denial("not authorized");
        assert_eq!(denial.source, "policy_gate");
        assert_eq!(denial.content, "not authorized");
        assert!(denial.is_error);
    }

    /// `with_call_id` sets the id and leaves every other field alone.
    #[test]
    fn test_observation_with_call_id() {
        let obs = Observation::tool_result("search", "ok").with_call_id("call-1");
        assert_eq!(obs.call_id.as_deref(), Some("call-1"));
        assert_eq!(obs.source, "search");
        assert_eq!(obs.content, "ok");
        assert!(!obs.is_error);
    }

    /// A missing call id is omitted on output, and both optional fields fall
    /// back to their defaults on input.
    #[test]
    fn test_observation_serde_optional_fields() {
        let json = serde_json::to_string(&Observation::tool_result("search", "ok")).unwrap();
        assert!(!json.contains("call_id"));

        let restored: Observation =
            serde_json::from_str(r#"{"source":"search","content":"ok","is_error":false}"#)
                .unwrap();
        assert_eq!(restored.source, "search");
        assert!(restored.call_id.is_none());
        assert!(restored.metadata.is_empty());
    }

    #[test]
    fn test_loop_state_new() {
        let state = LoopState::new(AgentId::new(), Conversation::with_system("test"));
        assert_eq!(state.iteration, 0);
        assert_eq!(state.total_usage.total_tokens, 0);
        assert!(state.pending_observations.is_empty());
        assert_eq!(state.current_phase, "initialized");
        assert!(state.metadata.is_empty());
    }

    #[test]
    fn test_loop_state_add_usage() {
        let mut state = LoopState::new(AgentId::new(), Conversation::new());
        state.add_usage(&Usage {
            prompt_tokens: 100,
            completion_tokens: 50,
            total_tokens: 150,
        });
        state.add_usage(&Usage {
            prompt_tokens: 200,
            completion_tokens: 80,
            total_tokens: 280,
        });
        assert_eq!(state.total_usage.prompt_tokens, 300);
        assert_eq!(state.total_usage.completion_tokens, 130);
        assert_eq!(state.total_usage.total_tokens, 430);
    }

    #[test]
    fn test_loop_config_default() {
        let config = LoopConfig::default();
        assert_eq!(config.max_iterations, 25);
        assert_eq!(config.max_total_tokens, 100_000);
        assert_eq!(config.max_concurrent_tools, 5);
        assert_eq!(config.timeout, Duration::from_secs(300));
        assert_eq!(config.tool_timeout, Duration::from_secs(30));
        assert_eq!(config.context_token_budget, 32_000);
        assert!(config.tool_definitions.is_empty());
        assert!(matches!(
            config.default_recovery,
            RecoveryStrategy::Retry { max_attempts: 2, base_delay }
                if base_delay == Duration::from_millis(500)
        ));
    }

    #[test]
    fn test_proposed_action_variants() {
        let tc = ProposedAction::ToolCall {
            call_id: "c1".into(),
            name: "search".into(),
            arguments: "{}".into(),
        };
        assert!(matches!(tc, ProposedAction::ToolCall { .. }));

        let respond = ProposedAction::Respond {
            content: "done".into(),
        };
        assert!(matches!(respond, ProposedAction::Respond { .. }));

        let terminate = ProposedAction::Terminate {
            reason: "finished".into(),
            output: "result".into(),
        };
        assert!(matches!(terminate, ProposedAction::Terminate { .. }));
    }

    /// Strategies survive a JSON round trip with their parameters intact.
    #[test]
    fn test_recovery_strategy_serde() {
        let retry = RecoveryStrategy::Retry {
            max_attempts: 3,
            base_delay: Duration::from_millis(100),
        };
        let json = serde_json::to_string(&retry).unwrap();
        let restored: RecoveryStrategy = serde_json::from_str(&json).unwrap();
        // Check the deserialized value rather than only that parsing succeeded.
        assert!(matches!(
            restored,
            RecoveryStrategy::Retry { max_attempts: 3, base_delay }
                if base_delay == Duration::from_millis(100)
        ));

        let llm = RecoveryStrategy::LlmRecovery {
            max_recovery_attempts: 1,
        };
        let json = serde_json::to_string(&llm).unwrap();
        assert!(json.contains("LlmRecovery"));
        let restored: RecoveryStrategy = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            restored,
            RecoveryStrategy::LlmRecovery {
                max_recovery_attempts: 1
            }
        ));
    }

    /// Builds a `Started` entry with the given sequence number and iteration.
    fn make_journal_entry(seq: u64, iteration: u32) -> JournalEntry {
        JournalEntry {
            sequence: seq,
            timestamp: chrono::Utc::now(),
            agent_id: AgentId::new(),
            iteration,
            event: LoopEvent::Started {
                agent_id: AgentId::new(),
                config: Box::new(LoopConfig::default()),
            },
        }
    }

    #[tokio::test]
    async fn test_buffered_journal_retains_entries() {
        let journal = BufferedJournal::new(100);
        assert_eq!(journal.next_sequence().await, 0);
        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();
        assert_eq!(journal.next_sequence().await, 2);

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }

    #[tokio::test]
    async fn test_buffered_journal_overflow_evicts_oldest() {
        let journal = BufferedJournal::new(3);
        for i in 0..5u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 2);
        assert_eq!(entries[1].sequence, 3);
        assert_eq!(entries[2].sequence, 4);
        // Evicted entries still count towards the sequence.
        assert_eq!(journal.next_sequence().await, 5);
    }

    #[tokio::test]
    async fn test_buffered_journal_drain_empties_buffer() {
        let journal = BufferedJournal::new(100);
        journal.append(make_journal_entry(0, 0)).await.unwrap();
        journal.append(make_journal_entry(1, 1)).await.unwrap();

        let drained = journal.drain().await;
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].sequence, 0);
        assert_eq!(drained[1].sequence, 1);

        let entries = journal.entries().await;
        assert!(entries.is_empty());
        // Draining does not reset the sequence counter.
        assert_eq!(journal.next_sequence().await, 2);
    }

    #[tokio::test]
    async fn test_buffered_journal_entries_returns_all() {
        let journal = BufferedJournal::new(100);
        for i in 0..10u64 {
            journal
                .append(make_journal_entry(i, i as u32))
                .await
                .unwrap();
        }

        let entries = journal.entries().await;
        assert_eq!(entries.len(), 10);
        for (idx, entry) in entries.iter().enumerate() {
            assert_eq!(entry.sequence, idx as u64);
        }
    }

    #[test]
    fn test_loop_event_serde() {
        let event = LoopEvent::Terminated {
            reason: TerminationReason::Completed,
            iterations: 5,
            total_usage: Usage {
                prompt_tokens: 1000,
                completion_tokens: 500,
                total_tokens: 1500,
            },
            duration: Duration::from_secs(10),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("Terminated"));
    }
}