broker_custom_rpc/
broker_custom_rpc.rs

1// Demo of custom broker internal RPC
2use busrt::async_trait;
3use busrt::broker::{Broker, ServerConfig, BROKER_NAME};
4use busrt::client::AsyncClient;
5use busrt::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
6use busrt::{Frame, QoS};
7use serde::Deserialize;
8use std::time::Duration;
9use tokio::time::sleep;
10
11struct MyHandlers {}
12
13#[derive(Deserialize)]
14struct PingParams<'a> {
15    message: Option<&'a str>,
16}
17
18#[async_trait]
19impl RpcHandlers for MyHandlers {
20    // RPC call handler. Will react to the "test" (any params) and "ping" (will parse params as
21    // msgpack and return the "message" field back) methods
22    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
23        match event.parse_method()? {
24            "test" => Ok(Some("passed".as_bytes().to_vec())),
25            "ping" => {
26                let params: PingParams = rmp_serde::from_slice(event.payload())?;
27                Ok(params.message.map(|m| m.as_bytes().to_vec()))
28            }
29            _ => Err(RpcError::method(None)),
30        }
31    }
32    // Handle RPC notifications
33    async fn handle_notification(&self, event: RpcEvent) {
34        println!(
35            "Got RPC notification from {}: {}",
36            event.sender(),
37            std::str::from_utf8(event.payload()).unwrap_or("something unreadable")
38        );
39    }
40    // handle broadcast notifications and topic publications
41    async fn handle_frame(&self, frame: Frame) {
42        println!(
43            "Got non-RPC frame from {}: {:?} {:?} {}",
44            frame.sender(),
45            frame.kind(),
46            frame.topic(),
47            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48        );
49    }
50}
51
52#[tokio::main]
53async fn main() -> Result<(), Box<dyn std::error::Error>> {
54    // create a new broker instance
55    let mut broker = Broker::new();
56    // spawn unix server for external clients
57    broker
58        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
59        .await?;
60    // register the broker core client
61    let mut core_client = broker.register_client(BROKER_NAME).await?;
62    // subscribe the core client to all topics to print publish frames when received
63    core_client.subscribe("#", QoS::No).await?;
64    // create handlers object
65    let handlers = MyHandlers {};
66    // create RPC
67    let crpc = RpcClient::new(core_client, handlers);
68    println!("Waiting for frames to {}", BROKER_NAME);
69    // set broker client, optional, allows to spawn fifo servers, the client is wrapped in
70    // Arc<Mutex<_>> as it is cloned for each fifo spawned and can be got back with core_rpc_client
71    // broker method
72    broker.set_core_rpc_client(crpc).await;
73    // test it with echo .broker .hello > /tmp/busrt.fifo
74    broker.spawn_fifo("/tmp/busrt.fifo", 8192).await?;
75    // this is the internal client, it will be connected forever
76    while broker
77        .core_rpc_client()
78        .lock()
79        .await
80        .as_ref()
81        .unwrap()
82        .is_connected()
83    {
84        sleep(Duration::from_secs(1)).await;
85    }
86    Ok(())
87}