1use crate::error::{SpatialError, SpatialResult};
2use crate::stereo::generate_stereo_pair;
3use crate::SpatialConfig;
4use image::{DynamicImage, ImageBuffer, RgbImage};
5use std::path::Path;
6use std::process::Stdio;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10
/// Snapshot of how far a video conversion has progressed.
#[derive(Clone, Debug)]
pub struct VideoProgress {
    /// Number of frames handled so far.
    pub current_frame: u32,
    /// Total number of frames expected; `0` means unknown.
    pub total_frames: u32,
    /// Label of the pipeline stage that produced this snapshot.
    pub stage: String,
    /// Completion percentage, never above `100.0`.
    pub percent: f64,
}

impl VideoProgress {
    /// Builds a progress snapshot and derives `percent` from the frame counts.
    ///
    /// The percentage is capped at 100 and is `0.0` whenever `total_frames` is 0.
    pub fn new(current_frame: u32, total_frames: u32, stage: String) -> Self {
        let percent = match total_frames {
            0 => 0.0,
            total => (f64::from(current_frame) / f64::from(total) * 100.0).min(100.0),
        };

        Self {
            current_frame,
            total_frames,
            stage,
            percent,
        }
    }
}
34
35#[derive(Clone, Debug)]
36pub struct VideoMetadata {
37 pub width: u32,
38 pub height: u32,
39 pub fps: f64,
40 pub total_frames: u32,
41 pub duration: f64,
42 pub has_audio: bool,
43}
44
45pub type ProgressCallback = Box<dyn Fn(VideoProgress) + Send + Sync>;
46
47pub async fn get_video_metadata(input_path: &Path) -> SpatialResult<VideoMetadata> {
48 let input_str = input_path
49 .to_str()
50 .ok_or_else(|| SpatialError::Other("Invalid input path encoding".to_string()))?;
51
52 let output = Command::new("ffprobe")
53 .args([
54 "-v", "error",
55 "-select_streams", "v:0",
56 "-show_entries", "stream=width,height,r_frame_rate,nb_frames,duration",
57 "-show_entries", "format=duration",
58 "-of", "json",
59 input_str,
60 ])
61 .output()
62 .await
63 .map_err(|e| {
64 SpatialError::Other(format!(
65 "Failed to run ffprobe (is ffmpeg installed?): {}",
66 e
67 ))
68 })?;
69
70 if !output.status.success() {
71 let stderr = String::from_utf8_lossy(&output.stderr);
72 return Err(SpatialError::Other(format!("ffprobe failed: {}", stderr)));
73 }
74
75 let stdout = String::from_utf8_lossy(&output.stdout);
76 let json: serde_json::Value = serde_json::from_str(&stdout)
77 .map_err(|e| SpatialError::Other(format!("Failed to parse ffprobe JSON: {}", e)))?;
78
79 let stream = json["streams"]
80 .as_array()
81 .and_then(|s| s.first())
82 .ok_or_else(|| SpatialError::Other("No video stream found".to_string()))?;
83
84 let width = stream["width"]
85 .as_u64()
86 .ok_or_else(|| SpatialError::Other("Failed to parse width".to_string()))? as u32;
87 let height = stream["height"]
88 .as_u64()
89 .ok_or_else(|| SpatialError::Other("Failed to parse height".to_string()))? as u32;
90
91 let fps = stream["r_frame_rate"]
92 .as_str()
93 .map(|s| {
94 if let Some((num, den)) = s.split_once('/') {
95 let n: f64 = num.parse().unwrap_or(30.0);
96 let d: f64 = den.parse().unwrap_or(1.0);
97 n / d
98 } else {
99 s.parse().unwrap_or(30.0)
100 }
101 })
102 .unwrap_or(30.0);
103
104 let duration = stream["duration"]
105 .as_str()
106 .and_then(|s| s.parse::<f64>().ok())
107 .or_else(|| {
108 json["format"]["duration"]
109 .as_str()
110 .and_then(|s| s.parse::<f64>().ok())
111 })
112 .unwrap_or(0.0);
113
114 let total_frames = stream["nb_frames"]
115 .as_str()
116 .and_then(|s| s.parse::<u32>().ok())
117 .unwrap_or_else(|| (duration * fps).round() as u32);
118
119 let audio_output = Command::new("ffprobe")
120 .args([
121 "-v", "error",
122 "-select_streams", "a:0",
123 "-show_entries", "stream=codec_type",
124 "-of", "csv=p=0",
125 input_str,
126 ])
127 .output()
128 .await
129 .map_err(|e| SpatialError::Other(format!("Failed to check audio: {}", e)))?;
130
131 let has_audio = String::from_utf8_lossy(&audio_output.stdout)
132 .trim()
133 .contains("audio");
134
135 Ok(VideoMetadata {
136 width,
137 height,
138 fps,
139 total_frames,
140 duration,
141 has_audio,
142 })
143}
144
145async fn extract_frames(
146 input_path: &Path,
147 metadata: &VideoMetadata,
148) -> SpatialResult<mpsc::Receiver<Vec<u8>>> {
149 let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
150
151 let width = metadata.width;
152 let height = metadata.height;
153 let frame_size = (width * height * 3) as usize;
154
155 let input_path = input_path.to_path_buf();
156
157 tokio::spawn(async move {
158 let mut child = Command::new("ffmpeg")
159 .args([
160 "-i",
161 input_path.to_str().unwrap(),
162 "-f",
163 "rawvideo",
164 "-pix_fmt",
165 "rgb24",
166 "-vsync",
167 "0",
168 "-",
169 ])
170 .stdout(Stdio::piped())
171 .stderr(Stdio::null())
172 .spawn()
173 .expect("Failed to spawn ffmpeg");
174
175 let stdout = child.stdout.take().expect("Failed to capture stdout");
176 let mut reader = tokio::io::BufReader::new(stdout);
177 let mut frame_buffer = vec![0u8; frame_size];
178
179 loop {
180 match reader.read_exact(&mut frame_buffer).await {
181 Ok(_) => {
182 if tx.send(frame_buffer.clone()).await.is_err() {
183 break;
184 }
185 }
186 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
187 Err(_) => break,
188 }
189 }
190
191 let _ = child.wait().await;
192 });
193
194 Ok(rx)
195}
196
197fn frame_to_image(data: &[u8], width: u32, height: u32) -> SpatialResult<DynamicImage> {
198 let rgb_image = RgbImage::from_raw(width, height, data.to_vec()).ok_or_else(|| {
199 SpatialError::ImageError(format!(
200 "Failed to create image from frame data ({}x{})",
201 width, height
202 ))
203 })?;
204 Ok(DynamicImage::ImageRgb8(rgb_image))
205}
206
207async fn encode_stereo_video(
208 output_path: std::path::PathBuf,
209 metadata: VideoMetadata,
210 mut rx: mpsc::Receiver<(DynamicImage, DynamicImage)>,
211) -> SpatialResult<()> {
212 let width = metadata.width;
213 let height = metadata.height;
214 let fps = metadata.fps;
215
216 let output_width = width * 2;
217 let output_height = height;
218
219 let mut child = Command::new("ffmpeg")
220 .args([
221 "-f",
222 "rawvideo",
223 "-pix_fmt",
224 "rgb24",
225 "-s",
226 &format!("{}x{}", output_width, output_height),
227 "-r",
228 &format!("{}", fps),
229 "-i",
230 "-",
231 "-c:v",
232 "libx264",
233 "-preset",
234 "medium",
235 "-crf",
236 "23",
237 "-pix_fmt",
238 "yuv420p",
239 "-y",
240 output_path.to_str().unwrap(),
241 ])
242 .stdin(Stdio::piped())
243 .stdout(Stdio::null())
244 .stderr(Stdio::null())
245 .spawn()
246 .map_err(|e| SpatialError::Other(format!("Failed to spawn ffmpeg encoder: {}", e)))?;
247
248 let mut stdin = child.stdin.take().expect("Failed to capture stdin");
249
250 while let Some((left, right)) = rx.recv().await {
251 let mut sbs_image = ImageBuffer::new(output_width, output_height);
252
253 let left_rgb = left.to_rgb8();
254 for y in 0..height {
255 for x in 0..width {
256 let pixel = left_rgb.get_pixel(x, y);
257 sbs_image.put_pixel(x, y, *pixel);
258 }
259 }
260
261 let right_rgb = right.to_rgb8();
262 for y in 0..height {
263 for x in 0..width {
264 let pixel = right_rgb.get_pixel(x, y);
265 sbs_image.put_pixel(width + x, y, *pixel);
266 }
267 }
268
269 stdin
270 .write_all(&sbs_image.into_raw())
271 .await
272 .map_err(|e| SpatialError::IoError(format!("Failed to write frame: {}", e)))?;
273 }
274
275 drop(stdin);
276
277 let status = child
278 .wait()
279 .await
280 .map_err(|e| SpatialError::Other(format!("ffmpeg encoding failed: {}", e)))?;
281
282 if !status.success() {
283 return Err(SpatialError::Other(
284 "ffmpeg encoding exited with error".to_string(),
285 ));
286 }
287
288 Ok(())
289}
290
291pub async fn process_video(
292 input_path: &Path,
293 output_path: &Path,
294 config: SpatialConfig,
295 progress_cb: Option<ProgressCallback>,
296) -> SpatialResult<()> {
297 if !input_path.exists() {
298 return Err(SpatialError::IoError(format!(
299 "Input file not found: {:?}",
300 input_path
301 )));
302 }
303
304 let metadata = get_video_metadata(input_path).await?;
305
306 crate::model::ensure_model_exists::<fn(u64, u64)>(&config.encoder_size, None).await?;
307
308 #[cfg(all(target_os = "macos", feature = "coreml"))]
309 let estimator = {
310 let model_path = crate::model::find_model(&config.encoder_size)?;
311 let model_str = model_path.to_str().ok_or_else(|| {
312 SpatialError::ModelError("Invalid model path encoding".to_string())
313 })?;
314 std::sync::Arc::new(crate::depth_coreml::CoreMLDepthEstimator::new(model_str)?)
315 };
316
317 let mut frame_rx = extract_frames(input_path, &metadata).await?;
318
319 let (processed_tx, processed_rx) = mpsc::channel::<(DynamicImage, DynamicImage)>(10);
320
321 let encode_handle = tokio::spawn(encode_stereo_video(
322 output_path.to_path_buf(),
323 metadata.clone(),
324 processed_rx,
325 ));
326
327 let mut frame_count = 0u32;
328 let total_frames = metadata.total_frames;
329
330 if let Some(ref cb) = progress_cb {
331 cb(VideoProgress::new(0, total_frames, "extracting".to_string()));
332 }
333
334 while let Some(frame_data) = frame_rx.recv().await {
335 let frame = frame_to_image(&frame_data, metadata.width, metadata.height)?;
336
337 frame_count += 1;
338 if let Some(ref cb) = progress_cb {
339 if frame_count % 10 == 0 || frame_count == total_frames {
340 cb(VideoProgress::new(
341 frame_count,
342 total_frames,
343 "processing".to_string(),
344 ));
345 }
346 }
347
348 #[cfg(all(target_os = "macos", feature = "coreml"))]
349 let depth_map = estimator.estimate(&frame)?;
350
351 #[cfg(not(all(target_os = "macos", feature = "coreml")))]
352 let depth_map = {
353 #[cfg(feature = "onnx")]
354 {
355 let model_path = crate::model::find_model(&config.encoder_size)?;
358 let est = crate::depth::OnnxDepthEstimator::new(model_path.to_str().unwrap())?;
359 est.estimate(&frame)?
360 }
361 #[cfg(not(feature = "onnx"))]
362 {
363 return Err(SpatialError::ConfigError(
364 "No depth backend enabled. Enable 'coreml' or 'onnx' feature.".to_string(),
365 ));
366 }
367 };
368
369 let (left, right) = generate_stereo_pair(&frame, &depth_map, config.max_disparity)?;
370
371 if processed_tx.send((left, right)).await.is_err() {
372 return Err(SpatialError::Other(
373 "Encoder stopped unexpectedly".to_string(),
374 ));
375 }
376 }
377
378 drop(processed_tx);
379
380 if let Some(ref cb) = progress_cb {
381 cb(VideoProgress::new(
382 total_frames,
383 total_frames,
384 "encoding".to_string(),
385 ));
386 }
387
388 encode_handle
389 .await
390 .map_err(|e| SpatialError::Other(format!("Encoding task failed: {}", e)))??;
391
392 if let Some(ref cb) = progress_cb {
393 cb(VideoProgress::new(
394 total_frames,
395 total_frames,
396 "complete".to_string(),
397 ));
398 }
399
400 Ok(())
401}