1use std::{marker::Unpin, time::Duration};
2
3use async_std::{
4 io::{Read, Write},
5 prelude::*,
6};
7
8use resol_vbus::{chrono::Utc, live_data_encoder, Data, Datagram, Header, LiveDataBuffer};
9
10use crate::error::Result;
11
12fn try_as_datagram(data: &Data) -> Option<&Datagram> {
13 if data.is_datagram() {
14 Some(data.as_datagram())
15 } else {
16 None
17 }
18}
19
20#[derive(Debug)]
26pub struct LiveDataStream<R: Read + Unpin, W: Write + Unpin> {
27 reader: R,
28 writer: W,
29 channel: u8,
30 self_address: u16,
31 buf: LiveDataBuffer,
32}
33
34impl<R: Read + Unpin, W: Write + Unpin> LiveDataStream<R, W> {
35 pub fn new(reader: R, writer: W, channel: u8, self_address: u16) -> LiveDataStream<R, W> {
37 LiveDataStream {
38 reader,
39 writer,
40 channel,
41 self_address,
42 buf: LiveDataBuffer::new(channel),
43 }
44 }
45
46 pub fn into_inner(self) -> (R, W) {
48 (self.reader, self.writer)
49 }
50
51 fn create_datagram(
52 &self,
53 destination_address: u16,
54 command: u16,
55 param16: i16,
56 param32: i32,
57 ) -> Datagram {
58 Datagram {
59 header: Header {
60 timestamp: Utc::now(),
61 channel: self.channel,
62 destination_address,
63 source_address: self.self_address,
64 protocol_version: 0x20,
65 },
66 command,
67 param16,
68 param32,
69 }
70 }
71
72 async fn transceive_internal<F>(
73 &mut self,
74 tx_data: Option<Data>,
75 max_tries: usize,
76 initial_timeout_ms: u64,
77 timeout_increment_ms: u64,
78 filter: F,
79 ) -> Result<Option<Data>>
80 where
81 F: Fn(&Data) -> bool,
82 {
83 let tx_data = match tx_data {
84 Some(ref data) => {
85 let len = live_data_encoder::length_from_data(data);
86 let mut bytes = vec![0u8; len];
87 live_data_encoder::bytes_from_data(data, &mut bytes);
88 Some(bytes)
89 }
90 None => None,
91 };
92
93 let mut current_try = 0;
94 let mut current_timeout_ms = initial_timeout_ms;
95
96 let result = loop {
97 if current_try >= max_tries {
98 break None;
99 }
100
101 if let Some(ref tx_data) = tx_data {
102 self.writer.write_all(tx_data).await?;
103 }
104
105 let result = async_std::io::timeout(Duration::from_millis(current_timeout_ms), async {
106 loop {
107 let data = loop {
108 if let Some(data) = self.buf.read_data() {
109 if filter(&data) {
110 break Some(data);
111 }
112 } else {
113 break None;
114 }
115 };
116
117 if let Some(data) = data {
118 break Ok(Some(data));
119 }
120
121 let mut buf = [0u8; 256];
122 let len = self.reader.read(&mut buf).await?;
123 if len == 0 {
124 break Ok(None);
125 }
126
127 self.buf.extend_from_slice(&buf[0..len]);
128 }
129 })
130 .await;
131
132 if let Ok(data) = result {
133 break data;
134 }
135
136 current_try += 1;
137 current_timeout_ms += timeout_increment_ms;
138 };
139
140 Ok(result)
141 }
142
143 pub async fn receive<F>(&mut self, timeout_ms: u64, filter: F) -> Result<Option<Data>>
158 where
159 F: Fn(&Data) -> bool,
160 {
161 self.transceive_internal(None, 1, timeout_ms, 0, filter)
162 .await
163 }
164
165 pub async fn transceive<F>(
186 &mut self,
187 tx_data: Data,
188 max_tries: usize,
189 initial_timeout_ms: u64,
190 timeout_increment_ms: u64,
191 filter: F,
192 ) -> Result<Option<Data>>
193 where
194 F: Fn(&Data) -> bool,
195 {
196 self.transceive_internal(
197 Some(tx_data),
198 max_tries,
199 initial_timeout_ms,
200 timeout_increment_ms,
201 filter,
202 )
203 .await
204 }
205
206 pub async fn receive_any_data(&mut self, timeout_ms: u64) -> Result<Option<Data>> {
208 self.receive(timeout_ms, |_| true).await
209 }
210
211 pub async fn wait_for_free_bus(&mut self) -> Result<Option<Datagram>> {
213 let rx_data = self
214 .receive(20000, |data| {
215 if let Some(dgram) = try_as_datagram(data) {
216 if dgram.command != 0x0500 {
217 false
218 } else {
219 true
220 }
221 } else {
222 false
223 }
224 })
225 .await?;
226
227 Ok(rx_data.map(|data| data.into_datagram()))
228 }
229
230 pub async fn release_bus(&mut self, address: u16) -> Result<Option<Data>> {
232 let tx_dgram = self.create_datagram(address, 0x0600, 0, 0);
233
234 let tx_data = Data::Datagram(tx_dgram);
235
236 let rx_data = self
237 .transceive(tx_data, 2, 2500, 2500, |data| data.is_packet())
238 .await?;
239
240 Ok(rx_data)
241 }
242
243 pub async fn get_value_by_index(
245 &mut self,
246 address: u16,
247 index: i16,
248 subindex: u8,
249 ) -> Result<Option<Datagram>> {
250 let tx_dgram = self.create_datagram(address, 0x0300 | u16::from(subindex), index, 0);
251
252 let tx_data = Data::Datagram(tx_dgram.clone());
253
254 let rx_data = self
255 .transceive(tx_data, 3, 500, 500, |data| {
256 if let Some(dgram) = try_as_datagram(data) {
257 if dgram.header.source_address != tx_dgram.header.destination_address {
258 false
259 } else if dgram.header.destination_address != tx_dgram.header.source_address {
260 false
261 } else if dgram.command != (0x0100 | u16::from(subindex)) {
262 false
263 } else if dgram.param16 != tx_dgram.param16 {
264 false
265 } else {
266 true
267 }
268 } else {
269 false
270 }
271 })
272 .await?;
273
274 Ok(rx_data.map(|data| data.into_datagram()))
275 }
276
277 pub async fn set_value_by_index(
279 &mut self,
280 address: u16,
281 index: i16,
282 subindex: u8,
283 value: i32,
284 ) -> Result<Option<Datagram>> {
285 let tx_dgram = self.create_datagram(address, 0x0200 | u16::from(subindex), index, value);
286
287 let tx_data = Data::Datagram(tx_dgram.clone());
288
289 let rx_data = self
290 .transceive(tx_data, 3, 500, 500, |data| {
291 if let Some(dgram) = try_as_datagram(data) {
292 if dgram.header.source_address != tx_dgram.header.destination_address {
293 false
294 } else if dgram.header.destination_address != tx_dgram.header.source_address {
295 false
296 } else if dgram.command != (0x0100 | u16::from(subindex)) {
297 false
298 } else if dgram.param16 != tx_dgram.param16 {
299 false
300 } else {
301 true
302 }
303 } else {
304 false
305 }
306 })
307 .await?;
308
309 Ok(rx_data.map(|data| data.into_datagram()))
310 }
311
312 pub async fn get_value_id_hash_by_index(
314 &mut self,
315 address: u16,
316 index: i16,
317 ) -> Result<Option<Datagram>> {
318 let tx_dgram = self.create_datagram(address, 0x1000, index, 0);
319
320 let tx_data = Data::Datagram(tx_dgram.clone());
321
322 let rx_data = self
323 .transceive(tx_data, 3, 500, 500, |data| {
324 if let Some(dgram) = try_as_datagram(data) {
325 if dgram.header.source_address != tx_dgram.header.destination_address {
326 false
327 } else if dgram.header.destination_address != tx_dgram.header.source_address {
328 false
329 } else if dgram.command != 0x0100 {
330 false
331 } else if dgram.param16 != tx_dgram.param16 {
332 false
333 } else {
334 true
335 }
336 } else {
337 false
338 }
339 })
340 .await?;
341
342 Ok(rx_data.map(|data| data.into_datagram()))
343 }
344
345 pub async fn get_value_index_by_id_hash(
347 &mut self,
348 address: u16,
349 id_hash: i32,
350 ) -> Result<Option<Datagram>> {
351 let tx_dgram = self.create_datagram(address, 0x1100, 0, id_hash);
352
353 let tx_data = Data::Datagram(tx_dgram.clone());
354
355 let rx_data = self
356 .transceive(tx_data, 3, 500, 500, |data| {
357 if let Some(dgram) = try_as_datagram(data) {
358 if dgram.header.source_address != tx_dgram.header.destination_address {
359 false
360 } else if dgram.header.destination_address != tx_dgram.header.source_address {
361 false
362 } else if dgram.command != 0x0100 {
363 false
364 } else if dgram.param32 != tx_dgram.param32 {
365 false
366 } else {
367 true
368 }
369 } else {
370 false
371 }
372 })
373 .await?;
374
375 Ok(rx_data.map(|data| data.into_datagram()))
376 }
377
378 pub async fn get_caps1(&mut self, address: u16) -> Result<Option<Datagram>> {
380 let tx_dgram = self.create_datagram(address, 0x1300, 0, 0);
381
382 let tx_data = Data::Datagram(tx_dgram.clone());
383
384 let rx_data = self
385 .transceive(tx_data, 3, 500, 500, |data| {
386 if let Data::Datagram(ref dgram) = *data {
387 if dgram.header.source_address != tx_dgram.header.destination_address {
388 false
389 } else if dgram.header.destination_address != tx_dgram.header.source_address {
390 false
391 } else if dgram.command != 0x1301 {
392 false
393 } else {
394 true
395 }
396 } else {
397 false
398 }
399 })
400 .await?;
401
402 Ok(rx_data.map(|data| data.into_datagram()))
403 }
404
405 pub async fn begin_bulk_value_transaction(
407 &mut self,
408 address: u16,
409 tx_timeout: i32,
410 ) -> Result<Option<Datagram>> {
411 let tx_dgram = self.create_datagram(address, 0x1400, 0, tx_timeout);
412
413 let tx_data = Data::Datagram(tx_dgram.clone());
414
415 let rx_data = self
416 .transceive(tx_data, 3, 500, 500, |data| {
417 if let Data::Datagram(ref dgram) = *data {
418 if dgram.header.source_address != tx_dgram.header.destination_address {
419 false
420 } else if dgram.header.destination_address != tx_dgram.header.source_address {
421 false
422 } else if dgram.command != 0x1401 {
423 false
424 } else {
425 true
426 }
427 } else {
428 false
429 }
430 })
431 .await?;
432
433 Ok(rx_data.map(|data| data.into_datagram()))
434 }
435
436 pub async fn commit_bulk_value_transaction(
438 &mut self,
439 address: u16,
440 ) -> Result<Option<Datagram>> {
441 let tx_dgram = self.create_datagram(address, 0x1402, 0, 0);
442
443 let tx_data = Data::Datagram(tx_dgram.clone());
444
445 let rx_data = self
446 .transceive(tx_data, 3, 500, 500, |data| {
447 if let Data::Datagram(ref dgram) = *data {
448 if dgram.header.source_address != tx_dgram.header.destination_address {
449 false
450 } else if dgram.header.destination_address != tx_dgram.header.source_address {
451 false
452 } else if dgram.command != 0x1403 {
453 false
454 } else {
455 true
456 }
457 } else {
458 false
459 }
460 })
461 .await?;
462
463 Ok(rx_data.map(|data| data.into_datagram()))
464 }
465
466 pub async fn rollback_bulk_value_transaction(
468 &mut self,
469 address: u16,
470 ) -> Result<Option<Datagram>> {
471 let tx_dgram = self.create_datagram(address, 0x1404, 0, 0);
472
473 let tx_data = Data::Datagram(tx_dgram.clone());
474
475 let rx_data = self
476 .transceive(tx_data, 3, 500, 500, |data| {
477 if let Data::Datagram(ref dgram) = *data {
478 if dgram.header.source_address != tx_dgram.header.destination_address {
479 false
480 } else if dgram.header.destination_address != tx_dgram.header.source_address {
481 false
482 } else if dgram.command != 0x1405 {
483 false
484 } else {
485 true
486 }
487 } else {
488 false
489 }
490 })
491 .await?;
492
493 Ok(rx_data.map(|data| data.into_datagram()))
494 }
495
496 pub async fn set_bulk_value_by_index(
498 &mut self,
499 address: u16,
500 index: i16,
501 subindex: u8,
502 value: i32,
503 ) -> Result<Option<Datagram>> {
504 let tx_dgram = self.create_datagram(address, 0x1500 | u16::from(subindex), index, value);
505
506 let tx_data = Data::Datagram(tx_dgram.clone());
507
508 let rx_data = self
509 .transceive(tx_data, 3, 500, 500, |data| {
510 if let Data::Datagram(ref dgram) = *data {
511 if dgram.header.source_address != tx_dgram.header.destination_address {
512 false
513 } else if dgram.header.destination_address != tx_dgram.header.source_address {
514 false
515 } else if dgram.command != (0x1600 | u16::from(subindex)) {
516 false
517 } else if dgram.param16 != tx_dgram.param16 {
518 false
519 } else {
520 true
521 }
522 } else {
523 false
524 }
525 })
526 .await?;
527
528 Ok(rx_data.map(|data| data.into_datagram()))
529 }
530}
531
#[cfg(test)]
impl<R, W> LiveDataStream<R, W>
where
    R: Read + Unpin,
    W: Write + Unpin,
{
    /// Borrows the writer so tests can inspect the bytes that were sent.
    pub fn writer_ref(&self) -> &W {
        let LiveDataStream { writer, .. } = self;
        writer
    }
}
538
#[cfg(test)]
mod tests {
    use async_std::io::Cursor;

