oddity_rtsp_protocol/
parse.rs

1use std::collections::BTreeMap;
2
3use super::{
4    buffer::{Buf, ReadLine},
5    error::{Error, Result},
6    message::{Bytes, Headers, Message, StatusCode, Uri, Version},
7    request::{Request, RequestMetadata},
8    response::{Response, ResponseMetadata},
9};
10
11pub type RequestParser = Parser<Request>;
12pub type ResponseParser = Parser<Response>;
13
/// Outcome of a single call to `Parser::parse`.
#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    /// The message is not complete yet; the parser needs more input.
    Hungry,
    /// The head and (if announced) the body have been parsed completely.
    Done,
}
19
20pub struct Parser<M: Message> {
21    state: State,
22    metadata: Option<M::Metadata>,
23    headers: Headers,
24    body: Option<Bytes>,
25}
26
27impl<M: Message> Parser<M> {
28    pub fn new() -> Self {
29        Self {
30            state: State::Head(Head::FirstLine),
31            metadata: None,
32            headers: BTreeMap::new(),
33            body: None,
34        }
35    }
36
37    pub fn into_message(self) -> Result<M> {
38        match self.state {
39            State::Body(Body::Complete) => Ok(M::new(
40                self.metadata.ok_or(Error::MetadataNotParsed)?,
41                self.headers,
42                self.body,
43            )),
44            _ => Err(Error::NotDone),
45        }
46    }
47
48    pub fn parse(&mut self, buffer: &mut impl Buf) -> Result<Status> {
49        self.parse_loop(buffer)?;
50
51        match &self.state {
52            State::Body(Body::Complete) => Ok(Status::Done),
53            State::Body(Body::Incomplete) => Ok(Status::Hungry),
54            State::Head(_) => Ok(Status::Hungry),
55        }
56    }
57
58    fn parse_loop(&mut self, buffer: &mut impl Buf) -> Result<()> {
59        let mut again = true;
60        while again {
61            (self.state, again) = self.parse_inner(buffer)?;
62        }
63
64        Ok(())
65    }
66
67    fn parse_inner(&mut self, buffer: &mut impl Buf) -> Result<(State, Again)> {
68        match self.state {
69            State::Head(head) => {
70                let next_head = self.parse_inner_head(buffer, head)?;
71                match next_head {
72                    Head::Done => {
73                        if self.have_content_length() {
74                            Ok((State::Body(Body::Incomplete), true))
75                        } else {
76                            Ok((State::Body(Body::Complete), false))
77                        }
78                    }
79                    _ => Ok((State::Head(next_head), false)),
80                }
81            }
82            State::Body(Body::Incomplete) => {
83                let need = self
84                    .find_content_length()?
85                    .ok_or_else(|| Error::ContentLengthMissing)?;
86                let got = buffer.remaining();
87
88                if got >= need {
89                    self.body = Some(buffer.copy_to_bytes(need));
90                    Ok((State::Body(Body::Complete), false))
91                } else {
92                    Ok((State::Body(Body::Incomplete), false))
93                }
94            }
95            State::Body(Body::Complete) => Err(Error::BodyAlreadyDone),
96        }
97    }
98
99    fn parse_inner_head(&mut self, buffer: &mut impl Buf, mut head: Head) -> Result<Head> {
100        while head != Head::Done {
101            let line = match buffer.read_line() {
102                Some(line) => line.map_err(|_| Error::Encoding)?,
103                None => break,
104            };
105
106            head = Self::parse_inner_head_line(&mut self.metadata, &mut self.headers, line, head)?;
107        }
108
109        Ok(head)
110    }
111
112    fn parse_inner_head_line(
113        metadata: &mut Option<M::Metadata>,
114        headers: &mut BTreeMap<String, String>,
115        line: String,
116        head: Head,
117    ) -> Result<Head> {
118        let line = line.trim();
119        match head {
120            Head::FirstLine => {
121                *metadata = Some(Self::parse_metadata(line)?);
122                Ok(Head::Header)
123            }
124            Head::Header => {
125                Ok(if !line.is_empty() {
126                    let (var, val) = parse_header(line)?;
127                    headers.insert(var, val);
128                    Head::Header
129                } else {
130                    // The line is empty, so we got CRLF, which signals end of
131                    // headers for this request.
132                    Head::Done
133                })
134            }
135            Head::Done => Err(Error::HeadAlreadyDone),
136        }
137    }
138
139    fn parse_metadata(line: &str) -> Result<M::Metadata> {
140        M::Metadata::parse(line)
141    }
142
143    fn have_content_length(&self) -> bool {
144        self.headers.contains_key("Content-Length")
145    }
146
147    fn find_content_length(&self) -> Result<Option<usize>> {
148        if let Some(content_length) = self.headers.get("Content-Length") {
149            Ok(Some(content_length.parse::<usize>().map_err(|_| {
150                Error::ContentLengthNotInteger {
151                    value: content_length.clone(),
152                }
153            })?))
154        } else {
155            Ok(None)
156        }
157    }
158
159    fn parse_and_into(mut self, mut buffer: impl Buf) -> Result<M> {
160        self.parse(&mut buffer)?;
161        self.into_message()
162    }
163}
164
165impl Parser<Request> {
166    #[inline]
167    pub fn parse_and_into_request(self, buffer: impl Buf) -> Result<Request> {
168        self.parse_and_into(buffer)
169    }
170
171    #[inline]
172    pub fn into_request(self) -> Result<Request> {
173        self.into_message()
174    }
175}
176
177impl Parser<Response> {
178    #[inline]
179    pub fn parse_and_into_response(self, buffer: impl Buf) -> Result<Response> {
180        self.parse_and_into(buffer)
181    }
182
183    #[inline]
184    pub fn into_response(self) -> Result<Response> {
185        self.into_message()
186    }
187}
188
189impl<M: Message> Default for Parser<M> {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
/// Flag returned by a parse step: `true` if the next step should run
/// immediately, `false` if stepping should stop for now.
type Again = bool;
196
197#[derive(Clone, Copy, PartialEq, Debug)]
198enum State {
199    Head(Head),
200    Body(Body),
201}
202
/// Position of the parser within the head of a message.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Head {
    /// The first line has not been parsed yet.
    FirstLine,
    /// The first line was parsed; header lines are expected.
    Header,
    /// The empty line that ends the headers was seen.
    Done,
}
209
/// Progress of the parser on the message body.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Body {
    /// A body was announced but not all of its bytes are available yet.
    Incomplete,
    /// The body was read, or the message has no body at all.
    Complete,
}
215
216pub trait Parse: Sized {
217    fn parse(line: &str) -> Result<Self>;
218}
219
220impl Parse for RequestMetadata {
221    fn parse(line: &str) -> Result<RequestMetadata> {
222        let mut parts = line.split(' ');
223
224        let method = parts
225            .next()
226            .ok_or_else(|| Error::RequestLineMalformed {
227                line: line.to_string(),
228            })?
229            .parse()?;
230
231        let uri = parts
232            .next()
233            .ok_or_else(|| Error::UriMissing {
234                line: line.to_string(),
235            })?
236            .to_string();
237
238        let uri = uri.parse::<Uri>().map_err(|_| Error::UriMalformed {
239            line: line.to_string(),
240            uri: uri.to_string(),
241        })?;
242
243        let uri = if uri.authority().is_some() || uri.path() == "*" {
244            Ok(uri)
245        } else {
246            // Relative URI's are not allowed in RTSP.
247            Err(Error::UriNotAbsolute { uri })
248        }?;
249
250        let version = parts.next().ok_or_else(|| Error::VersionMissing {
251            line: line.to_string(),
252        })?;
253
254        let version = parse_version(version, line)?;
255
256        Ok(RequestMetadata::new(method, uri, version))
257    }
258}
259
260impl Parse for ResponseMetadata {
261    fn parse(line: &str) -> Result<ResponseMetadata> {
262        let (version, rest) = line
263            .split_once(' ')
264            .ok_or_else(|| Error::StatusCodeMissing {
265                line: line.to_string(),
266            })?;
267
268        let version = parse_version(version.trim(), line)?;
269
270        let (status_code, rest) =
271            rest.split_once(' ')
272                .ok_or_else(|| Error::ReasonPhraseMissing {
273                    line: line.to_string(),
274                })?;
275
276        let status_code =
277            status_code
278                .trim()
279                .parse::<StatusCode>()
280                .map_err(|_| Error::StatusCodeNotInteger {
281                    line: line.to_string(),
282                    status_code: status_code.to_string(),
283                })?;
284
285        let reason = rest.trim().to_string();
286
287        Ok(ResponseMetadata::new(version, status_code, reason))
288    }
289}
290
291fn parse_version(part: &str, line: &str) -> Result<Version> {
292    if let Some(stripped) = part.strip_prefix("RTSP/") {
293        Ok(match stripped {
294            "1.0" => Version::V1,
295            "2.0" => Version::V2,
296            _ => Version::Unknown,
297        })
298    } else {
299        Err(Error::VersionMalformed {
300            line: line.to_string(),
301            version: part.to_string(),
302        })
303    }
304}
305
306fn parse_header(line: &str) -> Result<(String, String)> {
307    let (var, val) = line.split_once(':').ok_or_else(|| Error::HeaderMalformed {
308        line: line.to_string(),
309    })?;
310
311    Ok((var.trim().to_string(), val.trim().to_string()))
312}
313
#[cfg(test)]
mod tests {
    //! Tests for the request and response parsers: one-shot parsing,
    //! pipelined requests, and input that arrives in pieces.

