client_rpc_handler/
client_rpc_handler.rs1use busrt::async_trait;
6use busrt::client::AsyncClient;
7use busrt::ipc::{Client, Config};
8use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
9use busrt::{Frame, QoS};
10use serde::Deserialize;
11use std::collections::BTreeMap;
12use std::sync::atomic;
13use std::time::Duration;
14use tokio::time::sleep;
15
16struct MyHandlers {
17 counter: atomic::AtomicU64,
20}
21
22#[derive(Deserialize)]
23struct AddParams {
24 value: u64,
25}
26
27#[async_trait]
28impl RpcHandlers for MyHandlers {
29 async fn handle_call(&self, event: RpcEvent) -> RpcResult {
32 match event.parse_method()? {
33 "test" => {
34 let mut payload = BTreeMap::new();
35 payload.insert("ok", true);
36 Ok(Some(rmp_serde::to_vec_named(&payload)?))
37 }
38 "get" => {
39 let mut payload = BTreeMap::new();
40 payload.insert("value", self.counter.load(atomic::Ordering::SeqCst));
41 Ok(Some(rmp_serde::to_vec_named(&payload)?))
42 }
43 "add" => {
44 let params: AddParams = rmp_serde::from_slice(event.payload())?;
45 self.counter
46 .fetch_add(params.value, atomic::Ordering::SeqCst);
47 Ok(None)
48 }
49 _ => Err(RpcError::method(None)),
50 }
51 }
52 async fn handle_notification(&self, event: RpcEvent) {
54 println!(
55 "Got RPC notification from {}: {}",
56 event.sender(),
57 std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
58 );
59 }
60 async fn handle_frame(&self, frame: Frame) {
62 println!(
63 "Got non-RPC frame from {}: {:?} {:?} {}",
64 frame.sender(),
65 frame.kind(),
66 frame.topic(),
67 std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68 );
69 }
70}
71
72#[tokio::main]
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 let name = "test.client.rpc";
75 let config = Config::new("/tmp/busrt.sock", name);
77 let mut client = Client::connect(&config).await?;
78 let op_confirm = client.subscribe("#", QoS::Processed).await?.expect("no op");
80 op_confirm.await??;
82 let handlers = MyHandlers {
84 counter: atomic::AtomicU64::default(),
85 };
86 let rpc = RpcClient::new(client, handlers);
88 println!("Waiting for frames to {}", name);
89 while rpc.is_connected() {
90 sleep(Duration::from_secs(1)).await;
91 }
92 Ok(())
93}