client_rpc_handler/
client_rpc_handler.rsuse busrt::async_trait;
use busrt::client::AsyncClient;
use busrt::ipc::{Client, Config};
use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use busrt::{Frame, QoS};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::sync::atomic;
use std::time::Duration;
use tokio::time::sleep;
struct MyHandlers {
counter: atomic::AtomicU64,
}
#[derive(Deserialize)]
struct AddParams {
value: u64,
}
#[async_trait]
impl RpcHandlers for MyHandlers {
async fn handle_call(&self, event: RpcEvent) -> RpcResult {
match event.parse_method()? {
"test" => {
let mut payload = BTreeMap::new();
payload.insert("ok", true);
Ok(Some(rmp_serde::to_vec_named(&payload)?))
}
"get" => {
let mut payload = BTreeMap::new();
payload.insert("value", self.counter.load(atomic::Ordering::SeqCst));
Ok(Some(rmp_serde::to_vec_named(&payload)?))
}
"add" => {
let params: AddParams = rmp_serde::from_slice(event.payload())?;
self.counter
.fetch_add(params.value, atomic::Ordering::SeqCst);
Ok(None)
}
_ => Err(RpcError::method(None)),
}
}
async fn handle_notification(&self, event: RpcEvent) {
println!(
"Got RPC notification from {}: {}",
event.sender(),
std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
);
}
async fn handle_frame(&self, frame: Frame) {
println!(
"Got non-RPC frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.rpc";
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
let op_confirm = client.subscribe("#", QoS::Processed).await?.expect("no op");
op_confirm.await??;
let handlers = MyHandlers {
counter: atomic::AtomicU64::default(),
};
let rpc = RpcClient::new(client, handlers);
println!("Waiting for frames to {}", name);
while rpc.is_connected() {
sleep(Duration::from_secs(1)).await;
}
Ok(())
}