rtmp_rs/registry/
mod.rs

1//! Stream registry for pub/sub routing
2//!
3//! The registry manages active streams and routes media from publishers to subscribers.
4//! It uses `tokio::sync::broadcast` for efficient zero-copy fan-out to multiple subscribers.
5//!
6//! # Architecture
7//!
8//! ```text
9//!                          Arc<StreamRegistry>
10//!                     ┌─────────────────────────┐
11//!                     │ streams: HashMap<Key,   │
12//!                     │   StreamEntry {         │
13//!                     │     gop_buffer,         │
14//!                     │     tx: broadcast::Tx,  │
15//!                     │   }                     │
16//!                     │ >                       │
17//!                     └───────────┬─────────────┘
18//!                                 │
19//!         ┌───────────────────────┼───────────────────────┐
20//!         │                       │                       │
21//!         ▼                       ▼                       ▼
22//!    [Publisher]            [Subscriber]            [Subscriber]
23//!    handle_video()         frame_rx.recv()         frame_rx.recv()
24//!         │                       │                       │
25//!         └──► registry.broadcast()──► send_video() ──► TCP
26//! ```
27//!
28//! # Zero-Copy Design
29//!
30//! `bytes::Bytes` uses reference counting, so all subscribers share the same
31//! memory allocation. The broadcast channel clones the `BroadcastFrame`, but
32//! the inner `Bytes` data is only reference-counted, not copied.
33
34pub mod config;
35
36pub use config::RegistryConfig;
37
38use std::collections::HashMap;
39use std::sync::atomic::{AtomicU32, Ordering};
40use std::sync::Arc;
41use std::time::Instant;
42
43use bytes::Bytes;
44use tokio::sync::{broadcast, RwLock};
45
46use crate::media::flv::FlvTag;
47use crate::media::gop::GopBuffer;
48
/// Identifies a stream by the pair (application name, stream name).
///
/// The `Display` implementation renders the key as `app/name`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamKey {
    /// Name of the application the stream belongs to (e.g., "live")
    pub app: String,
    /// Name (or key) of the stream itself (e.g., "stream_key_123")
    pub name: String,
}

impl StreamKey {
    /// Build a key from any two values convertible into `String`.
    pub fn new(app: impl Into<String>, name: impl Into<String>) -> Self {
        let app = app.into();
        let name = name.into();
        Self { app, name }
    }
}

