Skip to main content

clawdentity_core/connector/
frames.rs

1use serde::{Deserialize, Serialize};
2use ulid::Ulid;
3
4use crate::did::parse_agent_did;
5use crate::error::{CoreError, Result};
6
7pub const CONNECTOR_FRAME_VERSION: i64 = 1;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(tag = "type", rename_all = "snake_case")]
11pub enum ConnectorFrame {
12    Heartbeat(HeartbeatFrame),
13    HeartbeatAck(HeartbeatAckFrame),
14    Deliver(DeliverFrame),
15    DeliverAck(DeliverAckFrame),
16    Enqueue(EnqueueFrame),
17    EnqueueAck(EnqueueAckFrame),
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HeartbeatFrame {
22    pub v: i64,
23    pub id: String,
24    pub ts: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct HeartbeatAckFrame {
29    pub v: i64,
30    pub id: String,
31    pub ts: String,
32    #[serde(rename = "ackId")]
33    pub ack_id: String,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct DeliverFrame {
38    pub v: i64,
39    pub id: String,
40    pub ts: String,
41    #[serde(rename = "fromAgentDid")]
42    pub from_agent_did: String,
43    #[serde(rename = "toAgentDid")]
44    pub to_agent_did: String,
45    pub payload: serde_json::Value,
46    #[serde(rename = "contentType", skip_serializing_if = "Option::is_none")]
47    pub content_type: Option<String>,
48    #[serde(rename = "conversationId", skip_serializing_if = "Option::is_none")]
49    pub conversation_id: Option<String>,
50    #[serde(rename = "replyTo", skip_serializing_if = "Option::is_none")]
51    pub reply_to: Option<String>,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
55pub struct DeliverAckFrame {
56    pub v: i64,
57    pub id: String,
58    pub ts: String,
59    #[serde(rename = "ackId")]
60    pub ack_id: String,
61    pub accepted: bool,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub reason: Option<String>,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct EnqueueFrame {
68    pub v: i64,
69    pub id: String,
70    pub ts: String,
71    #[serde(rename = "toAgentDid")]
72    pub to_agent_did: String,
73    pub payload: serde_json::Value,
74    #[serde(rename = "conversationId", skip_serializing_if = "Option::is_none")]
75    pub conversation_id: Option<String>,
76    #[serde(rename = "replyTo", skip_serializing_if = "Option::is_none")]
77    pub reply_to: Option<String>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct EnqueueAckFrame {
82    pub v: i64,
83    pub id: String,
84    pub ts: String,
85    #[serde(rename = "ackId")]
86    pub ack_id: String,
87    pub accepted: bool,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub reason: Option<String>,
90}
91
92fn validate_frame_base(version: i64, id: &str, ts: &str) -> Result<()> {
93    if version != CONNECTOR_FRAME_VERSION {
94        return Err(CoreError::InvalidInput(format!(
95            "connector frame version {version} is unsupported"
96        )));
97    }
98    Ulid::from_string(id)
99        .map_err(|_| CoreError::InvalidInput(format!("invalid frame id: {id}")))?;
100    if ts.trim().is_empty() {
101        return Err(CoreError::InvalidInput(
102            "connector frame timestamp is required".to_string(),
103        ));
104    }
105    Ok(())
106}
107
108fn validate_agent_did(value: &str, field_name: &str) -> Result<()> {
109    let _ = parse_agent_did(value)
110        .map_err(|_| CoreError::InvalidInput(format!("{field_name} must be an agent DID")))?;
111    Ok(())
112}
113
114/// TODO(clawdentity): document `validate_frame`.
115pub fn validate_frame(frame: &ConnectorFrame) -> Result<()> {
116    match frame {
117        ConnectorFrame::Heartbeat(frame) => validate_frame_base(frame.v, &frame.id, &frame.ts),
118        ConnectorFrame::HeartbeatAck(frame) => {
119            validate_frame_base(frame.v, &frame.id, &frame.ts)?;
120            Ulid::from_string(&frame.ack_id).map_err(|_| {
121                CoreError::InvalidInput(format!("invalid heartbeat ackId: {}", frame.ack_id))
122            })?;
123            Ok(())
124        }
125        ConnectorFrame::Deliver(frame) => {
126            validate_frame_base(frame.v, &frame.id, &frame.ts)?;
127            validate_agent_did(&frame.from_agent_did, "fromAgentDid")?;
128            validate_agent_did(&frame.to_agent_did, "toAgentDid")?;
129            Ok(())
130        }
131        ConnectorFrame::DeliverAck(frame) => {
132            validate_frame_base(frame.v, &frame.id, &frame.ts)?;
133            Ulid::from_string(&frame.ack_id).map_err(|_| {
134                CoreError::InvalidInput(format!("invalid deliver ackId: {}", frame.ack_id))
135            })?;
136            Ok(())
137        }
138        ConnectorFrame::Enqueue(frame) => {
139            validate_frame_base(frame.v, &frame.id, &frame.ts)?;
140            validate_agent_did(&frame.to_agent_did, "toAgentDid")?;
141            Ok(())
142        }
143        ConnectorFrame::EnqueueAck(frame) => {
144            validate_frame_base(frame.v, &frame.id, &frame.ts)?;
145            Ulid::from_string(&frame.ack_id).map_err(|_| {
146                CoreError::InvalidInput(format!("invalid enqueue ackId: {}", frame.ack_id))
147            })?;
148            Ok(())
149        }
150    }
151}
152
153/// TODO(clawdentity): document `parse_frame`.
154pub fn parse_frame(input: impl AsRef<[u8]>) -> Result<ConnectorFrame> {
155    let bytes = input.as_ref();
156    let payload = std::str::from_utf8(bytes)
157        .map_err(|_| CoreError::InvalidInput("connector frame must be valid UTF-8".to_string()))?;
158
159    let frame = serde_json::from_str::<ConnectorFrame>(payload)
160        .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
161    validate_frame(&frame)?;
162    Ok(frame)
163}
164
165/// TODO(clawdentity): document `serialize_frame`.
166pub fn serialize_frame(frame: &ConnectorFrame) -> Result<String> {
167    validate_frame(frame)?;
168    serde_json::to_string(frame).map_err(CoreError::from)
169}
170
171/// TODO(clawdentity): document `now_iso`.
172pub fn now_iso() -> String {
173    chrono::Utc::now().to_rfc3339()
174}
175
176/// TODO(clawdentity): document `new_frame_id`.
177pub fn new_frame_id() -> String {
178    Ulid::new().to_string()
179}
180
181#[cfg(test)]
182mod tests {
183    use super::{
184        CONNECTOR_FRAME_VERSION, ConnectorFrame, EnqueueFrame, new_frame_id, now_iso, parse_frame,
185        serialize_frame,
186    };
187
188    #[test]
189    fn serialize_and_parse_enqueue_frame() {
190        let frame = ConnectorFrame::Enqueue(EnqueueFrame {
191            v: CONNECTOR_FRAME_VERSION,
192            id: new_frame_id(),
193            ts: now_iso(),
194            to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
195                .to_string(),
196            payload: serde_json::json!({"text":"hello"}),
197            conversation_id: Some("conv-1".to_string()),
198            reply_to: None,
199        });
200
201        let encoded = serialize_frame(&frame).expect("serialize");
202        let decoded = parse_frame(encoded).expect("parse");
203        assert_eq!(decoded, frame);
204    }
205}