ds_api/agent/agent_core.rs
1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
/// A tool call fragment emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// In streaming mode multiple `ToolCallChunk`s are emitted per tool call:
/// the first has an empty `delta` (name is known, no args yet); subsequent
/// chunks carry incremental argument JSON. In non-streaming mode a single
/// chunk is emitted with the complete argument JSON in `delta`.
///
/// Chunks compare by value, so consumers and tests can use `==` /
/// `assert_eq!` on them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolCallChunk {
    /// Identifier of the tool call this fragment belongs to. Accumulate
    /// `delta` values by `id` to rebuild the full argument JSON.
    pub id: String,
    /// Name of the tool being called.
    pub name: String,
    /// Argument JSON carried by this fragment: empty for the first streaming
    /// chunk, incremental afterwards, complete in non-streaming mode.
    pub delta: String,
    /// NOTE(review): presumably the position of this call among the tool
    /// calls of the current turn — confirm against the stream implementation.
    pub index: u32,
}
23
/// The result of a completed tool invocation.
///
/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Identifier of the tool call that produced this result.
    pub id: String,
    /// Name of the tool that was invoked.
    pub name: String,
    /// NOTE(review): presumably the complete argument JSON the tool was
    /// called with — confirm against the code that builds this struct.
    pub args: String,
    /// JSON value produced by the tool.
    pub result: Value,
}
34
/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// Each variant represents a distinct, self-contained event in the agent lifecycle:
///
/// - `Token(String)` — a text fragment from the assistant. In streaming mode each
///   `Token` is a single SSE delta; in non-streaming mode the full response text
///   arrives as one `Token`.
/// - `ReasoningToken(String)` — a fragment of reasoning/thinking content, for
///   models that produce it (e.g. deepseek-reasoner).
/// - `ToolCall(ToolCallChunk)` — a tool call fragment. Behaves exactly like
///   `Token`: in streaming mode one event is emitted per SSE chunk (first chunk has
///   an empty `delta` and carries the tool name; subsequent chunks carry incremental
///   argument JSON). In non-streaming mode a single event is emitted with the
///   complete arguments string. Accumulate `delta` values by `id` to reconstruct
///   the full argument JSON. Execution begins after all chunks for a turn are
///   delivered.
/// - `ToolResult(ToolCallResult)` — a tool has finished executing. One event is
///   emitted per call, in the same order as the corresponding `ToolCall` events.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    /// A text fragment of the assistant's reply.
    Token(String),
    /// Emitted when the model produces reasoning/thinking content (e.g. deepseek-reasoner).
    /// In streaming mode this arrives token-by-token before the main reply.
    ReasoningToken(String),
    /// A fragment of a tool call requested by the model.
    ToolCall(ToolCallChunk),
    /// The outcome of a tool call that has finished executing.
    ToolResult(ToolCallResult),
}
60
/// An agent that combines a [`Conversation`] with a set of callable tools.
///
/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
/// to start a turn:
///
/// ```no_run
/// use ds_api::{DeepseekAgent, tool};
/// use serde_json::{Value, json};
///
/// struct MyTool;
///
/// #[tool]
/// impl ds_api::Tool for MyTool {
///     async fn greet(&self, name: String) -> Value {
///         json!({ "greeting": format!("Hello, {name}!") })
///     }
/// }
///
/// # #[tokio::main] async fn main() {
/// let agent = DeepseekAgent::new("sk-...")
///     .add_tool(MyTool);
/// # }
/// ```
pub struct DeepseekAgent {
    /// The conversation manages history, the API client, and context-window compression.
    pub(crate) conversation: Conversation,
    /// Registered tools, in registration order.
    pub(crate) tools: Vec<Box<dyn Tool>>,
    /// Maps each protocol-level function name to the position in `tools` of
    /// the tool that implements it.
    pub(crate) tool_index: HashMap<String, usize>,
    /// When `true` the agent uses SSE streaming for each API turn so `Token` events
    /// arrive incrementally. When `false` (default) the full response is awaited.
    pub(crate) streaming: bool,
    /// The model to use for every API turn. Defaults to `"deepseek-chat"`.
    pub(crate) model: String,
    /// Always-on channel for injecting user messages mid-loop.
    /// Sender half; clones are handed out by `interrupt_sender`.
    pub(crate) interrupt_tx: mpsc::UnboundedSender<String>,
    /// Receiver half of the interrupt channel, drained by `drain_interrupts`.
    pub(crate) interrupt_rx: mpsc::UnboundedReceiver<String>,
    /// Always-on channel for injecting or removing tools at runtime.
    /// Sender half; clones are handed out by `tool_inject_sender`.
    pub(crate) tool_inject_tx: mpsc::UnboundedSender<ToolInjection>,
    /// Receiver half of the tool-injection channel, drained by
    /// `drain_tool_injections`.
    pub(crate) tool_inject_rx: mpsc::UnboundedReceiver<ToolInjection>,
    /// Optional map of extra top-level JSON fields to merge into the API request body.
    pub(crate) extra_body: Option<serde_json::Map<String, serde_json::Value>>,
}
103
/// A runtime tool-injection command sent through the channel whose sender is
/// obtained from [`DeepseekAgent::tool_inject_sender`].
///
/// Pending commands are applied by the agent in the order they were sent.
pub enum ToolInjection {
    /// Add a new tool to the running agent.
    Add(Box<dyn Tool>),
    /// Remove every tool that exposes at least one function (as reported by
    /// `raw_tools()`) whose name is in the given list. A matching tool is
    /// removed as a whole, including any other functions it exposes.
    Remove(Vec<String>),
}
112
113impl DeepseekAgent {
114 fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
115 let model = model.into();
116 let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
117 let (interrupt_tx, interrupt_rx) = mpsc::unbounded_channel();
118 let (tool_inject_tx, tool_inject_rx) = mpsc::unbounded_channel();
119 Self {
120 conversation: Conversation::new(client).with_summarizer(summarizer),
121 tools: vec![],
122 tool_index: HashMap::new(),
123 streaming: false,
124 model,
125 interrupt_tx,
126 interrupt_rx,
127 tool_inject_tx,
128 tool_inject_rx,
129 extra_body: None,
130 }
131 }
132
133 /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
134 pub fn new(token: impl Into<String>) -> Self {
135 Self::from_parts(ApiClient::new(token), "deepseek-chat")
136 }
137
138 /// Create an agent targeting an OpenAI-compatible provider.
139 ///
140 /// All three parameters are set at construction time and never change:
141 ///
142 /// ```no_run
143 /// use ds_api::DeepseekAgent;
144 ///
145 /// let agent = DeepseekAgent::custom(
146 /// "sk-or-...",
147 /// "https://openrouter.ai/api/v1",
148 /// "meta-llama/llama-3.3-70b-instruct:free",
149 /// );
150 /// ```
151 pub fn custom(
152 token: impl Into<String>,
153 base_url: impl Into<String>,
154 model: impl Into<String>,
155 ) -> Self {
156 let client = ApiClient::new(token).with_base_url(base_url);
157 Self::from_parts(client, model)
158 }
159
160 /// Register a tool (builder-style, supports chaining).
161 ///
162 /// The tool's protocol-level function names are indexed so incoming tool-call
163 /// requests from the model can be dispatched to the correct implementation.
164 pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
165 let idx = self.tools.len();
166 for raw in tool.raw_tools() {
167 self.tool_index.insert(raw.function.name.clone(), idx);
168 }
169 self.tools.push(Box::new(tool));
170 self
171 }
172
173 /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
174 /// that drives the full agent loop (API calls + tool execution).
175 pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
176 self.conversation.push_user_input(user_message);
177 crate::agent::stream::AgentStream::new(self)
178 }
179
180 /// Start an agent turn from the current history **without** pushing a new
181 /// user message first.
182 ///
183 /// Use this when you have already appended the user message manually (e.g.
184 /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
185 /// and want to drive the agent loop from that point.
186 pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
187 crate::agent::stream::AgentStream::new(self)
188 }
189
190 /// Enable SSE streaming for each API turn (builder-style).
191 pub fn with_streaming(mut self) -> Self {
192 self.streaming = true;
193 self
194 }
195
196 /// Merge arbitrary top-level JSON key/value pairs into the request body for
197 /// the next API turn. The pairs are stored on the agent and later merged
198 /// into the `ApiRequest` raw body when a request is built.
199 ///
200 /// Example:
201 /// let mut map = serde_json::Map::new();
202 /// map.insert(\"foo\".to_string(), serde_json::json!(\"bar\"));
203 /// let agent = DeepseekAgent::new(\"sk-...\").extra_body(map);
204 pub fn extra_body(mut self, map: serde_json::Map<String, serde_json::Value>) -> Self {
205 if let Some(ref mut existing) = self.extra_body {
206 existing.extend(map);
207 } else {
208 self.extra_body = Some(map);
209 }
210 self
211 }
212
213 /// Add a single extra top-level field to be merged into the request body.
214 /// Convenience helper to avoid constructing a full map.
215 pub fn extra_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
216 if let Some(ref mut m) = self.extra_body {
217 m.insert(key.into(), value);
218 } else {
219 let mut m = serde_json::Map::new();
220 m.insert(key.into(), value);
221 self.extra_body = Some(m);
222 }
223 self
224 }
225
226 /// Prepend a permanent system prompt to the conversation history (builder-style).
227 ///
228 /// System messages added this way are never removed by the built-in summarizers.
229 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
230 self.conversation
231 .history_mut()
232 .insert(0, Message::new(Role::System, &prompt.into()));
233 self
234 }
235
236 /// Replace the summarizer used for context-window management (builder-style).
237 pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
238 self.conversation = self.conversation.with_summarizer(summarizer);
239 self
240 }
241
242 /// Seed the agent with an existing message history (builder-style).
243 ///
244 /// Used to restore a conversation from persistent storage (e.g. SQLite)
245 /// after a process restart. The messages are set directly on the
246 /// underlying `Conversation` and will be included in the next API call.
247 ///
248 /// # Example
249 ///
250 /// ```no_run
251 /// use ds_api::DeepseekAgent;
252 /// use ds_api::raw::request::message::{Message, Role};
253 ///
254 /// # #[tokio::main] async fn main() {
255 /// let history = vec![
256 /// Message::new(Role::User, "Hello"),
257 /// Message::new(Role::Assistant, "Hi there!"),
258 /// ];
259 /// let agent = DeepseekAgent::new("sk-...").with_history(history);
260 /// # }
261 /// ```
262 pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
263 self.conversation = self.conversation.with_history(history);
264 self
265 }
266
267 /// Append a user message with an optional display name to the conversation
268 /// history.
269 ///
270 /// The `name` field is passed through to the API as-is (OpenAI-compatible
271 /// providers use it to distinguish speakers in a shared channel).
272 ///
273 /// # Example
274 ///
275 /// ```no_run
276 /// use ds_api::DeepseekAgent;
277 ///
278 /// # #[tokio::main] async fn main() {
279 /// let mut agent = DeepseekAgent::new("sk-...");
280 /// agent.push_user_message_with_name("What time is it?", Some("alice"));
281 /// # }
282 /// ```
283 pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
284 use crate::raw::request::message::{Message, Role};
285 let mut msg = Message::new(Role::User, text);
286 msg.name = name.map(|n| n.to_string());
287 self.conversation.history_mut().push(msg);
288 }
289
290 /// Read-only view of the current conversation history.
291 ///
292 /// Returns all messages in order, including system prompts, user turns,
293 /// assistant replies, tool calls, and tool results. Auto-summary messages
294 /// inserted by the built-in summarizers are also included.
295 ///
296 /// # Example
297 ///
298 /// ```no_run
299 /// use ds_api::DeepseekAgent;
300 ///
301 /// # #[tokio::main] async fn main() {
302 /// let agent = DeepseekAgent::new("sk-...");
303 /// for msg in agent.history() {
304 /// println!("{:?}: {:?}", msg.role, msg.content);
305 /// }
306 /// # }
307 /// ```
308 pub fn history(&self) -> &[crate::raw::request::message::Message] {
309 self.conversation.history()
310 }
311
312 /// Clone the sender half of the interrupt channel.
313 ///
314 /// Send any `String` at any time; the message will be appended to history
315 /// as a `Role::User` message before the next API turn.
316 pub fn interrupt_sender(&self) -> mpsc::UnboundedSender<String> {
317 self.interrupt_tx.clone()
318 }
319
320 /// Clone the sender half of the tool-injection channel.
321 ///
322 /// Send [`ToolInjection::Add`] or [`ToolInjection::Remove`] at any time;
323 /// changes are applied at the next `Idle` transition.
324 pub fn tool_inject_sender(&self) -> mpsc::UnboundedSender<ToolInjection> {
325 self.tool_inject_tx.clone()
326 }
327
328 /// Drain any pending messages from the interrupt channel and append them
329 /// to the conversation history as `Role::User` messages.
330 ///
331 /// Called by the state machine at every `Idle` transition.
332 pub(crate) fn drain_interrupts(&mut self) {
333 while let Ok(msg) = self.interrupt_rx.try_recv() {
334 self.conversation
335 .history_mut()
336 .push(Message::new(Role::User, &msg));
337 }
338 }
339
340 /// Drain any pending [`ToolInjection`]s and apply them.
341 ///
342 /// Called by the state machine at every `Idle` transition, right after
343 /// `drain_interrupts`, so added/removed tools take effect before the next
344 /// API turn.
345 pub(crate) fn drain_tool_injections(&mut self) {
346 while let Ok(injection) = self.tool_inject_rx.try_recv() {
347 match injection {
348 ToolInjection::Add(tool) => {
349 let idx = self.tools.len();
350 for raw in tool.raw_tools() {
351 self.tool_index.insert(raw.function.name.clone(), idx);
352 }
353 self.tools.push(tool);
354 }
355 ToolInjection::Remove(names) => {
356 let names_set: std::collections::HashSet<&str> =
357 names.iter().map(String::as_str).collect();
358 let mut new_tools: Vec<Box<dyn Tool>> = Vec::new();
359 let mut new_index: HashMap<String, usize> = HashMap::new();
360 for tool in self.tools.drain(..) {
361 let raws = tool.raw_tools();
362 if raws.iter().any(|r| names_set.contains(r.function.name.as_str())) {
363 continue;
364 }
365 let idx = new_tools.len();
366 for raw in raws {
367 new_index.insert(raw.function.name.clone(), idx);
368 }
369 new_tools.push(tool);
370 }
371 self.tools = new_tools;
372 self.tool_index = new_index;
373 }
374 }
375 }
376 }
377}