use crate::vm::VM;
/// One hosted unit: a VM plus the bookkeeping the host keeps about it.
pub struct UnitSlot {
    /// The interpreter instance owned by this unit.
    pub vm: VM,
    /// Set by `MultiUnitHost::execute_goal` while it is running code on this
    /// unit and cleared again before that call returns.
    pub busy: bool,
    /// Number of goals this unit has run via `MultiUnitHost::execute_goal`.
    pub tasks_completed: u64,
    /// Source text of every definition recorded for this unit, oldest first.
    pub user_words: Vec<String>,
}
/// Outcome of running one goal on a hosted unit.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so results can be logged,
/// stored and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoalResult {
    /// Index (into the host's unit list) of the unit that ran the goal.
    pub unit_index: usize,
    /// Text returned by the unit's VM for the evaluated code.
    pub output: String,
}
/// A bounded collection of units living in one process.
pub struct MultiUnitHost {
    /// The hosted units; a unit's position in this vector is its index.
    pub units: Vec<UnitSlot>,
    /// Maximum number of units `spawn` will create.
    cap: usize,
}
impl MultiUnitHost {
    /// Creates an empty host that will hold at most `cap` units.
    pub fn new(cap: usize) -> Self {
        MultiUnitHost {
            units: Vec::new(),
            cap,
        }
    }

    /// Creates an empty host with a capacity of 100 units.
    pub fn with_default_cap() -> Self {
        Self::new(100)
    }

    /// Number of units currently hosted.
    pub fn len(&self) -> usize {
        self.units.len()
    }

    /// Maximum number of units this host will spawn.
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// `true` when no unit has been spawned yet.
    pub fn is_empty(&self) -> bool {
        self.units.is_empty()
    }

    /// `true` when the unit count has reached (or exceeds) the capacity.
    pub fn is_full(&self) -> bool {
        self.units.len() >= self.cap
    }

    /// Boots one new unit and returns its index, or `None` when the host is full.
    ///
    /// While the prelude loads, the VM is marked silent and given a temporary
    /// output buffer; both settings are reset before the unit is stored.
    pub fn spawn(&mut self) -> Option<usize> {
        if self.is_full() {
            return None;
        }

        let mut vm = VM::new();
        vm.silent = true;
        vm.output_buffer = Some(String::new());
        vm.load_prelude();
        vm.output_buffer = None;
        vm.silent = false;

        self.units.push(UnitSlot {
            vm,
            busy: false,
            tasks_completed: 0,
            user_words: Vec::new(),
        });
        Some(self.units.len() - 1)
    }

    /// Evaluates `definition` on unit `idx` and records its source text in that
    /// unit's `user_words`.
    ///
    /// Returns `false` (and does nothing) when `idx` is out of range. The text
    /// is recorded regardless of what the VM reports for the evaluation.
    pub fn define_on(&mut self, idx: usize, definition: &str) -> bool {
        match self.units.get_mut(idx) {
            Some(slot) => {
                slot.vm.eval(definition);
                slot.user_words.push(definition.to_string());
                true
            }
            None => false,
        }
    }

    /// Spawns up to `n` units, stopping early once the host is full.
    ///
    /// Returns how many units were actually created.
    pub fn spawn_n(&mut self, n: usize) -> usize {
        let mut spawned = 0;
        while spawned < n && self.spawn().is_some() {
            spawned += 1;
        }
        spawned
    }

    /// Chooses the unit that should run the next goal.
    ///
    /// Returns `None` when the host has no units. Otherwise returns the idle
    /// unit with the fewest completed tasks (the lowest index wins ties). When
    /// every unit is busy, unit `0` is returned as a fallback.
    pub fn pick_worker(&self) -> Option<usize> {
        if self.units.is_empty() {
            return None;
        }
        self.units
            .iter()
            .enumerate()
            .filter(|(_, slot)| !slot.busy)
            // `min_by_key` returns the first of several equal minima, which
            // keeps the lowest-index tie-break.
            .min_by_key(|(_, slot)| slot.tasks_completed)
            .map(|(idx, _)| idx)
            .or(Some(0))
    }

