1use super::config::*;
4use super::helpers::apply_input_filters;
5use super::run::run_loop;
6use crate::types::*;
7use tokio::sync::mpsc;
8
9pub async fn agent_loop(
28 prompts: Vec<AgentMessage>, context: &mut AgentContext, config: &AgentLoopConfig, tx: mpsc::UnboundedSender<AgentEvent>, cancel: tokio_util::sync::CancellationToken, ) -> Vec<AgentMessage> {
34 let (agent_id, session_id, loop_id) = ensure_loop_ids(context);
39
40 if let Some(ref before_loop) = config.before_loop {
42 if !before_loop(&context.messages, 0).await {
43 tx.send(AgentEvent::AgentEnd {
44 loop_id: loop_id.clone(),
45 messages: vec![],
46 usage: Usage::default(),
47 timestamp: chrono::Utc::now(),
48 rejection: None,
49 })
50 .ok();
51 return vec![];
52 }
53 }
54
55 tx.send(AgentEvent::AgentStart {
56 agent_id: agent_id.clone(),
57 session_id: session_id.clone(),
58 loop_id: loop_id.clone(),
59 parent_loop_id: context.parent_loop_id.clone(), continuation_kind: context
61 .continuation_kind
62 .clone()
63 .unwrap_or(ContinuationKind::Initial),
64 timestamp: chrono::Utc::now(),
65 metadata: None,
66 config_snapshot: Some(build_config_snapshot(config, context)),
67 })
68 .ok();
69
70 let prompts = match apply_input_filters(prompts, &config.input_filters, &tx, &loop_id).await {
75 Ok(filtered) => filtered,
76 Err(reason) => {
77 tx.send(AgentEvent::AgentEnd {
80 loop_id: loop_id.clone(),
81 messages: vec![],
82 usage: Usage::default(),
83 timestamp: chrono::Utc::now(),
84 rejection: Some(reason),
85 })
86 .ok();
87 return vec![];
88 }
89 };
90
91 let mut new_messages: Vec<AgentMessage> = prompts.clone();
92
93 for prompt in &prompts {
95 context.messages.push(prompt.clone());
96 }
97
98 for prompt in &prompts {
100 context.user_context.push(prompt.clone());
101 }
102
103 let loop_usage = run_loop(
105 context,
106 &mut new_messages,
107 config,
108 &tx,
109 &cancel,
110 Some(&prompts),
111 )
112 .await;
113
114 tx.send(AgentEvent::AgentEnd {
115 loop_id,
116 messages: new_messages.clone(),
117 usage: loop_usage.clone(),
118 timestamp: chrono::Utc::now(),
119 rejection: None,
120 })
121 .ok();
122 if let Some(ref after_loop) = config.after_loop {
124 after_loop(&new_messages, &loop_usage).await;
125 }
126 new_messages
127}
128
129pub async fn agent_loop_continue(
145 context: &mut AgentContext, config: &AgentLoopConfig, tx: mpsc::UnboundedSender<AgentEvent>, cancel: tokio_util::sync::CancellationToken, ) -> Vec<AgentMessage> {
150 assert!(
154 context.agent_id.is_some(),
155 "agent_loop_continue requires context.agent_id to be set — \
156 identity must carry over from the originating loop"
157 );
158 assert!(
159 context.session_id.is_some(),
160 "agent_loop_continue requires context.session_id to be set — \
161 the session must be established before a continuation"
162 );
163
164 assert!(
165 !context.messages.is_empty(),
166 "Cannot continue: no messages in context"
167 );
168
169 if let Some(last) = context.messages.last() {
171 assert!(
172 last.role() != "assistant",
173 "Cannot continue from assistant message"
174 );
175 }
176
177 let mut new_messages: Vec<AgentMessage> = Vec::new();
178
179 if context.user_context.is_empty() && context.inrun_context.is_empty() {
181 for msg in &context.messages {
182 match msg.as_llm() {
183 Some(Message::User { .. }) => context.user_context.push(msg.clone()),
184 Some(Message::Assistant { .. }) | Some(Message::ToolResult { .. }) => {
185 context
186 .inrun_context
187 .push(crate::types::InRunEntry::Live(msg.clone()));
188 }
189 _ => {} }
191 }
192 }
193
194 let agent_id = context
198 .agent_id
199 .as_ref()
200 .expect("asserted Some above")
201 .clone();
202 let session_id = context
203 .session_id
204 .as_ref()
205 .expect("asserted Some above")
206 .clone();
207 let loop_id = context
208 .loop_id
209 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
210 .clone();
211
212 if let Some(ref before_loop) = config.before_loop {
214 if !before_loop(&context.messages, 0).await {
215 tx.send(AgentEvent::AgentEnd {
216 loop_id: loop_id.clone(),
217 messages: vec![],
218 usage: Usage::default(),
219 timestamp: chrono::Utc::now(),
220 rejection: None,
221 })
222 .ok();
223 return vec![];
224 }
225 }
226
227 tx.send(AgentEvent::AgentStart {
228 agent_id,
229 session_id,
230 loop_id: loop_id.clone(),
231 parent_loop_id: context.parent_loop_id.clone(), continuation_kind: context
233 .continuation_kind
234 .clone()
235 .unwrap_or(ContinuationKind::Initial),
236 timestamp: chrono::Utc::now(),
237 metadata: None,
238 config_snapshot: Some(build_config_snapshot(config, context)),
239 })
240 .ok();
241
242 let loop_usage = run_loop(context, &mut new_messages, config, &tx, &cancel, None).await;
243
244 tx.send(AgentEvent::AgentEnd {
245 loop_id,
246 messages: new_messages.clone(),
247 usage: loop_usage.clone(),
248 timestamp: chrono::Utc::now(),
249 rejection: None,
250 })
251 .ok();
252 if let Some(ref after_loop) = config.after_loop {
254 after_loop(&new_messages, &loop_usage).await;
255 }
256 new_messages
257}
258
259fn ensure_loop_ids(ctx: &mut AgentContext) -> (String, String, String) {
266 let agent_id = ctx
267 .agent_id
268 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
269 .clone();
270 let session_id = ctx
271 .session_id
272 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
273 .clone();
274 let loop_id = ctx
275 .loop_id
276 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
277 .clone();
278 (agent_id, session_id, loop_id)
279}
280
281fn build_config_snapshot(
283 config: &AgentLoopConfig,
284 context: &AgentContext,
285) -> crate::session::LoopConfigSnapshot {
286 let config_id = context
289 .loop_id
290 .as_deref()
291 .and_then(|lid| {
292 let session_id = context.session_id.as_deref().unwrap_or("");
293 lid.strip_prefix(session_id)
294 .and_then(|rest| rest.strip_prefix('.'))
295 .and_then(|rest| rest.rsplit_once('.'))
296 .map(|(seg, _n)| seg.to_string())
297 })
298 .or_else(|| config.config_id.clone());
299
300 crate::session::LoopConfigSnapshot {
301 model: config.model_config.id.clone(),
302 provider: config.model_config.provider.clone(),
303 config_id,
304 name: Some(config.model_config.name.clone()),
305 api: Some(config.model_config.api),
306 base_url: Some(config.model_config.base_url.clone()),
307 reasoning: Some(config.model_config.reasoning),
308 context_window: Some(config.model_config.context_window),
309 max_tokens: Some(config.model_config.max_tokens),
310 thinking_level: Some(config.thinking_level),
311 temperature: config.temperature,
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318
319 #[test]
320 fn ensure_loop_ids_populates_missing_fields() {
321 let mut ctx = AgentContext::default();
322 assert!(ctx.agent_id.is_none());
323 assert!(ctx.session_id.is_none());
324 assert!(ctx.loop_id.is_none());
325
326 let (a, s, l) = ensure_loop_ids(&mut ctx);
327 assert!(!a.is_empty() && !s.is_empty() && !l.is_empty());
328 assert_eq!(ctx.agent_id.as_deref(), Some(a.as_str()));
329 assert_eq!(ctx.session_id.as_deref(), Some(s.as_str()));
330 assert_eq!(ctx.loop_id.as_deref(), Some(l.as_str()));
331 }
332
333 #[test]
334 fn ensure_loop_ids_idempotent() {
335 let mut ctx = AgentContext::default();
336 let (a1, s1, l1) = ensure_loop_ids(&mut ctx);
337 let (a2, s2, l2) = ensure_loop_ids(&mut ctx);
338 assert_eq!(a1, a2);
339 assert_eq!(s1, s2);
340 assert_eq!(l1, l2);
341 }
342
343 #[test]
344 fn ensure_loop_ids_preserves_existing() {
345 let mut ctx = AgentContext {
346 agent_id: Some("agent-x".into()),
347 session_id: Some("session-y".into()),
348 ..AgentContext::default()
350 };
351
352 let (a, s, l) = ensure_loop_ids(&mut ctx);
353 assert_eq!(a, "agent-x");
354 assert_eq!(s, "session-y");
355 assert!(!l.is_empty());
356 assert_eq!(ctx.loop_id.as_deref(), Some(l.as_str()));
357 }
358}