msr_plugin/
lib.rs

1//! Industrial Automation Toolbox - Plugin Foundation
2
3// FIXME: Enable and switch `missing_docs` from `warn` to `deny` before release
4//#![warn(missing_docs)]
5
6#![warn(rust_2018_idioms)]
7#![warn(rust_2021_compatibility)]
8#![warn(missing_debug_implementations)]
9#![warn(unreachable_pub)]
10#![warn(unsafe_code)]
11#![warn(rustdoc::broken_intra_doc_links)]
12#![warn(clippy::pedantic)]
13// Additional restrictions
14#![warn(clippy::clone_on_ref_ptr)]
15#![warn(clippy::self_named_module_files)]
16// Exceptions
17#![allow(clippy::default_trait_access)]
18#![allow(clippy::module_name_repetitions)]
19#![allow(clippy::missing_errors_doc)] // TODO
20
21use std::{error::Error as StdError, fmt, future::Future, pin::Pin};
22
23use thiserror::Error;
24use tokio::sync::{
25    broadcast,
26    mpsc::{self, error::SendError},
27    oneshot,
28};
29
30use msr_core::audit::Activity;
31
32/// Message-driven plugin
33pub trait Plugin {
34    /// The message type
35    type Message;
36
37    /// The event type
38    type Event;
39
40    /// Endpoint for submitting messages
41    ///
42    /// Returns an endpoint for sending request messages to the plugin.
43    fn message_sender(&self) -> MessageSender<Self::Message>;
44
45    /// Subscribe to plugin events
46    ///
47    /// Returns an endpoint for receiving events published by the plugin.
48    fn subscribe_events(&self) -> BroadcastReceiver<Self::Event>;
49
50    /// Run the message loop
51    fn run(self) -> MessageLoop;
52}
53
54#[allow(missing_debug_implementations)]
55pub struct PluginContainer<M, E> {
56    pub ports: PluginPorts<M, E>,
57    pub message_loop: MessageLoop,
58}
59
60impl<M, E> Plugin for PluginContainer<M, E> {
61    type Message = M;
62    type Event = PublishedEvent<E>;
63
64    fn message_sender(&self) -> MessageSender<Self::Message> {
65        self.ports.message_tx.clone()
66    }
67    fn subscribe_events(&self) -> BroadcastReceiver<Self::Event> {
68        self.ports.event_subscriber.subscribe()
69    }
70    fn run(self) -> MessageLoop {
71        self.message_loop
72    }
73}
74
/// Future that runs the message loop of a plugin
///
/// A pinned, heap-allocated future without an output value. The
/// `Send + 'static` bounds allow moving it to another thread.
pub type MessageLoop = Pin<std::boxed::Box<dyn Future<Output = ()> + Send + 'static>>;
76
77#[allow(missing_debug_implementations)]
78pub struct PluginPorts<M, E> {
79    pub message_tx: MessageSender<M>,
80    pub event_subscriber: EventSubscriber<E>,
81}
82
83#[derive(Error, Debug)]
84pub enum PluginError<E: StdError> {
85    #[error("communication error")]
86    Communication,
87
88    #[error("internal error: {0}")]
89    Internal(E),
90}
91
92pub type PluginResult<T, E> = Result<T, PluginError<E>>;
93
94// ------ -------
95//   Messages
96// ------ -------
97
98// TODO: Use bounded channels for backpressure?
99pub type MessageSender<T> = mpsc::UnboundedSender<T>;
100pub type MessageReceiver<T> = mpsc::UnboundedReceiver<T>;
101
102#[must_use]
103pub fn message_channel<T>() -> (MessageSender<T>, MessageReceiver<T>) {
104    mpsc::unbounded_channel()
105}
106
107// ------ -------
108// Reply messages
109// ------ -------
110
111pub type ReplySender<T> = oneshot::Sender<T>;
112pub type ReplyReceiver<T> = oneshot::Receiver<T>;
113
114#[must_use]
115pub fn reply_channel<T>() -> (oneshot::Sender<T>, oneshot::Receiver<T>) {
116    oneshot::channel()
117}
118
119pub type ResultSender<T, E> = ReplySender<Result<T, E>>;
120pub type ResultReceiver<T, E> = ReplyReceiver<Result<T, E>>;
121
122// ------ -------
123//  Broadcasting
124// ------ -------
125
126type BroadcastSender<T> = broadcast::Sender<T>;
127type BroadcastReceiver<T> = broadcast::Receiver<T>;
128
129#[derive(Debug, Clone)]
130pub struct BroadcastSubscriber<T> {
131    sender: BroadcastSender<T>,
132}
133
134impl<T> BroadcastSubscriber<T> {
135    #[must_use]
136    pub fn new(sender: BroadcastSender<T>) -> Self {
137        Self { sender }
138    }
139
140    #[must_use]
141    pub fn subscribe(&self) -> BroadcastReceiver<T> {
142        self.sender.subscribe()
143    }
144}
145
146#[must_use]
147pub fn broadcast_channel<T>(channel_capacity: usize) -> (BroadcastSender<T>, BroadcastSubscriber<T>)
148where
149    T: Clone,
150{
151    let (tx, _) = broadcast::channel(channel_capacity);
152    let subscriber = BroadcastSubscriber::new(tx.clone());
153    (tx, subscriber)
154}
155
156// ----- ------
157//    Events
158// ----- ------
159
/// Internal index into a lookup table with event publisher metadata
pub type EventPublisherIndexValue = usize;

