use super::client::WebSocketRpcClient;
use super::script_actor::WebSocketScriptActor;
use crate::script_discovery::types::DiscoveredScriptActor;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};
#[derive(Debug, Clone)]
pub struct DynASBConfig {
pub api_url: String,
pub ws_url: String,
pub redis_url: String,
}
#[derive(Debug, Deserialize)]
struct DeployResponse {
success: bool,
function_id: Option<String>,
#[allow(dead_code)]
vm_id: Option<String>,
#[allow(dead_code)]
endpoint: Option<String>,
deployment_time_ms: Option<u64>,
error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynASBFunction {
pub function_id: String,
pub name: String,
pub runtime: String,
pub status: DeploymentStatus,
pub deployment_time_ms: u64,
pub vm_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum DeploymentStatus {
Deploying,
Ready,
Unhealthy,
Stopping,
Stopped,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct FunctionStatusResponse {
#[allow(dead_code)]
id: String,
#[allow(dead_code)]
name: String,
#[allow(dead_code)]
runtime: String,
state: String,
vm_id: String,
#[allow(dead_code)]
invocation_count: u64,
}
pub struct DynASBClient {
config: DynASBConfig,
http: reqwest::Client,
deployed: HashMap<String, DynASBFunction>,
}
impl DynASBClient {
pub fn new(config: DynASBConfig) -> Self {
Self {
config,
http: reqwest::Client::new(),
deployed: HashMap::new(),
}
}
pub async fn deploy(
&mut self,
name: &str,
runtime: &str,
code: &str,
handler: &str,
dependencies: Option<HashMap<String, String>>,
timeout_seconds: Option<u32>,
) -> Result<DynASBFunction> {
let body = json!({
"name": name,
"runtime": runtime,
"code": code,
"handler": handler,
"timeout_seconds": timeout_seconds.unwrap_or(300),
"dependencies": dependencies,
});
let url = format!("{}/api/v1/functions", self.config.api_url);
info!("Deploying function '{}' to dynASB", name);
let response: DeployResponse = self
.http
.post(&url)
.json(&body)
.send()
.await
.context("Failed to reach dynASB API")?
.json()
.await
.context("Failed to parse deploy response")?;
if !response.success {
return Err(anyhow::anyhow!(
"Deployment failed: {}",
response
.error
.unwrap_or_else(|| "unknown error".to_string())
));
}
let function_id = response
.function_id
.ok_or_else(|| anyhow::anyhow!("No function_id in deploy response"))?;
info!(
"Function deployed: id={}, time={}ms",
function_id,
response.deployment_time_ms.unwrap_or(0)
);
let func = DynASBFunction {
function_id: function_id.clone(),
name: name.to_string(),
runtime: runtime.to_string(),
status: DeploymentStatus::Deploying,
deployment_time_ms: response.deployment_time_ms.unwrap_or(0),
vm_id: response.vm_id,
};
self.deployed.insert(function_id.clone(), func.clone());
Ok(func)
}
pub async fn create_actor(
&self,
func: &DynASBFunction,
metadata: DiscoveredScriptActor,
) -> Result<WebSocketScriptActor> {
let ws_url = format!("{}/{}", self.config.ws_url, func.function_id);
let rpc_client = Arc::new(WebSocketRpcClient::new(ws_url));
let actor =
WebSocketScriptActor::new(metadata, rpc_client, self.config.redis_url.clone()).await;
Ok(actor)
}
pub async fn undeploy(&mut self, function_id: &str) -> Result<()> {
let url = format!(
"{}/api/v1/functions/{}/undeploy",
self.config.api_url, function_id
);
self.http
.post(&url)
.send()
.await
.context("Failed to undeploy function")?;
self.deployed.remove(function_id);
info!("Function undeployed: {}", function_id);
Ok(())
}
pub async fn undeploy_all(&mut self) -> Result<()> {
let ids: Vec<String> = self.deployed.keys().cloned().collect();
for id in ids {
let _ = self.undeploy(&id).await;
}
Ok(())
}
pub async fn health_check(&mut self, function_id: &str) -> Result<DeploymentStatus> {
let url = format!("{}/api/v1/functions/{}", self.config.api_url, function_id);
let resp: FunctionStatusResponse = self
.http
.get(&url)
.send()
.await
.context("Failed to reach dynASB health endpoint")?
.json()
.await
.context("Failed to parse function status response")?;
let status = match resp.state.as_str() {
"ready" | "running" => DeploymentStatus::Ready,
"deploying" | "starting" | "booting" => DeploymentStatus::Deploying,
"stopping" => DeploymentStatus::Stopping,
"stopped" | "removed" => DeploymentStatus::Stopped,
_ => DeploymentStatus::Unhealthy,
};
if let Some(func) = self.deployed.get_mut(function_id) {
func.status = status.clone();
if func.vm_id.is_none() && !resp.vm_id.is_empty() {
func.vm_id = Some(resp.vm_id);
}
}
Ok(status)
}
pub async fn wait_until_ready(
&mut self,
function_id: &str,
timeout: Duration,
poll_interval: Duration,
) -> Result<DeploymentStatus> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let status = self.health_check(function_id).await?;
match status {
DeploymentStatus::Ready => {
info!("Function {} is ready", function_id);
return Ok(status);
}
DeploymentStatus::Unhealthy | DeploymentStatus::Stopped => {
warn!(
"Function {} entered terminal state: {:?}",
function_id, status
);
return Err(anyhow::anyhow!(
"Function {} is {:?}, cannot become ready",
function_id,
status
));
}
DeploymentStatus::Deploying | DeploymentStatus::Stopping => {
if tokio::time::Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"Timed out waiting for function {} to become ready (last status: {:?})",
function_id,
status
));
}
debug!(
"Function {} still {:?}, polling again in {:?}",
function_id, status, poll_interval
);
tokio::time::sleep(poll_interval).await;
}
}
}
}
pub fn deployment_metadata(&self, function_id: &str) -> HashMap<String, Value> {
let mut meta = HashMap::new();
if let Some(func) = self.deployed.get(function_id) {
meta.insert("dynasb.function_id".into(), json!(func.function_id));
meta.insert("dynasb.status".into(), json!(func.status));
meta.insert("dynasb.runtime".into(), json!(func.runtime));
meta.insert(
"dynasb.deployment_time_ms".into(),
json!(func.deployment_time_ms),
);
if let Some(vm_id) = &func.vm_id {
meta.insert("dynasb.vm_id".into(), json!(vm_id));
}
meta.insert("dynasb.api_url".into(), json!(self.config.api_url));
meta.insert(
"dynasb.ws_url".into(),
json!(format!("{}/{}", self.config.ws_url, func.function_id)),
);
}
meta
}
pub fn deployed_functions(&self) -> &HashMap<String, DynASBFunction> {
&self.deployed
}
}
impl Drop for DynASBClient {
fn drop(&mut self) {
if !self.deployed.is_empty() {
let ids: Vec<String> = self.deployed.keys().cloned().collect();
let api_url = self.config.api_url.clone();
let http = self.http.clone();
tokio::spawn(async move {
for id in ids {
let url = format!("{}/api/v1/functions/{}/undeploy", api_url, id);
let _ = http.post(&url).send().await;
}
});
}
}
}