Skip to main content

bb_ops/protocols/global_registry/
mod.rs

1//! `GlobalRegistry` — federation membership protocol with two halves.
2//!
3//! - [`GlobalRegistryClient`]: bound on every client Node. Records an
4//!   `Announce` op whose `server_peer` input wires the bootstrap-server
5//!   identity through the graph (no struct field). On dispatch the
6//!   client ships an envelope carrying `ctx.current.self_peer` to the
7//!   server's well-known [`GLOBAL_REGISTRY_SERVER_CREF`] component, then
8//!   refreshes its TTL/heartbeat state from the server's `Handshake`
9//!   reply.
10//!
11//! - [`GlobalRegistryServer`]: bound on the server Node. Accepts
12//!   inbound `Announce` envelopes, registers the announcing peer in
13//!   the runtime [`AddressBook`](bb_runtime::framework::AddressBook)
14//!   under a server-assigned TTL, and replies with a `Handshake`
15//!   carrying `(assigned_ttl_ns, heartbeat_interval_ns)`. Lazy
16//!   eviction runs at the top of every `Sample` / `CurrentView` read.
17//!
18//! The protocol carries no static peer identity on either side: the
19//! client wires `server_peer` via graph input, and the server reads
20//! the announcing peer from the inbound payload.
21
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25use serde::{Deserialize, Serialize};
26
27use bb_runtime::atomic::{AtomicOpDecl, AtomicOpKind, AtomicOpsetDecl, DispatchResult};
28use bb_runtime::bus::OpError;
29use bb_runtime::completion::{CompletionHandle, ContractResponse};
30use bb_runtime::envelope::{SlotFill, WireEnvelope};
31use bb_runtime::framework::Address;
32use bb_runtime::ids::{ComponentRef, PeerId};
33use bb_runtime::runtime::RuntimeResourceRef;
34use bb_runtime::slot_value::SlotValue;
35use bb_runtime::syscall::values::BytesValue;
36
37use bb_ir::types::{TYPE_BYTES, TYPE_PEER_ID, TYPE_PEER_ID_VEC, TYPE_SCALAR_I32, TYPE_TRIGGER};
38
39/// Well-known `ComponentRef` the client addresses the server's
40/// [`GlobalRegistryServer`] component at. Pinned on both sides; a
41/// production deployment would resolve via discovery.
42pub const GLOBAL_REGISTRY_SERVER_CREF: u32 = 0;
43
44/// Well-known `ComponentRef` the server addresses the client's
45/// [`GlobalRegistryClient`] component at when delivering Handshake
46/// replies. Pinned on both sides.
47pub const GLOBAL_REGISTRY_CLIENT_CREF: u32 = 1;
48
49/// Atomic-op opset domain shared by both halves.
50pub const GLOBAL_REGISTRY_DOMAIN: &str = "ai.bytesandbrains.protocol.global_registry";
51
52/// Handshake payload the server replies with on a successful
53/// `Announce`. The client decodes TTL/heartbeat state and merges
54/// `server_addresses` into its [`AddressBook`] entry for the server
55/// peer so subsequent dial attempts can pick any reachable endpoint.
56#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
57pub struct Handshake {
58    /// TTL the server assigned this client. The client must
59    /// re-`Announce` before this elapses or the server lazily evicts
60    /// the entry on its next `Sample` / `CurrentView` read.
61    pub assigned_ttl_ns: u64,
62    /// Server-derived heartbeat interval (`assigned_ttl_ns / 3`).
63    /// The client throttles outgoing `Announce` calls to no more
64    /// than one per interval.
65    pub heartbeat_interval_ns: u64,
66    /// Full multi-address bag the server advertises for itself. The
67    /// client lands these in its [`AddressBook`] entry for the
68    /// server peer so the dialer can pick any reachable endpoint.
69    /// Empty means the server failed to populate its own
70    /// `local_addresses()` — a deployment error the server rejects
71    /// before reaching this struct.
72    pub server_addresses: Vec<Address>,
73}
74
75// ─── Client ────────────────────────────────────────────────────────
76
77/// Client half of GlobalRegistry. Holds TTL / heartbeat state echoed
78/// from the server's most recent Handshake; the server peer is wired
79/// through the graph at every `Announce` and is not a struct field.
80#[derive(Clone, Debug, Default, Serialize, Deserialize, bb_derive::Concrete)]
81pub struct GlobalRegistryClient {
82    /// TTL the server assigned in its last Handshake response.
83    /// Stamped into outgoing `Announce` envelopes as
84    /// `remaining_deadline_ns` so receivers can age out stale entries
85    /// without per-peer state.
86    pub last_assigned_ttl_ns: u64,
87
88    /// Heartbeat interval the server computed in its last Handshake
89    /// (`assigned_ttl_ns / 3`). The client respects this interval when
90    /// throttling subsequent Announces.
91    pub last_heartbeat_interval_ns: u64,
92
93    /// Monotonic-ns timestamp of the most recent Announce dispatch.
94    /// Skipped on snapshot/restore so bootstrap re-seeds the cadence
95    /// on resume.
96    #[serde(skip)]
97    pub last_announce_ts_ns: u64,
98}
99
100impl GlobalRegistryClient {
101    /// Construct with default TTL state. The server peer is wired
102    /// through the graph at every `Announce` op and is not a field.
103    pub fn new() -> Self {
104        Self::default()
105    }
106}
107
108/// Atomic-op declarations for the client half. `Announce` carries
109/// the server peer as a graph input; `Handshake` delivers the
110/// server's TTL/heartbeat reply back into the client.
111static GLOBAL_REGISTRY_CLIENT_OPS: &[AtomicOpDecl] = &[
112    AtomicOpDecl {
113        name: "Announce",
114        inputs: &[("server_peer", &TYPE_PEER_ID)],
115        outputs: &[("wakeup", &TYPE_TRIGGER)],
116        kind: AtomicOpKind::Immediate,
117        type_relations: &[],
118    },
119    AtomicOpDecl {
120        name: "Handshake",
121        inputs: &[],
122        outputs: &[("wakeup", &TYPE_TRIGGER)],
123        kind: AtomicOpKind::Immediate,
124        type_relations: &[],
125    },
126];
127
128impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryClient {
129    type Error = OpError;
130
131    fn atomic_opset(&self) -> AtomicOpsetDecl {
132        AtomicOpsetDecl {
133            domain: GLOBAL_REGISTRY_DOMAIN,
134            version: 1,
135            ops: GLOBAL_REGISTRY_CLIENT_OPS,
136        }
137    }
138
139    fn dispatch_atomic(
140        &mut self,
141        op_type: &str,
142        inputs: &[(&str, &dyn SlotValue)],
143        ctx: &mut RuntimeResourceRef<'_>,
144    ) -> Result<DispatchResult, OpError> {
145        match op_type {
146            "Announce" => {
147                let now = ctx.time.scheduler.now_ns();
148                // Heartbeat throttle. Sub-interval calls are silent
149                // no-ops; the first Announce always fires because
150                // both `last_announce_ts_ns` and
151                // `last_heartbeat_interval_ns` start at zero.
152                if self.last_announce_ts_ns != 0
153                    && self.last_heartbeat_interval_ns != 0
154                    && now.saturating_sub(self.last_announce_ts_ns)
155                        < self.last_heartbeat_interval_ns
156                {
157                    return Ok(DispatchResult::Immediate(Vec::new()));
158                }
159
160                let server_peer = downcast_peer_id(inputs, "server_peer")?;
161
162                // The Announce payload carries the client's full
163                // address bag so the server's AddressBook learns
164                // every reachable endpoint on first contact. No
165                // synthesis fallback: a client with no local
166                // addresses is a deployment error caught at the
167                // bottom of the bootstrap path, not papered over
168                // with a `/p2p/<PeerId>` placeholder here.
169                let local_addresses = ctx.local_addresses().to_vec();
170                if local_addresses.is_empty() {
171                    return Err(OpError {
172                        detail: "GlobalRegistryClient::Announce: no local addresses; \
173                                 configure via install(...) or node.add_local_address()"
174                            .to_string(),
175                        ..Default::default()
176                    });
177                }
178                let payload = bincode::serialize(&(ctx.current.self_peer, local_addresses))
179                    .map_err(|e| OpError {
180                        detail: format!("Announce: serialize (self_peer, addresses): {e}"),
181                        ..Default::default()
182                    })?;
183
184                let dest_suffix = Address::empty()
185                    .component(ComponentRef::from(GLOBAL_REGISTRY_SERVER_CREF))
186                    .op("Announce")
187                    .to_bytes();
188                let dest_peer_addr = Address::empty().p2p(server_peer).to_bytes();
189
190                let env = WireEnvelope {
191                    dest_peer_addresses: vec![dest_peer_addr],
192                    fills: vec![SlotFill {
193                        dest_suffix,
194                        payload,
195                        trigger_only: false,
196                        ..Default::default()
197                    }],
198                    correlation: None,
199                    remaining_deadline_ns: self.last_assigned_ttl_ns,
200                    edge_rtt_reports: Vec::new(),
201                    ..Default::default()
202                };
203                ctx.net.outbound.push(env);
204
205                self.last_announce_ts_ns = now;
206                Ok(DispatchResult::Immediate(Vec::new()))
207            }
208            "Handshake" => {
209                let payload = inputs
210                    .iter()
211                    .find_map(|(_, v)| v.as_any().downcast_ref::<BytesValue>().map(|b| b.0.clone()))
212                    .ok_or_else(|| OpError {
213                        detail: "Handshake: missing BytesValue payload".to_string(),
214                        ..Default::default()
215                    })?;
216                let handshake: Handshake = bincode::deserialize(&payload).map_err(|e| OpError {
217                    detail: format!("Handshake: decode: {e}"),
218                    ..Default::default()
219                })?;
220                self.last_assigned_ttl_ns = handshake.assigned_ttl_ns;
221                self.last_heartbeat_interval_ns = handshake.heartbeat_interval_ns;
222
223                // Merge the server's advertised address bag into the
224                // client's AddressBook. A handshake with an empty
225                // `server_addresses` is malformed (the server's
226                // dispatch rejects that case before serializing) so
227                // a defensive skip beats a crash here. AddressBook
228                // failures surface as a bus event rather than a
229                // hard return so a transient cap collision cannot
230                // tip the client into a fatal-error spiral.
231                if !handshake.server_addresses.is_empty() {
232                    if let Some(server_peer) = ctx.current.inbound.src_peer {
233                        if let Err(e) = ctx
234                            .peers
235                            .addresses
236                            .add_peer(server_peer, handshake.server_addresses)
237                        {
238                            ctx.bus.publish(bb_runtime::bus::NodeEvent::Infra(
239                                bb_runtime::bus::InfraEvent::OpFailure {
240                                    op_ref: ctx.current.op_ref,
241                                    error: OpError {
242                                        detail: format!(
243                                            "Handshake: address_book.add_peer({server_peer:?}): {e}"
244                                        ),
245                                        ..Default::default()
246                                    },
247                                },
248                            ));
249                        }
250                    }
251                }
252                Ok(DispatchResult::Immediate(Vec::new()))
253            }
254            other => Err(OpError {
255                detail: format!("unknown op for GlobalRegistryClient: {other}"),
256                ..Default::default()
257            }),
258        }
259    }
260}
261
262// ─── Server ────────────────────────────────────────────────────────
263
264/// Per-deployment knobs for [`GlobalRegistryServer`]. All durations
265/// are in nanoseconds. Defaults follow the deep-research bound:
266/// 90 s TTL, 30 s floor, 5 min ceiling.
267#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
268pub struct GlobalRegistryServerConfig {
269    /// TTL the server stamps onto fresh registrations. Clients
270    /// receive this verbatim in their Handshake reply.
271    pub default_ttl_ns: u64,
272    /// Floor for aggressive eviction policy bumps. Not enforced
273    /// directly today; reserved as the documented lower bound for
274    /// future server-side cohort tuning.
275    pub min_ttl_ns: u64,
276    /// Ceiling on stale entries. Not enforced directly today;
277    /// reserved as the documented upper bound for future
278    /// server-side cohort tuning.
279    pub max_ttl_ns: u64,
280}
281
282impl Default for GlobalRegistryServerConfig {
283    fn default() -> Self {
284        Self {
285            default_ttl_ns: 90_000_000_000,
286            min_ttl_ns: 30_000_000_000,
287            max_ttl_ns: 300_000_000_000,
288        }
289    }
290}
291
292/// Server half of GlobalRegistry. Maintains
293/// `PeerId → (expires_at_ns, source_address)` over the announced
294/// cohort and exposes the cohort as a
295/// [`bb_runtime::contracts::PeerSelector`] source. Eviction is lazy:
296/// `Sample` / `CurrentView` drop entries whose `expires_at_ns` has
297/// elapsed before consulting the registry.
298#[derive(Debug, Serialize, Deserialize, bb_derive::Concrete, bb_derive::PeerSelector)]
299pub struct GlobalRegistryServer {
300    /// Configurable TTL bounds + heartbeat policy.
301    pub config: GlobalRegistryServerConfig,
302
303    /// RNG seed for deterministic `Sample` selection. Persisted so a
304    /// restored server samples consistently.
305    pub seed: u64,
306
307    /// Registry: `PeerId → (expires_at_ns, source_address)`. Restored
308    /// alongside `seed` so cohort continuity survives snapshot/restore.
309    pub entries: HashMap<PeerId, (u64, Address)>,
310
311    /// Per-call counter the seedable RNG mixes in so successive
312    /// `sample(n)` returns vary even on a constant seed. Reset to
313    /// zero on snapshot/restore — restored servers replay sampling
314    /// from a known starting point. `AtomicU64` is the minimal
315    /// lock-free shape that fits the Contract's `&self`-only `select`
316    /// signature.
317    #[serde(skip)]
318    sample_counter: AtomicU64,
319}
320
321impl Default for GlobalRegistryServer {
322    fn default() -> Self {
323        Self {
324            config: GlobalRegistryServerConfig::default(),
325            seed: 0,
326            entries: HashMap::new(),
327            sample_counter: AtomicU64::new(0),
328        }
329    }
330}
331
332impl Clone for GlobalRegistryServer {
333    fn clone(&self) -> Self {
334        Self {
335            config: self.config,
336            seed: self.seed,
337            entries: self.entries.clone(),
338            sample_counter: AtomicU64::new(0),
339        }
340    }
341}
342
343impl GlobalRegistryServer {
344    /// Construct a fresh server with the default TTL/heartbeat policy
345    /// and `seed` driving deterministic `Sample` selection.
346    pub fn new(seed: u64) -> Self {
347        Self {
348            config: GlobalRegistryServerConfig::default(),
349            seed,
350            entries: HashMap::new(),
351            sample_counter: AtomicU64::new(0),
352        }
353    }
354
355    /// Construct with an explicit `config` override.
356    pub fn with_config(seed: u64, config: GlobalRegistryServerConfig) -> Self {
357        Self {
358            config,
359            seed,
360            entries: HashMap::new(),
361            sample_counter: AtomicU64::new(0),
362        }
363    }
364
365    /// Server-derived heartbeat interval. Integer division on
366    /// `default_ttl_ns / 3` gives the client three windows to refresh
367    /// before eviction.
368    pub fn heartbeat_interval_ns(&self) -> u64 {
369        self.config.default_ttl_ns / 3
370    }
371
372    /// Drop registry entries whose `expires_at_ns < now_ns` and
373    /// release their address-book references. Called at the top of
374    /// every `Sample` / `CurrentView` read.
375    fn evict_expired(&mut self, now_ns: u64, addresses: &mut bb_runtime::framework::AddressBook) {
376        let expired: Vec<PeerId> = self
377            .entries
378            .iter()
379            .filter_map(|(peer, (expires, _))| (now_ns >= *expires).then_some(*peer))
380            .collect();
381        for peer in expired {
382            self.entries.remove(&peer);
383            let _ = addresses.drop_peer(peer);
384        }
385    }
386
387    /// Live cohort (post-eviction) as a `Vec<PeerId>` in insertion
388    /// order. Helper used by both `Sample` and `CurrentView`.
389    fn live_peers(&self) -> Vec<PeerId> {
390        self.entries.keys().copied().collect()
391    }
392}
393
394impl bb_runtime::contracts::PeerSelector for GlobalRegistryServer {
395    type Error = OpError;
396
397    fn select(
398        &mut self,
399        ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
400        params: bb_runtime::contracts::peer_selector::SelectParams,
401        _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
402    ) -> ContractResponse<Vec<PeerId>, Self::Error> {
403        use bb_runtime::contracts::peer_selector::SelectParams;
404        let now = ctx.time.scheduler.now_ns();
405        self.evict_expired(now, ctx.peers.addresses);
406        let known = self.live_peers();
407        let out = match params {
408            SelectParams::All => known,
409            SelectParams::Random { n } => {
410                sample_n(&known, n as usize, self.seed, &self.sample_counter)
411            }
412            SelectParams::NearKey { key: _, n } => {
413                let take = (n as usize).min(known.len());
414                known[..take].to_vec()
415            }
416        };
417        ContractResponse::Now(Ok(out))
418    }
419
420    fn current_view(
421        &mut self,
422        ctx: &mut bb_runtime::runtime::RuntimeResourceRef<'_>,
423        _completion: CompletionHandle<Vec<PeerId>, Self::Error>,
424    ) -> ContractResponse<Vec<PeerId>, Self::Error> {
425        let now = ctx.time.scheduler.now_ns();
426        self.evict_expired(now, ctx.peers.addresses);
427        ContractResponse::Now(Ok(self.live_peers()))
428    }
429}
430
431/// Atomic-op declarations for the server half. `Sample` and
432/// `CurrentView` carry an opaque cookie (libp2p-style incremental
433/// discovery). v1 ships full cookies — `next_cookie` is reserved
434/// state the client echoes back unchanged on the next read.
435static GLOBAL_REGISTRY_SERVER_OPS: &[AtomicOpDecl] = &[
436    AtomicOpDecl {
437        name: "Sample",
438        inputs: &[("count", &TYPE_SCALAR_I32), ("cookie", &TYPE_BYTES)],
439        outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
440        kind: AtomicOpKind::Immediate,
441        type_relations: &[],
442    },
443    AtomicOpDecl {
444        name: "CurrentView",
445        inputs: &[("cookie", &TYPE_BYTES)],
446        outputs: &[("peers", &TYPE_PEER_ID_VEC), ("next_cookie", &TYPE_BYTES)],
447        kind: AtomicOpKind::Immediate,
448        type_relations: &[],
449    },
450    AtomicOpDecl {
451        name: "Announce",
452        inputs: &[],
453        outputs: &[],
454        kind: AtomicOpKind::Immediate,
455        type_relations: &[],
456    },
457];
458
459impl bb_runtime::roles::ProtocolRuntime for GlobalRegistryServer {
460    type Error = OpError;
461
462    fn atomic_opset(&self) -> AtomicOpsetDecl {
463        AtomicOpsetDecl {
464            domain: GLOBAL_REGISTRY_DOMAIN,
465            version: 1,
466            ops: GLOBAL_REGISTRY_SERVER_OPS,
467        }
468    }
469
470    fn dispatch_atomic(
471        &mut self,
472        op_type: &str,
473        inputs: &[(&str, &dyn SlotValue)],
474        ctx: &mut RuntimeResourceRef<'_>,
475    ) -> Result<DispatchResult, OpError> {
476        match op_type {
477            "Announce" => {
478                let payload = inputs
479                    .iter()
480                    .find_map(|(_, v)| v.as_any().downcast_ref::<BytesValue>().map(|b| b.0.clone()))
481                    .ok_or_else(|| OpError {
482                        detail: "Announce: missing BytesValue payload".to_string(),
483                        ..Default::default()
484                    })?;
485                let (announcing_peer, announced_addresses): (PeerId, Vec<Address>) =
486                    bincode::deserialize(&payload).map_err(|e| OpError {
487                        detail: format!("Announce: decode (peer, addresses): {e}"),
488                        ..Default::default()
489                    })?;
490                if announced_addresses.is_empty() {
491                    return Err(OpError {
492                        detail:
493                            "GlobalRegistryServer::Announce: client supplied empty address list"
494                                .to_string(),
495                        ..Default::default()
496                    });
497                }
498
499                let now = ctx.time.scheduler.now_ns();
500                let ttl = self.config.default_ttl_ns;
501                let heartbeat = ttl / 3;
502                // First-address-wins for the registry's source-address
503                // slot. The full bag still lands in the AddressBook
504                // for dial fan-out.
505                let source_addr = announced_addresses[0].clone();
506
507                // Idempotent registration: a re-Announce from a known
508                // peer overwrites `expires_at_ns` without bumping the
509                // address-book `ref_count` (add_peer is a no-op for
510                // duplicate addresses on an existing entry, and the
511                // ref_count bump is what would otherwise leak).
512                let is_new = !self.entries.contains_key(&announcing_peer);
513                self.entries.insert(
514                    announcing_peer,
515                    (now.saturating_add(ttl), source_addr.clone()),
516                );
517                if is_new {
518                    ctx.peers
519                        .addresses
520                        .add_peer(announcing_peer, announced_addresses)
521                        .map_err(|e| OpError {
522                            detail: format!("Announce: address_book.add_peer: {e}"),
523                            ..Default::default()
524                        })?;
525                }
526
527                // Server's own addresses ride back on the Handshake
528                // so the client's dialer can pick any reachable
529                // endpoint. No synthesis fallback.
530                let server_addresses = ctx.local_addresses().to_vec();
531                if server_addresses.is_empty() {
532                    return Err(OpError {
533                        detail: "GlobalRegistryServer::Announce: no local addresses to advertise; \
534                                 configure via install(...) or node.add_local_address()"
535                            .to_string(),
536                        ..Default::default()
537                    });
538                }
539                let handshake = Handshake {
540                    assigned_ttl_ns: ttl,
541                    heartbeat_interval_ns: heartbeat,
542                    server_addresses,
543                };
544                let handshake_payload = bincode::serialize(&handshake).map_err(|e| OpError {
545                    detail: format!("Announce: serialize handshake: {e}"),
546                    ..Default::default()
547                })?;
548                let reply_suffix = Address::empty()
549                    .component(ComponentRef::from(GLOBAL_REGISTRY_CLIENT_CREF))
550                    .op("Handshake")
551                    .to_bytes();
552                let reply_env = WireEnvelope {
553                    dest_peer_addresses: vec![Address::empty().p2p(announcing_peer).to_bytes()],
554                    fills: vec![SlotFill {
555                        dest_suffix: reply_suffix,
556                        payload: handshake_payload,
557                        trigger_only: false,
558                        ..Default::default()
559                    }],
560                    correlation: None,
561                    remaining_deadline_ns: 0,
562                    edge_rtt_reports: Vec::new(),
563                    ..Default::default()
564                };
565                ctx.net.outbound.push(reply_env);
566
567                Ok(DispatchResult::Immediate(Vec::new()))
568            }
569            "Sample" => {
570                let now = ctx.time.scheduler.now_ns();
571                self.evict_expired(now, ctx.peers.addresses);
572                let n = inputs
573                    .iter()
574                    .find_map(|(name, v)| {
575                        (*name == "count").then(|| v.as_any().downcast_ref::<u32>().copied())
576                    })
577                    .flatten()
578                    .unwrap_or(0) as usize;
579                let known = self.live_peers();
580                let picked = sample_n(&known, n, self.seed, &self.sample_counter);
581                let next_cookie = next_cookie_from(inputs);
582                Ok(DispatchResult::Immediate(vec![
583                    ("peers".to_string(), Box::new(picked) as Box<dyn SlotValue>),
584                    (
585                        "next_cookie".to_string(),
586                        Box::new(BytesValue(next_cookie)) as Box<dyn SlotValue>,
587                    ),
588                ]))
589            }
590            "CurrentView" => {
591                let now = ctx.time.scheduler.now_ns();
592                self.evict_expired(now, ctx.peers.addresses);
593                let view = self.live_peers();
594                let next_cookie = next_cookie_from(inputs);
595                Ok(DispatchResult::Immediate(vec![
596                    ("peers".to_string(), Box::new(view) as Box<dyn SlotValue>),
597                    (
598                        "next_cookie".to_string(),
599                        Box::new(BytesValue(next_cookie)) as Box<dyn SlotValue>,
600                    ),
601                ]))
602            }
603            other => Err(OpError {
604                detail: format!("unknown op for GlobalRegistryServer: {other}"),
605                ..Default::default()
606            }),
607        }
608    }
609}
610
611// ─── Helpers ───────────────────────────────────────────────────────
612
613/// Downcast `inputs[name]` to a [`PeerId`]. Handles both the typed
614/// `PeerIdValue` carrier and a bare `PeerId` (constant ops route the
615/// raw value when no carrier is required).
616fn downcast_peer_id(inputs: &[(&str, &dyn SlotValue)], name: &str) -> Result<PeerId, OpError> {
617    for (slot, v) in inputs {
618        if *slot != name {
619            continue;
620        }
621        if let Some(p) = v.as_any().downcast_ref::<PeerId>() {
622            return Ok(*p);
623        }
624        if let Some(pv) = v
625            .as_any()
626            .downcast_ref::<bb_runtime::syscall::values::PeerIdValue>()
627        {
628            return Ok(pv.0);
629        }
630    }
631    Err(OpError {
632        detail: format!("missing `{name}` input (expected PeerId)"),
633        ..Default::default()
634    })
635}
636
637/// Read the inbound `cookie` (opaque bytes) and echo it back as the
638/// `next_cookie`. v1 ships full cookies — pagination state stays
639/// reserved so the surface is stable when v2 adds chunked discovery.
640fn next_cookie_from(inputs: &[(&str, &dyn SlotValue)]) -> Vec<u8> {
641    for (slot, v) in inputs {
642        if *slot != "cookie" {
643            continue;
644        }
645        if let Some(b) = v.as_any().downcast_ref::<BytesValue>() {
646            return b.0.clone();
647        }
648    }
649    Vec::new()
650}
651
652/// Deterministic `n`-subset of `peers` driven by an xorshift mixed
653/// with `seed` and a per-call counter. Empty input yields an empty
654/// result.
655fn sample_n(peers: &[PeerId], n: usize, seed: u64, counter: &AtomicU64) -> Vec<PeerId> {
656    if peers.is_empty() || n == 0 {
657        return Vec::new();
658    }
659    let take = n.min(peers.len());
660    let count = counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
661    let mut state = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(count);
662    let mut pool: Vec<PeerId> = peers.to_vec();
663    for i in 0..take {
664        state ^= state << 13;
665        state ^= state >> 7;
666        state ^= state << 17;
667        let j = i + (state as usize) % (pool.len() - i);
668        pool.swap(i, j);
669    }
670    pool.truncate(take);
671    pool
672}
673