1pub mod config;
35
36pub use config::RegistryConfig;
37
38use std::collections::HashMap;
39use std::sync::atomic::{AtomicU32, Ordering};
40use std::sync::Arc;
41use std::time::Instant;
42
43use bytes::Bytes;
44use tokio::sync::{broadcast, RwLock};
45
46use crate::media::flv::FlvTag;
47use crate::media::gop::GopBuffer;
48
/// Identifies a stream by application name plus stream name.
///
/// The `Display` form is `app/name`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamKey {
    /// Application component (printed before the `/`).
    pub app: String,
    /// Stream-name component (printed after the `/`).
    pub name: String,
}

impl StreamKey {
    /// Builds a key from any two values convertible into `String`.
    pub fn new(app: impl Into<String>, name: impl Into<String>) -> Self {
        let app = app.into();
        let name = name.into();
        Self { app, name }
    }
}

impl std::fmt::Display for StreamKey {
    /// Writes the key as `app/name`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { app, name } = self;
        write!(f, "{}/{}", app, name)
    }
}
73
/// Kind of payload carried by a [`BroadcastFrame`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    /// Video payload.
    Video,
    /// Audio payload.
    Audio,
    /// Metadata payload; `BroadcastFrame::from_flv_tag` maps FLV script tags
    /// to this variant.
    Metadata,
}
84
/// A single media frame as it is sent through a stream's broadcast channel.
#[derive(Debug, Clone)]
pub struct BroadcastFrame {
    /// Whether the payload is video, audio or metadata.
    pub frame_type: FrameType,
    /// Frame timestamp; frames built with `BroadcastFrame::metadata` use 0.
    /// NOTE(review): presumably milliseconds, as in FLV — confirm.
    pub timestamp: u32,
    /// Raw payload bytes.
    pub data: Bytes,
    /// Keyframe flag; the audio and metadata constructors always set it to `false`.
    pub is_keyframe: bool,
    /// Sequence-header flag; the metadata constructor always sets it to `false`.
    pub is_header: bool,
}
101
102impl BroadcastFrame {
103 pub fn video(timestamp: u32, data: Bytes, is_keyframe: bool, is_header: bool) -> Self {
105 Self {
106 frame_type: FrameType::Video,
107 timestamp,
108 data,
109 is_keyframe,
110 is_header,
111 }
112 }
113
114 pub fn audio(timestamp: u32, data: Bytes, is_header: bool) -> Self {
116 Self {
117 frame_type: FrameType::Audio,
118 timestamp,
119 data,
120 is_keyframe: false,
121 is_header,
122 }
123 }
124
125 pub fn metadata(data: Bytes) -> Self {
127 Self {
128 frame_type: FrameType::Metadata,
129 timestamp: 0,
130 data,
131 is_keyframe: false,
132 is_header: false,
133 }
134 }
135
136 pub fn from_flv_tag(tag: &FlvTag) -> Self {
138 match tag.tag_type {
139 crate::media::flv::FlvTagType::Video => {
140 let is_keyframe = tag.is_keyframe();
141 let is_header = tag.is_avc_sequence_header();
142 Self::video(tag.timestamp, tag.data.clone(), is_keyframe, is_header)
143 }
144 crate::media::flv::FlvTagType::Audio => {
145 let is_header = tag.is_aac_sequence_header();
146 Self::audio(tag.timestamp, tag.data.clone(), is_header)
147 }
148 crate::media::flv::FlvTagType::Script => Self::metadata(tag.data.clone()),
149 }
150 }
151}
152
/// Lifecycle state of a stream entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Set when a publisher is registered on the stream.
    Active,
    /// Set when the publisher is unregistered while the subscriber count is
    /// greater than zero.
    GracePeriod,
    /// Initial state of a new entry; also set when the publisher is
    /// unregistered and there are no subscribers.
    Idle,
}
163
/// Per-stream state held by the registry: cached frames, the GOP buffer, the
/// broadcast sender, and publisher/subscriber bookkeeping.
pub struct StreamEntry {
    /// Buffer that receives every non-header video frame passed to
    /// `StreamRegistry::broadcast`; its contents are replayed to new subscribers.
    pub gop_buffer: GopBuffer,

    /// Most recent video frame flagged `is_header`, if one has been broadcast.
    pub video_header: Option<BroadcastFrame>,

    /// Most recent audio frame flagged `is_header`, if one has been broadcast.
    pub audio_header: Option<BroadcastFrame>,

