Skip to main content

clawdentity_core/db/
outbound.rs

1use rusqlite::params;
2
3use crate::db::{SqliteStore, now_utc_ms};
4use crate::error::{CoreError, Result};
5
/// One row of the `outbound_queue` table, as returned by the queue readers in this module.
///
/// Field order matches the column order of the `SELECT` statements that feed `map_row`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OutboundQueueItem {
    /// Frame identifier; queue rows are deleted by this value.
    pub frame_id: String,
    /// Value of the `frame_version` column.
    pub frame_version: i64,
    /// Value of the `frame_type` column (the tests in this file use `relay.frame`).
    pub frame_type: String,
    /// DID string of the agent the frame is addressed to.
    pub to_agent_did: String,
    /// Frame payload as text; this module stores it without parsing it as JSON.
    pub payload_json: String,
    /// Optional conversation identifier (`None` when the column is NULL).
    pub conversation_id: Option<String>,
    /// Optional `reply_to` reference (`None` when the column is NULL).
    pub reply_to: Option<String>,
    /// Timestamp taken from `now_utc_ms()` when the frame was enqueued.
    pub created_at_ms: i64,
}
17
/// One row of the `outbound_dead_letter` table.
///
/// Carries the same frame fields as [`OutboundQueueItem`] plus the time and reason the frame
/// was dead-lettered. Field order matches the column order read by `map_dead_letter_row`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OutboundDeadLetterItem {
    /// Frame identifier; the dead-letter upsert resolves conflicts on this column.
    pub frame_id: String,
    /// Value of the `frame_version` column.
    pub frame_version: i64,
    /// Value of the `frame_type` column.
    pub frame_type: String,
    /// DID string of the agent the frame was addressed to.
    pub to_agent_did: String,
    /// Frame payload as text.
    pub payload_json: String,
    /// Optional conversation identifier (`None` when the column is NULL).
    pub conversation_id: Option<String>,
    /// Optional `reply_to` reference (`None` when the column is NULL).
    pub reply_to: Option<String>,
    /// `created_at_ms` copied from the queue item that was dead-lettered.
    pub created_at_ms: i64,
    /// Timestamp taken from `now_utc_ms()` when the dead-letter row was written.
    pub dead_lettered_at_ms: i64,
    /// Trimmed, non-empty explanation supplied by the caller.
    pub dead_letter_reason: String,
}
31
/// Caller-supplied fields for `enqueue_outbound`.
///
/// The string fields are trimmed by `enqueue_outbound` before being stored; the creation
/// timestamp is not part of the input because it is generated at enqueue time.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EnqueueOutboundInput {
    /// Frame identifier; must be non-blank.
    pub frame_id: String,
    /// Frame version, stored unchanged.
    pub frame_version: i64,
    /// Frame type label; must be non-blank.
    pub frame_type: String,
    /// DID string of the destination agent; must be non-blank.
    pub to_agent_did: String,
    /// Frame payload as text; must be non-blank (it is not parsed as JSON here).
    pub payload_json: String,
    /// Optional conversation identifier; blank values are stored as NULL.
    pub conversation_id: Option<String>,
    /// Optional `reply_to` reference; blank values are stored as NULL.
    pub reply_to: Option<String>,
}
42
/// Normalizes an optional string value.
///
/// Returns `None` when the input is `None` or contains only whitespace; otherwise returns
/// the input with leading and trailing whitespace removed.
fn parse_optional_non_empty(value: Option<String>) -> Option<String> {
    let candidate = value?;
    match candidate.trim() {
        "" => None,
        kept => Some(kept.to_owned()),
    }
}
53
54fn map_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<OutboundQueueItem> {
55    Ok(OutboundQueueItem {
56        frame_id: row.get(0)?,
57        frame_version: row.get(1)?,
58        frame_type: row.get(2)?,
59        to_agent_did: row.get(3)?,
60        payload_json: row.get(4)?,
61        conversation_id: row.get(5)?,
62        reply_to: row.get(6)?,
63        created_at_ms: row.get(7)?,
64    })
65}
66
67fn map_dead_letter_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<OutboundDeadLetterItem> {
68    Ok(OutboundDeadLetterItem {
69        frame_id: row.get(0)?,
70        frame_version: row.get(1)?,
71        frame_type: row.get(2)?,
72        to_agent_did: row.get(3)?,
73        payload_json: row.get(4)?,
74        conversation_id: row.get(5)?,
75        reply_to: row.get(6)?,
76        created_at_ms: row.get(7)?,
77        dead_lettered_at_ms: row.get(8)?,
78        dead_letter_reason: row.get(9)?,
79    })
80}
81
82/// TODO(clawdentity): document `enqueue_outbound`.
83pub fn enqueue_outbound(store: &SqliteStore, input: EnqueueOutboundInput) -> Result<()> {
84    let frame_id = input.frame_id.trim().to_string();
85    let frame_type = input.frame_type.trim().to_string();
86    let to_agent_did = input.to_agent_did.trim().to_string();
87    let payload_json = input.payload_json.trim().to_string();
88
89    if frame_id.is_empty() {
90        return Err(CoreError::InvalidInput("frame_id is required".to_string()));
91    }
92    if frame_type.is_empty() {
93        return Err(CoreError::InvalidInput(
94            "frame_type is required".to_string(),
95        ));
96    }
97    if to_agent_did.is_empty() {
98        return Err(CoreError::InvalidInput(
99            "to_agent_did is required".to_string(),
100        ));
101    }
102    if payload_json.is_empty() {
103        return Err(CoreError::InvalidInput(
104            "payload_json is required".to_string(),
105        ));
106    }
107
108    let conversation_id = parse_optional_non_empty(input.conversation_id);
109    let reply_to = parse_optional_non_empty(input.reply_to);
110    let created_at_ms = now_utc_ms();
111    store.with_connection(|connection| {
112        connection.execute(
113            "INSERT INTO outbound_queue (
114                frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
115            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
116            params![
117                frame_id,
118                input.frame_version,
119                frame_type,
120                to_agent_did,
121                payload_json,
122                conversation_id,
123                reply_to,
124                created_at_ms
125            ],
126        )?;
127        Ok(())
128    })
129}
130
131/// TODO(clawdentity): document `list_outbound`.
132pub fn list_outbound(store: &SqliteStore, limit: usize) -> Result<Vec<OutboundQueueItem>> {
133    let limit = i64::try_from(limit).unwrap_or(i64::MAX);
134    store.with_connection(|connection| {
135        let mut statement = connection.prepare(
136            "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
137             FROM outbound_queue
138             ORDER BY created_at_ms ASC, frame_id ASC
139             LIMIT ?1",
140        )?;
141        let rows = statement.query_map([limit], map_row)?;
142        let items: rusqlite::Result<Vec<OutboundQueueItem>> = rows.collect();
143        Ok(items?)
144    })
145}
146
147/// TODO(clawdentity): document `take_oldest_outbound`.
148pub fn take_oldest_outbound(store: &SqliteStore) -> Result<Option<OutboundQueueItem>> {
149    store.with_connection(|connection| {
150        let mut statement = connection.prepare(
151            "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
152             FROM outbound_queue
153             ORDER BY created_at_ms ASC, frame_id ASC
154             LIMIT 1",
155        )?;
156        let item = statement.query_row([], map_row).ok();
157        let Some(item) = item else {
158            return Ok(None);
159        };
160        connection.execute("DELETE FROM outbound_queue WHERE frame_id = ?1", [&item.frame_id])?;
161        Ok(Some(item))
162    })
163}
164
165/// TODO(clawdentity): document `delete_outbound`.
166pub fn delete_outbound(store: &SqliteStore, frame_id: &str) -> Result<bool> {
167    let frame_id = frame_id.trim();
168    if frame_id.is_empty() {
169        return Ok(false);
170    }
171    store.with_connection(|connection| {
172        let affected =
173            connection.execute("DELETE FROM outbound_queue WHERE frame_id = ?1", [frame_id])?;
174        Ok(affected > 0)
175    })
176}
177
178/// TODO(clawdentity): document `move_outbound_to_dead_letter`.
179pub fn move_outbound_to_dead_letter(
180    store: &SqliteStore,
181    item: &OutboundQueueItem,
182    dead_letter_reason: &str,
183) -> Result<()> {
184    let reason = dead_letter_reason.trim();
185    if reason.is_empty() {
186        return Err(CoreError::InvalidInput(
187            "dead_letter_reason is required".to_string(),
188        ));
189    }
190
191    let dead_lettered_at_ms = now_utc_ms();
192    store.with_connection(|connection| {
193        connection.execute(
194            "INSERT INTO outbound_dead_letter (
195                frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to,
196                created_at_ms, dead_lettered_at_ms, dead_letter_reason
197            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
198            ON CONFLICT(frame_id) DO UPDATE SET
199                frame_version = excluded.frame_version,
200                frame_type = excluded.frame_type,
201                to_agent_did = excluded.to_agent_did,
202                payload_json = excluded.payload_json,
203                conversation_id = excluded.conversation_id,
204                reply_to = excluded.reply_to,
205                created_at_ms = excluded.created_at_ms,
206                dead_lettered_at_ms = excluded.dead_lettered_at_ms,
207                dead_letter_reason = excluded.dead_letter_reason",
208            params![
209                &item.frame_id,
210                item.frame_version,
211                &item.frame_type,
212                &item.to_agent_did,
213                &item.payload_json,
214                &item.conversation_id,
215                &item.reply_to,
216                item.created_at_ms,
217                dead_lettered_at_ms,
218                reason
219            ],
220        )?;
221        Ok(())
222    })
223}
224
225/// TODO(clawdentity): document `outbound_count`.
226pub fn outbound_count(store: &SqliteStore) -> Result<i64> {
227    store.with_connection(|connection| {
228        let count =
229            connection.query_row("SELECT COUNT(*) FROM outbound_queue", [], |row| row.get(0))?;
230        Ok(count)
231    })
232}
233
234/// TODO(clawdentity): document `list_outbound_dead_letter`.
235pub fn list_outbound_dead_letter(
236    store: &SqliteStore,
237    limit: usize,
238) -> Result<Vec<OutboundDeadLetterItem>> {
239    let limit = i64::try_from(limit).unwrap_or(i64::MAX);
240    store.with_connection(|connection| {
241        let mut statement = connection.prepare(
242            "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to,
243                    created_at_ms, dead_lettered_at_ms, dead_letter_reason
244             FROM outbound_dead_letter
245             ORDER BY dead_lettered_at_ms DESC, frame_id DESC
246             LIMIT ?1",
247        )?;
248        let rows = statement.query_map([limit], map_dead_letter_row)?;
249        let items: rusqlite::Result<Vec<OutboundDeadLetterItem>> = rows.collect();
250        Ok(items?)
251    })
252}
253
254/// TODO(clawdentity): document `outbound_dead_letter_count`.
255pub fn outbound_dead_letter_count(store: &SqliteStore) -> Result<i64> {
256    store.with_connection(|connection| {
257        let count =
258            connection.query_row("SELECT COUNT(*) FROM outbound_dead_letter", [], |row| {
259                row.get(0)
260            })?;
261        Ok(count)
262    })
263}
264
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::db::SqliteStore;

    use super::{
        EnqueueOutboundInput, delete_outbound, enqueue_outbound, list_outbound,
        list_outbound_dead_letter, move_outbound_to_dead_letter, outbound_count,
        outbound_dead_letter_count, take_oldest_outbound,
    };

    const AGENT_DID_A: &str =
        "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4";
    const AGENT_DID_B: &str =
        "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT5";

    /// Opens a store backed by a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the caller keeps it alive for the
    /// duration of the test.
    fn open_store() -> (TempDir, SqliteStore) {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
        (temp, store)
    }

    /// Builds a version-1 `relay.frame` input with no `reply_to`.
    fn relay_frame(
        frame_id: &str,
        to_agent_did: &str,
        payload_json: &str,
        conversation_id: Option<&str>,
    ) -> EnqueueOutboundInput {
        EnqueueOutboundInput {
            frame_id: frame_id.to_string(),
            frame_version: 1,
            frame_type: "relay.frame".to_string(),
            to_agent_did: to_agent_did.to_string(),
            payload_json: payload_json.to_string(),
            conversation_id: conversation_id.map(str::to_string),
            reply_to: None,
        }
    }

    #[test]
    fn enqueue_take_and_delete_outbound_items() {
        let (_temp, store) = open_store();

        let first = relay_frame("frame-1", AGENT_DID_A, "{\"hello\":\"world\"}", Some("conv-1"));
        enqueue_outbound(&store, first).expect("enqueue 1");
        let second = relay_frame("frame-2", AGENT_DID_B, "{\"hi\":\"there\"}", None);
        enqueue_outbound(&store, second).expect("enqueue 2");

        assert_eq!(outbound_count(&store).expect("count"), 2);
        assert_eq!(list_outbound(&store, 10).expect("list").len(), 2);

        // Taking removes the earliest-enqueued frame from the queue.
        let oldest = take_oldest_outbound(&store).expect("take").expect("oldest");
        assert_eq!(oldest.frame_id, "frame-1");
        assert_eq!(outbound_count(&store).expect("count after take"), 1);

        let deleted = delete_outbound(&store, "frame-2").expect("delete");
        assert!(deleted);
        assert_eq!(outbound_count(&store).expect("count after delete"), 0);
    }

    #[test]
    fn moves_outbound_item_to_dead_letter() {
        let (_temp, store) = open_store();

        let input = relay_frame("frame-1", AGENT_DID_A, "{\"broken\":\"json\"}", None);
        enqueue_outbound(&store, input).expect("enqueue");

        // The item is taken off the queue first, then recorded as a dead letter.
        let item = take_oldest_outbound(&store).expect("take").expect("item");
        move_outbound_to_dead_letter(&store, &item, "malformed outbound payload").expect("move");

        assert_eq!(outbound_count(&store).expect("queue count"), 0);
        assert_eq!(
            outbound_dead_letter_count(&store).expect("dead letter count"),
            1
        );

        let dead_letters = list_outbound_dead_letter(&store, 10).expect("dead letters");
        assert_eq!(dead_letters.len(), 1);
        let recorded = &dead_letters[0];
        assert_eq!(recorded.frame_id, "frame-1");
        assert_eq!(recorded.dead_letter_reason, "malformed outbound payload");
    }
}