mod group_id;
mod handle;
mod tx_rx_response;
use crate::{
DcSync,
MainDevice,
RegisterAddress,
SubDeviceState,
al_control::AlControl,
command::Command,
error::{DistributedClockError, Error, Item},
fmt,
pdi::PdiOffset,
pdu_loop::{CreatedFrame, ReceivedPdu},
subdevice::{
IoRanges, SubDevice, SubDeviceRef, configuration::PdoDirection, pdi::SubDevicePdi,
},
timer_factory::IntoTimeout,
};
use core::{cell::UnsafeCell, marker::PhantomData, sync::atomic::AtomicUsize, time::Duration};
use ethercrab_wire::{EtherCrabWireRead, EtherCrabWireSized};
use lock_api::{RawRwLock, RwLock, RwLockWriteGuard};
pub use self::group_id::GroupId;
pub use self::handle::SubDeviceGroupHandle;
pub use self::tx_rx_response::TxRxResponse;
/// Monotonic counter handing out process-unique group IDs; bumped once per
/// `SubDeviceGroup::default()`.
static GROUP_ID: AtomicUsize = AtomicUsize::new(0);

/// In-frame size of the DC system time PDU: per-PDU overhead plus an 8 byte timestamp payload.
const DC_PDU_SIZE: usize = CreatedFrame::PDU_OVERHEAD_BYTES + u64::PACKED_LEN;
/// A minimal stand-in for `core::cell::SyncUnsafeCell` (unstable at time of writing), used to
/// hold group state that is mutated through `&self` methods.
#[derive(Debug)]
pub(crate) struct MySyncUnsafeCell<T: ?Sized>(pub UnsafeCell<T>);

impl<T> MySyncUnsafeCell<T> {
    /// Wrap `inner` in a new cell.
    pub fn new(inner: T) -> Self {
        Self(UnsafeCell::new(inner))
    }
}

// SAFETY: mirrors the bound on `core::cell::SyncUnsafeCell` — sharing the cell across threads
// is only sound when the wrapped type is itself `Sync`. Users of `get()` must still uphold
// Rust's aliasing rules manually.
unsafe impl<T: ?Sized + Sync> Sync for MySyncUnsafeCell<T> {}

impl<T: ?Sized> MySyncUnsafeCell<T> {
    /// Get a raw pointer to the contained value. Dereferencing it is `unsafe`; the caller must
    /// ensure no aliasing `&mut` reference exists.
    #[inline]
    pub const fn get(&self) -> *mut T {
        self.0.get()
    }

    /// Get a unique reference to the contained value. Safe: `&mut self` proves exclusivity.
    #[inline]
    pub fn get_mut(&mut self) -> &mut T {
        self.0.get_mut()
    }
}
// Typestate markers for `SubDeviceGroup`'s `S` parameter. Each state only exposes the methods
// valid in that state, so invalid EtherCAT state transitions fail to compile.

/// Marker: group is in the INIT state.
#[derive(Copy, Clone, Debug)]
pub struct Init;

/// Marker: group is in the PRE-OP state.
#[derive(Copy, Clone, Debug)]
pub struct PreOp;

/// Marker: group is in PRE-OP but with its PDI (FMMUs) already configured.
#[derive(Copy, Clone, Debug)]
pub struct PreOpPdi;

/// Marker: group is in the SAFE-OP state.
#[derive(Copy, Clone, Debug)]
pub struct SafeOp;

/// Marker: group is in the OP (operational) state.
#[derive(Copy, Clone, Debug)]
pub struct Op;

/// Marker for the `DC` parameter: distributed clocks are not configured for this group.
#[derive(Copy, Clone, Debug)]
pub struct NoDc;

/// Marker for the `DC` parameter: distributed clocks have been configured via
/// `configure_dc_sync`.
#[derive(Copy, Clone, Debug)]
pub struct HasDc {
    // SYNC0 period in nanoseconds.
    sync0_period: u64,
    // Shift applied after each SYNC0 boundary, in nanoseconds.
    sync0_shift: u64,
    // Configured station address of the DC reference clock SubDevice.
    reference: u16,
}

/// Marker trait for states in which the group PDI is configured and process data can be
/// exchanged.
#[doc(hidden)]
pub trait HasPdi {}

impl HasPdi for PreOpPdi {}
impl HasPdi for SafeOp {}
impl HasPdi for Op {}

/// Marker trait for states that are (a flavour of) PRE-OP.
#[doc(hidden)]
pub trait IsPreOp {}

impl IsPreOp for PreOp {}
impl IsPreOp for PreOpPdi {}
/// Mutable group state held behind a `MySyncUnsafeCell` inside [`SubDeviceGroup`].
#[derive(Default)]
struct GroupInner<const MAX_SUBDEVICES: usize> {
    // The SubDevices belonging to this group.
    subdevices: heapless::Vec<SubDevice, MAX_SUBDEVICES>,
    // Logical start offset of this group's slice of the global PDI.
    pdi_start: PdiOffset,
}
// Bit flags written to `RegisterAddress::DcSyncActive` to enable cyclic operation and the
// SYNC0/SYNC1 output pulses.
const CYCLIC_OP_ENABLE: u8 = 0b0000_0001;
const SYNC0_ACTIVATE: u8 = 0b0000_0010;
const SYNC1_ACTIVATE: u8 = 0b0000_0100;
/// Distributed clock configuration passed to `SubDeviceGroup::configure_dc_sync`.
#[derive(Default, Debug, Copy, Clone)]
pub struct DcConfiguration {
    /// Delay before the first SYNC0 pulse; the computed start time is rounded down to a whole
    /// number of `sync0_period`s.
    pub start_delay: Duration,
    /// SYNC0 pulse period. Must fit in a `u32` when expressed in nanoseconds; a zero period
    /// will cause a divide-by-zero panic during configuration.
    pub sync0_period: Duration,
    /// Shift of the process data cycle relative to the SYNC0 pulse.
    pub sync0_shift: Duration,
}
/// Timing information returned from a DC process data exchange (`tx_rx_dc`).
#[derive(Debug, Copy, Clone)]
pub struct CycleInfo {
    /// DC system time in nanoseconds, as read from the reference clock during the exchange.
    pub dc_system_time: u64,
    /// How long to wait before beginning the next process data cycle.
    pub next_cycle_wait: Duration,
    /// Offset of this exchange within the current SYNC0 cycle.
    pub cycle_start_offset: Duration,
}
/// A group of EtherCAT SubDevices along with the PDI (process data image) they map into.
///
/// `S` is a typestate marker tracking the group's EtherCAT state ([`PreOp`], [`SafeOp`],
/// [`Op`], …); `DC` is [`NoDc`] or [`HasDc`] depending on whether distributed clocks have been
/// configured.
#[doc(alias = "SlaveGroup")]
pub struct SubDeviceGroup<
    const MAX_SUBDEVICES: usize,
    const MAX_PDI: usize,
    R: RawRwLock = crate::DefaultLock,
    S = PreOp,
    DC = NoDc,
