1use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
14#[derive(Debug, Clone, PartialEq, serde::Serialize)]
16pub struct JsonPath {
17 segments: Vec<PathSegment>,
18}
19
20#[derive(Debug, Clone, PartialEq, serde::Serialize)]
21pub enum PathSegment {
22 Root,
23 Key(String),
24 Index(usize),
25 Wildcard,
26}
27
28#[derive(Debug, Clone, serde::Serialize)]
30pub struct JsonPatch {
31 pub path: JsonPath,
32 pub operation: PatchOperation,
33 pub priority: Priority,
34}
35
36#[derive(Debug, Clone, serde::Serialize)]
37pub enum PatchOperation {
38 Set { value: JsonValue },
39 Append { values: Vec<JsonValue> },
40 Replace { value: JsonValue },
41 Remove,
42}
43
44#[derive(Debug, Clone, serde::Serialize)]
46pub enum PriorityStreamFrame {
47 Skeleton {
48 data: JsonValue,
49 priority: Priority,
50 complete: bool,
51 },
52 Patch {
53 patches: Vec<JsonPatch>,
54 priority: Priority,
55 },
56 Complete {
57 checksum: Option<u64>,
58 },
59}
60
61pub struct PriorityStreamer {
63 config: StreamerConfig,
64}
65
66#[derive(Debug, Clone)]
67pub struct StreamerConfig {
68 pub detect_semantics: bool,
69 pub max_patch_size: usize,
70 pub priority_threshold: Priority,
71}
72
73impl Default for StreamerConfig {
74 fn default() -> Self {
75 Self {
76 detect_semantics: true,
77 max_patch_size: 100,
78 priority_threshold: Priority::LOW,
79 }
80 }
81}
82
83impl PriorityStreamer {
84 pub fn new() -> Self {
86 Self::with_config(StreamerConfig::default())
87 }
88
89 pub fn with_config(config: StreamerConfig) -> Self {
91 Self { config }
92 }
93
94 pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
96 let mut plan = StreamingPlan::new();
97
98 let skeleton = self.generate_skeleton(json);
100 plan.frames.push_back(PriorityStreamFrame::Skeleton {
101 data: skeleton,
102 priority: Priority::CRITICAL,
103 complete: false,
104 });
105
106 let mut patches = Vec::new();
108 self.extract_patches(json, &JsonPath::root(), &mut patches)?;
109
110 patches.sort_by(|a, b| b.priority.cmp(&a.priority));
112
113 let mut current_priority = Priority::CRITICAL;
114 let mut current_batch = Vec::new();
115
116 for patch in patches {
117 if patch.priority != current_priority && !current_batch.is_empty() {
118 plan.frames.push_back(PriorityStreamFrame::Patch {
119 patches: current_batch,
120 priority: current_priority,
121 });
122 current_batch = Vec::new();
123 }
124 current_priority = patch.priority;
125 current_batch.push(patch);
126
127 if current_batch.len() >= self.config.max_patch_size {
128 plan.frames.push_back(PriorityStreamFrame::Patch {
129 patches: current_batch,
130 priority: current_priority,
131 });
132 current_batch = Vec::new();
133 }
134 }
135
136 if !current_batch.is_empty() {
138 plan.frames.push_back(PriorityStreamFrame::Patch {
139 patches: current_batch,
140 priority: current_priority,
141 });
142 }
143
144 plan.frames
146 .push_back(PriorityStreamFrame::Complete { checksum: None });
147
148 Ok(plan)
149 }
150
151 fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
153 match json {
154 JsonValue::Object(map) => {
155 let mut skeleton = JsonMap::new();
156 for (key, value) in map {
157 skeleton.insert(
158 key.clone(),
159 match value {
160 JsonValue::Array(_) => JsonValue::Array(vec![]),
161 JsonValue::Object(_) => self.generate_skeleton(value),
162 JsonValue::String(_) => JsonValue::Null,
163 JsonValue::Number(_) => JsonValue::Number(0.into()),
164 JsonValue::Bool(_) => JsonValue::Bool(false),
165 JsonValue::Null => JsonValue::Null,
166 },
167 );
168 }
169 JsonValue::Object(skeleton)
170 }
171 JsonValue::Array(_) => JsonValue::Array(vec![]),
172 _ => JsonValue::Null,
173 }
174 }
175
176 fn extract_patches(
178 &self,
179 json: &JsonValue,
180 current_path: &JsonPath,
181 patches: &mut Vec<JsonPatch>,
182 ) -> Result<()> {
183 match json {
184 JsonValue::Object(map) => {
185 for (key, value) in map {
186 let field_path = current_path.append_key(key);
187 let priority = self.calculate_field_priority(&field_path, key, value);
188
189 patches.push(JsonPatch {
191 path: field_path.clone(),
192 operation: PatchOperation::Set {
193 value: value.clone(),
194 },
195 priority,
196 });
197
198 self.extract_patches(value, &field_path, patches)?;
200 }
201 }
202 JsonValue::Array(arr) => {
203 if arr.len() > 10 {
205 for chunk in arr.chunks(self.config.max_patch_size) {
207 patches.push(JsonPatch {
208 path: current_path.clone(),
209 operation: PatchOperation::Append {
210 values: chunk.to_vec(),
211 },
212 priority: self.calculate_array_priority(current_path, chunk),
213 });
214 }
215 } else if !arr.is_empty() {
216 patches.push(JsonPatch {
217 path: current_path.clone(),
218 operation: PatchOperation::Append {
219 values: arr.clone(),
220 },
221 priority: self.calculate_array_priority(current_path, arr),
222 });
223 }
224 }
225 _ => {
226 }
228 }
229
230 Ok(())
231 }
232
233 fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
235 if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
237 return Priority::CRITICAL;
238 }
239
240 if matches!(key, "name" | "title" | "label" | "email" | "username") {
242 return Priority::HIGH;
243 }
244
245 if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
247 return Priority::LOW;
248 }
249
250 if matches!(key, "reviews" | "comments" | "logs" | "history") {
251 return Priority::BACKGROUND;
252 }
253
254 match value {
256 JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
257 JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
258 JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
259 _ => Priority::MEDIUM,
260 }
261 }
262
263 fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
265 if elements.len() > 50 {
267 return Priority::BACKGROUND;
268 }
269
270 if let Some(last_key) = path.last_key() {
272 if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
273 return Priority::BACKGROUND;
274 }
275 if matches!(last_key.as_str(), "items" | "data" | "results") {
276 return Priority::MEDIUM;
277 }
278 }
279
280 Priority::MEDIUM
281 }
282}
283
284#[derive(Debug)]
286pub struct StreamingPlan {
287 pub frames: VecDeque<PriorityStreamFrame>,
288}
289
290impl Default for StreamingPlan {
291 fn default() -> Self {
292 Self::new()
293 }
294}
295
296impl StreamingPlan {
297 pub fn new() -> Self {
298 Self {
299 frames: VecDeque::new(),
300 }
301 }
302
303 pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
305 self.frames.pop_front()
306 }
307
308 pub fn is_complete(&self) -> bool {
310 self.frames.is_empty()
311 }
312
313 pub fn remaining_frames(&self) -> usize {
315 self.frames.len()
316 }
317
318 pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
320 self.frames.iter()
321 }
322}
323
324impl JsonPath {
325 pub fn root() -> Self {
327 let segments = vec![PathSegment::Root];
328 Self { segments }
329 }
330
331 pub fn append_key(&self, key: &str) -> Self {
333 let mut segments = self.segments.clone();
334 segments.push(PathSegment::Key(key.to_string()));
335 Self { segments }
336 }
337
338 pub fn append_index(&self, index: usize) -> Self {
340 let mut segments = self.segments.clone();
341 segments.push(PathSegment::Index(index));
342 Self { segments }
343 }
344
345 pub fn last_key(&self) -> Option<String> {
347 self.segments.iter().rev().find_map(|segment| {
348 if let PathSegment::Key(key) = segment {
349 Some(key.clone())
350 } else {
351 None
352 }
353 })
354 }
355
356 pub fn segments(&self) -> &[PathSegment] {
358 &self.segments
359 }
360
361 pub fn len(&self) -> usize {
363 self.segments.len()
364 }
365
366 pub fn is_empty(&self) -> bool {
368 self.segments.is_empty()
369 }
370
371 pub fn from_segments(segments: Vec<PathSegment>) -> Self {
373 Self { segments }
374 }
375
376 pub fn to_json_pointer(&self) -> String {
378 let mut pointer = String::new();
379 for segment in &self.segments {
380 match segment {
381 PathSegment::Root => {}
382 PathSegment::Key(key) => {
383 pointer.push('/');
384 pointer.push_str(key);
385 }
386 PathSegment::Index(idx) => {
387 pointer.push('/');
388 pointer.push_str(&idx.to_string());
389 }
390 PathSegment::Wildcard => {
391 pointer.push_str("/*");
392 }
393 }
394 }
395 if pointer.is_empty() {
396 "/".to_string()
397 } else {
398 pointer
399 }
400 }
401}
402
403impl Default for PriorityStreamer {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The root renders as `/`; appended segments are joined with `/`.
    #[test]
    fn test_json_path_creation() {
        let root = JsonPath::root();
        assert_eq!(root.to_json_pointer(), "/");

        let nested = root.append_key("users").append_index(0).append_key("name");
        assert_eq!(nested.to_json_pointer(), "/users/0/name");
    }

    /// Priorities are totally ordered from `CRITICAL` down to `BACKGROUND`.
    #[test]
    fn test_priority_comparison() {
        assert!(Priority::CRITICAL > Priority::HIGH);
        assert!(Priority::HIGH > Priority::MEDIUM);
        assert!(Priority::MEDIUM > Priority::LOW);
        assert!(Priority::LOW > Priority::BACKGROUND);
    }

    /// The skeleton keeps keys and blanks out the values.
    #[test]
    fn test_skeleton_generation() {
        let streamer = PriorityStreamer::new();
        let input = json!({
            "name": "John",
            "age": 30,
            "active": true,
            "posts": ["post1", "post2"]
        });
        let expected = json!({
            "name": null,
            "age": 0,
            "active": false,
            "posts": []
        });

        let actual = streamer.generate_skeleton(&input);

        assert_eq!(actual, expected);
    }

    /// Well-known keys map to their fixed priorities.
    #[test]
    fn test_field_priority_calculation() {
        let streamer = PriorityStreamer::new();
        let root = JsonPath::root();

        let id_priority = streamer.calculate_field_priority(&root, "id", &json!(123));
        assert_eq!(id_priority, Priority::CRITICAL);

        let name_priority = streamer.calculate_field_priority(&root, "name", &json!("John"));
        assert_eq!(name_priority, Priority::HIGH);

        let reviews_priority = streamer.calculate_field_priority(&root, "reviews", &json!([]));
        assert_eq!(reviews_priority, Priority::BACKGROUND);
    }

    /// Analyzing a non-trivial document yields a plan with pending frames.
    #[test]
    fn test_streaming_plan_creation() {
        let streamer = PriorityStreamer::new();
        let document = json!({
            "id": 1,
            "name": "John",
            "bio": "Software developer",
            "reviews": ["Good", "Excellent"]
        });

        let plan = streamer.analyze(&document).unwrap();

        assert!(!plan.is_complete());
        assert!(plan.remaining_frames() > 0);
    }
}