dicom_ul/association/
pdata.rs

1use std::{
2    collections::VecDeque,
3    io::{BufRead, BufReader, Cursor, Read, Write},
4};
5
6use bytes::{Buf, BytesMut};
7use tracing::warn;
8
9use crate::{
10    pdu::{LARGE_PDU_SIZE, PDU_HEADER_SIZE},
11    read_pdu, Pdu,
12};
13
/// Patch the P-Data PDU header in place before sending.
///
/// `buffer` holds a complete P-Data-tf PDU:
/// a 12-byte header followed by the PDV data bytes.
/// This fills in the PDU length, the PDV item length,
/// and the message control header (`is_last` marks the final fragment).
fn setup_pdata_header(buffer: &mut [u8], is_last: bool) {
    let data_len = (buffer.len() - 12) as u32;

    // full PDU length (excludes PDU type and reserved byte):
    // PDV length field (4) + context id + control header (2) + data
    let pdu_len = data_len + 4 + 2;
    buffer[2..6].copy_from_slice(&pdu_len.to_be_bytes());

    // PDV item length: context id + control header (2) + data
    let pdv_len = data_len + 2;
    buffer[6..10].copy_from_slice(&pdv_len.to_be_bytes());

    // message control header: bit 1 set on the last fragment
    buffer[11] = if is_last { 0x02 } else { 0x00 };
}
39
/// A P-Data value writer.
///
/// This exposes an API to iteratively construct and send Data messages
/// to another node.
/// Using this as a [standard writer](std::io::Write)
/// will automatically split the incoming bytes
/// into separate PDUs if they do not fit in a single one.
///
/// # Example
///
/// Use an association's `send_pdata` method
/// to create a new P-Data value writer.
///
/// ```no_run
/// # use std::io::Write;
/// # use dicom_ul::association::ClientAssociationOptions;
/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
/// # fn command_data() -> Vec<u8> { unimplemented!() }
/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut association = ClientAssociationOptions::new()
///    .establish("129.168.0.5:104")?;
///
/// let presentation_context_id = association.presentation_contexts()[0].id;
///
/// // send a command first
/// association.send(&Pdu::PData {
///     data: vec![PDataValue {
///         presentation_context_id,
///         value_type: PDataValueType::Command,
///         is_last: true,
///         data: command_data(),
///     }],
/// });
///
/// // then send a DICOM object which may be split into multiple PDUs
/// let mut pdata = association.send_pdata(presentation_context_id);
/// pdata.write_all(dicom_data())?;
/// pdata.finish()?;
///
/// let pdu_ac = association.receive()?;
/// # Ok(())
/// # }
/// ```
#[must_use]
pub struct PDataWriter<W: Write> {
    /// outgoing PDU buffer, always starting with the 12-byte
    /// P-Data-tf header template set up in `new`
    buffer: Vec<u8>,
    /// the sink where complete PDUs are written
    stream: W,
    /// maximum number of PDV data bytes per PDU
    /// (PDU length minus PDV length field and control bytes)
    max_data_len: u32,
}
89
90impl<W> PDataWriter<W>
91where
92    W: Write,
93{
94    /// Construct a new P-Data value writer.
95    ///
96    /// `max_pdu_length` is the maximum value of the PDU-length property.
97    pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
98        let max_data_length = calculate_max_data_len_single(max_pdu_length);
99        let mut buffer =
100            Vec::with_capacity((max_data_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
101        // initial buffer set up
102        buffer.extend([
103            // PDU-type + reserved byte
104            0x04,
105            0x00,
106            // full PDU length, unknown at this point
107            0xFF,
108            0xFF,
109            0xFF,
110            0xFF,
111            // presentation data length, unknown at this point
112            0xFF,
113            0xFF,
114            0xFF,
115            0xFF,
116            // presentation context id
117            presentation_context_id,
118            // message control header, unknown at this point
119            0xFF,
120        ]);
121
122        PDataWriter {
123            stream,
124            max_data_len: max_data_length,
125            buffer,
126        }
127    }
128
129    /// Declare to have finished sending P-Data fragments,
130    /// thus emitting the last P-Data fragment PDU.
131    ///
132    /// This is also done automatically once the P-Data writer is dropped.
133    pub fn finish(mut self) -> std::io::Result<()> {
134        self.finish_impl()?;
135        Ok(())
136    }
137
138    fn finish_impl(&mut self) -> std::io::Result<()> {
139        if !self.buffer.is_empty() {
140            // send last PDU
141            setup_pdata_header(&mut self.buffer, true);
142            self.stream.write_all(&self.buffer[..])?;
143            // clear buffer so that subsequent calls to `finish_impl`
144            // do not send any more PDUs
145            self.buffer.clear();
146        }
147        Ok(())
148    }
149
150    /// Use the current state of the buffer to send new PDUs
151    ///
152    /// Pre-condition:
153    /// buffer must have enough data for one P-Data-tf PDU
154    fn dispatch_pdu(&mut self) -> std::io::Result<()> {
155        debug_assert!(self.buffer.len() >= 12);
156        // send PDU now
157        setup_pdata_header(&mut self.buffer, false);
158        self.stream.write_all(&self.buffer)?;
159
160        // back to just the header
161        self.buffer.truncate(12);
162
163        Ok(())
164    }
165}
166
167impl<W> Write for PDataWriter<W>
168where
169    W: Write,
170{
171    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
172        let total_len = self.max_data_len as usize + 12;
173        if self.buffer.len() + buf.len() <= total_len {
174            // accumulate into buffer, do nothing
175            self.buffer.extend(buf);
176            Ok(buf.len())
177        } else {
178            // fill in the rest of the buffer, send PDU,
179            // and leave out the rest for subsequent writes
180            let buf = &buf[..total_len - self.buffer.len()];
181            self.buffer.extend(buf);
182            debug_assert_eq!(self.buffer.len(), total_len);
183            self.dispatch_pdu()?;
184            Ok(buf.len())
185        }
186    }
187
188    fn flush(&mut self) -> std::io::Result<()> {
189        // do nothing
190        Ok(())
191    }
192}
193
/// With the P-Data writer dropped,
/// this `Drop` implementation
/// will construct and emit the last P-Data fragment PDU
/// if there is any data left to send.
impl<W> Drop for PDataWriter<W>
where
    W: Write,
{
    fn drop(&mut self) {
        // best effort: errors cannot be reported from `drop`,
        // so the result is deliberately discarded;
        // call `finish()` explicitly to observe I/O errors
        let _ = self.finish_impl();
    }
}
206
207/// A P-Data value reader.
208///
209/// This exposes an API which provides a byte stream of data
210/// by iteratively collecting Data messages from another node.
211/// Using this as a [standard reader](std::io::Read)
212/// will provide all incoming bytes,
213/// even if they reside in separate PDUs,
214/// until the last message is received.
215///
216/// # Example
217///
218/// Use an association's `receive_pdata` method
219/// to create a new P-Data value reader.
220///
221/// ```no_run
222/// # use std::io::Read;
223/// # use dicom_ul::association::ClientAssociationOptions;
224/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
225/// # fn command_data() -> Vec<u8> { unimplemented!() }
226/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
227/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
228/// # let mut association = ClientAssociationOptions::new()
229/// #    .establish("129.168.0.5:104")?;
230///
231/// // expecting a DICOM object which may be split into multiple PDUs,
232/// let mut pdata = association.receive_pdata();
233/// let all_pdata_bytes = {
234///     let mut v = Vec::new();
235///     pdata.read_to_end(&mut v)?;
236///     v
237/// };
238/// # Ok(())
239/// # }
240/// ```
#[must_use]
pub struct PDataReader<'a, R> {
    /// PDV data already received but not yet handed out to the reader
    buffer: VecDeque<u8>,
    /// the source of incoming PDUs
    stream: R,
    /// presentation context ID seen on the first PDV;
    /// PDVs with a different ID only trigger a warning
    presentation_context_id: Option<u8>,
    /// maximum PDU length passed down to `read_pdu`
    max_data_length: u32,
    /// whether a PDV flagged `is_last` has been received
    /// (or `stop_receiving` was called)
    last_pdu: bool,
    /// raw bytes read from the stream but not yet parsed into a PDU
    read_buffer: &'a mut BytesMut,
}
250
251impl<'a, R> PDataReader<'a, R> {
252    pub fn new(stream: R, max_data_length: u32, remaining: &'a mut BytesMut) -> Self {
253        PDataReader {
254            buffer: VecDeque::with_capacity(max_data_length.min(LARGE_PDU_SIZE) as usize),
255            stream,
256            presentation_context_id: None,
257            max_data_length,
258            last_pdu: false,
259            read_buffer: remaining,
260        }
261    }
262
263    /// Declare no intention to read more PDUs from the remote node.
264    ///
265    /// Attempting to read more bytes
266    /// will only consume the inner buffer and not result in
267    /// more PDUs being received.
268    pub fn stop_receiving(&mut self) -> std::io::Result<()> {
269        self.last_pdu = true;
270        Ok(())
271    }
272}
273
impl<R> Read for PDataReader<'_, R>
where
    R: Read,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if self.buffer.is_empty() {
            if self.last_pdu {
                // reached the end of PData stream
                return Ok(0);
            }

            let mut reader = BufReader::new(&mut self.stream);
            // accumulate bytes from the stream until a full PDU can be parsed
            let msg = loop {
                // try to parse a PDU from the bytes gathered so far
                let mut buf = Cursor::new(&self.read_buffer[..]);
                match read_pdu(&mut buf, self.max_data_length, false)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                {
                    Some(pdu) => {
                        // discard the bytes consumed by the parser
                        self.read_buffer.advance(buf.position() as usize);
                        break pdu;
                    }
                    None => {
                        // not enough bytes for a whole PDU yet:
                        // reset position and fetch more below
                        buf.set_position(0)
                    }
                }
                let recv = reader.fill_buf()?.to_vec();
                reader.consume(recv.len());
                self.read_buffer.extend_from_slice(&recv);
                if recv.is_empty() {
                    // EOF before a complete PDU could be assembled
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "Connection closed by peer",
                    ));
                }
            };

            match msg {
                Pdu::PData { data } => {
                    for pdata_value in data {
                        // remember the first presentation context ID seen;
                        // mismatches are tolerated but logged
                        self.presentation_context_id = match self.presentation_context_id {
                            None => Some(pdata_value.presentation_context_id),
                            Some(cid) if cid == pdata_value.presentation_context_id => Some(cid),
                            Some(cid) => {
                                warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
                                Some(cid)
                            }
                        };
                        self.buffer.extend(pdata_value.data);
                        // the last PDV in the PDU decides whether the stream ends
                        self.last_pdu = pdata_value.is_last;
                    }
                }
                _ => {
                    // only P-Data PDUs are meaningful in this state
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "Unexpected PDU type",
                    ))
                }
            }
        }
        // serve the caller from the internal byte queue
        Read::read(&mut self.buffer, buf)
    }
}
337
/// Determine the maximum length of actual PDV data
/// when encapsulated in a PDU with the given length property.
/// Does not account for the first 2 bytes (type + reserved).
#[inline]
fn calculate_max_data_len_single(pdu_len: u32) -> u32 {
    // subtract the PDV length field (4 bytes)
    // and the context id + message control header (2 bytes)
    pdu_len - (4 + 2)
}
347
348#[cfg(feature = "async")]
349pub mod non_blocking {
350    use std::{
351        io::Cursor,
352        pin::Pin,
353        task::{ready, Context, Poll},
354    };
355
356    use bytes::{Buf, BufMut};
357    use tokio::io::{
358        AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
359    };
360    use tracing::warn;
361
362    use crate::{pdu::PDU_HEADER_SIZE, read_pdu, Pdu};
363
364    pub use super::PDataReader;
365    use super::{calculate_max_data_len_single, setup_pdata_header};
366
    /// Enum representing state of the Async Writer
    enum WriteState {
        // Ready to write to the underlying stream
        Ready,
        // Currently writing to underlying stream; the payload is the
        // number of bytes of `buffer` already written to the stream
        Writing(usize),
    }
374
375    /// A P-Data async value writer.
376    ///
377    /// This exposes an API to iteratively construct and send Data messages
378    /// to another node.
379    /// Using this as a [standard writer](std::io::Write)
380    /// will automatically split the incoming bytes
381    /// into separate PDUs if they do not fit in a single one.
382    ///
383    /// # Example
384    ///
385    /// Use an association's `send_pdata` method
386    /// to create a new P-Data value writer.
387    ///
388    /// ```no_run
389    /// # use std::io::Write;
390    /// use tokio::io::AsyncWriteExt;
391    /// # use dicom_ul::association::ClientAssociationOptions;
392    /// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
393    /// # fn command_data() -> Vec<u8> { unimplemented!() }
394    /// # fn dicom_data() -> &'static [u8] { unimplemented!() }
395    /// #[tokio::main]
396    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
397    /// let mut association = ClientAssociationOptions::new()
398    ///    .establish_async("129.168.0.5:104")
399    ///    .await?;
400    ///
401    /// let presentation_context_id = association.presentation_contexts()[0].id;
402    ///
403    /// // send a command first
404    /// association.send(&Pdu::PData {
405    ///     data: vec![PDataValue {
406    ///     presentation_context_id,
407    ///     value_type: PDataValueType::Command,
408    ///         is_last: true,
409    ///         data: command_data(),
410    ///     }],
411    /// }).await;
412    ///
413    /// // then send a DICOM object which may be split into multiple PDUs
414    /// let mut pdata = association.send_pdata(presentation_context_id).await;
415    /// pdata.write_all(dicom_data()).await?;
416    /// pdata.finish().await?;
417    ///
418    /// let pdu_ac = association.receive().await?;
419    /// # Ok(())
420    /// # }
421    /// ```
    #[must_use]
    pub struct AsyncPDataWriter<W: AsyncWrite + Unpin> {
        /// outgoing PDU buffer, always starting with the 12-byte
        /// P-Data-tf header template set up in `new`
        buffer: Vec<u8>,
        /// the sink where complete PDUs are written
        stream: W,
        /// maximum number of PDV data bytes per PDU
        max_data_len: u32,
        /// write progress tracker for partial `poll_write` completions
        state: WriteState,
    }
429
430    #[cfg(feature = "async")]
431    impl<W> AsyncPDataWriter<W>
432    where
433        W: AsyncWrite + Unpin,
434    {
435        /// Construct a new P-Data value writer.
436        ///
437        /// `max_pdu_length` is the maximum value of the PDU-length property.
438        pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
439            use crate::pdu::LARGE_PDU_SIZE;
440
441            let max_data_length = calculate_max_data_len_single(max_pdu_length);
442            let mut buffer = Vec::with_capacity(
443                (max_data_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
444            );
445            // initial buffer set up
446            buffer.extend([
447                // PDU-type + reserved byte
448                0x04,
449                0x00,
450                // full PDU length, unknown at this point
451                0xFF,
452                0xFF,
453                0xFF,
454                0xFF,
455                // presentation data length, unknown at this point
456                0xFF,
457                0xFF,
458                0xFF,
459                0xFF,
460                // presentation context id
461                presentation_context_id,
462                // message control header, unknown at this point
463                0xFF,
464            ]);
465
466            AsyncPDataWriter {
467                stream,
468                max_data_len: max_data_length,
469                buffer,
470                state: WriteState::Ready,
471            }
472        }
473
474        /// Declare to have finished sending P-Data fragments,
475        /// thus emitting the last P-Data fragment PDU.
476        ///
477        /// This is also done automatically once the P-Data writer is dropped.
478        pub async fn finish(mut self) -> std::io::Result<()> {
479            self.finish_impl().await?;
480            Ok(())
481        }
482
483        async fn finish_impl(&mut self) -> std::io::Result<()> {
484            if !self.buffer.is_empty() {
485                // send last PDU
486                setup_pdata_header(&mut self.buffer, true);
487                if let Err(e) = self.stream.write_all(&self.buffer[..]).await {
488                    println!("Error: {e:?}");
489                }
490                // clear buffer so that subsequent calls to `finish_impl`
491                // do not send any more PDUs
492                self.buffer.clear();
493            }
494            Ok(())
495        }
496    }
497
    #[cfg(feature = "async")]
    impl<W> AsyncWrite for AsyncPDataWriter<W>
    where
        W: AsyncWrite + Unpin,
    {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::result::Result<usize, std::io::Error>> {
            // Each call to `poll_write` on the underlying stream may or may not
            // write the whole of `self.buffer`, therefore we need to keep track
            // of how much we've written, this is done in `self.state`
            match self.state {
                WriteState::Ready => {
                    // If we're in ready state, we can prepare another PDU
                    // (12-byte header + up to `max_data_len` data bytes)
                    let total_len = self.max_data_len as usize + 12;
                    if self.buffer.len() + buf.len() <= total_len {
                        // Still have space in `self.buffer`, accumulate into buffer
                        self.buffer.extend(buf);
                        Poll::Ready(Ok(buf.len()))
                    } else {
                        // `self.buffer` is full, fill in the rest of the
                        // buffer, prepare to send PDU
                        let slice = &buf[..total_len - self.buffer.len()];
                        self.buffer.extend(slice);
                        debug_assert_eq!(self.buffer.len(), total_len);
                        setup_pdata_header(&mut self.buffer, false);
                        let this = self.get_mut();
                        // Attempt to send PDU on wire
                        match Pin::new(&mut this.stream).poll_write(cx, &this.buffer) {
                            Poll::Ready(Ok(n)) => {
                                if n == this.buffer.len() {
                                    // If we wrote the whole buffer, reset `self.buffer`
                                    // back to the 12-byte header template
                                    this.buffer.truncate(12);
                                    Poll::Ready(Ok(slice.len()))
                                } else {
                                    // Otherwise keep track of how much we wrote and change state to Writing
                                    // NOTE(review): `Pending` is returned here after a
                                    // partial *successful* write, without arranging a
                                    // wake-up (e.g. `cx.waker().wake_by_ref()`); verify
                                    // the task cannot stall waiting for a waker that
                                    // was never registered.
                                    this.state = WriteState::Writing(n);
                                    Poll::Pending
                                }
                            }
                            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                            Poll::Pending => {
                                // Nothing was written yet, change state to writing at position 0
                                // (the underlying stream has registered the waker)
                                this.state = WriteState::Writing(0);
                                Poll::Pending
                            }
                        }
                    }
                }
                WriteState::Writing(pos) => {
                    // Continue writing to stream from current position
                    let buflen = self.buffer.len();
                    let this = self.get_mut();
                    match Pin::new(&mut this.stream).poll_write(cx, &this.buffer[pos..]) {
                        Poll::Ready(Ok(n)) => {
                            if (n + pos) == this.buffer.len() {
                                // If we wrote the whole buffer, reset `self.buffer` and change state back to ready
                                this.buffer.truncate(12);
                                this.state = WriteState::Ready;
                                // NOTE(review): this reports `buflen - 12` bytes
                                // consumed, assuming the caller re-submitted the same
                                // `buf` after the earlier `Pending`; confirm this
                                // matches the bytes actually taken from `buf` when
                                // the buffer was partially pre-filled beforehand.
                                Poll::Ready(Ok(buflen - 12))
                            } else {
                                // Otherwise add to current position
                                // NOTE(review): same missing-wake-up concern as above.
                                this.state = WriteState::Writing(n + pos);
                                Poll::Pending
                            }
                        }
                        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                        Poll::Pending => Poll::Pending,
                    }
                }
            }
        }

        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), std::io::Error>> {
            // delegate to the underlying stream
            Pin::new(&mut self.stream).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), std::io::Error>> {
            // delegate to the underlying stream
            Pin::new(&mut self.stream).poll_shutdown(cx)
        }
    }
