Skip to main content

piper_driver/
hooks.rs

1//! 钩子系统(Hook System)
2//!
3//! 本模块提供运行时钩子(Hook)管理功能,用于在 CAN 帧接收/发送时触发自定义回调。
4//!
5//! # 设计原则(v1.2.1)
6//!
7//! - **非阻塞**: 所有回调必须在 <1μs 内完成,使用 Channel 异步处理
8//! - **职责分离**: HookManager 管理运行时回调,PipelineConfig 保持为 POD 数据
9//! - **类型安全**: 使用 `dyn FrameCallback` trait object,支持多种回调类型
10//!
11//! # 使用示例
12//!
13//! ```rust
14//! use piper_driver::hooks::{HookManager, FrameCallback};
15//! use piper_driver::recording::AsyncRecordingHook;
16//! use piper_protocol::PiperFrame;
17//! use std::sync::Arc;
18//!
19//! // 创建钩子管理器
20//! let mut hooks = HookManager::new();
21//!
22//! // 添加录制回调
23//! let (hook, _rx) = AsyncRecordingHook::new();
24//! let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
25//! hooks.add_callback(callback);
26//!
27//! // 触发所有回调(在 rx_loop 中)
28//! let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
29//! hooks.trigger_all(&frame);
30//! ```
31
32use piper_protocol::PiperFrame;
33use std::sync::Arc;
34
35/// 帧回调 Trait
36///
37/// 定义 CAN 帧回调接口,用于在接收到帧时执行自定义逻辑。
38///
39/// # 性能要求
40///
41/// - **非阻塞**: 实现必须在 <1μs 内完成
42/// - **无锁**: 禁止使用 Mutex、I/O、分配等阻塞操作
43/// - **Channel 模式**: 推荐使用 `crossbeam::channel::Sender` 异步处理
44///
45/// # 示例
46///
47/// ```rust
48/// use piper_driver::hooks::FrameCallback;
49/// use piper_protocol::PiperFrame;
50/// use crossbeam_channel::{Sender, bounded};
51///
52/// struct MyCallback {
53///     sender: Sender<PiperFrame>,
54/// }
55///
56/// impl FrameCallback for MyCallback {
57///     fn on_frame_received(&self, frame: &PiperFrame) {
58///         // ✅ 使用 try_send,非阻塞
59///         let _ = self.sender.try_send(*frame);
60///     }
61/// }
62/// ```
63pub trait FrameCallback: Send + Sync {
64    /// 当接收到 CAN 帧时调用
65    ///
66    /// # 性能要求
67    ///
68    /// - 必须在 <1μs 内完成
69    /// - 禁止阻塞操作(Mutex、I/O、分配)
70    /// - 推荐使用 `try_send` 而非 `send`
71    ///
72    /// # 参数
73    ///
74    /// - `frame`: 接收到的 CAN 帧
75    fn on_frame_received(&self, frame: &PiperFrame);
76
77    /// 当发送 CAN 帧成功后调用(可选)
78    ///
79    /// # 时机
80    ///
81    /// 仅在 `tx.send()` 成功后触发,确保记录的是实际到达总线的帧。
82    /// 避免记录"幽灵帧"(发送失败的帧)。
83    ///
84    /// # 默认实现
85    ///
86    /// 默认为空操作,仅需在需要录制 TX 帧时实现。
87    fn on_frame_sent(&self, frame: &PiperFrame) {
88        let _ = frame;
89        // 默认:不处理 TX 帧
90    }
91}
92
93/// 钩子管理器
94///
95/// 专门管理运行时回调列表。
96///
97/// # 设计理由(v1.2.1)
98///
99/// - **Config vs Context 分离**:
100///   - `PipelineConfig` 应该是 POD(Plain Old Data),用于序列化
101///   - `PiperContext` 管理运行时状态和动态组件(如回调)
102///
103/// # 线程安全
104///
105/// 使用 `std::sync::Arc` 确保回调可以跨线程共享。
106/// 回调列表本身不是线程安全的,需要外部同步(通常通过 `RwLock<HookManager>`)。
107///
108/// # 示例
109///
110/// ```rust
111/// use piper_driver::hooks::{HookManager, FrameCallback};
112/// use piper_driver::recording::AsyncRecordingHook;
113/// use piper_protocol::PiperFrame;
114/// use std::sync::{Arc, RwLock};
115///
116/// // 在 PiperContext 中
117/// pub struct PiperContext {
118///     pub hooks: RwLock<HookManager>,
119/// }
120///
121/// // 创建上下文并添加回调
122/// let context = PiperContext { hooks: RwLock::new(HookManager::new()) };
123/// let (hook, _rx) = AsyncRecordingHook::new();
124/// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
125/// if let Ok(mut hooks) = context.hooks.write() {
126///     hooks.add_callback(callback);
127/// }
128///
129/// // 触发回调(在 rx_loop 中)
130/// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
131/// if let Ok(hooks) = context.hooks.read() {
132///     hooks.trigger_all(&frame);
133/// }
134/// ```
135#[derive(Default)]
136pub struct HookManager {
137    /// 回调列表
138    callbacks: Vec<Arc<dyn FrameCallback>>,
139}
140
141impl HookManager {
142    /// 创建新的钩子管理器
143    #[must_use]
144    pub const fn new() -> Self {
145        Self {
146            callbacks: Vec::new(),
147        }
148    }
149
150    /// 添加回调
151    ///
152    /// # 线程安全
153    ///
154    /// 此方法不是线程安全的,需要外部同步(通常通过 `RwLock`)。
155    ///
156    /// # 参数
157    ///
158    /// - `callback`: 要添加的回调(必须实现 `FrameCallback`)
159    ///
160    /// # 示例
161    ///
162    /// ```rust
163    /// use piper_driver::hooks::{HookManager, FrameCallback};
164    /// use piper_driver::recording::AsyncRecordingHook;
165    /// use std::sync::Arc;
166    ///
167    /// let mut hooks = HookManager::new();
168    /// let (hook, _rx) = AsyncRecordingHook::new();
169    /// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
170    /// hooks.add_callback(callback);
171    /// ```
172    pub fn add_callback(&mut self, callback: Arc<dyn FrameCallback>) {
173        self.callbacks.push(callback);
174    }
175
176    /// 移除所有回调
177    ///
178    /// # 用途
179    ///
180    /// 主要用于测试或清理场景。
181    pub fn clear(&mut self) {
182        self.callbacks.clear();
183    }
184
185    /// 触发所有回调(在 rx_loop 中调用)
186    ///
187    /// # 性能要求
188    ///
189    /// - 总耗时 <1μs(假设每个回调 <100ns)
190    /// - 非阻塞:所有回调必须使用 `try_send` 而非 `send`
191    ///
192    /// # 参数
193    ///
194    /// - `frame`: 接收到的 CAN 帧
195    ///
196    /// # 示例
197    ///
198    /// ```rust
199    /// use piper_driver::hooks::HookManager;
200    /// use piper_driver::recording::AsyncRecordingHook;
201    /// use piper_protocol::PiperFrame;
202    /// use std::sync::Arc;
203    ///
204    /// let mut hooks = HookManager::new();
205    /// let (hook, _rx) = AsyncRecordingHook::new();
206    /// hooks.add_callback(Arc::new(hook));
207    ///
208    /// // 在 rx_loop 中触发
209    /// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
210    /// hooks.trigger_all(&frame);
211    /// ```
212    pub fn trigger_all(&self, frame: &PiperFrame) {
213        for callback in self.callbacks.iter() {
214            callback.on_frame_received(frame);
215            // ^^^^ 使用 try_send,<1μs,非阻塞
216        }
217    }
218
219    /// 触发所有 TX 回调(在 tx_loop 发送成功后调用)
220    ///
221    /// # 时机
222    ///
223    /// 仅在 `tx.send()` 成功后调用,确保录制的是实际发送的帧。
224    ///
225    /// # 参数
226    ///
227    /// - `frame`: 成功发送的 CAN 帧
228    ///
229    /// # 示例
230    ///
231    /// ```rust
232    /// use piper_driver::hooks::HookManager;
233    /// use piper_protocol::PiperFrame;
234    ///
235    /// let hooks = HookManager::new();
236    ///
237    /// // 在 tx_loop 中,发送成功后触发回调
238    /// let frame = PiperFrame::new_standard(0x123, &[1, 2, 3, 4]);
239    /// // 假设 tx.send(&frame) 返回 Ok(())
240    /// hooks.trigger_all_sent(&frame);
241    /// ```
242    pub fn trigger_all_sent(&self, frame: &PiperFrame) {
243        for callback in self.callbacks.iter() {
244            callback.on_frame_sent(frame);
245        }
246    }
247
248    /// 获取回调数量
249    ///
250    /// # 用途
251    ///
252    /// 主要用于调试和监控。
253    #[must_use]
254    pub fn len(&self) -> usize {
255        self.callbacks.len()
256    }
257
258    /// 检查是否为空
259    #[must_use]
260    pub fn is_empty(&self) -> bool {
261        self.callbacks.is_empty()
262    }
263}
264
265#[cfg(test)]
266mod tests {
267    use super::*;
268    use crossbeam_channel::{Sender, bounded};
269    use std::sync::atomic::{AtomicU64, Ordering};
270
271    #[derive(Debug)]
272    struct TestCallback {
273        tx: Sender<PiperFrame>,
274        count: Arc<AtomicU64>,
275    }
276
277    impl FrameCallback for TestCallback {
278        fn on_frame_received(&self, frame: &PiperFrame) {
279            let _ = self.tx.try_send(*frame);
280            self.count.fetch_add(1, Ordering::Relaxed);
281        }
282
283        fn on_frame_sent(&self, frame: &PiperFrame) {
284            let _ = self.tx.try_send(*frame);
285            self.count.fetch_add(1, Ordering::Relaxed);
286        }
287    }
288
289    #[test]
290    fn test_hook_manager_add_callback() {
291        let mut hooks = HookManager::new();
292        assert!(hooks.is_empty());
293
294        let (tx, _rx) = bounded(10);
295        let count = Arc::new(AtomicU64::new(0));
296        let callback = Arc::new(TestCallback { tx, count });
297
298        hooks.add_callback(callback);
299        assert_eq!(hooks.len(), 1);
300    }
301
302    #[test]
303    fn test_hook_manager_trigger_all() {
304        let mut hooks = HookManager::new();
305
306        let (tx, rx) = bounded::<PiperFrame>(10);
307        let count = Arc::new(AtomicU64::new(0));
308        let callback = Arc::new(TestCallback {
309            tx,
310            count: count.clone(),
311        });
312
313        hooks.add_callback(callback);
314
315        // 创建测试帧
316        let frame = PiperFrame {
317            id: 0x2A5,
318            data: [0, 1, 2, 3, 4, 5, 6, 7],
319            len: 8,
320            is_extended: false,
321            timestamp_us: 12345,
322        };
323
324        // 触发回调
325        hooks.trigger_all(&frame);
326
327        // 验证
328        assert_eq!(count.load(Ordering::Relaxed), 1);
329        assert!(rx.try_recv().is_ok());
330    }
331
332    #[test]
333    fn test_hook_manager_trigger_sent() {
334        let mut hooks = HookManager::new();
335
336        let (tx, rx) = bounded::<PiperFrame>(10);
337        let count = Arc::new(AtomicU64::new(0));
338        let callback = Arc::new(TestCallback {
339            tx,
340            count: count.clone(),
341        });
342
343        hooks.add_callback(callback);
344
345        // 创建测试帧
346        let frame = PiperFrame {
347            id: 0x1A1,
348            data: [0, 1, 2, 3, 4, 5, 6, 7],
349            len: 8,
350            is_extended: false,
351            timestamp_us: 12345,
352        };
353
354        // 触发 TX 回调
355        hooks.trigger_all_sent(&frame);
356
357        // 验证
358        assert_eq!(count.load(Ordering::Relaxed), 1);
359        assert!(rx.try_recv().is_ok());
360    }
361
362    #[test]
363    fn test_hook_manager_clear() {
364        let mut hooks = HookManager::new();
365
366        let (tx, _rx) = bounded(10);
367        let count = Arc::new(AtomicU64::new(0));
368        let callback = Arc::new(TestCallback { tx, count });
369
370        hooks.add_callback(callback);
371        assert_eq!(hooks.len(), 1);
372
373        hooks.clear();
374        assert!(hooks.is_empty());
375    }
376}