// meerkat_contracts/wire/supervisor_bridge.rs

1//! Supervisor bridge protocol wire types.
2//!
3//! Typed wire envelope for cross-machine bridge commands between a mob
4//! supervisor and the runtime instances it manages. Both `meerkat-mob`
5//! (sender) and `meerkat-runtime` (receiver) consume these types. Neither
6//! crate depends on the other — the contracts crate owns the vocabulary.
7
8use meerkat_core::comms::{PeerAddress, PeerId, PeerName, TrustedPeerDescriptor};
9use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
10use std::fmt;
11
12/// Comms intent used for all supervisor bridge commands.
13///
14/// The sender sets this as the request `intent`; the receiver checks for it
15/// before attempting to deserialize `params` as [`BridgeCommand`].
16pub use meerkat_core::comms::SUPERVISOR_BRIDGE_INTENT;
/// Address query parameter carrying the one-time bind bootstrap token.
///
/// [`canonicalize_bridge_address`] drops every `key=value` query pair whose
/// key equals this name.
pub const SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM: &str = "mob_supervisor_bootstrap_token";
/// A supported supervisor bridge wire protocol version.
///
/// The JSON representation remains the historic integer so persisted records
/// and wire payloads do not change shape. Construction is intentionally routed
/// through this type so unsupported values fail at serde/TryFrom boundaries
/// instead of being carried deeper as raw integers.
///
/// The inner integer is private. `Debug` is not derived: a hand-written impl
/// in this file prints the bare number.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct BridgeProtocolVersion(u32);
28
/// Unsupported supervisor bridge wire protocol version.
///
/// Returned when a raw integer cannot be turned into a
/// [`BridgeProtocolVersion`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UnsupportedBridgeProtocolVersion {
    /// The raw version number that was rejected.
    raw: u32,
}
34
35impl fmt::Display for UnsupportedBridgeProtocolVersion {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        write!(
38            f,
39            "unsupported supervisor bridge protocol version {} (supported {:?}; default {})",
40            self.raw,
41            BridgeProtocolVersion::SUPPORTED,
42            BridgeProtocolVersion::DEFAULT
43        )
44    }
45}
46
// No `source()` override: this error only carries the rejected integer.
impl std::error::Error for UnsupportedBridgeProtocolVersion {}
48
49impl BridgeProtocolVersion {
50    /// Protocol with typed rejection causes.
51    pub const V2: Self = Self(2);
52    /// Current protocol version implemented by this bridge contract.
53    pub const CURRENT: Self = Self::V2;
54    /// Default protocol version for new supervisor authority records.
55    pub const DEFAULT: Self = Self::V2;
56    /// Protocol versions accepted by this bridge contract.
57    pub const SUPPORTED: &'static [Self] = &[Self::V2];
58
59    pub const fn is_supported(self) -> bool {
60        matches!(self.0, 2)
61    }
62
63    pub const fn same_protocol_as(self, other: Self) -> bool {
64        self.0 == other.0
65    }
66
67    pub fn supported() -> &'static [Self] {
68        Self::SUPPORTED
69    }
70
71    fn from_supported_u32(raw: u32) -> Result<Self, UnsupportedBridgeProtocolVersion> {
72        match raw {
73            2 => Ok(Self::V2),
74            _ => Err(UnsupportedBridgeProtocolVersion { raw }),
75        }
76    }
77}
78
impl Default for BridgeProtocolVersion {
    /// Returns [`BridgeProtocolVersion::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
84
85impl fmt::Debug for BridgeProtocolVersion {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        write!(f, "{}", self.0)
88    }
89}
90
91impl fmt::Display for BridgeProtocolVersion {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        write!(f, "{}", self.0)
94    }
95}
96
/// Fallible conversion from a raw integer.
impl TryFrom<u32> for BridgeProtocolVersion {
    type Error = UnsupportedBridgeProtocolVersion;

    /// Delegates to `from_supported_u32`; unsupported values yield an error
    /// that carries the rejected number.
    fn try_from(raw: u32) -> Result<Self, Self::Error> {
        Self::from_supported_u32(raw)
    }
}
104
105impl Serialize for BridgeProtocolVersion {
106    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
107    where
108        S: Serializer,
109    {
110        serializer.serialize_u32(self.0)
111    }
112}
113
114impl<'de> Deserialize<'de> for BridgeProtocolVersion {
115    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
116    where
117        D: Deserializer<'de>,
118    {
119        let raw = u32::deserialize(deserializer)?;
120        Self::from_supported_u32(raw).map_err(de::Error::custom)
121    }
122}
123
/// Current supervisor bridge wire protocol version.
///
/// Same value as [`SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION`]; both alias
/// `BridgeProtocolVersion::CURRENT`.
///
/// Version history:
/// - `1`: initial protocol with untyped `BridgeReply::Rejected { reason: String }`.
/// - `2`: `BridgeReply::Rejected { cause: BridgeRejectionCause, reason: String }`
///   so callers branch on typed cause; runtime-side emitters pass typed
///   `BridgeReply` values through to the transport. Delivery rejections
///   carry typed `BridgeDeliveryRejectionCause` data; the string `reason`
///   remains diagnostic presentation only.
pub const SUPERVISOR_BRIDGE_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::CURRENT;
/// Canonical current supervisor bridge protocol version.
pub const SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::CURRENT;
/// Canonical default supervisor bridge protocol version for new authorities.
pub const SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::DEFAULT;
/// Canonical set of protocol versions this runtime/wire contract accepts.
///
/// Alias of `BridgeProtocolVersion::SUPPORTED`.
pub const SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS: &[BridgeProtocolVersion] =
    BridgeProtocolVersion::SUPPORTED;