    use bytes::{Bytes, BytesMut};

    use crate::{Method, Request, RequestParser, ResponseParser, StatusCategory, Version};

    use super::Status;

    #[test]
    fn parse_options_request() {
        let request = br###"OPTIONS rtsp://example.com/media.mp4 RTSP/1.0
CSeq: 1
Require: implicit-play
Proxy-Require: gzipped-messages

"###;

        let request = RequestParser::new()
            .parse_and_into_request(request.as_slice())
            .unwrap();
        assert_eq!(request.method, Method::Options);
        assert_eq!(request.uri, "rtsp://example.com/media.mp4");
        assert_eq!(request.version, Version::V1);
        assert_eq!(request.headers.get("CSeq"), Some(&"1".to_string()));
        assert_eq!(
            request.headers.get("Require"),
            Some(&"implicit-play".to_string())
        );
        assert_eq!(
            request.headers.get("Proxy-Require"),
            Some(&"gzipped-messages".to_string())
        );
    }

    #[test]
    fn parse_options_request_any() {
        let request = br###"OPTIONS * RTSP/1.0
CSeq: 1

"###;

        let request = RequestParser::new()
            .parse_and_into_request(request.as_slice())
            .unwrap();
        assert_eq!(request.method, Method::Options);
        assert_eq!(request.uri, "*");
        assert_eq!(request.version, Version::V1);
        assert_eq!(request.headers.get("CSeq"), Some(&"1".to_string()));
    }

