flowly_flv/
lib.rs

1use std::pin::pin;
2
3use bytes::Bytes;
4use error::Error;
5use flowly::{Fourcc, Frame, FrameFlags, Service};
6use futures::{Stream, TryStreamExt};
7use header::FlvHeader;
8use parser::{FlvParser, Parser};
9use tag::{FlvTag, FlvTagHeader, FlvTagType};
10use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
11use tokio_util::io::StreamReader;
12
13pub mod error;
14pub mod header;
15pub mod parser;
16pub mod reader;
17pub mod tag;
18
19pub const DEMUX_ALL_TYPES: u64 = -1i64 as u64;
20
21#[cfg(feature = "serde")]
22pub mod serde;
23
24#[derive(Debug, Clone)]
25pub struct FlvFrame {
26    dts: u64,
27    track_id: u32,
28    flags: flowly::FrameFlags,
29    pts_offset: i32,
30    codec: Fourcc,
31    params_count: u32,
32    payload: Vec<Bytes>,
33}
34
35impl Frame for FlvFrame {
36    fn pts(&self) -> i64 {
37        self.dts as i64 + self.pts_offset as i64
38    }
39
40    fn dts(&self) -> u64 {
41        self.dts
42    }
43
44    fn timestamp(&self) -> Option<u64> {
45        None
46    }
47
48    fn codec(&self) -> flowly::Fourcc {
49        self.codec
50    }
51
52    fn flags(&self) -> flowly::FrameFlags {
53        self.flags
54    }
55
56    fn track(&self) -> u32 {
57        self.track_id
58    }
59
60    fn params(&self) -> impl Iterator<Item = &[u8]> {
61        self.payload
62            .iter()
63            .map(|x| x.as_ref())
64            .take(self.params_count as usize)
65    }
66
67    fn units(&self) -> impl Iterator<Item = &[u8]> {
68        self.payload
69            .iter()
70            .map(|x| x.as_ref())
71            .skip(self.params_count as usize)
72    }
73}
74
75#[inline]
76pub fn demux_flv_stream<R: AsyncRead>(
77    reader: R,
78    tag_types: u64,
79) -> impl Stream<Item = Result<FlvTag, error::Error>> {
80    demux_flv_stream_inner(reader, tag_types)
81}
82
83#[derive(Debug)]
84pub struct FlvDemuxer {
85    flags_filter: FrameFlags,
86    tracks_filter: u64,
87}
88
89impl FlvDemuxer {
90    pub fn new(flags_filter: FrameFlags, tracks_filter: u64) -> Self {
91        Self {
92            flags_filter,
93            tracks_filter,
94        }
95    }
96}
97
98impl Default for FlvDemuxer {
99    fn default() -> Self {
100        Self {
101            flags_filter: FrameFlags::VIDEO_STREAM,
102            tracks_filter: !0,
103        }
104    }
105}
106
107impl<F: AsRef<[u8]> + Send + 'static, E: std::error::Error + Send + Sync + 'static>
108    Service<Result<F, E>> for FlvDemuxer
109{
110    type Out = Result<FlvFrame, Error<E>>;
111
112    fn handle(
113        self,
114        input: impl Stream<Item = Result<F, E>> + Send,
115    ) -> impl Stream<Item = Self::Out> + Send {
116        let mut tag_type_filter = 0u64;
117
118        if self.flags_filter.contains(FrameFlags::AUDIO_STREAM) {
119            tag_type_filter |= 0b1 << u8::from(FlvTagType::Audio);
120        }
121
122        if self.flags_filter.contains(FrameFlags::VIDEO_STREAM) {
123            tag_type_filter |= 0b1 << u8::from(FlvTagType::Video);
124        }
125
126        if self.flags_filter.contains(FrameFlags::METADATA_STREAM) {
127            tag_type_filter |= 0b1 << u8::from(FlvTagType::Metadata);
128        }
129
130        let reader = StreamReader::new(input.map_ok(std::io::Cursor::new).map_err(Error::Other));
131
132        demux_flv_stream_inner(reader, tag_type_filter).try_filter_map(move |tag| async move {
133            Ok(match tag.data {
134                tag::FlvTagData::Video(vtag) => {
135                    if vtag.track_id < 64 && (self.tracks_filter << vtag.track_id) == 0 {
136                        None
137                    } else {
138                        Some(FlvFrame {
139                            dts: tag.header.timestamp as u64 * 1000,
140                            track_id: vtag.track_id as _,
141                            flags: FrameFlags::empty(),
142                            pts_offset: vtag.body.pts_offset * 1000,
143                            codec: vtag.header.fourcc,
144                            params_count: vtag.body.param_count,
145                            payload: vtag.body.nalus,
146                        })
147                    }
148                }
149                tag::FlvTagData::Audio(_) => None,
150                tag::FlvTagData::Meta(_) => None,
151                tag::FlvTagData::Unknown => None,
152            })
153        })
154    }
155}
156
157fn demux_flv_stream_inner<E, R: AsyncRead>(
158    reader: R,
159    tag_types: u64,
160) -> impl Stream<Item = Result<FlvTag, error::Error<E>>> {
161    async_stream::stream! {
162        let mut buff = vec![0u8; 1024];
163        let mut reader = pin!(BufReader::new(reader));
164        let mut parser = FlvParser::default();
165
166        // reading flv header
167        reader.read_exact(&mut buff[0..9]).await?;
168        let header: FlvHeader = parser.parse(&mut &buff[0..9])?;
169
170        // skipping offset if present
171        if header.remaining > 0 {
172            if buff.len() < header.remaining as usize {
173                buff.resize(header.remaining as usize, 0);
174            }
175
176            reader.read_exact(&mut buff[0..header.remaining as usize]).await?;
177        }
178
179        while let Ok(_) = reader.read_u32().await {
180            match reader.read_exact(&mut buff[0..11]).await {
181                Ok(_) => (),
182                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
183                Err(err) => yield Err(err.into())
184            };
185
186            let header: FlvTagHeader = parser.parse(&mut &buff[0..11])?;
187            if buff.len() < header.data_size as usize {
188                buff.resize(header.data_size as usize, 0);
189            }
190
191            match reader.read_exact(&mut buff[0..header.data_size as usize]).await {
192                Ok(_) => (),
193                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
194                Err(err) => yield Err(err.into())
195            }
196
197            if tag_types & (1u64 << u8::from(header.tag_type)) > 0 {
198                let mut tag_data = Bytes::from(buff[0..header.data_size as usize].to_vec());
199
200                yield Ok(FlvTag {
201                    data: parser.parse_flv_data(&mut tag_data, header.tag_type)?,
202                    header,
203                });
204            }
205        }
206
207    }
208}