1pub mod error;
8pub mod extract;
9pub mod qemu;
10pub mod qmp;
11
12use std::io::Read;
13use std::net::TcpStream;
14use std::path::PathBuf;
15use std::process::{Child, Stdio};
16use std::time::{Duration, Instant};
17
18use rvf_types::kernel::KernelArch;
19
20pub use error::LaunchError;
21
/// Settings describing how a microVM is started from an RVF file.
///
/// A `Default` implementation exists; callers normally override at least
/// `rvf_path`, since the default path is empty.
#[derive(Debug, Clone)]
pub struct LaunchConfig {
    /// Path of the RVF file to boot. `Launcher::launch` fails with a
    /// not-found I/O error when this path does not exist.
    pub rvf_path: PathBuf,
    /// Guest memory size in MiB.
    // NOTE(review): presumably forwarded to QEMU by `qemu::build_command` — confirm there.
    pub memory_mb: u32,
    /// Number of virtual CPUs given to the guest.
    pub vcpus: u32,
    /// TCP port on `127.0.0.1` through which the host reaches the guest API
    /// (used by `MicroVm::wait_ready` and `MicroVm::query`).
    pub api_port: u16,
    /// Optional port associated with SSH access to the guest.
    // NOTE(review): presumably a host-side forwarded port — confirm in `qemu::build_command`.
    pub ssh_port: Option<u16>,
    /// Whether KVM acceleration is requested.
    // NOTE(review): fallback behaviour when KVM is unavailable is decided elsewhere — confirm.
    pub enable_kvm: bool,
    /// Explicit QEMU executable to run; `None` leaves the choice to the
    /// command builder.
    pub qemu_binary: Option<PathBuf>,
    /// Additional command-line arguments supplied by the caller.
    // NOTE(review): presumably appended to the QEMU invocation — confirm.
    pub extra_args: Vec<String>,
    /// Optional kernel image path.
    // NOTE(review): presumably overrides the kernel extracted from the RVF file — confirm.
    pub kernel_path: Option<PathBuf>,
    /// Optional initramfs image path.
    // NOTE(review): presumably overrides the extracted initramfs — confirm.
    pub initramfs_path: Option<PathBuf>,
}
47
48impl Default for LaunchConfig {
49 fn default() -> Self {
50 Self {
51 rvf_path: PathBuf::new(),
52 memory_mb: 128,
53 vcpus: 1,
54 api_port: 8080,
55 ssh_port: None,
56 enable_kvm: true,
57 qemu_binary: None,
58 extra_args: Vec::new(),
59 kernel_path: None,
60 initramfs_path: None,
61 }
62 }
63}
64
/// Coarse lifecycle state of the QEMU process, as reported by
/// `MicroVm::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VmStatus {
    /// The process has not exited yet.
    Running,
    /// The process is no longer considered running. Carries the exit code
    /// when one is available; `None` when it is not (for example when the
    /// process status could not be queried).
    Exited(Option<i32>),
}
73
74pub struct MicroVm {
76 process: Child,
77 api_port: u16,
78 ssh_port: Option<u16>,
79 qmp_socket: PathBuf,
80 pid: u32,
81 _extracted: Option<extract::ExtractedKernel>,
83 _workdir: tempfile::TempDir,
85}
86
/// Stateless entry point that groups the functions used to start a microVM.
pub struct Launcher;
89
90impl Launcher {
91 pub fn launch(config: &LaunchConfig) -> Result<MicroVm, LaunchError> {
93 if !config.rvf_path.exists() {
94 return Err(LaunchError::Io(std::io::Error::new(
95 std::io::ErrorKind::NotFound,
96 format!("RVF file not found: {}", config.rvf_path.display()),
97 )));
98 }
99
100 let extracted = extract::extract_kernel(&config.rvf_path)?;
102
103 let workdir = tempfile::tempdir().map_err(LaunchError::TempFile)?;
105
106 let qemu_cmd = qemu::build_command(config, &extracted, workdir.path())?;
108
109 let qmp_socket = qemu_cmd.qmp_socket.clone();
110
111 let mut command = qemu_cmd.command;
113 command
114 .stdin(Stdio::null())
115 .stdout(Stdio::piped())
116 .stderr(Stdio::piped());
117
118 let child = command.spawn().map_err(LaunchError::QemuSpawn)?;
119
120 let pid = child.id();
121
122 Ok(MicroVm {
123 process: child,
124 api_port: config.api_port,
125 ssh_port: config.ssh_port,
126 qmp_socket,
127 pid,
128 _extracted: Some(extracted),
129 _workdir: workdir,
130 })
131 }
132
133 pub fn find_qemu(arch: KernelArch) -> Result<PathBuf, LaunchError> {
135 qemu::find_qemu(arch)
136 }
137
138 pub fn kvm_available() -> bool {
140 qemu::kvm_available()
141 }
142}
143
144impl MicroVm {
145 pub fn wait_ready(&mut self, timeout: Duration) -> Result<(), LaunchError> {
147 let start = Instant::now();
148 let addr = format!("127.0.0.1:{}", self.api_port);
149
150 loop {
151 if let Some(exit) = self.try_wait_process()? {
153 let mut stderr_buf = String::new();
154 if let Some(ref mut stderr) = self.process.stderr {
155 let _ = stderr.read_to_string(&mut stderr_buf);
156 }
157 return Err(LaunchError::QemuExited {
158 code: exit,
159 stderr: stderr_buf,
160 });
161 }
162
163 if TcpStream::connect_timeout(
165 &addr.parse().unwrap(),
166 Duration::from_millis(200),
167 )
168 .is_ok()
169 {
170 return Ok(());
171 }
172
173 if start.elapsed() >= timeout {
174 return Err(LaunchError::Timeout {
175 seconds: timeout.as_secs(),
176 });
177 }
178
179 std::thread::sleep(Duration::from_millis(250));
180 }
181 }
182
183 pub fn query(
185 &self,
186 vector: &[f32],
187 k: usize,
188 ) -> Result<Vec<rvf_runtime::SearchResult>, LaunchError> {
189 let _url = format!("http://127.0.0.1:{}/query", self.api_port);
190
191 let payload = serde_json::json!({
193 "vector": vector,
194 "k": k,
195 });
196 let body = serde_json::to_vec(&payload)
197 .map_err(|e| LaunchError::Io(std::io::Error::other(e)))?;
198
199 let addr = format!("127.0.0.1:{}", self.api_port);
202 let mut stream = TcpStream::connect_timeout(
203 &addr.parse().unwrap(),
204 Duration::from_secs(5),
205 )
206 .map_err(LaunchError::Io)?;
207
208 stream
209 .set_read_timeout(Some(Duration::from_secs(30)))
210 .map_err(LaunchError::Io)?;
211
212 use std::io::Write;
213 let request = format!(
214 "POST /query HTTP/1.1\r\n\
215 Host: 127.0.0.1:{}\r\n\
216 Content-Type: application/json\r\n\
217 Content-Length: {}\r\n\
218 Connection: close\r\n\
219 \r\n",
220 self.api_port,
221 body.len(),
222 );
223 stream.write_all(request.as_bytes()).map_err(LaunchError::Io)?;
224 stream.write_all(&body).map_err(LaunchError::Io)?;
225
226 let mut response = String::new();
227 stream.read_to_string(&mut response).map_err(LaunchError::Io)?;
228
229 let body_start = response
231 .find("\r\n\r\n")
232 .map(|i| i + 4)
233 .unwrap_or(0);
234 let resp_body = &response[body_start..];
235
236 #[derive(serde::Deserialize)]
237 struct QueryResult {
238 id: u64,
239 distance: f32,
240 }
241
242 let results: Vec<QueryResult> = serde_json::from_str(resp_body)
243 .map_err(|e| LaunchError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
244
245 Ok(results
246 .into_iter()
247 .map(|r| rvf_runtime::SearchResult {
248 id: r.id,
249 distance: r.distance,
250 })
251 .collect())
252 }
253
254 pub fn status(&mut self) -> VmStatus {
256 match self.process.try_wait() {
257 Ok(Some(status)) => VmStatus::Exited(status.code()),
258 Ok(None) => VmStatus::Running,
259 Err(_) => VmStatus::Exited(None),
260 }
261 }
262
263 pub fn shutdown(&mut self) -> Result<(), LaunchError> {
265 if self.qmp_socket.exists() {
267 match qmp::QmpClient::connect(&self.qmp_socket, Duration::from_secs(5)) {
268 Ok(mut client) => {
269 let _ = client.system_powerdown();
270
271 let start = Instant::now();
273 while start.elapsed() < Duration::from_secs(10) {
274 if let Ok(Some(_)) = self.process.try_wait() {
275 return Ok(());
276 }
277 std::thread::sleep(Duration::from_millis(200));
278 }
279
280 let _ = client.quit();
282 let start = Instant::now();
283 while start.elapsed() < Duration::from_secs(5) {
284 if let Ok(Some(_)) = self.process.try_wait() {
285 return Ok(());
286 }
287 std::thread::sleep(Duration::from_millis(200));
288 }
289 }
290 Err(_) => {
291 }
293 }
294 }
295
296 #[cfg(unix)]
298 {
299 unsafe {
300 libc_kill(self.pid as i32);
301 }
302 let start = Instant::now();
303 while start.elapsed() < Duration::from_secs(5) {
304 if let Ok(Some(_)) = self.process.try_wait() {
305 return Ok(());
306 }
307 std::thread::sleep(Duration::from_millis(100));
308 }
309 }
310
311 let _ = self.process.kill();
313 let _ = self.process.wait();
314 Ok(())
315 }
316
317 pub fn kill(&mut self) -> Result<(), LaunchError> {
319 self.process.kill().map_err(LaunchError::Io)?;
320 let _ = self.process.wait();
321 Ok(())
322 }
323
324 pub fn pid(&self) -> u32 {
326 self.pid
327 }
328
329 pub fn api_port(&self) -> u16 {
331 self.api_port
332 }
333
334 pub fn ssh_port(&self) -> Option<u16> {
336 self.ssh_port
337 }
338
339 pub fn qmp_socket(&self) -> &PathBuf {
341 &self.qmp_socket
342 }
343
344 fn try_wait_process(&mut self) -> Result<Option<Option<i32>>, LaunchError> {
345 match self.process.try_wait() {
346 Ok(Some(status)) => Ok(Some(status.code())),
347 Ok(None) => Ok(None),
348 Err(e) => Err(LaunchError::Io(e)),
349 }
350 }
351}
352
353impl Drop for MicroVm {
354 fn drop(&mut self) {
355 if let Ok(None) = self.process.try_wait() {
357 let _ = self.process.kill();
358 let _ = self.process.wait();
359 }
360 }
361}
362
/// Asks process `pid` to terminate by running the external `kill -TERM <pid>`.
///
/// Despite the name, no libc call is made. Failures (missing `kill` binary,
/// no such process, …) are ignored and all output of `kill` is discarded.
///
/// Non-positive pids are ignored: `kill` gives `0` and negative values a
/// special meaning (process groups, or every process the caller may signal),
/// which is never what a single-process shutdown wants.
///
/// # Safety
///
/// This function performs no memory-unsafe operation. The caller should make
/// sure `pid` refers to a process it owns and has not yet reaped; otherwise
/// the signal may reach an unrelated process that reused the pid.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32) {
    if pid <= 0 {
        return;
    }

    // Only the side effect matters, so run with all stdio detached rather
    // than capturing output into buffers that are thrown away.
    let _ = std::process::Command::new("kill")
        .arg("-TERM")
        .arg(pid.to_string())
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is a small single-vCPU VM on port 8080 with
    /// KVM requested.
    #[test]
    fn default_config() {
        let LaunchConfig {
            memory_mb,
            vcpus,
            api_port,
            enable_kvm,
            ..
        } = LaunchConfig::default();

        assert_eq!(memory_mb, 128);
        assert_eq!(vcpus, 1);
        assert_eq!(api_port, 8080);
        assert!(enable_kvm);
    }

    /// `VmStatus` compares by variant and payload.
    #[test]
    fn vm_status_variants() {
        let running = VmStatus::Running;
        let clean_exit = VmStatus::Exited(Some(0));

        assert_eq!(running, VmStatus::Running);
        assert_eq!(clean_exit, VmStatus::Exited(Some(0)));
        assert_ne!(running, VmStatus::Exited(None));
    }
}