stryke/controller.rs
1//! `stryke controller` — Interactive REPL for coordinating stress test agents.
2//!
3//! ## Usage
4//!
5//! ```sh
6//! stryke controller # listen on 0.0.0.0:9999
7//! stryke controller --port 8888 # custom port
8//! stryke controller --bind 10.0.0.1 # specific interface
9//! ```
10//!
11//! ## Commands
12//!
13//! - `status` — list connected agents
14//! - `fire [duration]` — start stress test on all agents
15//! - `fire node1,node2 [duration]` — specific agents
16//! - `terminate` — stop stress test
17//! - `shutdown` — disconnect all agents and exit
18//! - `help` — show commands
19
20use std::collections::HashMap;
21use std::io::{Read, Write};
22use std::net::{TcpListener, TcpStream};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, Mutex, OnceLock};
25use std::thread;
26use std::time::Instant;
27
28use crate::agent::{
29 frame_kind, AgentHello, AgentHelloAck, AgentState, EvalCommand, EvalResult, FireCommand,
30 WorkloadType, AGENT_PROTO_VERSION,
31};
32use std::time::Duration;
33
34/// Connected agent state
35struct ConnectedAgent {
36 stream: TcpStream,
37 hostname: String,
38 cores: usize,
39 memory_bytes: u64,
40 agent_name: Option<String>,
41 state: AgentState,
42 #[allow(dead_code)]
43 session_id: u64,
44 connected_at: Instant,
45}
46
47/// Controller state
48pub struct Controller {
49 /// `agents` field.
50 agents: Arc<Mutex<HashMap<u64, ConnectedAgent>>>,
51 /// `next_session_id` field.
52 next_session_id: AtomicU64,
53 /// `running` field.
54 running: AtomicBool,
55 /// Active chants — fired at each new agent registered by accept_loop.
56 /// Lives on Controller (not ControllerHandle) so accept_loop can read
57 /// it via `self` without a separate channel.
58 chants: Arc<Mutex<HashMap<u64, String>>>,
59 /// :cloistered mode — when true, accept_loop rejects agents whose
60 /// AGENT_AUTH frame doesn't carry a token in `auth_tokens`. When
61 /// false (default), agents bypass the AUTH check entirely.
62 cloistered: AtomicBool,
63 /// Valid AUTH tokens for cloistered mode. Populated by
64 /// `set_cloistered(token)` and checked in accept_loop against the
65 /// incoming AGENT_AUTH frame's token field.
66 auth_tokens: Arc<Mutex<std::collections::HashSet<String>>>,
67 /// When true, accept_loop suppresses the per-agent "[agent connected]"
68 /// eprintln. Used during bulk-spawn (large `congregation(N)` /
69 /// `anoint(N)`) where the main thread is in a tight fork loop and a
70 /// concurrent eprintln from this background thread can leave the
71 /// child with a borrowed `std::io::stderr` RefCell — guaranteed
72 /// panic on the child's first stdio call. Toggled via
73 /// [`ControllerHandle::set_quiet_accept`].
74 quiet_accept: AtomicBool,
75}
76
77impl Default for Controller {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl Controller {
84 /// `new` — see implementation.
85 pub fn new() -> Self {
86 Self {
87 agents: Arc::new(Mutex::new(HashMap::new())),
88 next_session_id: AtomicU64::new(1),
89 running: AtomicBool::new(true),
90 chants: Arc::new(Mutex::new(HashMap::new())),
91 cloistered: AtomicBool::new(false),
92 auth_tokens: Arc::new(Mutex::new(std::collections::HashSet::new())),
93 quiet_accept: AtomicBool::new(false),
94 }
95 }
96
97 /// Accept incoming agent connections
98 fn accept_loop(&self, listener: TcpListener) {
99 for stream in listener.incoming() {
100 if !self.running.load(Ordering::Relaxed) {
101 break;
102 }
103
104 match stream {
105 Ok(mut stream) => {
106 let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
107
108 // Read AGENT_HELLO
109 let (kind, payload) = match read_frame(&mut stream) {
110 Ok(f) => f,
111 Err(e) => {
112 eprintln!("controller: failed to read hello: {}", e);
113 continue;
114 }
115 };
116
117 if kind != frame_kind::AGENT_HELLO {
118 eprintln!("controller: expected AGENT_HELLO, got {}", kind);
119 continue;
120 }
121
122 let hello: AgentHello = match bincode::deserialize(&payload) {
123 Ok(h) => h,
124 Err(e) => {
125 eprintln!("controller: invalid hello: {}", e);
126 continue;
127 }
128 };
129
130 if hello.proto_version != AGENT_PROTO_VERSION {
131 let ack = AgentHelloAck {
132 session_id: 0,
133 accepted: false,
134 message: format!(
135 "protocol version mismatch: got {}, expected {}",
136 hello.proto_version, AGENT_PROTO_VERSION
137 ),
138 };
139 let ack_bytes = bincode::serialize(&ack).unwrap();
140 let _ = write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &ack_bytes);
141 continue;
142 }
143
144 let name = hello
145 .agent_name
146 .clone()
147 .unwrap_or_else(|| hello.hostname.clone());
148
149 // :cloistered mode — require an AGENT_AUTH frame with
150 // a valid token within 500ms of HELLO. Agents in open
151 // mode don't send AUTH; we'd block forever waiting if
152 // we required it unconditionally, so this read only
153 // happens when cloistered is true. The check happens
154 // BEFORE we send the success ACK so rejected agents
155 // get a single accepted=false ACK, not an accepted=true
156 // followed by a rejection.
157 if self.cloistered.load(Ordering::Relaxed) {
158 let _ = stream.set_read_timeout(Some(Duration::from_millis(500)));
159 let auth_result = read_frame(&mut stream);
160 let _ = stream.set_read_timeout(None);
161 let auth_token: Option<String> = match auth_result {
162 Ok((frame_kind::AGENT_AUTH, payload)) => {
163 bincode::deserialize::<crate::agent::AgentAuth>(&payload)
164 .ok()
165 .map(|a| a.token)
166 }
167 _ => None,
168 };
169 let valid = auth_token
170 .as_ref()
171 .map(|tok| self.auth_tokens.lock().unwrap().contains(tok))
172 .unwrap_or(false);
173 if !valid {
174 eprintln!("[cloistered] rejected agent {} — no/bad AUTH token", name);
175 let rej = AgentHelloAck {
176 session_id: 0,
177 accepted: false,
178 message: "cloistered: missing or invalid AUTH token".into(),
179 };
180 if let Ok(rb) = bincode::serialize(&rej) {
181 let _ = write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &rb);
182 }
183 continue;
184 }
185 }
186
187 // Send accepted-HELLO_ACK now that any cloistered check
188 // has passed.
189 let ack = AgentHelloAck {
190 session_id,
191 accepted: true,
192 message: "connected".to_string(),
193 };
194 let ack_bytes = bincode::serialize(&ack).unwrap();
195 if let Err(e) =
196 write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &ack_bytes)
197 {
198 eprintln!("controller: failed to send hello ack: {}", e);
199 continue;
200 }
201
202 if !self.quiet_accept.load(Ordering::Relaxed) {
203 eprintln!(
204 "[agent connected] {} (cores={}, session={})",
205 name, hello.cores, session_id
206 );
207 }
208
209 let agent = ConnectedAgent {
210 stream,
211 hostname: hello.hostname,
212 cores: hello.cores,
213 memory_bytes: hello.memory_bytes,
214 agent_name: hello.agent_name,
215 state: AgentState::Idle,
216 session_id,
217 connected_at: Instant::now(),
218 };
219
220 // Insert into roster, then fire any active chants at
221 // this new joiner so late-comers receive the same
222 // ongoing prayers as everyone else.
223 {
224 let mut agents = self.agents.lock().unwrap();
225 agents.insert(session_id, agent);
226 // Drop the lock before issuing chants — fire_chant
227 // reacquires it.
228 }
229 self.fire_chants_at(session_id);
230 }
231 Err(e) => {
232 if self.running.load(Ordering::Relaxed) {
233 eprintln!("controller: accept error: {}", e);
234 }
235 }
236 }
237 }
238 }
239
240 /// Fire every currently-active chant at the named agent. Called by
241 /// `accept_loop` after a new agent is registered so late joiners
242 /// receive the same continuous prayers as everyone else. Errors
243 /// (write failures from a disconnecting agent) are silently swallowed
244 /// — same convention as `scatter`.
245 fn fire_chants_at(&self, session_id: u64) {
246 // Snapshot the chant codes so we don't hold the chants lock while
247 // doing IO under the agents lock.
248 let chant_codes: Vec<String> = {
249 let chants = self.chants.lock().unwrap();
250 chants.values().cloned().collect()
251 };
252 if chant_codes.is_empty() {
253 return;
254 }
255 let mut agents = self.agents.lock().unwrap();
256 let agent = match agents.get_mut(&session_id) {
257 Some(a) => a,
258 None => return,
259 };
260 for code in chant_codes {
261 let cmd = EvalCommand { code };
262 if let Ok(bytes) = bincode::serialize(&cmd) {
263 let _ = write_frame(&mut agent.stream, frame_kind::EVAL, &bytes);
264 }
265 }
266 }
267
268 /// Send FIRE to all agents
269 fn fire_all(&self, duration_secs: f64) {
270 let cmd = FireCommand {
271 workload: WorkloadType::Cpu,
272 duration_secs,
273 intensity: 1.0,
274 };
275 let cmd_bytes = bincode::serialize(&cmd).unwrap();
276
277 let mut agents = self.agents.lock().unwrap();
278 let mut fired = 0;
279
280 for agent in agents.values_mut() {
281 if write_frame(&mut agent.stream, frame_kind::FIRE, &cmd_bytes).is_ok() {
282 agent.state = AgentState::Firing;
283 fired += 1;
284 }
285 }
286
287 eprintln!("[fire] {} agents, duration={}s", fired, duration_secs);
288 }
289
290 /// Send EVAL to every connected agent, then synchronously collect each agent's
291 /// `EvalResult` and print it to stdout. Per-agent the path is request/response:
292 /// stale frames from previous commands (`METRICS` after a long-running `FIRE`,
293 /// etc.) are quietly skipped so the next visible line is always the eval result.
294 /// A 30-second read timeout guards against agents that ignore the frame entirely
295 /// (e.g. an old agent version with no EVAL handler).
296 ///
297 /// Output ordering: agents are visited in **stable alphabetical order by display
298 /// name** (agent_name if set, else hostname) so successive controllers and
299 /// successive `@eval` calls within one controller produce comparable transcripts.
300 /// HashMap iteration order would shuffle per controller run otherwise (Rust
301 /// randomizes the hash seed per process).
302 ///
303 /// Multi-line output is prefixed **per line**: each `\n`-separated line of an
304 /// agent's stringified result carries its own `[name/ok|ERR]` tag so grepping
305 /// or diffing transcripts by agent stays trivial regardless of result shape.
306 ///
307 /// **Concurrent execution.** Done as a two-pass loop with no threading or
308 /// concurrency primitives:
309 ///
310 /// * **Pass 1** writes the EVAL frame to every agent in rapid succession
311 /// (each `write_frame` is just a kernel send, no waiting for the reply).
312 /// By the end of pass 1 every agent is already executing in parallel.
313 /// * **Pass 2** reads the EVAL_RESULT back from each agent in the same
314 /// sorted order.
315 ///
316 /// Total wall time = max(per-agent latency), not sum — three agents that
317 /// each take 5 s now finish in ~5 s wall, not 15. Output stays alphabetical
318 /// because pass 2 reads in the same order pass 1 wrote.
319 fn eval_all(&self, code: &str) {
320 let cmd = EvalCommand {
321 code: code.to_string(),
322 };
323 let cmd_bytes = bincode::serialize(&cmd).expect("serialize EvalCommand");
324
325 let mut agents = self.agents.lock().unwrap();
326 if agents.is_empty() {
327 println!("[eval] no agents connected");
328 return;
329 }
330
331 // Build a stable visit order by display name. Done inside the mutex guard
332 // so the (id → name) snapshot can't be invalidated by an accept thread.
333 let mut order: Vec<(u64, String)> = agents
334 .iter()
335 .map(|(id, a)| {
336 let name = a.agent_name.clone().unwrap_or_else(|| a.hostname.clone());
337 (*id, name)
338 })
339 .collect();
340 order.sort_by(|a, b| a.1.cmp(&b.1));
341
342 // Pass 1 — fan out: write EVAL to every agent, set its read timeout.
343 // Tracks (id, name) pairs we successfully dispatched to, so pass 2 only
344 // tries to read from agents that actually received the frame.
345 let mut dispatched: Vec<(u64, String)> = Vec::with_capacity(order.len());
346 for (id, name) in &order {
347 let agent = match agents.get_mut(id) {
348 Some(a) => a,
349 None => continue,
350 };
351 if let Err(e) = write_frame(&mut agent.stream, frame_kind::EVAL, &cmd_bytes) {
352 print_tagged(name, "ERR", &format!("write error: {}", e));
353 continue;
354 }
355 let _ = agent.stream.set_read_timeout(Some(Duration::from_secs(30)));
356 dispatched.push((*id, name.clone()));
357 }
358 // At this point every dispatched agent is executing concurrently.
359
360 // Pass 2 — collect: read EVAL_RESULT from each agent in sorted order.
361 for (id, name) in &dispatched {
362 let agent = match agents.get_mut(id) {
363 Some(a) => a,
364 None => continue,
365 };
366 loop {
367 match read_frame(&mut agent.stream) {
368 Ok((frame_kind::EVAL_RESULT, payload)) => {
369 match bincode::deserialize::<EvalResult>(&payload) {
370 Ok(r) => {
371 let tag = if r.ok { "ok" } else { "ERR" };
372 print_tagged(name, tag, &r.output);
373 }
374 Err(e) => {
375 print_tagged(name, "ERR", &format!("malformed EVAL_RESULT: {}", e))
376 }
377 }
378 break;
379 }
380 Ok((other_kind, _)) => {
381 eprintln!(
382 "[{}] (dropped stale frame kind 0x{:02X} while awaiting EVAL_RESULT)",
383 name, other_kind
384 );
385 }
386 Err(e) => {
387 print_tagged(name, "ERR", &format!("read error: {}", e));
388 break;
389 }
390 }
391 }
392 let _ = agent.stream.set_read_timeout(None);
393 }
394 }
395
396 /// Send TERMINATE to all agents
397 fn terminate_all(&self) {
398 let mut agents = self.agents.lock().unwrap();
399 let mut terminated = 0;
400
401 for agent in agents.values_mut() {
402 if write_frame(&mut agent.stream, frame_kind::TERMINATE, &[]).is_ok() {
403 agent.state = AgentState::Terminated;
404 terminated += 1;
405 }
406 }
407
408 eprintln!("[terminate] {} agents", terminated);
409 }
410
411 /// Print status of all agents
412 fn print_status(&self) {
413 let agents = self.agents.lock().unwrap();
414
415 if agents.is_empty() {
416 println!("No agents connected.");
417 return;
418 }
419
420 println!(
421 "{:<20} {:>6} {:>10} {:>12} {:>10}",
422 "AGENT", "CORES", "MEMORY", "STATE", "UPTIME"
423 );
424 println!("{}", "-".repeat(62));
425
426 for agent in agents.values() {
427 let name = agent
428 .agent_name
429 .clone()
430 .unwrap_or_else(|| agent.hostname.clone());
431 let mem_gb = agent.memory_bytes / (1024 * 1024 * 1024);
432 let state = match agent.state {
433 AgentState::Idle => "idle",
434 AgentState::Armed => "armed",
435 AgentState::Firing => "FIRING",
436 AgentState::Terminated => "terminated",
437 };
438 let uptime = agent.connected_at.elapsed().as_secs();
439
440 println!(
441 "{:<20} {:>6} {:>8}GB {:>12} {:>8}s",
442 name, agent.cores, mem_gb, state, uptime
443 );
444 }
445
446 let total_cores: usize = agents.values().map(|a| a.cores).sum();
447 let firing_count = agents
448 .values()
449 .filter(|a| a.state == AgentState::Firing)
450 .count();
451
452 println!();
453 println!(
454 "Total: {} agents, {} cores, {} firing",
455 agents.len(),
456 total_cores,
457 firing_count
458 );
459 }
460
461 /// Send SHUTDOWN to all agents
462 fn shutdown_all(&self) {
463 let mut agents = self.agents.lock().unwrap();
464
465 for agent in agents.values_mut() {
466 let _ = write_frame(&mut agent.stream, frame_kind::SHUTDOWN, &[]);
467 }
468
469 agents.clear();
470 self.running.store(false, Ordering::Relaxed);
471 eprintln!("[shutdown] all agents disconnected");
472 }
473
474 /// Run the REPL
475 fn run_repl(&self) {
476 use std::io::{stdin, BufRead};
477
478 println!("stryke controller v{}", env!("CARGO_PKG_VERSION"));
479 println!("Type 'help' for commands, Ctrl-C to exit\n");
480
481 let stdin = stdin();
482 for line in stdin.lock().lines() {
483 let line = match line {
484 Ok(l) => l,
485 Err(_) => break,
486 };
487
488 // `@` prefix: ship the rest of the line as stryke source to every
489 // agent. Matches the sigil the user already associates with `@` in
490 // the language, and saves four keystrokes vs the explicit `eval`
491 // verb. `@ code`, `@code`, `@code with args` all work — the `@`
492 // is stripped and the remainder is sent verbatim.
493 let trimmed = line.trim_start();
494 if let Some(rest) = trimmed.strip_prefix('@') {
495 let code = rest.trim();
496 if code.is_empty() {
497 println!("usage: @CODE (alias for `eval CODE`)");
498 } else {
499 self.eval_all(code);
500 }
501 continue;
502 }
503
504 let parts: Vec<&str> = line.split_whitespace().collect();
505 if parts.is_empty() {
506 continue;
507 }
508
509 match parts[0] {
510 "status" | "s" => self.print_status(),
511 "fire" | "f" => {
512 let duration = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(10.0);
513 self.fire_all(duration);
514 }
515 "eval" | "e" => {
516 // Everything after the verb (preserving inner whitespace) is the source.
517 let after = line
518 .trim_start()
519 .splitn(2, char::is_whitespace)
520 .nth(1)
521 .unwrap_or("")
522 .trim();
523 if after.is_empty() {
524 println!("usage: eval CODE (sends CODE to every connected agent for execution against its persistent VM)");
525 } else {
526 self.eval_all(after);
527 }
528 }
529 "terminate" | "t" | "stop" => self.terminate_all(),
530 "shutdown" | "quit" | "exit" | "q" => {
531 self.shutdown_all();
532 break;
533 }
534 "help" | "h" | "?" => {
535 println!("Commands:");
536 println!(" status (s) List connected agents");
537 println!(" fire [SECS] (f) Start stress test (default: 10s)");
538 println!(" eval CODE (e) Run arbitrary stryke source on every agent (state persists across calls)");
539 println!(" @CODE Shorthand for `eval CODE` — `@<source>` ships <source> to every agent");
540 println!(" terminate (t) Stop stress test");
541 println!(" shutdown (q) Disconnect all and exit");
542 println!(" help (h) Show this help");
543 }
544 _ => println!("Unknown command: {}. Type 'help' for commands.", parts[0]),
545 }
546 }
547 }
548}
549
550/// Read a framed message
551/// Print `output` to stdout with every line prefixed `[name/tag] `. Empty
552/// output produces a single bare `[name/tag]` line so the caller always sees
553/// a row per agent even when the eval returned void.
554fn print_tagged(name: &str, tag: &str, output: &str) {
555 let stdout = std::io::stdout();
556 let mut handle = stdout.lock();
557 let _ = write_tagged(&mut handle, name, tag, output);
558}
559
/// Writer-generic core of [`print_tagged`], so tests can capture the exact
/// bytes that would reach stdout.
///
/// Each line of `output` becomes `[name/tag] line`. A bare `[name/tag]` line
/// is appended when `output` is empty, so every agent gets at least one row.
/// It is also appended when `output` ends with `\n`, so successive evals
/// don't visually run together.
fn write_tagged<W: Write>(w: &mut W, name: &str, tag: &str, output: &str) -> std::io::Result<()> {
    let needs_bare_line = output.is_empty() || output.ends_with('\n');
    for ln in output.lines() {
        writeln!(w, "[{}/{}] {}", name, tag, ln)?;
    }
    if needs_bare_line {
        writeln!(w, "[{}/{}]", name, tag)?;
    }
    Ok(())
}
578
/// Read one length-prefixed frame and return `(kind, payload)`.
///
/// Wire format: an 8-byte little-endian length `len` that counts the kind
/// byte, then the kind byte, then `len - 1` payload bytes.
///
/// The length comes straight off the wire, possibly from a peer that has not
/// authenticated yet. The payload buffer therefore grows only as bytes
/// actually arrive, instead of being allocated up front from `len`. A bogus
/// huge length ends in `UnexpectedEof` rather than an allocation failure that
/// aborts the controller. The kind byte is read separately, so the payload is
/// not copied a second time.
///
/// # Errors
/// `InvalidData` for a zero-length frame, `UnexpectedEof` if the stream ends
/// mid-frame, or any underlying read error (including read timeouts).
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    // Cap on the up-front reservation; larger payloads grow on demand.
    const INITIAL_RESERVE: u64 = 64 * 1024;

    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let len = u64::from_le_bytes(len_buf);
    if len == 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "empty frame",
        ));
    }

    let mut kind = [0u8; 1];
    r.read_exact(&mut kind)?;

    let payload_len = len - 1;
    let mut payload = Vec::with_capacity(payload_len.min(INITIAL_RESERVE) as usize);
    let received = r.by_ref().take(payload_len).read_to_end(&mut payload)?;
    if received as u64 != payload_len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "frame truncated",
        ));
    }
    Ok((kind[0], payload))
}
594
/// Write one frame in the format `read_frame` expects: an 8-byte
/// little-endian length (kind byte + payload), the kind byte, then the payload.
///
/// The frame is assembled into one buffer and sent with a single
/// `write_all`. Issuing the 8-byte header, 1-byte kind, and payload as
/// separate small writes on a TCP socket lets Nagle's algorithm hold the
/// later pieces until the peer ACKs the first. Combined with delayed ACKs,
/// that can stall each frame by tens of milliseconds.
///
/// # Errors
/// Any error from the underlying `write_all` or `flush`.
fn write_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
    let total_len = 1 + payload.len() as u64;
    let mut frame = Vec::with_capacity(8 + 1 + payload.len());
    frame.extend_from_slice(&total_len.to_le_bytes());
    frame.push(kind);
    frame.extend_from_slice(payload);
    w.write_all(&frame)?;
    w.flush()
}
603
604/// Main entry point — back-compat wrapper that delegates to
605/// [`spawn_controller`] + [`ControllerHandle::run_repl_blocking`]. Preserves
606/// the historical CLI behaviour: bind, accept agents in a background thread,
607/// run the interactive REPL on the main thread, cleanly join the accept thread
608/// on REPL exit. Scripts that want non-REPL programmatic access use
609/// [`spawn_controller`] directly.
610pub fn run_controller(bind: &str, port: u16) -> i32 {
611 let handle = match spawn_controller(bind, port) {
612 Ok(h) => h,
613 Err(e) => {
614 eprintln!("controller: cannot bind to {}:{}: {}", bind, port, e);
615 return 1;
616 }
617 };
618
619 eprintln!("stryke controller listening on {}", handle.listen_addr());
620 eprintln!("Waiting for agents...\n");
621
622 handle.run_repl_blocking();
623 0
624}
625
626// ============================================================================
627// Programmatic / Scriptable API (Tier 0)
628// ============================================================================
629//
630// Lets `.stk` scripts drive the controller without the REPL. Builtins in
631// `builtins.rs` (`ordain`, `muster`, `pray`, `annex`) wrap these methods and
632// expose opaque integer IDs to script code via the registries below.
633//
634// Semantics:
635// * `spawn_controller(bind, port)` — bind listener, start accept thread,
636// return `Arc<ControllerHandle>`. Non-blocking.
637// * `ControllerHandle::muster()` — current session IDs of connected agents.
638// * `ControllerHandle::welcome(n, timeout)` — block until >= n agents have
639// connected, or the timeout elapses.
640// * `ControllerHandle::scatter(code, &[id])` — write EVAL to each agent in
641// parallel (fan-out pass), return a `petition_id`. Agents start executing
642// immediately; results are NOT collected here.
// * `ControllerHandle::gather(petition_id, timeout)` — read EVAL_RESULT
//   from every agent dispatched in that scatter and return a
//   HashMap<session_id, EvalResult>. The agents have been executing
//   concurrently since scatter; their replies are read back one agent at a
//   time. Removes the divination from the pending table once consumed (so
//   the same petition_id can't be gathered twice).
648// * `ControllerHandle::shutdown()` — send SHUTDOWN to all agents, mark
649// accept loop done, wake it with a self-connect, join the accept thread.
650//
651// Threading note: scatter + gather both lock the `agents` map for the duration
652// of the operation. This serializes against the accept loop briefly, but lets
653// us reuse the same TcpStream for both write and read passes without cloning
654// or per-agent reader threads. Multi-outstanding-petition concurrency is a
655// Tier 1 problem.
656
/// Bookkeeping for one `scatter` that has not been gathered yet.
struct DivinationState {
    /// Session ids whose EVAL write succeeded. `gather` reads replies from
    /// exactly these agents and no others.
    dispatched: Vec<u64>,
}
662
663/// Non-blocking handle to a running [`Controller`]. Returned by
664/// [`spawn_controller`]; used by the scriptable builtins to drive the
665/// distributed compute fabric from `.stk` code.
666pub struct ControllerHandle {
667 /// `controller` field.
668 controller: Arc<Controller>,
669 /// `listen_addr` field.
670 listen_addr: std::net::SocketAddr,
671 /// `accept_handle` field.
672 accept_handle: Mutex<Option<thread::JoinHandle<()>>>,
673 /// `next_petition_id` field.
674 next_petition_id: AtomicU64,
675 /// `pending_divinations` field.
676 pending_divinations: Mutex<HashMap<u64, DivinationState>>,
677}
678
679impl ControllerHandle {
680 /// Returns the address the listener is actually bound to (port may have
681 /// been auto-assigned if 0 was passed in).
682 pub fn listen_addr(&self) -> std::net::SocketAddr {
683 self.listen_addr
684 }
685
686 /// Current count of connected agents.
687 pub fn agent_count(&self) -> usize {
688 self.controller.agents.lock().unwrap().len()
689 }
690
691 /// Return session IDs of all currently connected agents, in numerically
692 /// sorted order (deterministic for tests and scripts).
693 pub fn muster(&self) -> Vec<u64> {
694 let mut ids: Vec<u64> = self
695 .controller
696 .agents
697 .lock()
698 .unwrap()
699 .keys()
700 .copied()
701 .collect();
702 ids.sort_unstable();
703 ids
704 }
705
706 /// Block until at least `target_count` agents are connected, or `timeout`
707 /// elapses. Returns `true` if the count was reached. Polls every 50ms.
708 pub fn welcome(&self, target_count: usize, timeout: Duration) -> bool {
709 let start = Instant::now();
710 loop {
711 if self.agent_count() >= target_count {
712 return true;
713 }
714 if start.elapsed() >= timeout {
715 return self.agent_count() >= target_count;
716 }
717 thread::sleep(Duration::from_millis(50));
718 }
719 }
720
721 /// Fan EVAL out to `agent_ids` in parallel. Returns a `petition_id` used
722 /// later with [`gather`](Self::gather) to collect results. Agents that
723 /// don't exist in the roster or whose write fails are silently dropped
724 /// from the dispatched set, so a stale agent_id in `agent_ids` does NOT
725 /// cause the whole scatter to fail.
726 ///
727 /// Returns `Err` only if the EvalCommand fails to bincode-serialize
728 /// (which should never happen for plain strings).
729 pub fn scatter(&self, code: &str, agent_ids: &[u64]) -> std::io::Result<u64> {
730 use rayon::prelude::*;
731
732 let cmd = EvalCommand {
733 code: code.to_string(),
734 };
735 let cmd_bytes = bincode::serialize(&cmd)
736 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
737
738 // Tier 2: parallel fanout. Take the mutex once to grab per-agent
739 // TcpStream clones (sharing the underlying socket), drop the mutex,
740 // then write_frame on each clone in parallel via Rayon. The N
741 // sequential write_frames in the prior impl scaled O(N) at ~10μs
742 // per write; parallel scaling brings 10k-agent fanout from ~100ms
743 // down to ~1ms on an 8-core box.
744 //
745 // Stream clones share the underlying fd — writing to a clone IS
746 // writing to the agent's socket. Reads in `gather` go through the
747 // original (which we kept in the HashMap), so writer and reader
748 // don't conflict.
749 let stream_clones: Vec<(u64, TcpStream)> = {
750 let agents = self.controller.agents.lock().unwrap();
751 agent_ids
752 .iter()
753 .filter_map(|id| {
754 agents
755 .get(id)
756 .and_then(|a| a.stream.try_clone().ok().map(|s| (*id, s)))
757 })
758 .collect()
759 };
760
761 let cmd_bytes = Arc::new(cmd_bytes);
762 let dispatched: Vec<u64> = stream_clones
763 .into_par_iter()
764 .filter_map(|(id, mut stream)| {
765 if write_frame(&mut stream, frame_kind::EVAL, cmd_bytes.as_slice()).is_ok() {
766 Some(id)
767 } else {
768 None
769 }
770 })
771 .collect();
772
773 let petition_id = self.next_petition_id.fetch_add(1, Ordering::Relaxed);
774 self.pending_divinations
775 .lock()
776 .unwrap()
777 .insert(petition_id, DivinationState { dispatched });
778 Ok(petition_id)
779 }
780
781 /// Block for results of `petition_id` up to `timeout` (per-agent read
782 /// timeout, not total wall time). Returns a HashMap of session_id →
783 /// EvalResult for every agent that replied with a valid EVAL_RESULT
784 /// frame in time. Agents that timed out, errored, or disconnected are
785 /// omitted from the map.
786 ///
787 /// Stale frames in the per-agent socket buffer (e.g. METRICS left over
788 /// from a prior FIRE) are silently skipped — same loop pattern as
789 /// `eval_all` so manual REPL behaviour stays consistent with scripted.
790 ///
791 /// Removes the divination from the pending table on return, so a
792 /// second `gather` on the same petition_id is an error.
793 pub fn gather(
794 &self,
795 petition_id: u64,
796 timeout: Duration,
797 ) -> std::io::Result<HashMap<u64, EvalResult>> {
798 let state = self
799 .pending_divinations
800 .lock()
801 .unwrap()
802 .remove(&petition_id)
803 .ok_or_else(|| {
804 std::io::Error::new(
805 std::io::ErrorKind::NotFound,
806 format!("no pending divination for petition_id {}", petition_id),
807 )
808 })?;
809
810 let mut results: HashMap<u64, EvalResult> = HashMap::new();
811 let mut agents = self.controller.agents.lock().unwrap();
812 for id in &state.dispatched {
813 let agent = match agents.get_mut(id) {
814 Some(a) => a,
815 None => continue, // agent disconnected between scatter and gather
816 };
817 let _ = agent.stream.set_read_timeout(Some(timeout));
818 loop {
819 match read_frame(&mut agent.stream) {
820 Ok((frame_kind::EVAL_RESULT, payload)) => {
821 if let Ok(r) = bincode::deserialize::<EvalResult>(&payload) {
822 results.insert(*id, r);
823 }
824 break;
825 }
826 Ok(_) => continue, // stale frame, skip
827 Err(_) => break, // timeout or disconnect
828 }
829 }
830 let _ = agent.stream.set_read_timeout(None);
831 }
832 Ok(results)
833 }
834
835 /// Register an active chant — a prayer that fires at every current
836 /// agent now AND at every new agent that joins later (via the
837 /// `accept_loop` → `fire_chants_at` path). Returns a chant_id used
838 /// by `amen_chant` to stop the rescatter.
839 ///
840 /// Fire-and-forget: chants don't accumulate replies. Use for state
841 /// distribution (`bestow`-like push to current + future workers).
842 pub fn chant(&self, code: &str, agent_ids: &[u64]) -> std::io::Result<u64> {
843 // Fan out to current agents using the regular scatter machinery
844 // — we just don't register a divination since there's no gather.
845 let cmd = EvalCommand {
846 code: code.to_string(),
847 };
848 let cmd_bytes = bincode::serialize(&cmd)
849 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
850 let mut agents = self.controller.agents.lock().unwrap();
851 for id in agent_ids {
852 if let Some(agent) = agents.get_mut(id) {
853 let _ = write_frame(&mut agent.stream, frame_kind::EVAL, &cmd_bytes);
854 }
855 }
856 drop(agents);
857
858 // Record in active_chants so late joiners get it too.
859 let chant_id = NEXT_CHANT_ID.fetch_add(1, Ordering::Relaxed);
860 self.controller
861 .chants
862 .lock()
863 .unwrap()
864 .insert(chant_id, code.to_string());
865 Ok(chant_id)
866 }
867
868 /// Stop an active chant. Late joiners after this call won't receive it.
869 /// Returns true if the chant was active and removed, false otherwise.
870 pub fn amen_chant(&self, chant_id: u64) -> bool {
871 self.controller
872 .chants
873 .lock()
874 .unwrap()
875 .remove(&chant_id)
876 .is_some()
877 }
878
879 /// Silence the accept_loop's per-agent "[agent connected]" eprintln.
880 /// Set to true before a bulk-spawn loop (`congregation(N)` / `anoint(N)`)
881 /// to prevent the fork-thread/stdio RefCell race that loses 1-3
882 /// children at N>~50 on macOS Rust stdio. Set back to false after
883 /// the spawn loop completes if you want the REPL UX back.
884 pub fn set_quiet_accept(&self, quiet: bool) {
885 self.controller.quiet_accept.store(quiet, Ordering::Relaxed);
886 }
887
888 /// Turn :cloistered mode on (with a single accepted token) or off
889 /// (with an empty token). Cloistered accept_loop reads an AGENT_AUTH
890 /// frame after HELLO and rejects agents that don't present a valid
891 /// token. Open mode bypasses the AUTH read entirely.
892 pub fn set_cloistered(&self, token: Option<&str>) {
893 match token {
894 Some(t) if !t.is_empty() => {
895 self.controller
896 .auth_tokens
897 .lock()
898 .unwrap()
899 .insert(t.to_string());
900 self.controller.cloistered.store(true, Ordering::Relaxed);
901 }
902 _ => {
903 self.controller.cloistered.store(false, Ordering::Relaxed);
904 self.controller.auth_tokens.lock().unwrap().clear();
905 }
906 }
907 }
908
909 /// Send SHUTDOWN to a specific subset of agents (the `excommunicate` verb).
910 /// Each agent receives a SHUTDOWN frame and exits its loop; the agent's
911 /// TCP connection is dropped. Returns the count of agents that the frame
912 /// was successfully written to (write failures from disconnected agents
913 /// are silently swallowed — same convention as `scatter`).
914 pub fn excommunicate(&self, agent_ids: &[u64]) -> usize {
915 let mut agents = self.controller.agents.lock().unwrap();
916 let mut count = 0;
917 for id in agent_ids {
918 if let Some(agent) = agents.get_mut(id) {
919 if write_frame(&mut agent.stream, frame_kind::SHUTDOWN, &[]).is_ok() {
920 count += 1;
921 }
922 }
923 }
924 // Best-effort removal from the roster. The accept thread won't see
925 // the disconnect until the OS notices the dropped connection, so we
926 // proactively drop them here.
927 for id in agent_ids {
928 agents.remove(id);
929 }
930 count
931 }
932
933 /// Pilgrimage barrier — scatter `barrier_code` to all `agent_ids` and
934 /// block until every agent that received the frame replies, OR `timeout`
935 /// elapses. Returns `true` if every dispatched agent replied in time.
936 ///
937 /// `barrier_code` is the prayer the agents execute at the barrier; for a
938 /// pure rendezvous, pass `"1"` and the agent's reply is the synchronization
939 /// signal. For computational barriers, pass code that does the work and
940 /// returns when done.
941 pub fn pilgrimage(&self, barrier_code: &str, agent_ids: &[u64], timeout: Duration) -> bool {
942 let petition_id = match self.scatter(barrier_code, agent_ids) {
943 Ok(p) => p,
944 Err(_) => return false,
945 };
946 let expected = self
947 .pending_divinations
948 .lock()
949 .unwrap()
950 .get(&petition_id)
951 .map(|d| d.dispatched.len())
952 .unwrap_or(0);
953 match self.gather(petition_id, timeout) {
954 Ok(results) => results.len() == expected,
955 Err(_) => false,
956 }
957 }
958
959 /// SHUTDOWN every agent, stop the accept loop, join the accept thread.
960 /// Wakes the blocking `listener.incoming()` call by self-connecting to
961 /// the bound address (the connection's HELLO read will fail and the
962 /// accept thread will exit its loop now that `running` is false).
963 pub fn shutdown(&self) {
964 self.controller.shutdown_all();
965 // Wake the accept loop by self-connecting; it'll see running=false
966 // and break. We swallow any connect error — the listener may already
967 // be gone if shutdown_all races with another shutdown call.
968 let _ = TcpStream::connect(self.listen_addr);
969 if let Some(h) = self.accept_handle.lock().unwrap().take() {
970 let _ = h.join();
971 }
972 }
973
974 /// Run the existing REPL on the calling thread, then clean shutdown.
975 /// Used by [`run_controller`] for back-compat with the CLI subcommand.
976 pub fn run_repl_blocking(&self) {
977 self.controller.run_repl();
978 self.controller.running.store(false, Ordering::Relaxed);
979 let _ = TcpStream::connect(self.listen_addr);
980 if let Some(h) = self.accept_handle.lock().unwrap().take() {
981 let _ = h.join();
982 }
983 }
984}
985
986/// Bind a listener and start the accept thread, return a non-blocking handle.
987/// Pass `port = 0` to let the OS pick a free port; recover the chosen one via
988/// [`ControllerHandle::listen_addr`].
989pub fn spawn_controller(bind: &str, port: u16) -> std::io::Result<Arc<ControllerHandle>> {
990 let addr = format!("{}:{}", bind, port);
991 let listener = TcpListener::bind(&addr)?;
992 let listen_addr = listener.local_addr()?;
993
994 let controller = Arc::new(Controller::new());
995 let ctrl_clone = Arc::clone(&controller);
996 let accept_handle = thread::spawn(move || {
997 ctrl_clone.accept_loop(listener);
998 });
999
1000 Ok(Arc::new(ControllerHandle {
1001 controller,
1002 listen_addr,
1003 accept_handle: Mutex::new(Some(accept_handle)),
1004 next_petition_id: AtomicU64::new(1),
1005 pending_divinations: Mutex::new(HashMap::new()),
1006 }))
1007}
1008
1009// ─── Global handle registries (script ↔ Rust bridge) ────────────────────────
1010//
1011// Stryke scripts can't hold `Arc<ControllerHandle>` directly — they only see
1012// `StrykeValue`s. So we stash the live handle in a process-global registry
1013// and hand the script an opaque integer ID. Same pattern for divination
1014// handles (each `pray` call returns a divination ID).
1015//
1016// Both registries are `OnceLock<Mutex<HashMap>>` — initialised on first use,
1017// shared across all threads. The script-visible IDs are monotonic atomics
1018// so they never collide within a single process.
1019
1020static CONTROLLER_REGISTRY: OnceLock<Mutex<HashMap<u64, Arc<ControllerHandle>>>> = OnceLock::new();
1021static NEXT_HANDLE_ID: AtomicU64 = AtomicU64::new(1);
1022
1023fn controller_registry() -> &'static Mutex<HashMap<u64, Arc<ControllerHandle>>> {
1024 CONTROLLER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
1025}
1026
1027/// Register a controller handle; returns a script-visible u64 ID.
1028pub fn register_controller(handle: Arc<ControllerHandle>) -> u64 {
1029 let id = NEXT_HANDLE_ID.fetch_add(1, Ordering::Relaxed);
1030 controller_registry().lock().unwrap().insert(id, handle);
1031 id
1032}
1033
1034/// Look up a controller by its script-visible ID. Returns `None` if the ID
1035/// was never registered or has been unregistered.
1036pub fn get_controller(handle_id: u64) -> Option<Arc<ControllerHandle>> {
1037 controller_registry()
1038 .lock()
1039 .unwrap()
1040 .get(&handle_id)
1041 .map(Arc::clone)
1042}
1043
1044/// Remove a controller from the registry. Caller typically also calls
1045/// `shutdown()` on the returned Arc before dropping it.
1046pub fn unregister_controller(handle_id: u64) -> Option<Arc<ControllerHandle>> {
1047 controller_registry().lock().unwrap().remove(&handle_id)
1048}
1049
1050// ─── Chant ID atomic + registry ──────────────────────────────────────────────
1051//
1052// Chants get their own ID space (separate from divinations) so amen() can
1053// dispatch correctly. The CHANT_REGISTRY maps script-visible chant_id →
1054// controller_id so `amen` can find the right controller's active-chant table.
1055
1056static NEXT_CHANT_ID: AtomicU64 = AtomicU64::new(1);
1057static CHANT_REGISTRY: OnceLock<Mutex<HashMap<u64, (u64, u64)>>> = OnceLock::new();
1058
1059fn chant_registry() -> &'static Mutex<HashMap<u64, (u64, u64)>> {
1060 CHANT_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
1061}
1062
1063/// Register a chant in the global registry; returns the script-visible
1064/// chant_id. The pair stored is (controller_id, controller_local_chant_id).
1065pub fn register_chant(controller_id: u64, local_chant_id: u64) -> u64 {
1066 let id = NEXT_CHANT_ID.fetch_add(1, Ordering::Relaxed);
1067 chant_registry()
1068 .lock()
1069 .unwrap()
1070 .insert(id, (controller_id, local_chant_id));
1071 id
1072}
1073/// `get_chant` — see implementation.
1074pub fn get_chant(chant_id: u64) -> Option<(u64, u64)> {
1075 chant_registry().lock().unwrap().get(&chant_id).copied()
1076}
1077/// `unregister_chant` — see implementation.
1078pub fn unregister_chant(chant_id: u64) -> Option<(u64, u64)> {
1079 chant_registry().lock().unwrap().remove(&chant_id)
1080}
1081
1082// ─── Cathedral — in-process named congregation registry ─────────────────────
1083//
1084// Maps "congregation name" → controller endpoint (host:port). Masters
1085// register themselves at `ordain("name", ...)`; slaves look up the
1086// endpoint at `profess("name")` and connect.
1087//
1088// In-process registry only (not a separate daemon). Tier 5+ work would
1089// promote this to a `stryked` standalone binary so cross-host congregations
1090// can resolve names; for now everything is within one shared OS-image
1091// process address space.
1092
1093static CATHEDRAL: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
1094
1095fn cathedral() -> &'static Mutex<HashMap<String, String>> {
1096 CATHEDRAL.get_or_init(|| Mutex::new(HashMap::new()))
1097}
1098
1099/// Register a congregation name → endpoint binding. Returns the previous
1100/// binding if any (so caller can detect collisions if it cares).
1101pub fn cathedral_register(name: &str, endpoint: &str) -> Option<String> {
1102 cathedral()
1103 .lock()
1104 .unwrap()
1105 .insert(name.to_string(), endpoint.to_string())
1106}
1107
1108/// Look up a congregation name → endpoint. Returns None if unregistered.
1109pub fn cathedral_lookup(name: &str) -> Option<String> {
1110 cathedral().lock().unwrap().get(name).cloned()
1111}
1112
1113/// Remove a name from the registry. Returns the endpoint that was bound.
1114pub fn cathedral_unregister(name: &str) -> Option<String> {
1115 cathedral().lock().unwrap().remove(name)
1116}
1117
1118/// Enumerate registered congregation names (sorted).
1119pub fn cathedral_names() -> Vec<String> {
1120 let mut names: Vec<String> = cathedral().lock().unwrap().keys().cloned().collect();
1121 names.sort();
1122 names
1123}
1124
1125// ─── Divination registry: divination_id → (controller_id, petition_id) ──────
1126
1127static DIVINATION_REGISTRY: OnceLock<Mutex<HashMap<u64, (u64, u64)>>> = OnceLock::new();
1128static NEXT_DIVINATION_ID: AtomicU64 = AtomicU64::new(1);
1129
1130fn divination_registry() -> &'static Mutex<HashMap<u64, (u64, u64)>> {
1131 DIVINATION_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
1132}
1133
1134/// Register a divination; returns a script-visible u64 ID that resolves back
1135/// to (controller_id, petition_id) via [`get_divination`].
1136pub fn register_divination(controller_id: u64, petition_id: u64) -> u64 {
1137 let id = NEXT_DIVINATION_ID.fetch_add(1, Ordering::Relaxed);
1138 divination_registry()
1139 .lock()
1140 .unwrap()
1141 .insert(id, (controller_id, petition_id));
1142 id
1143}
1144
1145/// Look up a divination's (controller_id, petition_id) pair.
1146pub fn get_divination(divination_id: u64) -> Option<(u64, u64)> {
1147 divination_registry()
1148 .lock()
1149 .unwrap()
1150 .get(&divination_id)
1151 .copied()
1152}
1153
1154/// Remove a divination from the registry. Returns the (controller_id,
1155/// petition_id) pair so the caller can route the actual gather request.
1156pub fn unregister_divination(divination_id: u64) -> Option<(u64, u64)> {
1157 divination_registry().lock().unwrap().remove(&divination_id)
1158}
1159
1160// ─── Current-controller tracking for ergonomic script API ───────────────────
1161//
1162// Scripts that work with a single congregation (the overwhelmingly common
1163// case) shouldn't have to thread a controller handle through every `pray` /
1164// `muster` / `annex` call. We stash the most-recently-created controller
1165// here and `pray` / `muster` / `annex` fall back to it when no explicit
1166// controller is named.
1167//
1168// Scripts juggling multiple concurrent congregations pass the controller
1169// handle explicitly via the low-level `ordain` return value.
1170
1171static CURRENT_CONTROLLER_ID: AtomicU64 = AtomicU64::new(0);
1172
1173/// Make `controller_id` the implicit target for subsequent `pray` / `muster`
1174/// / `annex` calls that don't name a controller.
1175pub fn set_current_controller(controller_id: u64) {
1176 CURRENT_CONTROLLER_ID.store(controller_id, Ordering::Relaxed);
1177}
1178
1179/// Return the current implicit controller, or `None` if no congregation /
1180/// ordination has happened yet in this process.
1181pub fn get_current_controller() -> Option<u64> {
1182 let id = CURRENT_CONTROLLER_ID.load(Ordering::Relaxed);
1183 if id == 0 {
1184 None
1185 } else {
1186 Some(id)
1187 }
1188}
1189
1190/// Print controller help
1191pub fn print_help() {
1192 println!("stryke controller — Distributed load testing controller");
1193 println!();
1194 println!("USAGE:");
1195 println!(" stryke controller [OPTIONS]");
1196 println!();
1197 println!("OPTIONS:");
1198 println!(" --bind ADDR Bind address (default: 0.0.0.0)");
1199 println!(" --port PORT Listen port (default: 9999)");
1200 println!(" --help Print this help");
1201 println!();
1202 println!("COMMANDS (in REPL):");
1203 println!(" status List connected agents");
1204 println!(" fire [SECS] Start stress test (default: 10 seconds)");
1205 println!(" terminate Stop stress test");
1206 println!(" shutdown Disconnect all agents and exit");
1207 println!();
1208 println!("EXAMPLE:");
1209 println!(" stryke controller --port 9999");
1210 println!();
1211 println!(" controller> status");
1212 println!(" controller> fire 60 # 60 second stress test");
1213 println!(" controller> terminate");
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218 use super::*;
1219 use crate::agent::{frame_kind, handle_eval_frame, read_frame};
1220 use crate::vm_helper::VMHelper;
1221 use std::net::{TcpListener, TcpStream};
1222 use std::sync::Arc;
1223 use std::thread;
1224 use std::time::{Duration, Instant};
1225
1226 fn s(name: &str, tag: &str, output: &str) -> String {
1227 let mut buf = Vec::new();
1228 write_tagged(&mut buf, name, tag, output).unwrap();
1229 String::from_utf8(buf).unwrap()
1230 }
1231
1232 /// Spawn a synthetic agent on a fresh loopback port that:
1233 /// 1. accepts one connection,
1234 /// 2. reads one EVAL frame,
1235 /// 3. sleeps `delay`,
1236 /// 4. replies with an EVAL_RESULT computed against a real `VMHelper`.
1237 /// Returns the controller-side `TcpStream` and the handle for the worker.
1238 fn spawn_synthetic_agent(delay: Duration) -> (TcpStream, thread::JoinHandle<()>) {
1239 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1240 let addr = listener.local_addr().unwrap();
1241 let handle = thread::spawn(move || {
1242 let (mut server, _) = listener.accept().expect("accept");
1243 let mut interp = VMHelper::new();
1244 let (kind, payload) = read_frame(&mut server).expect("read EVAL");
1245 assert_eq!(kind, frame_kind::EVAL);
1246 thread::sleep(delay);
1247 handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
1248 });
1249 let client = TcpStream::connect(addr).expect("connect");
1250 (client, handle)
1251 }
1252
1253 /// Build a `Controller` populated with the supplied mock agents (one
1254 /// `ConnectedAgent` per entry, ids 1..N, names "agent-NN").
1255 fn controller_with_agents(streams: Vec<TcpStream>) -> Controller {
1256 let controller = Controller::new();
1257 let mut agents = controller.agents.lock().unwrap();
1258 for (i, stream) in streams.into_iter().enumerate() {
1259 let id = (i + 1) as u64;
1260 agents.insert(
1261 id,
1262 ConnectedAgent {
1263 stream,
1264 hostname: "localhost".to_string(),
1265 cores: 1,
1266 memory_bytes: 0,
1267 agent_name: Some(format!("agent-{:02}", id)),
1268 state: AgentState::Idle,
1269 session_id: id,
1270 connected_at: Instant::now(),
1271 },
1272 );
1273 }
1274 drop(agents);
1275 controller
1276 }
1277
1278 /// Single-line output: one tagged row, one trailing newline (from writeln!).
1279 #[test]
1280 fn single_line_output_emits_one_tagged_row() {
1281 assert_eq!(s("node-01", "ok", "42"), "[node-01/ok] 42\n");
1282 }
1283
1284 /// Multi-line output: EVERY line carries the prefix — the wart this commit fixes.
1285 /// Without per-line prefixing, only the first line would have `[name/tag]` and a
1286 /// pipeline like `@p "a"; p "b"; p "c"; 0` would print three orphan lines.
1287 #[test]
1288 fn multi_line_output_prefixes_every_line() {
1289 let got = s("node-02", "ok", "alpha\nbeta\ngamma");
1290 assert_eq!(
1291 got, "[node-02/ok] alpha\n[node-02/ok] beta\n[node-02/ok] gamma\n",
1292 "every line must carry the [name/tag] prefix"
1293 );
1294 }
1295
1296 /// Empty output is NOT swallowed — the caller still sees one bare prefixed line
1297 /// so void returns ("undef" stringifies to "") produce visible per-agent rows.
1298 #[test]
1299 fn empty_output_still_emits_a_row() {
1300 assert_eq!(s("node-03", "ok", ""), "[node-03/ok]\n");
1301 }
1302
1303 /// Trailing newline in source (e.g. `p "x"` returns `"x\n"`) emits the visible
1304 /// content's tagged line plus one bare prefixed line so successive evals don't
1305 /// visually run into each other.
1306 #[test]
1307 fn trailing_newline_emits_blank_prefixed_terminator() {
1308 let got = s("node-04", "ok", "x\n");
1309 assert_eq!(got, "[node-04/ok] x\n[node-04/ok]\n");
1310 }
1311
1312 /// Error tag formatting parallels the ok tag — no special casing.
1313 #[test]
1314 fn error_tag_format_matches_ok() {
1315 assert_eq!(
1316 s("node-05", "ERR", "Division by zero at -e line 1"),
1317 "[node-05/ERR] Division by zero at -e line 1\n"
1318 );
1319 }
1320
1321 /// The two-pass fan-out in `eval_all` must execute agents **in parallel**, not
1322 /// serially. Three mock agents each sleep 250 ms before replying; with the
1323 /// previous serial loop the wall-clock would be ≥750 ms. With the parallel
1324 /// fan-out the writes go out in microseconds and the reads block on the
1325 /// slowest agent — total wall ≈ 250 ms. We assert well under the 750 ms serial
1326 /// bound to keep the test non-flaky under CI load while still failing loudly
1327 /// if anyone reintroduces serial dispatch.
1328 #[test]
1329 fn eval_all_executes_agents_in_parallel_not_serially() {
1330 const N: usize = 3;
1331 const DELAY: Duration = Duration::from_millis(250);
1332 const SERIAL_BOUND: Duration = Duration::from_millis(750); // N * DELAY
1333
1334 let (s1, h1) = spawn_synthetic_agent(DELAY);
1335 let (s2, h2) = spawn_synthetic_agent(DELAY);
1336 let (s3, h3) = spawn_synthetic_agent(DELAY);
1337 let controller = controller_with_agents(vec![s1, s2, s3]);
1338
1339 let start = Instant::now();
1340 controller.eval_all("1 + 1");
1341 let elapsed = start.elapsed();
1342
1343 for h in [h1, h2, h3] {
1344 h.join().expect("agent thread");
1345 }
1346
1347 assert!(
1348 elapsed < Duration::from_millis(600),
1349 "eval_all must run {} agents in parallel (each delay {:?}); elapsed {:?} \
1350 is too close to the serial bound {:?}",
1351 N,
1352 DELAY,
1353 elapsed,
1354 SERIAL_BOUND
1355 );
1356 }
1357
1358 /// Empty controller (no agents connected) prints a notice and returns without
1359 /// panic / hang. Regression guard for the empty-agents branch.
1360 #[test]
1361 fn eval_all_with_no_agents_is_a_noop() {
1362 let controller = Controller::new();
1363 let start = Instant::now();
1364 controller.eval_all("anything");
1365 let elapsed = start.elapsed();
1366 assert!(
1367 elapsed < Duration::from_millis(100),
1368 "no-agents eval_all should return instantly, took {:?}",
1369 elapsed
1370 );
1371 }
1372
1373 /// Stale frames left over in the TCP buffer from a prior FIRE (METRICS) or
1374 /// STATUS (STATUS_RESP) must be silently skipped so `eval_all` finds the
1375 /// EVAL_RESULT it's actually waiting for. Pins the behaviour at controller.rs's
1376 /// pass-2 inner loop.
1377 #[test]
1378 fn eval_all_skips_stale_frames_before_eval_result() {
1379 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1380 let addr = listener.local_addr().unwrap();
1381 let handle = thread::spawn(move || {
1382 let (mut server, _) = listener.accept().expect("accept");
1383 let mut interp = VMHelper::new();
1384 let (kind, payload) = read_frame(&mut server).expect("read EVAL");
1385 assert_eq!(kind, frame_kind::EVAL);
1386 // Send an unrelated frame FIRST (simulating a leftover METRICS from a
1387 // prior FIRE). Controller must drop it and keep waiting for EVAL_RESULT.
1388 super::write_frame(&mut server, frame_kind::METRICS, &[0u8; 4]).unwrap();
1389 handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
1390 });
1391 let client = TcpStream::connect(addr).unwrap();
1392 let controller = controller_with_agents(vec![client]);
1393
1394 // If the stale-frame skipping is broken, eval_all hangs or deserializes
1395 // METRICS bytes as EvalResult and prints garbage. Either way the test fails
1396 // via the timeout / agent panic.
1397 let start = Instant::now();
1398 controller.eval_all("42");
1399 assert!(start.elapsed() < Duration::from_secs(5));
1400 handle.join().expect("agent thread");
1401 }
1402
1403 /// Multiple `eval_all` calls against the same controller reuse the persistent
1404 /// per-agent `VMHelper`, so package globals set in call N are visible in N+1.
1405 /// Smoke test for the REPL-style semantics from the user's perspective.
1406 #[test]
1407 fn successive_eval_all_calls_share_per_agent_vm_state() {
1408 // Single agent that handles TWO EVAL frames against the same VM.
1409 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1410 let addr = listener.local_addr().unwrap();
1411 let handle = thread::spawn(move || {
1412 let (mut server, _) = listener.accept().expect("accept");
1413 let mut interp = VMHelper::new();
1414 for _ in 0..2 {
1415 let (kind, payload) = read_frame(&mut server).expect("read EVAL");
1416 assert_eq!(kind, frame_kind::EVAL);
1417 handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
1418 }
1419 });
1420 let client = TcpStream::connect(addr).unwrap();
1421 let controller = controller_with_agents(vec![client]);
1422
1423 // Frame 1: define a package global.
1424 controller.eval_all("$main::tally = 10; $main::tally");
1425 // Frame 2: read it back. If state didn't persist, we'd see "" / undef.
1426 controller.eval_all("$main::tally + 32");
1427 // We don't have stdout capture here, so the assertion is via the synthetic
1428 // agent thread's panics — it errors if frame deserialisation fails.
1429 handle.join().expect("agent thread");
1430 // Force a real connection close so the agent thread above wakes.
1431 // (Controlled by the controller dropping at end of scope.)
1432 // The test passes if the agent handled exactly 2 frames without panicking.
1433 }
1434
1435 /// `Arc<Controller>` clones share the agents map — the accept loop holds one
1436 /// clone and the REPL thread holds another. Sanity check that eval_all on a
1437 /// cloned controller sees the same agents as the original.
1438 #[test]
1439 fn arc_clone_shares_agents_map() {
1440 let (s, h) = spawn_synthetic_agent(Duration::from_millis(0));
1441 let controller = Arc::new(controller_with_agents(vec![s]));
1442 let clone = Arc::clone(&controller);
1443 clone.eval_all("99");
1444 h.join().expect("agent thread");
1445 // Sanity: the agent map carries one entry after the eval.
1446 assert_eq!(controller.agents.lock().unwrap().len(), 1);
1447 }
1448}