Skip to main content

spatial_maker/
video.rs

1use crate::error::{SpatialError, SpatialResult};
2use crate::stereo::generate_stereo_pair;
3use crate::SpatialConfig;
4use image::{DynamicImage, ImageBuffer, RgbImage};
5use std::path::Path;
6use std::process::Stdio;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10
/// Snapshot of how far a video conversion has advanced.
#[derive(Clone, Debug)]
pub struct VideoProgress {
	/// Number of frames handled so far.
	pub current_frame: u32,
	/// Number of frames the whole job is expected to contain.
	pub total_frames: u32,
	/// Free-form label of the pipeline stage being reported.
	pub stage: String,
	/// Completion percentage derived from the two frame counters (0.0..=100.0).
	pub percent: f64,
}

impl VideoProgress {
	/// Builds a progress report and derives `percent` from the frame counters.
	///
	/// `percent` is `0.0` when `total_frames` is zero and is capped at `100.0`
	/// when `current_frame` exceeds `total_frames`.
	pub fn new(current_frame: u32, total_frames: u32, stage: String) -> Self {
		let percent = match total_frames {
			// Avoid dividing by zero when the frame count is unknown.
			0 => 0.0,
			total => {
				let ratio = current_frame as f64 / total as f64;
				(ratio * 100.0).min(100.0)
			}
		};

		Self {
			current_frame,
			total_frames,
			stage,
			percent,
		}
	}
}
34
/// Properties of an input video as probed by `get_video_metadata`.
#[derive(Clone, Debug)]
pub struct VideoMetadata {
	/// Width of the first video stream, in pixels.
	pub width: u32,
	/// Height of the first video stream, in pixels.
	pub height: u32,
	/// Frames per second, parsed from the stream's `r_frame_rate`.
	pub fps: f64,
	/// Frame count: the stream's `nb_frames` when available, otherwise
	/// estimated as `duration * fps`.
	pub total_frames: u32,
	/// Duration as reported by ffprobe (stream value, else container value,
	/// else `0.0`).
	pub duration: f64,
	/// Whether ffprobe found at least one audio stream.
	pub has_audio: bool,
}
44
45pub type ProgressCallback = Box<dyn Fn(VideoProgress) + Send + Sync>;
46
47pub async fn get_video_metadata(input_path: &Path) -> SpatialResult<VideoMetadata> {
48	let input_str = input_path
49		.to_str()
50		.ok_or_else(|| SpatialError::Other("Invalid input path encoding".to_string()))?;
51
52	let output = Command::new("ffprobe")
53		.args([
54			"-v", "error",
55			"-select_streams", "v:0",
56			"-show_entries", "stream=width,height,r_frame_rate,nb_frames,duration",
57			"-show_entries", "format=duration",
58			"-of", "json",
59			input_str,
60		])
61		.output()
62		.await
63		.map_err(|e| {
64			SpatialError::Other(format!(
65				"Failed to run ffprobe (is ffmpeg installed?): {}",
66				e
67			))
68		})?;
69
70	if !output.status.success() {
71		let stderr = String::from_utf8_lossy(&output.stderr);
72		return Err(SpatialError::Other(format!("ffprobe failed: {}", stderr)));
73	}
74
75	let stdout = String::from_utf8_lossy(&output.stdout);
76	let json: serde_json::Value = serde_json::from_str(&stdout)
77		.map_err(|e| SpatialError::Other(format!("Failed to parse ffprobe JSON: {}", e)))?;
78
79	let stream = json["streams"]
80		.as_array()
81		.and_then(|s| s.first())
82		.ok_or_else(|| SpatialError::Other("No video stream found".to_string()))?;
83
84	let width = stream["width"]
85		.as_u64()
86		.ok_or_else(|| SpatialError::Other("Failed to parse width".to_string()))? as u32;
87	let height = stream["height"]
88		.as_u64()
89		.ok_or_else(|| SpatialError::Other("Failed to parse height".to_string()))? as u32;
90
91	let fps = stream["r_frame_rate"]
92		.as_str()
93		.map(|s| {
94			if let Some((num, den)) = s.split_once('/') {
95				let n: f64 = num.parse().unwrap_or(30.0);
96				let d: f64 = den.parse().unwrap_or(1.0);
97				n / d
98			} else {
99				s.parse().unwrap_or(30.0)
100			}
101		})
102		.unwrap_or(30.0);
103
104	let duration = stream["duration"]
105		.as_str()
106		.and_then(|s| s.parse::<f64>().ok())
107		.or_else(|| {
108			json["format"]["duration"]
109				.as_str()
110				.and_then(|s| s.parse::<f64>().ok())
111		})
112		.unwrap_or(0.0);
113
114	let total_frames = stream["nb_frames"]
115		.as_str()
116		.and_then(|s| s.parse::<u32>().ok())
117		.unwrap_or_else(|| (duration * fps).round() as u32);
118
119	let audio_output = Command::new("ffprobe")
120		.args([
121			"-v", "error",
122			"-select_streams", "a:0",
123			"-show_entries", "stream=codec_type",
124			"-of", "csv=p=0",
125			input_str,
126		])
127		.output()
128		.await
129		.map_err(|e| SpatialError::Other(format!("Failed to check audio: {}", e)))?;
130
131	let has_audio = String::from_utf8_lossy(&audio_output.stdout)
132		.trim()
133		.contains("audio");
134
135	Ok(VideoMetadata {
136		width,
137		height,
138		fps,
139		total_frames,
140		duration,
141		has_audio,
142	})
143}
144
145async fn extract_frames(
146	input_path: &Path,
147	metadata: &VideoMetadata,
148) -> SpatialResult<mpsc::Receiver<Vec<u8>>> {
149	let (tx, rx) = mpsc::channel::<Vec<u8>>(10);
150
151	let width = metadata.width;
152	let height = metadata.height;
153	let frame_size = (width * height * 3) as usize;
154
155	let input_path = input_path.to_path_buf();
156
157	tokio::spawn(async move {
158		let mut child = Command::new("ffmpeg")
159			.args([
160				"-i",
161				input_path.to_str().unwrap(),
162				"-f",
163				"rawvideo",
164				"-pix_fmt",
165				"rgb24",
166				"-vsync",
167				"0",
168				"-",
169			])
170			.stdout(Stdio::piped())
171			.stderr(Stdio::null())
172			.spawn()
173			.expect("Failed to spawn ffmpeg");
174
175		let stdout = child.stdout.take().expect("Failed to capture stdout");
176		let mut reader = tokio::io::BufReader::new(stdout);
177		let mut frame_buffer = vec![0u8; frame_size];
178
179		loop {
180			match reader.read_exact(&mut frame_buffer).await {
181				Ok(_) => {
182					if tx.send(frame_buffer.clone()).await.is_err() {
183						break;
184					}
185				}
186				Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
187				Err(_) => break,
188			}
189		}
190
191		let _ = child.wait().await;
192	});
193
194	Ok(rx)
195}
196
197fn frame_to_image(data: &[u8], width: u32, height: u32) -> SpatialResult<DynamicImage> {
198	let rgb_image = RgbImage::from_raw(width, height, data.to_vec()).ok_or_else(|| {
199		SpatialError::ImageError(format!(
200			"Failed to create image from frame data ({}x{})",
201			width, height
202		))
203	})?;
204	Ok(DynamicImage::ImageRgb8(rgb_image))
205}
206
207async fn encode_stereo_video(
208	output_path: std::path::PathBuf,
209	metadata: VideoMetadata,
210	mut rx: mpsc::Receiver<(DynamicImage, DynamicImage)>,
211) -> SpatialResult<()> {
212	let width = metadata.width;
213	let height = metadata.height;
214	let fps = metadata.fps;
215
216	let output_width = width * 2;
217	let output_height = height;
218
219	let mut child = Command::new("ffmpeg")
220		.args([
221			"-f",
222			"rawvideo",
223			"-pix_fmt",
224			"rgb24",
225			"-s",
226			&format!("{}x{}", output_width, output_height),
227			"-r",
228			&format!("{}", fps),
229			"-i",
230			"-",
231			"-c:v",
232			"libx264",
233			"-preset",
234			"medium",
235			"-crf",
236			"23",
237			"-pix_fmt",
238			"yuv420p",
239			"-y",
240			output_path.to_str().unwrap(),
241		])
242		.stdin(Stdio::piped())
243		.stdout(Stdio::null())
244		.stderr(Stdio::null())
245		.spawn()
246		.map_err(|e| SpatialError::Other(format!("Failed to spawn ffmpeg encoder: {}", e)))?;
247
248	let mut stdin = child.stdin.take().expect("Failed to capture stdin");
249
250	while let Some((left, right)) = rx.recv().await {
251		let mut sbs_image = ImageBuffer::new(output_width, output_height);
252
253		let left_rgb = left.to_rgb8();
254		for y in 0..height {
255			for x in 0..width {
256				let pixel = left_rgb.get_pixel(x, y);
257				sbs_image.put_pixel(x, y, *pixel);
258			}
259		}
260
261		let right_rgb = right.to_rgb8();
262		for y in 0..height {
263			for x in 0..width {
264				let pixel = right_rgb.get_pixel(x, y);
265				sbs_image.put_pixel(width + x, y, *pixel);
266			}
267		}
268
269		stdin
270			.write_all(&sbs_image.into_raw())
271			.await
272			.map_err(|e| SpatialError::IoError(format!("Failed to write frame: {}", e)))?;
273	}
274
275	drop(stdin);
276
277	let status = child
278		.wait()
279		.await
280		.map_err(|e| SpatialError::Other(format!("ffmpeg encoding failed: {}", e)))?;
281
282	if !status.success() {
283		return Err(SpatialError::Other(
284			"ffmpeg encoding exited with error".to_string(),
285		));
286	}
287
288	Ok(())
289}
290
291pub async fn process_video(
292	input_path: &Path,
293	output_path: &Path,
294	config: SpatialConfig,
295	progress_cb: Option<ProgressCallback>,
296) -> SpatialResult<()> {
297	if !input_path.exists() {
298		return Err(SpatialError::IoError(format!(
299			"Input file not found: {:?}",
300			input_path
301		)));
302	}
303
304	let metadata = get_video_metadata(input_path).await?;
305
306	crate::model::ensure_model_exists::<fn(u64, u64)>(&config.encoder_size, None).await?;
307
308	#[cfg(all(target_os = "macos", feature = "coreml"))]
309	let estimator = {
310		let model_path = crate::model::find_model(&config.encoder_size)?;
311		let model_str = model_path.to_str().ok_or_else(|| {
312			SpatialError::ModelError("Invalid model path encoding".to_string())
313		})?;
314		std::sync::Arc::new(crate::depth_coreml::CoreMLDepthEstimator::new(model_str)?)
315	};
316
317	let mut frame_rx = extract_frames(input_path, &metadata).await?;
318
319	let (processed_tx, processed_rx) = mpsc::channel::<(DynamicImage, DynamicImage)>(10);
320
321	let encode_handle = tokio::spawn(encode_stereo_video(
322		output_path.to_path_buf(),
323		metadata.clone(),
324		processed_rx,
325	));
326
327	let mut frame_count = 0u32;
328	let total_frames = metadata.total_frames;
329
330	if let Some(ref cb) = progress_cb {
331		cb(VideoProgress::new(0, total_frames, "extracting".to_string()));
332	}
333
334	while let Some(frame_data) = frame_rx.recv().await {
335		let frame = frame_to_image(&frame_data, metadata.width, metadata.height)?;
336
337		frame_count += 1;
338		if let Some(ref cb) = progress_cb {
339			if frame_count % 10 == 0 || frame_count == total_frames {
340				cb(VideoProgress::new(
341					frame_count,
342					total_frames,
343					"processing".to_string(),
344				));
345			}
346		}
347
348		#[cfg(all(target_os = "macos", feature = "coreml"))]
349		let depth_map = estimator.estimate(&frame)?;
350
351		#[cfg(not(all(target_os = "macos", feature = "coreml")))]
352		let depth_map = {
353			#[cfg(feature = "onnx")]
354			{
355				// For ONNX, we'd need to cache the estimator too
356				// For now, this is a placeholder - ONNX video is not the primary path
357				let model_path = crate::model::find_model(&config.encoder_size)?;
358				let est = crate::depth::OnnxDepthEstimator::new(model_path.to_str().unwrap())?;
359				est.estimate(&frame)?
360			}
361			#[cfg(not(feature = "onnx"))]
362			{
363				return Err(SpatialError::ConfigError(
364					"No depth backend enabled. Enable 'coreml' or 'onnx' feature.".to_string(),
365				));
366			}
367		};
368
369		let (left, right) = generate_stereo_pair(&frame, &depth_map, config.max_disparity)?;
370
371		if processed_tx.send((left, right)).await.is_err() {
372			return Err(SpatialError::Other(
373				"Encoder stopped unexpectedly".to_string(),
374			));
375		}
376	}
377
378	drop(processed_tx);
379
380	if let Some(ref cb) = progress_cb {
381		cb(VideoProgress::new(
382			total_frames,
383			total_frames,
384			"encoding".to_string(),
385		));
386	}
387
388	encode_handle
389		.await
390		.map_err(|e| SpatialError::Other(format!("Encoding task failed: {}", e)))??;
391
392	if let Some(ref cb) = progress_cb {
393		cb(VideoProgress::new(
394			total_frames,
395			total_frames,
396			"complete".to_string(),
397		));
398	}
399
400	Ok(())
401}