Module heph::actor_ref::rpc[][src]

Expand description

Types related the ActorRef Remote Procedure Call (RPC) mechanism.

RPC is implemented by sending a RpcMessage to the actor, which contains the request message and a RpcResponse. The RpcResponse allows the receiving actor to send back a response to the sending actor.

To support RPC the receiving actor needs to implement From<RpcMessage<Req, Res>>, where Req is the type of the request message and Res the type of the response. This can be done easily by using the from_message macro. The RPC message can then be received like any other message.

The sending actor needs to call ActorRef::rpc with the correct request type. That will return an Rpc Future which returns the response to the call, or RpcError in case of an error.

Examples

Using RPC to communicate with another actor.

use heph::actor;
use heph::actor_ref::{ActorRef, RpcMessage};
use heph::rt::{self, ThreadLocal};

/// Message type for [`counter`].
struct Add(RpcMessage<usize, usize>);

/// Required to support RPC.
impl From<RpcMessage<usize, usize>> for Add {
    fn from(msg: RpcMessage<usize, usize>) -> Add {
        Add(msg)
    }
}

/// Receiving actor of the RPC.
async fn counter(mut ctx: actor::Context<Add, ThreadLocal>) {
    // State of the counter.
    let mut count: usize = 0;
    // Receive a message like normal.
    while let Ok(Add(RpcMessage { request, response })) = ctx.receive_next().await {
        count += request;
        // Send back the current state, ignoring any errors.
        let _ = response.respond(count);
    }
}

/// Sending actor of the RPC.
async fn requester(_: actor::Context<!, ThreadLocal>, actor_ref: ActorRef<Add>) {
    // Make the procedure call.
    let response = actor_ref.rpc(10).await;
    match response {
        // We got a response.
        Ok(count) => println!("Current count: {}", count),
        // Actor failed to respond.
        Err(err) => eprintln!("Counter didn't reply: {}", err),
    }
}

Supporting multiple procedure within the same actor is possible by making the message an enum as the example below shows. Furthermore synchronous actors are supported.

use heph::actor::{self, SyncContext};
use heph::actor_ref::{ActorRef, RpcMessage};
use heph::from_message;
use heph::rt::{self, ThreadLocal};

/// Message type for [`counter`].
enum Message {
    /// Increase the counter, returning the current state.
    Add(RpcMessage<usize, usize>),
    /// Get the current state of the counter.
    Get(RpcMessage<(), usize>),
}

// Implement the `From` trait for `Message`.
from_message!(Message::Add(usize) -> usize);
from_message!(Message::Get(()) -> usize);

/// Receiving synchronous actor of the RPC.
fn counter(mut ctx: SyncContext<Message>) {
    // State of the counter.
    let mut count: usize = 0;

    // Receive messages in a loop.
    while let Ok(msg) = ctx.receive_next() {
        match msg {
            Message::Add(RpcMessage { request, response }) => {
                count += request;
                // Send back the current state, ignoring any errors.
                let _ = response.respond(count);
            },
            Message::Get(RpcMessage { response, .. }) => {
                // Send back the current state, ignoring any errors.
                let _ = response.respond(count);
            },
        }
    }
}

/// Sending actor of the RPC.
async fn requester(_: actor::Context<!, ThreadLocal>, actor_ref: ActorRef<Message>) {
    // Increase the counter by ten.
    // NOTE: do handle the errors correctly in practice, this is just an
    // example.
    let count = actor_ref.rpc(10).await.unwrap();
    println!("Increased count to {}", count);

    // Retrieve the current count.
    let count = actor_ref.rpc(()).await.unwrap();
    println!("Current count {}", count);
}

Structs

Future that resolves to a Remote Procedure Call (RPC) response.

Message type that holds an RPC request.

Structure to respond to an Rpc request.

Enums

Error returned by Rpc.