// meerkat_core/comms.rs

//! Canonical communication API types for Meerkat.
//!
//! This module defines the public contract for comms commands, responses, and
//! stream controls. It is intentionally transport-agnostic and keeps names
//! stable for the host and SDK surface migration work.
6
7use crate::event::{AgentEvent, EventEnvelope};
8use crate::interaction::{InteractionId, ResponseStatus};
9use crate::types::{ContentBlock, HandlingMode};
10use futures::Stream;
11use serde::{Deserialize, Serialize};
12use std::collections::BTreeMap;
13use std::pin::Pin;
14use uuid::Uuid;
15
/// Intent string carried by every supervisor bridge command.
///
/// Requests with this intent are exempt from authentication at peer ingress:
/// a supervisor has to be able to finish the bootstrap handshake before the
/// private trust edge exists. The literal is owned by core ingress authority;
/// transport crates are expected to go through typed core policy when they
/// need to recognise it, rather than keeping their own copy of the string as
/// a local exemption.
pub const SUPERVISOR_BRIDGE_INTENT: &str = "supervisor.bridge";
23
24/// Canonical runtime identity for a peer.
25///
26/// `PeerId` is the routing key: the router and trust store key by `PeerId`,
27/// never by `PeerName`. Two peers may legitimately share a display `PeerName`
28/// (per the Wave-B V5 dogma note), but their `PeerId`s never collide — the
29/// underlying UUID is globally unique.
30///
31/// Constructed freshly (`PeerId::new`) for a peer minted locally, parsed
32/// from a hyphenated UUID (`PeerId::parse`) when we've been given an identity
33/// over the wire, or derived from a 32-byte Ed25519 public key when a transport
34/// still authenticates by raw signing key.
35#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
37pub struct PeerId(#[cfg_attr(feature = "schema", schemars(with = "String"))] pub Uuid);
38
39/// UUIDv5 namespace for deriving [`PeerId`] from an Ed25519 signing pubkey.
40///
41/// `PeerId` is the canonical runtime routing key: both the router and the
42/// trust store index peers by `PeerId`, never by display name. The derivation
43/// is a content hash of the 32-byte public key so a given key always resolves
44/// to the same `PeerId` across runtimes.
45const PEER_ID_ED25519_PUBKEY_NAMESPACE: Uuid =
46    Uuid::from_u128(0x6d65_6572_6b61_7450_6565_7249_6430_0001);
47
48impl PeerId {
49    /// Mint a new `PeerId` with a fresh UUID v7 (time-ordered).
50    pub fn new() -> Self {
51        Self(crate::time_compat::new_uuid_v7())
52    }
53
54    /// Wrap an existing UUID.
55    pub const fn from_uuid(uuid: Uuid) -> Self {
56        Self(uuid)
57    }
58
59    /// Parse a hyphenated UUID string into a `PeerId`.
60    pub fn parse(s: &str) -> Result<Self, PeerIdError> {
61        Uuid::parse_str(s)
62            .map(Self)
63            .map_err(|source| PeerIdError::Invalid {
64                input: s.to_string(),
65                source,
66            })
67    }
68
69    /// Derive the canonical routing id for a 32-byte Ed25519 public key.
70    pub fn from_ed25519_pubkey(pubkey: &[u8; 32]) -> Self {
71        Self(Uuid::new_v5(&PEER_ID_ED25519_PUBKEY_NAMESPACE, pubkey))
72    }
73
74    /// Hyphenated UUID string form.
75    pub fn as_str(&self) -> String {
76        self.0.to_string()
77    }
78
79    /// Borrow the underlying UUID.
80    pub const fn as_uuid(&self) -> &Uuid {
81        &self.0
82    }
83}
84
85impl Default for PeerId {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl std::fmt::Display for PeerId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        self.0.fmt(f)
94    }
95}
96
97/// Error parsing a [`PeerId`] from a string.
98#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
99pub enum PeerIdError {
100    #[error("invalid peer id {input:?}: {source}")]
101    Invalid {
102        input: String,
103        #[source]
104        source: uuid::Error,
105    },
106}
107
108/// Typed transport atom for a peer address.
109///
110/// Replaces the old free-form `address: String` on `PeerDirectoryEntry` so
111/// callers cannot accidentally invent new transports by string concatenation
112/// at a call site.
113#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116#[non_exhaustive]
117pub enum PeerTransport {
118    /// In-process routing within this runtime (no network hop).
119    Inproc,
120    /// Unix domain socket.
121    Uds,
122    /// TCP endpoint.
123    Tcp,
124}
125
126impl PeerTransport {
127    /// Stable short code used as the URI scheme half of a peer address.
128    pub const fn as_scheme(&self) -> &'static str {
129        match self {
130            Self::Inproc => "inproc",
131            Self::Uds => "uds",
132            Self::Tcp => "tcp",
133        }
134    }
135}
136
137impl std::fmt::Display for PeerTransport {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.write_str(self.as_scheme())
140    }
141}
142
143/// Typed peer address: transport atom plus endpoint string.
144///
145/// The `endpoint` is transport-specific (path for `Uds`, `host:port` for
146/// `Tcp`, agent name for `Inproc`) but is carried as a validated `String`
147/// so the transport atom can be branched on without re-parsing.
148#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
149#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
150pub struct PeerAddress {
151    pub transport: PeerTransport,
152    pub endpoint: String,
153}
154
155/// Error parsing a typed [`PeerAddress`] from its URI-shaped string form.
156#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
157pub enum PeerAddressParseError {
158    #[error("peer address missing transport scheme: {input}")]
159    MissingTransportScheme { input: String },
160    #[error("unknown peer address transport {scheme:?} in address {input:?}")]
161    UnknownTransport { input: String, scheme: String },
162}
163
164impl PeerAddress {
165    pub fn new(transport: PeerTransport, endpoint: impl Into<String>) -> Self {
166        Self {
167            transport,
168            endpoint: endpoint.into(),
169        }
170    }
171
172    pub const fn transport(&self) -> PeerTransport {
173        self.transport
174    }
175
176    pub fn endpoint(&self) -> &str {
177        &self.endpoint
178    }
179
180    /// Strictly parse `scheme://endpoint` peer addresses.
181    ///
182    /// Only the currently supported transport schemes are accepted. Unknown
183    /// schemes and schemeless input fail closed so callers cannot silently
184    /// reinterpret address truth as TCP.
185    pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerAddressParseError> {
186        let raw = raw.as_ref();
187        let (scheme, endpoint) =
188            raw.split_once("://")
189                .ok_or_else(|| PeerAddressParseError::MissingTransportScheme {
190                    input: raw.to_string(),
191                })?;
192        let transport = match scheme {
193            "inproc" => PeerTransport::Inproc,
194            "uds" => PeerTransport::Uds,
195            "tcp" => PeerTransport::Tcp,
196            other => {
197                return Err(PeerAddressParseError::UnknownTransport {
198                    input: raw.to_string(),
199                    scheme: other.to_string(),
200                });
201            }
202        };
203        Ok(Self::new(transport, endpoint))
204    }
205}
206
207impl std::fmt::Display for PeerAddress {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        write!(f, "{}://{}", self.transport.as_scheme(), self.endpoint)
210    }
211}
212
213impl std::str::FromStr for PeerAddress {
214    type Err = PeerAddressParseError;
215
216    fn from_str(s: &str) -> Result<Self, Self::Err> {
217        Self::parse(s)
218    }
219}
220
221impl TryFrom<&str> for PeerAddress {
222    type Error = PeerAddressParseError;
223
224    fn try_from(value: &str) -> Result<Self, Self::Error> {
225        Self::parse(value)
226    }
227}
228
229impl TryFrom<String> for PeerAddress {
230    type Error = PeerAddressParseError;
231
232    fn try_from(value: String) -> Result<Self, Self::Error> {
233        Self::parse(value)
234    }
235}
236
237/// Display-only slug for a peer.
238///
239/// `PeerName` is **not** a routing key after Wave-B V5: the router resolves
240/// sends by [`PeerId`], and trust stores are keyed by [`PeerId`]. `PeerName`
241/// is retained so human-facing surfaces (CLI, REST `comms.peers`, logs) can
242/// render a recognisable handle next to the opaque id.
243#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
244#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
245pub struct PeerName(String);
246
247impl PeerName {
248    /// Create a new peer name if it passes basic validation.
249    pub fn new(name: impl Into<String>) -> Result<Self, String> {
250        let name = name.into();
251        if name.trim().is_empty() {
252            return Err("peer name cannot be empty".to_string());
253        }
254        if name.chars().any(char::is_control) {
255            return Err("peer name cannot contain control characters".to_string());
256        }
257        Ok(Self(name))
258    }
259
260    pub fn as_str(&self) -> &str {
261        &self.0
262    }
263
264    pub fn as_string(&self) -> String {
265        self.0.clone()
266    }
267}
268
269impl AsRef<str> for PeerName {
270    fn as_ref(&self) -> &str {
271        self.as_str()
272    }
273}
274
275impl std::fmt::Display for PeerName {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        self.0.fmt(f)
278    }
279}
280
281impl From<PeerName> for String {
282    fn from(peer_name: PeerName) -> Self {
283        peer_name.0
284    }
285}
286
287/// Canonical outbound peer route.
288///
289/// `peer_id` is the only routing key. `display_name` is optional presentation
290/// metadata retained for diagnostics after a boundary resolves a name through
291/// trust or discovery.
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct PeerRoute {
294    pub peer_id: PeerId,
295    pub display_name: Option<PeerName>,
296}
297
298impl PeerRoute {
299    pub fn new(peer_id: PeerId) -> Self {
300        Self {
301            peer_id,
302            display_name: None,
303        }
304    }
305
306    pub fn with_display_name(peer_id: PeerId, display_name: PeerName) -> Self {
307        Self {
308            peer_id,
309            display_name: Some(display_name),
310        }
311    }
312
313    pub fn label(&self) -> String {
314        self.display_name
315            .as_ref()
316            .map(PeerName::as_string)
317            .unwrap_or_else(|| self.peer_id.to_string())
318    }
319}
320
321/// Routing-subset descriptor for a trusted peer — the identity fields that
322/// traverse the core seam.
323///
324/// Replaces the old stringly trusted-peer spec `{ name, peer_id, address }`
325/// with typed atoms: `PeerId` (runtime routing key), `PeerName` (display
326/// slug), `PeerAddress` (transport + endpoint), and a 32-byte signing
327/// public key that lets the receiver verify envelope signatures. Richer
328/// trust-store metadata (reachability snapshots, discovery labels) stays
329/// in `meerkat-comms::trust::TrustedPeer` — this descriptor is the
330/// minimal typed subset the core seam needs to route and admit a peer.
331#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
332#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
333pub struct TrustedPeerDescriptor {
334    /// Canonical runtime identity — the routing key. Never collides.
335    pub peer_id: PeerId,
336    /// Display-only slug for humans. Two peers may legitimately share a
337    /// name; their `peer_id` values still differ.
338    pub name: PeerName,
339    /// Typed transport atom + endpoint. Transport cannot be invented by
340    /// string concatenation at a call site.
341    pub address: PeerAddress,
342    /// Ed25519 signing public key (32 bytes). The receiver needs this to
343    /// verify envelope signatures; the router derives `PeerId` from it
344    /// via UUIDv5 so `peer_id` and `pubkey` are consistent.
345    pub pubkey: [u8; 32],
346}
347
348impl TrustedPeerDescriptor {
349    pub fn pubkey_is_zero(pubkey: &[u8; 32]) -> bool {
350        *pubkey == [0u8; 32]
351    }
352
353    pub fn has_zero_pubkey(&self) -> bool {
354        Self::pubkey_is_zero(&self.pubkey)
355    }
356
357    pub fn validate_pubkey_for_peer_id(peer_id: PeerId, pubkey: &[u8; 32]) -> Result<(), String> {
358        if Self::pubkey_is_zero(pubkey) {
359            return Err("TrustedPeerDescriptor.pubkey must be non-zero".to_string());
360        }
361        let derived = PeerId::from_ed25519_pubkey(pubkey);
362        if derived != peer_id {
363            return Err(format!(
364                "peer_id {peer_id} does not match pubkey-derived id {derived}"
365            ));
366        }
367        Ok(())
368    }
369
370    /// Build a descriptor with a **zero Ed25519 signing pubkey** from
371    /// typed identity atoms.
372    ///
373    /// The zero-pubkey default is **test-only** — envelope signature
374    /// verification trivially fails against it. In-process `inproc`
375    /// tests use this shape because the router identity map is what
376    /// authorizes the peer; production paths construct
377    /// `TrustedPeerDescriptor` via the struct literal with an explicit
378    /// pubkey (or use [`Self::with_pubkey`] to stamp one onto a
379    /// test-built descriptor). The loud name keeps the hazard surface
380    /// explicit — a production call site using this helper is always
381    /// wrong and will read wrong at review.
382    pub fn test_only_unsigned(
383        name: impl Into<String>,
384        peer_id: impl AsRef<str>,
385        address: impl AsRef<str>,
386    ) -> Result<Self, String> {
387        let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
388        let peer_id =
389            PeerId::parse(peer_id.as_ref()).map_err(|e| format!("invalid peer_id: {e}"))?;
390        let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
391        Ok(Self {
392            peer_id,
393            name,
394            address,
395            pubkey: [0u8; 32],
396        })
397    }
398
399    /// Typed sibling of [`Self::test_only_unsigned`]: build a descriptor
400    /// from an already-typed [`PeerId`] instead of a stringly-typed peer-id
401    /// argument.
402    ///
403    /// Post-#24 `PeerId` is a typed UUID; `PeerId::parse` only accepts
404    /// hyphenated UUID strings. The stringly-typed
405    /// [`Self::test_only_unsigned`] accepts anything `AsRef<str>` and
406    /// round-trips through `PeerId::parse`, which is the right contract
407    /// for call sites whose peer-id comes off the wire (comms-drain
408    /// supervisor reconcile, ops lifecycle) — they receive a UUID string
409    /// and the helper validates it.
410    ///
411    /// Test fixtures that mint a peer locally do NOT have a UUID string
412    /// to start from. They have a debug-friendly alias (`"remote-agent-b"`,
413    /// `"stale-peer"`) and want a random `PeerId`. The stringly form
414    /// forced them to either (a) stamp the alias in as an invalid UUID
415    /// (which rejects post-#24) or (b) reach outside the helper to mint
416    /// a UUID separately. This typed sibling accepts the typed `PeerId`
417    /// directly, skipping the parse round-trip.
418    pub fn test_only_unsigned_typed(
419        name: impl Into<String>,
420        peer_id: PeerId,
421        address: impl AsRef<str>,
422    ) -> Result<Self, String> {
423        let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
424        let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
425        Ok(Self {
426            peer_id,
427            name,
428            address,
429            pubkey: [0u8; 32],
430        })
431    }
432
433    /// Attach a non-zero Ed25519 signing pubkey. Test and production
434    /// paths that already have a derived `PeerId` + pubkey use the
435    /// field-literal constructor directly; this helper is for
436    /// retroactively stamping a pubkey onto a descriptor built via
437    /// [`Self::test_only_unsigned`].
438    pub fn with_pubkey(mut self, pubkey: [u8; 32]) -> Self {
439        self.pubkey = pubkey;
440        self
441    }
442
443    /// Build a descriptor with a caller-supplied Ed25519 signing pubkey
444    /// from typed identity atoms.
445    ///
446    /// This is the dogma-clean alternative to
447    /// [`Self::test_only_unsigned`] for live-comms paths where the
448    /// caller has a real pubkey (e.g. from
449    /// `CommsRuntime::public_key().as_bytes()`). The supervisor needs
450    /// a non-zero-pubkey trust entry for signed-envelope replies to
451    /// admit past `is_trusted(&envelope.from)` at ingress.
452    ///
453    /// Like [`Self::test_only_unsigned`], this accepts a stringly
454    /// `peer_id` that must parse as a UUID (post-#24 `PeerId::parse`
455    /// only accepts hyphenated UUID strings). The hashed consistency
456    /// check in [`crate::comms`] enforces that the supplied `peer_id`
457    /// matches `PubKey::from(pubkey).to_peer_id()` at descriptor →
458    /// trust conversion.
459    pub fn unsigned_with_pubkey(
460        name: impl Into<String>,
461        peer_id: impl AsRef<str>,
462        pubkey: [u8; 32],
463        address: impl AsRef<str>,
464    ) -> Result<Self, String> {
465        let mut descriptor = Self::test_only_unsigned(name, peer_id, address)?;
466        Self::validate_pubkey_for_peer_id(descriptor.peer_id, &pubkey)?;
467        descriptor.pubkey = pubkey;
468        Ok(descriptor)
469    }
470}
471
472/// One-way peer lifecycle notification kind.
473///
474/// These notifications are control-plane topology updates, not correlated
475/// peer work requests. They intentionally do not create request/response
476/// lifecycles and must never require an LLM-authored reply.
477#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
478#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
479pub enum PeerLifecycleKind {
480    #[serde(rename = "mob.peer_added")]
481    PeerAdded,
482    #[serde(rename = "mob.peer_retired")]
483    PeerRetired,
484    #[serde(rename = "mob.peer_unwired")]
485    PeerUnwired,
486}
487
488impl PeerLifecycleKind {
489    pub const fn as_str(self) -> &'static str {
490        match self {
491            Self::PeerAdded => "mob.peer_added",
492            Self::PeerRetired => "mob.peer_retired",
493            Self::PeerUnwired => "mob.peer_unwired",
494        }
495    }
496}
497
498impl std::fmt::Display for PeerLifecycleKind {
499    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500        f.write_str(self.as_str())
501    }
502}
503
504/// Typed wire request for `comms/send`.
505///
506/// Variants are serde-tagged on `kind` and validated structurally at the
507/// deserialization boundary. Required fields per kind are enforced by the
508/// type system; invalid discriminators (`source`, `stream`, `handling_mode`,
509/// `status`) become serde deserialization errors rather than runtime
510/// string-match failures.
511///
512/// Cross-field invariants that cannot be expressed structurally (e.g.
513/// `handling_mode` is forbidden on `Accepted` peer responses) are checked
514/// in [`CommsCommandRequest::into_command`].
515#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
517#[serde(tag = "kind", rename_all = "snake_case", deny_unknown_fields)]
518pub enum CommsCommandRequest {
519    /// Inject input into the local session.
520    Input {
521        body: String,
522        #[serde(default, skip_serializing_if = "Option::is_none")]
523        blocks: Option<Vec<ContentBlock>>,
524        #[serde(default, skip_serializing_if = "Option::is_none")]
525        source: Option<InputSource>,
526        #[serde(default, skip_serializing_if = "Option::is_none")]
527        stream: Option<InputStreamMode>,
528        #[serde(default, skip_serializing_if = "Option::is_none")]
529        handling_mode: Option<HandlingMode>,
530        #[serde(default, skip_serializing_if = "Option::is_none")]
531        allow_self_session: Option<bool>,
532    },
533    /// Send a one-way peer message.
534    PeerMessage {
535        to: PeerId,
536        body: String,
537        #[serde(default, skip_serializing_if = "Option::is_none")]
538        blocks: Option<Vec<ContentBlock>>,
539        #[serde(default, skip_serializing_if = "Option::is_none")]
540        handling_mode: Option<HandlingMode>,
541    },
542    /// Send a one-way peer lifecycle notification.
543    PeerLifecycle {
544        to: PeerId,
545        lifecycle_kind: PeerLifecycleKind,
546        #[serde(default)]
547        params: serde_json::Value,
548    },
549    /// Send a request to a peer.
550    PeerRequest {
551        to: PeerId,
552        intent: String,
553        #[serde(default)]
554        params: serde_json::Value,
555        #[serde(default, skip_serializing_if = "Option::is_none")]
556        blocks: Option<Vec<ContentBlock>>,
557        #[serde(default, skip_serializing_if = "Option::is_none")]
558        handling_mode: Option<HandlingMode>,
559        #[serde(default, skip_serializing_if = "Option::is_none")]
560        stream: Option<InputStreamMode>,
561    },
562    /// Send a response to a prior peer request.
563    PeerResponse {
564        to: PeerId,
565        in_reply_to: InteractionId,
566        status: ResponseStatus,
567        #[serde(default)]
568        result: serde_json::Value,
569        #[serde(default, skip_serializing_if = "Option::is_none")]
570        blocks: Option<Vec<ContentBlock>>,
571        #[serde(default, skip_serializing_if = "Option::is_none")]
572        handling_mode: Option<HandlingMode>,
573    },
574}
575
576/// Cross-field validation failure for [`CommsCommandRequest::into_command`].
577///
578/// Per-field discriminator validation is enforced by serde at deserialization
579/// — only invariants that span multiple fields surface here.
580#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
581pub enum CommsCommandError {
582    /// `handling_mode` is set on a `peer_response` whose `status` is
583    /// `Accepted`. Progress responses cannot carry a handling mode — the
584    /// receiver's admission gate would drop them, so reject at parse time.
585    #[error("handling_mode is forbidden on accepted peer responses")]
586    HandlingModeForbiddenForAcceptedResponse,
587}
588
589impl CommsCommandRequest {
590    /// Convert the typed wire request into a [`CommsCommand`] domain envelope.
591    ///
592    /// `session_id` is supplied separately because it is owned by the
593    /// surface that received the request, not the wire payload.
594    pub fn into_command(
595        self,
596        session_id: &crate::types::SessionId,
597    ) -> Result<CommsCommand, CommsCommandError> {
598        Ok(match self {
599            CommsCommandRequest::Input {
600                body,
601                blocks,
602                source,
603                stream,
604                handling_mode,
605                allow_self_session,
606            } => CommsCommand::Input {
607                session_id: session_id.clone(),
608                body,
609                blocks,
610                handling_mode: handling_mode.unwrap_or_default(),
611                source: source.unwrap_or(InputSource::Rpc),
612                stream: stream.unwrap_or(InputStreamMode::None),
613                allow_self_session: allow_self_session.unwrap_or(false),
614            },
615            CommsCommandRequest::PeerMessage {
616                to,
617                body,
618                blocks,
619                handling_mode,
620            } => CommsCommand::PeerMessage {
621                to: PeerRoute::new(to),
622                body,
623                blocks,
624                handling_mode: handling_mode.unwrap_or_default(),
625            },
626            CommsCommandRequest::PeerLifecycle {
627                to,
628                lifecycle_kind,
629                params,
630            } => CommsCommand::PeerLifecycle {
631                to: PeerRoute::new(to),
632                kind: lifecycle_kind,
633                params,
634            },
635            CommsCommandRequest::PeerRequest {
636                to,
637                intent,
638                params,
639                blocks,
640                handling_mode,
641                stream,
642            } => CommsCommand::PeerRequest {
643                to: PeerRoute::new(to),
644                intent,
645                params,
646                blocks,
647                handling_mode: handling_mode.unwrap_or_default(),
648                stream: stream.unwrap_or(InputStreamMode::None),
649            },
650            CommsCommandRequest::PeerResponse {
651                to,
652                in_reply_to,
653                status,
654                result,
655                blocks,
656                handling_mode,
657            } => {
658                if status == ResponseStatus::Accepted && handling_mode.is_some() {
659                    return Err(CommsCommandError::HandlingModeForbiddenForAcceptedResponse);
660                }
661                CommsCommand::PeerResponse {
662                    to: PeerRoute::new(to),
663                    in_reply_to,
664                    status,
665                    result,
666                    blocks,
667                    handling_mode,
668                }
669            }
670        })
671    }
672
673    /// Stable wire discriminant for telemetry / logging.
674    pub fn kind(&self) -> &'static str {
675        match self {
676            Self::Input { .. } => "input",
677            Self::PeerMessage { .. } => "peer_message",
678            Self::PeerLifecycle { .. } => "peer_lifecycle",
679            Self::PeerRequest { .. } => "peer_request",
680            Self::PeerResponse { .. } => "peer_response",
681        }
682    }
683}
684/// Source for an input event posted to an agent.
685#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
686#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
687#[serde(rename_all = "lowercase")]
688pub enum InputSource {
689    Tcp,
690    Uds,
691    Stdin,
692    Webhook,
693    Rpc,
694}
695
696impl From<crate::config::PlainEventSource> for InputSource {
697    fn from(source: crate::config::PlainEventSource) -> Self {
698        match source {
699            crate::config::PlainEventSource::Tcp => Self::Tcp,
700            crate::config::PlainEventSource::Uds => Self::Uds,
701            crate::config::PlainEventSource::Stdin => Self::Stdin,
702            crate::config::PlainEventSource::Webhook => Self::Webhook,
703            crate::config::PlainEventSource::Rpc => Self::Rpc,
704        }
705    }
706}
707
708impl From<InputSource> for crate::config::PlainEventSource {
709    fn from(source: InputSource) -> Self {
710        match source {
711            InputSource::Tcp => Self::Tcp,
712            InputSource::Uds => Self::Uds,
713            InputSource::Stdin => Self::Stdin,
714            InputSource::Webhook => Self::Webhook,
715            InputSource::Rpc => Self::Rpc,
716        }
717    }
718}
719
720/// Whether this input/peer command should reserve a local interaction stream.
721#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
722#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
723#[serde(rename_all = "snake_case")]
724pub enum InputStreamMode {
725    /// Do not reserve any stream.
726    None,
727    /// Reserve an interaction stream for the command.
728    ReserveInteraction,
729}
730
731/// Transport-independent comms command envelope.
732#[derive(Debug, Clone, PartialEq, Eq)]
733pub enum CommsCommand {
734    /// Inject input into the local session.
735    Input {
736        session_id: crate::types::SessionId,
737        body: String,
738        blocks: Option<Vec<ContentBlock>>,
739        handling_mode: HandlingMode,
740        source: InputSource,
741        stream: InputStreamMode,
742        allow_self_session: bool,
743    },
744    /// Send a one-way peer message.
745    PeerMessage {
746        to: PeerRoute,
747        body: String,
748        blocks: Option<Vec<ContentBlock>>,
749        handling_mode: HandlingMode,
750    },
751    /// Send a one-way peer lifecycle notification.
752    PeerLifecycle {
753        to: PeerRoute,
754        kind: PeerLifecycleKind,
755        params: serde_json::Value,
756    },
757    /// Send a request to a peer.
758    PeerRequest {
759        to: PeerRoute,
760        intent: String,
761        params: serde_json::Value,
762        blocks: Option<Vec<ContentBlock>>,
763        handling_mode: HandlingMode,
764        stream: InputStreamMode,
765    },
766    /// Send a response to a prior peer request.
767    PeerResponse {
768        to: PeerRoute,
769        in_reply_to: InteractionId,
770        status: ResponseStatus,
771        result: serde_json::Value,
772        blocks: Option<Vec<ContentBlock>>,
773        handling_mode: Option<HandlingMode>,
774    },
775}
776
777impl CommsCommand {
778    pub fn command_kind(&self) -> &'static str {
779        match self {
780            Self::Input { .. } => "input",
781            Self::PeerMessage { .. } => "peer_message",
782            Self::PeerLifecycle { .. } => "peer_lifecycle",
783            Self::PeerRequest { .. } => "peer_request",
784            Self::PeerResponse { .. } => "peer_response",
785        }
786    }
787}
788
789/// Receipt returned after accepting a comms command.
790#[derive(Debug, Clone, PartialEq, Eq)]
791pub enum SendReceipt {
792    InputAccepted {
793        interaction_id: InteractionId,
794        stream_reserved: bool,
795    },
796    PeerMessageSent {
797        envelope_id: uuid::Uuid,
798        acked: bool,
799    },
800    PeerLifecycleSent {
801        envelope_id: uuid::Uuid,
802    },
803    PeerRequestSent {
804        envelope_id: uuid::Uuid,
805        interaction_id: InteractionId,
806        stream_reserved: bool,
807    },
808    PeerResponseSent {
809        envelope_id: uuid::Uuid,
810        in_reply_to: InteractionId,
811    },
812}
813
814#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
815#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
816#[serde(rename_all = "snake_case")]
817pub enum PeerDirectorySource {
818    Trusted,
819    Inproc,
820    TrustedAndInproc,
821    Unknown,
822}
823
824impl PeerDirectorySource {
825    pub const fn as_str(&self) -> &'static str {
826        match self {
827            Self::Trusted => "trusted",
828            Self::Inproc => "inproc",
829            Self::TrustedAndInproc => "trusted_and_inproc",
830            Self::Unknown => "unknown",
831        }
832    }
833}
834
835impl std::fmt::Display for PeerDirectorySource {
836    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
837        f.write_str(self.as_str())
838    }
839}
840
841#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
842#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
843#[serde(rename_all = "snake_case")]
844pub enum PeerSendability {
845    PeerMessage,
846    PeerRequest,
847    PeerResponse,
848}
849
850impl PeerSendability {
851    pub const DIRECTORY_DEFAULTS: [Self; 3] =
852        [Self::PeerMessage, Self::PeerRequest, Self::PeerResponse];
853
854    pub fn directory_defaults() -> Vec<Self> {
855        Self::DIRECTORY_DEFAULTS.to_vec()
856    }
857
858    pub const fn as_str(&self) -> &'static str {
859        match self {
860            Self::PeerMessage => "peer_message",
861            Self::PeerRequest => "peer_request",
862            Self::PeerResponse => "peer_response",
863        }
864    }
865}
866
867impl std::fmt::Display for PeerSendability {
868    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
869        f.write_str(self.as_str())
870    }
871}
872
873/// Typed peer capability envelope for peer-directory output.
874///
875/// Extensions are intentionally opaque display/integration metadata. Core
876/// routing, admission, and policy decisions must use typed fields such as
877/// [`PeerDirectoryEntry::sendable_kinds`] instead of consulting this bag.
878#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
879#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
880pub struct PeerCapabilitySet {
881    #[serde(default = "PeerCapabilitySet::default_version")]
882    pub version: u16,
883    #[serde(default)]
884    pub extensions: BTreeMap<String, serde_json::Value>,
885}
886
887impl PeerCapabilitySet {
888    pub const CURRENT_VERSION: u16 = 1;
889
890    const fn default_version() -> u16 {
891        Self::CURRENT_VERSION
892    }
893
894    pub fn with_extension(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
895        self.extensions.insert(key.into(), value);
896        self
897    }
898}
899
900impl Default for PeerCapabilitySet {
901    fn default() -> Self {
902        Self {
903            version: Self::CURRENT_VERSION,
904            extensions: BTreeMap::new(),
905        }
906    }
907}
908
909#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
910#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
911#[serde(rename_all = "snake_case")]
912pub enum PeerReachability {
913    Unknown,
914    Reachable,
915    Unreachable,
916}
917
918#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
919#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
920#[serde(rename_all = "snake_case")]
921#[non_exhaustive]
922pub enum PeerReachabilityReason {
923    OfflineOrNoAck,
924    TransportError,
925    /// The peer admitted the transport but rejected our envelope at its
926    /// ingress policy gate (untrusted sender, full inbox, etc.). The peer
927    /// is still reachable at the transport level; policy denied us.
928    AdmissionDropped,
929}
930
931#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
932#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
933pub struct PeerDirectoryEntry {
934    /// Canonical runtime identity — the routing key.
935    pub peer_id: PeerId,
936    /// Display-only slug. Multiple entries may share a name; none share a
937    /// `peer_id`.
938    pub name: PeerName,
939    /// Typed transport atom + endpoint. Replaces the prior free-form
940    /// `address: String` so the transport cannot be invented by string
941    /// concatenation at a call site.
942    pub address: PeerAddress,
943    pub source: PeerDirectorySource,
944    pub sendable_kinds: Vec<PeerSendability>,
945    pub capabilities: PeerCapabilitySet,
946    pub reachability: PeerReachability,
947    pub last_unreachable_reason: Option<PeerReachabilityReason>,
948    /// Supplementary discovery metadata (description, labels).
949    pub meta: crate::PeerMeta,
950}
951
952#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
953#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
954pub struct PeerDirectoryListing {
955    pub peers: Vec<PeerDirectoryEntry>,
956}
957
958impl PeerDirectoryListing {
959    pub fn new(peers: Vec<PeerDirectoryEntry>) -> Self {
960        Self { peers }
961    }
962}
963
964impl From<Vec<PeerDirectoryEntry>> for PeerDirectoryListing {
965    fn from(peers: Vec<PeerDirectoryEntry>) -> Self {
966        Self::new(peers)
967    }
968}
969
970/// Scope for streaming event output.
971#[derive(Debug, Clone, PartialEq, Eq, Hash)]
972pub enum StreamScope {
973    Session(crate::types::SessionId),
974    Interaction(InteractionId),
975}
976
977/// Typed stream over enveloped agent events.
978pub type EventStream = Pin<Box<dyn Stream<Item = EventEnvelope<AgentEvent>> + Send>>;
979
980/// Errors for stream attachment and lookup.
981#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
982pub enum StreamError {
983    #[error("interaction not reserved: {0}")]
984    NotReserved(InteractionId),
985    #[error("stream not found: {0}")]
986    NotFound(String),
987    #[error("already attached: {0}")]
988    AlreadyAttached(InteractionId),
989    #[error("stream closed")]
990    Closed,
991    #[error("permission denied: {0}")]
992    PermissionDenied(String),
993    #[error("timeout: {0}")]
994    Timeout(String),
995    #[error("internal: {0}")]
996    Internal(String),
997}
998
999/// Typed reason a peer rejected our envelope at its ingress admission gate.
1000///
1001/// This mirrors `meerkat_comms::DropReason` across the core boundary so
1002/// `SendError::AdmissionDropped` can carry the typed cause all the way to
1003/// REST/RPC/MCP error payloads. Callers distinguish transport-level failure
1004/// (`PeerOffline`) from policy-level rejection (`AdmissionDropped { reason }`)
1005/// without collapsing both into "peer unreachable".
1006#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
1007#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1008#[serde(rename_all = "snake_case")]
1009#[non_exhaustive]
1010pub enum AdmissionDropReason {
1011    /// `require_peer_auth` is on, the sender is not in the trusted set, and
1012    /// the envelope is not auth-exempt (e.g. supervisor-bridge bootstrap).
1013    UntrustedSender,
1014    /// Classification rejected the item before the admission gate ran.
1015    ClassificationRejected,
1016    /// The receiver's classified inbox is closed (receiver dropped).
1017    SessionClosed,
1018    /// The receiver's classified inbox is at capacity.
1019    InboxFull,
1020}
1021
1022impl AdmissionDropReason {
1023    /// Stable wire code for this drop reason, suitable for REST/RPC/MCP
1024    /// error payloads. Callers-facing discriminant — must stay stable.
1025    pub fn as_code(&self) -> &'static str {
1026        match self {
1027            AdmissionDropReason::UntrustedSender => "untrusted_sender",
1028            AdmissionDropReason::ClassificationRejected => "classification_rejected",
1029            AdmissionDropReason::SessionClosed => "session_closed",
1030            AdmissionDropReason::InboxFull => "inbox_full",
1031        }
1032    }
1033}
1034
1035#[derive(Debug, Clone, thiserror::Error)]
1036#[non_exhaustive]
1037pub enum SendError {
1038    #[error("peer not found: {0}")]
1039    PeerNotFound(String),
1040    #[error("peer offline")]
1041    PeerOffline,
1042    #[error("peer not sendable")]
1043    PeerNotSendable(String),
1044    #[error("input stream closed")]
1045    InputClosed,
1046    #[error("unsupported command: {0}")]
1047    Unsupported(String),
1048    #[error("validation failed: {0}")]
1049    Validation(String),
1050    #[error("internal: {0}")]
1051    Internal(String),
1052    /// Receiver admitted the envelope-transport but rejected it at ingress
1053    /// for a typed policy reason (untrusted sender, full inbox, etc.). This
1054    /// is semantically distinct from `PeerOffline` — transport worked,
1055    /// policy refused.
1056    #[error("peer dropped at admission: {reason:?}")]
1057    AdmissionDropped { reason: AdmissionDropReason },
1058}
1059
1060#[derive(Debug, Clone, thiserror::Error)]
1061pub enum SendAndStreamError {
1062    #[error("send failed: {0}")]
1063    Send(#[from] SendError),
1064    #[error("stream attach failed: receipt={receipt:?}, error={error}")]
1065    StreamAttach {
1066        receipt: SendReceipt,
1067        error: StreamError,
1068    },
1069}
1070
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// A plain name is accepted; an empty name and a name containing a NUL
    /// byte are rejected.
    #[test]
    fn peer_name_validation() {
        let plain = PeerName::new("alice");
        assert!(plain.is_ok());

        let empty = PeerName::new("".to_string());
        assert!(empty.is_err());

        let with_nul = PeerName::new("bad\x00name");
        assert!(with_nul.is_err());
    }

    /// A hand-built entry exposes the name, address parts, and source it was
    /// constructed with.
    #[test]
    fn peer_directory_entry_fields() -> Result<(), String> {
        let name = PeerName::new("agent")?;
        let address = PeerAddress::new(PeerTransport::Inproc, "agent");
        let entry = PeerDirectoryEntry {
            peer_id: PeerId::new(),
            name,
            address,
            source: PeerDirectorySource::Inproc,
            sendable_kinds: vec![PeerSendability::PeerMessage],
            capabilities: PeerCapabilitySet::default(),
            reachability: PeerReachability::Unknown,
            last_unreachable_reason: None,
            meta: crate::PeerMeta::default(),
        };

        assert_eq!(entry.name.as_str(), "agent");
        assert_eq!(entry.address.transport(), PeerTransport::Inproc);
        assert_eq!(entry.address.endpoint(), "agent");
        assert_eq!(entry.source, PeerDirectorySource::Inproc);
        Ok(())
    }

    /// The JSON form of a listing uses snake_case strings for the typed
    /// source and sendability fields and nests capability extensions.
    #[test]
    fn peer_directory_listing_serializes_typed_source_sendability_and_capabilities()
    -> Result<(), String> {
        let name = PeerName::new("agent")?;
        let address = PeerAddress::new(PeerTransport::Inproc, "agent");
        let capabilities = PeerCapabilitySet::default()
            .with_extension("vendor.echo", serde_json::json!({ "enabled": true }));
        let entry = PeerDirectoryEntry {
            peer_id: PeerId::new(),
            name,
            address,
            source: PeerDirectorySource::Inproc,
            sendable_kinds: vec![PeerSendability::PeerMessage, PeerSendability::PeerRequest],
            capabilities,
            reachability: PeerReachability::Reachable,
            last_unreachable_reason: None,
            meta: crate::PeerMeta::default(),
        };

        let listing = PeerDirectoryListing::new(vec![entry]);
        let listing_json = serde_json::to_value(listing).map_err(|err| err.to_string())?;
        let first_peer = &listing_json["peers"][0];

        assert_eq!(first_peer["source"], "inproc");
        assert_eq!(
            first_peer["sendable_kinds"],
            serde_json::json!(["peer_message", "peer_request"])
        );
        assert_eq!(first_peer["capabilities"]["version"], 1);
        assert_eq!(
            first_peer["capabilities"]["extensions"]["vendor.echo"]["enabled"],
            true
        );
        Ok(())
    }

    /// Parsing the string form of a fresh id yields the same id.
    #[test]
    fn peer_id_parse_round_trip() {
        let original = PeerId::new();
        let reparsed = PeerId::parse(&original.as_str()).expect("parse");
        assert_eq!(original, reparsed);
    }

    /// A non-UUID string is rejected and the error echoes the input.
    #[test]
    fn peer_id_parse_rejects_garbage() {
        let err = PeerId::parse("not-a-uuid").expect_err("parse must reject");
        let PeerIdError::Invalid { input, .. } = err;
        assert_eq!(input, "not-a-uuid");
    }

    /// `Display` renders an address as `<scheme>://<endpoint>`.
    #[test]
    fn peer_address_display() {
        let address = PeerAddress::new(PeerTransport::Tcp, "127.0.0.1:4200");
        let rendered = address.to_string();
        assert_eq!(rendered, "tcp://127.0.0.1:4200");
    }

    /// Each supported scheme parses into the expected transport and endpoint
    /// and renders back to the original string.
    #[test]
    fn peer_address_parse_round_trips_supported_schemes() {
        let cases = [
            ("inproc://agent-a", PeerTransport::Inproc, "agent-a"),
            (
                "uds:///tmp/meerkat.sock",
                PeerTransport::Uds,
                "/tmp/meerkat.sock",
            ),
            ("tcp://127.0.0.1:4200", PeerTransport::Tcp, "127.0.0.1:4200"),
        ];

        for (text, expected_transport, expected_endpoint) in cases {
            let address = PeerAddress::parse(text).expect("supported address parses");
            assert_eq!(address.transport(), expected_transport);
            assert_eq!(address.endpoint(), expected_endpoint);
            assert_eq!(address.to_string(), text);
        }
    }

    /// An unsupported scheme is an error rather than a silent fallback.
    #[test]
    fn peer_address_parse_rejects_unknown_scheme() {
        let outcome = PeerAddress::parse("http://127.0.0.1:4200");
        let err = outcome.expect_err("unknown transport schemes must fail closed");
        assert!(
            err.to_string().contains("unknown peer address transport"),
            "unexpected error: {err}",
        );
    }

    /// Input without a `scheme://` prefix is an error.
    #[test]
    fn peer_address_parse_rejects_schemeless_input() {
        let outcome = PeerAddress::parse("127.0.0.1:4200");
        let err = outcome.expect_err("strict parser requires an address scheme");
        assert!(
            err.to_string().contains("missing transport scheme"),
            "unexpected error: {err}",
        );
    }

    /// `InputStreamMode` serializes to a snake_case string and deserializes
    /// back to the same value.
    #[test]
    fn input_stream_mode_roundtrip() -> Result<(), serde_json::Error> {
        let original = InputStreamMode::ReserveInteraction;
        let encoded = serde_json::to_value(original)?;
        assert_eq!(encoded.as_str(), Some("reserve_interaction"));

        let decoded: InputStreamMode = serde_json::from_value(encoded)?;
        assert_eq!(decoded, original);
        Ok(())
    }

    /// An `input` request with known `source` and `handling_mode` strings
    /// deserializes into the typed enum values.
    #[test]
    fn deserialize_input_with_typed_source() -> Result<(), serde_json::Error> {
        let json = r#"{"kind":"input","body":"hello","source":"webhook","handling_mode":"steer"}"#;
        let request: CommsCommandRequest = serde_json::from_str(json)?;
        match request {
            CommsCommandRequest::Input {
                body,
                source,
                handling_mode,
                ..
            } => {
                assert_eq!(body, "hello");
                assert_eq!(source, Some(InputSource::Webhook));
                assert_eq!(handling_mode, Some(HandlingMode::Steer));
            }
            other => panic!("expected input command request, got {other:?}"),
        }
        Ok(())
    }

    /// An unrecognized `source` string fails during deserialization and the
    /// error names the offending value.
    #[test]
    fn deserialize_input_invalid_source_rejects_at_serde_boundary() {
        let json = r#"{"kind":"input","body":"hello","source":"webhookd"}"#;
        let outcome = serde_json::from_str::<CommsCommandRequest>(json);
        let msg = outcome
            .expect_err("invalid source must fail deserialization")
            .to_string();
        // serde reports "unknown variant `webhookd`, expected one of ...".
        assert!(
            msg.contains("webhookd"),
            "error should name the rejected value, got: {msg}"
        );
    }

    /// An unrecognized `kind` tag fails during deserialization.
    #[test]
    fn deserialize_unknown_kind_rejects_at_serde_boundary() {
        let json = r#"{"kind":"foobar","body":"hello"}"#;
        let outcome = serde_json::from_str::<CommsCommandRequest>(json);
        let msg = outcome
            .expect_err("unknown kind must fail deserialization")
            .to_string();
        assert!(
            msg.contains("foobar") || msg.contains("variant"),
            "error should mention unknown variant, got: {msg}"
        );
    }
}