Skip to main content

rvf_launch/
lib.rs

//! QEMU microVM launcher for RVF computational containers.
//!
//! This crate extracts a kernel image from an RVF file's KERNEL_SEG,
//! builds a QEMU command line, launches the VM, and returns a handle for
//! managing it: vector queries are sent to the guest's HTTP API, while
//! graceful shutdown is requested over QMP (with signal-based fallbacks).
6
7pub mod error;
8pub mod extract;
9pub mod qemu;
10pub mod qmp;
11
12use std::io::Read;
13use std::net::TcpStream;
14use std::path::PathBuf;
15use std::process::{Child, Stdio};
16use std::time::{Duration, Instant};
17
18use rvf_types::kernel::KernelArch;
19
20pub use error::LaunchError;
21
/// Settings that control how an RVF microVM is launched.
///
/// Start from [`LaunchConfig::default`] and override individual fields with
/// struct-update syntax, e.g.
/// `LaunchConfig { rvf_path: path.into(), ..Default::default() }`.
#[derive(Clone, Debug)]
pub struct LaunchConfig {
    /// Location of the RVF store file to boot.
    pub rvf_path: PathBuf,
    /// Guest RAM size, in MiB.
    pub memory_mb: u32,
    /// Number of virtual CPUs given to the guest.
    pub vcpus: u32,
    /// Host TCP port forwarded to the guest's API port (guest :8080).
    pub api_port: u16,
    /// Host TCP port forwarded to the guest's SSH port (guest :2222), if any.
    pub ssh_port: Option<u16>,
    /// Request KVM acceleration. When KVM is unavailable QEMU falls back to
    /// TCG, unless the kernel requires KVM.
    pub enable_kvm: bool,
    /// Explicit QEMU binary to use instead of searching for one.
    pub qemu_binary: Option<PathBuf>,
    /// Additional raw arguments appended to the QEMU command line.
    pub extra_args: Vec<String>,
    /// Kernel image to use instead of the one stored in the RVF file.
    pub kernel_path: Option<PathBuf>,
    /// Initramfs image to use instead of the default.
    pub initramfs_path: Option<PathBuf>,
}

