darra-ethercat-master 2.0.5

商业 EtherCAT 主站协议栈 · 实时内核驱动 · 抖动 1µs · Windows + Linux · 多编程语言 · 全协议 · 支持复杂拓扑 + 热插拔 · ethercat.darra.xyz · Commercial EtherCAT Master protocol stack · Real-time kernel driver · 1µs jitter · Multi-platform · Multi-language · Complex topology + hot-plug.
//! 状态变化通道语法糖 (`mpsc::channel`)
//!
//! SDK 原生事件接口是回调式 (`events.on_slave_state_changed(|...|)`),
//! 在多线程消费场景下需要自己 `Arc<Mutex<...>>` 共享数据. 本模块把回调
//! 包装成标准库的 `std::sync::mpsc::Receiver`, 让用户用 `for ev in rx`
//! 这样的 Rust 惯用法消费事件流.
//!
//! ## 语义
//!
//! - 通道有界: 默认 1024 条事件; 满了**丢弃最老的** (滑动窗口语义),
//!   保证回调线程不会被慢消费者阻塞 (实时线程必须快速返回).
//! - 通道在 `MasterStateStream` 被 drop 时自动断开; 内部回调仍然挂在
//!   全局列表上 (SDK 限制), 但由于 `Sender` drop 后再发就拿不到 `Send::send`
//!   的接收者, 不会泄漏内存 — 只会有一次 `try_send` 失败.
//! - 多次调用 `state_stream()` 返回独立的 channel, 各自接收一份事件.
//!
//! # 示例
//!
//! ```no_run
//! use darra_ethercat::sugar::prelude::*;
//! use darra_ethercat::EtherCATMaster;
//!
//! let m = EtherCATMaster::new().unwrap();
//! let rx = m.state_stream();
//!
//! std::thread::spawn(move || {
//!     for ev in rx {
//!         println!("从站 {}: {} -> {}", ev.slave, ev.old_state, ev.new_state);
//!     }
//! });
//! ```

use crate::master::core::EtherCATMaster;
use std::sync::mpsc::{channel, sync_channel, Receiver, TryRecvError};
use std::sync::Mutex;

/// 状态变化事件
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SlaveStateChangeEvent {
    /// 主站索引
    pub master: u16,
    /// 从站索引 (1-based)
    pub slave: u16,
    /// 旧状态原始字节 (用 [`crate::EcState::from_raw`] 转枚举)
    pub old_state: i32,
    /// 新状态原始字节
    pub new_state: i32,
}

/// 紧急事件 (EMCY)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmergencyEvent {
    pub master: u16,
    pub slave: u16,
    pub error_code: u16,
    pub error_register: u16,
    pub b1: u8,
    pub w1: u16,
    pub w2: u16,
}

/// 主站状态流 (Receiver 包装)
///
/// 实现 `IntoIterator`, 可以直接 `for ev in stream`.
pub struct MasterStateStream {
    rx: Receiver<SlaveStateChangeEvent>,
}

impl MasterStateStream {
    /// 阻塞接收一条事件
    pub fn recv(&self) -> Result<SlaveStateChangeEvent, std::sync::mpsc::RecvError> {
        self.rx.recv()
    }

    /// 非阻塞尝试接收
    pub fn try_recv(&self) -> Result<SlaveStateChangeEvent, TryRecvError> {
        self.rx.try_recv()
    }

    /// 取出底层 [`Receiver`] (高级用法)
    pub fn into_inner(self) -> Receiver<SlaveStateChangeEvent> {
        self.rx
    }
}

impl IntoIterator for MasterStateStream {
    type Item = SlaveStateChangeEvent;
    type IntoIter = std::sync::mpsc::IntoIter<SlaveStateChangeEvent>;

    fn into_iter(self) -> Self::IntoIter {
        self.rx.into_iter()
    }
}

/// 主站紧急事件流
pub struct EmergencyStream {
    rx: Receiver<EmergencyEvent>,
}

impl EmergencyStream {
    pub fn recv(&self) -> Result<EmergencyEvent, std::sync::mpsc::RecvError> {
        self.rx.recv()
    }
    pub fn try_recv(&self) -> Result<EmergencyEvent, TryRecvError> {
        self.rx.try_recv()
    }
    pub fn into_inner(self) -> Receiver<EmergencyEvent> {
        self.rx
    }
}

impl IntoIterator for EmergencyStream {
    type Item = EmergencyEvent;
    type IntoIter = std::sync::mpsc::IntoIter<EmergencyEvent>;

    fn into_iter(self) -> Self::IntoIter {
        self.rx.into_iter()
    }
}

// ============================================================
// Master 端入口 trait
// ============================================================

/// 在 [`EtherCATMaster`] 上挂状态流 / 紧急流的 channel 入口.
pub trait MasterStreamExt {
    /// 创建从站状态变化事件流 (mpsc, 容量 1024, 默认覆盖最老)
    fn state_stream(&self) -> MasterStateStream;

    /// 创建从站状态变化事件流 (指定容量)
    fn state_stream_with_capacity(&self, cap: usize) -> MasterStateStream;

    /// 创建紧急事件流 (EMCY)
    fn emergency_stream(&self) -> EmergencyStream;
}

impl MasterStreamExt for EtherCATMaster {
    fn state_stream(&self) -> MasterStateStream {
        self.state_stream_with_capacity(1024)
    }

    fn state_stream_with_capacity(&self, cap: usize) -> MasterStateStream {
        let (tx, rx) = sync_channel::<SlaveStateChangeEvent>(cap);
        let tx = Mutex::new(tx);
        // 用 async 版本回调避免阻塞实时线程
        self.events().on_slave_state_changed_async(move |master, slave, old_state, new_state| {
            let ev = SlaveStateChangeEvent { master, slave, old_state, new_state };
            // 满时使用 try_send 丢最新; 这里语义上 mpsc::sync_channel 的 try_send
            // 在满时返回 Full, 我们直接丢弃 (不阻塞实时线程)
            if let Ok(guard) = tx.lock() {
                let _ = guard.try_send(ev);
            }
        });
        MasterStateStream { rx }
    }

    fn emergency_stream(&self) -> EmergencyStream {
        // EMCY 量级远低于 state, 用 unbounded channel 即可
        let (tx, rx) = channel::<EmergencyEvent>();
        let tx = Mutex::new(tx);
        self.events().on_emergency(move |master, slave, error_code, error_register, b1, w1, w2| {
            let ev = EmergencyEvent {
                master, slave, error_code, error_register, b1, w1, w2,
            };
            if let Ok(guard) = tx.lock() {
                let _ = guard.send(ev);
            }
        });
        EmergencyStream { rx }
    }
}