use std::{
sync::{Arc, Mutex},
time::Instant,
};
use core::{
ops::DerefMut,
time::Duration,
sync::atomic::AtomicU16,
sync::atomic::Ordering::*,
};
use tokio::{
task::JoinHandle,
sync::Notify,
};
use tokio_timerfd::{Delay, Interval};
use bilge::prelude::*;
use futures_concurrency::future::Race;
use futures::StreamExt;
use core::future::poll_fn;
use crate::{
error::{EthercatError, EthercatResult},
socket::*,
data::{self, Field, PduData, Storage, Cursor, PackingResult},
registers::ExternalEvent,
};
/// Size in bytes of the buffers holding one frame to send or one received frame.
const MAX_ETHERCAT_FRAME: usize = 1500;
/// NOTE(review): presumably the smallest space one PDU can take in a frame — confirm; only used to derive `MAX_ETHERCAT_PDU`.
const MIN_PDU: usize = 60;
/// Number of PDUs assumed to fit in one frame; the token table holds twice as many slots.
const MAX_ETHERCAT_PDU: usize = MAX_ETHERCAT_FRAME / MIN_PDU;
/// Low-level master owning the socket and the shared state used to batch PDUs into frames and dispatch their answers.
///
/// `new` spawns a background task (see `task_loop`) that sends, receives and times out PDUs.
pub struct RawMaster {
/// delay left for other PDUs to join the frame after one has been buffered (see `task_send`)
pdu_merge_time: Duration,
/// age after which a slot is reported as unanswered, also the period of the timeout check (see `task_timeout`)
pdu_timeout: Duration,
/// socket the frames are sent to and received from
socket: Box<dyn EthercatSocket + Send + Sync>,
/// handle of the background task, aborted by `stop`
task: Mutex<Option<JoinHandle<()>>>,
/// notified when a frame has been processed by `pdu_receive` and at each timeout check
received: Notify,
/// notified when a PDU has been buffered or when a send is requested
sendable: Notify,
/// notified once the buffered frame has been given to the socket and the buffer reset
sent: Notify,
/// buffers and bookkeeping shared between callers and the background task
state: Mutex<MasterState>,
}
/// Mutable state of the master, always accessed through `RawMaster::state`.
struct MasterState {
/// offset in `send` of the header of the last PDU written
last_start: usize,
/// offset in `send` just after the last PDU written, 0 when nothing is buffered
last_end: usize,
/// when true the buffered frame must be sent as is and no PDU may be appended until it is sent
ready: bool,
/// frame being assembled: frame header followed by the buffered PDUs
send: [u8; MAX_ETHERCAT_FRAME],
/// reception slots, indexed by PDU token; `None` for tokens not in use
receive: [Option<PduState>; 2*MAX_ETHERCAT_PDU],
/// tokens currently not attributed to any `Topic`
free: heapless::Vec<usize, {2*MAX_ETHERCAT_PDU}>,
/// instant at which the last PDU was written into `send`
buffered: Instant,
}
/// Reception slot attached to one token while a `Topic` owns it.
struct PduState {
/// buffer the received PDU content is copied into; it is the buffer given to `Topic::new`, with its lifetime erased
data: &'static mut [u8],
/// 0 while no answer is available, otherwise the received working count plus one (1 is also set on timeout)
ready: AtomicU16,
/// instant of the slot creation or of the last buffering of its PDU, compared to the timeout
sent: Instant,
}
impl RawMaster {
/// Build a master on top of `socket` and spawn its background task on the current tokio runtime.
///
/// The merge time is set to 100 microseconds and the timeout to 100 milliseconds.
/// The returned `Arc` is shared with the background task, which runs until `stop` is called
/// or one of its loops fails.
pub fn new<S: EthercatSocket + 'static + Send + Sync>(socket: S) -> Arc<Self> {
    // empty send buffer: the first PDU will be written right after the frame header
    let state = MasterState {
        last_start: EthercatHeader::packed_size(),
        last_end: 0,
        ready: false,
        send: [0; MAX_ETHERCAT_FRAME],
        receive: [0; 2*MAX_ETHERCAT_PDU].map(|_| None),
        free: (0 .. 2*MAX_ETHERCAT_PDU).collect(),
        buffered: Instant::now(),
    };
    let master = Arc::new(Self {
        pdu_merge_time: Duration::from_micros(100),
        pdu_timeout: Duration::from_millis(100),
        socket: Box::new(socket),
        received: Notify::new(),
        sendable: Notify::new(),
        sent: Notify::new(),
        state: Mutex::new(state),
        task: Mutex::new(None),
    });
    // the background task keeps its own reference to the master
    let worker = Arc::clone(&master);
    let handle = tokio::task::spawn(async move { worker.task_loop().await; });
    *master.task.lock().unwrap() = Some(handle);
    master
}
/// Read `address` with broadcast addressing (see `read`).
pub async fn brd<T: PduData>(&self, address: Field<T>) -> PduAnswer<T> {
    let target = SlaveAddress::Broadcast;
    self.read(target, address).await
}
/// Write `data` at `address` with broadcast addressing (see `write`).
pub async fn bwr<T: PduData>(&self, address: Field<T>, data: T) -> PduAnswer<()> {
    let target = SlaveAddress::Broadcast;
    self.write(target, address, data).await
}
/// Exchange `data` at `address` with broadcast addressing (see `exchange`).
pub async fn brw<T: PduData>(&self, address: Field<T>, data: T) -> PduAnswer<T> {
    let target = SlaveAddress::Broadcast;
    self.exchange(target, address, data).await
}
/// Read `address` on the slave designated by auto-incremented addressing (see `read`).
pub async fn aprd<T: PduData>(&self, slave: u16, address: Field<T>) -> PduAnswer<T> {
    let target = SlaveAddress::AutoIncremented(slave);
    self.read(target, address).await
}
/// Write `data` at `address` on the slave designated by auto-incremented addressing (see `write`).
pub async fn apwr<T: PduData>(&self, slave: u16, address: Field<T>, data: T) -> PduAnswer<()> {
    let target = SlaveAddress::AutoIncremented(slave);
    self.write(target, address, data).await
}
/// Exchange `data` at `address` on the slave designated by auto-incremented addressing (see `exchange`).
pub async fn aprw<T: PduData>(&self, slave: u16, address: Field<T>, data: T) -> PduAnswer<T> {
    let target = SlaveAddress::AutoIncremented(slave);
    self.exchange(target, address, data).await
}
/// Not implemented yet: awaiting this always panics.
pub async fn armw(&self) {
    todo!()
}
/// Read `address` on the slave designated by its fixed address (see `read`).
pub async fn fprd<T: PduData>(&self, slave: u16, address: Field<T>) -> PduAnswer<T> {
    let target = SlaveAddress::Fixed(slave);
    self.read(target, address).await
}
/// Write `data` at `address` on the slave designated by its fixed address (see `write`).
pub async fn fpwr<T: PduData>(&self, slave: u16, address: Field<T>, data: T) -> PduAnswer<()> {
    let target = SlaveAddress::Fixed(slave);
    self.write(target, address, data).await
}
/// Exchange `data` at `address` on the slave designated by its fixed address (see `exchange`).
pub async fn fprw<T: PduData>(&self, slave: u16, address: Field<T>, data: T) -> PduAnswer<T> {
    let target = SlaveAddress::Fixed(slave);
    self.exchange(target, address, data).await
}
/// Not implemented yet: awaiting this always panics.
pub async fn frmw(&self) {
    todo!()
}
/// Read `address` with logical addressing (see `read`).
pub async fn lrd<T: PduData>(&self, address: Field<T>) -> PduAnswer<T> {
    let target = SlaveAddress::Logical;
    self.read(target, address).await
}
/// Write `data` at `address` with logical addressing (see `write`).
pub async fn lwr<T: PduData>(&self, address: Field<T>, data: T) -> PduAnswer<()> {
    let target = SlaveAddress::Logical;
    self.write(target, address, data).await
}
/// Exchange `data` at `address` with logical addressing (see `exchange`).
pub async fn lrw<T: PduData>(&self, address: Field<T>, data: T) -> PduAnswer<T> {
    let target = SlaveAddress::Logical;
    self.exchange(target, address, data).await
}
/// Read the field `memory` from `slave`.
///
/// The packed buffer is zeroed first, then its first `memory.len` bytes are filled by `read_slice`.
/// The answer carries the still packed bytes together with the number of answers.
pub async fn read<T: PduData>(&self, slave: SlaveAddress, memory: Field<T>) -> PduAnswer<T> {
    let mut packed = T::Packed::uninit();
    packed.as_mut().fill(0);
    let answers = self.read_slice(slave, memory.byte as _, &mut packed.as_mut()[.. memory.len]).await.answers;
    PduAnswer {
        answers,
        data: packed,
    }
}
/// Write `data` into the field `memory` of `slave`.
///
/// Panics if `data` cannot be packed into its packed buffer.
pub async fn write<T: PduData>(&self, slave: SlaveAddress, memory: Field<T>, data: T) -> PduAnswer<()> {
    let mut packed = T::Packed::uninit();
    data.pack(packed.as_mut()).unwrap();
    // only the bytes covered by the field are sent
    let payload = &mut packed.as_mut()[.. memory.len];
    self.write_slice(slave, memory.byte as _, payload).await
}
/// Send `data` to the field `memory` of `slave` and return what came back in the same buffer.
///
/// Panics if `data` cannot be packed into its packed buffer.
pub async fn exchange<T: PduData>(&self, slave: SlaveAddress, memory: Field<T>, data: T) -> PduAnswer<T> {
    let mut packed = T::Packed::uninit();
    data.pack(packed.as_mut()).unwrap();
    let answers = self.exchange_slice(slave, memory.byte as _, &mut packed.as_mut()[.. memory.len]).await.answers;
    PduAnswer {
        answers,
        data: packed,
    }
}
/// Issue a read-multiple-write command (`ARMW` or `FRMW`) for the field `memory`, designating `slave`.
///
/// Panics when `slave` is `Broadcast` or `Logical`, and when `data` cannot be packed.
pub async fn multiple<T: PduData>(&self, slave: SlaveAddress, memory: Field<T>, data: T) -> PduAnswer<T> {
    let command = match slave {
        SlaveAddress::AutoIncremented(_) => PduCommand::ARMW,
        SlaveAddress::Fixed(_) => PduCommand::FRMW,
        SlaveAddress::Broadcast | SlaveAddress::Logical =>
            unimplemented!("read-multiple-write can only be used with a specific slave for reading"),
    };
    let mut packed = T::Packed::uninit();
    data.pack(packed.as_mut()).unwrap();
    // the topic borrows the buffer, so it is released before the buffer is moved into the answer
    let answers = {
        let mut topic = self.topic(command, slave, memory.byte as _, &mut packed.as_mut()[.. memory.len]).await;
        topic.send(None).await;
        topic.wait().await;
        let answer = topic.receive(None);
        answer.answers
    };
    PduAnswer {
        answers,
        data: packed,
    }
}
/// Send a read command for `data.len()` bytes at `memory` and wait for the answer, which is written into `data`.
///
/// The command is chosen from the addressing mode of `slave`.
pub async fn read_slice(&self, slave: SlaveAddress, memory: u32, data: &mut [u8]) -> PduAnswer<()> {
    let command = match slave {
        SlaveAddress::Broadcast => PduCommand::BRD,
        SlaveAddress::AutoIncremented(_) => PduCommand::APRD,
        SlaveAddress::Fixed(_) => PduCommand::FPRD,
        SlaveAddress::Logical => PduCommand::LRD,
    };
    let mut exchange = self.topic(command, slave, memory, data).await;
    exchange.send(None).await;
    exchange.wait().await;
    exchange.receive(None)
}
/// Send a write command carrying `data` to `memory` and wait for the answer.
///
/// The command is chosen from the addressing mode of `slave`. The content received back is
/// copied into `data`.
pub async fn write_slice(&self, slave: SlaveAddress, memory: u32, data: &mut [u8]) -> PduAnswer<()> {
    let command = match slave {
        SlaveAddress::Broadcast => PduCommand::BWR,
        SlaveAddress::AutoIncremented(_) => PduCommand::APWR,
        SlaveAddress::Fixed(_) => PduCommand::FPWR,
        SlaveAddress::Logical => PduCommand::LWR,
    };
    let mut exchange = self.topic(command, slave, memory, data).await;
    exchange.send(None).await;
    exchange.wait().await;
    exchange.receive(None)
}
/// Send a read-write command carrying `data` to `memory` and wait for the answer, which is written into `data`.
///
/// The command is chosen from the addressing mode of `slave`.
pub async fn exchange_slice(&self, slave: SlaveAddress, memory: u32, data: &mut [u8]) -> PduAnswer<()> {
    let command = match slave {
        SlaveAddress::Broadcast => PduCommand::BRW,
        SlaveAddress::AutoIncremented(_) => PduCommand::APRW,
        SlaveAddress::Fixed(_) => PduCommand::FPRW,
        SlaveAddress::Logical => PduCommand::LRW,
    };
    let mut exchange = self.topic(command, slave, memory, data).await;
    exchange.send(None).await;
    exchange.wait().await;
    exchange.receive(None)
}
/// Reserve a token for exchanging `buffer` with the given command and address (see `Topic::new`).
pub async fn topic<'a>(&'a self, command: PduCommand, slave: SlaveAddress, memory: u32, buffer: &'a mut [u8]) -> Topic<'a> {
    let reservation = Topic::new(self, command, slave, memory, buffer);
    reservation.await
}
/// Request the buffered PDUs to be sent without waiting for the merge delay.
///
/// Does nothing when no PDU is buffered.
pub fn flush(&self) {
    let mut state = self.state.lock().unwrap();
    if state.last_end == 0 {
        return;
    }
    state.ready = true;
    self.sendable.notify_one();
}
/// Dispatch the PDUs contained in the content of a received frame to their reception slots.
///
/// For each PDU whose token owns a slot, the PDU content is copied to the slot buffer and the
/// working count plus one is published in the slot `ready` marker. PDUs whose token has no slot
/// are skipped. `received` is notified when the function exits, including on error.
///
/// # Errors
///
/// Returns a protocol error when a header or footer cannot be unpacked, when a token is out of
/// range, when the content is shorter than announced or does not have the size of the slot
/// buffer, or when a PDU is announced but the frame has no data left. Remaining PDUs of the
/// frame are then dropped.
fn pdu_receive(&self, state: &mut MasterState, frame: &[u8]) -> EthercatResult<()> {
    // wake the waiters whatever the exit path is
    let _finisher = Finisher::new(|| self.received.notify_waiters());
    let mut frame = Cursor::new(frame);
    loop {
        let header = frame.unpack::<PduHeader>()
            .map_err(|_| EthercatError::Protocol("unable to unpack PDU header, skiping all remaning PDUs in frame"))?;
        let token = usize::from(header.token());
        if token >= state.receive.len()
            {return Err(EthercatError::Protocol("received inconsistent PDU token"))}

        // the content and footer are consumed even when nobody is waiting for this token,
        // otherwise the next header would be read from the middle of this PDU
        let content = frame.read(usize::from(u16::from(header.len())))
            .map_err(|_| EthercatError::Protocol("PDU size mismatch"))?;
        if let Some(storage) = state.receive[token].as_mut() {
            // the length comes from the wire: refuse it rather than panicking with the state locked
            if storage.data.len() != content.len()
                {return Err(EthercatError::Protocol("received PDU size does not match the requested one"))}
            storage.data.copy_from_slice(content);
        }
        let footer = frame.unpack::<PduFooter>()
            .map_err(|_| EthercatError::Protocol("unable to unpack PDU footer"))?;
        if let Some(storage) = state.receive[token].as_ref() {
            // release ordering publishes the buffer content together with the marker
            storage.ready.store(footer.working_count().saturating_add(1), Release);
        }

        if ! header.next() {break}
        if frame.remain().len() == 0
            {return Err(EthercatError::Protocol("inconsistent ethercat frame size: a following PDU is announced but no data remains"))}
    }
    Ok(())
}
/// Background loop receiving frames from the socket and dispatching their content.
///
/// PDU frames are handed to `pdu_receive` with the state locked. Mailbox and network-variable
/// frames are ignored: the latter are not supported, and a frame coming from the wire must not
/// be able to panic the task.
///
/// # Errors
///
/// Returns when the socket fails, when the frame header cannot be unpacked or announces more
/// data than received, or when `pdu_receive` fails.
async fn task_receive(&self) -> EthercatResult {
    let mut receive = [0; MAX_ETHERCAT_FRAME];
    loop {
        let size = poll_fn(|cx| self.socket.poll_receive(cx, &mut receive) ).await?;
        let mut frame = Cursor::new(&receive[.. size]);
        let header = frame.unpack::<EthercatHeader>()?;
        let length = header.len().value() as usize;
        if frame.remain().len() < length
            {return Err(EthercatError::Protocol("received frame header has inconsistent length"))}
        // bytes after the announced length are not part of the frame content
        let content = &frame.remain()[.. length];
        match header.ty() {
            EthercatType::PDU => self.pdu_receive(
                self.state.lock().unwrap().deref_mut(),
                content,
                )?,
            EthercatType::NetworkVariable | EthercatType::Mailbox => {},
        }
    }
}
/// Background loop sending the buffered frame on the socket.
///
/// Each iteration waits for a PDU to be buffered, then sends the frame either immediately when
/// `state.ready` is already set, or as soon as `pdu_merge_time` has elapsed or `state.ready`
/// gets set, whichever comes first. Once the socket has taken the frame, the buffer is reset
/// and `sent` is notified.
///
/// Returns only on error (timer creation or socket failure); panics if the timer fails.
async fn task_send(&self) -> EthercatResult {
let mut delay = Delay::new(Instant::now())?;
loop {
let delay = &mut delay;
// wait until the buffer is not empty, and remember whether an immediate send is requested
let ready = loop {
self.sendable.notified().await;
let state = self.state.lock().unwrap();
if state.last_end != 0 {break state.ready}
};
let send = {
let mut state = if ready {
self.state.lock().unwrap()
}
else {
// leave time for other PDUs to be appended to the same frame
delay.reset(Instant::now() + self.pdu_merge_time);
(
// the merge delay elapsed
async {
delay.await.unwrap();
self.state.lock().unwrap()
},
// or the frame has been marked ready in the meantime
async { loop {
self.sendable.notified().await;
let state = self.state.lock().unwrap();
if state.ready {break state}
}},
).race().await
};
// from now on `Topic::send` does not append to the buffer and waits for `sent` instead
state.ready = true;
// the frame header announces the size of everything buffered after it
EthercatHeader::new(
u11::new((state.last_end - EthercatHeader::packed_size()) as u16),
EthercatType::PDU,
).pack(&mut state.send).unwrap();
// NOTE(review): this slice outlives the mutex guard; it relies on `state.ready` being true
// so that nothing writes to `state.send` until the reset below — confirm no other writer exists
unsafe {std::slice::from_raw_parts_mut(
state.send.as_mut_ptr(),
state.last_end,
)}
};
poll_fn(|cx| self.socket.poll_send(cx, &send) ).await?;
// reset the buffer to empty: the next PDU is written right after the frame header
{
let mut state = self.state.lock().unwrap();
state.ready = false;
state.last_end = 0;
state.last_start = EthercatHeader::packed_size();
}
// wake the senders that were waiting for room in the buffer
self.sent.notify_waiters();
}
}
/// Background loop reporting the PDUs that stayed unanswered for more than `pdu_timeout`.
///
/// Every `pdu_timeout`, each slot older than `pdu_timeout` that has no answer available is
/// marked as answered with a working count of 0. A slot holding an answer that was received
/// but not yet consumed is left untouched, so a late reader does not lose it. `received` is
/// notified after each check.
///
/// Returns only if the timer cannot be created; panics if the timer fails afterwards.
async fn task_timeout(&self) -> EthercatResult {
    let mut delay = Interval::new_interval(self.pdu_timeout)?;
    loop {
        delay.next().await.unwrap().unwrap();
        let state = self.state.lock().unwrap();
        let date = Instant::now();
        for (token, storage) in state.receive.iter().enumerate() {
            if let Some(storage) = storage {
                // only a slot still waiting (marker 0) can time out;
                // release ordering matches the one used when publishing a real answer
                if date.duration_since(storage.sent) > self.pdu_timeout
                && storage.ready.compare_exchange(0, 1, Release, Relaxed).is_ok() {
                    println!("token {} timeout", token);
                }
            }
        }
        self.received.notify_waiters();
    }
}
/// Body of the background task: runs the receive, send and timeout loops concurrently.
///
/// Completes as soon as one of them completes, and panics if one of them returns an error.
async fn task_loop(&self) {
    let receiving = async { self.task_receive().await.unwrap() };
    let sending = async { self.task_send().await.unwrap() };
    let expiring = async { self.task_timeout().await.unwrap() };
    (receiving, sending, expiring).race().await
}
/// Abort the background task, if any is registered.
pub fn stop(&self) {
    let task = self.task.lock().unwrap();
    if let Some(handle) = task.as_ref() {
        handle.abort();
    }
}
}
/// Reservation of a token and its reception slot on a `RawMaster`, for exchanging one buffer
/// with one command and address.
///
/// The slot and token are released when the topic is dropped.
pub struct Topic<'a> {
/// master the token has been reserved on
master: &'a RawMaster,
/// marker of the reception slot: 0 while no answer is available, otherwise the working count plus one
ready: &'a AtomicU16,
/// index of the reception slot, also sent as the PDU token
token: usize,
/// command put in the header of each PDU sent
command: PduCommand,
/// 32 bit address put in the header of each PDU sent
target: u32,
}
impl<'a> Topic<'a> {
/// Reserve a token and a reception slot on `master` for exchanging `buffer` with `command`
/// at address `memory` of `slave`.
///
/// When no token is free, this waits for `master.received` to be notified and tries again.
/// Answers received for the token are copied into `buffer`.
pub async fn new(master: &'a RawMaster, command: PduCommand, slave: SlaveAddress, memory: u32, buffer: &'a mut [u8]) -> Topic<'a> {
// build the 32 bit address of the PDU header
// NOTE(review): `memory as u16` silently truncates for every mode but `Logical` — confirm callers never pass more
let target = match slave {
SlaveAddress::Broadcast => u32::from(MemoryAddress::new(
0,
memory as u16,
)),
// the slave rank is sent negated (wrapping)
SlaveAddress::AutoIncremented(slave) => u32::from(MemoryAddress::new(
0u16.wrapping_sub(slave),
memory as u16,
)),
SlaveAddress::Fixed(slave) => u32::from(MemoryAddress::new(
slave,
memory as u16,
)),
SlaveAddress::Logical => memory,
};
let (token, ready);
loop {
// the lock is released at the end of this block, before awaiting
{
let mut state = master.state.lock().unwrap();
if state.free.is_empty() {
master.received.notified()
}
else {
token = state.free.pop().unwrap();
state.receive[token] = Some(PduState {
sent: Instant::now(),
// the lifetime of `buffer` is erased here; the slot is cleared by `Drop for Topic`
// NOTE(review): soundness relies on the topic being dropped (not leaked) before `buffer` is released — confirm
data: unsafe {std::slice::from_raw_parts_mut(
buffer.as_mut_ptr(),
buffer.len(),
)},
ready: AtomicU16::new(0),
});
// reference into the slot table, kept beyond the mutex guard; the slot stays in place until the topic is dropped
ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const AtomicU16)};
break
}
}.await;
}
Topic {
master,
ready,
token,
command,
target,
}
}
/// Append a PDU for this topic to the frame being assembled by the master.
///
/// The PDU carries `data` if given, otherwise the current content of the topic buffer.
/// When the frame is already marked ready, or has not enough room left, this waits for the
/// frame to be sent and tries again. The function returns once the PDU is buffered, not
/// once it is sent.
///
/// Panics if the data length does not fit the header length field.
pub async fn send(&mut self, data: Option<&[u8]>) -> &'_ Self {
loop {
// the lock is released at the end of this block, before awaiting
{
let mut state = self.master.state.lock().unwrap();
let data = data.unwrap_or(state.receive[self.token].as_ref().unwrap().data);
// detach the slice from the borrow of `state`, which is mutated below
// NOTE(review): relies on the slot buffer staying valid while the lock is held — confirm
let data = unsafe {std::slice::from_raw_parts(
data.as_ptr(),
data.len(),
)};
// the frame is being sent: wait for the buffer to be reset
if state.ready {
self.master.sent.notified()
}
// not enough room for this PDU: request the current frame to be sent, then wait
else if ! (self.master.socket.max_frame() > state.last_end.max(state.last_start)
+ data.len()
+ PduHeader::packed_size()
+ PduFooter::packed_size()) {
state.ready = true;
self.master.sendable.notify_one();
self.master.sent.notified()
}
else {
// a PDU is already buffered: flag it as followed by another one
if state.last_start <= state.last_end {
let range = state.last_start .. state.last_end;
let place = &mut state.send[range];
let mut header = PduHeader::unpack(place).unwrap();
header.set_next(true);
header.pack(place).unwrap();
}
// empty buffer: start writing right after the frame header
else {
state.last_end = state.last_start;
}
// write header, content and an empty footer, and get the number of bytes written
let advance = {
let range = state.last_end ..;
let mut cursor = Cursor::new(&mut state.send[range]);
cursor.pack(&PduHeader::new(
self.command,
self.token as u8,
self.target,
u11::new(data.len().try_into().unwrap()),
false,
false,
ExternalEvent::default(),
)).unwrap();
cursor.write(data).unwrap();
cursor.pack(&PduFooter::new(0)).unwrap();
cursor.position()
};
state.last_start = state.last_end;
state.last_end = state.last_start + advance;
// the timeout is counted from the moment the PDU is buffered
state.receive[self.token].as_mut().unwrap().sent = Instant::now();
state.buffered = Instant::now();
self.master.sendable.notify_one();
break
}
}.await;
}
self
}
/// Tell whether an answer (or a timeout) is waiting to be consumed by `receive`.
pub fn available(&self) -> bool {
    let marker = self.ready.load(SeqCst);
    marker != 0
}
/// Wait until an answer (or a timeout) is available for this topic.
pub async fn wait(&self) -> &'_ Self {
    loop {
        // the notification is created before checking, so a wake-up between the check and the await is not lost
        let wakeup = self.master.received.notified();
        if self.available() {
            return self;
        }
        wakeup.await;
    }
}
/// Consume the pending answer and return its working count (0 if none was pending or on timeout).
///
/// When `data` is given, the content of the topic buffer is copied into it first, with the
/// master state locked. Panics if `data` does not have the length of the topic buffer.
pub fn receive(&self, data: Option<&mut [u8]>) -> PduAnswer<()> {
    let answers = match data {
        Some(destination) => {
            let state = self.master.state.lock().unwrap();
            let storage = state.receive[self.token].as_ref().unwrap();
            destination.copy_from_slice(storage.data);
            // the marker is reset while the lock is still held
            self.ready.swap(0, Relaxed).saturating_sub(1)
        },
        None => self.ready.swap(0, Relaxed).saturating_sub(1),
    };
    PduAnswer {
        answers,
        data: [],
    }
}
}
/// Releases the reception slot and gives the token back to the master.
impl Drop for Topic<'_> {
    fn drop(&mut self) {
        let mut state = self.master.state.lock().unwrap();
        // the token is returned only if the slot was still occupied
        if state.receive[self.token].take().is_some() {
            state.free.push(self.token).unwrap();
        }
    }
}
/// Addressing mode used to designate the slave(s) a PDU is meant for.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum SlaveAddress {
/// broadcast commands (`BRD`, `BWR`, `BRW`); the slave part of the address is sent as 0
Broadcast,
/// auto-incremented commands (`APRD`, `APWR`, `APRW`, `ARMW`); the given value is sent negated
AutoIncremented(u16),
/// fixed-address commands (`FPRD`, `FPWR`, `FPRW`, `FRMW`); the given value is sent as is
Fixed(u16),
/// logical commands (`LRD`, `LWR`, `LRW`); the whole 32 bit address designates the memory
Logical,
}
/// Result of a PDU exchange: the data still packed, with the number of answers received.
pub struct PduAnswer<T: PduData> {
/// working count received with the PDU, 0 when the PDU timed out
pub answers: u16,
/// packed bytes of the value, unpacked on demand by `value`
pub data: T::Packed,
}
impl<T: PduData> PduAnswer<T> {
    /// Unpack the value, requiring exactly one answer.
    pub fn one(self) -> EthercatResult<T> {
        self.exact(1)
    }

