use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::{Deserialize, Serialize};
use bb_runtime::atomic::{AtomicOpDecl, AtomicOpKind, AtomicOpsetDecl, DispatchResult};
use bb_runtime::bus::OpError;
use bb_runtime::completion::{CompletionHandle, ContractResponse};
use bb_runtime::envelope::{SlotFill, WireEnvelope};
use bb_runtime::framework::Address;
use bb_runtime::ids::{ComponentRef, PeerId};
use bb_runtime::runtime::RuntimeResourceRef;
use bb_runtime::slot_value::SlotValue;
use bb_runtime::syscall::values::BytesValue;
use bb_ir::types::{TYPE_BYTES, TYPE_PEER_ID, TYPE_PEER_ID_VEC, TYPE_SCALAR_I32, TYPE_TRIGGER};
/// Component reference of the registry server; the client addresses its
/// outbound `Announce` fill to this component.
pub const GLOBAL_REGISTRY_SERVER_CREF: u32 = 0;
/// Component reference of the registry client; the server addresses its
/// `Handshake` reply fill to this component.
pub const GLOBAL_REGISTRY_CLIENT_CREF: u32 = 1;
/// Opset domain reported by both the client and the server in `atomic_opset`.
pub const GLOBAL_REGISTRY_DOMAIN: &str = "ai.bytesandbrains.protocol.global_registry";
/// Reply the server builds while handling `Announce` and the client decodes
/// in its `Handshake` op. It travels bincode-encoded as the fill payload.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Handshake {
/// TTL the server applied to the announcing peer's entry, in nanoseconds.
pub assigned_ttl_ns: u64,
/// Interval the client stores and uses to throttle its `Announce` op, in nanoseconds.
pub heartbeat_interval_ns: u64,
/// The server's own local addresses; when non-empty the client adds them
/// to its address book under the inbound source peer.
pub server_addresses: Vec<Address>,
}
/// Client-side state of the global registry protocol.
///
/// All fields start at zero (derived `Default`). In the `Announce` handler a
/// zero `last_announce_ts_ns` or `last_heartbeat_interval_ns` disables
/// throttling.
#[derive(Clone, Debug, Default, Serialize, Deserialize, bb_derive::Concrete)]
pub struct GlobalRegistryClient {
/// TTL from the most recently decoded [`Handshake`]; also used as the
/// `remaining_deadline_ns` of outgoing announce envelopes.
pub last_assigned_ttl_ns: u64,
/// Heartbeat interval from the most recently decoded [`Handshake`].
pub last_heartbeat_interval_ns: u64,
/// Scheduler time at which the last announce envelope was queued.
/// Skipped by serde, so it is reset to zero on deserialization.
#[serde(skip)]
pub last_announce_ts_ns: u64,
}
impl GlobalRegistryClient {
    /// Creates a client with every field zeroed, i.e. one that has not yet
    /// announced and has not yet received a handshake.
    pub fn new() -> Self {
        Default::default()
    }
}
/// Atomic ops exposed by [`GlobalRegistryClient`] through `atomic_opset`.
///
/// Both ops declare a `wakeup` trigger output; the handlers in this file
/// return an empty output list.
static GLOBAL_REGISTRY_CLIENT_OPS: &[AtomicOpDecl] = &[
// `Announce`: takes the registry server's peer id and queues an announce
// envelope addressed to it.
AtomicOpDecl {
name: "Announce",
inputs: &[("server_peer", &TYPE_PEER_ID)],
outputs: &[("wakeup", &TYPE_TRIGGER)],
kind: AtomicOpKind::Immediate,
type_relations: &[],
},
// `Handshake`: declares no inputs; the handler reads its payload from
// whichever input carries a `BytesValue`.
AtomicOpDecl {
name: "Handshake",
inputs: &[],
outputs: &[("wakeup", &TYPE_TRIGGER)],
kind: AtomicOpKind::Immediate,
type_relations: &[],
},
];
/// Runtime entry points for the registry client: reports the client opset
/// and routes `Announce` / `Handshake` to their handlers.
impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryClient {
    type Error = OpError;

    /// Returns the client opset (registry domain, version 1).
    fn atomic_opset(&self) -> AtomicOpsetDecl {
        AtomicOpsetDecl {
            domain: GLOBAL_REGISTRY_DOMAIN,
            version: 1,
            ops: GLOBAL_REGISTRY_CLIENT_OPS,
        }
    }

    /// Dispatches one atomic op by name.
    ///
    /// # Errors
    /// Returns an [`OpError`] for an unknown op name, or whatever the
    /// selected handler reports.
    fn dispatch_atomic(
        &mut self,
        op_type: &str,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> Result<DispatchResult, OpError> {
        match op_type {
            "Announce" => self.handle_announce(inputs, ctx),
            "Handshake" => self.handle_handshake(inputs, ctx),
            other => Err(OpError {
                detail: format!("unknown op for GlobalRegistryClient: {other}"),
                ..Default::default()
            }),
        }
    }
}

impl GlobalRegistryClient {
    /// Queues an announce envelope carrying `(self_peer, local_addresses)`
    /// for the server named by the `server_peer` input.
    ///
    /// The call is a silent no-op while less than one heartbeat interval has
    /// elapsed since the previous announce; the throttle only applies once
    /// both the last-announce timestamp and the interval are non-zero.
    fn handle_announce(
        &mut self,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> Result<DispatchResult, OpError> {
        let now = ctx.time.scheduler.now_ns();
        let throttled = self.last_announce_ts_ns != 0
            && self.last_heartbeat_interval_ns != 0
            && now.saturating_sub(self.last_announce_ts_ns) < self.last_heartbeat_interval_ns;
        if throttled {
            return Ok(DispatchResult::Immediate(Vec::new()));
        }

        let server_peer = downcast_peer_id(inputs, "server_peer")?;

        let local_addresses = ctx.local_addresses().to_vec();
        if local_addresses.is_empty() {
            return Err(OpError {
                detail: "GlobalRegistryClient::Announce: no local addresses; \
                         configure via install(...) or node.add_local_address()"
                    .to_string(),
                ..Default::default()
            });
        }

        let payload = match bincode::serialize(&(ctx.current.self_peer, local_addresses)) {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(OpError {
                    detail: format!("Announce: serialize (self_peer, addresses): {e}"),
                    ..Default::default()
                })
            }
        };

        // Destination: the server component's `Announce` op on `server_peer`.
        let dest_suffix = Address::empty()
            .component(ComponentRef::from(GLOBAL_REGISTRY_SERVER_CREF))
            .op("Announce")
            .to_bytes();
        let dest_peer_addr = Address::empty().p2p(server_peer).to_bytes();

        let announce_fill = SlotFill {
            dest_suffix,
            payload,
            trigger_only: false,
            ..Default::default()
        };
        ctx.net.outbound.push(WireEnvelope {
            dest_peer_addresses: vec![dest_peer_addr],
            fills: vec![announce_fill],
            correlation: None,
            remaining_deadline_ns: self.last_assigned_ttl_ns,
            edge_rtt_reports: Vec::new(),
            ..Default::default()
        });

        // Only a queued announce moves the throttle window forward.
        self.last_announce_ts_ns = now;
        Ok(DispatchResult::Immediate(Vec::new()))
    }

    /// Decodes a [`Handshake`] from the first `BytesValue` input, stores its
    /// TTL and heartbeat interval, and records the server's addresses under
    /// the inbound source peer when both are available.
    ///
    /// An address-book failure is published as an `OpFailure` event and does
    /// not fail the op.
    fn handle_handshake(
        &mut self,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> Result<DispatchResult, OpError> {
        let payload = inputs
            .iter()
            .find_map(|(_, value)| {
                value
                    .as_any()
                    .downcast_ref::<BytesValue>()
                    .map(|bytes| bytes.0.clone())
            })
            .ok_or_else(|| OpError {
                detail: "Handshake: missing BytesValue payload".to_string(),
                ..Default::default()
            })?;

        let handshake: Handshake = match bincode::deserialize(&payload) {
            Ok(decoded) => decoded,
            Err(e) => {
                return Err(OpError {
                    detail: format!("Handshake: decode: {e}"),
                    ..Default::default()
                })
            }
        };

        self.last_assigned_ttl_ns = handshake.assigned_ttl_ns;
        self.last_heartbeat_interval_ns = handshake.heartbeat_interval_ns;

        // Nothing to record without both a source peer and advertised addresses.
        let server_peer = match ctx.current.inbound.src_peer {
            Some(peer) if !handshake.server_addresses.is_empty() => peer,
            _ => return Ok(DispatchResult::Immediate(Vec::new())),
        };

        let added = ctx
            .peers
            .addresses
            .add_peer(server_peer, handshake.server_addresses);
        if let Err(e) = added {
            let error = OpError {
                detail: format!("Handshake: address_book.add_peer({server_peer:?}): {e}"),
                ..Default::default()
            };
            ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
                bb_runtime::bus::InfraEvent::OpFailure {
                    op_ref: ctx.current.op_ref,
                    error,
                },
            ));
        }
        Ok(DispatchResult::Immediate(Vec::new()))
    }
}
/// TTL settings for [`GlobalRegistryServer`]; all values are nanoseconds.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalRegistryServerConfig {
/// TTL applied to every announce; the advertised heartbeat interval is
/// one third of this value.
pub default_ttl_ns: u64,
/// Lower TTL bound. NOTE(review): not read by any code visible in this
/// file — confirm where (or whether) clamping is applied.
pub min_ttl_ns: u64,
/// Upper TTL bound. NOTE(review): not read by any code visible in this
/// file — confirm where (or whether) clamping is applied.
pub max_ttl_ns: u64,
}
impl Default for GlobalRegistryServerConfig {
    /// Defaults: 90 s TTL, bounded by 30 s and 300 s, all expressed in
    /// nanoseconds.
    fn default() -> Self {
        const NANOS_PER_SEC: u64 = 1_000_000_000;
        Self {
            default_ttl_ns: 90 * NANOS_PER_SEC,
            min_ttl_ns: 30 * NANOS_PER_SEC,
            max_ttl_ns: 300 * NANOS_PER_SEC,
        }
    }
}
/// Server-side state of the global registry: the set of announced peers
/// with their expiry times, plus the inputs to the sampling PRNG.
#[derive(Debug, Serialize, Deserialize, bb_derive::Concrete, bb_derive::PeerSelector)]
pub struct GlobalRegistryServer {
/// TTL configuration applied to announces.
pub config: GlobalRegistryServerConfig,
/// Seed mixed into the PRNG state used when sampling peers.
pub seed: u64,
/// Announced peers mapped to `(expiry timestamp in ns, first announced address)`.
pub entries: HashMap<PeerId, (u64, Address)>,
/// Incremented on every non-trivial sample so successive samples differ.
/// Skipped by serde and reset to zero by `Clone`.
#[serde(skip)]
sample_counter: AtomicU64,
}
impl Default for GlobalRegistryServer {
fn default() -> Self {
Self {
config: GlobalRegistryServerConfig::default(),
seed: 0,
entries: HashMap::new(),
sample_counter: AtomicU64::new(0),
}
}
}
impl Clone for GlobalRegistryServer {
fn clone(&self) -> Self {
Self {
config: self.config,
seed: self.seed,
entries: self.entries.clone(),
sample_counter: AtomicU64::new(0),
}
}
}
impl GlobalRegistryServer {
    /// Creates an empty registry with the default configuration and the
    /// given sampling seed.
    pub fn new(seed: u64) -> Self {
        Self::with_config(seed, GlobalRegistryServerConfig::default())
    }

