1use anyhow::{Context, Result};
42use hexz_common::constants::DEFAULT_ZSTD_LEVEL;
43use hexz_core::format::magic::HEADER_SIZE;
44use hexz_core::store::StorageBackend;
45use serde_json::Value;
46use std::fs;
47use std::io::{Read, Write};
48use std::os::unix::net::UnixStream;
49use std::path::PathBuf;
50use std::process::Command;
51use std::sync::Arc;
52use std::sync::atomic::{AtomicBool, Ordering};
53use std::thread;
54use std::time::{Duration, Instant};
55
56const MOUNT_WAIT_RETRIES: usize = 50;
62
63const MOUNT_WAIT_SLEEP: Duration = Duration::from_millis(100);
68
69const QEMU_INIT_TIMEOUT: Duration = Duration::from_secs(10);
74
75const QMP_POLL_ITERATIONS: usize = 20;
80
81const QMP_POLL_SLEEP: Duration = Duration::from_millis(500);
86
87const QMP_CONNECT_RETRY_SLEEP: Duration = Duration::from_millis(200);
92
93#[allow(clippy::too_many_arguments)]
109pub fn run(
110 snap_path: String,
111 ram_size: Option<String>,
112 kernel_mode: bool,
113 persist_path: Option<PathBuf>,
114 qmp_socket: Option<PathBuf>,
115 network_mode: String,
116 backend_type: String,
117 no_graphics: bool,
118 vnc: bool,
119) -> Result<()> {
120 match backend_type.as_str() {
121 "qemu" => boot_qemu(
122 snap_path,
123 ram_size,
124 kernel_mode,
125 persist_path,
126 qmp_socket,
127 network_mode,
128 no_graphics,
129 vnc,
130 ),
131 "firecracker" => {
132 #[cfg(feature = "firecracker")]
133 return boot_firecracker(snap_path, ram_size, persist_path, network_mode);
134 #[cfg(not(feature = "firecracker"))]
135 anyhow::bail!(
136 "Firecracker backend is not available. Compile with --features firecracker"
137 )
138 }
139 other => anyhow::bail!("Unknown backend: {}", other),
140 }
141}
142
143#[cfg(feature = "firecracker")]
144fn boot_firecracker(
145 _snap_path: String,
146 _ram_size: Option<String>,
147 _persist_path: Option<PathBuf>,
148 _network_mode: String,
149) -> anyhow::Result<()> {
150 anyhow::bail!("Firecracker backend is not yet fully implemented.")
156}
157
158fn boot_qemu(
159 snap_path: String,
160 ram_size: Option<String>,
161 kernel_mode: bool,
162 persist_path: Option<PathBuf>,
163 qmp_socket: Option<PathBuf>,
164 network_mode: String,
165 no_graphics: bool,
166 vnc: bool,
167) -> Result<()> {
168 let mount_dir = tempfile::tempdir().context("Failed to create temp mount dir")?;
169 let mount_path = mount_dir.path().to_path_buf();
170
171 let (overlay_path, _temp_guard) = if let Some(p) = persist_path {
172 if let Some(parent) = p.parent() {
173 fs::create_dir_all(parent)?;
174 }
175
176 let meta = p.with_extension("meta");
177 if !meta.exists() {
178 std::fs::File::create(&meta)?;
179 }
180
181 if !p.exists() {
182 std::fs::File::create(&p).context("Failed to create persistent overlay file")?;
183 }
184 (p, None)
185 } else {
186 let t = tempfile::NamedTempFile::new()?;
187 let p = t.path().to_path_buf();
188
189 let meta = p.with_extension("meta");
190 std::fs::File::create(&meta)?;
191
192 (p, Some(t))
193 };
194
195 println!("Mounting {} at {}...", snap_path, mount_path.display());
196
197 println!("Overlay path: {:?}", overlay_path);
198 if _temp_guard.is_some() {
199 println!("(Ephemeral mode: Overlay will be deleted on exit)");
200 }
201
202 let snap_path_clone = snap_path.clone();
203 let mount_path_clone = mount_path.clone();
204 let overlay_path_clone = overlay_path.clone();
205
206 let mounted_flag = Arc::new(AtomicBool::new(false));
207 let mounted_clone = mounted_flag.clone();
208
209 let mount_handle = thread::spawn(move || -> Result<()> {
210 let backend = Arc::new(
211 hexz_core::store::local::file::FileBackend::new(std::path::Path::new(&snap_path_clone))
212 .context("Failed to open snapshot file")?,
213 );
214
215 let header_bytes = backend
216 .read_exact(0, HEADER_SIZE)
217 .context("Failed to read header")?;
218 let header: hexz_core::format::header::Header =
219 bincode::deserialize(&header_bytes).context("Failed to deserialize header")?;
220
221 let compressor: Box<dyn hexz_core::algo::compression::Compressor> = match header.compression
222 {
223 hexz_core::format::header::CompressionType::Zstd => {
224 let dict = if let (Some(off), Some(len)) =
225 (header.dictionary_offset, header.dictionary_length)
226 {
227 Some(
228 backend
229 .read_exact(off, len as usize)
230 .context("Failed to read dictionary")?
231 .to_vec(),
232 )
233 } else {
234 None
235 };
236 Box::new(hexz_core::algo::compression::zstd::ZstdCompressor::new(
237 DEFAULT_ZSTD_LEVEL,
238 dict,
239 ))
240 }
241 _ => Box::new(hexz_core::algo::compression::lz4::Lz4Compressor::new()),
242 };
243
244 let snap =
245 hexz_core::File::new(backend, compressor, None).context("Failed to create File")?;
246
247 mounted_clone.store(true, Ordering::Release);
248
249 hexz_fuse::mount_fs(
251 snap,
252 &mount_path_clone,
253 Some(&overlay_path_clone),
254 1000,
255 1000,
256 )
257 .context("Mount failed")?;
258
259 Ok(())
260 });
261
262 print!("Waiting for mount...");
263 let mut retries = 0;
264 loop {
265 if retries > MOUNT_WAIT_RETRIES {
266 anyhow::bail!("Timed out waiting for mount");
267 }
268 if mount_handle.is_finished() {
269 match mount_handle.join() {
270 Ok(Err(e)) => return Err(e.context("Mount thread failed")),
271 Ok(Ok(())) => anyhow::bail!("Mount thread exited unexpectedly (unmounted?)"),
272 Err(e) => std::panic::resume_unwind(e),
273 }
274 }
275 if mounted_flag.load(Ordering::Acquire) && mount_path.join("disk").exists() {
276 println!(" Ready.");
277 break;
278 }
279 thread::sleep(MOUNT_WAIT_SLEEP);
280 print!(".");
281 use std::io::Write;
282 std::io::stdout().flush()?;
283 retries += 1;
284 }
285
286 let disk_path = mount_path.join("disk");
287 let mem_path = mount_path.join("memory");
288 let has_memory = mem_path.exists();
289
290 let memory_arg = if let Some(r) = ram_size {
291 r
292 } else {
293 if has_memory {
294 println!("! Warning: RAM size not specified. Defaulting to 4G.");
295 }
296 "4G".to_string()
297 };
298
299 println!("Booting VM (RAM: {})...", memory_arg);
300
301 let internal_qmp = tempfile::NamedTempFile::new()?;
302 let internal_qmp_path = internal_qmp.path().to_path_buf();
303 let _ = std::fs::remove_file(&internal_qmp_path);
304
305 let mut qemu = Command::new("qemu-system-x86_64");
306 qemu.arg("-m").arg(&memory_arg);
307
308 if vnc {
309 println!("Starting VNC server on display :1 (Port 5901).");
310 qemu.arg("-display").arg("vnc=:1");
311 } else if no_graphics {
312 println!("(Running in Headless Serial Mode)");
313 println!("* To exit QEMU: Press 'Ctrl+a' then 'x'");
314
315 qemu.arg("-nographic");
316 }
317
318 if network_mode == "user" {
319 println!("Networking enabled (user/virtio)");
320 qemu.arg("-net").arg("nic,model=virtio");
321 qemu.arg("-net").arg("user");
322 } else if network_mode == "tap" {
323 println!("Networking enabled (tap)");
324 qemu.arg("-net").arg("nic,model=virtio");
325 qemu.arg("-net").arg("tap");
326 } else {
327 println!("Networking disabled (strict isolation)");
328 qemu.arg("-net").arg("none");
329 }
330
331 qemu.arg("-drive")
332 .arg(format!("file={},format=raw", disk_path.display()));
333
334 if kernel_mode {
335 qemu.arg("-enable-kvm");
336 }
337
338 qemu.arg("-qmp").arg(format!(
339 "unix:{},server,nowait",
340 internal_qmp_path.to_string_lossy()
341 ));
342
343 if let Some(socket) = qmp_socket {
344 if socket.exists() {
345 let _ = std::fs::remove_file(&socket);
346 }
347 println!("QMP Socket enabled at: {:?}", socket);
348 qemu.arg("-qmp")
349 .arg(format!("unix:{},server,nowait", socket.to_string_lossy()));
350 }
351
352 if has_memory {
353 let mem_path_str = mem_path.to_string_lossy().replace('\'', "'\\''");
354 qemu.arg("-incoming")
355 .arg(format!("exec:cat '{}'", mem_path_str));
356 }
357
358 let mut child = qemu.spawn().context("Failed to run qemu-system-x86_64")?;
359
360 if has_memory {
361 println!("Waiting for VM to initialize...");
362
363 let start_time = Instant::now();
364 let timeout = QEMU_INIT_TIMEOUT;
365 let mut connected = false;
366
367 while start_time.elapsed() < timeout {
368 if let Ok(Some(status)) = child.try_wait() {
369 anyhow::bail!("QEMU process exited unexpectedly with status: {}", status);
370 }
371
372 if let Ok(mut stream) = UnixStream::connect(&internal_qmp_path) {
373 connected = true;
374
375 let _ = read_qmp_response(&mut stream);
376 let _ = send_qmp_command(&mut stream, "qmp_capabilities");
377
378 println!("Connected to QEMU. Polling status...");
379 for _ in 0..QMP_POLL_ITERATIONS {
380 if let Ok(resp) = send_qmp_command(&mut stream, "query-status")
381 && let Some(ret) = resp.get("return")
382 && let Some(status) = ret.get("status").and_then(|s| s.as_str())
383 {
384 if status == "paused" || status == "postmigrate" || status == "prelaunch" {
385 println!("VM State: {}. Sending resume command...", status);
386 let _ = send_qmp_command(&mut stream, "cont");
387 break;
388 } else if status == "running" {
389 println!("VM is running.");
390 break;
391 }
392 }
393 thread::sleep(QMP_POLL_SLEEP);
394 }
395 break;
396 }
397 thread::sleep(QMP_CONNECT_RETRY_SLEEP);
398 }
399
400 if !connected {
401 eprintln!("! Warning: Failed to connect to QEMU QMP socket. VM may be frozen.");
402 }
403 }
404
405 let status = child.wait()?;
406
407 if !status.success() {
408 eprintln!("QEMU exited with error");
409 }
410
411 println!("Cleaning up...");
412 let _ = Command::new("fusermount")
413 .arg("-u")
414 .arg(&mount_path)
415 .status();
416
417 match mount_handle.join() {
418 Ok(Ok(())) => {}
419 Ok(Err(e)) => eprintln!("Mount thread returned error: {}", e),
420 Err(e) => eprintln!("Mount thread panicked: {:?}", e),
421 }
422
423 if _temp_guard.is_some() {
424 let meta_path = overlay_path.with_extension("meta");
425 if meta_path.exists() {
426 let _ = std::fs::remove_file(meta_path);
427 }
428 }
429
430 Ok(())
431}
432
433fn send_qmp_command(stream: &mut UnixStream, cmd: &str) -> Result<Value> {
445 let json = serde_json::json!({ "execute": cmd });
446 let data = serde_json::to_string(&json)?;
447 stream.write_all(data.as_bytes())?;
448 read_qmp_response(stream)
449}
450
451fn read_qmp_response(stream: &mut UnixStream) -> Result<Value> {
464 let mut buf = [0u8; HEADER_SIZE];
465 let n = stream.read(&mut buf)?;
466 let s = String::from_utf8_lossy(&buf[..n]);
467
468 for line in s.lines() {
469 if let Ok(val) = serde_json::from_str::<Value>(line)
470 && val.get("return").is_some()
471 {
472 return Ok(val);
473 }
474 }
475 Ok(serde_json::json!({}))
476}