1#[cfg(test)]
2mod tests;
3
4use std::{
5 fmt::{Display, Write as FmtWrite},
6 fs::{self, DirEntry, File},
7 io::Write,
8 path::{Path, PathBuf},
9 process::{Command, Stdio},
10 sync::Arc,
11};
12
13use anyhow::{anyhow, Context};
14use av_format::{
15 buffer::AccReader,
16 demuxer::{Context as DemuxerContext, Event},
17 muxer::{Context as MuxerContext, Writer},
18 rational::Rational64,
19};
20use av_ivf::{demuxer::IvfDemuxer, muxer::IvfMuxer};
21use path_abs::{PathAbs, PathInfo};
22use serde::{Deserialize, Serialize};
23use tracing::{debug, error, trace, warn};
24
25use crate::{encoder::Encoder, util::read_in_dir};
26
27#[derive(
28 PartialEq,
29 Eq,
30 Copy,
31 Clone,
32 Serialize,
33 Deserialize,
34 Debug,
35 strum::EnumString,
36 strum::IntoStaticStr,
37)]
38pub enum ConcatMethod {
39 #[strum(serialize = "mkvmerge")]
40 MKVMerge,
41 #[strum(serialize = "ffmpeg")]
42 FFmpeg,
43 #[strum(serialize = "ivf")]
44 Ivf,
45}
46
47impl Display for ConcatMethod {
48 #[inline]
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.write_str(<&'static str>::from(self))
51 }
52}
53
54#[tracing::instrument(level = "debug")]
55pub fn sort_files_by_filename(files: &mut [PathBuf]) {
56 files.sort_unstable_by_key(|x| {
57 x.file_stem()
60 .expect("should have file stem")
61 .to_string_lossy()
62 .parse::<u32>()
63 .expect("files should follow numeric pattern")
64 });
65}
66
67#[tracing::instrument(level = "debug")]
68pub fn ivf(input: &Path, out: &Path) -> anyhow::Result<()> {
69 let mut files: Vec<PathBuf> = read_in_dir(input)?.collect();
70
71 sort_files_by_filename(&mut files);
72
73 assert!(!files.is_empty());
74
75 let output = File::create(out)?;
76
77 let mut muxer = MuxerContext::new(IvfMuxer::new(), Writer::new(output));
78
79 let global_info = {
80 let acc = AccReader::new(std::fs::File::open(&files[0])?);
81 let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
82
83 demuxer.read_headers()?;
84
85 let duration = demuxer.info.duration.unwrap_or(0)
87 + files.iter().skip(1).try_fold(0u64, |sum, file| -> anyhow::Result<_> {
88 let acc = AccReader::new(std::fs::File::open(file)?);
89 let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
90
91 demuxer.read_headers()?;
92 Ok(sum + demuxer.info.duration.unwrap_or(0))
93 })?;
94
95 let mut info = demuxer.info;
96 info.duration = Some(duration);
97 info
98 };
99
100 muxer.set_global_info(global_info)?;
101
102 muxer.configure()?;
103 muxer.write_header()?;
104
105 let mut pos_offset: usize = 0;
106 for file in &files {
107 let mut last_pos: usize = 0;
108 let input = std::fs::File::open(file)?;
109
110 let acc = AccReader::new(input);
111
112 let mut demuxer = DemuxerContext::new(IvfDemuxer::new(), acc);
113 demuxer.read_headers()?;
114
115 trace!("global info: {:#?}", demuxer.info);
116
117 loop {
118 match demuxer.read_event() {
119 Ok(event) => match event {
120 Event::MoreDataNeeded(sz) => panic!("needed more data: {sz} bytes"),
121 Event::NewStream(s) => panic!("new stream: {s:?}"),
122 Event::NewPacket(mut packet) => {
123 if let Some(p) = packet.pos.as_mut() {
124 last_pos = *p;
125 *p += pos_offset;
126 }
127
128 trace!("received packet with pos: {:?}", packet.pos);
129 muxer.write_packet(Arc::new(packet))?;
130 },
131 Event::Continue => {
132 },
134 Event::Eof => {
135 trace!("EOF received.");
136 break;
137 },
138 _ => unimplemented!(),
139 },
140 Err(e) => {
141 error!("{:?}", e);
142 break;
143 },
144 }
145 }
146 pos_offset += last_pos + 1;
147 }
148
149 muxer.write_trailer()?;
150
151 Ok(())
152}
153
154#[tracing::instrument(level = "debug")]
155fn read_encoded_chunks(encode_dir: &Path) -> anyhow::Result<Vec<DirEntry>> {
156 Ok(fs::read_dir(encode_dir)
157 .with_context(|| {
158 format!(
159 "Failed to read encoded chunks from {}",
160 encode_dir.display()
161 )
162 })?
163 .collect::<Result<Vec<_>, _>>()?)
164}
165
166#[tracing::instrument(level = "debug")]
167pub fn mkvmerge(
168 temp_dir: &Path,
169 output: &Path,
170 encoder: Encoder,
171 num_chunks: usize,
172 output_fps: Option<Rational64>,
173) -> anyhow::Result<()> {
174 const MAXIMUM_CHUNKS_PER_MERGE: usize = 100;
175 #[cfg(windows)]
177 fn fix_path<P: AsRef<Path>>(p: P) -> String {
178 const UNC_PREFIX: &str = r#"\\?\"#;
179
180 let p = p.as_ref().display().to_string();
181 p.strip_prefix(UNC_PREFIX).map_or_else(
182 || p.clone(),
183 |path| {
184 path.strip_prefix("UNC")
185 .map_or_else(|| path.to_string(), |p2| format!("\\{p2}"))
186 },
187 )
188 }
189
190 #[cfg(not(windows))]
191 fn fix_path<P: AsRef<Path>>(p: P) -> String {
192 p.as_ref().display().to_string()
193 }
194
195 let audio_file = PathBuf::from(&temp_dir).join("audio.mkv");
196 let audio_file = PathAbs::new(&audio_file)?;
197 let audio_file = audio_file.as_path().exists().then(|| fix_path(audio_file));
198
199 let encode_dir = PathBuf::from(temp_dir).join("encode");
200
201 let output = PathAbs::new(output)?;
202
203 assert!(num_chunks != 0);
204
205 let num_chunk_groups = (num_chunks as f64 / MAXIMUM_CHUNKS_PER_MERGE as f64).ceil() as usize;
206 let chunk_groups: Vec<Vec<String>> = (0..num_chunk_groups)
207 .map(|group_index| {
208 let start = group_index * MAXIMUM_CHUNKS_PER_MERGE;
209 let end = (start + MAXIMUM_CHUNKS_PER_MERGE).min(num_chunks);
210 (start..end)
211 .map(|i| {
212 format!(
213 "{i:05}.{ext}",
214 ext = match encoder {
215 Encoder::x264 => "264",
216 Encoder::x265 => "hevc",
217 _ => "ivf",
218 }
219 )
220 })
221 .collect()
222 })
223 .collect();
224
225 chunk_groups.iter().enumerate().try_for_each(|(group_index, chunk_group)| {
226 let group_options_path =
227 PathBuf::from(&temp_dir).join(format!("group_options_{group_index:05}.json"));
228 let group_options_output_path = PathAbs::new(
229 PathBuf::from(&temp_dir).join(format!("group_output_{group_index:05}.mkv")),
230 )?;
231
232 let group_options_json_contents = mkvmerge_options_json(
233 chunk_group,
234 &fix_path(group_options_output_path.to_string_lossy().as_ref()),
235 None,
236 output_fps,
237 );
238
239 let mut group_options_json = File::create(group_options_path)?;
240 group_options_json.write_all(group_options_json_contents?.as_bytes())?;
241
242 let mut group_cmd = Command::new("mkvmerge");
243 group_cmd.current_dir(&encode_dir);
244 group_cmd.arg(format!("@../group_options_{group_index:05}.json"));
245
246 let group_out = group_cmd
247 .output()
248 .with_context(|| "Failed to execute mkvmerge command for concatenation")?;
249
250 if !group_out.status.success() {
251 return Err(anyhow::Error::msg(format!(
252 "Failed to execute mkvmerge command for concatenation: {}",
253 String::from_utf8_lossy(&group_out.stderr)
254 )));
255 }
256
257 Ok(())
258 })?;
259
260 let chunk_group_options_names: Vec<String> = (0..num_chunk_groups)
261 .map(|group_index| format!("group_output_{group_index:05}.mkv"))
262 .collect();
263
264 let options_path = PathBuf::from(&temp_dir).join("options.json");
265 let options_json_contents = mkvmerge_options_json(
266 &chunk_group_options_names,
267 &fix_path(output.to_string_lossy().as_ref()),
268 audio_file.as_deref(),
269 output_fps,
270 );
271
272 let mut options_json = File::create(options_path)?;
273 options_json.write_all(options_json_contents?.as_bytes())?;
274
275 let mut cmd = Command::new("mkvmerge");
276 cmd.current_dir(temp_dir);
277 cmd.arg("@./options.json");
278
279 let out = cmd
280 .output()
281 .with_context(|| "Failed to execute mkvmerge command for concatenation")?;
282
283 if !out.status.success() {
284 error!(
287 "mkvmerge concatenation failed with output: {:#?}\ncommand: {:?}",
288 out, cmd
289 );
290 return Err(anyhow!("mkvmerge concatenation failed"));
291 }
292
293 Ok(())
294}
295
296#[tracing::instrument(level = "debug")]
298pub fn mkvmerge_options_json(
299 chunks: &[String],
300 output: &str,
301 audio: Option<&str>,
302 output_fps: Option<Rational64>,
303) -> anyhow::Result<String> {
304 let mut file_string = String::with_capacity(
305 64 + output.len()
306 + audio.map_or(0, |a| a.len() + 2)
307 + chunks.iter().map(|s| s.len() + 4).sum::<usize>(),
308 );
309 write!(file_string, "[\"-o\", {output:?}")?;
310 if let Some(audio) = audio {
311 write!(file_string, ", {audio:?}")?;
312 }
313 if let Some(output_fps) = output_fps {
314 write!(
315 file_string,
316 ", \"--default-duration\", \"0:{}/{}fps\", \"[\"",
317 output_fps.numer(),
318 output_fps.denom()
319 )?;
320 } else {
321 file_string.push_str(", \"[\"");
322 }
323 for chunk in chunks {
324 write!(file_string, ", \"{chunk}\"")?;
325 }
326 file_string.push_str(",\"]\"]");
327
328 Ok(file_string)
329}
330
331#[tracing::instrument(level = "debug")]
334pub fn ffmpeg(temp: &Path, output: &Path) -> anyhow::Result<()> {
335 fn write_concat_file(temp_folder: &Path) -> anyhow::Result<()> {
336 let concat_file = temp_folder.join("concat");
337 let encode_folder = temp_folder.join("encode");
338
339 let mut files = read_encoded_chunks(&encode_folder)?;
340
341 files.sort_by_key(DirEntry::path);
342
343 let mut contents = String::with_capacity(24 * files.len());
344
345 for i in files {
346 writeln!(
347 contents,
348 "file {}",
349 format!("{path}", path = i.path().display())
350 .replace('\\', r"\\")
351 .replace(' ', r"\ ")
352 .replace('\'', r"\'")
353 )?;
354 }
355
356 let mut file = File::create(concat_file)?;
357 file.write_all(contents.as_bytes())?;
358
359 Ok(())
360 }
361
362 let temp = PathAbs::new(temp)?;
363 let temp = temp.as_path();
364
365 let concat = temp.join("concat");
366 let concat_file = concat.to_string_lossy();
367
368 write_concat_file(temp)?;
369
370 let audio_file = {
371 let file = temp.join("audio.mkv");
372 (file.exists() && file.metadata().expect("file should have metadata").len() > 1000)
373 .then_some(file)
374 };
375
376 let mut cmd = Command::new("ffmpeg");
377
378 cmd.stdout(Stdio::piped());
379 cmd.stderr(Stdio::piped());
380
381 if let Some(file) = audio_file {
382 cmd.args([
383 "-y",
384 "-hide_banner",
385 "-loglevel",
386 "error",
387 "-f",
388 "concat",
389 "-safe",
390 "0",
391 "-i",
392 &concat_file,
393 "-i",
394 ])
395 .arg(file)
396 .args(["-map", "0", "-map", "1", "-c", "copy"])
397 .arg(output);
398 } else {
399 cmd.args([
400 "-y",
401 "-hide_banner",
402 "-loglevel",
403 "error",
404 "-f",
405 "concat",
406 "-safe",
407 "0",
408 "-i",
409 &concat_file,
410 ])
411 .args(["-map", "0", "-c", "copy"])
412 .arg(output);
413 }
414
415 debug!("FFmpeg concat command: {:?}", cmd);
416
417 let out = cmd
418 .output()
419 .with_context(|| "Failed to execute FFmpeg command for concatenation")?;
420
421 if !out.status.success() {
422 error!(
423 "FFmpeg concatenation failed with output: {:#?}\ncommand: {:?}",
424 out, cmd
425 );
426 return Err(anyhow!("FFmpeg concatenation failed"));
427 }
428
429 Ok(())
430}