1use std::time::Instant;
10
11use ff_decode::{AudioDecoder, ImageDecoder, VideoDecoder};
12use ff_encode::{BitrateMode, HardwareEncoder, VideoEncoder};
13use ff_filter::{FilterGraph, HwAccel};
14use ff_format::{AudioCodec, ChapterInfo, Timestamp, VideoCodec};
15
16use crate::error::PipelineError;
17use crate::progress::{Progress, ProgressCallback};
18
19#[non_exhaustive]
25pub struct EncoderConfig {
26 pub video_codec: VideoCodec,
28
29 pub audio_codec: AudioCodec,
31
32 pub bitrate_mode: BitrateMode,
34
35 pub resolution: Option<(u32, u32)>,
47
48 pub framerate: Option<f64>,
52
53 pub hardware: Option<HwAccel>,
57}
58
59impl EncoderConfig {
60 #[must_use]
63 pub fn builder() -> EncoderConfigBuilder {
64 EncoderConfigBuilder::new()
65 }
66}
67
68pub struct EncoderConfigBuilder {
72 video_codec: VideoCodec,
73 audio_codec: AudioCodec,
74 bitrate_mode: BitrateMode,
75 resolution: Option<(u32, u32)>,
76 framerate: Option<f64>,
77 hardware: Option<HwAccel>,
78}
79
80impl EncoderConfigBuilder {
81 fn new() -> Self {
82 Self {
83 video_codec: VideoCodec::H264,
84 audio_codec: AudioCodec::Aac,
85 bitrate_mode: BitrateMode::Crf(23),
86 resolution: None,
87 framerate: None,
88 hardware: None,
89 }
90 }
91
92 #[must_use]
94 pub fn video_codec(mut self, codec: VideoCodec) -> Self {
95 self.video_codec = codec;
96 self
97 }
98
99 #[must_use]
101 pub fn audio_codec(mut self, codec: AudioCodec) -> Self {
102 self.audio_codec = codec;
103 self
104 }
105
106 #[must_use]
108 pub fn bitrate_mode(mut self, mode: BitrateMode) -> Self {
109 self.bitrate_mode = mode;
110 self
111 }
112
113 #[must_use]
115 pub fn crf(mut self, crf: u32) -> Self {
116 self.bitrate_mode = BitrateMode::Crf(crf);
117 self
118 }
119
120 #[must_use]
122 pub fn resolution(mut self, width: u32, height: u32) -> Self {
123 self.resolution = Some((width, height));
124 self
125 }
126
127 #[must_use]
129 pub fn framerate(mut self, fps: f64) -> Self {
130 self.framerate = Some(fps);
131 self
132 }
133
134 #[must_use]
136 pub fn hardware(mut self, hw: HwAccel) -> Self {
137 self.hardware = Some(hw);
138 self
139 }
140
141 #[must_use]
143 pub fn build(self) -> EncoderConfig {
144 EncoderConfig {
145 video_codec: self.video_codec,
146 audio_codec: self.audio_codec,
147 bitrate_mode: self.bitrate_mode,
148 resolution: self.resolution,
149 framerate: self.framerate,
150 hardware: self.hardware,
151 }
152 }
153}
154
155pub struct Pipeline {
160 inputs: Vec<String>,
161 secondary_inputs: Vec<String>,
162 filter: Option<FilterGraph>,
163 output: Option<(String, EncoderConfig)>,
164 callback: Option<ProgressCallback>,
165 metadata: Vec<(String, String)>,
166 chapters: Vec<ChapterInfo>,
167 two_pass: bool,
168}
169
170impl Pipeline {
171 #[must_use]
186 pub fn builder() -> PipelineBuilder {
187 PipelineBuilder::new()
188 }
189
190 pub fn run(self) -> Result<(), PipelineError> {
200 let first_input = &self.inputs[0];
202 let (out_path, enc_config) = self.output.ok_or(PipelineError::NoOutput)?;
203 let mut filter = self.filter;
204 let num_inputs = self.inputs.len();
205
206 let first_vdec = VideoDecoder::open(first_input).build()?;
208 let (out_width, out_height) = enc_config.resolution.unwrap_or_else(|| {
209 filter
210 .as_ref()
211 .and_then(|fg| fg.output_resolution())
212 .unwrap_or_else(|| (first_vdec.width(), first_vdec.height()))
213 });
214 let fps = enc_config
215 .framerate
216 .unwrap_or_else(|| first_vdec.frame_rate());
217
218 let total_frames = if num_inputs == 1 {
220 first_vdec.stream_info().frame_count()
221 } else {
222 None
223 };
224
225 log::info!(
226 "pipeline starting inputs={num_inputs} secondary_inputs={} output={out_path} \
227 width={out_width} height={out_height} fps={fps} total_frames={total_frames:?}",
228 self.secondary_inputs.len()
229 );
230
231 let audio_config: Option<(u32, u32)> = match AudioDecoder::open(first_input).build() {
233 Ok(adec) => Some((
234 adec.stream_info().sample_rate(),
235 adec.stream_info().channels(),
236 )),
237 Err(e) => {
238 log::warn!(
239 "audio stream unavailable, encoding video only \
240 path={first_input} reason={e}"
241 );
242 None
243 }
244 };
245
246 let run_audio = !self.two_pass;
248 if self.two_pass && audio_config.is_some() {
249 log::warn!(
250 "two-pass encoding is video-only; audio stream will be skipped \
251 path={first_input}"
252 );
253 }
254
255 let hw = hwaccel_to_hardware_encoder(enc_config.hardware);
257 let mut enc_builder = VideoEncoder::create(&out_path)
258 .video(out_width, out_height, fps)
259 .video_codec(enc_config.video_codec)
260 .bitrate_mode(enc_config.bitrate_mode)
261 .hardware_encoder(hw);
262
263 if run_audio && let Some((sample_rate, channels)) = audio_config {
264 enc_builder = enc_builder
265 .audio(sample_rate, channels)
266 .audio_codec(enc_config.audio_codec);
267 }
268
269 if self.two_pass {
270 enc_builder = enc_builder.two_pass();
271 }
272
273 for (key, value) in self.metadata {
274 enc_builder = enc_builder.metadata(&key, &value);
275 }
276 for chapter in self.chapters {
277 enc_builder = enc_builder.chapter(chapter);
278 }
279
280 let mut encoder = enc_builder.build()?;
281 log::debug!(
282 "encoder opened codec={} hardware={hw:?}",
283 encoder.actual_video_codec()
284 );
285
286 let start = Instant::now();
287 let mut frames_processed: u64 = 0;
288 let mut cancelled = false;
289 let frame_period_secs = if fps > 0.0 { 1.0 / fps } else { 0.0 };
290
291 let mut pts_offset_secs: f64 = 0.0;
293
294 let secondary_frames: Vec<_> = {
297 let mut frames = Vec::with_capacity(self.secondary_inputs.len());
298 for path in &self.secondary_inputs {
299 let ext = std::path::Path::new(path)
300 .extension()
301 .and_then(|e| e.to_str())
302 .map(str::to_lowercase)
303 .unwrap_or_default();
304 let frame = if matches!(
305 ext.as_str(),
306 "jpg" | "jpeg" | "png" | "bmp" | "webp" | "tiff" | "tif"
307 ) {
308 let dec = ImageDecoder::open(path).build()?;
309 dec.decode()?
310 } else {
311 let mut dec = VideoDecoder::open(path).build()?;
312 dec.decode_one()?.ok_or(PipelineError::FrameNotAvailable)?
313 };
314 frames.push(frame);
315 }
316 frames
317 };
318
319 let mut maybe_first_vdec = Some(first_vdec);
321
322 'inputs: for input in &self.inputs {
323 let mut vdec = if let Some(vd) = maybe_first_vdec.take() {
324 vd
325 } else {
326 VideoDecoder::open(input).build()?
327 };
328
329 let mut last_frame_end_secs: f64 = pts_offset_secs;
330
331 loop {
332 let Some(mut raw_frame) = vdec.decode_one()? else {
333 break;
334 };
335
336 let ts = raw_frame.timestamp();
338 let new_pts_secs = pts_offset_secs + ts.as_secs_f64();
339 last_frame_end_secs = new_pts_secs + frame_period_secs;
340 raw_frame.set_timestamp(Timestamp::from_secs_f64(new_pts_secs, ts.time_base()));
341
342 let frame = if let Some(ref mut fg) = filter {
343 fg.push_video(0, &raw_frame)?;
344 for (slot_idx, sec_frame) in secondary_frames.iter().enumerate() {
346 fg.push_video(slot_idx + 1, sec_frame)?;
347 }
348 match fg.pull_video()? {
349 Some(f) => f,
350 None => continue, }
352 } else {
353 raw_frame
354 };
355
356 encoder.push_video(&frame)?;
357 frames_processed += 1;
358
359 if let Some(ref cb) = self.callback {
360 let progress = Progress {
361 frames_processed,
362 total_frames,
363 elapsed: start.elapsed(),
364 };
365 if !cb(&progress) {
366 log::info!(
367 "pipeline cancelled by callback \
368 frames_processed={frames_processed}"
369 );
370 cancelled = true;
371 break 'inputs;
372 }
373 }
374 }
375
376 pts_offset_secs = last_frame_end_secs;
378 log::debug!("input complete path={input} pts_offset_secs={pts_offset_secs:.3}");
379 }
380
381 if !cancelled && run_audio && audio_config.is_some() {
383 let mut audio_offset_secs: f64 = 0.0;
384 for input in &self.inputs {
385 match AudioDecoder::open(input).build() {
386 Ok(mut adec) => {
387 let mut last_audio_end_secs: f64 = audio_offset_secs;
388 while let Some(mut aframe) = adec.decode_one()? {
389 let ts = aframe.timestamp();
390 let new_pts_secs = audio_offset_secs + ts.as_secs_f64();
391 #[allow(clippy::cast_precision_loss)]
392 let frame_dur_secs = if aframe.sample_rate() > 0 {
393 aframe.samples() as f64 / f64::from(aframe.sample_rate())
394 } else {
395 0.0
396 };
397 last_audio_end_secs = new_pts_secs + frame_dur_secs;
398 aframe.set_timestamp(Timestamp::from_secs_f64(
399 new_pts_secs,
400 ts.time_base(),
401 ));
402
403 let aframe = if let Some(ref mut fg) = filter {
404 fg.push_audio(0, &aframe)?;
405 match fg.pull_audio()? {
406 Some(f) => f,
407 None => continue,
408 }
409 } else {
410 aframe
411 };
412 encoder.push_audio(&aframe)?;
413 }
414 audio_offset_secs = last_audio_end_secs;
415 }
416 Err(e) => {
417 log::warn!("audio stream unavailable path={input} reason={e}");
418 }
419 }
420 }
421 }
422
423 encoder.finish()?;
425
426 let elapsed = start.elapsed();
427 log::info!("pipeline finished frames_processed={frames_processed} elapsed={elapsed:?}");
428
429 if cancelled {
430 return Err(PipelineError::Cancelled);
431 }
432 Ok(())
433 }
434}
435
436pub(crate) fn hwaccel_to_hardware_encoder(hw: Option<HwAccel>) -> HardwareEncoder {
441 match hw {
442 None => HardwareEncoder::None,
443 Some(HwAccel::Cuda) => HardwareEncoder::Nvenc,
444 Some(HwAccel::VideoToolbox) => HardwareEncoder::VideoToolbox,
445 Some(HwAccel::Vaapi) => HardwareEncoder::Vaapi,
446 }
447}
448
449pub struct PipelineBuilder {
454 inputs: Vec<String>,
455 secondary_inputs: Vec<String>,
456 filter: Option<FilterGraph>,
457 output: Option<(String, EncoderConfig)>,
458 callback: Option<ProgressCallback>,
459 metadata: Vec<(String, String)>,
460 chapters: Vec<ChapterInfo>,
461 two_pass: bool,
462}
463
464impl PipelineBuilder {
465 #[must_use]
467 pub fn new() -> Self {
468 Self {
469 inputs: Vec::new(),
470 secondary_inputs: Vec::new(),
471 filter: None,
472 output: None,
473 callback: None,
474 metadata: Vec::new(),
475 chapters: Vec::new(),
476 two_pass: false,
477 }
478 }
479
480 #[must_use]
484 pub fn input(mut self, path: &str) -> Self {
485 self.inputs.push(path.to_owned());
486 self
487 }
488
489 #[must_use]
507 pub fn secondary_input(mut self, path: &str) -> Self {
508 self.secondary_inputs.push(path.to_owned());
509 self
510 }
511
512 #[must_use]
516 pub fn filter(mut self, graph: FilterGraph) -> Self {
517 self.filter = Some(graph);
518 self
519 }
520
521 #[must_use]
534 pub fn filter_opt(self, graph: Option<FilterGraph>) -> Self {
535 match graph {
536 Some(g) => self.filter(g),
537 None => self,
538 }
539 }
540
541 #[must_use]
547 pub fn metadata(mut self, key: &str, value: &str) -> Self {
548 self.metadata.push((key.to_string(), value.to_string()));
549 self
550 }
551
552 #[must_use]
556 pub fn chapter(mut self, chapter: ChapterInfo) -> Self {
557 self.chapters.push(chapter);
558 self
559 }
560
561 #[must_use]
567 pub fn two_pass(mut self) -> Self {
568 self.two_pass = true;
569 self
570 }
571
572 #[must_use]
574 pub fn output(mut self, path: &str, config: EncoderConfig) -> Self {
575 self.output = Some((path.to_owned(), config));
576 self
577 }
578
579 #[must_use]
585 pub fn on_progress(mut self, cb: impl Fn(&Progress) -> bool + Send + 'static) -> Self {
586 self.callback = Some(Box::new(cb));
587 self
588 }
589
590 pub fn build(self) -> Result<Pipeline, PipelineError> {
599 if self.inputs.is_empty() {
600 return Err(PipelineError::NoInput);
601 }
602 if self.output.is_none() {
603 return Err(PipelineError::NoOutput);
604 }
605 if !self.secondary_inputs.is_empty() && self.filter.is_none() {
606 return Err(PipelineError::SecondaryInputWithoutFilter);
607 }
608 Ok(Pipeline {
609 inputs: self.inputs,
610 secondary_inputs: self.secondary_inputs,
611 filter: self.filter,
612 output: self.output,
613 callback: self.callback,
614 metadata: self.metadata,
615 chapters: self.chapters,
616 two_pass: self.two_pass,
617 })
618 }
619}
620
621impl Default for PipelineBuilder {
622 fn default() -> Self {
623 Self::new()
624 }
625}
626
627#[cfg(test)]
628mod tests {
629 use super::*;
630 use ff_encode::BitrateMode;
631 use ff_format::{AudioCodec, VideoCodec};
632
633 fn dummy_config() -> EncoderConfig {
634 EncoderConfig::builder()
635 .video_codec(VideoCodec::H264)
636 .audio_codec(AudioCodec::Aac)
637 .bitrate_mode(BitrateMode::Cbr(4_000_000))
638 .build()
639 }
640
641 #[test]
642 fn build_should_return_error_when_no_input() {
643 let result = Pipeline::builder()
644 .output("/tmp/out.mp4", dummy_config())
645 .build();
646 assert!(matches!(result, Err(PipelineError::NoInput)));
647 }
648
649 #[test]
650 fn build_should_return_error_when_no_output() {
651 let result = Pipeline::builder().input("/tmp/in.mp4").build();
652 assert!(matches!(result, Err(PipelineError::NoOutput)));
653 }
654
655 #[test]
656 fn build_should_succeed_with_valid_input_and_output() {
657 let pipeline = Pipeline::builder()
658 .input("/tmp/in.mp4")
659 .output("/tmp/out.mp4", dummy_config())
660 .build();
661 assert!(pipeline.is_ok());
662 }
663
664 #[test]
665 fn input_should_accept_multiple_paths() {
666 let result = Pipeline::builder()
669 .input("/tmp/a.mp4")
670 .input("/tmp/b.mp4")
671 .input("/tmp/c.mp4")
672 .output("/tmp/out.mp4", dummy_config())
673 .build();
674 assert!(result.is_ok());
675 }
676
677 #[test]
678 fn on_progress_should_not_prevent_successful_build() {
679 let result = Pipeline::builder()
680 .input("/tmp/in.mp4")
681 .output("/tmp/out.mp4", dummy_config())
682 .on_progress(|_p| true)
683 .build();
684 assert!(result.is_ok());
685 }
686
687 #[test]
688 fn default_should_produce_empty_builder() {
689 let result = PipelineBuilder::default()
692 .output("/tmp/out.mp4", dummy_config())
693 .build();
694 assert!(matches!(result, Err(PipelineError::NoInput)));
695 }
696
697 #[test]
698 fn build_should_require_both_input_and_output() {
699 assert!(matches!(
701 Pipeline::builder().build(),
702 Err(PipelineError::NoInput)
703 ));
704 assert!(matches!(
705 Pipeline::builder().input("/tmp/in.mp4").build(),
706 Err(PipelineError::NoOutput)
707 ));
708 }
709
710 #[test]
711 fn secondary_input_without_filter_should_return_error() {
712 let result = Pipeline::builder()
713 .input("/tmp/in.mp4")
714 .secondary_input("/tmp/logo.png")
715 .output("/tmp/out.mp4", dummy_config())
716 .build();
717 assert!(matches!(
718 result,
719 Err(PipelineError::SecondaryInputWithoutFilter)
720 ));
721 }
722
723 #[test]
724 fn filter_opt_with_none_should_not_prevent_successful_build() {
725 let result = Pipeline::builder()
726 .input("/tmp/in.mp4")
727 .output("/tmp/out.mp4", dummy_config())
728 .filter_opt(None)
729 .build();
730 assert!(result.is_ok());
731 }
732
733 #[test]
734 fn metadata_should_accumulate_key_value_pairs() {
735 let builder = Pipeline::builder()
736 .input("/tmp/in.mp4")
737 .output("/tmp/out.mp4", dummy_config())
738 .metadata("title", "My Video")
739 .metadata("artist", "Author");
740 assert_eq!(builder.metadata.len(), 2);
741 assert_eq!(
742 builder.metadata[0],
743 ("title".to_string(), "My Video".to_string())
744 );
745 assert_eq!(
746 builder.metadata[1],
747 ("artist".to_string(), "Author".to_string())
748 );
749 }
750
751 #[test]
752 fn chapter_should_append_chapter_info() {
753 use std::time::Duration;
754 let ch = ChapterInfo::builder()
755 .id(0)
756 .title("Intro")
757 .start(Duration::ZERO)
758 .end(Duration::from_secs(10))
759 .build();
760 let builder = Pipeline::builder()
761 .input("/tmp/in.mp4")
762 .output("/tmp/out.mp4", dummy_config())
763 .chapter(ch);
764 assert_eq!(builder.chapters.len(), 1);
765 }
766
767 #[test]
768 fn metadata_and_chapters_should_be_empty_by_default() {
769 let builder = Pipeline::builder();
770 assert!(builder.metadata.is_empty());
771 assert!(builder.chapters.is_empty());
772 }
773
774 #[test]
775 fn two_pass_flag_should_default_to_false() {
776 let builder = Pipeline::builder();
777 assert!(!builder.two_pass);
778 }
779
780 #[test]
781 fn two_pass_should_set_flag() {
782 let builder = Pipeline::builder()
783 .input("/tmp/in.mp4")
784 .output("/tmp/out.mp4", dummy_config())
785 .two_pass();
786 assert!(builder.two_pass);
787 }
788
789 #[test]
790 fn two_pass_should_not_prevent_successful_build() {
791 let result = Pipeline::builder()
792 .input("/tmp/in.mp4")
793 .output("/tmp/out.mp4", dummy_config())
794 .two_pass()
795 .build();
796 assert!(result.is_ok());
797 }
798
799 #[test]
800 fn filter_opt_with_none_should_behave_like_no_filter_call() {
801 let result = Pipeline::builder()
804 .input("/tmp/in.mp4")
805 .secondary_input("/tmp/logo.png")
806 .output("/tmp/out.mp4", dummy_config())
807 .filter_opt(None)
808 .build();
809 assert!(matches!(
810 result,
811 Err(PipelineError::SecondaryInputWithoutFilter)
812 ));
813 }
814}