1use std::{
2 collections::HashSet,
3 io::Write,
4 path::PathBuf,
5 process::{Output, Stdio},
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
11use bytes::Bytes;
12#[cfg(feature = "opencv-video")]
13use opencv::{core::Mat, imgproc, prelude::*, videoio};
14use reqwest::Client;
15use tokio::{fs, process::Command, task, time};
16use tracing::info;
17use url::Url;
18
19const DEFAULT_VIDEO_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
20const DEFAULT_VIDEO_MAX_DECODED_BYTES: usize = 1024 * 1024 * 1024;
21
22use super::{
23 error::MediaConnectorError,
24 types::{
25 DecodedRgbFrame, DecodedRgbVideo, ImageDetail, ImageFrame, ImageSource, VideoClip,
26 VideoSource,
27 },
28};
29
/// Policy for a [`MediaConnector`]: which remote hosts and local files may be fetched, and how
/// long a single HTTP fetch may take.
#[derive(Clone, Debug)]
pub struct MediaConnectorConfig {
    /// Hostnames that URL sources may point at, compared exactly after ASCII-lowercasing.
    /// `None` disables the domain check entirely.
    pub allowed_domains: Option<Vec<String>>,
    /// Directory under which local file sources must resolve. `None` rejects every local
    /// file source.
    pub allowed_local_media_path: Option<PathBuf>,
    /// Timeout applied to each HTTP fetch; `Duration::ZERO` disables it.
    pub fetch_timeout: Duration,
}

impl Default for MediaConnectorConfig {
    /// No domain restriction, no local file access, and a 10 second fetch timeout.
    fn default() -> Self {
        Self {
            allowed_domains: None,
            allowed_local_media_path: None,
            fetch_timeout: Duration::from_secs(10),
        }
    }
}
46
47#[derive(Clone, Copy, Debug)]
48pub struct ImageFetchConfig {
49 pub detail: ImageDetail,
50}
51
52impl Default for ImageFetchConfig {
53 fn default() -> Self {
54 Self {
55 detail: ImageDetail::Auto,
56 }
57 }
58}
59
/// Frame-sampling options used when fetching a video.
#[derive(Clone, Copy, Debug)]
pub struct VideoFetchConfig {
    /// Lower bound on the number of sampled frames; must be at least 1.
    pub min_frames: usize,
    /// Upper bound on the number of sampled frames; must be at least `min_frames`.
    pub max_frames: usize,
    /// Target sampling rate, in frames per second of source video; must be positive.
    pub sample_fps: f32,
}

impl Default for VideoFetchConfig {
    /// Sample at 2 fps, keeping between 4 and 768 frames.
    fn default() -> Self {
        const MIN_FRAMES: usize = 4;
        const MAX_FRAMES: usize = 768;
        const SAMPLE_FPS: f32 = 2.0;
        Self {
            sample_fps: SAMPLE_FPS,
            min_frames: MIN_FRAMES,
            max_frames: MAX_FRAMES,
        }
    }
}
76
/// Where the encoded bytes of an image or video come from.
#[derive(Debug, Clone)]
pub enum MediaSource {
    /// A URL fetched with the connector's HTTP client, subject to the domain allow-list.
    Url(String),
    /// A `data:` URL; only the base64-encoded form is accepted.
    DataUrl(String),
    /// Encoded media bytes supplied directly by the caller.
    InlineBytes(Vec<u8>),
    /// A local file path; it must resolve inside the configured allowed media root.
    File(PathBuf),
}
84
/// Fetches and decodes images and videos from URLs, data URLs, inline bytes, or local files,
/// enforcing the policy captured from a `MediaConnectorConfig`.
#[derive(Clone)]
pub struct MediaConnector {
    /// HTTP client used for URL sources.
    client: Client,
    /// ASCII-lowercased allowed hostnames; `None` means every host is allowed.
    allowed_domains: Option<HashSet<String>>,
    /// Canonicalized root that local file sources must resolve under; `None` disables local
    /// file access.
    allowed_local_media_path: Option<PathBuf>,
    /// Timeout applied to each HTTP fetch; zero disables it.
    fetch_timeout: Duration,
}
92
93impl MediaConnector {
94 pub fn new(client: Client, config: MediaConnectorConfig) -> Result<Self, MediaConnectorError> {
95 let allowed_domains = config.allowed_domains.map(|domains| {
96 domains
97 .into_iter()
98 .map(|d| d.to_ascii_lowercase())
99 .collect::<HashSet<_>>()
100 });
101
102 let allowed_local_media_path = if let Some(path) = config.allowed_local_media_path {
103 Some(std::fs::canonicalize(path)?)
104 } else {
105 None
106 };
107
108 Ok(Self {
109 client,
110 allowed_domains,
111 allowed_local_media_path,
112 fetch_timeout: config.fetch_timeout,
113 })
114 }
115
116 pub async fn fetch_image(
117 &self,
118 source: MediaSource,
119 cfg: ImageFetchConfig,
120 ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
121 match source {
122 MediaSource::Url(url) => self.fetch_http_image(url, cfg).await,
123 MediaSource::DataUrl(data_url) => self.fetch_data_url(data_url, cfg).await,
124 MediaSource::InlineBytes(bytes) => {
125 self.decode_image(bytes.into(), cfg.detail, ImageSource::InlineBytes)
126 .await
127 }
128 MediaSource::File(path) => self.fetch_file(path, cfg).await,
129 }
130 }
131
132 pub async fn fetch_video(
133 &self,
134 source: MediaSource,
135 cfg: VideoFetchConfig,
136 ) -> Result<Arc<VideoClip>, MediaConnectorError> {
137 match source {
141 MediaSource::Url(url) => self.fetch_http_video(url, cfg).await,
142 MediaSource::DataUrl(data_url) => self.fetch_video_data_url(data_url, cfg).await,
143 MediaSource::InlineBytes(bytes) => {
144 self.decode_video(bytes.into(), cfg, VideoSource::InlineBytes)
145 .await
146 }
147 MediaSource::File(path) => self.fetch_video_file(path, cfg).await,
148 }
149 }
150
151 async fn fetch_http_image(
152 &self,
153 url: String,
154 cfg: ImageFetchConfig,
155 ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
156 let parsed = Url::parse(&url).map_err(|_| MediaConnectorError::InvalidUrl(url.clone()))?;
157 self.ensure_domain_allowed(&parsed)?;
158
159 let mut req = self.client.get(parsed.as_str());
160 if self.fetch_timeout > Duration::ZERO {
161 req = req.timeout(self.fetch_timeout);
162 }
163
164 let resp = req.send().await.map_err(|err| {
165 if err.is_timeout() {
166 MediaConnectorError::Timeout(self.fetch_timeout)
167 } else {
168 MediaConnectorError::Http(err)
169 }
170 })?;
171
172 let resp = resp.error_for_status()?;
173 let bytes = resp.bytes().await?;
174 self.decode_image(
175 bytes,
176 cfg.detail,
177 ImageSource::Url {
178 url: parsed.to_string(),
179 },
180 )
181 .await
182 }
183
184 async fn fetch_data_url(
185 &self,
186 data_url: String,
187 cfg: ImageFetchConfig,
188 ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
189 let (metadata, data) = data_url
190 .split_once(',')
191 .ok_or_else(|| MediaConnectorError::DataUrl("missing comma in data url".into()))?;
192
193 if !metadata.ends_with(";base64") {
194 return Err(MediaConnectorError::DataUrl(
195 "only base64 encoded data URLs are supported".into(),
196 ));
197 }
198
199 let data = data.trim();
200 let decoded = BASE64_STANDARD.decode(data)?;
201 self.decode_image(decoded.into(), cfg.detail, ImageSource::DataUrl)
202 .await
203 }
204
205 async fn fetch_video_data_url(
206 &self,
207 data_url: String,
208 cfg: VideoFetchConfig,
209 ) -> Result<Arc<VideoClip>, MediaConnectorError> {
210 let (metadata, data) = data_url
211 .split_once(',')
212 .ok_or_else(|| MediaConnectorError::DataUrl("missing comma in data url".into()))?;
213
214 if !metadata.ends_with(";base64") {
215 return Err(MediaConnectorError::DataUrl(
216 "only base64 encoded data URLs are supported".into(),
217 ));
218 }
219
220 let data = data.trim();
221 let decoded = BASE64_STANDARD.decode(data)?;
222 self.decode_video(decoded.into(), cfg, VideoSource::DataUrl)
223 .await
224 }
225
226 async fn fetch_file(
227 &self,
228 path: PathBuf,
229 cfg: ImageFetchConfig,
230 ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
231 let allowed_root = self
232 .allowed_local_media_path
233 .as_ref()
234 .ok_or_else(|| MediaConnectorError::DisallowedLocalPath(path.display().to_string()))?;
235
236 let canonical = fs::canonicalize(&path).await?;
237 if !canonical.starts_with(allowed_root) {
238 return Err(MediaConnectorError::DisallowedLocalPath(
239 path.display().to_string(),
240 ));
241 }
242
243 let bytes = fs::read(&canonical).await?;
244 self.decode_image(
245 bytes.into(),
246 cfg.detail,
247 ImageSource::File { path: canonical },
248 )
249 .await
250 }
251
252 async fn fetch_http_video(
253 &self,
254 url: String,
255 cfg: VideoFetchConfig,
256 ) -> Result<Arc<VideoClip>, MediaConnectorError> {
257 let parsed = Url::parse(&url).map_err(|_| MediaConnectorError::InvalidUrl(url.clone()))?;
258 self.ensure_domain_allowed(&parsed)?;
259
260 let mut req = self.client.get(parsed.as_str());
261 if self.fetch_timeout > Duration::ZERO {
262 req = req.timeout(self.fetch_timeout);
263 }
264
265 let resp = req.send().await.map_err(|err| {
266 if err.is_timeout() {
267 MediaConnectorError::Timeout(self.fetch_timeout)
268 } else {
269 MediaConnectorError::Http(err)
270 }
271 })?;
272
273 let resp = resp.error_for_status()?;
274 let bytes = resp.bytes().await?;
275 self.decode_video(
276 bytes,
277 cfg,
278 VideoSource::Url {
279 url: parsed.to_string(),
280 },
281 )
282 .await
283 }
284
285 async fn fetch_video_file(
286 &self,
287 path: PathBuf,
288 cfg: VideoFetchConfig,
289 ) -> Result<Arc<VideoClip>, MediaConnectorError> {
290 let allowed_root = self
291 .allowed_local_media_path
292 .as_ref()
293 .ok_or_else(|| MediaConnectorError::DisallowedLocalPath(path.display().to_string()))?;
294
295 let canonical = fs::canonicalize(&path).await?;
296 if !canonical.starts_with(allowed_root) {
297 return Err(MediaConnectorError::DisallowedLocalPath(
298 path.display().to_string(),
299 ));
300 }
301
302 let bytes = fs::read(&canonical).await?;
303 self.decode_video(bytes.into(), cfg, VideoSource::File { path: canonical })
304 .await
305 }
306
307 fn ensure_domain_allowed(&self, url: &Url) -> Result<(), MediaConnectorError> {
308 if let Some(allowed) = &self.allowed_domains {
309 let host = url
310 .host_str()
311 .map(|h| h.to_ascii_lowercase())
312 .ok_or_else(|| MediaConnectorError::InvalidUrl(url.to_string()))?;
313 if !allowed.contains(&host) {
314 return Err(MediaConnectorError::DisallowedDomain(host));
315 }
316 }
317 Ok(())
318 }
319
320 async fn decode_image(
321 &self,
322 bytes: Bytes,
323 detail: ImageDetail,
324 source: ImageSource,
325 ) -> Result<Arc<ImageFrame>, MediaConnectorError> {
326 let hash = crate::hasher::hash_image(&bytes);
327
328 let cursor = std::io::Cursor::new(bytes.clone());
329 let reader = image::ImageReader::new(cursor).with_guessed_format()?;
330
331 let image = task::spawn_blocking(move || reader.decode())
332 .await
333 .map_err(MediaConnectorError::Blocking)??;
334
335 Ok(Arc::new(ImageFrame::new(
336 image, bytes, detail, source, hash,
337 )))
338 }
339
340 async fn decode_video(
341 &self,
342 bytes: Bytes,
343 cfg: VideoFetchConfig,
344 source: VideoSource,
345 ) -> Result<Arc<VideoClip>, MediaConnectorError> {
346 if cfg.max_frames == 0 {
347 return Err(MediaConnectorError::VideoDecode(
348 "max_frames must be greater than 0".to_string(),
349 ));
350 }
351 if cfg.min_frames == 0 {
352 return Err(MediaConnectorError::VideoDecode(
353 "min_frames must be greater than 0".to_string(),
354 ));
355 }
356 if cfg.min_frames > cfg.max_frames {
357 return Err(MediaConnectorError::VideoDecode(
358 "min_frames must be less than or equal to max_frames".to_string(),
359 ));
360 }
361 if cfg.sample_fps <= 0.0 {
362 return Err(MediaConnectorError::VideoDecode(
363 "sample_fps must be greater than 0".to_string(),
364 ));
365 }
366
367 let hash = crate::hasher::hash_video(&bytes);
368 let decoded = decode_video_frames(bytes.clone(), cfg).await?;
369
370 let clip = match decoded {
371 DecodedVideoFrames::Images(frames) => VideoClip::new(frames, bytes, source, hash),
372 DecodedVideoFrames::Rgb(rgb_video) => {
373 VideoClip::new_rgb(rgb_video, bytes, source, hash)
374 }
375 };
376 Ok(Arc::new(clip))
377 }
378}
379
/// Frames produced by a video decode backend, in one of two representations.
enum DecodedVideoFrames {
    /// One decoded image per sampled frame (produced by the ffmpeg PNG path).
    Images(Vec<image::DynamicImage>),
    /// RGB pixel data for all sampled frames in one buffer (produced by OpenCV and the ffmpeg
    /// PPM/raw paths).
    Rgb(DecodedRgbVideo),
}
384
385async fn decode_video_frames(
386 bytes: Bytes,
387 cfg: VideoFetchConfig,
388) -> Result<DecodedVideoFrames, MediaConnectorError> {
389 let input_bytes = bytes.len();
390 let input_file = {
391 let bytes = bytes.clone();
392 task::spawn_blocking(move || write_temp_video_file(&bytes))
393 .await
394 .map_err(MediaConnectorError::Blocking)??
395 };
396 let input_path = input_file.path().to_path_buf();
397 match video_decode_backend_override().as_deref() {
398 Some("ffmpeg") => decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await,
399 Some("opencv") => {
400 #[cfg(feature = "opencv-video")]
401 {
402 let input_path = input_path.clone();
403 task::spawn_blocking(move || {
404 decode_video_with_opencv_logged(&input_path, input_bytes, cfg)
405 })
406 .await
407 .map_err(MediaConnectorError::Blocking)?
408 }
409 #[cfg(not(feature = "opencv-video"))]
410 {
411 Err(MediaConnectorError::VideoDecode(
412 "SMG_VIDEO_DECODE_BACKEND=opencv requires the opencv-video feature".to_string(),
413 ))
414 }
415 }
416 Some(backend) => Err(MediaConnectorError::VideoDecode(format!(
417 "unsupported SMG_VIDEO_DECODE_BACKEND={backend}; expected auto, opencv, or ffmpeg"
418 ))),
419 None => {
420 #[cfg(feature = "opencv-video")]
421 {
422 let opencv_input_path = input_path.clone();
425 let opencv_result = task::spawn_blocking(move || {
426 decode_video_with_opencv_logged(&opencv_input_path, input_bytes, cfg)
427 })
428 .await
429 .map_err(MediaConnectorError::Blocking)?;
430
431 match opencv_result {
432 Ok(frames) => Ok(frames),
433 Err(opencv_error) => {
434 if log_video_decode_timing_enabled() {
435 info!(
436 error = %opencv_error,
437 "smg_mm_timing video_decode_auto_opencv_fallback"
438 );
439 }
440
441 match decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await {
442 Ok(frames) => Ok(frames),
443 Err(ffmpeg_error) => Err(MediaConnectorError::VideoDecode(format!(
444 "OpenCV decode failed: {opencv_error}; ffmpeg fallback failed: {ffmpeg_error}"
445 ))),
446 }
447 }
448 }
449 }
450
451 #[cfg(not(feature = "opencv-video"))]
452 {
453 decode_video_with_ffmpeg(&input_path, input_bytes, cfg).await
454 }
455 }
456 }
457}
458
/// Runs `decode_video_with_opencv_file` and records its timing under the `opencv` label.
#[cfg(feature = "opencv-video")]
fn decode_video_with_opencv_logged(
    input_path: &std::path::Path,
    input_bytes: usize,
    cfg: VideoFetchConfig,
) -> Result<DecodedVideoFrames, MediaConnectorError> {
    let begin = Instant::now();
    let outcome = decode_video_with_opencv_file(input_path, cfg);
    log_video_decode_backend_timing("opencv", begin, input_bytes, cfg, outcome.as_ref().err());
    outcome
}
475
/// Reads the decode backend override from `SMG_VIDEO_DECODE_BACKEND`.
///
/// The value is trimmed and ASCII-lowercased. Returns `None` (automatic selection) when the
/// variable is unset, not valid Unicode, empty, or `auto`. Otherwise returns the normalized
/// name, which the caller validates.
fn video_decode_backend_override() -> Option<String> {
    let raw = match std::env::var("SMG_VIDEO_DECODE_BACKEND") {
        Ok(value) => value,
        Err(_) => return None,
    };
    let normalized = raw.trim().to_ascii_lowercase();
    if normalized.is_empty() || normalized == "auto" {
        None
    } else {
        Some(normalized)
    }
}
486
/// Returns whether `SMG_LOG_MM_TIMING` holds a truthy flag: `1`, `true`, `yes`, or `on`,
/// compared case-insensitively after trimming surrounding whitespace.
fn log_video_decode_timing_enabled() -> bool {
    let Ok(raw) = std::env::var("SMG_LOG_MM_TIMING") else {
        return false;
    };
    let flag = raw.trim().to_ascii_lowercase();
    ["1", "true", "yes", "on"].contains(&flag.as_str())
}
497
498fn log_video_decode_backend_timing(
499 backend: &str,
500 started: Instant,
501 input_bytes: usize,
502 cfg: VideoFetchConfig,
503 error: Option<&MediaConnectorError>,
504) {
505 if !log_video_decode_timing_enabled() {
506 return;
507 }
508 let elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
509 match error {
510 Some(error) => info!(
511 backend,
512 ok = false,
513 input_bytes,
514 min_frames = cfg.min_frames,
515 max_frames = cfg.max_frames,
516 sample_fps = cfg.sample_fps,
517 elapsed_ms,
518 error = %error,
519 "smg_mm_timing video_decode_backend"
520 ),
521 None => info!(
522 backend,
523 ok = true,
524 input_bytes,
525 min_frames = cfg.min_frames,
526 max_frames = cfg.max_frames,
527 sample_fps = cfg.sample_fps,
528 elapsed_ms,
529 "smg_mm_timing video_decode_backend"
530 ),
531 }
532}
533
/// Decodes sampled frames from `input_path` with OpenCV into a single RGB byte buffer.
///
/// The frame count and FPS come from `CAP_PROP_FRAME_COUNT` / `CAP_PROP_FPS`, and
/// `opencv_frame_indices` chooses which frames to sample. Each distinct index is seeked to and
/// decoded once. Its RGB bytes are then appended once per time the index was sampled, so the
/// result holds one frame record per sampled index.
///
/// # Errors
///
/// Fails if the path is not UTF-8, the file cannot be opened, OpenCV reports zero frames, a
/// seek fails, the elapsed time exceeds `video_process_timeout()`, the decoded payload would
/// exceed the decoded-byte limit, or fewer sampled frames could be read than were requested.
#[cfg(feature = "opencv-video")]
fn decode_video_with_opencv_file(
    input_path: &std::path::Path,
    cfg: VideoFetchConfig,
) -> Result<DecodedVideoFrames, MediaConnectorError> {
    // `open_opencv_video_capture` takes `&str`, so non-UTF-8 paths are rejected here.
    let input = input_path.to_str().ok_or_else(|| {
        MediaConnectorError::VideoDecode(format!(
            "OpenCV video path is not valid UTF-8: {}",
            input_path.display()
        ))
    })?;

    let mut capture = open_opencv_video_capture(input)?;

    // Negative or NaN counts collapse to 0 through `max(0.0)` and are rejected just below.
    let total_frames = capture
        .get(videoio::CAP_PROP_FRAME_COUNT)
        .map_err(opencv_decode_error)?
        .round()
        .max(0.0) as usize;
    if total_frames == 0 {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV reported zero video frames".to_string(),
        ));
    }

    let fps = capture
        .get(videoio::CAP_PROP_FPS)
        .map_err(opencv_decode_error)?;
    let frame_indices = opencv_frame_indices(total_frames, fps, cfg);
    if frame_indices.is_empty() {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV video sampling produced no frame indices".to_string(),
        ));
    }

    // Collapse runs of equal indices so each source frame is seeked and decoded only once.
    let sampled_frame_counts = counted_frame_indices(&frame_indices);
    let mut data = Vec::new();
    let mut frames = Vec::new();
    frames.try_reserve(frame_indices.len()).map_err(|e| {
        MediaConnectorError::VideoDecode(format!(
            "failed to reserve {} decoded video frame records: {e}",
            frame_indices.len()
        ))
    })?;
    // Reused across iterations instead of being recreated per frame.
    let mut bgr_frame = Mat::default();
    let mut rgb_frame = Mat::default();

    // The time budget is checked before each sampled frame, not while a frame is decoding.
    let timeout = video_process_timeout();
    let started = Instant::now();
    for (idx, repeat_count) in sampled_frame_counts {
        if started.elapsed() >= timeout {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV timed out after {:.3} seconds",
                timeout.as_secs_f64()
            )));
        }

        if !capture
            .set(videoio::CAP_PROP_POS_FRAMES, idx as f64)
            .map_err(opencv_decode_error)?
        {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV could not seek to sampled frame {idx}"
            )));
        }

        // An unreadable frame is skipped here; the count check after the loop then turns
        // any skipped frame into an error.
        if !capture.read(&mut bgr_frame).map_err(opencv_decode_error)? || bgr_frame.empty() {
            continue;
        }

        // Convert the captured BGR frame to RGB.
        imgproc::cvt_color_def(&bgr_frame, &mut rgb_frame, imgproc::COLOR_BGR2RGB)
            .map_err(opencv_decode_error)?;

        let decoded_width = u32::try_from(rgb_frame.cols()).map_err(|_| {
            MediaConnectorError::VideoDecode(format!(
                "OpenCV produced invalid RGB frame width: {}",
                rgb_frame.cols()
            ))
        })?;
        let decoded_height = u32::try_from(rgb_frame.rows()).map_err(|_| {
            MediaConnectorError::VideoDecode(format!(
                "OpenCV produced invalid RGB frame height: {}",
                rgb_frame.rows()
            ))
        })?;
        let frame_size = rawvideo_frame_size(decoded_width, decoded_height)?;
        let rgb_bytes = rgb_frame.data_bytes().map_err(opencv_decode_error)?;
        if rgb_bytes.len() < frame_size {
            return Err(MediaConnectorError::VideoDecode(format!(
                "OpenCV produced {} RGB bytes for {decoded_width}x{decoded_height} frame, expected {frame_size}",
                rgb_bytes.len()
            )));
        }
        // Append this frame once per time its index was sampled; only the first
        // `frame_size` bytes of the Mat buffer are copied.
        for _ in 0..repeat_count {
            let new_len = data.len().checked_add(frame_size).ok_or_else(|| {
                MediaConnectorError::VideoDecode(format!(
                    "decoded video byte size overflow while appending {frame_size} bytes"
                ))
            })?;
            ensure_decoded_byte_limit(new_len)?;
            data.try_reserve(frame_size).map_err(|e| {
                MediaConnectorError::VideoDecode(format!(
                    "failed to reserve {frame_size} decoded video bytes: {e}"
                ))
            })?;
            let offset = data.len();
            data.extend_from_slice(&rgb_bytes[..frame_size]);
            frames.push(DecodedRgbFrame {
                width: decoded_width,
                height: decoded_height,
                offset,
                len: frame_size,
            });
        }
    }

    if frames.is_empty() {
        return Err(MediaConnectorError::VideoDecode(
            "OpenCV produced no readable sampled frames".to_string(),
        ));
    }
    if frames.len() != frame_indices.len() {
        return Err(MediaConnectorError::VideoDecode(format!(
            "OpenCV produced {} sampled frames, expected {}",
            frames.len(),
            frame_indices.len()
        )));
    }

    Ok(DecodedVideoFrames::Rgb(DecodedRgbVideo::new(
        Bytes::from(data),
        frames,
    )))
}
670
/// Opens `input` with OpenCV, preferring the FFmpeg backend and then letting OpenCV pick any
/// available backend.
///
/// Returns the first capture that reports itself opened, or an error when neither does.
#[cfg(feature = "opencv-video")]
fn open_opencv_video_capture(input: &str) -> Result<videoio::VideoCapture, MediaConnectorError> {
    for api_preference in [videoio::CAP_FFMPEG, videoio::CAP_ANY] {
        let capture = videoio::VideoCapture::from_file(input, api_preference)
            .map_err(opencv_decode_error)?;
        if capture.is_opened().map_err(opencv_decode_error)? {
            return Ok(capture);
        }
    }

    Err(MediaConnectorError::VideoDecode(format!(
        "OpenCV could not open video: {input}"
    )))
}
689
/// Chooses which source frame indices OpenCV should sample.
///
/// The target count is `duration * sample_fps`, where duration is `total_frames / fps`. When
/// `fps` is non-finite or non-positive the count falls back to `max_frames`. The count is then
/// clamped to `[min_frames, max_frames]` and forced to at least 1. Indices are spread evenly
/// from the first to the last frame (rounded down). Clips shorter than the target therefore
/// yield repeated, consecutive indices.
#[cfg(feature = "opencv-video")]
fn opencv_frame_indices(total_frames: usize, fps: f64, cfg: VideoFetchConfig) -> Vec<usize> {
    let mut target_frames = if fps.is_finite() && fps > 0.0 {
        let duration = total_frames as f64 / fps;
        (duration * cfg.sample_fps as f64).round() as usize
    } else {
        cfg.max_frames
    };
    target_frames = target_frames.clamp(cfg.min_frames, cfg.max_frames);
    target_frames = target_frames.max(1);
    if target_frames == 1 {
        return vec![0];
    }

    // `saturating_sub` keeps a zero-frame input from underflowing (a debug panic, or a wrap to
    // `usize::MAX` in release); every sample then maps to index 0.
    let last = total_frames.saturating_sub(1) as f64;
    let denom = (target_frames - 1) as f64;
    (0..target_frames)
        .map(|idx| ((idx as f64 * last) / denom).floor() as usize)
        .collect()
}
710
/// Run-length encodes `frame_indices`: each maximal run of equal consecutive indices becomes
/// `(index, run_length)`, in input order.
#[cfg(feature = "opencv-video")]
fn counted_frame_indices(frame_indices: &[usize]) -> Vec<(usize, usize)> {
    let mut runs: Vec<(usize, usize)> = Vec::new();
    for &frame in frame_indices {
        match runs.last_mut() {
            Some((prev, run_len)) if *prev == frame => *run_len += 1,
            _ => runs.push((frame, 1)),
        }
    }
    runs
}
725
/// Wraps an OpenCV error as a `VideoDecode` error with an "OpenCV video decode failed" prefix.
#[cfg(feature = "opencv-video")]
fn opencv_decode_error(err: opencv::Error) -> MediaConnectorError {
    let message = format!("OpenCV video decode failed: {err}");
    MediaConnectorError::VideoDecode(message)
}
730
731async fn decode_video_with_ffmpeg(
732 input_path: &std::path::Path,
733 input_bytes: usize,
734 cfg: VideoFetchConfig,
735) -> Result<DecodedVideoFrames, MediaConnectorError> {
736 if let Ok(metadata) = probe_video_metadata(input_path).await {
737 let started = Instant::now();
738 match decode_video_with_ffmpeg_ppm(input_path, cfg, metadata).await {
739 Ok(rgb_video) => {
740 log_video_decode_backend_timing("ffmpeg_ppm_file", started, input_bytes, cfg, None);
741 return Ok(DecodedVideoFrames::Rgb(rgb_video));
742 }
743 Err(error) => {
744 log_video_decode_backend_timing(
745 "ffmpeg_ppm_file",
746 started,
747 input_bytes,
748 cfg,
749 Some(&error),
750 );
751 }
752 }
753
754 let started = Instant::now();
755 match decode_video_with_ffmpeg_raw(input_path, cfg, metadata).await {
756 Ok(rgb_video) => {
757 log_video_decode_backend_timing("ffmpeg_raw_file", started, input_bytes, cfg, None);
758 return Ok(DecodedVideoFrames::Rgb(rgb_video));
759 }
760 Err(error) => {
761 log_video_decode_backend_timing(
762 "ffmpeg_raw_file",
763 started,
764 input_bytes,
765 cfg,
766 Some(&error),
767 );
768 }
769 }
770 }
771
772 let started = Instant::now();
773 match decode_video_with_ffmpeg_png(input_path, cfg).await {
774 Ok(frames) => {
775 log_video_decode_backend_timing("ffmpeg_png_file", started, input_bytes, cfg, None);
776 Ok(DecodedVideoFrames::Images(frames))
777 }
778 Err(error) => {
779 log_video_decode_backend_timing(
780 "ffmpeg_png_file",
781 started,
782 input_bytes,
783 cfg,
784 Some(&error),
785 );
786 Err(error)
787 }
788 }
789}
790
791fn write_temp_video_file(bytes: &[u8]) -> Result<tempfile::NamedTempFile, MediaConnectorError> {
792 let started = Instant::now();
793 let mut input_file = tempfile::Builder::new()
794 .prefix("smg-video-")
795 .suffix(video_temp_suffix(bytes))
796 .tempfile()?;
797 input_file.write_all(bytes)?;
798 input_file.flush()?;
799 if log_video_decode_timing_enabled() {
800 info!(
801 nbytes = bytes.len(),
802 elapsed_ms = started.elapsed().as_secs_f64() * 1000.0,
803 suffix = video_temp_suffix(bytes),
804 "smg_mm_timing video_tempfile_write"
805 );
806 }
807 Ok(input_file)
808}
809
/// Guesses a file extension for encoded video from its leading magic bytes.
///
/// Recognizes ISO-BMFF (`ftyp` box, reported as `.mp4`), EBML (`.webm`), RIFF/AVI (`.avi`),
/// Ogg (`.ogv`), and MPEG program streams (`.mpg`). Anything else gets `.video`.
fn video_temp_suffix(bytes: &[u8]) -> &'static str {
    match bytes {
        [_, _, _, _, b'f', b't', b'y', b'p', _, _, _, _, ..] => ".mp4",
        [0x1a, 0x45, 0xdf, 0xa3, ..] => ".webm",
        [b'R', b'I', b'F', b'F', _, _, _, _, b'A', b'V', b'I', b' ', ..] => ".avi",
        [b'O', b'g', b'g', b'S', ..] => ".ogv",
        [0x00, 0x00, 0x01, 0xba, ..] => ".mpg",
        _ => ".video",
    }
}
828
829fn video_process_timeout() -> Duration {
830 std::env::var("SMG_VIDEO_PROCESS_TIMEOUT_SECS")
831 .ok()
832 .and_then(|value| value.parse::<f64>().ok())
833 .filter(|seconds| seconds.is_finite() && *seconds > 0.0)
834 .map(Duration::from_secs_f64)
835 .unwrap_or(DEFAULT_VIDEO_PROCESS_TIMEOUT)
836}
837
838fn video_max_decoded_bytes() -> usize {
839 std::env::var("SMG_VIDEO_MAX_DECODED_BYTES")
840 .ok()
841 .and_then(|value| value.parse::<usize>().ok())
842 .filter(|bytes| *bytes > 0)
843 .unwrap_or(DEFAULT_VIDEO_MAX_DECODED_BYTES)
844}
845
846fn ensure_decoded_byte_limit(bytes: usize) -> Result<(), MediaConnectorError> {
847 let limit = video_max_decoded_bytes();
848 if bytes > limit {
849 return Err(MediaConnectorError::VideoDecode(format!(
850 "decoded video RGB payload would be {bytes} bytes, exceeding SMG_VIDEO_MAX_DECODED_BYTES={limit}"
851 )));
852 }
853 Ok(())
854}
855
856fn checked_decoded_rgb_bytes(
857 frame_count: usize,
858 frame_size: usize,
859) -> Result<usize, MediaConnectorError> {
860 let bytes = frame_count.checked_mul(frame_size).ok_or_else(|| {
861 MediaConnectorError::VideoDecode(format!(
862 "decoded video byte size overflow for {frame_count} frames of {frame_size} bytes"
863 ))
864 })?;
865 ensure_decoded_byte_limit(bytes)?;
866 Ok(bytes)
867}
868
869async fn run_video_command_output(
870 mut command: Command,
871 program: &'static str,
872) -> Result<Output, MediaConnectorError> {
873 command
874 .stdout(Stdio::piped())
875 .stderr(Stdio::piped())
876 .kill_on_drop(true);
877 let child = command.spawn().map_err(|e| {
878 if e.kind() == std::io::ErrorKind::NotFound {
879 MediaConnectorError::VideoDecode(format!(
880 "{program} executable not found; install {program} to decode video_url inputs"
881 ))
882 } else {
883 MediaConnectorError::Io(e)
884 }
885 })?;
886
887 let timeout = video_process_timeout();
888 match time::timeout(timeout, child.wait_with_output()).await {
889 Ok(Ok(output)) => Ok(output),
890 Ok(Err(error)) => Err(MediaConnectorError::Io(error)),
891 Err(_) => Err(MediaConnectorError::VideoDecode(format!(
892 "{program} timed out after {:.3} seconds",
893 timeout.as_secs_f64()
894 ))),
895 }
896}
897
898async fn decode_video_with_ffmpeg_ppm(
899 input_path: &std::path::Path,
900 cfg: VideoFetchConfig,
901 metadata: VideoMetadata,
902) -> Result<DecodedRgbVideo, MediaConnectorError> {
903 let fps_filter = fps_filter_for_metadata(metadata, cfg);
904 let max_frames = cfg.max_frames.to_string();
905 let frame_size = rawvideo_frame_size(metadata.width, metadata.height)?;
906 let target_frames = expected_sampled_frame_count(metadata, cfg);
907 let decoded_bytes = checked_decoded_rgb_bytes(target_frames, frame_size)?;
908 let output_limit = decoded_bytes
909 .checked_add(target_frames.saturating_mul(64))
910 .unwrap_or_else(video_max_decoded_bytes)
911 .min(video_max_decoded_bytes())
912 .to_string();
913 let mut command = Command::new("ffmpeg");
914 command
915 .args(["-hide_banner", "-loglevel", "error", "-nostdin", "-i"])
916 .arg(input_path)
917 .args([
918 "-vf",
919 &fps_filter,
920 "-frames:v",
921 &max_frames,
922 "-fs",
923 &output_limit,
924 "-f",
925 "image2pipe",
926 "-vcodec",
927 "ppm",
928 "-pix_fmt",
929 "rgb24",
930 "pipe:1",
931 ]);
932 let output = run_video_command_output(command, "ffmpeg").await?;
933
934 if !output.status.success() {
935 let stderr = String::from_utf8_lossy(&output.stderr);
936 return Err(MediaConnectorError::VideoDecode(format!(
937 "ffmpeg failed: {stderr}"
938 )));
939 }
940
941 parse_ppm_rgb_video(Bytes::from(output.stdout))
942}
943
944async fn decode_video_with_ffmpeg_raw(
945 input_path: &std::path::Path,
946 cfg: VideoFetchConfig,
947 metadata: VideoMetadata,
948) -> Result<DecodedRgbVideo, MediaConnectorError> {
949 let fps_filter = fps_filter_for_metadata(metadata, cfg);
950 let max_frames = cfg.max_frames.to_string();
951 let frame_size = rawvideo_frame_size(metadata.width, metadata.height)?;
952 let target_frames = expected_sampled_frame_count(metadata, cfg);
953 let decoded_bytes = checked_decoded_rgb_bytes(target_frames, frame_size)?;
954 let output_limit = decoded_bytes.to_string();
955 let mut command = Command::new("ffmpeg");
956 command
960 .args([
961 "-hide_banner",
962 "-loglevel",
963 "error",
964 "-nostdin",
965 "-noautorotate",
966 "-i",
967 ])
968 .arg(input_path)
969 .args([
970 "-vf",
971 &fps_filter,
972 "-frames:v",
973 &max_frames,
974 "-fs",
975 &output_limit,
976 "-f",
977 "rawvideo",
978 "-pix_fmt",
979 "rgb24",
980 "pipe:1",
981 ]);
982 let output = run_video_command_output(command, "ffmpeg").await?;
983
984 if !output.status.success() {
985 let stderr = String::from_utf8_lossy(&output.stderr);
986 return Err(MediaConnectorError::VideoDecode(format!(
987 "ffmpeg failed: {stderr}"
988 )));
989 }
990
991 let frame_count = output.stdout.len() / frame_size;
992 checked_decoded_rgb_bytes(frame_count, frame_size)?;
993 let mut frames = Vec::new();
994 frames.try_reserve(frame_count).map_err(|e| {
995 MediaConnectorError::VideoDecode(format!(
996 "failed to reserve {frame_count} decoded video frame records: {e}"
997 ))
998 })?;
999 for idx in 0..frame_count {
1000 frames.push(DecodedRgbFrame {
1001 width: metadata.width,
1002 height: metadata.height,
1003 offset: idx * frame_size,
1004 len: frame_size,
1005 });
1006 }
1007 let remainder = output.stdout.len() % frame_size;
1008 if remainder != 0 {
1009 return Err(MediaConnectorError::VideoDecode(format!(
1010 "ffmpeg rawvideo output has trailing partial frame: {remainder} bytes"
1011 )));
1012 }
1013 if frames.is_empty() {
1014 return Err(MediaConnectorError::VideoDecode(
1015 "ffmpeg produced no frames".to_string(),
1016 ));
1017 }
1018 Ok(DecodedRgbVideo::new(Bytes::from(output.stdout), frames))
1019}
1020
1021async fn decode_video_with_ffmpeg_png(
1022 input_path: &std::path::Path,
1023 cfg: VideoFetchConfig,
1024) -> Result<Vec<image::DynamicImage>, MediaConnectorError> {
1025 let fps_filter = fps_filter_for_video(input_path, cfg).await;
1026 let max_frames = cfg.max_frames.to_string();
1027 let output_limit = video_max_decoded_bytes().to_string();
1028 let mut command = Command::new("ffmpeg");
1029 command
1030 .args(["-hide_banner", "-loglevel", "error", "-nostdin", "-i"])
1031 .arg(input_path)
1032 .args([
1033 "-vf",
1034 &fps_filter,
1035 "-frames:v",
1036 &max_frames,
1037 "-fs",
1038 &output_limit,
1039 "-f",
1040 "image2pipe",
1041 "-vcodec",
1042 "png",
1043 "pipe:1",
1044 ]);
1045 let output = run_video_command_output(command, "ffmpeg").await?;
1046
1047 if !output.status.success() {
1048 let stderr = String::from_utf8_lossy(&output.stderr);
1049 return Err(MediaConnectorError::VideoDecode(format!(
1050 "ffmpeg failed: {stderr}"
1051 )));
1052 }
1053
1054 let pngs = split_png_stream(&output.stdout)?;
1055 let mut frames = Vec::with_capacity(pngs.len());
1056 let mut decoded_bytes = 0usize;
1057 for png in pngs {
1058 let image = image::load_from_memory(png)?;
1059 let frame_size = rawvideo_frame_size(image.width(), image.height())?;
1060 decoded_bytes = decoded_bytes.checked_add(frame_size).ok_or_else(|| {
1061 MediaConnectorError::VideoDecode("PNG decoded byte size overflow".to_string())
1062 })?;
1063 ensure_decoded_byte_limit(decoded_bytes)?;
1064 frames.push(image);
1065 }
1066 if frames.is_empty() {
1067 return Err(MediaConnectorError::VideoDecode(
1068 "ffmpeg produced no frames".to_string(),
1069 ));
1070 }
1071 Ok(frames)
1072}
1073
/// Facts reported by ffprobe for the first video stream of a file.
#[derive(Debug, Clone, Copy)]
struct VideoMetadata {
    /// Stream width as reported by ffprobe's `stream=width` entry.
    width: u32,
    /// Stream height as reported by ffprobe's `stream=height` entry.
    height: u32,
    /// Container duration in seconds (`format=duration`); `None` when ffprobe reports `N/A`
    /// or the value does not parse.
    duration_seconds: Option<f64>,
}
1080
1081async fn probe_video_metadata(
1082 input_path: &std::path::Path,
1083) -> Result<VideoMetadata, MediaConnectorError> {
1084 let mut command = Command::new("ffprobe");
1085 command
1086 .args([
1087 "-v",
1088 "error",
1089 "-nostdin",
1090 "-select_streams",
1091 "v:0",
1092 "-show_entries",
1093 "stream=width,height:format=duration",
1094 "-of",
1095 "default=noprint_wrappers=1",
1096 ])
1097 .arg(input_path);
1098 let output = run_video_command_output(command, "ffprobe").await?;
1099
1100 if !output.status.success() {
1101 let stderr = String::from_utf8_lossy(&output.stderr);
1102 return Err(MediaConnectorError::VideoDecode(format!(
1103 "ffprobe failed: {stderr}"
1104 )));
1105 }
1106
1107 let stdout = String::from_utf8_lossy(&output.stdout);
1108 let mut width = None;
1109 let mut height = None;
1110 let mut duration_seconds = None;
1111 for line in stdout.lines() {
1112 let Some((key, value)) = line.split_once('=') else {
1113 continue;
1114 };
1115 match key {
1116 "width" => width = value.parse::<u32>().ok(),
1117 "height" => height = value.parse::<u32>().ok(),
1118 "duration" if value != "N/A" => duration_seconds = value.parse::<f64>().ok(),
1119 _ => {}
1120 }
1121 }
1122
1123 let width = width.ok_or_else(|| {
1124 MediaConnectorError::VideoDecode("ffprobe did not return video width".to_string())
1125 })?;
1126 let height = height.ok_or_else(|| {
1127 MediaConnectorError::VideoDecode("ffprobe did not return video height".to_string())
1128 })?;
1129 Ok(VideoMetadata {
1130 width,
1131 height,
1132 duration_seconds,
1133 })
1134}
1135
1136fn fps_filter_for_metadata(metadata: VideoMetadata, cfg: VideoFetchConfig) -> String {
1137 if let Some(duration) = metadata.duration_seconds {
1138 if let Some(filter) = fps_filter_for_duration(duration, cfg) {
1139 return filter;
1140 }
1141 }
1142
1143 format!("fps={}", cfg.sample_fps)
1144}
1145
1146fn expected_sampled_frame_count(metadata: VideoMetadata, cfg: VideoFetchConfig) -> usize {
1147 if let Some(duration) = metadata.duration_seconds {
1148 if duration.is_finite() && duration > 0.0 {
1149 return (duration * cfg.sample_fps as f64)
1150 .round()
1151 .clamp(cfg.min_frames as f64, cfg.max_frames as f64) as usize;
1152 }
1153 }
1154 cfg.max_frames
1155}
1156
1157fn fps_filter_for_duration(duration: f64, cfg: VideoFetchConfig) -> Option<String> {
1158 if !duration.is_finite() || duration <= 0.0 {
1159 return None;
1160 }
1161 let target_frames = (duration * cfg.sample_fps as f64)
1162 .round()
1163 .clamp(cfg.min_frames as f64, cfg.max_frames as f64);
1164 let fps = (target_frames / duration).max(f64::EPSILON);
1165 Some(format!("fps={fps:.6}"))
1166}
1167
1168async fn fps_filter_for_video(input_path: &std::path::Path, cfg: VideoFetchConfig) -> String {
1169 if let Ok(duration) = probe_video_duration_seconds(input_path).await {
1170 if let Some(filter) = fps_filter_for_duration(duration, cfg) {
1171 return filter;
1172 }
1173 }
1174
1175 format!("fps={}", cfg.sample_fps)
1176}
1177
1178async fn probe_video_duration_seconds(
1179 input_path: &std::path::Path,
1180) -> Result<f64, MediaConnectorError> {
1181 let mut command = Command::new("ffprobe");
1182 command
1183 .args([
1184 "-v",
1185 "error",
1186 "-nostdin",
1187 "-show_entries",
1188 "format=duration",
1189 "-of",
1190 "default=noprint_wrappers=1:nokey=1",
1191 ])
1192 .arg(input_path);
1193 match run_video_command_output(command, "ffprobe").await {
1194 Ok(output) if output.status.success() => {
1195 let stdout = String::from_utf8_lossy(&output.stdout);
1196 stdout.trim().parse::<f64>().map_err(|err| {
1197 MediaConnectorError::VideoDecode(format!("failed to parse ffprobe duration: {err}"))
1198 })
1199 }
1200 Ok(_) | Err(_) => probe_video_duration_seconds_with_ffmpeg(input_path).await,
1201 }
1202}
1203
1204async fn probe_video_duration_seconds_with_ffmpeg(
1205 input_path: &std::path::Path,
1206) -> Result<f64, MediaConnectorError> {
1207 let mut command = Command::new("ffmpeg");
1208 command
1209 .args(["-hide_banner", "-nostdin", "-i"])
1210 .arg(input_path);
1211 let output = run_video_command_output(command, "ffmpeg").await?;
1212
1213 let stderr = String::from_utf8_lossy(&output.stderr);
1214 parse_ffmpeg_duration_seconds(&stderr).ok_or_else(|| {
1215 MediaConnectorError::VideoDecode("failed to parse ffmpeg duration".to_string())
1216 })
1217}
1218
/// Extracts the first `Duration: HH:MM:SS.ss` value from ffmpeg's stderr banner, in seconds.
///
/// Only the text between the marker and the next comma is considered. Its first three
/// `:`-separated fields are read as hours, minutes and seconds. Returns `None` when the marker is
/// absent, fewer than three fields are present, or any field fails to parse (e.g. `N/A`).
fn parse_ffmpeg_duration_seconds(stderr: &str) -> Option<f64> {
    const MARKER: &str = "Duration:";
    let (_, after_marker) = stderr.split_once(MARKER)?;
    let timestamp = after_marker.trim_start().split(',').next()?.trim();

    // Fields are parsed lazily, so a bad hours field short-circuits before minutes are read.
    let mut fields = timestamp.split(':').map(|field| field.parse::<f64>().ok());
    let hours = fields.next()??;
    let minutes = fields.next()??;
    let seconds = fields.next()??;
    Some(hours * 3600.0 + minutes * 60.0 + seconds)
}
1229
1230fn split_png_stream(bytes: &[u8]) -> Result<Vec<&[u8]>, MediaConnectorError> {
1231 const PNG_SIG: &[u8; 8] = b"\x89PNG\r\n\x1a\n";
1232 const IEND: &[u8; 4] = b"IEND";
1233
1234 let mut frames = Vec::new();
1235 let mut pos = 0;
1236 while pos < bytes.len() {
1237 let Some(rel_start) = bytes[pos..]
1238 .windows(PNG_SIG.len())
1239 .position(|w| w == PNG_SIG)
1240 else {
1241 break;
1242 };
1243 let start = pos + rel_start;
1244 let mut cursor = start + PNG_SIG.len();
1245
1246 loop {
1247 let remaining = bytes.len() - cursor;
1248 if remaining < 12 {
1249 return Err(MediaConnectorError::VideoDecode(
1250 "truncated PNG frame in ffmpeg output".to_string(),
1251 ));
1252 }
1253 let mut len_bytes = [0_u8; 4];
1254 len_bytes.copy_from_slice(&bytes[cursor..cursor + 4]);
1255 let len = u32::from_be_bytes(len_bytes) as usize;
1256 let chunk_type = &bytes[cursor + 4..cursor + 8];
1257 if remaining - 12 < len {
1258 return Err(MediaConnectorError::VideoDecode(
1259 "truncated PNG chunk in ffmpeg output".to_string(),
1260 ));
1261 }
1262 cursor += 12 + len;
1263 if chunk_type == IEND {
1264 frames.push(&bytes[start..cursor]);
1265 pos = cursor;
1266 break;
1267 }
1268 }
1269 }
1270
1271 Ok(frames)
1272}
1273
/// Test helper: decodes a concatenated P6 PPM stream into owned `image::DynamicImage` frames.
///
/// Frame boundaries come from `parse_ppm_frame_layout`. Each frame's pixel bytes are copied into
/// an `image::RgbImage`.
///
/// # Errors
///
/// Propagates layout errors. Returns `MediaConnectorError::VideoDecode` if a frame's byte range
/// overflows or `image::RgbImage::from_raw` rejects the buffer for its dimensions.
#[cfg(test)]
fn parse_ppm_stream(bytes: &[u8]) -> Result<Vec<image::DynamicImage>, MediaConnectorError> {
    parse_ppm_frame_layout(bytes)?
        .into_iter()
        .map(|layout| {
            let end = layout.offset.checked_add(layout.len).ok_or_else(|| {
                MediaConnectorError::VideoDecode("PPM frame size overflow".to_string())
            })?;
            let pixels = bytes[layout.offset..end].to_vec();
            image::RgbImage::from_raw(layout.width, layout.height, pixels)
                .map(image::DynamicImage::ImageRgb8)
                .ok_or_else(|| {
                    MediaConnectorError::VideoDecode(format!(
                        "failed to build RGB frame from {} bytes for {}x{} video",
                        layout.len, layout.width, layout.height
                    ))
                })
        })
        .collect()
}
1297
1298fn parse_ppm_rgb_video(bytes: Bytes) -> Result<DecodedRgbVideo, MediaConnectorError> {
1299 let layouts = parse_ppm_frame_layout(&bytes)?;
1300 let decoded_bytes = layouts.iter().try_fold(0usize, |total, frame| {
1301 total.checked_add(frame.len).ok_or_else(|| {
1302 MediaConnectorError::VideoDecode("PPM decoded byte size overflow".to_string())
1303 })
1304 })?;
1305 ensure_decoded_byte_limit(decoded_bytes)?;
1306 Ok(DecodedRgbVideo::new(bytes, layouts))
1307}
1308
1309fn parse_ppm_frame_layout(bytes: &[u8]) -> Result<Vec<DecodedRgbFrame>, MediaConnectorError> {
1310 let mut frames = Vec::new();
1311 let mut pos = 0;
1312
1313 while pos < bytes.len() {
1314 skip_ppm_whitespace_and_comments(bytes, &mut pos);
1315 if pos >= bytes.len() {
1316 break;
1317 }
1318
1319 let magic = read_ppm_token(bytes, &mut pos)?.ok_or_else(|| {
1320 MediaConnectorError::VideoDecode("truncated PPM frame header".to_string())
1321 })?;
1322 if magic != b"P6" {
1323 return Err(MediaConnectorError::VideoDecode(format!(
1324 "unsupported PPM magic: {}",
1325 String::from_utf8_lossy(magic)
1326 )));
1327 }
1328 let width = parse_ppm_u32(bytes, &mut pos, "width")?;
1329 let height = parse_ppm_u32(bytes, &mut pos, "height")?;
1330 let max_value = parse_ppm_u32(bytes, &mut pos, "max value")?;
1331 if width == 0 || height == 0 {
1332 return Err(MediaConnectorError::VideoDecode(
1333 "PPM frame dimensions must be non-zero".to_string(),
1334 ));
1335 }
1336 if max_value != 255 {
1337 return Err(MediaConnectorError::VideoDecode(format!(
1338 "unsupported PPM max value: {max_value}"
1339 )));
1340 }
1341 if pos >= bytes.len() || !bytes[pos].is_ascii_whitespace() {
1342 return Err(MediaConnectorError::VideoDecode(
1343 "PPM header is not followed by pixel data".to_string(),
1344 ));
1345 }
1346 pos += 1;
1347
1348 let frame_size = (width as usize)
1349 .checked_mul(height as usize)
1350 .and_then(|pixels| pixels.checked_mul(3))
1351 .ok_or_else(|| {
1352 MediaConnectorError::VideoDecode(format!(
1353 "PPM frame dimensions are too large: {width}x{height}"
1354 ))
1355 })?;
1356 let end = pos.checked_add(frame_size).ok_or_else(|| {
1357 MediaConnectorError::VideoDecode("PPM frame size overflow".to_string())
1358 })?;
1359 if end > bytes.len() {
1360 return Err(MediaConnectorError::VideoDecode(
1361 "truncated PPM frame pixel data".to_string(),
1362 ));
1363 }
1364 frames.push(DecodedRgbFrame {
1365 width,
1366 height,
1367 offset: pos,
1368 len: frame_size,
1369 });
1370 pos = end;
1371 }
1372
1373 if frames.is_empty() {
1374 return Err(MediaConnectorError::VideoDecode(
1375 "ffmpeg produced no frames".to_string(),
1376 ));
1377 }
1378
1379 Ok(frames)
1380}
1381
1382fn rawvideo_frame_size(width: u32, height: u32) -> Result<usize, MediaConnectorError> {
1383 let frame_size = (width as usize)
1384 .checked_mul(height as usize)
1385 .and_then(|pixels| pixels.checked_mul(3))
1386 .ok_or_else(|| {
1387 MediaConnectorError::VideoDecode(format!(
1388 "video frame dimensions are too large: {width}x{height}"
1389 ))
1390 })?;
1391 if frame_size == 0 {
1392 return Err(MediaConnectorError::VideoDecode(
1393 "video frame dimensions must be non-zero".to_string(),
1394 ));
1395 }
1396 Ok(frame_size)
1397}
1398
1399fn parse_ppm_u32(bytes: &[u8], pos: &mut usize, field: &str) -> Result<u32, MediaConnectorError> {
1400 let token = read_ppm_token(bytes, pos)?
1401 .ok_or_else(|| MediaConnectorError::VideoDecode(format!("truncated PPM {field} header")))?;
1402 std::str::from_utf8(token)
1403 .ok()
1404 .and_then(|value| value.parse::<u32>().ok())
1405 .ok_or_else(|| {
1406 MediaConnectorError::VideoDecode(format!(
1407 "invalid PPM {field}: {}",
1408 String::from_utf8_lossy(token)
1409 ))
1410 })
1411}
1412
1413fn read_ppm_token<'a>(
1414 bytes: &'a [u8],
1415 pos: &mut usize,
1416) -> Result<Option<&'a [u8]>, MediaConnectorError> {
1417 skip_ppm_whitespace_and_comments(bytes, pos);
1418 if *pos >= bytes.len() {
1419 return Ok(None);
1420 }
1421
1422 let start = *pos;
1423 while *pos < bytes.len() && !bytes[*pos].is_ascii_whitespace() {
1424 if bytes[*pos] == b'#' {
1425 return Err(MediaConnectorError::VideoDecode(
1426 "unexpected PPM comment inside token".to_string(),
1427 ));
1428 }
1429 *pos += 1;
1430 }
1431 Ok(Some(&bytes[start..*pos]))
1432}
1433
/// Advances `*pos` past any run of ASCII whitespace and `#` comments in a PPM header.
///
/// A comment extends up to (not including) the next `\n`. That newline is then consumed as
/// ordinary whitespace. `*pos` is left on the first byte that is neither, or at/after the end of
/// `bytes`. A `*pos` already past the end is left unchanged.
fn skip_ppm_whitespace_and_comments(bytes: &[u8], pos: &mut usize) {
    while let Some(&byte) = bytes.get(*pos) {
        if byte.is_ascii_whitespace() {
            *pos += 1;
        } else if byte == b'#' {
            let comment_len = bytes[*pos..]
                .iter()
                .position(|&b| b == b'\n')
                .unwrap_or(bytes.len() - *pos);
            *pos += comment_len;
        } else {
            break;
        }
    }
}
1448
1449#[cfg(test)]
1450mod tests {
1451 use super::{
1452 parse_ffmpeg_duration_seconds, parse_ppm_stream, split_png_stream, video_temp_suffix,
1453 };
1454
1455 const TINY_PNG: &[u8] = &[
1456 137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 0, 1, 0, 0, 0, 1, 8, 4,
1457 0, 0, 0, 181, 28, 12, 2, 0, 0, 0, 11, 73, 68, 65, 84, 120, 218, 99, 96, 96, 0, 0, 0, 3, 0,
1458 1, 43, 9, 141, 84, 0, 0, 0, 0, 73, 69, 78, 68, 174, 66, 96, 130,
1459 ];
1460
1461 #[test]
1462 fn splits_concatenated_png_stream() {
1463 let mut stream = Vec::new();
1464 stream.extend_from_slice(TINY_PNG);
1465 stream.extend_from_slice(TINY_PNG);
1466
1467 let frames = match split_png_stream(&stream) {
1468 Ok(frames) => frames,
1469 Err(err) => panic!("split png stream failed: {err}"),
1470 };
1471 assert_eq!(frames.len(), 2);
1472 assert_eq!(frames[0], TINY_PNG);
1473 assert_eq!(frames[1], TINY_PNG);
1474 }
1475
1476 #[test]
1477 fn parses_ffmpeg_duration() {
1478 let stderr = "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'video.mp4':\n Duration: 00:01:23.45, start: 0.000000, bitrate: 123 kb/s";
1479 assert_eq!(parse_ffmpeg_duration_seconds(stderr), Some(83.45));
1480 }
1481
1482 #[test]
1483 fn detects_video_temp_suffix_from_container_header() {
1484 let mut mp4 = vec![0; 12];
1485 mp4[4..8].copy_from_slice(b"ftyp");
1486 assert_eq!(video_temp_suffix(&mp4), ".mp4");
1487 assert_eq!(video_temp_suffix(&[0x1a, 0x45, 0xdf, 0xa3]), ".webm");
1488 assert_eq!(video_temp_suffix(b"RIFF....AVI "), ".avi");
1489 assert_eq!(video_temp_suffix(b"OggS"), ".ogv");
1490 assert_eq!(video_temp_suffix(&[0x00, 0x00, 0x01, 0xba]), ".mpg");
1491 assert_eq!(video_temp_suffix(b"unknown"), ".video");
1492 }
1493
1494 #[test]
1495 fn parses_concatenated_ppm_stream() {
1496 let stream = b"P6\n2 1\n255\n\x01\x02\x03\x04\x05\x06P6\n# comment\n1 2\n255\n\x07\x08\x09\x0a\x0b\x0c";
1497
1498 let frames = match parse_ppm_stream(stream) {
1499 Ok(frames) => frames,
1500 Err(err) => panic!("parse ppm stream failed: {err}"),
1501 };
1502 assert_eq!(frames.len(), 2);
1503 assert_eq!(frames[0].width(), 2);
1504 assert_eq!(frames[0].height(), 1);
1505 assert_eq!(frames[1].width(), 1);
1506 assert_eq!(frames[1].height(), 2);
1507 }
1508
1509 #[test]
1510 fn rejects_truncated_ppm_stream() {
1511 assert!(parse_ppm_stream(b"P6\n2 1\n255\n\x01\x02").is_err());
1512 }
1513
1514 #[test]
1515 fn rejects_invalid_ppm_header() {
1516 assert!(parse_ppm_stream(b"P3\n1 1\n255\n\x01\x02\x03").is_err());
1517 assert!(parse_ppm_stream(b"P6\n1 1\n65535\n\x01\x02\x03").is_err());
1518 }
1519
1520 #[test]
1521 fn rejects_zero_dimension_ppm_stream() {
1522 assert!(parse_ppm_stream(b"P6\n0 1\n255\n").is_err());
1523 assert!(parse_ppm_stream(b"P6\n1 0\n255\n").is_err());
1524 }
1525
1526 #[test]
1527 fn rejects_overflowing_ppm_frame_size() {
1528 assert!(parse_ppm_stream(b"P6\n4294967295 4294967295\n255\n").is_err());
1529 }
1530
1531 #[cfg(feature = "opencv-video")]
1532 #[test]
1533 fn opencv_sampling_preserves_min_frames_for_short_clips() {
1534 let cfg = super::VideoFetchConfig {
1535 min_frames: 4,
1536 max_frames: 8,
1537 sample_fps: 2.0,
1538 };
1539 let indices = super::opencv_frame_indices(1, 30.0, cfg);
1540 assert_eq!(indices, vec![0, 0, 0, 0]);
1541 }
1542}