1use crate::event::{AgentEvent, EventEnvelope};
8use crate::interaction::{InteractionId, ResponseStatus};
9use crate::types::{ContentBlock, HandlingMode};
10use futures::Stream;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::pin::Pin;
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct PeerName(String);
17
18impl PeerName {
19 pub fn new(name: impl Into<String>) -> Result<Self, String> {
21 let name = name.into();
22 if name.trim().is_empty() {
23 return Err("peer name cannot be empty".to_string());
24 }
25 if name.chars().any(char::is_control) {
26 return Err("peer name cannot contain control characters".to_string());
27 }
28 Ok(Self(name))
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34
35 pub fn as_string(&self) -> String {
36 self.0.clone()
37 }
38}
39
40impl AsRef<str> for PeerName {
41 fn as_ref(&self) -> &str {
42 self.as_str()
43 }
44}
45
46impl std::fmt::Display for PeerName {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 self.0.fmt(f)
49 }
50}
51
52impl From<PeerName> for String {
53 fn from(peer_name: PeerName) -> Self {
54 peer_name.0
55 }
56}
57
58#[derive(Debug, Clone, Default)]
60pub struct CommsCommandRequest {
61 pub kind: String,
62 pub to: Option<String>,
63 pub body: Option<String>,
64 pub blocks: Option<Vec<ContentBlock>>,
65 pub intent: Option<String>,
66 pub params: Option<serde_json::Value>,
67 pub in_reply_to: Option<String>,
68 pub status: Option<String>,
69 pub result: Option<serde_json::Value>,
70 pub source: Option<String>,
71 pub stream: Option<String>,
72 pub allow_self_session: Option<bool>,
73 pub handling_mode: Option<String>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq)]
78pub struct CommsCommandValidationError {
79 pub field: String,
80 pub issue: String,
81 pub got: Option<String>,
82}
83
84impl CommsCommandValidationError {
85 fn new(field: impl Into<String>, issue: impl Into<String>, got: Option<String>) -> Self {
86 Self {
87 field: field.into(),
88 issue: issue.into(),
89 got,
90 }
91 }
92
93 pub fn to_json_value(&self) -> serde_json::Value {
94 match &self.got {
95 Some(got) => json!({
96 "field": self.field,
97 "issue": self.issue,
98 "got": got,
99 }),
100 None => json!({
101 "field": self.field,
102 "issue": self.issue,
103 }),
104 }
105 }
106}
107
108impl CommsCommandRequest {
109 pub fn parse(
110 &self,
111 session_id: &crate::types::SessionId,
112 ) -> Result<CommsCommand, Vec<CommsCommandValidationError>> {
113 let mut errors = Vec::new();
114
115 let kind = self.kind.as_str();
116 match kind {
117 "input" => {
118 let Some(body) = self.body.clone() else {
119 errors.push(CommsCommandValidationError::new(
120 "body",
121 "required_field",
122 None,
123 ));
124 return Err(errors);
125 };
126
127 let source = match self.source.as_deref() {
128 Some("tcp") => InputSource::Tcp,
129 Some("uds") => InputSource::Uds,
130 Some("stdin") => InputSource::Stdin,
131 Some("webhook") => InputSource::Webhook,
132 Some("rpc") | None => InputSource::Rpc,
133 Some(other) => {
134 errors.push(CommsCommandValidationError::new(
135 "source",
136 "invalid_value",
137 Some(other.to_string()),
138 ));
139 return Err(errors);
140 }
141 };
142
143 Ok(CommsCommand::Input {
144 session_id: session_id.clone(),
145 body,
146 blocks: self.blocks.clone(),
147 handling_mode: match self.handling_mode.as_deref() {
148 Some("steer") => HandlingMode::Steer,
149 Some("queue") | None => HandlingMode::Queue,
150 Some(other) => {
151 errors.push(CommsCommandValidationError::new(
152 "handling_mode",
153 "invalid_value",
154 Some(other.to_string()),
155 ));
156 return Err(errors);
157 }
158 },
159 source,
160 allow_self_session: self.allow_self_session.unwrap_or(false),
161 })
162 }
163 "peer_message" => {
164 let to = to_peer_name(self.to.as_ref(), &mut errors);
165 let handling_mode = match self.handling_mode.as_deref() {
166 Some("steer") => Some(HandlingMode::Steer),
167 Some("queue") => Some(HandlingMode::Queue),
168 None => {
169 errors.push(CommsCommandValidationError::new(
170 "handling_mode",
171 "required_field",
172 None,
173 ));
174 None
175 }
176 Some(other) => {
177 errors.push(CommsCommandValidationError::new(
178 "handling_mode",
179 "invalid_value",
180 Some(other.to_string()),
181 ));
182 None
183 }
184 };
185 if let Some(to) = to {
186 let Some(handling_mode) = handling_mode else {
187 return Err(errors);
188 };
189 Ok(CommsCommand::PeerMessage {
190 to,
191 body: self.body.clone().unwrap_or_default(),
192 blocks: self.blocks.clone(),
193 handling_mode,
194 })
195 } else {
196 Err(errors)
197 }
198 }
199 "peer_request" => {
200 let to = to_peer_name(self.to.as_ref(), &mut errors);
201 let Some(intent) = self.intent.clone() else {
202 errors.push(CommsCommandValidationError::new(
203 "intent",
204 "required_field",
205 None,
206 ));
207 return Err(errors);
208 };
209 if let Some(stream) = &self.stream {
210 errors.push(CommsCommandValidationError::new(
211 "stream",
212 "removed_unsupported_field",
213 Some(stream.clone()),
214 ));
215 return Err(errors);
216 }
217 let handling_mode = match self.handling_mode.as_deref() {
218 Some("steer") => Some(HandlingMode::Steer),
219 Some("queue") => Some(HandlingMode::Queue),
220 None => {
221 errors.push(CommsCommandValidationError::new(
222 "handling_mode",
223 "required_field",
224 None,
225 ));
226 None
227 }
228 Some(other) => {
229 errors.push(CommsCommandValidationError::new(
230 "handling_mode",
231 "invalid_value",
232 Some(other.to_string()),
233 ));
234 None
235 }
236 };
237 if errors.is_empty() {
238 let Some(to) = to else {
239 return Err(errors);
240 };
241 let Some(handling_mode) = handling_mode else {
242 return Err(errors);
243 };
244 Ok(CommsCommand::PeerRequest {
245 to,
246 intent,
247 params: match (&self.params, &self.body) {
248 (Some(p), _) => p.clone(),
249 (None, Some(body)) => serde_json::json!({ "body": body }),
250 (None, None) => serde_json::Value::Object(Default::default()),
251 },
252 handling_mode,
253 })
254 } else {
255 Err(errors)
256 }
257 }
258 "peer_response" => {
259 let to = to_peer_name(self.to.as_ref(), &mut errors);
260 let in_reply_to = match &self.in_reply_to {
261 Some(in_reply_to) => match uuid::Uuid::parse_str(in_reply_to) {
262 Ok(id) => crate::interaction::InteractionId(id),
263 Err(_) => {
264 errors.push(CommsCommandValidationError::new(
265 "in_reply_to",
266 "invalid_uuid",
267 Some(in_reply_to.clone()),
268 ));
269 return Err(errors);
270 }
271 },
272 None => {
273 errors.push(CommsCommandValidationError::new(
274 "in_reply_to",
275 "required_field",
276 None,
277 ));
278 return Err(errors);
279 }
280 };
281 let status = match self.status.as_deref() {
282 Some("accepted") => crate::ResponseStatus::Accepted,
283 Some("completed") | None => crate::ResponseStatus::Completed,
284 Some("failed") => crate::ResponseStatus::Failed,
285 Some(other) => {
286 errors.push(CommsCommandValidationError::new(
287 "status",
288 "invalid_value",
289 Some(other.to_string()),
290 ));
291 return Err(errors);
292 }
293 };
294 let handling_mode = match self.handling_mode.as_deref() {
295 Some("steer") => Some(HandlingMode::Steer),
296 Some("queue") => Some(HandlingMode::Queue),
297 None => None,
298 Some(other) => {
299 errors.push(CommsCommandValidationError::new(
300 "handling_mode",
301 "invalid_value",
302 Some(other.to_string()),
303 ));
304 return Err(errors);
305 }
306 };
307 if status == crate::ResponseStatus::Accepted && handling_mode.is_some() {
312 errors.push(CommsCommandValidationError::new(
313 "handling_mode",
314 "forbidden_for_accepted_response",
315 self.handling_mode.clone(),
316 ));
317 return Err(errors);
318 }
319 if errors.is_empty() {
320 let Some(to) = to else {
321 return Err(errors);
322 };
323 Ok(CommsCommand::PeerResponse {
324 to,
325 in_reply_to,
326 status,
327 result: self.result.clone().unwrap_or(serde_json::Value::Null),
328 handling_mode,
329 })
330 } else {
331 Err(errors)
332 }
333 }
334 other => {
335 errors.push(CommsCommandValidationError::new(
336 "kind",
337 "unknown_kind",
338 Some(other.to_string()),
339 ));
340 Err(errors)
341 }
342 }
343 }
344
345 pub fn validation_errors_to_json(
346 errors: &[CommsCommandValidationError],
347 ) -> Vec<serde_json::Value> {
348 errors
349 .iter()
350 .map(CommsCommandValidationError::to_json_value)
351 .collect()
352 }
353}
354
355fn to_peer_name(
356 value: Option<&String>,
357 errors: &mut Vec<CommsCommandValidationError>,
358) -> Option<PeerName> {
359 match value {
360 Some(name) => match PeerName::new(name) {
361 Ok(peer) => Some(peer),
362 Err(_) => {
363 errors.push(CommsCommandValidationError::new(
364 "to",
365 "invalid_value",
366 Some(name.clone()),
367 ));
368 None
369 }
370 },
371 None => {
372 errors.push(CommsCommandValidationError::new(
373 "to",
374 "required_field",
375 None,
376 ));
377 None
378 }
379 }
380}
381#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
383#[serde(rename_all = "lowercase")]
384pub enum InputSource {
385 Tcp,
386 Uds,
387 Stdin,
388 Webhook,
389 Rpc,
390}
391
392impl From<crate::config::PlainEventSource> for InputSource {
393 fn from(source: crate::config::PlainEventSource) -> Self {
394 match source {
395 crate::config::PlainEventSource::Tcp => Self::Tcp,
396 crate::config::PlainEventSource::Uds => Self::Uds,
397 crate::config::PlainEventSource::Stdin => Self::Stdin,
398 crate::config::PlainEventSource::Webhook => Self::Webhook,
399 crate::config::PlainEventSource::Rpc => Self::Rpc,
400 }
401 }
402}
403
404impl From<InputSource> for crate::config::PlainEventSource {
405 fn from(source: InputSource) -> Self {
406 match source {
407 InputSource::Tcp => Self::Tcp,
408 InputSource::Uds => Self::Uds,
409 InputSource::Stdin => Self::Stdin,
410 InputSource::Webhook => Self::Webhook,
411 InputSource::Rpc => Self::Rpc,
412 }
413 }
414}
415
416#[derive(Debug, Clone, PartialEq, Eq)]
418pub enum CommsCommand {
419 Input {
421 session_id: crate::types::SessionId,
422 body: String,
423 blocks: Option<Vec<ContentBlock>>,
424 handling_mode: HandlingMode,
425 source: InputSource,
426 allow_self_session: bool,
427 },
428 PeerMessage {
430 to: PeerName,
431 body: String,
432 blocks: Option<Vec<ContentBlock>>,
433 handling_mode: HandlingMode,
434 },
435 PeerRequest {
437 to: PeerName,
438 intent: String,
439 params: serde_json::Value,
440 handling_mode: HandlingMode,
441 },
442 PeerResponse {
444 to: PeerName,
445 in_reply_to: InteractionId,
446 status: ResponseStatus,
447 result: serde_json::Value,
448 handling_mode: Option<HandlingMode>,
449 },
450}
451
452impl CommsCommand {
453 pub fn command_kind(&self) -> &'static str {
454 match self {
455 Self::Input { .. } => "input",
456 Self::PeerMessage { .. } => "peer_message",
457 Self::PeerRequest { .. } => "peer_request",
458 Self::PeerResponse { .. } => "peer_response",
459 }
460 }
461}
462
463#[derive(Debug, Clone, PartialEq, Eq)]
465pub enum SendReceipt {
466 InputAccepted {
467 interaction_id: InteractionId,
468 },
469 PeerMessageSent {
470 envelope_id: uuid::Uuid,
471 acked: bool,
472 },
473 PeerRequestSent {
474 request_id: InteractionId,
475 envelope_id: uuid::Uuid,
476 },
477 PeerResponseSent {
478 envelope_id: uuid::Uuid,
479 in_reply_to: InteractionId,
480 },
481}
482
/// Where a peer directory entry came from.
///
/// NOTE(review): variant meanings are inferred from their names; the code
/// that assigns them is not in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerDirectorySource {
    /// Entry comes from the trusted peer set.
    Trusted,
    /// Entry comes from the in-process registry.
    Inproc,
    /// Entry is present in both the trusted set and the in-process registry.
    TrustedAndInproc,
    /// Origin could not be determined.
    Unknown,
}
490
491#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
492#[serde(rename_all = "snake_case")]
493pub enum PeerReachability {
494 Unknown,
495 Reachable,
496 Unreachable,
497}
498
499#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
500#[serde(rename_all = "snake_case")]
501pub enum PeerReachabilityReason {
502 OfflineOrNoAck,
503 TransportError,
504}
505
506#[derive(Debug, Clone, PartialEq, Eq)]
507pub struct PeerDirectoryEntry {
508 pub name: PeerName,
509 pub peer_id: String,
510 pub address: String,
511 pub source: PeerDirectorySource,
512 pub sendable_kinds: Vec<String>,
513 pub capabilities: serde_json::Value,
514 pub reachability: PeerReachability,
515 pub last_unreachable_reason: Option<PeerReachabilityReason>,
516 pub meta: crate::PeerMeta,
518}
519
520#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
522pub struct TrustedPeerSpec {
523 pub name: String,
524 pub peer_id: String,
525 pub address: String,
526}
527
528impl TrustedPeerSpec {
529 pub fn new(
530 name: impl Into<String>,
531 peer_id: impl Into<String>,
532 address: impl Into<String>,
533 ) -> Result<Self, String> {
534 let name = PeerName::new(name.into())?;
535 Ok(Self {
536 name: name.0,
537 peer_id: peer_id.into(),
538 address: address.into(),
539 })
540 }
541}
542
543#[derive(Debug, Clone, PartialEq, Eq, Hash)]
545pub enum StreamScope {
546 Session(crate::types::SessionId),
547}
548
549pub type EventStream = Pin<Box<dyn Stream<Item = EventEnvelope<AgentEvent>> + Send>>;
551
552#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
554pub enum StreamError {
555 #[error("stream not found: {0}")]
556 NotFound(String),
557 #[error("stream closed")]
558 Closed,
559 #[error("permission denied: {0}")]
560 PermissionDenied(String),
561 #[error("timeout: {0}")]
562 Timeout(String),
563 #[error("internal: {0}")]
564 Internal(String),
565}
566
567#[derive(Debug, Clone, thiserror::Error)]
568pub enum SendError {
569 #[error("peer not found: {0}")]
570 PeerNotFound(String),
571 #[error("peer offline")]
572 PeerOffline,
573 #[error("peer not sendable")]
574 PeerNotSendable(String),
575 #[error("input stream closed")]
576 InputClosed,
577 #[error("unsupported command: {0}")]
578 Unsupported(String),
579 #[error("validation failed: {0}")]
580 Validation(String),
581 #[error("internal: {0}")]
582 Internal(String),
583}
584
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use serde_json::Value;

    type ParseOutcome = Result<CommsCommand, Vec<CommsCommandValidationError>>;

    /// Parses `req` against a freshly created session id.
    fn parse(req: &CommsCommandRequest) -> ParseOutcome {
        req.parse(&crate::types::SessionId::new())
    }

    /// True when `errors` contains an entry with exactly this field and issue.
    fn has_error(errors: &[CommsCommandValidationError], field: &str, issue: &str) -> bool {
        errors
            .iter()
            .any(|e| e.field == field && e.issue == issue)
    }

    /// Builds and parses a `peer_request` to "bob" with intent "greet".
    fn peer_request_cmd(
        params: Option<Value>,
        body: Option<String>,
        handling_mode: Option<&str>,
    ) -> Result<CommsCommand, Vec<CommsCommandValidationError>> {
        let req = CommsCommandRequest {
            kind: "peer_request".to_string(),
            to: Some("bob".to_string()),
            body,
            intent: Some("greet".to_string()),
            params,
            handling_mode: handling_mode.map(str::to_string),
            ..Default::default()
        };
        parse(&req)
    }

    /// Builds a `peer_message` to "peer-1" with body "hello".
    fn peer_message_req(handling_mode: Option<&str>) -> CommsCommandRequest {
        CommsCommandRequest {
            kind: "peer_message".to_string(),
            to: Some("peer-1".to_string()),
            body: Some("hello".to_string()),
            handling_mode: handling_mode.map(str::to_string),
            ..Default::default()
        }
    }

    /// Builds a `peer_response` to "peer-1" with the given status and
    /// `handling_mode` set to "steer".
    fn steering_peer_response_req(status: &str) -> CommsCommandRequest {
        CommsCommandRequest {
            kind: "peer_response".to_string(),
            to: Some("peer-1".to_string()),
            in_reply_to: Some(uuid::Uuid::now_v7().to_string()),
            status: Some(status.to_string()),
            handling_mode: Some("steer".to_string()),
            ..Default::default()
        }
    }

    #[test]
    fn peer_name_validation() {
        let accepted = PeerName::new("alice");
        assert!(accepted.is_ok());

        for rejected in [String::new(), "bad\x00name".to_string()] {
            assert!(PeerName::new(rejected).is_err());
        }
    }

    #[test]
    fn peer_directory_entry_fields() -> Result<(), String> {
        let name = PeerName::new("agent")?;
        let entry = PeerDirectoryEntry {
            name,
            peer_id: String::from("ed25519:abc"),
            address: String::from("inproc://agent"),
            source: PeerDirectorySource::Inproc,
            sendable_kinds: vec![String::from("peer_message")],
            capabilities: serde_json::json!({}),
            reachability: PeerReachability::Unknown,
            last_unreachable_reason: None,
            meta: crate::PeerMeta::default(),
        };

        assert_eq!(entry.name.as_str(), "agent");
        assert_eq!(entry.source, PeerDirectorySource::Inproc);
        Ok(())
    }

    #[test]
    fn trusted_peer_spec_requires_valid_name() {
        assert!(TrustedPeerSpec::new("", "ed25519:abc", "inproc://a").is_err());
    }

    #[test]
    fn trusted_peer_spec_keeps_peer_id_and_address() -> Result<(), String> {
        let TrustedPeerSpec {
            name,
            peer_id,
            address,
        } = TrustedPeerSpec::new("alice", "ed25519:abc", "inproc://alice")?;

        assert_eq!(name, "alice");
        assert_eq!(peer_id, "ed25519:abc");
        assert_eq!(address, "inproc://alice");
        Ok(())
    }

    #[test]
    fn peer_request_with_params_only() {
        let explicit = serde_json::json!({"key": "value"});
        let cmd = peer_request_cmd(Some(explicit), None, Some("queue")).unwrap();

        let CommsCommand::PeerRequest {
            params,
            handling_mode,
            ..
        } = &cmd
        else {
            panic!("expected PeerRequest, got {other:?}", other = cmd);
        };
        assert_eq!(params["key"], "value");
        assert_eq!(*handling_mode, HandlingMode::Queue);
    }

    #[test]
    fn peer_request_with_body_only_promotes_to_params() {
        let cmd = peer_request_cmd(None, Some("hello world".to_string()), Some("queue")).unwrap();

        let CommsCommand::PeerRequest { params, .. } = &cmd else {
            panic!("expected PeerRequest, got {other:?}", other = cmd);
        };
        assert_eq!(
            params["body"], "hello world",
            "body should be promoted into params.body when params is absent"
        );
    }

    #[test]
    fn peer_request_with_both_prefers_params() {
        let explicit = serde_json::json!({"explicit": true});
        let cmd = peer_request_cmd(
            Some(explicit),
            Some("ignored body".to_string()),
            Some("queue"),
        )
        .unwrap();

        let CommsCommand::PeerRequest { params, .. } = &cmd else {
            panic!("expected PeerRequest, got {other:?}", other = cmd);
        };
        assert_eq!(params["explicit"], true);
        assert!(
            params.get("body").is_none(),
            "body should not be promoted when params is present"
        );
    }

    #[test]
    fn peer_request_with_neither_gives_empty_object() {
        let cmd = peer_request_cmd(None, None, Some("queue")).unwrap();

        let CommsCommand::PeerRequest { params, .. } = &cmd else {
            panic!("expected PeerRequest, got {other:?}", other = cmd);
        };
        assert!(params.is_object(), "params should be an object");
        assert!(
            params.as_object().unwrap().is_empty(),
            "params should be empty when both body and params are absent"
        );
    }

    #[test]
    fn peer_request_requires_handling_mode() {
        let result = peer_request_cmd(None, None, None);
        assert!(
            result.is_err(),
            "peer_request without handling_mode must be rejected"
        );

        let errors = result.unwrap_err();
        assert!(
            has_error(&errors, "handling_mode", "required_field"),
            "expected required handling_mode error, got: {errors:?}"
        );
    }

    #[test]
    fn peer_message_requires_handling_mode() {
        let result = parse(&peer_message_req(None));
        assert!(
            result.is_err(),
            "peer_message without handling_mode must be rejected"
        );

        let errors = result.unwrap_err();
        assert!(
            has_error(&errors, "handling_mode", "required_field"),
            "expected required handling_mode error, got: {errors:?}"
        );
    }

    #[test]
    fn peer_message_with_handling_mode_is_accepted() {
        let cmd = parse(&peer_message_req(Some("steer")))
            .expect("peer_message with handling_mode should parse");

        let CommsCommand::PeerMessage { handling_mode, .. } = &cmd else {
            panic!("expected PeerMessage, got {other:?}", other = cmd);
        };
        assert_eq!(*handling_mode, HandlingMode::Steer);
    }

    #[test]
    fn peer_response_accepted_with_handling_mode_rejected() {
        let result = parse(&steering_peer_response_req("accepted"));
        assert!(
            result.is_err(),
            "accepted response with handling_mode must be rejected"
        );

        let errors = result.unwrap_err();
        let forbidden = errors
            .iter()
            .any(|e| e.issue == "forbidden_for_accepted_response");
        assert!(
            forbidden,
            "should have forbidden_for_accepted_response error, got: {errors:?}"
        );
    }

    #[test]
    fn peer_response_completed_with_handling_mode_accepted() {
        let result = parse(&steering_peer_response_req("completed"));
        assert!(
            result.is_ok(),
            "completed response with handling_mode=steer should be accepted"
        );
    }
}