    use resol_vbus::Packet;

    use super::*;

    /// Conversion into the raw bytes that the tests compare as hex strings.
    trait ToBytes {
        fn to_bytes(&self) -> Vec<u8>;
    }

    impl ToBytes for Cursor<Vec<u8>> {
        fn to_bytes(&self) -> Vec<u8> {
            self.get_ref().to_vec()
        }
    }

    impl ToBytes for Data {
        fn to_bytes(&self) -> Vec<u8> {
            let mut bytes = vec![0u8; live_data_encoder::length_from_data(self)];
            live_data_encoder::bytes_from_data(self, &mut bytes);
            bytes
        }
    }

    impl ToBytes for Datagram {
        fn to_bytes(&self) -> Vec<u8> {
            Data::Datagram(self.clone()).to_bytes()
        }
    }

    /// Renders the bytes of `value` as a lowercase hex string.
    fn hex_encode<T: ToBytes>(value: &T) -> String {
        value
            .to_bytes()
            .iter()
            .map(|byte| format!("{:02x}", byte))
            .collect()
    }

    /// Header on channel 0 with protocol version `0x20`, as used by all fixtures.
    fn header(destination_address: u16, source_address: u16) -> Header {
        Header {
            timestamp: Utc::now(),
            channel: 0,
            destination_address,
            source_address,
            protocol_version: 0x20,
        }
    }

