pjson_rs/stream/
priority.rs

1//! Priority-based JSON streaming implementation
2//!
3//! This module implements the core Priority JSON Streaming protocol with:
4//! - Skeleton-first approach
5//! - JSON Path based patching  
6//! - Priority-based field ordering
7//! - Incremental reconstruction
8
9use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
/// JSON Path for addressing specific nodes in the JSON structure.
///
/// A path is an ordered list of [`PathSegment`]s. Paths obtained from
/// [`JsonPath::root`] start with a single `Root` segment and gain one segment
/// per `append_key` / `append_index` call; [`JsonPath::from_segments`] stores
/// whatever sequence it is given without validation.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct JsonPath {
    /// Segments in root-to-leaf order.
    segments: Vec<PathSegment>,
}
19
/// One step of a [`JsonPath`].
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum PathSegment {
    /// The document root; contributes nothing to the JSON Pointer rendering.
    Root,
    /// A named member of an object.
    Key(String),
    /// A position inside an array.
    Index(usize),
    /// Rendered as `/*` by `to_json_pointer`. Nothing in this file constructs
    /// it. NOTE(review): presumably "match any child" — confirm with consumers.
    Wildcard,
}
27
/// Patch operation for updating JSON structure.
///
/// Pairs a target location with the change to make there and the priority
/// `PriorityStreamer::analyze` uses to order and batch patches into frames.
#[derive(Debug, Clone, serde::Serialize)]
pub struct JsonPatch {
    /// Location the operation targets.
    pub path: JsonPath,
    /// Change to perform at `path`.
    pub operation: PatchOperation,
    /// Streaming priority; higher priorities are emitted in earlier frames.
    pub priority: Priority,
}
35
/// The kind of change a [`JsonPatch`] carries.
///
/// Within this file only `Set` and `Append` are ever constructed (by
/// `PriorityStreamer::extract_patches`); how a receiver applies each variant
/// is not defined here.
#[derive(Debug, Clone, serde::Serialize)]
pub enum PatchOperation {
    /// Carries the full value of an object field.
    Set { value: JsonValue },
    /// Carries a run of array elements for the array at the patch path.
    Append { values: Vec<JsonValue> },
    /// Not produced in this file. NOTE(review): presumably overwrites an
    /// existing value — confirm how it differs from `Set` on the consumer side.
    Replace { value: JsonValue },
    /// Not produced in this file. NOTE(review): presumably deletes the target.
    Remove,
}
43
/// Streaming frame containing skeleton or patch data.
///
/// `PriorityStreamer::analyze` emits exactly one `Skeleton`, then zero or more
/// `Patch` frames, then one `Complete`.
#[derive(Debug, Clone, serde::Serialize)]
pub enum PriorityStreamFrame {
    /// Placeholder version of the document, sent first.
    Skeleton {
        /// The skeleton document itself.
        data: JsonValue,
        /// Priority of the frame (`analyze` always uses `Priority::CRITICAL`).
        priority: Priority,
        /// `analyze` always sets this to `false`.
        /// NOTE(review): presumably `true` would mean no patches follow — confirm.
        complete: bool,
    },
    /// A batch of patches that all share one priority.
    Patch {
        /// Patches in the batch, in the order they should be applied.
        patches: Vec<JsonPatch>,
        /// Priority shared by every patch in `patches`.
        priority: Priority,
    },
    /// Terminal frame marking the end of the stream.
    Complete {
        /// Optional checksum; `analyze` always emits `None`.
        checksum: Option<u64>,
    },
}
60
/// Priority-based JSON streamer.
///
/// Turns a `serde_json` value into a [`StreamingPlan`]: a skeleton frame
/// followed by priority-ordered patch frames (see `analyze`).
pub struct PriorityStreamer {
    /// Tuning knobs supplied at construction time.
    config: StreamerConfig,
}
65
/// Configuration for [`PriorityStreamer`].
#[derive(Debug, Clone)]
pub struct StreamerConfig {
    /// Not read anywhere in this file.
    /// NOTE(review): presumably toggles the key-name priority heuristics — confirm.
    pub detect_semantics: bool,
    /// Number of patches after which `analyze` closes a `Patch` frame, and the
    /// chunk length used when arrays of more than 10 elements are split into
    /// `Append` patches.
    pub max_patch_size: usize,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably the lowest priority worth streaming — confirm.
    pub priority_threshold: Priority,
}
72
73impl Default for StreamerConfig {
74    fn default() -> Self {
75        Self {
76            detect_semantics: true,
77            max_patch_size: 100,
78            priority_threshold: Priority::LOW,
79        }
80    }
81}
82
83impl PriorityStreamer {
84    /// Create new priority streamer
85    pub fn new() -> Self {
86        Self::with_config(StreamerConfig::default())
87    }
88
89    /// Create streamer with custom configuration
90    pub fn with_config(config: StreamerConfig) -> Self {
91        Self { config }
92    }
93
94    /// Analyze JSON and create streaming plan
95    pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
96        let mut plan = StreamingPlan::new();
97
98        // Generate skeleton
99        let skeleton = self.generate_skeleton(json);
100        plan.frames.push_back(PriorityStreamFrame::Skeleton {
101            data: skeleton,
102            priority: Priority::CRITICAL,
103            complete: false,
104        });
105
106        // Extract patches by priority
107        let mut patches = Vec::new();
108        self.extract_patches(json, &JsonPath::root(), &mut patches)?;
109
110        // Group patches by priority
111        patches.sort_by(|a, b| b.priority.cmp(&a.priority));
112
113        let mut current_priority = Priority::CRITICAL;
114        let mut current_batch = Vec::new();
115
116        for patch in patches {
117            if patch.priority != current_priority && !current_batch.is_empty() {
118                plan.frames.push_back(PriorityStreamFrame::Patch {
119                    patches: current_batch,
120                    priority: current_priority,
121                });
122                current_batch = Vec::new();
123            }
124            current_priority = patch.priority;
125            current_batch.push(patch);
126
127            if current_batch.len() >= self.config.max_patch_size {
128                plan.frames.push_back(PriorityStreamFrame::Patch {
129                    patches: current_batch,
130                    priority: current_priority,
131                });
132                current_batch = Vec::new();
133            }
134        }
135
136        // Add remaining patches
137        if !current_batch.is_empty() {
138            plan.frames.push_back(PriorityStreamFrame::Patch {
139                patches: current_batch,
140                priority: current_priority,
141            });
142        }
143
144        // Add completion frame
145        plan.frames
146            .push_back(PriorityStreamFrame::Complete { checksum: None });
147
148        Ok(plan)
149    }
150
151    /// Generate skeleton structure with null/empty values
152    fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
153        match json {
154            JsonValue::Object(map) => {
155                let mut skeleton = JsonMap::new();
156                for (key, value) in map {
157                    skeleton.insert(
158                        key.clone(),
159                        match value {
160                            JsonValue::Array(_) => JsonValue::Array(vec![]),
161                            JsonValue::Object(_) => self.generate_skeleton(value),
162                            JsonValue::String(_) => JsonValue::Null,
163                            JsonValue::Number(_) => JsonValue::Number(0.into()),
164                            JsonValue::Bool(_) => JsonValue::Bool(false),
165                            JsonValue::Null => JsonValue::Null,
166                        },
167                    );
168                }
169                JsonValue::Object(skeleton)
170            }
171            JsonValue::Array(_) => JsonValue::Array(vec![]),
172            _ => JsonValue::Null,
173        }
174    }
175
176    /// Extract patches from JSON structure
177    fn extract_patches(
178        &self,
179        json: &JsonValue,
180        current_path: &JsonPath,
181        patches: &mut Vec<JsonPatch>,
182    ) -> Result<()> {
183        match json {
184            JsonValue::Object(map) => {
185                for (key, value) in map {
186                    let field_path = current_path.append_key(key);
187                    let priority = self.calculate_field_priority(&field_path, key, value);
188
189                    // Create patch for this field
190                    patches.push(JsonPatch {
191                        path: field_path.clone(),
192                        operation: PatchOperation::Set {
193                            value: value.clone(),
194                        },
195                        priority,
196                    });
197
198                    // Recursively process nested structures
199                    self.extract_patches(value, &field_path, patches)?;
200                }
201            }
202            JsonValue::Array(arr) => {
203                // For arrays, create append operations in chunks
204                if arr.len() > 10 {
205                    // Chunk large arrays
206                    for chunk in arr.chunks(self.config.max_patch_size) {
207                        patches.push(JsonPatch {
208                            path: current_path.clone(),
209                            operation: PatchOperation::Append {
210                                values: chunk.to_vec(),
211                            },
212                            priority: self.calculate_array_priority(current_path, chunk),
213                        });
214                    }
215                } else if !arr.is_empty() {
216                    patches.push(JsonPatch {
217                        path: current_path.clone(),
218                        operation: PatchOperation::Append {
219                            values: arr.clone(),
220                        },
221                        priority: self.calculate_array_priority(current_path, arr),
222                    });
223                }
224            }
225            _ => {
226                // Primitive values handled by parent object/array
227            }
228        }
229
230        Ok(())
231    }
232
233    /// Calculate priority for a field based on path and content
234    fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
235        // Critical fields
236        if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
237            return Priority::CRITICAL;
238        }
239
240        // High priority fields
241        if matches!(key, "name" | "title" | "label" | "email" | "username") {
242            return Priority::HIGH;
243        }
244
245        // Low priority patterns
246        if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
247            return Priority::LOW;
248        }
249
250        if matches!(key, "reviews" | "comments" | "logs" | "history") {
251            return Priority::BACKGROUND;
252        }
253
254        // Content-based priority
255        match value {
256            JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
257            JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
258            JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
259            _ => Priority::MEDIUM,
260        }
261    }
262
263    /// Calculate priority for array elements
264    fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
265        // Large arrays get background priority
266        if elements.len() > 50 {
267            return Priority::BACKGROUND;
268        }
269
270        // Arrays in certain paths get different priorities
271        if let Some(last_key) = path.last_key() {
272            if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
273                return Priority::BACKGROUND;
274            }
275            if matches!(last_key.as_str(), "items" | "data" | "results") {
276                return Priority::MEDIUM;
277            }
278        }
279
280        Priority::MEDIUM
281    }
282}
283
/// Plan for streaming JSON with priority ordering.
///
/// A FIFO queue of frames: `PriorityStreamer::analyze` pushes them at the
/// back in send order and `next_frame` pops them from the front.
#[derive(Debug)]
pub struct StreamingPlan {
    /// Frames still to be sent, next one first.
    pub frames: VecDeque<PriorityStreamFrame>,
}
289
290impl Default for StreamingPlan {
291    fn default() -> Self {
292        Self::new()
293    }
294}
295
296impl StreamingPlan {
297    pub fn new() -> Self {
298        Self {
299            frames: VecDeque::new(),
300        }
301    }
302
303    /// Get next frame to send
304    pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
305        self.frames.pop_front()
306    }
307
308    /// Check if streaming is complete
309    pub fn is_complete(&self) -> bool {
310        self.frames.is_empty()
311    }
312
313    /// Get remaining frame count
314    pub fn remaining_frames(&self) -> usize {
315        self.frames.len()
316    }
317
318    /// Get iterator over frames
319    pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
320        self.frames.iter()
321    }
322}
323
324impl JsonPath {
325    /// Create root path
326    pub fn root() -> Self {
327        let segments = vec![PathSegment::Root];
328        Self { segments }
329    }
330
331    /// Append key segment
332    pub fn append_key(&self, key: &str) -> Self {
333        let mut segments = self.segments.clone();
334        segments.push(PathSegment::Key(key.to_string()));
335        Self { segments }
336    }
337
338    /// Append index segment
339    pub fn append_index(&self, index: usize) -> Self {
340        let mut segments = self.segments.clone();
341        segments.push(PathSegment::Index(index));
342        Self { segments }
343    }
344
345    /// Get the last key in the path
346    pub fn last_key(&self) -> Option<String> {
347        self.segments.iter().rev().find_map(|segment| {
348            if let PathSegment::Key(key) = segment {
349                Some(key.clone())
350            } else {
351                None
352            }
353        })
354    }
355
356    /// Get segments (read-only)
357    pub fn segments(&self) -> &[PathSegment] {
358        &self.segments
359    }
360
361    /// Get number of segments
362    pub fn len(&self) -> usize {
363        self.segments.len()
364    }
365
366    /// Check if path is empty
367    pub fn is_empty(&self) -> bool {
368        self.segments.is_empty()
369    }
370
371    /// Create JsonPath from segments (for testing)
372    pub fn from_segments(segments: Vec<PathSegment>) -> Self {
373        Self { segments }
374    }
375
376    /// Convert to JSON Pointer string format
377    pub fn to_json_pointer(&self) -> String {
378        let mut pointer = String::new();
379        for segment in &self.segments {
380            match segment {
381                PathSegment::Root => {}
382                PathSegment::Key(key) => {
383                    pointer.push('/');
384                    pointer.push_str(key);
385                }
386                PathSegment::Index(idx) => {
387                    pointer.push('/');
388                    pointer.push_str(&idx.to_string());
389                }
390                PathSegment::Wildcard => {
391                    pointer.push_str("/*");
392                }
393            }
394        }
395        if pointer.is_empty() {
396            "/".to_string()
397        } else {
398            pointer
399        }
400    }
401}
402
403impl Default for PriorityStreamer {
404    fn default() -> Self {
405        Self::new()
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The root renders as "/" and appended segments are slash-separated.
    #[test]
    fn test_json_path_creation() {
        let root = JsonPath::root();
        assert_eq!(root.to_json_pointer(), "/");

        let nested = root.append_key("users").append_index(0).append_key("name");
        assert_eq!(nested.to_json_pointer(), "/users/0/name");
    }

    /// Priorities are strictly ordered from CRITICAL down to BACKGROUND.
    #[test]
    fn test_priority_comparison() {
        let descending = vec![
            Priority::CRITICAL,
            Priority::HIGH,
            Priority::MEDIUM,
            Priority::LOW,
            Priority::BACKGROUND,
        ];

        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    /// Scalars are blanked out and arrays emptied, while keys are kept.
    #[test]
    fn test_skeleton_generation() {
        let input = json!({
            "name": "John",
            "age": 30,
            "active": true,
            "posts": ["post1", "post2"]
        });
        let expected = json!({
            "name": null,
            "age": 0,
            "active": false,
            "posts": []
        });

        let skeleton = PriorityStreamer::new().generate_skeleton(&input);

        assert_eq!(skeleton, expected);
    }

    /// Well-known keys map to their fixed priorities.
    #[test]
    fn test_field_priority_calculation() {
        let streamer = PriorityStreamer::new();
        let root = JsonPath::root();

        let cases = vec![
            ("id", json!(123), Priority::CRITICAL),
            ("name", json!("John"), Priority::HIGH),
            ("reviews", json!([]), Priority::BACKGROUND),
        ];

        for (key, value, expected) in cases.iter() {
            assert_eq!(
                streamer.calculate_field_priority(&root, key, value),
                *expected
            );
        }
    }

    /// Analysing a non-trivial document yields a plan with frames to send.
    #[test]
    fn test_streaming_plan_creation() {
        let document = json!({
            "id": 1,
            "name": "John",
            "bio": "Software developer",
            "reviews": ["Good", "Excellent"]
        });

        let plan = PriorityStreamer::new().analyze(&document).unwrap();

        assert!(plan.remaining_frames() > 0);
        assert!(!plan.is_complete());
    }
}
487}