1use crate::event::{AgentEvent, EventEnvelope};
8use crate::interaction::{InteractionId, ResponseStatus};
9use crate::types::{ContentBlock, HandlingMode};
10use futures::Stream;
11use serde::{Deserialize, Serialize};
12use std::collections::BTreeMap;
13use std::pin::Pin;
14use uuid::Uuid;
15
/// Intent identifier `"supervisor.bridge"`.
///
/// NOTE(review): presumably the `intent` value carried by supervisor bridge
/// peer requests — confirm against the call sites.
pub const SUPERVISOR_BRIDGE_INTENT: &str = "supervisor.bridge";
23
24#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
37pub struct PeerId(#[cfg_attr(feature = "schema", schemars(with = "String"))] pub Uuid);
38
39const PEER_ID_ED25519_PUBKEY_NAMESPACE: Uuid =
46 Uuid::from_u128(0x6d65_6572_6b61_7450_6565_7249_6430_0001);
47
48impl PeerId {
49 pub fn new() -> Self {
51 Self(crate::time_compat::new_uuid_v7())
52 }
53
54 pub const fn from_uuid(uuid: Uuid) -> Self {
56 Self(uuid)
57 }
58
59 pub fn parse(s: &str) -> Result<Self, PeerIdError> {
61 Uuid::parse_str(s)
62 .map(Self)
63 .map_err(|source| PeerIdError::Invalid {
64 input: s.to_string(),
65 source,
66 })
67 }
68
69 pub fn from_ed25519_pubkey(pubkey: &[u8; 32]) -> Self {
71 Self(Uuid::new_v5(&PEER_ID_ED25519_PUBKEY_NAMESPACE, pubkey))
72 }
73
74 pub fn as_str(&self) -> String {
76 self.0.to_string()
77 }
78
79 pub const fn as_uuid(&self) -> &Uuid {
81 &self.0
82 }
83}
84
85impl Default for PeerId {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl std::fmt::Display for PeerId {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 self.0.fmt(f)
94 }
95}
96
97#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
99pub enum PeerIdError {
100 #[error("invalid peer id {input:?}: {source}")]
101 Invalid {
102 input: String,
103 #[source]
104 source: uuid::Error,
105 },
106}
107
108#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116#[non_exhaustive]
117pub enum PeerTransport {
118 Inproc,
120 Uds,
122 Tcp,
124}
125
126impl PeerTransport {
127 pub const fn as_scheme(&self) -> &'static str {
129 match self {
130 Self::Inproc => "inproc",
131 Self::Uds => "uds",
132 Self::Tcp => "tcp",
133 }
134 }
135}
136
137impl std::fmt::Display for PeerTransport {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.write_str(self.as_scheme())
140 }
141}
142
143#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
149#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
150pub struct PeerAddress {
151 pub transport: PeerTransport,
152 pub endpoint: String,
153}
154
155#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
157pub enum PeerAddressParseError {
158 #[error("peer address missing transport scheme: {input}")]
159 MissingTransportScheme { input: String },
160 #[error("unknown peer address transport {scheme:?} in address {input:?}")]
161 UnknownTransport { input: String, scheme: String },
162}
163
164impl PeerAddress {
165 pub fn new(transport: PeerTransport, endpoint: impl Into<String>) -> Self {
166 Self {
167 transport,
168 endpoint: endpoint.into(),
169 }
170 }
171
172 pub const fn transport(&self) -> PeerTransport {
173 self.transport
174 }
175
176 pub fn endpoint(&self) -> &str {
177 &self.endpoint
178 }
179
180 pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerAddressParseError> {
186 let raw = raw.as_ref();
187 let (scheme, endpoint) =
188 raw.split_once("://")
189 .ok_or_else(|| PeerAddressParseError::MissingTransportScheme {
190 input: raw.to_string(),
191 })?;
192 let transport = match scheme {
193 "inproc" => PeerTransport::Inproc,
194 "uds" => PeerTransport::Uds,
195 "tcp" => PeerTransport::Tcp,
196 other => {
197 return Err(PeerAddressParseError::UnknownTransport {
198 input: raw.to_string(),
199 scheme: other.to_string(),
200 });
201 }
202 };
203 Ok(Self::new(transport, endpoint))
204 }
205}
206
207impl std::fmt::Display for PeerAddress {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 write!(f, "{}://{}", self.transport.as_scheme(), self.endpoint)
210 }
211}
212
213impl std::str::FromStr for PeerAddress {
214 type Err = PeerAddressParseError;
215
216 fn from_str(s: &str) -> Result<Self, Self::Err> {
217 Self::parse(s)
218 }
219}
220
221impl TryFrom<&str> for PeerAddress {
222 type Error = PeerAddressParseError;
223
224 fn try_from(value: &str) -> Result<Self, Self::Error> {
225 Self::parse(value)
226 }
227}
228
229impl TryFrom<String> for PeerAddress {
230 type Error = PeerAddressParseError;
231
232 fn try_from(value: String) -> Result<Self, Self::Error> {
233 Self::parse(value)
234 }
235}
236
237#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
244#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
245pub struct PeerName(String);
246
247impl PeerName {
248 pub fn new(name: impl Into<String>) -> Result<Self, String> {
250 let name = name.into();
251 if name.trim().is_empty() {
252 return Err("peer name cannot be empty".to_string());
253 }
254 if name.chars().any(char::is_control) {
255 return Err("peer name cannot contain control characters".to_string());
256 }
257 Ok(Self(name))
258 }
259
260 pub fn as_str(&self) -> &str {
261 &self.0
262 }
263
264 pub fn as_string(&self) -> String {
265 self.0.clone()
266 }
267}
268
269impl AsRef<str> for PeerName {
270 fn as_ref(&self) -> &str {
271 self.as_str()
272 }
273}
274
275impl std::fmt::Display for PeerName {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 self.0.fmt(f)
278 }
279}
280
281impl From<PeerName> for String {
282 fn from(peer_name: PeerName) -> Self {
283 peer_name.0
284 }
285}
286
287#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct PeerRoute {
294 pub peer_id: PeerId,
295 pub display_name: Option<PeerName>,
296}
297
298impl PeerRoute {
299 pub fn new(peer_id: PeerId) -> Self {
300 Self {
301 peer_id,
302 display_name: None,
303 }
304 }
305
306 pub fn with_display_name(peer_id: PeerId, display_name: PeerName) -> Self {
307 Self {
308 peer_id,
309 display_name: Some(display_name),
310 }
311 }
312
313 pub fn label(&self) -> String {
314 self.display_name
315 .as_ref()
316 .map(PeerName::as_string)
317 .unwrap_or_else(|| self.peer_id.to_string())
318 }
319}
320
321#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
332#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
333pub struct TrustedPeerDescriptor {
334 pub peer_id: PeerId,
336 pub name: PeerName,
339 pub address: PeerAddress,
342 pub pubkey: [u8; 32],
346}
347
348impl TrustedPeerDescriptor {
349 pub fn pubkey_is_zero(pubkey: &[u8; 32]) -> bool {
350 *pubkey == [0u8; 32]
351 }
352
353 pub fn has_zero_pubkey(&self) -> bool {
354 Self::pubkey_is_zero(&self.pubkey)
355 }
356
357 pub fn validate_pubkey_for_peer_id(peer_id: PeerId, pubkey: &[u8; 32]) -> Result<(), String> {
358 if Self::pubkey_is_zero(pubkey) {
359 return Err("TrustedPeerDescriptor.pubkey must be non-zero".to_string());
360 }
361 let derived = PeerId::from_ed25519_pubkey(pubkey);
362 if derived != peer_id {
363 return Err(format!(
364 "peer_id {peer_id} does not match pubkey-derived id {derived}"
365 ));
366 }
367 Ok(())
368 }
369
370 pub fn test_only_unsigned(
383 name: impl Into<String>,
384 peer_id: impl AsRef<str>,
385 address: impl AsRef<str>,
386 ) -> Result<Self, String> {
387 let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
388 let peer_id =
389 PeerId::parse(peer_id.as_ref()).map_err(|e| format!("invalid peer_id: {e}"))?;
390 let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
391 Ok(Self {
392 peer_id,
393 name,
394 address,
395 pubkey: [0u8; 32],
396 })
397 }
398
399 pub fn test_only_unsigned_typed(
419 name: impl Into<String>,
420 peer_id: PeerId,
421 address: impl AsRef<str>,
422 ) -> Result<Self, String> {
423 let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
424 let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
425 Ok(Self {
426 peer_id,
427 name,
428 address,
429 pubkey: [0u8; 32],
430 })
431 }
432
433 pub fn with_pubkey(mut self, pubkey: [u8; 32]) -> Self {
439 self.pubkey = pubkey;
440 self
441 }
442
443 pub fn unsigned_with_pubkey(
460 name: impl Into<String>,
461 peer_id: impl AsRef<str>,
462 pubkey: [u8; 32],
463 address: impl AsRef<str>,
464 ) -> Result<Self, String> {
465 let mut descriptor = Self::test_only_unsigned(name, peer_id, address)?;
466 Self::validate_pubkey_for_peer_id(descriptor.peer_id, &pubkey)?;
467 descriptor.pubkey = pubkey;
468 Ok(descriptor)
469 }
470}
471
472#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
478#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
479pub enum PeerLifecycleKind {
480 #[serde(rename = "mob.peer_added")]
481 PeerAdded,
482 #[serde(rename = "mob.peer_retired")]
483 PeerRetired,
484 #[serde(rename = "mob.peer_unwired")]
485 PeerUnwired,
486}
487
488impl PeerLifecycleKind {
489 pub const fn as_str(self) -> &'static str {
490 match self {
491 Self::PeerAdded => "mob.peer_added",
492 Self::PeerRetired => "mob.peer_retired",
493 Self::PeerUnwired => "mob.peer_unwired",
494 }
495 }
496}
497
498impl std::fmt::Display for PeerLifecycleKind {
499 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500 f.write_str(self.as_str())
501 }
502}
503
504#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
517#[serde(tag = "kind", rename_all = "snake_case", deny_unknown_fields)]
518pub enum CommsCommandRequest {
519 Input {
521 body: String,
522 #[serde(default, skip_serializing_if = "Option::is_none")]
523 blocks: Option<Vec<ContentBlock>>,
524 #[serde(default, skip_serializing_if = "Option::is_none")]
525 source: Option<InputSource>,
526 #[serde(default, skip_serializing_if = "Option::is_none")]
527 stream: Option<InputStreamMode>,
528 #[serde(default, skip_serializing_if = "Option::is_none")]
529 handling_mode: Option<HandlingMode>,
530 #[serde(default, skip_serializing_if = "Option::is_none")]
531 allow_self_session: Option<bool>,
532 },
533 PeerMessage {
535 to: PeerId,
536 body: String,
537 #[serde(default, skip_serializing_if = "Option::is_none")]
538 blocks: Option<Vec<ContentBlock>>,
539 #[serde(default, skip_serializing_if = "Option::is_none")]
540 handling_mode: Option<HandlingMode>,
541 },
542 PeerLifecycle {
544 to: PeerId,
545 lifecycle_kind: PeerLifecycleKind,
546 #[serde(default)]
547 params: serde_json::Value,
548 },
549 PeerRequest {
551 to: PeerId,
552 intent: String,
553 #[serde(default)]
554 params: serde_json::Value,
555 #[serde(default, skip_serializing_if = "Option::is_none")]
556 blocks: Option<Vec<ContentBlock>>,
557 #[serde(default, skip_serializing_if = "Option::is_none")]
558 handling_mode: Option<HandlingMode>,
559 #[serde(default, skip_serializing_if = "Option::is_none")]
560 stream: Option<InputStreamMode>,
561 },
562 PeerResponse {
564 to: PeerId,
565 in_reply_to: InteractionId,
566 status: ResponseStatus,
567 #[serde(default)]
568 result: serde_json::Value,
569 #[serde(default, skip_serializing_if = "Option::is_none")]
570 blocks: Option<Vec<ContentBlock>>,
571 #[serde(default, skip_serializing_if = "Option::is_none")]
572 handling_mode: Option<HandlingMode>,
573 },
574}
575
576#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
581pub enum CommsCommandError {
582 #[error("handling_mode is forbidden on accepted peer responses")]
586 HandlingModeForbiddenForAcceptedResponse,
587}
588
589impl CommsCommandRequest {
590 pub fn into_command(
595 self,
596 session_id: &crate::types::SessionId,
597 ) -> Result<CommsCommand, CommsCommandError> {
598 Ok(match self {
599 CommsCommandRequest::Input {
600 body,
601 blocks,
602 source,
603 stream,
604 handling_mode,
605 allow_self_session,
606 } => CommsCommand::Input {
607 session_id: session_id.clone(),
608 body,
609 blocks,
610 handling_mode: handling_mode.unwrap_or_default(),
611 source: source.unwrap_or(InputSource::Rpc),
612 stream: stream.unwrap_or(InputStreamMode::None),
613 allow_self_session: allow_self_session.unwrap_or(false),
614 },
615 CommsCommandRequest::PeerMessage {
616 to,
617 body,
618 blocks,
619 handling_mode,
620 } => CommsCommand::PeerMessage {
621 to: PeerRoute::new(to),
622 body,
623 blocks,
624 handling_mode: handling_mode.unwrap_or_default(),
625 },
626 CommsCommandRequest::PeerLifecycle {
627 to,
628 lifecycle_kind,
629 params,
630 } => CommsCommand::PeerLifecycle {
631 to: PeerRoute::new(to),
632 kind: lifecycle_kind,
633 params,
634 },
635 CommsCommandRequest::PeerRequest {
636 to,
637 intent,
638 params,
639 blocks,
640 handling_mode,
641 stream,
642 } => CommsCommand::PeerRequest {
643 to: PeerRoute::new(to),
644 intent,
645 params,
646 blocks,
647 handling_mode: handling_mode.unwrap_or_default(),
648 stream: stream.unwrap_or(InputStreamMode::None),
649 },
650 CommsCommandRequest::PeerResponse {
651 to,
652 in_reply_to,
653 status,
654 result,
655 blocks,
656 handling_mode,
657 } => {
658 if status == ResponseStatus::Accepted && handling_mode.is_some() {
659 return Err(CommsCommandError::HandlingModeForbiddenForAcceptedResponse);
660 }
661 CommsCommand::PeerResponse {
662 to: PeerRoute::new(to),
663 in_reply_to,
664 status,
665 result,
666 blocks,
667 handling_mode,
668 }
669 }
670 })
671 }
672
673 pub fn kind(&self) -> &'static str {
675 match self {
676 Self::Input { .. } => "input",
677 Self::PeerMessage { .. } => "peer_message",
678 Self::PeerLifecycle { .. } => "peer_lifecycle",
679 Self::PeerRequest { .. } => "peer_request",
680 Self::PeerResponse { .. } => "peer_response",
681 }
682 }
683}
684#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
686#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
687#[serde(rename_all = "lowercase")]
688pub enum InputSource {
689 Tcp,
690 Uds,
691 Stdin,
692 Webhook,
693 Rpc,
694}
695
696impl From<crate::config::PlainEventSource> for InputSource {
697 fn from(source: crate::config::PlainEventSource) -> Self {
698 match source {
699 crate::config::PlainEventSource::Tcp => Self::Tcp,
700 crate::config::PlainEventSource::Uds => Self::Uds,
701 crate::config::PlainEventSource::Stdin => Self::Stdin,
702 crate::config::PlainEventSource::Webhook => Self::Webhook,
703 crate::config::PlainEventSource::Rpc => Self::Rpc,
704 }
705 }
706}
707
708impl From<InputSource> for crate::config::PlainEventSource {
709 fn from(source: InputSource) -> Self {
710 match source {
711 InputSource::Tcp => Self::Tcp,
712 InputSource::Uds => Self::Uds,
713 InputSource::Stdin => Self::Stdin,
714 InputSource::Webhook => Self::Webhook,
715 InputSource::Rpc => Self::Rpc,
716 }
717 }
718}
719
720#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
722#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
723#[serde(rename_all = "snake_case")]
724pub enum InputStreamMode {
725 None,
727 ReserveInteraction,
729}
730
731#[derive(Debug, Clone, PartialEq, Eq)]
733pub enum CommsCommand {
734 Input {
736 session_id: crate::types::SessionId,
737 body: String,
738 blocks: Option<Vec<ContentBlock>>,
739 handling_mode: HandlingMode,
740 source: InputSource,
741 stream: InputStreamMode,
742 allow_self_session: bool,
743 },
744 PeerMessage {
746 to: PeerRoute,
747 body: String,
748 blocks: Option<Vec<ContentBlock>>,
749 handling_mode: HandlingMode,
750 },
751 PeerLifecycle {
753 to: PeerRoute,
754 kind: PeerLifecycleKind,
755 params: serde_json::Value,
756 },
757 PeerRequest {
759 to: PeerRoute,
760 intent: String,
761 params: serde_json::Value,
762 blocks: Option<Vec<ContentBlock>>,
763 handling_mode: HandlingMode,
764 stream: InputStreamMode,
765 },
766 PeerResponse {
768 to: PeerRoute,
769 in_reply_to: InteractionId,
770 status: ResponseStatus,
771 result: serde_json::Value,
772 blocks: Option<Vec<ContentBlock>>,
773 handling_mode: Option<HandlingMode>,
774 },
775}
776
777impl CommsCommand {
778 pub fn command_kind(&self) -> &'static str {
779 match self {
780 Self::Input { .. } => "input",
781 Self::PeerMessage { .. } => "peer_message",
782 Self::PeerLifecycle { .. } => "peer_lifecycle",
783 Self::PeerRequest { .. } => "peer_request",
784 Self::PeerResponse { .. } => "peer_response",
785 }
786 }
787}
788
789#[derive(Debug, Clone, PartialEq, Eq)]
791pub enum SendReceipt {
792 InputAccepted {
793 interaction_id: InteractionId,
794 stream_reserved: bool,
795 },
796 PeerMessageSent {
797 envelope_id: uuid::Uuid,
798 acked: bool,
799 },
800 PeerLifecycleSent {
801 envelope_id: uuid::Uuid,
802 },
803 PeerRequestSent {
804 envelope_id: uuid::Uuid,
805 interaction_id: InteractionId,
806 stream_reserved: bool,
807 },
808 PeerResponseSent {
809 envelope_id: uuid::Uuid,
810 in_reply_to: InteractionId,
811 },
812}
813
814#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
815#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
816#[serde(rename_all = "snake_case")]
817pub enum PeerDirectorySource {
818 Trusted,
819 Inproc,
820 TrustedAndInproc,
821 Unknown,
822}
823
824impl PeerDirectorySource {
825 pub const fn as_str(&self) -> &'static str {
826 match self {
827 Self::Trusted => "trusted",
828 Self::Inproc => "inproc",
829 Self::TrustedAndInproc => "trusted_and_inproc",
830 Self::Unknown => "unknown",
831 }
832 }
833}
834
835impl std::fmt::Display for PeerDirectorySource {
836 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
837 f.write_str(self.as_str())
838 }
839}
840
841#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
842#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
843#[serde(rename_all = "snake_case")]
844pub enum PeerSendability {
845 PeerMessage,
846 PeerRequest,
847 PeerResponse,
848}
849
850impl PeerSendability {
851 pub const DIRECTORY_DEFAULTS: [Self; 3] =
852 [Self::PeerMessage, Self::PeerRequest, Self::PeerResponse];
853
854 pub fn directory_defaults() -> Vec<Self> {
855 Self::DIRECTORY_DEFAULTS.to_vec()
856 }
857
858 pub const fn as_str(&self) -> &'static str {
859 match self {
860 Self::PeerMessage => "peer_message",
861 Self::PeerRequest => "peer_request",
862 Self::PeerResponse => "peer_response",
863 }
864 }
865}
866
867impl std::fmt::Display for PeerSendability {
868 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
869 f.write_str(self.as_str())
870 }
871}
872
873#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
879#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
880pub struct PeerCapabilitySet {
881 #[serde(default = "PeerCapabilitySet::default_version")]
882 pub version: u16,
883 #[serde(default)]
884 pub extensions: BTreeMap<String, serde_json::Value>,
885}
886
887impl PeerCapabilitySet {
888 pub const CURRENT_VERSION: u16 = 1;
889
890 const fn default_version() -> u16 {
891 Self::CURRENT_VERSION
892 }
893
894 pub fn with_extension(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
895 self.extensions.insert(key.into(), value);
896 self
897 }
898}
899
900impl Default for PeerCapabilitySet {
901 fn default() -> Self {
902 Self {
903 version: Self::CURRENT_VERSION,
904 extensions: BTreeMap::new(),
905 }
906 }
907}
908
909#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
910#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
911#[serde(rename_all = "snake_case")]
912pub enum PeerReachability {
913 Unknown,
914 Reachable,
915 Unreachable,
916}
917
918#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
919#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
920#[serde(rename_all = "snake_case")]
921#[non_exhaustive]
922pub enum PeerReachabilityReason {
923 OfflineOrNoAck,
924 TransportError,
925 AdmissionDropped,
929}
930
931#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
932#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
933pub struct PeerDirectoryEntry {
934 pub peer_id: PeerId,
936 pub name: PeerName,
939 pub address: PeerAddress,
943 pub source: PeerDirectorySource,
944 pub sendable_kinds: Vec<PeerSendability>,
945 pub capabilities: PeerCapabilitySet,
946 pub reachability: PeerReachability,
947 pub last_unreachable_reason: Option<PeerReachabilityReason>,
948 pub meta: crate::PeerMeta,
950}
951
952#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
953#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
954pub struct PeerDirectoryListing {
955 pub peers: Vec<PeerDirectoryEntry>,
956}
957
958impl PeerDirectoryListing {
959 pub fn new(peers: Vec<PeerDirectoryEntry>) -> Self {
960 Self { peers }
961 }
962}
963
964impl From<Vec<PeerDirectoryEntry>> for PeerDirectoryListing {
965 fn from(peers: Vec<PeerDirectoryEntry>) -> Self {
966 Self::new(peers)
967 }
968}
969
970#[derive(Debug, Clone, PartialEq, Eq, Hash)]
972pub enum StreamScope {
973 Session(crate::types::SessionId),
974 Interaction(InteractionId),
975}
976
977pub type EventStream = Pin<Box<dyn Stream<Item = EventEnvelope<AgentEvent>> + Send>>;
979
980#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
982pub enum StreamError {
983 #[error("interaction not reserved: {0}")]
984 NotReserved(InteractionId),
985 #[error("stream not found: {0}")]
986 NotFound(String),
987 #[error("already attached: {0}")]
988 AlreadyAttached(InteractionId),
989 #[error("stream closed")]
990 Closed,
991 #[error("permission denied: {0}")]
992 PermissionDenied(String),
993 #[error("timeout: {0}")]
994 Timeout(String),
995 #[error("internal: {0}")]
996 Internal(String),
997}
998
999#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
1007#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1008#[serde(rename_all = "snake_case")]
1009#[non_exhaustive]
1010pub enum AdmissionDropReason {
1011 UntrustedSender,
1014 ClassificationRejected,
1016 SessionClosed,
1018 InboxFull,
1020}
1021
1022impl AdmissionDropReason {
1023 pub fn as_code(&self) -> &'static str {
1026 match self {
1027 AdmissionDropReason::UntrustedSender => "untrusted_sender",
1028 AdmissionDropReason::ClassificationRejected => "classification_rejected",
1029 AdmissionDropReason::SessionClosed => "session_closed",
1030 AdmissionDropReason::InboxFull => "inbox_full",
1031 }
1032 }
1033}
1034
1035#[derive(Debug, Clone, thiserror::Error)]
1036#[non_exhaustive]
1037pub enum SendError {
1038 #[error("peer not found: {0}")]
1039 PeerNotFound(String),
1040 #[error("peer offline")]
1041 PeerOffline,
1042 #[error("peer not sendable")]
1043 PeerNotSendable(String),
1044 #[error("input stream closed")]
1045 InputClosed,
1046 #[error("unsupported command: {0}")]
1047 Unsupported(String),
1048 #[error("validation failed: {0}")]
1049 Validation(String),
1050 #[error("internal: {0}")]
1051 Internal(String),
1052 #[error("peer dropped at admission: {reason:?}")]
1057 AdmissionDropped { reason: AdmissionDropReason },
1058}
1059
1060#[derive(Debug, Clone, thiserror::Error)]
1061pub enum SendAndStreamError {
1062 #[error("send failed: {0}")]
1063 Send(#[from] SendError),
1064 #[error("stream attach failed: receipt={receipt:?}, error={error}")]
1065 StreamAttach {
1066 receipt: SendReceipt,
1067 error: StreamError,
1068 },
1069}
1070
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds an in-process directory entry named `agent` for the directory
    /// tests; only the parts that differ between tests are parameters.
    fn inproc_agent_entry(
        sendable_kinds: Vec<PeerSendability>,
        capabilities: PeerCapabilitySet,
        reachability: PeerReachability,
    ) -> Result<PeerDirectoryEntry, String> {
        Ok(PeerDirectoryEntry {
            peer_id: PeerId::new(),
            name: PeerName::new("agent")?,
            address: PeerAddress::new(PeerTransport::Inproc, "agent"),
            source: PeerDirectorySource::Inproc,
            sendable_kinds,
            capabilities,
            reachability,
            last_unreachable_reason: None,
            meta: crate::PeerMeta::default(),
        })
    }

    #[test]
    fn peer_name_validation() {
        let accepted = PeerName::new("alice");
        let empty = PeerName::new(String::new());
        let with_nul = PeerName::new("bad\x00name");

        assert!(accepted.is_ok());
        assert!(empty.is_err());
        assert!(with_nul.is_err());
    }

    #[test]
    fn peer_directory_entry_fields() -> Result<(), String> {
        let entry = inproc_agent_entry(
            vec![PeerSendability::PeerMessage],
            PeerCapabilitySet::default(),
            PeerReachability::Unknown,
        )?;

        assert_eq!(entry.name.as_str(), "agent");
        assert_eq!(entry.address.transport(), PeerTransport::Inproc);
        assert_eq!(entry.address.endpoint(), "agent");
        assert_eq!(entry.source, PeerDirectorySource::Inproc);
        Ok(())
    }

    #[test]
    fn peer_directory_listing_serializes_typed_source_sendability_and_capabilities()
    -> Result<(), String> {
        let capabilities = PeerCapabilitySet::default()
            .with_extension("vendor.echo", serde_json::json!({ "enabled": true }));
        let entry = inproc_agent_entry(
            vec![PeerSendability::PeerMessage, PeerSendability::PeerRequest],
            capabilities,
            PeerReachability::Reachable,
        )?;

        let listing = PeerDirectoryListing::new(vec![entry]);
        let value = serde_json::to_value(listing).map_err(|err| err.to_string())?;
        let peer = &value["peers"][0];

        assert_eq!(peer["source"], "inproc");
        assert_eq!(
            peer["sendable_kinds"],
            serde_json::json!(["peer_message", "peer_request"])
        );
        assert_eq!(peer["capabilities"]["version"], 1);
        assert_eq!(
            peer["capabilities"]["extensions"]["vendor.echo"]["enabled"],
            true
        );
        Ok(())
    }

    #[test]
    fn peer_id_parse_round_trip() {
        let original = PeerId::new();
        let text = original.as_str();
        let reparsed = PeerId::parse(&text).expect("parse");
        assert_eq!(original, reparsed);
    }

    #[test]
    fn peer_id_parse_rejects_garbage() {
        let err = PeerId::parse("not-a-uuid").expect_err("parse must reject");
        // `PeerIdError` has a single variant, so this pattern is irrefutable.
        let PeerIdError::Invalid { input, .. } = err;
        assert_eq!(input, "not-a-uuid");
    }

    #[test]
    fn peer_address_display() {
        let rendered = PeerAddress::new(PeerTransport::Tcp, "127.0.0.1:4200").to_string();
        assert_eq!(rendered, "tcp://127.0.0.1:4200");
    }

    #[test]
    fn peer_address_parse_round_trips_supported_schemes() {
        let cases = [
            ("inproc://agent-a", PeerTransport::Inproc, "agent-a"),
            (
                "uds:///tmp/meerkat.sock",
                PeerTransport::Uds,
                "/tmp/meerkat.sock",
            ),
            ("tcp://127.0.0.1:4200", PeerTransport::Tcp, "127.0.0.1:4200"),
        ];

        for (text, expected_transport, expected_endpoint) in cases {
            let address = PeerAddress::parse(text).expect("supported address parses");
            assert_eq!(address.transport(), expected_transport);
            assert_eq!(address.endpoint(), expected_endpoint);
            assert_eq!(address.to_string(), text);
        }
    }

    #[test]
    fn peer_address_parse_rejects_unknown_scheme() {
        let err = PeerAddress::parse("http://127.0.0.1:4200")
            .expect_err("unknown transport schemes must fail closed");
        let message = err.to_string();
        assert!(
            message.contains("unknown peer address transport"),
            "unexpected error: {err}",
        );
    }

    #[test]
    fn peer_address_parse_rejects_schemeless_input() {
        let err = PeerAddress::parse("127.0.0.1:4200")
            .expect_err("strict parser requires an address scheme");
        let message = err.to_string();
        assert!(
            message.contains("missing transport scheme"),
            "unexpected error: {err}",
        );
    }

    #[test]
    fn input_stream_mode_roundtrip() -> Result<(), serde_json::Error> {
        let mode = InputStreamMode::ReserveInteraction;
        let encoded = serde_json::to_value(mode)?;
        assert_eq!(encoded.as_str(), Some("reserve_interaction"));

        let decoded: InputStreamMode = serde_json::from_value(encoded)?;
        assert_eq!(decoded, mode);
        Ok(())
    }

    #[test]
    fn deserialize_input_with_typed_source() -> Result<(), serde_json::Error> {
        let json = r#"{"kind":"input","body":"hello","source":"webhook","handling_mode":"steer"}"#;
        let req: CommsCommandRequest = serde_json::from_str(json)?;
        if let CommsCommandRequest::Input {
            body,
            source,
            handling_mode,
            ..
        } = &req
        {
            assert_eq!(body, "hello");
            assert_eq!(*source, Some(InputSource::Webhook));
            assert_eq!(*handling_mode, Some(HandlingMode::Steer));
        } else {
            panic!("expected input command request, got {req:?}");
        }
        Ok(())
    }

    #[test]
    fn deserialize_input_invalid_source_rejects_at_serde_boundary() {
        let json = r#"{"kind":"input","body":"hello","source":"webhookd"}"#;
        let msg = serde_json::from_str::<CommsCommandRequest>(json)
            .expect_err("invalid source must fail deserialization")
            .to_string();
        assert!(
            msg.contains("webhookd"),
            "error should name the rejected value, got: {msg}"
        );
    }

    #[test]
    fn deserialize_unknown_kind_rejects_at_serde_boundary() {
        let json = r#"{"kind":"foobar","body":"hello"}"#;
        let msg = serde_json::from_str::<CommsCommandRequest>(json)
            .expect_err("unknown kind must fail deserialization")
            .to_string();
        let mentions_variant = msg.contains("foobar") || msg.contains("variant");
        assert!(
            mentions_variant,
            "error should mention unknown variant, got: {msg}"
        );
    }
}