1use crate::{
4 DomainError, DomainResult,
5 entities::Frame,
6 value_objects::{JsonData, JsonPath, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12mod serde_session_id {
14 use crate::value_objects::SessionId;
15 use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18 where
19 S: Serializer,
20 {
21 id.as_uuid().serialize(serializer)
22 }
23
24 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25 where
26 D: Deserializer<'de>,
27 {
28 let uuid = uuid::Uuid::deserialize(deserializer)?;
29 Ok(SessionId::from_uuid(uuid))
30 }
31}
32
33mod serde_stream_id {
35 use crate::value_objects::StreamId;
36 use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39 where
40 S: Serializer,
41 {
42 id.as_uuid().serialize(serializer)
43 }
44
45 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46 where
47 D: Deserializer<'de>,
48 {
49 let uuid = uuid::Uuid::deserialize(deserializer)?;
50 Ok(StreamId::from_uuid(uuid))
51 }
52}
53
54mod serde_priority_map {
56 use crate::value_objects::Priority;
57 use serde::{Deserialize, Deserializer, Serialize, Serializer};
58 use std::collections::HashMap;
59
60 pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
61 where
62 S: Serializer,
63 {
64 let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
65 u8_map.serialize(serializer)
66 }
67
68 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
69 where
70 D: Deserializer<'de>,
71 {
72 let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
73 u8_map
74 .into_iter()
75 .map(|(k, v)| {
76 Priority::new(v)
77 .map(|p| (k, p))
78 .map_err(serde::de::Error::custom)
79 })
80 .collect()
81 }
82}
83
/// Lifecycle state of a [`Stream`].
///
/// `Preparing` and `Streaming` are the states reported as active by
/// `Stream::is_active`; the remaining three are reported as finished by
/// `Stream::is_finished`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum StreamState {
    /// Initial state assigned by `Stream::new`.
    Preparing,
    /// Entered from `Preparing` via `Stream::start_streaming`; frames can only
    /// be created in this state.
    Streaming,
    /// Entered from `Streaming` via `Stream::complete`.
    Completed,
    /// Entered from `Preparing` or `Streaming` via `Stream::fail`.
    Failed,
    /// Entered from `Preparing` or `Streaming` via `Stream::cancel`.
    Cancelled,
}
99
/// Tunable settings attached to a [`Stream`].
///
/// NOTE(review): only `priority_rules` is read by the code in this file; the
/// meaning of the other fields is inferred from their names — confirm against
/// the code that consumes them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Upper bound on the size of a single frame (presumably in bytes — TODO confirm).
    pub max_frame_size: usize,
    /// Upper bound on the number of frames per batch (presumably — TODO confirm).
    pub max_frames_per_batch: usize,
    /// Whether compression is enabled (not consulted in this file).
    pub enable_compression: bool,
    /// Priority overrides keyed by string. When non-empty, they replace the
    /// `overrides` of the default `PriorityHeuristicConfig` used to rank patches.
    /// Serialized as a map of `String -> u8`.
    #[serde(with = "serde_priority_map")]
    pub priority_rules: HashMap<String, Priority>,
}
113
114impl Default for StreamConfig {
115 fn default() -> Self {
116 Self {
117 max_frame_size: 64 * 1024, max_frames_per_batch: 10,
119 enable_compression: true,
120 priority_rules: HashMap::new(),
121 }
122 }
123}
124
/// Running counters describing the frames a [`Stream`] has produced.
///
/// All fields start at zero (`Default`) and are updated each time the stream
/// records a newly created frame.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamStats {
    /// Number of frames recorded, of any type.
    pub total_frames: u64,
    /// Number of recorded skeleton frames.
    pub skeleton_frames: u64,
    /// Number of recorded patch frames.
    pub patch_frames: u64,
    /// Number of recorded completion frames.
    pub complete_frames: u64,
    /// Number of recorded error frames.
    pub error_frames: u64,
    /// Sum of `Frame::estimated_size()` over all recorded frames.
    pub total_bytes: u64,
    /// Estimated bytes of skeleton, completion and error frames, plus patch
    /// frames for which `Frame::is_critical()` is true.
    pub critical_bytes: u64,
    /// Estimated bytes of patch frames that are not critical but for which
    /// `Frame::is_high_priority()` is true.
    pub high_priority_bytes: u64,
    /// `total_bytes / total_frames`, recomputed after every recorded frame.
    pub average_frame_size: f64,
}
147
/// A stream of frames derived from one JSON document within a session.
///
/// Tracks the lifecycle state, configuration, statistics and the sequence
/// number to assign to the next frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stream {
    /// Identifier of this stream; serialized as a UUID.
    #[serde(with = "serde_stream_id")]
    id: StreamId,
    /// Identifier of the owning session; serialized as a UUID.
    #[serde(with = "serde_session_id")]
    session_id: SessionId,
    /// Current lifecycle state.
    state: StreamState,
    /// Active configuration; replaceable through `update_config` while the
    /// stream is active.
    config: StreamConfig,
    /// Counters for the frames recorded so far.
    stats: StreamStats,
    /// Time at which the stream was constructed.
    created_at: DateTime<Utc>,
    /// Time of the most recent mutation made through this type's methods.
    updated_at: DateTime<Utc>,
    /// Set when the stream is completed, failed or cancelled; `None` before.
    completed_at: Option<DateTime<Utc>>,
    /// Sequence number for the next frame; starts at 1 and is incremented for
    /// every recorded frame.
    next_sequence: u64,
    /// JSON document frames are generated from. `Stream::new` always stores
    /// `Some`.
    source_data: Option<JsonData>,
    /// Free-form string metadata; `fail` stores its message under `"error"`.
    metadata: HashMap<String, String>,
}
165
166impl Stream {
167 pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
169 let now = Utc::now();
170
171 Self {
172 id: StreamId::new(),
173 session_id,
174 state: StreamState::Preparing,
175 config,
176 stats: StreamStats::default(),
177 created_at: now,
178 updated_at: now,
179 completed_at: None,
180 next_sequence: 1,
181 source_data: Some(source_data),
182 metadata: HashMap::new(),
183 }
184 }
185
    /// Returns this stream's identifier.
    pub fn id(&self) -> StreamId {
        self.id
    }
