ractor/macros.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Macro helpers for remote procedure calls
7
/// `cast!`: Hand `$msg` to `$actor` through the actor's `cast` method.
///
/// * `$actor` - The actor to send the message to
/// * `$msg` - The message to send
///
/// Evaluates to the `Result` produced by `$actor.cast($msg)`, with any error converted
/// into a [crate::RactorErr] via `From`, so callers can pattern match on a single error type.
/// A trailing comma after `$msg` is accepted.
#[macro_export]
macro_rules! cast {
    ($actor:expr, $msg:expr $(,)?) => {
        $actor.cast($msg).map_err($crate::RactorErr::from)
    };
}
16
/// `call!`: Perform an infinite-time remote procedure call to a [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder which takes in a [crate::port::RpcReplyPort] and emits a message which
///   the actor supports
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
///   constructing the message payload
///
/// The expansion `.await`s the call, so the macro must be used inside an `async` context.
/// A trailing comma after the last argument is accepted.
///
/// Returns `Ok(_)` with the result on successful RPC or `Err(`[crate::RactorErr]`)` on failure
///
/// Example usage (without the `cluster` feature)
/// ```rust
/// use ractor::{call, Actor, RpcReplyPort, ActorRef, ActorProcessingErr};
/// struct TestActor;
/// enum MessageFormat {
///     TestRpc(String, RpcReplyPort<String>),
/// }
/// #[cfg(feature = "cluster")]
/// impl ractor::Message for MessageFormat {}
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for TestActor {
///     type Msg = MessageFormat;
///     type Arguments = ();
///     type State = ();
///
///     async fn pre_start(&self, _this_actor: ActorRef<Self::Msg>, _: ()) -> Result<Self::State, ActorProcessingErr> {
///         Ok(())
///     }
///
///     async fn handle(
///         &self,
///         _this_actor: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         _state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         match message {
///             Self::Msg::TestRpc(arg, reply) => {
///                 // An error sending means no one is listening anymore (the receiver was dropped),
///                 // so we should shortcut the processing of this message probably!
///                 if !reply.is_closed() {
///                     let _ = reply.send(arg);
///                 }
///             }
///         }
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (actor, handle) = Actor::spawn(None, TestActor, ()).await.unwrap();
///     let result = call!(actor, MessageFormat::TestRpc, "Something".to_string()).unwrap();
///     assert_eq!(result, "Something".to_string());
///     actor.stop(None);
///     handle.await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! call {
    // A single arm covers both the bare form and the variadic form: every `$args`
    // is emitted with its own trailing comma, so the reply channel `tx` is always
    // the last argument and zero arguments expands to plain `$msg(tx)`.
    ($actor:expr, $msg:expr $(, $args:expr)* $(,)?) => {{
        // `None` means no timeout is applied to the call.
        let outcome = $actor
            .call(|tx| $msg($($args,)* tx), None)
            .await
            .map_err($crate::RactorErr::from);
        match outcome {
            Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
            // Every non-`Success` call result is surfaced to the caller as an error.
            Ok(cr) => Err($crate::RactorErr::from(cr)),
            Err(e) => Err(e),
        }
    }};
}
99
/// `call_t!`: Perform a finite-time remote procedure call to a [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder variant
/// * `$timeout_ms` - the timeout in milliseconds for the remote procedure call
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
///   constructing the message payload
///
/// The expansion `.await`s the call, so the macro must be used inside an `async` context.
/// A trailing comma after the last argument is accepted.
///
/// Returns `Ok(_)` with the result on successful RPC or `Err(`[crate::RactorErr]`)` on failure
///
/// Example usage
/// ```rust
/// use ractor::{call_t, Actor, ActorRef, RpcReplyPort, ActorProcessingErr};
/// struct TestActor;
/// enum MessageFormat {
///     TestRpc(String, RpcReplyPort<String>),
/// }
/// #[cfg(feature = "cluster")]
/// impl ractor::Message for MessageFormat {}
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for TestActor {
///     type Msg = MessageFormat;
///     type Arguments = ();
///     type State = ();
///
///     async fn pre_start(&self, _this_actor: ActorRef<Self::Msg>, _: ()) -> Result<Self::State, ActorProcessingErr> {
///         Ok(())
///     }
///
///     async fn handle(
///         &self,
///         _this_actor: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         _state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         match message {
///             Self::Msg::TestRpc(arg, reply) => {
///                 // An error sending means no one is listening anymore (the receiver was dropped),
///                 // so we should shortcut the processing of this message probably!
///                 if !reply.is_closed() {
///                     let _ = reply.send(arg);
///                 }
///             }
///         }
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (actor, handle) = Actor::spawn(None, TestActor, ()).await.unwrap();
///     let result = call_t!(actor, MessageFormat::TestRpc, 50, "Something".to_string()).unwrap();
///     assert_eq!(result, "Something".to_string());
///     actor.stop(None);
///     handle.await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! call_t {
    // A single arm covers both the bare form and the variadic form: every `$args`
    // is emitted with its own trailing comma, so the reply channel `tx` is always
    // the last argument and zero arguments expands to plain `$msg(tx)`.
    ($actor:expr, $msg:expr, $timeout_ms:expr $(, $args:expr)* $(,)?) => {{
        // The timeout is built inline (not hoisted into a `let`) so that `$actor`
        // is still evaluated before `$timeout_ms`.
        let outcome = $actor
            .call(
                |tx| $msg($($args,)* tx),
                Some($crate::concurrency::Duration::from_millis($timeout_ms)),
            )
            .await
            .map_err($crate::RactorErr::from);
        match outcome {
            Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
            // Every non-`Success` call result is surfaced to the caller as an error.
            Ok(cr) => Err($crate::RactorErr::from(cr)),
            Err(e) => Err(e),
        }
    }};
}
183
/// `forward!`: Perform a remote procedure call to a [crate::Actor]
/// and forwards the result to another actor if successful
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder, which takes in a [crate::port::RpcReplyPort] and emits a message which
///   the actor supports.
/// * `$forward` - The [crate::ActorRef] to forward the call to (must be an identifier; it is
///   passed on by reference)
/// * `$forward_mapping` - The message transformer from the RPC result to the forwarding actor's message format
/// * `$timeout` - (optional) The [crate::concurrency::Duration] to allow the call to complete before
///   timing out. When omitted, no timeout is passed to the call.
///
/// The expansion `.await`s the forwarding operation, so the macro must be used inside an
/// `async` context. A trailing comma after the last argument is accepted.
///
/// Returns `Ok(())` on successful call forwarding, `Err(`[crate::RactorErr]`)` otherwise
#[macro_export]
macro_rules! forward {
    // Internal rule shared by both public forms; `$timeout` is already an `Option`.
    // Not part of the public interface.
    (@with_timeout $actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr, $timeout:expr) => {{
        let pending = $actor.call_and_forward(|tx| $msg(tx), &$forward, $forward_mapping, $timeout);
        match pending {
            Ok(future) => {
                let finished = future.await;
                match finished {
                    Ok($crate::rpc::CallResult::Success(Ok(()))) => Ok(()),
                    // The call succeeded but handing the result to `$forward` failed.
                    Ok($crate::rpc::CallResult::Success(Err(e))) => Err($crate::RactorErr::from(e)),
                    // Every other non-`Success` call result is surfaced as an error.
                    Ok(cr) => Err($crate::RactorErr::from(cr)),
                    // The error from awaiting the operation is discarded and reported
                    // as a closed channel.
                    Err(_join_err) => Err($crate::RactorErr::Messaging(
                        $crate::MessagingErr::ChannelClosed,
                    )),
                }
            }
            // A failure to start the call is likewise reported as a closed channel; the
            // concrete error is dropped, so it is not converted first.
            // NOTE(review): presumably collapsed because this error may carry a different
            // message type than the forwarding error above — confirm before changing.
            Err(_) => Err($crate::RactorErr::Messaging(
                $crate::MessagingErr::ChannelClosed,
            )),
        }
    }};
    ($actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr $(,)?) => {
        $crate::forward!(@with_timeout $actor, $msg, $forward, $forward_mapping, None)
    };
    ($actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr, $timeout:expr $(,)?) => {
        $crate::forward!(@with_timeout $actor, $msg, $forward, $forward_mapping, Some($timeout))
    };
}
240
241#[cfg(test)]
242mod tests;