1use meerkat_core::comms::{PeerAddress, PeerId, PeerName, TrustedPeerDescriptor};
9use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
10use std::fmt;
11
12pub use meerkat_core::comms::SUPERVISOR_BRIDGE_INTENT;
/// Query-string key whose `key=value` pair [`canonicalize_bridge_address`] removes
/// from a bridge address.
pub const SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM: &str = "mob_supervisor_bootstrap_token";
/// Version number of the supervisor bridge wire protocol.
///
/// The inner `u32` is private. The conversions visible in this file
/// (`TryFrom<u32>` and `Deserialize`) only produce supported versions and
/// report anything else as [`UnsupportedBridgeProtocolVersion`].
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct BridgeProtocolVersion(u32);
28
/// Error produced when a raw `u32` is not an accepted bridge protocol version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UnsupportedBridgeProtocolVersion {
    /// The raw version number that was rejected.
    raw: u32,
}
34
35impl fmt::Display for UnsupportedBridgeProtocolVersion {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 write!(
38 f,
39 "unsupported supervisor bridge protocol version {} (supported {:?}; default {})",
40 self.raw,
41 BridgeProtocolVersion::SUPPORTED,
42 BridgeProtocolVersion::DEFAULT
43 )
44 }
45}
46
// Empty impl: no `source` override, so the trait's default methods apply.
impl std::error::Error for UnsupportedBridgeProtocolVersion {}
48
49impl BridgeProtocolVersion {
50 pub const V2: Self = Self(2);
52 pub const CURRENT: Self = Self::V2;
54 pub const DEFAULT: Self = Self::V2;
56 pub const SUPPORTED: &'static [Self] = &[Self::V2];
58
59 pub const fn is_supported(self) -> bool {
60 matches!(self.0, 2)
61 }
62
63 pub const fn same_protocol_as(self, other: Self) -> bool {
64 self.0 == other.0
65 }
66
67 pub fn supported() -> &'static [Self] {
68 Self::SUPPORTED
69 }
70
71 fn from_supported_u32(raw: u32) -> Result<Self, UnsupportedBridgeProtocolVersion> {
72 match raw {
73 2 => Ok(Self::V2),
74 _ => Err(UnsupportedBridgeProtocolVersion { raw }),
75 }
76 }
77}
78
impl Default for BridgeProtocolVersion {
    /// Returns [`BridgeProtocolVersion::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
84
85impl fmt::Debug for BridgeProtocolVersion {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 write!(f, "{}", self.0)
88 }
89}
90
91impl fmt::Display for BridgeProtocolVersion {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 write!(f, "{}", self.0)
94 }
95}
96
impl TryFrom<u32> for BridgeProtocolVersion {
    type Error = UnsupportedBridgeProtocolVersion;

    /// Public entry point for the private `from_supported_u32` check.
    fn try_from(raw: u32) -> Result<Self, Self::Error> {
        Self::from_supported_u32(raw)
    }
}
104
105impl Serialize for BridgeProtocolVersion {
106 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
107 where
108 S: Serializer,
109 {
110 serializer.serialize_u32(self.0)
111 }
112}
113
114impl<'de> Deserialize<'de> for BridgeProtocolVersion {
115 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
116 where
117 D: Deserializer<'de>,
118 {
119 let raw = u32::deserialize(deserializer)?;
120 Self::from_supported_u32(raw).map_err(de::Error::custom)
121 }
122}
123
/// Same value as [`BridgeProtocolVersion::CURRENT`].
pub const SUPERVISOR_BRIDGE_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::CURRENT;
/// Same value as [`BridgeProtocolVersion::CURRENT`] (identical to
/// [`SUPERVISOR_BRIDGE_PROTOCOL_VERSION`]).
pub const SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::CURRENT;
/// Same value as [`BridgeProtocolVersion::DEFAULT`].
pub const SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION: BridgeProtocolVersion =
    BridgeProtocolVersion::DEFAULT;
/// Same slice as [`BridgeProtocolVersion::SUPPORTED`].
pub const SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS: &[BridgeProtocolVersion] =
    BridgeProtocolVersion::SUPPORTED;
