busrt::ipc

Struct Client

Source
pub struct Client { /* private fields */ }

Implementations§

Source§

impl Client

Source

pub async fn connect(config: &Config) -> Result<Self, Error>

Examples found in repository?
examples/client_listener.rs (line 11)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Listener example: connect, subscribe to every topic, then print each
    // frame received on the event channel until receiving fails.
    let name = "test.client.listener";
    // set up the configuration and open the client connection
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe to all topics and wait for the result of the operation
    let confirmation = client.subscribe("#", QoS::Processed).await?.expect("no op");
    confirmation.await??;
    // take the event channel out of the client; incoming frames are read from it
    let events = client.take_event_channel().unwrap();
    loop {
        // the first receive error ends the loop
        let frame = match events.recv().await {
            Ok(frame) => frame,
            Err(_) => break,
        };
        let sender = frame.sender();
        let kind = frame.kind();
        let topic = frame.topic();
        // payloads which are not valid UTF-8 are replaced with a placeholder text
        let text = std::str::from_utf8(frame.payload()).unwrap_or("something unreadable");
        println!("Frame from {}: {:?} {:?} {}", sender, kind, topic, text);
    }
    Ok(())
}
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 77)
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // RPC handler example: connect, subscribe to every topic, wrap the client
    // into an RPC client with custom handlers and stay alive while connected.
    let name = "test.client.rpc";
    // set up the configuration and open the client connection
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe the client to all topics to print publish frames when received
    let subscribed = client.subscribe("#", QoS::Processed).await?.expect("no op");
    // wait for the operation confirmation
    subscribed.await??;
    // handlers object, its counter starts from the default value
    let handlers = MyHandlers {
        counter: atomic::AtomicU64::default(),
    };
    // the RPC layer takes over both the client and the handlers
    let rpc = RpcClient::new(client, handlers);
    println!("Waiting for frames to {}", name);
    // check the connection state once per second, leave when it is gone
    loop {
        if !rpc.is_connected() {
            break;
        }
        sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}
examples/client_sender.rs (line 11)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sender example: connect, then publish to a topic, send a direct message
    // and send a broadcast, waiting for the result of every operation.
    let name = "test.client.sender";
    // set up the configuration and open the client connection
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;

    // 1. publish to a topic
    let publish_confirm = client
        .publish("some/topic", "hello".as_bytes().into(), QoS::Processed)
        .await?
        .expect("no op");
    publish_confirm.await??;

    // 2. send a direct message to a single client
    let direct_confirm = client
        .send("test.client.listener", "hello".as_bytes().into(), QoS::Processed)
        .await?
        .expect("no op");
    direct_confirm.await??;

    // 3. send a broadcast message to all clients matching the mask
    let broadcast_confirm = client
        .send_broadcast("test.*", "hello everyone".as_bytes().into(), QoS::Processed)
        .await?
        .expect("no op");
    broadcast_confirm.await??;

    Ok(())
}
examples/client_rpc.rs (line 21)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // RPC caller example: connect, call "test" without confirmation, call
    // "add" with a packed payload, then call "get" and print the value.
    let name = "test.client.123";
    let target = "test.client.rpc";
    // set up the configuration and open the client connection
    let config = Config::new("/tmp/busrt.sock", name);
    let client = Client::connect(&config).await?;
    // RPC client with no handlers
    let rpc = RpcClient::new0(client);
    // call the method with no confirm
    rpc.call0(target, "test", empty_payload!(), QoS::Processed)
        .await?;
    // arguments of the "add" method, packed before the call
    let mut args = BTreeMap::<&str, u32>::new();
    args.insert("value", 10);
    let packed_args = rmp_serde::to_vec_named(&args)?;
    // call a method with confirm to make sure the value is added
    rpc.call(target, "add", packed_args.into(), QoS::Processed)
        .await?;
    // call the method to read the sum and unpack the reply
    let reply = rpc
        .call(target, "get", empty_payload!(), QoS::Processed)
        .await?;
    let amount: Amount = rmp_serde::from_slice(reply.payload())?;
    println!("{}", amount.value);
    Ok(())
}
examples/client_cursor.rs (line 21)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cursor example: read the customers from the "db" target twice, first
    // record-by-record ("N" method), then in bulk blocks ("NB" method).
    let name = "test.client.123";
    let target = "db";
    // set up the configuration and open the client connection
    let config = Config::new("/tmp/busrt.sock", name);
    let client = Client::connect(&config).await?;
    // RPC client with no handlers
    let rpc = RpcClient::new0(client);

    // --- one-by-one ---
    // get a cursor
    let opened = rpc
        .call(target, "Ccustomers", empty_payload!(), QoS::Processed)
        .await?;
    let cursor: cursors::Payload = rmp_serde::from_slice(opened.payload())?;
    // the cursor is serialized once; a borrowed cow is cloned for every call
    // to avoid unnecessary data serialization each time the method is called
    let packed = rmp_serde::to_vec_named(&cursor)?;
    let cursor_arg = busrt::borrow::Cow::Borrowed(&packed);
    loop {
        // get the next customer
        let reply = rpc
            .call(target, "N", cursor_arg.clone(), QoS::Processed)
            .await?;
        let record = reply.payload();
        // the payload is empty when there are no more records left
        if record.is_empty() {
            break;
        }
        let customer: Customer = rmp_serde::from_slice(record)?;
        println!("{}: {}", customer.id, customer.name);
    }

    // --- the same in bulk ---
    let bulk_size = 100;
    // get a cursor and set its bulk number
    let opened_bulk = rpc
        .call(target, "Ccustomers", empty_payload!(), QoS::Processed)
        .await?;
    let mut bulk_cursor: cursors::Payload = rmp_serde::from_slice(opened_bulk.payload())?;
    bulk_cursor.set_bulk_number(bulk_size);
    let packed_bulk = rmp_serde::to_vec_named(&bulk_cursor)?;
    let bulk_arg = busrt::borrow::Cow::Borrowed(&packed_bulk);
    loop {
        // get the next block of customers
        let reply = rpc
            .call(target, "NB", bulk_arg.clone(), QoS::Processed)
            .await?;
        let batch: Vec<Customer> = rmp_serde::from_slice(reply.payload())?;
        batch
            .iter()
            .for_each(|customer| println!("{}: {}", customer.id, customer.name));
        // a block with fewer records than the bulk size is the last block
        if batch.len() < bulk_size {
            break;
        }
    }
    Ok(())
}
Source

