Struct PortalHeader

Source
#[repr(C)]
pub struct PortalHeader { pub magic: u32, pub version: u8, pub msg_type: u8, pub flags: u16, pub payload_len: u32, pub sequence: u32, pub timestamp: u64, pub checksum: u32, pub reserved: [u8; 4], }
Expand description

UTP协议固定32字节头部

Fields§

§magic: u32§version: u8§msg_type: u8§flags: u16§payload_len: u32§sequence: u32§timestamp: u64§checksum: u32§reserved: [u8; 4]

Implementations§

Source§

impl PortalHeader

Source

pub const MAGIC: u32 = 1_431_588_864u32

Source

pub const SIZE: usize = 32usize

Source

pub fn new(msg_type: u8, payload_len: u32, sequence: u32) -> Self

Examples found in repository?
examples/cross_language_test.rs (line 78)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47    info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48    
49    let iterations = 1_000_000; // 100万次双向操作
50    let barrier = Arc::new(Barrier::new(2));
51    let (tx_results, mut rx_results) = mpsc::channel(2);
52    
53    let start_time = Instant::now();
54    
55    // 服务器端任务
56    let server_barrier = barrier.clone();
57    let server_tx = tx_results.clone();
58    let server_task = tokio::spawn(async move {
59        // 创建共享内存段
60        let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61        server_barrier.wait().await;
62        
63        let mut server_ops = 0u64;
64        let mut server_bytes = 0u64;
65        
66        for i in 0..iterations {
67            // 读取客户端消息
68            let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69            let mut header_bytes = [0u8; 32];
70            header_bytes.copy_from_slice(read_data);
71            let header = PortalHeader::from_bytes(&header_bytes);
72            
73            if header.verify_checksum() {
74                server_ops += 1;
75                server_bytes += 32;
76                
77                // 回复消息
78                let response = PortalHeader::new(2, 1024, i);
79                let response_bytes = response.to_bytes();
80                unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81                server_bytes += 32;
82            }
83            
84            // 每10万次让出控制权
85            if i % 100_000 == 0 {
86                tokio::task::yield_now().await;
87            }
88        }
89        
90        server_tx.send((server_ops, server_bytes)).await.unwrap();
91        Ok::<(), anyhow::Error>(())
92    });
93    
94    // 客户端任务
95    let client_barrier = barrier.clone();
96    let client_tx = tx_results.clone();
97    let client_task = tokio::spawn(async move {
98        tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99        
100        // 连接到相同的共享内存段
101        let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102        client_barrier.wait().await;
103        
104        let mut client_ops = 0u64;
105        let mut client_bytes = 0u64;
106        
107        for i in 0..iterations {
108            // 发送消息
109            let header = PortalHeader::new(1, 1024, i);
110            let header_bytes = header.to_bytes();
111            unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112            client_bytes += 32;
113            
114            // 读取回复
115            let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116            let mut response_bytes = [0u8; 32];
117            response_bytes.copy_from_slice(read_data);
118            let response = PortalHeader::from_bytes(&response_bytes);
119            
120            if response.verify_checksum() {
121                client_ops += 1;
122                client_bytes += 32;
123            }
124            
125            if i % 100_000 == 0 {
126                tokio::task::yield_now().await;
127            }
128        }
129        
130        client_tx.send((client_ops, client_bytes)).await.unwrap();
131        Ok::<(), anyhow::Error>(())
132    });
133    
134    // 等待任务完成
135    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136    server_result?;
137    client_result?;
138    
139    // 收集结果
140    drop(tx_results);
141    let mut total_ops = 0u64;
142    let mut total_bytes = 0u64;
143    
144    while let Some((ops, bytes)) = rx_results.recv().await {
145        total_ops += ops;
146        total_bytes += bytes;
147    }
148    
149    let duration = start_time.elapsed();
150    let duration_secs = duration.as_secs_f64();
151    let ops_per_sec = total_ops as f64 / duration_secs;
152    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154    
155    Ok(TestResult {
156        test_name: "Rust ↔ Rust".to_string(),
157        transport_mode: "共享内存".to_string(),
158        total_operations: total_ops,
159        duration_secs,
160        ops_per_sec,
161        throughput_mbps,
162        avg_latency_us,
163        bytes_transferred: total_bytes,
164    })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169    info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170    
171    let iterations = 100_000; // 10万次双向操作(TCP较慢)
172    let addr = "127.0.0.1:9091";
173    let barrier = Arc::new(Barrier::new(2));
174    let (tx_results, mut rx_results) = mpsc::channel(2);
175    
176    let start_time = Instant::now();
177    
178    // TCP服务器任务
179    let server_barrier = barrier.clone();
180    let server_tx = tx_results.clone();
181    let server_task = tokio::spawn(async move {
182        let listener = TcpListener::bind(addr).await?;
183        info!("TCP服务器已启动: {}", addr);
184        server_barrier.wait().await;
185        
186        let mut server_ops = 0u64;
187        let mut server_bytes = 0u64;
188        
189        let (mut stream, _) = listener.accept().await?;
190        let mut buffer = [0u8; 1024];
191        
192        for _i in 0..iterations {
193            // 读取客户端消息
194            match stream.read(&mut buffer).await {
195                Ok(n) if n >= 32 => {
196                    let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197                    let header = PortalHeader::from_bytes(&header_bytes);
198                    
199                    if header.verify_checksum() {
200                        server_ops += 1;
201                        server_bytes += n as u64;
202                        
203                        // 回复消息
204                        let response = PortalHeader::new(2, 1024, header.sequence);
205                        let response_bytes = response.to_bytes();
206                        stream.write_all(&response_bytes).await?;
207                        server_bytes += 32;
208                    }
209                }
210                Ok(0) => break,
211                Ok(_) => continue,
212                Err(e) => {
213                    error!("服务器读取错误: {}", e);
214                    break;
215                }
216            }
217        }
218        
219        server_tx.send((server_ops, server_bytes)).await.unwrap();
220        Ok::<(), anyhow::Error>(())
221    });
222    
223    // TCP客户端任务
224    let client_barrier = barrier.clone();
225    let client_tx = tx_results.clone();
226    let client_task = tokio::spawn(async move {
227        tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228        
229        let mut stream = TcpStream::connect(addr).await?;
230        client_barrier.wait().await;
231        
232        let mut client_ops = 0u64;
233        let mut client_bytes = 0u64;
234        let mut buffer = [0u8; 1024];
235        
236        for i in 0..iterations {
237            // 发送消息
238            let header = PortalHeader::new(1, 1024, i);
239            let header_bytes = header.to_bytes();
240            stream.write_all(&header_bytes).await?;
241            client_bytes += 32;
242            
243            // 读取回复
244            match stream.read(&mut buffer).await {
245                Ok(n) if n >= 32 => {
246                    let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247                    let response = PortalHeader::from_bytes(&response_bytes);
248                    
249                    if response.verify_checksum() {
250                        client_ops += 1;
251                        client_bytes += n as u64;
252                    }
253                }
254                Ok(_) => continue,
255                Err(e) => {
256                    error!("客户端读取错误: {}", e);
257                    break;
258                }
259            }
260            
261            if i % 10_000 == 0 {
262                tokio::task::yield_now().await;
263            }
264        }
265        
266        client_tx.send((client_ops, client_bytes)).await.unwrap();
267        Ok::<(), anyhow::Error>(())
268    });
269    
270    // 等待任务完成
271    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272    server_result?;
273    client_result?;
274    
275    // 收集结果
276    drop(tx_results);
277    let mut total_ops = 0u64;
278    let mut total_bytes = 0u64;
279    
280    while let Some((ops, bytes)) = rx_results.recv().await {
281        total_ops += ops;
282        total_bytes += bytes;
283    }
284    
285    let duration = start_time.elapsed();
286    let duration_secs = duration.as_secs_f64();
287    let ops_per_sec = total_ops as f64 / duration_secs;
288    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290    
291    Ok(TestResult {
292        test_name: "Rust ↔ Rust".to_string(),
293        transport_mode: "TCP网络".to_string(),
294        total_operations: total_ops,
295        duration_secs,
296        ops_per_sec,
297        throughput_mbps,
298        avg_latency_us,
299        bytes_transferred: total_bytes,
300    })
301}
Source

