Skip to main content

axon/ots/subprocess/
ffmpeg.rs

1//! ffmpeg subprocess wrapper with a warm-process pool.
2//!
3//! ffmpeg is the fallback for transformations OTS can't handle
4//! natively (arbitrary codec combos, format containers, colour-space
5//! conversions). The wrapper:
6//!
7//! 1. Detects ffmpeg at startup — absence is NOT fatal. OTS falls
8//!    back to native paths when they exist; flows that need ffmpeg
9//!    but can't find it emit `ots:capability_degraded` on first
10//!    pipeline synthesis and compile-time warnings on the checker.
11//! 2. Maintains a TTL-bounded pool of pipeline entries keyed by the
12//!    pipeline signature (`source_kind → sink_kind` + flag set).
13//!    ffmpeg itself is still spawned on every call today; the pool
14//!    caches the pipeline descriptor and usage metrics within the TTL.
15//! 3. Never returns plaintext of the payload on stderr; adopter
16//!    `RUST_LOG` levels control ffmpeg's own verbosity.
17//!
18//! This module ships the pool + detection + executor plumbing.
19//! Concrete `Transformer` implementations that delegate to ffmpeg
20//! are adopter-side — they pick the exact ffmpeg args (e.g.
21//! `-f s16le -ar 16000 -ac 1 ...`) that match their kind taxonomy.
22
23use std::collections::HashMap;
24use std::process::Command;
25use std::sync::Mutex;
26use std::time::{Duration, Instant};
27
28use crate::buffer::{BufferKind, ZeroCopyBuffer};
29use crate::ots::pipeline::{OtsError, Transformer, TransformerBackend};
30
31// ── Detection ───────────────────────────────────────────────────────
32
/// Reports whether an `ffmpeg` executable can be launched on this host.
///
/// The probe (`ffmpeg -version`) runs at most once; its outcome is
/// memoised in a process-wide `OnceLock`, so later calls are free and a
/// restart is required to pick up a newly installed ffmpeg.
pub fn is_ffmpeg_available() -> bool {
    use std::sync::OnceLock;

    static PROBE_RESULT: OnceLock<bool> = OnceLock::new();

    // A spawn failure and a non-zero exit status both count as "absent".
    let probe = || {
        let outcome = Command::new("ffmpeg").arg("-version").output();
        matches!(outcome, Ok(ref finished) if finished.status.success())
    };
    *PROBE_RESULT.get_or_init(probe)
}
47
48// ── Pipeline description ────────────────────────────────────────────
49
50/// Concrete ffmpeg invocation — codec-pair signature + argv.
51#[derive(Debug, Clone, PartialEq, Eq, Hash)]
52pub struct FfmpegPipeline {
53    pub from: BufferKind,
54    pub to: BufferKind,
55    /// argv AFTER the `ffmpeg` executable; the wrapper prepends
56    /// `-y -hide_banner -loglevel error` for reproducible stderr.
57    pub argv: Vec<String>,
58}
59
60impl FfmpegPipeline {
61    pub fn new(
62        from: BufferKind,
63        to: BufferKind,
64        argv: impl IntoIterator<Item = String>,
65    ) -> Self {
66        FfmpegPipeline {
67            from,
68            to,
69            argv: argv.into_iter().collect(),
70        }
71    }
72}
73
74// ── Warm pool ───────────────────────────────────────────────────────
75
/// Tunables for the pipeline pool.
#[derive(Clone, Debug)]
pub struct FfmpegPoolConfig {
    /// Time an entry remains in the pool after it was last used.
    pub ttl: Duration,
    /// Upper bound on the number of pooled entries, so a long-running
    /// process cannot accumulate pipelines without limit.
    pub max_entries: usize,
}
84
85impl Default for FfmpegPoolConfig {
86    fn default() -> Self {
87        FfmpegPoolConfig {
88            ttl: Duration::from_secs(60),
89            max_entries: 32,
90        }
91    }
92}
93
94#[derive(Debug, Clone)]
95struct PoolEntry {
96    pipeline: FfmpegPipeline,
97    last_used: Instant,
98    /// Cumulative invocation count for observability.
99    hits: u64,
100}
101
102/// TTL-bounded warm cache. The wrapper doesn't keep the ffmpeg
103/// process alive across calls (we spawn per-call today) — the
104/// pool's value is in caching the resolved pipeline descriptor
105/// + metrics, which is where the meaningful cost sits for small
106/// audio chunks. A follow-up revision can upgrade to a pipe-in /
107/// pipe-out long-running ffmpeg worker without changing this API.
108pub struct FfmpegPool {
109    entries: Mutex<HashMap<String, PoolEntry>>,
110    config: FfmpegPoolConfig,
111}
112
113impl FfmpegPool {
114    pub fn new(config: FfmpegPoolConfig) -> Self {
115        FfmpegPool {
116            entries: Mutex::new(HashMap::new()),
117            config,
118        }
119    }
120
121    pub fn register(&self, pipeline: FfmpegPipeline) {
122        if !is_ffmpeg_available() {
123            return;
124        }
125        let key = Self::key_for(&pipeline);
126        let mut guard = self.entries.lock().expect("pool poisoned");
127        self.evict_stale(&mut guard);
128        if guard.len() >= self.config.max_entries {
129            return;
130        }
131        guard.insert(
132            key,
133            PoolEntry {
134                pipeline,
135                last_used: Instant::now(),
136                hits: 0,
137            },
138        );
139    }
140
141    /// Execute ffmpeg for this pipeline. Spawns per-call today;
142    /// future revision upgrades to a long-running pipe-in worker.
143    pub fn execute(
144        &self,
145        pipeline: &FfmpegPipeline,
146        payload: &[u8],
147    ) -> Result<Vec<u8>, OtsError> {
148        if !is_ffmpeg_available() {
149            return Err(OtsError::TransformFailed(
150                "ffmpeg not available on this host; register a native \
151                 transformer or install ffmpeg to unlock subprocess \
152                 paths"
153                    .into(),
154            ));
155        }
156
157        // Update pool stats (best-effort; contention shouldn't block).
158        if let Ok(mut guard) = self.entries.lock() {
159            let key = Self::key_for(pipeline);
160            let entry = guard.entry(key).or_insert_with(|| PoolEntry {
161                pipeline: pipeline.clone(),
162                last_used: Instant::now(),
163                hits: 0,
164            });
165            entry.last_used = Instant::now();
166            entry.hits += 1;
167        }
168
169        let mut args: Vec<String> = vec![
170            "-y".into(),
171            "-hide_banner".into(),
172            "-loglevel".into(),
173            "error".into(),
174        ];
175        args.extend(pipeline.argv.iter().cloned());
176
177        use std::io::Write;
178        use std::process::Stdio;
179        let mut child = Command::new("ffmpeg")
180            .args(&args)
181            .stdin(Stdio::piped())
182            .stdout(Stdio::piped())
183            .stderr(Stdio::piped())
184            .spawn()
185            .map_err(|e| {
186                OtsError::TransformFailed(format!("ffmpeg spawn: {e}"))
187            })?;
188
189        if let Some(mut stdin) = child.stdin.take() {
190            stdin.write_all(payload).map_err(|e| {
191                OtsError::TransformFailed(format!("ffmpeg stdin: {e}"))
192            })?;
193        }
194        let output = child.wait_with_output().map_err(|e| {
195            OtsError::TransformFailed(format!("ffmpeg wait: {e}"))
196        })?;
197
198        if !output.status.success() {
199            let stderr = String::from_utf8_lossy(&output.stderr);
200            return Err(OtsError::TransformFailed(format!(
201                "ffmpeg exited {:?}: {stderr}",
202                output.status.code()
203            )));
204        }
205        Ok(output.stdout)
206    }
207
208    pub fn snapshot(&self) -> Vec<(String, u64, Duration)> {
209        let guard = self.entries.lock().expect("pool poisoned");
210        let now = Instant::now();
211        guard
212            .iter()
213            .map(|(k, e)| {
214                let age = now.duration_since(e.last_used);
215                (k.clone(), e.hits, age)
216            })
217            .collect()
218    }
219
220    fn evict_stale(&self, entries: &mut HashMap<String, PoolEntry>) {
221        let now = Instant::now();
222        entries.retain(|_, e| now.duration_since(e.last_used) < self.config.ttl);
223    }
224
225    fn key_for(pipeline: &FfmpegPipeline) -> String {
226        format!(
227            "{}->{}|{}",
228            pipeline.from,
229            pipeline.to,
230            pipeline.argv.join(" ")
231        )
232    }
233}
234
235impl Default for FfmpegPool {
236    fn default() -> Self {
237        FfmpegPool::new(FfmpegPoolConfig::default())
238    }
239}
240
241// ── Generic subprocess transformer ──────────────────────────────────
242
243/// Transformer that routes through the shared pool. Adopters
244/// create one per `FfmpegPipeline` they register.
245pub struct FfmpegTransformer {
246    pub pipeline: FfmpegPipeline,
247    pub pool: std::sync::Arc<FfmpegPool>,
248    pub cost_hint: u32,
249}
250
251impl Transformer for FfmpegTransformer {
252    fn source_kind(&self) -> BufferKind {
253        self.pipeline.from.clone()
254    }
255
256    fn sink_kind(&self) -> BufferKind {
257        self.pipeline.to.clone()
258    }
259
260    fn backend(&self) -> TransformerBackend {
261        TransformerBackend::Subprocess
262    }
263
264    fn cost_hint(&self) -> u32 {
265        self.cost_hint
266    }
267
268    fn transform(
269        &self,
270        input: &ZeroCopyBuffer,
271    ) -> Result<ZeroCopyBuffer, OtsError> {
272        let out_bytes = self.pool.execute(&self.pipeline, input.as_slice())?;
273        let mut buf =
274            ZeroCopyBuffer::from_bytes(out_bytes, self.sink_kind());
275        if let Some(tenant) = input.tenant_id() {
276            buf = buf.with_tenant(tenant.to_string());
277        }
278        Ok(buf)
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Regression: `register()` on a host without ffmpeg must be a
    /// no-op rather than a panic.
    #[test]
    fn pool_registers_without_crashing_when_ffmpeg_missing() {
        let pool = FfmpegPool::default();
        pool.register(FfmpegPipeline::new(
            BufferKind::new("custom"),
            BufferKind::new("other"),
            std::iter::empty(),
        ));
        // The snapshot is either empty (no ffmpeg) or holds exactly the
        // one registration; both are legal, anything larger is not.
        assert!(pool.snapshot().len() <= 1);
    }

    /// The same pipeline must always map to the same pool key.
    #[test]
    fn key_for_is_deterministic() {
        let p = FfmpegPipeline::new(
            BufferKind::new("a"),
            BufferKind::new("b"),
            ["-f".into(), "s16le".into()],
        );
        let k1 = FfmpegPool::key_for(&p);
        let k2 = FfmpegPool::key_for(&p);
        assert_eq!(k1, k2);
    }

    /// On a runner without ffmpeg, `execute()` must return an error
    /// instead of crashing. Skipped when ffmpeg is actually installed,
    /// because that path needs a valid payload and arguments which unit
    /// tests do not synthesise.
    #[test]
    fn execute_returns_error_when_ffmpeg_missing() {
        if is_ffmpeg_available() {
            return;
        }
        let pool = FfmpegPool::default();
        let pipeline = FfmpegPipeline::new(
            BufferKind::new("a"),
            BufferKind::new("b"),
            std::iter::empty(),
        );
        let err = pool.execute(&pipeline, b"nothing").unwrap_err();
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(err, OtsError::TransformFailed(_)));
    }

    /// The transformer reports the subprocess backend and echoes its
    /// configured cost hint.
    #[test]
    fn transformer_backend_is_subprocess() {
        let pool = std::sync::Arc::new(FfmpegPool::default());
        let t = FfmpegTransformer {
            pipeline: FfmpegPipeline::new(
                BufferKind::new("a"),
                BufferKind::new("b"),
                std::iter::empty(),
            ),
            pool,
            cost_hint: 10,
        };
        assert_eq!(t.backend(), TransformerBackend::Subprocess);
        assert_eq!(t.cost_hint(), 10);
    }
}