// crabtalk_core/agent/mod.rs
//! Immutable agent definition and execution methods.
//!
//! [`Agent`] owns its configuration, model, tool schemas, and an optional
//! [`ToolDispatcher`] handle for executing tool calls. Conversation
//! history is passed in externally — the agent itself is stateless.
//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
//! `run()` collects its events and returns the final response.

use crate::model::{HistoryEntry, MessageBuilder, Model};
use anyhow::Result;
use async_stream::stream;
pub use builder::AgentBuilder;
pub use config::AgentConfig;
use crabllm_core::{ChatCompletionRequest, Provider, Role, Tool, ToolCall, ToolChoice, Usage};
use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
use futures_core::Stream;
use futures_util::{StreamExt, future::join_all, stream::FuturesUnordered};
pub use id::AgentId;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
pub use tool::{AsTool, ToolDispatcher};

mod builder;
mod compact;
pub mod config;
pub mod event;
mod id;
pub mod tool;

31/// A neutral placeholder assistant message returned by `step()` when the
32/// provider yields zero choices. Used only as a step record so callers see
33/// an empty AgentStep instead of a panic; nothing is appended to history.
34fn empty_assistant_message() -> crabllm_core::Message {
35    crabllm_core::Message {
36        role: Role::Assistant,
37        content: Some(serde_json::Value::String(String::new())),
38        tool_calls: None,
39        tool_call_id: None,
40        name: None,
41        reasoning_content: None,
42        extra: Default::default(),
43    }
44}
45
46/// Extract sender from the last user entry in history.
47fn last_sender(history: &[HistoryEntry]) -> String {
48    history
49        .iter()
50        .rev()
51        .find(|e| *e.role() == Role::User)
52        .map(|e| e.sender.clone())
53        .unwrap_or_default()
54}
55
/// Borrow the string payload of a tool-dispatch outcome, whether the
/// dispatch succeeded or failed. The LLM wire format (crabllm-core
/// `Message`) carries no `is_error` flag, so the agent records both arms
/// as plain text when appending to history. UI clients still see the
/// distinction via `AgentEvent::ToolResult.output`.
fn tool_output_text(result: &Result<String, String>) -> &str {
    match result {
        Ok(text) => text,
        Err(text) => text,
    }
}

/// An immutable agent definition.
///
/// Generic over `P: crabllm_core::Provider` — holds a `Model<P>` wrapper
/// alongside config, tool schemas, and an optional sender for tool
/// dispatch. Conversation history is owned externally and passed into
/// execution methods. Callers drive execution via `step()` (single LLM
/// round), `run()` (loop to completion), or `run_stream()` (yields events
/// as a stream).
pub struct Agent<P: Provider + 'static> {
    /// Agent configuration (name, prompt, model, limits, tool_choice).
    pub config: AgentConfig,
    /// The model wrapper for LLM calls.
    model: Model<P>,
    /// Tool schemas advertised to the LLM. Set once at build time.
    tools: Vec<Tool>,
    /// Dispatcher for tool calls. None = no tools. Held behind `Arc` so
    /// clones of the agent share the same dispatcher instance.
    dispatcher: Option<Arc<dyn ToolDispatcher>>,
}

// Manual `Clone` impl: a derive would add a `P: Clone` bound that the
// provider type may not satisfy, and no field actually requires it.
impl<P: Provider + 'static> Clone for Agent<P> {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            model: self.model.clone(),
            tools: self.tools.clone(),
            // Arc clone — the dispatcher is shared, not deep-copied.
            dispatcher: self.dispatcher.clone(),
        }
    }
}

