1use std::collections::HashMap;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7use serde::{Deserialize, Serialize};
8
9use crate::client::Client;
10use crate::error::Result;
11use crate::session::ContextConfig;
12
/// A named worker entry, used as the element type of [`AgentRequest::workers`].
///
/// `name` is always serialized; each optional field is left out of the
/// serialized output while it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentWorker {
    /// Name of the worker.
    pub name: String,

    /// Model for this worker (presumably a model identifier; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,

    /// Tier label for this worker; the accepted values are not defined in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tier: Option<String>,

    /// Free-form description of the worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
35
/// Request body that [`Client::agent_run`] posts to `/qai/v1/agent`.
///
/// The type is serialize-only. `task` is always emitted; every `Option` field
/// is skipped while it is `None`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct AgentRequest {
    /// Session identifier to attach to the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,

    /// The task text sent to the agent endpoint.
    pub task: String,

    /// Model for the conductor role (presumably a model identifier; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conductor_model: Option<String>,

    /// Worker entries to send along with the task.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workers: Option<Vec<AgentWorker>>,

    /// Step limit; how the server interprets it is not visible from this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps: Option<i32>,

    /// System prompt text to send with the request.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
}
62
/// A worker entry used as the map value type of [`MissionRequest::workers`].
///
/// Unlike [`AgentWorker`] it carries no `name` field. All fields are optional
/// and are left out of the serialized output while they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MissionWorker {
    /// Model for this worker (presumably a model identifier; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,

    /// Tier label for this worker; the accepted values are not defined in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tier: Option<String>,

    /// Free-form description of the worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
82
/// Request body that [`Client::mission_run`] posts to `/qai/v1/missions`.
///
/// The type is serialize-only. `goal` is always emitted; every `Option` field
/// is skipped while it is `None`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct MissionRequest {
    /// The goal text sent to the missions endpoint.
    pub goal: String,

    /// Strategy selector; the accepted values are not defined in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub strategy: Option<String>,

    /// Model for the conductor role (presumably a model identifier; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conductor_model: Option<String>,

    /// Worker entries keyed by a string (presumably the worker name, since
    /// `MissionWorker` has no name field; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workers: Option<HashMap<String, MissionWorker>>,

    /// Step limit; how the server interprets it is not visible from this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps: Option<i32>,

    /// System prompt text to send with the request.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,

    /// Session identifier to attach to the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,

    /// Auto-planning flag; its server-side effect is not visible from this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_plan: Option<bool>,

    /// Context configuration, using the `ContextConfig` type from `crate::session`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_config: Option<ContextConfig>,

    /// Model for workers (presumably a default when a worker sets none; confirm against the API docs).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worker_model: Option<String>,

    /// Deployment identifier to attach to the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deployment_id: Option<String>,

    /// Build command string; when and where it is executed is not visible from this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub build_command: Option<String>,

    /// Workspace path string; it is sent as-is and not validated here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workspace_path: Option<String>,
}
137
/// Alternative name for [`AgentWorker`]; the two are the same type.
pub type AgentWorkerConfig = AgentWorker;

/// Alternative name for [`MissionWorker`]; the two are the same type.
pub type MissionWorkerConfig = MissionWorker;
143
/// One event decoded from the JSON payload of an SSE `data` line.
///
/// Besides events parsed from the server, `sse_to_agent_events` synthesizes a
/// `"done"` event for the `[DONE]` sentinel and an `"error"` event (message
/// under the `"error"` key of `data`) when a payload fails to parse.
#[derive(Debug, Clone, Deserialize)]
pub struct AgentStreamEvent {
    /// Value of the JSON `type` field; an empty string when the field is absent.
    #[serde(rename = "type", default)]
    pub event_type: String,

    /// The remaining top-level JSON fields of the payload, kept as raw values.
    #[serde(flatten)]
    pub data: HashMap<String, serde_json::Value>,
}
159
/// Alternative name for [`AgentStreamEvent`]; the two are the same type.
pub type AgentEvent = AgentStreamEvent;

