pub struct Client { /* private fields */ }
Implementations§
Source§impl Client
impl Client
Sourcepub async fn connect(config: &Config) -> Result<Self, Error>
pub async fn connect(config: &Config) -> Result<Self, Error>
Examples found in repository?
examples/client_listener.rs (line 11)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.listener";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe to all topics
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
// handle incoming frames
let rx = client.take_event_channel().unwrap();
while let Ok(frame) = rx.recv().await {
println!(
"Frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}
More examples
examples/client_rpc_handler.rs (line 77)
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.rpc";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe the client to all topics to print publish frames when received
let op_confirm = client.subscribe("#", QoS::Processed).await?.expect("no op");
// receive operation confirmation
op_confirm.await??;
// create handlers object
let handlers = MyHandlers {
counter: atomic::AtomicU64::default(),
};
// create RPC
let rpc = RpcClient::new(client, handlers);
println!("Waiting for frames to {}", name);
while rpc.is_connected() {
sleep(Duration::from_secs(1)).await;
}
Ok(())
}
examples/client_sender.rs (line 11)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.sender";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// publish to a topic
let opc = client
.publish("some/topic", "hello".as_bytes().into(), QoS::Processed)
.await?
.expect("no op");
opc.await??;
// send a direct message
let opc = client
.send(
"test.client.listener",
"hello".as_bytes().into(),
QoS::Processed,
)
.await?
.expect("no op");
opc.await??;
// send a broadcast message
let opc = client
.send_broadcast("test.*", "hello everyone".as_bytes().into(), QoS::Processed)
.await?
.expect("no op");
opc.await??;
Ok(())
}
examples/client_rpc.rs (line 21)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.123";
let target = "test.client.rpc";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let client = Client::connect(&config).await?;
// create RPC with no handlers
let rpc = RpcClient::new0(client);
// call the method with no confirm
rpc.call0(target, "test", empty_payload!(), QoS::Processed)
.await?;
let mut payload: BTreeMap<&str, u32> = <_>::default();
payload.insert("value", 10);
// call a method with confirm to make sure the value is added
rpc.call(
target,
"add",
rmp_serde::to_vec_named(&payload)?.into(),
QoS::Processed,
)
.await?;
// call the method to read the sum
let result = rpc
.call(target, "get", empty_payload!(), QoS::Processed)
.await?;
let amount: Amount = rmp_serde::from_slice(result.payload())?;
println!("{}", amount.value);
Ok(())
}
examples/client_cursor.rs (line 21)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.123";
let target = "db";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let client = Client::connect(&config).await?;
// create RPC with no handlers
let rpc = RpcClient::new0(client);
// get a cursor
let cursor: cursors::Payload = rmp_serde::from_slice(
rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
.await?
.payload(),
)?;
// use a Cow to avoid unnecessarily re-serializing the data every time the method is
// called
let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
loop {
// get customers one-by-one
let result = rpc
.call(target, "N", b_cursor.clone(), QoS::Processed)
.await?;
let data = result.payload();
// the payload is empty when there are no more records left
if data.is_empty() {
break;
}
let customer: Customer = rmp_serde::from_slice(data)?;
println!("{}: {}", customer.id, customer.name);
}
// do the same in bulk
let bulk_size = 100;
// get a cursor
let mut cursor: cursors::Payload = rmp_serde::from_slice(
rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
.await?
.payload(),
)?;
cursor.set_bulk_number(bulk_size);
let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
loop {
// get customers in bulk
let result = rpc
.call(target, "NB", b_cursor.clone(), QoS::Processed)
.await?;
let customers: Vec<Customer> = rmp_serde::from_slice(result.payload())?;
for customer in &customers {
println!("{}: {}", customer.id, customer.name);
}
// stop if the block contains fewer records than the bulk size - that means it is the last
// block
if customers.len() < bulk_size {
break;
}
}
Ok(())
}
pub async fn register_secondary(&self) -> Result<Self, Error>
pub fn get_timeout(&self) -> Duration
Trait Implementations§
Source§impl AsyncClient for Client
impl AsyncClient for Client
fn take_event_channel(&mut self) -> Option<EventChannel>
fn get_connected_beacon(&self) -> Option<Arc<AtomicBool>>
fn send<'life0, 'life1, 'async_trait>(
&'life0 mut self,
target: &'life1 str,
payload: Cow<'async_trait>,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn zc_send<'life0, 'life1, 'async_trait>(
&'life0 mut self,
target: &'life1 str,
header: Cow<'async_trait>,
payload: Cow<'async_trait>,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn send_broadcast<'life0, 'life1, 'async_trait>(
&'life0 mut self,
target: &'life1 str,
payload: Cow<'async_trait>,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn publish<'life0, 'life1, 'async_trait>(
&'life0 mut self,
target: &'life1 str,
payload: Cow<'async_trait>,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn publish_for<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
target: &'life1 str,
receiver: &'life2 str,
payload: Cow<'async_trait>,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn subscribe<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn unsubscribe<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn unsubscribe_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Source§fn exclude<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn exclude<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Exclude a topic. It is highly recommended to exclude topics first, then call subscribe
operations, to avoid receiving unwanted messages. Excluding topics is also an additional
heavy operation, so use it only when there is no other way.
Source§fn unexclude<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn unexclude<'life0, 'life1, 'async_trait>(
&'life0 mut self,
topic: &'life1 str,
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Unexclude a topic (include it back, but do not subscribe).
Source§fn exclude_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn exclude_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Exclude multiple topics.
Source§fn unexclude_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn unexclude_bulk<'life0, 'life1, 'life2, 'async_trait>(
&'life0 mut self,
topics: &'life1 [&'life2 str],
qos: QoS,
) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Unexclude multiple topics (include them back, but do not subscribe).
fn ping<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn is_connected(&self) -> bool
fn get_timeout(&self) -> Option<Duration>
fn get_name(&self) -> &str
Auto Trait Implementations§
impl !Freeze for Client
impl !RefUnwindSafe for Client
impl Send for Client
impl Sync for Client
impl Unpin for Client
impl !UnwindSafe for Client
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more