1use crate::{
4 DomainError, DomainResult,
5 entities::Frame,
6 value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12mod serde_session_id {
14 use crate::value_objects::SessionId;
15 use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18 where
19 S: Serializer,
20 {
21 id.as_uuid().serialize(serializer)
22 }
23
24 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25 where
26 D: Deserializer<'de>,
27 {
28 let uuid = uuid::Uuid::deserialize(deserializer)?;
29 Ok(SessionId::from_uuid(uuid))
30 }
31}
32
33mod serde_stream_id {
35 use crate::value_objects::StreamId;
36 use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39 where
40 S: Serializer,
41 {
42 id.as_uuid().serialize(serializer)
43 }
44
45 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46 where
47 D: Deserializer<'de>,
48 {
49 let uuid = uuid::Uuid::deserialize(deserializer)?;
50 Ok(StreamId::from_uuid(uuid))
51 }
52}
53
54mod serde_priority_map {
56 use crate::value_objects::Priority;
57 use serde::{Deserialize, Deserializer, Serialize, Serializer};
58 use std::collections::HashMap;
59
60 pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
61 where
62 S: Serializer,
63 {
64 let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
65 u8_map.serialize(serializer)
66 }
67
68 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
69 where
70 D: Deserializer<'de>,
71 {
72 let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
73 u8_map
74 .into_iter()
75 .map(|(k, v)| {
76 Priority::new(v)
77 .map(|p| (k, p))
78 .map_err(serde::de::Error::custom)
79 })
80 .collect()
81 }
82}
83
/// Lifecycle state of a [`Stream`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamState {
    /// Initial state assigned by `Stream::new`; frames cannot be created yet.
    Preparing,
    /// Entered from `Preparing` via `Stream::start_streaming`; the only state in which frames are created.
    Streaming,
    /// Terminal state entered from `Streaming` via `Stream::complete`.
    Completed,
    /// Terminal state entered from `Preparing` or `Streaming` via `Stream::fail`.
    Failed,
    /// Terminal state entered from `Preparing` or `Streaming` via `Stream::cancel`.
    Cancelled,
}
98
/// Tunable parameters attached to a [`Stream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Upper bound on the size of a single frame (`64 * 1024` by default).
    /// NOTE(review): the unit is presumably bytes and nothing in this file enforces it — confirm.
    pub max_frame_size: usize,
    /// Maximum number of frames per batch (`10` by default).
    /// NOTE(review): not read anywhere in this file — confirm where it is applied.
    pub max_frames_per_batch: usize,
    /// Whether compression is enabled (`true` by default).
    pub enable_compression: bool,
    /// Priority overrides keyed by string; serialized as raw `u8` values via `serde_priority_map`.
    /// NOTE(review): the meaning of the keys is not visible in this file — confirm.
    #[serde(with = "serde_priority_map")]
    pub priority_rules: HashMap<String, Priority>,
}
112
113impl Default for StreamConfig {
114 fn default() -> Self {
115 Self {
116 max_frame_size: 64 * 1024, max_frames_per_batch: 10,
118 enable_compression: true,
119 priority_rules: HashMap::new(),
120 }
121 }
122}
123
/// Running counters maintained by `Stream::record_frame_created`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamStats {
    /// Number of frames recorded so far, of any type.
    pub total_frames: u64,
    /// Number of recorded skeleton frames.
    pub skeleton_frames: u64,
    /// Number of recorded patch frames.
    pub patch_frames: u64,
    /// Number of recorded completion frames.
    pub complete_frames: u64,
    /// Number of recorded error frames.
    pub error_frames: u64,
    /// Sum of `Frame::estimated_size()` over all recorded frames.
    pub total_bytes: u64,
    /// Estimated size of skeleton, completion and error frames plus patch frames flagged critical.
    pub critical_bytes: u64,
    /// Estimated size of patch frames that are high priority but not critical.
    pub high_priority_bytes: u64,
    /// `total_bytes / total_frames`, recomputed after every recorded frame.
    pub average_frame_size: f64,
}
146
/// A prioritized frame stream belonging to a session.
///
/// Tracks its lifecycle state, configuration, statistics and the sequence
/// number to assign to the next frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stream {
    /// Stream identifier; serialized as a UUID.
    #[serde(with = "serde_stream_id")]
    id: StreamId,
    /// Identifier of the owning session; serialized as a UUID.
    #[serde(with = "serde_session_id")]
    session_id: SessionId,
    /// Current lifecycle state.
    state: StreamState,
    /// Active configuration; replaceable through `update_config` while the stream is active.
    config: StreamConfig,
    /// Counters updated whenever a frame is recorded.
    stats: StreamStats,
    /// Creation time, captured in `new`.
    created_at: DateTime<Utc>,
    /// Time of the most recent mutation.
    updated_at: DateTime<Utc>,
    /// Time the stream reached a terminal state, if it has.
    completed_at: Option<DateTime<Utc>>,
    /// Sequence number for the next frame; starts at 1.
    next_sequence: u64,
    /// Data the frames are derived from.
    source_data: Option<JsonData>,
    /// Free-form key/value annotations; `fail` stores its message under `"error"`.
    metadata: HashMap<String, String>,
}
164
165impl Stream {
166 pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
168 let now = Utc::now();
169
170 Self {
171 id: StreamId::new(),
172 session_id,
173 state: StreamState::Preparing,
174 config,
175 stats: StreamStats::default(),
176 created_at: now,
177 updated_at: now,
178 completed_at: None,
179 next_sequence: 1,
180 source_data: Some(source_data),
181 metadata: HashMap::new(),
182 }
183 }
184
    /// Returns this stream's identifier.
    pub fn id(&self) -> StreamId {
        self.id
    }