    /// Most recent metadata frame, if one has been broadcast.
    pub metadata: Option<BroadcastFrame>,

    /// Session id of the current publisher; `None` while nobody is publishing.
    pub publisher_id: Option<u64>,

    /// Sending half of the broadcast channel; subscribers get receivers from it.
    tx: broadcast::Sender<BroadcastFrame>,

    /// Subscriber counter, incremented by `StreamRegistry::subscribe` and
    /// decremented by `StreamRegistry::unsubscribe`.
    pub subscriber_count: AtomicU32,

    /// When the publisher was last unregistered; cleared again when a
    /// publisher registers on an existing entry.
    pub publisher_disconnected_at: Option<Instant>,

    /// When this entry was created.
    pub created_at: Instant,

    /// Current lifecycle state.
    pub state: StreamState,
}
196
197impl StreamEntry {
198 fn new(config: &RegistryConfig) -> Self {
200 let (tx, _) = broadcast::channel(config.broadcast_capacity);
201
202 Self {
203 gop_buffer: GopBuffer::with_max_size(config.max_gop_size),
204 video_header: None,
205 audio_header: None,
206 metadata: None,
207 publisher_id: None,
208 tx,
209 subscriber_count: AtomicU32::new(0),
210 publisher_disconnected_at: None,
211 created_at: Instant::now(),
212 state: StreamState::Idle,
213 }
214 }
215
216 pub fn subscriber_count(&self) -> u32 {
218 self.subscriber_count.load(Ordering::Relaxed)
219 }
220
221 pub fn has_publisher(&self) -> bool {
223 self.publisher_id.is_some()
224 }
225
226 pub fn get_catchup_frames(&self) -> Vec<BroadcastFrame> {
230 let mut frames = Vec::new();
231
232 if let Some(ref meta) = self.metadata {
234 frames.push(meta.clone());
235 }
236
237 if let Some(ref video) = self.video_header {
239 frames.push(video.clone());
240 }
241 if let Some(ref audio) = self.audio_header {
242 frames.push(audio.clone());
243 }
244
245 for tag in self.gop_buffer.get_catchup_data() {
247 frames.push(BroadcastFrame::from_flv_tag(&tag));
248 }
249
250 frames
251 }
252}
253
254#[derive(Debug, Clone)]
256pub enum RegistryError {
257 StreamNotFound(StreamKey),
259 StreamAlreadyPublishing(StreamKey),
261 PublisherMismatch,
263 StreamNotActive(StreamKey),
265}
266
267impl std::fmt::Display for RegistryError {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 match self {
270 RegistryError::StreamNotFound(key) => write!(f, "Stream not found: {}", key),
271 RegistryError::StreamAlreadyPublishing(key) => {
272 write!(f, "Stream already has a publisher: {}", key)
273 }
274 RegistryError::PublisherMismatch => write!(f, "Publisher ID mismatch"),
275 RegistryError::StreamNotActive(key) => write!(f, "Stream not active: {}", key),
276 }
277 }
278}
279
280impl std::error::Error for RegistryError {}
281
/// Registry of streams keyed by [`StreamKey`], shared between publishers and
/// subscribers.
pub struct StreamRegistry {
    /// All known streams. The outer lock guards the map itself; every entry
    /// additionally has its own lock.
    streams: RwLock<HashMap<StreamKey, Arc<RwLock<StreamEntry>>>>,

