1pub use bus::Bus;
2use std::{error::Error, mem};
3use std::{fmt::Debug, sync::mpsc::SendError};
4pub use subscriber::Subscriber;
5mod bus;
6mod subscriber;
7
8pub trait Message
10where
11 Self: Clone,
12{
13 type Discriminant: Discriminant<Self>;
14 fn discriminant(&self) -> Self::Discriminant;
15}
16
/// Marker trait for values usable as the discriminant of a message type `M`.
///
/// It declares no methods; implementors only have to be comparable with `==`
/// and printable with `{:?}`.
pub trait Discriminant<M>: PartialEq + Debug {}

/// The standard library's enum discriminant qualifies for every `T`.
impl<T> Discriminant<T> for mem::Discriminant<T> {}
50
51pub trait Subscription
53where
54 Self: PartialEq,
55{
56 type Event: Message;
57 fn subscribed_to(&self, message: &Self::Event) -> bool;
59 fn discriminant_set(&self) -> &[<Self::Event as Message>::Discriminant];
61 fn send_event(&self, message: Self::Event) -> Result<(), SendError<Self::Event>>;
63}
64
65pub trait TerminationCondition<M: Message> {
67 fn terminates(&mut self, message: &M) -> bool;
68}
69impl<M: Message, T: TerminationCondition<M>> TerminationCondition<M> for Option<T> {
70 fn terminates(&mut self, message: &M) -> bool {
72 if let Some(c) = self {
73 c.terminates(message)
74 } else {
75 false
76 }
77 }
78}
79impl<M: Message> TerminationCondition<M> for () {
80 fn terminates(&mut self, _message: &M) -> bool {
81 false
82 }
83}
84impl<M: Message, F: FnMut(&M) -> bool> TerminationCondition<M> for F {
85 fn terminates(&mut self, message: &M) -> bool {
86 self(message)
87 }
88}
89
90pub trait EventDistributor {
92 type Event: Message;
93 type Error: Error;
94 type EventSender;
95 type SubscriptionSender;
96 type TerminationCondition: TerminationCondition<Self::Event>;
97 fn serve_events(&mut self) -> Result<(), Self::Error>;
99 fn get_event_sink(&self) -> Self::EventSender;
100 fn get_subscrition_sink(&self) -> Self::SubscriptionSender;
101}
102
103mod foo {
104 use crate::{Discriminant, Message};
105 #[derive(Clone)]
107 enum TestMessage {
108 A,
109 B,
110 }
111 impl Message for TestMessage {
112 type Discriminant = std::mem::Discriminant<Self>;
113 fn discriminant(&self) -> Self::Discriminant {
114 std::mem::discriminant(self)
115 }
116 }
117 impl Discriminant<String> for [char; 5] {}
118 impl Message for String {
120 type Discriminant = [char; 5];
121 fn discriminant(&self) -> Self::Discriminant {
122 use std::convert::TryInto;
123 self.chars().take(5).collect::<Vec<_>>().try_into().unwrap()
124 }
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131 use std::{mem::discriminant, sync::mpsc, thread};
132 type TestBus = Bus<TestMessage, Subscriber<TestMessage>, fn(&TestMessage) -> bool>;
133 fn terminator(msg: &TestMessage) -> bool {
134 matches!(msg, TestMessage::Terminate)
135 }
136 #[derive(Clone, PartialEq, Debug)]
137 enum TestMessage {
138 FooEvent,
139 Terminate,
140 }
141 impl Message for TestMessage {
142 type Discriminant = mem::Discriminant<Self>;
143 fn discriminant(&self) -> Self::Discriminant {
144 discriminant(self)
145 }
146 }
147
148 #[test]
149 fn can_accept_subscribers() {
150 let mut bus: TestBus = Bus::new(terminator);
151 assert!(bus.subscribers.is_empty());
152 let subscribe_sink = bus.get_subscrition_sink();
153 let (tx, _) = mpsc::channel();
154 let subscriber = Subscriber::<TestMessage> {
155 sender: tx,
156 discriminant_set: vec![],
157 };
158 let etx = bus.get_event_sink();
159 etx.send(TestMessage::Terminate).unwrap();
160 subscribe_sink.send(subscriber).unwrap();
161 bus.serve_events().unwrap();
162 assert_eq!(bus.subscribers.len(), 1);
163 }
164 #[test]
165 fn subscriber_recieves_events() {
166 let mut bus: TestBus = Bus::new(terminator);
167 let subscribe_sink = bus.get_subscrition_sink();
168 let (tx, rx) = mpsc::channel();
169 let subscriber = Subscriber::<TestMessage> {
170 sender: tx,
171 discriminant_set: vec![TestMessage::FooEvent.discriminant()],
172 };
173 let etx = bus.get_event_sink();
174
175 etx.send(TestMessage::FooEvent).unwrap();
176 subscribe_sink.send(subscriber).unwrap();
177
178 thread::spawn(move || bus.serve_events().unwrap());
179 assert_eq!(rx.recv(), Ok(TestMessage::FooEvent));
180 etx.send(TestMessage::Terminate).unwrap();
181 }
182
183 impl TerminationCondition<TestMessage> for TestMessage {
185 fn terminates(&mut self, message: &TestMessage) -> bool {
186 self == message
187 }
188 }
189 #[test]
190 fn bus_terminated_by_message() {
191 let mut bus: Bus<TestMessage, Subscriber<TestMessage>, TestMessage> =
192 Bus::new(TestMessage::Terminate);
193 let subscribe_sink = bus.get_subscrition_sink();
194 let (tx, rx) = mpsc::channel();
195 let subscriber = Subscriber::<TestMessage> {
196 sender: tx,
197 discriminant_set: vec![TestMessage::FooEvent.discriminant()],
198 };
199 let etx = bus.get_event_sink();
200
201 etx.send(TestMessage::FooEvent).unwrap();
202 subscribe_sink.send(subscriber).unwrap();
203
204 thread::spawn(move || bus.serve_events().unwrap());
205 assert_eq!(rx.recv(), Ok(TestMessage::FooEvent));
206 etx.send(TestMessage::Terminate).unwrap();
207 }
208}