1use std::collections::HashMap;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7use serde::{Deserialize, Serialize};
8
9use crate::client::Client;
10use crate::error::Result;
11use crate::session::ContextConfig;
12
/// A single worker entry, used as the element type of `AgentRequest::workers`.
///
/// Only `name` is always serialized; each optional field is left out of the
/// serialized output while it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentWorker {
    /// Worker name. Always present in the serialized output.
    pub name: String,

    /// Optional `model` value; skipped during serialization when `None`.
    /// NOTE(review): presumably a model identifier — confirm against the API.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,

    /// Optional `tier` value; skipped during serialization when `None`.
    /// NOTE(review): the accepted tier names are not visible here — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tier: Option<String>,

    /// Optional free-form description; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
35
36#[derive(Debug, Clone, Serialize, Default)]
38pub struct AgentRequest {
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub session_id: Option<String>,
42
43 pub task: String,
45
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub conductor_model: Option<String>,
49
50 #[serde(skip_serializing_if = "Option::is_none")]
52 pub workers: Option<Vec<AgentWorker>>,
53
54 #[serde(skip_serializing_if = "Option::is_none")]
56 pub max_steps: Option<i32>,
57
58 #[serde(skip_serializing_if = "Option::is_none")]
60 pub system_prompt: Option<String>,
61
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub context_config: Option<crate::session::ContextConfig>,
65}
66
/// A single worker entry, used as the value type of `MissionRequest::workers`.
///
/// All fields are optional and each one is left out of the serialized output
/// while it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MissionWorker {
    /// Optional `model` value; skipped during serialization when `None`.
    /// NOTE(review): presumably a model identifier — confirm against the API.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,

    /// Optional `tier` value; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tier: Option<String>,

    /// Optional free-form description; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Optional `escalate_to` value; skipped during serialization when `None`.
    /// NOTE(review): presumably names another worker to hand off to — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub escalate_to: Option<String>,

    /// Optional retry limit; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_retries: Option<i32>,
}
94
/// Request body that `Client::mission_run` posts to `/qai/v1/missions`.
///
/// `goal` is the only field that is always serialized. Every `Option` field is
/// omitted from the payload while it is `None`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct MissionRequest {
    /// The mission goal text. Always serialized.
    pub goal: String,

    /// Optional `strategy` value; omitted when `None`.
    /// NOTE(review): the accepted strategy names are not visible here — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub strategy: Option<String>,

    /// Optional `conductor_model` value; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conductor_model: Option<String>,

    /// Optional `conductor_tier` value; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conductor_tier: Option<String>,

    /// Optional map of workers; omitted when `None`.
    /// NOTE(review): the map key is presumably the worker name — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workers: Option<HashMap<String, MissionWorker>>,

    /// Optional step limit; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps: Option<i32>,

    /// Optional system prompt; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,

    /// Optional session identifier; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,

    /// Optional `auto_plan` flag; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_plan: Option<bool>,

    /// Optional context configuration; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_config: Option<ContextConfig>,

    /// Optional `worker_model` value; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worker_model: Option<String>,

    /// Optional deployment identifier; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deployment_id: Option<String>,

    /// Optional build command; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub build_command: Option<String>,

    /// Optional workspace path; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workspace_path: Option<String>,
}
154
/// Alternative name for [`AgentWorker`]; the two types are identical.
pub type AgentWorkerConfig = AgentWorker;

/// Alternative name for [`MissionWorker`]; the two types are identical.
pub type MissionWorkerConfig = MissionWorker;
160
/// One event decoded from an agent or mission event stream.
///
/// Deserialized from a JSON object: the `type` key becomes `event_type` and
/// all remaining top-level keys are collected into `data`.
#[derive(Debug, Clone, Deserialize)]
pub struct AgentStreamEvent {
    /// Value of the JSON `type` key; an empty string when the key is absent
    /// (serde `default`).
    #[serde(rename = "type", default)]
    pub event_type: String,

