// broker_custom_rpc/broker_custom_rpc.rs
use busrt::async_trait;
use busrt::broker::{Broker, ServerConfig, BROKER_NAME};
use busrt::client::AsyncClient;
use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use busrt::{Frame, QoS};
use serde::Deserialize;
use std::time::Duration;
use tokio::time::sleep;
10
/// Stateless handler object for the broker's core RPC client.
///
/// It carries no data; it exists only so the `RpcHandlers` trait can be
/// implemented on it.
struct MyHandlers {}
12
13#[derive(Deserialize)]
14struct PingParams<'a> {
15 message: Option<&'a str>,
16}
17
18#[async_trait]
19impl RpcHandlers for MyHandlers {
20 async fn handle_call(&self, event: RpcEvent) -> RpcResult {
23 match event.parse_method()? {
24 "test" => Ok(Some("passed".as_bytes().to_vec())),
25 "ping" => {
26 let params: PingParams = rmp_serde::from_slice(event.payload())?;
27 Ok(params.message.map(|m| m.as_bytes().to_vec()))
28 }
29 _ => Err(RpcError::method(None)),
30 }
31 }
32 async fn handle_notification(&self, event: RpcEvent) {
34 println!(
35 "Got RPC notification from {}: {}",
36 event.sender(),
37 std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
38 );
39 }
40 async fn handle_frame(&self, frame: Frame) {
42 println!(
43 "Got non-RPC frame from {}: {:?} {:?} {}",
44 frame.sender(),
45 frame.kind(),
46 frame.topic(),
47 std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48 );
49 }
50}
51
52#[tokio::main]
53async fn main() -> Result<(), Box<dyn std::error::Error>> {
54 let mut broker = Broker::new();
56 broker
58 .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
59 .await?;
60 let mut core_client = broker.register_client(BROKER_NAME).await?;
62 core_client.subscribe("#", QoS::No).await?;
64 let handlers = MyHandlers {};
66 let crpc = RpcClient::new(core_client, handlers);
68 println!("Waiting for frames to {}", BROKER_NAME);
69 broker.set_core_rpc_client(crpc).await;
73 broker.spawn_fifo("/tmp/busrt.fifo", 8192).await?;
75 while broker
77 .core_rpc_client()
78 .lock()
79 .await
80 .as_ref()
81 .unwrap()
82 .is_connected()
83 {
84 sleep(Duration::from_secs(1)).await;
85 }
86 Ok(())
87}