beetry-plugin 0.2.0

Internal beetry crate. For the public API, check the beetry crate.
Documentation
use anyhow::{Result, anyhow};
use beetry_channel::{AnyBoxReceiver, AnyBoxSender, BoxReceiver, BoxSender};
use beetry_editor_types::{
    output::channel::{ChannelConfig, ChannelKind, TokioChannelKind},
    spec::channel::ChannelSpec,
};
use bon::Builder;

use crate::{BoxPlugin, ConstructPlugin, Named, PluginConstructor, PluginError, unique_plugins};

#[derive(Builder)]
pub struct TypeErasedChannel {
    pub senders: Vec<AnyBoxSender>,
    pub receivers: Vec<AnyBoxReceiver>,
}

impl TypeErasedChannel {
    pub fn try_take_sender(&mut self) -> Result<AnyBoxSender> {
        self.senders
            .pop()
            .ok_or_else(|| anyhow!("no free sender in the channel"))
    }

    pub fn try_take_receiver(&mut self) -> Result<AnyBoxReceiver> {
        self.receivers
            .pop()
            .ok_or_else(|| anyhow!("no free receiver in the channel"))
    }
}

impl Named for ChannelSpec {
    fn name(&self) -> &str {
        self.msg_type_name().as_str()
    }
}

pub struct Factory {
    func: Box<dyn Fn(ChannelConfig) -> TypeErasedChannel>,
}

impl std::fmt::Debug for Factory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Factory")
            .field("func", &"<function>")
            .finish()
    }
}

impl Factory {
    #[must_use]
    pub fn from_msg<M: Clone + Default + 'static>() -> Self {
        Self {
            func: (Box::new(|config| {
                let capacity = config.capacity();
                let count = config.count();
                // client is responsible for providing correct and valid number of senders and
                // receivers
                let n_senders = count.sender();
                let n_receivers = count.receiver();

                let (senders, receivers) = match config.kind() {
                    ChannelKind::Tokio(TokioChannelKind::Broadcast) => {
                        let (sender, receiver) =
                            beetry_channel::tokio::broadcast::channel::<M>(capacity);

                        let receivers: Vec<_> =
                            std::iter::once(Box::new(receiver) as BoxReceiver<M>)
                                .chain(
                                    std::iter::repeat_with(|| {
                                        Box::new(sender.subscribe()) as BoxReceiver<M>
                                    })
                                    .take(n_receivers - 1),
                                )
                                .collect();
                        let senders: Vec<_> =
                            std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
                                .take(n_senders)
                                .collect();

                        (senders, receivers)
                    }
                    ChannelKind::Tokio(TokioChannelKind::Mpsc) => {
                        let (sender, receiver) =
                            beetry_channel::tokio::mpsc::channel::<M>(capacity);

                        let senders: Vec<_> =
                            std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
                                .take(n_senders)
                                .collect();
                        let receivers = vec![Box::new(receiver) as BoxReceiver<M>];

                        (senders, receivers)
                    }
                    ChannelKind::Tokio(TokioChannelKind::Watch) => {
                        let (sender, receiver) = beetry_channel::tokio::watch::channel::<M>();

                        let receivers: Vec<_> =
                            std::iter::once(Box::new(receiver) as BoxReceiver<M>)
                                .chain(
                                    std::iter::repeat_with(|| {
                                        Box::new(sender.subscribe()) as BoxReceiver<M>
                                    })
                                    .take(n_receivers - 1),
                                )
                                .collect();
                        let senders: Vec<_> =
                            std::iter::repeat_with(|| Box::new(sender.clone()) as BoxSender<M>)
                                .take(n_senders)
                                .collect();

                        (senders, receivers)
                    }
                };

                TypeErasedChannel::builder()
                    .senders(senders.into_iter().map(Into::into).collect())
                    .receivers(receivers.into_iter().map(Into::into).collect())
                    .build()
            })),
        }
    }

    #[must_use]
    pub fn create(&self, config: ChannelConfig) -> TypeErasedChannel {
        (self.func)(config)
    }
}

pub type BoxChannelPlugin = BoxPlugin<ChannelSpec, Factory>;
pub type ChannelPluginConstructor = PluginConstructor<ChannelSpec, Factory>;

impl ChannelPluginConstructor {
    pub fn plugins() -> Result<Vec<BoxChannelPlugin>, PluginError> {
        unique_plugins::<Self, <Self as ConstructPlugin>::Spec, <Self as ConstructPlugin>::Factory>(
        )
    }
}

inventory::collect!(ChannelPluginConstructor);