rtmp_rs/protocol/
quirks.rs

1//! OBS and encoder compatibility quirks
2//!
3//! Different RTMP encoders (OBS, ffmpeg, Wirecast, etc.) have various
4//! non-standard behaviors. This module documents known quirks and provides
5//! helpers for handling them.
6//!
7//! # Known Quirks
8//!
9//! ## OBS Studio
10//! - Sends FCPublish before publish (Twitch/YouTube compatibility)
11//! - May send releaseStream before connect completes
12//! - Sometimes omits object end markers in AMF
13//! - Sends @setDataFrame with onMetaData as nested name
14//!
15//! ## ffmpeg
16//! - Uses different transaction IDs than expected
17//! - May send createStream before connect response
18//! - Duration in metadata may be 0 for live streams
19//!
20//! ## Flash Media Encoder
21//! - Uses legacy AMF0 object encoding
22//! - May send duplicate metadata
23//!
24//! ## Wirecast
25//! - Sends multiple audio/video sequence headers
26//! - May have timestamp discontinuities
27
28use crate::protocol::message::Command;
29
30/// Configuration for handling encoder quirks
31#[derive(Debug, Clone)]
32pub struct QuirksConfig {
33    /// Accept commands before handshake completes
34    pub allow_early_commands: bool,
35
36    /// Accept FCPublish/releaseStream before connect
37    pub allow_fc_before_connect: bool,
38
39    /// Accept malformed AMF (missing end markers)
40    pub lenient_amf: bool,
41
42    /// Accept timestamp regression
43    pub allow_timestamp_regression: bool,
44
45    /// Accept duplicate metadata
46    pub allow_duplicate_metadata: bool,
47
48    /// Accept empty app names
49    pub allow_empty_app: bool,
50
51    /// Accept oversized chunks (larger than negotiated)
52    pub allow_oversized_chunks: bool,
53}
54
55impl Default for QuirksConfig {
56    fn default() -> Self {
57        Self {
58            // Default to lenient for maximum compatibility
59            allow_early_commands: true,
60            allow_fc_before_connect: true,
61            lenient_amf: true,
62            allow_timestamp_regression: true,
63            allow_duplicate_metadata: true,
64            allow_empty_app: true,
65            allow_oversized_chunks: true,
66        }
67    }
68}
69
70impl QuirksConfig {
71    /// Strict mode - reject non-conformant streams
72    pub fn strict() -> Self {
73        Self {
74            allow_early_commands: false,
75            allow_fc_before_connect: false,
76            lenient_amf: false,
77            allow_timestamp_regression: false,
78            allow_duplicate_metadata: false,
79            allow_empty_app: false,
80            allow_oversized_chunks: false,
81        }
82    }
83}
84
85/// Detected encoder type
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub enum EncoderType {
88    Unknown,
89    Obs,
90    Ffmpeg,
91    Wirecast,
92    FlashMediaEncoder,
93    Xsplit,
94    Larix,
95    Other,
96}
97
98impl EncoderType {
99    /// Detect encoder from connect command's flashVer
100    pub fn from_flash_ver(flash_ver: &str) -> Self {
101        let lower = flash_ver.to_lowercase();
102
103        if lower.contains("obs") {
104            EncoderType::Obs
105        } else if lower.contains("fmle") || lower.contains("flash media") {
106            EncoderType::FlashMediaEncoder
107        } else if lower.contains("wirecast") {
108            EncoderType::Wirecast
109        } else if lower.contains("xsplit") {
110            EncoderType::Xsplit
111        } else if lower.contains("larix") {
112            EncoderType::Larix
113        } else if lower.contains("lavf") || lower.contains("librtmp") {
114            EncoderType::Ffmpeg
115        } else {
116            EncoderType::Other
117        }
118    }
119}
120
121/// OBS/Twitch command sequence helper
122///
123/// Many streaming platforms expect a specific command sequence:
124/// 1. connect -> _result
125/// 2. releaseStream (optional)
126/// 3. FCPublish
127/// 4. createStream -> _result
128/// 5. publish -> onStatus
129pub struct CommandSequence {
130    state: CommandSequenceState,
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134enum CommandSequenceState {
135    Initial,
136    Connected,
137    StreamCreated,
138    Publishing,
139    Playing,
140}
141
142impl CommandSequence {
143    pub fn new() -> Self {
144        Self {
145            state: CommandSequenceState::Initial,
146        }
147    }
148
149    /// Check if a command is valid in the current state
150    pub fn is_valid_command(&self, cmd: &Command) -> bool {
151        match cmd.name.as_str() {
152            "connect" => self.state == CommandSequenceState::Initial,
153            "releaseStream" | "FCPublish" => {
154                // These can come before or after connect completes (OBS quirk)
155                true
156            }
157            "createStream" => {
158                self.state == CommandSequenceState::Connected
159                    || self.state == CommandSequenceState::Initial // OBS quirk
160            }
161            "publish" => self.state == CommandSequenceState::StreamCreated,
162            "play" => self.state == CommandSequenceState::StreamCreated,
163            "FCUnpublish" | "deleteStream" | "closeStream" => {
164                self.state == CommandSequenceState::Publishing
165                    || self.state == CommandSequenceState::Playing
166            }
167            _ => true, // Allow unknown commands
168        }
169    }
170
171    /// Transition state based on command response
172    pub fn on_command(&mut self, cmd_name: &str) {
173        match cmd_name {
174            "connect" => self.state = CommandSequenceState::Connected,
175            "createStream" => self.state = CommandSequenceState::StreamCreated,
176            "publish" => self.state = CommandSequenceState::Publishing,
177            "play" => self.state = CommandSequenceState::Playing,
178            "FCUnpublish" | "deleteStream" | "closeStream" => {
179                self.state = CommandSequenceState::Connected;
180            }
181            _ => {}
182        }
183    }
184
185    /// Get current state
186    pub fn state(&self) -> &'static str {
187        match self.state {
188            CommandSequenceState::Initial => "initial",
189            CommandSequenceState::Connected => "connected",
190            CommandSequenceState::StreamCreated => "stream_created",
191            CommandSequenceState::Publishing => "publishing",
192            CommandSequenceState::Playing => "playing",
193        }
194    }
195}
196
197impl Default for CommandSequence {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203/// Normalize timestamp to handle regression
204///
205/// Some encoders have timestamp discontinuities or regressions.
206/// This function adjusts timestamps to be monotonically increasing.
207pub struct TimestampNormalizer {
208    last_timestamp: u32,
209    offset: u32,
210}
211
212impl TimestampNormalizer {
213    pub fn new() -> Self {
214        Self {
215            last_timestamp: 0,
216            offset: 0,
217        }
218    }
219
220    /// Normalize a timestamp, handling regression
221    pub fn normalize(&mut self, timestamp: u32) -> u32 {
222        // Check for significant regression (more than 1 second)
223        if timestamp < self.last_timestamp && self.last_timestamp - timestamp > 1000 {
224            // Timestamp regressed significantly, adjust offset
225            self.offset = self.last_timestamp + 1;
226        }
227
228        let normalized = timestamp.wrapping_add(self.offset);
229        self.last_timestamp = normalized;
230        normalized
231    }
232
233    /// Reset normalizer state
234    pub fn reset(&mut self) {
235        self.last_timestamp = 0;
236        self.offset = 0;
237    }
238}
239
240impl Default for TimestampNormalizer {
241    fn default() -> Self {
242        Self::new()
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use crate::amf::AmfValue;
250
251    #[test]
252    fn test_encoder_detection() {
253        assert_eq!(
254            EncoderType::from_flash_ver("OBS-Studio/29.1.3"),
255            EncoderType::Obs
256        );
257        assert_eq!(
258            EncoderType::from_flash_ver("FMLE/3.0"),
259            EncoderType::FlashMediaEncoder
260        );
261        assert_eq!(
262            EncoderType::from_flash_ver("Lavf58.76.100"),
263            EncoderType::Ffmpeg
264        );
265    }
266
267    #[test]
268    fn test_timestamp_normalizer() {
269        let mut normalizer = TimestampNormalizer::new();
270
271        assert_eq!(normalizer.normalize(0), 0);
272        assert_eq!(normalizer.normalize(1000), 1000);
273        assert_eq!(normalizer.normalize(2000), 2000);
274
275        // Small regression (within 1 second) - allow it (no offset adjustment)
276        assert_eq!(normalizer.normalize(1500), 1500);
277
278        // Large regression (> 1 second) - adjust offset
279        // last_timestamp is now 1500, regression to 100 is 1400ms > 1000ms
280        // So offset becomes 1500 + 1 = 1501
281        // Result = 100 + 1501 = 1601
282        assert_eq!(normalizer.normalize(100), 1601);
283    }
284
285    #[test]
286    fn test_quirks_config_default() {
287        let config = QuirksConfig::default();
288
289        // Default should be lenient
290        assert!(config.allow_early_commands);
291        assert!(config.allow_fc_before_connect);
292        assert!(config.lenient_amf);
293        assert!(config.allow_timestamp_regression);
294        assert!(config.allow_duplicate_metadata);
295        assert!(config.allow_empty_app);
296        assert!(config.allow_oversized_chunks);
297    }
298
299    #[test]
300    fn test_quirks_config_strict() {
301        let config = QuirksConfig::strict();
302
303        // Strict mode should reject everything
304        assert!(!config.allow_early_commands);
305        assert!(!config.allow_fc_before_connect);
306        assert!(!config.lenient_amf);
307        assert!(!config.allow_timestamp_regression);
308        assert!(!config.allow_duplicate_metadata);
309        assert!(!config.allow_empty_app);
310        assert!(!config.allow_oversized_chunks);
311    }
312
313    #[test]
314    fn test_encoder_type_detection() {
315        // OBS detection
316        assert_eq!(
317            EncoderType::from_flash_ver("OBS-Studio/29.1.3"),
318            EncoderType::Obs
319        );
320        assert_eq!(EncoderType::from_flash_ver("obs studio"), EncoderType::Obs);
321
322        // ffmpeg detection
323        assert_eq!(
324            EncoderType::from_flash_ver("Lavf58.76.100"),
325            EncoderType::Ffmpeg
326        );
327        assert_eq!(
328            EncoderType::from_flash_ver("librtmp 2.4"),
329            EncoderType::Ffmpeg
330        );
331
332        // FMLE detection
333        assert_eq!(
334            EncoderType::from_flash_ver("FMLE/3.0"),
335            EncoderType::FlashMediaEncoder
336        );
337        assert_eq!(
338            EncoderType::from_flash_ver("Flash Media Encoder"),
339            EncoderType::FlashMediaEncoder
340        );
341
342        // Wirecast detection
343        assert_eq!(
344            EncoderType::from_flash_ver("Wirecast/13.1"),
345            EncoderType::Wirecast
346        );
347
348        // XSplit detection
349        assert_eq!(
350            EncoderType::from_flash_ver("XSplit/4.0"),
351            EncoderType::Xsplit
352        );
353
354        // Larix detection
355        assert_eq!(
356            EncoderType::from_flash_ver("Larix Broadcaster"),
357            EncoderType::Larix
358        );
359
360        // Unknown/Other
361        assert_eq!(
362            EncoderType::from_flash_ver("SomeOtherEncoder"),
363            EncoderType::Other
364        );
365        assert_eq!(EncoderType::from_flash_ver(""), EncoderType::Other);
366    }
367
368    #[test]
369    fn test_encoder_type_case_insensitive() {
370        assert_eq!(EncoderType::from_flash_ver("OBS"), EncoderType::Obs);
371        assert_eq!(EncoderType::from_flash_ver("obs"), EncoderType::Obs);
372        assert_eq!(EncoderType::from_flash_ver("LAVF"), EncoderType::Ffmpeg);
373        assert_eq!(EncoderType::from_flash_ver("lavf"), EncoderType::Ffmpeg);
374    }
375
376    #[test]
377    fn test_command_sequence_new() {
378        let seq = CommandSequence::new();
379        assert_eq!(seq.state(), "initial");
380    }
381
382    #[test]
383    fn test_command_sequence_default() {
384        let seq = CommandSequence::default();
385        assert_eq!(seq.state(), "initial");
386    }
387
388    #[test]
389    fn test_command_sequence_connect() {
390        let mut seq = CommandSequence::new();
391
392        // Connect valid in initial state
393        let cmd = Command {
394            name: "connect".to_string(),
395            transaction_id: 1.0,
396            command_object: AmfValue::Null,
397            arguments: vec![],
398            stream_id: 0,
399        };
400        assert!(seq.is_valid_command(&cmd));
401
402        // Transition to connected
403        seq.on_command("connect");
404        assert_eq!(seq.state(), "connected");
405
406        // Connect no longer valid
407        assert!(!seq.is_valid_command(&cmd));
408    }
409
410    #[test]
411    fn test_command_sequence_create_stream() {
412        let mut seq = CommandSequence::new();
413
414        let create_stream = Command {
415            name: "createStream".to_string(),
416            transaction_id: 2.0,
417            command_object: AmfValue::Null,
418            arguments: vec![],
419            stream_id: 0,
420        };
421
422        // createStream valid in initial state (OBS quirk)
423        assert!(seq.is_valid_command(&create_stream));
424
425        // Transition to connected
426        seq.on_command("connect");
427        assert!(seq.is_valid_command(&create_stream));
428
429        // Transition to stream created
430        seq.on_command("createStream");
431        assert_eq!(seq.state(), "stream_created");
432    }
433
434    #[test]
435    fn test_command_sequence_publish() {
436        let mut seq = CommandSequence::new();
437
438        let publish = Command {
439            name: "publish".to_string(),
440            transaction_id: 0.0,
441            command_object: AmfValue::Null,
442            arguments: vec![AmfValue::String("stream_key".into())],
443            stream_id: 1,
444        };
445
446        // publish not valid before stream created
447        assert!(!seq.is_valid_command(&publish));
448
449        seq.on_command("connect");
450        assert!(!seq.is_valid_command(&publish));
451
452        seq.on_command("createStream");
453        assert!(seq.is_valid_command(&publish));
454
455        seq.on_command("publish");
456        assert_eq!(seq.state(), "publishing");
457    }
458
459    #[test]
460    fn test_command_sequence_play() {
461        let mut seq = CommandSequence::new();
462
463        let play = Command {
464            name: "play".to_string(),
465            transaction_id: 0.0,
466            command_object: AmfValue::Null,
467            arguments: vec![AmfValue::String("stream_name".into())],
468            stream_id: 1,
469        };
470
471        seq.on_command("connect");
472        seq.on_command("createStream");
473
474        assert!(seq.is_valid_command(&play));
475        seq.on_command("play");
476        assert_eq!(seq.state(), "playing");
477    }
478
479    #[test]
480    fn test_command_sequence_fc_commands_always_valid() {
481        let mut seq = CommandSequence::new();
482
483        let release_stream = Command {
484            name: "releaseStream".to_string(),
485            transaction_id: 2.0,
486            command_object: AmfValue::Null,
487            arguments: vec![],
488            stream_id: 0,
489        };
490
491        let fc_publish = Command {
492            name: "FCPublish".to_string(),
493            transaction_id: 3.0,
494            command_object: AmfValue::Null,
495            arguments: vec![],
496            stream_id: 0,
497        };
498
499        // Should be valid in initial state (OBS quirk)
500        assert!(seq.is_valid_command(&release_stream));
501        assert!(seq.is_valid_command(&fc_publish));
502
503        seq.on_command("connect");
504        assert!(seq.is_valid_command(&release_stream));
505        assert!(seq.is_valid_command(&fc_publish));
506    }
507
508    #[test]
509    fn test_command_sequence_close_commands() {
510        let mut seq = CommandSequence::new();
511
512        let fc_unpublish = Command {
513            name: "FCUnpublish".to_string(),
514            transaction_id: 0.0,
515            command_object: AmfValue::Null,
516            arguments: vec![],
517            stream_id: 1,
518        };
519
520        let delete_stream = Command {
521            name: "deleteStream".to_string(),
522            transaction_id: 4.0,
523            command_object: AmfValue::Null,
524            arguments: vec![],
525            stream_id: 1,
526        };
527
528        // Not valid until publishing/playing
529        assert!(!seq.is_valid_command(&fc_unpublish));
530        assert!(!seq.is_valid_command(&delete_stream));
531
532        seq.on_command("connect");
533        seq.on_command("createStream");
534        seq.on_command("publish");
535
536        // Now valid
537        assert!(seq.is_valid_command(&fc_unpublish));
538        assert!(seq.is_valid_command(&delete_stream));
539
540        // After close, return to connected
541        seq.on_command("deleteStream");
542        assert_eq!(seq.state(), "connected");
543    }
544
545    #[test]
546    fn test_command_sequence_unknown_command_always_valid() {
547        let seq = CommandSequence::new();
548
549        let unknown = Command {
550            name: "unknownCommand".to_string(),
551            transaction_id: 0.0,
552            command_object: AmfValue::Null,
553            arguments: vec![],
554            stream_id: 0,
555        };
556
557        // Unknown commands should be allowed
558        assert!(seq.is_valid_command(&unknown));
559    }
560
561    #[test]
562    fn test_timestamp_normalizer_reset() {
563        let mut normalizer = TimestampNormalizer::new();
564
565        normalizer.normalize(1000);
566        normalizer.normalize(2000);
567
568        normalizer.reset();
569
570        // After reset, should behave like new
571        assert_eq!(normalizer.normalize(0), 0);
572        assert_eq!(normalizer.normalize(100), 100);
573    }
574
575    #[test]
576    fn test_timestamp_normalizer_multiple_regressions() {
577        let mut normalizer = TimestampNormalizer::new();
578
579        // First regression
580        normalizer.normalize(0);
581        normalizer.normalize(5000);
582        let after_first = normalizer.normalize(100); // regression > 1s
583        assert!(after_first > 5000);
584
585        // Continue normal
586        let next = normalizer.normalize(200);
587        assert!(next > after_first);
588    }
589
590    #[test]
591    fn test_timestamp_normalizer_default() {
592        let normalizer = TimestampNormalizer::default();
593        assert_eq!(normalizer.last_timestamp, 0);
594        assert_eq!(normalizer.offset, 0);
595    }
596
597    #[test]
598    fn test_encoder_type_equality() {
599        assert_eq!(EncoderType::Obs, EncoderType::Obs);
600        assert_ne!(EncoderType::Obs, EncoderType::Ffmpeg);
601        assert_ne!(EncoderType::Unknown, EncoderType::Other);
602    }
603}