Skip to main content

routa_core/acp/docker/
adapter.rs

1//! Docker OpenCode adapter for HTTP/SSE communication with containers.
2//!
3//! Mirrors the TypeScript `DockerOpenCodeAdapter` in `src/core/acp/docker/docker-opencode-adapter.ts`.
4
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::broadcast;
11
12/// Docker OpenCode adapter for communicating with containerized agents.
13pub struct DockerOpenCodeAdapter {
14    base_url: String,
15    client: Client,
16    alive: Arc<AtomicBool>,
17    local_session_id: Arc<tokio::sync::RwLock<Option<String>>>,
18    remote_session_id: Arc<tokio::sync::RwLock<Option<String>>>,
19    notification_tx: broadcast::Sender<serde_json::Value>,
20}
21
22#[derive(Debug, Serialize)]
23#[serde(rename_all = "camelCase")]
24struct NewSessionRequest {
25    title: String,
26}
27
28#[derive(Debug, Deserialize)]
29#[serde(rename_all = "camelCase")]
30struct NewSessionResponse {
31    session_id: String,
32}
33
34#[derive(Debug, Serialize)]
35#[serde(rename_all = "camelCase")]
36struct PromptRequest {
37    session_id: String,
38    prompt: String,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    skill_content: Option<String>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    workspace_id: Option<String>,
43}
44
45impl DockerOpenCodeAdapter {
46    /// Create a new adapter for communicating with a Docker container.
47    pub fn new(base_url: &str, notification_tx: broadcast::Sender<serde_json::Value>) -> Self {
48        let base_url = base_url.trim_end_matches('/').to_string();
49
50        let client = Client::builder()
51            .timeout(Duration::from_secs(300))
52            .build()
53            .expect("Failed to create HTTP client");
54
55        Self {
56            base_url,
57            client,
58            alive: Arc::new(AtomicBool::new(false)),
59            local_session_id: Arc::new(tokio::sync::RwLock::new(None)),
60            remote_session_id: Arc::new(tokio::sync::RwLock::new(None)),
61            notification_tx,
62        }
63    }
64
65    /// Check if the adapter is connected.
66    pub fn is_alive(&self) -> bool {
67        self.alive.load(Ordering::SeqCst)
68    }
69
70    /// Connect to the container by performing a health check.
71    pub async fn connect(&self) -> Result<(), String> {
72        let url = format!("{}/health", self.base_url);
73        let resp = self
74            .client
75            .get(&url)
76            .send()
77            .await
78            .map_err(|e| format!("Health check failed: {}", e))?;
79
80        if !resp.status().is_success() {
81            return Err(format!(
82                "Docker OpenCode health check failed: {} {}",
83                resp.status().as_u16(),
84                resp.status().canonical_reason().unwrap_or("")
85            ));
86        }
87
88        self.alive.store(true, Ordering::SeqCst);
89        Ok(())
90    }
91
92    /// Create a new session in the container.
93    pub async fn create_session(&self, title: Option<&str>) -> Result<String, String> {
94        if !self.is_alive() {
95            return Err("DockerOpenCodeAdapter is not connected".to_string());
96        }
97
98        let url = format!("{}/session/new", self.base_url);
99        let body = NewSessionRequest {
100            title: title.unwrap_or("Routa Docker Session").to_string(),
101        };
102
103        let resp = self
104            .client
105            .post(&url)
106            .json(&body)
107            .send()
108            .await
109            .map_err(|e| format!("Failed to create session: {}", e))?;
110
111        if !resp.status().is_success() {
112            let status = resp.status();
113            let body = resp.text().await.unwrap_or_default();
114            return Err(format!(
115                "Failed to create docker OpenCode session: {} {}{}",
116                status.as_u16(),
117                status.canonical_reason().unwrap_or(""),
118                if body.is_empty() {
119                    "".to_string()
120                } else {
121                    format!(": {}", body)
122                }
123            ));
124        }
125
126        let response: NewSessionResponse = resp
127            .json()
128            .await
129            .map_err(|e| format!("Failed to parse session response: {}", e))?;
130
131        *self.remote_session_id.write().await = Some(response.session_id.clone());
132
133        Ok(response.session_id)
134    }
135
136    /// Set the local session ID (Routa's session ID).
137    pub async fn set_local_session_id(&self, session_id: &str) {
138        *self.local_session_id.write().await = Some(session_id.to_string());
139    }
140
141    /// Get the remote session ID.
142    pub async fn get_remote_session_id(&self) -> Option<String> {
143        self.remote_session_id.read().await.clone()
144    }
145
146    /// Cancel the current prompt.
147    pub async fn cancel(&self) -> Result<(), String> {
148        let remote_sid = self.remote_session_id.read().await.clone();
149        if let Some(session_id) = remote_sid {
150            let url = format!("{}/session/cancel", self.base_url);
151            let _ = self
152                .client
153                .post(&url)
154                .json(&serde_json::json!({ "sessionId": session_id }))
155                .send()
156                .await;
157        }
158        Ok(())
159    }
160
161    /// Send a prompt to the container and stream the response via SSE.
162    pub async fn prompt_stream(
163        &self,
164        text: &str,
165        acp_session_id: Option<&str>,
166        skill_content: Option<&str>,
167        workspace_id: Option<&str>,
168    ) -> Result<(), String> {
169        if !self.is_alive() {
170            return Err("Docker OpenCode session is not active".to_string());
171        }
172
173        let remote_sid = self.remote_session_id.read().await.clone();
174        let remote_session_id = remote_sid.ok_or("No remote session ID")?;
175
176        let local_sid = self.local_session_id.read().await.clone();
177        let session_id = acp_session_id
178            .map(|s| s.to_string())
179            .or(local_sid)
180            .unwrap_or(remote_session_id.clone());
181
182        let url = format!("{}/session/prompt", self.base_url);
183        let body = PromptRequest {
184            session_id: remote_session_id,
185            prompt: text.to_string(),
186            skill_content: skill_content.map(|s| s.to_string()),
187            workspace_id: workspace_id.map(|s| s.to_string()),
188        };
189
190        let resp = self
191            .client
192            .post(&url)
193            .json(&body)
194            .send()
195            .await
196            .map_err(|e| format!("Docker OpenCode prompt failed: {}", e))?;
197
198        if !resp.status().is_success() {
199            return Err(format!(
200                "Docker OpenCode prompt failed: {} {}",
201                resp.status().as_u16(),
202                resp.status().canonical_reason().unwrap_or("")
203            ));
204        }
205
206        // Check if it's an SSE stream
207        let content_type = resp
208            .headers()
209            .get("content-type")
210            .and_then(|v| v.to_str().ok())
211            .unwrap_or("");
212
213        if content_type.contains("text/event-stream") {
214            // Handle SSE stream
215            let bytes = resp
216                .bytes()
217                .await
218                .map_err(|e| format!("Failed to read SSE stream: {}", e))?;
219            let text = String::from_utf8_lossy(&bytes);
220
221            self.parse_sse_stream(&text, &session_id).await;
222        } else {
223            // Handle JSON response
224            if let Ok(json) = resp.json::<serde_json::Value>().await {
225                let content = json
226                    .get("content")
227                    .and_then(|v| v.as_str())
228                    .or_else(|| json.get("message").and_then(|v| v.as_str()))
229                    .unwrap_or("");
230
231                if !content.is_empty() {
232                    let msg = self.agent_chunk(&session_id, content);
233                    let _ = self.notification_tx.send(msg);
234                }
235            }
236        }
237
238        // Send turn_complete
239        let complete = self.turn_complete(&session_id);
240        let _ = self.notification_tx.send(complete);
241
242        Ok(())
243    }
244
245    /// Parse SSE stream and emit notifications.
246    async fn parse_sse_stream(&self, text: &str, session_id: &str) {
247        for frame in text.split("\n\n") {
248            if !frame.starts_with("data:") {
249                continue;
250            }
251
252            let payload = frame.strip_prefix("data:").unwrap_or("").trim();
253            if payload.is_empty() {
254                continue;
255            }
256
257            if let Some(parsed) = self.parse_stream_payload(payload, session_id) {
258                let _ = self.notification_tx.send(parsed);
259            }
260        }
261    }
262
263    /// Parse a single SSE payload and convert to session update.
264    fn parse_stream_payload(&self, payload: &str, session_id: &str) -> Option<serde_json::Value> {
265        let json: serde_json::Value = serde_json::from_str(payload).ok()?;
266
267        // Extract content from various possible formats
268        let content = json
269            .get("content")
270            .and_then(|v| v.as_str())
271            .or_else(|| json.get("message").and_then(|v| v.as_str()))
272            .or_else(|| {
273                json.get("params")
274                    .and_then(|p| p.get("update"))
275                    .and_then(|u| u.get("content"))
276                    .and_then(|c| c.get("text"))
277                    .and_then(|t| t.as_str())
278            });
279
280        if let Some(text) = content {
281            return Some(self.agent_chunk(session_id, text));
282        }
283
284        // Pass through session/update notifications
285        if json.get("method").and_then(|m| m.as_str()) == Some("session/update") {
286            return Some(json);
287        }
288
289        None
290    }
291
292    /// Create an agent_chunk notification.
293    fn agent_chunk(&self, session_id: &str, text: &str) -> serde_json::Value {
294        serde_json::json!({
295            "jsonrpc": "2.0",
296            "method": "session/update",
297            "params": {
298                "sessionId": session_id,
299                "update": {
300                    "sessionUpdate": "agent_chunk",
301                    "content": { "type": "text", "text": text }
302                }
303            }
304        })
305    }
306
307    /// Create a turn_complete notification.
308    fn turn_complete(&self, session_id: &str) -> serde_json::Value {
309        serde_json::json!({
310            "jsonrpc": "2.0",
311            "method": "session/update",
312            "params": {
313                "sessionId": session_id,
314                "update": {
315                    "sessionUpdate": "turn_complete",
316                    "stopReason": "end_of_turn"
317                }
318            }
319        })
320    }
321}