//! ds_api/agent/agent_core.rs
//!
//! Core agent type ([`DeepseekAgent`]) plus the event and tool-call data
//! types yielded by the agent stream.

1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
/// Information about a tool call requested by the model.
///
/// Yielded as `AgentEvent::ToolCall` when the model requests a tool invocation.
/// At this point the tool has not yet been executed.
#[derive(Debug, Clone)]
pub struct ToolCallInfo {
    /// Provider-assigned id of this tool call; echoed back in the matching
    /// `ToolCallResult` so callers can pair requests with results.
    pub id: String,
    /// Protocol-level function name the model asked to invoke.
    pub name: String,
    /// Arguments for the call, as the JSON object produced by the model.
    pub args: Value,
}
20
/// The result of a completed tool invocation.
///
/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Id of the originating tool call (same value as `ToolCallInfo::id`).
    pub id: String,
    /// Protocol-level function name that was invoked.
    pub name: String,
    /// The arguments the tool was invoked with (copied from the request).
    pub args: Value,
    /// JSON value returned by the tool implementation.
    pub result: Value,
}
31
/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// Each variant represents a distinct, self-contained event in the agent lifecycle:
///
/// - `Token(String)` — a text fragment from the assistant.  In streaming mode each
///   `Token` is a single SSE delta; in non-streaming mode the full response text
///   arrives as one `Token`.
/// - `ToolCall(ToolCallInfo)` — the model has requested a tool invocation.  One event
///   is emitted per call, before execution begins.
/// - `ToolResult(ToolCallResult)` — a tool has finished executing.  One event is
///   emitted per call, in the same order as the corresponding `ToolCall` events.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    /// A text fragment (or, in non-streaming mode, the whole reply) from the assistant.
    Token(String),
    /// The model requested a tool invocation; the tool has not run yet.
    ToolCall(ToolCallInfo),
    /// A tool finished executing; carries both the arguments and the result.
    ToolResult(ToolCallResult),
}
49
/// An agent that combines a [`Conversation`] with a set of callable tools.
///
/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
/// to start a turn:
///
/// ```no_run
/// use ds_api::{DeepseekAgent, tool};
/// use serde_json::{Value, json};
///
/// struct MyTool;
///
/// #[tool]
/// impl ds_api::Tool for MyTool {
///     async fn greet(&self, name: String) -> Value {
///         json!({ "greeting": format!("Hello, {name}!") })
///     }
/// }
///
/// # #[tokio::main] async fn main() {
/// let agent = DeepseekAgent::new("sk-...")
///     .add_tool(MyTool);
/// # }
/// ```
pub struct DeepseekAgent {
    /// The conversation manages history, the API client, and context-window compression.
    pub(crate) conversation: Conversation,
    /// Registered tool implementations, in registration order.
    pub(crate) tools: Vec<Box<dyn Tool>>,
    /// Maps each protocol-level function name to the index in `tools` of the
    /// implementation that handles it (populated by `add_tool`).
    pub(crate) tool_index: HashMap<String, usize>,
    /// When `true` the agent uses SSE streaming for each API turn so `Token` events
    /// arrive incrementally.  When `false` (default) the full response is awaited.
    pub(crate) streaming: bool,
    /// The model to use for every API turn.  Defaults to `"deepseek-chat"`.
    pub(crate) model: String,
    /// Optional channel for injecting user messages mid-loop.
    /// Messages received here are drained after each tool-execution round and
    /// appended to the conversation history as `Role::User` messages before the
    /// next API turn begins.
    pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
}
89
90impl DeepseekAgent {
91    fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
92        let model = model.into();
93        let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
94        Self {
95            conversation: Conversation::new(client).with_summarizer(summarizer),
96            tools: vec![],
97            tool_index: HashMap::new(),
98            streaming: false,
99            model,
100            interrupt_rx: None,
101        }
102    }
103
104    /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
105    pub fn new(token: impl Into<String>) -> Self {
106        Self::from_parts(ApiClient::new(token), "deepseek-chat")
107    }
108
109    /// Create an agent targeting an OpenAI-compatible provider.
110    ///
111    /// All three parameters are set at construction time and never change:
112    ///
113    /// ```no_run
114    /// use ds_api::DeepseekAgent;
115    ///
116    /// let agent = DeepseekAgent::custom(
117    ///     "sk-or-...",
118    ///     "https://openrouter.ai/api/v1",
119    ///     "meta-llama/llama-3.3-70b-instruct:free",
120    /// );
121    /// ```
122    pub fn custom(
123        token: impl Into<String>,
124        base_url: impl Into<String>,
125        model: impl Into<String>,
126    ) -> Self {
127        let client = ApiClient::new(token).with_base_url(base_url);
128        Self::from_parts(client, model)
129    }
130
131    /// Register a tool (builder-style, supports chaining).
132    ///
133    /// The tool's protocol-level function names are indexed so incoming tool-call
134    /// requests from the model can be dispatched to the correct implementation.
135    pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
136        let idx = self.tools.len();
137        for raw in tool.raw_tools() {
138            self.tool_index.insert(raw.function.name.clone(), idx);
139        }
140        self.tools.push(Box::new(tool));
141        self
142    }
143
144    /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
145    /// that drives the full agent loop (API calls + tool execution).
146    pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
147        self.conversation.push_user_input(user_message);
148        crate::agent::stream::AgentStream::new(self)
149    }
150
    /// Start an agent turn from the current history **without** pushing a new
    /// user message first.
    ///
    /// Use this when you have already appended the user message manually (e.g.
    /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
    /// and want to drive the agent loop from that point.  The agent is consumed
    /// by the returned stream, which drives the API-call / tool-execution loop.
    pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
        crate::agent::stream::AgentStream::new(self)
    }
