digitalis_server/
channel.rs1use std::sync::Arc;
2
3use digitalis_core::{
4 common::{ChannelId, SubscriptionId},
5 server::MessageData,
6 Control, DigitalisError, DigitalisResult, MessageMinimal,
7};
8use tokio::sync::mpsc;
9
10use crate::client::{Client, ClientId};
11
12#[derive(Debug)]
13pub struct Channel {
14 _id: ChannelId,
15 subscriptions: Vec<(Client, SubscriptionId)>,
16}
17
18impl Channel {
19 fn new(id: ChannelId) -> Self {
20 Self {
21 _id: id,
22 subscriptions: Default::default(),
23 }
24 }
25
26 pub fn start(id: ChannelId) -> ChannelQueue {
27 let (tx, mut rx) = mpsc::channel(100);
28
29 tokio::spawn(async move {
30 let mut channel = Self::new(id);
31
32 loop {
33 if let Some(msg) = rx.recv().await {
34 match channel.handle_message(msg) {
35 Ok(Control::Continue) => {}
36 Ok(Control::Exit) => break,
37 Err(e) => {
38 log::error!("Fail to handle message: {}", e);
39 }
40 }
41 }
42 }
43 });
44
45 ChannelQueue::new(id, tx)
46 }
47
48 pub fn handle_message(&mut self, msg: ChannelMessage) -> DigitalisResult<Control> {
49 use ChannelMessage::*;
50
51 match msg {
52 Subscribe { client, id } => {
53 self.subscriptions.push((client, id));
54 }
55 Unsubscribe { client_id, ids } => {
56 self.subscriptions
57 .retain(|(c, i)| c.id() != client_id || !ids.contains(i));
58 }
59 UnsubscribeAll { client_id } => {
60 self.subscriptions.retain(|(c, _)| c.id() != client_id);
61 }
62 Broadcast {
63 message,
64 receive_timestamp,
65 } => {
66 if self.subscriptions.is_empty() {
67 return Ok(Control::Continue);
68 }
69
70 let payload = Arc::new(message.write_to_bytes()?);
71
72 for (client, id) in self.subscriptions.iter() {
73 let msg = MessageData {
74 subscription_id: *id,
75 receive_timestamp,
76 payload: Arc::clone(&payload),
77 }
78 .into_message()?;
79
80 if let Err(e) = client.nonblocking_send(msg) {
81 log::error!("Fail to send message to client: {e:?}");
82 }
83 }
84 }
85 Terminate => return Ok(Control::Exit),
86 }
87
88 Ok(Control::Continue)
89 }
90}
91
92#[allow(missing_debug_implementations)]
93pub enum ChannelMessage {
94 Subscribe {
95 client: Client,
96 id: SubscriptionId,
97 },
98 Unsubscribe {
99 client_id: ClientId,
100 ids: Arc<Vec<SubscriptionId>>,
101 },
102 UnsubscribeAll {
103 client_id: ClientId,
104 },
105 Broadcast {
106 message: Box<dyn MessageMinimal>,
107 receive_timestamp: u64,
108 },
109 Terminate,
110}
111
112#[derive(Debug, Clone)]
113pub struct ChannelQueue {
114 id: ChannelId,
115 sender: mpsc::Sender<ChannelMessage>,
116}
117
118impl ChannelQueue {
119 pub const fn new(id: ChannelId, sender: mpsc::Sender<ChannelMessage>) -> Self {
120 Self { id, sender }
121 }
122
123 pub const fn id(&self) -> ChannelId {
124 self.id
125 }
126
127 fn try_send(&self, msg: ChannelMessage) -> DigitalisResult<()> {
128 self.sender
129 .try_send(msg)
130 .map_err(|_| DigitalisError::ChannelSendError)
131 }
132
133 pub fn broadcast<T: MessageMinimal>(
134 &self,
135 msg: T,
136 receive_timestamp: u64,
137 ) -> DigitalisResult<()> {
138 self.try_send(ChannelMessage::Broadcast {
139 message: Box::new(msg),
140 receive_timestamp,
141 })
142 }
143
144 pub fn subscribe(&self, client: Client, id: SubscriptionId) -> DigitalisResult<()> {
145 self.try_send(ChannelMessage::Subscribe { client, id })
146 }
147
148 pub fn unsubscribe(
149 &self,
150 client_id: ClientId,
151 ids: Arc<Vec<SubscriptionId>>,
152 ) -> DigitalisResult<()> {
153 self.try_send(ChannelMessage::Unsubscribe { client_id, ids })
154 }
155
156 pub fn unsubscribe_all(&self, client_id: ClientId) -> DigitalisResult<()> {
157 self.try_send(ChannelMessage::UnsubscribeAll { client_id })
158 }
159
160 pub fn terminate(&self) -> DigitalisResult<()> {
161 self.try_send(ChannelMessage::Terminate)
162 }
163}