1use std::collections::HashMap;
24use std::process::Command;
25use std::sync::Mutex;
26use std::time::{Duration, Instant};
27
28use crate::buffer::{BufferKind, ZeroCopyBuffer};
29use crate::ots::pipeline::{OtsError, Transformer, TransformerBackend};
30
31pub fn is_ffmpeg_available() -> bool {
37 use std::sync::OnceLock;
38 static AVAILABLE: OnceLock<bool> = OnceLock::new();
39 *AVAILABLE.get_or_init(|| {
40 Command::new("ffmpeg")
41 .arg("-version")
42 .output()
43 .map(|o| o.status.success())
44 .unwrap_or(false)
45 })
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Hash)]
52pub struct FfmpegPipeline {
53 pub from: BufferKind,
54 pub to: BufferKind,
55 pub argv: Vec<String>,
58}
59
60impl FfmpegPipeline {
61 pub fn new(
62 from: BufferKind,
63 to: BufferKind,
64 argv: impl IntoIterator<Item = String>,
65 ) -> Self {
66 FfmpegPipeline {
67 from,
68 to,
69 argv: argv.into_iter().collect(),
70 }
71 }
72}
73
74#[derive(Debug, Clone)]
77pub struct FfmpegPoolConfig {
78 pub ttl: Duration,
80 pub max_entries: usize,
83}
84
85impl Default for FfmpegPoolConfig {
86 fn default() -> Self {
87 FfmpegPoolConfig {
88 ttl: Duration::from_secs(60),
89 max_entries: 32,
90 }
91 }
92}
93
94#[derive(Debug, Clone)]
95struct PoolEntry {
96 pipeline: FfmpegPipeline,
97 last_used: Instant,
98 hits: u64,
100}
101
102pub struct FfmpegPool {
109 entries: Mutex<HashMap<String, PoolEntry>>,
110 config: FfmpegPoolConfig,
111}
112
113impl FfmpegPool {
114 pub fn new(config: FfmpegPoolConfig) -> Self {
115 FfmpegPool {
116 entries: Mutex::new(HashMap::new()),
117 config,
118 }
119 }
120
121 pub fn register(&self, pipeline: FfmpegPipeline) {
122 if !is_ffmpeg_available() {
123 return;
124 }
125 let key = Self::key_for(&pipeline);
126 let mut guard = self.entries.lock().expect("pool poisoned");
127 self.evict_stale(&mut guard);
128 if guard.len() >= self.config.max_entries {
129 return;
130 }
131 guard.insert(
132 key,
133 PoolEntry {
134 pipeline,
135 last_used: Instant::now(),
136 hits: 0,
137 },
138 );
139 }
140
141 pub fn execute(
144 &self,
145 pipeline: &FfmpegPipeline,
146 payload: &[u8],
147 ) -> Result<Vec<u8>, OtsError> {
148 if !is_ffmpeg_available() {
149 return Err(OtsError::TransformFailed(
150 "ffmpeg not available on this host; register a native \
151 transformer or install ffmpeg to unlock subprocess \
152 paths"
153 .into(),
154 ));
155 }
156
157 if let Ok(mut guard) = self.entries.lock() {
159 let key = Self::key_for(pipeline);
160 let entry = guard.entry(key).or_insert_with(|| PoolEntry {
161 pipeline: pipeline.clone(),
162 last_used: Instant::now(),
163 hits: 0,
164 });
165 entry.last_used = Instant::now();
166 entry.hits += 1;
167 }
168
169 let mut args: Vec<String> = vec![
170 "-y".into(),
171 "-hide_banner".into(),
172 "-loglevel".into(),
173 "error".into(),
174 ];
175 args.extend(pipeline.argv.iter().cloned());
176
177 use std::io::Write;
178 use std::process::Stdio;
179 let mut child = Command::new("ffmpeg")
180 .args(&args)
181 .stdin(Stdio::piped())
182 .stdout(Stdio::piped())
183 .stderr(Stdio::piped())
184 .spawn()
185 .map_err(|e| {
186 OtsError::TransformFailed(format!("ffmpeg spawn: {e}"))
187 })?;
188
189 if let Some(mut stdin) = child.stdin.take() {
190 stdin.write_all(payload).map_err(|e| {
191 OtsError::TransformFailed(format!("ffmpeg stdin: {e}"))
192 })?;
193 }
194 let output = child.wait_with_output().map_err(|e| {
195 OtsError::TransformFailed(format!("ffmpeg wait: {e}"))
196 })?;
197
198 if !output.status.success() {
199 let stderr = String::from_utf8_lossy(&output.stderr);
200 return Err(OtsError::TransformFailed(format!(
201 "ffmpeg exited {:?}: {stderr}",
202 output.status.code()
203 )));
204 }
205 Ok(output.stdout)
206 }
207
208 pub fn snapshot(&self) -> Vec<(String, u64, Duration)> {
209 let guard = self.entries.lock().expect("pool poisoned");
210 let now = Instant::now();
211 guard
212 .iter()
213 .map(|(k, e)| {
214 let age = now.duration_since(e.last_used);
215 (k.clone(), e.hits, age)
216 })
217 .collect()
218 }
219
220 fn evict_stale(&self, entries: &mut HashMap<String, PoolEntry>) {
221 let now = Instant::now();
222 entries.retain(|_, e| now.duration_since(e.last_used) < self.config.ttl);
223 }
224
225 fn key_for(pipeline: &FfmpegPipeline) -> String {
226 format!(
227 "{}->{}|{}",
228 pipeline.from,
229 pipeline.to,
230 pipeline.argv.join(" ")
231 )
232 }
233}
234
235impl Default for FfmpegPool {
236 fn default() -> Self {
237 FfmpegPool::new(FfmpegPoolConfig::default())
238 }
239}
240
241pub struct FfmpegTransformer {
246 pub pipeline: FfmpegPipeline,
247 pub pool: std::sync::Arc<FfmpegPool>,
248 pub cost_hint: u32,
249}
250
251impl Transformer for FfmpegTransformer {
252 fn source_kind(&self) -> BufferKind {
253 self.pipeline.from.clone()
254 }
255
256 fn sink_kind(&self) -> BufferKind {
257 self.pipeline.to.clone()
258 }
259
260 fn backend(&self) -> TransformerBackend {
261 TransformerBackend::Subprocess
262 }
263
264 fn cost_hint(&self) -> u32 {
265 self.cost_hint
266 }
267
268 fn transform(
269 &self,
270 input: &ZeroCopyBuffer,
271 ) -> Result<ZeroCopyBuffer, OtsError> {
272 let out_bytes = self.pool.execute(&self.pipeline, input.as_slice())?;
273 let mut buf =
274 ZeroCopyBuffer::from_bytes(out_bytes, self.sink_kind());
275 if let Some(tenant) = input.tenant_id() {
276 buf = buf.with_tenant(tenant.to_string());
277 }
278 Ok(buf)
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285
286 #[test]
287 fn pool_registers_without_crashing_when_ffmpeg_missing() {
288 let pool = FfmpegPool::default();
291 pool.register(FfmpegPipeline::new(
292 BufferKind::new("custom"),
293 BufferKind::new("other"),
294 std::iter::empty(),
295 ));
296 let _ = pool.snapshot();
299 }
300
301 #[test]
302 fn key_for_is_deterministic() {
303 let p = FfmpegPipeline::new(
304 BufferKind::new("a"),
305 BufferKind::new("b"),
306 ["-f".into(), "s16le".into()],
307 );
308 let k1 = FfmpegPool::key_for(&p);
309 let k2 = FfmpegPool::key_for(&p);
310 assert_eq!(k1, k2);
311 }
312
313 #[test]
314 fn execute_returns_error_when_ffmpeg_missing() {
315 if is_ffmpeg_available() {
321 return;
322 }
323 let pool = FfmpegPool::default();
324 let pipeline = FfmpegPipeline::new(
325 BufferKind::new("a"),
326 BufferKind::new("b"),
327 std::iter::empty(),
328 );
329 let err = pool.execute(&pipeline, b"nothing").unwrap_err();
330 matches!(err, OtsError::TransformFailed(_));
331 }
332
333 #[test]
334 fn transformer_backend_is_subprocess() {
335 let pool = std::sync::Arc::new(FfmpegPool::default());
336 let t = FfmpegTransformer {
337 pipeline: FfmpegPipeline::new(
338 BufferKind::new("a"),
339 BufferKind::new("b"),
340 std::iter::empty(),
341 ),
342 pool,
343 cost_hint: 10,
344 };
345 assert_eq!(t.backend(), TransformerBackend::Subprocess);
346 assert_eq!(t.cost_hint(), 10);
347 }
348}