darra-ethercat-master 2.0.5

商业 EtherCAT 主站协议栈 · 实时内核驱动 · 抖动 1µs · Windows + Linux · 多编程语言 · 全协议 · 支持复杂拓扑 + 热插拔 · ethercat.darra.xyz · Commercial EtherCAT Master protocol stack · Real-time kernel driver · 1µs jitter · Multi-platform · Multi-language · Complex topology + hot-plug.
//! 异步事件流 (tokio, 仅 `async-tokio` feature)
//!
//! 把 [`crate::sugar::state_stream::MasterStateStream`] 包装成实现
//! [`futures_core::Stream`] 的等价物, 让用户在 tokio 应用里用
//! `while let Some(ev) = stream.next().await` 消费事件.
//!
//! 由于本 crate 不强依赖 `futures` (`tokio` 默认的 stream 通过
//! `tokio_stream::Stream` 暴露), 这里采用最小依赖路径: 用 tokio 的
//! `tokio::sync::mpsc::UnboundedReceiver` 重新走一遍, 让 `.recv().await`
//! 直接成立.
//!
//! ## 设计折衷
//!
//! - 不引入 `futures` / `tokio-stream` 额外依赖, 避免 SDK 体积膨胀.
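//! - Provides `recv().await` (tokio's native API) plus `into_inner()`, which returns the raw
//!   `UnboundedReceiver`; users who need the `Stream` trait can wrap it themselves with
//!   `tokio_stream::wrappers::UnboundedReceiverStream::new(rx)`.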
//!
//! # 示例 (需 `--features async-tokio`)
//!
//! ```ignore
//! use darra_ethercat::sugar::async_stream::*;
//! use darra_ethercat::EtherCATMaster;
//!
//! #[tokio::main]
//! async fn main() {
//!     let m = EtherCATMaster::new().unwrap();
//!     let mut stream = m.state_stream_async();
//!     while let Some(ev) = stream.recv().await {
//!         println!("从站 {}: {} -> {}", ev.slave, ev.old_state, ev.new_state);
//!     }
//! }
//! ```

use crate::master::core::EtherCATMaster;
use crate::sugar::state_stream::{SlaveStateChangeEvent, EmergencyEvent};
use std::sync::Mutex;
use tokio::sync::mpsc;

/// Asynchronous stream of slave state-change events (thin wrapper around a
/// tokio unbounded mpsc receiver).
pub struct MasterStateStreamAsync {
    // Receiving half of the channel; each message is one `SlaveStateChangeEvent`.
    rx: mpsc::UnboundedReceiver<SlaveStateChangeEvent>,
}

impl MasterStateStreamAsync {
    /// Waits asynchronously for the next state-change event.
    ///
    /// Resolves to `None` once the underlying channel has been closed.
    pub async fn recv(&mut self) -> Option<SlaveStateChangeEvent> {
        let Self { rx } = self;
        rx.recv().await
    }

    /// Consumes the wrapper and returns the raw [`mpsc::UnboundedReceiver`].
    ///
    /// Intended for advanced use, e.g. adapting it to a `Stream` with
    /// `tokio_stream::wrappers::UnboundedReceiverStream::new(rx)`.
    pub fn into_inner(self) -> mpsc::UnboundedReceiver<SlaveStateChangeEvent> {
        let Self { rx } = self;
        rx
    }
}

/// Asynchronous stream of emergency events (thin wrapper around a tokio
/// unbounded mpsc receiver).
pub struct EmergencyStreamAsync {
    // Receiving half of the channel; each message is one `EmergencyEvent`.
    rx: mpsc::UnboundedReceiver<EmergencyEvent>,
}

impl EmergencyStreamAsync {
    /// Waits asynchronously for the next emergency event.
    ///
    /// Resolves to whatever the underlying receiver's `recv()` yields.
    pub async fn recv(&mut self) -> Option<EmergencyEvent> {
        let Self { rx } = self;
        rx.recv().await
    }

    /// Consumes the wrapper and returns the raw [`mpsc::UnboundedReceiver`].
    pub fn into_inner(self) -> mpsc::UnboundedReceiver<EmergencyEvent> {
        let Self { rx } = self;
        rx
    }
}

/// Extension trait providing the async event-stream entry points on a master.
pub trait MasterStreamAsyncExt {
    /// Returns an async stream that yields slave state-change events.
    fn state_stream_async(&self) -> MasterStateStreamAsync;
    /// Returns an async stream that yields emergency events.
    fn emergency_stream_async(&self) -> EmergencyStreamAsync;
}

impl MasterStreamAsyncExt for EtherCATMaster {
    /// Creates a fresh unbounded channel, hands a forwarding closure to
    /// `events().on_slave_state_changed_async`, and returns the receiving half
    /// wrapped in [`MasterStateStreamAsync`].
    fn state_stream_async(&self) -> MasterStateStreamAsync {
        let (sender, rx) = mpsc::unbounded_channel::<SlaveStateChangeEvent>();
        // NOTE(review): tokio's `UnboundedSender` is presumably already shareable
        // across threads, so this `Mutex` may be redundant — confirm against the
        // callback's trait bounds before removing it.
        let shared_sender = Mutex::new(sender);
        self.events().on_slave_state_changed_async(move |master, slave, old_state, new_state| {
            // If the lock cannot be acquired the event is silently dropped.
            if let Ok(locked) = shared_sender.lock() {
                // Best effort: a failed send is deliberately ignored.
                let _ = locked.send(SlaveStateChangeEvent {
                    master,
                    slave,
                    old_state,
                    new_state,
                });
            }
        });
        MasterStateStreamAsync { rx }
    }

    /// Creates a fresh unbounded channel, hands a forwarding closure to
    /// `events().on_emergency`, and returns the receiving half wrapped in
    /// [`EmergencyStreamAsync`].
    fn emergency_stream_async(&self) -> EmergencyStreamAsync {
        let (sender, rx) = mpsc::unbounded_channel::<EmergencyEvent>();
        // NOTE(review): same as above — the `Mutex` may be unnecessary; confirm.
        let shared_sender = Mutex::new(sender);
        self.events().on_emergency(move |master, slave, error_code, error_register, b1, w1, w2| {
            // If the lock cannot be acquired the event is silently dropped.
            if let Ok(locked) = shared_sender.lock() {
                // Best effort: a failed send is deliberately ignored.
                let _ = locked.send(EmergencyEvent {
                    master,
                    slave,
                    error_code,
                    error_register,
                    b1,
                    w1,
                    w2,
                });
            }
        });
        EmergencyStreamAsync { rx }
    }
}