async_resol_vbus/
live_data_stream.rs

1use std::{marker::Unpin, time::Duration};
2
3use async_std::{
4    io::{Read, Write},
5    prelude::*,
6};
7
8use resol_vbus::{chrono::Utc, live_data_encoder, Data, Datagram, Header, LiveDataBuffer};
9
10use crate::error::Result;
11
12fn try_as_datagram(data: &Data) -> Option<&Datagram> {
13    if data.is_datagram() {
14        Some(data.as_datagram())
15    } else {
16        None
17    }
18}
19
20/// A `Stream`/`Sink` wrapper for RESOL VBus `Data` items encoded in the
21/// live / wire representation.
22///
23/// It also contains methods to communicate with a VBus device to get or set
24/// values etc.
25#[derive(Debug)]
26pub struct LiveDataStream<R: Read + Unpin, W: Write + Unpin> {
27    reader: R,
28    writer: W,
29    channel: u8,
30    self_address: u16,
31    buf: LiveDataBuffer,
32}
33
34impl<R: Read + Unpin, W: Write + Unpin> LiveDataStream<R, W> {
35    /// Create a new `LiveDataStream`.
36    pub fn new(reader: R, writer: W, channel: u8, self_address: u16) -> LiveDataStream<R, W> {
37        LiveDataStream {
38            reader,
39            writer,
40            channel,
41            self_address,
42            buf: LiveDataBuffer::new(channel),
43        }
44    }
45
46    /// Consume `self` and return the underlying I/O pair.
47    pub fn into_inner(self) -> (R, W) {
48        (self.reader, self.writer)
49    }
50
51    fn create_datagram(
52        &self,
53        destination_address: u16,
54        command: u16,
55        param16: i16,
56        param32: i32,
57    ) -> Datagram {
58        Datagram {
59            header: Header {
60                timestamp: Utc::now(),
61                channel: self.channel,
62                destination_address,
63                source_address: self.self_address,
64                protocol_version: 0x20,
65            },
66            command,
67            param16,
68            param32,
69        }
70    }
71
72    async fn transceive_internal<F>(
73        &mut self,
74        tx_data: Option<Data>,
75        max_tries: usize,
76        initial_timeout_ms: u64,
77        timeout_increment_ms: u64,
78        filter: F,
79    ) -> Result<Option<Data>>
80    where
81        F: Fn(&Data) -> bool,
82    {
83        let tx_data = match tx_data {
84            Some(ref data) => {
85                let len = live_data_encoder::length_from_data(data);
86                let mut bytes = vec![0u8; len];
87                live_data_encoder::bytes_from_data(data, &mut bytes);
88                Some(bytes)
89            }
90            None => None,
91        };
92
93        let mut current_try = 0;
94        let mut current_timeout_ms = initial_timeout_ms;
95
96        let result = loop {
97            if current_try >= max_tries {
98                break None;
99            }
100
101            if let Some(ref tx_data) = tx_data {
102                self.writer.write_all(tx_data).await?;
103            }
104
105            let result = async_std::io::timeout(Duration::from_millis(current_timeout_ms), async {
106                loop {
107                    let data = loop {
108                        if let Some(data) = self.buf.read_data() {
109                            if filter(&data) {
110                                break Some(data);
111                            }
112                        } else {
113                            break None;
114                        }
115                    };
116
117                    if let Some(data) = data {
118                        break Ok(Some(data));
119                    }
120
121                    let mut buf = [0u8; 256];
122                    let len = self.reader.read(&mut buf).await?;
123                    if len == 0 {
124                        break Ok(None);
125                    }
126
127                    self.buf.extend_from_slice(&buf[0..len]);
128                }
129            })
130            .await;
131
132            if let Ok(data) = result {
133                break data;
134            }
135
136            current_try += 1;
137            current_timeout_ms += timeout_increment_ms;
138        };
139
140        Ok(result)
141    }
142
143    /// Receive data from the VBus.
144    ///
145    /// This methods waits for `timeout_ms` milliseconds for incoming
146    /// VBus data. Every time a valid `Data` is received over the VBus
147    /// the `filter` function is called with that `Data` as its argument.
148    /// The function returns a `bool` whether the provided `Data` is the
149    /// data it was waiting for.
150    ///
151    /// If the `filter` function returns `true`, the respective `Data`
152    /// is used to return from the `receive` method.
153    ///
154    /// If the `filter` function did not find the matching data within
155    /// `timeout_ms` milliseconds, the `receive` method returns with
156    /// `None`.
157    pub async fn receive<F>(&mut self, timeout_ms: u64, filter: F) -> Result<Option<Data>>
158    where
159        F: Fn(&Data) -> bool,
160    {
161        self.transceive_internal(None, 1, timeout_ms, 0, filter)
162            .await
163    }
164
165    /// Send data to the VBus and wait for a reply.
166    ///
167    /// This method sends the `tx_data` to the VBus and waits for up to
168    /// `initial_timeout_ms` milliseconds for a reply.
169    ///
170    /// Every time a valid `Data` is received over the VBus the `filter`
171    /// function is called with that `Data` as its argument. The function
172    /// returns a `bool` whether the provided `Data` is the reply it was
173    /// waiting for.
174    ///
175    /// If the `filter` function returns `true`, the respective `Data`
176    /// is used to return from the `transceive` method.
177    ///
178    /// If the `filter` function did not find the matching reply within
179    /// `initial_timeout_ms` milliseconds, the `tx_data` is send again up
180    /// `max_tries` times, increasing the timeout by `timeout_increment_ms`
181    /// milliseconds every time.
182    ///
183    /// After `max_tries` without a matching reply the `transceive` method
184    /// returns with `None`.
185    pub async fn transceive<F>(
186        &mut self,
187        tx_data: Data,
188        max_tries: usize,
189        initial_timeout_ms: u64,
190        timeout_increment_ms: u64,
191        filter: F,
192    ) -> Result<Option<Data>>
193    where
194        F: Fn(&Data) -> bool,
195    {
196        self.transceive_internal(
197            Some(tx_data),
198            max_tries,
199            initial_timeout_ms,
200            timeout_increment_ms,
201            filter,
202        )
203        .await
204    }
205
206    /// Wait for any VBus data.
207    pub async fn receive_any_data(&mut self, timeout_ms: u64) -> Result<Option<Data>> {
208        self.receive(timeout_ms, |_| true).await
209    }
210
211    /// Wait for a datagram that offers VBus control.
212    pub async fn wait_for_free_bus(&mut self) -> Result<Option<Datagram>> {
213        let rx_data = self
214            .receive(20000, |data| {
215                if let Some(dgram) = try_as_datagram(data) {
216                    if dgram.command != 0x0500 {
217                        false
218                    } else {
219                        true
220                    }
221                } else {
222                    false
223                }
224            })
225            .await?;
226
227        Ok(rx_data.map(|data| data.into_datagram()))
228    }
229
230    /// Give back bus control to the regular VBus master.
231    pub async fn release_bus(&mut self, address: u16) -> Result<Option<Data>> {
232        let tx_dgram = self.create_datagram(address, 0x0600, 0, 0);
233
234        let tx_data = Data::Datagram(tx_dgram);
235
236        let rx_data = self
237            .transceive(tx_data, 2, 2500, 2500, |data| data.is_packet())
238            .await?;
239
240        Ok(rx_data)
241    }
242
243    /// Get a value by its index.
244    pub async fn get_value_by_index(
245        &mut self,
246        address: u16,
247        index: i16,
248        subindex: u8,
249    ) -> Result<Option<Datagram>> {
250        let tx_dgram = self.create_datagram(address, 0x0300 | u16::from(subindex), index, 0);
251
252        let tx_data = Data::Datagram(tx_dgram.clone());
253
254        let rx_data = self
255            .transceive(tx_data, 3, 500, 500, |data| {
256                if let Some(dgram) = try_as_datagram(data) {
257                    if dgram.header.source_address != tx_dgram.header.destination_address {
258                        false
259                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
260                        false
261                    } else if dgram.command != (0x0100 | u16::from(subindex)) {
262                        false
263                    } else if dgram.param16 != tx_dgram.param16 {
264                        false
265                    } else {
266                        true
267                    }
268                } else {
269                    false
270                }
271            })
272            .await?;
273
274        Ok(rx_data.map(|data| data.into_datagram()))
275    }
276
277    /// Set a value by its index.
278    pub async fn set_value_by_index(
279        &mut self,
280        address: u16,
281        index: i16,
282        subindex: u8,
283        value: i32,
284    ) -> Result<Option<Datagram>> {
285        let tx_dgram = self.create_datagram(address, 0x0200 | u16::from(subindex), index, value);
286
287        let tx_data = Data::Datagram(tx_dgram.clone());
288
289        let rx_data = self
290            .transceive(tx_data, 3, 500, 500, |data| {
291                if let Some(dgram) = try_as_datagram(data) {
292                    if dgram.header.source_address != tx_dgram.header.destination_address {
293                        false
294                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
295                        false
296                    } else if dgram.command != (0x0100 | u16::from(subindex)) {
297                        false
298                    } else if dgram.param16 != tx_dgram.param16 {
299                        false
300                    } else {
301                        true
302                    }
303                } else {
304                    false
305                }
306            })
307            .await?;
308
309        Ok(rx_data.map(|data| data.into_datagram()))
310    }
311
312    /// Get a value's ID hash by its index.
313    pub async fn get_value_id_hash_by_index(
314        &mut self,
315        address: u16,
316        index: i16,
317    ) -> Result<Option<Datagram>> {
318        let tx_dgram = self.create_datagram(address, 0x1000, index, 0);
319
320        let tx_data = Data::Datagram(tx_dgram.clone());
321
322        let rx_data = self
323            .transceive(tx_data, 3, 500, 500, |data| {
324                if let Some(dgram) = try_as_datagram(data) {
325                    if dgram.header.source_address != tx_dgram.header.destination_address {
326                        false
327                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
328                        false
329                    } else if dgram.command != 0x0100 {
330                        false
331                    } else if dgram.param16 != tx_dgram.param16 {
332                        false
333                    } else {
334                        true
335                    }
336                } else {
337                    false
338                }
339            })
340            .await?;
341
342        Ok(rx_data.map(|data| data.into_datagram()))
343    }
344
345    /// Get a value's index by its ID hash.
346    pub async fn get_value_index_by_id_hash(
347        &mut self,
348        address: u16,
349        id_hash: i32,
350    ) -> Result<Option<Datagram>> {
351        let tx_dgram = self.create_datagram(address, 0x1100, 0, id_hash);
352
353        let tx_data = Data::Datagram(tx_dgram.clone());
354
355        let rx_data = self
356            .transceive(tx_data, 3, 500, 500, |data| {
357                if let Some(dgram) = try_as_datagram(data) {
358                    if dgram.header.source_address != tx_dgram.header.destination_address {
359                        false
360                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
361                        false
362                    } else if dgram.command != 0x0100 {
363                        false
364                    } else if dgram.param32 != tx_dgram.param32 {
365                        false
366                    } else {
367                        true
368                    }
369                } else {
370                    false
371                }
372            })
373            .await?;
374
375        Ok(rx_data.map(|data| data.into_datagram()))
376    }
377
378    /// Get the capabilities (part 1) from a VBus device.
379    pub async fn get_caps1(&mut self, address: u16) -> Result<Option<Datagram>> {
380        let tx_dgram = self.create_datagram(address, 0x1300, 0, 0);
381
382        let tx_data = Data::Datagram(tx_dgram.clone());
383
384        let rx_data = self
385            .transceive(tx_data, 3, 500, 500, |data| {
386                if let Data::Datagram(ref dgram) = *data {
387                    if dgram.header.source_address != tx_dgram.header.destination_address {
388                        false
389                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
390                        false
391                    } else if dgram.command != 0x1301 {
392                        false
393                    } else {
394                        true
395                    }
396                } else {
397                    false
398                }
399            })
400            .await?;
401
402        Ok(rx_data.map(|data| data.into_datagram()))
403    }
404
405    /// Begin a bulk value transaction.
406    pub async fn begin_bulk_value_transaction(
407        &mut self,
408        address: u16,
409        tx_timeout: i32,
410    ) -> Result<Option<Datagram>> {
411        let tx_dgram = self.create_datagram(address, 0x1400, 0, tx_timeout);
412
413        let tx_data = Data::Datagram(tx_dgram.clone());
414
415        let rx_data = self
416            .transceive(tx_data, 3, 500, 500, |data| {
417                if let Data::Datagram(ref dgram) = *data {
418                    if dgram.header.source_address != tx_dgram.header.destination_address {
419                        false
420                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
421                        false
422                    } else if dgram.command != 0x1401 {
423                        false
424                    } else {
425                        true
426                    }
427                } else {
428                    false
429                }
430            })
431            .await?;
432
433        Ok(rx_data.map(|data| data.into_datagram()))
434    }
435
436    /// Commit a bulk value transaction.
437    pub async fn commit_bulk_value_transaction(
438        &mut self,
439        address: u16,
440    ) -> Result<Option<Datagram>> {
441        let tx_dgram = self.create_datagram(address, 0x1402, 0, 0);
442
443        let tx_data = Data::Datagram(tx_dgram.clone());
444
445        let rx_data = self
446            .transceive(tx_data, 3, 500, 500, |data| {
447                if let Data::Datagram(ref dgram) = *data {
448                    if dgram.header.source_address != tx_dgram.header.destination_address {
449                        false
450                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
451                        false
452                    } else if dgram.command != 0x1403 {
453                        false
454                    } else {
455                        true
456                    }
457                } else {
458                    false
459                }
460            })
461            .await?;
462
463        Ok(rx_data.map(|data| data.into_datagram()))
464    }
465
466    /// Rollback a bulk value transaction.
467    pub async fn rollback_bulk_value_transaction(
468        &mut self,
469        address: u16,
470    ) -> Result<Option<Datagram>> {
471        let tx_dgram = self.create_datagram(address, 0x1404, 0, 0);
472
473        let tx_data = Data::Datagram(tx_dgram.clone());
474
475        let rx_data = self
476            .transceive(tx_data, 3, 500, 500, |data| {
477                if let Data::Datagram(ref dgram) = *data {
478                    if dgram.header.source_address != tx_dgram.header.destination_address {
479                        false
480                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
481                        false
482                    } else if dgram.command != 0x1405 {
483                        false
484                    } else {
485                        true
486                    }
487                } else {
488                    false
489                }
490            })
491            .await?;
492
493        Ok(rx_data.map(|data| data.into_datagram()))
494    }
495
496    /// Set a value by its index while inside a bulk value transaction.
497    pub async fn set_bulk_value_by_index(
498        &mut self,
499        address: u16,
500        index: i16,
501        subindex: u8,
502        value: i32,
503    ) -> Result<Option<Datagram>> {
504        let tx_dgram = self.create_datagram(address, 0x1500 | u16::from(subindex), index, value);
505
506        let tx_data = Data::Datagram(tx_dgram.clone());
507
508        let rx_data = self
509            .transceive(tx_data, 3, 500, 500, |data| {
510                if let Data::Datagram(ref dgram) = *data {
511                    if dgram.header.source_address != tx_dgram.header.destination_address {
512                        false
513                    } else if dgram.header.destination_address != tx_dgram.header.source_address {
514                        false
515                    } else if dgram.command != (0x1600 | u16::from(subindex)) {
516                        false
517                    } else if dgram.param16 != tx_dgram.param16 {
518                        false
519                    } else {
520                        true
521                    }
522                } else {
523                    false
524                }
525            })
526            .await?;
527
528        Ok(rx_data.map(|data| data.into_datagram()))
529    }
530}
531
532#[cfg(test)]
533impl<R: Read + Unpin, W: Write + Unpin> LiveDataStream<R, W> {
534    pub fn writer_ref(&self) -> &W {
535        &self.writer
536    }
537}
538
539#[cfg(test)]
540mod tests {
541    use async_std::io::Cursor;
542
543    use resol_vbus::Packet;
544
545    use super::*;
546
547    fn extend_from_data(buf: &mut Vec<u8>, data: &Data) {
548        let len = live_data_encoder::length_from_data(data);
549        let idx = buf.len();
550        buf.resize(idx + len, 0);
551        live_data_encoder::bytes_from_data(data, &mut buf[idx..]);
552    }
553
554    fn extend_with_empty_packet(
555        buf: &mut Vec<u8>,
556        destination_address: u16,
557        source_address: u16,
558        command: u16,
559    ) {
560        let data = Data::Packet(Packet {
561            header: Header {
562                timestamp: Utc::now(),
563                channel: 0,
564                destination_address,
565                source_address,
566                protocol_version: 0x20,
567            },
568            command,
569            frame_count: 0,
570            frame_data: [0; 508],
571        });
572        extend_from_data(buf, &data);
573    }
574
575    fn extend_from_datagram(
576        buf: &mut Vec<u8>,
577        destination_address: u16,
578        source_address: u16,
579        command: u16,
580        param16: i16,
581        param32: i32,
582    ) {
583        let data = Data::Datagram(Datagram {
584            header: Header {
585                timestamp: Utc::now(),
586                channel: 0,
587                destination_address,
588                source_address,
589                protocol_version: 0x20,
590            },
591            command,
592            param16,
593            param32,
594        });
595        extend_from_data(buf, &data);
596    }
597
598    fn simulate_run<T, F: Future<Output = T>>(f: F) -> T {
599        async_std::task::block_on(f)
600    }
601
602    trait ToBytes {
603        fn to_bytes(&self) -> Vec<u8>;
604    }
605
606    fn hex_encode<T: ToBytes>(value: &T) -> String {
607        let buf = value.to_bytes();
608        buf.iter()
609            .map(|b| format!("{:02x}", b))
610            .collect::<Vec<String>>()
611            .concat()
612    }
613
614    impl ToBytes for Cursor<Vec<u8>> {
615        fn to_bytes(&self) -> Vec<u8> {
616            self.get_ref().clone()
617        }
618    }
619
620    impl ToBytes for Data {
621        fn to_bytes(&self) -> Vec<u8> {
622            let len = live_data_encoder::length_from_data(self);
623            let mut buf = Vec::new();
624            buf.resize(len, 0);
625            live_data_encoder::bytes_from_data(self, &mut buf);
626            buf
627        }
628    }
629
630    impl ToBytes for Datagram {
631        fn to_bytes(&self) -> Vec<u8> {
632            Data::Datagram(self.clone()).to_bytes()
633        }
634    }
635
636    #[test]
637    fn test_wait_for_free_bus() {
638        let mut rx_buf = Vec::new();
639        let tx_buf = Cursor::new(Vec::new());
640
641        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
642        extend_from_datagram(&mut rx_buf, 0x0000, 0x7E11, 0x0500, 0, 0);
643
644        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
645
646        let data = simulate_run(lds.wait_for_free_bus()).unwrap();
647
648        assert_eq!("", hex_encode(lds.writer_ref()));
649        assert_eq!(
650            "aa0000117e200005000000000000004b",
651            hex_encode(&data.unwrap())
652        );
653    }
654
655    #[test]
656    fn test_release_bus() {
657        let mut rx_buf = Vec::new();
658        let tx_buf = Cursor::new(Vec::new());
659
660        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0100, 0, 0);
661        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
662
663        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
664
665        let data = simulate_run(lds.release_bus(0x7E11)).unwrap();
666
667        assert_eq!(
668            "aa117e2000200006000000000000002a",
669            hex_encode(lds.writer_ref())
670        );
671        assert_eq!("aa1000117e100001004f", hex_encode(&data.unwrap()));
672    }
673
674    #[test]
675    fn test_get_value_by_index() {
676        let mut rx_buf = Vec::new();
677        let tx_buf = Cursor::new(Vec::new());
678
679        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
680        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x0156, 0x1234, 0x789abcde);
681        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x0156, 0x1234, 0x789abcde);
682        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0157, 0x1234, 0x789abcde);
683        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0156, 0x1235, 0x789abcde);
684        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0156, 0x1234, 0x789abcde);
685
686        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
687
688        let data = simulate_run(lds.get_value_by_index(0x7E11, 0x1234, 0x56)).unwrap();
689
690        assert_eq!(
691            "aa117e20002056033412000000000011",
692            hex_encode(lds.writer_ref())
693        );
694        assert_eq!(
695            "aa2000117e20560134125e3c1a781c4b",
696            hex_encode(&data.unwrap())
697        );
698    }
699
700    #[test]
701    fn test_set_value_by_index() {
702        let mut rx_buf = Vec::new();
703        let tx_buf = Cursor::new(Vec::new());
704
705        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
706        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x0156, 0x1234, 0x789abcde);
707        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x0156, 0x1234, 0x789abcde);
708        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0157, 0x1234, 0x789abcde);
709        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0156, 0x1235, 0x789abcde);
710        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0156, 0x1234, 0x789abcde);
711
712        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
713
714        let data = simulate_run(lds.set_value_by_index(0x7E11, 0x1234, 0x56, 0x789abcde)).unwrap();
715
716        assert_eq!(
717            "aa117e200020560234125e3c1a781c4a",
718            hex_encode(lds.writer_ref())
719        );
720        assert_eq!(
721            "aa2000117e20560134125e3c1a781c4b",
722            hex_encode(&data.unwrap())
723        );
724    }
725
726    #[test]
727    fn test_get_value_id_hash_by_index() {
728        let mut rx_buf = Vec::new();
729        let tx_buf = Cursor::new(Vec::new());
730
731        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
732        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x0100, 0x1234, 0x789abcde);
733        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x0100, 0x1234, 0x789abcde);
734        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0101, 0x1234, 0x789abcde);
735        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0100, 0x1235, 0x789abcde);
736        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcde);
737
738        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
739
740        let data = simulate_run(lds.get_value_id_hash_by_index(0x7E11, 0x1234)).unwrap();
741
742        assert_eq!(
743            "aa117e2000200010341200000000005a",
744            hex_encode(lds.writer_ref())
745        );
746        assert_eq!(
747            "aa2000117e20000134125e3c1a781c21",
748            hex_encode(&data.unwrap())
749        );
750    }
751
752    #[test]
753    fn test_get_value_index_by_id_hash() {
754        let mut rx_buf = Vec::new();
755        let tx_buf = Cursor::new(Vec::new());
756
757        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
758        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x0100, 0x1234, 0x789abcde);
759        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x0100, 0x1234, 0x789abcde);
760        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0101, 0x1234, 0x789abcde);
761        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcdf);
762        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcde);
763
764        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
765
766        let data = simulate_run(lds.get_value_index_by_id_hash(0x7E11, 0x789abcde)).unwrap();
767
768        assert_eq!(
769            "aa117e200020001100005e3c1a781c57",
770            hex_encode(lds.writer_ref())
771        );
772        assert_eq!(
773            "aa2000117e20000134125e3c1a781c21",
774            hex_encode(&data.unwrap())
775        );
776    }
777
778    #[test]
779    fn test_get_caps1() {
780        let mut rx_buf = Vec::new();
781        let tx_buf = Cursor::new(Vec::new());
782
783        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
784        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x1301, 0, 0x789abcde);
785        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x1301, 0, 0x789abcde);
786        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1300, 0, 0x789abcde);
787        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1301, 0, 0x789abcde);
788
789        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
790
791        let data = simulate_run(lds.get_caps1(0x7E11)).unwrap();
792
793        assert_eq!(
794            "aa117e2000200013000000000000001d",
795            hex_encode(lds.writer_ref())
796        );
797        assert_eq!(
798            "aa2000117e20011300005e3c1a781c54",
799            hex_encode(&data.unwrap())
800        );
801    }
802
803    #[test]
804    fn test_begin_bulk_value_transaction() {
805        let mut rx_buf = Vec::new();
806        let tx_buf = Cursor::new(Vec::new());
807
808        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
809        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x1401, 0, 0);
810        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x1401, 0, 0);
811        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1400, 0, 0);
812        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1401, 0, 0);
813
814        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
815
816        let data = simulate_run(lds.begin_bulk_value_transaction(0x7E11, 0x789abcde)).unwrap();
817
818        assert_eq!(
819            "aa117e200020001400005e3c1a781c54",
820            hex_encode(lds.writer_ref())
821        );
822        assert_eq!(
823            "aa2000117e200114000000000000001b",
824            hex_encode(&data.unwrap())
825        );
826    }
827
828    #[test]
829    fn test_commit_value_transaction() {
830        let mut rx_buf = Vec::new();
831        let tx_buf = Cursor::new(Vec::new());
832
833        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
834        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x1403, 0, 0);
835        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x1403, 0, 0);
836        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1402, 0, 0);
837        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1403, 0, 0);
838
839        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
840
841        let data = simulate_run(lds.commit_bulk_value_transaction(0x7E11)).unwrap();
842
843        assert_eq!(
844            "aa117e2000200214000000000000001a",
845            hex_encode(lds.writer_ref())
846        );
847        assert_eq!(
848            "aa2000117e2003140000000000000019",
849            hex_encode(&data.unwrap())
850        );
851    }
852
853    #[test]
854    fn test_rollback_value_transaction() {
855        let mut rx_buf = Vec::new();
856        let tx_buf = Cursor::new(Vec::new());
857
858        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
859        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x1405, 0, 0);
860        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x1405, 0, 0);
861        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1404, 0, 0);
862        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1405, 0, 0);
863
864        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
865
866        let data = simulate_run(lds.rollback_bulk_value_transaction(0x7E11)).unwrap();
867
868        assert_eq!(
869            "aa117e20002004140000000000000018",
870            hex_encode(lds.writer_ref())
871        );
872        assert_eq!(
873            "aa2000117e2005140000000000000017",
874            hex_encode(&data.unwrap())
875        );
876    }
877
878    #[test]
879    fn test_set_bulk_value_by_index() {
880        let mut rx_buf = Vec::new();
881        let tx_buf = Cursor::new(Vec::new());
882
883        extend_with_empty_packet(&mut rx_buf, 0x0010, 0x7E11, 0x0100);
884        extend_from_datagram(&mut rx_buf, 0x0021, 0x7E11, 0x1656, 0x1234, 0x789abcde);
885        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E10, 0x1656, 0x1234, 0x789abcde);
886        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1657, 0x1234, 0x789abcde);
887        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1656, 0x1235, 0x789abcde);
888        extend_from_datagram(&mut rx_buf, 0x0020, 0x7E11, 0x1656, 0x1234, 0x789abcde);
889
890        let mut lds = LiveDataStream::new(&rx_buf[..], tx_buf, 0, 0x0020);
891
892        let data =
893            simulate_run(lds.set_bulk_value_by_index(0x7E11, 0x1234, 0x56, 0x789abcde)).unwrap();
894
895        assert_eq!(
896            "aa117e200020561534125e3c1a781c37",
897            hex_encode(lds.writer_ref())
898        );
899        assert_eq!(
900            "aa2000117e20561634125e3c1a781c36",
901            hex_encode(&data.unwrap())
902        );
903    }
904}