Skip to main content

ractor/port/
output.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Output ports for publish-subscribe notifications between actors
7//!
//! This notion extends beyond traditional actors in that it is a publish-subscribe
//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
//! to them, which are automatically forwarded to downstream actors waiting for inputs. They optionally
//! have a message transformer attached to them to convert the messages to the appropriate message type.
12//!
//! There are two different implementations. If the feature `output-port-v2` is not specified,
//! the implementation uses a broadcast channel that buffers at most 10 successively sent messages
//! for each subscribed actor. That means that as long as no more than 10 messages are sent to the
//! output port in quick succession, there is a high probability that every subscriber receives all
//! messages; beyond that, a slow subscriber may miss messages.
17//!
//! The new implementation, which is enabled with `output-port-v2`, uses a fan-out task that distributes
//! messages to all subscribers, ensuring that all messages are received by all subscribers.
20
21use crate::ActorRef;
22use crate::Message;
23
24#[cfg(test)]
25mod tests;
26
27/// Output messages, since they need to be replicated, require [Clone] in addition
28/// to the base [Message] constraints
29pub trait OutputMessage: Message + Clone {}
30impl<T: Message + Clone> OutputMessage for T {}
31
32#[cfg(not(feature = "output-port-v2"))]
33pub use v1::OutputPort;
34
35#[cfg(feature = "output-port-v2")]
36pub use v2::OutputPort;
37
38#[cfg(not(feature = "output-port-v2"))]
39mod v1 {
40    use std::fmt::Debug;
41    use std::sync::RwLock;
42
43    use tokio::sync::broadcast as pubsub;
44
45    use crate::concurrency::JoinHandle;
46    use crate::{ActorRef, Message, OutputMessage};
47
48    /// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
49    /// It allows actors to emit messages without knowing which downstream actors are subscribed.
50    ///
51    /// You can subscribe to the output port with an [ActorRef] and a message converter from the output
52    /// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
53    /// be dropped and if the output port is dropped, then the subscription will also be dropped
54    /// automatically.
55    pub struct OutputPort<TMsg>
56    where
57        TMsg: OutputMessage,
58    {
59        tx: pubsub::Sender<Option<TMsg>>,
60        subscriptions: RwLock<Vec<OutputPortSubscription>>,
61    }
62
63    impl<TMsg: OutputMessage> Debug for OutputPort<TMsg> {
64        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65            write!(f, "OutputPort({})", std::any::type_name::<TMsg>())
66        }
67    }
68
69    impl<TMsg> Default for OutputPort<TMsg>
70    where
71        TMsg: OutputMessage,
72    {
73        fn default() -> Self {
74            // We only need enough buffer for the subscription task to forward to the input port
75            // of the receiving actor. Hence 10 should be plenty.
76            let (tx, _rx) = pubsub::channel(10);
77            Self {
78                tx,
79                subscriptions: RwLock::new(vec![]),
80            }
81        }
82    }
83
84    impl<TMsg> OutputPort<TMsg>
85    where
86        TMsg: OutputMessage,
87    {
88        /// Subscribe to the output port, passing in a converter to convert to the input message
89        /// of another actor
90        ///
91        /// * `receiver` - The reference to the actor which will receive forwarded messages
92        /// * `converter` - The converter which will convert the output message type to the
93        ///   receiver's input type and return [Some(_)] if the message should be forwarded, [None]
94        ///   if the message should be skipped.
95        pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
96        where
97            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
98            TReceiverMsg: Message,
99        {
100            let mut subs = self.subscriptions.write().unwrap();
101
102            // filter out dead subscriptions, since they're no longer valid
103            subs.retain(|sub| !sub.is_dead());
104
105            let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
106                self.tx.subscribe(),
107                converter,
108                receiver,
109            );
110            subs.push(sub);
111        }
112
113        /// Send a message on the output port
114        ///
115        /// * `msg`: The message to send
116        pub fn send(&self, msg: TMsg) {
117            if self.tx.receiver_count() > 0 {
118                let _ = self.tx.send(Some(msg));
119            }
120        }
121    }
122
123    // ============== Subscription implementation ============== //
124
125    /// The output port's subscription handle. It holds a handle to a [JoinHandle]
126    /// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
127    /// forwards it to the [ActorRef] asynchronously using the specified converter.
128    struct OutputPortSubscription {
129        handle: JoinHandle<()>,
130    }
131
132    impl OutputPortSubscription {
133        /// Determine if the subscription is dead
134        pub(crate) fn is_dead(&self) -> bool {
135            self.handle.is_finished()
136        }
137
138        /// Create a new subscription
139        pub(crate) fn new<TMsg, F, TReceiverMsg>(
140            mut port: pubsub::Receiver<Option<TMsg>>,
141            converter: F,
142            receiver: ActorRef<TReceiverMsg>,
143        ) -> Self
144        where
145            TMsg: OutputMessage,
146            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
147            TReceiverMsg: Message,
148        {
149            let handle = crate::concurrency::spawn(async move {
150                while let Ok(Some(msg)) = port.recv().await {
151                    if let Some(new_msg) = converter(msg) {
152                        if receiver.cast(new_msg).is_err() {
153                            // kill the subscription process, as the forwarding agent is stopped
154                            return;
155                        }
156                    }
157                }
158            });
159
160            Self { handle }
161        }
162    }
163}
164
165#[cfg(feature = "output-port-v2")]
166mod v2 {
167    use crate::{ActorId, ActorRef, Message, OutputMessage};
168    use std::fmt::Debug;
169
170    /// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
171    /// It allows actors to emit messages without knowing which downstream actors are subscribed.
172    ///
173    /// You can subscribe to the output port with an [ActorRef] and a message converter from the output
174    /// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
175    /// be dropped and if the output port is dropped, then the subscription will also be dropped
176    /// automatically.
177    pub struct OutputPort<TMsg>
178    where
179        TMsg: OutputMessage,
180    {
181        inner: inner::OutputPort<ActorId, TMsg>,
182    }
183
184    impl<TMsg: OutputMessage> Debug for OutputPort<TMsg> {
185        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186            write!(f, "OutputPort({})", std::any::type_name::<TMsg>())
187        }
188    }
189
190    impl<TMsg> Default for OutputPort<TMsg>
191    where
192        TMsg: OutputMessage,
193    {
194        fn default() -> Self {
195            Self {
196                inner: inner::OutputPort::default(),
197            }
198        }
199    }
200
201    impl<TMsg> OutputPort<TMsg>
202    where
203        TMsg: OutputMessage,
204    {
205        /// Subscribe to the output port, passing in a converter to convert to the input message
206        /// of another actor
207        ///
208        /// * `receiver` - The reference to the actor which will receive forwarded messages
209        /// * `converter` - The converter which will convert the output message type to the
210        ///   receiver's input type and return [Some(_)] if the message should be forwarded, [None]
211        ///   if the message should be skipped.
212        pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
213        where
214            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
215            TReceiverMsg: Message,
216        {
217            self.inner.subscribe(receiver, converter)
218        }
219
220        /// Send a message on the output port
221        ///
222        /// * `msg`: The message to send
223        pub fn send(&self, msg: TMsg) {
224            self.inner.send(msg)
225        }
226    }
227
228    mod inner {
229
230        use super::OutputMessage;
231        use crate::concurrency::{mpsc_unbounded, MpscUnboundedSender};
232        //use crate::concurrency::{mpsc_unbounded, oneshot, MpscUnboundedSender, OneshotSender};
233        use crate::{ActorId, ActorRef, DerivedActorRef, Message};
234
235        #[cfg(feature = "tokio_runtime")]
        /// As we do a lot of iterations without calling async
        /// methods while dispatching messages, we consume 1 tokio
        /// task budget unit every CONSUME_BUDGET_FACTOR messages sent
239        const CONSUME_BUDGET_FACTOR: u32 = 32;
        /// Each subscriber may receive a batch of up to MAX_BATCH_SIZE messages
        /// before another batch is sent to another subscriber
242        const MAX_BATCH_SIZE: usize = 32;
243
244        enum OutportMessage<Id, TMsg> {
245            Data(TMsg),
246            SetSubscriber(Option<Box<dyn Subscriber<Id, TMsg>>>),
247            //RemoveSubscriber(Id),
248            //Subscribers(OneshotSender<Vec<Id>>),
249        }
250
251        pub(super) trait Subscriber<Id, TMsg: OutputMessage>: Send + 'static {
252            // return false if the subscriber should be
253            // removed
254            fn send(&self, value: &TMsg) -> bool;
255            fn id(&self) -> Id;
256        }
257
258        #[derive(Debug, Clone)]
259        pub(super) struct OutputPort<Id, TMsg>(MpscUnboundedSender<OutportMessage<Id, TMsg>>);
260
261        impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> Default
262            for OutputPort<Id, TMsg>
263        {
264            fn default() -> Self {
265                Self::new(true)
266            }
267        }
268
269        impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> OutputPort<Id, TMsg> {
270            pub(super) fn new(allow_duplicate_subscription: bool) -> Self {
271                let (tx, mut rx) = mpsc_unbounded::<OutportMessage<Id, TMsg>>();
272
273                crate::concurrency::spawn(async move {
274                    let mut subscribers = Vec::<(Id, Box<dyn Subscriber<Id, TMsg>>)>::new();
275                    let mut batch = Vec::new();
276                    //NB: This algorithm may look overcomplicated but it enhances the OuputPort benchmark
277                    // by 70%!!! compared to a trivial algorith... the duration of sending message to the
278                    // output port is divided by 3.
279                    loop {
280                        let l = rx.len().clamp(1, MAX_BATCH_SIZE);
281                        if rx.recv_many(&mut batch, l).await == 0 {
282                            break;
283                        }
284
285                        let mut i = 0;
286                        let mut coop_count = 0u32;
287                        // First we iterate on subscribers already present
288                        // and send to ech all messages in the batch
289                        'subs: while i < subscribers.len() {
290                            // We processes the messages but only
291                            // apply change to subscribers that do affact
292                            // the current subscriber. The aim is to preserve
293                            // the sequentiality a process that would send to
294                            // the output port my expect.
295                            for msg in batch.iter_mut() {
296                                match msg {
297                                    OutportMessage::Data(v) => {
298                                        if !subscribers[i].1.send(v) {
299                                            subscribers.remove(i);
300                                            continue 'subs;
301                                        } else {
302                                            coop_count = coop_count.wrapping_add(1);
303                                            #[cfg(feature = "tokio_runtime")]
304                                            if coop_count % CONSUME_BUDGET_FACTOR == 0 {
305                                                tokio::task::coop::consume_budget().await
306                                            }
307                                        }
308                                    }
309                                    OutportMessage::SetSubscriber(opt_subscriber) => {
310                                        let sid = if let Some(subscriber) = opt_subscriber {
311                                            let sid = subscriber.id();
312                                            if sid != subscribers[i].0 {
313                                                continue;
314                                            }
315                                            sid
316                                        } else {
317                                            continue;
318                                        };
319                                        let subscriber = opt_subscriber.take().unwrap();
320
321                                        // We ensure there is no duplicate subscription
322                                        if !allow_duplicate_subscription {
323                                            if let Some((_, prev_subscriber)) =
324                                                subscribers.iter_mut().find(|(id, _)| id == &sid)
325                                            {
326                                                // In case of duplication, previous subscription is overrided
327                                                *prev_subscriber = subscriber;
328                                            } else {
329                                                subscribers.push((subscriber.id(), subscriber));
330                                            }
331                                        } else {
332                                            subscribers.push((subscriber.id(), subscriber));
333                                        }
334                                    }
335                                }
336                            }
337                            i += 1;
338                        }
339
340                        let i0 = i;
341                        // The for the new subscribers we switch back
342                        // to a less efficient algrorithm we iterate first by messages
343                        // then by subscribers to ensure expected sequentiality.
344                        for msg in batch.drain(..) {
345                            match msg {
346                                OutportMessage::Data(v) => {
347                                    // We do not want to hold a reference to dyn Subscriber
348                                    // to cross an await, otherwise, Subscriber would need to be Sync.
349                                    // So we iterate by index. This also simplify extraction
350                                    // of subscribers.
351                                    let mut i = i0;
352                                    while i < subscribers.len() {
353                                        if !subscribers[i].1.send(&v) {
354                                            subscribers.remove(i);
355                                        } else {
356                                            i += 1;
357                                            #[cfg(feature = "tokio_runtime")]
358                                            if coop_count % CONSUME_BUDGET_FACTOR == 0 {
359                                                tokio::task::coop::consume_budget().await
360                                            }
361                                            coop_count = coop_count.wrapping_add(1);
362                                        }
363                                    }
364                                }
365                                OutportMessage::SetSubscriber(Some(subscriber)) => {
366                                    let sid = subscriber.id();
367
368                                    // We ensure there is no duplicate subscription
369                                    if !allow_duplicate_subscription {
370                                        if let Some((_, prev_subscriber)) =
371                                            subscribers.iter_mut().find(|(id, _)| id == &sid)
372                                        {
373                                            // In case of duplication, previous subscription is overrided
374                                            *prev_subscriber = subscriber;
375                                        } else {
376                                            subscribers.push((subscriber.id(), subscriber));
377                                        }
378                                    } else {
379                                        subscribers.push((subscriber.id(), subscriber));
380                                    }
381                                }
382                                OutportMessage::SetSubscriber(None) => (),
383                            }
384                        }
385                    }
386                });
387
388                Self(tx)
389            }
390
391            pub(super) fn send(&self, value: TMsg) {
392                _ = self.0.send(OutportMessage::Data(value));
393            }
394        }
395
396        impl<TMsg: OutputMessage> OutputPort<ActorId, TMsg> {
397            pub(super) fn subscribe<TReceiverMsg, F>(
398                &self,
399                receiver: ActorRef<TReceiverMsg>,
400                converter: F,
401            ) where
402                F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
403                TReceiverMsg: Message,
404            {
405                self.set_subscriber_with_filter(receiver, move |msg| converter(msg.clone()))
406            }
407
408            pub(super) fn set_subscriber_with_filter<R: ActorReference>(
409                &self,
410                actor_ref: R,
411                filter: impl Fn(&TMsg) -> Option<R::Msg> + Send + 'static,
412            ) {
413                _ = self
414                    .0
415                    .send(OutportMessage::SetSubscriber(Some(Box::new(Filtering {
416                        actor_ref,
417                        filter,
418                    }))));
419            }
420        }
421
422        impl<T: OutputMessage, U: Message> Subscriber<ActorId, T> for ActorRef<U>
423        where
424            U: TryFrom<T>,
425        {
426            fn send(&self, value: &T) -> bool {
427                if let Ok(value) = value.clone().try_into() {
428                    self.send_message(value).is_ok()
429                } else {
430                    true
431                }
432            }
433
434            fn id(&self) -> ActorId {
435                self.get_id()
436            }
437        }
438        impl<T: OutputMessage> Subscriber<ActorId, T> for DerivedActorRef<T> {
439            fn send(&self, value: &T) -> bool {
440                self.send_message(value.clone()).is_ok()
441            }
442
443            fn id(&self) -> ActorId {
444                self.get_id()
445            }
446        }
447        struct Filtering<T, F> {
448            pub actor_ref: T,
449            pub filter: F,
450        }
451        impl<T: ActorReference, U: OutputMessage, F: Fn(&U) -> Option<T::Msg> + Send + 'static>
452            Subscriber<ActorId, U> for Filtering<T, F>
453        {
454            fn send(&self, value: &U) -> bool {
455                if let Some(v) = (self.filter)(value) {
456                    self.actor_ref.send_message(v)
457                } else {
458                    true
459                }
460            }
461
462            fn id(&self) -> ActorId {
463                self.actor_ref.id()
464            }
465        }
466        pub(super) trait ActorReference: Send + Sync + 'static {
467            type Msg: Message;
468            fn send_message(&self, value: Self::Msg) -> bool;
469            fn id(&self) -> ActorId;
470        }
471        impl<T: Message> ActorReference for ActorRef<T> {
472            type Msg = T;
473
474            fn send_message(&self, value: T) -> bool {
475                self.send_message(value).is_ok()
476            }
477
478            fn id(&self) -> ActorId {
479                self.get_id()
480            }
481        }
482        impl<T: Message> ActorReference for DerivedActorRef<T> {
483            type Msg = T;
484
485            fn send_message(&self, value: T) -> bool {
486                self.send_message(value).is_ok()
487            }
488
489            fn id(&self) -> ActorId {
490                self.get_id()
491            }
492        }
493    }
494}
495
496/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
497/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
498/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
499/// function as an `OutputPortSubscriber<T>`, its message type must implement
500/// `From<T>` to convert the published message type to its own message format.
501///
502/// # Example
503/// ```
504/// // First, define the publisher's message types, including a variant for
505/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
506/// use ractor::{
507///     cast,
508///     port::{OutputPort, OutputPortSubscriber},
509///     Actor, ActorProcessingErr, ActorRef, Message,
510/// };
511///
512/// enum PublisherMessage {
513///     Publish(u8),                         // Message type for publishing
514///     Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
515/// }
516///
517/// #[cfg(feature = "cluster")]
518/// impl Message for PublisherMessage {
519///     fn serializable() -> bool {
520///         false
521///     }
522/// }
523///
524/// // In the publisher actor's `handle` function, handle subscription requests and
525/// // publish messages accordingly:
526///
527/// struct Publisher;
528/// struct State {
529///     output_port: OutputPort<u8>,
530/// }
531///
532/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
533/// impl Actor for Publisher {
534///     type State = State;
535///     type Msg = PublisherMessage;
536///     type Arguments = ();
537///
538///     async fn pre_start(
539///         &self,
540///         _myself: ActorRef<Self::Msg>,
541///         _: (),
542///     ) -> Result<Self::State, ActorProcessingErr> {
543///         Ok(State {
544///             output_port: OutputPort::default(),
545///         })
546///     }
547///
548///     async fn handle(
549///         &self,
550///         _myself: ActorRef<Self::Msg>,
551///         message: Self::Msg,
552///         state: &mut Self::State,
553///     ) -> Result<(), ActorProcessingErr> {
554///         match message {
555///             PublisherMessage::Subscribe(subscriber) => {
556///                 // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
557///                 subscriber.subscribe_to_port(&state.output_port);
558///             }
559///             PublisherMessage::Publish(value) => {
560///                 // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
561///                 state.output_port.send(value);
562///             }
563///         }
564///         Ok(())
565///     }
566/// }
567///
568/// // The subscriber's message type demonstrates how to transform the publisher's
569/// // message type by implementing `From<T>`:
570///
571/// #[derive(Debug)]
572/// enum SubscriberMessage {
573///     Handle(String), // Subscriber's intent for message handling
574/// }
575///
576/// #[cfg(feature = "cluster")]
577/// impl Message for SubscriberMessage {
578///     fn serializable() -> bool {
579///         false
580///     }
581/// }
582///
583/// impl From<u8> for SubscriberMessage {
584///     fn from(value: u8) -> Self {
585///         SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
586///     }
587/// }
588///
589/// // To subscribe a subscriber actor to the publisher and broadcast a message:
590/// struct Subscriber;
591/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
592/// impl Actor for Subscriber {
593///     type State = ();
594///     type Msg = SubscriberMessage;
595///     type Arguments = ();
596///
597///     async fn pre_start(
598///         &self,
599///         _myself: ActorRef<Self::Msg>,
600///         _: (),
601///     ) -> Result<Self::State, ActorProcessingErr> {
602///         Ok(())
603///     }
604///
605///     async fn handle(
606///         &self,
607///         _myself: ActorRef<Self::Msg>,
608///         message: Self::Msg,
609///         _state: &mut Self::State,
610///     ) -> Result<(), ActorProcessingErr> {
611///         Ok(())
612///     }
613/// }
614/// async fn example() {
615///     let (publisher_actor_ref, publisher_actor_handle) =
616///         Actor::spawn(None, Publisher, ()).await.unwrap();
617///     let (subscriber_actor_ref, subscriber_actor_handle) =
618///         Actor::spawn(None, Subscriber, ()).await.unwrap();
619///
620///     publisher_actor_ref
621///         .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
622///         .unwrap();
623///
624///     // Broadcasting a message to all subscribers
625///     publisher_actor_ref
626///         .send_message(PublisherMessage::Publish(123))
627///         .unwrap();
628///
629///     publisher_actor_handle.await.unwrap();
630///     subscriber_actor_handle.await.unwrap();
631/// }
632/// ```
633pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
634/// A trait for subscribing to an [OutputPort]
635pub trait OutputPortSubscriberTrait<I>: Send
636where
637    I: Message + Clone,
638{
639    /// Subscribe to the output port
640    fn subscribe_to_port(&self, port: &OutputPort<I>);
641}
642
643impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
644where
645    I: Message + Clone,
646    O: Message + From<I>,
647{
648    fn subscribe_to_port(&self, port: &OutputPort<I>) {
649        port.subscribe(self.clone(), |msg| Some(O::from(msg)));
650    }
651}