FrameData

Struct FrameData 

Source
pub struct FrameData { /* private fields */ }

Implementations§

Source§

impl FrameData

Source

pub fn new( kind: FrameKind, sender: Option<String>, topic: Option<String>, header: Option<Vec<u8>>, buf: Vec<u8>, payload_pos: usize, realtime: bool, ) -> Self

Source

pub fn new_nop() -> Self

Source

pub fn kind(&self) -> FrameKind

Examples found in repository?
examples/broker_custom_rpc.rs (line 45)
41    async fn handle_frame(&self, frame: Frame) {
42        println!(
43            "Got non-RPC frame from {}: {:?} {:?} {}",
44            frame.sender(),
45            frame.kind(),
46            frame.topic(),
47            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48        );
49    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 65)
61    async fn handle_frame(&self, frame: Frame) {
62        println!(
63            "Got non-RPC frame from {}: {:?} {:?} {}",
64            frame.sender(),
65            frame.kind(),
66            frame.topic(),
67            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68        );
69    }
examples/client_listener.rs (line 21)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8    let name = "test.client.listener";
9    // create a new client instance
10    let config = Config::new("/tmp/busrt.sock", name);
11    let mut client = Client::connect(&config).await?;
12    // subscribe to all topics
13    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
14    opc.await??;
15    // handle incoming frames
16    let rx = client.take_event_channel().unwrap();
17    while let Ok(frame) = rx.recv().await {
18        println!(
19            "Frame from {}: {:?} {:?} {}",
20            frame.sender(),
21            frame.kind(),
22            frame.topic(),
23            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
24        );
25    }
26    Ok(())
27}
Source

pub fn sender(&self) -> &str

§Panics

Panics if called on a prepared frame.

Examples found in repository?
examples/broker_custom_rpc.rs (line 44)
41    async fn handle_frame(&self, frame: Frame) {
42        println!(
43            "Got non-RPC frame from {}: {:?} {:?} {}",
44            frame.sender(),
45            frame.kind(),
46            frame.topic(),
47            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48        );
49    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 64)
61    async fn handle_frame(&self, frame: Frame) {
62        println!(
63            "Got non-RPC frame from {}: {:?} {:?} {}",
64            frame.sender(),
65            frame.kind(),
66            frame.topic(),
67            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68        );
69    }
examples/client_listener.rs (line 20)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8    let name = "test.client.listener";
9    // create a new client instance
10    let config = Config::new("/tmp/busrt.sock", name);
11    let mut client = Client::connect(&config).await?;
12    // subscribe to all topics
13    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
14    opc.await??;
15    // handle incoming frames
16    let rx = client.take_event_channel().unwrap();
17    while let Ok(frame) = rx.recv().await {
18        println!(
19            "Frame from {}: {:?} {:?} {}",
20            frame.sender(),
21            frame.kind(),
22            frame.topic(),
23            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
24        );
25    }
26    Ok(())
27}
examples/inter_thread.rs (line 54)
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12    // create a new broker instance
13    let mut broker = Broker::new();
14    // init the default broker RPC API, optional
15    broker.init_default_core_rpc().await?;
16    // spawn unix server for external clients
17    broker
18        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
19        .await?;
20    // worker 1 will send to worker2 direct "hello" message
21    let mut client1 = broker.register_client("worker.1").await?;
22    // worker 2 will listen to incoming frames only
23    let mut client2 = broker.register_client("worker.2").await?;
24    // worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
25    // connect the broker via unix socket and receive them as well or send a message to "worker.2"
26    // to print it
27    let mut client3 = broker.register_client("worker.3").await?;
28    let rx = client2.take_event_channel().unwrap();
29    tokio::spawn(async move {
30        loop {
31            client1
32                .send("worker.2", "hello".as_bytes().into(), QoS::No)
33                .await
34                .unwrap();
35            sleep(SLEEP_STEP).await;
36        }
37    });
38    tokio::spawn(async move {
39        loop {
40            client3
41                .send_broadcast(
42                    "worker.*",
43                    "this is a broadcast message".as_bytes().into(),
44                    QoS::No,
45                )
46                .await
47                .unwrap();
48            sleep(SLEEP_STEP).await;
49        }
50    });
51    while let Ok(frame) = rx.recv().await {
52        println!(
53            "{}: {}",
54            frame.sender(),
55            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
56        );
57    }
58    Ok(())
59}
Source

pub fn primary_sender(&self) -> &str

§Panics

Panics if called on a prepared frame.

Source

pub fn topic(&self) -> Option<&str>

