use bytes::BytesMut;
use std::borrow::Cow;
use std::time::Duration;
use std::{io::Write, net::TcpStream};
use crate::association::private::SyncAssociationSealed;
use crate::association::{
encode_pdu, read_pdu_from_wire, AbortedSnafu, Association, CloseSocket,
MissingAbstractSyntaxSnafu, RejectedSnafu, SendPduSnafu, SocketOptions, SyncAssociation,
UnexpectedPduSnafu, UnknownPduSnafu, WireSendSnafu,
};
use dicom_encoding::transfer_syntax::TransferSyntaxIndex;
use dicom_transfer_syntax_registry::TransferSyntaxRegistry;
use snafu::{ensure, ResultExt};
use crate::association::NegotiatedOptions;
use crate::pdu::{PresentationContextNegotiated, LARGE_PDU_SIZE};
use crate::{
pdu::{
write_pdu, AbortRQServiceProviderReason, AbortRQSource, AssociationAC, AssociationRJ,
AssociationRJResult, AssociationRJServiceUserReason, AssociationRJSource, AssociationRQ,
Pdu, PresentationContextResult, PresentationContextResultReason, UserIdentity,
UserVariableItem, DEFAULT_MAX_PDU, PDU_HEADER_SIZE,
},
IMPLEMENTATION_CLASS_UID, IMPLEMENTATION_VERSION_NAME,
};
use super::{uid::trim_uid, Error, Result};
#[cfg(feature = "async")]
use crate::association::AsyncAssociation;
#[deprecated(since = "0.9.1")]
pub mod non_blocking {}
#[cfg(feature = "sync-tls")]
pub type TlsStream = rustls::StreamOwned<rustls::ServerConnection, std::net::TcpStream>;
#[cfg(feature = "async-tls")]
pub type AsyncTlsStream = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
pub trait AccessControl {
fn check_access(
&self,
this_ae_title: &str,
calling_ae_title: &str,
called_ae_title: &str,
user_identity: Option<&UserIdentity>,
) -> Result<(), AssociationRJServiceUserReason>;
}
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
pub struct AcceptAny;
impl AccessControl for AcceptAny {
fn check_access(
&self,
_this_ae_title: &str,
_calling_ae_title: &str,
_called_ae_title: &str,
_user_identity: Option<&UserIdentity>,
) -> Result<(), AssociationRJServiceUserReason> {
Ok(())
}
}
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
pub struct AcceptCalledAeTitle;
impl AccessControl for AcceptCalledAeTitle {
fn check_access(
&self,
this_ae_title: &str,
_calling_ae_title: &str,
called_ae_title: &str,
_user_identity: Option<&UserIdentity>,
) -> Result<(), AssociationRJServiceUserReason> {
if this_ae_title == called_ae_title {
Ok(())
} else {
Err(AssociationRJServiceUserReason::CalledAETitleNotRecognized)
}
}
}
#[derive(Debug, Clone)]
pub struct ServerAssociationOptions<'a, A> {
ae_access_control: A,
ae_title: Cow<'a, str>,
application_context_name: Cow<'a, str>,
abstract_syntax_uids: Vec<Cow<'a, str>>,
transfer_syntax_uids: Vec<Cow<'a, str>>,
protocol_version: u16,
max_pdu_length: u32,
strict: bool,
promiscuous: bool,
socket_options: SocketOptions,
#[cfg(feature = "sync-tls")]
tls_config: Option<std::sync::Arc<rustls::ServerConfig>>,
}
impl Default for ServerAssociationOptions<'_, AcceptAny> {
fn default() -> Self {
ServerAssociationOptions {
ae_access_control: AcceptAny,
ae_title: "THIS-SCP".into(),
application_context_name: "1.2.840.10008.3.1.1.1".into(),
abstract_syntax_uids: Vec::new(),
transfer_syntax_uids: Vec::new(),
protocol_version: 1,
max_pdu_length: DEFAULT_MAX_PDU,
strict: true,
promiscuous: false,
socket_options: SocketOptions::default(),
#[cfg(feature = "sync-tls")]
tls_config: None,
}
}
}
impl ServerAssociationOptions<'_, AcceptAny> {
pub fn new() -> Self {
Self::default()
}
}
impl<'a, A> ServerAssociationOptions<'a, A>
where
A: AccessControl,
{
pub fn accept_any(self) -> ServerAssociationOptions<'a, AcceptAny> {
self.ae_access_control(AcceptAny)
}
pub fn accept_called_ae_title(self) -> ServerAssociationOptions<'a, AcceptCalledAeTitle> {
self.ae_access_control(AcceptCalledAeTitle)
}
pub fn ae_access_control<P>(self, access_control: P) -> ServerAssociationOptions<'a, P>
where
P: AccessControl,
{
let ServerAssociationOptions {
ae_title,
application_context_name,
abstract_syntax_uids,
transfer_syntax_uids,
protocol_version,
max_pdu_length,
strict,
promiscuous,
ae_access_control: _,
socket_options,
#[cfg(feature = "sync-tls")]
tls_config,
} = self;
ServerAssociationOptions {
ae_access_control: access_control,
ae_title,
application_context_name,
abstract_syntax_uids,
transfer_syntax_uids,
protocol_version,
max_pdu_length,
strict,
promiscuous,
socket_options,
#[cfg(feature = "sync-tls")]
tls_config,
}
}
pub fn ae_title<T>(mut self, ae_title: T) -> Self
where
T: Into<Cow<'a, str>>,
{
self.ae_title = ae_title.into();
self
}
pub fn with_abstract_syntax<T>(mut self, abstract_syntax_uid: T) -> Self
where
T: Into<Cow<'a, str>>,
{
self.abstract_syntax_uids
.push(trim_uid(abstract_syntax_uid.into()));
self
}
pub fn with_transfer_syntax<T>(mut self, transfer_syntax_uid: T) -> Self
where
T: Into<Cow<'a, str>>,
{
self.transfer_syntax_uids
.push(trim_uid(transfer_syntax_uid.into()));
self
}
pub fn max_pdu_length(mut self, value: u32) -> Self {
self.max_pdu_length = value;
self
}
pub fn strict(mut self, strict: bool) -> Self {
self.strict = strict;
self
}
pub fn promiscuous(mut self, promiscuous: bool) -> Self {
self.promiscuous = promiscuous;
self
}
pub fn read_timeout(self, timeout: Duration) -> Self {
Self {
socket_options: SocketOptions {
read_timeout: Some(timeout),
write_timeout: self.socket_options.write_timeout,
connection_timeout: self.socket_options.connection_timeout,
},
..self
}
}
pub fn write_timeout(self, timeout: Duration) -> Self {
Self {
socket_options: SocketOptions {
read_timeout: self.socket_options.read_timeout,
write_timeout: Some(timeout),
connection_timeout: self.socket_options.connection_timeout,
},
..self
}
}
#[cfg(feature = "sync-tls")]
pub fn tls_config(mut self, config: impl Into<std::sync::Arc<rustls::ServerConfig>>) -> Self {
self.tls_config = Some(config.into());
self
}
#[allow(clippy::result_large_err)]
fn process_a_association_rq(
&self,
msg: Pdu,
) -> std::result::Result<(Pdu, NegotiatedOptions), (Pdu, Error)> {
match msg {
Pdu::AssociationRQ(AssociationRQ {
protocol_version,
calling_ae_title,
called_ae_title,
application_context_name,
presentation_contexts,
user_variables,
}) => {
if protocol_version != self.protocol_version {
let association_rj = AssociationRJ {
result: AssociationRJResult::Permanent,
source: AssociationRJSource::ServiceUser(
AssociationRJServiceUserReason::NoReasonGiven,
),
};
let pdu = Pdu::AssociationRJ(association_rj.clone());
return Err((pdu, RejectedSnafu { association_rj }.build()));
}
if application_context_name != self.application_context_name {
let association_rj = AssociationRJ {
result: AssociationRJResult::Permanent,
source: AssociationRJSource::ServiceUser(
AssociationRJServiceUserReason::ApplicationContextNameNotSupported,
),
};
let pdu = Pdu::AssociationRJ(association_rj.clone());
return Err((pdu, RejectedSnafu { association_rj }.build()));
}
self.ae_access_control
.check_access(
&self.ae_title,
&calling_ae_title,
&called_ae_title,
user_variables
.iter()
.find_map(|user_variable| match user_variable {
UserVariableItem::UserIdentityItem(user_identity) => {
Some(user_identity)
}
_ => None,
}),
)
.map(Ok)
.unwrap_or_else(|reason| {
let association_rj = AssociationRJ {
result: AssociationRJResult::Permanent,
source: AssociationRJSource::ServiceUser(reason),
};
let pdu = Pdu::AssociationRJ(association_rj.clone());
Err((pdu, RejectedSnafu { association_rj }.build()))
})?;
let requestor_max_pdu_length = user_variables
.iter()
.find_map(|item| match item {
UserVariableItem::MaxLength(len) => Some(*len),
_ => None,
})
.unwrap_or(DEFAULT_MAX_PDU);
let requestor_max_pdu_length = if requestor_max_pdu_length == 0 {
u32::MAX
} else {
requestor_max_pdu_length
};
let presentation_contexts_negotiated: Vec<_> = presentation_contexts
.into_iter()
.map(|pc| {
let abstract_syntax = trim_uid(Cow::from(pc.abstract_syntax));
if !self.abstract_syntax_uids.contains(&abstract_syntax)
&& !self.promiscuous
{
return PresentationContextNegotiated {
id: pc.id,
reason: PresentationContextResultReason::AbstractSyntaxNotSupported,
transfer_syntax: "1.2.840.10008.1.2".to_string(),
abstract_syntax: abstract_syntax.to_string(),
};
}
let (transfer_syntax, reason) = self
.choose_ts(pc.transfer_syntaxes)
.map(|ts| (ts, PresentationContextResultReason::Acceptance))
.unwrap_or_else(|| {
(
"1.2.840.10008.1.2".to_string(),
PresentationContextResultReason::TransferSyntaxesNotSupported,
)
});
PresentationContextNegotiated {
id: pc.id,
reason,
transfer_syntax,
abstract_syntax: abstract_syntax.to_string(),
}
})
.collect();
let pdu = Pdu::AssociationAC(AssociationAC {
protocol_version: self.protocol_version,
application_context_name,
presentation_contexts: presentation_contexts_negotiated
.iter()
.map(|pc| PresentationContextResult {
id: pc.id,
reason: pc.reason.clone(),
transfer_syntax: pc.transfer_syntax.clone(),
})
.collect(),
calling_ae_title: calling_ae_title.clone(),
called_ae_title,
user_variables: vec![
UserVariableItem::MaxLength(self.max_pdu_length),
UserVariableItem::ImplementationClassUID(
IMPLEMENTATION_CLASS_UID.to_string(),
),
UserVariableItem::ImplementationVersionName(
IMPLEMENTATION_VERSION_NAME.to_string(),
),
],
});
Ok((
pdu,
NegotiatedOptions {
peer_max_pdu_length: requestor_max_pdu_length,
user_variables,
presentation_contexts: presentation_contexts_negotiated,
peer_ae_title: calling_ae_title,
},
))
}
Pdu::ReleaseRQ => Err((Pdu::ReleaseRP, AbortedSnafu.build())),
pdu @ Pdu::AssociationAC { .. }
| pdu @ Pdu::AssociationRJ { .. }
| pdu @ Pdu::PData { .. }
| pdu @ Pdu::ReleaseRP
| pdu @ Pdu::AbortRQ { .. } => Err((
Pdu::AbortRQ {
source: AbortRQSource::ServiceProvider(
AbortRQServiceProviderReason::UnexpectedPdu,
),
},
UnexpectedPduSnafu { pdu }.build(),
)),
pdu @ Pdu::Unknown { .. } => Err((
Pdu::AbortRQ {
source: AbortRQSource::ServiceProvider(
AbortRQServiceProviderReason::UnrecognizedPdu,
),
},
UnknownPduSnafu { pdu }.build(),
)),
}
}
pub fn establish(&self, mut socket: TcpStream) -> Result<ServerAssociation<TcpStream>> {
ensure!(
!self.abstract_syntax_uids.is_empty() || self.promiscuous,
MissingAbstractSyntaxSnafu
);
socket
.set_read_timeout(self.socket_options.read_timeout)
.context(super::SetReadTimeoutSnafu)?;
socket
.set_write_timeout(self.socket_options.write_timeout)
.context(super::SetWriteTimeoutSnafu)?;
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let msg = read_pdu_from_wire(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)?;
let mut write_buffer: Vec<u8> = Vec::with_capacity(self.max_pdu_length as usize);
match self.process_a_association_rq(msg) {
Ok((
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket.write_all(&write_buffer).context(WireSendSnafu)?;
Ok(ServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer,
strict: self.strict,
read_buffer,
user_variables,
})
}
Err((pdu, err)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket.write_all(&write_buffer).context(WireSendSnafu)?;
Err(err)
}
}
}
#[cfg(feature = "sync-tls")]
pub fn establish_tls(&self, socket: TcpStream) -> Result<ServerAssociation<TlsStream>> {
ensure!(
!self.abstract_syntax_uids.is_empty() || self.promiscuous,
MissingAbstractSyntaxSnafu
);
let tls_config = self
.tls_config
.as_ref()
.ok_or_else(|| super::TlsConfigMissingSnafu {}.build())?;
socket
.set_read_timeout(self.socket_options.read_timeout)
.context(super::SetReadTimeoutSnafu)?;
socket
.set_write_timeout(self.socket_options.write_timeout)
.context(super::SetWriteTimeoutSnafu)?;
let conn =
rustls::ServerConnection::new(tls_config.clone()).context(super::TlsConnectionSnafu)?;
let mut tls_stream = rustls::StreamOwned::new(conn, socket);
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let msg = read_pdu_from_wire(
&mut tls_stream,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)?;
let mut write_buffer: Vec<u8> = Vec::with_capacity(self.max_pdu_length as usize);
match self.process_a_association_rq(msg) {
Ok((
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
tls_stream.write_all(&write_buffer).context(WireSendSnafu)?;
Ok(ServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket: tls_stream,
client_ae_title: peer_ae_title,
write_buffer,
strict: self.strict,
read_buffer,
user_variables,
})
}
Err((pdu, err)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
tls_stream.write_all(&write_buffer).context(WireSendSnafu)?;
Err(err)
}
}
}
fn choose_ts<I, T>(&self, it: I) -> Option<T>
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
if self.transfer_syntax_uids.is_empty() {
return choose_supported(it);
}
it.into_iter().find(|ts| {
let ts = ts.as_ref();
if self.transfer_syntax_uids.is_empty() {
ts.trim_end_matches(|c: char| c.is_whitespace() || c == '\0') == "1.2.840.10008.1.2"
} else {
self.transfer_syntax_uids.contains(&trim_uid(ts.into())) && is_supported(ts)
}
})
}
}
#[derive(Debug)]
pub struct ServerAssociation<S> {
presentation_contexts: Vec<PresentationContextNegotiated>,
requestor_max_pdu_length: u32,
acceptor_max_pdu_length: u32,
socket: S,
client_ae_title: String,
write_buffer: Vec<u8>,
strict: bool,
read_buffer: bytes::BytesMut,
user_variables: Vec<UserVariableItem>,
}
impl<S> ServerAssociation<S> {
pub fn presentation_contexts(&self) -> &[PresentationContextNegotiated] {
&self.presentation_contexts
}
pub fn acceptor_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
pub fn requestor_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
#[deprecated(
since = "0.9.1",
note = "Call `peer_ae_title` from trait `Association`"
)]
pub fn client_ae_title(&self) -> &str {
&self.client_ae_title
}
}
impl<S> ServerAssociation<S>
where
S: std::io::Read + std::io::Write + CloseSocket,
{
pub fn send(&mut self, msg: &Pdu) -> Result<()> {
SyncAssociation::send(self, msg)
}
pub fn receive(&mut self) -> Result<Pdu> {
SyncAssociation::receive(self)
}
pub fn abort(self) -> Result<()> {
SyncAssociation::abort(self)
}
pub fn send_pdata(
&mut self,
presentation_context_id: u8,
) -> crate::association::pdata::PDataWriter<&mut S> {
SyncAssociation::send_pdata(self, presentation_context_id)
}
pub fn receive_pdata(&mut self) -> crate::association::pdata::PDataReader<'_, &mut S> {
SyncAssociation::receive_pdata(self)
}
pub fn inner_stream(&mut self) -> &mut S {
SyncAssociation::inner_stream(self)
}
}
impl<S> Association for ServerAssociation<S>
where
S: std::io::Read + std::io::Write + CloseSocket,
{
fn presentation_contexts(&self) -> &[PresentationContextNegotiated] {
&self.presentation_contexts
}
fn acceptor_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
fn requestor_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
fn local_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
fn peer_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
fn peer_ae_title(&self) -> &str {
&self.client_ae_title
}
fn user_variables(&self) -> &[UserVariableItem] {
&self.user_variables
}
}
impl<S> SyncAssociationSealed<S> for ServerAssociation<S>
where
S: std::io::Read + std::io::Write + CloseSocket,
{
fn send(&mut self, pdu: &Pdu) -> Result<()> {
self.write_buffer.clear();
encode_pdu(
&mut self.write_buffer,
pdu,
self.requestor_max_pdu_length + PDU_HEADER_SIZE,
)?;
self.socket
.write_all(&self.write_buffer)
.context(WireSendSnafu)
}
fn receive(&mut self) -> Result<Pdu> {
read_pdu_from_wire(
&mut self.socket,
&mut self.read_buffer,
self.acceptor_max_pdu_length,
self.strict,
)
}
fn close(&mut self) -> std::io::Result<()> {
self.socket.close()
}
}
impl<S> SyncAssociation<S> for ServerAssociation<S>
where
S: std::io::Read + std::io::Write + CloseSocket,
{
fn inner_stream(&mut self) -> &mut S {
&mut self.socket
}
fn get_mut(&mut self) -> (&mut S, &mut BytesMut) {
let Self {
socket,
read_buffer,
..
} = self;
(socket, read_buffer)
}
}
pub fn is_supported_with_repo<R>(ts_repo: R, ts_uid: &str) -> bool
where
R: TransferSyntaxIndex,
{
ts_repo
.get(ts_uid)
.filter(|ts| !ts.is_unsupported())
.is_some()
}
pub fn is_supported(ts_uid: &str) -> bool {
is_supported_with_repo(TransferSyntaxRegistry, ts_uid)
}
pub fn choose_supported_with_repo<R, I, T>(ts_repo: R, it: I) -> Option<T>
where
R: TransferSyntaxIndex,
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
it.into_iter()
.find(|ts| is_supported_with_repo(&ts_repo, ts.as_ref()))
}
pub fn choose_supported<I, T>(it: I) -> Option<T>
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
it.into_iter().find(|ts| is_supported(ts.as_ref()))
}
#[cfg(feature = "async")]
impl<A> ServerAssociationOptions<'_, A>
where
A: AccessControl,
{
pub async fn establish_async(
&self,
mut socket: tokio::net::TcpStream,
) -> Result<AsyncServerAssociation<tokio::net::TcpStream>> {
use tokio::io::AsyncWriteExt;
ensure!(
!self.abstract_syntax_uids.is_empty() || self.promiscuous,
MissingAbstractSyntaxSnafu
);
let read_timeout = self.socket_options.read_timeout;
let task = async {
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let pdu = super::read_pdu_from_wire_async(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)
.await?;
let mut write_buffer: Vec<u8> =
Vec::with_capacity((DEFAULT_MAX_PDU + PDU_HEADER_SIZE) as usize);
match self.process_a_association_rq(pdu) {
Ok((
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket
.write_all(&write_buffer)
.await
.context(WireSendSnafu)?;
Ok(AsyncServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer,
strict: self.strict,
read_buffer,
read_timeout: self.socket_options.read_timeout,
write_timeout: self.socket_options.write_timeout,
user_variables,
})
}
Err((pdu, err)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket
.write_all(&write_buffer)
.await
.context(WireSendSnafu)?;
Err(err)
}
}
};
super::timeout(read_timeout, task).await
}
#[cfg(feature = "async-tls")]
pub async fn establish_tls_async(
&self,
socket: tokio::net::TcpStream,
) -> Result<AsyncServerAssociation<AsyncTlsStream>> {
use tokio::io::AsyncWriteExt;
use tokio_rustls::TlsAcceptor;
ensure!(
!self.abstract_syntax_uids.is_empty() || self.promiscuous,
MissingAbstractSyntaxSnafu
);
let tls_config = self
.tls_config
.as_ref()
.ok_or_else(|| crate::association::TlsConfigMissingSnafu {}.build())?;
let acceptor = TlsAcceptor::from(tls_config.clone());
let mut socket = acceptor
.accept(socket)
.await
.context(crate::association::ConnectSnafu)?;
let read_timeout = self.socket_options.read_timeout;
let task = async {
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let pdu = super::read_pdu_from_wire_async(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)
.await?;
let mut write_buffer: Vec<u8> =
Vec::with_capacity((DEFAULT_MAX_PDU + PDU_HEADER_SIZE) as usize);
match self.process_a_association_rq(pdu) {
Ok((
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket
.write_all(&write_buffer)
.await
.context(WireSendSnafu)?;
Ok(AsyncServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer,
strict: self.strict,
read_buffer,
read_timeout: self.socket_options.read_timeout,
write_timeout: self.socket_options.write_timeout,
user_variables,
})
}
Err((pdu, err)) => {
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
socket
.write_all(&write_buffer)
.await
.context(WireSendSnafu)?;
Err(err)
}
}
};
super::timeout(read_timeout, task).await
}
}
#[cfg(feature = "async")]
#[derive(Debug)]
pub struct AsyncServerAssociation<S> {
presentation_contexts: Vec<PresentationContextNegotiated>,
requestor_max_pdu_length: u32,
acceptor_max_pdu_length: u32,
socket: S,
client_ae_title: String,
write_buffer: Vec<u8>,
strict: bool,
read_buffer: bytes::BytesMut,
read_timeout: Option<std::time::Duration>,
write_timeout: Option<std::time::Duration>,
user_variables: Vec<UserVariableItem>,
}
#[cfg(feature = "async")]
impl<S> Association for AsyncServerAssociation<S>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
fn acceptor_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
fn requestor_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
fn local_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
fn peer_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
fn presentation_contexts(&self) -> &[PresentationContextNegotiated] {
&self.presentation_contexts
}
fn peer_ae_title(&self) -> &str {
&self.client_ae_title
}
fn user_variables(&self) -> &[UserVariableItem] {
&self.user_variables
}
}
#[cfg(feature = "async")]
impl<S> crate::association::private::AsyncAssociationSealed<S> for AsyncServerAssociation<S>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
async fn send(&mut self, msg: &Pdu) -> Result<()> {
use tokio::io::AsyncWriteExt;
self.write_buffer.clear();
super::timeout(self.write_timeout, async {
encode_pdu(
&mut self.write_buffer,
msg,
self.requestor_max_pdu_length + PDU_HEADER_SIZE,
)?;
self.socket
.write_all(&self.write_buffer)
.await
.context(WireSendSnafu)
})
.await
}
async fn receive(&mut self) -> Result<Pdu> {
super::timeout(self.read_timeout, async {
super::read_pdu_from_wire_async(
&mut self.socket,
&mut self.read_buffer,
self.acceptor_max_pdu_length,
self.strict,
)
.await
})
.await
}
async fn close(&mut self) -> std::io::Result<()> {
use tokio::io::AsyncWriteExt;
self.socket.shutdown().await
}
}
#[cfg(feature = "async")]
impl<S> crate::association::AsyncAssociation<S> for AsyncServerAssociation<S>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
fn inner_stream(&mut self) -> &mut S {
&mut self.socket
}
fn get_mut(&mut self) -> (&mut S, &mut bytes::BytesMut) {
let Self {
socket,
read_buffer,
..
} = self;
(socket, read_buffer)
}
}
#[cfg(feature = "async")]
impl<S> AsyncServerAssociation<S> {
pub fn presentation_contexts(&self) -> &[PresentationContextNegotiated] {
&self.presentation_contexts
}
pub fn acceptor_max_pdu_length(&self) -> u32 {
self.acceptor_max_pdu_length
}
pub fn requestor_max_pdu_length(&self) -> u32 {
self.requestor_max_pdu_length
}
#[deprecated(
since = "0.9.1",
note = "Call `peer_ae_title` from trait `Association`"
)]
pub fn client_ae_title(&self) -> &str {
&self.client_ae_title
}
}
#[cfg(feature = "async")]
impl<S> AsyncServerAssociation<S>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
{
pub async fn send(&mut self, msg: &Pdu) -> Result<()> {
AsyncAssociation::send(self, msg).await
}
pub async fn receive(&mut self) -> Result<Pdu> {
AsyncAssociation::receive(self).await
}
pub async fn release(self) -> Result<()> {
AsyncAssociation::release(self).await
}
pub async fn abort(self) -> Result<()> {
AsyncAssociation::abort(self).await
}
pub fn send_pdata(
&mut self,
presentation_context_id: u8,
) -> crate::association::pdata::non_blocking::AsyncPDataWriter<&mut S> {
AsyncAssociation::send_pdata(self, presentation_context_id)
}
pub fn receive_pdata(&mut self) -> crate::association::pdata::PDataReader<'_, &mut S> {
AsyncAssociation::receive_pdata(self)
}
pub fn inner_stream(&mut self) -> &mut S {
AsyncAssociation::inner_stream(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_choose_supported() {
assert_eq!(choose_supported(vec!["1.1.1.1.1"]), None,);
assert_eq!(
choose_supported(vec!["1.2.840.10008.1.2", "1.2.840.10008.1.2.1"]),
Some("1.2.840.10008.1.2"),
);
assert_eq!(
choose_supported(vec![
"1.2.840.10008.1.2.1".to_string(),
"1.2.840.10008.1.2".to_string()
]),
Some("1.2.840.10008.1.2.1".to_string()),
);
}
impl<'a, A> ServerAssociationOptions<'a, A>
where
A: AccessControl,
{
pub(crate) fn establish_with_extra_pdus(
&self,
mut socket: std::net::TcpStream,
extra_pdus: Vec<Pdu>,
) -> Result<ServerAssociation<TcpStream>> {
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let pdu = read_pdu_from_wire(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)?;
let (
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
) = self
.process_a_association_rq(pdu)
.expect("Could not parse association req");
let mut write_buffer: Vec<u8> =
Vec::with_capacity((DEFAULT_MAX_PDU + PDU_HEADER_SIZE) as usize);
write_pdu(&mut write_buffer, &pdu).context(SendPduSnafu)?;
for extra_pdu in extra_pdus {
write_pdu(&mut write_buffer, &extra_pdu).context(SendPduSnafu)?;
}
socket.write_all(&write_buffer).context(WireSendSnafu)?;
Ok(ServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer,
read_buffer,
strict: self.strict,
user_variables,
})
}
#[cfg(feature = "async")]
pub(crate) async fn establish_with_extra_pdus_async(
&self,
mut socket: tokio::net::TcpStream,
extra_pdus: Vec<Pdu>,
) -> Result<AsyncServerAssociation<tokio::net::TcpStream>> {
use tokio::io::AsyncWriteExt;
use crate::association::read_pdu_from_wire_async;
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let pdu = read_pdu_from_wire_async(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)
.await?;
let (
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
) = self
.process_a_association_rq(pdu)
.expect("Could not parse association req");
let mut buffer: Vec<u8> = Vec::with_capacity(
(peer_max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
write_pdu(&mut buffer, &pdu).context(SendPduSnafu)?;
for extra_pdu in extra_pdus {
write_pdu(&mut buffer, &extra_pdu).context(SendPduSnafu)?;
}
socket.write_all(&buffer).await.context(WireSendSnafu)?;
Ok(AsyncServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer: buffer,
strict: self.strict,
read_buffer: BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
),
user_variables,
read_timeout: self.socket_options.read_timeout,
write_timeout: self.socket_options.write_timeout,
})
}
pub fn broken_establish(
&self,
mut socket: TcpStream,
) -> Result<ServerAssociation<TcpStream>> {
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let msg = read_pdu_from_wire(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)?;
let (
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
) = self
.process_a_association_rq(msg)
.expect("Could not parse association req");
let mut buffer: Vec<u8> = Vec::with_capacity(
(peer_max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
write_pdu(&mut buffer, &pdu).context(SendPduSnafu)?;
socket.write_all(&buffer).context(WireSendSnafu)?;
Ok(ServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer: buffer,
strict: self.strict,
read_buffer: BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
),
user_variables,
})
}
#[cfg(feature = "async")]
pub async fn broken_establish_async(
&self,
mut socket: tokio::net::TcpStream,
) -> Result<AsyncServerAssociation<tokio::net::TcpStream>> {
use tokio::io::AsyncWriteExt;
use crate::association::read_pdu_from_wire_async;
let mut read_buffer = BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
let msg = read_pdu_from_wire_async(
&mut socket,
&mut read_buffer,
self.max_pdu_length,
self.strict,
)
.await?;
let (
pdu,
NegotiatedOptions {
user_variables,
presentation_contexts,
peer_max_pdu_length,
peer_ae_title,
},
) = self
.process_a_association_rq(msg)
.expect("Could not parse association req");
let mut buffer: Vec<u8> = Vec::with_capacity(
(peer_max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
);
write_pdu(&mut buffer, &pdu).context(SendPduSnafu)?;
socket.write_all(&buffer).await.context(WireSendSnafu)?;
Ok(AsyncServerAssociation {
presentation_contexts,
requestor_max_pdu_length: peer_max_pdu_length,
acceptor_max_pdu_length: self.max_pdu_length,
socket,
client_ae_title: peer_ae_title,
write_buffer: buffer,
strict: self.strict,
read_buffer: BytesMut::with_capacity(
(self.max_pdu_length.min(LARGE_PDU_SIZE) + PDU_HEADER_SIZE) as usize,
),
read_timeout: self.socket_options.read_timeout,
write_timeout: self.socket_options.write_timeout,
user_variables,
})
}
}
}