// pjson_rs/infrastructure/integration/simd_acceleration.rs

// SIMD acceleration module using sonic-rs for high-performance JSON operations.
//
// This module provides SIMD-accelerated JSON processing for streaming operations,
// significantly improving performance for large-scale data serialization.
5
6use crate::stream::StreamFrame;
7use bytes::{BufMut, BytesMut};
8use sonic_rs::{JsonValueTrait, LazyValue};
9
10/// High-performance SIMD-accelerated serializer for stream frames
11pub struct SimdFrameSerializer {
12    /// Pre-allocated buffer for serialization
13    buffer: BytesMut,
14    /// Statistics for performance monitoring
15    stats: SerializationStats,
16}
17
/// Counters describing the work performed by a serializer.
///
/// All counters start at zero (`Default`) and only ever grow until the owner
/// replaces the value with a fresh default.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SerializationStats {
    /// Number of frames serialized so far.
    pub frames_processed: usize,
    /// Total number of output bytes produced so far.
    pub bytes_written: usize,
    /// Number of serialization calls issued so far.
    pub simd_operations: usize,
}
24
25impl SimdFrameSerializer {
26    /// Create a new SIMD serializer with initial capacity
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            buffer: BytesMut::with_capacity(capacity),
30            stats: SerializationStats::default(),
31        }
32    }
33
34    /// Serialize a single frame using SIMD acceleration
35    pub fn serialize_frame(&mut self, frame: &StreamFrame) -> Result<&[u8], sonic_rs::Error> {
36        self.buffer.clear();
37
38        // Use sonic-rs for SIMD-accelerated serialization
39        let serialized = sonic_rs::to_vec(frame)?;
40        self.buffer.extend_from_slice(&serialized);
41
42        self.stats.frames_processed += 1;
43        self.stats.bytes_written += self.buffer.len();
44        self.stats.simd_operations += 1;
45
46        Ok(&self.buffer)
47    }
48
49    /// Batch serialize multiple frames with SIMD optimization
50    pub fn serialize_batch(&mut self, frames: &[StreamFrame]) -> Result<BytesMut, sonic_rs::Error> {
51        // Estimate capacity for better performance
52        let estimated_size = frames.len() * 256; // ~256 bytes per frame estimate
53        let mut output = BytesMut::with_capacity(estimated_size);
54
55        for frame in frames {
56            let serialized = sonic_rs::to_vec(frame)?;
57            output.extend_from_slice(&serialized);
58            output.put_u8(b'\n'); // NDJSON separator
59        }
60
61        self.stats.frames_processed += frames.len();
62        self.stats.bytes_written += output.len();
63        self.stats.simd_operations += frames.len();
64
65        Ok(output)
66    }
67
68    /// Serialize frames to Server-Sent Events format with SIMD
69    pub fn serialize_sse_batch(
70        &mut self,
71        frames: &[StreamFrame],
72    ) -> Result<BytesMut, sonic_rs::Error> {
73        let estimated_size = frames.len() * 300; // SSE overhead + JSON
74        let mut output = BytesMut::with_capacity(estimated_size);
75
76        for frame in frames {
77            output.extend_from_slice(b"data: ");
78            let serialized = sonic_rs::to_vec(frame)?;
79            output.extend_from_slice(&serialized);
80            output.extend_from_slice(b"\n\n");
81        }
82
83        self.stats.frames_processed += frames.len();
84        self.stats.bytes_written += output.len();
85        self.stats.simd_operations += frames.len();
86
87        Ok(output)
88    }
89
90    /// Get serialization statistics
91    pub fn stats(&self) -> &SerializationStats {
92        &self.stats
93    }
94
95    /// Reset statistics
96    pub fn reset_stats(&mut self) {
97        self.stats = SerializationStats::default();
98    }
99}
100
/// Stateless namespace type for JSON validation and field-extraction helpers
/// built on `sonic_rs`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SimdJsonProcessor;
103
104impl SimdJsonProcessor {
105    /// Validate JSON using SIMD acceleration
106    pub fn validate_json(input: &[u8]) -> Result<(), sonic_rs::Error> {
107        // sonic-rs uses SIMD for ultra-fast validation
108        let _doc = sonic_rs::from_slice::<sonic_rs::Value>(input)?;
109        Ok(())
110    }
111
112    /// Parse and extract specific fields using SIMD with zero-copy where possible
113    pub fn extract_priority_field(input: &[u8]) -> Result<Option<u8>, sonic_rs::Error> {
114        let doc = sonic_rs::from_slice::<LazyValue<'_>>(input)?;
115
116        if let Some(priority_value) = doc.get("priority")
117            && let Some(priority) = priority_value.as_u64()
118        {
119            return Ok(Some(priority as u8));
120        }
121
122        Ok(None)
123    }
124
125    /// Batch validate multiple JSON documents
126    pub fn validate_batch(inputs: &[&[u8]]) -> Vec<Result<(), sonic_rs::Error>> {
127        inputs
128            .iter()
129            .map(|input| Self::validate_json(input))
130            .collect()
131    }
132}
133
134/// SIMD-optimized buffer for stream processing
135pub struct SimdStreamBuffer {
136    /// Main data buffer, AVX-512 aligned
137    data: BytesMut,
138    /// Position tracking for efficient processing
139    position: usize,
140    /// Capacity management
141    capacity: usize,
142}
143
144impl SimdStreamBuffer {
145    /// Create a new SIMD-optimized buffer with specified capacity
146    pub fn with_capacity(capacity: usize) -> Self {
147        // Ensure capacity is aligned for SIMD operations
148        let aligned_capacity = (capacity + 63) & !63; // Align to 64 bytes for AVX-512
149
150        Self {
151            data: BytesMut::with_capacity(aligned_capacity),
152            position: 0,
153            capacity: aligned_capacity,
154        }
155    }
156
157    /// Write a frame to buffer using SIMD serialization
158    pub fn write_frame(&mut self, frame: &StreamFrame) -> Result<usize, sonic_rs::Error> {
159        let start_pos = self.data.len();
160
161        // Ensure we have enough space
162        self.ensure_capacity(512); // Reserve space for frame
163
164        let serialized = sonic_rs::to_vec(frame)?;
165        self.data.extend_from_slice(&serialized);
166
167        let bytes_written = self.data.len() - start_pos;
168        Ok(bytes_written)
169    }
170
171    /// Write multiple frames efficiently
172    pub fn write_frames(&mut self, frames: &[StreamFrame]) -> Result<usize, sonic_rs::Error> {
173        let start_len = self.data.len();
174
175        // Pre-allocate space for all frames
176        self.ensure_capacity(frames.len() * 256);
177
178        for frame in frames {
179            let serialized = sonic_rs::to_vec(frame)?;
180            self.data.extend_from_slice(&serialized);
181            self.data.put_u8(b'\n');
182        }
183
184        Ok(self.data.len() - start_len)
185    }
186
187    /// Get the current buffer contents
188    pub fn as_slice(&self) -> &[u8] {
189        &self.data
190    }
191
192    /// Consume the buffer and return bytes
193    pub fn into_bytes(self) -> bytes::Bytes {
194        self.data.freeze()
195    }
196
197    /// Clear the buffer for reuse
198    pub fn clear(&mut self) {
199        self.data.clear();
200        self.position = 0;
201    }
202
203    /// Ensure buffer has at least the specified additional capacity
204    fn ensure_capacity(&mut self, additional: usize) {
205        if self.data.len() + additional > self.capacity {
206            let new_capacity = ((self.data.len() + additional) * 2 + 63) & !63;
207            self.data.reserve(new_capacity - self.data.capacity());
208            self.capacity = new_capacity;
209        }
210    }
211}
212
/// Configuration for the SIMD streaming components.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimdConfig {
    /// Size to use for batch operations.
    ///
    /// NOTE(review): no code in this file reads this value — confirm whether a
    /// caller elsewhere uses it.
    pub batch_size: usize,
    /// Initial capacity, in bytes, for the buffers created from this config.
    pub initial_capacity: usize,
    /// Whether statistics are exposed; when `false`,
    /// `SimdStreamProcessor::stats` returns `None`.
    pub collect_stats: bool,
}

