Skip to main content

ez_ffmpeg/core/
packet_scanner.rs

1use ffmpeg_sys_next::{
2    av_packet_alloc, av_packet_free, av_packet_unref, av_read_frame,
3    avformat_seek_file, AVPacket, AVERROR, EAGAIN,
4    AV_PKT_FLAG_CORRUPT, AV_PKT_FLAG_KEY,
5};
6
7use std::iter::FusedIterator;
8
9use crate::core::context::AVFormatContextBox;
10use crate::core::stream_info::StreamInfo;
11use crate::error::{DemuxingError, OpenInputError, PacketScannerError, Result};
12
/// Read-only metadata extracted from a single demuxed packet.
///
/// `PacketInfo` holds scalar values copied out of an `AVPacket`, together with
/// stream-type flags looked up at read time, so it has no lifetime ties to the
/// scanner. It is cheap to clone, store, and compare.
///
/// # Defensive fields
///
/// `stream_index` and `size` are clamped to non-negative values before storage.
/// `stream_index` is range-checked by an internal FFmpeg assert, and FFmpeg does
/// not document a negative `size`, so the clamping is purely defensive and not
/// expected to trigger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PacketInfo {
    /// Index of the stream the packet belongs to (clamped to be non-negative).
    stream_index: usize,
    /// Presentation timestamp; `None` when FFmpeg reported `AV_NOPTS_VALUE`.
    pts: Option<i64>,
    /// Decoding timestamp; `None` when FFmpeg reported `AV_NOPTS_VALUE`.
    dts: Option<i64>,
    /// Packet duration in stream time-base units, copied verbatim.
    duration: i64,
    /// Payload size in bytes (clamped to be non-negative).
    size: usize,
    /// Byte offset in the input, or -1 if unknown.
    pos: i64,
    /// `AV_PKT_FLAG_KEY` was set on the packet.
    is_keyframe: bool,
    /// `AV_PKT_FLAG_CORRUPT` was set on the packet.
    is_corrupt: bool,
    /// The packet's stream was a video stream in the scanner's cached stream list.
    is_video: bool,
    /// The packet's stream was an audio stream in the scanner's cached stream list.
    is_audio: bool,
}
37
38impl PacketInfo {
39    /// The index of the stream this packet belongs to.
40    pub fn stream_index(&self) -> usize {
41        self.stream_index
42    }
43
44    /// Presentation timestamp in stream time-base units, if available.
45    pub fn pts(&self) -> Option<i64> {
46        self.pts
47    }
48
49    /// Decompression timestamp in stream time-base units, if available.
50    pub fn dts(&self) -> Option<i64> {
51        self.dts
52    }
53
54    /// Duration of this packet in stream time-base units.
55    pub fn duration(&self) -> i64 {
56        self.duration
57    }
58
59    /// Size of the packet data in bytes.
60    pub fn size(&self) -> usize {
61        self.size
62    }
63
64    /// Byte position of this packet in the input file, or -1 if unknown.
65    pub fn pos(&self) -> i64 {
66        self.pos
67    }
68
69    /// Whether this packet contains a keyframe.
70    pub fn is_keyframe(&self) -> bool {
71        self.is_keyframe
72    }
73
74    /// Whether this packet is flagged as corrupt.
75    pub fn is_corrupt(&self) -> bool {
76        self.is_corrupt
77    }
78
79    /// Whether this packet belongs to a video stream.
80    pub fn is_video(&self) -> bool {
81        self.is_video
82    }
83
84    /// Whether this packet belongs to an audio stream.
85    pub fn is_audio(&self) -> bool {
86        self.is_audio
87    }
88}
89
90/// A stateful packet-level scanner for media files.
91///
92/// `PacketScanner` opens a media file (or URL) and iterates over demuxed packets
93/// without decoding. This is useful for inspecting packet metadata such as
94/// timestamps, keyframe flags, sizes, and stream indices.
95///
96/// # Example
97///
98/// ```rust,ignore
99/// use ez_ffmpeg::packet_scanner::PacketScanner;
100///
101/// let mut scanner = PacketScanner::open("test.mp4")?;
102/// for packet in scanner.packets() {
103///     let packet = packet?;
104///     println!(
105///         "stream={} pts={:?} size={} keyframe={}",
106///         packet.stream_index(),
107///         packet.pts(),
108///         packet.size(),
109///         packet.is_keyframe(),
110///     );
111/// }
112/// ```
113pub struct PacketScanner {
114    fmt_ctx_box: AVFormatContextBox,
115    pkt: *mut AVPacket,
116    streams: Vec<StreamInfo>,
117}
118
119// SAFETY: PacketScanner owns its AVFormatContext and AVPacket exclusively.
120// It is moved between threads, never shared. No thread-affine callbacks are registered.
121// This is safe only because `open()` does not expose custom AVIO or interrupt callbacks.
122// If custom callbacks are added in the future, this impl must be re-evaluated.
123// This matches the safety reasoning of AVFormatContextBox's own `unsafe impl Send`.
124unsafe impl Send for PacketScanner {}
125
126impl PacketScanner {
127    /// Open a media file or URL for packet scanning.
128    ///
129    /// Stream information is extracted and cached at open time so that
130    /// [`streams`](Self::streams), [`video_stream`](Self::video_stream),
131    /// [`audio_stream`](Self::audio_stream), and
132    /// [`stream_for_packet`](Self::stream_for_packet) are available
133    /// immediately without additional I/O.
134    pub fn open(url: impl Into<String>) -> Result<Self> {
135        let fmt_ctx_box = crate::core::stream_info::init_format_context(url)?;
136        // SAFETY: fmt_ctx_box is fully initialized by init_format_context.
137        let streams = unsafe { crate::core::stream_info::extract_stream_infos(&fmt_ctx_box)? };
138
139        // SAFETY: av_packet_alloc returns a valid packet or null.
140        // Null is checked immediately; the packet is freed in Drop.
141        unsafe {
142            let pkt = av_packet_alloc();
143            if pkt.is_null() {
144                return Err(OpenInputError::OutOfMemory.into());
145            }
146
147            Ok(Self { fmt_ctx_box, pkt, streams })
148        }
149    }
150
151    /// Seek to a timestamp in microseconds.
152    ///
153    /// Seeks to the nearest keyframe before the given timestamp.
154    /// Can be called repeatedly for jump-reading patterns.
155    ///
156    /// On failure you may continue reading or attempt another seek, though
157    /// the exact read position is not guaranteed to be unchanged.
158    pub fn seek(&mut self, timestamp_us: i64) -> Result<()> {
159        // SAFETY: fmt_ctx is valid for the lifetime of self. avformat_seek_file
160        // accepts any timestamp and returns a negative value on failure.
161        unsafe {
162            let ret = avformat_seek_file(
163                self.fmt_ctx_box.fmt_ctx,
164                -1,
165                i64::MIN,
166                timestamp_us,
167                timestamp_us,
168                0,
169            );
170            if ret < 0 {
171                return Err(
172                    PacketScannerError::SeekError(DemuxingError::from(ret)).into()
173                );
174            }
175        }
176        Ok(())
177    }
178
179    /// Read the next packet's info. Returns `None` at EOF.
180    ///
181    /// If the underlying demuxer returns `EAGAIN` (common with network streams),
182    /// this method retries with a 10 ms sleep up to 500 times (~5 seconds).
183    /// After exhausting retries it returns an error.
184    pub fn next_packet(&mut self) -> Result<Option<PacketInfo>> {
185        const MAX_EAGAIN_RETRIES: u32 = 500;
186
187        // SAFETY: self.pkt is a valid, non-null AVPacket allocated in open().
188        // av_packet_unref resets the packet for reuse; av_read_frame fills it.
189        // We read only scalar fields from the filled packet.
190        unsafe {
191            av_packet_unref(self.pkt);
192
193            let mut eagain_retries: u32 = 0;
194            loop {
195                let ret = av_read_frame(self.fmt_ctx_box.fmt_ctx, self.pkt);
196                if ret == AVERROR(EAGAIN) {
197                    eagain_retries += 1;
198                    if eagain_retries > MAX_EAGAIN_RETRIES {
199                        return Err(
200                            PacketScannerError::ReadError(DemuxingError::from(ret)).into()
201                        );
202                    }
203                    std::thread::sleep(std::time::Duration::from_millis(10));
204                    continue;
205                }
206                if ret < 0 {
207                    if ret == ffmpeg_sys_next::AVERROR_EOF {
208                        return Ok(None);
209                    }
210                    return Err(
211                        PacketScannerError::ReadError(DemuxingError::from(ret)).into()
212                    );
213                }
214                break;
215            }
216
217            let pkt = &*self.pkt;
218            let pts = if pkt.pts == ffmpeg_sys_next::AV_NOPTS_VALUE {
219                None
220            } else {
221                Some(pkt.pts)
222            };
223            let dts = if pkt.dts == ffmpeg_sys_next::AV_NOPTS_VALUE {
224                None
225            } else {
226                Some(pkt.dts)
227            };
228
229            // FFmpeg guarantees via av_assert0 in handle_new_packet() (demux.c:571)
230            // that stream_index is in [0, nb_streams). The .max(0) and .unwrap_or()
231            // are purely defensive and not expected to trigger in practice.
232            let stream_index = pkt.stream_index.max(0) as usize;
233            let (is_video, is_audio) = self.streams.get(stream_index)
234                .map(|s| (s.is_video(), s.is_audio()))
235                .unwrap_or((false, false));
236
237            Ok(Some(PacketInfo {
238                stream_index,
239                pts,
240                dts,
241                duration: pkt.duration,
242                // FFmpeg does not document negative size; clamp to 0 defensively.
243                size: { debug_assert!(pkt.size >= 0, "negative pkt.size: {}", pkt.size); pkt.size.max(0) as usize },
244                pos: pkt.pos,
245                is_keyframe: (pkt.flags & AV_PKT_FLAG_KEY) != 0,
246                is_corrupt: (pkt.flags & AV_PKT_FLAG_CORRUPT) != 0,
247                is_video,
248                is_audio,
249            }))
250        }
251    }
252
253    /// Returns all stream information cached at open time.
254    pub fn streams(&self) -> &[StreamInfo] {
255        &self.streams
256    }
257
258    /// Returns the first video stream, if any.
259    pub fn video_stream(&self) -> Option<&StreamInfo> {
260        self.streams.iter().find(|s| s.is_video())
261    }
262
263    /// Returns the first audio stream, if any.
264    pub fn audio_stream(&self) -> Option<&StreamInfo> {
265        self.streams.iter().find(|s| s.is_audio())
266    }
267
268    /// Returns the stream information for the given packet, if the stream
269    /// index is within bounds.
270    pub fn stream_for_packet(&self, packet: &PacketInfo) -> Option<&StreamInfo> {
271        self.streams.get(packet.stream_index())
272    }
273
274    /// Returns an iterator for convenient `for packet in scanner.packets()` usage.
275    ///
276    /// Each call creates a fresh iterator, so you can `seek()` and then call
277    /// `packets()` again to iterate from the new position.
278    ///
279    /// The iterator is fused: once it yields `None` (EOF) or an `Err`, all
280    /// subsequent calls to `next()` return `None`.
281    pub fn packets(&mut self) -> PacketIter<'_> {
282        PacketIter { scanner: self, done: false }
283    }
284}
285
286impl Drop for PacketScanner {
287    fn drop(&mut self) {
288        // SAFETY: pkt was allocated by av_packet_alloc in open().
289        // av_packet_free handles null gracefully, but we check anyway.
290        unsafe {
291            if !self.pkt.is_null() {
292                av_packet_free(&mut self.pkt);
293            }
294        }
295        // AVFormatContextBox handles closing the format context
296    }
297}
298
299/// Iterator wrapper for [`PacketScanner`].
300///
301/// Yields `Result<PacketInfo>` for each packet until EOF or an error occurs.
302/// The iterator is fused: after returning `None` or `Err`, it always returns `None`.
303pub struct PacketIter<'a> {
304    scanner: &'a mut PacketScanner,
305    done: bool,
306}
307
308impl<'a> Iterator for PacketIter<'a> {
309    type Item = Result<PacketInfo>;
310
311    fn next(&mut self) -> Option<Self::Item> {
312        if self.done {
313            return None;
314        }
315        match self.scanner.next_packet() {
316            Ok(Some(info)) => Some(Ok(info)),
317            Ok(None) => {
318                self.done = true;
319                None
320            }
321            Err(e) => {
322                self.done = true;
323                Some(Err(e))
324            }
325        }
326    }
327}
328
329impl<'a> FusedIterator for PacketIter<'a> {}
330
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_open_not_found() {
        let result = PacketScanner::open("not_found.mp4");
        assert!(result.is_err());
    }

    #[test]
    fn test_scan_packets() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        let mut count = 0;
        let mut keyframes = 0;
        for packet in scanner.packets() {
            let info = packet.unwrap();
            count += 1;
            if info.is_keyframe() {
                keyframes += 1;
            }
        }
        assert!(count > 0, "expected at least one packet");
        assert!(keyframes > 0, "expected at least one keyframe");
        println!("total packets: {}, keyframes: {}", count, keyframes);
    }

    #[test]
    fn test_iterator_is_fused() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        let mut iter = scanner.packets();
        for packet in iter.by_ref() {
            packet.unwrap();
        }
        // After EOF the iterator must keep returning None.
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_seek_and_read() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        // Seek to 1 second (1_000_000 microseconds)
        scanner.seek(1_000_000).unwrap();
        let packet = scanner.next_packet().unwrap();
        assert!(packet.is_some(), "expected a packet after seeking");
    }

    #[test]
    fn test_seek_after_eof() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        while scanner.next_packet().unwrap().is_some() {}
        // Reaching EOF returns Ok(None); rewinding must make packets available again.
        assert!(scanner.next_packet().unwrap().is_none());
        scanner.seek(0).unwrap();
        assert!(
            scanner.next_packet().unwrap().is_some(),
            "expected packets after rewinding to the start"
        );
    }

    #[test]
    fn test_streams() {
        let scanner = PacketScanner::open("test.mp4").unwrap();
        let streams = scanner.streams();
        assert_eq!(streams.len(), 2, "test.mp4 should have 2 streams (video + audio)");
    }

    #[test]
    fn test_video_stream() {
        let scanner = PacketScanner::open("test.mp4").unwrap();
        let video = scanner.video_stream();
        assert!(video.is_some(), "expected a video stream");
        assert!(video.unwrap().is_video());
    }

    #[test]
    fn test_audio_stream() {
        let scanner = PacketScanner::open("test.mp4").unwrap();
        let audio = scanner.audio_stream();
        assert!(audio.is_some(), "expected an audio stream");
        assert!(audio.unwrap().is_audio());
    }

    #[test]
    fn test_stream_for_packet() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        let packet = scanner.next_packet().unwrap();
        assert!(packet.is_some(), "expected at least one packet");
        let info = packet.unwrap();
        let stream = scanner.stream_for_packet(&info);
        assert!(stream.is_some(), "stream_for_packet should return Some for valid packet");
    }

    #[test]
    fn test_packet_flags_match_stream_info() {
        let mut scanner = PacketScanner::open("test.mp4").unwrap();
        while let Some(info) = scanner.next_packet().unwrap() {
            let stream = scanner
                .stream_for_packet(&info)
                .expect("every packet in test.mp4 should map to a cached stream");
            assert_eq!(info.is_video(), stream.is_video());
            assert_eq!(info.is_audio(), stream.is_audio());
        }
    }
}