1use std::{
2 collections::VecDeque,
3 io::{BufRead, BufReader, Cursor, Read, Write},
4};
5
6use bytes::{Buf, BytesMut};
7use tracing::warn;
8
9use crate::{
10 pdu::{LARGE_PDU_SIZE, PDU_HEADER_SIZE, PDV_HEADER_SIZE},
11 read_pdu, Pdu,
12};
13
14const PDU_PDV_HEADER_SIZE: usize = (PDU_HEADER_SIZE + PDV_HEADER_SIZE) as usize;
16
17fn setup_pdata_header(buffer: &mut [u8], is_last: bool) {
19 let data_len = (buffer.len() - PDU_PDV_HEADER_SIZE) as u32;
20
21 let pdu_len = data_len + 4 + 2;
23 let pdu_len_bytes = pdu_len.to_be_bytes();
24
25 buffer[2] = pdu_len_bytes[0];
26 buffer[3] = pdu_len_bytes[1];
27 buffer[4] = pdu_len_bytes[2];
28 buffer[5] = pdu_len_bytes[3];
29
30 let pdv_data_len = data_len + 2;
32 let data_len_bytes = pdv_data_len.to_be_bytes();
33
34 buffer[6] = data_len_bytes[0];
35 buffer[7] = data_len_bytes[1];
36 buffer[8] = data_len_bytes[2];
37 buffer[9] = data_len_bytes[3];
38
39 buffer[11] = if is_last { 0x02 } else { 0x00 };
41}
42
43#[must_use]
87pub struct PDataWriter<W: Write> {
88 buffer: Vec<u8>,
89 stream: W,
90 max_data_len: u32,
91}
92
93impl<W> PDataWriter<W>
94where
95 W: Write,
96{
97 pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
101 let max_data_length = calculate_max_data_len_single(max_pdu_length);
102 let mut buffer =
103 Vec::with_capacity((max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
104 buffer.extend([
106 0x04,
108 0x00,
109 0xFF,
111 0xFF,
112 0xFF,
113 0xFF,
114 0xFF,
116 0xFF,
117 0xFF,
118 0xFF,
119 presentation_context_id,
121 0xFF,
123 ]);
124
125 PDataWriter {
126 stream,
127 max_data_len: max_data_length,
128 buffer,
129 }
130 }
131
132 pub fn finish(mut self) -> std::io::Result<()> {
137 self.finish_impl()?;
138 Ok(())
139 }
140
141 fn finish_impl(&mut self) -> std::io::Result<()> {
142 if !self.buffer.is_empty() {
143 setup_pdata_header(&mut self.buffer, true);
145 self.stream.write_all(&self.buffer[..])?;
146 self.buffer.clear();
149 }
150 Ok(())
151 }
152
153 fn dispatch_pdu(&mut self) -> std::io::Result<()> {
158 debug_assert!(self.buffer.len() >= PDU_PDV_HEADER_SIZE);
159 setup_pdata_header(&mut self.buffer, false);
161 self.stream.write_all(&self.buffer)?;
162
163 self.buffer.truncate(PDU_PDV_HEADER_SIZE);
165
166 Ok(())
167 }
168}
169
170impl<W> Write for PDataWriter<W>
171where
172 W: Write,
173{
174 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
175 let total_len = self.max_data_len as usize + PDU_PDV_HEADER_SIZE;
176 if self.buffer.len() + buf.len() <= total_len {
177 self.buffer.extend(buf);
179 Ok(buf.len())
180 } else {
181 let buf = &buf[..total_len - self.buffer.len()];
184 self.buffer.extend(buf);
185 debug_assert_eq!(self.buffer.len(), total_len);
186 self.dispatch_pdu()?;
187 Ok(buf.len())
188 }
189 }
190
191 fn flush(&mut self) -> std::io::Result<()> {
192 Ok(())
194 }
195}
196
197impl<W> Drop for PDataWriter<W>
202where
203 W: Write,
204{
205 fn drop(&mut self) {
206 let _ = self.finish_impl();
207 }
208}
209
210#[must_use]
245pub struct PDataReader<'a, R> {
246 buffer: VecDeque<u8>,
247 stream: R,
248 presentation_context_id: Option<u8>,
249 max_data_length: u32,
250 last_pdu: bool,
251 read_buffer: &'a mut BytesMut,
252}
253
254impl<'a, R> PDataReader<'a, R> {
255 pub fn new(stream: R, max_data_length: u32, remaining: &'a mut BytesMut) -> Self {
256 PDataReader {
257 buffer: VecDeque::with_capacity(max_data_length.min(LARGE_PDU_SIZE) as usize),
258 stream,
259 presentation_context_id: None,
260 max_data_length,
261 last_pdu: false,
262 read_buffer: remaining,
263 }
264 }
265
266 pub fn stop_receiving(&mut self) -> std::io::Result<()> {
272 self.last_pdu = true;
273 Ok(())
274 }
275}
276
277impl<R> Read for PDataReader<'_, R>
278where
279 R: Read,
280{
281 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282 if self.buffer.is_empty() {
283 if self.last_pdu {
284 return Ok(0);
286 }
287
288 let mut reader = BufReader::new(&mut self.stream);
289 let msg = loop {
290 let mut buf = Cursor::new(&self.read_buffer[..]);
291 match read_pdu(&mut buf, self.max_data_length, false)
292 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
293 {
294 Some(pdu) => {
295 self.read_buffer.advance(buf.position() as usize);
296 break pdu;
297 }
298 None => {
299 buf.set_position(0)
301 }
302 }
303 let recv = reader.fill_buf()?.to_vec();
304 reader.consume(recv.len());
305 self.read_buffer.extend_from_slice(&recv);
306 if recv.is_empty() {
307 return Err(std::io::Error::new(
308 std::io::ErrorKind::Other,
309 "Connection closed by peer",
310 ));
311 }
312 };
313
314 match msg {
315 Pdu::PData { data } => {
316 for pdata_value in data {
317 self.presentation_context_id = match self.presentation_context_id {
318 None => Some(pdata_value.presentation_context_id),
319 Some(cid) if cid == pdata_value.presentation_context_id => Some(cid),
320 Some(cid) => {
321 warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
322 Some(cid)
323 }
324 };
325 self.buffer.extend(pdata_value.data);
326 self.last_pdu = pdata_value.is_last;
327 }
328 }
329 _ => {
330 return Err(std::io::Error::new(
331 std::io::ErrorKind::UnexpectedEof,
332 "Unexpected PDU type",
333 ))
334 }
335 }
336 }
337 Read::read(&mut self.buffer, buf)
338 }
339}
340
341#[inline]
345fn calculate_max_data_len_single(pdu_len: u32) -> u32 {
346 pdu_len - PDV_HEADER_SIZE
347}
348
349#[cfg(feature = "async")]
350pub mod non_blocking {
351 use std::{
352 io::Cursor,
353 pin::Pin,
354 task::{ready, Context, Poll},
355 };
356
357 use bytes::{Buf, BufMut};
358 use tokio::io::{
359 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
360 };
361 use tracing::warn;
362
363 use crate::{
364 pdu::{PDU_HEADER_SIZE, PDV_HEADER_SIZE},
365 read_pdu, Pdu,
366 };
367
368 pub use super::PDataReader;
369 use super::{calculate_max_data_len_single, setup_pdata_header};
370
371 const PDU_PDV_HEADER_SIZE: usize = (PDU_HEADER_SIZE + PDV_HEADER_SIZE) as usize;
372
373 enum WriteState {
375 Ready,
377 Writing(usize),
379 }
380
381 #[must_use]
429 pub struct AsyncPDataWriter<W: AsyncWrite + Unpin> {
430 buffer: Vec<u8>,
431 stream: W,
432 max_data_len: u32,
433 state: WriteState,
434 }
435
436 #[cfg(feature = "async")]
437 impl<W> AsyncPDataWriter<W>
438 where
439 W: AsyncWrite + Unpin,
440 {
441 pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
445 use crate::pdu::LARGE_PDU_SIZE;
446
447 let max_data_length = calculate_max_data_len_single(max_pdu_length);
448 let mut buffer =
449 Vec::with_capacity((max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
450 buffer.extend([
452 0x04,
454 0x00,
455 0xFF,
457 0xFF,
458 0xFF,
459 0xFF,
460 0xFF,
462 0xFF,
463 0xFF,
464 0xFF,
465 presentation_context_id,
467 0xFF,
469 ]);
470
471 AsyncPDataWriter {
472 stream,
473 max_data_len: max_data_length,
474 buffer,
475 state: WriteState::Ready,
476 }
477 }
478
479 pub async fn finish(mut self) -> std::io::Result<()> {
484 self.finish_impl().await?;
485 Ok(())
486 }
487
488 async fn finish_impl(&mut self) -> std::io::Result<()> {
489 if !self.buffer.is_empty() {
490 setup_pdata_header(&mut self.buffer, true);
492 if let Err(e) = self.stream.write_all(&self.buffer[..]).await {
493 println!("Error: {e:?}");
494 }
495 self.buffer.clear();
498 }
499 Ok(())
500 }
501 }
502
503 #[cfg(feature = "async")]
504 impl<W> AsyncWrite for AsyncPDataWriter<W>
505 where
506 W: AsyncWrite + Unpin,
507 {
508 fn poll_write(
509 mut self: Pin<&mut Self>,
510 cx: &mut Context<'_>,
511 buf: &[u8],
512 ) -> Poll<std::result::Result<usize, std::io::Error>> {
513 match self.state {
517 WriteState::Ready => {
518 let total_len = self.max_data_len as usize + PDU_PDV_HEADER_SIZE;
520 if self.buffer.len() + buf.len() <= total_len {
521 self.buffer.extend(buf);
523 Poll::Ready(Ok(buf.len()))
524 } else {
525 let slice = &buf[..total_len - self.buffer.len()];
528 self.buffer.extend(slice);
529 debug_assert_eq!(self.buffer.len(), total_len);
530 setup_pdata_header(&mut self.buffer, false);
531 let this = self.get_mut();
532 match Pin::new(&mut this.stream).poll_write(cx, &this.buffer) {
534 Poll::Ready(Ok(n)) => {
535 if n == this.buffer.len() {
536 this.buffer.truncate(PDU_PDV_HEADER_SIZE);
538 Poll::Ready(Ok(slice.len()))
539 } else {
540 this.state = WriteState::Writing(n);
542 Poll::Pending
543 }
544 }
545 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
546 Poll::Pending => {
547 this.state = WriteState::Writing(0);
549 Poll::Pending
550 }
551 }
552 }
553 }
554 WriteState::Writing(pos) => {
555 let buflen = self.buffer.len();
557 let this = self.get_mut();
558 match Pin::new(&mut this.stream).poll_write(cx, &this.buffer[pos..]) {
559 Poll::Ready(Ok(n)) => {
560 if (n + pos) == this.buffer.len() {
561 this.buffer.truncate(PDU_PDV_HEADER_SIZE);
563 this.state = WriteState::Ready;
564 Poll::Ready(Ok(buflen - PDU_PDV_HEADER_SIZE))
565 } else {
566 this.state = WriteState::Writing(n + pos);
568 Poll::Pending
569 }
570 }
571 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
572 Poll::Pending => Poll::Pending,
573 }
574 }
575 }
576 }
577
578 fn poll_flush(
579 mut self: Pin<&mut Self>,
580 cx: &mut Context<'_>,
581 ) -> Poll<std::result::Result<(), std::io::Error>> {
582 Pin::new(&mut self.stream).poll_flush(cx)
583 }
584
585 fn poll_shutdown(
586 mut self: Pin<&mut Self>,
587 cx: &mut Context<'_>,
588 ) -> Poll<std::result::Result<(), std::io::Error>> {
589 Pin::new(&mut self.stream).poll_shutdown(cx)
590 }
591 }
592
593 impl<W> Drop for AsyncPDataWriter<W>
598 where
599 W: AsyncWrite + Unpin,
600 {
601 fn drop(&mut self) {
602 tokio::task::block_in_place(move || {
603 tokio::runtime::Handle::current().block_on(async move {
604 let _ = self.finish_impl().await;
605 })
606 })
607 }
608 }
609
610 impl<R> AsyncRead for PDataReader<'_, R>
611 where
612 R: AsyncRead + Unpin,
613 {
614 fn poll_read(
615 mut self: Pin<&mut Self>,
616 cx: &mut Context<'_>,
617 buf: &mut ReadBuf,
618 ) -> Poll<std::io::Result<()>> {
619 if self.buffer.is_empty() {
620 if self.last_pdu {
621 return Poll::Ready(Ok(()));
622 }
623 let Self {
624 ref mut stream,
625 ref mut read_buffer,
626 ref max_data_length,
627 ..
628 } = &mut *self;
629 let mut reader = BufReader::new(stream);
630 let msg = loop {
631 let mut buf = Cursor::new(&read_buffer[..]);
632 match read_pdu(&mut buf, *max_data_length + PDV_HEADER_SIZE, false)
633 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
634 {
635 Some(pdu) => {
636 read_buffer.advance(buf.position() as usize);
637 break pdu;
638 }
639 None => {
640 buf.set_position(0)
642 }
643 }
644 let recv = ready!(Pin::new(&mut reader).poll_fill_buf(cx))?.to_vec();
645 reader.consume(recv.len());
646 read_buffer.extend_from_slice(&recv);
647 if recv.is_empty() {
648 return Poll::Ready(Err(std::io::Error::new(
649 std::io::ErrorKind::Other,
650 "Connection closed by peer",
651 )));
652 }
653 };
654 match msg {
655 Pdu::PData { data } => {
656 for pdata_value in data {
657 self.presentation_context_id = match self.presentation_context_id {
658 None => Some(pdata_value.presentation_context_id),
659 Some(cid) if cid == pdata_value.presentation_context_id => {
660 Some(cid)
661 }
662 Some(cid) => {
663 warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
664 Some(cid)
665 }
666 };
667 self.buffer.extend(pdata_value.data);
668 self.last_pdu = pdata_value.is_last;
669 }
670 }
671 _ => {
672 return Poll::Ready(Err(std::io::Error::new(
673 std::io::ErrorKind::UnexpectedEof,
674 "Unexpected PDU type",
675 )))
676 }
677 }
678 }
679 let len = std::cmp::min(self.buffer.len(), buf.remaining());
680 for _ in 0..len {
681 buf.put_u8(self.buffer.pop_front().unwrap());
682 }
683 Poll::Ready(Ok(()))
684 }
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use std::io::{Read, Write};
691
692 use crate::association::pdata::PDataWriter;
693 use crate::pdu::{read_pdu, Pdu, MINIMUM_PDU_SIZE, PDV_HEADER_SIZE};
694 use crate::pdu::{PDataValue, PDataValueType};
695 use crate::write_pdu;
696
697 use super::PDataReader;
698
699 use bytes::BytesMut;
700 #[cfg(feature = "async")]
701 use tokio::io::AsyncWriteExt;
702
703 #[cfg(feature = "async")]
704 use crate::association::pdata::non_blocking::AsyncPDataWriter;
705
706 #[test]
707 fn test_write_pdata_and_finish() {
708 let presentation_context_id = 12;
709
710 let mut buf = Vec::new();
711 {
712 let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
713 writer.write_all(&(0..64).collect::<Vec<u8>>()).unwrap();
714 writer.finish().unwrap();
715 }
716
717 let mut cursor = &buf[..];
718 let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
719
720 match same_pdu {
723 Some(Pdu::PData { data: data_1 }) => {
724 let data_1 = &data_1[0];
725
726 assert_eq!(data_1.value_type, PDataValueType::Data);
728 assert_eq!(data_1.presentation_context_id, presentation_context_id);
729 assert_eq!(data_1.data.len(), 64);
730 assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
731 }
732 pdu => panic!("Expected PData, got {:?}", pdu),
733 }
734
735 assert_eq!(cursor.len(), 0);
736 }
737
738 #[cfg(feature = "async")]
739 #[tokio::test(flavor = "multi_thread")]
740 async fn test_async_write_pdata_and_finish() {
741 let presentation_context_id = 12;
742
743 let mut buf = Vec::new();
744 {
745 let mut writer =
746 AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
747 writer
748 .write_all(&(0..64).collect::<Vec<u8>>())
749 .await
750 .unwrap();
751 writer.finish().await.unwrap();
752 }
753
754 let mut cursor = &buf[..];
755 let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
756
757 match same_pdu {
760 Some(Pdu::PData { data: data_1 }) => {
761 let data_1 = &data_1[0];
762
763 assert_eq!(data_1.value_type, PDataValueType::Data);
765 assert_eq!(data_1.presentation_context_id, presentation_context_id);
766 assert_eq!(data_1.data.len(), 64);
767 assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
768 }
769 pdu => panic!("Expected PData, got {:?}", pdu),
770 }
771
772 assert_eq!(cursor.len(), 0);
773 }
774
775 #[test]
776 fn test_write_large_pdata_and_finish() {
777 let presentation_context_id = 32;
778
779 let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
780
781 let mut buf = Vec::new();
782 {
783 let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
784 writer.write_all(&my_data).unwrap();
785 writer.finish().unwrap();
786 }
787
788 let mut cursor = &buf[..];
789 let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
790 let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
791 let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
792
793 match (pdu_1, pdu_2, pdu_3) {
796 (
797 Some(Pdu::PData { data: data_1 }),
798 Some(Pdu::PData { data: data_2 }),
799 Some(Pdu::PData { data: data_3 }),
800 ) => {
801 assert_eq!(data_1.len(), 1);
802 let data_1 = &data_1[0];
803 assert_eq!(data_2.len(), 1);
804 let data_2 = &data_2[0];
805 assert_eq!(data_3.len(), 1);
806 let data_3 = &data_3[0];
807
808 assert_eq!(data_1.value_type, PDataValueType::Data);
810 assert_eq!(data_2.value_type, PDataValueType::Data);
811 assert_eq!(data_1.presentation_context_id, presentation_context_id);
812 assert_eq!(data_2.presentation_context_id, presentation_context_id);
813
814 assert_eq!(
816 data_1.data.len(),
817 (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
818 );
819 assert_eq!(
820 data_2.data.len(),
821 (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
822 );
823 assert_eq!(
824 data_3.data.len(),
825 2500 - (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize * 2
826 );
827
828 assert_eq!(
830 &data_1.data[..],
831 (0..MINIMUM_PDU_SIZE - PDV_HEADER_SIZE)
832 .map(|x| x as u8)
833 .collect::<Vec<_>>()
834 );
835 assert_eq!(
836 data_1.data.len() + data_2.data.len() + data_3.data.len(),
837 2500
838 );
839
840 let data_1 = &data_1.data;
841 let data_2 = &data_2.data;
842 let data_3 = &data_3.data;
843
844 let mut all_data: Vec<u8> = Vec::new();
845 all_data.extend(data_1);
846 all_data.extend(data_2);
847 all_data.extend(data_3);
848 assert_eq!(all_data, my_data);
849 }
850 x => panic!("Expected 3 PDatas, got {:?}", x),
851 }
852
853 assert_eq!(cursor.len(), 0);
854 }
855
856 #[cfg(feature = "async")]
857 #[tokio::test(flavor = "multi_thread")]
858 async fn test_async_write_large_pdata_and_finish() {
859 let presentation_context_id = 32;
860
861 let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
862
863 let mut buf = Vec::new();
864 {
865 let mut writer =
866 AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
867 writer.write_all(&my_data).await.unwrap();
868 writer.finish().await.unwrap();
869 }
870
871 let mut cursor = &buf[..];
872 let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
873 let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
874 let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
875
876 match (pdu_1, pdu_2, pdu_3) {
879 (
880 Some(Pdu::PData { data: data_1 }),
881 Some(Pdu::PData { data: data_2 }),
882 Some(Pdu::PData { data: data_3 }),
883 ) => {
884 assert_eq!(data_1.len(), 1);
885 let data_1 = &data_1[0];
886 assert_eq!(data_2.len(), 1);
887 let data_2 = &data_2[0];
888 assert_eq!(data_3.len(), 1);
889 let data_3 = &data_3[0];
890
891 assert_eq!(data_1.value_type, PDataValueType::Data);
893 assert_eq!(data_2.value_type, PDataValueType::Data);
894 assert_eq!(data_1.presentation_context_id, presentation_context_id);
895 assert_eq!(data_2.presentation_context_id, presentation_context_id);
896
897 assert_eq!(
899 data_1.data.len(),
900 (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
901 );
902 assert_eq!(
903 data_2.data.len(),
904 (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
905 );
906 assert_eq!(
907 data_3.data.len(),
908 2500 - (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize * 2
909 );
910
911 assert_eq!(
913 &data_1.data[..],
914 (0..MINIMUM_PDU_SIZE - PDV_HEADER_SIZE)
915 .map(|x| x as u8)
916 .collect::<Vec<_>>()
917 );
918 assert_eq!(
919 data_1.data.len() + data_2.data.len() + data_3.data.len(),
920 2500
921 );
922
923 let data_1 = &data_1.data;
924 let data_2 = &data_2.data;
925 let data_3 = &data_3.data;
926
927 let mut all_data: Vec<u8> = Vec::new();
928 all_data.extend(data_1);
929 all_data.extend(data_2);
930 all_data.extend(data_3);
931 assert_eq!(all_data, my_data);
932 }
933 x => panic!("Expected 3 PDatas, got {:?}", x),
934 }
935
936 assert_eq!(cursor.len(), 0);
937 }
938
939 #[test]
940 fn test_read_large_pdata_and_finish() {
941 use std::collections::VecDeque;
942 let presentation_context_id = 32;
943
944 let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
945 let pdata_1 = vec![PDataValue {
946 value_type: PDataValueType::Data,
947 data: my_data[0..1018].to_owned(),
948 presentation_context_id,
949 is_last: false,
950 }];
951 let pdata_2 = vec![PDataValue {
952 value_type: PDataValueType::Data,
953 data: my_data[1018..2036].to_owned(),
954 presentation_context_id,
955 is_last: false,
956 }];
957 let pdata_3 = vec![PDataValue {
958 value_type: PDataValueType::Data,
959 data: my_data[2036..].to_owned(),
960 presentation_context_id,
961 is_last: true,
962 }];
963
964 let mut pdu_stream = VecDeque::new();
965
966 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
968 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
969 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
970
971 let mut buf = Vec::new();
972 {
973 let mut read_buf = BytesMut::new();
974 let mut reader = PDataReader::new(&mut pdu_stream, MINIMUM_PDU_SIZE, &mut read_buf);
975 reader.read_to_end(&mut buf).unwrap();
976 }
977 assert_eq!(buf, my_data);
978 }
979
980 #[cfg(feature = "async")]
981 #[tokio::test]
982 async fn test_async_read_large_pdata_and_finish() {
983 use tokio::io::AsyncReadExt;
984
985 let presentation_context_id = 32;
986
987 let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
988 let pdata_1 = vec![PDataValue {
989 value_type: PDataValueType::Data,
990 data: my_data[0..3000].to_owned(),
991 presentation_context_id,
992 is_last: false,
993 }];
994 let pdata_2 = vec![PDataValue {
995 value_type: PDataValueType::Data,
996 data: my_data[3000..6000].to_owned(),
997 presentation_context_id,
998 is_last: false,
999 }];
1000 let pdata_3 = vec![PDataValue {
1001 value_type: PDataValueType::Data,
1002 data: my_data[6000..].to_owned(),
1003 presentation_context_id,
1004 is_last: true,
1005 }];
1006
1007 let mut pdu_stream = std::io::Cursor::new(Vec::new());
1008
1009 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
1011 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
1012 write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
1013
1014 let mut buf = Vec::new();
1015 let inner = pdu_stream.into_inner();
1016 let mut stream = tokio::io::BufReader::new(inner.as_slice());
1017 {
1018 let mut read_buf = BytesMut::new();
1019 let mut reader = PDataReader::new(&mut stream, MINIMUM_PDU_SIZE, &mut read_buf);
1020 reader.read_to_end(&mut buf).await.unwrap();
1021 }
1022 assert_eq!(buf, my_data);
1023 }
1024}