use crate::Result;
use crate::stream::priority::{
JsonPatch, JsonPath, PatchOperation, PathSegment, PriorityStreamFrame,
};
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
/// Rebuilds a JSON document incrementally from queued `PriorityStreamFrame`s.
///
/// Frames are queued with `add_frame` and consumed in FIFO order by
/// `process_next_frame` / `process_all_frames`.
pub struct JsonReconstructor {
/// Document as reconstructed so far; `JsonValue::Null` until a frame changes it.
current_state: JsonValue,
/// Frames that were added but not yet processed (front = oldest).
frame_queue: VecDeque<PriorityStreamFrame>,
/// Set to `true` once a `Complete` frame has been processed.
is_complete: bool,
/// Counters and timestamps collected while processing frames.
stats: ReconstructionStats,
}
/// Counters and timestamps describing one reconstruction run.
#[derive(Debug, Clone, Default)]
pub struct ReconstructionStats {
/// Number of frames handed to the frame processor (of any kind).
pub frames_processed: u32,
/// Number of individual patches that were applied successfully.
pub patches_applied: u32,
/// Number of skeleton frames processed.
pub skeleton_frames: u32,
/// Number of patch frames processed (counted even if a patch in the frame fails).
pub patch_frames: u32,
/// Instant at which the first frame was queued; `None` before that.
pub start_time: Option<std::time::Instant>,
/// Instant at which a `Complete` frame was processed; `None` before that.
pub end_time: Option<std::time::Instant>,
}
impl JsonReconstructor {
/// Creates an empty reconstructor: `Null` state, no queued frames,
/// not complete, and zeroed statistics.
pub fn new() -> Self {
    let frame_queue = VecDeque::new();
    let stats = ReconstructionStats::default();
    Self {
        current_state: JsonValue::Null,
        frame_queue,
        is_complete: false,
        stats,
    }
}
/// Queues `frame` for later processing.
///
/// The first call also records the start timestamp used for duration and
/// throughput statistics; later calls leave that timestamp untouched.
pub fn add_frame(&mut self, frame: PriorityStreamFrame) {
    let started = self.stats.start_time;
    // Keep an existing timestamp; only take a new reading when none is set.
    self.stats.start_time = started.or_else(|| Some(std::time::Instant::now()));
    self.frame_queue.push_back(frame);
}
/// Processes the oldest queued frame, if any.
///
/// Returns `ProcessResult::NoFrames` when the queue is empty.
///
/// # Errors
///
/// Propagates any error raised while applying the frame.
pub fn process_next_frame(&mut self) -> Result<ProcessResult> {
    match self.frame_queue.pop_front() {
        Some(next) => self.process_frame(next),
        None => Ok(ProcessResult::NoFrames),
    }
}
/// Drains the queue, processing frames in FIFO order, and returns one
/// result per processed frame.
///
/// # Errors
///
/// Stops at the first frame that fails. The results gathered so far are
/// dropped and any frames behind the failing one remain queued.
pub fn process_all_frames(&mut self) -> Result<Vec<ProcessResult>> {
    let mut outcomes = Vec::new();
    loop {
        let Some(next) = self.frame_queue.pop_front() else {
            break;
        };
        let outcome = self.process_frame(next)?;
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
fn process_frame(&mut self, frame: PriorityStreamFrame) -> Result<ProcessResult> {
self.stats.frames_processed += 1;
match frame {
PriorityStreamFrame::Skeleton {
data,
priority: _,
complete: _,
} => {
self.stats.skeleton_frames += 1;
self.current_state = data;
Ok(ProcessResult::SkeletonApplied)
}
PriorityStreamFrame::Patch { patches, priority } => {
self.stats.patch_frames += 1;
let mut applied_paths = Vec::new();
for patch in patches {
let path = patch.path.clone();
self.apply_patch(patch)?;
applied_paths.push(path);
self.stats.patches_applied += 1;
}
Ok(ProcessResult::PatchesApplied {
count: applied_paths.len(),
priority,
paths: applied_paths,
})
}
PriorityStreamFrame::Complete { checksum: _ } => {
self.is_complete = true;
self.stats.end_time = Some(std::time::Instant::now());
Ok(ProcessResult::ReconstructionComplete)
}
}
}
/// Dispatches a single patch to the matching path operation.
///
/// `Set` and `Replace` are handled identically: both write the value at the
/// patch's path.
///
/// # Errors
///
/// Propagates the error of the underlying set/append/remove operation.
fn apply_patch(&mut self, patch: JsonPatch) -> Result<()> {
    let target = &patch.path;
    match patch.operation {
        PatchOperation::Set { value } | PatchOperation::Replace { value } => {
            self.set_at_path(target, value)
        }
        PatchOperation::Append { values } => self.append_at_path(target, values),
        PatchOperation::Remove => self.remove_at_path(target),
    }
}
fn set_at_path(&mut self, path: &JsonPath, value: JsonValue) -> Result<()> {
let segments = path.segments();
if segments.is_empty() {
return Err(crate::Error::Other("Empty path".to_string()));
}
if segments.len() == 1 {
self.current_state = value;
return Ok(());
}
let target_key = path
.last_key()
.ok_or_else(|| crate::Error::Other("Invalid path for set operation".to_string()))?;
let parent = self.get_parent_mut(path)?;
match parent {
JsonValue::Object(map) => {
map.insert(target_key, value);
}
JsonValue::Array(arr) => {
if let Ok(index) = target_key.parse::<usize>() {
if index < arr.len() {
arr[index] = value;
} else {
return Err(crate::Error::Other("Array index out of bounds".to_string()));
}
} else {
return Err(crate::Error::Other("Invalid array index".to_string()));
}
}
_ => {
return Err(crate::Error::Other(
"Cannot set field on non-object/array".to_string(),
));
}
}
Ok(())
}
/// Appends `values`, in order, to the array located at `path`.
///
/// # Errors
///
/// Fails when the path cannot be resolved or the value found there is not
/// an array.
fn append_at_path(&mut self, path: &JsonPath, values: Vec<JsonValue>) -> Result<()> {
    let JsonValue::Array(items) = self.get_mut_at_path(path)? else {
        return Err(crate::Error::Other(
            "Cannot append to non-array".to_string(),
        ));
    };
    items.extend(values);
    Ok(())
}
/// Removes the entry addressed by `path` from its parent container.
///
/// Removal is lenient: a missing object key, a non-numeric array index and
/// an out-of-range array index are all silently ignored.
///
/// # Errors
///
/// Fails when the path has no usable last key, when the parent cannot be
/// resolved, or when the parent is neither an object nor an array.
fn remove_at_path(&mut self, path: &JsonPath) -> Result<()> {
    let Some(target_key) = path.last_key() else {
        return Err(crate::Error::Other(
            "Invalid path for remove operation".to_string(),
        ));
    };

    match self.get_parent_mut(path)? {
        JsonValue::Object(fields) => {
            fields.remove(&target_key);
        }
        JsonValue::Array(items) => match target_key.parse::<usize>() {
            Ok(position) if position < items.len() => {
                items.remove(position);
            }
            // Unparseable or out-of-range index: nothing to remove.
            _ => {}
        },
        _ => {
            return Err(crate::Error::Other(
                "Cannot remove from non-object/array".to_string(),
            ));
        }
    }
    Ok(())
}
fn get_parent_mut(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
if path.len() < 2 {
return Ok(&mut self.current_state);
}
let segments = path.segments();
let parent_segments = &segments[..segments.len() - 1];
let mut current = &mut self.current_state;
for segment in parent_segments.iter().skip(1) {
match segment {
PathSegment::Key(key) => {
if let JsonValue::Object(map) = current {
current = map
.get_mut(key)
.ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
} else {
return Err(crate::Error::Other("Expected object".to_string()));
}
}
PathSegment::Index(idx) => {
if let JsonValue::Array(arr) = current {
current = arr.get_mut(*idx).ok_or_else(|| {
crate::Error::Other(format!("Index out of bounds: {idx}"))
})?;
} else {
return Err(crate::Error::Other("Expected array".to_string()));
}
}
_ => {
return Err(crate::Error::Other("Unsupported path segment".to_string()));
}
}
}
Ok(current)
}
fn get_mut_at_path(&mut self, path: &JsonPath) -> Result<&mut JsonValue> {
let mut current = &mut self.current_state;
for segment in path.segments().iter().skip(1) {
match segment {
PathSegment::Key(key) => {
if let JsonValue::Object(map) = current {
current = map
.get_mut(key)
.ok_or_else(|| crate::Error::Other(format!("Key not found: {key}")))?;
} else {
return Err(crate::Error::Other("Expected object".to_string()));
}
}
PathSegment::Index(idx) => {
if let JsonValue::Array(arr) = current {
current = arr.get_mut(*idx).ok_or_else(|| {
crate::Error::Other(format!("Index out of bounds: {idx}"))
})?;
} else {
return Err(crate::Error::Other("Expected array".to_string()));
}
}
_ => {
return Err(crate::Error::Other("Unsupported path segment".to_string()));
}
}
}
Ok(current)
}
/// Returns the document as reconstructed so far.
pub fn current_state(&self) -> &JsonValue {
&self.current_state
}
/// Returns `true` once a `Complete` frame has been processed.
pub fn is_complete(&self) -> bool {
self.is_complete
}
/// Returns the statistics gathered so far.
pub fn stats(&self) -> &ReconstructionStats {
&self.stats
}
/// Returns the reconstructor to its freshly-constructed condition:
/// queued frames are discarded, statistics are zeroed, the completion flag
/// is cleared and the document becomes `Null` again.
pub fn reset(&mut self) {
    self.frame_queue.clear();
    self.stats = ReconstructionStats::default();
    self.is_complete = false;
    self.current_state = JsonValue::Null;
}
/// Estimates reconstruction progress as a fraction between `0.0` and `1.0`.
///
/// * `1.0` once a `Complete` frame has been processed.
/// * `0.0` while no frame has been processed.
/// * Otherwise `processed / (processed + queued)`, capped at `0.95` so that
///   only the `Complete` frame can report full progress.
pub fn progress(&self) -> f32 {
    if self.is_complete {
        return 1.0;
    }
    let processed = self.stats.frames_processed;
    if processed == 0 {
        return 0.0;
    }
    // Do the sum in u64: casting the queue length to u32 would silently
    // truncate, and adding two u32 values could overflow.
    let queued = u64::try_from(self.frame_queue.len()).unwrap_or(u64::MAX);
    let total = u64::from(processed).saturating_add(queued);
    (processed as f32 / total as f32).min(0.95)
}
/// Returns the time between the first queued frame and the processed
/// `Complete` frame, or `None` while either timestamp is still missing.
pub fn duration(&self) -> Option<std::time::Duration> {
    let started = self.stats.start_time?;
    let finished = self.stats.end_time?;
    Some(finished.duration_since(started))
}
}
/// Outcome of processing a single frame (or of finding the queue empty).
#[derive(Debug, Clone)]
pub enum ProcessResult {
/// The queue was empty; nothing was processed.
NoFrames,
/// A skeleton frame replaced the current state.
SkeletonApplied,
/// Every patch of a patch frame was applied.
PatchesApplied {
/// Number of patches applied (equals `paths.len()`).
count: usize,
/// Priority carried by the patch frame.
priority: crate::stream::Priority,
/// Paths of the applied patches, in application order.
paths: Vec<JsonPath>,
},
/// A `Complete` frame was processed; the reconstruction is finished.
ReconstructionComplete,
}
// `Default` simply delegates to `JsonReconstructor::new`.
impl Default for JsonReconstructor {
fn default() -> Self {
Self::new()
}
}
impl ReconstructionStats {
    /// Average number of frames processed per second between `start_time`
    /// and `end_time`.
    ///
    /// Returns `None` while either timestamp is missing or when the elapsed
    /// time is not strictly positive.
    pub fn frames_per_second(&self) -> Option<f32> {
        self.rate_over_elapsed(self.frames_processed)
    }

    /// Average number of patches applied per second between `start_time`
    /// and `end_time`.
    ///
    /// Returns `None` while either timestamp is missing or when the elapsed
    /// time is not strictly positive.
    pub fn patches_per_second(&self) -> Option<f32> {
        self.rate_over_elapsed(self.patches_applied)
    }

    /// Divides `count` by the elapsed seconds between the two timestamps,
    /// yielding `None` when no positive elapsed time is available.
    fn rate_over_elapsed(&self, count: u32) -> Option<f32> {
        let started = self.start_time?;
        let finished = self.end_time?;
        let elapsed = finished.duration_since(started);
        (elapsed.as_secs_f32() > 0.0).then(|| count as f32 / elapsed.as_secs_f32())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::Priority;
    use serde_json::json;

    /// Builds a path made of the root segment followed by one key segment
    /// per entry of `keys`.
    fn key_path(keys: &[&str]) -> JsonPath {
        let mut segments = vec![PathSegment::Root];
        segments.extend(keys.iter().map(|key| PathSegment::Key((*key).to_string())));
        JsonPath::from_segments(segments)
    }

    /// Creates a reconstructor whose current state is pre-seeded with `state`.
    fn seeded(state: JsonValue) -> JsonReconstructor {
        let mut reconstructor = JsonReconstructor::new();
        reconstructor.current_state = state;
        reconstructor
    }

    /// Assembles a patch from its three parts.
    fn make_patch(path: JsonPath, operation: PatchOperation, priority: Priority) -> JsonPatch {
        JsonPatch {
            path,
            operation,
            priority,
        }
    }

    /// Wraps `patch` in a single-patch frame, queues it and processes it.
    fn run_patch(
        reconstructor: &mut JsonReconstructor,
        patch: JsonPatch,
        frame_priority: Priority,
    ) -> ProcessResult {
        reconstructor.add_frame(PriorityStreamFrame::Patch {
            patches: vec![patch],
            priority: frame_priority,
        });
        reconstructor.process_next_frame().unwrap()
    }

    /// Builds an incomplete, critical-priority skeleton frame carrying `data`.
    fn skeleton_frame(data: JsonValue) -> PriorityStreamFrame {
        PriorityStreamFrame::Skeleton {
            data,
            priority: Priority::CRITICAL,
            complete: false,
        }
    }

    #[test]
    fn test_reconstructor_creation() {
        let fresh = JsonReconstructor::new();
        assert!(!fresh.is_complete());
        assert_eq!(fresh.stats().frames_processed, 0);
        assert_eq!(fresh.current_state(), &JsonValue::Null);
    }

    #[test]
    fn test_skeleton_application() {
        let skeleton = json!({
            "name": null,
            "age": 0,
            "active": false
        });
        let mut reconstructor = JsonReconstructor::new();
        reconstructor.add_frame(skeleton_frame(skeleton.clone()));

        let outcome = reconstructor.process_next_frame().unwrap();

        assert!(matches!(outcome, ProcessResult::SkeletonApplied));
        assert_eq!(reconstructor.current_state(), &skeleton);
        assert_eq!(reconstructor.stats().skeleton_frames, 1);
    }

    #[test]
    fn test_patch_application() {
        let mut reconstructor = seeded(json!({
            "name": null,
            "age": 0
        }));
        let patch = make_patch(
            key_path(&["name"]),
            PatchOperation::Set {
                value: json!("John Doe"),
            },
            Priority::HIGH,
        );

        let outcome = run_patch(&mut reconstructor, patch, Priority::HIGH);

        let ProcessResult::PatchesApplied { count, .. } = outcome else {
            panic!("Expected PatchesApplied result");
        };
        assert_eq!(count, 1);
        let expected = json!({
            "name": "John Doe",
            "age": 0
        });
        assert_eq!(reconstructor.current_state(), &expected);
        assert_eq!(reconstructor.stats().patches_applied, 1);
    }

    #[test]
    fn test_array_append() {
        let mut reconstructor = seeded(json!({
            "items": []
        }));
        let patch = make_patch(
            key_path(&["items"]),
            PatchOperation::Append {
                values: vec![json!("item1"), json!("item2")],
            },
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        let expected = json!({
            "items": ["item1", "item2"]
        });
        assert_eq!(reconstructor.current_state(), &expected);
    }

    #[test]
    fn test_completion_tracking() {
        let mut reconstructor = JsonReconstructor::new();
        reconstructor.add_frame(PriorityStreamFrame::Complete { checksum: None });

        let outcome = reconstructor.process_next_frame().unwrap();

        assert!(matches!(outcome, ProcessResult::ReconstructionComplete));
        assert!(reconstructor.is_complete());
        assert!(reconstructor.stats().end_time.is_some());
    }

    #[test]
    fn test_progress_calculation() {
        let mut reconstructor = JsonReconstructor::new();
        assert_eq!(reconstructor.progress(), 0.0);

        reconstructor.add_frame(skeleton_frame(json!({"test": null})));
        reconstructor.add_frame(PriorityStreamFrame::Complete { checksum: None });

        // One of two frames processed: strictly between the two extremes.
        reconstructor.process_next_frame().unwrap();
        let progress = reconstructor.progress();
        assert!(
            progress > 0.0 && progress < 1.0,
            "Progress was: {}",
            progress
        );

        reconstructor.process_next_frame().unwrap();
        assert_eq!(reconstructor.progress(), 1.0);
    }

    #[test]
    fn test_reset_functionality() {
        let mut reconstructor = seeded(json!({"test": true}));
        reconstructor.is_complete = true;
        reconstructor.stats.frames_processed = 5;

        reconstructor.reset();

        assert_eq!(reconstructor.current_state(), &JsonValue::Null);
        assert!(!reconstructor.is_complete());
        assert_eq!(reconstructor.stats().frames_processed, 0);
    }

    #[test]
    fn test_deep_nesting_level_3() {
        let mut reconstructor = seeded(json!({
            "level1": {
                "level2": {
                    "level3": null
                }
            }
        }));
        let patch = make_patch(
            key_path(&["level1", "level2", "level3"]),
            PatchOperation::Set {
                value: json!("deep value"),
            },
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(
            reconstructor.current_state()["level1"]["level2"]["level3"],
            json!("deep value")
        );
    }

    #[test]
    fn test_deep_nesting_level_5() {
        let mut reconstructor = seeded(json!({
            "l1": {
                "l2": {
                    "l3": {
                        "l4": {
                            "l5": null
                        }
                    }
                }
            }
        }));
        let patch = make_patch(
            key_path(&["l1", "l2", "l3", "l4", "l5"]),
            PatchOperation::Set {
                value: json!({"final": "value"}),
            },
            Priority::HIGH,
        );

        run_patch(&mut reconstructor, patch, Priority::HIGH);

        assert_eq!(
            reconstructor.current_state()["l1"]["l2"]["l3"]["l4"]["l5"],
            json!({"final": "value"})
        );
    }

    #[test]
    fn test_out_of_order_patches() {
        let mut reconstructor = seeded(json!({"a": null, "b": null, "c": null}));

        // Frames are queued in the order c, a, b.
        for (key, text) in [("c", "third"), ("a", "first"), ("b", "second")] {
            let patch = make_patch(
                key_path(&[key]),
                PatchOperation::Set { value: json!(text) },
                Priority::LOW,
            );
            reconstructor.add_frame(PriorityStreamFrame::Patch {
                patches: vec![patch],
                priority: Priority::LOW,
            });
        }

        reconstructor.process_all_frames().unwrap();

        assert_eq!(reconstructor.current_state()["a"], json!("first"));
        assert_eq!(reconstructor.current_state()["b"], json!("second"));
        assert_eq!(reconstructor.current_state()["c"], json!("third"));
    }

    #[test]
    fn test_object_field_set() {
        let mut reconstructor = seeded(json!({"obj": {"field": null}}));
        let patch = make_patch(
            key_path(&["obj", "field"]),
            PatchOperation::Set {
                value: json!("updated"),
            },
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(
            reconstructor.current_state()["obj"]["field"],
            json!("updated")
        );
    }

    #[test]
    fn test_array_append_multiple() {
        let mut reconstructor = seeded(json!({"items": []}));
        let patch = make_patch(
            key_path(&["items"]),
            PatchOperation::Append {
                values: vec![json!(1), json!(2), json!(3)],
            },
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(reconstructor.current_state()["items"], json!([1, 2, 3]));
    }

    #[test]
    fn test_remove_operation() {
        let mut reconstructor = seeded(json!({"keep": "this", "remove": "me"}));
        let patch = make_patch(
            key_path(&["remove"]),
            PatchOperation::Remove,
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(reconstructor.current_state(), &json!({"keep": "this"}));
    }

    #[test]
    fn test_remove_nested_field() {
        let mut reconstructor = seeded(json!({"outer": {"keep": "this", "remove": "me"}}));
        let patch = make_patch(
            key_path(&["outer", "remove"]),
            PatchOperation::Remove,
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(
            reconstructor.current_state()["outer"],
            json!({"keep": "this"})
        );
    }

    #[test]
    fn test_replace_operation() {
        let mut reconstructor = seeded(json!({"field": "old"}));
        let patch = make_patch(
            key_path(&["field"]),
            PatchOperation::Replace {
                value: json!("new"),
            },
            Priority::MEDIUM,
        );

        run_patch(&mut reconstructor, patch, Priority::MEDIUM);

        assert_eq!(reconstructor.current_state()["field"], json!("new"));
    }

    #[test]
    fn test_error_empty_path_set() {
        let mut reconstructor = seeded(json!({"test": "data"}));
        let patch = make_patch(
            JsonPath::from_segments(vec![]),
            PatchOperation::Set {
                value: json!("new"),
            },
            Priority::MEDIUM,
        );

        let outcome = reconstructor.apply_patch(patch);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_error_invalid_parent() {
        let mut reconstructor = seeded(json!({"value": 123}));
        let patch = make_patch(
            key_path(&["value", "nested"]),
            PatchOperation::Set {
                value: json!("new"),
            },
            Priority::MEDIUM,
        );

        let outcome = reconstructor.apply_patch(patch);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_error_array_index_out_of_bounds() {
        let mut reconstructor = seeded(json!({"arr": [1, 2]}));
        let past_the_end = JsonPath::from_segments(vec![
            PathSegment::Root,
            PathSegment::Key("arr".to_string()),
            PathSegment::Index(5),
        ]);
        let patch = make_patch(
            past_the_end,
            PatchOperation::Set {
                value: json!("value"),
            },
            Priority::MEDIUM,
        );

        let outcome = reconstructor.apply_patch(patch);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_process_result_no_frames() {
        let mut reconstructor = JsonReconstructor::new();
        let outcome = reconstructor.process_next_frame().unwrap();
        assert!(matches!(outcome, ProcessResult::NoFrames));
    }

    #[test]
    fn test_duration_tracking() {
        let mut reconstructor = JsonReconstructor::new();
        assert!(reconstructor.duration().is_none());

        reconstructor.add_frame(skeleton_frame(json!({"test": "data"})));
        reconstructor.process_next_frame().unwrap();
        reconstructor.add_frame(PriorityStreamFrame::Complete { checksum: None });
        reconstructor.process_next_frame().unwrap();

        assert!(reconstructor.duration().is_some());
    }
}