1use crate::{errors::{MuxiError, Result}, SseEvent, VERSION, version_check};
2use reqwest::Client;
3use serde_json::{json, Value};
4use std::time::Duration;
5use futures::Stream;
6use async_stream::stream;
7
/// Connection settings consumed by `FormationClient::new`.
#[derive(Clone)]
pub struct FormationConfig {
    /// Server root; combined with `formation_id` to derive the API base URL
    /// when `base_url` is not set.
    pub server_url: Option<String>,
    /// Formation identifier placed in the derived base URL.
    pub formation_id: Option<String>,
    /// Explicit API base URL; takes precedence over `server_url` + `formation_id`.
    pub base_url: Option<String>,
    /// Key sent with client-authenticated requests.
    pub client_key: Option<String>,
    /// Key sent with admin-authenticated requests.
    pub admin_key: Option<String>,
    /// HTTP request timeout, in seconds.
    pub timeout: u64,
    /// `"draft"` selects the draft URL prefix; any other value selects the `api` prefix.
    pub mode: String,
    /// Optional application name forwarded with every request.
    pub(crate) app: Option<String>,
}

impl FormationConfig {
    /// Creates a configuration addressed by server URL and formation id.
    ///
    /// The timeout defaults to 30 seconds and the mode to `"live"`.
    pub fn new(server_url: &str, formation_id: &str, client_key: &str, admin_key: &str) -> Self {
        Self {
            server_url: Some(server_url.to_owned()),
            formation_id: Some(formation_id.to_owned()),
            ..Self::with_keys(client_key, admin_key)
        }
    }

    /// Creates a configuration that talks to an explicit base URL.
    ///
    /// The timeout defaults to 30 seconds and the mode to `"live"`.
    pub fn with_base_url(base_url: &str, client_key: &str, admin_key: &str) -> Self {
        Self {
            base_url: Some(base_url.to_owned()),
            ..Self::with_keys(client_key, admin_key)
        }
    }

