//! `quantum_sdk/agent.rs` — request types, SSE stream events, and `Client`
//! entry points for agent and mission runs.

1use std::collections::HashMap;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7use serde::{Deserialize, Serialize};
8
9use crate::client::Client;
10use crate::error::Result;
11use crate::session::ContextConfig;
12
// ---------------------------------------------------------------------------
// Agent
// ---------------------------------------------------------------------------
16
17/// Describes a worker agent in a multi-agent run.
18#[derive(Debug, Clone, Serialize, Deserialize, Default)]
19pub struct AgentWorker {
20    /// Worker name.
21    pub name: String,
22
23    /// Model ID for this worker.
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub model: Option<String>,
26
27    /// Worker tier (e.g. "fast", "thinking").
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub tier: Option<String>,
30
31    /// Description of this worker's role.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub description: Option<String>,
34}
35
36/// Request body for an agent run.
37#[derive(Debug, Clone, Serialize, Default)]
38pub struct AgentRequest {
39    /// Session identifier for continuity across runs.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub session_id: Option<String>,
42
43    /// The task for the agent to accomplish.
44    pub task: String,
45
46    /// Model for the conductor agent.
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub conductor_model: Option<String>,
49
50    /// Worker agents available to the conductor.
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub workers: Option<Vec<AgentWorker>>,
53
54    /// Maximum number of steps before stopping.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub max_steps: Option<i32>,
57
58    /// System prompt for the conductor.
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub system_prompt: Option<String>,
61
62    /// Context configuration for session management.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub context_config: Option<crate::session::ContextConfig>,
65}
66
// ---------------------------------------------------------------------------
// Mission
// ---------------------------------------------------------------------------
70
71/// Describes a named worker for a mission (map keyed by name).
72#[derive(Debug, Clone, Serialize, Deserialize, Default)]
73pub struct MissionWorker {
74    /// Model ID for this worker.
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub model: Option<String>,
77
78    /// Worker tier.
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub tier: Option<String>,
81
82    /// Description of this worker's purpose.
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub description: Option<String>,
85
86    /// Worker to escalate to on failure (e.g. cheap coder → expensive coder).
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub escalate_to: Option<String>,
89
90    /// Max retries before escalating (default 1).
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub max_retries: Option<i32>,
93}
94
95/// Request body for a mission run.
96#[derive(Debug, Clone, Serialize, Default)]
97pub struct MissionRequest {
98    /// The high-level goal for the mission.
99    pub goal: String,
100
101    /// Execution strategy hint.
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub strategy: Option<String>,
104
105    /// Model for the conductor.
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub conductor_model: Option<String>,
108
109    /// Conductor tier override. Default: "expensive".
110    /// Set to "cheap" when using a fast router as conductor.
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub conductor_tier: Option<String>,
113
114    /// Named workers (key = worker name).
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub workers: Option<HashMap<String, MissionWorker>>,
117
118    /// Maximum number of steps.
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub max_steps: Option<i32>,
121
122    /// System prompt for the conductor.
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub system_prompt: Option<String>,
125
126    /// Session identifier for continuity.
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub session_id: Option<String>,
129
130    /// Whether to auto-plan before execution.
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub auto_plan: Option<bool>,
133
134    /// Context management configuration.
135    #[serde(skip_serializing_if = "Option::is_none")]
136    pub context_config: Option<ContextConfig>,
137
138    /// Model for worker nodes (codegen strategy).
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub worker_model: Option<String>,
141
142    /// Deployment ID — route worker inference to a managed Vertex endpoint.
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub deployment_id: Option<String>,
145
146    /// Build command to run after codegen (e.g. "cargo build", "npm run build").
147    #[serde(skip_serializing_if = "Option::is_none")]
148    pub build_command: Option<String>,
149
150    /// Workspace directory for generated files.
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub workspace_path: Option<String>,
153}
154
155/// Backwards-compatible alias for [`AgentWorker`].
156pub type AgentWorkerConfig = AgentWorker;
157
158/// Backwards-compatible alias for [`MissionWorker`].
159pub type MissionWorkerConfig = MissionWorker;
160
// ---------------------------------------------------------------------------
// SSE Stream
// ---------------------------------------------------------------------------
164
165/// A single event from an agent or mission SSE stream.
166#[derive(Debug, Clone, Deserialize)]
167pub struct AgentStreamEvent {
168    /// Event type (e.g. "step", "thought", "tool_call", "tool_result", "message", "error", "done").
169    #[serde(rename = "type", default)]
170    pub event_type: String,
171
172    /// Raw JSON payload for caller to interpret.
173    #[serde(flatten)]
174    pub data: HashMap<String, serde_json::Value>,
175}
176
177/// A single SSE event from an agent run stream.
178/// Alias for [`AgentStreamEvent`] for backwards compatibility.
179pub type AgentEvent = AgentStreamEvent;
180
181/// A single SSE event from a mission run stream.
182/// Alias for [`AgentStreamEvent`] since both use the same SSE format.
183pub type MissionEvent = AgentStreamEvent;
184
185pin_project! {
186    /// An async stream of [`AgentStreamEvent`]s from an agent or mission SSE response.
187    pub struct AgentStream {
188        #[pin]
189        inner: Pin<Box<dyn Stream<Item = AgentStreamEvent> + Send>>,
190    }
191}
192
193impl Stream for AgentStream {
194    type Item = AgentStreamEvent;
195
196    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197        self.project().inner.poll_next(cx)
198    }
199}
200
201/// Converts a byte stream into a stream of parsed [`AgentStreamEvent`]s.
202fn sse_to_agent_events<S>(byte_stream: S) -> impl Stream<Item = AgentStreamEvent> + Send
203where
204    S: Stream<Item = std::result::Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
205{
206    let pinned_stream = Box::pin(byte_stream);
207
208    let line_stream = futures_util::stream::unfold(
209        (pinned_stream, String::new()),
210        |(mut stream, mut buffer)| async move {
211            use futures_util::StreamExt;
212            loop {
213                if let Some(newline_pos) = buffer.find('\n') {
214                    let line = buffer[..newline_pos].trim_end_matches('\r').to_string();
215                    buffer = buffer[newline_pos + 1..].to_string();
216                    return Some((line, (stream, buffer)));
217                }
218
219                match stream.next().await {
220                    Some(Ok(chunk)) => {
221                        buffer.push_str(&String::from_utf8_lossy(&chunk));
222                    }
223                    Some(Err(_)) | None => {
224                        if !buffer.is_empty() {
225                            let remaining = std::mem::take(&mut buffer);
226                            return Some((remaining, (stream, buffer)));
227                        }
228                        return None;
229                    }
230                }
231            }
232        },
233    );
234
235    let pinned_lines = Box::pin(line_stream);
236    futures_util::stream::unfold(pinned_lines, |mut lines| async move {
237        use futures_util::StreamExt;
238        loop {
239            let line = lines.next().await?;
240
241            if !line.starts_with("data: ") {
242                continue;
243            }
244            let payload = &line["data: ".len()..];
245
246            if payload == "[DONE]" {
247                let ev = AgentStreamEvent {
248                    event_type: "done".to_string(),
249                    data: HashMap::new(),
250                };
251                return Some((ev, lines));
252            }
253
254            match serde_json::from_str::<AgentStreamEvent>(payload) {
255                Ok(ev) => return Some((ev, lines)),
256                Err(e) => {
257                    let mut data = HashMap::new();
258                    data.insert(
259                        "error".to_string(),
260                        serde_json::Value::String(format!("parse SSE: {e}")),
261                    );
262                    let ev = AgentStreamEvent {
263                        event_type: "error".to_string(),
264                        data,
265                    };
266                    return Some((ev, lines));
267                }
268            }
269        }
270    })
271}
272
273impl Client {
274    /// Starts an agent run and returns an SSE event stream.
275    ///
276    /// The agent orchestrates one or more worker models to accomplish the task,
277    /// streaming progress events as it works.
278    pub async fn agent_run(&self, req: &AgentRequest) -> Result<AgentStream> {
279        let (resp, _meta) = self.post_stream_raw("/qai/v1/agent", req).await?;
280        let byte_stream = resp.bytes_stream();
281        let event_stream = sse_to_agent_events(byte_stream);
282        Ok(AgentStream {
283            inner: Box::pin(event_stream),
284        })
285    }
286
287    /// Starts a mission run and returns an SSE event stream.
288    ///
289    /// Missions are higher-level than agents -- they can auto-plan, assign
290    /// named workers, and manage context across multiple steps.
291    pub async fn mission_run(&self, req: &MissionRequest) -> Result<AgentStream> {
292        let (resp, _meta) = self.post_stream_raw("/qai/v1/missions", req).await?;
293        let byte_stream = resp.bytes_stream();
294        let event_stream = sse_to_agent_events(byte_stream);
295        Ok(AgentStream {
296            inner: Box::pin(event_stream),
297        })
298    }
299}