#![doc = document_features::document_features!()]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
extern crate alloc;
pub mod config;
pub mod error;
pub mod faas;
pub mod metrics_server;
#[cfg(feature = "eigenlayer")]
pub mod eigenlayer;
#[cfg(feature = "symbiotic")]
mod symbiotic;
#[cfg(feature = "tangle")]
pub mod tangle;
use crate::error::RunnerError;
use crate::error::{JobCallError, ProducerError};
use blueprint_core::error::BoxError;
use blueprint_core::metadata::{MetadataMap, MetadataValue};
use blueprint_core::{JobCall, JobResult};
use blueprint_qos::heartbeat::HeartbeatConsumer;
use blueprint_router::Router;
use config::BlueprintEnvironment;
use core::convert::TryFrom;
use core::future::{self, poll_fn};
use core::pin::Pin;
use error::RunnerError as Error;
use futures::{Future, Sink};
use futures_core::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::{SinkExt, StreamExt, TryStreamExt, stream};
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::{Mutex, oneshot};
use tokio::time::sleep;
use tower::Service;
#[dynosaur::dynosaur(DynBlueprintConfig)]
pub trait BlueprintConfig: Send + Sync {
fn register(
&self,
env: &BlueprintEnvironment,
) -> impl Future<Output = Result<(), Error>> + Send {
let _ = env;
async { Ok(()) }
}
fn requires_registration(
&self,
env: &BlueprintEnvironment,
) -> impl Future<Output = Result<bool, Error>> + Send {
let _ = env;
async { Ok(true) }
}
fn update_registration_inputs(&self, _inputs: Vec<u8>) -> Result<(), Error> {
let _ = _inputs;
Ok(())
}
fn should_exit_after_registration(&self) -> bool {
true }
}
// SAFETY: `BlueprintConfig` declares `Send + Sync` as supertraits, so every type that can be
// erased behind `DynBlueprintConfig` already satisfies both bounds.
// NOTE(review): this assumes the dynosaur-generated wrapper stores nothing besides the erased
// implementor — confirm against the macro expansion.
unsafe impl Send for DynBlueprintConfig<'_> {}
unsafe impl Sync for DynBlueprintConfig<'_> {}
/// The unit type is a configuration that uses every default method of [`BlueprintConfig`].
impl BlueprintConfig for () {}
/// Looks up the service ID in the Tangle protocol settings of `env`.
///
/// # Errors
///
/// Returns [`crate::error::ConfigError::MissingServiceId`] when the Tangle settings cannot be
/// obtained from the environment or when they carry no service ID.
#[cfg(feature = "tls")]
fn resolve_service_id(env: &BlueprintEnvironment) -> Result<u64, crate::error::ConfigError> {
    env.protocol_settings
        .tangle()
        .ok()
        .and_then(|settings| settings.service_id)
        .ok_or(crate::error::ConfigError::MissingServiceId)
}
/// A service the runner starts before entering its main loop and then monitors.
#[dynosaur::dynosaur(DynBackgroundService)]
pub trait BackgroundService: Send + Sync {
/// Starts the service.
///
/// On success, yields a oneshot receiver that the runner awaits to learn that the service
/// has finished. An `Err` from the future itself makes the runner abort before its main loop.
fn start(
&self,
) -> impl Future<Output = Result<oneshot::Receiver<Result<(), Error>>, Error>> + Send;
}
// SAFETY: `BackgroundService` declares `Send + Sync` as supertraits, so every type that can be
// erased behind `DynBackgroundService` already satisfies both bounds.
// NOTE(review): this assumes the dynosaur-generated wrapper stores nothing besides the erased
// implementor — confirm against the macro expansion.
unsafe impl Send for DynBackgroundService<'_> {}
unsafe impl Sync for DynBackgroundService<'_> {}
/// A shared, mutex-guarded, type-erased stream of job calls whose errors are boxed.
type Producer =
Arc<Mutex<Box<dyn Stream<Item = Result<JobCall, BoxError>> + Send + Unpin + 'static>>>;
/// A shared, mutex-guarded, type-erased sink for job results whose errors are boxed.
type Consumer = Arc<Mutex<Box<dyn Sink<JobResult, Error = BoxError> + Send + Unpin + 'static>>>;
/// Builder that collects everything a runner needs before it is started with `run`.
///
/// `F` is the type of the shutdown handler future.
pub struct BlueprintRunnerBuilder<F> {
// Type-erased registration hooks.
config: Box<DynBlueprintConfig<'static>>,
// Environment the runner executes in.
env: BlueprintEnvironment,
// Sources of job calls; `run` rejects an empty list.
producers: Vec<Producer>,
// Sinks that receive every job result.
consumers: Vec<Consumer>,
// Router for job calls; `run` rejects a missing router.
router: Option<Router>,
// Services started before the main loop.
background_services: Vec<Box<DynBackgroundService<'static>>>,
// Future awaited once the runner signals shutdown.
shutdown_handler: F,
// Executors for jobs that are delegated instead of routed locally.
faas_registry: faas::FaasRegistry,
}
impl<F> BlueprintRunnerBuilder<F>
where
F: Future<Output = ()> + Send + 'static,
{
#[must_use]
pub fn router(mut self, router: Router) -> Self {
self.router = Some(router);
self
}
/// Adds a producer of job calls.
///
/// The stream's error type is converted into [`BoxError`] and the stream is type-erased
/// before being stored.
#[must_use]
pub fn producer<E>(
    mut self,
    producer: impl Stream<Item = Result<JobCall, E>> + Send + Unpin + 'static,
) -> Self
where
    E: Into<BoxError> + 'static,
{
    let erased: Box<dyn Stream<Item = Result<JobCall, BoxError>> + Send + Unpin + 'static> =
        Box::new(producer.map_err(Into::into));
    self.producers.push(Arc::new(Mutex::new(erased)));
    self
}
#[must_use]
pub fn consumer<E>(
mut self,
consumer: impl Sink<JobResult, Error = E> + Send + Unpin + 'static,
) -> Self
where
E: Into<BoxError> + 'static,
{
let consumer: Consumer = Arc::new(Mutex::new(Box::new(consumer.sink_map_err(Into::into))));
self.consumers.push(consumer);
self
}
/// Registers a heartbeat service as a background service of the runner.
///
/// When the runner starts its background services, the heartbeat is started; a start failure
/// is returned as [`RunnerError::Other`] and aborts the runner before its main loop.
#[must_use]
pub fn heartbeat_service<C: HeartbeatConsumer + Send + Sync + 'static>(
    mut self,
    service: blueprint_qos::heartbeat::HeartbeatService<C>,
) -> Self {
    /// Adapts a heartbeat service to the [`BackgroundService`] interface.
    #[derive(Clone)]
    struct HeartbeatServiceAdapter<C: HeartbeatConsumer + Send + Sync + 'static> {
        service: blueprint_qos::heartbeat::HeartbeatService<C>,
    }

    impl<C: HeartbeatConsumer + Send + Sync + 'static> BackgroundService
        for HeartbeatServiceAdapter<C>
    {
        async fn start(&self) -> Result<oneshot::Receiver<Result<(), Error>>, Error> {
            self.service
                .start_heartbeat()
                .await
                .map_err(|e| RunnerError::Other(e.into()))?;

            // Report successful startup through the channel, as the other adapters in this
            // file do. Dropping the sender without sending would make the receiver resolve to
            // a `RecvError`, which the runner maps to `Error::BackgroundService` and treats as
            // a fatal background-service failure.
            let (tx, rx) = oneshot::channel();
            // `rx` is still alive here, so the send cannot fail; the result is ignored anyway.
            let _ = tx.send(Ok(()));
            Ok(rx)
        }
    }

    let adapter = HeartbeatServiceAdapter { service };
    self.background_services
        .push(DynBackgroundService::boxed(adapter));
    self
}
/// Registers a Prometheus metrics server as a background service of the runner.
#[must_use]
pub fn metrics_server(
    mut self,
    server: Arc<blueprint_qos::servers::prometheus::PrometheusServer>,
) -> Self {
    let boxed =
        DynBackgroundService::boxed(self::metrics_server::MetricsServerAdapter::new(server));
    self.background_services.push(boxed);
    self
}
#[must_use]
pub fn qos_service<C: HeartbeatConsumer + Send + Sync + 'static>(
mut self,
config: blueprint_qos::QoSConfig,
heartbeat_consumer: Option<Arc<C>>,
) -> Self {
struct QoSServiceAdapter<C: HeartbeatConsumer + Send + Sync + 'static> {
qos_service: Arc<Mutex<Option<blueprint_qos::QoSService<C>>>>,
ready: Arc<tokio::sync::Notify>,
}
impl<C: HeartbeatConsumer + Send + Sync + 'static> BackgroundService for QoSServiceAdapter<C> {
async fn start(
&self,
) -> Result<
tokio::sync::oneshot::Receiver<Result<(), crate::error::RunnerError>>,
crate::error::RunnerError,
> {
let (tx, rx) = tokio::sync::oneshot::channel();
let qos_arc_for_adapter_task = self.qos_service.clone();
let ready = self.ready.clone();
tokio::spawn(async move {
blueprint_core::debug!(target: "blueprint-runner", "QoS Adapter Task (Task 2): Waiting for QoS Service build...");
let timeout = std::time::Duration::from_secs(30);
if tokio::time::timeout(timeout, ready.notified())
.await
.is_err()
{
let err_msg = "QoS Adapter Task (Task 2): QoS service did not initialize (build task may have failed or timed out).";
blueprint_core::error!(target: "blueprint-runner", "{}", err_msg);
let _ = tx.send(Err(crate::error::RunnerError::Other(Box::new(
std::io::Error::other(err_msg),
))));
return;
}
let mut service_guard = qos_arc_for_adapter_task.lock().await;
if service_guard.is_none() {
let err_msg = "QoS Adapter Task (Task 2): Notified but QoS service is None (build failed).";
blueprint_core::error!(target: "blueprint-runner", "{}", err_msg);
let _ = tx.send(Err(crate::error::RunnerError::Other(Box::new(
std::io::Error::other(err_msg),
))));
return;
}
blueprint_core::debug!(target: "blueprint-runner", "QoS Adapter Task (Task 2): QoS Service is built. Proceeding to start components.");
let qos_service_mut = service_guard
.as_mut()
.expect("QoS service was Some but now None, this should not happen");
if let Some(hb_service) = qos_service_mut.heartbeat_service() {
blueprint_core::debug!(target: "blueprint-runner", "QoS Adapter Task (Task 2): Starting heartbeat service...");
if let Err(e) = hb_service.start_heartbeat().await {
let err_msg = format!(
"QoS Adapter Task (Task 2): Heartbeat service failed to start: {:?}",
e
);
blueprint_core::error!(target: "blueprint-runner", "{}", err_msg);
let _ = tx.send(Err(crate::error::RunnerError::Other(Box::new(
std::io::Error::other(err_msg),
))));
return;
}
blueprint_core::info!(target: "blueprint-runner", "QoS Adapter Task (Task 2): Heartbeat service started.");
} else {
blueprint_core::debug!(target: "blueprint-runner", "QoS Adapter Task (Task 2): No heartbeat service configured in QoSService.");
}
blueprint_core::info!(target: "blueprint-runner", "QoS Adapter Task (Task 2): QoS Service components initialized successfully.");
let _ = tx.send(Ok(()));
});
Ok(rx)
}
}
let qos_service_arc = Arc::new(Mutex::new(None::<blueprint_qos::QoSService<C>>));
let qos_ready = Arc::new(tokio::sync::Notify::new());
let builder_task_qos_arc = qos_service_arc.clone();
let builder_task_ready = qos_ready.clone();
let http_rpc_endpoint = self.env.http_rpc_endpoint.to_string();
let keystore_uri = self.env.keystore_uri.clone();
#[cfg(feature = "tangle")]
let status_registry_address = self
.env
.protocol_settings
.tangle()
.map(|settings| settings.status_registry_contract)
.ok();
#[cfg(not(feature = "tangle"))]
let status_registry_address = None;
tokio::spawn(async move {
blueprint_core::debug!(target: "blueprint-runner", "QoS Builder Task (Task 1): Initializing QoS Service...");
let mut builder = blueprint_qos::QoSServiceBuilder::<C>::new()
.with_config(config)
.manage_servers(true)
.with_dry_run(self.env.dry_run);
builder = builder.with_http_rpc_endpoint(http_rpc_endpoint);
builder = builder.with_keystore_uri(keystore_uri);
if let Some(address) = status_registry_address {
builder = builder.with_status_registry_address(address);
}
if let Some(consumer) = heartbeat_consumer {
builder = builder.with_heartbeat_consumer(consumer);
}
match builder.build().await {
Ok(service) => {
let mut guard = builder_task_qos_arc.lock().await;
*guard = Some(service);
drop(guard);
builder_task_ready.notify_one();
blueprint_core::info!(target: "blueprint-runner", "QoS Builder Task (Task 1): QoS Service built and stored.");
}
Err(e) => {
blueprint_core::error!(target: "blueprint-runner", "QoS Builder Task (Task 1): Failed to build QoS service: {:?}", e);
builder_task_ready.notify_one();
}
}
});
let adapter = QoSServiceAdapter::<C> {
qos_service: qos_service_arc,
ready: qos_ready,
};
self.background_services
.push(DynBackgroundService::boxed(adapter));
self
}
/// Enables TEE support when `config` reports that it is enabled.
///
/// In that case a TEE auth service built from `config.key_exchange` is registered as a
/// background service; otherwise the builder is returned unchanged.
#[cfg(feature = "tee")]
#[must_use]
pub fn tee(mut self, config: blueprint_tee::TeeConfig) -> Self {
    /// Adapts the TEE auth service to the [`BackgroundService`] interface.
    struct TeeAuthServiceAdapter {
        auth_service: blueprint_tee::TeeAuthService,
    }

    impl BackgroundService for TeeAuthServiceAdapter {
        async fn start(&self) -> Result<oneshot::Receiver<Result<(), Error>>, Error> {
            let (tx, rx) = oneshot::channel();
            self.auth_service.start_cleanup_loop();
            // Report successful startup from a separate task.
            tokio::spawn(async move {
                tracing::info!("TEE auth service started");
                let _ = tx.send(Ok(()));
            });
            Ok(rx)
        }
    }

    if !config.is_enabled() {
        return self;
    }

    let adapter = TeeAuthServiceAdapter {
        auth_service: blueprint_tee::TeeAuthService::new(config.key_exchange.clone()),
    };
    self.background_services
        .push(DynBackgroundService::boxed(adapter));
    tracing::info!(
        mode = ?config.mode,
        requirement = ?config.requirement,
        "TEE support enabled"
    );
    self
}
/// Registers an arbitrary [`BackgroundService`] to be started by the runner.
#[must_use]
pub fn background_service(mut self, service: impl BackgroundService + 'static) -> Self {
    let boxed = DynBackgroundService::boxed(service);
    self.background_services.push(boxed);
    self
}
/// Registers `executor` in the FaaS registry under `job_id`.
///
/// The runner delegates job calls to the registered executor when the registry reports the
/// job ID as a FaaS job, instead of passing them to the router.
#[must_use]
pub fn with_faas_executor(
    mut self,
    job_id: u32,
    executor: impl faas::FaasExecutor + 'static,
) -> Self {
    let shared_executor = Arc::new(executor);
    self.faas_registry.register(job_id, shared_executor);
    self
}
pub fn with_shutdown_handler<F2>(self, handler: F2) -> BlueprintRunnerBuilder<F2>
where
F2: Future<Output = ()> + Send + 'static,
{
BlueprintRunnerBuilder {
config: self.config,
env: self.env,
producers: self.producers,
consumers: self.consumers,
router: self.router,
background_services: self.background_services,
shutdown_handler: handler,
faas_registry: self.faas_registry,
}
}
pub async fn run(self) -> Result<(), Error> {
let Some(router) = self.router else {
return Err(Error::NoRouter);
};
if self.producers.is_empty() {
return Err(Error::NoProducers);
}
let runner = FinalizedBlueprintRunner {
config: self.config,
producers: self.producers,
consumers: self.consumers,
router,
env: self.env,
background_services: self.background_services,
shutdown_handler: self.shutdown_handler,
faas_registry: self.faas_registry,
};
runner.run().await
}
}
/// Entry point for configuring a runner; see [`BlueprintRunner::builder`].
pub struct BlueprintRunner {}
impl BlueprintRunner {
    /// Creates a builder from a registration config and an environment.
    ///
    /// The builder starts with no producers, consumers, router, or background services, an
    /// empty FaaS registry, and a shutdown handler that never completes.
    pub fn builder<Conf: BlueprintConfig + 'static>(
        config: Conf,
        env: BlueprintEnvironment,
    ) -> BlueprintRunnerBuilder<impl Future<Output = ()> + Send + 'static> {
        let config = DynBlueprintConfig::boxed(config);
        let shutdown_handler = future::pending::<()>();

        BlueprintRunnerBuilder {
            config,
            env,
            producers: Vec::new(),
            consumers: Vec::new(),
            router: None,
            background_services: Vec::new(),
            shutdown_handler,
            faas_registry: faas::FaasRegistry::new(),
        }
    }
}
/// A validated runner: the same data as the builder, except that the router is guaranteed
/// to be present.
struct FinalizedBlueprintRunner<F> {
// Type-erased registration hooks.
config: Box<DynBlueprintConfig<'static>>,
// Sources of job calls.
producers: Vec<Producer>,
// Sinks that receive every job result.
consumers: Vec<Consumer>,
// Router that job calls are dispatched to unless a FaaS executor handles them.
router: Router,
// Environment the runner executes in.
env: BlueprintEnvironment,
// Services started before the main loop.
background_services: Vec<Box<DynBackgroundService<'static>>>,
// Future awaited once the runner signals shutdown.
shutdown_handler: F,
// Executors for jobs that are delegated instead of routed locally.
faas_registry: faas::FaasRegistry,
}
impl<F> FinalizedBlueprintRunner<F>
where
F: Future<Output = ()> + Send + 'static,
{
/// Runs the blueprint: handles registration, starts the background services, then loops over
/// producers, in-flight jobs, and background services until an error occurs or the shutdown
/// channel closes.
///
/// # Errors
///
/// Returns an error when a registration hook fails, a background service fails to start or
/// its channel closes without a message, the bridge cannot be reached (outside test mode),
/// the TLS profile update fails (`tls` feature), a producer yields an error or ends, a job
/// task fails or does not finish, or a consumer rejects a result.
#[allow(trivial_casts)]
async fn run(self) -> Result<(), Error> {
let FinalizedBlueprintRunner {
config,
producers,
mut consumers,
mut router,
env,
background_services,
shutdown_handler,
faas_registry,
} = self;
// --- Registration phase ---
let needs_registration = config.requires_registration(&env).await?;
// A dry run never performs the actual registration call.
let skip_registration = env.dry_run;
if env.registration_mode() {
// Registration mode: first wait for the registration payload and hand it to the config.
let inputs = capture_registration_inputs(&env).await?;
if !inputs.is_empty() {
config.update_registration_inputs(inputs)?;
}
// Capture-only mode stops once the inputs have been recorded.
if env.registration_capture_only() {
return Ok(());
}
if needs_registration && !skip_registration {
config.register(&env).await?;
} else if needs_registration && skip_registration {
blueprint_core::info!(
target: "blueprint-runner",
"Dry run enabled; skipping operator registration"
);
}
// In registration mode this exit check runs whether or not `register` was called.
if config.should_exit_after_registration() {
return Ok(());
}
} else if needs_registration && !skip_registration {
config.register(&env).await?;
if config.should_exit_after_registration() {
return Ok(());
}
} else if needs_registration && skip_registration {
blueprint_core::info!(
target: "blueprint-runner",
"Dry run enabled; skipping operator registration"
);
}
// --- Background services ---
let mut router = router.as_service();
let has_background_services = !background_services.is_empty();
let mut background_futures = Vec::with_capacity(background_services.len());
for service_box in &background_services {
// Services are started one after another; the first start failure aborts the runner.
let receiver = service_box.start().await?;
background_futures.push(Box::pin(async move {
// NOTE(review): `.and(Ok(()))` replaces whatever value was received, so an `Err` that a
// service sends through its channel is discarded here; only a sender dropped without
// sending surfaces as `Error::BackgroundService`. Confirm this is intended.
receiver
.await
.map_err(|e| Error::BackgroundService(e.to_string()))
.and(Ok(()))
})
as Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>);
}
// --- Shutdown plumbing ---
// The spawned task ignores the result of awaiting the receiver, so the handler runs both
// when a value is sent and when `shutdown_tx` is dropped.
let (mut shutdown_tx, shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
let _ = shutdown_rx.await;
blueprint_core::info!(target: "blueprint-runner", "Received graceful shutdown signal. Calling shutdown handler");
shutdown_handler.await;
});
// A readiness error from the router is ignored here.
poll_fn(|ctx| router.poll_ready(ctx)).await.unwrap_or(());
// Turn every shared producer into a stream that takes the lock only while pulling one item,
// then merge all of them into a single stream.
let producers = producers.into_iter().map(|producer| {
futures::stream::unfold(producer, |producer| async move {
let result;
{
let mut guard = producer.lock().await;
result = guard.next().await;
}
result.map(|job_call| (job_call, producer))
})
.boxed()
});
let mut producer_stream = futures::stream::select_all(producers);
// With no background services a single already-completed future stands in for them.
// NOTE(review): presumably because `select_all` does not accept an empty list — confirm.
let mut background_services = if background_futures.is_empty() {
futures::future::select_all(vec![Box::pin(futures::future::ready(Ok(())))
as Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>])
} else {
futures::future::select_all(background_futures)
};
// Job tasks that have been spawned and not yet collected.
let mut pending_jobs = FuturesUnordered::new();
// --- Bridge ---
// Test mode skips the bridge; otherwise a failed connection or ping aborts the runner.
#[allow(unused_variables)]
let bridge = if env.test_mode {
blueprint_core::debug!(
target: "blueprint-runner",
"Test mode enabled; skipping bridge connection"
);
None
} else {
let bridge = env.bridge().await.map_err(|e| {
blueprint_core::error!(
"[FATAL] Unable to establish bridge connection, aborting runner: {e}"
);
e
})?;
bridge.ping().await.map_err(|e| {
blueprint_core::error!(
"[FATAL] Unable to establish bridge connection, aborting runner: {e}"
);
e
})?;
Some(bridge)
};
// --- TLS profile (only with the `tls` feature) ---
// A configured profile requires a resolvable service ID; the update itself is skipped with a
// warning when no bridge is available.
#[cfg(feature = "tls")]
if let Some(tls_profile) = &env.tls_profile {
blueprint_core::info!(
target: "blueprint-runner",
"Updating service TLS profile"
);
let service_id = resolve_service_id(&env).inspect_err(|err| {
blueprint_core::error!(
target: "blueprint-runner",
error = ?err,
"TLS profile provided but service ID is missing from configuration"
);
})?;
if let Some(bridge) = bridge.as_ref() {
bridge
.update_blueprint_service_tls_profile(service_id, Some(tls_profile.clone()))
.await
.map_err(|e| {
blueprint_core::error!(
target: "blueprint-runner",
service_id,
"[FATAL] Failed to update TLS profile for service {service_id}: {e}"
);
e
})?;
blueprint_core::info!(
target: "blueprint-runner",
service_id,
"TLS profile updated successfully"
);
} else {
blueprint_core::warn!(
target: "blueprint-runner",
"TLS profile provided but bridge unavailable; skipping update"
);
}
}
// --- Main loop ---
// Upper bound on in-flight job tasks; the producer branch is disabled while it is reached.
const MAX_CONCURRENT_JOBS: usize = 1024;
loop {
tokio::select! {
// A producer yielded an item (only polled while below the concurrency limit).
producer_result = producer_stream.next(), if pending_jobs.len() < MAX_CONCURRENT_JOBS => {
match producer_result {
Some(Ok(job_call)) => {
// Block number and service ID are read from metadata for logging only.
let block_number =
read_metadata_u64(job_call.metadata(), BLOCK_NUMBER_METADATA_KEYS);
let service_id =
read_metadata_u64(job_call.metadata(), SERVICE_ID_METADATA_KEYS);
blueprint_core::info!(
target: "blueprint-runner",
job_id = ?job_call.job_id(),
block_number = ?block_number,
service_id = ?service_id,
"Received job call from producer stream"
);
let job_id: u32 = job_call.job_id().into();
// Jobs with a registered FaaS executor bypass the router.
if faas_registry.is_faas_job(job_id) {
let executor = faas_registry.get(job_id)
.expect("FaaS executor exists for registered job ID")
.clone();
blueprint_core::info!(
target: "blueprint-runner",
job_id = %job_call.job_id(),
provider = executor.provider_name(),
"Delegating job to FaaS executor"
);
// The single FaaS result is wrapped so it has the same shape as a router result.
pending_jobs.push(tokio::task::spawn(async move {
match executor.invoke(job_call).await {
Ok(result) => Ok(Some(vec![result])),
Err(e) => {
blueprint_core::error!(
target: "blueprint-runner",
error = %e,
"FaaS invocation failed"
);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
}
}));
} else {
pending_jobs.push(tokio::task::spawn(router.call(job_call)));
}
},
Some(Err(e)) => {
blueprint_core::error!(target: "blueprint-runner", "Producer error: {:?}", e);
let _ = shutdown_tx.send(true);
return Err(ProducerError::Failed(e).into());
},
None => {
// Every producer ending is treated as an error, not as a clean stop.
blueprint_core::error!(target: "blueprint-runner", "Producer stream ended unexpectedly");
let _ = shutdown_tx.send(true);
return Err(ProducerError::StreamEnded.into());
}
}
},
// A spawned job task completed.
Some(job_result) = pending_jobs.next() => {
match job_result {
Ok(Ok(Some(results))) => {
blueprint_core::trace!(
target: "blueprint-runner",
count = %results.len(),
"Job call(s) processed by router"
);
// Every consumer receives its own clone of the full result stream.
let result_stream = stream::iter(results.into_iter().map(Ok));
let send_futures = consumers.iter_mut().map(|consumer| {
let mut stream_clone = result_stream.clone();
async move {
let mut guard = consumer.lock().await;
guard.send_all(&mut stream_clone).await
}
});
let result = futures::future::try_join_all(send_futures).await;
blueprint_core::trace!(
target: "blueprint-runner",
results = ?result.as_ref().map(|_| "success"),
"Job call results were broadcast to consumers"
);
if let Err(e) = result {
let _ = shutdown_tx.send(true);
return Err(Error::Consumer(e));
}
},
Ok(Ok(None)) => {
blueprint_core::debug!(target: "blueprint-runner", "Job call was ignored by router");
},
Ok(Err(e)) => {
// NOTE(review): unlike the other error exits, this branch does not send on `shutdown_tx`;
// the sender is dropped on return instead. Confirm the difference is intended.
blueprint_core::error!(target: "blueprint-runner", "Job call task failed: {:?}", e);
return Err(JobCallError::JobFailed(e).into());
},
Err(e) => {
// The spawned task itself did not run to completion.
blueprint_core::error!(target: "blueprint-runner", "Job call failed: {:?}", e);
let _ = shutdown_tx.send(true);
return Err(JobCallError::JobDidntFinish(e).into());
},
}
}
// One of the background-service futures resolved.
result = &mut background_services => {
let (result, _, remaining_background_services) = result;
match result {
Ok(()) => {
// Stay quiet when the only future was the stand-in for "no services".
if has_background_services {
blueprint_core::warn!(target: "blueprint-runner", "A background service has finished running");
}
},
Err(e) => {
blueprint_core::error!(target: "blueprint-runner", "A background service failed: {:?}", e);
let _ = shutdown_tx.send(true);
return Err(e);
}
}
if remaining_background_services.is_empty() {
if has_background_services {
blueprint_core::warn!(target: "blueprint-runner", "All background services have ended");
}
// `background_services` keeps the already-completed `select_all` future.
// NOTE(review): presumably it stays pending when polled again — confirm against the
// futures crate's `SelectAll` implementation.
continue;
}
// Keep watching the services that are still running.
background_services = futures::future::select_all(remaining_background_services);
}
// The receiving half of the shutdown channel is gone; leave the loop.
() = shutdown_tx.closed() => {
break;
}
}
}
Ok(())
}
}
/// Maximum accumulated polling delay before waiting for registration inputs gives up.
const REGISTRATION_INPUT_TIMEOUT: Duration = Duration::from_secs(300);

