use std::convert::Infallible;
use std::ffi::OsString;
use std::io::{self, Read, Write};
use std::mem;
use std::pin::Pin;
use std::process::{ChildStdout, Stdio};
use std::task::Poll;
use bytes::Bytes;
pub use command_builder::*;
pub use ffmpeg::*;
use futures_util::Stream;
use tempfile::NamedTempFile;
use tracing::{debug, error, warn};
pub use yt_dlp::*;
use crate::WrapIoResult;
use crate::async_read::AsyncReadStream;
use crate::source::{SourceStream, StreamOutcome};
mod command_builder;
mod ffmpeg;
mod yt_dlp;
pub trait SpawnCommand {
fn spawn(self) -> io::Result<SpawnedCommand>;
}
#[derive(Debug)]
pub struct Command {
program: OsString,
args: Vec<OsString>,
stderr_handle: Option<Stdio>,
}
impl Command {
pub fn new<S>(program: S) -> Self
where
S: Into<OsString>,
{
Self {
program: program.into(),
args: Vec::new(),
stderr_handle: None,
}
}
#[must_use]
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<OsString>,
{
for arg in args {
self = self.arg(arg);
}
self
}
#[must_use]
pub fn arg<S>(mut self, arg: S) -> Self
where
S: Into<OsString>,
{
self.args.push(arg.into());
self
}
pub fn insert_arg<S>(mut self, index: usize, arg: S) -> Self
where
S: Into<OsString>,
{
self.args.insert(index, arg.into());
self
}
#[must_use]
pub fn stderr_handle<S>(mut self, stderr_handle: S) -> Self
where
S: Into<Stdio>,
{
self.stderr_handle = Some(stderr_handle.into());
self
}
}
impl SpawnCommand for Command {
fn spawn(self) -> io::Result<SpawnedCommand> {
SpawnedCommand::new(self, None, Vec::new())
}
}
#[derive(Debug)]
pub struct SpawnedCommand {
child_handle: tokio::process::Child,
stderr_files: Vec<NamedTempFile>,
}
impl SpawnedCommand {
fn new(
command: Command,
prev_out: Option<ChildStdout>,
mut stderr_files: Vec<NamedTempFile>,
) -> Result<Self, io::Error> {
let mut tokio_command = tokio::process::Command::new(command.program);
tokio_command.args(command.args).stdout(Stdio::piped());
if let Some(handle) = command.stderr_handle {
tokio_command.stderr(handle);
} else {
let (stdio, stderr_file) = stdio_to_tmp_file()?;
tokio_command.stderr(stdio);
stderr_files.push(stderr_file);
}
if let Some(prev_out) = prev_out {
tokio_command.stdin(prev_out);
}
tokio_command.kill_on_drop(true);
#[cfg(target_os = "windows")]
{
tokio_command.creation_flags(0x08000000);
}
Ok(Self {
child_handle: tokio_command.spawn().wrap_err("error spawning process")?,
stderr_files,
})
}
}
fn stdio_to_tmp_file() -> io::Result<(Stdio, NamedTempFile)> {
let stderr_file = tempfile::NamedTempFile::new().wrap_err("error creating temp file")?;
let stdio = Stdio::from(
stderr_file
.as_file()
.try_clone()
.wrap_err("error cloning file")?,
);
Ok((stdio, stderr_file))
}
#[derive(Debug)]
pub struct ProcessStreamParams {
content_length: Option<u64>,
command: SpawnedCommand,
}
impl ProcessStreamParams {
pub fn new<C>(command: C) -> io::Result<Self>
where
C: SpawnCommand,
{
Ok(Self {
command: command.spawn()?,
content_length: None,
})
}
#[must_use]
pub fn content_length<L>(self, content_length: L) -> Self
where
L: Into<Option<u64>>,
{
Self {
content_length: content_length.into(),
..self
}
}
}
#[derive(Debug)]
pub struct ProcessStream {
stream: AsyncReadStream<tokio::process::ChildStdout>,
child_handle: tokio::process::Child,
stderr_files: Vec<NamedTempFile>,
}
impl ProcessStream {
fn check_stderr_files(&mut self) {
for file in &mut self.stderr_files {
let _ = file
.flush()
.inspect_err(|e| error!("error flushing file: {e:?}"));
if let Ok(mut file_handle) = file
.reopen()
.inspect_err(|e| error!("error opening file: {e:?}"))
{
let mut buf = String::new();
let _ = file_handle
.read_to_string(&mut buf)
.inspect_err(|e| error!("error reading file: {e:?}"));
warn!("stderr from child process: {buf}");
}
}
}
fn close_stderr_files(&mut self) {
for file in mem::take(&mut self.stderr_files) {
let _ = file
.close()
.inspect_err(|e| warn!("error closing file: {e:?}"));
}
}
}
impl SourceStream for ProcessStream {
type Params = ProcessStreamParams;
type StreamCreationError = Infallible;
async fn create(params: Self::Params) -> Result<Self, Self::StreamCreationError> {
let ProcessStreamParams {
content_length,
mut command,
} = params;
Ok(Self {
stream: AsyncReadStream::new(
command.child_handle.stdout.take().expect("stdout missing"),
content_length,
),
child_handle: command.child_handle,
stderr_files: command.stderr_files,
})
}
fn content_length(&self) -> Option<u64> {
self.stream.content_length()
}
fn supports_seek(&self) -> bool {
self.stream.supports_seek()
}
async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
self.stream.seek_range(start, end).await
}
async fn reconnect(&mut self, current_position: u64) -> io::Result<()> {
self.stream.reconnect(current_position).await
}
async fn on_finish(
&mut self,
result: io::Result<()>,
outcome: StreamOutcome,
) -> io::Result<()> {
let check_command_error = if result.is_ok() {
let wait_res = self.child_handle.wait().await?;
let command_failed = !wait_res.success();
if command_failed {
warn!("command exited with error code: {wait_res:?}");
}
command_failed
} else {
debug!("killing child process");
self.child_handle.kill().await?;
debug!("child process killed");
outcome == StreamOutcome::Completed
};
if check_command_error {
self.check_stderr_files();
}
self.close_stderr_files();
result
}
}
impl Stream for ProcessStream {
type Item = io::Result<Bytes>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}