    /// Settings used when creating entries and when deciding what `cleanup`
    /// removes.
    config: RegistryConfig,
}
293
294impl StreamRegistry {
295 pub fn new() -> Self {
297 Self::with_config(RegistryConfig::default())
298 }
299
300 pub fn with_config(config: RegistryConfig) -> Self {
302 Self {
303 streams: RwLock::new(HashMap::new()),
304 config,
305 }
306 }
307
/// Returns the configuration this registry was created with.
pub fn config(&self) -> &RegistryConfig {
    &self.config
}
312
313 pub async fn register_publisher(
319 &self,
320 key: &StreamKey,
321 session_id: u64,
322 ) -> Result<(), RegistryError> {
323 let mut streams = self.streams.write().await;
324
325 if let Some(entry_arc) = streams.get(key) {
326 let mut entry = entry_arc.write().await;
327
328 match entry.state {
330 StreamState::Active if entry.publisher_id.is_some() => {
331 return Err(RegistryError::StreamAlreadyPublishing(key.clone()));
332 }
333 StreamState::GracePeriod | StreamState::Idle | StreamState::Active => {
334 entry.publisher_id = Some(session_id);
336 entry.publisher_disconnected_at = None;
337 entry.state = StreamState::Active;
338
339 tracing::info!(
340 stream = %key,
341 session_id = session_id,
342 subscribers = entry.subscriber_count(),
343 "Publisher registered (existing stream)"
344 );
345 }
346 }
347 } else {
348 let mut entry = StreamEntry::new(&self.config);
350 entry.publisher_id = Some(session_id);
351 entry.state = StreamState::Active;
352
353 streams.insert(key.clone(), Arc::new(RwLock::new(entry)));
354
355 tracing::info!(
356 stream = %key,
357 session_id = session_id,
358 "Publisher registered (new stream)"
359 );
360 }
361
362 Ok(())
363 }
364
365 pub async fn unregister_publisher(&self, key: &StreamKey, session_id: u64) {
370 let streams = self.streams.read().await;
371
372 if let Some(entry_arc) = streams.get(key) {
373 let mut entry = entry_arc.write().await;
374
375 if entry.publisher_id != Some(session_id) {
377 tracing::warn!(
378 stream = %key,
379 expected = ?entry.publisher_id,
380 actual = session_id,
381 "Publisher unregister mismatch"
382 );
383 return;
384 }
385
386 entry.publisher_id = None;
387 entry.publisher_disconnected_at = Some(Instant::now());
388
389 if entry.subscriber_count() > 0 {
391 entry.state = StreamState::GracePeriod;
392 tracing::info!(
393 stream = %key,
394 session_id = session_id,
395 subscribers = entry.subscriber_count(),
396 grace_period_secs = self.config.publisher_grace_period.as_secs(),
397 "Publisher disconnected, entering grace period"
398 );
399 } else {
400 entry.state = StreamState::Idle;
401 tracing::info!(
402 stream = %key,
403 session_id = session_id,
404 "Publisher disconnected, no subscribers"
405 );
406 }
407 }
408 }
409
410 pub async fn subscribe(
415 &self,
416 key: &StreamKey,
417 ) -> Result<(broadcast::Receiver<BroadcastFrame>, Vec<BroadcastFrame>), RegistryError> {
418 let streams = self.streams.read().await;
419
420 let entry_arc = streams
421 .get(key)
422 .ok_or_else(|| RegistryError::StreamNotFound(key.clone()))?;
423
424 let entry = entry_arc.read().await;
425
426 if entry.state == StreamState::Idle && entry.publisher_id.is_none() {
428 return Err(RegistryError::StreamNotActive(key.clone()));
429 }
430
431 let rx = entry.tx.subscribe();
433 let catchup = entry.get_catchup_frames();
434
435 entry.subscriber_count.fetch_add(1, Ordering::Relaxed);
437
438 tracing::info!(
439 stream = %key,
440 subscribers = entry.subscriber_count(),
441 catchup_frames = catchup.len(),
442 "Subscriber added"
443 );
444
445 Ok((rx, catchup))
446 }
447
448 pub async fn unsubscribe(&self, key: &StreamKey) {
450 let streams = self.streams.read().await;
451
452 if let Some(entry_arc) = streams.get(key) {
453 let entry = entry_arc.read().await;
454 let prev = entry.subscriber_count.fetch_sub(1, Ordering::Relaxed);
455
456 tracing::debug!(
457 stream = %key,
458 subscribers = prev.saturating_sub(1),
459 "Subscriber removed"
460 );
461 }
462 }
463
464 pub async fn broadcast(&self, key: &StreamKey, frame: BroadcastFrame) {
468 let streams = self.streams.read().await;
469
470 if let Some(entry_arc) = streams.get(key) {
471 let mut entry = entry_arc.write().await;
472
473 match frame.frame_type {
475 FrameType::Video if frame.is_header => {
476 entry.video_header = Some(frame.clone());
477 }
478 FrameType::Audio if frame.is_header => {
479 entry.audio_header = Some(frame.clone());
480 }
481 FrameType::Metadata => {
482 entry.metadata = Some(frame.clone());
483 }
484 _ => {}
485 }
486
487 if frame.frame_type == FrameType::Video && !frame.is_header {
489 let tag = FlvTag::video(frame.timestamp, frame.data.clone());
490 entry.gop_buffer.push(tag);
491 }
492
493 let _ = entry.tx.send(frame);
496 }
497 }
498
499 pub async fn get_sequence_headers(&self, key: &StreamKey) -> Vec<BroadcastFrame> {
503 let streams = self.streams.read().await;
504
505 if let Some(entry_arc) = streams.get(key) {
506 let entry = entry_arc.read().await;
507 let mut frames = Vec::with_capacity(2);
508
509 if let Some(ref video) = entry.video_header {
510 frames.push(video.clone());
511 }
512 if let Some(ref audio) = entry.audio_header {
513 frames.push(audio.clone());
514 }
515
516 frames
517 } else {
518 Vec::new()
519 }
520 }
521
522 pub async fn has_active_stream(&self, key: &StreamKey) -> bool {
524 let streams = self.streams.read().await;
525
526 if let Some(entry_arc) = streams.get(key) {
527 let entry = entry_arc.read().await;
528 entry.state == StreamState::Active && entry.publisher_id.is_some()
529 } else {
530 false
531 }
532 }
533
534 pub async fn stream_exists(&self, key: &StreamKey) -> bool {
536 let streams = self.streams.read().await;
537
538 if let Some(entry_arc) = streams.get(key) {
539 let entry = entry_arc.read().await;
540 matches!(entry.state, StreamState::Active | StreamState::GracePeriod)
541 } else {
542 false
543 }
544 }
545
546 pub async fn get_stream_stats(&self, key: &StreamKey) -> Option<StreamStats> {
548 let streams = self.streams.read().await;
549
550 if let Some(entry_arc) = streams.get(key) {
551 let entry = entry_arc.read().await;
552 Some(StreamStats {
553 subscriber_count: entry.subscriber_count(),
554 has_publisher: entry.publisher_id.is_some(),
555 state: entry.state,
556 gop_frame_count: entry.gop_buffer.frame_count(),
557 gop_size_bytes: entry.gop_buffer.size(),
558 })
559 } else {
560 None
561 }
562 }
563
564 pub async fn stream_count(&self) -> usize {
566 self.streams.read().await.len()
567 }
568
569 pub async fn cleanup(&self) {
575 let mut streams = self.streams.write().await;
576 let now = Instant::now();
577
578 let keys_to_remove: Vec<StreamKey> = streams
579 .iter()
580 .filter_map(|(key, entry_arc)| {
581 if let Ok(entry) = entry_arc.try_read() {
583 let should_remove = match entry.state {
584 StreamState::GracePeriod => {
585 if let Some(disconnected_at) = entry.publisher_disconnected_at {
586 now.duration_since(disconnected_at)
587 > self.config.publisher_grace_period
588 } else {
589 false
590 }
591 }
592 StreamState::Idle => {
593 if let Some(disconnected_at) = entry.publisher_disconnected_at {
594 now.duration_since(disconnected_at)
595 > self.config.idle_stream_timeout
596 } else {
597 now.duration_since(entry.created_at)
598 > self.config.idle_stream_timeout
599 }
600 }
601 StreamState::Active => false,
602 };
603
604 if should_remove {
605 Some(key.clone())
606 } else {
607 None
608 }
609 } else {
610 None
611 }
612 })
613 .collect();
614
615 for key in keys_to_remove {
616 streams.remove(&key);
617 tracing::info!(stream = %key, "Stream removed by cleanup");
618 }
619 }
620
621 pub fn spawn_cleanup_task(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
625 let registry = Arc::clone(self);
626 let interval = registry.config.cleanup_interval;
627
628 tokio::spawn(async move {
629 let mut ticker = tokio::time::interval(interval);
630 loop {
631 ticker.tick().await;
632 registry.cleanup().await;
633 }
634 })
635 }
636}
637
638impl Default for StreamRegistry {
639 fn default() -> Self {
640 Self::new()
641 }
642}
643
/// Point-in-time snapshot of one stream, as returned by
/// `StreamRegistry::get_stream_stats`.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Subscriber count at the time of the snapshot.
    pub subscriber_count: u32,
    /// Whether a publisher id was recorded for the stream.
    pub has_publisher: bool,
    /// Lifecycle state of the stream.
    pub state: StreamState,
    /// Frame count reported by the stream's GOP buffer.
    pub gop_frame_count: usize,
    /// Size reported by the stream's GOP buffer.
    /// NOTE(review): presumably bytes, going by the field name — confirm against `GopBuffer::size`.
    pub gop_size_bytes: usize,
}
658
#[cfg(test)]
mod tests {
    use super::*;

