1use std::pin::pin;
2
3use bytes::Bytes;
4use error::Error;
5use flowly::{Fourcc, Frame, FrameFlags, Service};
6use futures::{Stream, TryStreamExt};
7use header::FlvHeader;
8use parser::{FlvParser, Parser};
9use tag::{FlvTag, FlvTagHeader, FlvTagType};
10use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
11use tokio_util::io::StreamReader;
12
13pub mod error;
14pub mod header;
15pub mod parser;
16pub mod reader;
17pub mod tag;
18
19pub const DEMUX_ALL_TYPES: u64 = -1i64 as u64;
20
21#[cfg(feature = "serde")]
22pub mod serde;
23
24#[derive(Debug, Clone)]
25pub struct FlvFrame {
26 dts: u64,
27 track_id: u32,
28 flags: flowly::FrameFlags,
29 pts_offset: i32,
30 codec: Fourcc,
31 params_count: u32,
32 payload: Vec<Bytes>,
33}
34
35impl Frame for FlvFrame {
36 fn pts(&self) -> i64 {
37 self.dts as i64 + self.pts_offset as i64
38 }
39
40 fn dts(&self) -> u64 {
41 self.dts
42 }
43
44 fn timestamp(&self) -> Option<u64> {
45 None
46 }
47
48 fn codec(&self) -> flowly::Fourcc {
49 self.codec
50 }
51
52 fn flags(&self) -> flowly::FrameFlags {
53 self.flags
54 }
55
56 fn track(&self) -> u32 {
57 self.track_id
58 }
59
60 fn params(&self) -> impl Iterator<Item = &[u8]> {
61 self.payload
62 .iter()
63 .map(|x| x.as_ref())
64 .take(self.params_count as usize)
65 }
66
67 fn units(&self) -> impl Iterator<Item = &[u8]> {
68 self.payload
69 .iter()
70 .map(|x| x.as_ref())
71 .skip(self.params_count as usize)
72 }
73}
74
75#[inline]
76pub fn demux_flv_stream<R: AsyncRead>(
77 reader: R,
78 tag_types: u64,
79) -> impl Stream<Item = Result<FlvTag, error::Error>> {
80 demux_flv_stream_inner(reader, tag_types)
81}
82
83#[derive(Debug)]
84pub struct FlvDemuxer {
85 flags_filter: FrameFlags,
86 tracks_filter: u64,
87}
88
89impl FlvDemuxer {
90 pub fn new(flags_filter: FrameFlags, tracks_filter: u64) -> Self {
91 Self {
92 flags_filter,
93 tracks_filter,
94 }
95 }
96}
97
98impl Default for FlvDemuxer {
99 fn default() -> Self {
100 Self {
101 flags_filter: FrameFlags::VIDEO_STREAM,
102 tracks_filter: !0,
103 }
104 }
105}
106
107impl<F: AsRef<[u8]> + Send + 'static, E: std::error::Error + Send + Sync + 'static>
108 Service<Result<F, E>> for FlvDemuxer
109{
110 type Out = Result<FlvFrame, Error<E>>;
111
112 fn handle(
113 self,
114 input: impl Stream<Item = Result<F, E>> + Send,
115 ) -> impl Stream<Item = Self::Out> + Send {
116 let mut tag_type_filter = 0u64;
117
118 if self.flags_filter.contains(FrameFlags::AUDIO_STREAM) {
119 tag_type_filter |= 0b1 << u8::from(FlvTagType::Audio);
120 }
121
122 if self.flags_filter.contains(FrameFlags::VIDEO_STREAM) {
123 tag_type_filter |= 0b1 << u8::from(FlvTagType::Video);
124 }
125
126 if self.flags_filter.contains(FrameFlags::METADATA_STREAM) {
127 tag_type_filter |= 0b1 << u8::from(FlvTagType::Metadata);
128 }
129
130 let reader = StreamReader::new(input.map_ok(std::io::Cursor::new).map_err(Error::Other));
131
132 demux_flv_stream_inner(reader, tag_type_filter).try_filter_map(move |tag| async move {
133 Ok(match tag.data {
134 tag::FlvTagData::Video(vtag) => {
135 if vtag.track_id < 64 && (self.tracks_filter << vtag.track_id) == 0 {
136 None
137 } else {
138 Some(FlvFrame {
139 dts: tag.header.timestamp as u64 * 1000,
140 track_id: vtag.track_id as _,
141 flags: FrameFlags::empty(),
142 pts_offset: vtag.body.pts_offset * 1000,
143 codec: vtag.header.fourcc,
144 params_count: vtag.body.param_count,
145 payload: vtag.body.nalus,
146 })
147 }
148 }
149 tag::FlvTagData::Audio(_) => None,
150 tag::FlvTagData::Meta(_) => None,
151 tag::FlvTagData::Unknown => None,
152 })
153 })
154 }
155}
156
157fn demux_flv_stream_inner<E, R: AsyncRead>(
158 reader: R,
159 tag_types: u64,
160) -> impl Stream<Item = Result<FlvTag, error::Error<E>>> {
161 async_stream::stream! {
162 let mut buff = vec![0u8; 1024];
163 let mut reader = pin!(BufReader::new(reader));
164 let mut parser = FlvParser::default();
165
166 reader.read_exact(&mut buff[0..9]).await?;
168 let header: FlvHeader = parser.parse(&mut &buff[0..9])?;
169
170 if header.remaining > 0 {
172 if buff.len() < header.remaining as usize {
173 buff.resize(header.remaining as usize, 0);
174 }
175
176 reader.read_exact(&mut buff[0..header.remaining as usize]).await?;
177 }
178
179 while let Ok(_) = reader.read_u32().await {
180 match reader.read_exact(&mut buff[0..11]).await {
181 Ok(_) => (),
182 Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
183 Err(err) => yield Err(err.into())
184 };
185
186 let header: FlvTagHeader = parser.parse(&mut &buff[0..11])?;
187 if buff.len() < header.data_size as usize {
188 buff.resize(header.data_size as usize, 0);
189 }
190
191 match reader.read_exact(&mut buff[0..header.data_size as usize]).await {
192 Ok(_) => (),
193 Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
194 Err(err) => yield Err(err.into())
195 }
196
197 if tag_types & (1u64 << u8::from(header.tag_type)) > 0 {
198 let mut tag_data = Bytes::from(buff[0..header.data_size as usize].to_vec());
199
200 yield Ok(FlvTag {
201 data: parser.parse_flv_data(&mut tag_data, header.tag_type)?,
202 header,
203 });
204 }
205 }
206
207 }
208}