use std::collections::{HashMap, HashSet, VecDeque};
use std::io::Read;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::features::fitness::PeerFitness;
use crate::goals::{Goal, GoalId, GoalRegistry, GoalStatus, Task, TaskId, TaskResult, TaskStatus};
use crate::types::{Cell, Entry, Instruction};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(2);
const PEER_TIMEOUT: Duration = Duration::from_secs(15);
const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(5);
const PROPOSAL_COOLDOWN: Duration = Duration::from_secs(10);
const RECV_TIMEOUT: Duration = Duration::from_millis(500);
const MAX_GOSSIP_PEERS: usize = 8;
const DEFAULT_CAPACITY: u32 = 100;
const MSG_HEARTBEAT: u8 = 1;
const MSG_PROPOSE: u8 = 2;
const MSG_VOTE: u8 = 3;
const MSG_COMMIT: u8 = 4;
const MSG_REJECT: u8 = 5;
const MSG_DATA: u8 = 6;
const MSG_GOAL_BROADCAST: u8 = 7;
const MSG_TASK_CLAIM: u8 = 8;
const MSG_TASK_RESULT: u8 = 9;
const MSG_WORD_SHARE: u8 = 10;
const MSG_DISCOVERY_BEACON: u8 = 11;
const MSG_SPAWN_INTENT: u8 = 12;
const MSG_CULL_INTENT: u8 = 13;
const MSG_REPLICATE_REQUEST: u8 = 14;
const MSG_REPLICATE_ACCEPT: u8 = 15;
const MSG_REPLICATE_DENY: u8 = 16;
const MSG_SEXP: u8 = 17;
const MAGIC: &[u8; 4] = b"UNIT";
const DISCOVERY_PORT: u16 = 4200;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(5);
pub type NodeId = [u8; 8];
fn generate_id() -> NodeId {
let mut id = [0u8; 8];
if let Ok(mut f) = std::fs::File::open("/dev/urandom") {
let _ = f.read_exact(&mut id);
} else {
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
id = t.to_ne_bytes();
}
id
}
pub fn id_to_hex(id: &NodeId) -> String {
id.iter().map(|b| format!("{:02x}", b)).collect()
}
fn write_u8(buf: &mut Vec<u8>, v: u8) {
buf.push(v);
}
fn write_u16(buf: &mut Vec<u8>, v: u16) {
buf.extend_from_slice(&v.to_be_bytes());
}
fn write_u32(buf: &mut Vec<u8>, v: u32) {
buf.extend_from_slice(&v.to_be_bytes());
}
fn write_u64(buf: &mut Vec<u8>, v: u64) {
buf.extend_from_slice(&v.to_be_bytes());
}
fn write_i64(buf: &mut Vec<u8>, v: i64) {
buf.extend_from_slice(&v.to_be_bytes());
}
fn write_bytes(buf: &mut Vec<u8>, data: &[u8]) {
buf.extend_from_slice(data);
}
fn read_u8(data: &[u8], pos: &mut usize) -> Option<u8> {
if *pos >= data.len() {
return None;
}
let v = data[*pos];
*pos += 1;
Some(v)
}
fn read_u16(data: &[u8], pos: &mut usize) -> Option<u16> {
if *pos + 2 > data.len() {
return None;
}
let v = u16::from_be_bytes([data[*pos], data[*pos + 1]]);
*pos += 2;
Some(v)
}
fn read_u32(data: &[u8], pos: &mut usize) -> Option<u32> {
if *pos + 4 > data.len() {
return None;
}
let v = u32::from_be_bytes([data[*pos], data[*pos + 1], data[*pos + 2], data[*pos + 3]]);
*pos += 4;
Some(v)
}
fn read_u64(data: &[u8], pos: &mut usize) -> Option<u64> {
if *pos + 8 > data.len() {
return None;
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[*pos..*pos + 8]);
*pos += 8;
Some(u64::from_be_bytes(bytes))
}
fn read_i64(data: &[u8], pos: &mut usize) -> Option<i64> {
if *pos + 8 > data.len() {
return None;
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[*pos..*pos + 8]);
*pos += 8;
Some(i64::from_be_bytes(bytes))
}
fn read_bytes(data: &[u8], pos: &mut usize, len: usize) -> Option<Vec<u8>> {
if *pos + len > data.len() {
return None;
}
let v = data[*pos..*pos + len].to_vec();
*pos += len;
Some(v)
}
const HEADER_SIZE: usize = 4 + 1 + 8 + 2;
fn encode_header(buf: &mut Vec<u8>, msg_type: u8, id: &NodeId, port: u16) {
write_bytes(buf, MAGIC);
write_u8(buf, msg_type);
write_bytes(buf, id);
write_u16(buf, port);
}
fn decode_header(data: &[u8], pos: &mut usize) -> Option<(u8, NodeId, u16)> {
let magic = read_bytes(data, pos, 4)?;
if magic != MAGIC {
return None;
}
let msg_type = read_u8(data, pos)?;
let id_bytes = read_bytes(data, pos, 8)?;
let mut id = [0u8; 8];
id.copy_from_slice(&id_bytes);
let port = read_u16(data, pos)?;
Some((msg_type, id, port))
}
#[derive(Clone, Debug)]
struct PeerInfo {
addr: SocketAddr,
id: NodeId,
load: u32,
capacity: u32,
peer_count: u16,
fitness: i64,
last_seen: Instant,
}
#[derive(Clone, Debug)]
struct Proposal {
id: u64,
proposer: NodeId,
reason: String,
votes_yes: HashSet<NodeId>,
votes_no: HashSet<NodeId>,
started: Instant,
total_peers_at_start: usize,
committed: bool,
state_bytes: Option<Vec<u8>>,
}
#[derive(Clone, Debug)]
pub struct InboxMessage {
pub from: NodeId,
pub data: Vec<u8>,
}
pub(crate) struct MeshState {
id: NodeId,
port: u16,
peers: HashMap<NodeId, PeerInfo>,
inbox: VecDeque<InboxMessage>,
proposals: HashMap<u64, Proposal>,
last_proposal_time: Option<Instant>,
load: u32,
capacity: u32,
event_log: VecDeque<String>,
pub(crate) goals: GoalRegistry,
fitness: i64,
auto_replicate_needed: bool,
running: bool,
pub(crate) shared_words: Vec<(String, NodeId)>,
pub(crate) word_inbox: VecDeque<SharedWord>,
pub(crate) auto_discover: bool,
pub(crate) auto_share: bool,
pub(crate) auto_spawn: bool,
pub(crate) auto_cull: bool,
pub(crate) min_units: usize,
pub(crate) max_units: usize,
pub(crate) spawn_intent_active: bool,
pub(crate) cull_intent_active: bool,
pub(crate) trust_level: TrustLevel,
pub(crate) pending_requests: Vec<ReplicationRequest>,
pub(crate) replication_log: Vec<ReplicationLogEntry>,
pub(crate) next_request_id: u32,
pub(crate) parent_id: Option<NodeId>,
pub(crate) children_ids: Vec<NodeId>,
pub(crate) sexp_inbox: VecDeque<String>,
}
#[derive(Clone, Debug)]
pub struct SharedWord {
pub name: String,
pub body_source: String,
pub origin: NodeId,
}
#[derive(Clone, Debug, PartialEq)]
pub enum TrustLevel {
All, Mesh, Family, None, }
impl TrustLevel {
pub fn label(&self) -> &str {
match self {
TrustLevel::All => "all",
TrustLevel::Mesh => "mesh",
TrustLevel::Family => "family",
TrustLevel::None => "none",
}
}
pub fn as_val(&self) -> i64 {
match self {
TrustLevel::All => 0,
TrustLevel::Mesh => 1,
TrustLevel::Family => 2,
TrustLevel::None => 3,
}
}
}
#[derive(Clone, Debug)]
pub struct ReplicationRequest {
pub id: u32,
pub sender_id: NodeId,
pub sender_fitness: i64,
pub sender_generation: u32,
pub package_size: u64,
pub reason: String,
pub received_at: Instant,
}
#[derive(Clone, Debug)]
pub struct ReplicationLogEntry {
pub timestamp: u64,
pub direction: String, pub peer_id: NodeId,
pub reason: String,
pub result: String, }
impl MeshState {
fn log_event(&mut self, msg: String) {
self.event_log.push_back(msg);
if self.event_log.len() > 20 {
self.event_log.pop_front();
}
}
}
pub struct MeshNode {
id: NodeId,
id_hex: String,
socket: UdpSocket,
state: Arc<Mutex<MeshState>>,
_thread: Option<thread::JoinHandle<()>>,
repl_rx: Option<std::sync::mpsc::Receiver<Vec<u8>>>,
pub repl_port: u16,
pub external_addr: Option<SocketAddr>,
pub mesh_key: Option<String>,
}
impl Drop for MeshNode {
fn drop(&mut self) {
if let Ok(mut st) = self.state.lock() {
st.running = false;
}
if let Some(handle) = self._thread.take() {
let _ = handle.join();
}
}
}
impl MeshNode {
pub fn start(port: u16, seed_peers: Vec<SocketAddr>) -> Result<Self, String> {
Self::start_with_id(None, port, seed_peers)
}
pub fn start_with_id(
fixed_id: Option<NodeId>,
port: u16,
seed_peers: Vec<SocketAddr>,
) -> Result<Self, String> {
let id = fixed_id.unwrap_or_else(generate_id);
let id_hex = id_to_hex(&id);
let bind_addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
let socket = UdpSocket::bind(bind_addr).map_err(|e| format!("bind: {}", e))?;
let local_port = socket.local_addr().map_err(|e| format!("{}", e))?.port();
let state = Arc::new(Mutex::new(MeshState {
id,
port: local_port,
peers: HashMap::new(),
inbox: VecDeque::new(),
proposals: HashMap::new(),
last_proposal_time: None,
load: 0,
capacity: DEFAULT_CAPACITY,
event_log: VecDeque::new(),
goals: GoalRegistry::new(&id),
fitness: 0,
auto_replicate_needed: false,
running: true,
shared_words: Vec::new(),
word_inbox: VecDeque::new(),
auto_discover: true,
auto_share: false,
auto_spawn: false,
auto_cull: false,
min_units: 1,
max_units: 10,
spawn_intent_active: false,
cull_intent_active: false,
trust_level: TrustLevel::All,
pending_requests: Vec::new(),
replication_log: Vec::new(),
next_request_id: 1,
parent_id: None,
children_ids: Vec::new(),
sexp_inbox: VecDeque::new(),
}));
{
let mut st = state.lock().unwrap();
for addr in &seed_peers {
let pseudo_id = addr_to_pseudo_id(addr);
st.peers.insert(
pseudo_id,
PeerInfo {
addr: *addr,
id: pseudo_id,
load: 0,
capacity: 0,
peer_count: 0,
fitness: 0,
last_seen: Instant::now(),
},
);
}
st.log_event(format!(
"mesh started on port {} with {} seed peers",
local_port,
seed_peers.len()
));
}
let thread_socket = socket.try_clone().map_err(|e| format!("clone: {}", e))?;
let thread_state = Arc::clone(&state);
let thread_id = id;
let handle = thread::spawn(move || {
network_thread(thread_socket, thread_state, thread_id);
});
let repl_tcp_port = if local_port > 0 { local_port + 1000 } else { 0 };
let (repl_rx, actual_repl_port) =
match crate::spawn::start_replication_listener(repl_tcp_port) {
Ok(rx) => (Some(rx), repl_tcp_port),
Err(_) => (None, 0),
};
Self::start_discovery_listener(Arc::clone(&state), id);
Ok(MeshNode {
id,
id_hex,
socket,
state,
_thread: Some(handle),
repl_rx,
repl_port: actual_repl_port,
external_addr: None,
mesh_key: None,
})
}
pub fn id_hex(&self) -> &str {
&self.id_hex
}
pub fn id(&self) -> &NodeId {
&self.id
}
pub fn local_port(&self) -> u16 {
self.socket.local_addr().map(|a| a.port()).unwrap_or(0)
}
pub fn peer_count(&self) -> usize {
let st = self.state.lock().unwrap();
st.peers.len()
}
pub fn load(&self) -> u32 {
self.state.lock().unwrap().load
}
pub fn set_load(&self, load: u32) {
self.state.lock().unwrap().load = load;
}
pub fn capacity(&self) -> u32 {
self.state.lock().unwrap().capacity
}
pub fn format_status(&self) -> String {
let st = self.state.lock().unwrap();
let mut out = String::from("--- mesh status ---\n");
out.push_str(&format!("id: {}\n", id_to_hex(&st.id)));
out.push_str(&format!("port: {}\n", st.port));
out.push_str(&format!("load: {}/{}\n", st.load, st.capacity));
out.push_str(&format!("peers: {}\n", st.peers.len()));
for peer in st.peers.values() {
let age = peer.last_seen.elapsed().as_secs();
out.push_str(&format!(
" {} @ {} load={}/{} seen={}s ago\n",
id_to_hex(&peer.id),
peer.addr,
peer.load,
peer.capacity,
age
));
}
if !st.proposals.is_empty() {
out.push_str("proposals:\n");
for (pid, prop) in &st.proposals {
out.push_str(&format!(
" #{:016x} by {} yes={} no={} committed={}\n",
pid,
id_to_hex(&prop.proposer),
prop.votes_yes.len(),
prop.votes_no.len(),
prop.committed
));
}
}
if !st.event_log.is_empty() {
out.push_str("recent events:\n");
for evt in &st.event_log {
out.push_str(&format!(" {}\n", evt));
}
}
out.push_str("---\n");
out
}
pub fn send_data(&self, data: &[u8]) {
let st = self.state.lock().unwrap();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 2 + data.len());
encode_header(&mut buf, MSG_DATA, &self.id, port);
write_u16(&mut buf, data.len() as u16);
write_bytes(&mut buf, data);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
pub fn recv_data(&self) -> Option<InboxMessage> {
self.state.lock().unwrap().inbox.pop_front()
}
pub fn propose_replicate(&self, reason: &str, state_bytes: Vec<u8>) -> Result<(), String> {
let mut st = self.state.lock().unwrap();
if let Some(last) = st.last_proposal_time {
if last.elapsed() < PROPOSAL_COOLDOWN {
let remaining = PROPOSAL_COOLDOWN - last.elapsed();
return Err(format!("cooldown: wait {}s", remaining.as_secs()));
}
}
for prop in st.proposals.values() {
if prop.proposer == self.id
&& !prop.committed
&& prop.started.elapsed() < PROPOSAL_TIMEOUT
{
return Err("already have an active proposal".into());
}
}
if st.peers.is_empty() {
return Err("no peers to vote".into());
}
let proposal_id = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let total_peers = st.peers.len();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
let proposal = Proposal {
id: proposal_id,
proposer: self.id,
reason: reason.to_string(),
votes_yes: HashSet::new(),
votes_no: HashSet::new(),
started: Instant::now(),
total_peers_at_start: total_peers,
committed: false,
state_bytes: Some(state_bytes),
};
st.proposals.insert(proposal_id, proposal);
st.log_event(format!(
"proposed replication #{:016x}: {}",
proposal_id, reason
));
drop(st);
let reason_bytes = reason.as_bytes();
let mut buf = Vec::with_capacity(HEADER_SIZE + 8 + 2 + reason_bytes.len());
encode_header(&mut buf, MSG_PROPOSE, &self.id, port);
write_u64(&mut buf, proposal_id);
write_u16(&mut buf, reason_bytes.len() as u16);
write_bytes(&mut buf, reason_bytes);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
Ok(())
}
pub fn shutdown(&self) {
if let Ok(mut st) = self.state.lock() {
st.running = false;
}
}
pub fn create_goal(&self, description: &str, priority: Cell, code: Option<String>) -> GoalId {
let mut st = self.state.lock().unwrap();
let goal_id = st
.goals
.create_goal(description.to_string(), priority, self.id, code);
st.log_event(format!("goal #{} created: {}", goal_id, description));
let goal = st.goals.goals.get(&goal_id).cloned();
let tasks: Vec<Task> = goal
.as_ref()
.map(|g| {
g.task_ids
.iter()
.filter_map(|tid| st.goals.tasks.get(tid).cloned())
.collect()
})
.unwrap_or_default();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
if let Some(goal) = goal {
let buf = encode_goal_broadcast(&self.id, port, &goal, &tasks);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
goal_id
}
pub fn claim_task(&self) -> Option<(TaskId, GoalId, String)> {
let mut st = self.state.lock().unwrap();
let result = st.goals.claim_task(self.id);
if let Some((task_id, goal_id, ref desc)) = result {
st.log_event(format!(
"claimed task #{} (goal #{}): {}",
task_id, goal_id, desc
));
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 16);
encode_header(&mut buf, MSG_TASK_CLAIM, &self.id, port);
write_u64(&mut buf, task_id);
write_u64(&mut buf, goal_id);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
return Some((task_id, goal_id, desc.clone()));
}
None
}
pub fn complete_task_with_result(&self, task_id: TaskId, result: TaskResult) {
let mut st = self.state.lock().unwrap();
let goal_id = st.goals.tasks.get(&task_id).map(|t| t.goal_id);
let success = result.success;
st.goals.complete_task(task_id, Some(result.clone()));
st.log_event(format!(
"task #{} {} (stack={} output={}b)",
task_id,
if success { "completed" } else { "failed" },
result.stack_snapshot.len(),
result.output.len(),
));
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let output_bytes = result.output.as_bytes();
let error_bytes = result.error.as_deref().unwrap_or("").as_bytes();
let mut buf = Vec::with_capacity(HEADER_SIZE + 32 + output_bytes.len());
encode_header(&mut buf, MSG_TASK_RESULT, &self.id, port);
write_u64(&mut buf, task_id);
write_u64(&mut buf, goal_id.unwrap_or(0));
write_u8(&mut buf, if success { 1 } else { 0 });
write_u16(&mut buf, result.stack_snapshot.len() as u16);
for &val in &result.stack_snapshot {
write_i64(&mut buf, val);
}
write_u16(&mut buf, output_bytes.len() as u16);
write_bytes(&mut buf, output_bytes);
write_u16(&mut buf, error_bytes.len() as u16);
write_bytes(&mut buf, error_bytes);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
pub fn claim_executable_task(&self) -> Option<(TaskId, GoalId, String, String)> {
let mut st = self.state.lock().unwrap();
let result = st.goals.claim_executable_task(self.id);
if let Some((task_id, goal_id, ref desc, _)) = result {
st.log_event(format!(
"auto-claimed task #{} (goal #{})",
task_id, goal_id
));
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 16);
encode_header(&mut buf, MSG_TASK_CLAIM, &self.id, port);
write_u64(&mut buf, task_id);
write_u64(&mut buf, goal_id);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
return Some((task_id, goal_id, desc.clone(), result.unwrap().3));
}
None
}
pub fn cancel_goal(&self, goal_id: GoalId) -> bool {
let mut st = self.state.lock().unwrap();
let ok = st.goals.cancel_goal(goal_id);
if ok {
st.log_event(format!("goal #{} cancelled", goal_id));
let goal = st.goals.goals.get(&goal_id).cloned();
let tasks: Vec<Task> = goal
.as_ref()
.map(|g| {
g.task_ids
.iter()
.filter_map(|tid| st.goals.tasks.get(tid).cloned())
.collect()
})
.unwrap_or_default();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
if let Some(goal) = goal {
let buf = encode_goal_broadcast(&self.id, port, &goal, &tasks);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
}
ok
}
pub fn steer_goal(&self, goal_id: GoalId, priority: Cell) -> bool {
let mut st = self.state.lock().unwrap();
let ok = st.goals.steer_goal(goal_id, priority);
if ok {
st.log_event(format!("goal #{} priority -> {}", goal_id, priority));
let goal = st.goals.goals.get(&goal_id).cloned();
let tasks: Vec<Task> = goal
.as_ref()
.map(|g| {
g.task_ids
.iter()
.filter_map(|tid| st.goals.tasks.get(tid).cloned())
.collect()
})
.unwrap_or_default();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
if let Some(goal) = goal {
let buf = encode_goal_broadcast(&self.id, port, &goal, &tasks);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
}
ok
}
pub fn format_goals(&self) -> String {
self.state.lock().unwrap().goals.format_goals()
}
pub fn format_tasks(&self) -> String {
self.state.lock().unwrap().goals.format_my_tasks(&self.id)
}
pub fn format_goal_tasks(&self, goal_id: GoalId) -> String {
self.state.lock().unwrap().goals.format_goal_tasks(goal_id)
}
pub fn format_report(&self) -> String {
self.state.lock().unwrap().goals.format_report()
}
pub fn format_task_result(&self, task_id: TaskId) -> String {
self.state.lock().unwrap().goals.format_task_result(task_id)
}
pub fn format_goal_result(&self, goal_id: GoalId) -> String {
self.state.lock().unwrap().goals.format_goal_result(goal_id)
}
pub fn pending_goal_count(&self) -> usize {
self.state.lock().unwrap().goals.active_goal_count()
}
pub fn should_auto_replicate(&self) -> bool {
self.state.lock().unwrap().auto_replicate_needed
}
pub fn clear_auto_replicate(&self) {
self.state.lock().unwrap().auto_replicate_needed = false;
}
pub fn clone_goals(&self) -> GoalRegistry {
self.state.lock().unwrap().goals.clone()
}
pub fn goal_code(&self, goal_id: GoalId) -> Option<String> {
self.state.lock().unwrap().goals.goal_code(goal_id)
}
pub fn id_bytes(&self) -> NodeId {
self.id
}
pub fn set_fitness(&self, score: i64) {
self.state.lock().unwrap().fitness = score;
}
pub fn recv_replication(&self) -> Option<Vec<u8>> {
self.repl_rx.as_ref().and_then(|rx| rx.try_recv().ok())
}
pub(crate) fn state_lock(&self) -> std::sync::MutexGuard<'_, MeshState> {
self.state.lock().unwrap()
}
pub fn peer_fitness_list(&self) -> Vec<PeerFitness> {
let st = self.state.lock().unwrap();
st.peers
.values()
.map(|p| PeerFitness {
id: p.id,
score: p.fitness,
})
.collect()
}
pub fn peer_details(&self) -> Vec<(String, i64, String)> {
let st = self.state.lock().unwrap();
st.peers
.values()
.map(|p| (id_to_hex(&p.id), p.fitness, p.addr.to_string()))
.collect()
}
pub fn my_addr(&self) -> String {
if let Some(addr) = self.external_addr {
return format!("{} (external)", addr);
}
self.socket
.local_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".into())
}
pub fn peer_table(&self) -> Vec<(String, String, i64, u64)> {
let st = self.state.lock().unwrap();
st.peers
.values()
.map(|p| {
let age_secs = p.last_seen.elapsed().as_secs();
(id_to_hex(&p.id), p.addr.to_string(), p.fitness, age_secs)
})
.collect()
}
pub fn mesh_stats(&self) -> (usize, u16) {
let st = self.state.lock().unwrap();
(st.peers.len(), st.port)
}
pub fn connect_peer(&self, addr: SocketAddr) {
let pseudo = addr_to_pseudo_id(&addr);
let mut st = self.state.lock().unwrap();
if let std::collections::hash_map::Entry::Vacant(e) = st.peers.entry(pseudo) {
e.insert(PeerInfo {
addr,
id: pseudo,
load: 0,
capacity: 0,
peer_count: 0,
fitness: 0,
last_seen: Instant::now(),
});
st.log_event(format!("manual connect: {}", addr));
}
}
pub fn disconnect_peer(&self, hex_id: &str) -> bool {
let mut st = self.state.lock().unwrap();
let key = st.peers.keys().find(|k| id_to_hex(k) == hex_id).copied();
if let Some(k) = key {
st.peers.remove(&k);
st.log_event(format!("disconnected {}", hex_id));
true
} else {
false
}
}
pub fn goal_stats(&self) -> (usize, usize, usize, usize) {
let st = self.state.lock().unwrap();
let total = st.goals.goals.len();
let pending = st
.goals
.goals
.values()
.filter(|g| g.status == crate::goals::GoalStatus::Pending)
.count();
let active = st
.goals
.goals
.values()
.filter(|g| g.status == crate::goals::GoalStatus::Active)
.count();
let completed = st
.goals
.goals
.values()
.filter(|g| g.status == crate::goals::GoalStatus::Completed)
.count();
(total, pending, active, completed)
}
pub fn drain_recent_events(&self) -> Vec<String> {
let mut st = self.state.lock().unwrap();
let events: Vec<String> = st.event_log.drain(..).collect();
events.into_iter().rev().take(20).collect()
}
pub fn send_discovery_beacon(&self) {
let st = self.state.lock().unwrap();
if !st.auto_discover {
return;
}
let port = st.port;
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 2);
encode_header(&mut buf, MSG_DISCOVERY_BEACON, &self.id, port);
let broadcast_addr: std::net::SocketAddr =
format!("127.0.0.1:{}", DISCOVERY_PORT).parse().unwrap();
let _ = self.socket.send_to(&buf, broadcast_addr);
}
pub(crate) fn start_discovery_listener(state: Arc<Mutex<MeshState>>, my_id: NodeId) {
let addr = format!("0.0.0.0:{}", DISCOVERY_PORT);
let sock = match UdpSocket::bind(&addr) {
Ok(s) => s,
Err(_) => return, };
sock.set_read_timeout(Some(Duration::from_millis(500))).ok();
std::thread::spawn(move || {
let mut buf = [0u8; 256];
loop {
{
let st = state.lock().unwrap();
if !st.running {
return;
}
}
if let Ok((len, src)) = sock.recv_from(&mut buf) {
if len < HEADER_SIZE {
continue;
}
if &buf[0..4] != MAGIC {
continue;
}
let msg_type = buf[4];
if msg_type != MSG_DISCOVERY_BEACON {
continue;
}
let mut sender_id = [0u8; 8];
sender_id.copy_from_slice(&buf[5..13]);
if sender_id == my_id {
continue;
} let sender_port = u16::from_be_bytes([buf[13], buf[14]]);
let peer_addr: std::net::SocketAddr =
format!("{}:{}", src.ip(), sender_port).parse().unwrap();
let mut st = state.lock().unwrap();
if let std::collections::hash_map::Entry::Vacant(e) = st.peers.entry(sender_id)
{
e.insert(PeerInfo {
addr: peer_addr,
id: sender_id,
load: 0,
capacity: 0,
peer_count: 0,
fitness: 0,
last_seen: Instant::now(),
});
st.log_event(format!(
"discovered {} via beacon @ {}",
id_to_hex(&sender_id),
peer_addr
));
}
}
}
});
}
pub fn share_word(&self, name: &str, source: &str) {
let st = self.state.lock().unwrap();
let port = st.port;
let peers: Vec<std::net::SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 4 + name.len() + source.len());
encode_header(&mut buf, MSG_WORD_SHARE, &self.id, port);
let nb = name.as_bytes();
write_u16(&mut buf, nb.len() as u16);
write_bytes(&mut buf, nb);
let sb = source.as_bytes();
write_u16(&mut buf, sb.len() as u16);
write_bytes(&mut buf, sb);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
let sexp = crate::sexp::msg_word_share(name, source, &self.id);
self.send_sexp(&sexp.to_string());
}
pub fn send_sexp(&self, sexp_str: &str) {
let st = self.state.lock().unwrap();
let port = st.port;
let peers: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let sb = sexp_str.as_bytes();
let mut buf = Vec::with_capacity(HEADER_SIZE + 2 + sb.len());
encode_header(&mut buf, MSG_SEXP, &self.id, port);
write_u16(&mut buf, sb.len() as u16);
write_bytes(&mut buf, sb);
for addr in &peers {
let _ = self.socket.send_to(&buf, addr);
}
}
pub fn recv_sexp_messages(&self) -> Vec<String> {
let mut st = self.state.lock().unwrap();
st.sexp_inbox.drain(..).collect()
}
pub fn recv_shared_words(&self) -> Vec<SharedWord> {
let mut st = self.state.lock().unwrap();
st.word_inbox.drain(..).collect()
}
pub fn shared_words_list(&self) -> Vec<(String, String)> {
let st = self.state.lock().unwrap();
st.shared_words
.iter()
.map(|(name, origin)| (name.clone(), id_to_hex(origin)))
.collect()
}
pub fn should_auto_spawn(&self) -> bool {
let st = self.state.lock().unwrap();
if !st.auto_spawn {
return false;
}
let units = st.peers.len() + 1;
if units >= st.max_units {
return false;
}
let pending = st.goals.pending_task_count();
pending > units
}
pub fn should_auto_cull(&self) -> bool {
let st = self.state.lock().unwrap();
if !st.auto_cull {
return false;
}
let units = st.peers.len() + 1;
if units <= st.min_units {
return false;
}
let my_fitness = st.fitness;
let total: i64 = st.peers.values().map(|p| p.fitness).sum::<i64>() + my_fitness;
let avg = total / units as i64;
my_fitness < avg - 10 }
pub fn format_swarm_status(&self) -> String {
let st = self.state.lock().unwrap();
let units = st.peers.len() + 1;
format!(
"--- swarm status ---\n\
units: {}/{}-{}\n\
auto-discover: {} auto-share: {} auto-spawn: {} auto-cull: {}\n\
shared words: {} pending word inbox: {}\n\
---\n",
units,
st.min_units,
st.max_units,
if st.auto_discover { "ON" } else { "OFF" },
if st.auto_share { "ON" } else { "OFF" },
if st.auto_spawn { "ON" } else { "OFF" },
if st.auto_cull { "ON" } else { "OFF" },
st.shared_words.len(),
st.word_inbox.len(),
)
}
pub fn set_trust_level(&self, level: TrustLevel) {
self.state.lock().unwrap().trust_level = level;
}
pub fn trust_level(&self) -> TrustLevel {
self.state.lock().unwrap().trust_level.clone()
}
pub fn should_auto_accept(&self, sender: &NodeId) -> bool {
let st = self.state.lock().unwrap();
match st.trust_level {
TrustLevel::All => true,
TrustLevel::Mesh => st.peers.contains_key(sender),
TrustLevel::Family => {
st.parent_id.as_ref() == Some(sender) || st.children_ids.contains(sender)
}
TrustLevel::None => false,
}
}
pub fn queue_request(
&self,
sender: NodeId,
fitness: i64,
generation: u32,
size: u64,
reason: String,
) -> u32 {
let mut st = self.state.lock().unwrap();
let from_peer = st
.pending_requests
.iter()
.filter(|r| r.sender_id == sender)
.count();
if from_peer >= 3 {
return 0;
}
let id = st.next_request_id;
st.next_request_id += 1;
st.pending_requests.push(ReplicationRequest {
id,
sender_id: sender,
sender_fitness: fitness,
sender_generation: generation,
package_size: size,
reason,
received_at: Instant::now(),
});
id
}
pub fn accept_oldest(&self) -> Option<(NodeId, u32)> {
let mut st = self.state.lock().unwrap();
if st.pending_requests.is_empty() {
return None;
}
let req = st.pending_requests.remove(0);
self.log_replication(&mut st, "incoming", &req.sender_id, &req.reason, "accepted");
Some((req.sender_id, req.id))
}
pub fn deny_oldest(&self) -> Option<u32> {
let mut st = self.state.lock().unwrap();
if st.pending_requests.is_empty() {
return None;
}
let req = st.pending_requests.remove(0);
self.log_replication(&mut st, "incoming", &req.sender_id, &req.reason, "denied");
Some(req.id)
}
pub fn deny_all_requests(&self) -> usize {
let mut st = self.state.lock().unwrap();
let count = st.pending_requests.len();
let entries: Vec<ReplicationLogEntry> = st
.pending_requests
.iter()
.map(|req| ReplicationLogEntry {
timestamp: now_secs(),
direction: "incoming".into(),
peer_id: req.sender_id,
reason: req.reason.clone(),
result: "denied".into(),
})
.collect();
for entry in entries {
st.replication_log.push(entry);
}
st.pending_requests.clear();
count
}
pub fn expire_requests(&self) {
let mut st = self.state.lock().unwrap();
let before = st.pending_requests.len();
st.pending_requests
.retain(|r| r.received_at.elapsed().as_secs() < 60);
let expired = before - st.pending_requests.len();
if expired > 0 {
st.log_event(format!("{} replication request(s) expired", expired));
}
}
pub fn format_requests(&self) -> String {
let st = self.state.lock().unwrap();
if st.pending_requests.is_empty() {
return " (no pending requests)\n".to_string();
}
let mut out = String::new();
for r in &st.pending_requests {
let age = r.received_at.elapsed().as_secs();
out.push_str(&format!(
" #{} from {} gen={} fitness={} size={}KB age={}s: {}\n",
r.id,
id_to_hex(&r.sender_id),
r.sender_generation,
r.sender_fitness,
r.package_size / 1024,
age,
r.reason
));
}
out
}
pub fn format_replication_log(&self) -> String {
let st = self.state.lock().unwrap();
if st.replication_log.is_empty() {
return " (no replication history)\n".to_string();
}
let mut out = String::new();
for e in st.replication_log.iter().rev().take(20) {
out.push_str(&format!(
" {} {} {} [{}]: {}\n",
e.timestamp,
e.direction,
id_to_hex(&e.peer_id),
e.result,
e.reason
));
}
out
}
fn log_replication(
&self,
st: &mut MeshState,
direction: &str,
peer: &NodeId,
reason: &str,
result: &str,
) {
st.replication_log.push(ReplicationLogEntry {
timestamp: now_secs(),
direction: direction.into(),
peer_id: *peer,
reason: reason.into(),
result: result.into(),
});
if st.replication_log.len() > 100 {
st.replication_log.remove(0);
}
}
}
fn now_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
fn network_thread(socket: UdpSocket, state: Arc<Mutex<MeshState>>, my_id: NodeId) {
let _ = socket.set_read_timeout(Some(RECV_TIMEOUT));
let mut recv_buf = [0u8; 65535];
let mut last_heartbeat = Instant::now() - HEARTBEAT_INTERVAL;
loop {
{
let st = state.lock().unwrap();
if !st.running {
return;
}
}
if let Ok((len, src_addr)) = socket.recv_from(&mut recv_buf) {
let packet = &recv_buf[..len];
handle_packet(packet, src_addr, &socket, &state, &my_id);
}
if last_heartbeat.elapsed() >= HEARTBEAT_INTERVAL {
send_heartbeat(&socket, &state, &my_id);
last_heartbeat = Instant::now();
}
check_proposals(&socket, &state, &my_id);
prune_peers(&state);
check_auto_replication(&state);
if last_heartbeat.elapsed() >= DISCOVERY_INTERVAL {
let st = state.lock().unwrap();
if st.auto_discover {
let port = st.port;
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE);
encode_header(&mut buf, MSG_DISCOVERY_BEACON, &my_id, port);
let _ = socket.send_to(&buf, format!("127.0.0.1:{}", DISCOVERY_PORT));
}
}
}
}
fn send_heartbeat(socket: &UdpSocket, state: &Arc<Mutex<MeshState>>, my_id: &NodeId) {
let st = state.lock().unwrap();
let port = st.port;
let load = st.load;
let capacity = st.capacity;
let peer_count = st.peers.len() as u16;
let gossip_addrs: Vec<SocketAddr> = st
.peers
.values()
.take(MAX_GOSSIP_PEERS)
.map(|p| p.addr)
.collect();
let fitness = st.fitness;
let all_peer_addrs: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 19 + gossip_addrs.len() * 6);
encode_header(&mut buf, MSG_HEARTBEAT, my_id, port);
write_u32(&mut buf, load);
write_u32(&mut buf, capacity);
write_u16(&mut buf, peer_count);
write_u8(&mut buf, gossip_addrs.len() as u8);
for addr in &gossip_addrs {
if let SocketAddr::V4(v4) = addr {
write_bytes(&mut buf, &v4.ip().octets());
write_u16(&mut buf, v4.port());
}
}
write_i64(&mut buf, fitness);
for addr in &all_peer_addrs {
let _ = socket.send_to(&buf, addr);
}
let sexp = crate::sexp::msg_peer_status(my_id, peer_count as usize, fitness, load, capacity);
let sexp_str = sexp.to_string();
let sb = sexp_str.as_bytes();
let mut sexp_buf = Vec::with_capacity(HEADER_SIZE + 2 + sb.len());
encode_header(&mut sexp_buf, MSG_SEXP, my_id, port);
write_u16(&mut sexp_buf, sb.len() as u16);
write_bytes(&mut sexp_buf, sb);
for addr in &all_peer_addrs {
let _ = socket.send_to(&sexp_buf, addr);
}
}
fn handle_packet(
data: &[u8],
src_addr: SocketAddr,
socket: &UdpSocket,
state: &Arc<Mutex<MeshState>>,
my_id: &NodeId,
) {
let mut pos = 0;
let (msg_type, sender_id, sender_port) = match decode_header(data, &mut pos) {
Some(h) => h,
None => return, };
if sender_id == *my_id {
return;
}
let sender_addr = SocketAddr::new(src_addr.ip(), sender_port);
match msg_type {
MSG_HEARTBEAT => {
handle_heartbeat(data, &mut pos, sender_id, sender_addr, socket, state, my_id)
}
MSG_PROPOSE => handle_propose(data, &mut pos, sender_id, sender_addr, socket, state, my_id),
MSG_VOTE => handle_vote(data, &mut pos, sender_id, socket, state, my_id),
MSG_COMMIT => handle_commit(data, &mut pos, sender_id, state),
MSG_REJECT => handle_reject(data, &mut pos, sender_id, state),
MSG_DATA => handle_data(data, &mut pos, sender_id, state),
MSG_GOAL_BROADCAST => handle_goal_broadcast(data, &mut pos, sender_id, state),
MSG_TASK_CLAIM => handle_task_claim(data, &mut pos, sender_id, state),
MSG_TASK_RESULT => handle_task_result(data, &mut pos, sender_id, state),
MSG_WORD_SHARE => handle_word_share(data, &mut pos, sender_id, state),
MSG_DISCOVERY_BEACON => {} MSG_SPAWN_INTENT => {
let mut st = state.lock().unwrap();
st.spawn_intent_active = true;
st.log_event(format!("spawn intent from {}", id_to_hex(&sender_id)));
}
MSG_CULL_INTENT => {
let mut st = state.lock().unwrap();
st.cull_intent_active = true;
st.log_event(format!("cull intent from {}", id_to_hex(&sender_id)));
}
MSG_REPLICATE_REQUEST => {
handle_replicate_request(data, &mut pos, sender_id, state, socket, my_id);
}
MSG_REPLICATE_ACCEPT | MSG_REPLICATE_DENY => {
let mut st = state.lock().unwrap();
let result = if msg_type == MSG_REPLICATE_ACCEPT {
"accepted"
} else {
"denied"
};
st.log_event(format!(
"replication {} by {}",
result,
id_to_hex(&sender_id)
));
}
MSG_SEXP => {
handle_sexp(data, &mut pos, sender_id, state);
}
_ => {} }
}
fn handle_heartbeat(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
sender_addr: SocketAddr,
socket: &UdpSocket,
state: &Arc<Mutex<MeshState>>,
my_id: &NodeId,
) {
let load = match read_u32(data, pos) {
Some(v) => v,
None => return,
};
let capacity = match read_u32(data, pos) {
Some(v) => v,
None => return,
};
let peer_count = match read_u16(data, pos) {
Some(v) => v,
None => return,
};
let gossip_count = match read_u8(data, pos) {
Some(v) => v as usize,
None => return,
};
let mut gossip_addrs = Vec::with_capacity(gossip_count);
for _ in 0..gossip_count {
let ip_bytes = match read_bytes(data, pos, 4) {
Some(b) => b,
None => break,
};
let port = match read_u16(data, pos) {
Some(v) => v,
None => break,
};
let ip = Ipv4Addr::new(ip_bytes[0], ip_bytes[1], ip_bytes[2], ip_bytes[3]);
gossip_addrs.push(SocketAddr::V4(SocketAddrV4::new(ip, port)));
}
let mut st = state.lock().unwrap();
let my_port = st.port;
let pseudo = addr_to_pseudo_id(&sender_addr);
if pseudo != sender_id {
st.peers.remove(&pseudo);
}
let is_new = !st.peers.contains_key(&sender_id);
st.peers.insert(
sender_id,
PeerInfo {
addr: sender_addr,
id: sender_id,
load,
capacity,
peer_count,
fitness: 0, last_seen: Instant::now(),
},
);
if is_new {
st.log_event(format!(
"discovered peer {} @ {}",
id_to_hex(&sender_id),
sender_addr
));
let port = st.port;
let active_goals: Vec<(Goal, Vec<Task>)> = st
.goals
.goals
.values()
.filter(|g| g.status != GoalStatus::Failed)
.map(|g| {
let tasks: Vec<Task> = g
.task_ids
.iter()
.filter_map(|tid| st.goals.tasks.get(tid).cloned())
.collect();
(g.clone(), tasks)
})
.collect();
for (goal, tasks) in &active_goals {
let buf = encode_goal_broadcast(my_id, port, goal, tasks);
let _ = socket.send_to(&buf, sender_addr);
}
}
for addr in gossip_addrs {
if addr.port() == my_port && addr.ip().is_loopback() {
continue;
}
let known = st.peers.values().any(|p| p.addr == addr);
if !known {
let pseudo_id = addr_to_pseudo_id(&addr);
st.peers.insert(
pseudo_id,
PeerInfo {
addr,
id: pseudo_id,
load: 0,
capacity: 0,
peer_count: 0,
fitness: 0,
last_seen: Instant::now(),
},
);
st.log_event(format!("gossip: discovered peer at {}", addr));
}
}
if let Some(fitness) = read_i64(data, pos) {
if let Some(peer) = st.peers.get_mut(&sender_id) {
peer.fitness = fitness;
}
}
}
fn handle_propose(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
sender_addr: SocketAddr,
socket: &UdpSocket,
state: &Arc<Mutex<MeshState>>,
my_id: &NodeId,
) {
let proposal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let reason_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let reason_bytes = match read_bytes(data, pos, reason_len) {
Some(v) => v,
None => return,
};
let reason = String::from_utf8_lossy(&reason_bytes).to_string();
let mut st = state.lock().unwrap();
st.log_event(format!(
"proposal #{:016x} from {}: {}",
proposal_id,
id_to_hex(&sender_id),
reason
));
let vote = st.load < (st.capacity * 80 / 100);
let port = st.port;
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE + 9);
encode_header(&mut buf, MSG_VOTE, my_id, port);
write_u64(&mut buf, proposal_id);
write_u8(&mut buf, if vote { 1 } else { 0 });
let _ = socket.send_to(&buf, sender_addr);
}
fn handle_vote(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
_socket: &UdpSocket,
state: &Arc<Mutex<MeshState>>,
_my_id: &NodeId,
) {
let proposal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let vote_val = match read_u8(data, pos) {
Some(v) => v,
None => return,
};
let mut st = state.lock().unwrap();
let mut yes_count = 0;
let mut no_count = 0;
if let Some(prop) = st.proposals.get_mut(&proposal_id) {
if vote_val == 1 {
prop.votes_yes.insert(sender_id);
} else {
prop.votes_no.insert(sender_id);
}
yes_count = prop.votes_yes.len();
no_count = prop.votes_no.len();
}
st.log_event(format!(
"vote {} from {} on #{:016x} (yes={}, no={})",
if vote_val == 1 { "YES" } else { "NO" },
id_to_hex(&sender_id),
proposal_id,
yes_count,
no_count,
));
}
fn handle_commit(data: &[u8], pos: &mut usize, sender_id: NodeId, state: &Arc<Mutex<MeshState>>) {
let proposal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let state_len = match read_u32(data, pos) {
Some(v) => v as usize,
None => return,
};
let state_bytes = match read_bytes(data, pos, state_len) {
Some(v) => v,
None => return,
};
let mut st = state.lock().unwrap();
st.log_event(format!(
"commit #{:016x} from {} ({} bytes of state)",
proposal_id,
id_to_hex(&sender_id),
state_bytes.len()
));
st.inbox.push_back(InboxMessage {
from: sender_id,
data: state_bytes,
});
}
fn handle_reject(data: &[u8], pos: &mut usize, sender_id: NodeId, state: &Arc<Mutex<MeshState>>) {
let proposal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let mut st = state.lock().unwrap();
st.log_event(format!(
"reject #{:016x} from {}",
proposal_id,
id_to_hex(&sender_id)
));
st.proposals.remove(&proposal_id);
}
fn handle_data(data: &[u8], pos: &mut usize, sender_id: NodeId, state: &Arc<Mutex<MeshState>>) {
let data_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let payload = match read_bytes(data, pos, data_len) {
Some(v) => v,
None => return,
};
let mut st = state.lock().unwrap();
st.inbox.push_back(InboxMessage {
from: sender_id,
data: payload,
});
}
fn encode_goal_broadcast(my_id: &NodeId, port: u16, goal: &Goal, tasks: &[Task]) -> Vec<u8> {
let mut buf = Vec::with_capacity(256);
encode_header(&mut buf, MSG_GOAL_BROADCAST, my_id, port);
write_u64(&mut buf, goal.id);
write_i64(&mut buf, goal.priority);
write_u8(&mut buf, goal.status.as_u8());
write_bytes(&mut buf, &goal.creator);
write_u64(&mut buf, goal.created_at);
let desc = goal.description.as_bytes();
write_u16(&mut buf, desc.len() as u16);
write_bytes(&mut buf, desc);
if let Some(ref code) = goal.code {
let cb = code.as_bytes();
write_u8(&mut buf, 1);
write_u16(&mut buf, cb.len() as u16);
write_bytes(&mut buf, cb);
} else {
write_u8(&mut buf, 0);
}
write_u16(&mut buf, tasks.len() as u16);
for task in tasks {
write_u64(&mut buf, task.id);
write_u8(&mut buf, task.status.as_u8());
if let Some(ref assignee) = task.assigned_to {
write_u8(&mut buf, 1);
write_bytes(&mut buf, assignee);
} else {
write_u8(&mut buf, 0);
}
let tdesc = task.description.as_bytes();
write_u16(&mut buf, tdesc.len() as u16);
write_bytes(&mut buf, tdesc);
if let Some(ref result) = task.result {
write_u8(&mut buf, 1);
write_u8(&mut buf, if result.success { 1 } else { 0 });
write_u16(&mut buf, result.stack_snapshot.len() as u16);
for &val in &result.stack_snapshot {
write_i64(&mut buf, val);
}
let ob = result.output.as_bytes();
write_u16(&mut buf, ob.len() as u16);
write_bytes(&mut buf, ob);
let eb = result.error.as_deref().unwrap_or("").as_bytes();
write_u16(&mut buf, eb.len() as u16);
write_bytes(&mut buf, eb);
} else {
write_u8(&mut buf, 0);
}
}
buf
}
fn handle_goal_broadcast(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
state: &Arc<Mutex<MeshState>>,
) {
let goal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let priority = match read_i64(data, pos) {
Some(v) => v,
None => return,
};
let status = match read_u8(data, pos) {
Some(v) => GoalStatus::from_u8(v),
None => return,
};
let creator = match read_bytes(data, pos, 8) {
Some(v) => {
let mut id = [0u8; 8];
id.copy_from_slice(&v);
id
}
None => return,
};
let created_at = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let desc_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let desc_bytes = match read_bytes(data, pos, desc_len) {
Some(v) => v,
None => return,
};
let description = String::from_utf8_lossy(&desc_bytes).to_string();
let has_code = match read_u8(data, pos) {
Some(v) => v != 0,
None => return,
};
let code = if has_code {
let clen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let cb = match read_bytes(data, pos, clen) {
Some(v) => v,
None => return,
};
Some(String::from_utf8_lossy(&cb).to_string())
} else {
None
};
let task_count = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let mut task_ids = Vec::with_capacity(task_count);
let mut tasks = Vec::with_capacity(task_count);
for _ in 0..task_count {
let task_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let task_status = match read_u8(data, pos) {
Some(v) => TaskStatus::from_u8(v),
None => return,
};
let has_assignee = match read_u8(data, pos) {
Some(v) => v != 0,
None => return,
};
let assigned_to = if has_assignee {
match read_bytes(data, pos, 8) {
Some(v) => {
let mut id = [0u8; 8];
id.copy_from_slice(&v);
Some(id)
}
None => return,
}
} else {
None
};
let tdesc_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let tdesc_bytes = match read_bytes(data, pos, tdesc_len) {
Some(v) => v,
None => return,
};
let task_desc = String::from_utf8_lossy(&tdesc_bytes).to_string();
let has_result = match read_u8(data, pos) {
Some(v) => v != 0,
None => return,
};
let result = if has_result {
let success = match read_u8(data, pos) {
Some(v) => v != 0,
None => return,
};
let slen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let mut stack_snapshot = Vec::with_capacity(slen);
for _ in 0..slen {
match read_i64(data, pos) {
Some(v) => stack_snapshot.push(v),
None => return,
}
}
let olen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let output = match read_bytes(data, pos, olen) {
Some(v) => String::from_utf8_lossy(&v).to_string(),
None => return,
};
let elen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let error = if elen > 0 {
match read_bytes(data, pos, elen) {
Some(v) => Some(String::from_utf8_lossy(&v).to_string()),
None => return,
}
} else {
None
};
Some(TaskResult {
stack_snapshot,
output,
success,
error,
})
} else {
None
};
task_ids.push(task_id);
tasks.push(Task {
id: task_id,
goal_id,
description: task_desc,
code: None, assigned_to,
status: task_status,
result,
created_at,
});
}
let goal = Goal {
id: goal_id,
description,
code,
priority,
status,
created_at,
creator,
task_ids,
};
let mut st = state.lock().unwrap();
let is_new = !st.goals.goals.contains_key(&goal_id);
st.goals.merge_goal(goal);
for task in tasks {
st.goals.merge_task(task);
}
if is_new {
let desc = st
.goals
.goals
.get(&goal_id)
.map(|g| g.description.clone())
.unwrap_or_else(|| "?".to_string());
st.log_event(format!(
"received goal #{} from {}: {}",
goal_id,
id_to_hex(&sender_id),
desc
));
}
}
fn handle_task_claim(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
state: &Arc<Mutex<MeshState>>,
) {
let task_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let _goal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let mut st = state.lock().unwrap();
if let Some(task) = st.goals.tasks.get_mut(&task_id) {
if task.status == TaskStatus::Waiting {
task.assigned_to = Some(sender_id);
task.status = TaskStatus::Running;
let gid = task.goal_id;
if let Some(goal) = st.goals.goals.get_mut(&gid) {
if goal.status == GoalStatus::Pending {
goal.status = GoalStatus::Active;
}
}
st.log_event(format!(
"peer {} claimed task #{}",
id_to_hex(&sender_id),
task_id
));
}
}
}
fn handle_task_result(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
state: &Arc<Mutex<MeshState>>,
) {
let task_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let _goal_id = match read_u64(data, pos) {
Some(v) => v,
None => return,
};
let success = match read_u8(data, pos) {
Some(v) => v != 0,
None => return,
};
let slen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let mut stack_snapshot = Vec::with_capacity(slen);
for _ in 0..slen {
match read_i64(data, pos) {
Some(v) => stack_snapshot.push(v),
None => return,
}
}
let olen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let output = match read_bytes(data, pos, olen) {
Some(v) => String::from_utf8_lossy(&v).to_string(),
None => return,
};
let elen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let error = if elen > 0 {
match read_bytes(data, pos, elen) {
Some(v) => Some(String::from_utf8_lossy(&v).to_string()),
None => return,
}
} else {
None
};
let result = TaskResult {
stack_snapshot,
output,
success,
error,
};
let mut st = state.lock().unwrap();
st.goals.complete_task(task_id, Some(result));
st.log_event(format!(
"task #{} result from {}",
task_id,
id_to_hex(&sender_id)
));
}
fn handle_replicate_request(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
state: &Arc<Mutex<MeshState>>,
socket: &UdpSocket,
my_id: &NodeId,
) {
let fitness = read_i64(data, pos).unwrap_or(0);
let generation = read_u32(data, pos).unwrap_or(0);
let package_size = read_u64(data, pos).unwrap_or(0);
let reason_len = read_u16(data, pos).unwrap_or(0) as usize;
let reason = read_bytes(data, pos, reason_len)
.map(|b| String::from_utf8_lossy(&b).to_string())
.unwrap_or_default();
if package_size > 100_000_000 {
let mut st = state.lock().unwrap();
st.log_event(format!(
"rejected oversized replication from {} ({}MB)",
id_to_hex(&sender_id),
package_size / 1_000_000
));
return;
}
let mut st = state.lock().unwrap();
let auto_accept = match st.trust_level {
TrustLevel::All => true,
TrustLevel::Mesh => st.peers.contains_key(&sender_id),
TrustLevel::Family => {
st.parent_id.as_ref() == Some(&sender_id) || st.children_ids.contains(&sender_id)
}
TrustLevel::None => false,
};
if auto_accept {
let trust_label = st.trust_level.label().to_string();
st.log_event(format!(
"[auto-accepted from {} (trust: {})]",
id_to_hex(&sender_id),
trust_label
));
let port = st.port;
drop(st);
let mut buf = Vec::with_capacity(HEADER_SIZE);
encode_header(&mut buf, MSG_REPLICATE_ACCEPT, my_id, port);
if let Some(peer) = state.lock().unwrap().peers.get(&sender_id) {
let _ = socket.send_to(&buf, peer.addr);
}
} else {
let from_peer = st
.pending_requests
.iter()
.filter(|r| r.sender_id == sender_id)
.count();
if from_peer >= 3 {
st.log_event(format!(
"rate-limited replication from {}",
id_to_hex(&sender_id)
));
return;
}
let rid = st.next_request_id;
st.next_request_id += 1;
st.pending_requests.push(ReplicationRequest {
id: rid,
sender_id,
sender_fitness: fitness,
sender_generation: generation,
package_size,
reason: reason.clone(),
received_at: Instant::now(),
});
st.log_event(format!(
"[replication request #{} from {} (gen {}, fitness {}, {}KB) — ACCEPT or DENY]",
rid,
id_to_hex(&sender_id),
generation,
fitness,
package_size / 1024
));
}
}
fn handle_word_share(
data: &[u8],
pos: &mut usize,
sender_id: NodeId,
state: &Arc<Mutex<MeshState>>,
) {
let name_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let name_bytes = match read_bytes(data, pos, name_len) {
Some(v) => v,
None => return,
};
let name = String::from_utf8_lossy(&name_bytes).to_string();
let source_len = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let source_bytes = match read_bytes(data, pos, source_len) {
Some(v) => v,
None => return,
};
let source = String::from_utf8_lossy(&source_bytes).to_string();
let mut st = state.lock().unwrap();
if st.shared_words.iter().any(|(n, _)| n == &name) {
return;
}
st.word_inbox.push_back(SharedWord {
name: name.clone(),
body_source: source,
origin: sender_id,
});
st.shared_words.push((name.clone(), sender_id));
st.log_event(format!(
"received word '{}' from {}",
name,
id_to_hex(&sender_id)
));
}
fn handle_sexp(data: &[u8], pos: &mut usize, sender_id: NodeId, state: &Arc<Mutex<MeshState>>) {
let slen = match read_u16(data, pos) {
Some(v) => v as usize,
None => return,
};
let sbytes = match read_bytes(data, pos, slen) {
Some(v) => v,
None => return,
};
let msg = String::from_utf8_lossy(&sbytes).to_string();
let mut st = state.lock().unwrap();
st.sexp_inbox.push_back(msg.clone());
st.log_event(format!(
"sexp from {}: {}",
id_to_hex(&sender_id),
if msg.len() > 60 {
format!("{}...", &msg[..60])
} else {
msg
}
));
}
fn check_auto_replication(state: &Arc<Mutex<MeshState>>) {
let mut st = state.lock().unwrap();
if st.auto_replicate_needed {
return; }
let pending = st.goals.pending_task_count();
let units = st.peers.len() + 1; if pending > units {
if let Some(last) = st.last_proposal_time {
if last.elapsed() < PROPOSAL_COOLDOWN {
return;
}
}
let has_active = st
.proposals
.values()
.any(|p| !p.committed && p.started.elapsed() < PROPOSAL_TIMEOUT);
if !has_active {
st.auto_replicate_needed = true;
st.log_event(format!(
"auto-replicate triggered: {} pending tasks, {} units",
pending, units
));
}
}
}
fn check_proposals(socket: &UdpSocket, state: &Arc<Mutex<MeshState>>, my_id: &NodeId) {
let mut st = state.lock().unwrap();
let port = st.port;
let proposal_ids: Vec<u64> = st.proposals.keys().cloned().collect();
let peer_addrs: Vec<SocketAddr> = st.peers.values().map(|p| p.addr).collect();
for pid in proposal_ids {
let prop = match st.proposals.get(&pid) {
Some(p) => p.clone(),
None => continue,
};
if prop.committed {
continue;
}
if prop.proposer != *my_id {
if prop.started.elapsed() > PROPOSAL_TIMEOUT * 2 {
st.proposals.remove(&pid);
}
continue;
}
let total = prop.total_peers_at_start;
let quorum = total / 2 + 1;
if prop.votes_yes.len() >= quorum {
st.log_event(format!(
"quorum reached for #{:016x} ({}/{})",
pid,
prop.votes_yes.len(),
total
));
if let Some(p) = st.proposals.get_mut(&pid) {
p.committed = true;
}
st.last_proposal_time = Some(Instant::now());
if let Some(state_bytes) = &prop.state_bytes {
let mut buf = Vec::with_capacity(HEADER_SIZE + 8 + 4 + state_bytes.len());
encode_header(&mut buf, MSG_COMMIT, my_id, port);
write_u64(&mut buf, pid);
write_u32(&mut buf, state_bytes.len() as u32);
write_bytes(&mut buf, state_bytes);
for addr in &peer_addrs {
let _ = socket.send_to(&buf, addr);
}
}
continue;
}
if prop.votes_no.len() > total / 2 {
st.log_event(format!(
"proposal #{:016x} rejected ({} NO votes)",
pid,
prop.votes_no.len()
));
st.proposals.remove(&pid);
st.last_proposal_time = Some(Instant::now());
let mut buf = Vec::with_capacity(HEADER_SIZE + 8);
encode_header(&mut buf, MSG_REJECT, my_id, port);
write_u64(&mut buf, pid);
for addr in &peer_addrs {
let _ = socket.send_to(&buf, addr);
}
continue;
}
if prop.started.elapsed() > PROPOSAL_TIMEOUT {
st.log_event(format!(
"proposal #{:016x} timed out (yes={}, no={}, needed={})",
pid,
prop.votes_yes.len(),
prop.votes_no.len(),
quorum
));
st.proposals.remove(&pid);
st.last_proposal_time = Some(Instant::now());
let mut buf = Vec::with_capacity(HEADER_SIZE + 8);
encode_header(&mut buf, MSG_REJECT, my_id, port);
write_u64(&mut buf, pid);
for addr in &peer_addrs {
let _ = socket.send_to(&buf, addr);
}
}
}
}
fn prune_peers(state: &Arc<Mutex<MeshState>>) {
let mut st = state.lock().unwrap();
let stale: Vec<NodeId> = st
.peers
.iter()
.filter(|(_, p)| p.last_seen.elapsed() > PEER_TIMEOUT)
.map(|(id, _)| *id)
.collect();
for id in &stale {
st.peers.remove(id);
st.log_event(format!("peer {} timed out", id_to_hex(id)));
}
}
fn addr_to_pseudo_id(addr: &SocketAddr) -> NodeId {
let mut id = [0xFFu8; 8];
let port_bytes = addr.port().to_be_bytes();
id[0] = port_bytes[0];
id[1] = port_bytes[1];
if let SocketAddr::V4(v4) = addr {
let octets = v4.ip().octets();
id[2] = octets[0];
id[3] = octets[1];
id[4] = octets[2];
id[5] = octets[3];
}
id
}
pub fn serialize_state(
dictionary: &[Entry],
memory: &[Cell],
here: usize,
goals: Option<&GoalRegistry>,
) -> Vec<u8> {
let mut buf = Vec::with_capacity(4096);
write_bytes(&mut buf, MAGIC);
write_u8(&mut buf, 1);
write_u32(&mut buf, dictionary.len() as u32);
for entry in dictionary {
let name_bytes = entry.name.as_bytes();
write_u16(&mut buf, name_bytes.len() as u16);
write_bytes(&mut buf, name_bytes);
let flags = (if entry.immediate { 1u8 } else { 0 }) | (if entry.hidden { 2u8 } else { 0 });
write_u8(&mut buf, flags);
write_u32(&mut buf, entry.body.len() as u32);
for instr in &entry.body {
serialize_instruction(&mut buf, instr);
}
}
let mem_cells = here.min(memory.len());
write_u32(&mut buf, here as u32);
write_u32(&mut buf, mem_cells as u32);
for cell in memory.iter().take(mem_cells) {
write_i64(&mut buf, *cell);
}
if let Some(registry) = goals {
let goal_list: Vec<&Goal> = registry.goals.values().collect();
write_u32(&mut buf, goal_list.len() as u32);
for goal in &goal_list {
write_u64(&mut buf, goal.id);
write_i64(&mut buf, goal.priority);
write_u8(&mut buf, goal.status.as_u8());
write_bytes(&mut buf, &goal.creator);
write_u64(&mut buf, goal.created_at);
let desc = goal.description.as_bytes();
write_u16(&mut buf, desc.len() as u16);
write_bytes(&mut buf, desc);
write_u16(&mut buf, goal.task_ids.len() as u16);
for tid in &goal.task_ids {
write_u64(&mut buf, *tid);
}
}
let task_list: Vec<&Task> = registry.tasks.values().collect();
write_u32(&mut buf, task_list.len() as u32);
for task in &task_list {
write_u64(&mut buf, task.id);
write_u64(&mut buf, task.goal_id);
write_u8(&mut buf, task.status.as_u8());
if let Some(ref assignee) = task.assigned_to {
write_u8(&mut buf, 1);
write_bytes(&mut buf, assignee);
} else {
write_u8(&mut buf, 0);
}
let tdesc = task.description.as_bytes();
write_u16(&mut buf, tdesc.len() as u16);
write_bytes(&mut buf, tdesc);
if let Some(ref result) = task.result {
write_u8(&mut buf, 1);
write_u8(&mut buf, if result.success { 1 } else { 0 });
write_u16(&mut buf, result.stack_snapshot.len() as u16);
for &val in &result.stack_snapshot {
write_i64(&mut buf, val);
}
let ob = result.output.as_bytes();
write_u16(&mut buf, ob.len() as u16);
write_bytes(&mut buf, ob);
let eb = result.error.as_deref().unwrap_or("").as_bytes();
write_u16(&mut buf, eb.len() as u16);
write_bytes(&mut buf, eb);
} else {
write_u8(&mut buf, 0);
}
write_u64(&mut buf, task.created_at);
}
}
buf
}
fn serialize_instruction(buf: &mut Vec<u8>, instr: &Instruction) {
match instr {
Instruction::Primitive(id) => {
write_u8(buf, 0);
write_u32(buf, *id as u32);
}
Instruction::Literal(val) => {
write_u8(buf, 1);
write_i64(buf, *val);
}
Instruction::Call(idx) => {
write_u8(buf, 2);
write_u32(buf, *idx as u32);
}
Instruction::StringLit(s) => {
write_u8(buf, 3);
let bytes = s.as_bytes();
write_u32(buf, bytes.len() as u32);
write_bytes(buf, bytes);
}
Instruction::Branch(offset) => {
write_u8(buf, 4);
write_i64(buf, *offset);
}
Instruction::BranchIfZero(offset) => {
write_u8(buf, 5);
write_i64(buf, *offset);
}
}
}
pub fn deserialize_state(data: &[u8]) -> Option<(Vec<Entry>, Vec<Cell>, usize)> {
let mut pos = 0;
let magic = read_bytes(data, &mut pos, 4)?;
if magic != MAGIC {
return None;
}
let version = read_u8(data, &mut pos)?;
if version != 1 {
return None;
}
let entry_count = read_u32(data, &mut pos)? as usize;
let mut dictionary = Vec::with_capacity(entry_count);
for _ in 0..entry_count {
let name_len = read_u16(data, &mut pos)? as usize;
let name_bytes = read_bytes(data, &mut pos, name_len)?;
let name = String::from_utf8_lossy(&name_bytes).to_string();
let flags = read_u8(data, &mut pos)?;
let immediate = flags & 1 != 0;
let hidden = flags & 2 != 0;
let body_len = read_u32(data, &mut pos)? as usize;
let mut body = Vec::with_capacity(body_len);
for _ in 0..body_len {
let instr = deserialize_instruction(data, &mut pos)?;
body.push(instr);
}
dictionary.push(Entry {
name,
immediate,
hidden,
body,
});
}
let here = read_u32(data, &mut pos)? as usize;
let mem_cells = read_u32(data, &mut pos)? as usize;
let mut memory = vec![0i64; 65536];
for i in 0..mem_cells.min(memory.len()) {
memory[i] = read_i64(data, &mut pos)?;
}
Some((dictionary, memory, here))
}
fn deserialize_instruction(data: &[u8], pos: &mut usize) -> Option<Instruction> {
let tag = read_u8(data, pos)?;
match tag {
0 => {
let id = read_u32(data, pos)? as usize;
Some(Instruction::Primitive(id))
}
1 => {
let val = read_i64(data, pos)?;
Some(Instruction::Literal(val))
}
2 => {
let idx = read_u32(data, pos)? as usize;
Some(Instruction::Call(idx))
}
3 => {
let len = read_u32(data, pos)? as usize;
let bytes = read_bytes(data, pos, len)?;
let s = String::from_utf8_lossy(&bytes).to_string();
Some(Instruction::StringLit(s))
}
4 => {
let offset = read_i64(data, pos)?;
Some(Instruction::Branch(offset))
}
5 => {
let offset = read_i64(data, pos)?;
Some(Instruction::BranchIfZero(offset))
}
_ => None,
}
}