189
    /// Returns the identifier of the session this stream was created for.
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }
194
    /// Returns the current lifecycle state.
    pub fn state(&self) -> &StreamState {
        &self.state
    }
199
    /// Returns the active configuration.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }
204
    /// Returns the frame statistics accumulated so far.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }
209
    /// Returns the time at which the stream was created.
    pub fn created_at(&self) -> DateTime<Utc> {
        self.created_at
    }
214
    /// Returns the time of the most recent mutation.
    pub fn updated_at(&self) -> DateTime<Utc> {
        self.updated_at
    }
219
    /// Returns when the stream was completed, failed or cancelled; `None` before that.
    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
        self.completed_at
    }
224
    /// Returns the source data the frames are derived from, if present.
    pub fn source_data(&self) -> Option<&JsonData> {
        self.source_data.as_ref()
    }
229
    /// Returns the key/value metadata attached to the stream.
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.metadata
    }
234
235 pub fn add_metadata(&mut self, key: String, value: String) {
237 self.metadata.insert(key, value);
238 self.update_timestamp();
239 }
240
241 pub fn start_streaming(&mut self) -> DomainResult<()> {
243 match self.state {
244 StreamState::Preparing => {
245 self.state = StreamState::Streaming;
246 self.update_timestamp();
247 Ok(())
248 }
249 _ => Err(DomainError::InvalidStateTransition(format!(
250 "Cannot start streaming from state: {:?}",
251 self.state
252 ))),
253 }
254 }
255
256 pub fn complete(&mut self) -> DomainResult<()> {
258 match self.state {
259 StreamState::Streaming => {
260 self.state = StreamState::Completed;
261 self.completed_at = Some(Utc::now());
262 self.update_timestamp();
263 Ok(())
264 }
265 _ => Err(DomainError::InvalidStateTransition(format!(
266 "Cannot complete stream from state: {:?}",
267 self.state
268 ))),
269 }
270 }
271
272 pub fn fail(&mut self, error: String) -> DomainResult<()> {
274 match self.state {
275 StreamState::Preparing | StreamState::Streaming => {
276 self.state = StreamState::Failed;
277 self.completed_at = Some(Utc::now());
278 self.add_metadata("error".to_string(), error);
279 Ok(())
280 }
281 _ => Err(DomainError::InvalidStateTransition(format!(
282 "Cannot fail stream from state: {:?}",
283 self.state
284 ))),
285 }
286 }
287
288 pub fn cancel(&mut self) -> DomainResult<()> {
290 match self.state {
291 StreamState::Preparing | StreamState::Streaming => {
292 self.state = StreamState::Cancelled;
293 self.completed_at = Some(Utc::now());
294 self.update_timestamp();
295 Ok(())
296 }
297 _ => Err(DomainError::InvalidStateTransition(format!(
298 "Cannot cancel stream from state: {:?}",
299 self.state
300 ))),
301 }
302 }
303
304 pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
306 if !matches!(self.state, StreamState::Streaming) {
307 return Err(DomainError::InvalidStreamState(
308 "Stream must be in streaming state to create frames".to_string(),
309 ));
310 }
311
312 let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
313 DomainError::InvalidStreamState("No source data available for skeleton".to_string())
314 })?;
315
316 let skeleton = self.generate_skeleton(skeleton_data)?;
317 let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
318
319 self.record_frame_created(&frame);
320
321 Ok(frame)
322 }
323
324 pub fn create_patch_frames(
326 &mut self,
327 priority_threshold: Priority,
328 max_frames: usize,
329 ) -> DomainResult<Vec<Frame>> {
330 if !matches!(self.state, StreamState::Streaming) {
331 return Err(DomainError::InvalidStreamState(
332 "Stream must be in streaming state to create frames".to_string(),
333 ));
334 }
335
336 let source_data = self.source_data.as_ref().ok_or_else(|| {
337 DomainError::InvalidStreamState("No source data available for patches".to_string())
338 })?;
339
340 let patches = self.extract_patches(source_data, priority_threshold)?;
341 let frames = self.batch_patches_into_frames(patches, max_frames)?;
342
343 for frame in &frames {
344 self.record_frame_created(frame);
345 }
346
347 Ok(frames)
348 }
349
350 pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
352 if !matches!(self.state, StreamState::Streaming) {
353 return Err(DomainError::InvalidStreamState(
354 "Stream must be in streaming state to create frames".to_string(),
355 ));
356 }
357
358 let frame = Frame::complete(self.id, self.next_sequence, checksum);
359 self.record_frame_created(&frame);
360
361 Ok(frame)
362 }
363
364 pub fn is_active(&self) -> bool {
366 matches!(self.state, StreamState::Preparing | StreamState::Streaming)
367 }
368
369 pub fn is_finished(&self) -> bool {
371 matches!(
372 self.state,
373 StreamState::Completed | StreamState::Failed | StreamState::Cancelled
374 )
375 }
376
377 pub fn duration(&self) -> Option<chrono::Duration> {
379 self.completed_at.map(|end| end - self.created_at)
380 }
381
382 pub fn progress(&self) -> f64 {
384 match self.state {
385 StreamState::Preparing => 0.0,
386 StreamState::Streaming => {
387 if self.stats.total_frames == 0 {
389 0.1 } else {
391 (self.stats.total_frames as f64 / 100.0).min(0.9)
393 }
394 }
395 StreamState::Completed => 1.0,
396 StreamState::Failed | StreamState::Cancelled => {
397 (self.stats.total_frames as f64 / 100.0).min(0.99)
399 }
400 }
401 }
402
403 pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
405 if !self.is_active() {
406 return Err(DomainError::InvalidStreamState(
407 "Cannot update config of inactive stream".to_string(),
408 ));
409 }
410
411 self.config = config;
412 self.update_timestamp();
413 Ok(())
414 }
415
    /// Sets `updated_at` to the current UTC time.
    fn update_timestamp(&mut self) {
        self.updated_at = Utc::now();
    }
