1use std::{process::Child, time::Duration};
2
3use futures::{
4 channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
5 SinkExt,
6};
7use thiserror::Error;
8use tokio::{
9 io::{AsyncBufReadExt, BufReader},
10 net::TcpListener,
11};
12
13use crate::{FfmpegBuilder, Parameter};
14
15type Result<T> = std::result::Result<T, Error>;
16
17#[derive(Debug)]
19pub struct Ffmpeg {
20 pub progress: UnboundedReceiver<Result<Progress>>,
22 pub process: Child,
24}
25
26#[derive(Debug, Default)]
33pub struct Progress {
34 pub frame: Option<u64>,
36 pub fps: Option<f64>,
38 pub total_size: Option<u64>,
40 pub out_time: Option<Duration>,
42 pub dup_frames: Option<u64>,
44 pub drop_frames: Option<u64>,
46 pub speed: Option<f64>,
48 pub status: Status,
50}
51
52#[derive(Debug)]
54pub enum Status {
55 Continue,
57 End,
61}
62
63impl Default for Status {
64 fn default() -> Self {
65 Self::Continue
66 }
67}
68
69#[derive(Error, Debug)]
71pub enum Error {
72 #[error("Io Error: {0}")]
74 IoError(
75 #[source]
76 #[from]
77 std::io::Error,
78 ),
79 #[error("Invalid key=value pair: {0}")]
83 KeyValueParseError(String),
84 #[error("Unknown status: {0}")]
86 UnknownStatusError(String),
87 #[error("Parse Error: {0}")]
92 OtherParseError(#[source] Box<dyn std::error::Error + Send>, String),
93}
94
95impl<'a> FfmpegBuilder<'a> {
96 pub async fn run(mut self) -> Result<Ffmpeg> {
100 let listener = TcpListener::bind("127.0.0.1:0").await?;
101 let port = listener.local_addr()?.port();
102 let prog_url = format!("tcp://127.0.0.1:{}", port);
103
104 self = self.option(Parameter::KeyValue("progress", &prog_url));
105 let mut command = self.to_command();
106 let child = command.spawn()?;
107
108 let conn = listener.accept().await?.0;
109
110 let (mut tx, rx) = mpsc::unbounded();
111
112 tokio::spawn(async move {
113 let mut reader = BufReader::new(conn);
114 let mut progress: Progress = Default::default();
115
116 loop {
117 let mut line = String::new();
118 let read = reader.read_line(&mut line).await;
119
120 match read {
121 Ok(n) => {
122 if n == 0 {
123 tx.close_channel();
124 break;
125 }
126 }
127 Err(e) => {
128 let _ = tx.send(Err(e.into())).await;
129 tx.close_channel();
130 }
131 }
132
133 if let Some((key, value)) = parse_line(&line) {
134 match key {
135 "frame" => match value.parse() {
136 Ok(x) => progress.frame = Some(x),
137 Err(e) => handle_parse_error(&mut tx, e, value).await,
138 },
139 "fps" => match value.parse() {
140 Ok(x) => progress.fps = Some(x),
141 Err(e) => handle_parse_error(&mut tx, e, value).await,
142 },
143 "total_size" => match value.parse() {
145 Ok(x) => progress.total_size = Some(x),
146 Err(e) => handle_parse_error(&mut tx, e, value).await,
147 },
148 "out_time_us" => match value.parse() {
149 Ok(us) => progress.out_time = Some(Duration::from_micros(us)),
150 Err(e) => handle_parse_error(&mut tx, e, value).await,
151 },
152 "dup_frames" => match value.parse() {
153 Ok(x) => progress.dup_frames = Some(x),
154 Err(e) => handle_parse_error(&mut tx, e, value).await,
155 },
156 "drop_frames" => match value.parse() {
157 Ok(x) => progress.drop_frames = Some(x),
158 Err(e) => handle_parse_error(&mut tx, e, value).await,
159 },
160 "speed" => {
161 let num = &value[..(value.len() - 1)];
162 match num.parse() {
163 Ok(x) => progress.speed = Some(x),
164 Err(e) => handle_parse_error(&mut tx, e, num).await,
165 }
166 }
167 "progress" => {
168 progress.status = match value {
169 "continue" => Status::Continue,
170 "end" => Status::End,
171 x => {
172 let _ = tx.feed(Err(Error::UnknownStatusError(x.to_owned())));
176 tx.close_channel();
177
178 Status::End
180 }
181 };
182 match tx.feed(Ok(progress)).await {
183 Ok(_) => {}
184 Err(e) => {
185 if e.is_disconnected() {
186 tx.close_channel();
187 }
188 }
189 }
190 progress = Default::default();
191 }
192 _ => {}
193 }
194 } else {
195 let _ = tx.send(Err(Error::KeyValueParseError(line)));
196 tx.close_channel();
197 }
198 }
199 });
200
201 Ok(Ffmpeg {
202 progress: rx,
203 process: child,
204 })
205 }
206}
207
208fn parse_line<'a>(line: &'a str) -> Option<(&'a str, &'a str)> {
209 let trimmed = line.trim();
210 let mut iter = trimmed.splitn(2, '=');
211
212 let mut key = iter.next()?;
213 key = key.trim_end();
214
215 let mut value = iter.next()?;
216 value = value.trim_start();
218
219 Some((key, value))
220}
221
222async fn handle_parse_error(
223 tx: &mut UnboundedSender<Result<Progress>>,
224 e: impl std::error::Error + Send + 'static,
225 x: &str,
226) {
227 let _ = tx
229 .send(Err(Error::OtherParseError(Box::new(e), x.to_owned())))
230 .await;
231 tx.close_channel();
232}