1use rusqlite::{OptionalExtension, params};
2use serde::Serialize;
3
4use crate::db::{SqliteStore, now_utc_ms};
5use crate::error::{CoreError, Result};
6
/// One row of the `inbound_pending` table: an inbound request that is queued for a
/// (re)delivery attempt.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct InboundPendingItem {
    /// Key of the row; `upsert_pending` resolves conflicts on this column and rejects
    /// blank values.
    pub request_id: String,
    /// Frame identifier stored with the request; `upsert_pending` rejects blank values.
    pub frame_id: String,
    /// Sender agent DID; `upsert_pending` rejects blank values.
    pub from_agent_did: String,
    /// Recipient agent DID; `upsert_pending` rejects blank values.
    pub to_agent_did: String,
    /// Payload stored as text (presumably a JSON document; this module never parses it).
    pub payload_json: String,
    /// Caller-supplied payload size; must be `>= 0`. It is not cross-checked against
    /// `payload_json` in this module.
    pub payload_bytes: i64,
    /// Receipt timestamp (presumably epoch milliseconds); secondary sort key of
    /// `list_pending_due`.
    pub received_at_ms: i64,
    /// The row is returned by `list_pending_due` once this value is `<=` the cutoff passed in.
    pub next_attempt_at_ms: i64,
    /// Number of attempts so far; must be `>= 0` and is incremented by `mark_pending_attempt`.
    pub attempt_count: i64,
    /// Error text recorded by the latest `mark_pending_attempt`, if any.
    pub last_error: Option<String>,
    /// Value of `now_utc_ms()` at the latest `mark_pending_attempt`, if any.
    pub last_attempt_at_ms: Option<i64>,
    /// Optional conversation identifier; blank values are stored as `NULL` by `upsert_pending`.
    pub conversation_id: Option<String>,
    /// Optional reply target; blank values are stored as `NULL` by `upsert_pending`.
    pub reply_to: Option<String>,
}
23
/// One row of the `inbound_dead_letter` table: a request that was moved out of
/// `inbound_pending` by `move_pending_to_dead_letter`.
///
/// Carries the same columns as [`InboundPendingItem`] except `next_attempt_at_ms`, plus
/// the dead-letter timestamp and reason.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct InboundDeadLetterItem {
    /// Key of the row; conflicts on insert are resolved on this column.
    pub request_id: String,
    /// Frame identifier copied from the pending row.
    pub frame_id: String,
    /// Sender agent DID copied from the pending row.
    pub from_agent_did: String,
    /// Recipient agent DID copied from the pending row.
    pub to_agent_did: String,
    /// Payload text copied from the pending row.
    pub payload_json: String,
    /// Payload size copied from the pending row.
    pub payload_bytes: i64,
    /// Receipt timestamp copied from the pending row.
    pub received_at_ms: i64,
    /// Attempt counter copied from the pending row.
    pub attempt_count: i64,
    /// Last recorded error copied from the pending row, if any.
    pub last_error: Option<String>,
    /// Timestamp of the last attempt copied from the pending row, if any.
    pub last_attempt_at_ms: Option<i64>,
    /// Optional conversation identifier copied from the pending row.
    pub conversation_id: Option<String>,
    /// Optional reply target copied from the pending row.
    pub reply_to: Option<String>,
    /// Value of `now_utc_ms()` taken when the row was moved to the dead-letter table.
    pub dead_lettered_at_ms: i64,
    /// Trimmed, non-blank reason supplied to `move_pending_to_dead_letter`.
    pub dead_letter_reason: String,
}
41
/// One row of the `inbound_events` table, as returned by `list_inbound_events`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct InboundEvent {
    /// Row id of the event; `list_inbound_events` orders by it, newest first.
    pub id: i64,
    /// Value of `now_utc_ms()` taken when the event was inserted.
    pub at_ms: i64,
    /// Event name, e.g. `"dead_lettered"` or `"dead_letter_replayed"` as written by this module.
    pub event_type: String,
    /// Request the event refers to, if any; blank values are stored as `NULL`.
    pub request_id: Option<String>,
    /// Optional detail text (presumably JSON; not parsed here); blank values are stored as `NULL`.
    pub details_json: Option<String>,
}
50
/// Normalizes an optional string: surrounding whitespace is trimmed and a value that
/// is empty after trimming becomes `None`.
fn parse_optional_non_empty(value: Option<String>) -> Option<String> {
    value
        .as_deref()
        .map(str::trim)
        .filter(|text| !text.is_empty())
        .map(str::to_owned)
}
61
62fn map_pending_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<InboundPendingItem> {
63 Ok(InboundPendingItem {
64 request_id: row.get(0)?,
65 frame_id: row.get(1)?,
66 from_agent_did: row.get(2)?,
67 to_agent_did: row.get(3)?,
68 payload_json: row.get(4)?,
69 payload_bytes: row.get(5)?,
70 received_at_ms: row.get(6)?,
71 next_attempt_at_ms: row.get(7)?,
72 attempt_count: row.get(8)?,
73 last_error: row.get(9)?,
74 last_attempt_at_ms: row.get(10)?,
75 conversation_id: row.get(11)?,
76 reply_to: row.get(12)?,
77 })
78}
79
80fn map_dead_letter_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<InboundDeadLetterItem> {
81 Ok(InboundDeadLetterItem {
82 request_id: row.get(0)?,
83 frame_id: row.get(1)?,
84 from_agent_did: row.get(2)?,
85 to_agent_did: row.get(3)?,
86 payload_json: row.get(4)?,
87 payload_bytes: row.get(5)?,
88 received_at_ms: row.get(6)?,
89 attempt_count: row.get(7)?,
90 last_error: row.get(8)?,
91 last_attempt_at_ms: row.get(9)?,
92 conversation_id: row.get(10)?,
93 reply_to: row.get(11)?,
94 dead_lettered_at_ms: row.get(12)?,
95 dead_letter_reason: row.get(13)?,
96 })
97}
98
99#[allow(clippy::too_many_lines)]
101pub fn upsert_pending(store: &SqliteStore, item: InboundPendingItem) -> Result<()> {
102 if item.request_id.trim().is_empty() {
103 return Err(CoreError::InvalidInput(
104 "request_id is required".to_string(),
105 ));
106 }
107 if item.frame_id.trim().is_empty() {
108 return Err(CoreError::InvalidInput("frame_id is required".to_string()));
109 }
110 if item.from_agent_did.trim().is_empty() || item.to_agent_did.trim().is_empty() {
111 return Err(CoreError::InvalidInput(
112 "from_agent_did and to_agent_did are required".to_string(),
113 ));
114 }
115 if item.payload_json.trim().is_empty() {
116 return Err(CoreError::InvalidInput(
117 "payload_json is required".to_string(),
118 ));
119 }
120 if item.payload_bytes < 0 || item.attempt_count < 0 {
121 return Err(CoreError::InvalidInput(
122 "payload_bytes and attempt_count must be >= 0".to_string(),
123 ));
124 }
125
126 store.with_connection(|connection| {
127 connection.execute(
128 "INSERT INTO inbound_pending (
129 request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
130 received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
131 conversation_id, reply_to
132 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
133 ON CONFLICT(request_id) DO UPDATE SET
134 frame_id = excluded.frame_id,
135 from_agent_did = excluded.from_agent_did,
136 to_agent_did = excluded.to_agent_did,
137 payload_json = excluded.payload_json,
138 payload_bytes = excluded.payload_bytes,
139 received_at_ms = excluded.received_at_ms,
140 next_attempt_at_ms = excluded.next_attempt_at_ms,
141 attempt_count = excluded.attempt_count,
142 last_error = excluded.last_error,
143 last_attempt_at_ms = excluded.last_attempt_at_ms,
144 conversation_id = excluded.conversation_id,
145 reply_to = excluded.reply_to",
146 params![
147 item.request_id.trim(),
148 item.frame_id.trim(),
149 item.from_agent_did.trim(),
150 item.to_agent_did.trim(),
151 item.payload_json.trim(),
152 item.payload_bytes,
153 item.received_at_ms,
154 item.next_attempt_at_ms,
155 item.attempt_count,
156 parse_optional_non_empty(item.last_error),
157 item.last_attempt_at_ms,
158 parse_optional_non_empty(item.conversation_id),
159 parse_optional_non_empty(item.reply_to),
160 ],
161 )?;
162 Ok(())
163 })
164}
165
166pub fn list_pending_due(
168 store: &SqliteStore,
169 at_or_before_ms: i64,
170 limit: usize,
171) -> Result<Vec<InboundPendingItem>> {
172 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
173 store.with_connection(|connection| {
174 let mut statement = connection.prepare(
175 "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
176 received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
177 conversation_id, reply_to
178 FROM inbound_pending
179 WHERE next_attempt_at_ms <= ?1
180 ORDER BY next_attempt_at_ms ASC, received_at_ms ASC
181 LIMIT ?2",
182 )?;
183 let rows = statement.query_map(params![at_or_before_ms, limit], map_pending_row)?;
184 let items: rusqlite::Result<Vec<InboundPendingItem>> = rows.collect();
185 Ok(items?)
186 })
187}
188
189pub fn pending_count(store: &SqliteStore) -> Result<i64> {
191 store.with_connection(|connection| {
192 let count =
193 connection.query_row("SELECT COUNT(*) FROM inbound_pending", [], |row| row.get(0))?;
194 Ok(count)
195 })
196}
197
198pub fn get_pending(store: &SqliteStore, request_id: &str) -> Result<Option<InboundPendingItem>> {
200 let request_id = request_id.trim();
201 if request_id.is_empty() {
202 return Ok(None);
203 }
204 store.with_connection(|connection| {
205 let mut statement = connection.prepare(
206 "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
207 received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
208 conversation_id, reply_to
209 FROM inbound_pending
210 WHERE request_id = ?1",
211 )?;
212 let item = statement.query_row([request_id], map_pending_row).optional()?;
213 Ok(item)
214 })
215}
216
217pub fn mark_pending_attempt(
219 store: &SqliteStore,
220 request_id: &str,
221 next_attempt_at_ms: i64,
222 last_error: Option<String>,
223) -> Result<bool> {
224 let request_id = request_id.trim();
225 if request_id.is_empty() {
226 return Ok(false);
227 }
228 let last_error = parse_optional_non_empty(last_error);
229 let last_attempt_at_ms = now_utc_ms();
230 store.with_connection(|connection| {
231 let affected = connection.execute(
232 "UPDATE inbound_pending
233 SET attempt_count = attempt_count + 1,
234 next_attempt_at_ms = ?2,
235 last_error = ?3,
236 last_attempt_at_ms = ?4
237 WHERE request_id = ?1",
238 params![
239 request_id,
240 next_attempt_at_ms,
241 last_error,
242 last_attempt_at_ms
243 ],
244 )?;
245 Ok(affected > 0)
246 })
247}
248
249#[allow(clippy::too_many_lines)]
251pub fn move_pending_to_dead_letter(
252 store: &SqliteStore,
253 request_id: &str,
254 dead_letter_reason: &str,
255) -> Result<bool> {
256 let request_id = request_id.trim();
257 let dead_letter_reason = dead_letter_reason.trim();
258 if request_id.is_empty() {
259 return Ok(false);
260 }
261 if dead_letter_reason.is_empty() {
262 return Err(CoreError::InvalidInput(
263 "dead_letter_reason is required".to_string(),
264 ));
265 }
266
267 store.with_connection(|connection| {
268 let mut select_statement = connection.prepare(
269 "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
270 received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
271 conversation_id, reply_to
272 FROM inbound_pending
273 WHERE request_id = ?1",
274 )?;
275 let pending = select_statement
276 .query_row([request_id], map_pending_row)
277 .optional()?;
278 let Some(pending) = pending else {
279 return Ok(false);
280 };
281
282 let dead_lettered_at_ms = now_utc_ms();
283 connection.execute(
284 "INSERT INTO inbound_dead_letter (
285 request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
286 received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
287 dead_lettered_at_ms, dead_letter_reason
288 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
289 ON CONFLICT(request_id) DO UPDATE SET
290 frame_id = excluded.frame_id,
291 from_agent_did = excluded.from_agent_did,
292 to_agent_did = excluded.to_agent_did,
293 payload_json = excluded.payload_json,
294 payload_bytes = excluded.payload_bytes,
295 received_at_ms = excluded.received_at_ms,
296 attempt_count = excluded.attempt_count,
297 last_error = excluded.last_error,
298 last_attempt_at_ms = excluded.last_attempt_at_ms,
299 conversation_id = excluded.conversation_id,
300 reply_to = excluded.reply_to,
301 dead_lettered_at_ms = excluded.dead_lettered_at_ms,
302 dead_letter_reason = excluded.dead_letter_reason",
303 params![
304 pending.request_id,
305 pending.frame_id,
306 pending.from_agent_did,
307 pending.to_agent_did,
308 pending.payload_json,
309 pending.payload_bytes,
310 pending.received_at_ms,
311 pending.attempt_count,
312 pending.last_error,
313 pending.last_attempt_at_ms,
314 pending.conversation_id,
315 pending.reply_to,
316 dead_lettered_at_ms,
317 dead_letter_reason
318 ],
319 )?;
320
321 connection.execute(
322 "DELETE FROM inbound_pending WHERE request_id = ?1",
323 [request_id],
324 )?;
325
326 append_inbound_event_with_connection(
327 connection,
328 "dead_lettered",
329 Some(request_id.to_string()),
330 Some(
331 serde_json::json!({
332 "reason": dead_letter_reason,
333 "deadLetteredAtMs": dead_lettered_at_ms,
334 })
335 .to_string(),
336 ),
337 )?;
338 Ok(true)
339 })
340}
341
342pub fn list_dead_letter(store: &SqliteStore, limit: usize) -> Result<Vec<InboundDeadLetterItem>> {
344 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
345 store.with_connection(|connection| {
346 let mut statement = connection.prepare(
347 "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
348 received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
349 dead_lettered_at_ms, dead_letter_reason
350 FROM inbound_dead_letter
351 ORDER BY dead_lettered_at_ms DESC, request_id DESC
352 LIMIT ?1",
353 )?;
354 let rows = statement.query_map([limit], map_dead_letter_row)?;
355 let items: rusqlite::Result<Vec<InboundDeadLetterItem>> = rows.collect();
356 Ok(items?)
357 })
358}
359
360pub fn dead_letter_count(store: &SqliteStore) -> Result<i64> {
362 store.with_connection(|connection| {
363 let count =
364 connection.query_row("SELECT COUNT(*) FROM inbound_dead_letter", [], |row| {
365 row.get(0)
366 })?;
367 Ok(count)
368 })
369}
370
371#[allow(clippy::too_many_lines)]
373pub fn replay_dead_letter(
374 store: &SqliteStore,
375 request_id: &str,
376 next_attempt_at_ms: i64,
377) -> Result<bool> {
378 let request_id = request_id.trim();
379 if request_id.is_empty() {
380 return Ok(false);
381 }
382 store.with_connection(|connection| {
383 let mut statement = connection.prepare(
384 "SELECT request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
385 received_at_ms, attempt_count, last_error, last_attempt_at_ms, conversation_id, reply_to,
386 dead_lettered_at_ms, dead_letter_reason
387 FROM inbound_dead_letter
388 WHERE request_id = ?1",
389 )?;
390 let dead_letter = statement
391 .query_row([request_id], map_dead_letter_row)
392 .optional()?;
393 let Some(dead_letter) = dead_letter else {
394 return Ok(false);
395 };
396
397 connection.execute(
398 "INSERT INTO inbound_pending (
399 request_id, frame_id, from_agent_did, to_agent_did, payload_json, payload_bytes,
400 received_at_ms, next_attempt_at_ms, attempt_count, last_error, last_attempt_at_ms,
401 conversation_id, reply_to
402 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
403 ON CONFLICT(request_id) DO UPDATE SET
404 frame_id = excluded.frame_id,
405 from_agent_did = excluded.from_agent_did,
406 to_agent_did = excluded.to_agent_did,
407 payload_json = excluded.payload_json,
408 payload_bytes = excluded.payload_bytes,
409 received_at_ms = excluded.received_at_ms,
410 next_attempt_at_ms = excluded.next_attempt_at_ms,
411 attempt_count = excluded.attempt_count,
412 last_error = excluded.last_error,
413 last_attempt_at_ms = excluded.last_attempt_at_ms,
414 conversation_id = excluded.conversation_id,
415 reply_to = excluded.reply_to",
416 params![
417 dead_letter.request_id,
418 dead_letter.frame_id,
419 dead_letter.from_agent_did,
420 dead_letter.to_agent_did,
421 dead_letter.payload_json,
422 dead_letter.payload_bytes,
423 dead_letter.received_at_ms,
424 next_attempt_at_ms,
425 dead_letter.attempt_count,
426 dead_letter.last_error,
427 dead_letter.last_attempt_at_ms,
428 dead_letter.conversation_id,
429 dead_letter.reply_to,
430 ],
431 )?;
432 connection.execute(
433 "DELETE FROM inbound_dead_letter WHERE request_id = ?1",
434 [request_id],
435 )?;
436 append_inbound_event_with_connection(
437 connection,
438 "dead_letter_replayed",
439 Some(request_id.to_string()),
440 None,
441 )?;
442 Ok(true)
443 })
444}
445
446pub fn purge_dead_letter(store: &SqliteStore, request_id: Option<&str>) -> Result<usize> {
448 store.with_connection(|connection| {
449 let deleted = if let Some(request_id) = request_id {
450 let request_id = request_id.trim();
451 if request_id.is_empty() {
452 0
453 } else {
454 connection.execute(
455 "DELETE FROM inbound_dead_letter WHERE request_id = ?1",
456 [request_id],
457 )?
458 }
459 } else {
460 connection.execute("DELETE FROM inbound_dead_letter", [])?
461 };
462 Ok(deleted)
463 })
464}
465
466pub fn append_inbound_event(
468 store: &SqliteStore,
469 event_type: &str,
470 request_id: Option<String>,
471 details_json: Option<String>,
472) -> Result<i64> {
473 let event_type = event_type.trim().to_string();
474 if event_type.is_empty() {
475 return Err(CoreError::InvalidInput(
476 "event_type is required".to_string(),
477 ));
478 }
479 store.with_connection(|connection| {
480 append_inbound_event_with_connection(
481 connection,
482 &event_type,
483 request_id,
484 parse_optional_non_empty(details_json),
485 )
486 })
487}
488
489fn append_inbound_event_with_connection(
490 connection: &rusqlite::Connection,
491 event_type: &str,
492 request_id: Option<String>,
493 details_json: Option<String>,
494) -> Result<i64> {
495 let now_ms = now_utc_ms();
496 connection.execute(
497 "INSERT INTO inbound_events (at_ms, event_type, request_id, details_json)
498 VALUES (?1, ?2, ?3, ?4)",
499 params![
500 now_ms,
501 event_type,
502 parse_optional_non_empty(request_id),
503 parse_optional_non_empty(details_json),
504 ],
505 )?;
506 Ok(connection.last_insert_rowid())
507}
508
509pub fn list_inbound_events(store: &SqliteStore, limit: usize) -> Result<Vec<InboundEvent>> {
511 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
512 store.with_connection(|connection| {
513 let mut statement = connection.prepare(
514 "SELECT id, at_ms, event_type, request_id, details_json
515 FROM inbound_events
516 ORDER BY id DESC
517 LIMIT ?1",
518 )?;
519 let rows = statement.query_map([limit], |row| {
520 Ok(InboundEvent {
521 id: row.get(0)?,
522 at_ms: row.get(1)?,
523 event_type: row.get(2)?,
524 request_id: row.get(3)?,
525 details_json: row.get(4)?,
526 })
527 })?;
528 let items: rusqlite::Result<Vec<InboundEvent>> = rows.collect();
529 Ok(items?)
530 })
531}
532
// Integration-style tests that run the public functions of this module against a
// SQLite database file created in a temporary directory.
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use crate::db::SqliteStore;

    use super::{
        InboundPendingItem, append_inbound_event, get_pending, list_dead_letter,
        list_inbound_events, list_pending_due, mark_pending_attempt, move_pending_to_dead_letter,
        purge_dead_letter, replay_dead_letter, upsert_pending,
    };

    /// Builds a valid pending item with the given `request_id`, no recorded attempts,
    /// and both `received_at_ms` and `next_attempt_at_ms` set to 100.
    fn fixture_pending(request_id: &str) -> InboundPendingItem {
        InboundPendingItem {
            request_id: request_id.to_string(),
            frame_id: "frame-1".to_string(),
            from_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTD"
                .to_string(),
            to_agent_did: "did:cdi:registry.clawdentity.com:agent:01HF7YAT00W6W7CM7N3W5FDXTE"
                .to_string(),
            payload_json: "{\"message\":\"hello\"}".to_string(),
            payload_bytes: 20,
            received_at_ms: 100,
            next_attempt_at_ms: 100,
            attempt_count: 0,
            last_error: None,
            last_attempt_at_ms: None,
            conversation_id: Some("conv-1".to_string()),
            reply_to: None,
        }
    }

    /// Walks one request through pending -> attempt recorded -> dead letter -> replayed.
    #[test]
    fn pending_dead_letter_and_replay_round_trip() {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");

        // The item is due because its `next_attempt_at_ms` (100) equals the cutoff.
        upsert_pending(&store, fixture_pending("req-1")).expect("upsert");
        let due = list_pending_due(&store, 100, 10).expect("due");
        assert_eq!(due.len(), 1);

        // Recording an attempt bumps the counter and stores the error text.
        let marked = mark_pending_attempt(&store, "req-1", 200, Some("retry failed".to_string()))
            .expect("mark");
        assert!(marked);
        let pending = get_pending(&store, "req-1")
            .expect("get pending")
            .expect("pending");
        assert_eq!(pending.attempt_count, 1);
        assert_eq!(pending.last_error.as_deref(), Some("retry failed"));

        // Dead-lettering removes the row from pending and adds it to the dead-letter table.
        let moved = move_pending_to_dead_letter(&store, "req-1", "max-attempts-exceeded")
            .expect("move dead letter");
        assert!(moved);
        assert!(
            get_pending(&store, "req-1")
                .expect("get pending none")
                .is_none()
        );
        assert_eq!(list_dead_letter(&store, 10).expect("dead letters").len(), 1);

        // Replaying moves the row back to pending and empties the dead-letter table.
        let replayed = replay_dead_letter(&store, "req-1", 300).expect("replay");
        assert!(replayed);
        assert_eq!(
            list_dead_letter(&store, 10)
                .expect("dead letters after")
                .len(),
            0
        );
        assert!(
            get_pending(&store, "req-1")
                .expect("get pending after")
                .is_some()
        );

        // Nothing is left to purge once the only dead letter has been replayed.
        let purged_none = purge_dead_letter(&store, None).expect("purge none");
        assert_eq!(purged_none, 0);
    }

    /// Appends one event and checks that it is listed back with the same type and request id.
    #[test]
    fn append_and_list_events() {
        let temp = TempDir::new().expect("temp dir");
        let store = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");

        let inserted_id = append_inbound_event(
            &store,
            "received",
            Some("req-123".to_string()),
            Some("{\"ok\":true}".to_string()),
        )
        .expect("append event");
        assert!(inserted_id > 0);

        let events = list_inbound_events(&store, 10).expect("list events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, "received");
        assert_eq!(events[0].request_id.as_deref(), Some("req-123"));
    }
}