use piper_sdk::can::{CanAdapter, CanError, PiperFrame};
use piper_sdk::driver::*;
use piper_sdk::protocol::ids::*;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
struct MockCanAdapter {
receive_queue: Arc<Mutex<VecDeque<PiperFrame>>>,
sent_frames: Arc<Mutex<Vec<PiperFrame>>>,
}
impl MockCanAdapter {
fn new() -> Self {
Self {
receive_queue: Arc::new(Mutex::new(VecDeque::new())),
sent_frames: Arc::new(Mutex::new(Vec::new())),
}
}
fn queue_frame(&self, frame: PiperFrame) {
self.receive_queue.lock().unwrap().push_back(frame);
}
}
impl CanAdapter for MockCanAdapter {
fn send(&mut self, frame: PiperFrame) -> Result<(), CanError> {
self.sent_frames.lock().unwrap().push(frame);
Ok(())
}
fn receive(&mut self) -> Result<PiperFrame, CanError> {
self.receive_queue.lock().unwrap().pop_front().ok_or(CanError::Timeout)
}
}
#[test]
fn test_robot_status_feedback_update() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let status_frame = PiperFrame::new_standard(
ID_ROBOT_STATUS as u16,
&[
0x01, 0x00, 0x01, 0x00, 0x00, 0x05, 0b0011_1111, 0b0000_0000, ],
);
mock_can_clone.queue_frame(status_frame);
std::thread::sleep(std::time::Duration::from_millis(200));
let control = piper.get_robot_control();
assert_eq!(control.control_mode, control.control_mode);
assert_eq!(control.robot_status, control.robot_status);
}
#[test]
fn test_gripper_feedback_update() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let travel_mm = 100000i32;
let torque_nm = 1000i16;
let status_byte = 0u8;
let mut data = [0u8; 8];
data[0..4].copy_from_slice(&travel_mm.to_be_bytes());
data[4..6].copy_from_slice(&torque_nm.to_be_bytes());
data[6] = status_byte;
let gripper_frame = PiperFrame::new_standard(ID_GRIPPER_FEEDBACK as u16, &data);
mock_can_clone.queue_frame(gripper_frame);
std::thread::sleep(std::time::Duration::from_millis(200));
let _control = piper.get_robot_control();
let _gripper = piper.get_gripper();
let _driver = piper.get_joint_driver_low_speed();
}
#[test]
fn test_command_channel_send() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let cmd_frame = PiperFrame::new_standard(0x150, &[0x01, 0x02, 0x03]);
piper.send_frame(cmd_frame).unwrap();
std::thread::sleep(std::time::Duration::from_millis(200));
}
#[test]
fn test_collision_protection_try_write_non_blocking() {
let ctx = Arc::new(PiperContext::new());
let _read_guard = ctx.collision_protection.read().unwrap();
let result = ctx.collision_protection.try_write();
assert!(
result.is_err(),
"try_write should fail when read lock is held"
);
drop(_read_guard);
let mut write_guard = ctx.collision_protection.write().unwrap();
write_guard.hardware_timestamp_us = 1000;
write_guard.protection_levels = [1, 2, 3, 4, 5, 6];
drop(write_guard);
let read_guard = ctx.collision_protection.read().unwrap();
assert_eq!(read_guard.hardware_timestamp_us, 1000);
}
#[test]
fn test_joint_driver_low_speed_feedback_update() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
for joint_index in 1..=6 {
let id = ID_JOINT_DRIVER_LOW_SPEED_BASE + (joint_index as u32 - 1);
let voltage = 240 + joint_index as u16 * 10; let driver_temp = 35 + joint_index as i16;
let motor_temp = 30 + joint_index as i8;
let bus_current = 1000 + joint_index as u16 * 100;
let mut data = [0u8; 8];
data[0..2].copy_from_slice(&voltage.to_be_bytes());
data[2..4].copy_from_slice(&driver_temp.to_be_bytes());
data[4] = motor_temp as u8;
data[5] = 0; data[6..8].copy_from_slice(&bus_current.to_be_bytes());
let mut frame = PiperFrame::new_standard(id as u16, &data);
frame.timestamp_us = 3000 + joint_index;
mock_can_clone.queue_frame(frame);
}
std::thread::sleep(std::time::Duration::from_millis(300));
let _driver = piper.get_joint_driver_low_speed();
}
#[test]
fn test_collision_protection_level_feedback_update() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let protection_frame = PiperFrame::new_standard(
ID_COLLISION_PROTECTION_LEVEL_FEEDBACK as u16,
&[
0x05, 0x05, 0x05, 0x04, 0x04, 0x04, 0x00, 0x00, ],
);
mock_can_clone.queue_frame(protection_frame);
std::thread::sleep(std::time::Duration::from_millis(200));
let _driver = piper.get_joint_driver_low_speed();
}
#[test]
fn test_motor_limit_feedback_accumulation() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
for joint_index in 1..=6 {
let max_angle_deg = (180.0 * 10.0) as i16; let min_angle_deg = (-180.0 * 10.0) as i16; let max_velocity_rad_s = (5.0 * 100.0) as u16;
let mut data = [0u8; 8];
data[0] = joint_index;
data[1..3].copy_from_slice(&max_angle_deg.to_be_bytes());
data[3..5].copy_from_slice(&min_angle_deg.to_be_bytes());
data[5..7].copy_from_slice(&max_velocity_rad_s.to_be_bytes());
let mut frame = PiperFrame::new_standard(ID_MOTOR_LIMIT_FEEDBACK as u16, &data);
frame.timestamp_us = 4000 + joint_index as u64;
mock_can_clone.queue_frame(frame);
}
std::thread::sleep(std::time::Duration::from_millis(300));
let _limits = piper.get_joint_limit_config().unwrap();
let _accel = piper.get_joint_accel_config().unwrap();
let _end_limits = piper.get_end_limit_config().unwrap();
}
#[test]
fn test_motor_max_accel_feedback_accumulation() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
for joint_index in 1..=6 {
let max_accel_rad_s2 = (10.0 * 100.0) as u16;
let mut data = [0u8; 8];
data[0] = joint_index;
data[1..3].copy_from_slice(&max_accel_rad_s2.to_be_bytes());
let mut frame = PiperFrame::new_standard(ID_MOTOR_MAX_ACCEL_FEEDBACK as u16, &data);
frame.timestamp_us = 5000 + joint_index as u64;
mock_can_clone.queue_frame(frame);
}
std::thread::sleep(std::time::Duration::from_millis(300));
let _limits = piper.get_joint_limit_config().unwrap();
let _accel = piper.get_joint_accel_config().unwrap();
let _end_limits = piper.get_end_limit_config().unwrap();
}
#[test]
fn test_end_velocity_accel_feedback_update() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let max_linear_vel = 500u16;
let max_angular_vel = 1000u16;
let max_linear_accel = 1000u16;
let max_angular_accel = 2000u16;
let mut data = [0u8; 8];
data[0..2].copy_from_slice(&max_linear_vel.to_be_bytes());
data[2..4].copy_from_slice(&max_angular_vel.to_be_bytes());
data[4..6].copy_from_slice(&max_linear_accel.to_be_bytes());
data[6..8].copy_from_slice(&max_angular_accel.to_be_bytes());
let mut frame = PiperFrame::new_standard(ID_END_VELOCITY_ACCEL_FEEDBACK as u16, &data);
frame.timestamp_us = 6000;
mock_can_clone.queue_frame(frame);
std::thread::sleep(std::time::Duration::from_millis(200));
let _limits = piper.get_joint_limit_config().unwrap();
let _accel = piper.get_joint_accel_config().unwrap();
let _end_limits = piper.get_end_limit_config().unwrap();
}
#[test]
fn test_can_receive_error_timeout() {
let can_adapter = MockCanAdapter {
receive_queue: Arc::new(Mutex::new(VecDeque::new())),
sent_frames: Arc::new(Mutex::new(Vec::new())),
};
let piper = Piper::new(can_adapter, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
let _joint_pos = piper.get_joint_position();
let _end_pose = piper.get_end_pose();
}
#[test]
fn test_can_send_error_handling() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let cmd_frame = PiperFrame::new_standard(0x150, &[0x01]);
assert!(piper.send_frame(cmd_frame).is_ok());
}
#[test]
fn test_frame_parse_error_invalid_frame() {
let mock_can = MockCanAdapter::new();
let mock_can_clone = Arc::new(mock_can);
let can_adapter = MockCanAdapter {
receive_queue: mock_can_clone.receive_queue.clone(),
sent_frames: mock_can_clone.sent_frames.clone(),
};
let piper = Piper::new(can_adapter, None).unwrap();
let invalid_frame = PiperFrame::new_standard(ID_ROBOT_STATUS as u16, &[0x01; 4]); mock_can_clone.queue_frame(invalid_frame);
std::thread::sleep(std::time::Duration::from_millis(200));
let _control = piper.get_robot_control();
}