rtmp_rs/protocol/
quirks.rs

1//! OBS and encoder compatibility quirks
2//!
3//! Different RTMP encoders (OBS, ffmpeg, Wirecast, etc.) have various
4//! non-standard behaviors. This module documents known quirks and provides
5//! helpers for handling them.
6//!
7//! # Known Quirks
8//!
9//! ## OBS Studio
10//! - Sends FCPublish before publish (Twitch/YouTube compatibility)
11//! - May send releaseStream before connect completes
12//! - Sometimes omits object end markers in AMF
13//! - Sends @setDataFrame with onMetaData as nested name
14//!
15//! ## ffmpeg
16//! - Uses different transaction IDs than expected
17//! - May send createStream before connect response
18//! - Duration in metadata may be 0 for live streams
19//!
20//! ## Flash Media Encoder
21//! - Uses legacy AMF0 object encoding
22//! - May send duplicate metadata
23//!
24//! ## Wirecast
25//! - Sends multiple audio/video sequence headers
26//! - May have timestamp discontinuities
27
28use crate::protocol::message::Command;
29
/// Set of independent switches describing which encoder deviations are tolerated.
///
/// [`QuirksConfig::default`] turns every switch on; [`QuirksConfig::strict`]
/// turns every switch off.
#[derive(Debug, Clone)]
pub struct QuirksConfig {
    /// Whether commands that arrive before the handshake completes are accepted.
    pub allow_early_commands: bool,

    /// Whether `FCPublish` / `releaseStream` are accepted ahead of `connect`.
    pub allow_fc_before_connect: bool,

    /// Whether malformed AMF (missing end markers) is accepted.
    pub lenient_amf: bool,

    /// Whether timestamps that move backwards are accepted.
    pub allow_timestamp_regression: bool,

    /// Whether repeated metadata is accepted.
    pub allow_duplicate_metadata: bool,

    /// Whether an empty app name is accepted.
    pub allow_empty_app: bool,

    /// Whether chunks larger than the negotiated size are accepted.
    pub allow_oversized_chunks: bool,
}

impl QuirksConfig {
    /// Builds a configuration in which every switch holds the same value.
    fn all_set_to(enabled: bool) -> Self {
        Self {
            allow_early_commands: enabled,
            allow_fc_before_connect: enabled,
            lenient_amf: enabled,
            allow_timestamp_regression: enabled,
            allow_duplicate_metadata: enabled,
            allow_empty_app: enabled,
            allow_oversized_chunks: enabled,
        }
    }

    /// Strict mode: every tolerance is switched off, so non-conformant
    /// streams are rejected.
    pub fn strict() -> Self {
        Self::all_set_to(false)
    }
}

impl Default for QuirksConfig {
    /// Lenient mode: every tolerance is switched on for maximum compatibility.
    fn default() -> Self {
        Self::all_set_to(true)
    }
}
84
/// Encoder family identified from a client's `flashVer` string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncoderType {
    /// No classification available. [`EncoderType::from_flash_ver`] never
    /// returns this variant; presumably it is for callers that have not yet
    /// seen a `flashVer` (NOTE(review): confirm against callers).
    Unknown,
    /// OBS Studio.
    Obs,
    /// ffmpeg / libavformat, or a librtmp-based client.
    Ffmpeg,
    /// Telestream Wirecast.
    Wirecast,
    /// Adobe Flash Media (Live) Encoder.
    FlashMediaEncoder,
    /// XSplit.
    Xsplit,
    /// Larix Broadcaster.
    Larix,
    /// A `flashVer` was inspected but matched none of the known signatures.
    Other,
}

