1#![warn(rust_2018_idioms)]
7#![warn(rust_2021_compatibility)]
8#![warn(missing_debug_implementations)]
9#![warn(unreachable_pub)]
10#![warn(unsafe_code)]
11#![warn(rustdoc::broken_intra_doc_links)]
12#![warn(clippy::pedantic)]
13#![warn(clippy::clone_on_ref_ptr)]
15#![warn(clippy::self_named_module_files)]
16#![allow(clippy::default_trait_access)]
18#![allow(clippy::module_name_repetitions)]
19#![allow(clippy::missing_errors_doc)] use std::{error::Error as StdError, fmt, future::Future, pin::Pin};
22
23use thiserror::Error;
24use tokio::sync::{
25 broadcast,
26 mpsc::{self, error::SendError},
27 oneshot,
28};
29
30use msr_core::audit::Activity;
31
32pub trait Plugin {
34 type Message;
36
37 type Event;
39
40 fn message_sender(&self) -> MessageSender<Self::Message>;
44
45 fn subscribe_events(&self) -> BroadcastReceiver<Self::Event>;
49
50 fn run(self) -> MessageLoop;
52}
53
54#[allow(missing_debug_implementations)]
55pub struct PluginContainer<M, E> {
56 pub ports: PluginPorts<M, E>,
57 pub message_loop: MessageLoop,
58}
59
60impl<M, E> Plugin for PluginContainer<M, E> {
61 type Message = M;
62 type Event = PublishedEvent<E>;
63
64 fn message_sender(&self) -> MessageSender<Self::Message> {
65 self.ports.message_tx.clone()
66 }
67 fn subscribe_events(&self) -> BroadcastReceiver<Self::Event> {
68 self.ports.event_subscriber.subscribe()
69 }
70 fn run(self) -> MessageLoop {
71 self.message_loop
72 }
73}
74
/// The message loop of a plugin: a pinned, heap-allocated future without a
/// result that can be sent to another thread and borrows nothing.
pub type MessageLoop = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
76
77#[allow(missing_debug_implementations)]
78pub struct PluginPorts<M, E> {
79 pub message_tx: MessageSender<M>,
80 pub event_subscriber: EventSubscriber<E>,
81}
82
83#[derive(Error, Debug)]
84pub enum PluginError<E: StdError> {
85 #[error("communication error")]
86 Communication,
87
88 #[error("internal error: {0}")]
89 Internal(E),
90}
91
92pub type PluginResult<T, E> = Result<T, PluginError<E>>;
93
94pub type MessageSender<T> = mpsc::UnboundedSender<T>;
100pub type MessageReceiver<T> = mpsc::UnboundedReceiver<T>;
101
102#[must_use]
103pub fn message_channel<T>() -> (MessageSender<T>, MessageReceiver<T>) {
104 mpsc::unbounded_channel()
105}
106
107pub type ReplySender<T> = oneshot::Sender<T>;
112pub type ReplyReceiver<T> = oneshot::Receiver<T>;
113
114#[must_use]
115pub fn reply_channel<T>() -> (oneshot::Sender<T>, oneshot::Receiver<T>) {
116 oneshot::channel()
117}
118
119pub type ResultSender<T, E> = ReplySender<Result<T, E>>;
120pub type ResultReceiver<T, E> = ReplyReceiver<Result<T, E>>;
121
122type BroadcastSender<T> = broadcast::Sender<T>;
127type BroadcastReceiver<T> = broadcast::Receiver<T>;
128
129#[derive(Debug, Clone)]
130pub struct BroadcastSubscriber<T> {
131 sender: BroadcastSender<T>,
132}
133
134impl<T> BroadcastSubscriber<T> {
135 #[must_use]
136 pub fn new(sender: BroadcastSender<T>) -> Self {
137 Self { sender }
138 }
139
140 #[must_use]
141 pub fn subscribe(&self) -> BroadcastReceiver<T> {
142 self.sender.subscribe()
143 }
144}
145
146#[must_use]
147pub fn broadcast_channel<T>(channel_capacity: usize) -> (BroadcastSender<T>, BroadcastSubscriber<T>)
148where
149 T: Clone,
150{
151 let (tx, _) = broadcast::channel(channel_capacity);
152 let subscriber = BroadcastSubscriber::new(tx.clone());
153 (tx, subscriber)
154}
155
/// Primitive value type wrapped by [`EventPublisherIndex`].
pub type EventPublisherIndexValue = usize;

/// Strongly typed index of an event publisher.
///
/// A newtype around [`EventPublisherIndexValue`] that is attached to every
/// published event.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventPublisherIndex(EventPublisherIndexValue);

impl EventPublisherIndex {
    /// Wraps a primitive value.
    #[must_use]
    pub const fn from_value(value: EventPublisherIndexValue) -> Self {
        EventPublisherIndex(value)
    }