    /// Appends the encoded form of `data` to `buf`.
    fn extend_from_data(buf: &mut Vec<u8>, data: &Data) {
        buf.extend_from_slice(&data.to_bytes());
    }

    /// Appends an encoded packet without any frames to `buf`.
    fn extend_with_empty_packet(
        buf: &mut Vec<u8>,
        destination_address: u16,
        source_address: u16,
        command: u16,
    ) {
        let packet = Packet {
            header: header(destination_address, source_address),
            command,
            frame_count: 0,
            frame_data: [0; 508],
        };
        extend_from_data(buf, &Data::Packet(packet));
    }

    /// Appends an encoded datagram to `buf`.
    fn extend_from_datagram(
        buf: &mut Vec<u8>,
        destination_address: u16,
        source_address: u16,
        command: u16,
        param16: i16,
        param32: i32,
    ) {
        let dgram = Datagram {
            header: header(destination_address, source_address),
            command,
            param16,
            param32,
        };
        extend_from_data(buf, &Data::Datagram(dgram));
    }

    /// Appends one datagram per
    /// `(destination, source, command, param16, param32)` entry, in order.
    fn extend_from_datagrams(buf: &mut Vec<u8>, specs: &[(u16, u16, u16, i16, i32)]) {
        for &(destination_address, source_address, command, param16, param32) in specs {
            extend_from_datagram(
                buf,
                destination_address,
                source_address,
                command,
                param16,
                param32,
            );
        }
    }

