pjson_rs/infrastructure/integration/
simd_acceleration.rs1use crate::stream::StreamFrame;
7use bytes::{BufMut, BytesMut};
8use sonic_rs::{JsonValueTrait, LazyValue};
9
10pub struct SimdFrameSerializer {
12 buffer: BytesMut,
14 stats: SerializationStats,
16}
17
/// Running counters describing the work done by a serializer.
///
/// All counters start at zero (see the derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct SerializationStats {
    /// Number of frames serialized successfully.
    pub frames_processed: usize,
    /// Total number of output bytes produced, including any per-frame
    /// delimiters added by the batch serializers.
    pub bytes_written: usize,
    /// Number of serializer invocations; incremented once per frame.
    pub simd_operations: usize,
}
24
25impl SimdFrameSerializer {
26 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 buffer: BytesMut::with_capacity(capacity),
30 stats: SerializationStats::default(),
31 }
32 }
33
34 pub fn serialize_frame(&mut self, frame: &StreamFrame) -> Result<&[u8], sonic_rs::Error> {
36 self.buffer.clear();
37
38 let serialized = sonic_rs::to_vec(frame)?;
40 self.buffer.extend_from_slice(&serialized);
41
42 self.stats.frames_processed += 1;
43 self.stats.bytes_written += self.buffer.len();
44 self.stats.simd_operations += 1;
45
46 Ok(&self.buffer)
47 }
48
49 pub fn serialize_batch(&mut self, frames: &[StreamFrame]) -> Result<BytesMut, sonic_rs::Error> {
51 let estimated_size = frames.len() * 256; let mut output = BytesMut::with_capacity(estimated_size);
54
55 for frame in frames {
56 let serialized = sonic_rs::to_vec(frame)?;
57 output.extend_from_slice(&serialized);
58 output.put_u8(b'\n'); }
60
61 self.stats.frames_processed += frames.len();
62 self.stats.bytes_written += output.len();
63 self.stats.simd_operations += frames.len();
64
65 Ok(output)
66 }
67
68 pub fn serialize_sse_batch(
70 &mut self,
71 frames: &[StreamFrame],
72 ) -> Result<BytesMut, sonic_rs::Error> {
73 let estimated_size = frames.len() * 300; let mut output = BytesMut::with_capacity(estimated_size);
75
76 for frame in frames {
77 output.extend_from_slice(b"data: ");
78 let serialized = sonic_rs::to_vec(frame)?;
79 output.extend_from_slice(&serialized);
80 output.extend_from_slice(b"\n\n");
81 }
82
83 self.stats.frames_processed += frames.len();
84 self.stats.bytes_written += output.len();
85 self.stats.simd_operations += frames.len();
86
87 Ok(output)
88 }
89
90 pub fn stats(&self) -> &SerializationStats {
92 &self.stats
93 }
94
95 pub fn reset_stats(&mut self) {
97 self.stats = SerializationStats::default();
98 }
99}
100
/// Stateless namespace for `sonic_rs`-based JSON helpers.
///
/// The type carries no data; its functionality is exposed through associated
/// functions.
pub struct SimdJsonProcessor;
103
104impl SimdJsonProcessor {
105 pub fn validate_json(input: &[u8]) -> Result<(), sonic_rs::Error> {
107 let _doc = sonic_rs::from_slice::<sonic_rs::Value>(input)?;
109 Ok(())
110 }
111
112 pub fn extract_priority_field(input: &[u8]) -> Result<Option<u8>, sonic_rs::Error> {
114 let doc = sonic_rs::from_slice::<LazyValue<'_>>(input)?;
115
116 if let Some(priority_value) = doc.get("priority")
117 && let Some(priority) = priority_value.as_u64()
118 {
119 return Ok(Some(priority as u8));
120 }
121
122 Ok(None)
123 }
124
125 pub fn validate_batch(inputs: &[&[u8]]) -> Vec<Result<(), sonic_rs::Error>> {
127 inputs
128 .iter()
129 .map(|input| Self::validate_json(input))
130 .collect()
131 }
132}
133
134pub struct SimdStreamBuffer {
136 data: BytesMut,
138 position: usize,
140 capacity: usize,
142}
143
144impl SimdStreamBuffer {
145 pub fn with_capacity(capacity: usize) -> Self {
147 let aligned_capacity = (capacity + 63) & !63; Self {
151 data: BytesMut::with_capacity(aligned_capacity),
152 position: 0,
153 capacity: aligned_capacity,
154 }
155 }
156
157 pub fn write_frame(&mut self, frame: &StreamFrame) -> Result<usize, sonic_rs::Error> {
159 let start_pos = self.data.len();
160
161 self.ensure_capacity(512); let serialized = sonic_rs::to_vec(frame)?;
165 self.data.extend_from_slice(&serialized);
166
167 let bytes_written = self.data.len() - start_pos;
168 Ok(bytes_written)
169 }
170
171 pub fn write_frames(&mut self, frames: &[StreamFrame]) -> Result<usize, sonic_rs::Error> {
173 let start_len = self.data.len();
174
175 self.ensure_capacity(frames.len() * 256);
177
178 for frame in frames {
179 let serialized = sonic_rs::to_vec(frame)?;
180 self.data.extend_from_slice(&serialized);
181 self.data.put_u8(b'\n');
182 }
183
184 Ok(self.data.len() - start_len)
185 }
186
187 pub fn as_slice(&self) -> &[u8] {
189 &self.data
190 }
191
192 pub fn into_bytes(self) -> bytes::Bytes {
194 self.data.freeze()
195 }
196
197 pub fn clear(&mut self) {
199 self.data.clear();
200 self.position = 0;
201 }
202
203 fn ensure_capacity(&mut self, additional: usize) {
205 if self.data.len() + additional > self.capacity {
206 let new_capacity = ((self.data.len() + additional) * 2 + 63) & !63;
207 self.data.reserve(new_capacity - self.data.capacity());
208 self.capacity = new_capacity;
209 }
210 }
211}
212
/// Settings used to construct a stream processor.
#[derive(Debug, Clone)]
pub struct SimdConfig {
    /// Batch size setting. Nothing visible in this file reads it.
    /// NOTE(review): presumably consumed by callers — confirm.
    pub batch_size: usize,
    /// Initial byte capacity given to the processor's internal buffers.
    pub initial_capacity: usize,
    /// Whether the processor exposes its serialization statistics.
    pub collect_stats: bool,
}

