use std::borrow::Cow;
use std::collections::{BinaryHeap, HashMap};
use std::env;
use std::io::Write;
use std::path::{Path, PathBuf};
use anyhow::Result;
use tokio::io::AsyncWriteExt;
use tokio::{fs, process};
use tracing::{event, Level};
use crate::livestream::{MediaFormat, Segment, Stream};
pub async fn concat_streams<P: AsRef<Path>>(
downloaded_paths: &HashMap<Stream, BinaryHeap<(Segment, PathBuf)>>,
output_dir: P,
) -> Result<HashMap<u64, Vec<(&Stream, PathBuf)>>> {
let mut discons: HashMap<_, Vec<_>> = HashMap::new();
for (stream, segments) in downloaded_paths.iter() {
let mut segments_to_process: Vec<(&Segment, &PathBuf)> = Vec::new();
let mut cur_discon_seq = None;
let segments = segments.clone().into_sorted_vec();
for (segment, path) in segments.iter() {
if cur_discon_seq.is_none() {
cur_discon_seq = Some(segment.discon_seq);
}
if cur_discon_seq.map(|x| x == segment.discon_seq).unwrap() {
segments_to_process.push((segment, path));
} else {
if !segments_to_process.is_empty() {
let file_path = gen_concat_path(
stream,
segments_to_process[0].0,
&output_dir,
cur_discon_seq.unwrap(),
)?;
concat_segments(segments_to_process.as_slice(), &file_path).await?;
discons
.entry(cur_discon_seq.unwrap())
.or_default()
.push((stream, file_path));
}
segments_to_process.clear();
segments_to_process.push((segment, path));
cur_discon_seq = Some(segment.discon_seq);
}
}
if !segments_to_process.is_empty() {
let d = cur_discon_seq.unwrap();
let file_path = gen_concat_path(stream, segments_to_process[0].0, &output_dir, d)?;
concat_segments(segments_to_process.as_slice(), &file_path).await?;
discons.entry(d).or_default().push((stream, file_path));
}
}
Ok(discons)
}
fn gen_concat_path(
stream: &Stream,
segment: &Segment,
output_dir: impl AsRef<Path>,
d: u64,
) -> Result<PathBuf> {
let ext = segment.format.extension();
let file_name = format!("{}_{:010}.{}", stream, d, ext);
let file_path = output_dir.as_ref().join(file_name);
Ok(file_path)
}
async fn concat_segments<P: AsRef<Path>>(inputs: &[(&Segment, P)], output: P) -> Result<()> {
if should_use_ffmpeg_concat(inputs[0].0).await? {
ffmpeg_concat(inputs.iter().map(|(_, p)| p), &output).await
} else {
file_concat(inputs.iter().map(|(_, p)| p), &output).await
}
}
async fn file_concat<P: AsRef<Path>>(
input_paths: impl IntoIterator<Item = P>,
output: P,
) -> Result<()> {
event!(
Level::INFO,
"File concat to temporary file {:?}",
output.as_ref()
);
let mut file = fs::File::create(output.as_ref()).await?;
for path in input_paths {
file.write_all(&fs::read(path.as_ref()).await?).await?;
}
Ok(())
}
async fn ffmpeg_concat<P: AsRef<Path>>(
input_paths: impl IntoIterator<Item = P>,
output: P,
) -> Result<()> {
event!(
Level::INFO,
"ffmpeg concat demux to temporary file {:?}",
output.as_ref()
);
let file = tempfile::NamedTempFile::new()?;
let cwd = env::current_dir()?;
for path in input_paths {
let absolute_path = if path.as_ref().is_absolute() {
Cow::from(path.as_ref())
} else {
Cow::Owned(cwd.join(path))
};
writeln!(
file.as_file(),
"file '{}'",
absolute_path.as_ref().to_str().unwrap()
)?;
}
let mut cmd = process::Command::new("ffmpeg");
cmd.arg("-y")
.arg("-f")
.arg("concat")
.arg("-safe")
.arg("0")
.arg("-i")
.arg(file.path())
.arg("-c")
.arg("copy")
.arg("-fflags")
.arg("+genpts")
.arg(output.as_ref())
.kill_on_drop(true);
event!(Level::TRACE, "{:?}", cmd);
let output = cmd.output().await?;
event!(
Level::TRACE,
"ffmpeg stdout: {:#?}",
String::from_utf8_lossy(&output.stdout)
);
event!(
Level::TRACE,
"ffmpeg stderr: {:#?}",
String::from_utf8_lossy(&output.stderr)
);
if !output.status.success() {
return Err(anyhow::anyhow!("ffmpeg command failed"));
}
Ok(())
}
async fn should_use_ffmpeg_concat(segment: &Segment) -> Result<bool> {
#[allow(clippy::match_like_matches_macro)]
let use_ffmpeg = match segment.format {
MediaFormat::Mp3 => true,
_ => false,
};
Ok(use_ffmpeg)
}