pub fn to_bytes(&self) -> [u8; 32]

Examples found in repository?
examples/cross_language_test.rs (line 79)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47    info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48    
49    let iterations = 1_000_000; // 100万次双向操作
50    let barrier = Arc::new(Barrier::new(2));
51    let (tx_results, mut rx_results) = mpsc::channel(2);
52    
53    let start_time = Instant::now();
54    
55    // 服务器端任务
56    let server_barrier = barrier.clone();
57    let server_tx = tx_results.clone();
58    let server_task = tokio::spawn(async move {
59        // 创建共享内存段
60        let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61        server_barrier.wait().await;
62        
63        let mut server_ops = 0u64;
64        let mut server_bytes = 0u64;
65        
66        for i in 0..iterations {
67            // 读取客户端消息
68            let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69            let mut header_bytes = [0u8; 32];
70            header_bytes.copy_from_slice(read_data);
71            let header = PortalHeader::from_bytes(&header_bytes);
72            
73            if header.verify_checksum() {
74                server_ops += 1;
75                server_bytes += 32;
76                
77                // 回复消息
78                let response = PortalHeader::new(2, 1024, i);
79                let response_bytes = response.to_bytes();
80                unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81                server_bytes += 32;
82            }
83            
84            // 每10万次让出控制权
85            if i % 100_000 == 0 {
86                tokio::task::yield_now().await;
87            }
88        }
89        
90        server_tx.send((server_ops, server_bytes)).await.unwrap();
91        Ok::<(), anyhow::Error>(())
92    });
93    
94    // 客户端任务
95    let client_barrier = barrier.clone();
96    let client_tx = tx_results.clone();
97    let client_task = tokio::spawn(async move {
98        tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99        
100        // 连接到相同的共享内存段
101        let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102        client_barrier.wait().await;
103        
104        let mut client_ops = 0u64;
105        let mut client_bytes = 0u64;
106        
107        for i in 0..iterations {
108            // 发送消息
109            let header = PortalHeader::new(1, 1024, i);
110            let header_bytes = header.to_bytes();
111            unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112            client_bytes += 32;
113            
114            // 读取回复
115            let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116            let mut response_bytes = [0u8; 32];
117            response_bytes.copy_from_slice(read_data);
118            let response = PortalHeader::from_bytes(&response_bytes);
119            
120            if response.verify_checksum() {
121                client_ops += 1;
122                client_bytes += 32;
123            }
124            
125            if i % 100_000 == 0 {
126                tokio::task::yield_now().await;
127            }
128        }
129        
130        client_tx.send((client_ops, client_bytes)).await.unwrap();
131        Ok::<(), anyhow::Error>(())
132    });
133    
134    // 等待任务完成
135    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136    server_result?;
137    client_result?;
138    
139    // 收集结果
140    drop(tx_results);
141    let mut total_ops = 0u64;
142    let mut total_bytes = 0u64;
143    
144    while let Some((ops, bytes)) = rx_results.recv().await {
145        total_ops += ops;
146        total_bytes += bytes;
147    }
148    
149    let duration = start_time.elapsed();
150    let duration_secs = duration.as_secs_f64();
151    let ops_per_sec = total_ops as f64 / duration_secs;
152    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154    
155    Ok(TestResult {
156        test_name: "Rust ↔ Rust".to_string(),
157        transport_mode: "共享内存".to_string(),
158        total_operations: total_ops,
159        duration_secs,
160        ops_per_sec,
161        throughput_mbps,
162        avg_latency_us,
163        bytes_transferred: total_bytes,
164    })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169    info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170    
171    let iterations = 100_000; // 10万次双向操作(TCP较慢)
172    let addr = "127.0.0.1:9091";
173    let barrier = Arc::new(Barrier::new(2));
174    let (tx_results, mut rx_results) = mpsc::channel(2);
175    
176    let start_time = Instant::now();
177    
178    // TCP服务器任务
179    let server_barrier = barrier.clone();
180    let server_tx = tx_results.clone();
181    let server_task = tokio::spawn(async move {
182        let listener = TcpListener::bind(addr).await?;
183        info!("TCP服务器已启动: {}", addr);
184        server_barrier.wait().await;
185        
186        let mut server_ops = 0u64;
187        let mut server_bytes = 0u64;
188        
189        let (mut stream, _) = listener.accept().await?;
190        let mut buffer = [0u8; 1024];
191        
192        for _i in 0..iterations {
193            // 读取客户端消息
194            match stream.read(&mut buffer).await {
195                Ok(n) if n >= 32 => {
196                    let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197                    let header = PortalHeader::from_bytes(&header_bytes);
198                    
199                    if header.verify_checksum() {
200                        server_ops += 1;
201                        server_bytes += n as u64;
202                        
203                        // 回复消息
204                        let response = PortalHeader::new(2, 1024, header.sequence);
205                        let response_bytes = response.to_bytes();
206                        stream.write_all(&response_bytes).await?;
207                        server_bytes += 32;
208                    }
209                }
210                Ok(0) => break,
211                Ok(_) => continue,
212                Err(e) => {
213                    error!("服务器读取错误: {}", e);
214                    break;
215                }
216            }
217        }
218        
219        server_tx.send((server_ops, server_bytes)).await.unwrap();
220        Ok::<(), anyhow::Error>(())
221    });
222    
223    // TCP客户端任务
224    let client_barrier = barrier.clone();
225    let client_tx = tx_results.clone();
226    let client_task = tokio::spawn(async move {
227        tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228        
229        let mut stream = TcpStream::connect(addr).await?;
230        client_barrier.wait().await;
231        
232        let mut client_ops = 0u64;
233        let mut client_bytes = 0u64;
234        let mut buffer = [0u8; 1024];
235        
236        for i in 0..iterations {
237            // 发送消息
238            let header = PortalHeader::new(1, 1024, i);
239            let header_bytes = header.to_bytes();
240            stream.write_all(&header_bytes).await?;
241            client_bytes += 32;
242            
243            // 读取回复
244            match stream.read(&mut buffer).await {
245                Ok(n) if n >= 32 => {
246                    let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247                    let response = PortalHeader::from_bytes(&response_bytes);
248                    
249                    if response.verify_checksum() {
250                        client_ops += 1;
251                        client_bytes += n as u64;
252                    }
253                }
254                Ok(_) => continue,
255                Err(e) => {
256                    error!("客户端读取错误: {}", e);
257                    break;
258                }
259            }
260            
261            if i % 10_000 == 0 {
262                tokio::task::yield_now().await;
263            }
264        }
265        
266        client_tx.send((client_ops, client_bytes)).await.unwrap();
267        Ok::<(), anyhow::Error>(())
268    });
269    
270    // 等待任务完成
271    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272    server_result?;
273    client_result?;
274    
275    // 收集结果
276    drop(tx_results);
277    let mut total_ops = 0u64;
278    let mut total_bytes = 0u64;
279    
280    while let Some((ops, bytes)) = rx_results.recv().await {
281        total_ops += ops;
282        total_bytes += bytes;
283    }
284    
285    let duration = start_time.elapsed();
286    let duration_secs = duration.as_secs_f64();
287    let ops_per_sec = total_ops as f64 / duration_secs;
288    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290    
291    Ok(TestResult {
292        test_name: "Rust ↔ Rust".to_string(),
293        transport_mode: "TCP网络".to_string(),
294        total_operations: total_ops,
295        duration_secs,
296        ops_per_sec,
297        throughput_mbps,
298        avg_latency_us,
299        bytes_transferred: total_bytes,
300    })
301}
Source

