Skip to main content

ds_api/agent/
agent_core.rs

1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
/// A tool call fragment emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// In streaming mode multiple `ToolCallChunk`s are emitted per tool call:
/// the first has an empty `delta` (name is known, no args yet); subsequent
/// chunks carry incremental argument JSON.  In non-streaming mode a single
/// chunk is emitted with the complete argument JSON in `delta`.
///
/// Chunks compare by value, so consumers and tests can use `==` /
/// `assert_eq!` on them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolCallChunk {
    /// Identifier of the tool call this fragment belongs to; accumulate
    /// `delta` values by `id` to rebuild the full argument JSON.
    pub id: String,
    /// Name of the tool function being called.
    pub name: String,
    /// Argument JSON carried by this fragment (empty on the first streamed
    /// chunk of a call).
    pub delta: String,
    /// NOTE(review): presumably the position of this call within the
    /// assistant turn — confirm against the stream implementation.
    pub index: u32,
}
23
24/// The result of a completed tool invocation.
25///
26/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
27#[derive(Debug, Clone)]
28pub struct ToolCallResult {
29    pub id: String,
30    pub name: String,
31    pub args: String,
32    pub result: Value,
33}
34
35/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
36///
37/// Each variant represents a distinct, self-contained event in the agent lifecycle:
38///
39/// - `Token(String)` — a text fragment from the assistant.  In streaming mode each
40///   `Token` is a single SSE delta; in non-streaming mode the full response text
41///   arrives as one `Token`.
42/// - `ToolCall(id, name, delta)` — a tool call fragment.  Behaves exactly like
43///   `Token`: in streaming mode one event is emitted per SSE chunk (first chunk has
44///   an empty `delta` and carries the tool name; subsequent chunks carry incremental
45///   argument JSON).  In non-streaming mode a single event is emitted with the
46///   complete arguments string.  Accumulate `delta` values by `id` to reconstruct
47///   the full argument JSON.  Execution begins after all chunks for a turn are
48///   delivered.
49/// - `ToolResult(ToolCallResult)` — a tool has finished executing.  One event is
50///   emitted per call, in the same order as the corresponding `ToolCall` events.
51#[derive(Debug, Clone)]
52pub enum AgentEvent {
53    Token(String),
54    /// Emitted when the model produces reasoning/thinking content (e.g. deepseek-reasoner).
55    /// In streaming mode this arrives token-by-token before the main reply.
56    ReasoningToken(String),
57    ToolCall(ToolCallChunk),
58    ToolResult(ToolCallResult),
59}
60
61/// An agent that combines a [`Conversation`] with a set of callable tools.
62///
63/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
64/// to start a turn:
65///
66/// ```no_run
67/// use ds_api::{DeepseekAgent, tool};
68/// use serde_json::{Value, json};
69///
70/// struct MyTool;
71///
72/// #[tool]
73/// impl ds_api::Tool for MyTool {
74///     async fn greet(&self, name: String) -> Value {
75///         json!({ "greeting": format!("Hello, {name}!") })
76///     }
77/// }
78///
79/// # #[tokio::main] async fn main() {
80/// let agent = DeepseekAgent::new("sk-...")
81///     .add_tool(MyTool);
82/// # }
83/// ```
84pub struct DeepseekAgent {
85    /// The conversation manages history, the API client, and context-window compression.
86    pub(crate) conversation: Conversation,
87    pub(crate) tools: Vec<Box<dyn Tool>>,
88    pub(crate) tool_index: HashMap<String, usize>,
89    /// When `true` the agent uses SSE streaming for each API turn so `Token` events
90    /// arrive incrementally.  When `false` (default) the full response is awaited.
91    pub(crate) streaming: bool,
92    /// The model to use for every API turn.  Defaults to `"deepseek-chat"`.
93    pub(crate) model: String,
94    /// Optional channel for injecting user messages mid-loop.
95    /// Messages received here are drained after each tool-execution round and
96    /// appended to the conversation history as `Role::User` messages before the
97    /// next API turn begins.
98    pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
99    /// Optional map of extra top-level JSON fields to merge into the API request body.
100    /// This is used by the builder helpers below to attach custom provider-specific
101    /// fields that the typed request doesn't yet expose.
102    pub(crate) extra_body: Option<serde_json::Map<String, serde_json::Value>>,
103    /// Optional channel for injecting or removing tools at runtime.
104    /// Messages received here are drained at every `Idle` transition,
105    /// before the next API turn begins — same timing as `interrupt_rx`.
106    pub(crate) tool_inject_rx: Option<mpsc::UnboundedReceiver<ToolInjection>>,
107}
108
109/// A runtime tool-injection command sent through the channel created by
110/// [`DeepseekAgent::with_tool_inject_channel`].
111pub enum ToolInjection {
112    /// Add a new tool to the running agent.
113    Add(Box<dyn Tool>),
114    /// Remove all tools whose `raw_tools()` names are in the given set.
115    Remove(Vec<String>),
116}
117
118impl DeepseekAgent {
119    fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
120        let model = model.into();
121        let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
122        Self {
123            conversation: Conversation::new(client).with_summarizer(summarizer),
124            tools: vec![],
125            tool_index: HashMap::new(),
126            streaming: false,
127            model,
128            interrupt_rx: None,
129            extra_body: None,
130            tool_inject_rx: None,
131        }
132    }
133
134    /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
135    pub fn new(token: impl Into<String>) -> Self {
136        Self::from_parts(ApiClient::new(token), "deepseek-chat")
137    }
138
139    /// Create an agent targeting an OpenAI-compatible provider.
140    ///
141    /// All three parameters are set at construction time and never change:
142    ///
143    /// ```no_run
144    /// use ds_api::DeepseekAgent;
145    ///
146    /// let agent = DeepseekAgent::custom(
147    ///     "sk-or-...",
148    ///     "https://openrouter.ai/api/v1",
149    ///     "meta-llama/llama-3.3-70b-instruct:free",
150    /// );
151    /// ```
152    pub fn custom(
153        token: impl Into<String>,
154        base_url: impl Into<String>,
155        model: impl Into<String>,
156    ) -> Self {
157        let client = ApiClient::new(token).with_base_url(base_url);
158        Self::from_parts(client, model)
159    }
160
161    /// Register a tool (builder-style, supports chaining).
162    ///
163    /// The tool's protocol-level function names are indexed so incoming tool-call
164    /// requests from the model can be dispatched to the correct implementation.
165    pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
166        let idx = self.tools.len();
167        for raw in tool.raw_tools() {
168            self.tool_index.insert(raw.function.name.clone(), idx);
169        }
170        self.tools.push(Box::new(tool));
171        self
172    }
173
174    /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
175    /// that drives the full agent loop (API calls + tool execution).
176    pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
177        self.conversation.push_user_input(user_message);
178        crate::agent::stream::AgentStream::new(self)
179    }
180
181    /// Start an agent turn from the current history **without** pushing a new
182    /// user message first.
183    ///
184    /// Use this when you have already appended the user message manually (e.g.
185    /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
186    /// and want to drive the agent loop from that point.
187    pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
188        crate::agent::stream::AgentStream::new(self)
189    }
190
191    /// Enable SSE streaming for each API turn (builder-style).
192    pub fn with_streaming(mut self) -> Self {
193        self.streaming = true;
194        self
195    }
196
197    /// Merge arbitrary top-level JSON key/value pairs into the request body for
198    /// the next API turn. The pairs are stored on the agent and later merged
199    /// into the `ApiRequest` raw body when a request is built.
200    ///
201    /// Example:
202    /// let mut map = serde_json::Map::new();
203    /// map.insert(\"foo\".to_string(), serde_json::json!(\"bar\"));
204    /// let agent = DeepseekAgent::new(\"sk-...\").extra_body(map);
205    pub fn extra_body(mut self, map: serde_json::Map<String, serde_json::Value>) -> Self {
206        if let Some(ref mut existing) = self.extra_body {
207            existing.extend(map);
208        } else {
209            self.extra_body = Some(map);
210        }
211        self
212    }
213
214    /// Add a single extra top-level field to be merged into the request body.
215    /// Convenience helper to avoid constructing a full map.
216    pub fn extra_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
217        if let Some(ref mut m) = self.extra_body {
218            m.insert(key.into(), value);
219        } else {
220            let mut m = serde_json::Map::new();
221            m.insert(key.into(), value);
222            self.extra_body = Some(m);
223        }
224        self
225    }
226
227    /// Prepend a permanent system prompt to the conversation history (builder-style).
228    ///
229    /// System messages added this way are never removed by the built-in summarizers.
230    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
231        self.conversation
232            .history_mut()
233            .insert(0, Message::new(Role::System, &prompt.into()));
234        self
235    }
236
237    /// Replace the summarizer used for context-window management (builder-style).
238    pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
239        self.conversation = self.conversation.with_summarizer(summarizer);
240        self
241    }
242
243    /// Seed the agent with an existing message history (builder-style).
244    ///
245    /// Used to restore a conversation from persistent storage (e.g. SQLite)
246    /// after a process restart.  The messages are set directly on the
247    /// underlying `Conversation` and will be included in the next API call.
248    ///
249    /// # Example
250    ///
251    /// ```no_run
252    /// use ds_api::DeepseekAgent;
253    /// use ds_api::raw::request::message::{Message, Role};
254    ///
255    /// # #[tokio::main] async fn main() {
256    /// let history = vec![
257    ///     Message::new(Role::User, "Hello"),
258    ///     Message::new(Role::Assistant, "Hi there!"),
259    /// ];
260    /// let agent = DeepseekAgent::new("sk-...").with_history(history);
261    /// # }
262    /// ```
263    pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
264        self.conversation = self.conversation.with_history(history);
265        self
266    }
267
268    /// Append a user message with an optional display name to the conversation
269    /// history.
270    ///
271    /// The `name` field is passed through to the API as-is (OpenAI-compatible
272    /// providers use it to distinguish speakers in a shared channel).
273    ///
274    /// # Example
275    ///
276    /// ```no_run
277    /// use ds_api::DeepseekAgent;
278    ///
279    /// # #[tokio::main] async fn main() {
280    /// let mut agent = DeepseekAgent::new("sk-...");
281    /// agent.push_user_message_with_name("What time is it?", Some("alice"));
282    /// # }
283    /// ```
284    pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
285        use crate::raw::request::message::{Message, Role};
286        let mut msg = Message::new(Role::User, text);
287        msg.name = name.map(|n| n.to_string());
288        self.conversation.history_mut().push(msg);
289    }
290
291    /// Read-only view of the current conversation history.
292    ///
293    /// Returns all messages in order, including system prompts, user turns,
294    /// assistant replies, tool calls, and tool results.  Auto-summary messages
295    /// inserted by the built-in summarizers are also included.
296    ///
297    /// # Example
298    ///
299    /// ```no_run
300    /// use ds_api::DeepseekAgent;
301    ///
302    /// # #[tokio::main] async fn main() {
303    /// let agent = DeepseekAgent::new("sk-...");
304    /// for msg in agent.history() {
305    ///     println!("{:?}: {:?}", msg.role, msg.content);
306    /// }
307    /// # }
308    /// ```
309    pub fn history(&self) -> &[crate::raw::request::message::Message] {
310        self.conversation.history()
311    }
312
313    /// Attach an interrupt channel to the agent (builder-style).
314    ///
315    /// Returns the agent and the sender half of the channel.  Send any `String`
316    /// through the `UnboundedSender` at any time; the message will be picked up
317    /// after the current tool-execution round finishes and inserted into the
318    /// conversation history as a `Role::User` message before the next API turn.
319    ///
320    /// # Example
321    ///
322    /// ```no_run
323    /// use ds_api::DeepseekAgent;
324    /// use tokio::sync::mpsc;
325    ///
326    /// # #[tokio::main] async fn main() {
327    /// let (agent, tx) = DeepseekAgent::new("sk-...")
328    ///     .with_interrupt_channel();
329    ///
330    /// // In another task or callback:
331    /// tx.send("Actually, use Python instead.".into()).unwrap();
332    /// # }
333    /// ```
334    pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
335        let (tx, rx) = mpsc::unbounded_channel();
336        self.interrupt_rx = Some(rx);
337        (self, tx)
338    }
339
340    /// Drain any pending messages from the interrupt channel and append them
341    /// to the conversation history as `Role::User` messages.
342    ///
343    /// Called by the state machine in [`AgentStream`] at the top of every
344    /// `Idle` transition so that injected messages are visible before each API
345    /// turn, not just after tool-execution rounds.
346    pub(crate) fn drain_interrupts(&mut self) {
347        if let Some(rx) = self.interrupt_rx.as_mut() {
348            while let Ok(msg) = rx.try_recv() {
349                self.conversation
350                    .history_mut()
351                    .push(Message::new(Role::User, &msg));
352            }
353        }
354    }
355
356    /// Drain any pending [`ToolInjection`]s from the channel and apply them.
357    ///
358    /// Called by the state machine at every `Idle` transition, right after
359    /// `drain_interrupts`, so added/removed tools take effect before the next
360    /// API turn.
361    pub(crate) fn drain_tool_injections(&mut self) {
362        if let Some(rx) = self.tool_inject_rx.as_mut() {
363            while let Ok(injection) = rx.try_recv() {
364                match injection {
365                    ToolInjection::Add(tool) => {
366                        let idx = self.tools.len();
367                        for raw in tool.raw_tools() {
368                            self.tool_index.insert(raw.function.name.clone(), idx);
369                        }
370                        self.tools.push(tool);
371                    }
372                    ToolInjection::Remove(names) => {
373                        let names_set: std::collections::HashSet<&str> =
374                            names.iter().map(String::as_str).collect();
375                        let mut new_tools: Vec<Box<dyn Tool>> = Vec::new();
376                        let mut new_index: HashMap<String, usize> = HashMap::new();
377                        for tool in self.tools.drain(..) {
378                            let raws = tool.raw_tools();
379                            if raws.iter().any(|r| names_set.contains(r.function.name.as_str())) {
380                                continue;
381                            }
382                            let idx = new_tools.len();
383                            for raw in raws {
384                                new_index.insert(raw.function.name.clone(), idx);
385                            }
386                            new_tools.push(tool);
387                        }
388                        self.tools = new_tools;
389                        self.tool_index = new_index;
390                    }
391                }
392            }
393        }
394    }
395
396    /// Attach a tool-injection channel to the agent (builder-style).
397    ///
398    /// Returns the agent and the sender half.  Send [`ToolInjection::Add`] or
399    /// [`ToolInjection::Remove`] at any time; changes are applied at the next
400    /// `Idle` transition (i.e. before the API turn that follows the current
401    /// tool-execution round).
402    pub fn with_tool_inject_channel(
403        mut self,
404    ) -> (Self, mpsc::UnboundedSender<ToolInjection>) {
405        let (tx, rx) = mpsc::unbounded_channel();
406        self.tool_inject_rx = Some(rx);
407        (self, tx)
408    }
409}