use crate::{
PduLoop,
error::{Error, PduError, TimeoutError},
fmt,
pdu_loop::frame_element::{FrameBox, FrameElement, FrameState, received_frame::ReceivedFrame},
};
use core::{future::Future, ptr::NonNull, sync::atomic::AtomicU8, task::Poll};
use futures_lite::FutureExt;
#[derive(Debug)]
pub struct ReceivingFrame<'sto> {
inner: FrameBox<'sto>,
}
impl<'sto> ReceivingFrame<'sto> {
pub(in crate::pdu_loop) fn claim_receiving(
frame: NonNull<FrameElement<0>>,
pdu_idx: &'sto AtomicU8,
frame_data_len: usize,
) -> Option<Self> {
let frame = unsafe { FrameElement::claim_receiving(frame)? };
Some(Self {
inner: FrameBox::new(frame, pdu_idx, frame_data_len),
})
}
pub(in crate::pdu_loop) fn mark_received(&self) -> Result<(), PduError> {
self.inner
.swap_state(FrameState::RxBusy, FrameState::RxDone)
.map_err(|bad| {
fmt::error!(
"Failed to set frame {:#04x} state from RxBusy -> RxDone, got {:?}",
self.storage_slot_index(),
bad
);
PduError::InvalidFrameState
})?;
let _ = self.inner.wake();
Ok(())
}
pub(in crate::pdu_loop) fn buf_mut(&mut self) -> &mut [u8] {
self.inner.pdu_buf_mut()
}
fn storage_slot_index(&self) -> u8 {
self.inner.storage_slot_index()
}
}
pub struct ReceiveFrameFut<'sto> {
pub(in crate::pdu_loop::frame_element) frame: Option<FrameBox<'sto>>,
pub(in crate::pdu_loop::frame_element) pdu_loop: &'sto PduLoop<'sto>,
pub(in crate::pdu_loop::frame_element) timeout_timer: crate::timer_factory::Timer,
pub(in crate::pdu_loop::frame_element) timeout: crate::timer_factory::LabeledTimeout,
pub(in crate::pdu_loop::frame_element) retries_left: usize,
}
impl<'sto> ReceiveFrameFut<'sto> {
#[cfg(test)]
pub fn buf(&self) -> &[u8] {
use crate::{ethernet::EthernetFrame, pdu_loop::frame_header::EthercatFrameHeader};
use ethercrab_wire::EtherCrabWireSized;
let frame = self.frame.as_ref().unwrap();
let b = frame.ethernet_frame();
let len = EthernetFrame::<&[u8]>::buffer_len(frame.pdu_payload_len())
+ EthercatFrameHeader::PACKED_LEN;
&b.into_inner()[0..len]
}
fn release(r: FrameBox<'sto>) {
r.set_state(FrameState::None);
}
}
unsafe impl Send for ReceiveFrameFut<'_> {}
impl<'sto> Future for ReceiveFrameFut<'sto> {
type Output = Result<ReceivedFrame<'sto>, Error>;
fn poll(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> Poll<Self::Output> {
let Some(rxin) = self.frame.take() else {
fmt::error!("Frame is taken");
return Poll::Ready(Err(PduError::InvalidFrameState.into()));
};
rxin.replace_waker(cx.waker());
let frame_idx = rxin.storage_slot_index();
let swappy = rxin.swap_state(FrameState::RxDone, FrameState::RxProcessing);
let was = match swappy {
Ok(_) => {
fmt::trace!("frame index {} is ready", frame_idx);
return Poll::Ready(Ok(ReceivedFrame::new(rxin)));
}
Err(e) => e,
};
fmt::trace!("frame index {} not ready yet ({:?})", frame_idx, was);
match self.timeout_timer.poll(cx) {
Poll::Ready(_) => {
fmt::trace!(
"PDU response timeout with {} retries remaining",
self.retries_left
);
if self.retries_left == 0 {
Self::release(rxin);
return Poll::Ready(Err(Error::Timeout(TimeoutError::from_timeout_kind(
self.timeout.kind,
))));
}
self.timeout_timer = crate::timer_factory::timer(self.timeout);
let _ = self.timeout_timer.poll(cx);
rxin.set_state(FrameState::Sendable);
self.pdu_loop.wake_sender();
self.retries_left -= 1;
}
Poll::Pending => {
}
}
match was {
FrameState::Sendable | FrameState::Sending | FrameState::Sent | FrameState::RxBusy => {
self.frame = Some(rxin);
Poll::Pending
}
state => {
fmt::error!("Frame is in invalid state {:?}", state);
Poll::Ready(Err(PduError::InvalidFrameState.into()))
}
}
}
}
impl Drop for ReceiveFrameFut<'_> {
fn drop(&mut self) {
if let Some(r) = self.frame.take() {
fmt::debug!("Dropping in-flight future, possibly caused by timeout");
Self::release(r);
}
}
}