ractor/port/output.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Output ports for publish-subscribe notifications between actors
7//!
//! This notion extends beyond traditional actors, in that it is a publish-subscribe
//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
//! to them, which are then automatically forwarded to downstream actors waiting for inputs. They
//! optionally have a message transformer attached to convert messages to the appropriate type.
12//!
//! There are two different implementations. If the feature `output-port-v2` is not specified,
//! the implementation uses a broadcast channel that is limited to 10 messages successively sent
//! for each subscribed actor. That means that if up to 10 messages are sent to the output port
//! successively, there is a high probability that every subscriber receives all messages.
17//!
//! The new implementation, accessible using `output-port-v2`, uses a fan-out task that distributes
//! messages to all subscribers, ensuring that all messages are received by all subscribers.
20
21use crate::ActorRef;
22use crate::Message;
23
24#[cfg(test)]
25mod tests;
26
27/// Output messages, since they need to be replicated, require [Clone] in addition
28/// to the base [Message] constraints
29pub trait OutputMessage: Message + Clone {}
30impl<T: Message + Clone> OutputMessage for T {}
31
32#[cfg(not(feature = "output-port-v2"))]
33pub use v1::OutputPort;
34
35#[cfg(feature = "output-port-v2")]
36pub use v2::OutputPort;
37
38#[cfg(not(feature = "output-port-v2"))]
39mod v1 {
40 use std::fmt::Debug;
41 use std::sync::RwLock;
42
43 use tokio::sync::broadcast as pubsub;
44
45 use crate::concurrency::JoinHandle;
46 use crate::{ActorRef, Message, OutputMessage};
47
48 /// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
49 /// It allows actors to emit messages without knowing which downstream actors are subscribed.
50 ///
51 /// You can subscribe to the output port with an [ActorRef] and a message converter from the output
52 /// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
53 /// be dropped and if the output port is dropped, then the subscription will also be dropped
54 /// automatically.
55 pub struct OutputPort<TMsg>
56 where
57 TMsg: OutputMessage,
58 {
59 tx: pubsub::Sender<Option<TMsg>>,
60 subscriptions: RwLock<Vec<OutputPortSubscription>>,
61 }
62
63 impl<TMsg: OutputMessage> Debug for OutputPort<TMsg> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "OutputPort({})", std::any::type_name::<TMsg>())
66 }
67 }
68
69 impl<TMsg> Default for OutputPort<TMsg>
70 where
71 TMsg: OutputMessage,
72 {
73 fn default() -> Self {
74 // We only need enough buffer for the subscription task to forward to the input port
75 // of the receiving actor. Hence 10 should be plenty.
76 let (tx, _rx) = pubsub::channel(10);
77 Self {
78 tx,
79 subscriptions: RwLock::new(vec![]),
80 }
81 }
82 }
83
84 impl<TMsg> OutputPort<TMsg>
85 where
86 TMsg: OutputMessage,
87 {
88 /// Subscribe to the output port, passing in a converter to convert to the input message
89 /// of another actor
90 ///
91 /// * `receiver` - The reference to the actor which will receive forwarded messages
92 /// * `converter` - The converter which will convert the output message type to the
93 /// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
94 /// if the message should be skipped.
95 pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
96 where
97 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
98 TReceiverMsg: Message,
99 {
100 let mut subs = self.subscriptions.write().unwrap();
101
102 // filter out dead subscriptions, since they're no longer valid
103 subs.retain(|sub| !sub.is_dead());
104
105 let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
106 self.tx.subscribe(),
107 converter,
108 receiver,
109 );
110 subs.push(sub);
111 }
112
113 /// Send a message on the output port
114 ///
115 /// * `msg`: The message to send
116 pub fn send(&self, msg: TMsg) {
117 if self.tx.receiver_count() > 0 {
118 let _ = self.tx.send(Some(msg));
119 }
120 }
121 }
122
123 // ============== Subscription implementation ============== //
124
125 /// The output port's subscription handle. It holds a handle to a [JoinHandle]
126 /// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
127 /// forwards it to the [ActorRef] asynchronously using the specified converter.
128 struct OutputPortSubscription {
129 handle: JoinHandle<()>,
130 }
131
132 impl OutputPortSubscription {
133 /// Determine if the subscription is dead
134 pub(crate) fn is_dead(&self) -> bool {
135 self.handle.is_finished()
136 }
137
138 /// Create a new subscription
139 pub(crate) fn new<TMsg, F, TReceiverMsg>(
140 mut port: pubsub::Receiver<Option<TMsg>>,
141 converter: F,
142 receiver: ActorRef<TReceiverMsg>,
143 ) -> Self
144 where
145 TMsg: OutputMessage,
146 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
147 TReceiverMsg: Message,
148 {
149 let handle = crate::concurrency::spawn(async move {
150 while let Ok(Some(msg)) = port.recv().await {
151 if let Some(new_msg) = converter(msg) {
152 if receiver.cast(new_msg).is_err() {
153 // kill the subscription process, as the forwarding agent is stopped
154 return;
155 }
156 }
157 }
158 });
159
160 Self { handle }
161 }
162 }
163}
164
165#[cfg(feature = "output-port-v2")]
166mod v2 {
167 use crate::{ActorId, ActorRef, Message, OutputMessage};
168 use std::fmt::Debug;
169
170 /// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
171 /// It allows actors to emit messages without knowing which downstream actors are subscribed.
172 ///
173 /// You can subscribe to the output port with an [ActorRef] and a message converter from the output
174 /// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
175 /// be dropped and if the output port is dropped, then the subscription will also be dropped
176 /// automatically.
177 pub struct OutputPort<TMsg>
178 where
179 TMsg: OutputMessage,
180 {
181 inner: inner::OutputPort<ActorId, TMsg>,
182 }
183
184 impl<TMsg: OutputMessage> Debug for OutputPort<TMsg> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 write!(f, "OutputPort({})", std::any::type_name::<TMsg>())
187 }
188 }
189
190 impl<TMsg> Default for OutputPort<TMsg>
191 where
192 TMsg: OutputMessage,
193 {
194 fn default() -> Self {
195 Self {
196 inner: inner::OutputPort::default(),
197 }
198 }
199 }
200
201 impl<TMsg> OutputPort<TMsg>
202 where
203 TMsg: OutputMessage,
204 {
205 /// Subscribe to the output port, passing in a converter to convert to the input message
206 /// of another actor
207 ///
208 /// * `receiver` - The reference to the actor which will receive forwarded messages
209 /// * `converter` - The converter which will convert the output message type to the
210 /// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
211 /// if the message should be skipped.
212 pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
213 where
214 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
215 TReceiverMsg: Message,
216 {
217 self.inner.subscribe(receiver, converter)
218 }
219
220 /// Send a message on the output port
221 ///
222 /// * `msg`: The message to send
223 pub fn send(&self, msg: TMsg) {
224 self.inner.send(msg)
225 }
226 }
227
228 mod inner {
229
230 use super::OutputMessage;
231 use crate::concurrency::{mpsc_unbounded, MpscUnboundedSender};
232 //use crate::concurrency::{mpsc_unbounded, oneshot, MpscUnboundedSender, OneshotSender};
233 use crate::{ActorId, ActorRef, DerivedActorRef, Message};
234
        #[cfg(feature = "tokio_runtime")]
        /// As we do a lot of iterations without calling any async
        /// method while dispatching messages, we consume 1 tokio
        /// task budget unit every CONSUME_BUDGET_FACTOR messages sent
        const CONSUME_BUDGET_FACTOR: u32 = 32;
        /// Each subscriber may receive a batch of up to MAX_BATCH_SIZE messages
        /// before the batch is sent to the next subscriber
        const MAX_BATCH_SIZE: usize = 32;
243
        /// Items exchanged between an `OutputPort` handle and its fan-out task.
        enum OutportMessage<Id, TMsg> {
            /// A published message to distribute to the subscribers
            Data(TMsg),
            /// A subscriber to register. The fan-out task takes the subscriber out of the
            /// option (leaving `None`) once it has been handled.
            SetSubscriber(Option<Box<dyn Subscriber<Id, TMsg>>>),
            //RemoveSubscriber(Id),
            //Subscribers(OneshotSender<Vec<Id>>),
        }
250
        /// A destination to which the fan-out task delivers published messages.
        pub(super) trait Subscriber<Id, TMsg: OutputMessage>: Send + 'static {
            /// Deliver `value` to the subscriber.
            ///
            /// Returns `false` if the subscriber should be removed.
            fn send(&self, value: &TMsg) -> bool;
            /// The identifier of this subscriber
            fn id(&self) -> Id;
        }
