Skip to main content

mur_core/
machine.rs

1//! Multi-Machine — SSH remote execution, agent relay, and machine registry.
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::path::{Path, PathBuf};
7
8// ---------------------------------------------------------------------------
9// Machine Registry
10// ---------------------------------------------------------------------------
11
12/// A registered remote machine.
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Machine {
15    /// Unique machine identifier (user-chosen name).
16    pub id: String,
17    /// Hostname or IP address.
18    pub host: String,
19    /// SSH port (default: 22).
20    #[serde(default = "default_ssh_port")]
21    pub port: u16,
22    /// SSH username.
23    pub user: String,
24    /// Path to SSH private key (optional, uses ssh-agent by default).
25    #[serde(default, skip_serializing_if = "Option::is_none")]
26    pub key_path: Option<String>,
27    /// Optional tags for grouping machines.
28    #[serde(default)]
29    pub tags: Vec<String>,
30    /// Working directory on the remote machine.
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    pub working_dir: Option<String>,
33    /// When this machine was registered.
34    pub registered_at: DateTime<Utc>,
35    /// Last health check result.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub last_health: Option<HealthStatus>,
38}
39
/// Serde fallback for a machine's `port` field: the standard SSH port.
fn default_ssh_port() -> u16 {
    const STANDARD_SSH_PORT: u16 = 22;
    STANDARD_SSH_PORT
}
43
44/// Health check result for a machine.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct HealthStatus {
47    /// Whether the machine is reachable.
48    pub reachable: bool,
49    /// Round-trip latency in milliseconds.
50    pub latency_ms: Option<u64>,
51    /// System load (if available).
52    pub load: Option<f64>,
53    /// Available memory in MB (if available).
54    pub available_memory_mb: Option<u64>,
55    /// Timestamp of this check.
56    pub checked_at: DateTime<Utc>,
57    /// Error message if unreachable.
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub error: Option<String>,
60}
61
/// Machine registry — persists machine configurations.
///
/// The registry holds only the location of its backing file; every operation
/// reads from or writes to that file, so cloning a registry yields a second
/// handle onto the same on-disk data.
#[derive(Debug, Clone)]
pub struct MachineRegistry {
    /// Location of the JSON file that stores the registered machines.
    registry_path: PathBuf,
}
66
67impl MachineRegistry {
68    /// Create a new machine registry.
69    pub fn new(path: &Path) -> Self {
70        Self {
71            registry_path: path.to_path_buf(),
72        }
73    }
74
75    /// Default registry path.
76    pub fn default_path() -> PathBuf {
77        directories::BaseDirs::new()
78            .map(|d| {
79                d.home_dir()
80                    .join(".mur")
81                    .join("commander")
82                    .join("machines.json")
83            })
84            .unwrap_or_else(|| PathBuf::from("/tmp/mur-commander/machines.json"))
85    }
86
87    /// List all registered machines.
88    pub fn list(&self) -> Result<Vec<Machine>> {
89        if !self.registry_path.exists() {
90            return Ok(Vec::new());
91        }
92        let content =
93            std::fs::read_to_string(&self.registry_path).context("Reading machine registry")?;
94        serde_json::from_str(&content).context("Parsing machine registry")
95    }
96
97    /// Add a machine to the registry.
98    pub fn add(&self, machine: Machine) -> Result<()> {
99        let mut machines = self.list()?;
100        if machines.iter().any(|m| m.id == machine.id) {
101            anyhow::bail!("Machine '{}' already exists", machine.id);
102        }
103        machines.push(machine);
104        self.save(&machines)
105    }
106
107    /// Remove a machine by ID.
108    pub fn remove(&self, id: &str) -> Result<bool> {
109        let mut machines = self.list()?;
110        let before = machines.len();
111        machines.retain(|m| m.id != id);
112        if machines.len() == before {
113            return Ok(false);
114        }
115        self.save(&machines)?;
116        Ok(true)
117    }
118
119    /// Get a machine by ID.
120    pub fn get(&self, id: &str) -> Result<Option<Machine>> {
121        let machines = self.list()?;
122        Ok(machines.into_iter().find(|m| m.id == id))
123    }
124
125    /// Update a machine's health status.
126    pub fn update_health(&self, id: &str, health: HealthStatus) -> Result<()> {
127        let mut machines = self.list()?;
128        if let Some(machine) = machines.iter_mut().find(|m| m.id == id) {
129            machine.last_health = Some(health);
130        }
131        self.save(&machines)
132    }
133
134    /// Save the registry.
135    fn save(&self, machines: &[Machine]) -> Result<()> {
136        if let Some(parent) = self.registry_path.parent() {
137            std::fs::create_dir_all(parent)?;
138        }
139        let json = serde_json::to_string_pretty(machines)?;
140        std::fs::write(&self.registry_path, json).context("Writing machine registry")
141    }
142}
143
144// ---------------------------------------------------------------------------
145// Remote Execution
146// ---------------------------------------------------------------------------
147
148/// Result of a remote command execution.
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct RemoteExecResult {
151    /// Machine that executed the command.
152    pub machine_id: String,
153    /// Exit code of the command.
154    pub exit_code: i32,
155    /// Standard output.
156    pub stdout: String,
157    /// Standard error.
158    pub stderr: String,
159    /// Execution duration in milliseconds.
160    pub duration_ms: u64,
161    /// Whether execution succeeded (exit_code == 0).
162    pub success: bool,
163}
164
/// Remote executor using SSH (via async process spawning with `ssh` command).
///
/// This is a stateless marker type: all functionality lives in associated
/// functions, so values carry no data and are free to copy.
#[derive(Debug, Clone, Copy, Default)]
pub struct RemoteExecutor;
167
168impl RemoteExecutor {
169    /// Execute a command on a remote machine via SSH.
170    pub async fn exec(machine: &Machine, command: &str) -> Result<RemoteExecResult> {
171        let start = std::time::Instant::now();
172
173        let mut ssh_args = vec![
174            "-o".to_string(),
175            "StrictHostKeyChecking=yes".to_string(),
176            "-o".to_string(),
177            "ConnectTimeout=10".to_string(),
178            "-p".to_string(),
179            machine.port.to_string(),
180        ];
181
182        if let Some(ref key) = machine.key_path {
183            ssh_args.push("-i".into());
184            ssh_args.push(key.clone());
185        }
186
187        let target = format!("{}@{}", machine.user, machine.host);
188        ssh_args.push(target);
189
190        // If a working directory is set, cd into it first
191        // Escape the command to prevent shell injection on remote
192        let escaped_command = shell_escape(command);
193        let full_command = if let Some(ref dir) = machine.working_dir {
194            format!("cd {} && sh -c {}", shell_escape(dir), escaped_command)
195        } else {
196            format!("sh -c {}", escaped_command)
197        };
198        ssh_args.push(full_command);
199
200        let output = tokio::process::Command::new("ssh")
201            .args(&ssh_args)
202            .output()
203            .await
204            .context("Spawning SSH process")?;
205
206        let duration = start.elapsed();
207        let exit_code = output.status.code().unwrap_or(-1);
208
209        Ok(RemoteExecResult {
210            machine_id: machine.id.clone(),
211            exit_code,
212            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
213            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
214            duration_ms: duration.as_millis() as u64,
215            success: exit_code == 0,
216        })
217    }
218
219    /// Check if a machine is reachable via SSH.
220    pub async fn health_check(machine: &Machine) -> HealthStatus {
221        let start = std::time::Instant::now();
222
223        let result = Self::exec(
224            machine,
225            "echo ok && uptime | awk '{print $NF}' && free -m 2>/dev/null | awk '/Mem:/{print $7}' || echo 0",
226        )
227        .await;
228
229        let duration = start.elapsed();
230
231        match result {
232            Ok(exec_result) if exec_result.success => {
233                let lines: Vec<&str> = exec_result.stdout.lines().collect();
234                let load = lines
235                    .get(1)
236                    .and_then(|l| l.trim().parse::<f64>().ok());
237                let mem = lines
238                    .get(2)
239                    .and_then(|l| l.trim().parse::<u64>().ok());
240
241                HealthStatus {
242                    reachable: true,
243                    latency_ms: Some(duration.as_millis() as u64),
244                    load,
245                    available_memory_mb: mem,
246                    checked_at: Utc::now(),
247                    error: None,
248                }
249            }
250            Ok(exec_result) => HealthStatus {
251                reachable: false,
252                latency_ms: Some(duration.as_millis() as u64),
253                load: None,
254                available_memory_mb: None,
255                checked_at: Utc::now(),
256                error: Some(exec_result.stderr),
257            },
258            Err(e) => HealthStatus {
259                reachable: false,
260                latency_ms: None,
261                load: None,
262                available_memory_mb: None,
263                checked_at: Utc::now(),
264                error: Some(e.to_string()),
265            },
266        }
267    }
268}
269
270// ---------------------------------------------------------------------------
271// Agent Relay — execute workflows on remote machines
272// ---------------------------------------------------------------------------
273
/// Relay for executing workflows across multiple machines.
///
/// This is a stateless marker type: all functionality lives in associated
/// functions, so values carry no data and are free to copy.
#[derive(Debug, Clone, Copy, Default)]
pub struct AgentRelay;
276
277/// Result of multi-machine execution.
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct RelayResult {
280    /// Per-machine results.
281    pub results: Vec<RemoteExecResult>,
282    /// Total duration in milliseconds.
283    pub total_duration_ms: u64,
284    /// Number of successful executions.
285    pub successes: usize,
286    /// Number of failed executions.
287    pub failures: usize,
288}
289
290impl AgentRelay {
291    /// Execute a command on multiple machines sequentially.
292    pub async fn exec_sequential(
293        machines: &[Machine],
294        command: &str,
295    ) -> Result<RelayResult> {
296        let start = std::time::Instant::now();
297        let mut results = Vec::new();
298
299        for machine in machines {
300            let result = RemoteExecutor::exec(machine, command).await?;
301            results.push(result);
302        }
303
304        let successes = results.iter().filter(|r| r.success).count();
305        let failures = results.len() - successes;
306
307        Ok(RelayResult {
308            results,
309            total_duration_ms: start.elapsed().as_millis() as u64,
310            successes,
311            failures,
312        })
313    }
314
315    /// Execute a command on multiple machines concurrently.
316    pub async fn exec_parallel(
317        machines: &[Machine],
318        command: &str,
319    ) -> Result<RelayResult> {
320        let start = std::time::Instant::now();
321        let mut handles = Vec::new();
322
323        for machine in machines {
324            let machine = machine.clone();
325            let cmd = command.to_string();
326            handles.push(tokio::spawn(async move {
327                RemoteExecutor::exec(&machine, &cmd).await
328            }));
329        }
330
331        let mut results = Vec::new();
332        for handle in handles {
333            match handle.await {
334                Ok(Ok(result)) => results.push(result),
335                Ok(Err(e)) => {
336                    results.push(RemoteExecResult {
337                        machine_id: "unknown".into(),
338                        exit_code: -1,
339                        stdout: String::new(),
340                        stderr: e.to_string(),
341                        duration_ms: 0,
342                        success: false,
343                    });
344                }
345                Err(e) => {
346                    results.push(RemoteExecResult {
347                        machine_id: "unknown".into(),
348                        exit_code: -1,
349                        stdout: String::new(),
350                        stderr: e.to_string(),
351                        duration_ms: 0,
352                        success: false,
353                    });
354                }
355            }
356        }
357
358        let successes = results.iter().filter(|r| r.success).count();
359        let failures = results.len() - successes;
360
361        Ok(RelayResult {
362            results,
363            total_duration_ms: start.elapsed().as_millis() as u64,
364            successes,
365            failures,
366        })
367    }
368
369    /// Run health checks on all machines.
370    pub async fn health_check_all(machines: &[Machine]) -> Vec<(String, HealthStatus)> {
371        let mut results = Vec::new();
372
373        for machine in machines {
374            let health = RemoteExecutor::health_check(machine).await;
375            results.push((machine.id.clone(), health));
376        }
377
378        results
379    }
380}
381
/// Wrap `s` in single quotes for a POSIX shell.
///
/// Each embedded single quote is emitted as `'\''` (close the quoted run,
/// add an escaped quote, reopen), so the shell reads the result as one
/// literal word. Used for both paths and whole command strings.
fn shell_escape(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
386
387// ---------------------------------------------------------------------------
388// Result Aggregation
389// ---------------------------------------------------------------------------
390
391/// Aggregate results from multi-machine execution.
392pub fn aggregate_results(results: &[RemoteExecResult]) -> serde_json::Value {
393    let total = results.len();
394    let successes = results.iter().filter(|r| r.success).count();
395    let failures = total - successes;
396    let avg_duration = if total > 0 {
397        results.iter().map(|r| r.duration_ms).sum::<u64>() / total as u64
398    } else {
399        0
400    };
401
402    serde_json::json!({
403        "total_machines": total,
404        "successes": successes,
405        "failures": failures,
406        "avg_duration_ms": avg_duration,
407        "machines": results.iter().map(|r| serde_json::json!({
408            "id": r.machine_id,
409            "success": r.success,
410            "exit_code": r.exit_code,
411            "duration_ms": r.duration_ms,
412        })).collect::<Vec<_>>(),
413    })
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Registry backed by a file inside a fresh temp dir. The `TempDir` is
    /// returned as well so the directory outlives the registry under test.
    fn test_registry() -> (TempDir, MachineRegistry) {
        let tmp = TempDir::new().unwrap();
        let registry = MachineRegistry::new(&tmp.path().join("machines.json"));
        (tmp, registry)
    }

    /// Staging machine fixture with the given ID.
    fn test_machine(id: &str) -> Machine {
        Machine {
            id: id.to_string(),
            host: String::from("192.168.1.100"),
            port: 22,
            user: String::from("deploy"),
            key_path: None,
            tags: vec![String::from("staging")],
            working_dir: Some(String::from("/opt/app")),
            registered_at: Utc::now(),
            last_health: None,
        }
    }

    #[test]
    fn test_empty_registry() {
        let (_tmp, registry) = test_registry();
        let machines = registry.list().unwrap();
        assert!(machines.is_empty());
    }

    #[test]
    fn test_add_machine() {
        let (_tmp, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        let stored = registry.list().unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].id, "web-1");
    }

    #[test]
    fn test_add_duplicate_machine() {
        let (_tmp, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        // A second machine with the same ID must be rejected.
        assert!(registry.add(test_machine("web-1")).is_err());
    }

    #[test]
    fn test_remove_machine() {
        let (_tmp, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        // First removal succeeds, the second finds nothing to remove.
        assert!(registry.remove("web-1").unwrap());
        assert!(!registry.remove("web-1").unwrap());
        assert!(registry.list().unwrap().is_empty());
    }

    #[test]
    fn test_get_machine() {
        let (_tmp, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        let found = registry
            .get("web-1")
            .unwrap()
            .expect("web-1 should be registered");
        assert_eq!(found.host, "192.168.1.100");

        assert!(registry.get("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_update_health() {
        let (_tmp, registry) = test_registry();
        registry.add(test_machine("web-1")).unwrap();

        registry
            .update_health(
                "web-1",
                HealthStatus {
                    reachable: true,
                    latency_ms: Some(42),
                    load: Some(0.5),
                    available_memory_mb: Some(4096),
                    checked_at: Utc::now(),
                    error: None,
                },
            )
            .unwrap();

        let recorded = registry.get("web-1").unwrap().unwrap().last_health;
        assert!(recorded.is_some());
        assert!(recorded.map_or(false, |h| h.reachable));
    }

    #[test]
    fn test_aggregate_results() {
        let ok_run = RemoteExecResult {
            machine_id: String::from("web-1"),
            exit_code: 0,
            stdout: String::from("ok"),
            stderr: String::new(),
            duration_ms: 100,
            success: true,
        };
        let failed_run = RemoteExecResult {
            machine_id: String::from("web-2"),
            exit_code: 1,
            stdout: String::new(),
            stderr: String::from("error"),
            duration_ms: 200,
            success: false,
        };

        let summary = aggregate_results(&[ok_run, failed_run]);
        assert_eq!(summary["total_machines"], 2);
        assert_eq!(summary["successes"], 1);
        assert_eq!(summary["failures"], 1);
        assert_eq!(summary["avg_duration_ms"], 150);
    }

    #[test]
    fn test_shell_escape() {
        let cases = [
            ("/opt/app", "'/opt/app'"),
            ("path with spaces", "'path with spaces'"),
            ("it's a path", "'it'\\''s a path'"),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(shell_escape(input), *expected);
        }
    }

    #[test]
    fn test_machine_serialization() {
        // Round-trip through JSON and confirm the key fields survive.
        let encoded = serde_json::to_string(&test_machine("web-1")).unwrap();
        let decoded: Machine = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.id, "web-1");
        assert_eq!(decoded.port, 22);
    }

    #[test]
    fn test_relay_result_serialization() {
        let empty_run = RelayResult {
            results: Vec::new(),
            total_duration_ms: 500,
            successes: 0,
            failures: 0,
        };
        let encoded = serde_json::to_string(&empty_run).unwrap();
        assert!(encoded.contains("500"));
    }
}