1use rusqlite::params;
2
3use crate::db::{SqliteStore, now_utc_ms};
4use crate::error::{CoreError, Result};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub struct OutboundQueueItem {
8 pub frame_id: String,
9 pub frame_version: i64,
10 pub frame_type: String,
11 pub to_agent_did: String,
12 pub payload_json: String,
13 pub conversation_id: Option<String>,
14 pub reply_to: Option<String>,
15 pub created_at_ms: i64,
16}
17
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct OutboundDeadLetterItem {
20 pub frame_id: String,
21 pub frame_version: i64,
22 pub frame_type: String,
23 pub to_agent_did: String,
24 pub payload_json: String,
25 pub conversation_id: Option<String>,
26 pub reply_to: Option<String>,
27 pub created_at_ms: i64,
28 pub dead_lettered_at_ms: i64,
29 pub dead_letter_reason: String,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct EnqueueOutboundInput {
34 pub frame_id: String,
35 pub frame_version: i64,
36 pub frame_type: String,
37 pub to_agent_did: String,
38 pub payload_json: String,
39 pub conversation_id: Option<String>,
40 pub reply_to: Option<String>,
41}
42
43fn parse_optional_non_empty(value: Option<String>) -> Option<String> {
44 value.and_then(|raw| {
45 let trimmed = raw.trim();
46 if trimmed.is_empty() {
47 None
48 } else {
49 Some(trimmed.to_string())
50 }
51 })
52}
53
54fn map_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<OutboundQueueItem> {
55 Ok(OutboundQueueItem {
56 frame_id: row.get(0)?,
57 frame_version: row.get(1)?,
58 frame_type: row.get(2)?,
59 to_agent_did: row.get(3)?,
60 payload_json: row.get(4)?,
61 conversation_id: row.get(5)?,
62 reply_to: row.get(6)?,
63 created_at_ms: row.get(7)?,
64 })
65}
66
67fn map_dead_letter_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<OutboundDeadLetterItem> {
68 Ok(OutboundDeadLetterItem {
69 frame_id: row.get(0)?,
70 frame_version: row.get(1)?,
71 frame_type: row.get(2)?,
72 to_agent_did: row.get(3)?,
73 payload_json: row.get(4)?,
74 conversation_id: row.get(5)?,
75 reply_to: row.get(6)?,
76 created_at_ms: row.get(7)?,
77 dead_lettered_at_ms: row.get(8)?,
78 dead_letter_reason: row.get(9)?,
79 })
80}
81
82pub fn enqueue_outbound(store: &SqliteStore, input: EnqueueOutboundInput) -> Result<()> {
84 let frame_id = input.frame_id.trim().to_string();
85 let frame_type = input.frame_type.trim().to_string();
86 let to_agent_did = input.to_agent_did.trim().to_string();
87 let payload_json = input.payload_json.trim().to_string();
88
89 if frame_id.is_empty() {
90 return Err(CoreError::InvalidInput("frame_id is required".to_string()));
91 }
92 if frame_type.is_empty() {
93 return Err(CoreError::InvalidInput(
94 "frame_type is required".to_string(),
95 ));
96 }
97 if to_agent_did.is_empty() {
98 return Err(CoreError::InvalidInput(
99 "to_agent_did is required".to_string(),
100 ));
101 }
102 if payload_json.is_empty() {
103 return Err(CoreError::InvalidInput(
104 "payload_json is required".to_string(),
105 ));
106 }
107
108 let conversation_id = parse_optional_non_empty(input.conversation_id);
109 let reply_to = parse_optional_non_empty(input.reply_to);
110 let created_at_ms = now_utc_ms();
111 store.with_connection(|connection| {
112 connection.execute(
113 "INSERT INTO outbound_queue (
114 frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
115 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
116 params![
117 frame_id,
118 input.frame_version,
119 frame_type,
120 to_agent_did,
121 payload_json,
122 conversation_id,
123 reply_to,
124 created_at_ms
125 ],
126 )?;
127 Ok(())
128 })
129}
130
131pub fn list_outbound(store: &SqliteStore, limit: usize) -> Result<Vec<OutboundQueueItem>> {
133 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
134 store.with_connection(|connection| {
135 let mut statement = connection.prepare(
136 "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
137 FROM outbound_queue
138 ORDER BY created_at_ms ASC, frame_id ASC
139 LIMIT ?1",
140 )?;
141 let rows = statement.query_map([limit], map_row)?;
142 let items: rusqlite::Result<Vec<OutboundQueueItem>> = rows.collect();
143 Ok(items?)
144 })
145}
146
147pub fn take_oldest_outbound(store: &SqliteStore) -> Result<Option<OutboundQueueItem>> {
149 store.with_connection(|connection| {
150 let mut statement = connection.prepare(
151 "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to, created_at_ms
152 FROM outbound_queue
153 ORDER BY created_at_ms ASC, frame_id ASC
154 LIMIT 1",
155 )?;
156 let item = statement.query_row([], map_row).ok();
157 let Some(item) = item else {
158 return Ok(None);
159 };
160 connection.execute("DELETE FROM outbound_queue WHERE frame_id = ?1", [&item.frame_id])?;
161 Ok(Some(item))
162 })
163}
164
165pub fn delete_outbound(store: &SqliteStore, frame_id: &str) -> Result<bool> {
167 let frame_id = frame_id.trim();
168 if frame_id.is_empty() {
169 return Ok(false);
170 }
171 store.with_connection(|connection| {
172 let affected =
173 connection.execute("DELETE FROM outbound_queue WHERE frame_id = ?1", [frame_id])?;
174 Ok(affected > 0)
175 })
176}
177
178pub fn move_outbound_to_dead_letter(
180 store: &SqliteStore,
181 item: &OutboundQueueItem,
182 dead_letter_reason: &str,
183) -> Result<()> {
184 let reason = dead_letter_reason.trim();
185 if reason.is_empty() {
186 return Err(CoreError::InvalidInput(
187 "dead_letter_reason is required".to_string(),
188 ));
189 }
190
191 let dead_lettered_at_ms = now_utc_ms();
192 store.with_connection(|connection| {
193 connection.execute(
194 "INSERT INTO outbound_dead_letter (
195 frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to,
196 created_at_ms, dead_lettered_at_ms, dead_letter_reason
197 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
198 ON CONFLICT(frame_id) DO UPDATE SET
199 frame_version = excluded.frame_version,
200 frame_type = excluded.frame_type,
201 to_agent_did = excluded.to_agent_did,
202 payload_json = excluded.payload_json,
203 conversation_id = excluded.conversation_id,
204 reply_to = excluded.reply_to,
205 created_at_ms = excluded.created_at_ms,
206 dead_lettered_at_ms = excluded.dead_lettered_at_ms,
207 dead_letter_reason = excluded.dead_letter_reason",
208 params![
209 &item.frame_id,
210 item.frame_version,
211 &item.frame_type,
212 &item.to_agent_did,
213 &item.payload_json,
214 &item.conversation_id,
215 &item.reply_to,
216 item.created_at_ms,
217 dead_lettered_at_ms,
218 reason
219 ],
220 )?;
221 Ok(())
222 })
223}
224
225pub fn outbound_count(store: &SqliteStore) -> Result<i64> {
227 store.with_connection(|connection| {
228 let count =
229 connection.query_row("SELECT COUNT(*) FROM outbound_queue", [], |row| row.get(0))?;
230 Ok(count)
231 })
232}
233
234pub fn list_outbound_dead_letter(
236 store: &SqliteStore,
237 limit: usize,
238) -> Result<Vec<OutboundDeadLetterItem>> {
239 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
240 store.with_connection(|connection| {
241 let mut statement = connection.prepare(
242 "SELECT frame_id, frame_version, frame_type, to_agent_did, payload_json, conversation_id, reply_to,
243 created_at_ms, dead_lettered_at_ms, dead_letter_reason
244 FROM outbound_dead_letter
245 ORDER BY dead_lettered_at_ms DESC, frame_id DESC
246 LIMIT ?1",
247 )?;
248 let rows = statement.query_map([limit], map_dead_letter_row)?;
249 let items: rusqlite::Result<Vec<OutboundDeadLetterItem>> = rows.collect();
250 Ok(items?)
251 })
252}
253
254pub fn outbound_dead_letter_count(store: &SqliteStore) -> Result<i64> {
256 store.with_connection(|connection| {
257 let count =
258 connection.query_row("SELECT COUNT(*) FROM outbound_dead_letter", [], |row| {
259 row.get(0)
260 })?;
261 Ok(count)
262 })
263}
264
265#[cfg(test)]
266mod tests {
267 use tempfile::TempDir;
268
269 use crate::db::SqliteStore;
270
271 use super::{
272 EnqueueOutboundInput, delete_outbound, enqueue_outbound, list_outbound,
273 list_outbound_dead_letter, move_outbound_to_dead_letter, outbound_count,
274 outbound_dead_letter_count, take_oldest_outbound,
275 };
276
277 #[test]
278 fn enqueue_take_and_delete_outbound_items() {
279 let temp = TempDir::new().expect("temp dir");
280 let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
281
282 enqueue_outbound(
283 &store,
284 EnqueueOutboundInput {
285 frame_id: "frame-1".to_string(),
286 frame_version: 1,
287 frame_type: "relay.frame".to_string(),
288 to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
289 .to_string(),
290 payload_json: "{\"hello\":\"world\"}".to_string(),
291 conversation_id: Some("conv-1".to_string()),
292 reply_to: None,
293 },
294 )
295 .expect("enqueue 1");
296 enqueue_outbound(
297 &store,
298 EnqueueOutboundInput {
299 frame_id: "frame-2".to_string(),
300 frame_version: 1,
301 frame_type: "relay.frame".to_string(),
302 to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT5"
303 .to_string(),
304 payload_json: "{\"hi\":\"there\"}".to_string(),
305 conversation_id: None,
306 reply_to: None,
307 },
308 )
309 .expect("enqueue 2");
310
311 assert_eq!(outbound_count(&store).expect("count"), 2);
312 assert_eq!(list_outbound(&store, 10).expect("list").len(), 2);
313
314 let oldest = take_oldest_outbound(&store).expect("take").expect("oldest");
315 assert_eq!(oldest.frame_id, "frame-1");
316 assert_eq!(outbound_count(&store).expect("count after take"), 1);
317
318 let deleted = delete_outbound(&store, "frame-2").expect("delete");
319 assert!(deleted);
320 assert_eq!(outbound_count(&store).expect("count after delete"), 0);
321 }
322
323 #[test]
324 fn moves_outbound_item_to_dead_letter() {
325 let temp = TempDir::new().expect("temp dir");
326 let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
327
328 enqueue_outbound(
329 &store,
330 EnqueueOutboundInput {
331 frame_id: "frame-1".to_string(),
332 frame_version: 1,
333 frame_type: "relay.frame".to_string(),
334 to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXT4"
335 .to_string(),
336 payload_json: "{\"broken\":\"json\"}".to_string(),
337 conversation_id: None,
338 reply_to: None,
339 },
340 )
341 .expect("enqueue");
342 let item = take_oldest_outbound(&store).expect("take").expect("item");
343 move_outbound_to_dead_letter(&store, &item, "malformed outbound payload").expect("move");
344
345 assert_eq!(outbound_count(&store).expect("queue count"), 0);
346 assert_eq!(
347 outbound_dead_letter_count(&store).expect("dead letter count"),
348 1
349 );
350 let dead_letters = list_outbound_dead_letter(&store, 10).expect("dead letters");
351 assert_eq!(dead_letters.len(), 1);
352 assert_eq!(dead_letters[0].frame_id, "frame-1");
353 assert_eq!(
354 dead_letters[0].dead_letter_reason,
355 "malformed outbound payload"
356 );
357 }
358}