use crate::Result;
use crate::domain::value_objects::Priority;
use serde_json::{Map as JsonMap, Value as JsonValue};
use std::collections::VecDeque;
/// Serde `with`-module that writes a [`Priority`] as whatever `Priority::value()` returns.
///
/// Only a `serialize` function is provided, so fields using this module can be
/// serialized but not deserialized through it.
mod serde_priority {
    use crate::domain::value_objects::Priority;
    use serde::{Serialize, Serializer};

    /// Serializes `priority` by delegating to the `Serialize` impl of `priority.value()`.
    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let raw = priority.value();
        Serialize::serialize(&raw, serializer)
    }
}
/// A location inside a JSON document, stored as an ordered list of [`PathSegment`]s.
///
/// Paths created with `JsonPath::root` begin with [`PathSegment::Root`] and are
/// extended one segment at a time.
#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub struct JsonPath {
    /// Segments from the root outward.
    segments: Vec<PathSegment>,
}
/// One step of a [`JsonPath`].
#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub enum PathSegment {
    /// The document root.
    Root,
    /// An object member, identified by its key.
    Key(String),
    /// An array element, identified by its index.
    Index(usize),
    /// A wildcard step; `JsonPath::to_json_pointer` renders it as `/*`.
    Wildcard,
}
/// A single change at `path`, tagged with the priority used to order it in a stream.
#[derive(Clone, Debug, serde::Serialize)]
pub struct JsonPatch {
    /// Where in the document the operation applies.
    pub path: JsonPath,
    /// What to do at `path`.
    pub operation: PatchOperation,
    /// Ordering key; serialized through `serde_priority`, i.e. as `Priority::value()`.
    #[serde(with = "serde_priority")]
    pub priority: Priority,
}
/// The operation carried by a [`JsonPatch`].
#[derive(Clone, Debug, serde::Serialize)]
pub enum PatchOperation {
    /// Carries a complete `value` for the patch path.
    Set { value: JsonValue },
    /// Carries array elements (`values`) for the array at the patch path.
    Append { values: Vec<JsonValue> },
    /// Carries a replacement `value` for the patch path.
    /// NOTE(review): how consumers distinguish this from `Set` is not visible here — confirm.
    Replace { value: JsonValue },
    /// Removes whatever is at the patch path.
    Remove,
}
/// One unit of a priority stream, as queued in a [`StreamingPlan`].
#[derive(Clone, Debug, serde::Serialize)]
pub enum PriorityStreamFrame {
    /// The document's structure with placeholder values.
    Skeleton {
        /// Placeholder document.
        data: JsonValue,
        /// Serialized as `Priority::value()`.
        #[serde(with = "serde_priority")]
        priority: Priority,
        /// Whether `data` is already the full document.
        complete: bool,
    },
    /// A batch of patches; `priority` is the priority they were grouped under.
    Patch {
        /// Patches to apply, in order.
        patches: Vec<JsonPatch>,
        /// Serialized as `Priority::value()`.
        #[serde(with = "serde_priority")]
        priority: Priority,
    },
    /// End-of-stream marker.
    Complete {
        /// Optional integrity checksum.
        checksum: Option<u64>,
    },
}
/// Turns a JSON document into a [`StreamingPlan`] whose frames are ordered by priority.
///
/// Derives `Debug` and `Clone` like its only field, so it can be logged and
/// duplicated like the other public types in this module.
#[derive(Debug, Clone)]
pub struct PriorityStreamer {
    /// Tuning options; see [`StreamerConfig`].
    config: StreamerConfig,
}
/// Settings for [`PriorityStreamer`].
#[derive(Clone, Debug)]
pub struct StreamerConfig {
    /// NOTE(review): not read anywhere in this module; priorities are always derived
    /// from key names and values. Confirm whether this flag is meant to gate that.
    pub detect_semantics: bool,
    /// Maximum number of patches per `Patch` frame, and the chunk length used when
    /// an array of more than 10 elements is split into `Append` patches.
    pub max_patch_size: usize,
    /// NOTE(review): not read anywhere in this module; no patches are filtered by
    /// priority. Confirm the intended semantics before relying on it.
    pub priority_threshold: Priority,
}
impl Default for StreamerConfig {
    /// Semantic detection on, at most 100 patches per frame, threshold `Priority::LOW`.
    fn default() -> Self {
        let max_patch_size = 100;
        Self {
            priority_threshold: Priority::LOW,
            max_patch_size,
            detect_semantics: true,
        }
    }
}
impl PriorityStreamer {
    /// Creates a streamer configured with [`StreamerConfig::default`].
    pub fn new() -> Self {
        Self::with_config(StreamerConfig::default())
    }

    /// Creates a streamer that uses `config`.
    pub fn with_config(config: StreamerConfig) -> Self {
        Self { config }
    }

