Module heph::actor_ref::rpc
Types related to the ActorRef Remote Procedure Call (RPC) mechanism.
RPC is implemented by sending an RpcMessage to the actor, which contains the request message and an RpcResponse. The RpcResponse allows the receiving actor to send back a response to the sending actor.
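The request-plus-response-handle idea described above can be sketched with the standard library alone. This is only an analogue built on threads and `std::sync::mpsc`, not how heph implements it; `Call`, `spawn_counter` and `call` are hypothetical names made up for the illustration.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical stand-in for an RPC message: the request plus a handle
/// the receiver uses to send back the response.
struct Call {
    request: usize,
    reply: Sender<usize>,
}

/// Receiving side: adds each request to a running total and replies with it.
fn spawn_counter() -> Sender<Call> {
    let (tx, rx) = mpsc::channel::<Call>();
    thread::spawn(move || {
        let mut count = 0;
        // The loop ends once every `Sender<Call>` has been dropped.
        for Call { request, reply } in rx {
            count += request;
            // Ignore the error if the caller stopped waiting.
            let _ = reply.send(count);
        }
    });
    tx
}

/// Sending side: bundle the request with a fresh reply channel, then wait.
fn call(counter: &Sender<Call>, request: usize) -> Option<usize> {
    let (reply, response) = mpsc::channel();
    counter.send(Call { request, reply }).ok()?;
    response.recv().ok()
}

fn main() {
    let counter = spawn_counter();
    println!("{:?}", call(&counter, 10)); // Some(10)
    println!("{:?}", call(&counter, 5)); // Some(15)
}
```

Each call creates its own reply channel, so responses cannot be mixed up between callers even when several of them share the same receiver.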
To support RPC the receiving actor's message type needs to implement From<RpcMessage<Req, Res>>, where Req is the type of the request message and Res the type of the response. This can be done easily by using the from_message macro. The RPC message can then be received like any other message.
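To illustrate why a `From` implementation per request type is enough, here is a std-only sketch. `RpcLike` is a hypothetical stand-in for the real message type (the response handle is left out), and `wrap` and `describe` are helpers invented for this example; the sketch shows the general shape of such implementations, not the exact code the macro generates.

```rust
/// Hypothetical stand-in for an RPC message; the response handle is
/// left out to keep the sketch std-only.
struct RpcLike<Req> {
    request: Req,
}

/// Message type with one variant per supported procedure.
enum Message {
    Add(RpcLike<usize>),
    Get(RpcLike<()>),
}

// One `From` implementation per request type.
impl From<RpcLike<usize>> for Message {
    fn from(msg: RpcLike<usize>) -> Message {
        Message::Add(msg)
    }
}

impl From<RpcLike<()>> for Message {
    fn from(msg: RpcLike<()>) -> Message {
        Message::Get(msg)
    }
}

/// Generic over the request type: the `From` bound selects the variant.
fn wrap<Req>(request: Req) -> Message
where
    Message: From<RpcLike<Req>>,
{
    Message::from(RpcLike { request })
}

/// Shows which variant a message ended up in.
fn describe(msg: &Message) -> String {
    match msg {
        Message::Add(RpcLike { request }) => format!("Add({})", request),
        Message::Get(RpcLike { request: () }) => "Get".to_string(),
    }
}

fn main() {
    println!("{}", describe(&wrap(10_usize))); // Add(10)
    println!("{}", describe(&wrap(()))); // Get
}
```

Because the variant is chosen by the request type, the sender only supplies the request value and never names the variant itself.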
The sending actor needs to call ActorRef::rpc with the correct request type. That returns an Rpc Future which resolves to the response to the call, or to an RpcError in case of an error.
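The two ways a call can fail, the request never arriving or the receiver never answering, can be sketched with standard library channels. `CallError` and its variants are hypothetical and are not heph's `RpcError`; `spawn_picky` is a made-up receiver that deliberately ignores some requests.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical request type: a value plus the handle to reply with.
struct Call {
    request: usize,
    reply: Sender<usize>,
}

/// Hypothetical error type for this sketch (not heph's `RpcError`).
#[derive(Debug, PartialEq)]
enum CallError {
    /// The receiver is gone, so the request was never delivered.
    Undeliverable,
    /// The receiver dropped the reply handle without responding.
    NoResponse,
}

/// Sends the request and waits for the reply, mapping both failure modes.
fn call(target: &Sender<Call>, request: usize) -> Result<usize, CallError> {
    let (reply, response) = mpsc::channel();
    target
        .send(Call { request, reply })
        .map_err(|_| CallError::Undeliverable)?;
    response.recv().map_err(|_| CallError::NoResponse)
}

/// A receiver that only answers even requests.
fn spawn_picky() -> Sender<Call> {
    let (tx, rx) = mpsc::channel::<Call>();
    thread::spawn(move || {
        for Call { request, reply } in rx {
            if request % 2 == 0 {
                let _ = reply.send(request * 2);
            }
            // For odd requests `reply` is dropped here without a response.
        }
    });
    tx
}

fn main() {
    let target = spawn_picky();
    println!("{:?}", call(&target, 4)); // Ok(8)
    println!("{:?}", call(&target, 3)); // Err(NoResponse)

    // A target whose receiving end is already gone.
    let (dead, rx) = mpsc::channel::<Call>();
    drop(rx);
    println!("{:?}", call(&dead, 1)); // Err(Undeliverable)
}
```

Matching on the result, as the first example below does, keeps a missing reply from going unnoticed.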
Examples
Using RPC to communicate with another actor.
    // `!` as a message type requires the nightly `never_type` feature.
    #![feature(never_type)]

    use heph::actor;
    use heph::actor_ref::{ActorRef, RpcMessage};
    use heph::rt::{self, ThreadLocal};

    /// Message type for [`counter`].
    struct Add(RpcMessage<usize, usize>);

    /// Required to support RPC.
    impl From<RpcMessage<usize, usize>> for Add {
        fn from(msg: RpcMessage<usize, usize>) -> Add {
            Add(msg)
        }
    }

    /// Receiving actor of the RPC.
    async fn counter(mut ctx: actor::Context<Add, ThreadLocal>) {
        // State of the counter.
        let mut count: usize = 0;
        // Receive a message like normal.
        while let Ok(Add(RpcMessage { request, response })) = ctx.receive_next().await {
            count += request;
            // Send back the current state, ignoring any errors.
            let _ = response.respond(count);
        }
    }

    /// Sending actor of the RPC.
    async fn requester(_: actor::Context<!, ThreadLocal>, actor_ref: ActorRef<Add>) {
        // Make the procedure call.
        let response = actor_ref.rpc(10).await;
        match response {
            // We got a response.
            Ok(count) => println!("Current count: {}", count),
            // Actor failed to respond.
            Err(err) => eprintln!("Counter didn't reply: {}", err),
        }
    }
Supporting multiple procedures within the same actor is possible by making the message an enum, as the example below shows. Synchronous actors are also supported.
    // `!` as a message type requires the nightly `never_type` feature.
    #![feature(never_type)]

    use heph::actor::{self, SyncContext};
    use heph::actor_ref::{ActorRef, RpcMessage};
    use heph::from_message;
    use heph::rt::{self, ThreadLocal};

    /// Message type for [`counter`].
    enum Message {
        /// Increase the counter, returning the current state.
        Add(RpcMessage<usize, usize>),
        /// Get the current state of the counter.
        Get(RpcMessage<(), usize>),
    }

    // Implement the `From` trait for `Message`.
    from_message!(Message::Add(usize) -> usize);
    from_message!(Message::Get(()) -> usize);

    /// Receiving synchronous actor of the RPC.
    fn counter(mut ctx: SyncContext<Message>) {
        // State of the counter.
        let mut count: usize = 0;

        // Receive messages in a loop.
        while let Ok(msg) = ctx.receive_next() {
            match msg {
                Message::Add(RpcMessage { request, response }) => {
                    count += request;
                    // Send back the current state, ignoring any errors.
                    let _ = response.respond(count);
                },
                Message::Get(RpcMessage { response, .. }) => {
                    // Send back the current state, ignoring any errors.
                    let _ = response.respond(count);
                },
            }
        }
    }

    /// Sending actor of the RPC.
    async fn requester(_: actor::Context<!, ThreadLocal>, actor_ref: ActorRef<Message>) {
        // Increase the counter by ten.
        // NOTE: do handle the errors correctly in practice, this is just an
        // example.
        let count = actor_ref.rpc(10).await.unwrap();
        println!("Increased count to {}", count);

        // Retrieve the current count.
        let count = actor_ref.rpc(()).await.unwrap();
        println!("Current count {}", count);
    }
Structs
Rpc: Future that resolves to a Remote Procedure Call (RPC) response.
RpcMessage: Message type that holds an RPC request.
RpcResponse: Structure to respond to an Rpc request.