160
    /// Enable SSE streaming for each API turn (builder-style).
    ///
    /// With streaming enabled, `Token` events are emitted incrementally as SSE
    /// deltas arrive instead of as one final chunk.
    pub fn with_streaming(mut self) -> Self {
        self.streaming = true;
        self
    }
166
167    /// Prepend a permanent system prompt to the conversation history (builder-style).
168    ///
169    /// System messages added this way are never removed by the built-in summarizers.
170    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
171        self.conversation
172            .add_message(Message::new(Role::System, &prompt.into()));
173        self
174    }
175
    /// Replace the summarizer used for context-window management (builder-style).
    ///
    /// The new summarizer takes over compression duties for all subsequent
    /// turns; any previously configured summarizer (including the default
    /// `LlmSummarizer`) is discarded.
    pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
        self.conversation = self.conversation.with_summarizer(summarizer);
        self
    }
181
    /// Seed the agent with an existing message history (builder-style).
    ///
    /// Used to restore a conversation from persistent storage (e.g. SQLite)
    /// after a process restart.  The messages are set directly on the
    /// underlying `Conversation` and will be included in the next API call.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use ds_api::DeepseekAgent;
    /// use ds_api::raw::request::message::{Message, Role};
    ///
    /// # #[tokio::main] async fn main() {
    /// let history = vec![
    ///     Message::new(Role::User, "Hello"),
    ///     Message::new(Role::Assistant, "Hi there!"),
    /// ];
    /// let agent = DeepseekAgent::new("sk-...").with_history(history);
    /// # }
    /// ```
    pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
        self.conversation = self.conversation.with_history(history);
        self
    }
206
207    /// Append a user message with an optional display name to the conversation
208    /// history.
209    ///
210    /// The `name` field is passed through to the API as-is (OpenAI-compatible
211    /// providers use it to distinguish speakers in a shared channel).
212    ///
213    /// # Example
214    ///
215    /// ```no_run
216    /// use ds_api::DeepseekAgent;
217    ///
218    /// # #[tokio::main] async fn main() {
219    /// let mut agent = DeepseekAgent::new("sk-...");
220    /// agent.push_user_message_with_name("What time is it?", Some("alice"));
221    /// # }
222    /// ```
223    pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
224        use crate::raw::request::message::{Message, Role};
225        let mut msg = Message::new(Role::User, text);
226        msg.name = name.map(|n| n.to_string());
227        self.conversation.history_mut().push(msg);
228    }
229
    /// Read-only view of the current conversation history.
    ///
    /// Returns all messages in order, including system prompts, user turns,
    /// assistant replies, tool calls, and tool results.  Auto-summary messages
    /// inserted by the built-in summarizers are also included.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use ds_api::DeepseekAgent;
    ///
    /// # #[tokio::main] async fn main() {
    /// let agent = DeepseekAgent::new("sk-...");
    /// for msg in agent.history() {
    ///     println!("{:?}: {:?}", msg.role, msg.content);
    /// }
    /// # }
    /// ```
    pub fn history(&self) -> &[crate::raw::request::message::Message] {
        self.conversation.history()
    }
251
252    /// Attach an interrupt channel to the agent (builder-style).
253    ///
254    /// Returns the agent and the sender half of the channel.  Send any `String`
255    /// through the `UnboundedSender` at any time; the message will be picked up
256    /// after the current tool-execution round finishes and inserted into the
257    /// conversation history as a `Role::User` message before the next API turn.
258    ///
259    /// # Example
260    ///
261    /// ```no_run
262    /// use ds_api::DeepseekAgent;
263    /// use tokio::sync::mpsc;
264    ///
265    /// # #[tokio::main] async fn main() {
266    /// let (agent, tx) = DeepseekAgent::new("sk-...")
267    ///     .with_interrupt_channel();
268    ///
269    /// // In another task or callback:
270    /// tx.send("Actually, use Python instead.".into()).unwrap();
271    /// # }
272    /// ```
273    pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
274        let (tx, rx) = mpsc::unbounded_channel();
275        self.interrupt_rx = Some(rx);
276        (self, tx)
277    }
278
279    /// Drain any pending messages from the interrupt channel and append them
280    /// to the conversation history as `Role::User` messages.
281    ///
282    /// Called by the state machine in [`AgentStream`] at the top of every
283    /// `Idle` transition so that injected messages are visible before each API
284    /// turn, not just after tool-execution rounds.
285    pub(crate) fn drain_interrupts(&mut self) {
286        if let Some(rx) = self.interrupt_rx.as_mut() {
287            while let Ok(msg) = rx.try_recv() {
288                self.conversation
289                    .history_mut()
290                    .push(Message::new(Role::User, &msg));
291            }
292        }
293    }
294}