impl Default for SimdConfig {
    /// Defaults: batches of 100, 8 KiB initial buffers, statistics not exposed.
    fn default() -> Self {
        Self {
            batch_size: 100,
            initial_capacity: 8192, // 8 KiB
            collect_stats: false,
        }
    }
}
233
234/// High-level SIMD streaming interface
235pub struct SimdStreamProcessor {
236    serializer: SimdFrameSerializer,
237    buffer: SimdStreamBuffer,
238    config: SimdConfig,
239}
240
241impl SimdStreamProcessor {
242    /// Create a new SIMD stream processor
243    pub fn new(config: SimdConfig) -> Self {
244        Self {
245            serializer: SimdFrameSerializer::with_capacity(config.initial_capacity),
246            buffer: SimdStreamBuffer::with_capacity(config.initial_capacity),
247            config,
248        }
249    }
250
251    /// Process frames to JSON format with SIMD acceleration
252    pub fn process_to_json(
253        &mut self,
254        frames: &[StreamFrame],
255    ) -> Result<bytes::Bytes, sonic_rs::Error> {
256        let result = self.serializer.serialize_batch(frames)?;
257        Ok(result.freeze())
258    }
259
260    /// Process frames to Server-Sent Events format
261    pub fn process_to_sse(
262        &mut self,
263        frames: &[StreamFrame],
264    ) -> Result<bytes::Bytes, sonic_rs::Error> {
265        let result = self.serializer.serialize_sse_batch(frames)?;
266        Ok(result.freeze())
267    }
268
269    /// Process frames to NDJSON with buffered approach
270    pub fn process_to_ndjson(
271        &mut self,
272        frames: &[StreamFrame],
273    ) -> Result<bytes::Bytes, sonic_rs::Error> {
274        self.buffer.clear();
275        self.buffer.write_frames(frames)?;
276        let data = self.buffer.as_slice().to_vec();
277        Ok(data.into())
278    }
279
280    /// Get processing statistics if enabled
281    pub fn stats(&self) -> Option<&SerializationStats> {
282        if self.config.collect_stats {
283            Some(self.serializer.stats())
284        } else {
285            None
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::Priority;
    use std::collections::HashMap;

    /// A single frame serializes to non-empty JSON that parses back.
    #[test]
    fn test_simd_frame_serialization() {
        let mut serializer = SimdFrameSerializer::with_capacity(1024);

        let frame = StreamFrame {
            data: serde_json::json!({"test": "data", "priority": "high"}),
            priority: Priority::HIGH,
            metadata: HashMap::new(),
        };

        let result = serializer.serialize_frame(&frame);
        assert!(result.is_ok());

        let serialized = result.unwrap();
        assert!(!serialized.is_empty());

        // Verify we can deserialize back
        let parsed: serde_json::Value = sonic_rs::from_slice(serialized).unwrap();
        assert_eq!(parsed["data"]["test"], "data");
    }

    /// A batch serializes to one newline-terminated JSON line per frame.
    #[test]
    fn test_batch_serialization() {
        let mut serializer = SimdFrameSerializer::with_capacity(2048);

        let frames = vec![
            StreamFrame {
                data: serde_json::json!({"id": 1}),
                priority: Priority::HIGH,
                metadata: HashMap::new(),
            },
            StreamFrame {
                data: serde_json::json!({"id": 2}),
                priority: Priority::MEDIUM,
                metadata: HashMap::new(),
            },
        ];

        let result = serializer.serialize_batch(&frames);
        assert!(result.is_ok());

        let serialized = result.unwrap();
        assert!(!serialized.is_empty());

        // Should contain both frames separated by newlines
        let content = String::from_utf8(serialized.to_vec()).unwrap();
        assert!(content.contains("\"id\":1"));
        assert!(content.contains("\"id\":2"));
        assert!(content.ends_with('\n'));
        assert_eq!(content.lines().count(), frames.len());
    }

    /// Well-formed JSON validates; a truncated document does not.
    #[test]
    fn test_simd_json_validation() {
        let valid_json = br#"{"test": "value", "number": 42}"#;
        let invalid_json = br#"{"test": "value", "number": 42"#;

        assert!(SimdJsonProcessor::validate_json(valid_json).is_ok());
        assert!(SimdJsonProcessor::validate_json(invalid_json).is_err());
    }

    /// The priority field is returned when present and `None` when absent.
    #[test]
    fn test_priority_extraction() {
        let json_with_priority = br#"{"data": "test", "priority": 5}"#;
        let json_without_priority = br#"{"data": "test"}"#;

        let result1 = SimdJsonProcessor::extract_priority_field(json_with_priority).unwrap();
        assert_eq!(result1, Some(5));

        let result2 = SimdJsonProcessor::extract_priority_field(json_without_priority).unwrap();
        assert_eq!(result2, None);
    }

    /// Writing one frame into the stream buffer yields parseable JSON.
    #[test]
    fn test_simd_stream_buffer() {
        let mut buffer = SimdStreamBuffer::with_capacity(1024);

        let frame = StreamFrame {
            data: serde_json::json!({"buffer_test": true}),
            priority: Priority::HIGH,
            metadata: HashMap::new(),
        };

        let bytes_written = buffer.write_frame(&frame).unwrap();
        assert!(bytes_written > 0);

        let content = buffer.as_slice();
        assert!(!content.is_empty());

        // Verify content is valid JSON
        let parsed: serde_json::Value = sonic_rs::from_slice(content).unwrap();
        assert_eq!(parsed["data"]["buffer_test"], true);
    }

    /// End-to-end check of the JSON and SSE paths plus statistics exposure.
    #[test]
    fn test_full_simd_processor() {
        let config = SimdConfig {
            batch_size: 50,
            initial_capacity: 2048,
            collect_stats: true,
        };

        let mut processor = SimdStreamProcessor::new(config);

        let frames = vec![StreamFrame {
            data: serde_json::json!({"processor": "test", "id": 1}),
            priority: Priority::CRITICAL,
            metadata: HashMap::new(),
        }];

        let json_result = processor.process_to_json(&frames);
        assert!(json_result.is_ok());

        let sse_result = processor.process_to_sse(&frames);
        assert!(sse_result.is_ok());

        // Verify SSE format
        let sse_content = String::from_utf8(sse_result.unwrap().to_vec()).unwrap();
        assert!(sse_content.starts_with("data: "));
        assert!(sse_content.ends_with("\n\n"));

        // Check stats if enabled
        if let Some(stats) = processor.stats() {
            assert!(stats.frames_processed > 0);
        }
    }
}
420}