> {
    // Process-unique ID of this group.
    id: GroupId,
    // Raw PDI buffer — inputs first, then outputs. The RwLock stops SubDevice PDI views and
    // TX/RX from racing each other.
    pdi: RwLock<R, MySyncUnsafeCell<[u8; MAX_PDI]>>,
    // Number of input (MainDevice read) bytes at the start of the PDI.
    read_pdi_len: usize,
    // Total PDI bytes in use (inputs + outputs).
    pdi_len: usize,
    // SubDevice list and PDI start offset.
    inner: MySyncUnsafeCell<GroupInner<MAX_SUBDEVICES>>,
    // Distributed clock configuration marker/value.
    dc_conf: DC,
    // Zero-sized typestate marker.
    _state: PhantomData<S>,
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, PreOp, DC>
{
    /// Configure FMMUs for every SubDevice in the group, assigning each a slice of the group
    /// PDI.
    ///
    /// Inputs (`PdoDirection::MasterRead`) are mapped first so they form a contiguous region
    /// at the start of the PDI; outputs (`PdoDirection::MasterWrite`) follow. Updates
    /// `read_pdi_len` and `pdi_len`, failing with [`Error::PdiTooLong`] if the mapped data
    /// exceeds `MAX_PDI` bytes.
    async fn configure_fmmus(&mut self, maindevice: &MainDevice<'_>) -> Result<(), Error> {
        let inner = self.inner.get_mut();

        let mut pdi_position = inner.pdi_start;

        fmt::debug!(
            "Going to configure group with {} SubDevice(s), starting PDI offset {:#010x}",
            inner.subdevices.len(),
            inner.pdi_start.start_address
        );

        // First pass: map all input (MainDevice read) data at the start of the group PDI.
        for subdevice in inner.subdevices.iter_mut() {
            pdi_position = SubDeviceRef::new(maindevice, subdevice.configured_address(), subdevice)
                .configure_fmmus(
                    pdi_position,
                    inner.pdi_start.start_address,
                    PdoDirection::MasterRead,
                )
                .await?;
        }

        // Everything mapped so far is input data.
        self.read_pdi_len = (pdi_position.start_address - inner.pdi_start.start_address) as usize;

        // NOTE(review): this message mentions mailboxes/init hooks but this method only maps
        // FMMUs — confirm the wording against the wider configuration flow.
        fmt::debug!("SubDevice mailboxes configured and init hooks called");

        // Second pass: map output (MainDevice write) data after the inputs.
        for subdevice in inner.subdevices.iter_mut() {
            let addr = subdevice.configured_address();

            let mut subdevice_config = SubDeviceRef::new(maindevice, addr, subdevice);

            pdi_position = subdevice_config
                .configure_fmmus(
                    pdi_position,
                    inner.pdi_start.start_address,
                    PdoDirection::MasterWrite,
                )
                .await?;
        }

        fmt::debug!("SubDevice FMMUs configured for group. Able to move to SAFE-OP");

        // Total PDI usage covers both the input and output regions.
        self.pdi_len = (pdi_position.start_address - inner.pdi_start.start_address) as usize;

        fmt::debug!(
            "Group PDI length: start {:#010x}, {} total bytes ({} input bytes)",
            inner.pdi_start.start_address,
            self.pdi_len,
            self.read_pdi_len
        );

        if self.pdi_len > MAX_PDI {
            return Err(Error::PdiTooLong {
                max_length: MAX_PDI,
                desired_length: self.pdi_len,
            });
        }

        Ok(())
    }

    /// Borrow an individual SubDevice in this group by index.
    ///
    /// Returns [`Error::NotFound`] when `index` is out of range.
    #[deny(clippy::panic)]
    #[doc(alias = "slave")]
    pub fn subdevice<'maindevice, 'group>(
        &'group self,
        maindevice: &'maindevice MainDevice<'maindevice>,
        index: usize,
    ) -> Result<SubDeviceRef<'maindevice, &'group SubDevice>, Error> {
        let subdevice = self.inner().subdevices.get(index).ok_or(Error::NotFound {
            item: Item::SubDevice,
            index: Some(index),
        })?;

        Ok(SubDeviceRef::new(
            maindevice,
            subdevice.configured_address(),
            subdevice,
        ))
    }

    /// Transition the whole group from PRE-OP to OP, configuring the PDI and passing through
    /// SAFE-OP on the way.
    pub async fn into_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>, Error> {
        let self_ = self.into_safe_op(maindevice).await?;

        self_.into_op(maindevice).await
    }

    /// Configure the group PDI (FMMUs) but stay in PRE-OP.
    ///
    /// Only the typestate changes; no EtherCAT state transition is requested here.
    pub async fn into_pre_op_pdi(
        mut self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, PreOpPdi, DC>, Error> {
        self.configure_fmmus(maindevice).await?;

        Ok(SubDeviceGroup {
            id: self.id,
            pdi: self.pdi,
            read_pdi_len: self.read_pdi_len,
            pdi_len: self.pdi_len,
            inner: self.inner,
            dc_conf: self.dc_conf,
            _state: PhantomData,
        })
    }

    /// Configure the group PDI then transition every SubDevice into SAFE-OP.
    pub async fn into_safe_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, SafeOp, DC>, Error> {
        let self_ = self.into_pre_op_pdi(maindevice).await?;

        self_
            .transition_to(maindevice, SubDeviceState::SafeOp)
            .await
    }

    /// Transition the group back into the INIT state.
    pub async fn into_init(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Init, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::Init).await
    }

    /// Iterate over all SubDevices in the group with shared references.
    pub fn iter<'group, 'maindevice>(
        &'group self,
        maindevice: &'maindevice MainDevice<'maindevice>,
    ) -> impl Iterator<Item = SubDeviceRef<'maindevice, &'group SubDevice>> {
        self.inner()
            .subdevices
            .iter()
            .map(|sd| SubDeviceRef::new(maindevice, sd.configured_address, sd))
    }

    /// Iterate over all SubDevices in the group with mutable references.
    pub fn iter_mut<'group, 'maindevice>(
        &'group mut self,
        maindevice: &'maindevice MainDevice<'maindevice>,
    ) -> impl Iterator<Item = SubDeviceRef<'maindevice, &'group mut SubDevice>> {
        self.inner
            .get_mut()
            .subdevices
            .iter_mut()
            .map(|sd| SubDeviceRef::new(maindevice, sd.configured_address, sd))
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, S, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, S, DC>
where
    S: IsPreOp,
{
    /// Configure SYNC0/SYNC1 pulse generation for every DC-capable SubDevice in the group that
    /// has DC sync enabled.
    ///
    /// Requires a DC reference clock to have been found during network init; returns
    /// [`DistributedClockError::NoReference`] otherwise. On success the group's typestate
    /// becomes [`PreOpPdi`] carrying a [`HasDc`] configuration.
    pub async fn configure_dc_sync(
        self,
        maindevice: &MainDevice<'_>,
        dc_conf: DcConfiguration,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, PreOpPdi, HasDc>, Error> {
        fmt::debug!("Configuring distributed clocks for group");

        let Some(reference) = maindevice.dc_ref_address() else {
            fmt::error!("No DC reference clock SubDevice present, unable to configure DC");

            return Err(DistributedClockError::NoReference.into());
        };

        let DcConfiguration {
            start_delay,
            sync0_period,
            sync0_shift,
        } = dc_conf;

        // Coerce the generic `S: IsPreOp` group into a concrete `PreOp` one so `iter()` is
        // available below.
        let self_ = SubDeviceGroup {
            id: self.id,
            pdi: self.pdi,
            read_pdi_len: self.read_pdi_len,
            pdi_len: self.pdi_len,
            inner: self.inner,
            dc_conf: NoDc,
            _state: PhantomData::<PreOp>,
        };

        // Only configure SubDevices that support DC and have a sync mode enabled.
        let dc_devices = self_.iter(maindevice).filter(|subdevice| {
            subdevice.dc_support().any() && !matches!(subdevice.dc_sync(), DcSync::Disabled)
        });

        // Current network time, read from the DC reference clock.
        let system_time = SubDeviceRef::new(maindevice, reference, ())
            .register_read::<u64>(RegisterAddress::DcSystemTime)
            .await?;

        // Periods and delays are programmed in nanoseconds and must fit in a `u32`.
        let sync0_period = u64::from(u32::try_from(sync0_period.as_nanos())?);

        let first_pulse_delay = u64::from(u32::try_from(start_delay.as_nanos())?);

        for subdevice in dc_devices {
            fmt::debug!(
                "--> Configuring SubDevice {:#06x} {} DC mode {}",
                subdevice.configured_address(),
                subdevice.name(),
                subdevice.dc_sync()
            );

            // Disable cyclic operation / sync generation before reprogramming the cycle.
            subdevice
                .write(RegisterAddress::DcSyncActive)
                .ignore_wkc()
                .send(maindevice, 0u8)
                .await?;

            // Round (system time + start delay) down to a whole number of SYNC0 periods so all
            // SubDevices start their pulses on the same period boundary.
            // NOTE(review): this panics if `sync0_period` is zero, and the expression is
            // loop-invariant so it could be hoisted out of the loop — confirm intent.
            let start_time = (system_time + first_pulse_delay) / sync0_period * sync0_period;

            fmt::debug!("--> Computed DC sync start time: {}", start_time);

            subdevice
                .write(RegisterAddress::DcSyncStartTime)
                .send(maindevice, start_time)
                .await?;

            subdevice
                .write(RegisterAddress::DcSync0CycleTime)
                .send(maindevice, sync0_period)
                .await?;

            // SYNC1 is only programmed when the SubDevice is configured for SYNC0 + SYNC1 mode.
            let flags = if let DcSync::Sync01 { sync1_period } = subdevice.dc_sync() {
                let sync1_period = u64::from(u32::try_from(sync1_period.as_nanos())?);

                subdevice
                    .write(RegisterAddress::DcSync1CycleTime)
                    .send(maindevice, sync1_period)
                    .await?;

                SYNC1_ACTIVATE | SYNC0_ACTIVATE | CYCLIC_OP_ENABLE
            } else {
                SYNC0_ACTIVATE | CYCLIC_OP_ENABLE
            };

            // Re-enable cyclic operation with the newly programmed sync configuration.
            subdevice
                .write(RegisterAddress::DcSyncActive)
                .send(maindevice, flags)
                .await?;
        }

        Ok(SubDeviceGroup {
            id: self_.id,
            pdi: self_.pdi,
            read_pdi_len: self_.read_pdi_len,
            pdi_len: self_.pdi_len,
            inner: self_.inner,
            dc_conf: HasDc {
                sync0_period: sync0_period,
                sync0_shift: sync0_shift.as_nanos() as u64,
                reference,
            },
            _state: PhantomData,
        })
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, PreOpPdi, DC>
{
    /// Transition the group from PRE-OP (PDI already configured) into SAFE-OP.
    pub async fn into_safe_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, SafeOp, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::SafeOp).await
    }

    /// Transition the group into OP, passing through SAFE-OP on the way.
    pub async fn into_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>, Error> {
        self.into_safe_op(maindevice)
            .await?
            .transition_to(maindevice, SubDeviceState::Op)
            .await
    }

    /// Move the group to SAFE-OP, then request — without waiting for — the transition to OP.
    pub async fn request_into_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>, Error> {
        self.into_safe_op(maindevice)
            .await?
            .request_into_op(maindevice)
            .await
    }

    /// Transition the group back into the INIT state.
    pub async fn into_init(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Init, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::Init).await
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, SafeOp, DC>
{
    /// Transition the group from SAFE-OP into OP.
    pub async fn into_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::Op).await
    }

    /// Transition the group back from SAFE-OP into PRE-OP.
    pub async fn into_pre_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, PreOp, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::PreOp).await
    }

    /// Ask every SubDevice in the group to move into OP, returning immediately instead of
    /// waiting for the transition to complete.
    pub async fn request_into_op(
        mut self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>, Error> {
        for sd in self.inner.get_mut().subdevices.iter_mut() {
            let configured_address = sd.configured_address();

            SubDeviceRef::new(maindevice, configured_address, sd)
                .request_subdevice_state_nowait(SubDeviceState::Op)
                .await?;
        }

        // Only the typestate marker changes; no waiting is performed here.
        Ok(SubDeviceGroup {
            id: self.id,
            pdi: self.pdi,
            read_pdi_len: self.read_pdi_len,
            pdi_len: self.pdi_len,
            inner: self.inner,
            dc_conf: self.dc_conf,
            _state: PhantomData,
        })
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, Op, DC>
{
    /// Transition the group back from OP into SAFE-OP.
    pub async fn into_safe_op(
        self,
        maindevice: &MainDevice<'_>,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, SafeOp, DC>, Error> {
        self.transition_to(maindevice, SubDeviceState::SafeOp).await
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, S> Default
    for SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, S>
{
    /// Create an empty group with a fresh process-unique [`GroupId`] and a zeroed PDI buffer.
    fn default() -> Self {
        Self {
            // Relaxed is sufficient: only uniqueness of the fetched value matters.
            id: GroupId(GROUP_ID.fetch_add(1, core::sync::atomic::Ordering::Relaxed)),
            pdi: RwLock::new(MySyncUnsafeCell::new([0u8; MAX_PDI])),
            read_pdi_len: Default::default(),
            pdi_len: Default::default(),
            inner: MySyncUnsafeCell::new(GroupInner::default()),
            dc_conf: NoDc,
            _state: PhantomData,
        }
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, S, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, S, DC>
{
    /// Shared access to the inner group state.
    fn inner(&self) -> &GroupInner<MAX_SUBDEVICES> {
        // SAFETY: the pointer comes from a valid `UnsafeCell` owned by `self`, so it is
        // non-null, aligned and initialised. Creating a shared reference here assumes no
        // `&mut` to the inner state exists concurrently.
        unsafe { &*self.inner.get() }
    }

    /// Number of SubDevices in this group.
    pub fn len(&self) -> usize {
        self.inner().subdevices.len()
    }

    /// Returns `true` when the group contains no SubDevices.
    pub fn is_empty(&self) -> bool {
        self.inner().subdevices.is_empty()
    }

    /// Check whether every SubDevice in the group is in `desired_state`.
    ///
    /// AL status reads are batched into as few frames as possible. Returns `Ok(false)` as soon
    /// as any SubDevice reports a different state.
    async fn is_state(
        &self,
        maindevice: &MainDevice<'_>,
        desired_state: SubDeviceState,
    ) -> Result<bool, Error> {
        fmt::trace!("Check group state");

        let mut subdevices = self.inner().subdevices.iter();

        let mut total_checks = 0;

        loop {
            let mut frame = maindevice.pdu_loop.alloc_frame()?;

            // Fill the frame with as many AL status reads as will fit.
            let (rest, num_in_this_frame) = push_state_checks(subdevices, &mut frame)?;

            subdevices = rest;

            // Nothing pushed means every SubDevice has been checked.
            if num_in_this_frame == 0 {
                fmt::trace!("--> No more state checks, pushed {}", total_checks);

                break;
            }

            total_checks += num_in_this_frame;

            let frame = frame.mark_sendable(
                &maindevice.pdu_loop,
                maindevice.timeouts.pdu(),
                maindevice.config.retry_behaviour.retry_count(),
            );

            maindevice.pdu_loop.wake_sender();

            let received = frame.await?;

            for pdu in received.into_pdu_iter() {
                let pdu = pdu?;

                let result = AlControl::unpack_from_slice(&pdu)?;

                // Short-circuit: a single mismatch answers the question.
                if result.state != desired_state {
                    return Ok(false);
                }
            }
        }

        debug_assert_eq!(total_checks, self.len());

        Ok(true)
    }

    /// Poll the group until every SubDevice reaches `desired_state`, bounded by the configured
    /// state transition timeout.
    async fn wait_for_state(
        &self,
        maindevice: &MainDevice<'_>,
        desired_state: SubDeviceState,
    ) -> Result<(), Error> {
        async {
            loop {
                if self.is_state(maindevice, desired_state).await? {
                    break Ok(());
                }

                maindevice.timeouts.loop_tick().await;
            }
        }
        .timeout(maindevice.timeouts.state_transition())
        .await
    }

    /// Request `desired_state` from every SubDevice, wait for the whole group to reach it,
    /// then rebuild `self` with the new typestate marker `TO`.
    async fn transition_to<TO>(
        mut self,
        maindevice: &MainDevice<'_>,
        desired_state: SubDeviceState,
    ) -> Result<SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, TO, DC>, Error> {
        for subdevice in self.inner.get_mut().subdevices.iter_mut() {
            SubDeviceRef::new(maindevice, subdevice.configured_address(), subdevice)
                .request_subdevice_state_nowait(desired_state)
                .await?;
        }

        fmt::debug!("Waiting for group state {}", desired_state);

        self.wait_for_state(maindevice, desired_state).await?;

        fmt::debug!("--> Group reached state {}", desired_state);

        Ok(SubDeviceGroup {
            id: self.id,
            pdi: self.pdi,
            read_pdi_len: self.read_pdi_len,
            pdi_len: self.pdi_len,
            inner: self.inner,
            dc_conf: self.dc_conf,
            _state: PhantomData,
        })
    }
}
/// Push AL status read requests (one FPRD PDU per SubDevice) into `frame` until the frame is
/// full, the per-frame cap is reached, or the iterator is exhausted.
///
/// Returns the iterator with any unpushed SubDevices still remaining, along with the number of
/// checks pushed into this frame. Callers loop, allocating fresh frames, until this returns 0.
fn push_state_checks<'group, 'sto, I>(
    mut subdevices: I,
    frame: &mut CreatedFrame<'sto>,
) -> Result<(I, usize), Error>
where
    I: Iterator<Item = &'group SubDevice>,
{
    // Safety cap on PDUs pushed into a single frame. The previous implementation checked
    // `num_in_this_frame > 128` *after* incrementing, an off-by-one that allowed up to 129
    // PDUs; checking in the loop guard caps the count at exactly 128.
    const MAX_CHECKS_PER_FRAME: usize = 128;

    let mut num_in_this_frame = 0;

    while num_in_this_frame < MAX_CHECKS_PER_FRAME
        && frame.can_push_pdu_payload(AlControl::PACKED_LEN)
    {
        // Stop when every SubDevice has had a check pushed.
        let Some(sd) = subdevices.next() else {
            break;
        };

        frame.push_pdu(
            Command::fprd(sd.configured_address(), RegisterAddress::AlStatus.into()).into(),
            (),
            Some(AlControl::PACKED_LEN as u16),
        )?;

        num_in_this_frame += 1;
    }

    fmt::trace!(
        "--> Pushed {} status checks into frame {}",
        num_in_this_frame,
        frame.storage_slot_index()
    );

    Ok((subdevices, num_in_this_frame))
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, S, DC>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, S, DC>
where
    S: HasPdi,
{
    /// Borrow a SubDevice by index along with a view into its region of the group PDI.
    ///
    /// Returns [`Error::NotFound`] when `index` is out of range.
    #[doc(alias = "slave")]
    pub fn subdevice<'maindevice, 'group>(
        &'group self,
        maindevice: &'maindevice MainDevice<'maindevice>,
        index: usize,
    ) -> Result<SubDeviceRef<'maindevice, SubDevicePdi<'group, MAX_PDI, R>>, Error> {
        let subdevice = self.inner().subdevices.get(index).ok_or(Error::NotFound {
            item: Item::SubDevice,
            index: Some(index),
        })?;

        let io_ranges = subdevice.io_segments().clone();

        let IoRanges {
            input: input_range,
            output: output_range,
        } = &io_ranges;

        fmt::trace!(
            "Get SubDevice {:#06x} IO ranges I: {}, O: {} (group PDI {} byte subset of {} byte max)",
            subdevice.configured_address(),
            input_range,
            output_range,
            self.pdi_len,
            MAX_PDI
        );

        Ok(SubDeviceRef::new(
            maindevice,
            subdevice.configured_address(),
            SubDevicePdi::new(subdevice, &self.pdi),
        ))
    }

    /// Iterate over all SubDevices in the group, each paired with a view into its PDI region.
    pub fn iter<'group, 'maindevice>(
        &'group self,
        maindevice: &'maindevice MainDevice<'maindevice>,
    ) -> impl Iterator<Item = SubDeviceRef<'group, SubDevicePdi<'group, MAX_PDI, R>>>
    where
        'maindevice: 'group,
    {
        self.inner().subdevices.iter().map(|sd| {
            SubDeviceRef::new(
                maindevice,
                sd.configured_address,
                SubDevicePdi::new(sd, &self.pdi),
            )
        })
    }

    /// Exchange this group's PDI with the network (LRW), also reading each SubDevice's AL
    /// status.
    ///
    /// The PDI is split across as many frames as necessary. The response carries the summed
    /// LRW working counter and the state reported by each SubDevice.
    pub async fn tx_rx<'sto>(
        &self,
        maindevice: &'sto MainDevice<'sto>,
    ) -> Result<TxRxResponse<MAX_SUBDEVICES>, Error> {
        fmt::trace!(
            "Group TX/RX, start address {:#010x}, data len {}, of which read bytes: {}",
            self.inner().pdi_start.start_address,
            self.pdi_len,
            self.read_pdi_len
        );

        // Hold the PDI write lock for the whole exchange so process data can't be observed
        // half-updated.
        let mut pdi_lock = self.pdi.write();

        let mut total_bytes_sent = 0;
        let mut lrw_wkc_sum = 0;
        let mut subdevices = self.inner().subdevices.iter();
        let mut total_checks = 0;
        let mut subdevice_states = heapless::Vec::<_, MAX_SUBDEVICES>::new();

        loop {
            let chunk_len = self.pdi_len.saturating_sub(total_bytes_sent);

            // Done once the whole PDI has been exchanged and every SubDevice checked.
            if chunk_len == 0 && total_checks >= self.len() {
                break;
            }

            let chunk_start = total_bytes_sent.min(self.pdi_len);

            let chunk = pdi_lock.get_mut()[chunk_start..(chunk_start + chunk_len)].as_ref();

            let mut frame = maindevice.pdu_loop.alloc_frame()?;

            // Push as much of the remaining PDI into this frame as will fit.
            let pushed_chunk = if !chunk.is_empty() {
                let start_addr = self.inner().pdi_start.start_address + total_bytes_sent as u32;

                frame.push_pdu_slice_rest(Command::lrw(start_addr).into(), chunk)?
            } else {
                None
            };

            // Use any remaining frame space for AL status reads.
            let (rest, num_checks_in_this_frame) = push_state_checks(subdevices, &mut frame)?;

            subdevices = rest;

            total_checks += num_checks_in_this_frame;

            if frame.is_empty() {
                break;
            }

            let frame = frame.mark_sendable(
                &maindevice.pdu_loop,
                maindevice.timeouts.pdu(),
                maindevice.config.retry_behaviour.retry_count(),
            );

            maindevice.pdu_loop.wake_sender();

            let received = frame.await?;

            let mut pdus = received.into_pdu_iter();

            // PDUs come back in push order: the LRW chunk (if any) is first.
            if let Some((bytes_in_this_chunk, _pdu_handle)) = pushed_chunk {
                let wkc = self.process_received_pdi_chunk(
                    total_bytes_sent,
                    bytes_in_this_chunk,
                    &pdus.next().ok_or(Error::Internal)??,
                    &mut pdi_lock,
                )?;

                total_bytes_sent += bytes_in_this_chunk;

                lrw_wkc_sum += wkc;
            }

            // Any remaining PDUs in the frame are AL status responses.
            for state_check_pdu in pdus {
                let state_check_pdu = state_check_pdu?;

                let state = AlControl::unpack_from_slice(&state_check_pdu)?;

                let _ = subdevice_states.push(state.state);
            }
        }

        Ok(TxRxResponse {
            working_counter: lrw_wkc_sum,
            subdevice_states,
            extra: (),
        })
    }

    /// Like [`tx_rx`](Self::tx_rx) but also reads the DC system time from the network's
    /// reference clock (FRMW) in the first frame, redistributing it to all SubDevices.
    ///
    /// Falls back to a plain `tx_rx` with `extra: None` when no DC reference clock is present.
    pub async fn tx_rx_sync_system_time<'sto>(
        &self,
        maindevice: &'sto MainDevice<'sto>,
    ) -> Result<TxRxResponse<MAX_SUBDEVICES, Option<u64>>, Error> {
        let mut pdi_lock = self.pdi.write();

        fmt::trace!(
            "Group TX/RX with DC sync, start address {:#010x}, data len {}, of which read bytes: {}",
            self.inner().pdi_start.start_address,
            self.pdi_len,
            self.read_pdi_len
        );

        if let Some(dc_ref) = maindevice.dc_ref_address() {
            let mut total_bytes_sent = 0;
            let mut time = 0;
            let mut lrw_wkc_sum = 0;
            let mut time_read = false;
            let mut subdevices = self.inner().subdevices.iter();
            let mut total_checks = 0;
            let mut subdevice_states = heapless::Vec::<_, MAX_SUBDEVICES>::new();

            loop {
                let mut frame = maindevice.pdu_loop.alloc_frame()?;

                // The DC system time read is only pushed once, into the first frame.
                let dc_handle = if !time_read {
                    let dc_handle = frame.push_pdu(
                        Command::frmw(dc_ref, RegisterAddress::DcSystemTime.into()).into(),
                        0u64,
                        None,
                    )?;

                    debug_assert_eq!(dc_handle.alloc_size, DC_PDU_SIZE);

                    Some(dc_handle)
                } else {
                    None
                };

                let chunk_start = total_bytes_sent.min(self.pdi_len);

                let chunk_len = self.pdi_len.saturating_sub(total_bytes_sent);

                let chunk = pdi_lock.get_mut()[chunk_start..(chunk_start + chunk_len)].as_ref();

                // Push as much of the remaining PDI into this frame as will fit.
                let pushed_chunk = if !chunk.is_empty() {
                    let start_addr = self.inner().pdi_start.start_address + total_bytes_sent as u32;

                    frame.push_pdu_slice_rest(Command::lrw(start_addr).into(), chunk)?
                } else {
                    None
                };

                if let Some((bytes_in_this_chunk, _)) = pushed_chunk {
                    fmt::trace!("Wrote {} byte chunk", bytes_in_this_chunk);
                }

                // Fill remaining space with AL status reads.
                let (rest, num_checks_in_this_frame) = push_state_checks(subdevices, &mut frame)?;

                subdevices = rest;

                total_checks += num_checks_in_this_frame;

                if frame.is_empty() {
                    break Ok(TxRxResponse {
                        working_counter: lrw_wkc_sum,
                        subdevice_states,
                        extra: Some(time),
                    });
                }

                let frame = frame.mark_sendable(
                    &maindevice.pdu_loop,
                    maindevice.timeouts.pdu(),
                    maindevice.config.retry_behaviour.retry_count(),
                );

                maindevice.pdu_loop.wake_sender();

                let received = frame.await?;

                let mut pdus = received.into_pdu_iter();

                // The DC time PDU, when present, is always first in the frame.
                if dc_handle.is_some() {
                    let dc_pdu = pdus.next().ok_or(Error::Internal)?;

                    time =
                        dc_pdu.and_then(|rx| u64::unpack_from_slice(&rx).map_err(Error::from))?;

                    time_read = true;
                }

                // Next comes the LRW PDI chunk, if one was pushed.
                if let Some((bytes_in_this_chunk, _pdu_handle)) = pushed_chunk {
                    let wkc = self.process_received_pdi_chunk(
                        total_bytes_sent,
                        bytes_in_this_chunk,
                        &pdus.next().ok_or(Error::Internal)??,
                        &mut pdi_lock,
                    )?;

                    total_bytes_sent += bytes_in_this_chunk;

                    lrw_wkc_sum += wkc;
                }

                // Everything remaining is an AL status response.
                for state_check_pdu in pdus {
                    let state_check_pdu = state_check_pdu?;

                    let state = AlControl::unpack_from_slice(&state_check_pdu)?;

                    let _ = subdevice_states.push(state.state);
                }

                // Done once the whole PDI has been exchanged and every SubDevice checked.
                if chunk_len == 0 && total_checks >= self.len() {
                    break Ok(TxRxResponse {
                        working_counter: lrw_wkc_sum,
                        subdevice_states,
                        extra: Some(time),
                    });
                }
            }
        } else {
            // No DC reference clock: plain exchange with no system time in the response.
            self.tx_rx(maindevice).await.map(|response| TxRxResponse {
                working_counter: response.working_counter,
                subdevice_states: response.subdevice_states,
                extra: None,
            })
        }
    }

    /// Copy the input (read) portion of a received LRW chunk back into the group PDI buffer
    /// and return the chunk's working counter.
    fn process_received_pdi_chunk(
        &self,
        total_bytes_sent: usize,
        bytes_in_this_chunk: usize,
        data: &ReceivedPdu<'_>,
        pdi_lock: &mut RwLockWriteGuard<'_, R, MySyncUnsafeCell<[u8; MAX_PDI]>>,
    ) -> Result<u16, Error> {
        let wkc = data.working_counter;

        // Only bytes inside the input region (the first `read_pdi_len` bytes of the PDI) are
        // written back; the output region keeps the values that were sent.
        let rx_range = total_bytes_sent.min(self.read_pdi_len)
            ..(total_bytes_sent + bytes_in_this_chunk).min(self.read_pdi_len);

        let inputs_chunk = &mut pdi_lock.get_mut()[rx_range];

        inputs_chunk.copy_from_slice(data.get(0..inputs_chunk.len()).ok_or(Error::Internal)?);

        Ok(wkc)
    }
}
impl<const MAX_SUBDEVICES: usize, const MAX_PDI: usize, R: RawRwLock, S>
    SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, R, S, HasDc>
where
    S: HasPdi,
{
    /// PDI exchange for DC-configured groups: also reads and redistributes the DC system time
    /// and returns [`CycleInfo`] timing data for scheduling the next process data cycle.
    pub async fn tx_rx_dc<'sto>(
        &self,
        maindevice: &'sto MainDevice<'sto>,
    ) -> Result<TxRxResponse<MAX_SUBDEVICES, CycleInfo>, Error> {
        fmt::trace!(
            "Group TX/RX with DC sync, start address {:#010x}, data len {}, of which read bytes: {}",
            self.inner().pdi_start.start_address,
            self.pdi_len,
            self.read_pdi_len
        );

        // Hold the PDI write lock for the entire exchange.
        let mut pdi_lock = self.pdi.write();

        let mut total_bytes_sent = 0;
        let mut time = 0;
        let mut lrw_wkc_sum = 0;
        let mut time_read = false;
        let mut subdevices = self.inner().subdevices.iter();
        let mut total_checks = 0;
        let mut subdevice_states = heapless::Vec::<_, MAX_SUBDEVICES>::new();

        loop {
            let mut frame = maindevice.pdu_loop.alloc_frame()?;

            // FRMW system time read goes into the first frame only; this group carries its own
            // DC reference address in `dc_conf`.
            let dc_handle = if !time_read {
                let dc_handle = frame.push_pdu(
                    Command::frmw(self.dc_conf.reference, RegisterAddress::DcSystemTime.into())
                        .into(),
                    0u64,
                    None,
                )?;

                debug_assert_eq!(dc_handle.alloc_size, DC_PDU_SIZE);

                Some(dc_handle)
            } else {
                None
            };

            let chunk_start = total_bytes_sent.min(self.pdi_len);

            let chunk_len = self.pdi_len.saturating_sub(total_bytes_sent);

            let chunk = pdi_lock.get_mut()[chunk_start..(chunk_start + chunk_len)].as_ref();

            // Push as much of the remaining PDI into this frame as will fit.
            let pushed_chunk = if !chunk.is_empty() {
                let start_addr = self.inner().pdi_start.start_address + total_bytes_sent as u32;

                frame.push_pdu_slice_rest(Command::lrw(start_addr).into(), chunk)?
            } else {
                None
            };

            // Fill remaining space with AL status reads.
            let (rest, num_checks_in_this_frame) = push_state_checks(subdevices, &mut frame)?;

            subdevices = rest;

            total_checks += num_checks_in_this_frame;

            if frame.is_empty() {
                break;
            }

            let frame = frame.mark_sendable(
                &maindevice.pdu_loop,
                maindevice.timeouts.pdu(),
                maindevice.config.retry_behaviour.retry_count(),
            );

            maindevice.pdu_loop.wake_sender();

            let received = frame.await?;

            let mut pdus = received.into_pdu_iter();

            // The DC time PDU, when present, is always first in the frame.
            if dc_handle.is_some() {
                let dc_pdu = pdus.next().ok_or(Error::Internal)?;

                time = dc_pdu.and_then(|rx| u64::unpack_from_slice(&rx).map_err(Error::from))?;

                time_read = true;
            }

            // Next comes the LRW PDI chunk, if one was pushed.
            if let Some((bytes_in_this_chunk, _pdu_handle)) = pushed_chunk {
                let wkc = self.process_received_pdi_chunk(
                    total_bytes_sent,
                    bytes_in_this_chunk,
                    &pdus.next().ok_or(Error::Internal)??,
                    &mut pdi_lock,
                )?;

                total_bytes_sent += bytes_in_this_chunk;

                lrw_wkc_sum += wkc;
            }

            // Everything remaining is an AL status response.
            for state_check_pdu in pdus {
                let state_check_pdu = state_check_pdu?;

                let state = AlControl::unpack_from_slice(&state_check_pdu)?;

                let _ = subdevice_states.push(state.state);
            }

            // Done once the whole PDI has been exchanged and every SubDevice checked.
            if chunk_len == 0 && total_checks >= self.len() {
                break;
            }
        }

        // Where this exchange landed inside the current SYNC0 cycle, and how long to wait so
        // the next exchange lands `sync0_shift` ns after the next SYNC0 boundary.
        // NOTE(review): modulo by `sync0_period` panics if the configured period is zero.
        let cycle_start_offset = time % self.dc_conf.sync0_period;

        let time_to_next_iter =
            (self.dc_conf.sync0_period - cycle_start_offset) + self.dc_conf.sync0_shift;

        Ok(TxRxResponse {
            working_counter: lrw_wkc_sum,
            subdevice_states,
            extra: CycleInfo {
                dc_system_time: time,
                cycle_start_offset: Duration::from_nanos(cycle_start_offset),
                next_cycle_wait: Duration::from_nanos(time_to_next_iter),
            },
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
MainDeviceConfig, PduStorage, Timeouts,
ethernet::{EthernetAddress, EthernetFrame},
pdu_loop::ReceivedFrame,
};
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::{sync::Arc, thread};
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn tx_rx_miri() {
const MAX_SUBDEVICES: usize = 16;
const MAX_PDU_DATA: usize = PduStorage::element_size(8);
const MAX_FRAMES: usize = 128;
const MAX_PDI: usize = 128;
static PDU_STORAGE: PduStorage<MAX_FRAMES, MAX_PDU_DATA> = PduStorage::new();
crate::test_logger();
let (mock_net_tx, mock_net_rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(16);
let (mut tx, mut rx, pdu_loop) = PDU_STORAGE.try_split().expect("can only split once");
let stop = Arc::new(AtomicBool::new(false));
let stop1 = stop.clone();
let tx_handle = thread::spawn(move || {
fmt::info!("Spawn TX task");
while !stop1.load(Ordering::Relaxed) {
while let Some(frame) = tx.next_sendable_frame() {
fmt::info!("Sendable frame");
frame
.send_blocking(|bytes| {
mock_net_tx.send(bytes.to_vec()).unwrap();
Ok(bytes.len())
})
.unwrap();
thread::yield_now();
}
thread::sleep(Duration::from_millis(1));
}
});
let stop1 = stop.clone();
let rx_handle = thread::spawn(move || {
fmt::info!("Spawn RX task");
while let Ok(ethernet_frame) = mock_net_rx.recv() {
fmt::info!("RX task received packet");
thread::sleep(Duration::from_millis(1));
let ethernet_frame = {
let mut frame = EthernetFrame::new_checked(ethernet_frame).unwrap();
frame.set_src_addr(EthernetAddress([0x12, 0x10, 0x10, 0x10, 0x10, 0x10]));
frame.into_inner()
};
while rx.receive_frame(ðernet_frame).is_err() {}
thread::yield_now();
if stop1.load(Ordering::Relaxed) {
break;
}
}
});
let maindevice = Arc::new(MainDevice::new(
pdu_loop,
Timeouts {
pdu: Duration::from_secs(1),
wait_loop_delay: Duration::ZERO,
..Timeouts::default()
},
MainDeviceConfig::default(),
));
let group: SubDeviceGroup<MAX_SUBDEVICES, MAX_PDI, crate::DefaultLock, PreOpPdi, NoDc> =
SubDeviceGroup {
id: GroupId(0),
pdi: RwLock::new(MySyncUnsafeCell::new([0u8; MAX_PDI])),
read_pdi_len: 32,
pdi_len: 96,
inner: MySyncUnsafeCell::new(GroupInner {
subdevices: heapless::Vec::new(),
pdi_start: PdiOffset::default(),
}),
dc_conf: NoDc,
_state: PhantomData,
};
let out = group.tx_rx(&maindevice).await;
assert_eq!(
out,
Ok(TxRxResponse {
working_counter: 0,
subdevice_states: heapless::Vec::new(),
extra: ()
})
);
stop.store(true, Ordering::Relaxed);
tx_handle.join().unwrap();
rx_handle.join().unwrap();
}
#[test]
fn multi_state_checks_single_frame() {
const MAX_FRAMES: usize = 1;
const MAX_PDU_DATA: usize = PduStorage::element_size(AlControl::PACKED_LEN);
static PDU_STORAGE: PduStorage<MAX_FRAMES, MAX_PDU_DATA> = PduStorage::new();
crate::test_logger();
let (_tx, _rx, pdu_loop) = PDU_STORAGE.try_split().expect("can only split once");
let mut frame = pdu_loop.alloc_frame().expect("No frame");
assert!(
frame.can_push_pdu_payload(AlControl::PACKED_LEN),
"should be possible to push one status check PDU"
);
assert!(
!frame.can_push_pdu_payload(AlControl::PACKED_LEN + 12),
"test requires the frame to fit exactly one status check PDU"
);
let single_sd = vec![SubDevice {
..SubDevice::default()
}];
let subdevices = single_sd.iter();
let (rest, num_pushed) =
push_state_checks(subdevices, &mut frame).expect("Could not push status check");
assert_eq!(rest.count(), 0);
assert_eq!(num_pushed, single_sd.len());
assert!(!frame.can_push_pdu_payload(1), "frame should be full");
}
#[test]
fn multi_state_checks_space_left_over() {
const SPACE_LEFT: usize = 1;
const MAX_FRAMES: usize = 1;
const MAX_PDU_DATA: usize = (AlControl::PACKED_LEN + CreatedFrame::PDU_OVERHEAD_BYTES) * 2
+ (SPACE_LEFT + CreatedFrame::PDU_OVERHEAD_BYTES)
+ 16;
static PDU_STORAGE: PduStorage<MAX_FRAMES, MAX_PDU_DATA> = PduStorage::new();
crate::test_logger();
let (_tx, _rx, pdu_loop) = PDU_STORAGE.try_split().expect("can only split once");
let mut frame = pdu_loop.alloc_frame().expect("No frame");
let sds = vec![
SubDevice {
..SubDevice::default()
},
SubDevice {
..SubDevice::default()
},
SubDevice {
..SubDevice::default()
},
];
let subdevices = sds.iter();
let (rest, num_pushed) =
push_state_checks(subdevices, &mut frame).expect("Could not push status check");
assert_eq!(num_pushed, 2, "frame should hold two SD status checks");
assert_eq!(rest.count(), 1, "frame can only hold two SD status checks");
assert!(
frame.can_push_pdu_payload(SPACE_LEFT),
"frame has {} bytes available",
SPACE_LEFT
);
}
#[test]
fn large_group_frame_split() {
const MAX_SUBDEVICES: usize = 32;
const MAX_PDU_DATA: usize = PduStorage::element_size(256);
const MAX_FRAMES: usize = 32;
const MAX_PDI: usize = 512;
static PDU_STORAGE: PduStorage<MAX_FRAMES, MAX_PDU_DATA> = PduStorage::new();
crate::test_logger();
let (mock_net_tx, mock_net_rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(16);
let (mut tx, mut rx, pdu_loop) = PDU_STORAGE.try_split().expect("can only split once");
let maindevice = Arc::new(MainDevice::new(
pdu_loop,
Timeouts::default(),
MainDeviceConfig::default(),
));
let stop = Arc::new(AtomicBool::new(false));
let stop1 = stop.clone();
let tx_handle = thread::spawn(move || {
fmt::info!("Spawn TX task");
while !stop1.load(Ordering::Relaxed) {
while let Some(frame) = tx.next_sendable_frame() {
fmt::info!("Sendable frame");
frame
.send_blocking(|bytes| {
mock_net_tx.send(bytes.to_vec()).unwrap();
Ok(bytes.len())
})
.unwrap();
thread::yield_now();
}
}
});
let stop1 = stop.clone();
let rx_handle = thread::spawn(move || {
fmt::info!("Spawn RX task");
while let Ok(ethernet_frame) = mock_net_rx.recv() {
fmt::info!("RX task received packet");
let ethernet_frame = {
let mut frame = EthernetFrame::new_checked(ethernet_frame).unwrap();
frame.set_src_addr(EthernetAddress([0x12, 0x10, 0x10, 0x10, 0x10, 0x10]));
frame.into_inner()
};
while rx.receive_frame(ðernet_frame).is_err() {}
thread::yield_now();
if stop1.load(Ordering::Relaxed) {
break;
}
}
});
fn sd(addr: u16) -> SubDevice {
SubDevice {
configured_address: addr,
..SubDevice::default()
}
}
let subdevices = heapless::Vec::<_, MAX_SUBDEVICES>::from_slice(&[
sd(0x1000),
sd(0x1001),
sd(0x1002),
sd(0x1003),
sd(0x1004),
sd(0x1005),
sd(0x1006),
sd(0x1007),
sd(0x1008),
sd(0x1009),
sd(0x100a),
sd(0x100b),
sd(0x100c),
sd(0x100d),
sd(0x100e),
sd(0x100f),
])
.unwrap();
assert_eq!(subdevices.len(), 16);
let group = SubDeviceGroup::<MAX_SUBDEVICES, MAX_PDI, crate::DefaultLock, Op, HasDc> {
id: GroupId(0),
pdi: RwLock::new(MySyncUnsafeCell::new([0u8; MAX_PDI])),
read_pdi_len: 406,
pdi_len: 474,
inner: MySyncUnsafeCell::new(GroupInner {
subdevices,
pdi_start: PdiOffset { start_address: 0 },
}),
dc_conf: HasDc {
sync0_period: 100_000,
sync0_shift: 0,
reference: 0,
},
_state: PhantomData::<Op>,
};
cassette::block_on(group.tx_rx_dc(&maindevice)).unwrap();
stop.store(true, Ordering::Relaxed);
tx_handle.join().unwrap();
rx_handle.join().unwrap();
const PDI_FRAME_0: usize = 236;
const PDI_FRAME_1: usize = 238;
assert_eq!(PDI_FRAME_0 + PDI_FRAME_1, 474);
let expected_pdus = [
[
8, PDI_FRAME_0, ]
.as_slice(),
&[
PDI_FRAME_1, 2, ],
&[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
];
for (i, expected_lens) in expected_pdus.iter().enumerate() {
let f = maindevice
.pdu_loop
.test_only_storage_ref()
.frame_at_index(i);
let idx = AtomicU8::new(i as u8);
let b = ReceivedFrame::from_frame_element_for_test_only(f, &idx, MAX_PDU_DATA);
let expected_pdu_count = expected_lens.len();
let mut actual_pdu_count = 0;
for (pdu_idx, pdu) in b.into_pdu_iter().enumerate() {
let pdu = pdu.unwrap();
actual_pdu_count += 1;
assert_eq!(
pdu.len(),
expected_lens[pdu_idx],
"frame {}, PDU {} length",
i,
pdu_idx
);
}
assert_eq!(
actual_pdu_count, expected_pdu_count,
"frame {} PDU count",
i
);
}
let f = maindevice
.pdu_loop
.test_only_storage_ref()
.frame_at_index(3);
let idx = AtomicU8::new(3);
let b = ReceivedFrame::from_frame_element_for_test_only(f, &idx, MAX_PDU_DATA);
assert_eq!(b.into_pdu_iter().count(), 0);
}
}