1use crate::{auth::Auth, errors::{MuxiError, Result}, SseEvent, VERSION, version_check};
2use reqwest::Client;
3use serde_json::{json, Value};
4use std::time::Duration;
5use futures::Stream;
6use async_stream::stream;
7
8fn parse_sse_events(text: &str) -> Vec<SseEvent> {
9 let mut events = Vec::new();
10 let mut current_event: Option<String> = None;
11 let mut data_parts: Vec<String> = Vec::new();
12
13 for line in text.lines() {
14 if line.starts_with(':') { continue; }
15 if line.is_empty() {
16 if !data_parts.is_empty() {
17 events.push(SseEvent {
18 event: current_event.take().unwrap_or_else(|| "message".to_string()),
19 data: data_parts.join("\n"),
20 });
21 data_parts.clear();
22 }
23 continue;
24 }
25 if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
26 else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
27 }
28 events
29}
30
/// Connection settings for talking to a server.
#[derive(Clone)]
pub struct ServerConfig {
    /// Base URL; request paths are appended directly to it.
    pub url: String,
    /// Key identifier handed to the auth-header builder for signed requests.
    pub key_id: String,
    /// Secret handed to the auth-header builder for signed requests.
    pub secret_key: String,
    /// Whole-request timeout, in seconds, applied to the HTTP client.
    pub timeout: u64,
    /// NOTE(review): not read anywhere in this file — presumably a retry
    /// budget consumed elsewhere; confirm before relying on it.
    pub max_retries: u32,
    /// NOTE(review): crate-internal and unused in this file; purpose not
    /// visible from here.
    pub(crate) app: Option<String>,
}
40
41impl ServerConfig {
42 pub fn new(url: &str, key_id: &str, secret_key: &str) -> Self {
43 Self {
44 url: url.trim_end_matches('/').to_string(),
45 key_id: key_id.to_string(),
46 secret_key: secret_key.to_string(),
47 timeout: 30,
48 max_retries: 0,
49 app: None,
50 }
51 }
52}
53
54#[derive(Clone)]
55pub struct ServerClient {
56 config: ServerConfig,
57 client: Client,
58}
59
60impl ServerClient {
61 pub fn new(config: ServerConfig) -> Result<Self> {
62 let client = Client::builder()
63 .timeout(Duration::from_secs(config.timeout))
64 .build()?;
65 Ok(Self { config, client })
66 }
67
68 pub async fn health(&self) -> Result<Value> { self.get("/health", false).await }
69 pub async fn status(&self) -> Result<Value> { self.rpc_get("/rpc/server/status").await }
70 pub async fn list_formations(&self) -> Result<Value> { self.rpc_get("/rpc/formations").await }
71 pub async fn get_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_get(&format!("/rpc/formations/{}", formation_id)).await }
72 pub async fn stop_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/stop", formation_id), json!({})).await }
73 pub async fn start_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/start", formation_id), json!({})).await }
74 pub async fn restart_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/restart", formation_id), json!({})).await }
75 pub async fn rollback_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/rollback", formation_id), json!({})).await }
76 pub async fn delete_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_delete(&format!("/rpc/formations/{}", formation_id)).await }
77 pub async fn cancel_update(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/cancel-update", formation_id), json!({})).await }
78 pub async fn deploy_formation(&self, formation_id: &str, payload: Value) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/deploy", formation_id), payload).await }
79 pub async fn update_formation(&self, formation_id: &str, payload: Value) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/update", formation_id), payload).await }
80
81 pub async fn get_formation_logs(&self, formation_id: &str, limit: Option<u32>) -> Result<Value> {
82 let path = match limit {
83 Some(l) => format!("/rpc/formations/{}/logs?limit={}", formation_id, l),
84 None => format!("/rpc/formations/{}/logs", formation_id),
85 };
86 self.rpc_get(&path).await
87 }
88
89 pub async fn get_server_logs(&self, limit: Option<u32>) -> Result<Value> {
90 let path = match limit {
91 Some(l) => format!("/rpc/server/logs?limit={}", l),
92 None => "/rpc/server/logs".to_string(),
93 };
94 self.rpc_get(&path).await
95 }
96
97 pub fn deploy_formation_stream(&self, formation_id: &str, payload: Value) -> impl Stream<Item = Result<SseEvent>> + '_ {
98 let path = format!("/rpc/formations/{}/deploy/stream", formation_id);
99 let client = self.client.clone();
100 let config = self.config.clone();
101 stream! {
102 let url = format!("{}{}", config.url, path);
103 let resp = client.post(&url)
104 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
105 .header("Authorization", Auth::build_auth_header(&config.key_id, &config.secret_key, "POST", &path))
106 .header("Accept", "text/event-stream")
107 .header("Content-Type", "application/json")
108 .json(&payload)
109 .send()
110 .await;
111
112 match resp {
113 Ok(r) => {
114 let text = r.text().await.unwrap_or_default();
115 for event in parse_sse_events(&text) { yield Ok(event); }
116 }
117 Err(e) => yield Err(MuxiError::Request(e)),
118 }
119 }
120 }
121
122 pub fn stream_formation_logs(&self, formation_id: &str) -> impl Stream<Item = Result<SseEvent>> + '_ {
123 let path = format!("/rpc/formations/{}/logs/stream", formation_id);
124 let client = self.client.clone();
125 let config = self.config.clone();
126 stream! {
127 let url = format!("{}{}", config.url, path);
128 let resp = client.get(&url)
129 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
130 .header("Authorization", Auth::build_auth_header(&config.key_id, &config.secret_key, "GET", &path))
131 .header("Accept", "text/event-stream")
132 .send()
133 .await;
134
135 match resp {
136 Ok(r) => {
137 let text = r.text().await.unwrap_or_default();
138 for event in parse_sse_events(&text) { yield Ok(event); }
139 }
140 Err(e) => yield Err(MuxiError::Request(e)),
141 }
142 }
143 }
144
145 async fn get(&self, path: &str, auth: bool) -> Result<Value> {
146 let url = format!("{}{}", self.config.url, path);
147 let mut req = self.client.get(&url)
148 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
149 .header("X-Muxi-Client", format!("rust/{}", VERSION))
150 .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
151 .header("Accept", "application/json");
152
153 if auth {
154 req = req.header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "GET", path));
155 }
156
157 let resp = req.send().await?;
158 self.handle_response(resp).await
159 }
160
161 async fn rpc_get(&self, path: &str) -> Result<Value> { self.get(path, true).await }
162
163 async fn rpc_post(&self, path: &str, body: Value) -> Result<Value> {
164 let url = format!("{}{}", self.config.url, path);
165 let resp = self.client.post(&url)
166 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
167 .header("X-Muxi-Client", format!("rust/{}", VERSION))
168 .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
169 .header("Accept", "application/json")
170 .header("Content-Type", "application/json")
171 .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "POST", path))
172 .json(&body)
173 .send()
174 .await?;
175
176 self.handle_response(resp).await
177 }
178
179 async fn rpc_delete(&self, path: &str) -> Result<Value> {
180 let url = format!("{}{}", self.config.url, path);
181 let resp = self.client.delete(&url)
182 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
183 .header("X-Muxi-Client", format!("rust/{}", VERSION))
184 .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
185 .header("Accept", "application/json")
186 .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "DELETE", path))
187 .send()
188 .await?;
189
190 self.handle_response(resp).await
191 }
192
193 fn stream_sse_post<'a>(&'a self, path: &'a str, body: Value) -> impl Stream<Item = Result<SseEvent>> + 'a {
194 stream! {
195 let url = format!("{}{}", self.config.url, path);
196 let resp = self.client.post(&url)
197 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
198 .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "POST", path))
199 .header("Accept", "text/event-stream")
200 .header("Content-Type", "application/json")
201 .json(&body)
202 .send()
203 .await;
204
205 match resp {
206 Ok(r) => {
207 let mut current_event: Option<String> = None;
208 let mut data_parts: Vec<String> = Vec::new();
209 let text = r.text().await.unwrap_or_default();
210 for line in text.lines() {
211 if line.starts_with(':') { continue; }
212 if line.is_empty() {
213 if !data_parts.is_empty() {
214 yield Ok(SseEvent {
215 event: current_event.take().unwrap_or_else(|| "message".to_string()),
216 data: data_parts.join("\n"),
217 });
218 data_parts.clear();
219 }
220 continue;
221 }
222 if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
223 else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
224 }
225 }
226 Err(e) => yield Err(MuxiError::Request(e)),
227 }
228 }
229 }
230
231 fn stream_sse_get<'a>(&'a self, path: &'a str) -> impl Stream<Item = Result<SseEvent>> + 'a {
232 stream! {
233 let url = format!("{}{}", self.config.url, path);
234 let resp = self.client.get(&url)
235 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
236 .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "GET", path))
237 .header("Accept", "text/event-stream")
238 .send()
239 .await;
240
241 match resp {
242 Ok(r) => {
243 let mut current_event: Option<String> = None;
244 let mut data_parts: Vec<String> = Vec::new();
245 let text = r.text().await.unwrap_or_default();
246 for line in text.lines() {
247 if line.starts_with(':') { continue; }
248 if line.is_empty() {
249 if !data_parts.is_empty() {
250 yield Ok(SseEvent {
251 event: current_event.take().unwrap_or_else(|| "message".to_string()),
252 data: data_parts.join("\n"),
253 });
254 data_parts.clear();
255 }
256 continue;
257 }
258 if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
259 else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
260 }
261 }
262 Err(e) => yield Err(MuxiError::Request(e)),
263 }
264 }
265 }
266
267 async fn handle_response(&self, resp: reqwest::Response) -> Result<Value> {
268 let status = resp.status().as_u16();
269
270 let headers: std::collections::HashMap<String, String> = resp.headers()
272 .iter()
273 .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.to_string(), s.to_string())))
274 .collect();
275 version_check::check_for_updates(&headers);
276
277 let retry_after = resp.headers()
278 .get("Retry-After")
279 .and_then(|v| v.to_str().ok())
280 .and_then(|v| v.parse().ok());
281
282 let body = resp.text().await.unwrap_or_default();
283
284 if status >= 400 {
285 let (code, message) = if let Ok(json) = serde_json::from_str::<Value>(&body) {
286 (
287 json.get("code").or(json.get("error")).and_then(|v| v.as_str()).map(String::from),
288 json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string(),
289 )
290 } else {
291 (None, "Unknown error".to_string())
292 };
293 return Err(MuxiError::from_response(status, code, message, retry_after));
294 }
295
296 if body.is_empty() {
297 Ok(json!({}))
298 } else {
299 Ok(serde_json::from_str(&body)?)
300 }
301 }
302}