use std::pin::Pin;
use arcbox_grpc::v1::machine_service_server;
use arcbox_protocol::v1::{
CreateMachineRequest, CreateMachineResponse, Empty, InspectMachineRequest, ListMachinesRequest,
ListMachinesResponse, MachineAgentRequest, MachineExecOutput, MachineExecRequest, MachineInfo,
MachineNetwork, MachinePingResponse, MachineSummary, MachineSystemInfo, RemoveMachineRequest,
StartMachineRequest, StopMachineRequest,
};
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use super::{SharedRuntime, SharedRuntimeExt};
pub struct MachineServiceImpl {
runtime: SharedRuntime,
}
impl MachineServiceImpl {
#[must_use]
pub fn new(runtime: SharedRuntime) -> Self {
Self { runtime }
}
}
#[tonic::async_trait]
impl machine_service_server::MachineService for MachineServiceImpl {
async fn create(
&self,
request: Request<CreateMachineRequest>,
) -> Result<Response<CreateMachineResponse>, Status> {
let req = request.into_inner();
let runtime = self.runtime.ready()?;
let memory_mb = req.memory / (1024 * 1024);
let disk_gb = req.disk_size / (1024 * 1024 * 1024);
let config = arcbox_core::machine::MachineConfig {
name: req.name.clone(),
cpus: if req.cpus == 0 {
runtime.config().vm.effective_cpus()
} else {
req.cpus
},
memory_mb,
disk_gb,
kernel: if req.kernel.is_empty() {
None
} else {
Some(req.kernel)
},
cmdline: if req.cmdline.is_empty() {
None
} else {
Some(req.cmdline)
},
distro: if req.distro.is_empty() {
None
} else {
Some(req.distro)
},
distro_version: if req.version.is_empty() {
None
} else {
Some(req.version)
},
block_devices: Vec::new(),
backend: arcbox_core::VmBackend::Hv,
enable_rosetta: false,
};
runtime
.machine_manager()
.create(config)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(CreateMachineResponse { id: req.name }))
}
async fn start(
&self,
request: Request<StartMachineRequest>,
) -> Result<Response<Empty>, Status> {
let id = request.into_inner().id;
self.runtime
.ready()?
.machine_manager()
.start(&id)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Empty {}))
}
async fn stop(&self, request: Request<StopMachineRequest>) -> Result<Response<Empty>, Status> {
let id = request.into_inner().id;
self.runtime
.ready()?
.machine_manager()
.stop(&id)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Empty {}))
}
async fn remove(
&self,
request: Request<RemoveMachineRequest>,
) -> Result<Response<Empty>, Status> {
let req = request.into_inner();
self.runtime
.ready()?
.machine_manager()
.remove(&req.id, req.force)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Empty {}))
}
async fn list(
&self,
_request: Request<ListMachinesRequest>,
) -> Result<Response<ListMachinesResponse>, Status> {
let machines = self.runtime.ready()?.machine_manager().list();
let summaries: Vec<_> = machines
.into_iter()
.map(|m| MachineSummary {
id: m.name.clone(),
name: m.name,
state: format!("{:?}", m.state).to_lowercase(),
cpus: m.cpus,
memory: m.memory_mb * 1024 * 1024,
disk_size: m.disk_gb * 1024 * 1024 * 1024,
ip_address: m.ip_address.unwrap_or_default(),
created: m.created_at.timestamp(),
})
.collect();
Ok(Response::new(ListMachinesResponse {
machines: summaries,
}))
}
async fn inspect(
&self,
request: Request<InspectMachineRequest>,
) -> Result<Response<MachineInfo>, Status> {
let id = request.into_inner().id;
let machine = self
.runtime
.ready()?
.machine_manager()
.get(&id)
.ok_or_else(|| Status::not_found("machine not found"))?;
Ok(Response::new(MachineInfo {
id: machine.name.clone(),
name: machine.name,
state: format!("{:?}", machine.state).to_lowercase(),
hardware: Some(arcbox_protocol::v1::MachineHardware {
cpus: machine.cpus,
memory: machine.memory_mb * 1024 * 1024,
arch: std::env::consts::ARCH.to_string(),
}),
network: Some(MachineNetwork {
ip_address: machine.ip_address.clone().unwrap_or_default(),
gateway: String::new(),
mac_address: String::new(),
dns_servers: vec![],
bridge_mac_address: arcbox_core::vm::bridge_nic_mac_for_vm_id(&machine.vm_id),
}),
storage: Some(arcbox_protocol::v1::MachineStorage {
disk_size: machine.disk_gb * 1024 * 1024 * 1024,
disk_format: "raw".to_string(),
disk_path: String::new(),
}),
os: Some(arcbox_protocol::v1::MachineOs {
distro: machine
.distro
.clone()
.unwrap_or_else(|| "linux".to_string()),
version: machine.distro_version.clone().unwrap_or_default(),
kernel: machine.kernel.unwrap_or_default(),
}),
created: None,
started_at: None,
mounts: vec![],
}))
}
async fn ping(
&self,
request: Request<MachineAgentRequest>,
) -> Result<Response<MachinePingResponse>, Status> {
let id = request.into_inner().id;
let mut agent = self
.runtime
.ready()?
.get_agent(&id)
.map_err(|e| Status::internal(e.to_string()))?;
let response = agent
.ping()
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(MachinePingResponse {
message: response.message,
version: response.version,
}))
}
async fn get_system_info(
&self,
request: Request<MachineAgentRequest>,
) -> Result<Response<MachineSystemInfo>, Status> {
let id = request.into_inner().id;
let mut agent = self
.runtime
.ready()?
.get_agent(&id)
.map_err(|e| Status::internal(e.to_string()))?;
let info = agent
.get_system_info()
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(MachineSystemInfo {
kernel_version: info.kernel_version,
os_name: info.os_name,
os_version: info.os_version,
arch: info.arch,
total_memory: info.total_memory,
available_memory: info.available_memory,
cpu_count: info.cpu_count,
load_average: info.load_average,
hostname: info.hostname,
uptime: info.uptime,
ip_addresses: info.ip_addresses,
}))
}
type ExecStream =
Pin<Box<dyn Stream<Item = Result<MachineExecOutput, Status>> + Send + 'static>>;
async fn exec(
&self,
_request: Request<MachineExecRequest>,
) -> Result<Response<Self::ExecStream>, Status> {
Err(Status::unimplemented(
"machine exec is no longer supported by guest agent RPC",
))
}
async fn ssh_info(
&self,
_request: Request<arcbox_protocol::v1::SshInfoRequest>,
) -> Result<Response<arcbox_protocol::v1::SshInfoResponse>, Status> {
Err(Status::unimplemented("ssh_info not implemented"))
}
}