impl<P: Provider + 'static> Agent<P> {
98    /// Resolve the model name from agent config.
99    fn model_name(&self) -> String {
100        self.config.model.clone()
101    }
102
103    /// Build a `ChatCompletionRequest` from config state (system prompt +
104    /// history + tool schemas).
105    ///
106    /// If `tool_choice_override` is provided, it takes precedence over the
107    /// agent config's `tool_choice`. Projects each `HistoryEntry` through
108    /// `to_wire_message()` so guest assistant messages get wrapped in
109    /// `<from agent="...">` tags.
110    fn build_request(
111        &self,
112        history: &[HistoryEntry],
113        tool_choice_override: Option<&ToolChoice>,
114    ) -> ChatCompletionRequest {
115        let model_name = self.model_name();
116
117        let mut messages = Vec::with_capacity(1 + history.len());
118        if !self.config.system_prompt.is_empty() {
119            messages.push(crabllm_core::Message::system(&self.config.system_prompt));
120        }
121        messages.extend(history.iter().map(|e| e.to_wire_message()));
122
123        let tool_choice = tool_choice_override
124            .cloned()
125            .unwrap_or_else(|| self.config.tool_choice.clone());
126
127        ChatCompletionRequest {
128            model: model_name,
129            messages,
130            temperature: None,
131            top_p: None,
132            max_tokens: None,
133            stream: None,
134            stop: None,
135            tools: if self.tools.is_empty() {
136                None
137            } else {
138                Some(self.tools.clone())
139            },
140            tool_choice: Some(tool_choice),
141            frequency_penalty: None,
142            presence_penalty: None,
143            seed: None,
144            user: None,
145            reasoning_effort: self.config.thinking.then(|| "high".to_string()),
146            extra: Default::default(),
147        }
148    }
149
    /// Perform a single LLM round: send request, dispatch tools, return step.
    ///
    /// Composes a [`ChatCompletionRequest`] from config state (system prompt +
    /// history + tool schemas), calls the stored model, dispatches any tool
    /// calls via the [`ToolDispatcher`], and appends results to history.
    ///
    /// On success the assistant message and any tool-result entries are
    /// pushed onto `history`; the returned [`AgentStep`] carries the same
    /// message plus usage/finish-reason metadata. Errors from the model
    /// call propagate via `?`.
    pub async fn step(
        &self,
        history: &mut Vec<HistoryEntry>,
        conversation_id: Option<u64>,
    ) -> Result<AgentStep> {
        let request = self.build_request(history, None);
        let response = self.model.send_ct(request).await?;
        let tool_calls: Vec<ToolCall> = response.tool_calls().to_vec();
        let finish_reason = response.finish_reason().cloned();
        let usage = response.usage.clone().unwrap_or_default();

        // If the provider returned zero choices, there is no message to record
        // — match the old `step()` behavior of not appending anything in that
        // case, instead of bloating history with a synthetic empty assistant
        // entry on flaky providers.
        let Some(message) = response.message().cloned() else {
            return Ok(AgentStep {
                message: empty_assistant_message(),
                usage,
                finish_reason,
                tool_calls,
                tool_results: Vec::new(),
            });
        };

        history.push(HistoryEntry::from_message(message.clone()));

        let mut tool_results = Vec::new();
        if !tool_calls.is_empty() {
            // Dispatch all calls concurrently; `join_all` returns outputs
            // in the same order as the input futures.
            let sender = last_sender(history);
            let outputs = join_all(tool_calls.iter().map(|tc| {
                self.dispatch_tool(
                    &tc.function.name,
                    &tc.function.arguments,
                    &sender,
                    conversation_id,
                )
            }))
            .await;
            // Append results in original call order — some providers pair
            // results to calls by position.
            for (tc, result) in tool_calls.iter().zip(outputs) {
                let entry =
                    HistoryEntry::tool(tool_output_text(&result), tc.id.clone(), &tc.function.name);
                history.push(entry.clone());
                tool_results.push(entry);
            }
        }

        Ok(AgentStep {
            message,
            usage,
            finish_reason,
            tool_calls,
            tool_results,
        })
    }