    /// Every other top-level key of the JSON object, kept as raw JSON values
    /// (serde `flatten`).
    #[serde(flatten)]
    pub data: HashMap<String, serde_json::Value>,
}
176
/// Alternative name for [`AgentStreamEvent`]; the two types are identical.
pub type AgentEvent = AgentStreamEvent;

/// Alternative name for [`AgentStreamEvent`]; the two types are identical.
pub type MissionEvent = AgentStreamEvent;
184
pin_project! {
    /// Stream of [`AgentStreamEvent`]s returned by `Client::agent_run` and
    /// `Client::mission_run`.
    ///
    /// Wraps a boxed, type-erased event stream; the `Stream` impl forwards
    /// every poll to it.
    pub struct AgentStream {
        // Structurally pinned so `project()` yields a pinned reference to it.
        // NOTE(review): `Pin<Box<_>>` is already `Unpin`, so `#[pin]` looks
        // redundant here — confirm before simplifying.
        #[pin]
        inner: Pin<Box<dyn Stream<Item = AgentStreamEvent> + Send>>,
    }
}
192
193impl Stream for AgentStream {
194 type Item = AgentStreamEvent;
195
196 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197 self.project().inner.poll_next(cx)
198 }
199}
200
201fn sse_to_agent_events<S>(byte_stream: S) -> impl Stream<Item = AgentStreamEvent> + Send
203where
204 S: Stream<Item = std::result::Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
205{
206 let pinned_stream = Box::pin(byte_stream);
207
208 let line_stream = futures_util::stream::unfold(
209 (pinned_stream, String::new()),
210 |(mut stream, mut buffer)| async move {
211 use futures_util::StreamExt;
212 loop {
213 if let Some(newline_pos) = buffer.find('\n') {
214 let line = buffer[..newline_pos].trim_end_matches('\r').to_string();
215 buffer = buffer[newline_pos + 1..].to_string();
216 return Some((line, (stream, buffer)));
217 }
218
219 match stream.next().await {
220 Some(Ok(chunk)) => {
221 buffer.push_str(&String::from_utf8_lossy(&chunk));
222 }
223 Some(Err(_)) | None => {
224 if !buffer.is_empty() {
225 let remaining = std::mem::take(&mut buffer);
226 return Some((remaining, (stream, buffer)));
227 }
228 return None;
229 }
230 }
231 }
232 },
233 );
234
235 let pinned_lines = Box::pin(line_stream);
236 futures_util::stream::unfold(pinned_lines, |mut lines| async move {
237 use futures_util::StreamExt;
238 loop {
239 let line = lines.next().await?;
240
241 if !line.starts_with("data: ") {
242 continue;
243 }
244 let payload = &line["data: ".len()..];
245
246 if payload == "[DONE]" {
247 let ev = AgentStreamEvent {
248 event_type: "done".to_string(),
249 data: HashMap::new(),
250 };
251 return Some((ev, lines));
252 }
253
254 match serde_json::from_str::<AgentStreamEvent>(payload) {
255 Ok(ev) => return Some((ev, lines)),
256 Err(e) => {
257 let mut data = HashMap::new();
258 data.insert(
259 "error".to_string(),
260 serde_json::Value::String(format!("parse SSE: {e}")),
261 );
262 let ev = AgentStreamEvent {
263 event_type: "error".to_string(),
264 data,
265 };
266 return Some((ev, lines));
267 }
268 }
269 }
270 })
271}
272
273impl Client {
274 pub async fn agent_run(&self, req: &AgentRequest) -> Result<AgentStream> {
279 let (resp, _meta) = self.post_stream_raw("/qai/v1/agent", req).await?;
280 let byte_stream = resp.bytes_stream();
281 let event_stream = sse_to_agent_events(byte_stream);
282 Ok(AgentStream {
283 inner: Box::pin(event_stream),
284 })
285 }
286
287 pub async fn mission_run(&self, req: &MissionRequest) -> Result<AgentStream> {
292 let (resp, _meta) = self.post_stream_raw("/qai/v1/missions", req).await?;
293 let byte_stream = resp.bytes_stream();
294 let event_stream = sse_to_agent_events(byte_stream);
295 Ok(AgentStream {
296 inner: Box::pin(event_stream),
297 })
298 }
299}