// cbtop/remote_agent/types.rs

1//! Type definitions for remote SSH/headless agent integration.
2
3use std::collections::HashMap;
4use std::time::Instant;
5
6/// Result type for remote agent operations
7pub type RemoteResult<T> = Result<T, RemoteError>;
8
/// Failure modes reported by remote agent operations.
///
/// Each variant owns the data used to render its message, so values can be
/// cloned and compared with `==`.
#[derive(Debug, Clone, PartialEq)]
pub enum RemoteError {
    /// The SSH connection to `host` failed for the given `reason`.
    ConnectionFailed { host: String, reason: String },
    /// Authentication against `host` failed.
    AuthenticationFailed { host: String },
    /// A command executed on `host` failed with `exit_code`; `stderr` is
    /// included in the rendered message.
    CommandFailed {
        host: String,
        exit_code: i32,
        stderr: String,
    },
    /// No response from `host` within `timeout_ms` milliseconds.
    Timeout { host: String, timeout_ms: u64 },
    /// `host` is not present in the pool.
    HostNotFound { host: String },
    /// All hosts failed; `failures` lists the reported failures.
    AllHostsFailed { failures: Vec<String> },
    /// The configuration is invalid for the stated `reason`.
    InvalidConfig { reason: String },
    /// Combining per-host results failed for the stated `reason`.
    AggregationFailed { reason: String },
}

/// Renders a one-line, human-readable message for each variant.
impl std::fmt::Display for RemoteError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RemoteError::ConnectionFailed { host: target, reason: why } => {
                f.write_fmt(format_args!("Connection to {} failed: {}", target, why))
            }
            RemoteError::AuthenticationFailed { host: target } => {
                f.write_fmt(format_args!("Authentication failed for {}", target))
            }
            RemoteError::CommandFailed {
                host: target,
                exit_code: code,
                stderr: err_output,
            } => f.write_fmt(format_args!(
                "Command failed on {} (exit {}): {}",
                target, code, err_output
            )),
            RemoteError::Timeout { host: target, timeout_ms: limit } => {
                f.write_fmt(format_args!("Timeout after {}ms waiting for {}", limit, target))
            }
            RemoteError::HostNotFound { host: target } => {
                f.write_fmt(format_args!("Host {} not found in pool", target))
            }
            // The failure list is intentionally rendered with its `Debug` form.
            RemoteError::AllHostsFailed { failures: list } => {
                f.write_fmt(format_args!("All hosts failed: {:?}", list))
            }
            RemoteError::InvalidConfig { reason: why } => {
                f.write_fmt(format_args!("Invalid configuration: {}", why))
            }
            RemoteError::AggregationFailed { reason: why } => {
                f.write_fmt(format_args!("Result aggregation failed: {}", why))
            }
        }
    }
}

/// Marks [`RemoteError`] as a standard error type; no source error is exposed.
impl std::error::Error for RemoteError {}
74
/// How an SSH session authenticates.
#[derive(Debug, Clone)]
pub enum AuthMethod {
    /// Authenticate with an SSH private key.
    Key {
        /// Location of the private key file.
        key_path: String,
        /// Optional passphrase source (environment or keyring); the
        /// passphrase itself is never stored.
        // NOTE(review): the field name suggests this holds an environment
        // variable name rather than the passphrase - confirm against callers.
        passphrase_env: Option<String>,
    },
    /// SSH agent forwarding.
    Agent,
    /// Password taken from an environment variable; it is never stored in
    /// configuration.
    PasswordEnv {
        /// Name of the environment variable that contains the password.
        env_var: String,
    },
}