420
421 fn record_frame_created(&mut self, frame: &Frame) {
423 self.next_sequence += 1;
424 self.stats.total_frames += 1;
425
426 let frame_size = frame.estimated_size() as u64;
427 self.stats.total_bytes += frame_size;
428
429 match frame.frame_type() {
430 crate::entities::frame::FrameType::Skeleton => {
431 self.stats.skeleton_frames += 1;
432 self.stats.critical_bytes += frame_size;
433 }
434 crate::entities::frame::FrameType::Patch => {
435 self.stats.patch_frames += 1;
436 if frame.is_critical() {
437 self.stats.critical_bytes += frame_size;
438 } else if frame.is_high_priority() {
439 self.stats.high_priority_bytes += frame_size;
440 }
441 }
442 crate::entities::frame::FrameType::Complete => {
443 self.stats.complete_frames += 1;
444 self.stats.critical_bytes += frame_size;
445 }
446 crate::entities::frame::FrameType::Error => {
447 self.stats.error_frames += 1;
448 self.stats.critical_bytes += frame_size;
449 }
450 }
451
452 self.stats.average_frame_size =
454 self.stats.total_bytes as f64 / self.stats.total_frames as f64;
455
456 self.update_timestamp();
457 }
458
459 fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
461 match data {
463 JsonData::Object(obj) => {
464 let mut skeleton = HashMap::new();
465 for (key, value) in obj.iter() {
466 skeleton.insert(
467 key.clone(),
468 match value {
469 JsonData::Array(_) => JsonData::Array(Vec::new()),
470 JsonData::Object(_) => self.generate_skeleton(value)?,
471 JsonData::Integer(_) => JsonData::Integer(0),
472 JsonData::Float(_) => JsonData::Float(0.0),
473 JsonData::String(_) => JsonData::Null,
474 JsonData::Bool(_) => JsonData::Bool(false),
475 JsonData::Null => JsonData::Null,
476 },
477 );
478 }
479 Ok(JsonData::Object(skeleton))
480 }
481 JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
482 _ => Ok(JsonData::Null),
483 }
484 }
485
    /// Extracts the patches of `_data` that satisfy `_threshold`.
    ///
    /// Currently a stub: both arguments are ignored and an empty list is
    /// returned, so `create_patch_frames` produces no frames.
    fn extract_patches(
        &self,
        _data: &JsonData,
        _threshold: Priority,
    ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
        Ok(Vec::new())
    }
