lps/
channel.rs

1use crate::*;
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::{self, Receiver, Sender};
5use std::sync::Arc;
6
7// A unique id associated with a message channel.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
9pub(crate) struct MessageChannelId(usize);
10
11impl MessageChannelId {
12    // Creates a new [`MessageChannelId`] by casting a pointer to [`MessageTypeId`]
13    // which is stored in the given [`MessageSender`] to [`usize`].
14    //
15    // The pointer to [`MessageChannelId`] is shared between a sender and a receiver,
16    // so [`MessageChannelId`]s obtained from them will be the same.
17    pub(crate) fn from_sender(msg_send: &MessageSender) -> Self {
18        Self(msg_send.msg_type_id.as_ref() as *const _ as usize)
19    }
20
21    // Creates a new [`MessageChannelId`] by casting a pointer to [`MessageTypeId`]
22    // which is stored in the given [`MessageReceiver`] to [`usize`].
23    //
24    // The pointer to [`MessageChannelId`] is shared between a sender and a receiver,
25    // so [`MessageChannelId`]s obtained from them will be the same.
26    pub(crate) fn from_receiver(msg_recv: &MessageReceiver) -> Self {
27        Self(msg_recv.msg_type_id.as_ref() as *const _ as usize)
28    }
29}
30
31// The sending-half of the message channel.
32pub(crate) struct MessageSender {
33    msg_type_id: Arc<MessageTypeId>,
34    send: Sender<Arc<dyn Message>>,
35    is_active: Arc<AtomicBool>,
36}
37
38impl MessageSender {
39    // Returns the id of the channel which the [`MessageSender`] is part of.
40    pub(crate) fn channel_id(&self) -> MessageChannelId {
41        MessageChannelId::from_sender(self)
42    }
43
44    // Returns the type id of messages which can be sent through this [`MessageSender`].
45    pub(crate) fn message_type_id(&self) -> MessageTypeId {
46        *self.msg_type_id
47    }
48
49    // Returns if the channel is active.
50    pub(crate) fn is_active(&self) -> bool {
51        self.is_active.load(Ordering::SeqCst)
52    }
53
54    // Makes the channel active or not depending on `is_active`.
55    #[allow(dead_code)]
56    pub(crate) fn set_active(&self, is_active: bool) {
57        self.is_active.store(is_active, Ordering::SeqCst);
58    }
59
60    // Sends the given messages if messages of its types are supported by the channel.
61    pub(crate) fn send(&self, msg: Arc<dyn Message>) -> Result<(), MessageChannelError> {
62        if msg.type_id() == self.message_type_id() {
63            return self
64                .send
65                .send(msg)
66                .map_err(|_| MessageChannelError::MessageNotSent);
67        }
68
69        Err(MessageChannelError::WrongMessageType)
70    }
71}
72
73// The receiving-half of the message channel.
74pub(crate) struct MessageReceiver {
75    msg_type_id: Arc<MessageTypeId>,
76    recv: Receiver<Arc<dyn Message>>,
77    is_active: Arc<AtomicBool>,
78}
79
80impl MessageReceiver {
81    // Returns the id of the channel which the [`MessageReceiver`] is part of.
82    pub(crate) fn channel_id(&self) -> MessageChannelId {
83        MessageChannelId::from_receiver(self)
84    }
85
86    // Returns the type id of messages which can be received through this [`MessageReceiver`].
87    pub(crate) fn message_type_id(&self) -> MessageTypeId {
88        *self.msg_type_id
89    }
90
91    // Returns if the channel is active.
92    pub(crate) fn is_active(&self) -> bool {
93        self.is_active.load(Ordering::SeqCst)
94    }
95
96    // Makes the channel active or not depending on `is_active`.
97    pub(crate) fn set_active(&self, is_active: bool) {
98        self.is_active.store(is_active, Ordering::SeqCst);
99    }
100
101    // Receives the message if there is any.
102    pub(crate) fn recv(&self) -> Option<Arc<dyn Message>> {
103        self.recv.try_recv().ok()
104    }
105}
106
107// Creates a new message channel which can be used for sending messages with the given type id.
108pub(crate) fn message_channel_new(msg_type_id: MessageTypeId) -> (MessageSender, MessageReceiver) {
109    let msg_type_id = Arc::new(msg_type_id);
110    let (send, recv) = mpsc::channel();
111    let is_active = Arc::new(AtomicBool::new(true));
112
113    let msg_send = MessageSender {
114        msg_type_id: Arc::clone(&msg_type_id),
115        send,
116        is_active: Arc::clone(&is_active),
117    };
118    let msg_recv = MessageReceiver {
119        msg_type_id: Arc::clone(&msg_type_id),
120        recv,
121        is_active,
122    };
123
124    (msg_send, msg_recv)
125}
126
127#[derive(Debug)]
128pub enum MessageChannelError {
129    WrongMessageType,
130    MessageNotSent,
131}