/// The default method is [`AuthMethod::Agent`].
impl Default for AuthMethod {
    fn default() -> Self {
        AuthMethod::Agent
    }
}
99
100/// Remote host configuration
101#[derive(Debug, Clone)]
102pub struct HostConfig {
103    /// Hostname or IP address
104    pub host: String,
105    /// SSH port (default: 22)
106    pub port: u16,
107    /// Username for SSH connection
108    pub username: String,
109    /// Authentication method
110    pub auth: AuthMethod,
111    /// Connection timeout in milliseconds
112    pub connect_timeout_ms: u64,
113    /// Command execution timeout in milliseconds
114    pub command_timeout_ms: u64,
115    /// Host architecture (x86_64, aarch64, etc.)
116    pub architecture: Option<String>,
117    /// Host labels for grouping
118    pub labels: HashMap<String, String>,
119}
120
121impl HostConfig {
122    /// Create a new host configuration
123    pub fn new(host: impl Into<String>, username: impl Into<String>) -> Self {
124        Self {
125            host: host.into(),
126            port: 22,
127            username: username.into(),
128            auth: AuthMethod::default(),
129            connect_timeout_ms: 10_000,
130            command_timeout_ms: 60_000,
131            architecture: None,
132            labels: HashMap::new(),
133        }
134    }
135
136    /// Set SSH port
137    pub fn with_port(mut self, port: u16) -> Self {
138        self.port = port;
139        self
140    }
141
142    /// Set authentication method
143    pub fn with_auth(mut self, auth: AuthMethod) -> Self {
144        self.auth = auth;
145        self
146    }
147
148    /// Set connection timeout
149    pub fn with_connect_timeout_ms(mut self, timeout_ms: u64) -> Self {
150        self.connect_timeout_ms = timeout_ms;
151        self
152    }
153
154    /// Set command timeout
155    pub fn with_command_timeout_ms(mut self, timeout_ms: u64) -> Self {
156        self.command_timeout_ms = timeout_ms;
157        self
158    }
159
160    /// Set architecture
161    pub fn with_architecture(mut self, arch: impl Into<String>) -> Self {
162        self.architecture = Some(arch.into());
163        self
164    }
165
166    /// Add a label
167    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
168        self.labels.insert(key.into(), value.into());
169        self
170    }
171}
172
/// Health classification of a remote host.
#[derive(Debug, Clone, PartialEq)]
pub enum HostHealth {
    /// The host is healthy and responding.
    Healthy,
    /// The host responds slowly; `latency_ms` is the latency that was seen.
    Degraded { latency_ms: u64 },
    /// The host cannot be reached; `last_error` describes the latest error.
    Unreachable { last_error: String },
    /// The host has never been checked.
    Unknown,
}

/// A host starts out as [`HostHealth::Unknown`].
impl Default for HostHealth {
    fn default() -> Self {
        HostHealth::Unknown
    }
}
191
192/// Remote host state
193#[derive(Debug, Clone)]
194pub struct HostState {
195    /// Host configuration
196    pub config: HostConfig,
197    /// Current health status
198    pub health: HostHealth,
199    /// Last successful connection time
200    pub last_success: Option<Instant>,
201    /// Consecutive failure count
202    pub failure_count: u32,
203    /// Total commands executed
204    pub commands_executed: u64,
205    /// Average latency in milliseconds
206    pub avg_latency_ms: f64,
207}
208
209impl HostState {
210    /// Create new host state from configuration
211    pub fn new(config: HostConfig) -> Self {
212        Self {
213            config,
214            health: HostHealth::Unknown,
215            last_success: None,
216            failure_count: 0,
217            commands_executed: 0,
218            avg_latency_ms: 0.0,
219        }
220    }
221
222    /// Check if host is available for commands
223    pub fn is_available(&self) -> bool {
224        matches!(
225            self.health,
226            HostHealth::Healthy | HostHealth::Degraded { .. } | HostHealth::Unknown
227        )
228    }
229
230    /// Record a successful command execution
231    pub fn record_success(&mut self, latency_ms: u64) {
232        self.last_success = Some(Instant::now());
233        self.failure_count = 0;
234        self.commands_executed += 1;
235
236        // Update running average
237        let n = self.commands_executed as f64;
238        self.avg_latency_ms = self.avg_latency_ms * ((n - 1.0) / n) + (latency_ms as f64) / n;
239
240        // Update health based on latency
241        self.health = if latency_ms > 5000 {
242            HostHealth::Degraded { latency_ms }
243        } else {
244            HostHealth::Healthy
245        };
246    }
247
248    /// Record a failed command
249    pub fn record_failure(&mut self, error: &str) {
250        self.failure_count += 1;
251        if self.failure_count >= 3 {
252            self.health = HostHealth::Unreachable {
253                last_error: error.to_string(),
254            };
255        }
256    }
257}
258
/// Outcome of one command run on one remote host.
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Host on which the command was executed.
    pub host: String,
    /// Process exit code; 0 means success.
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// How long the command ran, in milliseconds.
    pub duration_ms: u64,
}

