pub struct FrameData { /* private fields */ }Implementations§
Source§impl FrameData
impl FrameData
pub fn new( kind: FrameKind, sender: Option<String>, topic: Option<String>, header: Option<Vec<u8>>, buf: Vec<u8>, payload_pos: usize, realtime: bool, ) -> Self
pub fn new_nop() -> Self
Sourcepub fn kind(&self) -> FrameKind
pub fn kind(&self) -> FrameKind
Examples found in repository?
More examples
examples/client_listener.rs (line 21)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.listener";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe to all topics
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
// handle incoming frames
let rx = client.take_event_channel().unwrap();
while let Ok(frame) = rx.recv().await {
println!(
"Frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}Sourcepub fn sender(&self) -> &str
pub fn sender(&self) -> &str
§Panics
Will panic if called for a prepared frame
Examples found in repository?
More examples
examples/client_listener.rs (line 20)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.listener";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe to all topics
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
// handle incoming frames
let rx = client.take_event_channel().unwrap();
while let Ok(frame) = rx.recv().await {
println!(
"Frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}examples/inter_thread.rs (line 54)
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// create a new broker instance
let mut broker = Broker::new();
// init the default broker RPC API, optional
broker.init_default_core_rpc().await?;
// spawn unix server for external clients
broker
.spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
.await?;
// worker 1 will send to worker2 direct "hello" message
let mut client1 = broker.register_client("worker.1").await?;
// worker 2 will listen to incoming frames only
let mut client2 = broker.register_client("worker.2").await?;
// worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
// connect the broker via unix socket and receive them as well or send a message to "worker.2"
// to print it
let mut client3 = broker.register_client("worker.3").await?;
let rx = client2.take_event_channel().unwrap();
tokio::spawn(async move {
loop {
client1
.send("worker.2", "hello".as_bytes().into(), QoS::No)
.await
.unwrap();
sleep(SLEEP_STEP).await;
}
});
tokio::spawn(async move {
loop {
client3
.send_broadcast(
"worker.*",
"this is a broadcast message".as_bytes().into(),
QoS::No,
)
.await
.unwrap();
sleep(SLEEP_STEP).await;
}
});
while let Ok(frame) = rx.recv().await {
println!(
"{}: {}",
frame.sender(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}Sourcepub fn primary_sender(&self) -> &str
pub fn primary_sender(&self) -> &str
§Panics
Will panic if called for a prepared frame
Sourcepub fn topic(&self) -> Option<&str>
pub fn topic(&self) -> Option<&str>
Filled for pub/sub communications
Examples found in repository?
More examples
examples/client_listener.rs (line 22)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.listener";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe to all topics
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
// handle incoming frames
let rx = client.take_event_channel().unwrap();
while let Ok(frame) = rx.recv().await {
println!(
"Frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}Sourcepub fn payload(&self) -> &[u8] ⓘ
pub fn payload(&self) -> &[u8] ⓘ
To keep zero-copy model, frames contain the full incoming buffer + actual payload position. Use this method to get the actual call payload.
Examples found in repository?
More examples
examples/client_listener.rs (line 23)
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let name = "test.client.listener";
// create a new client instance
let config = Config::new("/tmp/busrt.sock", name);
let mut client = Client::connect(&config).await?;
// subscribe to all topics
let opc = client.subscribe("#", QoS::Processed).await?.expect("no op");
opc.await??;
// handle incoming frames
let rx = client.take_event_channel().unwrap();
while let Ok(frame) = rx.recv().await {
println!(
"Frame from {}: {:?} {:?} {}",
frame.sender(),
frame.kind(),
frame.topic(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}examples/inter_thread.rs (line 55)
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// create a new broker instance
let mut broker = Broker::new();
// init the default broker RPC API, optional
broker.init_default_core_rpc().await?;
// spawn unix server for external clients
broker
.spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
.await?;
// worker 1 will send to worker2 direct "hello" message
let mut client1 = broker.register_client("worker.1").await?;
// worker 2 will listen to incoming frames only
let mut client2 = broker.register_client("worker.2").await?;
// worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
// connect the broker via unix socket and receive them as well or send a message to "worker.2"
// to print it
let mut client3 = broker.register_client("worker.3").await?;
let rx = client2.take_event_channel().unwrap();
tokio::spawn(async move {
loop {
client1
.send("worker.2", "hello".as_bytes().into(), QoS::No)
.await
.unwrap();
sleep(SLEEP_STEP).await;
}
});
tokio::spawn(async move {
loop {
client3
.send_broadcast(
"worker.*",
"this is a broadcast message".as_bytes().into(),
QoS::No,
)
.await
.unwrap();
sleep(SLEEP_STEP).await;
}
});
while let Ok(frame) = rx.recv().await {
println!(
"{}: {}",
frame.sender(),
std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
);
}
Ok(())
}Sourcepub fn header(&self) -> Option<&[u8]>
pub fn header(&self) -> Option<&[u8]>
The header can be used by certain implementations (e.g. the default RPC layer) to keep zero-copy model. The header is None for IPC communications, but filled for inter-thread ones. A custom layer should use/parse the header to avoid unnecessary payload copy
pub fn is_realtime(&self) -> bool
Trait Implementations§
Auto Trait Implementations§
impl Freeze for FrameData
impl RefUnwindSafe for FrameData
impl Send for FrameData
impl Sync for FrameData
impl Unpin for FrameData
impl UnwindSafe for FrameData
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more