144
/// Returns [`SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION`].
///
/// Referenced by path as the serde default for
/// `BridgeCapabilities::current_protocol_version`.
pub const fn supervisor_bridge_current_protocol_version() -> BridgeProtocolVersion {
    SUPERVISOR_BRIDGE_CURRENT_PROTOCOL_VERSION
}
149
/// Returns [`SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION`].
///
/// Referenced by path as the serde default for
/// `BridgeCapabilities::default_protocol_version`.
pub const fn supervisor_bridge_default_protocol_version() -> BridgeProtocolVersion {
    SUPERVISOR_BRIDGE_DEFAULT_PROTOCOL_VERSION
}
154
/// Returns [`SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS`].
pub fn supervisor_bridge_supported_protocol_versions() -> &'static [BridgeProtocolVersion] {
    SUPERVISOR_BRIDGE_SUPPORTED_PROTOCOL_VERSIONS
}
159
/// Free-function form of [`BridgeProtocolVersion::is_supported`].
pub fn supervisor_bridge_protocol_version_supported(
    protocol_version: BridgeProtocolVersion,
) -> bool {
    protocol_version.is_supported()
}
166
/// Owned copy of the supported-version list; used by path as the serde default
/// for `BridgeCapabilities::supported_protocol_versions`.
fn default_supported_protocol_versions() -> Vec<BridgeProtocolVersion> {
    supervisor_bridge_supported_protocol_versions().to_vec()
}
170
171pub fn canonicalize_bridge_address(address: &str) -> String {
173 let Some((base, query)) = address.split_once('?') else {
174 return address.to_string();
175 };
176 let filtered: Vec<&str> = query
177 .split('&')
178 .filter(|pair| {
179 pair.split_once('=')
180 .map(|(key, _)| key != SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM)
181 .unwrap_or(true)
182 })
183 .filter(|pair| !pair.is_empty())
184 .collect();
185 if filtered.is_empty() {
186 base.to_string()
187 } else {
188 format!("{base}?{}", filtered.join("&"))
189 }
190}
191
/// Commands carried over the supervisor bridge.
///
/// Serialized as an internally tagged enum: the variant name (snake_case) is
/// stored under the `"command"` key alongside the payload's own fields, and
/// unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "command", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeCommand {
    BindMember(BridgeBindPayload),
    AuthorizeSupervisor(BridgeSupervisorPayload),
    RevokeSupervisor(BridgeSupervisorPayload),
    DeliverMemberInput(BridgeDeliveryPayload),
    ObserveMember(BridgeSupervisorPayload),
    InterruptMember(BridgeSupervisorPayload),
    HardCancelMember(BridgeHardCancelPayload),
    RetireMember(BridgeSupervisorPayload),
    DestroyMember(BridgeSupervisorPayload),
    WireMember(BridgePeerWiringPayload),
    UnwireMember(BridgePeerWiringPayload),
}
214
215impl BridgeCommand {
216 pub fn protocol_version(&self) -> BridgeProtocolVersion {
218 match self {
219 Self::BindMember(payload) => payload.protocol_version,
220 Self::AuthorizeSupervisor(payload)
221 | Self::RevokeSupervisor(payload)
222 | Self::ObserveMember(payload)
223 | Self::InterruptMember(payload)
224 | Self::RetireMember(payload)
225 | Self::DestroyMember(payload) => payload.protocol_version,
226 Self::HardCancelMember(payload) => payload.protocol_version,
227 Self::DeliverMemberInput(payload) => payload.protocol_version,
228 Self::WireMember(payload) | Self::UnwireMember(payload) => payload.protocol_version,
229 }
230 }
231}
232
/// Failure modes of [`decode_bridge_command`].
#[derive(Debug)]
pub enum BridgeCommandDecodeError {
    /// The `protocol_version` pre-check rejected the declared version.
    UnsupportedProtocolVersion(UnsupportedBridgeProtocolVersion),
    /// `serde_json` could not decode the value into a [`BridgeCommand`].
    Invalid(serde_json::Error),
}
239
impl fmt::Display for BridgeCommandDecodeError {
    /// Delegates formatting to the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): `error.fmt(f)` resolves through whichever `fmt` trait is in
        // scope; presumably `Display` — confirm against the file's imports.
        match self {
            Self::UnsupportedProtocolVersion(error) => error.fmt(f),
            Self::Invalid(error) => error.fmt(f),
        }
    }
}
248
249impl std::error::Error for BridgeCommandDecodeError {
250 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
251 match self {
252 Self::UnsupportedProtocolVersion(error) => Some(error),
253 Self::Invalid(error) => Some(error),
254 }
255 }
256}
257
258pub fn decode_bridge_command(
260 value: serde_json::Value,
261) -> Result<BridgeCommand, BridgeCommandDecodeError> {
262 if let Some(raw) = value
263 .get("protocol_version")
264 .and_then(serde_json::Value::as_u64)
265 .and_then(|raw| u32::try_from(raw).ok())
266 {
267 BridgeProtocolVersion::from_supported_u32(raw)
268 .map_err(BridgeCommandDecodeError::UnsupportedProtocolVersion)?;
269 }
270 serde_json::from_value(value).map_err(BridgeCommandDecodeError::Invalid)
271}
272
/// Replies carried over the supervisor bridge.
///
/// Serialized as an internally tagged enum: the variant name (snake_case) is
/// stored under the `"result"` key, and unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "result", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeReply {
    BindMember(BridgeBindResponse),
    Ack(BridgeAck),
    Observation(BridgeObservationResponse),
    Delivery(BridgeDeliveryResponse),
    Retire(BridgeRetireResponse),
    Destroy(BridgeDestroyResponse),
    /// Rejection carrying a typed cause plus a free-form reason string.
    Rejected {
        cause: BridgeRejectionCause,
        reason: String,
    },
}
294
/// A decoded rejection, either typed or a bare legacy string.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum BridgeRejectionReply {
    /// Rejection decoded from a [`BridgeReply::Rejected`] value.
    Typed {
        cause: BridgeRejectionCause,
        reason: String,
    },
    /// Rejection decoded from a bare JSON string; it has no typed cause.
    LegacyV1RawString {
        reason: String,
    },
}
311
312impl BridgeRejectionReply {
313 pub fn reason(&self) -> &str {
314 match self {
315 Self::Typed { reason, .. } | Self::LegacyV1RawString { reason } => reason,
316 }
317 }
318
319 pub fn typed_cause(&self) -> Option<BridgeRejectionCause> {
320 match self {
321 Self::Typed { cause, .. } => Some(*cause),
322 Self::LegacyV1RawString { .. } => None,
323 }
324 }
325
326 pub fn is_legacy_v1_raw_string(&self) -> bool {
327 matches!(self, Self::LegacyV1RawString { .. })
328 }
329}
330
331pub fn decode_protocol_v2_bridge_rejection(
337 value: &serde_json::Value,
338) -> Option<BridgeRejectionReply> {
339 match serde_json::from_value::<BridgeReply>(value.clone()).ok()? {
340 BridgeReply::Rejected { cause, reason } => {
341 Some(BridgeRejectionReply::Typed { cause, reason })
342 }
343 _ => None,
344 }
345}
346
347pub fn decode_legacy_v1_raw_string_rejection(
349 value: &serde_json::Value,
350) -> Option<BridgeRejectionReply> {
351 value
352 .as_str()
353 .map(|reason| BridgeRejectionReply::LegacyV1RawString {
354 reason: reason.to_string(),
355 })
356}
357
/// Decodes a rejection reply for the given protocol version.
///
/// `protocol_version` is currently ignored: every call goes through
/// [`decode_protocol_v2_bridge_rejection`], so a bare JSON string is never
/// accepted here.
pub fn decode_bridge_rejection_reply(
    protocol_version: BridgeProtocolVersion,
    value: &serde_json::Value,
) -> Option<BridgeRejectionReply> {
    // Explicitly discard the argument; only one decoding path exists.
    let _ = protocol_version;
    decode_protocol_v2_bridge_rejection(value)
}
370
/// Typed cause attached to a [`BridgeReply::Rejected`].
///
/// Serialized as the snake_case variant name (for example `"stale_supervisor"`).
/// [`BridgeRejectionCause::class`] maps each cause to a [`BridgeRejectionClass`].
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeRejectionCause {
    NotBound,
    StaleSupervisor,
    SenderMismatch,
    AlreadyBound,
    InvalidBootstrapToken,
    UnsupportedProtocolVersion,
    InvalidSupervisorSpec,
    InvalidPeerSpec,
    AddressMismatch,
    Unsupported,
    Internal,
}
407
/// Coarse classification of a [`BridgeRejectionCause`], as returned by
/// [`BridgeRejectionCause::class`]. Serialized as the snake_case variant name.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeRejectionClass {
    RecoverableBySupervisorRebind,
    Fatal,
}
428
429impl BridgeRejectionCause {
430 pub const fn class(self) -> BridgeRejectionClass {
432 match self {
433 Self::NotBound | Self::StaleSupervisor | Self::SenderMismatch => {
434 BridgeRejectionClass::RecoverableBySupervisorRebind
435 }
436 Self::AlreadyBound
437 | Self::InvalidBootstrapToken
438 | Self::UnsupportedProtocolVersion
439 | Self::InvalidSupervisorSpec
440 | Self::InvalidPeerSpec
441 | Self::AddressMismatch
442 | Self::Unsupported
443 | Self::Internal => BridgeRejectionClass::Fatal,
444 }
445 }
446}
447
/// Runtime state of a bridge member as reported over the bridge.
///
/// Serialized as the snake_case variant name; the `Display` impl prints the
/// same lowercase words.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgeMemberRuntimeState {
    Initializing,
    Idle,
    Attached,
    Running,
    Retired,
    Stopped,
    Destroyed,
}
466
467impl std::fmt::Display for BridgeMemberRuntimeState {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 match self {
470 Self::Initializing => write!(f, "initializing"),
471 Self::Idle => write!(f, "idle"),
472 Self::Attached => write!(f, "attached"),
473 Self::Running => write!(f, "running"),
474 Self::Retired => write!(f, "retired"),
475 Self::Stopped => write!(f, "stopped"),
476 Self::Destroyed => write!(f, "destroyed"),
477 }
478 }
479}
480
/// Wire-level, string-typed description of a peer.
///
/// This is the unvalidated form. The `TryFrom<&BridgePeerSpec>` conversions in
/// this file parse and validate it into [`BridgePeerIdentity`] /
/// `TrustedPeerDescriptor`. Unknown fields are rejected on deserialization.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgePeerSpec {
    pub name: String,
    pub peer_id: String,
    pub address: String,
    /// Raw 32-byte public key. Defaults to all zeros when absent from the input;
    /// an all-zero key is rejected by the validating conversions.
    #[serde(default)]
    pub pubkey: [u8; 32],
}
508
/// Newtype around a peer's raw 32-byte public key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BridgePeerPubKey([u8; 32]);
512
513impl BridgePeerPubKey {
514 pub const fn new(bytes: [u8; 32]) -> Self {
515 Self(bytes)
516 }
517
518 pub const fn as_bytes(&self) -> &[u8; 32] {
519 &self.0
520 }
521
522 pub const fn into_bytes(self) -> [u8; 32] {
523 self.0
524 }
525
526 pub const fn is_zero(&self) -> bool {
527 let mut index = 0;
528 while index < self.0.len() {
529 if self.0[index] != 0 {
530 return false;
531 }
532 index += 1;
533 }
534 true
535 }
536
537 pub fn derived_peer_id(&self) -> PeerId {
538 PeerId::from_ed25519_pubkey(&self.0)
539 }
540}
541
impl From<[u8; 32]> for BridgePeerPubKey {
    /// Same as [`BridgePeerPubKey::new`].
    fn from(bytes: [u8; 32]) -> Self {
        Self::new(bytes)
    }
}
547
/// Parsed, strongly typed form of a [`BridgePeerSpec`].
///
/// The `TryFrom<&BridgePeerSpec>` impl only produces values whose `pubkey` is
/// non-zero and whose `peer_id` equals the id derived from that key. The fields
/// are public, so values built by other means carry no such guarantee.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BridgePeerIdentity {
    pub name: PeerName,
    pub peer_id: PeerId,
    pub address: PeerAddress,
    pub pubkey: BridgePeerPubKey,
}
556
557impl BridgePeerIdentity {
558 pub fn into_trusted_peer_descriptor(self) -> TrustedPeerDescriptor {
559 TrustedPeerDescriptor {
560 peer_id: self.peer_id,
561 name: self.name,
562 address: self.address,
563 pubkey: self.pubkey.into_bytes(),
564 }
565 }
566}
567
/// Peer connectivity value reported in [`BridgeObservationResponse`].
///
/// Serialized as the snake_case variant name.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum BridgePeerConnectivity {
    Reachable,
    Unreachable,
    Unknown,
}
578
579impl From<TrustedPeerDescriptor> for BridgePeerSpec {
580 fn from(spec: TrustedPeerDescriptor) -> Self {
581 Self {
582 name: spec.name.as_str().to_string(),
583 peer_id: spec.peer_id.as_str(),
584 address: spec.address.to_string(),
585 pubkey: spec.pubkey,
586 }
587 }
588}
589
impl TryFrom<BridgePeerSpec> for meerkat_core::comms::TrustedPeerDescriptor {
    type Error = String;

