Skip to main content

clawdentity_core/runtime/
relay.rs

1use crate::connector_client::ConnectorClientSender;
2use crate::connector_frames::{CONNECTOR_FRAME_VERSION, ConnectorFrame, EnqueueFrame, now_iso};
3use crate::db::SqliteStore;
4use crate::db_outbound::{
5    EnqueueOutboundInput, OutboundQueueItem, enqueue_outbound, move_outbound_to_dead_letter,
6    take_oldest_outbound,
7};
8use crate::error::Result;
9use crate::runtime_trusted_receipts::TrustedReceiptsStore;
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FlushOutboundResult {
13    pub sent_count: usize,
14    pub failed_count: usize,
15}
16
17/// TODO(clawdentity): document `flush_outbound_queue_to_relay`.
18#[allow(clippy::too_many_lines)]
19pub async fn flush_outbound_queue_to_relay(
20    store: &SqliteStore,
21    relay: &ConnectorClientSender,
22    max_items: usize,
23    trusted_receipts: Option<&TrustedReceiptsStore>,
24) -> Result<FlushOutboundResult> {
25    let mut sent_count = 0_usize;
26    let mut failed_count = 0_usize;
27
28    for _ in 0..max_items {
29        if !relay.is_connected() {
30            break;
31        }
32
33        let Some(item) = take_oldest_outbound(store)? else {
34            break;
35        };
36
37        let payload = match serde_json::from_str::<serde_json::Value>(&item.payload_json) {
38            Ok(payload) => payload,
39            Err(error) => {
40                tracing::warn!(
41                    frame_id = %item.frame_id,
42                    to_agent_did = %item.to_agent_did,
43                    error = %error,
44                    "malformed outbound payload moved to dead letter"
45                );
46                if let Err(dead_letter_error) =
47                    dead_letter_malformed_outbound_payload(store, &item, &error)
48                {
49                    tracing::warn!(
50                        frame_id = %item.frame_id,
51                        to_agent_did = %item.to_agent_did,
52                        error = %dead_letter_error,
53                        "failed to move malformed outbound payload to dead letter"
54                    );
55                }
56                failed_count += 1;
57                continue;
58            }
59        };
60
61        let frame = ConnectorFrame::Enqueue(EnqueueFrame {
62            v: CONNECTOR_FRAME_VERSION,
63            id: item.frame_id.clone(),
64            ts: now_iso(),
65            to_agent_did: item.to_agent_did.clone(),
66            payload,
67            conversation_id: item.conversation_id.clone(),
68            reply_to: item.reply_to.clone(),
69        });
70
71        if relay.send_frame(frame).await.is_err() {
72            // Requeue on best effort if the relay connection failed.
73            enqueue_outbound(
74                store,
75                EnqueueOutboundInput {
76                    frame_id: item.frame_id,
77                    frame_version: item.frame_version,
78                    frame_type: item.frame_type,
79                    to_agent_did: item.to_agent_did,
80                    payload_json: item.payload_json,
81                    conversation_id: item.conversation_id,
82                    reply_to: item.reply_to,
83                },
84            )?;
85            failed_count += 1;
86            break;
87        }
88
89        if let Some(receipts) = trusted_receipts {
90            receipts.mark_trusted(item.frame_id);
91        }
92        sent_count += 1;
93    }
94
95    Ok(FlushOutboundResult {
96        sent_count,
97        failed_count,
98    })
99}
100
101fn dead_letter_malformed_outbound_payload(
102    store: &SqliteStore,
103    item: &OutboundQueueItem,
104    parse_error: &serde_json::Error,
105) -> Result<()> {
106    move_outbound_to_dead_letter(
107        store,
108        item,
109        &format!("malformed outbound payload: {parse_error}"),
110    )
111}
112
113#[cfg(test)]
114mod tests {
115    use std::time::Duration;
116
117    use tempfile::TempDir;
118
119    use crate::connector_client::{ConnectorClientOptions, spawn_connector_client};
120    use crate::db::SqliteStore;
121    use crate::db_outbound::{
122        EnqueueOutboundInput, enqueue_outbound, list_outbound_dead_letter, outbound_count,
123        take_oldest_outbound,
124    };
125    use crate::runtime_trusted_receipts::TrustedReceiptsStore;
126
127    use super::{dead_letter_malformed_outbound_payload, flush_outbound_queue_to_relay};
128
129    #[tokio::test]
130    async fn flush_keeps_message_when_relay_is_disconnected() {
131        let temp = TempDir::new().expect("temp dir");
132        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
133        enqueue_outbound(
134            &store,
135            EnqueueOutboundInput {
136                frame_id: "frame-1".to_string(),
137                frame_version: 1,
138                frame_type: "enqueue".to_string(),
139                to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
140                    .to_string(),
141                payload_json: "{\"x\":1}".to_string(),
142                conversation_id: None,
143                reply_to: None,
144            },
145        )
146        .expect("enqueue");
147
148        let client = spawn_connector_client(ConnectorClientOptions::with_defaults(
149            "ws://127.0.0.1:9/v1/relay/connect",
150            vec![],
151        ));
152        tokio::time::sleep(Duration::from_millis(25)).await;
153
154        let receipts = TrustedReceiptsStore::new();
155        let result = flush_outbound_queue_to_relay(&store, &client.sender(), 10, Some(&receipts))
156            .await
157            .expect("flush");
158        assert_eq!(result.sent_count, 0);
159        assert_eq!(result.failed_count, 0);
160        assert_eq!(outbound_count(&store).expect("count"), 1);
161        client.sender().shutdown();
162    }
163
164    #[test]
165    fn malformed_outbound_payload_moves_to_dead_letter_with_context() {
166        let temp = TempDir::new().expect("temp dir");
167        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
168        enqueue_outbound(
169            &store,
170            EnqueueOutboundInput {
171                frame_id: "frame-1".to_string(),
172                frame_version: 1,
173                frame_type: "enqueue".to_string(),
174                to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
175                    .to_string(),
176                payload_json: "{\"unterminated\"".to_string(),
177                conversation_id: None,
178                reply_to: None,
179            },
180        )
181        .expect("enqueue");
182        let item = take_oldest_outbound(&store).expect("take").expect("item");
183        let parse_error =
184            serde_json::from_str::<serde_json::Value>(&item.payload_json).expect_err("invalid");
185        dead_letter_malformed_outbound_payload(&store, &item, &parse_error).expect("dead letter");
186
187        let dead_letter = list_outbound_dead_letter(&store, 10).expect("dead letters");
188        assert_eq!(dead_letter.len(), 1);
189        assert_eq!(dead_letter[0].frame_id, "frame-1");
190        assert!(
191            dead_letter[0]
192                .dead_letter_reason
193                .contains("malformed outbound payload")
194        );
195    }
196}