piper_driver/piper.rs
1//! Robot API 模块
2//!
3//! 提供对外的 `Piper` 结构体,封装底层 IO 线程和状态同步细节。
4
5use crate::command::{CommandPriority, PiperCommand, RealtimeCommand};
6use crate::error::DriverError;
7use crate::fps_stats::{FpsCounts, FpsResult};
8use crate::metrics::{MetricsSnapshot, PiperMetrics};
9use crate::pipeline::*;
10use crate::state::*;
11use crossbeam_channel::Sender;
12use piper_can::{CanAdapter, CanError, PiperFrame, SplittableAdapter};
13use std::mem::ManuallyDrop;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::thread::{JoinHandle, spawn};
17use std::time::Duration;
18use tracing::{error, info, warn};
19
20/// Extension trait for timeout-capable thread joins
21trait JoinTimeout {
22 fn join_timeout(self, timeout: Duration) -> std::thread::Result<()>;
23}
24
25impl<T: std::marker::Send + 'static> JoinTimeout for JoinHandle<T> {
26 fn join_timeout(self, timeout: Duration) -> std::thread::Result<()> {
27 use std::sync::mpsc;
28
29 // Create a channel for signaling completion
30 let (tx, rx) = mpsc::channel();
31
32 // Spawn a watchdog thread that joins the target thread
33 spawn(move || {
34 let result = self.join();
35 // Send result (ignore send errors - receiver may have timed out)
36 let _ = tx.send(result);
37 });
38
39 // Block with timeout - no busy waiting!
40 match rx.recv_timeout(timeout) {
41 Ok(join_result) => join_result.map(|_| ()), // Thread finished
42 Err(mpsc::RecvTimeoutError::Timeout) => {
43 // Timeout: watchdog thread continues running
44 // This is acceptable - OS will clean up on process exit
45 Err(std::boxed::Box::new(std::io::Error::new(
46 std::io::ErrorKind::TimedOut,
47 "Thread join timeout",
48 )))
49 },
50 Err(mpsc::RecvTimeoutError::Disconnected) => {
51 // Channel disconnected unexpectedly - thread panicked
52 Err(std::boxed::Box::new(std::io::Error::new(
53 std::io::ErrorKind::ConnectionReset,
54 "Thread panicked during join",
55 )))
56 },
57 }
58 }
59}
60
61/// Piper 机械臂驱动(对外 API)
62///
63/// 支持单线程和双线程两种模式
64/// - 单线程模式:使用 `io_thread`(向后兼容)
65/// - 双线程模式:使用 `rx_thread` 和 `tx_thread`(物理隔离)
66pub struct Piper {
67 /// 命令发送通道(向 IO 线程发送控制帧,单线程模式)
68 ///
69 /// 需要在 Drop 时 **提前关闭通道**(在 join IO 线程之前),
70 /// 否则 `io_loop` 可能永远收不到 `Disconnected` 而导致退出卡住。
71 cmd_tx: ManuallyDrop<Sender<PiperFrame>>,
72 /// 实时命令插槽(双线程模式,邮箱模式,Overwrite)
73 realtime_slot: Option<Arc<std::sync::Mutex<Option<RealtimeCommand>>>>,
74 /// 可靠命令队列发送端(双线程模式,容量 10,FIFO)
75 reliable_tx: Option<Sender<PiperFrame>>,
76 /// 共享状态上下文
77 ctx: Arc<PiperContext>,
78 /// IO 线程句柄(单线程模式,Drop 时 join)
79 io_thread: Option<JoinHandle<()>>,
80 /// RX 线程句柄(双线程模式)
81 rx_thread: Option<JoinHandle<()>>,
82 /// TX 线程句柄(双线程模式)
83 tx_thread: Option<JoinHandle<()>>,
84 /// 运行标志(用于线程生命周期联动)
85 is_running: Arc<AtomicBool>,
86 /// 性能指标(原子计数器)
87 metrics: Arc<PiperMetrics>,
88 /// CAN 接口名称(用于录制元数据)
89 interface: String,
90 /// CAN 总线速度(bps)(用于录制元数据)
91 bus_speed: u32,
92 /// Driver 工作模式(用于回放模式控制)
93 driver_mode: crate::mode::AtomicDriverMode,
94}
95
96impl Piper {
97 /// 最大允许的实时帧包大小
98 ///
99 /// 允许调用者在客户端进行预检查,避免跨层调用后的运行时错误。
100 ///
101 /// # 示例
102 ///
103 /// ```rust,no_run
104 /// # use piper_driver::Piper;
105 /// # use piper_can::PiperFrame;
106 /// # fn example(piper: &Piper) -> std::result::Result<(), Box<dyn std::error::Error>> {
107 /// let frame1 = PiperFrame::new_standard(0x100, &[]);
108 /// let frame2 = PiperFrame::new_standard(0x101, &[]);
109 /// let frame3 = PiperFrame::new_standard(0x102, &[]);
110 /// let frames = [frame1, frame2, frame3];
111 /// if frames.len() > Piper::MAX_REALTIME_PACKAGE_SIZE {
112 /// return Err("Package too large".into());
113 /// }
114 /// piper.send_realtime_package(frames)?;
115 /// # Ok(())
116 /// # }
117 /// ```
118 pub const MAX_REALTIME_PACKAGE_SIZE: usize = 10;
119
120 /// 设置元数据(内部方法,由 Builder 调用)
121 pub(crate) fn with_metadata(mut self, interface: String, bus_speed: u32) -> Self {
122 self.interface = interface;
123 self.bus_speed = bus_speed;
124 self
125 }
126
127 /// 创建新的 Piper 实例
128 ///
129 /// # 参数
130 /// - `can`: CAN 适配器(会被移动到 IO 线程)
131 /// - `config`: Pipeline 配置(可选)
132 ///
133 /// # 错误
134 /// - `CanError`: CAN 设备初始化失败(注意:这里返回 CanError,因为 DriverError 尚未完全实现 `From<CanError>`)
135 pub fn new(
136 can: impl CanAdapter + Send + 'static,
137 config: Option<PipelineConfig>,
138 ) -> Result<Self, CanError> {
139 // 创建命令通道(有界队列,容量 10)
140 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(10);
141
142 // 创建共享状态上下文
143 let ctx = Arc::new(PiperContext::new());
144
145 // 克隆上下文用于 IO 线程
146 let ctx_clone = ctx.clone();
147
148 // 启动 IO 线程
149 let io_thread = spawn(move || {
150 io_loop(can, cmd_rx, ctx_clone, config.unwrap_or_default());
151 });
152
153 Ok(Self {
154 cmd_tx: ManuallyDrop::new(cmd_tx),
155 realtime_slot: None, // 单线程模式
156 reliable_tx: None, // 单线程模式
157 ctx,
158 io_thread: Some(io_thread),
159 rx_thread: None, // 单线程模式
160 tx_thread: None, // 单线程模式
161 is_running: Arc::new(AtomicBool::new(true)), // 默认运行中
162 metrics: Arc::new(PiperMetrics::new()), // 初始化指标
163 interface: "unknown".to_string(), // 未通过 builder 构建
164 bus_speed: 1_000_000, // 默认 1Mbps
165 driver_mode: crate::mode::AtomicDriverMode::new(crate::mode::DriverMode::Normal),
166 })
167 }
168
169 /// 创建双线程模式的 Piper 实例
170 ///
171 /// 将 CAN 适配器分离为独立的 RX 和 TX 适配器,实现物理隔离。
172 /// RX 线程专门负责接收反馈帧,TX 线程专门负责发送控制命令。
173 ///
174 /// # 参数
175 /// - `can`: 可分离的 CAN 适配器(必须已启动)
176 /// - `config`: Pipeline 配置(可选)
177 ///
178 /// # 错误
179 /// - `CanError::NotStarted`: 适配器未启动
180 /// - `CanError::Device`: 分离适配器失败
181 ///
182 /// # 使用场景
183 /// - 实时控制:需要 RX 不受 TX 阻塞影响
184 /// - 高频控制:500Hz-1kHz 控制循环
185 ///
186 /// # 注意
187 /// - 适配器必须已启动(调用 `configure()` 或 `start()`)
188 /// - 分离后,原适配器不再可用(消费 `can`)
189 pub fn new_dual_thread<C>(can: C, config: Option<PipelineConfig>) -> Result<Self, CanError>
190 where
191 C: SplittableAdapter + Send + 'static,
192 C::RxAdapter: Send + 'static,
193 C::TxAdapter: Send + 'static,
194 {
195 // 分离适配器
196 let (rx_adapter, tx_adapter) = can.split()?;
197
198 // 创建命令通道(邮箱模式 + 可靠队列容量 10)
199 let realtime_slot = Arc::new(std::sync::Mutex::new(None::<RealtimeCommand>));
200 let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
201
202 // 创建共享状态上下文
203 let ctx = Arc::new(PiperContext::new());
204
205 // 创建运行标志和指标
206 let is_running = Arc::new(AtomicBool::new(true));
207 let metrics = Arc::new(PiperMetrics::new());
208
209 // 克隆用于线程
210 let ctx_clone = ctx.clone();
211 let is_running_clone = is_running.clone();
212 let metrics_clone = metrics.clone();
213 let config_clone = config.clone().unwrap_or_default();
214
215 // 启动 RX 线程
216 let rx_thread = spawn(move || {
217 crate::pipeline::rx_loop(
218 rx_adapter,
219 ctx_clone,
220 config_clone,
221 is_running_clone,
222 metrics_clone,
223 );
224 });
225
226 // 克隆用于 TX 线程
227 let ctx_tx = ctx.clone();
228 let is_running_tx = is_running.clone();
229 let metrics_tx = metrics.clone();
230 let realtime_slot_tx = realtime_slot.clone();
231
232 // 启动 TX 线程(邮箱模式)
233 let tx_thread = spawn(move || {
234 crate::pipeline::tx_loop_mailbox(
235 tx_adapter,
236 realtime_slot_tx,
237 reliable_rx,
238 is_running_tx,
239 metrics_tx,
240 ctx_tx, // 🆕 v1.2.1: 传入 ctx 用于触发 TX 回调
241 );
242 });
243
244 // 给 RX 线程一些启动时间,确保它已经开始接收数据
245 // 这对于 wait_for_feedback 很重要,因为如果 RX 线程还没启动,就无法收到反馈
246 std::thread::sleep(std::time::Duration::from_millis(10));
247
248 Ok(Self {
249 cmd_tx: ManuallyDrop::new(reliable_tx.clone()), // 向后兼容:单线程模式使用
250 realtime_slot: Some(realtime_slot), // 实时命令邮箱
251 reliable_tx: Some(reliable_tx), // 可靠队列
252 ctx,
253 io_thread: None, // 双线程模式不使用 io_thread
254 rx_thread: Some(rx_thread),
255 tx_thread: Some(tx_thread),
256 is_running,
257 metrics,
258 interface: "unknown".to_string(), // 未通过 builder 构建
259 bus_speed: 1_000_000, // 默认 1Mbps
260 driver_mode: crate::mode::AtomicDriverMode::new(crate::mode::DriverMode::Normal),
261 })
262 }
263
264 /// 检查线程健康状态
265 ///
266 /// 返回 RX 和 TX 线程的存活状态。
267 ///
268 /// # 返回
269 /// - `(rx_alive, tx_alive)`: 两个布尔值,表示线程是否还在运行
270 pub fn check_health(&self) -> (bool, bool) {
271 let rx_alive = self.rx_thread.as_ref().map(|h| !h.is_finished()).unwrap_or(true); // 单线程模式下,认为健康
272
273 let tx_alive = self.tx_thread.as_ref().map(|h| !h.is_finished()).unwrap_or(true); // 单线程模式下,认为健康
274
275 (rx_alive, tx_alive)
276 }
277
278 /// 检查是否健康
279 ///
280 /// 如果所有线程都存活,返回 `true`。
281 pub fn is_healthy(&self) -> bool {
282 let (rx_alive, tx_alive) = self.check_health();
283 rx_alive && tx_alive
284 }
285
286 /// 获取性能指标快照
287 ///
288 /// 返回当前所有计数器的快照,用于监控 IO 链路健康状态。
289 pub fn get_metrics(&self) -> MetricsSnapshot {
290 self.metrics.snapshot()
291 }
292
293 /// 获取关节动态状态(无锁,纳秒级返回)
294 ///
295 /// 包含关节速度和电流(独立帧 + Buffered Commit)。
296 ///
297 /// # 性能
298 /// - 无锁读取(ArcSwap::load)
299 /// - 返回快照副本(Clone 开销低,< 150 字节)
300 /// - 适合 500Hz 控制循环
301 pub fn get_joint_dynamic(&self) -> JointDynamicState {
302 self.ctx.joint_dynamic.load().as_ref().clone()
303 }
304
305 /// 获取关节位置状态(无锁,纳秒级返回)
306 ///
307 /// 包含6个关节的位置信息(500Hz更新)。
308 ///
309 /// # 性能
310 /// - 无锁读取(ArcSwap::load)
311 /// - 返回快照副本(Clone 开销低)
312 /// - 适合 500Hz 控制循环
313 ///
314 /// # 注意
315 /// - 此状态与 `EndPoseState` 不是原子更新的,如需同时获取,请使用 `capture_motion_snapshot()`
316 pub fn get_joint_position(&self) -> JointPositionState {
317 self.ctx.joint_position.load().as_ref().clone()
318 }
319
320 /// 获取末端位姿状态(无锁,纳秒级返回)
321 ///
322 /// 包含末端执行器的位置和姿态信息(500Hz更新)。
323 ///
324 /// # 性能
325 /// - 无锁读取(ArcSwap::load)
326 /// - 返回快照副本(Clone 开销低)
327 /// - 适合 500Hz 控制循环
328 ///
329 /// # 注意
330 /// - 此状态与 `JointPositionState` 不是原子更新的,如需同时获取,请使用 `capture_motion_snapshot()`
331 pub fn get_end_pose(&self) -> EndPoseState {
332 self.ctx.end_pose.load().as_ref().clone()
333 }
334
335 /// 获取运动快照(无锁,纳秒级返回)
336 ///
337 /// 原子性地获取 `JointPositionState` 和 `EndPoseState` 的最新快照。
338 /// 虽然这两个状态在硬件上不是同时更新的,但此方法保证逻辑上的原子性。
339 ///
340 /// # 性能
341 /// - 无锁读取(两次 ArcSwap::load)
342 /// - 返回快照副本
343 /// - 适合需要同时使用关节位置和末端位姿的场景
344 ///
345 /// # 示例
346 ///
347 /// ```
348 /// # use piper_driver::Piper;
349 /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
350 /// # // let piper = Piper::new(/* ... */).unwrap();
351 /// # // let snapshot = piper.capture_motion_snapshot();
352 /// # // println!("Joint positions: {:?}", snapshot.joint_position.joint_pos);
353 /// # // println!("End pose: {:?}", snapshot.end_pose.end_pose);
354 /// ```
355 pub fn capture_motion_snapshot(&self) -> MotionSnapshot {
356 self.ctx.capture_motion_snapshot()
357 }
358
359 /// 获取机器人控制状态(无锁)
360 ///
361 /// 包含控制模式、机器人状态、故障码等(100Hz更新)。
362 ///
363 /// # 性能
364 /// - 无锁读取(ArcSwap::load)
365 /// - 返回快照副本
366 pub fn get_robot_control(&self) -> RobotControlState {
367 self.ctx.robot_control.load().as_ref().clone()
368 }
369
370 /// 获取夹爪状态(无锁)
371 ///
372 /// 包含夹爪行程、扭矩、状态码等(100Hz更新)。
373 ///
374 /// # 性能
375 /// - 无锁读取(ArcSwap::load)
376 /// - 返回快照副本
377 pub fn get_gripper(&self) -> GripperState {
378 self.ctx.gripper.load().as_ref().clone()
379 }
380
381 /// 获取关节驱动器低速反馈状态(无锁)
382 ///
383 /// 包含温度、电压、电流、驱动器状态等(40Hz更新)。
384 ///
385 /// # 性能
386 /// - 无锁读取(ArcSwap::load,Wait-Free)
387 /// - 返回快照副本
388 pub fn get_joint_driver_low_speed(&self) -> JointDriverLowSpeedState {
389 self.ctx.joint_driver_low_speed.load().as_ref().clone()
390 }
391
392 /// 获取固件版本字符串
393 ///
394 /// 从累积的固件数据中解析版本字符串。
395 /// 如果固件数据未完整或未找到版本字符串,返回 `None`。
396 ///
397 /// # 性能
398 /// - 需要获取 RwLock 读锁
399 /// - 如果已解析,直接返回缓存的版本字符串
400 /// - 如果未解析,尝试从累积数据中解析
401 pub fn get_firmware_version(&self) -> Option<String> {
402 if let Ok(mut firmware_state) = self.ctx.firmware_version.write() {
403 // 如果已经解析过,直接返回
404 if let Some(version) = firmware_state.version_string() {
405 return Some(version.clone());
406 }
407 // 否则尝试解析
408 firmware_state.parse_version()
409 } else {
410 None
411 }
412 }
413
414 /// 查询固件版本
415 ///
416 /// 发送固件版本查询指令到机械臂,并清空之前的固件数据缓存。
417 /// 查询和反馈使用相同的 CAN ID (0x4AF)。
418 ///
419 /// **注意**:
420 /// - 发送查询命令后会自动清空固件数据缓存(与 Python SDK 一致)
421 /// - 需要等待一段时间(推荐 30-50ms)让机械臂返回反馈数据
422 /// - 之后可以调用 `get_firmware_version()` 获取解析后的版本字符串
423 ///
424 /// # 错误
425 /// - `DriverError::ChannelFull`: 命令通道已满(单线程模式)
426 /// - `DriverError::ChannelClosed`: 命令通道已关闭
427 /// - `DriverError::NotDualThread`: 双线程模式下使用错误的方法
428 ///
429 /// # 示例
430 ///
431 /// ```no_run
432 /// # use piper_driver::Piper;
433 /// # use piper_protocol::FirmwareVersionQueryCommand;
434 /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
435 /// # // let piper = Piper::new(/* ... */).unwrap();
436 /// # // 发送查询命令
437 /// # // piper.query_firmware_version().unwrap();
438 /// # // 等待反馈数据累积
439 /// # // std::thread::sleep(std::time::Duration::from_millis(50));
440 /// # // 获取版本字符串
441 /// # // if let Some(version) = piper.get_firmware_version() {
442 /// # // println!("Firmware version: {}", version);
443 /// # // }
444 /// ```
445 pub fn query_firmware_version(&self) -> Result<(), DriverError> {
446 use piper_protocol::FirmwareVersionQueryCommand;
447
448 // 创建查询命令
449 let cmd = FirmwareVersionQueryCommand::new();
450 let frame = cmd.to_frame();
451
452 // 发送命令(使用可靠命令模式,确保命令被发送)
453 // 注意:固件版本查询不是高频实时命令,使用可靠命令模式更合适
454 if let Some(reliable_tx) = &self.reliable_tx {
455 // 双线程模式:使用可靠命令队列
456 reliable_tx.try_send(frame).map_err(|e| match e {
457 crossbeam_channel::TrySendError::Full(_) => DriverError::ChannelFull,
458 crossbeam_channel::TrySendError::Disconnected(_) => DriverError::ChannelClosed,
459 })?;
460 } else {
461 // 单线程模式:使用普通命令通道
462 self.send_frame(frame)?;
463 }
464
465 // 清空固件数据缓存
466 if let Ok(mut firmware_state) = self.ctx.firmware_version.write() {
467 firmware_state.clear();
468 }
469
470 Ok(())
471 }
472
473 /// 获取主从模式控制模式指令状态(无锁)
474 ///
475 /// 包含控制模式、运动模式、速度等(主从模式下,~200Hz更新)。
476 ///
477 /// # 性能
478 /// - 无锁读取(ArcSwap::load)
479 /// - 返回快照副本
480 pub fn get_master_slave_control_mode(&self) -> MasterSlaveControlModeState {
481 self.ctx.master_slave_control_mode.load().as_ref().clone()
482 }
483
484 /// 获取主从模式关节控制指令状态(无锁)
485 ///
486 /// 包含6个关节的目标角度(主从模式下,~500Hz更新)。
487 ///
488 /// # 性能
489 /// - 无锁读取(ArcSwap::load)
490 /// - 返回快照副本
491 /// - 帧组同步,保证6个关节数据的逻辑一致性
492 pub fn get_master_slave_joint_control(&self) -> MasterSlaveJointControlState {
493 self.ctx.master_slave_joint_control.load().as_ref().clone()
494 }
495
496 /// 获取主从模式夹爪控制指令状态(无锁)
497 ///
498 /// 包含夹爪目标行程、扭矩等(主从模式下,~200Hz更新)。
499 ///
500 /// # 性能
501 /// - 无锁读取(ArcSwap::load)
502 /// - 返回快照副本
503 pub fn get_master_slave_gripper_control(&self) -> MasterSlaveGripperControlState {
504 self.ctx.master_slave_gripper_control.load().as_ref().clone()
505 }
506
507 /// 获取碰撞保护状态(读锁)
508 ///
509 /// 包含各关节的碰撞保护等级(按需查询)。
510 ///
511 /// # 性能
512 /// - 读锁(RwLock::read)
513 /// - 返回快照副本
514 pub fn get_collision_protection(&self) -> Result<CollisionProtectionState, DriverError> {
515 self.ctx
516 .collision_protection
517 .read()
518 .map(|guard| guard.clone())
519 .map_err(|_| DriverError::PoisonedLock)
520 }
521
522 /// 获取关节限制配置状态(读锁)
523 ///
524 /// 包含关节角度限制和速度限制(按需查询)。
525 ///
526 /// # 性能
527 /// - 读锁(RwLock::read)
528 /// - 返回快照副本
529 pub fn get_joint_limit_config(&self) -> Result<JointLimitConfigState, DriverError> {
530 self.ctx
531 .joint_limit_config
532 .read()
533 .map(|guard| guard.clone())
534 .map_err(|_| DriverError::PoisonedLock)
535 }
536
537 /// 获取关节加速度限制配置状态(读锁)
538 ///
539 /// 包含关节加速度限制(按需查询)。
540 ///
541 /// # 性能
542 /// - 读锁(RwLock::read)
543 /// - 返回快照副本
544 pub fn get_joint_accel_config(&self) -> Result<JointAccelConfigState, DriverError> {
545 self.ctx
546 .joint_accel_config
547 .read()
548 .map(|guard| guard.clone())
549 .map_err(|_| DriverError::PoisonedLock)
550 }
551
552 /// 获取末端限制配置状态(读锁)
553 ///
554 /// 包含末端执行器的速度和加速度限制(按需查询)。
555 ///
556 /// # 性能
557 /// - 读锁(RwLock::read)
558 /// - 返回快照副本
559 pub fn get_end_limit_config(&self) -> Result<EndLimitConfigState, DriverError> {
560 self.ctx
561 .end_limit_config
562 .read()
563 .map(|guard| guard.clone())
564 .map_err(|_| DriverError::PoisonedLock)
565 }
566
567 /// 获取组合运动状态(所有热数据)
568 ///
569 /// 注意:不同子状态的时间戳可能不同步(差异通常在毫秒级)。
570 /// 如果需要时间对齐的状态,请使用 `get_aligned_motion()`。
571 pub fn get_motion_state(&self) -> CombinedMotionState {
572 let snapshot = self.capture_motion_snapshot();
573 CombinedMotionState {
574 joint_position: snapshot.joint_position,
575 end_pose: snapshot.end_pose,
576 joint_dynamic: self.get_joint_dynamic(),
577 }
578 }
579
580 /// 获取时间对齐的运动状态(推荐用于力控算法)
581 ///
582 /// 以 `joint_position.hardware_timestamp_us` 为基准时间,检查时间戳差异。
583 /// 即使时间戳差异超过阈值,也返回状态数据(让用户有选择权)。
584 ///
585 /// # 参数
586 /// - `max_time_diff_us`: 允许的最大时间戳差异(微秒),推荐值:5000(5ms)
587 ///
588 /// # 返回值
589 /// - `AlignmentResult::Ok(state)`: 时间戳差异在可接受范围内
590 /// - `AlignmentResult::Misaligned { state, diff_us }`: 时间戳差异过大,但仍返回状态数据
591 pub fn get_aligned_motion(&self, max_time_diff_us: u64) -> AlignmentResult {
592 let snapshot = self.capture_motion_snapshot();
593 let joint_dynamic = self.get_joint_dynamic();
594
595 let time_diff = snapshot
596 .joint_position
597 .hardware_timestamp_us
598 .abs_diff(joint_dynamic.group_timestamp_us);
599
600 let state = AlignedMotionState {
601 joint_pos: snapshot.joint_position.joint_pos,
602 joint_vel: joint_dynamic.joint_vel,
603 joint_current: joint_dynamic.joint_current,
604 end_pose: snapshot.end_pose.end_pose,
605 timestamp: snapshot.joint_position.hardware_timestamp_us, // 使用位置数据的时间戳作为基准
606 time_diff_us: (joint_dynamic.group_timestamp_us as i64)
607 - (snapshot.joint_position.hardware_timestamp_us as i64),
608 };
609
610 if time_diff > max_time_diff_us {
611 AlignmentResult::Misaligned {
612 state,
613 diff_us: time_diff,
614 }
615 } else {
616 AlignmentResult::Ok(state)
617 }
618 }
619
620 /// 等待接收到第一个有效反馈(用于初始化)
621 ///
622 /// 在 `Piper::new()` 后调用,确保在控制循环开始前已收到有效数据。
623 /// 避免使用全零的初始状态导致错误的控制指令。
624 ///
625 /// # 参数
626 /// - `timeout`: 超时时间
627 ///
628 /// # 返回值
629 /// - `Ok(())`: 成功接收到有效反馈(`timestamp_us > 0`)
630 /// - `Err(DriverError::Timeout)`: 超时未收到反馈
631 pub fn wait_for_feedback(&self, timeout: std::time::Duration) -> Result<(), DriverError> {
632 let start = std::time::Instant::now();
633
634 loop {
635 // 检查是否超时
636 if start.elapsed() >= timeout {
637 return Err(DriverError::Timeout);
638 }
639
640 // 检查是否收到有效反馈(任意状态的时间戳 > 0 即可)
641 let joint_pos = self.get_joint_position();
642 if joint_pos.hardware_timestamp_us > 0 {
643 return Ok(());
644 }
645
646 // 短暂休眠,避免 CPU 空转
647 std::thread::sleep(std::time::Duration::from_millis(1));
648 }
649 }
650
651 /// 获取 FPS 统计结果
652 ///
653 /// 返回最近一次统计窗口内的更新频率(FPS)。
654 /// 建议定期调用(如每秒一次)或按需调用。
655 ///
656 /// # 性能
657 /// - 无锁读取(仅原子读取)
658 /// - 开销:~100ns(5 次原子读取 + 浮点计算)
659 ///
660 /// # Example
661 ///
662 /// ```
663 /// # use piper_driver::Piper;
664 /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
665 /// # // let piper = Piper::new(/* ... */).unwrap();
666 /// # // 运行一段时间后查询 FPS
667 /// # // std::thread::sleep(std::time::Duration::from_secs(5));
668 /// # // let fps = piper.get_fps();
669 /// # // println!("Joint Position FPS: {:.2}", fps.joint_position);
670 /// # // println!("End Pose FPS: {:.2}", fps.end_pose);
671 /// # // println!("Joint Dynamic FPS: {:.2}", fps.joint_dynamic);
672 /// ```
673 pub fn get_fps(&self) -> FpsResult {
674 self.ctx.fps_stats.load().calculate_fps()
675 }
676
677 /// 获取 FPS 计数器原始值
678 ///
679 /// 返回当前计数器的原始值,可以配合自定义时间窗口计算 FPS。
680 ///
681 /// # 性能
682 /// - 无锁读取(仅原子读取)
683 /// - 开销:~50ns(5 次原子读取)
684 ///
685 /// # Example
686 ///
687 /// ```
688 /// # use piper_driver::Piper;
689 /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
690 /// # // let piper = Piper::new(/* ... */).unwrap();
691 /// # // 记录开始时间和计数
692 /// # // let start = std::time::Instant::now();
693 /// # // let counts_start = piper.get_fps_counts();
694 /// # // 运行一段时间
695 /// # // std::thread::sleep(std::time::Duration::from_secs(1));
696 /// # // 计算实际 FPS
697 /// # // let counts_end = piper.get_fps_counts();
698 /// # // let elapsed = start.elapsed();
699 /// # // let actual_fps = (counts_end.joint_position - counts_start.joint_position) as f64 / elapsed.as_secs_f64();
700 /// ```
701 pub fn get_fps_counts(&self) -> FpsCounts {
702 self.ctx.fps_stats.load().get_counts()
703 }
704
705 /// 重置 FPS 统计窗口(清空计数器并重新开始计时)
706 ///
707 /// 这是一个轻量级、无锁的重置:通过 `ArcSwap` 将内部 `FpsStatistics` 原子替换为新实例。
708 /// 适合在监控工具中做固定窗口统计(例如每 5 秒 reset 一次)。
709 pub fn reset_fps_stats(&self) {
710 self.ctx.fps_stats.store(Arc::new(crate::fps_stats::FpsStatistics::new()));
711 }
712
713 // ============================================================
714 // 连接监控 API
715 // ============================================================
716
717 /// 检查机器人是否仍在响应
718 ///
719 /// 如果在超时窗口内收到反馈,返回 `true`。
720 /// 这可用于检测机器人是否断电、CAN 线缆断开或固件崩溃。
721 ///
722 /// # 性能
723 /// - 无锁读取(AtomicU64::load)
724 /// - O(1) 时间复杂度
725 pub fn is_connected(&self) -> bool {
726 self.ctx.connection_monitor.check_connection()
727 }
728
729 /// 获取自上次反馈以来的时间
730 ///
731 /// 返回自上次成功处理 CAN 帧以来的时间。
732 /// 可用于连接质量监控或诊断。
733 pub fn connection_age(&self) -> std::time::Duration {
734 self.ctx.connection_monitor.time_since_last_feedback()
735 }
736
737 /// 发送控制帧(非阻塞)
738 ///
739 /// # 参数
740 /// - `frame`: 控制帧(已构建的 `PiperFrame`)
741 ///
742 /// # 错误
743 /// - `DriverError::ChannelClosed`: 命令通道已关闭(IO 线程退出)
744 /// - `DriverError::ChannelFull`: 命令队列已满(缓冲区容量 10)
745 pub fn send_frame(&self, frame: PiperFrame) -> Result<(), DriverError> {
746 self.cmd_tx.try_send(frame).map_err(|e| match e {
747 crossbeam_channel::TrySendError::Full(_) => DriverError::ChannelFull,
748 crossbeam_channel::TrySendError::Disconnected(_) => DriverError::ChannelClosed,
749 })
750 }
751
752 /// 获取钩子管理器的引用(用于高级诊断)
753 ///
754 /// # 设计理念
755 ///
756 /// 这是一个**逃生舱(Escape Hatch)**,用于高级诊断场景:
757 /// - 注册自定义 CAN 帧回调
758 /// - 实现录制功能
759 /// - 性能分析和调试
760 ///
761 /// # 使用场景
762 ///
763 /// - 自定义诊断工具
764 /// - 高级抓包和调试
765 /// - 性能分析和优化
766 /// - 后台监控线程
767 ///
768 /// # 示例
769 ///
770 /// ```rust,no_run
771 /// # use piper_driver::Piper;
772 /// # use piper_driver::hooks::FrameCallback;
773 /// # use piper_driver::recording::AsyncRecordingHook;
774 /// # use std::sync::Arc;
775 /// # fn example(robot: &Piper) {
776 /// // 获取 hooks 访问
777 /// let hooks = robot.hooks();
778 ///
779 /// // 创建录制钩子
780 /// let (hook, _rx) = AsyncRecordingHook::new();
781 /// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
782 ///
783 /// // 注册回调(忽略错误以简化示例)
784 /// if let Ok(mut hooks_guard) = hooks.write() {
785 /// hooks_guard.add_callback(callback);
786 /// }
787 /// # }
788 /// ```
789 ///
790 /// # 安全注意事项
791 ///
792 /// - **性能要求**:回调必须在 <1μs 内完成
793 /// - **线程安全**:返回 `Arc<RwLock<HookManager>>`,需手动加锁
794 /// - **不要阻塞**:禁止在回调中使用 Mutex、I/O、分配等阻塞操作
795 ///
796 /// # 返回值
797 ///
798 /// `Arc<RwLock<HookManager>>`: 钩子管理器的共享引用
799 ///
800 /// # 参考
801 ///
802 /// - [`HookManager`](crate::hooks::HookManager) - 钩子管理器
803 /// - [`FrameCallback`](crate::hooks::FrameCallback) - 回调 trait
804 /// - [架构分析报告](../../../docs/architecture/piper-driver-client-mixing-analysis.md) - 方案 B 设计
805 pub fn hooks(&self) -> Arc<std::sync::RwLock<crate::hooks::HookManager>> {
806 Arc::clone(&self.ctx.hooks)
807 }
808
809 /// 获取 CAN 接口名称
810 ///
811 /// # 返回值
812 ///
813 /// CAN 接口名称,例如 "can0", "vcan0" 等
814 pub fn interface(&self) -> String {
815 self.interface.clone()
816 }
817
818 /// 获取 CAN 总线速度
819 ///
820 /// # 返回值
821 ///
822 /// CAN 总线速度(bps),例如 1000000 (1Mbps)
823 pub fn bus_speed(&self) -> u32 {
824 self.bus_speed
825 }
826
827 /// 获取当前 Driver 模式
828 ///
829 /// # 返回值
830 ///
831 /// 当前 Driver 模式(Normal 或 Replay)
832 ///
833 /// # 示例
834 ///
835 /// ```rust,no_run
836 /// # use piper_driver::Piper;
837 /// # fn example(robot: &Piper) {
838 /// let mode = robot.mode();
839 /// println!("Current mode: {:?}", mode);
840 /// # }
841 /// ```
842 pub fn mode(&self) -> crate::mode::DriverMode {
843 self.driver_mode.get(std::sync::atomic::Ordering::Relaxed)
844 }
845
846 /// 设置 Driver 模式
847 ///
848 /// # 参数
849 ///
850 /// - `mode`: 新的 Driver 模式
851 ///
852 /// # 模式说明
853 ///
854 /// - **Normal**: 正常模式,TX 线程按周期发送控制指令
855 /// - **Replay**: 回放模式,TX 线程暂停周期性发送
856 ///
857 /// # 使用场景
858 ///
859 /// Replay 模式用于安全地回放预先录制的 CAN 帧:
860 /// - 暂停 TX 线程的周期性发送
861 /// - 避免双控制流冲突
862 /// - 允许精确控制帧发送时机
863 ///
864 /// # ⚠️ 安全警告
865 ///
866 /// - 切换到 Replay 模式前,应确保机器人处于 Standby 状态
867 /// - 在 Replay 模式下发送控制指令时,应遵守安全速度限制
868 ///
869 /// # 示例
870 ///
871 /// ```rust,no_run
872 /// # use piper_driver::{Piper, mode::DriverMode};
873 /// # fn example(robot: &Piper) {
874 /// // 切换到回放模式
875 /// robot.set_mode(DriverMode::Replay);
876 ///
877 /// // ... 执行回放 ...
878 ///
879 /// // 恢复正常模式
880 /// robot.set_mode(DriverMode::Normal);
881 /// # }
882 /// ```
883 pub fn set_mode(&self, mode: crate::mode::DriverMode) {
884 self.driver_mode.set(mode, std::sync::atomic::Ordering::Relaxed);
885 tracing::info!("Driver mode set to: {:?}", mode);
886 }
887
888 /// 发送控制帧(阻塞,带超时)
889 ///
890 /// 如果命令通道已满,阻塞等待直到有空闲位置或超时。
891 ///
892 /// # 参数
893 /// - `frame`: 控制帧(已构建的 `PiperFrame`)
894 /// - `timeout`: 超时时间
895 ///
896 /// # 错误
897 /// - `DriverError::ChannelClosed`: 命令通道已关闭(IO 线程退出)
898 /// - `DriverError::Timeout`: 超时未发送成功
899 pub fn send_frame_blocking(
900 &self,
901 frame: PiperFrame,
902 timeout: std::time::Duration,
903 ) -> Result<(), DriverError> {
904 self.cmd_tx.send_timeout(frame, timeout).map_err(|e| match e {
905 crossbeam_channel::SendTimeoutError::Timeout(_) => DriverError::Timeout,
906 crossbeam_channel::SendTimeoutError::Disconnected(_) => DriverError::ChannelClosed,
907 })
908 }
909
910 /// 发送实时控制命令(邮箱模式,覆盖策略)
911 ///
912 /// 实时命令使用邮箱模式(Mailbox),直接覆盖旧命令,确保最新命令被发送。
913 /// 这对于力控/高频控制场景很重要,只保留最新的控制指令。
914 ///
915 /// # 参数
916 /// - `frame`: 控制帧(已构建的 `PiperFrame`)
917 ///
918 /// # 错误
919 /// - `DriverError::NotDualThread`: 未使用双线程模式
920 /// - `DriverError::PoisonedLock`: 锁中毒(极少见,通常意味着 TX 线程 panic)
921 ///
922 /// # 实现细节
923 /// - 获取 Mutex 锁并直接覆盖插槽内容(Last Write Wins)
924 /// - 锁持有时间极短(< 50ns),仅为内存拷贝
925 /// - 永不阻塞:无论 TX 线程是否消费,都能立即写入
926 /// - 如果插槽已有数据,会被覆盖(更新 `metrics.tx_realtime_overwrites`)
927 ///
928 /// # 性能
929 /// - 典型延迟:20-50ns(无竞争情况下)
930 /// - 最坏延迟:200ns(与 TX 线程锁竞争时)
931 /// - 相比 Channel 重试策略,延迟降低 10-100 倍
932 ///
933 /// 发送单个实时帧(向后兼容,API 不变)
934 pub fn send_realtime(&self, frame: PiperFrame) -> Result<(), DriverError> {
935 self.send_realtime_command(RealtimeCommand::single(frame))
936 }
937
938 /// 发送实时帧包(新 API)
939 ///
940 /// # 参数
941 /// - `frames`: 要发送的帧迭代器,必须非空
942 ///
943 /// **接口优化**:接受 `impl IntoIterator`,允许用户传入:
944 /// - 数组:`[frame1, frame2, frame3]`(栈上,零堆分配)
945 /// - 切片:`&[frame1, frame2, frame3]`
946 /// - Vec:`vec![frame1, frame2, frame3]`
947 ///
948 /// # 错误
949 /// - `DriverError::NotDualThread`: 未使用双线程模式
950 /// - `DriverError::InvalidInput`: 帧列表为空或过大
951 /// - `DriverError::PoisonedLock`: 锁中毒
952 ///
953 /// # 原子性保证
954 /// Package 内的所有帧要么全部发送成功,要么都不发送。
955 /// 如果发送过程中出现错误,已发送的帧不会被回滚(CAN 总线特性),
956 /// 但未发送的帧不会继续发送。
957 ///
958 /// # 性能特性
959 /// - 如果帧数量 ≤ 4,完全在栈上分配,零堆内存分配
960 /// - 如果帧数量 > 4,SmallVec 会自动溢出到堆,但仍保持高效
961 pub fn send_realtime_package(
962 &self,
963 frames: impl IntoIterator<Item = PiperFrame>,
964 ) -> Result<(), DriverError> {
965 use crate::command::FrameBuffer;
966
967 let buffer: FrameBuffer = frames.into_iter().collect();
968
969 if buffer.is_empty() {
970 return Err(DriverError::InvalidInput(
971 "Frame package cannot be empty".to_string(),
972 ));
973 }
974
975 // 限制包大小,防止内存问题
976 // 使用 Piper 的关联常量,允许客户端预检查
977 //
978 // 注意:如果用户传入超大 Vec(如长度 1000),这里会先进行 collect 操作,
979 // 可能导致堆分配。虽然之后会检查并报错,但内存开销已经发生。
980 // 这是可以接受的权衡(安全网),但建议用户在调用前进行预检查。
981 if buffer.len() > Self::MAX_REALTIME_PACKAGE_SIZE {
982 return Err(DriverError::InvalidInput(format!(
983 "Frame package too large: {} (max: {})",
984 buffer.len(),
985 Self::MAX_REALTIME_PACKAGE_SIZE
986 )));
987 }
988
989 self.send_realtime_command(RealtimeCommand::package(buffer))
990 }
991
992 /// 内部方法:发送实时命令(统一处理单个帧和帧包)
993 fn send_realtime_command(&self, command: RealtimeCommand) -> Result<(), DriverError> {
994 let realtime_slot = self.realtime_slot.as_ref().ok_or(DriverError::NotDualThread)?;
995
996 match realtime_slot.lock() {
997 Ok(mut slot) => {
998 // 检测是否发生覆盖(如果插槽已有数据)
999 let is_overwrite = slot.is_some();
1000
1001 // 计算帧数量(在覆盖前,避免双重计算)
1002 let frame_count = command.len();
1003
1004 // 直接覆盖(邮箱模式:Last Write Wins)
1005 // 注意:如果旧命令是 Package,Drop 操作会释放 SmallVec
1006 // 但如果数据在栈上(len ≤ 4),Drop 只是栈指针移动,几乎零开销
1007 *slot = Some(command);
1008
1009 // 更新指标(在锁外更新,减少锁持有时间)
1010 // 注意:先释放锁,再更新指标,避免在锁内进行原子操作
1011 drop(slot); // 显式释放锁
1012
1013 // 更新指标(在锁外更新,减少锁持有时间)
1014 let total =
1015 self.metrics.tx_frames_total.fetch_add(frame_count as u64, Ordering::Relaxed)
1016 + frame_count as u64;
1017
1018 if is_overwrite {
1019 let overwrites =
1020 self.metrics.tx_realtime_overwrites.fetch_add(1, Ordering::Relaxed) + 1;
1021
1022 // 智能监控:每 1000 次发送检查一次覆盖率
1023 // 避免频繁计算,减少性能开销
1024 if total > 0 && total.is_multiple_of(1000) {
1025 let rate = (overwrites as f64 / total as f64) * 100.0;
1026
1027 // 只在覆盖率超过阈值时警告
1028 if rate > 50.0 {
1029 // 异常情况:覆盖率 > 50%,记录警告
1030 warn!(
1031 "High realtime overwrite rate detected: {:.1}% ({} overwrites / {} total sends). \
1032 This may indicate TX thread bottleneck or excessive send frequency.",
1033 rate, overwrites, total
1034 );
1035 } else if rate > 30.0 {
1036 // 中等情况:覆盖率 30-50%,记录信息(可选,生产环境可关闭)
1037 info!(
1038 "Moderate realtime overwrite rate: {:.1}% ({} overwrites / {} total sends). \
1039 This is normal for high-frequency control (> 500Hz).",
1040 rate, overwrites, total
1041 );
1042 }
1043 // < 30% 不记录日志(正常情况)
1044 }
1045 }
1046
1047 Ok(())
1048 },
1049 Err(_) => {
1050 error!("Realtime slot lock poisoned, TX thread may have panicked");
1051 Err(DriverError::PoisonedLock)
1052 },
1053 }
1054 }
1055
1056 /// 发送可靠命令(FIFO 策略)
1057 ///
1058 /// 可靠命令使用容量为 10 的队列,按 FIFO 顺序发送,不会覆盖。
1059 /// 这对于配置帧、状态机切换帧等关键命令很重要。
1060 ///
1061 /// # 参数
1062 /// - `frame`: 控制帧(已构建的 `PiperFrame`)
1063 ///
1064 /// # 错误
1065 /// - `DriverError::NotDualThread`: 未使用双线程模式
1066 /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1067 /// - `DriverError::ChannelFull`: 队列满(非阻塞)
1068 pub fn send_reliable(&self, frame: PiperFrame) -> Result<(), DriverError> {
1069 let reliable_tx = self.reliable_tx.as_ref().ok_or(DriverError::NotDualThread)?;
1070
1071 match reliable_tx.try_send(frame) {
1072 Ok(_) => {
1073 self.metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
1074 Ok(())
1075 },
1076 Err(crossbeam_channel::TrySendError::Full(_)) => {
1077 // 队列满,记录丢弃
1078 self.metrics.tx_reliable_drops.fetch_add(1, Ordering::Relaxed);
1079 Err(DriverError::ChannelFull)
1080 },
1081 Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
1082 Err(DriverError::ChannelClosed)
1083 },
1084 }
1085 }
1086
1087 /// 发送命令(根据优先级自动选择队列)
1088 ///
1089 /// 根据命令的优先级自动选择实时队列或可靠队列。
1090 ///
1091 /// # 参数
1092 /// - `command`: 带优先级的命令
1093 ///
1094 /// # 错误
1095 /// - `DriverError::NotDualThread`: 未使用双线程模式
1096 /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1097 /// - `DriverError::ChannelFull`: 队列满(仅可靠命令)
1098 pub fn send_command(&self, command: PiperCommand) -> Result<(), DriverError> {
1099 match command.priority() {
1100 CommandPriority::RealtimeControl => self.send_realtime(command.frame()),
1101 CommandPriority::ReliableCommand => self.send_reliable(command.frame()),
1102 }
1103 }
1104
1105 /// 发送可靠命令(阻塞,带超时)
1106 ///
1107 /// 如果队列满,阻塞等待直到有空闲位置或超时。
1108 ///
1109 /// # 参数
1110 /// - `frame`: 控制帧(已构建的 `PiperFrame`)
1111 /// - `timeout`: 超时时间
1112 ///
1113 /// # 错误
1114 /// - `DriverError::NotDualThread`: 未使用双线程模式
1115 /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1116 /// - `DriverError::Timeout`: 超时未发送成功
1117 pub fn send_reliable_timeout(
1118 &self,
1119 frame: PiperFrame,
1120 timeout: std::time::Duration,
1121 ) -> Result<(), DriverError> {
1122 let reliable_tx = self.reliable_tx.as_ref().ok_or(DriverError::NotDualThread)?;
1123
1124 match reliable_tx.send_timeout(frame, timeout) {
1125 Ok(_) => {
1126 self.metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
1127 Ok(())
1128 },
1129 Err(crossbeam_channel::SendTimeoutError::Timeout(_)) => Err(DriverError::Timeout),
1130 Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => {
1131 Err(DriverError::ChannelClosed)
1132 },
1133 }
1134 }
1135}
1136
1137impl Drop for Piper {
1138 fn drop(&mut self) {
1139 // 设置运行标志为 false,通知所有线程退出
1140 // 使用 Release 确保所有之前的写入对其他线程可见
1141 self.is_running.store(false, Ordering::Release);
1142
1143 // 关闭命令通道(通知 IO 线程退出)
1144 // 关键:必须在 join 线程之前真正 drop 掉 Sender,否则接收端不会 Disconnected。
1145 unsafe {
1146 ManuallyDrop::drop(&mut self.cmd_tx);
1147 }
1148
1149 let join_timeout = Duration::from_secs(2);
1150
1151 // 等待 RX 线程退出(使用 join_timeout 替代 polling)
1152 if let Some(handle) = self.rx_thread.take()
1153 && let Err(_e) = handle.join_timeout(join_timeout)
1154 {
1155 error!(
1156 "RX thread panicked or failed to shut down within {:?}",
1157 join_timeout
1158 );
1159 }
1160
1161 // 等待 TX 线程退出(使用 join_timeout 替代 polling)
1162 if let Some(handle) = self.tx_thread.take()
1163 && let Err(_e) = handle.join_timeout(join_timeout)
1164 {
1165 error!(
1166 "TX thread panicked or failed to shut down within {:?}",
1167 join_timeout
1168 );
1169 }
1170
1171 // 等待 IO 线程退出(单线程模式,使用 join_timeout 替代 polling)
1172 if let Some(handle) = self.io_thread.take()
1173 && let Err(_e) = handle.join_timeout(join_timeout)
1174 {
1175 error!(
1176 "IO thread panicked or failed to shut down within {:?}",
1177 join_timeout
1178 );
1179 }
1180 }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use super::*;
1186 use piper_can::PiperFrame;
1187
1188 // 简单的 Mock CanAdapter 用于测试
1189 struct MockCanAdapter;
1190
1191 impl CanAdapter for MockCanAdapter {
1192 fn send(&mut self, _frame: PiperFrame) -> Result<(), CanError> {
1193 Ok(())
1194 }
1195
1196 fn receive(&mut self) -> Result<PiperFrame, CanError> {
1197 // 永远超时,避免阻塞测试
1198 Err(CanError::Timeout)
1199 }
1200 }
1201
1202 #[test]
1203 fn test_piper_new() {
1204 let mock_can = MockCanAdapter;
1205 let piper = Piper::new(mock_can, None).unwrap();
1206
1207 // 验证可以获取状态(默认状态)
1208 let joint_pos = piper.get_joint_position();
1209 assert_eq!(joint_pos.hardware_timestamp_us, 0);
1210
1211 // 验证通道正常工作
1212 let frame = PiperFrame::new_standard(0x123, &[0x01, 0x02]);
1213 assert!(piper.send_frame(frame).is_ok());
1214 }
1215
1216 #[test]
1217 fn test_piper_drop() {
1218 let mock_can = MockCanAdapter;
1219 let piper = Piper::new(mock_can, None).unwrap();
1220 // drop 应该能够正常退出,IO 线程被 join
1221 drop(piper);
1222 }
1223
1224 #[test]
1225 fn test_piper_get_motion_state() {
1226 let mock_can = MockCanAdapter;
1227 let piper = Piper::new(mock_can, None).unwrap();
1228 let motion = piper.get_motion_state();
1229 assert_eq!(motion.joint_position.hardware_timestamp_us, 0);
1230 assert_eq!(motion.joint_dynamic.group_timestamp_us, 0);
1231 }
1232
1233 #[test]
1234 fn test_piper_send_frame_channel_full() {
1235 let mock_can = MockCanAdapter;
1236 let piper = Piper::new(mock_can, None).unwrap();
1237 let frame = PiperFrame::new_standard(0x123, &[0x01]);
1238
1239 // 填满命令通道(容量 10)
1240 // 注意:IO 线程会持续消费帧,所以需要快速填充
1241 // 或者等待 IO 线程稍微延迟消费
1242 std::thread::sleep(std::time::Duration::from_millis(50));
1243
1244 for _ in 0..10 {
1245 assert!(piper.send_frame(frame).is_ok());
1246 }
1247
1248 // 第 11 次发送可能返回 ChannelFull(如果 IO 线程还没消费完)
1249 // 或者成功(如果 IO 线程已经消费了一些)
1250 // 为了测试 ChannelFull,我们需要更快速地发送,确保通道填满
1251 let result = piper.send_frame(frame);
1252
1253 // 由于 IO 线程在后台消费,可能成功也可能失败
1254 // 验证至少前 10 次都成功即可
1255 match result {
1256 Err(DriverError::ChannelFull) => {
1257 // 通道满,这是预期情况
1258 },
1259 Ok(()) => {
1260 // 如果 IO 线程消费很快,这也可能发生
1261 // 这是可接受的行为
1262 },
1263 Err(e) => panic!("Unexpected error: {:?}", e),
1264 }
1265 }
1266
1267 #[test]
1268 fn test_get_aligned_motion_aligned() {
1269 let mock_can = MockCanAdapter;
1270 let piper = Piper::new(mock_can, None).unwrap();
1271
1272 // 由于 MockCanAdapter 不发送帧,时间戳都为 0
1273 // 测试默认状态下的对齐检查(时间戳都为 0,应该是对齐的)
1274 let result = piper.get_aligned_motion(5000);
1275 match result {
1276 AlignmentResult::Ok(state) => {
1277 assert_eq!(state.timestamp, 0);
1278 assert_eq!(state.time_diff_us, 0);
1279 },
1280 AlignmentResult::Misaligned { .. } => {
1281 // 如果时间戳都为 0,不应该是不对齐的
1282 // 但允许这种情况(因为时间戳都是 0)
1283 },
1284 }
1285 }
1286
1287 #[test]
1288 fn test_get_aligned_motion_misaligned_threshold() {
1289 let mock_can = MockCanAdapter;
1290 let piper = Piper::new(mock_can, None).unwrap();
1291
1292 // 测试不同的时间差阈值
1293 // 由于时间戳都是 0,应该是对齐的
1294 let result1 = piper.get_aligned_motion(0);
1295 let result2 = piper.get_aligned_motion(1000);
1296 let result3 = piper.get_aligned_motion(1000000);
1297
1298 // 所有结果都应该返回状态(即使是对齐的)
1299 match (result1, result2, result3) {
1300 (AlignmentResult::Ok(_), AlignmentResult::Ok(_), AlignmentResult::Ok(_)) => {
1301 // 正常情况
1302 },
1303 _ => {
1304 // 允许其他情况
1305 },
1306 }
1307 }
1308
1309 #[test]
1310 fn test_get_robot_control() {
1311 let mock_can = MockCanAdapter;
1312 let piper = Piper::new(mock_can, None).unwrap();
1313
1314 let control = piper.get_robot_control();
1315 assert_eq!(control.hardware_timestamp_us, 0);
1316 assert_eq!(control.control_mode, 0);
1317 assert!(!control.is_enabled);
1318 }
1319
1320 #[test]
1321 fn test_get_joint_driver_low_speed() {
1322 let mock_can = MockCanAdapter;
1323 let piper = Piper::new(mock_can, None).unwrap();
1324
1325 let driver_state = piper.get_joint_driver_low_speed();
1326 assert_eq!(driver_state.hardware_timestamp_us, 0);
1327 assert_eq!(driver_state.motor_temps, [0.0; 6]);
1328 }
1329
1330 #[test]
1331 fn test_get_joint_limit_config() {
1332 let mock_can = MockCanAdapter;
1333 let piper = Piper::new(mock_can, None).unwrap();
1334
1335 let limits = piper.get_joint_limit_config().unwrap();
1336 assert_eq!(limits.joint_limits_max, [0.0; 6]);
1337 }
1338
1339 #[test]
1340 fn test_wait_for_feedback_timeout() {
1341 let mock_can = MockCanAdapter;
1342 let piper = Piper::new(mock_can, None).unwrap();
1343
1344 // MockCanAdapter 不发送帧,所以应该超时
1345 let result = piper.wait_for_feedback(std::time::Duration::from_millis(10));
1346 assert!(result.is_err());
1347 assert!(matches!(result.unwrap_err(), DriverError::Timeout));
1348 }
1349
1350 #[test]
1351 fn test_send_frame_blocking_timeout() {
1352 let mock_can = MockCanAdapter;
1353 let piper = Piper::new(mock_can, None).unwrap();
1354 let frame = PiperFrame::new_standard(0x123, &[0x01]);
1355
1356 // 快速填充通道(如果 IO 线程来不及消费)
1357 // 然后测试阻塞发送
1358 // 由于通道容量为 10,在 IO 线程消费的情况下,应该能成功
1359 // 但为了测试超时,我们使用极短的超时时间
1360 let result = piper.send_frame_blocking(frame, std::time::Duration::from_millis(1));
1361
1362 // 结果可能是成功(IO 线程消费快)或超时(通道满)
1363 match result {
1364 Ok(()) => {
1365 // 成功是正常情况
1366 },
1367 Err(DriverError::Timeout) => {
1368 // 超时也是可接受的(如果通道满)
1369 },
1370 Err(e) => panic!("Unexpected error: {:?}", e),
1371 }
1372 }
1373
1374 #[test]
1375 fn test_get_aligned_motion_with_time_diff() {
1376 let mock_can = MockCanAdapter;
1377 let piper = Piper::new(mock_can, None).unwrap();
1378
1379 // 测试对齐阈值边界情况
1380 // 时间戳都为 0 时,time_diff_us 应该是 0
1381 let result = piper.get_aligned_motion(0);
1382 match result {
1383 AlignmentResult::Ok(state) => {
1384 assert_eq!(state.time_diff_us, 0);
1385 },
1386 AlignmentResult::Misaligned { state, diff_us } => {
1387 // 如果时间戳都为 0,diff_us 应该也是 0
1388 assert_eq!(diff_us, 0);
1389 assert_eq!(state.time_diff_us, 0);
1390 },
1391 }
1392 }
1393
1394 #[test]
1395 fn test_get_motion_state_returns_combined() {
1396 let mock_can = MockCanAdapter;
1397 let piper = Piper::new(mock_can, None).unwrap();
1398
1399 let motion = piper.get_motion_state();
1400 // 验证返回的是组合状态
1401 assert_eq!(motion.joint_position.hardware_timestamp_us, 0);
1402 assert_eq!(motion.joint_dynamic.group_timestamp_us, 0);
1403 assert_eq!(motion.joint_position.joint_pos, [0.0; 6]);
1404 assert_eq!(motion.joint_dynamic.joint_vel, [0.0; 6]);
1405 }
1406
1407 #[test]
1408 fn test_send_frame_non_blocking() {
1409 let mock_can = MockCanAdapter;
1410 let piper = Piper::new(mock_can, None).unwrap();
1411 let frame = PiperFrame::new_standard(0x123, &[0x01, 0x02]);
1412
1413 // 非阻塞发送应该总是成功(除非通道满或关闭)
1414 let result = piper.send_frame(frame);
1415 assert!(result.is_ok(), "Non-blocking send should succeed");
1416 }
1417
1418 #[test]
1419 fn test_get_joint_dynamic_default() {
1420 let mock_can = MockCanAdapter;
1421 let piper = Piper::new(mock_can, None).unwrap();
1422
1423 let joint_dynamic = piper.get_joint_dynamic();
1424 assert_eq!(joint_dynamic.group_timestamp_us, 0);
1425 assert_eq!(joint_dynamic.joint_vel, [0.0; 6]);
1426 assert_eq!(joint_dynamic.joint_current, [0.0; 6]);
1427 assert!(!joint_dynamic.is_complete());
1428 }
1429
1430 #[test]
1431 fn test_get_joint_position_default() {
1432 let mock_can = MockCanAdapter;
1433 let piper = Piper::new(mock_can, None).unwrap();
1434
1435 let joint_pos = piper.get_joint_position();
1436 assert_eq!(joint_pos.hardware_timestamp_us, 0);
1437 assert_eq!(joint_pos.joint_pos, [0.0; 6]);
1438
1439 let end_pose = piper.get_end_pose();
1440 assert_eq!(end_pose.hardware_timestamp_us, 0);
1441 assert_eq!(end_pose.end_pose, [0.0; 6]);
1442 }
1443
1444 #[test]
1445 fn test_joint_driver_low_speed_clone() {
1446 let mock_can = MockCanAdapter;
1447 let piper = Piper::new(mock_can, None).unwrap();
1448
1449 // 测试读取并克隆诊断状态
1450 let driver1 = piper.get_joint_driver_low_speed();
1451 let driver2 = piper.get_joint_driver_low_speed();
1452
1453 // 验证可以多次读取(ArcSwap 无锁读取)
1454 assert_eq!(driver1.hardware_timestamp_us, driver2.hardware_timestamp_us);
1455 assert_eq!(driver1.motor_temps, driver2.motor_temps);
1456 }
1457
1458 #[test]
1459 fn test_joint_limit_config_read_lock() {
1460 let mock_can = MockCanAdapter;
1461 let piper = Piper::new(mock_can, None).unwrap();
1462
1463 // 测试可以多次读取配置状态
1464 let limits1 = piper.get_joint_limit_config().unwrap();
1465 let limits2 = piper.get_joint_limit_config().unwrap();
1466
1467 assert_eq!(limits1.joint_limits_max, limits2.joint_limits_max);
1468 assert_eq!(limits1.joint_limits_min, limits2.joint_limits_min);
1469 }
1470}