1use crate::{
4 DomainError, DomainResult,
5 entities::Frame,
6 value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12mod serde_session_id {
14 use crate::value_objects::SessionId;
15 use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18 where
19 S: Serializer,
20 {
21 id.as_uuid().serialize(serializer)
22 }
23
24 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25 where
26 D: Deserializer<'de>,
27 {
28 let uuid = uuid::Uuid::deserialize(deserializer)?;
29 Ok(SessionId::from_uuid(uuid))
30 }
31}
32
33mod serde_stream_id {
35 use crate::value_objects::StreamId;
36 use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39 where
40 S: Serializer,
41 {
42 id.as_uuid().serialize(serializer)
43 }
44
45 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46 where
47 D: Deserializer<'de>,
48 {
49 let uuid = uuid::Uuid::deserialize(deserializer)?;
50 Ok(StreamId::from_uuid(uuid))
51 }
52}
53
54#[allow(dead_code)]
56mod serde_priority {
57 use crate::value_objects::Priority;
58 use serde::{Deserialize, Deserializer, Serialize, Serializer};
59
60 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
61 where
62 S: Serializer,
63 {
64 priority.value().serialize(serializer)
65 }
66
67 pub fn deserialize<'de, D>(deserializer: D) -> Result<Priority, D::Error>
68 where
69 D: Deserializer<'de>,
70 {
71 let value = u8::deserialize(deserializer)?;
72 Priority::new(value).map_err(serde::de::Error::custom)
73 }
74}
75
76mod serde_priority_map {
78 use crate::value_objects::Priority;
79 use serde::{Deserialize, Deserializer, Serialize, Serializer};
80 use std::collections::HashMap;
81
82 pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
83 where
84 S: Serializer,
85 {
86 let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
87 u8_map.serialize(serializer)
88 }
89
90 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
91 where
92 D: Deserializer<'de>,
93 {
94 let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
95 u8_map
96 .into_iter()
97 .map(|(k, v)| {
98 Priority::new(v)
99 .map(|p| (k, p))
100 .map_err(serde::de::Error::custom)
101 })
102 .collect()
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
108pub enum StreamState {
109 Preparing,
111 Streaming,
113 Completed,
115 Failed,
117 Cancelled,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct StreamConfig {
124 pub max_frame_size: usize,
126 pub max_frames_per_batch: usize,
128 pub enable_compression: bool,
130 #[serde(with = "serde_priority_map")]
132 pub priority_rules: HashMap<String, Priority>,
133}
134
135impl Default for StreamConfig {
136 fn default() -> Self {
137 Self {
138 max_frame_size: 64 * 1024, max_frames_per_batch: 10,
140 enable_compression: true,
141 priority_rules: HashMap::new(),
142 }
143 }
144}
145
146#[derive(Debug, Clone, Default, Serialize, Deserialize)]
148pub struct StreamStats {
149 pub total_frames: u64,
151 pub skeleton_frames: u64,
153 pub patch_frames: u64,
155 pub complete_frames: u64,
157 pub error_frames: u64,
159 pub total_bytes: u64,
161 pub critical_bytes: u64,
163 pub high_priority_bytes: u64,
165 pub average_frame_size: f64,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct Stream {
172 #[serde(with = "serde_stream_id")]
173 id: StreamId,
174 #[serde(with = "serde_session_id")]
175 session_id: SessionId,
176 state: StreamState,
177 config: StreamConfig,
178 stats: StreamStats,
179 created_at: DateTime<Utc>,
180 updated_at: DateTime<Utc>,
181 completed_at: Option<DateTime<Utc>>,
182 next_sequence: u64,
183 source_data: Option<JsonData>,
184 metadata: HashMap<String, String>,
185}
186
187impl Stream {
188 pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
190 let now = Utc::now();
191
192 Self {
193 id: StreamId::new(),
194 session_id,
195 state: StreamState::Preparing,
196 config,
197 stats: StreamStats::default(),
198 created_at: now,
199 updated_at: now,
200 completed_at: None,
201 next_sequence: 1,
202 source_data: Some(source_data),
203 metadata: HashMap::new(),
204 }
205 }
206
207 pub fn id(&self) -> StreamId {
209 self.id
210 }
211
212 pub fn session_id(&self) -> SessionId {
214 self.session_id
215 }
216
217 pub fn state(&self) -> &StreamState {
219 &self.state
220 }
221
222 pub fn config(&self) -> &StreamConfig {
224 &self.config
225 }
226
227 pub fn stats(&self) -> &StreamStats {
229 &self.stats
230 }
231
232 pub fn created_at(&self) -> DateTime<Utc> {
234 self.created_at
235 }
236
237 pub fn updated_at(&self) -> DateTime<Utc> {
239 self.updated_at
240 }
241
242 pub fn completed_at(&self) -> Option<DateTime<Utc>> {
244 self.completed_at
245 }
246
247 pub fn source_data(&self) -> Option<&JsonData> {
249 self.source_data.as_ref()
250 }
251
252 pub fn metadata(&self) -> &HashMap<String, String> {
254 &self.metadata
255 }
256
257 pub fn add_metadata(&mut self, key: String, value: String) {
259 self.metadata.insert(key, value);
260 self.update_timestamp();
261 }
262
263 pub fn start_streaming(&mut self) -> DomainResult<()> {
265 match self.state {
266 StreamState::Preparing => {
267 self.state = StreamState::Streaming;
268 self.update_timestamp();
269 Ok(())
270 }
271 _ => Err(DomainError::InvalidStateTransition(format!(
272 "Cannot start streaming from state: {:?}",
273 self.state
274 ))),
275 }
276 }
277
278 pub fn complete(&mut self) -> DomainResult<()> {
280 match self.state {
281 StreamState::Streaming => {
282 self.state = StreamState::Completed;
283 self.completed_at = Some(Utc::now());
284 self.update_timestamp();
285 Ok(())
286 }
287 _ => Err(DomainError::InvalidStateTransition(format!(
288 "Cannot complete stream from state: {:?}",
289 self.state
290 ))),
291 }
292 }
293
294 pub fn fail(&mut self, error: String) -> DomainResult<()> {
296 match self.state {
297 StreamState::Preparing | StreamState::Streaming => {
298 self.state = StreamState::Failed;
299 self.completed_at = Some(Utc::now());
300 self.add_metadata("error".to_string(), error);
301 Ok(())
302 }
303 _ => Err(DomainError::InvalidStateTransition(format!(
304 "Cannot fail stream from state: {:?}",
305 self.state
306 ))),
307 }
308 }
309
310 pub fn cancel(&mut self) -> DomainResult<()> {
312 match self.state {
313 StreamState::Preparing | StreamState::Streaming => {
314 self.state = StreamState::Cancelled;
315 self.completed_at = Some(Utc::now());
316 self.update_timestamp();
317 Ok(())
318 }
319 _ => Err(DomainError::InvalidStateTransition(format!(
320 "Cannot cancel stream from state: {:?}",
321 self.state
322 ))),
323 }
324 }
325
326 pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
328 if !matches!(self.state, StreamState::Streaming) {
329 return Err(DomainError::InvalidStreamState(
330 "Stream must be in streaming state to create frames".to_string(),
331 ));
332 }
333
334 let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
335 DomainError::InvalidStreamState("No source data available for skeleton".to_string())
336 })?;
337
338 let skeleton = self.generate_skeleton(skeleton_data)?;
339 let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
340
341 self.record_frame_created(&frame);
342
343 Ok(frame)
344 }
345
346 pub fn create_patch_frames(
348 &mut self,
349 priority_threshold: Priority,
350 max_frames: usize,
351 ) -> DomainResult<Vec<Frame>> {
352 if !matches!(self.state, StreamState::Streaming) {
353 return Err(DomainError::InvalidStreamState(
354 "Stream must be in streaming state to create frames".to_string(),
355 ));
356 }
357
358 let source_data = self.source_data.as_ref().ok_or_else(|| {
359 DomainError::InvalidStreamState("No source data available for patches".to_string())
360 })?;
361
362 let patches = self.extract_patches(source_data, priority_threshold)?;
363 let frames = self.batch_patches_into_frames(patches, max_frames)?;
364
365 for frame in &frames {
366 self.record_frame_created(frame);
367 }
368
369 Ok(frames)
370 }
371
372 pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
374 if !matches!(self.state, StreamState::Streaming) {
375 return Err(DomainError::InvalidStreamState(
376 "Stream must be in streaming state to create frames".to_string(),
377 ));
378 }
379
380 let frame = Frame::complete(self.id, self.next_sequence, checksum);
381 self.record_frame_created(&frame);
382
383 Ok(frame)
384 }
385
386 pub fn is_active(&self) -> bool {
388 matches!(self.state, StreamState::Preparing | StreamState::Streaming)
389 }
390
391 pub fn is_finished(&self) -> bool {
393 matches!(
394 self.state,
395 StreamState::Completed | StreamState::Failed | StreamState::Cancelled
396 )
397 }
398
399 pub fn duration(&self) -> Option<chrono::Duration> {
401 self.completed_at.map(|end| end - self.created_at)
402 }
403
404 pub fn progress(&self) -> f64 {
406 match self.state {
407 StreamState::Preparing => 0.0,
408 StreamState::Streaming => {
409 if self.stats.total_frames == 0 {
411 0.1 } else {
413 (self.stats.total_frames as f64 / 100.0).min(0.9)
415 }
416 }
417 StreamState::Completed => 1.0,
418 StreamState::Failed | StreamState::Cancelled => {
419 (self.stats.total_frames as f64 / 100.0).min(0.99)
421 }
422 }
423 }
424
425 pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
427 if !self.is_active() {
428 return Err(DomainError::InvalidStreamState(
429 "Cannot update config of inactive stream".to_string(),
430 ));
431 }
432
433 self.config = config;
434 self.update_timestamp();
435 Ok(())
436 }
437
438 fn update_timestamp(&mut self) {
440 self.updated_at = Utc::now();
441 }
442
443 fn record_frame_created(&mut self, frame: &Frame) {
445 self.next_sequence += 1;
446 self.stats.total_frames += 1;
447
448 let frame_size = frame.estimated_size() as u64;
449 self.stats.total_bytes += frame_size;
450
451 match frame.frame_type() {
452 crate::entities::frame::FrameType::Skeleton => {
453 self.stats.skeleton_frames += 1;
454 self.stats.critical_bytes += frame_size;
455 }
456 crate::entities::frame::FrameType::Patch => {
457 self.stats.patch_frames += 1;
458 if frame.is_critical() {
459 self.stats.critical_bytes += frame_size;
460 } else if frame.is_high_priority() {
461 self.stats.high_priority_bytes += frame_size;
462 }
463 }
464 crate::entities::frame::FrameType::Complete => {
465 self.stats.complete_frames += 1;
466 self.stats.critical_bytes += frame_size;
467 }
468 crate::entities::frame::FrameType::Error => {
469 self.stats.error_frames += 1;
470 self.stats.critical_bytes += frame_size;
471 }
472 }
473
474 self.stats.average_frame_size =
476 self.stats.total_bytes as f64 / self.stats.total_frames as f64;
477
478 self.update_timestamp();
479 }
480
481 fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
483 match data {
485 JsonData::Object(obj) => {
486 let mut skeleton = HashMap::new();
487 for (key, value) in obj.iter() {
488 skeleton.insert(
489 key.clone(),
490 match value {
491 JsonData::Array(_) => JsonData::Array(Vec::new()),
492 JsonData::Object(_) => self.generate_skeleton(value)?,
493 JsonData::Integer(_) => JsonData::Integer(0),
494 JsonData::Float(_) => JsonData::Float(0.0),
495 JsonData::String(_) => JsonData::Null,
496 JsonData::Bool(_) => JsonData::Bool(false),
497 JsonData::Null => JsonData::Null,
498 },
499 );
500 }
501 Ok(JsonData::Object(skeleton))
502 }
503 JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
504 _ => Ok(JsonData::Null),
505 }
506 }
507
508 fn extract_patches(
510 &self,
511 _data: &JsonData,
512 _threshold: Priority,
513 ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
514 Ok(Vec::new())
516 }
517
518 fn batch_patches_into_frames(
520 &mut self,
521 patches: Vec<crate::entities::frame::FramePatch>,
522 max_frames: usize,
523 ) -> DomainResult<Vec<Frame>> {
524 if patches.is_empty() {
525 return Ok(Vec::new());
526 }
527
528 let mut frames = Vec::new();
529 let chunk_size = patches.len().div_ceil(max_frames);
530
531 for patch_chunk in patches.chunks(chunk_size) {
532 let priority = patch_chunk
533 .iter()
534 .map(|_| Priority::MEDIUM) .max()
536 .unwrap_or(Priority::MEDIUM);
537
538 let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
539
540 frames.push(frame);
541 }
542
543 Ok(frames)
544 }
545}
546
547#[cfg(test)]
548mod tests {
549 use super::*;
550
551 #[test]
552 fn test_stream_creation() {
553 let session_id = SessionId::new();
554 let source_data = serde_json::json!({
555 "users": [
556 {"id": 1, "name": "John"},
557 {"id": 2, "name": "Jane"}
558 ],
559 "total": 2
560 });
561
562 let stream = Stream::new(
563 session_id,
564 source_data.clone().into(),
565 StreamConfig::default(),
566 );
567
568 assert_eq!(stream.session_id(), session_id);
569 assert_eq!(stream.state(), &StreamState::Preparing);
570 assert!(stream.is_active());
571 assert!(!stream.is_finished());
572 assert_eq!(stream.progress(), 0.0);
573 }
574
575 #[test]
576 fn test_stream_state_transitions() {
577 let session_id = SessionId::new();
578 let source_data = serde_json::json!({});
579 let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
580
581 assert!(stream.start_streaming().is_ok());
583 assert_eq!(stream.state(), &StreamState::Streaming);
584
585 assert!(stream.complete().is_ok());
587 assert_eq!(stream.state(), &StreamState::Completed);
588 assert!(stream.is_finished());
589 assert_eq!(stream.progress(), 1.0);
590 }
591
592 #[test]
593 fn test_invalid_state_transitions() {
594 let session_id = SessionId::new();
595 let source_data = serde_json::json!({});
596 let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
597
598 assert!(stream.complete().is_err());
600
601 assert!(stream.start_streaming().is_ok());
603 assert!(stream.complete().is_ok());
604
605 assert!(stream.start_streaming().is_err());
607 }
608
609 #[test]
610 fn test_frame_creation() {
611 let session_id = SessionId::new();
612 let source_data = serde_json::json!({
613 "test": "data"
614 });
615 let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
616
617 assert!(stream.create_skeleton_frame().is_err());
619
620 assert!(stream.start_streaming().is_ok());
622 let skeleton = stream
623 .create_skeleton_frame()
624 .expect("Failed to create skeleton frame in test");
625
626 assert_eq!(
627 skeleton.frame_type(),
628 &crate::entities::frame::FrameType::Skeleton
629 );
630 assert_eq!(skeleton.sequence(), 1);
631 assert_eq!(stream.stats().skeleton_frames, 1);
632 }
633
634 #[test]
635 fn test_stream_metadata() {
636 let session_id = SessionId::new();
637 let source_data = serde_json::json!({});
638 let mut stream = Stream::new(session_id, source_data.into(), StreamConfig::default());
639
640 stream.add_metadata("source".to_string(), "api".to_string());
641 stream.add_metadata("version".to_string(), "1.0".to_string());
642
643 assert_eq!(stream.metadata().len(), 2);
644 assert_eq!(stream.metadata().get("source"), Some(&"api".to_string()));
645 }
646}