    #[test]
    fn parse_options_response() {
        let response = br###"RTSP/1.0 200 OK
CSeq: 1
Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE

"###;

        let response = ResponseParser::new()
            .parse_and_into_response(response.as_slice())
            .unwrap();
        assert_eq!(response.version, Version::V1);
        assert_eq!(response.status, 200);
        assert_eq!(response.status(), StatusCategory::Success);
        assert_eq!(response.reason, "OK");
        assert_eq!(response.headers.get("CSeq"), Some(&"1".to_string()));
        assert_eq!(
            response.headers.get("Public"),
            Some(&"DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE".to_string())
        );
    }

    #[test]
    fn parse_options_response_error() {
        let response = br###"RTSP/1.0 404 Stream Not Found
CSeq: 1

"###;

        let response = ResponseParser::new()
            .parse_and_into_response(response.as_slice())
            .unwrap();
        assert_eq!(response.version, Version::V1);
        assert_eq!(response.status, 404);
        assert_eq!(response.status(), StatusCategory::ClientError);
        assert_eq!(response.reason, "Stream Not Found");
        assert_eq!(response.headers.get("CSeq"), Some(&"1".to_string()));
    }

    #[test]
    fn parse_describe_request() {
        let request = br###"DESCRIBE rtsp://example.com/media.mp4 RTSP/1.0
CSeq: 2

"###;

        let request = RequestParser::new()
            .parse_and_into_request(request.as_slice())
            .unwrap();
        assert_eq!(request.method, Method::Describe);
        assert_eq!(request.uri, "rtsp://example.com/media.mp4");
        assert_eq!(request.version, Version::V1);
        assert_eq!(request.headers.get("CSeq"), Some(&"2".to_string()));
    }