    /// Stream with address `0x0020` on channel 0 that reads `rx` and records
    /// everything it writes in an in-memory cursor.
    fn stream_over(rx: &[u8]) -> LiveDataStream<&[u8], Cursor<Vec<u8>>> {
        LiveDataStream::new(rx, Cursor::new(Vec::new()), 0, 0x0020)
    }

    /// Drives `f` to completion on the current thread.
    fn simulate_run<F>(f: F) -> F::Output
    where
        F: Future,
    {
        async_std::task::block_on(f)
    }

    #[test]
    fn test_wait_for_free_bus() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagram(&mut rx, 0x0000, 0x7E11, 0x0500, 0, 0);

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.wait_for_free_bus()).unwrap();

        assert_eq!("", hex_encode(lds.writer_ref()));
        assert_eq!(
            "aa0000117e200005000000000000004b",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_release_bus() {
        let mut rx = Vec::new();
        extend_from_datagram(&mut rx, 0x0020, 0x7E11, 0x0100, 0, 0);
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.release_bus(0x7E11)).unwrap();

        assert_eq!(
            "aa117e2000200006000000000000002a",
            hex_encode(lds.writer_ref())
        );
        assert_eq!("aa1000117e100001004f", hex_encode(&reply.unwrap()));
    }

    #[test]
    fn test_get_value_by_index() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x0156, 0x1234, 0x789abcde),
                (0x0020, 0x7E10, 0x0156, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0157, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0156, 0x1235, 0x789abcde),
                (0x0020, 0x7E11, 0x0156, 0x1234, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.get_value_by_index(0x7E11, 0x1234, 0x56)).unwrap();

        assert_eq!(
            "aa117e20002056033412000000000011",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20560134125e3c1a781c4b",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_set_value_by_index() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x0156, 0x1234, 0x789abcde),
                (0x0020, 0x7E10, 0x0156, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0157, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0156, 0x1235, 0x789abcde),
                (0x0020, 0x7E11, 0x0156, 0x1234, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply =
            simulate_run(lds.set_value_by_index(0x7E11, 0x1234, 0x56, 0x789abcde)).unwrap();

        assert_eq!(
            "aa117e200020560234125e3c1a781c4a",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20560134125e3c1a781c4b",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_get_value_id_hash_by_index() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x0100, 0x1234, 0x789abcde),
                (0x0020, 0x7E10, 0x0100, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0101, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0100, 0x1235, 0x789abcde),
                (0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.get_value_id_hash_by_index(0x7E11, 0x1234)).unwrap();

        assert_eq!(
            "aa117e2000200010341200000000005a",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20000134125e3c1a781c21",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_get_value_index_by_id_hash() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x0100, 0x1234, 0x789abcde),
                (0x0020, 0x7E10, 0x0100, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0101, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcdf),
                (0x0020, 0x7E11, 0x0100, 0x1234, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.get_value_index_by_id_hash(0x7E11, 0x789abcde)).unwrap();

        assert_eq!(
            "aa117e200020001100005e3c1a781c57",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20000134125e3c1a781c21",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_get_caps1() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x1301, 0, 0x789abcde),
                (0x0020, 0x7E10, 0x1301, 0, 0x789abcde),
                (0x0020, 0x7E11, 0x1300, 0, 0x789abcde),
                (0x0020, 0x7E11, 0x1301, 0, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.get_caps1(0x7E11)).unwrap();

        assert_eq!(
            "aa117e2000200013000000000000001d",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20011300005e3c1a781c54",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_begin_bulk_value_transaction() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x1401, 0, 0),
                (0x0020, 0x7E10, 0x1401, 0, 0),
                (0x0020, 0x7E11, 0x1400, 0, 0),
                (0x0020, 0x7E11, 0x1401, 0, 0),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.begin_bulk_value_transaction(0x7E11, 0x789abcde)).unwrap();

        assert_eq!(
            "aa117e200020001400005e3c1a781c54",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e200114000000000000001b",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_commit_value_transaction() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x1403, 0, 0),
                (0x0020, 0x7E10, 0x1403, 0, 0),
                (0x0020, 0x7E11, 0x1402, 0, 0),
                (0x0020, 0x7E11, 0x1403, 0, 0),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.commit_bulk_value_transaction(0x7E11)).unwrap();

        assert_eq!(
            "aa117e2000200214000000000000001a",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e2003140000000000000019",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_rollback_value_transaction() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x1405, 0, 0),
                (0x0020, 0x7E10, 0x1405, 0, 0),
                (0x0020, 0x7E11, 0x1404, 0, 0),
                (0x0020, 0x7E11, 0x1405, 0, 0),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply = simulate_run(lds.rollback_bulk_value_transaction(0x7E11)).unwrap();

        assert_eq!(
            "aa117e20002004140000000000000018",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e2005140000000000000017",
            hex_encode(&reply.unwrap())
        );
    }

    #[test]
    fn test_set_bulk_value_by_index() {
        let mut rx = Vec::new();
        extend_with_empty_packet(&mut rx, 0x0010, 0x7E11, 0x0100);
        extend_from_datagrams(
            &mut rx,
            &[
                (0x0021, 0x7E11, 0x1656, 0x1234, 0x789abcde),
                (0x0020, 0x7E10, 0x1656, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x1657, 0x1234, 0x789abcde),
                (0x0020, 0x7E11, 0x1656, 0x1235, 0x789abcde),
                (0x0020, 0x7E11, 0x1656, 0x1234, 0x789abcde),
            ],
        );

        let mut lds = stream_over(&rx);

        let reply =
            simulate_run(lds.set_bulk_value_by_index(0x7E11, 0x1234, 0x56, 0x789abcde)).unwrap();

        assert_eq!(
            "aa117e200020561534125e3c1a781c37",
            hex_encode(lds.writer_ref())
        );
        assert_eq!(
            "aa2000117e20561634125e3c1a781c36",
            hex_encode(&reply.unwrap())
        );
    }
}