    // A second publisher on an active stream must be rejected.
    #[tokio::test]
    async fn test_register_publisher() {
        let registry = StreamRegistry::new();
        let key = StreamKey::new("live", "test_stream");

        // First publisher creates the stream and makes it active.
        registry.register_publisher(&key, 1).await.unwrap();
        assert!(registry.has_active_stream(&key).await);

        // Another session cannot take over while the first is still publishing.
        let result = registry.register_publisher(&key, 2).await;
        assert!(matches!(
            result,
            Err(RegistryError::StreamAlreadyPublishing(_))
        ));
    }

    // A subscriber receives broadcast frames, and unsubscribing brings the
    // subscriber count back to zero.
    #[tokio::test]
    async fn test_subscribe_unsubscribe() {
        let registry = StreamRegistry::new();
        let key = StreamKey::new("live", "test_stream");

        registry.register_publisher(&key, 1).await.unwrap();

        // Nothing has been broadcast yet, so there is nothing to catch up on.
        let (mut rx, catchup) = registry.subscribe(&key).await.unwrap();
        assert!(catchup.is_empty());
        let frame = BroadcastFrame::video(0, Bytes::from_static(&[0x17, 0x01]), true, false);
        registry.broadcast(&key, frame.clone()).await;

        // The frame arrives unchanged on the receiver.
        let received = rx.recv().await.unwrap();
        assert_eq!(received.timestamp, 0);
        assert!(received.is_keyframe);

        registry.unsubscribe(&key).await;

        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert_eq!(stats.subscriber_count, 0);
    }

