1use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{mpsc, Barrier};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tracing::{info, error};
17use anyhow::Result;
18use data_portal::{PortalHeader, SharedMemoryTransport};
19
/// Aggregated measurements produced by a single benchmark run.
///
/// All derived figures (`ops_per_sec`, `throughput_mbps`, `avg_latency_us`)
/// are stored pre-computed; this type performs no arithmetic of its own
/// beyond unit scaling when printing.
#[derive(Debug, Clone)]
pub struct TestResult {
    /// Label of the endpoint pairing that was exercised.
    pub test_name: String,
    /// Label of the transport that carried the traffic.
    pub transport_mode: String,
    /// Number of operations counted during the run.
    pub total_operations: u64,
    /// Wall-clock duration of the run, in seconds.
    pub duration_secs: f64,
    /// Operations per second (not pre-scaled; printing divides by 1,000,000).
    pub ops_per_sec: f64,
    /// Throughput in megabytes per second, where the producers in this file
    /// use 1 MB = 1024 × 1024 bytes.
    pub throughput_mbps: f64,
    /// Average time per operation, in microseconds.
    pub avg_latency_us: f64,
    /// Total number of bytes counted during the run.
    pub bytes_transferred: u64,
}
31
32impl TestResult {
33 pub fn print_summary(&self) {
34 info!("📊 {} 测试结果:", self.test_name);
35 info!(" 传输模式: {}", self.transport_mode);
36 info!(" 操作次数: {} 次", self.total_operations);
37 info!(" 总耗时: {:.3} 秒", self.duration_secs);
38 info!(" 操作频率: {:.1} M ops/sec", self.ops_per_sec / 1_000_000.0);
39 info!(" 吞吐量: {:.1} MB/s", self.throughput_mbps);
40 info!(" 平均延迟: {:.3} μs", self.avg_latency_us);
41 info!(" 传输数据: {:.1} MB", self.bytes_transferred as f64 / (1024.0 * 1024.0));
42 }
43}
44
45pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47 info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48
49 let iterations = 1_000_000; let barrier = Arc::new(Barrier::new(2));
51 let (tx_results, mut rx_results) = mpsc::channel(2);
52
53 let start_time = Instant::now();
54
55 let server_barrier = barrier.clone();
57 let server_tx = tx_results.clone();
58 let server_task = tokio::spawn(async move {
59 let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61 server_barrier.wait().await;
62
63 let mut server_ops = 0u64;
64 let mut server_bytes = 0u64;
65
66 for i in 0..iterations {
67 let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69 let mut header_bytes = [0u8; 32];
70 header_bytes.copy_from_slice(read_data);
71 let header = PortalHeader::from_bytes(&header_bytes);
72
73 if header.verify_checksum() {
74 server_ops += 1;
75 server_bytes += 32;
76
77 let response = PortalHeader::new(2, 1024, i);
79 let response_bytes = response.to_bytes();
80 unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81 server_bytes += 32;
82 }
83
84 if i % 100_000 == 0 {
86 tokio::task::yield_now().await;
87 }
88 }
89
90 server_tx.send((server_ops, server_bytes)).await.unwrap();
91 Ok::<(), anyhow::Error>(())
92 });
93
94 let client_barrier = barrier.clone();
96 let client_tx = tx_results.clone();
97 let client_task = tokio::spawn(async move {
98 tokio::time::sleep(Duration::from_millis(10)).await; let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102 client_barrier.wait().await;
103
104 let mut client_ops = 0u64;
105 let mut client_bytes = 0u64;
106
107 for i in 0..iterations {
108 let header = PortalHeader::new(1, 1024, i);
110 let header_bytes = header.to_bytes();
111 unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112 client_bytes += 32;
113
114 let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116 let mut response_bytes = [0u8; 32];
117 response_bytes.copy_from_slice(read_data);
118 let response = PortalHeader::from_bytes(&response_bytes);
119
120 if response.verify_checksum() {
121 client_ops += 1;
122 client_bytes += 32;
123 }
124
125 if i % 100_000 == 0 {
126 tokio::task::yield_now().await;
127 }
128 }
129
130 client_tx.send((client_ops, client_bytes)).await.unwrap();
131 Ok::<(), anyhow::Error>(())
132 });
133
134 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136 server_result?;
137 client_result?;
138
139 drop(tx_results);
141 let mut total_ops = 0u64;
142 let mut total_bytes = 0u64;
143
144 while let Some((ops, bytes)) = rx_results.recv().await {
145 total_ops += ops;
146 total_bytes += bytes;
147 }
148
149 let duration = start_time.elapsed();
150 let duration_secs = duration.as_secs_f64();
151 let ops_per_sec = total_ops as f64 / duration_secs;
152 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154
155 Ok(TestResult {
156 test_name: "Rust ↔ Rust".to_string(),
157 transport_mode: "共享内存".to_string(),
158 total_operations: total_ops,
159 duration_secs,
160 ops_per_sec,
161 throughput_mbps,
162 avg_latency_us,
163 bytes_transferred: total_bytes,
164 })
165}
166
167pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169 info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170
171 let iterations = 100_000; let addr = "127.0.0.1:9091";
173 let barrier = Arc::new(Barrier::new(2));
174 let (tx_results, mut rx_results) = mpsc::channel(2);
175
176 let start_time = Instant::now();
177
178 let server_barrier = barrier.clone();
180 let server_tx = tx_results.clone();
181 let server_task = tokio::spawn(async move {
182 let listener = TcpListener::bind(addr).await?;
183 info!("TCP服务器已启动: {}", addr);
184 server_barrier.wait().await;
185
186 let mut server_ops = 0u64;
187 let mut server_bytes = 0u64;
188
189 let (mut stream, _) = listener.accept().await?;
190 let mut buffer = [0u8; 1024];
191
192 for _i in 0..iterations {
193 match stream.read(&mut buffer).await {
195 Ok(n) if n >= 32 => {
196 let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197 let header = PortalHeader::from_bytes(&header_bytes);
198
199 if header.verify_checksum() {
200 server_ops += 1;
201 server_bytes += n as u64;
202
203 let response = PortalHeader::new(2, 1024, header.sequence);
205 let response_bytes = response.to_bytes();
206 stream.write_all(&response_bytes).await?;
207 server_bytes += 32;
208 }
209 }
210 Ok(0) => break,
211 Ok(_) => continue,
212 Err(e) => {
213 error!("服务器读取错误: {}", e);
214 break;
215 }
216 }
217 }
218
219 server_tx.send((server_ops, server_bytes)).await.unwrap();
220 Ok::<(), anyhow::Error>(())
221 });
222
223 let client_barrier = barrier.clone();
225 let client_tx = tx_results.clone();
226 let client_task = tokio::spawn(async move {
227 tokio::time::sleep(Duration::from_millis(100)).await; let mut stream = TcpStream::connect(addr).await?;
230 client_barrier.wait().await;
231
232 let mut client_ops = 0u64;
233 let mut client_bytes = 0u64;
234 let mut buffer = [0u8; 1024];
235
236 for i in 0..iterations {
237 let header = PortalHeader::new(1, 1024, i);
239 let header_bytes = header.to_bytes();
240 stream.write_all(&header_bytes).await?;
241 client_bytes += 32;
242
243 match stream.read(&mut buffer).await {
245 Ok(n) if n >= 32 => {
246 let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247 let response = PortalHeader::from_bytes(&response_bytes);
248
249 if response.verify_checksum() {
250 client_ops += 1;
251 client_bytes += n as u64;
252 }
253 }
254 Ok(_) => continue,
255 Err(e) => {
256 error!("客户端读取错误: {}", e);
257 break;
258 }
259 }
260
261 if i % 10_000 == 0 {
262 tokio::task::yield_now().await;
263 }
264 }
265
266 client_tx.send((client_ops, client_bytes)).await.unwrap();
267 Ok::<(), anyhow::Error>(())
268 });
269
270 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272 server_result?;
273 client_result?;
274
275 drop(tx_results);
277 let mut total_ops = 0u64;
278 let mut total_bytes = 0u64;
279
280 while let Some((ops, bytes)) = rx_results.recv().await {
281 total_ops += ops;
282 total_bytes += bytes;
283 }
284
285 let duration = start_time.elapsed();
286 let duration_secs = duration.as_secs_f64();
287 let ops_per_sec = total_ops as f64 / duration_secs;
288 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290
291 Ok(TestResult {
292 test_name: "Rust ↔ Rust".to_string(),
293 transport_mode: "TCP网络".to_string(),
294 total_operations: total_ops,
295 duration_secs,
296 ops_per_sec,
297 throughput_mbps,
298 avg_latency_us,
299 bytes_transferred: total_bytes,
300 })
301}
302
303pub fn generate_performance_report(results: &[TestResult]) {
305 info!("📈 Data Portal 跨语言性能测试报告");
306 info!("================================================================");
307 info!("通信组合 | 传输模式 | 操作频率 | 吞吐量 | 延迟");
308 info!("---------------------|-----------|-------------|------------|--------");
309
310 for result in results {
311 info!(
312 "{:<20} | {:<9} | {:>9.1}M/s | {:>8.1}MB/s | {:>6.3}μs",
313 result.test_name,
314 result.transport_mode,
315 result.ops_per_sec / 1_000_000.0,
316 result.throughput_mbps,
317 result.avg_latency_us
318 );
319 }
320
321 info!("================================================================");
322
323 if results.len() >= 2 {
325 let shm_results: Vec<_> = results.iter().filter(|r| r.transport_mode.contains("共享内存")).collect();
326 let tcp_results: Vec<_> = results.iter().filter(|r| r.transport_mode.contains("TCP")).collect();
327
328 if !shm_results.is_empty() && !tcp_results.is_empty() {
329 let avg_shm_throughput: f64 = shm_results.iter().map(|r| r.throughput_mbps).sum::<f64>() / shm_results.len() as f64;
330 let avg_tcp_throughput: f64 = tcp_results.iter().map(|r| r.throughput_mbps).sum::<f64>() / tcp_results.len() as f64;
331 let improvement = avg_shm_throughput / avg_tcp_throughput;
332
333 info!("🔥 性能提升分析:");
334 info!(" 共享内存平均吞吐量: {:.1} MB/s", avg_shm_throughput);
335 info!(" TCP网络平均吞吐量: {:.1} MB/s", avg_tcp_throughput);
336 info!(" 共享内存 vs TCP: {:.1}x 性能提升", improvement);
337 }
338 }
339}
340
341#[tokio::main]
342async fn main() -> Result<()> {
343 tracing_subscriber::fmt()
345 .with_env_filter("info")
346 .init();
347
348 info!("🎯 Data Portal 跨语言性能测试");
349 info!("测试6组通信组合的双向通信性能");
350 println!();
351
352 let mut results = Vec::new();
353
354 match test_rust_rust_shared_memory().await {
356 Ok(result) => {
357 result.print_summary();
358 results.push(result);
359 }
360 Err(e) => error!("❌ Rust ↔ Rust 共享内存测试失败: {}", e),
361 }
362
363 println!();
364
365 match test_rust_rust_tcp().await {
367 Ok(result) => {
368 result.print_summary();
369 results.push(result);
370 }
371 Err(e) => error!("❌ Rust ↔ Rust TCP测试失败: {}", e),
372 }
373
374 println!();
375
376 if !results.is_empty() {
378 generate_performance_report(&results);
379 }
380
381 info!("🏁 Rust端测试完成!");
382 info!("📝 注意: Swift测试需要在Xcode中运行或使用swift命令");
383
384 Ok(())
385}