Skip to main content

stryke/
agent.rs

1//! `stryke agent` — Persistent load testing agent for distributed stress testing.
2//!
3//! ## Overview
4//!
5//! The agent runs as a daemon, connects to a controller via TCP, and awaits commands.
6//! When the controller sends a FIRE command, the agent executes stress workloads until
7//! TERMINATE is received. Designed for enterprise load testing of distributed clusters.
8//!
9//! ## Config file
10//!
11//! Default: `~/.config/stryke/agent.toml`
12//!
13//! ```toml
14//! [controller]
15//! host = "controller.example.com"
16//! port = 9999
17//!
18//! [limits]
19//! max_temp = 85       # auto-terminate if CPU temp exceeds (Celsius)
20//! max_duration = 3600 # max seconds per stress session
21//!
22//! [agent]
23//! name = "node-01"    # optional, defaults to hostname
24//! ```
25//!
26//! ## Wire protocol
27//!
28//! Same framing as remote_wire: `[u64 LE length][u8 kind][bincode payload]`
29//!
30//! ```text
31//! controller                      agent
32//!     │                             │
33//!     │◄──── AGENT_HELLO ───────────│  (hostname, cores, memory)
34//!     │───── AGENT_HELLO_ACK ──────►│  (session_id, config overrides)
35//!     │                             │
36//!     │───── FIRE ─────────────────►│  (workload type, duration, intensity)
37//!     │◄──── METRICS ───────────────│  (cpu%, temp, memory, hashes/sec)
38//!     │◄──── METRICS ───────────────│
39//!     │───── TERMINATE ────────────►│
40//!     │◄──── TERM_ACK ──────────────│  (final stats)
41//!     │                             │
42//!     │───── SHUTDOWN ─────────────►│
43//!     │                             └─ exit 0
44//! ```
45
46use serde::{Deserialize, Serialize};
47use std::io::{Read, Write};
48use std::net::TcpStream;
49use std::path::PathBuf;
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
/// Agent protocol frame kinds.
///
/// Each constant is the single `kind` byte that follows the length prefix of a
/// frame (see `read_frame` / `write_frame`). Directions below follow the
/// sequence diagram in the module documentation.
pub mod frame_kind {
    /// Agent -> controller: first frame after connecting (carries `AgentHello`).
    pub const AGENT_HELLO: u8 = 0x10;
    /// Controller -> agent: reply to the hello (carries `AgentHelloAck`).
    pub const AGENT_HELLO_ACK: u8 = 0x11;
    /// Controller -> agent: start a workload (carries `FireCommand`).
    pub const FIRE: u8 = 0x12;
    /// Agent -> controller: workload metrics (carries `AgentMetrics`).
    pub const METRICS: u8 = 0x13;
    /// Controller -> agent: stop the current stress session.
    pub const TERMINATE: u8 = 0x14;
    /// Agent -> controller: reply to TERMINATE (carries `TermAck`).
    pub const TERM_ACK: u8 = 0x15;
    /// Controller -> agent: leave the command loop and exit.
    pub const SHUTDOWN: u8 = 0x16;
    /// Controller -> agent: request a status snapshot.
    pub const STATUS: u8 = 0x17;
    /// Agent -> controller: reply to STATUS (carries `AgentMetrics`).
    pub const STATUS_RESP: u8 = 0x18;
    /// Error frame. NOTE(review): not sent or handled anywhere in this file.
    pub const ERROR: u8 = 0xFF;
}
67
/// Version of the agent wire protocol, sent to the controller in
/// `AgentHello::proto_version`.
pub const AGENT_PROTO_VERSION: u32 = 1;
69
/// Agent configuration (from TOML file).
///
/// Every section is optional: `#[serde(default)]` fills a missing section
/// from that section's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentConfig {
    /// `[controller]` section: where the agent connects.
    #[serde(default)]
    pub controller: ControllerConfig,
    /// `[limits]` section: safety limits for stress sessions.
    #[serde(default)]
    pub limits: LimitsConfig,
    /// `[agent]` section: how this agent identifies itself.
    #[serde(default)]
    pub agent: AgentIdentity,
}
80
/// `[controller]` section of the config file: the controller's TCP endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControllerConfig {
    /// Controller host name or address; `default_host()` when omitted.
    #[serde(default = "default_host")]
    pub host: String,
    /// Controller TCP port; `default_port()` when omitted.
    #[serde(default = "default_port")]
    pub port: u16,
}
88
/// Controller host used when the config file does not name one.
fn default_host() -> String {
    String::from("localhost")
}