    // When the publisher leaves while subscribers remain, the stream enters
    // the grace period and stays subscribable.
    #[tokio::test]
    async fn test_grace_period() {
        let config =
            RegistryConfig::default().publisher_grace_period(std::time::Duration::from_millis(100));
        let registry = StreamRegistry::with_config(config);
        let key = StreamKey::new("live", "test_stream");

        registry.register_publisher(&key, 1).await.unwrap();
        // Keep the receiver alive so there is one subscriber on the stream.
        let (_rx, _) = registry.subscribe(&key).await.unwrap();

        registry.unregister_publisher(&key, 1).await;

        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert_eq!(stats.state, StreamState::GracePeriod);

        assert!(registry.stream_exists(&key).await);

        // New subscribers are still accepted during the grace period.
        let result = registry.subscribe(&key).await;
        assert!(result.is_ok());
    }

    // A new publisher can take over after the old one disconnects, and the
    // existing subscriber is preserved.
    #[tokio::test]
    async fn test_publisher_reconnect() {
        let registry = StreamRegistry::new();
        let key = StreamKey::new("live", "test_stream");

        registry.register_publisher(&key, 1).await.unwrap();

        let (_rx, _) = registry.subscribe(&key).await.unwrap();

        registry.unregister_publisher(&key, 1).await;

        // A different session id registers on the same key.
        registry.register_publisher(&key, 2).await.unwrap();

        let stats = registry.get_stream_stats(&key).await.unwrap();
        assert!(stats.has_publisher);
        assert_eq!(stats.state, StreamState::Active);
        assert_eq!(stats.subscriber_count, 1);
    }

    // A late subscriber receives the cached headers followed by the buffered
    // keyframe.
    #[tokio::test]
    async fn test_catchup_frames() {
        let registry = StreamRegistry::new();
        let key = StreamKey::new("live", "test_stream");

        registry.register_publisher(&key, 1).await.unwrap();

        // Header frames are cached on the entry rather than put in the GOP buffer.
        let video_header = BroadcastFrame::video(0, Bytes::from_static(&[0x17, 0x00]), true, true);
        let audio_header = BroadcastFrame::audio(0, Bytes::from_static(&[0xAF, 0x00]), true);
        registry.broadcast(&key, video_header).await;
        registry.broadcast(&key, audio_header).await;

        // A non-header video frame goes into the GOP buffer.
        let keyframe = BroadcastFrame::video(33, Bytes::from_static(&[0x17, 0x01]), true, false);
        registry.broadcast(&key, keyframe).await;

        let (_rx, catchup) = registry.subscribe(&key).await.unwrap();

        // No metadata was broadcast, so catch-up is: video header, audio header, keyframe.
        assert_eq!(catchup.len(), 3);
        assert!(catchup[0].is_header);
        assert!(catchup[1].is_header);
        assert!(catchup[2].is_keyframe);
    }
}