flowly_codec_openh264/
lib.rs

1use std::collections::BinaryHeap;
2
3use bytes::Bytes;
4use flowly::{
5    DataFrame, EncodedFrame, Fourcc, Frame, FrameFlags, FrameSource, MemBlock, Service, VideoFrame,
6    spsc,
7};
8use futures::{Stream, executor::block_on};
9use openh264::{
10    decoder::{DecoderConfig, Flush},
11    formats::YUVSource,
12};
13
14pub use error::Error;
15
16mod error;
17
18#[derive(Debug, Clone)]
19pub struct DecodedFrame<S> {
20    pub timestamp: u64,
21    pub data: Bytes,
22    pub width: u16,
23    pub height: u16,
24    pub flags: FrameFlags,
25    source: S,
26}
27
28impl<S: FrameSource> DataFrame for DecodedFrame<S> {
29    type Source = S;
30    type Chunk = Bytes;
31
32    fn source(&self) -> &Self::Source {
33        &self.source
34    }
35
36    fn chunks(&self) -> impl Send + Iterator<Item = <Self::Chunk as MemBlock>::Ref<'_>> {
37        std::iter::once(&self.data)
38    }
39
40    fn into_chunks(self) -> impl Send + Iterator<Item = Self::Chunk> {
41        std::iter::once(self.data)
42    }
43}
44
45impl<S: FrameSource> Frame for DecodedFrame<S> {
46    fn timestamp(&self) -> u64 {
47        self.timestamp
48    }
49
50    fn codec(&self) -> Fourcc {
51        Fourcc::PIXEL_FORMAT_RGB888
52    }
53
54    fn flags(&self) -> FrameFlags {
55        self.flags
56    }
57}
58
59impl<S: FrameSource> VideoFrame for DecodedFrame<S> {
60    fn dimensions(&self) -> (u16, u16) {
61        (self.width, self.height)
62    }
63
64    fn bit_depth(&self) -> u8 {
65        8
66    }
67}
68
69pub struct Openh264Decoder<S> {
70    sender: spsc::Sender<(Bytes, u64, S)>,
71    receiver: spsc::Receiver<Result<DecodedFrame<S>, Error>>,
72    _handler: tokio::task::JoinHandle<Result<(), Error>>,
73}
74
75impl<S: Send + Default + 'static> Openh264Decoder<S> {
76    pub fn new(_num_threads: u32) -> Self {
77        let (sender, mut rx) = spsc::channel(2);
78        let (mut tx, receiver) = spsc::channel(2);
79
80        Self {
81            sender,
82            receiver,
83            _handler: tokio::task::spawn_blocking(move || {
84                let mut ts_heap: BinaryHeap<Entry<S>> = BinaryHeap::new();
85                let decode_config = DecoderConfig::new().flush_after_decode(Flush::NoFlush);
86
87                let mut decoder = openh264::decoder::Decoder::with_api_config(
88                    openh264::OpenH264API::from_source(),
89                    decode_config,
90                )?;
91
92                while let Some(frame) = block_on(rx.recv()) {
93                    if let Some(ts) = ts_heap.peek() {
94                        if ts.0 != frame.1 {
95                            ts_heap.push(Entry(frame.1, frame.2));
96                        }
97                    } else {
98                        ts_heap.push(Entry(frame.1, frame.2));
99                    }
100
101                    let res = decoder
102                        .decode(&frame.0)
103                        .map_err(Error::from)
104                        .map(|frame| frame.map(|frame| Self::make_frame(ts_heap.pop(), frame)));
105
106                    if let Some(res) = res.transpose() {
107                        if block_on(tx.send(res)).is_err() {
108                            break;
109                        }
110                    }
111                }
112
113                match decoder.flush_remaining() {
114                    Ok(remaining) => {
115                        for frame in remaining {
116                            if block_on(tx.send(Ok(Self::make_frame(ts_heap.pop(), frame))))
117                                .is_err()
118                            {
119                                break;
120                            }
121                        }
122                    }
123                    Err(err) => log::error!("openh264::Decoder::flush_remaining error: {err}"),
124                }
125
126                Ok(())
127            }),
128        }
129    }
130
131    #[inline]
132    pub async fn push_data(&mut self, data: Bytes, timestamp: u64, source: S) -> Result<(), Error> {
133        self.sender
134            .send((data, timestamp, source))
135            .await
136            .map_err(|_| Error::TrySendError)
137    }
138
139    #[inline]
140    pub fn pull_frame(&mut self) -> Result<Option<DecodedFrame<S>>, Error> {
141        self.receiver
142            .try_recv()
143            .map_err(|_| Error::TrySendError)?
144            .transpose()
145    }
146
147    #[allow(clippy::uninit_vec)]
148    fn make_frame(
149        in_frame: Option<Entry<S>>,
150        frame: openh264::decoder::DecodedYUV<'_>,
151    ) -> DecodedFrame<S> {
152        let dims = frame.dimensions();
153        let mut data = Vec::with_capacity(dims.0 * dims.1 * 3);
154        unsafe { data.set_len(dims.0 * dims.1 * 3) };
155
156        frame.write_rgb8(&mut data);
157
158        DecodedFrame {
159            timestamp: in_frame.as_ref().map(|x| x.0).unwrap_or_default(),
160            data: data.into(),
161            width: dims.0 as _,
162            height: dims.1 as _,
163            source: in_frame.map(|x| x.1).unwrap_or_default(),
164            flags: FrameFlags::VIDEO_STREAM,
165        }
166    }
167}
168
169impl<S: Send + Default + 'static> Default for Openh264Decoder<S> {
170    fn default() -> Self {
171        Self::new(0)
172    }
173}
174
175impl<F: EncodedFrame + 'static> Service<F> for Openh264Decoder<F::Source> {
176    type Out = Result<DecodedFrame<F::Source>, Error>;
177
178    fn handle(&mut self, frame: F, _cx: &flowly::Context) -> impl Stream<Item = Self::Out> {
179        async_stream::stream! {
180            let ts = frame.timestamp();
181            let source = frame.source().clone();
182
183            for chunk in frame.into_chunks() {
184                if let Err(err) = self.push_data(chunk.into_cpu_bytes(), ts, source.clone()).await {
185                    yield Err(err);
186                }
187            }
188
189            while let Some(res) = self.pull_frame().transpose() {
190                yield res;
191            }
192        }
193    }
194}
195
/// Heap entry pairing an input timestamp (`.0`) with its frame source (`.1`).
///
/// Equality and ordering look at the timestamp only. The ordering is reversed
/// so that `std::collections::BinaryHeap`, a max-heap, yields the entry with
/// the smallest timestamp first.
struct Entry<S>(u64, S);

/// Gives direct access to the wrapped source value.
impl<S> std::ops::Deref for Entry<S> {
    type Target = S;

    fn deref(&self) -> &S {
        let Entry(_, payload) = self;
        payload
    }
}

/// Two entries are equal when their timestamps are equal.
impl<S> PartialEq for Entry<S> {
    fn eq(&self, rhs: &Self) -> bool {
        let (Entry(lhs_ts, _), Entry(rhs_ts, _)) = (self, rhs);
        lhs_ts == rhs_ts
    }
}

impl<S> Eq for Entry<S> {}

/// Delegates to the total order defined by `Ord`.
impl<S> PartialOrd for Entry<S> {
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(rhs))
    }
}

/// Reversed timestamp order: a smaller timestamp compares as greater.
impl<S> Ord for Entry<S> {
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        self.0.cmp(&rhs.0).reverse()
    }
}