190
    /// Returns the identifier of the session this stream was created for.
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }
195
    /// Returns the current lifecycle state.
    pub fn state(&self) -> &StreamState {
        &self.state
    }
200
    /// Returns the stream's current configuration.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }
205
    /// Returns the counters accumulated for the frames recorded so far.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }
210
    /// Returns the UTC time at which the stream was constructed.
    pub fn created_at(&self) -> DateTime<Utc> {
        self.created_at
    }
215
    /// Returns the UTC time of the most recent mutation (state change,
    /// metadata or config update, or recorded frame).
    pub fn updated_at(&self) -> DateTime<Utc> {
        self.updated_at
    }
220
    /// Returns when the stream was completed, failed or cancelled, or `None`
    /// if none of those transitions has happened.
    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
        self.completed_at
    }
225
    /// Returns the JSON document frames are generated from, if one is stored.
    pub fn source_data(&self) -> Option<&JsonData> {
        self.source_data.as_ref()
    }
230
    /// Returns the metadata key/value pairs attached to the stream.
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.metadata
    }
235
    /// Inserts `value` under `key`, replacing any previous value for that key,
    /// and refreshes `updated_at`.
    ///
    /// No state check is performed: metadata can be added in any state.
    pub fn add_metadata(&mut self, key: String, value: String) {
        self.metadata.insert(key, value);
        self.update_timestamp();
    }
