1use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::path::{Path, PathBuf};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Machine {
15 pub id: String,
17 pub host: String,
19 #[serde(default = "default_ssh_port")]
21 pub port: u16,
22 pub user: String,
24 #[serde(default, skip_serializing_if = "Option::is_none")]
26 pub key_path: Option<String>,
27 #[serde(default)]
29 pub tags: Vec<String>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
32 pub working_dir: Option<String>,
33 pub registered_at: DateTime<Utc>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub last_health: Option<HealthStatus>,
38}
39
/// Serde default for [`Machine::port`]: the standard SSH port.
fn default_ssh_port() -> u16 {
    const STANDARD_SSH_PORT: u16 = 22;
    STANDARD_SSH_PORT
}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct HealthStatus {
47 pub reachable: bool,
49 pub latency_ms: Option<u64>,
51 pub load: Option<f64>,
53 pub available_memory_mb: Option<u64>,
55 pub checked_at: DateTime<Utc>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub error: Option<String>,
60}
61
/// Store of registered [`Machine`]s backed by a single JSON file.
///
/// Nothing is cached in memory: each operation reads the file afresh and any
/// modification rewrites it in full.
pub struct MachineRegistry {
    /// Location of the JSON file holding the machine list.
    registry_path: PathBuf,
}
66
67impl MachineRegistry {
68 pub fn new(path: &Path) -> Self {
70 Self {
71 registry_path: path.to_path_buf(),
72 }
73 }
74
75 pub fn default_path() -> PathBuf {
77 directories::BaseDirs::new()
78 .map(|d| {
79 d.home_dir()
80 .join(".mur")
81 .join("commander")
82 .join("machines.json")
83 })
84 .unwrap_or_else(|| PathBuf::from("/tmp/mur-commander/machines.json"))
85 }
86
87 pub fn list(&self) -> Result<Vec<Machine>> {
89 if !self.registry_path.exists() {
90 return Ok(Vec::new());
91 }
92 let content =
93 std::fs::read_to_string(&self.registry_path).context("Reading machine registry")?;
94 serde_json::from_str(&content).context("Parsing machine registry")
95 }
96
97 pub fn add(&self, machine: Machine) -> Result<()> {
99 let mut machines = self.list()?;
100 if machines.iter().any(|m| m.id == machine.id) {
101 anyhow::bail!("Machine '{}' already exists", machine.id);
102 }
103 machines.push(machine);
104 self.save(&machines)
105 }
106
107 pub fn remove(&self, id: &str) -> Result<bool> {
109 let mut machines = self.list()?;
110 let before = machines.len();
111 machines.retain(|m| m.id != id);
112 if machines.len() == before {
113 return Ok(false);
114 }
115 self.save(&machines)?;
116 Ok(true)
117 }
118
119 pub fn get(&self, id: &str) -> Result<Option<Machine>> {
121 let machines = self.list()?;
122 Ok(machines.into_iter().find(|m| m.id == id))
123 }
124
125 pub fn update_health(&self, id: &str, health: HealthStatus) -> Result<()> {
127 let mut machines = self.list()?;
128 if let Some(machine) = machines.iter_mut().find(|m| m.id == id) {
129 machine.last_health = Some(health);
130 }
131 self.save(&machines)
132 }
133
134 fn save(&self, machines: &[Machine]) -> Result<()> {
136 if let Some(parent) = self.registry_path.parent() {
137 std::fs::create_dir_all(parent)?;
138 }
139 let json = serde_json::to_string_pretty(machines)?;
140 std::fs::write(&self.registry_path, json).context("Writing machine registry")
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct RemoteExecResult {
151 pub machine_id: String,
153 pub exit_code: i32,
155 pub stdout: String,
157 pub stderr: String,
159 pub duration_ms: u64,
161 pub success: bool,
163}
164
/// Stateless namespace for running commands on [`Machine`]s via the local `ssh` binary.
pub struct RemoteExecutor;
167
168impl RemoteExecutor {
169 pub async fn exec(machine: &Machine, command: &str) -> Result<RemoteExecResult> {
171 let start = std::time::Instant::now();
172
173 let mut ssh_args = vec![
174 "-o".to_string(),
175 "StrictHostKeyChecking=yes".to_string(),
176 "-o".to_string(),
177 "ConnectTimeout=10".to_string(),
178 "-p".to_string(),
179 machine.port.to_string(),
180 ];
181
182 if let Some(ref key) = machine.key_path {
183 ssh_args.push("-i".into());
184 ssh_args.push(key.clone());
185 }
186
187 let target = format!("{}@{}", machine.user, machine.host);
188 ssh_args.push(target);
189
190 let escaped_command = shell_escape(command);
193 let full_command = if let Some(ref dir) = machine.working_dir {
194 format!("cd {} && sh -c {}", shell_escape(dir), escaped_command)
195 } else {
196 format!("sh -c {}", escaped_command)
197 };
198 ssh_args.push(full_command);
199
200 let output = tokio::process::Command::new("ssh")
201 .args(&ssh_args)
202 .output()
203 .await
204 .context("Spawning SSH process")?;
205
206 let duration = start.elapsed();
207 let exit_code = output.status.code().unwrap_or(-1);
208
209 Ok(RemoteExecResult {
210 machine_id: machine.id.clone(),
211 exit_code,
212 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
213 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
214 duration_ms: duration.as_millis() as u64,
215 success: exit_code == 0,
216 })
217 }
218
219 pub async fn health_check(machine: &Machine) -> HealthStatus {
221 let start = std::time::Instant::now();
222
223 let result = Self::exec(
224 machine,
225 "echo ok && uptime | awk '{print $NF}' && free -m 2>/dev/null | awk '/Mem:/{print $7}' || echo 0",
226 )
227 .await;
228
229 let duration = start.elapsed();
230
231 match result {
232 Ok(exec_result) if exec_result.success => {
233 let lines: Vec<&str> = exec_result.stdout.lines().collect();
234 let load = lines
235 .get(1)
236 .and_then(|l| l.trim().parse::<f64>().ok());
237 let mem = lines
238 .get(2)
239 .and_then(|l| l.trim().parse::<u64>().ok());
240
241 HealthStatus {
242 reachable: true,
243 latency_ms: Some(duration.as_millis() as u64),
244 load,
245 available_memory_mb: mem,
246 checked_at: Utc::now(),
247 error: None,
248 }
249 }
250 Ok(exec_result) => HealthStatus {
251 reachable: false,
252 latency_ms: Some(duration.as_millis() as u64),
253 load: None,
254 available_memory_mb: None,
255 checked_at: Utc::now(),
256 error: Some(exec_result.stderr),
257 },
258 Err(e) => HealthStatus {
259 reachable: false,
260 latency_ms: None,
261 load: None,
262 available_memory_mb: None,
263 checked_at: Utc::now(),
264 error: Some(e.to_string()),
265 },
266 }
267 }
268}
269
/// Stateless namespace for fanning one command out to several [`Machine`]s.
pub struct AgentRelay;
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct RelayResult {
280 pub results: Vec<RemoteExecResult>,
282 pub total_duration_ms: u64,
284 pub successes: usize,
286 pub failures: usize,
288}
289
290impl AgentRelay {
291 pub async fn exec_sequential(
293 machines: &[Machine],
294 command: &str,
295 ) -> Result<RelayResult> {
296 let start = std::time::Instant::now();
297 let mut results = Vec::new();
298
299 for machine in machines {
300 let result = RemoteExecutor::exec(machine, command).await?;
301 results.push(result);
302 }
303
304 let successes = results.iter().filter(|r| r.success).count();
305 let failures = results.len() - successes;
306
307 Ok(RelayResult {
308 results,
309 total_duration_ms: start.elapsed().as_millis() as u64,
310 successes,
311 failures,
312 })
313 }
314
315 pub async fn exec_parallel(
317 machines: &[Machine],
318 command: &str,
319 ) -> Result<RelayResult> {
320 let start = std::time::Instant::now();
321 let mut handles = Vec::new();
322
323 for machine in machines {
324 let machine = machine.clone();
325 let cmd = command.to_string();
326 handles.push(tokio::spawn(async move {
327 RemoteExecutor::exec(&machine, &cmd).await
328 }));
329 }
330
331 let mut results = Vec::new();
332 for handle in handles {
333 match handle.await {
334 Ok(Ok(result)) => results.push(result),
335 Ok(Err(e)) => {
336 results.push(RemoteExecResult {
337 machine_id: "unknown".into(),
338 exit_code: -1,
339 stdout: String::new(),
340 stderr: e.to_string(),
341 duration_ms: 0,
342 success: false,
343 });
344 }
345 Err(e) => {
346 results.push(RemoteExecResult {
347 machine_id: "unknown".into(),
348 exit_code: -1,
349 stdout: String::new(),
350 stderr: e.to_string(),
351 duration_ms: 0,
352 success: false,
353 });
354 }
355 }
356 }
357
358 let successes = results.iter().filter(|r| r.success).count();
359 let failures = results.len() - successes;
360
361 Ok(RelayResult {
362 results,
363 total_duration_ms: start.elapsed().as_millis() as u64,
364 successes,
365 failures,
366 })
367 }
368
369 pub async fn health_check_all(machines: &[Machine]) -> Vec<(String, HealthStatus)> {
371 let mut results = Vec::new();
372
373 for machine in machines {
374 let health = RemoteExecutor::health_check(machine).await;
375 results.push((machine.id.clone(), health));
376 }
377
378 results
379 }
380}
381
/// Quotes `s` as a single POSIX-shell word.
///
/// Inside single quotes nothing is special except `'` itself, so each embedded
/// quote becomes `'\''`: close the quoted run, emit an escaped quote, reopen.
fn shell_escape(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
386
387pub fn aggregate_results(results: &[RemoteExecResult]) -> serde_json::Value {
393 let total = results.len();
394 let successes = results.iter().filter(|r| r.success).count();
395 let failures = total - successes;
396 let avg_duration = if total > 0 {
397 results.iter().map(|r| r.duration_ms).sum::<u64>() / total as u64
398 } else {
399 0
400 };
401
402 serde_json::json!({
403 "total_machines": total,
404 "successes": successes,
405 "failures": failures,
406 "avg_duration_ms": avg_duration,
407 "machines": results.iter().map(|r| serde_json::json!({
408 "id": r.machine_id,
409 "success": r.success,
410 "exit_code": r.exit_code,
411 "duration_ms": r.duration_ms,
412 })).collect::<Vec<_>>(),
413 })
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fresh registry in a temp dir; the `TempDir` must outlive the registry.
    fn test_registry() -> (TempDir, MachineRegistry) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("machines.json");
        (dir, MachineRegistry::new(&path))
    }

    fn test_machine(id: &str) -> Machine {
        Machine {
            id: id.into(),
            host: "192.168.1.100".into(),
            port: 22,
            user: "deploy".into(),
            key_path: None,
            tags: vec!["staging".into()],
            working_dir: Some("/opt/app".into()),
            registered_at: Utc::now(),
            last_health: None,
        }
    }

    fn test_health() -> HealthStatus {
        HealthStatus {
            reachable: true,
            latency_ms: Some(42),
            load: Some(0.5),
            available_memory_mb: Some(4096),
            checked_at: Utc::now(),
            error: None,
        }
    }

    #[test]
    fn test_empty_registry() {
        let (_dir, registry) = test_registry();
        assert!(registry.list().unwrap().is_empty());
    }

    #[test]
    fn test_add_machine() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        let machines = registry.list().unwrap();
        assert_eq!(machines.len(), 1);
        assert_eq!(machines[0].id, "web-1");
    }

    #[test]
    fn test_add_duplicate_machine() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();
        let err = registry.add(test_machine("web-1"));
        assert!(err.is_err());
    }

    #[test]
    fn test_registry_persists_across_instances() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("machines.json");
        MachineRegistry::new(&path).add(test_machine("web-1")).unwrap();

        let machines = MachineRegistry::new(&path).list().unwrap();
        assert_eq!(machines.len(), 1);
        assert_eq!(machines[0].working_dir.as_deref(), Some("/opt/app"));
    }

    #[test]
    fn test_remove_machine() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        assert!(registry.remove("web-1").unwrap());
        assert!(!registry.remove("web-1").unwrap());
        assert!(registry.list().unwrap().is_empty());
    }

    #[test]
    fn test_get_machine() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        let machine = registry.get("web-1").unwrap();
        assert!(machine.is_some());
        assert_eq!(machine.unwrap().host, "192.168.1.100");

        assert!(registry.get("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_update_health() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        registry.update_health("web-1", test_health()).unwrap();

        let machine = registry.get("web-1").unwrap().unwrap();
        assert!(machine.last_health.is_some());
        assert!(machine.last_health.unwrap().reachable);
    }

    #[test]
    fn test_update_health_unknown_machine_is_ok() {
        let (_dir, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        registry.update_health("ghost", test_health()).unwrap();

        let machine = registry.get("web-1").unwrap().unwrap();
        assert!(machine.last_health.is_none());
    }

    #[test]
    fn test_aggregate_results() {
        let results = vec![
            RemoteExecResult {
                machine_id: "web-1".into(),
                exit_code: 0,
                stdout: "ok".into(),
                stderr: String::new(),
                duration_ms: 100,
                success: true,
            },
            RemoteExecResult {
                machine_id: "web-2".into(),
                exit_code: 1,
                stdout: String::new(),
                stderr: "error".into(),
                duration_ms: 200,
                success: false,
            },
        ];

        let agg = aggregate_results(&results);
        assert_eq!(agg["total_machines"], 2);
        assert_eq!(agg["successes"], 1);
        assert_eq!(agg["failures"], 1);
        assert_eq!(agg["avg_duration_ms"], 150);
    }

    #[test]
    fn test_aggregate_empty_results() {
        let agg = aggregate_results(&[]);
        assert_eq!(agg["total_machines"], 0);
        assert_eq!(agg["successes"], 0);
        assert_eq!(agg["failures"], 0);
        assert_eq!(agg["avg_duration_ms"], 0);
        assert_eq!(agg["machines"].as_array().map(Vec::len), Some(0));
    }

    #[test]
    fn test_shell_escape() {
        assert_eq!(shell_escape("/opt/app"), "'/opt/app'");
        assert_eq!(shell_escape("path with spaces"), "'path with spaces'");
        assert_eq!(shell_escape("it's a path"), "'it'\\''s a path'");
    }

    #[test]
    fn test_shell_escape_empty() {
        assert_eq!(shell_escape(""), "''");
    }

    #[test]
    fn test_machine_serialization() {
        let machine = test_machine("web-1");
        let json = serde_json::to_string(&machine).unwrap();
        let back: Machine = serde_json::from_str(&json).unwrap();
        assert_eq!(back.id, "web-1");
        assert_eq!(back.port, 22);
    }

    #[test]
    fn test_machine_deserialization_defaults() {
        let json = r#"{"id":"db-1","host":"10.0.0.5","user":"ops","registered_at":"2024-01-01T00:00:00Z"}"#;
        let machine: Machine = serde_json::from_str(json).unwrap();
        assert_eq!(machine.port, 22);
        assert!(machine.tags.is_empty());
        assert!(machine.key_path.is_none());
        assert!(machine.working_dir.is_none());
        assert!(machine.last_health.is_none());
    }

    #[test]
    fn test_relay_result_serialization() {
        let result = RelayResult {
            results: vec![],
            total_duration_ms: 500,
            successes: 0,
            failures: 0,
        };
        let json = serde_json::to_string(&result).unwrap();
        assert!(json.contains("500"));
    }
}