495
496 fn batch_patches_into_frames(
498 &mut self,
499 patches: Vec<crate::entities::frame::FramePatch>,
500 max_frames: usize,
501 ) -> DomainResult<Vec<Frame>> {
502 if patches.is_empty() {
503 return Ok(Vec::new());
504 }
505
506 let mut frames = Vec::new();
507 let chunk_size = patches.len().div_ceil(max_frames);
508
509 for patch_chunk in patches.chunks(chunk_size) {
510 let priority = patch_chunk
511 .iter()
512 .map(|_| Priority::MEDIUM) .max()
514 .unwrap_or(Priority::MEDIUM);
515
516 let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
517
518 frames.push(frame);
519 }
520
521 Ok(frames)
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entities::frame::FrameType;

    /// Builds a stream over `source` with a fresh session and the default configuration.
    fn stream_from(source: serde_json::Value) -> Stream {
        Stream::new(SessionId::new(), source.into(), StreamConfig::default())
    }

    /// A new stream starts in `Preparing`, is active and reports zero progress.
    #[test]
    fn test_stream_creation() {
        let session_id = SessionId::new();
        let source_data = serde_json::json!({
            "users": [
                {"id": 1, "name": "John"},
                {"id": 2, "name": "Jane"}
            ],
            "total": 2
        });

        let stream = Stream::new(session_id, source_data.into(), StreamConfig::default());

        assert_eq!(stream.session_id(), session_id);
        assert_eq!(stream.state(), &StreamState::Preparing);
        assert!(stream.is_active());
        assert!(!stream.is_finished());
        assert_eq!(stream.progress(), 0.0);
    }

    /// The happy path goes `Preparing` -> `Streaming` -> `Completed`.
    #[test]
    fn test_stream_state_transitions() {
        let mut stream = stream_from(serde_json::json!({}));

        assert!(stream.start_streaming().is_ok());
        assert_eq!(stream.state(), &StreamState::Streaming);

        assert!(stream.complete().is_ok());
        assert_eq!(stream.state(), &StreamState::Completed);
        assert!(stream.is_finished());
        assert_eq!(stream.progress(), 1.0);
    }

    /// Completing before streaming and restarting after completion are both rejected.
    #[test]
    fn test_invalid_state_transitions() {
        let mut stream = stream_from(serde_json::json!({}));

        assert!(stream.complete().is_err());

        assert!(stream.start_streaming().is_ok());
        assert!(stream.complete().is_ok());

        assert!(stream.start_streaming().is_err());
    }

    /// Skeleton frames require the `Streaming` state and get sequence number 1.
    #[test]
    fn test_frame_creation() {
        let mut stream = stream_from(serde_json::json!({
            "test": "data"
        }));

        assert!(stream.create_skeleton_frame().is_err());

        assert!(stream.start_streaming().is_ok());
        let skeleton = stream
            .create_skeleton_frame()
            .expect("Failed to create skeleton frame in test");

        assert_eq!(skeleton.frame_type(), &FrameType::Skeleton);
        assert_eq!(skeleton.sequence(), 1);
        assert_eq!(stream.stats().skeleton_frames, 1);
    }

    /// Metadata entries are stored and retrievable by key.
    #[test]
    fn test_stream_metadata() {
        let mut stream = stream_from(serde_json::json!({}));

        stream.add_metadata("source".to_string(), "api".to_string());
        stream.add_metadata("version".to_string(), "1.0".to_string());

        let metadata = stream.metadata();
        assert_eq!(metadata.len(), 2);
        assert_eq!(metadata.get("source").map(String::as_str), Some("api"));
    }
}