pub struct RpcClient { /* private fields */ }Implementations§
source§impl RpcClient
impl RpcClient
sourcepub fn new<H>(client: impl AsyncClient + 'static, handlers: H) -> Self where
H: RpcHandlers + Send + Sync + 'static,
pub fn new<H>(client: impl AsyncClient + 'static, handlers: H) -> Self where H: RpcHandlers + Send + Sync + 'static,
Creates an RPC client with the specified handlers and the default options.
Examples found in repository?
examples/client_rpc_handler.rs (line 87)
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
// Example: connect a client, subscribe it to every topic, wrap it in an RpcClient with custom
// handlers, and ping once per second for as long as the RPC client reports being connected.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.rpc";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe the client to all topics to print publish frames when received
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
// receive operation confirmation
opc.await??;
// create handlers object
let handlers = MyHandlers {
counter: atomic::AtomicU64::default(),
};
// create the RPC client from the connected client and the handlers
let rpc = RpcClient::new(client, handlers);
println!("Waiting for frames to {}", name);
while rpc.is_connected() {
sleep(Duration::from_secs(1)).await;
// if the broker is unavailable, ping sets the client and RPC to the disconnected state;
// after that, the program can try reconnecting or quit.
// The ping result itself is ignored; the loop condition checks the connection state.
let _r = rpc.client().lock().await.ping().await;
}
Ok(())
}More examples
examples/broker_custom_rpc.rs (line 67)
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Example: start a broker with a unix socket server and a fifo, and attach an RpcClient
// with custom handlers to the broker's core client.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// create a new broker instance
let mut broker = Broker::new();
// spawn unix server for external clients
broker
.spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
.await?;
// register the broker core client
let mut core_client = broker.register_client(BROKER_NAME).await?;
// subscribe the core client to all topics to print publish frames when received
core_client.subscribe("#", QoS::No).await?;
// create handlers object
let handlers = MyHandlers {};
// create the RPC client from the core client and the handlers
let crpc = RpcClient::new(core_client, handlers);
println!("Waiting for frames to {}", BROKER_NAME);
// set the broker's core RPC client (optional; allows spawning fifo servers). The client is
// wrapped in Arc<Mutex<_>>, as it is cloned for each fifo spawned, and can be obtained back
// with the core_rpc_client broker method
broker.set_core_rpc_client(crpc).await;
// test it with echo .broker .hello > /tmp/busrt.fifo
broker.spawn_fifo("/tmp/busrt.fifo", 8192).await?;
// this is the internal client; it will stay connected forever
while broker
.core_rpc_client()
.lock()
.await
.as_ref()
.unwrap()
.is_connected()
{
sleep(Duration::from_secs(1)).await;
}
Ok(())
}sourcepub fn new0(client: impl AsyncClient + 'static) -> Self
pub fn new0(client: impl AsyncClient + 'static) -> Self
Creates an RPC client with dummy handlers and the default options.
Examples found in repository?
examples/client_rpc.rs (line 23)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
// Example: create an RpcClient without handlers, call the "test", "add" and "get" methods of
// the target client, and print the value returned by "get".
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.123";
let target = "test.client.rpc";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let client = Client::connect(&config).await?;
// create RPC with no handlers
let rpc = RpcClient::new0(client);
// call the method via call0, i.e. no response is required
rpc.call0(target, "test", empty_payload!(), QoS::Processed)
.await?;
// build the parameters for "add": a map with a single "value" entry
let mut payload: BTreeMap<&str, u32> = <_>::default();
payload.insert("value", 10);
// call a method with confirm to make sure the value is added
rpc.call(
target,
"add",
rmp_serde::to_vec_named(&payload)?.into(),
QoS::Processed,
)
.await?;
// call the method to read the sum
let result = rpc
.call(target, "get", empty_payload!(), QoS::Processed)
.await?;
// unpack the response payload and print its value
let amount: Amount = rmp_serde::from_slice(result.payload())?;
println!("{}", amount.value);
Ok(())
}More examples
examples/client_cursor.rs (line 23)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Example: read customer records through a remote cursor, first one record per call and
// then in blocks of `bulk_size` records.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.123";
let target = "db";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let client = Client::connect(&config).await?;
// create RPC with no handlers
let rpc = RpcClient::new0(client);
// get a cursor
let cursor: cursors::Payload = rmp_serde::from_slice(
rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
.await?
.payload(),
)?;
// serialize the cursor once and pass it as a borrowed Cow, to avoid unnecessary data
// serialization every time the method is called
let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
loop {
// get customers one-by-one
let result = rpc
.call(target, "N", b_cursor.clone(), QoS::Processed)
.await?;
let data = result.payload();
// the payload is empty when there are no more records left
if data.is_empty() {
break;
}
let customer: Customer = rmp_serde::from_slice(data)?;
println!("{}: {}", customer.id, customer.name);
}
// do the same in bulk
let bulk_size = 100;
// get a new cursor and set its bulk count before serializing it
let mut cursor: cursors::Payload = rmp_serde::from_slice(
rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
.await?
.payload(),
)?;
cursor.set_bulk_count(bulk_size);
let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
loop {
// get customers in bulk
let result = rpc
.call(target, "NB", b_cursor.clone(), QoS::Processed)
.await?;
let customers: Vec<Customer> = rmp_serde::from_slice(result.payload())?;
for customer in &customers {
println!("{}: {}", customer.id, customer.name);
}
// stop if the block contains fewer records than the bulk size - that means it is the
// last block
if customers.len() < bulk_size {
break;
}
}
Ok(())
}sourcepub fn create<H>(
client: impl AsyncClient + 'static,
handlers: H,
opts: Options
) -> Self where
H: RpcHandlers + Send + Sync + 'static,
pub fn create<H>( client: impl AsyncClient + 'static, handlers: H, opts: Options ) -> Self where H: RpcHandlers + Send + Sync + 'static,
Creates an RPC client with the specified handlers and options.
sourcepub fn create0(client: impl AsyncClient + 'static, opts: Options) -> Self
pub fn create0(client: impl AsyncClient + 'static, opts: Options) -> Self
Creates an RPC client with dummy handlers and the specified options.
Trait Implementations§
source§impl Rpc for RpcClient
impl Rpc for RpcClient
source§fn call<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
target: &'life1 str,
method: &'life2 str,
params: Cow<'async_trait>,
qos: QoS
) -> Pin<Box<dyn Future<Output = Result<RpcEvent, RpcError>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn call<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, target: &'life1 str, method: &'life2 str, params: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<RpcEvent, RpcError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,
Panics
Will panic if the mutex is poisoned.
source§fn client(&self) -> Arc<Mutex<dyn AsyncClient + 'static>>
fn client(&self) -> Arc<Mutex<dyn AsyncClient + 'static>>
When created, the busrt client is wrapped in Arc<Mutex<_>> so that it can be sent into
the incoming-frames handler future. Read more
fn notify<'life0, 'life1, 'async_trait>( &'life0 self, target: &'life1 str, data: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,
source§fn call0<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
target: &'life1 str,
method: &'life2 str,
params: Cow<'async_trait>,
qos: QoS
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn call0<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, target: &'life1 str, method: &'life2 str, params: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,
Calls the method; no response is required.