impl Default for SimdConfig {
    /// Returns a configuration with a batch size of 100, an initial capacity
    /// of 8192 bytes, and statistics disabled.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 100;
        const DEFAULT_INITIAL_CAPACITY: usize = 8192;

        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            initial_capacity: DEFAULT_INITIAL_CAPACITY,
            collect_stats: false,
        }
    }
}
233
234pub struct SimdStreamProcessor {
236 serializer: SimdFrameSerializer,
237 buffer: SimdStreamBuffer,
238 config: SimdConfig,
239}
240
241impl SimdStreamProcessor {
242 pub fn new(config: SimdConfig) -> Self {
244 Self {
245 serializer: SimdFrameSerializer::with_capacity(config.initial_capacity),
246 buffer: SimdStreamBuffer::with_capacity(config.initial_capacity),
247 config,
248 }
249 }
250
251 pub fn process_to_json(
253 &mut self,
254 frames: &[StreamFrame],
255 ) -> Result<bytes::Bytes, sonic_rs::Error> {
256 let result = self.serializer.serialize_batch(frames)?;
257 Ok(result.freeze())
258 }
259
260 pub fn process_to_sse(
262 &mut self,
263 frames: &[StreamFrame],
264 ) -> Result<bytes::Bytes, sonic_rs::Error> {
265 let result = self.serializer.serialize_sse_batch(frames)?;
266 Ok(result.freeze())
267 }
268
269 pub fn process_to_ndjson(
271 &mut self,
272 frames: &[StreamFrame],
273 ) -> Result<bytes::Bytes, sonic_rs::Error> {
274 self.buffer.clear();
275 self.buffer.write_frames(frames)?;
276 let data = self.buffer.as_slice().to_vec();
277 Ok(data.into())
278 }
279
280 pub fn stats(&self) -> Option<&SerializationStats> {
282 if self.config.collect_stats {
283 Some(self.serializer.stats())
284 } else {
285 None
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::Priority;
    use std::collections::HashMap;

    /// Builds a frame with the given payload and priority and empty metadata.
    fn make_frame(data: serde_json::Value, priority: Priority) -> StreamFrame {
        StreamFrame {
            data,
            priority,
            metadata: HashMap::new(),
        }
    }

    /// A single frame serializes to non-empty, parseable JSON.
    #[test]
    fn test_simd_frame_serialization() {
        let mut serializer = SimdFrameSerializer::with_capacity(1024);
        let frame = make_frame(
            serde_json::json!({"test": "data", "priority": "high"}),
            Priority::HIGH,
        );

        let serialized = serializer
            .serialize_frame(&frame)
            .expect("frame should serialize");
        assert!(!serialized.is_empty());

        let parsed: serde_json::Value =
            sonic_rs::from_slice(serialized).expect("output should be valid JSON");
        assert_eq!(parsed["data"]["test"], "data");
    }

    /// A batch contains the payload of every frame.
    #[test]
    fn test_batch_serialization() {
        let mut serializer = SimdFrameSerializer::with_capacity(2048);
        let frames = vec![
            make_frame(serde_json::json!({"id": 1}), Priority::HIGH),
            make_frame(serde_json::json!({"id": 2}), Priority::MEDIUM),
        ];

        let serialized = serializer
            .serialize_batch(&frames)
            .expect("batch should serialize");
        assert!(!serialized.is_empty());

        let content = String::from_utf8(serialized.to_vec()).unwrap();
        assert!(content.contains("\"id\":1"));
        assert!(content.contains("\"id\":2"));
    }

    /// Well-formed JSON validates; a truncated object does not.
    #[test]
    fn test_simd_json_validation() {
        let valid_json = br#"{"test": "value", "number": 42}"#;
        let invalid_json = br#"{"test": "value", "number": 42"#;

        assert!(SimdJsonProcessor::validate_json(valid_json).is_ok());
        assert!(SimdJsonProcessor::validate_json(invalid_json).is_err());
    }

    /// The priority field is returned when present and `None` otherwise.
    #[test]
    fn test_priority_extraction() {
        let json_with_priority = br#"{"data": "test", "priority": 5}"#;
        let json_without_priority = br#"{"data": "test"}"#;

        let present = SimdJsonProcessor::extract_priority_field(json_with_priority).unwrap();
        assert_eq!(present, Some(5));

        let absent = SimdJsonProcessor::extract_priority_field(json_without_priority).unwrap();
        assert_eq!(absent, None);
    }

    /// A frame written to the stream buffer can be read back as JSON.
    #[test]
    fn test_simd_stream_buffer() {
        let mut buffer = SimdStreamBuffer::with_capacity(1024);
        let frame = make_frame(serde_json::json!({"buffer_test": true}), Priority::HIGH);

        let bytes_written = buffer.write_frame(&frame).unwrap();
        assert!(bytes_written > 0);

        let content = buffer.as_slice();
        assert!(!content.is_empty());

        let parsed: serde_json::Value = sonic_rs::from_slice(content).unwrap();
        assert_eq!(parsed["data"]["buffer_test"], true);
    }

    /// The processor produces JSON and SSE output and reports statistics.
    #[test]
    fn test_full_simd_processor() {
        let config = SimdConfig {
            batch_size: 50,
            initial_capacity: 2048,
            collect_stats: true,
        };
        let mut processor = SimdStreamProcessor::new(config);

        let frames = vec![make_frame(
            serde_json::json!({"processor": "test", "id": 1}),
            Priority::CRITICAL,
        )];

        let json_result = processor.process_to_json(&frames);
        assert!(json_result.is_ok());

        let sse_bytes = processor
            .process_to_sse(&frames)
            .expect("SSE batch should serialize");
        let sse_content = String::from_utf8(sse_bytes.to_vec()).unwrap();
        assert!(sse_content.starts_with("data: "));
        assert!(sse_content.ends_with("\n\n"));

        // `collect_stats` is enabled above, so a missing value is a failure
        // rather than a reason to skip the assertion.
        let stats = processor
            .stats()
            .expect("stats must be available when collect_stats is true");
        assert!(stats.frames_processed > 0);
    }
}