impl Default for LaunchConfig {
    /// A minimal configuration with the following settings:
    ///
    /// * 128 MiB of RAM and one vCPU;
    /// * the API on host port 8080 and no SSH forward;
    /// * KVM requested;
    /// * no binary, kernel or initramfs overrides, and no extra arguments.
    ///
    /// `rvf_path` is left empty and must be set before launching.
    fn default() -> Self {
        const DEFAULT_MEMORY_MB: u32 = 128;
        const DEFAULT_API_PORT: u16 = 8080;

        LaunchConfig {
            memory_mb: DEFAULT_MEMORY_MB,
            vcpus: 1,
            api_port: DEFAULT_API_PORT,
            enable_kvm: true,
            rvf_path: PathBuf::default(),
            ssh_port: None,
            qemu_binary: None,
            kernel_path: None,
            initramfs_path: None,
            extra_args: vec![],
        }
    }
}
64
/// Lifecycle state of a microVM's QEMU process, as reported by
/// [`MicroVm::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VmStatus {
    /// The process has terminated.
    ///
    /// Holds its exit code, or `None` when no code is available. That happens
    /// when the process was terminated by a signal or its status could not be
    /// queried.
    Exited(Option<i32>),
    /// The QEMU process is still alive.
    Running,
}
73
/// Handle to a running QEMU microVM.
///
/// Dropping the handle force-kills QEMU if it is still running (see the
/// `Drop` impl). Afterwards the fields are dropped in declaration order,
/// so the extracted kernel files and the working directory are released
/// only after the process has been dealt with.
pub struct MicroVm {
    /// The spawned QEMU child process.
    process: Child,
    /// Host port forwarded to the guest's API port.
    api_port: u16,
    /// Host port forwarded to the guest's SSH port, if configured.
    ssh_port: Option<u16>,
    /// Path of the QMP control socket used for graceful shutdown.
    qmp_socket: PathBuf,
    /// PID of the QEMU process, captured right after spawning.
    pid: u32,
    /// Keeps the extracted kernel temp files alive for the VM's lifetime.
    _extracted: Option<extract::ExtractedKernel>,
    /// Keeps the temporary working directory alive for the VM's lifetime.
    _workdir: tempfile::TempDir,
}
86
87/// Top-level launcher API.
88pub struct Launcher;
89
90impl Launcher {
91    /// Extract kernel from an RVF file and launch it in a QEMU microVM.
92    pub fn launch(config: &LaunchConfig) -> Result<MicroVm, LaunchError> {
93        if !config.rvf_path.exists() {
94            return Err(LaunchError::Io(std::io::Error::new(
95                std::io::ErrorKind::NotFound,
96                format!("RVF file not found: {}", config.rvf_path.display()),
97            )));
98        }
99
100        // Extract kernel from RVF
101        let extracted = extract::extract_kernel(&config.rvf_path)?;
102
103        // Create a working directory for QMP socket, logs, etc.
104        let workdir = tempfile::tempdir().map_err(LaunchError::TempFile)?;
105
106        // Build the QEMU command
107        let qemu_cmd = qemu::build_command(config, &extracted, workdir.path())?;
108
109        let qmp_socket = qemu_cmd.qmp_socket.clone();
110
111        // Spawn QEMU
112        let mut command = qemu_cmd.command;
113        command
114            .stdin(Stdio::null())
115            .stdout(Stdio::piped())
116            .stderr(Stdio::piped());
117
118        let child = command.spawn().map_err(LaunchError::QemuSpawn)?;
119
120        let pid = child.id();
121
122        Ok(MicroVm {
123            process: child,
124            api_port: config.api_port,
125            ssh_port: config.ssh_port,
126            qmp_socket,
127            pid,
128            _extracted: Some(extracted),
129            _workdir: workdir,
130        })
131    }
132
133    /// Find the QEMU binary for the given architecture.
134    pub fn find_qemu(arch: KernelArch) -> Result<PathBuf, LaunchError> {
135        qemu::find_qemu(arch)
136    }
137
138    /// Check if KVM is available on this host.
139    pub fn kvm_available() -> bool {
140        qemu::kvm_available()
141    }
142}
143
144impl MicroVm {
145    /// Wait for the VM's API port to accept TCP connections.
146    pub fn wait_ready(&mut self, timeout: Duration) -> Result<(), LaunchError> {
147        let start = Instant::now();
148        let addr = format!("127.0.0.1:{}", self.api_port);
149
150        loop {
151            // Check if the process has exited
152            if let Some(exit) = self.try_wait_process()? {
153                let mut stderr_buf = String::new();
154                if let Some(ref mut stderr) = self.process.stderr {
155                    let _ = stderr.read_to_string(&mut stderr_buf);
156                }
157                return Err(LaunchError::QemuExited {
158                    code: exit,
159                    stderr: stderr_buf,
160                });
161            }
162
163            // Try connecting to the API port
164            if TcpStream::connect_timeout(
165                &addr.parse().unwrap(),
166                Duration::from_millis(200),
167            )
168            .is_ok()
169            {
170                return Ok(());
171            }
172
173            if start.elapsed() >= timeout {
174                return Err(LaunchError::Timeout {
175                    seconds: timeout.as_secs(),
176                });
177            }
178
179            std::thread::sleep(Duration::from_millis(250));
180        }
181    }
182
183    /// Send a vector query to the running VM's HTTP API.
184    pub fn query(
185        &self,
186        vector: &[f32],
187        k: usize,
188    ) -> Result<Vec<rvf_runtime::SearchResult>, LaunchError> {
189        let _url = format!("http://127.0.0.1:{}/query", self.api_port);
190
191        // Build JSON payload
192        let payload = serde_json::json!({
193            "vector": vector,
194            "k": k,
195        });
196        let body = serde_json::to_vec(&payload)
197            .map_err(|e| LaunchError::Io(std::io::Error::other(e)))?;
198
199        // Use a raw TCP connection to send an HTTP POST (avoids depending
200        // on a full HTTP client library).
201        let addr = format!("127.0.0.1:{}", self.api_port);
202        let mut stream = TcpStream::connect_timeout(
203            &addr.parse().unwrap(),
204            Duration::from_secs(5),
205        )
206        .map_err(LaunchError::Io)?;
207
208        stream
209            .set_read_timeout(Some(Duration::from_secs(30)))
210            .map_err(LaunchError::Io)?;
211
212        use std::io::Write;
213        let request = format!(
214            "POST /query HTTP/1.1\r\n\
215             Host: 127.0.0.1:{}\r\n\
216             Content-Type: application/json\r\n\
217             Content-Length: {}\r\n\
218             Connection: close\r\n\
219             \r\n",
220            self.api_port,
221            body.len(),
222        );
223        stream.write_all(request.as_bytes()).map_err(LaunchError::Io)?;
224        stream.write_all(&body).map_err(LaunchError::Io)?;
225
226        let mut response = String::new();
227        stream.read_to_string(&mut response).map_err(LaunchError::Io)?;
228
229        // Parse the HTTP response body (skip headers)
230        let body_start = response
231            .find("\r\n\r\n")
232            .map(|i| i + 4)
233            .unwrap_or(0);
234        let resp_body = &response[body_start..];
235
236        #[derive(serde::Deserialize)]
237        struct QueryResult {
238            id: u64,
239            distance: f32,
240        }
241
242        let results: Vec<QueryResult> = serde_json::from_str(resp_body)
243            .map_err(|e| LaunchError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
244
245        Ok(results
246            .into_iter()
247            .map(|r| rvf_runtime::SearchResult {
248                id: r.id,
249                distance: r.distance,
250            })
251            .collect())
252    }
253
254    /// Get the current VM status.
255    pub fn status(&mut self) -> VmStatus {
256        match self.process.try_wait() {
257            Ok(Some(status)) => VmStatus::Exited(status.code()),
258            Ok(None) => VmStatus::Running,
259            Err(_) => VmStatus::Exited(None),
260        }
261    }
262
263    /// Graceful shutdown: try QMP `system_powerdown`, fall back to SIGTERM.
264    pub fn shutdown(&mut self) -> Result<(), LaunchError> {
265        // Try QMP first
266        if self.qmp_socket.exists() {
267            match qmp::QmpClient::connect(&self.qmp_socket, Duration::from_secs(5)) {
268                Ok(mut client) => {
269                    let _ = client.system_powerdown();
270
271                    // Wait up to 10 seconds for the VM to shut down
272                    let start = Instant::now();
273                    while start.elapsed() < Duration::from_secs(10) {
274                        if let Ok(Some(_)) = self.process.try_wait() {
275                            return Ok(());
276                        }
277                        std::thread::sleep(Duration::from_millis(200));
278                    }
279
280                    // Still running, try quit
281                    let _ = client.quit();
282                    let start = Instant::now();
283                    while start.elapsed() < Duration::from_secs(5) {
284                        if let Ok(Some(_)) = self.process.try_wait() {
285                            return Ok(());
286                        }
287                        std::thread::sleep(Duration::from_millis(200));
288                    }
289                }
290                Err(_) => {
291                    // QMP not available, fall through to SIGTERM
292                }
293            }
294        }
295
296        // Fall back to SIGTERM (via kill on Unix)
297        #[cfg(unix)]
298        {
299            unsafe {
300                libc_kill(self.pid as i32);
301            }
302            let start = Instant::now();
303            while start.elapsed() < Duration::from_secs(5) {
304                if let Ok(Some(_)) = self.process.try_wait() {
305                    return Ok(());
306                }
307                std::thread::sleep(Duration::from_millis(100));
308            }
309        }
310
311        // Last resort: kill -9
312        let _ = self.process.kill();
313        let _ = self.process.wait();
314        Ok(())
315    }
316
317    /// Force-kill the VM process immediately.
318    pub fn kill(&mut self) -> Result<(), LaunchError> {
319        self.process.kill().map_err(LaunchError::Io)?;
320        let _ = self.process.wait();
321        Ok(())
322    }
323
324    /// Get the QEMU process PID.
325    pub fn pid(&self) -> u32 {
326        self.pid
327    }
328
329    /// Get the API port.
330    pub fn api_port(&self) -> u16 {
331        self.api_port
332    }
333
334    /// Get the SSH port, if configured.
335    pub fn ssh_port(&self) -> Option<u16> {
336        self.ssh_port
337    }
338
339    /// Get the QMP socket path.
340    pub fn qmp_socket(&self) -> &PathBuf {
341        &self.qmp_socket
342    }
343
344    fn try_wait_process(&mut self) -> Result<Option<Option<i32>>, LaunchError> {
345        match self.process.try_wait() {
346            Ok(Some(status)) => Ok(Some(status.code())),
347            Ok(None) => Ok(None),
348            Err(e) => Err(LaunchError::Io(e)),
349        }
350    }
351}
352
353impl Drop for MicroVm {
354    fn drop(&mut self) {
355        // Best-effort cleanup: try to kill the process if still running.
356        if let Ok(None) = self.process.try_wait() {
357            let _ = self.process.kill();
358            let _ = self.process.wait();
359        }
360    }
361}
362
/// Ask process `pid` to terminate by sending it SIGTERM.
///
/// Despite its name, this does not call libc or issue a raw syscall. It runs
/// the external `kill` utility (`kill -TERM <pid>`), which avoids a `libc`
/// dependency.
///
/// This is best effort. Failures are ignored, including a missing `kill`
/// binary, no such process, or permission denied. The utility's output is
/// discarded.
///
/// Non-positive PIDs are rejected. `kill` treats `0` and negative values as
/// process groups, and `-1` as "every process we may signal", so passing
/// them through could terminate unrelated processes or ourselves.
///
/// # Safety
///
/// There is no memory-safety precondition. The function is `unsafe` to flag
/// a correctness contract: the caller must ensure `pid` still names the
/// intended process, i.e. the child has not yet been reaped. Otherwise the
/// PID may have been recycled and an unrelated process would be signalled.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32) {
    if pid <= 0 {
        return;
    }
    let _ = std::process::Command::new("kill")
        .args(["-TERM", &pid.to_string()])
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of the default configuration has its documented value.
    #[test]
    fn default_config() {
        let config = LaunchConfig::default();
        assert!(config.rvf_path.as_os_str().is_empty());
        assert_eq!(config.memory_mb, 128);
        assert_eq!(config.vcpus, 1);
        assert_eq!(config.api_port, 8080);
        assert_eq!(config.ssh_port, None);
        assert!(config.enable_kvm);
        assert!(config.qemu_binary.is_none());
        assert!(config.extra_args.is_empty());
        assert!(config.kernel_path.is_none());
        assert!(config.initramfs_path.is_none());
    }

    /// Equality distinguishes variants and exit codes.
    #[test]
    fn vm_status_variants() {
        assert_eq!(VmStatus::Running, VmStatus::Running);
        assert_eq!(VmStatus::Exited(Some(0)), VmStatus::Exited(Some(0)));
        assert_ne!(VmStatus::Running, VmStatus::Exited(None));
        assert_ne!(VmStatus::Exited(Some(0)), VmStatus::Exited(Some(1)));
        assert_ne!(VmStatus::Exited(Some(0)), VmStatus::Exited(None));
    }

    /// A missing RVF file is reported as `Io(NotFound)` before QEMU is involved.
    #[test]
    fn launch_rejects_missing_rvf_file() {
        let config = LaunchConfig {
            rvf_path: std::env::temp_dir()
                .join("rvf-launch-test-this-file-should-not-exist-7f3a9c.rvf"),
            ..LaunchConfig::default()
        };
        let result = Launcher::launch(&config);
        assert!(matches!(
            result,
            Err(LaunchError::Io(ref e)) if e.kind() == std::io::ErrorKind::NotFound
        ));
    }
}