1use anyhow::{anyhow, Result};
41use chrono::{DateTime, Utc};
42use serde::{Deserialize, Serialize};
43use std::collections::HashMap;
44use std::sync::Arc;
45use std::time::Instant;
46use tokio::sync::RwLock;
47use tracing::{debug, info};
48
49pub trait CustomSerializer: Send + Sync {
51 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>>;
53
54 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>>;
56
57 fn format_name(&self) -> &str;
59
60 fn format_version(&self) -> &str {
62 "1.0.0"
63 }
64
65 fn magic_bytes(&self) -> Option<&[u8]> {
67 None
68 }
69
70 fn supports_zero_copy(&self) -> bool {
72 false
73 }
74
75 fn validate_schema(&self, _schema: &[u8], _data: &[u8]) -> Result<bool> {
77 Ok(true)
78 }
79
80 fn stats(&self) -> SerializerStats {
82 SerializerStats::default()
83 }
84}
85
86#[derive(Debug, Clone, Default, Serialize, Deserialize)]
88pub struct SerializerStats {
89 pub bytes_serialized: u64,
91
92 pub bytes_deserialized: u64,
94
95 pub serialization_count: u64,
97
98 pub deserialization_count: u64,
100
101 pub avg_serialization_time_ms: f64,
103
104 pub avg_deserialization_time_ms: f64,
106
107 pub error_count: u64,
109}
110
111pub struct SerializerRegistry {
113 serializers: Arc<RwLock<HashMap<String, Box<dyn CustomSerializer>>>>,
114 benchmarks: Arc<RwLock<HashMap<String, SerializerBenchmark>>>,
115}
116
117impl SerializerRegistry {
118 pub fn new() -> Self {
120 Self {
122 serializers: Arc::new(RwLock::new(HashMap::new())),
123 benchmarks: Arc::new(RwLock::new(HashMap::new())),
124 }
125 }
126
127 pub async fn register(&self, name: &str, serializer: Box<dyn CustomSerializer>) -> Result<()> {
129 let mut serializers = self.serializers.write().await;
130
131 if serializers.contains_key(name) {
132 return Err(anyhow!("Serializer '{}' already registered", name));
133 }
134
135 serializers.insert(name.to_string(), serializer);
136 info!("Registered custom serializer: {}", name);
137 Ok(())
138 }
139
140 pub async fn unregister(&self, name: &str) -> Result<()> {
142 let mut serializers = self.serializers.write().await;
143
144 if serializers.remove(name).is_some() {
145 info!("Unregistered serializer: {}", name);
146 Ok(())
147 } else {
148 Err(anyhow!("Serializer '{}' not found", name))
149 }
150 }
151
152 pub async fn get(&self, name: &str) -> Result<String> {
154 let serializers = self.serializers.read().await;
155
156 if serializers.contains_key(name) {
157 Ok(name.to_string())
158 } else {
159 Err(anyhow!("Serializer '{}' not found", name))
160 }
161 }
162
163 pub async fn list(&self) -> Vec<String> {
165 let serializers = self.serializers.read().await;
166 serializers.keys().cloned().collect()
167 }
168
169 pub async fn serialize(&self, format: &str, data: &[u8]) -> Result<Vec<u8>> {
171 let serializers = self.serializers.read().await;
172
173 let serializer = serializers
174 .get(format)
175 .ok_or_else(|| anyhow!("Serializer '{}' not found", format))?;
176
177 let start = Instant::now();
178 let result = serializer.serialize(data)?;
179 let duration = start.elapsed();
180
181 drop(serializers);
183 self.update_benchmark(format, duration.as_secs_f64() * 1000.0, true)
184 .await;
185
186 Ok(result)
187 }
188
189 pub async fn deserialize(&self, format: &str, data: &[u8]) -> Result<Vec<u8>> {
191 let serializers = self.serializers.read().await;
192
193 let serializer = serializers
194 .get(format)
195 .ok_or_else(|| anyhow!("Serializer '{}' not found", format))?;
196
197 let start = Instant::now();
198 let result = serializer.deserialize(data)?;
199 let duration = start.elapsed();
200
201 drop(serializers);
203 self.update_benchmark(format, duration.as_secs_f64() * 1000.0, false)
204 .await;
205
206 Ok(result)
207 }
208
209 pub async fn detect_format(&self, data: &[u8]) -> Option<String> {
211 let serializers = self.serializers.read().await;
212
213 for (name, serializer) in serializers.iter() {
214 if let Some(magic) = serializer.magic_bytes() {
215 if data.len() >= magic.len() && &data[0..magic.len()] == magic {
216 return Some(name.clone());
217 }
218 }
219 }
220
221 None
222 }
223
224 pub async fn get_benchmark(&self, format: &str) -> Option<SerializerBenchmark> {
226 let benchmarks = self.benchmarks.read().await;
227 benchmarks.get(format).cloned()
228 }
229
230 pub async fn all_benchmarks(&self) -> HashMap<String, SerializerBenchmark> {
232 let benchmarks = self.benchmarks.read().await;
233 benchmarks.clone()
234 }
235
236 async fn update_benchmark(&self, format: &str, duration_ms: f64, is_serialization: bool) {
238 let mut benchmarks = self.benchmarks.write().await;
239
240 let benchmark = benchmarks
241 .entry(format.to_string())
242 .or_insert_with(SerializerBenchmark::default);
243
244 if is_serialization {
245 benchmark.serialization_times.push(duration_ms);
246 benchmark.serialization_count += 1;
247 } else {
248 benchmark.deserialization_times.push(duration_ms);
249 benchmark.deserialization_count += 1;
250 }
251 }
252}
253
254impl Default for SerializerRegistry {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260#[derive(Debug, Clone, Default, Serialize, Deserialize)]
262pub struct SerializerBenchmark {
263 pub serialization_count: u64,
265
266 pub deserialization_count: u64,
268
269 pub serialization_times: Vec<f64>,
271
272 pub deserialization_times: Vec<f64>,
274
275 pub last_updated: Option<DateTime<Utc>>,
277}
278
279impl SerializerBenchmark {
280 pub fn avg_serialization_time(&self) -> f64 {
282 if self.serialization_times.is_empty() {
283 0.0
284 } else {
285 self.serialization_times.iter().sum::<f64>() / self.serialization_times.len() as f64
286 }
287 }
288
289 pub fn avg_deserialization_time(&self) -> f64 {
291 if self.deserialization_times.is_empty() {
292 0.0
293 } else {
294 self.deserialization_times.iter().sum::<f64>() / self.deserialization_times.len() as f64
295 }
296 }
297
298 pub fn p95_serialization_time(&self) -> f64 {
300 self.percentile(&self.serialization_times, 0.95)
301 }
302
303 pub fn p95_deserialization_time(&self) -> f64 {
305 self.percentile(&self.deserialization_times, 0.95)
306 }
307
308 fn percentile(&self, times: &[f64], p: f64) -> f64 {
310 if times.is_empty() {
311 return 0.0;
312 }
313
314 let mut sorted = times.to_vec();
315 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
316
317 let index = ((sorted.len() as f64 - 1.0) * p) as usize;
318 sorted[index]
319 }
320}
321
322pub struct BsonSerializer;
324
325impl CustomSerializer for BsonSerializer {
326 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>> {
327 let mut result = Vec::new();
329 result.extend_from_slice(b"BSON");
330 result.extend_from_slice(data);
331 Ok(result)
332 }
333
334 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>> {
335 if data.len() < 4 {
336 return Err(anyhow!("Invalid BSON data"));
337 }
338 Ok(data[4..].to_vec())
339 }
340
341 fn format_name(&self) -> &str {
342 "bson"
343 }
344
345 fn magic_bytes(&self) -> Option<&[u8]> {
346 Some(b"BSON")
347 }
348}
349
350pub struct ThriftSerializer;
352
353impl CustomSerializer for ThriftSerializer {
354 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>> {
355 let mut result = Vec::new();
357 result.extend_from_slice(b"THFT");
358 result.extend_from_slice(data);
359 Ok(result)
360 }
361
362 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>> {
363 if data.len() < 4 {
364 return Err(anyhow!("Invalid Thrift data"));
365 }
366 Ok(data[4..].to_vec())
367 }
368
369 fn format_name(&self) -> &str {
370 "thrift"
371 }
372
373 fn magic_bytes(&self) -> Option<&[u8]> {
374 Some(b"THFT")
375 }
376}
377
378pub struct FlexBuffersSerializer;
380
381impl CustomSerializer for FlexBuffersSerializer {
382 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>> {
383 let mut result = Vec::new();
385 result.extend_from_slice(b"FLEX");
386 result.extend_from_slice(data);
387 Ok(result)
388 }
389
390 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>> {
391 if data.len() < 4 {
392 return Err(anyhow!("Invalid FlexBuffers data"));
393 }
394 Ok(data[4..].to_vec())
395 }
396
397 fn format_name(&self) -> &str {
398 "flexbuffers"
399 }
400
401 fn magic_bytes(&self) -> Option<&[u8]> {
402 Some(b"FLEX")
403 }
404
405 fn supports_zero_copy(&self) -> bool {
406 true
407 }
408}
409
410pub struct RonSerializer;
412
413impl CustomSerializer for RonSerializer {
414 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>> {
415 let mut result = Vec::new();
417 result.extend_from_slice(b"RON\0");
418 result.extend_from_slice(data);
419 Ok(result)
420 }
421
422 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>> {
423 if data.len() < 4 {
424 return Err(anyhow!("Invalid RON data"));
425 }
426 Ok(data[4..].to_vec())
427 }
428
429 fn format_name(&self) -> &str {
430 "ron"
431 }
432
433 fn magic_bytes(&self) -> Option<&[u8]> {
434 Some(b"RON\0")
435 }
436}
437
438pub struct IonSerializer;
440
441impl CustomSerializer for IonSerializer {
442 fn serialize(&self, data: &[u8]) -> Result<Vec<u8>> {
443 let mut result = Vec::new();
445 result.extend_from_slice(b"ION\x01");
446 result.extend_from_slice(data);
447 Ok(result)
448 }
449
450 fn deserialize(&self, data: &[u8]) -> Result<Vec<u8>> {
451 if data.len() < 4 {
452 return Err(anyhow!("Invalid Ion data"));
453 }
454 Ok(data[4..].to_vec())
455 }
456
457 fn format_name(&self) -> &str {
458 "ion"
459 }
460
461 fn magic_bytes(&self) -> Option<&[u8]> {
462 Some(b"ION\x01")
463 }
464}
465
466pub struct SerializerBenchmarkSuite {
468 registry: Arc<SerializerRegistry>,
469 test_data: Vec<Vec<u8>>,
470}
471
472impl SerializerBenchmarkSuite {
473 pub fn new(registry: Arc<SerializerRegistry>) -> Self {
475 Self {
476 registry,
477 test_data: Self::generate_test_data(),
478 }
479 }
480
481 fn generate_test_data() -> Vec<Vec<u8>> {
483 use scirs2_core::random::{rng, RngExt};
484
485 let mut rand_gen = rng();
486 let sizes = [100, 1024, 10_240, 102_400]; sizes
489 .iter()
490 .map(|&size| (0..size).map(|_| rand_gen.random_range(0..=255)).collect())
491 .collect()
492 }
493
494 pub async fn benchmark(&self, format: &str, iterations: usize) -> Result<BenchmarkResults> {
496 let mut results = BenchmarkResults {
497 format: format.to_string(),
498 iterations,
499 serialization_times: Vec::new(),
500 deserialization_times: Vec::new(),
501 sizes: Vec::new(),
502 };
503
504 for test_data in &self.test_data {
505 let mut ser_times = Vec::new();
506 let mut deser_times = Vec::new();
507
508 for _ in 0..iterations {
509 let start = Instant::now();
511 let serialized = self.registry.serialize(format, test_data).await?;
512 ser_times.push(start.elapsed().as_secs_f64() * 1000.0);
513
514 let start = Instant::now();
516 self.registry.deserialize(format, &serialized).await?;
517 deser_times.push(start.elapsed().as_secs_f64() * 1000.0);
518 }
519
520 let avg_ser = ser_times.iter().sum::<f64>() / ser_times.len() as f64;
521 let avg_deser = deser_times.iter().sum::<f64>() / deser_times.len() as f64;
522
523 results.serialization_times.push(avg_ser);
524 results.deserialization_times.push(avg_deser);
525 results.sizes.push(test_data.len());
526 }
527
528 debug!("Benchmark completed for {}: {:?}", format, results);
529 Ok(results)
530 }
531
532 pub async fn compare(
534 &self,
535 formats: &[String],
536 iterations: usize,
537 ) -> Result<Vec<BenchmarkResults>> {
538 let mut all_results = Vec::new();
539
540 for format in formats {
541 let results = self.benchmark(format, iterations).await?;
542 all_results.push(results);
543 }
544
545 Ok(all_results)
546 }
547}
548
549#[derive(Debug, Clone, Serialize, Deserialize)]
551pub struct BenchmarkResults {
552 pub format: String,
554
555 pub iterations: usize,
557
558 pub serialization_times: Vec<f64>,
560
561 pub deserialization_times: Vec<f64>,
563
564 pub sizes: Vec<usize>,
566}
567
568impl BenchmarkResults {
569 pub fn avg_serialization_time(&self) -> f64 {
571 if self.serialization_times.is_empty() {
572 0.0
573 } else {
574 self.serialization_times.iter().sum::<f64>() / self.serialization_times.len() as f64
575 }
576 }
577
578 pub fn avg_deserialization_time(&self) -> f64 {
580 if self.deserialization_times.is_empty() {
581 0.0
582 } else {
583 self.deserialization_times.iter().sum::<f64>() / self.deserialization_times.len() as f64
584 }
585 }
586}
587
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a fresh instance of the built-in serializer used under `name` in these tests.
    fn serializer_for(name: &str) -> Box<dyn CustomSerializer> {
        match name {
            "bson" => Box::new(BsonSerializer),
            "thrift" => Box::new(ThriftSerializer),
            "flexbuffers" => Box::new(FlexBuffersSerializer),
            "ron" => Box::new(RonSerializer),
            "ion" => Box::new(IonSerializer),
            other => unreachable!("unknown test serializer: {}", other),
        }
    }

    /// Builds a registry with each named serializer registered under its own name.
    async fn registry_with(names: &[&str]) -> SerializerRegistry {
        let registry = SerializerRegistry::new();
        for &name in names {
            registry.register(name, serializer_for(name)).await.unwrap();
        }
        registry
    }

    /// True if `name` appears in the registry's listing.
    async fn is_listed(registry: &SerializerRegistry, name: &str) -> bool {
        registry.list().await.iter().any(|listed| listed == name)
    }

    #[tokio::test]
    async fn test_register_custom_serializer() {
        let registry = registry_with(&["bson"]).await;

        assert!(is_listed(&registry, "bson").await);
    }

    #[tokio::test]
    async fn test_serialize_deserialize() {
        let registry = registry_with(&["bson"]).await;
        let data = b"test data";

        let encoded = registry.serialize("bson", data).await.unwrap();
        let decoded = registry.deserialize("bson", &encoded).await.unwrap();

        assert_eq!(decoded, data);
    }

    #[tokio::test]
    async fn test_format_detection() {
        let registry = registry_with(&["bson", "thrift"]).await;

        let detected = registry.detect_format(b"BSONtest data").await;

        assert_eq!(detected, Some("bson".to_string()));
    }

    #[tokio::test]
    async fn test_benchmark() {
        let registry = Arc::new(registry_with(&["bson"]).await);
        let suite = SerializerBenchmarkSuite::new(Arc::clone(&registry));

        let results = suite.benchmark("bson", 10).await.unwrap();

        assert_eq!(results.format, "bson");
        assert_eq!(results.iterations, 10);
        assert!(!results.serialization_times.is_empty());
    }

    #[tokio::test]
    async fn test_multiple_formats() {
        let registry = registry_with(&["bson", "thrift", "flexbuffers", "ron", "ion"]).await;

        assert_eq!(registry.list().await.len(), 5);
    }

    #[tokio::test]
    async fn test_unregister() {
        let registry = registry_with(&["bson"]).await;
        assert!(is_listed(&registry, "bson").await);

        registry.unregister("bson").await.unwrap();

        assert!(!is_listed(&registry, "bson").await);
    }
}