/// Polls the registration output file of `env` until it holds a non-empty payload.
///
/// The parent directory of the file is created first. The file is then read every 250 ms;
/// a missing or empty file means "not ready yet".
///
/// # Errors
///
/// Returns an error when the directory cannot be created, when reading fails for a reason
/// other than the file not existing, or with [`io::ErrorKind::TimedOut`] once the accumulated
/// delay reaches [`REGISTRATION_INPUT_TIMEOUT`].
async fn capture_registration_inputs(env: &BlueprintEnvironment) -> Result<Vec<u8>, Error> {
    let poll_interval = Duration::from_millis(250);

    let output_path = env.registration_output_path();
    if let Some(parent) = output_path.parent() {
        fs::create_dir_all(parent).await?;
    }
    blueprint_core::info!(
        target: "blueprint-runner",
        path = %output_path.display(),
        "Waiting for blueprint registration inputs"
    );

    let mut waited = Duration::ZERO;
    loop {
        match fs::read(&output_path).await {
            Ok(payload) => {
                if payload.is_empty() {
                    blueprint_core::debug!(
                        target: "blueprint-runner",
                        path = %output_path.display(),
                        "Registration file empty, waiting for payload"
                    );
                } else {
                    blueprint_core::info!(
                        target: "blueprint-runner",
                        path = %output_path.display(),
                        length = payload.len(),
                        "Captured blueprint registration payload"
                    );
                    return Ok(payload);
                }
            }
            Err(err) => {
                // A file that does not exist yet is expected; anything else is fatal.
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(err.into());
                }
            }
        }

        // Only the sleep intervals are counted, not the time spent reading.
        if waited >= REGISTRATION_INPUT_TIMEOUT {
            let timed_out = io::Error::new(
                io::ErrorKind::TimedOut,
                format!(
                    "Timed out waiting for registration inputs at {}",
                    output_path.display()
                ),
            );
            return Err(timed_out.into());
        }
        sleep(poll_interval).await;
        waited += poll_interval;
    }
}
/// Metadata keys checked, in order, for the service ID of a job call.
const SERVICE_ID_METADATA_KEYS: &[&str] = &["tangle.service_id", "X-TANGLE-SERVICE-ID"];
/// Metadata keys checked, in order, for the block number of a job call.
const BLOCK_NUMBER_METADATA_KEYS: &[&str] = &["tangle.block_number", "X-TANGLE-BLOCK-NUMBER"];

/// Returns the first value among `keys` that is present in `metadata` and converts to `u64`.
///
/// Keys are tried in order; a key whose value is missing or fails the conversion is skipped.
/// Returns `None` when no key yields a value.
fn read_metadata_u64(metadata: &MetadataMap<MetadataValue>, keys: &[&str]) -> Option<u64> {
    for key in keys {
        let Some(value) = metadata.get(key) else {
            continue;
        };
        if let Ok(parsed) = u64::try_from(value) {
            return Some(parsed);
        }
    }
    None
}