Skip to main content

digitalis_server/
channel.rs

1use std::sync::Arc;
2
3use digitalis_core::{
4    common::{ChannelId, SubscriptionId},
5    server::MessageData,
6    Control, DigitalisError, DigitalisResult, MessageMinimal,
7};
8use tokio::sync::mpsc;
9
10use crate::client::{Client, ClientId};
11
12#[derive(Debug)]
13pub struct Channel {
14    _id: ChannelId,
15    subscriptions: Vec<(Client, SubscriptionId)>,
16}
17
18impl Channel {
19    fn new(id: ChannelId) -> Self {
20        Self {
21            _id: id,
22            subscriptions: Default::default(),
23        }
24    }
25
26    pub fn start(id: ChannelId) -> ChannelQueue {
27        let (tx, mut rx) = mpsc::channel(100);
28
29        tokio::spawn(async move {
30            let mut channel = Self::new(id);
31
32            loop {
33                if let Some(msg) = rx.recv().await {
34                    match channel.handle_message(msg) {
35                        Ok(Control::Continue) => {}
36                        Ok(Control::Exit) => break,
37                        Err(e) => {
38                            log::error!("Fail to handle message: {}", e);
39                        }
40                    }
41                }
42            }
43        });
44
45        ChannelQueue::new(id, tx)
46    }
47
48    pub fn handle_message(&mut self, msg: ChannelMessage) -> DigitalisResult<Control> {
49        use ChannelMessage::*;
50
51        match msg {
52            Subscribe { client, id } => {
53                self.subscriptions.push((client, id));
54            }
55            Unsubscribe { client_id, ids } => {
56                self.subscriptions
57                    .retain(|(c, i)| c.id() != client_id || !ids.contains(i));
58            }
59            UnsubscribeAll { client_id } => {
60                self.subscriptions.retain(|(c, _)| c.id() != client_id);
61            }
62            Broadcast {
63                message,
64                receive_timestamp,
65            } => {
66                if self.subscriptions.is_empty() {
67                    return Ok(Control::Continue);
68                }
69
70                let payload = Arc::new(message.write_to_bytes()?);
71
72                for (client, id) in self.subscriptions.iter() {
73                    let msg = MessageData {
74                        subscription_id: *id,
75                        receive_timestamp,
76                        payload: Arc::clone(&payload),
77                    }
78                    .into_message()?;
79
80                    if let Err(e) = client.nonblocking_send(msg) {
81                        log::error!("Fail to send message to client: {e:?}");
82                    }
83                }
84            }
85            Terminate => return Ok(Control::Exit),
86        }
87
88        Ok(Control::Continue)
89    }
90}
91
92#[allow(missing_debug_implementations)]
93pub enum ChannelMessage {
94    Subscribe {
95        client: Client,
96        id: SubscriptionId,
97    },
98    Unsubscribe {
99        client_id: ClientId,
100        ids: Arc<Vec<SubscriptionId>>,
101    },
102    UnsubscribeAll {
103        client_id: ClientId,
104    },
105    Broadcast {
106        message: Box<dyn MessageMinimal>,
107        receive_timestamp: u64,
108    },
109    Terminate,
110}
111
112#[derive(Debug, Clone)]
113pub struct ChannelQueue {
114    id: ChannelId,
115    sender: mpsc::Sender<ChannelMessage>,
116}
117
118impl ChannelQueue {
119    pub const fn new(id: ChannelId, sender: mpsc::Sender<ChannelMessage>) -> Self {
120        Self { id, sender }
121    }
122
123    pub const fn id(&self) -> ChannelId {
124        self.id
125    }
126
127    fn try_send(&self, msg: ChannelMessage) -> DigitalisResult<()> {
128        self.sender
129            .try_send(msg)
130            .map_err(|_| DigitalisError::ChannelSendError)
131    }
132
133    pub fn broadcast<T: MessageMinimal>(
134        &self,
135        msg: T,
136        receive_timestamp: u64,
137    ) -> DigitalisResult<()> {
138        self.try_send(ChannelMessage::Broadcast {
139            message: Box::new(msg),
140            receive_timestamp,
141        })
142    }
143
144    pub fn subscribe(&self, client: Client, id: SubscriptionId) -> DigitalisResult<()> {
145        self.try_send(ChannelMessage::Subscribe { client, id })
146    }
147
148    pub fn unsubscribe(
149        &self,
150        client_id: ClientId,
151        ids: Arc<Vec<SubscriptionId>>,
152    ) -> DigitalisResult<()> {
153        self.try_send(ChannelMessage::Unsubscribe { client_id, ids })
154    }
155
156    pub fn unsubscribe_all(&self, client_id: ClientId) -> DigitalisResult<()> {
157        self.try_send(ChannelMessage::UnsubscribeAll { client_id })
158    }
159
160    pub fn terminate(&self) -> DigitalisResult<()> {
161        self.try_send(ChannelMessage::Terminate)
162    }
163}