av1an_core/concat/
mod.rs

1#[cfg(test)]
2mod tests;
3
4use std::{
5    fmt::{Display, Write as FmtWrite},
6    fs::{self, DirEntry, File},
7    io::Write,
8    path::{Path, PathBuf},
9    process::{Command, Stdio},
10    sync::Arc,
11};
12
13use anyhow::{anyhow, Context};
14use av_format::{
15    buffer::AccReader,
16    demuxer::{Context as DemuxerContext, Event},
17    muxer::{Context as MuxerContext, Writer},
18    rational::Rational64,
19};
20use av_ivf::{demuxer::IvfDemuxer, muxer::IvfMuxer};
21use path_abs::{PathAbs, PathInfo};
22use serde::{Deserialize, Serialize};
23use tracing::{debug, error, trace, warn};
24
25use crate::{encoder::Encoder, util::read_in_dir};
26
27#[derive(
28    PartialEq,
29    Eq,
30    Copy,
31    Clone,
32    Serialize,
33    Deserialize,
34    Debug,
35    strum::EnumString,
36    strum::IntoStaticStr,
37)]
38pub enum ConcatMethod {
39    #[strum(serialize = "mkvmerge")]
40    MKVMerge,
41    #[strum(serialize = "ffmpeg")]
42    FFmpeg,
43    #[strum(serialize = "ivf")]
44    Ivf,
45}
46
47impl Display for ConcatMethod {
48    #[inline]
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.write_str(<&'static str>::from(self))
51    }
52}
53
54#[tracing::instrument(level = "debug")]
55pub fn sort_files_by_filename(files: &mut [PathBuf]) {
56    files.sort_unstable_by_key(|x| {
57        // If the temp directory follows the expected format of 00000.ivf, 00001.ivf,
58        // etc., then these unwraps will not fail
59        x.file_stem()
60            .expect("should have file stem")
61            .to_string_lossy()
62            .parse::<u32>()
63            .expect("files should follow numeric pattern")
64    });
65}
66
67#[tracing::instrument(level = "debug")]
68pub fn ivf(input: &Path, out: &Path) -> anyhow::Result<()> {
69    let mut files: Vec<PathBuf> = read_in_dir(input)?.collect();
70
71    sort_files_by_filename(&mut files);
72
73    assert!(!files.is_empty());
74
75    let output = File::create(out)?;
76
77    let mut muxer = MuxerContext::new(IvfMuxer::new(), Writer::new(output));
78
79    let global_info = {
80        let acc = AccReader::new(std::fs::File::open(&files[0])?);
81        let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
82
83        demuxer.read_headers()?;
84
85        // attempt to set the duration correctly
86        let duration = demuxer.info.duration.unwrap_or(0)
87            + files.iter().skip(1).try_fold(0u64, |sum, file| -> anyhow::Result<_> {
88                let acc = AccReader::new(std::fs::File::open(file)?);
89                let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
90
91                demuxer.read_headers()?;
92                Ok(sum + demuxer.info.duration.unwrap_or(0))
93            })?;
94
95        let mut info = demuxer.info;
96        info.duration = Some(duration);
97        info
98    };
99
100    muxer.set_global_info(global_info)?;
101
102    muxer.configure()?;
103    muxer.write_header()?;
104
105    let mut pos_offset: usize = 0;
106    for file in &files {
107        let mut last_pos: usize = 0;
108        let input = std::fs::File::open(file)?;
109
110        let acc = AccReader::new(input);
111
112        let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
113        demuxer.read_headers()?;
114
115        trace!("global info: {:#?}", demuxer.info);
116
117        loop {
118            match demuxer.read_event() {
119                Ok(event) => match event {
120                    Event::MoreDataNeeded(sz) => panic!("needed more data: {sz} bytes"),
121                    Event::NewStream(s) => panic!("new stream: {s:?}"),
122                    Event::NewPacket(mut packet) => {
123                        if let Some(p) = packet.pos.as_mut() {
124                            last_pos = *p;
125                            *p += pos_offset;
126                        }
127
128                        trace!("received packet with pos: {:?}", packet.pos);
129                        muxer.write_packet(Arc::new(packet))?;
130                    },
131                    Event::Continue => {
132                        // do nothing
133                    },
134                    Event::Eof => {
135                        trace!("EOF received.");
136                        break;
137                    },
138                    _ => unimplemented!(),
139                },
140                Err(e) => {
141                    error!("{:?}", e);
142                    break;
143                },
144            }
145        }
146        pos_offset += last_pos + 1;
147    }
148
149    muxer.write_trailer()?;
150
151    Ok(())
152}
153
154#[tracing::instrument(level = "debug")]
155fn read_encoded_chunks(encode_dir: &Path) -> anyhow::Result<Vec<DirEntry>> {
156    Ok(fs::read_dir(encode_dir)
157        .with_context(|| {
158            format!(
159                "Failed to read encoded chunks from {}",
160                encode_dir.display()
161            )
162        })?
163        .collect::<Result<Vec<_>, _>>()?)
164}
165
166#[tracing::instrument(level = "debug")]
167pub fn mkvmerge(
168    temp_dir: &Path,
169    output: &Path,
170    encoder: Encoder,
171    num_chunks: usize,
172    output_fps: Option<Rational64>,
173) -> anyhow::Result<()> {
174    const MAXIMUM_CHUNKS_PER_MERGE: usize = 100;
175    // mkvmerge does not accept UNC paths on Windows
176    #[cfg(windows)]
177    fn fix_path<P: AsRef<Path>>(p: P) -> String {
178        const UNC_PREFIX: &str = r#"\\?\"#;
179
180        let p = p.as_ref().display().to_string();
181        p.strip_prefix(UNC_PREFIX).map_or_else(
182            || p.clone(),
183            |path| {
184                path.strip_prefix("UNC")
185                    .map_or_else(|| path.to_string(), |p2| format!("\\{p2}"))
186            },
187        )
188    }
189
190    #[cfg(not(windows))]
191    fn fix_path<P: AsRef<Path>>(p: P) -> String {
192        p.as_ref().display().to_string()
193    }
194
195    let audio_file = PathBuf::from(&temp_dir).join("audio.mkv");
196    let audio_file = PathAbs::new(&audio_file)?;
197    let audio_file = audio_file.as_path().exists().then(|| fix_path(audio_file));
198
199    let encode_dir = PathBuf::from(temp_dir).join("encode");
200
201    let output = PathAbs::new(output)?;
202
203    assert!(num_chunks != 0);
204
205    let num_chunk_groups = (num_chunks as f64 / MAXIMUM_CHUNKS_PER_MERGE as f64).ceil() as usize;
206    let chunk_groups: Vec<Vec<String>> = (0..num_chunk_groups)
207        .map(|group_index| {
208            let start = group_index * MAXIMUM_CHUNKS_PER_MERGE;
209            let end = (start + MAXIMUM_CHUNKS_PER_MERGE).min(num_chunks);
210            (start..end)
211                .map(|i| {
212                    format!(
213                        "{i:05}.{ext}",
214                        ext = match encoder {
215                            Encoder::x264 => "264",
216                            Encoder::x265 => "hevc",
217                            _ => "ivf",
218                        }
219                    )
220                })
221                .collect()
222        })
223        .collect();
224
225    chunk_groups.iter().enumerate().try_for_each(|(group_index, chunk_group)| {
226        let group_options_path =
227            PathBuf::from(&temp_dir).join(format!("group_options_{group_index:05}.json"));
228        let group_options_output_path = PathAbs::new(
229            PathBuf::from(&temp_dir).join(format!("group_output_{group_index:05}.mkv")),
230        )?;
231
232        let group_options_json_contents = mkvmerge_options_json(
233            chunk_group,
234            &fix_path(group_options_output_path.to_string_lossy().as_ref()),
235            None,
236            output_fps,
237        );
238
239        let mut group_options_json = File::create(group_options_path)?;
240        group_options_json.write_all(group_options_json_contents?.as_bytes())?;
241
242        let mut group_cmd = Command::new("mkvmerge");
243        group_cmd.current_dir(&encode_dir);
244        group_cmd.arg(format!("@../group_options_{group_index:05}.json"));
245
246        let group_out = group_cmd
247            .output()
248            .with_context(|| "Failed to execute mkvmerge command for concatenation")?;
249
250        if !group_out.status.success() {
251            return Err(anyhow::Error::msg(format!(
252                "Failed to execute mkvmerge command for concatenation: {}",
253                String::from_utf8_lossy(&group_out.stderr)
254            )));
255        }
256
257        Ok(())
258    })?;
259
260    let chunk_group_options_names: Vec<String> = (0..num_chunk_groups)
261        .map(|group_index| format!("group_output_{group_index:05}.mkv"))
262        .collect();
263
264    let options_path = PathBuf::from(&temp_dir).join("options.json");
265    let options_json_contents = mkvmerge_options_json(
266        &chunk_group_options_names,
267        &fix_path(output.to_string_lossy().as_ref()),
268        audio_file.as_deref(),
269        output_fps,
270    );
271
272    let mut options_json = File::create(options_path)?;
273    options_json.write_all(options_json_contents?.as_bytes())?;
274
275    let mut cmd = Command::new("mkvmerge");
276    cmd.current_dir(temp_dir);
277    cmd.arg("@./options.json");
278
279    let out = cmd
280        .output()
281        .with_context(|| "Failed to execute mkvmerge command for concatenation")?;
282
283    if !out.status.success() {
284        // TODO: make an EncoderCrash-like struct, but without all the other fields so
285        // it can be used in a more broad scope than just for the pipe/encoder
286        error!(
287            "mkvmerge concatenation failed with output: {:#?}\ncommand: {:?}",
288            out, cmd
289        );
290        return Err(anyhow!("mkvmerge concatenation failed"));
291    }
292
293    Ok(())
294}
295
296/// Create mkvmerge options.json
297#[tracing::instrument(level = "debug")]
298pub fn mkvmerge_options_json(
299    chunks: &[String],
300    output: &str,
301    audio: Option<&str>,
302    output_fps: Option<Rational64>,
303) -> anyhow::Result<String> {
304    let mut file_string = String::with_capacity(
305        64 + output.len()
306            + audio.map_or(0, |a| a.len() + 2)
307            + chunks.iter().map(|s| s.len() + 4).sum::<usize>(),
308    );
309    write!(file_string, "[\"-o\", {output:?}")?;
310    if let Some(audio) = audio {
311        write!(file_string, ", {audio:?}")?;
312    }
313    if let Some(output_fps) = output_fps {
314        write!(
315            file_string,
316            ", \"--default-duration\", \"0:{}/{}fps\", \"[\"",
317            output_fps.numer(),
318            output_fps.denom()
319        )?;
320    } else {
321        file_string.push_str(", \"[\"");
322    }
323    for chunk in chunks {
324        write!(file_string, ", \"{chunk}\"")?;
325    }
326    file_string.push_str(",\"]\"]");
327
328    Ok(file_string)
329}
330
331/// Concatenates using ffmpeg (does not work with x265, and may have incorrect
332/// FPS with vpx)
333#[tracing::instrument(level = "debug")]
334pub fn ffmpeg(temp: &Path, output: &Path) -> anyhow::Result<()> {
335    fn write_concat_file(temp_folder: &Path) -> anyhow::Result<()> {
336        let concat_file = temp_folder.join("concat");
337        let encode_folder = temp_folder.join("encode");
338
339        let mut files = read_encoded_chunks(&encode_folder)?;
340
341        files.sort_by_key(DirEntry::path);
342
343        let mut contents = String::with_capacity(24 * files.len());
344
345        for i in files {
346            writeln!(
347                contents,
348                "file {}",
349                format!("{path}", path = i.path().display())
350                    .replace('\\', r"\\")
351                    .replace(' ', r"\ ")
352                    .replace('\'', r"\'")
353            )?;
354        }
355
356        let mut file = File::create(concat_file)?;
357        file.write_all(contents.as_bytes())?;
358
359        Ok(())
360    }
361
362    let temp = PathAbs::new(temp)?;
363    let temp = temp.as_path();
364
365    let concat = temp.join("concat");
366    let concat_file = concat.to_string_lossy();
367
368    write_concat_file(temp)?;
369
370    let audio_file = {
371        let file = temp.join("audio.mkv");
372        (file.exists() && file.metadata().expect("file should have metadata").len() > 1000)
373            .then_some(file)
374    };
375
376    let mut cmd = Command::new("ffmpeg");
377
378    cmd.stdout(Stdio::piped());
379    cmd.stderr(Stdio::piped());
380
381    if let Some(file) = audio_file {
382        cmd.args([
383            "-y",
384            "-hide_banner",
385            "-loglevel",
386            "error",
387            "-f",
388            "concat",
389            "-safe",
390            "0",
391            "-i",
392            &concat_file,
393            "-i",
394        ])
395        .arg(file)
396        .args(["-map", "0", "-map", "1", "-c", "copy"])
397        .arg(output);
398    } else {
399        cmd.args([
400            "-y",
401            "-hide_banner",
402            "-loglevel",
403            "error",
404            "-f",
405            "concat",
406            "-safe",
407            "0",
408            "-i",
409            &concat_file,
410        ])
411        .args(["-map", "0", "-c", "copy"])
412        .arg(output);
413    }
414
415    debug!("FFmpeg concat command: {:?}", cmd);
416
417    let out = cmd
418        .output()
419        .with_context(|| "Failed to execute FFmpeg command for concatenation")?;
420
421    if !out.status.success() {
422        error!(
423            "FFmpeg concatenation failed with output: {:#?}\ncommand: {:?}",
424            out, cmd
425        );
426        return Err(anyhow!("FFmpeg concatenation failed"));
427    }
428
429    Ok(())
430}