    #[test]
    fn parse_describe_request_v2() {
        let request = br###"DESCRIBE rtsp://example.com/media.mp4 RTSP/2.0
CSeq: 2

"###;

        let request = RequestParser::new()
            .parse_and_into_request(request.as_slice())
            .unwrap();
        assert_eq!(request.method, Method::Describe);
        assert_eq!(request.uri, "rtsp://example.com/media.mp4");
        assert_eq!(request.version, Version::V2);
        assert_eq!(request.headers.get("CSeq"), Some(&"2".to_string()));
    }

    #[test]
    fn parse_describe_request_v3() {
        let request = br###"DESCRIBE rtsp://example.com/media.mp4 RTSP/3.0
CSeq: 2

"###;

        let request = RequestParser::new()
            .parse_and_into_request(request.as_slice())
            .unwrap();
        assert_eq!(request.method, Method::Describe);
        assert_eq!(request.uri, "rtsp://example.com/media.mp4");
        assert_eq!(request.version, Version::Unknown);
        assert_eq!(request.headers.get("CSeq"), Some(&"2".to_string()));
    }

    #[test]
    fn parse_describe_response() {
        // The body below is exactly 443 bytes long, matching Content-Length.
        let response = br###"RTSP/1.0 200 OK
CSeq: 2
Content-Base: rtsp://example.com/media.mp4
Content-Type: application/sdp
Content-Length: 443

m=video 0 RTP/AVP 96
a=control:streamid=0
a=range:npt=0-7.741000
a=length:npt=7.741000
a=rtpmap:96 MP4V-ES/5544
a=mimetype:string;"video/MP4V-ES"
a=AvgBitRate:integer;304018
a=StreamName:string;"hinted video track"
m=audio 0 RTP/AVP 97
a=control:streamid=1
a=range:npt=0-7.712000
a=length:npt=7.712000
a=rtpmap:97 mpeg4-generic/32000/2
a=mimetype:string;"audio/mpeg4-generic"
a=AvgBitRate:integer;65790
a=StreamName:string;"hinted audio track""###;

        let response = ResponseParser::new()
            .parse_and_into_response(response.as_slice())
            .unwrap();
        assert_eq!(response.version, Version::V1);
        assert_eq!(response.status, 200);
        assert_eq!(response.reason, "OK");
        assert_eq!(response.headers.get("CSeq"), Some(&"2".to_string()));
        assert_eq!(
            response.headers.get("Content-Base"),
            Some(&"rtsp://example.com/media.mp4".to_string())
        );
        assert_eq!(
            response.headers.get("Content-Type"),
            Some(&"application/sdp".to_string())
        );
        assert_eq!(
            response.headers.get("Content-Length"),
            Some(&"443".to_string())
        );
    }

