use dicom_toolkit_core::error::DcmResult;
use dicom_toolkit_data::{io::reader::DicomReader, DataSet};
use dicom_toolkit_dict::{tags, Vr};
use crate::association::Association;
use crate::config::AssociationConfig;
use crate::presentation::PresentationContextRq;
use crate::services::provider::{DestinationLookup, MoveEvent, MoveServiceProvider};
use crate::services::recv_command_data_bytes;
use crate::services::store::c_store;
#[derive(Debug, Clone)]
pub struct MoveRequest {
pub sop_class_uid: String,
pub destination: String,
pub query: Vec<u8>,
pub context_id: u8,
pub priority: u16,
}
#[derive(Debug, Clone)]
pub struct MoveResponse {
pub status: u16,
pub remaining: Option<u16>,
pub completed: Option<u16>,
pub failed: Option<u16>,
pub warning: Option<u16>,
}
pub async fn c_move(assoc: &mut Association, req: MoveRequest) -> DcmResult<Vec<MoveResponse>> {
let msg_id = next_message_id();
let mut cmd = DataSet::new();
cmd.set_uid(tags::AFFECTED_SOP_CLASS_UID, &req.sop_class_uid);
cmd.set_u16(tags::COMMAND_FIELD, 0x0021); cmd.set_u16(tags::MESSAGE_ID, msg_id);
cmd.set_u16(tags::PRIORITY, req.priority);
cmd.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0000); cmd.set_string(tags::MOVE_DESTINATION, Vr::AE, &req.destination);
assoc.send_dimse_command(req.context_id, &cmd).await?;
assoc.send_dimse_data(req.context_id, &req.query).await?;
let mut responses = Vec::new();
loop {
let (_ctx, rsp_cmd) = assoc.recv_dimse_command().await?;
let status = rsp_cmd.get_u16(tags::STATUS).unwrap_or(0xFFFF);
let has_dataset = rsp_cmd
.get_u16(tags::COMMAND_DATA_SET_TYPE)
.map(|v| v != 0x0101)
.unwrap_or(false);
if has_dataset {
let _ = assoc.recv_dimse_data().await?;
}
responses.push(MoveResponse {
status,
remaining: rsp_cmd.get_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS),
completed: rsp_cmd.get_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS),
failed: rsp_cmd.get_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS),
warning: rsp_cmd.get_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS),
});
let is_pending = status == 0xFF00 || status == 0xFF01;
if !is_pending {
break;
}
}
Ok(responses)
}
fn next_message_id() -> u16 {
use std::sync::atomic::{AtomicU16, Ordering};
static ID: AtomicU16 = AtomicU16::new(1);
ID.fetch_add(1, Ordering::Relaxed)
}
const TS_EXPLICIT_LE: &str = "1.2.840.10008.1.2.1";
pub async fn handle_move_rq<P, L>(
assoc: &mut Association,
ctx_id: u8,
cmd: &DataSet,
provider: &P,
dest_lookup: &L,
local_ae: &str,
) -> DcmResult<()>
where
P: MoveServiceProvider,
L: DestinationLookup + ?Sized,
{
let sop_class = cmd
.get_string(tags::AFFECTED_SOP_CLASS_UID)
.unwrap_or_default()
.trim_end_matches('\0')
.to_string();
let msg_id = cmd.get_u16(tags::MESSAGE_ID).unwrap_or(1);
let destination = cmd
.get_string(tags::MOVE_DESTINATION)
.unwrap_or_default()
.trim()
.to_string();
let query_bytes = recv_command_data_bytes(assoc, cmd).await?;
let ts = assoc
.context_by_id(ctx_id)
.map(|pc| pc.transfer_syntax.trim_end_matches('\0').to_string())
.unwrap_or_else(|| TS_EXPLICIT_LE.to_string());
let identifier = DicomReader::new(query_bytes.as_slice())
.read_dataset(&ts)
.unwrap_or_else(|_| DataSet::new());
let dest_addr = match dest_lookup.lookup(&destination) {
Some(addr) => addr,
None => {
let mut rsp = DataSet::new();
rsp.set_uid(tags::AFFECTED_SOP_CLASS_UID, &sop_class);
rsp.set_u16(tags::COMMAND_FIELD, 0x8021); rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, msg_id);
rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101);
rsp.set_u16(tags::STATUS, 0xA801); return assoc.send_dimse_command(ctx_id, &rsp).await;
}
};
let event = MoveEvent {
calling_ae: assoc.calling_ae.clone(),
destination: destination.clone(),
sop_class_uid: sop_class.clone(),
identifier,
};
let items = provider.on_move(event).await;
if items.is_empty() {
let mut rsp = DataSet::new();
rsp.set_uid(tags::AFFECTED_SOP_CLASS_UID, &sop_class);
rsp.set_u16(tags::COMMAND_FIELD, 0x8021);
rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, msg_id);
rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101);
rsp.set_u16(tags::STATUS, 0x0000);
rsp.set_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS, 0);
return assoc.send_dimse_command(ctx_id, &rsp).await;
}
let mut unique_sop_classes: Vec<String> = items
.iter()
.map(|i| i.sop_class_uid.clone())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
unique_sop_classes.sort();
let sub_contexts: Vec<PresentationContextRq> = unique_sop_classes
.iter()
.enumerate()
.map(|(i, sc)| PresentationContextRq {
id: (i * 2 + 1) as u8,
abstract_syntax: sc.clone(),
transfer_syntaxes: vec![TS_EXPLICIT_LE.to_string()],
})
.collect();
let sub_cfg = AssociationConfig {
local_ae_title: local_ae.to_string(),
accept_all_transfer_syntaxes: true,
..Default::default()
};
let mut sub_assoc =
match Association::request(&dest_addr, &destination, local_ae, &sub_contexts, &sub_cfg)
.await
{
Ok(a) => a,
Err(_) => {
let total = items.len() as u16;
let mut rsp = DataSet::new();
rsp.set_uid(tags::AFFECTED_SOP_CLASS_UID, &sop_class);
rsp.set_u16(tags::COMMAND_FIELD, 0x8021);
rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, msg_id);
rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101);
rsp.set_u16(tags::STATUS, 0xA801);
rsp.set_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS, total);
rsp.set_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS, 0);
return assoc.send_dimse_command(ctx_id, &rsp).await;
}
};
let total = items.len() as u16;
let mut completed: u16 = 0;
let mut failed: u16 = 0;
for item in &items {
let remaining = total.saturating_sub(completed + failed + 1);
if let Some(store_ctx) = sub_assoc.find_context(&item.sop_class_uid) {
use crate::services::store::StoreRequest;
let req = StoreRequest {
sop_class_uid: item.sop_class_uid.clone(),
sop_instance_uid: item.sop_instance_uid.clone(),
priority: 0,
dataset_bytes: item.dataset.clone(),
context_id: store_ctx.id,
};
match c_store(&mut sub_assoc, req).await {
Ok(rsp) if rsp.status == 0x0000 => completed += 1,
_ => failed += 1,
}
} else {
failed += 1;
}
let mut pending_rsp = DataSet::new();
pending_rsp.set_uid(tags::AFFECTED_SOP_CLASS_UID, &sop_class);
pending_rsp.set_u16(tags::COMMAND_FIELD, 0x8021); pending_rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, msg_id);
pending_rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101); pending_rsp.set_u16(tags::STATUS, 0xFF00); pending_rsp.set_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS, remaining);
pending_rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, completed);
pending_rsp.set_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS, failed);
pending_rsp.set_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS, 0);
assoc.send_dimse_command(ctx_id, &pending_rsp).await?;
}
let _ = sub_assoc.release().await;
let final_status: u16 = if failed > 0 { 0xB000 } else { 0x0000 };
let mut final_rsp = DataSet::new();
final_rsp.set_uid(tags::AFFECTED_SOP_CLASS_UID, &sop_class);
final_rsp.set_u16(tags::COMMAND_FIELD, 0x8021); final_rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, msg_id);
final_rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101); final_rsp.set_u16(tags::STATUS, final_status);
final_rsp.set_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS, 0);
final_rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, completed);
final_rsp.set_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS, failed);
final_rsp.set_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS, 0);
assoc.send_dimse_command(ctx_id, &final_rsp).await
}
#[cfg(test)]
mod tests {
use crate::dimse;
use dicom_toolkit_data::DataSet;
use dicom_toolkit_dict::{tags, Vr};
#[test]
fn c_move_rq_command_build() {
let mut cmd = DataSet::new();
cmd.set_uid(tags::AFFECTED_SOP_CLASS_UID, "1.2.840.10008.5.1.4.1.2.1.2");
cmd.set_u16(tags::COMMAND_FIELD, 0x0021); cmd.set_u16(tags::MESSAGE_ID, 3);
cmd.set_u16(tags::PRIORITY, 0);
cmd.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0000);
cmd.set_string(tags::MOVE_DESTINATION, Vr::AE, "STORAGESCU");
let bytes = dimse::encode_command_dataset(&cmd);
let decoded = dimse::decode_command_dataset(&bytes).unwrap();
assert_eq!(decoded.get_u16(tags::COMMAND_FIELD), Some(0x0021));
assert_eq!(decoded.get_u16(tags::MESSAGE_ID), Some(3));
assert_eq!(
decoded.get_string(tags::MOVE_DESTINATION),
Some("STORAGESCU")
);
}
#[test]
fn c_move_rsp_pending_has_counts() {
let mut rsp = DataSet::new();
rsp.set_u16(tags::COMMAND_FIELD, 0x8021); rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, 3);
rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101); rsp.set_u16(tags::STATUS, 0xFF00); rsp.set_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS, 10);
rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, 3);
rsp.set_u16(tags::NUMBER_OF_FAILED_SUB_OPERATIONS, 0);
rsp.set_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS, 1);
let bytes = dimse::encode_command_dataset(&rsp);
let decoded = dimse::decode_command_dataset(&bytes).unwrap();
assert_eq!(decoded.get_u16(tags::STATUS), Some(0xFF00));
assert_eq!(
decoded.get_u16(tags::NUMBER_OF_REMAINING_SUB_OPERATIONS),
Some(10)
);
assert_eq!(
decoded.get_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS),
Some(3)
);
assert_eq!(
decoded.get_u16(tags::NUMBER_OF_WARNING_SUB_OPERATIONS),
Some(1)
);
}
#[test]
fn c_move_rsp_final_success() {
let mut rsp = DataSet::new();
rsp.set_u16(tags::COMMAND_FIELD, 0x8021);
rsp.set_u16(tags::MESSAGE_ID_BEING_RESPONDED_TO, 3);
rsp.set_u16(tags::COMMAND_DATA_SET_TYPE, 0x0101);
rsp.set_u16(tags::STATUS, 0x0000);
rsp.set_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS, 13);
let bytes = dimse::encode_command_dataset(&rsp);
let decoded = dimse::decode_command_dataset(&bytes).unwrap();
assert_eq!(decoded.get_u16(tags::STATUS), Some(0x0000));
assert_eq!(
decoded.get_u16(tags::NUMBER_OF_COMPLETED_SUB_OPERATIONS),
Some(13)
);
}
}