mueue/
bidir_queue.rs

1use crate::*;
2
3use std::collections::HashMap;
4
5/// Creates a new unbounded bidirectional message queue for messages.
6///
7/// A bidirectional queue is a queue where there are two flows of messages which are
8/// directed in opposite directions. It lets two objects communicate with each other.
9pub fn bidirectional_queue<A: Message, B: Message>(
10) -> (MessageEndpoint<A, B>, MessageEndpoint<B, A>) {
11    let (send1, recv1) = unidirectional_queue();
12    let (mut send2, mut recv2) = unidirectional_queue();
13
14    synchronize_queues_activity((&mut send2, &mut recv2), (&send1, &recv1));
15
16    let end1 = MessageEndpoint {
17        input: recv1,
18        output: send2,
19    };
20    let end2 = MessageEndpoint {
21        input: recv2,
22        output: send1,
23    };
24
25    (end1, end2)
26}
27
28/// Creates a new bounded bidirectional message queue for messages.
29///
30/// A bidirectional queue is a queue where there are two flows of messages which are
31/// directed in opposite directions. It lets two objects communicate with each other.
32pub fn bidirectional_queue_bounded<A: Message, B: Message>(
33    cap: usize,
34) -> (MessageEndpoint<A, B>, MessageEndpoint<B, A>) {
35    let (send1, recv1) = unidirectional_queue_bounded(cap);
36    let (mut send2, mut recv2) = unidirectional_queue_bounded(cap);
37
38    synchronize_queues_activity((&mut send2, &mut recv2), (&send1, &recv1));
39
40    let end1 = MessageEndpoint {
41        input: recv1,
42        output: send2,
43    };
44    let end2 = MessageEndpoint {
45        input: recv2,
46        output: send1,
47    };
48
49    (end1, end2)
50}
51
52/// The half of the bidirectional queue which can be used both
53/// for sending and receiving messages.
54pub struct MessageEndpoint<In, Out> {
55    input: MessageReceiver<In>,
56    output: MessageSender<Out>,
57}
58
59impl<In: Message, Out: Message> MessageEndpoint<In, Out> {
60    pub fn queue_id(&self) -> QueueId {
61        // The pointer which is stored in `is_active` in both `MessageEndpoint::input`
62        // and `MessageEndpoint::output` is unique for each queue, so it can be used as id.
63        QueueId::new(self.input.is_active.as_ref() as *const _ as usize)
64    }
65
66    /// Converts the [`MessageEndpoint`] to a [`MessageSender`].
67    pub fn as_sender(&self) -> &MessageSender<Out> {
68        &self.output
69    }
70
71    /// Converts the [`MessageEndpoint`] to a [`MessageReceiver`].
72    pub fn as_receiver(&self) -> &MessageReceiver<In> {
73        &self.input
74    }
75
76    /// Returns if the [`MessageEndpoint`] is active.
77    pub fn is_active(&self) -> bool {
78        self.as_sender().is_active() && self.as_receiver().is_active()
79    }
80
81    /// Activates the [`MessageEndpoint`].
82    pub fn activate(&self) {
83        self.input.activate();
84        self.output.activate();
85    }
86
87    /// Deactivates the [`MessageEndpoint`].
88    pub fn deactivate(&self) {
89        self.input.deactivate();
90        self.output.deactivate();
91    }
92
93    /// Sends message to the queue if the [`MessageEndpoint`] is active.
94    pub fn send(&self, msg: Out) -> Result<(), MessagingError> {
95        self.as_sender().send(msg)
96    }
97
98    /// Receives one message if there is any and the message queue is active.
99    pub fn recv(&self) -> Option<In> {
100        self.input.recv()
101    }
102
103    /// Returns an iterator which yields all pending messages.
104    pub fn iter(&self) -> MessageIter<'_, In> {
105        self.as_receiver().iter()
106    }
107
108    /// Receives one message and forwards it into another queue if it is not full.
109    pub fn forward_one<Any, N>(
110        &self,
111        next: MessageEndpoint<Any, N>,
112    ) -> Result<(), MessagingError>
113    where
114        Any: Message,
115        N: Message + From<In>,
116    {
117        self.as_receiver().forward_one(next.as_sender().clone())
118    }
119
120    /// Forwards all pending messages into another queue if it is not full.
121    pub fn forward<Any, N>(&self, next: MessageEndpoint<Any, N>)
122    where
123        Any: Message,
124        N: Message + From<In>,
125    {
126        self.as_receiver().forward(next.as_sender().clone());
127    }
128}
129
130impl<In, Out> Clone for MessageEndpoint<In, Out> {
131    fn clone(&self) -> Self {
132        Self {
133            input: self.input.clone(),
134            output: self.output.clone(),
135        }
136    }
137}
138
139/// A set of [`MessageEndpoint`]s which can be used to manipulate them simultaneously.
140pub struct MessageEndpoints<In, Out> {
141    map: HashMap<QueueId, MessageEndpoint<In, Out>>,
142}
143
144impl<In: Message, Out: Message> MessageEndpoints<In, Out> {
145    /// Constructs a new [`MessageEndpoints`].
146    pub fn new() -> Self {
147        Self {
148            map: HashMap::new(),
149        }
150    }
151
152    /// Creates a new bidirectional message queue and stores one of its [`MessageEndpoint`]
153    /// in the list. The other [`MessageEndpoint`] is returned to the caller.
154    pub fn new_queue(&mut self) -> MessageEndpoint<Out, In> {
155        let (end1, end2) = bidirectional_queue();
156        self.map.insert(end1.queue_id(), end1);
157
158        end2
159    }
160
161    /// Destroys one of the message queues stored in the [`MessageEndpoints`].
162    pub fn destroy_queue(&mut self, end: MessageEndpoint<In, Out>) {
163        self.map.remove(&end.queue_id());
164    }
165
166    /// Adds the given [`MessageEndpoint`] to the list.
167    pub fn add_endpoint(&mut self, end: MessageEndpoint<In, Out>) {
168        self.map.insert(end.queue_id(), end);
169    }
170
171    /// Removes the [`MessageEndpoint`] which correspondes to the given [`MessageEndpoint`]
172    /// from the [`MessageEndpoints`].
173    pub fn remove_endpoint(&mut self, end: MessageEndpoint<In, Out>) {
174        self.map.remove(&end.queue_id());
175    }
176
177    /// Receives one message if there is any.
178    pub fn recv(&self) -> Option<In> {
179        for end in self.map.values() {
180            if let Some(msg) = end.recv() {
181                return Some(msg);
182            }
183        }
184
185        None
186    }
187
188    /// Returns an iterator which yields all pending messages from all [`MessageEndpoint`]s.
189    pub fn iter(&self) -> AbstractMessageIter<impl Iterator + '_> {
190        AbstractMessageIter {
191            iter: self.map.values().flat_map(|end| end.iter()),
192        }
193    }
194}
195
196impl<In: Message, Out: Message + Clone> MessageEndpoints<In, Out> {
197    /// Sends the given message to all stored [`MessageEndpoint`]s.
198    /// 
199    /// If any of queues is full, this call will block until the send operation can proceed.
200    pub fn send(&self, msg: Out) -> Result<(), MessagingError> {
201        self.map
202            .values()
203            .try_for_each(|end| end.send(msg.clone()))
204    }
205}
206
207impl<In, Out> Clone for MessageEndpoints<In, Out> {
208    fn clone(&self) -> Self {
209        Self {
210            map: self.map.clone(),
211        }
212    }
213}