dicom-ul 0.9.1

Types and methods for interacting with the DICOM Upper Layer Protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
//! DICOM association module
//!
//! This module contains utilities for establishing associations
//! between DICOM nodes via TCP/IP.
//!
//! As an association requester, often as a service class user (SCU),
//! a new association can be started
//! via the [`ClientAssociationOptions`] type.
//! The minimum required properties are the accepted abstract syntaxes
//! and the TCP socket address to the target node.
//!
//! As an association acceptor,
//! usually taking the role of a service class provider (SCP),
//! a newly created [TCP stream][1] can be passed to
//! a previously prepared [`ServerAssociationOptions`].
//!
//!
//! [1]: std::net::TcpStream
pub mod client;
pub mod server;
#[cfg(test)]
mod tests;

mod uid;

pub(crate) mod pdata;

use std::{
    backtrace::Backtrace,
    io::{BufRead, BufReader, Cursor, Read},
    time::Duration,
};

use bytes::{Buf, BytesMut};
#[cfg(feature = "async")]
pub use client::AsyncClientAssociation;
pub use client::{ClientAssociation, ClientAssociationOptions};
#[cfg(feature = "async")]
pub use pdata::non_blocking::AsyncPDataWriter;
pub use pdata::{PDataReader, PDataWriter};
#[cfg(feature = "async")]
pub use server::AsyncServerAssociation;
pub use server::{ServerAssociation, ServerAssociationOptions};
use snafu::{ensure, ResultExt, Snafu};