241
242 pub fn start_streaming(&mut self) -> DomainResult<()> {
244 match self.state {
245 StreamState::Preparing => {
246 self.state = StreamState::Streaming;
247 self.update_timestamp();
248 Ok(())
249 }
250 _ => Err(DomainError::InvalidStateTransition(format!(
251 "Cannot start streaming from state: {:?}",
252 self.state
253 ))),
254 }
255 }
256
257 pub fn complete(&mut self) -> DomainResult<()> {
259 match self.state {
260 StreamState::Streaming => {
261 self.state = StreamState::Completed;
262 self.completed_at = Some(Utc::now());
263 self.update_timestamp();
264 Ok(())
265 }
266 _ => Err(DomainError::InvalidStateTransition(format!(
267 "Cannot complete stream from state: {:?}",
268 self.state
269 ))),
270 }
271 }
272
273 pub fn fail(&mut self, error: String) -> DomainResult<()> {
275 match self.state {
276 StreamState::Preparing | StreamState::Streaming => {
277 self.state = StreamState::Failed;
278 self.completed_at = Some(Utc::now());
279 self.add_metadata("error".to_string(), error);
280 Ok(())
281 }
282 _ => Err(DomainError::InvalidStateTransition(format!(
283 "Cannot fail stream from state: {:?}",
284 self.state
285 ))),
286 }
287 }
288
289 pub fn cancel(&mut self) -> DomainResult<()> {
291 match self.state {
292 StreamState::Preparing | StreamState::Streaming => {
293 self.state = StreamState::Cancelled;
294 self.completed_at = Some(Utc::now());
295 self.update_timestamp();
296 Ok(())
297 }
298 _ => Err(DomainError::InvalidStateTransition(format!(
299 "Cannot cancel stream from state: {:?}",
300 self.state
301 ))),
302 }
303 }
304
305 pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
307 if !matches!(self.state, StreamState::Streaming) {
308 return Err(DomainError::InvalidStreamState(
309 "Stream must be in streaming state to create frames".to_string(),
310 ));
311 }
312
313 let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
314 DomainError::InvalidStreamState("No source data available for skeleton".to_string())
315 })?;
316
317 let skeleton = self.generate_skeleton(skeleton_data)?;
318 let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
319
320 self.record_frame_created(&frame);
321
322 Ok(frame)
323 }
324
325 pub fn create_patch_frames(
327 &mut self,
328 priority_threshold: Priority,
329 max_frames: usize,
330 ) -> DomainResult<Vec<Frame>> {
331 if !matches!(self.state, StreamState::Streaming) {
332 return Err(DomainError::InvalidStreamState(
333 "Stream must be in streaming state to create frames".to_string(),
334 ));
335 }
336
337 let source_data = self.source_data.as_ref().ok_or_else(|| {
338 DomainError::InvalidStreamState("No source data available for patches".to_string())
339 })?;
340
341 let prioritized = self.extract_patches(source_data, priority_threshold)?;
342 let frames = self.batch_patches_into_frames(prioritized, max_frames)?;
343
344 for frame in &frames {
345 self.record_frame_created(frame);
346 }
347
348 Ok(frames)
349 }
350
351 pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
353 if !matches!(self.state, StreamState::Streaming) {
354 return Err(DomainError::InvalidStreamState(
355 "Stream must be in streaming state to create frames".to_string(),
356 ));
357 }
358
359 let frame = Frame::complete(self.id, self.next_sequence, checksum);
360 self.record_frame_created(&frame);
361
362 Ok(frame)
363 }
364
365 pub fn is_active(&self) -> bool {
367 matches!(self.state, StreamState::Preparing | StreamState::Streaming)
368 }
369
370 pub fn is_finished(&self) -> bool {
372 matches!(
373 self.state,
374 StreamState::Completed | StreamState::Failed | StreamState::Cancelled
375 )
376 }
377
378 pub fn duration(&self) -> Option<chrono::Duration> {
380 self.completed_at.map(|end| end - self.created_at)
381 }
382
383 pub fn progress(&self) -> f64 {
385 match self.state {
386 StreamState::Preparing => 0.0,
387 StreamState::Streaming => {
388 if self.stats.total_frames == 0 {
390 0.1 } else {
392 (self.stats.total_frames as f64 / 100.0).min(0.9)
394 }
395 }
396 StreamState::Completed => 1.0,
397 StreamState::Failed | StreamState::Cancelled => {
398 (self.stats.total_frames as f64 / 100.0).min(0.99)
400 }
401 }
402 }
403
404 pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
406 if !self.is_active() {
407 return Err(DomainError::InvalidStreamState(
408 "Cannot update config of inactive stream".to_string(),
409 ));
410 }
411
412 self.config = config;
413 self.update_timestamp();
414 Ok(())
415 }
416
    /// Sets `updated_at` to the current UTC time.
    fn update_timestamp(&mut self) {
        self.updated_at = Utc::now();
    }
