use super::nego::*;
use super::*;
/// Sending side of a BDX transfer: stages payload bytes in a caller-supplied
/// buffer and emits them as numbered blocks over an exchange.
///
/// `'a` is the lifetime of the exchange, `'b` the lifetime of the staging
/// buffer borrowed from the caller.
pub struct BdxWriter<'a, 'b> {
/// Exchange used to send blocks and to receive the peer's control messages.
exchange: Exchange<'a>,
/// Whether this side drives the transfer or follows the peer's queries.
drive: Drive,
/// Staging buffer; the first `block_len` bytes are the pending block payload.
buf: &'b mut [u8],
/// Largest payload sent in one block (never larger than `buf.len()`).
max_block_size: usize,
/// Counter of the next block to send; advances (wrapping) after each block.
counter: u32,
/// Number of bytes currently staged in `buf` and not yet sent.
block_len: usize,
}
impl<'a, 'b> BdxWriter<'a, 'b> {
pub(super) fn new(
exchange: Exchange<'a>,
drive: Drive,
buf: &'b mut [u8],
max_block_size: u16,
) -> Self {
let max_block_size = (max_block_size as usize).min(buf.len());
Self {
exchange,
drive,
buf,
max_block_size,
counter: 0,
block_len: 0,
}
}
pub async fn write(&mut self, data: &[u8]) -> Result<usize, Error> {
if data.is_empty() {
return Ok(0);
}
let space = self.max_block_size - self.block_len;
let n = space.min(data.len());
self.buf[self.block_len..self.block_len + n].copy_from_slice(&data[..n]);
self.block_len += n;
if self.block_len == self.max_block_size {
self.send_block(false).await?;
}
Ok(n)
}
pub fn max_block_size(&self) -> usize {
self.max_block_size
}
pub fn block_buf(&mut self) -> &mut [u8] {
&mut self.buf[..self.max_block_size]
}
pub async fn commit(&mut self, len: usize) -> Result<(), Error> {
if len > self.max_block_size {
return Err(ErrorCode::Invalid.into());
}
self.block_len = len;
self.send_block(false).await
}
pub async fn finish(mut self) -> Result<(), Error> {
self.send_block(true).await?;
self.exchange.acknowledge().await
}
async fn send_block(&mut self, is_eof: bool) -> Result<(), Error> {
let counter = self.counter;
if matches!(self.drive, Drive::Follower) {
self.recv_control(OpCode::BlockQuery, counter).await?;
}
let opcode = if is_eof {
OpCode::BlockEof
} else {
OpCode::Block
};
let len = self.block_len;
{
let data = &self.buf[..len];
self.exchange
.send_with(|_, wb| {
Block {
block_counter: counter,
data,
}
.write(wb)?;
Ok(Some(opcode.into()))
})
.await?;
}
self.block_len = 0;
if matches!(self.drive, Drive::Driver) {
let ack = if is_eof {
OpCode::BlockAckEof
} else {
OpCode::BlockAck
};
self.recv_control(ack, counter).await?;
} else if is_eof {
self.recv_control(OpCode::BlockAckEof, counter).await?;
}
self.counter = self.counter.wrapping_add(1);
Ok(())
}
async fn recv_control(&mut self, expected: OpCode, expected_counter: u32) -> Result<(), Error> {
enum Outcome {
Ok,
BadCounter,
Unexpected,
Aborted(Error),
}
self.exchange.recv_fetch().await?;
let meta = self.exchange.rx()?.meta();
let outcome = {
let payload = self.exchange.rx()?.payload();
match classify(&meta, payload) {
Ok(op) if op == expected => {
if BlockQuery::parse(payload)?.block_counter == expected_counter {
Outcome::Ok
} else {
Outcome::BadCounter
}
}
Ok(_) => Outcome::Unexpected,
Err(e) => Outcome::Aborted(e),
}
};
self.exchange.rx_done()?;
match outcome {
Outcome::Ok => Ok(()),
Outcome::BadCounter => abort(&mut self.exchange, BdxStatus::BadBlockCounter).await,
Outcome::Unexpected => abort(&mut self.exchange, BdxStatus::UnexpectedMessage).await,
Outcome::Aborted(e) => Err(e),
}
}
}
/// Declares the error type used by the `embedded_io_async` trait
/// implementations of `BdxWriter`.
impl embedded_io_async::ErrorType for BdxWriter<'_, '_> {
/// I/O errors are reported with the same `Error` type as the inherent methods.
type Error = Error;
}
/// `embedded_io_async::Write` adapter over the inherent block-staging API.
impl embedded_io_async::Write for BdxWriter<'_, '_> {
    /// Delegates to the inherent `BdxWriter::write`.
    async fn write(&mut self, data: &[u8]) -> Result<usize, Self::Error> {
        // Named explicitly so the inherent method is called, not this one.
        BdxWriter::write(self, data).await
    }

