av1an_core/
lib.rs

1use std::{
2    cmp::max,
3    collections::{hash_map::DefaultHasher, HashMap},
4    fs::{self, read_to_string, File},
5    hash::{Hash, Hasher},
6    io::Write,
7    path::{Path, PathBuf},
8    string::ToString,
9    sync::{
10        atomic::{AtomicBool, AtomicUsize},
11        Mutex,
12    },
13    thread::available_parallelism,
14    time::Instant,
15};
16
17use ::vapoursynth::{api::API, map::OwnedMap};
18use anyhow::{bail, Context};
19use av1_grain::TransferFunction;
20use av_format::rational::Rational64;
21use chunk::Chunk;
22use dashmap::DashMap;
23use once_cell::sync::{Lazy, OnceCell};
24use serde::{Deserialize, Serialize};
25use strum::{Display, EnumString, IntoStaticStr};
26use tracing::info;
27
28pub use crate::{
29    concat::ConcatMethod,
30    context::Av1anContext,
31    encoder::Encoder,
32    settings::{EncodeArgs, InputPixelFormat, PixelFormat},
33    target_quality::{InterpolationMethod, TargetQuality},
34    util::read_in_dir,
35};
36use crate::{
37    ffmpeg::FFPixelFormat,
38    progress_bar::finish_progress_bar,
39    vapoursynth::{create_vs_file, generate_loadscript_text, CacheSource, LoadscriptArgs},
40};
41
42mod broker;
43mod chunk;
44mod concat;
45mod context;
46mod encoder;
47pub mod ffmpeg;
48mod metrics {
49    pub mod butteraugli;
50    pub mod statistics;
51    pub mod vmaf;
52    pub mod xpsnr;
53}
54mod interpol;
55mod parse;
56mod progress_bar;
57mod scene_detect;
58mod scenes;
59mod settings;
60mod split;
61mod target_quality;
62mod util;
63pub mod vapoursynth;
64mod zones;
65
66static CLIP_INFO_CACHE: Lazy<Mutex<HashMap<CacheKey, ClipInfo>>> =
67    Lazy::new(|| Mutex::new(HashMap::new()));
68
69#[derive(Debug, Clone, Hash, PartialEq, Eq)]
70struct CacheKey {
71    input:    Input,
72    // Not strictly necessary, but allows for proxy to have different values for vspipe_args or
73    // chunking method
74    is_proxy: bool,
75}
76
77#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
78pub enum Input {
79    VapourSynth {
80        path:        PathBuf,
81        vspipe_args: Vec<String>,
82        // Must be stored in memory at initialization instead of generating
83        // on demand in order to reduce thrashing disk with frequent reads from Target Quality
84        // probing
85        script_text: String,
86        is_proxy:    bool,
87    },
88    Video {
89        path:         PathBuf,
90        // Used to generate script_text if chunk_method is supported
91        temp:         String,
92        // Store as a string of ChunkMethod to enable hashing
93        chunk_method: ChunkMethod,
94        is_proxy:     bool,
95        cache_mode:   CacheSource,
96    },
97}
98
99impl Input {
100    #[inline]
101    #[expect(clippy::too_many_arguments)]
102    pub fn new<P: AsRef<Path> + Into<PathBuf>>(
103        path: P,
104        vspipe_args: Vec<String>,
105        temporary_directory: &str,
106        chunk_method: ChunkMethod,
107        scene_detection_downscale_height: Option<usize>,
108        scene_detection_pixel_format: Option<FFPixelFormat>,
109        scene_detection_scaler: Option<&str>,
110        is_proxy: bool,
111        cache_mode: CacheSource,
112    ) -> anyhow::Result<Self> {
113        let input = if let Some(ext) = path.as_ref().extension() {
114            if ext == "py" || ext == "vpy" {
115                let input_path = path.into();
116                let script_text = read_to_string(input_path.clone())?;
117                Ok::<Self, anyhow::Error>(Self::VapourSynth {
118                    path: input_path,
119                    vspipe_args,
120                    script_text,
121                    is_proxy,
122                })
123            } else {
124                let input_path = path.into();
125                Ok(Self::Video {
126                    path: input_path,
127                    temp: temporary_directory.to_owned(),
128                    chunk_method,
129                    is_proxy,
130                    cache_mode,
131                })
132            }
133        } else {
134            let input_path = path.into();
135            Ok(Self::Video {
136                path: input_path,
137                temp: temporary_directory.to_owned(),
138                chunk_method,
139                is_proxy,
140                cache_mode,
141            })
142        }?;
143
144        if input.is_video() && input.is_vapoursynth_script() {
145            // Clip info is cached and reused so the values need to be correct
146            // the first time. The loadscript needs to be generated along with
147            // prerequisite cache/index files and their directories.
148            let (_, cache_file_already_exists) = generate_loadscript_text(&LoadscriptArgs {
149                temp: temporary_directory,
150                source: input.as_path(),
151                chunk_method,
152                scene_detection_downscale_height,
153                scene_detection_pixel_format,
154                scene_detection_scaler: scene_detection_scaler.unwrap_or_default(),
155                is_proxy,
156                cache_mode,
157            })?;
158            if !cache_file_already_exists {
159                // Getting the clip info will cause VapourSynth to generate the
160                // cache file which may take a long time.
161                info!("Generating VapourSynth cache file");
162            }
163
164            create_vs_file(&LoadscriptArgs {
165                temp: temporary_directory,
166                source: input.as_path(),
167                chunk_method,
168                scene_detection_downscale_height,
169                scene_detection_pixel_format,
170                scene_detection_scaler: scene_detection_scaler.unwrap_or_default(),
171                is_proxy,
172                cache_mode,
173            })?;
174
175            input.clip_info()?;
176        }
177
178        Ok(input)
179    }
180
181    /// Returns a reference to the inner path, panicking if the input is not an
182    /// `Input::Video`.
183    #[inline]
184    pub fn as_video_path(&self) -> &Path {
185        match &self {
186            Input::Video {
187                path, ..
188            } => path.as_ref(),
189            Input::VapourSynth {
190                ..
191            } => {
192                panic!("called `Input::as_video_path()` on an `Input::VapourSynth` variant")
193            },
194        }
195    }
196
197    /// Returns a reference to the inner path, panicking if the input is not an
198    /// `Input::VapourSynth`.
199    #[inline]
200    pub fn as_vapoursynth_path(&self) -> &Path {
201        match &self {
202            Input::VapourSynth {
203                path, ..
204            } => path.as_ref(),
205            Input::Video {
206                ..
207            } => {
208                panic!("called `Input::as_vapoursynth_path()` on an `Input::Video` variant")
209            },
210        }
211    }
212
213    /// Returns a reference to the inner path regardless of whether `self` is
214    /// `Video` or `VapourSynth`.
215    ///
216    /// The caller must ensure that the input type is being properly handled.
217    /// This method should not be used unless the code is TRULY agnostic of the
218    /// input type!
219    #[inline]
220    pub fn as_path(&self) -> &Path {
221        match &self {
222            Input::Video {
223                path, ..
224            }
225            | Input::VapourSynth {
226                path, ..
227            } => path.as_ref(),
228        }
229    }
230
231    /// Returns a VapourSynth script as a string. If `self` is `Video`, the
232    /// script will be generated for supported VapourSynth chunk methods.
233    #[inline]
234    pub fn as_script_text(
235        &self,
236        scene_detection_downscale_height: Option<usize>,
237        scene_detection_pixel_format: Option<FFPixelFormat>,
238        scene_detection_scaler: Option<&str>,
239    ) -> anyhow::Result<String> {
240        match &self {
241            Input::VapourSynth {
242                script_text, ..
243            } => Ok(script_text.clone()),
244            Input::Video {
245                path,
246                temp,
247                chunk_method,
248                is_proxy,
249                cache_mode,
250            } => match chunk_method {
251                ChunkMethod::LSMASH
252                | ChunkMethod::FFMS2
253                | ChunkMethod::DGDECNV
254                | ChunkMethod::BESTSOURCE => {
255                    let (script_text, _) = generate_loadscript_text(&LoadscriptArgs {
256                        temp,
257                        source: path,
258                        chunk_method: *chunk_method,
259                        scene_detection_downscale_height,
260                        scene_detection_pixel_format,
261                        scene_detection_scaler: scene_detection_scaler.unwrap_or_default(),
262                        is_proxy: *is_proxy,
263                        cache_mode: *cache_mode,
264                    })?;
265                    Ok(script_text)
266                },
267                _ => Err(anyhow::anyhow!(
268                    "Cannot generate VapourSynth script text with chunk method {chunk_method:?}"
269                )),
270            },
271        }
272    }
273
274    /// Returns a path to the VapourSynth script, panicking if the input is not
275    /// an `Input::VapourSynth` or `Input::Video` with a valid chunk method.
276    #[inline]
277    pub fn as_script_path(&self) -> PathBuf {
278        match &self {
279            Input::VapourSynth {
280                path, ..
281            } => path.clone(),
282            Input::Video {
283                temp, ..
284            } if self.is_vapoursynth_script() => {
285                let temp: &Path = temp.as_ref();
286                temp.join("split").join(if self.is_proxy() {
287                    "loadscript_proxy.vpy"
288                } else {
289                    "loadscript.vpy"
290                })
291            },
292            Input::Video {
293                ..
294            } => panic!("called `Input::as_script_path()` on an `Input::Video` variant"),
295        }
296    }
297
298    #[inline]
299    pub const fn is_video(&self) -> bool {
300        matches!(&self, Input::Video { .. })
301    }
302
303    #[inline]
304    pub const fn is_vapoursynth(&self) -> bool {
305        matches!(&self, Input::VapourSynth { .. })
306    }
307
308    #[inline]
309    pub const fn is_proxy(&self) -> bool {
310        match &self {
311            Input::Video {
312                is_proxy, ..
313            }
314            | Input::VapourSynth {
315                is_proxy, ..
316            } => *is_proxy,
317        }
318    }
319
320    #[inline]
321    pub fn is_vapoursynth_script(&self) -> bool {
322        match &self {
323            Input::VapourSynth {
324                ..
325            } => true,
326            Input::Video {
327                chunk_method, ..
328            } => matches!(
329                chunk_method,
330                ChunkMethod::LSMASH
331                    | ChunkMethod::FFMS2
332                    | ChunkMethod::DGDECNV
333                    | ChunkMethod::BESTSOURCE
334            ),
335        }
336    }
337
338    #[inline]
339    pub fn clip_info(&self) -> anyhow::Result<ClipInfo> {
340        const FAIL_MSG: &str = "Failed to get number of frames for input video";
341
342        let mut cache = CLIP_INFO_CACHE.lock().expect("mutex should acquire lock");
343        let key = CacheKey {
344            input:    self.clone(),
345            is_proxy: self.is_proxy(),
346        };
347        let cached = cache.get(&key);
348        if let Some(cached) = cached {
349            return Ok(*cached);
350        }
351
352        let info = match &self {
353            Input::Video {
354                path, ..
355            } if !&self.is_vapoursynth_script() => {
356                ffmpeg::get_clip_info(path.as_path()).context(FAIL_MSG)?
357            },
358            path => {
359                vapoursynth::get_clip_info(path, &self.as_vspipe_args_map()?).context(FAIL_MSG)?
360            },
361        };
362        cache.insert(key, info);
363        Ok(info)
364    }
365
366    /// Calculates tiles from resolution
367    /// Don't convert tiles to encoder specific representation
368    /// Default video without tiling is 1,1
369    /// Return number of horizontal and vertical tiles
370    #[inline]
371    pub fn calculate_tiles(&self) -> (u32, u32) {
372        match self.clip_info().map(|info| info.resolution) {
373            Ok((h, v)) => {
374                // tile range 0-1440 pixels
375                let horizontal = max((h - 1) / 720, 1);
376                let vertical = max((v - 1) / 720, 1);
377
378                (horizontal, vertical)
379            },
380            _ => (1, 1),
381        }
382    }
383
384    /// Returns the vector of arguments passed to the vspipe python environment
385    /// If the input is not a vapoursynth script, the vector will be empty.
386    #[inline]
387    pub fn as_vspipe_args_vec(&self) -> anyhow::Result<Vec<String>> {
388        match self {
389            Input::VapourSynth {
390                vspipe_args, ..
391            } => Ok(vspipe_args.to_owned()),
392            Input::Video {
393                ..
394            } => Ok(vec![]),
395        }
396    }
397
398    /// Creates and returns an OwnedMap of the arguments passed to the vspipe
399    /// python environment If the input is not a vapoursynth script, the map
400    /// will be empty.
401    #[inline]
402    pub fn as_vspipe_args_map(&self) -> anyhow::Result<OwnedMap<'static>> {
403        let mut args_map = OwnedMap::new(
404            API::get().ok_or_else(|| anyhow::anyhow!("failed to access Vapoursynth API"))?,
405        );
406
407        for arg in self.as_vspipe_args_vec()? {
408            let split: Vec<&str> = arg.split_terminator('=').collect();
409            if args_map.set_data(split[0], split[1].as_bytes()).is_err() {
410                bail!("Failed to split vspipe arguments");
411            };
412        }
413
414        Ok(args_map)
415    }
416
417    #[inline]
418    pub fn as_vspipe_args_hashmap(&self) -> anyhow::Result<HashMap<String, String>> {
419        let mut args_map = HashMap::new();
420        for arg in self.as_vspipe_args_vec()? {
421            let split: Vec<&str> = arg.split_terminator('=').collect();
422            args_map.insert(split[0].to_string(), split[1].to_string());
423        }
424        Ok(args_map)
425    }
426}
427
428#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
429struct DoneChunk {
430    frames:     usize,
431    size_bytes: u64,
432}
433
434/// Concurrent data structure for keeping track of the finished chunks in an
435/// encode
436#[derive(Debug, Deserialize, Serialize)]
437struct DoneJson {
438    frames:     AtomicUsize,
439    done:       DashMap<String, DoneChunk>,
440    audio_done: AtomicBool,
441}
442
443static DONE_JSON: OnceCell<DoneJson> = OnceCell::new();
444
445// once_cell::sync::Lazy cannot be used here due to Lazy<T> not implementing
446// Serialize or Deserialize, we need to get a reference directly to the global
447// data
448fn get_done() -> &'static DoneJson {
449    DONE_JSON.get().expect("DONE_JSON should be initialized")
450}
451
452fn init_done(done: DoneJson) -> &'static DoneJson {
453    DONE_JSON.get_or_init(|| done)
454}
455
/// Returns the index of the first entry in `params` for which `is_match`
/// returns `true`, or `None` if no entry matches.
///
/// # Panics
///
/// Panics if `params` is empty.
#[inline]
pub fn list_index(params: &[impl AsRef<str>], is_match: fn(&str) -> bool) -> Option<usize> {
    assert!(!params.is_empty(), "received empty list of parameters");

    params.iter().position(|param| is_match(param.as_ref()))
}
465
466#[derive(Serialize, Deserialize, Debug, EnumString, IntoStaticStr, Display, Clone)]
467pub enum SplitMethod {
468    #[strum(serialize = "av-scenechange")]
469    AvScenechange,
470    #[strum(serialize = "none")]
471    None,
472}
473
474#[derive(Serialize, Deserialize, Debug, Clone, Copy, EnumString, IntoStaticStr, Display)]
475pub enum ScenecutMethod {
476    #[strum(serialize = "fast")]
477    Fast,
478    #[strum(serialize = "standard")]
479    Standard,
480}
481
482#[derive(
483    PartialEq,
484    Eq,
485    Copy,
486    Clone,
487    Serialize,
488    Deserialize,
489    Debug,
490    EnumString,
491    IntoStaticStr,
492    Display,
493    Hash,
494)]
495pub enum ChunkMethod {
496    #[strum(serialize = "select")]
497    Select,
498    #[strum(serialize = "hybrid")]
499    Hybrid,
500    #[strum(serialize = "segment")]
501    Segment,
502    #[strum(serialize = "ffms2")]
503    FFMS2,
504    #[strum(serialize = "lsmash")]
505    LSMASH,
506    #[strum(serialize = "dgdecnv")]
507    DGDECNV,
508    #[strum(serialize = "bestsource")]
509    BESTSOURCE,
510}
511
512#[derive(
513    PartialEq, Eq, Copy, Clone, Serialize, Deserialize, Debug, Display, EnumString, IntoStaticStr,
514)]
515pub enum ChunkOrdering {
516    #[strum(serialize = "long-to-short")]
517    LongestFirst,
518    #[strum(serialize = "short-to-long")]
519    ShortestFirst,
520    #[strum(serialize = "sequential")]
521    Sequential,
522    #[strum(serialize = "random")]
523    Random,
524}
525
526#[derive(
527    PartialEq,
528    Eq,
529    Copy,
530    Clone,
531    Serialize,
532    Deserialize,
533    Debug,
534    Display,
535    EnumString,
536    IntoStaticStr,
537    Hash,
538)]
539pub enum VmafFeature {
540    #[strum(serialize = "default")]
541    Default,
542    #[strum(serialize = "weighted")]
543    Weighted,
544    #[strum(serialize = "neg")]
545    Neg,
546    #[strum(serialize = "motionless")]
547    Motionless,
548    #[strum(serialize = "uhd")]
549    Uhd,
550}
551
552#[derive(
553    PartialEq, Eq, Copy, Clone, Serialize, Deserialize, Debug, Display, EnumString, IntoStaticStr,
554)]
555pub enum TargetMetric {
556    #[strum(serialize = "vmaf")]
557    VMAF,
558    #[strum(serialize = "ssimulacra2")]
559    SSIMULACRA2,
560    #[strum(serialize = "butteraugli-inf")]
561    ButteraugliINF,
562    #[strum(serialize = "butteraugli-3")]
563    Butteraugli3,
564    #[strum(serialize = "xpsnr")]
565    XPSNR,
566    #[strum(serialize = "xpsnr-weighted")]
567    XPSNRWeighted,
568}
569
570/// Determine the optimal number of workers for an encoder
571#[inline]
572pub fn determine_workers(args: &EncodeArgs) -> anyhow::Result<u64> {
573    let res = args.input.clip_info()?.resolution;
574    let tiles = args.tiles;
575    let megapixels = (res.0 * res.1) as f64 / 1e6;
576    // encoder memory and chunk_method memory usage scales with resolution
577    // (megapixels), approximately linearly. Expressed as GB/Megapixel
578    let cm_ram = match args.chunk_method {
579        ChunkMethod::FFMS2 | ChunkMethod::LSMASH | ChunkMethod::BESTSOURCE => 0.3,
580        ChunkMethod::DGDECNV => 0.3,
581        ChunkMethod::Hybrid | ChunkMethod::Select | ChunkMethod::Segment => 0.1,
582    };
583    let enc_ram = match args.encoder {
584        Encoder::aom => 0.4,
585        Encoder::rav1e => 0.7,
586        Encoder::svt_av1 => 1.2,
587        Encoder::vpx => 0.3,
588        Encoder::x264 => 0.7,
589        Encoder::x265 => 0.6,
590    };
591    // This is a rough estimate of how many cpu cores will be fully loaded by an
592    // encoder worker. With rav1e, CPU usage scales with tiles, but not 1:1.
593    // Other encoders don't seem to significantly scale CPU usage with tiles.
594    // CPU threads/worker here is relative to default threading parameters, e.g. aom
595    // will use 1 thread/worker if --threads=1 is set.
596    let cpu_threads = match args.encoder {
597        Encoder::aom => 4,
598        Encoder::rav1e => ((tiles.0 * tiles.1) as f32 * 0.7).ceil() as u64,
599        Encoder::svt_av1 => 6,
600        Encoder::vpx => 3,
601        Encoder::x264 | Encoder::x265 => 8,
602    };
603    // memory usage scales with pixel format, expressed as a multiplier of memory
604    // usage. Roughly the same behavior was observed accross all encoders.
605    let pix_mult = match args.output_pix_format.format {
606        FFPixelFormat::YUV444P | FFPixelFormat::YUV444P10LE | FFPixelFormat::YUV444P12LE => 1.5,
607        FFPixelFormat::YUV422P | FFPixelFormat::YUV422P10LE | FFPixelFormat::YUV422P12LE => 1.25,
608        _ => 1.0,
609    };
610
611    let mut system = sysinfo::System::new();
612    system.refresh_memory();
613    let cpu = available_parallelism()
614        .expect("Unrecoverable: Failed to get thread count")
615        .get() as u64;
616    // sysinfo returns Bytes, convert to GB
617    // use total instead of available, because av1an does not resize worker pool
618    let ram_gb = system.total_memory() as f64 / 1e9;
619
620    Ok(std::cmp::max(
621        std::cmp::min(
622            cpu / cpu_threads,
623            (ram_gb / (megapixels * (enc_ram + cm_ram) * pix_mult)).round() as u64,
624        ),
625        1,
626    ))
627}
628
/// Returns a short hexadecimal digest of `path`: the first seven characters
/// of the path's `DefaultHasher` value formatted as lowercase hex.
///
/// The hex string is not zero-padded, so for very small hash values the
/// result can be shorter than seven characters.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s
/// algorithm across Rust releases, so digests may differ between builds.
#[inline]
pub fn hash_path(path: &Path) -> String {
    let mut hasher = DefaultHasher::new();
    path.hash(&mut hasher);

    let mut digest = format!("{:x}", hasher.finish());
    // `truncate` is a no-op when the string is already short enough, unlike
    // slicing with `[..7]`, which panics on a digest of fewer than 7 chars.
    digest.truncate(7);
    digest
}
636
637fn save_chunk_queue(temp: &str, chunk_queue: &[Chunk]) -> anyhow::Result<()> {
638    let mut file = File::create(Path::new(temp).join("chunks.json"))
639        .with_context(|| "Failed to create chunks.json file")?;
640
641    file
642    // serializing chunk_queue as json should never fail, so unwrap is OK here
643    .write_all(serde_json::to_string(&chunk_queue)?.as_bytes())
644    .with_context(|| format!("Failed to write serialized chunk_queue data to {:?}", &file))?;
645
646    Ok(())
647}
648
/// Output verbosity level.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Verbosity {
    /// Verbose output.
    Verbose,
    /// Normal output.
    Normal,
    /// Quiet output.
    Quiet,
}
655
656fn read_chunk_queue(temp: &Path) -> anyhow::Result<Vec<Chunk>> {
657    let file = Path::new(temp).join("chunks.json");
658
659    let contents = fs::read_to_string(&file)
660        .with_context(|| format!("Failed to read chunk queue file {}", file.display()))?;
661
662    Ok(serde_json::from_str(&contents)?)
663}
664
665#[derive(Serialize, Deserialize, Debug, EnumString, IntoStaticStr, Display, Clone)]
666pub enum ProbingStatisticName {
667    #[strum(serialize = "mean")]
668    Mean = 0,
669    #[strum(serialize = "median")]
670    Median = 1,
671    #[strum(serialize = "harmonic")]
672    Harmonic = 2,
673    #[strum(serialize = "percentile")]
674    Percentile = 3,
675    #[strum(serialize = "standard-deviation")]
676    StandardDeviation = 4,
677    #[strum(serialize = "mode")]
678    Mode = 5,
679    #[strum(serialize = "minimum")]
680    Minimum = 6,
681    #[strum(serialize = "maximum")]
682    Maximum = 7,
683    #[strum(serialize = "root-mean-square")]
684    RootMeanSquare = 8,
685    #[strum(serialize = "auto")]
686    Automatic = 9,
687}
688
689#[derive(Serialize, Deserialize, Debug, Clone)]
690pub struct ProbingStatistic {
691    pub name:  ProbingStatisticName,
692    pub value: Option<f64>,
693}
694
695#[derive(Debug, Clone, Copy)]
696pub struct ClipInfo {
697    pub num_frames:               usize,
698    pub format_info:              InputPixelFormat,
699    pub frame_rate:               Rational64,
700    pub resolution:               (u32, u32), // (width, height), consider using type aliases
701    /// This is overly simplified because we currently only use it for photon
702    /// noise gen, which only supports two transfer functions
703    pub transfer_characteristics: TransferFunction,
704}
705
706impl ClipInfo {
707    #[inline]
708    pub fn transfer_function_params_adjusted(&self, enc_params: &[String]) -> TransferFunction {
709        if enc_params.iter().any(|p| {
710            let p = p.to_ascii_lowercase();
711            p == "pq" || p.ends_with("=pq") || p.ends_with("smpte2084")
712        }) {
713            return TransferFunction::SMPTE2084;
714        }
715        if enc_params.iter().any(|p| {
716            let p = p.to_ascii_lowercase();
717            // If the user specified an SDR transfer characteristic, assume they want to
718            // encode to SDR.
719            p.ends_with("bt709")
720                || p.ends_with("bt.709")
721                || p.ends_with("bt601")
722                || p.ends_with("bt.601")
723                || p.contains("smpte240")
724                || p.contains("smpte170")
725        }) {
726            return TransferFunction::BT1886;
727        }
728        self.transfer_characteristics
729    }
730}