Skip to main content

arc_malachitebft_engine/util/
output_port.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
//! Output ports for publish-subscribe notifications between actors
//!
//! This notion extends beyond traditional actors in that it is a publish-subscribe
//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
//! to them, which are then automatically forwarded to downstream actors waiting for inputs.
//! They optionally have a message transformer attached to them to convert the published
//! messages to the appropriate message type.
12
13use std::sync::RwLock;
14
15use ractor::concurrency::JoinHandle;
16use ractor::{ActorRef, Message};
17use tokio::sync::broadcast as pubsub;
18
19/// Output messages, since they need to be replicated, require [Clone] in addition
20/// to the base [Message] constraints
21pub trait OutputMessage: Message + Clone {}
22impl<T: Message + Clone> OutputMessage for T {}
23
24/// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
25/// It allows actors to emit messages without knowing which downstream actors are subscribed.
26///
27/// You can subscribe to the output port with an [ActorRef] and a message converter from the output
28/// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
29/// be dropped and if the output port is dropped, then the subscription will also be dropped
30/// automatically.
31pub struct OutputPort<TMsg>
32where
33    TMsg: OutputMessage,
34{
35    tx: pubsub::Sender<Option<TMsg>>,
36    subscriptions: RwLock<Vec<OutputPortSubscription>>,
37}
38
39impl<TMsg> Default for OutputPort<TMsg>
40where
41    TMsg: OutputMessage,
42{
43    fn default() -> Self {
44        // We only need enough buffer for the subscription task to forward to the input port
45        // of the receiving actor. Hence 10 should be plenty.
46        Self::with_capacity(10)
47    }
48}
49
50impl<TMsg> OutputPort<TMsg>
51where
52    TMsg: OutputMessage,
53{
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn with_capacity(capacity: usize) -> Self {
59        let (tx, _rx) = pubsub::channel(capacity);
60        Self {
61            tx,
62            subscriptions: RwLock::new(vec![]),
63        }
64    }
65
66    /// Subscribe to the output port, passing in a converter to convert to the input message
67    /// of another actor
68    ///
69    /// * `receiver` - The reference to the actor which will receive forwarded messages
70    /// * `converter` - The converter which will convert the output message type to the
71    ///   receiver's input type and return [Some(_)] if the message should be forwarded, [None]
72    ///   if the message should be skipped.
73    pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
74    where
75        F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
76        TReceiverMsg: Message,
77    {
78        let mut subs = self.subscriptions.write().unwrap();
79
80        // filter out dead subscriptions, since they're no longer valid
81        subs.retain(|sub| !sub.is_dead());
82
83        let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
84            self.tx.subscribe(),
85            converter,
86            receiver,
87        );
88        subs.push(sub);
89    }
90
91    /// Send a message on the output port
92    ///
93    /// * `msg`: The message to send
94    pub fn send(&self, msg: TMsg) {
95        if self.tx.receiver_count() > 0 {
96            let _ = self.tx.send(Some(msg));
97        }
98    }
99}
100
101impl<TMsg> Drop for OutputPort<TMsg>
102where
103    TMsg: OutputMessage,
104{
105    fn drop(&mut self) {
106        let mut subs = self.subscriptions.write().unwrap();
107        for sub in subs.iter_mut() {
108            sub.stop();
109        }
110        subs.clear();
111    }
112}
113
114// ============== Subscription implementation ============== //
115
116/// The output port's subscription handle. It holds a handle to a [JoinHandle]
117/// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
118/// forwards it to the [ActorRef] asynchronously using the specified converter.
119struct OutputPortSubscription {
120    handle: JoinHandle<()>,
121}
122
123impl OutputPortSubscription {
124    /// Determine if the subscription is dead
125    pub fn is_dead(&self) -> bool {
126        self.handle.is_finished()
127    }
128
129    /// Stop the subscription, by aborting the underlying [JoinHandle]
130    pub fn stop(&mut self) {
131        self.handle.abort();
132    }
133
134    /// Create a new subscription
135    pub fn new<TMsg, F, TReceiverMsg>(
136        mut port: pubsub::Receiver<Option<TMsg>>,
137        converter: F,
138        receiver: ActorRef<TReceiverMsg>,
139    ) -> Self
140    where
141        TMsg: OutputMessage,
142        F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
143        TReceiverMsg: Message,
144    {
145        let handle = ractor::concurrency::spawn(async move {
146            loop {
147                match port.recv().await {
148                    Err(tokio::sync::broadcast::error::RecvError::Lagged(l)) => {
149                        tracing::warn!("Output port is lagging, we've dropped {l} messages!");
150                    }
151                    Ok(Some(msg)) => {
152                        if let Some(new_msg) = converter(msg) {
153                            if receiver.cast(new_msg).is_err() {
154                                // kill the subscription process, as the forwarding agent is stopped
155                                return;
156                            }
157                        }
158                    }
159                    Ok(None) => {
160                        // skip this message
161                    }
162                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
163                        tracing::warn!("Subscription is dying due to closed channel!");
164                        return;
165                    }
166                }
167            }
168        });
169
170        Self { handle }
171    }
172}
173
174/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
175/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
176/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
177/// function as an `OutputPortSubscriber<T>`, its message type must implement
178/// `From<T>` to convert the published message type to its own message format.
179///
180/// # Example
181/// ```ignore
182/// // First, define the publisher's message types, including a variant for
183/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
184/// use ractor::{
185///     cast,
186///     port::{OutputPort, OutputPortSubscriber},
187///     Actor, ActorProcessingErr, ActorRef, Message,
188/// };
189///
190/// enum PublisherMessage {
191///     Publish(u8),                         // Message type for publishing
192///     Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
193/// }
194///
195/// impl Message for PublisherMessage {
196///     fn serializable() -> bool {
197///         false
198///     }
199/// }
200///
201/// // In the publisher actor's `handle` function, handle subscription requests and
202/// // publish messages accordingly:
203///
204/// struct Publisher;
205/// struct State {
206///     output_port: OutputPort<u8>,
207/// }
208///
209/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
210/// impl Actor for Publisher {
211///     type State = State;
212///     type Msg = PublisherMessage;
213///     type Arguments = ();
214///
215///     async fn pre_start(
216///         &self,
217///         _myself: ActorRef<Self::Msg>,
218///         _: (),
219///     ) -> Result<Self::State, ActorProcessingErr> {
220///         Ok(State {
221///             output_port: OutputPort::default(),
222///         })
223///     }
224///
225///     async fn handle(
226///         &self,
227///         _myself: ActorRef<Self::Msg>,
228///         message: Self::Msg,
229///         state: &mut Self::State,
230///     ) -> Result<(), ActorProcessingErr> {
231///         match message {
232///             PublisherMessage::Subscribe(subscriber) => {
233///                 // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
234///                 subscriber.subscribe_to_port(&state.output_port);
235///             }
236///             PublisherMessage::Publish(value) => {
237///                 // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
238///                 state.output_port.send(value);
239///             }
240///         }
241///         Ok(())
242///     }
243/// }
244///
245/// // The subscriber's message type demonstrates how to transform the publisher's
246/// // message type by implementing `From<T>`:
247///
248/// #[derive(Debug)]
249/// enum SubscriberMessage {
250///     Handle(String), // Subscriber's intent for message handling
251/// }
252///
253/// impl Message for SubscriberMessage {
254///     fn serializable() -> bool {
255///         false
256///     }
257/// }
258///
259/// impl From<u8> for SubscriberMessage {
260///     fn from(value: u8) -> Self {
261///         SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
262///     }
263/// }
264///
265/// // To subscribe a subscriber actor to the publisher and broadcast a message:
266/// struct Subscriber;
267/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
268/// impl Actor for Subscriber {
269///     type State = ();
270///     type Msg = SubscriberMessage;
271///     type Arguments = ();
272///
273///     async fn pre_start(
274///         &self,
275///         _myself: ActorRef<Self::Msg>,
276///         _: (),
277///     ) -> Result<Self::State, ActorProcessingErr> {
278///         Ok(())
279///     }
280///
281///     async fn handle(
282///         &self,
283///         _myself: ActorRef<Self::Msg>,
284///         message: Self::Msg,
285///         _state: &mut Self::State,
286///     ) -> Result<(), ActorProcessingErr> {
287///         Ok(())
288///     }
289/// }
290/// async fn example() {
291///     let (publisher_actor_ref, publisher_actor_handle) =
292///         Actor::spawn(None, Publisher, ()).await.unwrap();
293///     let (subscriber_actor_ref, subscriber_actor_handle) =
294///         Actor::spawn(None, Subscriber, ()).await.unwrap();
295///
296///     publisher_actor_ref
297///         .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
298///         .unwrap();
299///
300///     // Broadcasting a message to all subscribers
301///     publisher_actor_ref
302///         .send_message(PublisherMessage::Publish(123))
303///         .unwrap();
304///
305///     publisher_actor_handle.await.unwrap();
306///     subscriber_actor_handle.await.unwrap();
307/// }
308/// ```
309pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
310
311/// A trait for subscribing to an [OutputPort]
312pub trait OutputPortSubscriberTrait<I>: Send
313where
314    I: Message + Clone,
315{
316    /// Subscribe to the output port
317    fn subscribe_to_port(&self, port: &OutputPort<I>);
318}
319
320impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
321where
322    I: Message + Clone,
323    O: Message + From<I>,
324{
325    fn subscribe_to_port(&self, port: &OutputPort<I>) {
326        port.subscribe(self.clone(), |msg| Some(O::from(msg)));
327    }
328}