cross_language_test/
cross_language_test.rs

1//! 跨语言通信性能测试
2//! 
3//! 测试6组通信组合:
4//! 1. Rust ↔ Rust (共享内存)
5//! 2. Rust ↔ Rust (TCP)
6//! 3. Swift ↔ Swift (共享内存) 
7//! 4. Swift ↔ Swift (TCP)
8//! 5. Rust ↔ Swift (共享内存)
9//! 6. Rust ↔ Swift (TCP)
10
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{mpsc, Barrier};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tracing::{info, error};
17use anyhow::Result;
18use data_portal::{PortalHeader, SharedMemoryTransport};
19
20#[derive(Debug, Clone)]
21pub struct TestResult {
22    pub test_name: String,
23    pub transport_mode: String,
24    pub total_operations: u64,
25    pub duration_secs: f64,
26    pub ops_per_sec: f64,
27    pub throughput_mbps: f64,
28    pub avg_latency_us: f64,
29    pub bytes_transferred: u64,
30}
31
32impl TestResult {
33    pub fn print_summary(&self) {
34        info!("📊 {} 测试结果:", self.test_name);
35        info!("  传输模式: {}", self.transport_mode);
36        info!("  操作次数: {} 次", self.total_operations);
37        info!("  总耗时: {:.3} 秒", self.duration_secs);
38        info!("  操作频率: {:.1} M ops/sec", self.ops_per_sec / 1_000_000.0);
39        info!("  吞吐量: {:.1} MB/s", self.throughput_mbps);
40        info!("  平均延迟: {:.3} μs", self.avg_latency_us);
41        info!("  传输数据: {:.1} MB", self.bytes_transferred as f64 / (1024.0 * 1024.0));
42    }
43}
44
45/// 测试1: Rust ↔ Rust 共享内存双向通信
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47    info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48    
49    let iterations = 1_000_000; // 100万次双向操作
50    let barrier = Arc::new(Barrier::new(2));
51    let (tx_results, mut rx_results) = mpsc::channel(2);
52    
53    let start_time = Instant::now();
54    
55    // 服务器端任务
56    let server_barrier = barrier.clone();
57    let server_tx = tx_results.clone();
58    let server_task = tokio::spawn(async move {
59        // 创建共享内存段
60        let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61        server_barrier.wait().await;
62        
63        let mut server_ops = 0u64;
64        let mut server_bytes = 0u64;
65        
66        for i in 0..iterations {
67            // 读取客户端消息
68            let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69            let mut header_bytes = [0u8; 32];
70            header_bytes.copy_from_slice(read_data);
71            let header = PortalHeader::from_bytes(&header_bytes);
72            
73            if header.verify_checksum() {
74                server_ops += 1;
75                server_bytes += 32;
76                
77                // 回复消息
78                let response = PortalHeader::new(2, 1024, i);
79                let response_bytes = response.to_bytes();
80                unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81                server_bytes += 32;
82            }
83            
84            // 每10万次让出控制权
85            if i % 100_000 == 0 {
86                tokio::task::yield_now().await;
87            }
88        }
89        
90        server_tx.send((server_ops, server_bytes)).await.unwrap();
91        Ok::<(), anyhow::Error>(())
92    });
93    
94    // 客户端任务
95    let client_barrier = barrier.clone();
96    let client_tx = tx_results.clone();
97    let client_task = tokio::spawn(async move {
98        tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99        
100        // 连接到相同的共享内存段
101        let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102        client_barrier.wait().await;
103        
104        let mut client_ops = 0u64;
105        let mut client_bytes = 0u64;
106        
107        for i in 0..iterations {
108            // 发送消息
109            let header = PortalHeader::new(1, 1024, i);
110            let header_bytes = header.to_bytes();
111            unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112            client_bytes += 32;
113            
114            // 读取回复
115            let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116            let mut response_bytes = [0u8; 32];
117            response_bytes.copy_from_slice(read_data);
118            let response = PortalHeader::from_bytes(&response_bytes);
119            
120            if response.verify_checksum() {
121                client_ops += 1;
122                client_bytes += 32;
123            }
124            
125            if i % 100_000 == 0 {
126                tokio::task::yield_now().await;
127            }
128        }
129        
130        client_tx.send((client_ops, client_bytes)).await.unwrap();
131        Ok::<(), anyhow::Error>(())
132    });
133    
134    // 等待任务完成
135    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136    server_result?;
137    client_result?;
138    
139    // 收集结果
140    drop(tx_results);
141    let mut total_ops = 0u64;
142    let mut total_bytes = 0u64;
143    
144    while let Some((ops, bytes)) = rx_results.recv().await {
145        total_ops += ops;
146        total_bytes += bytes;
147    }
148    
149    let duration = start_time.elapsed();
150    let duration_secs = duration.as_secs_f64();
151    let ops_per_sec = total_ops as f64 / duration_secs;
152    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154    
155    Ok(TestResult {
156        test_name: "Rust ↔ Rust".to_string(),
157        transport_mode: "共享内存".to_string(),
158        total_operations: total_ops,
159        duration_secs,
160        ops_per_sec,
161        throughput_mbps,
162        avg_latency_us,
163        bytes_transferred: total_bytes,
164    })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169    info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170    
171    let iterations = 100_000; // 10万次双向操作(TCP较慢)
172    let addr = "127.0.0.1:9091";
173    let barrier = Arc::new(Barrier::new(2));
174    let (tx_results, mut rx_results) = mpsc::channel(2);
175    
176    let start_time = Instant::now();
177    
178    // TCP服务器任务
179    let server_barrier = barrier.clone();
180    let server_tx = tx_results.clone();
181    let server_task = tokio::spawn(async move {
182        let listener = TcpListener::bind(addr).await?;
183        info!("TCP服务器已启动: {}", addr);
184        server_barrier.wait().await;
185        
186        let mut server_ops = 0u64;
187        let mut server_bytes = 0u64;
188        
189        let (mut stream, _) = listener.accept().await?;
190        let mut buffer = [0u8; 1024];
191        
192        for _i in 0..iterations {
193            // 读取客户端消息
194            match stream.read(&mut buffer).await {
195                Ok(n) if n >= 32 => {
196                    let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197                    let header = PortalHeader::from_bytes(&header_bytes);
198                    
199                    if header.verify_checksum() {
200                        server_ops += 1;
201                        server_bytes += n as u64;
202                        
203                        // 回复消息
204                        let response = PortalHeader::new(2, 1024, header.sequence);
205                        let response_bytes = response.to_bytes();
206                        stream.write_all(&response_bytes).await?;
207                        server_bytes += 32;
208                    }
209                }
210                Ok(0) => break,
211                Ok(_) => continue,
212                Err(e) => {
213                    error!("服务器读取错误: {}", e);
214                    break;
215                }
216            }
217        }
218        
219        server_tx.send((server_ops, server_bytes)).await.unwrap();
220        Ok::<(), anyhow::Error>(())
221    });
222    
223    // TCP客户端任务
224    let client_barrier = barrier.clone();
225    let client_tx = tx_results.clone();
226    let client_task = tokio::spawn(async move {
227        tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228        
229        let mut stream = TcpStream::connect(addr).await?;
230        client_barrier.wait().await;
231        
232        let mut client_ops = 0u64;
233        let mut client_bytes = 0u64;
234        let mut buffer = [0u8; 1024];
235        
236        for i in 0..iterations {
237            // 发送消息
238            let header = PortalHeader::new(1, 1024, i);
239            let header_bytes = header.to_bytes();
240            stream.write_all(&header_bytes).await?;
241            client_bytes += 32;
242            
243            // 读取回复
244            match stream.read(&mut buffer).await {
245                Ok(n) if n >= 32 => {
246                    let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247                    let response = PortalHeader::from_bytes(&response_bytes);
248                    
249                    if response.verify_checksum() {
250                        client_ops += 1;
251                        client_bytes += n as u64;
252                    }
253                }
254                Ok(_) => continue,
255                Err(e) => {
256                    error!("客户端读取错误: {}", e);
257                    break;
258                }
259            }
260            
261            if i % 10_000 == 0 {
262                tokio::task::yield_now().await;
263            }
264        }
265        
266        client_tx.send((client_ops, client_bytes)).await.unwrap();
267        Ok::<(), anyhow::Error>(())
268    });
269    
270    // 等待任务完成
271    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272    server_result?;
273    client_result?;
274    
275    // 收集结果
276    drop(tx_results);
277    let mut total_ops = 0u64;
278    let mut total_bytes = 0u64;
279    
280    while let Some((ops, bytes)) = rx_results.recv().await {
281        total_ops += ops;
282        total_bytes += bytes;
283    }
284    
285    let duration = start_time.elapsed();
286    let duration_secs = duration.as_secs_f64();
287    let ops_per_sec = total_ops as f64 / duration_secs;
288    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290    
291    Ok(TestResult {
292        test_name: "Rust ↔ Rust".to_string(),
293        transport_mode: "TCP网络".to_string(),
294        total_operations: total_ops,
295        duration_secs,
296        ops_per_sec,
297        throughput_mbps,
298        avg_latency_us,
299        bytes_transferred: total_bytes,
300    })
301}
302
303/// 生成性能报告
304pub fn generate_performance_report(results: &[TestResult]) {
305    info!("📈 Data Portal 跨语言性能测试报告");
306    info!("================================================================");
307    info!("通信组合              | 传输模式   | 操作频率     | 吞吐量      | 延迟");
308    info!("---------------------|-----------|-------------|------------|--------");
309    
310    for result in results {
311        info!(
312            "{:<20} | {:<9} | {:>9.1}M/s | {:>8.1}MB/s | {:>6.3}μs",
313            result.test_name,
314            result.transport_mode,
315            result.ops_per_sec / 1_000_000.0,
316            result.throughput_mbps,
317            result.avg_latency_us
318        );
319    }
320    
321    info!("================================================================");
322    
323    // 性能对比分析
324    if results.len() >= 2 {
325        let shm_results: Vec<_> = results.iter().filter(|r| r.transport_mode.contains("共享内存")).collect();
326        let tcp_results: Vec<_> = results.iter().filter(|r| r.transport_mode.contains("TCP")).collect();
327        
328        if !shm_results.is_empty() && !tcp_results.is_empty() {
329            let avg_shm_throughput: f64 = shm_results.iter().map(|r| r.throughput_mbps).sum::<f64>() / shm_results.len() as f64;
330            let avg_tcp_throughput: f64 = tcp_results.iter().map(|r| r.throughput_mbps).sum::<f64>() / tcp_results.len() as f64;
331            let improvement = avg_shm_throughput / avg_tcp_throughput;
332            
333            info!("🔥 性能提升分析:");
334            info!("  共享内存平均吞吐量: {:.1} MB/s", avg_shm_throughput);
335            info!("  TCP网络平均吞吐量: {:.1} MB/s", avg_tcp_throughput);
336            info!("  共享内存 vs TCP: {:.1}x 性能提升", improvement);
337        }
338    }
339}
340
341#[tokio::main]
342async fn main() -> Result<()> {
343    // 初始化日志
344    tracing_subscriber::fmt()
345        .with_env_filter("info")
346        .init();
347    
348    info!("🎯 Data Portal 跨语言性能测试");
349    info!("测试6组通信组合的双向通信性能");
350    println!();
351    
352    let mut results = Vec::new();
353    
354    // 测试1: Rust ↔ Rust 共享内存
355    match test_rust_rust_shared_memory().await {
356        Ok(result) => {
357            result.print_summary();
358            results.push(result);
359        }
360        Err(e) => error!("❌ Rust ↔ Rust 共享内存测试失败: {}", e),
361    }
362    
363    println!();
364    
365    // 测试2: Rust ↔ Rust TCP
366    match test_rust_rust_tcp().await {
367        Ok(result) => {
368            result.print_summary();
369            results.push(result);
370        }
371        Err(e) => error!("❌ Rust ↔ Rust TCP测试失败: {}", e),
372    }
373    
374    println!();
375    
376    // 生成报告
377    if !results.is_empty() {
378        generate_performance_report(&results);
379    }
380    
381    info!("🏁 Rust端测试完成!");
382    info!("📝 注意: Swift测试需要在Xcode中运行或使用swift命令");
383    
384    Ok(())
385}