use crate::{
    pdu::{self, AssociationRJ, PresentationContextNegotiated, ReadPduSnafu, UserVariableItem},
    write_pdu, Pdu,
};

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum Error {
    /// missing abstract syntax to begin negotiation
    MissingAbstractSyntax { backtrace: Backtrace },

    /// could not convert to sockeDUt address
    ToAddress {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// could not connect to server
    Connect {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// Could not set tcp read timeout
    SetReadTimeout {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// Could not set tcp write timeout
    SetWriteTimeout {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// failed to send association request
    #[snafu(display("failed to send pdu: {}", source))]
    SendPdu {
        #[snafu(backtrace)]
        source: crate::pdu::WriteError,
    },

    /// failed to receive association response
    #[snafu(display("failed to receive pdu: {}", source))]
    ReceivePdu {
        #[snafu(backtrace)]
        source: crate::pdu::ReadError,
    },

    #[snafu(display("unexpected response from peer `{:?}`", pdu))]
    #[non_exhaustive]
    UnexpectedPdu {
        /// the PDU obtained from the server
        pdu: Box<Pdu>,
    },

    #[snafu(display("unknown response from peer `{:?}`", pdu))]
    #[non_exhaustive]
    UnknownPdu {
        /// the PDU obtained from the server, of variant Unknown
        pdu: Box<Pdu>,
    },

    #[snafu(display("protocol version mismatch: expected {}, got {}", expected, got))]
    ProtocolVersionMismatch {
        expected: u16,
        got: u16,
        backtrace: Backtrace,
    },

    // Association rejected by the server
    #[snafu(display("association rejected {}", association_rj.source))]
    Rejected {
        association_rj: AssociationRJ,
        backtrace: Backtrace,
    },

    /// association aborted
    Aborted { backtrace: Backtrace },

    /// no presentation contexts accepted by the server
    NoAcceptedPresentationContexts { backtrace: Backtrace },

    /// failed to send PDU message on wire
    #[non_exhaustive]
    WireSend {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// failed to read PDU message from wire
    #[non_exhaustive]
    WireRead {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    /// Operation timed out
    #[non_exhaustive]
    Timeout {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    #[snafu(display("failed close connection: {}", source))]
    Close {
        source: std::io::Error,
        backtrace: Backtrace,
    },

    #[snafu(display(
        "PDU is too large ({} bytes) to be sent to the remote application entity",
        length
    ))]
    #[non_exhaustive]
    SendTooLongPdu { length: usize, backtrace: Backtrace },

    #[snafu(display("Connection closed by peer"))]
    ConnectionClosed,

    /// TLS configuration is missing
    #[cfg(feature = "sync-tls")]
    #[snafu(display("TLS configuration is required but not provided"))]
    TlsConfigMissing { backtrace: Backtrace },

    /// Invalid server name for TLS
    #[cfg(feature = "sync-tls")]
    #[snafu(display("Invalid server name for TLS connection"))]
    InvalidServerName {
        source: rustls::pki_types::InvalidDnsNameError,
        backtrace: Backtrace,
    },

    /// Failed to establish TLS connection
    #[cfg(feature = "sync-tls")]
    #[snafu(display("Failed to establish TLS connection: {:?}", source))]
    TlsConnection {
        source: rustls::Error,
        backtrace: Backtrace,
    },
}
/// Struct to hold negotiated options after association is accepted
pub(crate) struct NegotiatedOptions {
    /// Maximum PDU length the peer can handle
    peer_max_pdu_length: u32,
    /// User variables accepted by the peer
    user_variables: Vec<UserVariableItem>,
    /// Presentation contexts accepted by the peer
    presentation_contexts: Vec<PresentationContextNegotiated>,
    /// The peer's AE title
    peer_ae_title: String,
}

/// Socket configuration for associations
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct SocketOptions {
    /// Timeout for individual read operations
    read_timeout: Option<Duration>,
    /// Timeout for individual send operations
    write_timeout: Option<Duration>,
    /// Timeout for connection establishment
    connection_timeout: Option<Duration>,
}

/// Trait to close underlying socket
pub trait CloseSocket {
    fn close(&mut self) -> std::io::Result<()>;
}

impl CloseSocket for std::net::TcpStream {
    fn close(&mut self) -> std::io::Result<()> {
        self.shutdown(std::net::Shutdown::Both)
    }
}

#[cfg(feature = "sync-tls")]
impl CloseSocket for rustls::StreamOwned<rustls::ClientConnection, std::net::TcpStream> {
    fn close(&mut self) -> std::io::Result<()> {
        self.get_mut().shutdown(std::net::Shutdown::Both)
    }
}

#[cfg(feature = "sync-tls")]
impl CloseSocket for rustls::StreamOwned<rustls::ServerConnection, std::net::TcpStream> {
    fn close(&mut self) -> std::io::Result<()> {
        self.get_mut().shutdown(std::net::Shutdown::Both)
    }
}

/// Trait that represents common properties of an association
pub trait Association {
    /// Obtain the remote DICOM node's application entity title.
    fn peer_ae_title(&self) -> &str;

    /// Retrieve the maximum PDU length
    /// admitted by the association acceptor.
    fn acceptor_max_pdu_length(&self) -> u32;

    /// Retrieve the maximum PDU length
    /// admitted by the association requestor.
    fn requestor_max_pdu_length(&self) -> u32;

    /// Retrieve the maximum PDU length
    /// that this application entity is expecting to receive.
    /// That's the same as acceptor_max_pdu_length() for
    /// server objects, and as requestor_max_pdu_length()
    /// for client objects.
    ///
    /// The current implementation is not required to fail
    /// and/or abort the association
    /// if a larger PDU is received.
    fn local_max_pdu_length(&self) -> u32;

    /// Retrieve the maximum PDU length
    /// admitted by the peer.
    /// That's the same as requestor_max_pdu_length() for
    /// server objects, and as acceptor_max_pdu_length()
    /// for client objects.
    fn peer_max_pdu_length(&self) -> u32;

    /// Obtain a view of the negotiated presentation contexts.
    fn presentation_contexts(&self) -> &[PresentationContextNegotiated];

    /// Retrieve the user variables that were taken from the server.
    ///
    /// It usually contains the maximum PDU length,
    /// the implementation class UID, and the implementation version name.
    fn user_variables(&self) -> &[UserVariableItem];
}

mod private {
    use crate::{
        pdu::{AbortRQServiceProviderReason, AbortRQSource},
        Pdu,
    };
    use snafu::ResultExt;

    /// Private trait which exposes "unsafe" methods that should not be called by the user
    ///
    /// `close` and `release` _should_ take ownership, and in the public interface, they
    /// do. However, in order to implement `Drop` we need to expose a version of these
    /// methods that don't take ownership.
    ///
    /// `send` and `receive` implementations are needed in order to provide
    /// the implementation for `release`
    pub trait SyncAssociationSealed<S: std::io::Read + std::io::Write + super::CloseSocket> {
        fn close(&mut self) -> std::io::Result<()>;
        fn send(&mut self, pdu: &Pdu) -> super::Result<()>;
        fn receive(&mut self) -> super::Result<Pdu>;
        fn release(&mut self) -> super::Result<()> {
            let pdu = Pdu::ReleaseRQ;
            self.send(&pdu)?;
            let pdu = self.receive()?;

            match pdu {
                Pdu::ReleaseRP => {}
                pdu @ Pdu::AbortRQ { .. }
                | pdu @ Pdu::AssociationAC { .. }
                | pdu @ Pdu::AssociationRJ { .. }
                | pdu @ Pdu::AssociationRQ { .. }
                | pdu @ Pdu::PData { .. }
                | pdu @ Pdu::ReleaseRQ => return super::UnexpectedPduSnafu { pdu }.fail(),
                pdu @ Pdu::Unknown { .. } => return super::UnknownPduSnafu { pdu }.fail(),
            }
            self.close().context(super::CloseSnafu)?;
            Ok(())
        }

        fn abort(&mut self) -> super::Result<()>
        where
            Self: Sized,
        {
            let pdu = Pdu::AbortRQ {
                source: AbortRQSource::ServiceProvider(
                    AbortRQServiceProviderReason::ReasonNotSpecified,
                ),
            };
            let out = self.send(&pdu);
            let _ = self.close();
            out
        }
    }

    /// Private trait which exposes "unsafe" methods that should not be called by the user
    ///
    /// `close` and `release` _should_ take ownership, and in the public interface, they
    /// do. However, in order to implement `Drop` we need to expose a version of these
    /// methods that don't take ownership.
    ///
    /// `send` and `receive` implementations are needed in order to provide
    /// the implementation for `release`
    #[cfg(feature = "async")]
    pub trait AsyncAssociationSealed<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> {
        fn close(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> + Send
        where
            Self: Send;
        fn send(
            &mut self,
            pdu: &Pdu,
        ) -> impl std::future::Future<Output = super::Result<()>> + Send
        where
            Self: Send;
        fn receive(&mut self) -> impl std::future::Future<Output = super::Result<Pdu>> + Send
        where
            Self: Send;
        fn release(&mut self) -> impl std::future::Future<Output = super::Result<()>> + Send
        where
            Self: Send,
        {
            async move {
                let pdu = Pdu::ReleaseRQ;
                self.send(&pdu).await?;
                let pdu = self.receive().await?;

                match pdu {
                    Pdu::ReleaseRP => {}
                    pdu @ Pdu::AbortRQ { .. }
                    | pdu @ Pdu::AssociationAC { .. }
                    | pdu @ Pdu::AssociationRJ { .. }
                    | pdu @ Pdu::AssociationRQ { .. }
                    | pdu @ Pdu::PData { .. }
                    | pdu @ Pdu::ReleaseRQ => return super::UnexpectedPduSnafu { pdu }.fail(),
                    pdu @ Pdu::Unknown { .. } => return super::UnknownPduSnafu { pdu }.fail(),
                }
                self.close().await.context(super::CloseSnafu)?;
                Ok(())
            }
        }

        fn abort(&mut self) -> impl std::future::Future<Output = super::Result<()>> + Send
        where
            Self: Sized + Send,
        {
            let pdu = Pdu::AbortRQ {
                source: AbortRQSource::ServiceProvider(
                    AbortRQServiceProviderReason::ReasonNotSpecified,
                ),
            };
            async move {
                let out = self.send(&pdu).await;
                let _ = self.close().await;
                out
            }
        }
    }
}

/// Trait that represents methods that can be made on a synchronous association.
pub trait SyncAssociation<S: std::io::Read + std::io::Write + CloseSocket>:
    private::SyncAssociationSealed<S> + Association
{
    /// Obtain access to the inner stream
    /// connected to the association acceptor.
    ///
    /// This can be used to send the PDU in semantic fragments of the message,
    /// thus using less memory.
    ///
    /// **Note:** reading and writing should be done with care
    /// to avoid inconsistencies in the association state.
    /// Do not call `send` and `receive` while not in a PDU boundary.
    fn inner_stream(&mut self) -> &mut S;

    /// Obtain mutable access to the inner stream and read buffer
    fn get_mut(&mut self) -> (&mut S, &mut BytesMut);

    /// Send a PDU message to the other intervenient.
    fn send(&mut self, pdu: &Pdu) -> Result<()> {
        private::SyncAssociationSealed::send(self, pdu)
    }

    /// Read a PDU message from the other intervenient.
    fn receive(&mut self) -> Result<Pdu> {
        private::SyncAssociationSealed::receive(self)
    }

    /// Send a provider initiated abort message
    /// and shut down the TCP connection,
    /// terminating the association.
    fn abort(mut self) -> Result<()>
    where
        Self: Sized,
    {
        private::SyncAssociationSealed::abort(&mut self)
    }

    /// Iniate a graceful release of the association.
    ///
    /// A DIMSE A-RELEASE transaction is initiated by this application entity,
    /// and the underlying socket is closed once settled.
    ///
    /// Note that as of version 0.9.1,
    /// implementers of this trait no longer call this method on [`Drop`],
    /// so remember to call `release` explicitly
    /// at the end of all DIMSE transactions.
    fn release(mut self) -> Result<()>
    where
        Self: Sized,
    {
        private::SyncAssociationSealed::release(&mut self)
    }

    /// Prepare a P-Data writer for sending
    /// one or more data item PDUs.
    ///
    /// Returns a writer which automatically
    /// splits the inner data into separate PDUs if necessary.
    fn send_pdata(&mut self, presentation_context_id: u8) -> PDataWriter<&mut S> {
        let max_pdu_length = self.peer_max_pdu_length();
        PDataWriter::new(self.inner_stream(), presentation_context_id, max_pdu_length)
    }

    /// Prepare a P-Data reader for receiving
    /// one or more data item PDUs.
    ///
    /// Returns a reader which automatically
    /// receives more data PDUs once the bytes collected are consumed.
    fn receive_pdata(&mut self) -> PDataReader<'_, &mut S> {
        let max_pdu_length = self.local_max_pdu_length();
        let (socket, read_buffer) = self.get_mut();
        PDataReader::new(socket, max_pdu_length, read_buffer)
    }
}

#[cfg(feature = "async")]
/// Trait that represents methods that can be made on an asynchronous association.
pub trait AsyncAssociation<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin>:
    private::AsyncAssociationSealed<S> + Association
{
    /// Obtain access to the inner stream
    /// connected to the association acceptor.
    ///
    /// This can be used to send the PDU in semantic fragments of the message,
    /// thus using less memory.
    ///
    /// **Note:** reading and writing should be done with care
    /// to avoid inconsistencies in the association state.
    /// Do not call `send` and `receive` while not in a PDU boundary.
    fn inner_stream(&mut self) -> &mut S;

    /// Obtain mutable access to the inner stream and read buffer
    fn get_mut(&mut self) -> (&mut S, &mut BytesMut);

    /// Send a PDU message to the other intervenient.
    fn send(&mut self, pdu: &Pdu) -> impl std::future::Future<Output = Result<()>> + Send
    where
        Self: Send,
    {
        async move { private::AsyncAssociationSealed::send(self, pdu).await }
    }

    /// Read a PDU message from the other intervenient.
    fn receive(&mut self) -> impl std::future::Future<Output = Result<Pdu>> + Send
    where
        Self: Send,
    {
        async move { private::AsyncAssociationSealed::receive(self).await }
    }

    /// Send a provider initiated abort message
    /// and shut down the TCP connection,
    /// terminating the association.
    fn abort(mut self) -> impl std::future::Future<Output = Result<()>> + Send
    where
        Self: Sized + Send,
    {
        async move { private::AsyncAssociationSealed::abort(&mut self).await }
    }

    /// Iniate a graceful release of the association.
    ///
    /// A DIMSE A-RELEASE transaction is initiated by this application entity,
    /// and the underlying socket is closed once settled.
    ///
    /// Note that implementers of this trait
    /// do not try to release the association on [`Drop`],
    /// so remember to call `release` explicitly
    /// at the end of all DIMSE transactions.
    fn release(mut self) -> impl std::future::Future<Output = Result<()>> + Send
    where
        Self: Sized + Send,
    {
        async move { private::AsyncAssociationSealed::release(&mut self).await }
    }

    /// Prepare a P-Data writer for sending
    /// one or more data item PDUs.
    ///
    /// Returns a writer which automatically
    /// splits the inner data into separate PDUs if necessary.
    fn send_pdata(&mut self, presentation_context_id: u8) -> AsyncPDataWriter<&mut S> {
        let max_pdu_length = self.peer_max_pdu_length();
        AsyncPDataWriter::new(self.inner_stream(), presentation_context_id, max_pdu_length)
    }

    /// Prepare a P-Data reader for receiving
    /// one or more data item PDUs.
    ///
    /// Returns a reader which automatically
    /// receives more data PDUs once the bytes collected are consumed.
    fn receive_pdata(&mut self) -> PDataReader<'_, &mut S> {
        let max_pdu_length = self.local_max_pdu_length();
        let (socket, read_buffer) = self.get_mut();
        PDataReader::new(socket, max_pdu_length, read_buffer)
    }
}

// Helper function to perform an operation with timeout
#[cfg(feature = "async")]
async fn timeout<T>(
    timeout: Option<Duration>,
    block: impl std::future::Future<Output = Result<T>>,
) -> Result<T> {
    if let Some(timeout) = timeout {
        tokio::time::timeout(timeout, block)
            .await
            .map_err(|_| std::io::Error::from(std::io::ErrorKind::TimedOut))
            .context(crate::association::TimeoutSnafu)?
    } else {
        block.await
    }
}

/// Encode a PDU into the provided buffer
pub(crate) fn encode_pdu(buffer: &mut Vec<u8>, pdu: &Pdu, peer_max_pdu_length: u32) -> Result<()> {
    write_pdu(buffer, pdu).context(SendPduSnafu)?;
    if buffer.len() > peer_max_pdu_length as usize {
        return SendTooLongPduSnafu {
            length: buffer.len(),
        }
        .fail();
    }
    Ok(())
}

/// Helper function to get a PDU from a reader.
///
/// Chunks of data are read into `read_buffer`,
/// which should be passed in subsequent calls
/// to receive more PDUs from the same stream.
pub fn read_pdu_from_wire<R>(
    reader: &mut R,
    read_buffer: &mut BytesMut,
    max_pdu_length: u32,
    strict: bool,
) -> Result<Pdu>
where
    R: Read,
{
    let mut reader = BufReader::new(reader);
    let msg = loop {
        let mut buf = Cursor::new(&read_buffer[..]);
        // try to read a PDU according to what's in the buffer
        match pdu::read_pdu(&mut buf, max_pdu_length, strict).context(ReceivePduSnafu)? {
            Some(pdu) => {
                read_buffer.advance(buf.position() as usize);
                break pdu;
            }
            None => {
                // Reset position
                buf.set_position(0)
            }
        }
        // Use BufReader to get similar behavior to AsyncRead read_buf
        let recv = reader
            .fill_buf()
            .context(ReadPduSnafu)
            .context(ReceivePduSnafu)?;
        let bytes_read = recv.len();
        read_buffer.extend_from_slice(recv);
        reader.consume(bytes_read);
        ensure!(bytes_read != 0, ConnectionClosedSnafu);
    };
    Ok(msg)
}

/// Helper function to get a PDU from an async reader.
///
/// Chunks of data are read into `read_buffer`,
/// which should be passed in subsequent calls
/// to receive more PDUs from the same stream.
#[cfg(feature = "async")]
pub async fn read_pdu_from_wire_async<R: tokio::io::AsyncRead + Unpin>(
    reader: &mut R,
    read_buffer: &mut BytesMut,
    max_pdu_length: u32,
    strict: bool,
) -> Result<Pdu> {
    use tokio::io::AsyncReadExt;
    // receive response

    let msg = loop {
        let mut buf = Cursor::new(&read_buffer[..]);
        match pdu::read_pdu(&mut buf, max_pdu_length, strict).context(ReceivePduSnafu)? {
            Some(pdu) => {
                read_buffer.advance(buf.position() as usize);
                break pdu;
            }
            None => {
                // Reset position
                buf.set_position(0)
            }
        }
        let recv = reader
            .read_buf(read_buffer)
            .await
            .context(ReadPduSnafu)
            .context(ReceivePduSnafu)?;
        ensure!(recv > 0, ConnectionClosedSnafu);
    };
    Ok(msg)
}