stream_download/process/
mod.rs

1//! A [`SourceStream`] adapter for reading data from external processes. This is a wrapper on top of
2//! the [`async_read`][crate::async_read] module.
3//!
4//! Due to limitations with reading `stdout` and `stderr` simultaneously while piping large amounts
5//! of data to `stdin` (see [`std::process::Stdio::piped`]), the child process' `stderr` handle
6//! will be redirected to a temporary file rather than piped directly into the program unless
7//! explicitly overridden with [`Command::stderr_handle`].
8//!
9//! This implementation makes the assumption that any commands used will output binary data to
10//! `stdout` and any error messages will be logged to `stderr`. You probably want to disable
11//! [`Settings::cancel_on_drop`][crate::Settings::cancel_on_drop] when using this module in order
12//! to ensure all error messages are flushed before the process is stopped.
13//!
14//! Helpers for interacting with `yt-dlp` for extracting media from specific sites and `ffmpeg` for
15//! post-processing are also included.
16
17use std::convert::Infallible;
18use std::ffi::OsString;
19use std::io::{self, Read, Write};
20use std::mem;
21use std::pin::Pin;
22use std::process::{ChildStdout, Stdio};
23use std::task::Poll;
24
25use bytes::Bytes;
26pub use command_builder::*;
27pub use ffmpeg::*;
28use futures_util::Stream;
29use tempfile::NamedTempFile;
30use tracing::{debug, error, warn};
31pub use yt_dlp::*;
32
33use crate::WrapIoResult;
34use crate::async_read::AsyncReadStream;
35use crate::source::{SourceStream, StreamOutcome};
36
37mod command_builder;
38mod ffmpeg;
39mod yt_dlp;
40
41/// Trait used by objects that can spawn a command suitable for use with a [`ProcessStream`].
42pub trait SpawnCommand {
43    /// Spawns the command with the stdout and stderr handles configured appropriately.
44    fn spawn(self) -> io::Result<SpawnedCommand>;
45}
46
/// A simplified representation of an OS command that can be used to create a [`SpawnedCommand`].
///
/// Uses a consuming builder API: every configuration method takes `self` and returns the updated
/// command.
#[derive(Debug)]
pub struct Command {
    /// Program to execute.
    program: OsString,
    /// Arguments passed to the program, in order.
    args: Vec<OsString>,
    /// Optional override for the child's `stderr` handle. When `None`, `stderr` is redirected to
    /// a temporary file instead.
    stderr_handle: Option<Stdio>,
}

impl Command {
    /// Creates a new [`Command`] that runs `program` with no arguments.
    pub fn new<S>(program: S) -> Self
    where
        S: Into<OsString>,
    {
        Self {
            program: program.into(),
            args: Vec::new(),
            stderr_handle: None,
        }
    }

    /// Appends multiple arguments to the [`Command`], preserving their order.
    #[must_use]
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<OsString>,
    {
        self.args.extend(args.into_iter().map(Into::<OsString>::into));
        self
    }

    /// Appends a single argument to the [`Command`].
    #[must_use]
    pub fn arg<S>(mut self, arg: S) -> Self
    where
        S: Into<OsString>,
    {
        self.args.push(arg.into());
        self
    }

    /// Inserts an argument at position `index`, shifting all following arguments to the right.
    ///
    /// # Panics
    ///
    /// Panics if `index` is greater than the number of arguments currently set.
    #[must_use]
    pub fn insert_arg<S>(mut self, index: usize, arg: S) -> Self
    where
        S: Into<OsString>,
    {
        self.args.insert(index, arg.into());
        self
    }

