ff_encode/video/async_encoder.rs
1//! Async video encoder backed by a bounded `tokio::sync::mpsc` channel.
2
3use ff_format::VideoFrame;
4
5use super::builder::{VideoEncoder, VideoEncoderBuilder};
6use crate::EncodeError;
7use crate::async_encoder::{AsyncEncoder, SyncEncoder};
8
9impl SyncEncoder<VideoFrame> for VideoEncoder {
10 fn push_frame(&mut self, frame: &VideoFrame) -> Result<(), EncodeError> {
11 self.push_video(frame)
12 }
13
14 fn drain_and_finish(self) -> Result<(), EncodeError> {
15 self.finish()
16 }
17}
18
19/// Async wrapper around [`VideoEncoder`].
20///
21/// Frames are queued into a bounded channel (capacity 8) and encoded by a
22/// dedicated worker thread. When the channel is full, [`push`] suspends the
23/// caller, providing natural back-pressure.
24///
25/// # Construction
26///
27/// Use [`VideoEncoder::create`] to configure the encoder, then call
28/// [`AsyncVideoEncoder::from_builder`]:
29///
30/// ```ignore
31/// use ff_encode::{AsyncVideoEncoder, VideoEncoder, VideoCodec};
32///
33/// let mut encoder = AsyncVideoEncoder::from_builder(
34/// VideoEncoder::create("output.mp4")
35/// .video(1920, 1080, 30.0)
36/// .video_codec(VideoCodec::H264),
37/// )?;
38///
39/// encoder.push(frame).await?;
40/// encoder.finish().await?;
41/// ```
42///
43/// # Back-pressure
44///
45/// The internal channel holds at most 8 frames. Once that buffer is full,
46/// [`push`] yields until the worker drains a slot. This prevents unbounded
47/// memory growth when the encoder cannot keep up with the frame rate.
48///
49/// [`push`]: AsyncVideoEncoder::push
50pub struct AsyncVideoEncoder {
51 inner: AsyncEncoder<VideoFrame>,
52}
53
54impl std::fmt::Debug for AsyncVideoEncoder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("AsyncVideoEncoder").finish_non_exhaustive()
57 }
58}
59
60impl AsyncVideoEncoder {
61 /// Builds an async encoder from a configured builder.
62 ///
63 /// Consumes the builder, validates the configuration, opens the output
64 /// file, and starts the worker thread. The worker runs the synchronous
65 /// FFmpeg encode loop in the background.
66 ///
67 /// # Errors
68 ///
69 /// Returns [`EncodeError`] if the builder configuration is invalid or
70 /// the output file cannot be created.
71 pub fn from_builder(builder: VideoEncoderBuilder) -> Result<Self, EncodeError> {
72 let encoder = builder.build()?;
73 Ok(Self {
74 inner: AsyncEncoder::new(encoder),
75 })
76 }
77
78 /// Queues a video frame for encoding.
79 ///
80 /// If the internal channel (capacity 8) is full, this method suspends
81 /// the caller until the worker drains a slot.
82 ///
83 /// # Errors
84 ///
85 /// Returns [`EncodeError::WorkerPanicked`] if the worker thread has
86 /// exited unexpectedly.
87 pub async fn push(&mut self, frame: VideoFrame) -> Result<(), EncodeError> {
88 self.inner.push(frame).await
89 }
90
91 /// Signals end-of-stream, flushes remaining frames, and writes the file trailer.
92 ///
93 /// Drops the channel sender (signalling EOF to the worker), then waits
94 /// for the worker thread to finish without blocking the async executor.
95 /// Any error from the worker is propagated back to the caller.
96 ///
97 /// # Errors
98 ///
99 /// Returns [`EncodeError`] if encoding fails during flush or if the
100 /// worker thread panicked.
101 pub async fn finish(self) -> Result<(), EncodeError> {
102 self.inner.finish().await
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109
110 // Compile-time proof that AsyncVideoEncoder satisfies Send.
111 fn _assert_send() {
112 fn is_send<T: Send>() {}
113 is_send::<AsyncVideoEncoder>();
114 }
115
116 #[test]
117 fn from_builder_should_fail_on_invalid_config() {
118 // A builder with no streams configured is rejected at build time,
119 // not in the worker thread — the error surfaces from from_builder.
120 let result = AsyncVideoEncoder::from_builder(VideoEncoder::create("out.mp4"));
121 assert!(
122 result.is_err(),
123 "expected error for unconfigured builder, got Ok"
124 );
125 assert!(
126 matches!(result.unwrap_err(), EncodeError::InvalidConfig { .. }),
127 "expected InvalidConfig"
128 );
129 }
130}