1use crate::codec::{Codec, CodecError};
7use std::marker::PhantomData;
8use serde::{Serialize, Deserialize};
9
10#[cfg(feature = "zero-copy")]
11use rkyv::{Archive, Serialize as RkyvSerialize, Deserialize as RkyvDeserialize, to_bytes, from_bytes};
12
13pub struct ZeroCopyCodec<T> {
15 _phantom: PhantomData<T>,
16}
17
18impl<T> ZeroCopyCodec<T> {
19 pub fn new() -> Self {
20 Self {
21 _phantom: PhantomData,
22 }
23 }
24}
25
26impl<T> Default for ZeroCopyCodec<T> {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32#[cfg(feature = "zero-copy")]
33impl<T> Codec<T> for ZeroCopyCodec<T>
34where
35 T: Archive + RkyvSerialize<rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>> + for<'a> RkyvDeserialize<T, rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>> + Clone + Send + Sync + 'static,
36 T::Archived: rkyv::Deserialize<T, rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>>,
37{
38 fn encode(&self, message: &T) -> Result<Vec<u8>, CodecError> {
39 to_bytes(message)
40 .map_err(|e| CodecError::SerializationFailed(format!("rkyv serialization failed: {}", e)))
41 .map(|bytes| bytes.to_vec())
42 }
43
44 fn decode(&self, data: &[u8]) -> Result<T, CodecError> {
45 from_bytes(data)
46 .map_err(|e| CodecError::DeserializationFailed(format!("rkyv deserialization failed: {}", e)))
47 }
48
49 fn content_type(&self) -> &'static str {
50 "application/rkyv"
51 }
52}
53
54#[cfg(not(feature = "zero-copy"))]
55impl<T> Codec<T> for ZeroCopyCodec<T>
56where
57 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
58{
59 fn encode(&self, message: &T) -> Result<Vec<u8>, CodecError> {
60 serde_json::to_vec(message)
61 .map_err(|e| CodecError::SerializationFailed(format!("JSON fallback serialization failed: {}", e)))
62 }
63
64 fn decode(&self, data: &[u8]) -> Result<T, CodecError> {
65 serde_json::from_slice(data)
66 .map_err(|e| CodecError::DeserializationFailed(format!("JSON fallback deserialization failed: {}", e)))
67 }
68
69 fn content_type(&self) -> &'static str {
70 "application/json"
71 }
72}
73
74#[derive(Clone, Debug, PartialEq)]
76#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
77#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
78pub struct ZeroCopyMessage<T> {
79 pub id: String,
80 pub timestamp: u64,
81 pub payload: T,
82 pub metadata: MessageMetadata,
83}
84
85#[derive(Clone, Debug, PartialEq)]
86#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
87#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
88pub struct MessageMetadata {
89 pub content_type: String,
90 pub compression: Option<String>,
91 pub priority: u8,
92 pub ttl: Option<u64>,
93}
94
95impl<T> ZeroCopyMessage<T> {
96 pub fn new(id: String, payload: T) -> Self {
97 Self {
98 id,
99 timestamp: std::time::SystemTime::now()
100 .duration_since(std::time::UNIX_EPOCH)
101 .unwrap_or_default()
102 .as_millis() as u64,
103 payload,
104 metadata: MessageMetadata {
105 content_type: "application/rkyv".to_string(),
106 compression: None,
107 priority: 5,
108 ttl: None,
109 },
110 }
111 }
112
113 pub fn with_priority(mut self, priority: u8) -> Self {
114 self.metadata.priority = priority;
115 self
116 }
117
118 pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
119 self.metadata.ttl = Some(ttl_seconds);
120 self
121 }
122
123 pub fn is_expired(&self) -> bool {
124 if let Some(ttl) = self.metadata.ttl {
125 let now = std::time::SystemTime::now()
126 .duration_since(std::time::UNIX_EPOCH)
127 .unwrap_or_default()
128 .as_secs();
129
130 (self.timestamp / 1000) + ttl < now
131 } else {
132 false
133 }
134 }
135}
136
137#[derive(Clone, Debug)]
139#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
140#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
141pub struct MessageBatch<T> {
142 pub batch_id: String,
143 pub messages: Vec<ZeroCopyMessage<T>>,
144 pub created_at: u64,
145}
146
147impl<T> MessageBatch<T> {
148 pub fn new() -> Self {
149 Self {
150 batch_id: format!("batch_{}", uuid::Uuid::new_v4()),
151 messages: Vec::new(),
152 created_at: std::time::SystemTime::now()
153 .duration_since(std::time::UNIX_EPOCH)
154 .unwrap_or_default()
155 .as_secs(),
156 }
157 }
158
159 pub fn add_message(&mut self, message: ZeroCopyMessage<T>) {
160 self.messages.push(message);
161 }
162
163 pub fn len(&self) -> usize {
164 self.messages.len()
165 }
166
167 pub fn is_empty(&self) -> bool {
168 self.messages.is_empty()
169 }
170
171 pub fn clear(&mut self) {
172 self.messages.clear();
173 }
174}
175
176impl<T> Default for MessageBatch<T> {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182pub struct ZeroCopyBuffer {
184 data: Vec<u8>,
185 positions: Vec<MessagePosition>,
186}
187
188#[derive(Debug, Clone)]
189struct MessagePosition {
190 start: usize,
191 end: usize,
192 message_type: String,
193}
194
195impl ZeroCopyBuffer {
196 pub fn new() -> Self {
197 Self {
198 data: Vec::new(),
199 positions: Vec::new(),
200 }
201 }
202
203 pub fn with_capacity(capacity: usize) -> Self {
204 Self {
205 data: Vec::with_capacity(capacity),
206 positions: Vec::new(),
207 }
208 }
209
210 pub fn append_message<T>(&mut self, message: &T, codec: &ZeroCopyCodec<T>) -> Result<usize, CodecError>
212 where
213 T: Clone + Send + Sync + 'static,
214 ZeroCopyCodec<T>: Codec<T>,
215 {
216 let start_pos = self.data.len();
217 let encoded = codec.encode(message)?;
218
219 self.data.extend_from_slice(&encoded);
220 let end_pos = self.data.len();
221
222 let message_index = self.positions.len();
223 self.positions.push(MessagePosition {
224 start: start_pos,
225 end: end_pos,
226 message_type: codec.content_type().to_string(),
227 });
228
229 Ok(message_index)
230 }
231
232 pub fn get_message_slice(&self, index: usize) -> Option<&[u8]> {
234 self.positions.get(index).map(|pos| &self.data[pos.start..pos.end])
235 }
236
237 pub fn decode_message<T>(&self, index: usize, codec: &ZeroCopyCodec<T>) -> Result<T, CodecError>
239 where
240 ZeroCopyCodec<T>: Codec<T>,
241 T: Send + Sync,
242 {
243 if let Some(slice) = self.get_message_slice(index) {
244 codec.decode(slice)
245 } else {
246 Err(CodecError::DeserializationFailed("Invalid message index".to_string()))
247 }
248 }
249
250 pub fn message_count(&self) -> usize {
251 self.positions.len()
252 }
253
254 pub fn total_size(&self) -> usize {
255 self.data.len()
256 }
257
258 pub fn clear(&mut self) {
259 self.data.clear();
260 self.positions.clear();
261 }
262
263 pub fn compact(&mut self) {
265 if self.positions.is_empty() {
266 self.data.clear();
267 return;
268 }
269
270 let mut write_pos = 0;
272 for position in &mut self.positions {
273 let message_len = position.end - position.start;
274 if position.start != write_pos {
275 self.data.copy_within(position.start..position.end, write_pos);
276 }
277 position.start = write_pos;
278 position.end = write_pos + message_len;
279 write_pos += message_len;
280 }
281
282 self.data.truncate(write_pos);
283 }
284}
285
286impl Default for ZeroCopyBuffer {
287 fn default() -> Self {
288 Self::new()
289 }
290}
291
292pub struct ZeroCopyBenchmark {
294 iterations: usize,
295 message_size: usize,
296}
297
298impl ZeroCopyBenchmark {
299 pub fn new(iterations: usize, message_size: usize) -> Self {
300 Self {
301 iterations,
302 message_size,
303 }
304 }
305
306 pub fn benchmark_serialization<T>(&self, message: &T, codec: &ZeroCopyCodec<T>) -> BenchmarkResult
308 where
309 T: Clone + Send + Sync + 'static,
310 ZeroCopyCodec<T>: Codec<T>,
311 {
312 let start = std::time::Instant::now();
313 let mut total_bytes = 0;
314
315 for _ in 0..self.iterations {
316 match codec.encode(message) {
317 Ok(data) => total_bytes += data.len(),
318 Err(_) => continue,
319 }
320 }
321
322 let elapsed = start.elapsed();
323
324 BenchmarkResult {
325 iterations: self.iterations,
326 total_time: elapsed,
327 total_bytes,
328 throughput_mbps: (total_bytes as f64 / elapsed.as_secs_f64()) / 1_000_000.0,
329 operations_per_second: self.iterations as f64 / elapsed.as_secs_f64(),
330 }
331 }
332
333 pub fn benchmark_deserialization<T>(&self, data: &[u8], codec: &ZeroCopyCodec<T>) -> BenchmarkResult
335 where
336 T: Send + Sync,
337 ZeroCopyCodec<T>: Codec<T>,
338 {
339 let start = std::time::Instant::now();
340 let mut successful_ops = 0;
341
342 for _ in 0..self.iterations {
343 if codec.decode(data).is_ok() {
344 successful_ops += 1;
345 }
346 }
347
348 let elapsed = start.elapsed();
349 let total_bytes = data.len() * successful_ops;
350
351 BenchmarkResult {
352 iterations: successful_ops,
353 total_time: elapsed,
354 total_bytes,
355 throughput_mbps: (total_bytes as f64 / elapsed.as_secs_f64()) / 1_000_000.0,
356 operations_per_second: successful_ops as f64 / elapsed.as_secs_f64(),
357 }
358 }
359}
360
361#[derive(Debug, Clone)]
362pub struct BenchmarkResult {
363 pub iterations: usize,
364 pub total_time: std::time::Duration,
365 pub total_bytes: usize,
366 pub throughput_mbps: f64,
367 pub operations_per_second: f64,
368}
369
370impl BenchmarkResult {
371 pub fn print_summary(&self) {
372 println!("Benchmark Results:");
373 println!(" Iterations: {}", self.iterations);
374 println!(" Total Time: {:?}", self.total_time);
375 println!(" Total Bytes: {}", self.total_bytes);
376 println!(" Throughput: {:.2} MB/s", self.throughput_mbps);
377 println!(" Operations/sec: {:.2}", self.operations_per_second);
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384 use serde::{Serialize, Deserialize};
385
386 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
387 #[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
388 struct TestData {
389 id: u32,
390 name: String,
391 values: Vec<f64>,
392 }
393
394 #[test]
395 fn test_zero_copy_codec() {
396 let codec = ZeroCopyCodec::<TestData>::new();
397 let test_data = TestData {
398 id: 123,
399 name: "test".to_string(),
400 values: vec![1.0, 2.0, 3.0],
401 };
402
403 let encoded = codec.encode(&test_data).unwrap();
404 let decoded = codec.decode(&encoded).unwrap();
405
406 assert_eq!(test_data, decoded);
407 }
408
409 #[test]
410 fn test_zero_copy_message() {
411 let test_data = TestData {
412 id: 456,
413 name: "message_test".to_string(),
414 values: vec![4.0, 5.0, 6.0],
415 };
416
417 let message = ZeroCopyMessage::new("msg_1".to_string(), test_data.clone())
418 .with_priority(8)
419 .with_ttl(300);
420
421 assert_eq!(message.payload, test_data);
422 assert_eq!(message.metadata.priority, 8);
423 assert_eq!(message.metadata.ttl, Some(300));
424 assert!(!message.is_expired());
425 }
426
427 #[test]
428 fn test_message_batch() {
429 let mut batch = MessageBatch::<TestData>::new();
430
431 let data1 = TestData {
432 id: 1,
433 name: "batch1".to_string(),
434 values: vec![1.0],
435 };
436
437 let data2 = TestData {
438 id: 2,
439 name: "batch2".to_string(),
440 values: vec![2.0],
441 };
442
443 batch.add_message(ZeroCopyMessage::new("1".to_string(), data1));
444 batch.add_message(ZeroCopyMessage::new("2".to_string(), data2));
445
446 assert_eq!(batch.len(), 2);
447 assert!(!batch.is_empty());
448 }
449
450 #[test]
451 fn test_zero_copy_buffer() {
452 let mut buffer = ZeroCopyBuffer::new();
453 let codec = ZeroCopyCodec::<TestData>::new();
454
455 let test_data = TestData {
456 id: 789,
457 name: "buffer_test".to_string(),
458 values: vec![7.0, 8.0, 9.0],
459 };
460
461 let index = buffer.append_message(&test_data, &codec).unwrap();
462 assert_eq!(index, 0);
463 assert_eq!(buffer.message_count(), 1);
464
465 let decoded = buffer.decode_message(index, &codec).unwrap();
466 assert_eq!(test_data, decoded);
467 }
468
469 #[test]
470 fn test_buffer_compact() {
471 let mut buffer = ZeroCopyBuffer::with_capacity(1024);
472 let codec = ZeroCopyCodec::<TestData>::new();
473
474 for i in 0..5 {
475 let data = TestData {
476 id: i,
477 name: format!("test_{}", i),
478 values: vec![i as f64],
479 };
480 buffer.append_message(&data, &codec).unwrap();
481 }
482
483 let size_before = buffer.total_size();
484 buffer.compact();
485 let size_after = buffer.total_size();
486
487 assert_eq!(buffer.message_count(), 5);
488 assert!(size_after <= size_before);
489 }
490
491 #[cfg(feature = "zero-copy")]
492 #[test]
493 fn test_performance_comparison() {
494 use crate::codec::JsonCodec;
495
496 let test_data = TestData {
497 id: 12345,
498 name: "performance_test".to_string(),
499 values: (0..1000).map(|i| i as f64).collect(),
500 };
501
502 let zero_copy_codec = ZeroCopyCodec::new();
503 let json_codec = JsonCodec::new();
504
505 let rkyv_encoded = zero_copy_codec.encode(&test_data).unwrap();
507 let json_encoded = json_codec.encode(&test_data).unwrap();
508
509 println!("rkyv size: {} bytes", rkyv_encoded.len());
510 println!("JSON size: {} bytes", json_encoded.len());
511
512 assert!(rkyv_encoded.len() <= json_encoded.len());
514 }
515}