1use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
14mod serde_priority {
16 use crate::domain::value_objects::Priority;
17 use serde::{Serialize, Serializer};
18
19 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20 where
21 S: Serializer,
22 {
23 priority.value().serialize(serializer)
24 }
25}
26
27#[derive(Debug, Clone, PartialEq, serde::Serialize)]
29pub struct JsonPath {
30 segments: Vec<PathSegment>,
31}
32
33#[derive(Debug, Clone, PartialEq, serde::Serialize)]
35pub enum PathSegment {
36 Root,
38 Key(String),
40 Index(usize),
42 Wildcard,
44}
45
46#[derive(Debug, Clone, serde::Serialize)]
48pub struct JsonPatch {
49 pub path: JsonPath,
51 pub operation: PatchOperation,
53 #[serde(with = "serde_priority")]
55 pub priority: Priority,
56}
57
58#[derive(Debug, Clone, serde::Serialize)]
60pub enum PatchOperation {
61 Set {
63 value: JsonValue,
65 },
66 Append {
68 values: Vec<JsonValue>,
70 },
71 Replace {
73 value: JsonValue,
75 },
76 Remove,
78}
79
80#[derive(Debug, Clone, serde::Serialize)]
82pub enum PriorityStreamFrame {
83 Skeleton {
85 data: JsonValue,
87 #[serde(with = "serde_priority")]
89 priority: Priority,
90 complete: bool,
92 },
93 Patch {
95 patches: Vec<JsonPatch>,
97 #[serde(with = "serde_priority")]
99 priority: Priority,
100 },
101 Complete {
103 checksum: Option<u64>,
105 },
106}
107
108pub struct PriorityStreamer {
110 config: StreamerConfig,
111}
112
113#[derive(Debug, Clone)]
115pub struct StreamerConfig {
116 pub detect_semantics: bool,
118 pub max_patch_size: usize,
120 pub priority_threshold: Priority,
122}
123
124impl Default for StreamerConfig {
125 fn default() -> Self {
126 Self {
127 detect_semantics: true,
128 max_patch_size: 100,
129 priority_threshold: Priority::LOW,
130 }
131 }
132}
133
134impl PriorityStreamer {
135 pub fn new() -> Self {
137 Self::with_config(StreamerConfig::default())
138 }
139
140 pub fn with_config(config: StreamerConfig) -> Self {
142 Self { config }
143 }
144
145 pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
147 let mut plan = StreamingPlan::new();
148
149 let skeleton = self.generate_skeleton(json);
151 plan.frames.push_back(PriorityStreamFrame::Skeleton {
152 data: skeleton,
153 priority: Priority::CRITICAL,
154 complete: false,
155 });
156
157 let mut patches = Vec::new();
159 self.extract_patches(json, &JsonPath::root(), &mut patches)?;
160
161 patches.sort_by_key(|patch| std::cmp::Reverse(patch.priority));
163
164 let mut current_priority = Priority::CRITICAL;
165 let mut current_batch = Vec::new();
166
167 for patch in patches {
168 if patch.priority != current_priority && !current_batch.is_empty() {
169 plan.frames.push_back(PriorityStreamFrame::Patch {
170 patches: current_batch,
171 priority: current_priority,
172 });
173 current_batch = Vec::new();
174 }
175 current_priority = patch.priority;
176 current_batch.push(patch);
177
178 if current_batch.len() >= self.config.max_patch_size {
179 plan.frames.push_back(PriorityStreamFrame::Patch {
180 patches: current_batch,
181 priority: current_priority,
182 });
183 current_batch = Vec::new();
184 }
185 }
186
187 if !current_batch.is_empty() {
189 plan.frames.push_back(PriorityStreamFrame::Patch {
190 patches: current_batch,
191 priority: current_priority,
192 });
193 }
194
195 plan.frames
197 .push_back(PriorityStreamFrame::Complete { checksum: None });
198
199 Ok(plan)
200 }
201
202 fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
204 match json {
205 JsonValue::Object(map) => {
206 let mut skeleton = JsonMap::new();
207 for (key, value) in map {
208 skeleton.insert(
209 key.clone(),
210 match value {
211 JsonValue::Array(_) => JsonValue::Array(vec![]),
212 JsonValue::Object(_) => self.generate_skeleton(value),
213 JsonValue::String(_) => JsonValue::Null,
214 JsonValue::Number(_) => JsonValue::Number(0.into()),
215 JsonValue::Bool(_) => JsonValue::Bool(false),
216 JsonValue::Null => JsonValue::Null,
217 },
218 );
219 }
220 JsonValue::Object(skeleton)
221 }
222 JsonValue::Array(_) => JsonValue::Array(vec![]),
223 _ => JsonValue::Null,
224 }
225 }
226
227 fn extract_patches(
229 &self,
230 json: &JsonValue,
231 current_path: &JsonPath,
232 patches: &mut Vec<JsonPatch>,
233 ) -> Result<()> {
234 match json {
235 JsonValue::Object(map) => {
236 for (key, value) in map {
237 let field_path = current_path.append_key(key);
238 let priority = self.calculate_field_priority(&field_path, key, value);
239
240 patches.push(JsonPatch {
242 path: field_path.clone(),
243 operation: PatchOperation::Set {
244 value: value.clone(),
245 },
246 priority,
247 });
248
249 self.extract_patches(value, &field_path, patches)?;
251 }
252 }
253 JsonValue::Array(arr) => {
254 if arr.len() > 10 {
256 for chunk in arr.chunks(self.config.max_patch_size) {
258 patches.push(JsonPatch {
259 path: current_path.clone(),
260 operation: PatchOperation::Append {
261 values: chunk.to_vec(),
262 },
263 priority: self.calculate_array_priority(current_path, chunk),
264 });
265 }
266 } else if !arr.is_empty() {
267 patches.push(JsonPatch {
268 path: current_path.clone(),
269 operation: PatchOperation::Append {
270 values: arr.clone(),
271 },
272 priority: self.calculate_array_priority(current_path, arr),
273 });
274 }
275 }
276 _ => {
277 }
279 }
280
281 Ok(())
282 }
283
284 fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
286 if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
288 return Priority::CRITICAL;
289 }
290
291 if matches!(key, "name" | "title" | "label" | "email" | "username") {
293 return Priority::HIGH;
294 }
295
296 if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
298 return Priority::LOW;
299 }
300
301 if matches!(key, "reviews" | "comments" | "logs" | "history") {
302 return Priority::BACKGROUND;
303 }
304
305 match value {
307 JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
308 JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
309 JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
310 _ => Priority::MEDIUM,
311 }
312 }
313
314 fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
316 if elements.len() > 50 {
318 return Priority::BACKGROUND;
319 }
320
321 if let Some(last_key) = path.last_key() {
323 if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
324 return Priority::BACKGROUND;
325 }
326 if matches!(last_key.as_str(), "items" | "data" | "results") {
327 return Priority::MEDIUM;
328 }
329 }
330
331 Priority::MEDIUM
332 }
333}
334
335#[derive(Debug)]
337pub struct StreamingPlan {
338 pub frames: VecDeque<PriorityStreamFrame>,
340}
341
342impl Default for StreamingPlan {
343 fn default() -> Self {
344 Self::new()
345 }
346}
347
348impl StreamingPlan {
349 pub fn new() -> Self {
351 Self {
352 frames: VecDeque::new(),
353 }
354 }
355
356 pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
358 self.frames.pop_front()
359 }
360
361 pub fn is_complete(&self) -> bool {
363 self.frames.is_empty()
364 }
365
366 pub fn remaining_frames(&self) -> usize {
368 self.frames.len()
369 }
370
371 pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
373 self.frames.iter()
374 }
375}
376
377impl JsonPath {
378 pub fn root() -> Self {
380 let segments = vec![PathSegment::Root];
381 Self { segments }
382 }
383
384 pub fn append_key(&self, key: &str) -> Self {
386 let mut segments = self.segments.clone();
387 segments.push(PathSegment::Key(key.to_string()));
388 Self { segments }
389 }
390
391 pub fn append_index(&self, index: usize) -> Self {
393 let mut segments = self.segments.clone();
394 segments.push(PathSegment::Index(index));
395 Self { segments }
396 }
397
398 pub fn last_key(&self) -> Option<String> {
400 self.segments.iter().rev().find_map(|segment| {
401 if let PathSegment::Key(key) = segment {
402 Some(key.clone())
403 } else {
404 None
405 }
406 })
407 }
408
409 pub fn segments(&self) -> &[PathSegment] {
411 &self.segments
412 }
413
414 pub fn len(&self) -> usize {
416 self.segments.len()
417 }
418
419 pub fn is_empty(&self) -> bool {
421 self.segments.is_empty()
422 }
423
424 pub fn from_segments(segments: Vec<PathSegment>) -> Self {
426 Self { segments }
427 }
428
429 pub fn to_json_pointer(&self) -> String {
431 let mut pointer = String::new();
432 for segment in &self.segments {
433 match segment {
434 PathSegment::Root => {}
435 PathSegment::Key(key) => {
436 pointer.push('/');
437 pointer.push_str(key);
438 }
439 PathSegment::Index(idx) => {
440 pointer.push('/');
441 pointer.push_str(&idx.to_string());
442 }
443 PathSegment::Wildcard => {
444 pointer.push_str("/*");
445 }
446 }
447 }
448 if pointer.is_empty() {
449 "/".to_string()
450 } else {
451 pointer
452 }
453 }
454}
455
456impl Default for PriorityStreamer {
457 fn default() -> Self {
458 Self::new()
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465 use serde_json::json;
466
467 #[test]
468 fn test_json_path_creation() {
469 let path = JsonPath::root();
470 assert_eq!(path.to_json_pointer(), "/");
471
472 let path = path.append_key("users").append_index(0).append_key("name");
473 assert_eq!(path.to_json_pointer(), "/users/0/name");
474 }
475
476 #[test]
477 fn test_priority_comparison() {
478 assert!(Priority::CRITICAL > Priority::HIGH);
479 assert!(Priority::HIGH > Priority::MEDIUM);
480 assert!(Priority::MEDIUM > Priority::LOW);
481 assert!(Priority::LOW > Priority::BACKGROUND);
482 }
483
484 #[test]
485 fn test_skeleton_generation() {
486 let streamer = PriorityStreamer::new();
487 let json = json!({
488 "name": "John",
489 "age": 30,
490 "active": true,
491 "posts": ["post1", "post2"]
492 });
493
494 let skeleton = streamer.generate_skeleton(&json);
495 let expected = json!({
496 "name": null,
497 "age": 0,
498 "active": false,
499 "posts": []
500 });
501
502 assert_eq!(skeleton, expected);
503 }
504
505 #[test]
506 fn test_field_priority_calculation() {
507 let streamer = PriorityStreamer::new();
508 let path = JsonPath::root();
509
510 assert_eq!(
511 streamer.calculate_field_priority(&path, "id", &json!(123)),
512 Priority::CRITICAL
513 );
514
515 assert_eq!(
516 streamer.calculate_field_priority(&path, "name", &json!("John")),
517 Priority::HIGH
518 );
519
520 assert_eq!(
521 streamer.calculate_field_priority(&path, "reviews", &json!([])),
522 Priority::BACKGROUND
523 );
524 }
525
526 #[test]
527 fn test_streaming_plan_creation() {
528 let streamer = PriorityStreamer::new();
529 let json = json!({
530 "id": 1,
531 "name": "John",
532 "bio": "Software developer",
533 "reviews": ["Good", "Excellent"]
534 });
535
536 let plan = streamer.analyze(&json).unwrap();
537 assert!(!plan.is_complete());
538 assert!(plan.remaining_frames() > 0);
539 }
540}