Skip to main content

dicom_ul/association/
pdata.rs

1use std::{
2    collections::VecDeque,
3    io::{BufRead, BufReader, Cursor, Read, Write},
4};
5
6use bytes::{Buf, BytesMut};
7use tracing::warn;
8
9use crate::{
10    pdu::{LARGE_PDU_SIZE, PDU_HEADER_SIZE, PDV_HEADER_SIZE},
11    read_pdu, Pdu,
12};
13
14/// Combined size of PDU header and one PDV header, as usize, for convenience
15const PDU_PDV_HEADER_SIZE: usize = (PDU_HEADER_SIZE + PDV_HEADER_SIZE) as usize;
16
17/// Set up the P-Data PDU header for sending.
18fn setup_pdata_header(buffer: &mut [u8], is_last: bool) {
19    let data_len = (buffer.len() - PDU_PDV_HEADER_SIZE) as u32;
20
21    // full PDU length (minus PDU type and reserved byte)
22    let pdu_len = data_len + 4 + 2;
23    let pdu_len_bytes = pdu_len.to_be_bytes();
24
25    buffer[2] = pdu_len_bytes[0];
26    buffer[3] = pdu_len_bytes[1];
27    buffer[4] = pdu_len_bytes[2];
28    buffer[5] = pdu_len_bytes[3];
29
30    // presentation data length (data + 2 properties below)
31    let pdv_data_len = data_len + 2;
32    let data_len_bytes = pdv_data_len.to_be_bytes();
33
34    buffer[6] = data_len_bytes[0];
35    buffer[7] = data_len_bytes[1];
36    buffer[8] = data_len_bytes[2];
37    buffer[9] = data_len_bytes[3];
38
39    // message control header
40    buffer[11] = if is_last { 0x02 } else { 0x00 };
41}
42
/// A P-Data value writer.
///
/// This exposes an API to iteratively construct and send Data messages
/// to another node.
/// Using this as a [standard writer](std::io::Write)
/// will automatically split the incoming bytes
/// into separate PDUs if they do not fit in a single one.
///
/// # Example
///
/// Use an association's `send_pdata` method
/// to create a new P-Data value writer.
///
/// ```no_run
/// # use std::io::Write;
/// # use dicom_ul::association::{ClientAssociationOptions, Association, SyncAssociation};
/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
/// # fn command_data() -> Vec<u8> { unimplemented!() }
/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut association = ClientAssociationOptions::new()
///    .establish("129.168.0.5:104")?;
///
/// let presentation_context_id = association.presentation_contexts()[0].id;
///
/// // send a command first
/// association.send(&Pdu::PData {
///     data: vec![PDataValue {
///         presentation_context_id,
///         value_type: PDataValueType::Command,
///         is_last: true,
///         data: command_data(),
///     }],
/// });
///
/// // then send a DICOM object which may be split into multiple PDUs
/// let mut pdata = association.send_pdata(presentation_context_id);
/// pdata.write_all(dicom_data())?;
/// pdata.finish()?;
///
/// let pdu_ac = association.receive()?;
/// # Ok(())
/// # }
/// ```
#[must_use]
pub struct PDataWriter<W: Write> {
    /// The PDU under construction:
    /// a 12-byte PDU + PDV header followed by the data accumulated so far.
    /// Left empty once the last PDU was emitted.
    buffer: Vec<u8>,
    /// Where complete PDUs are written to.
    stream: W,
    /// Maximum number of data bytes to put in a single PDU.
    max_data_len: u32,
}
92
93impl<W> PDataWriter<W>
94where
95    W: Write,
96{
97    /// Construct a new P-Data value writer.
98    ///
99    /// `max_pdu_length` is the maximum value of the PDU-length property.
100    pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
101        let max_data_length = calculate_max_data_len_single(max_pdu_length);
102        let mut buffer =
103            Vec::with_capacity((max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
104        // initial buffer set up
105        buffer.extend([
106            // PDU-type + reserved byte
107            0x04,
108            0x00,
109            // full PDU length, unknown at this point
110            0xFF,
111            0xFF,
112            0xFF,
113            0xFF,
114            // presentation data length, unknown at this point
115            0xFF,
116            0xFF,
117            0xFF,
118            0xFF,
119            // presentation context id
120            presentation_context_id,
121            // message control header, unknown at this point
122            0xFF,
123        ]);
124
125        PDataWriter {
126            stream,
127            max_data_len: max_data_length,
128            buffer,
129        }
130    }
131
132    /// Declare to have finished sending P-Data fragments,
133    /// thus emitting the last P-Data fragment PDU.
134    ///
135    /// This is also done automatically once the P-Data writer is dropped.
136    pub fn finish(mut self) -> std::io::Result<()> {
137        self.finish_impl()?;
138        Ok(())
139    }
140
141    fn finish_impl(&mut self) -> std::io::Result<()> {
142        if !self.buffer.is_empty() {
143            // send last PDU
144            setup_pdata_header(&mut self.buffer, true);
145            self.stream.write_all(&self.buffer[..])?;
146            // clear buffer so that subsequent calls to `finish_impl`
147            // do not send any more PDUs
148            self.buffer.clear();
149        }
150        Ok(())
151    }
152
153    /// Use the current state of the buffer to send new PDUs
154    ///
155    /// Pre-condition:
156    /// buffer must have enough data for one P-Data-tf PDU
157    fn dispatch_pdu(&mut self) -> std::io::Result<()> {
158        debug_assert!(self.buffer.len() >= PDU_PDV_HEADER_SIZE);
159        // send PDU now
160        setup_pdata_header(&mut self.buffer, false);
161        self.stream.write_all(&self.buffer)?;
162
163        // back to just the header
164        self.buffer.truncate(PDU_PDV_HEADER_SIZE);
165
166        Ok(())
167    }
168}
169
170impl<W> Write for PDataWriter<W>
171where
172    W: Write,
173{
174    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
175        let total_len = self.max_data_len as usize + PDU_PDV_HEADER_SIZE;
176        if self.buffer.len() + buf.len() <= total_len {
177            // accumulate into buffer, do nothing
178            self.buffer.extend(buf);
179            Ok(buf.len())
180        } else {
181            // fill in the rest of the buffer, send PDU,
182            // and leave out the rest for subsequent writes
183            let buf = &buf[..total_len - self.buffer.len()];
184            self.buffer.extend(buf);
185            debug_assert_eq!(self.buffer.len(), total_len);
186            self.dispatch_pdu()?;
187            Ok(buf.len())
188        }
189    }
190
191    fn flush(&mut self) -> std::io::Result<()> {
192        // do nothing
193        Ok(())
194    }
195}
196
197/// With the P-Data writer dropped,
198/// this `Drop` implementation
199/// will construct and emit the last P-Data fragment PDU
200/// if there is any data left to send.
201impl<W> Drop for PDataWriter<W>
202where
203    W: Write,
204{
205    fn drop(&mut self) {
206        let _ = self.finish_impl();
207    }
208}
209
210/// A P-Data value reader.
211///
212/// This exposes an API which provides a byte stream of data
213/// by iteratively collecting Data messages from another node.
214/// Using this as a [standard reader](std::io::Read)
215/// will provide all incoming bytes,
216/// even if they reside in separate PDUs,
217/// until the last message is received.
218///
219/// # Example
220///
221/// Use an association's `receive_pdata` method
222/// to create a new P-Data value reader.
223///
224/// ```no_run
225/// # use std::io::Read;
226/// # use dicom_ul::association::{ClientAssociationOptions, SyncAssociation};
227/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
228/// # fn command_data() -> Vec<u8> { unimplemented!() }
229/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
230/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
231/// # let mut association = ClientAssociationOptions::new()
232/// #    .establish("129.168.0.5:104")?;
233///
234/// // expecting a DICOM object which may be split into multiple PDUs,
235/// let mut pdata = association.receive_pdata();
236/// let all_pdata_bytes = {
237///     let mut v = Vec::new();
238///     pdata.read_to_end(&mut v)?;
239///     v
240/// };
241/// # Ok(())
242/// # }
243/// ```
#[must_use]
pub struct PDataReader<'a, R> {
    /// Data of the P-Data values received so far
    /// which was not handed out to the consumer yet.
    buffer: VecDeque<u8>,
    /// Where incoming bytes are obtained from.
    stream: R,
    /// Presentation context ID of the first P-Data value received;
    /// values arriving with a different ID trigger a warning.
    presentation_context_id: Option<u8>,
    /// Length limit passed on to `read_pdu` when decoding incoming PDUs.
    max_data_length: u32,
    /// Whether no more PDUs should be received:
    /// set from the `is_last` flag of the most recent P-Data value,
    /// or by `stop_receiving`.
    last_pdu: bool,
    /// Bytes received from `stream` which were not decoded into a PDU yet.
    read_buffer: &'a mut BytesMut,
}
253
254impl<'a, R> PDataReader<'a, R> {
255    pub fn new(stream: R, max_data_length: u32, remaining: &'a mut BytesMut) -> Self {
256        PDataReader {
257            buffer: VecDeque::with_capacity(max_data_length.min(LARGE_PDU_SIZE) as usize),
258            stream,
259            presentation_context_id: None,
260            max_data_length,
261            last_pdu: false,
262            read_buffer: remaining,
263        }
264    }
265
266    /// Declare no intention to read more PDUs from the remote node.
267    ///
268    /// Attempting to read more bytes
269    /// will only consume the inner buffer and not result in
270    /// more PDUs being received.
271    pub fn stop_receiving(&mut self) -> std::io::Result<()> {
272        self.last_pdu = true;
273        Ok(())
274    }
275}
276
277impl<R> Read for PDataReader<'_, R>
278where
279    R: Read,
280{
281    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282        if self.buffer.is_empty() {
283            if self.last_pdu {
284                // reached the end of PData stream
285                return Ok(0);
286            }
287
288            let mut reader = BufReader::new(&mut self.stream);
289            let msg = loop {
290                let mut buf = Cursor::new(&self.read_buffer[..]);
291                match read_pdu(&mut buf, self.max_data_length, false)
292                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
293                {
294                    Some(pdu) => {
295                        self.read_buffer.advance(buf.position() as usize);
296                        break pdu;
297                    }
298                    None => {
299                        // Reset position
300                        buf.set_position(0)
301                    }
302                }
303                let recv = reader.fill_buf()?.to_vec();
304                reader.consume(recv.len());
305                self.read_buffer.extend_from_slice(&recv);
306                if recv.is_empty() {
307                    return Err(std::io::Error::new(
308                        std::io::ErrorKind::Other,
309                        "Connection closed by peer",
310                    ));
311                }
312            };
313
314            match msg {
315                Pdu::PData { data } => {
316                    for pdata_value in data {
317                        self.presentation_context_id = match self.presentation_context_id {
318                            None => Some(pdata_value.presentation_context_id),
319                            Some(cid) if cid == pdata_value.presentation_context_id => Some(cid),
320                            Some(cid) => {
321                                warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
322                                Some(cid)
323                            }
324                        };
325                        self.buffer.extend(pdata_value.data);
326                        self.last_pdu = pdata_value.is_last;
327                    }
328                }
329                _ => {
330                    return Err(std::io::Error::new(
331                        std::io::ErrorKind::UnexpectedEof,
332                        "Unexpected PDU type",
333                    ))
334                }
335            }
336        }
337        Read::read(&mut self.buffer, buf)
338    }
339}
340
341/// Determine the maximum length of actual PDV data
342/// when encapsulated in a PDU with the given length property.
343/// Does not account for the first 2 bytes (type + reserved).
344#[inline]
345fn calculate_max_data_len_single(pdu_len: u32) -> u32 {
346    pdu_len - PDV_HEADER_SIZE
347}
348
349#[cfg(feature = "async")]
350pub mod non_blocking {
351    use std::{
352        io::Cursor,
353        pin::Pin,
354        task::{ready, Context, Poll},
355    };
356
357    use bytes::{Buf, BufMut};
358    use tokio::io::{
359        AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
360    };
361    use tracing::warn;
362
363    use crate::{
364        pdu::{PDU_HEADER_SIZE, PDV_HEADER_SIZE},
365        read_pdu, Pdu,
366    };
367
368    pub use super::PDataReader;
369    use super::{calculate_max_data_len_single, setup_pdata_header};
370
371    const PDU_PDV_HEADER_SIZE: usize = (PDU_HEADER_SIZE + PDV_HEADER_SIZE) as usize;
372
    /// Enum representing state of the Async Writer
    enum WriteState {
        /// No PDU is being sent:
        /// incoming data can be accumulated into the buffer.
        Ready,
        /// The buffer holds a PDU which is being written to the underlying stream;
        /// the value is the position in the buffer of the first byte yet to be written.
        Writing(usize),
    }
380
381    /// A P-Data async value writer.
382    ///
383    /// This exposes an API to iteratively construct and send Data messages
384    /// to another node.
    /// Using this as an [asynchronous writer](tokio::io::AsyncWrite)
386    /// will automatically split the incoming bytes
387    /// into separate PDUs if they do not fit in a single one.
388    ///
389    /// # Example
390    ///
391    /// Use an association's `send_pdata` method
392    /// to create a new P-Data value writer.
393    ///
394    /// ```no_run
395    /// # use std::io::Write;
396    /// use tokio::io::AsyncWriteExt;
397    /// # use dicom_ul::association::{ClientAssociationOptions, Association, AsyncAssociation};
398    /// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
399    /// # fn command_data() -> Vec<u8> { unimplemented!() }
400    /// # fn dicom_data() -> &'static [u8] { unimplemented!() }
401    /// #[tokio::main]
402    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
403    /// let mut association = ClientAssociationOptions::new()
404    ///    .establish_async("129.168.0.5:104")
405    ///    .await?;
406    ///
407    /// let presentation_context_id = association.presentation_contexts()[0].id;
408    ///
409    /// // send a command first
410    /// association.send(&Pdu::PData {
411    ///     data: vec![PDataValue {
412    ///     presentation_context_id,
413    ///     value_type: PDataValueType::Command,
414    ///         is_last: true,
415    ///         data: command_data(),
416    ///     }],
417    /// }).await;
418    ///
419    /// // then send a DICOM object which may be split into multiple PDUs
420    /// let mut pdata = association.send_pdata(presentation_context_id);
421    /// pdata.write_all(dicom_data()).await?;
422    /// pdata.finish().await?;
423    ///
424    /// let pdu_ac = association.receive().await?;
425    /// # Ok(())
426    /// # }
427    /// ```
    #[must_use]
    pub struct AsyncPDataWriter<W: AsyncWrite + Unpin> {
        /// The PDU under construction:
        /// a 12-byte PDU + PDV header followed by the data accumulated so far.
        /// Left empty once the last PDU was emitted.
        buffer: Vec<u8>,
        /// Where complete PDUs are written to.
        stream: W,
        /// Maximum number of data bytes to put in a single PDU.
        max_data_len: u32,
        /// Whether a PDU is currently being written to `stream`,
        /// and how far that write has progressed.
        state: WriteState,
    }
435
436    #[cfg(feature = "async")]
437    impl<W> AsyncPDataWriter<W>
438    where
439        W: AsyncWrite + Unpin,
440    {
441        /// Construct a new P-Data value writer.
442        ///
443        /// `max_pdu_length` is the maximum value of the PDU-length property.
444        pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
445            use crate::pdu::LARGE_PDU_SIZE;
446
447            let max_data_length = calculate_max_data_len_single(max_pdu_length);
448            let mut buffer =
449                Vec::with_capacity((max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize);
450            // initial buffer set up
451            buffer.extend([
452                // PDU-type + reserved byte
453                0x04,
454                0x00,
455                // full PDU length, unknown at this point
456                0xFF,
457                0xFF,
458                0xFF,
459                0xFF,
460                // presentation data length, unknown at this point
461                0xFF,
462                0xFF,
463                0xFF,
464                0xFF,
465                // presentation context id
466                presentation_context_id,
467                // message control header, unknown at this point
468                0xFF,
469            ]);
470
471            AsyncPDataWriter {
472                stream,
473                max_data_len: max_data_length,
474                buffer,
475                state: WriteState::Ready,
476            }
477        }
478
479        /// Declare to have finished sending P-Data fragments,
480        /// thus emitting the last P-Data fragment PDU.
481        ///
482        /// This is also done automatically once the P-Data writer is dropped.
483        pub async fn finish(mut self) -> std::io::Result<()> {
484            self.finish_impl().await?;
485            Ok(())
486        }
487
488        async fn finish_impl(&mut self) -> std::io::Result<()> {
489            if !self.buffer.is_empty() {
490                // send last PDU
491                setup_pdata_header(&mut self.buffer, true);
492                if let Err(e) = self.stream.write_all(&self.buffer[..]).await {
493                    println!("Error: {e:?}");
494                }
495                // clear buffer so that subsequent calls to `finish_impl`
496                // do not send any more PDUs
497                self.buffer.clear();
498            }
499            Ok(())
500        }
501    }
502
503    #[cfg(feature = "async")]
504    impl<W> AsyncWrite for AsyncPDataWriter<W>
505    where
506        W: AsyncWrite + Unpin,
507    {
508        fn poll_write(
509            mut self: Pin<&mut Self>,
510            cx: &mut Context<'_>,
511            buf: &[u8],
512        ) -> Poll<std::result::Result<usize, std::io::Error>> {
513            // Each call to `poll_write` on the underlying stream may or may not
514            // write the whole of `self.buffer`, therefore we need to keep track
515            // of how much we've written, this is done in `self.state`
516            match self.state {
517                WriteState::Ready => {
518                    // If we're in ready state, we can prepare another PDU
519                    let total_len = self.max_data_len as usize + PDU_PDV_HEADER_SIZE;
520                    if self.buffer.len() + buf.len() <= total_len {
521                        // Still have space in `self.buffer`, accumulate into buffer
522                        self.buffer.extend(buf);
523                        Poll::Ready(Ok(buf.len()))
524                    } else {
525                        // `self.buffer` is full, fill in the rest of the
526                        // buffer, prepare to send PDU
527                        let slice = &buf[..total_len - self.buffer.len()];
528                        self.buffer.extend(slice);
529                        debug_assert_eq!(self.buffer.len(), total_len);
530                        setup_pdata_header(&mut self.buffer, false);
531                        let this = self.get_mut();
532                        // Attempt to send PDU on wire
533                        match Pin::new(&mut this.stream).poll_write(cx, &this.buffer) {
534                            Poll::Ready(Ok(n)) => {
535                                if n == this.buffer.len() {
536                                    // If we wrote the whole buffer, reset `self.buffer`
537                                    this.buffer.truncate(PDU_PDV_HEADER_SIZE);
538                                    Poll::Ready(Ok(slice.len()))
539                                } else {
540                                    // Otherwise keep track of how much we wrote and change state to Writing
541                                    this.state = WriteState::Writing(n);
542                                    Poll::Pending
543                                }
544                            }
545                            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
546                            Poll::Pending => {
547                                // Nothing was written yet, change state to writing at position 0
548                                this.state = WriteState::Writing(0);
549                                Poll::Pending
550                            }
551                        }
552                    }
553                }
554                WriteState::Writing(pos) => {
555                    // Continue writing to stream from current position
556                    let buflen = self.buffer.len();
557                    let this = self.get_mut();
558                    match Pin::new(&mut this.stream).poll_write(cx, &this.buffer[pos..]) {
559                        Poll::Ready(Ok(n)) => {
560                            if (n + pos) == this.buffer.len() {
561                                // If we wrote the whole buffer, reset `self.buffer` and change state back to ready
562                                this.buffer.truncate(PDU_PDV_HEADER_SIZE);
563                                this.state = WriteState::Ready;
564                                Poll::Ready(Ok(buflen - PDU_PDV_HEADER_SIZE))
565                            } else {
566                                // Otherwise add to current position
567                                this.state = WriteState::Writing(n + pos);
568                                Poll::Pending
569                            }
570                        }
571                        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
572                        Poll::Pending => Poll::Pending,
573                    }
574                }
575            }
576        }
577
578        fn poll_flush(
579            mut self: Pin<&mut Self>,
580            cx: &mut Context<'_>,
581        ) -> Poll<std::result::Result<(), std::io::Error>> {
582            Pin::new(&mut self.stream).poll_flush(cx)
583        }
584
585        fn poll_shutdown(
586            mut self: Pin<&mut Self>,
587            cx: &mut Context<'_>,
588        ) -> Poll<std::result::Result<(), std::io::Error>> {
589            Pin::new(&mut self.stream).poll_shutdown(cx)
590        }
591    }
592
593    /// With the P-Data writer dropped,
594    /// this `Drop` implementation
595    /// will construct and emit the last P-Data fragment PDU
596    /// if there is any data left to send.
597    impl<W> Drop for AsyncPDataWriter<W>
598    where
599        W: AsyncWrite + Unpin,
600    {
601        fn drop(&mut self) {
602            tokio::task::block_in_place(move || {
603                tokio::runtime::Handle::current().block_on(async move {
604                    let _ = self.finish_impl().await;
605                })
606            })
607        }
608    }
609
610    impl<R> AsyncRead for PDataReader<'_, R>
611    where
612        R: AsyncRead + Unpin,
613    {
614        fn poll_read(
615            mut self: Pin<&mut Self>,
616            cx: &mut Context<'_>,
617            buf: &mut ReadBuf,
618        ) -> Poll<std::io::Result<()>> {
619            if self.buffer.is_empty() {
620                if self.last_pdu {
621                    return Poll::Ready(Ok(()));
622                }
623                let Self {
624                    ref mut stream,
625                    ref mut read_buffer,
626                    ref max_data_length,
627                    ..
628                } = &mut *self;
629                let mut reader = BufReader::new(stream);
630                let msg = loop {
631                    let mut buf = Cursor::new(&read_buffer[..]);
632                    match read_pdu(&mut buf, *max_data_length + PDV_HEADER_SIZE, false)
633                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
634                    {
635                        Some(pdu) => {
636                            read_buffer.advance(buf.position() as usize);
637                            break pdu;
638                        }
639                        None => {
640                            // Reset position
641                            buf.set_position(0)
642                        }
643                    }
644                    let recv = ready!(Pin::new(&mut reader).poll_fill_buf(cx))?.to_vec();
645                    reader.consume(recv.len());
646                    read_buffer.extend_from_slice(&recv);
647                    if recv.is_empty() {
648                        return Poll::Ready(Err(std::io::Error::new(
649                            std::io::ErrorKind::Other,
650                            "Connection closed by peer",
651                        )));
652                    }
653                };
654                match msg {
655                    Pdu::PData { data } => {
656                        for pdata_value in data {
657                            self.presentation_context_id = match self.presentation_context_id {
658                                None => Some(pdata_value.presentation_context_id),
659                                Some(cid) if cid == pdata_value.presentation_context_id => {
660                                    Some(cid)
661                                }
662                                Some(cid) => {
663                                    warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
664                                    Some(cid)
665                                }
666                            };
667                            self.buffer.extend(pdata_value.data);
668                            self.last_pdu = pdata_value.is_last;
669                        }
670                    }
671                    _ => {
672                        return Poll::Ready(Err(std::io::Error::new(
673                            std::io::ErrorKind::UnexpectedEof,
674                            "Unexpected PDU type",
675                        )))
676                    }
677                }
678            }
679            let len = std::cmp::min(self.buffer.len(), buf.remaining());
680            for _ in 0..len {
681                buf.put_u8(self.buffer.pop_front().unwrap());
682            }
683            Poll::Ready(Ok(()))
684        }
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use std::io::{Read, Write};
691
692    use crate::association::pdata::PDataWriter;
693    use crate::pdu::{read_pdu, Pdu, MINIMUM_PDU_SIZE, PDV_HEADER_SIZE};
694    use crate::pdu::{PDataValue, PDataValueType};
695    use crate::write_pdu;
696
697    use super::PDataReader;
698
699    use bytes::BytesMut;
700    #[cfg(feature = "async")]
701    use tokio::io::AsyncWriteExt;
702
703    #[cfg(feature = "async")]
704    use crate::association::pdata::non_blocking::AsyncPDataWriter;
705
706    #[test]
707    fn test_write_pdata_and_finish() {
708        let presentation_context_id = 12;
709
710        let mut buf = Vec::new();
711        {
712            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
713            writer.write_all(&(0..64).collect::<Vec<u8>>()).unwrap();
714            writer.finish().unwrap();
715        }
716
717        let mut cursor = &buf[..];
718        let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
719
720        // concatenate data chunks, compare with all data
721
722        match same_pdu {
723            Some(Pdu::PData { data: data_1 }) => {
724                let data_1 = &data_1[0];
725
726                // check that this PDU is consistent
727                assert_eq!(data_1.value_type, PDataValueType::Data);
728                assert_eq!(data_1.presentation_context_id, presentation_context_id);
729                assert_eq!(data_1.data.len(), 64);
730                assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
731            }
732            pdu => panic!("Expected PData, got {:?}", pdu),
733        }
734
735        assert_eq!(cursor.len(), 0);
736    }
737
738    #[cfg(feature = "async")]
739    #[tokio::test(flavor = "multi_thread")]
740    async fn test_async_write_pdata_and_finish() {
741        let presentation_context_id = 12;
742
743        let mut buf = Vec::new();
744        {
745            let mut writer =
746                AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
747            writer
748                .write_all(&(0..64).collect::<Vec<u8>>())
749                .await
750                .unwrap();
751            writer.finish().await.unwrap();
752        }
753
754        let mut cursor = &buf[..];
755        let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
756
757        // concatenate data chunks, compare with all data
758
759        match same_pdu {
760            Some(Pdu::PData { data: data_1 }) => {
761                let data_1 = &data_1[0];
762
763                // check that this PDU is consistent
764                assert_eq!(data_1.value_type, PDataValueType::Data);
765                assert_eq!(data_1.presentation_context_id, presentation_context_id);
766                assert_eq!(data_1.data.len(), 64);
767                assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
768            }
769            pdu => panic!("Expected PData, got {:?}", pdu),
770        }
771
772        assert_eq!(cursor.len(), 0);
773    }
774
775    #[test]
776    fn test_write_large_pdata_and_finish() {
777        let presentation_context_id = 32;
778
779        let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
780
781        let mut buf = Vec::new();
782        {
783            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
784            writer.write_all(&my_data).unwrap();
785            writer.finish().unwrap();
786        }
787
788        let mut cursor = &buf[..];
789        let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
790        let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
791        let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
792
793        // concatenate data chunks, compare with all data
794
795        match (pdu_1, pdu_2, pdu_3) {
796            (
797                Some(Pdu::PData { data: data_1 }),
798                Some(Pdu::PData { data: data_2 }),
799                Some(Pdu::PData { data: data_3 }),
800            ) => {
801                assert_eq!(data_1.len(), 1);
802                let data_1 = &data_1[0];
803                assert_eq!(data_2.len(), 1);
804                let data_2 = &data_2[0];
805                assert_eq!(data_3.len(), 1);
806                let data_3 = &data_3[0];
807
808                // check that these two PDUs are consistent
809                assert_eq!(data_1.value_type, PDataValueType::Data);
810                assert_eq!(data_2.value_type, PDataValueType::Data);
811                assert_eq!(data_1.presentation_context_id, presentation_context_id);
812                assert_eq!(data_2.presentation_context_id, presentation_context_id);
813
814                // check expected lengths
815                assert_eq!(
816                    data_1.data.len(),
817                    (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
818                );
819                assert_eq!(
820                    data_2.data.len(),
821                    (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
822                );
823                assert_eq!(
824                    data_3.data.len(),
825                    2500 - (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize * 2
826                );
827
828                // check data consistency
829                assert_eq!(
830                    &data_1.data[..],
831                    (0..MINIMUM_PDU_SIZE - PDV_HEADER_SIZE)
832                        .map(|x| x as u8)
833                        .collect::<Vec<_>>()
834                );
835                assert_eq!(
836                    data_1.data.len() + data_2.data.len() + data_3.data.len(),
837                    2500
838                );
839
840                let data_1 = &data_1.data;
841                let data_2 = &data_2.data;
842                let data_3 = &data_3.data;
843
844                let mut all_data: Vec<u8> = Vec::new();
845                all_data.extend(data_1);
846                all_data.extend(data_2);
847                all_data.extend(data_3);
848                assert_eq!(all_data, my_data);
849            }
850            x => panic!("Expected 3 PDatas, got {:?}", x),
851        }
852
853        assert_eq!(cursor.len(), 0);
854    }
855
856    #[cfg(feature = "async")]
857    #[tokio::test(flavor = "multi_thread")]
858    async fn test_async_write_large_pdata_and_finish() {
859        let presentation_context_id = 32;
860
861        let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
862
863        let mut buf = Vec::new();
864        {
865            let mut writer =
866                AsyncPDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
867            writer.write_all(&my_data).await.unwrap();
868            writer.finish().await.unwrap();
869        }
870
871        let mut cursor = &buf[..];
872        let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
873        let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
874        let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
875
876        // concatenate data chunks, compare with all data
877
878        match (pdu_1, pdu_2, pdu_3) {
879            (
880                Some(Pdu::PData { data: data_1 }),
881                Some(Pdu::PData { data: data_2 }),
882                Some(Pdu::PData { data: data_3 }),
883            ) => {
884                assert_eq!(data_1.len(), 1);
885                let data_1 = &data_1[0];
886                assert_eq!(data_2.len(), 1);
887                let data_2 = &data_2[0];
888                assert_eq!(data_3.len(), 1);
889                let data_3 = &data_3[0];
890
891                // check that these two PDUs are consistent
892                assert_eq!(data_1.value_type, PDataValueType::Data);
893                assert_eq!(data_2.value_type, PDataValueType::Data);
894                assert_eq!(data_1.presentation_context_id, presentation_context_id);
895                assert_eq!(data_2.presentation_context_id, presentation_context_id);
896
897                // check expected lengths
898                assert_eq!(
899                    data_1.data.len(),
900                    (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
901                );
902                assert_eq!(
903                    data_2.data.len(),
904                    (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize
905                );
906                assert_eq!(
907                    data_3.data.len(),
908                    2500 - (MINIMUM_PDU_SIZE - PDV_HEADER_SIZE) as usize * 2
909                );
910
911                // check data consistency
912                assert_eq!(
913                    &data_1.data[..],
914                    (0..MINIMUM_PDU_SIZE - PDV_HEADER_SIZE)
915                        .map(|x| x as u8)
916                        .collect::<Vec<_>>()
917                );
918                assert_eq!(
919                    data_1.data.len() + data_2.data.len() + data_3.data.len(),
920                    2500
921                );
922
923                let data_1 = &data_1.data;
924                let data_2 = &data_2.data;
925                let data_3 = &data_3.data;
926
927                let mut all_data: Vec<u8> = Vec::new();
928                all_data.extend(data_1);
929                all_data.extend(data_2);
930                all_data.extend(data_3);
931                assert_eq!(all_data, my_data);
932            }
933            x => panic!("Expected 3 PDatas, got {:?}", x),
934        }
935
936        assert_eq!(cursor.len(), 0);
937    }
938
939    #[test]
940    fn test_read_large_pdata_and_finish() {
941        use std::collections::VecDeque;
942        let presentation_context_id = 32;
943
944        let my_data: Vec<_> = (0..2500).map(|x: u32| x as u8).collect();
945        let pdata_1 = vec![PDataValue {
946            value_type: PDataValueType::Data,
947            data: my_data[0..1018].to_owned(),
948            presentation_context_id,
949            is_last: false,
950        }];
951        let pdata_2 = vec![PDataValue {
952            value_type: PDataValueType::Data,
953            data: my_data[1018..2036].to_owned(),
954            presentation_context_id,
955            is_last: false,
956        }];
957        let pdata_3 = vec![PDataValue {
958            value_type: PDataValueType::Data,
959            data: my_data[2036..].to_owned(),
960            presentation_context_id,
961            is_last: true,
962        }];
963
964        let mut pdu_stream = VecDeque::new();
965
966        // write some PDUs
967        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
968        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
969        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
970
971        let mut buf = Vec::new();
972        {
973            let mut read_buf = BytesMut::new();
974            let mut reader = PDataReader::new(&mut pdu_stream, MINIMUM_PDU_SIZE, &mut read_buf);
975            reader.read_to_end(&mut buf).unwrap();
976        }
977        assert_eq!(buf, my_data);
978    }
979
980    #[cfg(feature = "async")]
981    #[tokio::test]
982    async fn test_async_read_large_pdata_and_finish() {
983        use tokio::io::AsyncReadExt;
984
985        let presentation_context_id = 32;
986
987        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
988        let pdata_1 = vec![PDataValue {
989            value_type: PDataValueType::Data,
990            data: my_data[0..3000].to_owned(),
991            presentation_context_id,
992            is_last: false,
993        }];
994        let pdata_2 = vec![PDataValue {
995            value_type: PDataValueType::Data,
996            data: my_data[3000..6000].to_owned(),
997            presentation_context_id,
998            is_last: false,
999        }];
1000        let pdata_3 = vec![PDataValue {
1001            value_type: PDataValueType::Data,
1002            data: my_data[6000..].to_owned(),
1003            presentation_context_id,
1004            is_last: true,
1005        }];
1006
1007        let mut pdu_stream = std::io::Cursor::new(Vec::new());
1008
1009        // write some PDUs
1010        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
1011        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
1012        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();
1013
1014        let mut buf = Vec::new();
1015        let inner = pdu_stream.into_inner();
1016        let mut stream = tokio::io::BufReader::new(inner.as_slice());
1017        {
1018            let mut read_buf = BytesMut::new();
1019            let mut reader = PDataReader::new(&mut stream, MINIMUM_PDU_SIZE, &mut read_buf);
1020            reader.read_to_end(&mut buf).await.unwrap();
1021        }
1022        assert_eq!(buf, my_data);
1023    }
1024}