    /// By-value convenience wrapper; delegates to the `&BridgePeerSpec` conversion.
    fn try_from(spec: BridgePeerSpec) -> Result<Self, Self::Error> {
        Self::try_from(&spec)
    }
}
597
598impl TryFrom<&BridgePeerSpec> for BridgePeerIdentity {
599 type Error = String;
600
601 fn try_from(spec: &BridgePeerSpec) -> Result<Self, Self::Error> {
602 let peer_id = PeerId::parse(&spec.peer_id).map_err(|e| format!("invalid peer_id: {e}"))?;
603 let name =
604 PeerName::new(spec.name.clone()).map_err(|e| format!("invalid peer name: {e}"))?;
605 let address = parse_peer_address(&spec.address)?;
606 let pubkey = BridgePeerPubKey::new(spec.pubkey);
607 if pubkey.is_zero() {
608 return Err("peer pubkey must be non-zero".to_string());
609 }
610 let derived = pubkey.derived_peer_id();
611 if derived != peer_id {
612 return Err(format!(
613 "peer_id {peer_id} does not match pubkey-derived id {derived}"
614 ));
615 }
616 Ok(Self {
617 name,
618 peer_id,
619 address,
620 pubkey,
621 })
622 }
623}
624
625impl TryFrom<&BridgePeerSpec> for TrustedPeerDescriptor {
626 type Error = String;
627
628 fn try_from(spec: &BridgePeerSpec) -> Result<Self, Self::Error> {
629 BridgePeerIdentity::try_from(spec).map(BridgePeerIdentity::into_trusted_peer_descriptor)
630 }
631}
632
/// Parses `raw` with `PeerAddress::parse`, converting the error to its string form.
fn parse_peer_address(raw: &str) -> Result<PeerAddress, String> {
    PeerAddress::parse(raw).map_err(|err| err.to_string())
}
636
/// Payload shared by the commands that need only supervisor identity, epoch and
/// protocol version (see the `BridgeSupervisorPayload` variants of
/// [`BridgeCommand`]). Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeSupervisorPayload {
    pub supervisor: BridgePeerSpec,
    pub epoch: u64,
    pub protocol_version: BridgeProtocolVersion,
}
650
/// Payload of [`BridgeCommand::HardCancelMember`]: the supervisor fields plus a
/// free-form `reason`. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeHardCancelPayload {
    pub supervisor: BridgePeerSpec,
    pub epoch: u64,
    pub protocol_version: BridgeProtocolVersion,
    pub reason: String,
}
666
/// Bootstrap token string.
///
/// Serialized transparently as a plain string. `Debug` is implemented by hand
/// (not derived) so the token text is never printed.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(transparent)]
pub struct BridgeBootstrapToken(String);
678
679impl BridgeBootstrapToken {
680 pub fn new(token: impl Into<String>) -> Self {
681 Self(token.into())
682 }
683
684 pub fn as_str(&self) -> &str {
685 &self.0
686 }
687
688 pub fn into_string(self) -> String {
689 self.0
690 }
691
692 pub fn is_empty(&self) -> bool {
693 self.0.is_empty()
694 }
695
696 pub fn len(&self) -> usize {
697 self.0.len()
698 }
699}
700
impl From<String> for BridgeBootstrapToken {
    /// Wraps an owned string without copying it.
    fn from(token: String) -> Self {
        Self(token)
    }
}
706
impl From<&str> for BridgeBootstrapToken {
    /// Copies the borrowed text into a new token.
    fn from(token: &str) -> Self {
        Self(token.to_string())
    }
}
712
713impl std::fmt::Debug for BridgeBootstrapToken {
714 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
715 if self.0.is_empty() {
716 write!(f, "BridgeBootstrapToken(empty)")
717 } else {
718 write!(f, "BridgeBootstrapToken(<redacted, {}B>)", self.0.len())
719 }
720 }
721}
722
/// Payload of [`BridgeCommand::BindMember`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeBindPayload {
    pub supervisor: BridgePeerSpec,
    pub epoch: u64,
    pub protocol_version: BridgeProtocolVersion,
    pub expected_peer_id: String,
    pub expected_address: String,
    /// Token whose `Debug` output is redacted (see [`BridgeBootstrapToken`]).
    pub bootstrap_token: BridgeBootstrapToken,
}
741
/// Capability advertisement returned inside [`BridgeBindResponse`].
///
/// Every field is optional on the wire: the version fields fall back to the
/// crate's current/default/supported values and every boolean falls back to
/// `false`. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeCapabilities {
    /// Defaults to `supervisor_bridge_current_protocol_version()` when absent.
    #[serde(default = "supervisor_bridge_current_protocol_version")]
    pub current_protocol_version: BridgeProtocolVersion,
    /// Defaults to `supervisor_bridge_default_protocol_version()` when absent.
    #[serde(default = "supervisor_bridge_default_protocol_version")]
    pub default_protocol_version: BridgeProtocolVersion,
    /// Defaults to the crate's supported-version list when absent.
    #[serde(default = "default_supported_protocol_versions")]
    pub supported_protocol_versions: Vec<BridgeProtocolVersion>,
    #[serde(default)]
    pub deliver_member_input: bool,
    #[serde(default)]
    pub observe_member: bool,
    #[serde(default)]
    pub interrupt_member: bool,
    #[serde(default)]
    pub hard_cancel_member: bool,
    #[serde(default)]
    pub retire_member: bool,
    #[serde(default)]
    pub destroy_member: bool,
    #[serde(default)]
    pub wire_member: bool,
    #[serde(default)]
    pub unwire_member: bool,
}
773
774impl Default for BridgeCapabilities {
775 fn default() -> Self {
776 Self {
777 current_protocol_version: supervisor_bridge_current_protocol_version(),
778 default_protocol_version: supervisor_bridge_default_protocol_version(),
779 supported_protocol_versions: supervisor_bridge_supported_protocol_versions().to_vec(),
780 deliver_member_input: false,
781 observe_member: false,
782 interrupt_member: false,
783 hard_cancel_member: false,
784 retire_member: false,
785 destroy_member: false,
786 wire_member: false,
787 unwire_member: false,
788 }
789 }
790}
791
/// Payload of [`BridgeReply::BindMember`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeBindResponse {
    pub peer_id: String,
    pub address: String,
    pub capabilities: BridgeCapabilities,
}
806
/// Payload of [`BridgeReply::Ack`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeAck {
    pub ok: bool,
}
814
/// Payload of [`BridgeCommand::DeliverMemberInput`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDeliveryPayload {
    pub supervisor: BridgePeerSpec,
    pub epoch: u64,
    pub protocol_version: BridgeProtocolVersion,
    pub input_id: String,
    pub content: meerkat_core::types::ContentInput,
    pub handling_mode: meerkat_core::types::HandlingMode,
}
827
/// Outcome carried in [`BridgeDeliveryResponse`].
///
/// Internally tagged under `"outcome"` with snake_case variant names; unknown
/// fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "outcome", rename_all = "snake_case", deny_unknown_fields)]
pub enum BridgeDeliveryOutcome {
    Accepted,
    Deduplicated {
        existing_input_id: String,
    },
    Rejected {
        cause: BridgeDeliveryRejectionCause,
        reason: String,
    },
}
842
/// Cause attached to [`BridgeDeliveryOutcome::Rejected`].
///
/// Internally tagged under `"kind"` with snake_case variant names; unknown
/// fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case", deny_unknown_fields)]
#[non_exhaustive]
pub enum BridgeDeliveryRejectionCause {
    NotReady { state: BridgeMemberRuntimeState },
    DurabilityViolation { detail: String },
    PeerHandlingModeInvalid { detail: String },
    Internal { detail: String },
}
863
864impl std::fmt::Display for BridgeDeliveryRejectionCause {
865 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
866 match self {
867 Self::NotReady { state } => write!(f, "not_ready(state={state})"),
868 Self::DurabilityViolation { detail } => {
869 write!(f, "durability_violation(detail={detail})")
870 }
871 Self::PeerHandlingModeInvalid { detail } => {
872 write!(f, "peer_handling_mode_invalid(detail={detail})")
873 }
874 Self::Internal { detail } => write!(f, "internal(detail={detail})"),
875 }
876 }
877}
878
/// Payload of [`BridgeReply::Delivery`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDeliveryResponse {
    pub input_id: String,
    pub canonical_input_id: Option<String>,
    pub outcome: BridgeDeliveryOutcome,
}
888
/// Payload of [`BridgeCommand::WireMember`] and [`BridgeCommand::UnwireMember`]:
/// the supervisor fields plus the peer being wired or unwired. Unknown fields
/// are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgePeerWiringPayload {
    pub supervisor: BridgePeerSpec,
    pub epoch: u64,
    pub protocol_version: BridgeProtocolVersion,
    pub peer_spec: BridgePeerSpec,
}
899
/// Payload of [`BridgeReply::Retire`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeRetireResponse {
    pub inputs_abandoned: usize,
    pub inputs_pending_drain: usize,
}
908
/// Payload of [`BridgeReply::Destroy`]. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeDestroyResponse {
    pub inputs_abandoned: usize,
}
916
/// Payload of [`BridgeReply::Observation`].
///
/// The `Option` fields are omitted from the output when `None` and read back
/// as `None` when absent. Unknown fields are rejected.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct BridgeObservationResponse {
    pub state: BridgeMemberRuntimeState,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub accepting_inputs: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub current_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub peer_connectivity: Option<BridgePeerConnectivity>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    // NOTE(review): stored as an opaque string; the test fixture uses an
    // RFC 3339-style timestamp — confirm the expected format with producers.
    pub observed_at: String,
}
936
impl BridgeObservationResponse {
    /// Builds a response by storing each argument in the field of the same name.
    pub fn new(
        state: BridgeMemberRuntimeState,
        accepting_inputs: Option<bool>,
        current_run_id: Option<String>,
        peer_connectivity: Option<BridgePeerConnectivity>,
        last_error: Option<String>,
        observed_at: String,
    ) -> Self {
        Self {
            state,
            accepting_inputs,
            current_run_id,
            peer_connectivity,
            last_error,
            observed_at,
        }
    }
}
957
958#[cfg(test)]
959#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
960mod tests {
961 use super::*;
962 use serde_json::json;
963
    /// `BridgeObservationResponse::new` stores its arguments in the matching fields.
    #[test]
    fn observation_response_new_sets_observation_fields() {
        let response = BridgeObservationResponse::new(
            BridgeMemberRuntimeState::Running,
            Some(true),
            Some("run-1".to_string()),
            Some(BridgePeerConnectivity::Reachable),
            None,
            "2026-04-16T07:00:00Z".to_string(),
        );

        assert_eq!(response.state, BridgeMemberRuntimeState::Running);
        assert_eq!(response.current_run_id.as_deref(), Some("run-1"));
        assert_eq!(response.accepting_inputs, Some(true));
        assert_eq!(
            response.peer_connectivity,
            Some(BridgePeerConnectivity::Reachable)
        );
    }
983
    /// Fixture peer spec for serde round-trip tests. It is never validated: the
    /// pubkey is all zeros, which the validating conversions reject.
    fn sample_peer_spec() -> BridgePeerSpec {
        BridgePeerSpec {
            name: "member-a".to_string(),
            peer_id: "peer-abc".to_string(),
            address: "tcp://127.0.0.1:7000".to_string(),
            pubkey: [0u8; 32],
        }
    }
996
    /// Fixture supervisor payload at the current protocol version.
    fn sample_supervisor_payload() -> BridgeSupervisorPayload {
        BridgeSupervisorPayload {
            supervisor: sample_peer_spec(),
            epoch: 42,
            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
        }
    }
1004
    /// Fixture hard-cancel payload at the current protocol version.
    fn sample_hard_cancel_payload() -> BridgeHardCancelPayload {
        BridgeHardCancelPayload {
            supervisor: sample_peer_spec(),
            epoch: 42,
            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            reason: "test hard cancel".to_string(),
        }
    }
1013
    /// Fixture wiring payload naming a second peer (`member-b`).
    fn sample_wiring_payload() -> BridgePeerWiringPayload {
        BridgePeerWiringPayload {
            supervisor: sample_peer_spec(),
            epoch: 7,
            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            peer_spec: BridgePeerSpec {
                name: "member-b".to_string(),
                peer_id: "peer-xyz".to_string(),
                address: "tcp://127.0.0.1:7001".to_string(),
                pubkey: [0u8; 32],
            },
        }
    }
1027
    /// Serializes `cmd`, decodes it again, re-serializes, and asserts that the two
    /// JSON values are equal.
    fn assert_command_round_trip(cmd: &BridgeCommand) {
        let value = serde_json::to_value(cmd).expect("serialize command");
        let decoded: BridgeCommand = serde_json::from_value(value.clone()).expect("decode command");
        let reencoded = serde_json::to_value(&decoded).expect("reserialize command");
        assert_eq!(
            value, reencoded,
            "BridgeCommand round-trip must preserve wire shape"
        );
    }
1046
    /// An extra key next to the command's own fields must be rejected by serde.
    #[test]
    fn bridge_command_unknown_top_level_field_fails_closed() {
        let mut value =
            serde_json::to_value(BridgeCommand::ObserveMember(sample_supervisor_payload()))
                .expect("serialize command");
        value["extra_behavior"] = json!(true);

        let err = serde_json::from_value::<BridgeCommand>(value)
            .expect_err("unknown command fields must fail at serde boundary");
        let message = err.to_string();
        assert!(
            message.contains("extra_behavior") || message.contains("unknown field"),
            "expected unknown field error, got: {message}"
        );
    }
1062
    /// An extra key inside the nested `supervisor` object must also be rejected.
    #[test]
    fn bridge_command_unknown_nested_payload_field_fails_closed() {
        let mut value =
            serde_json::to_value(BridgeCommand::ObserveMember(sample_supervisor_payload()))
                .expect("serialize command");
        value["supervisor"]["extra_behavior"] = json!(true);

        let err = serde_json::from_value::<BridgeCommand>(value)
            .expect_err("unknown nested payload fields must fail at serde boundary");
        let message = err.to_string();
        assert!(
            message.contains("extra_behavior") || message.contains("unknown field"),
            "expected unknown field error, got: {message}"
        );
    }
1078
    /// `BindMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_bind_member_round_trip() {
        let cmd = BridgeCommand::BindMember(BridgeBindPayload {
            supervisor: sample_peer_spec(),
            epoch: 1,
            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            expected_peer_id: "peer-expected".to_string(),
            expected_address: "tcp://127.0.0.1:9000".to_string(),
            bootstrap_token: "bootstrap-secret".into(),
        });
        assert_command_round_trip(&cmd);
    }