    /// Builds a [`StreamingPlan`] that delivers `json` highest-priority first.
    ///
    /// The plan's frames are, in order:
    /// 1. one [`PriorityStreamFrame::Skeleton`] (priority `CRITICAL`, `complete: false`)
    ///    holding the placeholder structure of `json`;
    /// 2. [`PriorityStreamFrame::Patch`] frames in descending priority. Equal-priority
    ///    patches keep document order, every frame holds patches of a single
    ///    priority, and no frame holds more than `max_patch_size` patches (0 is
    ///    treated as 1);
    /// 3. one [`PriorityStreamFrame::Complete`] without a checksum.
    ///
    /// # Errors
    ///
    /// Propagates any error from patch extraction; the extraction in this
    /// implementation does not produce errors itself.
    pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
        let mut plan = StreamingPlan::new();
        plan.frames.push_back(PriorityStreamFrame::Skeleton {
            data: self.generate_skeleton(json),
            priority: Priority::CRITICAL,
            complete: false,
        });

        let mut patches = Vec::new();
        self.extract_patches(json, &JsonPath::root(), &mut patches)?;
        // `sort_by_key` is stable, so equal-priority patches stay in document order.
        patches.sort_by_key(|patch| std::cmp::Reverse(patch.priority));

        let batch_limit = self.batch_limit();
        let mut batch_priority = Priority::CRITICAL;
        let mut batch: Vec<JsonPatch> = Vec::new();
        for patch in patches {
            // Close the running batch on a priority change so each frame carries
            // exactly one priority.
            if patch.priority != batch_priority && !batch.is_empty() {
                plan.frames.push_back(PriorityStreamFrame::Patch {
                    patches: std::mem::take(&mut batch),
                    priority: batch_priority,
                });
            }
            batch_priority = patch.priority;
            batch.push(patch);
            if batch.len() >= batch_limit {
                plan.frames.push_back(PriorityStreamFrame::Patch {
                    patches: std::mem::take(&mut batch),
                    priority: batch_priority,
                });
            }
        }
        if !batch.is_empty() {
            plan.frames.push_back(PriorityStreamFrame::Patch {
                patches: batch,
                priority: batch_priority,
            });
        }