    /// Unpack the value, requiring exactly `n` answers.
    ///
    /// Fails with a protocol error telling whether no, too few or too many answers were
    /// received, or with the unpacking error.
    pub fn exact(&self, n: u16) -> EthercatResult<T> {
        if self.answers == n {
            return Ok(self.value()?);
        }
        let problem = if self.answers == 0 {
            "no slave answered"
        } else if self.answers < n {
            "to few slaves answered"
        } else {
            "to much slaves answered"
        };
        Err(EthercatError::Protocol(problem))
    }

    /// Unpack the value, requiring at least one answer.
    pub fn any(&self) -> EthercatResult<T> {
        match self.answers {
            0 => Err(EthercatError::Protocol("no slave answered")),
            _ => Ok(self.value()?),
        }
    }

    /// Unpack the value without checking the number of answers.
    pub fn value(&self) -> PackingResult<T> {
        T::unpack(self.data.as_ref())
    }
}
/// Header found at the start of every frame sent or received by the master, packed on 16 bits.
#[bitsize(16)]
#[derive(TryFromBits, DebugBits, Copy, Clone)]
struct EthercatHeader {
/// length in bytes of the frame content following this header
len: u11,
reserved: u1,
/// kind of content carried by the frame
ty: EthercatType,
}
// presumably implements the packing traits (`packed_size`, `pack`, `unpack`) through a `u16` — confirm in `data`
data::bilge_pdudata!(EthercatHeader, u16);
/// Kind of content of a frame, as found in `EthercatHeader`.
///
/// Any other value makes the header unpacking fail (`TryFromBits`).
#[bitsize(4)]
#[derive(TryFromBits, Debug, Copy, Clone)]
enum EthercatType {
/// the frame carries PDUs, handled by `pdu_receive`
PDU = 0x1,
/// not handled by this master
NetworkVariable = 0x4,
/// ignored on reception
Mailbox = 0x5,
}
/// Header preceding the content of each PDU in a frame, packed on 80 bits.
#[bitsize(80)]
#[derive(FromBits, DebugBits, Clone, Default)]
struct PduHeader {
/// operation requested and its addressing mode
command: PduCommand,
/// identifier chosen by the master, used to find the reception slot of the answer
token: u8,
/// address of the targeted memory, built by `Topic::new` from the slave address and memory address
address: u32,
/// length in bytes of the PDU content following this header
len: u11,
reserved: u3,
/// always sent as false by this master — NOTE(review): meaning on reception not visible here
circulating: bool,
/// true when another PDU follows this one in the same frame
next: bool,
/// always sent as the default value by this master — NOTE(review): semantics defined in `registers::ExternalEvent`
interrupt: ExternalEvent,
}
// presumably implements the packing traits (`packed_size`, `pack`, `unpack`) through a `u80` — confirm in `data`
data::bilge_pdudata!(PduHeader, u80);
/// 32 bit PDU address, split into a slave part and a memory part, for the non-logical addressing modes.
#[bitsize(32)]
#[derive(FromBits, DebugBits, Copy, Clone, Eq, PartialEq, Hash)]
struct MemoryAddress {
/// slave designation: 0 for broadcast, negated rank for auto-increment, address for fixed (see `Topic::new`)
slave: u16,
/// address in the memory of the slave
memory: u16,
}
/// Footer following the content of each PDU in a frame.
#[bitsize(16)]
#[derive(FromBits, DebugBits, Copy, Clone, Default)]
struct PduFooter {
/// sent as 0 by the master; the value received is reported as `PduAnswer::answers`
working_count: u16,
}
// presumably implements the packing traits (`packed_size`, `pack`, `unpack`) through a `u16` — confirm in `data`
data::bilge_pdudata!(PduFooter, u16);
/// Command byte of a PDU header, selecting the operation and the addressing mode.
///
/// Unknown values unpack to `NOP`, which is also the default.
#[bitsize(8)]
#[derive(FromBits, Debug, Copy, Clone, Default)]
pub enum PduCommand {
/// fallback and default value
#[fallback]
#[default]
NOP = 0x0,
/// read with broadcast addressing
BRD = 0x07,
/// write with broadcast addressing
BWR = 0x08,
/// read-write exchange with broadcast addressing
BRW = 0x09,
/// read with auto-incremented addressing
APRD = 0x01,
/// write with auto-incremented addressing
APWR = 0x02,
/// read-write exchange with auto-incremented addressing
APRW = 0x03,
/// read with fixed addressing
FPRD = 0x04,
/// write with fixed addressing
FPWR = 0x05,
/// read-write exchange with fixed addressing
FPRW = 0x06,
/// read with logical addressing
LRD = 0x0A,
/// write with logical addressing
LWR = 0x0B,
/// read-write exchange with logical addressing
LRW = 0x0C,
/// read-multiple-write with auto-incremented addressing
ARMW = 0x0D,
/// read-multiple-write with fixed addressing
FRMW = 0x0E,
}
/// Guard running a callback when it goes out of scope, whatever the exit path is.
struct Finisher<F: FnOnce()> {
    /// callback still to run, `None` once it has been called
    callback: Option<F>,
}

impl<F: FnOnce()> Finisher<F> {
    /// Arm a guard that will call `callback` when dropped.
    fn new(callback: F) -> Self {
        Self {
            callback: Some(callback),
        }
    }
}

impl<F: FnOnce()> Drop for Finisher<F> {
    fn drop(&mut self) {
        // taking the callback out guarantees it runs at most once
        match self.callback.take() {
            Some(run) => run(),
            None => {}
        }
    }
}