1091
    /// An address with an unrecognised scheme (`http://`) fails validation.
    #[test]
    fn bridge_peer_spec_rejects_unknown_address_scheme() {
        let spec = BridgePeerSpec {
            name: "member-a".to_string(),
            peer_id: "aaaaaaaa-0000-4000-8000-000000000001".to_string(),
            address: "http://127.0.0.1:7000".to_string(),
            pubkey: [0u8; 32],
        };

        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
            .expect_err("supervisor bridge peer specs must fail closed on unknown schemes");
        assert!(
            err.contains("unknown peer address transport"),
            "unexpected error: {err}",
        );
    }
1108
    /// An address without any scheme fails validation.
    #[test]
    fn bridge_peer_spec_rejects_schemeless_address() {
        let spec = BridgePeerSpec {
            name: "member-a".to_string(),
            peer_id: "aaaaaaaa-0000-4000-8000-000000000001".to_string(),
            address: "127.0.0.1:7000".to_string(),
            pubkey: [0u8; 32],
        };

        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
            .expect_err("supervisor bridge peer specs must fail closed on schemeless addresses");
        assert!(
            err.contains("missing transport scheme"),
            "unexpected error: {err}",
        );
    }
1125
    /// An all-zero pubkey is rejected even when the other fields parse.
    #[test]
    fn bridge_peer_spec_rejects_zero_pubkey() {
        let spec = BridgePeerSpec {
            name: "member-a".to_string(),
            peer_id: PeerId::from_ed25519_pubkey(&[1u8; 32]).to_string(),
            address: "tcp://127.0.0.1:7000".to_string(),
            pubkey: [0u8; 32],
        };

        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
            .expect_err("supervisor bridge peer specs must fail closed on zero pubkeys");
        assert!(
            err.contains("pubkey") && err.contains("non-zero"),
            "unexpected error: {err}",
        );
    }