    /// Creates an empty registry with an explicit configuration and the
    /// given sampling seed.
    pub fn with_config(seed: u64, config: GlobalRegistryServerConfig) -> Self {
        Self {
            config,
            seed,
            entries: HashMap::new(),
            sample_counter: AtomicU64::new(0),
        }
    }

    /// Heartbeat interval advertised to clients: one third of the default TTL.
    pub fn heartbeat_interval_ns(&self) -> u64 {
        self.config.default_ttl_ns / 3
    }

    /// Removes every entry whose expiry is at or before `now_ns` and drops
    /// the same peer from `addresses`. Failures from `drop_peer` are
    /// deliberately ignored.
    fn evict_expired(&mut self, now_ns: u64, addresses: &mut bb_runtime::framework::AddressBook) {
        self.entries.retain(|peer, (expires_at, _)| {
            let expired = now_ns >= *expires_at;
            if expired {
                let _ = addresses.drop_peer(*peer);
            }
            !expired
        });
    }

    /// Peer ids of all current entries, in the map's iteration order.
    fn live_peers(&self) -> Vec<PeerId> {
        self.entries.keys().copied().collect()
    }
}
/// Peer selection backed by the registry's announced entries. Both methods
/// evict expired entries first and always answer synchronously, so the
/// completion handle is unused.
impl bb_runtime::contracts::PeerSelector for GlobalRegistryServer {
    type Error = OpError;

