piper_driver/hooks.rs
1//! 钩子系统(Hook System)
2//!
3//! 本模块提供运行时钩子(Hook)管理功能,用于在 CAN 帧接收/发送时触发自定义回调。
4//!
5//! # 设计原则(v1.2.1)
6//!
7//! - **非阻塞**: 所有回调必须在 <1μs 内完成,使用 Channel 异步处理
8//! - **职责分离**: HookManager 管理运行时回调,PipelineConfig 保持为 POD 数据
9//! - **类型安全**: 使用 `dyn FrameCallback` trait object,支持多种回调类型
10//!
11//! # 使用示例
12//!
13//! ```rust
14//! use piper_driver::hooks::{HookManager, FrameCallback};
15//! use piper_driver::recording::AsyncRecordingHook;
16//! use piper_protocol::PiperFrame;
17//! use std::sync::Arc;
18//!
19//! // 创建钩子管理器
20//! let mut hooks = HookManager::new();
21//!
22//! // 添加录制回调
23//! let (hook, _rx) = AsyncRecordingHook::new();
24//! let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
25//! hooks.add_callback(callback);
26//!
27//! // 触发所有回调(在 rx_loop 中)
28//! let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
29//! hooks.trigger_all(&frame);
30//! ```
31
32use piper_protocol::PiperFrame;
33use std::sync::Arc;
34
35/// 帧回调 Trait
36///
37/// 定义 CAN 帧回调接口,用于在接收到帧时执行自定义逻辑。
38///
39/// # 性能要求
40///
41/// - **非阻塞**: 实现必须在 <1μs 内完成
42/// - **无锁**: 禁止使用 Mutex、I/O、分配等阻塞操作
43/// - **Channel 模式**: 推荐使用 `crossbeam::channel::Sender` 异步处理
44///
45/// # 示例
46///
47/// ```rust
48/// use piper_driver::hooks::FrameCallback;
49/// use piper_protocol::PiperFrame;
50/// use crossbeam_channel::{Sender, bounded};
51///
52/// struct MyCallback {
53/// sender: Sender<PiperFrame>,
54/// }
55///
56/// impl FrameCallback for MyCallback {
57/// fn on_frame_received(&self, frame: &PiperFrame) {
58/// // ✅ 使用 try_send,非阻塞
59/// let _ = self.sender.try_send(*frame);
60/// }
61/// }
62/// ```
63pub trait FrameCallback: Send + Sync {
64 /// 当接收到 CAN 帧时调用
65 ///
66 /// # 性能要求
67 ///
68 /// - 必须在 <1μs 内完成
69 /// - 禁止阻塞操作(Mutex、I/O、分配)
70 /// - 推荐使用 `try_send` 而非 `send`
71 ///
72 /// # 参数
73 ///
74 /// - `frame`: 接收到的 CAN 帧
75 fn on_frame_received(&self, frame: &PiperFrame);
76
77 /// 当发送 CAN 帧成功后调用(可选)
78 ///
79 /// # 时机
80 ///
81 /// 仅在 `tx.send()` 成功后触发,确保记录的是实际到达总线的帧。
82 /// 避免记录"幽灵帧"(发送失败的帧)。
83 ///
84 /// # 默认实现
85 ///
86 /// 默认为空操作,仅需在需要录制 TX 帧时实现。
87 fn on_frame_sent(&self, frame: &PiperFrame) {
88 let _ = frame;
89 // 默认:不处理 TX 帧
90 }
91}
92
93/// 钩子管理器
94///
95/// 专门管理运行时回调列表。
96///
97/// # 设计理由(v1.2.1)
98///
99/// - **Config vs Context 分离**:
100/// - `PipelineConfig` 应该是 POD(Plain Old Data),用于序列化
101/// - `PiperContext` 管理运行时状态和动态组件(如回调)
102///
103/// # 线程安全
104///
105/// 使用 `std::sync::Arc` 确保回调可以跨线程共享。
106/// 回调列表本身不是线程安全的,需要外部同步(通常通过 `RwLock<HookManager>`)。
107///
108/// # 示例
109///
110/// ```rust
111/// use piper_driver::hooks::{HookManager, FrameCallback};
112/// use piper_driver::recording::AsyncRecordingHook;
113/// use piper_protocol::PiperFrame;
114/// use std::sync::{Arc, RwLock};
115///
116/// // 在 PiperContext 中
117/// pub struct PiperContext {
118/// pub hooks: RwLock<HookManager>,
119/// }
120///
121/// // 创建上下文并添加回调
122/// let context = PiperContext { hooks: RwLock::new(HookManager::new()) };
123/// let (hook, _rx) = AsyncRecordingHook::new();
124/// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
125/// if let Ok(mut hooks) = context.hooks.write() {
126/// hooks.add_callback(callback);
127/// }
128///
129/// // 触发回调(在 rx_loop 中)
130/// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
131/// if let Ok(hooks) = context.hooks.read() {
132/// hooks.trigger_all(&frame);
133/// }
134/// ```
135#[derive(Default)]
136pub struct HookManager {
137 /// 回调列表
138 callbacks: Vec<Arc<dyn FrameCallback>>,
139}
140
141impl HookManager {
142 /// 创建新的钩子管理器
143 #[must_use]
144 pub const fn new() -> Self {
145 Self {
146 callbacks: Vec::new(),
147 }
148 }
149
150 /// 添加回调
151 ///
152 /// # 线程安全
153 ///
154 /// 此方法不是线程安全的,需要外部同步(通常通过 `RwLock`)。
155 ///
156 /// # 参数
157 ///
158 /// - `callback`: 要添加的回调(必须实现 `FrameCallback`)
159 ///
160 /// # 示例
161 ///
162 /// ```rust
163 /// use piper_driver::hooks::{HookManager, FrameCallback};
164 /// use piper_driver::recording::AsyncRecordingHook;
165 /// use std::sync::Arc;
166 ///
167 /// let mut hooks = HookManager::new();
168 /// let (hook, _rx) = AsyncRecordingHook::new();
169 /// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
170 /// hooks.add_callback(callback);
171 /// ```
172 pub fn add_callback(&mut self, callback: Arc<dyn FrameCallback>) {
173 self.callbacks.push(callback);
174 }
175
176 /// 移除所有回调
177 ///
178 /// # 用途
179 ///
180 /// 主要用于测试或清理场景。
181 pub fn clear(&mut self) {
182 self.callbacks.clear();
183 }
184
185 /// 触发所有回调(在 rx_loop 中调用)
186 ///
187 /// # 性能要求
188 ///
189 /// - 总耗时 <1μs(假设每个回调 <100ns)
190 /// - 非阻塞:所有回调必须使用 `try_send` 而非 `send`
191 ///
192 /// # 参数
193 ///
194 /// - `frame`: 接收到的 CAN 帧
195 ///
196 /// # 示例
197 ///
198 /// ```rust
199 /// use piper_driver::hooks::HookManager;
200 /// use piper_driver::recording::AsyncRecordingHook;
201 /// use piper_protocol::PiperFrame;
202 /// use std::sync::Arc;
203 ///
204 /// let mut hooks = HookManager::new();
205 /// let (hook, _rx) = AsyncRecordingHook::new();
206 /// hooks.add_callback(Arc::new(hook));
207 ///
208 /// // 在 rx_loop 中触发
209 /// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
210 /// hooks.trigger_all(&frame);
211 /// ```
212 pub fn trigger_all(&self, frame: &PiperFrame) {
213 for callback in self.callbacks.iter() {
214 callback.on_frame_received(frame);
215 // ^^^^ 使用 try_send,<1μs,非阻塞
216 }
217 }
218
219 /// 触发所有 TX 回调(在 tx_loop 发送成功后调用)
220 ///
221 /// # 时机
222 ///
223 /// 仅在 `tx.send()` 成功后调用,确保录制的是实际发送的帧。
224 ///
225 /// # 参数
226 ///
227 /// - `frame`: 成功发送的 CAN 帧
228 ///
229 /// # 示例
230 ///
231 /// ```rust
232 /// use piper_driver::hooks::HookManager;
233 /// use piper_protocol::PiperFrame;
234 ///
235 /// let hooks = HookManager::new();
236 ///
237 /// // 在 tx_loop 中,发送成功后触发回调
238 /// let frame = PiperFrame::new_standard(0x123, &[1, 2, 3, 4]);
239 /// // 假设 tx.send(&frame) 返回 Ok(())
240 /// hooks.trigger_all_sent(&frame);
241 /// ```
242 pub fn trigger_all_sent(&self, frame: &PiperFrame) {
243 for callback in self.callbacks.iter() {
244 callback.on_frame_sent(frame);
245 }
246 }
247
248 /// 获取回调数量
249 ///
250 /// # 用途
251 ///
252 /// 主要用于调试和监控。
253 #[must_use]
254 pub fn len(&self) -> usize {
255 self.callbacks.len()
256 }
257
258 /// 检查是否为空
259 #[must_use]
260 pub fn is_empty(&self) -> bool {
261 self.callbacks.is_empty()
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268 use crossbeam_channel::{Sender, bounded};
269 use std::sync::atomic::{AtomicU64, Ordering};
270
271 #[derive(Debug)]
272 struct TestCallback {
273 tx: Sender<PiperFrame>,
274 count: Arc<AtomicU64>,
275 }
276
277 impl FrameCallback for TestCallback {
278 fn on_frame_received(&self, frame: &PiperFrame) {
279 let _ = self.tx.try_send(*frame);
280 self.count.fetch_add(1, Ordering::Relaxed);
281 }
282
283 fn on_frame_sent(&self, frame: &PiperFrame) {
284 let _ = self.tx.try_send(*frame);
285 self.count.fetch_add(1, Ordering::Relaxed);
286 }
287 }
288
289 #[test]
290 fn test_hook_manager_add_callback() {
291 let mut hooks = HookManager::new();
292 assert!(hooks.is_empty());
293
294 let (tx, _rx) = bounded(10);
295 let count = Arc::new(AtomicU64::new(0));
296 let callback = Arc::new(TestCallback { tx, count });
297
298 hooks.add_callback(callback);
299 assert_eq!(hooks.len(), 1);
300 }
301
302 #[test]
303 fn test_hook_manager_trigger_all() {
304 let mut hooks = HookManager::new();
305
306 let (tx, rx) = bounded::<PiperFrame>(10);
307 let count = Arc::new(AtomicU64::new(0));
308 let callback = Arc::new(TestCallback {
309 tx,
310 count: count.clone(),
311 });
312
313 hooks.add_callback(callback);
314
315 // 创建测试帧
316 let frame = PiperFrame {
317 id: 0x2A5,
318 data: [0, 1, 2, 3, 4, 5, 6, 7],
319 len: 8,
320 is_extended: false,
321 timestamp_us: 12345,
322 };
323
324 // 触发回调
325 hooks.trigger_all(&frame);
326
327 // 验证
328 assert_eq!(count.load(Ordering::Relaxed), 1);
329 assert!(rx.try_recv().is_ok());
330 }
331
332 #[test]
333 fn test_hook_manager_trigger_sent() {
334 let mut hooks = HookManager::new();
335
336 let (tx, rx) = bounded::<PiperFrame>(10);
337 let count = Arc::new(AtomicU64::new(0));
338 let callback = Arc::new(TestCallback {
339 tx,
340 count: count.clone(),
341 });
342
343 hooks.add_callback(callback);
344
345 // 创建测试帧
346 let frame = PiperFrame {
347 id: 0x1A1,
348 data: [0, 1, 2, 3, 4, 5, 6, 7],
349 len: 8,
350 is_extended: false,
351 timestamp_us: 12345,
352 };
353
354 // 触发 TX 回调
355 hooks.trigger_all_sent(&frame);
356
357 // 验证
358 assert_eq!(count.load(Ordering::Relaxed), 1);
359 assert!(rx.try_recv().is_ok());
360 }
361
362 #[test]
363 fn test_hook_manager_clear() {
364 let mut hooks = HookManager::new();
365
366 let (tx, _rx) = bounded(10);
367 let count = Arc::new(AtomicU64::new(0));
368 let callback = Arc::new(TestCallback { tx, count });
369
370 hooks.add_callback(callback);
371 assert_eq!(hooks.len(), 1);
372
373 hooks.clear();
374 assert!(hooks.is_empty());
375 }
376}