1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use busrt::async_trait;
use busrt::client::AsyncClient;
use busrt::ipc::{Client, Config};
use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use busrt::{Frame, QoS};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::sync::atomic;
use std::time::Duration;
use tokio::time::sleep;
struct MyHandlers {
counter: atomic::AtomicU64,
}
#[derive(Deserialize)]
struct AddParams {
value: u64,
}
#[async_trait]
impl RpcHandlers for MyHandlers {
async fn handle_call(&self, event: RpcEvent) -> RpcResult {
match event.parse_method()? {
"test" => {
let mut payload = BTreeMap::new();
payload.insert("ok", true);
Ok(Some(rmp_serde::to_vec_named(&payload)?))
}
"get" => {
let mut payload = BTreeMap::new();
payload.insert("value", self.counter.load(atomic::Ordering::SeqCst));
Ok(Some(rmp_serde::to_vec_named(&payload)?))
}
"add" => {
let params: AddParams = rmp_serde::from_slice(event.payload())?;
self.counter
.fetch_add(params.value, atomic::Ordering::SeqCst);
Ok(None)
}
_ => Err(RpcError::method(None)),
}
}
async fn handle_notification(&self, event: RpcEvent) {
println!(
"Got RPC notification from {}: {}",
event.sender(),
std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
);
}
async fn handle_frame(&self, frame: Frame) {
println!(
"Got non-RPC frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.rpc";
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
let handlers = MyHandlers {
counter: atomic::AtomicU64::default(),
};
let rpc = RpcClient::new(client, handlers);
println!("Waiting for frames to {}", name);
while rpc.is_connected() {
sleep(Duration::from_secs(1)).await;
let _r = rpc.client().lock().await.ping().await;
}
Ok(())
}