digitalis-server 0.0.0

Base server module for Foxflove
Documentation
use std::sync::Arc;

use digitalis_core::{
    common::{ChannelId, SubscriptionId},
    server::MessageData,
    Control, DigitalisError, DigitalisResult, MessageMinimal,
};
use tokio::sync::mpsc;

use crate::client::{Client, ClientId};

#[derive(Debug)]
pub struct Channel {
    _id: ChannelId,
    subscriptions: Vec<(Client, SubscriptionId)>,
}

impl Channel {
    fn new(id: ChannelId) -> Self {
        Self {
            _id: id,
            subscriptions: Default::default(),
        }
    }

    pub fn start(id: ChannelId) -> ChannelQueue {
        let (tx, mut rx) = mpsc::channel(100);

        tokio::spawn(async move {
            let mut channel = Self::new(id);

            loop {
                if let Some(msg) = rx.recv().await {
                    match channel.handle_message(msg) {
                        Ok(Control::Continue) => {}
                        Ok(Control::Exit) => break,
                        Err(e) => {
                            log::error!("Fail to handle message: {}", e);
                        }
                    }
                }
            }
        });

        ChannelQueue::new(id, tx)
    }

    pub fn handle_message(&mut self, msg: ChannelMessage) -> DigitalisResult<Control> {
        use ChannelMessage::*;

        match msg {
            Subscribe { client, id } => {
                self.subscriptions.push((client, id));
            }
            Unsubscribe { client_id, ids } => {
                self.subscriptions
                    .retain(|(c, i)| c.id() != client_id || !ids.contains(i));
            }
            UnsubscribeAll { client_id } => {
                self.subscriptions.retain(|(c, _)| c.id() != client_id);
            }
            Broadcast {
                message,
                receive_timestamp,
            } => {
                if self.subscriptions.is_empty() {
                    return Ok(Control::Continue);
                }

                let payload = Arc::new(message.write_to_bytes()?);

                for (client, id) in self.subscriptions.iter() {
                    let msg = MessageData {
                        subscription_id: *id,
                        receive_timestamp,
                        payload: Arc::clone(&payload),
                    }
                    .into_message()?;

                    if let Err(e) = client.nonblocking_send(msg) {
                        log::error!("Fail to send message to client: {e:?}");
                    }
                }
            }
            Terminate => return Ok(Control::Exit),
        }

        Ok(Control::Continue)
    }
}

#[allow(missing_debug_implementations)]
pub enum ChannelMessage {
    Subscribe {
        client: Client,
        id: SubscriptionId,
    },
    Unsubscribe {
        client_id: ClientId,
        ids: Arc<Vec<SubscriptionId>>,
    },
    UnsubscribeAll {
        client_id: ClientId,
    },
    Broadcast {
        message: Box<dyn MessageMinimal>,
        receive_timestamp: u64,
    },
    Terminate,
}

#[derive(Debug, Clone)]
pub struct ChannelQueue {
    id: ChannelId,
    sender: mpsc::Sender<ChannelMessage>,
}

impl ChannelQueue {
    pub const fn new(id: ChannelId, sender: mpsc::Sender<ChannelMessage>) -> Self {
        Self { id, sender }
    }

    pub const fn id(&self) -> ChannelId {
        self.id
    }

    fn try_send(&self, msg: ChannelMessage) -> DigitalisResult<()> {
        self.sender
            .try_send(msg)
            .map_err(|_| DigitalisError::ChannelSendError)
    }

    pub fn broadcast<T: MessageMinimal>(
        &self,
        msg: T,
        receive_timestamp: u64,
    ) -> DigitalisResult<()> {
        self.try_send(ChannelMessage::Broadcast {
            message: Box::new(msg),
            receive_timestamp,
        })
    }

    pub fn subscribe(&self, client: Client, id: SubscriptionId) -> DigitalisResult<()> {
        self.try_send(ChannelMessage::Subscribe { client, id })
    }

    pub fn unsubscribe(
        &self,
        client_id: ClientId,
        ids: Arc<Vec<SubscriptionId>>,
    ) -> DigitalisResult<()> {
        self.try_send(ChannelMessage::Unsubscribe { client_id, ids })
    }

    pub fn unsubscribe_all(&self, client_id: ClientId) -> DigitalisResult<()> {
        self.try_send(ChannelMessage::UnsubscribeAll { client_id })
    }

    pub fn terminate(&self) -> DigitalisResult<()> {
        self.try_send(ChannelMessage::Terminate)
    }
}