// clawdentity_core/runtime/replay.rs

1use crate::db::SqliteStore;
2use crate::db::now_utc_ms;
3use crate::db_inbound::{list_dead_letter, purge_dead_letter, replay_dead_letter};
4use crate::error::Result;
5
/// Summary returned by [`replay_dead_letter_messages`].
///
/// `Default` yields a result with a zero count, which is what a replay over an
/// empty dead-letter set produces.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplayResult {
    /// Number of dead-letter entries for which the replay call reported success.
    pub replayed_count: usize,
}
10
/// Summary returned by [`purge_dead_letter_messages`].
///
/// `Default` yields a result with a zero count, i.e. "nothing was purged".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PurgeResult {
    /// Sum of the counts reported by the underlying purge calls.
    pub purged_count: usize,
}
15
16/// TODO(clawdentity): document `replay_dead_letter_messages`.
17pub fn replay_dead_letter_messages(
18    store: &SqliteStore,
19    request_ids: Option<Vec<String>>,
20) -> Result<ReplayResult> {
21    let mut replayed_count = 0_usize;
22    let next_attempt_at_ms = now_utc_ms();
23
24    match request_ids {
25        Some(ids) if !ids.is_empty() => {
26            for request_id in ids {
27                if replay_dead_letter(store, &request_id, next_attempt_at_ms)? {
28                    replayed_count += 1;
29                }
30            }
31        }
32        _ => {
33            for item in list_dead_letter(store, usize::MAX)? {
34                if replay_dead_letter(store, &item.request_id, next_attempt_at_ms)? {
35                    replayed_count += 1;
36                }
37            }
38        }
39    }
40
41    Ok(ReplayResult { replayed_count })
42}
43
44/// TODO(clawdentity): document `purge_dead_letter_messages`.
45pub fn purge_dead_letter_messages(
46    store: &SqliteStore,
47    request_ids: Option<Vec<String>>,
48) -> Result<PurgeResult> {
49    let purged_count = match request_ids {
50        Some(ids) if !ids.is_empty() => {
51            let mut total = 0_usize;
52            for request_id in ids {
53                total += purge_dead_letter(store, Some(&request_id))?;
54            }
55            total
56        }
57        _ => purge_dead_letter(store, None)?,
58    };
59
60    Ok(PurgeResult { purged_count })
61}
62
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::db::SqliteStore;
    use crate::db_inbound::{InboundPendingItem, move_pending_to_dead_letter, upsert_pending};

    use super::{purge_dead_letter_messages, replay_dead_letter_messages};

    /// Builds a minimal pending inbound item whose only varying field is `request_id`.
    fn fixture_pending(request_id: &str) -> InboundPendingItem {
        let sender = "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTD";
        let recipient = "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTE";

        InboundPendingItem {
            request_id: request_id.to_owned(),
            frame_id: String::from("frame-1"),
            from_agent_did: sender.to_owned(),
            to_agent_did: recipient.to_owned(),
            payload_json: String::from("{}"),
            payload_bytes: 2,
            received_at_ms: 1,
            next_attempt_at_ms: 1,
            attempt_count: 0,
            last_error: None,
            last_attempt_at_ms: None,
            conversation_id: None,
            reply_to: None,
        }
    }

    #[test]
    fn replay_and_purge_helpers_operate_on_dead_letter_items() {
        // Fresh on-disk database inside a temp dir that lives for the whole test.
        let scratch_dir = TempDir::new().expect("temp dir");
        let db_path = scratch_dir.path().join("db.sqlite3");
        let db = SqliteStore::open_path(db_path).expect("open db");

        // Seed one pending item and push it into the dead-letter set.
        upsert_pending(&db, fixture_pending("req-1")).expect("upsert");
        move_pending_to_dead_letter(&db, "req-1", "test").expect("dead letter");

        // Replaying with no id filter should pick up that single entry.
        let replay_outcome = replay_dead_letter_messages(&db, None).expect("replay");
        assert_eq!(replay_outcome.replayed_count, 1);

        // Dead-letter the same request again, then purge without an id filter.
        move_pending_to_dead_letter(&db, "req-1", "test-2").expect("dead letter again");
        let purge_outcome = purge_dead_letter_messages(&db, None).expect("purge");
        assert_eq!(purge_outcome.purged_count, 1);
    }
}