// llm_multimodal/media.rs

1use std::{
2    collections::HashSet,
3    io::Write,
4    path::PathBuf,
5    process::{Output, Stdio},
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
11use bytes::Bytes;
12#[cfg(feature = "opencv-video")]
13use opencv::{core::Mat, imgproc, prelude::*, videoio};
14use reqwest::Client;
15use tokio::{fs, process::Command, task, time};
16use tracing::info;
17use url::Url;
18
/// Fallback time budget for a video decode step; used by
/// `video_process_timeout` when `SMG_VIDEO_PROCESS_TIMEOUT_SECS` is unset or
/// does not hold a usable value.
const DEFAULT_VIDEO_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
/// Fallback cap (1 GiB) on decoded video bytes; used by
/// `video_max_decoded_bytes` when `SMG_VIDEO_MAX_DECODED_BYTES` is unset or
/// does not hold a usable value.
const DEFAULT_VIDEO_MAX_DECODED_BYTES: usize = 1024 * 1024 * 1024;
21
22use super::{
23    error::MediaConnectorError,
24    types::{
25        DecodedRgbFrame, DecodedRgbVideo, ImageDetail, ImageFrame, ImageSource, VideoClip,
26        VideoSource,
27    },
28};
29
/// Settings used to build a `MediaConnector`.
///
/// Derives `Debug` like the sibling `ImageFetchConfig` / `VideoFetchConfig`
/// so the configuration can be logged and embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct MediaConnectorConfig {
    /// Hosts that URL fetches may target. `None` disables the host check.
    pub allowed_domains: Option<Vec<String>>,
    /// Root directory that local file sources must live under. `None` makes
    /// every local file request fail.
    pub allowed_local_media_path: Option<PathBuf>,
    /// Per-request HTTP timeout. A zero duration means no timeout is set on
    /// the request.
    pub fetch_timeout: Duration,
}

impl Default for MediaConnectorConfig {
    /// No domain restriction, no local file access, 10 second fetch timeout.
    fn default() -> Self {
        Self {
            allowed_domains: None,
            allowed_local_media_path: None,
            fetch_timeout: Duration::from_secs(10),
        }
    }
}
46
47#[derive(Clone, Copy, Debug)]
48pub struct ImageFetchConfig {
49    pub detail: ImageDetail,
50}
51
52impl Default for ImageFetchConfig {
53    fn default() -> Self {
54        Self {
55            detail: ImageDetail::Auto,
56        }
57    }
58}
59
/// Per-request frame sampling options for video fetches.
#[derive(Clone, Copy, Debug)]
pub struct VideoFetchConfig {
    /// Lower bound on the number of sampled frames.
    pub min_frames: usize,
    /// Upper bound on the number of sampled frames.
    pub max_frames: usize,
    /// Target sampling rate, in frames per second of video.
    pub sample_fps: f32,
}

