Skip to main content

pjson_rs_domain/entities/
stream.rs

1//! Stream entity representing a prioritized data stream
2
3use crate::{
4    DomainError, DomainResult,
5    entities::Frame,
6    value_objects::{JsonData, JsonPath, Priority, SessionId, StreamId},
7};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Custom serde for SessionId within entities
13mod serde_session_id {
14    use crate::value_objects::SessionId;
15    use serde::{Deserialize, Deserializer, Serialize, Serializer};
16
17    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: Serializer,
20    {
21        id.as_uuid().serialize(serializer)
22    }
23
24    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
25    where
26        D: Deserializer<'de>,
27    {
28        let uuid = uuid::Uuid::deserialize(deserializer)?;
29        Ok(SessionId::from_uuid(uuid))
30    }
31}
32
33/// Custom serde for StreamId within entities
34mod serde_stream_id {
35    use crate::value_objects::StreamId;
36    use serde::{Deserialize, Deserializer, Serialize, Serializer};
37
38    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
39    where
40        S: Serializer,
41    {
42        id.as_uuid().serialize(serializer)
43    }
44
45    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
46    where
47        D: Deserializer<'de>,
48    {
49        let uuid = uuid::Uuid::deserialize(deserializer)?;
50        Ok(StreamId::from_uuid(uuid))
51    }
52}
53
54/// Custom serde for HashMap<String, Priority>
55mod serde_priority_map {
56    use crate::value_objects::Priority;
57    use serde::{Deserialize, Deserializer, Serialize, Serializer};
58    use std::collections::HashMap;
59
60    pub fn serialize<S>(map: &HashMap<String, Priority>, serializer: S) -> Result<S::Ok, S::Error>
61    where
62        S: Serializer,
63    {
64        let u8_map: HashMap<String, u8> = map.iter().map(|(k, v)| (k.clone(), v.value())).collect();
65        u8_map.serialize(serializer)
66    }
67
68    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<String, Priority>, D::Error>
69    where
70        D: Deserializer<'de>,
71    {
72        let u8_map: HashMap<String, u8> = HashMap::deserialize(deserializer)?;
73        u8_map
74            .into_iter()
75            .map(|(k, v)| {
76                Priority::new(v)
77                    .map(|p| (k, p))
78                    .map_err(serde::de::Error::custom)
79            })
80            .collect()
81    }
82}
83
84/// Stream state in its lifecycle
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86#[non_exhaustive]
87pub enum StreamState {
88    /// Stream is being prepared
89    Preparing,
90    /// Stream is actively sending data
91    Streaming,
92    /// Stream completed successfully
93    Completed,
94    /// Stream failed with error
95    Failed,
96    /// Stream was cancelled
97    Cancelled,
98}
99
100/// Stream configuration and metadata
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct StreamConfig {
103    /// Maximum frame size in bytes
104    pub max_frame_size: usize,
105    /// Maximum frames per batch
106    pub max_frames_per_batch: usize,
107    /// Compression settings
108    pub enable_compression: bool,
109    /// Custom priority rules
110    #[serde(with = "serde_priority_map")]
111    pub priority_rules: HashMap<String, Priority>,
112}
113
114impl Default for StreamConfig {
115    fn default() -> Self {
116        Self {
117            max_frame_size: 64 * 1024, // 64KB
118            max_frames_per_batch: 10,
119            enable_compression: true,
120            priority_rules: HashMap::new(),
121        }
122    }
123}
124
125/// Stream statistics for monitoring
126#[derive(Debug, Clone, Default, Serialize, Deserialize)]
127pub struct StreamStats {
128    /// Total number of frames generated
129    pub total_frames: u64,
130    /// Number of skeleton frames sent
131    pub skeleton_frames: u64,
132    /// Number of patch frames sent
133    pub patch_frames: u64,
134    /// Number of completion frames sent
135    pub complete_frames: u64,
136    /// Number of error frames sent
137    pub error_frames: u64,
138    /// Total bytes transmitted across all frames
139    pub total_bytes: u64,
140    /// Bytes transmitted in critical priority frames
141    pub critical_bytes: u64,
142    /// Bytes transmitted in high priority frames
143    pub high_priority_bytes: u64,
144    /// Average size of frames in bytes
145    pub average_frame_size: f64,
146}
147
148/// Priority data stream entity
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct Stream {
151    #[serde(with = "serde_stream_id")]
152    id: StreamId,
153    #[serde(with = "serde_session_id")]
154    session_id: SessionId,
155    state: StreamState,
156    config: StreamConfig,
157    stats: StreamStats,
158    created_at: DateTime<Utc>,
159    updated_at: DateTime<Utc>,
160    completed_at: Option<DateTime<Utc>>,
161    next_sequence: u64,
162    source_data: Option<JsonData>,
163    metadata: HashMap<String, String>,
164}
165
166impl Stream {
167    /// Create new stream
168    pub fn new(session_id: SessionId, source_data: JsonData, config: StreamConfig) -> Self {
169        let now = Utc::now();
170
171        Self {
172            id: StreamId::new(),
173            session_id,
174            state: StreamState::Preparing,
175            config,
176            stats: StreamStats::default(),
177            created_at: now,
178            updated_at: now,
179            completed_at: None,
180            next_sequence: 1,
181            source_data: Some(source_data),
182            metadata: HashMap::new(),
183        }
184    }
185
186    /// Get stream ID
187    pub fn id(&self) -> StreamId {
188        self.id
189    }
190
191    /// Get session ID
192    pub fn session_id(&self) -> SessionId {
193        self.session_id
194    }
195
196    /// Get current state
197    pub fn state(&self) -> &StreamState {
198        &self.state
199    }
200
201    /// Get configuration
202    pub fn config(&self) -> &StreamConfig {
203        &self.config
204    }
205
206    /// Get statistics
207    pub fn stats(&self) -> &StreamStats {
208        &self.stats
209    }
210
211    /// Get creation timestamp
212    pub fn created_at(&self) -> DateTime<Utc> {
213        self.created_at
214    }
215
216    /// Get last update timestamp
217    pub fn updated_at(&self) -> DateTime<Utc> {
218        self.updated_at
219    }
220
221    /// Get completion timestamp
222    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
223        self.completed_at
224    }
225
226    /// Get source data
227    pub fn source_data(&self) -> Option<&JsonData> {
228        self.source_data.as_ref()
229    }
230
231    /// Get metadata
232    pub fn metadata(&self) -> &HashMap<String, String> {
233        &self.metadata
234    }
235
236    /// Add metadata
237    pub fn add_metadata(&mut self, key: String, value: String) {
238        self.metadata.insert(key, value);
239        self.update_timestamp();
240    }
241
242    /// Start streaming (transition to Streaming state)
243    pub fn start_streaming(&mut self) -> DomainResult<()> {
244        match self.state {
245            StreamState::Preparing => {
246                self.state = StreamState::Streaming;
247                self.update_timestamp();
248                Ok(())
249            }
250            _ => Err(DomainError::InvalidStateTransition(format!(
251                "Cannot start streaming from state: {:?}",
252                self.state
253            ))),
254        }
255    }
256
257    /// Complete stream successfully
258    pub fn complete(&mut self) -> DomainResult<()> {
259        match self.state {
260            StreamState::Streaming => {
261                self.state = StreamState::Completed;
262                self.completed_at = Some(Utc::now());
263                self.update_timestamp();
264                Ok(())
265            }
266            _ => Err(DomainError::InvalidStateTransition(format!(
267                "Cannot complete stream from state: {:?}",
268                self.state
269            ))),
270        }
271    }
272
273    /// Fail stream with error
274    pub fn fail(&mut self, error: String) -> DomainResult<()> {
275        match self.state {
276            StreamState::Preparing | StreamState::Streaming => {
277                self.state = StreamState::Failed;
278                self.completed_at = Some(Utc::now());
279                self.add_metadata("error".to_string(), error);
280                Ok(())
281            }
282            _ => Err(DomainError::InvalidStateTransition(format!(
283                "Cannot fail stream from state: {:?}",
284                self.state
285            ))),
286        }
287    }
288
289    /// Cancel stream
290    pub fn cancel(&mut self) -> DomainResult<()> {
291        match self.state {
292            StreamState::Preparing | StreamState::Streaming => {
293                self.state = StreamState::Cancelled;
294                self.completed_at = Some(Utc::now());
295                self.update_timestamp();
296                Ok(())
297            }
298            _ => Err(DomainError::InvalidStateTransition(format!(
299                "Cannot cancel stream from state: {:?}",
300                self.state
301            ))),
302        }
303    }
304
305    /// Generate skeleton frame for the stream
306    pub fn create_skeleton_frame(&mut self) -> DomainResult<Frame> {
307        if !matches!(self.state, StreamState::Streaming) {
308            return Err(DomainError::InvalidStreamState(
309                "Stream must be in streaming state to create frames".to_string(),
310            ));
311        }
312
313        let skeleton_data = self.source_data.as_ref().ok_or_else(|| {
314            DomainError::InvalidStreamState("No source data available for skeleton".to_string())
315        })?;
316
317        let skeleton = self.generate_skeleton(skeleton_data)?;
318        let frame = Frame::skeleton(self.id, self.next_sequence, skeleton);
319
320        self.record_frame_created(&frame);
321
322        Ok(frame)
323    }
324
325    /// Create batch of patch frames based on priority
326    pub fn create_patch_frames(
327        &mut self,
328        priority_threshold: Priority,
329        max_frames: usize,
330    ) -> DomainResult<Vec<Frame>> {
331        if !matches!(self.state, StreamState::Streaming) {
332            return Err(DomainError::InvalidStreamState(
333                "Stream must be in streaming state to create frames".to_string(),
334            ));
335        }
336
337        let source_data = self.source_data.as_ref().ok_or_else(|| {
338            DomainError::InvalidStreamState("No source data available for patches".to_string())
339        })?;
340
341        let prioritized = self.extract_patches(source_data, priority_threshold)?;
342        let frames = self.batch_patches_into_frames(prioritized, max_frames)?;
343
344        for frame in &frames {
345            self.record_frame_created(frame);
346        }
347
348        Ok(frames)
349    }
350
351    /// Create completion frame
352    pub fn create_completion_frame(&mut self, checksum: Option<String>) -> DomainResult<Frame> {
353        if !matches!(self.state, StreamState::Streaming) {
354            return Err(DomainError::InvalidStreamState(
355                "Stream must be in streaming state to create frames".to_string(),
356            ));
357        }
358
359        let frame = Frame::complete(self.id, self.next_sequence, checksum);
360        self.record_frame_created(&frame);
361
362        Ok(frame)
363    }
364
365    /// Check if stream is active
366    pub fn is_active(&self) -> bool {
367        matches!(self.state, StreamState::Preparing | StreamState::Streaming)
368    }
369
370    /// Check if stream is finished
371    pub fn is_finished(&self) -> bool {
372        matches!(
373            self.state,
374            StreamState::Completed | StreamState::Failed | StreamState::Cancelled
375        )
376    }
377
378    /// Get stream duration
379    pub fn duration(&self) -> Option<chrono::Duration> {
380        self.completed_at.map(|end| end - self.created_at)
381    }
382
383    /// Calculate stream progress (0.0 to 1.0)
384    pub fn progress(&self) -> f64 {
385        match self.state {
386            StreamState::Preparing => 0.0,
387            StreamState::Streaming => {
388                // Estimate based on frames sent vs expected
389                if self.stats.total_frames == 0 {
390                    0.1 // Just started
391                } else {
392                    // Simple heuristic: more frames = more progress
393                    (self.stats.total_frames as f64 / 100.0).min(0.9)
394                }
395            }
396            StreamState::Completed => 1.0,
397            StreamState::Failed | StreamState::Cancelled => {
398                // Partial progress before failure/cancellation
399                (self.stats.total_frames as f64 / 100.0).min(0.99)
400            }
401        }
402    }
403
404    /// Update configuration
405    pub fn update_config(&mut self, config: StreamConfig) -> DomainResult<()> {
406        if !self.is_active() {
407            return Err(DomainError::InvalidStreamState(
408                "Cannot update config of inactive stream".to_string(),
409            ));
410        }
411
412        self.config = config;
413        self.update_timestamp();
414        Ok(())
415    }
416
417    /// Private helper: Update timestamp
418    fn update_timestamp(&mut self) {
419        self.updated_at = Utc::now();
420    }
421
422    /// Private helper: Record frame creation for stats
423    fn record_frame_created(&mut self, frame: &Frame) {
424        self.next_sequence += 1;
425        self.stats.total_frames += 1;
426
427        let frame_size = frame.estimated_size() as u64;
428        self.stats.total_bytes += frame_size;
429
430        match frame.frame_type() {
431            crate::entities::frame::FrameType::Skeleton => {
432                self.stats.skeleton_frames += 1;
433                self.stats.critical_bytes += frame_size;
434            }
435            crate::entities::frame::FrameType::Patch => {
436                self.stats.patch_frames += 1;
437                if frame.is_critical() {
438                    self.stats.critical_bytes += frame_size;
439                } else if frame.is_high_priority() {
440                    self.stats.high_priority_bytes += frame_size;
441                }
442            }
443            crate::entities::frame::FrameType::Complete => {
444                self.stats.complete_frames += 1;
445                self.stats.critical_bytes += frame_size;
446            }
447            crate::entities::frame::FrameType::Error => {
448                self.stats.error_frames += 1;
449                self.stats.critical_bytes += frame_size;
450            }
451        }
452
453        // Update average frame size
454        self.stats.average_frame_size =
455            self.stats.total_bytes as f64 / self.stats.total_frames as f64;
456
457        self.update_timestamp();
458    }
459
460    /// Private helper: Generate skeleton from source data
461    fn generate_skeleton(&self, data: &JsonData) -> DomainResult<JsonData> {
462        // Simplified skeleton generation - create empty structure
463        match data {
464            JsonData::Object(obj) => {
465                let mut skeleton = HashMap::new();
466                for (key, value) in obj.iter() {
467                    skeleton.insert(
468                        key.clone(),
469                        match value {
470                            JsonData::Array(_) => JsonData::Array(Vec::new()),
471                            JsonData::Object(_) => self.generate_skeleton(value)?,
472                            JsonData::Integer(_) => JsonData::Integer(0),
473                            JsonData::Float(_) => JsonData::Float(0.0),
474                            JsonData::String(_) => JsonData::Null,
475                            JsonData::Bool(_) => JsonData::Bool(false),
476                            JsonData::Null => JsonData::Null,
477                        },
478                    );
479                }
480                Ok(JsonData::Object(skeleton))
481            }
482            JsonData::Array(_) => Ok(JsonData::Array(Vec::new())),
483            _ => Ok(JsonData::Null),
484        }
485    }
486
487    /// Private helper: Extract patches with priority filtering.
488    ///
489    /// Walks `data` recursively, emitting one `Set` patch per leaf-level
490    /// value (primitives and arrays — objects are traversed without emitting
491    /// a patch, since their structure is already conveyed by the skeleton
492    /// frame). Each patch is paired with a computed priority so that
493    /// `batch_patches_into_frames` can group chunks by maximum priority.
494    /// Patches whose priority falls below `threshold` are dropped.
495    fn extract_patches(
496        &self,
497        data: &JsonData,
498        threshold: Priority,
499    ) -> DomainResult<Vec<(crate::entities::frame::FramePatch, Priority)>> {
500        let mut patches = Vec::new();
501        self.collect_patches(data, &JsonPath::root(), threshold, &mut patches)?;
502        // Sort by priority descending so high-priority patches land in earlier
503        // frames within the chunk-based batch layout.
504        patches.sort_by_key(|p| core::cmp::Reverse(p.1));
505        Ok(patches)
506    }
507
508    /// Recursive walker that emits prioritized patches into `out`.
509    fn collect_patches(
510        &self,
511        data: &JsonData,
512        path: &JsonPath,
513        threshold: Priority,
514        out: &mut Vec<(crate::entities::frame::FramePatch, Priority)>,
515    ) -> DomainResult<()> {
516        if let JsonData::Object(map) = data {
517            for (key, value) in map.iter() {
518                // Keys with characters JsonPath cannot encode (`.`, `[`, `]`)
519                // are skipped: a domain-internal walker must not refuse the
520                // entire document because of one weird key.
521                let Ok(child_path) = path.append_key(key) else {
522                    continue;
523                };
524                self.collect_patches(value, &child_path, threshold, out)?;
525            }
526            return Ok(());
527        }
528
529        let priority = self.compute_priority(path, data);
530        if priority >= threshold {
531            let patch = crate::entities::frame::FramePatch::set(path.clone(), data.clone());
532            out.push((patch, priority));
533        }
534        Ok(())
535    }
536
537    /// Compute a priority for a patch by delegating to
538    /// [`crate::services::compute_priority`].
539    ///
540    /// The per-stream `priority_rules` map is mapped onto
541    /// [`PriorityHeuristicConfig::overrides`] so that user-provided rules keep
542    /// winning over the shared heuristic. This is the single entry point used
543    /// by both the HTTP transport (via `extract_patches`) and the WebAssembly
544    /// bindings; see #242 for the divergence this resolves.
545    fn compute_priority(&self, path: &JsonPath, value: &JsonData) -> Priority {
546        let mut cfg = crate::services::PriorityHeuristicConfig::default();
547        if !self.config.priority_rules.is_empty() {
548            cfg.overrides = self.config.priority_rules.clone();
549        }
550        crate::services::compute_priority(&cfg, path, value)
551    }
552
553    /// Private helper: Batch patches into frames.
554    ///
555    /// Each frame's priority is the maximum priority of the patches in
556    /// its chunk, so per-frame ordering downstream reflects the most
557    /// important content the frame carries.
558    fn batch_patches_into_frames(
559        &mut self,
560        patches: Vec<(crate::entities::frame::FramePatch, Priority)>,
561        max_frames: usize,
562    ) -> DomainResult<Vec<Frame>> {
563        if patches.is_empty() || max_frames == 0 {
564            return Ok(Vec::new());
565        }
566
567        let mut frames = Vec::new();
568        let chunk_size = patches.len().div_ceil(max_frames).max(1);
569
570        for chunk in patches.chunks(chunk_size) {
571            let priority = chunk
572                .iter()
573                .map(|(_, p)| *p)
574                .max()
575                .unwrap_or(Priority::MEDIUM);
576
577            let frame_patches: Vec<crate::entities::frame::FramePatch> =
578                chunk.iter().map(|(patch, _)| patch.clone()).collect();
579
580            let frame = Frame::patch(self.id, self.next_sequence, priority, frame_patches)?;
581
582            frames.push(frame);
583        }
584
585        Ok(frames)
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entities::frame::FrameType;

    /// Builds a stream over `payload` with the default configuration; the
    /// stream is still in the `Preparing` state.
    fn stream_for(payload: serde_json::Value) -> Stream {
        Stream::new(SessionId::new(), payload.into(), StreamConfig::default())
    }

    #[test]
    fn test_stream_creation() {
        let session_id = SessionId::new();
        let payload = serde_json::json!({
            "users": [
                {"id": 1, "name": "John"},
                {"id": 2, "name": "Jane"}
            ],
            "total": 2
        });

        let stream = Stream::new(session_id, payload.into(), StreamConfig::default());

        assert_eq!(stream.session_id(), session_id);
        assert_eq!(stream.state(), &StreamState::Preparing);
        assert!(stream.is_active());
        assert!(!stream.is_finished());
        assert_eq!(stream.progress(), 0.0);
    }

    #[test]
    fn test_stream_state_transitions() {
        let mut stream = stream_for(serde_json::json!({}));

        // Preparing -> Streaming
        assert!(stream.start_streaming().is_ok());
        assert_eq!(stream.state(), &StreamState::Streaming);

        // Streaming -> Completed
        assert!(stream.complete().is_ok());
        assert_eq!(stream.state(), &StreamState::Completed);
        assert!(stream.is_finished());
        assert_eq!(stream.progress(), 1.0);
    }

    #[test]
    fn test_invalid_state_transitions() {
        let mut stream = stream_for(serde_json::json!({}));

        // Completing straight from `Preparing` is rejected.
        assert!(stream.complete().is_err());

        // The regular path still works afterwards.
        assert!(stream.start_streaming().is_ok());
        assert!(stream.complete().is_ok());

        // A completed stream cannot be restarted.
        assert!(stream.start_streaming().is_err());
    }

    #[test]
    fn test_frame_creation() {
        let mut stream = stream_for(serde_json::json!({
            "test": "data"
        }));

        // Frames are refused until the stream is streaming.
        assert!(stream.create_skeleton_frame().is_err());

        assert!(stream.start_streaming().is_ok());
        let skeleton = stream
            .create_skeleton_frame()
            .expect("Failed to create skeleton frame in test");

        assert_eq!(skeleton.frame_type(), &FrameType::Skeleton);
        assert_eq!(skeleton.sequence(), 1);
        assert_eq!(stream.stats().skeleton_frames, 1);
    }

    #[test]
    fn test_stream_metadata() {
        let mut stream = stream_for(serde_json::json!({}));

        stream.add_metadata("source".to_string(), "api".to_string());
        stream.add_metadata("version".to_string(), "1.0".to_string());

        let metadata = stream.metadata();
        assert_eq!(metadata.len(), 2);
        assert_eq!(metadata.get("source"), Some(&"api".to_string()));
    }

    #[test]
    fn test_create_patch_frames_emits_frames_for_typical_payload() {
        let mut stream = stream_for(serde_json::json!({
            "id": "abc-123",
            "name": "Alice",
            "items": [1, 2, 3]
        }));

        stream
            .start_streaming()
            .expect("stream must enter streaming state");

        let frames = stream
            .create_patch_frames(Priority::BACKGROUND, 16)
            .expect("frame generation must succeed");

        assert!(
            !frames.is_empty(),
            "extract_patches must produce at least one patch for non-empty source data"
        );

        let highest_priority = frames
            .iter()
            .map(|frame| frame.priority())
            .max()
            .expect("non-empty frames must have a max priority");
        assert!(
            highest_priority >= Priority::CRITICAL,
            "frames carrying the `id` field must surface at critical priority"
        );
    }

    #[test]
    fn test_create_patch_frames_filters_below_threshold() {
        // The heuristic forces `analytics` to BACKGROUND priority, so a
        // CRITICAL threshold leaves nothing to send.
        let mut stream = stream_for(serde_json::json!({
            "analytics": {"clicks": 1, "views": 2}
        }));

        stream.start_streaming().expect("stream starts");
        let frames = stream
            .create_patch_frames(Priority::CRITICAL, 8)
            .expect("frame generation must succeed");

        assert!(
            frames.is_empty(),
            "patches below the priority threshold must be dropped"
        );
    }

    #[test]
    fn test_create_patch_frames_uses_max_priority_per_chunk() {
        // One CRITICAL (`id`), one HIGH (`name`) and one BACKGROUND (`logs`)
        // field.
        let mut stream = stream_for(serde_json::json!({
            "id": "x",
            "name": "y",
            "logs": "z"
        }));

        stream.start_streaming().expect("stream starts");
        // A single frame forces all patches into one chunk, which lets the
        // test observe the max-priority aggregation.
        let frames = stream
            .create_patch_frames(Priority::BACKGROUND, 1)
            .expect("frame generation must succeed");

        assert_eq!(frames.len(), 1, "max_frames=1 must yield a single frame");
        assert_eq!(
            frames[0].priority(),
            Priority::CRITICAL,
            "frame priority must reflect the highest-priority patch in the chunk"
        );
    }
}