    /// Shared defaults for both constructors: keys set, no addressing, 30 s timeout, live mode.
    fn with_keys(client_key: &str, admin_key: &str) -> Self {
        Self {
            server_url: None,
            formation_id: None,
            base_url: None,
            client_key: Some(client_key.to_owned()),
            admin_key: Some(admin_key.to_owned()),
            timeout: 30,
            mode: String::from("live"),
            app: None,
        }
    }
}
47
48#[derive(Clone)]
49pub struct FormationClient {
50 base_url: String,
51 client_key: Option<String>,
52 admin_key: Option<String>,
53 app: Option<String>,
54 client: Client,
55}
56
57impl FormationClient {
58 pub fn new(config: FormationConfig) -> Result<Self> {
59 let base_url = if let Some(base) = config.base_url {
60 base.trim_end_matches('/').to_string()
61 } else if let (Some(server), Some(formation)) = (&config.server_url, &config.formation_id) {
62 let prefix = if config.mode == "draft" { "draft" } else { "api" };
63 format!("{}/{}/{}/v1", server.trim_end_matches('/'), prefix, formation)
64 } else {
65 return Err(MuxiError::Connection("Must provide base_url or server_url+formation_id".to_string()));
66 };
67
68 let client = Client::builder()
69 .timeout(Duration::from_secs(config.timeout))
70 .build()?;
71
72 Ok(Self { base_url, client_key: config.client_key, admin_key: config.admin_key, app: config.app, client })
73 }
74
75 pub async fn health(&self) -> Result<Value> { self.request("GET", "/health", None, None, false, None).await }
77 pub async fn get_status(&self) -> Result<Value> { self.request("GET", "/status", None, None, true, None).await }
78 pub async fn get_config(&self) -> Result<Value> { self.request("GET", "/config", None, None, true, None).await }
79 pub async fn get_formation_info(&self) -> Result<Value> { self.request("GET", "/formation", None, None, true, None).await }
80
81 pub async fn get_agents(&self) -> Result<Value> { self.request("GET", "/agents", None, None, true, None).await }
83 pub async fn get_agent(&self, agent_id: &str) -> Result<Value> { self.request("GET", &format!("/agents/{}", agent_id), None, None, true, None).await }
84 pub async fn get_mcp_servers(&self) -> Result<Value> { self.request("GET", "/mcp/servers", None, None, true, None).await }
85 pub async fn get_mcp_server(&self, server_id: &str) -> Result<Value> { self.request("GET", &format!("/mcp/servers/{}", server_id), None, None, true, None).await }
86 pub async fn get_mcp_tools(&self) -> Result<Value> { self.request("GET", "/mcp/tools", None, None, true, None).await }
87
88 pub async fn get_secrets(&self) -> Result<Value> { self.request("GET", "/secrets", None, None, true, None).await }
90 pub async fn get_secret(&self, key: &str) -> Result<Value> { self.request("GET", &format!("/secrets/{}", key), None, None, true, None).await }
91 pub async fn set_secret(&self, key: &str, value: &str) -> Result<Value> { self.request("PUT", &format!("/secrets/{}", key), None, Some(json!({"value": value})), true, None).await }
92 pub async fn delete_secret(&self, key: &str) -> Result<Value> { self.request("DELETE", &format!("/secrets/{}", key), None, None, true, None).await }
93
94 pub async fn chat(&self, payload: Value, user_id: Option<&str>) -> Result<Value> { self.request("POST", "/chat", None, Some(payload), false, user_id).await }
96 pub fn chat_stream<'a>(&'a self, mut payload: Value, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
97 payload.as_object_mut().map(|o| o.insert("stream".to_string(), json!(true)));
98 self.stream_sse_post("/chat", payload, false, user_id)
99 }
100 pub async fn audio_chat(&self, payload: Value, user_id: Option<&str>) -> Result<Value> { self.request("POST", "/audiochat", None, Some(payload), false, user_id).await }
101 pub fn audio_chat_stream<'a>(&'a self, mut payload: Value, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
102 payload.as_object_mut().map(|o| o.insert("stream".to_string(), json!(true)));
103 self.stream_sse_post("/audiochat", payload, false, user_id)
104 }
105
106 pub async fn get_sessions(&self, user_id: &str, limit: Option<u32>) -> Result<Value> {
108 let mut params = vec![("user_id", user_id.to_string())];
109 if let Some(l) = limit { params.push(("limit", l.to_string())); }
110 self.request("GET", "/sessions", Some(params), None, false, Some(user_id)).await
111 }
112 pub async fn get_session(&self, session_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/sessions/{}", session_id), None, None, false, Some(user_id)).await }
113 pub async fn get_session_messages(&self, session_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/sessions/{}/messages", session_id), None, None, false, Some(user_id)).await }
114 pub async fn restore_session(&self, session_id: &str, user_id: &str, messages: Value) -> Result<Value> {
115 self.request("POST", &format!("/sessions/{}/restore", session_id), None, Some(json!({"messages": messages})), false, Some(user_id)).await
116 }
117
118 pub async fn get_requests(&self, user_id: &str) -> Result<Value> { self.request("GET", "/requests", None, None, false, Some(user_id)).await }
120 pub async fn get_request_status(&self, request_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/requests/{}", request_id), None, None, false, Some(user_id)).await }
121 pub async fn cancel_request(&self, request_id: &str, user_id: &str) -> Result<Value> { self.request("DELETE", &format!("/requests/{}", request_id), None, None, false, Some(user_id)).await }
122
123 pub async fn get_memory_config(&self) -> Result<Value> { self.request("GET", "/memory", None, None, true, None).await }
125 pub async fn get_memories(&self, user_id: &str, limit: Option<u32>) -> Result<Value> {
126 let mut params = vec![("user_id", user_id.to_string())];
127 if let Some(l) = limit { params.push(("limit", l.to_string())); }
128 self.request("GET", "/memories", Some(params), None, false, Some(user_id)).await
129 }
130 pub async fn add_memory(&self, user_id: &str, memory_type: &str, detail: &str) -> Result<Value> {
131 self.request("POST", "/memories", None, Some(json!({"user_id": user_id, "type": memory_type, "detail": detail})), false, Some(user_id)).await
132 }
133 pub async fn delete_memory(&self, user_id: &str, memory_id: &str) -> Result<Value> {
134 self.request("DELETE", &format!("/memories/{}", memory_id), Some(vec![("user_id", user_id.to_string())]), None, false, Some(user_id)).await
135 }
136 pub async fn get_user_buffer(&self, user_id: &str) -> Result<Value> {
137 self.request("GET", "/memory/buffer", Some(vec![("user_id", user_id.to_string())]), None, false, None).await
138 }
139 pub async fn clear_user_buffer(&self, user_id: &str) -> Result<Value> {
140 self.request("DELETE", "/memory/buffer", Some(vec![("user_id", user_id.to_string())]), None, false, None).await
141 }
142 pub async fn clear_all_buffers(&self) -> Result<Value> { self.request("DELETE", "/memory/buffer", None, None, true, None).await }
143 pub async fn clear_session_buffer(&self, user_id: &str, session_id: &str) -> Result<Value> {
144 self.request("DELETE", &format!("/memory/buffer/{}", session_id), Some(vec![("user_id", user_id.to_string())]), None, false, None).await
145 }
146 pub async fn get_buffer_stats(&self) -> Result<Value> { self.request("GET", "/memory/stats", None, None, true, None).await }
147
148 pub async fn get_scheduler_config(&self) -> Result<Value> { self.request("GET", "/scheduler", None, None, true, None).await }
150 pub async fn get_scheduler_jobs(&self, user_id: &str) -> Result<Value> {
151 self.request("GET", "/scheduler/jobs", Some(vec![("user_id", user_id.to_string())]), None, true, None).await
152 }
153 pub async fn get_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("GET", &format!("/scheduler/jobs/{}", job_id), None, None, true, None).await }
154 pub async fn create_scheduler_job(&self, job_type: &str, schedule: &str, message: &str, user_id: &str) -> Result<Value> {
155 self.request("POST", "/scheduler/jobs", None, Some(json!({"type": job_type, "schedule": schedule, "message": message, "user_id": user_id})), true, None).await
156 }
157 pub async fn delete_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("DELETE", &format!("/scheduler/jobs/{}", job_id), None, None, true, None).await }
158 pub async fn update_scheduler_job(&self, job_id: &str, updates: Value) -> Result<Value> { self.request("PUT", &format!("/scheduler/jobs/{}", job_id), None, Some(updates), true, None).await }
159 pub async fn pause_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("POST", &format!("/scheduler/jobs/{}/pause", job_id), None, None, true, None).await }
160 pub async fn resume_scheduler_job(&self, job_id: &str) -> Result<Value> { self.request("POST", &format!("/scheduler/jobs/{}/resume", job_id), None, None, true, None).await }
161
162 pub async fn get_async_config(&self) -> Result<Value> { self.request("GET", "/async", None, None, true, None).await }
164 pub async fn get_a2a_config(&self) -> Result<Value> { self.request("GET", "/a2a", None, None, true, None).await }
165 pub async fn get_logging_config(&self) -> Result<Value> { self.request("GET", "/logging", None, None, true, None).await }
166 pub async fn get_logging_destinations(&self) -> Result<Value> { self.request("GET", "/logging/destinations", None, None, true, None).await }
167 pub async fn get_overlord_config(&self) -> Result<Value> { self.request("GET", "/overlord", None, None, true, None).await }
168 pub async fn get_overlord_soul(&self) -> Result<Value> { self.request("GET", "/overlord/soul", None, None, true, None).await }
169 pub async fn get_llm_settings(&self) -> Result<Value> { self.request("GET", "/llm/settings", None, None, true, None).await }
170
171 pub async fn get_triggers(&self) -> Result<Value> { self.request("GET", "/triggers", None, None, false, None).await }
173 pub async fn get_trigger(&self, name: &str) -> Result<Value> { self.request("GET", &format!("/triggers/{}", name), None, None, false, None).await }
174 pub async fn fire_trigger(&self, name: &str, data: Value, is_async: bool, user_id: Option<&str>) -> Result<Value> {
175 self.request("POST", &format!("/triggers/{}", name), Some(vec![("async", is_async.to_string())]), Some(data), false, user_id).await
176 }
177 pub async fn get_sops(&self) -> Result<Value> { self.request("GET", "/sops", None, None, false, None).await }
178 pub async fn get_sop(&self, name: &str) -> Result<Value> { self.request("GET", &format!("/sops/{}", name), None, None, false, None).await }
179 pub async fn get_audit_log(&self) -> Result<Value> { self.request("GET", "/audit", None, None, true, None).await }
180 pub async fn clear_audit_log(&self) -> Result<Value> { self.request("DELETE", "/audit?confirm=clear-audit-log", None, None, true, None).await }
181
182 pub async fn list_credential_services(&self) -> Result<Value> { self.request("GET", "/credentials/services", None, None, true, None).await }
184 pub async fn list_credentials(&self, user_id: &str) -> Result<Value> { self.request("GET", "/credentials", None, None, false, Some(user_id)).await }
185 pub async fn get_credential(&self, credential_id: &str, user_id: &str) -> Result<Value> { self.request("GET", &format!("/credentials/{}", credential_id), None, None, false, Some(user_id)).await }
186 pub async fn create_credential(&self, user_id: &str, payload: Value) -> Result<Value> { self.request("POST", "/credentials", None, Some(payload), false, Some(user_id)).await }
187 pub async fn delete_credential(&self, credential_id: &str, user_id: &str) -> Result<Value> { self.request("DELETE", &format!("/credentials/{}", credential_id), None, None, false, Some(user_id)).await }
188
189 pub async fn get_user_identifiers(&self, user_id: &str) -> Result<Value> { self.request("GET", &format!("/users/identifiers/{}", user_id), None, None, true, None).await }
191 pub async fn link_user_identifier(&self, muxi_user_id: &str, identifiers: Value) -> Result<Value> {
192 self.request("POST", "/users/identifiers", None, Some(json!({"muxi_user_id": muxi_user_id, "identifiers": identifiers})), true, None).await
193 }
194 pub async fn unlink_user_identifier(&self, identifier: &str) -> Result<Value> { self.request("DELETE", &format!("/users/identifiers/{}", identifier), None, None, true, None).await }
195
196 pub fn stream_events<'a>(&'a self, user_id: &'a str) -> impl Stream<Item = Result<SseEvent>> + 'a {
198 self.stream_sse_get("/events", Some(vec![("user_id", user_id.to_string())]), false, Some(user_id))
199 }
200 pub fn stream_logs<'a>(&'a self, filters: Option<Vec<(&'a str, String)>>) -> impl Stream<Item = Result<SseEvent>> + 'a {
201 self.stream_sse_get("/logs", filters, true, None)
202 }
203
204 pub async fn resolve_user(&self, identifier: &str, create_user: bool) -> Result<Value> {
206 self.request("POST", "/users/resolve", None, Some(json!({"identifier": identifier, "create_user": create_user})), false, None).await
207 }
208
209 async fn request(&self, method: &str, path: &str, params: Option<Vec<(&str, String)>>, body: Option<Value>, use_admin: bool, user_id: Option<&str>) -> Result<Value> {
210 let url = self.build_url(path, params);
211 let mut req = match method {
212 "GET" => self.client.get(&url),
213 "POST" => self.client.post(&url),
214 "PUT" => self.client.put(&url),
215 "DELETE" => self.client.delete(&url),
216 _ => return Err(MuxiError::Connection(format!("Unknown method: {}", method))),
217 };
218
219 req = self.add_headers(req, use_admin, user_id, body.is_some());
220 if let Some(b) = body { req = req.json(&b); }
221
222 let resp = req.send().await?;
223 self.handle_response(resp).await
224 }
225
226 fn stream_sse_post<'a>(&'a self, path: &'a str, body: Value, use_admin: bool, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
227 stream! {
228 let url = self.build_url(path, None);
229 let mut req = self.client.post(&url);
230 req = self.add_headers(req, use_admin, user_id, true);
231 req = req.header("Accept", "text/event-stream").json(&body);
232
233 match req.send().await {
234 Ok(r) => {
235 let mut current_event: Option<String> = None;
236 let mut data_parts: Vec<String> = Vec::new();
237 let text = r.text().await.unwrap_or_default();
238 for line in text.lines() {
239 if line.starts_with(':') { continue; }
240 if line.is_empty() {
241 if !data_parts.is_empty() {
242 yield Ok(SseEvent { event: current_event.take().unwrap_or_else(|| "message".to_string()), data: data_parts.join("\n") });
243 data_parts.clear();
244 }
245 continue;
246 }
247 if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
248 else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
249 }
250 }
251 Err(e) => yield Err(MuxiError::Request(e)),
252 }
253 }
254 }
255
256 fn stream_sse_get<'a>(&'a self, path: &'a str, params: Option<Vec<(&'a str, String)>>, use_admin: bool, user_id: Option<&'a str>) -> impl Stream<Item = Result<SseEvent>> + 'a {
257 stream! {
258 let url = self.build_url(path, params);
259 let mut req = self.client.get(&url);
260 req = self.add_headers(req, use_admin, user_id, false);
261 req = req.header("Accept", "text/event-stream");
262
263 match req.send().await {
264 Ok(r) => {
265 let mut current_event: Option<String> = None;
266 let mut data_parts: Vec<String> = Vec::new();
267 let text = r.text().await.unwrap_or_default();
268 for line in text.lines() {
269 if line.starts_with(':') { continue; }
270 if line.is_empty() {
271 if !data_parts.is_empty() {
272 yield Ok(SseEvent { event: current_event.take().unwrap_or_else(|| "message".to_string()), data: data_parts.join("\n") });
273 data_parts.clear();
274 }
275 continue;
276 }
277 if let Some(evt) = line.strip_prefix("event:") { current_event = Some(evt.trim().to_string()); }
278 else if let Some(d) = line.strip_prefix("data:") { data_parts.push(d.trim().to_string()); }
279 }
280 }
281 Err(e) => yield Err(MuxiError::Request(e)),
282 }
283 }
284 }
285
286 fn build_url(&self, path: &str, params: Option<Vec<(&str, String)>>) -> String {
287 let full_path = if path.starts_with('/') { path.to_string() } else { format!("/{}", path) };
288 let mut url = format!("{}{}", self.base_url, full_path);
289 if let Some(p) = params {
290 if !p.is_empty() {
291 url.push('?');
292 url.push_str(&p.iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("&"));
293 }
294 }
295 url
296 }
297
298 fn add_headers(&self, req: reqwest::RequestBuilder, use_admin: bool, user_id: Option<&str>, has_body: bool) -> reqwest::RequestBuilder {
299 let mut req = req
300 .header("X-Muxi-SDK", format!("rust/{}", VERSION))
301 .header("X-Muxi-Client", format!("rust/{}", VERSION))
302 .header("X-Muxi-Idempotency-Key", uuid::Uuid::new_v4().to_string())
303 .header("Accept", "application/json");
304
305 if let Some(app) = &self.app { req = req.header("X-Muxi-App", app); }
306 if use_admin {
307 if let Some(key) = &self.admin_key { req = req.header("X-MUXI-ADMIN-KEY", key); }
308 } else {
309 if let Some(key) = &self.client_key { req = req.header("X-MUXI-CLIENT-KEY", key); }
310 }
311 if let Some(uid) = user_id { req = req.header("X-Muxi-User-ID", uid); }
312 if has_body { req = req.header("Content-Type", "application/json"); }
313 req
314 }
315
316 async fn handle_response(&self, resp: reqwest::Response) -> Result<Value> {
317 let status = resp.status().as_u16();
318
319 let headers: std::collections::HashMap<String, String> = resp.headers()
321 .iter()
322 .filter_map(|(k, v)| v.to_str().ok().map(|s| (k.to_string(), s.to_string())))
323 .collect();
324 version_check::check_for_updates(&headers);
325
326 let retry_after = resp.headers().get("Retry-After").and_then(|v| v.to_str().ok()).and_then(|v| v.parse().ok());
327 let body = resp.text().await.unwrap_or_default();
328
329 if status >= 400 {
330 let (code, message) = if let Ok(json) = serde_json::from_str::<Value>(&body) {
331 (json.get("code").or(json.get("error")).and_then(|v| v.as_str()).map(String::from), json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error").to_string())
332 } else { (None, "Unknown error".to_string()) };
333 return Err(MuxiError::from_response(status, code, message, retry_after));
334 }
335
336 if body.is_empty() { Ok(json!({})) } else { Ok(self.unwrap_envelope(serde_json::from_str(&body)?)) }
337 }
338
339 fn unwrap_envelope(&self, value: Value) -> Value {
340 if let Some(obj) = value.as_object() {
341 if let Some(data) = obj.get("data") {
342 if let Some(mut result) = data.as_object().cloned() {
343 if let Some(req) = obj.get("request").and_then(|r| r.as_object()) {
344 if let Some(id) = req.get("id") { if !result.contains_key("request_id") { result.insert("request_id".to_string(), id.clone()); } }
345 }
346 return Value::Object(result);
347 }
348 return data.clone();
349 }
350 }
351 value
352 }
353}