1use crate::{
46 data::PduData,
47 registers,
48 rawmaster::{RawMaster, PduCommand, SlaveAddress},
49 error::{EthercatError, EthercatResult},
50 };
51use std::{
52 collections::HashMap,
53 time::{SystemTime, Instant, Duration},
54 sync::Arc,
55 };
56use core::sync::atomic::{AtomicI64, Ordering::*};
57
58use futures_concurrency::future::Join;
59
60
61
62
63
64pub struct DistributedClock {
72 master: Arc<RawMaster>,
74
75 start: Instant,
77 epoch: SystemTime,
79 offset: AtomicI64,
81 delay: u32,
83
84 referent: usize,
86 slaves: Vec<ClockSlave>,
88 index: HashMap<SlaveAddress, usize>,
90}
91#[derive(Debug)]
92struct ClockSlave {
93 address: SlaveAddress,
95 enabled: bool,
97 topology: [Option<usize>; 4],
99 offset: i64,
101 delay: u32,
103}
104
105
106type DLSlave = Vec<(u16, registers::DLInformation, registers::DLStatus)>;
107
108
109impl DistributedClock {
110 pub async fn new(
121 master: Arc<RawMaster>,
122 delays_samples: Option<usize>,
123 offsets_samples: Option<usize>,
124 ) -> EthercatResult<Self> {
125 let mut clock = Self {
127 master,
128
129 start: Instant::now(),
130 epoch: SystemTime::now(),
131 offset: AtomicI64::new(0),
132 delay: 0,
133
134 referent: 0,
135 slaves: Vec::new(),
136 index: HashMap::new(),
137 };
138
139 clock.master.bwr(registers::dc::param_2, dc_control_loop::PARAM_2_OMRON).await;
141 clock.master.bwr(registers::dc::param_0, dc_control_loop::PARAM_0_RESET).await;
143
144 let infos = clock.init_slaves().await?;
145 clock.init_topology(&infos).await?;
146 clock.init_delays(&infos, delays_samples.unwrap_or(8)).await?;
147 clock.init_offsets(offsets_samples.unwrap_or(15_000)).await?;
148
149 Ok(clock)
150 }
151
152 async fn init_slaves(&mut self) -> EthercatResult<DLSlave> {
153 let support = self.master.brd(registers::dl::information).await;
155 if support.answers == 0 || ! support.value()?.dc_supported()
156 {return Err(EthercatError::Master("no slave supporting clock"))}
157
158 let master = self.master.as_ref();
160 let infos = (0 .. support.answers).map(|slave| async move {
161 let (address, support, status) = (
162 master.aprd(slave, registers::address::fixed),
163 master.aprd(slave, registers::dl::information),
164 master.aprd(slave, registers::dl::status),
165 ).join().await;
166 Ok((address.one()?, support.one()?, status.one()?))
167 })
168 .collect::<Vec<_>>()
169 .join().await
170 .drain(..).collect::<EthercatResult<Vec<_>>>()?;
171
172 self.slaves = infos.iter().enumerate()
174 .map(|(index, (fixed, information, _))| ClockSlave {
175 address:
176 if *fixed == 0 {SlaveAddress::AutoIncremented(index as _)}
177 else {SlaveAddress::Fixed(*fixed)},
178 enabled:
179 information.dc_supported(),
180 topology: [None; 4],
181 offset: 0,
182 delay: 0,
183 })
184 .collect::<Vec<_>>();
185
186 self.referent = (0 .. self.slaves.len())
188 .find(|index| self.slaves[*index].enabled)
189 .ok_or(EthercatError::Protocol("cannot find first slave supporting clock"))?;
190
191 self.index = HashMap::from_iter(self.slaves
192 .iter().enumerate()
193 .map(|(index, slave)| (slave.address, index))
194 );
195
196 Ok(infos)
197 }
198
199 async fn init_topology(&mut self, infos: &DLSlave) -> EthercatResult {
201 let mut stack = Vec::<usize>::new();
202 for index in 0 .. infos.len() {
203 if index == 0 {
204 self.slaves[index].topology[0] = Some(0);
205 }
206 else {
207 let (parent, port) = loop {
208 let Some(&parent) = stack.last()
209 else {return Err(EthercatError::Protocol("topology identification failed due to wrong slave port activation"))};
210 if let Some(port) = (0 .. self.slaves[parent].topology.len())
211 .find(|&port| infos[parent].2.port_link_status_at(port) && self.slaves[parent].topology[port].is_none())
212 {break (parent, port)}
213 stack.pop();
214 };
215 self.slaves[parent].topology[port] = Some(index);
216 self.slaves[index].topology[0] = Some(parent);
217 }
218 stack.push(index);
219 }
220 Ok(())
222 }
223 async fn init_delays(&mut self, infos: &DLSlave, samples: usize) -> EthercatResult {
225 let mut stamps = vec![[0; 4]; infos.len()*samples];
227 let mut master = vec![[0; 2]; samples];
228 for i in 0 .. samples {
229 master[i][0] = self.reduced();
231 self.master.bwr(registers::dc::measure_time, 0).await;
232 master[i][1] = self.reduced();
233
234 let master = self.master.as_ref();
235 for (index, times) in self.slaves.iter()
236 .enumerate()
237 .filter(|(_, slave)| slave.enabled)
238 .map(|(index, slave)| async move {
239 (index, master.read(slave.address, registers::dc::received_time).await)
240 })
241 .collect::<Vec<_>>()
242 .join().await
243 {
244 stamps[i + index*samples] = times.one()?;
245 }
246 }
247
248 let mut transitions: u64 = 0;
251 for i in 0 .. samples {
252 let child = &stamps[i + self.referent*samples];
253 let child_before = 0;
254 let child_after = self.slaves[self.referent].topology.iter().enumerate().rev()
255 .find(|(_, &next)| next.is_some()).unwrap().0;
256
257 let transition = master[i][1].wrapping_sub(master[i][0])
258 - u64::from(child[child_after].wrapping_sub(child[child_before]));
259 transitions += transition;
260 }
261 self.delay = u32::try_from( transitions / (2*(samples as u64)) ).unwrap();
262
263 for index in 1 .. self.slaves.len() {
265 let parent = self.slaves[index].topology[0].unwrap();
268
269 let parent_after = self.slaves[parent].topology.iter().enumerate()
271 .find(|(_, &next)| next == Some(index)).unwrap().0;
272 let parent_before = self.slaves[parent].topology[0 .. parent_after].iter().enumerate().rev()
273 .find(|(_, &next)| next.is_some()).unwrap().0;
274
275 let child_before = 0;
276 let child_after = self.slaves[index].topology.iter().enumerate().rev()
277 .find(|(_, &next)| next.is_some()).unwrap().0;
278
279 let mut transitions: u64 = 0;
281 let mut ports: u64 = 0;
283
284 for i in 0 .. samples {
285 let child = &stamps[i + index*samples];
286 let parent = &stamps[i + parent*samples];
287 let transition = parent[parent_after].wrapping_sub(parent[parent_before])
288 - child[child_after].wrapping_sub(child[child_before]);
289 let port = parent[parent_before].wrapping_sub(parent[0]);
290 transitions += u64::from(transition);
294 ports += u64::from(port);
295 }
296
297 self.slaves[index].delay = self.slaves[parent].delay + u32::try_from(
298 transitions / (2*(samples as u64)) + ports / (samples as u64)
299 ).unwrap();
300 }
301 self.slaves.iter().map(|slave| async {
303 self.master.write(slave.address, registers::dc::system_delay, slave.delay).await.one()
304 })
305 .collect::<Vec<_>>()
306 .join().await
307 .drain(..).collect::<EthercatResult>()?;
308
309 Ok(())
310 }
311
312 async fn init_offsets(&mut self, samples: usize) -> EthercatResult {
314 let clock = self as *mut Self;
316
317 self.slaves.iter_mut()
319 .filter(|slave| slave.enabled)
320 .map(|slave| async move {
321 let clock = unsafe {&*clock};
322 let remote = clock.master.read(slave.address, registers::dc::local_time).await.one()?;
323 let local = clock.reduced();
324 let offset = local.wrapping_sub(remote);
325 clock.master.write(
326 slave.address,
327 registers::dc::system_offset,
328 offset,
329 ).await.one()?;
330 slave.offset = i64::from_ne_bytes(offset.to_ne_bytes());
331 Ok(())
332 })
333 .collect::<Vec<_>>()
334 .join().await
335 .drain(..).collect::<EthercatResult>()?;
336
337 for _ in 0 .. samples {
339 self.sync().await;
340 }
341 self.slaves.iter_mut()
343 .filter(|slave| slave.enabled)
344 .map(|slave| async move {
345 let clock = unsafe {&*clock};
346 slave.offset += i64::from(i32::from(clock.master.read(slave.address, registers::dc::system_difference).await.one()?));
347 clock.master.write(
348 slave.address,
349 registers::dc::system_offset,
350 u64::from_ne_bytes(slave.offset.to_ne_bytes()),
351 ).await.one()?;
352 EthercatResult::<(), ()>::Ok(())
353 })
354 .collect::<Vec<_>>()
355 .join().await
356 .drain(..).collect::<EthercatResult>()?;
357 Ok(())
358 }
359
360 pub fn referent(&self) -> SlaveAddress {
364 self.slaves[self.referent].address
365 }
366 pub fn system(&self) -> i128 {
368 i128::try_from(self.start.elapsed().as_nanos()).unwrap()
369 + i128::from(self.offset.load(SeqCst))
370 }
372
373 fn reduced(&self) -> u64 {
375 u64::try_from( self.start.elapsed().as_nanos() % u128::from(u64::MAX) ).unwrap()
376 }
377
378
379 pub fn epoch(&self) -> i128 {
384 self.epoch.duration_since(SystemTime::UNIX_EPOCH).unwrap()
385 .as_nanos()
386 .try_into().unwrap()
387 }
388
389 pub fn offset(&self, slave: SlaveAddress) -> i128 {
391 self.slaves[self.index[&slave]].offset.into()
392 }
393 pub fn delay(&self, slave: SlaveAddress) -> i128 {
395 self.slaves[self.index[&slave]].delay.into()
396 }
397
398 pub fn offset_master(&self) -> i128 {
400 self.offset.load(SeqCst).into()
401 }
402 pub fn delay_master(&self) -> i128 {
404 self.delay.into()
405 }
406
407
408 pub async fn sync(&self) {
412 let referent = self.referent();
414 let command = match referent {
415 SlaveAddress::AutoIncremented(_) => PduCommand::ARMW,
416 SlaveAddress::Fixed(_) => PduCommand::FRMW,
417 _ => unreachable!(),
418 };
419 let mut buffer = (0u64).packed().unwrap();
420 let sent = self.reduced();
421 let received = {
422 let mut command = self.master.topic(
423 command,
424 referent,
425 registers::dc::system_time.byte as u32,
426 &mut buffer,
427 ).await;
428 command.send(None).await;
429 self.master.flush();
430 command.wait().await;
431 command.receive(None).answers
432 };
433 if received != 0 {
435 let div = 512;
436 let offset = (u64::unpack(&buffer).unwrap())
437 .wrapping_sub(sent + u64::from(self.slaves[self.referent].delay));
438 self.offset.store(i64::try_from((
439 (div-1) * i128::from(self.offset.load(Relaxed))
440 + 1 * i128::from(i64::from_ne_bytes(offset.to_ne_bytes()))
441 )/div ).unwrap(), SeqCst);
442 }
443 }
445
446 pub async fn sync_loop(&self, period: Duration) {
455 use futures::stream::StreamExt;
456 let mut interval = tokio_timerfd::Interval::new_interval(period).unwrap();
457
458 loop {
459 interval.next().await.unwrap().unwrap();
460 self.sync().await;
461 }
462 }
463}
464
465#[allow(unused)]
466mod dc_control_loop {
467 pub const PARAM_0_RESET: u16 = 0x1000;
469 pub const PARAM_2_DISABLED: u16 = u16::from_le_bytes([0, 0]);
471 pub const PARAM_2_OMRON: u16 = u16::from_le_bytes([0, 12]);
473 pub const PARAM_2_REFERENCE_MASTER: u16 = u16::from_le_bytes([4, 12]);
475 pub const PARAM_2_GRAND_MASTER: u16 = u16::from_le_bytes([4, 0]);
477}