/// Fixed 32-byte header of the UTP protocol.
///
/// `#[repr(C)]` keeps the fields in declaration order. With these field types the
/// layout is 4 + 1 + 1 + 2 + 4 + 4 + 8 + 4 + 4 = 32 bytes with no interior padding,
/// which matches `PortalHeader::SIZE`.
#[repr(C)]pub struct PortalHeader {
/// Protocol magic number.
/// NOTE(review): presumably expected to equal `PortalHeader::MAGIC`
/// (1_431_588_864 == 0x5554_5000, i.e. ASCII "UTP" followed by a zero byte) — confirm.
pub magic: u32,
/// Protocol version. NOTE(review): valid values are not visible here — confirm.
pub version: u8,
/// Message type tag. The repository examples pass `1` for client requests and
/// `2` for server replies; other values are not visible here.
pub msg_type: u8,
/// Flag bits. NOTE(review): the meaning of individual bits is not visible here.
pub flags: u16,
/// Payload length. NOTE(review): presumably the number of payload bytes that
/// follow the header — confirm the unit and whether the header is included.
pub payload_len: u32,
/// Sequence number supplied by the caller; the examples use the loop counter
/// for requests and echo the request's value in TCP replies.
pub sequence: u32,
/// Timestamp. NOTE(review): epoch and unit are not visible here — confirm.
pub timestamp: u64,
/// Checksum validated by `verify_checksum`.
/// NOTE(review): the algorithm and covered bytes are not visible here.
pub checksum: u32,
/// Four reserved bytes that bring the header to 32 bytes.
pub reserved: [u8; 4],
}
Expand description
UTP协议固定32字节头部
Fields§
§magic: u32
§version: u8
§msg_type: u8
§flags: u16
§payload_len: u32
§sequence: u32
§timestamp: u64
§checksum: u32
§reserved: [u8; 4]
Implementations§
Source§impl PortalHeader
impl PortalHeader
pub const MAGIC: u32 = 1_431_588_864u32
pub const SIZE: usize = 32usize
Sourcepub fn new(msg_type: u8, payload_len: u32, sequence: u32) -> Self
pub fn new(msg_type: u8, payload_len: u32, sequence: u32) -> Self
Examples found in repository?
examples/cross_language_test.rs (line 78)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47 info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48
49 let iterations = 1_000_000; // 100万次双向操作
50 let barrier = Arc::new(Barrier::new(2));
51 let (tx_results, mut rx_results) = mpsc::channel(2);
52
53 let start_time = Instant::now();
54
55 // 服务器端任务
56 let server_barrier = barrier.clone();
57 let server_tx = tx_results.clone();
58 let server_task = tokio::spawn(async move {
59 // 创建共享内存段
60 let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61 server_barrier.wait().await;
62
63 let mut server_ops = 0u64;
64 let mut server_bytes = 0u64;
65
66 for i in 0..iterations {
67 // 读取客户端消息
68 let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69 let mut header_bytes = [0u8; 32];
70 header_bytes.copy_from_slice(read_data);
71 let header = PortalHeader::from_bytes(&header_bytes);
72
73 if header.verify_checksum() {
74 server_ops += 1;
75 server_bytes += 32;
76
77 // 回复消息
78 let response = PortalHeader::new(2, 1024, i);
79 let response_bytes = response.to_bytes();
80 unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81 server_bytes += 32;
82 }
83
84 // 每10万次让出控制权
85 if i % 100_000 == 0 {
86 tokio::task::yield_now().await;
87 }
88 }
89
90 server_tx.send((server_ops, server_bytes)).await.unwrap();
91 Ok::<(), anyhow::Error>(())
92 });
93
94 // 客户端任务
95 let client_barrier = barrier.clone();
96 let client_tx = tx_results.clone();
97 let client_task = tokio::spawn(async move {
98 tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99
100 // 连接到相同的共享内存段
101 let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102 client_barrier.wait().await;
103
104 let mut client_ops = 0u64;
105 let mut client_bytes = 0u64;
106
107 for i in 0..iterations {
108 // 发送消息
109 let header = PortalHeader::new(1, 1024, i);
110 let header_bytes = header.to_bytes();
111 unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112 client_bytes += 32;
113
114 // 读取回复
115 let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116 let mut response_bytes = [0u8; 32];
117 response_bytes.copy_from_slice(read_data);
118 let response = PortalHeader::from_bytes(&response_bytes);
119
120 if response.verify_checksum() {
121 client_ops += 1;
122 client_bytes += 32;
123 }
124
125 if i % 100_000 == 0 {
126 tokio::task::yield_now().await;
127 }
128 }
129
130 client_tx.send((client_ops, client_bytes)).await.unwrap();
131 Ok::<(), anyhow::Error>(())
132 });
133
134 // 等待任务完成
135 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136 server_result?;
137 client_result?;
138
139 // 收集结果
140 drop(tx_results);
141 let mut total_ops = 0u64;
142 let mut total_bytes = 0u64;
143
144 while let Some((ops, bytes)) = rx_results.recv().await {
145 total_ops += ops;
146 total_bytes += bytes;
147 }
148
149 let duration = start_time.elapsed();
150 let duration_secs = duration.as_secs_f64();
151 let ops_per_sec = total_ops as f64 / duration_secs;
152 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154
155 Ok(TestResult {
156 test_name: "Rust ↔ Rust".to_string(),
157 transport_mode: "共享内存".to_string(),
158 total_operations: total_ops,
159 duration_secs,
160 ops_per_sec,
161 throughput_mbps,
162 avg_latency_us,
163 bytes_transferred: total_bytes,
164 })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169 info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170
171 let iterations = 100_000; // 10万次双向操作(TCP较慢)
172 let addr = "127.0.0.1:9091";
173 let barrier = Arc::new(Barrier::new(2));
174 let (tx_results, mut rx_results) = mpsc::channel(2);
175
176 let start_time = Instant::now();
177
178 // TCP服务器任务
179 let server_barrier = barrier.clone();
180 let server_tx = tx_results.clone();
181 let server_task = tokio::spawn(async move {
182 let listener = TcpListener::bind(addr).await?;
183 info!("TCP服务器已启动: {}", addr);
184 server_barrier.wait().await;
185
186 let mut server_ops = 0u64;
187 let mut server_bytes = 0u64;
188
189 let (mut stream, _) = listener.accept().await?;
190 let mut buffer = [0u8; 1024];
191
192 for _i in 0..iterations {
193 // 读取客户端消息
194 match stream.read(&mut buffer).await {
195 Ok(n) if n >= 32 => {
196 let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197 let header = PortalHeader::from_bytes(&header_bytes);
198
199 if header.verify_checksum() {
200 server_ops += 1;
201 server_bytes += n as u64;
202
203 // 回复消息
204 let response = PortalHeader::new(2, 1024, header.sequence);
205 let response_bytes = response.to_bytes();
206 stream.write_all(&response_bytes).await?;
207 server_bytes += 32;
208 }
209 }
210 Ok(0) => break,
211 Ok(_) => continue,
212 Err(e) => {
213 error!("服务器读取错误: {}", e);
214 break;
215 }
216 }
217 }
218
219 server_tx.send((server_ops, server_bytes)).await.unwrap();
220 Ok::<(), anyhow::Error>(())
221 });
222
223 // TCP客户端任务
224 let client_barrier = barrier.clone();
225 let client_tx = tx_results.clone();
226 let client_task = tokio::spawn(async move {
227 tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228
229 let mut stream = TcpStream::connect(addr).await?;
230 client_barrier.wait().await;
231
232 let mut client_ops = 0u64;
233 let mut client_bytes = 0u64;
234 let mut buffer = [0u8; 1024];
235
236 for i in 0..iterations {
237 // 发送消息
238 let header = PortalHeader::new(1, 1024, i);
239 let header_bytes = header.to_bytes();
240 stream.write_all(&header_bytes).await?;
241 client_bytes += 32;
242
243 // 读取回复
244 match stream.read(&mut buffer).await {
245 Ok(n) if n >= 32 => {
246 let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247 let response = PortalHeader::from_bytes(&response_bytes);
248
249 if response.verify_checksum() {
250 client_ops += 1;
251 client_bytes += n as u64;
252 }
253 }
254 Ok(_) => continue,
255 Err(e) => {
256 error!("客户端读取错误: {}", e);
257 break;
258 }
259 }
260
261 if i % 10_000 == 0 {
262 tokio::task::yield_now().await;
263 }
264 }
265
266 client_tx.send((client_ops, client_bytes)).await.unwrap();
267 Ok::<(), anyhow::Error>(())
268 });
269
270 // 等待任务完成
271 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272 server_result?;
273 client_result?;
274
275 // 收集结果
276 drop(tx_results);
277 let mut total_ops = 0u64;
278 let mut total_bytes = 0u64;
279
280 while let Some((ops, bytes)) = rx_results.recv().await {
281 total_ops += ops;
282 total_bytes += bytes;
283 }
284
285 let duration = start_time.elapsed();
286 let duration_secs = duration.as_secs_f64();
287 let ops_per_sec = total_ops as f64 / duration_secs;
288 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290
291 Ok(TestResult {
292 test_name: "Rust ↔ Rust".to_string(),
293 transport_mode: "TCP网络".to_string(),
294 total_operations: total_ops,
295 duration_secs,
296 ops_per_sec,
297 throughput_mbps,
298 avg_latency_us,
299 bytes_transferred: total_bytes,
300 })
301}
Sourcepub fn to_bytes(&self) -> [u8; 32]
pub fn to_bytes(&self) -> [u8; 32]
Examples found in repository?
examples/cross_language_test.rs (line 79)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47 info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48
49 let iterations = 1_000_000; // 100万次双向操作
50 let barrier = Arc::new(Barrier::new(2));
51 let (tx_results, mut rx_results) = mpsc::channel(2);
52
53 let start_time = Instant::now();
54
55 // 服务器端任务
56 let server_barrier = barrier.clone();
57 let server_tx = tx_results.clone();
58 let server_task = tokio::spawn(async move {
59 // 创建共享内存段
60 let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61 server_barrier.wait().await;
62
63 let mut server_ops = 0u64;
64 let mut server_bytes = 0u64;
65
66 for i in 0..iterations {
67 // 读取客户端消息
68 let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69 let mut header_bytes = [0u8; 32];
70 header_bytes.copy_from_slice(read_data);
71 let header = PortalHeader::from_bytes(&header_bytes);
72
73 if header.verify_checksum() {
74 server_ops += 1;
75 server_bytes += 32;
76
77 // 回复消息
78 let response = PortalHeader::new(2, 1024, i);
79 let response_bytes = response.to_bytes();
80 unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81 server_bytes += 32;
82 }
83
84 // 每10万次让出控制权
85 if i % 100_000 == 0 {
86 tokio::task::yield_now().await;
87 }
88 }
89
90 server_tx.send((server_ops, server_bytes)).await.unwrap();
91 Ok::<(), anyhow::Error>(())
92 });
93
94 // 客户端任务
95 let client_barrier = barrier.clone();
96 let client_tx = tx_results.clone();
97 let client_task = tokio::spawn(async move {
98 tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99
100 // 连接到相同的共享内存段
101 let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102 client_barrier.wait().await;
103
104 let mut client_ops = 0u64;
105 let mut client_bytes = 0u64;
106
107 for i in 0..iterations {
108 // 发送消息
109 let header = PortalHeader::new(1, 1024, i);
110 let header_bytes = header.to_bytes();
111 unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112 client_bytes += 32;
113
114 // 读取回复
115 let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116 let mut response_bytes = [0u8; 32];
117 response_bytes.copy_from_slice(read_data);
118 let response = PortalHeader::from_bytes(&response_bytes);
119
120 if response.verify_checksum() {
121 client_ops += 1;
122 client_bytes += 32;
123 }
124
125 if i % 100_000 == 0 {
126 tokio::task::yield_now().await;
127 }
128 }
129
130 client_tx.send((client_ops, client_bytes)).await.unwrap();
131 Ok::<(), anyhow::Error>(())
132 });
133
134 // 等待任务完成
135 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136 server_result?;
137 client_result?;
138
139 // 收集结果
140 drop(tx_results);
141 let mut total_ops = 0u64;
142 let mut total_bytes = 0u64;
143
144 while let Some((ops, bytes)) = rx_results.recv().await {
145 total_ops += ops;
146 total_bytes += bytes;
147 }
148
149 let duration = start_time.elapsed();
150 let duration_secs = duration.as_secs_f64();
151 let ops_per_sec = total_ops as f64 / duration_secs;
152 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154
155 Ok(TestResult {
156 test_name: "Rust ↔ Rust".to_string(),
157 transport_mode: "共享内存".to_string(),
158 total_operations: total_ops,
159 duration_secs,
160 ops_per_sec,
161 throughput_mbps,
162 avg_latency_us,
163 bytes_transferred: total_bytes,
164 })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169 info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170
171 let iterations = 100_000; // 10万次双向操作(TCP较慢)
172 let addr = "127.0.0.1:9091";
173 let barrier = Arc::new(Barrier::new(2));
174 let (tx_results, mut rx_results) = mpsc::channel(2);
175
176 let start_time = Instant::now();
177
178 // TCP服务器任务
179 let server_barrier = barrier.clone();
180 let server_tx = tx_results.clone();
181 let server_task = tokio::spawn(async move {
182 let listener = TcpListener::bind(addr).await?;
183 info!("TCP服务器已启动: {}", addr);
184 server_barrier.wait().await;
185
186 let mut server_ops = 0u64;
187 let mut server_bytes = 0u64;
188
189 let (mut stream, _) = listener.accept().await?;
190 let mut buffer = [0u8; 1024];
191
192 for _i in 0..iterations {
193 // 读取客户端消息
194 match stream.read(&mut buffer).await {
195 Ok(n) if n >= 32 => {
196 let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197 let header = PortalHeader::from_bytes(&header_bytes);
198
199 if header.verify_checksum() {
200 server_ops += 1;
201 server_bytes += n as u64;
202
203 // 回复消息
204 let response = PortalHeader::new(2, 1024, header.sequence);
205 let response_bytes = response.to_bytes();
206 stream.write_all(&response_bytes).await?;
207 server_bytes += 32;
208 }
209 }
210 Ok(0) => break,
211 Ok(_) => continue,
212 Err(e) => {
213 error!("服务器读取错误: {}", e);
214 break;
215 }
216 }
217 }
218
219 server_tx.send((server_ops, server_bytes)).await.unwrap();
220 Ok::<(), anyhow::Error>(())
221 });
222
223 // TCP客户端任务
224 let client_barrier = barrier.clone();
225 let client_tx = tx_results.clone();
226 let client_task = tokio::spawn(async move {
227 tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228
229 let mut stream = TcpStream::connect(addr).await?;
230 client_barrier.wait().await;
231
232 let mut client_ops = 0u64;
233 let mut client_bytes = 0u64;
234 let mut buffer = [0u8; 1024];
235
236 for i in 0..iterations {
237 // 发送消息
238 let header = PortalHeader::new(1, 1024, i);
239 let header_bytes = header.to_bytes();
240 stream.write_all(&header_bytes).await?;
241 client_bytes += 32;
242
243 // 读取回复
244 match stream.read(&mut buffer).await {
245 Ok(n) if n >= 32 => {
246 let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247 let response = PortalHeader::from_bytes(&response_bytes);
248
249 if response.verify_checksum() {
250 client_ops += 1;
251 client_bytes += n as u64;
252 }
253 }
254 Ok(_) => continue,
255 Err(e) => {
256 error!("客户端读取错误: {}", e);
257 break;
258 }
259 }
260
261 if i % 10_000 == 0 {
262 tokio::task::yield_now().await;
263 }
264 }
265
266 client_tx.send((client_ops, client_bytes)).await.unwrap();
267 Ok::<(), anyhow::Error>(())
268 });
269
270 // 等待任务完成
271 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272 server_result?;
273 client_result?;
274
275 // 收集结果
276 drop(tx_results);
277 let mut total_ops = 0u64;
278 let mut total_bytes = 0u64;
279
280 while let Some((ops, bytes)) = rx_results.recv().await {
281 total_ops += ops;
282 total_bytes += bytes;
283 }
284
285 let duration = start_time.elapsed();
286 let duration_secs = duration.as_secs_f64();
287 let ops_per_sec = total_ops as f64 / duration_secs;
288 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290
291 Ok(TestResult {
292 test_name: "Rust ↔ Rust".to_string(),
293 transport_mode: "TCP网络".to_string(),
294 total_operations: total_ops,
295 duration_secs,
296 ops_per_sec,
297 throughput_mbps,
298 avg_latency_us,
299 bytes_transferred: total_bytes,
300 })
301}
Sourcepub fn from_bytes(bytes: &[u8; 32]) -> Self
pub fn from_bytes(bytes: &[u8; 32]) -> Self
Examples found in repository?
examples/cross_language_test.rs (line 71)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47 info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48
49 let iterations = 1_000_000; // 100万次双向操作
50 let barrier = Arc::new(Barrier::new(2));
51 let (tx_results, mut rx_results) = mpsc::channel(2);
52
53 let start_time = Instant::now();
54
55 // 服务器端任务
56 let server_barrier = barrier.clone();
57 let server_tx = tx_results.clone();
58 let server_task = tokio::spawn(async move {
59 // 创建共享内存段
60 let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61 server_barrier.wait().await;
62
63 let mut server_ops = 0u64;
64 let mut server_bytes = 0u64;
65
66 for i in 0..iterations {
67 // 读取客户端消息
68 let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69 let mut header_bytes = [0u8; 32];
70 header_bytes.copy_from_slice(read_data);
71 let header = PortalHeader::from_bytes(&header_bytes);
72
73 if header.verify_checksum() {
74 server_ops += 1;
75 server_bytes += 32;
76
77 // 回复消息
78 let response = PortalHeader::new(2, 1024, i);
79 let response_bytes = response.to_bytes();
80 unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81 server_bytes += 32;
82 }
83
84 // 每10万次让出控制权
85 if i % 100_000 == 0 {
86 tokio::task::yield_now().await;
87 }
88 }
89
90 server_tx.send((server_ops, server_bytes)).await.unwrap();
91 Ok::<(), anyhow::Error>(())
92 });
93
94 // 客户端任务
95 let client_barrier = barrier.clone();
96 let client_tx = tx_results.clone();
97 let client_task = tokio::spawn(async move {
98 tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99
100 // 连接到相同的共享内存段
101 let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102 client_barrier.wait().await;
103
104 let mut client_ops = 0u64;
105 let mut client_bytes = 0u64;
106
107 for i in 0..iterations {
108 // 发送消息
109 let header = PortalHeader::new(1, 1024, i);
110 let header_bytes = header.to_bytes();
111 unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112 client_bytes += 32;
113
114 // 读取回复
115 let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116 let mut response_bytes = [0u8; 32];
117 response_bytes.copy_from_slice(read_data);
118 let response = PortalHeader::from_bytes(&response_bytes);
119
120 if response.verify_checksum() {
121 client_ops += 1;
122 client_bytes += 32;
123 }
124
125 if i % 100_000 == 0 {
126 tokio::task::yield_now().await;
127 }
128 }
129
130 client_tx.send((client_ops, client_bytes)).await.unwrap();
131 Ok::<(), anyhow::Error>(())
132 });
133
134 // 等待任务完成
135 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136 server_result?;
137 client_result?;
138
139 // 收集结果
140 drop(tx_results);
141 let mut total_ops = 0u64;
142 let mut total_bytes = 0u64;
143
144 while let Some((ops, bytes)) = rx_results.recv().await {
145 total_ops += ops;
146 total_bytes += bytes;
147 }
148
149 let duration = start_time.elapsed();
150 let duration_secs = duration.as_secs_f64();
151 let ops_per_sec = total_ops as f64 / duration_secs;
152 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154
155 Ok(TestResult {
156 test_name: "Rust ↔ Rust".to_string(),
157 transport_mode: "共享内存".to_string(),
158 total_operations: total_ops,
159 duration_secs,
160 ops_per_sec,
161 throughput_mbps,
162 avg_latency_us,
163 bytes_transferred: total_bytes,
164 })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169 info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170
171 let iterations = 100_000; // 10万次双向操作(TCP较慢)
172 let addr = "127.0.0.1:9091";
173 let barrier = Arc::new(Barrier::new(2));
174 let (tx_results, mut rx_results) = mpsc::channel(2);
175
176 let start_time = Instant::now();
177
178 // TCP服务器任务
179 let server_barrier = barrier.clone();
180 let server_tx = tx_results.clone();
181 let server_task = tokio::spawn(async move {
182 let listener = TcpListener::bind(addr).await?;
183 info!("TCP服务器已启动: {}", addr);
184 server_barrier.wait().await;
185
186 let mut server_ops = 0u64;
187 let mut server_bytes = 0u64;
188
189 let (mut stream, _) = listener.accept().await?;
190 let mut buffer = [0u8; 1024];
191
192 for _i in 0..iterations {
193 // 读取客户端消息
194 match stream.read(&mut buffer).await {
195 Ok(n) if n >= 32 => {
196 let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197 let header = PortalHeader::from_bytes(&header_bytes);
198
199 if header.verify_checksum() {
200 server_ops += 1;
201 server_bytes += n as u64;
202
203 // 回复消息
204 let response = PortalHeader::new(2, 1024, header.sequence);
205 let response_bytes = response.to_bytes();
206 stream.write_all(&response_bytes).await?;
207 server_bytes += 32;
208 }
209 }
210 Ok(0) => break,
211 Ok(_) => continue,
212 Err(e) => {
213 error!("服务器读取错误: {}", e);
214 break;
215 }
216 }
217 }
218
219 server_tx.send((server_ops, server_bytes)).await.unwrap();
220 Ok::<(), anyhow::Error>(())
221 });
222
223 // TCP客户端任务
224 let client_barrier = barrier.clone();
225 let client_tx = tx_results.clone();
226 let client_task = tokio::spawn(async move {
227 tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228
229 let mut stream = TcpStream::connect(addr).await?;
230 client_barrier.wait().await;
231
232 let mut client_ops = 0u64;
233 let mut client_bytes = 0u64;
234 let mut buffer = [0u8; 1024];
235
236 for i in 0..iterations {
237 // 发送消息
238 let header = PortalHeader::new(1, 1024, i);
239 let header_bytes = header.to_bytes();
240 stream.write_all(&header_bytes).await?;
241 client_bytes += 32;
242
243 // 读取回复
244 match stream.read(&mut buffer).await {
245 Ok(n) if n >= 32 => {
246 let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247 let response = PortalHeader::from_bytes(&response_bytes);
248
249 if response.verify_checksum() {
250 client_ops += 1;
251 client_bytes += n as u64;
252 }
253 }
254 Ok(_) => continue,
255 Err(e) => {
256 error!("客户端读取错误: {}", e);
257 break;
258 }
259 }
260
261 if i % 10_000 == 0 {
262 tokio::task::yield_now().await;
263 }
264 }
265
266 client_tx.send((client_ops, client_bytes)).await.unwrap();
267 Ok::<(), anyhow::Error>(())
268 });
269
270 // 等待任务完成
271 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272 server_result?;
273 client_result?;
274
275 // 收集结果
276 drop(tx_results);
277 let mut total_ops = 0u64;
278 let mut total_bytes = 0u64;
279
280 while let Some((ops, bytes)) = rx_results.recv().await {
281 total_ops += ops;
282 total_bytes += bytes;
283 }
284
285 let duration = start_time.elapsed();
286 let duration_secs = duration.as_secs_f64();
287 let ops_per_sec = total_ops as f64 / duration_secs;
288 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290
291 Ok(TestResult {
292 test_name: "Rust ↔ Rust".to_string(),
293 transport_mode: "TCP网络".to_string(),
294 total_operations: total_ops,
295 duration_secs,
296 ops_per_sec,
297 throughput_mbps,
298 avg_latency_us,
299 bytes_transferred: total_bytes,
300 })
301}
Sourcepub fn verify_checksum(&self) -> bool
pub fn verify_checksum(&self) -> bool
Examples found in repository?
examples/cross_language_test.rs (line 73)
46pub async fn test_rust_rust_shared_memory() -> Result<TestResult> {
47 info!("🚀 开始测试: Rust ↔ Rust 共享内存双向通信");
48
49 let iterations = 1_000_000; // 100万次双向操作
50 let barrier = Arc::new(Barrier::new(2));
51 let (tx_results, mut rx_results) = mpsc::channel(2);
52
53 let start_time = Instant::now();
54
55 // 服务器端任务
56 let server_barrier = barrier.clone();
57 let server_tx = tx_results.clone();
58 let server_task = tokio::spawn(async move {
59 // 创建共享内存段
60 let shm = SharedMemoryTransport::new("/utp_test_server", 1024 * 1024)?;
61 server_barrier.wait().await;
62
63 let mut server_ops = 0u64;
64 let mut server_bytes = 0u64;
65
66 for i in 0..iterations {
67 // 读取客户端消息
68 let read_data = unsafe { shm.read_zero_copy(0, 32)? };
69 let mut header_bytes = [0u8; 32];
70 header_bytes.copy_from_slice(read_data);
71 let header = PortalHeader::from_bytes(&header_bytes);
72
73 if header.verify_checksum() {
74 server_ops += 1;
75 server_bytes += 32;
76
77 // 回复消息
78 let response = PortalHeader::new(2, 1024, i);
79 let response_bytes = response.to_bytes();
80 unsafe { shm.write_zero_copy(&response_bytes, 32)? };
81 server_bytes += 32;
82 }
83
84 // 每10万次让出控制权
85 if i % 100_000 == 0 {
86 tokio::task::yield_now().await;
87 }
88 }
89
90 server_tx.send((server_ops, server_bytes)).await.unwrap();
91 Ok::<(), anyhow::Error>(())
92 });
93
94 // 客户端任务
95 let client_barrier = barrier.clone();
96 let client_tx = tx_results.clone();
97 let client_task = tokio::spawn(async move {
98 tokio::time::sleep(Duration::from_millis(10)).await; // 确保服务器先启动
99
100 // 连接到相同的共享内存段
101 let shm = SharedMemoryTransport::new("/utp_test_client", 1024 * 1024)?;
102 client_barrier.wait().await;
103
104 let mut client_ops = 0u64;
105 let mut client_bytes = 0u64;
106
107 for i in 0..iterations {
108 // 发送消息
109 let header = PortalHeader::new(1, 1024, i);
110 let header_bytes = header.to_bytes();
111 unsafe { shm.write_zero_copy(&header_bytes, 0)? };
112 client_bytes += 32;
113
114 // 读取回复
115 let read_data = unsafe { shm.read_zero_copy(32, 32)? };
116 let mut response_bytes = [0u8; 32];
117 response_bytes.copy_from_slice(read_data);
118 let response = PortalHeader::from_bytes(&response_bytes);
119
120 if response.verify_checksum() {
121 client_ops += 1;
122 client_bytes += 32;
123 }
124
125 if i % 100_000 == 0 {
126 tokio::task::yield_now().await;
127 }
128 }
129
130 client_tx.send((client_ops, client_bytes)).await.unwrap();
131 Ok::<(), anyhow::Error>(())
132 });
133
134 // 等待任务完成
135 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
136 server_result?;
137 client_result?;
138
139 // 收集结果
140 drop(tx_results);
141 let mut total_ops = 0u64;
142 let mut total_bytes = 0u64;
143
144 while let Some((ops, bytes)) = rx_results.recv().await {
145 total_ops += ops;
146 total_bytes += bytes;
147 }
148
149 let duration = start_time.elapsed();
150 let duration_secs = duration.as_secs_f64();
151 let ops_per_sec = total_ops as f64 / duration_secs;
152 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
153 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
154
155 Ok(TestResult {
156 test_name: "Rust ↔ Rust".to_string(),
157 transport_mode: "共享内存".to_string(),
158 total_operations: total_ops,
159 duration_secs,
160 ops_per_sec,
161 throughput_mbps,
162 avg_latency_us,
163 bytes_transferred: total_bytes,
164 })
165}
166
167/// 测试2: Rust ↔ Rust TCP双向通信
168pub async fn test_rust_rust_tcp() -> Result<TestResult> {
169 info!("🚀 开始测试: Rust ↔ Rust TCP双向通信");
170
171 let iterations = 100_000; // 10万次双向操作(TCP较慢)
172 let addr = "127.0.0.1:9091";
173 let barrier = Arc::new(Barrier::new(2));
174 let (tx_results, mut rx_results) = mpsc::channel(2);
175
176 let start_time = Instant::now();
177
178 // TCP服务器任务
179 let server_barrier = barrier.clone();
180 let server_tx = tx_results.clone();
181 let server_task = tokio::spawn(async move {
182 let listener = TcpListener::bind(addr).await?;
183 info!("TCP服务器已启动: {}", addr);
184 server_barrier.wait().await;
185
186 let mut server_ops = 0u64;
187 let mut server_bytes = 0u64;
188
189 let (mut stream, _) = listener.accept().await?;
190 let mut buffer = [0u8; 1024];
191
192 for _i in 0..iterations {
193 // 读取客户端消息
194 match stream.read(&mut buffer).await {
195 Ok(n) if n >= 32 => {
196 let header_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
197 let header = PortalHeader::from_bytes(&header_bytes);
198
199 if header.verify_checksum() {
200 server_ops += 1;
201 server_bytes += n as u64;
202
203 // 回复消息
204 let response = PortalHeader::new(2, 1024, header.sequence);
205 let response_bytes = response.to_bytes();
206 stream.write_all(&response_bytes).await?;
207 server_bytes += 32;
208 }
209 }
210 Ok(0) => break,
211 Ok(_) => continue,
212 Err(e) => {
213 error!("服务器读取错误: {}", e);
214 break;
215 }
216 }
217 }
218
219 server_tx.send((server_ops, server_bytes)).await.unwrap();
220 Ok::<(), anyhow::Error>(())
221 });
222
223 // TCP客户端任务
224 let client_barrier = barrier.clone();
225 let client_tx = tx_results.clone();
226 let client_task = tokio::spawn(async move {
227 tokio::time::sleep(Duration::from_millis(100)).await; // 等待服务器启动
228
229 let mut stream = TcpStream::connect(addr).await?;
230 client_barrier.wait().await;
231
232 let mut client_ops = 0u64;
233 let mut client_bytes = 0u64;
234 let mut buffer = [0u8; 1024];
235
236 for i in 0..iterations {
237 // 发送消息
238 let header = PortalHeader::new(1, 1024, i);
239 let header_bytes = header.to_bytes();
240 stream.write_all(&header_bytes).await?;
241 client_bytes += 32;
242
243 // 读取回复
244 match stream.read(&mut buffer).await {
245 Ok(n) if n >= 32 => {
246 let response_bytes: [u8; 32] = buffer[..32].try_into().unwrap();
247 let response = PortalHeader::from_bytes(&response_bytes);
248
249 if response.verify_checksum() {
250 client_ops += 1;
251 client_bytes += n as u64;
252 }
253 }
254 Ok(_) => continue,
255 Err(e) => {
256 error!("客户端读取错误: {}", e);
257 break;
258 }
259 }
260
261 if i % 10_000 == 0 {
262 tokio::task::yield_now().await;
263 }
264 }
265
266 client_tx.send((client_ops, client_bytes)).await.unwrap();
267 Ok::<(), anyhow::Error>(())
268 });
269
270 // 等待任务完成
271 let (server_result, client_result) = tokio::try_join!(server_task, client_task)?;
272 server_result?;
273 client_result?;
274
275 // 收集结果
276 drop(tx_results);
277 let mut total_ops = 0u64;
278 let mut total_bytes = 0u64;
279
280 while let Some((ops, bytes)) = rx_results.recv().await {
281 total_ops += ops;
282 total_bytes += bytes;
283 }
284
285 let duration = start_time.elapsed();
286 let duration_secs = duration.as_secs_f64();
287 let ops_per_sec = total_ops as f64 / duration_secs;
288 let throughput_mbps = (total_bytes as f64 / duration_secs) / (1024.0 * 1024.0);
289 let avg_latency_us = (duration_secs / total_ops as f64) * 1_000_000.0;
290
291 Ok(TestResult {
292 test_name: "Rust ↔ Rust".to_string(),
293 transport_mode: "TCP网络".to_string(),
294 total_operations: total_ops,
295 duration_secs,
296 ops_per_sec,
297 throughput_mbps,
298 avg_latency_us,
299 bytes_transferred: total_bytes,
300 })
301}
Trait Implementations§
Source§impl Clone for PortalHeader
impl Clone for PortalHeader
Source§fn clone(&self) -> PortalHeader
fn clone(&self) -> PortalHeader
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Source§impl Debug for PortalHeader
impl Debug for PortalHeader
impl Copy for PortalHeader
Auto Trait Implementations§
impl Freeze for PortalHeader
impl RefUnwindSafe for PortalHeader
impl Send for PortalHeader
impl Sync for PortalHeader
impl Unpin for PortalHeader
impl UnwindSafe for PortalHeader
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more