use piper_sdk::can::{CanError, PiperFrame, RxAdapter, TxAdapter};
use piper_sdk::driver::command::PiperCommand;
use piper_sdk::driver::{PipelineConfig, PiperContext, PiperMetrics, rx_loop, tx_loop_mailbox};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
/// Reports whether the process appears to be running under a CI service.
///
/// Returns `true` as soon as one of the well-known CI marker variables can be
/// read from the environment; the value of the variable is not inspected.
fn is_ci_env() -> bool {
    const CI_MARKERS: [&str; 6] = [
        "CI",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "CIRCLECI",
        "TRAVIS",
        "APPVEYOR",
    ];
    CI_MARKERS
        .iter()
        .any(|marker| std::env::var(marker).is_ok())
}
/// Snapshot of the headline performance figures of one run.
#[derive(Debug, Clone)]
pub struct PerformanceBaseline {
    /// 95th-percentile interval between observed RX updates.
    pub rx_interval_p95: Duration,
    /// 95th-percentile latency from issuing a command to the start of its send.
    pub tx_latency_p95: Duration,
    /// 95th-percentile duration of a single adapter send call.
    pub send_duration_p95: Duration,
    /// Received frames per second.
    pub throughput_fps: f64,
    /// Creation time in whole seconds since the UNIX epoch (as filled in by `new`).
    pub timestamp: u64,
}

impl PerformanceBaseline {
    /// Builds a baseline with the built-in reference figures, stamped with the
    /// current wall-clock time.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    pub fn new() -> Self {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        Self {
            rx_interval_p95: Duration::from_millis(2),
            tx_latency_p95: Duration::from_millis(1),
            send_duration_p95: Duration::from_micros(500),
            throughput_fps: 500.0,
            timestamp: since_epoch.as_secs(),
        }
    }

    /// Renders the baseline as a Markdown section with one bullet per field.
    pub fn to_markdown(&self) -> String {
        format!(
            concat!(
                "## Performance Baseline\n",
                "- **RX Interval P95**: {rx:?}\n",
                "- **TX Latency P95**: {tx:?}\n",
                "- **Send Duration P95**: {send:?}\n",
                "- **Throughput**: {fps:.2} fps\n",
                "- **Timestamp**: {stamp}\n",
            ),
            rx = self.rx_interval_p95,
            tx = self.tx_latency_p95,
            send = self.send_duration_p95,
            fps = self.throughput_fps,
            stamp = self.timestamp,
        )
    }
}

