1use serde::{Deserialize, Serialize};
47use std::io::{Read, Write};
48use std::net::TcpStream;
49use std::path::PathBuf;
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
/// One-byte frame tags that prefix every message exchanged with the controller.
///
/// The tag is the first byte of a frame body (see `read_frame` / `write_frame`).
pub mod frame_kind {
    /// Sent by the agent right after connecting; carries an `AgentHello`.
    pub const AGENT_HELLO: u8 = 0x10;
    /// Expected in reply to the hello; carries an `AgentHelloAck`.
    pub const AGENT_HELLO_ACK: u8 = 0x11;
    /// Received by the agent; carries a `FireCommand` to start a workload.
    pub const FIRE: u8 = 0x12;
    /// Sent by the agent after a workload finishes; carries `AgentMetrics`.
    pub const METRICS: u8 = 0x13;
    /// Received by the agent; asks it to stop and report session totals.
    pub const TERMINATE: u8 = 0x14;
    /// Sent by the agent in reply to `TERMINATE`; carries a `TermAck`.
    pub const TERM_ACK: u8 = 0x15;
    /// Received by the agent; makes it leave its command loop.
    pub const SHUTDOWN: u8 = 0x16;
    /// Received by the agent; asks for a status snapshot.
    pub const STATUS: u8 = 0x17;
    /// Sent by the agent in reply to `STATUS`; carries `AgentMetrics`.
    pub const STATUS_RESP: u8 = 0x18;
    /// Error tag. Not produced or handled by the agent code in this file.
    pub const ERROR: u8 = 0xFF;
}
67
/// Protocol revision the agent announces in `AgentHello::proto_version`.
pub const AGENT_PROTO_VERSION: u32 = 1;
69
70#[derive(Debug, Clone, Serialize, Deserialize, Default)]
72pub struct AgentConfig {
73 #[serde(default)]
74 pub controller: ControllerConfig,
75 #[serde(default)]
76 pub limits: LimitsConfig,
77 #[serde(default)]
78 pub agent: AgentIdentity,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ControllerConfig {
83 #[serde(default = "default_host")]
84 pub host: String,
85 #[serde(default = "default_port")]
86 pub port: u16,
87}
88
/// Controller host used when the configuration does not name one.
fn default_host() -> String {
    String::from("localhost")
}
/// Controller port used when the configuration does not name one.
fn default_port() -> u16 {
    const DEFAULT_CONTROLLER_PORT: u16 = 9999;
    DEFAULT_CONTROLLER_PORT
}
95
96impl Default for ControllerConfig {
97 fn default() -> Self {
98 Self {
99 host: default_host(),
100 port: default_port(),
101 }
102 }
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct LimitsConfig {
107 #[serde(default = "default_max_temp")]
108 pub max_temp: u32,
109 #[serde(default = "default_max_duration")]
110 pub max_duration: u64,
111}
112
/// Value of `LimitsConfig::max_temp` when the configuration omits it.
fn default_max_temp() -> u32 {
    const DEFAULT_MAX_TEMP: u32 = 85;
    DEFAULT_MAX_TEMP
}
/// Value of `LimitsConfig::max_duration` when the configuration omits it.
fn default_max_duration() -> u64 {
    const DEFAULT_MAX_DURATION: u64 = 3600;
    DEFAULT_MAX_DURATION
}
119
120impl Default for LimitsConfig {
121 fn default() -> Self {
122 Self {
123 max_temp: default_max_temp(),
124 max_duration: default_max_duration(),
125 }
126 }
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize, Default)]
130pub struct AgentIdentity {
131 #[serde(default)]
132 pub name: Option<String>,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct AgentHello {
138 pub proto_version: u32,
139 pub stryke_version: String,
140 pub hostname: String,
141 pub cores: usize,
142 pub memory_bytes: u64,
143 pub agent_name: Option<String>,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct AgentHelloAck {
149 pub session_id: u64,
150 pub accepted: bool,
151 pub message: String,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct FireCommand {
157 pub workload: WorkloadType,
158 pub duration_secs: f64,
159 pub intensity: f64, }
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
163pub enum WorkloadType {
164 Cpu,
165 Memory { bytes: u64 },
166 Io { dir: String, iterations: u64 },
167 Combined,
168 Custom { code: String },
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct AgentMetrics {
174 pub cpu_percent: f64,
175 pub memory_used: u64,
176 pub hashes_per_sec: u64,
177 pub elapsed_secs: f64,
178 pub state: AgentState,
179}
180
181#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
182pub enum AgentState {
183 Idle,
184 Armed,
185 Firing,
186 Terminated,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct TermAck {
192 pub total_hashes: u64,
193 pub total_duration: f64,
194 pub peak_cpu: f64,
195}
196
/// Reads one length-prefixed frame from `r`.
///
/// Wire layout: an 8-byte little-endian length, followed by that many bytes of
/// body. The first body byte is the frame kind; the rest is the payload.
///
/// Returns `(kind, payload)`.
///
/// # Errors
///
/// * `InvalidData` if the announced length is zero (no room for the kind byte)
///   or exceeds the 64 MiB cap enforced here.
/// * Any error from the underlying reader, including `UnexpectedEof` when the
///   stream ends inside a frame.
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    // The length comes straight from the peer. Without a cap, a corrupt or
    // hostile prefix would make us try to allocate up to 2^64 bytes before a
    // single payload byte has been validated.
    const MAX_FRAME_LEN: u64 = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let len = u64::from_le_bytes(len_buf);

    if len == 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "empty frame",
        ));
    }
    if len > MAX_FRAME_LEN {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("frame too large: {} bytes (limit {})", len, MAX_FRAME_LEN),
        ));
    }

    // Read the kind byte on its own so the payload lands directly in its final
    // buffer instead of being copied out of a combined one.
    let mut kind = [0u8; 1];
    r.read_exact(&mut kind)?;

    // `len - 1` is at most MAX_FRAME_LEN, so the cast cannot truncate.
    let mut payload = vec![0u8; (len - 1) as usize];
    r.read_exact(&mut payload)?;

    Ok((kind[0], payload))
}
213
/// Writes one frame to `w` and flushes it.
///
/// Emits an 8-byte little-endian length (payload length plus one for the kind
/// byte), then the kind byte, then the payload. Returns the first I/O error.
fn write_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
    let header = ((payload.len() + 1) as u64).to_le_bytes();
    let tag = [kind];

    // Same three writes, in wire order: length prefix, kind, payload.
    for part in [&header[..], &tag[..], payload] {
        w.write_all(part)?;
    }
    w.flush()
}
222
223pub fn default_config_path() -> PathBuf {
225 dirs::config_dir()
226 .unwrap_or_else(|| PathBuf::from("."))
227 .join("stryke")
228 .join("agent.toml")
229}
230
231pub fn load_config(path: Option<&str>) -> AgentConfig {
233 let config_path = path.map(PathBuf::from).unwrap_or_else(default_config_path);
234
235 if config_path.exists() {
236 match std::fs::read_to_string(&config_path) {
237 Ok(content) => match toml::from_str(&content) {
238 Ok(config) => {
239 eprintln!("stryke agent: loaded config from {}", config_path.display());
240 return config;
241 }
242 Err(e) => {
243 eprintln!(
244 "stryke agent: config parse error {}: {}",
245 config_path.display(),
246 e
247 );
248 }
249 },
250 Err(e) => {
251 eprintln!("stryke agent: cannot read {}: {}", config_path.display(), e);
252 }
253 }
254 }
255
256 eprintln!("stryke agent: using default config (controller=localhost:9999)");
257 AgentConfig::default()
258}
259
260fn get_hostname() -> String {
262 hostname::get()
263 .map(|h| h.to_string_lossy().to_string())
264 .unwrap_or_else(|_| "unknown".to_string())
265}
266
/// Parallelism available to this process, or 1 when it cannot be determined.
fn get_cores() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
273
/// Memory size reported to the controller, in bytes.
///
/// This is a fixed 16 GiB figure; the function does not query the system.
fn get_memory() -> u64 {
    const GIB: u64 = 1 << 30;
    16 * GIB
}
280
281fn run_workload(
283 workload: &WorkloadType,
284 duration_secs: f64,
285 terminate: Arc<AtomicBool>,
286) -> (u64, f64) {
287 use sha2::{Digest, Sha256};
288 use std::sync::atomic::AtomicU64;
289
290 let start = Instant::now();
291 let duration = Duration::from_secs_f64(duration_secs);
292 let num_cores = std::thread::available_parallelism()
293 .map(|p| p.get())
294 .unwrap_or(1);
295
296 match workload {
297 WorkloadType::Cpu | WorkloadType::Combined => {
298 let total_hashes = AtomicU64::new(0);
299
300 std::thread::scope(|s| {
301 for _ in 0..num_cores {
302 let term = Arc::clone(&terminate);
303 let counter = &total_hashes;
304 s.spawn(move || {
305 let mut local_count: u64 = 0;
306 let mut data = [0u8; 64];
307
308 while start.elapsed() < duration && !term.load(Ordering::Relaxed) {
309 for _ in 0..1000 {
310 let hash = Sha256::digest(data);
311 data[..32].copy_from_slice(&hash);
312 local_count += 1;
313 }
314 }
315
316 counter.fetch_add(local_count, Ordering::Relaxed);
317 });
318 }
319 });
320
321 (
322 total_hashes.load(Ordering::Relaxed),
323 start.elapsed().as_secs_f64(),
324 )
325 }
326 WorkloadType::Memory { bytes } => {
327 let bytes_per_core = *bytes as usize / num_cores;
328
329 std::thread::scope(|s| {
330 for core_id in 0..num_cores {
331 let term = Arc::clone(&terminate);
332 s.spawn(move || {
333 if term.load(Ordering::Relaxed) {
334 return;
335 }
336 let mut buf: Vec<u8> = vec![0u8; bytes_per_core];
337 for i in (0..bytes_per_core).step_by(4096) {
338 if term.load(Ordering::Relaxed) {
339 break;
340 }
341 buf[i] = ((i + core_id) & 0xff) as u8;
342 }
343 std::hint::black_box(&buf);
344 });
345 }
346 });
347
348 (*bytes, start.elapsed().as_secs_f64())
349 }
350 WorkloadType::Io { dir, iterations } => {
351 use std::fs;
352 use std::io::Write as IoWrite;
353
354 let total_bytes = AtomicU64::new(0);
355 let iters_per_core = *iterations as usize / num_cores;
356
357 std::thread::scope(|s| {
358 for core_id in 0..num_cores {
359 let term = Arc::clone(&terminate);
360 let counter = &total_bytes;
361 let dir = dir.clone();
362 s.spawn(move || {
363 let io_data = vec![0xABu8; 1_000_000];
364 for i in 0..iters_per_core {
365 if term.load(Ordering::Relaxed) {
366 break;
367 }
368 let path = format!("{}/stryke_stress_{}_{}", dir, core_id, i);
369 if let Ok(mut f) = fs::File::create(&path) {
370 let _ = f.write_all(&io_data);
371 }
372 let _ = fs::read(&path);
373 let _ = fs::remove_file(&path);
374 counter.fetch_add(io_data.len() as u64, Ordering::Relaxed);
375 }
376 });
377 }
378 });
379
380 (
381 total_bytes.load(Ordering::Relaxed),
382 start.elapsed().as_secs_f64(),
383 )
384 }
385 WorkloadType::Custom { code: _ } => {
386 (0, start.elapsed().as_secs_f64())
388 }
389 }
390}
391
392pub fn run_agent(config_path: Option<&str>) -> i32 {
394 run_agent_with_overrides(config_path, None, None)
395}
396
397pub fn run_agent_with_overrides(
399 config_path: Option<&str>,
400 controller_override: Option<&str>,
401 port_override: Option<u16>,
402) -> i32 {
403 let mut config = load_config(config_path);
404
405 if let Some(host) = controller_override {
406 config.controller.host = host.to_string();
407 }
408 if let Some(port) = port_override {
409 config.controller.port = port;
410 }
411
412 let addr = format!("{}:{}", config.controller.host, config.controller.port);
413
414 eprintln!("stryke agent: connecting to controller at {}", addr);
415
416 let mut stream = match TcpStream::connect(&addr) {
417 Ok(s) => s,
418 Err(e) => {
419 eprintln!("stryke agent: connection failed: {}", e);
420 return 1;
421 }
422 };
423
424 let _ = stream.set_read_timeout(Some(Duration::from_millis(100)));
426
427 let hello = AgentHello {
429 proto_version: AGENT_PROTO_VERSION,
430 stryke_version: env!("CARGO_PKG_VERSION").to_string(),
431 hostname: get_hostname(),
432 cores: get_cores(),
433 memory_bytes: get_memory(),
434 agent_name: config.agent.name.clone(),
435 };
436
437 let hello_bytes = bincode::serialize(&hello).expect("serialize hello");
438 if let Err(e) = write_frame(&mut stream, frame_kind::AGENT_HELLO, &hello_bytes) {
439 eprintln!("stryke agent: failed to send hello: {}", e);
440 return 1;
441 }
442
443 let (kind, payload) = match read_frame(&mut stream) {
445 Ok(f) => f,
446 Err(e) => {
447 eprintln!("stryke agent: failed to read hello ack: {}", e);
448 return 1;
449 }
450 };
451
452 if kind != frame_kind::AGENT_HELLO_ACK {
453 eprintln!("stryke agent: unexpected frame kind: {}", kind);
454 return 1;
455 }
456
457 let ack: AgentHelloAck = match bincode::deserialize(&payload) {
458 Ok(a) => a,
459 Err(e) => {
460 eprintln!("stryke agent: failed to parse hello ack: {}", e);
461 return 1;
462 }
463 };
464
465 if !ack.accepted {
466 eprintln!("stryke agent: rejected by controller: {}", ack.message);
467 return 1;
468 }
469
470 eprintln!(
471 "stryke agent: connected (session_id={}, cores={}, hostname={})",
472 ack.session_id,
473 get_cores(),
474 get_hostname()
475 );
476 eprintln!("stryke agent: awaiting commands...");
477
478 let _ = stream.set_read_timeout(None);
480
481 let terminate = Arc::new(AtomicBool::new(false));
483 #[allow(unused_assignments)]
484 let mut state = AgentState::Idle;
485 let mut session_start: Option<Instant> = None;
486 let mut total_hashes: u64 = 0;
487 let mut peak_cpu: f64 = 0.0;
488
489 loop {
490 let (kind, payload) = match read_frame(&mut stream) {
491 Ok(f) => f,
492 Err(e) => {
493 if e.kind() == std::io::ErrorKind::UnexpectedEof {
494 eprintln!("stryke agent: controller disconnected");
495 } else {
496 eprintln!("stryke agent: read error: {}", e);
497 }
498 break;
499 }
500 };
501
502 match kind {
503 frame_kind::FIRE => {
504 let cmd: FireCommand = match bincode::deserialize(&payload) {
505 Ok(c) => c,
506 Err(e) => {
507 eprintln!("stryke agent: invalid FIRE command: {}", e);
508 continue;
509 }
510 };
511
512 eprintln!(
513 "stryke agent: FIRE received (duration={}s, intensity={})",
514 cmd.duration_secs, cmd.intensity
515 );
516
517 #[allow(unused_assignments)]
518 {
519 state = AgentState::Firing;
520 }
521 session_start = Some(Instant::now());
522 terminate.store(false, Ordering::Relaxed);
523
524 let term_clone = Arc::clone(&terminate);
526 let workload = cmd.workload.clone();
527 let duration = cmd.duration_secs;
528
529 let handle =
530 std::thread::spawn(move || run_workload(&workload, duration, term_clone));
531
532 let (hashes, elapsed) = handle.join().unwrap_or((0, 0.0));
534 total_hashes += hashes;
535
536 let metrics = AgentMetrics {
538 cpu_percent: 100.0, memory_used: 0,
540 hashes_per_sec: if elapsed > 0.0 {
541 (hashes as f64 / elapsed) as u64
542 } else {
543 0
544 },
545 elapsed_secs: elapsed,
546 state: AgentState::Idle,
547 };
548
549 let metrics_bytes = bincode::serialize(&metrics).expect("serialize metrics");
550 let _ = write_frame(&mut stream, frame_kind::METRICS, &metrics_bytes);
551
552 state = AgentState::Idle;
553 eprintln!(
554 "stryke agent: workload complete ({} hashes in {:.2}s)",
555 hashes, elapsed
556 );
557 }
558
559 frame_kind::TERMINATE => {
560 eprintln!("stryke agent: TERMINATE received");
561 terminate.store(true, Ordering::Relaxed);
562
563 let elapsed = session_start
564 .map(|s| s.elapsed().as_secs_f64())
565 .unwrap_or(0.0);
566 let term_ack = TermAck {
567 total_hashes,
568 total_duration: elapsed,
569 peak_cpu,
570 };
571
572 let ack_bytes = bincode::serialize(&term_ack).expect("serialize term_ack");
573 let _ = write_frame(&mut stream, frame_kind::TERM_ACK, &ack_bytes);
574
575 state = AgentState::Idle;
576 total_hashes = 0;
577 peak_cpu = 0.0;
578 session_start = None;
579 }
580
581 frame_kind::STATUS => {
582 let metrics = AgentMetrics {
583 cpu_percent: if state == AgentState::Firing {
584 100.0
585 } else {
586 0.0
587 },
588 memory_used: 0,
589 hashes_per_sec: 0,
590 elapsed_secs: session_start
591 .map(|s| s.elapsed().as_secs_f64())
592 .unwrap_or(0.0),
593 state,
594 };
595
596 let metrics_bytes = bincode::serialize(&metrics).expect("serialize metrics");
597 let _ = write_frame(&mut stream, frame_kind::STATUS_RESP, &metrics_bytes);
598 }
599
600 frame_kind::SHUTDOWN => {
601 eprintln!("stryke agent: SHUTDOWN received, exiting");
602 terminate.store(true, Ordering::Relaxed);
603 break;
604 }
605
606 _ => {
607 eprintln!("stryke agent: unknown frame kind: {}", kind);
608 }
609 }
610 }
611
612 eprintln!("stryke agent: disconnected");
613 0
614}
615
/// Prints the usage text for the `stryke agent` subcommand to stdout.
pub fn print_help() {
    // One entry per output line; an empty entry produces a blank line.
    const HELP_LINES: &[&str] = &[
        "stryke agent — Distributed load testing agent",
        "",
        "USAGE:",
        " stryke agent [OPTIONS]",
        "",
        "OPTIONS:",
        " -c, --config PATH Config file (default: ~/.config/stryke/agent.toml)",
        " --controller HOST Controller address (overrides config)",
        " --port PORT Controller port (overrides config)",
        " --help Print this help",
        "",
        "CONFIG FILE:",
        " ~/.config/stryke/agent.toml",
        "",
        " [controller]",
        " host = \"controller.example.com\"",
        " port = 9999",
        "",
        " [limits]",
        " max_temp = 85",
        " max_duration = 3600",
        "",
        " [agent]",
        " name = \"node-01\"",
        "",
        "EXAMPLE:",
        " stryke agent # use config file",
        " stryke agent --controller 10.0.0.1 # connect to specific host",
    ];

    for line in HELP_LINES {
        println!("{}", line);
    }
}