Skip to main content

pjson_rs/stream/
priority.rs

1//! Priority-based JSON streaming implementation
2//!
3//! This module implements the core Priority JSON Streaming protocol with:
4//! - Skeleton-first approach
5//! - JSON Path based patching
6//! - Priority-based field ordering
7//! - Incremental reconstruction
8
9use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
14/// Custom serde for Priority in stream module
15mod serde_priority {
16    use crate::domain::value_objects::Priority;
17    use serde::{Serialize, Serializer};
18
19    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20    where
21        S: Serializer,
22    {
23        priority.value().serialize(serializer)
24    }
25}
26
27/// JSON Path for addressing specific nodes in the JSON structure
28#[derive(Debug, Clone, PartialEq, serde::Serialize)]
29pub struct JsonPath {
30    segments: Vec<PathSegment>,
31}
32
33#[derive(Debug, Clone, PartialEq, serde::Serialize)]
34pub enum PathSegment {
35    Root,
36    Key(String),
37    Index(usize),
38    Wildcard,
39}
40
41/// Patch operation for updating JSON structure
42#[derive(Debug, Clone, serde::Serialize)]
43pub struct JsonPatch {
44    pub path: JsonPath,
45    pub operation: PatchOperation,
46    #[serde(with = "serde_priority")]
47    pub priority: Priority,
48}
49
50#[derive(Debug, Clone, serde::Serialize)]
51pub enum PatchOperation {
52    Set { value: JsonValue },
53    Append { values: Vec<JsonValue> },
54    Replace { value: JsonValue },
55    Remove,
56}
57
58/// Streaming frame containing skeleton or patch data
59#[derive(Debug, Clone, serde::Serialize)]
60pub enum PriorityStreamFrame {
61    Skeleton {
62        data: JsonValue,
63        #[serde(with = "serde_priority")]
64        priority: Priority,
65        complete: bool,
66    },
67    Patch {
68        patches: Vec<JsonPatch>,
69        #[serde(with = "serde_priority")]
70        priority: Priority,
71    },
72    Complete {
73        checksum: Option<u64>,
74    },
75}
76
77/// Priority-based JSON streamer
78pub struct PriorityStreamer {
79    config: StreamerConfig,
80}
81
82#[derive(Debug, Clone)]
83pub struct StreamerConfig {
84    pub detect_semantics: bool,
85    pub max_patch_size: usize,
86    pub priority_threshold: Priority,
87}
88
89impl Default for StreamerConfig {
90    fn default() -> Self {
91        Self {
92            detect_semantics: true,
93            max_patch_size: 100,
94            priority_threshold: Priority::LOW,
95        }
96    }
97}
98
99impl PriorityStreamer {
100    /// Create new priority streamer
101    pub fn new() -> Self {
102        Self::with_config(StreamerConfig::default())
103    }
104
105    /// Create streamer with custom configuration
106    pub fn with_config(config: StreamerConfig) -> Self {
107        Self { config }
108    }
109
110    /// Analyze JSON and create streaming plan
111    pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
112        let mut plan = StreamingPlan::new();
113
114        // Generate skeleton
115        let skeleton = self.generate_skeleton(json);
116        plan.frames.push_back(PriorityStreamFrame::Skeleton {
117            data: skeleton,
118            priority: Priority::CRITICAL,
119            complete: false,
120        });
121
122        // Extract patches by priority
123        let mut patches = Vec::new();
124        self.extract_patches(json, &JsonPath::root(), &mut patches)?;
125
126        // Group patches by priority
127        patches.sort_by_key(|patch| std::cmp::Reverse(patch.priority));
128
129        let mut current_priority = Priority::CRITICAL;
130        let mut current_batch = Vec::new();
131
132        for patch in patches {
133            if patch.priority != current_priority && !current_batch.is_empty() {
134                plan.frames.push_back(PriorityStreamFrame::Patch {
135                    patches: current_batch,
136                    priority: current_priority,
137                });
138                current_batch = Vec::new();
139            }
140            current_priority = patch.priority;
141            current_batch.push(patch);
142
143            if current_batch.len() >= self.config.max_patch_size {
144                plan.frames.push_back(PriorityStreamFrame::Patch {
145                    patches: current_batch,
146                    priority: current_priority,
147                });
148                current_batch = Vec::new();
149            }
150        }
151
152        // Add remaining patches
153        if !current_batch.is_empty() {
154            plan.frames.push_back(PriorityStreamFrame::Patch {
155                patches: current_batch,
156                priority: current_priority,
157            });
158        }
159
160        // Add completion frame
161        plan.frames
162            .push_back(PriorityStreamFrame::Complete { checksum: None });
163
164        Ok(plan)
165    }
166
167    /// Generate skeleton structure with null/empty values
168    fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
169        match json {
170            JsonValue::Object(map) => {
171                let mut skeleton = JsonMap::new();
172                for (key, value) in map {
173                    skeleton.insert(
174                        key.clone(),
175                        match value {
176                            JsonValue::Array(_) => JsonValue::Array(vec![]),
177                            JsonValue::Object(_) => self.generate_skeleton(value),
178                            JsonValue::String(_) => JsonValue::Null,
179                            JsonValue::Number(_) => JsonValue::Number(0.into()),
180                            JsonValue::Bool(_) => JsonValue::Bool(false),
181                            JsonValue::Null => JsonValue::Null,
182                        },
183                    );
184                }
185                JsonValue::Object(skeleton)
186            }
187            JsonValue::Array(_) => JsonValue::Array(vec![]),
188            _ => JsonValue::Null,
189        }
190    }
191
192    /// Extract patches from JSON structure
193    fn extract_patches(
194        &self,
195        json: &JsonValue,
196        current_path: &JsonPath,
197        patches: &mut Vec<JsonPatch>,
198    ) -> Result<()> {
199        match json {
200            JsonValue::Object(map) => {
201                for (key, value) in map {
202                    let field_path = current_path.append_key(key);
203                    let priority = self.calculate_field_priority(&field_path, key, value);
204
205                    // Create patch for this field
206                    patches.push(JsonPatch {
207                        path: field_path.clone(),
208                        operation: PatchOperation::Set {
209                            value: value.clone(),
210                        },
211                        priority,
212                    });
213
214                    // Recursively process nested structures
215                    self.extract_patches(value, &field_path, patches)?;
216                }
217            }
218            JsonValue::Array(arr) => {
219                // For arrays, create append operations in chunks
220                if arr.len() > 10 {
221                    // Chunk large arrays
222                    for chunk in arr.chunks(self.config.max_patch_size) {
223                        patches.push(JsonPatch {
224                            path: current_path.clone(),
225                            operation: PatchOperation::Append {
226                                values: chunk.to_vec(),
227                            },
228                            priority: self.calculate_array_priority(current_path, chunk),
229                        });
230                    }
231                } else if !arr.is_empty() {
232                    patches.push(JsonPatch {
233                        path: current_path.clone(),
234                        operation: PatchOperation::Append {
235                            values: arr.clone(),
236                        },
237                        priority: self.calculate_array_priority(current_path, arr),
238                    });
239                }
240            }
241            _ => {
242                // Primitive values handled by parent object/array
243            }
244        }
245
246        Ok(())
247    }
248
249    /// Calculate priority for a field based on path and content
250    fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
251        // Critical fields
252        if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
253            return Priority::CRITICAL;
254        }
255
256        // High priority fields
257        if matches!(key, "name" | "title" | "label" | "email" | "username") {
258            return Priority::HIGH;
259        }
260
261        // Low priority patterns
262        if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
263            return Priority::LOW;
264        }
265
266        if matches!(key, "reviews" | "comments" | "logs" | "history") {
267            return Priority::BACKGROUND;
268        }
269
270        // Content-based priority
271        match value {
272            JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
273            JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
274            JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
275            _ => Priority::MEDIUM,
276        }
277    }
278
279    /// Calculate priority for array elements
280    fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
281        // Large arrays get background priority
282        if elements.len() > 50 {
283            return Priority::BACKGROUND;
284        }
285
286        // Arrays in certain paths get different priorities
287        if let Some(last_key) = path.last_key() {
288            if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
289                return Priority::BACKGROUND;
290            }
291            if matches!(last_key.as_str(), "items" | "data" | "results") {
292                return Priority::MEDIUM;
293            }
294        }
295
296        Priority::MEDIUM
297    }
298}
299
300/// Plan for streaming JSON with priority ordering
301#[derive(Debug)]
302pub struct StreamingPlan {
303    pub frames: VecDeque<PriorityStreamFrame>,
304}
305
306impl Default for StreamingPlan {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312impl StreamingPlan {
313    pub fn new() -> Self {
314        Self {
315            frames: VecDeque::new(),
316        }
317    }
318
319    /// Get next frame to send
320    pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
321        self.frames.pop_front()
322    }
323
324    /// Check if streaming is complete
325    pub fn is_complete(&self) -> bool {
326        self.frames.is_empty()
327    }
328
329    /// Get remaining frame count
330    pub fn remaining_frames(&self) -> usize {
331        self.frames.len()
332    }
333
334    /// Get iterator over frames
335    pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
336        self.frames.iter()
337    }
338}
339
340impl JsonPath {
341    /// Create root path
342    pub fn root() -> Self {
343        let segments = vec![PathSegment::Root];
344        Self { segments }
345    }
346
347    /// Append key segment
348    pub fn append_key(&self, key: &str) -> Self {
349        let mut segments = self.segments.clone();
350        segments.push(PathSegment::Key(key.to_string()));
351        Self { segments }
352    }
353
354    /// Append index segment
355    pub fn append_index(&self, index: usize) -> Self {
356        let mut segments = self.segments.clone();
357        segments.push(PathSegment::Index(index));
358        Self { segments }
359    }
360
361    /// Get the last key in the path
362    pub fn last_key(&self) -> Option<String> {
363        self.segments.iter().rev().find_map(|segment| {
364            if let PathSegment::Key(key) = segment {
365                Some(key.clone())
366            } else {
367                None
368            }
369        })
370    }
371
372    /// Get segments (read-only)
373    pub fn segments(&self) -> &[PathSegment] {
374        &self.segments
375    }
376
377    /// Get number of segments
378    pub fn len(&self) -> usize {
379        self.segments.len()
380    }
381
382    /// Check if path is empty
383    pub fn is_empty(&self) -> bool {
384        self.segments.is_empty()
385    }
386
387    /// Create JsonPath from segments (for testing)
388    pub fn from_segments(segments: Vec<PathSegment>) -> Self {
389        Self { segments }
390    }
391
392    /// Convert to JSON Pointer string format
393    pub fn to_json_pointer(&self) -> String {
394        let mut pointer = String::new();
395        for segment in &self.segments {
396            match segment {
397                PathSegment::Root => {}
398                PathSegment::Key(key) => {
399                    pointer.push('/');
400                    pointer.push_str(key);
401                }
402                PathSegment::Index(idx) => {
403                    pointer.push('/');
404                    pointer.push_str(&idx.to_string());
405                }
406                PathSegment::Wildcard => {
407                    pointer.push_str("/*");
408                }
409            }
410        }
411        if pointer.is_empty() {
412            "/".to_string()
413        } else {
414            pointer
415        }
416    }
417}
418
419impl Default for PriorityStreamer {
420    fn default() -> Self {
421        Self::new()
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The root renders as "/", and appended segments render in order.
    #[test]
    fn test_json_path_creation() {
        let root = JsonPath::root();
        assert_eq!(root.to_json_pointer(), "/");

        let nested = root.append_key("users").append_index(0).append_key("name");
        assert_eq!(nested.to_json_pointer(), "/users/0/name");
    }

    /// Each priority level compares greater than the next lower one.
    #[test]
    fn test_priority_comparison() {
        let descending = [
            Priority::CRITICAL,
            Priority::HIGH,
            Priority::MEDIUM,
            Priority::LOW,
            Priority::BACKGROUND,
        ];

        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    /// Skeleton keeps keys and swaps values for type-specific placeholders.
    #[test]
    fn test_skeleton_generation() {
        let input = json!({
            "name": "John",
            "age": 30,
            "active": true,
            "posts": ["post1", "post2"]
        });
        let expected = json!({
            "name": null,
            "age": 0,
            "active": false,
            "posts": []
        });

        let skeleton = PriorityStreamer::new().generate_skeleton(&input);

        assert_eq!(skeleton, expected);
    }

    /// Well-known keys map to their fixed priorities.
    #[test]
    fn test_field_priority_calculation() {
        let streamer = PriorityStreamer::new();
        let path = JsonPath::root();

        let cases = [
            ("id", json!(123), Priority::CRITICAL),
            ("name", json!("John"), Priority::HIGH),
            ("reviews", json!([]), Priority::BACKGROUND),
        ];

        for (key, value, expected) in cases.iter() {
            assert_eq!(
                streamer.calculate_field_priority(&path, key, value),
                *expected
            );
        }
    }

    /// Analyzing a small document yields a non-empty plan.
    #[test]
    fn test_streaming_plan_creation() {
        let document = json!({
            "id": 1,
            "name": "John",
            "bio": "Software developer",
            "reviews": ["Good", "Excellent"]
        });

        let plan = PriorityStreamer::new().analyze(&document).unwrap();

        assert!(!plan.is_complete());
        assert!(plan.remaining_frames() > 0);
    }
}