421
422 fn record_frame_created(&mut self, frame: &Frame) {
424 self.next_sequence += 1;
425 self.stats.total_frames += 1;
426
427 let frame_size = frame.estimated_size() as u64;
428 self.stats.total_bytes += frame_size;
429
430 match frame.frame_type() {
431 crate::entities::frame::FrameType::Skeleton => {
432 self.stats.skeleton_frames += 1;
433 self.stats.critical_bytes += frame_size;
434 }
435 crate::entities::frame::FrameType::Patch => {
436 self.stats.patch_frames += 1;
437 if frame.is_critical() {
438 self.stats.critical_bytes += frame_size;
439 } else if frame.is_high_priority() {
440 self.stats.high_priority_bytes += frame_size;
441 }
442 }
443 crate::entities::frame::FrameType::Complete => {
444 self.stats.complete_frames += 1;
445 self.stats.critical_bytes += frame_size;
446 }
447 crate::entities::frame::FrameType::Error => {
448 self.stats.error_frames += 1;
449 self.stats.critical_bytes += frame_size;
450 }
451 }
452
453 self.stats.average_frame_size =
455 self.stats.total_bytes as f64 / self.stats.total_frames as f64;
456
457 self.update_timestamp();
458 }
459
460 fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
462 match data {
464 JsonData::Object(obj) => {
465 let mut skeleton = HashMap::new();
466 for (key, value) in obj.iter() {
467 skeleton.insert(
468 key.clone(),
469 match value {
470 JsonData::Array(_) => JsonData::Array(Vec::new()),
471 JsonData::Object(_) => self.generate_skeleton(value)?,
472 JsonData::Integer(_) => JsonData::Integer(0),
473 JsonData::Float(_) => JsonData::Float(0.0),
474 JsonData::String(_) => JsonData::Null,
475 JsonData::Bool(_) => JsonData::Bool(false),
476 JsonData::Null => JsonData::Null,
477 },
478 );
479 }
480 Ok(JsonData::Object(skeleton))
481 }
482 JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
483 _ => Ok(JsonData::Null),
484 }
485 }
486
487 fn extract_patches(
496 &self,
497 data: &JsonData,
498 threshold: Priority,
499 ) -> DomainResult<Vec<(crate::entities::frame::FramePatch, Priority)>> {
500 let mut patches = Vec::new();
501 self.collect_patches(data, &JsonPath::root(), threshold, &mut patches)?;
502 patches.sort_by_key(|p| core::cmp::Reverse(p.1));
505 Ok(patches)
506 }
507
508 fn collect_patches(
510 &self,
511 data: &JsonData,
512 path: &JsonPath,
513 threshold: Priority,
514 out: &mut Vec<(crate::entities::frame::FramePatch, Priority)>,
515 ) -> DomainResult<()> {
516 if let JsonData::Object(map) = data {
517 for (key, value) in map.iter() {
518 let Ok(child_path) = path.append_key(key) else {
522 continue;
523 };
524 self.collect_patches(value, &child_path, threshold, out)?;
525 }
526 return Ok(());
527 }
528
529 let priority = self.compute_priority(path, data);
530 if priority >= threshold {
531 let patch = crate::entities::frame::FramePatch::set(path.clone(), data.clone());
532 out.push((patch, priority));
533 }
534 Ok(())
535 }
536
537 fn compute_priority(&self, path: &JsonPath, value: &JsonData) -> Priority {
546 let mut cfg = crate::services::PriorityHeuristicConfig::default();
547 if !self.config.priority_rules.is_empty() {
548 cfg.overrides = self.config.priority_rules.clone();
549 }
550 crate::services::compute_priority(&cfg, path, value)
551 }
552
553 fn batch_patches_into_frames(
559 &mut self,
560 patches: Vec<(crate::entities::frame::FramePatch, Priority)>,
561 max_frames: usize,
562 ) -> DomainResult<Vec<Frame>> {
563 if patches.is_empty() || max_frames == 0 {
564 return Ok(Vec::new());
565 }
566
567 let mut frames = Vec::new();
568 let chunk_size = patches.len().div_ceil(max_frames).max(1);
569
570 for chunk in patches.chunks(chunk_size) {
571 let priority = chunk
572 .iter()
573 .map(|(_, p)| *p)
574 .max()
575 .unwrap_or(Priority::MEDIUM);
576
577 let frame_patches: Vec<crate::entities::frame::FramePatch> =
578 chunk.iter().map(|(patch, _)| patch.clone()).collect();
579
580 let frame = Frame::patch(self.id, self.next_sequence, priority, frame_patches)?;
581
582 frames.push(frame);
583 }
584
585 Ok(frames)
586 }
587}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592
    /// A freshly constructed stream keeps its session id, starts in
    /// `Preparing`, counts as active and reports zero progress.
    #[test]
    fn test_stream_creation() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "users": [
                {"id": 1, "name": "John"},
                {"id": 2, "name": "Jane"}
            ],
            "total": 2
        });

        let stream = Stream::new(
            session_id,
            source_data.clone().into(),
            StreamConfig::default(),
        );

        assert_eq!(stream.session_id(), session_id);
        assert_eq!(stream.state(), &StreamState::Preparing);
        assert!(stream.is_active());
        assert!(!stream.is_finished());
        assert_eq!(stream.progress(), 0.0);
    }
