ractor/
macros.rs

// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Macro helpers for remote procedure calls
7
/// `cast!`: Send a message to an actor by calling `$actor.cast($msg)`, converting
/// any error into a [crate::RactorErr].
///
/// * `$actor` - The actor to send the message to
/// * `$msg` - The message to send
///
/// Evaluates to the `Result` of `cast` with its error mapped through
/// `RactorErr::from`, which can be pattern matched on in order to derive the
/// output. A trailing comma after `$msg` is accepted.
#[macro_export]
macro_rules! cast {
    ($actor:expr, $msg:expr $(,)?) => {
        $actor.cast($msg).map_err($crate::RactorErr::from)
    };
}
16
/// `call!`: Perform an infinite-time remote procedure call to an [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder which takes in a [crate::port::RpcReplyPort] and emits a message which
///   the actor supports
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
///   constructing the message payload
///
/// The expansion `.await`s the call (with no timeout), so the macro must be used inside an
/// async context. A trailing comma after the last argument is accepted.
///
/// Returns `Ok(_)` with the result on successful RPC or `Err(`[crate::RactorErr]`)` on failure
///
/// Example usage (without the `cluster` feature)
/// ```rust
/// use ractor::{call, Actor, RpcReplyPort, ActorRef, ActorProcessingErr};
/// struct TestActor;
/// enum MessageFormat {
///     TestRpc(String, RpcReplyPort<String>),
/// }
/// #[cfg(feature = "cluster")]
/// impl ractor::Message for MessageFormat {}
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for TestActor {
///     type Msg = MessageFormat;
///     type Arguments = ();
///     type State = ();
///
///     async fn pre_start(&self, _this_actor: ActorRef<Self::Msg>, _: ()) -> Result<Self::State, ActorProcessingErr> {
///         Ok(())
///     }
///
///     async fn handle(
///         &self,
///         _this_actor: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         _state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         match message {
///             Self::Msg::TestRpc(arg, reply) => {
///                 // An error sending means no one is listening anymore (the receiver was dropped),
///                 // so we should shortcut the processing of this message probably!
///                 if !reply.is_closed() {
///                     let _ = reply.send(arg);
///                 }
///             }
///         }
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (actor, handle) = Actor::spawn(None, TestActor, ()).await.unwrap();
///     let result = call!(actor, MessageFormat::TestRpc, "Something".to_string()).unwrap();
///     assert_eq!(result, "Something".to_string());
///     actor.stop(None);
///     handle.await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! call {
    // A single arm covers zero or more leading arguments: `$($args,)*` expands to
    // nothing when no arguments are given, so the builder is called as `$msg(tx)`,
    // and to `a, b, ` otherwise, so the reply port is always the last parameter.
    ($actor:expr, $msg:expr $(, $args:expr)* $(,)?) => {{
        let outcome = $actor
            .call(|tx| $msg($($args,)* tx), None)
            .await
            .map_err($crate::RactorErr::from);
        match outcome {
            Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
            // Any non-success call result is converted into the error type
            Ok(cr) => Err($crate::RactorErr::from(cr)),
            Err(e) => Err(e),
        }
    }};
}
99
/// `call_t!`: Perform a finite-time remote procedure call to an [crate::Actor]
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder variant
/// * `$timeout_ms` - the timeout in milliseconds for the remote procedure call
/// * `$args` - (optional) Variable length arguments which will PRECEDE the reply channel when
///   constructing the message payload
///
/// The expansion `.await`s the call, so the macro must be used inside an async context.
/// A trailing comma after the last argument is accepted.
///
/// Returns `Ok(_)` with the result on successful RPC or `Err(`[crate::RactorErr]`)` on failure
///
/// Example usage
/// ```rust
/// use ractor::{call_t, Actor, ActorRef, RpcReplyPort, ActorProcessingErr};
/// struct TestActor;
/// enum MessageFormat {
///     TestRpc(String, RpcReplyPort<String>),
/// }
/// #[cfg(feature = "cluster")]
/// impl ractor::Message for MessageFormat {}
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for TestActor {
///     type Msg = MessageFormat;
///     type Arguments = ();
///     type State = ();
///
///     async fn pre_start(&self, _this_actor: ActorRef<Self::Msg>, _: ()) -> Result<Self::State, ActorProcessingErr> {
///         Ok(())
///     }
///
///     async fn handle(
///         &self,
///         _this_actor: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         _state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         match message {
///             Self::Msg::TestRpc(arg, reply) => {
///                 // An error sending means no one is listening anymore (the receiver was dropped),
///                 // so we should shortcut the processing of this message probably!
///                 if !reply.is_closed() {
///                     let _ = reply.send(arg);
///                 }
///             }
///         }
///         Ok(())
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (actor, handle) = Actor::spawn(None, TestActor, ()).await.unwrap();
///     let result = call_t!(actor, MessageFormat::TestRpc, 50, "Something".to_string()).unwrap();
///     assert_eq!(result, "Something".to_string());
///     actor.stop(None);
///     handle.await.unwrap();
/// }
/// ```
#[macro_export]
macro_rules! call_t {
    // A single arm covers zero or more leading arguments: `$($args,)*` expands to
    // nothing when no arguments are given, so the builder is called as `$msg(tx)`,
    // and to `a, b, ` otherwise, so the reply port is always the last parameter.
    ($actor:expr, $msg:expr, $timeout_ms:expr $(, $args:expr)* $(,)?) => {{
        let outcome = $actor
            .call(
                |tx| $msg($($args,)* tx),
                Some($crate::concurrency::Duration::from_millis($timeout_ms)),
            )
            .await
            .map_err($crate::RactorErr::from);
        match outcome {
            Ok($crate::rpc::CallResult::Success(ok_value)) => Ok(ok_value),
            // Any non-success call result is converted into the error type
            Ok(cr) => Err($crate::RactorErr::from(cr)),
            Err(e) => Err(e),
        }
    }};
}
183
/// `forward!`: Perform a remote procedure call to a [crate::Actor]
/// and forwards the result to another actor if successful
///
/// * `$actor` - The actor to call
/// * `$msg` - The message builder, which takes in a [crate::port::RpcReplyPort] and emits a message which
///   the actor supports.
/// * `$forward` - The [crate::ActorRef] to forward the call to (must be an identifier; it is
///   passed by reference)
/// * `$forward_mapping` - The message transformer from the RPC result to the forwarding actor's message format
/// * `$timeout` - (optional) The [crate::concurrency::Duration] to allow the call to complete before
///   timing out. When omitted, no timeout is passed to `call_and_forward`.
///
/// The expansion `.await`s the future returned by `call_and_forward`, so the macro must be used
/// inside an async context. A trailing comma after the last argument is accepted.
///
/// Returns `Ok(())` on successful call forwarding, `Err(`[crate::RactorErr]`)` otherwise. If
/// `call_and_forward` itself fails, or awaiting its future yields an error, the failure is
/// reported as `RactorErr::Messaging(MessagingErr::ChannelClosed)`.
#[macro_export]
macro_rules! forward {
    // Internal rule shared by both public forms; `$timeout` is already an `Option`.
    // It is listed first so the literal `@dispatch` is tried before any `expr` fragment.
    (@dispatch $actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr, $timeout:expr) => {{
        // Bound with `let` (rather than matched on directly) so temporaries created while
        // issuing the call are dropped before the `.await` below.
        let future_or_err =
            $actor.call_and_forward(|tx| $msg(tx), &$forward, $forward_mapping, $timeout);
        match future_or_err {
            Ok(future) => {
                let joined = future.await;
                match joined {
                    Ok($crate::rpc::CallResult::Success(Ok(()))) => Ok(()),
                    // The call succeeded but forwarding the reply failed
                    Ok($crate::rpc::CallResult::Success(Err(e))) => Err($crate::RactorErr::from(e)),
                    // Any non-success call result is converted into the error type
                    Ok(cr) => Err($crate::RactorErr::from(cr)),
                    Err(_join_err) => Err($crate::RactorErr::Messaging(
                        $crate::MessagingErr::ChannelClosed,
                    )),
                }
            }
            // The original error is discarded and reported as a closed channel
            Err(_) => Err($crate::RactorErr::Messaging(
                $crate::MessagingErr::ChannelClosed,
            )),
        }
    }};
    ($actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr $(,)?) => {
        $crate::forward!(@dispatch $actor, $msg, $forward, $forward_mapping, None)
    };
    ($actor:expr, $msg:expr, $forward:ident, $forward_mapping:expr, $timeout:expr $(,)?) => {
        $crate::forward!(@dispatch $actor, $msg, $forward, $forward_mapping, Some($timeout))
    };
}
240
241#[cfg(test)]
242mod tests;