use crate::config::{AuthProxyOpts, BlueprintManagerContext};
use crate::error::Error;
use crate::error::Result;
#[cfg(feature = "vm-sandbox")]
use crate::rt::hypervisor::net;
use crate::sdk::entry::SendFuture;
use blueprint_auth::db::RocksDb;
use blueprint_core::{error, info};
use blueprint_keystore::{Keystore, KeystoreConfig};
use blueprint_runner::config::BlueprintEnvironment;
use color_eyre::Report;
use color_eyre::eyre::OptionExt;
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::JoinHandle;
#[cfg(feature = "remote-providers")]
pub(crate) mod remote_provider_integration;
#[cfg(feature = "remote-providers")]
pub mod capabilities;
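
/// Handle to a spawned Blueprint Manager.
///
/// The handle drives the manager's lifecycle: [`start`](Self::start) releases
/// the manager from its initial wait, [`shutdown`](Self::shutdown) requests a
/// graceful stop, and awaiting the handle resolves once the manager task
/// exits. Dropping the handle sends the start signal if it was never sent, so
/// a detached manager is not left waiting forever.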
pub struct BlueprintManagerHandle {
shutdown_call: Option<tokio::sync::oneshot::Sender<()>>,
start_tx: Option<tokio::sync::oneshot::Sender<()>>,
running_task: JoinHandle<color_eyre::Result<()>>,
span: tracing::Span,
keystore_uri: String,
}

impl BlueprintManagerHandle {
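/// Send the start signal to the Blueprint Manager.
///
/// # Errors
///
/// Returns an error if the manager was already started, or if the start
/// signal could not be delivered.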
pub fn start(&mut self) -> color_eyre::Result<()> {
let _span = self.span.enter();
match self.start_tx.take() {
Some(tx) => match tx.send(()) {
Ok(()) => {
info!("Start signal sent to Blueprint Manager");
Ok(())
}
Err(()) => Err(Report::msg(
"Failed to send start signal to Blueprint Manager",
)),
},
None => Err(Report::msg("Blueprint Manager Already Started")),
}
}
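
/// Send the shutdown signal to the Blueprint Manager.
///
/// # Errors
///
/// Returns an error if shutdown was already requested, or if the shutdown
/// signal could not be delivered.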
pub fn shutdown(&mut self) -> color_eyre::Result<()> {
self.shutdown_call
.take()
.map(|tx| tx.send(()))
.ok_or_eyre("Shutdown already called")?
.map_err(|()| Report::msg("Failed to send shutdown signal to Blueprint Manager"))
}
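
/// Returns the keystore URI that the manager was configured with.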
#[must_use]
pub fn keystore_uri(&self) -> &str {
&self.keystore_uri
}
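
/// Returns the [`tracing::Span`] associated with this manager instance.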
#[must_use]
pub fn span(&self) -> &tracing::Span {
&self.span
}
}
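
// If `start` was never called, send the start signal on drop so the spawned
// manager task is not left waiting on a signal that will never arrive.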
impl Drop for BlueprintManagerHandle {
fn drop(&mut self) {
let _ = self.start();
}
}
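
// Awaiting the handle starts the manager (if it has not been started yet) and
// then resolves with the manager task's result, converting a `JoinError` into
// an error report.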
impl Future for BlueprintManagerHandle {
type Output = color_eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.start_tx.is_some() {
if let Err(err) = this.start() {
return Poll::Ready(Err(err));
}
}
let result = futures::ready!(Pin::new(&mut this.running_task).poll(cx));
match result {
Ok(res) => Poll::Ready(res),
Err(err) => Poll::Ready(Err(Report::msg(format!(
"Blueprint Manager Closed Unexpectedly (JoinError): {err:?}"
)))),
}
}
}
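
/// Run the Blueprint Manager with an explicitly constructed [`Keystore`].
///
/// This starts the auth proxy, initializes a protocol manager for the
/// environment's protocol, and spawns a combined task that runs until either
/// component exits or a shutdown signal arrives. The returned
/// [`BlueprintManagerHandle`] keeps the manager idle until
/// [`BlueprintManagerHandle::start`] is called (or the handle is dropped or
/// awaited). Note that the keystore argument is currently unused by this
/// function.
///
/// A minimal sketch, assuming `ctx`, `keystore`, and `env` are already
/// configured for your deployment (`std::future::pending` stands in for a
/// real shutdown future that resolves when the application wants to stop):
///
/// ```rust,ignore
/// let mut handle = run_blueprint_manager_with_keystore(
///     ctx,
///     keystore,
///     env,
///     std::future::pending::<()>(),
/// )
/// .await?;
/// handle.start()?;
/// handle.await?;
/// ```
///
/// # Errors
///
/// Returns an error if the auth proxy fails to initialize.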
#[allow(clippy::used_underscore_binding)]
pub async fn run_blueprint_manager_with_keystore<F: SendFuture<'static, ()>>(
#[allow(unused_mut)] mut ctx: BlueprintManagerContext,
_keystore: Keystore,
env: BlueprintEnvironment,
shutdown_cmd: F,
) -> color_eyre::Result<BlueprintManagerHandle> {
let logger_id = ctx.instance_id.as_deref().unwrap_or("Local");
let span = tracing::info_span!("Blueprint-Manager", id = logger_id);
// Avoid holding the span guard across the `.await` points below; enter the
// span only for synchronous logging.
span.in_scope(|| info!("Starting blueprint manager; waiting for start signal"));
let (db, auth_proxy_task) =
run_auth_proxy(ctx.data_dir().to_path_buf(), ctx.auth_proxy_opts.clone()).await?;
ctx.set_db(db).await;
let mut active_blueprints = HashMap::new();
let keystore_uri = env.keystore_uri.clone();
#[cfg(feature = "vm-sandbox")]
let network_interface = ctx.vm.network_interface.clone();
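
// The manager task initializes the protocol manager and drives it to
// completion; falling through means the underlying client connection died.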
let manager_task = async move {
let protocol_type: crate::protocol::ProtocolType = (&env.protocol_settings).into();
info!(
"Initializing blueprint manager for protocol: {:?}",
protocol_type
);
let mut protocol_manager =
crate::protocol::ProtocolManager::new(protocol_type, env.clone(), &ctx).await?;
protocol_manager
.run(&env, &ctx, &mut active_blueprints)
.await?;
Err::<(), _>(Error::ClientDied)
};
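
// Shutdown can arrive either from the caller-supplied future or from the
// handle's `shutdown` method via `tx_stop`.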
let (tx_stop, rx_stop) = tokio::sync::oneshot::channel::<()>();
let shutdown_task = async move {
tokio::select! {
_res0 = shutdown_cmd => {
info!("Shutdown-1 command received, closing application");
},
_res1 = rx_stop => {
info!("Manual shutdown signal received, closing application");
}
}
};
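
// The combined task waits for the start signal, then races the manager, the
// auth proxy, and the shutdown task; whichever finishes first decides the
// result.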
let (start_tx, start_rx) = tokio::sync::oneshot::channel::<()>();
let combined_task = async move {
start_rx
.await
.map_err(|_err| Report::msg("Failed to receive start signal"))?;
tokio::select! {
res0 = manager_task => {
Err(Report::msg(format!("Blueprint Manager Closed Unexpectedly: {res0:?}")))
},
res1 = auth_proxy_task => {
Err(Report::msg(format!("Auth Proxy Closed Unexpectedly: {res1:?}")))
},
() = shutdown_task => {
#[cfg(feature = "vm-sandbox")]
if let Err(e) = net::nftables::cleanup_firewall(&network_interface) {
error!("Failed to cleanup nftables rules: {e}");
}
Ok(())
}
}
};
let handle = tokio::spawn(combined_task);
let handle = BlueprintManagerHandle {
start_tx: Some(start_tx),
shutdown_call: Some(tx_stop),
running_task: handle,
span,
keystore_uri,
};
Ok(handle)
}
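
/// Run the Blueprint Manager with a keystore loaded from `env.keystore_uri`.
///
/// Convenience wrapper around [`run_blueprint_manager_with_keystore`].
///
/// # Errors
///
/// Returns an error if the keystore cannot be opened or the manager fails to
/// start.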
pub async fn run_blueprint_manager<F: SendFuture<'static, ()>>(
ctx: BlueprintManagerContext,
env: BlueprintEnvironment,
shutdown_cmd: F,
) -> color_eyre::Result<BlueprintManagerHandle> {
run_blueprint_manager_with_keystore(
ctx,
Keystore::new(KeystoreConfig::new().fs_root(&env.keystore_uri))?,
env,
shutdown_cmd,
)
.await
}
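
/// Initialize the authenticated proxy database and return it together with a
/// future that serves the proxy over HTTP.
///
/// The database lives under `<data_dir>/private/auth-proxy/db`. The returned
/// future must be polled (spawned or selected on) for the proxy to accept
/// connections; it resolves with an error if the listener cannot bind.
///
/// A minimal sketch, assuming `data_dir` and `opts` come from your
/// configuration:
///
/// ```rust,ignore
/// let (db, proxy_task) = run_auth_proxy(data_dir, opts).await?;
/// // Drive the proxy alongside the rest of the application.
/// tokio::spawn(proxy_task);
/// ```
///
/// # Errors
///
/// Returns an error if the database directory cannot be created or the proxy
/// fails to initialize.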
pub async fn run_auth_proxy(
data_dir: PathBuf,
auth_proxy_opts: AuthProxyOpts,
) -> Result<(RocksDb, impl Future<Output = Result<()>>)> {
let db_path = data_dir.join("private").join("auth-proxy").join("db");
tokio::fs::create_dir_all(&db_path).await?;
let proxy = blueprint_auth::proxy::AuthenticatedProxy::new(&db_path)?;
let db = proxy.db();
let router = proxy.router();
Ok((db, async move {
let listener = tokio::net::TcpListener::bind((
auth_proxy_opts.auth_proxy_host,
auth_proxy_opts.auth_proxy_port,
))
.await?;
info!(
"Auth proxy listening on {}:{}",
auth_proxy_opts.auth_proxy_host, auth_proxy_opts.auth_proxy_port
);
let result = axum::serve(listener, router).await;
if let Err(err) = result {
error!("Auth proxy error: {err}");
}
Ok(())
}))
}