use super::log::{log_dequeued, log_empty, log_receive, log_send_enqueued};
use super::{Channel, CorrelationSupport, PollableChannel};
use crate::error::Result;
use crate::Exchange;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::sync::Mutex;
use tracing::trace;
/// Storage type behind a `QueueChannel`: a double-ended queue of exchanges.
type InnerQueue = VecDeque<Exchange>;
/// In-memory channel that buffers exchanges in a mutex-guarded queue.
///
/// The queue and the correlation counter are both held behind `Arc`, so a
/// clone of this struct is another handle to the same queue and counter.
#[derive(Clone, Debug)]
pub struct QueueChannel {
/// Identifier returned by `id()` and attached to log/trace output.
id: String,
/// Buffered exchanges, protected by an async (tokio) mutex.
out_queue: Arc<Mutex<InnerQueue>>,
/// Counter from which generated correlation ids (`c<N>`) are derived.
corr_seq: Arc<AtomicU64>,
}
impl QueueChannel {
    /// Creates a channel with the given identifier.
    ///
    /// The queue starts empty and the correlation sequence starts at 1, so the
    /// first generated correlation id is `c1`.
    pub fn with_id<S: Into<String>>(id: S) -> Self {
        Self {
            id: id.into(),
            out_queue: Arc::new(Mutex::new(InnerQueue::default())),
            corr_seq: Arc::new(AtomicU64::new(1)),
        }
    }

    /// Creates a channel whose identifier is `queue:` followed by a freshly
    /// generated version-4 UUID.
    pub fn with_random_id() -> Self {
        Self::with_id(format!("queue:{}", uuid::Uuid::new_v4()))
    }

    /// Returns the identifier this channel was created with.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Produces the next generated correlation id.
    ///
    /// The id is the letter `c` followed by the current value of the sequence
    /// counter; the counter is incremented as part of the same atomic step.
    pub(crate) fn next_corr_id(&self) -> String {
        // `fetch_add` is a single atomic read-modify-write, so concurrent
        // callers always observe distinct values even with `Relaxed` ordering.
        let id = format!("c{}", self.corr_seq.fetch_add(1, Ordering::Relaxed));
        trace!(target: "allora::channel", channel_id = %self.id, corr_id = %id, "generated correlation id");
        id
    }

    /// Makes sure the inbound message carries a `corr_id` header and returns
    /// its value.
    ///
    /// * If `corr_id` is already present, its value is reused.
    /// * Otherwise a new id is generated via [`Self::next_corr_id`] and stored
    ///   in the `corr_id` header.
    ///
    /// In both cases the `correlation_id` header is set to the same value when
    /// it is absent; an existing `correlation_id` is left untouched.
    pub(crate) fn ensure_correlation(&self, exchange: &mut Exchange) -> String {
        // Copy the header value out first so the message can be mutated below.
        let existing = exchange.in_msg.header("corr_id").map(|id| id.to_string());

        let corr_id = match existing {
            Some(id) => {
                trace!(target: "allora::channel", channel_id = %self.id, corr_id = %id, "reusing existing corr_id");
                id
            }
            None => {
                let id = self.next_corr_id();
                // Same target as the sibling traces so target-based filters
                // see every correlation event of this channel.
                trace!(target: "allora::channel", channel_id = %self.id, corr_id = %id, "assigned new corr_id");
                exchange.in_msg.set_header("corr_id", &id);
                id
            }
        };

        // Mirror the id into `correlation_id` only when that header is unset.
        if exchange.in_msg.header("correlation_id").is_none() {
            exchange.in_msg.set_header("correlation_id", &corr_id);
        }
        corr_id
    }

    /// Appends `ex` to the back of the queue, waiting for the queue lock.
    pub(crate) async fn push(&self, ex: Exchange) {
        let mut g = self.out_queue.lock().await;
        g.push_back(ex);
    }

    /// Logs the send (with the optional correlation id) and then enqueues the
    /// exchange. Always returns `Ok(())`.
    async fn enqueue(&self, exchange: Exchange, corr_id: Option<&str>) -> Result<()> {
        log_send_enqueued(self.id(), &exchange, true, corr_id);
        self.push(exchange).await;
        Ok(())
    }
}
#[async_trait::async_trait]
impl Channel for QueueChannel {
    /// Identifier of this channel.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Enqueues `exchange`; no correlation id is passed along to the enqueue
    /// step for this plain send.
    async fn send(&self, exchange: Exchange) -> Result<()> {
        let no_correlation: Option<&str> = None;
        self.enqueue(exchange, no_correlation).await
    }

    /// Kind label of this channel implementation; always `"queue"`.
    fn kind(&self) -> &'static str {
        const KIND: &str = "queue";
        KIND
    }

    /// Exposes the channel as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}
#[async_trait::async_trait]
impl PollableChannel for QueueChannel {
    /// Removes and returns the exchange at the front of the queue, or `None`
    /// when the queue is empty.
    ///
    /// The outcome is logged either way, together with the number of
    /// exchanges left in the queue after the attempt.
    async fn try_receive(&self) -> Option<Exchange> {
        let mut queue = self.out_queue.lock().await;
        // `pop_front` already reports emptiness through `Option`, so no
        // separate `is_empty` check (and no `unwrap`) is needed.
        match queue.pop_front() {
            Some(exchange) => {
                log_dequeued(
                    self.id(),
                    "try_receive",
                    true,
                    &exchange,
                    Some(queue.len()),
                    None,
                );
                Some(exchange)
            }
            None => {
                log_empty(self.id(), "try_receive", true, Some(queue.len()), None);
                None
            }
        }
    }
}
#[async_trait::async_trait]
impl CorrelationSupport for QueueChannel {
    /// Ensures the exchange carries a correlation id, enqueues it, and returns
    /// that id.
    ///
    /// # Errors
    /// Propagates any error returned by the enqueue step.
    async fn send_with_correlation(&self, mut exchange: Exchange) -> Result<String> {
        let corr_id = self.ensure_correlation(&mut exchange);
        self.enqueue(exchange, Some(corr_id.as_str())).await?;
        Ok(corr_id)
    }

    /// Removes and returns the first queued exchange whose `corr_id` header
    /// equals `corr_id`, or `None` when no queued exchange matches.
    ///
    /// The queue is scanned front to back while holding the queue lock. The
    /// outcome is logged with status `"dequeued"` on a match and `"empty"`
    /// otherwise, along with the queue length after the attempt.
    async fn receive_by_correlation(&self, corr_id: &str) -> Option<Exchange> {
        let mut queue = self.out_queue.lock().await;

        // Compare against the borrowed id directly; no owned copy is needed.
        let pos = queue
            .iter()
            .position(|e| matches!(e.in_msg.header("corr_id"), Some(h) if h == corr_id));
        // `remove` yields `None` for an out-of-range index, so chaining it
        // keeps this path free of `unwrap`.
        let found = pos.and_then(|idx| queue.remove(idx));

        let status = if found.is_some() { "dequeued" } else { "empty" };
        log_receive(
            self.id(),
            "receive_by_correlation",
            status,
            true,
            found.as_ref(),
            Some(queue.len()),
            Some(corr_id),
            None,
            None,
            None,
        );
        found
    }
}