1use crate::Result;
10use crate::domain::value_objects::Priority;
11use serde_json::{Map as JsonMap, Value as JsonValue};
12use std::collections::VecDeque;
13
14mod serde_priority {
16 use crate::domain::value_objects::Priority;
17 use serde::{Serialize, Serializer};
18
19 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
20 where
21 S: Serializer,
22 {
23 priority.value().serialize(serializer)
24 }
25}
26
/// Location of a value inside a JSON document, stored as an ordered list of
/// [`PathSegment`]s.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct JsonPath {
    /// Segments in the order they were appended; `JsonPath::root` starts the
    /// list with a single `PathSegment::Root`.
    segments: Vec<PathSegment>,
}
32
/// One step of a [`JsonPath`].
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum PathSegment {
    /// The document root; contributes nothing to the rendered pointer string.
    Root,
    /// A named member, appended via `JsonPath::append_key`.
    Key(String),
    /// A numeric position, appended via `JsonPath::append_index`
    /// (presumably an array index -- nothing in this file constructs one
    /// outside of tests).
    Index(usize),
    /// Rendered as `/*` by `JsonPath::to_json_pointer`. Not constructed in
    /// this file; NOTE(review): presumably "match any child" -- confirm
    /// against the consumers of this type.
    Wildcard,
}
40
/// A single change to apply at `path`, tagged with the priority used to order
/// and batch it into stream frames.
#[derive(Debug, Clone, serde::Serialize)]
pub struct JsonPatch {
    /// Where in the document the operation applies.
    pub path: JsonPath,
    /// What to do at `path`.
    pub operation: PatchOperation,
    /// Priority of this patch; serialized through `serde_priority`, i.e. as
    /// the result of `Priority::value()`.
    #[serde(with = "serde_priority")]
    pub priority: Priority,
}
49
/// The kind of change carried by a [`JsonPatch`].
#[derive(Debug, Clone, serde::Serialize)]
pub enum PatchOperation {
    /// Carries a single value; `PriorityStreamer` emits this for every object
    /// field it visits.
    Set { value: JsonValue },
    /// Carries a list of values; `PriorityStreamer` emits this for array
    /// contents (one patch per chunk for larger arrays).
    Append { values: Vec<JsonValue> },
    /// Carries a single value. Not produced anywhere in this file;
    /// NOTE(review): how it differs from `Set` is defined by the consumer.
    Replace { value: JsonValue },
    /// Carries no payload. Not produced anywhere in this file.
    Remove,
}
57
/// One frame of a [`StreamingPlan`].
///
/// `PriorityStreamer::analyze` emits exactly one `Skeleton` first, then zero
/// or more `Patch` frames, then one `Complete`.
#[derive(Debug, Clone, serde::Serialize)]
pub enum PriorityStreamFrame {
    /// Structural outline of the document with placeholder values.
    Skeleton {
        /// The outline itself.
        data: JsonValue,
        /// Frame priority (serialized via `serde_priority`).
        #[serde(with = "serde_priority")]
        priority: Priority,
        /// `analyze` always sets this to `false`.
        /// NOTE(review): meaning of `true` is not visible in this file.
        complete: bool,
    },
    /// A batch of patches that all share `priority`.
    Patch {
        /// The patches in this batch.
        patches: Vec<JsonPatch>,
        /// Priority common to every patch in the batch (serialized via
        /// `serde_priority`).
        #[serde(with = "serde_priority")]
        priority: Priority,
    },
    /// Terminal frame of a plan.
    Complete {
        /// Optional checksum; `analyze` always emits `None`.
        checksum: Option<u64>,
    },
}
76
/// Turns a JSON document into a [`StreamingPlan`]: a skeleton frame followed
/// by priority-ordered patch frames.
pub struct PriorityStreamer {
    /// Tuning knobs supplied at construction time.
    config: StreamerConfig,
}
81
/// Configuration for [`PriorityStreamer`].
#[derive(Debug, Clone)]
pub struct StreamerConfig {
    /// NOTE(review): not read by any code visible in this file; presumably
    /// toggles the key-name based priority heuristics -- confirm.
    pub detect_semantics: bool,
    /// Upper bound on the number of patches per `Patch` frame, and the chunk
    /// size used when splitting arrays into `Append` patches.
    pub max_patch_size: usize,
    /// NOTE(review): not read by any code visible in this file; presumably a
    /// cut-off below which patches are skipped -- confirm.
    pub priority_threshold: Priority,
}
88
89impl Default for StreamerConfig {
90 fn default() -> Self {
91 Self {
92 detect_semantics: true,
93 max_patch_size: 100,
94 priority_threshold: Priority::LOW,
95 }
96 }
97}
98
99impl PriorityStreamer {
100 pub fn new() -> Self {
102 Self::with_config(StreamerConfig::default())
103 }
104
105 pub fn with_config(config: StreamerConfig) -> Self {
107 Self { config }
108 }
109
110 pub fn analyze(&self, json: &JsonValue) -> Result<StreamingPlan> {
112 let mut plan = StreamingPlan::new();
113
114 let skeleton = self.generate_skeleton(json);
116 plan.frames.push_back(PriorityStreamFrame::Skeleton {
117 data: skeleton,
118 priority: Priority::CRITICAL,
119 complete: false,
120 });
121
122 let mut patches = Vec::new();
124 self.extract_patches(json, &JsonPath::root(), &mut patches)?;
125
126 patches.sort_by_key(|patch| std::cmp::Reverse(patch.priority));
128
129 let mut current_priority = Priority::CRITICAL;
130 let mut current_batch = Vec::new();
131
132 for patch in patches {
133 if patch.priority != current_priority && !current_batch.is_empty() {
134 plan.frames.push_back(PriorityStreamFrame::Patch {
135 patches: current_batch,
136 priority: current_priority,
137 });
138 current_batch = Vec::new();
139 }
140 current_priority = patch.priority;
141 current_batch.push(patch);
142
143 if current_batch.len() >= self.config.max_patch_size {
144 plan.frames.push_back(PriorityStreamFrame::Patch {
145 patches: current_batch,
146 priority: current_priority,
147 });
148 current_batch = Vec::new();
149 }
150 }
151
152 if !current_batch.is_empty() {
154 plan.frames.push_back(PriorityStreamFrame::Patch {
155 patches: current_batch,
156 priority: current_priority,
157 });
158 }
159
160 plan.frames
162 .push_back(PriorityStreamFrame::Complete { checksum: None });
163
164 Ok(plan)
165 }
166
167 fn generate_skeleton(&self, json: &JsonValue) -> JsonValue {
169 match json {
170 JsonValue::Object(map) => {
171 let mut skeleton = JsonMap::new();
172 for (key, value) in map {
173 skeleton.insert(
174 key.clone(),
175 match value {
176 JsonValue::Array(_) => JsonValue::Array(vec![]),
177 JsonValue::Object(_) => self.generate_skeleton(value),
178 JsonValue::String(_) => JsonValue::Null,
179 JsonValue::Number(_) => JsonValue::Number(0.into()),
180 JsonValue::Bool(_) => JsonValue::Bool(false),
181 JsonValue::Null => JsonValue::Null,
182 },
183 );
184 }
185 JsonValue::Object(skeleton)
186 }
187 JsonValue::Array(_) => JsonValue::Array(vec![]),
188 _ => JsonValue::Null,
189 }
190 }
191
192 fn extract_patches(
194 &self,
195 json: &JsonValue,
196 current_path: &JsonPath,
197 patches: &mut Vec<JsonPatch>,
198 ) -> Result<()> {
199 match json {
200 JsonValue::Object(map) => {
201 for (key, value) in map {
202 let field_path = current_path.append_key(key);
203 let priority = self.calculate_field_priority(&field_path, key, value);
204
205 patches.push(JsonPatch {
207 path: field_path.clone(),
208 operation: PatchOperation::Set {
209 value: value.clone(),
210 },
211 priority,
212 });
213
214 self.extract_patches(value, &field_path, patches)?;
216 }
217 }
218 JsonValue::Array(arr) => {
219 if arr.len() > 10 {
221 for chunk in arr.chunks(self.config.max_patch_size) {
223 patches.push(JsonPatch {
224 path: current_path.clone(),
225 operation: PatchOperation::Append {
226 values: chunk.to_vec(),
227 },
228 priority: self.calculate_array_priority(current_path, chunk),
229 });
230 }
231 } else if !arr.is_empty() {
232 patches.push(JsonPatch {
233 path: current_path.clone(),
234 operation: PatchOperation::Append {
235 values: arr.clone(),
236 },
237 priority: self.calculate_array_priority(current_path, arr),
238 });
239 }
240 }
241 _ => {
242 }
244 }
245
246 Ok(())
247 }
248
249 fn calculate_field_priority(&self, _path: &JsonPath, key: &str, value: &JsonValue) -> Priority {
251 if matches!(key, "id" | "uuid" | "status" | "type" | "kind") {
253 return Priority::CRITICAL;
254 }
255
256 if matches!(key, "name" | "title" | "label" | "email" | "username") {
258 return Priority::HIGH;
259 }
260
261 if key.contains("analytics") || key.contains("stats") || key.contains("meta") {
263 return Priority::LOW;
264 }
265
266 if matches!(key, "reviews" | "comments" | "logs" | "history") {
267 return Priority::BACKGROUND;
268 }
269
270 match value {
272 JsonValue::Array(arr) if arr.len() > 100 => Priority::BACKGROUND,
273 JsonValue::Object(obj) if obj.contains_key("timestamp") => Priority::MEDIUM,
274 JsonValue::String(s) if s.len() > 1000 => Priority::LOW,
275 _ => Priority::MEDIUM,
276 }
277 }
278
279 fn calculate_array_priority(&self, path: &JsonPath, elements: &[JsonValue]) -> Priority {
281 if elements.len() > 50 {
283 return Priority::BACKGROUND;
284 }
285
286 if let Some(last_key) = path.last_key() {
288 if matches!(last_key.as_str(), "reviews" | "comments" | "logs") {
289 return Priority::BACKGROUND;
290 }
291 if matches!(last_key.as_str(), "items" | "data" | "results") {
292 return Priority::MEDIUM;
293 }
294 }
295
296 Priority::MEDIUM
297 }
298}
299
/// Ordered queue of frames produced by `PriorityStreamer::analyze`, consumed
/// from the front.
#[derive(Debug)]
pub struct StreamingPlan {
    /// Frames still to be sent, in emission order.
    pub frames: VecDeque<PriorityStreamFrame>,
}
305
306impl Default for StreamingPlan {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
312impl StreamingPlan {
313 pub fn new() -> Self {
314 Self {
315 frames: VecDeque::new(),
316 }
317 }
318
319 pub fn next_frame(&mut self) -> Option<PriorityStreamFrame> {
321 self.frames.pop_front()
322 }
323
324 pub fn is_complete(&self) -> bool {
326 self.frames.is_empty()
327 }
328
329 pub fn remaining_frames(&self) -> usize {
331 self.frames.len()
332 }
333
334 pub fn frames(&self) -> impl Iterator<Item = &PriorityStreamFrame> {
336 self.frames.iter()
337 }
338}
339
340impl JsonPath {
341 pub fn root() -> Self {
343 let segments = vec![PathSegment::Root];
344 Self { segments }
345 }
346
347 pub fn append_key(&self, key: &str) -> Self {
349 let mut segments = self.segments.clone();
350 segments.push(PathSegment::Key(key.to_string()));
351 Self { segments }
352 }
353
354 pub fn append_index(&self, index: usize) -> Self {
356 let mut segments = self.segments.clone();
357 segments.push(PathSegment::Index(index));
358 Self { segments }
359 }
360
361 pub fn last_key(&self) -> Option<String> {
363 self.segments.iter().rev().find_map(|segment| {
364 if let PathSegment::Key(key) = segment {
365 Some(key.clone())
366 } else {
367 None
368 }
369 })
370 }
371
372 pub fn segments(&self) -> &[PathSegment] {
374 &self.segments
375 }
376
377 pub fn len(&self) -> usize {
379 self.segments.len()
380 }
381
382 pub fn is_empty(&self) -> bool {
384 self.segments.is_empty()
385 }
386
387 pub fn from_segments(segments: Vec<PathSegment>) -> Self {
389 Self { segments }
390 }
391
392 pub fn to_json_pointer(&self) -> String {
394 let mut pointer = String::new();
395 for segment in &self.segments {
396 match segment {
397 PathSegment::Root => {}
398 PathSegment::Key(key) => {
399 pointer.push('/');
400 pointer.push_str(key);
401 }
402 PathSegment::Index(idx) => {
403 pointer.push('/');
404 pointer.push_str(&idx.to_string());
405 }
406 PathSegment::Wildcard => {
407 pointer.push_str("/*");
408 }
409 }
410 }
411 if pointer.is_empty() {
412 "/".to_string()
413 } else {
414 pointer
415 }
416 }
417}
418
419impl Default for PriorityStreamer {
420 fn default() -> Self {
421 Self::new()
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use serde_json::json;
429
430 #[test]
431 fn test_json_path_creation() {
432 let path = JsonPath::root();
433 assert_eq!(path.to_json_pointer(), "/");
434
435 let path = path.append_key("users").append_index(0).append_key("name");
436 assert_eq!(path.to_json_pointer(), "/users/0/name");
437 }
438
439 #[test]
440 fn test_priority_comparison() {
441 assert!(Priority::CRITICAL > Priority::HIGH);
442 assert!(Priority::HIGH > Priority::MEDIUM);
443 assert!(Priority::MEDIUM > Priority::LOW);
444 assert!(Priority::LOW > Priority::BACKGROUND);
445 }
446
447 #[test]
448 fn test_skeleton_generation() {
449 let streamer = PriorityStreamer::new();
450 let json = json!({
451 "name": "John",
452 "age": 30,
453 "active": true,
454 "posts": ["post1", "post2"]
455 });
456
457 let skeleton = streamer.generate_skeleton(&json);
458 let expected = json!({
459 "name": null,
460 "age": 0,
461 "active": false,
462 "posts": []
463 });
464
465 assert_eq!(skeleton, expected);
466 }
467
468 #[test]
469 fn test_field_priority_calculation() {
470 let streamer = PriorityStreamer::new();
471 let path = JsonPath::root();
472
473 assert_eq!(
474 streamer.calculate_field_priority(&path, "id", &json!(123)),
475 Priority::CRITICAL
476 );
477
478 assert_eq!(
479 streamer.calculate_field_priority(&path, "name", &json!("John")),
480 Priority::HIGH
481 );
482
483 assert_eq!(
484 streamer.calculate_field_priority(&path, "reviews", &json!([])),
485 Priority::BACKGROUND
486 );
487 }
488
489 #[test]
490 fn test_streaming_plan_creation() {
491 let streamer = PriorityStreamer::new();
492 let json = json!({
493 "id": 1,
494 "name": "John",
495 "bio": "Software developer",
496 "reviews": ["Good", "Excellent"]
497 });
498
499 let plan = streamer.analyze(&json).unwrap();
500 assert!(!plan.is_complete());
501 assert!(plan.remaining_frames() > 0);
502 }
503}