587
588    /// With the P-Data writer dropped,
589    /// this `Drop` implementation
590    /// will construct and emit the last P-Data fragment PDU
591    /// if there is any data left to send.
592    impl<W> Drop for AsyncPDataWriter<W>
593    where
594        W: AsyncWrite + Unpin,
595    {
596        fn drop(&mut self) {
597            tokio::task::block_in_place(move || {
598                tokio::runtime::Handle::current().block_on(async move {
599                    let _ = self.finish_impl().await;
600                })
601            })
602        }
603    }
604
    impl<R> AsyncRead for PDataReader<'_, R>
    where
        R: AsyncRead + Unpin,
    {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf,
        ) -> Poll<std::io::Result<()>> {
            if self.buffer.is_empty() {
                if self.last_pdu {
                    // end of the P-Data stream:
                    // report EOF by leaving `buf` untouched
                    return Poll::Ready(Ok(()));
                }
                // split the borrows so the stream and the raw byte buffer
                // can be used independently below
                let Self {
                    ref mut stream,
                    ref mut read_buffer,
                    ref max_data_length,
                    ..
                } = &mut *self;
                let mut reader = BufReader::new(stream);
                // accumulate bytes until a full PDU can be parsed
                let msg = loop {
                    let mut buf = Cursor::new(&read_buffer[..]);
                    match read_pdu(&mut buf, *max_data_length, false)
                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                    {
                        Some(pdu) => {
                            // discard the bytes consumed by the parser
                            read_buffer.advance(buf.position() as usize);
                            break pdu;
                        }
                        None => {
                            // Reset position; not enough bytes yet,
                            // fetch more from the stream below
                            buf.set_position(0)
                        }
                    }
                    let recv = ready!(Pin::new(&mut reader).poll_fill_buf(cx))?.to_vec();
                    reader.consume(recv.len());
                    read_buffer.extend_from_slice(&recv);
                    if recv.is_empty() {
                        // EOF before a complete PDU could be assembled
                        return Poll::Ready(Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Connection closed by peer",
                        )));
                    }
                };
                match msg {
                    Pdu::PData { data } => {
                        for pdata_value in data {
                            // remember the first presentation context ID seen;
                            // mismatches are tolerated but logged
                            self.presentation_context_id = match self.presentation_context_id {
                                None => Some(pdata_value.presentation_context_id),
                                Some(cid) if cid == pdata_value.presentation_context_id => {
                                    Some(cid)
                                }
                                Some(cid) => {
                                    warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
                                    Some(cid)
                                }
                            };
                            self.buffer.extend(pdata_value.data);
                            // the last PDV decides whether the stream ends
                            self.last_pdu = pdata_value.is_last;
                        }
                    }
                    _ => {
                        // only P-Data PDUs are meaningful in this state
                        return Poll::Ready(Err(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            "Unexpected PDU type",
                        )))
                    }
                }
            }
            // serve the caller from the internal byte queue
            let len = std::cmp::min(self.buffer.len(), buf.remaining());
            for _ in 0..len {
                buf.put_u8(self.buffer.pop_front().unwrap());
            }
            Poll::Ready(Ok(()))
        }
    }