/// Alternative name for [`AgentStreamEvent`]; mission streams yield the same event type.
pub type MissionEvent = AgentStreamEvent;
167
pin_project! {
    /// Stream of [`AgentStreamEvent`]s returned by [`Client::agent_run`] and
    /// [`Client::mission_run`].
    pub struct AgentStream {
        // Type-erased event source; marked `#[pin]` so the `Stream` impl can
        // forward `poll_next` to it through the pin projection.
        #[pin]
        inner: Pin<Box<dyn Stream<Item = AgentStreamEvent> + Send>>,
    }
}
175
176impl Stream for AgentStream {
177 type Item = AgentStreamEvent;
178
179 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 self.project().inner.poll_next(cx)
181 }
182}
183
184fn sse_to_agent_events<S>(byte_stream: S) -> impl Stream<Item = AgentStreamEvent> + Send
186where
187 S: Stream<Item = std::result::Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
188{
189 let pinned_stream = Box::pin(byte_stream);
190
191 let line_stream = futures_util::stream::unfold(
192 (pinned_stream, String::new()),
193 |(mut stream, mut buffer)| async move {
194 use futures_util::StreamExt;
195 loop {
196 if let Some(newline_pos) = buffer.find('\n') {
197 let line = buffer[..newline_pos].trim_end_matches('\r').to_string();
198 buffer = buffer[newline_pos + 1..].to_string();
199 return Some((line, (stream, buffer)));
200 }
201
202 match stream.next().await {
203 Some(Ok(chunk)) => {
204 buffer.push_str(&String::from_utf8_lossy(&chunk));
205 }
206 Some(Err(_)) | None => {
207 if !buffer.is_empty() {
208 let remaining = std::mem::take(&mut buffer);
209 return Some((remaining, (stream, buffer)));
210 }
211 return None;
212 }
213 }
214 }
215 },
216 );
217
218 let pinned_lines = Box::pin(line_stream);
219 futures_util::stream::unfold(pinned_lines, |mut lines| async move {
220 use futures_util::StreamExt;
221 loop {
222 let line = lines.next().await?;
223
224 if !line.starts_with("data: ") {
225 continue;
226 }
227 let payload = &line["data: ".len()..];
228
229 if payload == "[DONE]" {
230 let ev = AgentStreamEvent {
231 event_type: "done".to_string(),
232 data: HashMap::new(),
233 };
234 return Some((ev, lines));
235 }
236
237 match serde_json::from_str::<AgentStreamEvent>(payload) {
238 Ok(ev) => return Some((ev, lines)),
239 Err(e) => {
240 let mut data = HashMap::new();
241 data.insert(
242 "error".to_string(),
243 serde_json::Value::String(format!("parse SSE: {e}")),
244 );
245 let ev = AgentStreamEvent {
246 event_type: "error".to_string(),
247 data,
248 };
249 return Some((ev, lines));
250 }
251 }
252 }
253 })
254}
255
256impl Client {
257 pub async fn agent_run(&self, req: &AgentRequest) -> Result<AgentStream> {
262 let (resp, _meta) = self.post_stream_raw("/qai/v1/agent", req).await?;
263 let byte_stream = resp.bytes_stream();
264 let event_stream = sse_to_agent_events(byte_stream);
265 Ok(AgentStream {
266 inner: Box::pin(event_stream),
267 })
268 }
269
270 pub async fn mission_run(&self, req: &MissionRequest) -> Result<AgentStream> {
275 let (resp, _meta) = self.post_stream_raw("/qai/v1/missions", req).await?;
276 let byte_stream = resp.bytes_stream();
277 let event_stream = sse_to_agent_events(byte_stream);
278 Ok(AgentStream {
279 inner: Box::pin(event_stream),
280 })
281 }
282}