144
/// Return the canonical current supervisor bridge protocol version.
///
/// Function form of [`SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION`]; it is
/// also named as a `#[serde(default = "...")]` provider on
/// `BridgeCapabilities`.
pub const fn supervisor_bridge_current_protocol_version() -> BridgeProtocolVersion {
    SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION
}
149
/// Return the canonical default supervisor bridge protocol version.
///
/// Function form of [`SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION`]; it is
/// also named as a `#[serde(default = "...")]` provider on
/// `BridgeCapabilities`.
pub const fn supervisor_bridge_default_protocol_version() -> BridgeProtocolVersion {
    SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION
}
154
/// Return the canonical list of supported supervisor bridge protocol versions.
///
/// Function form of [`SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS`].
pub fn supervisor_bridge_supported_protocol_versions() -> &'static [BridgeProtocolVersion] {
    SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS
}
159
/// Return `true` when `protocol_version` is accepted by this bridge contract.
///
/// Free-function wrapper around `BridgeProtocolVersion::is_supported`.
pub fn supervisor_bridge_protocol_version_supported(
    protocol_version: BridgeProtocolVersion,
) -> bool {
    protocol_version.is_supported()
}
166
/// Owned copy of the supported-version list.
///
/// Used as the `#[serde(default = "...")]` provider for
/// `BridgeCapabilities::supported_protocol_versions`, which is a `Vec`.
fn default_supported_protocol_versions() -> Vec<BridgeProtocolVersion> {
    supervisor_bridge_supported_protocol_versions().to_vec()
}
170
171/// Remove the one-time bind bootstrap token from an advertised bridge address.
172pub fn canonicalize_bridge_address(address: &str) -> String {
173    let Some((base, query)) = address.split_once('?') else {
174        return address.to_string();
175    };
176    let filtered: Vec<&str> = query
177        .split('&')
178        .filter(|pair| {
179            pair.split_once('=')
180                .map(|(key, _)| key != SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM)
181                .unwrap_or(true)
182        })
183        .filter(|pair| !pair.is_empty())
184        .collect();
185    if filtered.is_empty() {
186        base.to_string()
187    } else {
188        format!("{base}?{}", filtered.join("&"))
189    }
190}
191
192// ---------------------------------------------------------------------------
193// Command envelope
194// ---------------------------------------------------------------------------
195
/// A typed command sent from a supervisor to a member runtime.
///
/// Serialized as an internally tagged object: the `command` field holds the
/// snake_case variant name and the payload's fields sit beside it. Unknown
/// fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeCommand {
    /// Bind a remote runtime to this supervisor.
    BindMember(BridgeBindPayload),
    AuthorizeSupervisor(BridgeSupervisorPayload),
    RevokeSupervisor(BridgeSupervisorPayload),
    /// Deliver one logical input to a member.
    DeliverMemberInput(BridgeDeliveryPayload),
    ObserveMember(BridgeSupervisorPayload),
    /// Cooperative boundary-break interrupt (see `BridgeHardCancelPayload`).
    InterruptMember(BridgeSupervisorPayload),
    /// Explicit hard cancel, kept separate from `InterruptMember`.
    HardCancelMember(BridgeHardCancelPayload),
    RetireMember(BridgeSupervisorPayload),
    DestroyMember(BridgeSupervisorPayload),
    WireMember(BridgePeerWiringPayload),
    UnwireMember(BridgePeerWiringPayload),
}
214
215impl BridgeCommand {
216    /// Protocol version carried by this command's payload.
217    pub fn protocol_version(&self) -> BridgeProtocolVersion {
218        match self {
219            Self::BindMember(payload) => payload.protocol_version,
220            Self::AuthorizeSupervisor(payload)
221            | Self::RevokeSupervisor(payload)
222            | Self::ObserveMember(payload)
223            | Self::InterruptMember(payload)
224            | Self::RetireMember(payload)
225            | Self::DestroyMember(payload) => payload.protocol_version,
226            Self::HardCancelMember(payload) => payload.protocol_version,
227            Self::DeliverMemberInput(payload) => payload.protocol_version,
228            Self::WireMember(payload) | Self::UnwireMember(payload) => payload.protocol_version,
229        }
230    }
231}
232
/// Decode failure for a supervisor bridge command.
#[derive(Debug)]
pub enum BridgeCommandDecodeError {
    /// The top-level `protocol_version` was an integer fitting `u32` that
    /// this contract does not support.
    UnsupportedProtocolVersion(UnsupportedBridgeProtocolVersion),
    /// `serde_json` could not deserialize the value as a `BridgeCommand`.
    Invalid(serde_json::Error),
}
239
240impl fmt::Display for BridgeCommandDecodeError {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        match self {
243            Self::UnsupportedProtocolVersion(error) => error.fmt(f),
244            Self::Invalid(error) => error.fmt(f),
245        }
246    }
247}
248
249impl std::error::Error for BridgeCommandDecodeError {
250    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
251        match self {
252            Self::UnsupportedProtocolVersion(error) => Some(error),
253            Self::Invalid(error) => Some(error),
254        }
255    }
256}
257
258/// Decode a bridge command while preserving typed protocol-version failures.
259pub fn decode_bridge_command(
260    value: serde_json::Value,
261) -> Result<BridgeCommand, BridgeCommandDecodeError> {
262    if let Some(raw) = value
263        .get("protocol_version")
264        .and_then(serde_json::Value::as_u64)
265        .and_then(|raw| u32::try_from(raw).ok())
266    {
267        BridgeProtocolVersion::from_supported_u32(raw)
268            .map_err(BridgeCommandDecodeError::UnsupportedProtocolVersion)?;
269    }
270    serde_json::from_value(value).map_err(BridgeCommandDecodeError::Invalid)
271}
272
273// ---------------------------------------------------------------------------
274// Reply envelope
275// ---------------------------------------------------------------------------
276
/// A typed reply from a member runtime back to the supervisor.
///
/// Serialized as an internally tagged object: the `result` field holds the
/// snake_case variant name. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "result", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeReply {
    BindMember(BridgeBindResponse),
    Ack(BridgeAck),
    Observation(BridgeObservationResponse),
    Delivery(BridgeDeliveryResponse),
    Retire(BridgeRetireResponse),
    Destroy(BridgeDestroyResponse),
    /// The command was rejected.
    Rejected {
        /// Typed cause that callers branch on.
        cause: BridgeRejectionCause,
        /// Diagnostic text for operators; not meant to be pattern-matched.
        reason: String,
    },
}
294
/// Decoded bridge rejection reply.
///
/// Protocol v2 rejections carry a typed [`BridgeRejectionCause`]. Bare JSON
/// string replies are only a protocol-v1 compatibility shape; callers must not
/// treat them as authoritative typed v2 causes.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum BridgeRejectionReply {
    /// Rejection decoded from a typed `BridgeReply::Rejected`.
    Typed {
        cause: BridgeRejectionCause,
        reason: String,
    },
    /// Rejection decoded from a bare JSON string; no typed cause is available.
    LegacyV1RawString {
        reason: String,
    },
}
311
312impl BridgeRejectionReply {
313    pub fn reason(&self) -> &str {
314        match self {
315            Self::Typed { reason, .. } | Self::LegacyV1RawString { reason } => reason,
316        }
317    }
318
319    pub fn typed_cause(&self) -> Option<BridgeRejectionCause> {
320        match self {
321            Self::Typed { cause, .. } => Some(*cause),
322            Self::LegacyV1RawString { .. } => None,
323        }
324    }
325
326    pub fn is_legacy_v1_raw_string(&self) -> bool {
327        matches!(self, Self::LegacyV1RawString { .. })
328    }
329}
330
331/// Decode a typed protocol-v2 bridge rejection.
332///
333/// Deliberately ignores bare JSON strings. Use
334/// [`decode_legacy_v1_raw_string_rejection`] only when the command was sent
335/// over the explicit protocol-v1 compatibility path.
336pub fn decode_protocol_v2_bridge_rejection(
337    value: &serde_json::Value,
338) -> Option<BridgeRejectionReply> {
339    match serde_json::from_value::<BridgeReply>(value.clone()).ok()? {
340        BridgeReply::Rejected { cause, reason } => {
341            Some(BridgeRejectionReply::Typed { cause, reason })
342        }
343        _ => None,
344    }
345}
346
347/// Decode the legacy protocol-v1 raw-string rejection compatibility shape.
348pub fn decode_legacy_v1_raw_string_rejection(
349    value: &serde_json::Value,
350) -> Option<BridgeRejectionReply> {
351    value
352        .as_str()
353        .map(|reason| BridgeRejectionReply::LegacyV1RawString {
354            reason: reason.to_string(),
355        })
356}
357
/// Decode a bridge rejection according to a supported command protocol version.
///
/// Supported supervisors expect typed rejection replies. The legacy v1 bare
/// string shape remains isolated in [`decode_legacy_v1_raw_string_rejection`]
/// and is never promoted through this supported-version path.
///
/// Returns `None` when `value` is not a typed `Rejected` reply.
pub fn decode_bridge_rejection_reply(
    protocol_version: BridgeProtocolVersion,
    value: &serde_json::Value,
) -> Option<BridgeRejectionReply> {
    // The version does not influence decoding: every value is decoded as a
    // typed v2 rejection. The binding is discarded explicitly.
    let _ = protocol_version;
    decode_protocol_v2_bridge_rejection(value)
}
370
/// Typed vocabulary for why a bridge command was rejected.
///
/// Callers branch on the typed `cause` to drive recovery logic; the
/// accompanying `reason` string is for operator diagnostics only and must
/// not be pattern-matched. Reserve `Internal` for true invariant
/// violations — ordinary validation failures get a specific cause.
///
/// Serialized as a snake_case string (for example `not_bound`). Each cause
/// maps to a recoverability class through `class()`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeRejectionCause {
    /// No supervisor is bound yet; caller must `bind_member` first.
    NotBound,
    /// The request targets an older epoch than the bound authority.
    StaleSupervisor,
    /// The authenticated sender does not match the authorized supervisor.
    SenderMismatch,
    /// A different supervisor is already bound; rotation must go through
    /// `authorize_supervisor`, not `bind_member`.
    AlreadyBound,
    /// The bootstrap token did not match the runtime's expected value.
    InvalidBootstrapToken,
    /// The wire protocol version is not supported by this runtime.
    UnsupportedProtocolVersion,
    /// The embedded supervisor peer spec failed validation.
    InvalidSupervisorSpec,
    /// The embedded trusted-peer spec failed validation.
    InvalidPeerSpec,
    /// The `expected_address` in the bind payload does not match this
    /// runtime's advertised address.
    AddressMismatch,
    /// The command variant is not currently handled by this runtime.
    Unsupported,
    /// An unexpected invariant was violated while handling the command.
    Internal,
}
407
/// Recoverability class of a bridge rejection.
///
/// The class is a protocol-level property of each [`BridgeRejectionCause`]
/// variant — not a decision for downstream helpers to make by pattern
/// matching on a hardcoded cause set. Callers branch on the class to
/// decide whether recovery by re-running `BindMember` is appropriate.
///
/// Serialized as a snake_case string.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeRejectionClass {
    /// Member is reachable and protocol-compliant, but its supervisor
    /// authority is missing or out-of-sync with the caller's. A fresh
    /// `BindMember` will reconcile.
    RecoverableBySupervisorRebind,
    /// The rejection reflects a hard contract violation (protocol
    /// version, identity, bootstrap proof, invariant) that a fresh bind
    /// cannot fix. The rejection must bubble up.
    Fatal,
}
428
429impl BridgeRejectionCause {
430    /// Protocol-level recoverability class for this rejection cause.
431    pub const fn class(self) -> BridgeRejectionClass {
432        match self {
433            Self::NotBound | Self::StaleSupervisor | Self::SenderMismatch => {
434                BridgeRejectionClass::RecoverableBySupervisorRebind
435            }
436            Self::AlreadyBound
437            | Self::InvalidBootstrapToken
438            | Self::UnsupportedProtocolVersion
439            | Self::InvalidSupervisorSpec
440            | Self::InvalidPeerSpec
441            | Self::AddressMismatch
442            | Self::Unsupported
443            | Self::Internal => BridgeRejectionClass::Fatal,
444        }
445    }
446}
447
448// ---------------------------------------------------------------------------
449// Member runtime state (wire projection)
450// ---------------------------------------------------------------------------
451
/// Wire projection of a member's runtime state.
///
/// Serialized as a snake_case string. The `Display` impl in this file prints
/// the same lowercase names.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeMemberRuntimeState {
    Initializing,
    Idle,
    Attached,
    Running,
    Retired,
    Stopped,
    Destroyed,
}
466
467impl std::fmt::Display for BridgeMemberRuntimeState {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        match self {
470            Self::Initializing => write!(f, "initializing"),
471            Self::Idle => write!(f, "idle"),
472            Self::Attached => write!(f, "attached"),
473            Self::Running => write!(f, "running"),
474            Self::Retired => write!(f, "retired"),
475            Self::Stopped => write!(f, "stopped"),
476            Self::Destroyed => write!(f, "destroyed"),
477        }
478    }
479}
480
// ---------------------------------------------------------------------------
// Trusted peer spec (stringly wire shape; converted to typed meerkat-core atoms below)
// ---------------------------------------------------------------------------
484
/// Minimal trusted peer identity for supervisor bridge wire messages.
///
/// Mirrors `meerkat_core::comms::TrustedPeerDescriptor` (post-C-TRP) but is
/// self-contained in the contracts crate so neither sender nor receiver
/// needs a cross-crate dependency for deserialization. Fields stay
/// stringly at the wire boundary — `peer_id` is the canonical comms routing
/// UUID, while raw Ed25519 public key material is carried only in `pubkey`.
/// The typed `PeerId`/`PeerName`/`PeerAddress` atoms are re-hydrated on the
/// receiving side.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgePeerSpec {
    /// Peer name; validated by `PeerName::new` during typed conversion.
    pub name: String,
    /// Peer id; parsed by `PeerId::parse` during typed conversion and
    /// required to equal the id derived from `pubkey`.
    pub peer_id: String,
    /// Peer address; parsed by `PeerAddress::parse` during typed conversion.
    pub address: String,
    /// Ed25519 signing public key bytes — needed so the receiver can
    /// verify envelope signatures after trust registration. Serialized
    /// as a 32-element array; the JSON form is an array of numbers, not
    /// a base64/hex string, to keep the wire deliberately boring.
    ///
    /// `#[serde(default)]` lets the field be omitted on the wire, in which
    /// case it deserializes to all zeroes. The typed conversions in this
    /// file reject an all-zero key, so a usable spec must still carry it.
    #[serde(default)]
    pub pubkey: [u8; 32],
}
508
/// Typed Ed25519 signing/trust subject carried by a bridge peer spec.
///
/// Newtype over the raw 32 key bytes; the inner array is private.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BridgePeerPubKey([u8; 32]);
512
513impl BridgePeerPubKey {
514    pub const fn new(bytes: [u8; 32]) -> Self {
515        Self(bytes)
516    }
517
518    pub const fn as_bytes(&self) -> &[u8; 32] {
519        &self.0
520    }
521
522    pub const fn into_bytes(self) -> [u8; 32] {
523        self.0
524    }
525
526    pub const fn is_zero(&self) -> bool {
527        let mut index = 0;
528        while index < self.0.len() {
529            if self.0[index] != 0 {
530                return false;
531            }
532            index += 1;
533        }
534        true
535    }
536
537    pub fn derived_peer_id(&self) -> PeerId {
538        PeerId::from_ed25519_pubkey(&self.0)
539    }
540}
541
542impl From<[u8; 32]> for BridgePeerPubKey {
543    fn from(bytes: [u8; 32]) -> Self {
544        Self::new(bytes)
545    }
546}
547
/// Bridge peer identity after the stringly wire spec has crossed the boundary.
///
/// Built by `TryFrom<&BridgePeerSpec>`, which parses each field into its
/// typed form and checks that `pubkey` is non-zero and derives `peer_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BridgePeerIdentity {
    pub name: PeerName,
    pub peer_id: PeerId,
    pub address: PeerAddress,
    pub pubkey: BridgePeerPubKey,
}
556
557impl BridgePeerIdentity {
558    pub fn into_trusted_peer_descriptor(self) -> TrustedPeerDescriptor {
559        TrustedPeerDescriptor {
560            peer_id: self.peer_id,
561            name: self.name,
562            address: self.address,
563            pubkey: self.pubkey.into_bytes(),
564        }
565    }
566}
567
/// Connectivity class observed for the bridged member runtime.
///
/// Serialized as a snake_case string.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgePeerConnectivity {
    Reachable,
    Unreachable,
    Unknown,
}
578
579impl From<TrustedPeerDescriptor> for BridgePeerSpec {
580    fn from(spec: TrustedPeerDescriptor) -> Self {
581        Self {
582            name: spec.name.as_str().to_string(),
583            peer_id: spec.peer_id.as_str(),
584            address: spec.address.to_string(),
585            pubkey: spec.pubkey,
586        }
587    }
588}
589
590impl TryFrom<BridgePeerSpec> for meerkat_core::comms::TrustedPeerDescriptor {
591    type Error = String;
592
593    fn try_from(spec: BridgePeerSpec) -> Result<Self, Self::Error> {
594        Self::try_from(&spec)
595    }
596}
597
/// Validate a wire peer spec and re-hydrate it into typed atoms.
impl TryFrom<&BridgePeerSpec> for BridgePeerIdentity {
    type Error = String;

