troupe/
sink.rs

1//! Actors that are only sent messages (either fire-and-forget messages or request-response
2//! messages).
3
4use std::marker::PhantomData;
5
6use tokio::sync::mpsc::UnboundedSender;
7
8use crate::{oneshot_channel, OneshotSender, Permanent, Transient};
9
/// A marker type used by the [`ActorBuilder`](crate::ActorBuilder) to know what kind of
/// [`ActorState`](crate::ActorState) it is dealing with. A sink actor is one that receives
/// messages from other parts of the application. By adding a oneshot channel to the message,
/// the actor can respond with a particular piece of data. This allows for type-safe communication
/// between different parts of your program.
///
/// The client of a [`SinkActor`] is the [`SinkClient`]. This client implements methods that allow
/// for the sending of messages to the actor. Communication between a sink client and sink actor
/// uses an unbounded MPSC-style channel (see
/// [`mpsc::unbounded_channel`](tokio::sync::mpsc::unbounded_channel)).
#[derive(Debug)]
pub struct SinkActor;
21
22/// A client to an actor. This client sends messages to the actor and supports two styles of
23/// messaging. The first is fire-and-forget messages. These messages are sent to the client
24/// immediately (no `.await` needed). The actor will process them eventually. The second kind is
25/// request-response or "trackable" messages. These messages are identical to the last kind except
26/// they contain a one-time use channel that the actor will use to send a message back.
27///
28/// It is helpful to use the [`derive_more`](https://crates.io/crates/derive_more) crate's
29/// [`From`](https://jeltef.github.io/derive_more/derive_more/from.html) derive macro with a sink
30/// actor's message type. The [`send`](SinkClient::send) and [`track`](SinkClient::track) methods
31/// of the `SinkClient` perform automatic convertion between the provided data and the actor's
32/// message type. Say you have an actor like the one below. You can send messages to that actor
33/// like so:
34/// ```ignore
35/// # extern crate derive_more;
36/// # use std::collections::HashMap;
37/// # use troupe::prelude::*;
38/// # use derive_more::From;
39/// #[derive(Default)]
40/// struct CacheState(HashMap<usize, String>);
41///
42/// #[derive(From)]
43/// enum CacheCommand {
44///     Insert(usize, String),
45///     Get(usize, OneshotSender<Option<String>>),
46///     Delete(usize),
47/// }
48/// # #[async_trait]
49/// # impl ActorState for CacheState {
50/// #   type Message = CacheCommand;
51/// #   type ActorType = SinkActor;
52/// #   type Permanence = Permanent;
53/// #   type Output = ();
54/// #
55/// #   async fn process(&mut self, scheduler: &mut Scheduler<Self>, msg: Self::Message) { () }
56/// # }
57///
58/// let client = ActorBuilder::new(CacheState::default()).launch();
59///
60/// // Sends CacheCommand::Inset(42, "Hello world")
61/// client.send((42, String::from("Hello World")));
62/// // Sends CacheCommand::Get(42, OneshotSender) and returns a tracker which will listen for a
63/// // response from the actor.
64/// let tracker = client.track(42);
65/// // Sends CacheCommand::Delete(42)
66/// client.send(42);
67/// ```
68#[derive(Debug)]
69pub struct SinkClient<T, M> {
70    ty: PhantomData<T>,
71    send: UnboundedSender<M>,
72}
73
74impl<T, M> SinkClient<T, M> {
75    pub(crate) fn new(send: UnboundedSender<M>) -> Self {
76        Self {
77            send,
78            ty: PhantomData,
79        }
80    }
81
82    /// Returns if the actor that the client is connected to is dead or not.
83    pub fn is_closed(&self) -> bool {
84        self.send.is_closed()
85    }
86
87    /// Sends a fire-and-forget style message to the actor and returns if the message was sent
88    /// successfully.
89    pub fn send(&self, msg: impl Into<M>) -> bool {
90        self.send.send(msg.into()).is_ok()
91    }
92}
93
94impl<M> SinkClient<Permanent, M> {
95    /// Sends a request-response style message to a [`Permanent`] actor. The given data is paired
96    /// with a one-time use channel and sent to the actor. A [`Tracker`](permanent::Tracker) that
97    /// will receive a response from the actor is returned.
98    ///
99    /// Note: Since this client is one for a permanent actor, there is an implicit unwrap once the
100    /// tracker receives a message from the actor. If the actor drops the other half of the channel
101    /// or has died somehow (likely from a panic), the returned tracker will panic too. So, it is
102    /// important that the actor always sends back a message
103    pub fn track<I, O>(&self, msg: I) -> permanent::Tracker<O>
104    where
105        M: From<(I, OneshotSender<O>)>,
106    {
107        let (send, recv) = oneshot_channel();
108        let msg = M::from((msg, send));
109        let _ = self.send(msg);
110        permanent::Tracker::new(recv)
111    }
112}
113
114impl<M> SinkClient<Transient, M> {
115    /// Sends a request-response style message to a [`Transient`] actor. The given data is paired
116    /// with a one-time use channel and sent to the actor. A [`Tracker`](transient::Tracker) that
117    /// will receive a response from the actor is returned.
118    pub fn track<I, O>(&self, msg: I) -> transient::Tracker<O>
119    where
120        M: From<(I, OneshotSender<O>)>,
121    {
122        let (send, recv) = oneshot_channel();
123        let msg = M::from((msg, send));
124        let _ = self.send(msg);
125        transient::Tracker::new(recv)
126    }
127}
128
129impl<T, M> Clone for SinkClient<T, M> {
130    fn clone(&self) -> Self {
131        Self::new(self.send.clone())
132    }
133}
134
135/// A module for things used to interact with the [`Permanent`] actors.
136pub mod permanent {
137    use std::{
138        future::Future,
139        pin::Pin,
140        task::{Context, Poll},
141    };
142
143    use crate::OneshotReceiver;
144
145    /// A tracker for a request-response style message sent to a [`Permanent`](crate::Permanent) actor.
146    ///
147    /// Note: This tracker implicitly unwraps the message produced by its channel receiver. If the
148    /// actor drops the other half of the channel or has died somehow (likely from a panic), this
149    /// tracker will panic when polled.
150    #[derive(Debug)]
151    pub struct Tracker<T> {
152        recv: OneshotReceiver<T>,
153    }
154
155    impl<T> Tracker<T> {
156        /// A constructor for the tracker.
157        pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
158            Self { recv }
159        }
160    }
161
162    impl<T> Future for Tracker<T> {
163        type Output = T;
164
165        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166            Pin::new(&mut self.recv).poll(cx).map(Result::unwrap)
167        }
168    }
169}
170
171/// A module for things used to interact with the [`Transient`] actors.
172pub mod transient {
173    use std::{
174        fmt::Debug,
175        future::Future,
176        pin::Pin,
177        task::{Context, Poll},
178    };
179
180    use crate::OneshotReceiver;
181
182    /// A tracker for a request-response style message sent to a [`Transient`](crate::Transient) actor.
183    ///
184    /// Note: This tracker might be created after a failed attempt to send a message to a dead
185    /// actor. This means that the tracker will return `None` when polled; however, that does not
186    /// mean that the message was successfully received by the actor.
187    #[derive(Debug)]
188    pub struct Tracker<T> {
189        recv: OneshotReceiver<T>,
190    }
191
192    impl<T> Tracker<T> {
193        /// A constuctor for the tracker.
194        pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
195            Self { recv }
196        }
197    }
198
199    impl<T> Future for Tracker<T> {
200        type Output = Option<T>;
201
202        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203            Pin::new(&mut self.recv).poll(cx).map(Result::ok)
204        }
205    }
206}