1use std::collections::HashMap;
4use std::time::Instant;
5
6use super::json;
7use super::metrics;
8use super::types::{
9 AggregatedResult, CommandResult, HostBenchmark, HostConfig, HostHealth, HostState,
10 RemoteAgentConfig, RemoteError, RemoteResult,
11};
12
13#[derive(Debug)]
15pub struct RemoteAgent {
16 config: RemoteAgentConfig,
18 hosts: HashMap<String, HostState>,
20 history: Vec<CommandResult>,
22 pub(crate) max_history: usize,
24}
25
26impl RemoteAgent {
27 pub fn new(config: RemoteAgentConfig) -> Self {
29 Self {
30 config,
31 hosts: HashMap::new(),
32 history: Vec::new(),
33 max_history: 1000,
34 }
35 }
36
37 pub fn add_host(&mut self, config: HostConfig) {
39 let key = format!("{}:{}", config.host, config.port);
40 self.hosts.insert(key, HostState::new(config));
41 }
42
43 pub fn remove_host(&mut self, host: &str) -> Option<HostState> {
45 self.hosts
47 .remove(host)
48 .or_else(|| self.hosts.remove(&format!("{}:22", host)))
49 }
50
51 pub fn hosts(&self) -> impl Iterator<Item = &HostState> {
53 self.hosts.values()
54 }
55
56 pub fn available_hosts(&self) -> impl Iterator<Item = &HostState> {
58 self.hosts.values().filter(|h| h.is_available())
59 }
60
61 pub fn host_count(&self) -> usize {
63 self.hosts.len()
64 }
65
66 pub fn available_count(&self) -> usize {
68 self.available_hosts().count()
69 }
70
71 pub fn execute_on_host(
73 &mut self,
74 host_key: &str,
75 command: &str,
76 ) -> RemoteResult<CommandResult> {
77 let config = {
79 let state = self
80 .hosts
81 .get(host_key)
82 .ok_or_else(|| RemoteError::HostNotFound {
83 host: host_key.to_string(),
84 })?;
85 state.config.clone()
86 };
87
88 let start = Instant::now();
90
91 let (exit_code, stdout, stderr) = self.simulate_command(command, &config);
94
95 let duration_ms = start.elapsed().as_millis() as u64;
96
97 let result = CommandResult {
98 host: host_key.to_string(),
99 exit_code,
100 stdout,
101 stderr,
102 duration_ms,
103 };
104
105 if let Some(state) = self.hosts.get_mut(host_key) {
107 if result.success() {
108 state.record_success(duration_ms);
109 } else {
110 state.record_failure(&result.stderr);
111 }
112 }
113
114 self.add_to_history(result.clone());
116
117 if result.success() {
118 Ok(result)
119 } else {
120 Err(RemoteError::CommandFailed {
121 host: host_key.to_string(),
122 exit_code: result.exit_code,
123 stderr: result.stderr,
124 })
125 }
126 }
127
128 fn simulate_command(&self, command: &str, config: &HostConfig) -> (i32, String, String) {
130 if command.contains("echo") && command.contains("health") {
132 return (0, "OK".to_string(), String::new());
133 }
134
135 if command.contains("cbtop") && command.contains("--json") {
137 let arch = config.architecture.as_deref().unwrap_or("x86_64");
138 let json = format!(
139 r#"{{"host":"{}","arch":"{}","throughput":1000000,"latency_p50":50,"latency_p99":200,"memory":1073741824}}"#,
140 config.host, arch
141 );
142 return (0, json, String::new());
143 }
144
145 (1, String::new(), "Unknown command".to_string())
147 }
148
149 pub fn execute_on_all(&mut self, command: &str) -> Vec<Result<CommandResult, RemoteError>> {
151 let host_keys: Vec<String> = self
152 .available_hosts()
153 .map(|h| format!("{}:{}", h.config.host, h.config.port))
154 .collect();
155
156 let mut results = Vec::new();
157 for key in host_keys {
158 results.push(self.execute_on_host(&key, command));
159 }
160 results
161 }
162
163 pub fn health_check(&mut self) -> HashMap<String, HostHealth> {
165 let host_keys: Vec<String> = self.hosts.keys().cloned().collect();
166 let mut results = HashMap::new();
167
168 for key in host_keys {
169 let _ = self.execute_on_host(&key, "echo health");
170 if let Some(state) = self.hosts.get(&key) {
171 results.insert(key, state.health.clone());
172 }
173 }
174
175 results
176 }
177
178 pub fn collect_benchmarks(&mut self) -> RemoteResult<AggregatedResult> {
180 let start = Instant::now();
181 let command = format!("{} benchmark --json", self.config.remote_binary_path);
182
183 let results = self.execute_on_all(&command);
184
185 let mut host_benchmarks = Vec::new();
186 let mut failures = Vec::new();
187
188 for result in results {
189 match result {
190 Ok(cmd_result) => {
191 if let Some(benchmark) = self.parse_benchmark_json(&cmd_result) {
192 host_benchmarks.push(benchmark);
193 }
194 }
195 Err(e) => {
196 failures.push(e.to_string());
197 }
198 }
199 }
200
201 if host_benchmarks.is_empty() {
202 return Err(RemoteError::AllHostsFailed { failures });
203 }
204
205 let aggregated = self.aggregate_results(&host_benchmarks, failures.len());
207
208 Ok(AggregatedResult {
209 host_results: host_benchmarks,
210 throughput_geomean: aggregated.0,
211 latency_p50_mean_us: aggregated.1,
212 latency_p99_max_us: aggregated.2,
213 hosts_succeeded: aggregated.3,
214 hosts_failed: aggregated.4,
215 collection_time_ms: start.elapsed().as_millis() as u64,
216 })
217 }
218
219 fn parse_benchmark_json(&self, result: &CommandResult) -> Option<HostBenchmark> {
221 let stdout = &result.stdout;
222
223 let host = json::extract_json_string(stdout, "host")?;
224 let arch =
225 json::extract_json_string(stdout, "arch").unwrap_or_else(|| "unknown".to_string());
226 let throughput = json::extract_json_number(stdout, "throughput")?;
227 let latency_p50 = json::extract_json_number(stdout, "latency_p50")?;
228 let latency_p99 = json::extract_json_number(stdout, "latency_p99")?;
229 let memory = json::extract_json_number(stdout, "memory")? as u64;
230
231 Some(HostBenchmark {
232 host,
233 architecture: arch,
234 throughput_ops: throughput,
235 latency_p50_us: latency_p50,
236 latency_p99_us: latency_p99,
237 memory_bytes: memory,
238 gpu_utilization: None,
239 timestamp_ns: std::time::SystemTime::now()
240 .duration_since(std::time::UNIX_EPOCH)
241 .map(|d| d.as_nanos() as u64)
242 .unwrap_or(0),
243 })
244 }
245
246 pub(crate) fn aggregate_results(
248 &self,
249 benchmarks: &[HostBenchmark],
250 failure_count: usize,
251 ) -> (f64, f64, f64, usize, usize) {
252 if benchmarks.is_empty() {
253 return (0.0, 0.0, 0.0, 0, failure_count);
254 }
255
256 let (throughput, latency_p50, latency_p99) =
257 metrics::compute_metrics(benchmarks, self.config.aggregation);
258
259 (
260 throughput,
261 latency_p50,
262 latency_p99,
263 benchmarks.len(),
264 failure_count,
265 )
266 }
267
268 fn add_to_history(&mut self, result: CommandResult) {
270 self.history.push(result);
271 if self.history.len() > self.max_history {
272 self.history.remove(0);
273 }
274 }
275
276 pub fn history(&self) -> &[CommandResult] {
278 &self.history
279 }
280
281 pub fn config(&self) -> &RemoteAgentConfig {
283 &self.config
284 }
285
286 fn filter_hosts(&self, pred: impl Fn(&HostConfig) -> bool) -> Vec<&HostState> {
288 self.hosts.values().filter(|h| pred(&h.config)).collect()
289 }
290
291 pub fn hosts_with_label(&self, key: &str, value: &str) -> Vec<&HostState> {
293 self.filter_hosts(|c| c.labels.get(key).map(|v| v == value).unwrap_or(false))
294 }
295
296 pub fn hosts_with_arch(&self, arch: &str) -> Vec<&HostState> {
298 self.filter_hosts(|c| c.architecture.as_deref() == Some(arch))
299 }
300}