    /// Sends the partially filled block, if any, as a regular block.
    /// Does nothing when no bytes are staged.
    async fn flush(&mut self) -> Result<(), Self::Error> {
        if self.block_len == 0 {
            return Ok(());
        }
        self.send_block(false).await
    }
}
/// Extension trait for starting a BDX upload (this side sends the data).
pub trait BdxUploadInitiator<'a> {
/// Starts an upload and, on success, returns the writer used to send the data.
///
/// * `buf` - staging buffer handed to the returned writer.
/// * `file_designator` - bytes sent as the file designator of the init message.
/// * `offset` - optional offset sent with the init message.
async fn upload<'b>(
self,
buf: &'b mut [u8],
file_designator: &[u8],
offset: Option<u64>,
) -> Result<BdxWriter<'a, 'b>, Error>;
}
/// Starts BDX uploads directly on an `Exchange`.
impl<'a> BdxUploadInitiator<'a> for Exchange<'a> {
    /// Sends a `SendInit`, waits for the peer's accept and returns a writer
    /// configured with the negotiated drive mode and block size.
    ///
    /// The proposed block size is `buf.len()` capped to `MAX_TX_BLOCK_SIZE`.
    /// The size returned by the peer is validated against that proposal before
    /// it is used.
    ///
    /// # Errors
    ///
    /// * `ErrorCode::Invalid` if `buf` is empty (nothing is sent in that case).
    /// * The result of `abort(.., BdxStatus::TransferMethodNotSupported)` when
    ///   `recv_accept` yields `None`.
    /// * The result of `abort(.., BdxStatus::TransferFailedUnknownError)` when
    ///   the peer accepts with a block size of zero.
    /// * Any error from sending the init or receiving the accept.
    async fn upload<'b>(
        mut self,
        buf: &'b mut [u8],
        file_designator: &[u8],
        offset: Option<u64>,
    ) -> Result<BdxWriter<'a, 'b>, Error> {
        if buf.is_empty() {
            return Err(ErrorCode::Invalid.into());
        }

        let pmbs = buf.len().min(MAX_TX_BLOCK_SIZE as usize) as u16;
        send_init(&mut self, OpCode::SendInit, pmbs, offset, file_designator).await?;

        let Some((tc, mbs, _length)) = recv_accept(&mut self, false).await? else {
            return abort(&mut self, BdxStatus::TransferMethodNotSupported).await;
        };

        if mbs == 0 {
            // A zero block size would yield a writer that can never accept
            // payload; end the transfer instead of handing out such a writer.
            return abort(&mut self, BdxStatus::TransferFailedUnknownError).await;
        }
        // Never use more than was proposed: `BdxWriter::new` only caps to
        // `buf.len()`, which may be larger than `MAX_TX_BLOCK_SIZE`.
        let mbs = mbs.min(pmbs);

        let drive = if tc.sender_drive {
            Drive::Driver
        } else {
            Drive::Follower
        };
        Ok(BdxWriter::new(self, drive, buf, mbs))
    }
}
/// Responder for a peer-initiated download (`ReceiveInit`): holds the values
/// obtained from the init until the application replies or rejects.
pub struct BdxDownloadResponder<'a> {
/// Exchange on which the init was received.
exchange: Exchange<'a>,
/// Transfer control value returned by `recv_init_hold` for the init.
transfer_control: TransferControl,
/// Max block size value returned by `recv_init_hold` for the init.
max_block_size: u16,
/// Start offset value returned by `recv_init_hold` for the init.
start_offset: u64,
}
impl<'a> BdxDownloadResponder<'a> {
    /// Receives a `ReceiveInit` on `exchange` via `recv_init_hold` and keeps
    /// the returned transfer parameters for a later `reply` or `reject`.
    pub async fn accept(mut exchange: Exchange<'a>) -> Result<Self, Error> {
        let init = recv_init_hold(&mut exchange, OpCode::ReceiveInit).await?;
        let (transfer_control, max_block_size, _length, start_offset) = init;

        Ok(Self {
            exchange,
            transfer_control,
            max_block_size,
            start_offset,
        })
    }

    /// Returns what `held_fd` yields for this exchange — presumably the file
    /// designator of the held init message (confirm against `held_fd`).
    pub fn fd(&self) -> &[u8] {
        held_fd(&self.exchange)
    }

    /// Returns the start offset obtained from the init.
    pub fn start_offset(&self) -> u64 {
        self.start_offset
    }

    /// Accepts the transfer and returns the writer used to send the data.
    ///
    /// Receiver-drive is preferred when offered, otherwise sender-drive. The
    /// block size is the peer's value clamped to `1..=cap`, where `cap` is
    /// `buf.len()` limited to `MAX_TX_BLOCK_SIZE`.
    ///
    /// # Errors
    ///
    /// * The result of `abort(.., BdxStatus::TransferFailedUnknownError)` if
    ///   `buf` is empty.
    /// * The result of `abort(.., BdxStatus::TransferMethodNotSupported)` if
    ///   the peer offered neither drive mode.
    /// * Any error from releasing the held message or sending the accept.
    pub async fn reply<'b>(
        mut self,
        buf: &'b mut [u8],
        length: Option<u64>,
    ) -> Result<BdxWriter<'a, 'b>, Error> {
        // Releasing the held init is the first side effect on every path.
        self.exchange.rx_done()?;

        if buf.is_empty() {
            return abort(&mut self.exchange, BdxStatus::TransferFailedUnknownError).await;
        }

        let offered = self.transfer_control;
        let drive = match (offered.receiver_drive, offered.sender_drive) {
            (true, _) => Drive::Follower,
            (false, true) => Drive::Driver,
            (false, false) => {
                return abort(&mut self.exchange, BdxStatus::TransferMethodNotSupported).await;
            }
        };

        let cap = buf.len().min(MAX_TX_BLOCK_SIZE as usize) as u16;
        let mbs = self.max_block_size.clamp(1, cap);
        let sender_drives = drive == Drive::Driver;

        send_accept(
            &mut self.exchange,
            true,
            TransferControl::select(sender_drives),
            mbs,
            length,
        )
        .await?;

        Ok(BdxWriter::new(self.exchange, drive, buf, mbs))
    }

    /// Declines the transfer: releases the held init and sends `status` as a
    /// status report.
    pub async fn reject(mut self, status: BdxStatus) -> Result<(), Error> {
        self.exchange.rx_done()?;
        send_status_report(&mut self.exchange, status).await
    }
}