1142
    /// A spec without `pubkey` deserializes (serde default = zeros) but then fails
    /// validation through the zero-key check.
    #[test]
    fn bridge_peer_spec_missing_pubkey_defaults_to_zero_and_rejects() {
        let value = json!({
            "name": "member-a",
            "peer_id": PeerId::from_ed25519_pubkey(&[2u8; 32]).to_string(),
            "address": "tcp://127.0.0.1:7000"
        });
        let spec: BridgePeerSpec =
            serde_json::from_value(value).expect("legacy bridge peer spec should deserialize");

        let err = meerkat_core::comms::TrustedPeerDescriptor::try_from(&spec)
            .expect_err("missing pubkey must not become trusted zero-key authority");
        assert!(
            err.contains("pubkey") && err.contains("non-zero"),
            "unexpected error: {err}",
        );
    }
1160
    /// `AuthorizeSupervisor` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_authorize_supervisor_round_trip() {
        let cmd = BridgeCommand::AuthorizeSupervisor(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1166
    /// `RevokeSupervisor` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_revoke_supervisor_round_trip() {
        let cmd = BridgeCommand::RevokeSupervisor(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1172
    /// `DeliverMemberInput` with text content survives a round trip.
    #[test]
    fn bridge_command_deliver_member_input_round_trip() {
        let cmd = BridgeCommand::DeliverMemberInput(BridgeDeliveryPayload {
            supervisor: sample_peer_spec(),
            epoch: 2,
            protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            input_id: "input-1".to_string(),
            content: meerkat_core::types::ContentInput::Text("hello".to_string()),
            handling_mode: meerkat_core::types::HandlingMode::Queue,
        });
        assert_command_round_trip(&cmd);
    }
1185
    /// `ObserveMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_observe_member_round_trip() {
        let cmd = BridgeCommand::ObserveMember(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1191
    /// `InterruptMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_interrupt_member_round_trip() {
        let cmd = BridgeCommand::InterruptMember(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1197
    /// `HardCancelMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_hard_cancel_member_round_trip() {
        let cmd = BridgeCommand::HardCancelMember(sample_hard_cancel_payload());
        assert_command_round_trip(&cmd);
    }
1203
    /// `RetireMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_retire_member_round_trip() {
        let cmd = BridgeCommand::RetireMember(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1209
    /// `DestroyMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_destroy_member_round_trip() {
        let cmd = BridgeCommand::DestroyMember(sample_supervisor_payload());
        assert_command_round_trip(&cmd);
    }
1215
    /// `WireMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_wire_member_round_trip() {
        let cmd = BridgeCommand::WireMember(sample_wiring_payload());
        assert_command_round_trip(&cmd);
    }
1221
    /// `UnwireMember` survives a serialize/deserialize round trip.
    #[test]
    fn bridge_command_unwire_member_round_trip() {
        let cmd = BridgeCommand::UnwireMember(sample_wiring_payload());
        assert_command_round_trip(&cmd);
    }
1227
    /// Pins the exact JSON shape of a typed rejection and checks that it decodes
    /// and re-encodes unchanged.
    #[test]
    fn bridge_reply_rejected_round_trip_with_typed_cause() {
        let reply = BridgeReply::Rejected {
            cause: BridgeRejectionCause::StaleSupervisor,
            reason: "epoch too low".to_string(),
        };
        let value = serde_json::to_value(&reply).expect("serialize rejected reply");
        assert_eq!(
            value,
            json!({
                "result": "rejected",
                "cause": "stale_supervisor",
                "reason": "epoch too low",
            }),
            "wire shape must tag rejection with `result` + `cause` + `reason`"
        );
        let decoded: BridgeReply = serde_json::from_value(value.clone()).expect("decode reply");
        match decoded {
            BridgeReply::Rejected { cause, ref reason } => {
                assert_eq!(cause, BridgeRejectionCause::StaleSupervisor);
                assert_eq!(reason, "epoch too low");
            }
            other => panic!("expected BridgeReply::Rejected, got {other:?}"),
        }
        let reencoded = serde_json::to_value(&decoded).expect("reserialize reply");
        assert_eq!(value, reencoded);
    }
1263
#[test]
/// A structured rejection object decoded under the current protocol version
/// must yield a typed cause, the original reason, and not be flagged as a
/// legacy v1 raw string.
fn bridge_rejection_decoder_accepts_typed_protocol_v2_rejection() {
    let wire = json!({
        "result": "rejected",
        "cause": "sender_mismatch",
        "reason": "wrong supervisor",
    });

    let rejection = decode_bridge_rejection_reply(SUPERVISOR_BRIDGE_PROTOCOL_VERSION, &wire)
        .expect("typed rejection should decode");

    let expected_cause = Some(BridgeRejectionCause::SenderMismatch);
    assert_eq!(rejection.typed_cause(), expected_cause);
    assert_eq!(rejection.reason(), "wrong supervisor");
    assert!(!rejection.is_legacy_v1_raw_string());
}
1282
#[test]
/// A bare JSON string must not decode as a rejection when the current
/// protocol version is requested.
fn bridge_rejection_decoder_rejects_raw_string_for_protocol_v2() {
    let raw_string = json!("legacy rejection");

    let decoded = decode_bridge_rejection_reply(SUPERVISOR_BRIDGE_PROTOCOL_VERSION, &raw_string);

    assert!(
        decoded.is_none(),
        "protocol v2 must not promote raw strings into typed rejection causes"
    );
}
1292
#[test]
/// The explicit legacy helper accepts a bare JSON string and reports it with
/// no typed cause, the string as the reason, and the legacy flag set.
fn bridge_rejection_decoder_isolates_raw_string_to_legacy_v1() {
    let raw_string = json!("legacy rejection");

    let legacy = decode_legacy_v1_raw_string_rejection(&raw_string)
        .expect("legacy raw string should decode only through the explicit v1 helper");

    assert_eq!(legacy.typed_cause(), None);
    assert_eq!(legacy.reason(), "legacy rejection");
    assert!(legacy.is_legacy_v1_raw_string());
}
1304
#[test]
/// `BridgeCommand::protocol_version` on an `AuthorizeSupervisor` command built
/// from the supervisor payload fixture must equal the bridge protocol constant.
fn bridge_command_reports_payload_protocol_version() {
    let authorize = BridgeCommand::AuthorizeSupervisor(sample_supervisor_payload());
    let reported = authorize.protocol_version();

    assert_eq!(reported, SUPERVISOR_BRIDGE_PROTOCOL_VERSION);
}
1314
#[test]
/// The free-function accessors for current, default and supported protocol
/// versions must all agree with `SUPERVISOR_BRIDGE_PROTOCOL_VERSION`, and raw
/// values 1 and 999 must be refused by `from_supported_u32`.
fn supervisor_bridge_protocol_versions_are_reported_from_single_authority() {
    let current = supervisor_bridge_current_protocol_version();
    assert_eq!(current, SUPERVISOR_BRIDGE_PROTOCOL_VERSION);

    let default = supervisor_bridge_default_protocol_version();
    assert_eq!(default, SUPERVISOR_BRIDGE_PROTOCOL_VERSION);

    let supported = supervisor_bridge_supported_protocol_versions();
    assert_eq!(supported, &[SUPERVISOR_BRIDGE_PROTOCOL_VERSION]);

    let is_supported =
        supervisor_bridge_protocol_version_supported(SUPERVISOR_BRIDGE_PROTOCOL_VERSION);
    assert!(is_supported);

    // Raw values outside the supported set must be refused.
    for raw in [1, 999] {
        assert!(BridgeProtocolVersion::from_supported_u32(raw).is_err());
    }
}
1335
#[test]
/// `BridgeCapabilities::default()` must report the bridge protocol constant
/// as current, default, and sole supported version.
fn bridge_capabilities_default_reports_canonical_protocol_versions() {
    let defaults = BridgeCapabilities::default();

    let current = defaults.current_protocol_version;
    assert_eq!(current, SUPERVISOR_BRIDGE_PROTOCOL_VERSION);

    let default = defaults.default_protocol_version;
    assert_eq!(default, SUPERVISOR_BRIDGE_PROTOCOL_VERSION);

    let only_supported = vec![SUPERVISOR_BRIDGE_PROTOCOL_VERSION];
    assert_eq!(defaults.supported_protocol_versions, only_supported);
}
1352
#[test]
/// A capability object without any protocol-version fields and without
/// `hard_cancel_member` must still decode: the version fields take the bridge
/// protocol constant, `hard_cancel_member` is false, and the flags that were
/// present keep their `true` values.
fn bridge_capabilities_deserialize_legacy_without_protocol_report() {
    let legacy_wire = json!({
        "deliver_member_input": true,
        "observe_member": true,
        "interrupt_member": true,
        "retire_member": true,
        "destroy_member": true,
        "wire_member": true,
        "unwire_member": true,
    });
    let caps: BridgeCapabilities = serde_json::from_value(legacy_wire)
        .expect("legacy capability payload without protocol report should decode");

    // Version fields absent from the payload.
    assert_eq!(
        caps.current_protocol_version,
        SUPERVISOR_BRIDGE_PROTOCOL_VERSION
    );
    assert_eq!(
        caps.default_protocol_version,
        SUPERVISOR_BRIDGE_PROTOCOL_VERSION
    );
    assert_eq!(
        caps.supported_protocol_versions,
        vec![SUPERVISOR_BRIDGE_PROTOCOL_VERSION]
    );

    // Flags: everything sent stays true; the unsent hard-cancel flag is false.
    assert!(caps.deliver_member_input);
    assert!(caps.observe_member);
    assert!(caps.interrupt_member);
    assert!(!caps.hard_cancel_member);
    assert!(caps.retire_member);
    assert!(caps.destroy_member);
    assert!(caps.wire_member);
    assert!(caps.unwire_member);
}
1387
#[test]
/// Decoding a bind payload whose `protocol_version` is 999 must fail, and the
/// error text must mention the unsupported protocol version.
fn bridge_bind_payload_rejects_unsupported_protocol_version_at_wire_boundary() {
    let wire = json!({
        "supervisor": {
            "name": "mob/__mob_supervisor__",
            "peer_id": "00000000-0000-0000-0000-00000000bbbb",
            "address": "inproc://mob/__mob_supervisor__",
        },
        "epoch": 7,
        "protocol_version": 999,
        "expected_peer_id": "00000000-0000-0000-0000-00000000aaaa",
        "expected_address": "inproc://member",
        "bootstrap_token": "tok-raw-string",
    });

    let decode_result = serde_json::from_value::<BridgeBindPayload>(wire);
    let error =
        decode_result.expect_err("unsupported protocol versions must fail closed at decode");

    let rendered = error.to_string();
    assert!(
        rendered.contains("unsupported supervisor bridge protocol version"),
        "unexpected error: {error}",
    );
}
1413
#[test]
/// Decoding capabilities whose `default_protocol_version` is 999 must fail,
/// and the error text must mention the unsupported protocol version.
fn bridge_capabilities_reject_unsupported_protocol_versions_at_wire_boundary() {
    let wire = json!({
        "current_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
        "default_protocol_version": 999,
        "supported_protocol_versions": [SUPERVISOR_BRIDGE_PROTOCOL_VERSION],
        "deliver_member_input": true,
    });

    let decode_result = serde_json::from_value::<BridgeCapabilities>(wire);
    let error =
        decode_result.expect_err("unsupported advertised defaults must fail closed at decode");

    let rendered = error.to_string();
    assert!(
        rendered.contains("unsupported supervisor bridge protocol version"),
        "unexpected error: {error}",
    );
}
1433
#[test]
/// With every optional field populated, the observation response must
/// serialize to the literal JSON below, decode back to an equal value, and
/// re-encode to the same JSON.
fn observation_response_round_trip_all_optional_present() {
    let original = BridgeObservationResponse {
        state: BridgeMemberRuntimeState::Running,
        accepting_inputs: Some(true),
        current_run_id: Some(String::from("run-42")),
        peer_connectivity: Some(BridgePeerConnectivity::Reachable),
        last_error: Some(String::from("transient network blip")),
        observed_at: String::from("2026-04-16T07:00:00Z"),
    };

    let wire = serde_json::to_value(&original).expect("serialize observation");
    assert_eq!(
        wire,
        json!({
            "state": "running",
            "accepting_inputs": true,
            "current_run_id": "run-42",
            "peer_connectivity": "reachable",
            "last_error": "transient network blip",
            "observed_at": "2026-04-16T07:00:00Z",
        })
    );

    let round_tripped: BridgeObservationResponse =
        serde_json::from_value(wire.clone()).expect("decode observation");
    assert_eq!(round_tripped, original);

    let wire_again = serde_json::to_value(&round_tripped).expect("reserialize observation");
    assert_eq!(wire, wire_again);
}
1468
#[test]
/// With every optional field set to `None`, only `state` and `observed_at`
/// may appear in the JSON, and the value must survive decode and re-encode.
fn observation_response_round_trip_all_optional_absent() {
    let original = BridgeObservationResponse {
        state: BridgeMemberRuntimeState::Idle,
        accepting_inputs: None,
        current_run_id: None,
        peer_connectivity: None,
        last_error: None,
        observed_at: String::from("2026-04-16T07:01:00Z"),
    };

    let wire = serde_json::to_value(&original).expect("serialize observation");
    assert_eq!(
        wire,
        json!({
            "state": "idle",
            "observed_at": "2026-04-16T07:01:00Z",
        }),
        "absent optional fields must be skipped on the wire"
    );

    let round_tripped: BridgeObservationResponse =
        serde_json::from_value(wire.clone()).expect("decode observation");
    assert_eq!(round_tripped, original);

    let wire_again = serde_json::to_value(&round_tripped).expect("reserialize observation");
    assert_eq!(wire, wire_again);
}
1494
#[test]
/// Each `BridgePeerConnectivity` variant listed here must serialize to its
/// lowercase string and decode back to the same variant.
fn peer_connectivity_serializes_as_snake_case() {
    let cases = [
        (BridgePeerConnectivity::Reachable, "reachable"),
        (BridgePeerConnectivity::Unreachable, "unreachable"),
        (BridgePeerConnectivity::Unknown, "unknown"),
    ];

    for (variant, expected) in cases {
        let wire = serde_json::to_value(variant).expect("serialize connectivity");
        assert_eq!(
            wire,
            json!(expected),
            "variant {variant:?} must serialize as {expected:?}"
        );

        let round_tripped: BridgePeerConnectivity =
            serde_json::from_value(wire).expect("decode connectivity");
        assert_eq!(round_tripped, variant);
    }
}
1517
#[test]
/// For each listed `BridgeMemberRuntimeState` variant, `Display` and the JSON
/// encoding must both produce the same lowercase string, and that string must
/// decode back to the variant.
fn member_runtime_state_display_and_round_trip_all_variants() {
    let cases: &[(BridgeMemberRuntimeState, &str)] = &[
        (BridgeMemberRuntimeState::Initializing, "initializing"),
        (BridgeMemberRuntimeState::Idle, "idle"),
        (BridgeMemberRuntimeState::Attached, "attached"),
        (BridgeMemberRuntimeState::Running, "running"),
        (BridgeMemberRuntimeState::Retired, "retired"),
        (BridgeMemberRuntimeState::Stopped, "stopped"),
        (BridgeMemberRuntimeState::Destroyed, "destroyed"),
    ];

    cases.iter().for_each(|(variant, expected)| {
        let displayed = variant.to_string();
        assert_eq!(
            displayed,
            *expected,
            "Display output must match snake_case wire form for {variant:?}"
        );

        let wire = serde_json::to_value(variant).expect("serialize runtime state");
        assert_eq!(wire, json!(expected));

        let round_tripped: BridgeMemberRuntimeState =
            serde_json::from_value(wire).expect("decode runtime state");
        assert_eq!(round_tripped, *variant);
    });
}
1546
#[test]
/// Each listed `BridgeRejectionCause` variant must serialize to the paired
/// snake_case string and decode back to the same variant.
fn bridge_rejection_cause_snake_case_round_trip_all_variants() {
    let cases: &[(BridgeRejectionCause, &str)] = &[
        (BridgeRejectionCause::NotBound, "not_bound"),
        (BridgeRejectionCause::StaleSupervisor, "stale_supervisor"),
        (BridgeRejectionCause::SenderMismatch, "sender_mismatch"),
        (BridgeRejectionCause::AlreadyBound, "already_bound"),
        (
            BridgeRejectionCause::InvalidBootstrapToken,
            "invalid_bootstrap_token",
        ),
        (
            BridgeRejectionCause::UnsupportedProtocolVersion,
            "unsupported_protocol_version",
        ),
        (
            BridgeRejectionCause::InvalidSupervisorSpec,
            "invalid_supervisor_spec",
        ),
        (BridgeRejectionCause::InvalidPeerSpec, "invalid_peer_spec"),
        (BridgeRejectionCause::AddressMismatch, "address_mismatch"),
        (BridgeRejectionCause::Unsupported, "unsupported"),
        (BridgeRejectionCause::Internal, "internal"),
    ];

    cases.iter().for_each(|(cause, expected)| {
        let wire = serde_json::to_value(cause).expect("serialize cause");
        assert_eq!(
            wire,
            json!(expected),
            "cause {cause:?} must serialize as {expected:?}"
        );

        let round_tripped: BridgeRejectionCause =
            serde_json::from_value(wire).expect("decode cause");
        assert_eq!(round_tripped, *cause);
    });
}
1592
1593 fn assert_reply_round_trip(reply: BridgeReply, expected: serde_json::Value) {
1602 let value = serde_json::to_value(&reply).expect("serialize reply");
1603 assert_eq!(value, expected, "reply wire shape must be stable");
1604 let _decoded: BridgeReply = serde_json::from_value(value).expect("decode reply");
1605 }
1606
#[test]
/// An `ack` reply carrying an extra `extra_behavior` key must be refused by
/// the decoder, with an error that names the field or says "unknown field".
fn bridge_reply_unknown_field_fails_closed() {
    let wire = json!({
        "result": "ack",
        "ok": true,
        "extra_behavior": true,
    });

    let message = serde_json::from_value::<BridgeReply>(wire)
        .expect_err("unknown reply fields must fail at serde boundary")
        .to_string();

    let names_field = message.contains("extra_behavior");
    let says_unknown = message.contains("unknown field");
    assert!(
        names_field || says_unknown,
        "expected unknown field error, got: {message}"
    );
}
1621
#[test]
/// Pins the JSON shape of a `BindMember` reply built with default
/// capabilities, via `assert_reply_round_trip`.
fn bridge_reply_bind_member_ack_round_trip() {
    let reply = BridgeReply::BindMember(BridgeBindResponse {
        peer_id: String::from("peer-x"),
        address: String::from("inproc://peer-x"),
        capabilities: BridgeCapabilities::default(),
    });

    let expected = json!({
        "result": "bind_member",
        "peer_id": "peer-x",
        "address": "inproc://peer-x",
        "capabilities": {
            "current_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            "default_protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
            "supported_protocol_versions": [SUPERVISOR_BRIDGE_PROTOCOL_VERSION],
            "deliver_member_input": false,
            "observe_member": false,
            "interrupt_member": false,
            "hard_cancel_member": false,
            "retire_member": false,
            "destroy_member": false,
            "wire_member": false,
            "unwire_member": false,
        },
    });

    assert_reply_round_trip(reply, expected);
}
1650
#[test]
/// Pins the JSON shape of an `Ack` reply with `ok: true`, via
/// `assert_reply_round_trip`.
fn bridge_reply_ack_round_trip() {
    let reply = BridgeReply::Ack(BridgeAck { ok: true });
    let expected = json!({ "result": "ack", "ok": true });

    assert_reply_round_trip(reply, expected);
}
1658
#[test]
/// Pins the JSON shape of an `Observation` reply whose optional fields are
/// all `None`, via `assert_reply_round_trip`.
fn bridge_reply_observation_round_trip() {
    let observation = BridgeObservationResponse {
        state: BridgeMemberRuntimeState::Running,
        accepting_inputs: None,
        current_run_id: None,
        peer_connectivity: None,
        last_error: None,
        observed_at: String::from("2026-04-17T00:00:00Z"),
    };

    let expected = json!({
        "result": "observation",
        "state": "running",
        "observed_at": "2026-04-17T00:00:00Z",
    });

    assert_reply_round_trip(BridgeReply::Observation(observation), expected);
}
1677
#[test]
/// Pins the JSON shape of `Delivery` replies for two outcomes, an accepted
/// input and a rejection with a `DurabilityViolation` cause, via
/// `assert_reply_round_trip`.
fn bridge_reply_delivery_round_trip() {
    // Case 1: accepted delivery.
    let accepted = BridgeReply::Delivery(BridgeDeliveryResponse {
        input_id: String::from("in-1"),
        canonical_input_id: None,
        outcome: BridgeDeliveryOutcome::Accepted,
    });
    let accepted_wire = json!({
        "result": "delivery",
        "input_id": "in-1",
        "canonical_input_id": null,
        "outcome": { "outcome": "accepted" },
    });
    assert_reply_round_trip(accepted, accepted_wire);

    // Case 2: rejected delivery with a structured cause.
    let rejected = BridgeReply::Delivery(BridgeDeliveryResponse {
        input_id: String::from("in-2"),
        canonical_input_id: None,
        outcome: BridgeDeliveryOutcome::Rejected {
            cause: BridgeDeliveryRejectionCause::DurabilityViolation {
                detail: String::from("derived durable input cannot be accepted"),
            },
            reason: String::from("derived durable input cannot be accepted"),
        },
    });
    let rejected_wire = json!({
        "result": "delivery",
        "input_id": "in-2",
        "canonical_input_id": null,
        "outcome": {
            "outcome": "rejected",
            "cause": {
                "kind": "durability_violation",
                "detail": "derived durable input cannot be accepted",
            },
            "reason": "derived durable input cannot be accepted",
        },
    });
    assert_reply_round_trip(rejected, rejected_wire);
}
1720
#[test]
/// A `Rejected` outcome with a `NotReady` cause must encode the member state
/// as the string `"stopped"` inside `cause`, and that JSON must decode back
/// to an equal outcome.
fn bridge_delivery_not_ready_carries_typed_member_state() {
    let not_ready = BridgeDeliveryRejectionCause::NotReady {
        state: BridgeMemberRuntimeState::Stopped,
    };
    let original = BridgeDeliveryOutcome::Rejected {
        cause: not_ready,
        reason: String::from("runtime not accepting input while in state: stopped"),
    };

    let wire = serde_json::to_value(&original).expect("serialize outcome");
    assert_eq!(
        wire,
        json!({
            "outcome": "rejected",
            "cause": {
                "kind": "not_ready",
                "state": "stopped",
            },
            "reason": "runtime not accepting input while in state: stopped",
        })
    );

    let round_tripped: BridgeDeliveryOutcome =
        serde_json::from_value(wire).expect("legacy wire state string remains compatible");
    assert_eq!(round_tripped, original);
}
1746
#[test]
/// Pins the JSON shape of a `Retire` reply with both counters present, via
/// `assert_reply_round_trip`.
fn bridge_reply_retire_round_trip() {
    let reply = BridgeReply::Retire(BridgeRetireResponse {
        inputs_abandoned: 2,
        inputs_pending_drain: 0,
    });

    let expected = json!({
        "result": "retire",
        "inputs_abandoned": 2,
        "inputs_pending_drain": 0,
    });

    assert_reply_round_trip(reply, expected);
}
1761
#[test]
/// Pins the JSON shape of a `Destroy` reply, via `assert_reply_round_trip`.
fn bridge_reply_destroy_round_trip() {
    let reply = BridgeReply::Destroy(BridgeDestroyResponse {
        inputs_abandoned: 3,
    });

    let expected = json!({
        "result": "destroy",
        "inputs_abandoned": 3,
    });

    assert_reply_round_trip(reply, expected);
}
1774
#[test]
/// `Debug` output for a non-empty token must show only a redaction marker
/// with the byte length, never the token text itself.
fn bridge_bootstrap_token_debug_redacts_nonempty_body() {
    let secret = "super-secret-bootstrap";
    let token = BridgeBootstrapToken::new(secret);

    let rendered = format!("{token:?}");
    let expected = format!("BridgeBootstrapToken(<redacted, {}B>)", secret.len());
    assert_eq!(rendered, expected);

    let leaked = rendered.contains(secret);
    assert!(
        !leaked,
        "Debug output must not contain the raw token body"
    );
}
1799
#[test]
/// `Debug` output for an empty token must be the fixed `empty` marker.
fn bridge_bootstrap_token_debug_marks_empty_token() {
    let empty = BridgeBootstrapToken::new("");
    let rendered = format!("{empty:?}");

    assert_eq!(rendered, "BridgeBootstrapToken(empty)");
}
1805
#[test]
/// The token must serialize as a bare JSON string (both as a `Value` and as
/// text) and decode from a bare JSON string to an equal token.
fn bridge_bootstrap_token_serde_is_transparent_over_string() {
    let token = BridgeBootstrapToken::new("tok-abc");

    // Token to JSON value.
    let as_value = serde_json::to_value(&token).expect("serialize token");
    assert_eq!(as_value, json!("tok-abc"));

    // JSON value back to a token.
    let from_wire: BridgeBootstrapToken =
        serde_json::from_value(json!("tok-abc")).expect("decode token");
    assert_eq!(from_wire, token);

    // Token to JSON text, which must be a quoted string with no wrapper.
    let as_text = serde_json::to_string(&token).expect("serialize string");
    assert_eq!(as_text, "\"tok-abc\"");
}
1825
#[test]
/// A bind payload with a plain-string `bootstrap_token` and no supervisor
/// `pubkey` must decode, with the pubkey coming back as 32 zero bytes.
/// Re-encoding must reproduce the input JSON plus that zeroed `pubkey`.
fn bridge_bind_payload_wire_compat_with_plain_string_bootstrap_token() {
    let wire = json!({
        "supervisor": {
            "name": "mob/__mob_supervisor__",
            "peer_id": "00000000-0000-0000-0000-00000000bbbb",
            "address": "inproc://mob/__mob_supervisor__",
        },
        "epoch": 7,
        "protocol_version": SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
        "expected_peer_id": "00000000-0000-0000-0000-00000000aaaa",
        "expected_address": "inproc://member",
        "bootstrap_token": "tok-raw-string",
    });

    let decoded: BridgeBindPayload =
        serde_json::from_value(wire.clone()).expect("decode pre-newtype payload");
    assert_eq!(decoded.bootstrap_token.as_str(), "tok-raw-string");
    assert_eq!(decoded.supervisor.pubkey, [0u8; 32]);

    // The only expected difference after re-encoding is the added pubkey.
    let mut expected = wire;
    expected["supervisor"]["pubkey"] = json!(vec![0u8; 32]);

    let reencoded = serde_json::to_value(&decoded).expect("reserialize payload");
    assert_eq!(
        reencoded, expected,
        "pre-pubkey payloads must decode and reserialize with the defaulted pubkey"
    );
}
1859 }
1860}