257
        /// Handle to the fan-out task: wraps the sending half of the unbounded channel
        /// through which published data and subscription requests reach the task.
        #[derive(Debug, Clone)]
        pub(super) struct OutputPort<Id, TMsg>(MpscUnboundedSender<OutportMessage<Id, TMsg>>);
260
261 impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> Default
262 for OutputPort<Id, TMsg>
263 {
264 fn default() -> Self {
265 Self::new(true)
266 }
267 }
268
269 impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> OutputPort<Id, TMsg> {
270 pub(super) fn new(allow_duplicate_subscription: bool) -> Self {
271 let (tx, mut rx) = mpsc_unbounded::<OutportMessage<Id, TMsg>>();
272
273 crate::concurrency::spawn(async move {
274 let mut subscribers = Vec::<(Id, Box<dyn Subscriber<Id, TMsg>>)>::new();
275 let mut batch = Vec::new();
276 //NB: This algorithm may look overcomplicated but it enhances the OuputPort benchmark
277 // by 70%!!! compared to a trivial algorith... the duration of sending message to the
278 // output port is divided by 3.
279 loop {
280 let l = rx.len().clamp(1, MAX_BATCH_SIZE);
281 if rx.recv_many(&mut batch, l).await == 0 {
282 break;
283 }
284
285 let mut i = 0;
286 let mut coop_count = 0u32;
287 // First we iterate on subscribers already present
288 // and send to ech all messages in the batch
289 'subs: while i < subscribers.len() {
290 // We processes the messages but only
291 // apply change to subscribers that do affact
292 // the current subscriber. The aim is to preserve
293 // the sequentiality a process that would send to
294 // the output port my expect.
295 for msg in batch.iter_mut() {
296 match msg {
297 OutportMessage::Data(v) => {
298 if !subscribers[i].1.send(v) {
299 subscribers.remove(i);
300 continue 'subs;
301 } else {
302 coop_count = coop_count.wrapping_add(1);
303 #[cfg(feature = "tokio_runtime")]
304 if coop_count % CONSUME_BUDGET_FACTOR == 0 {
305 tokio::task::coop::consume_budget().await
306 }
307 }
308 }
309 OutportMessage::SetSubscriber(opt_subscriber) => {
310 let sid = if let Some(subscriber) = opt_subscriber {
311 let sid = subscriber.id();
312 if sid != subscribers[i].0 {
313 continue;
314 }
315 sid
316 } else {
317 continue;
318 };
319 let subscriber = opt_subscriber.take().unwrap();
320
321 // We ensure there is no duplicate subscription
322 if !allow_duplicate_subscription {
323 if let Some((_, prev_subscriber)) =
324 subscribers.iter_mut().find(|(id, _)| id == &sid)
325 {
326 // In case of duplication, previous subscription is overrided
327 *prev_subscriber = subscriber;
328 } else {
329 subscribers.push((subscriber.id(), subscriber));
330 }
331 } else {
332 subscribers.push((subscriber.id(), subscriber));
333 }
334 }
335 }
336 }
337 i += 1;
338 }
339
340 let i0 = i;
341 // The for the new subscribers we switch back
342 // to a less efficient algrorithm we iterate first by messages
343 // then by subscribers to ensure expected sequentiality.
344 for msg in batch.drain(..) {
345 match msg {
346 OutportMessage::Data(v) => {
347 // We do not want to hold a reference to dyn Subscriber
348 // to cross an await, otherwise, Subscriber would need to be Sync.
349 // So we iterate by index. This also simplify extraction
350 // of subscribers.
351 let mut i = i0;
352 while i < subscribers.len() {
353 if !subscribers[i].1.send(&v) {
354 subscribers.remove(i);
355 } else {
356 i += 1;
357 #[cfg(feature = "tokio_runtime")]
358 if coop_count % CONSUME_BUDGET_FACTOR == 0 {
359 tokio::task::coop::consume_budget().await
360 }
361 coop_count = coop_count.wrapping_add(1);
362 }
363 }
364 }
365 OutportMessage::SetSubscriber(Some(subscriber)) => {
366 let sid = subscriber.id();
367
368 // We ensure there is no duplicate subscription
369 if !allow_duplicate_subscription {
370 if let Some((_, prev_subscriber)) =
371 subscribers.iter_mut().find(|(id, _)| id == &sid)
372 {
373 // In case of duplication, previous subscription is overrided
374 *prev_subscriber = subscriber;
375 } else {
376 subscribers.push((subscriber.id(), subscriber));
377 }
378 } else {
379 subscribers.push((subscriber.id(), subscriber));
380 }
381 }
382 OutportMessage::SetSubscriber(None) => (),
383 }
384 }
385 }
386 });
387
388 Self(tx)
389 }
390
391 pub(super) fn send(&self, value: TMsg) {
392 _ = self.0.send(OutportMessage::Data(value));
393 }
394 }
395
396 impl<TMsg: OutputMessage> OutputPort<ActorId, TMsg> {
397 pub(super) fn subscribe<TReceiverMsg, F>(
398 &self,
399 receiver: ActorRef<TReceiverMsg>,
400 converter: F,
401 ) where
402 F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
403 TReceiverMsg: Message,
404 {
405 self.set_subscriber_with_filter(receiver, move |msg| converter(msg.clone()))
406 }
407
408 pub(super) fn set_subscriber_with_filter<R: ActorReference>(
409 &self,
410 actor_ref: R,
411 filter: impl Fn(&TMsg) -> Option<R::Msg> + Send + 'static,
412 ) {
413 _ = self
414 .0
415 .send(OutportMessage::SetSubscriber(Some(Box::new(Filtering {
416 actor_ref,
417 filter,
418 }))));
419 }
420 }
421
422 impl<T: OutputMessage, U: Message> Subscriber<ActorId, T> for ActorRef<U>
423 where
424 U: TryFrom<T>,
425 {
426 fn send(&self, value: &T) -> bool {
427 if let Ok(value) = value.clone().try_into() {
428 self.send_message(value).is_ok()
429 } else {
430 true
431 }
432 }
433
434 fn id(&self) -> ActorId {
435 self.get_id()
436 }
437 }
438 impl<T: OutputMessage> Subscriber<ActorId, T> for DerivedActorRef<T> {
439 fn send(&self, value: &T) -> bool {
440 self.send_message(value.clone()).is_ok()
441 }
442
443 fn id(&self) -> ActorId {
444 self.get_id()
445 }
446 }
447 struct Filtering<T, F> {
448 pub actor_ref: T,
449 pub filter: F,
450 }
451 impl<T: ActorReference, U: OutputMessage, F: Fn(&U) -> Option<T::Msg> + Send + 'static>
452 Subscriber<ActorId, U> for Filtering<T, F>
453 {
454 fn send(&self, value: &U) -> bool {
455 if let Some(v) = (self.filter)(value) {
456 self.actor_ref.send_message(v)
457 } else {
458 true
459 }
460 }
461
462 fn id(&self) -> ActorId {
463 self.actor_ref.id()
464 }
465 }
        /// Common interface over the actor reference types which can subscribe to an
        /// output port.
        pub(super) trait ActorReference: Send + Sync + 'static {
            /// The message type accepted by the referenced actor
            type Msg: Message;
            /// Send `value` to the actor, returning whether the send succeeded
            fn send_message(&self, value: Self::Msg) -> bool;
            /// The identifier of the referenced actor
            fn id(&self) -> ActorId;
        }
