// async_ffmpeg_sidecar/stream.rs
use crate::event::{FfmpegProgress, LogLevel};
use crate::{
    child::FfmpegChild, event::FfmpegEvent, log_parser::FfmpegLogParser, metadata::FfmpegMetadata,
};
use anyhow::Context;
use futures_util::{Stream, StreamExt};
use std::future::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use tokio::{io::BufReader, pin, process::ChildStderr};

14pub struct FfmpegEventStream {
15 metadata: FfmpegMetadata,
16 log_parser: FfmpegLogParser<BufReader<ChildStderr>>,
18 }
21
22impl FfmpegEventStream {
23 pub fn new(child: &mut FfmpegChild) -> anyhow::Result<Self> {
24 let stderr = child.take_stderr().context("no stderr channel")?;
25 let reader = BufReader::new(stderr);
26 let parser = FfmpegLogParser::new(reader);
27 Ok(Self {
30 metadata: FfmpegMetadata::new(),
31 log_parser: parser,
32 })
35 }
36
37 pub async fn collect_metadata(&mut self) -> anyhow::Result<FfmpegMetadata> {
38 let mut event_queue: Vec<FfmpegEvent> = Vec::new();
39
40 while !self.metadata.is_completed() {
41 let event = self.next().await;
42 match event {
43 Some(e) => event_queue.push(e),
44 None => {
45 let errors = event_queue
46 .iter()
47 .filter_map(|e| match e {
48 FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e.to_string()),
49 _ => None,
50 })
51 .collect::<Vec<String>>()
52 .join("");
53
54 anyhow::bail!(
55 "Stream ran out before metadata was gathered. The following errors occurred: {errors}"
56 )
57 }
58 }
59 }
60
61 Ok(self.metadata.clone())
62 }
63
64 pub fn filter_errors(self) -> impl Stream<Item = String> {
68 self.filter_map(|event| {
69 futures::future::ready(match event {
70 FfmpegEvent::Error(e) | FfmpegEvent::Log(LogLevel::Error, e) => Some(e),
71 _ => None,
72 })
73 })
74 }
75
76 pub fn filter_progress(self) -> impl Stream<Item = FfmpegProgress> {
78 self.filter_map(|event| {
79 futures::future::ready(match event {
80 FfmpegEvent::Progress(p) => Some(p),
81 _ => None,
82 })
83 })
84 }
85}
86
87impl Stream for FfmpegEventStream {
88 type Item = FfmpegEvent;
89
90 fn poll_next(
91 mut self: Pin<&mut Self>,
92 cx: &mut std::task::Context<'_>,
93 ) -> Poll<Option<FfmpegEvent>> {
94 let fut = self.log_parser.parse_next_event();
95 let item = {
96 pin!(fut);
97
98 match fut.poll(cx) {
99 Poll::Ready(Ok(event)) => {
100 if event == FfmpegEvent::LogEOF {
101 return Poll::Ready(None);
102 }
103
104 event
105 }
106 Poll::Ready(Err(e)) => return Poll::Ready(Some(FfmpegEvent::Error(e.to_string()))),
107 Poll::Pending => return Poll::Pending,
108 }
109 };
110
111 if !self.metadata.is_completed() {
112 if let Err(e) = self.metadata.handle_event(&item) {
113 return Poll::Ready(Some(FfmpegEvent::Error(e.to_string())));
114 }
115 }
116
117 Poll::Ready(Some(item))
118 }
119}