Struct busrt::rpc::RpcClient

source ·
pub struct RpcClient { /* private fields */ }

Implementations§

source§

impl RpcClient

source

pub fn new<H>(client: impl AsyncClient + 'static, handlers: H) -> Self where H: RpcHandlers + Send + Sync + 'static,

creates RPC client with the specified handlers and the default options

Examples found in repository?
examples/client_rpc_handler.rs (line 87)
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Example entry point: connects to the broker socket as "test.client.rpc", subscribes the
// client to every topic, wraps it in an `RpcClient` together with `MyHandlers` (defined
// elsewhere in the example) and then pings the broker once per second for as long as the RPC
// client reports being connected.
//
// Errors from connecting, subscribing and from the subscribe confirmation are propagated to
// the caller with `?`.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.rpc";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe the client to all topics to print publish frames when received
    // (`expect` panics with "no op" if no confirmation handle is returned)
    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
    // receive operation confirmation
    opc.await??;
    // create handlers object
    let handlers = MyHandlers {
        counter: atomic::AtomicU64::default(),
    };
    // create RPC; the client is moved into the RPC client here
    let rpc = RpcClient::new(client, handlers);
    println!("Waiting for frames to {}", name);
    while rpc.is_connected() {
        sleep(Duration::from_secs(1)).await;
        // if the broker is unavailable, ping sets the client and RPC to disconnected state;
        // after that, the program can try reconnecting or quit.
        // The ping result itself is deliberately ignored: the loop condition above is what
        // detects the disconnect.
        let _r = rpc.client().lock().await.ping().await;
    }
    Ok(())
}
More examples
Hide additional examples
examples/broker_custom_rpc.rs (line 67)
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Example entry point: starts a broker with a UNIX socket server, registers the broker's core
// client, attaches custom RPC handlers to it (`MyHandlers`, defined elsewhere in the example),
// spawns a FIFO channel and then idles for as long as the core RPC client is connected.
//
// Errors from spawning the servers and from registering or subscribing the core client are
// propagated to the caller with `?`.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    // spawn unix server for external clients
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    // register the broker core client
    let mut core_client = broker.register_client(BROKER_NAME).await?;
    // subscribe the core client to all topics to print publish frames when received
    core_client.subscribe("#", QoS::No).await?;
    // create handlers object
    let handlers = MyHandlers {};
    // create RPC; the core client is moved into the RPC client here
    let crpc = RpcClient::new(core_client, handlers);
    println!("Waiting for frames to {}", BROKER_NAME);
    // set the broker core RPC client (optional); this allows spawning fifo servers. The client
    // is wrapped in Arc<Mutex<_>> as it is cloned for each fifo spawned and can be retrieved
    // with the core_rpc_client broker method
    broker.set_core_rpc_client(crpc).await;
    // test it with echo .broker .hello > /tmp/busrt.fifo
    // NOTE(review): 8192 is presumably a buffer size - confirm against the spawn_fifo docs
    broker.spawn_fifo("/tmp/busrt.fifo", 8192).await?;
    // this is the internal client, it will be connected forever
    // (`unwrap` is expected to succeed because the core RPC client was set above)
    while broker
        .core_rpc_client()
        .lock()
        .await
        .as_ref()
        .unwrap()
        .is_connected()
    {
        sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}
source

