1use crate::actor::Id;
15use crate::util::{HashableHashMap, HashableHashSet};
16use crate::{Rewrite, RewritePlan};
17use std::collections::btree_map::Entry;
18use std::collections::{btree_map, hash_map, hash_set};
19use std::collections::{BTreeMap, VecDeque};
20use std::hash::Hash;
21use std::str::FromStr;
22
23#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
25pub struct Envelope<Msg> {
26 pub src: Id,
27 pub dst: Id,
28 pub msg: Msg,
29}
30
31impl<Msg> Envelope<&Msg> {
32 pub fn to_cloned_msg(&self) -> Envelope<Msg>
34 where
35 Msg: Clone,
36 {
37 Envelope {
38 src: self.src,
39 dst: self.dst,
40 msg: self.msg.clone(),
41 }
42 }
43}
44
45#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
47pub enum Network<Msg>
48where
49 Msg: Eq + Hash,
50{
51 UnorderedDuplicating(HashableHashSet<Envelope<Msg>>, Option<Envelope<Msg>>),
53
54 UnorderedNonDuplicating(HashableHashMap<Envelope<Msg>, usize>),
56
57 Ordered(BTreeMap<(Id, Id), VecDeque<Msg>>),
68}
69
70impl<Msg> Network<Msg>
71where
72 Msg: Eq + Hash,
73{
74 pub fn new_ordered(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
85 let mut this = Self::Ordered(BTreeMap::new());
86 for env in envelopes {
87 this.send(env);
88 }
89 this
90 }
91
92 pub fn new_unordered_duplicating_with_last_msg(
97 envelopes: impl IntoIterator<Item = Envelope<Msg>>,
98 last_msg: Option<Envelope<Msg>>,
99 ) -> Self {
100 let mut this = Self::UnorderedDuplicating(
101 HashableHashSet::with_hasher(crate::stable::build_hasher()),
102 last_msg,
103 );
104 for env in envelopes {
105 this.send(env);
106 }
107 this
108 }
109
110 pub fn new_unordered_duplicating(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
114 let mut this = Self::UnorderedDuplicating(
115 HashableHashSet::with_hasher(crate::stable::build_hasher()),
116 None,
117 );
118 for env in envelopes {
119 this.send(env);
120 }
121 this
122 }
123
124 pub fn new_unordered_nonduplicating(
128 envelopes: impl IntoIterator<Item = Envelope<Msg>>,
129 ) -> Self {
130 let mut this = Self::UnorderedNonDuplicating(HashableHashMap::with_hasher(
131 crate::stable::build_hasher(),
132 ));
133 for env in envelopes {
134 this.send(env);
135 }
136 this
137 }
138
139 pub fn names() -> Vec<&'static str> {
141 struct IterStr<Msg: Eq + Hash>(Option<Network<Msg>>);
142 impl<Msg: Eq + Hash> Iterator for IterStr<Msg> {
143 type Item = &'static str;
144 fn next(&mut self) -> Option<Self::Item> {
145 if let Some(network) = &self.0 {
146 match network {
147 Network::Ordered(_) => {
148 self.0 = Some(Network::UnorderedDuplicating(Default::default(), None));
149 Some("ordered")
150 }
151 Network::UnorderedDuplicating(_, _) => {
152 self.0 = Some(Network::UnorderedNonDuplicating(Default::default()));
153 Some("unordered_duplicating")
154 }
155 Network::UnorderedNonDuplicating(_) => {
156 self.0 = None;
157 Some("unordered_nonduplicating")
158 }
159 }
160 } else {
161 None
162 }
163 }
164 }
165 IterStr::<Msg>(Some(Network::Ordered(Default::default()))).collect()
166 }
167
168 pub fn iter_all(&self) -> NetworkIter<Msg> {
170 match self {
171 Network::UnorderedDuplicating(set, _) => NetworkIter::UnorderedDuplicating(set.iter()),
172 Network::UnorderedNonDuplicating(multiset) => {
173 NetworkIter::UnorderedNonDuplicating(None, multiset.iter())
174 }
175 Network::Ordered(map) => NetworkIter::Ordered(None, map.iter()),
176 }
177 }
178
179 pub fn iter_deliverable(&self) -> NetworkDeliverableIter<Msg> {
181 match self {
182 Network::UnorderedDuplicating(set, _) => {
183 NetworkDeliverableIter::UnorderedDuplicating(set.iter())
184 }
185 Network::UnorderedNonDuplicating(multiset) => {
186 NetworkDeliverableIter::UnorderedNonDuplicating(multiset.keys())
187 }
188 Network::Ordered(map) => NetworkDeliverableIter::Ordered(map.iter()),
189 }
190 }
191
192 #[allow(clippy::len_without_is_empty)]
194 pub fn len(&self) -> usize {
195 match self {
196 Network::UnorderedDuplicating(set, _) => set.len(),
197 Network::UnorderedNonDuplicating(multiset) => multiset.values().sum(),
198 Network::Ordered(map) => map.values().map(VecDeque::len).sum(),
199 }
200 }
201
202 pub(crate) fn send(&mut self, envelope: Envelope<Msg>) {
204 match self {
205 Network::UnorderedDuplicating(set, _) => {
206 set.insert(envelope);
207 }
208 Network::UnorderedNonDuplicating(multiset) => {
209 *multiset.entry(envelope).or_insert(0) += 1;
210 }
211 Network::Ordered(map) => {
212 map.entry((envelope.src, envelope.dst))
213 .or_insert_with(|| VecDeque::with_capacity(1))
214 .push_back(envelope.msg);
215 }
216 }
217 }
218
219 pub(crate) fn on_deliver(&mut self, envelope: Envelope<Msg>)
220 where
221 Msg: PartialEq,
222 {
223 match self {
224 Network::UnorderedDuplicating(_, last_msg) => {
225 last_msg.replace(envelope);
228 }
229 Network::UnorderedNonDuplicating(multiset) => match multiset.entry(envelope) {
230 hash_map::Entry::Occupied(mut entry) => {
231 let value = *entry.get();
232 assert!(value > 0);
233 if value == 1 {
234 entry.remove();
235 } else {
236 *entry.get_mut() -= 1;
237 }
238 }
239 hash_map::Entry::Vacant(_) => {
240 panic!("envelope not found");
241 }
242 },
243 Network::Ordered(map) => {
244 let flow_entry = match map.entry((envelope.src, envelope.dst)) {
249 Entry::Vacant(_) => panic!(
250 "flow not found. src={:?}, dst={:?}",
251 envelope.src, envelope.dst
252 ),
253 Entry::Occupied(flow) => flow,
254 };
255 let i = flow_entry
256 .get()
257 .iter()
258 .position(|x| x == &envelope.msg)
259 .expect("message not found");
260 if flow_entry.get().len() > 1 {
261 flow_entry.into_mut().remove(i);
262 } else {
263 flow_entry.remove();
264 }
265 }
266 }
267 }
268
269 pub(crate) fn on_drop(&mut self, envelope: Envelope<Msg>)
270 where
271 Msg: PartialEq,
272 {
273 match self {
274 Network::UnorderedDuplicating(set, _) => {
275 set.remove(&envelope);
276 }
277 Network::UnorderedNonDuplicating(multiset) => match multiset.entry(envelope) {
278 hash_map::Entry::Occupied(mut entry) => {
279 let value = *entry.get();
280 assert!(value > 0);
281 if value == 1 {
282 entry.remove();
283 } else {
284 *entry.get_mut() -= 1;
285 }
286 }
287 hash_map::Entry::Vacant(_) => {
288 panic!("envelope not found");
289 }
290 },
291 Network::Ordered(map) => {
292 let flow_entry = match map.entry((envelope.src, envelope.dst)) {
297 Entry::Vacant(_) => panic!(
298 "flow not found. src={:?}, dst={:?}",
299 envelope.src, envelope.dst
300 ),
301 Entry::Occupied(flow) => flow,
302 };
303 let i = flow_entry
304 .get()
305 .iter()
306 .position(|x| x == &envelope.msg)
307 .expect("message not found");
308 if flow_entry.get().len() > 1 {
309 flow_entry.into_mut().remove(i);
310 } else {
311 flow_entry.remove();
312 }
313 }
314 }
315 }
316}
317
318impl<Msg> FromStr for Network<Msg>
319where
320 Msg: Eq + Hash,
321{
322 type Err = String;
323 fn from_str(s: &str) -> Result<Self, Self::Err> {
324 match s {
325 "ordered" => Ok(Self::new_ordered([])),
326 "unordered_duplicating" => Ok(Self::new_unordered_duplicating([])),
327 "unordered_nonduplicating" => Ok(Self::new_unordered_nonduplicating([])),
328 _ => Err(format!("unable to parse network name: {s}")),
329 }
330 }
331}
332
333impl<Msg> Rewrite<Id> for Network<Msg>
334where
335 Msg: Eq + Hash + Rewrite<Id>,
336{
337 fn rewrite<S>(&self, plan: &RewritePlan<Id, S>) -> Self {
338 match self {
339 Network::UnorderedDuplicating(set, last_msg) => {
340 Network::UnorderedDuplicating(set.rewrite(plan), last_msg.rewrite(plan))
341 }
342 Network::UnorderedNonDuplicating(multiset) => {
343 Network::UnorderedNonDuplicating(multiset.rewrite(plan))
344 }
345 Network::Ordered(map) => Network::Ordered(map.rewrite(plan)),
346 }
347 }
348}
349
350pub enum NetworkIter<'a, Msg> {
351 UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
352 UnorderedNonDuplicating(
353 Option<(Envelope<&'a Msg>, usize)>,
355 std::collections::hash_map::Iter<'a, Envelope<Msg>, usize>,
356 ),
357 Ordered(
358 Option<(Id, Id, &'a VecDeque<Msg>, usize)>,
360 btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>,
361 ),
362}
363
364impl<'a, Msg> Iterator for NetworkIter<'a, Msg> {
365 type Item = Envelope<&'a Msg>;
366 fn next(&mut self) -> Option<Self::Item> {
367 match self {
368 NetworkIter::UnorderedDuplicating(it) => it.next().map(|env| Envelope {
369 src: env.src,
370 dst: env.dst,
371 msg: &env.msg,
372 }),
373 NetworkIter::UnorderedNonDuplicating(active, it) => {
374 if let Some((env, count)) = active {
375 let env = *env; *count -= 1;
378 if *count == 0 {
379 *active = None;
380 }
381 return Some(env);
382 }
383 it.next().map(|(env, count)| {
384 let env = Envelope {
385 src: env.src,
386 dst: env.dst,
387 msg: &env.msg,
388 };
389 if *count > 1 {
390 *active = Some((env, *count));
391 }
392 env
393 })
394 }
395 NetworkIter::Ordered(active, it) => {
396 if let Some((src, dst, messages, index)) = active {
397 let msg = messages.get(*index).unwrap(); return Some(Envelope {
399 src: *src,
400 dst: *dst,
401 msg,
402 });
403 }
404 it.next().map(|(&(src, dst), messages)| {
405 let msg = messages.front().unwrap(); *active = Some((src, dst, messages, 0));
407 Envelope { src, dst, msg }
408 })
409 }
410 }
411 }
412}
413
414pub enum NetworkDeliverableIter<'a, Msg> {
415 UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
416 UnorderedNonDuplicating(hash_map::Keys<'a, Envelope<Msg>, usize>),
417 Ordered(btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>),
418}
419
420impl<'a, Msg> Iterator for NetworkDeliverableIter<'a, Msg> {
421 type Item = Envelope<&'a Msg>;
422 fn next(&mut self) -> Option<Self::Item> {
423 match self {
424 NetworkDeliverableIter::UnorderedDuplicating(it) => it.next().map(|env| Envelope {
425 src: env.src,
426 dst: env.dst,
427 msg: &env.msg,
428 }),
429 NetworkDeliverableIter::UnorderedNonDuplicating(it) => it.next().map(|env| Envelope {
430 src: env.src,
431 dst: env.dst,
432 msg: &env.msg,
433 }),
434 NetworkDeliverableIter::Ordered(it) => it.next().map(|(&(src, dst), messages)| {
435 let msg = messages.front().expect("empty channel");
436 Envelope { src, dst, msg }
437 }),
438 }
439 }
440}
441
442#[cfg(test)]
443mod test {
444 use super::*;
445 use std::collections::BTreeSet;
446 #[test]
447 fn can_enumerate_and_parse_names() {
448 assert_eq!(
449 Network::<()>::names()
450 .into_iter()
451 .map(Network::<()>::from_str)
452 .map(Result::unwrap)
453 .collect::<BTreeSet<_>>(),
454 vec![
455 Network::new_ordered([]),
456 Network::new_unordered_duplicating([]),
457 Network::new_unordered_nonduplicating([]),
458 ]
459 .into_iter()
460 .collect()
461 );
462 }
463}