    // Three back-to-back requests. The ANNOUNCE body is exactly 305 bytes
    // long, so the TEARDOWN request line starts right after its last byte,
    // without a separating newline.
    const EXAMPLE_PIPELINED_REQUESTS: &[u8] = br###"RECORD rtsp://example.com/media.mp4 RTSP/1.0
CSeq: 6
Session: 12345678

ANNOUNCE rtsp://example.com/media.mp4 RTSP/1.0
CSeq: 7
Date: 23 Jan 1997 15:35:06 GMT
Session: 12345678
Content-Type: application/sdp
Content-Length: 305

v=0
o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4
s=SDP Seminar
i=A Seminar on the session description protocol
u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps
e=mjh@isi.edu (Mark Handley)
c=IN IP4 224.2.17.12/127
t=2873397496 2873404696
a=recvonly
m=audio 3456 RTP/AVP 0
m=video 2232 RTP/AVP 31TEARDOWN rtsp://example.com/media.mp4 RTSP/1.0
CSeq: 8
Session: 12345678

"###;

    #[test]
    fn parse_pipelined_requests() {
        let mut buffer = Bytes::from_static(EXAMPLE_PIPELINED_REQUESTS);
        let mut parser = RequestParser::new();

        let mut requests = Vec::new();
        for _ in 0..3 {
            if parser.parse(&mut buffer).unwrap() == Status::Done {
                requests.push(parser.into_request().unwrap());
                parser = RequestParser::new();
            }
        }

        assert_example_pipelined_requests(&requests);
    }

    #[test]
    fn parse_pipelined_requests_pieces1() {
        let mut buffer = BytesMut::new();
        let mut parser = RequestParser::new();

        // Feed the input one byte at a time.
        let mut requests = Vec::new();
        for i in 0..EXAMPLE_PIPELINED_REQUESTS.len() {
            buffer.extend_from_slice(&EXAMPLE_PIPELINED_REQUESTS[i..i + 1]);
            if parser.parse(&mut buffer).unwrap() == Status::Done {
                requests.push(parser.into_request().unwrap());
                parser = RequestParser::new();
            }
        }

        assert_example_pipelined_requests(&requests);
    }

    #[test]
    fn parse_pipelined_requests_pieces_varying() {
        let mut buffer = BytesMut::new();
        let mut parser = RequestParser::new();

        // Feed the input in pieces whose size cycles through 1, 2, 4, 8, 7, 5.
        let mut requests = Vec::new();
        let mut start = 0;
        let mut size = 1;
        loop {
            let piece_range = start..(start + size).min(EXAMPLE_PIPELINED_REQUESTS.len());
            buffer.extend_from_slice(&EXAMPLE_PIPELINED_REQUESTS[piece_range]);
            if let Status::Done = parser.parse(&mut buffer).unwrap() {
                requests.push(parser.into_request().unwrap());
                parser = RequestParser::new();
            }
            start += size;
            size = (size * 2) % 9;
            if start >= EXAMPLE_PIPELINED_REQUESTS.len() {
                break;
            }
        }

        assert_example_pipelined_requests(&requests);
    }

