1use crate::{
4 DomainError, DomainResult,
5 value_objects::{JsonData, JsonPath, Priority, StreamId},
6};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11mod serde_stream_id {
13 use crate::value_objects::StreamId;
14 use serde::{Deserialize, Deserializer, Serialize, Serializer};
15
16 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
17 where
18 S: Serializer,
19 {
20 id.as_uuid().serialize(serializer)
21 }
22
23 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
24 where
25 D: Deserializer<'de>,
26 {
27 let uuid = uuid::Uuid::deserialize(deserializer)?;
28 Ok(StreamId::from_uuid(uuid))
29 }
30}
31
32mod serde_priority {
34 use crate::value_objects::Priority;
35 use serde::{Deserialize, Deserializer, Serialize, Serializer};
36
37 pub fn serialize<S>(priority: &Priority, serializer: S) -> Result<S::Ok, S::Error>
38 where
39 S: Serializer,
40 {
41 priority.value().serialize(serializer)
42 }
43
44 pub fn deserialize<'de, D>(deserializer: D) -> Result<Priority, D::Error>
45 where
46 D: Deserializer<'de>,
47 {
48 let value = u8::deserialize(deserializer)?;
49 Priority::new(value).map_err(serde::de::Error::custom)
50 }
51}
52
53mod serde_json_path {
55 use crate::value_objects::JsonPath;
56 use serde::{Deserialize, Deserializer, Serialize, Serializer};
57
58 pub fn serialize<S>(path: &JsonPath, serializer: S) -> Result<S::Ok, S::Error>
59 where
60 S: Serializer,
61 {
62 path.as_str().serialize(serializer)
63 }
64
65 pub fn deserialize<'de, D>(deserializer: D) -> Result<JsonPath, D::Error>
66 where
67 D: Deserializer<'de>,
68 {
69 let s = String::deserialize(deserializer)?;
70 JsonPath::new(s).map_err(serde::de::Error::custom)
71 }
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
76pub enum FrameType {
77 Skeleton,
79 Patch,
81 Complete,
83 Error,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89pub struct Frame {
90 #[serde(with = "serde_stream_id")]
91 stream_id: StreamId,
92 frame_type: FrameType,
93 #[serde(with = "serde_priority")]
94 priority: Priority,
95 sequence: u64,
96 timestamp: DateTime<Utc>,
97 payload: JsonData,
98 metadata: HashMap<String, String>,
99}
100
101impl std::hash::Hash for Frame {
102 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
103 self.stream_id.hash(state);
104 self.frame_type.hash(state);
105 self.priority.hash(state);
106 self.sequence.hash(state);
107 self.timestamp.hash(state);
108 self.payload.hash(state);
109
110 let mut pairs: Vec<_> = self.metadata.iter().collect();
112 pairs.sort_by_key(|(k, _)| *k);
113 pairs.hash(state);
114 }
115}
116
117impl Frame {
118 pub fn skeleton(stream_id: StreamId, sequence: u64, skeleton_data: JsonData) -> Self {
120 Self {
121 stream_id,
122 frame_type: FrameType::Skeleton,
123 priority: Priority::CRITICAL,
124 sequence,
125 timestamp: Utc::now(),
126 payload: skeleton_data,
127 metadata: HashMap::new(),
128 }
129 }
130
131 pub fn patch(
133 stream_id: StreamId,
134 sequence: u64,
135 priority: Priority,
136 patches: Vec<FramePatch>,
137 ) -> DomainResult<Self> {
138 if patches.is_empty() {
139 return Err(DomainError::InvalidFrame(
140 "Patch frame must contain at least one patch".to_string(),
141 ));
142 }
143
144 let mut payload_obj = HashMap::with_capacity(1);
146 let patches_array: Vec<JsonData> = patches
147 .into_iter()
148 .map(|patch| {
149 let mut patch_obj = HashMap::with_capacity(3);
150 patch_obj.insert("path".into(), JsonData::String(patch.path.to_string()));
151 patch_obj.insert(
152 "operation".into(),
153 JsonData::String(
154 match patch.operation {
155 PatchOperation::Set => "set",
156 PatchOperation::Append => "append",
157 PatchOperation::Merge => "merge",
158 PatchOperation::Delete => "delete",
159 }
160 .into(),
161 ),
162 );
163 patch_obj.insert("value".into(), patch.value);
164 JsonData::Object(patch_obj)
165 })
166 .collect();
167
168 payload_obj.insert("patches".into(), JsonData::Array(patches_array));
169 let payload = JsonData::Object(payload_obj);
170
171 Ok(Self {
172 stream_id,
173 frame_type: FrameType::Patch,
174 priority,
175 sequence,
176 timestamp: Utc::now(),
177 payload,
178 metadata: HashMap::new(),
179 })
180 }
181
182 pub fn complete(stream_id: StreamId, sequence: u64, checksum: Option<String>) -> Self {
184 let payload = if let Some(checksum) = checksum {
185 let mut obj = HashMap::new();
186 obj.insert("checksum".to_string(), JsonData::String(checksum));
187 JsonData::Object(obj)
188 } else {
189 JsonData::Object(HashMap::new())
190 };
191
192 Self {
193 stream_id,
194 frame_type: FrameType::Complete,
195 priority: Priority::CRITICAL,
196 sequence,
197 timestamp: Utc::now(),
198 payload,
199 metadata: HashMap::new(),
200 }
201 }
202
203 pub fn error(
205 stream_id: StreamId,
206 sequence: u64,
207 error_message: String,
208 error_code: Option<String>,
209 ) -> Self {
210 let payload = if let Some(code) = error_code {
211 let mut obj = HashMap::new();
212 obj.insert("message".to_string(), JsonData::String(error_message));
213 obj.insert("code".to_string(), JsonData::String(code));
214 JsonData::Object(obj)
215 } else {
216 let mut obj = HashMap::new();
217 obj.insert("message".to_string(), JsonData::String(error_message));
218 JsonData::Object(obj)
219 };
220
221 Self {
222 stream_id,
223 frame_type: FrameType::Error,
224 priority: Priority::CRITICAL,
225 sequence,
226 timestamp: Utc::now(),
227 payload,
228 metadata: HashMap::new(),
229 }
230 }
231
232 pub fn stream_id(&self) -> StreamId {
234 self.stream_id
235 }
236
237 pub fn frame_type(&self) -> &FrameType {
239 &self.frame_type
240 }
241
242 pub fn priority(&self) -> Priority {
244 self.priority
245 }
246
247 pub fn sequence(&self) -> u64 {
249 self.sequence
250 }
251
252 pub fn timestamp(&self) -> DateTime<Utc> {
254 self.timestamp
255 }
256
257 pub fn payload(&self) -> &JsonData {
259 &self.payload
260 }
261
262 pub fn with_metadata(mut self, key: String, value: String) -> Self {
264 self.metadata.insert(key, value);
265 self
266 }
267
268 pub fn metadata(&self) -> &HashMap<String, String> {
270 &self.metadata
271 }
272
273 pub fn get_metadata(&self, key: &str) -> Option<&String> {
275 self.metadata.get(key)
276 }
277
278 pub fn is_critical(&self) -> bool {
280 self.priority.is_critical()
281 }
282
283 pub fn is_high_priority(&self) -> bool {
285 self.priority.is_high_or_above()
286 }
287
288 pub fn estimated_size(&self) -> usize {
290 let payload_size = self.payload.to_string().len();
292 let metadata_size: usize = self
293 .metadata
294 .iter()
295 .map(|(k, v)| k.len() + v.len() + 4) .sum();
297
298 payload_size + metadata_size + 200 }
300
301 pub fn validate(&self) -> DomainResult<()> {
303 match &self.frame_type {
304 FrameType::Skeleton => {
305 if !self.priority.is_critical() {
306 return Err(DomainError::InvalidFrame(
307 "Skeleton frames must have critical priority".to_string(),
308 ));
309 }
310 }
311 FrameType::Patch => {
312 if !self.payload.is_object() {
314 return Err(DomainError::InvalidFrame(
315 "Patch frames must have object payload".to_string(),
316 ));
317 }
318
319 if !self.payload.get("patches").is_some_and(|p| p.is_array()) {
320 return Err(DomainError::InvalidFrame(
321 "Patch frames must contain patches array".to_string(),
322 ));
323 }
324 }
325 FrameType::Complete => {
326 if !self.priority.is_critical() {
327 return Err(DomainError::InvalidFrame(
328 "Complete frames must have critical priority".to_string(),
329 ));
330 }
331 }
332 FrameType::Error => {
333 if !self.priority.is_critical() {
334 return Err(DomainError::InvalidFrame(
335 "Error frames must have critical priority".to_string(),
336 ));
337 }
338
339 if !self.payload.get("message").is_some_and(|m| m.is_string()) {
340 return Err(DomainError::InvalidFrame(
341 "Error frames must contain message".to_string(),
342 ));
343 }
344 }
345 }
346
347 Ok(())
348 }
349}
350
351#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
353pub struct FramePatch {
354 #[serde(with = "serde_json_path")]
356 pub path: JsonPath,
357 pub operation: PatchOperation,
359 pub value: JsonData,
361}
362
363#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
365#[serde(rename_all = "lowercase")]
366pub enum PatchOperation {
367 Set,
369 Append,
371 Merge,
373 Delete,
375}
376
377impl FramePatch {
378 pub fn set(path: JsonPath, value: JsonData) -> Self {
380 Self {
381 path,
382 operation: PatchOperation::Set,
383 value,
384 }
385 }
386
387 pub fn append(path: JsonPath, value: JsonData) -> Self {
389 Self {
390 path,
391 operation: PatchOperation::Append,
392 value,
393 }
394 }
395
396 pub fn merge(path: JsonPath, value: JsonData) -> Self {
398 Self {
399 path,
400 operation: PatchOperation::Merge,
401 value,
402 }
403 }
404
405 pub fn delete(path: JsonPath) -> Self {
407 Self {
408 path,
409 operation: PatchOperation::Delete,
410 value: JsonData::Null,
411 }
412 }
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    // A skeleton frame is critical, keeps the ids it was given and validates.
    #[test]
    fn test_skeleton_frame_creation() {
        let stream_id = StreamId::new();
        let skeleton_data = serde_json::json!({
            "users": [],
            "total": 0
        });

        // `skeleton_data` is not used again, so it is moved instead of cloned.
        let frame = Frame::skeleton(stream_id, 1, skeleton_data.into());

        assert_eq!(frame.frame_type(), &FrameType::Skeleton);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 1);
        assert_eq!(frame.stream_id(), stream_id);
        assert!(frame.validate().is_ok());
    }

    // A patch frame keeps the caller's priority and validates.
    #[test]
    fn test_patch_frame_creation() {
        let stream_id = StreamId::new();
        let path = JsonPath::new("$.users[0].name").expect("Failed to create JsonPath in test");
        let patch = FramePatch::set(path, JsonData::String("John".to_string()));

        let frame = Frame::patch(stream_id, 2, Priority::HIGH, vec![patch])
            .expect("Failed to create patch frame in test");

        assert_eq!(frame.frame_type(), &FrameType::Patch);
        assert_eq!(frame.priority(), Priority::HIGH);
        assert_eq!(frame.sequence(), 2);
        assert!(frame.validate().is_ok());
    }

    // A complete frame is critical and validates.
    #[test]
    fn test_complete_frame_creation() {
        let stream_id = StreamId::new();
        let frame = Frame::complete(stream_id, 10, Some("abc123".to_string()));

        assert_eq!(frame.frame_type(), &FrameType::Complete);
        assert_eq!(frame.priority(), Priority::CRITICAL);
        assert_eq!(frame.sequence(), 10);
        assert!(frame.validate().is_ok());
    }

    // An error frame is critical and validates, with or without an error code.
    #[test]
    fn test_error_frame_creation() {
        let stream_id = StreamId::new();

        let coded = Frame::error(
            stream_id,
            3,
            "boom".to_string(),
            Some("E42".to_string()),
        );
        assert_eq!(coded.frame_type(), &FrameType::Error);
        assert_eq!(coded.priority(), Priority::CRITICAL);
        assert_eq!(coded.sequence(), 3);
        assert_eq!(coded.stream_id(), stream_id);
        assert!(coded.validate().is_ok());

        let uncoded = Frame::error(stream_id, 4, "boom".to_string(), None);
        assert_eq!(uncoded.frame_type(), &FrameType::Error);
        assert!(uncoded.validate().is_ok());
    }

    // Metadata added through the builder is retrievable afterwards.
    #[test]
    fn test_frame_with_metadata() {
        let stream_id = StreamId::new();
        let skeleton_data = serde_json::json!({});
        let frame = Frame::skeleton(stream_id, 1, skeleton_data.into())
            .with_metadata("source".to_string(), "api".to_string())
            .with_metadata("version".to_string(), "1.0".to_string());

        assert_eq!(frame.get_metadata("source"), Some(&"api".to_string()));
        assert_eq!(frame.get_metadata("version"), Some(&"1.0".to_string()));
        assert_eq!(frame.get_metadata("missing"), None);
        assert_eq!(frame.metadata().len(), 2);
    }

    // An empty patch list is rejected with `InvalidFrame`, not some other error.
    #[test]
    fn test_empty_patch_validation() {
        let stream_id = StreamId::new();
        let result = Frame::patch(stream_id, 1, Priority::MEDIUM, vec![]);

        assert!(matches!(result, Err(DomainError::InvalidFrame(_))));
    }
}