use anyhow::{Result, anyhow};
use beetry_channel::{AnyBoxReceiver, AnyBoxSender, BoxReceiver, BoxSender};
use beetry_editor_types::{
output::channel::{ChannelConfig, ChannelKind, TokioChannelKind},
spec::channel::ChannelSpec,
};
use bon::Builder;
use crate::{BoxPlugin, ConstructPlugin, Named, PluginConstructor, PluginError, unique_plugins};
#[derive(Builder)]
pub struct TypeErasedChannel {
pub senders: Vec<AnyBoxSender>,
pub receivers: Vec<AnyBoxReceiver>,
}
impl TypeErasedChannel {
pub fn try_take_sender(&mut self) -> Result<AnyBoxSender> {
self.senders
.pop()
.ok_or_else(|| anyhow!("no free sender in the channel"))
}
pub fn try_take_receiver(&mut self) -> Result<AnyBoxReceiver> {
self.receivers
.pop()
.ok_or_else(|| anyhow!("no free receiver in the channel"))
}
}
impl Named for ChannelSpec {
fn name(&self) -> &str {
self.msg_type_name().as_str()
}
}
pub struct Factory {
func: Box<dyn Fn(ChannelConfig) -> TypeErasedChannel>,
}
impl std::fmt::Debug for Factory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Factory")
.field("func", &"<function>")
.finish()
}
}
impl Factory {
#[must_use]
pub fn from_msg<M: Clone + Default + 'static>() -> Self {
Self {
func: (Box::new(|config| {
let capacity = config.capacity();
let count = config.count();
let n_senders = count.sender();
let n_receivers = count.receiver();
let (senders, receivers) = match config.kind() {
ChannelKind::Tokio(TokioChannelKind::Broadcast) => {
let (sender, receiver) =
beetry_channel::tokio::broadcast::channel::<M>(capacity);
let receivers: Vec<_> =
std::iter::once(Box::new(receiver) as BoxReceiver<M>)
.chain(
std::iter::repeat_with(|| {
Box::new(sender.subscribe()) as BoxReceiver<M>
})
.take(n_receivers - 1),
)
.collect();
let senders: Vec<_> =
std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
.take(n_senders)
.collect();
(senders, receivers)
}
ChannelKind::Tokio(TokioChannelKind::Mpsc) => {
let (sender, receiver) =
beetry_channel::tokio::mpsc::channel::<M>(capacity);
let senders: Vec<_> =
std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
.take(n_senders)
.collect();
let receivers = vec![Box::new(receiver) as BoxReceiver<M>];
(senders, receivers)
}
ChannelKind::Tokio(TokioChannelKind::Watch) => {
let (sender, receiver) = beetry_channel::tokio::watch::channel::<M>();
let receivers: Vec<_> =
std::iter::once(Box::new(receiver) as BoxReceiver<M>)
.chain(
std::iter::repeat_with(|| {
Box::new(sender.subscribe()) as BoxReceiver<M>
})
.take(n_receivers - 1),
)
.collect();
let senders: Vec<_> =
std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
.take(n_senders)
.collect();
(senders, receivers)
}
};
TypeErasedChannel::builder()
.senders(senders.into_iter().map(Into::into).collect())
.receivers(receivers.into_iter().map(Into::into).collect())
.build()
})),
}
}
#[must_use]
pub fn create(&self, config: ChannelConfig) -> TypeErasedChannel {
(self.func)(config)
}
}
pub type BoxChannelPlugin = BoxPlugin<ChannelSpec, Factory>;
pub type ChannelPluginConstructor = PluginConstructor<ChannelSpec, Factory>;
impl ChannelPluginConstructor {
pub fn plugins() -> Result<Vec<BoxChannelPlugin>, PluginError> {
unique_plugins::<Self, <Self as ConstructPlugin>::Spec, <Self as ConstructPlugin>::Factory>(
)
}
}
inventory::collect!(ChannelPluginConstructor);