krossbar_rpc/
lib.rs

1/*!
2RPC library used by Krossbar platform for communication.
3
4The library:
5- Receives [tokio::net::UnixStream] and returns RPC handle;
6- Allows making calls, subscribing to an endpoint, and sending [tokio::net::UnixStream] using RPC connection;
7- Supports replacing the stream after reconnection, resubscribing to the active subscriptions, and keeping all client handles valid;
8- Supports message exchange monitoring via [Monitor]
9
10Use [rpc::Rpc::poll] method to poll the stream. This includes waiting for a call or subscriptions response.
11
12# Examples
13
14RPC calls:
15```
16use futures::{select, FutureExt};
17use tokio::net::UnixStream;
18
19use krossbar_rpc::rpc::Rpc;
20
21async fn call() {
22    let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
23    let mut rpc = Rpc::new(stream, "hub");
24
25    let call = rpc.call::<u32, u32>("echo", &42).await.unwrap();
26
27    select! {
28        response = call.fuse() => {
29            println!("Call response: {response:?}")
30        },
31        _ = rpc.poll().fuse() => {}
32    }
33}
34```
35
36RPC subscription:
37```
38use futures::{select, FutureExt, StreamExt};
39use tokio::net::UnixStream;
40
41use krossbar_rpc::rpc::Rpc;
42
43async fn subscribe() {
44    let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
45    let mut rpc = Rpc::new(stream, "hub");
46
47    let subscription = rpc.subscribe::<u32>("signal").await.unwrap();
48
49    select! {
50        response = subscription.take(2).collect::<Vec<krossbar_rpc::Result<u32>>>() => {
51            println!("Subscription response: {response:?}")
52        },
53        _ = rpc.poll().fuse() => {}
54    }
55}
56```
57
58One-way message:
59```
60use futures::{select, FutureExt};
61use tokio::net::UnixStream;
62
63use krossbar_rpc::rpc::Rpc;
64
65async fn message() {
66    let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
67    let mut rpc = Rpc::new(stream, "hub");
68
69    let call = rpc.send_message("echo", &42).await.unwrap();
70
71    let incoming_message = rpc.poll().await;
72}
73```
74
75Polling imcoming messages:
76```
77use futures::{select, FutureExt};
78use tokio::net::UnixStream;
79
80use krossbar_rpc::{rpc::Rpc, request::Body};
81
82async fn poll() {
83    let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
84    let mut rpc = Rpc::new(stream, "hub");
85
86    loop {
87        let request = rpc.poll().await;
88
89        if request.is_none() {
90            println!("Client disconnected");
91            return;
92        }
93
94        let mut request = request.unwrap();
95
96        println!("Incoming method call: {}", request.endpoint());
97        match request.take_body().unwrap() {
98            Body::Message(bson) => {
99                println!("Incoming message: {bson:?}");
100            },
101            Body::Call(bson) => {
102                println!("Incoming call: {bson:?}");
103                request.respond(Ok(bson)).await;
104            },
105            Body::Subscription => {
106                println!("Incoming subscription");
107                request.respond(Ok(41)).await;
108                request.respond(Ok(42)).await;
109                request.respond(Ok(43)).await;
110            },
111            Body::Fd { client_name, .. } => {
112                println!("Incoming connection request from {client_name}");
113                request.respond(Ok(())).await;
114            }
115        }
116    }
117}
118```
119
120See `tests/` for more examples.
121*/
122
123mod calls_registry;
124mod error;
125mod message;
126mod message_stream;
127#[cfg(feature = "monitor")]
128pub mod monitor;
129pub mod request;
130pub mod rpc;
131pub mod writer;
132
133pub use error::*;
134
135#[cfg(feature = "impl-monitor")]
136pub use message::*;
137#[cfg(feature = "impl-monitor")]
138pub use monitor::*;