async_ffmpeg_sidecar/
stream.rs

1//! A stream of events from an Ffmpeg process.
2
3use crate::event::{FfmpegProgress, LogLevel};
4use crate::{
5  child::FfmpegChild, event::FfmpegEvent, log_parser::FfmpegLogParser, metadata::FfmpegMetadata,
6};
7use anyhow::Context;
8use futures_util::{Stream, StreamExt};
9use std::future::Future;
10use std::pin::Pin;
11use std::task::Poll;
12use tokio::{io::BufReader, pin, process::ChildStderr};
13
/// A [`Stream`] of [`FfmpegEvent`]s parsed from the stderr output of an
/// [`FfmpegChild`].
pub struct FfmpegEventStream {
  /// Metadata accumulated from the events yielded so far; each event is fed
  /// into it until it reports completion.
  metadata: FfmpegMetadata,
  /// Log parser reading from the child's buffered stderr handle.
  log_parser: FfmpegLogParser<BufReader<ChildStderr>>,
}
21
22impl FfmpegEventStream {
23  pub fn new(child: &mut FfmpegChild) -> anyhow::Result<Self> {
24    let stderr = child.take_stderr().context("no stderr channel")?;
25    let reader = BufReader::new(stderr);
26    let parser = FfmpegLogParser::new(reader);
27    // let stdout = child.take_stdout();
28
29    Ok(Self {
30      metadata: FfmpegMetadata::new(),
31      log_parser: parser,
32      // stdout,
33      // err: false,
34    })
35  }
36
37  pub async fn collect_metadata(&mut self) -> anyhow::Result<FfmpegMetadata> {
38    let mut event_queue: Vec<FfmpegEvent> = Vec::new();
39
40    while !self.metadata.is_completed() {
41      let event = self.next().await;
42      match event {
43        Some(e) => event_queue.push(e),
44        None => {
45          let errors = event_queue
46            .iter()
47            .filter_map(|e| match e {
48              FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e.to_string()),
49              _ => None,
50            })
51            .collect::<Vec<String>>()
52            .join("");
53
54          anyhow::bail!(
55            "Stream ran out before metadata was gathered. The following errors occurred: {errors}"
56          )
57        }
58      }
59    }
60
61    Ok(self.metadata.clone())
62  }
63
64  //// Stream filters
65
66  /// Returns a stream over error messages (`FfmpegEvent::Error` and `FfmpegEvent::LogError`).
67  pub fn filter_errors(self) -> impl Stream<Item = String> {
68    self.filter_map(|event| {
69      futures::future::ready(match event {
70        FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e),
71        _ => None,
72      })
73    })
74  }
75
76  /// Filter out all events except for progress (`FfmpegEvent::Progress`).
77  pub fn filter_progress(self) -> impl Stream<Item = FfmpegProgress> {
78    self.filter_map(|event| {
79      futures::future::ready(match event {
80        FfmpegEvent::Progress(p) => Some(p),
81        _ => None,
82      })
83    })
84  }
85}
86
87impl Stream for FfmpegEventStream {
88  type Item = FfmpegEvent;
89
90  fn poll_next(
91    mut self: Pin<&mut Self>,
92    cx: &mut std::task::Context<'_>,
93  ) -> Poll<Option<FfmpegEvent>> {
94    let fut = self.log_parser.parse_next_event();
95    let item = {
96      pin!(fut);
97
98      match fut.poll(cx) {
99        Poll::Ready(Ok(event)) => {
100          if event == FfmpegEvent::LogEOF {
101            return Poll::Ready(None);
102          }
103
104          event
105        }
106        Poll::Ready(Err(e)) => return Poll::Ready(Some(FfmpegEvent::Error(e.to_string()))),
107        Poll::Pending => return Poll::Pending,
108      }
109    };
110
111    if !self.metadata.is_completed() {
112      if let Err(e) = self.metadata.handle_event(&item) {
113        return Poll::Ready(Some(FfmpegEvent::Error(e.to_string())));
114      }
115    }
116
117    Poll::Ready(Some(item))
118  }
119}