ez_ffmpeg/core/packet_scanner.rs
1use ffmpeg_sys_next::{
2 av_packet_alloc, av_packet_free, av_packet_unref, av_read_frame,
3 avformat_seek_file, AVPacket, AVERROR, EAGAIN,
4 AV_PKT_FLAG_CORRUPT, AV_PKT_FLAG_KEY,
5};
6
7use std::iter::FusedIterator;
8
9use crate::core::context::AVFormatContextBox;
10use crate::core::stream_info::StreamInfo;
11use crate::error::{DemuxingError, OpenInputError, PacketScannerError, Result};
12
/// Read-only metadata extracted from a single demuxed packet.
///
/// `PacketInfo` contains scalar values copied out of an `AVPacket` together with
/// stream-type flags looked up at read time, so it has no lifetime ties to the
/// scanner. Every field is a plain scalar, which makes the type `Copy` and lets
/// it be compared, hashed, and stored freely.
///
/// # Defensive fields
///
/// `stream_index` and `size` are clamped to non-negative values before storage.
/// FFmpeg's internal asserts guarantee valid ranges in practice, so the clamping
/// is purely defensive and not expected to trigger.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PacketInfo {
    /// Index of the owning stream (clamped to be non-negative).
    stream_index: usize,
    /// Presentation timestamp; `None` when the packet carried no PTS.
    pts: Option<i64>,
    /// Decompression timestamp; `None` when the packet carried no DTS.
    dts: Option<i64>,
    /// Packet duration in stream time-base units.
    duration: i64,
    /// Payload size in bytes (clamped to be non-negative).
    size: usize,
    /// Byte offset in the input, or -1 if unknown.
    pos: i64,
    /// Set when the packet carries the keyframe flag.
    is_keyframe: bool,
    /// Set when the packet carries the corrupt flag.
    is_corrupt: bool,
    /// Set when the owning stream was identified as video at read time.
    is_video: bool,
    /// Set when the owning stream was identified as audio at read time.
    is_audio: bool,
}

impl PacketInfo {
    /// The index of the stream this packet belongs to.
    #[must_use]
    pub fn stream_index(&self) -> usize {
        self.stream_index
    }

    /// Presentation timestamp in stream time-base units, if available.
    #[must_use]
    pub fn pts(&self) -> Option<i64> {
        self.pts
    }

    /// Decompression timestamp in stream time-base units, if available.
    #[must_use]
    pub fn dts(&self) -> Option<i64> {
        self.dts
    }

    /// Duration of this packet in stream time-base units.
    #[must_use]
    pub fn duration(&self) -> i64 {
        self.duration
    }

    /// Size of the packet data in bytes.
    #[must_use]
    pub fn size(&self) -> usize {
        self.size
    }

    /// Byte position of this packet in the input file, or -1 if unknown.
    #[must_use]
    pub fn pos(&self) -> i64 {
        self.pos
    }

    /// Whether this packet contains a keyframe.
    #[must_use]
    pub fn is_keyframe(&self) -> bool {
        self.is_keyframe
    }

    /// Whether this packet is flagged as corrupt.
    #[must_use]
    pub fn is_corrupt(&self) -> bool {
        self.is_corrupt
    }

    /// Whether this packet belongs to a video stream.
    #[must_use]
    pub fn is_video(&self) -> bool {
        self.is_video
    }