    /// Returns the wrapped primitive value.
    #[must_use]
    pub const fn to_value(self) -> EventPublisherIndexValue {
        self.0
    }
}

/// Conversion from the primitive value into the typed index.
impl From<EventPublisherIndexValue> for EventPublisherIndex {
    fn from(value: EventPublisherIndexValue) -> Self {
        EventPublisherIndex(value)
    }
}

/// Conversion from the typed index back into the primitive value.
impl From<EventPublisherIndex> for EventPublisherIndexValue {
    fn from(index: EventPublisherIndex) -> Self {
        index.0
    }
}
196
197#[derive(Debug, Clone)]
198pub struct PublishedEvent<E> {
199 pub published: Activity<EventPublisherIndex>,
200 pub payload: E,
201}
202
203pub type EventSender<E> = broadcast::Sender<PublishedEvent<E>>;
204pub type EventReceiver<E> = broadcast::Receiver<PublishedEvent<E>>;
205pub type EventSubscriber<E> = BroadcastSubscriber<PublishedEvent<E>>;
206
207#[must_use]
208pub fn event_channel<E>(channel_capacity: usize) -> (EventSender<E>, EventSubscriber<E>)
209where
210 E: Clone,
211{
212 broadcast_channel(channel_capacity)
213}
214
215#[derive(Debug, Clone)]
216pub struct EventPubSub<E> {
217 publisher_index: EventPublisherIndex,
218 event_tx: EventSender<E>,
219}
220
221impl<E> EventPubSub<E>
222where
223 E: fmt::Debug + Clone,
224{
225 pub fn new(
226 publisher_index: impl Into<EventPublisherIndex>,
227 channel_capacity: usize,
228 ) -> (Self, EventSubscriber<E>) {
229 let (event_tx, event_subscriber) = event_channel(channel_capacity);
230 (
231 Self {
232 publisher_index: publisher_index.into(),
233 event_tx,
234 },
235 event_subscriber,
236 )
237 }
238
239 pub fn publish_event(&self, payload: E) {
240 let published = Activity::now(self.publisher_index);
241 let event = PublishedEvent { published, payload };
242 self.dispatch_event(event);
243 }
244}
245
/// Sink for events of type `E`.
pub trait EventDispatcher<E> {
    /// Passes the given event on; how it is delivered is up to the
    /// implementation.
    fn dispatch_event(&self, event: E);
}
249
250impl<E> EventDispatcher<PublishedEvent<E>> for EventPubSub<E>
251where
252 E: fmt::Debug + Clone,
253{
254 fn dispatch_event(&self, event: PublishedEvent<E>) {
255 if let Err(event) = self.event_tx.send(event) {
256 log::debug!("No subscribers for published event {:?}", event);
259 }
260 }
261}
262
263pub fn send_message<M, E>(
268 message: impl Into<M>,
269 message_tx: &MessageSender<M>,
270) -> PluginResult<(), E>
271where
272 M: fmt::Debug,
273 E: StdError,
274{
275 message_tx.send(message.into()).map_err(|send_error| {
276 let SendError(message) = send_error;
277 log::error!("Unexpected send error: Dropping message {:?}", message);
278 PluginError::Communication
279 })
280}
281
282pub fn send_reply<R>(reply_tx: ReplySender<R>, reply: impl Into<R>)
283where
284 R: fmt::Debug,
285{
286 if let Err(reply) = reply_tx.send(reply.into()) {
287 log::info!("Unexpected send error: Dropping reply {:?}", reply);
289 }
290}
291
292pub async fn receive_reply<R, E>(reply_rx: ReplyReceiver<R>) -> PluginResult<R, E>
293where
294 E: StdError,
295{
296 reply_rx.await.map_err(|receive_error| {
297 log::error!("No reply received: {}", receive_error);
298 PluginError::Communication
299 })
300}
301
302pub async fn send_message_receive_reply<M, R, E>(
303 message: impl Into<M>,
304 message_tx: &MessageSender<M>,
305 reply_rx: ReplyReceiver<R>,
306) -> PluginResult<R, E>
307where
308 M: fmt::Debug,
309 E: StdError,
310{
311 send_message(message, message_tx)?;
312 receive_reply(reply_rx).await
313}
314
315pub async fn receive_result<R, E>(result_rx: ResultReceiver<R, E>) -> PluginResult<R, E>
316where
317 E: StdError,
318{
319 receive_reply(result_rx)
320 .await?
321 .map_err(PluginError::Internal)
322}
323
324pub async fn send_message_receive_result<M, R, E>(
325 message: impl Into<M>,
326 message_tx: &MessageSender<M>,
327 result_rx: ResultReceiver<R, E>,
328) -> PluginResult<R, E>
329where
330 M: fmt::Debug,
331 E: StdError,
332{
333 send_message(message, message_tx)?;
334 receive_result(result_rx).await
335}