flowly_codec_openh264/
lib.rs1use std::collections::BinaryHeap;
2
3use bytes::Bytes;
4use flowly::{
5 DataFrame, EncodedFrame, Fourcc, Frame, FrameFlags, FrameSource, MemBlock, Service, VideoFrame,
6 spsc,
7};
8use futures::{Stream, executor::block_on};
9use openh264::{
10 decoder::{DecoderConfig, Flush},
11 formats::YUVSource,
12};
13
14pub use error::Error;
15
16mod error;
17
18#[derive(Debug, Clone)]
19pub struct DecodedFrame<S> {
20 pub timestamp: u64,
21 pub data: Vec<u8>,
22 pub width: u16,
23 pub height: u16,
24 pub flags: FrameFlags,
25 source: S,
26}
27
28impl<S: FrameSource> DataFrame for DecodedFrame<S> {
29 type Source = S;
30 type Chunk = Vec<u8>;
31
32 fn source(&self) -> &Self::Source {
33 &self.source
34 }
35
36 fn chunks(&self) -> impl Send + Iterator<Item = <Self::Chunk as MemBlock>::Ref<'_>> {
37 std::iter::once(self.data.as_slice())
38 }
39
40 fn into_chunks(self) -> impl Send + Iterator<Item = Self::Chunk> {
41 std::iter::once(self.data)
42 }
43}
44
45impl<S: FrameSource> Frame for DecodedFrame<S> {
46 fn timestamp(&self) -> u64 {
47 self.timestamp
48 }
49
50 fn codec(&self) -> Fourcc {
51 Fourcc::PIXEL_FORMAT_RGB888
52 }
53
54 fn flags(&self) -> FrameFlags {
55 self.flags
56 }
57}
58
59impl<S: FrameSource> VideoFrame for DecodedFrame<S> {
60 fn dimensions(&self) -> (u16, u16) {
61 (self.width, self.height)
62 }
63
64 fn bit_depth(&self) -> u8 {
65 8
66 }
67}
68
69pub struct Openh264Decoder<S> {
70 sender: spsc::Sender<(Bytes, u64, S)>,
71 receiver: spsc::Receiver<Result<DecodedFrame<S>, Error>>,
72 _handler: tokio::task::JoinHandle<Result<(), Error>>,
73}
74
75impl<S: Send + Default + 'static> Openh264Decoder<S> {
76 pub fn new(_num_threads: u32) -> Self {
77 let (sender, mut rx) = spsc::channel(8);
78 let (mut tx, receiver) = spsc::channel(8);
79
80 Self {
81 sender,
82 receiver,
83 _handler: tokio::task::spawn_blocking(move || {
84 let mut ts_heap: BinaryHeap<Entry<S>> = BinaryHeap::new();
85 let decode_config = DecoderConfig::new().flush_after_decode(Flush::NoFlush);
86
87 let mut decoder = openh264::decoder::Decoder::with_api_config(
88 openh264::OpenH264API::from_source(),
89 decode_config,
90 )?;
91
92 while let Some(frame) = block_on(rx.recv()) {
93 if let Some(ts) = ts_heap.peek() {
94 if ts.0 != frame.1 {
95 ts_heap.push(Entry(frame.1, frame.2));
96 }
97 } else {
98 ts_heap.push(Entry(frame.1, frame.2));
99 }
100
101 let res = decoder
102 .decode(&frame.0)
103 .map_err(Error::from)
104 .map(|frame| frame.map(|frame| Self::make_frame(ts_heap.pop(), frame)));
105
106 if let Some(res) = res.transpose() {
107 if block_on(tx.send(res)).is_err() {
108 break;
109 }
110 }
111 }
112
113 match decoder.flush_remaining() {
114 Ok(remaining) => {
115 for frame in remaining {
116 if block_on(tx.send(Ok(Self::make_frame(ts_heap.pop(), frame))))
117 .is_err()
118 {
119 break;
120 }
121 }
122 }
123 Err(err) => log::error!("openh264::Decoder::flush_remaining error: {err}"),
124 }
125
126 Ok(())
127 }),
128 }
129 }
130
131 #[inline]
132 pub async fn push_data(&mut self, data: Bytes, timestamp: u64, source: S) -> Result<(), Error> {
133 self.sender
134 .send((data, timestamp, source))
135 .await
136 .map_err(|_| Error::TrySendError)
137 }
138
139 #[inline]
140 pub fn pull_frame(&mut self) -> Result<Option<DecodedFrame<S>>, Error> {
141 self.receiver
142 .try_recv()
143 .map_err(|_| Error::TrySendError)?
144 .transpose()
145 }
146
147 #[inline]
148 pub fn close(&mut self) {
149 self.sender.close();
150 }
151
152 #[allow(clippy::uninit_vec)]
153 fn make_frame(
154 in_frame: Option<Entry<S>>,
155 frame: openh264::decoder::DecodedYUV<'_>,
156 ) -> DecodedFrame<S> {
157 let dims = frame.dimensions();
158 let mut data = Vec::with_capacity(dims.0 * dims.1 * 3);
159 unsafe { data.set_len(dims.0 * dims.1 * 3) };
160
161 frame.write_rgb8(&mut data);
162
163 DecodedFrame {
164 timestamp: in_frame.as_ref().map(|x| x.0).unwrap_or_default(),
165 data,
166 width: dims.0 as _,
167 height: dims.1 as _,
168 source: in_frame.map(|x| x.1).unwrap_or_default(),
169 flags: FrameFlags::VIDEO_STREAM,
170 }
171 }
172}
173
174impl<S: Send + Default + 'static> Default for Openh264Decoder<S> {
175 fn default() -> Self {
176 Self::new(0)
177 }
178}
179
180impl<F: EncodedFrame + 'static> Service<F> for Openh264Decoder<F::Source> {
181 type Out = Result<DecodedFrame<F::Source>, Error>;
182
183 fn handle(&mut self, frame: F, _cx: &flowly::Context) -> impl Stream<Item = Self::Out> {
184 async_stream::stream! {
185 let ts = frame.timestamp();
186 let source = frame.source().clone();
187
188 for chunk in frame.into_chunks() {
189 if let Err(err) = self.push_data(chunk.into_cpu_bytes(), ts, source.clone()).await {
190 yield Err(err);
191 }
192 }
193
194 while let Some(res) = self.pull_frame().transpose() {
195 yield res;
196 }
197 }
198 }
199}
200
/// Heap record: a timestamp (`.0`) paired with the source it arrived with (`.1`).
///
/// Equality and ordering look at the timestamp only, and the ordering is reversed
/// so that `std::collections::BinaryHeap` pops the smallest timestamp first.
struct Entry<S>(u64, S);

impl<S> std::ops::Deref for Entry<S> {
    type Target = S;

    /// Gives access to the wrapped source.
    fn deref(&self) -> &Self::Target {
        let Entry(_, source) = self;
        source
    }
}

impl<S> PartialEq for Entry<S> {
    /// Two entries are equal when their timestamps are equal; the source is ignored.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.0, other.0);
        lhs == rhs
    }
}

impl<S> Eq for Entry<S> {}

impl<S> PartialOrd for Entry<S> {
    /// Delegates to [`Ord::cmp`], so the comparison is always defined.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<S> Ord for Entry<S> {
    /// Reversed timestamp comparison: a smaller timestamp ranks as greater.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.cmp(&other.0).reverse()
    }
}