Skip to main content

pjson_rs/stream/
priority.rs

1//! Priority-based JSON streaming implementation
2//!
3//! This module implements the core Priority JSON Streaming protocol with:
4//! - Skeleton-first approach
5//! - JSON Path based patching
6//! - Priority-based field ordering
7//! - Incremental reconstruction
8
9use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
14/// Custom serde for Priority in stream module
15mod serde_priority {
16    use crate::domain::value_objects::Priority;
17    use serde::{Serialize, Serializer};
18
19    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20    where
21        S: Serializer,
22    {
23        priority.value().serialize(serializer)
24    }
25}
26
/// JSON Path for addressing specific nodes in the JSON structure.
///
/// Stored as an ordered list of [`PathSegment`]s from the outermost node inwards.
/// Paths created with [`JsonPath::root`] start with [`PathSegment::Root`], and the
/// `append_*` builders add segments at the end.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct JsonPath {
    /// Segments from the root down to the addressed node.
    segments: Vec<PathSegment>,
}
32
/// Single segment of a [`JsonPath`].
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum PathSegment {
    /// Root marker (`$`). Contributes nothing to the JSON Pointer rendering.
    Root,
    /// Object property key.
    Key(String),
    /// Array index.
    Index(usize),
    /// Wildcard matching any segment.
    ///
    /// NOTE(review): this module only renders it (as `/*` in `to_json_pointer`); no
    /// matching logic lives here — confirm where wildcard resolution happens.
    Wildcard,
}
45
/// Patch operation for updating JSON structure.
///
/// A single addressed change: apply `operation` at `path`. Patches are grouped into
/// [`PriorityStreamFrame::Patch`] frames by `priority`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct JsonPatch {
    /// Path within the target JSON document.
    pub path: JsonPath,
    /// Operation to apply at `path`.
    pub operation: PatchOperation,
    /// Priority assigned to this patch (serialized as `Priority::value()`).
    #[serde(with = "serde_priority")]
    pub priority: Priority,
}
57
/// Operation a [`JsonPatch`] performs at its target path.
///
/// `PriorityStreamer::analyze` in this module only produces `Set` and `Append`.
#[derive(Debug, Clone, serde::Serialize)]
pub enum PatchOperation {
    /// Replace the value at the path with `value`.
    Set {
        /// New value to set at the path.
        value: JsonValue,
    },
    /// Append values to the array at the path.
    Append {
        /// Values appended to the target array.
        values: Vec<JsonValue>,
    },
    /// Replace the value at the path with `value` (semantically distinct from `Set`).
    ///
    /// NOTE(review): the distinction from `Set` is not defined in this module — confirm
    /// against the consumer that applies patches.
    Replace {
        /// Replacement value.
        value: JsonValue,
    },
    /// Remove the value at the path.
    Remove,
}
79
/// Streaming frame containing skeleton or patch data.
///
/// `PriorityStreamer::analyze` produces one `Skeleton` frame, then zero or more `Patch`
/// frames in descending priority order, then a single `Complete` frame.
#[derive(Debug, Clone, serde::Serialize)]
pub enum PriorityStreamFrame {
    /// Initial skeleton frame with placeholder values.
    Skeleton {
        /// Skeleton JSON value. It keeps the document's object structure but replaces the
        /// leaves with placeholders: `null` for strings and nulls, `0` for numbers, `false`
        /// for booleans, and `[]` for arrays. A non-container root becomes `null`.
        data: JsonValue,
        /// Priority of the skeleton frame.
        #[serde(with = "serde_priority")]
        priority: Priority,
        /// Whether the skeleton is final or further skeletons may follow.
        ///
        /// NOTE(review): `PriorityStreamer::analyze` always sets this to `false`.
        complete: bool,
    },
    /// Batch of patches sharing the same priority.
    Patch {
        /// Patches in this batch.
        patches: Vec<JsonPatch>,
        /// Priority shared by all patches in the batch.
        #[serde(with = "serde_priority")]
        priority: Priority,
    },
    /// Terminal frame indicating the stream is complete.
    Complete {
        /// Optional checksum of the reconstructed payload.
        ///
        /// `PriorityStreamer::analyze` currently always emits `None`.
        checksum: Option<u64>,
    },
}
107
/// Priority-based JSON streamer.
///
/// Turns a JSON document into a [`StreamingPlan`]: a skeleton frame followed by
/// priority-ordered patch frames. All tuning lives in the [`StreamerConfig`].
pub struct PriorityStreamer {
    /// Settings used during analysis.
    config: StreamerConfig,
}
112
/// Configuration for [`PriorityStreamer`].
#[derive(Debug, Clone)]
pub struct StreamerConfig {
    /// Enable name-based heuristics that infer priorities from common field names
    /// (for example `id`, `name`, `reviews`).
    pub detect_semantics: bool,
    /// Maximum number of patches per [`PriorityStreamFrame::Patch`] batch.
    ///
    /// Also used as the chunk size when a large array is split into `Append` patches.
    pub max_patch_size: usize,
    /// Patches with priority below this threshold are dropped.
    ///
    /// NOTE(review): nothing in this module reads this field, so `PriorityStreamer::analyze`
    /// currently emits every patch regardless of priority. Confirm whether filtering is
    /// meant to happen here or in a downstream consumer.
    pub priority_threshold: Priority,
}
123
124impl Default for StreamerConfig {
125    fn default() -> Self {
126        Self {
127            detect_semantics: true,
128            max_patch_size: 100,
129            priority_threshold: Priority::LOW,
130        }
131    }
132}
133
134impl PriorityStreamer {
135    /// Create new priority streamer
136    pub fn new() -> Self {
137        Self::with_config(StreamerConfig::default())
138    }
139
140    /// Create streamer with custom configuration
141    pub fn with_config(config: StreamerConfig) -> Self {
142        Self { config }
143    }
144
145    /// Analyze JSON and create streaming plan
146    pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
147        let mut plan = StreamingPlan::new();
148
149        // Generate skeleton
150        let skeleton = self.generate_skeleton(json);
151        plan.frames.push_back(PriorityStreamFrame::Skeleton {
152            data: skeleton,
153            priority: Priority::CRITICAL,
154            complete: false,
155        });
156
157        // Extract patches by priority
158        let mut patches = Vec::new();
159        self.extract_patches(json, &JsonPath::root(), &mut patches)?;
160
161        // Group patches by priority
162        patches.sort_by_key(|patch| std::cmp::Reverse(patch.priority));
163
164        let mut current_priority = Priority::CRITICAL;
165        let mut current_batch = Vec::new();
166
167        for patch in patches {
168            if patch.priority != current_priority && !current_batch.is_empty() {
169                plan.frames.push_back(PriorityStreamFrame::Patch {
170                    patches: current_batch,
171                    priority: current_priority,
172                });
173                current_batch = Vec::new();
174            }
175            current_priority = patch.priority;
176            current_batch.push(patch);
177
178            if current_batch.len() >= self.config.max_patch_size {
179                plan.frames.push_back(PriorityStreamFrame::Patch {
180                    patches: current_batch,
181                    priority: current_priority,
182                });
183                current_batch = Vec::new();
184            }
185        }
186
187        // Add remaining patches
188        if !current_batch.is_empty() {
189            plan.frames.push_back(PriorityStreamFrame::Patch {
190                patches: current_batch,
191                priority: current_priority,
192            });
193        }
194
195        // Add completion frame
196        plan.frames
197            .push_back(PriorityStreamFrame::Complete { checksum: None });
198
199        Ok(plan)
200    }
201
202    /// Generate skeleton structure with null/empty values
203    fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
204        match json {
205            JsonValue::Object(map) => {
206                let mut skeleton = JsonMap::new();
207                for (key, value) in map {
208                    skeleton.insert(
209                        key.clone(),
210                        match value {
211                            JsonValue::Array(_) => JsonValue::Array(vec![]),
212                            JsonValue::Object(_) => self.generate_skeleton(value),
213                            JsonValue::String(_) => JsonValue::Null,
214                            JsonValue::Number(_) => JsonValue::Number(0.into()),
215                            JsonValue::Bool(_) => JsonValue::Bool(false),
216                            JsonValue::Null => JsonValue::Null,
217                        },
218                    );
219                }
220                JsonValue::Object(skeleton)
221            }
222            JsonValue::Array(_) => JsonValue::Array(vec![]),
223            _ => JsonValue::Null,
224        }
225    }
226
227    /// Extract patches from JSON structure
228    fn extract_patches(
229        &self,
230        json: &JsonValue,
231        current_path: &JsonPath,
232        patches: &mut Vec<JsonPatch>,
233    ) -> Result<()> {
234        match json {
235            JsonValue::Object(map) => {
236                for (key, value) in map {
237                    let field_path = current_path.append_key(key);
238                    let priority = self.calculate_field_priority(&field_path, key, value);
239
240                    // Create patch for this field
241                    patches.push(JsonPatch {
242                        path: field_path.clone(),
243                        operation: PatchOperation::Set {
244                            value: value.clone(),
245                        },
246                        priority,
247                    });
248
249                    // Recursively process nested structures
250                    self.extract_patches(value, &field_path, patches)?;
251                }
252            }
253            JsonValue::Array(arr) => {
254                // For arrays, create append operations in chunks
255                if arr.len() > 10 {
256                    // Chunk large arrays
257                    for chunk in arr.chunks(self.config.max_patch_size) {
258                        patches.push(JsonPatch {
259                            path: current_path.clone(),
260                            operation: PatchOperation::Append {
261                                values: chunk.to_vec(),
262                            },
263                            priority: self.calculate_array_priority(current_path, chunk),
264                        });
265                    }
266                } else if !arr.is_empty() {
267                    patches.push(JsonPatch {
268                        path: current_path.clone(),
269                        operation: PatchOperation::Append {
270                            values: arr.clone(),
271                        },
272                        priority: self.calculate_array_priority(current_path, arr),
273                    });
274                }
275            }
276            _ => {
277                // Primitive values handled by parent object/array
278            }
279        }
280
281        Ok(())
282    }
283
284    /// Calculate priority for a field based on path and content
285    fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
286        // Critical fields
287        if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
288            return Priority::CRITICAL;
289        }
290
291        // High priority fields
292        if matches!(key, "name" | "title" | "label" | "email" | "username") {
293            return Priority::HIGH;
294        }
295
296        // Low priority patterns
297        if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
298            return Priority::LOW;
299        }
300
301        if matches!(key, "reviews" | "comments" | "logs" | "history") {
302            return Priority::BACKGROUND;
303        }
304
305        // Content-based priority
306        match value {
307            JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
308            JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
309            JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
310            _ => Priority::MEDIUM,
311        }
312    }
313
314    /// Calculate priority for array elements
315    fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
316        // Large arrays get background priority
317        if elements.len() > 50 {
318            return Priority::BACKGROUND;
319        }
320
321        // Arrays in certain paths get different priorities
322        if let Some(last_key) = path.last_key() {
323            if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
324                return Priority::BACKGROUND;
325            }
326            if matches!(last_key.as_str(), "items" | "data" | "results") {
327                return Priority::MEDIUM;
328            }
329        }
330
331        Priority::MEDIUM
332    }
333}
334
/// Plan for streaming JSON with priority ordering.
///
/// A FIFO queue of frames. Consumers drain it with `next_frame` until `is_complete`.
#[derive(Debug)]
pub struct StreamingPlan {
    /// Ordered queue of frames produced by analysis; the front is sent first.
    pub frames: VecDeque<PriorityStreamFrame>,
}
341
342impl Default for StreamingPlan {
343    fn default() -> Self {
344        Self::new()
345    }
346}
347
348impl StreamingPlan {
349    /// Create an empty plan with no frames.
350    pub fn new() -> Self {
351        Self {
352            frames: VecDeque::new(),
353        }
354    }
355
356    /// Get next frame to send
357    pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
358        self.frames.pop_front()
359    }
360
361    /// Check if streaming is complete
362    pub fn is_complete(&self) -> bool {
363        self.frames.is_empty()
364    }
365
366    /// Get remaining frame count
367    pub fn remaining_frames(&self) -> usize {
368        self.frames.len()
369    }
370
371    /// Get iterator over frames
372    pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
373        self.frames.iter()
374    }
375}
376
377impl JsonPath {
378    /// Create root path
379    pub fn root() -> Self {
380        let segments = vec![PathSegment::Root];
381        Self { segments }
382    }
383
384    /// Append key segment
385    pub fn append_key(&self, key: &str) -> Self {
386        let mut segments = self.segments.clone();
387        segments.push(PathSegment::Key(key.to_string()));
388        Self { segments }
389    }
390
391    /// Append index segment
392    pub fn append_index(&self, index: usize) -> Self {
393        let mut segments = self.segments.clone();
394        segments.push(PathSegment::Index(index));
395        Self { segments }
396    }
397
398    /// Get the last key in the path
399    pub fn last_key(&self) -> Option<String> {
400        self.segments.iter().rev().find_map(|segment| {
401            if let PathSegment::Key(key) = segment {
402                Some(key.clone())
403            } else {
404                None
405            }
406        })
407    }
408
409    /// Get segments (read-only)
410    pub fn segments(&self) -> &[PathSegment] {
411        &self.segments
412    }
413
414    /// Get number of segments
415    pub fn len(&self) -> usize {
416        self.segments.len()
417    }
418
419    /// Check if path is empty
420    pub fn is_empty(&self) -> bool {
421        self.segments.is_empty()
422    }
423
424    /// Create JsonPath from segments (for testing)
425    pub fn from_segments(segments: Vec<PathSegment>) -> Self {
426        Self { segments }
427    }
428
429    /// Convert to JSON Pointer string format
430    pub fn to_json_pointer(&self) -> String {
431        let mut pointer = String::new();
432        for segment in &self.segments {
433            match segment {
434                PathSegment::Root => {}
435                PathSegment::Key(key) => {
436                    pointer.push('/');
437                    pointer.push_str(key);
438                }
439                PathSegment::Index(idx) => {
440                    pointer.push('/');
441                    pointer.push_str(&idx.to_string());
442                }
443                PathSegment::Wildcard => {
444                    pointer.push_str("/*");
445                }
446            }
447        }
448        if pointer.is_empty() {
449            "/".to_string()
450        } else {
451            pointer
452        }
453    }
454}
455
456impl Default for PriorityStreamer {
457    fn default() -> Self {
458        Self::new()
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_json_path_creation() {
        let root = JsonPath::root();
        assert_eq!(root.to_json_pointer(), "/");

        let nested = root.append_key("users").append_index(0).append_key("name");
        assert_eq!(nested.to_json_pointer(), "/users/0/name");
    }

    #[test]
    fn test_priority_comparison() {
        // Listed from most to least urgent; each must strictly outrank the next.
        let descending = [
            Priority::CRITICAL,
            Priority::HIGH,
            Priority::MEDIUM,
            Priority::LOW,
            Priority::BACKGROUND,
        ];
        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    #[test]
    fn test_skeleton_generation() {
        let input = json!({
            "name": "John",
            "age": 30,
            "active": true,
            "posts": ["post1", "post2"]
        });
        let expected = json!({
            "name": null,
            "age": 0,
            "active": false,
            "posts": []
        });

        assert_eq!(PriorityStreamer::new().generate_skeleton(&input), expected);
    }

    #[test]
    fn test_field_priority_calculation() {
        let streamer = PriorityStreamer::new();
        let root = JsonPath::root();
        let cases = [
            ("id", json!(123), Priority::CRITICAL),
            ("name", json!("John"), Priority::HIGH),
            ("reviews", json!([]), Priority::BACKGROUND),
        ];

        for (key, value, expected) in cases {
            assert_eq!(streamer.calculate_field_priority(&root, key, &value), expected);
        }
    }

    #[test]
    fn test_streaming_plan_creation() {
        let input = json!({
            "id": 1,
            "name": "John",
            "bio": "Software developer",
            "reviews": ["Good", "Excellent"]
        });

        let plan = PriorityStreamer::new()
            .analyze(&input)
            .expect("analysis of a plain object should succeed");
        assert!(plan.remaining_frames() > 0);
        assert!(!plan.is_complete());
    }
}