impl std::fmt::Display for StreamKey {
    /// Writes the key as `<app>/<name>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { app, name } = self;
        write!(f, "{}/{}", app, name)
    }
}
73
/// Kind of payload carried by a [`BroadcastFrame`]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    /// The frame carries video data
    Video,
    /// The frame carries audio data
    Audio,
    /// The frame carries stream metadata (onMetaData)
    Metadata,
}
84
85/// A frame to be broadcast to subscribers
86///
87/// This is designed to be cheap to clone due to `Bytes` reference counting.
88#[derive(Debug, Clone)]
89pub struct BroadcastFrame {
90    /// Type of frame
91    pub frame_type: FrameType,
92    /// Timestamp in milliseconds
93    pub timestamp: u32,
94    /// Frame data (zero-copy via reference counting)
95    pub data: Bytes,
96    /// Whether this is a keyframe (video only)
97    pub is_keyframe: bool,
98    /// Whether this is a sequence header
99    pub is_header: bool,
100}
101
102impl BroadcastFrame {
103    /// Create a video frame
104    pub fn video(timestamp: u32, data: Bytes, is_keyframe: bool, is_header: bool) -> Self {
105        Self {
106            frame_type: FrameType::Video,
107            timestamp,
108            data,
109            is_keyframe,
110            is_header,
111        }
112    }
113
114    /// Create an audio frame
115    pub fn audio(timestamp: u32, data: Bytes, is_header: bool) -> Self {
116        Self {
117            frame_type: FrameType::Audio,
118            timestamp,
119            data,
120            is_keyframe: false,
121            is_header,
122        }
123    }
124
125    /// Create a metadata frame
126    pub fn metadata(data: Bytes) -> Self {
127        Self {
128            frame_type: FrameType::Metadata,
129            timestamp: 0,
130            data,
131            is_keyframe: false,
132            is_header: false,
133        }
134    }
135
136    /// Convert from FLV tag
137    pub fn from_flv_tag(tag: &FlvTag) -> Self {
138        match tag.tag_type {
139            crate::media::flv::FlvTagType::Video => {
140                let is_keyframe = tag.is_keyframe();
141                let is_header = tag.is_avc_sequence_header();
142                Self::video(tag.timestamp, tag.data.clone(), is_keyframe, is_header)
143            }
144            crate::media::flv::FlvTagType::Audio => {
145                let is_header = tag.is_aac_sequence_header();
146                Self::audio(tag.timestamp, tag.data.clone(), is_header)
147            }
148            crate::media::flv::FlvTagType::Script => Self::metadata(tag.data.clone()),
149        }
150    }
151}
152
/// Lifecycle state of a stream entry in the registry
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// A publisher is currently attached to the stream
    Active,
    /// The publisher has disconnected and the grace period is running
    GracePeriod,
    /// No publisher is attached; the entry is waiting to be cleaned up
    Idle,
}
163
164/// Entry for a single stream in the registry
165pub struct StreamEntry {
166    /// GOP buffer for late-joiner support
167    pub gop_buffer: GopBuffer,
168
169    /// Cached video sequence header for fast subscriber catchup
170    pub video_header: Option<BroadcastFrame>,
171
172    /// Cached audio sequence header for fast subscriber catchup
173    pub audio_header: Option<BroadcastFrame>,
174
175    /// Cached metadata
176    pub metadata: Option<BroadcastFrame>,
177
178    /// Current publisher's session ID (None if no publisher)
179    pub publisher_id: Option<u64>,
180
181    /// Broadcast sender for fan-out to subscribers
182    tx: broadcast::Sender<BroadcastFrame>,
183
184    /// Number of active subscribers
185    pub subscriber_count: AtomicU32,
186
187    /// When the publisher disconnected (for grace period tracking)
188    pub publisher_disconnected_at: Option<Instant>,
189
190    /// When the stream was created
191    pub created_at: Instant,
192
193    /// Current stream state
194    pub state: StreamState,
195}
196
197impl StreamEntry {
198    /// Create a new stream entry
199    fn new(config: &RegistryConfig) -> Self {
200        let (tx, _) = broadcast::channel(config.broadcast_capacity);
201
202        Self {
203            gop_buffer: GopBuffer::with_max_size(config.max_gop_size),
204            video_header: None,
205            audio_header: None,
206            metadata: None,
207            publisher_id: None,
208            tx,
209            subscriber_count: AtomicU32::new(0),
210            publisher_disconnected_at: None,
211            created_at: Instant::now(),
212            state: StreamState::Idle,
213        }
214    }
215
216    /// Get the number of subscribers
217    pub fn subscriber_count(&self) -> u32 {
218        self.subscriber_count.load(Ordering::Relaxed)
219    }
220
221    /// Check if the stream has an active publisher
222    pub fn has_publisher(&self) -> bool {
223        self.publisher_id.is_some()
224    }
225
226    /// Get catchup frames for a new subscriber
227    ///
228    /// Returns sequence headers followed by GOP buffer contents.
229    pub fn get_catchup_frames(&self) -> Vec<BroadcastFrame> {
230        let mut frames = Vec::new();
231
232        // Add metadata first
233        if let Some(ref meta) = self.metadata {
234            frames.push(meta.clone());
235        }
236
237        // Add sequence headers
238        if let Some(ref video) = self.video_header {
239            frames.push(video.clone());
240        }
241        if let Some(ref audio) = self.audio_header {
242            frames.push(audio.clone());
243        }
244
245        // Add GOP buffer contents
246        for tag in self.gop_buffer.get_catchup_data() {
247            frames.push(BroadcastFrame::from_flv_tag(&tag));
248        }
249
250        frames
251    }
252}
253
254/// Error type for registry operations
255#[derive(Debug, Clone)]
256pub enum RegistryError {
257    /// Stream not found
258    StreamNotFound(StreamKey),
259    /// Stream already has a publisher
260    StreamAlreadyPublishing(StreamKey),
261    /// Publisher ID mismatch
262    PublisherMismatch,
263    /// Stream is not active (e.g., in grace period without publisher)
264    StreamNotActive(StreamKey),
265}
266
267impl std::fmt::Display for RegistryError {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        match self {
270            RegistryError::StreamNotFound(key) => write!(f, "Stream not found: {}", key),
271            RegistryError::StreamAlreadyPublishing(key) => {
272                write!(f, "Stream already has a publisher: {}", key)
273            }
274            RegistryError::PublisherMismatch => write!(f, "Publisher ID mismatch"),
275            RegistryError::StreamNotActive(key) => write!(f, "Stream not active: {}", key),
276        }
277    }
278}
279
280impl std::error::Error for RegistryError {}
281
282/// Central registry for all active streams
283///
284/// Thread-safe via `RwLock`. Read-heavy workloads (subscriber count checks,
285/// broadcasting) benefit from the concurrent read access.
286pub struct StreamRegistry {
287    /// Map of stream key to stream entry
288    streams: RwLock<HashMap<StreamKey, Arc<RwLock<StreamEntry>>>>,
289
290    /// Configuration
291    config: RegistryConfig,
292}
293
294impl StreamRegistry {
295    /// Create a new stream registry with default configuration
296    pub fn new() -> Self {
297        Self::with_config(RegistryConfig::default())
298    }
299
300    /// Create a new stream registry with custom configuration
301    pub fn with_config(config: RegistryConfig) -> Self {
302        Self {
303            streams: RwLock::new(HashMap::new()),
304            config,
305        }
306    }
307
308    /// Get the registry configuration
309    pub fn config(&self) -> &RegistryConfig {
310        &self.config
311    }
312
313    /// Register a publisher for a stream
314    ///
315    /// If the stream doesn't exist, it will be created.
316    /// If the stream exists and is in grace period, the publisher reclaims it.
317    /// Returns an error if the stream already has an active publisher.
318    pub async fn register_publisher(
319        &self,
320        key: &StreamKey,
321        session_id: u64,
322    ) -> Result<(), RegistryError> {
323        let mut streams = self.streams.write().await;
324
325        if let Some(entry_arc) = streams.get(key) {
326            let mut entry = entry_arc.write().await;
327
328            // Check if stream is available for publishing
329            match entry.state {
330                StreamState::Active if entry.publisher_id.is_some() => {
331                    return Err(RegistryError::StreamAlreadyPublishing(key.clone()));
332                }
333                StreamState::GracePeriod | StreamState::Idle | StreamState::Active => {
334                    // Reclaim or take over the stream
335                    entry.publisher_id = Some(session_id);
336                    entry.publisher_disconnected_at = None;
337                    entry.state = StreamState::Active;
338
339                    tracing::info!(
340                        stream = %key,
341                        session_id = session_id,
342                        subscribers = entry.subscriber_count(),
343                        "Publisher registered (existing stream)"
344                    );
345                }
346            }
347        } else {
348            // Create new stream entry
349            let mut entry = StreamEntry::new(&self.config);
350            entry.publisher_id = Some(session_id);
351            entry.state = StreamState::Active;
352
353            streams.insert(key.clone(), Arc::new(RwLock::new(entry)));
354
355            tracing::info!(
356                stream = %key,
357                session_id = session_id,
358                "Publisher registered (new stream)"
359            );
360        }
361
362        Ok(())
363    }
364
365    /// Unregister a publisher from a stream
366    ///
367    /// The stream enters grace period if there are active subscribers,
368    /// allowing the publisher to reconnect.
369    pub async fn unregister_publisher(&self, key: &StreamKey, session_id: u64) {
370        let streams = self.streams.read().await;
371
372        if let Some(entry_arc) = streams.get(key) {
373            let mut entry = entry_arc.write().await;
374
375            // Verify this is the actual publisher
376            if entry.publisher_id != Some(session_id) {
377                tracing::warn!(
378                    stream = %key,
379                    expected = ?entry.publisher_id,
380                    actual = session_id,
381                    "Publisher unregister mismatch"
382                );
383                return;
384            }
385
386            entry.publisher_id = None;
387            entry.publisher_disconnected_at = Some(Instant::now());
388
389            // If there are subscribers, enter grace period; otherwise go idle
390            if entry.subscriber_count() > 0 {
391                entry.state = StreamState::GracePeriod;
392                tracing::info!(
393                    stream = %key,
394                    session_id = session_id,
395                    subscribers = entry.subscriber_count(),
396                    grace_period_secs = self.config.publisher_grace_period.as_secs(),
397                    "Publisher disconnected, entering grace period"
398                );
399            } else {
400                entry.state = StreamState::Idle;
401                tracing::info!(
402                    stream = %key,
403                    session_id = session_id,
404                    "Publisher disconnected, no subscribers"
405                );
406            }
407        }
408    }
409
410    /// Subscribe to a stream
411    ///
412    /// Returns a broadcast receiver and catchup frames for the subscriber.
413    /// The catchup frames contain sequence headers and recent GOP data.
414    pub async fn subscribe(
415        &self,
416        key: &StreamKey,
417    ) -> Result<(broadcast::Receiver<BroadcastFrame>, Vec<BroadcastFrame>), RegistryError> {
418        let streams = self.streams.read().await;
419
420        let entry_arc = streams
421            .get(key)
422            .ok_or_else(|| RegistryError::StreamNotFound(key.clone()))?;
423
424        let entry = entry_arc.read().await;
425
426        // Allow subscription even during grace period (publisher might reconnect)
427        if entry.state == StreamState::Idle && entry.publisher_id.is_none() {
428            return Err(RegistryError::StreamNotActive(key.clone()));
429        }
430
431        // Get receiver and catchup frames
432        let rx = entry.tx.subscribe();
433        let catchup = entry.get_catchup_frames();
434
435        // Increment subscriber count
436        entry.subscriber_count.fetch_add(1, Ordering::Relaxed);
437
438        tracing::info!(
439            stream = %key,
440            subscribers = entry.subscriber_count(),
441            catchup_frames = catchup.len(),
442            "Subscriber added"
443        );
444
445        Ok((rx, catchup))
446    }
447
448    /// Unsubscribe from a stream
449    pub async fn unsubscribe(&self, key: &StreamKey) {
450        let streams = self.streams.read().await;
451
452        if let Some(entry_arc) = streams.get(key) {
453            let entry = entry_arc.read().await;
454            let prev = entry.subscriber_count.fetch_sub(1, Ordering::Relaxed);
455
456            tracing::debug!(
457                stream = %key,
458                subscribers = prev.saturating_sub(1),
459                "Subscriber removed"
460            );
461        }
462    }
463
464    /// Broadcast a frame to all subscribers of a stream
465    ///
466    /// Also updates the GOP buffer and sequence headers as needed.
467    pub async fn broadcast(&self, key: &StreamKey, frame: BroadcastFrame) {
468        let streams = self.streams.read().await;
469
470        if let Some(entry_arc) = streams.get(key) {
471            let mut entry = entry_arc.write().await;
472
473            // Update cached headers
474            match frame.frame_type {
475                FrameType::Video if frame.is_header => {
476                    entry.video_header = Some(frame.clone());
477                }
478                FrameType::Audio if frame.is_header => {
479                    entry.audio_header = Some(frame.clone());
480                }
481                FrameType::Metadata => {
482                    entry.metadata = Some(frame.clone());
483                }
484                _ => {}
485            }
486
487            // Update GOP buffer for video frames (non-headers)
488            if frame.frame_type == FrameType::Video && !frame.is_header {
489                let tag = FlvTag::video(frame.timestamp, frame.data.clone());
490                entry.gop_buffer.push(tag);
491            }
492
493            // Broadcast to subscribers
494            // Note: send() returns Ok(n) where n is number of receivers, or Err if no receivers
495            let _ = entry.tx.send(frame);
496        }
497    }
498
499    /// Get sequence headers for a stream (video and audio decoder config)
500    ///
501    /// Used when resuming playback after pause to reinitialize decoders.
502    pub async fn get_sequence_headers(&self, key: &StreamKey) -> Vec<BroadcastFrame> {
503        let streams = self.streams.read().await;
504
505        if let Some(entry_arc) = streams.get(key) {
506            let entry = entry_arc.read().await;
507            let mut frames = Vec::with_capacity(2);
508
509            if let Some(ref video) = entry.video_header {
510                frames.push(video.clone());
511            }
512            if let Some(ref audio) = entry.audio_header {
513                frames.push(audio.clone());
514            }
515
516            frames
517        } else {
518            Vec::new()
519        }
520    }
521
522    /// Check if a stream exists and has an active publisher
523    pub async fn has_active_stream(&self, key: &StreamKey) -> bool {
524        let streams = self.streams.read().await;
525
526        if let Some(entry_arc) = streams.get(key) {
527            let entry = entry_arc.read().await;
528            entry.state == StreamState::Active && entry.publisher_id.is_some()
529        } else {
530            false
531        }
532    }
533
534    /// Check if a stream exists (active or in grace period)
535    pub async fn stream_exists(&self, key: &StreamKey) -> bool {
536        let streams = self.streams.read().await;
537
538        if let Some(entry_arc) = streams.get(key) {
539            let entry = entry_arc.read().await;
540            matches!(entry.state, StreamState::Active | StreamState::GracePeriod)
541        } else {
542            false
543        }
544    }
545
546    /// Get stream statistics
547    pub async fn get_stream_stats(&self, key: &StreamKey) -> Option<StreamStats> {
548        let streams = self.streams.read().await;
549
550        if let Some(entry_arc) = streams.get(key) {
551            let entry = entry_arc.read().await;
552            Some(StreamStats {
553                subscriber_count: entry.subscriber_count(),
554                has_publisher: entry.publisher_id.is_some(),
555                state: entry.state,
556                gop_frame_count: entry.gop_buffer.frame_count(),
557                gop_size_bytes: entry.gop_buffer.size(),
558            })
559        } else {
560            None
561        }
562    }
563
564    /// Get total number of streams
565    pub async fn stream_count(&self) -> usize {
566        self.streams.read().await.len()
567    }
568
569    /// Run cleanup task once
570    ///
571    /// Removes streams that have:
572    /// - Been in grace period longer than `publisher_grace_period`
573    /// - Been idle longer than `idle_stream_timeout`
574    pub async fn cleanup(&self) {
575        let mut streams = self.streams.write().await;
576        let now = Instant::now();
577
578        let keys_to_remove: Vec<StreamKey> = streams
579            .iter()
580            .filter_map(|(key, entry_arc)| {
581                // Try to get read lock without blocking
582                if let Ok(entry) = entry_arc.try_read() {
583                    let should_remove = match entry.state {
584                        StreamState::GracePeriod => {
585                            if let Some(disconnected_at) = entry.publisher_disconnected_at {
586                                now.duration_since(disconnected_at)
587                                    > self.config.publisher_grace_period
588                            } else {
589                                false
590                            }
591                        }
592                        StreamState::Idle => {
593                            if let Some(disconnected_at) = entry.publisher_disconnected_at {
594                                now.duration_since(disconnected_at)
595                                    > self.config.idle_stream_timeout
596                            } else {
597                                now.duration_since(entry.created_at)
598                                    > self.config.idle_stream_timeout
599                            }
600                        }
601                        StreamState::Active => false,
602                    };
603
604                    if should_remove {
605                        Some(key.clone())
606                    } else {
607                        None
608                    }
609                } else {
610                    None
611                }
612            })
613            .collect();
614
615        for key in keys_to_remove {
616            streams.remove(&key);
617            tracing::info!(stream = %key, "Stream removed by cleanup");
618        }
619    }
620
621    /// Spawn background cleanup task
622    ///
623    /// Returns a handle that can be used to abort the task.
624    pub fn spawn_cleanup_task(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
625        let registry = Arc::clone(self);
626        let interval = registry.config.cleanup_interval;
627
628        tokio::spawn(async move {
629            let mut ticker = tokio::time::interval(interval);
630            loop {
631                ticker.tick().await;
632                registry.cleanup().await;
633            }
634        })
635    }
636}
637
638impl Default for StreamRegistry {
639    fn default() -> Self {
640        Self::new()
641    }
642}
643
644/// Statistics for a stream
645#[derive(Debug, Clone)]
646pub struct StreamStats {
647    /// Number of active subscribers
648    pub subscriber_count: u32,
649    /// Whether the stream has an active publisher
650    pub has_publisher: bool,
651    /// Current stream state
652    pub state: StreamState,
653    /// Number of frames in GOP buffer
654    pub gop_frame_count: usize,
655    /// Size of GOP buffer in bytes
656    pub gop_size_bytes: usize,
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Key shared by every test: app "live", stream "test_stream".
    fn test_key() -> StreamKey {
        StreamKey::new("live", "test_stream")
    }

    /// A second publisher is rejected while the first one is still attached.
    #[tokio::test]
    async fn test_register_publisher() {
        let registry = StreamRegistry::new();
        let key = test_key();

        // First session claims the stream
        registry.register_publisher(&key, 1).await.unwrap();
        assert!(registry.has_active_stream(&key).await);

        // Second session must be turned away
        let second = registry.register_publisher(&key, 2).await;
        assert!(matches!(
            second,
            Err(RegistryError::StreamAlreadyPublishing(_))
        ));
    }

    /// A subscriber receives live frames and is no longer counted after unsubscribing.
    #[tokio::test]
    async fn test_subscribe_unsubscribe() {
        let registry = StreamRegistry::new();
        let key = test_key();

        // Subscribing requires a publisher
        registry.register_publisher(&key, 1).await.unwrap();

        // Nothing has been published yet, so there is nothing to catch up on
        let (mut rx, catchup) = registry.subscribe(&key).await.unwrap();
        assert!(catchup.is_empty());

        // Publish one keyframe
        let payload = Bytes::from_static(&[0x17, 0x01]);
        registry
            .broadcast(&key, BroadcastFrame::video(0, payload, true, false))
            .await;

        // ...and read it back through the receiver
        let received = rx.recv().await.unwrap();
        assert_eq!(received.timestamp, 0);
        assert!(received.is_keyframe);

        // Leaving brings the count back to zero
        registry.unsubscribe(&key).await;

        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert_eq!(stats.subscriber_count, 0);
    }

    /// With subscribers attached, a publisher disconnect starts the grace period
    /// and the stream stays subscribable.
    #[tokio::test]
    async fn test_grace_period() {
        let grace = Duration::from_millis(100);
        let registry =
            StreamRegistry::with_config(RegistryConfig::default().publisher_grace_period(grace));
        let key = test_key();

        // One publisher, one subscriber
        registry.register_publisher(&key, 1).await.unwrap();
        let (_rx, _) = registry.subscribe(&key).await.unwrap();

        // Publisher goes away
        registry.unregister_publisher(&key, 1).await;

        // The stream is now in its grace period
        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert_eq!(stats.state, StreamState::GracePeriod);

        // It still counts as existing
        assert!(registry.stream_exists(&key).await);

        // And further subscribers are still accepted
        let late = registry.subscribe(&key).await;
        assert!(late.is_ok());
    }

    /// A new publisher can take over a stream in grace period without losing subscribers.
    #[tokio::test]
    async fn test_publisher_reconnect() {
        let registry = StreamRegistry::new();
        let key = test_key();

        // Original publisher plus one subscriber
        registry.register_publisher(&key, 1).await.unwrap();
        let (_rx, _) = registry.subscribe(&key).await.unwrap();

        // Original publisher disconnects
        registry.unregister_publisher(&key, 1).await;

        // A different session takes the stream over
        registry.register_publisher(&key, 2).await.unwrap();

        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert!(stats.has_publisher);
        assert_eq!(stats.state, StreamState::Active);
        assert_eq!(stats.subscriber_count, 1); // the subscriber survived the handover
    }

    /// A late joiner is handed the sequence headers followed by the buffered keyframe.
    #[tokio::test]
    async fn test_catchup_frames() {
        let registry = StreamRegistry::new();
        let key = test_key();

        registry.register_publisher(&key, 1).await.unwrap();

        // Decoder configuration first
        let video_config = BroadcastFrame::video(0, Bytes::from_static(&[0x17, 0x00]), true, true);
        let audio_config = BroadcastFrame::audio(0, Bytes::from_static(&[0xAF, 0x00]), true);
        registry.broadcast(&key, video_config).await;
        registry.broadcast(&key, audio_config).await;

        // Then one keyframe
        let keyframe = BroadcastFrame::video(33, Bytes::from_static(&[0x17, 0x01]), true, false);
        registry.broadcast(&key, keyframe).await;

        // Subscriber arrives after the fact
        let (_rx, catchup) = registry.subscribe(&key).await.unwrap();

        // Expect: video header, audio header, keyframe
        assert_eq!(catchup.len(), 3);
        assert!(catchup[0].is_header); // video header
        assert!(catchup[1].is_header); // audio header
        assert!(catchup[2].is_keyframe); // keyframe
    }
}
783}