Skip to main content

cbtop/remote_agent/
agent.rs

1//! Remote agent for distributed benchmark collection.
2
3use std::collections::HashMap;
4use std::time::Instant;
5
6use super::json;
7use super::metrics;
8use super::types::{
9    AggregatedResult, CommandResult, HostBenchmark, HostConfig, HostHealth, HostState,
10    RemoteAgentConfig, RemoteError, RemoteResult,
11};
12
13/// Remote agent for distributed benchmark collection
14#[derive(Debug)]
15pub struct RemoteAgent {
16    /// Agent configuration
17    config: RemoteAgentConfig,
18    /// Registered hosts
19    hosts: HashMap<String, HostState>,
20    /// Command execution history
21    history: Vec<CommandResult>,
22    /// Maximum history size
23    pub(crate) max_history: usize,
24}
25
26impl RemoteAgent {
27    /// Create a new remote agent
28    pub fn new(config: RemoteAgentConfig) -> Self {
29        Self {
30            config,
31            hosts: HashMap::new(),
32            history: Vec::new(),
33            max_history: 1000,
34        }
35    }
36
37    /// Add a host to the agent pool
38    pub fn add_host(&mut self, config: HostConfig) {
39        let key = format!("{}:{}", config.host, config.port);
40        self.hosts.insert(key, HostState::new(config));
41    }
42
43    /// Remove a host from the pool
44    pub fn remove_host(&mut self, host: &str) -> Option<HostState> {
45        // Try with default port if not specified
46        self.hosts
47            .remove(host)
48            .or_else(|| self.hosts.remove(&format!("{}:22", host)))
49    }
50
51    /// Get all registered hosts
52    pub fn hosts(&self) -> impl Iterator<Item = &HostState> {
53        self.hosts.values()
54    }
55
56    /// Get available (healthy) hosts
57    pub fn available_hosts(&self) -> impl Iterator<Item = &HostState> {
58        self.hosts.values().filter(|h| h.is_available())
59    }
60
61    /// Get host count
62    pub fn host_count(&self) -> usize {
63        self.hosts.len()
64    }
65
66    /// Get available host count
67    pub fn available_count(&self) -> usize {
68        self.available_hosts().count()
69    }
70
71    /// Execute a command on a specific host (simulated for now)
72    pub fn execute_on_host(
73        &mut self,
74        host_key: &str,
75        command: &str,
76    ) -> RemoteResult<CommandResult> {
77        // Clone config to avoid borrow conflict
78        let config = {
79            let state = self
80                .hosts
81                .get(host_key)
82                .ok_or_else(|| RemoteError::HostNotFound {
83                    host: host_key.to_string(),
84                })?;
85            state.config.clone()
86        };
87
88        // Simulate command execution
89        let start = Instant::now();
90
91        // In a real implementation, this would use SSH
92        // For now, we simulate based on command content
93        let (exit_code, stdout, stderr) = self.simulate_command(command, &config);
94
95        let duration_ms = start.elapsed().as_millis() as u64;
96
97        let result = CommandResult {
98            host: host_key.to_string(),
99            exit_code,
100            stdout,
101            stderr,
102            duration_ms,
103        };
104
105        // Update host state
106        if let Some(state) = self.hosts.get_mut(host_key) {
107            if result.success() {
108                state.record_success(duration_ms);
109            } else {
110                state.record_failure(&result.stderr);
111            }
112        }
113
114        // Add to history
115        self.add_to_history(result.clone());
116
117        if result.success() {
118            Ok(result)
119        } else {
120            Err(RemoteError::CommandFailed {
121                host: host_key.to_string(),
122                exit_code: result.exit_code,
123                stderr: result.stderr,
124            })
125        }
126    }
127
128    /// Simulate command execution (placeholder for real SSH)
129    fn simulate_command(&self, command: &str, config: &HostConfig) -> (i32, String, String) {
130        // Check for health check command
131        if command.contains("echo") && command.contains("health") {
132            return (0, "OK".to_string(), String::new());
133        }
134
135        // Check for cbtop benchmark command
136        if command.contains("cbtop") && command.contains("--json") {
137            let arch = config.architecture.as_deref().unwrap_or("x86_64");
138            let json = format!(
139                r#"{{"host":"{}","arch":"{}","throughput":1000000,"latency_p50":50,"latency_p99":200,"memory":1073741824}}"#,
140                config.host, arch
141            );
142            return (0, json, String::new());
143        }
144
145        // Unknown command - simulate failure
146        (1, String::new(), "Unknown command".to_string())
147    }
148
149    /// Execute a command on all available hosts
150    pub fn execute_on_all(&mut self, command: &str) -> Vec<Result<CommandResult, RemoteError>> {
151        let host_keys: Vec<String> = self
152            .available_hosts()
153            .map(|h| format!("{}:{}", h.config.host, h.config.port))
154            .collect();
155
156        let mut results = Vec::new();
157        for key in host_keys {
158            results.push(self.execute_on_host(&key, command));
159        }
160        results
161    }
162
163    /// Run a health check on all hosts
164    pub fn health_check(&mut self) -> HashMap<String, HostHealth> {
165        let host_keys: Vec<String> = self.hosts.keys().cloned().collect();
166        let mut results = HashMap::new();
167
168        for key in host_keys {
169            let _ = self.execute_on_host(&key, "echo health");
170            if let Some(state) = self.hosts.get(&key) {
171                results.insert(key, state.health.clone());
172            }
173        }
174
175        results
176    }
177
178    /// Collect benchmarks from all available hosts
179    pub fn collect_benchmarks(&mut self) -> RemoteResult<AggregatedResult> {
180        let start = Instant::now();
181        let command = format!("{} benchmark --json", self.config.remote_binary_path);
182
183        let results = self.execute_on_all(&command);
184
185        let mut host_benchmarks = Vec::new();
186        let mut failures = Vec::new();
187
188        for result in results {
189            match result {
190                Ok(cmd_result) => {
191                    if let Some(benchmark) = self.parse_benchmark_json(&cmd_result) {
192                        host_benchmarks.push(benchmark);
193                    }
194                }
195                Err(e) => {
196                    failures.push(e.to_string());
197                }
198            }
199        }
200
201        if host_benchmarks.is_empty() {
202            return Err(RemoteError::AllHostsFailed { failures });
203        }
204
205        // Aggregate results
206        let aggregated = self.aggregate_results(&host_benchmarks, failures.len());
207
208        Ok(AggregatedResult {
209            host_results: host_benchmarks,
210            throughput_geomean: aggregated.0,
211            latency_p50_mean_us: aggregated.1,
212            latency_p99_max_us: aggregated.2,
213            hosts_succeeded: aggregated.3,
214            hosts_failed: aggregated.4,
215            collection_time_ms: start.elapsed().as_millis() as u64,
216        })
217    }
218
219    /// Parse benchmark JSON output
220    fn parse_benchmark_json(&self, result: &CommandResult) -> Option<HostBenchmark> {
221        let stdout = &result.stdout;
222
223        let host = json::extract_json_string(stdout, "host")?;
224        let arch =
225            json::extract_json_string(stdout, "arch").unwrap_or_else(|| "unknown".to_string());
226        let throughput = json::extract_json_number(stdout, "throughput")?;
227        let latency_p50 = json::extract_json_number(stdout, "latency_p50")?;
228        let latency_p99 = json::extract_json_number(stdout, "latency_p99")?;
229        let memory = json::extract_json_number(stdout, "memory")? as u64;
230
231        Some(HostBenchmark {
232            host,
233            architecture: arch,
234            throughput_ops: throughput,
235            latency_p50_us: latency_p50,
236            latency_p99_us: latency_p99,
237            memory_bytes: memory,
238            gpu_utilization: None,
239            timestamp_ns: std::time::SystemTime::now()
240                .duration_since(std::time::UNIX_EPOCH)
241                .map(|d| d.as_nanos() as u64)
242                .unwrap_or(0),
243        })
244    }
245
246    /// Aggregate benchmark results based on strategy
247    pub(crate) fn aggregate_results(
248        &self,
249        benchmarks: &[HostBenchmark],
250        failure_count: usize,
251    ) -> (f64, f64, f64, usize, usize) {
252        if benchmarks.is_empty() {
253            return (0.0, 0.0, 0.0, 0, failure_count);
254        }
255
256        let (throughput, latency_p50, latency_p99) =
257            metrics::compute_metrics(benchmarks, self.config.aggregation);
258
259        (
260            throughput,
261            latency_p50,
262            latency_p99,
263            benchmarks.len(),
264            failure_count,
265        )
266    }
267
268    /// Add result to history with size limit
269    fn add_to_history(&mut self, result: CommandResult) {
270        self.history.push(result);
271        if self.history.len() > self.max_history {
272            self.history.remove(0);
273        }
274    }
275
276    /// Get command history
277    pub fn history(&self) -> &[CommandResult] {
278        &self.history
279    }
280
281    /// Get configuration
282    pub fn config(&self) -> &RemoteAgentConfig {
283        &self.config
284    }
285
286    /// Filter hosts by a predicate on their config.
287    fn filter_hosts(&self, pred: impl Fn(&HostConfig) -> bool) -> Vec<&HostState> {
288        self.hosts.values().filter(|h| pred(&h.config)).collect()
289    }
290
291    /// Filter hosts by label
292    pub fn hosts_with_label(&self, key: &str, value: &str) -> Vec<&HostState> {
293        self.filter_hosts(|c| c.labels.get(key).map(|v| v == value).unwrap_or(false))
294    }
295
296    /// Filter hosts by architecture
297    pub fn hosts_with_arch(&self, arch: &str) -> Vec<&HostState> {
298        self.filter_hosts(|c| c.architecture.as_deref() == Some(arch))
299    }
300}