pub struct LiveDataStream<R: Read + Unpin, W: Write + Unpin> { /* private fields */ }
Expand description
A Stream
/Sink
wrapper for RESOL VBus Data
items encoded in the
live / wire representation.
It also contains methods to communicate with a VBus device to get or set values etc.
Implementations§
Source§impl<R: Read + Unpin, W: Write + Unpin> LiveDataStream<R, W>
impl<R: Read + Unpin, W: Write + Unpin> LiveDataStream<R, W>
Sourcepub fn new(
reader: R,
writer: W,
channel: u8,
self_address: u16,
) -> LiveDataStream<R, W>
pub fn new( reader: R, writer: W, channel: u8, self_address: u16, ) -> LiveDataStream<R, W>
Create a new LiveDataStream
.
Examples found in repository?
9fn main() -> Result<()> {
10 async_std::task::block_on(async {
11 // Create an recording file and hand it to a `RecordingWriter`
12 let file = File::create("test.vbus")?;
13 let mut rw = RecordingWriter::new(file);
14
15 // Parse the address of the DL2 to connect to
16 let addr = "192.168.5.217:7053".parse::<SocketAddr>()?;
17
18 let stream = TcpStream::connect(addr).await?;
19
20 let mut hs = TcpClientHandshake::start(stream).await?;
21 hs.send_pass_command("vbus").await?;
22 let stream = hs.send_data_command().await?;
23
24 let (reader, writer) = (&stream, &stream);
25
26 let mut stream = LiveDataStream::new(reader, writer, 0, 0x0020);
27
28 while let Some(data) = stream.receive_any_data(60000).await? {
29 println!("{}", data.id_string());
30
31 // Add `Data` value into `DataSet` to be stored
32 let mut data_set = DataSet::new();
33 data_set.timestamp = data.as_ref().timestamp;
34 data_set.add_data(data);
35
36 // Write the `DataSet` into the `RecordingWriter` for permanent storage
37 rw.write_data_set(&data_set)
38 .expect("Unable to write data set");
39 }
40
41 Ok(())
42 })
43}
Sourcepub fn into_inner(self) -> (R, W)
pub fn into_inner(self) -> (R, W)
Consume self
and return the underlying I/O pair.
Sourcepub async fn receive<F>(
&mut self,
timeout_ms: u64,
filter: F,
) -> Result<Option<Data>>
pub async fn receive<F>( &mut self, timeout_ms: u64, filter: F, ) -> Result<Option<Data>>
Receive data from the VBus.
This methods waits for timeout_ms
milliseconds for incoming
VBus data. Every time a valid Data
is received over the VBus
the filter
function is called with that Data
as its argument.
The function returns a bool
whether the provided Data
is the
data it was waiting for.
If the filter
function returns true
, the respective Data
is used to return from the receive
method.
If the filter
function did not find the matching data within
timeout_ms
milliseconds, the receive
method returns with
None
.
Sourcepub async fn transceive<F>(
&mut self,
tx_data: Data,
max_tries: usize,
initial_timeout_ms: u64,
timeout_increment_ms: u64,
filter: F,
) -> Result<Option<Data>>
pub async fn transceive<F>( &mut self, tx_data: Data, max_tries: usize, initial_timeout_ms: u64, timeout_increment_ms: u64, filter: F, ) -> Result<Option<Data>>
Send data to the VBus and wait for a reply.
This method sends the tx_data
to the VBus and waits for up to
initial_timeout_ms
milliseconds for a reply.
Every time a valid Data
is received over the VBus the filter
function is called with that Data
as its argument. The function
returns a bool
whether the provided Data
is the reply it was
waiting for.
If the filter
function returns true
, the respective Data
is used to return from the transceive
method.
If the filter
function did not find the matching reply within
initial_timeout_ms
milliseconds, the tx_data
is send again up
max_tries
times, increasing the timeout by timeout_increment_ms
milliseconds every time.
After max_tries
without a matching reply the transceive
method
returns with None
.
Sourcepub async fn receive_any_data(
&mut self,
timeout_ms: u64,
) -> Result<Option<Data>>
pub async fn receive_any_data( &mut self, timeout_ms: u64, ) -> Result<Option<Data>>
Wait for any VBus data.
Examples found in repository?
9fn main() -> Result<()> {
10 async_std::task::block_on(async {
11 // Create an recording file and hand it to a `RecordingWriter`
12 let file = File::create("test.vbus")?;
13 let mut rw = RecordingWriter::new(file);
14
15 // Parse the address of the DL2 to connect to
16 let addr = "192.168.5.217:7053".parse::<SocketAddr>()?;
17
18 let stream = TcpStream::connect(addr).await?;
19
20 let mut hs = TcpClientHandshake::start(stream).await?;
21 hs.send_pass_command("vbus").await?;
22 let stream = hs.send_data_command().await?;
23
24 let (reader, writer) = (&stream, &stream);
25
26 let mut stream = LiveDataStream::new(reader, writer, 0, 0x0020);
27
28 while let Some(data) = stream.receive_any_data(60000).await? {
29 println!("{}", data.id_string());
30
31 // Add `Data` value into `DataSet` to be stored
32 let mut data_set = DataSet::new();
33 data_set.timestamp = data.as_ref().timestamp;
34 data_set.add_data(data);
35
36 // Write the `DataSet` into the `RecordingWriter` for permanent storage
37 rw.write_data_set(&data_set)
38 .expect("Unable to write data set");
39 }
40
41 Ok(())
42 })
43}
Sourcepub async fn wait_for_free_bus(&mut self) -> Result<Option<Datagram>>
pub async fn wait_for_free_bus(&mut self) -> Result<Option<Datagram>>
Wait for a datagram that offers VBus control.
Sourcepub async fn release_bus(&mut self, address: u16) -> Result<Option<Data>>
pub async fn release_bus(&mut self, address: u16) -> Result<Option<Data>>
Give back bus control to the regular VBus master.
Sourcepub async fn get_value_by_index(
&mut self,
address: u16,
index: i16,
subindex: u8,
) -> Result<Option<Datagram>>
pub async fn get_value_by_index( &mut self, address: u16, index: i16, subindex: u8, ) -> Result<Option<Datagram>>
Get a value by its index.
Sourcepub async fn set_value_by_index(
&mut self,
address: u16,
index: i16,
subindex: u8,
value: i32,
) -> Result<Option<Datagram>>
pub async fn set_value_by_index( &mut self, address: u16, index: i16, subindex: u8, value: i32, ) -> Result<Option<Datagram>>
Set a value by its index.
Sourcepub async fn get_value_id_hash_by_index(
&mut self,
address: u16,
index: i16,
) -> Result<Option<Datagram>>
pub async fn get_value_id_hash_by_index( &mut self, address: u16, index: i16, ) -> Result<Option<Datagram>>
Get a value’s ID hash by its index.
Sourcepub async fn get_value_index_by_id_hash(
&mut self,
address: u16,
id_hash: i32,
) -> Result<Option<Datagram>>
pub async fn get_value_index_by_id_hash( &mut self, address: u16, id_hash: i32, ) -> Result<Option<Datagram>>
Get a value’s index by its ID hash.
Sourcepub async fn get_caps1(&mut self, address: u16) -> Result<Option<Datagram>>
pub async fn get_caps1(&mut self, address: u16) -> Result<Option<Datagram>>
Get the capabilities (part 1) from a VBus device.
Sourcepub async fn begin_bulk_value_transaction(
&mut self,
address: u16,
tx_timeout: i32,
) -> Result<Option<Datagram>>
pub async fn begin_bulk_value_transaction( &mut self, address: u16, tx_timeout: i32, ) -> Result<Option<Datagram>>
Begin a bulk value transaction.
Sourcepub async fn commit_bulk_value_transaction(
&mut self,
address: u16,
) -> Result<Option<Datagram>>
pub async fn commit_bulk_value_transaction( &mut self, address: u16, ) -> Result<Option<Datagram>>
Commit a bulk value transaction.