busrt

Struct FrameData

Source
pub struct FrameData { /* private fields */ }

Implementations§

Source§

impl FrameData

Source

pub fn new( kind: FrameKind, sender: Option<String>, topic: Option<String>, header: Option<Vec<u8>>, buf: Vec<u8>, payload_pos: usize, realtime: bool, ) -> Self

Source

pub fn new_nop() -> Self

Source

pub fn kind(&self) -> FrameKind

Examples found in repository?
examples/broker_custom_rpc.rs (line 45)
41
42
43
44
45
46
47
48
49
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 65)
61
62
63
64
65
66
67
68
69
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
examples/client_listener.rs (line 21)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.listener";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe to all topics
    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
    opc.await??;
    // handle incoming frames
    let rx = client.take_event_channel().unwrap();
    while let Ok(frame) = rx.recv().await {
        println!(
            "Frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
Source

pub fn sender(&self) -> &str

§Panics

Panics if called on a prepared frame.

Examples found in repository?
examples/broker_custom_rpc.rs (line 44)
41
42
43
44
45
46
47
48
49
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 64)
61
62
63
64
65
66
67
68
69
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
examples/client_listener.rs (line 20)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.listener";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe to all topics
    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
    opc.await??;
    // handle incoming frames
    let rx = client.take_event_channel().unwrap();
    while let Ok(frame) = rx.recv().await {
        println!(
            "Frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
examples/inter_thread.rs (line 54)
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    // init the default broker RPC API, optional
    broker.init_default_core_rpc().await?;
    // spawn unix server for external clients
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    // worker 1 will send to worker2 direct "hello" message
    let mut client1 = broker.register_client("worker.1").await?;
    // worker 2 will listen to incoming frames only
    let mut client2 = broker.register_client("worker.2").await?;
    // worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
    // connect the broker via unix socket and receive them as well or send a message to "worker.2"
    // to print it
    let mut client3 = broker.register_client("worker.3").await?;
    let rx = client2.take_event_channel().unwrap();
    tokio::spawn(async move {
        loop {
            client1
                .send("worker.2", "hello".as_bytes().into(), QoS::No)
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    tokio::spawn(async move {
        loop {
            client3
                .send_broadcast(
                    "worker.*",
                    "this is a broadcast message".as_bytes().into(),
                    QoS::No,
                )
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    while let Ok(frame) = rx.recv().await {
        println!(
            "{}: {}",
            frame.sender(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
Source

pub fn primary_sender(&self) -> &str

§Panics

Panics if called on a prepared frame.

Source

pub fn topic(&self) -> Option<&str>

Returns the frame topic, which is filled for pub/sub communications.

Examples found in repository?
examples/broker_custom_rpc.rs (line 46)
41
42
43
44
45
46
47
48
49
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 66)
61
62
63
64
65
66
67
68
69
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
examples/client_listener.rs (line 22)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.listener";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe to all topics
    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
    opc.await??;
    // handle incoming frames
    let rx = client.take_event_channel().unwrap();
    while let Ok(frame) = rx.recv().await {
        println!(
            "Frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
Source

pub fn payload(&self) -> &[u8]

To keep the zero-copy model, frames contain the full incoming buffer plus the actual payload position. Use this method to get the actual call payload.

Examples found in repository?
examples/broker_custom_rpc.rs (line 47)
41
42
43
44
45
46
47
48
49
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 67)
61
62
63
64
65
66
67
68
69
    async fn handle_frame(&self, frame: Frame) {
        println!(
            "Got non-RPC frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
examples/client_listener.rs (line 23)
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.listener";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let mut client = Client::connect(&config).await?;
    // subscribe to all topics
    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
    opc.await??;
    // handle incoming frames
    let rx = client.take_event_channel().unwrap();
    while let Ok(frame) = rx.recv().await {
        println!(
            "Frame from {}: {:?} {:?} {}",
            frame.sender(),
            frame.kind(),
            frame.topic(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
examples/inter_thread.rs (line 55)
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    // init the default broker RPC API, optional
    broker.init_default_core_rpc().await?;
    // spawn unix server for external clients
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    // worker 1 will send to worker2 direct "hello" message
    let mut client1 = broker.register_client("worker.1").await?;
    // worker 2 will listen to incoming frames only
    let mut client2 = broker.register_client("worker.2").await?;
    // worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
    // connect the broker via unix socket and receive them as well or send a message to "worker.2"
    // to print it
    let mut client3 = broker.register_client("worker.3").await?;
    let rx = client2.take_event_channel().unwrap();
    tokio::spawn(async move {
        loop {
            client1
                .send("worker.2", "hello".as_bytes().into(), QoS::No)
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    tokio::spawn(async move {
        loop {
            client3
                .send_broadcast(
                    "worker.*",
                    "this is a broadcast message".as_bytes().into(),
                    QoS::No,
                )
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    while let Ok(frame) = rx.recv().await {
        println!(
            "{}: {}",
            frame.sender(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
Source

pub fn header(&self) -> Option<&[u8]>

The header can be used by certain implementations (e.g. the default RPC layer) to keep the zero-copy model. The header is None for IPC communications, but filled for inter-thread ones. A custom layer should use/parse the header to avoid an unnecessary payload copy.

Source

pub fn is_realtime(&self) -> bool

Trait Implementations§

Source§

impl Debug for FrameData

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.