use crate::domain::{DomainError, DomainResult};
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Compression strategy selected for a JSON payload based on schema analysis.
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionStrategy {
    /// No compression; the payload is passed through unchanged.
    None,
    /// Repeated strings are replaced by `u16` indices into a dictionary.
    Dictionary { dictionary: HashMap<String, u16> },
    /// Numeric fields are encoded relative to per-field base values.
    Delta { base_values: HashMap<String, f64> },
    /// Run-length style encoding for repeated values.
    RunLength,
    /// Combines the string dictionary and numeric delta encodings.
    Hybrid {
        string_dict: HashMap<String, u16>,
        numeric_deltas: HashMap<String, f64>,
    },
}

/// Scans a JSON value and gathers statistics used to pick a `CompressionStrategy`.
#[derive(Debug, Clone)]
pub struct SchemaAnalyzer {
    /// Structural and value patterns found in arrays and strings.
    patterns: HashMap<String, PatternInfo>,
    /// Per-path statistics for numeric fields.
    numeric_fields: HashMap<String, NumericStats>,
    /// How often each string value appears across the document.
    string_repetitions: HashMap<String, u32>,
}

#[derive(Debug, Clone)]
struct PatternInfo {
    frequency: u32,
    total_size: usize,
    compression_potential: f32,
}

#[derive(Debug, Clone)]
struct NumericStats {
    values: Vec<f64>,
    delta_potential: f32,
    base_value: f64,
}

impl SchemaAnalyzer {
    pub fn new() -> Self {
        Self {
            patterns: HashMap::new(),
            numeric_fields: HashMap::new(),
            string_repetitions: HashMap::new(),
        }
    }

    /// Analyzes `data` and recommends a compression strategy for it.
    pub fn analyze(&mut self, data: &JsonValue) -> DomainResult<CompressionStrategy> {
        self.patterns.clear();
        self.numeric_fields.clear();
        self.string_repetitions.clear();

        self.analyze_recursive(data, "")?;

        self.determine_strategy()
    }

    fn analyze_recursive(&mut self, value: &JsonValue, path: &str) -> DomainResult<()> {
        match value {
            JsonValue::Object(obj) => {
                for (key, val) in obj {
                    let field_path = if path.is_empty() {
                        key.clone()
                    } else {
                        format!("{path}.{key}")
                    };
                    self.analyze_recursive(val, &field_path)?;
                }
            }
            JsonValue::Array(arr) => {
                if arr.len() > 1 {
                    self.analyze_array_patterns(arr, path)?;
                }
                for (idx, item) in arr.iter().enumerate() {
                    let item_path = format!("{path}[{idx}]");
                    self.analyze_recursive(item, &item_path)?;
                }
            }
            JsonValue::String(s) => {
                self.analyze_string_pattern(s, path);
            }
            JsonValue::Number(n) => {
                if let Some(f) = n.as_f64() {
                    self.analyze_numeric_pattern(f, path);
                }
            }
            _ => {}
        }
        Ok(())
    }

    fn analyze_array_patterns(&mut self, arr: &[JsonValue], path: &str) -> DomainResult<()> {
        // Detect arrays of objects that share the same field layout.
        if let Some(JsonValue::Object(first)) = arr.first() {
            let structure_key = format!("array_structure:{path}");
            let field_names: Vec<&str> = first.keys().map(|k| k.as_str()).collect();
            let pattern = field_names.join(",");

            let matching_count = arr.iter()
                .filter_map(|v| v.as_object())
                .filter(|obj| {
                    let obj_fields: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
                    obj_fields.join(",") == pattern
                })
                .count();

            if matching_count > 1 {
                let info = PatternInfo {
                    frequency: matching_count as u32,
                    total_size: pattern.len() * matching_count,
                    compression_potential: (matching_count as f32 - 1.0) / matching_count as f32,
                };
                self.patterns.insert(structure_key, info);
            }
        }

        // Count repeated scalar values inside the array.
        if arr.len() > 2 {
            let mut value_counts = HashMap::new();
            for value in arr {
                let key = match value {
                    JsonValue::String(s) => format!("string:{s}"),
                    JsonValue::Number(n) => format!("number:{n}"),
                    JsonValue::Bool(b) => format!("bool:{b}"),
                    _ => continue,
                };
                *value_counts.entry(key).or_insert(0) += 1;
            }

            for (value_key, count) in value_counts {
                if count > 1 {
                    let info = PatternInfo {
                        frequency: count,
                        total_size: value_key.len() * count as usize,
                        compression_potential: (count as f32 - 1.0) / count as f32,
                    };
                    self.patterns.insert(format!("array_value:{path}:{value_key}"), info);
                }
            }
        }

        Ok(())
    }

    fn analyze_string_pattern(&mut self, s: &str, _path: &str) {
        *self.string_repetitions.entry(s.to_string()).or_insert(0) += 1;

        if s.len() > 10 {
            // Track shared URL prefixes.
            if s.starts_with("http://") || s.starts_with("https://") {
                let prefix = if s.starts_with("https://") { "https://" } else { "http://" };
                self.patterns.entry(format!("url_prefix:{prefix}")).or_insert(PatternInfo {
                    frequency: 0,
                    total_size: 0,
                    compression_potential: 0.0,
                }).frequency += 1;
            }

            // Heuristic UUID detection: 36 characters containing exactly four dashes.
            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 {
                self.patterns.entry("uuid_pattern".to_string()).or_insert(PatternInfo {
                    frequency: 0,
                    total_size: 36,
                    compression_potential: 0.3,
                }).frequency += 1;
            }
        }
    }

    fn analyze_numeric_pattern(&mut self, value: f64, path: &str) {
        self.numeric_fields
            .entry(path.to_string())
            .or_insert_with(|| NumericStats {
                values: Vec::new(),
                delta_potential: 0.0,
                base_value: value,
            })
            .values
            .push(value);
    }

    fn determine_strategy(&mut self) -> DomainResult<CompressionStrategy> {
        let mut string_dict_score = 0.0;
        let mut delta_score = 0.0;

        // Score strings that repeat often enough to be worth a dictionary entry.
        let mut string_dict = HashMap::new();
        let mut dict_index = 0u16;

        for (string, count) in &self.string_repetitions {
            if *count > 1 && string.len() > 3 {
                string_dict_score += (*count as f32 - 1.0) * string.len() as f32;
                string_dict.insert(string.clone(), dict_index);
                dict_index += 1;
            }
        }

        // Score numeric fields whose sorted values have low delta variance.
        let mut numeric_deltas = HashMap::new();

        for (path, stats) in &mut self.numeric_fields {
            if stats.values.len() > 2 {
                stats.values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

                let deltas: Vec<f64> = stats.values.windows(2)
                    .map(|window| window[1] - window[0])
                    .collect();

                if !deltas.is_empty() {
                    let avg_delta = deltas.iter().sum::<f64>() / deltas.len() as f64;
                    let delta_variance = deltas.iter()
                        .map(|d| (d - avg_delta).powi(2))
                        .sum::<f64>() / deltas.len() as f64;

                    stats.delta_potential = 1.0 / (1.0 + delta_variance as f32);

                    if stats.delta_potential > 0.3 {
                        delta_score += stats.delta_potential * stats.values.len() as f32;
                        numeric_deltas.insert(path.clone(), stats.base_value);
                    }
                }
            }
        }

        match (string_dict_score > 50.0, delta_score > 30.0) {
            (true, true) => Ok(CompressionStrategy::Hybrid {
                string_dict,
                numeric_deltas,
            }),
            (true, false) => Ok(CompressionStrategy::Dictionary {
                dictionary: string_dict,
            }),
            (false, true) => Ok(CompressionStrategy::Delta {
                base_values: numeric_deltas,
            }),
            (false, false) => {
                let run_length_score = self.patterns.values()
                    .filter(|p| p.compression_potential > 0.4)
                    .map(|p| p.frequency as f32 * p.compression_potential)
                    .sum::<f32>();

                if run_length_score > 20.0 {
                    Ok(CompressionStrategy::RunLength)
                } else {
                    Ok(CompressionStrategy::None)
                }
            }
        }
    }
}

/// Applies a `CompressionStrategy` to JSON payloads.
#[derive(Debug, Clone)]
pub struct SchemaCompressor {
    strategy: CompressionStrategy,
    analyzer: SchemaAnalyzer,
}

impl SchemaCompressor {
    pub fn new() -> Self {
        Self {
            strategy: CompressionStrategy::None,
            analyzer: SchemaAnalyzer::new(),
        }
    }

    pub fn with_strategy(strategy: CompressionStrategy) -> Self {
        Self {
            strategy,
            analyzer: SchemaAnalyzer::new(),
        }
    }

    /// Runs the analyzer on `data` and adopts the recommended strategy.
    pub fn analyze_and_optimize(&mut self, data: &JsonValue) -> DomainResult<&CompressionStrategy> {
        self.strategy = self.analyzer.analyze(data)?;
        Ok(&self.strategy)
    }

    /// Compresses `data` according to the currently selected strategy.
    pub fn compress(&self, data: &JsonValue) -> DomainResult<CompressedData> {
        match &self.strategy {
            CompressionStrategy::None => Ok(CompressedData {
                strategy: self.strategy.clone(),
                compressed_size: serde_json::to_string(data)
                    .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
                    .len(),
                data: data.clone(),
                compression_metadata: HashMap::new(),
            }),

            CompressionStrategy::Dictionary { dictionary } => {
                self.compress_with_dictionary(data, dictionary)
            }

            CompressionStrategy::Delta { base_values } => {
                self.compress_with_delta(data, base_values)
            }

            CompressionStrategy::RunLength => {
                self.compress_with_run_length(data)
            }

            CompressionStrategy::Hybrid { string_dict, numeric_deltas } => {
                self.compress_hybrid(data, string_dict, numeric_deltas)
            }
        }
    }

    fn compress_with_dictionary(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<CompressedData> {
        // Record the index-to-string mapping in the metadata.
        let mut metadata = HashMap::new();

        for (string, index) in dictionary {
            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
        }

        let compressed = self.replace_strings_with_indices(data, dictionary)?;
        let compressed_size = serde_json::to_string(&compressed)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        Ok(CompressedData {
            strategy: self.strategy.clone(),
            compressed_size,
            data: compressed,
            compression_metadata: metadata,
        })
    }

    fn compress_with_delta(&self, data: &JsonValue, base_values: &HashMap<String, f64>) -> DomainResult<CompressedData> {
        let mut metadata = HashMap::new();

        for (path, base) in base_values {
            let base_number = serde_json::Number::from_f64(*base)
                .ok_or_else(|| DomainError::CompressionError(format!("non-finite base value for {path}")))?;
            metadata.insert(format!("base_{path}"), JsonValue::Number(base_number));
        }

        let compressed = self.apply_delta_compression(data, base_values)?;
        let compressed_size = serde_json::to_string(&compressed)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        Ok(CompressedData {
            strategy: self.strategy.clone(),
            compressed_size,
            data: compressed,
            compression_metadata: metadata,
        })
    }

    fn compress_with_run_length(&self, data: &JsonValue) -> DomainResult<CompressedData> {
        // Run-length encoding is currently a pass-through: the payload is returned
        // unchanged and only its serialized size is recorded.
        let compressed_size = serde_json::to_string(data)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        Ok(CompressedData {
            strategy: self.strategy.clone(),
            compressed_size,
            data: data.clone(),
            compression_metadata: HashMap::new(),
        })
    }

    fn compress_hybrid(&self, data: &JsonValue, string_dict: &HashMap<String, u16>, numeric_deltas: &HashMap<String, f64>) -> DomainResult<CompressedData> {
        let mut metadata = HashMap::new();

        for (string, index) in string_dict {
            metadata.insert(format!("dict_{index}"), JsonValue::String(string.clone()));
        }

        for (path, base) in numeric_deltas {
            let base_number = serde_json::Number::from_f64(*base)
                .ok_or_else(|| DomainError::CompressionError(format!("non-finite base value for {path}")))?;
            metadata.insert(format!("base_{path}"), JsonValue::Number(base_number));
        }

        // Apply the string dictionary first, then the (currently pass-through) delta step.
        let dict_compressed = self.replace_strings_with_indices(data, string_dict)?;
        let final_compressed = self.apply_delta_compression(&dict_compressed, numeric_deltas)?;

        let compressed_size = serde_json::to_string(&final_compressed)
            .map_err(|e| DomainError::CompressionError(format!("JSON serialization failed: {e}")))?
            .len();

        Ok(CompressedData {
            strategy: self.strategy.clone(),
            compressed_size,
            data: final_compressed,
            compression_metadata: metadata,
        })
    }

    #[allow(clippy::only_used_in_recursion)]
    fn replace_strings_with_indices(&self, data: &JsonValue, dictionary: &HashMap<String, u16>) -> DomainResult<JsonValue> {
        match data {
            JsonValue::Object(obj) => {
                let mut compressed_obj = serde_json::Map::new();
                for (key, value) in obj {
                    compressed_obj.insert(
                        key.clone(),
                        self.replace_strings_with_indices(value, dictionary)?,
                    );
                }
                Ok(JsonValue::Object(compressed_obj))
            }
            JsonValue::Array(arr) => {
                let compressed_arr: Result<Vec<_>, _> = arr.iter()
                    .map(|item| self.replace_strings_with_indices(item, dictionary))
                    .collect();
                Ok(JsonValue::Array(compressed_arr?))
            }
            JsonValue::String(s) => {
                if let Some(&index) = dictionary.get(s) {
                    Ok(JsonValue::Number(serde_json::Number::from(index)))
                } else {
                    Ok(data.clone())
                }
            }
            _ => Ok(data.clone()),
        }
    }

    fn apply_delta_compression(&self, data: &JsonValue, _base_values: &HashMap<String, f64>) -> DomainResult<JsonValue> {
        // Delta encoding is currently a pass-through; the value is returned unchanged.
        Ok(data.clone())
    }
}

/// The result of compressing a JSON payload, plus the metadata needed to describe it.
#[derive(Debug, Clone)]
pub struct CompressedData {
    pub strategy: CompressionStrategy,
    pub compressed_size: usize,
    pub data: JsonValue,
    pub compression_metadata: HashMap<String, JsonValue>,
}

impl CompressedData {
    /// Ratio of compressed size to `original_size` (1.0 means no reduction).
    pub fn compression_ratio(&self, original_size: usize) -> f32 {
        if original_size == 0 {
            return 1.0;
        }
        self.compressed_size as f32 / original_size as f32
    }

    /// Bytes saved relative to `original_size`; negative if the output grew.
    pub fn compression_savings(&self, original_size: usize) -> isize {
        original_size as isize - self.compressed_size as isize
    }
}

impl Default for SchemaAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for SchemaCompressor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_schema_analyzer_dictionary_potential() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "users": [
                {"name": "John Doe", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Jane Smith", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Bob Wilson", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Alice Brown", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Charlie Davis", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Diana Evans", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Frank Miller", "role": "admin", "status": "active", "department": "engineering"},
                {"name": "Grace Wilson", "role": "admin", "status": "active", "department": "engineering"}
            ]
        });

        let strategy = analyzer.analyze(&data).unwrap();

        match strategy {
            CompressionStrategy::Dictionary { .. } | CompressionStrategy::Hybrid { .. } => {}
            _ => panic!("Expected dictionary-based compression strategy"),
        }
    }

    #[test]
    fn test_schema_compressor_basic() {
        let compressor = SchemaCompressor::new();

        let data = json!({
            "message": "hello world",
            "count": 42
        });

        let original_size = serde_json::to_string(&data).unwrap().len();
        let compressed = compressor.compress(&data).unwrap();

        assert!(compressed.compressed_size > 0);
        assert!(compressed.compression_ratio(original_size) <= 1.0);
    }

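    // A small sketch of the pass-through case, assuming the freshly constructed
    // compressor keeps the default `CompressionStrategy::None`: the reported size
    // should equal the plain serialized size, giving a ratio of 1.0 and zero savings.
    #[test]
    fn test_none_strategy_preserves_size() {
        let compressor = SchemaCompressor::new();
        let data = json!({"id": 1, "name": "example"});

        let original_size = serde_json::to_string(&data).unwrap().len();
        let compressed = compressor.compress(&data).unwrap();

        assert_eq!(compressed.compressed_size, original_size);
        assert_eq!(compressed.compression_savings(original_size), 0);
        assert!((compressed.compression_ratio(original_size) - 1.0).abs() < f32::EPSILON);
    }
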
    #[test]
    fn test_dictionary_compression() {
        let mut dictionary = HashMap::new();
        dictionary.insert("active".to_string(), 0);
        dictionary.insert("admin".to_string(), 1);

        let compressor = SchemaCompressor::with_strategy(
            CompressionStrategy::Dictionary { dictionary }
        );

        let data = json!({
            "status": "active",
            "role": "admin",
            "description": "active admin user"
        });

        let result = compressor.compress(&data).unwrap();

        assert!(result.compression_metadata.contains_key("dict_0"));
        assert!(result.compression_metadata.contains_key("dict_1"));
    }

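    // Sketch of the hybrid path with a hand-built strategy: both the string
    // dictionary and the numeric base values should surface in the metadata under
    // the `dict_` and `base_` key prefixes, and dictionary strings should be
    // replaced by their indices in the output.
    #[test]
    fn test_hybrid_compression_metadata() {
        let mut string_dict = HashMap::new();
        string_dict.insert("active".to_string(), 0u16);

        let mut numeric_deltas = HashMap::new();
        numeric_deltas.insert("count".to_string(), 10.0);

        let compressor = SchemaCompressor::with_strategy(CompressionStrategy::Hybrid {
            string_dict,
            numeric_deltas,
        });

        let data = json!({"status": "active", "count": 12});
        let result = compressor.compress(&data).unwrap();

        assert!(result.compression_metadata.contains_key("dict_0"));
        assert!(result.compression_metadata.contains_key("base_count"));
        assert_eq!(result.data["status"], json!(0));
    }
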
    #[test]
    fn test_compression_strategy_selection() {
        let mut analyzer = SchemaAnalyzer::new();

        let simple_data = json!({
            "unique_field_1": "unique_value_1",
            "unique_field_2": "unique_value_2"
        });

        let strategy = analyzer.analyze(&simple_data).unwrap();
        assert_eq!(strategy, CompressionStrategy::None);
    }

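    // Sketch of the URL-prefix heuristic: strings longer than ten characters that
    // start with "https://" are expected to be counted under a shared
    // `url_prefix:https://` pattern key.
    #[test]
    fn test_url_prefix_pattern_detection() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "homepage": "https://example.com/home",
            "docs": "https://example.com/docs"
        });

        let _strategy = analyzer.analyze(&data).unwrap();

        let info = analyzer
            .patterns
            .get("url_prefix:https://")
            .expect("URL prefix pattern should be recorded");
        assert_eq!(info.frequency, 2);
    }
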
    #[test]
    fn test_numeric_delta_analysis() {
        let mut analyzer = SchemaAnalyzer::new();

        let data = json!({
            "measurements": [
                {"time": 100, "value": 10.0},
                {"time": 101, "value": 10.5},
                {"time": 102, "value": 11.0},
                {"time": 103, "value": 11.5}
            ]
        });

        let _strategy = analyzer.analyze(&data).unwrap();

        assert!(!analyzer.numeric_fields.is_empty());
    }
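
    // Sketch of an end-to-end size check with a hand-built dictionary: replacing a
    // repeated string with a small numeric index should shrink the serialized
    // output, so savings are positive and the ratio drops below 1.0.
    #[test]
    fn test_dictionary_compression_reduces_size() {
        let mut dictionary = HashMap::new();
        dictionary.insert("engineering".to_string(), 0u16);

        let compressor = SchemaCompressor::with_strategy(
            CompressionStrategy::Dictionary { dictionary }
        );

        let data = json!({
            "team_a": "engineering",
            "team_b": "engineering"
        });

        let original_size = serde_json::to_string(&data).unwrap().len();
        let result = compressor.compress(&data).unwrap();

        assert!(result.compression_savings(original_size) > 0);
        assert!(result.compression_ratio(original_size) < 1.0);
    }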
}