pjson_rs_domain/entities/
frame.rs

1//! Frame entity with streaming data
2
3use crate::{
4    DomainError, DomainResult,
5    value_objects::{JsonData, JsonPath, Priority, StreamId},
6};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11/// Frame types for different stages of streaming
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum FrameType {
14    /// Initial skeleton with structure
15    Skeleton,
16    /// Data patch update
17    Patch,
18    /// Stream completion signal
19    Complete,
20    /// Error notification
21    Error,
22}
23
24/// Individual frame in a priority stream
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct Frame {
27    stream_id: StreamId,
28    frame_type: FrameType,
29    priority: Priority,
30    sequence: u64,
31    timestamp: DateTime<Utc>,
32    payload: JsonData,
33    metadata: HashMap<String, String>,
34}
35
36impl std::hash::Hash for Frame {
37    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
38        self.stream_id.hash(state);
39        self.frame_type.hash(state);
40        self.priority.hash(state);
41        self.sequence.hash(state);
42        self.timestamp.hash(state);
43        self.payload.hash(state);
44
45        // For HashMap, sort keys for consistent hashing
46        let mut pairs: Vec<_> = self.metadata.iter().collect();
47        pairs.sort_by_key(|(k, _)| *k);
48        pairs.hash(state);
49    }
50}
51
52impl Frame {
53    /// Create new skeleton frame
54    pub fn skeleton(stream_id: StreamId, sequence: u64, skeleton_data: JsonData) -> Self {
55        Self {
56            stream_id,
57            frame_type: FrameType::Skeleton,
58            priority: Priority::CRITICAL,
59            sequence,
60            timestamp: Utc::now(),
61            payload: skeleton_data,
62            metadata: HashMap::new(),
63        }
64    }
65
66    /// Create new patch frame
67    pub fn patch(
68        stream_id: StreamId,
69        sequence: u64,
70        priority: Priority,
71        patches: Vec<FramePatch>,
72    ) -> DomainResult<Self> {
73        if patches.is_empty() {
74            return Err(DomainError::InvalidFrame(
75                "Patch frame must contain at least one patch".to_string(),
76            ));
77        }
78
79        // Create JsonData payload directly instead of using serde_json
80        let mut payload_obj = HashMap::with_capacity(1);
81        let patches_array: Vec<JsonData> = patches
82            .into_iter()
83            .map(|patch| {
84                let mut patch_obj = HashMap::with_capacity(3);
85                patch_obj.insert("path".into(), JsonData::String(patch.path.to_string()));
86                patch_obj.insert(
87                    "operation".into(),
88                    JsonData::String(
89                        match patch.operation {
90                            PatchOperation::Set => "set",
91                            PatchOperation::Append => "append",
92                            PatchOperation::Merge => "merge",
93                            PatchOperation::Delete => "delete",
94                        }
95                        .into(),
96                    ),
97                );
98                patch_obj.insert("value".into(), patch.value);
99                JsonData::Object(patch_obj)
100            })
101            .collect();
102
103        payload_obj.insert("patches".into(), JsonData::Array(patches_array));
104        let payload = JsonData::Object(payload_obj);
105
106        Ok(Self {
107            stream_id,
108            frame_type: FrameType::Patch,
109            priority,
110            sequence,
111            timestamp: Utc::now(),
112            payload,
113            metadata: HashMap::new(),
114        })
115    }
116
117    /// Create completion frame
118    pub fn complete(stream_id: StreamId, sequence: u64, checksum: Option<String>) -> Self {
119        let payload = if let Some(checksum) = checksum {
120            let mut obj = HashMap::new();
121            obj.insert("checksum".to_string(), JsonData::String(checksum));
122            JsonData::Object(obj)
123        } else {
124            JsonData::Object(HashMap::new())
125        };
126
127        Self {
128            stream_id,
129            frame_type: FrameType::Complete,
130            priority: Priority::CRITICAL,
131            sequence,
132            timestamp: Utc::now(),
133            payload,
134            metadata: HashMap::new(),
135        }
136    }
137
138    /// Create error frame
139    pub fn error(
140        stream_id: StreamId,
141        sequence: u64,
142        error_message: String,
143        error_code: Option<String>,
144    ) -> Self {
145        let payload = if let Some(code) = error_code {
146            let mut obj = HashMap::new();
147            obj.insert("message".to_string(), JsonData::String(error_message));
148            obj.insert("code".to_string(), JsonData::String(code));
149            JsonData::Object(obj)
150        } else {
151            let mut obj = HashMap::new();
152            obj.insert("message".to_string(), JsonData::String(error_message));
153            JsonData::Object(obj)
154        };
155
156        Self {
157            stream_id,
158            frame_type: FrameType::Error,
159            priority: Priority::CRITICAL,
160            sequence,
161            timestamp: Utc::now(),
162            payload,
163            metadata: HashMap::new(),
164        }
165    }
166
167    /// Get stream ID
168    pub fn stream_id(&self) -> StreamId {
169        self.stream_id
170    }
171
172    /// Get frame type
173    pub fn frame_type(&self) -> &FrameType {
174        &self.frame_type
175    }
176
177    /// Get priority
178    pub fn priority(&self) -> Priority {
179        self.priority
180    }
181
182    /// Get sequence number
183    pub fn sequence(&self) -> u64 {
184        self.sequence
185    }
186
187    /// Get timestamp
188    pub fn timestamp(&self) -> DateTime<Utc> {
189        self.timestamp
190    }
191
192    /// Get payload
193    pub fn payload(&self) -> &JsonData {
194        &self.payload
195    }
196
197    /// Add metadata
198    pub fn with_metadata(mut self, key: String, value: String) -> Self {
199        self.metadata.insert(key, value);
200        self
201    }
202
203    /// Get metadata
204    pub fn metadata(&self) -> &HashMap<String, String> {
205        &self.metadata
206    }
207
208    /// Get metadata value
209    pub fn get_metadata(&self, key: &str) -> Option<&String> {
210        self.metadata.get(key)
211    }
212
213    /// Check if frame is critical priority
214    pub fn is_critical(&self) -> bool {
215        self.priority.is_critical()
216    }
217
218    /// Check if frame is high priority or above
219    pub fn is_high_priority(&self) -> bool {
220        self.priority.is_high_or_above()
221    }
222
223    /// Estimate frame size in bytes (for network planning)
224    pub fn estimated_size(&self) -> usize {
225        // Rough estimation: JSON serialization + metadata overhead
226        let payload_size = self.payload.to_string().len();
227        let metadata_size: usize = self
228            .metadata
229            .iter()
230            .map(|(k, v)| k.len() + v.len() + 4) // JSON overhead
231            .sum();
232
233        payload_size + metadata_size + 200 // Base frame overhead
234    }
235
236    /// Validate frame consistency
237    pub fn validate(&self) -> DomainResult<()> {
238        match &self.frame_type {
239            FrameType::Skeleton => {
240                if !self.priority.is_critical() {
241                    return Err(DomainError::InvalidFrame(
242                        "Skeleton frames must have critical priority".to_string(),
243                    ));
244                }
245            }
246            FrameType::Patch => {
247                // Validate patch payload structure
248                if !self.payload.is_object() {
249                    return Err(DomainError::InvalidFrame(
250                        "Patch frames must have object payload".to_string(),
251                    ));
252                }
253
254                if !self.payload.get("patches").is_some_and(|p| p.is_array()) {
255                    return Err(DomainError::InvalidFrame(
256                        "Patch frames must contain patches array".to_string(),
257                    ));
258                }
259            }
260            FrameType::Complete => {
261                if !self.priority.is_critical() {
262                    return Err(DomainError::InvalidFrame(
263                        "Complete frames must have critical priority".to_string(),
264                    ));
265                }
266            }
267            FrameType::Error => {
268                if !self.priority.is_critical() {
269                    return Err(DomainError::InvalidFrame(
270                        "Error frames must have critical priority".to_string(),
271                    ));
272                }
273
274                if !self.payload.get("message").is_some_and(|m| m.is_string()) {
275                    return Err(DomainError::InvalidFrame(
276                        "Error frames must contain message".to_string(),
277                    ));
278                }
279            }
280        }
281
282        Ok(())
283    }
284}
285
286/// Individual patch within a frame
287#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
288pub struct FramePatch {
289    /// JSON path to the target location
290    pub path: JsonPath,
291    /// Operation to perform at the path
292    pub operation: PatchOperation,
293    /// Value to apply with the operation
294    pub value: JsonData,
295}
296
297/// Patch operation types
298#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
299#[serde(rename_all = "lowercase")]
300pub enum PatchOperation {
301    /// Set a value at the path
302    Set,
303    /// Append to an array at the path
304    Append,
305    /// Merge object at the path
306    Merge,
307    /// Delete value at the path
308    Delete,
309}
310
311/// Patch payload structure (reserved for future use)
312#[allow(dead_code)]
313#[derive(Debug, Clone, Serialize, Deserialize)]
314struct PatchPayload {
315    patches: Vec<FramePatch>,
316}
317
318impl FramePatch {
319    /// Create set operation patch
320    pub fn set(path: JsonPath, value: JsonData) -> Self {
321        Self {
322            path,
323            operation: PatchOperation::Set,
324            value,
325        }
326    }
327
328    /// Create append operation patch
329    pub fn append(path: JsonPath, value: JsonData) -> Self {
330        Self {
331            path,
332            operation: PatchOperation::Append,
333            value,
334        }
335    }
336
337    /// Create merge operation patch
338    pub fn merge(path: JsonPath, value: JsonData) -> Self {
339        Self {
340            path,
341            operation: PatchOperation::Merge,
342            value,
343        }
344    }
345
346    /// Create delete operation patch
347    pub fn delete(path: JsonPath) -> Self {
348        Self {
349            path,
350            operation: PatchOperation::Delete,
351            value: JsonData::Null,
352        }
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// A skeleton frame is critical, keeps its inputs and validates.
    #[test]
    fn test_skeleton_frame_creation() {
        let stream_id = StreamId::new();
        let skeleton_data = serde_json::json!({
            "users": [],
            "total": 0
        });

        // `skeleton_data` is not used again, so it is moved rather than cloned.
        let frame = Frame::skeleton(stream_id, 1, skeleton_data.into());

        assert_eq!(frame.frame_type(), &FrameType::Skeleton);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 1);
        assert_eq!(frame.stream_id(), stream_id);
        assert!(frame.validate().is_ok());
    }

    /// A patch frame keeps the caller-supplied priority and validates.
    #[test]
    fn test_patch_frame_creation() {
        let stream_id = StreamId::new();
        let path = JsonPath::new("$.users[0].name").expect("Failed to create JsonPath in test");
        let patch = FramePatch::set(path, JsonData::String("John".to_string()));

        let frame = Frame::patch(stream_id, 2, Priority::HIGH, vec![patch])
            .expect("Failed to create patch frame in test");

        assert_eq!(frame.frame_type(), &FrameType::Patch);
        assert_eq!(frame.priority(), Priority::HIGH);
        assert_eq!(frame.sequence(), 2);
        assert!(frame.validate().is_ok());
    }

    /// A completion frame with a checksum is critical and validates.
    #[test]
    fn test_complete_frame_creation() {
        let stream_id = StreamId::new();
        let frame = Frame::complete(stream_id, 10, Some("abc123".to_string()));

        assert_eq!(frame.frame_type(), &FrameType::Complete);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 10);
        assert!(frame.validate().is_ok());
    }

    /// A completion frame without a checksum is still a valid frame.
    #[test]
    fn test_complete_frame_without_checksum() {
        let stream_id = StreamId::new();
        let frame = Frame::complete(stream_id, 11, None);

        assert_eq!(frame.frame_type(), &FrameType::Complete);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert!(frame.payload().is_object());
        assert!(frame.validate().is_ok());
    }

    /// Error frames are critical and validate with or without an error code.
    #[test]
    fn test_error_frame_creation() {
        let stream_id = StreamId::new();

        let with_code = Frame::error(
            stream_id,
            3,
            "something failed".to_string(),
            Some("E_TEST".to_string()),
        );
        assert_eq!(with_code.frame_type(), &FrameType::Error);
        assert_eq!(with_code.priority(), Priority::CRITICAL);
        assert_eq!(with_code.sequence(), 3);
        assert!(with_code.validate().is_ok());

        let without_code = Frame::error(stream_id, 4, "something failed".to_string(), None);
        assert_eq!(without_code.frame_type(), &FrameType::Error);
        assert!(without_code.validate().is_ok());
    }

    /// Metadata added builder-style can be read back.
    #[test]
    fn test_frame_with_metadata() {
        let stream_id = StreamId::new();
        let skeleton_data = serde_json::json!({});
        let frame = Frame::skeleton(stream_id, 1, skeleton_data.into())
            .with_metadata("source".to_string(), "api".to_string())
            .with_metadata("version".to_string(), "1.0".to_string());

        assert_eq!(frame.get_metadata("source"), Some(&"api".to_string()));
        assert_eq!(frame.get_metadata("version"), Some(&"1.0".to_string()));
        assert_eq!(frame.metadata().len(), 2);
    }

    /// Constructing a patch frame from no patches is rejected.
    #[test]
    fn test_empty_patch_validation() {
        let stream_id = StreamId::new();
        let result = Frame::patch(stream_id, 1, Priority::MEDIUM, vec![]);

        assert!(result.is_err());
    }

    /// A delete patch records the delete operation and a null value.
    #[test]
    fn test_delete_patch_has_null_value() {
        let path = JsonPath::new("$.users[0]").expect("Failed to create JsonPath in test");
        let patch = FramePatch::delete(path);

        assert_eq!(patch.operation, PatchOperation::Delete);
        assert_eq!(patch.value, JsonData::Null);
    }
}
423}