clawdentity_core/runtime/
relay.rs1use crate::connector_client::ConnectorClientSender;
2use crate::connector_frames::{CONNECTOR_FRAME_VERSION, ConnectorFrame, EnqueueFrame, now_iso};
3use crate::db::SqliteStore;
4use crate::db_outbound::{
5 EnqueueOutboundInput, OutboundQueueItem, enqueue_outbound, move_outbound_to_dead_letter,
6 take_oldest_outbound,
7};
8use crate::error::Result;
9use crate::runtime_trusted_receipts::TrustedReceiptsStore;
10
/// Counters describing the outcome of one `flush_outbound_queue_to_relay` call.
///
/// `Default` yields a result with both counters at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FlushOutboundResult {
    /// Number of queued items whose frame was sent to the relay without error.
    pub sent_count: usize,
    /// Number of queued items that were dead-lettered as malformed or whose
    /// send to the relay failed.
    pub failed_count: usize,
}
16
17#[allow(clippy::too_many_lines)]
19pub async fn flush_outbound_queue_to_relay(
20 store: &SqliteStore,
21 relay: &ConnectorClientSender,
22 max_items: usize,
23 trusted_receipts: Option<&TrustedReceiptsStore>,
24) -> Result<FlushOutboundResult> {
25 let mut sent_count = 0_usize;
26 let mut failed_count = 0_usize;
27
28 for _ in 0..max_items {
29 if !relay.is_connected() {
30 break;
31 }
32
33 let Some(item) = take_oldest_outbound(store)? else {
34 break;
35 };
36
37 let payload = match serde_json::from_str::<serde_json::Value>(&item.payload_json) {
38 Ok(payload) => payload,
39 Err(error) => {
40 tracing::warn!(
41 frame_id = %item.frame_id,
42 to_agent_did = %item.to_agent_did,
43 error = %error,
44 "malformed outbound payload moved to dead letter"
45 );
46 if let Err(dead_letter_error) =
47 dead_letter_malformed_outbound_payload(store, &item, &error)
48 {
49 tracing::warn!(
50 frame_id = %item.frame_id,
51 to_agent_did = %item.to_agent_did,
52 error = %dead_letter_error,
53 "failed to move malformed outbound payload to dead letter"
54 );
55 }
56 failed_count += 1;
57 continue;
58 }
59 };
60
61 let frame = ConnectorFrame::Enqueue(EnqueueFrame {
62 v: CONNECTOR_FRAME_VERSION,
63 id: item.frame_id.clone(),
64 ts: now_iso(),
65 to_agent_did: item.to_agent_did.clone(),
66 payload,
67 conversation_id: item.conversation_id.clone(),
68 reply_to: item.reply_to.clone(),
69 });
70
71 if relay.send_frame(frame).await.is_err() {
72 enqueue_outbound(
74 store,
75 EnqueueOutboundInput {
76 frame_id: item.frame_id,
77 frame_version: item.frame_version,
78 frame_type: item.frame_type,
79 to_agent_did: item.to_agent_did,
80 payload_json: item.payload_json,
81 conversation_id: item.conversation_id,
82 reply_to: item.reply_to,
83 },
84 )?;
85 failed_count += 1;
86 break;
87 }
88
89 if let Some(receipts) = trusted_receipts {
90 receipts.mark_trusted(item.frame_id);
91 }
92 sent_count += 1;
93 }
94
95 Ok(FlushOutboundResult {
96 sent_count,
97 failed_count,
98 })
99}
100
101fn dead_letter_malformed_outbound_payload(
102 store: &SqliteStore,
103 item: &OutboundQueueItem,
104 parse_error: &serde_json::Error,
105) -> Result<()> {
106 move_outbound_to_dead_letter(
107 store,
108 item,
109 &format!("malformed outbound payload: {parse_error}"),
110 )
111}
112
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tempfile::TempDir;

    use crate::connector_client::{ConnectorClientOptions, spawn_connector_client};
    use crate::db::SqliteStore;
    use crate::db_outbound::{
        EnqueueOutboundInput, enqueue_outbound, list_outbound_dead_letter, outbound_count,
        take_oldest_outbound,
    };
    use crate::runtime_trusted_receipts::TrustedReceiptsStore;

    use super::{dead_letter_malformed_outbound_payload, flush_outbound_queue_to_relay};

    /// Recipient DID used by every fixture item in this module.
    const RECIPIENT_DID: &str =
        "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4";

    /// Opens a store at `db.sqlite3` inside a fresh temp dir.
    ///
    /// The `TempDir` guard is returned alongside the store so the directory
    /// stays alive for as long as the caller holds it.
    fn open_temp_store() -> (TempDir, SqliteStore) {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        (temp, store)
    }

    /// Builds the `frame-1` queue input with the given raw payload text.
    fn frame_one_input(payload_json: &str) -> EnqueueOutboundInput {
        EnqueueOutboundInput {
            frame_id: "frame-1".to_string(),
            frame_version: 1,
            frame_type: "enqueue".to_string(),
            to_agent_did: RECIPIENT_DID.to_string(),
            payload_json: payload_json.to_string(),
            conversation_id: None,
            reply_to: None,
        }
    }

    /// With no relay connection, a flush sends nothing, fails nothing, and
    /// leaves the queued item in the store.
    #[tokio::test]
    async fn flush_keeps_message_when_relay_is_disconnected() {
        let (_temp, store) = open_temp_store();
        enqueue_outbound(&store, frame_one_input("{\"x\":1}")).expect("enqueue");

        let options =
            ConnectorClientOptions::with_defaults("ws://127.0.0.1:9/v1/relay/connect", vec![]);
        let client = spawn_connector_client(options);
        // Give the spawned client a moment to run before flushing.
        tokio::time::sleep(Duration::from_millis(25)).await;

        let receipts = TrustedReceiptsStore::new();
        let outcome = flush_outbound_queue_to_relay(&store, &client.sender(), 10, Some(&receipts))
            .await
            .expect("flush");

        assert_eq!(outcome.sent_count, 0);
        assert_eq!(outcome.failed_count, 0);
        assert_eq!(outbound_count(&store).expect("count"), 1);
        client.sender().shutdown();
    }

    /// A payload that fails JSON parsing ends up as a single dead letter
    /// entry whose reason mentions the malformed payload.
    #[test]
    fn malformed_outbound_payload_moves_to_dead_letter_with_context() {
        let (_temp, store) = open_temp_store();
        enqueue_outbound(&store, frame_one_input("{\"unterminated\"")).expect("enqueue");

        let item = take_oldest_outbound(&store).expect("take").expect("item");
        let parse_error =
            serde_json::from_str::<serde_json::Value>(&item.payload_json).expect_err("invalid");
        dead_letter_malformed_outbound_payload(&store, &item, &parse_error).expect("dead letter");

        let dead_letter = list_outbound_dead_letter(&store, 10).expect("dead letters");
        assert_eq!(dead_letter.len(), 1);

        let entry = &dead_letter[0];
        assert_eq!(entry.frame_id, "frame-1");
        assert!(
            entry
                .dead_letter_reason
                .contains("malformed outbound payload")
        );
    }
}