    /// Whether this packet belongs to an audio stream.
    #[must_use]
    pub fn is_audio(&self) -> bool {
        self.is_audio
    }
}
89
90/// A stateful packet-level scanner for media files.
91///
92/// `PacketScanner` opens a media file (or URL) and iterates over demuxed packets
93/// without decoding. This is useful for inspecting packet metadata such as
94/// timestamps, keyframe flags, sizes, and stream indices.
95///
96/// # Example
97///
98/// ```rust,ignore
99/// use ez_ffmpeg::packet_scanner::PacketScanner;
100///
101/// let mut scanner = PacketScanner::open("test.mp4")?;
102/// for packet in scanner.packets() {
103/// let packet = packet?;
104/// println!(
105/// "stream={} pts={:?} size={} keyframe={}",
106/// packet.stream_index(),
107/// packet.pts(),
108/// packet.size(),
109/// packet.is_keyframe(),
110/// );
111/// }
112/// ```
113pub struct PacketScanner {
114 fmt_ctx_box: AVFormatContextBox,
115 pkt: *mut AVPacket,
116 streams: Vec<StreamInfo>,
117}
118
119// SAFETY: PacketScanner owns its AVFormatContext and AVPacket exclusively.
120// It is moved between threads, never shared. No thread-affine callbacks are registered.
121// This is safe only because `open()` does not expose custom AVIO or interrupt callbacks.
122// If custom callbacks are added in the future, this impl must be re-evaluated.
123// This matches the safety reasoning of AVFormatContextBox's own `unsafe impl Send`.
124unsafe impl Send for PacketScanner {}
125
126impl PacketScanner {
127 /// Open a media file or URL for packet scanning.
128 ///
129 /// Stream information is extracted and cached at open time so that
130 /// [`streams`](Self::streams), [`video_stream`](Self::video_stream),
131 /// [`audio_stream`](Self::audio_stream), and
132 /// [`stream_for_packet`](Self::stream_for_packet) are available
133 /// immediately without additional I/O.
134 pub fn open(url: impl Into<String>) -> Result<Self> {
135 let fmt_ctx_box = crate::core::stream_info::init_format_context(url)?;
136 // SAFETY: fmt_ctx_box is fully initialized by init_format_context.
137 let streams = unsafe { crate::core::stream_info::extract_stream_infos(&fmt_ctx_box)? };
138
139 // SAFETY: av_packet_alloc returns a valid packet or null.
140 // Null is checked immediately; the packet is freed in Drop.
141 unsafe {
142 let pkt = av_packet_alloc();
143 if pkt.is_null() {
144 return Err(OpenInputError::OutOfMemory.into());
145 }
146
147 Ok(Self { fmt_ctx_box, pkt, streams })
148 }
149 }
150
151 /// Seek to a timestamp in microseconds.
152 ///
153 /// Seeks to the nearest keyframe before the given timestamp.
154 /// Can be called repeatedly for jump-reading patterns.
155 ///
156 /// On failure you may continue reading or attempt another seek, though
157 /// the exact read position is not guaranteed to be unchanged.
158 pub fn seek(&mut self, timestamp_us: i64) -> Result<()> {
159 // SAFETY: fmt_ctx is valid for the lifetime of self. avformat_seek_file
160 // accepts any timestamp and returns a negative value on failure.
161 unsafe {
162 let ret = avformat_seek_file(
163 self.fmt_ctx_box.fmt_ctx,
164 -1,
165 i64::MIN,
166 timestamp_us,
167 timestamp_us,
168 0,
169 );
170 if ret < 0 {
171 return Err(
172 PacketScannerError::SeekError(DemuxingError::from(ret)).into()
173 );
174 }
175 }
176 Ok(())
177 }
178
179 /// Read the next packet's info. Returns `None` at EOF.
180 ///
181 /// If the underlying demuxer returns `EAGAIN` (common with network streams),
182 /// this method retries with a 10 ms sleep up to 500 times (~5 seconds).
183 /// After exhausting retries it returns an error.
184 pub fn next_packet(&mut self) -> Result<Option<PacketInfo>> {
185 const MAX_EAGAIN_RETRIES: u32 = 500;
186
187 // SAFETY: self.pkt is a valid, non-null AVPacket allocated in open().
188 // av_packet_unref resets the packet for reuse; av_read_frame fills it.
189 // We read only scalar fields from the filled packet.
190 unsafe {
191 av_packet_unref(self.pkt);
192
193 let mut eagain_retries: u32 = 0;
194 loop {
195 let ret = av_read_frame(self.fmt_ctx_box.fmt_ctx, self.pkt);
196 if ret == AVERROR(EAGAIN) {
197 eagain_retries += 1;
198 if eagain_retries > MAX_EAGAIN_RETRIES {
199 return Err(
200 PacketScannerError::ReadError(DemuxingError::from(ret)).into()
201 );
202 }
203 std::thread::sleep(std::time::Duration::from_millis(10));
204 continue;
205 }
206 if ret < 0 {
207 if ret == ffmpeg_sys_next::AVERROR_EOF {
208 return Ok(None);
209 }
210 return Err(
211 PacketScannerError::ReadError(DemuxingError::from(ret)).into()
212 );
213 }
214 break;
215 }
216
217 let pkt = &*self.pkt;
218 let pts = if pkt.pts == ffmpeg_sys_next::AV_NOPTS_VALUE {
219 None
220 } else {
221 Some(pkt.pts)
222 };
223 let dts = if pkt.dts == ffmpeg_sys_next::AV_NOPTS_VALUE {
224 None
225 } else {
226 Some(pkt.dts)
227 };
228
229 // FFmpeg guarantees via av_assert0 in handle_new_packet() (demux.c:571)
230 // that stream_index is in [0, nb_streams). The .max(0) and .unwrap_or()
231 // are purely defensive and not expected to trigger in practice.
232 let stream_index = pkt.stream_index.max(0) as usize;
233 let (is_video, is_audio) = self.streams.get(stream_index)
234 .map(|s| (s.is_video(), s.is_audio()))
235 .unwrap_or((false, false));
236
237 Ok(Some(PacketInfo {
238 stream_index,
239 pts,
240 dts,
241 duration: pkt.duration,
242 // FFmpeg does not document negative size; clamp to 0 defensively.
243 size: { debug_assert!(pkt.size >= 0, "negative pkt.size: {}", pkt.size); pkt.size.max(0) as usize },
244 pos: pkt.pos,
245 is_keyframe: (pkt.flags & AV_PKT_FLAG_KEY) != 0,
246 is_corrupt: (pkt.flags & AV_PKT_FLAG_CORRUPT) != 0,
247 is_video,
248 is_audio,
249 }))
250 }
251 }
252
253 /// Returns all stream information cached at open time.
254 pub fn streams(&self) -> &[StreamInfo] {
255 &self.streams
256 }
257
258 /// Returns the first video stream, if any.
259 pub fn video_stream(&self) -> Option<&StreamInfo> {
260 self.streams.iter().find(|s| s.is_video())
261 }
262
263 /// Returns the first audio stream, if any.
264 pub fn audio_stream(&self) -> Option<&StreamInfo> {
265 self.streams.iter().find(|s| s.is_audio())
266 }
267
268 /// Returns the stream information for the given packet, if the stream
269 /// index is within bounds.
270 pub fn stream_for_packet(&self, packet: &PacketInfo) -> Option<&StreamInfo> {
271 self.streams.get(packet.stream_index())
272 }
273
274 /// Returns an iterator for convenient `for packet in scanner.packets()` usage.
275 ///
276 /// Each call creates a fresh iterator, so you can `seek()` and then call
277 /// `packets()` again to iterate from the new position.
278 ///
279 /// The iterator is fused: once it yields `None` (EOF) or an `Err`, all
280 /// subsequent calls to `next()` return `None`.
281 pub fn packets(&mut self) -> PacketIter<'_> {
282 PacketIter { scanner: self, done: false }
283 }
284}
285
286impl Drop for PacketScanner {
287 fn drop(&mut self) {
288 // SAFETY: pkt was allocated by av_packet_alloc in open().
289 // av_packet_free handles null gracefully, but we check anyway.
290 unsafe {
291 if !self.pkt.is_null() {
292 av_packet_free(&mut self.pkt);
293 }
294 }
295 // AVFormatContextBox handles closing the format context
296 }
297}
298
299/// Iterator wrapper for [`PacketScanner`].
300///
301/// Yields `Result<PacketInfo>` for each packet until EOF or an error occurs.
302/// The iterator is fused: after returning `None` or `Err`, it always returns `None`.
303pub struct PacketIter<'a> {
304 scanner: &'a mut PacketScanner,
305 done: bool,
306}
307
308impl<'a> Iterator for PacketIter<'a> {
309 type Item = Result<PacketInfo>;
310
311 fn next(&mut self) -> Option<Self::Item> {
312 if self.done {
313 return None;
314 }
315 match self.scanner.next_packet() {
316 Ok(Some(info)) => Some(Ok(info)),
317 Ok(None) => {
318 self.done = true;
319 None
320 }
321 Err(e) => {
322 self.done = true;
323 Some(Err(e))
324 }
325 }
326 }
327}
328
329impl<'a> FusedIterator for PacketIter<'a> {}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Media fixture shared by the tests below; resolved relative to the
    /// working directory of the test run.
    const SAMPLE: &str = "test.mp4";

    /// Opening a path that does not exist must fail.
    #[test]
    fn test_open_not_found() {
        assert!(PacketScanner::open("not_found.mp4").is_err());
    }

    /// A full scan yields at least one packet and at least one keyframe.
    #[test]
    fn test_scan_packets() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        let (count, keyframes) = scanner
            .packets()
            .map(|packet| packet.unwrap())
            .fold((0, 0), |(total, keys), info| {
                (total + 1, keys + i32::from(info.is_keyframe()))
            });
        assert!(count > 0, "expected at least one packet");
        assert!(keyframes > 0, "expected at least one keyframe");
        println!("total packets: {}, keyframes: {}", count, keyframes);
    }

    /// A packet can still be read after seeking into the file.
    #[test]
    fn test_seek_and_read() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        // 1_000_000 microseconds == 1 second.
        scanner.seek(1_000_000).unwrap();
        let first_after_seek = scanner.next_packet().unwrap();
        assert!(first_after_seek.is_some(), "expected a packet after seeking");
    }

    /// The fixture exposes exactly two cached streams.
    #[test]
    fn test_streams() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let cached = scanner.streams();
        assert!(!cached.is_empty(), "expected at least one stream");
        assert_eq!(cached.len(), 2, "test.mp4 should have 2 streams (video + audio)");
    }

    /// `video_stream` returns a stream that reports itself as video.
    #[test]
    fn test_video_stream() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let found = scanner.video_stream();
        assert!(found.is_some(), "expected a video stream");
        assert!(found.unwrap().is_video());
    }

    /// `audio_stream` returns a stream that reports itself as audio.
    #[test]
    fn test_audio_stream() {
        let scanner = PacketScanner::open(SAMPLE).unwrap();
        let found = scanner.audio_stream();
        assert!(found.is_some(), "expected an audio stream");
        assert!(found.unwrap().is_audio());
    }

    /// The first packet read maps back to one of the cached streams.
    #[test]
    fn test_stream_for_packet() {
        let mut scanner = PacketScanner::open(SAMPLE).unwrap();
        let first = scanner.next_packet().unwrap();
        assert!(first.is_some(), "expected at least one packet");
        let info = first.unwrap();
        let owner = scanner.stream_for_packet(&info);
        assert!(owner.is_some(), "stream_for_packet should return Some for valid packet");
    }
}
400}