piper_driver/recording.rs
1//! 异步录制钩子(Async Recording Hook)
2//!
3//! 本模块提供基于 Channel 的异步录制钩子,用于高性能 CAN 帧录制。
4//!
5//! # 设计原则(v1.2.1)
6//!
7//! - **Bounded Queue**: 使用 `bounded(10000)` 防止 OOM
8//! - **非阻塞**: 使用 `try_send`,队列满时丢帧而非阻塞
9//! - **丢帧监控**: 提供 `dropped_frames` 计数器
10//! - **时间戳精度**: 直接使用 `frame.timestamp_us`(硬件时间戳)
11//!
12//! # 性能分析
13//!
14//! - 队列容量: 10,000 帧(约 10 秒 @ 1kHz)
15//! - 回调开销: <1μs (0.1%)
16//! - 内存占用: 每帧约 32 bytes → 队列总约 320 KB
17//!
18//! # 使用示例
19//!
20//! ```rust
21//! use piper_driver::recording::AsyncRecordingHook;
22//! use piper_driver::hooks::FrameCallback;
23//! use piper_protocol::PiperFrame;
24//! use std::sync::Arc;
25//!
26//! // 创建录制钩子
27//! let (hook, rx) = AsyncRecordingHook::new();
28//! let dropped_counter = hook.dropped_frames().clone(); // 📊 直接持有引用
29//!
30//! // 注册为回调
31//! let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
32//!
33//! // 在后台线程处理录制数据
34//! std::thread::spawn(move || {
35//! while let Ok(frame) = rx.recv() {
36//! // 处理帧...
37//! }
38//! });
39//!
40//! // 监控丢帧
41//! println!("丢了 {} 帧", dropped_counter.load(std::sync::atomic::Ordering::Relaxed));
42//! ```
43
44use crate::hooks::FrameCallback;
45use crossbeam_channel::{Receiver, Sender, bounded};
46use piper_protocol::PiperFrame;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
49
50/// 带时间戳的帧
51///
52/// 保存 CAN 帧及其硬件时间戳,用于录制和回放。
53#[derive(Debug, Clone)]
54pub struct TimestampedFrame {
55 /// 硬件时间戳(微秒)
56 ///
57 /// ⏱️ **时间戳精度**: 必须直接使用 `frame.timestamp_us`(硬件时间戳)
58 /// 禁止在回调中调用 `SystemTime::now()`,因为回调执行时间已晚于帧到达时间。
59 pub timestamp_us: u64,
60
61 /// CAN ID
62 pub id: u32,
63
64 /// 帧数据(最多 8 bytes)
65 pub data: Vec<u8>,
66}
67
68impl From<&PiperFrame> for TimestampedFrame {
69 fn from(frame: &PiperFrame) -> Self {
70 Self {
71 // ⏱️ 直接透传硬件时间戳
72 timestamp_us: frame.timestamp_us,
73 id: frame.id,
74 data: frame.data.to_vec(),
75 }
76 }
77}
78
79/// 异步录制钩子(Actor 模式 + Bounded Queue)
80///
81/// # 内存安全(v1.2.1 关键修正)
82///
83/// 使用 **有界通道**(Bounded Channel)防止 OOM:
84/// - 容量: 100,000 帧(约 3.3 分钟 @ 500Hz)
85/// - 队列满时丢帧,而不是无限增长导致 OOM
86/// - 可通过 `dropped_frames` 和 `frame_counter` 计数器监控
87///
88/// # 设计理由
89///
90/// ❌ **v1.1 错误设计**: `unbounded()` 可能导致 OOM
91/// ✅ **v1.2.1 正确设计**: `bounded(10000)` 优雅降级
92/// ✅ **v1.3.0 最新设计**: `bounded(100000)` 更长录制时长(约 3.3 分钟)
93///
94/// # 示例
95///
96/// ```rust
97/// use piper_driver::recording::AsyncRecordingHook;
98/// use piper_driver::hooks::FrameCallback;
99/// use std::sync::Arc;
100///
101/// // 创建录制钩子
102/// let (hook, rx) = AsyncRecordingHook::new();
103///
104/// // 直接持有计数器的 Arc 引用
105/// let dropped_counter = hook.dropped_frames().clone();
106/// let frame_counter = hook.frame_counter().clone();
107///
108/// // 注册为回调
109/// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
110///
111/// // 监控丢帧和帧数
112/// let dropped = dropped_counter.load(std::sync::atomic::Ordering::Relaxed);
113/// let frames = frame_counter.load(std::sync::atomic::Ordering::Relaxed);
114/// println!("已录制 {} 帧,丢了 {} 帧", frames, dropped);
115/// ```
116pub struct AsyncRecordingHook {
117 /// 发送端(用于 Channel)
118 tx: Sender<TimestampedFrame>,
119
120 /// 丢帧计数器(用于监控)
121 dropped_frames: Arc<AtomicU64>,
122
123 /// 帧计数器(每次成功发送时递增)
124 frame_counter: Arc<AtomicU64>,
125
126 /// 停止条件:当收到此 CAN ID 时停止录制(None 表示不启用)
127 stop_on_id: Option<u32>,
128
129 /// 停止请求标志(原子操作,用于跨线程通信)
130 stop_requested: Arc<AtomicBool>,
131}
132
133impl AsyncRecordingHook {
134 /// 创建新的录制钩子
135 ///
136 /// # 队列容量
137 ///
138 /// - 容量: 100,000 帧(约 3.3 分钟 @ 500Hz)
139 /// - 500Hz CAN 总线: 约 3.3 分钟缓存
140 /// - 1kHz CAN 总线: 约 1.6 分钟缓存
141 /// - 内存占用: 约 2.4MB(100k × 24 bytes/frame)
142 ///
143 /// **设计理由**:
144 /// - 足够吸收短暂的磁盘 I/O 延迟,同时防止 OOM
145 /// - 支持中等时长的录制(3 分钟左右)
146 /// - 超过此时长会导致丢帧(Channel 满)
147 ///
148 /// # 返回
149 ///
150 /// - `(hook, rx)`: 钩子实例和接收端
151 ///
152 /// # 示例
153 ///
154 /// ```rust
155 /// use piper_driver::recording::AsyncRecordingHook;
156 ///
157 /// let (hook, rx) = AsyncRecordingHook::new();
158 /// ```
159 #[must_use]
160 pub fn new() -> (Self, Receiver<TimestampedFrame>) {
161 // ⚠️ 缓冲区大小:100,000 帧(约 3-4 分钟 @ 500Hz)
162 // 内存占用:约 2.4MB(100k × 24 bytes/frame)
163 // 风险提示:超过此时长会导致丢帧
164 let (tx, rx) = bounded(100_000);
165
166 let hook = Self {
167 tx,
168 dropped_frames: Arc::new(AtomicU64::new(0)),
169 frame_counter: Arc::new(AtomicU64::new(0)),
170 stop_on_id: None,
171 stop_requested: Arc::new(AtomicBool::new(false)),
172 };
173
174 (hook, rx)
175 }
176
177 /// 创建新的录制钩子(带停止条件)
178 ///
179 /// # 参数
180 ///
181 /// - `stop_on_id`: 当收到此 CAN ID 时停止录制(None 表示不启用)
182 ///
183 /// # 返回
184 ///
185 /// - `(hook, rx)`: 钩子实例和接收端
186 ///
187 /// # 示例
188 ///
189 /// ```rust
190 /// use piper_driver::recording::AsyncRecordingHook;
191 ///
192 /// // 当收到 0x2A4 时停止录制(末端位姿帧)
193 /// let (hook, rx) = AsyncRecordingHook::with_stop_condition(Some(0x2A4));
194 /// ```
195 #[must_use]
196 pub fn with_stop_condition(stop_on_id: Option<u32>) -> (Self, Receiver<TimestampedFrame>) {
197 let (tx, rx) = bounded(100_000);
198
199 let hook = Self {
200 tx,
201 dropped_frames: Arc::new(AtomicU64::new(0)),
202 frame_counter: Arc::new(AtomicU64::new(0)),
203 stop_on_id,
204 stop_requested: Arc::new(AtomicBool::new(false)),
205 };
206
207 (hook, rx)
208 }
209
210 /// 获取停止请求标志(新增:v1.4)
211 ///
212 /// 用于检查是否应该停止录制
213 pub fn is_stop_requested(&self) -> bool {
214 self.stop_requested.load(Ordering::Relaxed)
215 }
216
217 /// 获取停止请求标志的 Arc 引用(新增:v1.4)
218 ///
219 /// 用于跨线程共享停止标志
220 pub fn stop_requested(&self) -> &Arc<AtomicBool> {
221 &self.stop_requested
222 }
223
224 /// 获取发送端(用于自定义场景)
225 ///
226 /// # 注意
227 ///
228 /// 大多数情况下不需要直接使用此方法,只需将 `AsyncRecordingHook` 注册为 `FrameCallback` 即可。
229 #[must_use]
230 pub fn sender(&self) -> Sender<TimestampedFrame> {
231 self.tx.clone()
232 }
233
234 /// 获取丢帧计数器
235 ///
236 /// # 使用建议(v1.2.1)
237 ///
238 /// ✅ **推荐**: 在创建钩子时直接持有 `Arc` 引用
239 ///
240 /// ```rust
241 /// use piper_driver::recording::AsyncRecordingHook;
242 /// use std::sync::atomic::Ordering;
243 ///
244 /// let (hook, _rx) = AsyncRecordingHook::new();
245 /// let dropped_counter = hook.dropped_frames().clone(); // 在此持有
246 ///
247 /// // 直接读取,无需从 Context downcast
248 /// let count = dropped_counter.load(Ordering::Relaxed);
249 /// ```
250 ///
251 /// ❌ **不推荐**: 试图从 `Context` 中 `downcast`(需要 Trait 继承 `Any`)
252 ///
253 /// # 返回
254 ///
255 /// `Arc<AtomicU64>`: 丢帧计数器的引用
256 #[must_use]
257 pub fn dropped_frames(&self) -> &Arc<AtomicU64> {
258 &self.dropped_frames
259 }
260
261 /// 获取当前丢帧数量
262 ///
263 /// # 返回
264 ///
265 /// 当前丢失的帧数
266 #[must_use]
267 pub fn dropped_count(&self) -> u64 {
268 self.dropped_frames.load(Ordering::Relaxed)
269 }
270
271 /// 获取帧计数器(新增:v1.3.0)
272 ///
273 /// # 使用建议
274 ///
275 /// ✅ **推荐**: 在创建钩子时直接持有 `Arc` 引用
276 ///
277 /// ```rust
278 /// use piper_driver::recording::AsyncRecordingHook;
279 /// use std::sync::atomic::Ordering;
280 ///
281 /// let (hook, _rx) = AsyncRecordingHook::new();
282 /// let frame_counter = hook.frame_counter().clone(); // 在此持有
283 ///
284 /// // 直接读取,无需从 Context downcast
285 /// let count = frame_counter.load(Ordering::Relaxed);
286 /// ```
287 ///
288 /// # 返回
289 ///
290 /// `Arc<AtomicU64>`: 帧计数器的引用(不可变,只读)
291 #[must_use]
292 pub fn frame_counter(&self) -> &Arc<AtomicU64> {
293 &self.frame_counter
294 }
295
296 /// 获取当前已录制的帧数(新增:v1.3.0)
297 ///
298 /// # 返回
299 ///
300 /// 当前已成功录制的帧数
301 #[must_use]
302 pub fn frame_count(&self) -> u64 {
303 self.frame_counter.load(Ordering::Relaxed)
304 }
305}
306
307impl FrameCallback for AsyncRecordingHook {
308 /// 当接收到 CAN 帧时调用
309 ///
310 /// # 性能要求
311 ///
312 /// - <1μs 开销(非阻塞)
313 /// - 队列满时丢帧,而非阻塞或无限增长
314 ///
315 /// # 时间戳精度(v1.2.1)
316 ///
317 /// ⏱️ **必须使用硬件时间戳**:
318 ///
319 /// ```rust
320 /// use piper_driver::recording::TimestampedFrame;
321 /// use piper_protocol::PiperFrame;
322 ///
323 /// let frame = PiperFrame::new_standard(0x251, &[1, 2, 3, 4]);
324 /// let ts_frame = TimestampedFrame::from(&frame);
325 /// assert_eq!(ts_frame.timestamp_us, frame.timestamp_us); // ✅ 硬件时间戳
326 /// ```
327 ///
328 /// ❌ **禁止软件生成时间戳**:
329 ///
330 /// // ❌ 错误:回调执行时间已晚于帧到达时间(仅说明概念)
331 /// // let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_micros() as u64;
332 ///
333 #[inline]
334 #[allow(clippy::collapsible_if)] // 嵌套 if 结构更清晰:先检查 Option,再比较 ID
335 fn on_frame_received(&self, frame: &PiperFrame) {
336 // ⚠️ 关键:这里运行在 CAN 接收线程中,必须极快
337 // ✅ 性能优化:先记录所有帧(包括触发帧),再检查停止条件(v1.4 修正)
338
339 // 1. 先记录帧(无论是否为触发帧)
340 let ts_frame = TimestampedFrame::from(frame);
341 if self.tx.try_send(ts_frame).is_err() {
342 // ⚠️ 缓冲区满时,丢弃"新"帧,保留"旧"帧
343 self.dropped_frames.fetch_add(1, Ordering::Relaxed);
344 } else {
345 self.frame_counter.fetch_add(1, Ordering::Relaxed);
346 }
347
348 // 2. 再检查停止条件(原子操作,极快)
349 if let Some(stop_id) = self.stop_on_id {
350 if frame.id() == stop_id {
351 // ✅ 原子存储,不会阻塞
352 self.stop_requested.store(true, Ordering::SeqCst);
353 // ✅ 注意:不使用 return,因为已经记录了触发帧
354 }
355 }
356 }
357
358 /// 当发送 CAN 帧成功后调用(可选)
359 ///
360 /// # 时机
361 ///
362 /// 仅在 `tx.send()` 成功后调用,确保录制的是实际发送的帧。
363 #[inline]
364 fn on_frame_sent(&self, frame: &PiperFrame) {
365 // ⏱️ 直接透传硬件时间戳
366 let ts_frame = TimestampedFrame::from(frame);
367
368 // 🛡️ 非阻塞发送
369 if self.tx.try_send(ts_frame).is_err() {
370 self.dropped_frames.fetch_add(1, Ordering::Relaxed);
371 }
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378 use std::thread;
379 use std::time::Duration;
380
381 #[test]
382 fn test_async_recording_hook_basic() {
383 let (hook, rx) = AsyncRecordingHook::new();
384 let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
385
386 // 创建测试帧
387 let frame = PiperFrame {
388 id: 0x2A5,
389 data: [0, 1, 2, 3, 4, 5, 6, 7],
390 len: 8,
391 is_extended: false,
392 timestamp_us: 12345,
393 };
394
395 // 触发回调
396 callback.on_frame_received(&frame);
397
398 // 验证接收到帧
399 let received = rx.recv_timeout(Duration::from_millis(100)).unwrap();
400 assert_eq!(received.timestamp_us, 12345);
401 assert_eq!(received.id, 0x2A5);
402 assert_eq!(received.data, vec![0, 1, 2, 3, 4, 5, 6, 7]);
403 }
404
405 #[test]
406 fn test_async_recording_hook_dropped_frames() {
407 let (hook, rx) = AsyncRecordingHook::new();
408 let dropped_counter = hook.dropped_frames().clone();
409 let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
410
411 // 创建测试帧
412 let frame = PiperFrame {
413 id: 0x2A5,
414 data: [0, 1, 2, 3, 4, 5, 6, 7],
415 len: 8,
416 is_extended: false,
417 timestamp_us: 12345,
418 };
419
420 // 正常情况:无丢帧
421 callback.on_frame_received(&frame);
422 assert_eq!(dropped_counter.load(Ordering::Relaxed), 0);
423
424 // 清空接收端,模拟队列满的情况
425 drop(rx);
426
427 // 现在发送会失败(队列已关闭)
428 for _ in 0..10 {
429 callback.on_frame_received(&frame);
430 }
431
432 // 应该记录了 10 个丢帧
433 assert_eq!(dropped_counter.load(Ordering::Relaxed), 10);
434 }
435
436 #[test]
437 fn test_async_recording_hook_tx_callback() {
438 let (hook, rx) = AsyncRecordingHook::new();
439 let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
440
441 // 创建测试帧
442 let frame = PiperFrame {
443 id: 0x1A1,
444 data: [1, 2, 3, 4, 5, 6, 7, 8],
445 len: 8,
446 is_extended: false,
447 timestamp_us: 54321,
448 };
449
450 // 触发 TX 回调
451 callback.on_frame_sent(&frame);
452
453 // 验证接收到帧
454 let received = rx.recv_timeout(Duration::from_millis(100)).unwrap();
455 assert_eq!(received.timestamp_us, 54321);
456 assert_eq!(received.id, 0x1A1);
457 }
458
459 #[test]
460 fn test_timestamped_frame_from_piper_frame() {
461 let frame = PiperFrame {
462 id: 0x2A5,
463 data: [0, 1, 2, 3, 4, 5, 6, 7],
464 len: 8,
465 is_extended: false,
466 timestamp_us: 99999,
467 };
468
469 let ts_frame = TimestampedFrame::from(&frame);
470
471 assert_eq!(ts_frame.timestamp_us, 99999);
472 assert_eq!(ts_frame.id, 0x2A5);
473 assert_eq!(ts_frame.data, vec![0, 1, 2, 3, 4, 5, 6, 7]);
474 }
475
476 #[test]
477 fn test_async_recording_hook_concurrent() {
478 let (hook, rx) = AsyncRecordingHook::new();
479 let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
480
481 // 创建多个线程并发触发回调
482 let handles: Vec<_> = (0..10)
483 .map(|i| {
484 let cb = callback.clone();
485 thread::spawn(move || {
486 let frame = PiperFrame {
487 id: 0x2A5,
488 data: [i as u8; 8],
489 len: 8,
490 is_extended: false,
491 timestamp_us: i as u64,
492 };
493 cb.on_frame_received(&frame);
494 })
495 })
496 .collect();
497
498 // 等待所有线程完成
499 for handle in handles {
500 handle.join().unwrap();
501 }
502
503 // 验证接收到所有帧(顺序可能不同)
504 let mut count = 0;
505 while rx.try_recv().is_ok() {
506 count += 1;
507 }
508 assert_eq!(count, 10);
509 }
510}