Skip to main content

muxi_rust/
server_client.rs

1use crate::{auth::Auth, errors::{MuxiError, Result}, SseEvent, VERSION, version_check};
2use reqwest::Client;
3use serde_json::{json, Value};
4use std::time::Duration;
5use futures::Stream;
6use async_stream::stream;
7
8fn parse_sse_events(text: &str) -> Vec<SseEvent> {
9    let mut events = Vec::new();
10    let mut current_event: Option<String> = None;
11    let mut data_parts: Vec<String> = Vec::new();
12    
13    for line in text.lines() {
14        if line.starts_with(':') { continue; }
15        if line.is_empty() {
16            if !data_parts.is_empty() {
17                events.push(SseEvent {
18                    event: current_event.take().unwrap_or_else(|| "message".to_string()),
19                    data: data_parts.join("\n"),
20                });
21                data_parts.clear();
22            }
23            continue;
24        }
25        if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
26        else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
27    }
28    events
29}
30
31#[derive(Clone)]
32pub struct ServerConfig {
33    pub url: String,
34    pub key_id: String,
35    pub secret_key: String,
36    pub timeout: u64,
37    pub max_retries: u32,
38    pub(crate) app: Option<String>,  // Internal: for Console telemetry
39}
40
41impl ServerConfig {
42    pub fn new(url: &str, key_id: &str, secret_key: &str) -> Self {
43        Self {
44            url: url.trim_end_matches('/').to_string(),
45            key_id: key_id.to_string(),
46            secret_key: secret_key.to_string(),
47            timeout: 30,
48            max_retries: 0,
49            app: None,
50        }
51    }
52}
53
54#[derive(Clone)]
55pub struct ServerClient {
56    config: ServerConfig,
57    client: Client,
58}
59
60impl ServerClient {
61    pub fn new(config: ServerConfig) -> Result<Self> {
62        let client = Client::builder()
63            .timeout(Duration::from_secs(config.timeout))
64            .build()?;
65        Ok(Self { config, client })
66    }
67    
68    pub async fn health(&self) -> Result<Value> { self.get("/health", false).await }
69    pub async fn status(&self) -> Result<Value> { self.rpc_get("/rpc/server/status").await }
70    pub async fn list_formations(&self) -> Result<Value> { self.rpc_get("/rpc/formations").await }
71    pub async fn get_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_get(&format!("/rpc/formations/{}", formation_id)).await }
72    pub async fn stop_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/stop", formation_id), json!({})).await }
73    pub async fn start_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/start", formation_id), json!({})).await }
74    pub async fn restart_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/restart", formation_id), json!({})).await }
75    pub async fn rollback_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/rollback", formation_id), json!({})).await }
76    pub async fn delete_formation(&self, formation_id: &str) -> Result<Value> { self.rpc_delete(&format!("/rpc/formations/{}", formation_id)).await }
77    pub async fn cancel_update(&self, formation_id: &str) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/cancel-update", formation_id), json!({})).await }
78    pub async fn deploy_formation(&self, formation_id: &str, payload: Value) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/deploy", formation_id), payload).await }
79    pub async fn update_formation(&self, formation_id: &str, payload: Value) -> Result<Value> { self.rpc_post(&format!("/rpc/formations/{}/update", formation_id), payload).await }
80    
81    pub async fn get_formation_logs(&self, formation_id: &str, limit: Option<u32>) -> Result<Value> {
82        let path = match limit {
83            Some(l) => format!("/rpc/formations/{}/logs?limit={}", formation_id, l),
84            None => format!("/rpc/formations/{}/logs", formation_id),
85        };
86        self.rpc_get(&path).await
87    }
88    
89    pub async fn get_server_logs(&self, limit: Option<u32>) -> Result<Value> {
90        let path = match limit {
91            Some(l) => format!("/rpc/server/logs?limit={}", l),
92            None => "/rpc/server/logs".to_string(),
93        };
94        self.rpc_get(&path).await
95    }
96    
97    pub fn deploy_formation_stream(&self, formation_id: &str, payload: Value) -> impl Stream<Item = Result<SseEvent>> + '_ {
98        let path = format!("/rpc/formations/{}/deploy/stream", formation_id);
99        let client = self.client.clone();
100        let config = self.config.clone();
101        stream! {
102            let url = format!("{}{}", config.url, path);
103            let resp = client.post(&url)
104                .header("X-Muxi-SDK", format!("rust/{}", VERSION))
105                .header("Authorization", Auth::build_auth_header(&config.key_id, &config.secret_key, "POST", &path))
106                .header("Accept", "text/event-stream")
107                .header("Content-Type", "application/json")
108                .json(&payload)
109                .send()
110                .await;
111            
112            match resp {
113                Ok(r) => {
114                    let text = r.text().await.unwrap_or_default();
115                    for event in parse_sse_events(&text) { yield Ok(event); }
116                }
117                Err(e) => yield Err(MuxiError::Request(e)),
118            }
119        }
120    }
121    
122    pub fn stream_formation_logs(&self, formation_id: &str) -> impl Stream<Item = Result<SseEvent>> + '_ {
123        let path = format!("/rpc/formations/{}/logs/stream", formation_id);
124        let client = self.client.clone();
125        let config = self.config.clone();
126        stream! {
127            let url = format!("{}{}", config.url, path);
128            let resp = client.get(&url)
129                .header("X-Muxi-SDK", format!("rust/{}", VERSION))
130                .header("Authorization", Auth::build_auth_header(&config.key_id, &config.secret_key, "GET", &path))
131                .header("Accept", "text/event-stream")
132                .send()
133                .await;
134            
135            match resp {
136                Ok(r) => {
137                    let text = r.text().await.unwrap_or_default();
138                    for event in parse_sse_events(&text) { yield Ok(event); }
139                }
140                Err(e) => yield Err(MuxiError::Request(e)),
141            }
142        }
143    }
144    
145    async fn get(&self, path: &str, auth: bool) -> Result<Value> {
146        let url = format!("{}{}", self.config.url, path);
147        let mut req = self.client.get(&url)
148            .header("X-Muxi-SDK", format!("rust/{}", VERSION))
149            .header("X-Muxi-Client", format!("rust/{}", VERSION))
150            .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
151            .header("Accept", "application/json");
152        
153        if auth {
154            req = req.header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "GET", path));
155        }
156        
157        let resp = req.send().await?;
158        self.handle_response(resp).await
159    }
160    
161    async fn rpc_get(&self, path: &str) -> Result<Value> { self.get(path, true).await }
162    
163    async fn rpc_post(&self, path: &str, body: Value) -> Result<Value> {
164        let url = format!("{}{}", self.config.url, path);
165        let resp = self.client.post(&url)
166            .header("X-Muxi-SDK", format!("rust/{}", VERSION))
167            .header("X-Muxi-Client", format!("rust/{}", VERSION))
168            .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
169            .header("Accept", "application/json")
170            .header("Content-Type", "application/json")
171            .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "POST", path))
172            .json(&body)
173            .send()
174            .await?;
175        
176        self.handle_response(resp).await
177    }
178    
179    async fn rpc_delete(&self, path: &str) -> Result<Value> {
180        let url = format!("{}{}", self.config.url, path);
181        let resp = self.client.delete(&url)
182            .header("X-Muxi-SDK", format!("rust/{}", VERSION))
183            .header("X-Muxi-Client", format!("rust/{}", VERSION))
184            .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
185            .header("Accept", "application/json")
186            .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "DELETE", path))
187            .send()
188            .await?;
189        
190        self.handle_response(resp).await
191    }
192    
193    fn stream_sse_post<'a>(&'a self, path: &'a str, body: Value) -> impl Stream<Item = Result<SseEvent>> + 'a {
194        stream! {
195            let url = format!("{}{}", self.config.url, path);
196            let resp = self.client.post(&url)
197                .header("X-Muxi-SDK", format!("rust/{}", VERSION))
198                .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "POST", path))
199                .header("Accept", "text/event-stream")
200                .header("Content-Type", "application/json")
201                .json(&body)
202                .send()
203                .await;
204            
205            match resp {
206                Ok(r) => {
207                    let mut current_event: Option<String> = None;
208                    let mut data_parts: Vec<String> = Vec::new();
209                    let text = r.text().await.unwrap_or_default();
210                    for line in text.lines() {
211                        if line.starts_with(':') { continue; }
212                        if line.is_empty() {
213                            if !data_parts.is_empty() {
214                                yield Ok(SseEvent {
215                                    event: current_event.take().unwrap_or_else(|| "message".to_string()),
216                                    data: data_parts.join("\n"),
217                                });
218                                data_parts.clear();
219                            }
220                            continue;
221                        }
222                        if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
223                        else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
224                    }
225                }
226                Err(e) => yield Err(MuxiError::Request(e)),
227            }
228        }
229    }
230    
231    fn stream_sse_get<'a>(&'a self, path: &'a str) -> impl Stream<Item = Result<SseEvent>> + 'a {
232        stream! {
233            let url = format!("{}{}", self.config.url, path);
234            let resp = self.client.get(&url)
235                .header("X-Muxi-SDK", format!("rust/{}", VERSION))
236                .header("Authorization", Auth::build_auth_header(&self.config.key_id, &self.config.secret_key, "GET", path))
237                .header("Accept", "text/event-stream")
238                .send()
239                .await;
240            
241            match resp {
242                Ok(r) => {
243                    let mut current_event: Option<String> = None;
244                    let mut data_parts: Vec<String> = Vec::new();
245                    let text = r.text().await.unwrap_or_default();
246                    for line in text.lines() {
247                        if line.starts_with(':') { continue; }
248                        if line.is_empty() {
249                            if !data_parts.is_empty() {
250                                yield Ok(SseEvent {
251                                    event: current_event.take().unwrap_or_else(|| "message".to_string()),
252                                    data: data_parts.join("\n"),
253                                });
254                                data_parts.clear();
255                            }
256                            continue;
257                        }
258                        if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
259                        else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
260                    }
261                }
262                Err(e) => yield Err(MuxiError::Request(e)),
263            }
264        }
265    }
266    
267    async fn handle_response(&self, resp: reqwest::Response) -> Result<Value> {
268        let status = resp.status().as_u16();
269        
270        // Check for SDK updates (non-blocking, once per process)
271        let headers: std::collections::HashMap<String, String> = resp.headers()
272            .iter()
273            .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.to_string(), s.to_string())))
274            .collect();
275        version_check::check_for_updates(&headers);
276        
277        let retry_after = resp.headers()
278            .get("Retry-After")
279            .and_then(|v| v.to_str().ok())
280            .and_then(|v| v.parse().ok());
281        
282        let body = resp.text().await.unwrap_or_default();
283        
284        if status >= 400 {
285            let (code, message) = if let Ok(json) = serde_json::from_str::<Value>(&body) {
286                (
287                    json.get("code").or(json.get("error")).and_then(|v| v.as_str()).map(String::from),
288                    json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string(),
289                )
290            } else {
291                (None, "Unknown error".to_string())
292            };
293            return Err(MuxiError::from_response(status, code, message, retry_after));
294        }
295        
296        if body.is_empty() {
297            Ok(json!({}))
298        } else {
299            Ok(serde_json::from_str(&body)?)
300        }
301    }
302}