impl Default for VideoFetchConfig {
    /// Samples at 2 fps, keeping between 4 and 768 frames.
    fn default() -> Self {
        let (min_frames, max_frames, sample_fps) = (4, 768, 2.0);
        Self {
            min_frames,
            max_frames,
            sample_fps,
        }
    }
}
76
/// Where a piece of media comes from. `MediaConnector::fetch_image` and
/// `MediaConnector::fetch_video` dispatch on this value.
#[derive(Debug, Clone)]
pub enum MediaSource {
    /// Remote URL, fetched with the connector's HTTP client.
    Url(String),
    /// `data:` URL; only base64-encoded payloads are accepted by the fetchers.
    DataUrl(String),
    /// Raw encoded media bytes supplied directly by the caller.
    InlineBytes(Vec<u8>),
    /// Local file path; must resolve under the configured allowed root.
    File(PathBuf),
}
84
/// Fetches and decodes images and videos from URLs, data URLs, inline bytes
/// and local files, enforcing the configured host and path restrictions.
#[derive(Clone)]
pub struct MediaConnector {
    /// HTTP client used for URL sources.
    client: Client,
    /// Lower-cased hosts that URL fetches may target; `None` disables the check.
    allowed_domains: Option<HashSet<String>>,
    /// Canonicalized root that local file sources must live under.
    allowed_local_media_path: Option<PathBuf>,
    /// Per-request HTTP timeout; zero means no timeout is set on the request.
    fetch_timeout: Duration,
}
92
93impl MediaConnector {
94    pub fn new(client: Client, config: MediaConnectorConfig) -> Result<Self, MediaConnectorError> {
95        let allowed_domains = config.allowed_domains.map(|domains| {
96            domains
97                .into_iter()
98                .map(|d| d.to_ascii_lowercase())
99                .collect::<HashSet<_>>()
100        });
101
102        let allowed_local_media_path = if let Some(path) = config.allowed_local_media_path {
103            Some(std::fs::canonicalize(path)?)
104        } else {
105            None
106        };
107
108        Ok(Self {
109            client,
110            allowed_domains,
111            allowed_local_media_path,
112            fetch_timeout: config.fetch_timeout,
113        })
114    }
115
116    pub async fn fetch_image(
117        &self,
118        source: MediaSource,
119        cfg: ImageFetchConfig,
120    ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
121        match source {
122            MediaSource::Url(url) => self.fetch_http_image(url, cfg).await,
123            MediaSource::DataUrl(data_url) => self.fetch_data_url(data_url, cfg).await,
124            MediaSource::InlineBytes(bytes) => {
125                self.decode_image(bytes.into(), cfg.detail, ImageSource::InlineBytes)
126                    .await
127            }
128            MediaSource::File(path) => self.fetch_file(path, cfg).await,
129        }
130    }
131
132    pub async fn fetch_video(
133        &self,
134        source: MediaSource,
135        cfg: VideoFetchConfig,
136    ) -> Result<Arc<VideoClip>, MediaConnectorError> {
137        // TODO: add a configurable max-video-bytes guard before fully buffering
138        // URL/data/file/inline payloads. VideoClip retains the original bytes,
139        // so oversized inputs should be rejected before decode.
140        match source {
141            MediaSource::Url(url) => self.fetch_http_video(url, cfg).await,
142            MediaSource::DataUrl(data_url) => self.fetch_video_data_url(data_url, cfg).await,
143            MediaSource::InlineBytes(bytes) => {
144                self.decode_video(bytes.into(), cfg, VideoSource::InlineBytes)
145                    .await
146            }
147            MediaSource::File(path) => self.fetch_video_file(path, cfg).await,
148        }
149    }
150
151    async fn fetch_http_image(
152        &self,
153        url: String,
154        cfg: ImageFetchConfig,
155    ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
156        let parsed = Url::parse(&url).map_err(|_| MediaConnectorError::InvalidUrl(url.clone()))?;
157        self.ensure_domain_allowed(&parsed)?;
158
159        let mut req = self.client.get(parsed.as_str());
160        if self.fetch_timeout > Duration::ZERO {
161            req = req.timeout(self.fetch_timeout);
162        }
163
164        let resp = req.send().await.map_err(|err| {
165            if err.is_timeout() {
166                MediaConnectorError::Timeout(self.fetch_timeout)
167            } else {
168                MediaConnectorError::Http(err)
169            }
170        })?;
171
172        let resp = resp.error_for_status()?;
173        let bytes = resp.bytes().await?;
174        self.decode_image(
175            bytes,
176            cfg.detail,
177            ImageSource::Url {
178                url: parsed.to_string(),
179            },
180        )
181        .await
182    }
183
184    async fn fetch_data_url(
185        &self,
186        data_url: String,
187        cfg: ImageFetchConfig,
188    ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
189        let (metadata, data) = data_url
190            .split_once(',')
191            .ok_or_else(|| MediaConnectorError::DataUrl("missing comma in data url".into()))?;
192
193        if !metadata.ends_with(";base64") {
194            return Err(MediaConnectorError::DataUrl(
195                "only base64 encoded data URLs are supported".into(),
196            ));
197        }
198
199        let data = data.trim();
200        let decoded = BASE64_STANDARD.decode(data)?;
201        self.decode_image(decoded.into(), cfg.detail, ImageSource::DataUrl)
202            .await
203    }
204
205    async fn fetch_video_data_url(
206        &self,
207        data_url: String,
208        cfg: VideoFetchConfig,
209    ) -> Result<Arc<VideoClip>, MediaConnectorError> {
210        let (metadata, data) = data_url
211            .split_once(',')
212            .ok_or_else(|| MediaConnectorError::DataUrl("missing comma in data url".into()))?;
213
214        if !metadata.ends_with(";base64") {
215            return Err(MediaConnectorError::DataUrl(
216                "only base64 encoded data URLs are supported".into(),
217            ));
218        }
219
220        let data = data.trim();
221        let decoded = BASE64_STANDARD.decode(data)?;
222        self.decode_video(decoded.into(), cfg, VideoSource::DataUrl)
223            .await
224    }
225
226    async fn fetch_file(
227        &self,
228        path: PathBuf,
229        cfg: ImageFetchConfig,
230    ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
231        let allowed_root = self
232            .allowed_local_media_path
233            .as_ref()
234            .ok_or_else(|| MediaConnectorError::DisallowedLocalPath(path.display().to_string()))?;
235
236        let canonical = fs::canonicalize(&path).await?;
237        if !canonical.starts_with(allowed_root) {
238            return Err(MediaConnectorError::DisallowedLocalPath(
239                path.display().to_string(),
240            ));
241        }
242
243        let bytes = fs::read(&canonical).await?;
244        self.decode_image(
245            bytes.into(),
246            cfg.detail,
247            ImageSource::File { path: canonical },
248        )
249        .await
250    }
251
252    async fn fetch_http_video(
253        &self,
254        url: String,
255        cfg: VideoFetchConfig,
256    ) -> Result<Arc<VideoClip>, MediaConnectorError> {
257        let parsed = Url::parse(&url).map_err(|_| MediaConnectorError::InvalidUrl(url.clone()))?;
258        self.ensure_domain_allowed(&parsed)?;
259
260        let mut req = self.client.get(parsed.as_str());
261        if self.fetch_timeout > Duration::ZERO {
262            req = req.timeout(self.fetch_timeout);
263        }
264
265        let resp = req.send().await.map_err(|err| {
266            if err.is_timeout() {
267                MediaConnectorError::Timeout(self.fetch_timeout)
268            } else {
269                MediaConnectorError::Http(err)
270            }
271        })?;
272
273        let resp = resp.error_for_status()?;
274        let bytes = resp.bytes().await?;
275        self.decode_video(
276            bytes,
277            cfg,
278            VideoSource::Url {
279                url: parsed.to_string(),
280            },
281        )
282        .await
283    }
284
285    async fn fetch_video_file(
286        &self,
287        path: PathBuf,
288        cfg: VideoFetchConfig,
289    ) -> Result<Arc<VideoClip>, MediaConnectorError> {
290        let allowed_root = self
291            .allowed_local_media_path
292            .as_ref()
293            .ok_or_else(|| MediaConnectorError::DisallowedLocalPath(path.display().to_string()))?;
294
295        let canonical = fs::canonicalize(&path).await?;
296        if !canonical.starts_with(allowed_root) {
297            return Err(MediaConnectorError::DisallowedLocalPath(
298                path.display().to_string(),
299            ));
300        }
301
302        let bytes = fs::read(&canonical).await?;
303        self.decode_video(bytes.into(), cfg, VideoSource::File { path: canonical })
304            .await
305    }
306
307    fn ensure_domain_allowed(&self, url: &Url) -> Result<(), MediaConnectorError> {
308        if let Some(allowed) = &self.allowed_domains {
309            let host = url
310                .host_str()
311                .map(|h| h.to_ascii_lowercase())
312                .ok_or_else(|| MediaConnectorError::InvalidUrl(url.to_string()))?;
313            if !allowed.contains(&host) {
314                return Err(MediaConnectorError::DisallowedDomain(host));
315            }
316        }
317        Ok(())
318    }
319
320    async fn decode_image(
321        &self,
322        bytes: Bytes,
323        detail: ImageDetail,
324        source: ImageSource,
325    ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
326        let hash = crate::hasher::hash_image(&bytes);
327
328        let cursor = std::io::Cursor::new(bytes.clone());
329        let reader = image::ImageReader::new(cursor).with_guessed_format()?;
330
331        let image = task::spawn_blocking(move || reader.decode())
332            .await
333            .map_err(MediaConnectorError::Blocking)??;
334
335        Ok(Arc::new(ImageFrame::new(
336            image, bytes, detail, source, hash,
337        )))
338    }
339
340    async fn decode_video(
341        &self,
342        bytes: Bytes,
343        cfg: VideoFetchConfig,
344        source: VideoSource,
345    ) -> Result<Arc<VideoClip>, MediaConnectorError> {
346        if cfg.max_frames == 0 {
347            return Err(MediaConnectorError::VideoDecode(
348                "max_frames must be greater than 0".to_string(),
349            ));
350        }
351        if cfg.min_frames == 0 {
352            return Err(MediaConnectorError::VideoDecode(
353                "min_frames must be greater than 0".to_string(),
354            ));
355        }
356        if cfg.min_frames > cfg.max_frames {
357            return Err(MediaConnectorError::VideoDecode(
358                "min_frames must be less than or equal to max_frames".to_string(),
359            ));
360        }
361        if cfg.sample_fps <= 0.0 {
362            return Err(MediaConnectorError::VideoDecode(
363                "sample_fps must be greater than 0".to_string(),
364            ));
365        }
366
367        let hash = crate::hasher::hash_video(&bytes);
368        let decoded = decode_video_frames(bytes.clone(), cfg).await?;
369
370        let clip = match decoded {
371            DecodedVideoFrames::Images(frames) => VideoClip::new(frames, bytes, source, hash),
372            DecodedVideoFrames::Rgb(rgb_video) => {
373                VideoClip::new_rgb(rgb_video, bytes, source, hash)
374            }
375        };
376        Ok(Arc::new(clip))
377    }
378}
379
/// Result of a video decode backend, in whichever representation that
/// backend produced.
enum DecodedVideoFrames {
    /// One decoded image per sampled frame (returned by the FFmpeg PNG path).
    Images(Vec<image::DynamicImage>),
    /// Packed RGB frames (returned by the OpenCV and FFmpeg PPM/raw paths).
    Rgb(DecodedRgbVideo),
}
384
/// Writes the encoded video to a temporary file and decodes sampled frames
/// with the backend selected by `SMG_VIDEO_DECODE_BACKEND`.
///
/// * `ffmpeg` forces the FFmpeg path.
/// * `opencv` forces OpenCV and fails when the `opencv-video` feature is not
///   compiled in.
/// * unset, empty or `auto` tries OpenCV first and falls back to FFmpeg when
///   the feature is compiled in, otherwise uses FFmpeg directly.
/// * any other value is rejected.
///
/// # Errors
/// Temp-file I/O errors, `Blocking` when a blocking task fails to join, and
/// `VideoDecode` for backend failures or an unsupported backend name.
async fn decode_video_frames(
    bytes: Bytes,
    cfg: VideoFetchConfig,
) -> Result<DecodedVideoFrames, MediaConnectorError> {
    // Input size is only used for timing logs.
    let input_bytes = bytes.len();
    // The temp file write is synchronous I/O, so it runs on the blocking pool.
    // `input_file` is held until this function returns, which covers every
    // decode call below that reads from its path.
    let input_file = {
        let bytes = bytes.clone();
        task::spawn_blocking(move || write_temp_video_file(&bytes))
            .await
            .map_err(MediaConnectorError::Blocking)??
    };
    let input_path = input_file.path().to_path_buf();
    match video_decode_backend_override().as_deref() {
        Some("ffmpeg") => decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await,
        Some("opencv") => {
            #[cfg(feature = "opencv-video")]
            {
                // OpenCV decoding is synchronous; keep it off the async executor.
                let input_path = input_path.clone();
                task::spawn_blocking(move || {
                    decode_video_with_opencv_logged(&input_path, input_bytes, cfg)
                })
                .await
                .map_err(MediaConnectorError::Blocking)?
            }
            #[cfg(not(feature = "opencv-video"))]
            {
                Err(MediaConnectorError::VideoDecode(
                    "SMG_VIDEO_DECODE_BACKEND=opencv requires the opencv-video feature".to_string(),
                ))
            }
        }
        Some(backend) => Err(MediaConnectorError::VideoDecode(format!(
            "unsupported SMG_VIDEO_DECODE_BACKEND={backend}; expected auto, opencv, or ffmpeg"
        ))),
        None => {
            #[cfg(feature = "opencv-video")]
            {
                // OpenCV samples by frame index while the FFmpeg fallback uses an
                // fps filter, so the fallback can select a different frame set.
                let opencv_input_path = input_path.clone();
                let opencv_result = task::spawn_blocking(move || {
                    decode_video_with_opencv_logged(&opencv_input_path, input_bytes, cfg)
                })
                .await
                .map_err(MediaConnectorError::Blocking)?;

                match opencv_result {
                    Ok(frames) => Ok(frames),
                    Err(opencv_error) => {
                        if log_video_decode_timing_enabled() {
                            info!(
                                error = %opencv_error,
                                "smg_mm_timing video_decode_auto_opencv_fallback"
                            );
                        }

                        // Report both failures so neither root cause is lost.
                        match decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await {
                            Ok(frames) => Ok(frames),
                            Err(ffmpeg_error) => Err(MediaConnectorError::VideoDecode(format!(
                                "OpenCV decode failed: {opencv_error}; ffmpeg fallback failed: {ffmpeg_error}"
                            ))),
                        }
                    }
                }
            }

            #[cfg(not(feature = "opencv-video"))]
            {
                decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await
            }
        }
    }
}
458
/// Runs the OpenCV decode and records its timing (and error, if any) through
/// `log_video_decode_backend_timing` under the backend name `opencv`.
#[cfg(feature = "opencv-video")]
fn decode_video_with_opencv_logged(
    input_path: &std::path::Path,
    input_bytes: usize,
    cfg: VideoFetchConfig,
) -> Result<DecodedVideoFrames, MediaConnectorError> {
    let started = Instant::now();
    let outcome = decode_video_with_opencv_file(input_path, cfg);
    let failure = outcome.as_ref().err();
    log_video_decode_backend_timing("opencv", started, input_bytes, cfg, failure);
    outcome
}
475
/// Reads the backend override from `SMG_VIDEO_DECODE_BACKEND`.
///
/// The value is trimmed and lower-cased. Returns `None` when the variable is
/// unset, not valid Unicode, empty, or `auto`; otherwise returns the
/// normalized value without validating it.
fn video_decode_backend_override() -> Option<String> {
    let raw = std::env::var("SMG_VIDEO_DECODE_BACKEND").ok()?;
    let normalized = raw.trim().to_ascii_lowercase();
    if normalized.is_empty() || normalized == "auto" {
        None
    } else {
        Some(normalized)
    }
}
486
/// Reports whether timing logs are enabled through `SMG_LOG_MM_TIMING`.
///
/// True only when the trimmed, lower-cased value is `1`, `true`, `yes` or
/// `on`; an unset or unreadable variable counts as disabled.
fn log_video_decode_timing_enabled() -> bool {
    match std::env::var("SMG_LOG_MM_TIMING") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            ["1", "true", "yes", "on"].contains(&flag.as_str())
        }
        Err(_) => false,
    }
}
497
498fn log_video_decode_backend_timing(
499    backend: &str,
500    started: Instant,
501    input_bytes: usize,
502    cfg: VideoFetchConfig,
503    error: Option<&MediaConnectorError>,
504) {
505    if !log_video_decode_timing_enabled() {
506        return;
507    }
508    let elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
509    match error {
510        Some(error) => info!(
511            backend,
512            ok = false,
513            input_bytes,
514            min_frames = cfg.min_frames,
515            max_frames = cfg.max_frames,
516            sample_fps = cfg.sample_fps,
517            elapsed_ms,
518            error = %error,
519            "smg_mm_timing video_decode_backend"
520        ),
521        None => info!(
522            backend,
523            ok = true,
524            input_bytes,
525            min_frames = cfg.min_frames,
526            max_frames = cfg.max_frames,
527            sample_fps = cfg.sample_fps,
528            elapsed_ms,
529            "smg_mm_timing video_decode_backend"
530        ),
531    }
532}
533
/// Decodes sampled frames from a video file with OpenCV into one packed RGB
/// buffer.
///
/// Frame indices come from `opencv_frame_indices`; each distinct index is
/// seeked to and read once, and its pixels are appended once per time that
/// index was requested. The loop is bounded by `video_process_timeout()` and
/// the output by `ensure_decoded_byte_limit`.
///
/// # Errors
/// `VideoDecode` when the path is not UTF-8, the video cannot be opened,
/// OpenCV reports zero frames, a seek fails, the timeout elapses, the byte
/// limit would be exceeded, allocation fails, or fewer frames were read than
/// were requested.
#[cfg(feature = "opencv-video")]
fn decode_video_with_opencv_file(
    input_path: &std::path::Path,
    cfg: VideoFetchConfig,
) -> Result<DecodedVideoFrames, MediaConnectorError> {
    // `open_opencv_video_capture` takes a `&str`, so the path must be UTF-8.
    let input = input_path.to_str().ok_or_else(|| {
        MediaConnectorError::VideoDecode(format!(
            "OpenCV video path is not valid UTF-8: {}",
            input_path.display()
        ))
    })?;

    let mut capture = open_opencv_video_capture(input)?;

    // The property is reported as f64; round it and floor negatives at zero.
    let total_frames = capture
        .get(videoio::CAP_PROP_FRAME_COUNT)
        .map_err(opencv_decode_error)?
        .round()
        .max(0.0) as usize;
    if total_frames == 0 {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV reported zero video frames".to_string(),
        ));
    }

    let fps = capture
        .get(videoio::CAP_PROP_FPS)
        .map_err(opencv_decode_error)?;
    let frame_indices = opencv_frame_indices(total_frames, fps, cfg);
    if frame_indices.is_empty() {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV video sampling produced no frame indices".to_string(),
        ));
    }

    // Collapse consecutive duplicate indices so each frame is decoded once.
    let sampled_frame_counts = counted_frame_indices(&frame_indices);
    // `data` holds all frames back to back; `frames` records where each one is.
    let mut data = Vec::new();
    let mut frames = Vec::new();
    frames.try_reserve(frame_indices.len()).map_err(|e| {
        MediaConnectorError::VideoDecode(format!(
            "failed to reserve {} decoded video frame records: {e}",
            frame_indices.len()
        ))
    })?;
    // Both Mats are reused across iterations.
    let mut bgr_frame = Mat::default();
    let mut rgb_frame = Mat::default();

    let timeout = video_process_timeout();
    let started = Instant::now();
    // Seek directly to sampled frames instead of scanning every intervening
    // frame, which can be prohibitively slow for long clips.
    for (idx, repeat_count) in sampled_frame_counts {
        // The deadline is only checked between frames.
        if started.elapsed() >= timeout {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV timed out after {:.3} seconds",
                timeout.as_secs_f64()
            )));
        }

        if !capture
            .set(videoio::CAP_PROP_POS_FRAMES, idx as f64)
            .map_err(opencv_decode_error)?
        {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV could not seek to sampled frame {idx}"
            )));
        }

        // An unreadable frame is skipped here; the frame-count check after
        // the loop then reports the shortfall.
        if !capture.read(&mut bgr_frame).map_err(opencv_decode_error)? || bgr_frame.empty() {
            continue;
        }

        imgproc::cvt_color_def(&bgr_frame, &mut rgb_frame, imgproc::COLOR_BGR2RGB)
            .map_err(opencv_decode_error)?;

        // Mat dimensions are signed; reject negatives instead of casting.
        let decoded_width = u32::try_from(rgb_frame.cols()).map_err(|_| {
            MediaConnectorError::VideoDecode(format!(
                "OpenCV produced invalid RGB frame width: {}",
                rgb_frame.cols()
            ))
        })?;
        let decoded_height = u32::try_from(rgb_frame.rows()).map_err(|_| {
            MediaConnectorError::VideoDecode(format!(
                "OpenCV produced invalid RGB frame height: {}",
                rgb_frame.rows()
            ))
        })?;
        let frame_size = rawvideo_frame_size(decoded_width, decoded_height)?;
        let rgb_bytes = rgb_frame.data_bytes().map_err(opencv_decode_error)?;
        // Guards the `[..frame_size]` slice below against a short buffer.
        if rgb_bytes.len() < frame_size {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV produced {} RGB bytes for {decoded_width}x{decoded_height} frame, expected {frame_size}",
                rgb_bytes.len()
            )));
        }
        // Append the frame once per requested occurrence of this index.
        for _ in 0..repeat_count {
            let new_len = data.len().checked_add(frame_size).ok_or_else(|| {
                MediaConnectorError::VideoDecode(format!(
                    "decoded video byte size overflow while appending {frame_size} bytes"
                ))
            })?;
            ensure_decoded_byte_limit(new_len)?;
            // Fallible reservation turns allocation failure into an error.
            data.try_reserve(frame_size).map_err(|e| {
                MediaConnectorError::VideoDecode(format!(
                    "failed to reserve {frame_size} decoded video bytes: {e}"
                ))
            })?;
            let offset = data.len();
            data.extend_from_slice(&rgb_bytes[..frame_size]);
            frames.push(DecodedRgbFrame {
                width: decoded_width,
                height: decoded_height,
                offset,
                len: frame_size,
            });
        }
    }

    if frames.is_empty() {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV produced no readable sampled frames".to_string(),
        ));
    }
    // Any skipped frame makes the result shorter than requested.
    if frames.len() != frame_indices.len() {
        return Err(MediaConnectorError::VideoDecode(format!(
            "OpenCV produced {} sampled frames, expected {}",
            frames.len(),
            frame_indices.len()
        )));
    }

    Ok(DecodedVideoFrames::Rgb(DecodedRgbVideo::new(
        Bytes::from(data),
        frames,
    )))
}
670
/// Opens `input` with OpenCV, trying the FFmpeg capture backend first and
/// then letting OpenCV pick any backend.
///
/// # Errors
/// `VideoDecode` when OpenCV raises an error or no backend can open the file.
#[cfg(feature = "opencv-video")]
fn open_opencv_video_capture(input: &str) -> Result<videoio::VideoCapture, MediaConnectorError> {
    for backend in [videoio::CAP_FFMPEG, videoio::CAP_ANY] {
        let capture =
            videoio::VideoCapture::from_file(input, backend).map_err(opencv_decode_error)?;
        let opened = capture.is_opened().map_err(opencv_decode_error)?;
        if opened {
            return Ok(capture);
        }
    }

    Err(MediaConnectorError::VideoDecode(format!(
        "OpenCV could not open video: {input}"
    )))
}
689
/// Picks the frame indices to sample from a clip of `total_frames` frames.
///
/// The target count is `duration * cfg.sample_fps` (rounded) when `fps` is a
/// positive finite number, otherwise `cfg.max_frames`. It is then bounded to
/// `cfg.min_frames..=cfg.max_frames` and to at least 1. Indices are spread
/// evenly from the first to the last frame and may repeat when the target
/// count exceeds `total_frames`.
///
/// Returns an empty vector when `total_frames` is 0, and never panics on an
/// inverted `min_frames`/`max_frames` pair (the upper bound wins).
#[cfg(feature = "opencv-video")]
fn opencv_frame_indices(total_frames: usize, fps: f64, cfg: VideoFetchConfig) -> Vec<usize> {
    // Nothing to sample; also avoids underflow in `total_frames - 1` below.
    if total_frames == 0 {
        return Vec::new();
    }

    let estimated_frames = if fps.is_finite() && fps > 0.0 {
        let duration = total_frames as f64 / fps;
        // Float-to-int `as` saturates, so oversized products cap at usize::MAX.
        (duration * f64::from(cfg.sample_fps)).round() as usize
    } else {
        cfg.max_frames
    };

    // `max` then `min` matches `clamp` for valid bounds, but `clamp` panics
    // when `min_frames > max_frames`.
    let target_frames = estimated_frames
        .max(cfg.min_frames)
        .min(cfg.max_frames)
        .max(1);
    if target_frames == 1 {
        return vec![0];
    }

    let last = (total_frames - 1) as f64;
    let denom = (target_frames - 1) as f64;
    (0..target_frames)
        .map(|idx| ((idx as f64 * last) / denom).floor() as usize)
        .collect()
}
710
/// Collapses consecutive equal indices into `(index, repeat_count)` pairs,
/// preserving order. Equal indices that are not adjacent stay separate.
#[cfg(feature = "opencv-video")]
fn counted_frame_indices(frame_indices: &[usize]) -> Vec<(usize, usize)> {
    let mut runs: Vec<(usize, usize)> = Vec::new();
    for &frame in frame_indices {
        let extends_run = matches!(runs.last(), Some(&(current, _)) if current == frame);
        if extends_run {
            if let Some(run) = runs.last_mut() {
                run.1 += 1;
            }
        } else {
            runs.push((frame, 1));
        }
    }
    runs
}
725
/// Wraps an OpenCV error in `MediaConnectorError::VideoDecode`.
#[cfg(feature = "opencv-video")]
fn opencv_decode_error(err: opencv::Error) -> MediaConnectorError {
    let message = format!("OpenCV video decode failed: {err}");
    MediaConnectorError::VideoDecode(message)
}
730
731async fn decode_video_with_ffmpeg(
732    input_path: &std::path::Path,
733    input_bytes: usize,
734    cfg: VideoFetchConfig,
735) -> Result<DecodedVideoFrames, MediaConnectorError> {
736    if let Ok(metadata) = probe_video_metadata(input_path).await {
737        let started = Instant::now();
738        match decode_video_with_ffmpeg_ppm(input_path, cfg, metadata).await {
739            Ok(rgb_video) => {
740                log_video_decode_backend_timing("ffmpeg_ppm_file", started, input_bytes, cfg, None);
741                return Ok(DecodedVideoFrames::Rgb(rgb_video));
742            }
743            Err(error) => {
744                log_video_decode_backend_timing(
745                    "ffmpeg_ppm_file",
746                    started,
747                    input_bytes,
748                    cfg,
749                    Some(&error),
750                );
751            }
752        }
753
754        let started = Instant::now();
755        match decode_video_with_ffmpeg_raw(input_path, cfg, metadata).await {
756            Ok(rgb_video) => {
757                log_video_decode_backend_timing("ffmpeg_raw_file", started, input_bytes, cfg, None);
758                return Ok(DecodedVideoFrames::Rgb(rgb_video));
759            }
760            Err(error) => {
761                log_video_decode_backend_timing(
762                    "ffmpeg_raw_file",
763                    started,
764                    input_bytes,
765                    cfg,
766                    Some(&error),
767                );
768            }
769        }
770    }
771
772    let started = Instant::now();
773    match decode_video_with_ffmpeg_png(input_path, cfg).await {
774        Ok(frames) => {
775            log_video_decode_backend_timing("ffmpeg_png_file", started, input_bytes, cfg, None);
776            Ok(DecodedVideoFrames::Images(frames))
777        }
778        Err(error) => {
779            log_video_decode_backend_timing(
780                "ffmpeg_png_file",
781                started,
782                input_bytes,
783                cfg,
784                Some(&error),
785            );
786            Err(error)
787        }
788    }
789}
790
791fn write_temp_video_file(bytes: &[u8]) -> Result<tempfile::NamedTempFile, MediaConnectorError> {
792    let started = Instant::now();
793    let mut input_file = tempfile::Builder::new()
794        .prefix("smg-video-")
795        .suffix(video_temp_suffix(bytes))
796        .tempfile()?;
797    input_file.write_all(bytes)?;
798    input_file.flush()?;
799    if log_video_decode_timing_enabled() {
800        info!(
801            nbytes = bytes.len(),
802            elapsed_ms = started.elapsed().as_secs_f64() * 1000.0,
803            suffix = video_temp_suffix(bytes),
804            "smg_mm_timing video_tempfile_write"
805        );
806    }
807    Ok(input_file)
808}
809
/// Guesses a file extension from the container signature at the start of
/// `bytes`.
///
/// Recognizes `ftyp` at offset 4 (`.mp4`, needs at least 12 bytes), the EBML
/// header (`.webm`), RIFF with an `AVI ` form type (`.avi`), `OggS` (`.ogv`)
/// and the MPEG pack start code (`.mpg`). Anything else gets `.video`.
fn video_temp_suffix(bytes: &[u8]) -> &'static str {
    // Arms are checked top to bottom, matching the original precedence.
    match bytes {
        [_, _, _, _, b'f', b't', b'y', b'p', _, _, _, _, ..] => ".mp4",
        [0x1a, 0x45, 0xdf, 0xa3, ..] => ".webm",
        [b'R', b'I', b'F', b'F', _, _, _, _, b'A', b'V', b'I', b' ', ..] => ".avi",
        [b'O', b'g', b'g', b'S', ..] => ".ogv",
        [0x00, 0x00, 0x01, 0xba, ..] => ".mpg",
        _ => ".video",
    }
}
828
829fn video_process_timeout() -> Duration {
830    std::env::var("SMG_VIDEO_PROCESS_TIMEOUT_SECS")
831        .ok()
832        .and_then(|value| value.parse::<f64>().ok())
833        .filter(|seconds| seconds.is_finite() && *seconds > 0.0)
834        .map(Duration::from_secs_f64)
835        .unwrap_or(DEFAULT_VIDEO_PROCESS_TIMEOUT)
836}
837
838fn video_max_decoded_bytes() -> usize {
839    std::env::var("SMG_VIDEO_MAX_DECODED_BYTES")
840        .ok()
841        .and_then(|value| value.parse::<usize>().ok())
842        .filter(|bytes| *bytes > 0)
843        .unwrap_or(DEFAULT_VIDEO_MAX_DECODED_BYTES)
844}
845
846fn ensure_decoded_byte_limit(bytes: usize) -> Result<(), MediaConnectorError> {
847    let limit = video_max_decoded_bytes();
848    if bytes > limit {
849        return Err(MediaConnectorError::VideoDecode(format!(
850            "decoded video RGB payload would be {bytes} bytes, exceeding SMG_VIDEO_MAX_DECODED_BYTES={limit}"
851        )));
852    }
853    Ok(())
854}
855
856fn checked_decoded_rgb_bytes(
857    frame_count: usize,
858    frame_size: usize,
859) -> Result<usize, MediaConnectorError> {
860    let bytes = frame_count.checked_mul(frame_size).ok_or_else(|| {
861        MediaConnectorError::VideoDecode(format!(
862            "decoded video byte size overflow for {frame_count} frames of {frame_size} bytes"
863        ))
864    })?;
865    ensure_decoded_byte_limit(bytes)?;
866    Ok(bytes)
867}
868
869async fn run_video_command_output(
870    mut command: Command,
871    program: &'static str,
872) -> Result<Output, MediaConnectorError> {
873    command
874        .stdout(Stdio::piped())
875        .stderr(Stdio::piped())
876        .kill_on_drop(true);
877    let child = command.spawn().map_err(|e| {
878        if e.kind() == std::io::ErrorKind::NotFound {
879            MediaConnectorError::VideoDecode(format!(
880                "{program} executable not found; install {program} to decode video_url inputs"
881            ))
882        } else {
883            MediaConnectorError::Io(e)
884        }
885    })?;
886
887    let timeout = video_process_timeout();
888    match time::timeout(timeout, child.wait_with_output()).await {
889        Ok(Ok(output)) => Ok(output),
890        Ok(Err(error)) => Err(MediaConnectorError::Io(error)),
891        Err(_) => Err(MediaConnectorError::VideoDecode(format!(
892            "{program} timed out after {:.3} seconds",
893            timeout.as_secs_f64()
894        ))),
895    }
896}
897
898async fn decode_video_with_ffmpeg_ppm(
899    input_path: &std::path::Path,
900    cfg: VideoFetchConfig,
901    metadata: VideoMetadata,
902) -> Result<DecodedRgbVideo, MediaConnectorError> {
903    let fps_filter = fps_filter_for_metadata(metadata, cfg);
904    let max_frames = cfg.max_frames.to_string();
905    let frame_size = rawvideo_frame_size(metadata.width, metadata.height)?;
906    let target_frames = expected_sampled_frame_count(metadata, cfg);
907    let decoded_bytes = checked_decoded_rgb_bytes(target_frames, frame_size)?;
908    let output_limit = decoded_bytes
909        .checked_add(target_frames.saturating_mul(64))
910        .unwrap_or_else(video_max_decoded_bytes)
911        .min(video_max_decoded_bytes())
912        .to_string();
913    let mut command = Command::new("ffmpeg");
914    command
915        .args(["-hide_banner", "-loglevel", "error", "-nostdin", "-i"])
916        .arg(input_path)
917        .args([
918            "-vf",
919            &fps_filter,
920            "-frames:v",
921            &max_frames,
922            "-fs",
923            &output_limit,
924            "-f",
925            "image2pipe",
926            "-vcodec",
927            "ppm",
928            "-pix_fmt",
929            "rgb24",
930            "pipe:1",
931        ]);
932    let output = run_video_command_output(command, "ffmpeg").await?;
933
934    if !output.status.success() {
935        let stderr = String::from_utf8_lossy(&output.stderr);
936        return Err(MediaConnectorError::VideoDecode(format!(
937            "ffmpeg failed: {stderr}"
938        )));
939    }
940
941    parse_ppm_rgb_video(Bytes::from(output.stdout))
942}
943
944async fn decode_video_with_ffmpeg_raw(
945    input_path: &std::path::Path,
946    cfg: VideoFetchConfig,
947    metadata: VideoMetadata,
948) -> Result<DecodedRgbVideo, MediaConnectorError> {
949    let fps_filter = fps_filter_for_metadata(metadata, cfg);
950    let max_frames = cfg.max_frames.to_string();
951    let frame_size = rawvideo_frame_size(metadata.width, metadata.height)?;
952    let target_frames = expected_sampled_frame_count(metadata, cfg);
953    let decoded_bytes = checked_decoded_rgb_bytes(target_frames, frame_size)?;
954    let output_limit = decoded_bytes.to_string();
955    let mut command = Command::new("ffmpeg");
956    // Rawvideo has no per-frame header, so we interpret stdout using ffprobe's
957    // coded stream dimensions. Disable FFmpeg autorotation here; otherwise a
958    // display-matrix rotation can swap output width/height and corrupt framing.
959    command
960        .args([
961            "-hide_banner",
962            "-loglevel",
963            "error",
964            "-nostdin",
965            "-noautorotate",
966            "-i",
967        ])
968        .arg(input_path)
969        .args([
970            "-vf",
971            &fps_filter,
972            "-frames:v",
973            &max_frames,
974            "-fs",
975            &output_limit,
976            "-f",
977            "rawvideo",
978            "-pix_fmt",
979            "rgb24",
980            "pipe:1",
981        ]);
982    let output = run_video_command_output(command, "ffmpeg").await?;
983
984    if !output.status.success() {
985        let stderr = String::from_utf8_lossy(&output.stderr);
986        return Err(MediaConnectorError::VideoDecode(format!(
987            "ffmpeg failed: {stderr}"
988        )));
989    }
990
991    let frame_count = output.stdout.len() / frame_size;
992    checked_decoded_rgb_bytes(frame_count, frame_size)?;
993    let mut frames = Vec::new();
994    frames.try_reserve(frame_count).map_err(|e| {
995        MediaConnectorError::VideoDecode(format!(
996            "failed to reserve {frame_count} decoded video frame records: {e}"
997        ))
998    })?;
999    for idx in 0..frame_count {
1000        frames.push(DecodedRgbFrame {
1001            width: metadata.width,
1002            height: metadata.height,
1003            offset: idx * frame_size,
1004            len: frame_size,
1005        });
1006    }
1007    let remainder = output.stdout.len() % frame_size;
1008    if remainder != 0 {
1009        return Err(MediaConnectorError::VideoDecode(format!(
1010            "ffmpeg rawvideo output has trailing partial frame: {remainder} bytes"
1011        )));
1012    }
1013    if frames.is_empty() {
1014        return Err(MediaConnectorError::VideoDecode(
1015            "ffmpeg produced no frames".to_string(),
1016        ));
1017    }
1018    Ok(DecodedRgbVideo::new(Bytes::from(output.stdout), frames))
1019}
1020
1021async fn decode_video_with_ffmpeg_png(
1022    input_path: &std::path::Path,
1023    cfg: VideoFetchConfig,
1024) -> Result<Vec<image::DynamicImage>, MediaConnectorError> {
1025    let fps_filter = fps_filter_for_video(input_path, cfg).await;
1026    let max_frames = cfg.max_frames.to_string();
1027    let output_limit = video_max_decoded_bytes().to_string();
1028    let mut command = Command::new("ffmpeg");
1029    command
1030        .args(["-hide_banner", "-loglevel", "error", "-nostdin", "-i"])
1031        .arg(input_path)
1032        .args([
1033            "-vf",
1034            &fps_filter,
1035            "-frames:v",
1036            &max_frames,
1037            "-fs",
1038            &output_limit,
1039            "-f",
1040            "image2pipe",
1041            "-vcodec",
1042            "png",
1043            "pipe:1",
1044        ]);
1045    let output = run_video_command_output(command, "ffmpeg").await?;
1046
1047    if !output.status.success() {
1048        let stderr = String::from_utf8_lossy(&output.stderr);
1049        return Err(MediaConnectorError::VideoDecode(format!(
1050            "ffmpeg failed: {stderr}"
1051        )));
1052    }
1053
1054    let pngs = split_png_stream(&output.stdout)?;
1055    let mut frames = Vec::with_capacity(pngs.len());
1056    let mut decoded_bytes = 0usize;
1057    for png in pngs {
1058        let image = image::load_from_memory(png)?;
1059        let frame_size = rawvideo_frame_size(image.width(), image.height())?;
1060        decoded_bytes = decoded_bytes.checked_add(frame_size).ok_or_else(|| {
1061            MediaConnectorError::VideoDecode("PNG decoded byte size overflow".to_string())
1062        })?;
1063        ensure_decoded_byte_limit(decoded_bytes)?;
1064        frames.push(image);
1065    }
1066    if frames.is_empty() {
1067        return Err(MediaConnectorError::VideoDecode(
1068            "ffmpeg produced no frames".to_string(),
1069        ));
1070    }
1071    Ok(frames)
1072}
1073
/// Stream properties obtained by probing a video before decoding it.
#[derive(Clone, Copy, Debug)]
struct VideoMetadata {
    /// Frame width in pixels as reported by the probe.
    width: u32,
    /// Frame height in pixels as reported by the probe.
    height: u32,
    /// Container duration in seconds, or `None` when the probe did not yield
    /// a parsable value.
    duration_seconds: Option<f64>,
}
1080
1081async fn probe_video_metadata(
1082    input_path: &std::path::Path,
1083) -> Result<VideoMetadata, MediaConnectorError> {
1084    let mut command = Command::new("ffprobe");
1085    command
1086        .args([
1087            "-v",
1088            "error",
1089            "-nostdin",
1090            "-select_streams",
1091            "v:0",
1092            "-show_entries",
1093            "stream=width,height:format=duration",
1094            "-of",
1095            "default=noprint_wrappers=1",
1096        ])
1097        .arg(input_path);
1098    let output = run_video_command_output(command, "ffprobe").await?;
1099
1100    if !output.status.success() {
1101        let stderr = String::from_utf8_lossy(&output.stderr);
1102        return Err(MediaConnectorError::VideoDecode(format!(
1103            "ffprobe failed: {stderr}"
1104        )));
1105    }
1106
1107    let stdout = String::from_utf8_lossy(&output.stdout);
1108    let mut width = None;
1109    let mut height = None;
1110    let mut duration_seconds = None;
1111    for line in stdout.lines() {
1112        let Some((key, value)) = line.split_once('=') else {
1113            continue;
1114        };
1115        match key {
1116            "width" => width = value.parse::<u32>().ok(),
1117            "height" => height = value.parse::<u32>().ok(),
1118            "duration" if value != "N/A" => duration_seconds = value.parse::<f64>().ok(),
1119            _ => {}
1120        }
1121    }
1122
1123    let width = width.ok_or_else(|| {
1124        MediaConnectorError::VideoDecode("ffprobe did not return video width".to_string())
1125    })?;
1126    let height = height.ok_or_else(|| {
1127        MediaConnectorError::VideoDecode("ffprobe did not return video height".to_string())
1128    })?;
1129    Ok(VideoMetadata {
1130        width,
1131        height,
1132        duration_seconds,
1133    })
1134}
1135
1136fn fps_filter_for_metadata(metadata: VideoMetadata, cfg: VideoFetchConfig) -> String {
1137    if let Some(duration) = metadata.duration_seconds {
1138        if let Some(filter) = fps_filter_for_duration(duration, cfg) {
1139            return filter;
1140        }
1141    }
1142
1143    format!("fps={}", cfg.sample_fps)
1144}
1145
1146fn expected_sampled_frame_count(metadata: VideoMetadata, cfg: VideoFetchConfig) -> usize {
1147    if let Some(duration) = metadata.duration_seconds {
1148        if duration.is_finite() && duration > 0.0 {
1149            return (duration * cfg.sample_fps as f64)
1150                .round()
1151                .clamp(cfg.min_frames as f64, cfg.max_frames as f64) as usize;
1152        }
1153    }
1154    cfg.max_frames
1155}
1156
1157fn fps_filter_for_duration(duration: f64, cfg: VideoFetchConfig) -> Option<String> {
1158    if !duration.is_finite() || duration <= 0.0 {
1159        return None;
1160    }
1161    let target_frames = (duration * cfg.sample_fps as f64)
1162        .round()
1163        .clamp(cfg.min_frames as f64, cfg.max_frames as f64);
1164    let fps = (target_frames / duration).max(f64::EPSILON);
1165    Some(format!("fps={fps:.6}"))
1166}
1167
1168async fn fps_filter_for_video(input_path: &std::path::Path, cfg: VideoFetchConfig) -> String {
1169    if let Ok(duration) = probe_video_duration_seconds(input_path).await {
1170        if let Some(filter) = fps_filter_for_duration(duration, cfg) {
1171            return filter;
1172        }
1173    }
1174
1175    format!("fps={}", cfg.sample_fps)
1176}
1177
1178async fn probe_video_duration_seconds(
1179    input_path: &std::path::Path,
1180) -> Result<f64, MediaConnectorError> {
1181    let mut command = Command::new("ffprobe");
1182    command
1183        .args([
1184            "-v",
1185            "error",
1186            "-nostdin",
1187            "-show_entries",
1188            "format=duration",
1189            "-of",
1190            "default=noprint_wrappers=1:nokey=1",
1191        ])
1192        .arg(input_path);
1193    match run_video_command_output(command, "ffprobe").await {
1194        Ok(output) if output.status.success() => {
1195            let stdout = String::from_utf8_lossy(&output.stdout);
1196            stdout.trim().parse::<f64>().map_err(|err| {
1197                MediaConnectorError::VideoDecode(format!("failed to parse ffprobe duration: {err}"))
1198            })
1199        }
1200        Ok(_) | Err(_) => probe_video_duration_seconds_with_ffmpeg(input_path).await,
1201    }
1202}
1203
1204async fn probe_video_duration_seconds_with_ffmpeg(
1205    input_path: &std::path::Path,
1206) -> Result<f64, MediaConnectorError> {
1207    let mut command = Command::new("ffmpeg");
1208    command
1209        .args(["-hide_banner", "-nostdin", "-i"])
1210        .arg(input_path);
1211    let output = run_video_command_output(command, "ffmpeg").await?;
1212
1213    let stderr = String::from_utf8_lossy(&output.stderr);
1214    parse_ffmpeg_duration_seconds(&stderr).ok_or_else(|| {
1215        MediaConnectorError::VideoDecode("failed to parse ffmpeg duration".to_string())
1216    })
1217}
1218
/// Extracts the `Duration: HH:MM:SS.ss` value from ffmpeg's stderr banner and
/// converts it to seconds.
///
/// Only lines that *start* with `Duration:` (after indentation) are
/// considered, so the same text appearing earlier in the banner, for example
/// inside a file name or a metadata value, cannot hide the real entry. The
/// first such line that parses wins; lines like `Duration: N/A` are skipped.
///
/// Returns `None` when no line yields a finite, non-negative duration.
fn parse_ffmpeg_duration_seconds(stderr: &str) -> Option<f64> {
    const MARKER: &str = "Duration:";

    stderr.lines().find_map(|line| {
        let rest = line.trim_start().strip_prefix(MARKER)?;
        // The clock value ends at the first comma ("..., start: ...").
        let clock = rest.split(',').next()?.trim();
        let mut parts = clock.split(':');
        let hours = parts.next()?.parse::<f64>().ok()?;
        let minutes = parts.next()?.parse::<f64>().ok()?;
        let seconds = parts.next()?.parse::<f64>().ok()?;
        let total = hours * 3600.0 + minutes * 60.0 + seconds;
        // `f64::from_str` accepts "inf"/"nan"; those are not usable durations.
        (total.is_finite() && total >= 0.0).then_some(total)
    })
}
1229
1230fn split_png_stream(bytes: &[u8]) -> Result<Vec<&[u8]>, MediaConnectorError> {
1231    const PNG_SIG: &[u8; 8] = b"\x89PNG\r\n\x1a\n";
1232    const IEND: &[u8; 4] = b"IEND";
1233
1234    let mut frames = Vec::new();
1235    let mut pos = 0;
1236    while pos < bytes.len() {
1237        let Some(rel_start) = bytes[pos..]
1238            .windows(PNG_SIG.len())
1239            .position(|w| w == PNG_SIG)
1240        else {
1241            break;
1242        };
1243        let start = pos + rel_start;
1244        let mut cursor = start + PNG_SIG.len();
1245
1246        loop {
1247            let remaining = bytes.len() - cursor;
1248            if remaining < 12 {
1249                return Err(MediaConnectorError::VideoDecode(
1250                    "truncated PNG frame in ffmpeg output".to_string(),
1251                ));
1252            }
1253            let mut len_bytes = [0_u8; 4];
1254            len_bytes.copy_from_slice(&bytes[cursor..cursor + 4]);
1255            let len = u32::from_be_bytes(len_bytes) as usize;
1256            let chunk_type = &bytes[cursor + 4..cursor + 8];
1257            if remaining - 12 < len {
1258                return Err(MediaConnectorError::VideoDecode(
1259                    "truncated PNG chunk in ffmpeg output".to_string(),
1260                ));
1261            }
1262            cursor += 12 + len;
1263            if chunk_type == IEND {
1264                frames.push(&bytes[start..cursor]);
1265                pos = cursor;
1266                break;
1267            }
1268        }
1269    }
1270
1271    Ok(frames)
1272}
1273
/// Test-only helper: parses a concatenated PPM stream and copies every frame
/// into its own `image::DynamicImage`.
///
/// # Errors
///
/// Propagates layout errors from `parse_ppm_frame_layout` and returns
/// `MediaConnectorError::VideoDecode` when a frame's byte range overflows or
/// its pixels do not form an RGB image of the declared size.
#[cfg(test)]
fn parse_ppm_stream(bytes: &[u8]) -> Result<Vec<image::DynamicImage>, MediaConnectorError> {
    parse_ppm_frame_layout(bytes)?
        .into_iter()
        .map(|layout| {
            let Some(end) = layout.offset.checked_add(layout.len) else {
                return Err(MediaConnectorError::VideoDecode(
                    "PPM frame size overflow".to_string(),
                ));
            };
            let pixels = bytes[layout.offset..end].to_vec();
            match image::RgbImage::from_raw(layout.width, layout.height, pixels) {
                Some(rgb) => Ok(image::DynamicImage::ImageRgb8(rgb)),
                None => Err(MediaConnectorError::VideoDecode(format!(
                    "failed to build RGB frame from {} bytes for {}x{} video",
                    layout.len, layout.width, layout.height
                ))),
            }
        })
        .collect()
}
1297
1298fn parse_ppm_rgb_video(bytes: Bytes) -> Result<DecodedRgbVideo, MediaConnectorError> {
1299    let layouts = parse_ppm_frame_layout(&bytes)?;
1300    let decoded_bytes = layouts.iter().try_fold(0usize, |total, frame| {
1301        total.checked_add(frame.len).ok_or_else(|| {
1302            MediaConnectorError::VideoDecode("PPM decoded byte size overflow".to_string())
1303        })
1304    })?;
1305    ensure_decoded_byte_limit(decoded_bytes)?;
1306    Ok(DecodedRgbVideo::new(bytes, layouts))
1307}
1308
1309fn parse_ppm_frame_layout(bytes: &[u8]) -> Result<Vec<DecodedRgbFrame>, MediaConnectorError> {
1310    let mut frames = Vec::new();
1311    let mut pos = 0;
1312
1313    while pos < bytes.len() {
1314        skip_ppm_whitespace_and_comments(bytes, &mut pos);
1315        if pos >= bytes.len() {
1316            break;
1317        }
1318
1319        let magic = read_ppm_token(bytes, &mut pos)?.ok_or_else(|| {
1320            MediaConnectorError::VideoDecode("truncated PPM frame header".to_string())
1321        })?;
1322        if magic != b"P6" {
1323            return Err(MediaConnectorError::VideoDecode(format!(
1324                "unsupported PPM magic: {}",
1325                String::from_utf8_lossy(magic)
1326            )));
1327        }
1328        let width = parse_ppm_u32(bytes, &mut pos, "width")?;
1329        let height = parse_ppm_u32(bytes, &mut pos, "height")?;
1330        let max_value = parse_ppm_u32(bytes, &mut pos, "max value")?;
1331        if width == 0 || height == 0 {
1332            return Err(MediaConnectorError::VideoDecode(
1333                "PPM frame dimensions must be non-zero".to_string(),
1334            ));
1335        }
1336        if max_value != 255 {
1337            return Err(MediaConnectorError::VideoDecode(format!(
1338                "unsupported PPM max value: {max_value}"
1339            )));
1340        }
1341        if pos >= bytes.len() || !bytes[pos].is_ascii_whitespace() {
1342            return Err(MediaConnectorError::VideoDecode(
1343                "PPM header is not followed by pixel data".to_string(),
1344            ));
1345        }
1346        pos += 1;
1347
1348        let frame_size = (width as usize)
1349            .checked_mul(height as usize)
1350            .and_then(|pixels| pixels.checked_mul(3))
1351            .ok_or_else(|| {
1352                MediaConnectorError::VideoDecode(format!(
1353                    "PPM frame dimensions are too large: {width}x{height}"
1354                ))
1355            })?;
1356        let end = pos.checked_add(frame_size).ok_or_else(|| {
1357            MediaConnectorError::VideoDecode("PPM frame size overflow".to_string())
1358        })?;
1359        if end > bytes.len() {
1360            return Err(MediaConnectorError::VideoDecode(
1361                "truncated PPM frame pixel data".to_string(),
1362            ));
1363        }
1364        frames.push(DecodedRgbFrame {
1365            width,
1366            height,
1367            offset: pos,
1368            len: frame_size,
1369        });
1370        pos = end;
1371    }
1372
1373    if frames.is_empty() {
1374        return Err(MediaConnectorError::VideoDecode(
1375            "ffmpeg produced no frames".to_string(),
1376        ));
1377    }
1378
1379    Ok(frames)
1380}
1381
1382fn rawvideo_frame_size(width: u32, height: u32) -> Result<usize, MediaConnectorError> {
1383    let frame_size = (width as usize)
1384        .checked_mul(height as usize)
1385        .and_then(|pixels| pixels.checked_mul(3))
1386        .ok_or_else(|| {
1387            MediaConnectorError::VideoDecode(format!(
1388                "video frame dimensions are too large: {width}x{height}"
1389            ))
1390        })?;
1391    if frame_size == 0 {
1392        return Err(MediaConnectorError::VideoDecode(
1393            "video frame dimensions must be non-zero".to_string(),
1394        ));
1395    }
1396    Ok(frame_size)
1397}
1398
1399fn parse_ppm_u32(bytes: &[u8], pos: &mut usize, field: &str) -> Result<u32, MediaConnectorError> {
1400    let token = read_ppm_token(bytes, pos)?
1401        .ok_or_else(|| MediaConnectorError::VideoDecode(format!("truncated PPM {field} header")))?;
1402    std::str::from_utf8(token)
1403        .ok()
1404        .and_then(|value| value.parse::<u32>().ok())
1405        .ok_or_else(|| {
1406            MediaConnectorError::VideoDecode(format!(
1407                "invalid PPM {field}: {}",
1408                String::from_utf8_lossy(token)
1409            ))
1410        })
1411}
1412
1413fn read_ppm_token<'a>(
1414    bytes: &'a [u8],
1415    pos: &mut usize,
1416) -> Result<Option<&'a [u8]>, MediaConnectorError> {
1417    skip_ppm_whitespace_and_comments(bytes, pos);
1418    if *pos >= bytes.len() {
1419        return Ok(None);
1420    }
1421
1422    let start = *pos;
1423    while *pos < bytes.len() && !bytes[*pos].is_ascii_whitespace() {
1424        if bytes[*pos] == b'#' {
1425            return Err(MediaConnectorError::VideoDecode(
1426                "unexpected PPM comment inside token".to_string(),
1427            ));
1428        }
1429        *pos += 1;
1430    }
1431    Ok(Some(&bytes[start..*pos]))
1432}
1433
/// Advances `*pos` past any run of ASCII whitespace and `#` comments.
///
/// A comment extends from `#` up to the next `\n` or the end of input; the
/// newline itself is then consumed as ordinary whitespace. `*pos` is left
/// unchanged when it does not point at whitespace or a comment, including when
/// it is already at or past the end of `bytes`.
fn skip_ppm_whitespace_and_comments(bytes: &[u8], pos: &mut usize) {
    while let Some(&byte) = bytes.get(*pos) {
        if byte.is_ascii_whitespace() {
            *pos += 1;
        } else if byte == b'#' {
            // Jump to the terminating newline, or to the end if there is none.
            let comment = &bytes[*pos..];
            let comment_len = comment
                .iter()
                .position(|&b| b == b'\n')
                .unwrap_or(comment.len());
            *pos += comment_len;
        } else {
            break;
        }
    }
}
1448
#[cfg(test)]
mod tests {
    use super::{
        parse_ffmpeg_duration_seconds, parse_ppm_stream, split_png_stream, video_temp_suffix,
    };

