1use bytes::Bytes;
2use futures::StreamExt;
3use reqwest::Client;
4use tokio::sync::mpsc;
5use uuid::Uuid;
6
7use ciab_core::error::{CiabError, CiabResult};
8use ciab_core::types::sandbox::{ExecRequest, ExecResult, FileInfo, ResourceStats};
9use ciab_core::types::stream::{StreamEvent, StreamEventType};
10
11#[derive(Clone)]
12pub struct ExecdClient {
13 endpoint_url: String,
14 client: Client,
15}
16
17impl ExecdClient {
18 pub fn new(endpoint_url: String) -> Self {
19 Self {
20 endpoint_url,
21 client: Client::new(),
22 }
23 }
24
25 pub async fn run_command(&self, req: &ExecRequest) -> CiabResult<ExecResult> {
26 let url = format!("{}/exec", self.endpoint_url);
27 let resp = self
28 .client
29 .post(&url)
30 .json(req)
31 .send()
32 .await
33 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
34
35 let status = resp.status();
36 if !status.is_success() {
37 let body = resp.text().await.unwrap_or_default();
38 return Err(CiabError::ExecFailed(format!("({}): {}", status, body)));
39 }
40
41 resp.json::<ExecResult>()
42 .await
43 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
44 }
45
46 pub async fn run_command_stream(
47 &self,
48 req: &ExecRequest,
49 tx: mpsc::Sender<StreamEvent>,
50 sandbox_id: Uuid,
51 ) -> CiabResult<i32> {
52 let url = format!("{}/exec/stream", self.endpoint_url);
53 let resp = self
54 .client
55 .post(&url)
56 .json(req)
57 .send()
58 .await
59 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
60
61 let status = resp.status();
62 if !status.is_success() {
63 let body = resp.text().await.unwrap_or_default();
64 return Err(CiabError::ExecFailed(format!("({}): {}", status, body)));
65 }
66
67 let mut stream = resp.bytes_stream();
68 let mut buffer = String::new();
69 let mut exit_code: i32 = -1;
70
71 while let Some(chunk_result) = stream.next().await {
72 let chunk: Bytes =
73 chunk_result.map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
74 buffer.push_str(&String::from_utf8_lossy(&chunk));
75
76 while let Some(pos) = buffer.find("\n\n") {
77 let event_block = buffer[..pos].to_string();
78 buffer = buffer[pos + 2..].to_string();
79
80 let mut event_type_str: Option<String> = None;
81 let mut data_str: Option<String> = None;
82
83 for line in event_block.lines() {
84 if let Some(rest) = line.strip_prefix("event: ") {
85 event_type_str = Some(rest.trim().to_string());
86 } else if let Some(rest) = line.strip_prefix("data: ") {
87 data_str = Some(rest.trim().to_string());
88 }
89 }
90
91 if let (Some(evt_type), Some(data_raw)) = (event_type_str, data_str) {
92 let data: serde_json::Value =
93 serde_json::from_str(&data_raw).unwrap_or(serde_json::json!(data_raw));
94
95 let stream_event_type = match evt_type.as_str() {
96 "text_delta" => StreamEventType::TextDelta,
97 "text_complete" => StreamEventType::TextComplete,
98 "error" => StreamEventType::Error,
99 "stats" => StreamEventType::Stats,
100 "log_line" => StreamEventType::LogLine,
101 _ => StreamEventType::TextDelta,
102 };
103
104 if let Some(code) = data.get("exit_code").and_then(|v| v.as_i64()) {
106 exit_code = code as i32;
107 }
108
109 let event = StreamEvent {
110 id: uuid::Uuid::new_v4().to_string(),
111 sandbox_id,
112 session_id: None,
113 event_type: stream_event_type,
114 data,
115 timestamp: chrono::Utc::now(),
116 };
117
118 if tx.send(event).await.is_err() {
119 break;
121 }
122 }
123 }
124 }
125
126 Ok(exit_code)
127 }
128
129 pub async fn upload_file(&self, path: &str, content: &[u8], mode: u32) -> CiabResult<()> {
130 let url = format!("{}/files/{}", self.endpoint_url, path);
131 let resp = self
132 .client
133 .put(&url)
134 .header("X-File-Mode", mode.to_string())
135 .body(content.to_vec())
136 .send()
137 .await
138 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
139
140 let status = resp.status();
141 if !status.is_success() {
142 let body = resp.text().await.unwrap_or_default();
143 return Err(CiabError::OpenSandboxError(format!(
144 "upload file failed ({}): {}",
145 status, body
146 )));
147 }
148
149 Ok(())
150 }
151
152 pub async fn download_file(&self, path: &str) -> CiabResult<Vec<u8>> {
153 let url = format!("{}/files/{}", self.endpoint_url, path);
154 let resp = self
155 .client
156 .get(&url)
157 .send()
158 .await
159 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
160
161 let status = resp.status();
162 if !status.is_success() {
163 let body = resp.text().await.unwrap_or_default();
164 return Err(CiabError::FileNotFound(format!("({}): {}", status, body)));
165 }
166
167 resp.bytes()
168 .await
169 .map(|b| b.to_vec())
170 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
171 }
172
173 pub async fn list_files(&self, pattern: &str) -> CiabResult<Vec<FileInfo>> {
174 let url = format!("{}/files", self.endpoint_url);
175 let resp = self
176 .client
177 .get(&url)
178 .query(&[("pattern", pattern)])
179 .send()
180 .await
181 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
182
183 let status = resp.status();
184 if !status.is_success() {
185 let body = resp.text().await.unwrap_or_default();
186 return Err(CiabError::OpenSandboxError(format!(
187 "list files failed ({}): {}",
188 status, body
189 )));
190 }
191
192 resp.json::<Vec<FileInfo>>()
193 .await
194 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
195 }
196
197 pub async fn get_metrics(&self) -> CiabResult<ResourceStats> {
198 let url = format!("{}/metrics", self.endpoint_url);
199 let resp = self
200 .client
201 .get(&url)
202 .send()
203 .await
204 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))?;
205
206 let status = resp.status();
207 if !status.is_success() {
208 let body = resp.text().await.unwrap_or_default();
209 return Err(CiabError::OpenSandboxError(format!(
210 "get metrics failed ({}): {}",
211 status, body
212 )));
213 }
214
215 resp.json::<ResourceStats>()
216 .await
217 .map_err(|e| CiabError::OpenSandboxError(e.to_string()))
218 }
219}