1use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25use serde::{Deserialize, Serialize};
26
27use bb_runtime::atomic::{AtomicOpDecl, AtomicOpKind, AtomicOpsetDecl, DispatchResult};
28use bb_runtime::bus::OpError;
29use bb_runtime::completion::{CompletionHandle, ContractResponse};
30use bb_runtime::envelope::{SlotFill, WireEnvelope};
31use bb_runtime::framework::Address;
32use bb_runtime::ids::{ComponentRef, PeerId};
33use bb_runtime::runtime::RuntimeResourceRef;
34use bb_runtime::slot_value::SlotValue;
35use bb_runtime::syscall::values::BytesValue;
36
37use bb_ir::types::{TYPE_BYTES, TYPE_PEER_ID, TYPE_PEER_ID_VEC, TYPE_SCALAR_I32, TYPE_TRIGGER};
38
/// Component reference of the registry server.
///
/// The client's `Announce` handler builds its destination suffix from this
/// value (wrapped in a `ComponentRef`) plus the op name `"Announce"`.
pub const GLOBAL_REGISTRY_SERVER_CREF: u32 = 0;
43
/// Component reference of the registry client.
///
/// The server's `Announce` handler addresses its handshake reply to this
/// component's `"Handshake"` op.
pub const GLOBAL_REGISTRY_CLIENT_CREF: u32 = 1;
48
/// Opset domain string reported by both the client and the server in their
/// `AtomicOpsetDecl`.
pub const GLOBAL_REGISTRY_DOMAIN: &str = "ai.bytesandbrains.protocol.global_registry";
51
/// Reply payload the registry server sends back after handling an `Announce`.
///
/// The server serializes it with `bincode` and the client's `Handshake`
/// handler deserializes it, so the field order below is part of the wire
/// format.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Handshake {
    /// TTL, in nanoseconds, the server applied to the announcing peer's entry
    /// (the server fills this from its configured `default_ttl_ns`).
    pub assigned_ttl_ns: u64,
    /// Interval, in nanoseconds, the client uses to throttle further
    /// announces (the server sends one third of the TTL).
    pub heartbeat_interval_ns: u64,
    /// The server's own local addresses; the client records them in its
    /// address book under the peer the handshake arrived from.
    pub server_addresses: Vec<Address>,
}
74
/// Client-side state of the global registry protocol.
///
/// All fields start at zero (`Default`); zero doubles as "not known yet" in
/// the `Announce` throttle check.
#[derive(Clone, Debug, Default, Serialize, Deserialize, bb_derive::Concrete)]
pub struct GlobalRegistryClient {
    /// TTL (ns) taken from the most recent handshake. Also used as the
    /// `remaining_deadline_ns` of outgoing announce envelopes.
    pub last_assigned_ttl_ns: u64,

    /// Heartbeat interval (ns) taken from the most recent handshake. While
    /// less than this has elapsed since the last announce, `Announce` is a
    /// no-op; `0` disables the throttle.
    pub last_heartbeat_interval_ns: u64,

