rtmp_rs/session/
stream.rs1use std::time::Instant;
7
8use crate::media::gop::GopBuffer;
9
/// Operating mode of a stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMode {
    /// Neither publishing nor playing. A freshly created `StreamState` starts
    /// here, and `StreamState::stop` returns to it.
    Idle,
    /// Entered through `StreamState::start_publish`.
    Publishing,
    /// Entered through `StreamState::start_play`.
    Playing,
}
20
21#[derive(Debug)]
23pub struct StreamState {
24 pub id: u32,
26
27 pub mode: StreamMode,
29
30 pub stream_key: Option<String>,
32
33 pub publish_type: Option<String>,
35
36 pub started_at: Option<Instant>,
38
39 pub has_video_header: bool,
41
42 pub has_audio_header: bool,
44
45 pub has_metadata: bool,
47
48 pub last_video_ts: u32,
50
51 pub last_audio_ts: u32,
53
54 pub video_frames: u64,
56
57 pub audio_frames: u64,
59
60 pub keyframes: u64,
62
63 pub bytes_received: u64,
65
66 pub gop_buffer: GopBuffer,
68}
69
70impl StreamState {
71 pub fn new(id: u32) -> Self {
73 Self {
74 id,
75 mode: StreamMode::Idle,
76 stream_key: None,
77 publish_type: None,
78 started_at: None,
79 has_video_header: false,
80 has_audio_header: false,
81 has_metadata: false,
82 last_video_ts: 0,
83 last_audio_ts: 0,
84 video_frames: 0,
85 audio_frames: 0,
86 keyframes: 0,
87 bytes_received: 0,
88 gop_buffer: GopBuffer::new(),
89 }
90 }
91
92 pub fn start_publish(&mut self, stream_key: String, publish_type: String) {
94 self.mode = StreamMode::Publishing;
95 self.stream_key = Some(stream_key);
96 self.publish_type = Some(publish_type);
97 self.started_at = Some(Instant::now());
98 }
99
100 pub fn start_play(&mut self, stream_name: String) {
102 self.mode = StreamMode::Playing;
103 self.stream_key = Some(stream_name);
104 self.started_at = Some(Instant::now());
105 }
106
107 pub fn stop(&mut self) {
109 self.mode = StreamMode::Idle;
110 }
111
112 pub fn is_publishing(&self) -> bool {
114 self.mode == StreamMode::Publishing
115 }
116
117 pub fn is_playing(&self) -> bool {
119 self.mode == StreamMode::Playing
120 }
121
122 pub fn is_ready(&self) -> bool {
124 self.has_video_header || self.has_audio_header
126 }
127
128 pub fn duration(&self) -> Option<std::time::Duration> {
130 self.started_at.map(|t| t.elapsed())
131 }
132
133 pub fn on_video(&mut self, timestamp: u32, is_keyframe: bool, is_header: bool, size: usize) {
135 self.last_video_ts = timestamp;
136 self.video_frames += 1;
137 self.bytes_received += size as u64;
138
139 if is_header {
140 self.has_video_header = true;
141 }
142 if is_keyframe {
143 self.keyframes += 1;
144 }
145 }
146
147 pub fn on_audio(&mut self, timestamp: u32, is_header: bool, size: usize) {
149 self.last_audio_ts = timestamp;
150 self.audio_frames += 1;
151 self.bytes_received += size as u64;
152
153 if is_header {
154 self.has_audio_header = true;
155 }
156 }
157
158 pub fn on_metadata(&mut self) {
160 self.has_metadata = true;
161 }
162
163 pub fn bitrate(&self) -> Option<u64> {
165 let duration = self.duration()?.as_secs();
166 if duration > 0 {
167 Some((self.bytes_received * 8) / duration)
168 } else {
169 None
170 }
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// A new stream is idle; `start_publish` switches the mode and records
    /// the key and the start time.
    #[test]
    fn test_stream_publish() {
        let mut state = StreamState::new(1);

        assert!(!state.is_publishing());
        assert_eq!(state.mode, StreamMode::Idle);

        state.start_publish(String::from("test_key"), String::from("live"));

        assert!(state.is_publishing());
        assert_eq!(state.mode, StreamMode::Publishing);
        assert_eq!(state.stream_key.as_deref(), Some("test_key"));
        assert!(state.started_at.is_some());
    }

    /// A stream becomes ready as soon as a video header packet is recorded.
    #[test]
    fn test_stream_ready() {
        let mut state = StreamState::new(1);

        assert!(!state.is_ready());

        // timestamp 0, not a keyframe, is a header, 100 bytes
        state.on_video(0, false, true, 100);

        assert!(state.has_video_header);
        assert!(state.is_ready());
    }
}