        plan.frames
            .push_back(PriorityStreamFrame::Complete { checksum: None });
        Ok(plan)
    }

    /// Upper bound on patches per `Patch` frame and elements per `Append` chunk.
    ///
    /// `max_patch_size` is a public field and may be 0, but `slice::chunks(0)`
    /// panics, so 0 is treated as 1 (which is also how 0 already behaved for
    /// frame batching).
    fn batch_limit(&self) -> usize {
        self.config.max_patch_size.max(1)
    }

    /// Returns the structure of `json` with every value replaced by a placeholder.
    ///
    /// For an object root each member maps to: nested object → its own skeleton,
    /// array → `[]`, string → `null`, number → `0`, boolean → `false`,
    /// `null` → `null`. An array root becomes `[]`; any other root becomes `null`.
    fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
        match json {
            JsonValue::Object(map) => {
                let mut skeleton = JsonMap::new();
                for (key, value) in map {
                    let placeholder = match value {
                        JsonValue::Object(_) => self.generate_skeleton(value),
                        JsonValue::Array(_) => JsonValue::Array(Vec::new()),
                        JsonValue::String(_) | JsonValue::Null => JsonValue::Null,
                        JsonValue::Number(_) => JsonValue::Number(0.into()),
                        JsonValue::Bool(_) => JsonValue::Bool(false),
                    };
                    skeleton.insert(key.clone(), placeholder);
                }
                JsonValue::Object(skeleton)
            }
            JsonValue::Array(_) => JsonValue::Array(Vec::new()),
            _ => JsonValue::Null,
        }
    }

    /// Appends to `patches` the patches that fill `json` (located at `current_path`)
    /// into its skeleton.
    ///
    /// * Object: every member gets a `Set` with its full value at the priority from
    ///   `calculate_field_priority`. Nested objects are then walked so their members
    ///   also get their own (possibly higher) priorities. Arrays are not walked:
    ///   their `Set` already carries every element. Emitting `Append` patches for
    ///   the same elements would duplicate them whenever an `Append` is applied
    ///   after the `Set`, and with the stable sort that is the case for equal
    ///   priorities.
    /// * Array: reached only for an array root, whose skeleton is `[]`. Elements are
    ///   emitted as `Append` patches, chunked by `batch_limit()` once the array
    ///   has more than 10 elements.
    /// * Scalar: reached only for a scalar root, whose skeleton is `null`. It is
    ///   emitted as one `CRITICAL` `Set` (it is the whole document) so the value is
    ///   not lost.
    fn extract_patches(
        &self,
        json: &JsonValue,
        current_path: &JsonPath,
        patches: &mut Vec<JsonPatch>,
    ) -> Result<()> {
        match json {
            JsonValue::Object(map) => {
                for (key, value) in map {
                    let field_path = current_path.append_key(key);
                    let priority = self.calculate_field_priority(&field_path, key, value);
                    patches.push(JsonPatch {
                        path: field_path.clone(),
                        operation: PatchOperation::Set {
                            value: value.clone(),
                        },
                        priority,
                    });
                    // Only objects are walked further; scalars are fully covered by
                    // the `Set` above and arrays by the full-array `Set`.
                    if value.is_object() {
                        self.extract_patches(value, &field_path, patches)?;
                    }
                }
            }
            JsonValue::Array(arr) => {
                if arr.len() > 10 {
                    for chunk in arr.chunks(self.batch_limit()) {
                        patches.push(JsonPatch {
                            path: current_path.clone(),
                            operation: PatchOperation::Append {
                                values: chunk.to_vec(),
                            },
                            priority: self.calculate_array_priority(current_path, chunk),
                        });
                    }
                } else if !arr.is_empty() {
                    patches.push(JsonPatch {
                        path: current_path.clone(),
                        operation: PatchOperation::Append {
                            values: arr.clone(),
                        },
                        priority: self.calculate_array_priority(current_path, arr),
                    });
                }
            }
            scalar => {
                patches.push(JsonPatch {
                    path: current_path.clone(),
                    operation: PatchOperation::Set {
                        value: scalar.clone(),
                    },
                    priority: Priority::CRITICAL,
                });
            }
        }
        Ok(())
    }

    /// Chooses the priority of the `Set` patch for object member `key` holding `value`.
    ///
    /// Key rules are checked first, in order:
    /// * `id`/`uuid`/`status`/`type`/`kind` → `CRITICAL`;
    /// * `name`/`title`/`label`/`email`/`username` → `HIGH`;
    /// * keys containing `analytics`, `stats` or `meta` → `LOW`;
    /// * `reviews`/`comments`/`logs`/`history` → `BACKGROUND`.
    ///
    /// Otherwise the value decides: arrays longer than 100 → `BACKGROUND`, strings
    /// longer than 1000 bytes → `LOW`, anything else → `MEDIUM`. `_path` is unused.
    fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
        if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
            return Priority::CRITICAL;
        }
        if matches!(key, "name" | "title" | "label" | "email" | "username") {
            return Priority::HIGH;
        }
        if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
            return Priority::LOW;
        }
        if matches!(key, "reviews" | "comments" | "logs" | "history") {
            return Priority::BACKGROUND;
        }
        match value {
            JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
            // NOTE(review): yields the same result as the fallback arm below.
            JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
            JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
            _ => Priority::MEDIUM,
        }
    }

    /// Chooses the priority of an `Append` patch carrying `elements` for the array at `path`.
    ///
    /// More than 50 elements → `BACKGROUND`. Otherwise, when `path` contains a key,
    /// a last key of `reviews`/`comments`/`logs` → `BACKGROUND`; everything else →
    /// `MEDIUM`. `extract_patches` currently calls this only for an array root,
    /// whose path has no key.
    fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
        if elements.len() > 50 {
            return Priority::BACKGROUND;
        }
        if let Some(last_key) = path.last_key() {
            if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
                return Priority::BACKGROUND;
            }
            if matches!(last_key.as_str(), "items" | "data" | "results") {
                return Priority::MEDIUM;
            }
        }
        Priority::MEDIUM
    }
}
/// An ordered queue of frames to send, as produced by `PriorityStreamer::analyze`.
#[derive(Debug)]
pub struct StreamingPlan {
    /// Frames in send order; `next_frame` consumes them from the front.
    pub frames: VecDeque<PriorityStreamFrame>,
}
impl Default for StreamingPlan {
fn default() -> Self {
Self::new()
}
}
impl StreamingPlan {
    /// Creates a plan with an empty frame queue.
    pub fn new() -> Self {
        let frames = VecDeque::default();
        Self { frames }
    }

    /// Removes and returns the front frame, or `None` once the queue is drained.
    pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
        self.frames.pop_front()
    }

    /// Returns `true` when no frames remain queued.
    pub fn is_complete(&self) -> bool {
        self.remaining_frames() == 0
    }

    /// Number of frames still queued.
    pub fn remaining_frames(&self) -> usize {
        self.frames.len()
    }

    /// Iterates over the queued frames front to back without removing them.
    pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
        self.frames.iter()
    }
}
impl JsonPath {
    /// Creates a path containing only [`PathSegment::Root`].
    pub fn root() -> Self {
        Self::from_segments(vec![PathSegment::Root])
    }

