client_rpc_handler/
client_rpc_handler.rs

// Demo of a client RPC handler.
//
// Use the client_rpc example to test client/server communication. Do not forget to launch a
// standalone broker server instance first.
5use busrt::async_trait;
6use busrt::client::AsyncClient;
7use busrt::ipc::{Client, Config};
8use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
9use busrt::{Frame, QoS};
10use serde::Deserialize;
11use std::collections::BTreeMap;
12use std::sync::atomic;
13use std::time::Duration;
14use tokio::time::sleep;
15
/// State shared by every RPC handler invocation of this demo client.
///
/// RPC handlers are launched in parallel, in multiple instances, and only receive `&self`, so any
/// field that has to be modified must be atomic or wrapped in a `Mutex`/`RwLock`.
///
/// `Default` yields a handler set whose counter starts at zero; `Debug` allows the current state
/// to be printed for diagnostics.
#[derive(Debug, Default)]
struct MyHandlers {
    /// Running total, increased by the "add" method and reported by the "get" method.
    counter: atomic::AtomicU64,
}
21
22#[derive(Deserialize)]
23struct AddParams {
24    value: u64,
25}
26
27#[async_trait]
28impl RpcHandlers for MyHandlers {
29    // RPC call handler. Will react to the "test" and "get" (any params) and "add" (will parse
30    // params as msgpack and add the value to the internal counter) methods
31    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
32        match event.parse_method()? {
33            "test" => {
34                let mut payload = BTreeMap::new();
35                payload.insert("ok", true);
36                Ok(Some(rmp_serde::to_vec_named(&payload)?))
37            }
38            "get" => {
39                let mut payload = BTreeMap::new();
40                payload.insert("value", self.counter.load(atomic::Ordering::SeqCst));
41                Ok(Some(rmp_serde::to_vec_named(&payload)?))
42            }
43            "add" => {
44                let params: AddParams = rmp_serde::from_slice(event.payload())?;
45                self.counter
46                    .fetch_add(params.value, atomic::Ordering::SeqCst);
47                Ok(None)
48            }
49            _ => Err(RpcError::method(None)),
50        }
51    }
52    // Handle RPC notifications
53    async fn handle_notification(&self, event: RpcEvent) {
54        println!(
55            "Got RPC notification from {}: {}",
56            event.sender(),
57            std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
58        );
59    }
60    // handle broadcast notifications and topic publications
61    async fn handle_frame(&self, frame: Frame) {
62        println!(
63            "Got non-RPC frame from {}: {:?} {:?} {}",
64            frame.sender(),
65            frame.kind(),
66            frame.topic(),
67            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68        );
69    }
70}
71
72#[tokio::main]
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    let name = "test.client.rpc";
75    // create a new client instance
76    let config = Config::new("/tmp/busrt.sock", name);
77    let mut client = Client::connect(&config).await?;
78    // subscribe the cclient to all topics to print publish frames when received
79    let op_confirm = client.subscribe("#", QoS::Processed).await?.expect("no op");
80    // receive operation confirmation
81    op_confirm.await??;
82    // create handlers object
83    let handlers = MyHandlers {
84        counter: atomic::AtomicU64::default(),
85    };
86    // create RPC
87    let rpc = RpcClient::new(client, handlers);
88    println!("Waiting for frames to {}", name);
89    while rpc.is_connected() {
90        sleep(Duration::from_secs(1)).await;
91    }
92    Ok(())
93}