use crate::utils::ffi;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
type CbPdoCyclic = Arc<dyn Fn(u16) + Send + Sync + 'static>;
type CbSlaveStateChange = Arc<dyn Fn(u16, u16, i32, i32) + Send + Sync + 'static>;
type CbEmergency = Arc<dyn Fn(u16, u16, u16, u16, u8, u16, u16) + Send + Sync + 'static>;
type CbSlaveDiscovery = Arc<dyn Fn(u16, u16, bool) + Send + Sync + 'static>;
type CbPdoFrameLoss = Arc<dyn Fn(u16, u8, u32, u32) + Send + Sync + 'static>;
type CbDcSyncLost = Arc<dyn Fn(u16, u16, i32) + Send + Sync + 'static>;
type CbRedundancyChanged = Arc<dyn Fn(u16, i32, i32) + Send + Sync + 'static>;
type CbPreOpReconfig = Arc<dyn Fn(u16, u16) + Send + Sync + 'static>;
type CbSlaveIdentityMismatch = Arc<dyn Fn(SlaveIdentityMismatch) + Send + Sync + 'static>;
type CbSlavePortLinkChanged = Arc<dyn Fn(u16, u16, u8, bool) + Send + Sync + 'static>;
type Table<C> = RwLock<HashMap<u16, Vec<C>>>;
#[derive(Debug, Clone, Copy)]
pub struct SlaveIdentityMismatch {
pub master_index: u16,
pub slave_index: u16,
pub expected_vendor: u32,
pub expected_product: u32,
pub expected_revision: u32,
pub actual_vendor: u32,
pub actual_product: u32,
pub actual_revision: u32,
}
macro_rules! table_static {
($name:ident, $cb_ty:ty) => {
fn $name() -> &'static Table<$cb_ty> {
use std::sync::OnceLock;
static INSTANCE: OnceLock<Table<$cb_ty>> = OnceLock::new();
INSTANCE.get_or_init(|| RwLock::new(HashMap::new()))
}
};
}
table_static!(pdo_cyclic_sync_table, CbPdoCyclic);
table_static!(slave_state_change_table, CbSlaveStateChange);
table_static!(slave_state_change_async_table, CbSlaveStateChange);
table_static!(emergency_table, CbEmergency);
table_static!(slave_discovery_table, CbSlaveDiscovery);
table_static!(slave_discovery_async_table, CbSlaveDiscovery);
table_static!(pdo_frame_loss_table, CbPdoFrameLoss);
table_static!(dc_sync_lost_table, CbDcSyncLost);
table_static!(redundancy_changed_table, CbRedundancyChanged);
table_static!(preop_reconfig_table, CbPreOpReconfig);
table_static!(slave_identity_mismatch_table, CbSlaveIdentityMismatch);
table_static!(slave_port_link_changed_table, CbSlavePortLinkChanged);
fn offline_slaves() -> &'static Mutex<HashMap<u16, HashSet<u16>>> {
use std::sync::OnceLock;
static INSTANCE: OnceLock<Mutex<HashMap<u16, HashSet<u16>>>> = OnceLock::new();
INSTANCE.get_or_init(|| Mutex::new(HashMap::new()))
}
static DISPATCHER_REGISTERED: AtomicBool = AtomicBool::new(false);
pub fn initialize_callbacks() {
if DISPATCHER_REGISTERED
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
unsafe {
ffi::RegisterProcessDataCyclicCallbackSync(on_pdo_cyclic_sync);
ffi::RegisterSlaveStateChangeCallbackSync(on_slave_state_change);
ffi::RegisterSlaveStateChangeCallbackAsync(on_slave_state_change_async);
ffi::RegisterEmergencyEventCallback(on_emergency);
ffi::RegisterSlaveDiscoveryCallbackSync(on_slave_discovery);
ffi::RegisterSlaveDiscoveryCallbackAsync(on_slave_discovery_async);
ffi::RegisterPDOFrameLossCallback(on_pdo_frame_loss);
ffi::SetDCSyncLostCallback(on_dc_sync_lost);
ffi::RegisterRedundancyModeChangedCallback(on_redundancy_changed);
ffi::RegisterSlavePreOpReconfigCallback(on_preop_reconfig);
ffi::RegisterSlaveIdentityMismatchCallback(on_slave_identity_mismatch);
ffi::RegisterSlavePortLinkChangedCallback(on_slave_port_link_changed);
}
}
pub fn shutdown_callbacks() {
if DISPATCHER_REGISTERED
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
unsafe {
ffi::UnregisterProcessDataCyclicCallbackSync(on_pdo_cyclic_sync);
ffi::UnregisterSlaveStateChangeCallbackSync(on_slave_state_change);
ffi::UnregisterSlaveStateChangeCallbackAsync(on_slave_state_change_async);
ffi::UnregisterEmergencyEventCallback(on_emergency);
ffi::UnregisterSlaveDiscoveryCallbackSync(on_slave_discovery);
ffi::UnregisterSlaveDiscoveryCallbackAsync(on_slave_discovery_async);
ffi::UnregisterPDOFrameLossCallback(on_pdo_frame_loss);
ffi::UnsetDCSyncLostCallback(on_dc_sync_lost);
ffi::UnregisterRedundancyModeChangedCallback(on_redundancy_changed);
ffi::UnregisterSlavePreOpReconfigCallback(on_preop_reconfig);
ffi::UnregisterSlaveIdentityMismatchCallback(on_slave_identity_mismatch);
ffi::UnregisterSlavePortLinkChangedCallback(on_slave_port_link_changed);
}
}
fn push_cb<C: Clone>(table: &Table<C>, mi: u16, cb: C) {
if let Ok(mut guard) = table.write() {
guard.entry(mi).or_default().push(cb);
}
}
fn snapshot<C: Clone>(table: &Table<C>, mi: u16) -> Option<Vec<C>> {
let guard = table.read().ok()?;
guard.get(&mi).map(|v| v.clone())
}
use std::panic::{catch_unwind, AssertUnwindSafe};
#[inline]
fn safe_invoke<F: FnOnce()>(name: &'static str, f: F) {
if let Err(_e) = catch_unwind(AssertUnwindSafe(f)) {
eprintln!("[Darra events] callback {} panicked (caught at FFI boundary)", name);
}
}
extern "C" fn on_pdo_cyclic_sync(master_index: u16) {
if let Some(cbs) = snapshot(pdo_cyclic_sync_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("on_pdo_cyclic_sync", || cb(master_index));
}
}
}
extern "C" fn on_slave_state_change(master_index: u16, slave_index: u16, old_state: i32, new_state: i32) {
if let Some(cbs) = snapshot(slave_state_change_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("state_change_dispatcher", || cb(master_index, slave_index, old_state, new_state));
}
}
}
extern "C" fn on_slave_state_change_async(master_index: u16, slave_index: u16, old_state: i32, new_state: i32) {
if let Some(cbs) = snapshot(slave_state_change_async_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("state_change_dispatcher", || cb(master_index, slave_index, old_state, new_state));
}
}
}
extern "C" fn on_emergency(
master_index: u16, slave_index: u16,
error_code: u16, error_reg: u16, b1: u8, w1: u16, w2: u16,
) {
if let Some(cbs) = snapshot(emergency_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("emergency", || cb(master_index, slave_index, error_code, error_reg, b1, w1, w2));
}
}
}
extern "C" fn on_slave_discovery(master_index: u16, slave_index: u16, is_found: i32) {
let found = is_found != 0;
if let Ok(mut map) = offline_slaves().lock() {
let set = map.entry(master_index).or_default();
if found {
set.remove(&slave_index);
} else {
set.insert(slave_index);
}
}
if let Some(cbs) = snapshot(slave_discovery_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("discovery", || cb(master_index, slave_index, found));
}
}
}
extern "C" fn on_slave_discovery_async(master_index: u16, slave_index: u16, is_found: i32) {
let found = is_found != 0;
if let Some(cbs) = snapshot(slave_discovery_async_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("discovery", || cb(master_index, slave_index, found));
}
}
}
extern "C" fn on_pdo_frame_loss(master_index: u16, group: u8, consecutive: u32, total: u32) {
if let Some(cbs) = snapshot(pdo_frame_loss_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("pdo_frame_loss", || cb(master_index, group, consecutive, total));
}
}
}
extern "C" fn on_dc_sync_lost(master_index: u16, slave_index: u16, diff_ns: i32) {
if let Some(cbs) = snapshot(dc_sync_lost_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("dc_sync_lost", || cb(master_index, slave_index, diff_ns));
}
}
}
extern "C" fn on_redundancy_changed(master_index: u16, old_mode: i32, new_mode: i32) {
if let Some(cbs) = snapshot(redundancy_changed_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("redundancy_changed", || cb(master_index, old_mode, new_mode));
}
}
}
extern "C" fn on_preop_reconfig(master_index: u16, slave_index: u16) {
if let Some(cbs) = snapshot(preop_reconfig_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("preop_reconfig", || cb(master_index, slave_index));
}
}
}
extern "C" fn on_slave_identity_mismatch(
master_index: u16,
slave_index: u16,
expected_vendor: u32,
expected_product: u32,
expected_revision: u32,
actual_vendor: u32,
actual_product: u32,
actual_revision: u32,
) {
let args = SlaveIdentityMismatch {
master_index,
slave_index,
expected_vendor,
expected_product,
expected_revision,
actual_vendor,
actual_product,
actual_revision,
};
if let Some(cbs) = snapshot(slave_identity_mismatch_table(), master_index) {
for cb in cbs.iter() {
safe_invoke("identity_mismatch", || cb(args));
}
}
}
extern "C" fn on_slave_port_link_changed(master_index: u16, slave_index: u16, port: u8, is_up: i32) {
let up = is_up != 0;
if let Some(cbs) = snapshot(slave_port_link_changed_table(), master_index) {
for cb in cbs.iter() {
cb(master_index, slave_index, port, up);
}
}
}
pub(crate) fn clear_master_callbacks(mi: u16) {
macro_rules! clear {
($t:expr) => {
if let Ok(mut g) = $t.write() {
g.remove(&mi);
}
};
}
clear!(pdo_cyclic_sync_table());
clear!(slave_state_change_table());
clear!(slave_state_change_async_table());
clear!(emergency_table());
clear!(slave_discovery_table());
clear!(slave_discovery_async_table());
clear!(pdo_frame_loss_table());
clear!(dc_sync_lost_table());
clear!(redundancy_changed_table());
clear!(preop_reconfig_table());
clear!(slave_identity_mismatch_table());
clear!(slave_port_link_changed_table());
if let Ok(mut map) = offline_slaves().lock() {
map.remove(&mi);
}
}
pub struct MasterEvents {
master_index: u16,
}
impl MasterEvents {
pub(crate) fn new(master_index: u16) -> Self {
initialize_callbacks();
Self { master_index }
}
pub fn on_pdo_cyclic_sync<F>(&self, callback: F)
where
F: Fn(u16) + Send + Sync + 'static,
{
push_cb(pdo_cyclic_sync_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_state_changed<F>(&self, callback: F)
where
F: Fn(u16, u16, i32, i32) + Send + Sync + 'static,
{
push_cb(slave_state_change_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_state_changed_async<F>(&self, callback: F)
where
F: Fn(u16, u16, i32, i32) + Send + Sync + 'static,
{
push_cb(slave_state_change_async_table(), self.master_index, Arc::new(callback));
}
pub fn on_emergency<F>(&self, callback: F)
where
F: Fn(u16, u16, u16, u16, u8, u16, u16) + Send + Sync + 'static,
{
push_cb(emergency_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_offline<F>(&self, callback: F)
where
F: Fn(u16) + Send + Sync + 'static,
{
let cb: CbSlaveDiscovery = Arc::new(move |_master: u16, slave: u16, found: bool| {
if !found {
callback(slave);
}
});
push_cb(slave_discovery_table(), self.master_index, cb);
}
pub fn on_slave_online<F>(&self, callback: F)
where
F: Fn(u16) + Send + Sync + 'static,
{
let cb: CbSlaveDiscovery = Arc::new(move |_master: u16, slave: u16, found: bool| {
if found {
callback(slave);
}
});
push_cb(slave_discovery_table(), self.master_index, cb);
}
pub fn on_slave_discovery<F>(&self, callback: F)
where
F: Fn(u16, u16, bool) + Send + Sync + 'static,
{
push_cb(slave_discovery_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_discovery_async<F>(&self, callback: F)
where
F: Fn(u16, u16, bool) + Send + Sync + 'static,
{
push_cb(slave_discovery_async_table(), self.master_index, Arc::new(callback));
}
pub fn on_pdo_frame_loss<F>(&self, callback: F)
where
F: Fn(u16, u8, u32, u32) + Send + Sync + 'static,
{
push_cb(pdo_frame_loss_table(), self.master_index, Arc::new(callback));
}
pub fn on_dc_sync_lost<F>(&self, callback: F)
where
F: Fn(u16, u16, i32) + Send + Sync + 'static,
{
push_cb(dc_sync_lost_table(), self.master_index, Arc::new(callback));
}
pub fn on_redundancy_mode_changed<F>(&self, callback: F)
where
F: Fn(u16, i32, i32) + Send + Sync + 'static,
{
push_cb(redundancy_changed_table(), self.master_index, Arc::new(callback));
}
pub fn on_preop_reconfig<F>(&self, callback: F)
where
F: Fn(u16, u16) + Send + Sync + 'static,
{
push_cb(preop_reconfig_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_identity_mismatch<F>(&self, callback: F)
where
F: Fn(SlaveIdentityMismatch) + Send + Sync + 'static,
{
push_cb(slave_identity_mismatch_table(), self.master_index, Arc::new(callback));
}
pub fn on_slave_port_link_changed<F>(&self, callback: F)
where
F: Fn(u16, u16, u8, bool) + Send + Sync + 'static,
{
push_cb(slave_port_link_changed_table(), self.master_index, Arc::new(callback));
}
pub fn clear_all(&self) {
clear_master_callbacks(self.master_index);
}
pub fn is_slave_offline(&self, slave_index: u16) -> bool {
if let Ok(map) = offline_slaves().lock() {
map.get(&self.master_index)
.map(|s| s.contains(&slave_index))
.unwrap_or(false)
} else {
false
}
}
pub fn offline_slaves(&self) -> Vec<u16> {
if let Ok(map) = offline_slaves().lock() {
map.get(&self.master_index)
.map(|s| s.iter().copied().collect())
.unwrap_or_default()
} else {
Vec::new()
}
}
pub fn offline_slave_count(&self) -> usize {
if let Ok(map) = offline_slaves().lock() {
map.get(&self.master_index).map(|s| s.len()).unwrap_or(0)
} else {
0
}
}
}
pub struct SlaveEvents {
master_index: u16,
slave_index: u16,
}
impl SlaveEvents {
pub(crate) fn new(master_index: u16, slave_index: u16) -> Self {
initialize_callbacks();
Self { master_index, slave_index }
}
pub fn on_state_changed<F>(&self, callback: F)
where
F: Fn(i32, i32) + Send + Sync + 'static,
{
let target_slave = self.slave_index;
let cb: CbSlaveStateChange = Arc::new(move |_master, slave, old_state, new_state| {
if slave == target_slave {
callback(old_state, new_state);
}
});
push_cb(slave_state_change_table(), self.master_index, cb);
}
pub fn on_emergency<F>(&self, callback: F)
where
F: Fn(u16, u16, u8, u16, u16) + Send + Sync + 'static,
{
let target_slave = self.slave_index;
let cb: CbEmergency = Arc::new(move |_master, slave, ec, er, b1, w1, w2| {
if slave == target_slave {
callback(ec, er, b1, w1, w2);
}
});
push_cb(emergency_table(), self.master_index, cb);
}
pub fn on_offline<F>(&self, callback: F)
where
F: Fn() + Send + Sync + 'static,
{
let target_slave = self.slave_index;
let cb: CbSlaveDiscovery = Arc::new(move |_master, slave, found| {
if slave == target_slave && !found {
callback();
}
});
push_cb(slave_discovery_table(), self.master_index, cb);
}
pub fn on_online<F>(&self, callback: F)
where
F: Fn() + Send + Sync + 'static,
{
let target_slave = self.slave_index;
let cb: CbSlaveDiscovery = Arc::new(move |_master, slave, found| {
if slave == target_slave && found {
callback();
}
});
push_cb(slave_discovery_table(), self.master_index, cb);
}
pub fn on_dc_sync_lost<F>(&self, callback: F)
where
F: Fn(i32) + Send + Sync + 'static,
{
let target_slave = self.slave_index;
let cb: CbDcSyncLost = Arc::new(move |_master, slave, diff_ns| {
if slave == target_slave {
callback(diff_ns);
}
});
push_cb(dc_sync_lost_table(), self.master_index, cb);
}
#[deprecated(since = "1.99.6", note = "noop in current architecture; use MasterEvents::clear_all() to clear the whole master bucket")]
pub fn clear_all(&self) {
eprintln!(
"SlaveEvents::clear_all is a noop (deprecated). Use MasterEvents::clear_all() to clear master_index={}.",
self.master_index
);
}
}