clawdentity_core/connector/
frames.rs1use serde::{Deserialize, Serialize};
2use ulid::Ulid;
3
4use crate::did::parse_agent_did;
5use crate::error::{CoreError, Result};
6
7pub const CONNECTOR_FRAME_VERSION: i64 = 1;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(tag = "type", rename_all = "snake_case")]
11pub enum ConnectorFrame {
12 Heartbeat(HeartbeatFrame),
13 HeartbeatAck(HeartbeatAckFrame),
14 Deliver(DeliverFrame),
15 DeliverAck(DeliverAckFrame),
16 Enqueue(EnqueueFrame),
17 EnqueueAck(EnqueueAckFrame),
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HeartbeatFrame {
22 pub v: i64,
23 pub id: String,
24 pub ts: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct HeartbeatAckFrame {
29 pub v: i64,
30 pub id: String,
31 pub ts: String,
32 #[serde(rename = "ackId")]
33 pub ack_id: String,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct DeliverFrame {
38 pub v: i64,
39 pub id: String,
40 pub ts: String,
41 #[serde(rename = "fromAgentDid")]
42 pub from_agent_did: String,
43 #[serde(rename = "toAgentDid")]
44 pub to_agent_did: String,
45 pub payload: serde_json::Value,
46 #[serde(rename = "contentType", skip_serializing_if = "Option::is_none")]
47 pub content_type: Option<String>,
48 #[serde(rename = "conversationId", skip_serializing_if = "Option::is_none")]
49 pub conversation_id: Option<String>,
50 #[serde(rename = "replyTo", skip_serializing_if = "Option::is_none")]
51 pub reply_to: Option<String>,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
55pub struct DeliverAckFrame {
56 pub v: i64,
57 pub id: String,
58 pub ts: String,
59 #[serde(rename = "ackId")]
60 pub ack_id: String,
61 pub accepted: bool,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub reason: Option<String>,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct EnqueueFrame {
68 pub v: i64,
69 pub id: String,
70 pub ts: String,
71 #[serde(rename = "toAgentDid")]
72 pub to_agent_did: String,
73 pub payload: serde_json::Value,
74 #[serde(rename = "conversationId", skip_serializing_if = "Option::is_none")]
75 pub conversation_id: Option<String>,
76 #[serde(rename = "replyTo", skip_serializing_if = "Option::is_none")]
77 pub reply_to: Option<String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct EnqueueAckFrame {
82 pub v: i64,
83 pub id: String,
84 pub ts: String,
85 #[serde(rename = "ackId")]
86 pub ack_id: String,
87 pub accepted: bool,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub reason: Option<String>,
90}
91
92fn validate_frame_base(version: i64, id: &str, ts: &str) -> Result<()> {
93 if version != CONNECTOR_FRAME_VERSION {
94 return Err(CoreError::InvalidInput(format!(
95 "connector frame version {version} is unsupported"
96 )));
97 }
98 Ulid::from_string(id)
99 .map_err(|_| CoreError::InvalidInput(format!("invalid frame id: {id}")))?;
100 if ts.trim().is_empty() {
101 return Err(CoreError::InvalidInput(
102 "connector frame timestamp is required".to_string(),
103 ));
104 }
105 Ok(())
106}
107
108fn validate_agent_did(value: &str, field_name: &str) -> Result<()> {
109 let _ = parse_agent_did(value)
110 .map_err(|_| CoreError::InvalidInput(format!("{field_name} must be an agent DID")))?;
111 Ok(())
112}
113
114pub fn validate_frame(frame: &ConnectorFrame) -> Result<()> {
116 match frame {
117 ConnectorFrame::Heartbeat(frame) => validate_frame_base(frame.v, &frame.id, &frame.ts),
118 ConnectorFrame::HeartbeatAck(frame) => {
119 validate_frame_base(frame.v, &frame.id, &frame.ts)?;
120 Ulid::from_string(&frame.ack_id).map_err(|_| {
121 CoreError::InvalidInput(format!("invalid heartbeat ackId: {}", frame.ack_id))
122 })?;
123 Ok(())
124 }
125 ConnectorFrame::Deliver(frame) => {
126 validate_frame_base(frame.v, &frame.id, &frame.ts)?;
127 validate_agent_did(&frame.from_agent_did, "fromAgentDid")?;
128 validate_agent_did(&frame.to_agent_did, "toAgentDid")?;
129 Ok(())
130 }
131 ConnectorFrame::DeliverAck(frame) => {
132 validate_frame_base(frame.v, &frame.id, &frame.ts)?;
133 Ulid::from_string(&frame.ack_id).map_err(|_| {
134 CoreError::InvalidInput(format!("invalid deliver ackId: {}", frame.ack_id))
135 })?;
136 Ok(())
137 }
138 ConnectorFrame::Enqueue(frame) => {
139 validate_frame_base(frame.v, &frame.id, &frame.ts)?;
140 validate_agent_did(&frame.to_agent_did, "toAgentDid")?;
141 Ok(())
142 }
143 ConnectorFrame::EnqueueAck(frame) => {
144 validate_frame_base(frame.v, &frame.id, &frame.ts)?;
145 Ulid::from_string(&frame.ack_id).map_err(|_| {
146 CoreError::InvalidInput(format!("invalid enqueue ackId: {}", frame.ack_id))
147 })?;
148 Ok(())
149 }
150 }
151}
152
153pub fn parse_frame(input: impl AsRef<[u8]>) -> Result<ConnectorFrame> {
155 let bytes = input.as_ref();
156 let payload = std::str::from_utf8(bytes)
157 .map_err(|_| CoreError::InvalidInput("connector frame must be valid UTF-8".to_string()))?;
158
159 let frame = serde_json::from_str::<ConnectorFrame>(payload)
160 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
161 validate_frame(&frame)?;
162 Ok(frame)
163}
164
165pub fn serialize_frame(frame: &ConnectorFrame) -> Result<String> {
167 validate_frame(frame)?;
168 serde_json::to_string(frame).map_err(CoreError::from)
169}
170
171pub fn now_iso() -> String {
173 chrono::Utc::now().to_rfc3339()
174}
175
176pub fn new_frame_id() -> String {
178 Ulid::new().to_string()
179}
180
181#[cfg(test)]
182mod tests {
183 use super::{
184 CONNECTOR_FRAME_VERSION, ConnectorFrame, EnqueueFrame, new_frame_id, now_iso, parse_frame,
185 serialize_frame,
186 };
187
188 #[test]
189 fn serialize_and_parse_enqueue_frame() {
190 let frame = ConnectorFrame::Enqueue(EnqueueFrame {
191 v: CONNECTOR_FRAME_VERSION,
192 id: new_frame_id(),
193 ts: now_iso(),
194 to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
195 .to_string(),
196 payload: serde_json::json!({"text":"hello"}),
197 conversation_id: Some("conv-1".to_string()),
198 reply_to: None,
199 });
200
201 let encoded = serialize_frame(&frame).expect("serialize");
202 let decoded = parse_frame(encoded).expect("parse");
203 assert_eq!(decoded, frame);
204 }
205}