leptos_ws_pro/
zero_copy.rs

1//! Zero-Copy Serialization Implementation
2//!
3//! High-performance serialization using rkyv for minimal allocation
4//! and maximum throughput in WebSocket communications
5
6use crate::codec::{Codec, CodecError};
7use std::marker::PhantomData;
8use serde::{Serialize, Deserialize};
9
10#[cfg(feature = "zero-copy")]
11use rkyv::{Archive, Serialize as RkyvSerialize, Deserialize as RkyvDeserialize, to_bytes, from_bytes};
12
13/// Zero-copy codec using rkyv serialization
14pub struct ZeroCopyCodec<T> {
15    _phantom: PhantomData<T>,
16}
17
18impl<T> ZeroCopyCodec<T> {
19    pub fn new() -> Self {
20        Self {
21            _phantom: PhantomData,
22        }
23    }
24}
25
26impl<T> Default for ZeroCopyCodec<T> {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32#[cfg(feature = "zero-copy")]
33impl<T> Codec<T> for ZeroCopyCodec<T>
34where
35    T: Archive + RkyvSerialize<rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>> + for<'a> RkyvDeserialize<T, rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>> + Clone + Send + Sync + 'static,
36    T::Archived: rkyv::Deserialize<T, rkyv::rancor::Strategy<rkyv::rancor::Panic, rkyv::rancor::Panic>>,
37{
38    fn encode(&self, message: &T) -> Result<Vec<u8>, CodecError> {
39        to_bytes(message)
40            .map_err(|e| CodecError::SerializationFailed(format!("rkyv serialization failed: {}", e)))
41            .map(|bytes| bytes.to_vec())
42    }
43
44    fn decode(&self, data: &[u8]) -> Result<T, CodecError> {
45        from_bytes(data)
46            .map_err(|e| CodecError::DeserializationFailed(format!("rkyv deserialization failed: {}", e)))
47    }
48
49    fn content_type(&self) -> &'static str {
50        "application/rkyv"
51    }
52}
53
54#[cfg(not(feature = "zero-copy"))]
55impl<T> Codec<T> for ZeroCopyCodec<T>
56where
57    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
58{
59    fn encode(&self, message: &T) -> Result<Vec<u8>, CodecError> {
60        serde_json::to_vec(message)
61            .map_err(|e| CodecError::SerializationFailed(format!("JSON fallback serialization failed: {}", e)))
62    }
63
64    fn decode(&self, data: &[u8]) -> Result<T, CodecError> {
65        serde_json::from_slice(data)
66            .map_err(|e| CodecError::DeserializationFailed(format!("JSON fallback deserialization failed: {}", e)))
67    }
68
69    fn content_type(&self) -> &'static str {
70        "application/json"
71    }
72}
73
74/// High-performance message with zero-copy deserialization support
75#[derive(Clone, Debug, PartialEq)]
76#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
77#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
78pub struct ZeroCopyMessage<T> {
79    pub id: String,
80    pub timestamp: u64,
81    pub payload: T,
82    pub metadata: MessageMetadata,
83}
84
85#[derive(Clone, Debug, PartialEq)]
86#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
87#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
88pub struct MessageMetadata {
89    pub content_type: String,
90    pub compression: Option<String>,
91    pub priority: u8,
92    pub ttl: Option<u64>,
93}
94
95impl<T> ZeroCopyMessage<T> {
96    pub fn new(id: String, payload: T) -> Self {
97        Self {
98            id,
99            timestamp: std::time::SystemTime::now()
100                .duration_since(std::time::UNIX_EPOCH)
101                .unwrap_or_default()
102                .as_millis() as u64,
103            payload,
104            metadata: MessageMetadata {
105                content_type: "application/rkyv".to_string(),
106                compression: None,
107                priority: 5,
108                ttl: None,
109            },
110        }
111    }
112
113    pub fn with_priority(mut self, priority: u8) -> Self {
114        self.metadata.priority = priority;
115        self
116    }
117
118    pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
119        self.metadata.ttl = Some(ttl_seconds);
120        self
121    }
122
123    pub fn is_expired(&self) -> bool {
124        if let Some(ttl) = self.metadata.ttl {
125            let now = std::time::SystemTime::now()
126                .duration_since(std::time::UNIX_EPOCH)
127                .unwrap_or_default()
128                .as_secs();
129
130            (self.timestamp / 1000) + ttl < now
131        } else {
132            false
133        }
134    }
135}
136
137/// Batch message container for efficient bulk operations
138#[derive(Clone, Debug)]
139#[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
140#[cfg_attr(not(feature = "zero-copy"), derive(Serialize, Deserialize))]
141pub struct MessageBatch<T> {
142    pub batch_id: String,
143    pub messages: Vec<ZeroCopyMessage<T>>,
144    pub created_at: u64,
145}
146
147impl<T> MessageBatch<T> {
148    pub fn new() -> Self {
149        Self {
150            batch_id: format!("batch_{}", uuid::Uuid::new_v4()),
151            messages: Vec::new(),
152            created_at: std::time::SystemTime::now()
153                .duration_since(std::time::UNIX_EPOCH)
154                .unwrap_or_default()
155                .as_secs(),
156        }
157    }
158
159    pub fn add_message(&mut self, message: ZeroCopyMessage<T>) {
160        self.messages.push(message);
161    }
162
163    pub fn len(&self) -> usize {
164        self.messages.len()
165    }
166
167    pub fn is_empty(&self) -> bool {
168        self.messages.is_empty()
169    }
170
171    pub fn clear(&mut self) {
172        self.messages.clear();
173    }
174}
175
176impl<T> Default for MessageBatch<T> {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182/// Zero-copy buffer for memory-efficient message handling
183pub struct ZeroCopyBuffer {
184    data: Vec<u8>,
185    positions: Vec<MessagePosition>,
186}
187
188#[derive(Debug, Clone)]
189struct MessagePosition {
190    start: usize,
191    end: usize,
192    message_type: String,
193}
194
195impl ZeroCopyBuffer {
196    pub fn new() -> Self {
197        Self {
198            data: Vec::new(),
199            positions: Vec::new(),
200        }
201    }
202
203    pub fn with_capacity(capacity: usize) -> Self {
204        Self {
205            data: Vec::with_capacity(capacity),
206            positions: Vec::new(),
207        }
208    }
209
210    /// Append message data without copying
211    pub fn append_message<T>(&mut self, message: &T, codec: &ZeroCopyCodec<T>) -> Result<usize, CodecError>
212    where
213        T: Clone + Send + Sync + 'static,
214        ZeroCopyCodec<T>: Codec<T>,
215    {
216        let start_pos = self.data.len();
217        let encoded = codec.encode(message)?;
218
219        self.data.extend_from_slice(&encoded);
220        let end_pos = self.data.len();
221
222        let message_index = self.positions.len();
223        self.positions.push(MessagePosition {
224            start: start_pos,
225            end: end_pos,
226            message_type: codec.content_type().to_string(),
227        });
228
229        Ok(message_index)
230    }
231
232    /// Get message data without copying
233    pub fn get_message_slice(&self, index: usize) -> Option<&[u8]> {
234        self.positions.get(index).map(|pos| &self.data[pos.start..pos.end])
235    }
236
237    /// Decode message from buffer position
238    pub fn decode_message<T>(&self, index: usize, codec: &ZeroCopyCodec<T>) -> Result<T, CodecError>
239    where
240        ZeroCopyCodec<T>: Codec<T>,
241        T: Send + Sync,
242    {
243        if let Some(slice) = self.get_message_slice(index) {
244            codec.decode(slice)
245        } else {
246            Err(CodecError::DeserializationFailed("Invalid message index".to_string()))
247        }
248    }
249
250    pub fn message_count(&self) -> usize {
251        self.positions.len()
252    }
253
254    pub fn total_size(&self) -> usize {
255        self.data.len()
256    }
257
258    pub fn clear(&mut self) {
259        self.data.clear();
260        self.positions.clear();
261    }
262
263    /// Compact buffer by removing unused space
264    pub fn compact(&mut self) {
265        if self.positions.is_empty() {
266            self.data.clear();
267            return;
268        }
269
270        // Shift all messages to remove gaps
271        let mut write_pos = 0;
272        for position in &mut self.positions {
273            let message_len = position.end - position.start;
274            if position.start != write_pos {
275                self.data.copy_within(position.start..position.end, write_pos);
276            }
277            position.start = write_pos;
278            position.end = write_pos + message_len;
279            write_pos += message_len;
280        }
281
282        self.data.truncate(write_pos);
283    }
284}
285
286impl Default for ZeroCopyBuffer {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
292/// Performance benchmarking for zero-copy operations
293pub struct ZeroCopyBenchmark {
294    iterations: usize,
295    message_size: usize,
296}
297
298impl ZeroCopyBenchmark {
299    pub fn new(iterations: usize, message_size: usize) -> Self {
300        Self {
301            iterations,
302            message_size,
303        }
304    }
305
306    /// Benchmark serialization performance
307    pub fn benchmark_serialization<T>(&self, message: &T, codec: &ZeroCopyCodec<T>) -> BenchmarkResult
308    where
309        T: Clone + Send + Sync + 'static,
310        ZeroCopyCodec<T>: Codec<T>,
311    {
312        let start = std::time::Instant::now();
313        let mut total_bytes = 0;
314
315        for _ in 0..self.iterations {
316            match codec.encode(message) {
317                Ok(data) => total_bytes += data.len(),
318                Err(_) => continue,
319            }
320        }
321
322        let elapsed = start.elapsed();
323
324        BenchmarkResult {
325            iterations: self.iterations,
326            total_time: elapsed,
327            total_bytes,
328            throughput_mbps: (total_bytes as f64 / elapsed.as_secs_f64()) / 1_000_000.0,
329            operations_per_second: self.iterations as f64 / elapsed.as_secs_f64(),
330        }
331    }
332
333    /// Benchmark deserialization performance
334    pub fn benchmark_deserialization<T>(&self, data: &[u8], codec: &ZeroCopyCodec<T>) -> BenchmarkResult
335    where
336        T: Send + Sync,
337        ZeroCopyCodec<T>: Codec<T>,
338    {
339        let start = std::time::Instant::now();
340        let mut successful_ops = 0;
341
342        for _ in 0..self.iterations {
343            if codec.decode(data).is_ok() {
344                successful_ops += 1;
345            }
346        }
347
348        let elapsed = start.elapsed();
349        let total_bytes = data.len() * successful_ops;
350
351        BenchmarkResult {
352            iterations: successful_ops,
353            total_time: elapsed,
354            total_bytes,
355            throughput_mbps: (total_bytes as f64 / elapsed.as_secs_f64()) / 1_000_000.0,
356            operations_per_second: successful_ops as f64 / elapsed.as_secs_f64(),
357        }
358    }
359}
360
361#[derive(Debug, Clone)]
362pub struct BenchmarkResult {
363    pub iterations: usize,
364    pub total_time: std::time::Duration,
365    pub total_bytes: usize,
366    pub throughput_mbps: f64,
367    pub operations_per_second: f64,
368}
369
370impl BenchmarkResult {
371    pub fn print_summary(&self) {
372        println!("Benchmark Results:");
373        println!("  Iterations: {}", self.iterations);
374        println!("  Total Time: {:?}", self.total_time);
375        println!("  Total Bytes: {}", self.total_bytes);
376        println!("  Throughput: {:.2} MB/s", self.throughput_mbps);
377        println!("  Operations/sec: {:.2}", self.operations_per_second);
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384    use serde::{Serialize, Deserialize};
385
386    #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
387    #[cfg_attr(feature = "zero-copy", derive(Archive, RkyvSerialize, RkyvDeserialize))]
388    struct TestData {
389        id: u32,
390        name: String,
391        values: Vec<f64>,
392    }
393
394    #[test]
395    fn test_zero_copy_codec() {
396        let codec = ZeroCopyCodec::<TestData>::new();
397        let test_data = TestData {
398            id: 123,
399            name: "test".to_string(),
400            values: vec![1.0, 2.0, 3.0],
401        };
402
403        let encoded = codec.encode(&test_data).unwrap();
404        let decoded = codec.decode(&encoded).unwrap();
405
406        assert_eq!(test_data, decoded);
407    }
408
409    #[test]
410    fn test_zero_copy_message() {
411        let test_data = TestData {
412            id: 456,
413            name: "message_test".to_string(),
414            values: vec![4.0, 5.0, 6.0],
415        };
416
417        let message = ZeroCopyMessage::new("msg_1".to_string(), test_data.clone())
418            .with_priority(8)
419            .with_ttl(300);
420
421        assert_eq!(message.payload, test_data);
422        assert_eq!(message.metadata.priority, 8);
423        assert_eq!(message.metadata.ttl, Some(300));
424        assert!(!message.is_expired());
425    }
426
427    #[test]
428    fn test_message_batch() {
429        let mut batch = MessageBatch::<TestData>::new();
430
431        let data1 = TestData {
432            id: 1,
433            name: "batch1".to_string(),
434            values: vec![1.0],
435        };
436
437        let data2 = TestData {
438            id: 2,
439            name: "batch2".to_string(),
440            values: vec![2.0],
441        };
442
443        batch.add_message(ZeroCopyMessage::new("1".to_string(), data1));
444        batch.add_message(ZeroCopyMessage::new("2".to_string(), data2));
445
446        assert_eq!(batch.len(), 2);
447        assert!(!batch.is_empty());
448    }
449
450    #[test]
451    fn test_zero_copy_buffer() {
452        let mut buffer = ZeroCopyBuffer::new();
453        let codec = ZeroCopyCodec::<TestData>::new();
454
455        let test_data = TestData {
456            id: 789,
457            name: "buffer_test".to_string(),
458            values: vec![7.0, 8.0, 9.0],
459        };
460
461        let index = buffer.append_message(&test_data, &codec).unwrap();
462        assert_eq!(index, 0);
463        assert_eq!(buffer.message_count(), 1);
464
465        let decoded = buffer.decode_message(index, &codec).unwrap();
466        assert_eq!(test_data, decoded);
467    }
468
469    #[test]
470    fn test_buffer_compact() {
471        let mut buffer = ZeroCopyBuffer::with_capacity(1024);
472        let codec = ZeroCopyCodec::<TestData>::new();
473
474        for i in 0..5 {
475            let data = TestData {
476                id: i,
477                name: format!("test_{}", i),
478                values: vec![i as f64],
479            };
480            buffer.append_message(&data, &codec).unwrap();
481        }
482
483        let size_before = buffer.total_size();
484        buffer.compact();
485        let size_after = buffer.total_size();
486
487        assert_eq!(buffer.message_count(), 5);
488        assert!(size_after <= size_before);
489    }
490
491    #[cfg(feature = "zero-copy")]
492    #[test]
493    fn test_performance_comparison() {
494        use crate::codec::JsonCodec;
495
496        let test_data = TestData {
497            id: 12345,
498            name: "performance_test".to_string(),
499            values: (0..1000).map(|i| i as f64).collect(),
500        };
501
502        let zero_copy_codec = ZeroCopyCodec::new();
503        let json_codec = JsonCodec::new();
504
505        // Encode with both codecs
506        let rkyv_encoded = zero_copy_codec.encode(&test_data).unwrap();
507        let json_encoded = json_codec.encode(&test_data).unwrap();
508
509        println!("rkyv size: {} bytes", rkyv_encoded.len());
510        println!("JSON size: {} bytes", json_encoded.len());
511
512        // rkyv should be more compact
513        assert!(rkyv_encoded.len() <= json_encoded.len());
514    }
515}