// muxi_rust/formation_client.rs

use std::collections::HashMap;
use std::time::Duration;

use async_stream::stream;
use futures::Stream;
use reqwest::Client;
use serde_json::{json, Value};

use crate::{errors::{MuxiError, Result}, SseEvent, VERSION, version_check};
7
/// Connection settings consumed by `FormationClient::new`.
///
/// Either `base_url` must be set, or both `server_url` and `formation_id`;
/// `FormationClient::new` rejects any other combination.
#[derive(Clone)]
pub struct FormationConfig {
    /// Server root; joined with `formation_id` when `base_url` is absent.
    pub server_url: Option<String>,
    /// Formation identifier used in the derived URL when `base_url` is absent.
    pub formation_id: Option<String>,
    /// Complete API base URL; takes precedence over `server_url` + `formation_id`.
    pub base_url: Option<String>,
    /// Key sent in the `X-MUXI-CLIENT-KEY` header on non-admin requests.
    pub client_key: Option<String>,
    /// Key sent in the `X-MUXI-ADMIN-KEY` header on admin requests.
    pub admin_key: Option<String>,
    /// HTTP client timeout, in seconds.
    pub timeout: u64,
    /// "live" (default) or "draft" for local dev.
    pub mode: String,
    /// Internal: for Console telemetry.
    pub(crate) app: Option<String>,
}

impl FormationConfig {
    /// Creates a configuration addressed by server URL and formation id.
    ///
    /// `base_url` is left unset, the timeout defaults to 30 seconds and the
    /// mode to `"live"`.
    pub fn new(server_url: &str, formation_id: &str, client_key: &str, admin_key: &str) -> Self {
        Self {
            server_url: Some(String::from(server_url)),
            formation_id: Some(String::from(formation_id)),
            base_url: None,
            client_key: Some(String::from(client_key)),
            admin_key: Some(String::from(admin_key)),
            timeout: 30,
            mode: String::from("live"),
            app: None,
        }
    }