pub fn new0(client: impl AsyncClient + 'static) -> Self

creates RPC client with dummy handlers and the default options

Examples found in repository?
examples/client_rpc.rs (line 23)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Example entry point: connects to the broker socket as "test.client.123", creates an RPC
// client without handlers and calls three methods on the "test.client.rpc" target: "test"
// (no response required), "add" (with a MessagePack-encoded payload) and "get", whose reply is
// decoded into `Amount` (defined elsewhere in the example) and printed.
//
// Connection, RPC and (de)serialization errors are propagated to the caller with `?`.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.123";
    let target = "test.client.rpc";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let client = Client::connect(&config).await?;
    // create RPC with no handlers
    let rpc = RpcClient::new0(client);
    // call the method, no response is required
    rpc.call0(target, "test", empty_payload!(), QoS::Processed)
        .await?;
    // build the parameters for "add": a single "value" key
    let mut payload: BTreeMap<&str, u32> = <_>::default();
    payload.insert("value", 10);
    // call a method with confirm to make sure the value is added
    rpc.call(
        target,
        "add",
        rmp_serde::to_vec_named(&payload)?.into(),
        QoS::Processed,
    )
    .await?;
    // call the method to read the sum
    let result = rpc
        .call(target, "get", empty_payload!(), QoS::Processed)
        .await?;
    // decode the reply payload and print the received value
    let amount: Amount = rmp_serde::from_slice(result.payload())?;
    println!("{}", amount.value);
    Ok(())
}
More examples
Hide additional examples
examples/client_cursor.rs (line 23)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Example entry point: connects to the broker socket as "test.client.123", creates an RPC
// client without handlers and reads customer records from the "db" target through cursors,
// first one record per call (method "N") and then in blocks of `bulk_size` (method "NB").
// Every record is printed as "id: name".
//
// Connection, RPC and (de)serialization errors are propagated to the caller with `?`.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.123";
    let target = "db";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let client = Client::connect(&config).await?;
    // create RPC with no handlers
    let rpc = RpcClient::new0(client);
    // get a cursor
    let cursor: cursors::Payload = rmp_serde::from_slice(
        rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
            .await?
            .payload(),
    )?;
    // serialize the cursor once and pass it as a borrowed Cow, to avoid unnecessary data
    // serialization every time the method is called
    let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
    let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
    loop {
        // get customers one-by-one
        let result = rpc
            .call(target, "N", b_cursor.clone(), QoS::Processed)
            .await?;
        let data = result.payload();
        // the payload is empty when there are no more records left
        if data.is_empty() {
            break;
        }
        let customer: Customer = rmp_serde::from_slice(data)?;
        println!("{}: {}", customer.id, customer.name);
    }
    // do the same in bulk
    let bulk_size = 100;
    // get a new cursor for the bulk pass
    let mut cursor: cursors::Payload = rmp_serde::from_slice(
        rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
            .await?
            .payload(),
    )?;
    // the bulk count is set on the cursor before it is serialized
    cursor.set_bulk_count(bulk_size);
    let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
    let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
    loop {
        // get customers in bulk
        let result = rpc
            .call(target, "NB", b_cursor.clone(), QoS::Processed)
            .await?;
        let customers: Vec<Customer> = rmp_serde::from_slice(result.payload())?;
        for customer in &customers {
            println!("{}: {}", customer.id, customer.name);
        }
        // stop if the block contains fewer records than the bulk size - that means it is the
        // last block
        if customers.len() < bulk_size {
            break;
        }
    }
    Ok(())
}
source

pub fn create<H>( client: impl AsyncClient + 'static, handlers: H, opts: Options ) -> Self where H: RpcHandlers + Send + Sync + 'static,

creates RPC client

source

pub fn create0(client: impl AsyncClient + 'static, opts: Options) -> Self

creates RPC client with dummy handlers

Trait Implementations§

source§

impl Drop for RpcClient

source§

fn drop(&mut self)

Executes the destructor for this type. Read more
source§

impl Rpc for RpcClient

source§

fn call<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, target: &'life1 str, method: &'life2 str, params: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<RpcEvent, RpcError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Panics

Will panic on poisoned mutex

source§

fn client(&self) -> Arc<Mutex<dyn AsyncClient + 'static>>

When created, the busrt client is wrapped in Arc<Mutex<_>> so that it can be sent into the incoming frames handler future. Read more
source§

fn notify<'life0, 'life1, 'async_trait>( &'life0 self, target: &'life1 str, data: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

source§

fn call0<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, target: &'life1 str, method: &'life2 str, params: Cow<'async_trait>, qos: QoS ) -> Pin<Box<dyn Future<Output = Result<OpConfirm, Error>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Call the method, no response is required
source§

fn is_connected(&self) -> bool

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T where U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.