ds_api/agent/agent_core.rs
1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
10/// A tool call fragment emitted by [`AgentStream`][crate::agent::AgentStream].
11///
12/// In streaming mode multiple `ToolCallChunk`s are emitted per tool call:
13/// the first has an empty `delta` (name is known, no args yet); subsequent
14/// chunks carry incremental argument JSON. In non-streaming mode a single
15/// chunk is emitted with the complete argument JSON in `delta`.
16#[derive(Debug, Clone)]
17pub struct ToolCallChunk {
18 pub id: String,
19 pub name: String,
20 pub delta: String,
21 pub index: u32,
22}
23
24/// The result of a completed tool invocation.
25///
26/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
27#[derive(Debug, Clone)]
28pub struct ToolCallResult {
29 pub id: String,
30 pub name: String,
31 pub args: String,
32 pub result: Value,
33}
34
35/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
36///
37/// Each variant represents a distinct, self-contained event in the agent lifecycle:
38///
39/// - `Token(String)` — a text fragment from the assistant. In streaming mode each
40/// `Token` is a single SSE delta; in non-streaming mode the full response text
41/// arrives as one `Token`.
42/// - `ToolCall(id, name, delta)` — a tool call fragment. Behaves exactly like
43/// `Token`: in streaming mode one event is emitted per SSE chunk (first chunk has
44/// an empty `delta` and carries the tool name; subsequent chunks carry incremental
45/// argument JSON). In non-streaming mode a single event is emitted with the
46/// complete arguments string. Accumulate `delta` values by `id` to reconstruct
47/// the full argument JSON. Execution begins after all chunks for a turn are
48/// delivered.
49/// - `ToolResult(ToolCallResult)` — a tool has finished executing. One event is
50/// emitted per call, in the same order as the corresponding `ToolCall` events.
51#[derive(Debug, Clone)]
52pub enum AgentEvent {
53 Token(String),
54 /// Emitted when the model produces reasoning/thinking content (e.g. deepseek-reasoner).
55 /// In streaming mode this arrives token-by-token before the main reply.
56 ReasoningToken(String),
57 ToolCall(ToolCallChunk),
58 ToolResult(ToolCallResult),
59}
60
61/// An agent that combines a [`Conversation`] with a set of callable tools.
62///
63/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
64/// to start a turn:
65///
66/// ```no_run
67/// use ds_api::{DeepseekAgent, tool};
68/// use serde_json::{Value, json};
69///
70/// struct MyTool;
71///
72/// #[tool]
73/// impl ds_api::Tool for MyTool {
74/// async fn greet(&self, name: String) -> Value {
75/// json!({ "greeting": format!("Hello, {name}!") })
76/// }
77/// }
78///
79/// # #[tokio::main] async fn main() {
80/// let agent = DeepseekAgent::new("sk-...")
81/// .add_tool(MyTool);
82/// # }
83/// ```
84pub struct DeepseekAgent {
85 /// The conversation manages history, the API client, and context-window compression.
86 pub(crate) conversation: Conversation,
87 pub(crate) tools: Vec<Box<dyn Tool>>,
88 pub(crate) tool_index: HashMap<String, usize>,
89 /// When `true` the agent uses SSE streaming for each API turn so `Token` events
90 /// arrive incrementally. When `false` (default) the full response is awaited.
91 pub(crate) streaming: bool,
92 /// The model to use for every API turn. Defaults to `"deepseek-chat"`.
93 pub(crate) model: String,
94 /// Optional channel for injecting user messages mid-loop.
95 /// Messages received here are drained after each tool-execution round and
96 /// appended to the conversation history as `Role::User` messages before the
97 /// next API turn begins.
98 pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
99 /// Optional map of extra top-level JSON fields to merge into the API request body.
100 /// This is used by the builder helpers below to attach custom provider-specific
101 /// fields that the typed request doesn't yet expose.
102 pub(crate) extra_body: Option<serde_json::Map<String, serde_json::Value>>,
103 /// Optional channel for injecting or removing tools at runtime.
104 /// Messages received here are drained at every `Idle` transition,
105 /// before the next API turn begins — same timing as `interrupt_rx`.
106 pub(crate) tool_inject_rx: Option<mpsc::UnboundedReceiver<ToolInjection>>,
107}
108
109/// A runtime tool-injection command sent through the channel created by
110/// [`DeepseekAgent::with_tool_inject_channel`].
111pub enum ToolInjection {
112 /// Add a new tool to the running agent.
113 Add(Box<dyn Tool>),
114 /// Remove all tools whose `raw_tools()` names are in the given set.
115 Remove(Vec<String>),
116}
117
118impl DeepseekAgent {
119 fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
120 let model = model.into();
121 let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
122 Self {
123 conversation: Conversation::new(client).with_summarizer(summarizer),
124 tools: vec![],
125 tool_index: HashMap::new(),
126 streaming: false,
127 model,
128 interrupt_rx: None,
129 extra_body: None,
130 tool_inject_rx: None,
131 }
132 }
133
134 /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
135 pub fn new(token: impl Into<String>) -> Self {
136 Self::from_parts(ApiClient::new(token), "deepseek-chat")
137 }
138
139 /// Create an agent targeting an OpenAI-compatible provider.
140 ///
141 /// All three parameters are set at construction time and never change:
142 ///
143 /// ```no_run
144 /// use ds_api::DeepseekAgent;
145 ///
146 /// let agent = DeepseekAgent::custom(
147 /// "sk-or-...",
148 /// "https://openrouter.ai/api/v1",
149 /// "meta-llama/llama-3.3-70b-instruct:free",
150 /// );
151 /// ```
152 pub fn custom(
153 token: impl Into<String>,
154 base_url: impl Into<String>,
155 model: impl Into<String>,
156 ) -> Self {
157 let client = ApiClient::new(token).with_base_url(base_url);
158 Self::from_parts(client, model)
159 }
160
161 /// Register a tool (builder-style, supports chaining).
162 ///
163 /// The tool's protocol-level function names are indexed so incoming tool-call
164 /// requests from the model can be dispatched to the correct implementation.
165 pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
166 let idx = self.tools.len();
167 for raw in tool.raw_tools() {
168 self.tool_index.insert(raw.function.name.clone(), idx);
169 }
170 self.tools.push(Box::new(tool));
171 self
172 }
173
174 /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
175 /// that drives the full agent loop (API calls + tool execution).
176 pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
177 self.conversation.push_user_input(user_message);
178 crate::agent::stream::AgentStream::new(self)
179 }
180
181 /// Start an agent turn from the current history **without** pushing a new
182 /// user message first.
183 ///
184 /// Use this when you have already appended the user message manually (e.g.
185 /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
186 /// and want to drive the agent loop from that point.
187 pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
188 crate::agent::stream::AgentStream::new(self)
189 }
190
191 /// Enable SSE streaming for each API turn (builder-style).
192 pub fn with_streaming(mut self) -> Self {
193 self.streaming = true;
194 self
195 }
196
197 /// Merge arbitrary top-level JSON key/value pairs into the request body for
198 /// the next API turn. The pairs are stored on the agent and later merged
199 /// into the `ApiRequest` raw body when a request is built.
200 ///
201 /// Example:
202 /// let mut map = serde_json::Map::new();
203 /// map.insert(\"foo\".to_string(), serde_json::json!(\"bar\"));
204 /// let agent = DeepseekAgent::new(\"sk-...\").extra_body(map);
205 pub fn extra_body(mut self, map: serde_json::Map<String, serde_json::Value>) -> Self {
206 if let Some(ref mut existing) = self.extra_body {
207 existing.extend(map);
208 } else {
209 self.extra_body = Some(map);
210 }
211 self
212 }
213
214 /// Add a single extra top-level field to be merged into the request body.
215 /// Convenience helper to avoid constructing a full map.
216 pub fn extra_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
217 if let Some(ref mut m) = self.extra_body {
218 m.insert(key.into(), value);
219 } else {
220 let mut m = serde_json::Map::new();
221 m.insert(key.into(), value);
222 self.extra_body = Some(m);
223 }
224 self
225 }
226
227 /// Prepend a permanent system prompt to the conversation history (builder-style).
228 ///
229 /// System messages added this way are never removed by the built-in summarizers.
230 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
231 self.conversation
232 .history_mut()
233 .insert(0, Message::new(Role::System, &prompt.into()));
234 self
235 }
236
237 /// Replace the summarizer used for context-window management (builder-style).
238 pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
239 self.conversation = self.conversation.with_summarizer(summarizer);
240 self
241 }
242
243 /// Seed the agent with an existing message history (builder-style).
244 ///
245 /// Used to restore a conversation from persistent storage (e.g. SQLite)
246 /// after a process restart. The messages are set directly on the
247 /// underlying `Conversation` and will be included in the next API call.
248 ///
249 /// # Example
250 ///
251 /// ```no_run
252 /// use ds_api::DeepseekAgent;
253 /// use ds_api::raw::request::message::{Message, Role};
254 ///
255 /// # #[tokio::main] async fn main() {
256 /// let history = vec![
257 /// Message::new(Role::User, "Hello"),
258 /// Message::new(Role::Assistant, "Hi there!"),
259 /// ];
260 /// let agent = DeepseekAgent::new("sk-...").with_history(history);
261 /// # }
262 /// ```
263 pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
264 self.conversation = self.conversation.with_history(history);
265 self
266 }
267
268 /// Append a user message with an optional display name to the conversation
269 /// history.
270 ///
271 /// The `name` field is passed through to the API as-is (OpenAI-compatible
272 /// providers use it to distinguish speakers in a shared channel).
273 ///
274 /// # Example
275 ///
276 /// ```no_run
277 /// use ds_api::DeepseekAgent;
278 ///
279 /// # #[tokio::main] async fn main() {
280 /// let mut agent = DeepseekAgent::new("sk-...");
281 /// agent.push_user_message_with_name("What time is it?", Some("alice"));
282 /// # }
283 /// ```
284 pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
285 use crate::raw::request::message::{Message, Role};
286 let mut msg = Message::new(Role::User, text);
287 msg.name = name.map(|n| n.to_string());
288 self.conversation.history_mut().push(msg);
289 }
290
291 /// Read-only view of the current conversation history.
292 ///
293 /// Returns all messages in order, including system prompts, user turns,
294 /// assistant replies, tool calls, and tool results. Auto-summary messages
295 /// inserted by the built-in summarizers are also included.
296 ///
297 /// # Example
298 ///
299 /// ```no_run
300 /// use ds_api::DeepseekAgent;
301 ///
302 /// # #[tokio::main] async fn main() {
303 /// let agent = DeepseekAgent::new("sk-...");
304 /// for msg in agent.history() {
305 /// println!("{:?}: {:?}", msg.role, msg.content);
306 /// }
307 /// # }
308 /// ```
309 pub fn history(&self) -> &[crate::raw::request::message::Message] {
310 self.conversation.history()
311 }
312
313 /// Attach an interrupt channel to the agent (builder-style).
314 ///
315 /// Returns the agent and the sender half of the channel. Send any `String`
316 /// through the `UnboundedSender` at any time; the message will be picked up
317 /// after the current tool-execution round finishes and inserted into the
318 /// conversation history as a `Role::User` message before the next API turn.
319 ///
320 /// # Example
321 ///
322 /// ```no_run
323 /// use ds_api::DeepseekAgent;
324 /// use tokio::sync::mpsc;
325 ///
326 /// # #[tokio::main] async fn main() {
327 /// let (agent, tx) = DeepseekAgent::new("sk-...")
328 /// .with_interrupt_channel();
329 ///
330 /// // In another task or callback:
331 /// tx.send("Actually, use Python instead.".into()).unwrap();
332 /// # }
333 /// ```
334 pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
335 let (tx, rx) = mpsc::unbounded_channel();
336 self.interrupt_rx = Some(rx);
337 (self, tx)
338 }
339
340 /// Drain any pending messages from the interrupt channel and append them
341 /// to the conversation history as `Role::User` messages.
342 ///
343 /// Called by the state machine in [`AgentStream`] at the top of every
344 /// `Idle` transition so that injected messages are visible before each API
345 /// turn, not just after tool-execution rounds.
346 pub(crate) fn drain_interrupts(&mut self) {
347 if let Some(rx) = self.interrupt_rx.as_mut() {
348 while let Ok(msg) = rx.try_recv() {
349 self.conversation
350 .history_mut()
351 .push(Message::new(Role::User, &msg));
352 }
353 }
354 }
355
356 /// Drain any pending [`ToolInjection`]s from the channel and apply them.
357 ///
358 /// Called by the state machine at every `Idle` transition, right after
359 /// `drain_interrupts`, so added/removed tools take effect before the next
360 /// API turn.
361 pub(crate) fn drain_tool_injections(&mut self) {
362 if let Some(rx) = self.tool_inject_rx.as_mut() {
363 while let Ok(injection) = rx.try_recv() {
364 match injection {
365 ToolInjection::Add(tool) => {
366 let idx = self.tools.len();
367 for raw in tool.raw_tools() {
368 self.tool_index.insert(raw.function.name.clone(), idx);
369 }
370 self.tools.push(tool);
371 }
372 ToolInjection::Remove(names) => {
373 let names_set: std::collections::HashSet<&str> =
374 names.iter().map(String::as_str).collect();
375 let mut new_tools: Vec<Box<dyn Tool>> = Vec::new();
376 let mut new_index: HashMap<String, usize> = HashMap::new();
377 for tool in self.tools.drain(..) {
378 let raws = tool.raw_tools();
379 if raws.iter().any(|r| names_set.contains(r.function.name.as_str())) {
380 continue;
381 }
382 let idx = new_tools.len();
383 for raw in raws {
384 new_index.insert(raw.function.name.clone(), idx);
385 }
386 new_tools.push(tool);
387 }
388 self.tools = new_tools;
389 self.tool_index = new_index;
390 }
391 }
392 }
393 }
394 }
395
396 /// Attach a tool-injection channel to the agent (builder-style).
397 ///
398 /// Returns the agent and the sender half. Send [`ToolInjection::Add`] or
399 /// [`ToolInjection::Remove`] at any time; changes are applied at the next
400 /// `Idle` transition (i.e. before the API turn that follows the current
401 /// tool-execution round).
402 pub fn with_tool_inject_channel(
403 mut self,
404 ) -> (Self, mpsc::UnboundedSender<ToolInjection>) {
405 let (tx, rx) = mpsc::unbounded_channel();
406 self.tool_inject_rx = Some(rx);
407 (self, tx)
408 }
409}