    /// Checks that `requests` holds the three requests of
    /// `EXAMPLE_PIPELINED_REQUESTS`, in order.
    fn assert_example_pipelined_requests(requests: &[Request]) {
        assert_eq!(requests.len(), 3);
        assert_eq!(requests[0].method, Method::Record);
        assert_eq!(requests[0].uri, "rtsp://example.com/media.mp4");
        assert_eq!(requests[0].version, Version::V1);
        assert_eq!(requests[0].headers.get("CSeq"), Some(&"6".to_string()));
        assert_eq!(
            requests[0].headers.get("Session"),
            Some(&"12345678".to_string())
        );
        assert_eq!(requests[0].body, None);
        assert_eq!(requests[1].method, Method::Announce);
        assert_eq!(requests[1].uri, "rtsp://example.com/media.mp4");
        assert_eq!(requests[1].version, Version::V1);
        assert_eq!(requests[1].headers.get("CSeq"), Some(&"7".to_string()));
        assert_eq!(
            requests[1].headers.get("Session"),
            Some(&"12345678".to_string())
        );
        assert_eq!(
            requests[1].headers.get("Date"),
            Some(&"23 Jan 1997 15:35:06 GMT".to_string())
        );
        assert_eq!(
            requests[1].headers.get("Content-Type"),
            Some(&"application/sdp".to_string())
        );
        assert_eq!(
            requests[1].headers.get("Content-Length"),
            Some(&"305".to_string())
        );
        assert_eq!(requests[1].body.as_ref().unwrap().len(), 305);
        assert_eq!(requests[2].method, Method::Teardown);
        assert_eq!(requests[2].uri, "rtsp://example.com/media.mp4");
        assert_eq!(requests[2].version, Version::V1);
        assert_eq!(requests[2].headers.get("CSeq"), Some(&"8".to_string()));
        assert_eq!(
            requests[2].headers.get("Session"),
            Some(&"12345678".to_string())
        );
        assert_eq!(requests[2].body, None);
    }

    // PLAY request with CRLF line endings and a 16 byte body. The variants
    // with only LF or only CR line endings are derived from it below.
    const EXAMPLE_REQUEST_PLAY_CRLN: &[u8] = b"PLAY rtsp://example.com/stream/0 RTSP/1.0\r\n\
CSeq: 1\r\n\
Session: 1234abcd\r\n\
Content-Length: 16\r\n\
\r\n\
0123456789abcdef";

    #[test]
    fn parse_play_request() {
        let request = RequestParser::new()
            .parse_and_into_request(EXAMPLE_REQUEST_PLAY_CRLN)
            .unwrap();
        test_example_request_play(&request);
    }

    #[test]
    fn parse_play_request_partial_piece1_ln() {
        parse_play_request_partial_piece1(&request_play_ln());
    }

    #[test]
    fn parse_play_request_partial_piece2_ln() {
        parse_play_request_partial_piece(&request_play_ln(), 2);
    }

    #[test]
    fn parse_play_request_partial_piece3_ln() {
        parse_play_request_partial_piece(&request_play_ln(), 3);
    }

    #[test]
    fn parse_play_request_partial_piece_varying_ln() {
        parse_play_request_partial_piece_varying(&request_play_ln());
    }

    #[test]
    fn parse_play_request_partial_piece1_cr() {
        parse_play_request_partial_piece1(&request_play_cr());
    }

    #[test]
    fn parse_play_request_partial_piece2_cr() {
        parse_play_request_partial_piece(&request_play_cr(), 2);
    }

    #[test]
    fn parse_play_request_partial_piece3_cr() {
        parse_play_request_partial_piece(&request_play_cr(), 3);
    }

    #[test]
    fn parse_play_request_partial_piece_varying_cr() {
        parse_play_request_partial_piece_varying(&request_play_cr());
    }

    #[test]
    fn parse_play_request_partial_piece1_crln() {
        parse_play_request_partial_piece1(&request_play_crln());
    }

    #[test]
    fn parse_play_request_partial_piece2_crln() {
        parse_play_request_partial_piece(&request_play_crln(), 2);
    }

    #[test]
    fn parse_play_request_partial_piece3_crln() {
        parse_play_request_partial_piece(&request_play_crln(), 3);
    }

    #[test]
    fn parse_play_request_partial_piece_varying_crln() {
        parse_play_request_partial_piece_varying(&request_play_crln());
    }

    /// The PLAY example with every CR removed, leaving LF line endings.
    fn request_play_ln() -> Bytes {
        EXAMPLE_REQUEST_PLAY_CRLN
            .iter()
            .copied()
            .filter(|b| *b != b'\x0d')
            .collect::<Bytes>()
    }

