pjson_rs_domain/entities/
frame.rs1use crate::{
4 DomainError, DomainResult,
5 value_objects::{JsonData, JsonPath, Priority, StreamId},
6};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum FrameType {
14 Skeleton,
16 Patch,
18 Complete,
20 Error,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct Frame {
27 stream_id: StreamId,
28 frame_type: FrameType,
29 priority: Priority,
30 sequence: u64,
31 timestamp: DateTime<Utc>,
32 payload: JsonData,
33 metadata: HashMap<String, String>,
34}
35
36impl std::hash::Hash for Frame {
37 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
38 self.stream_id.hash(state);
39 self.frame_type.hash(state);
40 self.priority.hash(state);
41 self.sequence.hash(state);
42 self.timestamp.hash(state);
43 self.payload.hash(state);
44
45 let mut pairs: Vec<_> = self.metadata.iter().collect();
47 pairs.sort_by_key(|(k, _)| *k);
48 pairs.hash(state);
49 }
50}
51
52impl Frame {
53 pub fn skeleton(stream_id: StreamId, sequence: u64, skeleton_data: JsonData) -> Self {
55 Self {
56 stream_id,
57 frame_type: FrameType::Skeleton,
58 priority: Priority::CRITICAL,
59 sequence,
60 timestamp: Utc::now(),
61 payload: skeleton_data,
62 metadata: HashMap::new(),
63 }
64 }
65
66 pub fn patch(
68 stream_id: StreamId,
69 sequence: u64,
70 priority: Priority,
71 patches: Vec<FramePatch>,
72 ) -> DomainResult<Self> {
73 if patches.is_empty() {
74 return Err(DomainError::InvalidFrame(
75 "Patch frame must contain at least one patch".to_string(),
76 ));
77 }
78
79 let mut payload_obj = HashMap::with_capacity(1);
81 let patches_array: Vec<JsonData> = patches
82 .into_iter()
83 .map(|patch| {
84 let mut patch_obj = HashMap::with_capacity(3);
85 patch_obj.insert("path".into(), JsonData::String(patch.path.to_string()));
86 patch_obj.insert(
87 "operation".into(),
88 JsonData::String(
89 match patch.operation {
90 PatchOperation::Set => "set",
91 PatchOperation::Append => "append",
92 PatchOperation::Merge => "merge",
93 PatchOperation::Delete => "delete",
94 }
95 .into(),
96 ),
97 );
98 patch_obj.insert("value".into(), patch.value);
99 JsonData::Object(patch_obj)
100 })
101 .collect();
102
103 payload_obj.insert("patches".into(), JsonData::Array(patches_array));
104 let payload = JsonData::Object(payload_obj);
105
106 Ok(Self {
107 stream_id,
108 frame_type: FrameType::Patch,
109 priority,
110 sequence,
111 timestamp: Utc::now(),
112 payload,
113 metadata: HashMap::new(),
114 })
115 }
116
117 pub fn complete(stream_id: StreamId, sequence: u64, checksum: Option<String>) -> Self {
119 let payload = if let Some(checksum) = checksum {
120 let mut obj = HashMap::new();
121 obj.insert("checksum".to_string(), JsonData::String(checksum));
122 JsonData::Object(obj)
123 } else {
124 JsonData::Object(HashMap::new())
125 };
126
127 Self {
128 stream_id,
129 frame_type: FrameType::Complete,
130 priority: Priority::CRITICAL,
131 sequence,
132 timestamp: Utc::now(),
133 payload,
134 metadata: HashMap::new(),
135 }
136 }
137
138 pub fn error(
140 stream_id: StreamId,
141 sequence: u64,
142 error_message: String,
143 error_code: Option<String>,
144 ) -> Self {
145 let payload = if let Some(code) = error_code {
146 let mut obj = HashMap::new();
147 obj.insert("message".to_string(), JsonData::String(error_message));
148 obj.insert("code".to_string(), JsonData::String(code));
149 JsonData::Object(obj)
150 } else {
151 let mut obj = HashMap::new();
152 obj.insert("message".to_string(), JsonData::String(error_message));
153 JsonData::Object(obj)
154 };
155
156 Self {
157 stream_id,
158 frame_type: FrameType::Error,
159 priority: Priority::CRITICAL,
160 sequence,
161 timestamp: Utc::now(),
162 payload,
163 metadata: HashMap::new(),
164 }
165 }
166
167 pub fn stream_id(&self) -> StreamId {
169 self.stream_id
170 }
171
172 pub fn frame_type(&self) -> &FrameType {
174 &self.frame_type
175 }
176
177 pub fn priority(&self) -> Priority {
179 self.priority
180 }
181
182 pub fn sequence(&self) -> u64 {
184 self.sequence
185 }
186
187 pub fn timestamp(&self) -> DateTime<Utc> {
189 self.timestamp
190 }
191
192 pub fn payload(&self) -> &JsonData {
194 &self.payload
195 }
196
197 pub fn with_metadata(mut self, key: String, value: String) -> Self {
199 self.metadata.insert(key, value);
200 self
201 }
202
203 pub fn metadata(&self) -> &HashMap<String, String> {
205 &self.metadata
206 }
207
208 pub fn get_metadata(&self, key: &str) -> Option<&String> {
210 self.metadata.get(key)
211 }
212
213 pub fn is_critical(&self) -> bool {
215 self.priority.is_critical()
216 }
217
218 pub fn is_high_priority(&self) -> bool {
220 self.priority.is_high_or_above()
221 }
222
223 pub fn estimated_size(&self) -> usize {
225 let payload_size = self.payload.to_string().len();
227 let metadata_size: usize = self
228 .metadata
229 .iter()
230 .map(|(k, v)| k.len() + v.len() + 4) .sum();
232
233 payload_size + metadata_size + 200 }
235
236 pub fn validate(&self) -> DomainResult<()> {
238 match &self.frame_type {
239 FrameType::Skeleton => {
240 if !self.priority.is_critical() {
241 return Err(DomainError::InvalidFrame(
242 "Skeleton frames must have critical priority".to_string(),
243 ));
244 }
245 }
246 FrameType::Patch => {
247 if !self.payload.is_object() {
249 return Err(DomainError::InvalidFrame(
250 "Patch frames must have object payload".to_string(),
251 ));
252 }
253
254 if !self.payload.get("patches").is_some_and(|p| p.is_array()) {
255 return Err(DomainError::InvalidFrame(
256 "Patch frames must contain patches array".to_string(),
257 ));
258 }
259 }
260 FrameType::Complete => {
261 if !self.priority.is_critical() {
262 return Err(DomainError::InvalidFrame(
263 "Complete frames must have critical priority".to_string(),
264 ));
265 }
266 }
267 FrameType::Error => {
268 if !self.priority.is_critical() {
269 return Err(DomainError::InvalidFrame(
270 "Error frames must have critical priority".to_string(),
271 ));
272 }
273
274 if !self.payload.get("message").is_some_and(|m| m.is_string()) {
275 return Err(DomainError::InvalidFrame(
276 "Error frames must contain message".to_string(),
277 ));
278 }
279 }
280 }
281
282 Ok(())
283 }
284}
285
286#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
288pub struct FramePatch {
289 pub path: JsonPath,
291 pub operation: PatchOperation,
293 pub value: JsonData,
295}
296
297#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
299#[serde(rename_all = "lowercase")]
300pub enum PatchOperation {
301 Set,
303 Append,
305 Merge,
307 Delete,
309}
310
311#[allow(dead_code)]
313#[derive(Debug, Clone, Serialize, Deserialize)]
314struct PatchPayload {
315 patches: Vec<FramePatch>,
316}
317
318impl FramePatch {
319 pub fn set(path: JsonPath, value: JsonData) -> Self {
321 Self {
322 path,
323 operation: PatchOperation::Set,
324 value,
325 }
326 }
327
328 pub fn append(path: JsonPath, value: JsonData) -> Self {
330 Self {
331 path,
332 operation: PatchOperation::Append,
333 value,
334 }
335 }
336
337 pub fn merge(path: JsonPath, value: JsonData) -> Self {
339 Self {
340 path,
341 operation: PatchOperation::Merge,
342 value,
343 }
344 }
345
346 pub fn delete(path: JsonPath) -> Self {
348 Self {
349 path,
350 operation: PatchOperation::Delete,
351 value: JsonData::Null,
352 }
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// A skeleton frame is critical, keeps its ids, and validates.
    #[test]
    fn test_skeleton_frame_creation() {
        let id = StreamId::new();
        let body = serde_json::json!({
            "users": [],
            "total": 0
        });

        let frame = Frame::skeleton(id, 1, body.into());

        assert_eq!(*frame.frame_type(), FrameType::Skeleton);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 1);
        assert_eq!(frame.stream_id(), id);
        assert!(frame.validate().is_ok());
    }

    /// A patch frame keeps the requested priority and validates.
    #[test]
    fn test_patch_frame_creation() {
        let target = JsonPath::new("$.users[0].name").expect("Failed to create JsonPath in test");
        let change = FramePatch::set(target, JsonData::String("John".to_string()));

        let frame = Frame::patch(StreamId::new(), 2, Priority::HIGH, vec![change])
            .expect("Failed to create patch frame in test");

        assert_eq!(*frame.frame_type(), FrameType::Patch);
        assert_eq!(frame.priority(), Priority::HIGH);
        assert_eq!(frame.sequence(), 2);
        assert!(frame.validate().is_ok());
    }

    /// A completion frame with a checksum is critical and validates.
    #[test]
    fn test_complete_frame_creation() {
        let frame = Frame::complete(StreamId::new(), 10, Some("abc123".to_string()));

        assert_eq!(*frame.frame_type(), FrameType::Complete);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 10);
        assert!(frame.validate().is_ok());
    }

    /// Metadata added through the builder is retrievable by key.
    #[test]
    fn test_frame_with_metadata() {
        let frame = Frame::skeleton(StreamId::new(), 1, serde_json::json!({}).into())
            .with_metadata("source".to_string(), "api".to_string())
            .with_metadata("version".to_string(), "1.0".to_string());

        assert_eq!(frame.get_metadata("source").map(String::as_str), Some("api"));
        assert_eq!(frame.get_metadata("version").map(String::as_str), Some("1.0"));
        assert_eq!(frame.metadata().len(), 2);
    }

    /// Building a patch frame without patches is rejected.
    #[test]
    fn test_empty_patch_validation() {
        let outcome = Frame::patch(StreamId::new(), 1, Priority::MEDIUM, Vec::new());

        assert!(outcome.is_err());
    }
}