arc_malachitebft_engine/util/output_port.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Output ports for publish-subscribe notifications between actors
7//!
8//! This notion extends beyond traditional actors in this that is a publish-subscribe
9//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
10//! to them which are automatically forwarded to downstream actors waiting for inputs. They optionally
11//! have a message transformer attached to them to convert them to the appropriate message type
12
13use std::sync::RwLock;
14
15use ractor::concurrency::JoinHandle;
16use ractor::{ActorRef, Message};
17use tokio::sync::broadcast as pubsub;
18
19/// Output messages, since they need to be replicated, require [Clone] in addition
20/// to the base [Message] constraints
21pub trait OutputMessage: Message + Clone {}
22impl<T: Message + Clone> OutputMessage for T {}
23
24/// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
25/// It allows actors to emit messages without knowing which downstream actors are subscribed.
26///
27/// You can subscribe to the output port with an [ActorRef] and a message converter from the output
28/// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
29/// be dropped and if the output port is dropped, then the subscription will also be dropped
30/// automatically.
31pub struct OutputPort<TMsg>
32where
33 TMsg: OutputMessage,
34{
35 tx: pubsub::Sender<Option<TMsg>>,
36 subscriptions: RwLock<Vec<OutputPortSubscription>>,
37}
38
39impl<TMsg> Default for OutputPort<TMsg>
40where
41 TMsg: OutputMessage,
42{
43 fn default() -> Self {
44 // We only need enough buffer for the subscription task to forward to the input port
45 // of the receiving actor. Hence 10 should be plenty.
46 Self::with_capacity(10)
47 }
48}
49
50impl<TMsg> OutputPort<TMsg>
51where
52 TMsg: OutputMessage,
53{
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn with_capacity(capacity: usize) -> Self {
59 let (tx, _rx) = pubsub::channel(capacity);
60 Self {
61 tx,
62 subscriptions: RwLock::new(vec![]),
63 }
64 }
65
66 /// Subscribe to the output port, passing in a converter to convert to the input message
67 /// of another actor
68 ///
69 /// * `receiver` - The reference to the actor which will receive forwarded messages
70 /// * `converter` - The converter which will convert the output message type to the
71 /// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
72 /// if the message should be skipped.
73 pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
74 where
75 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
76 TReceiverMsg: Message,
77 {
78 let mut subs = self.subscriptions.write().unwrap();
79
80 // filter out dead subscriptions, since they're no longer valid
81 subs.retain(|sub| !sub.is_dead());
82
83 let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
84 self.tx.subscribe(),
85 converter,
86 receiver,
87 );
88 subs.push(sub);
89 }
90
91 /// Send a message on the output port
92 ///
93 /// * `msg`: The message to send
94 pub fn send(&self, msg: TMsg) {
95 if self.tx.receiver_count() > 0 {
96 let _ = self.tx.send(Some(msg));
97 }
98 }
99}
100
101impl<TMsg> Drop for OutputPort<TMsg>
102where
103 TMsg: OutputMessage,
104{
105 fn drop(&mut self) {
106 let mut subs = self.subscriptions.write().unwrap();
107 for sub in subs.iter_mut() {
108 sub.stop();
109 }
110 subs.clear();
111 }
112}
113
114// ============== Subscription implementation ============== //
115
116/// The output port's subscription handle. It holds a handle to a [JoinHandle]
117/// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
118/// forwards it to the [ActorRef] asynchronously using the specified converter.
119struct OutputPortSubscription {
120 handle: JoinHandle<()>,
121}
122
123impl OutputPortSubscription {
124 /// Determine if the subscription is dead
125 pub fn is_dead(&self) -> bool {
126 self.handle.is_finished()
127 }
128
129 /// Stop the subscription, by aborting the underlying [JoinHandle]
130 pub fn stop(&mut self) {
131 self.handle.abort();
132 }
133
134 /// Create a new subscription
135 pub fn new<TMsg, F, TReceiverMsg>(
136 mut port: pubsub::Receiver<Option<TMsg>>,
137 converter: F,
138 receiver: ActorRef<TReceiverMsg>,
139 ) -> Self
140 where
141 TMsg: OutputMessage,
142 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
143 TReceiverMsg: Message,
144 {
145 let handle = ractor::concurrency::spawn(async move {
146 loop {
147 match port.recv().await {
148 Err(tokio::sync::broadcast::error::RecvError::Lagged(l)) => {
149 tracing::warn!("Output port is lagging, we've dropped {l} messages!");
150 }
151 Ok(Some(msg)) => {
152 if let Some(new_msg) = converter(msg) {
153 if receiver.cast(new_msg).is_err() {
154 // kill the subscription process, as the forwarding agent is stopped
155 return;
156 }
157 }
158 }
159 Ok(None) => {
160 // skip this message
161 }
162 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
163 tracing::warn!("Subscription is dying due to closed channel!");
164 return;
165 }
166 }
167 }
168 });
169
170 Self { handle }
171 }
172}
173
174/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
175/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
176/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
177/// function as an `OutputPortSubscriber<T>`, its message type must implement
178/// `From<T>` to convert the published message type to its own message format.
179///
180/// # Example
181/// ```ignore
182/// // First, define the publisher's message types, including a variant for
183/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
184/// use ractor::{
185/// cast,
186/// port::{OutputPort, OutputPortSubscriber},
187/// Actor, ActorProcessingErr, ActorRef, Message,
188/// };
189///
190/// enum PublisherMessage {
191/// Publish(u8), // Message type for publishing
192/// Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
193/// }
194///
195/// impl Message for PublisherMessage {
196/// fn serializable() -> bool {
197/// false
198/// }
199/// }
200///
201/// // In the publisher actor's `handle` function, handle subscription requests and
202/// // publish messages accordingly:
203///
204/// struct Publisher;
205/// struct State {
206/// output_port: OutputPort<u8>,
207/// }
208///
209/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
210/// impl Actor for Publisher {
211/// type State = State;
212/// type Msg = PublisherMessage;
213/// type Arguments = ();
214///
215/// async fn pre_start(
216/// &self,
217/// _myself: ActorRef<Self::Msg>,
218/// _: (),
219/// ) -> Result<Self::State, ActorProcessingErr> {
220/// Ok(State {
221/// output_port: OutputPort::default(),
222/// })
223/// }
224///
225/// async fn handle(
226/// &self,
227/// _myself: ActorRef<Self::Msg>,
228/// message: Self::Msg,
229/// state: &mut Self::State,
230/// ) -> Result<(), ActorProcessingErr> {
231/// match message {
232/// PublisherMessage::Subscribe(subscriber) => {
233/// // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
234/// subscriber.subscribe_to_port(&state.output_port);
235/// }
236/// PublisherMessage::Publish(value) => {
237/// // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
238/// state.output_port.send(value);
239/// }
240/// }
241/// Ok(())
242/// }
243/// }
244///
245/// // The subscriber's message type demonstrates how to transform the publisher's
246/// // message type by implementing `From<T>`:
247///
248/// #[derive(Debug)]
249/// enum SubscriberMessage {
250/// Handle(String), // Subscriber's intent for message handling
251/// }
252///
253/// impl Message for SubscriberMessage {
254/// fn serializable() -> bool {
255/// false
256/// }
257/// }
258///
259/// impl From<u8> for SubscriberMessage {
260/// fn from(value: u8) -> Self {
261/// SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
262/// }
263/// }
264///
265/// // To subscribe a subscriber actor to the publisher and broadcast a message:
266/// struct Subscriber;
267/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
268/// impl Actor for Subscriber {
269/// type State = ();
270/// type Msg = SubscriberMessage;
271/// type Arguments = ();
272///
273/// async fn pre_start(
274/// &self,
275/// _myself: ActorRef<Self::Msg>,
276/// _: (),
277/// ) -> Result<Self::State, ActorProcessingErr> {
278/// Ok(())
279/// }
280///
281/// async fn handle(
282/// &self,
283/// _myself: ActorRef<Self::Msg>,
284/// message: Self::Msg,
285/// _state: &mut Self::State,
286/// ) -> Result<(), ActorProcessingErr> {
287/// Ok(())
288/// }
289/// }
290/// async fn example() {
291/// let (publisher_actor_ref, publisher_actor_handle) =
292/// Actor::spawn(None, Publisher, ()).await.unwrap();
293/// let (subscriber_actor_ref, subscriber_actor_handle) =
294/// Actor::spawn(None, Subscriber, ()).await.unwrap();
295///
296/// publisher_actor_ref
297/// .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
298/// .unwrap();
299///
300/// // Broadcasting a message to all subscribers
301/// publisher_actor_ref
302/// .send_message(PublisherMessage::Publish(123))
303/// .unwrap();
304///
305/// publisher_actor_handle.await.unwrap();
306/// subscriber_actor_handle.await.unwrap();
307/// }
308/// ```
309pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
310
311/// A trait for subscribing to an [OutputPort]
312pub trait OutputPortSubscriberTrait<I>: Send
313where
314 I: Message + Clone,
315{
316 /// Subscribe to the output port
317 fn subscribe_to_port(&self, port: &OutputPort<I>);
318}
319
320impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
321where
322 I: Message + Clone,
323 O: Message + From<I>,
324{
325 fn subscribe_to_port(&self, port: &OutputPort<I>) {
326 port.subscribe(self.clone(), |msg| Some(O::from(msg)));
327 }
328}