use super::config::*;
use super::helpers::apply_input_filters;
use super::run::run_loop;
use crate::types::*;
use tokio::sync::mpsc;
/// Starts an agent loop seeded with `prompts` and returns the messages collected for it.
///
/// Steps, in order:
/// 1. Fills in any missing agent/session/loop id on `context` via `ensure_loop_ids`.
/// 2. If `config.before_loop` is set, calls it with the current context messages and `0`;
///    a `false` answer emits an `AgentEnd` with no messages and no rejection, and an empty
///    vector is returned without emitting `AgentStart`.
/// 3. Emits `AgentStart` (the continuation kind falls back to `ContinuationKind::Initial`).
/// 4. Passes `prompts` through `apply_input_filters`; an `Err(reason)` emits an `AgentEnd`
///    carrying that reason as the rejection and returns an empty vector.
/// 5. Appends the filtered prompts to both `context.messages` and `context.user_context`,
///    then awaits `run_loop`.
/// 6. Emits the final `AgentEnd`, then awaits `config.after_loop` if it is set.
///
/// The returned vector starts out as a copy of the filtered prompts and is handed to
/// `run_loop` by mutable reference. Failures to send on `tx` are deliberately ignored.
pub async fn agent_loop(
    prompts: Vec<AgentMessage>,
    context: &mut AgentContext,
    config: &AgentLoopConfig,
    tx: mpsc::UnboundedSender<AgentEvent>,
    cancel: tokio_util::sync::CancellationToken,
) -> Vec<AgentMessage> {
    let (agent_id, session_id, loop_id) = ensure_loop_ids(context);

    // A missing hook counts as approval.
    let approved = match &config.before_loop {
        Some(gate) => gate(&context.messages, 0).await,
        None => true,
    };
    if !approved {
        let _ = tx.send(AgentEvent::AgentEnd {
            loop_id,
            messages: Vec::new(),
            usage: Usage::default(),
            timestamp: chrono::Utc::now(),
            rejection: None,
        });
        return Vec::new();
    }

    let start_event = AgentEvent::AgentStart {
        agent_id,
        session_id,
        loop_id: loop_id.clone(),
        parent_loop_id: context.parent_loop_id.clone(),
        continuation_kind: context
            .continuation_kind
            .clone()
            .unwrap_or(ContinuationKind::Initial),
        timestamp: chrono::Utc::now(),
        metadata: None,
        config_snapshot: Some(build_config_snapshot(config, context)),
    };
    let _ = tx.send(start_event);

    let filter_outcome = apply_input_filters(prompts, &config.input_filters, &tx, &loop_id).await;
    let prompts = match filter_outcome {
        Ok(accepted) => accepted,
        Err(reason) => {
            let _ = tx.send(AgentEvent::AgentEnd {
                loop_id,
                messages: Vec::new(),
                usage: Usage::default(),
                timestamp: chrono::Utc::now(),
                rejection: Some(reason),
            });
            return Vec::new();
        }
    };

    // The accepted prompts are recorded in the full transcript and in the user context,
    // and also form the initial content of the returned message list.
    let mut new_messages: Vec<AgentMessage> = prompts.clone();
    for prompt in prompts.iter() {
        context.messages.push(prompt.clone());
        context.user_context.push(prompt.clone());
    }

    let loop_usage = run_loop(
        context,
        &mut new_messages,
        config,
        &tx,
        &cancel,
        Some(&prompts),
    )
    .await;

    let _ = tx.send(AgentEvent::AgentEnd {
        loop_id,
        messages: new_messages.clone(),
        usage: loop_usage.clone(),
        timestamp: chrono::Utc::now(),
        rejection: None,
    });

    // The after-loop hook runs only once the end event has been handed to the channel.
    if let Some(after_loop) = &config.after_loop {
        after_loop(&new_messages, &loop_usage).await;
    }

    new_messages
}
/// Resumes an agent loop from the messages already held in `context`, without new prompts.
///
/// When both `context.user_context` and `context.inrun_context` are empty they are rebuilt
/// from `context.messages`: user messages go to `user_context`, assistant and tool-result
/// messages go to `inrun_context` as `InRunEntry::Live`, and anything else is skipped.
///
/// The agent and session ids are taken from `context`; a loop id is generated if absent.
/// If `config.before_loop` is set and returns `false`, an `AgentEnd` with no messages is
/// emitted and an empty vector is returned without emitting `AgentStart`. Otherwise
/// `AgentStart` is emitted, `run_loop` is awaited with no prompts, `AgentEnd` is emitted and
/// `config.after_loop` (if set) is awaited. Failures to send on `tx` are ignored.
///
/// Returns the message vector that was handed to `run_loop`; it starts out empty.
///
/// # Panics
///
/// Panics, in this order of checking, if:
/// - `context.agent_id` is `None`,
/// - `context.session_id` is `None`,
/// - `context.messages` is empty,
/// - the last message in `context.messages` has the role `"assistant"`.
pub async fn agent_loop_continue(
    context: &mut AgentContext,
    config: &AgentLoopConfig,
    tx: mpsc::UnboundedSender<AgentEvent>,
    cancel: tokio_util::sync::CancellationToken,
) -> Vec<AgentMessage> {
    assert!(
        context.agent_id.is_some(),
        "agent_loop_continue requires context.agent_id to be set — \
         identity must carry over from the originating loop"
    );
    assert!(
        context.session_id.is_some(),
        "agent_loop_continue requires context.session_id to be set — \
         the session must be established before a continuation"
    );
    assert!(
        !context.messages.is_empty(),
        "Cannot continue: no messages in context"
    );
    // The emptiness check above has already passed, so `last()` yields a message here.
    let last_is_not_assistant = context
        .messages
        .last()
        .map_or(true, |last| last.role() != "assistant");
    assert!(
        last_is_not_assistant,
        "Cannot continue from assistant message"
    );

    let mut new_messages: Vec<AgentMessage> = Vec::new();

    // Only seed the split contexts when neither of them holds anything yet.
    let contexts_unseeded = context.user_context.is_empty() && context.inrun_context.is_empty();
    if contexts_unseeded {
        for msg in &context.messages {
            match msg.as_llm() {
                Some(Message::Assistant { .. }) | Some(Message::ToolResult { .. }) => {
                    context
                        .inrun_context
                        .push(crate::types::InRunEntry::Live(msg.clone()));
                }
                Some(Message::User { .. }) => context.user_context.push(msg.clone()),
                _ => {}
            }
        }
    }

    let agent_id = context.agent_id.clone().expect("asserted Some above");
    let session_id = context.session_id.clone().expect("asserted Some above");
    let loop_id = context
        .loop_id
        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
        .clone();

    // A missing hook counts as approval.
    let approved = match &config.before_loop {
        Some(gate) => gate(&context.messages, 0).await,
        None => true,
    };
    if !approved {
        let _ = tx.send(AgentEvent::AgentEnd {
            loop_id,
            messages: Vec::new(),
            usage: Usage::default(),
            timestamp: chrono::Utc::now(),
            rejection: None,
        });
        return Vec::new();
    }

    let start_event = AgentEvent::AgentStart {
        agent_id,
        session_id,
        loop_id: loop_id.clone(),
        parent_loop_id: context.parent_loop_id.clone(),
        continuation_kind: context
            .continuation_kind
            .clone()
            .unwrap_or(ContinuationKind::Initial),
        timestamp: chrono::Utc::now(),
        metadata: None,
        config_snapshot: Some(build_config_snapshot(config, context)),
    };
    let _ = tx.send(start_event);

    let loop_usage = run_loop(context, &mut new_messages, config, &tx, &cancel, None).await;

    let _ = tx.send(AgentEvent::AgentEnd {
        loop_id,
        messages: new_messages.clone(),
        usage: loop_usage.clone(),
        timestamp: chrono::Utc::now(),
        rejection: None,
    });

    // The after-loop hook runs only once the end event has been handed to the channel.
    if let Some(after_loop) = &config.after_loop {
        after_loop(&new_messages, &loop_usage).await;
    }

    new_messages
}
/// Makes sure `ctx` carries an agent id, a session id and a loop id, and returns copies of
/// all three as `(agent_id, session_id, loop_id)`.
///
/// Ids that are already present are kept as they are; each missing one is filled with a
/// freshly generated v4 UUID string and stored back into `ctx`.
fn ensure_loop_ids(ctx: &mut AgentContext) -> (String, String, String) {
    /// Produces a new random identifier.
    fn fresh_id() -> String {
        uuid::Uuid::new_v4().to_string()
    }

    // Tuple fields are evaluated left to right: agent, then session, then loop.
    (
        ctx.agent_id.get_or_insert_with(fresh_id).clone(),
        ctx.session_id.get_or_insert_with(fresh_id).clone(),
        ctx.loop_id.get_or_insert_with(fresh_id).clone(),
    )
}
/// Builds the `LoopConfigSnapshot` attached to `AgentStart` events.
///
/// The model fields are copied from `config.model_config`; `thinking_level` and
/// `temperature` come from `config` itself.
///
/// `config_id` is derived from `context.loop_id` when that id starts with the session id
/// (an absent session id is treated as the empty string), followed by a `.`, and contains
/// at least one more `.` after that: the text between the leading `.` and the last `.` is
/// used. In every other case `config.config_id` is used instead.
// NOTE(review): presumably loop ids look like `<session_id>.<config_id>.<n>` — confirm
// against the code that assigns `loop_id`.
fn build_config_snapshot(
    config: &AgentLoopConfig,
    context: &AgentContext,
) -> crate::session::LoopConfigSnapshot {
    /// Extracts the middle part of `loop_id`: strips `session_id` and the following `.`
    /// from the front, then drops the final `.`-separated component.
    fn embedded_segment(loop_id: &str, session_id: &str) -> Option<String> {
        let after_session = loop_id.strip_prefix(session_id)?;
        let after_dot = after_session.strip_prefix('.')?;
        let (segment, _trailing) = after_dot.rsplit_once('.')?;
        Some(segment.to_string())
    }

    let derived = match context.loop_id.as_deref() {
        Some(loop_id) => {
            let session_id = context.session_id.as_deref().unwrap_or("");
            embedded_segment(loop_id, session_id)
        }
        None => None,
    };
    let config_id = match derived {
        Some(segment) => Some(segment),
        None => config.config_id.clone(),
    };

    let model = &config.model_config;
    crate::session::LoopConfigSnapshot {
        model: model.id.clone(),
        provider: model.provider.clone(),
        config_id,
        name: Some(model.name.clone()),
        api: Some(model.api),
        base_url: Some(model.base_url.clone()),
        reasoning: Some(model.reasoning),
        context_window: Some(model.context_window),
        max_tokens: Some(model.max_tokens),
        thinking_level: Some(config.thinking_level),
        temperature: config.temperature,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default context has no ids; `ensure_loop_ids` must generate all three and store
    /// the same values it returns.
    #[test]
    fn ensure_loop_ids_populates_missing_fields() {
        let mut ctx = AgentContext::default();
        assert!(ctx.agent_id.is_none());
        assert!(ctx.session_id.is_none());
        assert!(ctx.loop_id.is_none());

        let (agent, session, loop_id) = ensure_loop_ids(&mut ctx);

        assert!(!agent.is_empty());
        assert!(!session.is_empty());
        assert!(!loop_id.is_empty());
        assert_eq!(ctx.agent_id.as_deref(), Some(agent.as_str()));
        assert_eq!(ctx.session_id.as_deref(), Some(session.as_str()));
        assert_eq!(ctx.loop_id.as_deref(), Some(loop_id.as_str()));
    }

    /// Calling `ensure_loop_ids` twice on the same context yields the same ids.
    #[test]
    fn ensure_loop_ids_idempotent() {
        let mut ctx = AgentContext::default();

        let first = ensure_loop_ids(&mut ctx);
        let second = ensure_loop_ids(&mut ctx);

        assert_eq!(first, second);
    }

    /// Ids supplied by the caller are kept; only the missing loop id is generated.
    #[test]
    fn ensure_loop_ids_preserves_existing() {
        let mut ctx = AgentContext {
            agent_id: Some(String::from("agent-x")),
            session_id: Some(String::from("session-y")),
            ..AgentContext::default()
        };

        let (agent, session, loop_id) = ensure_loop_ids(&mut ctx);

        assert_eq!(agent, "agent-x");
        assert_eq!(session, "session-y");
        assert!(!loop_id.is_empty());
        assert_eq!(ctx.loop_id.as_deref(), Some(loop_id.as_str()));
    }
}