1use crate::*;
2
3use std::collections::HashMap;
4
5pub fn bidirectional_queue<A: Message, B: Message>(
10) -> (MessageEndpoint<A, B>, MessageEndpoint<B, A>) {
11 let (send1, recv1) = unidirectional_queue();
12 let (mut send2, mut recv2) = unidirectional_queue();
13
14 synchronize_queues_activity((&mut send2, &mut recv2), (&send1, &recv1));
15
16 let end1 = MessageEndpoint {
17 input: recv1,
18 output: send2,
19 };
20 let end2 = MessageEndpoint {
21 input: recv2,
22 output: send1,
23 };
24
25 (end1, end2)
26}
27
28pub fn bidirectional_queue_bounded<A: Message, B: Message>(
33 cap: usize,
34) -> (MessageEndpoint<A, B>, MessageEndpoint<B, A>) {
35 let (send1, recv1) = unidirectional_queue_bounded(cap);
36 let (mut send2, mut recv2) = unidirectional_queue_bounded(cap);
37
38 synchronize_queues_activity((&mut send2, &mut recv2), (&send1, &recv1));
39
40 let end1 = MessageEndpoint {
41 input: recv1,
42 output: send2,
43 };
44 let end2 = MessageEndpoint {
45 input: recv2,
46 output: send1,
47 };
48
49 (end1, end2)
50}
51
52pub struct MessageEndpoint<In, Out> {
55 input: MessageReceiver<In>,
56 output: MessageSender<Out>,
57}
58
59impl<In: Message, Out: Message> MessageEndpoint<In, Out> {
60 pub fn queue_id(&self) -> QueueId {
61 QueueId::new(self.input.is_active.as_ref() as *const _ as usize)
64 }
65
66 pub fn as_sender(&self) -> &MessageSender<Out> {
68 &self.output
69 }
70
71 pub fn as_receiver(&self) -> &MessageReceiver<In> {
73 &self.input
74 }
75
76 pub fn is_active(&self) -> bool {
78 self.as_sender().is_active() && self.as_receiver().is_active()
79 }
80
81 pub fn activate(&self) {
83 self.input.activate();
84 self.output.activate();
85 }
86
87 pub fn deactivate(&self) {
89 self.input.deactivate();
90 self.output.deactivate();
91 }
92
93 pub fn send(&self, msg: Out) -> Result<(), MessagingError> {
95 self.as_sender().send(msg)
96 }
97
98 pub fn recv(&self) -> Option<In> {
100 self.input.recv()
101 }
102
103 pub fn iter(&self) -> MessageIter<'_, In> {
105 self.as_receiver().iter()
106 }
107
108 pub fn forward_one<Any, N>(
110 &self,
111 next: MessageEndpoint<Any, N>,
112 ) -> Result<(), MessagingError>
113 where
114 Any: Message,
115 N: Message + From<In>,
116 {
117 self.as_receiver().forward_one(next.as_sender().clone())
118 }
119
120 pub fn forward<Any, N>(&self, next: MessageEndpoint<Any, N>)
122 where
123 Any: Message,
124 N: Message + From<In>,
125 {
126 self.as_receiver().forward(next.as_sender().clone());
127 }
128}
129
130impl<In, Out> Clone for MessageEndpoint<In, Out> {
131 fn clone(&self) -> Self {
132 Self {
133 input: self.input.clone(),
134 output: self.output.clone(),
135 }
136 }
137}
138
139pub struct MessageEndpoints<In, Out> {
141 map: HashMap<QueueId, MessageEndpoint<In, Out>>,
142}
143
144impl<In: Message, Out: Message> MessageEndpoints<In, Out> {
145 pub fn new() -> Self {
147 Self {
148 map: HashMap::new(),
149 }
150 }
151
152 pub fn new_queue(&mut self) -> MessageEndpoint<Out, In> {
155 let (end1, end2) = bidirectional_queue();
156 self.map.insert(end1.queue_id(), end1);
157
158 end2
159 }
160
161 pub fn destroy_queue(&mut self, end: MessageEndpoint<In, Out>) {
163 self.map.remove(&end.queue_id());
164 }
165
166 pub fn add_endpoint(&mut self, end: MessageEndpoint<In, Out>) {
168 self.map.insert(end.queue_id(), end);
169 }
170
171 pub fn remove_endpoint(&mut self, end: MessageEndpoint<In, Out>) {
174 self.map.remove(&end.queue_id());
175 }
176
177 pub fn recv(&self) -> Option<In> {
179 for end in self.map.values() {
180 if let Some(msg) = end.recv() {
181 return Some(msg);
182 }
183 }
184
185 None
186 }
187
188 pub fn iter(&self) -> AbstractMessageIter<impl Iterator + '_> {
190 AbstractMessageIter {
191 iter: self.map.values().flat_map(|end| end.iter()),
192 }
193 }
194}
195
196impl<In: Message, Out: Message + Clone> MessageEndpoints<In, Out> {
197 pub fn send(&self, msg: Out) -> Result<(), MessagingError> {
201 self.map
202 .values()
203 .try_for_each(|end| end.send(msg.clone()))
204 }
205}
206
207impl<In, Out> Clone for MessageEndpoints<In, Out> {
208 fn clone(&self) -> Self {
209 Self {
210 map: self.map.clone(),
211 }
212 }
213}