use core_affinity::CoreId;
use env_logger::Env;
use ethercrab::{
DcSync, MainDevice, MainDeviceConfig, PduStorage, RegisterAddress, Timeouts, TxRxResponse,
error::Error,
std::{ethercat_now, tx_rx_task_xdp},
subdevice_group::{CycleInfo, DcConfiguration},
};
use futures_lite::StreamExt;
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use ta::Next;
use ta::indicators::ExponentialMovingAverage;
use thread_priority::{
RealtimeThreadSchedulePolicy, ThreadPriority, ThreadPriorityValue, ThreadSchedulePolicy,
};
const MAX_SUBDEVICES: usize = 16;
const MAX_PDU_DATA: usize = PduStorage::element_size(1100);
const MAX_FRAMES: usize = 32;
const PDI_LEN: usize = 64;
static PDU_STORAGE: PduStorage<MAX_FRAMES, MAX_PDU_DATA> = PduStorage::new();
const TICK_INTERVAL: Duration = Duration::from_micros(100);
fn main() -> Result<(), Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let interface = std::env::args()
.nth(1)
.expect("Provide network interface as first argument.");
log::info!("Starting XDP demo...");
log::info!("Run with RUST_LOG=ethercrab=debug or =trace for debug information");
let (tx, rx, pdu_loop) = PDU_STORAGE.try_split().expect("can only split once");
let maindevice = Arc::new(MainDevice::new(
pdu_loop,
Timeouts {
wait_loop_delay: Duration::from_millis(5),
state_transition: Duration::from_secs(10),
pdu: Duration::from_millis(2000),
..Timeouts::default()
},
MainDeviceConfig {
dc_static_sync_iterations: 10_000,
..MainDeviceConfig::default()
},
));
let mut tick_interval = smol::Timer::interval(TICK_INTERVAL);
thread_priority::ThreadBuilder::default()
.name("tx-rx-thread")
.priority(ThreadPriority::Crossplatform(
ThreadPriorityValue::try_from(49u8).unwrap(),
))
.policy(ThreadSchedulePolicy::Realtime(
RealtimeThreadSchedulePolicy::Fifo,
))
.spawn(move |_| {
core_affinity::set_for_current(CoreId { id: 0 })
.then_some(())
.expect("Set TX/RX thread core");
tx_rx_task_xdp(&interface, tx, rx).expect("TX/RX task");
})
.unwrap();
thread::sleep(Duration::from_millis(200));
core_affinity::set_for_current(CoreId { id: 1 })
.then_some(())
.expect("Set main task core");
#[cfg(target_os = "linux")]
thread_priority::set_current_thread_priority(thread_priority::ThreadPriority::Crossplatform(
thread_priority::ThreadPriorityValue::try_from(48u8).unwrap(),
))
.expect("Main thread prio");
smol::block_on(async {
let mut group = maindevice
.init_single_group::<MAX_SUBDEVICES, PDI_LEN>(ethercat_now)
.await
.expect("Init");
for mut slave in group.iter_mut(&maindevice) {
if slave.name() == "LAN9252-EVB-HBI" {
slave
.sdo_write(0x1c32, 1, 2u16)
.await
.expect("Set sync mode");
let cal_and_copy_time = slave
.sdo_read::<u16>(0x1c32, 6)
.await
.expect("Calc and copy time");
let delay_time = slave.sdo_read::<u16>(0x1c32, 9).await.expect("Delay time");
log::info!(
"LAN9252 calc time {} ns, delay time {} ns",
cal_and_copy_time,
delay_time,
);
slave
.sdo_write(0x1c32, 0x0a, TICK_INTERVAL.as_nanos() as u32)
.await
.expect("Set cycle time");
}
if slave.name() == "EL4102" {
log::info!("Found EL4102");
slave
.sdo_write(0x1c32, 1, 2u16)
.await
.expect("Set sync mode");
slave
.sdo_write(0x1c32, 0x02, TICK_INTERVAL.as_nanos() as u32)
.await
.expect("Set cycle time");
let cal_and_copy_time = slave
.sdo_read::<u16>(0x1c32, 6)
.await
.expect("Calc and copy time");
let delay_time = slave.sdo_read::<u16>(0x1c32, 9).await.expect("Delay time");
log::info!(
"--> Calc time {} ns, delay time {} ns",
cal_and_copy_time,
delay_time,
);
slave.set_dc_sync(DcSync::Sync01 {
sync1_period: Duration::from_nanos(100_000),
});
} else {
slave.set_dc_sync(DcSync::Sync0);
}
}
log::info!("Group has {} slaves", group.len());
let mut averages = Vec::new();
for _ in 0..group.len() {
averages.push(ExponentialMovingAverage::new(64).unwrap());
}
log::info!("Moving into PRE-OP with PDI");
let group = group.into_pre_op_pdi(&maindevice).await?;
log::info!("Done. PDI available. Waiting for SubDevices to align");
let mut now = Instant::now();
let start = Instant::now();
loop {
group
.tx_rx_sync_system_time(&maindevice)
.await
.expect("TX/RX");
if now.elapsed() >= Duration::from_millis(25) {
now = Instant::now();
let mut max_deviation = 0;
for (s1, ema) in group.iter(&maindevice).zip(averages.iter_mut()) {
let diff = match s1
.register_read::<u32>(RegisterAddress::DcSystemTimeDifference)
.await
{
Ok(value) =>
{
let flag = 0b1u32 << 31;
if value >= flag {
-((value & !flag) as i32)
} else {
value as i32
}
}
Err(Error::WorkingCounter { .. }) => 0,
Err(e) => return Err(e),
};
let ema_next = ema.next(diff as f64);
max_deviation = max_deviation.max(ema_next.abs() as u32);
}
log::debug!("--> Max deviation {} ns", max_deviation);
if max_deviation < 100 {
log::info!("Clocks settled after {} ms", start.elapsed().as_millis());
break;
}
}
tick_interval.next().await;
}
log::info!("Alignment done");
let group = group
.configure_dc_sync(
&maindevice,
DcConfiguration {
start_delay: Duration::from_millis(100),
sync0_period: TICK_INTERVAL,
sync0_shift: TICK_INTERVAL / 2,
},
)
.await?;
let group = group
.into_safe_op(&maindevice)
.await
.expect("PRE-OP -> SAFE-OP");
log::info!("SAFE-OP");
let term = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term))
.expect("Register hook");
let mut print_tick = Instant::now();
let group = group
.request_into_op(&maindevice)
.await
.expect("SAFE-OP -> OP");
log::info!("OP requested");
let op_request = Instant::now();
loop {
let now = Instant::now();
let response @ TxRxResponse {
working_counter: _wkc,
extra: CycleInfo {
next_cycle_wait, ..
},
..
} = group.tx_rx_dc(&maindevice).await.expect("TX/RX");
if response.all_op() {
break;
}
smol::Timer::at(now + next_cycle_wait).await;
}
log::info!(
"All SubDevices entered OP in {} us",
op_request.elapsed().as_micros()
);
loop {
let now = Instant::now();
let TxRxResponse {
working_counter: _wkc,
extra:
CycleInfo {
next_cycle_wait,
cycle_start_offset,
..
},
..
} = group.tx_rx_dc(&maindevice).await.expect("TX/RX");
{
let cycle_start_offset = cycle_start_offset.as_nanos() as u64;
if print_tick.elapsed() > Duration::from_secs(1) {
print_tick = Instant::now();
log::info!(
"Offset from start of cycle {} ({:0.2} ms), next tick in {:0.3} ms",
cycle_start_offset,
(cycle_start_offset as f32) / 1000.0 / 1000.0,
(next_cycle_wait.as_nanos() as f32) / 1000.0 / 1000.0
);
}
}
for subdevice in group.iter(&maindevice) {
for byte in subdevice.outputs_raw_mut().iter_mut() {
*byte = byte.wrapping_add(1);
}
}
smol::Timer::at(now + next_cycle_wait).await;
if term.load(Ordering::Relaxed) {
log::info!("Exiting...");
break;
}
}
let group = group
.into_safe_op(&maindevice)
.await
.expect("OP -> SAFE-OP");
log::info!("OP -> SAFE-OP");
let group = group
.into_pre_op(&maindevice)
.await
.expect("SAFE-OP -> PRE-OP");
log::info!("SAFE-OP -> PRE-OP");
let _group = group.into_init(&maindevice).await.expect("PRE-OP -> INIT");
log::info!("PRE-OP -> INIT, shutdown complete");
Ok(())
})
}