    /// Returns a copy of this path extended with the object member `key`.
    pub fn append_key(&self, key: &str) -> Self {
        self.with_segment(PathSegment::Key(key.to_string()))
    }

    /// Returns a copy of this path extended with the array index `index`.
    pub fn append_index(&self, index: usize) -> Self {
        self.with_segment(PathSegment::Index(index))
    }

    /// Returns the name of the last `Key` segment, looking past any trailing
    /// index/wildcard/root segments; `None` if the path contains no key.
    pub fn last_key(&self) -> Option<String> {
        self.segments.iter().rev().find_map(|segment| match segment {
            PathSegment::Key(key) => Some(key.clone()),
            PathSegment::Root | PathSegment::Index(_) | PathSegment::Wildcard => None,
        })
    }

    /// All segments, from the root outward.
    pub fn segments(&self) -> &[PathSegment] {
        &self.segments
    }

    /// Number of segments, including the `Root` segment if present.
    pub fn len(&self) -> usize {
        self.segments.len()
    }

    /// Returns `true` if the path has no segments at all (not even `Root`).
    pub fn is_empty(&self) -> bool {
        self.segments.is_empty()
    }

    /// Builds a path from raw segments without validation.
    pub fn from_segments(segments: Vec<PathSegment>) -> Self {
        Self { segments }
    }

    /// Renders the path as a JSON Pointer (RFC 6901) string.
    ///
    /// Keys are escaped per RFC 6901 §3 (`~` → `~0`, `/` → `~1`), so a key such as
    /// `"a/b"` stays a single reference token (`/a~1b`) instead of reading as two.
    ///
    /// Deviations kept for compatibility:
    /// * a path with no key/index/wildcard segments renders as `/` (RFC 6901 uses
    ///   `""` for the whole document; `/` there means the member named `""`);
    /// * [`PathSegment::Wildcard`] renders as `/*`, which is indistinguishable from
    ///   a key named `*`.
    pub fn to_json_pointer(&self) -> String {
        let mut pointer = String::new();
        for segment in &self.segments {
            match segment {
                PathSegment::Root => {}
                PathSegment::Key(key) => {
                    pointer.push('/');
                    for ch in key.chars() {
                        match ch {
                            '~' => pointer.push_str("~0"),
                            '/' => pointer.push_str("~1"),
                            other => pointer.push(other),
                        }
                    }
                }
                PathSegment::Index(idx) => {
                    pointer.push('/');
                    pointer.push_str(&idx.to_string());
                }
                PathSegment::Wildcard => pointer.push_str("/*"),
            }
        }
        if pointer.is_empty() {
            "/".to_string()
        } else {
            pointer
        }
    }

    /// Clones this path with `segment` appended, allocating exactly once.
    fn with_segment(&self, segment: PathSegment) -> Self {
        let mut segments = Vec::with_capacity(self.segments.len() + 1);
        segments.extend_from_slice(&self.segments);
        segments.push(segment);
        Self { segments }
    }
}
impl Default for PriorityStreamer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_json_path_creation() {
        let root = JsonPath::root();
        assert_eq!(root.to_json_pointer(), "/");

        let user_name = root
            .append_key("users")
            .append_index(0)
            .append_key("name");
        assert_eq!(user_name.to_json_pointer(), "/users/0/name");
    }

    #[test]
    fn test_priority_comparison() {
        // Listed from most to least important; each entry must outrank the next.
        let ladder = [
            Priority::CRITICAL,
            Priority::HIGH,
            Priority::MEDIUM,
            Priority::LOW,
            Priority::BACKGROUND,
        ];
        for pair in ladder.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    #[test]
    fn test_skeleton_generation() {
        let streamer = PriorityStreamer::new();
        let input = json!({
            "name": "John",
            "age": 30,
            "active": true,
            "posts": ["post1", "post2"]
        });
        let expected = json!({
            "name": null,
            "age": 0,
            "active": false,
            "posts": []
        });
        assert_eq!(streamer.generate_skeleton(&input), expected);
    }

    #[test]
    fn test_field_priority_calculation() {
        let streamer = PriorityStreamer::new();
        let root = JsonPath::root();
        let cases = [
            ("id", json!(123), Priority::CRITICAL),
            ("name", json!("John"), Priority::HIGH),
            ("reviews", json!([]), Priority::BACKGROUND),
        ];
        for (key, value, expected) in cases.iter() {
            assert_eq!(
                streamer.calculate_field_priority(&root, key, value),
                *expected
            );
        }
    }

    #[test]
    fn test_streaming_plan_creation() {
        let document = json!({
            "id": 1,
            "name": "John",
            "bio": "Software developer",
            "reviews": ["Good", "Excellent"]
        });
        let plan = PriorityStreamer::new().analyze(&document).unwrap();
        assert!(!plan.is_complete());
        assert!(plan.remaining_frames() > 0);
    }
}