Skip to main content

piper_driver/
recording.rs

1//! 异步录制钩子(Async Recording Hook)
2//!
3//! 本模块提供基于 Channel 的异步录制钩子,用于高性能 CAN 帧录制。
4//!
5//! # 设计原则(v1.2.1)
6//!
7//! - **Bounded Queue**: 使用 `bounded(10000)` 防止 OOM
8//! - **非阻塞**: 使用 `try_send`,队列满时丢帧而非阻塞
9//! - **丢帧监控**: 提供 `dropped_frames` 计数器
10//! - **时间戳精度**: 直接使用 `frame.timestamp_us`(硬件时间戳)
11//!
12//! # 性能分析
13//!
14//! - 队列容量: 10,000 帧(约 10 秒 @ 1kHz)
15//! - 回调开销: <1μs (0.1%)
16//! - 内存占用: 每帧约 32 bytes → 队列总约 320 KB
17//!
18//! # 使用示例
19//!
20//! ```rust
21//! use piper_driver::recording::AsyncRecordingHook;
22//! use piper_driver::hooks::FrameCallback;
23//! use piper_protocol::PiperFrame;
24//! use std::sync::Arc;
25//!
26//! // 创建录制钩子
27//! let (hook, rx) = AsyncRecordingHook::new();
28//! let dropped_counter = hook.dropped_frames().clone();  // 📊 直接持有引用
29//!
30//! // 注册为回调
31//! let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
32//!
33//! // 在后台线程处理录制数据
34//! std::thread::spawn(move || {
35//!     while let Ok(frame) = rx.recv() {
36//!         // 处理帧...
37//!     }
38//! });
39//!
40//! // 监控丢帧
41//! println!("丢了 {} 帧", dropped_counter.load(std::sync::atomic::Ordering::Relaxed));
42//! ```
43
44use crate::hooks::FrameCallback;
45use crossbeam_channel::{Receiver, Sender, bounded};
46use piper_protocol::PiperFrame;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
49
50/// 带时间戳的帧
51///
52/// 保存 CAN 帧及其硬件时间戳,用于录制和回放。
53#[derive(Debug, Clone)]
54pub struct TimestampedFrame {
55    /// 硬件时间戳(微秒)
56    ///
57    /// ⏱️ **时间戳精度**: 必须直接使用 `frame.timestamp_us`(硬件时间戳)
58    /// 禁止在回调中调用 `SystemTime::now()`,因为回调执行时间已晚于帧到达时间。
59    pub timestamp_us: u64,
60
61    /// CAN ID
62    pub id: u32,
63
64    /// 帧数据(最多 8 bytes)
65    pub data: Vec<u8>,
66}
67
68impl From<&PiperFrame> for TimestampedFrame {
69    fn from(frame: &PiperFrame) -> Self {
70        Self {
71            // ⏱️ 直接透传硬件时间戳
72            timestamp_us: frame.timestamp_us,
73            id: frame.id,
74            data: frame.data.to_vec(),
75        }
76    }
77}
78
79/// 异步录制钩子(Actor 模式 + Bounded Queue)
80///
81/// # 内存安全(v1.2.1 关键修正)
82///
83/// 使用 **有界通道**(Bounded Channel)防止 OOM:
84/// - 容量: 100,000 帧(约 3.3 分钟 @ 500Hz)
85/// - 队列满时丢帧,而不是无限增长导致 OOM
86/// - 可通过 `dropped_frames` 和 `frame_counter` 计数器监控
87///
88/// # 设计理由
89///
90/// ❌ **v1.1 错误设计**: `unbounded()` 可能导致 OOM
91/// ✅ **v1.2.1 正确设计**: `bounded(10000)` 优雅降级
92/// ✅ **v1.3.0 最新设计**: `bounded(100000)` 更长录制时长(约 3.3 分钟)
93///
94/// # 示例
95///
96/// ```rust
97/// use piper_driver::recording::AsyncRecordingHook;
98/// use piper_driver::hooks::FrameCallback;
99/// use std::sync::Arc;
100///
101/// // 创建录制钩子
102/// let (hook, rx) = AsyncRecordingHook::new();
103///
104/// // 直接持有计数器的 Arc 引用
105/// let dropped_counter = hook.dropped_frames().clone();
106/// let frame_counter = hook.frame_counter().clone();
107///
108/// // 注册为回调
109/// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
110///
111/// // 监控丢帧和帧数
112/// let dropped = dropped_counter.load(std::sync::atomic::Ordering::Relaxed);
113/// let frames = frame_counter.load(std::sync::atomic::Ordering::Relaxed);
114/// println!("已录制 {} 帧,丢了 {} 帧", frames, dropped);
115/// ```
116pub struct AsyncRecordingHook {
117    /// 发送端(用于 Channel)
118    tx: Sender<TimestampedFrame>,
119
120    /// 丢帧计数器(用于监控)
121    dropped_frames: Arc<AtomicU64>,
122
123    /// 帧计数器(每次成功发送时递增)
124    frame_counter: Arc<AtomicU64>,
125
126    /// 停止条件:当收到此 CAN ID 时停止录制(None 表示不启用)
127    stop_on_id: Option<u32>,
128
129    /// 停止请求标志(原子操作,用于跨线程通信)
130    stop_requested: Arc<AtomicBool>,
131}
132
133impl AsyncRecordingHook {
134    /// 创建新的录制钩子
135    ///
136    /// # 队列容量
137    ///
138    /// - 容量: 100,000 帧(约 3.3 分钟 @ 500Hz)
139    /// - 500Hz CAN 总线: 约 3.3 分钟缓存
140    /// - 1kHz CAN 总线: 约 1.6 分钟缓存
141    /// - 内存占用: 约 2.4MB(100k × 24 bytes/frame)
142    ///
143    /// **设计理由**:
144    /// - 足够吸收短暂的磁盘 I/O 延迟,同时防止 OOM
145    /// - 支持中等时长的录制(3 分钟左右)
146    /// - 超过此时长会导致丢帧(Channel 满)
147    ///
148    /// # 返回
149    ///
150    /// - `(hook, rx)`: 钩子实例和接收端
151    ///
152    /// # 示例
153    ///
154    /// ```rust
155    /// use piper_driver::recording::AsyncRecordingHook;
156    ///
157    /// let (hook, rx) = AsyncRecordingHook::new();
158    /// ```
159    #[must_use]
160    pub fn new() -> (Self, Receiver<TimestampedFrame>) {
161        // ⚠️ 缓冲区大小:100,000 帧(约 3-4 分钟 @ 500Hz)
162        // 内存占用:约 2.4MB(100k × 24 bytes/frame)
163        // 风险提示:超过此时长会导致丢帧
164        let (tx, rx) = bounded(100_000);
165
166        let hook = Self {
167            tx,
168            dropped_frames: Arc::new(AtomicU64::new(0)),
169            frame_counter: Arc::new(AtomicU64::new(0)),
170            stop_on_id: None,
171            stop_requested: Arc::new(AtomicBool::new(false)),
172        };
173
174        (hook, rx)
175    }
176
177    /// 创建新的录制钩子(带停止条件)
178    ///
179    /// # 参数
180    ///
181    /// - `stop_on_id`: 当收到此 CAN ID 时停止录制(None 表示不启用)
182    ///
183    /// # 返回
184    ///
185    /// - `(hook, rx)`: 钩子实例和接收端
186    ///
187    /// # 示例
188    ///
189    /// ```rust
190    /// use piper_driver::recording::AsyncRecordingHook;
191    ///
192    /// // 当收到 0x2A4 时停止录制(末端位姿帧)
193    /// let (hook, rx) = AsyncRecordingHook::with_stop_condition(Some(0x2A4));
194    /// ```
195    #[must_use]
196    pub fn with_stop_condition(stop_on_id: Option<u32>) -> (Self, Receiver<TimestampedFrame>) {
197        let (tx, rx) = bounded(100_000);
198
199        let hook = Self {
200            tx,
201            dropped_frames: Arc::new(AtomicU64::new(0)),
202            frame_counter: Arc::new(AtomicU64::new(0)),
203            stop_on_id,
204            stop_requested: Arc::new(AtomicBool::new(false)),
205        };
206
207        (hook, rx)
208    }
209
210    /// 获取停止请求标志(新增:v1.4)
211    ///
212    /// 用于检查是否应该停止录制
213    pub fn is_stop_requested(&self) -> bool {
214        self.stop_requested.load(Ordering::Relaxed)
215    }
216
217    /// 获取停止请求标志的 Arc 引用(新增:v1.4)
218    ///
219    /// 用于跨线程共享停止标志
220    pub fn stop_requested(&self) -> &Arc<AtomicBool> {
221        &self.stop_requested
222    }
223
224    /// 获取发送端(用于自定义场景)
225    ///
226    /// # 注意
227    ///
228    /// 大多数情况下不需要直接使用此方法,只需将 `AsyncRecordingHook` 注册为 `FrameCallback` 即可。
229    #[must_use]
230    pub fn sender(&self) -> Sender<TimestampedFrame> {
231        self.tx.clone()
232    }
233
234    /// 获取丢帧计数器
235    ///
236    /// # 使用建议(v1.2.1)
237    ///
238    /// ✅ **推荐**: 在创建钩子时直接持有 `Arc` 引用
239    ///
240    /// ```rust
241    /// use piper_driver::recording::AsyncRecordingHook;
242    /// use std::sync::atomic::Ordering;
243    ///
244    /// let (hook, _rx) = AsyncRecordingHook::new();
245    /// let dropped_counter = hook.dropped_frames().clone();  // 在此持有
246    ///
247    /// // 直接读取,无需从 Context downcast
248    /// let count = dropped_counter.load(Ordering::Relaxed);
249    /// ```
250    ///
251    /// ❌ **不推荐**: 试图从 `Context` 中 `downcast`(需要 Trait 继承 `Any`)
252    ///
253    /// # 返回
254    ///
255    /// `Arc<AtomicU64>`: 丢帧计数器的引用
256    #[must_use]
257    pub fn dropped_frames(&self) -> &Arc<AtomicU64> {
258        &self.dropped_frames
259    }
260
261    /// 获取当前丢帧数量
262    ///
263    /// # 返回
264    ///
265    /// 当前丢失的帧数
266    #[must_use]
267    pub fn dropped_count(&self) -> u64 {
268        self.dropped_frames.load(Ordering::Relaxed)
269    }
270
271    /// 获取帧计数器(新增:v1.3.0)
272    ///
273    /// # 使用建议
274    ///
275    /// ✅ **推荐**: 在创建钩子时直接持有 `Arc` 引用
276    ///
277    /// ```rust
278    /// use piper_driver::recording::AsyncRecordingHook;
279    /// use std::sync::atomic::Ordering;
280    ///
281    /// let (hook, _rx) = AsyncRecordingHook::new();
282    /// let frame_counter = hook.frame_counter().clone();  // 在此持有
283    ///
284    /// // 直接读取,无需从 Context downcast
285    /// let count = frame_counter.load(Ordering::Relaxed);
286    /// ```
287    ///
288    /// # 返回
289    ///
290    /// `Arc<AtomicU64>`: 帧计数器的引用(不可变,只读)
291    #[must_use]
292    pub fn frame_counter(&self) -> &Arc<AtomicU64> {
293        &self.frame_counter
294    }
295
296    /// 获取当前已录制的帧数(新增:v1.3.0)
297    ///
298    /// # 返回
299    ///
300    /// 当前已成功录制的帧数
301    #[must_use]
302    pub fn frame_count(&self) -> u64 {
303        self.frame_counter.load(Ordering::Relaxed)
304    }
305}
306
307impl FrameCallback for AsyncRecordingHook {
308    /// 当接收到 CAN 帧时调用
309    ///
310    /// # 性能要求
311    ///
312    /// - <1μs 开销(非阻塞)
313    /// - 队列满时丢帧,而非阻塞或无限增长
314    ///
315    /// # 时间戳精度(v1.2.1)
316    ///
317    /// ⏱️ **必须使用硬件时间戳**:
318    ///
319    /// ```rust
320    /// use piper_driver::recording::TimestampedFrame;
321    /// use piper_protocol::PiperFrame;
322    ///
323    /// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
324    /// let ts_frame = TimestampedFrame::from(&frame);
325    /// assert_eq!(ts_frame.timestamp_us, frame.timestamp_us);  // ✅ 硬件时间戳
326    /// ```
327    ///
328    /// ❌ **禁止软件生成时间戳**:
329    ///
330    /// // ❌ 错误:回调执行时间已晚于帧到达时间(仅说明概念)
331    /// // let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_micros() as u64;
332    ///
333    #[inline]
334    #[allow(clippy::collapsible_if)] // 嵌套 if 结构更清晰:先检查 Option,再比较 ID
335    fn on_frame_received(&self, frame: &PiperFrame) {
336        // ⚠️ 关键:这里运行在 CAN 接收线程中,必须极快
337        // ✅ 性能优化:先记录所有帧(包括触发帧),再检查停止条件(v1.4 修正)
338
339        // 1. 先记录帧(无论是否为触发帧)
340        let ts_frame = TimestampedFrame::from(frame);
341        if self.tx.try_send(ts_frame).is_err() {
342            // ⚠️ 缓冲区满时,丢弃"新"帧,保留"旧"帧
343            self.dropped_frames.fetch_add(1, Ordering::Relaxed);
344        } else {
345            self.frame_counter.fetch_add(1, Ordering::Relaxed);
346        }
347
348        // 2. 再检查停止条件(原子操作,极快)
349        if let Some(stop_id) = self.stop_on_id {
350            if frame.id() == stop_id {
351                // ✅ 原子存储,不会阻塞
352                self.stop_requested.store(true, Ordering::SeqCst);
353                // ✅ 注意:不使用 return,因为已经记录了触发帧
354            }
355        }
356    }
357
358    /// 当发送 CAN 帧成功后调用(可选)
359    ///
360    /// # 时机
361    ///
362    /// 仅在 `tx.send()` 成功后调用,确保录制的是实际发送的帧。
363    #[inline]
364    fn on_frame_sent(&self, frame: &PiperFrame) {
365        // ⏱️ 直接透传硬件时间戳
366        let ts_frame = TimestampedFrame::from(frame);
367
368        // 🛡️ 非阻塞发送
369        if self.tx.try_send(ts_frame).is_err() {
370            self.dropped_frames.fetch_add(1, Ordering::Relaxed);
371        }
372    }
373}
374
375#[cfg(test)]
376mod tests {
377    use super::*;
378    use std::thread;
379    use std::time::Duration;
380
381    #[test]
382    fn test_async_recording_hook_basic() {
383        let (hook, rx) = AsyncRecordingHook::new();
384        let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
385
386        // 创建测试帧
387        let frame = PiperFrame {
388            id: 0x2A5,
389            data: [0, 1, 2, 3, 4, 5, 6, 7],
390            len: 8,
391            is_extended: false,
392            timestamp_us: 12345,
393        };
394
395        // 触发回调
396        callback.on_frame_received(&frame);
397
398        // 验证接收到帧
399        let received = rx.recv_timeout(Duration::from_millis(100)).unwrap();
400        assert_eq!(received.timestamp_us, 12345);
401        assert_eq!(received.id, 0x2A5);
402        assert_eq!(received.data, vec![0, 1, 2, 3, 4, 5, 6, 7]);
403    }
404
405    #[test]
406    fn test_async_recording_hook_dropped_frames() {
407        let (hook, rx) = AsyncRecordingHook::new();
408        let dropped_counter = hook.dropped_frames().clone();
409        let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
410
411        // 创建测试帧
412        let frame = PiperFrame {
413            id: 0x2A5,
414            data: [0, 1, 2, 3, 4, 5, 6, 7],
415            len: 8,
416            is_extended: false,
417            timestamp_us: 12345,
418        };
419
420        // 正常情况:无丢帧
421        callback.on_frame_received(&frame);
422        assert_eq!(dropped_counter.load(Ordering::Relaxed), 0);
423
424        // 清空接收端,模拟队列满的情况
425        drop(rx);
426
427        // 现在发送会失败(队列已关闭)
428        for _ in 0..10 {
429            callback.on_frame_received(&frame);
430        }
431
432        // 应该记录了 10 个丢帧
433        assert_eq!(dropped_counter.load(Ordering::Relaxed), 10);
434    }
435
436    #[test]
437    fn test_async_recording_hook_tx_callback() {
438        let (hook, rx) = AsyncRecordingHook::new();
439        let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
440
441        // 创建测试帧
442        let frame = PiperFrame {
443            id: 0x1A1,
444            data: [1, 2, 3, 4, 5, 6, 7, 8],
445            len: 8,
446            is_extended: false,
447            timestamp_us: 54321,
448        };
449
450        // 触发 TX 回调
451        callback.on_frame_sent(&frame);
452
453        // 验证接收到帧
454        let received = rx.recv_timeout(Duration::from_millis(100)).unwrap();
455        assert_eq!(received.timestamp_us, 54321);
456        assert_eq!(received.id, 0x1A1);
457    }
458
459    #[test]
460    fn test_timestamped_frame_from_piper_frame() {
461        let frame = PiperFrame {
462            id: 0x2A5,
463            data: [0, 1, 2, 3, 4, 5, 6, 7],
464            len: 8,
465            is_extended: false,
466            timestamp_us: 99999,
467        };
468
469        let ts_frame = TimestampedFrame::from(&frame);
470
471        assert_eq!(ts_frame.timestamp_us, 99999);
472        assert_eq!(ts_frame.id, 0x2A5);
473        assert_eq!(ts_frame.data, vec![0, 1, 2, 3, 4, 5, 6, 7]);
474    }
475
476    #[test]
477    fn test_async_recording_hook_concurrent() {
478        let (hook, rx) = AsyncRecordingHook::new();
479        let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
480
481        // 创建多个线程并发触发回调
482        let handles: Vec<_> = (0..10)
483            .map(|i| {
484                let cb = callback.clone();
485                thread::spawn(move || {
486                    let frame = PiperFrame {
487                        id: 0x2A5,
488                        data: [i as u8; 8],
489                        len: 8,
490                        is_extended: false,
491                        timestamp_us: i as u64,
492                    };
493                    cb.on_frame_received(&frame);
494                })
495            })
496            .collect();
497
498        // 等待所有线程完成
499        for handle in handles {
500            handle.join().unwrap();
501        }
502
503        // 验证接收到所有帧(顺序可能不同)
504        let mut count = 0;
505        while rx.try_recv().is_ok() {
506            count += 1;
507        }
508        assert_eq!(count, 10);
509    }
510}