Skip to main content

tpcp_std/
dlq.rs

1use tokio::sync::mpsc;
2use tpcp_core::TPCPEnvelope;
3
4/// Dead Letter Queue for unhandled TPCP envelopes.
5pub struct DLQ {
6    tx: mpsc::Sender<TPCPEnvelope>,
7    rx: tokio::sync::Mutex<mpsc::Receiver<TPCPEnvelope>>,
8}
9
10impl DLQ {
11    /// Creates a DLQ with capacity 100.
12    pub fn new() -> Self {
13        let (tx, rx) = mpsc::channel(100);
14        Self { tx, rx: tokio::sync::Mutex::new(rx) }
15    }
16
17    /// Enqueues an envelope. Returns false if the queue is full.
18    pub fn enqueue(&self, env: TPCPEnvelope) -> bool {
19        self.tx.try_send(env).is_ok()
20    }
21
22    /// Drains all queued envelopes.
23    pub async fn drain(&self) -> Vec<TPCPEnvelope> {
24        let mut rx = self.rx.lock().await;
25        let mut out = Vec::new();
26        while let Ok(env) = rx.try_recv() {
27            out.push(env);
28        }
29        out
30    }
31}
32
33impl Default for DLQ {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39#[cfg(test)]
40mod tests {
41    use super::*;
42    use tpcp_core::{Intent, MessageHeader, PROTOCOL_VERSION, TPCPEnvelope};
43
44    fn make_envelope(id: &str) -> TPCPEnvelope {
45        TPCPEnvelope {
46            header: MessageHeader {
47                message_id: id.to_string(),
48                timestamp: "2026-01-01T00:00:00Z".to_string(),
49                sender_id: "sender".to_string(),
50                receiver_id: "receiver".to_string(),
51                intent: Intent::TaskRequest,
52                ttl: 30,
53                protocol_version: PROTOCOL_VERSION.to_string(),
54            },
55            payload: serde_json::json!({"payload_type": "text", "content": "test"}),
56            signature: None,
57            ack_info: None,
58            chunk_info: None,
59        }
60    }
61
62    #[tokio::test]
63    async fn test_enqueue_drain() {
64        let dlq = DLQ::new();
65
66        let env1 = make_envelope("msg-1");
67        let env2 = make_envelope("msg-2");
68
69        assert!(dlq.enqueue(env1), "first enqueue must succeed");
70        assert!(dlq.enqueue(env2), "second enqueue must succeed");
71
72        let drained = dlq.drain().await;
73        assert_eq!(drained.len(), 2, "drain must return all enqueued envelopes");
74        assert_eq!(drained[0].header.message_id, "msg-1");
75        assert_eq!(drained[1].header.message_id, "msg-2");
76
77        // After draining, queue is empty.
78        let drained_again = dlq.drain().await;
79        assert!(drained_again.is_empty(), "second drain must return nothing");
80    }
81
82    #[tokio::test]
83    async fn test_drain_empty() {
84        let dlq = DLQ::new();
85        let drained = dlq.drain().await;
86        assert!(drained.is_empty(), "draining an empty DLQ must return an empty vec");
87    }
88}