Skip to main content

stryke/
controller.rs

1//! `stryke controller` — Interactive REPL for coordinating stress test agents.
2//!
3//! ## Usage
4//!
5//! ```sh
6//! stryke controller                    # listen on 0.0.0.0:9999
7//! stryke controller --port 8888        # custom port
8//! stryke controller --bind 10.0.0.1    # specific interface
9//! ```
10//!
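//! ## Commands
//!
//! - `status` (`s`) — list connected agents
//! - `fire [duration]` (`f`) — start stress test on all agents (default: 10s)
//! - `fire node1,node2 [duration]` — specific agents. **Not yet supported:** the
//!   REPL only reads `fire [duration]` and does not parse an agent list.
//! - `eval CODE` (`e`), or `@CODE` — run stryke source on every agent
//! - `terminate` (`t`, `stop`) — stop stress test
//! - `shutdown` (`q`, `quit`, `exit`) — disconnect all agents and exit
//! - `help` (`h`, `?`) — show commands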
19
20use std::collections::HashMap;
21use std::io::{Read, Write};
22use std::net::{TcpListener, TcpStream};
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, Mutex, OnceLock};
25use std::thread;
26use std::time::Instant;
27
28use crate::agent::{
29    frame_kind, AgentHello, AgentHelloAck, AgentState, EvalCommand, EvalResult, FireCommand,
30    WorkloadType, AGENT_PROTO_VERSION,
31};
32use std::time::Duration;
33
34/// Connected agent state
35struct ConnectedAgent {
36    stream: TcpStream,
37    hostname: String,
38    cores: usize,
39    memory_bytes: u64,
40    agent_name: Option<String>,
41    state: AgentState,
42    #[allow(dead_code)]
43    session_id: u64,
44    connected_at: Instant,
45}
46
47/// Controller state
48pub struct Controller {
49    /// `agents` field.
50    agents: Arc<Mutex<HashMap<u64, ConnectedAgent>>>,
51    /// `next_session_id` field.
52    next_session_id: AtomicU64,
53    /// `running` field.
54    running: AtomicBool,
55    /// Active chants — fired at each new agent registered by accept_loop.
56    /// Lives on Controller (not ControllerHandle) so accept_loop can read
57    /// it via `self` without a separate channel.
58    chants: Arc<Mutex<HashMap<u64, String>>>,
59    /// :cloistered mode — when true, accept_loop rejects agents whose
60    /// AGENT_AUTH frame doesn't carry a token in `auth_tokens`. When
61    /// false (default), agents bypass the AUTH check entirely.
62    cloistered: AtomicBool,
63    /// Valid AUTH tokens for cloistered mode. Populated by
64    /// `set_cloistered(token)` and checked in accept_loop against the
65    /// incoming AGENT_AUTH frame's token field.
66    auth_tokens: Arc<Mutex<std::collections::HashSet<String>>>,
67    /// When true, accept_loop suppresses the per-agent "[agent connected]"
68    /// eprintln. Used during bulk-spawn (large `congregation(N)` /
69    /// `anoint(N)`) where the main thread is in a tight fork loop and a
70    /// concurrent eprintln from this background thread can leave the
71    /// child with a borrowed `std::io::stderr` RefCell — guaranteed
72    /// panic on the child's first stdio call. Toggled via
73    /// [`ControllerHandle::set_quiet_accept`].
74    quiet_accept: AtomicBool,
75}
76
77impl Default for Controller {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl Controller {
84    /// `new` — see implementation.
85    pub fn new() -> Self {
86        Self {
87            agents: Arc::new(Mutex::new(HashMap::new())),
88            next_session_id: AtomicU64::new(1),
89            running: AtomicBool::new(true),
90            chants: Arc::new(Mutex::new(HashMap::new())),
91            cloistered: AtomicBool::new(false),
92            auth_tokens: Arc::new(Mutex::new(std::collections::HashSet::new())),
93            quiet_accept: AtomicBool::new(false),
94        }
95    }
96
97    /// Accept incoming agent connections
98    fn accept_loop(&self, listener: TcpListener) {
99        for stream in listener.incoming() {
100            if !self.running.load(Ordering::Relaxed) {
101                break;
102            }
103
104            match stream {
105                Ok(mut stream) => {
106                    let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
107
108                    // Read AGENT_HELLO
109                    let (kind, payload) = match read_frame(&mut stream) {
110                        Ok(f) => f,
111                        Err(e) => {
112                            eprintln!("controller: failed to read hello: {}", e);
113                            continue;
114                        }
115                    };
116
117                    if kind != frame_kind::AGENT_HELLO {
118                        eprintln!("controller: expected AGENT_HELLO, got {}", kind);
119                        continue;
120                    }
121
122                    let hello: AgentHello = match bincode::deserialize(&payload) {
123                        Ok(h) => h,
124                        Err(e) => {
125                            eprintln!("controller: invalid hello: {}", e);
126                            continue;
127                        }
128                    };
129
130                    if hello.proto_version != AGENT_PROTO_VERSION {
131                        let ack = AgentHelloAck {
132                            session_id: 0,
133                            accepted: false,
134                            message: format!(
135                                "protocol version mismatch: got {}, expected {}",
136                                hello.proto_version, AGENT_PROTO_VERSION
137                            ),
138                        };
139                        let ack_bytes = bincode::serialize(&ack).unwrap();
140                        let _ = write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &ack_bytes);
141                        continue;
142                    }
143
144                    let name = hello
145                        .agent_name
146                        .clone()
147                        .unwrap_or_else(|| hello.hostname.clone());
148
149                    // :cloistered mode — require an AGENT_AUTH frame with
150                    // a valid token within 500ms of HELLO. Agents in open
151                    // mode don't send AUTH; we'd block forever waiting if
152                    // we required it unconditionally, so this read only
153                    // happens when cloistered is true. The check happens
154                    // BEFORE we send the success ACK so rejected agents
155                    // get a single accepted=false ACK, not an accepted=true
156                    // followed by a rejection.
157                    if self.cloistered.load(Ordering::Relaxed) {
158                        let _ = stream.set_read_timeout(Some(Duration::from_millis(500)));
159                        let auth_result = read_frame(&mut stream);
160                        let _ = stream.set_read_timeout(None);
161                        let auth_token: Option<String> = match auth_result {
162                            Ok((frame_kind::AGENT_AUTH, payload)) => {
163                                bincode::deserialize::<crate::agent::AgentAuth>(&payload)
164                                    .ok()
165                                    .map(|a| a.token)
166                            }
167                            _ => None,
168                        };
169                        let valid = auth_token
170                            .as_ref()
171                            .map(|tok| self.auth_tokens.lock().unwrap().contains(tok))
172                            .unwrap_or(false);
173                        if !valid {
174                            eprintln!("[cloistered] rejected agent {} — no/bad AUTH token", name);
175                            let rej = AgentHelloAck {
176                                session_id: 0,
177                                accepted: false,
178                                message: "cloistered: missing or invalid AUTH token".into(),
179                            };
180                            if let Ok(rb) = bincode::serialize(&rej) {
181                                let _ = write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &rb);
182                            }
183                            continue;
184                        }
185                    }
186
187                    // Send accepted-HELLO_ACK now that any cloistered check
188                    // has passed.
189                    let ack = AgentHelloAck {
190                        session_id,
191                        accepted: true,
192                        message: "connected".to_string(),
193                    };
194                    let ack_bytes = bincode::serialize(&ack).unwrap();
195                    if let Err(e) =
196                        write_frame(&mut stream, frame_kind::AGENT_HELLO_ACK, &ack_bytes)
197                    {
198                        eprintln!("controller: failed to send hello ack: {}", e);
199                        continue;
200                    }
201
202                    if !self.quiet_accept.load(Ordering::Relaxed) {
203                        eprintln!(
204                            "[agent connected] {} (cores={}, session={})",
205                            name, hello.cores, session_id
206                        );
207                    }
208
209                    let agent = ConnectedAgent {
210                        stream,
211                        hostname: hello.hostname,
212                        cores: hello.cores,
213                        memory_bytes: hello.memory_bytes,
214                        agent_name: hello.agent_name,
215                        state: AgentState::Idle,
216                        session_id,
217                        connected_at: Instant::now(),
218                    };
219
220                    // Insert into roster, then fire any active chants at
221                    // this new joiner so late-comers receive the same
222                    // ongoing prayers as everyone else.
223                    {
224                        let mut agents = self.agents.lock().unwrap();
225                        agents.insert(session_id, agent);
226                        // Drop the lock before issuing chants — fire_chant
227                        // reacquires it.
228                    }
229                    self.fire_chants_at(session_id);
230                }
231                Err(e) => {
232                    if self.running.load(Ordering::Relaxed) {
233                        eprintln!("controller: accept error: {}", e);
234                    }
235                }
236            }
237        }
238    }
239
240    /// Fire every currently-active chant at the named agent. Called by
241    /// `accept_loop` after a new agent is registered so late joiners
242    /// receive the same continuous prayers as everyone else. Errors
243    /// (write failures from a disconnecting agent) are silently swallowed
244    /// — same convention as `scatter`.
245    fn fire_chants_at(&self, session_id: u64) {
246        // Snapshot the chant codes so we don't hold the chants lock while
247        // doing IO under the agents lock.
248        let chant_codes: Vec<String> = {
249            let chants = self.chants.lock().unwrap();
250            chants.values().cloned().collect()
251        };
252        if chant_codes.is_empty() {
253            return;
254        }
255        let mut agents = self.agents.lock().unwrap();
256        let agent = match agents.get_mut(&session_id) {
257            Some(a) => a,
258            None => return,
259        };
260        for code in chant_codes {
261            let cmd = EvalCommand { code };
262            if let Ok(bytes) = bincode::serialize(&cmd) {
263                let _ = write_frame(&mut agent.stream, frame_kind::EVAL, &bytes);
264            }
265        }
266    }
267
268    /// Send FIRE to all agents
269    fn fire_all(&self, duration_secs: f64) {
270        let cmd = FireCommand {
271            workload: WorkloadType::Cpu,
272            duration_secs,
273            intensity: 1.0,
274        };
275        let cmd_bytes = bincode::serialize(&cmd).unwrap();
276
277        let mut agents = self.agents.lock().unwrap();
278        let mut fired = 0;
279
280        for agent in agents.values_mut() {
281            if write_frame(&mut agent.stream, frame_kind::FIRE, &cmd_bytes).is_ok() {
282                agent.state = AgentState::Firing;
283                fired += 1;
284            }
285        }
286
287        eprintln!("[fire] {} agents, duration={}s", fired, duration_secs);
288    }
289
290    /// Send EVAL to every connected agent, then synchronously collect each agent's
291    /// `EvalResult` and print it to stdout. Per-agent the path is request/response:
292    /// stale frames from previous commands (`METRICS` after a long-running `FIRE`,
293    /// etc.) are quietly skipped so the next visible line is always the eval result.
294    /// A 30-second read timeout guards against agents that ignore the frame entirely
295    /// (e.g. an old agent version with no EVAL handler).
296    ///
297    /// Output ordering: agents are visited in **stable alphabetical order by display
298    /// name** (agent_name if set, else hostname) so successive controllers and
299    /// successive `@eval` calls within one controller produce comparable transcripts.
300    /// HashMap iteration order would shuffle per controller run otherwise (Rust
301    /// randomizes the hash seed per process).
302    ///
303    /// Multi-line output is prefixed **per line**: each `\n`-separated line of an
304    /// agent's stringified result carries its own `[name/ok|ERR]` tag so grepping
305    /// or diffing transcripts by agent stays trivial regardless of result shape.
306    ///
307    /// **Concurrent execution.** Done as a two-pass loop with no threading or
308    /// concurrency primitives:
309    ///
310    ///   * **Pass 1** writes the EVAL frame to every agent in rapid succession
311    ///     (each `write_frame` is just a kernel send, no waiting for the reply).
312    ///     By the end of pass 1 every agent is already executing in parallel.
313    ///   * **Pass 2** reads the EVAL_RESULT back from each agent in the same
314    ///     sorted order.
315    ///
316    /// Total wall time = max(per-agent latency), not sum — three agents that
317    /// each take 5 s now finish in ~5 s wall, not 15. Output stays alphabetical
318    /// because pass 2 reads in the same order pass 1 wrote.
319    fn eval_all(&self, code: &str) {
320        let cmd = EvalCommand {
321            code: code.to_string(),
322        };
323        let cmd_bytes = bincode::serialize(&cmd).expect("serialize EvalCommand");
324
325        let mut agents = self.agents.lock().unwrap();
326        if agents.is_empty() {
327            println!("[eval] no agents connected");
328            return;
329        }
330
331        // Build a stable visit order by display name. Done inside the mutex guard
332        // so the (id → name) snapshot can't be invalidated by an accept thread.
333        let mut order: Vec<(u64, String)> = agents
334            .iter()
335            .map(|(id, a)| {
336                let name = a.agent_name.clone().unwrap_or_else(|| a.hostname.clone());
337                (*id, name)
338            })
339            .collect();
340        order.sort_by(|a, b| a.1.cmp(&b.1));
341
342        // Pass 1 — fan out: write EVAL to every agent, set its read timeout.
343        // Tracks (id, name) pairs we successfully dispatched to, so pass 2 only
344        // tries to read from agents that actually received the frame.
345        let mut dispatched: Vec<(u64, String)> = Vec::with_capacity(order.len());
346        for (id, name) in &order {
347            let agent = match agents.get_mut(id) {
348                Some(a) => a,
349                None => continue,
350            };
351            if let Err(e) = write_frame(&mut agent.stream, frame_kind::EVAL, &cmd_bytes) {
352                print_tagged(name, "ERR", &format!("write error: {}", e));
353                continue;
354            }
355            let _ = agent.stream.set_read_timeout(Some(Duration::from_secs(30)));
356            dispatched.push((*id, name.clone()));
357        }
358        // At this point every dispatched agent is executing concurrently.
359
360        // Pass 2 — collect: read EVAL_RESULT from each agent in sorted order.
361        for (id, name) in &dispatched {
362            let agent = match agents.get_mut(id) {
363                Some(a) => a,
364                None => continue,
365            };
366            loop {
367                match read_frame(&mut agent.stream) {
368                    Ok((frame_kind::EVAL_RESULT, payload)) => {
369                        match bincode::deserialize::<EvalResult>(&payload) {
370                            Ok(r) => {
371                                let tag = if r.ok { "ok" } else { "ERR" };
372                                print_tagged(name, tag, &r.output);
373                            }
374                            Err(e) => {
375                                print_tagged(name, "ERR", &format!("malformed EVAL_RESULT: {}", e))
376                            }
377                        }
378                        break;
379                    }
380                    Ok((other_kind, _)) => {
381                        eprintln!(
382                            "[{}] (dropped stale frame kind 0x{:02X} while awaiting EVAL_RESULT)",
383                            name, other_kind
384                        );
385                    }
386                    Err(e) => {
387                        print_tagged(name, "ERR", &format!("read error: {}", e));
388                        break;
389                    }
390                }
391            }
392            let _ = agent.stream.set_read_timeout(None);
393        }
394    }
395
396    /// Send TERMINATE to all agents
397    fn terminate_all(&self) {
398        let mut agents = self.agents.lock().unwrap();
399        let mut terminated = 0;
400
401        for agent in agents.values_mut() {
402            if write_frame(&mut agent.stream, frame_kind::TERMINATE, &[]).is_ok() {
403                agent.state = AgentState::Terminated;
404                terminated += 1;
405            }
406        }
407
408        eprintln!("[terminate] {} agents", terminated);
409    }
410
411    /// Print status of all agents
412    fn print_status(&self) {
413        let agents = self.agents.lock().unwrap();
414
415        if agents.is_empty() {
416            println!("No agents connected.");
417            return;
418        }
419
420        println!(
421            "{:<20} {:>6} {:>10} {:>12} {:>10}",
422            "AGENT", "CORES", "MEMORY", "STATE", "UPTIME"
423        );
424        println!("{}", "-".repeat(62));
425
426        for agent in agents.values() {
427            let name = agent
428                .agent_name
429                .clone()
430                .unwrap_or_else(|| agent.hostname.clone());
431            let mem_gb = agent.memory_bytes / (1024 * 1024 * 1024);
432            let state = match agent.state {
433                AgentState::Idle => "idle",
434                AgentState::Armed => "armed",
435                AgentState::Firing => "FIRING",
436                AgentState::Terminated => "terminated",
437            };
438            let uptime = agent.connected_at.elapsed().as_secs();
439
440            println!(
441                "{:<20} {:>6} {:>8}GB {:>12} {:>8}s",
442                name, agent.cores, mem_gb, state, uptime
443            );
444        }
445
446        let total_cores: usize = agents.values().map(|a| a.cores).sum();
447        let firing_count = agents
448            .values()
449            .filter(|a| a.state == AgentState::Firing)
450            .count();
451
452        println!();
453        println!(
454            "Total: {} agents, {} cores, {} firing",
455            agents.len(),
456            total_cores,
457            firing_count
458        );
459    }
460
461    /// Send SHUTDOWN to all agents
462    fn shutdown_all(&self) {
463        let mut agents = self.agents.lock().unwrap();
464
465        for agent in agents.values_mut() {
466            let _ = write_frame(&mut agent.stream, frame_kind::SHUTDOWN, &[]);
467        }
468
469        agents.clear();
470        self.running.store(false, Ordering::Relaxed);
471        eprintln!("[shutdown] all agents disconnected");
472    }
473
474    /// Run the REPL
475    fn run_repl(&self) {
476        use std::io::{stdin, BufRead};
477
478        println!("stryke controller v{}", env!("CARGO_PKG_VERSION"));
479        println!("Type 'help' for commands, Ctrl-C to exit\n");
480
481        let stdin = stdin();
482        for line in stdin.lock().lines() {
483            let line = match line {
484                Ok(l) => l,
485                Err(_) => break,
486            };
487
488            // `@` prefix: ship the rest of the line as stryke source to every
489            // agent. Matches the sigil the user already associates with `@` in
490            // the language, and saves four keystrokes vs the explicit `eval`
491            // verb. `@   code`, `@code`, `@code with args` all work — the `@`
492            // is stripped and the remainder is sent verbatim.
493            let trimmed = line.trim_start();
494            if let Some(rest) = trimmed.strip_prefix('@') {
495                let code = rest.trim();
496                if code.is_empty() {
497                    println!("usage: @CODE  (alias for `eval CODE`)");
498                } else {
499                    self.eval_all(code);
500                }
501                continue;
502            }
503
504            let parts: Vec<&str> = line.split_whitespace().collect();
505            if parts.is_empty() {
506                continue;
507            }
508
509            match parts[0] {
510                "status" | "s" => self.print_status(),
511                "fire" | "f" => {
512                    let duration = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(10.0);
513                    self.fire_all(duration);
514                }
515                "eval" | "e" => {
516                    // Everything after the verb (preserving inner whitespace) is the source.
517                    let after = line
518                        .trim_start()
519                        .splitn(2, char::is_whitespace)
520                        .nth(1)
521                        .unwrap_or("")
522                        .trim();
523                    if after.is_empty() {
524                        println!("usage: eval CODE  (sends CODE to every connected agent for execution against its persistent VM)");
525                    } else {
526                        self.eval_all(after);
527                    }
528                }
529                "terminate" | "t" | "stop" => self.terminate_all(),
530                "shutdown" | "quit" | "exit" | "q" => {
531                    self.shutdown_all();
532                    break;
533                }
534                "help" | "h" | "?" => {
535                    println!("Commands:");
536                    println!("  status (s)           List connected agents");
537                    println!("  fire [SECS] (f)      Start stress test (default: 10s)");
538                    println!("  eval CODE (e)        Run arbitrary stryke source on every agent (state persists across calls)");
539                    println!("  @CODE                Shorthand for `eval CODE` — `@<source>` ships <source> to every agent");
540                    println!("  terminate (t)        Stop stress test");
541                    println!("  shutdown (q)         Disconnect all and exit");
542                    println!("  help (h)             Show this help");
543                }
544                _ => println!("Unknown command: {}. Type 'help' for commands.", parts[0]),
545            }
546        }
547    }
548}
549
550/// Read a framed message
551/// Print `output` to stdout with every line prefixed `[name/tag] `. Empty
552/// output produces a single bare `[name/tag]` line so the caller always sees
553/// a row per agent even when the eval returned void.
554fn print_tagged(name: &str, tag: &str, output: &str) {
555    let stdout = std::io::stdout();
556    let mut handle = stdout.lock();
557    let _ = write_tagged(&mut handle, name, tag, output);
558}
559
/// Formatting core of [`print_tagged`]. It is generic over `Write` so tests
/// can capture the exact bytes.
///
/// * Empty `output` yields one bare `[name/tag]` line.
/// * Otherwise each line from `str::lines` becomes `[name/tag] <line>`.
/// * If `output` ends with `\n`, a trailing bare `[name/tag]` line is added,
///   so consecutive evals don't visually run into each other.
///
/// # Errors
///
/// Returns the first error reported by `w`.
fn write_tagged<W: Write>(w: &mut W, name: &str, tag: &str, output: &str) -> std::io::Result<()> {
    let prefix = format!("[{}/{}]", name, tag);

    if output.is_empty() {
        return writeln!(w, "{}", prefix);
    }

    output
        .lines()
        .try_for_each(|ln| writeln!(w, "{} {}", prefix, ln))?;

    if output.ends_with('\n') {
        writeln!(w, "{}", prefix)
    } else {
        Ok(())
    }
}
578
/// Read one length-prefixed frame and split it into `(kind, payload)`.
///
/// Wire format (the inverse of [`write_frame`]): an 8-byte little-endian
/// `u64` length counting the kind byte plus the payload, then the kind byte,
/// then the payload.
///
/// # Errors
///
/// * `InvalidData` if the declared length is 0 (no room for the kind byte).
/// * `UnexpectedEof` if the reader ends before the declared length is read.
/// * Any other I/O error from the reader, including read timeouts.
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    // The length prefix comes straight off the network, and in `accept_loop`
    // that happens before any authentication. It must not drive a single
    // up-front `vec![0; len]`: a bogus huge value panics on capacity overflow
    // or aborts on OOM. Cap the initial allocation instead and let the buffer
    // grow as bytes actually arrive. Large legitimate frames are still
    // accepted.
    const PREALLOC_LIMIT: u64 = 64 * 1024;

    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let total_len = u64::from_le_bytes(len_buf);
    if total_len == 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "empty frame",
        ));
    }

    let mut kind = [0u8; 1];
    r.read_exact(&mut kind)?;

    let payload_len = total_len - 1;
    let mut payload = Vec::with_capacity(payload_len.min(PREALLOC_LIMIT) as usize);
    let received = r.by_ref().take(payload_len).read_to_end(&mut payload)?;
    if received as u64 != payload_len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "connection closed mid-frame",
        ));
    }
    Ok((kind[0], payload))
}
594
/// Write one frame and flush.
///
/// The frame is an 8-byte little-endian `u64` length (kind byte + payload),
/// then the kind byte, then the payload.
///
/// The frame is assembled in one buffer and handed to `write_all` once.
/// Writing the header, kind and payload as three separate small writes on a
/// `TcpStream` is the classic write-write-read pattern. Unless TCP_NODELAY is
/// set, Nagle's algorithm can hold the later segments until the peer ACKs the
/// first, and the peer's delayed ACK turns that into added latency on every
/// frame.
///
/// # Errors
///
/// Returns any error from `write_all` or `flush`.
fn write_frame<W: Write>(w: &mut W, kind: u8, payload: &[u8]) -> std::io::Result<()> {
    let total_len = 1 + payload.len();
    let mut frame = Vec::with_capacity(8 + total_len);
    frame.extend_from_slice(&(total_len as u64).to_le_bytes());
    frame.push(kind);
    frame.extend_from_slice(payload);
    w.write_all(&frame)?;
    w.flush()
}
603
604/// Main entry point — back-compat wrapper that delegates to
605/// [`spawn_controller`] + [`ControllerHandle::run_repl_blocking`]. Preserves
606/// the historical CLI behaviour: bind, accept agents in a background thread,
607/// run the interactive REPL on the main thread, cleanly join the accept thread
608/// on REPL exit. Scripts that want non-REPL programmatic access use
609/// [`spawn_controller`] directly.
610pub fn run_controller(bind: &str, port: u16) -> i32 {
611    let handle = match spawn_controller(bind, port) {
612        Ok(h) => h,
613        Err(e) => {
614            eprintln!("controller: cannot bind to {}:{}: {}", bind, port, e);
615            return 1;
616        }
617    };
618
619    eprintln!("stryke controller listening on {}", handle.listen_addr());
620    eprintln!("Waiting for agents...\n");
621
622    handle.run_repl_blocking();
623    0
624}
625
626// ============================================================================
627//                  Programmatic / Scriptable API (Tier 0)
628// ============================================================================
629//
630// Lets `.stk` scripts drive the controller without the REPL. Builtins in
631// `builtins.rs` (`ordain`, `muster`, `pray`, `annex`) wrap these methods and
632// expose opaque integer IDs to script code via the registries below.
633//
634// Semantics:
635//   * `spawn_controller(bind, port)` — bind listener, start accept thread,
636//     return `Arc<ControllerHandle>`. Non-blocking.
637//   * `ControllerHandle::muster()` — current session IDs of connected agents.
638//   * `ControllerHandle::welcome(n, timeout)` — block until >= n agents have
639//     connected, or the timeout elapses.
640//   * `ControllerHandle::scatter(code, &[id])` — write EVAL to each agent in
641//     parallel (fan-out pass), return a `petition_id`. Agents start executing
642//     immediately; results are NOT collected here.
643//   * `ControllerHandle::gather(petition_id, timeout)` — read EVAL_RESULT
644//     from every agent dispatched in that scatter, in parallel, return a
645//     HashMap<session_id, EvalResult>. Removes the divination from the
646//     pending table once consumed (so the same petition_id can't be
647//     gathered twice).
648//   * `ControllerHandle::shutdown()` — send SHUTDOWN to all agents, mark
649//     accept loop done, wake it with a self-connect, join the accept thread.
650//
// Threading note: `scatter` locks the `agents` map only long enough to
// `try_clone` each target's TcpStream. It then releases the lock and writes
// the EVAL frames on those clones in parallel (Rayon), so the fan-out does not
// serialize against the accept loop. `gather` reads replies through the
// original streams that stay in the map. Multi-outstanding-petition
// concurrency is a Tier 1 problem.
656
/// Bookkeeping for one `scatter` that has not been gathered yet, stored in
/// `ControllerHandle::pending_divinations` under its petition ID.
struct DivinationState {
    /// Session IDs whose EVAL frame was written successfully. `gather` reads
    /// replies only from these agents.
    dispatched: Vec<u64>,
}
662
663/// Non-blocking handle to a running [`Controller`]. Returned by
664/// [`spawn_controller`]; used by the scriptable builtins to drive the
665/// distributed compute fabric from `.stk` code.
666pub struct ControllerHandle {
667    /// `controller` field.
668    controller: Arc<Controller>,
669    /// `listen_addr` field.
670    listen_addr: std::net::SocketAddr,
671    /// `accept_handle` field.
672    accept_handle: Mutex<Option<thread::JoinHandle<()>>>,
673    /// `next_petition_id` field.
674    next_petition_id: AtomicU64,
675    /// `pending_divinations` field.
676    pending_divinations: Mutex<HashMap<u64, DivinationState>>,
677}
678
679impl ControllerHandle {
680    /// Returns the address the listener is actually bound to (port may have
681    /// been auto-assigned if 0 was passed in).
682    pub fn listen_addr(&self) -> std::net::SocketAddr {
683        self.listen_addr
684    }
685
686    /// Current count of connected agents.
687    pub fn agent_count(&self) -> usize {
688        self.controller.agents.lock().unwrap().len()
689    }
690
691    /// Return session IDs of all currently connected agents, in numerically
692    /// sorted order (deterministic for tests and scripts).
693    pub fn muster(&self) -> Vec<u64> {
694        let mut ids: Vec<u64> = self
695            .controller
696            .agents
697            .lock()
698            .unwrap()
699            .keys()
700            .copied()
701            .collect();
702        ids.sort_unstable();
703        ids
704    }
705
706    /// Block until at least `target_count` agents are connected, or `timeout`
707    /// elapses. Returns `true` if the count was reached. Polls every 50ms.
708    pub fn welcome(&self, target_count: usize, timeout: Duration) -> bool {
709        let start = Instant::now();
710        loop {
711            if self.agent_count() >= target_count {
712                return true;
713            }
714            if start.elapsed() >= timeout {
715                return self.agent_count() >= target_count;
716            }
717            thread::sleep(Duration::from_millis(50));
718        }
719    }
720
721    /// Fan EVAL out to `agent_ids` in parallel. Returns a `petition_id` used
722    /// later with [`gather`](Self::gather) to collect results. Agents that
723    /// don't exist in the roster or whose write fails are silently dropped
724    /// from the dispatched set, so a stale agent_id in `agent_ids` does NOT
725    /// cause the whole scatter to fail.
726    ///
727    /// Returns `Err` only if the EvalCommand fails to bincode-serialize
728    /// (which should never happen for plain strings).
729    pub fn scatter(&self, code: &str, agent_ids: &[u64]) -> std::io::Result<u64> {
730        use rayon::prelude::*;
731
732        let cmd = EvalCommand {
733            code: code.to_string(),
734        };
735        let cmd_bytes = bincode::serialize(&cmd)
736            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
737
738        // Tier 2: parallel fanout. Take the mutex once to grab per-agent
739        // TcpStream clones (sharing the underlying socket), drop the mutex,
740        // then write_frame on each clone in parallel via Rayon. The N
741        // sequential write_frames in the prior impl scaled O(N) at ~10μs
742        // per write; parallel scaling brings 10k-agent fanout from ~100ms
743        // down to ~1ms on an 8-core box.
744        //
745        // Stream clones share the underlying fd — writing to a clone IS
746        // writing to the agent's socket. Reads in `gather` go through the
747        // original (which we kept in the HashMap), so writer and reader
748        // don't conflict.
749        let stream_clones: Vec<(u64, TcpStream)> = {
750            let agents = self.controller.agents.lock().unwrap();
751            agent_ids
752                .iter()
753                .filter_map(|id| {
754                    agents
755                        .get(id)
756                        .and_then(|a| a.stream.try_clone().ok().map(|s| (*id, s)))
757                })
758                .collect()
759        };
760
761        let cmd_bytes = Arc::new(cmd_bytes);
762        let dispatched: Vec<u64> = stream_clones
763            .into_par_iter()
764            .filter_map(|(id, mut stream)| {
765                if write_frame(&mut stream, frame_kind::EVAL, cmd_bytes.as_slice()).is_ok() {
766                    Some(id)
767                } else {
768                    None
769                }
770            })
771            .collect();
772
773        let petition_id = self.next_petition_id.fetch_add(1, Ordering::Relaxed);
774        self.pending_divinations
775            .lock()
776            .unwrap()
777            .insert(petition_id, DivinationState { dispatched });
778        Ok(petition_id)
779    }
780
781    /// Block for results of `petition_id` up to `timeout` (per-agent read
782    /// timeout, not total wall time). Returns a HashMap of session_id →
783    /// EvalResult for every agent that replied with a valid EVAL_RESULT
784    /// frame in time. Agents that timed out, errored, or disconnected are
785    /// omitted from the map.
786    ///
787    /// Stale frames in the per-agent socket buffer (e.g. METRICS left over
788    /// from a prior FIRE) are silently skipped — same loop pattern as
789    /// `eval_all` so manual REPL behaviour stays consistent with scripted.
790    ///
791    /// Removes the divination from the pending table on return, so a
792    /// second `gather` on the same petition_id is an error.
793    pub fn gather(
794        &self,
795        petition_id: u64,
796        timeout: Duration,
797    ) -> std::io::Result<HashMap<u64, EvalResult>> {
798        let state = self
799            .pending_divinations
800            .lock()
801            .unwrap()
802            .remove(&petition_id)
803            .ok_or_else(|| {
804                std::io::Error::new(
805                    std::io::ErrorKind::NotFound,
806                    format!("no pending divination for petition_id {}", petition_id),
807                )
808            })?;
809
810        let mut results: HashMap<u64, EvalResult> = HashMap::new();
811        let mut agents = self.controller.agents.lock().unwrap();
812        for id in &state.dispatched {
813            let agent = match agents.get_mut(id) {
814                Some(a) => a,
815                None => continue, // agent disconnected between scatter and gather
816            };
817            let _ = agent.stream.set_read_timeout(Some(timeout));
818            loop {
819                match read_frame(&mut agent.stream) {
820                    Ok((frame_kind::EVAL_RESULT, payload)) => {
821                        if let Ok(r) = bincode::deserialize::<EvalResult>(&payload) {
822                            results.insert(*id, r);
823                        }
824                        break;
825                    }
826                    Ok(_) => continue, // stale frame, skip
827                    Err(_) => break,   // timeout or disconnect
828                }
829            }
830            let _ = agent.stream.set_read_timeout(None);
831        }
832        Ok(results)
833    }
834
835    /// Register an active chant — a prayer that fires at every current
836    /// agent now AND at every new agent that joins later (via the
837    /// `accept_loop` → `fire_chants_at` path). Returns a chant_id used
838    /// by `amen_chant` to stop the rescatter.
839    ///
840    /// Fire-and-forget: chants don't accumulate replies. Use for state
841    /// distribution (`bestow`-like push to current + future workers).
842    pub fn chant(&self, code: &str, agent_ids: &[u64]) -> std::io::Result<u64> {
843        // Fan out to current agents using the regular scatter machinery
844        // — we just don't register a divination since there's no gather.
845        let cmd = EvalCommand {
846            code: code.to_string(),
847        };
848        let cmd_bytes = bincode::serialize(&cmd)
849            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
850        let mut agents = self.controller.agents.lock().unwrap();
851        for id in agent_ids {
852            if let Some(agent) = agents.get_mut(id) {
853                let _ = write_frame(&mut agent.stream, frame_kind::EVAL, &cmd_bytes);
854            }
855        }
856        drop(agents);
857
858        // Record in active_chants so late joiners get it too.
859        let chant_id = NEXT_CHANT_ID.fetch_add(1, Ordering::Relaxed);
860        self.controller
861            .chants
862            .lock()
863            .unwrap()
864            .insert(chant_id, code.to_string());
865        Ok(chant_id)
866    }
867
868    /// Stop an active chant. Late joiners after this call won't receive it.
869    /// Returns true if the chant was active and removed, false otherwise.
870    pub fn amen_chant(&self, chant_id: u64) -> bool {
871        self.controller
872            .chants
873            .lock()
874            .unwrap()
875            .remove(&chant_id)
876            .is_some()
877    }
878
879    /// Silence the accept_loop's per-agent "[agent connected]" eprintln.
880    /// Set to true before a bulk-spawn loop (`congregation(N)` / `anoint(N)`)
881    /// to prevent the fork-thread/stdio RefCell race that loses 1-3
882    /// children at N>~50 on macOS Rust stdio. Set back to false after
883    /// the spawn loop completes if you want the REPL UX back.
884    pub fn set_quiet_accept(&self, quiet: bool) {
885        self.controller.quiet_accept.store(quiet, Ordering::Relaxed);
886    }
887
888    /// Turn :cloistered mode on (with a single accepted token) or off
889    /// (with an empty token). Cloistered accept_loop reads an AGENT_AUTH
890    /// frame after HELLO and rejects agents that don't present a valid
891    /// token. Open mode bypasses the AUTH read entirely.
892    pub fn set_cloistered(&self, token: Option<&str>) {
893        match token {
894            Some(t) if !t.is_empty() => {
895                self.controller
896                    .auth_tokens
897                    .lock()
898                    .unwrap()
899                    .insert(t.to_string());
900                self.controller.cloistered.store(true, Ordering::Relaxed);
901            }
902            _ => {
903                self.controller.cloistered.store(false, Ordering::Relaxed);
904                self.controller.auth_tokens.lock().unwrap().clear();
905            }
906        }
907    }
908
909    /// Send SHUTDOWN to a specific subset of agents (the `excommunicate` verb).
910    /// Each agent receives a SHUTDOWN frame and exits its loop; the agent's
911    /// TCP connection is dropped. Returns the count of agents that the frame
912    /// was successfully written to (write failures from disconnected agents
913    /// are silently swallowed — same convention as `scatter`).
914    pub fn excommunicate(&self, agent_ids: &[u64]) -> usize {
915        let mut agents = self.controller.agents.lock().unwrap();
916        let mut count = 0;
917        for id in agent_ids {
918            if let Some(agent) = agents.get_mut(id) {
919                if write_frame(&mut agent.stream, frame_kind::SHUTDOWN, &[]).is_ok() {
920                    count += 1;
921                }
922            }
923        }
924        // Best-effort removal from the roster. The accept thread won't see
925        // the disconnect until the OS notices the dropped connection, so we
926        // proactively drop them here.
927        for id in agent_ids {
928            agents.remove(id);
929        }
930        count
931    }
932
933    /// Pilgrimage barrier — scatter `barrier_code` to all `agent_ids` and
934    /// block until every agent that received the frame replies, OR `timeout`
935    /// elapses. Returns `true` if every dispatched agent replied in time.
936    ///
937    /// `barrier_code` is the prayer the agents execute at the barrier; for a
938    /// pure rendezvous, pass `"1"` and the agent's reply is the synchronization
939    /// signal. For computational barriers, pass code that does the work and
940    /// returns when done.
941    pub fn pilgrimage(&self, barrier_code: &str, agent_ids: &[u64], timeout: Duration) -> bool {
942        let petition_id = match self.scatter(barrier_code, agent_ids) {
943            Ok(p) => p,
944            Err(_) => return false,
945        };
946        let expected = self
947            .pending_divinations
948            .lock()
949            .unwrap()
950            .get(&petition_id)
951            .map(|d| d.dispatched.len())
952            .unwrap_or(0);
953        match self.gather(petition_id, timeout) {
954            Ok(results) => results.len() == expected,
955            Err(_) => false,
956        }
957    }
958
959    /// SHUTDOWN every agent, stop the accept loop, join the accept thread.
960    /// Wakes the blocking `listener.incoming()` call by self-connecting to
961    /// the bound address (the connection's HELLO read will fail and the
962    /// accept thread will exit its loop now that `running` is false).
963    pub fn shutdown(&self) {
964        self.controller.shutdown_all();
965        // Wake the accept loop by self-connecting; it'll see running=false
966        // and break. We swallow any connect error — the listener may already
967        // be gone if shutdown_all races with another shutdown call.
968        let _ = TcpStream::connect(self.listen_addr);
969        if let Some(h) = self.accept_handle.lock().unwrap().take() {
970            let _ = h.join();
971        }
972    }
973
974    /// Run the existing REPL on the calling thread, then clean shutdown.
975    /// Used by [`run_controller`] for back-compat with the CLI subcommand.
976    pub fn run_repl_blocking(&self) {
977        self.controller.run_repl();
978        self.controller.running.store(false, Ordering::Relaxed);
979        let _ = TcpStream::connect(self.listen_addr);
980        if let Some(h) = self.accept_handle.lock().unwrap().take() {
981            let _ = h.join();
982        }
983    }
984}
985
986/// Bind a listener and start the accept thread, return a non-blocking handle.
987/// Pass `port = 0` to let the OS pick a free port; recover the chosen one via
988/// [`ControllerHandle::listen_addr`].
989pub fn spawn_controller(bind: &str, port: u16) -> std::io::Result<Arc<ControllerHandle>> {
990    let addr = format!("{}:{}", bind, port);
991    let listener = TcpListener::bind(&addr)?;
992    let listen_addr = listener.local_addr()?;
993
994    let controller = Arc::new(Controller::new());
995    let ctrl_clone = Arc::clone(&controller);
996    let accept_handle = thread::spawn(move || {
997        ctrl_clone.accept_loop(listener);
998    });
999
1000    Ok(Arc::new(ControllerHandle {
1001        controller,
1002        listen_addr,
1003        accept_handle: Mutex::new(Some(accept_handle)),
1004        next_petition_id: AtomicU64::new(1),
1005        pending_divinations: Mutex::new(HashMap::new()),
1006    }))
1007}
1008
1009// ─── Global handle registries (script ↔ Rust bridge) ────────────────────────
1010//
1011// Stryke scripts can't hold `Arc<ControllerHandle>` directly — they only see
1012// `StrykeValue`s. So we stash the live handle in a process-global registry
1013// and hand the script an opaque integer ID. Same pattern for divination
1014// handles (each `pray` call returns a divination ID).
1015//
// Every registry in this file is a `OnceLock<Mutex<HashMap>>` — initialised
// on first use, shared across all threads. The script-visible IDs come from
// monotonic atomic counters, so they never collide within a single process.
1019
1020static CONTROLLER_REGISTRY: OnceLock<Mutex<HashMap<u64, Arc<ControllerHandle>>>> = OnceLock::new();
1021static NEXT_HANDLE_ID: AtomicU64 = AtomicU64::new(1);
1022
1023fn controller_registry() -> &'static Mutex<HashMap<u64, Arc<ControllerHandle>>> {
1024    CONTROLLER_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
1025}
1026
1027/// Register a controller handle; returns a script-visible u64 ID.
1028pub fn register_controller(handle: Arc<ControllerHandle>) -> u64 {
1029    let id = NEXT_HANDLE_ID.fetch_add(1, Ordering::Relaxed);
1030    controller_registry().lock().unwrap().insert(id, handle);
1031    id
1032}
1033
1034/// Look up a controller by its script-visible ID. Returns `None` if the ID
1035/// was never registered or has been unregistered.
1036pub fn get_controller(handle_id: u64) -> Option<Arc<ControllerHandle>> {
1037    controller_registry()
1038        .lock()
1039        .unwrap()
1040        .get(&handle_id)
1041        .map(Arc::clone)
1042}
1043
1044/// Remove a controller from the registry. Caller typically also calls
1045/// `shutdown()` on the returned Arc before dropping it.
1046pub fn unregister_controller(handle_id: u64) -> Option<Arc<ControllerHandle>> {
1047    controller_registry().lock().unwrap().remove(&handle_id)
1048}
1049
1050// ─── Chant ID atomic + registry ──────────────────────────────────────────────
1051//
1052// Chants get their own ID space (separate from divinations) so amen() can
1053// dispatch correctly. The CHANT_REGISTRY maps script-visible chant_id →
1054// controller_id so `amen` can find the right controller's active-chant table.
1055
/// Counter shared by script-visible chant IDs (`register_chant`) and the
/// controller-local IDs handed out by `ControllerHandle::chant`.
static NEXT_CHANT_ID: AtomicU64 = AtomicU64::new(1);
/// Script chant_id → (controller_id, controller-local chant_id).
static CHANT_REGISTRY: OnceLock<Mutex<HashMap<u64, (u64, u64)>>> = OnceLock::new();

/// Return the process-wide chant registry, creating it on first use.
fn chant_registry() -> &'static Mutex<HashMap<u64, (u64, u64)>> {
    CHANT_REGISTRY.get_or_init(Mutex::default)
}

/// Record that a chant lives on `controller_id` under `local_chant_id`, and
/// return the new script-visible chant ID for it.
pub fn register_chant(controller_id: u64, local_chant_id: u64) -> u64 {
    let chant_id = NEXT_CHANT_ID.fetch_add(1, Ordering::Relaxed);
    let mut table = chant_registry().lock().unwrap();
    table.insert(chant_id, (controller_id, local_chant_id));
    chant_id
}

/// Resolve a script chant ID to its `(controller_id, local_chant_id)` pair.
/// Returns `None` if it is unknown or unregistered.
pub fn get_chant(chant_id: u64) -> Option<(u64, u64)> {
    let table = chant_registry().lock().unwrap();
    table.get(&chant_id).copied()
}

/// Forget a script chant ID, returning the pair it mapped to, if any.
pub fn unregister_chant(chant_id: u64) -> Option<(u64, u64)> {
    let mut table = chant_registry().lock().unwrap();
    table.remove(&chant_id)
}
1081
1082// ─── Cathedral — in-process named congregation registry ─────────────────────
1083//
1084// Maps "congregation name" → controller endpoint (host:port). Masters
1085// register themselves at `ordain("name", ...)`; slaves look up the
1086// endpoint at `profess("name")` and connect.
1087//
1088// In-process registry only (not a separate daemon). Tier 5+ work would
1089// promote this to a `stryked` standalone binary so cross-host congregations
// can resolve names; for now a name is only visible within the process that
// registered it.
1092
/// Congregation name → controller endpoint (`host:port`).
static CATHEDRAL: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();

/// Return the process-wide name registry, creating it on first use.
fn cathedral() -> &'static Mutex<HashMap<String, String>> {
    CATHEDRAL.get_or_init(Mutex::default)
}

/// Bind congregation `name` to `endpoint`, replacing any existing binding.
/// Returns the previous endpoint, if there was one, so callers can detect
/// collisions.
pub fn cathedral_register(name: &str, endpoint: &str) -> Option<String> {
    let (key, value) = (name.to_string(), endpoint.to_string());
    cathedral().lock().unwrap().insert(key, value)
}

/// Resolve congregation `name` to its endpoint. Returns `None` if unbound.
pub fn cathedral_lookup(name: &str) -> Option<String> {
    let registry = cathedral().lock().unwrap();
    registry.get(name).cloned()
}

/// Remove `name` from the registry, returning the endpoint it was bound to.
pub fn cathedral_unregister(name: &str) -> Option<String> {
    let mut registry = cathedral().lock().unwrap();
    registry.remove(name)
}

/// List every registered congregation name in ascending order.
pub fn cathedral_names() -> Vec<String> {
    let registry = cathedral().lock().unwrap();
    let mut names: Vec<String> = registry.keys().cloned().collect();
    drop(registry);
    // Keys are unique, so an unstable sort yields the same order as a stable one.
    names.sort_unstable();
    names
}
1124
1125// ─── Divination registry: divination_id → (controller_id, petition_id) ──────
1126
/// Script divination_id → (controller_id, petition_id).
static DIVINATION_REGISTRY: OnceLock<Mutex<HashMap<u64, (u64, u64)>>> = OnceLock::new();
/// Source of script-visible divination IDs, starting at 1.
static NEXT_DIVINATION_ID: AtomicU64 = AtomicU64::new(1);

/// Return the process-wide divination registry, creating it on first use.
fn divination_registry() -> &'static Mutex<HashMap<u64, (u64, u64)>> {
    DIVINATION_REGISTRY.get_or_init(Mutex::default)
}

/// Record a pending petition on `controller_id` and return a new
/// script-visible divination ID. [`get_divination`] maps that ID back to
/// `(controller_id, petition_id)`.
pub fn register_divination(controller_id: u64, petition_id: u64) -> u64 {
    let divination_id = NEXT_DIVINATION_ID.fetch_add(1, Ordering::Relaxed);
    let mut table = divination_registry().lock().unwrap();
    table.insert(divination_id, (controller_id, petition_id));
    divination_id
}

/// Resolve a divination ID to its `(controller_id, petition_id)` pair, or
/// `None` if it is unknown.
pub fn get_divination(divination_id: u64) -> Option<(u64, u64)> {
    let table = divination_registry().lock().unwrap();
    table.get(&divination_id).copied()
}

/// Forget a divination ID and return its `(controller_id, petition_id)`
/// pair, so the caller can route the actual gather request.
pub fn unregister_divination(divination_id: u64) -> Option<(u64, u64)> {
    let mut table = divination_registry().lock().unwrap();
    table.remove(&divination_id)
}
1159
1160// ─── Current-controller tracking for ergonomic script API ───────────────────
1161//
1162// Scripts that work with a single congregation (the overwhelmingly common
1163// case) shouldn't have to thread a controller handle through every `pray` /
1164// `muster` / `annex` call. We stash the most-recently-created controller
1165// here and `pray` / `muster` / `annex` fall back to it when no explicit
1166// controller is named.
1167//
1168// Scripts juggling multiple concurrent congregations pass the controller
1169// handle explicitly via the low-level `ordain` return value.
1170
/// ID of the implicit controller; 0 means "none selected yet".
static CURRENT_CONTROLLER_ID: AtomicU64 = AtomicU64::new(0);

/// Make `controller_id` the implicit target for later `pray` / `muster` /
/// `annex` calls that don't name a controller. Passing 0 clears the
/// selection.
pub fn set_current_controller(controller_id: u64) {
    CURRENT_CONTROLLER_ID.store(controller_id, Ordering::Relaxed);
}

/// Return the implicit controller, or `None` if none has been set in this
/// process (no congregation or ordination yet).
pub fn get_current_controller() -> Option<u64> {
    match CURRENT_CONTROLLER_ID.load(Ordering::Relaxed) {
        0 => None,
        id => Some(id),
    }
}
1189
/// Print the `stryke controller` usage text (CLI options, REPL commands, and
/// an example session) to stdout.
///
/// The REPL command list matches the module-level docs, including the
/// agent-targeted `fire` form and `help`.
pub fn print_help() {
    println!("stryke controller — Distributed load testing controller");
    println!();
    println!("USAGE:");
    println!("    stryke controller [OPTIONS]");
    println!();
    println!("OPTIONS:");
    println!("    --bind ADDR          Bind address (default: 0.0.0.0)");
    println!("    --port PORT          Listen port (default: 9999)");
    println!("    --help               Print this help");
    println!();
    println!("COMMANDS (in REPL):");
    println!("    status               List connected agents");
    println!("    fire [SECS]          Start stress test (default: 10 seconds)");
    println!("    fire NODES [SECS]    Start stress test on listed agents (comma-separated)");
    println!("    terminate            Stop stress test");
    println!("    shutdown             Disconnect all agents and exit");
    println!("    help                 Show commands");
    println!();
    println!("EXAMPLE:");
    println!("    stryke controller --port 9999");
    println!();
    println!("    controller> status");
    println!("    controller> fire 60      # 60 second stress test");
    println!("    controller> terminate");
}
1215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::{frame_kind, handle_eval_frame, read_frame};
    use crate::vm_helper::VMHelper;
    use std::io::Read;
    use std::net::{TcpListener, TcpStream};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Render `output` through `write_tagged` and return it as a `String`.
    fn s(name: &str, tag: &str, output: &str) -> String {
        let mut buf = Vec::new();
        write_tagged(&mut buf, name, tag, output).unwrap();
        String::from_utf8(buf).unwrap()
    }

    /// Spawn a synthetic agent on a fresh loopback port. The agent:
    ///   1. accepts one connection,
    ///   2. reads one EVAL frame,
    ///   3. sleeps `delay`,
    ///   4. replies with an EVAL_RESULT computed against a real `VMHelper`.
    ///
    /// Returns the controller-side `TcpStream` and the worker's join handle.
    fn spawn_synthetic_agent(delay: Duration) -> (TcpStream, thread::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = thread::spawn(move || {
            let (mut server, _) = listener.accept().expect("accept");
            let mut interp = VMHelper::new();
            let (kind, payload) = read_frame(&mut server).expect("read EVAL");
            assert_eq!(kind, frame_kind::EVAL);
            thread::sleep(delay);
            handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
        });
        let client = TcpStream::connect(addr).expect("connect");
        (client, handle)
    }

    /// Build a `Controller` populated with the supplied mock agents: one
    /// `ConnectedAgent` per entry, with ids 1..N and names "agent-NN".
    fn controller_with_agents(streams: Vec<TcpStream>) -> Controller {
        let controller = Controller::new();
        let mut agents = controller.agents.lock().unwrap();
        for (i, stream) in streams.into_iter().enumerate() {
            let id = (i + 1) as u64;
            agents.insert(
                id,
                ConnectedAgent {
                    stream,
                    hostname: "localhost".to_string(),
                    cores: 1,
                    memory_bytes: 0,
                    agent_name: Some(format!("agent-{:02}", id)),
                    state: AgentState::Idle,
                    session_id: id,
                    connected_at: Instant::now(),
                },
            );
        }
        drop(agents);
        controller
    }

    /// Single-line output: one tagged row, one trailing newline (from writeln!).
    #[test]
    fn single_line_output_emits_one_tagged_row() {
        assert_eq!(s("node-01", "ok", "42"), "[node-01/ok] 42\n");
    }

    /// Multi-line output: EVERY line carries the prefix. Without per-line
    /// prefixing, only the first line would have `[name/tag]`, and a pipeline
    /// like `@p "a"; p "b"; p "c"; 0` would print two orphan lines.
    #[test]
    fn multi_line_output_prefixes_every_line() {
        let got = s("node-02", "ok", "alpha\nbeta\ngamma");
        assert_eq!(
            got, "[node-02/ok] alpha\n[node-02/ok] beta\n[node-02/ok] gamma\n",
            "every line must carry the [name/tag] prefix"
        );
    }

    /// Empty output is NOT swallowed. The caller still sees one bare prefixed
    /// line, so void returns ("undef" stringifies to "") produce visible
    /// per-agent rows.
    #[test]
    fn empty_output_still_emits_a_row() {
        assert_eq!(s("node-03", "ok", ""), "[node-03/ok]\n");
    }

    /// A trailing newline in the output (e.g. `p "x"` returns `"x\n"`) emits
    /// the visible content's tagged line plus one bare prefixed line, so
    /// successive evals don't visually run into each other.
    #[test]
    fn trailing_newline_emits_blank_prefixed_terminator() {
        let got = s("node-04", "ok", "x\n");
        assert_eq!(got, "[node-04/ok] x\n[node-04/ok]\n");
    }

    /// Error tag formatting parallels the ok tag — no special casing.
    #[test]
    fn error_tag_format_matches_ok() {
        assert_eq!(
            s("node-05", "ERR", "Division by zero at -e line 1"),
            "[node-05/ERR] Division by zero at -e line 1\n"
        );
    }

    /// The two-pass fan-out in `eval_all` must execute agents **in parallel**,
    /// not serially. Three mock agents each sleep 250 ms before replying.
    ///
    /// A serial loop would take ≥750 ms of wall-clock time. With the parallel
    /// fan-out, the writes go out in microseconds and the reads block on the
    /// slowest agent, for a total of ≈250 ms.
    ///
    /// We assert against `PARALLEL_BOUND`, well under the serial bound. That
    /// keeps the test non-flaky under CI load while still failing loudly if
    /// anyone reintroduces serial dispatch.
    #[test]
    fn eval_all_executes_agents_in_parallel_not_serially() {
        const N: usize = 3;
        const DELAY: Duration = Duration::from_millis(250);
        const SERIAL_BOUND: Duration = Duration::from_millis(750); // N * DELAY
        // Comfortably above one DELAY (the parallel cost), yet below SERIAL_BOUND.
        const PARALLEL_BOUND: Duration = Duration::from_millis(600);

        let (s1, h1) = spawn_synthetic_agent(DELAY);
        let (s2, h2) = spawn_synthetic_agent(DELAY);
        let (s3, h3) = spawn_synthetic_agent(DELAY);
        let controller = controller_with_agents(vec![s1, s2, s3]);

        let start = Instant::now();
        controller.eval_all("1 + 1");
        let elapsed = start.elapsed();

        for h in [h1, h2, h3] {
            h.join().expect("agent thread");
        }

        assert!(
            elapsed < PARALLEL_BOUND,
            "eval_all must run {} agents in parallel (each delay {:?}); elapsed {:?} \
             is too close to the serial bound {:?}",
            N,
            DELAY,
            elapsed,
            SERIAL_BOUND
        );
    }

    /// Empty controller (no agents connected) prints a notice and returns
    /// without panic or hang. Regression guard for the empty-agents branch.
    #[test]
    fn eval_all_with_no_agents_is_a_noop() {
        let controller = Controller::new();
        let start = Instant::now();
        controller.eval_all("anything");
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(100),
            "no-agents eval_all should return instantly, took {:?}",
            elapsed
        );
    }

    /// Stale frames left over in the TCP buffer must be silently skipped, so
    /// that `eval_all` finds the EVAL_RESULT it is actually waiting for.
    /// Examples are METRICS from a prior FIRE or STATUS_RESP from STATUS. This
    /// pins the behaviour of `eval_all`'s pass-2 inner loop.
    #[test]
    fn eval_all_skips_stale_frames_before_eval_result() {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = thread::spawn(move || {
            let (mut server, _) = listener.accept().expect("accept");
            let mut interp = VMHelper::new();
            let (kind, payload) = read_frame(&mut server).expect("read EVAL");
            assert_eq!(kind, frame_kind::EVAL);
            // Send an unrelated frame FIRST, simulating a leftover METRICS from
            // a prior FIRE. The controller must drop it and keep waiting for
            // EVAL_RESULT.
            super::write_frame(&mut server, frame_kind::METRICS, &[0u8; 4]).unwrap();
            handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
        });
        let client = TcpStream::connect(addr).unwrap();
        let controller = controller_with_agents(vec![client]);

        // A broken skip either hangs eval_all, which only the harness catches,
        // or takes METRICS as the reply. The EOF probe below catches the
        // second case.
        let start = Instant::now();
        controller.eval_all("42");
        assert!(start.elapsed() < Duration::from_secs(5));
        handle.join().expect("agent thread");

        // The agent thread has exited and closed its end. If eval_all consumed
        // the EVAL_RESULT, the socket now reads as EOF; otherwise the unread
        // EVAL_RESULT bytes are still queued.
        let mut agents = controller.agents.lock().unwrap();
        if let Some(agent) = agents.get_mut(&1) {
            agent
                .stream
                .set_read_timeout(Some(Duration::from_secs(2)))
                .unwrap();
            let mut probe = [0u8; 1];
            let leftover = agent.stream.read(&mut probe);
            assert!(
                matches!(leftover, Ok(0)),
                "expected EOF once the EVAL_RESULT was consumed, got {:?}",
                leftover
            );
        }
    }

    /// Two successive `eval_all` calls against one agent. The agent answers
    /// both with the same `VMHelper`, mirroring the REPL's persistent per-agent
    /// VM.
    ///
    /// Without stdout capture, this only verifies that both EVAL frames arrive
    /// and are answered; the agent thread panics otherwise. It does NOT check
    /// that `$main::tally` survives into the second call.
    #[test]
    fn successive_eval_all_calls_share_per_agent_vm_state() {
        // Single agent that handles TWO EVAL frames against the same VM.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = thread::spawn(move || {
            let (mut server, _) = listener.accept().expect("accept");
            let mut interp = VMHelper::new();
            for _ in 0..2 {
                let (kind, payload) = read_frame(&mut server).expect("read EVAL");
                assert_eq!(kind, frame_kind::EVAL);
                handle_eval_frame(&mut server, &mut interp, &payload).expect("reply");
            }
        });
        let client = TcpStream::connect(addr).unwrap();
        let controller = controller_with_agents(vec![client]);

        // Frame 1: define a package global.
        controller.eval_all("$main::tally = 10; $main::tally");
        // Frame 2: read it back (value not asserted; see the doc comment).
        controller.eval_all("$main::tally + 32");
        // Passes if the agent handled exactly two frames without panicking.
        handle.join().expect("agent thread");
    }

    /// `Arc<Controller>` clones share the agents map: the accept loop holds one
    /// clone and the REPL thread holds another. Sanity check that eval_all on a
    /// cloned controller sees the same agents as the original.
    #[test]
    fn arc_clone_shares_agents_map() {
        let (s, h) = spawn_synthetic_agent(Duration::from_millis(0));
        let controller = Arc::new(controller_with_agents(vec![s]));
        let clone = Arc::clone(&controller);
        clone.eval_all("99");
        h.join().expect("agent thread");
        // Sanity: the agent map carries one entry after the eval.
        assert_eq!(controller.agents.lock().unwrap().len(), 1);
    }
}