// crabtalk_core/agent/mod.rs
//! Immutable agent definition and execution methods.
//!
//! [`Agent`] owns its configuration, model, tool schemas, and an optional
//! [`ToolDispatcher`] handle for executing tool calls. Conversation
//! history is passed in externally — the agent itself is stateless.
//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
//! `run()` collects its events and returns the final response.

10use crate::model::{HistoryEntry, MessageBuilder, Model};
11use anyhow::Result;
12use async_stream::stream;
13pub use builder::AgentBuilder;
14pub use config::AgentConfig;
15use crabllm_core::{ChatCompletionRequest, Provider, Role, Tool, ToolCall, ToolChoice, Usage};
16use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
17use futures_core::Stream;
18use futures_util::{StreamExt, future::join_all, stream::FuturesUnordered};
19pub use id::AgentId;
20use std::sync::Arc;
21use tokio::sync::{mpsc, watch};
22pub use tool::{AsTool, ToolDispatcher};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28mod id;
29pub mod tool;
30
31/// A neutral placeholder assistant message returned by `step()` when the
32/// provider yields zero choices. Used only as a step record so callers see
33/// an empty AgentStep instead of a panic; nothing is appended to history.
34fn empty_assistant_message() -> crabllm_core::Message {
35 crabllm_core::Message {
36 role: Role::Assistant,
37 content: Some(serde_json::Value::String(String::new())),
38 tool_calls: None,
39 tool_call_id: None,
40 name: None,
41 reasoning_content: None,
42 extra: Default::default(),
43 }
44}
45
46/// Extract sender from the last user entry in history.
47fn last_sender(history: &[HistoryEntry]) -> String {
48 history
49 .iter()
50 .rev()
51 .find(|e| *e.role() == Role::User)
52 .map(|e| e.sender.clone())
53 .unwrap_or_default()
54}
55
/// Borrow the inner string of a tool-dispatch result, success or failure
/// alike. The LLM wire format (crabllm-core `Message`) carries no
/// `is_error` flag, so history records both arms as plain text; UI clients
/// still see the distinction via `AgentEvent::ToolResult.output`.
fn tool_output_text(result: &Result<String, String>) -> &str {
    match result {
        Ok(text) => text.as_str(),
        Err(text) => text.as_str(),
    }
}
66
/// An immutable agent definition.
///
/// Generic over `P: crabllm_core::Provider` — holds a `Model<P>` wrapper
/// alongside config, tool schemas, and an optional sender for tool
/// dispatch. Conversation history is owned externally and passed into
/// execution methods. Callers drive execution via `step()` (single LLM
/// round), `run()` (loop to completion), or `run_stream()` (yields events
/// as a stream).
pub struct Agent<P: Provider + 'static> {
    /// Agent configuration (name, prompt, model, limits, tool_choice).
    pub config: AgentConfig,
    /// The model wrapper for LLM calls.
    model: Model<P>,
    /// Tool schemas advertised to the LLM. Set once at build time.
    tools: Vec<Tool>,
    /// Dispatcher for tool calls. `None` means the agent has no tools; a
    /// tool call arriving anyway is answered with an error by
    /// `dispatch_tool` rather than panicking.
    dispatcher: Option<Arc<dyn ToolDispatcher>>,
}
85
// Manual `Clone` impl: `#[derive(Clone)]` would place a `P: Clone` bound on
// the provider type parameter, which providers need not satisfy. Cloning the
// `dispatcher` is an `Arc` refcount bump, not a deep copy.
impl<P: Provider + 'static> Clone for Agent<P> {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            model: self.model.clone(),
            tools: self.tools.clone(),
            dispatcher: self.dispatcher.clone(),
        }
    }
}
96
97impl<P: Provider + 'static> Agent<P> {
98 /// Resolve the model name from agent config.
99 fn model_name(&self) -> String {
100 self.config.model.clone()
101 }
102
103 /// Build a `ChatCompletionRequest` from config state (system prompt +
104 /// history + tool schemas).
105 ///
106 /// If `tool_choice_override` is provided, it takes precedence over the
107 /// agent config's `tool_choice`. Projects each `HistoryEntry` through
108 /// `to_wire_message()` so guest assistant messages get wrapped in
109 /// `<from agent="...">` tags.
110 fn build_request(
111 &self,
112 history: &[HistoryEntry],
113 tool_choice_override: Option<&ToolChoice>,
114 ) -> ChatCompletionRequest {
115 let model_name = self.model_name();
116
117 let mut messages = Vec::with_capacity(1 + history.len());
118 if !self.config.system_prompt.is_empty() {
119 messages.push(crabllm_core::Message::system(&self.config.system_prompt));
120 }
121 messages.extend(history.iter().map(|e| e.to_wire_message()));
122
123 let tool_choice = tool_choice_override
124 .cloned()
125 .unwrap_or_else(|| self.config.tool_choice.clone());
126
127 ChatCompletionRequest {
128 model: model_name,
129 messages,
130 temperature: None,
131 top_p: None,
132 max_tokens: None,
133 stream: None,
134 stop: None,
135 tools: if self.tools.is_empty() {
136 None
137 } else {
138 Some(self.tools.clone())
139 },
140 tool_choice: Some(tool_choice),
141 frequency_penalty: None,
142 presence_penalty: None,
143 seed: None,
144 user: None,
145 reasoning_effort: self.config.thinking.then(|| "high".to_string()),
146 extra: Default::default(),
147 }
148 }
149
    /// Perform a single LLM round: send request, dispatch tools, return step.
    ///
    /// Composes a [`ChatCompletionRequest`] from config state (system prompt +
    /// history + tool schemas), calls the stored model, dispatches any tool
    /// calls via the [`ToolDispatcher`], and appends results to history.
    ///
    /// On success, `history` gains the assistant message plus one tool entry
    /// per tool call (in call order); the zero-choices path appends nothing.
    pub async fn step(
        &self,
        history: &mut Vec<HistoryEntry>,
        conversation_id: Option<u64>,
    ) -> Result<AgentStep> {
        // No tool_choice override here — step() always uses the config value.
        let request = self.build_request(history, None);
        let response = self.model.send_ct(request).await?;
        let tool_calls: Vec<ToolCall> = response.tool_calls().to_vec();
        let finish_reason = response.finish_reason().cloned();
        let usage = response.usage.clone().unwrap_or_default();

        // If the provider returned zero choices, there is no message to record
        // — match the old `step()` behavior of not appending anything in that
        // case, instead of bloating history with a synthetic empty assistant
        // entry on flaky providers.
        let Some(message) = response.message().cloned() else {
            return Ok(AgentStep {
                message: empty_assistant_message(),
                usage,
                finish_reason,
                tool_calls,
                tool_results: Vec::new(),
            });
        };

        // Record the assistant message before dispatching tools so tool
        // entries land after the message that requested them.
        history.push(HistoryEntry::from_message(message.clone()));

        let mut tool_results = Vec::new();
        if !tool_calls.is_empty() {
            let sender = last_sender(history);
            // Dispatch all tool calls concurrently; `join_all` preserves
            // input order, so outputs pair with calls by position.
            let outputs = join_all(tool_calls.iter().map(|tc| {
                self.dispatch_tool(
                    &tc.function.name,
                    &tc.function.arguments,
                    &sender,
                    conversation_id,
                )
            }))
            .await;
            for (tc, result) in tool_calls.iter().zip(outputs) {
                let entry =
                    HistoryEntry::tool(tool_output_text(&result), tc.id.clone(), &tc.function.name);
                history.push(entry.clone());
                tool_results.push(entry);
            }
        }

        Ok(AgentStep {
            message,
            usage,
            finish_reason,
            tool_calls,
            tool_results,
        })
    }
