use crate::event::{FfmpegProgress, LogLevel};
use crate::{
child::FfmpegChild, event::FfmpegEvent, log_parser::FfmpegLogParser, metadata::FfmpegMetadata,
};
use anyhow::Context;
use futures_util::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use tokio::{io::BufReader, pin, process::ChildStderr};
/// Asynchronous stream of [`FfmpegEvent`]s parsed from the stderr channel of
/// an [`FfmpegChild`].
///
/// Besides forwarding events, the stream feeds each one into an
/// [`FfmpegMetadata`] accumulator until that accumulator reports completion.
pub struct FfmpegEventStream {
/// Metadata gathered so far from the events that passed through the stream.
metadata: FfmpegMetadata,
/// Log parser reading from the buffered stderr handle taken from the child.
log_parser: FfmpegLogParser<BufReader<ChildStderr>>,
}
impl FfmpegEventStream {
pub fn new(child: &mut FfmpegChild) -> anyhow::Result<Self> {
let stderr = child.take_stderr().context("no stderr channel")?;
let reader = BufReader::new(stderr);
let parser = FfmpegLogParser::new(reader);
Ok(Self {
metadata: FfmpegMetadata::new(),
log_parser: parser,
})
}
pub async fn collect_metadata(&mut self) -> anyhow::Result<FfmpegMetadata> {
let mut event_queue: Vec<FfmpegEvent> = Vec::new();
while !self.metadata.is_completed() {
let event = self.next().await;
match event {
Some(e) => event_queue.push(e),
None => {
let errors = event_queue
.iter()
.filter_map(|e| match e {
FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e.to_string()),
_ => None,
})
.collect::<Vec<String>>()
.join("");
anyhow::bail!(
"Stream ran out before metadata was gathered. The following errors occurred: {errors}"
)
}
}
}
Ok(self.metadata.clone())
}
pub fn filter_errors(self) -> impl Stream<Item = String> {
self.filter_map(|event| {
futures::future::ready(match event {
FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e),
_ => None,
})
})
}
pub fn filter_progress(self) -> impl Stream<Item = FfmpegProgress> {
self.filter_map(|event| {
futures::future::ready(match event {
FfmpegEvent::Progress(p) => Some(p),
_ => None,
})
})
}
}
impl Stream for FfmpegEventStream {
type Item = FfmpegEvent;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<FfmpegEvent>> {
let fut = self.log_parser.parse_next_event();
let item = {
pin!(fut);
match fut.poll(cx) {
Poll::Ready(Ok(event)) => {
if event == FfmpegEvent::LogEOF {
return Poll::Ready(None);
}
event
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(FfmpegEvent::Error(e.to_string()))),
Poll::Pending => return Poll::Pending,
}
};
if !self.metadata.is_completed() {
if let Err(e) = self.metadata.handle_event(&item) {
return Poll::Ready(Some(FfmpegEvent::Error(e.to_string())));
}
}
Poll::Ready(Some(item))
}
}