    /// Checks run in a fixed order and the first failure is returned:
    /// `peer_id` parse, `name` validation, `address` parse, non-zero
    /// `pubkey`, and finally `peer_id` equal to the id derived from `pubkey`.
    ///
    /// # Errors
    ///
    /// A human-readable `String` describing the first failed check.
    fn try_from(spec: &BridgePeerSpec) -> Result<Self, Self::Error> {
        let peer_id = PeerId::parse(&spec.peer_id).map_err(|e| format!("invalid peer_id: {e}"))?;
        let name =
            PeerName::new(spec.name.clone()).map_err(|e| format!("invalid peer name: {e}"))?;
        let address = parse_peer_address(&spec.address)?;
        let pubkey = BridgePeerPubKey::new(spec.pubkey);
        // An omitted wire `pubkey` deserializes to all zeroes; reject it with
        // a dedicated message before the derivation check below.
        if pubkey.is_zero() {
            return Err("peer pubkey must be non-zero".to_string());
        }
        // The declared id must be the one derived from the key bytes.
        let derived = pubkey.derived_peer_id();
        if derived != peer_id {
            return Err(format!(
                "peer_id {peer_id} does not match pubkey-derived id {derived}"
            ));
        }
        Ok(Self {
            name,
            peer_id,
            address,
            pubkey,
        })
    }
}
624
625impl TryFrom<&BridgePeerSpec> for TrustedPeerDescriptor {
626    type Error = String;
627
628    fn try_from(spec: &BridgePeerSpec) -> Result<Self, Self::Error> {
629        BridgePeerIdentity::try_from(spec).map(BridgePeerIdentity::into_trusted_peer_descriptor)
630    }
631}
632
633fn parse_peer_address(raw: &str) -> Result<PeerAddress, String> {
634    PeerAddress::parse(raw).map_err(|err| err.to_string())
635}
636
637// ---------------------------------------------------------------------------
638// Payload types
639// ---------------------------------------------------------------------------
640
/// Supervisor authority credentials included in every bridge command.
///
/// The other payload structs in this file repeat these three fields and add
/// their own.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeSupervisorPayload {
    /// Wire peer spec of the supervisor.
    pub supervisor: BridgePeerSpec,
    /// Supervisor epoch. `BridgeRejectionCause::StaleSupervisor` is documented
    /// as a request targeting an older epoch than the bound authority.
    pub epoch: u64,
    /// Protocol version this command was written for.
    pub protocol_version: BridgeProtocolVersion,
}
650
/// Explicit hard-cancel command payload.
///
/// `InterruptMember` is the cooperative boundary-break path. This payload is
/// intentionally separate so supervisors cannot accidentally collapse boundary
/// cancellation and immediate user/session interrupt authority onto one wire
/// command.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeHardCancelPayload {
    /// Wire peer spec of the supervisor.
    pub supervisor: BridgePeerSpec,
    /// Supervisor epoch.
    pub epoch: u64,
    /// Protocol version this command was written for.
    pub protocol_version: BridgeProtocolVersion,
    /// Free-form reason supplied with the hard cancel.
    pub reason: String,
}
666
/// One-time bootstrap proof exchanged between a mob supervisor and a
/// member runtime on initial bind.
///
/// Transparent over the wire (`#[serde(transparent)]` — a bare JSON string),
/// but carries a redacting `Debug` impl and has no `Display` impl so the
/// raw secret cannot accidentally land in logs or panic messages. Treat it
/// like an API key: read `as_str()` only at the comms/transport boundary.
///
/// NOTE(review): equality is the derived `PartialEq` on the inner `String`,
/// which is presumably not constant-time — confirm whether the token check
/// needs a timing-safe comparison.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(transparent)]
pub struct BridgeBootstrapToken(String);
678
679impl BridgeBootstrapToken {
680    pub fn new(token: impl Into<String>) -> Self {
681        Self(token.into())
682    }
683
684    pub fn as_str(&self) -> &str {
685        &self.0
686    }
687
688    pub fn into_string(self) -> String {
689        self.0
690    }
691
692    pub fn is_empty(&self) -> bool {
693        self.0.is_empty()
694    }
695
696    pub fn len(&self) -> usize {
697        self.0.len()
698    }
699}
700
701impl From<String> for BridgeBootstrapToken {
702    fn from(token: String) -> Self {
703        Self(token)
704    }
705}
706
707impl From<&str> for BridgeBootstrapToken {
708    fn from(token: &str) -> Self {
709        Self(token.to_string())
710    }
711}
712
713impl std::fmt::Debug for BridgeBootstrapToken {
714    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
715        if self.0.is_empty() {
716            write!(f, "BridgeBootstrapToken(empty)")
717        } else {
718            write!(f, "BridgeBootstrapToken(<redacted, {}B>)", self.0.len())
719        }
720    }
721}
722
723// Deliberately no `Display` impl: `Display` is the canonical "show me this
724// value" path and the bootstrap token must never flow into a format string
725// or user-facing surface. Call `as_str()` explicitly when bridging to the
726// comms layer.
727
/// Bind a remote runtime to this supervisor.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeBindPayload {
    /// Wire peer spec of the supervisor.
    pub supervisor: BridgePeerSpec,
    /// Supervisor epoch.
    pub epoch: u64,
    /// Protocol version this command was written for.
    pub protocol_version: BridgeProtocolVersion,
    /// Expected canonical member `PeerId`; not an Ed25519 public-key string.
    pub expected_peer_id: String,
    /// Address the supervisor expects the member to advertise; see
    /// `BridgeRejectionCause::AddressMismatch`.
    pub expected_address: String,
    /// One-time bootstrap proof; see
    /// `BridgeRejectionCause::InvalidBootstrapToken`.
    pub bootstrap_token: BridgeBootstrapToken,
}
741
/// Capabilities advertised by a member runtime on bind.
///
/// Every field has a serde default, so an absent field deserializes to the
/// contract's own version constants or, for the boolean flags, to `false`.
/// The flag names mirror `BridgeCommand` variants; presumably each one
/// reports whether the member handles that command — confirm with the
/// runtime.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeCapabilities {
    /// Protocol version implemented by the responding member runtime.
    #[serde(default = "supervisor_bridge_current_protocol_version")]
    pub current_protocol_version: BridgeProtocolVersion,
    /// Protocol version new supervisors should use for fresh authority records.
    #[serde(default = "supervisor_bridge_default_protocol_version")]
    pub default_protocol_version: BridgeProtocolVersion,
    /// Protocol versions accepted by the responding member runtime.
    #[serde(default = "default_supported_protocol_versions")]
    pub supported_protocol_versions: Vec<BridgeProtocolVersion>,
    #[serde(default)]
    pub deliver_member_input: bool,
    #[serde(default)]
    pub observe_member: bool,
    #[serde(default)]
    pub interrupt_member: bool,
    #[serde(default)]
    pub hard_cancel_member: bool,
    #[serde(default)]
    pub retire_member: bool,
    #[serde(default)]
    pub destroy_member: bool,
    #[serde(default)]
    pub wire_member: bool,
    #[serde(default)]
    pub unwire_member: bool,
}
773
774impl Default for BridgeCapabilities {
775    fn default() -> Self {
776        Self {
777            current_protocol_version: supervisor_bridge_current_protocol_version(),
778            default_protocol_version: supervisor_bridge_default_protocol_version(),
779            supported_protocol_versions: supervisor_bridge_supported_protocol_versions().to_vec(),
780            deliver_member_input: false,
781            observe_member: false,
782            interrupt_member: false,
783            hard_cancel_member: false,
784            retire_member: false,
785            destroy_member: false,
786            wire_member: false,
787            unwire_member: false,
788        }
789    }
790}
791
792// ---------------------------------------------------------------------------
793// Response types
794// ---------------------------------------------------------------------------
795
/// Response to a bind command.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeBindResponse {
    /// Canonical member `PeerId`; transport public-key bytes stay out of this field.
    pub peer_id: String,
    /// Member address, as a string.
    pub address: String,
    /// Capabilities advertised by the member runtime.
    pub capabilities: BridgeCapabilities,
}
806
/// Simple acknowledgment.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeAck {
    /// Acknowledgment flag. NOTE(review): when `false` is sent instead of a
    /// `Rejected` reply is not visible in this file — confirm with the emitter.
    pub ok: bool,
}
814
/// Deliver one logical input to a member.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDeliveryPayload {
    /// Wire peer spec of the supervisor.
    pub supervisor: BridgePeerSpec,
    /// Supervisor epoch.
    pub epoch: u64,
    /// Protocol version this command was written for.
    pub protocol_version: BridgeProtocolVersion,
    /// Identifier of this logical input.
    pub input_id: String,
    /// Input content, typed by `meerkat_core`.
    pub content: meerkat_core::types::ContentInput,
    /// Handling mode for the input, typed by `meerkat_core`.
    pub handling_mode: meerkat_core::types::HandlingMode,
}
827
/// Outcome of a delivery attempt.
///
/// Serialized as an internally tagged object: the `outcome` field holds the
/// snake_case variant name.
///
/// NOTE(review): unlike the other wire enums in this file, this one is not
/// `#[non_exhaustive]` — confirm that is deliberate.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "outcome", rename_all = "snake_case", deny_unknown_fields)]
pub enum BridgeDeliveryOutcome {
    /// The input was accepted.
    Accepted,
    /// The input was treated as a duplicate of an existing one.
    Deduplicated {
        /// Identifier of the input that already existed.
        existing_input_id: String,
    },
    /// The input was rejected.
    Rejected {
        /// Typed cause that callers branch on.
        cause: BridgeDeliveryRejectionCause,
        /// Operator-facing presentation text only.
        reason: String,
    },
}
842
/// Typed vocabulary for why member input delivery was rejected.
///
/// This mirrors the runtime accept-boundary rejection vocabulary at the
/// bridge wire boundary. Callers should branch on this typed cause and treat
/// the sibling `reason` string on [`BridgeDeliveryOutcome::Rejected`] as
/// operator-facing presentation only.
///
/// Internally tagged on the wire: the `kind` key carries the snake_case
/// variant name, and unknown keys are rejected on decode. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeDeliveryRejectionCause {
    /// Runtime was not in a state that accepts input.
    NotReady { state: BridgeMemberRuntimeState },
    /// Input failed durability validation.
    DurabilityViolation { detail: String },
    /// Peer input carried a forbidden handling mode.
    PeerHandlingModeInvalid { detail: String },
    /// The bridge could not map the rejection to a known typed cause.
    Internal { detail: String },
}
863
864impl std::fmt::Display for BridgeDeliveryRejectionCause {
865    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
866        match self {
867            Self::NotReady { state } => write!(f, "not_ready(state={state})"),
868            Self::DurabilityViolation { detail } => {
869                write!(f, "durability_violation(detail={detail})")
870            }
871            Self::PeerHandlingModeInvalid { detail } => {
872                write!(f, "peer_handling_mode_invalid(detail={detail})")
873            }
874            Self::Internal { detail } => write!(f, "internal(detail={detail})"),
875        }
876    }
877}
878
/// Full response to a delivery command.
///
/// Decoding is strict: unknown keys are rejected (`deny_unknown_fields`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDeliveryResponse {
    /// Input identifier this response refers to.
    /// NOTE(review): presumably echoes the `input_id` of the delivery payload — confirm.
    pub input_id: String,
    /// Optional canonical input identifier.
    /// NOTE(review): there is no `skip_serializing_if` here, so `None` is expected to be
    /// written as JSON `null` rather than omitted — confirm that is intended.
    pub canonical_input_id: Option<String>,
    /// Outcome of the delivery attempt.
    pub outcome: BridgeDeliveryOutcome,
}
888
/// Peer wiring command payload.
///
/// Decoding is strict: unknown keys are rejected (`deny_unknown_fields`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgePeerWiringPayload {
    /// Peer spec of the supervisor attached to this command.
    pub supervisor: BridgePeerSpec,
    /// Supervisor epoch.
    /// NOTE(review): presumably lets the receiver detect stale supervisors — confirm.
    pub epoch: u64,
    /// Bridge wire protocol version this payload was built for.
    pub protocol_version: BridgeProtocolVersion,
    /// Spec of the peer that the command targets.
    /// NOTE(review): presumably the peer to wire to or unwire from — confirm with the
    /// command handlers.
    pub peer_spec: BridgePeerSpec,
}
899
/// Response to a retire command.
///
/// Decoding is strict: unknown keys are rejected (`deny_unknown_fields`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeRetireResponse {
    /// Count of abandoned inputs.
    /// NOTE(review): exact counting rules live in the runtime retire path — confirm.
    pub inputs_abandoned: usize,
    /// Count of inputs still pending drain.
    /// NOTE(review): exact counting rules live in the runtime retire path — confirm.
    pub inputs_pending_drain: usize,
}
908
/// Response to a destroy command.
///
/// Decoding is strict: unknown keys are rejected (`deny_unknown_fields`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDestroyResponse {
    /// Count of abandoned inputs.
    /// NOTE(review): exact counting rules live in the runtime destroy path — confirm.
    pub inputs_abandoned: usize,
}
916
/// Response to an observe command.
///
/// Only `state` and `observed_at` are always written. Each `Option` field is
/// omitted from the serialized form when `None` and defaults to `None` when
/// the key is missing on decode. Unknown keys are rejected
/// (`deny_unknown_fields`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeObservationResponse {
    /// Bridged runtime state for the observed member.
    pub state: BridgeMemberRuntimeState,
    /// Whether the member is accepting inputs, when reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub accepting_inputs: Option<bool>,
    /// Current run identifier reported by the member runtime.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub current_run_id: Option<String>,
    /// Peer connectivity status, when reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_connectivity: Option<BridgePeerConnectivity>,
    /// Last error text, when reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// ISO 8601 timestamp.
    pub observed_at: String,
}
936
937impl BridgeObservationResponse {
938    /// Build an observation reply.
939    pub fn new(
940        state: BridgeMemberRuntimeState,
941        accepting_inputs: Option<bool>,
942        current_run_id: Option<String>,
943        peer_connectivity: Option<BridgePeerConnectivity>,
944        last_error: Option<String>,
945        observed_at: String,
946    ) -> Self {
947        Self {
948            state,
949            accepting_inputs,
950            current_run_id,
951            peer_connectivity,
952            last_error,
953            observed_at,
954        }
955    }
956}
957
958#[cfg(test)]
959#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
960mod tests {
961    use super::*;
962    use serde_json::json;
963
964    #[test]
965    fn observation_response_new_sets_observation_fields() {
966        let response = BridgeObservationResponse::new(
967            BridgeMemberRuntimeState::Running,
968            Some(true),
969            Some("run-1".to_string()),
970            Some(BridgePeerConnectivity::Reachable),
971            None,
972            "2026-04-16T07:00:00Z".to_string(),
973        );
974
975        assert_eq!(response.state, BridgeMemberRuntimeState::Running);
976        assert_eq!(response.current_run_id.as_deref(), Some("run-1"));
977        assert_eq!(response.accepting_inputs, Some(true));
978        assert_eq!(
979            response.peer_connectivity,
980            Some(BridgePeerConnectivity::Reachable)
981        );
982    }
983
984    // -----------------------------------------------------------------------
985    // Helpers
986    // -----------------------------------------------------------------------
987
988    fn sample_peer_spec() -> BridgePeerSpec {
989        BridgePeerSpec {
990            name: "member-a".to_string(),
991            peer_id: "peer-abc".to_string(),
992            address: "tcp://127.0.0.1:7000".to_string(),
993            pubkey: [0u8; 32],
994        }
995    }
996
997    fn sample_supervisor_payload() -> BridgeSupervisorPayload {
998        BridgeSupervisorPayload {
999            supervisor: sample_peer_spec(),
1000            epoch: 42,
1001            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1002        }
1003    }
1004
1005    fn sample_hard_cancel_payload() -> BridgeHardCancelPayload {
1006        BridgeHardCancelPayload {
1007            supervisor: sample_peer_spec(),
1008            epoch: 42,
1009            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1010            reason: "test hard cancel".to_string(),
1011        }
1012    }
1013
1014    fn sample_wiring_payload() -> BridgePeerWiringPayload {
1015        BridgePeerWiringPayload {
1016            supervisor: sample_peer_spec(),
1017            epoch: 7,
1018            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1019            peer_spec: BridgePeerSpec {
1020                name: "member-b".to_string(),
1021                peer_id: "peer-xyz".to_string(),
1022                address: "tcp://127.0.0.1:7001".to_string(),
1023                pubkey: [0u8; 32],
1024            },
1025        }
1026    }
1027
1028    // -----------------------------------------------------------------------
1029    // 1. BridgeCommand JSON round-trip — one subtest per variant.
1030    // -----------------------------------------------------------------------
1031    //
1032    // `BridgeCommand` does not derive `PartialEq`, so we assert round-trip
1033    // correctness by comparing `serde_json::Value` before and after a
1034    // decode/encode cycle. Equal JSON values imply semantic equality under
1035    // the wire contract.
1036
1037    fn assert_command_round_trip(cmd: &BridgeCommand) {
1038        let value = serde_json::to_value(cmd).expect("serialize command");
1039        let decoded: BridgeCommand = serde_json::from_value(value.clone()).expect("decode command");
1040        let reencoded = serde_json::to_value(&decoded).expect("reserialize command");
1041        assert_eq!(
1042            value, reencoded,
1043            "BridgeCommand round-trip must preserve wire shape"
1044        );
1045    }
1046
1047    #[test]
1048    fn bridge_command_unknown_top_level_field_fails_closed() {
1049        let mut value =
1050            serde_json::to_value(BridgeCommand::ObserveMember(sample_supervisor_payload()))
1051                .expect("serialize command");
1052        value["extra_behavior"] = json!(true);
1053
1054        let err = serde_json::from_value::<BridgeCommand>(value)
1055            .expect_err("unknown command fields must fail at serde boundary");
1056        let message = err.to_string();
1057        assert!(
1058            message.contains("extra_behavior") || message.contains("unknown field"),
1059            "expected unknown field error, got: {message}"
1060        );
1061    }
1062
1063    #[test]
1064    fn bridge_command_unknown_nested_payload_field_fails_closed() {
1065        let mut value =
1066            serde_json::to_value(BridgeCommand::ObserveMember(sample_supervisor_payload()))
1067                .expect("serialize command");
1068        value["supervisor"]["extra_behavior"] = json!(true);
1069
1070        let err = serde_json::from_value::<BridgeCommand>(value)
1071            .expect_err("unknown nested payload fields must fail at serde boundary");
1072        let message = err.to_string();
1073        assert!(
1074            message.contains("extra_behavior") || message.contains("unknown field"),
1075            "expected unknown field error, got: {message}"
1076        );
1077    }
1078
1079    #[test]
1080    fn bridge_command_bind_member_round_trip() {
1081        let cmd = BridgeCommand::BindMember(BridgeBindPayload {
1082            supervisor: sample_peer_spec(),
1083            epoch: 1,
1084            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1085            expected_peer_id: "peer-expected".to_string(),
1086            expected_address: "tcp://127.0.0.1:9000".to_string(),
1087            bootstrap_token: "bootstrap-secret".into(),
1088        });
1089        assert_command_round_trip(&cmd);
1090    }
1091
1092    #[test]
1093    fn bridge_peer_spec_rejects_unknown_address_scheme() {
1094        let spec = BridgePeerSpec {
1095            name: "member-a".to_string(),
1096            peer_id: "aaaaaaaa-0000-4000-8000-000000000001".to_string(),
1097            address: "http://127.0.0.1:7000".to_string(),
1098            pubkey: [0u8; 32],
1099        };
1100
1101        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
1102            .expect_err("supervisor bridge peer specs must fail closed on unknown schemes");
1103        assert!(
1104            err.contains("unknown peer address transport"),
1105            "unexpected error: {err}",
1106        );
1107    }
1108
1109    #[test]
1110    fn bridge_peer_spec_rejects_schemeless_address() {
1111        let spec = BridgePeerSpec {
1112            name: "member-a".to_string(),
1113            peer_id: "aaaaaaaa-0000-4000-8000-000000000001".to_string(),
1114            address: "127.0.0.1:7000".to_string(),
1115            pubkey: [0u8; 32],
1116        };
1117
1118        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
1119            .expect_err("supervisor bridge peer specs must fail closed on schemeless addresses");
1120        assert!(
1121            err.contains("missing transport scheme"),
1122            "unexpected error: {err}",
1123        );
1124    }
1125
1126    #[test]
1127    fn bridge_peer_spec_rejects_zero_pubkey() {
1128        let spec = BridgePeerSpec {
1129            name: "member-a".to_string(),
1130            peer_id: PeerId::from_ed25519_pubkey(&[1u8; 32]).to_string(),
1131            address: "tcp://127.0.0.1:7000".to_string(),
1132            pubkey: [0u8; 32],
1133        };
1134
1135        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
1136            .expect_err("supervisor bridge peer specs must fail closed on zero pubkeys");
1137        assert!(
1138            err.contains("pubkey") && err.contains("non-zero"),
1139            "unexpected error: {err}",
1140        );
1141    }
1142
1143    #[test]
1144    fn bridge_peer_spec_missing_pubkey_defaults_to_zero_and_rejects() {
1145        let value = json!({
1146            "name": "member-a",
1147            "peer_id": PeerId::from_ed25519_pubkey(&[2u8; 32]).to_string(),
1148            "address": "tcp://127.0.0.1:7000"
1149        });
1150        let spec: BridgePeerSpec =
1151            serde_json::from_value(value).expect("legacy bridge peer spec should deserialize");
1152
1153        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
1154            .expect_err("missing pubkey must not become trusted zero-key authority");
1155        assert!(
1156            err.contains("pubkey") && err.contains("non-zero"),
1157            "unexpected error: {err}",
1158        );
1159    }
1160
1161    #[test]
1162    fn bridge_command_authorize_supervisor_round_trip() {
1163        let cmd = BridgeCommand::AuthorizeSupervisor(sample_supervisor_payload());
1164        assert_command_round_trip(&cmd);
1165    }
1166
1167    #[test]
1168    fn bridge_command_revoke_supervisor_round_trip() {
1169        let cmd = BridgeCommand::RevokeSupervisor(sample_supervisor_payload());
1170        assert_command_round_trip(&cmd);
1171    }
1172
1173    #[test]
1174    fn bridge_command_deliver_member_input_round_trip() {
1175        let cmd = BridgeCommand::DeliverMemberInput(BridgeDeliveryPayload {
1176            supervisor: sample_peer_spec(),
1177            epoch: 2,
1178            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1179            input_id: "input-1".to_string(),
1180            content: meerkat_core::types::ContentInput::Text("hello".to_string()),
1181            handling_mode: meerkat_core::types::HandlingMode::Queue,
1182        });
1183        assert_command_round_trip(&cmd);
1184    }
1185
1186    #[test]
1187    fn bridge_command_observe_member_round_trip() {
1188        let cmd = BridgeCommand::ObserveMember(sample_supervisor_payload());
1189        assert_command_round_trip(&cmd);
1190    }
1191
1192    #[test]
1193    fn bridge_command_interrupt_member_round_trip() {
1194        let cmd = BridgeCommand::InterruptMember(sample_supervisor_payload());
1195        assert_command_round_trip(&cmd);
1196    }
1197
1198    #[test]
1199    fn bridge_command_hard_cancel_member_round_trip() {
1200        let cmd = BridgeCommand::HardCancelMember(sample_hard_cancel_payload());
1201        assert_command_round_trip(&cmd);
1202    }
1203
1204    #[test]
1205    fn bridge_command_retire_member_round_trip() {
1206        let cmd = BridgeCommand::RetireMember(sample_supervisor_payload());
1207        assert_command_round_trip(&cmd);
1208    }
1209
1210    #[test]
1211    fn bridge_command_destroy_member_round_trip() {
1212        let cmd = BridgeCommand::DestroyMember(sample_supervisor_payload());
1213        assert_command_round_trip(&cmd);
1214    }
1215
1216    #[test]
1217    fn bridge_command_wire_member_round_trip() {
1218        let cmd = BridgeCommand::WireMember(sample_wiring_payload());
1219        assert_command_round_trip(&cmd);
1220    }
1221
1222    #[test]
1223    fn bridge_command_unwire_member_round_trip() {
1224        let cmd = BridgeCommand::UnwireMember(sample_wiring_payload());
1225        assert_command_round_trip(&cmd);
1226    }
1227
1228    // -----------------------------------------------------------------------
1229    // 2. BridgeReply::Rejected round-trip with typed cause.
1230    // -----------------------------------------------------------------------
1231    //
1232    // Pins the `{ result, cause, reason }` wire shape. Callers branch on
1233    // the typed `cause`; `reason` is a human-readable diagnostic and must
1234    // not be pattern-matched.
1235
1236    #[test]
1237    fn bridge_reply_rejected_round_trip_with_typed_cause() {
1238        let reply = BridgeReply::Rejected {
1239            cause: BridgeRejectionCause::StaleSupervisor,
1240            reason: "epoch too low".to_string(),
1241        };
1242        let value = serde_json::to_value(&reply).expect("serialize rejected reply");
1243        assert_eq!(
1244            value,
1245            json!({
1246                "result": "rejected",
1247                "cause": "stale_supervisor",
1248                "reason": "epoch too low",
1249            }),
1250            "wire shape must tag rejection with `result` + `cause` + `reason`"
1251        );
1252        let decoded: BridgeReply = serde_json::from_value(value.clone()).expect("decode reply");
1253        match decoded {
1254            BridgeReply::Rejected { cause, ref reason } => {
1255                assert_eq!(cause, BridgeRejectionCause::StaleSupervisor);
1256                assert_eq!(reason, "epoch too low");
1257            }
1258            other => panic!("expected BridgeReply::Rejected, got {other:?}"),
1259        }
1260        let reencoded = serde_json::to_value(&decoded).expect("reserialize reply");
1261        assert_eq!(value, reencoded);
1262    }
1263
1264    #[test]
1265    fn bridge_rejection_decoder_accepts_typed_protocol_v2_rejection() {
1266        let value = json!({
1267            "result": "rejected",
1268            "cause": "sender_mismatch",
1269            "reason": "wrong supervisor",
1270        });
1271
1272        let decoded = decode_bridge_rejection_reply(SUPERVISOR_BRIDGE_PROTOCOL_VERSION, &value)
1273            .expect("typed rejection should decode");
1274
1275        assert_eq!(
1276            decoded.typed_cause(),
1277            Some(BridgeRejectionCause::SenderMismatch)
1278        );
1279        assert_eq!(decoded.reason(), "wrong supervisor");
1280        assert!(!decoded.is_legacy_v1_raw_string());
1281    }
1282
1283    #[test]
1284    fn bridge_rejection_decoder_rejects_raw_string_for_protocol_v2() {
1285        let value = json!("legacy rejection");
1286
1287        assert!(
1288            decode_bridge_rejection_reply(SUPERVISOR_BRIDGE_PROTOCOL_VERSION, &value).is_none(),
1289            "protocol v2 must not promote raw strings into typed rejection causes"
1290        );
1291    }
1292
1293    #[test]
1294    fn bridge_rejection_decoder_isolates_raw_string_to_legacy_v1() {
1295        let value = json!("legacy rejection");
1296
1297        let decoded = decode_legacy_v1_raw_string_rejection(&value)
1298            .expect("legacy raw string should decode only through the explicit v1 helper");
1299
1300        assert_eq!(decoded.typed_cause(), None);
1301        assert_eq!(decoded.reason(), "legacy rejection");
1302        assert!(decoded.is_legacy_v1_raw_string());
1303    }
1304
1305    #[test]
1306    fn bridge_command_reports_payload_protocol_version() {
1307        let command = BridgeCommand::AuthorizeSupervisor(sample_supervisor_payload());
1308
1309        assert_eq!(
1310            command.protocol_version(),
1311            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1312        );
1313    }
1314
1315    #[test]
1316    fn supervisor_bridge_protocol_versions_are_reported_from_single_authority() {
1317        assert_eq!(
1318            supervisor_bridge_current_protocol_version(),
1319            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1320        );
1321        assert_eq!(
1322            supervisor_bridge_default_protocol_version(),
1323            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1324        );
1325        assert_eq!(
1326            supervisor_bridge_supported_protocol_versions(),
1327            &[SUPERVISOR_BRIDGE_PROTOCOL_VERSION]
1328        );
1329        assert!(supervisor_bridge_protocol_version_supported(
1330            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1331        ));
1332        assert!(BridgeProtocolVersion::from_supported_u32(1).is_err());
1333        assert!(BridgeProtocolVersion::from_supported_u32(999).is_err());
1334    }
1335
1336    #[test]
1337    fn bridge_capabilities_default_reports_canonical_protocol_versions() {
1338        let capabilities = BridgeCapabilities::default();
1339        assert_eq!(
1340            capabilities.current_protocol_version,
1341            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1342        );
1343        assert_eq!(
1344            capabilities.default_protocol_version,
1345            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1346        );
1347        assert_eq!(
1348            capabilities.supported_protocol_versions,
1349            vec![SUPERVISOR_BRIDGE_PROTOCOL_VERSION]
1350        );
1351    }
1352
1353    #[test]
1354    fn bridge_capabilities_deserialize_legacy_without_protocol_report() {
1355        let capabilities: BridgeCapabilities = serde_json::from_value(json!({
1356            "deliver_member_input": true,
1357            "observe_member": true,
1358            "interrupt_member": true,
1359            "retire_member": true,
1360            "destroy_member": true,
1361            "wire_member": true,
1362            "unwire_member": true,
1363        }))
1364        .expect("legacy capability payload without protocol report should decode");
1365
1366        assert_eq!(
1367            capabilities.current_protocol_version,
1368            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1369        );
1370        assert_eq!(
1371            capabilities.default_protocol_version,
1372            SUPERVISOR_BRIDGE_PROTOCOL_VERSION
1373        );
1374        assert_eq!(
1375            capabilities.supported_protocol_versions,
1376            vec![SUPERVISOR_BRIDGE_PROTOCOL_VERSION]
1377        );
1378        assert!(capabilities.deliver_member_input);
1379        assert!(capabilities.observe_member);
1380        assert!(capabilities.interrupt_member);
1381        assert!(!capabilities.hard_cancel_member);
1382        assert!(capabilities.retire_member);
1383        assert!(capabilities.destroy_member);
1384        assert!(capabilities.wire_member);
1385        assert!(capabilities.unwire_member);
1386    }
1387
1388    #[test]
1389    fn bridge_bind_payload_rejects_unsupported_protocol_version_at_wire_boundary() {
1390        let raw = json!({
1391            "supervisor": {
1392                "name": "mob/__mob_supervisor__",
1393                "peer_id": "00000000-0000-0000-0000-00000000bbbb",
1394                "address": "inproc://mob/__mob_supervisor__",
1395            },
1396            "epoch": 7,
1397            "protocol_version": 999,
1398            "expected_peer_id": "00000000-0000-0000-0000-00000000aaaa",
1399            "expected_address": "inproc://member",
1400            "bootstrap_token": "tok-raw-string",
1401        });
1402
1403        let error = serde_json::from_value::<BridgeBindPayload>(raw)
1404            .expect_err("unsupported protocol versions must fail closed at decode");
1405
1406        assert!(
1407            error
1408                .to_string()
1409                .contains("unsupported supervisor bridge protocol version"),
1410            "unexpected error: {error}",
1411        );
1412    }
1413
1414    #[test]
1415    fn bridge_capabilities_reject_unsupported_protocol_versions_at_wire_boundary() {
1416        let raw = json!({
1417            "current_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
1418            "default_protocol_version": 999,
1419            "supported_protocol_versions": [SUPERVISOR_BRIDGE_PROTOCOL_VERSION],
1420            "deliver_member_input": true,
1421        });
1422
1423        let error = serde_json::from_value::<BridgeCapabilities>(raw)
1424            .expect_err("unsupported advertised defaults must fail closed at decode");
1425
1426        assert!(
1427            error
1428                .to_string()
1429                .contains("unsupported supervisor bridge protocol version"),
1430            "unexpected error: {error}",
1431        );
1432    }
1433
1434    // -----------------------------------------------------------------------
1435    // 3. BridgeObservationResponse round-trip with all-present / all-absent
1436    //    optional fields. `skip_serializing_if = "Option::is_none"` must
1437    //    drop absent fields from the wire form.
1438    // -----------------------------------------------------------------------
1439
1440    #[test]
1441    fn observation_response_round_trip_all_optional_present() {
1442        let response = BridgeObservationResponse {
1443            state: BridgeMemberRuntimeState::Running,
1444            accepting_inputs: Some(true),
1445            current_run_id: Some("run-42".to_string()),
1446            peer_connectivity: Some(BridgePeerConnectivity::Reachable),
1447            last_error: Some("transient network blip".to_string()),
1448            observed_at: "2026-04-16T07:00:00Z".to_string(),
1449        };
1450        let value = serde_json::to_value(&response).expect("serialize observation");
1451        assert_eq!(
1452            value,
1453            json!({
1454                "state": "running",
1455                "accepting_inputs": true,
1456                "current_run_id": "run-42",
1457                "peer_connectivity": "reachable",
1458                "last_error": "transient network blip",
1459                "observed_at": "2026-04-16T07:00:00Z",
1460            })
1461        );
1462        let decoded: BridgeObservationResponse =
1463            serde_json::from_value(value.clone()).expect("decode observation");
1464        assert_eq!(decoded, response);
1465        let reencoded = serde_json::to_value(&decoded).expect("reserialize observation");
1466        assert_eq!(value, reencoded);
1467    }
1468
1469    #[test]
1470    fn observation_response_round_trip_all_optional_absent() {
1471        let response = BridgeObservationResponse {
1472            state: BridgeMemberRuntimeState::Idle,
1473            accepting_inputs: None,
1474            current_run_id: None,
1475            peer_connectivity: None,
1476            last_error: None,
1477            observed_at: "2026-04-16T07:01:00Z".to_string(),
1478        };
1479        let value = serde_json::to_value(&response).expect("serialize observation");
1480        assert_eq!(
1481            value,
1482            json!({
1483                "state": "idle",
1484                "observed_at": "2026-04-16T07:01:00Z",
1485            }),
1486            "absent optional fields must be skipped on the wire"
1487        );
1488        let decoded: BridgeObservationResponse =
1489            serde_json::from_value(value.clone()).expect("decode observation");
1490        assert_eq!(decoded, response);
1491        let reencoded = serde_json::to_value(&decoded).expect("reserialize observation");
1492        assert_eq!(value, reencoded);
1493    }
1494
1495    // -----------------------------------------------------------------------
1496    // 4. BridgePeerConnectivity snake_case rename.
1497    // -----------------------------------------------------------------------
1498
1499    #[test]
1500    fn peer_connectivity_serializes_as_snake_case() {
1501        for (variant, expected) in [
1502            (BridgePeerConnectivity::Reachable, "reachable"),
1503            (BridgePeerConnectivity::Unreachable, "unreachable"),
1504            (BridgePeerConnectivity::Unknown, "unknown"),
1505        ] {
1506            let value = serde_json::to_value(variant).expect("serialize connectivity");
1507            assert_eq!(
1508                value,
1509                json!(expected),
1510                "variant {variant:?} must serialize as {expected:?}"
1511            );
1512            let decoded: BridgePeerConnectivity =
1513                serde_json::from_value(value).expect("decode connectivity");
1514            assert_eq!(decoded, variant);
1515        }
1516    }
1517
1518    // -----------------------------------------------------------------------
1519    // 5. BridgeMemberRuntimeState — every variant: Display + serde round-trip.
1520    // -----------------------------------------------------------------------
1521
1522    #[test]
1523    fn member_runtime_state_display_and_round_trip_all_variants() {
1524        let cases: &[(BridgeMemberRuntimeState, &str)] = &[
1525            (BridgeMemberRuntimeState::Initializing, "initializing"),
1526            (BridgeMemberRuntimeState::Idle, "idle"),
1527            (BridgeMemberRuntimeState::Attached, "attached"),
1528            (BridgeMemberRuntimeState::Running, "running"),
1529            (BridgeMemberRuntimeState::Retired, "retired"),
1530            (BridgeMemberRuntimeState::Stopped, "stopped"),
1531            (BridgeMemberRuntimeState::Destroyed, "destroyed"),
1532        ];
1533        for (variant, expected) in cases {
1534            assert_eq!(
1535                variant.to_string(),
1536                *expected,
1537                "Display output must match snake_case wire form for {variant:?}"
1538            );
1539            let value = serde_json::to_value(variant).expect("serialize runtime state");
1540            assert_eq!(value, json!(expected));
1541            let decoded: BridgeMemberRuntimeState =
1542                serde_json::from_value(value).expect("decode runtime state");
1543            assert_eq!(decoded, *variant);
1544        }
1545    }
1546
1547    // -----------------------------------------------------------------------
1548    // 6. BridgeRejectionCause — snake_case round-trip for every variant.
1549    // -----------------------------------------------------------------------
1550    //
1551    // Mob-side fallback logic (see `should_fall_back_to_bind`) branches on
1552    // typed causes. Any accidental rename or new variant that skipped the
1553    // snake_case convention would silently change fallback behavior, so pin
1554    // the full matrix.
1555
1556    #[test]
1557    fn bridge_rejection_cause_snake_case_round_trip_all_variants() {
1558        let cases: &[(BridgeRejectionCause, &str)] = &[
1559            (BridgeRejectionCause::NotBound, "not_bound"),
1560            (BridgeRejectionCause::StaleSupervisor, "stale_supervisor"),
1561            (BridgeRejectionCause::SenderMismatch, "sender_mismatch"),
1562            (BridgeRejectionCause::AlreadyBound, "already_bound"),
1563            (
1564                BridgeRejectionCause::InvalidBootstrapToken,
1565                "invalid_bootstrap_token",
1566            ),
1567            (
1568                BridgeRejectionCause::UnsupportedProtocolVersion,
1569                "unsupported_protocol_version",
1570            ),
1571            (
1572                BridgeRejectionCause::InvalidSupervisorSpec,
1573                "invalid_supervisor_spec",
1574            ),
1575            (BridgeRejectionCause::InvalidPeerSpec, "invalid_peer_spec"),
1576            (BridgeRejectionCause::AddressMismatch, "address_mismatch"),
1577            (BridgeRejectionCause::Unsupported, "unsupported"),
1578            (BridgeRejectionCause::Internal, "internal"),
1579        ];
1580        for (cause, expected) in cases {
1581            let value = serde_json::to_value(cause).expect("serialize cause");
1582            assert_eq!(
1583                value,
1584                json!(expected),
1585                "cause {cause:?} must serialize as {expected:?}"
1586            );
1587            let decoded: BridgeRejectionCause =
1588                serde_json::from_value(value).expect("decode cause");
1589            assert_eq!(decoded, *cause);
1590        }
1591    }
1592
1593    // -----------------------------------------------------------------------
1594    // 7. BridgeReply — protocol v2 round-trip for every success variant.
1595    // -----------------------------------------------------------------------
1596    //
1597    // The dispatcher in `comms_drain.rs` constructs exactly these variants;
1598    // the mob side decodes them via `send_bridge_command_typed`. A rename
1599    // would break the typed dispatch silently, so pin every `result` tag.
1600
1601    fn assert_reply_round_trip(reply: BridgeReply, expected: serde_json::Value) {
1602        let value = serde_json::to_value(&reply).expect("serialize reply");
1603        assert_eq!(value, expected, "reply wire shape must be stable");
1604        let _decoded: BridgeReply = serde_json::from_value(value).expect("decode reply");
1605    }
1606
#[test]
fn bridge_reply_unknown_field_fails_closed() {
    // An `ack`-tagged reply object with one additional key, `extra_behavior`.
    let tampered = json!({
        "result": "ack",
        "ok": true,
        "extra_behavior": true,
    });

    // Decoding must be rejected; keep only the rendered error text.
    let message = serde_json::from_value::<BridgeReply>(tampered)
        .expect_err("unknown reply fields must fail at serde boundary")
        .to_string();

    // Accept either the offending key name or the generic "unknown field"
    // wording in the error text.
    let names_offending_key = message.contains("extra_behavior");
    assert!(
        names_offending_key || message.contains("unknown field"),
        "expected unknown field error, got: {message}"
    );
}
1621
#[test]
fn bridge_reply_bind_member_ack_round_trip() {
    // Bind response built with default capabilities.
    let reply = BridgeReply::BindMember(BridgeBindResponse {
        peer_id: String::from("peer-x"),
        address: String::from("inproc://peer-x"),
        capabilities: BridgeCapabilities::default(),
    });

    // Pinned wire shape: the `bind_member` tag, the response fields, and a
    // capabilities object whose feature flags are all false.
    let wire = json!({
        "result": "bind_member",
        "peer_id": "peer-x",
        "address": "inproc://peer-x",
        "capabilities": {
            "current_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            "default_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            "supported_protocol_versions": [SUPERVISOR_BRIDGE_PROTOCOL_VERSION],
            "deliver_member_input": false,
            "observe_member": false,
            "interrupt_member": false,
            "hard_cancel_member": false,
            "retire_member": false,
            "destroy_member": false,
            "wire_member": false,
            "unwire_member": false,
        },
    });

    assert_reply_round_trip(reply, wire);
}
1650
#[test]
fn bridge_reply_ack_round_trip() {
    // Pins the `ack` tag and the single `ok` field of the acknowledgement.
    let reply = BridgeReply::Ack(BridgeAck { ok: true });
    let wire = json!({ "result": "ack", "ok": true });
    assert_reply_round_trip(reply, wire);
}
1658
#[test]
fn bridge_reply_observation_round_trip() {
    // Observation with every optional field left unset.
    let observation = BridgeObservationResponse {
        state: BridgeMemberRuntimeState::Running,
        accepting_inputs: None,
        current_run_id: None,
        peer_connectivity: None,
        last_error: None,
        observed_at: String::from("2026-04-17T00:00:00Z"),
    };

    // The expected object carries only the tag, the state and the timestamp;
    // none of the unset optional fields appear in it.
    let wire = json!({
        "result": "observation",
        "state": "running",
        "observed_at": "2026-04-17T00:00:00Z",
    });

    assert_reply_round_trip(BridgeReply::Observation(observation), wire);
}
1677
#[test]
fn bridge_reply_delivery_round_trip() {
    // Case 1: accepted delivery. The unset `canonical_input_id` is expected
    // on the wire as an explicit `null`.
    let accepted = BridgeReply::Delivery(BridgeDeliveryResponse {
        input_id: String::from("in-1"),
        canonical_input_id: None,
        outcome: BridgeDeliveryOutcome::Accepted,
    });
    let accepted_wire = json!({
        "result": "delivery",
        "input_id": "in-1",
        "canonical_input_id": null,
        "outcome": { "outcome": "accepted" },
    });
    assert_reply_round_trip(accepted, accepted_wire);

    // Case 2: rejected delivery carrying a typed cause (with its own detail
    // text) alongside the free-form reason.
    let explanation = "derived durable input cannot be accepted";
    let rejected = BridgeReply::Delivery(BridgeDeliveryResponse {
        input_id: String::from("in-2"),
        canonical_input_id: None,
        outcome: BridgeDeliveryOutcome::Rejected {
            cause: BridgeDeliveryRejectionCause::DurabilityViolation {
                detail: explanation.to_string(),
            },
            reason: explanation.to_string(),
        },
    });
    let rejected_wire = json!({
        "result": "delivery",
        "input_id": "in-2",
        "canonical_input_id": null,
        "outcome": {
            "outcome": "rejected",
            "cause": {
                "kind": "durability_violation",
                "detail": "derived durable input cannot be accepted",
            },
            "reason": "derived durable input cannot be accepted",
        },
    });
    assert_reply_round_trip(rejected, rejected_wire);
}
1720
#[test]
fn bridge_delivery_not_ready_carries_typed_member_state() {
    // Rejection whose cause embeds the member runtime state as a typed value.
    let not_ready = BridgeDeliveryRejectionCause::NotReady {
        state: BridgeMemberRuntimeState::Stopped,
    };
    let outcome = BridgeDeliveryOutcome::Rejected {
        cause: not_ready,
        reason: String::from("runtime not accepting input while in state: stopped"),
    };

    // Encode: the typed state must appear on the wire as the string "stopped".
    let wire = serde_json::to_value(&outcome).expect("serialize outcome");
    assert_eq!(
        wire,
        json!({
            "outcome": "rejected",
            "cause": {
                "kind": "not_ready",
                "state": "stopped",
            },
            "reason": "runtime not accepting input while in state: stopped",
        })
    );

    // Decode: the string form must map back to the identical typed outcome.
    let round_tripped = serde_json::from_value::<BridgeDeliveryOutcome>(wire)
        .expect("legacy wire state string remains compatible");
    assert_eq!(round_tripped, outcome);
}
1746
#[test]
fn bridge_reply_retire_round_trip() {
    // Pins the `retire` tag together with both input counters.
    let retire = BridgeRetireResponse {
        inputs_abandoned: 2,
        inputs_pending_drain: 0,
    };
    let wire = json!({
        "result": "retire",
        "inputs_abandoned": 2,
        "inputs_pending_drain": 0,
    });
    assert_reply_round_trip(BridgeReply::Retire(retire), wire);
}
1761
#[test]
fn bridge_reply_destroy_round_trip() {
    // Pins the `destroy` tag and its abandoned-input counter.
    let destroy = BridgeDestroyResponse {
        inputs_abandoned: 3,
    };
    let wire = json!({
        "result": "destroy",
        "inputs_abandoned": 3,
    });
    assert_reply_round_trip(BridgeReply::Destroy(destroy), wire);
}
1774
1775    // -----------------------------------------------------------------------
1776    // 8. BridgeBootstrapToken::fmt::Debug must redact the body.
1777    // -----------------------------------------------------------------------
1778    //
1779    // The whole point of the newtype is that `{:?}` in tracing/panic
1780    // messages never leaks the raw secret. Regressing the Debug impl to the
1781    // default derive would silently reintroduce the leak, so pin the format.
1782
#[test]
fn bridge_bootstrap_token_debug_redacts_nonempty_body() {
    let secret = "super-secret-bootstrap";
    let token = BridgeBootstrapToken::new(secret);
    let rendered = format!("{token:?}");

    // The Debug form shows only a redaction marker and the body's byte length.
    let expected = format!("BridgeBootstrapToken(<redacted, {}B>)", secret.len());
    assert_eq!(rendered, expected);

    // Independent check that the raw secret does not occur anywhere in the
    // rendered text.
    let leaked = rendered.contains(secret);
    assert!(
        !leaked,
        "Debug output must not contain the raw token body"
    );
}
1799
#[test]
fn bridge_bootstrap_token_debug_marks_empty_token() {
    // An empty token renders with an explicit `empty` marker.
    let token = BridgeBootstrapToken::new("");
    let rendered = format!("{token:?}");
    assert_eq!(rendered, "BridgeBootstrapToken(empty)");
}
1805
1806    // -----------------------------------------------------------------------
1807    // 9. BridgeBootstrapToken — `#[serde(transparent)]` round-trip.
1808    // -----------------------------------------------------------------------
1809    //
1810    // Wire format is a bare JSON string. Older clients and persisted
1811    // records that store `"tok-abc"` must still decode unchanged.
1812
#[test]
fn bridge_bootstrap_token_serde_is_transparent_over_string() {
    let token = BridgeBootstrapToken::new("tok-abc");

    // Token -> JSON value: must be a bare string with no wrapper object.
    let as_value = serde_json::to_value(&token).expect("serialize token");
    assert_eq!(as_value, json!("tok-abc"));

    // Bare JSON string -> token: must equal the token built directly.
    let from_plain = serde_json::from_value::<BridgeBootstrapToken>(json!("tok-abc"))
        .expect("decode token");
    assert_eq!(from_plain, token);

    // Token -> JSON text: just the quoted string.
    let as_text = serde_json::to_string(&token).expect("serialize string");
    assert_eq!(as_text, "\"tok-abc\"");
}
1825
1826    // -----------------------------------------------------------------------
1827    // 12. Wire compat on M1: pre-newtype raw JSON must deserialize into the
1828    //     newtype BridgeBindPayload and serialize back byte-for-byte.
1829    // -----------------------------------------------------------------------
1830    //
1831    // Pins that introducing BridgeBootstrapToken did NOT break wire compat
1832    // with supervisors/members that emit a plain string bootstrap_token.
1833
#[test]
fn bridge_bind_payload_wire_compat_with_plain_string_bootstrap_token() {
    // Bind payload whose `bootstrap_token` is a plain JSON string and whose
    // supervisor descriptor has no `pubkey` entry.
    let legacy = json!({
        "supervisor": {
            "name": "mob/__mob_supervisor__",
            "peer_id": "00000000-0000-0000-0000-00000000bbbb",
            "address": "inproc://mob/__mob_supervisor__",
        },
        "epoch": 7,
        "protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
        "expected_peer_id": "00000000-0000-0000-0000-00000000aaaa",
        "expected_address": "inproc://member",
        "bootstrap_token": "tok-raw-string",
    });

    // Decoding must succeed, expose the token text, and yield an all-zero
    // 32-byte pubkey for the supervisor.
    let decoded = serde_json::from_value::<BridgeBindPayload>(legacy.clone())
        .expect("decode pre-newtype payload");
    assert_eq!(decoded.bootstrap_token.as_str(), "tok-raw-string");
    assert_eq!(decoded.supervisor.pubkey, [0u8; 32]);

    // Re-encoding must reproduce the input plus the zeroed pubkey, and
    // nothing else.
    let reencoded = serde_json::to_value(&decoded).expect("reserialize payload");
    let expected = {
        let mut with_pubkey = legacy;
        with_pubkey["supervisor"]["pubkey"] = json!(vec![0u8; 32]);
        with_pubkey
    };
    assert_eq!(
        reencoded, expected,
        "pre-pubkey payloads must decode and reserialize with the defaulted pubkey"
    );
}
1860}