Skip to main content

video_rs/
decode.rs

1extern crate ffmpeg_next as ffmpeg;
2
3use ffmpeg::codec::decoder::Video as AvDecoder;
4use ffmpeg::codec::Context as AvContext;
5use ffmpeg::format::pixel::Pixel as AvPixel;
6use ffmpeg::software::scaling::{context::Context as AvScaler, flag::Flags as AvScalerFlags};
7use ffmpeg::util::error::EAGAIN;
8use ffmpeg::{Error as AvError, Rational as AvRational};
9
10use crate::error::Error;
11use crate::ffi;
12use crate::ffi_hwaccel;
13#[cfg(feature = "ndarray")]
14use crate::frame::Frame;
15use crate::frame::{RawFrame, FRAME_PIXEL_FORMAT};
16use crate::hwaccel::{HardwareAccelerationContext, HardwareAccelerationDeviceType};
17use crate::io::{Reader, ReaderBuilder};
18use crate::location::Location;
19use crate::options::Options;
20use crate::packet::Packet;
21use crate::resize::Resize;
22use crate::time::Time;
23
24type Result<T> = std::result::Result<T, Error>;
25
26/// Always use NV12 pixel format with hardware acceleration, then rescale later.
27static HWACCEL_PIXEL_FORMAT: AvPixel = AvPixel::NV12;
28
/// Builds a [`Decoder`].
///
/// Collects the source to decode together with the optional settings (input options, resize
/// strategy, hardware acceleration device type) and produces the decoder in
/// [`DecoderBuilder::build`].
pub struct DecoderBuilder<'a> {
    /// Source to decode.
    source: Location,
    /// Custom options that are applied to the input, if any.
    options: Option<&'a Options>,
    /// Resizing to apply to decoded frames, if any.
    resize: Option<Resize>,
    /// Device type to use for hardware acceleration, if any.
    hardware_acceleration_device_type: Option<HardwareAccelerationDeviceType>,
}
36
37impl<'a> DecoderBuilder<'a> {
38    /// Create a decoder with the specified source.
39    ///
40    /// * `source` - Source to decode.
41    pub fn new(source: impl Into<Location>) -> Self {
42        Self {
43            source: source.into(),
44            options: None,
45            resize: None,
46            hardware_acceleration_device_type: None,
47        }
48    }
49
50    /// Set custom options. Options are applied to the input.
51    ///
52    /// * `options` - Custom options.
53    pub fn with_options(mut self, options: &'a Options) -> Self {
54        self.options = Some(options);
55        self
56    }
57
58    /// Set resizing to apply to frames.
59    ///
60    /// * `resize` - Resizing to apply.
61    pub fn with_resize(mut self, resize: Resize) -> Self {
62        self.resize = Some(resize);
63        self
64    }
65
66    /// Enable hardware acceleration with the specified device type.
67    ///
68    /// * `device_type` - Device to use for hardware acceleration.
69    pub fn with_hardware_acceleration(
70        mut self,
71        device_type: HardwareAccelerationDeviceType,
72    ) -> Self {
73        self.hardware_acceleration_device_type = Some(device_type);
74        self
75    }
76
77    /// Build [`Decoder`].
78    pub fn build(self) -> Result<Decoder> {
79        let mut reader_builder = ReaderBuilder::new(self.source);
80        if let Some(options) = self.options {
81            reader_builder = reader_builder.with_options(options);
82        }
83        let reader = reader_builder.build()?;
84        let reader_stream_index = reader.best_video_stream_index()?;
85        Ok(Decoder {
86            decoder: DecoderSplit::new(
87                &reader,
88                reader_stream_index,
89                self.resize,
90                self.hardware_acceleration_device_type,
91            )?,
92            reader,
93            reader_stream_index,
94            draining: false,
95        })
96    }
97}
98
/// Decode video files and streams.
///
/// Combines a [`Reader`], which supplies packets of one video stream, with a [`DecoderSplit`],
/// which turns those packets into frames.
///
/// # Example
///
/// ```ignore
/// let mut decoder = Decoder::new(Path::new("video.mp4")).unwrap();
/// decoder
///     .decode_iter()
///     .take_while(Result::is_ok)
///     .for_each(|frame| println!("Got frame!"));
/// ```
pub struct Decoder {
    /// Decoding half: receives packets and produces frames.
    decoder: DecoderSplit,
    /// Reading half: supplies packets from the source.
    reader: Reader,
    /// Index of the stream in `reader` that is being decoded.
    reader_stream_index: usize,
    /// Set once the reader reported exhaustion; from then on frames are drained from the
    /// decoder instead of reading new packets.
    draining: bool,
}
117
118impl Decoder {
119    /// Create a decoder to decode the specified source.
120    ///
121    /// # Arguments
122    ///
123    /// * `source` - Source to decode.
124    #[inline]
125    pub fn new(source: impl Into<Location>) -> Result<Self> {
126        DecoderBuilder::new(source).build()
127    }
128
129    /// Get decoder time base.
130    #[inline]
131    pub fn time_base(&self) -> AvRational {
132        self.decoder.time_base()
133    }
134
135    /// Duration of the decoder stream.
136    #[inline]
137    pub fn duration(&self) -> Result<Time> {
138        let reader_stream = self
139            .reader
140            .input
141            .stream(self.reader_stream_index)
142            .ok_or(AvError::StreamNotFound)?;
143        Ok(Time::new(
144            Some(reader_stream.duration()),
145            reader_stream.time_base(),
146        ))
147    }
148
149    /// Number of frames in the decoder stream.
150    #[inline]
151    pub fn frames(&self) -> Result<u64> {
152        Ok(self
153            .reader
154            .input
155            .stream(self.reader_stream_index)
156            .ok_or(AvError::StreamNotFound)?
157            .frames()
158            .max(0) as u64)
159    }
160
161    /// Decode frames through iterator interface. This is similar to `decode` but it returns frames
162    /// through an infinite iterator.
163    ///
164    /// # Example
165    ///
166    /// ```ignore
167    /// decoder
168    ///     .decode_iter()
169    ///     .take_while(Result::is_ok)
170    ///     .map(Result::unwrap)
171    ///     .for_each(|(ts, frame)| {
172    ///         // Do something with frame...
173    ///     });
174    /// ```
175    #[cfg(feature = "ndarray")]
176    pub fn decode_iter(&mut self) -> impl Iterator<Item = Result<(Time, Frame)>> + '_ {
177        std::iter::from_fn(move || Some(self.decode()))
178    }
179
180    /// Decode a single frame.
181    ///
182    /// # Return value
183    ///
184    /// A tuple of the frame timestamp (relative to the stream) and the frame itself.
185    ///
186    /// # Example
187    ///
188    /// ```ignore
189    /// loop {
190    ///     let (ts, frame) = decoder.decode()?;
191    ///     // Do something with frame...
192    /// }
193    /// ```
194    #[cfg(feature = "ndarray")]
195    pub fn decode(&mut self) -> Result<(Time, Frame)> {
196        Ok(loop {
197            if !self.draining {
198                let packet_result = self.reader.read(self.reader_stream_index);
199                if matches!(packet_result, Err(Error::ReadExhausted)) {
200                    self.draining = true;
201                    continue;
202                }
203                let packet = packet_result?;
204                if let Some(frame) = self.decoder.decode(packet)? {
205                    break frame;
206                }
207            } else {
208                match self.decoder.drain() {
209                    Ok(Some(frame)) => break frame,
210                    Ok(None) | Err(Error::ReadExhausted) => {
211                        self.decoder.reset();
212                        self.draining = false;
213                        return Err(Error::DecodeExhausted);
214                    }
215                    Err(err) => return Err(err),
216                }
217            }
218        })
219    }
220
221    /// Decode frames through iterator interface. This is similar to `decode_raw` but it returns
222    /// frames through an infinite iterator.
223    pub fn decode_raw_iter(&mut self) -> impl Iterator<Item = Result<RawFrame>> + '_ {
224        std::iter::from_fn(move || Some(self.decode_raw()))
225    }
226
227    /// Decode a single frame and return the raw ffmpeg `AvFrame`.
228    ///
229    /// # Return value
230    ///
231    /// The decoded raw frame as [`RawFrame`].
232    pub fn decode_raw(&mut self) -> Result<RawFrame> {
233        Ok(loop {
234            if !self.draining {
235                let packet_result = self.reader.read(self.reader_stream_index);
236                if matches!(packet_result, Err(Error::ReadExhausted)) {
237                    self.draining = true;
238                    continue;
239                }
240                let packet = packet_result?;
241                if let Some(frame) = self.decoder.decode_raw(packet)? {
242                    break frame;
243                }
244            } else if let Some(frame) = self.decoder.drain_raw()? {
245                break frame;
246            } else {
247                match self.decoder.drain_raw() {
248                    Ok(Some(frame)) => break frame,
249                    Ok(None) | Err(Error::ReadExhausted) => {
250                        self.decoder.reset();
251                        self.draining = false;
252                        return Err(Error::DecodeExhausted);
253                    }
254                    Err(err) => return Err(err),
255                }
256            }
257        })
258    }
259
260    /// Seek in reader.
261    ///
262    /// See [`Reader::seek`](crate::io::Reader::seek) for more information.
263    #[inline]
264    pub fn seek(&mut self, timestamp_milliseconds: i64) -> Result<()> {
265        self.reader
266            .seek(timestamp_milliseconds)
267            .inspect(|_| self.decoder.decoder.flush())
268    }
269
270    /// Seek to specific frame in reader.
271    ///
272    /// See [`Reader::seek_to_frame`](crate::io::Reader::seek_to_frame) for more information.
273    #[inline]
274    pub fn seek_to_frame(&mut self, frame_number: i64) -> Result<()> {
275        self.reader
276            .seek_to_frame(frame_number)
277            .inspect(|_| self.decoder.decoder.flush())
278    }
279
280    /// Seek to start of reader.
281    ///
282    /// See [`Reader::seek_to_start`](crate::io::Reader::seek_to_start) for more information.
283    #[inline]
284    pub fn seek_to_start(&mut self) -> Result<()> {
285        self.reader
286            .seek_to_start()
287            .inspect(|_| self.decoder.decoder.flush())
288    }
289
290    /// Split the decoder into a decoder (of type [`DecoderSplit`]) and a [`Reader`].
291    ///
292    /// This allows the caller to detach stream reading from decoding, which is useful for advanced
293    /// use cases.
294    ///
295    /// # Return value
296    ///
297    /// Tuple of the [`DecoderSplit`], [`Reader`] and the reader stream index.
298    #[inline]
299    pub fn into_parts(self) -> (DecoderSplit, Reader, usize) {
300        (self.decoder, self.reader, self.reader_stream_index)
301    }
302
303    /// Get the decoders input size (resolution dimensions): width and height.
304    #[inline(always)]
305    pub fn size(&self) -> (u32, u32) {
306        self.decoder.size
307    }
308
309    /// Get the decoders output size after resizing is applied (resolution dimensions): width and
310    /// height.
311    #[inline(always)]
312    pub fn size_out(&self) -> (u32, u32) {
313        self.decoder.size_out
314    }
315
316    /// Get the decoders input frame rate as floating-point value.
317    pub fn frame_rate(&self) -> f32 {
318        let frame_rate = self
319            .reader
320            .input
321            .stream(self.reader_stream_index)
322            .map(|stream| stream.rate());
323
324        if let Some(frame_rate) = frame_rate {
325            if frame_rate.denominator() > 0 {
326                (frame_rate.numerator() as f32) / (frame_rate.denominator() as f32)
327            } else {
328                0.0
329            }
330        } else {
331            0.0
332        }
333    }
334}
335
/// Decoder part of a split [`Decoder`] and [`Reader`].
///
/// Important note: Do not forget to drain the decoder after the reader is exhausted. It may still
/// contain frames. Run `drain_raw()` or `drain()` in a loop until no more frames are produced.
pub struct DecoderSplit {
    /// Underlying ffmpeg video decoder.
    decoder: AvDecoder,
    /// Time base of the decoder; packet timestamps are rescaled to it before decoding.
    decoder_time_base: AvRational,
    /// Hardware acceleration context, present only if hardware acceleration was requested.
    hwaccel_context: Option<HardwareAccelerationContext>,
    /// Scaler that converts decoded frames to the output pixel format and size. `None` if the
    /// decoded frames already have the output format and size.
    scaler: Option<AvScaler>,
    /// Input size (width, height) as reported by the decoder.
    size: (u32, u32),
    /// Output size (width, height) after resizing is applied.
    size_out: (u32, u32),
    /// Set once end-of-stream was sent to the decoder; cleared again by `reset`.
    draining: bool,
}
349
350impl DecoderSplit {
351    /// Create a new [`DecoderSplit`].
352    ///
353    /// # Arguments
354    ///
355    /// * `reader` - [`Reader`] to initialize decoder from.
356    /// * `resize` - Optional resize strategy to apply to frames.
357    pub fn new(
358        reader: &Reader,
359        reader_stream_index: usize,
360        resize: Option<Resize>,
361        hwaccel_device_type: Option<HardwareAccelerationDeviceType>,
362    ) -> Result<Self> {
363        let reader_stream = reader
364            .input
365            .stream(reader_stream_index)
366            .ok_or(AvError::StreamNotFound)?;
367
368        let mut decoder = AvContext::new();
369        ffi::set_decoder_context_time_base(&mut decoder, reader_stream.time_base());
370        decoder.set_parameters(reader_stream.parameters())?;
371
372        let hwaccel_context = match hwaccel_device_type {
373            Some(device_type) => Some(HardwareAccelerationContext::new(&mut decoder, device_type)?),
374            None => None,
375        };
376
377        let decoder = decoder.decoder().video()?;
378        let decoder_time_base = decoder.time_base();
379
380        if decoder.format() == AvPixel::None || decoder.width() == 0 || decoder.height() == 0 {
381            return Err(Error::MissingCodecParameters);
382        }
383
384        let (resize_width, resize_height) = match resize {
385            Some(resize) => resize
386                .compute_for((decoder.width(), decoder.height()))
387                .ok_or(Error::InvalidResizeParameters)?,
388            None => (decoder.width(), decoder.height()),
389        };
390
391        let scaler_input_format = if hwaccel_context.is_some() {
392            HWACCEL_PIXEL_FORMAT
393        } else {
394            decoder.format()
395        };
396
397        let is_scaler_needed = !(scaler_input_format == FRAME_PIXEL_FORMAT
398            && decoder.width() == resize_width
399            && decoder.height() == resize_height);
400        let scaler = if is_scaler_needed {
401            Some(
402                AvScaler::get(
403                    scaler_input_format,
404                    decoder.width(),
405                    decoder.height(),
406                    FRAME_PIXEL_FORMAT,
407                    resize_width,
408                    resize_height,
409                    AvScalerFlags::AREA,
410                )
411                .map_err(Error::BackendError)?,
412            )
413        } else {
414            None
415        };
416
417        let size = (decoder.width(), decoder.height());
418        let size_out = (resize_width, resize_height);
419
420        Ok(Self {
421            decoder,
422            decoder_time_base,
423            hwaccel_context,
424            scaler,
425            size,
426            size_out,
427            draining: false,
428        })
429    }
430
431    /// Get decoder time base.
432    #[inline]
433    pub fn time_base(&self) -> AvRational {
434        self.decoder_time_base
435    }
436
437    /// Decode a [`Packet`].
438    ///
439    /// Feeds the packet to the decoder and returns a frame if there is one available. The caller
440    /// should keep feeding packets until the decoder returns a frame.
441    ///
442    /// # Panics
443    ///
444    /// Panics if in draining mode.
445    ///
446    /// # Return value
447    ///
448    /// A tuple of the [`Frame`] and timestamp (relative to the stream) and the frame itself if the
449    /// decoder has a frame available, [`None`] if not.
450    #[cfg(feature = "ndarray")]
451    pub fn decode(&mut self, packet: Packet) -> Result<Option<(Time, Frame)>> {
452        match self.decode_raw(packet)? {
453            Some(mut frame) => Ok(Some(self.raw_frame_to_time_and_frame(&mut frame)?)),
454            None => Ok(None),
455        }
456    }
457
458    /// Decode a [`Packet`].
459    ///
460    /// Feeds the packet to the decoder and returns a frame if there is one available. The caller
461    /// should keep feeding packets until the decoder returns a frame.
462    ///
463    /// # Panics
464    ///
465    /// Panics if in draining mode.
466    ///
467    /// # Return value
468    ///
469    /// The decoded raw frame as [`RawFrame`] if the decoder has a frame available, [`None`] if not.
470    pub fn decode_raw(&mut self, packet: Packet) -> Result<Option<RawFrame>> {
471        assert!(!self.draining);
472        self.send_packet_to_decoder(packet)?;
473        self.receive_frame_from_decoder()
474    }
475
476    /// Drain one frame from the decoder.
477    ///
478    /// After calling drain once the decoder is in draining mode and the caller may not use normal
479    /// decode anymore or it will panic.
480    ///
481    /// # Return value
482    ///
483    /// A tuple of the [`Frame`] and timestamp (relative to the stream) and the frame itself if the
484    /// decoder has a frame available, [`None`] if not.
485    #[cfg(feature = "ndarray")]
486    pub fn drain(&mut self) -> Result<Option<(Time, Frame)>> {
487        match self.drain_raw()? {
488            Some(mut frame) => Ok(Some(self.raw_frame_to_time_and_frame(&mut frame)?)),
489            None => Ok(None),
490        }
491    }
492
493    /// Drain one frame from the decoder.
494    ///
495    /// After calling drain once the decoder is in draining mode and the caller may not use normal
496    /// decode anymore or it will panic.
497    ///
498    /// # Return value
499    ///
500    /// The decoded raw frame as [`RawFrame`] if the decoder has a frame available, [`None`] if not.
501    pub fn drain_raw(&mut self) -> Result<Option<RawFrame>> {
502        if !self.draining {
503            self.decoder.send_eof().map_err(Error::BackendError)?;
504            self.draining = true;
505        }
506        self.receive_frame_from_decoder()
507    }
508
509    /// Reset the decoder to be used again after draining.
510    pub fn reset(&mut self) {
511        self.decoder.flush();
512        self.draining = false;
513    }
514
515    /// Get the decoders input size (resolution dimensions): width and height.
516    #[inline(always)]
517    pub fn size(&self) -> (u32, u32) {
518        self.size
519    }
520
521    /// Get the decoders output size after resizing is applied (resolution dimensions): width and
522    /// height.
523    #[inline(always)]
524    pub fn size_out(&self) -> (u32, u32) {
525        self.size_out
526    }
527
528    /// Send packet to decoder. Includes rescaling timestamps accordingly.
529    fn send_packet_to_decoder(&mut self, packet: Packet) -> Result<()> {
530        let (mut packet, packet_time_base) = packet.into_inner_parts();
531        packet.rescale_ts(packet_time_base, self.decoder_time_base);
532
533        self.decoder
534            .send_packet(&packet)
535            .map_err(Error::BackendError)?;
536
537        Ok(())
538    }
539
540    /// Receive packet from decoder. Will handle hwaccel conversions and scaling as well.
541    fn receive_frame_from_decoder(&mut self) -> Result<Option<RawFrame>> {
542        match self.decoder_receive_frame()? {
543            Some(frame) => {
544                let frame = match self.hwaccel_context.as_ref() {
545                    Some(hwaccel_context) if hwaccel_context.format() == frame.format() => {
546                        Self::download_frame(&frame)?
547                    }
548                    _ => frame,
549                };
550
551                let frame = match self.scaler.as_mut() {
552                    Some(scaler) => Self::rescale_frame(&frame, scaler)?,
553                    _ => frame,
554                };
555
556                Ok(Some(frame))
557            }
558            None => Ok(None),
559        }
560    }
561
562    /// Pull a decoded frame from the decoder. This function also implements retry mechanism in case
563    /// the decoder signals `EAGAIN`.
564    fn decoder_receive_frame(&mut self) -> Result<Option<RawFrame>> {
565        let mut frame = RawFrame::empty();
566        let decode_result = self.decoder.receive_frame(&mut frame);
567        match decode_result {
568            Ok(()) => Ok(Some(frame)),
569            Err(AvError::Eof) => Err(Error::ReadExhausted),
570            Err(AvError::Other { errno }) if errno == EAGAIN => Ok(None),
571            Err(err) => Err(err.into()),
572        }
573    }
574
575    /// Download frame from foreign hardware acceleration device.
576    fn download_frame(frame: &RawFrame) -> Result<RawFrame> {
577        let mut frame_downloaded = RawFrame::empty();
578        frame_downloaded.set_format(HWACCEL_PIXEL_FORMAT);
579        ffi_hwaccel::hwdevice_transfer_frame(&mut frame_downloaded, frame)?;
580        ffi::copy_frame_props(frame, &mut frame_downloaded);
581        Ok(frame_downloaded)
582    }
583
584    /// Rescale frame with the scaler.
585    fn rescale_frame(frame: &RawFrame, scaler: &mut AvScaler) -> Result<RawFrame> {
586        let mut frame_scaled = RawFrame::empty();
587        scaler
588            .run(frame, &mut frame_scaled)
589            .map_err(Error::BackendError)?;
590        ffi::copy_frame_props(frame, &mut frame_scaled);
591        Ok(frame_scaled)
592    }
593
594    #[cfg(feature = "ndarray")]
595    fn raw_frame_to_time_and_frame(&self, frame: &mut RawFrame) -> Result<(Time, Frame)> {
596        // We use the packet DTS here (which is `frame->pkt_dts`) because that is what the
597        // encoder will use when encoding for the `PTS` field.
598        let timestamp = Time::new(Some(frame.packet().dts), self.decoder_time_base);
599        let frame = ffi::convert_frame_to_ndarray_rgb24(frame).map_err(Error::BackendError)?;
600
601        Ok((timestamp, frame))
602    }
603}
604
605impl Drop for DecoderSplit {
606    fn drop(&mut self) {
607        // Maximum number of invocations to `decoder_receive_frame` to drain the items still on the
608        // queue before giving up.
609        const MAX_DRAIN_ITERATIONS: u32 = 100;
610
611        // We need to drain the items still in the decoders queue.
612        if let Ok(()) = self.decoder.send_eof() {
613            for _ in 0..MAX_DRAIN_ITERATIONS {
614                if self.decoder_receive_frame().is_err() {
615                    break;
616                }
617            }
618        }
619    }
620}
621
// NOTE(review): these manual impls assert that a `DecoderSplit` may be moved to and shared
// between threads. Nothing in this file shows why that is sound for the wrapped ffmpeg decoder,
// scaler and hardware acceleration context — confirm, then record the reasoning here as a
// `// SAFETY:` comment.
unsafe impl Send for DecoderSplit {}
unsafe impl Sync for DecoderSplit {}