681}
682
683#[cfg(test)]
684mod tests {
685    use std::io::{Read, Write};
686
687    use crate::association::pdata::PDataWriter;
688    use crate::pdu::{read_pdu, Pdu, MINIMUM_PDU_SIZE, PDU_HEADER_SIZE};
689    use crate::pdu::{PDataValue, PDataValueType};
690    use crate::write_pdu;
691
692    use super::PDataReader;
693
694    use bytes::BytesMut;
695    #[cfg(feature = "async")]
696    use tokio::io::AsyncWriteExt;
697
698    #[cfg(feature = "async")]
699    use crate::association::pdata::non_blocking::AsyncPDataWriter;
700
    #[test]
    fn test_write_pdata_and_finish() {
        let presentation_context_id = 12;

        let mut buf = Vec::new();
        {
            // 64 bytes fit in a single PDU of MINIMUM_PDU_SIZE,
            // so exactly one P-Data PDU is expected
            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
            writer.write_all(&(0..64).collect::<Vec<u8>>()).unwrap();
            writer.finish().unwrap();
        }

        let mut cursor = &buf[..];
        let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();

        // concatenate data chunks, compare with all data

        match same_pdu {
            Some(Pdu::PData { data: data_1 }) => {
                let data_1 = &data_1[0];

                // check that this PDU is consistent
                assert_eq!(data_1.value_type, PDataValueType::Data);
                assert_eq!(data_1.presentation_context_id, presentation_context_id);
                assert_eq!(data_1.data.len(), 64);
                assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
            }
            pdu => panic!("Expected PData, got {:?}", pdu),
        }

        // the writer produced no trailing bytes
        assert_eq!(cursor.len(), 0);
    }
732
    #[cfg(feature = "async")]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_async_write_pdata_and_finish() {
        let presentation_context_id = 12;

        let mut buf = Vec::new();
        {
            // async counterpart of `test_write_pdata_and_finish`:
            // 64 bytes fit in a single PDU of MINIMUM_PDU_SIZE
            let mut writer =
                AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
            writer
                .write_all(&(0..64).collect::<Vec<u8>>())
                .await
                .unwrap();
            writer.finish().await.unwrap();
        }

        let mut cursor = &buf[..];
        let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();

        // concatenate data chunks, compare with all data

        match same_pdu {
            Some(Pdu::PData { data: data_1 }) => {
                let data_1 = &data_1[0];

                // check that this PDU is consistent
                assert_eq!(data_1.value_type, PDataValueType::Data);
                assert_eq!(data_1.presentation_context_id, presentation_context_id);
                assert_eq!(data_1.data.len(), 64);
                assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
            }
            pdu => panic!("Expected PData, got {:?}", pdu),
        }

        // the writer produced no trailing bytes
        assert_eq!(cursor.len(), 0);
    }
