allora_core/channel/
queue.rs

1//! QueueChannel: buffered FIFO channel implementation.
2//!
3//! Provides queued message storage with optional correlation id utilities.
4//! In sync mode, operations are protected by a std::sync::Mutex over a VecDeque; in async
5//! mode, a Tokio Mutex guards a Vec allowing awaitable dequeue operations.
6//!
7//! Features:
8//! * FIFO ordering
9//! * Non-blocking try_receive (sync + async variants)
10//! * Optional blocking receiver with timeout (sync mode)
11//! * Correlation ID generation & lookup helpers via `CorrelationSupport`
12//! * `async` feature for Tokio-based async support
13
14use super::log::{log_dequeued, log_empty, log_receive, log_send_enqueued};
15use super::{Channel, CorrelationSupport, PollableChannel};
16use crate::error::Result;
17use crate::Exchange;
18use std::any::Any;
19use std::collections::VecDeque;
20use std::sync::{
21    atomic::{AtomicU64, Ordering},
22    Arc,
23};
24use tokio::sync::Mutex;
25use tracing::trace;
26
27type InnerQueue = VecDeque<Exchange>;
28
29#[derive(Clone, Debug)]
30pub struct QueueChannel {
31    id: String,
32    out_queue: Arc<Mutex<InnerQueue>>,
33    corr_seq: Arc<AtomicU64>,
34}
35
36impl QueueChannel {
37    // ========================================================================
38    // Constructors (associated functions)
39    // ========================================================================
40
41    /// Create a new queue channel with an explicit identifier.
42    pub fn with_id<S: Into<String>>(id: S) -> Self {
43        Self {
44            id: id.into(),
45            out_queue: Arc::new(Mutex::new(InnerQueue::default())),
46            corr_seq: Arc::new(AtomicU64::new(1)),
47        }
48    }
49
50    /// Create a new queue channel with a randomly generated UUID-based identifier.
51    pub fn with_random_id() -> Self {
52        Self::with_id(format!("queue:{}", uuid::Uuid::new_v4()))
53    }
54
55    // ========================================================================
56    // Public methods
57    // ========================================================================
58
59    /// Returns the channel identifier.
60    pub fn id(&self) -> &str {
61        &self.id
62    }
63
64    // ========================================================================
65    // Private helper methods
66    // ========================================================================
67
68    /// Generate the next correlation ID for this channel.
69    pub(crate) fn next_corr_id(&self) -> String {
70        let id = format!("c{}", self.corr_seq.fetch_add(1, Ordering::Relaxed));
71        trace!(target: "allora::channel",channel_id = %self.id, corr_id = %id, "generated correlation id");
72        id
73    }
74
75    /// Ensure the exchange has a correlation ID, reusing existing or generating new.
76    pub(crate) fn ensure_correlation(&self, exchange: &mut Exchange) -> String {
77        if let Some(id) = exchange.in_msg.header("corr_id") {
78            let id_str = id.to_string();
79            trace!(target: "allora::channel",channel_id = %self.id, corr_id = %id_str, "reusing existing corr_id");
80            if exchange.in_msg.header("correlation_id").is_none() {
81                exchange.in_msg.set_header("correlation_id", &id_str);
82            }
83            id_str
84        } else {
85            let id = self.next_corr_id();
86            trace!(channel_id = %self.id, corr_id = %id, "assigned new corr_id");
87            exchange.in_msg.set_header("corr_id", &id);
88            if exchange.in_msg.header("correlation_id").is_none() {
89                exchange.in_msg.set_header("correlation_id", &id);
90            }
91            id
92        }
93    }
94
95    pub(crate) async fn push(&self, ex: Exchange) {
96        let mut g = self.out_queue.lock().await;
97        g.push_back(ex);
98    }
99
100    // Reusable enqueue helpers (reduce duplication in trait impls)
101    async fn enqueue(&self, exchange: Exchange, corr_id: Option<&str>) -> Result<()> {
102        log_send_enqueued(self.id(), &exchange, true, corr_id);
103        self.push(exchange).await;
104        Ok(())
105    }
106}
107
108// ============================================================================
109// Trait implementations
110// ============================================================================
111
112// Channel trait - core send/receive interface
113#[async_trait::async_trait]
114impl Channel for QueueChannel {
115    // --------------------------------------------------------------------
116    // Identity & metadata
117    // --------------------------------------------------------------------
118    fn id(&self) -> &str {
119        &self.id
120    }
121    // --------------------------------------------------------------------
122    // Send operations
123    // --------------------------------------------------------------------
124    async fn send(&self, exchange: Exchange) -> Result<()> {
125        self.enqueue(exchange, None).await
126    }
127
128    fn kind(&self) -> &'static str {
129        "queue"
130    }
131    fn as_any(&self) -> &dyn Any {
132        self
133    }
134}
135
136// PollableChannel trait - queue/dequeue operations
137#[async_trait::async_trait]
138impl PollableChannel for QueueChannel {
139    // --------------------------------------------------------------------
140    // Receive
141    // --------------------------------------------------------------------
142    async fn try_receive(&self) -> Option<Exchange> {
143        let mut g = self.out_queue.lock().await;
144        if g.is_empty() {
145            log_empty(self.id(), "try_receive", true, Some(g.len()), None);
146            return None;
147        }
148        let exchange = g.pop_front().unwrap();
149        log_dequeued(
150            self.id(),
151            "try_receive",
152            true,
153            &exchange,
154            Some(g.len()),
155            None,
156        );
157        Some(exchange)
158    }
159}
160
161// CorrelationSupport trait - request/reply correlation patterns
162#[async_trait::async_trait]
163impl CorrelationSupport for QueueChannel {
164    // --------------------------------------------------------------------
165    // Correlated send
166    // --------------------------------------------------------------------
167    async fn send_with_correlation(&self, mut exchange: Exchange) -> Result<String> {
168        let id_val = self.ensure_correlation(&mut exchange);
169        self.enqueue(exchange, Some(&id_val)).await?;
170        Ok(id_val)
171    }
172
173    // --------------------------------------------------------------------
174    // Correlated receive
175    // --------------------------------------------------------------------
176    async fn receive_by_correlation(&self, corr_id: &str) -> Option<Exchange> {
177        let cid = corr_id.to_string();
178        let mut g = self.out_queue.lock().await;
179        if let Some(pos) = g
180            .iter()
181            .position(|e| e.in_msg.header("corr_id") == Some(&cid))
182        {
183            let ex = g.remove(pos).unwrap();
184            log_receive(
185                self.id(),
186                "receive_by_correlation",
187                "dequeued",
188                true,
189                Some(&ex),
190                Some(g.len()),
191                Some(&cid),
192                None,
193                None,
194                None,
195            );
196            return Some(ex);
197        }
198        log_receive(
199            self.id(),
200            "receive_by_correlation",
201            "empty",
202            true,
203            None,
204            Some(g.len()),
205            Some(&cid),
206            None,
207            None,
208            None,
209        );
210        None
211    }
212}