pjson_rs_domain/entities/
stream.rs1use crate::{
4 DomainError, DomainResult,
5 entities::Frame,
6 value_objects::{JsonData, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum StreamState {
15 Preparing,
17 Streaming,
19 Completed,
21 Failed,
23 Cancelled,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct StreamConfig {
30 pub max_frame_size: usize,
32 pub max_frames_per_batch: usize,
34 pub enable_compression: bool,
36 pub priority_rules: HashMap<String, Priority>,
38}
39
40impl Default for StreamConfig {
41 fn default() -> Self {
42 Self {
43 max_frame_size: 64 * 1024, max_frames_per_batch: 10,
45 enable_compression: true,
46 priority_rules: HashMap::new(),
47 }
48 }
49}
50
51#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct StreamStats {
54 pub total_frames: u64,
56 pub skeleton_frames: u64,
58 pub patch_frames: u64,
60 pub complete_frames: u64,
62 pub error_frames: u64,
64 pub total_bytes: u64,
66 pub critical_bytes: u64,
68 pub high_priority_bytes: u64,
70 pub average_frame_size: f64,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Stream {
77 id: StreamId,
78 session_id: SessionId,
79 state: StreamState,
80 config: StreamConfig,
81 stats: StreamStats,
82 created_at: DateTime<Utc>,
83 updated_at: DateTime<Utc>,
84 completed_at: Option<DateTime<Utc>>,
85 next_sequence: u64,
86 source_data: Option<JsonData>,
87 metadata: HashMap<String, String>,
88}
89
90impl Stream {
91 pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
93 let now = Utc::now();
94
95 Self {
96 id: StreamId::new(),
97 session_id,
98 state: StreamState::Preparing,
99 config,
100 stats: StreamStats::default(),
101 created_at: now,
102 updated_at: now,
103 completed_at: None,
104 next_sequence: 1,
105 source_data: Some(source_data),
106 metadata: HashMap::new(),
107 }
108 }
109
110 pub fn id(&self) -> StreamId {
112 self.id
113 }
114
115 pub fn session_id(&self) -> SessionId {
117 self.session_id
118 }
119
120 pub fn state(&self) -> &StreamState {
122 &self.state
123 }
124
125 pub fn config(&self) -> &StreamConfig {
127 &self.config
128 }
129
130 pub fn stats(&self) -> &StreamStats {
132 &self.stats
133 }
134
135 pub fn created_at(&self) -> DateTime<Utc> {
137 self.created_at
138 }
139
140 pub fn updated_at(&self) -> DateTime<Utc> {
142 self.updated_at
143 }
144
145 pub fn completed_at(&self) -> Option<DateTime<Utc>> {
147 self.completed_at
148 }
149
150 pub fn source_data(&self) -> Option<&JsonData> {
152 self.source_data.as_ref()
153 }
154
155 pub fn metadata(&self) -> &HashMap<String, String> {
157 &self.metadata
158 }
159
160 pub fn add_metadata(&mut self, key: String, value: String) {
162 self.metadata.insert(key, value);
163 self.update_timestamp();
164 }
165
166 pub fn start_streaming(&mut self) -> DomainResult<()> {
168 match self.state {
169 StreamState::Preparing => {
170 self.state = StreamState::Streaming;
171 self.update_timestamp();
172 Ok(())
173 }
174 _ => Err(DomainError::InvalidStateTransition(format!(
175 "Cannot start streaming from state: {:?}",
176 self.state
177 ))),
178 }
179 }
180
181 pub fn complete(&mut self) -> DomainResult<()> {
183 match self.state {
184 StreamState::Streaming => {
185 self.state = StreamState::Completed;
186 self.completed_at = Some(Utc::now());
187 self.update_timestamp();
188 Ok(())
189 }
190 _ => Err(DomainError::InvalidStateTransition(format!(
191 "Cannot complete stream from state: {:?}",
192 self.state
193 ))),
194 }
195 }
196
197 pub fn fail(&mut self, error: String) -> DomainResult<()> {
199 match self.state {
200 StreamState::Preparing | StreamState::Streaming => {
201 self.state = StreamState::Failed;
202 self.completed_at = Some(Utc::now());
203 self.add_metadata("error".to_string(), error);
204 Ok(())
205 }
206 _ => Err(DomainError::InvalidStateTransition(format!(
207 "Cannot fail stream from state: {:?}",
208 self.state
209 ))),
210 }
211 }
212
213 pub fn cancel(&mut self) -> DomainResult<()> {
215 match self.state {
216 StreamState::Preparing | StreamState::Streaming => {
217 self.state = StreamState::Cancelled;
218 self.completed_at = Some(Utc::now());
219 self.update_timestamp();
220 Ok(())
221 }
222 _ => Err(DomainError::InvalidStateTransition(format!(
223 "Cannot cancel stream from state: {:?}",
224 self.state
225 ))),
226 }
227 }
228
229 pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
231 if !matches!(self.state, StreamState::Streaming) {
232 return Err(DomainError::InvalidStreamState(
233 "Stream must be in streaming state to create frames".to_string(),
234 ));
235 }
236
237 let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
238 DomainError::InvalidStreamState("No source data available for skeleton".to_string())
239 })?;
240
241 let skeleton = self.generate_skeleton(skeleton_data)?;
242 let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
243
244 self.record_frame_created(&frame);
245
246 Ok(frame)
247 }
248
249 pub fn create_patch_frames(
251 &mut self,
252 priority_threshold: Priority,
253 max_frames: usize,
254 ) -> DomainResult<Vec<Frame>> {
255 if !matches!(self.state, StreamState::Streaming) {
256 return Err(DomainError::InvalidStreamState(
257 "Stream must be in streaming state to create frames".to_string(),
258 ));
259 }
260
261 let source_data = self.source_data.as_ref().ok_or_else(|| {
262 DomainError::InvalidStreamState("No source data available for patches".to_string())
263 })?;
264
265 let patches = self.extract_patches(source_data, priority_threshold)?;
266 let frames = self.batch_patches_into_frames(patches, max_frames)?;
267
268 for frame in &frames {
269 self.record_frame_created(frame);
270 }
271
272 Ok(frames)
273 }
274
275 pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
277 if !matches!(self.state, StreamState::Streaming) {
278 return Err(DomainError::InvalidStreamState(
279 "Stream must be in streaming state to create frames".to_string(),
280 ));
281 }
282
283 let frame = Frame::complete(self.id, self.next_sequence, checksum);
284 self.record_frame_created(&frame);
285
286 Ok(frame)
287 }
288
289 pub fn is_active(&self) -> bool {
291 matches!(self.state, StreamState::Preparing | StreamState::Streaming)
292 }
293
294 pub fn is_finished(&self) -> bool {
296 matches!(
297 self.state,
298 StreamState::Completed | StreamState::Failed | StreamState::Cancelled
299 )
300 }
301
302 pub fn duration(&self) -> Option<chrono::Duration> {
304 self.completed_at.map(|end| end - self.created_at)
305 }
306
307 pub fn progress(&self) -> f64 {
309 match self.state {
310 StreamState::Preparing => 0.0,
311 StreamState::Streaming => {
312 if self.stats.total_frames == 0 {
314 0.1 } else {
316 (self.stats.total_frames as f64 / 100.0).min(0.9)
318 }
319 }
320 StreamState::Completed => 1.0,
321 StreamState::Failed | StreamState::Cancelled => {
322 (self.stats.total_frames as f64 / 100.0).min(0.99)
324 }
325 }
326 }
327
328 pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
330 if !self.is_active() {
331 return Err(DomainError::InvalidStreamState(
332 "Cannot update config of inactive stream".to_string(),
333 ));
334 }
335
336 self.config = config;
337 self.update_timestamp();
338 Ok(())
339 }
340
341 fn update_timestamp(&mut self) {
343 self.updated_at = Utc::now();
344 }
345
346 fn record_frame_created(&mut self, frame: &Frame) {
348 self.next_sequence += 1;
349 self.stats.total_frames += 1;
350
351 let frame_size = frame.estimated_size() as u64;
352 self.stats.total_bytes += frame_size;
353
354 match frame.frame_type() {
355 crate::entities::frame::FrameType::Skeleton => {
356 self.stats.skeleton_frames += 1;
357 self.stats.critical_bytes += frame_size;
358 }
359 crate::entities::frame::FrameType::Patch => {
360 self.stats.patch_frames += 1;
361 if frame.is_critical() {
362 self.stats.critical_bytes += frame_size;
363 } else if frame.is_high_priority() {
364 self.stats.high_priority_bytes += frame_size;
365 }
366 }
367 crate::entities::frame::FrameType::Complete => {
368 self.stats.complete_frames += 1;
369 self.stats.critical_bytes += frame_size;
370 }
371 crate::entities::frame::FrameType::Error => {
372 self.stats.error_frames += 1;
373 self.stats.critical_bytes += frame_size;
374 }
375 }
376
377 self.stats.average_frame_size =
379 self.stats.total_bytes as f64 / self.stats.total_frames as f64;
380
381 self.update_timestamp();
382 }
383
384 fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
386 match data {
388 JsonData::Object(obj) => {
389 let mut skeleton = HashMap::new();
390 for (key, value) in obj.iter() {
391 skeleton.insert(
392 key.clone(),
393 match value {
394 JsonData::Array(_) => JsonData::Array(Vec::new()),
395 JsonData::Object(_) => self.generate_skeleton(value)?,
396 JsonData::Integer(_) => JsonData::Integer(0),
397 JsonData::Float(_) => JsonData::Float(0.0),
398 JsonData::String(_) => JsonData::Null,
399 JsonData::Bool(_) => JsonData::Bool(false),
400 JsonData::Null => JsonData::Null,
401 },
402 );
403 }
404 Ok(JsonData::Object(skeleton))
405 }
406 JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
407 _ => Ok(JsonData::Null),
408 }
409 }
410
411 fn extract_patches(
413 &self,
414 _data: &JsonData,
415 _threshold: Priority,
416 ) -> DomainResult<Vec<crate::entities::frame::FramePatch>> {
417 Ok(Vec::new())
419 }
420
421 fn batch_patches_into_frames(
423 &mut self,
424 patches: Vec<crate::entities::frame::FramePatch>,
425 max_frames: usize,
426 ) -> DomainResult<Vec<Frame>> {
427 if patches.is_empty() {
428 return Ok(Vec::new());
429 }
430
431 let mut frames = Vec::new();
432 let chunk_size = patches.len().div_ceil(max_frames);
433
434 for patch_chunk in patches.chunks(chunk_size) {
435 let priority = patch_chunk
436 .iter()
437 .map(|_| Priority::MEDIUM) .max()
439 .unwrap_or(Priority::MEDIUM);
440
441 let frame = Frame::patch(self.id, self.next_sequence, priority, patch_chunk.to_vec())?;
442
443 frames.push(frame);
444 }
445
446 Ok(frames)
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stream over `source` using the default configuration.
    fn stream_for(session_id: SessionId, source: serde_json::Value) -> Stream {
        Stream::new(session_id, source.into(), StreamConfig::default())
    }

    /// A new stream is `Preparing`, active, unfinished and at zero progress.
    #[test]
    fn test_stream_creation() {
        let session_id = SessionId::new();
        let stream = stream_for(
            session_id,
            serde_json::json!({
                "users": [
                    {"id": 1, "name": "John"},
                    {"id": 2, "name": "Jane"}
                ],
                "total": 2
            }),
        );

        assert_eq!(stream.session_id(), session_id);
        assert_eq!(*stream.state(), StreamState::Preparing);
        assert!(stream.is_active());
        assert!(!stream.is_finished());
        assert_eq!(stream.progress(), 0.0);
    }

    /// The happy path walks `Preparing -> Streaming -> Completed`.
    #[test]
    fn test_stream_state_transitions() {
        let mut stream = stream_for(SessionId::new(), serde_json::json!({}));

        assert!(stream.start_streaming().is_ok());
        assert_eq!(*stream.state(), StreamState::Streaming);

        assert!(stream.complete().is_ok());
        assert_eq!(*stream.state(), StreamState::Completed);
        assert!(stream.is_finished());
        assert_eq!(stream.progress(), 1.0);
    }

    /// Transitions that skip or revisit a state are rejected.
    #[test]
    fn test_invalid_state_transitions() {
        let mut stream = stream_for(SessionId::new(), serde_json::json!({}));

        // Completing before streaming has started is not allowed.
        assert!(stream.complete().is_err());

        assert!(stream.start_streaming().is_ok());
        assert!(stream.complete().is_ok());

        // A completed stream cannot be restarted.
        assert!(stream.start_streaming().is_err());
    }

    /// Skeleton frames need the `Streaming` state and start at sequence 1.
    #[test]
    fn test_frame_creation() {
        use crate::entities::frame::FrameType;

        let mut stream = stream_for(
            SessionId::new(),
            serde_json::json!({
                "test": "data"
            }),
        );

        // Still `Preparing`, so frame creation must fail.
        assert!(stream.create_skeleton_frame().is_err());

        assert!(stream.start_streaming().is_ok());
        let skeleton = stream
            .create_skeleton_frame()
            .expect("Failed to create skeleton frame in test");

        assert_eq!(*skeleton.frame_type(), FrameType::Skeleton);
        assert_eq!(skeleton.sequence(), 1);
        assert_eq!(stream.stats().skeleton_frames, 1);
    }

    /// Metadata entries are stored and retrievable by key.
    #[test]
    fn test_stream_metadata() {
        let mut stream = stream_for(SessionId::new(), serde_json::json!({}));

        stream.add_metadata("source".to_string(), "api".to_string());
        stream.add_metadata("version".to_string(), "1.0".to_string());

        let metadata = stream.metadata();
        assert_eq!(metadata.len(), 2);
        assert_eq!(metadata.get("source").map(String::as_str), Some("api"));
    }
}