mod frame_element;
mod frame_header;
mod pdu_flags;
mod pdu_header;
mod pdu_rx;
mod pdu_tx;
pub mod storage;
use crate::{command::Command, error::Error, pdu_loop::storage::PduStorageRef};
use core::sync::atomic::Ordering;
pub use pdu_rx::PduRx;
#[allow(unused)]
pub use pdu_rx::ReceiveAction;
pub use pdu_tx::PduTx;
pub use storage::PduStorage;
pub(crate) use self::frame_element::created_frame::CreatedFrame;
#[cfg(test)]
pub(crate) use frame_element::received_frame::ReceivedFrame;
pub(crate) use frame_element::received_frame::ReceivedPdu;
pub use frame_element::sendable_frame::SendableFrame;
#[derive(Debug)]
pub struct PduLoop<'sto> {
storage: PduStorageRef<'sto>,
}
impl<'sto> PduLoop<'sto> {
pub(in crate::pdu_loop) const fn new(storage: PduStorageRef<'sto>) -> Self {
assert!(storage.num_frames <= u8::MAX as usize);
Self { storage }
}
pub fn reset(&mut self) {
self.storage.reset();
}
pub(crate) fn reset_all(&mut self) {
self.storage.exit_flag.store(true, Ordering::Release);
self.wake_sender();
self.storage.reset();
}
#[cfg(test)]
pub(crate) fn test_only_storage_ref(&self) -> &PduStorageRef<'sto> {
&self.storage
}
pub(crate) const fn max_frame_data(&self) -> usize {
self.storage.frame_data_len
}
pub(crate) fn wake_sender(&self) {
self.storage.tx_waker.wake();
}
pub(crate) async fn pdu_broadcast_zeros(
&self,
register: u16,
payload_length: u16,
timeout: crate::timer_factory::LabeledTimeout,
retries: usize,
) -> Result<(), Error> {
let mut frame = self.storage.alloc_frame()?;
frame.push_pdu(Command::bwr(register).into(), (), Some(payload_length))?;
let frame = frame.mark_sendable(self, timeout, retries);
self.wake_sender();
frame.await?;
Ok(())
}
pub(crate) fn alloc_frame(&self) -> Result<CreatedFrame<'sto>, Error> {
self.storage.alloc_frame()
}
}
#[cfg(test)]
mod tests {
use crate::ethernet::{EthernetAddress, EthernetFrame};
use crate::pdu_loop::frame_element::created_frame::PduResponseHandle;
use crate::pdu_loop::frame_element::received_frame::ReceivedFrame;
use crate::pdu_loop::frame_header::EthercatFrameHeader;
use crate::{
Command, PduStorage, Reads,
error::{Error, PduError, TimeoutError},
fmt,
pdu_loop::frame_element::created_frame::CreatedFrame,
timer_factory::{IntoTimeout, MAX_TIMEOUT, MIN_TIMEOUT},
};
use cassette::Cassette;
use core::{future::poll_fn, ops::Deref, pin::pin, task::Poll, time::Duration};
use futures_lite::Future;
use std::{sync::Arc, thread};
#[test]
fn timed_out_frame_is_reallocatable() {
static STORAGE: PduStorage<1, { PduStorage::element_size(32) }> = PduStorage::new();
let (_tx, _rx, pdu_loop) = STORAGE.try_split().unwrap();
let mut frame = pdu_loop.storage.alloc_frame().expect("Alloc");
frame
.push_pdu(
Reads::Brd {
address: 0,
register: 0,
}
.into(),
(),
Some(16),
)
.expect("Push PDU");
let fut = frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX);
let res = cassette::block_on(fut.timeout(MIN_TIMEOUT));
assert_eq!(
res.unwrap_err(),
Error::Timeout(TimeoutError::from_timeout_kind(MIN_TIMEOUT.kind))
);
let frame = pdu_loop.storage.alloc_frame();
assert!(matches!(frame, Ok(CreatedFrame { .. })));
let f2 = pdu_loop.storage.alloc_frame();
assert_eq!(f2.unwrap_err(), PduError::SwapState.into());
}
#[test]
fn write_frame() {
crate::test_logger();
static STORAGE: PduStorage<1, 128> = PduStorage::<1, 128>::new();
let (_tx, _rx, pdu_loop) = STORAGE.try_split().unwrap();
let data = [0xaau8, 0xbb, 0xcc];
let mut frame = pdu_loop.storage.alloc_frame().unwrap();
let _handle = frame
.push_pdu(Command::fpwr(0x5678, 0x1234).into(), data, None)
.expect("Push");
let frame = frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX);
assert_eq!(
frame.buf(),
&[
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x88, 0xa4, 0x0f, 0x10, 0x05, 0x00, 0x78, 0x56, 0x34, 0x12, 0x03, 0x00, 0x00, 0x00, 0xaa, 0xbb, 0xcc, 0x00, 0x00, ]
);
}
#[test]
fn single_frame_round_trip() {
crate::test_logger();
const FRAME_OVERHEAD: usize = 28;
let storage = PduStorage::<1, 128>::new();
let (mut tx, mut rx, pdu_loop) = storage.try_split().unwrap();
let data = [0xaau8, 0xbb, 0xcc, 0xdd];
let poller = poll_fn(|ctx| {
let mut written_packet = vec![0; FRAME_OVERHEAD + data.len()];
let mut frame = pdu_loop.storage.alloc_frame().expect("Frame alloc");
let handle = frame
.push_pdu(Command::fpwr(0x5678, 0x1234).into(), data, None)
.expect("Push PDU");
let mut frame_fut = pin!(frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX));
assert!(
matches!(frame_fut.as_mut().poll(ctx), Poll::Pending),
"frame fut should be pending"
);
let frame = tx.next_sendable_frame().expect("need a frame");
let send_fut = pin!(async move {
frame
.send_blocking(|bytes| {
written_packet.copy_from_slice(bytes);
Ok(bytes.len())
})
.expect("send");
{
let mut frame = EthernetFrame::new_checked(written_packet).unwrap();
frame.set_src_addr(EthernetAddress([0x12, 0x10, 0x10, 0x10, 0x10, 0x10]));
frame.into_inner()
}
});
let Poll::Ready(written_packet) = send_fut.poll(ctx) else {
panic!("no send")
};
assert_eq!(written_packet.len(), FRAME_OVERHEAD + data.len());
let result = rx.receive_frame(&written_packet);
assert_eq!(result, Ok(crate::ReceiveAction::Processed));
match frame_fut.poll(ctx) {
Poll::Ready(Ok(frame)) => {
let response = frame.first_pdu(handle).expect("Handle");
assert_eq!(response.deref(), &data);
}
Poll::Ready(other) => panic!("Expected Ready(Ok()), got {:?}", other),
Poll::Pending => panic!("frame future still pending"),
}
Poll::Ready(())
});
cassette::block_on(poller);
}
#[test]
fn write_multiple_frame() {
static STORAGE: PduStorage<1, 128> = PduStorage::<1, 128>::new();
let (_tx, _rx, pdu_loop) = STORAGE.try_split().unwrap();
let data = [0xaau8, 0xbb, 0xcc];
let mut frame = pdu_loop.storage.alloc_frame().unwrap();
let _handle = frame
.push_pdu(Command::fpwr(0x5678, 0x1234).into(), data, None)
.expect("Push PDU");
drop(frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX));
let data = [0xaau8, 0xbb];
let mut frame = pdu_loop.storage.alloc_frame().unwrap();
let _handle = frame
.push_pdu(Command::fpwr(0x6789, 0x1234).into(), data, None)
.expect("Push second PDU");
let frame = frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX);
assert_eq!(
frame.buf(),
&[
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x88, 0xa4, 0x0e, 0x10, 0x05, 0x01, 0x89, 0x67, 0x34, 0x12, 0x02, 0x00, 0x00, 0x00, 0xaa, 0xbb, 0x00, 0x00, ]
);
}
#[test]
fn receive_frame() {
crate::test_logger();
let ethernet_packet = [
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x12, 0x10, 0x10, 0x10, 0x10, 0x10, 0x88, 0xa4, 0x10, 0x10, 0x05, 0x00, 0x89, 0x67, 0x34, 0x12, 0x04, 0x00, 0x00, 0x00, 0xdd, 0xcc, 0xbb, 0xaa, 0x00, 0x00, ];
let storage = PduStorage::<1, 128>::new();
let (mut tx, mut rx, pdu_loop) = storage.try_split().unwrap();
let data = 0xAABBCCDDu32;
let data_bytes = data.to_le_bytes();
let poller = poll_fn(|ctx| {
let mut frame = pdu_loop.storage.alloc_frame().unwrap();
let handle = frame
.push_pdu(Command::fpwr(0x6789, 0x1234).into(), data_bytes, None)
.expect("Push PDU");
let mut frame_fut = pin!(frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX));
assert!(
matches!(frame_fut.as_mut().poll(ctx), Poll::Pending),
"frame fut should be pending"
);
let frame = tx.next_sendable_frame().expect("need a frame");
frame.send_blocking(|bytes| Ok(bytes.len())).expect("send");
let result = rx.receive_frame(ðernet_packet);
assert_eq!(result, Ok(crate::ReceiveAction::Processed));
match frame_fut.poll(ctx) {
Poll::Ready(Ok(frame)) => {
assert_eq!(frame.first_pdu(handle).unwrap().deref(), &data_bytes);
}
Poll::Ready(other) => panic!("Expected Ready(Ok()), got {:?}", other),
Poll::Pending => panic!("frame future still pending"),
}
Poll::Ready(())
});
cassette::block_on(poller);
}
#[test]
fn receive_frame_before_first_poll() {
crate::test_logger();
let ethernet_packet = [
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x12, 0x10, 0x10, 0x10, 0x10, 0x10, 0x88, 0xa4, 0x10, 0x10, 0x05, 0x00, 0x89, 0x67, 0x34, 0x12, 0x04, 0x00, 0x00, 0x00, 0xdd, 0xcc, 0xbb, 0xaa, 0x00, 0x00, ];
let storage = PduStorage::<1, 128>::new();
let (mut tx, mut rx, pdu_loop) = storage.try_split().unwrap();
let data = 0xAABBCCDDu32;
let data_bytes = data.to_le_bytes();
let mut frame = pdu_loop.storage.alloc_frame().unwrap();
frame
.push_pdu(Command::fpwr(0x6789, 0x1234).into(), data_bytes, None)
.expect("Push PDU");
let frame_fut = pin!(frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX));
let frame = tx.next_sendable_frame().expect("need a frame");
frame.send_blocking(|bytes| Ok(bytes.len())).expect("send");
let result = rx.receive_frame(ðernet_packet);
assert_eq!(result, Ok(crate::ReceiveAction::Processed));
let mut frame_fut = Cassette::new(frame_fut);
let frame_poll = frame_fut.poll_on();
assert!(
matches!(frame_poll, Some(Ok(ReceivedFrame { .. }))),
"frame future should have completed"
);
}
#[tokio::test]
async fn tokio_spawn() {
crate::test_logger();
static STORAGE: PduStorage<16, 128> = PduStorage::<16, 128>::new();
let (mut tx, mut rx, pdu_loop) = STORAGE.try_split().unwrap();
let tx_rx_task = async move {
let (s, mut r) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let tx_task = async {
fmt::info!("Spawn TX task");
loop {
while let Some(frame) = tx.next_sendable_frame() {
frame
.send_blocking(|bytes| {
s.send(bytes.to_vec()).unwrap();
Ok(bytes.len())
})
.unwrap();
}
futures_lite::future::yield_now().await;
}
};
let rx_task = async {
fmt::info!("Spawn RX task");
while let Some(ethernet_frame) = r.recv().await {
fmt::trace!("RX task received packet");
let ethernet_frame = {
let mut frame = EthernetFrame::new_checked(ethernet_frame).unwrap();
frame.set_src_addr(EthernetAddress([0x12, 0x10, 0x10, 0x10, 0x10, 0x10]));
frame.into_inner()
};
rx.receive_frame(ðernet_frame).expect("RX");
}
};
futures_lite::future::race(tx_task, rx_task).await;
};
tokio::spawn(tx_rx_task);
for i in 0..32 {
let data = [0xaa, 0xbb, 0xcc, 0xdd, i];
fmt::info!("Send PDU {i}");
let mut frame = pdu_loop.storage.alloc_frame().expect("Frame alloc");
let handle = frame
.push_pdu(Command::fpwr(0x1000, 0x980).into(), data, None)
.expect("Push PDU");
let result = frame
.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX)
.await
.expect("Future");
let received_data = result.first_pdu(handle).expect("Take");
assert_eq!(&*received_data, &data);
}
fmt::info!("Sent all PDUs");
}
#[test]
fn multiple_threads() {
crate::test_logger();
const MAX_SUBDEVICES: usize = 16;
static STORAGE: PduStorage<MAX_SUBDEVICES, 128> = PduStorage::<MAX_SUBDEVICES, 128>::new();
let (mut tx, mut rx, pdu_loop) = STORAGE.try_split().unwrap();
let (sent, received) = thread::scope(|s| {
let (mock_net_tx, mock_net_rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(16);
let sent = s.spawn(move || {
fmt::info!("Spawn TX task");
let mut sent = 0;
loop {
while let Some(frame) = tx.next_sendable_frame() {
fmt::info!("Sendable frame");
frame
.send_blocking(|bytes| {
mock_net_tx.send(bytes.to_vec()).unwrap();
sent += 1;
Ok(bytes.len())
})
.unwrap();
thread::yield_now();
}
thread::sleep(Duration::from_millis(1));
if sent == MAX_SUBDEVICES {
break sent;
}
}
});
let received = s.spawn(move || {
fmt::info!("Spawn RX task");
let mut received = 0;
while let Ok(ethernet_frame) = mock_net_rx.recv() {
fmt::info!("RX task received packet");
thread::sleep(Duration::from_millis(1));
let ethernet_frame = {
let mut frame = EthernetFrame::new_checked(ethernet_frame).unwrap();
frame.set_src_addr(EthernetAddress([0x12, 0x10, 0x10, 0x10, 0x10, 0x10]));
frame.into_inner()
};
while rx.receive_frame(ðernet_frame).is_err() {}
thread::yield_now();
received += 1;
if received == MAX_SUBDEVICES {
break;
}
}
received
});
let pdu_loop = Arc::new(pdu_loop);
for i in 0..MAX_SUBDEVICES {
let pdu_loop = pdu_loop.clone();
s.spawn(move || {
let data = [0xaau8, 0xbb, 0xcc, 0xdd, i as u8];
fmt::info!("Send PDU {i}");
let mut frame = pdu_loop.storage.alloc_frame().expect("Frame alloc");
let handle = frame
.push_pdu(Command::fpwr(0x1000, 0x980).into(), data, None)
.expect("Push PDU");
let frame = pin!(frame.mark_sendable(&pdu_loop, MAX_TIMEOUT, usize::MAX));
let mut x = Cassette::new(frame);
let result = loop {
if let Some(res) = x.poll_on() {
break res;
}
thread::sleep(Duration::from_millis(1));
thread::yield_now();
}
.expect("Future");
let received_data = result.first_pdu(handle).expect("Take");
assert_eq!(&*received_data, &data);
});
}
(sent.join().unwrap(), received.join().unwrap())
});
assert_eq!(sent, received);
assert_eq!(sent, MAX_SUBDEVICES);
fmt::info!("Sent all PDUs");
}
#[test]
fn split_pdi() {
crate::test_logger();
const DATA: usize = 48;
static STORAGE: PduStorage<8, DATA> = PduStorage::new();
let (_tx, _rx, pdu_loop) = STORAGE.try_split().unwrap();
let pdi = [0xaau8; 64];
let mut remaining = &pdi[..];
let (sent, _handle) = {
let mut frame = pdu_loop.alloc_frame().expect("No frame");
let res = frame.push_pdu_slice_rest(Command::Nop, remaining);
let expected_pushed_bytes = DATA
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
- CreatedFrame::PDU_OVERHEAD_BYTES;
assert_eq!(expected_pushed_bytes, 20);
assert_eq!(
res,
Ok(Some((
expected_pushed_bytes,
PduResponseHandle {
index_in_frame: 0,
pdu_idx: 0,
command_code: 0,
alloc_size: pdu_loop.max_frame_data()
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
}
)))
);
res.unwrap().unwrap()
};
remaining = &remaining[sent..];
assert_eq!(remaining.len(), 44);
let (sent, _handle) = {
let mut frame = pdu_loop.alloc_frame().expect("No frame");
let res = frame.push_pdu_slice_rest(Command::Nop, remaining);
let expected_pushed_bytes = DATA
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
- CreatedFrame::PDU_OVERHEAD_BYTES;
assert_eq!(expected_pushed_bytes, 20);
assert_eq!(
res,
Ok(Some((
expected_pushed_bytes,
PduResponseHandle {
index_in_frame: 0,
pdu_idx: 1,
command_code: 0,
alloc_size: pdu_loop.max_frame_data()
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
}
)))
);
res.unwrap().unwrap()
};
remaining = &remaining[sent..];
assert_eq!(remaining.len(), 24);
let (sent, _handle) = {
let mut frame = pdu_loop.alloc_frame().expect("No frame");
let res = frame.push_pdu_slice_rest(Command::Nop, remaining);
let expected_pushed_bytes = DATA
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
- CreatedFrame::PDU_OVERHEAD_BYTES;
assert_eq!(expected_pushed_bytes, 20);
assert_eq!(
res,
Ok(Some((
expected_pushed_bytes,
PduResponseHandle {
index_in_frame: 0,
pdu_idx: 2,
command_code: 0,
alloc_size: pdu_loop.max_frame_data()
- EthernetFrame::<&[u8]>::header_len()
- EthercatFrameHeader::header_len()
}
)))
);
res.unwrap().unwrap()
};
remaining = &remaining[sent..];
assert_eq!(remaining.len(), 4);
let (sent, _handle) = {
let mut frame = pdu_loop.alloc_frame().expect("No frame");
let res = frame.push_pdu_slice_rest(Command::Nop, remaining);
let expected_pushed_bytes = 4;
assert_eq!(
res,
Ok(Some((
expected_pushed_bytes,
PduResponseHandle {
index_in_frame: 0,
pdu_idx: 3,
command_code: 0,
alloc_size: expected_pushed_bytes + CreatedFrame::PDU_OVERHEAD_BYTES
}
)))
);
res.unwrap().unwrap()
};
let empty: &[u8] = &[];
assert_eq!(&remaining[sent..], empty);
}
}