/// Numeric identifier of an event publisher
///
/// Uniquely identifies an event publisher in the system at runtime.
///
/// The value is supposed to be used as a key or index to retrieve
/// extended metadata for an event publisher that does not need to
/// be sent with every event. This metadata is probably immutable.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct EventPublisherIndex(EventPublisherIndexValue);

impl EventPublisherIndex {
    /// Wrap a raw index value
    #[must_use]
    pub const fn from_value(value: EventPublisherIndexValue) -> Self {
        Self(value)
    }

    /// Return the wrapped raw index value
    #[must_use]
    pub const fn to_value(self) -> EventPublisherIndexValue {
        self.0
    }
}

impl From<EventPublisherIndexValue> for EventPublisherIndex {
    /// Same as [`EventPublisherIndex::from_value`].
    fn from(value: EventPublisherIndexValue) -> Self {
        Self::from_value(value)
    }
}

impl From<EventPublisherIndex> for EventPublisherIndexValue {
    /// Same as [`EventPublisherIndex::to_value`].
    fn from(index: EventPublisherIndex) -> Self {
        index.to_value()
    }
}
196
197#[derive(Debug, Clone)]
198pub struct PublishedEvent<E> {
199    pub published: Activity<EventPublisherIndex>,
200    pub payload: E,
201}
202
203pub type EventSender<E> = broadcast::Sender<PublishedEvent<E>>;
204pub type EventReceiver<E> = broadcast::Receiver<PublishedEvent<E>>;
205pub type EventSubscriber<E> = BroadcastSubscriber<PublishedEvent<E>>;
206
207#[must_use]
208pub fn event_channel<E>(channel_capacity: usize) -> (EventSender<E>, EventSubscriber<E>)
209where
210    E: Clone,
211{
212    broadcast_channel(channel_capacity)
213}
214
215#[derive(Debug, Clone)]
216pub struct EventPubSub<E> {
217    publisher_index: EventPublisherIndex,
218    event_tx: EventSender<E>,
219}
220
221impl<E> EventPubSub<E>
222where
223    E: fmt::Debug + Clone,
224{
225    pub fn new(
226        publisher_index: impl Into<EventPublisherIndex>,
227        channel_capacity: usize,
228    ) -> (Self, EventSubscriber<E>) {
229        let (event_tx, event_subscriber) = event_channel(channel_capacity);
230        (
231            Self {
232                publisher_index: publisher_index.into(),
233                event_tx,
234            },
235            event_subscriber,
236        )
237    }
238
239    pub fn publish_event(&self, payload: E) {
240        let published = Activity::now(self.publisher_index);
241        let event = PublishedEvent { published, payload };
242        self.dispatch_event(event);
243    }
244}
245
/// Dispatches events of type `E`
pub trait EventDispatcher<E> {
    /// Hand over an event for dispatching
    ///
    /// The method returns nothing, so implementations have no way
    /// of reporting failures to the caller.
    fn dispatch_event(&self, event: E);
}
249
250impl<E> EventDispatcher<PublishedEvent<E>> for EventPubSub<E>
251where
252    E: fmt::Debug + Clone,
253{
254    fn dispatch_event(&self, event: PublishedEvent<E>) {
255        if let Err(event) = self.event_tx.send(event) {
256            // Ignore all send errors that are expected if no subscribers
257            // are connected.
258            log::debug!("No subscribers for published event {:?}", event);
259        }
260    }
261}
262
263// --------- -----------
264//   Utility functions
265// --------- -----------
266
267pub fn send_message<M, E>(
268    message: impl Into<M>,
269    message_tx: &MessageSender<M>,
270) -> PluginResult<(), E>
271where
272    M: fmt::Debug,
273    E: StdError,
274{
275    message_tx.send(message.into()).map_err(|send_error| {
276        let SendError(message) = send_error;
277        log::error!("Unexpected send error: Dropping message {:?}", message);
278        PluginError::Communication
279    })
280}
281
282pub fn send_reply<R>(reply_tx: ReplySender<R>, reply: impl Into<R>)
283where
284    R: fmt::Debug,
285{
286    if let Err(reply) = reply_tx.send(reply.into()) {
287        // Not an error, may occur if the receiver side has already been dropped
288        log::info!("Unexpected send error: Dropping reply {:?}", reply);
289    }
290}
291
292pub async fn receive_reply<R, E>(reply_rx: ReplyReceiver<R>) -> PluginResult<R, E>
293where
294    E: StdError,
295{
296    reply_rx.await.map_err(|receive_error| {
297        log::error!("No reply received: {}", receive_error);
298        PluginError::Communication
299    })
300}
301
302pub async fn send_message_receive_reply<M, R, E>(
303    message: impl Into<M>,
304    message_tx: &MessageSender<M>,
305    reply_rx: ReplyReceiver<R>,
306) -> PluginResult<R, E>
307where
308    M: fmt::Debug,
309    E: StdError,
310{
311    send_message(message, message_tx)?;
312    receive_reply(reply_rx).await
313}
314
315pub async fn receive_result<R, E>(result_rx: ResultReceiver<R, E>) -> PluginResult<R, E>
316where
317    E: StdError,
318{
319    receive_reply(result_rx)
320        .await?
321        .map_err(PluginError::Internal)
322}
323
324pub async fn send_message_receive_result<M, R, E>(
325    message: impl Into<M>,
326    message_tx: &MessageSender<M>,
327    result_rx: ResultReceiver<R, E>,
328) -> PluginResult<R, E>
329where
330    M: fmt::Debug,
331    E: StdError,
332{
333    send_message(message, message_tx)?;
334    receive_result(result_rx).await
335}