use blueprint_core::{JobCall, JobResult};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum FaasError {
#[error("Function invocation failed: {0}")]
InvocationFailed(String),
#[error("Function execution timed out after {0:?}")]
Timeout(std::time::Duration),
#[error("Function error: {0}")]
FunctionError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Infrastructure error: {0}")]
InfrastructureError(String),
#[error("Cold start latency exceeded threshold: {0:?}")]
ColdStartLatency(std::time::Duration),
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
}
#[derive(Debug, Clone)]
pub struct FaasDeployment {
pub function_id: String,
pub job_id: u32,
pub endpoint: String,
pub cold_start_ms: Option<u64>,
pub memory_mb: u32,
pub timeout_secs: u32,
}
#[derive(Debug, Clone)]
pub struct FaasMetrics {
pub total_duration_ms: u64,
pub execution_duration_ms: u64,
pub cold_start: bool,
pub memory_used_mb: Option<u32>,
pub billed_duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaasPayload {
pub job_id: u32,
#[serde(with = "serde_bytes")]
pub args: Vec<u8>,
}
impl From<JobCall> for FaasPayload {
fn from(job_call: JobCall) -> Self {
Self {
job_id: job_call.job_id().into(),
args: job_call.body().to_vec(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaasResponse {
#[serde(with = "serde_bytes")]
pub result: Vec<u8>,
}
impl From<FaasResponse> for JobResult {
fn from(response: FaasResponse) -> Self {
JobResult::new(Bytes::from(response.result))
}
}
impl From<JobResult> for FaasResponse {
fn from(job_result: JobResult) -> Self {
match job_result.into_parts() {
Ok((_parts, body)) => Self {
result: body.to_vec(),
},
Err(_) => Self { result: Vec::new() },
}
}
}
#[async_trait::async_trait]
pub trait FaasExecutor: Send + Sync + fmt::Debug {
async fn invoke(&self, job_call: JobCall) -> Result<JobResult, FaasError>;
async fn invoke_with_metrics(
&self,
job_call: JobCall,
) -> Result<(JobResult, FaasMetrics), FaasError> {
let start = std::time::Instant::now();
let result = self.invoke(job_call).await?;
let duration = start.elapsed();
let metrics = FaasMetrics {
total_duration_ms: duration.as_millis() as u64,
execution_duration_ms: duration.as_millis() as u64,
cold_start: false,
memory_used_mb: None,
billed_duration_ms: duration.as_millis() as u64,
};
Ok((result, metrics))
}
async fn deploy_job(
&self,
job_id: u32,
binary: &[u8],
config: &FaasConfig,
) -> Result<FaasDeployment, FaasError>;
async fn health_check(&self, job_id: u32) -> Result<bool, FaasError>;
async fn warm(&self, job_id: u32) -> Result<(), FaasError> {
let _ = job_id;
Ok(())
}
async fn get_deployment(&self, job_id: u32) -> Result<FaasDeployment, FaasError>;
async fn undeploy_job(&self, job_id: u32) -> Result<(), FaasError>;
fn provider_name(&self) -> &'static str;
}
#[derive(Debug, Clone)]
pub struct FaasConfig {
pub memory_mb: u32,
pub timeout_secs: u32,
pub env_vars: std::collections::HashMap<String, String>,
pub max_concurrency: Option<u32>,
pub keep_warm: bool,
pub provider_config: Option<serde_json::Value>,
}
impl Default for FaasConfig {
fn default() -> Self {
Self {
memory_mb: 512,
timeout_secs: 300,
env_vars: std::collections::HashMap::new(),
max_concurrency: None,
keep_warm: false,
provider_config: None,
}
}
}
pub type DynFaasExecutor = Arc<dyn FaasExecutor>;
#[async_trait::async_trait]
impl<T: FaasExecutor + ?Sized> FaasExecutor for Arc<T> {
async fn invoke(&self, job_call: JobCall) -> Result<JobResult, FaasError> {
(**self).invoke(job_call).await
}
async fn invoke_with_metrics(
&self,
job_call: JobCall,
) -> Result<(JobResult, FaasMetrics), FaasError> {
(**self).invoke_with_metrics(job_call).await
}
async fn deploy_job(
&self,
job_id: u32,
binary: &[u8],
config: &FaasConfig,
) -> Result<FaasDeployment, FaasError> {
(**self).deploy_job(job_id, binary, config).await
}
async fn health_check(&self, job_id: u32) -> Result<bool, FaasError> {
(**self).health_check(job_id).await
}
async fn warm(&self, job_id: u32) -> Result<(), FaasError> {
(**self).warm(job_id).await
}
async fn get_deployment(&self, job_id: u32) -> Result<FaasDeployment, FaasError> {
(**self).get_deployment(job_id).await
}
async fn undeploy_job(&self, job_id: u32) -> Result<(), FaasError> {
(**self).undeploy_job(job_id).await
}
fn provider_name(&self) -> &'static str {
(**self).provider_name()
}
}
#[derive(Default)]
pub struct FaasRegistry {
executors: std::collections::HashMap<u32, DynFaasExecutor>,
}
impl FaasRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn register(&mut self, job_id: u32, executor: DynFaasExecutor) {
self.executors.insert(job_id, executor);
}
pub fn get(&self, job_id: u32) -> Option<&DynFaasExecutor> {
self.executors.get(&job_id)
}
pub fn is_faas_job(&self, job_id: u32) -> bool {
self.executors.contains_key(&job_id)
}
pub fn job_ids(&self) -> impl Iterator<Item = u32> + '_ {
self.executors.keys().copied()
}
}
impl fmt::Debug for FaasRegistry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FaasRegistry")
.field("job_count", &self.executors.len())
.field("job_ids", &self.executors.keys().collect::<Vec<_>>())
.finish()
}
}