impl Default for PerformanceBaseline {
    /// Same as [`PerformanceBaseline::new`].
    fn default() -> Self {
        PerformanceBaseline::new()
    }
}
pub struct PerformanceRegressionTest {
baseline: PerformanceBaseline,
current: PerformanceBaseline,
regression_threshold: f64,
}
impl PerformanceRegressionTest {
pub fn new(baseline: PerformanceBaseline, regression_threshold: f64) -> Self {
Self {
baseline,
current: PerformanceBaseline::new(),
regression_threshold,
}
}
pub fn set_current(&mut self, current: PerformanceBaseline) {
self.current = current;
}
pub fn check_regression(&self) -> Result<(), Vec<String>> {
let mut errors = Vec::new();
let rx_regression = (self.current.rx_interval_p95.as_nanos() as f64
- self.baseline.rx_interval_p95.as_nanos() as f64)
/ self.baseline.rx_interval_p95.as_nanos() as f64
* 100.0;
if rx_regression > self.regression_threshold {
errors.push(format!(
"RX interval P95 regression: {:.2}% (baseline: {:?}, current: {:?})",
rx_regression, self.baseline.rx_interval_p95, self.current.rx_interval_p95
));
}
let tx_regression = (self.current.tx_latency_p95.as_nanos() as f64
- self.baseline.tx_latency_p95.as_nanos() as f64)
/ self.baseline.tx_latency_p95.as_nanos() as f64
* 100.0;
if tx_regression > self.regression_threshold {
errors.push(format!(
"TX latency P95 regression: {:.2}% (baseline: {:?}, current: {:?})",
tx_regression, self.baseline.tx_latency_p95, self.current.tx_latency_p95
));
}
let send_regression = (self.current.send_duration_p95.as_nanos() as f64
- self.baseline.send_duration_p95.as_nanos() as f64)
/ self.baseline.send_duration_p95.as_nanos() as f64
* 100.0;
if send_regression > self.regression_threshold {
errors.push(format!(
"Send duration P95 regression: {:.2}% (baseline: {:?}, current: {:?})",
send_regression, self.baseline.send_duration_p95, self.current.send_duration_p95
));
}
let throughput_regression = (self.baseline.throughput_fps - self.current.throughput_fps)
/ self.baseline.throughput_fps
* 100.0;
if throughput_regression > self.regression_threshold {
errors.push(format!(
"Throughput regression: {:.2}% (baseline: {:.2} fps, current: {:.2} fps)",
throughput_regression, self.baseline.throughput_fps, self.current.throughput_fps
));
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
pub fn generate_report(&self) -> String {
let check_result = self.check_regression();
let status = if check_result.is_ok() {
"✅ PASS"
} else {
"❌ FAIL"
};
format!(
r#"# Performance Regression Test Report
**Status**: {}
**Regression Threshold**: {:.1}%
## Baseline
{}
## Current
{}
## Comparison
- **RX Interval P95**: {:?} → {:?} ({:+.2}%)
- **TX Latency P95**: {:?} → {:?} ({:+.2}%)
- **Send Duration P95**: {:?} → {:?} ({:+.2}%)
- **Throughput**: {:.2} → {:.2} fps ({:+.2}%)
{}
"#,
status,
self.regression_threshold,
self.baseline.to_markdown(),
self.current.to_markdown(),
self.baseline.rx_interval_p95,
self.current.rx_interval_p95,
(self.current.rx_interval_p95.as_nanos() as f64
- self.baseline.rx_interval_p95.as_nanos() as f64)
/ self.baseline.rx_interval_p95.as_nanos() as f64
* 100.0,
self.baseline.tx_latency_p95,
self.current.tx_latency_p95,
(self.current.tx_latency_p95.as_nanos() as f64
- self.baseline.tx_latency_p95.as_nanos() as f64)
/ self.baseline.tx_latency_p95.as_nanos() as f64
* 100.0,
self.baseline.send_duration_p95,
self.current.send_duration_p95,
(self.current.send_duration_p95.as_nanos() as f64
- self.baseline.send_duration_p95.as_nanos() as f64)
/ self.baseline.send_duration_p95.as_nanos() as f64
* 100.0,
self.baseline.throughput_fps,
self.current.throughput_fps,
(self.current.throughput_fps - self.baseline.throughput_fps)
/ self.baseline.throughput_fps
* 100.0,
if let Err(errors) = &check_result {
format!("\n## Regression Errors\n\n{}\n", errors.join("\n"))
} else {
"\n✅ No performance regression detected.\n".to_string()
}
)
}
}
/// Simulated RX adapter that hands out pre-built frames on a fixed schedule.
struct SimpleRxAdapter {
    /// Frames not yet delivered, in delivery order.
    frames: VecDeque<PiperFrame>,
    /// Nominal spacing between two consecutive frames.
    interval: Duration,
    /// Number of frames delivered so far.
    frame_count: Arc<AtomicU64>,
    /// Origin of the delivery schedule (the adapter's creation time).
    start_time: Instant,
}

impl SimpleRxAdapter {
    /// Prepares `frames_per_second * test_duration` frames, spaced
    /// `1 / frames_per_second` seconds apart.
    ///
    /// Frame IDs cycle through `0x251..=0x256`; every payload byte is the low
    /// byte of the frame index. A rate of zero yields an adapter without frames.
    fn new(frames_per_second: u32, test_duration: Duration) -> Self {
        // Integer nanosecond arithmetic keeps the fractional part of
        // `test_duration` and cannot overflow for any `u32` rate.
        let total_frames =
            (test_duration.as_nanos() * u128::from(frames_per_second) / 1_000_000_000) as u64;
        let frames = (0..total_frames)
            .map(|i| PiperFrame::new_standard((0x251 + (i % 6)) as u16, &[i as u8; 8]))
            .collect();
        Self {
            frames,
            // Dividing a full second keeps sub-millisecond precision for rates
            // above 1 kHz; `max(1)` avoids a division by zero for a zero rate.
            interval: Duration::from_secs(1) / frames_per_second.max(1),
            frame_count: Arc::new(AtomicU64::new(0)),
            start_time: Instant::now(),
        }
    }

    /// Number of frames delivered so far.
    #[allow(dead_code)]
    fn frame_count(&self) -> u64 {
        self.frame_count.load(Ordering::Relaxed)
    }
}

impl RxAdapter for SimpleRxAdapter {
    /// Blocks until the next frame is due and returns it.
    ///
    /// # Errors
    ///
    /// Returns `CanError::Timeout` (without blocking) once all frames have been
    /// delivered.
    fn receive(&mut self) -> Result<PiperFrame, CanError> {
        let frame = self.frames.pop_front().ok_or(CanError::Timeout)?;

        // The schedule is keyed on the number of frames already delivered.
        // Deriving the slot from the elapsed time instead always yields a due
        // time in the past, so the pacing sleep would never run.
        let delivered = self.frame_count.load(Ordering::Relaxed);
        let slot = delivered.min(u64::from(u32::MAX)) as u32;
        let due = self.start_time + self.interval * slot;
        let now = Instant::now();
        if now < due {
            thread::sleep(due - now);
        }

        self.frame_count.fetch_add(1, Ordering::Relaxed);
        Ok(frame)
    }
}
/// Simulated TX adapter that records when and how long each send took.
struct SimpleTxAdapter {
    /// Artificial delay slept inside every send call.
    send_delay: Duration,
    /// Number of completed send calls.
    sent_count: Arc<AtomicU64>,
    /// One `(start instant, measured duration)` entry per completed send call.
    send_times: Arc<Mutex<Vec<(Instant, Duration)>>>,
}

impl SimpleTxAdapter {
    /// Creates an adapter with zeroed statistics whose sends take `send_delay`.
    fn new(send_delay: Duration) -> Self {
        let sent_count = Arc::new(AtomicU64::new(0));
        let send_times = Arc::new(Mutex::new(Vec::new()));
        Self {
            send_delay,
            sent_count,
            send_times,
        }
    }

    /// Number of completed send calls.
    #[allow(dead_code)]
    fn sent_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.sent_count;
        counter.load(Ordering::Relaxed)
    }

    /// Copy of the recorded `(start instant, duration)` pairs.
    ///
    /// Panics if the mutex guarding the records is poisoned.
    #[allow(dead_code)]
    fn send_times(&self) -> Vec<(Instant, Duration)> {
        let records = self.send_times.lock().unwrap();
        records.to_vec()
    }
}
impl TxAdapter for SimpleTxAdapter {
    /// Pretends to transmit a frame: sleeps for the configured delay, then
    /// bumps the send counter and records the call's start time and duration.
    ///
    /// The frame itself is discarded and the call always succeeds.
    fn send(&mut self, _frame: PiperFrame) -> Result<(), CanError> {
        let began = Instant::now();
        thread::sleep(self.send_delay);
        let took = began.elapsed();

        self.sent_count.fetch_add(1, Ordering::Relaxed);
        {
            let mut records = self.send_times.lock().unwrap();
            records.push((began, took));
        }
        Ok(())
    }
}
/// Runs `rx_loop` and `tx_loop_mailbox` against the simulated adapters for
/// `test_duration` and returns the observed figures.
///
/// * RX side: a `SimpleRxAdapter` producing `frequency_hz` frames per second.
///   The interval between observed increments of `metrics.rx_frames_valid` is
///   sampled by polling, and throughput is that counter divided by
///   `test_duration`.
/// * TX side: a `SimpleTxAdapter` whose sends take about 100 µs. Roughly every
///   2 ms a command frame is queued, and the latency from queueing to the start
///   of the adapter's send is recorded.
///
/// Metrics without any sample fall back to fixed defaults (2 ms, 1 ms, 500 µs).
///
/// # Panics
///
/// Panics if the send-record mutex is poisoned or the system clock is set
/// before the UNIX epoch.
fn measure_performance(frequency_hz: u32, test_duration: Duration) -> PerformanceBaseline {
    /// Nearest-rank 95th percentile of `samples` (sorted in place), or
    /// `fallback` when there are no samples.
    fn p95(samples: &mut [Duration], fallback: Duration) -> Duration {
        if samples.is_empty() {
            return fallback;
        }
        samples.sort_unstable();
        let rank = (samples.len() as f64 * 0.95).ceil() as usize;
        samples[rank.saturating_sub(1).min(samples.len() - 1)]
    }

    let ctx = Arc::new(PiperContext::new());
    let config = PipelineConfig::default();
    let is_running = Arc::new(AtomicBool::new(true));
    let metrics = Arc::new(PiperMetrics::new());

    let rx_adapter = SimpleRxAdapter::new(frequency_hz, test_duration);
    let tx_adapter = SimpleTxAdapter::new(Duration::from_micros(100));
    let send_times = tx_adapter.send_times.clone();

    // Commands are injected through the reliable queue, the only sender in this
    // harness whose receiving end is handed to the TX loop. A separate channel
    // that nothing drains would accept a single frame and then reject the rest,
    // leaving the TX metrics at their fallback values.
    // NOTE(review): assumes `tx_loop_mailbox` forwards frames from `reliable_rx`
    // to the adapter - confirm against the driver.
    let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
    let realtime_slot: Arc<std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>> =
        Arc::new(std::sync::Mutex::new(None));

    let ctx_rx = ctx.clone();
    let is_running_rx = is_running.clone();
    let metrics_rx = metrics.clone();
    let rx_handle = thread::spawn(move || {
        rx_loop(rx_adapter, ctx_rx, config, is_running_rx, metrics_rx);
    });

    let ctx_tx = ctx.clone();
    let is_running_tx = is_running.clone();
    let metrics_tx = metrics.clone();
    let tx_handle = thread::spawn(move || {
        tx_loop_mailbox(
            tx_adapter,
            realtime_slot,
            reliable_rx,
            is_running_tx,
            metrics_tx,
            ctx_tx,
        );
    });

    let mut rx_intervals = Vec::new();
    let mut tx_latencies = Vec::new();
    let mut last_update_time = Instant::now();
    let mut last_update_count = 0u64;
    let mut command_count = 0u32;
    let start = Instant::now();
    while start.elapsed() < test_duration {
        // RX: record the time since the previous observed counter change.
        let current_count = metrics.rx_frames_valid.load(Ordering::Relaxed);
        if current_count > last_update_count {
            rx_intervals.push(last_update_time.elapsed());
            last_update_time = Instant::now();
            last_update_count = current_count;
        }

        // TX: queue one command and wait (at most ~10 ms) for its send record.
        let api_call_time = Instant::now();
        let frame = PiperFrame::new_standard(
            0x200 + (command_count % 10) as u16,
            &[command_count as u8; 8],
        );
        if reliable_tx.try_send(frame).is_ok() {
            for _ in 0..100 {
                // The n-th accepted command corresponds to the n-th send record.
                // The lock is released at the end of this statement.
                let sent_at = send_times
                    .lock()
                    .unwrap()
                    .get(command_count as usize)
                    .map(|&(send_start, _)| send_start);
                if let Some(send_start) = sent_at {
                    tx_latencies.push(send_start.duration_since(api_call_time));
                    break;
                }
                thread::sleep(Duration::from_micros(100));
            }
            command_count += 1;
        }
        thread::sleep(Duration::from_millis(2));
    }

    // Give both loops time to drain before asking them to stop.
    thread::sleep(Duration::from_millis(200));
    is_running.store(false, Ordering::Relaxed);
    let _ = rx_handle.join();
    let _ = tx_handle.join();

    let mut send_durations: Vec<Duration> = send_times
        .lock()
        .unwrap()
        .iter()
        .map(|&(_, duration)| duration)
        .collect();

    let rx_interval_p95 = p95(&mut rx_intervals, Duration::from_millis(2));
    let tx_latency_p95 = p95(&mut tx_latencies, Duration::from_millis(1));
    let send_duration_p95 = p95(&mut send_durations, Duration::from_micros(500));
    let throughput_fps =
        metrics.rx_frames_valid.load(Ordering::Relaxed) as f64 / test_duration.as_secs_f64();

    PerformanceBaseline {
        rx_interval_p95,
        tx_latency_p95,
        send_duration_p95,
        throughput_fps,
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock should not be set before the UNIX epoch")
            .as_secs(),
    }
}
/// Measures the pipeline at 500 Hz for three seconds and compares the result
/// with a fixed baseline using a 20 % threshold.
///
/// Regressions are only printed, never asserted, so this test reports but does
/// not fail on a slow run.
#[test]
fn test_performance_regression() {
    let reference = PerformanceBaseline {
        rx_interval_p95: Duration::from_millis(5),
        tx_latency_p95: Duration::from_millis(1),
        send_duration_p95: Duration::from_micros(500),
        throughput_fps: 400.0,
        timestamp: 0,
    };
    let measured = measure_performance(500, Duration::from_secs(3));

    let mut regression_test = PerformanceRegressionTest::new(reference, 20.0);
    regression_test.set_current(measured);

    if let Err(errors) = regression_test.check_regression() {
        println!("❌ Performance regression detected:");
        errors.iter().for_each(|error| println!(" - {}", error));
    } else {
        println!("✅ Performance regression test passed");
    }

    println!("{}", regression_test.generate_report());
}
/// Checks that wrapping a frame in a `PiperCommand` before queueing it does not
/// lower the achievable send rate by more than 10 % (20 % under CI).
///
/// Both phases queue one frame roughly every 2 ms for two seconds into a fresh
/// TX loop; the second phase round-trips each frame through
/// `PiperCommand::realtime(..).frame()` first.
#[test]
fn test_command_priority_performance() {
    /// Starts a TX loop with a fresh `SimpleTxAdapter`, queues frames for
    /// `test_duration`, and returns the number of accepted frames per second.
    ///
    /// `prepare` is applied to each frame right before it is queued.
    fn measure_send_rate(
        ctx: &Arc<PiperContext>,
        test_duration: Duration,
        prepare: impl Fn(PiperFrame) -> PiperFrame,
    ) -> f64 {
        let is_running = Arc::new(AtomicBool::new(true));
        let metrics = Arc::new(PiperMetrics::new());
        let tx_adapter = SimpleTxAdapter::new(Duration::from_micros(100));
        // Frames go through the reliable queue, the only sender whose receiving
        // end is handed to the TX loop. A channel that nothing drains would
        // accept exactly one frame, making both phases trivially equal.
        // NOTE(review): assumes `tx_loop_mailbox` drains `reliable_rx` - confirm.
        let (reliable_tx, reliable_rx) = crossbeam_channel::bounded::<PiperFrame>(10);
        let realtime_slot: Arc<
            std::sync::Mutex<Option<piper_sdk::driver::command::RealtimeCommand>>,
        > = Arc::new(std::sync::Mutex::new(None));

        let ctx_tx = ctx.clone();
        let is_running_tx = is_running.clone();
        let metrics_tx = metrics.clone();
        let tx_handle = thread::spawn(move || {
            tx_loop_mailbox(
                tx_adapter,
                realtime_slot,
                reliable_rx,
                is_running_tx,
                metrics_tx,
                ctx_tx,
            );
        });

        let start = Instant::now();
        let mut accepted = 0u32;
        while start.elapsed() < test_duration {
            let frame =
                PiperFrame::new_standard(0x200 + (accepted % 10) as u16, &[accepted as u8; 8]);
            if reliable_tx.try_send(prepare(frame)).is_ok() {
                accepted += 1;
            }
            thread::sleep(Duration::from_millis(2));
        }
        let rate = f64::from(accepted) / start.elapsed().as_secs_f64();

        // Let the loop drain before stopping it.
        thread::sleep(Duration::from_millis(200));
        is_running.store(false, Ordering::Relaxed);
        let _ = tx_handle.join();
        rate
    }

    let test_duration = Duration::from_secs(2);
    let ctx = Arc::new(PiperContext::new());

    let direct_send_rate = measure_send_rate(&ctx, test_duration, |frame| frame);
    let command_send_rate = measure_send_rate(&ctx, test_duration, |frame| {
        let cmd = PiperCommand::realtime(frame);
        cmd.frame()
    });

    let overhead = (direct_send_rate - command_send_rate).abs() / direct_send_rate * 100.0;
    println!("Direct send rate: {:.2} fps", direct_send_rate);
    println!("Command send rate: {:.2} fps", command_send_rate);
    println!("Overhead: {:.2}%", overhead);

    // CI machines are noisier, so the limit is doubled there.
    let threshold = if is_ci_env() { 10.0 * 2.0 } else { 10.0 };
    assert!(
        overhead < threshold,
        "Command priority overhead should be < {:.1}% (CI环境已放宽), got: {:.2}%",
        threshold,
        overhead
    );
}
/// Verifies that the Markdown rendering of a default baseline mentions the
/// section title and every metric label.
#[test]
fn test_baseline_serialization() {
    let report = PerformanceBaseline::new().to_markdown();

    let expected_fragments = [
        "Performance Baseline",
        "RX Interval P95",
        "TX Latency P95",
        "Send Duration P95",
        "Throughput",
    ];
    for fragment in expected_fragments.iter() {
        assert!(
            report.contains(*fragment),
            "report is missing {:?}",
            fragment
        );
    }

    println!("Baseline serialization test passed");
    println!("Report:\n{}", report);
}