211    /// Dispatch a single tool call via the configured [`ToolDispatcher`].
212    ///
213    /// Returns `Ok(output)` for normal tool output or `Err(message)` for a
214    /// failure. If no dispatcher is configured, returns an `Err` describing
215    /// the misconfiguration; otherwise the dispatcher's verdict is forwarded
216    /// unchanged.
217    async fn dispatch_tool(
218        &self,
219        name: &str,
220        args: &str,
221        sender: &str,
222        conversation_id: Option<u64>,
223    ) -> Result<String, String> {
224        let Some(dispatcher) = &self.dispatcher else {
225            return Err(format!(
226                "tool '{name}' called but no tool dispatcher configured"
227            ));
228        };
229        dispatcher
230            .dispatch(name, args, &self.config.name, sender, conversation_id)
231            .await
232    }
233
234    /// Determine the stop reason for a step with no tool calls.
235    fn stop_reason(step: &AgentStep) -> AgentStopReason {
236        let has_text = step
237            .message
238            .content
239            .as_ref()
240            .and_then(|v| v.as_str())
241            .is_some_and(|s| !s.is_empty());
242        if has_text {
243            AgentStopReason::TextResponse
244        } else {
245            AgentStopReason::NoAction
246        }
247    }
248
249    /// Run the agent loop to completion, returning the final response.
250    ///
251    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
252    /// `events`, and extracts the `Done` response.
253    pub async fn run(
254        &self,
255        history: &mut Vec<HistoryEntry>,
256        events: mpsc::UnboundedSender<AgentEvent>,
257        conversation_id: Option<u64>,
258        tool_choice: Option<ToolChoice>,
259    ) -> AgentResponse {
260        let mut stream =
261            std::pin::pin!(self.run_stream(history, conversation_id, None, tool_choice));
262        let mut response = None;
263        while let Some(event) = stream.next().await {
264            if let AgentEvent::Done(ref resp) = event {
265                response = Some(resp.clone());
266            }
267            let _ = events.send(event);
268        }
269
270        response.unwrap_or_else(|| AgentResponse {
271            final_response: None,
272            iterations: 0,
273            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
274            steps: vec![],
275            model: self.model_name(),
276        })
277    }
278
    /// Run the agent loop as a stream of [`AgentEvent`]s.
    ///
    /// Uses the model's streaming API so text deltas are yielded token-by-token.
    /// Tool call responses are dispatched after the stream completes (arguments
    /// arrive incrementally and must be fully accumulated first).
    ///
    /// Every terminal path yields exactly one `AgentEvent::Done`: stream
    /// error, no-op round, tool-free final step, or `max_iterations`
    /// exhaustion.
    pub fn run_stream<'a>(
        &'a self,
        history: &'a mut Vec<HistoryEntry>,
        conversation_id: Option<u64>,
        mut steer_rx: Option<watch::Receiver<Option<String>>>,
        tool_choice: Option<ToolChoice>,
    ) -> impl Stream<Item = AgentEvent> + 'a {
        stream! {
            let mut steps = Vec::new();
            let max = self.config.max_iterations;
            let model_name = self.model_name();

            for _ in 0..max {
                // Check for pending steering message before the next model call.
                // Scope the borrow so the !Send guard is dropped before yield.
                let steer_content = steer_rx.as_mut().and_then(|rx| {
                    rx.has_changed().ok()?.then(|| rx.borrow_and_update().clone())?
                });
                if let Some(content) = steer_content {
                    let sender = last_sender(history);
                    history.push(HistoryEntry::user_with_sender(&content, &sender));
                    yield AgentEvent::UserSteered { content };
                }

                let request = self.build_request(history, tool_choice.as_ref());

                // Stream from the model, yielding text deltas as they arrive.
                let mut builder = MessageBuilder::new(Role::Assistant);
                let mut finish_reason = None;
                let mut last_usage: Option<Usage> = None;
                let mut stream_error = None;
                let mut tool_begin_emitted = false;

                // Tracks the currently open text/thinking segment so we can
                // bracket deltas with explicit Start/End events. Only one
                // segment is open at a time — type transitions emit the
                // closing event for the previous segment first.
                #[derive(PartialEq)]
                enum OpenSegment { None, Text, Thinking }
                let mut open = OpenSegment::None;

                {
                    let mut chunk_stream = std::pin::pin!(self.model.stream_ct(request));
                    while let Some(result) = chunk_stream.next().await {
                        match result {
                            Ok(chunk) => {
                                // Process text portion. Match existing behavior:
                                // emit TextDelta even when the slice is empty.
                                if let Some(text) = chunk.content() {
                                    if open != OpenSegment::Text {
                                        if open == OpenSegment::Thinking {
                                            yield AgentEvent::ThinkingEnd;
                                        }
                                        yield AgentEvent::TextStart;
                                        open = OpenSegment::Text;
                                    }
                                    yield AgentEvent::TextDelta(text.to_owned());
                                }
                                // Process reasoning portion. Same atomic-flip logic.
                                if let Some(reason) = chunk.reasoning_content() {
                                    if open != OpenSegment::Thinking {
                                        if open == OpenSegment::Text {
                                            yield AgentEvent::TextEnd;
                                        }
                                        yield AgentEvent::ThinkingStart;
                                        open = OpenSegment::Thinking;
                                    }
                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
                                }
                                if let Some(r) = chunk.finish_reason() {
                                    finish_reason = Some(r.clone());
                                }
                                // Keep only the most recent usage payload.
                                if chunk.usage.is_some() {
                                    last_usage = chunk.usage.clone();
                                }
                                builder.accept(&chunk);
                                // Emit ToolCallsBegin as soon as tool names appear
                                // in the builder, so the CLI can show markers while
                                // args are still streaming. Uses current builder
                                // state, which may already have partial/full args.
                                if !tool_begin_emitted {
                                    let calls = builder.peek_tool_calls();
                                    if !calls.is_empty() {
                                        tool_begin_emitted = true;
                                        yield AgentEvent::ToolCallsBegin(calls);
                                    }
                                }
                            }
                            Err(e) => {
                                stream_error = Some(e.to_string());
                                break;
                            }
                        }
                    }
                    // Close whatever segment is still open at end of stream.
                    match open {
                        OpenSegment::Text => yield AgentEvent::TextEnd,
                        OpenSegment::Thinking => yield AgentEvent::ThinkingEnd,
                        OpenSegment::None => {}
                    }
                }
                // Transport/provider failure mid-stream: report what we have
                // and stop — no retry here.
                if let Some(e) = stream_error {
                    yield AgentEvent::Done(AgentResponse {
                        final_response: None,
                        iterations: steps.len(),
                        stop_reason: AgentStopReason::Error(e),
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }

                // Build the accumulated message. `MessageBuilder::build`
                // already drops degenerate (id-less or name-less) tool call
                // fragments, so any tool_calls present here are well-formed.
                let message = builder.build();
                let tool_calls: Vec<ToolCall> =
                    message.tool_calls.clone().unwrap_or_default();
                let content = message
                    .content
                    .as_ref()
                    .and_then(|v| v.as_str())
                    .filter(|s| !s.is_empty())
                    .map(|s| s.to_owned());
                let usage = last_usage.unwrap_or_default();
                let has_tool_calls = !tool_calls.is_empty();

                // If the stream produced neither text nor any usable tool
                // call, treat the round as a no-op: do not push the empty
                // assistant message into history (which would persist via
                // `append_messages` and contaminate the next request),
                // yield Done with NoAction, and return. This is the
                // mid-stream-disconnect path — reqwest can end an SSE
                // stream cleanly with `Ok(None)` on a TCP RST, so we
                // can't rely on `stream_error` alone to catch it.
                if content.is_none() && !has_tool_calls {
                    yield AgentEvent::Done(AgentResponse {
                        final_response: None,
                        iterations: steps.len(),
                        stop_reason: AgentStopReason::NoAction,
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }

                history.push(HistoryEntry::from_message(message.clone()));

                // Dispatch tool calls concurrently.
                //
                // `FuturesUnordered` polls each dispatch future to completion
                // independently so `ToolResult` events fire in completion
                // order (fast tools don't wait on slow siblings in the UI).
                // Outputs are buffered by the original call index so history
                // entries append in call order — providers pair results to
                // calls by position in some encodings, so this ordering is
                // load-bearing.
                let mut tool_results = Vec::new();
                if has_tool_calls {
                    let sender = last_sender(history);
                    yield AgentEvent::ToolCallsStart(tool_calls.clone());

                    let mut pending: FuturesUnordered<_> = tool_calls
                        .iter()
                        .enumerate()
                        .map(|(idx, tc)| {
                            let fut = self.dispatch_tool(
                                &tc.function.name,
                                &tc.function.arguments,
                                &sender,
                                conversation_id,
                            );
                            // `start` is captured inside the async block so
                            // it measures actual polled runtime, not the time
                            // since `FuturesUnordered` was built.
                            async move {
                                let start = std::time::Instant::now();
                                let out = fut.await;
                                (idx, out, start.elapsed().as_millis() as u64)
                            }
                        })
                        .collect();

                    let mut buffered: Vec<Option<Result<String, String>>> =
                        vec![None; tool_calls.len()];
                    while let Some((idx, output, duration_ms)) = pending.next().await {
                        let call_id = tool_calls[idx].id.clone();
                        // Clone into the event; the owned Result lands in
                        // `buffered[idx]` so the drain-loop tail can append
                        // history entries in original call order.
                        yield AgentEvent::ToolResult {
                            call_id,
                            output: output.clone(),
                            duration_ms,
                        };
                        buffered[idx] = Some(output);
                    }

                    for (tc, out) in tool_calls.iter().zip(buffered.into_iter()) {
                        let out = out.expect("FuturesUnordered drained every slot");
                        let entry = HistoryEntry::tool(
                            tool_output_text(&out),
                            tc.id.clone(),
                            &tc.function.name,
                        );
                        history.push(entry.clone());
                        tool_results.push(entry);
                    }

                    yield AgentEvent::ToolCallsComplete;
                }

                // Auto-compaction: check token estimate after each step.
                // NOTE(review): when the threshold is exceeded, this arm
                // `continue`s without recording the step in `steps` — even
                // when `compact()` returned None — so any final text the
                // step carried is dropped and the loop re-queries the model.
                // Confirm this is intended.
                if let Some(threshold) = self.config.compact_threshold
                    && Self::estimate_tokens(history) > threshold
                {
                    if let Some(summary) = self.compact(history).await {
                        yield AgentEvent::Compact { summary: summary.clone() };
                        *history = vec![HistoryEntry::user(&summary)];
                        yield AgentEvent::TextStart;
                        yield AgentEvent::TextDelta(
                            "\n[context compacted]\n".to_owned(),
                        );
                        yield AgentEvent::TextEnd;
                    }
                    continue;
                }

                let step = AgentStep {
                    message,
                    usage,
                    finish_reason,
                    tool_calls,
                    tool_results,
                };

                if !step.tool_calls.is_empty() {
                    // Tool round: record the step and loop for another model
                    // call with the tool results now in history.
                    steps.push(step);
                } else {
                    // No tool calls — this is the final step of the run.
                    let stop_reason = Self::stop_reason(&step);
                    steps.push(step);
                    yield AgentEvent::Done(AgentResponse {
                        final_response: content,
                        iterations: steps.len(),
                        stop_reason,
                        steps,
                        model: model_name.clone(),
                    });
                    return;
                }
            }

            // Iteration budget exhausted: surface the last step's text (if
            // any) as the final response.
            let final_response = steps
                .last()
                .and_then(|s| s.message.content.as_ref())
                .and_then(|v| v.as_str())
                .filter(|s| !s.is_empty())
                .map(|s| s.to_owned());
            yield AgentEvent::Done(AgentResponse {
                final_response,
                iterations: steps.len(),
                stop_reason: AgentStopReason::MaxIterations,
                steps,
                model: model_name,
            });
        }
    }
}