1use std::{process::Child, time::Duration};
23use futures::{
4 channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
5 SinkExt,
6};
7use thiserror::Error;
8use tokio::{
9 io::{AsyncBufReadExt, BufReader},
10 net::TcpListener,
11};
1213use crate::{FfmpegBuilder, Parameter};
1415type Result<T> = std::result::Result<T, Error>;
1617/// A running instance of ffmpeg.
18#[derive(Debug)]
19pub struct Ffmpeg {
20/// The stream of progress events emitted by ffmpeg.
21pub progress: UnboundedReceiver<Result<Progress>>,
22/// The actual ffmpeg process.
23pub process: Child,
24}
2526/// A progress event emitted by ffmpeg.
27///
28/// Names of the fields directly correspond to the names in the output of ffmpeg's `-progress`.
29/// Everything is wrapped in an option because this has no docs I can find, so I can't guarantee
30/// that they will all be in the data ffmpeg sends.
31/// Note that bitrate is ignored because I'm not sure of the exact format it's in. Blame ffmpeg.
32#[derive(Debug, Default)]
33pub struct Progress {
34/// What frame ffmpeg is on.
35pub frame: Option<u64>,
36/// What framerate ffmpeg is processing at.
37pub fps: Option<f64>,
38/// How much data ffmpeg has output so far, in bytes.
39pub total_size: Option<u64>,
40/// How far ffmpeg has processed.
41pub out_time: Option<Duration>,
42/// How many frames were duplicated? The meaning is unclear.
43pub dup_frames: Option<u64>,
44/// How many frames were dropped.
45pub drop_frames: Option<u64>,
46/// How fast it is processing, relative to 1x playback speed.
47pub speed: Option<f64>,
48/// What ffmpeg will do now.
49pub status: Status,
50}
5152/// What ffmpeg is going to do next.
53#[derive(Debug)]
54pub enum Status {
55/// Ffmpeg will continue emitting progress events.
56Continue,
57/// Ffmpeg has finished processing.
58 ///
59 /// After emitting this, the stream will end.
60End,
61}
6263impl Default for Status {
64fn default() -> Self {
65Self::Continue
66 }
67}
6869/// Various errors that can occur as it runs.
70#[derive(Error, Debug)]
71pub enum Error {
72/// Anything threw an [io::Error](std::io::Error).
73#[error("Io Error: {0}")]
74IoError(
75#[source]
76 #[from]
77std::io::Error,
78 ),
79/// Ffmpeg gave us data that wasn't actually a `key=value` pair.
80 ///
81 /// Hasn't happened in my testing, but I wouldn't put it past ffmpeg.
82#[error("Invalid key=value pair: {0}")]
83KeyValueParseError(String),
84/// Ffmpeg put out something unexpected for `progress`.
85#[error("Unknown status: {0}")]
86UnknownStatusError(String),
87/// Any other error that can occur while parsing ffmpeg output.
88 ///
89 /// Can only be a float or int parsing error.
90 /// The String is what it was trying to parse.
91#[error("Parse Error: {0}")]
92OtherParseError(#[source] Box<dyn std::error::Error + Send>, String),
93}
9495impl<'a> FfmpegBuilder<'a> {
96/// Spawns a new ffmpeg process and records the output, consuming the builder
97 ///
98 /// This has to consume the builder for stdin, etc to work
99pub async fn run(mut self) -> Result<Ffmpeg> {
100let listener = TcpListener::bind("127.0.0.1:0").await?;
101let port = listener.local_addr()?.port();
102let prog_url = format!("tcp://127.0.0.1:{}", port);
103104self = self.option(Parameter::KeyValue("progress", &prog_url));
105let mut command = self.to_command();
106let child = command.spawn()?;
107108let conn = listener.accept().await?.0;
109110let (mut tx, rx) = mpsc::unbounded();
111112 tokio::spawn(async move {
113let mut reader = BufReader::new(conn);
114let mut progress: Progress = Default::default();
115116loop {
117let mut line = String::new();
118let read = reader.read_line(&mut line).await;
119120match read {
121Ok(n) => {
122if n == 0 {
123 tx.close_channel();
124break;
125 }
126 }
127Err(e) => {
128let _ = tx.send(Err(e.into())).await;
129 tx.close_channel();
130 }
131 }
132133if let Some((key, value)) = parse_line(&line) {
134match key {
135"frame" => match value.parse() {
136Ok(x) => progress.frame = Some(x),
137Err(e) => handle_parse_error(&mut tx, e, value).await,
138 },
139"fps" => match value.parse() {
140Ok(x) => progress.fps = Some(x),
141Err(e) => handle_parse_error(&mut tx, e, value).await,
142 },
143// TOOD: bitrate
144"total_size" => match value.parse() {
145Ok(x) => progress.total_size = Some(x),
146Err(e) => handle_parse_error(&mut tx, e, value).await,
147 },
148"out_time_us" => match value.parse() {
149Ok(us) => progress.out_time = Some(Duration::from_micros(us)),
150Err(e) => handle_parse_error(&mut tx, e, value).await,
151 },
152"dup_frames" => match value.parse() {
153Ok(x) => progress.dup_frames = Some(x),
154Err(e) => handle_parse_error(&mut tx, e, value).await,
155 },
156"drop_frames" => match value.parse() {
157Ok(x) => progress.drop_frames = Some(x),
158Err(e) => handle_parse_error(&mut tx, e, value).await,
159 },
160"speed" => {
161let num = &value[..(value.len() - 1)];
162match num.parse() {
163Ok(x) => progress.speed = Some(x),
164Err(e) => handle_parse_error(&mut tx, e, num).await,
165 }
166 }
167"progress" => {
168 progress.status = match value {
169"continue" => Status::Continue,
170"end" => Status::End,
171 x => {
172// This causes feeding the next thing to error
173 // However, we don't care
174 // We just ignore the error
175let _ = tx.feed(Err(Error::UnknownStatusError(x.to_owned())));
176 tx.close_channel();
177178// Just give it a status so it compiles
179Status::End
180 }
181 };
182match tx.feed(Ok(progress)).await {
183Ok(_) => {}
184Err(e) => {
185if e.is_disconnected() {
186 tx.close_channel();
187 }
188 }
189 }
190 progress = Default::default();
191 }
192_ => {}
193 }
194 } else {
195let _ = tx.send(Err(Error::KeyValueParseError(line)));
196 tx.close_channel();
197 }
198 }
199 });
200201Ok(Ffmpeg {
202 progress: rx,
203 process: child,
204 })
205 }
206}
207208fn parse_line<'a>(line: &'a str) -> Option<(&'a str, &'a str)> {
209let trimmed = line.trim();
210let mut iter = trimmed.splitn(2, '=');
211212let mut key = iter.next()?;
213 key = key.trim_end();
214215let mut value = iter.next()?;
216// Ffmpeg was putting in random spaces for some reason?
217value = value.trim_start();
218219Some((key, value))
220}
221222async fn handle_parse_error(
223 tx: &mut UnboundedSender<Result<Progress>>,
224 e: impl std::error::Error + Send + 'static,
225 x: &str,
226) {
227// Ignore the error because we're closing the channel anyway
228let _ = tx
229 .send(Err(Error::OtherParseError(Box::new(e), x.to_owned())))
230 .await;
231 tx.close_channel();
232}