The topic is filled (i.e. the returned value is Some) for pub/sub communications.

Examples found in repository?
examples/broker_custom_rpc.rs (line 46)
41    async fn handle_frame(&self, frame: Frame) {
42        println!(
43            "Got non-RPC frame from {}: {:?} {:?} {}",
44            frame.sender(),
45            frame.kind(),
46            frame.topic(),
47            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48        );
49    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 66)
61    async fn handle_frame(&self, frame: Frame) {
62        println!(
63            "Got non-RPC frame from {}: {:?} {:?} {}",
64            frame.sender(),
65            frame.kind(),
66            frame.topic(),
67            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68        );
69    }
examples/client_listener.rs (line 22)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8    let name = "test.client.listener";
9    // create a new client instance
10    let config = Config::new("/tmp/busrt.sock", name);
11    let mut client = Client::connect(&config).await?;
12    // subscribe to all topics
13    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
14    opc.await??;
15    // handle incoming frames
16    let rx = client.take_event_channel().unwrap();
17    while let Ok(frame) = rx.recv().await {
18        println!(
19            "Frame from {}: {:?} {:?} {}",
20            frame.sender(),
21            frame.kind(),
22            frame.topic(),
23            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
24        );
25    }
26    Ok(())
27}
Source

pub fn payload(&self) -> &[u8]

To keep the zero-copy model, a frame contains the full incoming buffer plus the position of the actual payload within it. Use this method to get the actual call payload.

Examples found in repository?
examples/broker_custom_rpc.rs (line 47)
41    async fn handle_frame(&self, frame: Frame) {
42        println!(
43            "Got non-RPC frame from {}: {:?} {:?} {}",
44            frame.sender(),
45            frame.kind(),
46            frame.topic(),
47            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
48        );
49    }
More examples
Hide additional examples
examples/client_rpc_handler.rs (line 67)
61    async fn handle_frame(&self, frame: Frame) {
62        println!(
63            "Got non-RPC frame from {}: {:?} {:?} {}",
64            frame.sender(),
65            frame.kind(),
66            frame.topic(),
67            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
68        );
69    }
examples/client_listener.rs (line 23)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8    let name = "test.client.listener";
9    // create a new client instance
10    let config = Config::new("/tmp/busrt.sock", name);
11    let mut client = Client::connect(&config).await?;
12    // subscribe to all topics
13    let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
14    opc.await??;
15    // handle incoming frames
16    let rx = client.take_event_channel().unwrap();
17    while let Ok(frame) = rx.recv().await {
18        println!(
19            "Frame from {}: {:?} {:?} {}",
20            frame.sender(),
21            frame.kind(),
22            frame.topic(),
23            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
24        );
25    }
26    Ok(())
27}
examples/inter_thread.rs (line 55)
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12    // create a new broker instance
13    let mut broker = Broker::new();
14    // init the default broker RPC API, optional
15    broker.init_default_core_rpc().await?;
16    // spawn unix server for external clients
17    broker
18        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
19        .await?;
20    // worker 1 will send to worker2 direct "hello" message
21    let mut client1 = broker.register_client("worker.1").await?;
22    // worker 2 will listen to incoming frames only
23    let mut client2 = broker.register_client("worker.2").await?;
24    // worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
25    // connect the broker via unix socket and receive them as well or send a message to "worker.2"
26    // to print it
27    let mut client3 = broker.register_client("worker.3").await?;
28    let rx = client2.take_event_channel().unwrap();
29    tokio::spawn(async move {
30        loop {
31            client1
32                .send("worker.2", "hello".as_bytes().into(), QoS::No)
33                .await
34                .unwrap();
35            sleep(SLEEP_STEP).await;
36        }
37    });
38    tokio::spawn(async move {
39        loop {
40            client3
41                .send_broadcast(
42                    "worker.*",
43                    "this is a broadcast message".as_bytes().into(),
44                    QoS::No,
45                )
46                .await
47                .unwrap();
48            sleep(SLEEP_STEP).await;
49        }
50    });
51    while let Ok(frame) = rx.recv().await {
52        println!(
53            "{}: {}",
54            frame.sender(),
55            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
56        );
57    }
58    Ok(())
59}
Source

pub fn header(&self) -> Option<&[u8]>

The header can be used by certain implementations (e.g. the default RPC layer) to keep the zero-copy model. The header is None for IPC communications, but filled for inter-thread ones. A custom layer should use/parse the header to avoid unnecessary payload copying.

Source

pub fn is_realtime(&self) -> bool

Trait Implementations§

Source§

impl Debug for FrameData

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more