    /// Sets the [`Stdio`] handle for the `stderr` stream of the command. This is used for error
    /// reporting in case of a failure.
    ///
    /// Setting this value will override the default behavior of creating a temporary file to store
    /// the output. See [the module documentation][crate::process] for why this is necessary.
    #[must_use]
    pub fn stderr_handle<S>(mut self, stderr_handle: S) -> Self
    where
        S: Into<Stdio>,
    {
        self.stderr_handle = Some(stderr_handle.into());
        self
    }
}
114
115impl SpawnCommand for Command {
116    fn spawn(self) -> io::Result<SpawnedCommand> {
117        SpawnedCommand::new(self, None, Vec::new())
118    }
119}
120
121/// A representation of a [`tokio::process::Command`] that's been spawned.
122#[derive(Debug)]
123pub struct SpawnedCommand {
124    child_handle: tokio::process::Child,
125    stderr_files: Vec<NamedTempFile>,
126}
127
128impl SpawnedCommand {
129    fn new(
130        command: Command,
131        prev_out: Option<ChildStdout>,
132        mut stderr_files: Vec<NamedTempFile>,
133    ) -> Result<Self, io::Error> {
134        let mut tokio_command = tokio::process::Command::new(command.program);
135
136        tokio_command.args(command.args).stdout(Stdio::piped());
137        if let Some(handle) = command.stderr_handle {
138            tokio_command.stderr(handle);
139        } else {
140            let (stdio, stderr_file) = stdio_to_tmp_file()?;
141            tokio_command.stderr(stdio);
142            stderr_files.push(stderr_file);
143        }
144
145        if let Some(prev_out) = prev_out {
146            tokio_command.stdin(prev_out);
147        }
148
149        tokio_command.kill_on_drop(true);
150        #[cfg(target_os = "windows")]
151        {
152            // CREATE_NO_WINDOW
153            tokio_command.creation_flags(0x08000000);
154        }
155
156        Ok(Self {
157            child_handle: tokio_command.spawn().wrap_err("error spawning process")?,
158            stderr_files,
159        })
160    }
161}
162
163fn stdio_to_tmp_file() -> io::Result<(Stdio, NamedTempFile)> {
164    let stderr_file = tempfile::NamedTempFile::new().wrap_err("error creating temp file")?;
165    let stdio = Stdio::from(
166        stderr_file
167            .as_file()
168            .try_clone()
169            .wrap_err("error cloning file")?,
170    );
171    Ok((stdio, stderr_file))
172}
173
174/// Parameters for creating a [`ProcessStream`].
175#[derive(Debug)]
176pub struct ProcessStreamParams {
177    content_length: Option<u64>,
178    command: SpawnedCommand,
179}
180
181impl ProcessStreamParams {
182    /// Creates a new [`ProcessStreamParams`].
183    pub fn new<C>(command: C) -> io::Result<Self>
184    where
185        C: SpawnCommand,
186    {
187        Ok(Self {
188            command: command.spawn()?,
189            content_length: None,
190        })
191    }
192
193    /// Set the content length of the stream.
194    #[must_use]
195    pub fn content_length<L>(self, content_length: L) -> Self
196    where
197        L: Into<Option<u64>>,
198    {
199        Self {
200            content_length: content_length.into(),
201            ..self
202        }
203    }
204}
205
206/// A [`SourceStream`] implementation that asynchronously reads the `stdout` byte stream from a
207/// [`tokio::process::Command`].
208#[derive(Debug)]
209pub struct ProcessStream {
210    stream: AsyncReadStream<tokio::process::ChildStdout>,
211    child_handle: tokio::process::Child,
212    stderr_files: Vec<NamedTempFile>,
213}
214
215impl ProcessStream {
216    fn check_stderr_files(&mut self) {
217        for file in &mut self.stderr_files {
218            let _ = file
219                .flush()
220                .inspect_err(|e| error!("error flushing file: {e:?}"));
221            // Need to reopen the file to access the contents since it was written to from an
222            // external process
223            if let Ok(mut file_handle) = file
224                .reopen()
225                .inspect_err(|e| error!("error opening file: {e:?}"))
226            {
227                let mut buf = String::new();
228                let _ = file_handle
229                    .read_to_string(&mut buf)
230                    .inspect_err(|e| error!("error reading file: {e:?}"));
231                warn!("stderr from child process: {buf}");
232            }
233        }
234    }
235
236    fn close_stderr_files(&mut self) {
237        for file in mem::take(&mut self.stderr_files) {
238            let _ = file
239                .close()
240                .inspect_err(|e| warn!("error closing file: {e:?}"));
241        }
242    }
243}
244
245impl SourceStream for ProcessStream {
246    type Params = ProcessStreamParams;
247    type StreamCreationError = Infallible;
248
249    async fn create(params: Self::Params) -> Result<Self, Self::StreamCreationError> {
250        let ProcessStreamParams {
251            content_length,
252            mut command,
253        } = params;
254
255        Ok(Self {
256            stream: AsyncReadStream::new(
257                command.child_handle.stdout.take().expect("stdout missing"),
258                content_length,
259            ),
260            child_handle: command.child_handle,
261            stderr_files: command.stderr_files,
262        })
263    }
264
265    fn content_length(&self) -> Option<u64> {
266        self.stream.content_length()
267    }
268
269    fn supports_seek(&self) -> bool {
270        self.stream.supports_seek()
271    }
272
273    async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
274        self.stream.seek_range(start, end).await
275    }
276
277    async fn reconnect(&mut self, current_position: u64) -> io::Result<()> {
278        self.stream.reconnect(current_position).await
279    }
280
281    async fn on_finish(
282        &mut self,
283        result: io::Result<()>,
284        outcome: StreamOutcome,
285    ) -> io::Result<()> {
286        let check_command_error = if result.is_ok() {
287            let wait_res = self.child_handle.wait().await?;
288            let command_failed = !wait_res.success();
289            if command_failed {
290                warn!("command exited with error code: {wait_res:?}");
291            }
292            command_failed
293        } else {
294            debug!("killing child process");
295            self.child_handle.kill().await?;
296            debug!("child process killed");
297            // Don't need to check process for errors if the user explicitly cancelled the stream
298            outcome == StreamOutcome::Completed
299        };
300
301        if check_command_error {
302            self.check_stderr_files();
303        }
304        self.close_stderr_files();
305
306        result
307    }
308}
309
310impl Stream for ProcessStream {
311    type Item = io::Result<Bytes>;
312
313    fn poll_next(
314        mut self: std::pin::Pin<&mut Self>,
315        cx: &mut std::task::Context<'_>,
316    ) -> Poll<Option<Self::Item>> {
317        Pin::new(&mut self.stream).poll_next(cx)
318    }
319}