impl EncoderType {
    /// Detect encoder from connect command's flashVer
    ///
    /// Matching is a case-insensitive substring search. Specific encoder
    /// signatures are tested first and the FMLE signature last, because
    /// other encoders advertise FMLE compatibility in the form
    /// `FMLE/3.0 (compatible; <real encoder>)`. ffmpeg's default publishing
    /// `flashVer` is exactly that, with its `Lavf…` identifier inside the
    /// parentheses. Testing FMLE first would misclassify those clients as
    /// Flash Media Encoder.
    ///
    /// Returns [`EncoderType::Other`] when nothing matches (including for an
    /// empty string).
    pub fn from_flash_ver(flash_ver: &str) -> Self {
        let lower = flash_ver.to_lowercase();
        let has = |needle: &str| lower.contains(needle);

        if has("obs") {
            EncoderType::Obs
        } else if has("wirecast") {
            EncoderType::Wirecast
        } else if has("xsplit") {
            EncoderType::Xsplit
        } else if has("larix") {
            EncoderType::Larix
        } else if has("lavf") || has("librtmp") {
            EncoderType::Ffmpeg
        } else if has("fmle") || has("flash media") {
            // Checked last: an "FMLE/…" prefix alone does not prove the
            // client really is Flash Media Encoder (see doc comment).
            EncoderType::FlashMediaEncoder
        } else {
            EncoderType::Other
        }
    }
}
120
121/// OBS/Twitch command sequence helper
122///
123/// Many streaming platforms expect a specific command sequence:
124/// 1. connect -> _result
125/// 2. releaseStream (optional)
126/// 3. FCPublish
127/// 4. createStream -> _result
128/// 5. publish -> onStatus
129pub struct CommandSequence {
130    state: CommandSequenceState,
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134enum CommandSequenceState {
135    Initial,
136    Connected,
137    StreamCreated,
138    Publishing,
139    Playing,
140}
141
142impl CommandSequence {
143    pub fn new() -> Self {
144        Self {
145            state: CommandSequenceState::Initial,
146        }
147    }
148
149    /// Check if a command is valid in the current state
150    pub fn is_valid_command(&self, cmd: &Command) -> bool {
151        match cmd.name.as_str() {
152            "connect" => self.state == CommandSequenceState::Initial,
153            "releaseStream" | "FCPublish" => {
154                // These can come before or after connect completes (OBS quirk)
155                true
156            }
157            "createStream" => {
158                self.state == CommandSequenceState::Connected
159                    || self.state == CommandSequenceState::Initial // OBS quirk
160            }
161            "publish" => self.state == CommandSequenceState::StreamCreated,
162            "play" => self.state == CommandSequenceState::StreamCreated,
163            "FCUnpublish" | "deleteStream" | "closeStream" => {
164                self.state == CommandSequenceState::Publishing
165                    || self.state == CommandSequenceState::Playing
166            }
167            _ => true, // Allow unknown commands
168        }
169    }
170
171    /// Transition state based on command response
172    pub fn on_command(&mut self, cmd_name: &str) {
173        match cmd_name {
174            "connect" => self.state = CommandSequenceState::Connected,
175            "createStream" => self.state = CommandSequenceState::StreamCreated,
176            "publish" => self.state = CommandSequenceState::Publishing,
177            "play" => self.state = CommandSequenceState::Playing,
178            "FCUnpublish" | "deleteStream" | "closeStream" => {
179                self.state = CommandSequenceState::Connected;
180            }
181            _ => {}
182        }
183    }
184
185    /// Get current state
186    pub fn state(&self) -> &'static str {
187        match self.state {
188            CommandSequenceState::Initial => "initial",
189            CommandSequenceState::Connected => "connected",
190            CommandSequenceState::StreamCreated => "stream_created",
191            CommandSequenceState::Publishing => "publishing",
192            CommandSequenceState::Playing => "playing",
193        }
194    }
195}
196
197impl Default for CommandSequence {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
/// Normalize timestamp to handle regression
///
/// Some encoders have timestamp discontinuities or regressions. When the raw
/// timestamp jumps backwards by more than one second, the normalizer rebases
/// the stream so that output continues just after the last emitted value.
/// Backward steps of one second or less are passed through unchanged, so
/// output is not strictly monotonic across those.
#[derive(Debug, Clone)]
pub struct TimestampNormalizer {
    /// Most recent value returned by `normalize`.
    last_timestamp: u32,
    /// Most recent raw (pre-offset) value passed to `normalize`.
    last_raw: u32,
    /// Amount added, with wrap-around, to every raw timestamp.
    offset: u32,
}

impl TimestampNormalizer {
    /// A backward jump larger than this (in milliseconds) triggers rebasing.
    const REGRESSION_THRESHOLD_MS: u32 = 1000;

    /// Creates a normalizer with no offset applied.
    pub fn new() -> Self {
        Self {
            last_timestamp: 0,
            last_raw: 0,
            offset: 0,
        }
    }

    /// Normalize a timestamp, handling regression
    ///
    /// `timestamp` is the raw value from the encoder. The return value is
    /// `timestamp + offset` (wrapping). The offset is replaced by
    /// `last output + 1` whenever the raw value falls more than
    /// [`Self::REGRESSION_THRESHOLD_MS`] behind the previous raw value.
    ///
    /// Regression is judged on raw values only. Comparing the raw input with
    /// the previous *output* would make every frame after the first rebase
    /// look like another regression and compound the offset.
    pub fn normalize(&mut self, timestamp: u32) -> u32 {
        // Zero when the timestamp did not move backwards.
        let regressed_by = self.last_raw.saturating_sub(timestamp);
        if regressed_by > Self::REGRESSION_THRESHOLD_MS {
            // Wrapping: the last output may already be at u32::MAX.
            self.offset = self.last_timestamp.wrapping_add(1);
        }

        let normalized = timestamp.wrapping_add(self.offset);
        self.last_raw = timestamp;
        self.last_timestamp = normalized;
        normalized
    }

    /// Reset normalizer state
    ///
    /// Afterwards the normalizer behaves like a freshly created one.
    pub fn reset(&mut self) {
        self.last_timestamp = 0;
        self.last_raw = 0;
        self.offset = 0;
    }
}

impl Default for TimestampNormalizer {
    /// Same as [`TimestampNormalizer::new`].
    fn default() -> Self {
        Self::new()
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Each sample `flashVer` must be classified as the paired encoder type.
    #[test]
    fn test_encoder_detection() {
        let samples = [
            ("OBS-Studio/29.1.3", EncoderType::Obs),
            ("FMLE/3.0", EncoderType::FlashMediaEncoder),
            ("Lavf58.76.100", EncoderType::Ffmpeg),
        ];

        for &(flash_ver, expected) in samples.iter() {
            assert_eq!(EncoderType::from_flash_ver(flash_ver), expected);
        }
    }

    /// Feeds a fixed sequence of raw timestamps through one normalizer and
    /// checks every output.
    #[test]
    fn test_timestamp_normalizer() {
        let mut normalizer = TimestampNormalizer::new();

        // (raw input, expected output)
        let steps: [(u32, u32); 5] = [
            (0, 0),
            (1000, 1000),
            (2000, 2000),
            // 500 ms backwards: within the 1 second tolerance, passed through.
            (1500, 1500),
            // 1400 ms backwards from 1500: offset becomes 1500 + 1 = 1501,
            // so the output is 100 + 1501 = 1601.
            (100, 1601),
        ];

        for &(raw, expected) in steps.iter() {
            assert_eq!(normalizer.normalize(raw), expected);
        }
    }
}
283}