pjson_rs/stream/
reconstruction.rs

1//! Client-side reconstruction engine for applying streaming patches
2//!
3//! This module provides functionality to reconstruct complete JSON from
4//! skeleton + patch stream frames, enabling progressive data loading.
5
6use crate::Result;
7use crate::stream::priority::{
8    JsonPatch, JsonPath, PatchOperation, PathSegment, PriorityStreamFrame,
9};
10use serde_json::Value as JsonValue;
11use std::collections::VecDeque;
12
13/// Client-side JSON reconstruction engine
14///
15/// Applies streaming patches to progressively build complete JSON structure
16pub struct JsonReconstructor {
17    /// Current state of JSON being reconstructed
18    current_state: JsonValue,
19    /// Queue of received frames waiting to be processed
20    frame_queue: VecDeque<PriorityStreamFrame>,
21    /// Whether reconstruction is complete
22    is_complete: bool,
23    /// Statistics about reconstruction process
24    stats: ReconstructionStats,
25}
26
27/// Statistics about the reconstruction process
28#[derive(Debug, Clone, Default)]
29pub struct ReconstructionStats {
30    /// Total frames processed
31    pub frames_processed: u32,
32    /// Total patches applied
33    pub patches_applied: u32,
34    /// Number of skeleton frames received
35    pub skeleton_frames: u32,
36    /// Number of patch frames received
37    pub patch_frames: u32,
38    /// Time when reconstruction started
39    pub start_time: Option<std::time::Instant>,
40    /// Time when reconstruction completed
41    pub end_time: Option<std::time::Instant>,
42}
43
44impl JsonReconstructor {
45    /// Create new JSON reconstructor
46    pub fn new() -> Self {
47        Self {
48            current_state: JsonValue::Null,
49            frame_queue: VecDeque::new(),
50            is_complete: false,
51            stats: ReconstructionStats::default(),
52        }
53    }
54
55    /// Add a frame to the reconstruction queue
56    pub fn add_frame(&mut self, frame: PriorityStreamFrame) {
57        if self.stats.start_time.is_none() {
58            self.stats.start_time = Some(std::time::Instant::now());
59        }
60
61        self.frame_queue.push_back(frame);
62    }
63
64    /// Process next frame in the queue
65    pub fn process_next_frame(&mut self) -> Result<ProcessResult> {
66        if let Some(frame) = self.frame_queue.pop_front() {
67            self.process_frame(frame)
68        } else {
69            Ok(ProcessResult::NoFrames)
70        }
71    }
72
73    /// Process all queued frames
74    pub fn process_all_frames(&mut self) -> Result<Vec<ProcessResult>> {
75        let mut results = Vec::new();
76
77        while let Some(frame) = self.frame_queue.pop_front() {
78            results.push(self.process_frame(frame)?);
79        }
80
81        Ok(results)
82    }
83
84    /// Process a single frame
85    fn process_frame(&mut self, frame: PriorityStreamFrame) -> Result<ProcessResult> {
86        self.stats.frames_processed += 1;
87
88        match frame {
89            PriorityStreamFrame::Skeleton {
90                data,
91                priority: _,
92                complete: _,
93            } => {
94                self.stats.skeleton_frames += 1;
95                self.current_state = data;
96                Ok(ProcessResult::SkeletonApplied)
97            }
98
99            PriorityStreamFrame::Patch { patches, priority } => {
100                self.stats.patch_frames += 1;
101                let mut applied_paths = Vec::new();
102
103                for patch in patches {
104                    let path = patch.path.clone();
105                    self.apply_patch(patch)?;
106                    applied_paths.push(path);
107                    self.stats.patches_applied += 1;
108                }
109
110                Ok(ProcessResult::PatchesApplied {
111                    count: applied_paths.len(),
112                    priority,
113                    paths: applied_paths,
114                })
115            }
116
117            PriorityStreamFrame::Complete { checksum: _ } => {
118                self.is_complete = true;
119                self.stats.end_time = Some(std::time::Instant::now());
120                Ok(ProcessResult::ReconstructionComplete)
121            }
122        }
123    }
124
125    /// Apply a single patch to the current JSON state
126    fn apply_patch(&mut self, patch: JsonPatch) -> Result<()> {
127        match patch.operation {
128            PatchOperation::Set { value } => {
129                self.set_at_path(&patch.path, value)?;
130            }
131            PatchOperation::Append { values } => {
132                self.append_at_path(&patch.path, values)?;
133            }
134            PatchOperation::Replace { value } => {
135                self.set_at_path(&patch.path, value)?;
136            }
137            PatchOperation::Remove => {
138                self.remove_at_path(&patch.path)?;
139            }
140        }
141
142        Ok(())
143    }
144
145    /// Set value at specified JSON path
146    fn set_at_path(&mut self, path: &JsonPath, value: JsonValue) -> Result<()> {
147        let segments = path.segments();
148        if segments.is_empty() {
149            return Err(crate::Error::Other("Empty path".to_string()));
150        }
151
152        // Handle root replacement
153        if segments.len() == 1 {
154            self.current_state = value;
155            return Ok(());
156        }
157
158        // Navigate to parent and set the target field
159        let target_key = path
160            .last_key()
161            .ok_or_else(|| crate::Error::Other("Invalid path for set operation".to_string()))?;
162
163        let parent = self.get_parent_mut(path)?;
164
165        match parent {
166            JsonValue::Object(map) => {
167                map.insert(target_key, value);
168            }
169            JsonValue::Array(arr) => {
170                if let Ok(index) = target_key.parse::<usize>() {
171                    if index < arr.len() {
172                        arr[index] = value;
173                    } else {
174                        return Err(crate::Error::Other("Array index out of bounds".to_string()));
175                    }
176                } else {
177                    return Err(crate::Error::Other("Invalid array index".to_string()));
178                }
179            }
180            _ => {
181                return Err(crate::Error::Other(
182                    "Cannot set field on non-object/array".to_string(),
183                ));
184            }
185        }
186
187        Ok(())
188    }
189
190    /// Append values to array at specified path
191    fn append_at_path(&mut self, path: &JsonPath, values: Vec<JsonValue>) -> Result<()> {
192        let target = self.get_mut_at_path(path)?;
193
194        match target {
195            JsonValue::Array(arr) => {
196                arr.extend(values);
197            }
198            _ => {
199                return Err(crate::Error::Other(
200                    "Cannot append to non-array".to_string(),
201                ));
202            }
203        }
204
205        Ok(())
206    }
207
208    /// Remove value at specified path
209    fn remove_at_path(&mut self, path: &JsonPath) -> Result<()> {
210        let target_key = path
211            .last_key()
212            .ok_or_else(|| crate::Error::Other("Invalid path for remove operation".to_string()))?;
213
214        let parent = self.get_parent_mut(path)?;
215
216        match parent {
217            JsonValue::Object(map) => {
218                map.remove(&target_key);
219            }
220            JsonValue::Array(arr) => {
221                if let Ok(index) = target_key.parse::<usize>()
222                    && index < arr.len()
223                {
224                    arr.remove(index);
225                }
226            }
227            _ => {
228                return Err(crate::Error::Other(
229                    "Cannot remove from non-object/array".to_string(),
230                ));
231            }
232        }
233
234        Ok(())
235    }
236
237    /// Get mutable reference to parent of target path
238    fn get_parent_mut(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
239        if path.len() < 2 {
240            return Ok(&mut self.current_state);
241        }
242
243        let segments = path.segments();
244        let parent_segments = &segments[..segments.len() - 1];
245        let mut current = &mut self.current_state;
246
247        for segment in parent_segments.iter().skip(1) {
248            // Skip root
249            match segment {
250                PathSegment::Key(key) => {
251                    if let JsonValue::Object(map) = current {
252                        current = map
253                            .get_mut(key)
254                            .ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
255                    } else {
256                        return Err(crate::Error::Other("Expected object".to_string()));
257                    }
258                }
259                PathSegment::Index(idx) => {
260                    if let JsonValue::Array(arr) = current {
261                        current = arr.get_mut(*idx).ok_or_else(|| {
262                            crate::Error::Other(format!("Index out of bounds: {idx}"))
263                        })?;
264                    } else {
265                        return Err(crate::Error::Other("Expected array".to_string()));
266                    }
267                }
268                _ => {
269                    return Err(crate::Error::Other("Unsupported path segment".to_string()));
270                }
271            }
272        }
273
274        Ok(current)
275    }
276
277    /// Get mutable reference at specified path
278    fn get_mut_at_path(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
279        let mut current = &mut self.current_state;
280
281        for segment in path.segments().iter().skip(1) {
282            // Skip root
283            match segment {
284                PathSegment::Key(key) => {
285                    if let JsonValue::Object(map) = current {
286                        current = map
287                            .get_mut(key)
288                            .ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
289                    } else {
290                        return Err(crate::Error::Other("Expected object".to_string()));
291                    }
292                }
293                PathSegment::Index(idx) => {
294                    if let JsonValue::Array(arr) = current {
295                        current = arr.get_mut(*idx).ok_or_else(|| {
296                            crate::Error::Other(format!("Index out of bounds: {idx}"))
297                        })?;
298                    } else {
299                        return Err(crate::Error::Other("Expected array".to_string()));
300                    }
301                }
302                _ => {
303                    return Err(crate::Error::Other("Unsupported path segment".to_string()));
304                }
305            }
306        }
307
308        Ok(current)
309    }
310
311    /// Get current JSON state (read-only)
312    pub fn current_state(&self) -> &JsonValue {
313        &self.current_state
314    }
315
316    /// Check if reconstruction is complete
317    pub fn is_complete(&self) -> bool {
318        self.is_complete
319    }
320
321    /// Get reconstruction statistics
322    pub fn stats(&self) -> &ReconstructionStats {
323        &self.stats
324    }
325
326    /// Reset reconstructor to initial state
327    pub fn reset(&mut self) {
328        self.current_state = JsonValue::Null;
329        self.frame_queue.clear();
330        self.is_complete = false;
331        self.stats = ReconstructionStats::default();
332    }
333
334    /// Get reconstruction progress (0.0 to 1.0)
335    pub fn progress(&self) -> f32 {
336        if self.is_complete {
337            1.0
338        } else if self.stats.frames_processed == 0 {
339            0.0
340        } else {
341            // Rough estimate based on frames processed vs expected
342            (self.stats.frames_processed as f32
343                / (self.stats.frames_processed + self.frame_queue.len() as u32) as f32)
344                .min(0.95) // Never show 100% until complete
345        }
346    }
347
348    /// Get reconstruction duration if available
349    pub fn duration(&self) -> Option<std::time::Duration> {
350        if let (Some(start), Some(end)) = (self.stats.start_time, self.stats.end_time) {
351            Some(end.duration_since(start))
352        } else {
353            None
354        }
355    }
356}
357
358/// Result of processing a frame
359#[derive(Debug, Clone)]
360pub enum ProcessResult {
361    /// No frames in queue
362    NoFrames,
363    /// Skeleton frame was applied
364    SkeletonApplied,
365    /// Patch frames were applied
366    PatchesApplied {
367        /// Number of patches applied
368        count: usize,
369        /// Priority of the patches
370        priority: crate::stream::Priority,
371        /// Paths that were modified
372        paths: Vec<JsonPath>,
373    },
374    /// Reconstruction completed
375    ReconstructionComplete,
376}
377
378impl Default for JsonReconstructor {
379    fn default() -> Self {
380        Self::new()
381    }
382}
383
384impl ReconstructionStats {
385    /// Get processing rate in frames per second
386    pub fn frames_per_second(&self) -> Option<f32> {
387        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
388            let duration = end.duration_since(start);
389            if duration.as_secs_f32() > 0.0 {
390                Some(self.frames_processed as f32 / duration.as_secs_f32())
391            } else {
392                None
393            }
394        } else {
395            None
396        }
397    }
398
399    /// Get patches per second rate
400    pub fn patches_per_second(&self) -> Option<f32> {
401        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
402            let duration = end.duration_since(start);
403            if duration.as_secs_f32() > 0.0 {
404                Some(self.patches_applied as f32 / duration.as_secs_f32())
405            } else {
406                None
407            }
408        } else {
409            None
410        }
411    }
412}
413
414#[cfg(test)]
415mod tests {
416    use super::*;
417    use crate::stream::Priority;
418    use serde_json::json;
419
420    #[test]
421    fn test_reconstructor_creation() {
422        let reconstructor = JsonReconstructor::new();
423        assert!(!reconstructor.is_complete());
424        assert_eq!(reconstructor.stats().frames_processed, 0);
425        assert_eq!(reconstructor.current_state(), &JsonValue::Null);
426    }
427
428    #[test]
429    fn test_skeleton_application() {
430        let mut reconstructor = JsonReconstructor::new();
431
432        let skeleton = json!({
433            "name": null,
434            "age": 0,
435            "active": false
436        });
437
438        let frame = PriorityStreamFrame::Skeleton {
439            data: skeleton.clone(),
440            priority: Priority::CRITICAL,
441            complete: false,
442        };
443
444        reconstructor.add_frame(frame);
445        let result = reconstructor.process_next_frame().unwrap();
446
447        assert!(matches!(result, ProcessResult::SkeletonApplied));
448        assert_eq!(reconstructor.current_state(), &skeleton);
449        assert_eq!(reconstructor.stats().skeleton_frames, 1);
450    }
451
452    #[test]
453    fn test_patch_application() {
454        let mut reconstructor = JsonReconstructor::new();
455
456        // Start with skeleton
457        let skeleton = json!({
458            "name": null,
459            "age": 0
460        });
461
462        reconstructor.current_state = skeleton;
463
464        // Create patch to set name
465        let path = JsonPath::from_segments(vec![
466            PathSegment::Root,
467            PathSegment::Key("name".to_string()),
468        ]);
469
470        let patch = JsonPatch {
471            path,
472            operation: PatchOperation::Set {
473                value: json!("John Doe"),
474            },
475            priority: Priority::HIGH,
476        };
477
478        let frame = PriorityStreamFrame::Patch {
479            patches: vec![patch],
480            priority: Priority::HIGH,
481        };
482
483        reconstructor.add_frame(frame);
484        let result = reconstructor.process_next_frame().unwrap();
485
486        if let ProcessResult::PatchesApplied { count, .. } = result {
487            assert_eq!(count, 1);
488        } else {
489            panic!("Expected PatchesApplied result");
490        }
491
492        let expected = json!({
493            "name": "John Doe",
494            "age": 0
495        });
496
497        assert_eq!(reconstructor.current_state(), &expected);
498        assert_eq!(reconstructor.stats().patches_applied, 1);
499    }
500
501    #[test]
502    fn test_array_append() {
503        let mut reconstructor = JsonReconstructor::new();
504
505        // Start with skeleton having empty array
506        let skeleton = json!({
507            "items": []
508        });
509
510        reconstructor.current_state = skeleton;
511
512        // Create patch to append items
513        let path = JsonPath::from_segments(vec![
514            PathSegment::Root,
515            PathSegment::Key("items".to_string()),
516        ]);
517
518        let patch = JsonPatch {
519            path,
520            operation: PatchOperation::Append {
521                values: vec![json!("item1"), json!("item2")],
522            },
523            priority: Priority::MEDIUM,
524        };
525
526        let frame = PriorityStreamFrame::Patch {
527            patches: vec![patch],
528            priority: Priority::MEDIUM,
529        };
530
531        reconstructor.add_frame(frame);
532        reconstructor.process_next_frame().unwrap();
533
534        let expected = json!({
535            "items": ["item1", "item2"]
536        });
537
538        assert_eq!(reconstructor.current_state(), &expected);
539    }
540
541    #[test]
542    fn test_completion_tracking() {
543        let mut reconstructor = JsonReconstructor::new();
544
545        let complete_frame = PriorityStreamFrame::Complete { checksum: None };
546        reconstructor.add_frame(complete_frame);
547
548        let result = reconstructor.process_next_frame().unwrap();
549        assert!(matches!(result, ProcessResult::ReconstructionComplete));
550        assert!(reconstructor.is_complete());
551        assert!(reconstructor.stats().end_time.is_some());
552    }
553
554    #[test]
555    fn test_progress_calculation() {
556        let mut reconstructor = JsonReconstructor::new();
557
558        // Initially no progress
559        assert_eq!(reconstructor.progress(), 0.0);
560
561        // Add some frames but don't process
562        let skeleton = json!({"test": null});
563        reconstructor.add_frame(PriorityStreamFrame::Skeleton {
564            data: skeleton,
565            priority: Priority::CRITICAL,
566            complete: false,
567        });
568        reconstructor.add_frame(PriorityStreamFrame::Complete { checksum: None });
569
570        // Process one frame
571        reconstructor.process_next_frame().unwrap();
572        let progress = reconstructor.progress();
573        assert!(
574            progress > 0.0 && progress < 1.0,
575            "Progress was: {}",
576            progress
577        );
578
579        // Process remaining frame (Complete)
580        reconstructor.process_next_frame().unwrap();
581        assert_eq!(reconstructor.progress(), 1.0);
582    }
583
584    #[test]
585    fn test_reset_functionality() {
586        let mut reconstructor = JsonReconstructor::new();
587
588        // Setup some state
589        reconstructor.current_state = json!({"test": true});
590        reconstructor.is_complete = true;
591        reconstructor.stats.frames_processed = 5;
592
593        // Reset
594        reconstructor.reset();
595
596        assert_eq!(reconstructor.current_state(), &JsonValue::Null);
597        assert!(!reconstructor.is_complete());
598        assert_eq!(reconstructor.stats().frames_processed, 0);
599    }
600}