use async_trait::async_trait;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use fcctl_core::firecracker::models::SnapshotType;
use fcctl_core::snapshot::SnapshotManager;
use fcctl_core::vm::VmManager;
use terraphim_firecracker::{PoolConfig, Sub2SecondOptimizer, VmPoolManager};
use super::ssh::SshExecutor;
use super::{Capability, ExecutionContext, ExecutionResult, SnapshotId, ValidationResult};
use crate::config::{BackendType, RlmConfig};
use crate::error::{RlmError, RlmResult};
use crate::types::SessionId;
use crate::validator::KnowledgeGraphValidator;
/// Execution backend that runs code inside Firecracker microVMs through fcctl-core,
/// falling back to stub results when no VM can be used.
///
/// Mutable state sits behind locks so every method takes `&self`. The fcctl-core
/// managers use `tokio` mutexes because their guards are held across `.await`
/// (e.g. while fetching a VM client); the bookkeeping maps use `parking_lot` locks
/// that are only taken for synchronous map reads/updates.
pub struct FirecrackerExecutor {
/// Backend settings (pool sizes, allocation timeout, snapshot limit, KG strictness).
config: RlmConfig,
/// fcctl-core VM manager; `None` until `initialize()` succeeds.
vm_manager: tokio::sync::Mutex<Option<VmManager>>,
/// fcctl-core snapshot manager; `None` until `initialize()` succeeds.
snapshot_manager: tokio::sync::Mutex<Option<SnapshotManager>>,
/// VM pool manager; never populated yet (`ensure_pool` does not store one).
pool_manager: parking_lot::RwLock<Option<Arc<VmPoolManager>>>,
/// Runs code and commands inside guests over SSH (configured in `new`).
ssh_executor: SshExecutor,
/// Fixed capability list advertised through `capabilities()`.
capabilities: Vec<Capability>,
/// Session -> id of the VM bound to that session.
session_to_vm: parking_lot::RwLock<HashMap<SessionId, String>>,
/// Session -> snapshot id recorded as the session's rollback target.
current_snapshot: parking_lot::RwLock<HashMap<SessionId, String>>,
/// Session -> snapshots created minus snapshots deleted; checked against
/// `config.max_snapshots_per_session`.
snapshot_counts: parking_lot::RwLock<HashMap<SessionId, u32>>,
/// Optional knowledge-graph validator; `None` makes `validate()` accept everything.
validator: Option<Arc<KnowledgeGraphValidator>>,
}
impl FirecrackerExecutor {
pub fn new(
config: RlmConfig,
validator: Option<Arc<KnowledgeGraphValidator>>,
) -> Result<Self, RlmError> {
if !super::is_kvm_available() {
return Err(RlmError::BackendInitFailed {
backend: "firecracker".to_string(),
message: "KVM is not available (/dev/kvm does not exist)".to_string(),
});
}
let capabilities = vec![
Capability::VmIsolation,
Capability::Snapshots,
Capability::NetworkAudit,
Capability::OverlayFs,
Capability::LlmBridge,
Capability::DnsAllowlist,
Capability::ResourceLimits,
Capability::PythonExecution,
Capability::BashExecution,
Capability::FileOperations,
];
let ssh_executor = SshExecutor::new()
.with_user("root")
.with_private_key("/tmp/ubuntu-22.04.id_rsa")
.with_output_dir(std::env::temp_dir().join("terraphim_rlm_output"));
Ok(Self {
config,
vm_manager: tokio::sync::Mutex::new(None),
snapshot_manager: tokio::sync::Mutex::new(None),
pool_manager: parking_lot::RwLock::new(None),
ssh_executor,
capabilities,
session_to_vm: parking_lot::RwLock::new(HashMap::new()),
current_snapshot: parking_lot::RwLock::new(HashMap::new()),
snapshot_counts: parking_lot::RwLock::new(HashMap::new()),
validator,
})
}
pub async fn initialize(&self) -> RlmResult<()> {
log::info!("Initializing FirecrackerExecutor with fcctl-core managers");
let firecracker_bin = PathBuf::from("/usr/local/bin/firecracker");
let socket_base_path = PathBuf::from("/tmp/firecracker-sockets");
if !socket_base_path.exists() {
std::fs::create_dir_all(&socket_base_path).map_err(|e| {
RlmError::BackendInitFailed {
backend: "firecracker".to_string(),
message: format!("Failed to create socket directory: {}", e),
}
})?;
}
let vm_manager =
VmManager::new(&firecracker_bin, &socket_base_path, None).map_err(|e| {
RlmError::BackendInitFailed {
backend: "firecracker".to_string(),
message: format!("Failed to create VmManager: {}", e),
}
})?;
*self.vm_manager.lock().await = Some(vm_manager);
let snapshots_dir = PathBuf::from("/var/lib/terraphim/snapshots");
if !snapshots_dir.exists() {
std::fs::create_dir_all(&snapshots_dir).map_err(|e| RlmError::BackendInitFailed {
backend: "firecracker".to_string(),
message: format!("Failed to create snapshots directory: {}", e),
})?;
}
let snapshot_manager = SnapshotManager::new(&snapshots_dir, None).map_err(|e| {
RlmError::BackendInitFailed {
backend: "firecracker".to_string(),
message: format!("Failed to create SnapshotManager: {}", e),
}
})?;
*self.snapshot_manager.lock().await = Some(snapshot_manager);
log::info!("FirecrackerExecutor initialized successfully");
Ok(())
}
// Not called yet: kept as the entry point for the pending VM-pool integration.
#[allow(dead_code)]
/// Returns the shared VM pool, creating it on first use.
///
/// Pool creation is not implemented yet. Unless a pool has already been stored (nothing
/// in this type stores one yet), this logs a warning and fails.
///
/// # Errors
///
/// Returns [`RlmError::BackendInitFailed`] whenever no pool is stored.
async fn ensure_pool(&self) -> Result<Arc<VmPoolManager>, RlmError> {
    let existing = self.pool_manager.read().as_ref().map(Arc::clone);
    if let Some(pool) = existing {
        return Ok(pool);
    }

    let cfg = &self.config;
    log::info!(
        "Initializing Firecracker VM pool (min={}, max={}, target={})",
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.pool_target_size
    );

    // Built but not yet handed to a pool: VmPoolManager still needs VmManager wiring.
    let _pool_config = PoolConfig {
        min_pool_size: cfg.pool_min_size,
        max_pool_size: cfg.pool_max_size,
        target_pool_size: cfg.pool_target_size,
        allocation_timeout: std::time::Duration::from_millis(cfg.allocation_timeout_ms),
        ..Default::default()
    };
    let _optimizer = Arc::new(Sub2SecondOptimizer::new());

    log::warn!("FirecrackerExecutor: VM pool initialization not yet fully implemented");
    Err(RlmError::BackendInitFailed {
        backend: "firecracker".to_string(),
        message: "VM pool initialization requires VmManager integration".to_string(),
    })
}
async fn get_or_allocate_vm(&self, session_id: &SessionId) -> RlmResult<Option<String>> {
{
let session_to_vm = self.session_to_vm.read();
if let Some(vm_id) = session_to_vm.get(session_id) {
log::debug!("Using existing VM for session {}: {}", session_id, vm_id);
return Ok(Some(vm_id.clone()));
}
}
let mut vm_manager_guard = self.vm_manager.lock().await;
if let Some(ref mut vm_manager) = *vm_manager_guard {
log::info!("Creating new VM for session {}", session_id);
let vm_ip = vm_manager.get_vm_ip("placeholder").ok();
let boot_args = if let Some(ref ip) = vm_ip {
format!(
"console=ttyS0 reboot=k panic=1 pci=off ip={}::172.26.0.1:255.255.255.0::eth0:off",
ip
)
} else {
"console=ttyS0 reboot=k panic=1 pci=off".to_string()
};
let vm_config = fcctl_core::firecracker::VmConfig {
kernel_path: "/tmp/vmlinux-5.10.225".to_string(),
rootfs_path: "/tmp/ubuntu-22.04.ext4".to_string(),
vcpus: 2,
memory_mb: 1024,
initrd_path: None,
boot_args: Some(boot_args),
vm_type: fcctl_core::firecracker::VmType::Minimal,
};
match vm_manager.create_vm(&vm_config, None).await {
Ok(vm_id) => {
log::info!("Created VM {} for session {}", vm_id, session_id);
drop(vm_manager_guard); self.session_to_vm
.write()
.insert(*session_id, vm_id.clone());
return Ok(Some(vm_id));
}
Err(e) => {
log::error!("Failed to create VM for session {}: {}", session_id, e);
}
}
} else {
log::debug!(
"VmManager not initialized, cannot create VM for session {}",
session_id
);
}
Ok(None)
}
/// Binds `vm_id` to `session_id`, replacing any VM previously bound to that session.
pub fn assign_vm_to_session(&self, session_id: SessionId, vm_id: String) {
    log::info!("Assigning VM {} to session {}", vm_id, session_id);
    let mut bindings = self.session_to_vm.write();
    bindings.insert(session_id, vm_id);
}
/// Removes the VM binding for `session_id` and returns the VM id that was bound, if any.
///
/// Only the in-memory binding is removed; the VM itself is not stopped here.
pub fn release_session_vm(&self, session_id: &SessionId) -> Option<String> {
    let mut bindings = self.session_to_vm.write();
    bindings.remove(session_id)
}
/// Returns the snapshot id recorded as `session_id`'s rollback target, if any.
pub fn get_current_snapshot(&self, session_id: &SessionId) -> Option<String> {
    let snapshots = self.current_snapshot.read();
    snapshots.get(session_id).map(String::clone)
}
/// Records `snapshot_id` as `session_id`'s rollback target, replacing any previous one.
fn set_current_snapshot(&self, session_id: &SessionId, snapshot_id: String) {
    let key = *session_id;
    self.current_snapshot.write().insert(key, snapshot_id);
}
/// Forgets `session_id`'s rollback target (no-op if none is recorded).
fn clear_current_snapshot(&self, session_id: &SessionId) {
    let mut snapshots = self.current_snapshot.write();
    snapshots.remove(session_id);
}
/// Restores the session's VM to the session's current snapshot (see
/// `get_current_snapshot`).
///
/// A session without a current snapshot is a logged no-op and returns `Ok(())`.
///
/// # Errors
///
/// Returns [`RlmError::SnapshotRestoreFailed`] in any of these cases:
/// - the session has a current snapshot but no VM is bound to it;
/// - the snapshot or VM manager is not initialized;
/// - fetching the VM client fails;
/// - the restore itself fails.
///
/// The first two cases previously returned `Ok(())` without restoring anything, so
/// callers believed the rollback had happened. They now match `restore_snapshot`.
pub async fn rollback(&self, session_id: &SessionId) -> Result<(), RlmError> {
    let snapshot_id = match self.get_current_snapshot(session_id) {
        Some(snapshot_id) => snapshot_id,
        None => {
            log::warn!(
                "No current snapshot for session {}, rollback is a no-op",
                session_id
            );
            return Ok(());
        }
    };

    log::warn!(
        "Rolling back session {} to snapshot '{}'",
        session_id,
        snapshot_id
    );

    let vm_id = self
        .session_to_vm
        .read()
        .get(session_id)
        .cloned()
        .ok_or_else(|| RlmError::SnapshotRestoreFailed {
            message: "No VM assigned to session".to_string(),
        })?;

    // Same lock order as the snapshot create/restore paths: snapshot manager, then VM manager.
    let mut snapshot_manager_guard = self.snapshot_manager.lock().await;
    let mut vm_manager_guard = self.vm_manager.lock().await;
    match (&mut *snapshot_manager_guard, &mut *vm_manager_guard) {
        (Some(snapshot_manager), Some(vm_manager)) => {
            let vm_client = vm_manager.get_vm_client(&vm_id).await.map_err(|e| {
                RlmError::SnapshotRestoreFailed {
                    message: format!("Failed to get VM client: {}", e),
                }
            })?;
            snapshot_manager
                .restore_snapshot(vm_client, &snapshot_id)
                .await
                .map_err(|e| RlmError::SnapshotRestoreFailed {
                    message: format!("Rollback failed: {}", e),
                })?;
        }
        (None, _) => {
            return Err(RlmError::SnapshotRestoreFailed {
                message: "SnapshotManager not initialized".to_string(),
            });
        }
        (_, None) => {
            return Err(RlmError::SnapshotRestoreFailed {
                message: "VmManager not initialized".to_string(),
            });
        }
    }
    Ok(())
}
async fn execute_in_vm(
&self,
code: &str,
is_python: bool,
ctx: &ExecutionContext,
) -> Result<ExecutionResult, RlmError> {
let start = Instant::now();
log::debug!(
"FirecrackerExecutor::execute_in_vm called (python={}, session={})",
is_python,
ctx.session_id
);
let vm_id = self.get_or_allocate_vm(&ctx.session_id).await?;
match vm_id {
Some(ref id) => {
let vm_ip = {
let vm_manager_guard = self.vm_manager.lock().await;
if let Some(ref vm_manager) = *vm_manager_guard {
vm_manager.get_vm_ip(id).ok()
} else {
None
}
};
match vm_ip {
Some(ip) => {
log::info!(
"Executing on VM {} ({}) for session {}",
id,
ip,
ctx.session_id
);
let ssh_ready =
tokio::time::timeout(std::time::Duration::from_secs(30), async {
for i in 0..30 {
if self.ssh_executor.check_connection(&ip).await {
log::info!("VM {} SSH ready after {} attempts", id, i + 1);
return true;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
false
})
.await
.unwrap_or(false);
if !ssh_ready {
log::warn!("VM {} SSH not ready, returning stub", id);
return self.stub_response(code, start);
}
let result = if is_python {
self.ssh_executor.execute_python(&ip, code, ctx).await
} else {
self.ssh_executor.execute_command(&ip, code, ctx).await
};
match result {
Ok(mut res) => {
res.metadata.insert("vm_id".to_string(), id.clone());
res.metadata.insert("vm_ip".to_string(), ip);
res.metadata
.insert("backend".to_string(), "firecracker".to_string());
Ok(res)
}
Err(e) => {
log::error!("VM execution failed: {}", e);
Err(e)
}
}
}
None => {
log::warn!("VM {} has no IP assigned", id);
self.stub_response(code, start)
}
}
}
None => self.stub_response(code, start),
}
}
/// Builds the placeholder result returned when `code` cannot be run inside a VM.
///
/// The result:
/// - echoes a preview of `code` on stdout;
/// - warns on stderr that no VM was allocated;
/// - reports exit code 0;
/// - tags metadata with `stub = "true"` and `backend = "firecracker"`.
///
/// `start` marks when the execution attempt began; the elapsed time is reported as
/// `execution_time_ms`.
///
/// The preview is the first 100 characters of `code`, followed by `...` when longer.
/// Truncation counts `char`s, not bytes: the previous `&code[..100]` byte slice panicked
/// whenever byte 100 fell inside a multi-byte UTF-8 character.
fn stub_response(&self, code: &str, start: Instant) -> Result<ExecutionResult, RlmError> {
    // Maximum number of characters of `code` echoed back on stdout.
    const PREVIEW_CHARS: usize = 100;

    let execution_time = start.elapsed().as_millis() as u64;

    // `char_indices().nth(N)` yields the byte offset of the (N+1)-th character, so the
    // slice always ends on a UTF-8 boundary; `None` means the code fits entirely.
    let preview = match code.char_indices().nth(PREVIEW_CHARS) {
        Some((cut, _)) => format!("{}...", &code[..cut]),
        None => code.to_string(),
    };

    let mut metadata = HashMap::new();
    metadata.insert("stub".to_string(), "true".to_string());
    metadata.insert("backend".to_string(), "firecracker".to_string());

    Ok(ExecutionResult {
        stdout: format!(
            "[FirecrackerExecutor] No VM available. Would execute: {}",
            preview
        ),
        stderr: "Warning: No VM allocated for this session. \
                 Call initialize() and assign_vm_to_session() first."
            .to_string(),
        exit_code: 0,
        execution_time_ms: execution_time,
        output_truncated: false,
        output_file_path: None,
        timed_out: false,
        metadata,
    })
}
}
#[async_trait]
impl super::ExecutionEnvironment for FirecrackerExecutor {
type Error = RlmError;

/// Runs `code` as Python inside the session's VM, or returns a stub result when no
/// VM is usable.
async fn execute_code(
    &self,
    code: &str,
    ctx: &ExecutionContext,
) -> Result<ExecutionResult, Self::Error> {
    let as_python = true;
    self.execute_in_vm(code, as_python, ctx).await
}
/// Runs `cmd` as a command (not Python) inside the session's VM over SSH, or returns a
/// stub result when no VM is usable.
async fn execute_command(
    &self,
    cmd: &str,
    ctx: &ExecutionContext,
) -> Result<ExecutionResult, Self::Error> {
    let as_python = false;
    self.execute_in_vm(cmd, as_python, ctx).await
}
/// Validates `input` with the knowledge-graph validator, if one was supplied.
///
/// When no validator is configured, or `input` is empty or whitespace-only, this
/// returns `ValidationResult::valid` with an empty list. Otherwise the validator's
/// report is converted using `config.kg_strictness`.
///
/// # Errors
///
/// Propagates any error returned by the validator.
async fn validate(&self, input: &str) -> Result<ValidationResult, Self::Error> {
    let validator = match self.validator.as_ref() {
        Some(validator) if !input.trim().is_empty() => validator,
        _ => return Ok(ValidationResult::valid(Vec::new())),
    };
    let report = validator.validate(input)?;
    Ok(ValidationResult::from_validator_result(
        &report,
        self.config.kg_strictness,
    ))
}
/// Takes a full snapshot of the VM bound to `session_id`, labelled `name`.
///
/// # Errors
///
/// - [`RlmError::MaxSnapshotsReached`] when the session's recorded snapshot count has
///   reached `config.max_snapshots_per_session`.
/// - [`RlmError::SnapshotCreationFailed`] in any of these cases:
///   - no VM is bound to the session;
///   - either fcctl-core manager is uninitialized;
///   - fetching the VM client or creating the snapshot fails.
///
/// NOTE(review): the limit check and the counter increment are separated by awaits and
/// are not atomic, so concurrent calls for one session can exceed the limit.
///
/// NOTE(review): the returned `SnapshotId` is built from `name` and the session only.
/// The id reported by the snapshot manager is logged but not stored, yet
/// `restore_snapshot` and `delete_snapshot` hand `id.id` to the manager. Confirm these
/// ids line up.
async fn create_snapshot(
    &self,
    session_id: &SessionId,
    name: &str,
) -> Result<SnapshotId, Self::Error> {
    log::info!("Creating snapshot '{}' for session {}", name, session_id);

    let max = self.config.max_snapshots_per_session;
    let taken = self
        .snapshot_counts
        .read()
        .get(session_id)
        .copied()
        .unwrap_or(0);
    if taken >= max {
        return Err(RlmError::MaxSnapshotsReached { max });
    }

    let bound_vm = self.session_to_vm.read().get(session_id).cloned();
    let vm_id = match bound_vm {
        Some(vm_id) => vm_id,
        None => {
            return Err(RlmError::SnapshotCreationFailed {
                message: "No VM assigned to session".to_string(),
            })
        }
    };

    let manager_snapshot_id = {
        // Lock order: snapshot manager first, then VM manager.
        let mut snapshot_manager_guard = self.snapshot_manager.lock().await;
        let mut vm_manager_guard = self.vm_manager.lock().await;
        let (snapshot_manager, vm_manager) =
            match (&mut *snapshot_manager_guard, &mut *vm_manager_guard) {
                (Some(snapshots), Some(vms)) => (snapshots, vms),
                (None, _) => {
                    return Err(RlmError::SnapshotCreationFailed {
                        message: "SnapshotManager not initialized".to_string(),
                    });
                }
                (_, None) => {
                    return Err(RlmError::SnapshotCreationFailed {
                        message: "VmManager not initialized".to_string(),
                    });
                }
            };

        let vm_client = vm_manager.get_vm_client(&vm_id).await.map_err(|e| {
            RlmError::SnapshotCreationFailed {
                message: format!("Failed to get VM client: {}", e),
            }
        })?;
        snapshot_manager
            .create_snapshot(vm_client, &vm_id, name, SnapshotType::Full, None)
            .await
            .map_err(|e| RlmError::SnapshotCreationFailed {
                message: format!("Snapshot creation failed: {}", e),
            })?
    };

    self.snapshot_counts
        .write()
        .entry(*session_id)
        .and_modify(|count| *count += 1)
        .or_insert(1);

    let created = SnapshotId::new(name, *session_id);
    log::info!(
        "Snapshot '{}' ({}) created for session {}",
        name,
        manager_snapshot_id,
        session_id
    );
    Ok(created)
}
/// Restores the VM bound to `id.session_id` from the snapshot `id`.
///
/// On success `id.id` (as a string) becomes the session's current snapshot, i.e. the
/// target used by `rollback`.
///
/// # Errors
///
/// Returns [`RlmError::SnapshotRestoreFailed`] in any of these cases:
/// - no VM is bound to the session;
/// - either fcctl-core manager is uninitialized;
/// - fetching the VM client or the restore itself fails.
async fn restore_snapshot(&self, id: &SnapshotId) -> Result<(), Self::Error> {
    log::info!(
        "Restoring snapshot '{}' ({}) for session {}",
        id.name,
        id.id,
        id.session_id
    );

    let bound_vm = self.session_to_vm.read().get(&id.session_id).cloned();
    let vm_id = match bound_vm {
        Some(vm_id) => vm_id,
        None => {
            return Err(RlmError::SnapshotRestoreFailed {
                message: "No VM assigned to session".to_string(),
            })
        }
    };
    let snapshot_key = id.id.to_string();

    {
        // Lock order: snapshot manager first, then VM manager.
        let mut snapshot_manager_guard = self.snapshot_manager.lock().await;
        let mut vm_manager_guard = self.vm_manager.lock().await;
        let (snapshot_manager, vm_manager) =
            match (&mut *snapshot_manager_guard, &mut *vm_manager_guard) {
                (Some(snapshots), Some(vms)) => (snapshots, vms),
                (None, _) => {
                    return Err(RlmError::SnapshotRestoreFailed {
                        message: "SnapshotManager not initialized".to_string(),
                    });
                }
                (_, None) => {
                    return Err(RlmError::SnapshotRestoreFailed {
                        message: "VmManager not initialized".to_string(),
                    });
                }
            };

        let vm_client = vm_manager.get_vm_client(&vm_id).await.map_err(|e| {
            RlmError::SnapshotRestoreFailed {
                message: format!("Failed to get VM client: {}", e),
            }
        })?;
        snapshot_manager
            .restore_snapshot(vm_client, &snapshot_key)
            .await
            .map_err(|e| RlmError::SnapshotRestoreFailed {
                message: format!("Snapshot restore failed: {}", e),
            })?;
    }

    self.set_current_snapshot(&id.session_id, snapshot_key);
    log::info!(
        "Snapshot '{}' restored for session {}",
        id.name,
        id.session_id
    );
    Ok(())
}
/// Lists the snapshots of `session_id`.
///
/// NOTE(review): not implemented yet. This always returns an empty list, whether or
/// not a VM is bound to the session, so snapshots made with `create_snapshot` are never
/// reported here. The two cases only differ in what they log at debug level.
///
/// The former `is_none()` check followed by `unwrap()` is replaced with a single match,
/// which removes the panic path.
async fn list_snapshots(&self, session_id: &SessionId) -> Result<Vec<SnapshotId>, Self::Error> {
    let bound_vm = self.session_to_vm.read().get(session_id).cloned();
    match bound_vm {
        None => log::debug!(
            "No VM assigned to session {}, returning empty snapshot list",
            session_id
        ),
        Some(vm_id) => log::debug!(
            "list_snapshots for session {} (vm={})",
            session_id,
            vm_id
        ),
    }
    Ok(Vec::new())
}
/// Deletes snapshot `id` through the snapshot manager, then decrements the session's
/// snapshot count (saturating at zero).
///
/// # Errors
///
/// Returns [`RlmError::SnapshotNotFound`] when the snapshot manager is uninitialized or
/// the deletion fails; the failure text is carried in `snapshot_id`.
///
/// NOTE(review): `SnapshotNotFound` is used for every failure here, including ones that
/// are not "not found". Consider a dedicated error variant.
async fn delete_snapshot(&self, id: &SnapshotId) -> Result<(), Self::Error> {
    log::info!(
        "Deleting snapshot '{}' ({}) from session {}",
        id.name,
        id.id,
        id.session_id
    );

    let snapshot_key = id.id.to_string();
    {
        let mut snapshot_manager_guard = self.snapshot_manager.lock().await;
        let snapshot_manager = match &mut *snapshot_manager_guard {
            Some(manager) => manager,
            None => {
                return Err(RlmError::SnapshotNotFound {
                    snapshot_id: "SnapshotManager not initialized".to_string(),
                })
            }
        };
        snapshot_manager
            .delete_snapshot(&snapshot_key, true)
            .await
            .map_err(|e| RlmError::SnapshotNotFound {
                snapshot_id: format!("Delete failed: {}", e),
            })?;
    }

    let mut counts = self.snapshot_counts.write();
    if let Some(remaining) = counts.get_mut(&id.session_id) {
        *remaining = remaining.saturating_sub(1);
    }
    drop(counts);

    log::debug!("Snapshot {} deleted", id.id);
    Ok(())
}
/// Deletes every snapshot of the VM bound to `session_id`, then resets the session's
/// snapshot count and current snapshot.
///
/// This is best-effort and always returns `Ok(())`:
/// - individual deletion failures are logged and skipped;
/// - if listing the snapshots fails, that is logged as a warning and nothing is deleted
///   (previously `unwrap_or_default()` hid it and "Deleted snapshots" was still logged);
/// - when no VM is bound or the snapshot manager is uninitialized, only the bookkeeping
///   is reset.
async fn delete_session_snapshots(&self, session_id: &SessionId) -> Result<(), Self::Error> {
    log::info!("Deleting all snapshots for session {}", session_id);

    let vm_id = self.session_to_vm.read().get(session_id).cloned();
    if let Some(vm_id) = vm_id {
        let mut snapshot_manager_guard = self.snapshot_manager.lock().await;
        if let Some(snapshot_manager) = &mut *snapshot_manager_guard {
            let listing = snapshot_manager.list_snapshots(Some(&vm_id)).await;
            match listing {
                Ok(snapshots) => {
                    for snapshot in snapshots {
                        if let Err(e) = snapshot_manager.delete_snapshot(&snapshot.id, true).await
                        {
                            log::warn!("Failed to delete snapshot {}: {}", snapshot.id, e);
                        }
                    }
                    log::info!(
                        "Deleted snapshots for session {} (vm={})",
                        session_id,
                        vm_id
                    );
                }
                Err(e) => {
                    log::warn!(
                        "Failed to list snapshots for session {} (vm={}): {}",
                        session_id,
                        vm_id,
                        e
                    );
                }
            }
        }
    }

    self.snapshot_counts.write().remove(session_id);
    self.clear_current_snapshot(session_id);
    Ok(())
}
/// Returns the fixed capability list assembled when the executor was constructed.
fn capabilities(&self) -> &[Capability] {
    self.capabilities.as_slice()
}
/// Identifies this executor as the Firecracker backend.
fn backend_type(&self) -> BackendType {
    BackendType::Firecracker
}
/// Reports whether the backend is usable.
///
/// Returns `Ok(true)` only when KVM is available and both fcctl-core managers have
/// been initialized. A missing manager is logged as a warning. Never returns `Err`.
async fn health_check(&self) -> Result<bool, Self::Error> {
    if !super::is_kvm_available() {
        return Ok(false);
    }

    // Both locks are always taken; there is no short-circuit between them.
    let vm_ready = self.vm_manager.lock().await.is_some();
    let snapshots_ready = self.snapshot_manager.lock().await.is_some();
    let ready = vm_ready && snapshots_ready;
    if !ready {
        log::warn!("FirecrackerExecutor not fully initialized");
    }
    Ok(ready)
}
/// Forgets all per-session state: VM bindings, current snapshots and snapshot counts.
///
/// Only the in-memory maps are cleared. No VM is stopped and no snapshot is deleted
/// here. Always returns `Ok(())`.
async fn cleanup(&self) -> Result<(), Self::Error> {
    log::info!("FirecrackerExecutor::cleanup called");
    let mut vm_bindings = self.session_to_vm.write();
    vm_bindings.clear();
    drop(vm_bindings);

    let mut rollback_targets = self.current_snapshot.write();
    rollback_targets.clear();
    drop(rollback_targets);

    let mut counts = self.snapshot_counts.write();
    counts.clear();
    drop(counts);

    Ok(())
}
/// Ends `session_id` by releasing its VM binding (logged when one existed).
///
/// The session's snapshot count and current snapshot are left untouched, and the VM
/// is not stopped here. Always returns `Ok(())`.
async fn end_session(&self, session_id: &SessionId) -> Result<(), Self::Error> {
    let released = self.release_session_vm(session_id);
    if let Some(vm_id) = released.as_deref() {
        log::debug!("end_session({}) released vm {}", session_id, vm_id);
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::ExecutionEnvironment;

    /// Builds an executor from the default config with no validator.
    ///
    /// Returns `None` after printing a skip notice when KVM is unavailable, because
    /// `FirecrackerExecutor::new` refuses to construct without it. (The tests used to
    /// call `new(config)` with a single argument, which does not match the
    /// `new(config, validator)` signature and did not compile.)
    fn executor_or_skip() -> Option<FirecrackerExecutor> {
        if !super::super::is_kvm_available() {
            eprintln!("Skipping test: KVM not available");
            return None;
        }
        let executor = FirecrackerExecutor::new(RlmConfig::default(), None)
            .expect("construction succeeds when KVM is available");
        Some(executor)
    }

    #[test]
    fn test_firecracker_executor_capabilities() {
        let executor = match executor_or_skip() {
            Some(executor) => executor,
            None => return,
        };
        assert!(executor.has_capability(Capability::VmIsolation));
        assert!(executor.has_capability(Capability::Snapshots));
        assert!(executor.has_capability(Capability::PythonExecution));
        assert!(!executor.has_capability(Capability::ContainerIsolation));
    }

    #[test]
    fn test_firecracker_executor_requires_kvm() {
        if super::super::is_kvm_available() {
            eprintln!("Skipping test: KVM is available");
            return;
        }
        let result = FirecrackerExecutor::new(RlmConfig::default(), None);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_session_vm_assignment() {
        let executor = match executor_or_skip() {
            Some(executor) => executor,
            None => return,
        };
        let session_id = SessionId::new();
        assert!(executor.session_to_vm.read().get(&session_id).is_none());

        executor.assign_vm_to_session(session_id, "vm-test-123".to_string());
        assert_eq!(
            executor.session_to_vm.read().get(&session_id),
            Some(&"vm-test-123".to_string())
        );

        let released = executor.release_session_vm(&session_id);
        assert_eq!(released, Some("vm-test-123".to_string()));
        assert!(executor.session_to_vm.read().get(&session_id).is_none());
    }

    #[tokio::test]
    async fn test_current_snapshot_tracking() {
        let executor = match executor_or_skip() {
            Some(executor) => executor,
            None => return,
        };
        let session_id = SessionId::new();
        assert!(executor.get_current_snapshot(&session_id).is_none());

        executor.set_current_snapshot(&session_id, "snap-123".to_string());
        assert_eq!(
            executor.get_current_snapshot(&session_id),
            Some("snap-123".to_string())
        );

        executor.clear_current_snapshot(&session_id);
        assert!(executor.get_current_snapshot(&session_id).is_none());
    }

    #[tokio::test]
    async fn test_rollback_without_current_snapshot() {
        let executor = match executor_or_skip() {
            Some(executor) => executor,
            None => return,
        };
        let session_id = SessionId::new();
        let result = executor.rollback(&session_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_health_check_without_initialization() {
        let executor = match executor_or_skip() {
            Some(executor) => executor,
            None => return,
        };
        let result = executor.health_check().await.unwrap();
        assert!(!result);
    }
}