    /// Selects peers from the live set according to `params`:
    /// * `All` — every live peer;
    /// * `Random { n }` — up to `n` peers drawn by `sample_n`;
    /// * `NearKey { n, .. }` — the first `n` live peers; the key is ignored.
    fn select(
        &mut self,
        ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
        params: bb_runtime::contracts::peer_selector::SelectParams,
        _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
    ) -> ContractResponse<Vec<PeerId>, Self::Error> {
        use bb_runtime::contracts::peer_selector::SelectParams;

        let now_ns = ctx.time.scheduler.now_ns();
        self.evict_expired(now_ns, ctx.peers.addresses);

        let live = self.live_peers();
        let selected = match params {
            SelectParams::All => live,
            SelectParams::Random { n } => {
                sample_n(&live, n as usize, self.seed, &self.sample_counter)
            }
            SelectParams::NearKey { key: _, n } => live.into_iter().take(n as usize).collect(),
        };
        ContractResponse::Now(Ok(selected))
    }

    /// Returns every live peer after evicting expired entries.
    fn current_view(
        &mut self,
        ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
        _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
    ) -> ContractResponse<Vec<PeerId>, Self::Error> {
        let now_ns = ctx.time.scheduler.now_ns();
        self.evict_expired(now_ns, ctx.peers.addresses);
        let view = self.live_peers();
        ContractResponse::Now(Ok(view))
    }
}
/// Atomic ops exposed by [`GlobalRegistryServer`] through `atomic_opset`.
static GLOBAL_REGISTRY_SERVER_OPS: &[AtomicOpDecl] = &[
// `Sample`: returns up to `count` live peers and passes the `cookie` input
// back as `next_cookie`.
AtomicOpDecl {
name: "Sample",
inputs: &[("count", &TYPE_SCALAR_I32), ("cookie", &TYPE_BYTES)],
outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
kind: AtomicOpKind::Immediate,
type_relations: &[],
},
// `CurrentView`: returns every live peer and passes the `cookie` input
// back as `next_cookie`.
AtomicOpDecl {
name: "CurrentView",
inputs: &[("cookie", &TYPE_BYTES)],
outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
kind: AtomicOpKind::Immediate,
type_relations: &[],
},
// `Announce`: declares no inputs or outputs; the handler reads its payload
// from whichever input carries a `BytesValue`.
AtomicOpDecl {
name: "Announce",
inputs: &[],
outputs: &[],
kind: AtomicOpKind::Immediate,
type_relations: &[],
},
];
/// Runtime entry points for the registry server: reports the server opset
/// and routes `Announce`, `Sample` and `CurrentView` to their handlers.
impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryServer {
    type Error = OpError;

    /// Returns the server opset (registry domain, version 1).
    fn atomic_opset(&self) -> AtomicOpsetDecl {
        AtomicOpsetDecl {
            domain: GLOBAL_REGISTRY_DOMAIN,
            version: 1,
            ops: GLOBAL_REGISTRY_SERVER_OPS,
        }
    }

    /// Dispatches one atomic op by name.
    ///
    /// # Errors
    /// Returns an [`OpError`] for an unknown op name, or whatever the
    /// selected handler reports.
    fn dispatch_atomic(
        &mut self,
        op_type: &str,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> Result<DispatchResult, OpError> {
        match op_type {
            "Announce" => self.handle_announce(inputs, ctx),
            "Sample" => Ok(self.handle_sample(inputs, ctx)),
            "CurrentView" => Ok(self.handle_current_view(inputs, ctx)),
            other => Err(OpError {
                detail: format!("unknown op for GlobalRegistryServer: {other}"),
                ..Default::default()
            }),
        }
    }
}

impl GlobalRegistryServer {
    /// Registers (or refreshes) the announcing peer and queues a
    /// [`Handshake`] reply addressed to it.
    ///
    /// The payload is the first `BytesValue` input, decoded as
    /// `(PeerId, Vec<Address>)`. Every fallible step — decoding, validation,
    /// building the reply, and the address-book insert for a new peer — runs
    /// before `entries` is touched. A failed announce therefore leaves no
    /// registry entry behind, and a retry goes through `add_peer` again
    /// instead of being mistaken for a refresh.
    ///
    /// # Errors
    /// Missing or undecodable payload, an empty announced address list, no
    /// local addresses to advertise, handshake serialization failure, or an
    /// address-book failure for a new peer.
    fn handle_announce(
        &mut self,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> Result<DispatchResult, OpError> {
        let payload = inputs
            .iter()
            .find_map(|(_, v)| v.as_any().downcast_ref::<BytesValue>().map(|b| b.0.clone()))
            .ok_or_else(|| OpError {
                detail: "Announce: missing BytesValue payload".to_string(),
                ..Default::default()
            })?;
        let (announcing_peer, announced_addresses): (PeerId, Vec<Address>) =
            bincode::deserialize(&payload).map_err(|e| OpError {
                detail: format!("Announce: decode (peer, addresses): {e}"),
                ..Default::default()
            })?;

        // The first announced address is the one kept in the registry entry.
        let source_addr = match announced_addresses.first() {
            Some(addr) => addr.clone(),
            None => {
                return Err(OpError {
                    detail: "GlobalRegistryServer::Announce: client supplied empty address list"
                        .to_string(),
                    ..Default::default()
                })
            }
        };

        // Build the reply up front so a misconfigured server or a
        // serialization failure cannot leave a half-registered peer.
        let server_addresses = ctx.local_addresses().to_vec();
        if server_addresses.is_empty() {
            return Err(OpError {
                detail: "GlobalRegistryServer::Announce: no local addresses to advertise; \
                         configure via install(...) or node.add_local_address()"
                    .to_string(),
                ..Default::default()
            });
        }
        let ttl = self.config.default_ttl_ns;
        let handshake = Handshake {
            assigned_ttl_ns: ttl,
            heartbeat_interval_ns: self.heartbeat_interval_ns(),
            server_addresses,
        };
        let handshake_payload = bincode::serialize(&handshake).map_err(|e| OpError {
            detail: format!("Announce: serialize handshake: {e}"),
            ..Default::default()
        })?;

        // Only a peer without a registry entry is added to the address book;
        // a refresh keeps whatever addresses were recorded first.
        if !self.entries.contains_key(&announcing_peer) {
            ctx.peers
                .addresses
                .add_peer(announcing_peer, announced_addresses)
                .map_err(|e| OpError {
                    detail: format!("Announce: address_book.add_peer: {e}"),
                    ..Default::default()
                })?;
        }

        let now = ctx.time.scheduler.now_ns();
        self.entries
            .insert(announcing_peer, (now.saturating_add(ttl), source_addr));

        // Destination: the client component's `Handshake` op on the announcer.
        let reply_suffix = Address::empty()
            .component(ComponentRef::from(GLOBAL_REGISTRY_CLIENT_CREF))
            .op("Handshake")
            .to_bytes();
        let reply_env = WireEnvelope {
            dest_peer_addresses: vec![Address::empty().p2p(announcing_peer).to_bytes()],
            fills: vec![SlotFill {
                dest_suffix: reply_suffix,
                payload: handshake_payload,
                trigger_only: false,
                ..Default::default()
            }],
            correlation: None,
            remaining_deadline_ns: 0,
            edge_rtt_reports: Vec::new(),
            ..Default::default()
        };
        ctx.net.outbound.push(reply_env);
        Ok(DispatchResult::Immediate(Vec::new()))
    }

    /// Evicts expired entries, then returns up to `count` sampled live peers
    /// together with the echoed cookie.
    ///
    /// Only the first input named `count` is considered. It is read as `u32`
    /// or, because the op declares the slot as `TYPE_SCALAR_I32`, as `i32`
    /// (negative values count as zero). A missing or differently typed
    /// `count` yields an empty sample.
    fn handle_sample(
        &mut self,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> DispatchResult {
        let now = ctx.time.scheduler.now_ns();
        self.evict_expired(now, ctx.peers.addresses);

        // NOTE(review): the concrete Rust type the runtime delivers for
        // TYPE_SCALAR_I32 is not visible in this file; both plausible
        // integer representations are accepted.
        let n = inputs
            .iter()
            .find(|(name, _)| *name == "count")
            .and_then(|(_, v)| {
                let any = v.as_any();
                any.downcast_ref::<u32>()
                    .map(|c| *c as usize)
                    .or_else(|| any.downcast_ref::<i32>().map(|c| (*c).max(0) as usize))
            })
            .unwrap_or(0);

        let known = self.live_peers();
        let picked = sample_n(&known, n, self.seed, &self.sample_counter);
        Self::view_outputs(picked, inputs)
    }

    /// Evicts expired entries, then returns every live peer together with
    /// the echoed cookie.
    fn handle_current_view(
        &mut self,
        inputs: &[(&str, &dyn SlotValue)],
        ctx: &mut RuntimeResourceRef<'_>,
    ) -> DispatchResult {
        let now = ctx.time.scheduler.now_ns();
        self.evict_expired(now, ctx.peers.addresses);
        Self::view_outputs(self.live_peers(), inputs)
    }

    /// Packs `peers` and the caller's cookie into the `peers` /
    /// `next_cookie` output slots shared by `Sample` and `CurrentView`.
    fn view_outputs(peers: Vec<PeerId>, inputs: &[(&str, &dyn SlotValue)]) -> DispatchResult {
        let next_cookie = next_cookie_from(inputs);
        DispatchResult::Immediate(vec![
            ("peers".to_string(), Box::new(peers) as Box<dyn SlotValue>),
            (
                "next_cookie".to_string(),
                Box::new(BytesValue(next_cookie)) as Box<dyn SlotValue>,
            ),
        ])
    }
}
/// Extracts a peer id from the inputs named `name`.
///
/// Every input with a matching slot name is tried in order; the value may be
/// a bare `PeerId` or a `PeerIdValue` wrapper. The first successful downcast
/// wins.
///
/// # Errors
/// Returns an [`OpError`] when no matching input holds a peer id.
fn downcast_peer_id(inputs: &[(&str, &dyn SlotValue)], name: &str) -> Result<PeerId, OpError> {
    inputs
        .iter()
        .filter(|(slot, _)| *slot == name)
        .find_map(|(_, value)| {
            let any = value.as_any();
            any.downcast_ref::<PeerId>().copied().or_else(|| {
                any.downcast_ref::<bb_runtime::syscall::values::PeerIdValue>()
                    .map(|wrapped| wrapped.0)
            })
        })
        .ok_or_else(|| OpError {
            detail: format!("missing `{name}` input (expected PeerId)"),
            ..Default::default()
        })
}
/// Returns a copy of the bytes carried by the first `cookie` input that
/// holds a `BytesValue`, or an empty vector when there is none.
fn next_cookie_from(inputs: &[(&str, &dyn SlotValue)]) -> Vec<u8> {
    inputs
        .iter()
        .filter(|(slot, _)| *slot == "cookie")
        .find_map(|(_, value)| {
            value
                .as_any()
                .downcast_ref::<BytesValue>()
                .map(|bytes| bytes.0.clone())
        })
        .unwrap_or_default()
}
/// Draws up to `n` distinct peers from `peers` with a partial Fisher–Yates
/// shuffle driven by a 64-bit xorshift generator.
///
/// The generator state is derived from `seed` and from `counter`, which is
/// incremented on every call that actually samples, so repeated calls with
/// the same seed produce different draws. Returns an empty vector, without
/// touching `counter`, when `peers` is empty or `n` is zero. Otherwise the
/// result has `min(n, peers.len())` elements.
fn sample_n(peers: &[PeerId], n: usize, seed: u64, counter: &AtomicU64) -> Vec<PeerId> {
    const SEED_MULTIPLIER: u64 = 0x9E37_79B9_7F4A_7C15;

    if peers.is_empty() || n == 0 {
        return Vec::new();
    }
    let take = n.min(peers.len());
    let count = counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
    let mut state = seed.wrapping_mul(SEED_MULTIPLIER).wrapping_add(count);
    // Zero is a fixed point of xorshift: a zero state would stay zero and
    // the loop below would return `peers[..take]` unshuffled. Substitute a
    // non-zero state for that one seed/counter combination.
    if state == 0 {
        state = SEED_MULTIPLIER;
    }
    let mut pool: Vec<PeerId> = peers.to_vec();
    for i in 0..take {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        // Pick the i-th result from the not-yet-chosen tail `[i, len)`.
        let j = i + (state as usize) % (pool.len() - i);
        pool.swap(i, j);
    }
    pool.truncate(take);
    pool
}