210
211 /// Dispatch a single tool call via the configured [`ToolDispatcher`].
212 ///
213 /// Returns `Ok(output)` for normal tool output or `Err(message)` for a
214 /// failure. If no dispatcher is configured, returns an `Err` describing
215 /// the misconfiguration; otherwise the dispatcher's verdict is forwarded
216 /// unchanged.
217 async fn dispatch_tool(
218 &self,
219 name: &str,
220 args: &str,
221 sender: &str,
222 conversation_id: Option<u64>,
223 ) -> Result<String, String> {
224 let Some(dispatcher) = &self.dispatcher else {
225 return Err(format!(
226 "tool '{name}' called but no tool dispatcher configured"
227 ));
228 };
229 dispatcher
230 .dispatch(name, args, &self.config.name, sender, conversation_id)
231 .await
232 }
233
234 /// Determine the stop reason for a step with no tool calls.
235 fn stop_reason(step: &AgentStep) -> AgentStopReason {
236 let has_text = step
237 .message
238 .content
239 .as_ref()
240 .and_then(|v| v.as_str())
241 .is_some_and(|s| !s.is_empty());
242 if has_text {
243 AgentStopReason::TextResponse
244 } else {
245 AgentStopReason::NoAction
246 }
247 }
248
249 /// Run the agent loop to completion, returning the final response.
250 ///
251 /// Wraps [`Agent::run_stream`] — collects all events, sends each through
252 /// `events`, and extracts the `Done` response.
253 pub async fn run(
254 &self,
255 history: &mut Vec<HistoryEntry>,
256 events: mpsc::UnboundedSender<AgentEvent>,
257 conversation_id: Option<u64>,
258 tool_choice: Option<ToolChoice>,
259 ) -> AgentResponse {
260 let mut stream =
261 std::pin::pin!(self.run_stream(history, conversation_id, None, tool_choice));
262 let mut response = None;
263 while let Some(event) = stream.next().await {
264 if let AgentEvent::Done(ref resp) = event {
265 response = Some(resp.clone());
266 }
267 let _ = events.send(event);
268 }
269
270 response.unwrap_or_else(|| AgentResponse {
271 final_response: None,
272 iterations: 0,
273 stop_reason: AgentStopReason::Error("stream ended without Done".into()),
274 steps: vec![],
275 model: self.model_name(),
276 })
277 }
278
    /// Run the agent loop as a stream of [`AgentEvent`]s.
    ///
    /// Uses the model's streaming API so text deltas are yielded token-by-token.
    /// Tool call responses are dispatched after the stream completes (arguments
    /// arrive incrementally and must be fully accumulated first).
    ///
    /// * `history` — conversation so far; assistant messages, tool results,
    ///   and steering messages are appended in place as the loop runs.
    /// * `steer_rx` — optional watch channel; a pending value is injected as
    ///   a user message before each model call.
    /// * `tool_choice` — per-run override of the config's `tool_choice`.
    ///
    /// The stream always terminates with an `AgentEvent::Done` carrying the
    /// final [`AgentResponse`] (error, no-action, text, or max-iterations).
    pub fn run_stream<'a>(
        &'a self,
        history: &'a mut Vec<HistoryEntry>,
        conversation_id: Option<u64>,
        mut steer_rx: Option<watch::Receiver<Option<String>>>,
        tool_choice: Option<ToolChoice>,
    ) -> impl Stream<Item = AgentEvent> + 'a {
        stream! {
            let mut steps = Vec::new();
            let max = self.config.max_iterations;
            let model_name = self.model_name();

            for _ in 0..max {
                // Check for pending steering message before the next model call.
                // Scope the borrow so the !Send guard is dropped before yield.
                let steer_content = steer_rx.as_mut().and_then(|rx| {
                    rx.has_changed().ok()?.then(|| rx.borrow_and_update().clone())?
                });
                if let Some(content) = steer_content {
                    // The steering message inherits the last user's sender so
                    // it reads as a continuation of that user's turn.
                    let sender = last_sender(history);
                    history.push(HistoryEntry::user_with_sender(&content, &sender));
                    yield AgentEvent::UserSteered { content };
                }

                let request = self.build_request(history, tool_choice.as_ref());

                // Stream from the model, yielding text deltas as they arrive.
                let mut builder = MessageBuilder::new(Role::Assistant);
                let mut finish_reason = None;
                let mut last_usage: Option<Usage> = None;
                let mut stream_error = None;
                let mut tool_begin_emitted = false;

                // Tracks the currently open text/thinking segment so we can
                // bracket deltas with explicit Start/End events. Only one
                // segment is open at a time — type transitions emit the
                // closing event for the previous segment first.
                #[derive(PartialEq)]
                enum OpenSegment { None, Text, Thinking }
                let mut open = OpenSegment::None;

                {
                    let mut chunk_stream = std::pin::pin!(self.model.stream_ct(request));
                    while let Some(result) = chunk_stream.next().await {
                        match result {
                            Ok(chunk) => {
                                // Process text portion. Match existing behavior:
                                // emit TextDelta even when the slice is empty.
                                if let Some(text) = chunk.content() {
                                    if open != OpenSegment::Text {
                                        if open == OpenSegment::Thinking {
                                            yield AgentEvent::ThinkingEnd;
                                        }
                                        yield AgentEvent::TextStart;
                                        open = OpenSegment::Text;
                                    }
                                    yield AgentEvent::TextDelta(text.to_owned());
                                }
                                // Process reasoning portion. Same atomic-flip logic.
                                if let Some(reason) = chunk.reasoning_content() {
                                    if open != OpenSegment::Thinking {
                                        if open == OpenSegment::Text {
                                            yield AgentEvent::TextEnd;
                                        }
                                        yield AgentEvent::ThinkingStart;
                                        open = OpenSegment::Thinking;
                                    }
                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
                                }
                                // Later chunks overwrite earlier values: the
                                // last finish_reason/usage seen wins.
                                if let Some(r) = chunk.finish_reason() {
                                    finish_reason = Some(r.clone());
                                }
                                if chunk.usage.is_some() {
                                    last_usage = chunk.usage.clone();
                                }
                                builder.accept(&chunk);
                                // Emit ToolCallsBegin as soon as tool names appear
                                // in the builder, so the CLI can show markers while
                                // args are still streaming. Uses current builder
                                // state, which may already have partial/full args.
                                if !tool_begin_emitted {
                                    let calls = builder.peek_tool_calls();
                                    if !calls.is_empty() {
                                        tool_begin_emitted = true;
                                        yield AgentEvent::ToolCallsBegin(calls);
                                    }
                                }
                            }
                            Err(e) => {
                                stream_error = Some(e.to_string());
                                break;
                            }
                        }
                    }
                    // Close whatever segment is still open at end of stream.
                    match open {
                        OpenSegment::Text => yield AgentEvent::TextEnd,
                        OpenSegment::Thinking => yield AgentEvent::ThinkingEnd,
                        OpenSegment::None => {}
                    }
                }
                if let Some(e) = stream_error {
                    // Transport/provider error mid-stream: report what we have
                    // and stop. Nothing from this round reaches history.
                    yield AgentEvent::Done(AgentResponse {
                        final_response: None,
                        iterations: steps.len(),
                        stop_reason: AgentStopReason::Error(e),
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }

                // Build the accumulated message. `MessageBuilder::build`
                // already drops degenerate (id-less or name-less) tool call
                // fragments, so any tool_calls present here are well-formed.
                let message = builder.build();
                let tool_calls: Vec<ToolCall> =
                    message.tool_calls.clone().unwrap_or_default();
                let content = message
                    .content
                    .as_ref()
                    .and_then(|v| v.as_str())
                    .filter(|s| !s.is_empty())
                    .map(|s| s.to_owned());
                let usage = last_usage.unwrap_or_default();
                let has_tool_calls = !tool_calls.is_empty();

                // If the stream produced neither text nor any usable tool
                // call, treat the round as a no-op: do not push the empty
                // assistant message into history (which would persist via
                // `append_messages` and contaminate the next request),
                // yield Done with NoAction, and return. This is the
                // mid-stream-disconnect path — reqwest can end an SSE
                // stream cleanly with `Ok(None)` on a TCP RST, so we
                // can't rely on `stream_error` alone to catch it.
                if content.is_none() && !has_tool_calls {
                    yield AgentEvent::Done(AgentResponse {
                        final_response: None,
                        iterations: steps.len(),
                        stop_reason: AgentStopReason::NoAction,
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }

                history.push(HistoryEntry::from_message(message.clone()));

                // Dispatch tool calls concurrently.
                //
                // `FuturesUnordered` polls each dispatch future to completion
                // independently so `ToolResult` events fire in completion
                // order (fast tools don't wait on slow siblings in the UI).
                // Outputs are buffered by the original call index so history
                // entries append in call order — providers pair results to
                // calls by position in some encodings, so this ordering is
                // load-bearing.
                let mut tool_results = Vec::new();
                if has_tool_calls {
                    let sender = last_sender(history);
                    yield AgentEvent::ToolCallsStart(tool_calls.clone());

                    let mut pending: FuturesUnordered<_> = tool_calls
                        .iter()
                        .enumerate()
                        .map(|(idx, tc)| {
                            let fut = self.dispatch_tool(
                                &tc.function.name,
                                &tc.function.arguments,
                                &sender,
                                conversation_id,
                            );
                            // `start` is captured inside the async block so
                            // it measures actual polled runtime, not the time
                            // since `FuturesUnordered` was built.
                            async move {
                                let start = std::time::Instant::now();
                                let out = fut.await;
                                (idx, out, start.elapsed().as_millis() as u64)
                            }
                        })
                        .collect();

                    let mut buffered: Vec<Option<Result<String, String>>> =
                        vec![None; tool_calls.len()];
                    while let Some((idx, output, duration_ms)) = pending.next().await {
                        let call_id = tool_calls[idx].id.clone();
                        // Clone into the event; the owned Result lands in
                        // `buffered[idx]` so the drain-loop tail can append
                        // history entries in original call order.
                        yield AgentEvent::ToolResult {
                            call_id,
                            output: output.clone(),
                            duration_ms,
                        };
                        buffered[idx] = Some(output);
                    }

                    for (tc, out) in tool_calls.iter().zip(buffered.into_iter()) {
                        let out = out.expect("FuturesUnordered drained every slot");
                        let entry = HistoryEntry::tool(
                            tool_output_text(&out),
                            tc.id.clone(),
                            &tc.function.name,
                        );
                        history.push(entry.clone());
                        tool_results.push(entry);
                    }

                    yield AgentEvent::ToolCallsComplete;
                }

                // Auto-compaction: check token estimate after each step.
                // NOTE(review): when the threshold trips, this round's
                // AgentStep is never pushed onto `steps`, so `iterations`
                // and per-step usage omit the round, and any assistant text
                // produced this round is not surfaced via Done — confirm
                // this drop is intentional. Also note the `continue` runs
                // even when `compact()` returns None, i.e. a failed
                // compaction retries the loop with history unchanged.
                if let Some(threshold) = self.config.compact_threshold
                    && Self::estimate_tokens(history) > threshold
                {
                    if let Some(summary) = self.compact(history).await {
                        yield AgentEvent::Compact { summary: summary.clone() };
                        // History is replaced wholesale by a single user
                        // entry holding the summary.
                        *history = vec![HistoryEntry::user(&summary)];
                        yield AgentEvent::TextStart;
                        yield AgentEvent::TextDelta(
                            "\n[context compacted]\n".to_owned(),
                        );
                        yield AgentEvent::TextEnd;
                    }
                    continue;
                }

                let step = AgentStep {
                    message,
                    usage,
                    finish_reason,
                    tool_calls,
                    tool_results,
                };

                // Tool calls mean another round is needed; otherwise this
                // round is terminal and we classify why.
                if !step.tool_calls.is_empty() {
                    steps.push(step);
                } else {
                    let stop_reason = Self::stop_reason(&step);
                    steps.push(step);
                    yield AgentEvent::Done(AgentResponse {
                        final_response: content,
                        iterations: steps.len(),
                        stop_reason,
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }
            }

            // Iteration budget exhausted: surface whatever text the last
            // recorded step produced, flagged as MaxIterations.
            let final_response = steps
                .last()
                .and_then(|s| s.message.content.as_ref())
                .and_then(|v| v.as_str())
                .filter(|s| !s.is_empty())
                .map(|s| s.to_owned());
            yield AgentEvent::Done(AgentResponse {
                final_response,
                iterations: steps.len(),
                stop_reason: AgentStopReason::MaxIterations,
                steps,
                model: model_name,
            });
        }
    }
551}