rtmp_rs/session/
stream.rs

1//! Per-stream state management
2//!
3//! Each RTMP message stream (identified by stream ID) has its own state,
4//! including publish/play mode, stream key, and media state.
5
6use std::time::Instant;
7
8use crate::media::gop::GopBuffer;
9
10/// Stream mode (publishing or playing)
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum StreamMode {
13    /// Stream is idle (created but not publishing/playing)
14    Idle,
15    /// Stream is publishing (receiving media)
16    Publishing,
17    /// Stream is playing (sending media)
18    Playing,
19}
20
21/// Per-stream state
22#[derive(Debug)]
23pub struct StreamState {
24    /// Message stream ID
25    pub id: u32,
26
27    /// Current mode
28    pub mode: StreamMode,
29
30    /// Stream key/name (for publish/play)
31    pub stream_key: Option<String>,
32
33    /// Publish type ("live", "record", "append")
34    pub publish_type: Option<String>,
35
36    /// Time when stream became active
37    pub started_at: Option<Instant>,
38
39    /// Whether we've received video sequence header
40    pub has_video_header: bool,
41
42    /// Whether we've received audio sequence header
43    pub has_audio_header: bool,
44
45    /// Whether we've received metadata
46    pub has_metadata: bool,
47
48    /// Last video timestamp
49    pub last_video_ts: u32,
50
51    /// Last audio timestamp
52    pub last_audio_ts: u32,
53
54    /// Video frames received
55    pub video_frames: u64,
56
57    /// Audio frames received
58    pub audio_frames: u64,
59
60    /// Keyframes received
61    pub keyframes: u64,
62
63    /// Total bytes received on this stream
64    pub bytes_received: u64,
65
66    /// GOP buffer for late-joiner support
67    pub gop_buffer: GopBuffer,
68}
69
70impl StreamState {
71    /// Create a new stream state
72    pub fn new(id: u32) -> Self {
73        Self {
74            id,
75            mode: StreamMode::Idle,
76            stream_key: None,
77            publish_type: None,
78            started_at: None,
79            has_video_header: false,
80            has_audio_header: false,
81            has_metadata: false,
82            last_video_ts: 0,
83            last_audio_ts: 0,
84            video_frames: 0,
85            audio_frames: 0,
86            keyframes: 0,
87            bytes_received: 0,
88            gop_buffer: GopBuffer::new(),
89        }
90    }
91
92    /// Start publishing on this stream
93    pub fn start_publish(&mut self, stream_key: String, publish_type: String) {
94        self.mode = StreamMode::Publishing;
95        self.stream_key = Some(stream_key);
96        self.publish_type = Some(publish_type);
97        self.started_at = Some(Instant::now());
98    }
99
100    /// Start playing on this stream
101    pub fn start_play(&mut self, stream_name: String) {
102        self.mode = StreamMode::Playing;
103        self.stream_key = Some(stream_name);
104        self.started_at = Some(Instant::now());
105    }
106
107    /// Stop the stream
108    pub fn stop(&mut self) {
109        self.mode = StreamMode::Idle;
110    }
111
112    /// Check if stream is publishing
113    pub fn is_publishing(&self) -> bool {
114        self.mode == StreamMode::Publishing
115    }
116
117    /// Check if stream is playing
118    pub fn is_playing(&self) -> bool {
119        self.mode == StreamMode::Playing
120    }
121
122    /// Check if stream is ready (has required headers)
123    pub fn is_ready(&self) -> bool {
124        // A stream is ready if it has at least video or audio header
125        self.has_video_header || self.has_audio_header
126    }
127
128    /// Get stream duration
129    pub fn duration(&self) -> Option<std::time::Duration> {
130        self.started_at.map(|t| t.elapsed())
131    }
132
133    /// Update video state
134    pub fn on_video(&mut self, timestamp: u32, is_keyframe: bool, is_header: bool, size: usize) {
135        self.last_video_ts = timestamp;
136        self.video_frames += 1;
137        self.bytes_received += size as u64;
138
139        if is_header {
140            self.has_video_header = true;
141        }
142        if is_keyframe {
143            self.keyframes += 1;
144        }
145    }
146
147    /// Update audio state
148    pub fn on_audio(&mut self, timestamp: u32, is_header: bool, size: usize) {
149        self.last_audio_ts = timestamp;
150        self.audio_frames += 1;
151        self.bytes_received += size as u64;
152
153        if is_header {
154            self.has_audio_header = true;
155        }
156    }
157
158    /// Mark metadata received
159    pub fn on_metadata(&mut self) {
160        self.has_metadata = true;
161    }
162
163    /// Get bitrate estimate (bits per second)
164    pub fn bitrate(&self) -> Option<u64> {
165        let duration = self.duration()?.as_secs();
166        if duration > 0 {
167            Some((self.bytes_received * 8) / duration)
168        } else {
169            None
170        }
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177
178    #[test]
179    fn test_stream_publish() {
180        let mut stream = StreamState::new(1);
181
182        assert_eq!(stream.mode, StreamMode::Idle);
183        assert!(!stream.is_publishing());
184
185        stream.start_publish("test_key".into(), "live".into());
186
187        assert_eq!(stream.mode, StreamMode::Publishing);
188        assert!(stream.is_publishing());
189        assert_eq!(stream.stream_key, Some("test_key".into()));
190        assert!(stream.started_at.is_some());
191    }
192
193    #[test]
194    fn test_stream_ready() {
195        let mut stream = StreamState::new(1);
196
197        assert!(!stream.is_ready());
198
199        stream.on_video(0, false, true, 100);
200        assert!(stream.is_ready());
201        assert!(stream.has_video_header);
202    }
203}