Skip to main content

pjson_rs_domain/entities/
frame.rs

1//! Frame entity with streaming data
2
3use crate::{
4    DomainError, DomainResult,
5    value_objects::{JsonData, JsonPath, Priority, StreamId},
6};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11/// Custom serde for StreamId within entities
12mod serde_stream_id {
13    use crate::value_objects::StreamId;
14    use serde::{Deserialize, Deserializer, Serialize, Serializer};
15
16    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
17    where
18        S: Serializer,
19    {
20        id.as_uuid().serialize(serializer)
21    }
22
23    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
24    where
25        D: Deserializer<'de>,
26    {
27        let uuid = uuid::Uuid::deserialize(deserializer)?;
28        Ok(StreamId::from_uuid(uuid))
29    }
30}
31
32/// Custom serde for Priority within entities
33mod serde_priority {
34    use crate::value_objects::Priority;
35    use serde::{Deserialize, Deserializer, Serialize, Serializer};
36
37    pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
38    where
39        S: Serializer,
40    {
41        priority.value().serialize(serializer)
42    }
43
44    pub fn deserialize<'de, D>(deserializer: D) -> Result<Priority, D::Error>
45    where
46        D: Deserializer<'de>,
47    {
48        let value = u8::deserialize(deserializer)?;
49        Priority::new(value).map_err(serde::de::Error::custom)
50    }
51}
52
53/// Custom serde for JsonPath within entities
54mod serde_json_path {
55    use crate::value_objects::JsonPath;
56    use serde::{Deserialize, Deserializer, Serialize, Serializer};
57
58    pub fn serialize<S>(path: &JsonPath, serializer: S) -> Result<S::Ok, S::Error>
59    where
60        S: Serializer,
61    {
62        path.as_str().serialize(serializer)
63    }
64
65    pub fn deserialize<'de, D>(deserializer: D) -> Result<JsonPath, D::Error>
66    where
67        D: Deserializer<'de>,
68    {
69        let s = String::deserialize(deserializer)?;
70        JsonPath::new(s).map_err(serde::de::Error::custom)
71    }
72}
73
74/// Frame types for different stages of streaming
75#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
76pub enum FrameType {
77    /// Initial skeleton with structure
78    Skeleton,
79    /// Data patch update
80    Patch,
81    /// Stream completion signal
82    Complete,
83    /// Error notification
84    Error,
85}
86
87/// Individual frame in a priority stream
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89pub struct Frame {
90    #[serde(with = "serde_stream_id")]
91    stream_id: StreamId,
92    frame_type: FrameType,
93    #[serde(with = "serde_priority")]
94    priority: Priority,
95    sequence: u64,
96    timestamp: DateTime<Utc>,
97    payload: JsonData,
98    metadata: HashMap<String, String>,
99}
100
101impl std::hash::Hash for Frame {
102    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
103        self.stream_id.hash(state);
104        self.frame_type.hash(state);
105        self.priority.hash(state);
106        self.sequence.hash(state);
107        self.timestamp.hash(state);
108        self.payload.hash(state);
109
110        // For HashMap, sort keys for consistent hashing
111        let mut pairs: Vec<_> = self.metadata.iter().collect();
112        pairs.sort_by_key(|(k, _)| *k);
113        pairs.hash(state);
114    }
115}
116
117impl Frame {
118    /// Create new skeleton frame
119    pub fn skeleton(stream_id: StreamId, sequence: u64, skeleton_data: JsonData) -> Self {
120        Self {
121            stream_id,
122            frame_type: FrameType::Skeleton,
123            priority: Priority::CRITICAL,
124            sequence,
125            timestamp: Utc::now(),
126            payload: skeleton_data,
127            metadata: HashMap::new(),
128        }
129    }
130
131    /// Create new patch frame
132    pub fn patch(
133        stream_id: StreamId,
134        sequence: u64,
135        priority: Priority,
136        patches: Vec<FramePatch>,
137    ) -> DomainResult<Self> {
138        if patches.is_empty() {
139            return Err(DomainError::InvalidFrame(
140                "Patch frame must contain at least one patch".to_string(),
141            ));
142        }
143
144        // Create JsonData payload directly instead of using serde_json
145        let mut payload_obj = HashMap::with_capacity(1);
146        let patches_array: Vec<JsonData> = patches
147            .into_iter()
148            .map(|patch| {
149                let mut patch_obj = HashMap::with_capacity(3);
150                patch_obj.insert("path".into(), JsonData::String(patch.path.to_string()));
151                patch_obj.insert(
152                    "operation".into(),
153                    JsonData::String(
154                        match patch.operation {
155                            PatchOperation::Set => "set",
156                            PatchOperation::Append => "append",
157                            PatchOperation::Merge => "merge",
158                            PatchOperation::Delete => "delete",
159                        }
160                        .into(),
161                    ),
162                );
163                patch_obj.insert("value".into(), patch.value);
164                JsonData::Object(patch_obj)
165            })
166            .collect();
167
168        payload_obj.insert("patches".into(), JsonData::Array(patches_array));
169        let payload = JsonData::Object(payload_obj);
170
171        Ok(Self {
172            stream_id,
173            frame_type: FrameType::Patch,
174            priority,
175            sequence,
176            timestamp: Utc::now(),
177            payload,
178            metadata: HashMap::new(),
179        })
180    }
181
182    /// Create completion frame
183    pub fn complete(stream_id: StreamId, sequence: u64, checksum: Option<String>) -> Self {
184        let payload = if let Some(checksum) = checksum {
185            let mut obj = HashMap::new();
186            obj.insert("checksum".to_string(), JsonData::String(checksum));
187            JsonData::Object(obj)
188        } else {
189            JsonData::Object(HashMap::new())
190        };
191
192        Self {
193            stream_id,
194            frame_type: FrameType::Complete,
195            priority: Priority::CRITICAL,
196            sequence,
197            timestamp: Utc::now(),
198            payload,
199            metadata: HashMap::new(),
200        }
201    }
202
203    /// Create error frame
204    pub fn error(
205        stream_id: StreamId,
206        sequence: u64,
207        error_message: String,
208        error_code: Option<String>,
209    ) -> Self {
210        let payload = if let Some(code) = error_code {
211            let mut obj = HashMap::new();
212            obj.insert("message".to_string(), JsonData::String(error_message));
213            obj.insert("code".to_string(), JsonData::String(code));
214            JsonData::Object(obj)
215        } else {
216            let mut obj = HashMap::new();
217            obj.insert("message".to_string(), JsonData::String(error_message));
218            JsonData::Object(obj)
219        };
220
221        Self {
222            stream_id,
223            frame_type: FrameType::Error,
224            priority: Priority::CRITICAL,
225            sequence,
226            timestamp: Utc::now(),
227            payload,
228            metadata: HashMap::new(),
229        }
230    }
231
232    /// Get stream ID
233    pub fn stream_id(&self) -> StreamId {
234        self.stream_id
235    }
236
237    /// Get frame type
238    pub fn frame_type(&self) -> &FrameType {
239        &self.frame_type
240    }
241
242    /// Get priority
243    pub fn priority(&self) -> Priority {
244        self.priority
245    }
246
247    /// Get sequence number
248    pub fn sequence(&self) -> u64 {
249        self.sequence
250    }
251
252    /// Get timestamp
253    pub fn timestamp(&self) -> DateTime<Utc> {
254        self.timestamp
255    }
256
257    /// Get payload
258    pub fn payload(&self) -> &JsonData {
259        &self.payload
260    }
261
262    /// Add metadata
263    pub fn with_metadata(mut self, key: String, value: String) -> Self {
264        self.metadata.insert(key, value);
265        self
266    }
267
268    /// Get metadata
269    pub fn metadata(&self) -> &HashMap<String, String> {
270        &self.metadata
271    }
272
273    /// Get metadata value
274    pub fn get_metadata(&self, key: &str) -> Option<&String> {
275        self.metadata.get(key)
276    }
277
278    /// Check if frame is critical priority
279    pub fn is_critical(&self) -> bool {
280        self.priority.is_critical()
281    }
282
283    /// Check if frame is high priority or above
284    pub fn is_high_priority(&self) -> bool {
285        self.priority.is_high_or_above()
286    }
287
288    /// Estimate frame size in bytes (for network planning)
289    pub fn estimated_size(&self) -> usize {
290        // Rough estimation: JSON serialization + metadata overhead
291        let payload_size = self.payload.to_string().len();
292        let metadata_size: usize = self
293            .metadata
294            .iter()
295            .map(|(k, v)| k.len() + v.len() + 4) // JSON overhead
296            .sum();
297
298        payload_size + metadata_size + 200 // Base frame overhead
299    }
300
301    /// Validate frame consistency
302    pub fn validate(&self) -> DomainResult<()> {
303        match &self.frame_type {
304            FrameType::Skeleton => {
305                if !self.priority.is_critical() {
306                    return Err(DomainError::InvalidFrame(
307                        "Skeleton frames must have critical priority".to_string(),
308                    ));
309                }
310            }
311            FrameType::Patch => {
312                // Validate patch payload structure
313                if !self.payload.is_object() {
314                    return Err(DomainError::InvalidFrame(
315                        "Patch frames must have object payload".to_string(),
316                    ));
317                }
318
319                if !self.payload.get("patches").is_some_and(|p| p.is_array()) {
320                    return Err(DomainError::InvalidFrame(
321                        "Patch frames must contain patches array".to_string(),
322                    ));
323                }
324            }
325            FrameType::Complete => {
326                if !self.priority.is_critical() {
327                    return Err(DomainError::InvalidFrame(
328                        "Complete frames must have critical priority".to_string(),
329                    ));
330                }
331            }
332            FrameType::Error => {
333                if !self.priority.is_critical() {
334                    return Err(DomainError::InvalidFrame(
335                        "Error frames must have critical priority".to_string(),
336                    ));
337                }
338
339                if !self.payload.get("message").is_some_and(|m| m.is_string()) {
340                    return Err(DomainError::InvalidFrame(
341                        "Error frames must contain message".to_string(),
342                    ));
343                }
344            }
345        }
346
347        Ok(())
348    }
349}
350
351/// Individual patch within a frame
352#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
353pub struct FramePatch {
354    /// JSON path to the target location
355    #[serde(with = "serde_json_path")]
356    pub path: JsonPath,
357    /// Operation to perform at the path
358    pub operation: PatchOperation,
359    /// Value to apply with the operation
360    pub value: JsonData,
361}
362
363/// Patch operation types
364#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
365#[serde(rename_all = "lowercase")]
366pub enum PatchOperation {
367    /// Set a value at the path
368    Set,
369    /// Append to an array at the path
370    Append,
371    /// Merge object at the path
372    Merge,
373    /// Delete value at the path
374    Delete,
375}
376
377impl FramePatch {
378    /// Create set operation patch
379    pub fn set(path: JsonPath, value: JsonData) -> Self {
380        Self {
381            path,
382            operation: PatchOperation::Set,
383            value,
384        }
385    }
386
387    /// Create append operation patch
388    pub fn append(path: JsonPath, value: JsonData) -> Self {
389        Self {
390            path,
391            operation: PatchOperation::Append,
392            value,
393        }
394    }
395
396    /// Create merge operation patch
397    pub fn merge(path: JsonPath, value: JsonData) -> Self {
398        Self {
399            path,
400            operation: PatchOperation::Merge,
401            value,
402        }
403    }
404
405    /// Create delete operation patch
406    pub fn delete(path: JsonPath) -> Self {
407        Self {
408            path,
409            operation: PatchOperation::Delete,
410            value: JsonData::Null,
411        }
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418
419    #[test]
420    fn test_skeleton_frame_creation() {
421        let stream_id = StreamId::new();
422        let skeleton_data = serde_json::json!({
423            "users": [],
424            "total": 0
425        });
426
427        let frame = Frame::skeleton(stream_id, 1, skeleton_data.clone().into());
428
429        assert_eq!(frame.frame_type(), &FrameType::Skeleton);
430        assert_eq!(frame.priority(), Priority::CRITICAL);
431        assert_eq!(frame.sequence(), 1);
432        assert_eq!(frame.stream_id(), stream_id);
433        assert!(frame.validate().is_ok());
434    }
435
436    #[test]
437    fn test_patch_frame_creation() {
438        let stream_id = StreamId::new();
439        let path = JsonPath::new("$.users[0].name").expect("Failed to create JsonPath in test");
440        let patch = FramePatch::set(path, JsonData::String("John".to_string()));
441
442        let frame = Frame::patch(stream_id, 2, Priority::HIGH, vec![patch])
443            .expect("Failed to create patch frame in test");
444
445        assert_eq!(frame.frame_type(), &FrameType::Patch);
446        assert_eq!(frame.priority(), Priority::HIGH);
447        assert_eq!(frame.sequence(), 2);
448        assert!(frame.validate().is_ok());
449    }
450
451    #[test]
452    fn test_complete_frame_creation() {
453        let stream_id = StreamId::new();
454        let frame = Frame::complete(stream_id, 10, Some("abc123".to_string()));
455
456        assert_eq!(frame.frame_type(), &FrameType::Complete);
457        assert_eq!(frame.priority(), Priority::CRITICAL);
458        assert_eq!(frame.sequence(), 10);
459        assert!(frame.validate().is_ok());
460    }
461
462    #[test]
463    fn test_frame_with_metadata() {
464        let stream_id = StreamId::new();
465        let skeleton_data = serde_json::json!({});
466        let frame = Frame::skeleton(stream_id, 1, skeleton_data.into())
467            .with_metadata("source".to_string(), "api".to_string())
468            .with_metadata("version".to_string(), "1.0".to_string());
469
470        assert_eq!(frame.get_metadata("source"), Some(&"api".to_string()));
471        assert_eq!(frame.get_metadata("version"), Some(&"1.0".to_string()));
472        assert_eq!(frame.metadata().len(), 2);
473    }
474
475    #[test]
476    fn test_empty_patch_validation() {
477        let stream_id = StreamId::new();
478        let result = Frame::patch(stream_id, 1, Priority::MEDIUM, vec![]);
479
480        assert!(result.is_err());
481    }
482}