impl CommandResult {
    /// Returns `true` exactly when the exit code is 0.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, 0)
    }
}
280
281/// Aggregated benchmark result from multiple hosts
282#[derive(Debug, Clone)]
283pub struct AggregatedResult {
284    /// Individual results from each host
285    pub host_results: Vec<HostBenchmark>,
286    /// Aggregated throughput (ops/sec, geometric mean)
287    pub throughput_geomean: f64,
288    /// Aggregated latency p50 (arithmetic mean)
289    pub latency_p50_mean_us: f64,
290    /// Aggregated latency p99 (max across hosts)
291    pub latency_p99_max_us: f64,
292    /// Number of successful hosts
293    pub hosts_succeeded: usize,
294    /// Number of failed hosts
295    pub hosts_failed: usize,
296    /// Total collection time
297    pub collection_time_ms: u64,
298}
299
300impl AggregatedResult {
301    /// Calculate success rate
302    pub fn success_rate(&self) -> f64 {
303        let total = self.hosts_succeeded + self.hosts_failed;
304        if total == 0 {
305            0.0
306        } else {
307            self.hosts_succeeded as f64 / total as f64
308        }
309    }
310}
311
/// Benchmark measurements collected from one host.
#[derive(Debug, Clone)]
pub struct HostBenchmark {
    /// Identifier of the host.
    pub host: String,
    /// Architecture of the host.
    pub architecture: String,
    /// Throughput, in operations per second.
    pub throughput_ops: f64,
    /// Median (p50) latency, in microseconds.
    pub latency_p50_us: f64,
    /// 99th-percentile (p99) latency, in microseconds.
    pub latency_p99_us: f64,
    /// Memory usage, in bytes.
    pub memory_bytes: u64,
    /// GPU utilization as a fraction in 0.0-1.0, when present.
    pub gpu_utilization: Option<f64>,
    /// Collection timestamp (nanoseconds, going by the field name).
    pub timestamp_ns: u64,
}
332
/// How per-host results are combined into one figure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregationStrategy {
    /// Geometric mean for throughput, arithmetic mean for latency.
    GeometricMean,
    /// Median values.
    Median,
    /// Minimum values (pessimistic).
    Minimum,
    /// Maximum values (optimistic).
    Maximum,
}

/// [`AggregationStrategy::GeometricMean`] is the default strategy.
impl Default for AggregationStrategy {
    fn default() -> Self {
        AggregationStrategy::GeometricMean
    }
}
351
352/// Remote agent configuration
353#[derive(Debug, Clone)]
354pub struct RemoteAgentConfig {
355    /// Maximum concurrent connections
356    pub max_concurrent: usize,
357    /// Retry count for failed commands
358    pub retry_count: u32,
359    /// Retry delay in milliseconds
360    pub retry_delay_ms: u64,
361    /// Health check interval in seconds
362    pub health_check_interval_sec: u64,
363    /// Aggregation strategy
364    pub aggregation: AggregationStrategy,
365    /// Path to cbtop binary on remote hosts
366    pub remote_binary_path: String,
367}
368
369impl Default for RemoteAgentConfig {
370    fn default() -> Self {
371        Self {
372            max_concurrent: 10,
373            retry_count: 3,
374            retry_delay_ms: 1000,
375            health_check_interval_sec: 60,
376            aggregation: AggregationStrategy::default(),
377            remote_binary_path: "/usr/local/bin/cbtop".to_string(),
378        }
379    }
380}
381
/// Default delay between retries, in milliseconds (one second).
pub const DEFAULT_RETRY_DELAY_MS: u64 = 1_000;

/// Default cap on the number of concurrent connections.
pub const DEFAULT_MAX_CONCURRENT: usize = 10;

/// Default interval between health checks, in seconds (one minute).
pub const DEFAULT_HEALTH_CHECK_INTERVAL_SEC: u64 = 60;