1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.
//! Output ports for publish-subscribe notifications between actors
//!
//! This notion extends beyond traditional actors in this that is a publish-subscribe
//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
//! to them which are automatically forwarded to downstream actors waiting for inputs. They optionally
//! have a message transformer attached to them to convert them to the appropriate message type
use std::sync::RwLock;
use crate::concurrency::JoinHandle;
use tokio::sync::broadcast as pubsub;
use crate::{ActorRef, Message};
#[cfg(test)]
mod tests;
/// Output messages, since they need to be replicated, require [Clone] in addition
/// to the base [Message] constraints
pub trait OutputMessage: Message + Clone {}
impl<T: Message + Clone> OutputMessage for T {}
/// An [OutputPort] is a publish-subscribe mechanism for connecting actors together.
/// It allows actors to emit messages without knowing which downstream actors are subscribed.
///
/// You can subscribe to the output port with an [ActorRef] and a message converter from the output
/// type to the actor's expected input type. If the actor is dropped or stops, the subscription will
/// be dropped and if the output port is dropped, then the subscription will also be dropped
/// automatically.
pub struct OutputPort<TMsg>
where
TMsg: OutputMessage,
{
tx: pubsub::Sender<Option<TMsg>>,
subscriptions: RwLock<Vec<OutputPortSubscription>>,
}
impl<TMsg> Default for OutputPort<TMsg>
where
TMsg: OutputMessage,
{
fn default() -> Self {
// We only need enough buffer for the subscription task to forward to the input port
// of the receiving actor. Hence 10 should be plenty.
let (tx, _rx) = pubsub::channel(10);
Self {
tx,
subscriptions: RwLock::new(vec![]),
}
}
}
impl<TMsg> OutputPort<TMsg>
where
TMsg: OutputMessage,
{
/// Subscribe to the output port, passing in a converter to convert to the input message
/// of another actor
///
/// * `receiver` - The reference to the actor which will receive forwarded messages
/// * `converter` - The converter which will convert the output message type to the
/// receiver's input type and return [Some(_)] if the message should be forwarded, [None]
/// if the message should be skipped.
pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
where
F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
TReceiverMsg: Message,
{
let mut subs = self.subscriptions.write().unwrap();
// filter out dead subscriptions, since they're no longer valid
subs.retain(|sub| !sub.is_dead());
let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
self.tx.subscribe(),
converter,
receiver,
);
subs.push(sub);
}
/// Send a message on the output port
///
/// * `msg`: The message to send
pub fn send(&self, msg: TMsg) {
if self.tx.receiver_count() > 0 {
let _ = self.tx.send(Some(msg));
}
}
}
impl<TMsg> Drop for OutputPort<TMsg>
where
TMsg: OutputMessage,
{
fn drop(&mut self) {
let mut subs = self.subscriptions.write().unwrap();
for sub in subs.iter_mut() {
sub.stop();
}
subs.clear();
}
}
// ============== Subscription implementation ============== //
/// The output port's subscription handle. It holds a handle to a [JoinHandle]
/// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
/// forwards it to the [ActorRef] asynchronously using the specified converter.
struct OutputPortSubscription {
handle: JoinHandle<()>,
}
impl OutputPortSubscription {
/// Determine if the subscription is dead
pub fn is_dead(&self) -> bool {
self.handle.is_finished()
}
/// Stop the subscription, by aborting the underlying [JoinHandle]
pub fn stop(&mut self) {
self.handle.abort();
}
/// Create a new subscription
pub fn new<TMsg, F, TReceiverMsg>(
mut port: pubsub::Receiver<Option<TMsg>>,
converter: F,
receiver: ActorRef<TReceiverMsg>,
) -> Self
where
TMsg: OutputMessage,
F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
TReceiverMsg: Message,
{
let handle = crate::concurrency::spawn(async move {
while let Ok(Some(msg)) = port.recv().await {
if let Some(new_msg) = converter(msg) {
if receiver.cast(new_msg).is_err() {
// kill the subscription process, as the forwarding agent is stopped
return;
}
}
}
});
Self { handle }
}
}
/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
/// function as an `OutputPortSubscriber<T>`, its message type must implement
/// `From<T>` to convert the published message type to its own message format.
///
/// # Example
/// ```
/// // First, define the publisher's message types, including a variant for
/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
/// use ractor::{
/// cast,
/// port::{OutputPort, OutputPortSubscriber},
/// Actor, ActorProcessingErr, ActorRef, Message,
/// };
///
/// enum PublisherMessage {
/// Publish(u8), // Message type for publishing
/// Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
/// }
///
/// #[cfg(feature = "cluster")]
/// impl Message for PublisherMessage {
/// fn serializable() -> bool {
/// false
/// }
/// }
///
/// // In the publisher actor's `handle` function, handle subscription requests and
/// // publish messages accordingly:
///
/// struct Publisher;
/// struct State {
/// output_port: OutputPort<u8>,
/// }
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for Publisher {
/// type State = State;
/// type Msg = PublisherMessage;
/// type Arguments = ();
///
/// async fn pre_start(
/// &self,
/// _myself: ActorRef<Self::Msg>,
/// _: (),
/// ) -> Result<Self::State, ActorProcessingErr> {
/// Ok(State {
/// output_port: OutputPort::default(),
/// })
/// }
///
/// async fn handle(
/// &self,
/// _myself: ActorRef<Self::Msg>,
/// message: Self::Msg,
/// state: &mut Self::State,
/// ) -> Result<(), ActorProcessingErr> {
/// match message {
/// PublisherMessage::Subscribe(subscriber) => {
/// // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
/// subscriber.subscribe_to_port(&state.output_port);
/// }
/// PublisherMessage::Publish(value) => {
/// // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
/// state.output_port.send(value);
/// }
/// }
/// Ok(())
/// }
/// }
///
/// // The subscriber's message type demonstrates how to transform the publisher's
/// // message type by implementing `From<T>`:
///
/// #[derive(Debug)]
/// enum SubscriberMessage {
/// Handle(String), // Subscriber's intent for message handling
/// }
///
/// #[cfg(feature = "cluster")]
/// impl Message for SubscriberMessage {
/// fn serializable() -> bool {
/// false
/// }
/// }
///
/// impl From<u8> for SubscriberMessage {
/// fn from(value: u8) -> Self {
/// SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
/// }
/// }
///
/// // To subscribe a subscriber actor to the publisher and broadcast a message:
/// struct Subscriber;
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for Subscriber {
/// type State = ();
/// type Msg = SubscriberMessage;
/// type Arguments = ();
///
/// async fn pre_start(
/// &self,
/// _myself: ActorRef<Self::Msg>,
/// _: (),
/// ) -> Result<Self::State, ActorProcessingErr> {
/// Ok(())
/// }
///
/// async fn handle(
/// &self,
/// _myself: ActorRef<Self::Msg>,
/// message: Self::Msg,
/// _state: &mut Self::State,
/// ) -> Result<(), ActorProcessingErr> {
/// Ok(())
/// }
/// }
/// async fn example() {
/// let (publisher_actor_ref, publisher_actor_handle) =
/// Actor::spawn(None, Publisher, ()).await.unwrap();
/// let (subscriber_actor_ref, subscriber_actor_handle) =
/// Actor::spawn(None, Subscriber, ()).await.unwrap();
///
/// publisher_actor_ref
/// .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
/// .unwrap();
///
/// // Broadcasting a message to all subscribers
/// publisher_actor_ref
/// .send_message(PublisherMessage::Publish(123))
/// .unwrap();
///
/// publisher_actor_handle.await.unwrap();
/// subscriber_actor_handle.await.unwrap();
/// }
/// ```
pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
/// A trait for subscribing to an [OutputPort]
pub trait OutputPortSubscriberTrait<I>: Send
where
I: Message + Clone,
{
/// Subscribe to the output port
fn subscribe_to_port(&self, port: &OutputPort<I>);
}
impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
where
I: Message + Clone,
O: Message + From<I>,
{
fn subscribe_to_port(&self, port: &OutputPort<I>) {
port.subscribe(self.clone(), |msg| Some(O::from(msg)));
}
}