use piper_sdk::can::{
CanDeviceError, CanDeviceErrorKind, CanError, PiperFrame, RxAdapter, TxAdapter,
};
use piper_sdk::driver::{PipelineConfig, PiperContext, PiperMetrics, rx_loop, tx_loop_mailbox};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
fn is_ci_env() -> bool {
std::env::var("CI").is_ok()
|| std::env::var("GITHUB_ACTIONS").is_ok()
|| std::env::var("GITLAB_CI").is_ok()
|| std::env::var("CIRCLECI").is_ok()
|| std::env::var("TRAVIS").is_ok()
|| std::env::var("APPVEYOR").is_ok()
}
fn adjust_threshold_ms(local_threshold_ms: u64) -> Duration {
let multiplier = if is_ci_env() { 5 } else { 1 };
Duration::from_millis(local_threshold_ms * multiplier)
}
struct MockRxAdapter {
frames: VecDeque<PiperFrame>,
receive_delay: Duration,
should_fail: Arc<AtomicBool>,
}
impl MockRxAdapter {
fn new(frames: Vec<PiperFrame>, receive_delay: Duration) -> Self {
Self {
frames: VecDeque::from(frames),
receive_delay,
should_fail: Arc::new(AtomicBool::new(false)),
}
}
#[allow(dead_code)]
fn set_should_fail(&self, fail: bool) {
self.should_fail.store(fail, Ordering::Relaxed);
}
}
impl RxAdapter for MockRxAdapter {
fn receive(&mut self) -> Result<PiperFrame, CanError> {
if self.should_fail.load(Ordering::Relaxed) {
return Err(CanError::Device(CanDeviceError::new(
CanDeviceErrorKind::NoDevice,
"Device disconnected",
)));
}
thread::sleep(self.receive_delay);
self.frames.pop_front().ok_or(CanError::Timeout)
}
}
struct MockTxAdapter {
send_delay: Duration,
should_timeout: Arc<AtomicBool>,
should_fail: Arc<AtomicBool>,
sent_count: Arc<AtomicU64>,
}
impl MockTxAdapter {
fn new(send_delay: Duration) -> Self {
Self {
send_delay,
should_timeout: Arc::new(AtomicBool::new(false)),
should_fail: Arc::new(AtomicBool::new(false)),
sent_count: Arc::new(AtomicU64::new(0)),
}
}
#[allow(dead_code)]
fn set_should_timeout(&self, timeout: bool) {
self.should_timeout.store(timeout, Ordering::Relaxed);
}
#[allow(dead_code)]
fn set_should_fail(&self, fail: bool) {
self.should_fail.store(fail, Ordering::Relaxed);
}
#[allow(dead_code)]
fn sent_count(&self) -> u64 {
self.sent_count.load(Ordering::Relaxed)
}
}
impl TxAdapter for MockTxAdapter {
fn send(&mut self, _frame: PiperFrame) -> Result<(), CanError> {
if self.should_fail.load(Ordering::Relaxed) {
return Err(CanError::Device(CanDeviceError::new(
CanDeviceErrorKind::NoDevice,
"Device disconnected",
)));
}
if self.should_timeout.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(100));
return Err(CanError::Timeout);
}
thread::sleep(self.send_delay);
self.sent_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
fn generate_test_frames(count: usize) -> Vec<PiperFrame> {
(0..count)
.map(|i| PiperFrame::new_standard((0x251 + (i % 6)) as u16, &[i as u8; 8]))
.collect()
}
#[test]
fn test_rx_unaffected_by_tx_timeout() {
let ctx = Arc::new(PiperContext::new());
let config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
let rx_frames = generate_test_frames(100);
let rx_adapter = MockRxAdapter::new(rx_frames, Duration::from_millis(2));
let tx_adapter = MockTxAdapter::new(Duration::from_millis(1));
let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let ctx_rx = ctx.clone();
let is_running_rx = is_running.clone();
let metrics_rx = metrics.clone();
let rx_handle = thread::spawn(move || {
rx_loop(rx_adapter, ctx_rx, config, is_running_rx, metrics_rx);
});
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
thread::sleep(Duration::from_millis(50));
let initial_rx_count = metrics.rx_frames_valid.load(Ordering::Relaxed);
reliable_tx.send(PiperFrame::new_standard(0x123, &[1, 2, 3])).unwrap();
thread::sleep(Duration::from_millis(100));
let final_rx_count = metrics.rx_frames_valid.load(Ordering::Relaxed);
let rx_updates = final_rx_count.saturating_sub(initial_rx_count);
assert!(
rx_updates > 0,
"RX should continue receiving frames even when TX has issues. Received: {}",
rx_updates
);
let rx_timeouts = metrics.rx_timeouts.load(Ordering::Relaxed);
let total_rx_attempts = metrics.rx_frames_total.load(Ordering::Relaxed);
let timeout_ratio = if total_rx_attempts > 0 {
rx_timeouts as f64 / total_rx_attempts as f64
} else {
0.0
};
assert!(
timeout_ratio < 0.5,
"RX timeout ratio should be low (< 50%), got: {:.2}%",
timeout_ratio * 100.0
);
is_running.store(false, Ordering::Relaxed);
let _ = rx_handle.join();
let _ = tx_handle.join();
}
#[test]
fn test_tx_detects_rx_failure() {
let ctx = Arc::new(PiperContext::new());
let config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
let rx_frames = generate_test_frames(10);
let rx_adapter = MockRxAdapter::new(rx_frames, Duration::from_millis(2));
let rx_should_fail = rx_adapter.should_fail.clone();
let tx_adapter = MockTxAdapter::new(Duration::from_millis(1));
let (_reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let ctx_rx = ctx.clone();
let is_running_rx = is_running.clone();
let metrics_rx = metrics.clone();
let rx_handle = thread::spawn(move || {
rx_loop(rx_adapter, ctx_rx, config, is_running_rx, metrics_rx);
});
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
thread::sleep(Duration::from_millis(20));
rx_should_fail.store(true, Ordering::Relaxed);
let start = Instant::now();
let _tx_exit_timeout = Duration::from_millis(200);
let mut tx_exited = false;
for _ in 0..20 {
if tx_handle.is_finished() {
tx_exited = true;
break;
}
thread::sleep(Duration::from_millis(10));
}
let elapsed = start.elapsed();
let threshold = adjust_threshold_ms(200);
assert!(
tx_exited,
"TX thread should exit within {:?} after RX failure (CI环境已放宽). Elapsed: {:?}",
threshold, elapsed
);
assert!(
elapsed < threshold,
"TX thread should detect RX failure quickly (< {:?}, CI环境已放宽). Elapsed: {:?}",
threshold,
elapsed
);
assert!(
!is_running.load(Ordering::Relaxed),
"is_running flag should be false after RX failure"
);
let _ = rx_handle.join();
let _ = tx_handle.join();
}
#[test]
fn test_thread_lifecycle_linkage() {
let ctx = Arc::new(PiperContext::new());
let config = PipelineConfig::default();
let is_running = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(PiperMetrics::new());
let rx_frames = generate_test_frames(5);
let rx_adapter = MockRxAdapter::new(rx_frames, Duration::from_millis(2));
let rx_should_fail = rx_adapter.should_fail.clone();
let tx_adapter = MockTxAdapter::new(Duration::from_millis(1));
let (_reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
Arc::new(std::sync::Mutex::new(None));
let ctx_rx = ctx.clone();
let is_running_rx = is_running.clone();
let metrics_rx = metrics.clone();
let rx_handle = thread::spawn(move || {
rx_loop(rx_adapter, ctx_rx, config, is_running_rx, metrics_rx);
});
let ctx_tx = ctx.clone();
let is_running_tx = is_running.clone();
let metrics_tx = metrics.clone();
let tx_handle = thread::spawn(move || {
tx_loop_mailbox(
tx_adapter,
realtime_slot,
reliable_rx,
is_running_tx,
metrics_tx,
ctx_tx,
);
});
thread::sleep(Duration::from_millis(20));
rx_should_fail.store(true, Ordering::Relaxed);
let start = Instant::now();
let mut both_exited = false;
for _ in 0..30 {
if rx_handle.is_finished() && tx_handle.is_finished() {
both_exited = true;
break;
}
thread::sleep(Duration::from_millis(10));
}
let elapsed = start.elapsed();
assert!(
both_exited,
"Both threads should exit after RX failure. Elapsed: {:?}",
elapsed
);
let threshold = adjust_threshold_ms(300);
assert!(
elapsed < threshold,
"Threads should exit quickly (< {:?}, CI环境已放宽). Elapsed: {:?}",
threshold,
elapsed
);
assert!(
!is_running.load(Ordering::Relaxed),
"is_running flag should be false after thread failure"
);
let _ = rx_handle.join();
let _ = tx_handle.join();
}