use super::*;
use blueprint_core::{JobCall, JobResult};
use reqwest::Client;
use std::collections::HashMap;
use std::time::Instant;
use tracing::{debug, info, warn};
/// [`FaasExecutor`] implementation that runs jobs by calling plain HTTP endpoints.
///
/// Each job is addressed either by an endpoint registered through
/// `with_job_endpoint` or, when none is registered, by the default URL
/// `{base_url}/job/{job_id}`.
#[derive(Debug, Clone)]
pub struct HttpFaasExecutor {
/// Prefix used to build the default URL of a job that has no registered endpoint.
base_url: String,
/// HTTP client used for both invocation (`POST`) and health-check (`HEAD`) requests.
client: Client,
/// Per-job endpoint overrides, keyed by job id; consulted before the default URL.
job_endpoints: HashMap<u32, String>,
}
impl HttpFaasExecutor {
    /// Creates an executor whose jobs default to `{base_url}/job/{job_id}`.
    ///
    /// A new [`Client`] is created and no per-job endpoint overrides are
    /// registered; add them with [`Self::with_job_endpoint`].
    #[must_use]
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
            job_endpoints: HashMap::new(),
        }
    }

    /// Registers `endpoint` as the complete URL to use for `job_id`.
    ///
    /// The override is used verbatim (it is not combined with the base URL) and
    /// replaces any endpoint previously registered for the same job id.
    /// Returns the updated executor so calls can be chained.
    #[must_use]
    pub fn with_job_endpoint(mut self, job_id: u32, endpoint: impl Into<String>) -> Self {
        self.job_endpoints.insert(job_id, endpoint.into());
        self
    }

    /// Resolves the URL that serves `job_id`.
    ///
    /// Returns the registered override when there is one; otherwise builds the
    /// default `{base_url}/job/{job_id}` URL.
    fn endpoint(&self, job_id: u32) -> String {
        match self.job_endpoints.get(&job_id) {
            Some(custom) => custom.clone(),
            None => {
                // Drop trailing slashes so a base URL such as `https://host/`
                // yields `https://host/job/1` rather than `https://host//job/1`.
                let base = self.base_url.trim_end_matches('/');
                format!("{base}/job/{job_id}")
            }
        }
    }
}
#[async_trait::async_trait]
impl FaasExecutor for HttpFaasExecutor {
async fn invoke(&self, job_call: JobCall) -> Result<JobResult, FaasError> {
let job_id: u32 = job_call.job_id().into();
let endpoint = self.endpoint(job_id);
debug!(
job_id = job_id,
endpoint = %endpoint,
"Invoking HTTP FaaS function"
);
let payload: super::FaasPayload = job_call.into();
let start = Instant::now();
let response = self
.client
.post(&endpoint)
.json(&payload)
.send()
.await
.map_err(|e| {
warn!(error = %e, "HTTP FaaS invocation failed");
FaasError::InvocationFailed(e.to_string())
})?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(FaasError::FunctionError(format!(
"HTTP {} - {}",
status, body
)));
}
let faas_response: super::FaasResponse = response
.json()
.await
.map_err(|e| FaasError::SerializationError(e.to_string()))?;
let duration = start.elapsed();
info!(
job_id = job_id,
duration_ms = duration.as_millis(),
"HTTP FaaS invocation successful"
);
Ok(faas_response.into())
}
async fn deploy_job(
&self,
_job_id: u32,
_binary: &[u8],
_config: &FaasConfig,
) -> Result<FaasDeployment, FaasError> {
Err(FaasError::InfrastructureError(
"Custom HTTP FaaS does not support automated deployment. \
Deploy your function manually and register its endpoint."
.into(),
))
}
async fn health_check(&self, job_id: u32) -> Result<bool, FaasError> {
let endpoint = self.endpoint(job_id);
debug!(endpoint = %endpoint, "Checking HTTP FaaS health");
self.client
.head(&endpoint)
.send()
.await
.map(|r| r.status().is_success())
.map_err(|e| FaasError::InfrastructureError(format!("Health check failed: {}", e)))
}
async fn get_deployment(&self, job_id: u32) -> Result<FaasDeployment, FaasError> {
Ok(FaasDeployment {
function_id: format!("http-job-{}", job_id),
job_id,
endpoint: self.endpoint(job_id),
cold_start_ms: None,
memory_mb: 0, timeout_secs: 0, })
}
async fn undeploy_job(&self, _job_id: u32) -> Result<(), FaasError> {
Ok(())
}
fn provider_name(&self) -> &'static str {
"Custom HTTP FaaS"
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Base URL shared by the endpoint-resolution tests.
    const BASE_URL: &str = "https://faas.example.com";

    /// Without overrides, every job resolves to `{base_url}/job/{job_id}`.
    #[test]
    fn test_endpoint_generation() {
        let executor = HttpFaasExecutor::new(BASE_URL);

        let cases = [
            (0, "https://faas.example.com/job/0"),
            (5, "https://faas.example.com/job/5"),
        ];
        for (job_id, expected) in cases {
            assert_eq!(executor.endpoint(job_id), expected);
        }
    }

    /// A registered endpoint is used for its own job only; other jobs keep the
    /// default URL.
    #[test]
    fn test_custom_endpoint() {
        let executor = HttpFaasExecutor::new(BASE_URL)
            .with_job_endpoint(0, "https://custom.example.com/square");

        let cases = [
            (0, "https://custom.example.com/square"),
            (1, "https://faas.example.com/job/1"),
        ];
        for (job_id, expected) in cases {
            assert_eq!(executor.endpoint(job_id), expected);
        }
    }

    /// Placeholder for an end-to-end invocation test; ignored because it needs
    /// a live HTTP server.
    #[tokio::test]
    #[ignore = "Requires running HTTP server"]
    async fn test_http_invocation() {}
}