ffmpeg_cli/
runner.rs

1use std::{process::Child, time::Duration};
2
3use futures::{
4    channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
5    SinkExt,
6};
7use thiserror::Error;
8use tokio::{
9    io::{AsyncBufReadExt, BufReader},
10    net::TcpListener,
11};
12
13use crate::{FfmpegBuilder, Parameter};
14
15type Result<T> = std::result::Result<T, Error>;
16
17/// A running instance of ffmpeg.
18#[derive(Debug)]
19pub struct Ffmpeg {
20    /// The stream of progress events emitted by ffmpeg.
21    pub progress: UnboundedReceiver<Result<Progress>>,
22    /// The actual ffmpeg process.
23    pub process: Child,
24}
25
26/// A progress event emitted by ffmpeg.
27///
28/// Names of the fields directly correspond to the names in the output of ffmpeg's `-progress`.  
29/// Everything is wrapped in an option because this has no docs I can find, so I can't guarantee
30/// that they will all be in the data ffmpeg sends.
31/// Note that bitrate is ignored because I'm not sure of the exact format it's in. Blame ffmpeg.  
32#[derive(Debug, Default)]
33pub struct Progress {
34    /// What frame ffmpeg is on.
35    pub frame: Option<u64>,
36    /// What framerate ffmpeg is processing at.
37    pub fps: Option<f64>,
38    /// How much data ffmpeg has output so far, in bytes.
39    pub total_size: Option<u64>,
40    /// How far ffmpeg has processed.
41    pub out_time: Option<Duration>,
42    /// How many frames were duplicated? The meaning is unclear.
43    pub dup_frames: Option<u64>,
44    /// How many frames were dropped.
45    pub drop_frames: Option<u64>,
46    /// How fast it is processing, relative to 1x playback speed.
47    pub speed: Option<f64>,
48    /// What ffmpeg will do now.
49    pub status: Status,
50}
51
/// What ffmpeg is going to do next, taken from the `progress` key of a report.
///
/// The type is a plain field-less enum, so it is `Copy` and comparable: callers can
/// write `report.status == Status::End` or copy the status out of a borrowed report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// Ffmpeg will continue emitting progress events (`progress=continue`).
    Continue,
    /// Ffmpeg has finished processing (`progress=end`).
    ///
    /// No further reports are expected after this one; the progress stream is closed
    /// once ffmpeg closes its progress connection.
    End,
}
62
63impl Default for Status {
64    fn default() -> Self {
65        Self::Continue
66    }
67}
68
69/// Various errors that can occur as it runs.
70#[derive(Error, Debug)]
71pub enum Error {
72    /// Anything threw an [io::Error](std::io::Error).
73    #[error("Io Error: {0}")]
74    IoError(
75        #[source]
76        #[from]
77        std::io::Error,
78    ),
79    /// Ffmpeg gave us data that wasn't actually a `key=value` pair.
80    ///
81    /// Hasn't happened in my testing, but I wouldn't put it past ffmpeg.
82    #[error("Invalid key=value pair: {0}")]
83    KeyValueParseError(String),
84    /// Ffmpeg put out something unexpected for `progress`.
85    #[error("Unknown status: {0}")]
86    UnknownStatusError(String),
87    /// Any other error that can occur while parsing ffmpeg output.
88    ///
89    /// Can only be a float or int parsing error.
90    /// The String is what it was trying to parse.
91    #[error("Parse Error: {0}")]
92    OtherParseError(#[source] Box<dyn std::error::Error + Send>, String),
93}
94
95impl<'a> FfmpegBuilder<'a> {
96    /// Spawns a new ffmpeg process and records the output, consuming the builder
97    ///
98    /// This has to consume the builder for stdin, etc to work
99    pub async fn run(mut self) -> Result<Ffmpeg> {
100        let listener = TcpListener::bind("127.0.0.1:0").await?;
101        let port = listener.local_addr()?.port();
102        let prog_url = format!("tcp://127.0.0.1:{}", port);
103
104        self = self.option(Parameter::KeyValue("progress", &prog_url));
105        let mut command = self.to_command();
106        let child = command.spawn()?;
107
108        let conn = listener.accept().await?.0;
109
110        let (mut tx, rx) = mpsc::unbounded();
111
112        tokio::spawn(async move {
113            let mut reader = BufReader::new(conn);
114            let mut progress: Progress = Default::default();
115
116            loop {
117                let mut line = String::new();
118                let read = reader.read_line(&mut line).await;
119
120                match read {
121                    Ok(n) => {
122                        if n == 0 {
123                            tx.close_channel();
124                            break;
125                        }
126                    }
127                    Err(e) => {
128                        let _ = tx.send(Err(e.into())).await;
129                        tx.close_channel();
130                    }
131                }
132
133                if let Some((key, value)) = parse_line(&line) {
134                    match key {
135                        "frame" => match value.parse() {
136                            Ok(x) => progress.frame = Some(x),
137                            Err(e) => handle_parse_error(&mut tx, e, value).await,
138                        },
139                        "fps" => match value.parse() {
140                            Ok(x) => progress.fps = Some(x),
141                            Err(e) => handle_parse_error(&mut tx, e, value).await,
142                        },
143                        // TOOD: bitrate
144                        "total_size" => match value.parse() {
145                            Ok(x) => progress.total_size = Some(x),
146                            Err(e) => handle_parse_error(&mut tx, e, value).await,
147                        },
148                        "out_time_us" => match value.parse() {
149                            Ok(us) => progress.out_time = Some(Duration::from_micros(us)),
150                            Err(e) => handle_parse_error(&mut tx, e, value).await,
151                        },
152                        "dup_frames" => match value.parse() {
153                            Ok(x) => progress.dup_frames = Some(x),
154                            Err(e) => handle_parse_error(&mut tx, e, value).await,
155                        },
156                        "drop_frames" => match value.parse() {
157                            Ok(x) => progress.drop_frames = Some(x),
158                            Err(e) => handle_parse_error(&mut tx, e, value).await,
159                        },
160                        "speed" => {
161                            let num = &value[..(value.len() - 1)];
162                            match num.parse() {
163                                Ok(x) => progress.speed = Some(x),
164                                Err(e) => handle_parse_error(&mut tx, e, num).await,
165                            }
166                        }
167                        "progress" => {
168                            progress.status = match value {
169                                "continue" => Status::Continue,
170                                "end" => Status::End,
171                                x => {
172                                    // This causes feeding the next thing to error
173                                    // However, we don't care
174                                    // We just ignore the error
175                                    let _ = tx.feed(Err(Error::UnknownStatusError(x.to_owned())));
176                                    tx.close_channel();
177
178                                    // Just give it a status so it compiles
179                                    Status::End
180                                }
181                            };
182                            match tx.feed(Ok(progress)).await {
183                                Ok(_) => {}
184                                Err(e) => {
185                                    if e.is_disconnected() {
186                                        tx.close_channel();
187                                    }
188                                }
189                            }
190                            progress = Default::default();
191                        }
192                        _ => {}
193                    }
194                } else {
195                    let _ = tx.send(Err(Error::KeyValueParseError(line)));
196                    tx.close_channel();
197                }
198            }
199        });
200
201        Ok(Ffmpeg {
202            progress: rx,
203            process: child,
204        })
205    }
206}
207
/// Splits one line of progress output into its `(key, value)` halves.
///
/// The line is trimmed, then cut at the first `=`. Whitespace directly around the `=`
/// is dropped as well, since ffmpeg sometimes pads the value with spaces. Returns
/// `None` when the line contains no `=` at all.
fn parse_line<'a>(line: &'a str) -> Option<(&'a str, &'a str)> {
    let entry = line.trim();
    let eq = entry.find('=')?;

    // `=` is a single byte, so `eq + 1` is always a valid place to slice.
    let key = entry[..eq].trim_end();
    let value = entry[eq + 1..].trim_start();

    Some((key, value))
}
221
222async fn handle_parse_error(
223    tx: &mut UnboundedSender<Result<Progress>>,
224    e: impl std::error::Error + Send + 'static,
225    x: &str,
226) {
227    // Ignore the error because we're closing the channel anyway
228    let _ = tx
229        .send(Err(Error::OtherParseError(Box::new(e), x.to_owned())))
230        .await;
231    tx.close_channel();
232}