allora_core/channel/
queue.rs1use super::log::{log_dequeued, log_empty, log_receive, log_send_enqueued};
15use super::{Channel, CorrelationSupport, PollableChannel};
16use crate::error::Result;
17use crate::Exchange;
18use std::any::Any;
19use std::collections::VecDeque;
20use std::sync::{
21 atomic::{AtomicU64, Ordering},
22 Arc,
23};
24use tokio::sync::Mutex;
25use tracing::trace;
26
27type InnerQueue = VecDeque<Exchange>;
28
29#[derive(Clone, Debug)]
30pub struct QueueChannel {
31 id: String,
32 out_queue: Arc<Mutex<InnerQueue>>,
33 corr_seq: Arc<AtomicU64>,
34}
35
36impl QueueChannel {
37 pub fn with_id<S: Into<String>>(id: S) -> Self {
43 Self {
44 id: id.into(),
45 out_queue: Arc::new(Mutex::new(InnerQueue::default())),
46 corr_seq: Arc::new(AtomicU64::new(1)),
47 }
48 }
49
50 pub fn with_random_id() -> Self {
52 Self::with_id(format!("queue:{}", uuid::Uuid::new_v4()))
53 }
54
55 pub fn id(&self) -> &str {
61 &self.id
62 }
63
64 pub(crate) fn next_corr_id(&self) -> String {
70 let id = format!("c{}", self.corr_seq.fetch_add(1, Ordering::Relaxed));
71 trace!(target: "allora::channel",channel_id = %self.id, corr_id = %id, "generated correlation id");
72 id
73 }
74
75 pub(crate) fn ensure_correlation(&self, exchange: &mut Exchange) -> String {
77 if let Some(id) = exchange.in_msg.header("corr_id") {
78 let id_str = id.to_string();
79 trace!(target: "allora::channel",channel_id = %self.id, corr_id = %id_str, "reusing existing corr_id");
80 if exchange.in_msg.header("correlation_id").is_none() {
81 exchange.in_msg.set_header("correlation_id", &id_str);
82 }
83 id_str
84 } else {
85 let id = self.next_corr_id();
86 trace!(channel_id = %self.id, corr_id = %id, "assigned new corr_id");
87 exchange.in_msg.set_header("corr_id", &id);
88 if exchange.in_msg.header("correlation_id").is_none() {
89 exchange.in_msg.set_header("correlation_id", &id);
90 }
91 id
92 }
93 }
94
95 pub(crate) async fn push(&self, ex: Exchange) {
96 let mut g = self.out_queue.lock().await;
97 g.push_back(ex);
98 }
99
100 async fn enqueue(&self, exchange: Exchange, corr_id: Option<&str>) -> Result<()> {
102 log_send_enqueued(self.id(), &exchange, true, corr_id);
103 self.push(exchange).await;
104 Ok(())
105 }
106}
107
108#[async_trait::async_trait]
114impl Channel for QueueChannel {
115 fn id(&self) -> &str {
119 &self.id
120 }
121 async fn send(&self, exchange: Exchange) -> Result<()> {
125 self.enqueue(exchange, None).await
126 }
127
128 fn kind(&self) -> &'static str {
129 "queue"
130 }
131 fn as_any(&self) -> &dyn Any {
132 self
133 }
134}
135
136#[async_trait::async_trait]
138impl PollableChannel for QueueChannel {
139 async fn try_receive(&self) -> Option<Exchange> {
143 let mut g = self.out_queue.lock().await;
144 if g.is_empty() {
145 log_empty(self.id(), "try_receive", true, Some(g.len()), None);
146 return None;
147 }
148 let exchange = g.pop_front().unwrap();
149 log_dequeued(
150 self.id(),
151 "try_receive",
152 true,
153 &exchange,
154 Some(g.len()),
155 None,
156 );
157 Some(exchange)
158 }
159}
160
161#[async_trait::async_trait]
163impl CorrelationSupport for QueueChannel {
164 async fn send_with_correlation(&self, mut exchange: Exchange) -> Result<String> {
168 let id_val = self.ensure_correlation(&mut exchange);
169 self.enqueue(exchange, Some(&id_val)).await?;
170 Ok(id_val)
171 }
172
173 async fn receive_by_correlation(&self, corr_id: &str) -> Option<Exchange> {
177 let cid = corr_id.to_string();
178 let mut g = self.out_queue.lock().await;
179 if let Some(pos) = g
180 .iter()
181 .position(|e| e.in_msg.header("corr_id") == Some(&cid))
182 {
183 let ex = g.remove(pos).unwrap();
184 log_receive(
185 self.id(),
186 "receive_by_correlation",
187 "dequeued",
188 true,
189 Some(&ex),
190 Some(g.len()),
191 Some(&cid),
192 None,
193 None,
194 None,
195 );
196 return Some(ex);
197 }
198 log_receive(
199 self.id(),
200 "receive_by_correlation",
201 "empty",
202 true,
203 None,
204 Some(g.len()),
205 Some(&cid),
206 None,
207 None,
208 None,
209 );
210 None
211 }
212}