    /// A complete PNG file: signature followed by IHDR, IDAT and IEND chunks.
    const TINY_PNG: &[u8] = &[
        137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 0, 1, 0, 0, 0, 1, 8, 4,
        0, 0, 0, 181, 28, 12, 2, 0, 0, 0, 11, 73, 68, 65, 84, 120, 218, 99, 96, 96, 0, 0, 0, 3, 0,
        1, 43, 9, 141, 84, 0, 0, 0, 0, 73, 69, 78, 68, 174, 66, 96, 130,
    ];

    // Two PNG files written back to back must come out as two identical slices.
    #[test]
    fn splits_concatenated_png_stream() {
        let stream = [TINY_PNG, TINY_PNG].concat();

        let frames = split_png_stream(&stream)
            .unwrap_or_else(|err| panic!("split png stream failed: {err}"));
        assert_eq!(frames.len(), 2);
        for frame in frames {
            assert_eq!(frame, TINY_PNG);
        }
    }

    // The duration is read from the banner ffmpeg prints for an input.
    #[test]
    fn parses_ffmpeg_duration() {
        let stderr = "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'video.mp4':\n  Duration: 00:01:23.45, start: 0.000000, bitrate: 123 kb/s";
        assert_eq!(parse_ffmpeg_duration_seconds(stderr), Some(83.45));
    }

