1use std::{
2 collections::VecDeque,
3 io::{BufRead, BufReader, Cursor, Read, Write},
4};
5
6use bytes::{Buf, BytesMut};
7use tracing::warn;
8
9use crate::{
10 pdu::{LARGE_PDU_SIZE, PDU_HEADER_SIZE},
11 read_pdu, Pdu,
12};
13
/// Fill in the length fields and the control header byte
/// of a P-Data PDU laid out in `buffer`.
///
/// The first 12 bytes of `buffer` are treated as the header
/// and everything after them as the payload:
///
/// - bytes 2..6 receive the PDU length (payload + 4 + 2), big endian;
/// - bytes 6..10 receive the data value item length (payload + 2), big endian;
/// - byte 11 receives `0x02` when `is_last` is true and `0x00` otherwise.
///
/// Bytes 0, 1 and 10 are left untouched.
///
/// # Panics
///
/// Indexing panics if `buffer` is too short to hold the header fields.
fn setup_pdata_header(buffer: &mut [u8], is_last: bool) {
    // everything past the 12-byte header counts as payload
    let payload_len = (buffer.len() - 12) as u32;

    // PDU length: payload plus the 4-byte item length field
    // and the 2 bytes that precede the payload inside the item
    let pdu_len = payload_len + 4 + 2;
    buffer[2..6].copy_from_slice(&pdu_len.to_be_bytes());

    // item length: payload plus the 2 bytes that precede it
    let item_len = payload_len + 2;
    buffer[6..10].copy_from_slice(&item_len.to_be_bytes());

    // control header: flag the last fragment
    buffer[11] = match is_last {
        true => 0x02,
        false => 0x00,
    };
}
39
/// A writer which accumulates the bytes written to it
/// and sends them to the underlying stream as P-Data PDUs.
///
/// Data is sent in fragments of at most `max_data_len` bytes each.
/// The fragment flagged as the last one is only sent
/// when the writer is finished or dropped.
#[must_use]
pub struct PDataWriter<W: Write> {
    /// The PDU under construction:
    /// a 12-byte header followed by the payload gathered so far.
    buffer: Vec<u8>,
    /// The stream which the PDUs are written to.
    stream: W,
    /// The maximum number of payload bytes in a single PDU.
    max_data_len: u32,
}
89
90impl<W> PDataWriter<W>
91where
92 W: Write,
93{
94 pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
98 let max_data_length = calculate_max_data_len_single(max_pdu_length);
99 let mut buffer =
100 Vec::with_capacity((max_data_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
101 buffer.extend([
103 0x04,
105 0x00,
106 0xFF,
108 0xFF,
109 0xFF,
110 0xFF,
111 0xFF,
113 0xFF,
114 0xFF,
115 0xFF,
116 presentation_context_id,
118 0xFF,
120 ]);
121
122 PDataWriter {
123 stream,
124 max_data_len: max_data_length,
125 buffer,
126 }
127 }
128
129 pub fn finish(mut self) -> std::io::Result<()> {
134 self.finish_impl()?;
135 Ok(())
136 }
137
138 fn finish_impl(&mut self) -> std::io::Result<()> {
139 if !self.buffer.is_empty() {
140 setup_pdata_header(&mut self.buffer, true);
142 self.stream.write_all(&self.buffer[..])?;
143 self.buffer.clear();
146 }
147 Ok(())
148 }
149
150 fn dispatch_pdu(&mut self) -> std::io::Result<()> {
155 debug_assert!(self.buffer.len() >= 12);
156 setup_pdata_header(&mut self.buffer, false);
158 self.stream.write_all(&self.buffer)?;
159
160 self.buffer.truncate(12);
162
163 Ok(())
164 }
165}
166
167impl<W> Write for PDataWriter<W>
168where
169 W: Write,
170{
171 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
172 let total_len = self.max_data_len as usize + 12;
173 if self.buffer.len() + buf.len() <= total_len {
174 self.buffer.extend(buf);
176 Ok(buf.len())
177 } else {
178 let buf = &buf[..total_len - self.buffer.len()];
181 self.buffer.extend(buf);
182 debug_assert_eq!(self.buffer.len(), total_len);
183 self.dispatch_pdu()?;
184 Ok(buf.len())
185 }
186 }
187
188 fn flush(&mut self) -> std::io::Result<()> {
189 Ok(())
191 }
192}
193
194impl<W> Drop for PDataWriter<W>
199where
200 W: Write,
201{
202 fn drop(&mut self) {
203 let _ = self.finish_impl();
204 }
205}
206
/// A reader which exposes the payload of incoming P-Data PDUs
/// as a continuous stream of bytes.
///
/// PDUs are decoded from `read_buffer`,
/// which is replenished from `stream` as needed.
#[must_use]
pub struct PDataReader<'a, R> {
    /// Payload bytes already decoded but not yet handed to the consumer.
    buffer: VecDeque<u8>,
    /// The stream which the PDUs are received from.
    stream: R,
    /// Presentation context ID of the first data value received,
    /// `None` until then.
    presentation_context_id: Option<u8>,
    /// The limit passed on to `read_pdu` when decoding PDUs.
    max_data_length: u32,
    /// Whether the most recent data value was flagged as the last one
    /// (also set by `stop_receiving`).
    last_pdu: bool,
    /// Raw bytes received from the stream which were not decoded yet.
    read_buffer: &'a mut BytesMut,
}
250
251impl<'a, R> PDataReader<'a, R> {
252 pub fn new(stream: R, max_data_length: u32, remaining: &'a mut BytesMut) -> Self {
253 PDataReader {
254 buffer: VecDeque::with_capacity(max_data_length.min(LARGE_PDU_SIZE) as usize),
255 stream,
256 presentation_context_id: None,
257 max_data_length,
258 last_pdu: false,
259 read_buffer: remaining,
260 }
261 }
262
263 pub fn stop_receiving(&mut self) -> std::io::Result<()> {
269 self.last_pdu = true;
270 Ok(())
271 }
272}
273
274impl<R> Read for PDataReader<'_, R>
275where
276 R: Read,
277{
278 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
279 if self.buffer.is_empty() {
280 if self.last_pdu {
281 return Ok(0);
283 }
284
285 let mut reader = BufReader::new(&mut self.stream);
286 let msg = loop {
287 let mut buf = Cursor::new(&self.read_buffer[..]);
288 match read_pdu(&mut buf, self.max_data_length, false)
289 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
290 {
291 Some(pdu) => {
292 self.read_buffer.advance(buf.position() as usize);
293 break pdu;
294 }
295 None => {
296 buf.set_position(0)
298 }
299 }
300 let recv = reader.fill_buf()?.to_vec();
301 reader.consume(recv.len());
302 self.read_buffer.extend_from_slice(&recv);
303 if recv.is_empty() {
304 return Err(std::io::Error::new(
305 std::io::ErrorKind::Other,
306 "Connection closed by peer",
307 ));
308 }
309 };
310
311 match msg {
312 Pdu::PData { data } => {
313 for pdata_value in data {
314 self.presentation_context_id = match self.presentation_context_id {
315 None => Some(pdata_value.presentation_context_id),
316 Some(cid) if cid == pdata_value.presentation_context_id => Some(cid),
317 Some(cid) => {
318 warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
319 Some(cid)
320 }
321 };
322 self.buffer.extend(pdata_value.data);
323 self.last_pdu = pdata_value.is_last;
324 }
325 }
326 _ => {
327 return Err(std::io::Error::new(
328 std::io::ErrorKind::UnexpectedEof,
329 "Unexpected PDU type",
330 ))
331 }
332 }
333 }
334 Read::read(&mut self.buffer, buf)
335 }
336}
337
/// Determine the maximum number of payload bytes
/// which fit in a single PDU of length `pdu_len`.
///
/// That is `pdu_len` minus 6 bytes: the 4 and 2 bytes which
/// `setup_pdata_header` counts on top of the payload in the PDU length.
#[inline]
fn calculate_max_data_len_single(pdu_len: u32) -> u32 {
    // bytes taken by the data value item length field
    const ITEM_LENGTH_FIELD: u32 = 4;
    // bytes taken between the item length field and the payload
    const ITEM_HEADER: u32 = 2;

    pdu_len - (ITEM_LENGTH_FIELD + ITEM_HEADER)
}
347
348#[cfg(feature = "async")]
349pub mod non_blocking {
350 use std::{
351 io::Cursor,
352 pin::Pin,
353 task::{ready, Context, Poll},
354 };
355
356 use bytes::{Buf, BufMut};
357 use tokio::io::{
358 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
359 };
360 use tracing::warn;
361
362 use crate::{pdu::PDU_HEADER_SIZE, read_pdu, Pdu};
363
364 pub use super::PDataReader;
365 use super::{calculate_max_data_len_single, setup_pdata_header};
366
    /// Progress of `AsyncPDataWriter` in sending the buffered PDU.
    enum WriteState {
        /// No PDU is being sent; incoming data can be buffered.
        Ready,
        /// The buffered PDU is being sent to the stream.
        /// The value is the number of bytes of the buffer already written.
        Writing(usize),
    }
374
    /// An asynchronous writer which accumulates the bytes written to it
    /// and sends them to the underlying stream as P-Data PDUs.
    ///
    /// Data is sent in fragments of at most `max_data_len` bytes each.
    /// The fragment flagged as the last one is only sent
    /// when the writer is finished or dropped.
    #[must_use]
    pub struct AsyncPDataWriter<W: AsyncWrite + Unpin> {
        /// The PDU under construction:
        /// a 12-byte header followed by the payload gathered so far.
        buffer: Vec<u8>,
        /// The stream which the PDUs are written to.
        stream: W,
        /// The maximum number of payload bytes in a single PDU.
        max_data_len: u32,
        /// Whether a PDU is currently being sent, and how far it got.
        state: WriteState,
    }
429
430 #[cfg(feature = "async")]
431 impl<W> AsyncPDataWriter<W>
432 where
433 W: AsyncWrite + Unpin,
434 {
435 pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
439 use crate::pdu::LARGE_PDU_SIZE;
440
441 let max_data_length = calculate_max_data_len_single(max_pdu_length);
442 let mut buffer = Vec::with_capacity(
443 (max_data_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
444 );
445 buffer.extend([
447 0x04,
449 0x00,
450 0xFF,
452 0xFF,
453 0xFF,
454 0xFF,
455 0xFF,
457 0xFF,
458 0xFF,
459 0xFF,
460 presentation_context_id,
462 0xFF,
464 ]);
465
466 AsyncPDataWriter {
467 stream,
468 max_data_len: max_data_length,
469 buffer,
470 state: WriteState::Ready,
471 }
472 }
473
474 pub async fn finish(mut self) -> std::io::Result<()> {
479 self.finish_impl().await?;
480 Ok(())
481 }
482
483 async fn finish_impl(&mut self) -> std::io::Result<()> {
484 if !self.buffer.is_empty() {
485 setup_pdata_header(&mut self.buffer, true);
487 if let Err(e) = self.stream.write_all(&self.buffer[..]).await {
488 println!("Error: {e:?}");
489 }
490 self.buffer.clear();
493 }
494 Ok(())
495 }
496 }
497
498 #[cfg(feature = "async")]
499 impl<W> AsyncWrite for AsyncPDataWriter<W>
500 where
501 W: AsyncWrite + Unpin,
502 {
503 fn poll_write(
504 mut self: Pin<&mut Self>,
505 cx: &mut Context<'_>,
506 buf: &[u8],
507 ) -> Poll<std::result::Result<usize, std::io::Error>> {
508 match self.state {
512 WriteState::Ready => {
513 let total_len = self.max_data_len as usize + 12;
515 if self.buffer.len() + buf.len() <= total_len {
516 self.buffer.extend(buf);
518 Poll::Ready(Ok(buf.len()))
519 } else {
520 let slice = &buf[..total_len - self.buffer.len()];
523 self.buffer.extend(slice);
524 debug_assert_eq!(self.buffer.len(), total_len);
525 setup_pdata_header(&mut self.buffer, false);
526 let this = self.get_mut();
527 match Pin::new(&mut this.stream).poll_write(cx, &this.buffer) {
529 Poll::Ready(Ok(n)) => {
530 if n == this.buffer.len() {
531 this.buffer.truncate(12);
533 Poll::Ready(Ok(slice.len()))
534 } else {
535 this.state = WriteState::Writing(n);
537 Poll::Pending
538 }
539 }
540 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
541 Poll::Pending => {
542 this.state = WriteState::Writing(0);
544 Poll::Pending
545 }
546 }
547 }
548 }
549 WriteState::Writing(pos) => {
550 let buflen = self.buffer.len();
552 let this = self.get_mut();
553 match Pin::new(&mut this.stream).poll_write(cx, &this.buffer[pos..]) {
554 Poll::Ready(Ok(n)) => {
555 if (n + pos) == this.buffer.len() {
556 this.buffer.truncate(12);
558 this.state = WriteState::Ready;
559 Poll::Ready(Ok(buflen - 12))
560 } else {
561 this.state = WriteState::Writing(n + pos);
563 Poll::Pending
564 }
565 }
566 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
567 Poll::Pending => Poll::Pending,
568 }
569 }
570 }
571 }
572
573 fn poll_flush(
574 mut self: Pin<&mut Self>,
575 cx: &mut Context<'_>,
576 ) -> Poll<std::result::Result<(), std::io::Error>> {
577 Pin::new(&mut self.stream).poll_flush(cx)
578 }
579
580 fn poll_shutdown(
581 mut self: Pin<&mut Self>,
582 cx: &mut Context<'_>,
583 ) -> Poll<std::result::Result<(), std::io::Error>> {
584 Pin::new(&mut self.stream).poll_shutdown(cx)
585 }
586 }
587
588 impl<W> Drop for AsyncPDataWriter<W>
593 where
594 W: AsyncWrite + Unpin,
595 {
596 fn drop(&mut self) {
597 tokio::task::block_in_place(move || {
598 tokio::runtime::Handle::current().block_on(async move {
599 let _ = self.finish_impl().await;
600 })
601 })
602 }
603 }
604
605 impl<R> AsyncRead for PDataReader<'_, R>
606 where
607 R: AsyncRead + Unpin,
608 {
609 fn poll_read(
610 mut self: Pin<&mut Self>,
611 cx: &mut Context<'_>,
612 buf: &mut ReadBuf,
613 ) -> Poll<std::io::Result<()>> {
614 if self.buffer.is_empty() {
615 if self.last_pdu {
616 return Poll::Ready(Ok(()));
617 }
618 let Self {
619 ref mut stream,
620 ref mut read_buffer,
621 ref max_data_length,
622 ..
623 } = &mut *self;
624 let mut reader = BufReader::new(stream);
625 let msg = loop {
626 let mut buf = Cursor::new(&read_buffer[..]);
627 match read_pdu(&mut buf, *max_data_length, false)
628 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
629 {
630 Some(pdu) => {
631 read_buffer.advance(buf.position() as usize);
632 break pdu;
633 }
634 None => {
635 buf.set_position(0)
637 }
638 }
639 let recv = ready!(Pin::new(&mut reader).poll_fill_buf(cx))?.to_vec();
640 reader.consume(recv.len());
641 read_buffer.extend_from_slice(&recv);
642 if recv.is_empty() {
643 return Poll::Ready(Err(std::io::Error::new(
644 std::io::ErrorKind::Other,
645 "Connection closed by peer",
646 )));
647 }
648 };
649 match msg {
650 Pdu::PData { data } => {
651 for pdata_value in data {
652 self.presentation_context_id = match self.presentation_context_id {
653 None => Some(pdata_value.presentation_context_id),
654 Some(cid) if cid == pdata_value.presentation_context_id => {
655 Some(cid)
656 }
657 Some(cid) => {
658 warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
659 Some(cid)
660 }
661 };
662 self.buffer.extend(pdata_value.data);
663 self.last_pdu = pdata_value.is_last;
664 }
665 }
666 _ => {
667 return Poll::Ready(Err(std::io::Error::new(
668 std::io::ErrorKind::UnexpectedEof,
669 "Unexpected PDU type",
670 )))
671 }
672 }
673 }
674 let len = std::cmp::min(self.buffer.len(), buf.remaining());
675 for _ in 0..len {
676 buf.put_u8(self.buffer.pop_front().unwrap());
677 }
678 Poll::Ready(Ok(()))
679 }
680 }
681}
682
683#[cfg(test)]
684mod tests {
685 use std::io::{Read, Write};
686
687 use crate::association::pdata::PDataWriter;
688 use crate::pdu::{read_pdu, Pdu, MINIMUM_PDU_SIZE, PDU_HEADER_SIZE};
689 use crate::pdu::{PDataValue, PDataValueType};
690 use crate::write_pdu;
691
692 use super::PDataReader;
693
694 use bytes::BytesMut;
695 #[cfg(feature = "async")]
696 use tokio::io::AsyncWriteExt;
697
698 #[cfg(feature = "async")]
699 use crate::association::pdata::non_blocking::AsyncPDataWriter;
700
701 #[test]
702 fn test_write_pdata_and_finish() {
703 let presentation_context_id = 12;
704
705 let mut buf = Vec::new();
706 {
707 let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
708 writer.write_all(&(0..64).collect::<Vec<u8>>()).unwrap();
709 writer.finish().unwrap();
710 }
711
712 let mut cursor = &buf[..];
713 let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
714
715 match same_pdu {
718 Some(Pdu::PData { data: data_1 }) => {
719 let data_1 = &data_1[0];
720
721 assert_eq!(data_1.value_type, PDataValueType::Data);
723 assert_eq!(data_1.presentation_context_id, presentation_context_id);
724 assert_eq!(data_1.data.len(), 64);
725 assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
726 }
727 pdu => panic!("Expected PData, got {:?}", pdu),
728 }
729
730 assert_eq!(cursor.len(), 0);
731 }
732
733 #[cfg(feature = "async")]
734 #[tokio::test(flavor = "multi_thread")]
735 async fn test_async_write_pdata_and_finish() {
736 let presentation_context_id = 12;
737
738 let mut buf = Vec::new();
739 {
740 let mut writer =
741 AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
742 writer
743 .write_all(&(0..64).collect::<Vec<u8>>())
744 .await
745 .unwrap();
746 writer.finish().await.unwrap();
747 }
748
749 let mut cursor = &buf[..];
750 let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
751
752 match same_pdu {
755 Some(Pdu::PData { data: data_1 }) => {
756 let data_1 = &data_1[0];
757
758 assert_eq!(data_1.value_type, PDataValueType::Data);
760 assert_eq!(data_1.presentation_context_id, presentation_context_id);
761 assert_eq!(data_1.data.len(), 64);
762 assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
763 }
764 pdu => panic!("Expected PData, got {:?}", pdu),
765 }
766
767 assert_eq!(cursor.len(), 0);
768 }
769
770 #[test]
771 fn test_write_large_pdata_and_finish() {
772 let presentation_context_id = 32;
773
774 let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
775
776 let mut buf = Vec::new();
777 {
778 let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
779 writer.write_all(&my_data).unwrap();
780 writer.finish().unwrap();
781 }
782
783 let mut cursor = &buf[..];
784 let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
785 let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
786 let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
787
788 match (pdu_1, pdu_2, pdu_3) {
791 (
792 Some(Pdu::PData { data: data_1 }),
793 Some(Pdu::PData { data: data_2 }),
794 Some(Pdu::PData { data: data_3 }),
795 ) => {
796 assert_eq!(data_1.len(), 1);
797 let data_1 = &data_1[0];
798 assert_eq!(data_2.len(), 1);
799 let data_2 = &data_2[0];
800 assert_eq!(data_3.len(), 1);
801 let data_3 = &data_3[0];
802
803 assert_eq!(data_1.value_type, PDataValueType::Data);
805 assert_eq!(data_2.value_type, PDataValueType::Data);
806 assert_eq!(data_1.presentation_context_id, presentation_context_id);
807 assert_eq!(data_2.presentation_context_id, presentation_context_id);
808
809 assert_eq!(
811 data_1.data.len(),
812 (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
813 );
814 assert_eq!(
815 data_2.data.len(),
816 (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
817 );
818 assert_eq!(data_3.data.len(), 820);
819
820 assert_eq!(
822 &data_1.data[..],
823 (0..MINIMUM_PDU_SIZE - PDU_HEADER_SIZE)
824 .map(|x| x as u8)
825 .collect::<Vec<_>>()
826 );
827 assert_eq!(
828 data_1.data.len() + data_2.data.len() + data_3.data.len(),
829 9000
830 );
831
832 let data_1 = &data_1.data;
833 let data_2 = &data_2.data;
834 let data_3 = &data_3.data;
835
836 let mut all_data: Vec<u8> = Vec::new();
837 all_data.extend(data_1);
838 all_data.extend(data_2);
839 all_data.extend(data_3);
840 assert_eq!(all_data, my_data);
841 }
842 x => panic!("Expected 3 PDatas, got {:?}", x),
843 }
844
845 assert_eq!(cursor.len(), 0);
846 }
847
848 #[cfg(feature = "async")]
849 #[tokio::test(flavor = "multi_thread")]
850 async fn test_async_write_large_pdata_and_finish() {
851 let presentation_context_id = 32;
852
853 let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
854
855 let mut buf = Vec::new();
856 {
857 let mut writer =
858 AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
859 writer.write_all(&my_data).await.unwrap();
860 writer.finish().await.unwrap();
861 }
862
863 let mut cursor = &buf[..];
864 let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
865 let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
866 let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
867
868 match (pdu_1, pdu_2, pdu_3) {
871 (
872 Some(Pdu::PData { data: data_1 }),
873 Some(Pdu::PData { data: data_2 }),
874 Some(Pdu::PData { data: data_3 }),
875 ) => {
876 assert_eq!(data_1.len(), 1);
877 let data_1 = &data_1[0];
878 assert_eq!(data_2.len(), 1);
879 let data_2 = &data_2[0];
880 assert_eq!(data_3.len(), 1);
881 let data_3 = &data_3[0];
882
883 assert_eq!(data_1.value_type, PDataValueType::Data);
885 assert_eq!(data_2.value_type, PDataValueType::Data);
886 assert_eq!(data_1.presentation_context_id, presentation_context_id);
887 assert_eq!(data_2.presentation_context_id, presentation_context_id);
888
889 assert_eq!(
891 data_1.data.len(),
892 (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
893 );
894 assert_eq!(
895 data_2.data.len(),
896 (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
897 );
898 assert_eq!(data_3.data.len(), 820);
899
900 assert_eq!(
902 &data_1.data[..],
903 (0..MINIMUM_PDU_SIZE - PDU_HEADER_SIZE)
904 .map(|x| x as u8)
905 .collect::<Vec<_>>()
906 );
907 assert_eq!(
908 data_1.data.len() + data_2.data.len() + data_3.data.len(),
909 9000
910 );
911
912 let data_1 = &data_1.data;
913 let data_2 = &data_2.data;
914 let data_3 = &data_3.data;
915
916 let mut all_data: Vec<u8> = Vec::new();
917 all_data.extend(data_1);
918 all_data.extend(data_2);
919 all_data.extend(data_3);
920 assert_eq!(all_data, my_data);
921 }
922 x => panic!("Expected 3 PDatas, got {:?}", x),
923 }
924
925 assert_eq!(cursor.len(), 0);
926 }
927
928 #[test]
929 fn test_read_large_pdata_and_finish() {
930 use std::collections::VecDeque;
931 let presentation_context_id = 32;
932
933 let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
934 let pdata_1 = vec![PDataValue {
935 value_type: PDataValueType::Data,
936 data: my_data[0..3000].to_owned(),
937 presentation_context_id,
938 is_last: false,
939 }];
940 let pdata_2 = vec![PDataValue {
941 value_type: PDataValueType::Data,
942 data: my_data[3000..6000].to_owned(),
943 presentation_context_id,
944 is_last: false,
945 }];
946 let pdata_3 = vec![PDataValue {
947 value_type: PDataValueType::Data,
948 data: my_data[6000..].to_owned(),
949 presentation_context_id,
950 is_last: true,
951 }];
952
953 let mut pdu_stream = VecDeque::new();
954
955 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
957 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
958 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
959
960 let mut buf = Vec::new();
961 {
962 let mut read_buf = BytesMut::new();
963 let mut reader = PDataReader::new(&mut pdu_stream, MINIMUM_PDU_SIZE, &mut read_buf);
964 reader.read_to_end(&mut buf).unwrap();
965 }
966 assert_eq!(buf, my_data);
967 }
968
969 #[cfg(feature = "async")]
970 #[tokio::test]
971 async fn test_async_read_large_pdata_and_finish() {
972 use tokio::io::AsyncReadExt;
973
974 let presentation_context_id = 32;
975
976 let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
977 let pdata_1 = vec![PDataValue {
978 value_type: PDataValueType::Data,
979 data: my_data[0..3000].to_owned(),
980 presentation_context_id,
981 is_last: false,
982 }];
983 let pdata_2 = vec![PDataValue {
984 value_type: PDataValueType::Data,
985 data: my_data[3000..6000].to_owned(),
986 presentation_context_id,
987 is_last: false,
988 }];
989 let pdata_3 = vec![PDataValue {
990 value_type: PDataValueType::Data,
991 data: my_data[6000..].to_owned(),
992 presentation_context_id,
993 is_last: true,
994 }];
995
996 let mut pdu_stream = std::io::Cursor::new(Vec::new());
997
998 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
1000 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
1001 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
1002
1003 let mut buf = Vec::new();
1004 let inner = pdu_stream.into_inner();
1005 let mut stream = tokio::io::BufReader::new(inner.as_slice());
1006 {
1007 let mut read_buf = BytesMut::new();
1008 let mut reader = PDataReader::new(&mut stream, MINIMUM_PDU_SIZE, &mut read_buf);
1009 reader.read_to_end(&mut buf).await.unwrap();
1010 }
1011 assert_eq!(buf, my_data);
1012 }
1013}