use crate::channel::SubscribableChannel;
use crate::{error::Result, Channel, Exchange};
use std::any::Any;
use std::fmt::Debug;
use std::sync::Mutex;
use tracing::trace;
pub struct DirectChannel {
id: String,
subscribers: Mutex<Vec<Box<dyn Fn(Exchange) -> Result<()> + Send + Sync>>>,
}
impl Debug for DirectChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let count = self.subscribers.lock().map(|v| v.len()).unwrap_or(0);
f.debug_struct("DirectChannel")
.field("id", &self.id)
.field("subscriber_count", &count)
.finish()
}
}
impl DirectChannel {
pub fn with_random_id() -> Self {
Self {
id: format!("direct:{}", uuid::Uuid::new_v4()),
subscribers: Mutex::new(Vec::new()),
}
}
pub fn with_id<S: Into<String>>(id: S) -> Self {
Self {
id: id.into(),
subscribers: Mutex::new(Vec::new()),
}
}
pub fn with_subscribers<I, F>(id: Option<&str>, subs: I) -> Self
where
I: IntoIterator<Item = F>,
F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
{
let ch = match id {
Some(i) => DirectChannel::with_id(i),
None => DirectChannel::with_random_id(),
};
for f in subs {
ch.subscribe(f);
}
ch
}
pub fn id(&self) -> &str {
&self.id
}
fn dispatch(&self, exchange: Exchange) -> Result<()> {
let subs = self.subscribers.lock().unwrap();
trace!(target: "allora::channel", channel_id = %self.id, subscribers = subs.len(), in_body = ?exchange.in_msg.body_text(), "direct dispatch start");
for (_idx, sub) in subs.iter().enumerate() {
let cloned = exchange.clone();
trace!(target: "allora::channel", channel_id = %self.id, subscriber_index = _idx, in_body = ?cloned.in_msg.body_text(), "direct dispatch to subscriber");
sub(cloned)?;
}
Ok(())
}
pub fn subscribe<F>(&self, f: F) -> usize
where
F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
{
let mut subs = self.subscribers.lock().unwrap();
subs.push(Box::new(f));
subs.len()
}
}
#[async_trait::async_trait]
impl Channel for DirectChannel {
fn id(&self) -> &str {
&self.id
}
async fn send(&self, exchange: Exchange) -> Result<()> {
self.dispatch(exchange)
}
fn kind(&self) -> &'static str {
"direct"
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl SubscribableChannel for DirectChannel {
fn subscribe<F>(&self, f: F) -> usize
where
F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
{
self.subscribe(f) }
}