stream_download/process/
mod.rs1use std::convert::Infallible;
18use std::ffi::OsString;
19use std::io::{self, Read, Write};
20use std::mem;
21use std::pin::Pin;
22use std::process::{ChildStdout, Stdio};
23use std::task::Poll;
24
25use bytes::Bytes;
26pub use command_builder::*;
27pub use ffmpeg::*;
28use futures_util::Stream;
29use tempfile::NamedTempFile;
30use tracing::{debug, error, warn};
31pub use yt_dlp::*;
32
33use crate::WrapIoResult;
34use crate::async_read::AsyncReadStream;
35use crate::source::{SourceStream, StreamOutcome};
36
37mod command_builder;
38mod ffmpeg;
39mod yt_dlp;
40
41pub trait SpawnCommand {
43 fn spawn(self) -> io::Result<SpawnedCommand>;
45}
46
47#[derive(Debug)]
49pub struct Command {
50 program: OsString,
51 args: Vec<OsString>,
52 stderr_handle: Option<Stdio>,
53}
54
55impl Command {
56 pub fn new<S>(program: S) -> Self
58 where
59 S: Into<OsString>,
60 {
61 Self {
62 program: program.into(),
63 args: Vec::new(),
64 stderr_handle: None,
65 }
66 }
67
68 #[must_use]
70 pub fn args<I, S>(mut self, args: I) -> Self
71 where
72 I: IntoIterator<Item = S>,
73 S: Into<OsString>,
74 {
75 for arg in args {
76 self = self.arg(arg);
77 }
78 self
79 }
80
81 #[must_use]
83 pub fn arg<S>(mut self, arg: S) -> Self
84 where
85 S: Into<OsString>,
86 {
87 self.args.push(arg.into());
88 self
89 }
90
91 pub fn insert_arg<S>(mut self, index: usize, arg: S) -> Self
93 where
94 S: Into<OsString>,
95 {
96 self.args.insert(index, arg.into());
97 self
98 }
99
100 #[must_use]
106 pub fn stderr_handle<S>(mut self, stderr_handle: S) -> Self
107 where
108 S: Into<Stdio>,
109 {
110 self.stderr_handle = Some(stderr_handle.into());
111 self
112 }
113}
114
115impl SpawnCommand for Command {
116 fn spawn(self) -> io::Result<SpawnedCommand> {
117 SpawnedCommand::new(self, None, Vec::new())
118 }
119}
120
121#[derive(Debug)]
123pub struct SpawnedCommand {
124 child_handle: tokio::process::Child,
125 stderr_files: Vec<NamedTempFile>,
126}
127
128impl SpawnedCommand {
129 fn new(
130 command: Command,
131 prev_out: Option<ChildStdout>,
132 mut stderr_files: Vec<NamedTempFile>,
133 ) -> Result<Self, io::Error> {
134 let mut tokio_command = tokio::process::Command::new(command.program);
135
136 tokio_command.args(command.args).stdout(Stdio::piped());
137 if let Some(handle) = command.stderr_handle {
138 tokio_command.stderr(handle);
139 } else {
140 let (stdio, stderr_file) = stdio_to_tmp_file()?;
141 tokio_command.stderr(stdio);
142 stderr_files.push(stderr_file);
143 }
144
145 if let Some(prev_out) = prev_out {
146 tokio_command.stdin(prev_out);
147 }
148
149 tokio_command.kill_on_drop(true);
150 #[cfg(target_os = "windows")]
151 {
152 tokio_command.creation_flags(0x08000000);
154 }
155
156 Ok(Self {
157 child_handle: tokio_command.spawn().wrap_err("error spawning process")?,
158 stderr_files,
159 })
160 }
161}
162
163fn stdio_to_tmp_file() -> io::Result<(Stdio, NamedTempFile)> {
164 let stderr_file = tempfile::NamedTempFile::new().wrap_err("error creating temp file")?;
165 let stdio = Stdio::from(
166 stderr_file
167 .as_file()
168 .try_clone()
169 .wrap_err("error cloning file")?,
170 );
171 Ok((stdio, stderr_file))
172}
173
174#[derive(Debug)]
176pub struct ProcessStreamParams {
177 content_length: Option<u64>,
178 command: SpawnedCommand,
179}
180
181impl ProcessStreamParams {
182 pub fn new<C>(command: C) -> io::Result<Self>
184 where
185 C: SpawnCommand,
186 {
187 Ok(Self {
188 command: command.spawn()?,
189 content_length: None,
190 })
191 }
192
193 #[must_use]
195 pub fn content_length<L>(self, content_length: L) -> Self
196 where
197 L: Into<Option<u64>>,
198 {
199 Self {
200 content_length: content_length.into(),
201 ..self
202 }
203 }
204}
205
206#[derive(Debug)]
209pub struct ProcessStream {
210 stream: AsyncReadStream<tokio::process::ChildStdout>,
211 child_handle: tokio::process::Child,
212 stderr_files: Vec<NamedTempFile>,
213}
214
215impl ProcessStream {
216 fn check_stderr_files(&mut self) {
217 for file in &mut self.stderr_files {
218 let _ = file
219 .flush()
220 .inspect_err(|e| error!("error flushing file: {e:?}"));
221 if let Ok(mut file_handle) = file
224 .reopen()
225 .inspect_err(|e| error!("error opening file: {e:?}"))
226 {
227 let mut buf = String::new();
228 let _ = file_handle
229 .read_to_string(&mut buf)
230 .inspect_err(|e| error!("error reading file: {e:?}"));
231 warn!("stderr from child process: {buf}");
232 }
233 }
234 }
235
236 fn close_stderr_files(&mut self) {
237 for file in mem::take(&mut self.stderr_files) {
238 let _ = file
239 .close()
240 .inspect_err(|e| warn!("error closing file: {e:?}"));
241 }
242 }
243}
244
245impl SourceStream for ProcessStream {
246 type Params = ProcessStreamParams;
247 type StreamCreationError = Infallible;
248
249 async fn create(params: Self::Params) -> Result<Self, Self::StreamCreationError> {
250 let ProcessStreamParams {
251 content_length,
252 mut command,
253 } = params;
254
255 Ok(Self {
256 stream: AsyncReadStream::new(
257 command.child_handle.stdout.take().expect("stdout missing"),
258 content_length,
259 ),
260 child_handle: command.child_handle,
261 stderr_files: command.stderr_files,
262 })
263 }
264
265 fn content_length(&self) -> Option<u64> {
266 self.stream.content_length()
267 }
268
269 fn supports_seek(&self) -> bool {
270 self.stream.supports_seek()
271 }
272
273 async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
274 self.stream.seek_range(start, end).await
275 }
276
277 async fn reconnect(&mut self, current_position: u64) -> io::Result<()> {
278 self.stream.reconnect(current_position).await
279 }
280
281 async fn on_finish(
282 &mut self,
283 result: io::Result<()>,
284 outcome: StreamOutcome,
285 ) -> io::Result<()> {
286 let check_command_error = if result.is_ok() {
287 let wait_res = self.child_handle.wait().await?;
288 let command_failed = !wait_res.success();
289 if command_failed {
290 warn!("command exited with error code: {wait_res:?}");
291 }
292 command_failed
293 } else {
294 debug!("killing child process");
295 self.child_handle.kill().await?;
296 debug!("child process killed");
297 outcome == StreamOutcome::Completed
299 };
300
301 if check_command_error {
302 self.check_stderr_files();
303 }
304 self.close_stderr_files();
305
306 result
307 }
308}
309
310impl Stream for ProcessStream {
311 type Item = io::Result<Bytes>;
312
313 fn poll_next(
314 mut self: std::pin::Pin<&mut Self>,
315 cx: &mut std::task::Context<'_>,
316 ) -> Poll<Option<Self::Item>> {
317 Pin::new(&mut self.stream).poll_next(cx)
318 }
319}