duende_core/adapters/
pepita.rs

1//! pepita MicroVM adapter implementation.
2//!
3//! Provides daemon management via pepita microVMs with vsock communication.
4//!
5//! pepita is PAIML's lightweight microVM implementation similar to Firecracker,
6//! optimized for running single-purpose daemons with minimal overhead.
7
8use crate::adapter::{DaemonHandle, PlatformAdapter, PlatformError, PlatformResult, TracerHandle};
9use crate::daemon::Daemon;
10use crate::platform::Platform;
11use crate::types::{DaemonStatus, FailureReason, Signal};
12
13use async_trait::async_trait;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU32, Ordering};
17use tokio::sync::RwLock;
18
19/// Vsock CID allocator.
20static NEXT_CID: AtomicU32 = AtomicU32::new(3); // CID 0-2 are reserved
21
22/// pepita MicroVM adapter.
23///
24/// Manages daemons inside lightweight microVMs via vsock communication.
25///
26/// # Architecture
27///
28/// ```text
29/// Host                          MicroVM
30/// ┌─────────────────┐          ┌─────────────────┐
31/// │  PepitaAdapter  │          │  pepita guest   │
32/// │  ┌───────────┐  │  vsock   │  ┌───────────┐  │
33/// │  │ VmManager ├──┼──────────┼──┤ DaemonCtl │  │
34/// │  └───────────┘  │          │  └───────────┘  │
35/// └─────────────────┘          └─────────────────┘
36/// ```
37///
38/// # Requirements
39///
40/// - Linux with KVM support (`/dev/kvm`)
41/// - pepita VMM installed
42/// - Kernel and rootfs images for guests
43///
44/// # Example
45///
46/// ```rust,ignore
47/// use duende_core::adapters::PepitaAdapter;
48/// use duende_core::PlatformAdapter;
49///
50/// let adapter = PepitaAdapter::new();
51/// let handle = adapter.spawn(my_daemon).await?;
52/// ```
53pub struct PepitaAdapter {
54    /// Vsock base port for daemon communication
55    vsock_base_port: u32,
56    /// Running VMs indexed by daemon ID
57    vms: Arc<RwLock<HashMap<uuid::Uuid, VmInfo>>>,
58    /// Default kernel path
59    default_kernel: Option<String>,
60    /// Default rootfs path
61    default_rootfs: Option<String>,
62}
63
64/// Information about a running VM.
65#[derive(Debug, Clone)]
66#[allow(dead_code)] // Fields used for future VM management operations
67struct VmInfo {
68    /// VM identifier
69    vm_id: String,
70    /// Vsock CID
71    vsock_cid: u32,
72    /// Process ID of VMM process (if spawned locally)
73    vmm_pid: Option<u32>,
74    /// Current VM state
75    state: VmState,
76}
77
78/// VM state.
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80#[allow(dead_code)] // Variants used for future VM state management
81enum VmState {
82    /// VM is starting
83    Starting,
84    /// VM is running
85    Running,
86    /// VM is paused
87    Paused,
88    /// VM has stopped
89    Stopped,
90    /// VM failed
91    Failed,
92}
93
94impl PepitaAdapter {
95    /// Creates a new pepita adapter with default settings.
96    #[must_use]
97    pub fn new() -> Self {
98        Self {
99            vsock_base_port: 5000,
100            vms: Arc::new(RwLock::new(HashMap::new())),
101            default_kernel: None,
102            default_rootfs: None,
103        }
104    }
105
106    /// Creates a pepita adapter with custom vsock base port.
107    #[must_use]
108    pub fn with_vsock_port(vsock_base_port: u32) -> Self {
109        Self {
110            vsock_base_port,
111            vms: Arc::new(RwLock::new(HashMap::new())),
112            default_kernel: None,
113            default_rootfs: None,
114        }
115    }
116
117    /// Creates a pepita adapter with kernel and rootfs paths.
118    #[must_use]
119    pub fn with_images(kernel: impl Into<String>, rootfs: impl Into<String>) -> Self {
120        Self {
121            vsock_base_port: 5000,
122            vms: Arc::new(RwLock::new(HashMap::new())),
123            default_kernel: Some(kernel.into()),
124            default_rootfs: Some(rootfs.into()),
125        }
126    }
127
128    /// Returns the vsock base port.
129    #[must_use]
130    pub const fn vsock_base_port(&self) -> u32 {
131        self.vsock_base_port
132    }
133
134    /// Allocates a new vsock CID.
135    fn allocate_cid() -> u32 {
136        NEXT_CID.fetch_add(1, Ordering::Relaxed)
137    }
138
139    /// Generates a VM ID from daemon name.
140    fn vm_id(daemon_name: &str) -> String {
141        format!(
142            "duende-vm-{}",
143            daemon_name.replace(' ', "-").replace('_', "-")
144        )
145    }
146
147    /// Checks if KVM is available.
148    fn kvm_available() -> bool {
149        std::path::Path::new("/dev/kvm").exists()
150    }
151
152    /// Checks if pepita VMM is installed.
153    async fn pepita_available() -> bool {
154        tokio::process::Command::new("pepita")
155            .arg("--version")
156            .output()
157            .await
158            .map(|o| o.status.success())
159            .unwrap_or(false)
160    }
161
162    /// Maps Signal to signal number for vsock command.
163    fn signal_number(sig: Signal) -> i32 {
164        match sig {
165            Signal::Term => 15,
166            Signal::Kill => 9,
167            Signal::Int => 2,
168            Signal::Quit => 3,
169            Signal::Hup => 1,
170            Signal::Usr1 => 10,
171            Signal::Usr2 => 12,
172            Signal::Stop => 19,
173            Signal::Cont => 18,
174        }
175    }
176}
177
178impl Default for PepitaAdapter {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184#[async_trait]
185impl PlatformAdapter for PepitaAdapter {
186    fn platform(&self) -> Platform {
187        Platform::PepitaMicroVM
188    }
189
190    async fn spawn(&self, daemon: Box<dyn Daemon>) -> PlatformResult<DaemonHandle> {
191        // Check prerequisites
192        if !Self::kvm_available() {
193            return Err(PlatformError::spawn_failed(
194                "KVM not available: /dev/kvm not found. Ensure KVM is enabled and you have permissions.",
195            ));
196        }
197
198        if !Self::pepita_available().await {
199            return Err(PlatformError::spawn_failed(
200                "pepita VMM not found. Install pepita or add it to PATH.",
201            ));
202        }
203
204        let kernel = self.default_kernel.as_ref().ok_or_else(|| {
205            PlatformError::Config(
206                "No kernel image configured. Use with_images() to set kernel path.".into(),
207            )
208        })?;
209
210        let rootfs = self.default_rootfs.as_ref().ok_or_else(|| {
211            PlatformError::Config(
212                "No rootfs image configured. Use with_images() to set rootfs path.".into(),
213            )
214        })?;
215
216        let daemon_name = daemon.name().to_string();
217        let daemon_id = daemon.id();
218        let vm_id = Self::vm_id(&daemon_name);
219        let vsock_cid = Self::allocate_cid();
220
221        // Build pepita command
222        // pepita run --kernel <path> --rootfs <path> --vsock-cid <cid> --memory <mb> --cpus <n>
223        let output = tokio::process::Command::new("pepita")
224            .arg("run")
225            .arg("--kernel")
226            .arg(kernel)
227            .arg("--rootfs")
228            .arg(rootfs)
229            .arg("--vsock-cid")
230            .arg(vsock_cid.to_string())
231            .arg("--memory")
232            .arg("256") // Default 256MB
233            .arg("--cpus")
234            .arg("1")
235            .arg("--name")
236            .arg(&vm_id)
237            .arg("--daemon") // Run in background
238            .output()
239            .await
240            .map_err(|e| PlatformError::spawn_failed(format!("Failed to execute pepita: {}", e)))?;
241
242        if !output.status.success() {
243            let stderr = String::from_utf8_lossy(&output.stderr);
244            return Err(PlatformError::spawn_failed(format!(
245                "pepita run failed: {}",
246                stderr
247            )));
248        }
249
250        // Store VM info
251        let vm_info = VmInfo {
252            vm_id: vm_id.clone(),
253            vsock_cid,
254            vmm_pid: None, // pepita manages this internally
255            state: VmState::Running,
256        };
257
258        self.vms.write().await.insert(*daemon_id.as_uuid(), vm_info);
259
260        Ok(DaemonHandle::pepita(daemon_id, vm_id, vsock_cid))
261    }
262
263    async fn signal(&self, handle: &DaemonHandle, sig: Signal) -> PlatformResult<()> {
264        let (vm_id, _vsock_cid) = match (handle.pepita_vm_id(), handle.vsock_cid()) {
265            (Some(id), Some(cid)) => (id, cid),
266            _ => {
267                return Err(PlatformError::spawn_failed(
268                    "Invalid handle type for pepita adapter",
269                ));
270            }
271        };
272
273        // Send signal via vsock or pepita CLI
274        // pepita signal --name <vm_id> --signal <sig>
275        let output = tokio::process::Command::new("pepita")
276            .arg("signal")
277            .arg("--name")
278            .arg(vm_id)
279            .arg("--signal")
280            .arg(Self::signal_number(sig).to_string())
281            .output()
282            .await
283            .map_err(|e| {
284                PlatformError::signal_failed(format!("Failed to execute pepita: {}", e))
285            })?;
286
287        if !output.status.success() {
288            let stderr = String::from_utf8_lossy(&output.stderr);
289            return Err(PlatformError::signal_failed(format!(
290                "pepita signal failed: {}",
291                stderr
292            )));
293        }
294
295        // Update state if stopping
296        if matches!(sig, Signal::Term | Signal::Kill) {
297            if let Some(vm_info) = self.vms.write().await.get_mut(handle.id().as_uuid()) {
298                vm_info.state = VmState::Stopped;
299            }
300        }
301
302        Ok(())
303    }
304
305    async fn status(&self, handle: &DaemonHandle) -> PlatformResult<DaemonStatus> {
306        let vm_id = handle
307            .pepita_vm_id()
308            .ok_or_else(|| PlatformError::spawn_failed("Invalid handle type for pepita adapter"))?;
309
310        // Query pepita for VM status
311        // pepita status --name <vm_id> --json
312        let output = tokio::process::Command::new("pepita")
313            .arg("status")
314            .arg("--name")
315            .arg(vm_id)
316            .arg("--json")
317            .output()
318            .await
319            .map_err(|e| {
320                PlatformError::status_failed(format!("Failed to execute pepita: {}", e))
321            })?;
322
323        if !output.status.success() {
324            // VM not found = stopped
325            return Ok(DaemonStatus::Stopped);
326        }
327
328        let stdout = String::from_utf8_lossy(&output.stdout);
329
330        // Parse JSON status
331        if stdout.contains("\"state\": \"running\"") || stdout.contains("\"state\":\"running\"") {
332            Ok(DaemonStatus::Running)
333        } else if stdout.contains("\"state\": \"paused\"") {
334            Ok(DaemonStatus::Paused)
335        } else if stdout.contains("\"state\": \"failed\"") {
336            Ok(DaemonStatus::Failed(FailureReason::ExitCode(1)))
337        } else {
338            Ok(DaemonStatus::Stopped)
339        }
340    }
341
342    async fn attach_tracer(&self, handle: &DaemonHandle) -> PlatformResult<TracerHandle> {
343        let vsock_cid = handle
344            .vsock_cid()
345            .ok_or_else(|| PlatformError::spawn_failed("Invalid handle type for pepita adapter"))?;
346
347        if vsock_cid == 0 {
348            return Err(PlatformError::tracer_failed("VM not running"));
349        }
350
351        // Return a remote vsock-based tracer handle
352        Ok(TracerHandle::remote_vsock(handle.id()))
353    }
354}
355
356impl PepitaAdapter {
357    /// Stops and destroys a VM.
358    pub async fn destroy(&self, vm_id: &str) -> PlatformResult<()> {
359        let output = tokio::process::Command::new("pepita")
360            .arg("destroy")
361            .arg("--name")
362            .arg(vm_id)
363            .arg("--force")
364            .output()
365            .await
366            .map_err(|e| PlatformError::spawn_failed(format!("Failed to execute pepita: {}", e)))?;
367
368        if !output.status.success() {
369            // Ignore errors - VM might already be destroyed
370        }
371
372        Ok(())
373    }
374
375    /// Lists all running VMs.
376    pub async fn list_vms(&self) -> PlatformResult<Vec<String>> {
377        let output = tokio::process::Command::new("pepita")
378            .arg("list")
379            .arg("--format")
380            .arg("name")
381            .output()
382            .await
383            .map_err(|e| PlatformError::spawn_failed(format!("Failed to execute pepita: {}", e)))?;
384
385        if !output.status.success() {
386            return Ok(Vec::new());
387        }
388
389        let stdout = String::from_utf8_lossy(&output.stdout);
390        Ok(stdout.lines().map(|s| s.to_string()).collect())
391    }
392}
393
394// Extend DaemonHandle for pepita-specific accessors
395impl crate::adapter::DaemonHandle {
396    /// Returns the pepita VM ID, if applicable.
397    #[must_use]
398    pub fn pepita_vm_id(&self) -> Option<&str> {
399        match self.handle_data() {
400            crate::adapter::HandleData::Pepita { vm_id, .. } => Some(vm_id),
401            _ => None,
402        }
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409
410    #[test]
411    fn test_pepita_adapter_new() {
412        let adapter = PepitaAdapter::new();
413        assert_eq!(adapter.vsock_base_port(), 5000);
414        assert_eq!(adapter.platform(), Platform::PepitaMicroVM);
415    }
416
417    #[test]
418    fn test_pepita_adapter_with_vsock_port() {
419        let adapter = PepitaAdapter::with_vsock_port(9000);
420        assert_eq!(adapter.vsock_base_port(), 9000);
421    }
422
423    #[test]
424    fn test_pepita_adapter_with_images() {
425        let adapter = PepitaAdapter::with_images("/boot/vmlinuz", "/var/lib/rootfs.img");
426        assert!(adapter.default_kernel.is_some());
427        assert!(adapter.default_rootfs.is_some());
428    }
429
430    #[test]
431    fn test_pepita_adapter_default() {
432        let adapter = PepitaAdapter::default();
433        assert_eq!(adapter.platform(), Platform::PepitaMicroVM);
434    }
435
436    #[test]
437    fn test_vm_id_generation() {
438        assert_eq!(PepitaAdapter::vm_id("my-daemon"), "duende-vm-my-daemon");
439        assert_eq!(PepitaAdapter::vm_id("my daemon"), "duende-vm-my-daemon");
440    }
441
442    #[test]
443    fn test_allocate_cid() {
444        let cid1 = PepitaAdapter::allocate_cid();
445        let cid2 = PepitaAdapter::allocate_cid();
446        assert!(cid2 > cid1);
447    }
448
449    #[test]
450    fn test_signal_number() {
451        assert_eq!(PepitaAdapter::signal_number(Signal::Term), 15);
452        assert_eq!(PepitaAdapter::signal_number(Signal::Kill), 9);
453    }
454
455    #[tokio::test]
456    async fn test_pepita_adapter_spawn_fails_without_kvm() {
457        // Skip if KVM is available (would need pepita)
458        if PepitaAdapter::kvm_available() {
459            return;
460        }
461
462        let adapter = PepitaAdapter::with_images("/boot/vmlinuz", "/rootfs.img");
463
464        struct TestDaemon {
465            id: crate::types::DaemonId,
466            metrics: crate::metrics::DaemonMetrics,
467        }
468
469        #[async_trait::async_trait]
470        impl crate::daemon::Daemon for TestDaemon {
471            fn id(&self) -> crate::types::DaemonId {
472                self.id
473            }
474            fn name(&self) -> &str {
475                "test"
476            }
477            async fn init(&mut self, _: &crate::config::DaemonConfig) -> crate::error::Result<()> {
478                Ok(())
479            }
480            async fn run(
481                &mut self,
482                _: &mut crate::daemon::DaemonContext,
483            ) -> crate::error::Result<crate::types::ExitReason> {
484                Ok(crate::types::ExitReason::Graceful)
485            }
486            async fn shutdown(&mut self, _: std::time::Duration) -> crate::error::Result<()> {
487                Ok(())
488            }
489            async fn health_check(&self) -> crate::types::HealthStatus {
490                crate::types::HealthStatus::healthy(1)
491            }
492            fn metrics(&self) -> &crate::metrics::DaemonMetrics {
493                &self.metrics
494            }
495        }
496
497        let daemon = TestDaemon {
498            id: crate::types::DaemonId::new(),
499            metrics: crate::metrics::DaemonMetrics::new(),
500        };
501
502        let result = adapter.spawn(Box::new(daemon)).await;
503        assert!(result.is_err());
504        // Should fail because KVM is not available
505        let err = result.unwrap_err();
506        assert!(err.to_string().contains("KVM") || err.to_string().contains("pepita"));
507    }
508
509    // ==================== Extended Tests for Coverage ====================
510
511    #[test]
512    fn test_signal_number_all_signals() {
513        assert_eq!(PepitaAdapter::signal_number(Signal::Int), 2);
514        assert_eq!(PepitaAdapter::signal_number(Signal::Quit), 3);
515        assert_eq!(PepitaAdapter::signal_number(Signal::Hup), 1);
516        assert_eq!(PepitaAdapter::signal_number(Signal::Usr1), 10);
517        assert_eq!(PepitaAdapter::signal_number(Signal::Usr2), 12);
518        assert_eq!(PepitaAdapter::signal_number(Signal::Stop), 19);
519        assert_eq!(PepitaAdapter::signal_number(Signal::Cont), 18);
520    }
521
522    #[test]
523    fn test_vm_id_variations() {
524        assert_eq!(PepitaAdapter::vm_id("test"), "duende-vm-test");
525        assert_eq!(PepitaAdapter::vm_id("test_daemon"), "duende-vm-test-daemon");
526        assert_eq!(PepitaAdapter::vm_id("test-daemon"), "duende-vm-test-daemon");
527        assert_eq!(PepitaAdapter::vm_id(""), "duende-vm-");
528        assert_eq!(PepitaAdapter::vm_id("a b c"), "duende-vm-a-b-c");
529    }
530
531    #[test]
532    fn test_kvm_available_check() {
533        // This just tests that the function doesn't panic
534        let _ = PepitaAdapter::kvm_available();
535    }
536
537    #[tokio::test]
538    async fn test_pepita_available_check() {
539        // pepita is likely not installed, so this should return false
540        let available = PepitaAdapter::pepita_available().await;
541        // We just verify it doesn't panic; result depends on system
542        let _ = available;
543    }
544
545    #[test]
546    fn test_vm_info_state() {
547        let info = VmInfo {
548            vm_id: "test".to_string(),
549            vsock_cid: 10,
550            vmm_pid: Some(1234),
551            state: VmState::Running,
552        };
553        assert_eq!(info.vm_id, "test");
554        assert_eq!(info.vsock_cid, 10);
555        assert_eq!(info.vmm_pid, Some(1234));
556        assert_eq!(info.state, VmState::Running);
557    }
558
559    #[test]
560    fn test_vm_state_variants() {
561        assert_eq!(VmState::Starting, VmState::Starting);
562        assert_eq!(VmState::Running, VmState::Running);
563        assert_eq!(VmState::Paused, VmState::Paused);
564        assert_eq!(VmState::Stopped, VmState::Stopped);
565        assert_eq!(VmState::Failed, VmState::Failed);
566        assert_ne!(VmState::Running, VmState::Stopped);
567    }
568
569    #[test]
570    fn test_vm_info_clone() {
571        let info = VmInfo {
572            vm_id: "clone-test".to_string(),
573            vsock_cid: 20,
574            vmm_pid: None,
575            state: VmState::Paused,
576        };
577        let cloned = info.clone();
578        assert_eq!(cloned.vm_id, "clone-test");
579        assert_eq!(cloned.vsock_cid, 20);
580    }
581
582    #[test]
583    fn test_vm_info_debug() {
584        let info = VmInfo {
585            vm_id: "debug-test".to_string(),
586            vsock_cid: 30,
587            vmm_pid: Some(5678),
588            state: VmState::Running,
589        };
590        let debug = format!("{:?}", info);
591        assert!(debug.contains("debug-test"));
592        assert!(debug.contains("30"));
593    }
594
595    #[test]
596    fn test_pepita_adapter_images_with_string() {
597        let adapter = PepitaAdapter::with_images(
598            String::from("/boot/vmlinuz"),
599            String::from("/var/lib/rootfs.img"),
600        );
601        assert_eq!(adapter.default_kernel.as_deref(), Some("/boot/vmlinuz"));
602        assert_eq!(adapter.default_rootfs.as_deref(), Some("/var/lib/rootfs.img"));
603    }
604
605    #[test]
606    fn test_allocate_cid_sequence() {
607        // Allocate several CIDs and verify they're increasing
608        let cids: Vec<u32> = (0..5).map(|_| PepitaAdapter::allocate_cid()).collect();
609        for i in 1..cids.len() {
610            assert!(cids[i] > cids[i - 1]);
611        }
612    }
613
614    #[tokio::test]
615    async fn test_list_vms_without_pepita() {
616        let adapter = PepitaAdapter::new();
617        // Without pepita installed, this should return empty vec or error
618        let result = adapter.list_vms().await;
619        match result {
620            Ok(vms) => {
621                // Either empty or has some VMs
622                let _ = vms;
623            }
624            Err(_) => {
625                // Error is acceptable if pepita not installed
626            }
627        }
628    }
629
630    #[tokio::test]
631    async fn test_destroy_nonexistent_vm() {
632        let adapter = PepitaAdapter::new();
633        // Should not panic even for nonexistent VM
634        let result = adapter.destroy("nonexistent-vm").await;
635        // Result depends on pepita availability, but should not panic
636        let _ = result;
637    }
638}