    /// Creates a configuration addressed by a complete base URL.
    ///
    /// `server_url` and `formation_id` are left unset, the timeout defaults
    /// to 30 seconds and the mode to `"live"`.
    pub fn with_base_url(base_url: &str, client_key: &str, admin_key: &str) -> Self {
        Self {
            server_url: None,
            formation_id: None,
            base_url: Some(String::from(base_url)),
            client_key: Some(String::from(client_key)),
            admin_key: Some(String::from(admin_key)),
            timeout: 30,
            mode: String::from("live"),
            app: None,
        }
    }
}
47
48#[derive(Clone)]
49pub struct FormationClient {
50    base_url: String,
51    client_key: Option<String>,
52    admin_key: Option<String>,
53    app: Option<String>,
54    client: Client,
55}
56
57impl FormationClient {
58    pub fn new(config: FormationConfig) -> Result<Self> {
59        let base_url = if let Some(base) = config.base_url {
60            base.trim_end_matches('/').to_string()
61        } else if let (Some(server), Some(formation)) = (&config.server_url, &config.formation_id) {
62            let prefix = if config.mode == "draft" { "draft" } else { "api" };
63            format!("{}/{}/{}/v1", server.trim_end_matches('/'), prefix, formation)
64        } else {
65            return Err(MuxiError::Connection("Must provide base_url or server_url+formation_id".to_string()));
66        };
67        
68        let client = Client::builder()
69            .timeout(Duration::from_secs(config.timeout))
70            .build()?;
71        
72        Ok(Self { base_url, client_key: config.client_key, admin_key: config.admin_key, app: config.app, client })
73    }
74    
75    // Health / Status
76    pub async fn health(&self) -> Result<Value> { self.request("GET", "/health", None, None, false, None).await }
77    pub async fn get_status(&self) -> Result<Value> { self.request("GET", "/status", None, None, true, None).await }
78    pub async fn get_config(&self) -> Result<Value> { self.request("GET", "/config", None, None, true, None).await }
79    pub async fn get_formation_info(&self) -> Result<Value> { self.request("GET", "/formation", None, None, true, None).await }
80    
81    // Agents / MCP
82    pub async fn get_agents(&self) -> Result<Value> { self.request("GET", "/agents", None, None, true, None).await }
83    pub async fn get_agent(&self, agent_id: &str) -> Result<Value> { self.request("GET", &format!("/agents/{}", agent_id), None, None, true, None).await }
84    pub async fn get_mcp_servers(&self) -> Result<Value> { self.request("GET", "/mcp/servers", None, None, true, None).await }
85    pub async fn get_mcp_server(&self, server_id: &str) -> Result<Value> { self.request("GET", &format!("/mcp/servers/{}", server_id), None, None, true, None).await }
86    pub async fn get_mcp_tools(&self) -> Result<Value> { self.request("GET", "/mcp/tools", None, None, true, None).await }
87    
88    // Secrets
89    pub async fn get_secrets(&self) -> Result<Value> { self.request("GET", "/secrets", None, None, true, None).await }
90    pub async fn get_secret(&self, key: &str) -> Result<Value> { self.request("GET", &format!("/secrets/{}", key), None, None, true, None).await }
91    pub async fn set_secret(&self, key: &str, value: &str) -> Result<Value> { self.request("PUT", &format!("/secrets/{}", key), None, Some(json!({"value": value})), true, None).await }
92    pub async fn delete_secret(&self, key: &str) -> Result<Value> { self.request("DELETE", &format!("/secrets/{}", key), None, None, true, None).await }
93    
94    // Chat
95    pub async fn chat(&self, payload: Value, user_id: Option<&str>) -> Result<Value> { self.request("POST", "/chat", None, Some(payload), false, user_id).await }
96    pub fn chat_stream<'a>(&'a self, mut payload: Value, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
97        payload.as_object_mut().map(|o| o.insert("stream".to_string(), json!(true)));
98        self.stream_sse_post("/chat", payload, false, user_id)
99    }
100    pub async fn audio_chat(&self, payload: Value, user_id: Option<&str>) -> Result<Value> { self.request("POST", "/audiochat", None, Some(payload), false, user_id).await }
101    pub fn audio_chat_stream<'a>(&'a self, mut payload: Value, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
102        payload.as_object_mut().map(|o| o.insert("stream".to_string(), json!(true)));
103        self.stream_sse_post("/audiochat", payload, false, user_id)
104    }
105    
106    // Sessions
107    pub async fn get_sessions(&self, user_id: &str, limit: Option<u32>) -> Result<Value> {
108        let mut params = vec![("user_id", user_id.to_string())];
109        if let Some(l) = limit { params.push(("limit", l.to_string())); }
110        self.request("GET", "/sessions", Some(params), None, false, Some(user_id)).await
111    }
112    pub async fn get_session(&self, session_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/sessions/{}", session_id), None, None, false, Some(user_id)).await }
113    pub async fn get_session_messages(&self, session_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/sessions/{}/messages", session_id), None, None, false, Some(user_id)).await }
114    pub async fn restore_session(&self, session_id: &str, user_id: &str, messages: Value) -> Result<Value> {
115        self.request("POST", &format!("/sessions/{}/restore", session_id), None, Some(json!({"messages": messages})), false, Some(user_id)).await
116    }
117    
118    // Requests
119    pub async fn get_requests(&self, user_id: &str) -> Result<Value> { self.request("GET", "/requests", None, None, false, Some(user_id)).await }
120    pub async fn get_request_status(&self, request_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/requests/{}", request_id), None, None, false, Some(user_id)).await }
121    pub async fn cancel_request(&self, request_id: &str, user_id: &str) -> Result<Value> { self.request("DELETE", &format!("/requests/{}", request_id), None, None, false, Some(user_id)).await }
122    
123    // Memory
124    pub async fn get_memory_config(&self) -> Result<Value> { self.request("GET", "/memory", None, None, true, None).await }
125    pub async fn get_memories(&self, user_id: &str, limit: Option<u32>) -> Result<Value> {
126        let mut params = vec![("user_id", user_id.to_string())];
127        if let Some(l) = limit { params.push(("limit", l.to_string())); }
128        self.request("GET", "/memories", Some(params), None, false, Some(user_id)).await
129    }
130    pub async fn add_memory(&self, user_id: &str, memory_type: &str, detail: &str) -> Result<Value> {
131        self.request("POST", "/memories", None, Some(json!({"user_id": user_id, "type": memory_type, "detail": detail})), false, Some(user_id)).await
132    }
133    pub async fn delete_memory(&self, user_id: &str, memory_id: &str) -> Result<Value> {
134        self.request("DELETE", &format!("/memories/{}", memory_id), Some(vec![("user_id", user_id.to_string())]), None, false, Some(user_id)).await
135    }
136    pub async fn get_user_buffer(&self, user_id: &str) -> Result<Value> {
137        self.request("GET", "/memory/buffer", Some(vec![("user_id", user_id.to_string())]), None, false, None).await
138    }
139    pub async fn clear_user_buffer(&self, user_id: &str) -> Result<Value> {
140        self.request("DELETE", "/memory/buffer", Some(vec![("user_id", user_id.to_string())]), None, false, None).await
141    }
142    pub async fn clear_all_buffers(&self) -> Result<Value> { self.request("DELETE", "/memory/buffer", None, None, true, None).await }
143    pub async fn clear_session_buffer(&self, user_id: &str, session_id: &str) -> Result<Value> {
144        self.request("DELETE", &format!("/memory/buffer/{}", session_id), Some(vec![("user_id", user_id.to_string())]), None, false, None).await
145    }
146    pub async fn get_buffer_stats(&self) -> Result<Value> { self.request("GET", "/memory/stats", None, None, true, None).await }
147    
148    // Scheduler
149    pub async fn get_scheduler_config(&self) -> Result<Value> { self.request("GET", "/scheduler", None, None, true, None).await }
150    pub async fn get_scheduler_jobs(&self, user_id: &str) -> Result<Value> {
151        self.request("GET", "/scheduler/jobs", Some(vec![("user_id", user_id.to_string())]), None, true, None).await
152    }
153    pub async fn get_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("GET", &format!("/scheduler/jobs/{}", job_id), None, None, true, None).await }
154    pub async fn create_scheduler_job(&self, job_type: &str, schedule: &str, message: &str, user_id: &str) -> Result<Value> {
155        self.request("POST", "/scheduler/jobs", None, Some(json!({"type": job_type, "schedule": schedule, "message": message, "user_id": user_id})), true, None).await
156    }
157    pub async fn delete_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("DELETE", &format!("/scheduler/jobs/{}", job_id), None, None, true, None).await }
158    pub async fn update_scheduler_job(&self, job_id: &str, updates: Value) -> Result<Value> { self.request("PUT", &format!("/scheduler/jobs/{}", job_id), None, Some(updates), true, None).await }
159    pub async fn pause_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("POST", &format!("/scheduler/jobs/{}/pause", job_id), None, None, true, None).await }
160    pub async fn resume_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("POST", &format!("/scheduler/jobs/{}/resume", job_id), None, None, true, None).await }
161    
162    // Config endpoints
163    pub async fn get_async_config(&self) -> Result<Value> { self.request("GET", "/async", None, None, true, None).await }
164    pub async fn get_a2a_config(&self) -> Result<Value> { self.request("GET", "/a2a", None, None, true, None).await }
165    pub async fn get_logging_config(&self) -> Result<Value> { self.request("GET", "/logging", None, None, true, None).await }
166    pub async fn get_logging_destinations(&self) -> Result<Value> { self.request("GET", "/logging/destinations", None, None, true, None).await }
167    pub async fn get_overlord_config(&self) -> Result<Value> { self.request("GET", "/overlord", None, None, true, None).await }
168    pub async fn get_overlord_soul(&self) -> Result<Value> { self.request("GET", "/overlord/soul", None, None, true, None).await }
169    pub async fn get_llm_settings(&self) -> Result<Value> { self.request("GET", "/llm/settings", None, None, true, None).await }
170    
171    // Triggers / SOPs / Audit
172    pub async fn get_triggers(&self) -> Result<Value> { self.request("GET", "/triggers", None, None, false, None).await }
173    pub async fn get_trigger(&self, name: &str) -> Result<Value> { self.request("GET", &format!("/triggers/{}", name), None, None, false, None).await }
174    pub async fn fire_trigger(&self, name: &str, data: Value, is_async: bool, user_id: Option<&str>) -> Result<Value> {
175        self.request("POST", &format!("/triggers/{}", name), Some(vec![("async", is_async.to_string())]), Some(data), false, user_id).await
176    }
177    pub async fn get_sops(&self) -> Result<Value> { self.request("GET", "/sops", None, None, false, None).await }
178    pub async fn get_sop(&self, name: &str) -> Result<Value> { self.request("GET", &format!("/sops/{}", name), None, None, false, None).await }
179    pub async fn get_audit_log(&self) -> Result<Value> { self.request("GET", "/audit", None, None, true, None).await }
180    pub async fn clear_audit_log(&self) -> Result<Value> { self.request("DELETE", "/audit?confirm=clear-audit-log", None, None, true, None).await }
181    
182    // Credentials
183    pub async fn list_credential_services(&self) -> Result<Value> { self.request("GET", "/credentials/services", None, None, true, None).await }
184    pub async fn list_credentials(&self, user_id: &str) -> Result<Value> { self.request("GET", "/credentials", None, None, false, Some(user_id)).await }
185    pub async fn get_credential(&self, credential_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/credentials/{}", credential_id), None, None, false, Some(user_id)).await }
186    pub async fn create_credential(&self, user_id: &str, payload: Value) -> Result<Value> { self.request("POST", "/credentials", None, Some(payload), false, Some(user_id)).await }
187    pub async fn delete_credential(&self, credential_id: &str, user_id: &str) -> Result<Value> { self.request("DELETE", &format!("/credentials/{}", credential_id), None, None, false, Some(user_id)).await }
188    
189    // User identifiers
190    pub async fn get_user_identifiers(&self, user_id: &str) -> Result<Value> { self.request("GET", &format!("/users/identifiers/{}", user_id), None, None, true, None).await }
191    pub async fn link_user_identifier(&self, muxi_user_id: &str, identifiers: Value) -> Result<Value> {
192        self.request("POST", "/users/identifiers", None, Some(json!({"muxi_user_id": muxi_user_id, "identifiers": identifiers})), true, None).await
193    }
194    pub async fn unlink_user_identifier(&self, identifier: &str) -> Result<Value> { self.request("DELETE", &format!("/users/identifiers/{}", identifier), None, None, true, None).await }
195    
196    // Streaming
197    pub fn stream_events<'a>(&'a self, user_id: &'a str) -> impl Stream<Item = Result<SseEvent>> + 'a {
198        self.stream_sse_get("/events", Some(vec![("user_id", user_id.to_string())]), false, Some(user_id))
199    }
200    pub fn stream_logs<'a>(&'a self, filters: Option<Vec<(&'a str, String)>>) -> impl Stream<Item = Result<SseEvent>> + 'a {
201        self.stream_sse_get("/logs", filters, true, None)
202    }
203    
204    // Resolve user
205    pub async fn resolve_user(&self, identifier: &str, create_user: bool) -> Result<Value> {
206        self.request("POST", "/users/resolve", None, Some(json!({"identifier": identifier, "create_user": create_user})), false, None).await
207    }
208    
209    async fn request(&self, method: &str, path: &str, params: Option<Vec<(&str, String)>>, body: Option<Value>, use_admin: bool, user_id: Option<&str>) -> Result<Value> {
210        let url = self.build_url(path, params);
211        let mut req = match method {
212            "GET" => self.client.get(&url),
213            "POST" => self.client.post(&url),
214            "PUT" => self.client.put(&url),
215            "DELETE" => self.client.delete(&url),
216            _ => return Err(MuxiError::Connection(format!("Unknown method: {}", method))),
217        };
218        
219        req = self.add_headers(req, use_admin, user_id, body.is_some());
220        if let Some(b) = body { req = req.json(&b); }
221        
222        let resp = req.send().await?;
223        self.handle_response(resp).await
224    }
225    
226    fn stream_sse_post<'a>(&'a self, path: &'a str, body: Value, use_admin: bool, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
227        stream! {
228            let url = self.build_url(path, None);
229            let mut req = self.client.post(&url);
230            req = self.add_headers(req, use_admin, user_id, true);
231            req = req.header("Accept", "text/event-stream").json(&body);
232            
233            match req.send().await {
234                Ok(r) => {
235                    let mut current_event: Option<String> = None;
236                    let mut data_parts: Vec<String> = Vec::new();
237                    let text = r.text().await.unwrap_or_default();
238                    for line in text.lines() {
239                        if line.starts_with(':') { continue; }
240                        if line.is_empty() {
241                            if !data_parts.is_empty() {
242                                yield Ok(SseEvent { event: current_event.take().unwrap_or_else(|| "message".to_string()), data: data_parts.join("\n") });
243                                data_parts.clear();
244                            }
245                            continue;
246                        }
247                        if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
248                        else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
249                    }
250                }
251                Err(e) => yield Err(MuxiError::Request(e)),
252            }
253        }
254    }
255    
256    fn stream_sse_get<'a>(&'a self, path: &'a str, params: Option<Vec<(&'a str, String)>>, use_admin: bool, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
257        stream! {
258            let url = self.build_url(path, params);
259            let mut req = self.client.get(&url);
260            req = self.add_headers(req, use_admin, user_id, false);
261            req = req.header("Accept", "text/event-stream");
262            
263            match req.send().await {
264                Ok(r) => {
265                    let mut current_event: Option<String> = None;
266                    let mut data_parts: Vec<String> = Vec::new();
267                    let text = r.text().await.unwrap_or_default();
268                    for line in text.lines() {
269                        if line.starts_with(':') { continue; }
270                        if line.is_empty() {
271                            if !data_parts.is_empty() {
272                                yield Ok(SseEvent { event: current_event.take().unwrap_or_else(|| "message".to_string()), data: data_parts.join("\n") });
273                                data_parts.clear();
274                            }
275                            continue;
276                        }
277                        if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
278                        else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
279                    }
280                }
281                Err(e) => yield Err(MuxiError::Request(e)),
282            }
283        }
284    }
285    
286    fn build_url(&self, path: &str, params: Option<Vec<(&str, String)>>) -> String {
287        let full_path = if path.starts_with('/') { path.to_string() } else { format!("/{}", path) };
288        let mut url = format!("{}{}", self.base_url, full_path);
289        if let Some(p) = params {
290            if !p.is_empty() {
291                url.push('?');
292                url.push_str(&p.iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("&"));
293            }
294        }
295        url
296    }
297    
298    fn add_headers(&self, req: reqwest::RequestBuilder, use_admin: bool, user_id: Option<&str>, has_body: bool) -> reqwest::RequestBuilder {
299        let mut req = req
300            .header("X-Muxi-SDK", format!("rust/{}", VERSION))
301            .header("X-Muxi-Client", format!("rust/{}", VERSION))
302            .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
303            .header("Accept", "application/json");
304        
305        if let Some(app) = &self.app { req = req.header("X-Muxi-App", app); }
306        if use_admin {
307            if let Some(key) = &self.admin_key { req = req.header("X-MUXI-ADMIN-KEY", key); }
308        } else {
309            if let Some(key) = &self.client_key { req = req.header("X-MUXI-CLIENT-KEY", key); }
310        }
311        if let Some(uid) = user_id { req = req.header("X-Muxi-User-ID", uid); }
312        if has_body { req = req.header("Content-Type", "application/json"); }
313        req
314    }
315    
316    async fn handle_response(&self, resp: reqwest::Response) -> Result<Value> {
317        let status = resp.status().as_u16();
318        
319        // Check for SDK updates (non-blocking, once per process)
320        let headers: std::collections::HashMap<String, String> = resp.headers()
321            .iter()
322            .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.to_string(), s.to_string())))
323            .collect();
324        version_check::check_for_updates(&headers);
325        
326        let retry_after = resp.headers().get("Retry-After").and_then(|v| v.to_str().ok()).and_then(|v| v.parse().ok());
327        let body = resp.text().await.unwrap_or_default();
328        
329        if status >= 400 {
330            let (code, message) = if let Ok(json) = serde_json::from_str::<Value>(&body) {
331                (json.get("code").or(json.get("error")).and_then(|v| v.as_str()).map(String::from), json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string())
332            } else { (None, "Unknown error".to_string()) };
333            return Err(MuxiError::from_response(status, code, message, retry_after));
334        }
335        
336        if body.is_empty() { Ok(json!({})) } else { Ok(self.unwrap_envelope(serde_json::from_str(&body)?)) }
337    }
338    
339    fn unwrap_envelope(&self, value: Value) -> Value {
340        if let Some(obj) = value.as_object() {
341            if let Some(data) = obj.get("data") {
342                if let Some(mut result) = data.as_object().cloned() {
343                    if let Some(req) = obj.get("request").and_then(|r| r.as_object()) {
344                        if let Some(id) = req.get("id") { if !result.contains_key("request_id") { result.insert("request_id".to_string(), id.clone()); } }
345                    }
346                    return Value::Object(result);
347                }
348                return data.clone();
349            }
350        }
351        value
352    }
353}