Skip to main content

ez_ffmpeg/core/
packet_scanner.rs

1use ffmpeg_sys_next::{
2    av_packet_alloc, av_packet_free, av_packet_unref, av_read_frame,
3    avformat_seek_file, AVPacket, AVERROR, EAGAIN,
4    AV_PKT_FLAG_CORRUPT, AV_PKT_FLAG_KEY,
5};
6
7use std::iter::FusedIterator;
8
9use crate::core::context::AVFormatContextBox;
10use crate::core::stream_info::StreamInfo;
11use crate::error::{DemuxingError, OpenInputError, PacketScannerError, Result};
12
/// Read-only metadata extracted from a single demuxed packet.
///
/// `PacketInfo` holds only scalar values copied out of an `AVPacket`, so it
/// has no lifetime ties to the scanner that produced it. Because every field
/// is a plain scalar, the type is `Copy` and supports equality comparison and
/// hashing, which makes it trivial to store, deduplicate, and assert on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PacketInfo {
    /// Index of the stream this packet belongs to.
    stream_index: usize,
    /// Presentation timestamp in stream time-base units; `None` when unset.
    pts: Option<i64>,
    /// Decompression timestamp in stream time-base units; `None` when unset.
    dts: Option<i64>,
    /// Packet duration in stream time-base units.
    duration: i64,
    /// Size of the packet payload in bytes.
    size: usize,
    /// Byte position of the packet in the input, or -1 if unknown.
    pos: i64,
    /// Whether the packet carries the keyframe flag.
    is_keyframe: bool,
    /// Whether the packet carries the corrupt flag.
    is_corrupt: bool,
}
28
29impl PacketInfo {
30    /// The index of the stream this packet belongs to.
31    pub fn stream_index(&self) -> usize {
32        self.stream_index
33    }
34
35    /// Presentation timestamp in stream time-base units, if available.
36    pub fn pts(&self) -> Option<i64> {
37        self.pts
38    }
39
40    /// Decompression timestamp in stream time-base units, if available.
41    pub fn dts(&self) -> Option<i64> {
42        self.dts
43    }
44
45    /// Duration of this packet in stream time-base units.
46    pub fn duration(&self) -> i64 {
47        self.duration
48    }
49
50    /// Size of the packet data in bytes.
51    pub fn size(&self) -> usize {
52        self.size
53    }
54
55    /// Byte position of this packet in the input file, or -1 if unknown.
56    pub fn pos(&self) -> i64 {
57        self.pos
58    }
59
60    /// Whether this packet contains a keyframe.
61    pub fn is_keyframe(&self) -> bool {
62        self.is_keyframe
63    }
64
65    /// Whether this packet is flagged as corrupt.
66    pub fn is_corrupt(&self) -> bool {
67        self.is_corrupt
68    }
69}
70
71/// A stateful packet-level scanner for media files.
72///
73/// `PacketScanner` opens a media file (or URL) and iterates over demuxed packets
74/// without decoding. This is useful for inspecting packet metadata such as
75/// timestamps, keyframe flags, sizes, and stream indices.
76///
77/// # Example
78///
79/// ```rust,ignore
80/// use ez_ffmpeg::packet_scanner::PacketScanner;
81///
82/// let mut scanner = PacketScanner::open("test.mp4")?;
83/// for packet in scanner.packets() {
84///     let packet = packet?;
85///     println!(
86///         "stream={} pts={:?} size={} keyframe={}",
87///         packet.stream_index(),
88///         packet.pts(),
89///         packet.size(),
90///         packet.is_keyframe(),
91///     );
92/// }
93/// ```
94pub struct PacketScanner {
95    fmt_ctx_box: AVFormatContextBox,
96    pkt: *mut AVPacket,
97    streams: Vec<StreamInfo>,
98}
99
100// SAFETY: PacketScanner owns its AVFormatContext and AVPacket exclusively.
101// It is moved between threads, never shared. No thread-affine callbacks are registered.
102// This matches the safety reasoning of AVFormatContextBox's own `unsafe impl Send`.
103unsafe impl Send for PacketScanner {}
104
105impl PacketScanner {
106    /// Open a media file or URL for packet scanning.
107    ///
108    /// Stream information is extracted and cached at open time so that
109    /// [`streams`](Self::streams), [`video_stream`](Self::video_stream),
110    /// [`audio_stream`](Self::audio_stream), and
111    /// [`stream_for_packet`](Self::stream_for_packet) are available
112    /// immediately without additional I/O.
113    pub fn open(url: impl Into<String>) -> Result<Self> {
114        let fmt_ctx_box = crate::core::stream_info::init_format_context(url)?;
115        // SAFETY: fmt_ctx_box is fully initialized by init_format_context.
116        let streams = unsafe { crate::core::stream_info::extract_stream_infos(&fmt_ctx_box)? };
117
118        // SAFETY: av_packet_alloc returns a valid packet or null.
119        // Null is checked immediately; the packet is freed in Drop.
120        unsafe {
121            let pkt = av_packet_alloc();
122            if pkt.is_null() {
123                return Err(OpenInputError::OutOfMemory.into());
124            }
125
126            Ok(Self { fmt_ctx_box, pkt, streams })
127        }
128    }
129
130    /// Seek to a timestamp in microseconds.
131    ///
132    /// Seeks to the nearest keyframe before the given timestamp.
133    /// Can be called repeatedly for jump-reading patterns.
134    ///
135    /// On failure you may continue reading or attempt another seek, though
136    /// the exact read position is not guaranteed to be unchanged.
137    pub fn seek(&mut self, timestamp_us: i64) -> Result<()> {
138        // SAFETY: fmt_ctx is valid for the lifetime of self. avformat_seek_file
139        // accepts any timestamp and returns a negative value on failure.
140        unsafe {
141            let ret = avformat_seek_file(
142                self.fmt_ctx_box.fmt_ctx,
143                -1,
144                i64::MIN,
145                timestamp_us,
146                timestamp_us,
147                0,
148            );
149            if ret < 0 {
150                return Err(
151                    PacketScannerError::SeekError(DemuxingError::from(ret)).into()
152                );
153            }
154        }
155        Ok(())
156    }
157
158    /// Read the next packet's info. Returns `None` at EOF.
159    ///
160    /// If the underlying demuxer returns `EAGAIN` (common with network streams),
161    /// this method retries with a 10 ms sleep up to 500 times (~5 seconds).
162    /// After exhausting retries it returns an error.
163    pub fn next_packet(&mut self) -> Result<Option<PacketInfo>> {
164        const MAX_EAGAIN_RETRIES: u32 = 500;
165
166        // SAFETY: self.pkt is a valid, non-null AVPacket allocated in open().
167        // av_packet_unref resets the packet for reuse; av_read_frame fills it.
168        // We read only scalar fields from the filled packet.
169        unsafe {
170            av_packet_unref(self.pkt);
171
172            let mut eagain_retries: u32 = 0;
173            loop {
174                let ret = av_read_frame(self.fmt_ctx_box.fmt_ctx, self.pkt);
175                if ret == AVERROR(EAGAIN) {
176                    eagain_retries += 1;
177                    if eagain_retries > MAX_EAGAIN_RETRIES {
178                        return Err(
179                            PacketScannerError::ReadError(DemuxingError::from(ret)).into()
180                        );
181                    }
182                    std::thread::sleep(std::time::Duration::from_millis(10));
183                    continue;
184                }
185                if ret < 0 {
186                    if ret == ffmpeg_sys_next::AVERROR_EOF {
187                        return Ok(None);
188                    }
189                    return Err(
190                        PacketScannerError::ReadError(DemuxingError::from(ret)).into()
191                    );
192                }
193                break;
194            }
195
196            let pkt = &*self.pkt;
197            let pts = if pkt.pts == ffmpeg_sys_next::AV_NOPTS_VALUE {
198                None
199            } else {
200                Some(pkt.pts)
201            };
202            let dts = if pkt.dts == ffmpeg_sys_next::AV_NOPTS_VALUE {
203                None
204            } else {
205                Some(pkt.dts)
206            };
207
208            Ok(Some(PacketInfo {
209                stream_index: pkt.stream_index.max(0) as usize,
210                pts,
211                dts,
212                duration: pkt.duration,
213                size: pkt.size.max(0) as usize,
214                pos: pkt.pos,
215                is_keyframe: (pkt.flags & AV_PKT_FLAG_KEY) != 0,
216                is_corrupt: (pkt.flags & AV_PKT_FLAG_CORRUPT) != 0,
217            }))
218        }
219    }
220
221    /// Returns all stream information cached at open time.
222    pub fn streams(&self) -> &[StreamInfo] {
223        &self.streams
224    }
225
226    /// Returns the first video stream, if any.
227    pub fn video_stream(&self) -> Option<&StreamInfo> {
228        self.streams.iter().find(|s| s.is_video())
229    }
230
231    /// Returns the first audio stream, if any.
232    pub fn audio_stream(&self) -> Option<&StreamInfo> {
233        self.streams.iter().find(|s| s.is_audio())
234    }
235
236    /// Returns the stream information for the given packet, if the stream
237    /// index is within bounds.
238    pub fn stream_for_packet(&self, packet: &PacketInfo) -> Option<&StreamInfo> {
239        self.streams.get(packet.stream_index())
240    }
241
242    /// Returns an iterator for convenient `for packet in scanner.packets()` usage.
243    ///
244    /// Each call creates a fresh iterator, so you can `seek()` and then call
245    /// `packets()` again to iterate from the new position.
246    ///
247    /// The iterator is fused: once it yields `None` (EOF) or an `Err`, all
248    /// subsequent calls to `next()` return `None`.
249    pub fn packets(&mut self) -> PacketIter<'_> {
250        PacketIter { scanner: self, done: false }
251    }
252}
253
254impl Drop for PacketScanner {
255    fn drop(&mut self) {
256        // SAFETY: pkt was allocated by av_packet_alloc in open().
257        // av_packet_free handles null gracefully, but we check anyway.
258        unsafe {
259            if !self.pkt.is_null() {
260                av_packet_free(&mut self.pkt);
261            }
262        }
263        // AVFormatContextBox handles closing the format context
264    }
265}
266
267/// Iterator wrapper for [`PacketScanner`].
268///
269/// Yields `Result<PacketInfo>` for each packet until EOF or an error occurs.
270/// The iterator is fused: after returning `None` or `Err`, it always returns `None`.
271pub struct PacketIter<'a> {
272    scanner: &'a mut PacketScanner,
273    done: bool,
274}
275
276impl<'a> Iterator for PacketIter<'a> {
277    type Item = Result<PacketInfo>;
278
279    fn next(&mut self) -> Option<Self::Item> {
280        if self.done {
281            return None;
282        }
283        match self.scanner.next_packet() {
284            Ok(Some(info)) => Some(Ok(info)),
285            Ok(None) => {
286                self.done = true;
287                None
288            }
289            Err(e) => {
290                self.done = true;
291                Some(Err(e))
292            }
293        }
294    }
295}
296
297impl<'a> FusedIterator for PacketIter<'a> {}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Media fixture opened by every test that needs a real input.
    const SAMPLE: &str = "test.mp4";

    /// Opening a path that does not exist must fail rather than panic.
    #[test]
    fn test_open_not_found() {
        assert!(PacketScanner::open("not_found.mp4").is_err());
    }

    /// A full scan yields packets, and at least one of them is a keyframe.
    #[test]
    fn test_scan_packets() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        let infos: Vec<PacketInfo> = scanner
            .packets()
            .map(|packet| packet.unwrap())
            .collect();
        let count = infos.len();
        let keyframes = infos.iter().filter(|info| info.is_keyframe()).count();

        assert!(count > 0, "expected at least one packet");
        assert!(keyframes > 0, "expected at least one keyframe");
        println!("total packets: {}, keyframes: {}", count, keyframes);
    }

    /// After seeking to the 1 s mark (1_000_000 microseconds) a packet can
    /// still be read.
    #[test]
    fn test_seek_and_read() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        scanner.seek(1_000_000).unwrap();
        let packet = scanner.next_packet().unwrap();
        assert!(packet.is_some(), "expected a packet after seeking");
    }

    /// The fixture exposes exactly two streams.
    #[test]
    fn test_streams() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let streams = scanner.streams();
        assert!(!streams.is_empty(), "expected at least one stream");
        assert_eq!(streams.len(), 2, "test.mp4 should have 2 streams (video + audio)");
    }

    /// `video_stream` finds a stream and that stream really is video.
    #[test]
    fn test_video_stream() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let video = scanner.video_stream();
        assert!(video.is_some(), "expected a video stream");
        assert!(video.unwrap().is_video());
    }

    /// `audio_stream` finds a stream and that stream really is audio.
    #[test]
    fn test_audio_stream() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let audio = scanner.audio_stream();
        assert!(audio.is_some(), "expected an audio stream");
        assert!(audio.unwrap().is_audio());
    }

    /// The first packet read maps back to one of the cached streams.
    #[test]
    fn test_stream_for_packet() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        let packet = scanner.next_packet().unwrap();
        assert!(packet.is_some(), "expected at least one packet");
        let info = packet.unwrap();
        assert!(
            scanner.stream_for_packet(&info).is_some(),
            "stream_for_packet should return Some for valid packet"
        );
    }
}