Skip to main content

piper_driver/
piper.rs

1//! Robot API 模块
2//!
3//! 提供对外的 `Piper` 结构体,封装底层 IO 线程和状态同步细节。
4
5use crate::command::{CommandPriority, PiperCommand, RealtimeCommand};
6use crate::error::DriverError;
7use crate::fps_stats::{FpsCounts, FpsResult};
8use crate::metrics::{MetricsSnapshot, PiperMetrics};
9use crate::pipeline::*;
10use crate::state::*;
11use crossbeam_channel::Sender;
12use piper_can::{CanAdapter, CanError, PiperFrame, SplittableAdapter};
13use std::mem::ManuallyDrop;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::thread::{JoinHandle, spawn};
17use std::time::Duration;
18use tracing::{error, info, warn};
19
20/// Extension trait for timeout-capable thread joins
21trait JoinTimeout {
22    fn join_timeout(self, timeout: Duration) -> std::thread::Result<()>;
23}
24
25impl<T: std::marker::Send + 'static> JoinTimeout for JoinHandle<T> {
26    fn join_timeout(self, timeout: Duration) -> std::thread::Result<()> {
27        use std::sync::mpsc;
28
29        // Create a channel for signaling completion
30        let (tx, rx) = mpsc::channel();
31
32        // Spawn a watchdog thread that joins the target thread
33        spawn(move || {
34            let result = self.join();
35            // Send result (ignore send errors - receiver may have timed out)
36            let _ = tx.send(result);
37        });
38
39        // Block with timeout - no busy waiting!
40        match rx.recv_timeout(timeout) {
41            Ok(join_result) => join_result.map(|_| ()), // Thread finished
42            Err(mpsc::RecvTimeoutError::Timeout) => {
43                // Timeout: watchdog thread continues running
44                // This is acceptable - OS will clean up on process exit
45                Err(std::boxed::Box::new(std::io::Error::new(
46                    std::io::ErrorKind::TimedOut,
47                    "Thread join timeout",
48                )))
49            },
50            Err(mpsc::RecvTimeoutError::Disconnected) => {
51                // Channel disconnected unexpectedly - thread panicked
52                Err(std::boxed::Box::new(std::io::Error::new(
53                    std::io::ErrorKind::ConnectionReset,
54                    "Thread panicked during join",
55                )))
56            },
57        }
58    }
59}
60
61/// Piper 机械臂驱动(对外 API)
62///
63/// 支持单线程和双线程两种模式
64/// - 单线程模式:使用 `io_thread`(向后兼容)
65/// - 双线程模式:使用 `rx_thread` 和 `tx_thread`(物理隔离)
66pub struct Piper {
67    /// 命令发送通道(向 IO 线程发送控制帧,单线程模式)
68    ///
69    /// 需要在 Drop 时 **提前关闭通道**(在 join IO 线程之前),
70    /// 否则 `io_loop` 可能永远收不到 `Disconnected` 而导致退出卡住。
71    cmd_tx: ManuallyDrop<Sender<PiperFrame>>,
72    /// 实时命令插槽(双线程模式,邮箱模式,Overwrite)
73    realtime_slot: Option<Arc<std::sync::Mutex<Option<RealtimeCommand>>>>,
74    /// 可靠命令队列发送端(双线程模式,容量 10,FIFO)
75    reliable_tx: Option<Sender<PiperFrame>>,
76    /// 共享状态上下文
77    ctx: Arc<PiperContext>,
78    /// IO 线程句柄(单线程模式,Drop 时 join)
79    io_thread: Option<JoinHandle<()>>,
80    /// RX 线程句柄(双线程模式)
81    rx_thread: Option<JoinHandle<()>>,
82    /// TX 线程句柄(双线程模式)
83    tx_thread: Option<JoinHandle<()>>,
84    /// 运行标志(用于线程生命周期联动)
85    is_running: Arc<AtomicBool>,
86    /// 性能指标(原子计数器)
87    metrics: Arc<PiperMetrics>,
88    /// CAN 接口名称(用于录制元数据)
89    interface: String,
90    /// CAN 总线速度(bps)(用于录制元数据)
91    bus_speed: u32,
92    /// Driver 工作模式(用于回放模式控制)
93    driver_mode: crate::mode::AtomicDriverMode,
94}
95
96impl Piper {
97    /// 最大允许的实时帧包大小
98    ///
99    /// 允许调用者在客户端进行预检查,避免跨层调用后的运行时错误。
100    ///
101    /// # 示例
102    ///
103    /// ```rust,no_run
104    /// # use piper_driver::Piper;
105    /// # use piper_can::PiperFrame;
106    /// # fn example(piper: &Piper) -> std::result::Result<(), Box<dyn std::error::Error>> {
107    /// let frame1 = PiperFrame::new_standard(0x100, &[]);
108    /// let frame2 = PiperFrame::new_standard(0x101, &[]);
109    /// let frame3 = PiperFrame::new_standard(0x102, &[]);
110    /// let frames = [frame1, frame2, frame3];
111    /// if frames.len() > Piper::MAX_REALTIME_PACKAGE_SIZE {
112    ///     return Err("Package too large".into());
113    /// }
114    /// piper.send_realtime_package(frames)?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub const MAX_REALTIME_PACKAGE_SIZE: usize = 10;
119
120    /// 设置元数据(内部方法,由 Builder 调用)
121    pub(crate) fn with_metadata(mut self, interface: String, bus_speed: u32) -> Self {
122        self.interface = interface;
123        self.bus_speed = bus_speed;
124        self
125    }
126
127    /// 创建新的 Piper 实例
128    ///
129    /// # 参数
130    /// - `can`: CAN 适配器(会被移动到 IO 线程)
131    /// - `config`: Pipeline 配置(可选)
132    ///
133    /// # 错误
134    /// - `CanError`: CAN 设备初始化失败(注意:这里返回 CanError,因为 DriverError 尚未完全实现 `From<CanError>`)
135    pub fn new(
136        can: impl CanAdapter + Send + 'static,
137        config: Option<PipelineConfig>,
138    ) -> Result<Self, CanError> {
139        // 创建命令通道(有界队列,容量 10)
140        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(10);
141
142        // 创建共享状态上下文
143        let ctx = Arc::new(PiperContext::new());
144
145        // 克隆上下文用于 IO 线程
146        let ctx_clone = ctx.clone();
147
148        // 启动 IO 线程
149        let io_thread = spawn(move || {
150            io_loop(can, cmd_rx, ctx_clone, config.unwrap_or_default());
151        });
152
153        Ok(Self {
154            cmd_tx: ManuallyDrop::new(cmd_tx),
155            realtime_slot: None, // 单线程模式
156            reliable_tx: None,   // 单线程模式
157            ctx,
158            io_thread: Some(io_thread),
159            rx_thread: None,                             // 单线程模式
160            tx_thread: None,                             // 单线程模式
161            is_running: Arc::new(AtomicBool::new(true)), // 默认运行中
162            metrics: Arc::new(PiperMetrics::new()),      // 初始化指标
163            interface: "unknown".to_string(),            // 未通过 builder 构建
164            bus_speed: 1_000_000,                        // 默认 1Mbps
165            driver_mode: crate::mode::AtomicDriverMode::new(crate::mode::DriverMode::Normal),
166        })
167    }
168
169    /// 创建双线程模式的 Piper 实例
170    ///
171    /// 将 CAN 适配器分离为独立的 RX 和 TX 适配器,实现物理隔离。
172    /// RX 线程专门负责接收反馈帧,TX 线程专门负责发送控制命令。
173    ///
174    /// # 参数
175    /// - `can`: 可分离的 CAN 适配器(必须已启动)
176    /// - `config`: Pipeline 配置(可选)
177    ///
178    /// # 错误
179    /// - `CanError::NotStarted`: 适配器未启动
180    /// - `CanError::Device`: 分离适配器失败
181    ///
182    /// # 使用场景
183    /// - 实时控制:需要 RX 不受 TX 阻塞影响
184    /// - 高频控制:500Hz-1kHz 控制循环
185    ///
186    /// # 注意
187    /// - 适配器必须已启动(调用 `configure()` 或 `start()`)
188    /// - 分离后,原适配器不再可用(消费 `can`)
189    pub fn new_dual_thread<C>(can: C, config: Option<PipelineConfig>) -> Result<Self, CanError>
190    where
191        C: SplittableAdapter + Send + 'static,
192        C::RxAdapter: Send + 'static,
193        C::TxAdapter: Send + 'static,
194    {
195        // 分离适配器
196        let (rx_adapter, tx_adapter) = can.split()?;
197
198        // 创建命令通道(邮箱模式 + 可靠队列容量 10)
199        let realtime_slot = Arc::new(std::sync::Mutex::new(None::<RealtimeCommand>));
200        let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
201
202        // 创建共享状态上下文
203        let ctx = Arc::new(PiperContext::new());
204
205        // 创建运行标志和指标
206        let is_running = Arc::new(AtomicBool::new(true));
207        let metrics = Arc::new(PiperMetrics::new());
208
209        // 克隆用于线程
210        let ctx_clone = ctx.clone();
211        let is_running_clone = is_running.clone();
212        let metrics_clone = metrics.clone();
213        let config_clone = config.clone().unwrap_or_default();
214
215        // 启动 RX 线程
216        let rx_thread = spawn(move || {
217            crate::pipeline::rx_loop(
218                rx_adapter,
219                ctx_clone,
220                config_clone,
221                is_running_clone,
222                metrics_clone,
223            );
224        });
225
226        // 克隆用于 TX 线程
227        let ctx_tx = ctx.clone();
228        let is_running_tx = is_running.clone();
229        let metrics_tx = metrics.clone();
230        let realtime_slot_tx = realtime_slot.clone();
231
232        // 启动 TX 线程(邮箱模式)
233        let tx_thread = spawn(move || {
234            crate::pipeline::tx_loop_mailbox(
235                tx_adapter,
236                realtime_slot_tx,
237                reliable_rx,
238                is_running_tx,
239                metrics_tx,
240                ctx_tx, // 🆕 v1.2.1: 传入 ctx 用于触发 TX 回调
241            );
242        });
243
244        // 给 RX 线程一些启动时间,确保它已经开始接收数据
245        // 这对于 wait_for_feedback 很重要,因为如果 RX 线程还没启动,就无法收到反馈
246        std::thread::sleep(std::time::Duration::from_millis(10));
247
248        Ok(Self {
249            cmd_tx: ManuallyDrop::new(reliable_tx.clone()), // 向后兼容:单线程模式使用
250            realtime_slot: Some(realtime_slot),             // 实时命令邮箱
251            reliable_tx: Some(reliable_tx),                 // 可靠队列
252            ctx,
253            io_thread: None, // 双线程模式不使用 io_thread
254            rx_thread: Some(rx_thread),
255            tx_thread: Some(tx_thread),
256            is_running,
257            metrics,
258            interface: "unknown".to_string(), // 未通过 builder 构建
259            bus_speed: 1_000_000,             // 默认 1Mbps
260            driver_mode: crate::mode::AtomicDriverMode::new(crate::mode::DriverMode::Normal),
261        })
262    }
263
264    /// 检查线程健康状态
265    ///
266    /// 返回 RX 和 TX 线程的存活状态。
267    ///
268    /// # 返回
269    /// - `(rx_alive, tx_alive)`: 两个布尔值,表示线程是否还在运行
270    pub fn check_health(&self) -> (bool, bool) {
271        let rx_alive = self.rx_thread.as_ref().map(|h| !h.is_finished()).unwrap_or(true); // 单线程模式下,认为健康
272
273        let tx_alive = self.tx_thread.as_ref().map(|h| !h.is_finished()).unwrap_or(true); // 单线程模式下,认为健康
274
275        (rx_alive, tx_alive)
276    }
277
278    /// 检查是否健康
279    ///
280    /// 如果所有线程都存活,返回 `true`。
281    pub fn is_healthy(&self) -> bool {
282        let (rx_alive, tx_alive) = self.check_health();
283        rx_alive && tx_alive
284    }
285
286    /// 获取性能指标快照
287    ///
288    /// 返回当前所有计数器的快照,用于监控 IO 链路健康状态。
289    pub fn get_metrics(&self) -> MetricsSnapshot {
290        self.metrics.snapshot()
291    }
292
293    /// 获取关节动态状态(无锁,纳秒级返回)
294    ///
295    /// 包含关节速度和电流(独立帧 + Buffered Commit)。
296    ///
297    /// # 性能
298    /// - 无锁读取(ArcSwap::load)
299    /// - 返回快照副本(Clone 开销低,< 150 字节)
300    /// - 适合 500Hz 控制循环
301    pub fn get_joint_dynamic(&self) -> JointDynamicState {
302        self.ctx.joint_dynamic.load().as_ref().clone()
303    }
304
305    /// 获取关节位置状态(无锁,纳秒级返回)
306    ///
307    /// 包含6个关节的位置信息(500Hz更新)。
308    ///
309    /// # 性能
310    /// - 无锁读取(ArcSwap::load)
311    /// - 返回快照副本(Clone 开销低)
312    /// - 适合 500Hz 控制循环
313    ///
314    /// # 注意
315    /// - 此状态与 `EndPoseState` 不是原子更新的,如需同时获取,请使用 `capture_motion_snapshot()`
316    pub fn get_joint_position(&self) -> JointPositionState {
317        self.ctx.joint_position.load().as_ref().clone()
318    }
319
320    /// 获取末端位姿状态(无锁,纳秒级返回)
321    ///
322    /// 包含末端执行器的位置和姿态信息(500Hz更新)。
323    ///
324    /// # 性能
325    /// - 无锁读取(ArcSwap::load)
326    /// - 返回快照副本(Clone 开销低)
327    /// - 适合 500Hz 控制循环
328    ///
329    /// # 注意
330    /// - 此状态与 `JointPositionState` 不是原子更新的,如需同时获取,请使用 `capture_motion_snapshot()`
331    pub fn get_end_pose(&self) -> EndPoseState {
332        self.ctx.end_pose.load().as_ref().clone()
333    }
334
335    /// 获取运动快照(无锁,纳秒级返回)
336    ///
337    /// 原子性地获取 `JointPositionState` 和 `EndPoseState` 的最新快照。
338    /// 虽然这两个状态在硬件上不是同时更新的,但此方法保证逻辑上的原子性。
339    ///
340    /// # 性能
341    /// - 无锁读取(两次 ArcSwap::load)
342    /// - 返回快照副本
343    /// - 适合需要同时使用关节位置和末端位姿的场景
344    ///
345    /// # 示例
346    ///
347    /// ```
348    /// # use piper_driver::Piper;
349    /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
350    /// # // let piper = Piper::new(/* ... */).unwrap();
351    /// # // let snapshot = piper.capture_motion_snapshot();
352    /// # // println!("Joint positions: {:?}", snapshot.joint_position.joint_pos);
353    /// # // println!("End pose: {:?}", snapshot.end_pose.end_pose);
354    /// ```
355    pub fn capture_motion_snapshot(&self) -> MotionSnapshot {
356        self.ctx.capture_motion_snapshot()
357    }
358
359    /// 获取机器人控制状态(无锁)
360    ///
361    /// 包含控制模式、机器人状态、故障码等(100Hz更新)。
362    ///
363    /// # 性能
364    /// - 无锁读取(ArcSwap::load)
365    /// - 返回快照副本
366    pub fn get_robot_control(&self) -> RobotControlState {
367        self.ctx.robot_control.load().as_ref().clone()
368    }
369
370    /// 获取夹爪状态(无锁)
371    ///
372    /// 包含夹爪行程、扭矩、状态码等(100Hz更新)。
373    ///
374    /// # 性能
375    /// - 无锁读取(ArcSwap::load)
376    /// - 返回快照副本
377    pub fn get_gripper(&self) -> GripperState {
378        self.ctx.gripper.load().as_ref().clone()
379    }
380
381    /// 获取关节驱动器低速反馈状态(无锁)
382    ///
383    /// 包含温度、电压、电流、驱动器状态等(40Hz更新)。
384    ///
385    /// # 性能
386    /// - 无锁读取(ArcSwap::load,Wait-Free)
387    /// - 返回快照副本
388    pub fn get_joint_driver_low_speed(&self) -> JointDriverLowSpeedState {
389        self.ctx.joint_driver_low_speed.load().as_ref().clone()
390    }
391
392    /// 获取固件版本字符串
393    ///
394    /// 从累积的固件数据中解析版本字符串。
395    /// 如果固件数据未完整或未找到版本字符串,返回 `None`。
396    ///
397    /// # 性能
398    /// - 需要获取 RwLock 读锁
399    /// - 如果已解析,直接返回缓存的版本字符串
400    /// - 如果未解析,尝试从累积数据中解析
401    pub fn get_firmware_version(&self) -> Option<String> {
402        if let Ok(mut firmware_state) = self.ctx.firmware_version.write() {
403            // 如果已经解析过,直接返回
404            if let Some(version) = firmware_state.version_string() {
405                return Some(version.clone());
406            }
407            // 否则尝试解析
408            firmware_state.parse_version()
409        } else {
410            None
411        }
412    }
413
414    /// 查询固件版本
415    ///
416    /// 发送固件版本查询指令到机械臂,并清空之前的固件数据缓存。
417    /// 查询和反馈使用相同的 CAN ID (0x4AF)。
418    ///
419    /// **注意**:
420    /// - 发送查询命令后会自动清空固件数据缓存(与 Python SDK 一致)
421    /// - 需要等待一段时间(推荐 30-50ms)让机械臂返回反馈数据
422    /// - 之后可以调用 `get_firmware_version()` 获取解析后的版本字符串
423    ///
424    /// # 错误
425    /// - `DriverError::ChannelFull`: 命令通道已满(单线程模式)
426    /// - `DriverError::ChannelClosed`: 命令通道已关闭
427    /// - `DriverError::NotDualThread`: 双线程模式下使用错误的方法
428    ///
429    /// # 示例
430    ///
431    /// ```no_run
432    /// # use piper_driver::Piper;
433    /// # use piper_protocol::FirmwareVersionQueryCommand;
434    /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
435    /// # // let piper = Piper::new(/* ... */).unwrap();
436    /// # // 发送查询命令
437    /// # // piper.query_firmware_version().unwrap();
438    /// # // 等待反馈数据累积
439    /// # // std::thread::sleep(std::time::Duration::from_millis(50));
440    /// # // 获取版本字符串
441    /// # // if let Some(version) = piper.get_firmware_version() {
442    /// # //     println!("Firmware version: {}", version);
443    /// # // }
444    /// ```
445    pub fn query_firmware_version(&self) -> Result<(), DriverError> {
446        use piper_protocol::FirmwareVersionQueryCommand;
447
448        // 创建查询命令
449        let cmd = FirmwareVersionQueryCommand::new();
450        let frame = cmd.to_frame();
451
452        // 发送命令(使用可靠命令模式,确保命令被发送)
453        // 注意:固件版本查询不是高频实时命令,使用可靠命令模式更合适
454        if let Some(reliable_tx) = &self.reliable_tx {
455            // 双线程模式:使用可靠命令队列
456            reliable_tx.try_send(frame).map_err(|e| match e {
457                crossbeam_channel::TrySendError::Full(_) => DriverError::ChannelFull,
458                crossbeam_channel::TrySendError::Disconnected(_) => DriverError::ChannelClosed,
459            })?;
460        } else {
461            // 单线程模式:使用普通命令通道
462            self.send_frame(frame)?;
463        }
464
465        // 清空固件数据缓存
466        if let Ok(mut firmware_state) = self.ctx.firmware_version.write() {
467            firmware_state.clear();
468        }
469
470        Ok(())
471    }
472
473    /// 获取主从模式控制模式指令状态(无锁)
474    ///
475    /// 包含控制模式、运动模式、速度等(主从模式下,~200Hz更新)。
476    ///
477    /// # 性能
478    /// - 无锁读取(ArcSwap::load)
479    /// - 返回快照副本
480    pub fn get_master_slave_control_mode(&self) -> MasterSlaveControlModeState {
481        self.ctx.master_slave_control_mode.load().as_ref().clone()
482    }
483
484    /// 获取主从模式关节控制指令状态(无锁)
485    ///
486    /// 包含6个关节的目标角度(主从模式下,~500Hz更新)。
487    ///
488    /// # 性能
489    /// - 无锁读取(ArcSwap::load)
490    /// - 返回快照副本
491    /// - 帧组同步,保证6个关节数据的逻辑一致性
492    pub fn get_master_slave_joint_control(&self) -> MasterSlaveJointControlState {
493        self.ctx.master_slave_joint_control.load().as_ref().clone()
494    }
495
496    /// 获取主从模式夹爪控制指令状态(无锁)
497    ///
498    /// 包含夹爪目标行程、扭矩等(主从模式下,~200Hz更新)。
499    ///
500    /// # 性能
501    /// - 无锁读取(ArcSwap::load)
502    /// - 返回快照副本
503    pub fn get_master_slave_gripper_control(&self) -> MasterSlaveGripperControlState {
504        self.ctx.master_slave_gripper_control.load().as_ref().clone()
505    }
506
507    /// 获取碰撞保护状态(读锁)
508    ///
509    /// 包含各关节的碰撞保护等级(按需查询)。
510    ///
511    /// # 性能
512    /// - 读锁(RwLock::read)
513    /// - 返回快照副本
514    pub fn get_collision_protection(&self) -> Result<CollisionProtectionState, DriverError> {
515        self.ctx
516            .collision_protection
517            .read()
518            .map(|guard| guard.clone())
519            .map_err(|_| DriverError::PoisonedLock)
520    }
521
522    /// 获取关节限制配置状态(读锁)
523    ///
524    /// 包含关节角度限制和速度限制(按需查询)。
525    ///
526    /// # 性能
527    /// - 读锁(RwLock::read)
528    /// - 返回快照副本
529    pub fn get_joint_limit_config(&self) -> Result<JointLimitConfigState, DriverError> {
530        self.ctx
531            .joint_limit_config
532            .read()
533            .map(|guard| guard.clone())
534            .map_err(|_| DriverError::PoisonedLock)
535    }
536
537    /// 获取关节加速度限制配置状态(读锁)
538    ///
539    /// 包含关节加速度限制(按需查询)。
540    ///
541    /// # 性能
542    /// - 读锁(RwLock::read)
543    /// - 返回快照副本
544    pub fn get_joint_accel_config(&self) -> Result<JointAccelConfigState, DriverError> {
545        self.ctx
546            .joint_accel_config
547            .read()
548            .map(|guard| guard.clone())
549            .map_err(|_| DriverError::PoisonedLock)
550    }
551
552    /// 获取末端限制配置状态(读锁)
553    ///
554    /// 包含末端执行器的速度和加速度限制(按需查询)。
555    ///
556    /// # 性能
557    /// - 读锁(RwLock::read)
558    /// - 返回快照副本
559    pub fn get_end_limit_config(&self) -> Result<EndLimitConfigState, DriverError> {
560        self.ctx
561            .end_limit_config
562            .read()
563            .map(|guard| guard.clone())
564            .map_err(|_| DriverError::PoisonedLock)
565    }
566
567    /// 获取组合运动状态(所有热数据)
568    ///
569    /// 注意:不同子状态的时间戳可能不同步(差异通常在毫秒级)。
570    /// 如果需要时间对齐的状态,请使用 `get_aligned_motion()`。
571    pub fn get_motion_state(&self) -> CombinedMotionState {
572        let snapshot = self.capture_motion_snapshot();
573        CombinedMotionState {
574            joint_position: snapshot.joint_position,
575            end_pose: snapshot.end_pose,
576            joint_dynamic: self.get_joint_dynamic(),
577        }
578    }
579
580    /// 获取时间对齐的运动状态(推荐用于力控算法)
581    ///
582    /// 以 `joint_position.hardware_timestamp_us` 为基准时间,检查时间戳差异。
583    /// 即使时间戳差异超过阈值,也返回状态数据(让用户有选择权)。
584    ///
585    /// # 参数
586    /// - `max_time_diff_us`: 允许的最大时间戳差异(微秒),推荐值:5000(5ms)
587    ///
588    /// # 返回值
589    /// - `AlignmentResult::Ok(state)`: 时间戳差异在可接受范围内
590    /// - `AlignmentResult::Misaligned { state, diff_us }`: 时间戳差异过大,但仍返回状态数据
591    pub fn get_aligned_motion(&self, max_time_diff_us: u64) -> AlignmentResult {
592        let snapshot = self.capture_motion_snapshot();
593        let joint_dynamic = self.get_joint_dynamic();
594
595        let time_diff = snapshot
596            .joint_position
597            .hardware_timestamp_us
598            .abs_diff(joint_dynamic.group_timestamp_us);
599
600        let state = AlignedMotionState {
601            joint_pos: snapshot.joint_position.joint_pos,
602            joint_vel: joint_dynamic.joint_vel,
603            joint_current: joint_dynamic.joint_current,
604            end_pose: snapshot.end_pose.end_pose,
605            timestamp: snapshot.joint_position.hardware_timestamp_us, // 使用位置数据的时间戳作为基准
606            time_diff_us: (joint_dynamic.group_timestamp_us as i64)
607                - (snapshot.joint_position.hardware_timestamp_us as i64),
608        };
609
610        if time_diff > max_time_diff_us {
611            AlignmentResult::Misaligned {
612                state,
613                diff_us: time_diff,
614            }
615        } else {
616            AlignmentResult::Ok(state)
617        }
618    }
619
620    /// 等待接收到第一个有效反馈(用于初始化)
621    ///
622    /// 在 `Piper::new()` 后调用,确保在控制循环开始前已收到有效数据。
623    /// 避免使用全零的初始状态导致错误的控制指令。
624    ///
625    /// # 参数
626    /// - `timeout`: 超时时间
627    ///
628    /// # 返回值
629    /// - `Ok(())`: 成功接收到有效反馈(`timestamp_us > 0`)
630    /// - `Err(DriverError::Timeout)`: 超时未收到反馈
631    pub fn wait_for_feedback(&self, timeout: std::time::Duration) -> Result<(), DriverError> {
632        let start = std::time::Instant::now();
633
634        loop {
635            // 检查是否超时
636            if start.elapsed() >= timeout {
637                return Err(DriverError::Timeout);
638            }
639
640            // 检查是否收到有效反馈(任意状态的时间戳 > 0 即可)
641            let joint_pos = self.get_joint_position();
642            if joint_pos.hardware_timestamp_us > 0 {
643                return Ok(());
644            }
645
646            // 短暂休眠,避免 CPU 空转
647            std::thread::sleep(std::time::Duration::from_millis(1));
648        }
649    }
650
651    /// 获取 FPS 统计结果
652    ///
653    /// 返回最近一次统计窗口内的更新频率(FPS)。
654    /// 建议定期调用(如每秒一次)或按需调用。
655    ///
656    /// # 性能
657    /// - 无锁读取(仅原子读取)
658    /// - 开销:~100ns(5 次原子读取 + 浮点计算)
659    ///
660    /// # Example
661    ///
662    /// ```
663    /// # use piper_driver::Piper;
664    /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
665    /// # // let piper = Piper::new(/* ... */).unwrap();
666    /// # // 运行一段时间后查询 FPS
667    /// # // std::thread::sleep(std::time::Duration::from_secs(5));
668    /// # // let fps = piper.get_fps();
669    /// # // println!("Joint Position FPS: {:.2}", fps.joint_position);
670    /// # // println!("End Pose FPS: {:.2}", fps.end_pose);
671    /// # // println!("Joint Dynamic FPS: {:.2}", fps.joint_dynamic);
672    /// ```
673    pub fn get_fps(&self) -> FpsResult {
674        self.ctx.fps_stats.load().calculate_fps()
675    }
676
677    /// 获取 FPS 计数器原始值
678    ///
679    /// 返回当前计数器的原始值,可以配合自定义时间窗口计算 FPS。
680    ///
681    /// # 性能
682    /// - 无锁读取(仅原子读取)
683    /// - 开销:~50ns(5 次原子读取)
684    ///
685    /// # Example
686    ///
687    /// ```
688    /// # use piper_driver::Piper;
689    /// # // 注意:此示例需要实际的 CAN 适配器,仅供参考
690    /// # // let piper = Piper::new(/* ... */).unwrap();
691    /// # // 记录开始时间和计数
692    /// # // let start = std::time::Instant::now();
693    /// # // let counts_start = piper.get_fps_counts();
694    /// # // 运行一段时间
695    /// # // std::thread::sleep(std::time::Duration::from_secs(1));
696    /// # // 计算实际 FPS
697    /// # // let counts_end = piper.get_fps_counts();
698    /// # // let elapsed = start.elapsed();
699    /// # // let actual_fps = (counts_end.joint_position - counts_start.joint_position) as f64 / elapsed.as_secs_f64();
700    /// ```
701    pub fn get_fps_counts(&self) -> FpsCounts {
702        self.ctx.fps_stats.load().get_counts()
703    }
704
705    /// 重置 FPS 统计窗口(清空计数器并重新开始计时)
706    ///
707    /// 这是一个轻量级、无锁的重置:通过 `ArcSwap` 将内部 `FpsStatistics` 原子替换为新实例。
708    /// 适合在监控工具中做固定窗口统计(例如每 5 秒 reset 一次)。
709    pub fn reset_fps_stats(&self) {
710        self.ctx.fps_stats.store(Arc::new(crate::fps_stats::FpsStatistics::new()));
711    }
712
713    // ============================================================
714    // 连接监控 API
715    // ============================================================
716
717    /// 检查机器人是否仍在响应
718    ///
719    /// 如果在超时窗口内收到反馈,返回 `true`。
720    /// 这可用于检测机器人是否断电、CAN 线缆断开或固件崩溃。
721    ///
722    /// # 性能
723    /// - 无锁读取(AtomicU64::load)
724    /// - O(1) 时间复杂度
725    pub fn is_connected(&self) -> bool {
726        self.ctx.connection_monitor.check_connection()
727    }
728
729    /// 获取自上次反馈以来的时间
730    ///
731    /// 返回自上次成功处理 CAN 帧以来的时间。
732    /// 可用于连接质量监控或诊断。
733    pub fn connection_age(&self) -> std::time::Duration {
734        self.ctx.connection_monitor.time_since_last_feedback()
735    }
736
737    /// 发送控制帧(非阻塞)
738    ///
739    /// # 参数
740    /// - `frame`: 控制帧(已构建的 `PiperFrame`)
741    ///
742    /// # 错误
743    /// - `DriverError::ChannelClosed`: 命令通道已关闭(IO 线程退出)
744    /// - `DriverError::ChannelFull`: 命令队列已满(缓冲区容量 10)
745    pub fn send_frame(&self, frame: PiperFrame) -> Result<(), DriverError> {
746        self.cmd_tx.try_send(frame).map_err(|e| match e {
747            crossbeam_channel::TrySendError::Full(_) => DriverError::ChannelFull,
748            crossbeam_channel::TrySendError::Disconnected(_) => DriverError::ChannelClosed,
749        })
750    }
751
752    /// 获取钩子管理器的引用(用于高级诊断)
753    ///
754    /// # 设计理念
755    ///
756    /// 这是一个**逃生舱(Escape Hatch)**,用于高级诊断场景:
757    /// - 注册自定义 CAN 帧回调
758    /// - 实现录制功能
759    /// - 性能分析和调试
760    ///
761    /// # 使用场景
762    ///
763    /// - 自定义诊断工具
764    /// - 高级抓包和调试
765    /// - 性能分析和优化
766    /// - 后台监控线程
767    ///
768    /// # 示例
769    ///
770    /// ```rust,no_run
771    /// # use piper_driver::Piper;
772    /// # use piper_driver::hooks::FrameCallback;
773    /// # use piper_driver::recording::AsyncRecordingHook;
774    /// # use std::sync::Arc;
775    /// # fn example(robot: &Piper) {
776    /// // 获取 hooks 访问
777    /// let hooks = robot.hooks();
778    ///
779    /// // 创建录制钩子
780    /// let (hook, _rx) = AsyncRecordingHook::new();
781    /// let callback = Arc::new(hook) as Arc<dyn FrameCallback>;
782    ///
783    /// // 注册回调(忽略错误以简化示例)
784    /// if let Ok(mut hooks_guard) = hooks.write() {
785    ///     hooks_guard.add_callback(callback);
786    /// }
787    /// # }
788    /// ```
789    ///
790    /// # 安全注意事项
791    ///
792    /// - **性能要求**:回调必须在 <1μs 内完成
793    /// - **线程安全**:返回 `Arc<RwLock<HookManager>>`,需手动加锁
794    /// - **不要阻塞**:禁止在回调中使用 Mutex、I/O、分配等阻塞操作
795    ///
796    /// # 返回值
797    ///
798    /// `Arc<RwLock<HookManager>>`: 钩子管理器的共享引用
799    ///
800    /// # 参考
801    ///
802    /// - [`HookManager`](crate::hooks::HookManager) - 钩子管理器
803    /// - [`FrameCallback`](crate::hooks::FrameCallback) - 回调 trait
804    /// - [架构分析报告](../../../docs/architecture/piper-driver-client-mixing-analysis.md) - 方案 B 设计
805    pub fn hooks(&self) -> Arc<std::sync::RwLock<crate::hooks::HookManager>> {
806        Arc::clone(&self.ctx.hooks)
807    }
808
809    /// 获取 CAN 接口名称
810    ///
811    /// # 返回值
812    ///
813    /// CAN 接口名称,例如 "can0", "vcan0" 等
814    pub fn interface(&self) -> String {
815        self.interface.clone()
816    }
817
818    /// 获取 CAN 总线速度
819    ///
820    /// # 返回值
821    ///
822    /// CAN 总线速度(bps),例如 1000000 (1Mbps)
823    pub fn bus_speed(&self) -> u32 {
824        self.bus_speed
825    }
826
827    /// 获取当前 Driver 模式
828    ///
829    /// # 返回值
830    ///
831    /// 当前 Driver 模式(Normal 或 Replay)
832    ///
833    /// # 示例
834    ///
835    /// ```rust,no_run
836    /// # use piper_driver::Piper;
837    /// # fn example(robot: &Piper) {
838    /// let mode = robot.mode();
839    /// println!("Current mode: {:?}", mode);
840    /// # }
841    /// ```
842    pub fn mode(&self) -> crate::mode::DriverMode {
843        self.driver_mode.get(std::sync::atomic::Ordering::Relaxed)
844    }
845
846    /// 设置 Driver 模式
847    ///
848    /// # 参数
849    ///
850    /// - `mode`: 新的 Driver 模式
851    ///
852    /// # 模式说明
853    ///
854    /// - **Normal**: 正常模式,TX 线程按周期发送控制指令
855    /// - **Replay**: 回放模式,TX 线程暂停周期性发送
856    ///
857    /// # 使用场景
858    ///
859    /// Replay 模式用于安全地回放预先录制的 CAN 帧:
860    /// - 暂停 TX 线程的周期性发送
861    /// - 避免双控制流冲突
862    /// - 允许精确控制帧发送时机
863    ///
864    /// # ⚠️ 安全警告
865    ///
866    /// - 切换到 Replay 模式前,应确保机器人处于 Standby 状态
867    /// - 在 Replay 模式下发送控制指令时,应遵守安全速度限制
868    ///
869    /// # 示例
870    ///
871    /// ```rust,no_run
872    /// # use piper_driver::{Piper, mode::DriverMode};
873    /// # fn example(robot: &Piper) {
874    /// // 切换到回放模式
875    /// robot.set_mode(DriverMode::Replay);
876    ///
877    /// // ... 执行回放 ...
878    ///
879    /// // 恢复正常模式
880    /// robot.set_mode(DriverMode::Normal);
881    /// # }
882    /// ```
883    pub fn set_mode(&self, mode: crate::mode::DriverMode) {
884        self.driver_mode.set(mode, std::sync::atomic::Ordering::Relaxed);
885        tracing::info!("Driver mode set to: {:?}", mode);
886    }
887
888    /// 发送控制帧(阻塞,带超时)
889    ///
890    /// 如果命令通道已满,阻塞等待直到有空闲位置或超时。
891    ///
892    /// # 参数
893    /// - `frame`: 控制帧(已构建的 `PiperFrame`)
894    /// - `timeout`: 超时时间
895    ///
896    /// # 错误
897    /// - `DriverError::ChannelClosed`: 命令通道已关闭(IO 线程退出)
898    /// - `DriverError::Timeout`: 超时未发送成功
899    pub fn send_frame_blocking(
900        &self,
901        frame: PiperFrame,
902        timeout: std::time::Duration,
903    ) -> Result<(), DriverError> {
904        self.cmd_tx.send_timeout(frame, timeout).map_err(|e| match e {
905            crossbeam_channel::SendTimeoutError::Timeout(_) => DriverError::Timeout,
906            crossbeam_channel::SendTimeoutError::Disconnected(_) => DriverError::ChannelClosed,
907        })
908    }
909
910    /// 发送实时控制命令(邮箱模式,覆盖策略)
911    ///
912    /// 实时命令使用邮箱模式(Mailbox),直接覆盖旧命令,确保最新命令被发送。
913    /// 这对于力控/高频控制场景很重要,只保留最新的控制指令。
914    ///
915    /// # 参数
916    /// - `frame`: 控制帧(已构建的 `PiperFrame`)
917    ///
918    /// # 错误
919    /// - `DriverError::NotDualThread`: 未使用双线程模式
920    /// - `DriverError::PoisonedLock`: 锁中毒(极少见,通常意味着 TX 线程 panic)
921    ///
922    /// # 实现细节
923    /// - 获取 Mutex 锁并直接覆盖插槽内容(Last Write Wins)
924    /// - 锁持有时间极短(< 50ns),仅为内存拷贝
925    /// - 永不阻塞:无论 TX 线程是否消费,都能立即写入
926    /// - 如果插槽已有数据,会被覆盖(更新 `metrics.tx_realtime_overwrites`)
927    ///
928    /// # 性能
929    /// - 典型延迟:20-50ns(无竞争情况下)
930    /// - 最坏延迟:200ns(与 TX 线程锁竞争时)
931    /// - 相比 Channel 重试策略,延迟降低 10-100 倍
932    ///
933    /// 发送单个实时帧(向后兼容,API 不变)
934    pub fn send_realtime(&self, frame: PiperFrame) -> Result<(), DriverError> {
935        self.send_realtime_command(RealtimeCommand::single(frame))
936    }
937
938    /// 发送实时帧包(新 API)
939    ///
940    /// # 参数
941    /// - `frames`: 要发送的帧迭代器,必须非空
942    ///
943    /// **接口优化**:接受 `impl IntoIterator`,允许用户传入:
944    /// - 数组:`[frame1, frame2, frame3]`(栈上,零堆分配)
945    /// - 切片:`&[frame1, frame2, frame3]`
946    /// - Vec:`vec![frame1, frame2, frame3]`
947    ///
948    /// # 错误
949    /// - `DriverError::NotDualThread`: 未使用双线程模式
950    /// - `DriverError::InvalidInput`: 帧列表为空或过大
951    /// - `DriverError::PoisonedLock`: 锁中毒
952    ///
953    /// # 原子性保证
954    /// Package 内的所有帧要么全部发送成功,要么都不发送。
955    /// 如果发送过程中出现错误,已发送的帧不会被回滚(CAN 总线特性),
956    /// 但未发送的帧不会继续发送。
957    ///
958    /// # 性能特性
959    /// - 如果帧数量 ≤ 4,完全在栈上分配,零堆内存分配
960    /// - 如果帧数量 > 4,SmallVec 会自动溢出到堆,但仍保持高效
961    pub fn send_realtime_package(
962        &self,
963        frames: impl IntoIterator<Item = PiperFrame>,
964    ) -> Result<(), DriverError> {
965        use crate::command::FrameBuffer;
966
967        let buffer: FrameBuffer = frames.into_iter().collect();
968
969        if buffer.is_empty() {
970            return Err(DriverError::InvalidInput(
971                "Frame package cannot be empty".to_string(),
972            ));
973        }
974
975        // 限制包大小,防止内存问题
976        // 使用 Piper 的关联常量,允许客户端预检查
977        //
978        // 注意:如果用户传入超大 Vec(如长度 1000),这里会先进行 collect 操作,
979        // 可能导致堆分配。虽然之后会检查并报错,但内存开销已经发生。
980        // 这是可以接受的权衡(安全网),但建议用户在调用前进行预检查。
981        if buffer.len() > Self::MAX_REALTIME_PACKAGE_SIZE {
982            return Err(DriverError::InvalidInput(format!(
983                "Frame package too large: {} (max: {})",
984                buffer.len(),
985                Self::MAX_REALTIME_PACKAGE_SIZE
986            )));
987        }
988
989        self.send_realtime_command(RealtimeCommand::package(buffer))
990    }
991
992    /// 内部方法:发送实时命令(统一处理单个帧和帧包)
993    fn send_realtime_command(&self, command: RealtimeCommand) -> Result<(), DriverError> {
994        let realtime_slot = self.realtime_slot.as_ref().ok_or(DriverError::NotDualThread)?;
995
996        match realtime_slot.lock() {
997            Ok(mut slot) => {
998                // 检测是否发生覆盖(如果插槽已有数据)
999                let is_overwrite = slot.is_some();
1000
1001                // 计算帧数量(在覆盖前,避免双重计算)
1002                let frame_count = command.len();
1003
1004                // 直接覆盖(邮箱模式:Last Write Wins)
1005                // 注意:如果旧命令是 Package,Drop 操作会释放 SmallVec
1006                // 但如果数据在栈上(len ≤ 4),Drop 只是栈指针移动,几乎零开销
1007                *slot = Some(command);
1008
1009                // 更新指标(在锁外更新,减少锁持有时间)
1010                // 注意:先释放锁,再更新指标,避免在锁内进行原子操作
1011                drop(slot); // 显式释放锁
1012
1013                // 更新指标(在锁外更新,减少锁持有时间)
1014                let total =
1015                    self.metrics.tx_frames_total.fetch_add(frame_count as u64, Ordering::Relaxed)
1016                        + frame_count as u64;
1017
1018                if is_overwrite {
1019                    let overwrites =
1020                        self.metrics.tx_realtime_overwrites.fetch_add(1, Ordering::Relaxed) + 1;
1021
1022                    // 智能监控:每 1000 次发送检查一次覆盖率
1023                    // 避免频繁计算,减少性能开销
1024                    if total > 0 && total.is_multiple_of(1000) {
1025                        let rate = (overwrites as f64 / total as f64) * 100.0;
1026
1027                        // 只在覆盖率超过阈值时警告
1028                        if rate > 50.0 {
1029                            // 异常情况:覆盖率 > 50%,记录警告
1030                            warn!(
1031                                "High realtime overwrite rate detected: {:.1}% ({} overwrites / {} total sends). \
1032                                 This may indicate TX thread bottleneck or excessive send frequency.",
1033                                rate, overwrites, total
1034                            );
1035                        } else if rate > 30.0 {
1036                            // 中等情况:覆盖率 30-50%,记录信息(可选,生产环境可关闭)
1037                            info!(
1038                                "Moderate realtime overwrite rate: {:.1}% ({} overwrites / {} total sends). \
1039                                 This is normal for high-frequency control (> 500Hz).",
1040                                rate, overwrites, total
1041                            );
1042                        }
1043                        // < 30% 不记录日志(正常情况)
1044                    }
1045                }
1046
1047                Ok(())
1048            },
1049            Err(_) => {
1050                error!("Realtime slot lock poisoned, TX thread may have panicked");
1051                Err(DriverError::PoisonedLock)
1052            },
1053        }
1054    }
1055
1056    /// 发送可靠命令(FIFO 策略)
1057    ///
1058    /// 可靠命令使用容量为 10 的队列,按 FIFO 顺序发送,不会覆盖。
1059    /// 这对于配置帧、状态机切换帧等关键命令很重要。
1060    ///
1061    /// # 参数
1062    /// - `frame`: 控制帧(已构建的 `PiperFrame`)
1063    ///
1064    /// # 错误
1065    /// - `DriverError::NotDualThread`: 未使用双线程模式
1066    /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1067    /// - `DriverError::ChannelFull`: 队列满(非阻塞)
1068    pub fn send_reliable(&self, frame: PiperFrame) -> Result<(), DriverError> {
1069        let reliable_tx = self.reliable_tx.as_ref().ok_or(DriverError::NotDualThread)?;
1070
1071        match reliable_tx.try_send(frame) {
1072            Ok(_) => {
1073                self.metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
1074                Ok(())
1075            },
1076            Err(crossbeam_channel::TrySendError::Full(_)) => {
1077                // 队列满,记录丢弃
1078                self.metrics.tx_reliable_drops.fetch_add(1, Ordering::Relaxed);
1079                Err(DriverError::ChannelFull)
1080            },
1081            Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
1082                Err(DriverError::ChannelClosed)
1083            },
1084        }
1085    }
1086
1087    /// 发送命令(根据优先级自动选择队列)
1088    ///
1089    /// 根据命令的优先级自动选择实时队列或可靠队列。
1090    ///
1091    /// # 参数
1092    /// - `command`: 带优先级的命令
1093    ///
1094    /// # 错误
1095    /// - `DriverError::NotDualThread`: 未使用双线程模式
1096    /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1097    /// - `DriverError::ChannelFull`: 队列满(仅可靠命令)
1098    pub fn send_command(&self, command: PiperCommand) -> Result<(), DriverError> {
1099        match command.priority() {
1100            CommandPriority::RealtimeControl => self.send_realtime(command.frame()),
1101            CommandPriority::ReliableCommand => self.send_reliable(command.frame()),
1102        }
1103    }
1104
1105    /// 发送可靠命令(阻塞,带超时)
1106    ///
1107    /// 如果队列满,阻塞等待直到有空闲位置或超时。
1108    ///
1109    /// # 参数
1110    /// - `frame`: 控制帧(已构建的 `PiperFrame`)
1111    /// - `timeout`: 超时时间
1112    ///
1113    /// # 错误
1114    /// - `DriverError::NotDualThread`: 未使用双线程模式
1115    /// - `DriverError::ChannelClosed`: 命令通道已关闭(TX 线程退出)
1116    /// - `DriverError::Timeout`: 超时未发送成功
1117    pub fn send_reliable_timeout(
1118        &self,
1119        frame: PiperFrame,
1120        timeout: std::time::Duration,
1121    ) -> Result<(), DriverError> {
1122        let reliable_tx = self.reliable_tx.as_ref().ok_or(DriverError::NotDualThread)?;
1123
1124        match reliable_tx.send_timeout(frame, timeout) {
1125            Ok(_) => {
1126                self.metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
1127                Ok(())
1128            },
1129            Err(crossbeam_channel::SendTimeoutError::Timeout(_)) => Err(DriverError::Timeout),
1130            Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => {
1131                Err(DriverError::ChannelClosed)
1132            },
1133        }
1134    }
1135}
1136
1137impl Drop for Piper {
1138    fn drop(&mut self) {
1139        // 设置运行标志为 false,通知所有线程退出
1140        // 使用 Release 确保所有之前的写入对其他线程可见
1141        self.is_running.store(false, Ordering::Release);
1142
1143        // 关闭命令通道(通知 IO 线程退出)
1144        // 关键:必须在 join 线程之前真正 drop 掉 Sender,否则接收端不会 Disconnected。
1145        unsafe {
1146            ManuallyDrop::drop(&mut self.cmd_tx);
1147        }
1148
1149        let join_timeout = Duration::from_secs(2);
1150
1151        // 等待 RX 线程退出(使用 join_timeout 替代 polling)
1152        if let Some(handle) = self.rx_thread.take()
1153            && let Err(_e) = handle.join_timeout(join_timeout)
1154        {
1155            error!(
1156                "RX thread panicked or failed to shut down within {:?}",
1157                join_timeout
1158            );
1159        }
1160
1161        // 等待 TX 线程退出(使用 join_timeout 替代 polling)
1162        if let Some(handle) = self.tx_thread.take()
1163            && let Err(_e) = handle.join_timeout(join_timeout)
1164        {
1165            error!(
1166                "TX thread panicked or failed to shut down within {:?}",
1167                join_timeout
1168            );
1169        }
1170
1171        // 等待 IO 线程退出(单线程模式,使用 join_timeout 替代 polling)
1172        if let Some(handle) = self.io_thread.take()
1173            && let Err(_e) = handle.join_timeout(join_timeout)
1174        {
1175            error!(
1176                "IO thread panicked or failed to shut down within {:?}",
1177                join_timeout
1178            );
1179        }
1180    }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185    use super::*;
1186    use piper_can::PiperFrame;
1187
1188    // 简单的 Mock CanAdapter 用于测试
1189    struct MockCanAdapter;
1190
1191    impl CanAdapter for MockCanAdapter {
1192        fn send(&mut self, _frame: PiperFrame) -> Result<(), CanError> {
1193            Ok(())
1194        }
1195
1196        fn receive(&mut self) -> Result<PiperFrame, CanError> {
1197            // 永远超时,避免阻塞测试
1198            Err(CanError::Timeout)
1199        }
1200    }
1201
1202    #[test]
1203    fn test_piper_new() {
1204        let mock_can = MockCanAdapter;
1205        let piper = Piper::new(mock_can, None).unwrap();
1206
1207        // 验证可以获取状态(默认状态)
1208        let joint_pos = piper.get_joint_position();
1209        assert_eq!(joint_pos.hardware_timestamp_us, 0);
1210
1211        // 验证通道正常工作
1212        let frame = PiperFrame::new_standard(0x123, &[0x01, 0x02]);
1213        assert!(piper.send_frame(frame).is_ok());
1214    }
1215
1216    #[test]
1217    fn test_piper_drop() {
1218        let mock_can = MockCanAdapter;
1219        let piper = Piper::new(mock_can, None).unwrap();
1220        // drop 应该能够正常退出,IO 线程被 join
1221        drop(piper);
1222    }
1223
1224    #[test]
1225    fn test_piper_get_motion_state() {
1226        let mock_can = MockCanAdapter;
1227        let piper = Piper::new(mock_can, None).unwrap();
1228        let motion = piper.get_motion_state();
1229        assert_eq!(motion.joint_position.hardware_timestamp_us, 0);
1230        assert_eq!(motion.joint_dynamic.group_timestamp_us, 0);
1231    }
1232
1233    #[test]
1234    fn test_piper_send_frame_channel_full() {
1235        let mock_can = MockCanAdapter;
1236        let piper = Piper::new(mock_can, None).unwrap();
1237        let frame = PiperFrame::new_standard(0x123, &[0x01]);
1238
1239        // 填满命令通道(容量 10)
1240        // 注意:IO 线程会持续消费帧,所以需要快速填充
1241        // 或者等待 IO 线程稍微延迟消费
1242        std::thread::sleep(std::time::Duration::from_millis(50));
1243
1244        for _ in 0..10 {
1245            assert!(piper.send_frame(frame).is_ok());
1246        }
1247
1248        // 第 11 次发送可能返回 ChannelFull(如果 IO 线程还没消费完)
1249        // 或者成功(如果 IO 线程已经消费了一些)
1250        // 为了测试 ChannelFull,我们需要更快速地发送,确保通道填满
1251        let result = piper.send_frame(frame);
1252
1253        // 由于 IO 线程在后台消费,可能成功也可能失败
1254        // 验证至少前 10 次都成功即可
1255        match result {
1256            Err(DriverError::ChannelFull) => {
1257                // 通道满,这是预期情况
1258            },
1259            Ok(()) => {
1260                // 如果 IO 线程消费很快,这也可能发生
1261                // 这是可接受的行为
1262            },
1263            Err(e) => panic!("Unexpected error: {:?}", e),
1264        }
1265    }
1266
1267    #[test]
1268    fn test_get_aligned_motion_aligned() {
1269        let mock_can = MockCanAdapter;
1270        let piper = Piper::new(mock_can, None).unwrap();
1271
1272        // 由于 MockCanAdapter 不发送帧,时间戳都为 0
1273        // 测试默认状态下的对齐检查(时间戳都为 0,应该是对齐的)
1274        let result = piper.get_aligned_motion(5000);
1275        match result {
1276            AlignmentResult::Ok(state) => {
1277                assert_eq!(state.timestamp, 0);
1278                assert_eq!(state.time_diff_us, 0);
1279            },
1280            AlignmentResult::Misaligned { .. } => {
1281                // 如果时间戳都为 0,不应该是不对齐的
1282                // 但允许这种情况(因为时间戳都是 0)
1283            },
1284        }
1285    }
1286
1287    #[test]
1288    fn test_get_aligned_motion_misaligned_threshold() {
1289        let mock_can = MockCanAdapter;
1290        let piper = Piper::new(mock_can, None).unwrap();
1291
1292        // 测试不同的时间差阈值
1293        // 由于时间戳都是 0,应该是对齐的
1294        let result1 = piper.get_aligned_motion(0);
1295        let result2 = piper.get_aligned_motion(1000);
1296        let result3 = piper.get_aligned_motion(1000000);
1297
1298        // 所有结果都应该返回状态(即使是对齐的)
1299        match (result1, result2, result3) {
1300            (AlignmentResult::Ok(_), AlignmentResult::Ok(_), AlignmentResult::Ok(_)) => {
1301                // 正常情况
1302            },
1303            _ => {
1304                // 允许其他情况
1305            },
1306        }
1307    }
1308
1309    #[test]
1310    fn test_get_robot_control() {
1311        let mock_can = MockCanAdapter;
1312        let piper = Piper::new(mock_can, None).unwrap();
1313
1314        let control = piper.get_robot_control();
1315        assert_eq!(control.hardware_timestamp_us, 0);
1316        assert_eq!(control.control_mode, 0);
1317        assert!(!control.is_enabled);
1318    }
1319
1320    #[test]
1321    fn test_get_joint_driver_low_speed() {
1322        let mock_can = MockCanAdapter;
1323        let piper = Piper::new(mock_can, None).unwrap();
1324
1325        let driver_state = piper.get_joint_driver_low_speed();
1326        assert_eq!(driver_state.hardware_timestamp_us, 0);
1327        assert_eq!(driver_state.motor_temps, [0.0; 6]);
1328    }
1329
1330    #[test]
1331    fn test_get_joint_limit_config() {
1332        let mock_can = MockCanAdapter;
1333        let piper = Piper::new(mock_can, None).unwrap();
1334
1335        let limits = piper.get_joint_limit_config().unwrap();
1336        assert_eq!(limits.joint_limits_max, [0.0; 6]);
1337    }
1338
1339    #[test]
1340    fn test_wait_for_feedback_timeout() {
1341        let mock_can = MockCanAdapter;
1342        let piper = Piper::new(mock_can, None).unwrap();
1343
1344        // MockCanAdapter 不发送帧,所以应该超时
1345        let result = piper.wait_for_feedback(std::time::Duration::from_millis(10));
1346        assert!(result.is_err());
1347        assert!(matches!(result.unwrap_err(), DriverError::Timeout));
1348    }
1349
1350    #[test]
1351    fn test_send_frame_blocking_timeout() {
1352        let mock_can = MockCanAdapter;
1353        let piper = Piper::new(mock_can, None).unwrap();
1354        let frame = PiperFrame::new_standard(0x123, &[0x01]);
1355
1356        // 快速填充通道(如果 IO 线程来不及消费)
1357        // 然后测试阻塞发送
1358        // 由于通道容量为 10,在 IO 线程消费的情况下,应该能成功
1359        // 但为了测试超时,我们使用极短的超时时间
1360        let result = piper.send_frame_blocking(frame, std::time::Duration::from_millis(1));
1361
1362        // 结果可能是成功(IO 线程消费快)或超时(通道满)
1363        match result {
1364            Ok(()) => {
1365                // 成功是正常情况
1366            },
1367            Err(DriverError::Timeout) => {
1368                // 超时也是可接受的(如果通道满)
1369            },
1370            Err(e) => panic!("Unexpected error: {:?}", e),
1371        }
1372    }
1373
1374    #[test]
1375    fn test_get_aligned_motion_with_time_diff() {
1376        let mock_can = MockCanAdapter;
1377        let piper = Piper::new(mock_can, None).unwrap();
1378
1379        // 测试对齐阈值边界情况
1380        // 时间戳都为 0 时,time_diff_us 应该是 0
1381        let result = piper.get_aligned_motion(0);
1382        match result {
1383            AlignmentResult::Ok(state) => {
1384                assert_eq!(state.time_diff_us, 0);
1385            },
1386            AlignmentResult::Misaligned { state, diff_us } => {
1387                // 如果时间戳都为 0,diff_us 应该也是 0
1388                assert_eq!(diff_us, 0);
1389                assert_eq!(state.time_diff_us, 0);
1390            },
1391        }
1392    }
1393
1394    #[test]
1395    fn test_get_motion_state_returns_combined() {
1396        let mock_can = MockCanAdapter;
1397        let piper = Piper::new(mock_can, None).unwrap();
1398
1399        let motion = piper.get_motion_state();
1400        // 验证返回的是组合状态
1401        assert_eq!(motion.joint_position.hardware_timestamp_us, 0);
1402        assert_eq!(motion.joint_dynamic.group_timestamp_us, 0);
1403        assert_eq!(motion.joint_position.joint_pos, [0.0; 6]);
1404        assert_eq!(motion.joint_dynamic.joint_vel, [0.0; 6]);
1405    }
1406
1407    #[test]
1408    fn test_send_frame_non_blocking() {
1409        let mock_can = MockCanAdapter;
1410        let piper = Piper::new(mock_can, None).unwrap();
1411        let frame = PiperFrame::new_standard(0x123, &[0x01, 0x02]);
1412
1413        // 非阻塞发送应该总是成功(除非通道满或关闭)
1414        let result = piper.send_frame(frame);
1415        assert!(result.is_ok(), "Non-blocking send should succeed");
1416    }
1417
1418    #[test]
1419    fn test_get_joint_dynamic_default() {
1420        let mock_can = MockCanAdapter;
1421        let piper = Piper::new(mock_can, None).unwrap();
1422
1423        let joint_dynamic = piper.get_joint_dynamic();
1424        assert_eq!(joint_dynamic.group_timestamp_us, 0);
1425        assert_eq!(joint_dynamic.joint_vel, [0.0; 6]);
1426        assert_eq!(joint_dynamic.joint_current, [0.0; 6]);
1427        assert!(!joint_dynamic.is_complete());
1428    }
1429
1430    #[test]
1431    fn test_get_joint_position_default() {
1432        let mock_can = MockCanAdapter;
1433        let piper = Piper::new(mock_can, None).unwrap();
1434
1435        let joint_pos = piper.get_joint_position();
1436        assert_eq!(joint_pos.hardware_timestamp_us, 0);
1437        assert_eq!(joint_pos.joint_pos, [0.0; 6]);
1438
1439        let end_pose = piper.get_end_pose();
1440        assert_eq!(end_pose.hardware_timestamp_us, 0);
1441        assert_eq!(end_pose.end_pose, [0.0; 6]);
1442    }
1443
1444    #[test]
1445    fn test_joint_driver_low_speed_clone() {
1446        let mock_can = MockCanAdapter;
1447        let piper = Piper::new(mock_can, None).unwrap();
1448
1449        // 测试读取并克隆诊断状态
1450        let driver1 = piper.get_joint_driver_low_speed();
1451        let driver2 = piper.get_joint_driver_low_speed();
1452
1453        // 验证可以多次读取(ArcSwap 无锁读取)
1454        assert_eq!(driver1.hardware_timestamp_us, driver2.hardware_timestamp_us);
1455        assert_eq!(driver1.motor_temps, driver2.motor_temps);
1456    }
1457
1458    #[test]
1459    fn test_joint_limit_config_read_lock() {
1460        let mock_can = MockCanAdapter;
1461        let piper = Piper::new(mock_can, None).unwrap();
1462
1463        // 测试可以多次读取配置状态
1464        let limits1 = piper.get_joint_limit_config().unwrap();
1465        let limits2 = piper.get_joint_limit_config().unwrap();
1466
1467        assert_eq!(limits1.joint_limits_max, limits2.joint_limits_max);
1468        assert_eq!(limits1.joint_limits_min, limits2.joint_limits_min);
1469    }
1470}