Skip to main content

walrus_core/agent/
mod.rs

1//! Immutable agent definition and execution methods.
2//!
3//! [`Agent`] owns its configuration, model, tool schemas, and an optional
4//! [`ToolSender`] for dispatching tool calls to the runtime. Conversation
5//! history is passed in externally — the agent itself is stateless.
6//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
7//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
8//! `run()` collects its events and returns the final response.
9
10use crate::model::{
11    Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12    Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use compact::COMPACT_SENTINEL;
18pub use config::AgentConfig;
19use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
20use futures_core::Stream;
21use futures_util::StreamExt;
22use tokio::sync::{mpsc, oneshot};
23pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
24
25mod builder;
26mod compact;
27pub mod config;
28pub mod event;
29pub mod tool;
30
31/// Extract sender from the last user message in history.
32fn last_sender(history: &[Message]) -> compact_str::CompactString {
33    history
34        .iter()
35        .rev()
36        .find(|m| m.role == Role::User)
37        .map(|m| m.sender.clone())
38        .unwrap_or_default()
39}
40
41/// An immutable agent definition.
42///
43/// Generic over `M: Model` — stores the model provider alongside config,
44/// tool schemas, and an optional sender for tool dispatch. Conversation
45/// history is owned externally and passed into execution methods.
46/// Callers drive execution via `step()` (single LLM round), `run()` (loop to
47/// completion), or `run_stream()` (yields events as a stream).
48pub struct Agent<M: Model> {
49    /// Agent configuration (name, prompt, model, limits, tool_choice).
50    pub config: AgentConfig,
51    /// The model provider for LLM calls.
52    model: M,
53    /// Tool schemas advertised to the LLM. Set once at build time.
54    tools: Vec<Tool>,
55    /// Sender for dispatching tool calls to the runtime. None = no tools.
56    tool_tx: Option<ToolSender>,
57}
58
59impl<M: Model> Agent<M> {
60    /// Perform a single LLM round: send request, dispatch tools, return step.
61    ///
62    /// Composes a [`Request`] from config state (system prompt + history +
63    /// tool schemas), calls the stored model, dispatches any tool calls via
64    /// the [`ToolSender`] channel, and appends results to history.
65    pub async fn step(&self, history: &mut Vec<Message>) -> Result<AgentStep> {
66        let model_name = self
67            .config
68            .model
69            .clone()
70            .unwrap_or_else(|| self.model.active_model());
71
72        let mut messages = Vec::with_capacity(1 + history.len());
73        if !self.config.system_prompt.is_empty() {
74            messages.push(Message::system(&self.config.system_prompt));
75        }
76        messages.extend(history.iter().cloned());
77
78        let mut request = Request::new(model_name)
79            .with_messages(messages)
80            .with_tool_choice(self.config.tool_choice.clone())
81            .with_think(self.config.thinking);
82        if !self.tools.is_empty() {
83            request = request.with_tools(self.tools.clone());
84        }
85
86        let response = self.model.send(&request).await?;
87        let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
88
89        if let Some(msg) = response.message() {
90            history.push(msg);
91        }
92
93        let mut tool_results = Vec::new();
94        if !tool_calls.is_empty() {
95            let sender = last_sender(history);
96            for tc in &tool_calls {
97                let result = self
98                    .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
99                    .await;
100                let msg = Message::tool(&result, tc.id.clone());
101                history.push(msg.clone());
102                tool_results.push(msg);
103            }
104        }
105
106        Ok(AgentStep {
107            response,
108            tool_calls,
109            tool_results,
110        })
111    }
112
113    /// Dispatch a single tool call via the tool sender channel.
114    ///
115    /// Returns the result string. If no sender is configured, returns an error
116    /// message without panicking.
117    async fn dispatch_tool(&self, name: &str, args: &str, sender: &str) -> String {
118        let Some(tx) = &self.tool_tx else {
119            return format!("tool '{name}' called but no tool sender configured");
120        };
121        let (reply_tx, reply_rx) = oneshot::channel();
122        let req = ToolRequest {
123            name: name.to_owned(),
124            args: args.to_owned(),
125            agent: self.config.name.to_string(),
126            reply: reply_tx,
127            task_id: None,
128            sender: sender.into(),
129        };
130        if tx.send(req).is_err() {
131            return format!("tool channel closed while calling '{name}'");
132        }
133        reply_rx
134            .await
135            .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
136    }
137
138    /// Determine the stop reason for a step with no tool calls.
139    fn stop_reason(step: &AgentStep) -> AgentStopReason {
140        if step.response.content().is_some() {
141            AgentStopReason::TextResponse
142        } else {
143            AgentStopReason::NoAction
144        }
145    }
146
147    /// Run the agent loop to completion, returning the final response.
148    ///
149    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
150    /// `events`, and extracts the `Done` response.
151    pub async fn run(
152        &self,
153        history: &mut Vec<Message>,
154        events: mpsc::UnboundedSender<AgentEvent>,
155    ) -> AgentResponse {
156        let mut stream = std::pin::pin!(self.run_stream(history));
157        let mut response = None;
158        while let Some(event) = stream.next().await {
159            if let AgentEvent::Done(ref resp) = event {
160                response = Some(resp.clone());
161            }
162            let _ = events.send(event);
163        }
164
165        response.unwrap_or_else(|| AgentResponse {
166            final_response: None,
167            iterations: 0,
168            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
169            steps: vec![],
170        })
171    }
172
173    /// Run the agent loop as a stream of [`AgentEvent`]s.
174    ///
175    /// Uses the model's streaming API so text deltas are yielded token-by-token.
176    /// Tool call responses are dispatched after the stream completes (arguments
177    /// arrive incrementally and must be fully accumulated first).
178    pub fn run_stream<'a>(
179        &'a self,
180        history: &'a mut Vec<Message>,
181    ) -> impl Stream<Item = AgentEvent> + 'a {
182        stream! {
183            let mut steps = Vec::new();
184            let max = self.config.max_iterations;
185
186            for _ in 0..max {
187                // Build the request (same logic as step()).
188                let model_name = self
189                    .config
190                    .model
191                    .clone()
192                    .unwrap_or_else(|| self.model.active_model());
193
194                let mut messages = Vec::with_capacity(1 + history.len());
195                if !self.config.system_prompt.is_empty() {
196                    messages.push(Message::system(&self.config.system_prompt));
197                }
198                messages.extend(history.iter().cloned());
199
200                let mut request = Request::new(model_name)
201                    .with_messages(messages)
202                    .with_tool_choice(self.config.tool_choice.clone())
203                    .with_think(self.config.thinking);
204                if !self.tools.is_empty() {
205                    request = request.with_tools(self.tools.clone());
206                }
207
208                // Stream from the model, yielding text deltas as they arrive.
209                let mut builder = MessageBuilder::new(Role::Assistant);
210                let mut finish_reason = None;
211                let mut last_meta = CompletionMeta::default();
212                let mut last_usage = None;
213                let mut stream_error = None;
214
215                {
216                    let mut chunk_stream = std::pin::pin!(self.model.stream(request));
217                    while let Some(result) = chunk_stream.next().await {
218                        match result {
219                            Ok(chunk) => {
220                                if let Some(text) = chunk.content() {
221                                    yield AgentEvent::TextDelta(text.to_owned());
222                                }
223                                if let Some(reason) = chunk.reasoning_content() {
224                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
225                                }
226                                if let Some(r) = chunk.reason() {
227                                    finish_reason = Some(*r);
228                                }
229                                last_meta = chunk.meta.clone();
230                                if chunk.usage.is_some() {
231                                    last_usage = chunk.usage.clone();
232                                }
233                                builder.accept(&chunk);
234                            }
235                            Err(e) => {
236                                stream_error = Some(e.to_string());
237                                break;
238                            }
239                        }
240                    }
241                }
242                if let Some(e) = stream_error {
243                    yield AgentEvent::Done(AgentResponse {
244                        final_response: None,
245                        iterations: steps.len(),
246                        stop_reason: AgentStopReason::Error(e),
247                        steps,
248                    });
249                    return;
250                }
251
252                // Build the accumulated message and response.
253                let msg = builder.build();
254                let tool_calls = msg.tool_calls.to_vec();
255                let content = if msg.content.is_empty() {
256                    None
257                } else {
258                    Some(msg.content.clone())
259                };
260
261                let response = Response {
262                    meta: last_meta,
263                    choices: vec![Choice {
264                        index: 0,
265                        delta: Delta {
266                            role: Some(Role::Assistant),
267                            content: content.clone(),
268                            reasoning_content: if msg.reasoning_content.is_empty() {
269                                None
270                            } else {
271                                Some(msg.reasoning_content.clone())
272                            },
273                            tool_calls: if tool_calls.is_empty() {
274                                None
275                            } else {
276                                Some(tool_calls.clone())
277                            },
278                        },
279                        finish_reason,
280                        logprobs: None,
281                    }],
282                    usage: last_usage.unwrap_or(Usage {
283                        prompt_tokens: 0,
284                        completion_tokens: 0,
285                        total_tokens: 0,
286                        prompt_cache_hit_tokens: None,
287                        prompt_cache_miss_tokens: None,
288                        completion_tokens_details: None,
289                    }),
290                };
291
292                history.push(msg);
293                let has_tool_calls = !tool_calls.is_empty();
294
295                // Dispatch tool calls if any.
296                let mut tool_results = Vec::new();
297                let mut compact_triggered = false;
298                if has_tool_calls {
299                    let sender = last_sender(history);
300                    yield AgentEvent::ToolCallsStart(tool_calls.clone());
301                    for tc in &tool_calls {
302                        let result = self
303                            .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
304                            .await;
305                        if result.starts_with(compact::COMPACT_SENTINEL) {
306                            compact_triggered = true;
307                        }
308                        let msg = Message::tool(&result, tc.id.clone());
309                        history.push(msg.clone());
310                        tool_results.push(msg);
311                        yield AgentEvent::ToolResult {
312                            call_id: tc.id.clone(),
313                            output: result,
314                        };
315                    }
316                    yield AgentEvent::ToolCallsComplete;
317                }
318
319                // Handle compaction: summarize history, store journal, replace.
320                if compact_triggered {
321                    if let Some(summary) = self.compact(history).await {
322                        // Store journal entry via internal tool dispatch.
323                        let _ = self.dispatch_tool("__journal__", &summary, "").await;
324                        // Replace history with the summary.
325                        *history = vec![Message::user(&summary)];
326                        yield AgentEvent::TextDelta(
327                            "\n[context compacted]\n".to_owned(),
328                        );
329                    }
330                    // Continue the agent loop with compact context.
331                    continue;
332                }
333
334                let step = AgentStep {
335                    response,
336                    tool_calls,
337                    tool_results,
338                };
339
340                if !has_tool_calls {
341                    let stop_reason = Self::stop_reason(&step);
342                    steps.push(step);
343                    yield AgentEvent::Done(AgentResponse {
344                        final_response: content,
345                        iterations: steps.len(),
346                        stop_reason,
347                        steps,
348                    });
349                    return;
350                }
351
352                steps.push(step);
353            }
354
355            let final_response = steps.last().and_then(|s| s.response.content().cloned());
356            yield AgentEvent::Done(AgentResponse {
357                final_response,
358                iterations: steps.len(),
359                stop_reason: AgentStopReason::MaxIterations,
360                steps,
361            });
362        }
363    }
364}