    /// Scheduler timestamp (ns) of the last announce that was actually
    /// queued; `0` means none yet. Skipped by serde, so it is not persisted
    /// and comes back as `0` after deserialization.
    #[serde(skip)]
    pub last_announce_ts_ns: u64,
}
99
100impl GlobalRegistryClient {
101 pub fn new() -> Self {
104 Self::default()
105 }
106}
107
/// Atomic ops advertised by [`GlobalRegistryClient`].
///
/// * `Announce` takes the registry server's peer id in `server_peer`.
/// * `Handshake` declares no inputs; its handler looks for a `BytesValue`
///   payload among whatever inputs it is dispatched with.
///
/// Both ops declare a `wakeup` trigger output, while the handlers in this file
/// return an empty output list.
/// NOTE(review): presumably the runtime raises the trigger itself — confirm.
static GLOBAL_REGISTRY_CLIENT_OPS: &[AtomicOpDecl] = &[
    AtomicOpDecl {
        name: "Announce",
        inputs: &[("server_peer", &TYPE_PEER_ID)],
        outputs: &[("wakeup", &TYPE_TRIGGER)],
        kind: AtomicOpKind::Immediate,
        type_relations: &[],
    },
    AtomicOpDecl {
        name: "Handshake",
        inputs: &[],
        outputs: &[("wakeup", &TYPE_TRIGGER)],
        kind: AtomicOpKind::Immediate,
        type_relations: &[],
    },
];
127
128impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryClient {
129 type Error = OpError;
130
131 fn atomic_opset(&self) -> AtomicOpsetDecl {
132 AtomicOpsetDecl {
133 domain: GLOBAL_REGISTRY_DOMAIN,
134 version: 1,
135 ops: GLOBAL_REGISTRY_CLIENT_OPS,
136 }
137 }
138
139 fn dispatch_atomic(
140 &mut self,
141 op_type: &str,
142 inputs: &[(&str, &dyn SlotValue)],
143 ctx: &mut RuntimeResourceRef<'_>,
144 ) -> Result<DispatchResult, OpError> {
145 match op_type {
146 "Announce" => {
147 let now = ctx.time.scheduler.now_ns();
148 if self.last_announce_ts_ns != 0
153 && self.last_heartbeat_interval_ns != 0
154 && now.saturating_sub(self.last_announce_ts_ns)
155 < self.last_heartbeat_interval_ns
156 {
157 return Ok(DispatchResult::Immediate(Vec::new()));
158 }
159
160 let server_peer = downcast_peer_id(inputs, "server_peer")?;
161
162 let local_addresses = ctx.local_addresses().to_vec();
170 if local_addresses.is_empty() {
171 return Err(OpError {
172 detail: "GlobalRegistryClient::Announce: no local addresses; \
173 configure via install(...) or node.add_local_address()"
174 .to_string(),
175 ..Default::default()
176 });
177 }
178 let payload = bincode::serialize(&(ctx.current.self_peer, local_addresses))
179 .map_err(|e| OpError {
180 detail: format!("Announce: serialize (self_peer, addresses): {e}"),
181 ..Default::default()
182 })?;
183
184 let dest_suffix = Address::empty()
185 .component(ComponentRef::from(GLOBAL_REGISTRY_SERVER_CREF))
186 .op("Announce")
187 .to_bytes();
188 let dest_peer_addr = Address::empty().p2p(server_peer).to_bytes();
189
190 let env = WireEnvelope {
191 dest_peer_addresses: vec![dest_peer_addr],
192 fills: vec![SlotFill {
193 dest_suffix,
194 payload,
195 trigger_only: false,
196 ..Default::default()
197 }],
198 correlation: None,
199 remaining_deadline_ns: self.last_assigned_ttl_ns,
200 edge_rtt_reports: Vec::new(),
201 ..Default::default()
202 };
203 ctx.net.outbound.push(env);
204
205 self.last_announce_ts_ns = now;
206 Ok(DispatchResult::Immediate(Vec::new()))
207 }
208 "Handshake" => {
209 let payload = inputs
210 .iter()
211 .find_map(|(_, v)| v.as_any().downcast_ref::<BytesValue>().map(|b| b.0.clone()))
212 .ok_or_else(|| OpError {
213 detail: "Handshake: missing BytesValue payload".to_string(),
214 ..Default::default()
215 })?;
216 let handshake: Handshake = bincode::deserialize(&payload).map_err(|e| OpError {
217 detail: format!("Handshake: decode: {e}"),
218 ..Default::default()
219 })?;
220 self.last_assigned_ttl_ns = handshake.assigned_ttl_ns;
221 self.last_heartbeat_interval_ns = handshake.heartbeat_interval_ns;
222
223 if !handshake.server_addresses.is_empty() {
232 if let Some(server_peer) = ctx.current.inbound.src_peer {
233 if let Err(e) = ctx
234 .peers
235 .addresses
236 .add_peer(server_peer, handshake.server_addresses)
237 {
238 ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
239 bb_runtime::bus::InfraEvent::OpFailure {
240 op_ref: ctx.current.op_ref,
241 error: OpError {
242 detail: format!(
243 "Handshake: address_book.add_peer({server_peer:?}): {e}"
244 ),
245 ..Default::default()
246 },
247 },
248 ));
249 }
250 }
251 }
252 Ok(DispatchResult::Immediate(Vec::new()))
253 }
254 other => Err(OpError {
255 detail: format!("unknown op for GlobalRegistryClient: {other}"),
256 ..Default::default()
257 }),
258 }
259 }
260}
261
/// TTL settings of the registry server. All values are in nanoseconds.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlobalRegistryServerConfig {
    /// TTL applied to every announced peer; the heartbeat interval handed to
    /// clients is one third of this value.
    pub default_ttl_ns: u64,
    /// Lower TTL bound.
    /// NOTE(review): not read anywhere in this file — presumably reserved for
    /// client-requested TTLs; confirm.
    pub min_ttl_ns: u64,
    /// Upper TTL bound.
    /// NOTE(review): not read anywhere in this file — confirm intended use.
    pub max_ttl_ns: u64,
}
281
282impl Default for GlobalRegistryServerConfig {
283 fn default() -> Self {
284 Self {
285 default_ttl_ns: 90_000_000_000,
286 min_ttl_ns: 30_000_000_000,
287 max_ttl_ns: 300_000_000_000,
288 }
289 }
290}
291
/// Server-side state of the global registry: the set of announced peers with
/// their expiry times, plus the inputs of the pseudo-random sampler.
#[derive(Debug, Serialize, Deserialize, bb_derive::Concrete, bb_derive::PeerSelector)]
pub struct GlobalRegistryServer {
    /// TTL configuration.
    pub config: GlobalRegistryServerConfig,

