Skip to main content

ciab_sandbox/
execd.rs

1use bytes::Bytes;
2use futures::StreamExt;
3use reqwest::Client;
4use tokio::sync::mpsc;
5use uuid::Uuid;
6
7use ciab_core::error::{CiabError, CiabResult};
8use ciab_core::types::sandbox::{ExecRequest, ExecResult, FileInfo, ResourceStats};
9use ciab_core::types::stream::{StreamEvent, StreamEventType};
10
11#[derive(Clone)]
12pub struct ExecdClient {
13    endpoint_url: String,
14    client: Client,
15}
16
17impl ExecdClient {
18    pub fn new(endpoint_url: String) -> Self {
19        Self {
20            endpoint_url,
21            client: Client::new(),
22        }
23    }
24
25    pub async fn run_command(&self, req: &ExecRequest) -> CiabResult<ExecResult> {
26        let url = format!("{}/exec", self.endpoint_url);
27        let resp = self
28            .client
29            .post(&url)
30            .json(req)
31            .send()
32            .await
33            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
34
35        let status = resp.status();
36        if !status.is_success() {
37            let body = resp.text().await.unwrap_or_default();
38            return Err(CiabError::ExecFailed(format!("({}): {}", status, body)));
39        }
40
41        resp.json::<ExecResult>()
42            .await
43            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
44    }
45
46    pub async fn run_command_stream(
47        &self,
48        req: &ExecRequest,
49        tx: mpsc::Sender<StreamEvent>,
50        sandbox_id: Uuid,
51    ) -> CiabResult<i32> {
52        let url = format!("{}/exec/stream", self.endpoint_url);
53        let resp = self
54            .client
55            .post(&url)
56            .json(req)
57            .send()
58            .await
59            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
60
61        let status = resp.status();
62        if !status.is_success() {
63            let body = resp.text().await.unwrap_or_default();
64            return Err(CiabError::ExecFailed(format!("({}): {}", status, body)));
65        }
66
67        let mut stream = resp.bytes_stream();
68        let mut buffer = String::new();
69        let mut exit_code: i32 = -1;
70
71        while let Some(chunk_result) = stream.next().await {
72            let chunk: Bytes =
73                chunk_result.map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
74            buffer.push_str(&String::from_utf8_lossy(&chunk));
75
76            while let Some(pos) = buffer.find("\n\n") {
77                let event_block = buffer[..pos].to_string();
78                buffer = buffer[pos + 2..].to_string();
79
80                let mut event_type_str: Option<String> = None;
81                let mut data_str: Option<String> = None;
82
83                for line in event_block.lines() {
84                    if let Some(rest) = line.strip_prefix("event: ") {
85                        event_type_str = Some(rest.trim().to_string());
86                    } else if let Some(rest) = line.strip_prefix("data: ") {
87                        data_str = Some(rest.trim().to_string());
88                    }
89                }
90
91                if let (Some(evt_type), Some(data_raw)) = (event_type_str, data_str) {
92                    let data: serde_json::Value =
93                        serde_json::from_str(&data_raw).unwrap_or(serde_json::json!(data_raw));
94
95                    let stream_event_type = match evt_type.as_str() {
96                        "text_delta" => StreamEventType::TextDelta,
97                        "text_complete" => StreamEventType::TextComplete,
98                        "error" => StreamEventType::Error,
99                        "stats" => StreamEventType::Stats,
100                        "log_line" => StreamEventType::LogLine,
101                        _ => StreamEventType::TextDelta,
102                    };
103
104                    // Extract exit code if present
105                    if let Some(code) = data.get("exit_code").and_then(|v| v.as_i64()) {
106                        exit_code = code as i32;
107                    }
108
109                    let event = StreamEvent {
110                        id: uuid::Uuid::new_v4().to_string(),
111                        sandbox_id,
112                        session_id: None,
113                        event_type: stream_event_type,
114                        data,
115                        timestamp: chrono::Utc::now(),
116                    };
117
118                    if tx.send(event).await.is_err() {
119                        // Receiver dropped
120                        break;
121                    }
122                }
123            }
124        }
125
126        Ok(exit_code)
127    }
128
129    pub async fn upload_file(&self, path: &str, content: &[u8], mode: u32) -> CiabResult<()> {
130        let url = format!("{}/files/{}", self.endpoint_url, path);
131        let resp = self
132            .client
133            .put(&url)
134            .header("X-File-Mode", mode.to_string())
135            .body(content.to_vec())
136            .send()
137            .await
138            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
139
140        let status = resp.status();
141        if !status.is_success() {
142            let body = resp.text().await.unwrap_or_default();
143            return Err(CiabError::OpenSandboxError(format!(
144                "upload file failed ({}): {}",
145                status, body
146            )));
147        }
148
149        Ok(())
150    }
151
152    pub async fn download_file(&self, path: &str) -> CiabResult<Vec<u8>> {
153        let url = format!("{}/files/{}", self.endpoint_url, path);
154        let resp = self
155            .client
156            .get(&url)
157            .send()
158            .await
159            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
160
161        let status = resp.status();
162        if !status.is_success() {
163            let body = resp.text().await.unwrap_or_default();
164            return Err(CiabError::FileNotFound(format!("({}): {}", status, body)));
165        }
166
167        resp.bytes()
168            .await
169            .map(|b| b.to_vec())
170            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
171    }
172
173    pub async fn list_files(&self, pattern: &str) -> CiabResult<Vec<FileInfo>> {
174        let url = format!("{}/files", self.endpoint_url);
175        let resp = self
176            .client
177            .get(&url)
178            .query(&[("pattern", pattern)])
179            .send()
180            .await
181            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
182
183        let status = resp.status();
184        if !status.is_success() {
185            let body = resp.text().await.unwrap_or_default();
186            return Err(CiabError::OpenSandboxError(format!(
187                "list files failed ({}): {}",
188                status, body
189            )));
190        }
191
192        resp.json::<Vec<FileInfo>>()
193            .await
194            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
195    }
196
197    pub async fn get_metrics(&self) -> CiabResult<ResourceStats> {
198        let url = format!("{}/metrics", self.endpoint_url);
199        let resp = self
200            .client
201            .get(&url)
202            .send()
203            .await
204            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
205
206        let status = resp.status();
207        if !status.is_success() {
208            let body = resp.text().await.unwrap_or_default();
209            return Err(CiabError::OpenSandboxError(format!(
210                "get metrics failed ({}): {}",
211                status, body
212            )));
213        }
214
215        resp.json::<ResourceStats>()
216            .await
217            .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
218    }
219}