#[cfg(feature = "vm-sandbox")]
use super::hypervisor::{HypervisorInstance, ServiceVmConfig};
use super::native::ProcessHandle;
use crate::config::BlueprintManagerContext;
use crate::error::{Error, Result};
use crate::rt::ResourceLimits;
#[cfg(feature = "containers")]
use crate::rt::container::ContainerInstance;
#[cfg(feature = "remote-providers")]
use crate::rt::remote::RemoteServiceInstance;
use crate::sources::{BlueprintArgs, BlueprintEnvVars};
#[cfg(feature = "containers")]
use blueprint_client_tangle::ConfidentialityPolicy;
use blueprint_core::error;
use blueprint_core::{info, warn};
use blueprint_manager_bridge::server::{Bridge, BridgeHandle};
use blueprint_runner::config::BlueprintEnvironment;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time;
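
/// Lifecycle state of a managed service, as reported by its runtime.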
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Status {
NotStarted,
Pending,
Running,
Finished,
Error,
Unknown,
}
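
/// Everything needed to spawn a native (host) process once [`Service::start`] is called.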
struct NativeProcessInfo {
#[expect(unused, reason = "Host processes aren't resource constrained yet")]
limits: ResourceLimits,
binary_path: PathBuf,
service_name: String,
env_vars: BlueprintEnvVars,
arguments: BlueprintArgs,
}
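
/// A native process is spawned lazily: it stays `NotStarted` until [`Service::start`].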
enum NativeProcess {
NotStarted(NativeProcessInfo),
Started(ProcessHandle),
}
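
/// The backend a [`Service`] runs on, depending on the enabled features.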
enum Runtime {
#[cfg(feature = "vm-sandbox")]
Hypervisor(HypervisorInstance),
#[cfg(feature = "containers")]
Container(ContainerInstance),
#[cfg(feature = "remote-providers")]
Remote(RemoteServiceInstance),
Native(NativeProcess),
}
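
/// A single blueprint service instance together with its manager <-> service bridge.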
pub struct Service {
runtime: Runtime,
bridge: BridgeHandle,
alive_rx: Option<oneshot::Receiver<()>>,
}
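
/// Spawns the manager <-> service bridge for `service_name` under `runtime_dir`,
/// returning its base socket path, a shutdown handle, and a receiver that resolves
/// once the service connects.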
async fn create_bridge(
ctx: &BlueprintManagerContext,
runtime_dir: impl AsRef<Path>,
service_name: &str,
no_vm: bool,
expected_service_id: Option<u64>,
) -> Result<(PathBuf, BridgeHandle, oneshot::Receiver<()>)> {
let db = ctx
.db()
.await
.expect("not possible to get to this point without a db set");
let bridge = Bridge::new(
runtime_dir.as_ref().to_path_buf(),
service_name.to_string(),
db,
no_vm,
expected_service_id,
);
let bridge_base_socket = bridge.base_socket_path();
let (handle, alive_rx) = bridge.spawn().map_err(|e| {
error!("Failed to spawn manager <-> service bridge: {e}");
e
})?;
Ok((bridge_base_socket, handle, alive_rx))
}

impl Service {
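    /// Creates a service from a downloaded binary, preferring a VM sandbox when the
    /// `vm-sandbox` feature is enabled and the context allows it, and falling back to
    /// a native host process otherwise.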
#[allow(clippy::too_many_arguments)]
pub async fn from_binary(
ctx: &BlueprintManagerContext,
limits: ResourceLimits,
_blueprint_config: &BlueprintEnvironment,
_id: u32,
env: BlueprintEnvVars,
args: BlueprintArgs,
binary_path: &Path,
sub_service_str: &str,
_cache_dir: &Path,
runtime_dir: &Path,
) -> Result<Service> {
#[cfg(feature = "vm-sandbox")]
if !ctx.vm_sandbox_options.no_vm {
return Service::new_vm(
ctx,
limits,
ServiceVmConfig {
id: _id,
..Default::default()
},
&_blueprint_config.data_dir,
&_blueprint_config.keystore_uri,
_cache_dir,
runtime_dir,
sub_service_str,
binary_path,
env,
args,
)
.await;
}
Service::new_native(
ctx,
limits,
runtime_dir,
sub_service_str,
binary_path,
env,
args,
)
.await
}
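
    /// Creates a service that runs inside a hypervisor-managed VM sandbox.
    ///
    /// `env_vars.bridge_socket_path` is cleared; the bridge's base socket is instead
    /// handed to the hypervisor in [`HypervisorInstance::prepare`].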
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "vm-sandbox")]
pub async fn new_vm(
ctx: &BlueprintManagerContext,
limits: ResourceLimits,
vm_config: ServiceVmConfig,
data_dir: impl AsRef<Path>,
keystore: impl AsRef<Path>,
cache_dir: impl AsRef<Path>,
runtime_dir: impl AsRef<Path>,
service_name: &str,
binary_path: impl AsRef<Path>,
mut env_vars: BlueprintEnvVars,
arguments: BlueprintArgs,
) -> Result<Service> {
env_vars.bridge_socket_path = None;
let (bridge_base_socket, bridge_handle, alive_rx) =
create_bridge(ctx, runtime_dir.as_ref(), service_name, false, None).await?;
let mut hypervisor = HypervisorInstance::new(
ctx,
limits,
vm_config,
cache_dir.as_ref(),
runtime_dir.as_ref(),
service_name,
)?;
hypervisor
.prepare(
ctx,
keystore,
data_dir,
cache_dir,
bridge_base_socket,
binary_path.as_ref(),
env_vars,
arguments,
)
.await?;
Ok(Self {
runtime: Runtime::Hypervisor(hypervisor),
bridge: bridge_handle,
alive_rx: Some(alive_rx),
})
}
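
    /// Creates a service that runs inside a container built from `image`.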
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "containers")]
pub async fn new_container(
ctx: &BlueprintManagerContext,
limits: ResourceLimits,
runtime_dir: impl AsRef<Path>,
service_name: &str,
image: String,
mut env_vars: BlueprintEnvVars,
arguments: BlueprintArgs,
confidentiality_policy: ConfidentialityPolicy,
debug: bool,
) -> Result<Service> {
let (bridge_base_socket, bridge_handle, alive_rx) =
create_bridge(ctx, runtime_dir.as_ref(), service_name, false, None).await?;
env_vars.bridge_socket_path = Some(bridge_base_socket);
let instance = ContainerInstance::new(
ctx,
limits,
service_name,
image,
env_vars,
arguments,
confidentiality_policy,
debug,
)
.await?;
Ok(Self {
runtime: Runtime::Container(instance),
bridge: bridge_handle,
alive_rx: Some(alive_rx),
})
}
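
    /// Creates a service that runs as a plain host process. The process itself is not
    /// spawned until [`Service::start`] is called.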
#[allow(clippy::too_many_arguments)]
pub async fn new_native(
ctx: &BlueprintManagerContext,
limits: ResourceLimits,
runtime_dir: impl AsRef<Path>,
service_name: &str,
binary_path: impl AsRef<Path>,
mut env_vars: BlueprintEnvVars,
arguments: BlueprintArgs,
) -> Result<Service> {
let (bridge_base_socket, bridge_handle, alive_rx) =
create_bridge(ctx, runtime_dir.as_ref(), service_name, true, None).await?;
env_vars.bridge_socket_path = Some(bridge_base_socket);
Ok(Self {
runtime: Runtime::Native(NativeProcess::NotStarted(NativeProcessInfo {
limits,
binary_path: binary_path.as_ref().to_path_buf(),
service_name: service_name.to_string(),
env_vars,
arguments,
})),
bridge: bridge_handle,
alive_rx: Some(alive_rx),
})
}
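
    /// Wraps an already-provisioned remote service instance. The bridge is still
    /// created so liveness can be tracked, but no local process is spawned.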
#[cfg(feature = "remote-providers")]
pub async fn new_remote(
ctx: &BlueprintManagerContext,
runtime_dir: impl AsRef<Path>,
service_name: &str,
remote_instance: RemoteServiceInstance,
) -> Result<Service> {
let (_, bridge_handle, alive_rx) =
create_bridge(ctx, runtime_dir.as_ref(), service_name, true, None).await?;
Ok(Self {
runtime: Runtime::Remote(remote_instance),
bridge: bridge_handle,
alive_rx: Some(alive_rx),
})
}
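
    /// Returns the current [`Status`] of the underlying runtime.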
#[allow(clippy::unused_async)]
pub async fn status(&mut self) -> Result<Status> {
match &mut self.runtime {
#[cfg(feature = "vm-sandbox")]
Runtime::Hypervisor(hypervisor) => hypervisor.status().await,
#[cfg(feature = "containers")]
Runtime::Container(container) => container.status().await,
#[cfg(feature = "remote-providers")]
Runtime::Remote(remote) => remote.status().await,
Runtime::Native(NativeProcess::Started(instance)) => Ok(instance.status()),
Runtime::Native(NativeProcess::NotStarted(_)) => Ok(Status::NotStarted),
}
}
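
    /// Starts the service. On success, returns a future that resolves once the service
    /// connects to the bridge (or errors after a timeout). Returns `Ok(None)` if the
    /// service was already started or, for remote services, needs no bridge handshake.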
#[allow(clippy::unused_async)]
pub async fn start(&mut self) -> Result<Option<impl Future<Output = Result<()>> + use<>>> {
let Some(alive_rx) = self.alive_rx.take() else {
error!("Service already started!");
return Ok(None);
};
match &mut self.runtime {
#[cfg(feature = "vm-sandbox")]
Runtime::Hypervisor(hypervisor) => {
hypervisor.start().await.map_err(|e| {
error!("Failed to start hypervisor: {e}");
e
})?;
}
#[cfg(feature = "containers")]
Runtime::Container(container) => {
container.start().await.map_err(|e| {
error!("Failed to start container: {e}");
e
})?;
}
#[cfg(feature = "remote-providers")]
Runtime::Remote(remote) => {
remote.start().await.map_err(|e| {
error!("Failed to start remote service: {e}");
e
})?;
return Ok(None);
}
Runtime::Native(instance) => match instance {
NativeProcess::NotStarted(info) => {
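                    // First start: spawn the binary with its encoded args and env,
                    // then hand the child to a background watcher task.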
let args = info.arguments.encode(true);
let env_vars = info.env_vars.encode();
info!(
"Spawning native process: {} with args: {:?}",
info.binary_path.display(),
args
);
let process_handle = tokio::process::Command::new(&info.binary_path)
.kill_on_drop(true)
.stdin(std::process::Stdio::null())
.current_dir(&std::env::current_dir()?)
.envs(env_vars)
.args(args)
.spawn()?;
let handle =
generate_running_process_status_handle(process_handle, &info.service_name);
*instance = NativeProcess::Started(handle);
}
NativeProcess::Started(_) => {
error!("Service already started!");
return Ok(None);
}
},
}
        // Give the service up to 8 minutes to perform its bridge handshake. A closed
        // channel means the bridge went away before the service ever connected.
        Ok(Some(async move {
            match time::timeout(Duration::from_secs(480), alive_rx).await {
                Ok(Ok(())) => Ok(()),
                Ok(Err(_)) => {
                    error!("Bridge closed before the service connected");
                    Err(Error::Other("Bridge connection closed".into()))
                }
                Err(_) => {
                    error!("Service never connected to bridge (network error?)");
                    Err(Error::Other("Bridge connection timeout".into()))
                }
            }
        }))
}
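
    /// Shuts down the underlying runtime, then the bridge.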
#[allow(clippy::unused_async)]
pub async fn shutdown(self) -> Result<()> {
match self.runtime {
#[cfg(feature = "vm-sandbox")]
Runtime::Hypervisor(hypervisor) => {
hypervisor.shutdown().await.map_err(|e| {
error!("Failed to shut down hypervisor: {e}");
e
})?;
}
#[cfg(feature = "containers")]
Runtime::Container(container) => {
container.shutdown().await.map_err(|e| {
error!("Failed to shut down container instance: {e}");
e
})?;
}
#[cfg(feature = "remote-providers")]
Runtime::Remote(mut remote) => {
remote.shutdown().await.map_err(|e| {
error!("Failed to shut down remote service: {e}");
e
})?;
}
Runtime::Native(NativeProcess::Started(instance)) => {
if !instance.abort() {
error!("Failed to abort service");
}
}
_ => warn!("No process attached"),
}
self.bridge.shutdown();
Ok(())
}
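
    /// Returns the backing [`HypervisorInstance`], if this service runs in a VM sandbox.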
#[must_use]
#[cfg(feature = "vm-sandbox")]
pub fn hypervisor(&self) -> Option<&HypervisorInstance> {
match &self.runtime {
Runtime::Hypervisor(hypervisor) => Some(hypervisor),
_ => None,
}
}
}
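
/// Watches the spawned child on a background task, forwarding [`Status`] updates over a
/// channel and cutting the wait short when the returned handle's abort signal fires.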
#[must_use]
fn generate_running_process_status_handle(
process: tokio::process::Child,
service_name: &str,
) -> ProcessHandle {
let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>();
let (status_tx, status_rx) = tokio::sync::mpsc::unbounded_channel::<Status>();
let service_name = service_name.to_string();
let service_name_clone = service_name.clone();
    let task = async move {
        info!("Starting process execution for {service_name}");
        let _ = status_tx.send(Status::Running);
        let output = process.wait_with_output().await;
        // A wait failure or a non-zero exit code both count as an error; only a
        // clean exit is reported as `Finished`.
        match &output {
            Ok(output) if output.status.success() => {
                let _ = status_tx.send(Status::Finished);
            }
            _ => {
                let _ = status_tx.send(Status::Error);
            }
        }
        warn!("Process for {service_name} exited: {output:?}");
    };
let task = async move {
tokio::select! {
_ = abort_rx => {
info!("Abort signal received for {service_name_clone}");
},
() = task => {},
}
};
tokio::spawn(task);
ProcessHandle::new(status_rx, abort_tx)
}