471 impl<T: Message> ActorReference for ActorRef<T> {
472 type Msg = T;
473
474 fn send_message(&self, value: T) -> bool {
475 self.send_message(value).is_ok()
476 }
477
478 fn id(&self) -> ActorId {
479 self.get_id()
480 }
481 }
482 impl<T: Message> ActorReference for DerivedActorRef<T> {
483 type Msg = T;
484
485 fn send_message(&self, value: T) -> bool {
486 self.send_message(value).is_ok()
487 }
488
489 fn id(&self) -> ActorId {
490 self.get_id()
491 }
492 }
493 }
494}
495
496/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
497/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
498/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
499/// function as an `OutputPortSubscriber<T>`, its message type must implement
500/// `From<T>` to convert the published message type to its own message format.
501///
502/// # Example
503/// ```
504/// // First, define the publisher's message types, including a variant for
505/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
506/// use ractor::{
507/// cast,
508/// port::{OutputPort, OutputPortSubscriber},
509/// Actor, ActorProcessingErr, ActorRef, Message,
510/// };
511///
512/// enum PublisherMessage {
513/// Publish(u8), // Message type for publishing
514/// Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
515/// }
516///
517/// #[cfg(feature = "cluster")]
518/// impl Message for PublisherMessage {
519/// fn serializable() -> bool {
520/// false
521/// }
522/// }
523///
524/// // In the publisher actor's `handle` function, handle subscription requests and
525/// // publish messages accordingly:
526///
527/// struct Publisher;
528/// struct State {
529/// output_port: OutputPort<u8>,
530/// }
531///
532/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
533/// impl Actor for Publisher {
534/// type State = State;
535/// type Msg = PublisherMessage;
536/// type Arguments = ();
537///
538/// async fn pre_start(
539/// &self,
540/// _myself: ActorRef<Self::Msg>,
541/// _: (),
542/// ) -> Result<Self::State, ActorProcessingErr> {
543/// Ok(State {
544/// output_port: OutputPort::default(),
545/// })
546/// }
547///
548/// async fn handle(
549/// &self,
550/// _myself: ActorRef<Self::Msg>,
551/// message: Self::Msg,
552/// state: &mut Self::State,
553/// ) -> Result<(), ActorProcessingErr> {
554/// match message {
555/// PublisherMessage::Subscribe(subscriber) => {
556/// // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
557/// subscriber.subscribe_to_port(&state.output_port);
558/// }
559/// PublisherMessage::Publish(value) => {
560/// // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
561/// state.output_port.send(value);
562/// }
563/// }
564/// Ok(())
565/// }
566/// }
567///
568/// // The subscriber's message type demonstrates how to transform the publisher's
569/// // message type by implementing `From<T>`:
570///
571/// #[derive(Debug)]
572/// enum SubscriberMessage {
573/// Handle(String), // Subscriber's intent for message handling
574/// }
575///
576/// #[cfg(feature = "cluster")]
577/// impl Message for SubscriberMessage {
578/// fn serializable() -> bool {
579/// false
580/// }
581/// }
582///
583/// impl From<u8> for SubscriberMessage {
584/// fn from(value: u8) -> Self {
585/// SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
586/// }
587/// }
588///
589/// // To subscribe a subscriber actor to the publisher and broadcast a message:
590/// struct Subscriber;
591/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
592/// impl Actor for Subscriber {
593/// type State = ();
594/// type Msg = SubscriberMessage;
595/// type Arguments = ();
596///
597/// async fn pre_start(
598/// &self,
599/// _myself: ActorRef<Self::Msg>,
600/// _: (),
601/// ) -> Result<Self::State, ActorProcessingErr> {
602/// Ok(())
603/// }
604///
605/// async fn handle(
606/// &self,
607/// _myself: ActorRef<Self::Msg>,
608/// message: Self::Msg,
609/// _state: &mut Self::State,
610/// ) -> Result<(), ActorProcessingErr> {
611/// Ok(())
612/// }
613/// }
614/// async fn example() {
615/// let (publisher_actor_ref, publisher_actor_handle) =
616/// Actor::spawn(None, Publisher, ()).await.unwrap();
617/// let (subscriber_actor_ref, subscriber_actor_handle) =
618/// Actor::spawn(None, Subscriber, ()).await.unwrap();
619///
620/// publisher_actor_ref
621/// .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
622/// .unwrap();
623///
624/// // Broadcasting a message to all subscribers
625/// publisher_actor_ref
626/// .send_message(PublisherMessage::Publish(123))
627/// .unwrap();
628///
629/// publisher_actor_handle.await.unwrap();
630/// subscriber_actor_handle.await.unwrap();
631/// }
632/// ```
633pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
634/// A trait for subscribing to an [OutputPort]
635pub trait OutputPortSubscriberTrait<I>: Send
636where
637 I: Message + Clone,
638{
639 /// Subscribe to the output port
640 fn subscribe_to_port(&self, port: &OutputPort<I>);
641}
642
643impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
644where
645 I: Message + Clone,
646 O: Message + From<I>,
647{
648 fn subscribe_to_port(&self, port: &OutputPort<I>) {
649 port.subscribe(self.clone(), |msg| Some(O::from(msg)));
650 }
651}