troupe/sink.rs
1//! Actors that are only sent messages (either fire-and-forget messages or request-response
2//! messages).
3
4use std::marker::PhantomData;
5
6use tokio::sync::mpsc::UnboundedSender;
7
8use crate::{oneshot_channel, OneshotSender, Permanent, Transient};
9
10/// A marker type used by the [`ActorBuilder`](crate::ActorBuilder) to know what kind of
11/// [`ActorState`](crate::ActorState) it is dealing with. A sink actor is one that receives
12/// messages from other parts of the application. By adding a oneshot channel to the message,
13/// the actor can respond with a particular piece of data. This allows for type-safe communication
14/// between different parts of your program.
15///
16/// The client of a [`SinkActor`] is the [`SinkClient`]. This client implements methods that allow
17/// for the sending of messages to this client. Communication between a sink client and sink actor
18/// uses an MPSC-style channel (see [`mpsc::channel`](tokio::sync::mpsc)).
19#[derive(Debug)]
20pub struct SinkActor;
21
22/// A client to an actor. This client sends messages to the actor and supports two styles of
23/// messaging. The first is fire-and-forget messages. These messages are sent to the client
24/// immediately (no `.await` needed). The actor will process them eventually. The second kind is
25/// request-response or "trackable" messages. These messages are identical to the last kind except
26/// they contain a one-time use channel that the actor will use to send a message back.
27///
28/// It is helpful to use the [`derive_more`](https://crates.io/crates/derive_more) crate's
29/// [`From`](https://jeltef.github.io/derive_more/derive_more/from.html) derive macro with a sink
30/// actor's message type. The [`send`](SinkClient::send) and [`track`](SinkClient::track) methods
31/// of the `SinkClient` perform automatic convertion between the provided data and the actor's
32/// message type. Say you have an actor like the one below. You can send messages to that actor
33/// like so:
34/// ```ignore
35/// # extern crate derive_more;
36/// # use std::collections::HashMap;
37/// # use troupe::prelude::*;
38/// # use derive_more::From;
39/// #[derive(Default)]
40/// struct CacheState(HashMap<usize, String>);
41///
42/// #[derive(From)]
43/// enum CacheCommand {
44/// Insert(usize, String),
45/// Get(usize, OneshotSender<Option<String>>),
46/// Delete(usize),
47/// }
48/// # #[async_trait]
49/// # impl ActorState for CacheState {
50/// # type Message = CacheCommand;
51/// # type ActorType = SinkActor;
52/// # type Permanence = Permanent;
53/// # type Output = ();
54/// #
55/// # async fn process(&mut self, scheduler: &mut Scheduler<Self>, msg: Self::Message) { () }
56/// # }
57///
58/// let client = ActorBuilder::new(CacheState::default()).launch();
59///
60/// // Sends CacheCommand::Inset(42, "Hello world")
61/// client.send((42, String::from("Hello World")));
62/// // Sends CacheCommand::Get(42, OneshotSender) and returns a tracker which will listen for a
63/// // response from the actor.
64/// let tracker = client.track(42);
65/// // Sends CacheCommand::Delete(42)
66/// client.send(42);
67/// ```
68#[derive(Debug)]
69pub struct SinkClient<T, M> {
70 ty: PhantomData<T>,
71 send: UnboundedSender<M>,
72}
73
74impl<T, M> SinkClient<T, M> {
75 pub(crate) fn new(send: UnboundedSender<M>) -> Self {
76 Self {
77 send,
78 ty: PhantomData,
79 }
80 }
81
82 /// Returns if the actor that the client is connected to is dead or not.
83 pub fn is_closed(&self) -> bool {
84 self.send.is_closed()
85 }
86
87 /// Sends a fire-and-forget style message to the actor and returns if the message was sent
88 /// successfully.
89 pub fn send(&self, msg: impl Into<M>) -> bool {
90 self.send.send(msg.into()).is_ok()
91 }
92}
93
94impl<M> SinkClient<Permanent, M> {
95 /// Sends a request-response style message to a [`Permanent`] actor. The given data is paired
96 /// with a one-time use channel and sent to the actor. A [`Tracker`](permanent::Tracker) that
97 /// will receive a response from the actor is returned.
98 ///
99 /// Note: Since this client is one for a permanent actor, there is an implicit unwrap once the
100 /// tracker receives a message from the actor. If the actor drops the other half of the channel
101 /// or has died somehow (likely from a panic), the returned tracker will panic too. So, it is
102 /// important that the actor always sends back a message
103 pub fn track<I, O>(&self, msg: I) -> permanent::Tracker<O>
104 where
105 M: From<(I, OneshotSender<O>)>,
106 {
107 let (send, recv) = oneshot_channel();
108 let msg = M::from((msg, send));
109 let _ = self.send(msg);
110 permanent::Tracker::new(recv)
111 }
112}
113
114impl<M> SinkClient<Transient, M> {
115 /// Sends a request-response style message to a [`Transient`] actor. The given data is paired
116 /// with a one-time use channel and sent to the actor. A [`Tracker`](transient::Tracker) that
117 /// will receive a response from the actor is returned.
118 pub fn track<I, O>(&self, msg: I) -> transient::Tracker<O>
119 where
120 M: From<(I, OneshotSender<O>)>,
121 {
122 let (send, recv) = oneshot_channel();
123 let msg = M::from((msg, send));
124 let _ = self.send(msg);
125 transient::Tracker::new(recv)
126 }
127}
128
129impl<T, M> Clone for SinkClient<T, M> {
130 fn clone(&self) -> Self {
131 Self::new(self.send.clone())
132 }
133}
134
135/// A module for things used to interact with the [`Permanent`] actors.
136pub mod permanent {
137 use std::{
138 future::Future,
139 pin::Pin,
140 task::{Context, Poll},
141 };
142
143 use crate::OneshotReceiver;
144
145 /// A tracker for a request-response style message sent to a [`Permanent`](crate::Permanent) actor.
146 ///
147 /// Note: This tracker implicitly unwraps the message produced by its channel receiver. If the
148 /// actor drops the other half of the channel or has died somehow (likely from a panic), this
149 /// tracker will panic when polled.
150 #[derive(Debug)]
151 pub struct Tracker<T> {
152 recv: OneshotReceiver<T>,
153 }
154
155 impl<T> Tracker<T> {
156 /// A constructor for the tracker.
157 pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
158 Self { recv }
159 }
160 }
161
162 impl<T> Future for Tracker<T> {
163 type Output = T;
164
165 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166 Pin::new(&mut self.recv).poll(cx).map(Result::unwrap)
167 }
168 }
169}
170
171/// A module for things used to interact with the [`Transient`] actors.
172pub mod transient {
173 use std::{
174 fmt::Debug,
175 future::Future,
176 pin::Pin,
177 task::{Context, Poll},
178 };
179
180 use crate::OneshotReceiver;
181
182 /// A tracker for a request-response style message sent to a [`Transient`](crate::Transient) actor.
183 ///
184 /// Note: This tracker might be created after a failed attempt to send a message to a dead
185 /// actor. This means that the tracker will return `None` when polled; however, that does not
186 /// mean that the message was successfully received by the actor.
187 #[derive(Debug)]
188 pub struct Tracker<T> {
189 recv: OneshotReceiver<T>,
190 }
191
192 impl<T> Tracker<T> {
193 /// A constuctor for the tracker.
194 pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
195 Self { recv }
196 }
197 }
198
199 impl<T> Future for Tracker<T> {
200 type Output = Option<T>;
201
202 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203 Pin::new(&mut self.recv).poll(cx).map(Result::ok)
204 }
205 }
206}