    /// Seed mixed into every pseudo-random sample.
    pub seed: u64,

    /// Announced peers: peer id -> (expiry timestamp in ns, first address the
    /// peer announced).
    pub entries: HashMap<PeerId, (u64, Address)>,

    /// Number of samples drawn so far; mixed with `seed` so successive samples
    /// differ. Skipped by serde and reset to 0 by `Clone`, so it restarts from
    /// zero on every deserialized or cloned instance.
    #[serde(skip)]
    sample_counter: AtomicU64,
}
320
321impl Default for GlobalRegistryServer {
322 fn default() -> Self {
323 Self {
324 config: GlobalRegistryServerConfig::default(),
325 seed: 0,
326 entries: HashMap::new(),
327 sample_counter: AtomicU64::new(0),
328 }
329 }
330}
331
332impl Clone for GlobalRegistryServer {
333 fn clone(&self) -> Self {
334 Self {
335 config: self.config,
336 seed: self.seed,
337 entries: self.entries.clone(),
338 sample_counter: AtomicU64::new(0),
339 }
340 }
341}
342
343impl GlobalRegistryServer {
344 pub fn new(seed: u64) -> Self {
347 Self {
348 config: GlobalRegistryServerConfig::default(),
349 seed,
350 entries: HashMap::new(),
351 sample_counter: AtomicU64::new(0),
352 }
353 }
354
355 pub fn with_config(seed: u64, config: GlobalRegistryServerConfig) -> Self {
357 Self {
358 config,
359 seed,
360 entries: HashMap::new(),
361 sample_counter: AtomicU64::new(0),
362 }
363 }
364
365 pub fn heartbeat_interval_ns(&self) -> u64 {
369 self.config.default_ttl_ns / 3
370 }
371
372 fn evict_expired(&mut self, now_ns: u64, addresses: &mut bb_runtime::framework::AddressBook) {
376 let expired: Vec<PeerId> = self
377 .entries
378 .iter()
379 .filter_map(|(peer, (expires, _))| (now_ns >= *expires).then_some(*peer))
380 .collect();
381 for peer in expired {
382 self.entries.remove(&peer);
383 let _ = addresses.drop_peer(peer);
384 }
385 }
386
387 fn live_peers(&self) -> Vec<PeerId> {
390 self.entries.keys().copied().collect()
391 }
392}
393
394impl bb_runtime::contracts::PeerSelector for GlobalRegistryServer {
395 type Error = OpError;
396
397 fn select(
398 &mut self,
399 ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
400 params: bb_runtime::contracts::peer_selector::SelectParams,
401 _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
402 ) -> ContractResponse<Vec<PeerId>, Self::Error> {
403 use bb_runtime::contracts::peer_selector::SelectParams;
404 let now = ctx.time.scheduler.now_ns();
405 self.evict_expired(now, ctx.peers.addresses);
406 let known = self.live_peers();
407 let out = match params {
408 SelectParams::All => known,
409 SelectParams::Random { n } => {
410 sample_n(&known, n as usize, self.seed, &self.sample_counter)
411 }
412 SelectParams::NearKey { key: _, n } => {
413 let take = (n as usize).min(known.len());
414 known[..take].to_vec()
415 }
416 };
417 ContractResponse::Now(Ok(out))
418 }
419
420 fn current_view(
421 &mut self,
422 ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
423 _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
424 ) -> ContractResponse<Vec<PeerId>, Self::Error> {
425 let now = ctx.time.scheduler.now_ns();
426 self.evict_expired(now, ctx.peers.addresses);
427 ContractResponse::Now(Ok(self.live_peers()))
428 }
429}
430
/// Atomic ops advertised by [`GlobalRegistryServer`].
///
/// * `Sample` — inputs `count` and `cookie`; outputs `peers` and
///   `next_cookie`.
/// * `CurrentView` — input `cookie`; outputs `peers` and `next_cookie`.
/// * `Announce` — declares no inputs or outputs; its handler looks for a
///   `BytesValue` payload among whatever inputs it is dispatched with.
///
/// NOTE(review): `count` is declared as `TYPE_SCALAR_I32`; verify that the
/// `Sample` handler accepts the concrete Rust type the runtime delivers for
/// that tag.
static GLOBAL_REGISTRY_SERVER_OPS: &[AtomicOpDecl] = &[
    AtomicOpDecl {
        name: "Sample",
        inputs: &[("count", &TYPE_SCALAR_I32), ("cookie", &TYPE_BYTES)],
        outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
        kind: AtomicOpKind::Immediate,
        type_relations: &[],
    },
    AtomicOpDecl {
        name: "CurrentView",
        inputs: &[("cookie", &TYPE_BYTES)],
        outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
        kind: AtomicOpKind::Immediate,
        type_relations: &[],
    },
    AtomicOpDecl {
        name: "Announce",
        inputs: &[],
        outputs: &[],
        kind: AtomicOpKind::Immediate,
        type_relations: &[],
    },
];
458
459impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryServer {
460 type Error = OpError;
461
462 fn atomic_opset(&self) -> AtomicOpsetDecl {
463 AtomicOpsetDecl {
464 domain: GLOBAL_REGISTRY_DOMAIN,
465 version: 1,
466 ops: GLOBAL_REGISTRY_SERVER_OPS,
467 }
468 }
469
470 fn dispatch_atomic(
471 &mut self,
472 op_type: &str,
473 inputs: &[(&str, &dyn SlotValue)],
474 ctx: &mut RuntimeResourceRef<'_>,
475 ) -> Result<DispatchResult, OpError> {
476 match op_type {
477 "Announce" => {
478 let payload = inputs
479 .iter()
480 .find_map(|(_, v)| v.as_any().downcast_ref::<BytesValue>().map(|b| b.0.clone()))
481 .ok_or_else(|| OpError {
482 detail: "Announce: missing BytesValue payload".to_string(),
483 ..Default::default()
484 })?;
485 let (announcing_peer, announced_addresses): (PeerId, Vec<Address>) =
486 bincode::deserialize(&payload).map_err(|e| OpError {
487 detail: format!("Announce: decode (peer, addresses): {e}"),
488 ..Default::default()
489 })?;
490 if announced_addresses.is_empty() {
491 return Err(OpError {
492 detail:
493 "GlobalRegistryServer::Announce: client supplied empty address list"
494 .to_string(),
495 ..Default::default()
496 });
497 }
498
499 let now = ctx.time.scheduler.now_ns();
500 let ttl = self.config.default_ttl_ns;
501 let heartbeat = ttl / 3;
502 let source_addr = announced_addresses[0].clone();
506
507 let is_new = !self.entries.contains_key(&announcing_peer);
513 self.entries.insert(
514 announcing_peer,
515 (now.saturating_add(ttl), source_addr.clone()),
516 );
517 if is_new {
518 ctx.peers
519 .addresses
520 .add_peer(announcing_peer, announced_addresses)
521 .map_err(|e| OpError {
522 detail: format!("Announce: address_book.add_peer: {e}"),
523 ..Default::default()
524 })?;
525 }
526
527 let server_addresses = ctx.local_addresses().to_vec();
531 if server_addresses.is_empty() {
532 return Err(OpError {
533 detail: "GlobalRegistryServer::Announce: no local addresses to advertise; \
534 configure via install(...) or node.add_local_address()"
535 .to_string(),
536 ..Default::default()
537 });
538 }
539 let handshake = Handshake {
540 assigned_ttl_ns: ttl,
541 heartbeat_interval_ns: heartbeat,
542 server_addresses,
543 };
544 let handshake_payload = bincode::serialize(&handshake).map_err(|e| OpError {
545 detail: format!("Announce: serialize handshake: {e}"),
546 ..Default::default()
547 })?;
548 let reply_suffix = Address::empty()
549 .component(ComponentRef::from(GLOBAL_REGISTRY_CLIENT_CREF))
550 .op("Handshake")
551 .to_bytes();
552 let reply_env = WireEnvelope {
553 dest_peer_addresses: vec![Address::empty().p2p(announcing_peer).to_bytes()],
554 fills: vec![SlotFill {
555 dest_suffix: reply_suffix,
556 payload: handshake_payload,
557 trigger_only: false,
558 ..Default::default()
559 }],
560 correlation: None,
561 remaining_deadline_ns: 0,
562 edge_rtt_reports: Vec::new(),
563 ..Default::default()
564 };
565 ctx.net.outbound.push(reply_env);
566
567 Ok(DispatchResult::Immediate(Vec::new()))
568 }
569 "Sample" => {
570 let now = ctx.time.scheduler.now_ns();
571 self.evict_expired(now, ctx.peers.addresses);
572 let n = inputs
573 .iter()
574 .find_map(|(name, v)| {
575 (*name == "count").then(|| v.as_any().downcast_ref::<u32>().copied())
576 })
577 .flatten()
578 .unwrap_or(0) as usize;
579 let known = self.live_peers();
580 let picked = sample_n(&known, n, self.seed, &self.sample_counter);
581 let next_cookie = next_cookie_from(inputs);
582 Ok(DispatchResult::Immediate(vec![
583 ("peers".to_string(), Box::new(picked) as Box<dyn SlotValue>),
584 (
585 "next_cookie".to_string(),
586 Box::new(BytesValue(next_cookie)) as Box<dyn SlotValue>,
587 ),
588 ]))
589 }
590 "CurrentView" => {
591 let now = ctx.time.scheduler.now_ns();
592 self.evict_expired(now, ctx.peers.addresses);
593 let view = self.live_peers();
594 let next_cookie = next_cookie_from(inputs);
595 Ok(DispatchResult::Immediate(vec![
596 ("peers".to_string(), Box::new(view) as Box<dyn SlotValue>),
597 (
598 "next_cookie".to_string(),
599 Box::new(BytesValue(next_cookie)) as Box<dyn SlotValue>,
600 ),
601 ]))
602 }
603 other => Err(OpError {
604 detail: format!("unknown op for GlobalRegistryServer: {other}"),
605 ..Default::default()
606 }),
607 }
608 }
609}
610
611fn downcast_peer_id(inputs: &[(&str, &dyn SlotValue)], name: &str) -> Result<PeerId, OpError> {
617 for (slot, v) in inputs {
618 if *slot != name {
619 continue;
620 }
621 if let Some(p) = v.as_any().downcast_ref::<PeerId>() {
622 return Ok(*p);
623 }
624 if let Some(pv) = v
625 .as_any()
626 .downcast_ref::<bb_runtime::syscall::values::PeerIdValue>()
627 {
628 return Ok(pv.0);
629 }
630 }
631 Err(OpError {
632 detail: format!("missing `{name}` input (expected PeerId)"),
633 ..Default::default()
634 })
635}
636
637fn next_cookie_from(inputs: &[(&str, &dyn SlotValue)]) -> Vec<u8> {
641 for (slot, v) in inputs {
642 if *slot != "cookie" {
643 continue;
644 }
645 if let Some(b) = v.as_any().downcast_ref::<BytesValue>() {
646 return b.0.clone();
647 }
648 }
649 Vec::new()
650}
651
652fn sample_n(peers: &[PeerId], n: usize, seed: u64, counter: &AtomicU64) -> Vec<PeerId> {
656 if peers.is_empty() || n == 0 {
657 return Vec::new();
658 }
659 let take = n.min(peers.len());
660 let count = counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
661 let mut state = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(count);
662 let mut pool: Vec<PeerId> = peers.to_vec();
663 for i in 0..take {
664 state ^= state << 13;
665 state ^= state >> 7;
666 state ^= state << 17;
667 let j = i + (state as usize) % (pool.len() - i);
668 pool.swap(i, j);
669 }
670 pool.truncate(take);
671 pool
672}
673