1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use crate::{EventDistributor, Message, Subscription, TerminationCondition};
use std::{
error::Error,
sync::mpsc::{self, Receiver, Sender},
};
/// Error type used as the `Error` associated type of the `Bus` event distributor.
#[derive(Debug)]
pub enum BusError {
    /// The only failure kind; it carries no further detail.
    Error,
}

/// Human-readable rendering: identical to the `Debug` representation.
impl std::fmt::Display for BusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// Marks `BusError` as a standard error; no source error is provided.
impl Error for BusError {}
/// Event bus built on two `std::sync::mpsc` channels: one carrying events of
/// type `M`, one carrying new subscriptions of type `S`.
///
/// The bus keeps both ends of each channel: the receiving ends are read by the
/// bus itself, the sending ends are kept so they can be cloned and handed out.
pub struct Bus<M: Message, S: Subscription<Event = M>, C: TerminationCondition<M>> {
    /// Receiving end of the event channel.
    pub(crate) event_reciever: Receiver<M>,
    /// Sending end of the event channel.
    event_sender: Sender<M>,
    /// Receiving end of the subscription channel.
    subscription_reciever: Receiver<S>,
    /// Sending end of the subscription channel.
    subscription_sender: Sender<S>,
    /// Condition value supplied at construction time.
    termination_condition: C,
    /// Subscriptions currently held by the bus.
    pub(crate) subscribers: Vec<S>,
}
impl<M, S, C> Bus<M, S, C>
where
M: Message,
S: Subscription<Event = M>,
C: TerminationCondition<M>,
{
pub fn new(cond: C) -> Self {
let (etx, erx) = mpsc::channel();
let (stx, srx) = mpsc::channel();
Self {
event_reciever: erx,
event_sender: etx,
subscription_reciever: srx,
subscription_sender: stx,
termination_condition: cond,
subscribers: Vec::new(),
}
}
}
/// `EventDistributor` implementation for the channel-backed `Bus`.
impl<M, S, C> EventDistributor for Bus<M, S, C>
where
    M: Message + std::fmt::Debug,
    S: Subscription<Event = M> + std::fmt::Debug,
    C: TerminationCondition<M>,
{
    type Event = M;
    type Error = BusError;
    type EventSender = Sender<M>;
    type SubscriptionSender = Sender<S>;
    type TerminationCondition = C;

    /// Runs the dispatch loop until the termination condition matches an event.
    ///
    /// Every pass of the loop first registers all subscriptions waiting on the
    /// subscription channel, then drains the events waiting on the event
    /// channel. Each event is cloned to every subscriber that reports being
    /// subscribed to it. A subscriber whose `send_event` fails is dropped from
    /// the bus; subscribers that were not interested in the event are kept.
    ///
    /// The event that satisfies the termination condition is not forwarded,
    /// and any events still queued behind it are left unprocessed.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())` once the loop terminates.
    fn serve_events(&mut self) -> Result<(), Self::Error> {
        // NOTE(review): both channels are polled with `try_recv`, so this loop
        // never blocks and spins while idle.
        'outer: loop {
            while let Ok(subscriber) = self.subscription_reciever.try_recv() {
                self.subscribers.push(subscriber);
            }

            while let Ok(message) = self.event_reciever.try_recv() {
                if self.termination_condition.terminates(&message) {
                    break 'outer;
                }

                // Deliver and prune in a single in-order pass. Positions taken
                // from a filtered iterator do not correspond to positions in
                // `self.subscribers`, so index-based removal would drop the
                // wrong entries (or panic); `retain` avoids indices entirely.
                self.subscribers.retain(|subscriber| {
                    !subscriber.subscribed_to(&message)
                        || subscriber.send_event(message.clone()).is_ok()
                });
            }
        }
        Ok(())
    }

    /// Returns a new sender connected to the bus's event channel.
    fn get_event_sink(&self) -> Self::EventSender {
        self.event_sender.clone()
    }

    /// Returns a new sender connected to the bus's subscription channel.
    fn get_subscrition_sink(&self) -> Self::SubscriptionSender {
        self.subscription_sender.clone()
    }
}