use crate::{
data::PduData,
registers,
rawmaster::{RawMaster, PduCommand, SlaveAddress},
error::{EthercatError, EthercatResult},
};
use std::{
collections::HashMap,
time::{SystemTime, Instant, Duration},
sync::Arc,
};
use core::sync::atomic::{AtomicI64, Ordering::*};
use futures_concurrency::future::Join;
/// Master-side manager of the distributed clock of a segment.
///
/// An instance is built by [`DistributedClock::new`], which enumerates the slaves,
/// reconstructs their topology, estimates transmission delays and initializes the
/// slaves time offsets. Afterward [`DistributedClock::sync`] (or
/// [`DistributedClock::sync_loop`]) keeps the master offset up to date.
pub struct DistributedClock {
/// raw master used for every register access issued by this clock
master: Arc<RawMaster>,
/// monotonic instant captured at construction, origin of `system()` and `reduced()`
start: Instant,
/// wall-clock time captured at construction, exposed by `epoch()`
epoch: SystemTime,
/// filtered difference between the referent slave time and the master elapsed time,
/// updated by `sync()` and added to the elapsed time by `system()`
offset: AtomicI64,
/// delay computed by `init_delays` from the host-side round trip of the measurement
/// broadcast and the time spent past the referent slave (see `delay_master()`)
delay: u32,
/// index in `slaves` of the first slave reporting clock support
referent: usize,
/// clock state of every slave, in the order they answered auto-incremented reads
slaves: Vec<ClockSlave>,
/// lookup table from a slave address to its index in `slaves`
index: HashMap<SlaveAddress, usize>,
}
/// Clock-related state kept by the master for one slave.
#[derive(Debug)]
struct ClockSlave {
/// address used to reach the slave: fixed when the slave reports a non-zero fixed
/// address, auto-incremented (by position) otherwise
address: SlaveAddress,
/// whether the slave reported clock support in its DL information
enabled: bool,
/// index of the slave connected to each of the 4 ports; port 0 holds the parent
/// (the first slave points to itself), `None` means nothing was attributed to the port
topology: [Option<usize>; 4],
/// last value written to the slave `system_offset` register, as a signed integer
offset: i64,
/// value written to the slave `system_delay` register, accumulated from the parent delay
delay: u32,
}
/// Data read from each slave during enumeration, in slave order:
/// `(fixed address, DL information, DL status)`.
type DLSlave = Vec<(u16, registers::DLInformation, registers::DLStatus)>;
impl DistributedClock {
/// Create the clock and run the complete initialization sequence.
///
/// The sequence is: broadcast the control loop parameters, enumerate the slaves,
/// rebuild the topology, estimate the delays and finally initialize the offsets.
///
/// - `delays_samples`: number of measurement rounds used by the delay estimation,
///   8 when `None`
/// - `offsets_samples`: number of `sync()` rounds run before the offsets are
///   corrected, 15_000 when `None`
///
/// # Errors
/// Any error raised by one of the initialization steps is returned as is.
pub async fn new(
    master: Arc<RawMaster>,
    delays_samples: Option<usize>,
    offsets_samples: Option<usize>,
) -> EthercatResult<Self> {
    let start = Instant::now();
    let epoch = SystemTime::now();
    let mut this = Self {
        master,
        start,
        epoch,
        offset: AtomicI64::new(0),
        delay: 0,
        referent: 0,
        slaves: Vec::new(),
        index: HashMap::new(),
    };

    // the answers to these two broadcast writes are discarded
    this.master.bwr(registers::dc::param_2, dc_control_loop::PARAM_2_OMRON).await;
    this.master.bwr(registers::dc::param_0, dc_control_loop::PARAM_0_RESET).await;

    let infos = this.init_slaves().await?;
    this.init_topology(&infos).await?;

    let delays_samples = delays_samples.unwrap_or(8);
    this.init_delays(&infos, delays_samples).await?;

    let offsets_samples = offsets_samples.unwrap_or(15_000);
    this.init_offsets(offsets_samples).await?;

    Ok(this)
}
async fn init_slaves(&mut self) -> EthercatResult<DLSlave> {
let support = self.master.brd(registers::dl::information).await;
if support.answers == 0 || ! support.value()?.dc_supported()
{return Err(EthercatError::Master("no slave supporting clock"))}
let master = self.master.as_ref();
let infos = (0 .. support.answers).map(|slave| async move {
let (address, support, status) = (
master.aprd(slave, registers::address::fixed),
master.aprd(slave, registers::dl::information),
master.aprd(slave, registers::dl::status),
).join().await;
Ok((address.one()?, support.one()?, status.one()?))
})
.collect::<Vec<_>>()
.join().await
.drain(..).collect::<EthercatResult<Vec<_>>>()?;
self.slaves = infos.iter().enumerate()
.map(|(index, (fixed, information, _))| ClockSlave {
address:
if *fixed == 0 {SlaveAddress::AutoIncremented(index as _)}
else {SlaveAddress::Fixed(*fixed)},
enabled:
information.dc_supported(),
topology: [None; 4],
offset: 0,
delay: 0,
})
.collect::<Vec<_>>();
self.referent = (0 .. self.slaves.len())
.find(|index| self.slaves[*index].enabled)
.ok_or(EthercatError::Protocol("cannot find first slave supporting clock"))?;
self.index = HashMap::from_iter(self.slaves
.iter().enumerate()
.map(|(index, slave)| (slave.address, index))
);
Ok(infos)
}
/// Rebuild the tree of slaves from the link status of their ports.
///
/// Slaves are visited in enumeration order. Each one is attached to the most recently
/// visited slave that still has a port with an active link and nothing attributed to
/// it; slaves without such a port are abandoned as candidates. Port 0 of each slave
/// receives the index of its parent, the first slave being its own parent.
///
/// NOTE(review): free ports are searched in increasing index order — confirm this
/// matches the order in which the hardware forwards frames through its ports.
///
/// # Errors
/// `EthercatError::Protocol` when a slave cannot be attached to any previous slave.
async fn init_topology(&mut self, infos: &DLSlave) -> EthercatResult {
    // slaves that may still receive a child, the innermost one last
    let mut candidates = Vec::<usize>::new();

    for index in 0 .. infos.len() {
        if index == 0 {
            self.slaves[index].topology[0] = Some(0);
            candidates.push(index);
            continue;
        }

        let (parent, port) = loop {
            let parent = match candidates.last() {
                Some(&parent) => parent,
                None => return Err(EthercatError::Protocol("topology identification failed due to wrong slave port activation")),
            };
            let ports = self.slaves[parent].topology.len();
            let free = (0 .. ports).find(|&port| {
                infos[parent].2.port_link_status_at(port)
                && self.slaves[parent].topology[port].is_none()
            });
            match free {
                Some(port) => break (parent, port),
                // every linked port of this candidate is taken: go up one level
                None => { candidates.pop(); }
            }
        };

        self.slaves[parent].topology[port] = Some(index);
        self.slaves[index].topology[0] = Some(parent);
        candidates.push(index);
    }
    Ok(())
}
async fn init_delays(&mut self, infos: &DLSlave, samples: usize) -> EthercatResult {
let mut stamps = vec![[0; 4]; infos.len()*samples];
let mut master = vec![[0; 2]; samples];
for i in 0 .. samples {
master[i][0] = self.reduced();
self.master.bwr(registers::dc::measure_time, 0).await;
master[i][1] = self.reduced();
let master = self.master.as_ref();
for (index, times) in self.slaves.iter()
.enumerate()
.filter(|(_, slave)| slave.enabled)
.map(|(index, slave)| async move {
(index, master.read(slave.address, registers::dc::received_time).await)
})
.collect::<Vec<_>>()
.join().await
{
stamps[i + index*samples] = times.one()?;
}
}
let mut transitions: u64 = 0;
for i in 0 .. samples {
let child = &stamps[i + self.referent*samples];
let child_before = 0;
let child_after = self.slaves[self.referent].topology.iter().enumerate().rev()
.find(|(_, &next)| next.is_some()).unwrap().0;
let transition = master[i][1].wrapping_sub(master[i][0])
- u64::from(child[child_after].wrapping_sub(child[child_before]));
transitions += transition;
}
self.delay = u32::try_from( transitions / (2*(samples as u64)) ).unwrap();
for index in 1 .. self.slaves.len() {
let parent = self.slaves[index].topology[0].unwrap();
let parent_after = self.slaves[parent].topology.iter().enumerate()
.find(|(_, &next)| next == Some(index)).unwrap().0;
let parent_before = self.slaves[parent].topology[0 .. parent_after].iter().enumerate().rev()
.find(|(_, &next)| next.is_some()).unwrap().0;
let child_before = 0;
let child_after = self.slaves[index].topology.iter().enumerate().rev()
.find(|(_, &next)| next.is_some()).unwrap().0;
let mut transitions: u64 = 0;
let mut ports: u64 = 0;
for i in 0 .. samples {
let child = &stamps[i + index*samples];
let parent = &stamps[i + parent*samples];
let transition = parent[parent_after].wrapping_sub(parent[parent_before])
- child[child_after].wrapping_sub(child[child_before]);
let port = parent[parent_before].wrapping_sub(parent[0]);
transitions += u64::from(transition);
ports += u64::from(port);
}
self.slaves[index].delay = self.slaves[parent].delay + u32::try_from(
transitions / (2*(samples as u64)) + ports / (samples as u64)
).unwrap();
}
self.slaves.iter().map(|slave| async {
self.master.write(slave.address, registers::dc::system_delay, slave.delay).await.one()
})
.collect::<Vec<_>>()
.join().await
.drain(..).collect::<EthercatResult>()?;
Ok(())
}
async fn init_offsets(&mut self, samples: usize) -> EthercatResult {
let clock = self as *mut Self;
self.slaves.iter_mut()
.filter(|slave| slave.enabled)
.map(|slave| async move {
let clock = unsafe {&*clock};
let remote = clock.master.read(slave.address, registers::dc::local_time).await.one()?;
let local = clock.reduced();
let offset = local.wrapping_sub(remote);
clock.master.write(
slave.address,
registers::dc::system_offset,
offset,
).await.one()?;
slave.offset = i64::from_ne_bytes(offset.to_ne_bytes());
Ok(())
})
.collect::<Vec<_>>()
.join().await
.drain(..).collect::<EthercatResult>()?;
for _ in 0 .. samples {
self.sync().await;
}
self.slaves.iter_mut()
.filter(|slave| slave.enabled)
.map(|slave| async move {
let clock = unsafe {&*clock};
slave.offset += i64::from(i32::from(clock.master.read(slave.address, registers::dc::system_difference).await.one()?));
clock.master.write(
slave.address,
registers::dc::system_offset,
u64::from_ne_bytes(slave.offset.to_ne_bytes()),
).await.one()?;
EthercatResult::<(), ()>::Ok(())
})
.collect::<Vec<_>>()
.join().await
.drain(..).collect::<EthercatResult>()?;
Ok(())
}
/// Address of the slave used as time reference (the first one supporting the clock).
pub fn referent(&self) -> SlaveAddress {
    let slave = &self.slaves[self.referent];
    slave.address
}
/// Time elapsed since the clock creation, in nanoseconds, shifted by the current
/// master offset (see `offset_master()`).
pub fn system(&self) -> i128 {
    let elapsed = i128::try_from(self.start.elapsed().as_nanos()).unwrap();
    let offset = i128::from(self.offset.load(SeqCst));
    elapsed + offset
}
/// Nanoseconds elapsed since the clock creation, reduced to 64 bits.
///
/// The reduction is modulo 2^64 so the result composes with the `wrapping_sub`
/// arithmetic applied to it elsewhere (reducing modulo `u64::MAX` would
/// introduce a one-unit discontinuity at the wrap).
fn reduced(&self) -> u64 {
    let nanos = self.start.elapsed().as_nanos();
    // keep the low 64 bits; the cast is lossless after the mask
    (nanos & u128::from(u64::MAX)) as u64
}
/// Wall-clock time of the clock creation, in nanoseconds since the UNIX epoch.
///
/// # Panics
/// When the system time at creation was before the UNIX epoch.
pub fn epoch(&self) -> i128 {
    let since_unix = self.epoch.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let nanos = since_unix.as_nanos();
    i128::try_from(nanos).unwrap()
}
/// Offset stored for the given slave, as last written to its `system_offset` register.
///
/// # Panics
/// When `slave` is not an address known by this clock.
pub fn offset(&self, slave: SlaveAddress) -> i128 {
    let position = self.index[&slave];
    i128::from(self.slaves[position].offset)
}
/// Delay stored for the given slave, as written to its `system_delay` register.
///
/// # Panics
/// When `slave` is not an address known by this clock.
pub fn delay(&self, slave: SlaveAddress) -> i128 {
    let position = self.index[&slave];
    i128::from(self.slaves[position].delay)
}
/// Current filtered offset of the master, as maintained by `sync()`.
pub fn offset_master(&self) -> i128 {
    let offset = self.offset.load(SeqCst);
    i128::from(offset)
}
/// Delay estimated on the master side during initialization.
pub fn delay_master(&self) -> i128 {
    i128::from(self.delay)
}
/// Run one synchronization round.
///
/// A read-multiple-write command (`ARMW` or `FRMW` depending on the kind of address
/// of the referent) is sent on the `system_time` register of the referent slave.
/// When at least one slave answered, the master offset is updated with a first-order
/// filter (weight 1/512 for the new measure), the measure being the time read back
/// minus the sending time and the referent delay.
///
/// # Panics
/// When the referent address is neither auto-incremented nor fixed, or when packing
/// or unpacking the time value fails.
pub async fn sync(&self) {
    let referent = self.referent();
    let kind = match referent {
        SlaveAddress::AutoIncremented(_) => PduCommand::ARMW,
        SlaveAddress::Fixed(_) => PduCommand::FRMW,
        _ => unreachable!(),
    };

    let mut buffer = (0u64).packed().unwrap();
    let sent = self.reduced();
    let received = {
        // the topic borrows the buffer: it must be released before reading it back
        let mut topic = self.master.topic(
            kind,
            referent,
            registers::dc::system_time.byte as u32,
            &mut buffer,
        ).await;
        topic.send(None).await;
        self.master.flush();
        topic.wait().await;
        topic.receive(None).answers
    };

    // nothing answered: keep the previous offset
    if received == 0 {
        return;
    }

    let div: i128 = 512;
    let remote = u64::unpack(&buffer).unwrap();
    let expected = sent + u64::from(self.slaves[self.referent].delay);
    let raw = remote.wrapping_sub(expected);
    let previous = i128::from(self.offset.load(Relaxed));
    // the wrapped difference is reinterpreted as a signed value
    let measured = i128::from(i64::from_ne_bytes(raw.to_ne_bytes()));
    let filtered = ((div - 1) * previous + measured) / div;
    self.offset.store(i64::try_from(filtered).unwrap(), SeqCst);
}
/// Call `sync()` forever, once per `period`.
///
/// This future never completes on its own.
///
/// # Panics
/// When the timer cannot be created, ends, or reports an error.
pub async fn sync_loop(&self, period: Duration) {
    use futures::stream::StreamExt;

    let mut ticks = tokio_timerfd::Interval::new_interval(period).unwrap();
    loop {
        let tick = ticks.next().await;
        tick.unwrap().unwrap();
        self.sync().await;
    }
}
}
/// Values for the control loop parameter registers of the clock.
///
/// `DistributedClock::new` broadcasts `PARAM_2_OMRON` to `registers::dc::param_2` and
/// `PARAM_0_RESET` to `registers::dc::param_0`; the other presets are kept for
/// reference, hence the `allow(unused)`.
///
/// The `PARAM_2_*` values are given as 16-bit words; the comment on each line gives
/// the little-endian bytes `[low, high]`.
/// NOTE(review): the meaning of each preset is only suggested by its name — confirm
/// against the slave controller documentation.
#[allow(unused)]
mod dc_control_loop {
    pub const PARAM_0_RESET: u16 = 0x1000;

    /// bytes `[0, 0]`
    pub const PARAM_2_DISABLED: u16 = 0x0000;
    /// bytes `[0, 12]`
    pub const PARAM_2_OMRON: u16 = 0x0c00;
    /// bytes `[4, 12]`
    pub const PARAM_2_REFERENCE_MASTER: u16 = 0x0c04;
    /// bytes `[4, 0]`
    pub const PARAM_2_GRAND_MASTER: u16 = 0x0004;
}