769
    #[test]
    fn test_write_large_pdata_and_finish() {
        let presentation_context_id = 32;

        // 9000 bytes do not fit in a single PDU of MINIMUM_PDU_SIZE,
        // so the writer is expected to split them into three PDUs
        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();

        let mut buf = Vec::new();
        {
            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
            writer.write_all(&my_data).unwrap();
            writer.finish().unwrap();
        }

        let mut cursor = &buf[..];
        let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
        let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
        let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();

        // concatenate data chunks, compare with all data

        match (pdu_1, pdu_2, pdu_3) {
            (
                Some(Pdu::PData { data: data_1 }),
                Some(Pdu::PData { data: data_2 }),
                Some(Pdu::PData { data: data_3 }),
            ) => {
                // each PDU carries exactly one PDV
                assert_eq!(data_1.len(), 1);
                let data_1 = &data_1[0];
                assert_eq!(data_2.len(), 1);
                let data_2 = &data_2[0];
                assert_eq!(data_3.len(), 1);
                let data_3 = &data_3[0];

                // check that these two PDUs are consistent
                assert_eq!(data_1.value_type, PDataValueType::Data);
                assert_eq!(data_2.value_type, PDataValueType::Data);
                assert_eq!(data_1.presentation_context_id, presentation_context_id);
                assert_eq!(data_2.presentation_context_id, presentation_context_id);

                // check expected lengths:
                // the first two PDUs are filled to capacity,
                // the third holds the remainder
                assert_eq!(
                    data_1.data.len(),
                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
                );
                assert_eq!(
                    data_2.data.len(),
                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
                );
                assert_eq!(data_3.data.len(), 820);

                // check data consistency
                assert_eq!(
                    &data_1.data[..],
                    (0..MINIMUM_PDU_SIZE - PDU_HEADER_SIZE)
                        .map(|x| x as u8)
                        .collect::<Vec<_>>()
                );
                assert_eq!(
                    data_1.data.len() + data_2.data.len() + data_3.data.len(),
                    9000
                );

                let data_1 = &data_1.data;
                let data_2 = &data_2.data;
                let data_3 = &data_3.data;

                let mut all_data: Vec<u8> = Vec::new();
                all_data.extend(data_1);
                all_data.extend(data_2);
                all_data.extend(data_3);
                assert_eq!(all_data, my_data);
            }
            x => panic!("Expected 3 PDatas, got {:?}", x),
        }

        assert_eq!(cursor.len(), 0);
    }