    /// Runs `code` on the unit chosen by [`pick_worker`](Self::pick_worker).
    ///
    /// The unit is flagged busy for the duration of the evaluation and its
    /// `tasks_completed` counter is incremented afterwards. Returns `None`
    /// only when the host has no units.
    pub fn execute_goal(&mut self, code: &str) -> Option<GoalResult> {
        let unit_index = self.pick_worker()?;
        let slot = &mut self.units[unit_index];

        slot.busy = true;
        let output = slot.vm.eval(code);
        slot.tasks_completed += 1;
        slot.busy = false;

        Some(GoalResult { unit_index, output })
    }

    /// Evaluates `definition` on every unit and records it in each unit's
    /// `user_words`.
    pub fn share_word(&mut self, definition: &str) {
        for slot in self.units.iter_mut() {
            slot.vm.eval(definition);
            slot.user_words.push(definition.to_string());
        }
    }

    /// Copies the named word definitions from unit `source_idx` to every other
    /// unit.
    ///
    /// For each entry of `words`, the most recently recorded definition in the
    /// source unit's `user_words` whose header is `: <word>` is replayed on all
    /// other units and appended to their `user_words`. Words with no recorded
    /// definition are skipped.
    ///
    /// Returns the names that were taught, in the order given. An out-of-range
    /// `source_idx` yields an empty vector.
    pub fn teach_from(&mut self, source_idx: usize, words: &[&str]) -> Vec<String> {
        /// `true` when the first two whitespace-separated tokens of
        /// `definition` are `:` and `word`. Tokenising (rather than matching a
        /// fixed `": word "` prefix) also accepts headers that use tabs, CRLF
        /// line endings or repeated spaces.
        fn defines_word(definition: &str, word: &str) -> bool {
            let mut tokens = definition.split_whitespace();
            tokens.next() == Some(":") && tokens.next() == Some(word)
        }

        let source = match self.units.get(source_idx) {
            Some(slot) => slot,
            None => return Vec::new(),
        };

        // Collect owned copies first so the shared borrow of the source unit
        // ends before the other units are mutated.
        let to_replay: Vec<(String, String)> = words
            .iter()
            .filter_map(|&word| {
                source
                    .user_words
                    .iter()
                    .rev()
                    .find(|def| defines_word(def.as_str(), word))
                    .map(|def| (word.to_string(), def.clone()))
            })
            .collect();

        let mut taught = Vec::with_capacity(to_replay.len());
        for (word, def) in to_replay {
            for (idx, slot) in self.units.iter_mut().enumerate() {
                if idx == source_idx {
                    continue;
                }
                slot.vm.eval(&def);
                slot.user_words.push(def.clone());
            }
            taught.push(word);
        }
        taught
    }
}
use crate::mesh::{self, MeshNode, NodeId};
use std::net::SocketAddr;
/// A peer host known to the mesh, as reported by
/// `MultiUnitNode::remote_processes`.
#[derive(Clone, Debug)]
pub struct RemoteProcess {
    /// Mesh identifier of the peer host.
    pub host_id: NodeId,
    /// `host_id` rendered with `mesh::id_to_hex`.
    pub host_id_hex: String,
    /// Load value the mesh reports for the peer; local hosts publish their
    /// unit count through this value.
    pub units_hosted: u32,
    /// Socket address the mesh has on record for the peer.
    pub addr: SocketAddr,
}
/// Record of one remote message that was executed on a local unit.
#[derive(Clone, Debug)]
pub struct DispatchedRemoteMsg {
    /// Value of the message's `:from` field (`"?"` when it was absent).
    pub from_host_hex: String,
    /// Index of the local unit that ran the payload.
    pub unit_index: usize,
    /// Text the unit's VM returned for the payload.
    pub output: String,
}
/// A unit host optionally attached to the mesh.
pub struct MultiUnitNode {
    /// The local units.
    pub host: MultiUnitHost,
    /// Mesh endpoint; `None` when the node was created without a mesh port.
    pub mesh: Option<MeshNode>,
}
impl MultiUnitNode {
pub fn new(
cap: usize,
mesh_port: Option<u16>,
seed_peers: Vec<SocketAddr>,
) -> Result<Self, String> {
let mesh = match mesh_port {
Some(p) => Some(MeshNode::start(p, seed_peers)?),
None => None,
};
Ok(MultiUnitNode {
host: MultiUnitHost::new(cap),
mesh,
})
}
pub fn host_id(&self) -> Option<NodeId> {
self.mesh.as_ref().map(|m| *m.id())
}
pub fn host_id_hex(&self) -> Option<String> {
self.host_id().map(|id| mesh::id_to_hex(&id))
}
pub fn mesh_port(&self) -> Option<u16> {
self.mesh.as_ref().map(|m| m.local_port())
}
pub fn host_unit_count(&self) -> usize {
self.host.len()
}
pub fn spawn_n(&mut self, n: usize) -> usize {
let before = self.host.len();
let count = self.host.spawn_n(n);
let host_hex = self.host_id_hex().unwrap_or_default();
for i in before..self.host.len() {
inject_host_constants(&mut self.host.units[i].vm, &host_hex, i);
}
let siblings = self.host.len().saturating_sub(1) as i64;
for slot in self.host.units.iter_mut() {
slot.vm.eval(&format!("{} _SIBLINGS !", siblings));
}
if let Some(ref m) = self.mesh {
m.set_load(self.host.len() as u32);
m.force_heartbeat();
}
count
}
pub fn remote_processes(&self) -> Vec<RemoteProcess> {
let mesh = match self.mesh.as_ref() {
Some(m) => m,
None => return Vec::new(),
};
let my_id = *mesh.id();
mesh.peer_unit_counts()
.into_iter()
.filter(|(id, _, _)| *id != my_id)
.map(|(id, load, addr)| RemoteProcess {
host_id: id,
host_id_hex: mesh::id_to_hex(&id),
units_hosted: load,
addr,
})
.collect()
}
pub fn send_to_process(&self, target: &NodeId, payload: &str) -> bool {
let mesh = match self.mesh.as_ref() {
Some(m) => m,
None => return false,
};
let target_addr = mesh
.peer_unit_counts()
.into_iter()
.find(|(id, _, _)| id == target)
.map(|(_, _, addr)| addr);
let addr = match target_addr {
Some(a) => a,
None => return false,
};
let from_hex = mesh::id_to_hex(mesh.id());
let to_hex = mesh::id_to_hex(target);
let safe = payload.replace('"', "'");
let sexp = format!(
"(host-msg :to \"{}\" :from \"{}\" :payload \"{}\")",
to_hex, from_hex, safe
);
mesh.send_sexp_to(addr, &sexp);
true
}
pub fn drain_and_dispatch(&mut self) -> Vec<DispatchedRemoteMsg> {
let mut events = Vec::new();
let (raw_msgs, my_hex) = match self.mesh.as_ref() {
Some(m) => (m.recv_sexp_messages(), mesh::id_to_hex(m.id())),
None => return events,
};
for raw in raw_msgs {
let parsed = match crate::sexp::try_parse_mesh_msg(&raw) {
Some(s) => s,
None => continue,
};
if crate::sexp::msg_type(&parsed) != Some("host-msg") {
continue;
}
let to = parsed
.get_key(":to")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
if to != my_hex {
continue;
}
let from = parsed
.get_key(":from")
.and_then(|s| s.as_str())
.unwrap_or("?")
.to_string();
let payload = parsed
.get_key(":payload")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
if payload.is_empty() {
continue;
}
if let Some(r) = self.host.execute_goal(&payload) {
let siblings = self.host.len().saturating_sub(1) as i64;
self.host.units[r.unit_index]
.vm
.eval(&format!("{} _SIBLINGS !", siblings));
events.push(DispatchedRemoteMsg {
from_host_hex: from,
unit_index: r.unit_index,
output: r.output,
});
}
}
let remotes = self.remote_processes().len() as i64;
for slot in self.host.units.iter_mut() {
slot.vm.eval(&format!("{} _REMOTES !", remotes));
}
events
}
}
/// Defines the host-related words and variables on a freshly spawned unit.
///
/// Evaluates, in order: `HOST-ID` (prints `host_id_hex`), `UNIT-IDX` (pushes
/// `unit_idx`), the `_SIBLINGS` variable with its `SIBLING-COUNT` reader, and
/// the `_REMOTES` variable with its `MESH-PROCESS-COUNT` reader. Both
/// variables start at 0.
fn inject_host_constants(vm: &mut crate::vm::VM, host_id_hex: &str, unit_idx: usize) {
    let host_id_word = format!(": HOST-ID .\" {}\" CR ;", host_id_hex);
    let unit_idx_word = format!(": UNIT-IDX {} ;", unit_idx);

    let bootstrap: [&str; 6] = [
        host_id_word.as_str(),
        unit_idx_word.as_str(),
        "VARIABLE _SIBLINGS 0 _SIBLINGS !",
        ": SIBLING-COUNT _SIBLINGS @ ;",
        "VARIABLE _REMOTES 0 _REMOTES !",
        ": MESH-PROCESS-COUNT _REMOTES @ ;",
    ];
    for &snippet in bootstrap.iter() {
        vm.eval(snippet);
    }
}
// Tests for `MultiUnitNode`: host identity, the per-unit constants injected at
// spawn time, and host-to-host messaging between two nodes started inside the
// test process. Several tests rely on short sleeps to let the mesh exchange
// messages.
#[cfg(test)]
mod bridge_tests {
use super::*;
use std::time::Duration;
// Starts node `a` with no seed peers and node `b` seeded with `a`'s address on
// 127.0.0.1, spawns the requested number of units on each, then forces three
// rounds of heartbeats (20 ms apart) on both nodes before returning them.
// NOTE(review): port `0` presumably requests an OS-assigned port — confirm
// against `MeshNode::start`.
fn pair(units_a: usize, units_b: usize) -> (MultiUnitNode, MultiUnitNode) {
let mut a = MultiUnitNode::new(64, Some(0), vec![]).expect("start a");
a.spawn_n(units_a);
let a_addr: SocketAddr = format!("127.0.0.1:{}", a.mesh_port().unwrap())
.parse()
.unwrap();
let mut b = MultiUnitNode::new(64, Some(0), vec![a_addr]).expect("start b");
b.spawn_n(units_b);
for _ in 0..3 {
a.mesh.as_ref().unwrap().force_heartbeat();
b.mesh.as_ref().unwrap().force_heartbeat();
std::thread::sleep(Duration::from_millis(20));
}
(a, b)
}
// Two reads of the host id agree, and its hex form is 16 characters long.
#[test]
fn host_id_is_set_and_stable() {
let mut a = MultiUnitNode::new(8, Some(0), vec![]).unwrap();
a.spawn_n(2);
let id1 = a.host_id().unwrap();
let id2 = a.host_id().unwrap();
assert_eq!(id1, id2);
assert_eq!(a.host_id_hex().unwrap().len(), 16);
}
// With four units and no mesh, `SIBLING-COUNT` on unit 0 prints 3.
#[test]
fn sibling_count_excludes_self() {
let mut a = MultiUnitNode::new(8, None, vec![]).unwrap();
a.spawn_n(4);
let out = a.host.units[0].vm.eval("SIBLING-COUNT .");
assert!(out.contains('3'), "out: {:?}", out);
}
// From `a`'s point of view, `b` is listed with its three units and `a` itself
// is not listed.
#[test]
fn remote_processes_excludes_self_and_includes_unit_count() {
let (mut a, b) = pair(2, 3);
let _ = a.drain_and_dispatch(); let remotes = a.remote_processes();
let b_id = b.host_id().unwrap();
let entry = remotes
.iter()
.find(|r| r.host_id == b_id)
.expect("b not visible from a");
assert_eq!(entry.units_hosted, 3, "b advertised wrong unit count");
assert!(
!remotes.iter().any(|r| r.host_id == a.host_id().unwrap()),
"remote_processes must exclude self"
);
}
// A payload sent from `a` to `b` is executed on exactly one of `b`'s units and
// its output is returned in the dispatch record.
#[test]
fn cross_process_message_is_dispatched_to_a_local_unit() {
let (mut a, mut b) = pair(2, 3);
// Drain whatever arrived during setup so only the message sent below is
// counted.
let _ = a.drain_and_dispatch();
let _ = b.drain_and_dispatch();
let b_id = b.host_id().unwrap();
assert!(a.send_to_process(&b_id, "2 3 + ."));
// Give the message time to arrive before draining `b`.
std::thread::sleep(Duration::from_millis(50));
let dispatched = b.drain_and_dispatch();
assert_eq!(dispatched.len(), 1, "expected 1 dispatched msg, got {:?}", dispatched);
let ev = &dispatched[0];
assert!(ev.unit_index < b.host.len());
assert!(
ev.output.contains('5'),
"expected `5` in dispatched output: {:?}",
ev.output
);
assert_eq!(b.host.units[ev.unit_index].tasks_completed, 1);
}
// After `b` is dropped and more than 50 ms pass, evicting peers older than
// 50 ms removes `b` from `a`'s remote process list.
#[test]
fn host_crash_evicts_peer_from_remote_table() {
let (mut a, b) = pair(2, 2);
let _ = a.drain_and_dispatch();
let b_id = b.host_id().unwrap();
assert!(a.remote_processes().iter().any(|r| r.host_id == b_id));
drop(b);
// Sleep longer than the eviction threshold used below.
std::thread::sleep(Duration::from_millis(80));
let evicted = a
.mesh
.as_ref()
.unwrap()
.evict_peers_older_than(Duration::from_millis(50));
assert!(evicted >= 1, "expected to evict at least 1 stale peer");
assert!(
!a.remote_processes().iter().any(|r| r.host_id == b_id),
"b should be gone from a's remote_processes after eviction"
);
}
// Each unit's `UNIT-IDX` word prints that unit's own index.
#[test]
fn host_constants_are_per_unit() {
let mut a = MultiUnitNode::new(8, Some(0), vec![]).unwrap();
a.spawn_n(3);
for i in 0..3 {
let out = a.host.units[i].vm.eval("UNIT-IDX .");
assert!(
out.contains(&i.to_string()),
"unit {} UNIT-IDX out: {:?}",
i,
out
);
}
}
}
/// Unit tests for `MultiUnitHost`: spawning, worker selection, goal execution
/// and sharing word definitions between units.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a host of capacity `cap` with `units` units already spawned.
    fn host_with_units(cap: usize, units: usize) -> MultiUnitHost {
        let mut host = MultiUnitHost::new(cap);
        host.spawn_n(units);
        host
    }

    /// `spawn` hands out consecutive indices and refuses once the cap is hit.
    #[test]
    fn spawn_respects_cap() {
        let mut host = MultiUnitHost::new(3);
        for expected in 0..3 {
            assert_eq!(host.spawn(), Some(expected));
        }
        assert!(host.is_full());
        assert_eq!(host.spawn(), None);
        assert_eq!(host.len(), 3);
    }

    /// `spawn_n` reports how many units it really created.
    #[test]
    fn spawn_n_returns_actual_count() {
        let mut host = MultiUnitHost::new(5);
        assert_eq!(host.spawn_n(3), 3);
        assert_eq!(host.spawn_n(10), 2);
        assert_eq!(host.len(), 5);
    }

    /// The idle unit with the fewest completed tasks is chosen.
    #[test]
    fn pick_worker_picks_least_busy() {
        let mut host = host_with_units(5, 3);
        for (slot, done) in host.units.iter_mut().zip([5u64, 1, 3].iter()) {
            slot.tasks_completed = *done;
        }
        assert_eq!(host.pick_worker(), Some(1));
    }

    /// Busy units are passed over even if they have fewer completed tasks.
    #[test]
    fn pick_worker_skips_busy() {
        let mut host = host_with_units(5, 3);
        host.units[0].busy = true;
        host.units[1].busy = true;
        host.units[2].tasks_completed = 7;
        assert_eq!(host.pick_worker(), Some(2));
    }

    /// With every unit busy, unit 0 is the fallback.
    #[test]
    fn pick_worker_falls_back_to_zero_when_all_busy() {
        let mut host = host_with_units(3, 2);
        for slot in host.units.iter_mut() {
            slot.busy = true;
        }
        assert_eq!(host.pick_worker(), Some(0));
    }

    /// A host without units has no worker to offer.
    #[test]
    fn pick_worker_returns_none_when_empty() {
        let host = MultiUnitHost::new(3);
        assert_eq!(host.pick_worker(), None);
    }

    /// Running a goal yields its output, bumps the counter and clears `busy`.
    #[test]
    fn execute_goal_runs_and_increments_tasks() {
        let mut host = host_with_units(3, 2);
        let result = host.execute_goal("2 3 + .").unwrap();
        assert!(result.output.contains('5'), "output: {:?}", result.output);
        let slot = &host.units[result.unit_index];
        assert_eq!(slot.tasks_completed, 1);
        assert!(!slot.busy);
    }

    /// Three goals on three idle units land on three different units.
    #[test]
    fn execute_goal_round_robins_across_idle_units() {
        let mut host = host_with_units(3, 3);
        let first = host.execute_goal("1 .").unwrap();
        let second = host.execute_goal("2 .").unwrap();
        let third = host.execute_goal("3 .").unwrap();
        let mut hits = vec![first.unit_index, second.unit_index, third.unit_index];
        hits.sort();
        assert_eq!(hits, vec![0, 1, 2], "expected one goal per unit");
    }

    /// A shared word can be called on every unit.
    #[test]
    fn share_word_makes_word_available_on_every_unit() {
        let mut host = host_with_units(5, 3);
        host.share_word(": DOUBLE 2 * ;");
        for i in 0..3 {
            let out = host.units[i].vm.eval("21 DOUBLE .");
            assert!(out.contains("42"), "unit {} output: {:?}", i, out);
        }
    }

    /// A word defined on unit 0 is unknown elsewhere until it is taught.
    #[test]
    fn teach_from_copies_definition_to_others() {
        let mut host = host_with_units(5, 3);
        assert!(host.define_on(0, ": TRIPLE 3 * ;"));

        let probe = host.units[1].vm.eval("7 TRIPLE .");
        assert!(
            probe.contains("unknown"),
            "unit 1 already knows TRIPLE: {:?}",
            probe
        );

        let taught = host.teach_from(0, &["TRIPLE"]);
        assert_eq!(taught, vec!["TRIPLE".to_string()]);
        for i in 1..3 {
            let out = host.units[i].vm.eval("7 TRIPLE .");
            assert!(out.contains("21"), "unit {} output: {:?}", i, out);
        }
    }

    /// `define_on` both records the source text and makes the word callable.
    #[test]
    fn define_on_records_user_word() {
        let mut host = host_with_units(3, 1);
        assert!(host.define_on(0, ": HELLO 99 ;"));
        assert_eq!(host.units[0].user_words, vec![": HELLO 99 ;".to_string()]);
        let out = host.units[0].vm.eval("HELLO .");
        assert!(out.contains("99"), "out: {:?}", out);
    }

    /// `share_word` records the source text on each unit.
    #[test]
    fn share_word_records_user_word_on_every_unit() {
        let mut host = host_with_units(3, 2);
        host.share_word(": GREET 42 ;");
        for slot in host.units.iter() {
            assert_eq!(slot.user_words, vec![": GREET 42 ;".to_string()]);
        }
    }

    /// Asking to teach a word the source never defined teaches nothing.
    #[test]
    fn teach_from_skips_unknown_words() {
        let mut host = host_with_units(3, 2);
        let taught = host.teach_from(0, &["NOPE-NOT-A-WORD"]);
        assert!(taught.is_empty(), "got: {:?}", taught);
    }
}