616
    /// The happy path Preparing -> Streaming -> Completed succeeds and ends
    /// with a finished stream at full progress.
    #[test]
    fn test_stream_state_transitions() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({});
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        // Preparing -> Streaming
        assert!(stream.start_streaming().is_ok());
        assert_eq!(stream.state(), &StreamState::Streaming);

        // Streaming -> Completed
        assert!(stream.complete().is_ok());
        assert_eq!(stream.state(), &StreamState::Completed);
        assert!(stream.is_finished());
        assert_eq!(stream.progress(), 1.0);
    }
633
    /// Transitions that skip or revisit a state are rejected.
    #[test]
    fn test_invalid_state_transitions() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({});
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        // Completing is not allowed straight from Preparing.
        assert!(stream.complete().is_err());

        assert!(stream.start_streaming().is_ok());
        assert!(stream.complete().is_ok());

        // A completed stream cannot be restarted.
        assert!(stream.start_streaming().is_err());
    }
650
    /// Skeleton frames can only be created while streaming; the first frame
    /// gets sequence number 1 and is counted in the statistics.
    #[test]
    fn test_frame_creation() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "test": "data"
        });
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        // Still in Preparing: frame creation must be refused.
        assert!(stream.create_skeleton_frame().is_err());

        assert!(stream.start_streaming().is_ok());
        let skeleton = stream
            .create_skeleton_frame()
            .expect("Failed to create skeleton frame in test");

        assert_eq!(
            skeleton.frame_type(),
            &crate::entities::frame::FrameType::Skeleton
        );
        assert_eq!(skeleton.sequence(), 1);
        assert_eq!(stream.stats().skeleton_frames, 1);
    }
675
    /// Metadata entries added to a stream can be read back.
    #[test]
    fn test_stream_metadata() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({});
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        stream.add_metadata("source".to_string(), "api".to_string());
        stream.add_metadata("version".to_string(), "1.0".to_string());

        assert_eq!(stream.metadata().len(), 2);
        assert_eq!(stream.metadata().get("source"), Some(&"api".to_string()));
    }
688
    /// With the lowest threshold, a small payload yields at least one patch
    /// frame, and at least one frame is ranked critical.
    ///
    /// NOTE(review): the critical ranking of the `id` field comes from the
    /// default heuristics in `crate::services`, which are not visible here.
    #[test]
    fn test_create_patch_frames_emits_frames_for_typical_payload() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "id": "abc-123",
            "name": "Alice",
            "items": [1, 2, 3]
        });
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        stream
            .start_streaming()
            .expect("stream must enter streaming state");

        let frames = stream
            .create_patch_frames(Priority::BACKGROUND, 16)
            .expect("frame generation must succeed");

        assert!(
            !frames.is_empty(),
            "extract_patches must produce at least one patch for non-empty source data"
        );

        // Highest priority across all emitted frames.
        let id_frame_priority_max = frames
            .iter()
            .map(|f| f.priority())
            .max()
            .expect("non-empty frames must have a max priority");
        assert!(
            id_frame_priority_max >= Priority::CRITICAL,
            "frames carrying the `id` field must surface at critical priority"
        );
    }
722
    /// With a critical threshold, a payload containing only `analytics`
    /// counters produces no frames.
    ///
    /// NOTE(review): relies on the default heuristics ranking these fields
    /// below critical — confirm against `crate::services`.
    #[test]
    fn test_create_patch_frames_filters_below_threshold() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "analytics": {"clicks": 1, "views": 2}
        });
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        stream.start_streaming().expect("stream starts");
        let frames = stream
            .create_patch_frames(Priority::CRITICAL, 8)
            .expect("frame generation must succeed");

        assert!(
            frames.is_empty(),
            "patches below the priority threshold must be dropped"
        );
    }
743
    /// When all patches are forced into a single frame, that frame takes the
    /// highest priority among its patches.
    #[test]
    fn test_create_patch_frames_uses_max_priority_per_chunk() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "id": "x",
            "name": "y",
            "logs": "z"
        });
        let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        stream.start_streaming().expect("stream starts");
        // max_frames = 1 puts every patch into the same chunk.
        let frames = stream
            .create_patch_frames(Priority::BACKGROUND, 1)
            .expect("frame generation must succeed");

        assert_eq!(frames.len(), 1, "max_frames=1 must yield a single frame");
        assert_eq!(
            frames[0].priority(),
            Priority::CRITICAL,
            "frame priority must reflect the highest-priority patch in the chunk"
        );
    }
768}