routa_core/acp/docker/
adapter.rs1use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::broadcast;
11
12pub struct DockerOpenCodeAdapter {
14 base_url: String,
15 client: Client,
16 alive: Arc<AtomicBool>,
17 local_session_id: Arc<tokio::sync::RwLock<Option<String>>>,
18 remote_session_id: Arc<tokio::sync::RwLock<Option<String>>>,
19 notification_tx: broadcast::Sender<serde_json::Value>,
20}
21
22#[derive(Debug, Serialize)]
23#[serde(rename_all = "camelCase")]
24struct NewSessionRequest {
25 title: String,
26}
27
28#[derive(Debug, Deserialize)]
29#[serde(rename_all = "camelCase")]
30struct NewSessionResponse {
31 session_id: String,
32}
33
34#[derive(Debug, Serialize)]
35#[serde(rename_all = "camelCase")]
36struct PromptRequest {
37 session_id: String,
38 prompt: String,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 skill_content: Option<String>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 workspace_id: Option<String>,
43}
44
45impl DockerOpenCodeAdapter {
46 pub fn new(base_url: &str, notification_tx: broadcast::Sender<serde_json::Value>) -> Self {
48 let base_url = base_url.trim_end_matches('/').to_string();
49
50 let client = Client::builder()
51 .timeout(Duration::from_secs(300))
52 .build()
53 .expect("Failed to create HTTP client");
54
55 Self {
56 base_url,
57 client,
58 alive: Arc::new(AtomicBool::new(false)),
59 local_session_id: Arc::new(tokio::sync::RwLock::new(None)),
60 remote_session_id: Arc::new(tokio::sync::RwLock::new(None)),
61 notification_tx,
62 }
63 }
64
65 pub fn is_alive(&self) -> bool {
67 self.alive.load(Ordering::SeqCst)
68 }
69
70 pub async fn connect(&self) -> Result<(), String> {
72 let url = format!("{}/health", self.base_url);
73 let resp = self
74 .client
75 .get(&url)
76 .send()
77 .await
78 .map_err(|e| format!("Health check failed: {}", e))?;
79
80 if !resp.status().is_success() {
81 return Err(format!(
82 "Docker OpenCode health check failed: {} {}",
83 resp.status().as_u16(),
84 resp.status().canonical_reason().unwrap_or("")
85 ));
86 }
87
88 self.alive.store(true, Ordering::SeqCst);
89 Ok(())
90 }
91
92 pub async fn create_session(&self, title: Option<&str>) -> Result<String, String> {
94 if !self.is_alive() {
95 return Err("DockerOpenCodeAdapter is not connected".to_string());
96 }
97
98 let url = format!("{}/session/new", self.base_url);
99 let body = NewSessionRequest {
100 title: title.unwrap_or("Routa Docker Session").to_string(),
101 };
102
103 let resp = self
104 .client
105 .post(&url)
106 .json(&body)
107 .send()
108 .await
109 .map_err(|e| format!("Failed to create session: {}", e))?;
110
111 if !resp.status().is_success() {
112 let status = resp.status();
113 let body = resp.text().await.unwrap_or_default();
114 return Err(format!(
115 "Failed to create docker OpenCode session: {} {}{}",
116 status.as_u16(),
117 status.canonical_reason().unwrap_or(""),
118 if body.is_empty() {
119 "".to_string()
120 } else {
121 format!(": {}", body)
122 }
123 ));
124 }
125
126 let response: NewSessionResponse = resp
127 .json()
128 .await
129 .map_err(|e| format!("Failed to parse session response: {}", e))?;
130
131 *self.remote_session_id.write().await = Some(response.session_id.clone());
132
133 Ok(response.session_id)
134 }
135
136 pub async fn set_local_session_id(&self, session_id: &str) {
138 *self.local_session_id.write().await = Some(session_id.to_string());
139 }
140
141 pub async fn get_remote_session_id(&self) -> Option<String> {
143 self.remote_session_id.read().await.clone()
144 }
145
146 pub async fn cancel(&self) -> Result<(), String> {
148 let remote_sid = self.remote_session_id.read().await.clone();
149 if let Some(session_id) = remote_sid {
150 let url = format!("{}/session/cancel", self.base_url);
151 let _ = self
152 .client
153 .post(&url)
154 .json(&serde_json::json!({ "sessionId": session_id }))
155 .send()
156 .await;
157 }
158 Ok(())
159 }
160
161 pub async fn prompt_stream(
163 &self,
164 text: &str,
165 acp_session_id: Option<&str>,
166 skill_content: Option<&str>,
167 workspace_id: Option<&str>,
168 ) -> Result<(), String> {
169 if !self.is_alive() {
170 return Err("Docker OpenCode session is not active".to_string());
171 }
172
173 let remote_sid = self.remote_session_id.read().await.clone();
174 let remote_session_id = remote_sid.ok_or("No remote session ID")?;
175
176 let local_sid = self.local_session_id.read().await.clone();
177 let session_id = acp_session_id
178 .map(|s| s.to_string())
179 .or(local_sid)
180 .unwrap_or(remote_session_id.clone());
181
182 let url = format!("{}/session/prompt", self.base_url);
183 let body = PromptRequest {
184 session_id: remote_session_id,
185 prompt: text.to_string(),
186 skill_content: skill_content.map(|s| s.to_string()),
187 workspace_id: workspace_id.map(|s| s.to_string()),
188 };
189
190 let resp = self
191 .client
192 .post(&url)
193 .json(&body)
194 .send()
195 .await
196 .map_err(|e| format!("Docker OpenCode prompt failed: {}", e))?;
197
198 if !resp.status().is_success() {
199 return Err(format!(
200 "Docker OpenCode prompt failed: {} {}",
201 resp.status().as_u16(),
202 resp.status().canonical_reason().unwrap_or("")
203 ));
204 }
205
206 let content_type = resp
208 .headers()
209 .get("content-type")
210 .and_then(|v| v.to_str().ok())
211 .unwrap_or("");
212
213 if content_type.contains("text/event-stream") {
214 let bytes = resp
216 .bytes()
217 .await
218 .map_err(|e| format!("Failed to read SSE stream: {}", e))?;
219 let text = String::from_utf8_lossy(&bytes);
220
221 self.parse_sse_stream(&text, &session_id).await;
222 } else {
223 if let Ok(json) = resp.json::<serde_json::Value>().await {
225 let content = json
226 .get("content")
227 .and_then(|v| v.as_str())
228 .or_else(|| json.get("message").and_then(|v| v.as_str()))
229 .unwrap_or("");
230
231 if !content.is_empty() {
232 let msg = self.agent_chunk(&session_id, content);
233 let _ = self.notification_tx.send(msg);
234 }
235 }
236 }
237
238 let complete = self.turn_complete(&session_id);
240 let _ = self.notification_tx.send(complete);
241
242 Ok(())
243 }
244
245 async fn parse_sse_stream(&self, text: &str, session_id: &str) {
247 for frame in text.split("\n\n") {
248 if !frame.starts_with("data:") {
249 continue;
250 }
251
252 let payload = frame.strip_prefix("data:").unwrap_or("").trim();
253 if payload.is_empty() {
254 continue;
255 }
256
257 if let Some(parsed) = self.parse_stream_payload(payload, session_id) {
258 let _ = self.notification_tx.send(parsed);
259 }
260 }
261 }
262
263 fn parse_stream_payload(&self, payload: &str, session_id: &str) -> Option<serde_json::Value> {
265 let json: serde_json::Value = serde_json::from_str(payload).ok()?;
266
267 let content = json
269 .get("content")
270 .and_then(|v| v.as_str())
271 .or_else(|| json.get("message").and_then(|v| v.as_str()))
272 .or_else(|| {
273 json.get("params")
274 .and_then(|p| p.get("update"))
275 .and_then(|u| u.get("content"))
276 .and_then(|c| c.get("text"))
277 .and_then(|t| t.as_str())
278 });
279
280 if let Some(text) = content {
281 return Some(self.agent_chunk(session_id, text));
282 }
283
284 if json.get("method").and_then(|m| m.as_str()) == Some("session/update") {
286 return Some(json);
287 }
288
289 None
290 }
291
292 fn agent_chunk(&self, session_id: &str, text: &str) -> serde_json::Value {
294 serde_json::json!({
295 "jsonrpc": "2.0",
296 "method": "session/update",
297 "params": {
298 "sessionId": session_id,
299 "update": {
300 "sessionUpdate": "agent_chunk",
301 "content": { "type": "text", "text": text }
302 }
303 }
304 })
305 }
306
307 fn turn_complete(&self, session_id: &str) -> serde_json::Value {
309 serde_json::json!({
310 "jsonrpc": "2.0",
311 "method": "session/update",
312 "params": {
313 "sessionId": session_id,
314 "update": {
315 "sessionUpdate": "turn_complete",
316 "stopReason": "end_of_turn"
317 }
318 }
319 })
320 }
321}