pub async fn register_secondary(&self) -> Result<Self, Error>

Source

pub fn get_timeout(&self) -> Duration

Trait Implementations§

Source§

impl AsyncClient for Client

Source§

fn take_event_channel(&mut self) -> Option<EventChannel>

Source§

fn get_connected_beacon(&self) -> Option<Arc<AtomicBool>>

Source§

fn send<'life0, 'life1, 'async_trait>( &'life0 mut self, target: &'life1 str, payload: Cow<'async_trait>, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn zc_send<'life0, 'life1, 'async_trait>( &'life0 mut self, target: &'life1 str, header: Cow<'async_trait>, payload: Cow<'async_trait>, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn send_broadcast<'life0, 'life1, 'async_trait>( &'life0 mut self, target: &'life1 str, payload: Cow<'async_trait>, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn publish<'life0, 'life1, 'async_trait>( &'life0 mut self, target: &'life1 str, payload: Cow<'async_trait>, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn publish_for<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, target: &'life1 str, receiver: &'life2 str, payload: Cow<'async_trait>, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source§

fn subscribe<'life0, 'life1, 'async_trait>( &'life0 mut self, topic: &'life1 str, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn unsubscribe<'life0, 'life1, 'async_trait>( &'life0 mut self, topic: &'life1 str, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn subscribe_bulk<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, topics: &'life1 [&'life2 str], qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source§

fn unsubscribe_bulk<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, topics: &'life1 [&'life2 str], qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Source§

fn exclude<'life0, 'life1, 'async_trait>( &'life0 mut self, topic: &'life1 str, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Excludes a topic. It is highly recommended to exclude topics first and only then call subscribe operations, to avoid receiving unwanted messages. Excluding topics is also an additional heavy operation, so use it only when there is no other way.
Source§

fn unexclude<'life0, 'life1, 'async_trait>( &'life0 mut self, topic: &'life1 str, qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Unexcludes a topic (includes it back, but does not subscribe to it).
Source§

fn exclude_bulk<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, topics: &'life1 [&'life2 str], qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

exclude multiple topics
Source§

fn unexclude_bulk<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, topics: &'life1 [&'life2 str], qos: QoS, ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Unexcludes multiple topics (includes them back, but does not subscribe to them).
Source§

fn ping<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn is_connected(&self) -> bool

Source§

fn get_timeout(&self) -> Option<Duration>

Source§

fn get_name(&self) -> &str

Source§

impl Drop for Client

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl !Freeze for Client

§

impl !RefUnwindSafe for Client

§

impl Send for Client

§

impl Sync for Client

§

impl Unpin for Client

§

impl !UnwindSafe for Client

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.