/// Controller TCP port used when the config file does not name one.
fn default_port() -> u16 {
    const DEFAULT_PORT: u16 = 9999;
    DEFAULT_PORT
}
95
96impl Default for ControllerConfig {
97    fn default() -> Self {
98        Self {
99            host: default_host(),
100            port: default_port(),
101        }
102    }
103}
104
/// `[limits]` section of the config file: safety limits for stress sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LimitsConfig {
    /// CPU temperature ceiling in degrees Celsius; `default_max_temp()` when omitted.
    /// NOTE(review): no code in this file reads this value — confirm where it is enforced.
    #[serde(default = "default_max_temp")]
    pub max_temp: u32,
    /// Maximum seconds per stress session; `default_max_duration()` when omitted.
    #[serde(default = "default_max_duration")]
    pub max_duration: u64,
}
112
/// Temperature ceiling (degrees Celsius) used when the config file omits `max_temp`.
fn default_max_temp() -> u32 {
    const DEFAULT_MAX_TEMP: u32 = 85;
    DEFAULT_MAX_TEMP
}

/// Session length cap (seconds) used when the config file omits `max_duration`.
fn default_max_duration() -> u64 {
    const DEFAULT_MAX_DURATION: u64 = 3600;
    DEFAULT_MAX_DURATION
}
119
120impl Default for LimitsConfig {
121    fn default() -> Self {
122        Self {
123            max_temp: default_max_temp(),
124            max_duration: default_max_duration(),
125        }
126    }
127}
128
/// `[agent]` section of the config file: how this agent identifies itself.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AgentIdentity {
    /// Optional agent name; forwarded to the controller as `AgentHello::agent_name`.
    #[serde(default)]
    pub name: Option<String>,
}
134
/// Hello message from agent to controller.
///
/// Sent once, right after connecting, as the payload of an `AGENT_HELLO` frame.
/// Field order is part of the bincode wire layout — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentHello {
    /// Wire protocol version; the agent sends `AGENT_PROTO_VERSION`.
    pub proto_version: u32,
    /// Version of the stryke package (the agent sends `CARGO_PKG_VERSION`).
    pub stryke_version: String,
    /// Host name of the machine running the agent.
    pub hostname: String,
    /// Number of cores the agent reports as available.
    pub cores: usize,
    /// Total system memory in bytes as reported by the agent.
    pub memory_bytes: u64,
    /// Optional agent name taken from the `[agent]` config section.
    pub agent_name: Option<String>,
}
145
/// Acknowledgment from controller, payload of an `AGENT_HELLO_ACK` frame.
///
/// Field order is part of the bincode wire layout — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentHelloAck {
    /// Session identifier chosen by the controller; the agent only logs it.
    pub session_id: u64,
    /// `false` makes the agent log `message` and exit with status 1.
    pub accepted: bool,
    /// Human-readable text; printed by the agent when the hello is rejected.
    pub message: String,
}
153
/// Fire command — start stress test. Payload of a `FIRE` frame.
///
/// Field order is part of the bincode wire layout — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FireCommand {
    /// Which workload to run.
    pub workload: WorkloadType,
    /// Requested run time in seconds, handed to `run_workload`.
    pub duration_secs: f64,
    /// Intended as the fraction of cores to use.
    /// NOTE(review): the agent loop in this file only logs this value;
    /// `run_workload` always spawns one worker per available core.
    pub intensity: f64, // 0.0-1.0, percentage of cores to use
}
161
/// Kind of stress workload requested by a `FireCommand`.
///
/// Variant order is part of the bincode wire layout — do not reorder.
/// The descriptions below reflect what `run_workload` in this file does.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WorkloadType {
    /// Repeated SHA-256 hashing on every core until the duration elapses
    /// or termination is requested.
    Cpu,
    /// Allocate `bytes` in total, split across one worker per core, and write
    /// one byte every 4096 bytes of each buffer.
    Memory { bytes: u64 },
    /// Write, read back and delete 1,000,000-byte files under `dir`;
    /// `iterations` is the total number of cycles, divided among the workers.
    Io { dir: String, iterations: u64 },
    /// Currently handled exactly like `Cpu`.
    Combined,
    /// Custom stryke code; not executed yet (`run_workload` has a TODO and
    /// reports zero work).
    Custom { code: String },
}
170
/// Metrics report from agent, payload of `METRICS` and `STATUS_RESP` frames.
///
/// Field order is part of the bincode wire layout — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMetrics {
    /// CPU utilisation in percent. NOTE(review): this file fills it with a fixed
    /// 100.0 or 0.0 rather than a measurement.
    pub cpu_percent: f64,
    /// Memory in use. NOTE(review): always sent as 0 by this file.
    pub memory_used: u64,
    /// Work units per second: the count returned by `run_workload` divided by
    /// its elapsed time (0 in status replies).
    pub hashes_per_sec: u64,
    /// Seconds covered by the report.
    pub elapsed_secs: f64,
    /// Agent state at the time of the report.
    pub state: AgentState,
}
180
/// Lifecycle state reported in `AgentMetrics::state`.
///
/// Variant order is part of the bincode wire layout — do not reorder.
/// NOTE(review): only `Idle` and `Firing` are produced by the code in this file.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum AgentState {
    /// No workload is running.
    Idle,
    /// Not produced in this file — presumably "ready to fire"; confirm with the controller.
    Armed,
    /// A workload is running.
    Firing,
    /// Not produced in this file — presumably "stopped by TERMINATE"; confirm with the controller.
    Terminated,
}
188
/// Termination acknowledgment with final stats, payload of a `TERM_ACK` frame.
///
/// Field order is part of the bincode wire layout — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TermAck {
    /// Work units accumulated over all workloads since the previous TERMINATE.
    pub total_hashes: u64,
    /// Seconds since the most recent FIRE was received (0.0 if none).
    pub total_duration: f64,
    /// Peak CPU utilisation in percent for the session.
    pub peak_cpu: f64,
}
196
/// Read a framed message from a stream.
///
/// Wire layout: `[u64 LE length][u8 kind][payload]`, where `length` counts the
/// kind byte plus the payload.
///
/// Returns the kind byte and the payload (which may be empty).
///
/// # Errors
///
/// * `InvalidData` if the announced length is zero or larger than
///   `MAX_FRAME_LEN`. The length comes straight from the peer, so it must be
///   bounded before it is used as an allocation size.
/// * Any I/O error from the underlying reader, including `UnexpectedEof` when
///   the stream ends in the middle of a frame.
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    /// Largest frame (kind byte + payload) accepted from the peer.
    const MAX_FRAME_LEN: u64 = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let len = u64::from_le_bytes(len_buf);
    if len < 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "empty frame",
        ));
    }
    if len > MAX_FRAME_LEN {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("frame too large: {} bytes (max {})", len, MAX_FRAME_LEN),
        ));
    }

    // Read the kind byte on its own so the payload buffer can be returned
    // as-is instead of being copied out of a combined buffer.
    let mut kind = [0u8; 1];
    r.read_exact(&mut kind)?;

    let payload_len = usize::try_from(len - 1).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "frame length does not fit in memory",
        )
    })?;
    let mut payload = vec![0u8; payload_len];
    r.read_exact(&mut payload)?;
    Ok((kind[0], payload))
}
213
/// Write a framed message to a stream.
///
/// Wire layout: `[u64 LE length][u8 kind][payload]`, where `length` is
/// `1 + payload.len()`.
///
/// The whole frame is assembled in memory and handed to the writer in a single
/// `write_all` call, then flushed. On an unbuffered socket this avoids sending
/// the 8-byte header and the 1-byte kind as separate tiny writes.
///
/// # Errors
///
/// Returns any I/O error reported by the writer while writing or flushing.
fn write_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
    let total_len = 1 + payload.len();

    let mut frame = Vec::with_capacity(8 + total_len);
    frame.extend_from_slice(&(total_len as u64).to_le_bytes());
    frame.push(kind);
    frame.extend_from_slice(payload);

    w.write_all(&frame)?;
    w.flush()
}
222
223/// Get default config path
224pub fn default_config_path() -> PathBuf {
225    dirs::config_dir()
226        .unwrap_or_else(|| PathBuf::from("."))
227        .join("stryke")
228        .join("agent.toml")
229}
230
231/// Load config from file or return defaults
232pub fn load_config(path: Option<&str>) -> AgentConfig {
233    let config_path = path.map(PathBuf::from).unwrap_or_else(default_config_path);
234
235    if config_path.exists() {
236        match std::fs::read_to_string(&config_path) {
237            Ok(content) => match toml::from_str(&content) {
238                Ok(config) => {
239                    eprintln!("stryke agent: loaded config from {}", config_path.display());
240                    return config;
241                }
242                Err(e) => {
243                    eprintln!(
244                        "stryke agent: config parse error {}: {}",
245                        config_path.display(),
246                        e
247                    );
248                }
249            },
250            Err(e) => {
251                eprintln!("stryke agent: cannot read {}: {}", config_path.display(), e);
252            }
253        }
254    }
255
256    eprintln!("stryke agent: using default config (controller=localhost:9999)");
257    AgentConfig::default()
258}
259
260/// Get system hostname
261fn get_hostname() -> String {
262    hostname::get()
263        .map(|h| h.to_string_lossy().to_string())
264        .unwrap_or_else(|_| "unknown".to_string())
265}
266
/// Number of cores available to this process according to
/// `std::thread::available_parallelism`, or 1 when that cannot be determined.
fn get_cores() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
273
/// Get total system memory in bytes.
///
/// On systems that expose `/proc/meminfo` (Linux), the `MemTotal` line is
/// parsed; it is reported in KiB and converted to bytes. Everywhere else, or
/// if the file cannot be read or parsed, the previous placeholder of 16 GiB is
/// returned so the hello message always carries a non-zero value.
fn get_memory() -> u64 {
    /// Placeholder reported when the real amount cannot be determined.
    const FALLBACK_BYTES: u64 = 16 * 1024 * 1024 * 1024;

    std::fs::read_to_string("/proc/meminfo")
        .ok()
        .and_then(|meminfo| {
            meminfo.lines().find_map(|line| {
                // Expected shape: "MemTotal:       16384256 kB"
                let rest = line.strip_prefix("MemTotal:")?;
                let kib: u64 = rest.split_whitespace().next()?.parse().ok()?;
                kib.checked_mul(1024)
            })
        })
        .filter(|&bytes| bytes > 0)
        .unwrap_or(FALLBACK_BYTES)
}
280
281/// Run the stress workload — pins ALL cores to 100% TDP
282fn run_workload(
283    workload: &WorkloadType,
284    duration_secs: f64,
285    terminate: Arc<AtomicBool>,
286) -> (u64, f64) {
287    use sha2::{Digest, Sha256};
288    use std::sync::atomic::AtomicU64;
289
290    let start = Instant::now();
291    let duration = Duration::from_secs_f64(duration_secs);
292    let num_cores = std::thread::available_parallelism()
293        .map(|p| p.get())
294        .unwrap_or(1);
295
296    match workload {
297        WorkloadType::Cpu | WorkloadType::Combined => {
298            let total_hashes = AtomicU64::new(0);
299
300            std::thread::scope(|s| {
301                for _ in 0..num_cores {
302                    let term = Arc::clone(&terminate);
303                    let counter = &total_hashes;
304                    s.spawn(move || {
305                        let mut local_count: u64 = 0;
306                        let mut data = [0u8; 64];
307
308                        while start.elapsed() < duration && !term.load(Ordering::Relaxed) {
309                            for _ in 0..1000 {
310                                let hash = Sha256::digest(data);
311                                data[..32].copy_from_slice(&hash);
312                                local_count += 1;
313                            }
314                        }
315
316                        counter.fetch_add(local_count, Ordering::Relaxed);
317                    });
318                }
319            });
320
321            (
322                total_hashes.load(Ordering::Relaxed),
323                start.elapsed().as_secs_f64(),
324            )
325        }
326        WorkloadType::Memory { bytes } => {
327            let bytes_per_core = *bytes as usize / num_cores;
328
329            std::thread::scope(|s| {
330                for core_id in 0..num_cores {
331                    let term = Arc::clone(&terminate);
332                    s.spawn(move || {
333                        if term.load(Ordering::Relaxed) {
334                            return;
335                        }
336                        let mut buf: Vec<u8> = vec![0u8; bytes_per_core];
337                        for i in (0..bytes_per_core).step_by(4096) {
338                            if term.load(Ordering::Relaxed) {
339                                break;
340                            }
341                            buf[i] = ((i + core_id) & 0xff) as u8;
342                        }
343                        std::hint::black_box(&buf);
344                    });
345                }
346            });
347
348            (*bytes, start.elapsed().as_secs_f64())
349        }
350        WorkloadType::Io { dir, iterations } => {
351            use std::fs;
352            use std::io::Write as IoWrite;
353
354            let total_bytes = AtomicU64::new(0);
355            let iters_per_core = *iterations as usize / num_cores;
356
357            std::thread::scope(|s| {
358                for core_id in 0..num_cores {
359                    let term = Arc::clone(&terminate);
360                    let counter = &total_bytes;
361                    let dir = dir.clone();
362                    s.spawn(move || {
363                        let io_data = vec![0xABu8; 1_000_000];
364                        for i in 0..iters_per_core {
365                            if term.load(Ordering::Relaxed) {
366                                break;
367                            }
368                            let path = format!("{}/stryke_stress_{}_{}", dir, core_id, i);
369                            if let Ok(mut f) = fs::File::create(&path) {
370                                let _ = f.write_all(&io_data);
371                            }
372                            let _ = fs::read(&path);
373                            let _ = fs::remove_file(&path);
374                            counter.fetch_add(io_data.len() as u64, Ordering::Relaxed);
375                        }
376                    });
377                }
378            });
379
380            (
381                total_bytes.load(Ordering::Relaxed),
382                start.elapsed().as_secs_f64(),
383            )
384        }
385        WorkloadType::Custom { code: _ } => {
386            // TODO: execute custom stryke code
387            (0, start.elapsed().as_secs_f64())
388        }
389    }
390}
391
/// Main agent loop, configured from the config file only.
///
/// Delegates to `run_agent_with_overrides` with no controller or port
/// override and returns its exit code.
pub fn run_agent(config_path: Option<&str>) -> i32 {
    run_agent_with_overrides(config_path, None, None)
}
396
397/// Main agent loop with CLI overrides
398pub fn run_agent_with_overrides(
399    config_path: Option<&str>,
400    controller_override: Option<&str>,
401    port_override: Option<u16>,
402) -> i32 {
403    let mut config = load_config(config_path);
404
405    if let Some(host) = controller_override {
406        config.controller.host = host.to_string();
407    }
408    if let Some(port) = port_override {
409        config.controller.port = port;
410    }
411
412    let addr = format!("{}:{}", config.controller.host, config.controller.port);
413
414    eprintln!("stryke agent: connecting to controller at {}", addr);
415
416    let mut stream = match TcpStream::connect(&addr) {
417        Ok(s) => s,
418        Err(e) => {
419            eprintln!("stryke agent: connection failed: {}", e);
420            return 1;
421        }
422    };
423
424    // Set read timeout for non-blocking checks
425    let _ = stream.set_read_timeout(Some(Duration::from_millis(100)));
426
427    // Send AGENT_HELLO
428    let hello = AgentHello {
429        proto_version: AGENT_PROTO_VERSION,
430        stryke_version: env!("CARGO_PKG_VERSION").to_string(),
431        hostname: get_hostname(),
432        cores: get_cores(),
433        memory_bytes: get_memory(),
434        agent_name: config.agent.name.clone(),
435    };
436
437    let hello_bytes = bincode::serialize(&hello).expect("serialize hello");
438    if let Err(e) = write_frame(&mut stream, frame_kind::AGENT_HELLO, &hello_bytes) {
439        eprintln!("stryke agent: failed to send hello: {}", e);
440        return 1;
441    }
442
443    // Wait for HELLO_ACK
444    let (kind, payload) = match read_frame(&mut stream) {
445        Ok(f) => f,
446        Err(e) => {
447            eprintln!("stryke agent: failed to read hello ack: {}", e);
448            return 1;
449        }
450    };
451
452    if kind != frame_kind::AGENT_HELLO_ACK {
453        eprintln!("stryke agent: unexpected frame kind: {}", kind);
454        return 1;
455    }
456
457    let ack: AgentHelloAck = match bincode::deserialize(&payload) {
458        Ok(a) => a,
459        Err(e) => {
460            eprintln!("stryke agent: failed to parse hello ack: {}", e);
461            return 1;
462        }
463    };
464
465    if !ack.accepted {
466        eprintln!("stryke agent: rejected by controller: {}", ack.message);
467        return 1;
468    }
469
470    eprintln!(
471        "stryke agent: connected (session_id={}, cores={}, hostname={})",
472        ack.session_id,
473        get_cores(),
474        get_hostname()
475    );
476    eprintln!("stryke agent: awaiting commands...");
477
478    // Disable read timeout for blocking reads
479    let _ = stream.set_read_timeout(None);
480
481    // Main command loop
482    let terminate = Arc::new(AtomicBool::new(false));
483    #[allow(unused_assignments)]
484    let mut state = AgentState::Idle;
485    let mut session_start: Option<Instant> = None;
486    let mut total_hashes: u64 = 0;
487    let mut peak_cpu: f64 = 0.0;
488
489    loop {
490        let (kind, payload) = match read_frame(&mut stream) {
491            Ok(f) => f,
492            Err(e) => {
493                if e.kind() == std::io::ErrorKind::UnexpectedEof {
494                    eprintln!("stryke agent: controller disconnected");
495                } else {
496                    eprintln!("stryke agent: read error: {}", e);
497                }
498                break;
499            }
500        };
501
502        match kind {
503            frame_kind::FIRE => {
504                let cmd: FireCommand = match bincode::deserialize(&payload) {
505                    Ok(c) => c,
506                    Err(e) => {
507                        eprintln!("stryke agent: invalid FIRE command: {}", e);
508                        continue;
509                    }
510                };
511
512                eprintln!(
513                    "stryke agent: FIRE received (duration={}s, intensity={})",
514                    cmd.duration_secs, cmd.intensity
515                );
516
517                #[allow(unused_assignments)]
518                {
519                    state = AgentState::Firing;
520                }
521                session_start = Some(Instant::now());
522                terminate.store(false, Ordering::Relaxed);
523
524                // Run workload in a separate thread so we can handle TERMINATE
525                let term_clone = Arc::clone(&terminate);
526                let workload = cmd.workload.clone();
527                let duration = cmd.duration_secs;
528
529                let handle =
530                    std::thread::spawn(move || run_workload(&workload, duration, term_clone));
531
532                // Wait for completion or termination
533                let (hashes, elapsed) = handle.join().unwrap_or((0, 0.0));
534                total_hashes += hashes;
535
536                // Send final metrics
537                let metrics = AgentMetrics {
538                    cpu_percent: 100.0, // Was at max
539                    memory_used: 0,
540                    hashes_per_sec: if elapsed > 0.0 {
541                        (hashes as f64 / elapsed) as u64
542                    } else {
543                        0
544                    },
545                    elapsed_secs: elapsed,
546                    state: AgentState::Idle,
547                };
548
549                let metrics_bytes = bincode::serialize(&metrics).expect("serialize metrics");
550                let _ = write_frame(&mut stream, frame_kind::METRICS, &metrics_bytes);
551
552                state = AgentState::Idle;
553                eprintln!(
554                    "stryke agent: workload complete ({} hashes in {:.2}s)",
555                    hashes, elapsed
556                );
557            }
558
559            frame_kind::TERMINATE => {
560                eprintln!("stryke agent: TERMINATE received");
561                terminate.store(true, Ordering::Relaxed);
562
563                let elapsed = session_start
564                    .map(|s| s.elapsed().as_secs_f64())
565                    .unwrap_or(0.0);
566                let term_ack = TermAck {
567                    total_hashes,
568                    total_duration: elapsed,
569                    peak_cpu,
570                };
571
572                let ack_bytes = bincode::serialize(&term_ack).expect("serialize term_ack");
573                let _ = write_frame(&mut stream, frame_kind::TERM_ACK, &ack_bytes);
574
575                state = AgentState::Idle;
576                total_hashes = 0;
577                peak_cpu = 0.0;
578                session_start = None;
579            }
580
581            frame_kind::STATUS => {
582                let metrics = AgentMetrics {
583                    cpu_percent: if state == AgentState::Firing {
584                        100.0
585                    } else {
586                        0.0
587                    },
588                    memory_used: 0,
589                    hashes_per_sec: 0,
590                    elapsed_secs: session_start
591                        .map(|s| s.elapsed().as_secs_f64())
592                        .unwrap_or(0.0),
593                    state,
594                };
595
596                let metrics_bytes = bincode::serialize(&metrics).expect("serialize metrics");
597                let _ = write_frame(&mut stream, frame_kind::STATUS_RESP, &metrics_bytes);
598            }
599
600            frame_kind::SHUTDOWN => {
601                eprintln!("stryke agent: SHUTDOWN received, exiting");
602                terminate.store(true, Ordering::Relaxed);
603                break;
604            }
605
606            _ => {
607                eprintln!("stryke agent: unknown frame kind: {}", kind);
608            }
609        }
610    }
611
612    eprintln!("stryke agent: disconnected");
613    0
614}
615
/// Print the usage text for `stryke agent` to standard output.
pub fn print_help() {
    // One entry per output line; empty entries are blank lines.
    const HELP_LINES: &[&str] = &[
        "stryke agent — Distributed load testing agent",
        "",
        "USAGE:",
        "    stryke agent [OPTIONS]",
        "",
        "OPTIONS:",
        "    -c, --config PATH    Config file (default: ~/.config/stryke/agent.toml)",
        "    --controller HOST    Controller address (overrides config)",
        "    --port PORT          Controller port (overrides config)",
        "    --help               Print this help",
        "",
        "CONFIG FILE:",
        "    ~/.config/stryke/agent.toml",
        "",
        "    [controller]",
        "    host = \"controller.example.com\"",
        "    port = 9999",
        "",
        "    [limits]",
        "    max_temp = 85",
        "    max_duration = 3600",
        "",
        "    [agent]",
        "    name = \"node-01\"",
        "",
        "EXAMPLE:",
        "    stryke agent                           # use config file",
        "    stryke agent --controller 10.0.0.1     # connect to specific host",
    ];

    for line in HELP_LINES {
        println!("{}", line);
    }
}