clawdentity_core/runtime/
replay.rs1use crate::db::SqliteStore;
2use crate::db::now_utc_ms;
3use crate::db_inbound::{list_dead_letter, purge_dead_letter, replay_dead_letter};
4use crate::error::Result;
5
/// Outcome reported by [`replay_dead_letter_messages`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplayResult {
    /// Number of `replay_dead_letter` calls that returned `true`.
    pub replayed_count: usize,
}
10
/// Outcome reported by [`purge_dead_letter_messages`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PurgeResult {
    /// Sum of the counts returned by the underlying `purge_dead_letter` calls.
    pub purged_count: usize,
}
15
16pub fn replay_dead_letter_messages(
18 store: &SqliteStore,
19 request_ids: Option<Vec<String>>,
20) -> Result<ReplayResult> {
21 let mut replayed_count = 0_usize;
22 let next_attempt_at_ms = now_utc_ms();
23
24 match request_ids {
25 Some(ids) if !ids.is_empty() => {
26 for request_id in ids {
27 if replay_dead_letter(store, &request_id, next_attempt_at_ms)? {
28 replayed_count += 1;
29 }
30 }
31 }
32 _ => {
33 for item in list_dead_letter(store, usize::MAX)? {
34 if replay_dead_letter(store, &item.request_id, next_attempt_at_ms)? {
35 replayed_count += 1;
36 }
37 }
38 }
39 }
40
41 Ok(ReplayResult { replayed_count })
42}
43
44pub fn purge_dead_letter_messages(
46 store: &SqliteStore,
47 request_ids: Option<Vec<String>>,
48) -> Result<PurgeResult> {
49 let purged_count = match request_ids {
50 Some(ids) if !ids.is_empty() => {
51 let mut total = 0_usize;
52 for request_id in ids {
53 total += purge_dead_letter(store, Some(&request_id))?;
54 }
55 total
56 }
57 _ => purge_dead_letter(store, None)?,
58 };
59
60 Ok(PurgeResult { purged_count })
61}
62
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::db::SqliteStore;
    use crate::db_inbound::{InboundPendingItem, move_pending_to_dead_letter, upsert_pending};

    use super::{purge_dead_letter_messages, replay_dead_letter_messages};

    /// Builds a minimal pending inbound item carrying the given request id.
    fn fixture_pending(request_id: &str) -> InboundPendingItem {
        let from_agent_did =
            String::from("did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTD");
        let to_agent_did =
            String::from("did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTE");

        InboundPendingItem {
            request_id: String::from(request_id),
            frame_id: String::from("frame-1"),
            from_agent_did,
            to_agent_did,
            payload_json: String::from("{}"),
            payload_bytes: 2,
            received_at_ms: 1,
            next_attempt_at_ms: 1,
            attempt_count: 0,
            last_error: None,
            last_attempt_at_ms: None,
            conversation_id: None,
            reply_to: None,
        }
    }

    /// Round trip: pending -> dead letter -> replay, then dead letter -> purge.
    #[test]
    fn replay_and_purge_helpers_operate_on_dead_letter_items() {
        let scratch_dir = TempDir::new().expect("temp dir");
        let db_path = scratch_dir.path().join("db.sqlite3");
        let store = SqliteStore::open_path(db_path).expect("open db");

        // Seed one pending item and push it into the dead-letter set.
        upsert_pending(&store, fixture_pending("req-1")).expect("upsert");
        move_pending_to_dead_letter(&store, "req-1", "test").expect("dead letter");

        // With no explicit ids, the replay helper should pick up that one item.
        let replay_outcome = replay_dead_letter_messages(&store, None).expect("replay");
        assert_eq!(replay_outcome.replayed_count, 1);

        // Dead-letter the same request again, then purge without explicit ids.
        move_pending_to_dead_letter(&store, "req-1", "test-2").expect("dead letter again");
        let purge_outcome = purge_dead_letter_messages(&store, None).expect("purge");
        assert_eq!(purge_outcome.purged_count, 1);
    }
}