pub fn from_bytes(bytes: &[u8; 32]) -> Self

Examples found in repository?
examples/cross_language_test.rs (line 71)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47    info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48    
49    let iterations = 1_000_000; // 100万次双向操作
50    let barrier = Arc::new(Barrier::new(2));
51    let (tx_results, mut rx_results) = mpsc::channel(2);
52    
53    let start_time = Instant::now();
54    
55    // 服务器端任务
56    let server_barrier = barrier.clone();
57    let server_tx = tx_results.clone();
58    let server_task = tokio::spawn(async move {
59        // 创建共享内存段
60        let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61        server_barrier.wait().await;
62        
63        let mut server_ops = 0u64;
64        let mut server_bytes = 0u64;
65        
66        for i in 0..iterations {
67            // 读取客户端消息
68            let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69            let mut header_bytes = [0u8; 32];
70            header_bytes.copy_from_slice(read_data);
71            let header = PortalHeader::from_bytes(&header_bytes);
72            
73            if header.verify_checksum() {
74                server_ops += 1;
75                server_bytes += 32;
76                
77                // 回复消息
78                let response = PortalHeader::new(2, 1024, i);
79                let response_bytes = response.to_bytes();
80                unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81                server_bytes += 32;
82            }
83            
84            // 每10万次让出控制权
85            if i % 100_000 == 0 {
86                tokio::task::yield_now().await;
87            }
88        }
89        
90        server_tx.send((server_ops, server_bytes)).await.unwrap();
91        Ok::<(), anyhow::Error>(())
92    });
93    
94    // 客户端任务
95    let client_barrier = barrier.clone();
96    let client_tx = tx_results.clone();
97    let client_task = tokio::spawn(async move {
98        tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99        
100        // 连接到相同的共享内存段
101        let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102        client_barrier.wait().await;
103        
104        let mut client_ops = 0u64;
105        let mut client_bytes = 0u64;
106        
107        for i in 0..iterations {
108            // 发送消息
109            let header = PortalHeader::new(1, 1024, i);
110            let header_bytes = header.to_bytes();
111            unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112            client_bytes += 32;
113            
114            // 读取回复
115            let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116            let mut response_bytes = [0u8; 32];
117            response_bytes.copy_from_slice(read_data);
118            let response = PortalHeader::from_bytes(&response_bytes);
119            
120            if response.verify_checksum() {
121                client_ops += 1;
122                client_bytes += 32;
123            }
124            
125            if i % 100_000 == 0 {
126                tokio::task::yield_now().await;
127            }
128        }
129        
130        client_tx.send((client_ops, client_bytes)).await.unwrap();
131        Ok::<(), anyhow::Error>(())
132    });
133    
134    // 等待任务完成
135    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136    server_result?;
137    client_result?;
138    
139    // 收集结果
140    drop(tx_results);
141    let mut total_ops = 0u64;
142    let mut total_bytes = 0u64;
143    
144    while let Some((ops, bytes)) = rx_results.recv().await {
145        total_ops += ops;
146        total_bytes += bytes;
147    }
148    
149    let duration = start_time.elapsed();
150    let duration_secs = duration.as_secs_f64();
151    let ops_per_sec = total_ops as f64 / duration_secs;
152    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154    
155    Ok(TestResult {
156        test_name: "Rust ↔ Rust".to_string(),
157        transport_mode: "共享内存".to_string(),
158        total_operations: total_ops,
159        duration_secs,
160        ops_per_sec,
161        throughput_mbps,
162        avg_latency_us,
163        bytes_transferred: total_bytes,
164    })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169    info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170    
171    let iterations = 100_000; // 10万次双向操作(TCP较慢)
172    let addr = "127.0.0.1:9091";
173    let barrier = Arc::new(Barrier::new(2));
174    let (tx_results, mut rx_results) = mpsc::channel(2);
175    
176    let start_time = Instant::now();
177    
178    // TCP服务器任务
179    let server_barrier = barrier.clone();
180    let server_tx = tx_results.clone();
181    let server_task = tokio::spawn(async move {
182        let listener = TcpListener::bind(addr).await?;
183        info!("TCP服务器已启动: {}", addr);
184        server_barrier.wait().await;
185        
186        let mut server_ops = 0u64;
187        let mut server_bytes = 0u64;
188        
189        let (mut stream, _) = listener.accept().await?;
190        let mut buffer = [0u8; 1024];
191        
192        for _i in 0..iterations {
193            // 读取客户端消息
194            match stream.read(&mut buffer).await {
195                Ok(n) if n >= 32 => {
196                    let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197                    let header = PortalHeader::from_bytes(&header_bytes);
198                    
199                    if header.verify_checksum() {
200                        server_ops += 1;
201                        server_bytes += n as u64;
202                        
203                        // 回复消息
204                        let response = PortalHeader::new(2, 1024, header.sequence);
205                        let response_bytes = response.to_bytes();
206                        stream.write_all(&response_bytes).await?;
207                        server_bytes += 32;
208                    }
209                }
210                Ok(0) => break,
211                Ok(_) => continue,
212                Err(e) => {
213                    error!("服务器读取错误: {}", e);
214                    break;
215                }
216            }
217        }
218        
219        server_tx.send((server_ops, server_bytes)).await.unwrap();
220        Ok::<(), anyhow::Error>(())
221    });
222    
223    // TCP客户端任务
224    let client_barrier = barrier.clone();
225    let client_tx = tx_results.clone();
226    let client_task = tokio::spawn(async move {
227        tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228        
229        let mut stream = TcpStream::connect(addr).await?;
230        client_barrier.wait().await;
231        
232        let mut client_ops = 0u64;
233        let mut client_bytes = 0u64;
234        let mut buffer = [0u8; 1024];
235        
236        for i in 0..iterations {
237            // 发送消息
238            let header = PortalHeader::new(1, 1024, i);
239            let header_bytes = header.to_bytes();
240            stream.write_all(&header_bytes).await?;
241            client_bytes += 32;
242            
243            // 读取回复
244            match stream.read(&mut buffer).await {
245                Ok(n) if n >= 32 => {
246                    let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247                    let response = PortalHeader::from_bytes(&response_bytes);
248                    
249                    if response.verify_checksum() {
250                        client_ops += 1;
251                        client_bytes += n as u64;
252                    }
253                }
254                Ok(_) => continue,
255                Err(e) => {
256                    error!("客户端读取错误: {}", e);
257                    break;
258                }
259            }
260            
261            if i % 10_000 == 0 {
262                tokio::task::yield_now().await;
263            }
264        }
265        
266        client_tx.send((client_ops, client_bytes)).await.unwrap();
267        Ok::<(), anyhow::Error>(())
268    });
269    
270    // 等待任务完成
271    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272    server_result?;
273    client_result?;
274    
275    // 收集结果
276    drop(tx_results);
277    let mut total_ops = 0u64;
278    let mut total_bytes = 0u64;
279    
280    while let Some((ops, bytes)) = rx_results.recv().await {
281        total_ops += ops;
282        total_bytes += bytes;
283    }
284    
285    let duration = start_time.elapsed();
286    let duration_secs = duration.as_secs_f64();
287    let ops_per_sec = total_ops as f64 / duration_secs;
288    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290    
291    Ok(TestResult {
292        test_name: "Rust ↔ Rust".to_string(),
293        transport_mode: "TCP网络".to_string(),
294        total_operations: total_ops,
295        duration_secs,
296        ops_per_sec,
297        throughput_mbps,
298        avg_latency_us,
299        bytes_transferred: total_bytes,
300    })
301}
Source