    /// The PLAY example with every LF removed, leaving CR line endings.
    fn request_play_cr() -> Bytes {
        EXAMPLE_REQUEST_PLAY_CRLN
            .iter()
            .copied()
            .filter(|b| *b != b'\x0a')
            .collect::<Bytes>()
    }

    /// The PLAY example unchanged, with CRLF line endings.
    fn request_play_crln() -> Bytes {
        Bytes::from_static(EXAMPLE_REQUEST_PLAY_CRLN)
    }

    /// Feeds `request_bytes` one byte at a time and expects the parser to
    /// stay hungry until the very last byte.
    fn parse_play_request_partial_piece1(request_bytes: &[u8]) {
        let mut buffer = BytesMut::new();
        let mut parser = RequestParser::new();

        let upto_last = request_bytes.len() - 1;
        for i in 0..upto_last {
            let i_range = i..i + 1;
            buffer.extend_from_slice(&request_bytes[i_range]);
            assert_eq!(parser.parse(&mut buffer).unwrap(), Status::Hungry);
        }

        let last_range = request_bytes.len() - 1..;
        buffer.extend_from_slice(&request_bytes[last_range]);
        assert_eq!(parser.parse(&mut buffer).unwrap(), Status::Done);

        let request = parser.into_request().unwrap();
        test_example_request_play(&request);
    }

    /// Feeds `request_bytes` in pieces of `piece_size` bytes. The final
    /// piece also carries any leftover bytes, and only that piece may
    /// complete the request.
    fn parse_play_request_partial_piece(request_bytes: &[u8], piece_size: usize) {
        let mut buffer = BytesMut::new();
        let mut parser = RequestParser::new();

        let pieces_upto_last = (request_bytes.len() / piece_size) - 1;
        for i in 0..pieces_upto_last {
            let piece_range = (i * piece_size)..(i * piece_size) + piece_size;
            buffer.extend_from_slice(&request_bytes[piece_range]);
            assert_eq!(parser.parse(&mut buffer).unwrap(), Status::Hungry);
        }

        let last_piece = pieces_upto_last;
        let leftover_piece_range = last_piece * piece_size..;
        buffer.extend_from_slice(&request_bytes[leftover_piece_range]);
        assert_eq!(parser.parse(&mut buffer).unwrap(), Status::Done);

        let request = parser.into_request().unwrap();
        test_example_request_play(&request);
    }

    /// Feeds `request_bytes` in pieces whose size cycles through
    /// 1, 2, 4, 8, 7, 5 until the parser reports a complete request.
    fn parse_play_request_partial_piece_varying(request_bytes: &[u8]) {
        let mut buffer = BytesMut::new();
        let mut parser = RequestParser::new();

        let mut start = 0;
        let mut size = 1;
        loop {
            // Fail with a clear message instead of an out-of-range slice
            // panic if the parser never completes.
            assert!(
                start < request_bytes.len(),
                "parser is still hungry after the whole request was fed"
            );
            let piece_range = start..(start + size).min(request_bytes.len());
            buffer.extend_from_slice(&request_bytes[piece_range]);
            if let Status::Done = parser.parse(&mut buffer).unwrap() {
                break;
            }
            start += size;
            size = (size * 2) % 9;
        }

        let request = parser.into_request().unwrap();
        test_example_request_play(&request);
    }

    /// Checks that `request` matches the PLAY example.
    fn test_example_request_play(request: &Request) {
        assert_eq!(request.method, Method::Play);
        assert_eq!(request.uri, "rtsp://example.com/stream/0");
        assert_eq!(request.version, Version::V1);
        assert_eq!(request.headers.get("CSeq"), Some(&"1".to_string()));
        assert_eq!(
            request.headers.get("Session"),
            Some(&"1234abcd".to_string())
        );
        assert_eq!(
            request.headers.get("Content-Length"),
            Some(&"16".to_string())
        );
        assert_eq!(request.body, Some(b"0123456789abcdef".as_slice().into()));
    }
}