flowly_codec_openh264/
lib.rs

1use std::collections::BinaryHeap;
2
3use bytes::Bytes;
4use flowly::{
5    DataFrame, EncodedFrame, Fourcc, Frame, FrameFlags, FrameSource, MemBlock, Service, VideoFrame,
6    spsc,
7};
8use futures::{Stream, executor::block_on};
9use openh264::{
10    decoder::{DecoderConfig, Flush},
11    formats::YUVSource,
12};
13
14pub use error::Error;
15
16mod error;
17
18#[derive(Debug, Clone)]
19pub struct DecodedFrame<S> {
20    pub timestamp: u64,
21    pub data: Vec<u8>,
22    pub width: u16,
23    pub height: u16,
24    pub flags: FrameFlags,
25    source: S,
26}
27
28impl<S: FrameSource> DataFrame for DecodedFrame<S> {
29    type Source = S;
30    type Chunk = Vec<u8>;
31
32    fn source(&self) -> &Self::Source {
33        &self.source
34    }
35
36    fn chunks(&self) -> impl Send + Iterator<Item = <Self::Chunk as MemBlock>::Ref<'_>> {
37        std::iter::once(self.data.as_slice())
38    }
39
40    fn into_chunks(self) -> impl Send + Iterator<Item = Self::Chunk> {
41        std::iter::once(self.data)
42    }
43}
44
45impl<S: FrameSource> Frame for DecodedFrame<S> {
46    fn timestamp(&self) -> u64 {
47        self.timestamp
48    }
49
50    fn codec(&self) -> Fourcc {
51        Fourcc::PIXEL_FORMAT_RGB888
52    }
53
54    fn flags(&self) -> FrameFlags {
55        self.flags
56    }
57}
58
59impl<S: FrameSource> VideoFrame for DecodedFrame<S> {
60    fn dimensions(&self) -> (u16, u16) {
61        (self.width, self.height)
62    }
63
64    fn bit_depth(&self) -> u8 {
65        8
66    }
67}
68
69pub struct Openh264Decoder<S> {
70    sender: spsc::Sender<(Bytes, u64, S)>,
71    receiver: spsc::Receiver<Result<DecodedFrame<S>, Error>>,
72    _handler: tokio::task::JoinHandle<Result<(), Error>>,
73}
74
75impl<S: Send + Default + 'static> Openh264Decoder<S> {
76    pub fn new(_num_threads: u32) -> Self {
77        let (sender, mut rx) = spsc::channel(8);
78        let (mut tx, receiver) = spsc::channel(8);
79
80        Self {
81            sender,
82            receiver,
83            _handler: tokio::task::spawn_blocking(move || {
84                let mut ts_heap: BinaryHeap<Entry<S>> = BinaryHeap::new();
85                let decode_config = DecoderConfig::new().flush_after_decode(Flush::NoFlush);
86
87                let mut decoder = openh264::decoder::Decoder::with_api_config(
88                    openh264::OpenH264API::from_source(),
89                    decode_config,
90                )?;
91
92                while let Some(frame) = block_on(rx.recv()) {
93                    if let Some(ts) = ts_heap.peek() {
94                        if ts.0 != frame.1 {
95                            ts_heap.push(Entry(frame.1, frame.2));
96                        }
97                    } else {
98                        ts_heap.push(Entry(frame.1, frame.2));
99                    }
100
101                    let res = decoder
102                        .decode(&frame.0)
103                        .map_err(Error::from)
104                        .map(|frame| frame.map(|frame| Self::make_frame(ts_heap.pop(), frame)));
105
106                    if let Some(res) = res.transpose() {
107                        if block_on(tx.send(res)).is_err() {
108                            break;
109                        }
110                    }
111                }
112
113                match decoder.flush_remaining() {
114                    Ok(remaining) => {
115                        for frame in remaining {
116                            if block_on(tx.send(Ok(Self::make_frame(ts_heap.pop(), frame))))
117                                .is_err()
118                            {
119                                break;
120                            }
121                        }
122                    }
123                    Err(err) => log::error!("openh264::Decoder::flush_remaining error: {err}"),
124                }
125
126                Ok(())
127            }),
128        }
129    }
130
131    #[inline]
132    pub async fn push_data(&mut self, data: Bytes, timestamp: u64, source: S) -> Result<(), Error> {
133        self.sender
134            .send((data, timestamp, source))
135            .await
136            .map_err(|_| Error::TrySendError)
137    }
138
139    #[inline]
140    pub fn pull_frame(&mut self) -> Result<Option<DecodedFrame<S>>, Error> {
141        self.receiver
142            .try_recv()
143            .map_err(|_| Error::TrySendError)?
144            .transpose()
145    }
146
147    #[inline]
148    pub fn close(&mut self) {
149        self.sender.close();
150    }
151
152    #[allow(clippy::uninit_vec)]
153    fn make_frame(
154        in_frame: Option<Entry<S>>,
155        frame: openh264::decoder::DecodedYUV<'_>,
156    ) -> DecodedFrame<S> {
157        let dims = frame.dimensions();
158        let mut data = Vec::with_capacity(dims.0 * dims.1 * 3);
159        unsafe { data.set_len(dims.0 * dims.1 * 3) };
160
161        frame.write_rgb8(&mut data);
162
163        DecodedFrame {
164            timestamp: in_frame.as_ref().map(|x| x.0).unwrap_or_default(),
165            data,
166            width: dims.0 as _,
167            height: dims.1 as _,
168            source: in_frame.map(|x| x.1).unwrap_or_default(),
169            flags: FrameFlags::VIDEO_STREAM,
170        }
171    }
172}
173
174impl<S: Send + Default + 'static> Default for Openh264Decoder<S> {
175    fn default() -> Self {
176        Self::new(0)
177    }
178}
179
180impl<F: EncodedFrame + 'static> Service<F> for Openh264Decoder<F::Source> {
181    type Out = Result<DecodedFrame<F::Source>, Error>;
182
183    fn handle(&mut self, frame: F, _cx: &flowly::Context) -> impl Stream<Item = Self::Out> {
184        async_stream::stream! {
185            let ts = frame.timestamp();
186            let source = frame.source().clone();
187
188            for chunk in frame.into_chunks() {
189                if let Err(err) = self.push_data(chunk.into_cpu_bytes(), ts, source.clone()).await {
190                    yield Err(err);
191                }
192            }
193
194            while let Some(res) = self.pull_frame().transpose() {
195                yield res;
196            }
197        }
198    }
199}
200
201struct Entry<S>(u64, S);
202
203impl<S> std::ops::Deref for Entry<S> {
204    type Target = S;
205
206    fn deref(&self) -> &Self::Target {
207        &self.1
208    }
209}
210
211impl<S> PartialEq for Entry<S> {
212    fn eq(&self, other: &Self) -> bool {
213        self.0 == other.0
214    }
215}
216
217impl<S> Eq for Entry<S> {}
218
219impl<S> PartialOrd for Entry<S> {
220    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
221        Some(Self::cmp(self, other))
222    }
223}
224
225impl<S> Ord for Entry<S> {
226    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
227        other.0.cmp(&self.0)
228    }
229}