1use crate::Result;
7use crate::stream::priority::{
8 JsonPatch, JsonPath, PatchOperation, PathSegment, PriorityStreamFrame,
9};
10use serde_json::Value as JsonValue;
11use std::collections::VecDeque;
12
13pub struct JsonReconstructor {
17 current_state: JsonValue,
19 frame_queue: VecDeque<PriorityStreamFrame>,
21 is_complete: bool,
23 stats: ReconstructionStats,
25}
26
27#[derive(Debug, Clone, Default)]
29pub struct ReconstructionStats {
30 pub frames_processed: u32,
32 pub patches_applied: u32,
34 pub skeleton_frames: u32,
36 pub patch_frames: u32,
38 pub start_time: Option<std::time::Instant>,
40 pub end_time: Option<std::time::Instant>,
42}
43
44impl JsonReconstructor {
45 pub fn new() -> Self {
47 Self {
48 current_state: JsonValue::Null,
49 frame_queue: VecDeque::new(),
50 is_complete: false,
51 stats: ReconstructionStats::default(),
52 }
53 }
54
55 pub fn add_frame(&mut self, frame: PriorityStreamFrame) {
57 if self.stats.start_time.is_none() {
58 self.stats.start_time = Some(std::time::Instant::now());
59 }
60
61 self.frame_queue.push_back(frame);
62 }
63
64 pub fn process_next_frame(&mut self) -> Result<ProcessResult> {
66 if let Some(frame) = self.frame_queue.pop_front() {
67 self.process_frame(frame)
68 } else {
69 Ok(ProcessResult::NoFrames)
70 }
71 }
72
73 pub fn process_all_frames(&mut self) -> Result<Vec<ProcessResult>> {
75 let mut results = Vec::new();
76
77 while let Some(frame) = self.frame_queue.pop_front() {
78 results.push(self.process_frame(frame)?);
79 }
80
81 Ok(results)
82 }
83
84 fn process_frame(&mut self, frame: PriorityStreamFrame) -> Result<ProcessResult> {
86 self.stats.frames_processed += 1;
87
88 match frame {
89 PriorityStreamFrame::Skeleton {
90 data,
91 priority: _,
92 complete: _,
93 } => {
94 self.stats.skeleton_frames += 1;
95 self.current_state = data;
96 Ok(ProcessResult::SkeletonApplied)
97 }
98
99 PriorityStreamFrame::Patch { patches, priority } => {
100 self.stats.patch_frames += 1;
101 let mut applied_paths = Vec::new();
102
103 for patch in patches {
104 let path = patch.path.clone();
105 self.apply_patch(patch)?;
106 applied_paths.push(path);
107 self.stats.patches_applied += 1;
108 }
109
110 Ok(ProcessResult::PatchesApplied {
111 count: applied_paths.len(),
112 priority,
113 paths: applied_paths,
114 })
115 }
116
117 PriorityStreamFrame::Complete { checksum: _ } => {
118 self.is_complete = true;
119 self.stats.end_time = Some(std::time::Instant::now());
120 Ok(ProcessResult::ReconstructionComplete)
121 }
122 }
123 }
124
125 fn apply_patch(&mut self, patch: JsonPatch) -> Result<()> {
127 match patch.operation {
128 PatchOperation::Set { value } => {
129 self.set_at_path(&patch.path, value)?;
130 }
131 PatchOperation::Append { values } => {
132 self.append_at_path(&patch.path, values)?;
133 }
134 PatchOperation::Replace { value } => {
135 self.set_at_path(&patch.path, value)?;
136 }
137 PatchOperation::Remove => {
138 self.remove_at_path(&patch.path)?;
139 }
140 }
141
142 Ok(())
143 }
144
145 fn set_at_path(&mut self, path: &JsonPath, value: JsonValue) -> Result<()> {
147 let segments = path.segments();
148 if segments.is_empty() {
149 return Err(crate::Error::Other("Empty path".to_string()));
150 }
151
152 if segments.len() == 1 {
154 self.current_state = value;
155 return Ok(());
156 }
157
158 let target_key = path
160 .last_key()
161 .ok_or_else(|| crate::Error::Other("Invalid path for set operation".to_string()))?;
162
163 let parent = self.get_parent_mut(path)?;
164
165 match parent {
166 JsonValue::Object(map) => {
167 map.insert(target_key, value);
168 }
169 JsonValue::Array(arr) => {
170 if let Ok(index) = target_key.parse::<usize>() {
171 if index < arr.len() {
172 arr[index] = value;
173 } else {
174 return Err(crate::Error::Other("Array index out of bounds".to_string()));
175 }
176 } else {
177 return Err(crate::Error::Other("Invalid array index".to_string()));
178 }
179 }
180 _ => {
181 return Err(crate::Error::Other(
182 "Cannot set field on non-object/array".to_string(),
183 ));
184 }
185 }
186
187 Ok(())
188 }
189
190 fn append_at_path(&mut self, path: &JsonPath, values: Vec<JsonValue>) -> Result<()> {
192 let target = self.get_mut_at_path(path)?;
193
194 match target {
195 JsonValue::Array(arr) => {
196 arr.extend(values);
197 }
198 _ => {
199 return Err(crate::Error::Other(
200 "Cannot append to non-array".to_string(),
201 ));
202 }
203 }
204
205 Ok(())
206 }
207
208 fn remove_at_path(&mut self, path: &JsonPath) -> Result<()> {
210 let target_key = path
211 .last_key()
212 .ok_or_else(|| crate::Error::Other("Invalid path for remove operation".to_string()))?;
213
214 let parent = self.get_parent_mut(path)?;
215
216 match parent {
217 JsonValue::Object(map) => {
218 map.remove(&target_key);
219 }
220 JsonValue::Array(arr) => {
221 if let Ok(index) = target_key.parse::<usize>()
222 && index < arr.len()
223 {
224 arr.remove(index);
225 }
226 }
227 _ => {
228 return Err(crate::Error::Other(
229 "Cannot remove from non-object/array".to_string(),
230 ));
231 }
232 }
233
234 Ok(())
235 }
236
237 fn get_parent_mut(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
239 if path.len() < 2 {
240 return Ok(&mut self.current_state);
241 }
242
243 let segments = path.segments();
244 let parent_segments = &segments[..segments.len() - 1];
245 let mut current = &mut self.current_state;
246
247 for segment in parent_segments.iter().skip(1) {
248 match segment {
250 PathSegment::Key(key) => {
251 if let JsonValue::Object(map) = current {
252 current = map
253 .get_mut(key)
254 .ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
255 } else {
256 return Err(crate::Error::Other("Expected object".to_string()));
257 }
258 }
259 PathSegment::Index(idx) => {
260 if let JsonValue::Array(arr) = current {
261 current = arr.get_mut(*idx).ok_or_else(|| {
262 crate::Error::Other(format!("Index out of bounds: {idx}"))
263 })?;
264 } else {
265 return Err(crate::Error::Other("Expected array".to_string()));
266 }
267 }
268 _ => {
269 return Err(crate::Error::Other("Unsupported path segment".to_string()));
270 }
271 }
272 }
273
274 Ok(current)
275 }
276
277 fn get_mut_at_path(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
279 let mut current = &mut self.current_state;
280
281 for segment in path.segments().iter().skip(1) {
282 match segment {
284 PathSegment::Key(key) => {
285 if let JsonValue::Object(map) = current {
286 current = map
287 .get_mut(key)
288 .ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
289 } else {
290 return Err(crate::Error::Other("Expected object".to_string()));
291 }
292 }
293 PathSegment::Index(idx) => {
294 if let JsonValue::Array(arr) = current {
295 current = arr.get_mut(*idx).ok_or_else(|| {
296 crate::Error::Other(format!("Index out of bounds: {idx}"))
297 })?;
298 } else {
299 return Err(crate::Error::Other("Expected array".to_string()));
300 }
301 }
302 _ => {
303 return Err(crate::Error::Other("Unsupported path segment".to_string()));
304 }
305 }
306 }
307
308 Ok(current)
309 }
310
311 pub fn current_state(&self) -> &JsonValue {
313 &self.current_state
314 }
315
316 pub fn is_complete(&self) -> bool {
318 self.is_complete
319 }
320
321 pub fn stats(&self) -> &ReconstructionStats {
323 &self.stats
324 }
325
326 pub fn reset(&mut self) {
328 self.current_state = JsonValue::Null;
329 self.frame_queue.clear();
330 self.is_complete = false;
331 self.stats = ReconstructionStats::default();
332 }
333
334 pub fn progress(&self) -> f32 {
336 if self.is_complete {
337 1.0
338 } else if self.stats.frames_processed == 0 {
339 0.0
340 } else {
341 (self.stats.frames_processed as f32
343 / (self.stats.frames_processed + self.frame_queue.len() as u32) as f32)
344 .min(0.95) }
346 }
347
348 pub fn duration(&self) -> Option<std::time::Duration> {
350 if let (Some(start), Some(end)) = (self.stats.start_time, self.stats.end_time) {
351 Some(end.duration_since(start))
352 } else {
353 None
354 }
355 }
356}
357
358#[derive(Debug, Clone)]
360pub enum ProcessResult {
361 NoFrames,
363 SkeletonApplied,
365 PatchesApplied {
367 count: usize,
369 priority: crate::stream::Priority,
371 paths: Vec<JsonPath>,
373 },
374 ReconstructionComplete,
376}
377
378impl Default for JsonReconstructor {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
384impl ReconstructionStats {
385 pub fn frames_per_second(&self) -> Option<f32> {
387 if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
388 let duration = end.duration_since(start);
389 if duration.as_secs_f32() > 0.0 {
390 Some(self.frames_processed as f32 / duration.as_secs_f32())
391 } else {
392 None
393 }
394 } else {
395 None
396 }
397 }
398
399 pub fn patches_per_second(&self) -> Option<f32> {
401 if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
402 let duration = end.duration_since(start);
403 if duration.as_secs_f32() > 0.0 {
404 Some(self.patches_applied as f32 / duration.as_secs_f32())
405 } else {
406 None
407 }
408 } else {
409 None
410 }
411 }
412}
413
414#[cfg(test)]
415mod tests {
416 use super::*;
417 use crate::stream::Priority;
418 use serde_json::json;
419
420 #[test]
421 fn test_reconstructor_creation() {
422 let reconstructor = JsonReconstructor::new();
423 assert!(!reconstructor.is_complete());
424 assert_eq!(reconstructor.stats().frames_processed, 0);
425 assert_eq!(reconstructor.current_state(), &JsonValue::Null);
426 }
427
428 #[test]
429 fn test_skeleton_application() {
430 let mut reconstructor = JsonReconstructor::new();
431
432 let skeleton = json!({
433 "name": null,
434 "age": 0,
435 "active": false
436 });
437
438 let frame = PriorityStreamFrame::Skeleton {
439 data: skeleton.clone(),
440 priority: Priority::CRITICAL,
441 complete: false,
442 };
443
444 reconstructor.add_frame(frame);
445 let result = reconstructor.process_next_frame().unwrap();
446
447 assert!(matches!(result, ProcessResult::SkeletonApplied));
448 assert_eq!(reconstructor.current_state(), &skeleton);
449 assert_eq!(reconstructor.stats().skeleton_frames, 1);
450 }
451
452 #[test]
453 fn test_patch_application() {
454 let mut reconstructor = JsonReconstructor::new();
455
456 let skeleton = json!({
458 "name": null,
459 "age": 0
460 });
461
462 reconstructor.current_state = skeleton;
463
464 let path = JsonPath::from_segments(vec![
466 PathSegment::Root,
467 PathSegment::Key("name".to_string()),
468 ]);
469
470 let patch = JsonPatch {
471 path,
472 operation: PatchOperation::Set {
473 value: json!("John Doe"),
474 },
475 priority: Priority::HIGH,
476 };
477
478 let frame = PriorityStreamFrame::Patch {
479 patches: vec![patch],
480 priority: Priority::HIGH,
481 };
482
483 reconstructor.add_frame(frame);
484 let result = reconstructor.process_next_frame().unwrap();
485
486 if let ProcessResult::PatchesApplied { count, .. } = result {
487 assert_eq!(count, 1);
488 } else {
489 panic!("Expected PatchesApplied result");
490 }
491
492 let expected = json!({
493 "name": "John Doe",
494 "age": 0
495 });
496
497 assert_eq!(reconstructor.current_state(), &expected);
498 assert_eq!(reconstructor.stats().patches_applied, 1);
499 }
500
501 #[test]
502 fn test_array_append() {
503 let mut reconstructor = JsonReconstructor::new();
504
505 let skeleton = json!({
507 "items": []
508 });
509
510 reconstructor.current_state = skeleton;
511
512 let path = JsonPath::from_segments(vec![
514 PathSegment::Root,
515 PathSegment::Key("items".to_string()),
516 ]);
517
518 let patch = JsonPatch {
519 path,
520 operation: PatchOperation::Append {
521 values: vec![json!("item1"), json!("item2")],
522 },
523 priority: Priority::MEDIUM,
524 };
525
526 let frame = PriorityStreamFrame::Patch {
527 patches: vec![patch],
528 priority: Priority::MEDIUM,
529 };
530
531 reconstructor.add_frame(frame);
532 reconstructor.process_next_frame().unwrap();
533
534 let expected = json!({
535 "items": ["item1", "item2"]
536 });
537
538 assert_eq!(reconstructor.current_state(), &expected);
539 }
540
541 #[test]
542 fn test_completion_tracking() {
543 let mut reconstructor = JsonReconstructor::new();
544
545 let complete_frame = PriorityStreamFrame::Complete { checksum: None };
546 reconstructor.add_frame(complete_frame);
547
548 let result = reconstructor.process_next_frame().unwrap();
549 assert!(matches!(result, ProcessResult::ReconstructionComplete));
550 assert!(reconstructor.is_complete());
551 assert!(reconstructor.stats().end_time.is_some());
552 }
553
554 #[test]
555 fn test_progress_calculation() {
556 let mut reconstructor = JsonReconstructor::new();
557
558 assert_eq!(reconstructor.progress(), 0.0);
560
561 let skeleton = json!({"test": null});
563 reconstructor.add_frame(PriorityStreamFrame::Skeleton {
564 data: skeleton,
565 priority: Priority::CRITICAL,
566 complete: false,
567 });
568 reconstructor.add_frame(PriorityStreamFrame::Complete { checksum: None });
569
570 reconstructor.process_next_frame().unwrap();
572 let progress = reconstructor.progress();
573 assert!(
574 progress > 0.0 && progress < 1.0,
575 "Progress was: {}",
576 progress
577 );
578
579 reconstructor.process_next_frame().unwrap();
581 assert_eq!(reconstructor.progress(), 1.0);
582 }
583
584 #[test]
585 fn test_reset_functionality() {
586 let mut reconstructor = JsonReconstructor::new();
587
588 reconstructor.current_state = json!({"test": true});
590 reconstructor.is_complete = true;
591 reconstructor.stats.frames_processed = 5;
592
593 reconstructor.reset();
595
596 assert_eq!(reconstructor.current_state(), &JsonValue::Null);
597 assert!(!reconstructor.is_complete());
598 assert_eq!(reconstructor.stats().frames_processed, 0);
599 }
600}