1use crate::error::{SpatialError, SpatialResult};
2use crate::stereo::generate_stereo_pair;
3use crate::SpatialConfig;
4use image::{DynamicImage, ImageBuffer, RgbImage};
5use std::path::Path;
6use std::process::Stdio;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10
/// Progress report emitted during video conversion.
#[derive(Clone, Debug)]
pub struct VideoProgress {
    // Index of the most recently handled frame (0 before processing starts).
    pub current_frame: u32,
    // Total number of frames in the source video.
    pub total_frames: u32,
    // Name of the current pipeline stage (e.g. "extracting", "processing",
    // "encoding", "complete" — see `process_video`).
    pub stage: String,
    // Completion percentage, clamped to the range [0.0, 100.0].
    pub percent: f64,
}
18
19impl VideoProgress {
20 pub fn new(current_frame: u32, total_frames: u32, stage: String) -> Self {
21 let percent = if total_frames > 0 {
22 (current_frame as f64 / total_frames as f64 * 100.0).min(100.0)
23 } else {
24 0.0
25 };
26 Self {
27 current_frame,
28 total_frames,
29 stage,
30 percent,
31 }
32 }
33}
34
/// Static properties of a video file as reported by ffprobe.
#[derive(Clone, Debug)]
pub struct VideoMetadata {
    // Frame width in pixels.
    pub width: u32,
    // Frame height in pixels.
    pub height: u32,
    // Frames per second; may be fractional (e.g. 29.97 from "30000/1001").
    pub fps: f64,
    // Frame count obtained via ffprobe's `-count_frames`, which decodes the
    // stream, so this is an exact count rather than a header estimate.
    pub total_frames: u32,
    // Duration in seconds (container duration, or derived from frames/fps).
    pub duration: f64,
    // Whether the file contains at least one audio stream.
    pub has_audio: bool,
}
44
/// Callback invoked with progress updates; `Send + Sync` so it can be shared
/// across the async tasks that drive the pipeline.
pub type ProgressCallback = Box<dyn Fn(VideoProgress) + Send + Sync>;
46
47pub async fn get_video_metadata(input_path: &Path) -> SpatialResult<VideoMetadata> {
48 let output = Command::new("ffprobe")
49 .args([
50 "-v",
51 "error",
52 "-select_streams",
53 "v:0",
54 "-count_frames",
55 "-show_entries",
56 "stream=width,height,r_frame_rate,nb_read_frames,duration",
57 "-show_entries",
58 "format=duration",
59 "-of",
60 "csv=p=0",
61 input_path.to_str().unwrap(),
62 ])
63 .output()
64 .await
65 .map_err(|e| {
66 SpatialError::Other(format!(
67 "Failed to run ffprobe (is ffmpeg installed?): {}",
68 e
69 ))
70 })?;
71
72 if !output.status.success() {
73 let stderr = String::from_utf8_lossy(&output.stderr);
74 return Err(SpatialError::Other(format!("ffprobe failed: {}", stderr)));
75 }
76
77 let stdout = String::from_utf8_lossy(&output.stdout);
78 let parts: Vec<&str> = stdout.trim().split(',').collect();
79
80 if parts.len() < 4 {
81 return Err(SpatialError::Other(format!(
82 "Unexpected ffprobe output: {}",
83 stdout
84 )));
85 }
86
87 let width = parts[0]
88 .parse::<u32>()
89 .map_err(|_| SpatialError::Other("Failed to parse width".to_string()))?;
90 let height = parts[1]
91 .parse::<u32>()
92 .map_err(|_| SpatialError::Other("Failed to parse height".to_string()))?;
93
94 let fps = if parts[2].contains('/') {
95 let fps_parts: Vec<&str> = parts[2].split('/').collect();
96 let num: f64 = fps_parts[0].parse().unwrap_or(30.0);
97 let den: f64 = fps_parts[1].parse().unwrap_or(1.0);
98 num / den
99 } else {
100 parts[2].parse().unwrap_or(30.0)
101 };
102
103 let total_frames = parts[3]
104 .parse::<u32>()
105 .map_err(|_| SpatialError::Other("Failed to parse frame count".to_string()))?;
106
107 let duration = parts
108 .get(4)
109 .and_then(|s| s.parse::<f64>().ok())
110 .unwrap_or(total_frames as f64 / fps);
111
112 let audio_output = Command::new("ffprobe")
113 .args([
114 "-v",
115 "error",
116 "-select_streams",
117 "a:0",
118 "-show_entries",
119 "stream=codec_type",
120 "-of",
121 "csv=p=0",
122 input_path.to_str().unwrap(),
123 ])
124 .output()
125 .await
126 .map_err(|e| SpatialError::Other(format!("Failed to check audio: {}", e)))?;
127
128 let has_audio = String::from_utf8_lossy(&audio_output.stdout)
129 .trim()
130 .contains("audio");
131
132 Ok(VideoMetadata {
133 width,
134 height,
135 fps,
136 total_frames,
137 duration,
138 has_audio,
139 })
140}
141
142async fn extract_frames(
143 input_path: &Path,
144 metadata: &VideoMetadata,
145) -> SpatialResult<mpsc::Receiver<Vec<u8>>> {
146 let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
147
148 let width = metadata.width;
149 let height = metadata.height;
150 let frame_size = (width * height * 3) as usize;
151
152 let input_path = input_path.to_path_buf();
153
154 tokio::spawn(async move {
155 let mut child = Command::new("ffmpeg")
156 .args([
157 "-i",
158 input_path.to_str().unwrap(),
159 "-f",
160 "rawvideo",
161 "-pix_fmt",
162 "rgb24",
163 "-vsync",
164 "0",
165 "-",
166 ])
167 .stdout(Stdio::piped())
168 .stderr(Stdio::null())
169 .spawn()
170 .expect("Failed to spawn ffmpeg");
171
172 let stdout = child.stdout.take().expect("Failed to capture stdout");
173 let mut reader = tokio::io::BufReader::new(stdout);
174 let mut frame_buffer = vec![0u8; frame_size];
175
176 loop {
177 match reader.read_exact(&mut frame_buffer).await {
178 Ok(_) => {
179 if tx.send(frame_buffer.clone()).await.is_err() {
180 break;
181 }
182 }
183 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
184 Err(_) => break,
185 }
186 }
187
188 let _ = child.wait().await;
189 });
190
191 Ok(rx)
192}
193
194fn frame_to_image(data: &[u8], width: u32, height: u32) -> SpatialResult<DynamicImage> {
195 let rgb_image = RgbImage::from_raw(width, height, data.to_vec()).ok_or_else(|| {
196 SpatialError::ImageError(format!(
197 "Failed to create image from frame data ({}x{})",
198 width, height
199 ))
200 })?;
201 Ok(DynamicImage::ImageRgb8(rgb_image))
202}
203
204async fn encode_stereo_video(
205 output_path: std::path::PathBuf,
206 metadata: VideoMetadata,
207 mut rx: mpsc::Receiver<(DynamicImage, DynamicImage)>,
208) -> SpatialResult<()> {
209 let width = metadata.width;
210 let height = metadata.height;
211 let fps = metadata.fps;
212
213 let output_width = width * 2;
214 let output_height = height;
215
216 let mut child = Command::new("ffmpeg")
217 .args([
218 "-f",
219 "rawvideo",
220 "-pix_fmt",
221 "rgb24",
222 "-s",
223 &format!("{}x{}", output_width, output_height),
224 "-r",
225 &format!("{}", fps),
226 "-i",
227 "-",
228 "-c:v",
229 "libx264",
230 "-preset",
231 "medium",
232 "-crf",
233 "23",
234 "-pix_fmt",
235 "yuv420p",
236 "-y",
237 output_path.to_str().unwrap(),
238 ])
239 .stdin(Stdio::piped())
240 .stdout(Stdio::null())
241 .stderr(Stdio::null())
242 .spawn()
243 .map_err(|e| SpatialError::Other(format!("Failed to spawn ffmpeg encoder: {}", e)))?;
244
245 let mut stdin = child.stdin.take().expect("Failed to capture stdin");
246
247 while let Some((left, right)) = rx.recv().await {
248 let mut sbs_image = ImageBuffer::new(output_width, output_height);
249
250 let left_rgb = left.to_rgb8();
251 for y in 0..height {
252 for x in 0..width {
253 let pixel = left_rgb.get_pixel(x, y);
254 sbs_image.put_pixel(x, y, *pixel);
255 }
256 }
257
258 let right_rgb = right.to_rgb8();
259 for y in 0..height {
260 for x in 0..width {
261 let pixel = right_rgb.get_pixel(x, y);
262 sbs_image.put_pixel(width + x, y, *pixel);
263 }
264 }
265
266 stdin
267 .write_all(&sbs_image.into_raw())
268 .await
269 .map_err(|e| SpatialError::IoError(format!("Failed to write frame: {}", e)))?;
270 }
271
272 drop(stdin);
273
274 let status = child
275 .wait()
276 .await
277 .map_err(|e| SpatialError::Other(format!("ffmpeg encoding failed: {}", e)))?;
278
279 if !status.success() {
280 return Err(SpatialError::Other(
281 "ffmpeg encoding exited with error".to_string(),
282 ));
283 }
284
285 Ok(())
286}
287
288pub async fn process_video(
289 input_path: &Path,
290 output_path: &Path,
291 config: SpatialConfig,
292 progress_cb: Option<ProgressCallback>,
293) -> SpatialResult<()> {
294 if !input_path.exists() {
295 return Err(SpatialError::IoError(format!(
296 "Input file not found: {:?}",
297 input_path
298 )));
299 }
300
301 let metadata = get_video_metadata(input_path).await?;
302
303 crate::model::ensure_model_exists::<fn(u64, u64)>(&config.encoder_size, None).await?;
304
305 #[cfg(all(target_os = "macos", feature = "coreml"))]
306 let estimator = {
307 let model_path = crate::model::find_model(&config.encoder_size)?;
308 let model_str = model_path.to_str().ok_or_else(|| {
309 SpatialError::ModelError("Invalid model path encoding".to_string())
310 })?;
311 std::sync::Arc::new(crate::depth_coreml::CoreMLDepthEstimator::new(model_str)?)
312 };
313
314 let mut frame_rx = extract_frames(input_path, &metadata).await?;
315
316 let (processed_tx, processed_rx) = mpsc::channel::<(DynamicImage, DynamicImage)>(10);
317
318 let encode_handle = tokio::spawn(encode_stereo_video(
319 output_path.to_path_buf(),
320 metadata.clone(),
321 processed_rx,
322 ));
323
324 let mut frame_count = 0u32;
325 let total_frames = metadata.total_frames;
326
327 if let Some(ref cb) = progress_cb {
328 cb(VideoProgress::new(0, total_frames, "extracting".to_string()));
329 }
330
331 while let Some(frame_data) = frame_rx.recv().await {
332 let frame = frame_to_image(&frame_data, metadata.width, metadata.height)?;
333
334 frame_count += 1;
335 if let Some(ref cb) = progress_cb {
336 if frame_count % 10 == 0 || frame_count == total_frames {
337 cb(VideoProgress::new(
338 frame_count,
339 total_frames,
340 "processing".to_string(),
341 ));
342 }
343 }
344
345 #[cfg(all(target_os = "macos", feature = "coreml"))]
346 let depth_map = estimator.estimate(&frame)?;
347
348 #[cfg(not(all(target_os = "macos", feature = "coreml")))]
349 let depth_map = {
350 #[cfg(feature = "onnx")]
351 {
352 let model_path = crate::model::find_model(&config.encoder_size)?;
355 let est = crate::depth::OnnxDepthEstimator::new(model_path.to_str().unwrap())?;
356 est.estimate(&frame)?
357 }
358 #[cfg(not(feature = "onnx"))]
359 {
360 return Err(SpatialError::ConfigError(
361 "No depth backend enabled. Enable 'coreml' or 'onnx' feature.".to_string(),
362 ));
363 }
364 };
365
366 let (left, right) = generate_stereo_pair(&frame, &depth_map, config.max_disparity)?;
367
368 if processed_tx.send((left, right)).await.is_err() {
369 return Err(SpatialError::Other(
370 "Encoder stopped unexpectedly".to_string(),
371 ));
372 }
373 }
374
375 drop(processed_tx);
376
377 if let Some(ref cb) = progress_cb {
378 cb(VideoProgress::new(
379 total_frames,
380 total_frames,
381 "encoding".to_string(),
382 ));
383 }
384
385 encode_handle
386 .await
387 .map_err(|e| SpatialError::Other(format!("Encoding task failed: {}", e)))??;
388
389 if let Some(ref cb) = progress_cb {
390 cb(VideoProgress::new(
391 total_frames,
392 total_frames,
393 "complete".to_string(),
394 ));
395 }
396
397 Ok(())
398}