use super::nego::*;
use super::*;
pub struct BdxReader<'a> {
exchange: Exchange<'a>,
drive: Drive,
len: Option<u64>,
counter: u32,
held_counter: u32,
held_eof: bool,
block_pos: usize,
holding: bool,
finished: bool,
}
impl<'a> BdxReader<'a> {
pub(super) const fn new(exchange: Exchange<'a>, drive: Drive, len: Option<u64>) -> Self {
Self {
exchange,
drive,
len,
counter: 0,
held_counter: 0,
held_eof: false,
block_pos: 0,
holding: false,
finished: false,
}
}
#[allow(clippy::len_without_is_empty)] pub fn len(&self) -> Option<u64> {
self.len
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
if buf.is_empty() {
return Ok(0);
}
loop {
if self.finished {
return Ok(0);
}
if self.holding {
let n = {
let payload = self.exchange.rx()?.payload();
let data = &payload[BLOCK_HEADER_LEN..];
if self.block_pos < data.len() {
let remaining = &data[self.block_pos..];
let n = remaining.len().min(buf.len());
buf[..n].copy_from_slice(&remaining[..n]);
Some(n)
} else {
None
}
};
if let Some(n) = n {
self.block_pos += n;
return Ok(n);
}
self.release_block().await?;
continue;
}
self.receive_block().await?;
}
}
async fn receive_block(&mut self) -> Result<(), Error> {
enum Outcome {
Ok(bool),
BadCounter,
Unexpected,
Aborted(Error),
}
if matches!(self.drive, Drive::Driver) {
self.send_control(OpCode::BlockQuery, self.counter).await?;
}
self.exchange.recv_fetch().await?;
let meta = self.exchange.rx()?.meta();
let outcome = {
let payload = self.exchange.rx()?.payload();
match classify(&meta, payload) {
Ok(op) if matches!(op, OpCode::Block | OpCode::BlockEof) => {
let block = Block::parse(payload)?;
if block.block_counter != self.counter {
Outcome::BadCounter
} else {
Outcome::Ok(op == OpCode::BlockEof)
}
}
Ok(_) => Outcome::Unexpected,
Err(e) => Outcome::Aborted(e),
}
};
match outcome {
Outcome::Ok(is_eof) => {
self.held_counter = self.counter;
self.held_eof = is_eof;
self.counter = self.counter.wrapping_add(1);
self.block_pos = 0;
self.holding = true;
Ok(())
}
Outcome::BadCounter => {
self.exchange.rx_done()?;
abort(&mut self.exchange, BdxStatus::BadBlockCounter).await
}
Outcome::Unexpected => {
self.exchange.rx_done()?;
abort(&mut self.exchange, BdxStatus::UnexpectedMessage).await
}
Outcome::Aborted(e) => {
self.exchange.rx_done()?;
Err(e)
}
}
}
async fn release_block(&mut self) -> Result<(), Error> {
let counter = self.held_counter;
if self.held_eof {
self.send_control(OpCode::BlockAckEof, counter).await?;
self.exchange.rx_done()?;
self.exchange.acknowledge().await?;
self.finished = true;
} else if matches!(self.drive, Drive::Follower) {
self.send_control(OpCode::BlockAck, counter).await?;
self.exchange.rx_done()?;
} else {
self.exchange.rx_done()?;
}
self.holding = false;
Ok(())
}
async fn send_control(&mut self, opcode: OpCode, counter: u32) -> Result<(), Error> {
self.exchange
.send_with(|_, wb| {
BlockQuery {
block_counter: counter,
}
.write(wb)?;
Ok(Some(opcode.into()))
})
.await
}
}
impl embedded_io_async::ErrorType for BdxReader<'_> {
type Error = Error;
}
impl embedded_io_async::Read for BdxReader<'_> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
BdxReader::read(self, buf).await
}
}
pub trait BdxDownloadInitiator<'a> {
async fn download(
self,
file_designator: &[u8],
offset: Option<u64>,
) -> Result<BdxReader<'a>, Error>;
}
impl<'a> BdxDownloadInitiator<'a> for Exchange<'a> {
async fn download(
mut self,
file_designator: &[u8],
offset: Option<u64>,
) -> Result<BdxReader<'a>, Error> {
send_init(
&mut self,
OpCode::ReceiveInit,
MAX_RX_BLOCK_SIZE,
offset,
file_designator,
)
.await?;
match recv_accept(&mut self, true).await? {
Some((tc, _mbs, length)) => {
let drive = if tc.receiver_drive {
Drive::Driver
} else {
Drive::Follower
};
Ok(BdxReader::new(self, drive, length))
}
None => abort(&mut self, BdxStatus::TransferMethodNotSupported).await,
}
}
}
pub struct BdxUploadResponder<'a> {
exchange: Exchange<'a>,
transfer_control: TransferControl,
max_block_size: u16,
length: Option<u64>,
start_offset: u64,
}
impl<'a> BdxUploadResponder<'a> {
pub async fn accept(mut exchange: Exchange<'a>) -> Result<Self, Error> {
let (transfer_control, max_block_size, length, start_offset) =
recv_init_hold(&mut exchange, OpCode::SendInit).await?;
Ok(Self {
exchange,
transfer_control,
max_block_size,
length,
start_offset,
})
}
pub fn fd(&self) -> &[u8] {
held_fd(&self.exchange)
}
#[allow(clippy::len_without_is_empty)] pub fn len(&self) -> Option<u64> {
self.length
}
pub fn start_offset(&self) -> u64 {
self.start_offset
}
pub async fn reply(mut self) -> Result<BdxReader<'a>, Error> {
let tc = self.transfer_control;
let drive = if tc.sender_drive {
Drive::Follower
} else if tc.receiver_drive {
Drive::Driver
} else {
self.exchange.rx_done()?;
return abort(&mut self.exchange, BdxStatus::TransferMethodNotSupported).await;
};
let mbs = self.max_block_size.clamp(1, MAX_RX_BLOCK_SIZE);
let length = self.length;
self.exchange.rx_done()?;
send_accept(
&mut self.exchange,
false,
TransferControl::select(drive == Drive::Follower),
mbs,
None,
)
.await?;
Ok(BdxReader::new(self.exchange, drive, length))
}
pub async fn reject(mut self, status: BdxStatus) -> Result<(), Error> {
self.exchange.rx_done()?;
send_status_report(&mut self.exchange, status).await
}
}