1use tokio::sync::mpsc;
2use tpcp_core::TPCPEnvelope;
3
4pub struct DLQ {
6 tx: mpsc::Sender<TPCPEnvelope>,
7 rx: tokio::sync::Mutex<mpsc::Receiver<TPCPEnvelope>>,
8}
9
10impl DLQ {
11 pub fn new() -> Self {
13 let (tx, rx) = mpsc::channel(100);
14 Self { tx, rx: tokio::sync::Mutex::new(rx) }
15 }
16
17 pub fn enqueue(&self, env: TPCPEnvelope) -> bool {
19 self.tx.try_send(env).is_ok()
20 }
21
22 pub async fn drain(&self) -> Vec<TPCPEnvelope> {
24 let mut rx = self.rx.lock().await;
25 let mut out = Vec::new();
26 while let Ok(env) = rx.try_recv() {
27 out.push(env);
28 }
29 out
30 }
31}
32
33impl Default for DLQ {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39#[cfg(test)]
40mod tests {
41 use super::*;
42 use tpcp_core::{Intent, MessageHeader, PROTOCOL_VERSION, TPCPEnvelope};
43
44 fn make_envelope(id: &str) -> TPCPEnvelope {
45 TPCPEnvelope {
46 header: MessageHeader {
47 message_id: id.to_string(),
48 timestamp: "2026-01-01T00:00:00Z".to_string(),
49 sender_id: "sender".to_string(),
50 receiver_id: "receiver".to_string(),
51 intent: Intent::TaskRequest,
52 ttl: 30,
53 protocol_version: PROTOCOL_VERSION.to_string(),
54 },
55 payload: serde_json::json!({"payload_type": "text", "content": "test"}),
56 signature: None,
57 ack_info: None,
58 chunk_info: None,
59 }
60 }
61
62 #[tokio::test]
63 async fn test_enqueue_drain() {
64 let dlq = DLQ::new();
65
66 let env1 = make_envelope("msg-1");
67 let env2 = make_envelope("msg-2");
68
69 assert!(dlq.enqueue(env1), "first enqueue must succeed");
70 assert!(dlq.enqueue(env2), "second enqueue must succeed");
71
72 let drained = dlq.drain().await;
73 assert_eq!(drained.len(), 2, "drain must return all enqueued envelopes");
74 assert_eq!(drained[0].header.message_id, "msg-1");
75 assert_eq!(drained[1].header.message_id, "msg-2");
76
77 let drained_again = dlq.drain().await;
79 assert!(drained_again.is_empty(), "second drain must return nothing");
80 }
81
82 #[tokio::test]
83 async fn test_drain_empty() {
84 let dlq = DLQ::new();
85 let drained = dlq.drain().await;
86 assert!(drained.is_empty(), "draining an empty DLQ must return an empty vec");
87 }
88}