Skip to main content

clawdentity_core/db/
inbound.rs

1use rusqlite::{OptionalExtension, params};
2use serde::Serialize;
3
4use crate::db::{SqliteStore, now_utc_ms};
5use crate::error::{CoreError, Result};
6
7#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
8pub struct InboundPendingItem {
9    pub request_id: String,
10    pub frame_id: String,
11    pub from_agent_did: String,
12    pub to_agent_did: String,
13    pub payload_json: String,
14    pub payload_bytes: i64,
15    pub received_at_ms: i64,
16    pub next_attempt_at_ms: i64,
17    pub attempt_count: i64,
18    pub last_error: Option<String>,
19    pub last_attempt_at_ms: Option<i64>,
20    pub conversation_id: Option<String>,
21    pub reply_to: Option<String>,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
25pub struct InboundDeadLetterItem {
26    pub request_id: String,
27    pub frame_id: String,
28    pub from_agent_did: String,
29    pub to_agent_did: String,
30    pub payload_json: String,
31    pub payload_bytes: i64,
32    pub received_at_ms: i64,
33    pub attempt_count: i64,
34    pub last_error: Option<String>,
35    pub last_attempt_at_ms: Option<i64>,
36    pub conversation_id: Option<String>,
37    pub reply_to: Option<String>,
38    pub dead_lettered_at_ms: i64,
39    pub dead_letter_reason: String,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
43pub struct InboundEvent {
44    pub id: i64,
45    pub at_ms: i64,
46    pub event_type: String,
47    pub request_id: Option<String>,
48    pub details_json: Option<String>,
49}
50
51fn parse_optional_non_empty(value: Option<String>) -> Option<String> {
52    value.and_then(|raw| {
53        let trimmed = raw.trim();
54        if trimmed.is_empty() {
55            None
56        } else {
57            Some(trimmed.to_string())
58        }
59    })
60}
61
62fn map_pending_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<InboundPendingItem> {
63    Ok(InboundPendingItem {
64        request_id: row.get(0)?,
65        frame_id: row.get(1)?,
66        from_agent_did: row.get(2)?,
67        to_agent_did: row.get(3)?,
68        payload_json: row.get(4)?,
69        payload_bytes: row.get(5)?,
70        received_at_ms: row.get(6)?,
71        next_attempt_at_ms: row.get(7)?,
72        attempt_count: row.get(8)?,
73        last_error: row.get(9)?,
74        last_attempt_at_ms: row.get(10)?,
75        conversation_id: row.get(11)?,
76        reply_to: row.get(12)?,
77    })
78}
79
80fn map_dead_letter_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<InboundDeadLetterItem> {
81    Ok(InboundDeadLetterItem {
82        request_id: row.get(0)?,
83        frame_id: row.get(1)?,
84        from_agent_did: row.get(2)?,
85        to_agent_did: row.get(3)?,
86        payload_json: row.get(4)?,
87        payload_bytes: row.get(5)?,
88        received_at_ms: row.get(6)?,
89        attempt_count: row.get(7)?,
90        last_error: row.get(8)?,
91        last_attempt_at_ms: row.get(9)?,
92        conversation_id: row.get(10)?,
93        reply_to: row.get(11)?,
94        dead_lettered_at_ms: row.get(12)?,
95        dead_letter_reason: row.get(13)?,
96    })
97}
98
99/// TODO(clawdentity): document `upsert_pending`.
100#[allow(clippy::too_many_lines)]
101pub fn upsert_pending(store: &SqliteStore, item: InboundPendingItem) -> Result<()> {
102    if item.request_id.trim().is_empty() {
103        return Err(CoreError::InvalidInput(
104            "request_id is required".to_string(),
105        ));
106    }
107    if item.frame_id.trim().is_empty() {
108        return Err(CoreError::InvalidInput("frame_id is required".to_string()));
109    }
110    if item.from_agent_did.trim().is_empty() || item.to_agent_did.trim().is_empty() {
111        return Err(CoreError::InvalidInput(
112            "from_agent_did and to_agent_did are required".to_string(),
113        ));
114    }
115    if item.payload_json.trim().is_empty() {
116        return Err(CoreError::InvalidInput(
117            "payload_json is required".to_string(),
118        ));
119    }
120    if item.payload_bytes < 0 || item.attempt_count < 0 {
121        return Err(CoreError::InvalidInput(
122            "payload_bytes and attempt_count must be >= 0".to_string(),
123        ));
124    }
125
126    store.with_connection(|connection| {
127        connection.execute(
128            "INSERT INTO inbound_pending (
129                request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
130                received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
131                conversation_id, reply_to
132            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
133            ON CONFLICT(request_id) DO UPDATE SET
134                frame_id = excluded.frame_id,
135                from_agent_did = excluded.from_agent_did,
136                to_agent_did = excluded.to_agent_did,
137                payload_json = excluded.payload_json,
138                payload_bytes = excluded.payload_bytes,
139                received_at_ms = excluded.received_at_ms,
140                next_attempt_at_ms = excluded.next_attempt_at_ms,
141                attempt_count = excluded.attempt_count,
142                last_error = excluded.last_error,
143                last_attempt_at_ms = excluded.last_attempt_at_ms,
144                conversation_id = excluded.conversation_id,
145                reply_to = excluded.reply_to",
146            params![
147                item.request_id.trim(),
148                item.frame_id.trim(),
149                item.from_agent_did.trim(),
150                item.to_agent_did.trim(),
151                item.payload_json.trim(),
152                item.payload_bytes,
153                item.received_at_ms,
154                item.next_attempt_at_ms,
155                item.attempt_count,
156                parse_optional_non_empty(item.last_error),
157                item.last_attempt_at_ms,
158                parse_optional_non_empty(item.conversation_id),
159                parse_optional_non_empty(item.reply_to),
160            ],
161        )?;
162        Ok(())
163    })
164}
165
166/// TODO(clawdentity): document `list_pending_due`.
167pub fn list_pending_due(
168    store: &SqliteStore,
169    at_or_before_ms: i64,
170    limit: usize,
171) -> Result<Vec<InboundPendingItem>> {
172    let limit = i64::try_from(limit).unwrap_or(i64::MAX);
173    store.with_connection(|connection| {
174        let mut statement = connection.prepare(
175            "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
176                    received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
177                    conversation_id, reply_to
178             FROM inbound_pending
179             WHERE next_attempt_at_ms <= ?1
180             ORDER BY next_attempt_at_ms ASC, received_at_ms ASC
181             LIMIT ?2",
182        )?;
183        let rows = statement.query_map(params![at_or_before_ms, limit], map_pending_row)?;
184        let items: rusqlite::Result<Vec<InboundPendingItem>> = rows.collect();
185        Ok(items?)
186    })
187}
188
189/// TODO(clawdentity): document `pending_count`.
190pub fn pending_count(store: &SqliteStore) -> Result<i64> {
191    store.with_connection(|connection| {
192        let count =
193            connection.query_row("SELECT COUNT(*) FROM inbound_pending", [], |row| row.get(0))?;
194        Ok(count)
195    })
196}
197
198/// TODO(clawdentity): document `get_pending`.
199pub fn get_pending(store: &SqliteStore, request_id: &str) -> Result<Option<InboundPendingItem>> {
200    let request_id = request_id.trim();
201    if request_id.is_empty() {
202        return Ok(None);
203    }
204    store.with_connection(|connection| {
205        let mut statement = connection.prepare(
206            "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
207                    received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
208                    conversation_id, reply_to
209             FROM inbound_pending
210             WHERE request_id = ?1",
211        )?;
212        let item = statement.query_row([request_id], map_pending_row).optional()?;
213        Ok(item)
214    })
215}
216
217/// TODO(clawdentity): document `mark_pending_attempt`.
218pub fn mark_pending_attempt(
219    store: &SqliteStore,
220    request_id: &str,
221    next_attempt_at_ms: i64,
222    last_error: Option<String>,
223) -> Result<bool> {
224    let request_id = request_id.trim();
225    if request_id.is_empty() {
226        return Ok(false);
227    }
228    let last_error = parse_optional_non_empty(last_error);
229    let last_attempt_at_ms = now_utc_ms();
230    store.with_connection(|connection| {
231        let affected = connection.execute(
232            "UPDATE inbound_pending
233             SET attempt_count = attempt_count + 1,
234                 next_attempt_at_ms = ?2,
235                 last_error = ?3,
236                 last_attempt_at_ms = ?4
237             WHERE request_id = ?1",
238            params![
239                request_id,
240                next_attempt_at_ms,
241                last_error,
242                last_attempt_at_ms
243            ],
244        )?;
245        Ok(affected > 0)
246    })
247}
248
249/// TODO(clawdentity): document `move_pending_to_dead_letter`.
250#[allow(clippy::too_many_lines)]
251pub fn move_pending_to_dead_letter(
252    store: &SqliteStore,
253    request_id: &str,
254    dead_letter_reason: &str,
255) -> Result<bool> {
256    let request_id = request_id.trim();
257    let dead_letter_reason = dead_letter_reason.trim();
258    if request_id.is_empty() {
259        return Ok(false);
260    }
261    if dead_letter_reason.is_empty() {
262        return Err(CoreError::InvalidInput(
263            "dead_letter_reason is required".to_string(),
264        ));
265    }
266
267    store.with_connection(|connection| {
268        let mut select_statement = connection.prepare(
269            "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
270                    received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
271                    conversation_id, reply_to
272             FROM inbound_pending
273             WHERE request_id = ?1",
274        )?;
275        let pending = select_statement
276            .query_row([request_id], map_pending_row)
277            .optional()?;
278        let Some(pending) = pending else {
279            return Ok(false);
280        };
281
282        let dead_lettered_at_ms = now_utc_ms();
283        connection.execute(
284            "INSERT INTO inbound_dead_letter (
285                request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
286                received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
287                dead_lettered_at_ms, dead_letter_reason
288            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
289            ON CONFLICT(request_id) DO UPDATE SET
290                frame_id = excluded.frame_id,
291                from_agent_did = excluded.from_agent_did,
292                to_agent_did = excluded.to_agent_did,
293                payload_json = excluded.payload_json,
294                payload_bytes = excluded.payload_bytes,
295                received_at_ms = excluded.received_at_ms,
296                attempt_count = excluded.attempt_count,
297                last_error = excluded.last_error,
298                last_attempt_at_ms = excluded.last_attempt_at_ms,
299                conversation_id = excluded.conversation_id,
300                reply_to = excluded.reply_to,
301                dead_lettered_at_ms = excluded.dead_lettered_at_ms,
302                dead_letter_reason = excluded.dead_letter_reason",
303            params![
304                pending.request_id,
305                pending.frame_id,
306                pending.from_agent_did,
307                pending.to_agent_did,
308                pending.payload_json,
309                pending.payload_bytes,
310                pending.received_at_ms,
311                pending.attempt_count,
312                pending.last_error,
313                pending.last_attempt_at_ms,
314                pending.conversation_id,
315                pending.reply_to,
316                dead_lettered_at_ms,
317                dead_letter_reason
318            ],
319        )?;
320
321        connection.execute(
322            "DELETE FROM inbound_pending WHERE request_id = ?1",
323            [request_id],
324        )?;
325
326        append_inbound_event_with_connection(
327            connection,
328            "dead_lettered",
329            Some(request_id.to_string()),
330            Some(
331                serde_json::json!({
332                    "reason": dead_letter_reason,
333                    "deadLetteredAtMs": dead_lettered_at_ms,
334                })
335                .to_string(),
336            ),
337        )?;
338        Ok(true)
339    })
340}
341
342/// TODO(clawdentity): document `list_dead_letter`.
343pub fn list_dead_letter(store: &SqliteStore, limit: usize) -> Result<Vec<InboundDeadLetterItem>> {
344    let limit = i64::try_from(limit).unwrap_or(i64::MAX);
345    store.with_connection(|connection| {
346        let mut statement = connection.prepare(
347            "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
348                    received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
349                    dead_lettered_at_ms, dead_letter_reason
350             FROM inbound_dead_letter
351             ORDER BY dead_lettered_at_ms DESC, request_id DESC
352             LIMIT ?1",
353        )?;
354        let rows = statement.query_map([limit], map_dead_letter_row)?;
355        let items: rusqlite::Result<Vec<InboundDeadLetterItem>> = rows.collect();
356        Ok(items?)
357    })
358}
359
360/// TODO(clawdentity): document `dead_letter_count`.
361pub fn dead_letter_count(store: &SqliteStore) -> Result<i64> {
362    store.with_connection(|connection| {
363        let count =
364            connection.query_row("SELECT COUNT(*) FROM inbound_dead_letter", [], |row| {
365                row.get(0)
366            })?;
367        Ok(count)
368    })
369}
370
371/// TODO(clawdentity): document `replay_dead_letter`.
372#[allow(clippy::too_many_lines)]
373pub fn replay_dead_letter(
374    store: &SqliteStore,
375    request_id: &str,
376    next_attempt_at_ms: i64,
377) -> Result<bool> {
378    let request_id = request_id.trim();
379    if request_id.is_empty() {
380        return Ok(false);
381    }
382    store.with_connection(|connection| {
383        let mut statement = connection.prepare(
384            "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
385                    received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
386                    dead_lettered_at_ms, dead_letter_reason
387             FROM inbound_dead_letter
388             WHERE request_id = ?1",
389        )?;
390        let dead_letter = statement
391            .query_row([request_id], map_dead_letter_row)
392            .optional()?;
393        let Some(dead_letter) = dead_letter else {
394            return Ok(false);
395        };
396
397        connection.execute(
398            "INSERT INTO inbound_pending (
399                request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
400                received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
401                conversation_id, reply_to
402            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
403            ON CONFLICT(request_id) DO UPDATE SET
404                frame_id = excluded.frame_id,
405                from_agent_did = excluded.from_agent_did,
406                to_agent_did = excluded.to_agent_did,
407                payload_json = excluded.payload_json,
408                payload_bytes = excluded.payload_bytes,
409                received_at_ms = excluded.received_at_ms,
410                next_attempt_at_ms = excluded.next_attempt_at_ms,
411                attempt_count = excluded.attempt_count,
412                last_error = excluded.last_error,
413                last_attempt_at_ms = excluded.last_attempt_at_ms,
414                conversation_id = excluded.conversation_id,
415                reply_to = excluded.reply_to",
416            params![
417                dead_letter.request_id,
418                dead_letter.frame_id,
419                dead_letter.from_agent_did,
420                dead_letter.to_agent_did,
421                dead_letter.payload_json,
422                dead_letter.payload_bytes,
423                dead_letter.received_at_ms,
424                next_attempt_at_ms,
425                dead_letter.attempt_count,
426                dead_letter.last_error,
427                dead_letter.last_attempt_at_ms,
428                dead_letter.conversation_id,
429                dead_letter.reply_to,
430            ],
431        )?;
432        connection.execute(
433            "DELETE FROM inbound_dead_letter WHERE request_id = ?1",
434            [request_id],
435        )?;
436        append_inbound_event_with_connection(
437            connection,
438            "dead_letter_replayed",
439            Some(request_id.to_string()),
440            None,
441        )?;
442        Ok(true)
443    })
444}
445
446/// TODO(clawdentity): document `purge_dead_letter`.
447pub fn purge_dead_letter(store: &SqliteStore, request_id: Option<&str>) -> Result<usize> {
448    store.with_connection(|connection| {
449        let deleted = if let Some(request_id) = request_id {
450            let request_id = request_id.trim();
451            if request_id.is_empty() {
452                0
453            } else {
454                connection.execute(
455                    "DELETE FROM inbound_dead_letter WHERE request_id = ?1",
456                    [request_id],
457                )?
458            }
459        } else {
460            connection.execute("DELETE FROM inbound_dead_letter", [])?
461        };
462        Ok(deleted)
463    })
464}
465
466/// TODO(clawdentity): document `append_inbound_event`.
467pub fn append_inbound_event(
468    store: &SqliteStore,
469    event_type: &str,
470    request_id: Option<String>,
471    details_json: Option<String>,
472) -> Result<i64> {
473    let event_type = event_type.trim().to_string();
474    if event_type.is_empty() {
475        return Err(CoreError::InvalidInput(
476            "event_type is required".to_string(),
477        ));
478    }
479    store.with_connection(|connection| {
480        append_inbound_event_with_connection(
481            connection,
482            &event_type,
483            request_id,
484            parse_optional_non_empty(details_json),
485        )
486    })
487}
488
489fn append_inbound_event_with_connection(
490    connection: &rusqlite::Connection,
491    event_type: &str,
492    request_id: Option<String>,
493    details_json: Option<String>,
494) -> Result<i64> {
495    let now_ms = now_utc_ms();
496    connection.execute(
497        "INSERT INTO inbound_events (at_ms, event_type, request_id, details_json)
498         VALUES (?1, ?2, ?3, ?4)",
499        params![
500            now_ms,
501            event_type,
502            parse_optional_non_empty(request_id),
503            parse_optional_non_empty(details_json),
504        ],
505    )?;
506    Ok(connection.last_insert_rowid())
507}
508
509/// TODO(clawdentity): document `list_inbound_events`.
510pub fn list_inbound_events(store: &SqliteStore, limit: usize) -> Result<Vec<InboundEvent>> {
511    let limit = i64::try_from(limit).unwrap_or(i64::MAX);
512    store.with_connection(|connection| {
513        let mut statement = connection.prepare(
514            "SELECT id, at_ms, event_type, request_id, details_json
515             FROM inbound_events
516             ORDER BY id DESC
517             LIMIT ?1",
518        )?;
519        let rows = statement.query_map([limit], |row| {
520            Ok(InboundEvent {
521                id: row.get(0)?,
522                at_ms: row.get(1)?,
523                event_type: row.get(2)?,
524                request_id: row.get(3)?,
525                details_json: row.get(4)?,
526            })
527        })?;
528        let items: rusqlite::Result<Vec<InboundEvent>> = rows.collect();
529        Ok(items?)
530    })
531}
532
533#[cfg(test)]
534mod tests {
535    use tempfile::TempDir;
536
537    use crate::db::SqliteStore;
538
539    use super::{
540        InboundPendingItem, append_inbound_event, get_pending, list_dead_letter,
541        list_inbound_events, list_pending_due, mark_pending_attempt, move_pending_to_dead_letter,
542        purge_dead_letter, replay_dead_letter, upsert_pending,
543    };
544
545    fn fixture_pending(request_id: &str) -> InboundPendingItem {
546        InboundPendingItem {
547            request_id: request_id.to_string(),
548            frame_id: "frame-1".to_string(),
549            from_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTD"
550                .to_string(),
551            to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTE"
552                .to_string(),
553            payload_json: "{\"message\":\"hello\"}".to_string(),
554            payload_bytes: 20,
555            received_at_ms: 100,
556            next_attempt_at_ms: 100,
557            attempt_count: 0,
558            last_error: None,
559            last_attempt_at_ms: None,
560            conversation_id: Some("conv-1".to_string()),
561            reply_to: None,
562        }
563    }
564
565    #[test]
566    fn pending_dead_letter_and_replay_round_trip() {
567        let temp = TempDir::new().expect("temp dir");
568        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
569
570        upsert_pending(&store, fixture_pending("req-1")).expect("upsert");
571        let due = list_pending_due(&store, 100, 10).expect("due");
572        assert_eq!(due.len(), 1);
573
574        let marked = mark_pending_attempt(&store, "req-1", 200, Some("retry failed".to_string()))
575            .expect("mark");
576        assert!(marked);
577        let pending = get_pending(&store, "req-1")
578            .expect("get pending")
579            .expect("pending");
580        assert_eq!(pending.attempt_count, 1);
581        assert_eq!(pending.last_error.as_deref(), Some("retry failed"));
582
583        let moved = move_pending_to_dead_letter(&store, "req-1", "max-attempts-exceeded")
584            .expect("move dead letter");
585        assert!(moved);
586        assert!(
587            get_pending(&store, "req-1")
588                .expect("get pending none")
589                .is_none()
590        );
591        assert_eq!(list_dead_letter(&store, 10).expect("dead letters").len(), 1);
592
593        let replayed = replay_dead_letter(&store, "req-1", 300).expect("replay");
594        assert!(replayed);
595        assert_eq!(
596            list_dead_letter(&store, 10)
597                .expect("dead letters after")
598                .len(),
599            0
600        );
601        assert!(
602            get_pending(&store, "req-1")
603                .expect("get pending after")
604                .is_some()
605        );
606
607        let purged_none = purge_dead_letter(&store, None).expect("purge none");
608        assert_eq!(purged_none, 0);
609    }
610
611    #[test]
612    fn append_and_list_events() {
613        let temp = TempDir::new().expect("temp dir");
614        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
615
616        let inserted_id = append_inbound_event(
617            &store,
618            "received",
619            Some("req-123".to_string()),
620            Some("{\"ok\":true}".to_string()),
621        )
622        .expect("append event");
623        assert!(inserted_id > 0);
624
625        let events = list_inbound_events(&store, 10).expect("list events");
626        assert_eq!(events.len(), 1);
627        assert_eq!(events[0].event_type, "received");
628        assert_eq!(events[0].request_id.as_deref(), Some("req-123"));
629    }
630}