847
848    #[cfg(feature = "async")]
849    #[tokio::test(flavor = "multi_thread")]
850    async fn test_async_write_large_pdata_and_finish() {
851        let presentation_context_id = 32;
852
853        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
854
855        let mut buf = Vec::new();
856        {
857            let mut writer =
858                AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
859            writer.write_all(&my_data).await.unwrap();
860            writer.finish().await.unwrap();
861        }
862
863        let mut cursor = &buf[..];
864        let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
865        let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
866        let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
867
868        // concatenate data chunks, compare with all data
869
870        match (pdu_1, pdu_2, pdu_3) {
871            (
872                Some(Pdu::PData { data: data_1 }),
873                Some(Pdu::PData { data: data_2 }),
874                Some(Pdu::PData { data: data_3 }),
875            ) => {
876                assert_eq!(data_1.len(), 1);
877                let data_1 = &data_1[0];
878                assert_eq!(data_2.len(), 1);
879                let data_2 = &data_2[0];
880                assert_eq!(data_3.len(), 1);
881                let data_3 = &data_3[0];
882
883                // check that these two PDUs are consistent
884                assert_eq!(data_1.value_type, PDataValueType::Data);
885                assert_eq!(data_2.value_type, PDataValueType::Data);
886                assert_eq!(data_1.presentation_context_id, presentation_context_id);
887                assert_eq!(data_2.presentation_context_id, presentation_context_id);
888
889                // check expected lengths
890                assert_eq!(
891                    data_1.data.len(),
892                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
893                );
894                assert_eq!(
895                    data_2.data.len(),
896                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
897                );
898                assert_eq!(data_3.data.len(), 820);
899
900                // check data consistency
901                assert_eq!(
902                    &data_1.data[..],
903                    (0..MINIMUM_PDU_SIZE - PDU_HEADER_SIZE)
904                        .map(|x| x as u8)
905                        .collect::<Vec<_>>()
906                );
907                assert_eq!(
908                    data_1.data.len() + data_2.data.len() + data_3.data.len(),
909                    9000
910                );
911
912                let data_1 = &data_1.data;
913                let data_2 = &data_2.data;
914                let data_3 = &data_3.data;
915
916                let mut all_data: Vec<u8> = Vec::new();
917                all_data.extend(data_1);
918                all_data.extend(data_2);
919                all_data.extend(data_3);
920                assert_eq!(all_data, my_data);
921            }
922            x => panic!("Expected 3 PDatas, got {:?}", x),
923        }
924
925        assert_eq!(cursor.len(), 0);
926    }
927
928    #[test]
929    fn test_read_large_pdata_and_finish() {
930        use std::collections::VecDeque;
931        let presentation_context_id = 32;
932
933        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
934        let pdata_1 = vec![PDataValue {
935            value_type: PDataValueType::Data,
936            data: my_data[0..3000].to_owned(),
937            presentation_context_id,
938            is_last: false,
939        }];
940        let pdata_2 = vec![PDataValue {
941            value_type: PDataValueType::Data,
942            data: my_data[3000..6000].to_owned(),
943            presentation_context_id,
944            is_last: false,
945        }];
946        let pdata_3 = vec![PDataValue {
947            value_type: PDataValueType::Data,
948            data: my_data[6000..].to_owned(),
949            presentation_context_id,
950            is_last: true,
951        }];
952
953        let mut pdu_stream = VecDeque::new();
954
955        // write some PDUs
956        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
957        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
958        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
959
960        let mut buf = Vec::new();
961        {
962            let mut read_buf = BytesMut::new();
963            let mut reader = PDataReader::new(&mut pdu_stream, MINIMUM_PDU_SIZE, &mut read_buf);
964            reader.read_to_end(&mut buf).unwrap();
965        }
966        assert_eq!(buf, my_data);
967    }
968
969    #[cfg(feature = "async")]
970    #[tokio::test]
971    async fn test_async_read_large_pdata_and_finish() {
972        use tokio::io::AsyncReadExt;
973
974        let presentation_context_id = 32;
975
976        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
977        let pdata_1 = vec![PDataValue {
978            value_type: PDataValueType::Data,
979            data: my_data[0..3000].to_owned(),
980            presentation_context_id,
981            is_last: false,
982        }];
983        let pdata_2 = vec![PDataValue {
984            value_type: PDataValueType::Data,
985            data: my_data[3000..6000].to_owned(),
986            presentation_context_id,
987            is_last: false,
988        }];
989        let pdata_3 = vec![PDataValue {
990            value_type: PDataValueType::Data,
991            data: my_data[6000..].to_owned(),
992            presentation_context_id,
993            is_last: true,
994        }];
995
996        let mut pdu_stream = std::io::Cursor::new(Vec::new());
997
998        // write some PDUs
999        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
1000        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
1001        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
1002
1003        let mut buf = Vec::new();
1004        let inner = pdu_stream.into_inner();
1005        let mut stream = tokio::io::BufReader::new(inner.as_slice());
1006        {
1007            let mut read_buf = BytesMut::new();
1008            let mut reader = PDataReader::new(&mut stream, MINIMUM_PDU_SIZE, &mut read_buf);
1009            reader.read_to_end(&mut buf).await.unwrap();
1010        }
1011        assert_eq!(buf, my_data);
1012    }
1013}