pub fn verify_checksum(&self) -> bool

Examples found in repository?
examples/cross_language_test.rs (line 73)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47    info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48    
49    let iterations = 1_000_000; // 100万次双向操作
50    let barrier = Arc::new(Barrier::new(2));
51    let (tx_results, mut rx_results) = mpsc::channel(2);
52    
53    let start_time = Instant::now();
54    
55    // 服务器端任务
56    let server_barrier = barrier.clone();
57    let server_tx = tx_results.clone();
58    let server_task = tokio::spawn(async move {
59        // 创建共享内存段
60        let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61        server_barrier.wait().await;
62        
63        let mut server_ops = 0u64;
64        let mut server_bytes = 0u64;
65        
66        for i in 0..iterations {
67            // 读取客户端消息
68            let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69            let mut header_bytes = [0u8; 32];
70            header_bytes.copy_from_slice(read_data);
71            let header = PortalHeader::from_bytes(&header_bytes);
72            
73            if header.verify_checksum() {
74                server_ops += 1;
75                server_bytes += 32;
76                
77                // 回复消息
78                let response = PortalHeader::new(2, 1024, i);
79                let response_bytes = response.to_bytes();
80                unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81                server_bytes += 32;
82            }
83            
84            // 每10万次让出控制权
85            if i % 100_000 == 0 {
86                tokio::task::yield_now().await;
87            }
88        }
89        
90        server_tx.send((server_ops, server_bytes)).await.unwrap();
91        Ok::<(), anyhow::Error>(())
92    });
93    
94    // 客户端任务
95    let client_barrier = barrier.clone();
96    let client_tx = tx_results.clone();
97    let client_task = tokio::spawn(async move {
98        tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99        
100        // 连接到相同的共享内存段
101        let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102        client_barrier.wait().await;
103        
104        let mut client_ops = 0u64;
105        let mut client_bytes = 0u64;
106        
107        for i in 0..iterations {
108            // 发送消息
109            let header = PortalHeader::new(1, 1024, i);
110            let header_bytes = header.to_bytes();
111            unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112            client_bytes += 32;
113            
114            // 读取回复
115            let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116            let mut response_bytes = [0u8; 32];
117            response_bytes.copy_from_slice(read_data);
118            let response = PortalHeader::from_bytes(&response_bytes);
119            
120            if response.verify_checksum() {
121                client_ops += 1;
122                client_bytes += 32;
123            }
124            
125            if i % 100_000 == 0 {
126                tokio::task::yield_now().await;
127            }
128        }
129        
130        client_tx.send((client_ops, client_bytes)).await.unwrap();
131        Ok::<(), anyhow::Error>(())
132    });
133    
134    // 等待任务完成
135    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136    server_result?;
137    client_result?;
138    
139    // 收集结果
140    drop(tx_results);
141    let mut total_ops = 0u64;
142    let mut total_bytes = 0u64;
143    
144    while let Some((ops, bytes)) = rx_results.recv().await {
145        total_ops += ops;
146        total_bytes += bytes;
147    }
148    
149    let duration = start_time.elapsed();
150    let duration_secs = duration.as_secs_f64();
151    let ops_per_sec = total_ops as f64 / duration_secs;
152    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154    
155    Ok(TestResult {
156        test_name: "Rust ↔ Rust".to_string(),
157        transport_mode: "共享内存".to_string(),
158        total_operations: total_ops,
159        duration_secs,
160        ops_per_sec,
161        throughput_mbps,
162        avg_latency_us,
163        bytes_transferred: total_bytes,
164    })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169    info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170    
171    let iterations = 100_000; // 10万次双向操作(TCP较慢)
172    let addr = "127.0.0.1:9091";
173    let barrier = Arc::new(Barrier::new(2));
174    let (tx_results, mut rx_results) = mpsc::channel(2);
175    
176    let start_time = Instant::now();
177    
178    // TCP服务器任务
179    let server_barrier = barrier.clone();
180    let server_tx = tx_results.clone();
181    let server_task = tokio::spawn(async move {
182        let listener = TcpListener::bind(addr).await?;
183        info!("TCP服务器已启动: {}", addr);
184        server_barrier.wait().await;
185        
186        let mut server_ops = 0u64;
187        let mut server_bytes = 0u64;
188        
189        let (mut stream, _) = listener.accept().await?;
190        let mut buffer = [0u8; 1024];
191        
192        for _i in 0..iterations {
193            // 读取客户端消息
194            match stream.read(&mut buffer).await {
195                Ok(n) if n >= 32 => {
196                    let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197                    let header = PortalHeader::from_bytes(&header_bytes);
198                    
199                    if header.verify_checksum() {
200                        server_ops += 1;
201                        server_bytes += n as u64;
202                        
203                        // 回复消息
204                        let response = PortalHeader::new(2, 1024, header.sequence);
205                        let response_bytes = response.to_bytes();
206                        stream.write_all(&response_bytes).await?;
207                        server_bytes += 32;
208                    }
209                }
210                Ok(0) => break,
211                Ok(_) => continue,
212                Err(e) => {
213                    error!("服务器读取错误: {}", e);
214                    break;
215                }
216            }
217        }
218        
219        server_tx.send((server_ops, server_bytes)).await.unwrap();
220        Ok::<(), anyhow::Error>(())
221    });
222    
223    // TCP客户端任务
224    let client_barrier = barrier.clone();
225    let client_tx = tx_results.clone();
226    let client_task = tokio::spawn(async move {
227        tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228        
229        let mut stream = TcpStream::connect(addr).await?;
230        client_barrier.wait().await;
231        
232        let mut client_ops = 0u64;
233        let mut client_bytes = 0u64;
234        let mut buffer = [0u8; 1024];
235        
236        for i in 0..iterations {
237            // 发送消息
238            let header = PortalHeader::new(1, 1024, i);
239            let header_bytes = header.to_bytes();
240            stream.write_all(&header_bytes).await?;
241            client_bytes += 32;
242            
243            // 读取回复
244            match stream.read(&mut buffer).await {
245                Ok(n) if n >= 32 => {
246                    let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247                    let response = PortalHeader::from_bytes(&response_bytes);
248                    
249                    if response.verify_checksum() {
250                        client_ops += 1;
251                        client_bytes += n as u64;
252                    }
253                }
254                Ok(_) => continue,
255                Err(e) => {
256                    error!("客户端读取错误: {}", e);
257                    break;
258                }
259            }
260            
261            if i % 10_000 == 0 {
262                tokio::task::yield_now().await;
263            }
264        }
265        
266        client_tx.send((client_ops, client_bytes)).await.unwrap();
267        Ok::<(), anyhow::Error>(())
268    });
269    
270    // 等待任务完成
271    let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272    server_result?;
273    client_result?;
274    
275    // 收集结果
276    drop(tx_results);
277    let mut total_ops = 0u64;
278    let mut total_bytes = 0u64;
279    
280    while let Some((ops, bytes)) = rx_results.recv().await {
281        total_ops += ops;
282        total_bytes += bytes;
283    }
284    
285    let duration = start_time.elapsed();
286    let duration_secs = duration.as_secs_f64();
287    let ops_per_sec = total_ops as f64 / duration_secs;
288    let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289    let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290    
291    Ok(TestResult {
292        test_name: "Rust ↔ Rust".to_string(),
293        transport_mode: "TCP网络".to_string(),
294        total_operations: total_ops,
295        duration_secs,
296        ops_per_sec,
297        throughput_mbps,
298        avg_latency_us,
299        bytes_transferred: total_bytes,
300    })
301}

Trait Implementations§

Source§

impl Clone for PortalHeader

Source§

fn clone(&self) -> PortalHeader

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for PortalHeader

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Copy for PortalHeader

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.