    // Each container header maps to its temp-file suffix; unknown data gets
    // the generic one.
    #[test]
    fn detects_video_temp_suffix_from_container_header() {
        let mut mp4 = vec![0; 12];
        mp4[4..8].copy_from_slice(b"ftyp");

        let cases: [(&[u8], &str); 6] = [
            (&mp4, ".mp4"),
            (&[0x1a, 0x45, 0xdf, 0xa3], ".webm"),
            (b"RIFF....AVI ", ".avi"),
            (b"OggS", ".ogv"),
            (&[0x00, 0x00, 0x01, 0xba], ".mpg"),
            (b"unknown", ".video"),
        ];
        for (header, suffix) in cases {
            assert_eq!(video_temp_suffix(header), suffix);
        }
    }

    // Frames follow each other directly; the second header carries a comment.
    #[test]
    fn parses_concatenated_ppm_stream() {
        let stream = b"P6\n2 1\n255\n\x01\x02\x03\x04\x05\x06P6\n# comment\n1 2\n255\n\x07\x08\x09\x0a\x0b\x0c";

        let frames = parse_ppm_stream(stream)
            .unwrap_or_else(|err| panic!("parse ppm stream failed: {err}"));
        assert_eq!(frames.len(), 2);

        let first = &frames[0];
        assert_eq!(first.width(), 2);
        assert_eq!(first.height(), 1);

        let second = &frames[1];
        assert_eq!(second.width(), 1);
        assert_eq!(second.height(), 2);
    }

