use piper_sdk::can::{CanError, PiperFrame, RxAdapter, TxAdapter};
use piper_sdk::driver::command::{CommandPriority, PiperCommand};
use piper_sdk::driver::{PipelineConfig, PiperContext, PiperMetrics, rx_loop, tx_loop_mailbox};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct MockRxAdapter {
frames: VecDeque<PiperFrame>,
receive_delay: Duration,
}
impl MockRxAdapter {
fn new(frames: Vec<PiperFrame>, receive_delay: Duration) -> Self {
Self {
frames: VecDeque::from(frames),
receive_delay,
}
}
}
impl RxAdapter for MockRxAdapter {
fn receive(&mut self) -> Result<PiperFrame, CanError> {
thread::sleep(self.receive_delay);
self.frames.pop_front().ok_or(CanError::Timeout)
}
}
struct MockTxAdapter {
sent_frames: Arc<Mutex<VecDeque<PiperFrame>>>,
send_delay: Duration,
}
impl MockTxAdapter {
fn new() -> Self {
Self {
sent_frames: Arc::new(Mutex::new(VecDeque::new())),
send_delay: Duration::from_micros(100),
}
}
#[allow(dead_code)]
fn sent_frames(&self) -> Vec<PiperFrame> {
self.sent_frames.lock().unwrap().iter().copied().collect()
}
}
impl TxAdapter for MockTxAdapter {
fn send(&mut self, frame: PiperFrame) -> Result<(), CanError> {
thread::sleep(self.send_delay);
self.sent_frames.lock().unwrap().push_back(frame);
Ok(())
}
}
fn generate_test_frames(count: usize, base_id: u32) -> Vec<PiperFrame> {
(0..count)
.map(|i| PiperFrame::new_standard((base_id + i as u32) as u16, &[i as u8; 8]))
.collect()
}
#[test]
fn test_priority_scheduling() {
let ctx = Arc::new(PiperContext::new());
let config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
let rx_frames = generate_test_frames(5, 0x251);
let rx_adapter = MockRxAdapter::new(rx_frames, Duration::from_millis(1));
let tx_adapter = MockTxAdapter::new();
let sent_frames = tx_adapter.sent_frames.clone();
let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let realtime_slot_clone = realtime_slot.clone();
let ctx_rx = ctx.clone();
let is_running_rx = is_running.clone();
let metrics_rx = metrics.clone();
let rx_handle = thread::spawn(move || {
rx_loop(rx_adapter, ctx_rx, config, is_running_rx, metrics_rx);
});
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
let reliable_frame1 = PiperFrame::new_standard(0x100, &[1, 1, 1]);
let reliable_frame2 = PiperFrame::new_standard(0x101, &[2, 2, 2]);
let realtime_frame = PiperFrame::new_standard(0x200, &[3, 3, 3]);
reliable_tx.send(reliable_frame1).unwrap();
reliable_tx.send(reliable_frame2).unwrap();
*realtime_slot_clone.lock().unwrap() = Some(
piper_sdk::driver::command::RealtimeCommand::single(realtime_frame),
);
thread::sleep(Duration::from_millis(150));
is_running.store(false, Ordering::Relaxed);
let _ = rx_handle.join();
let _ = tx_handle.join();
let binding = sent_frames.lock().unwrap();
let sent: Vec<PiperFrame> = binding.iter().copied().collect();
println!("Sent frames order:");
for (i, frame) in sent.iter().enumerate() {
println!(
" {}: ID=0x{:X}, data={:?}",
i,
frame.id,
&frame.data[..frame.len as usize]
);
}
let realtime_pos = sent.iter().position(|f| f.id == 0x200);
let reliable_pos1 = sent.iter().position(|f| f.id == 0x100);
let _reliable_pos2 = sent.iter().position(|f| f.id == 0x101);
assert!(realtime_pos.is_some(), "Realtime command should be sent");
assert!(reliable_pos1.is_some(), "Reliable command 1 should be sent");
assert!(
sent.len() >= 3,
"Should send at least 3 frames, got: {}",
sent.len()
);
}
#[test]
fn test_reliable_command_not_dropped() {
let ctx = Arc::new(PiperContext::new());
let _config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
struct SlowTxAdapter {
send_delay: Duration,
sent_count: Arc<AtomicU64>,
}
impl SlowTxAdapter {
fn new() -> Self {
Self {
send_delay: Duration::from_millis(20), sent_count: Arc::new(AtomicU64::new(0)),
}
}
}
impl TxAdapter for SlowTxAdapter {
fn send(&mut self, _frame: PiperFrame) -> Result<(), CanError> {
thread::sleep(self.send_delay);
self.sent_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
let tx_adapter = SlowTxAdapter::new();
let sent_count = tx_adapter.sent_count.clone();
let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
let reliable_commands: Vec<PiperFrame> = (0..15)
.map(|i| PiperFrame::new_standard(0x100 + i as u16, &[i as u8; 8]))
.collect();
let mut sent_successfully: u32 = 0;
for frame in reliable_commands.iter() {
match reliable_tx.try_send(*frame) {
Ok(_) => {
sent_successfully += 1;
},
Err(crossbeam_channel::TrySendError::Full(_)) => {
thread::sleep(Duration::from_millis(10));
match reliable_tx.try_send(*frame) {
Ok(_) => sent_successfully += 1,
Err(_) => break,
}
},
Err(_) => break,
}
}
println!("Sent {} reliable commands successfully", sent_successfully);
let min_wait_ms = (sent_successfully as u64).saturating_mul(20 * 15).max(2500);
let deadline = std::time::Instant::now() + Duration::from_millis(min_wait_ms);
while std::time::Instant::now() < deadline {
let processed = sent_count.load(Ordering::Relaxed);
if processed >= sent_successfully as u64 {
break;
}
thread::sleep(Duration::from_millis(20));
}
is_running.store(false, Ordering::Relaxed);
let _ = tx_handle.join();
let final_sent_count = sent_count.load(Ordering::Relaxed);
println!(
"Commands sent: {}, Commands processed: {}",
sent_successfully, final_sent_count
);
assert!(
final_sent_count >= sent_successfully as u64,
"All successfully sent reliable commands should be processed. Sent: {}, Processed: {}",
sent_successfully,
final_sent_count
);
let snapshot = metrics.snapshot();
println!("Reliable drops: {}", snapshot.tx_reliable_drops);
}
#[test]
fn test_realtime_overwrite_strategy() {
let ctx = Arc::new(PiperContext::new());
let _config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
struct SlowTxAdapter {
send_delay: Duration,
sent_frames: Arc<Mutex<VecDeque<PiperFrame>>>,
}
impl SlowTxAdapter {
fn new() -> Self {
Self {
send_delay: Duration::from_millis(10),
sent_frames: Arc::new(Mutex::new(VecDeque::new())),
}
}
}
impl TxAdapter for SlowTxAdapter {
fn send(&mut self, frame: PiperFrame) -> Result<(), CanError> {
thread::sleep(self.send_delay);
self.sent_frames.lock().unwrap().push_back(frame);
Ok(())
}
}
let tx_adapter = SlowTxAdapter::new();
let sent_frames = tx_adapter.sent_frames.clone();
let (_reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let realtime_slot_clone = realtime_slot.clone();
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
let realtime_commands: Vec<PiperFrame> = (0..5)
.map(|i| PiperFrame::new_standard(0x200 + i as u16, &[i as u8; 8]))
.collect();
for frame in realtime_commands.iter() {
*realtime_slot_clone.lock().unwrap() =
Some(piper_sdk::driver::command::RealtimeCommand::single(*frame));
thread::sleep(Duration::from_millis(2));
}
thread::sleep(Duration::from_millis(200));
is_running.store(false, Ordering::Relaxed);
let _ = tx_handle.join();
let binding = sent_frames.lock().unwrap();
let sent: Vec<PiperFrame> = binding.iter().copied().collect();
assert!(
!sent.is_empty(),
"Should send at least some realtime commands, got: {}",
sent.len()
);
let snapshot = metrics.snapshot();
println!("Realtime overwrites: {}", snapshot.tx_realtime_overwrites);
}
#[test]
fn test_command_type_conversion() {
let frame = PiperFrame::new_standard(0x123, &[1, 2, 3]);
let realtime_cmd = PiperCommand::realtime(frame);
assert_eq!(realtime_cmd.priority(), CommandPriority::RealtimeControl);
assert_eq!(realtime_cmd.frame().id, 0x123);
let reliable_cmd = PiperCommand::reliable(frame);
assert_eq!(reliable_cmd.priority(), CommandPriority::ReliableCommand);
assert_eq!(reliable_cmd.frame().id, 0x123);
let cmd: PiperCommand = frame.into();
assert_eq!(cmd.priority(), CommandPriority::ReliableCommand);
let converted_frame: PiperFrame = realtime_cmd.into();
assert_eq!(converted_frame.id, 0x123);
}