1use crate::metrics::PiperMetrics;
6use crate::state::*;
7use crossbeam_channel::Receiver;
8use piper_can::{CanAdapter, CanError, PiperFrame, RxAdapter, TxAdapter};
9use piper_protocol::config::*;
10use piper_protocol::feedback::*;
11use piper_protocol::ids::*;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::time::{Duration, Instant, SystemTime};
15use tracing::{debug, error, trace, warn};
16
17use spin_sleep;
19
20#[inline]
36fn safe_system_timestamp_us() -> u64 {
37 match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
38 Ok(duration) => duration.as_micros() as u64,
39 Err(_) => {
40 warn!("System clock went backwards, using invalid timestamp (0)");
43 0
44 },
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct PipelineConfig {
69 pub receive_timeout_ms: u64,
71 pub frame_group_timeout_ms: u64,
74 pub velocity_buffer_timeout_us: u64,
77}
78
79impl Default for PipelineConfig {
80 fn default() -> Self {
81 Self {
82 receive_timeout_ms: 2,
83 frame_group_timeout_ms: 10,
84 velocity_buffer_timeout_us: 10_000, }
86 }
87}
88
89pub struct ParserState<'a> {
110 pub pending_joint_pos: [f64; 6],
113 pub joint_pos_frame_mask: u8,
115
116 pub pending_end_pose: [f64; 6],
119 pub end_pose_frame_mask: u8,
121
122 pub pending_joint_dynamic: JointDynamicState,
125 pub vel_update_mask: u8,
127 pub last_vel_commit_time_us: u64,
129 pub last_vel_packet_time_us: u64,
131 pub last_vel_packet_instant: Option<Instant>,
133
134 pub pending_joint_target_deg: [i32; 6],
137 pub joint_control_frame_mask: u8,
139
140 _phantom: std::marker::PhantomData<&'a ()>,
143}
144
145impl<'a> ParserState<'a> {
146 pub fn new() -> Self {
155 Self {
156 pending_joint_pos: [0.0; 6],
157 joint_pos_frame_mask: 0,
158 pending_end_pose: [0.0; 6],
159 end_pose_frame_mask: 0,
160 pending_joint_dynamic: JointDynamicState::default(),
161 vel_update_mask: 0,
162 last_vel_commit_time_us: 0,
163 last_vel_packet_time_us: 0,
164 last_vel_packet_instant: None,
165 pending_joint_target_deg: [0; 6],
166 joint_control_frame_mask: 0,
167 _phantom: std::marker::PhantomData,
168 }
169 }
170}
171
172impl<'a> Default for ParserState<'a> {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178pub fn io_loop(
186 mut can: impl CanAdapter,
187 cmd_rx: Receiver<PiperFrame>,
188 ctx: Arc<PiperContext>,
189 config: PipelineConfig,
190) {
191 let mut state = ParserState::new();
193
194 let frame_group_timeout = Duration::from_millis(config.frame_group_timeout_ms);
197 let mut last_frame_time = std::time::Instant::now();
198
199 loop {
200 if drain_tx_queue(&mut can, &cmd_rx) {
204 break;
206 }
207
208 let frame = match can.receive() {
212 Ok(frame) => frame,
213 Err(CanError::Timeout) => {
214 let elapsed = last_frame_time.elapsed();
219 if elapsed > frame_group_timeout {
220 warn!(
223 "Frame group timeout after {:?}, resetting pending buffers",
224 elapsed
225 );
226 state.pending_joint_pos = [0.0; 6];
227 state.pending_end_pose = [0.0; 6];
228 state.joint_pos_frame_mask = 0;
229 state.end_pose_frame_mask = 0;
230 state.pending_joint_target_deg = [0; 6];
231 state.joint_control_frame_mask = 0;
232 last_frame_time = Instant::now();
233 }
234
235 if state.vel_update_mask != 0
239 && let Some(last_vel_instant) = state.last_vel_packet_instant
240 {
241 let elapsed_since_last_vel = last_vel_instant.elapsed();
242 let vel_timeout_threshold = Duration::from_micros(6000); if elapsed_since_last_vel > vel_timeout_threshold {
248 warn!(
250 "Velocity buffer timeout: mask={:06b}, forcing commit with incomplete data",
251 state.vel_update_mask
252 );
253 if state.last_vel_packet_time_us > 0 {
255 state.pending_joint_dynamic.group_timestamp_us =
256 state.last_vel_packet_time_us;
257 state.pending_joint_dynamic.valid_mask = state.vel_update_mask;
258 ctx.joint_dynamic.store(Arc::new(state.pending_joint_dynamic.clone()));
259 ctx.fps_stats
260 .load()
261 .joint_dynamic_updates
262 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
263
264 state.vel_update_mask = 0;
266 state.last_vel_commit_time_us = state.last_vel_packet_time_us;
267 state.last_vel_packet_instant = None;
268 } else {
269 state.vel_update_mask = 0;
271 state.last_vel_packet_instant = None;
272 }
273 }
274 }
275
276 continue;
277 },
278 Err(e) => {
279 error!("CAN receive error: {}", e);
280 continue;
282 },
283 };
284
285 last_frame_time = std::time::Instant::now();
286
287 parse_and_update_state(&frame, &ctx, &config, &mut state);
291
292 ctx.connection_monitor.register_feedback();
296
297 if drain_tx_queue(&mut can, &cmd_rx) {
301 break;
303 }
304
305 }
308}
309
310fn drain_tx_queue(can: &mut impl CanAdapter, cmd_rx: &Receiver<PiperFrame>) -> bool {
329 const MAX_DRAIN_PER_CYCLE: usize = 32;
331 const TIME_BUDGET: Duration = Duration::from_micros(500); let start = std::time::Instant::now();
334
335 for _ in 0..MAX_DRAIN_PER_CYCLE {
336 if start.elapsed() > TIME_BUDGET {
338 let remaining = cmd_rx.len();
339 trace!("Drain time budget exhausted, deferred {} frames", remaining);
340 break;
341 }
342
343 match cmd_rx.try_recv() {
344 Ok(cmd_frame) => {
345 if let Err(e) = can.send(cmd_frame) {
346 error!("Failed to send control frame: {}", e);
347 }
349 },
350 Err(crossbeam_channel::TryRecvError::Empty) => break, Err(crossbeam_channel::TryRecvError::Disconnected) => return true, }
353 }
354
355 false
356}
357
358pub fn rx_loop(
370 mut rx: impl RxAdapter,
371 ctx: Arc<PiperContext>,
372 config: PipelineConfig,
373 is_running: Arc<AtomicBool>,
374 metrics: Arc<PiperMetrics>,
375) {
376 #[cfg(feature = "realtime")]
378 {
379 use thread_priority::*;
380 use tracing::info;
381
382 match set_current_thread_priority(ThreadPriority::Max) {
383 Ok(_) => {
384 info!("RX thread priority set to MAX (realtime)");
385 },
386 Err(e) => {
387 warn!(
388 "Failed to set RX thread priority: {}. \
389 On Linux, you may need to run with CAP_SYS_NICE or use rtkit. \
390 See README for details.",
391 e
392 );
393 },
394 }
395 }
396
397 let mut state = ParserState::new();
399
400 let frame_group_timeout = Duration::from_millis(config.frame_group_timeout_ms);
401 let mut last_frame_time = std::time::Instant::now();
402
403 loop {
404 if !is_running.load(Ordering::Acquire) {
407 trace!("RX thread: is_running flag is false, exiting");
408 break;
409 }
410
411 let frame = match rx.receive() {
415 Ok(frame) => {
416 metrics.rx_frames_total.fetch_add(1, Ordering::Relaxed);
417 frame
418 },
419 Err(CanError::Timeout) => {
420 metrics.rx_timeouts.fetch_add(1, Ordering::Relaxed);
422
423 let elapsed = last_frame_time.elapsed();
425 if elapsed > frame_group_timeout {
426 state.pending_joint_pos = [0.0; 6];
428 state.pending_end_pose = [0.0; 6];
429 state.joint_pos_frame_mask = 0;
430 state.end_pose_frame_mask = 0;
431 state.pending_joint_target_deg = [0; 6];
432 state.joint_control_frame_mask = 0;
433 }
434
435 if state.vel_update_mask != 0
437 && let Some(last_vel_instant) = state.last_vel_packet_instant
438 {
439 let elapsed_since_last_vel = last_vel_instant.elapsed();
440 let vel_timeout_threshold = Duration::from_micros(6000); if elapsed_since_last_vel > vel_timeout_threshold {
443 warn!(
444 "Velocity buffer timeout: mask={:06b}, forcing commit with incomplete data",
445 state.vel_update_mask
446 );
447 if state.last_vel_packet_time_us > 0 {
448 state.pending_joint_dynamic.group_timestamp_us =
449 state.last_vel_packet_time_us;
450 state.pending_joint_dynamic.valid_mask = state.vel_update_mask;
451 ctx.joint_dynamic.store(Arc::new(state.pending_joint_dynamic.clone()));
452 ctx.fps_stats
453 .load()
454 .joint_dynamic_updates
455 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
456
457 state.vel_update_mask = 0;
458 state.last_vel_commit_time_us = state.last_vel_packet_time_us;
459 state.last_vel_packet_instant = None;
460 } else {
461 state.vel_update_mask = 0;
462 state.last_vel_packet_instant = None;
463 }
464 }
465 }
466
467 continue;
468 },
469 Err(e) => {
470 error!("RX thread: CAN receive error: {}", e);
472 metrics.device_errors.fetch_add(1, Ordering::Relaxed);
473
474 let is_fatal = matches!(e, CanError::Device(_) | CanError::BufferOverflow);
476
477 if is_fatal {
478 error!("RX thread: Fatal error detected, setting is_running = false");
479 is_running.store(false, Ordering::Release);
481 break;
482 }
483
484 continue;
486 },
487 };
488
489 last_frame_time = std::time::Instant::now();
490 metrics.rx_frames_valid.fetch_add(1, Ordering::Relaxed);
491
492 if let Ok(hooks) = ctx.hooks.try_read() {
497 hooks.trigger_all(&frame);
498 }
500
501 parse_and_update_state(&frame, &ctx, &config, &mut state);
506 }
507
508 trace!("RX thread: loop exited");
509}
510
511pub fn tx_loop_mailbox(
524 mut tx: impl TxAdapter,
525 realtime_slot: Arc<std::sync::Mutex<Option<crate::command::RealtimeCommand>>>,
526 reliable_rx: Receiver<PiperFrame>,
527 is_running: Arc<AtomicBool>,
528 metrics: Arc<PiperMetrics>,
529 ctx: Arc<PiperContext>,
530) {
531 const REALTIME_BURST_LIMIT: usize = 100;
533 let mut realtime_burst_count = 0;
534
535 loop {
536 if !is_running.load(Ordering::Acquire) {
539 trace!("TX thread: is_running flag is false, exiting");
540 break;
541 }
542
543 let realtime_command = {
546 match realtime_slot.lock() {
547 Ok(mut slot) => slot.take(), Err(_) => {
549 error!("TX thread: Realtime slot lock poisoned");
551 None
552 },
553 }
554 };
555
556 if let Some(command) = realtime_command {
557 let frames = command.into_frames();
560 let total_frames = frames.len();
561 let mut sent_count = 0;
562 let mut should_break = false;
563
564 for frame in frames {
565 match tx.send(frame) {
566 Ok(_) => {
567 sent_count += 1;
568 metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
569 if let Ok(hooks) = ctx.hooks.try_read() {
572 hooks.trigger_all_sent(&frame);
573 }
575 },
576 Err(e) => {
577 error!(
578 "TX thread: Failed to send frame {} in package: {}",
579 sent_count, e
580 );
581 metrics.device_errors.fetch_add(1, Ordering::Relaxed);
582 metrics.tx_timeouts.fetch_add(1, Ordering::Relaxed);
583
584 let is_fatal = matches!(e, CanError::Device(_) | CanError::BufferOverflow);
586 if is_fatal {
587 error!("TX thread: Fatal error detected, setting is_running = false");
588 is_running.store(false, Ordering::Release);
590 should_break = true;
591 }
592
593 break;
596 },
597 }
598 }
599
600 if sent_count > 0 {
602 metrics.tx_package_sent.fetch_add(1, Ordering::Relaxed);
603 if sent_count < total_frames {
604 metrics.tx_package_partial.fetch_add(1, Ordering::Relaxed);
605 }
606 }
607
608 if should_break {
609 break;
610 }
611
612 realtime_burst_count += 1;
614 if realtime_burst_count >= REALTIME_BURST_LIMIT {
615 realtime_burst_count = 0;
617 } else {
619 continue;
621 }
622 } else {
623 realtime_burst_count = 0;
625 }
626
627 if let Ok(frame) = reliable_rx.try_recv() {
629 match tx.send(frame) {
630 Ok(_) => {
631 metrics.tx_frames_total.fetch_add(1, Ordering::Relaxed);
633
634 if let Ok(hooks) = ctx.hooks.try_read() {
637 hooks.trigger_all_sent(&frame);
638 }
640 },
641 Err(e) => {
642 error!("TX thread: Failed to send reliable frame: {}", e);
643 metrics.device_errors.fetch_add(1, Ordering::Relaxed);
644 metrics.tx_timeouts.fetch_add(1, Ordering::Relaxed);
645
646 let is_fatal = matches!(e, CanError::Device(_) | CanError::BufferOverflow);
648
649 if is_fatal {
650 error!("TX thread: Fatal error detected, setting is_running = false");
651 is_running.store(false, Ordering::Release);
653 break;
654 }
655 },
656 }
657 continue;
658 }
659
660 spin_sleep::sleep(Duration::from_micros(50));
665 }
666
667 trace!("TX thread: loop exited");
668}
669
670fn parse_and_update_state(
687 frame: &PiperFrame,
688 ctx: &Arc<PiperContext>,
689 config: &PipelineConfig,
690 state: &mut ParserState,
691) {
692 match frame.id {
694 ID_JOINT_FEEDBACK_12 => {
698 if let Ok(feedback) = JointFeedback12::try_from(*frame) {
699 state.pending_joint_pos[0] = feedback.j1_rad();
700 state.pending_joint_pos[1] = feedback.j2_rad();
701 state.joint_pos_frame_mask |= 1 << 0; } else {
703 warn!("Failed to parse JointFeedback12: CAN ID 0x{:X}", frame.id);
704 }
705 },
706
707 ID_JOINT_FEEDBACK_34 => {
709 if let Ok(feedback) = JointFeedback34::try_from(*frame) {
710 state.pending_joint_pos[2] = feedback.j3_rad();
711 state.pending_joint_pos[3] = feedback.j4_rad();
712 state.joint_pos_frame_mask |= 1 << 1; } else {
714 warn!("Failed to parse JointFeedback34: CAN ID 0x{:X}", frame.id);
715 }
716 },
717
718 ID_JOINT_FEEDBACK_56 => {
720 if let Ok(feedback) = JointFeedback56::try_from(*frame) {
721 state.pending_joint_pos[4] = feedback.j5_rad();
722 state.pending_joint_pos[5] = feedback.j6_rad();
723 state.joint_pos_frame_mask |= 1 << 2; let system_timestamp_us = safe_system_timestamp_us();
727
728 let new_joint_pos_state = JointPositionState {
730 hardware_timestamp_us: frame.timestamp_us,
731 system_timestamp_us,
732 joint_pos: state.pending_joint_pos,
733 frame_valid_mask: state.joint_pos_frame_mask,
734 };
735 ctx.joint_position.store(Arc::new(new_joint_pos_state));
736 ctx.fps_stats
737 .load()
738 .joint_position_updates
739 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
740 state.joint_pos_frame_mask = 0;
744 } else {
745 warn!("Failed to parse JointFeedback56: CAN ID 0x{:X}", frame.id);
746 }
747 },
748
749 ID_END_POSE_1 => {
751 if let Ok(feedback) = EndPoseFeedback1::try_from(*frame) {
752 state.pending_end_pose[0] = feedback.x() / 1000.0; state.pending_end_pose[1] = feedback.y() / 1000.0; state.end_pose_frame_mask |= 1 << 0; }
756 },
757
758 ID_END_POSE_2 => {
760 if let Ok(feedback) = EndPoseFeedback2::try_from(*frame) {
761 state.pending_end_pose[2] = feedback.z() / 1000.0; state.pending_end_pose[3] = feedback.rx_rad();
763 state.end_pose_frame_mask |= 1 << 1; }
765 },
766
767 ID_END_POSE_3 => {
769 if let Ok(feedback) = EndPoseFeedback3::try_from(*frame) {
770 state.pending_end_pose[4] = feedback.ry_rad();
771 state.pending_end_pose[5] = feedback.rz_rad();
772 state.end_pose_frame_mask |= 1 << 2; let system_timestamp_us = safe_system_timestamp_us();
776
777 let new_end_pose_state = EndPoseState {
779 hardware_timestamp_us: frame.timestamp_us,
780 system_timestamp_us,
781 end_pose: state.pending_end_pose,
782 frame_valid_mask: state.end_pose_frame_mask,
783 };
784 ctx.end_pose.store(Arc::new(new_end_pose_state));
785 ctx.fps_stats
786 .load()
787 .end_pose_updates
788 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
789 state.end_pose_frame_mask = 0;
793 }
794 },
795
796 id if (ID_JOINT_DRIVER_HIGH_SPEED_BASE..=ID_JOINT_DRIVER_HIGH_SPEED_BASE + 5)
798 .contains(&id) =>
799 {
800 let joint_index = (id - ID_JOINT_DRIVER_HIGH_SPEED_BASE) as usize;
801
802 if let Ok(feedback) = JointDriverHighSpeedFeedback::try_from(*frame) {
803 state.pending_joint_dynamic.joint_vel[joint_index] = feedback.speed();
805 state.pending_joint_dynamic.joint_current[joint_index] = feedback.current();
806 state.pending_joint_dynamic.timestamps[joint_index] = frame.timestamp_us;
807
808 state.vel_update_mask |= 1 << joint_index;
810 state.last_vel_packet_time_us = frame.timestamp_us;
811 state.last_vel_packet_instant = Some(std::time::Instant::now());
812
813 let all_received = state.vel_update_mask == 0b111111; let time_since_last_commit = if state.last_vel_commit_time_us == 0 {
820 0 } else {
822 frame.timestamp_us.wrapping_sub(state.last_vel_commit_time_us)
824 };
825
826 let timeout_threshold_us = config.velocity_buffer_timeout_us;
828
829 if all_received || time_since_last_commit > timeout_threshold_us {
830 state.pending_joint_dynamic.group_timestamp_us = frame.timestamp_us;
832 state.pending_joint_dynamic.valid_mask = state.vel_update_mask;
833
834 ctx.joint_dynamic.store(Arc::new(state.pending_joint_dynamic.clone()));
835 ctx.fps_stats
836 .load()
837 .joint_dynamic_updates
838 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
839
840 state.vel_update_mask = 0;
842 state.last_vel_commit_time_us = frame.timestamp_us;
843 state.last_vel_packet_instant = None;
844
845 if !all_received {
846 warn!(
847 "Velocity frame commit timeout: mask={:06b}, incomplete data",
848 state.vel_update_mask
849 );
850 }
851 }
853 }
854 },
855
856 ID_ROBOT_STATUS => {
858 if let Ok(feedback) = RobotStatusFeedback::try_from(*frame) {
860 let system_timestamp_us = safe_system_timestamp_us();
861
862 let fault_angle_limit_mask = feedback.fault_code_angle_limit.joint1_limit() as u8
864 | (feedback.fault_code_angle_limit.joint2_limit() as u8) << 1
865 | (feedback.fault_code_angle_limit.joint3_limit() as u8) << 2
866 | (feedback.fault_code_angle_limit.joint4_limit() as u8) << 3
867 | (feedback.fault_code_angle_limit.joint5_limit() as u8) << 4
868 | (feedback.fault_code_angle_limit.joint6_limit() as u8) << 5;
869
870 let fault_comm_error_mask = feedback.fault_code_comm_error.joint1_comm_error()
871 as u8
872 | (feedback.fault_code_comm_error.joint2_comm_error() as u8) << 1
873 | (feedback.fault_code_comm_error.joint3_comm_error() as u8) << 2
874 | (feedback.fault_code_comm_error.joint4_comm_error() as u8) << 3
875 | (feedback.fault_code_comm_error.joint5_comm_error() as u8) << 4
876 | (feedback.fault_code_comm_error.joint6_comm_error() as u8) << 5;
877
878 let new_robot_control_state = RobotControlState {
879 hardware_timestamp_us: frame.timestamp_us,
880 system_timestamp_us,
881 control_mode: feedback.control_mode as u8,
882 robot_status: feedback.robot_status as u8,
883 move_mode: feedback.move_mode as u8,
884 teach_status: feedback.teach_status as u8,
885 motion_status: feedback.motion_status as u8,
886 trajectory_point_index: feedback.trajectory_point_index,
887 fault_angle_limit_mask,
888 fault_comm_error_mask,
889 is_enabled: matches!(feedback.robot_status, RobotStatus::Normal),
890 feedback_counter: 0,
893 };
894
895 ctx.robot_control.store(Arc::new(new_robot_control_state));
896 ctx.fps_stats
897 .load()
898 .robot_control_updates
899 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
900 }
902 },
903
904 ID_GRIPPER_FEEDBACK => {
905 if let Ok(feedback) = GripperFeedback::try_from(*frame) {
907 let system_timestamp_us = safe_system_timestamp_us();
908
909 let current = ctx.gripper.load();
910 let last_travel = current.last_travel;
911
912 let new_gripper_state = GripperState {
913 hardware_timestamp_us: frame.timestamp_us,
914 system_timestamp_us,
915 travel: feedback.travel(),
916 torque: feedback.torque(),
917 status_code: u8::from(feedback.status),
918 last_travel,
919 };
920
921 ctx.gripper.rcu(|old| {
922 let mut new = new_gripper_state.clone();
923 new.last_travel = old.travel;
924 Arc::new(new)
925 });
926
927 ctx.fps_stats
928 .load()
929 .gripper_updates
930 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
931 }
933 },
934
935 id if (ID_JOINT_DRIVER_LOW_SPEED_BASE..=ID_JOINT_DRIVER_LOW_SPEED_BASE + 5)
937 .contains(&id) =>
938 {
939 if let Ok(feedback) = JointDriverLowSpeedFeedback::try_from(*frame) {
941 let joint_idx = (feedback.joint_index as usize).saturating_sub(1);
942 if joint_idx < 6 {
943 let system_timestamp_us = safe_system_timestamp_us();
945
946 ctx.joint_driver_low_speed.rcu(|old| {
947 let mut new = (**old).clone();
948 new.motor_temps[joint_idx] = feedback.motor_temp() as f32;
949 new.driver_temps[joint_idx] = feedback.driver_temp() as f32;
950 new.joint_voltage[joint_idx] = feedback.voltage() as f32;
951 new.joint_bus_current[joint_idx] = feedback.bus_current() as f32;
952 new.hardware_timestamps[joint_idx] = frame.timestamp_us;
953 new.system_timestamps[joint_idx] = system_timestamp_us;
954 new.hardware_timestamp_us = frame.timestamp_us;
955 new.system_timestamp_us = system_timestamp_us;
956 new.valid_mask |= 1 << joint_idx;
957
958 if feedback.status.voltage_low() {
960 new.driver_voltage_low_mask |= 1 << joint_idx;
961 } else {
962 new.driver_voltage_low_mask &= !(1 << joint_idx);
963 }
964 if feedback.status.motor_over_temp() {
965 new.driver_motor_over_temp_mask |= 1 << joint_idx;
966 } else {
967 new.driver_motor_over_temp_mask &= !(1 << joint_idx);
968 }
969 if feedback.status.driver_over_current() {
970 new.driver_over_current_mask |= 1 << joint_idx;
971 } else {
972 new.driver_over_current_mask &= !(1 << joint_idx);
973 }
974 if feedback.status.driver_over_temp() {
975 new.driver_over_temp_mask |= 1 << joint_idx;
976 } else {
977 new.driver_over_temp_mask &= !(1 << joint_idx);
978 }
979 if feedback.status.collision_protection() {
980 new.driver_collision_protection_mask |= 1 << joint_idx;
981 } else {
982 new.driver_collision_protection_mask &= !(1 << joint_idx);
983 }
984 if feedback.status.driver_error() {
985 new.driver_error_mask |= 1 << joint_idx;
986 } else {
987 new.driver_error_mask &= !(1 << joint_idx);
988 }
989 if feedback.status.enabled() {
990 new.driver_enabled_mask |= 1 << joint_idx;
991 } else {
992 new.driver_enabled_mask &= !(1 << joint_idx);
993 }
994 if feedback.status.stall_protection() {
995 new.driver_stall_protection_mask |= 1 << joint_idx;
996 } else {
997 new.driver_stall_protection_mask &= !(1 << joint_idx);
998 }
999 Arc::new(new)
1000 });
1001
1002 ctx.fps_stats
1003 .load()
1004 .joint_driver_low_speed_updates
1005 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1006 }
1008 }
1009 },
1010
1011 ID_COLLISION_PROTECTION_LEVEL_FEEDBACK => {
1012 if let Ok(feedback) = CollisionProtectionLevelFeedback::try_from(*frame) {
1014 let system_timestamp_us = safe_system_timestamp_us();
1015
1016 if let Ok(mut collision) = ctx.collision_protection.try_write() {
1018 collision.hardware_timestamp_us = frame.timestamp_us;
1019 collision.system_timestamp_us = system_timestamp_us;
1020 collision.protection_levels = feedback.levels;
1021 }
1022 ctx.fps_stats
1025 .load()
1026 .collision_protection_updates
1027 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1028 }
1030 },
1031
1032 ID_MOTOR_LIMIT_FEEDBACK => {
1034 if let Ok(feedback) = MotorLimitFeedback::try_from(*frame) {
1036 let joint_idx = (feedback.joint_index as usize).saturating_sub(1);
1037 if joint_idx < 6 {
1038 let system_timestamp_us = safe_system_timestamp_us();
1040
1041 if let Ok(mut joint_limit) = ctx.joint_limit_config.write() {
1042 joint_limit.joint_limits_max[joint_idx] = feedback.max_angle().to_radians();
1043 joint_limit.joint_limits_min[joint_idx] = feedback.min_angle().to_radians();
1044 joint_limit.joint_max_velocity[joint_idx] = feedback.max_velocity();
1045 joint_limit.joint_update_hardware_timestamps[joint_idx] =
1046 frame.timestamp_us;
1047 joint_limit.joint_update_system_timestamps[joint_idx] = system_timestamp_us;
1048 joint_limit.last_update_hardware_timestamp_us = frame.timestamp_us;
1049 joint_limit.last_update_system_timestamp_us = system_timestamp_us;
1050 joint_limit.valid_mask |= 1 << joint_idx;
1051 }
1052
1053 ctx.fps_stats
1054 .load()
1055 .joint_limit_config_updates
1056 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1057 }
1059 }
1060 },
1061
1062 ID_MOTOR_MAX_ACCEL_FEEDBACK => {
1063 if let Ok(feedback) = MotorMaxAccelFeedback::try_from(*frame) {
1065 let joint_idx = (feedback.joint_index as usize).saturating_sub(1);
1066 if joint_idx < 6 {
1067 let system_timestamp_us = safe_system_timestamp_us();
1069
1070 if let Ok(mut joint_accel) = ctx.joint_accel_config.write() {
1071 joint_accel.max_acc_limits[joint_idx] = feedback.max_accel();
1072 joint_accel.joint_update_hardware_timestamps[joint_idx] =
1073 frame.timestamp_us;
1074 joint_accel.joint_update_system_timestamps[joint_idx] = system_timestamp_us;
1075 joint_accel.last_update_hardware_timestamp_us = frame.timestamp_us;
1076 joint_accel.last_update_system_timestamp_us = system_timestamp_us;
1077 joint_accel.valid_mask |= 1 << joint_idx;
1078 }
1079
1080 ctx.fps_stats
1081 .load()
1082 .joint_accel_config_updates
1083 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1084 }
1086 }
1087 },
1088
1089 ID_END_VELOCITY_ACCEL_FEEDBACK => {
1090 if let Ok(feedback) = EndVelocityAccelFeedback::try_from(*frame) {
1092 let system_timestamp_us = safe_system_timestamp_us();
1093
1094 if let Ok(mut end_limit) = ctx.end_limit_config.write() {
1095 end_limit.max_end_linear_velocity = feedback.max_linear_velocity();
1096 end_limit.max_end_angular_velocity = feedback.max_angular_velocity();
1097 end_limit.max_end_linear_accel = feedback.max_linear_accel();
1098 end_limit.max_end_angular_accel = feedback.max_angular_accel();
1099 end_limit.last_update_hardware_timestamp_us = frame.timestamp_us;
1100 end_limit.last_update_system_timestamp_us = system_timestamp_us;
1101 end_limit.is_valid = true;
1102 }
1103
1104 ctx.fps_stats
1105 .load()
1106 .end_limit_config_updates
1107 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1108 }
1110 },
1111
1112 ID_FIRMWARE_READ => {
1114 if let Ok(feedback) = FirmwareReadFeedback::try_from(*frame) {
1116 let system_timestamp_us = safe_system_timestamp_us();
1117
1118 if let Ok(mut firmware_state) = ctx.firmware_version.write() {
1119 firmware_state.firmware_data.extend_from_slice(feedback.firmware_data());
1120 firmware_state.hardware_timestamp_us = frame.timestamp_us;
1121 firmware_state.system_timestamp_us = system_timestamp_us;
1122 firmware_state.parse_version();
1123 }
1124
1125 ctx.fps_stats
1126 .load()
1127 .firmware_version_updates
1128 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1129 }
1131 },
1132
1133 ID_CONTROL_MODE => {
1134 if let Ok(feedback) = ControlModeCommandFeedback::try_from(*frame) {
1136 let system_timestamp_us = safe_system_timestamp_us();
1137
1138 let new_state = MasterSlaveControlModeState {
1139 hardware_timestamp_us: frame.timestamp_us,
1140 system_timestamp_us,
1141 control_mode: feedback.control_mode as u8,
1142 move_mode: feedback.move_mode as u8,
1143 speed_percent: feedback.speed_percent,
1144 mit_mode: feedback.mit_mode as u8,
1145 trajectory_stay_time: feedback.trajectory_stay_time,
1146 install_position: feedback.install_position as u8,
1147 is_valid: true,
1148 };
1149
1150 ctx.master_slave_control_mode.store(Arc::new(new_state));
1151 ctx.fps_stats
1152 .load()
1153 .master_slave_control_mode_updates
1154 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1155 }
1157 },
1158
1159 ID_JOINT_CONTROL_12 => {
1160 if let Ok(feedback) = JointControl12Feedback::try_from(*frame) {
1162 state.pending_joint_target_deg[0] = feedback.j1_deg;
1163 state.pending_joint_target_deg[1] = feedback.j2_deg;
1164 state.joint_control_frame_mask |= 1 << 0; }
1166 },
1167
1168 ID_JOINT_CONTROL_34 => {
1169 if let Ok(feedback) = JointControl34Feedback::try_from(*frame) {
1171 state.pending_joint_target_deg[2] = feedback.j3_deg;
1172 state.pending_joint_target_deg[3] = feedback.j4_deg;
1173 state.joint_control_frame_mask |= 1 << 1; }
1175 },
1176
1177 ID_JOINT_CONTROL_56 => {
1178 if let Ok(feedback) = JointControl56Feedback::try_from(*frame) {
1180 state.pending_joint_target_deg[4] = feedback.j5_deg;
1181 state.pending_joint_target_deg[5] = feedback.j6_deg;
1182 state.joint_control_frame_mask |= 1 << 2; let system_timestamp_us = safe_system_timestamp_us();
1185
1186 let new_state = MasterSlaveJointControlState {
1187 hardware_timestamp_us: frame.timestamp_us,
1188 system_timestamp_us,
1189 joint_target_deg: state.pending_joint_target_deg,
1190 frame_valid_mask: state.joint_control_frame_mask,
1191 };
1192
1193 ctx.master_slave_joint_control.store(Arc::new(new_state));
1194 ctx.fps_stats
1195 .load()
1196 .master_slave_joint_control_updates
1197 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1198 state.joint_control_frame_mask = 0;
1201 }
1202 },
1203
1204 ID_GRIPPER_CONTROL => {
1205 if let Ok(feedback) = GripperControlFeedback::try_from(*frame) {
1207 let system_timestamp_us = safe_system_timestamp_us();
1208
1209 let new_state = MasterSlaveGripperControlState {
1210 hardware_timestamp_us: frame.timestamp_us,
1211 system_timestamp_us,
1212 gripper_target_travel_mm: feedback.travel_mm,
1213 gripper_target_torque_nm: feedback.torque_nm,
1214 gripper_status_code: feedback.status_code,
1215 gripper_set_zero: feedback.set_zero,
1216 is_valid: true,
1217 };
1218
1219 ctx.master_slave_gripper_control.store(Arc::new(new_state));
1220 ctx.fps_stats
1221 .load()
1222 .master_slave_gripper_control_updates
1223 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1224 }
1226 },
1227
1228 _ => {
1230 debug!("RX thread: Received unhandled frame ID=0x{:X}", frame.id);
1232 },
1233 }
1234}
1235
1236#[cfg(test)]
1237mod tests {
1238 use super::*;
1239 use std::collections::VecDeque;
1240 use std::sync::Arc;
1241 use std::thread;
1242 use std::time::Duration;
1243
1244 struct MockCanAdapter {
1246 receive_queue: VecDeque<PiperFrame>,
1247 sent_frames: Vec<PiperFrame>,
1248 }
1249
1250 impl MockCanAdapter {
1251 fn new() -> Self {
1252 Self {
1253 receive_queue: VecDeque::new(),
1254 sent_frames: Vec::new(),
1255 }
1256 }
1257
1258 fn queue_frame(&mut self, frame: PiperFrame) {
1259 self.receive_queue.push_back(frame);
1260 }
1261
1262 #[allow(dead_code)]
1263 fn take_sent_frames(&mut self) -> Vec<PiperFrame> {
1264 std::mem::take(&mut self.sent_frames)
1265 }
1266 }
1267
1268 impl CanAdapter for MockCanAdapter {
1269 fn send(&mut self, frame: PiperFrame) -> Result<(), CanError> {
1270 self.sent_frames.push(frame);
1271 Ok(())
1272 }
1273
1274 fn receive(&mut self) -> Result<PiperFrame, CanError> {
1275 self.receive_queue.pop_front().ok_or(CanError::Timeout)
1276 }
1277 }
1278
1279 #[test]
1280 fn test_pipeline_config_default() {
1281 let config = PipelineConfig::default();
1282 assert_eq!(config.receive_timeout_ms, 2);
1283 assert_eq!(config.frame_group_timeout_ms, 10);
1284 }
1285
1286 #[test]
1287 fn test_pipeline_config_custom() {
1288 let config = PipelineConfig {
1289 receive_timeout_ms: 5,
1290 frame_group_timeout_ms: 20,
1291 velocity_buffer_timeout_us: 10_000,
1292 };
1293 assert_eq!(config.receive_timeout_ms, 5);
1294 assert_eq!(config.frame_group_timeout_ms, 20);
1295 assert_eq!(config.velocity_buffer_timeout_us, 10_000);
1296 }
1297
1298 fn create_joint_feedback_frame_data(j1_deg: f64, j2_deg: f64) -> [u8; 8] {
1300 let j1_raw = (j1_deg * 1000.0) as i32;
1301 let j2_raw = (j2_deg * 1000.0) as i32;
1302 let mut data = [0u8; 8];
1303 data[0..4].copy_from_slice(&j1_raw.to_be_bytes());
1304 data[4..8].copy_from_slice(&j2_raw.to_be_bytes());
1305 data
1306 }
1307
1308 #[test]
1309 fn test_joint_pos_frame_commit_complete() {
1310 let ctx = Arc::new(PiperContext::new());
1311 let mut mock_can = MockCanAdapter::new();
1312 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(10);
1313
1314 let mut frame_2a5 = PiperFrame::new_standard(
1317 ID_JOINT_FEEDBACK_12 as u16,
1318 &create_joint_feedback_frame_data(10.0, 20.0),
1319 );
1320 frame_2a5.timestamp_us = 1000;
1321 let mut frame_2a6 = PiperFrame::new_standard(
1322 ID_JOINT_FEEDBACK_34 as u16,
1323 &create_joint_feedback_frame_data(30.0, 40.0),
1324 );
1325 frame_2a6.timestamp_us = 1001;
1326 let mut frame_2a7 = PiperFrame::new_standard(
1327 ID_JOINT_FEEDBACK_56 as u16,
1328 &create_joint_feedback_frame_data(50.0, 60.0),
1329 );
1330 frame_2a7.timestamp_us = 1002;
1331
1332 mock_can.queue_frame(frame_2a5);
1334 mock_can.queue_frame(frame_2a6);
1335 mock_can.queue_frame(frame_2a7);
1336
1337 let ctx_clone = ctx.clone();
1339 let config = PipelineConfig::default();
1340 let handle = thread::spawn(move || {
1341 io_loop(mock_can, cmd_rx, ctx_clone, config);
1342 });
1343
1344 thread::sleep(Duration::from_millis(100));
1346
1347 drop(cmd_tx);
1349 let start = std::time::Instant::now();
1351 while start.elapsed().as_secs() < 2 {
1352 if handle.is_finished() {
1353 break;
1354 }
1355 thread::sleep(Duration::from_millis(10));
1356 }
1357 let _ = handle.join();
1358
1359 let joint_pos = ctx.joint_position.load();
1362 assert!(
1365 joint_pos.joint_pos.iter().any(|&v| v != 0.0) || joint_pos.hardware_timestamp_us == 0
1366 );
1367 }
1368
1369 #[test]
1370 fn test_command_channel_processing() {
1371 let ctx = Arc::new(PiperContext::new());
1372 let mock_can = MockCanAdapter::new();
1373 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(10);
1374
1375 let config = PipelineConfig::default();
1376 let handle = thread::spawn(move || {
1377 io_loop(mock_can, cmd_rx, ctx, config);
1378 });
1379
1380 let cmd_frame = PiperFrame::new_standard(0x123, &[0x01, 0x02, 0x03]);
1382 cmd_tx.send(cmd_frame).unwrap();
1383
1384 thread::sleep(Duration::from_millis(50));
1386
1387 drop(cmd_tx);
1389 let start = std::time::Instant::now();
1391 while start.elapsed().as_secs() < 2 {
1392 if handle.is_finished() {
1393 break;
1394 }
1395 thread::sleep(Duration::from_millis(10));
1396 }
1397 let _ = handle.join();
1398
1399 }
1403}