    // A 2x1 frame needs six pixel bytes; only two are supplied.
    #[test]
    fn rejects_truncated_ppm_stream() {
        let truncated = b"P6\n2 1\n255\n\x01\x02";
        assert!(parse_ppm_stream(truncated).is_err());
    }

    // Only binary `P6` with a max value of 255 is accepted.
    #[test]
    fn rejects_invalid_ppm_header() {
        let wrong_magic = b"P3\n1 1\n255\n\x01\x02\x03";
        let wrong_max_value = b"P6\n1 1\n65535\n\x01\x02\x03";
        assert!(parse_ppm_stream(wrong_magic).is_err());
        assert!(parse_ppm_stream(wrong_max_value).is_err());
    }

    #[test]
    fn rejects_zero_dimension_ppm_stream() {
        let zero_width = b"P6\n0 1\n255\n";
        let zero_height = b"P6\n1 0\n255\n";
        assert!(parse_ppm_stream(zero_width).is_err());
        assert!(parse_ppm_stream(zero_height).is_err());
    }

    #[test]
    fn rejects_overflowing_ppm_frame_size() {
        let huge = b"P6\n4294967295 4294967295\n255\n";
        assert!(parse_ppm_stream(huge).is_err());
    }

    // A one-frame clip still has to produce `min_frames` indices.
    #[cfg(feature = "opencv-video")]
    #[test]
    fn opencv_sampling_preserves_min_frames_for_short_clips() {
        let cfg = super::VideoFetchConfig {
            min_frames: 4,
            max_frames: 8,
            sample_fps: 2.0,
        };
        let indices = super::opencv_frame_indices(1, 30.0, cfg);
        assert_eq!(indices, vec![0, 0, 0, 0]);
    }
}