use super::*;
pub(super) fn classify(meta: &MessageMeta, payload: &[u8]) -> Result<OpCode, Error> {
if meta.proto_id == PROTO_ID_BDX {
return opcode(meta).ok_or_else(|| ErrorCode::InvalidOpcode.into());
}
if meta.proto_id == sc::PROTO_ID_SECURE_CHANNEL
&& meta.proto_opcode == sc::OpCode::StatusReport as u8
{
let mut rb = ReadBuf::new(payload);
match StatusReport::read(&mut rb) {
Ok(report) if report.proto_id == PROTO_ID_BDX as u32 => {
error!(
"BDX: peer aborted the transfer (BDX status 0x{:04x})",
report.proto_code
);
}
Ok(report) => {
error!(
"BDX: transfer ended by a non-BDX StatusReport (protocol 0x{:08x}, code 0x{:04x})",
report.proto_id, report.proto_code
);
}
Err(_) => {
error!("BDX: peer aborted the transfer with a malformed StatusReport");
}
}
return Err(ErrorCode::Invalid.into());
}
Err(ErrorCode::InvalidProto.into())
}
pub(super) async fn send_status_report(
exchange: &mut Exchange<'_>,
status: BdxStatus,
) -> Result<(), Error> {
exchange
.send_with(|_, wb| {
status.as_report().write(wb)?;
Ok(Some(sc::OpCode::StatusReport.meta()))
})
.await
}
pub(super) async fn abort<T>(exchange: &mut Exchange<'_>, status: BdxStatus) -> Result<T, Error> {
warn!("BDX: aborting the transfer ({:?})", status);
send_status_report(exchange, status).await?;
Err(ErrorCode::Invalid.into())
}
pub(super) async fn send_init(
exchange: &mut Exchange<'_>,
opcode: OpCode,
max_block_size: u16,
start_offset: Option<u64>,
file_designator: &[u8],
) -> Result<(), Error> {
let start_offset = start_offset.filter(|&o| o > 0);
let init = TransferInit {
transfer_control: TransferControl {
version: BDX_VERSION,
sender_drive: true,
receiver_drive: true,
async_mode: false,
},
range_control: RangeControl {
def_len: false,
start_offset: start_offset.is_some(),
wide_range: start_offset.is_some_and(|o| o > u32::MAX as u64),
},
max_block_size,
start_offset: start_offset.unwrap_or(0),
length: 0,
file_designator,
metadata: &[],
};
exchange
.send_with(|_, wb| {
init.write(wb)?;
Ok(Some(opcode.into()))
})
.await
}
pub(super) async fn send_accept(
exchange: &mut Exchange<'_>,
receive: bool,
transfer_control: TransferControl,
max_block_size: u16,
length: Option<u64>,
) -> Result<(), Error> {
let accept = TransferAccept {
receive,
transfer_control,
range_control: RangeControl {
def_len: receive && length.is_some(),
start_offset: false,
wide_range: length.is_some_and(|len| len > u32::MAX as u64),
},
max_block_size,
length: length.unwrap_or(0),
metadata: &[],
};
let opcode = if receive {
OpCode::ReceiveAccept
} else {
OpCode::SendAccept
};
exchange
.send_with(|_, wb| {
accept.write(wb)?;
Ok(Some(opcode.into()))
})
.await
}
pub(super) async fn recv_accept(
exchange: &mut Exchange<'_>,
receive: bool,
) -> Result<Option<(TransferControl, u16, Option<u64>)>, Error> {
let expected = if receive {
OpCode::ReceiveAccept
} else {
OpCode::SendAccept
};
enum Outcome {
Ok(TransferControl, u16, Option<u64>),
NoMethod,
Unexpected,
Aborted(Error),
}
exchange.recv_fetch().await?;
let meta = exchange.rx()?.meta();
let outcome = {
let payload = exchange.rx()?.payload();
match classify(&meta, payload) {
Ok(op) if op == expected => {
let accept = TransferAccept::parse(receive, payload)?;
let tc = accept.transfer_control;
if tc.sender_drive || tc.receiver_drive {
let length = (accept.range_control.def_len && accept.length > 0)
.then_some(accept.length);
Outcome::Ok(tc, accept.max_block_size, length)
} else {
Outcome::NoMethod
}
}
Ok(_) => Outcome::Unexpected,
Err(e) => Outcome::Aborted(e),
}
};
exchange.rx_done()?;
match outcome {
Outcome::Ok(tc, mbs, length) => Ok(Some((tc, mbs, length))),
Outcome::NoMethod => Ok(None),
Outcome::Unexpected => abort(exchange, BdxStatus::UnexpectedMessage).await,
Outcome::Aborted(e) => Err(e),
}
}
pub(super) async fn recv_init_hold(
exchange: &mut Exchange<'_>,
expected: OpCode,
) -> Result<(TransferControl, u16, Option<u64>, u64), Error> {
enum Outcome {
Ok(TransferControl, u16, Option<u64>, u64),
Unexpected,
Aborted(Error),
}
exchange.recv_fetch().await?;
let meta = exchange.rx()?.meta();
let outcome = {
let payload = exchange.rx()?.payload();
match classify(&meta, payload) {
Ok(op) if op == expected => {
let init = TransferInit::parse(payload)?;
let length = (init.range_control.def_len && init.length > 0).then_some(init.length);
Outcome::Ok(
init.transfer_control,
init.max_block_size,
length,
init.start_offset,
)
}
Ok(_) => Outcome::Unexpected,
Err(e) => Outcome::Aborted(e),
}
};
match outcome {
Outcome::Ok(tc, pmbs, length, start_offset) => Ok((tc, pmbs, length, start_offset)),
Outcome::Unexpected => {
exchange.rx_done()?;
abort(exchange, BdxStatus::UnexpectedMessage).await
}
Outcome::Aborted(e) => {
exchange.rx_done()?;
Err(e)
}
}
}
pub(super) fn held_fd<'x>(exchange: &'x Exchange<'_>) -> &'x [u8] {
exchange
.rx()
.ok()
.and_then(|rx| TransferInit::parse(rx.payload()).ok())
.map(|init| init.file_designator)
.unwrap_or(&[])
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn classify_maps_bdx_opcodes() {
assert_eq!(classify(&OpCode::Block.meta(), &[]).unwrap(), OpCode::Block);
assert_eq!(
classify(&OpCode::ReceiveInit.meta(), &[]).unwrap(),
OpCode::ReceiveInit
);
}
#[test]
fn classify_rejects_status_report() {
let meta = MessageMeta::new(
sc::PROTO_ID_SECURE_CHANNEL,
sc::OpCode::StatusReport as u8,
true,
);
let mut buf = [0u8; 16];
let mut wb = WriteBuf::new(&mut buf);
BdxStatus::TransferFailedUnknownError
.as_report()
.write(&mut wb)
.unwrap();
assert!(classify(&meta, wb.as_slice()).is_err());
assert!(classify(&meta, &[]).is_err());
}
#[test]
fn classify_rejects_other_protocols() {
assert!(classify(&MessageMeta::new(0x99, 0x01, true), &[]).is_err());
}
#[test]
fn classify_rejects_unknown_bdx_opcode() {
assert!(classify(&MessageMeta::new(PROTO_ID_BDX, 0x7f, true), &[]).is_err());
}
}