use crate::keystore::{
KNOWN_PROVIDER_KEYS, Keystore, is_sensitive_key, mask_value, migrate_project_keys,
};
use crate::schema::{DeployManifest, ProjectMeta, ProjectSchema};
use crate::server::events::ResumeEvent;
use crate::server::graph_runner::{INTERRUPTED_SESSIONS, deserialize_interrupt_response};
use crate::server::sse::send_resume_response;
use crate::server::state::AppState;
use adk_core::SessionId;
use adk_deploy::{
BundleBuilder, DeployClient, DeployClientConfig,
DeploymentManifest as PlatformDeploymentManifest, DeploymentRecord, EnvVarSpec,
InteractionConfig, LoginRequest as DeployLoginRequest, ManualInteractionConfig,
PushDeploymentRequest, SecretRef as DeploySecretRef, SecretSetRequest, SourceInfo,
TriggerInteractionConfig, TriggerKind,
};
use axum::{
Json,
extract::{Path, Query, State},
http::{HeaderMap, StatusCode},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path as StdPath;
use uuid::Uuid;
#[derive(Serialize)]
pub struct ApiError {
pub error: String,
}
impl ApiError {
pub fn new(msg: impl Into<String>) -> Self {
Self { error: msg.into() }
}
}
type ApiResult<T> = Result<Json<T>, (StatusCode, Json<ApiError>)>;
fn err(status: StatusCode, msg: impl Into<String>) -> (StatusCode, Json<ApiError>) {
(status, Json(ApiError::new(msg)))
}
pub async fn list_projects(State(state): State<AppState>) -> ApiResult<Vec<ProjectMeta>> {
let storage = state.storage.read().await;
storage
.list()
.await
.map(Json)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
}
#[derive(Deserialize)]
pub struct CreateProjectRequest {
pub name: String,
#[serde(default)]
pub description: String,
}
pub async fn create_project(
State(state): State<AppState>,
Json(req): Json<CreateProjectRequest>,
) -> ApiResult<ProjectSchema> {
let mut project = ProjectSchema::new(&req.name);
project.description = req.description;
let storage = state.storage.read().await;
storage
.save(&project)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(project))
}
pub async fn get_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<ProjectSchema> {
let storage = state.storage.read().await;
let mut project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
if let Ok(keystore) = Keystore::new(storage.base_dir(), id) {
let _ = migrate_project_keys(&storage, &keystore, &mut project).await;
}
Ok(Json(project))
}
pub async fn update_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
Json(mut project): Json<ProjectSchema>,
) -> ApiResult<ProjectSchema> {
let storage = state.storage.read().await;
if !storage.exists(id).await {
return Err(err(StatusCode::NOT_FOUND, "Project not found"));
}
project.id = id;
project.updated_at = chrono::Utc::now();
project
.settings
.env_vars
.retain(|name, _| !is_sensitive_key(name));
storage
.save(&project)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(project))
}
pub async fn delete_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<StatusCode, (StatusCode, Json<ApiError>)> {
let storage = state.storage.read().await;
storage
.delete(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Deserialize)]
#[allow(dead_code)]
pub struct RunRequest {
pub input: String,
}
#[derive(Serialize)]
pub struct RunResponse {
pub output: String,
}
pub async fn run_project(
State(_state): State<AppState>,
Path(_id): Path<Uuid>,
Json(_req): Json<RunRequest>,
) -> ApiResult<RunResponse> {
Err(err(
StatusCode::BAD_REQUEST,
"Runtime execution removed. Use 'Build' then run via console with the compiled binary.",
))
}
pub async fn clear_session(
Path(id): Path<Uuid>,
) -> Result<StatusCode, (StatusCode, Json<ApiError>)> {
let _ = id;
Ok(StatusCode::NO_CONTENT)
}
pub async fn compile_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<crate::codegen::GeneratedProject> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let generated = crate::codegen::generate_rust_project(&project)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(generated))
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeployRequest {
#[serde(default)]
pub spatial_os_url: Option<String>,
#[serde(default)]
pub register: Option<bool>,
#[serde(default)]
pub open_spatial_os: Option<bool>,
#[serde(default)]
pub control_plane_url: Option<String>,
#[serde(default)]
pub push_to_deployment_platform: Option<bool>,
#[serde(default)]
pub deployment_environment: Option<String>,
#[serde(default)]
pub open_deployment_console: Option<bool>,
#[serde(default)]
pub workspace_id: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SpatialRegistrationResult {
pub attempted: bool,
pub success: bool,
pub message: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DeployResponse {
pub success: bool,
pub manifest: DeployManifest,
pub manifest_path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment_manifest_path: Option<String>,
pub spatial_os_url: String,
pub registration: SpatialRegistrationResult,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment_platform_url: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment_console_url: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment_environment: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment: Option<DeploymentRecord>,
pub open_url: Option<String>,
}
#[derive(Debug, Deserialize)]
struct SpatialOsRegisterResponse {
#[allow(dead_code)]
ok: bool,
#[allow(dead_code)]
created: bool,
#[allow(dead_code)]
app_id: String,
message: String,
}
fn normalize_spatial_os_url(candidate: Option<String>) -> String {
candidate
.or_else(|| std::env::var("ADK_SPATIAL_OS_URL").ok())
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "http://127.0.0.1:8199".to_string())
.trim_end_matches('/')
.to_string()
}
fn normalize_control_plane_url(candidate: Option<String>) -> String {
candidate
.or_else(|| std::env::var("ADK_DEPLOY_SERVER_URL").ok())
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "http://127.0.0.1:8090".to_string())
.trim_end_matches('/')
.to_string()
}
fn normalize_deployment_console_url(candidate: Option<String>) -> String {
candidate
.or_else(|| std::env::var("ADK_DEPLOY_CONSOLE_URL").ok())
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "http://127.0.0.1:8091".to_string())
.trim_end_matches('/')
.to_string()
}
async fn register_with_spatial_os(
spatial_os_url: &str,
manifest: &DeployManifest,
) -> Result<SpatialOsRegisterResponse, String> {
let endpoint = format!("{spatial_os_url}/api/os/apps/register");
let payload = serde_json::json!({
"manifest": manifest.app.clone(),
"source": "adk_studio",
"source_project_id": manifest.source.project_id.clone(),
});
let response = reqwest::Client::new()
.post(endpoint)
.json(&payload)
.send()
.await
.map_err(|error| format!("failed to reach Spatial OS: {error}"))?;
let status = response.status();
if !status.is_success() {
let body = response
.text()
.await
.unwrap_or_else(|_| "unknown error".to_string());
return Err(format!(
"Spatial OS rejected registration ({status}): {body}"
));
}
response
.json::<SpatialOsRegisterResponse>()
.await
.map_err(|error| format!("invalid response from Spatial OS: {error}"))
}
pub async fn deploy_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
Json(req): Json<DeployRequest>,
) -> ApiResult<DeployResponse> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let base_dir = storage.base_dir().to_path_buf();
drop(storage);
let manifest = DeployManifest::from_project(&project);
let deploy_dir = base_dir.join("deploy").join(id.to_string());
let runtime_dir = deploy_dir.join("runtime");
tokio::fs::create_dir_all(&deploy_dir)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
tokio::fs::create_dir_all(&runtime_dir)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let manifest_path = deploy_dir.join("deploy_manifest.json");
let manifest_payload = serde_json::to_string_pretty(&manifest)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
tokio::fs::write(&manifest_path, manifest_payload)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let generated = crate::codegen::generate_rust_project(&project)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
write_generated_project(&runtime_dir, &generated)
.map_err(|message| err(StatusCode::INTERNAL_SERVER_ERROR, message))?;
let project_keys = load_project_keys(&base_dir, id)
.await
.map_err(|message| err(StatusCode::INTERNAL_SERVER_ERROR, message))?;
let deployment_manifest = deployment_manifest_from_project(&project, &project_keys);
let deployment_manifest_path = runtime_dir.join("adk-deploy.toml");
let deployment_manifest_payload = deployment_manifest
.to_toml_string()
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
tokio::fs::write(&deployment_manifest_path, deployment_manifest_payload)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let spatial_os_url = normalize_spatial_os_url(req.spatial_os_url);
let control_plane_url = normalize_control_plane_url(req.control_plane_url.clone());
let deployment_console_url = normalize_deployment_console_url(None);
let should_register = req.register.unwrap_or(true);
let should_push = req.push_to_deployment_platform.unwrap_or(true);
let deployment_environment = req
.deployment_environment
.clone()
.unwrap_or_else(|| "staging".to_string());
let registration = if should_register {
match register_with_spatial_os(&spatial_os_url, &manifest).await {
Ok(result) => SpatialRegistrationResult {
attempted: true,
success: true,
message: result.message,
},
Err(message) => SpatialRegistrationResult {
attempted: true,
success: false,
message,
},
}
} else {
SpatialRegistrationResult {
attempted: false,
success: false,
message: "registration skipped by request".to_string(),
}
};
let deployment = if should_push {
let artifact = BundleBuilder::new(&deployment_manifest_path, deployment_manifest.clone())
.build()
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut client = DeployClient::new(DeployClientConfig {
endpoint: control_plane_url.clone(),
token: None,
workspace_id: req.workspace_id.clone(),
});
let login = client
.login_ephemeral(&DeployLoginRequest {
email: "studio@adk.local".to_string(),
workspace_name: Some("Default Workspace".to_string()),
})
.await
.map_err(|e| err(StatusCode::BAD_GATEWAY, e.to_string()))?;
for (key, value) in &project_keys {
client
.set_secret(&SecretSetRequest {
environment: deployment_environment.clone(),
key: key.clone(),
value: value.clone(),
})
.await
.map_err(|e| err(StatusCode::BAD_GATEWAY, e.to_string()))?;
}
let response = client
.push_deployment(&PushDeploymentRequest {
workspace_id: req.workspace_id.clone().or(Some(login.workspace_id)),
environment: deployment_environment.clone(),
manifest: deployment_manifest,
bundle_path: artifact.bundle_path.to_string_lossy().to_string(),
checksum_sha256: artifact.checksum_sha256,
binary_path: Some(artifact.binary_path.to_string_lossy().to_string()),
})
.await
.map_err(|e| err(StatusCode::BAD_GATEWAY, e.to_string()))?;
Some(response.deployment)
} else {
None
};
let open_url = if req.open_deployment_console.unwrap_or(true) && deployment.is_some() {
Some(deployment_console_url.clone())
} else if req.open_spatial_os.unwrap_or(false) {
Some(spatial_os_url.clone())
} else {
None
};
Ok(Json(DeployResponse {
success: true,
manifest,
manifest_path: manifest_path.to_string_lossy().to_string(),
deployment_manifest_path: Some(deployment_manifest_path.to_string_lossy().to_string()),
spatial_os_url,
registration,
deployment_platform_url: Some(control_plane_url),
deployment_console_url: Some(deployment_console_url),
deployment_environment: deployment.as_ref().map(|item| item.environment.clone()),
deployment,
open_url,
}))
}
fn deployment_manifest_from_project(
project: &ProjectSchema,
project_keys: &HashMap<String, String>,
) -> PlatformDeploymentManifest {
let binary_name = project_binary_name(project);
let mut manifest = PlatformDeploymentManifest::default();
manifest.agent.name = binary_name.clone();
manifest.agent.binary = binary_name;
manifest.agent.version = if project.version.trim().is_empty() {
"1.0.0".to_string()
} else {
project.version.clone()
};
manifest.agent.description = Some(if project.description.trim().is_empty() {
format!("Deployed from ADK Studio project {}", project.name)
} else {
project.description.clone()
});
manifest.scaling.max_instances = 3;
manifest.interaction = interaction_manifest_from_project(project);
manifest.source = Some(SourceInfo {
kind: "adk_studio".to_string(),
project_id: Some(project.id.to_string()),
project_name: Some(project.name.clone()),
});
for (key, value) in &project.settings.env_vars {
manifest
.env
.insert(key.clone(), EnvVarSpec::Plain(value.clone()));
}
for key in project_keys.keys() {
manifest.env.insert(
key.clone(),
EnvVarSpec::SecretRef {
secret_ref: key.clone(),
},
);
manifest.secrets.push(DeploySecretRef {
key: key.clone(),
required: true,
});
}
manifest
}
fn interaction_manifest_from_project(project: &ProjectSchema) -> Option<InteractionConfig> {
use crate::codegen::action_nodes::{ActionNodeConfig, TriggerType};
let mut manual = None;
let mut triggers = Vec::new();
for node in project.action_nodes.values() {
let ActionNodeConfig::Trigger(trigger) = node else {
continue;
};
let description = trigger
.standard
.description
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
let name = if trigger.standard.name.trim().is_empty() {
trigger.standard.id.clone()
} else {
trigger.standard.name.clone()
};
match trigger.trigger_type {
TriggerType::Manual => {
if manual.is_none() {
let config = trigger.manual.clone().unwrap_or_default();
manual = Some(ManualInteractionConfig {
input_label: config.input_label,
default_prompt: config.default_prompt,
});
}
}
TriggerType::Webhook => {
if let Some(config) = &trigger.webhook {
triggers.push(TriggerInteractionConfig {
id: trigger.standard.id.clone(),
name,
kind: TriggerKind::Webhook,
description,
path: Some(config.path.clone()),
method: Some(config.method.clone()),
auth: Some(config.auth.clone()),
default_prompt: None,
cron: None,
timezone: None,
event_source: None,
event_type: None,
filter: None,
});
}
}
TriggerType::Schedule => {
if let Some(config) = &trigger.schedule {
triggers.push(TriggerInteractionConfig {
id: trigger.standard.id.clone(),
name,
kind: TriggerKind::Schedule,
description,
path: None,
method: None,
auth: None,
default_prompt: config.default_prompt.clone(),
cron: Some(config.cron.clone()),
timezone: Some(config.timezone.clone()),
event_source: None,
event_type: None,
filter: None,
});
}
}
TriggerType::Event => {
if let Some(config) = &trigger.event {
triggers.push(TriggerInteractionConfig {
id: trigger.standard.id.clone(),
name,
kind: TriggerKind::Event,
description,
path: None,
method: None,
auth: None,
default_prompt: None,
cron: None,
timezone: None,
event_source: Some(config.source.clone()),
event_type: Some(config.event_type.clone()),
filter: config.filter.clone(),
});
}
}
}
}
if manual.is_none() && triggers.is_empty() {
None
} else {
Some(InteractionConfig { manual, triggers })
}
}
async fn load_project_keys(
base_dir: &StdPath,
project_id: Uuid,
) -> Result<HashMap<String, String>, String> {
let keystore = Keystore::new(base_dir, project_id).map_err(|e| e.to_string())?;
keystore.load().await.map_err(|e| e.to_string())
}
fn write_generated_project(
runtime_dir: &StdPath,
generated: &crate::codegen::GeneratedProject,
) -> Result<(), String> {
std::fs::create_dir_all(runtime_dir).map_err(|e| e.to_string())?;
for file in &generated.files {
let path = runtime_dir.join(&file.path);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
}
std::fs::write(path, &file.content).map_err(|e| e.to_string())?;
}
Ok(())
}
fn project_binary_name(project: &ProjectSchema) -> String {
let mut project_name = project
.name
.to_lowercase()
.replace(' ', "_")
.replace(|c: char| !c.is_alphanumeric() && c != '_', "");
if project_name.is_empty()
|| project_name
.chars()
.next()
.map(|c| c.is_ascii_digit())
.unwrap_or(false)
{
project_name = format!("project_{project_name}");
}
project_name
}
#[derive(Serialize)]
pub struct BuildResponse {
pub success: bool,
pub output: String,
pub binary_path: Option<String>,
}
pub async fn build_project_stream(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> axum::response::Sse<
impl futures::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>,
> {
use axum::response::sse::Event;
use std::time::Instant;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
let stream = async_stream::stream! {
let start_time = Instant::now();
let storage = state.storage.read().await;
let project = match storage.get(id).await {
Ok(p) => p,
Err(e) => {
yield Ok(Event::default().event("error").data(e.to_string()));
return;
}
};
let generated = match crate::codegen::generate_rust_project(&project) {
Ok(g) => g,
Err(e) => {
yield Ok(Event::default().event("error").data(e.to_string()));
return;
}
};
let mut project_name = project.name.to_lowercase().replace(' ', "_").replace(|c: char| !c.is_alphanumeric() && c != '_', "");
if project_name.is_empty() || project_name.chars().next().map(|c| c.is_ascii_digit()).unwrap_or(false) {
project_name = format!("project_{}", project_name);
}
let build_dir = std::env::temp_dir().join("adk-studio-builds").join(&project_name);
if let Err(e) = std::fs::create_dir_all(&build_dir) {
yield Ok(Event::default().event("error").data(e.to_string()));
return;
}
for file in &generated.files {
let path = build_dir.join(&file.path);
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Err(e) = std::fs::write(&path, &file.content) {
yield Ok(Event::default().event("error").data(e.to_string()));
return;
}
}
yield Ok(Event::default().event("status").data("Starting cargo build..."));
let shared_target = std::env::temp_dir().join("adk-studio-builds").join("_shared_target");
let _ = std::fs::create_dir_all(&shared_target);
let mut child = match Command::new("cargo")
.arg("build")
.env("CARGO_TARGET_DIR", &shared_target)
.current_dir(&build_dir)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn() {
Ok(c) => c,
Err(e) => {
yield Ok(Event::default().event("error").data(e.to_string()));
return;
}
};
let Some(stderr) = child.stderr.take() else {
yield Ok(Event::default().event("error").data("Failed to capture build stderr"));
return;
};
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
yield Ok(Event::default().event("output").data(line));
}
let status = child.wait().await;
let success = status.map(|s| s.success()).unwrap_or(false);
let elapsed = start_time.elapsed();
if success {
let binary = shared_target.join("debug").join(&project_name);
yield Ok(Event::default().event("output").data(format!("\n✓ Build completed in {:.1}s", elapsed.as_secs_f32())));
yield Ok(Event::default().event("done").data(binary.to_string_lossy()));
} else {
yield Ok(Event::default().event("output").data(format!("\n✗ Build failed after {:.1}s", elapsed.as_secs_f32())));
yield Ok(Event::default().event("error").data("Build failed"));
}
};
axum::response::Sse::new(stream)
}
pub async fn build_project(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<BuildResponse> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let generated = crate::codegen::generate_rust_project(&project)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut project_name = project
.name
.to_lowercase()
.replace(' ', "_")
.replace(|c: char| !c.is_alphanumeric() && c != '_', "");
if project_name.is_empty()
|| project_name
.chars()
.next()
.map(|c| c.is_ascii_digit())
.unwrap_or(false)
{
project_name = format!("project_{}", project_name);
}
let build_dir = std::env::temp_dir()
.join("adk-studio-builds")
.join(&project_name);
std::fs::create_dir_all(&build_dir)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
for file in &generated.files {
let path = build_dir.join(&file.path);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(&path, &file.content)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
}
let shared_target = std::env::temp_dir()
.join("adk-studio-builds")
.join("_shared_target");
let _ = std::fs::create_dir_all(&shared_target);
let output = tokio::process::Command::new("cargo")
.arg("build")
.env("CARGO_TARGET_DIR", &shared_target)
.current_dir(&build_dir)
.output()
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
let combined = format!("{}\n{}", stdout, stderr);
if output.status.success() {
let binary = shared_target.join("debug").join(&project_name);
Ok(Json(BuildResponse {
success: true,
output: combined,
binary_path: Some(binary.to_string_lossy().to_string()),
}))
} else {
Ok(Json(BuildResponse {
success: false,
output: combined,
binary_path: None,
}))
}
}
#[derive(Debug, Deserialize)]
pub struct ResumeRequest {
pub response: serde_json::Value,
}
#[derive(Debug, Serialize)]
pub struct ResumeResponse {
pub success: bool,
pub node_id: String,
pub message: String,
}
pub async fn resume_session(
Path(session_id): Path<String>,
Json(req): Json<ResumeRequest>,
) -> ApiResult<ResumeResponse> {
let _valid_session_id = SessionId::try_from(session_id.as_str())
.map_err(|e| err(StatusCode::BAD_REQUEST, format!("invalid session_id: {e}")))?;
let interrupted_state = INTERRUPTED_SESSIONS.get(&session_id).await.ok_or_else(|| {
err(
StatusCode::NOT_FOUND,
format!("Session '{}' not found or not interrupted", session_id),
)
})?;
let node_id = interrupted_state.node_id.clone();
let thread_id = interrupted_state.thread_id.clone();
let checkpoint_id = interrupted_state.checkpoint_id.clone();
let state_updates = deserialize_interrupt_response(req.response.clone());
tracing::info!(
session_id = %session_id,
node_id = %node_id,
thread_id = %thread_id,
checkpoint_id = %checkpoint_id,
updates = ?state_updates,
"Resuming interrupted workflow"
);
if let Err(e) = send_resume_response(&session_id, req.response.clone()).await {
tracing::warn!(
session_id = %session_id,
error = %e,
"Failed to send resume response to subprocess, session may have ended"
);
}
INTERRUPTED_SESSIONS.remove(&session_id).await;
let resume_event = ResumeEvent::new(&node_id);
tracing::info!(
session_id = %session_id,
event = %resume_event.to_json(),
"Resume event emitted"
);
Ok(Json(ResumeResponse {
success: true,
node_id,
message: format!(
"Workflow resumed. Response: {}",
serde_json::to_string(&req.response).unwrap_or_default()
),
}))
}
#[derive(Debug, Serialize)]
pub struct WebhookTriggerResponse {
pub success: bool,
pub session_id: String,
pub message: String,
pub path: String,
pub stream_url: String,
pub binary_path: Option<String>,
}
pub fn get_project_binary_path(project_name: &str) -> String {
let project_name = project_name.to_lowercase().replace(' ', "_");
let shared_target = std::env::temp_dir()
.join("adk-studio-builds")
.join("_shared_target");
let binary = shared_target.join("debug").join(&project_name);
binary.to_string_lossy().to_string()
}
pub fn is_project_built(project_name: &str) -> bool {
let binary_path = get_project_binary_path(project_name);
std::path::Path::new(&binary_path).exists()
}
fn percent_encode(s: &str) -> String {
let mut result = String::with_capacity(s.len() * 3);
for byte in s.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
result.push(byte as char);
}
_ => {
result.push_str(&format!("%{:02X}", byte));
}
}
}
result
}
pub async fn webhook_trigger(
State(state): State<AppState>,
Path((id, path)): Path<(Uuid, String)>,
headers: HeaderMap,
Json(payload): Json<serde_json::Value>,
) -> ApiResult<WebhookTriggerResponse> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let webhook_path = format!("/{}", path.trim_start_matches('/'));
let trigger = find_webhook_trigger(&project, &webhook_path, "POST");
if let Some(ref trigger_config) = trigger {
validate_webhook_auth(&headers, trigger_config)?;
}
let session_id = uuid::Uuid::new_v4().to_string();
store_webhook_payload(&session_id, &webhook_path, "POST", payload.clone()).await;
let binary_path = get_project_binary_path(&project.name);
let binary_exists = is_project_built(&project.name);
let stream_url = format!(
"/api/projects/{}/stream?input=__webhook__&session_id={}&binary_path={}",
id,
session_id,
percent_encode(&binary_path)
);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
notify_webhook(
&id.to_string(),
WebhookNotification {
session_id: session_id.clone(),
path: webhook_path.clone(),
method: "POST".to_string(),
payload: payload.clone(),
timestamp,
binary_path: if binary_exists {
Some(binary_path.clone())
} else {
None
},
},
)
.await;
tracing::info!(
project_id = %id,
path = %webhook_path,
session_id = %session_id,
payload = %serde_json::to_string(&payload).unwrap_or_default(),
binary_path = %binary_path,
binary_exists = %binary_exists,
"Webhook trigger received"
);
Ok(Json(WebhookTriggerResponse {
success: true,
session_id: session_id.clone(),
message: format!(
"Webhook received for path '{}'. {}{}",
webhook_path,
if trigger.is_some() {
"Trigger configuration found."
} else {
"No matching trigger found, but payload stored."
},
if !binary_exists {
" WARNING: Project not built. Build the project first."
} else {
""
}
),
path: webhook_path,
stream_url,
binary_path: if binary_exists {
Some(binary_path)
} else {
None
},
}))
}
pub async fn webhook_trigger_get(
State(state): State<AppState>,
Path((id, path)): Path<(Uuid, String)>,
headers: HeaderMap,
Query(params): Query<HashMap<String, String>>,
) -> ApiResult<WebhookTriggerResponse> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let webhook_path = format!("/{}", path.trim_start_matches('/'));
let trigger = find_webhook_trigger(&project, &webhook_path, "GET");
if let Some(ref trigger_config) = trigger {
validate_webhook_auth(&headers, trigger_config)?;
}
let session_id = uuid::Uuid::new_v4().to_string();
let payload = serde_json::to_value(¶ms).unwrap_or(serde_json::Value::Null);
store_webhook_payload(&session_id, &webhook_path, "GET", payload.clone()).await;
let binary_path = get_project_binary_path(&project.name);
let binary_exists = is_project_built(&project.name);
let stream_url = format!(
"/api/projects/{}/stream?input=__webhook__&session_id={}&binary_path={}",
id,
session_id,
percent_encode(&binary_path)
);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
notify_webhook(
&id.to_string(),
WebhookNotification {
session_id: session_id.clone(),
path: webhook_path.clone(),
method: "GET".to_string(),
payload: payload.clone(),
timestamp,
binary_path: if binary_exists {
Some(binary_path.clone())
} else {
None
},
},
)
.await;
tracing::info!(
project_id = %id,
path = %webhook_path,
session_id = %session_id,
params = ?params,
binary_path = %binary_path,
binary_exists = %binary_exists,
"GET Webhook trigger received"
);
Ok(Json(WebhookTriggerResponse {
success: true,
session_id: session_id.clone(),
message: format!(
"GET Webhook received for path '{}'. {}{}",
webhook_path,
if trigger.is_some() {
"Trigger configuration found."
} else {
"No matching trigger found, but payload stored."
},
if !binary_exists {
" WARNING: Project not built. Build the project first."
} else {
""
}
),
path: webhook_path,
stream_url,
binary_path: if binary_exists {
Some(binary_path)
} else {
None
},
}))
}
#[derive(Debug, Serialize)]
pub struct WebhookExecuteResponse {
pub success: bool,
pub response: Option<String>,
pub error: Option<String>,
pub session_id: String,
pub duration_ms: u64,
}
pub async fn webhook_execute(
State(state): State<AppState>,
Path((id, path)): Path<(Uuid, String)>,
headers: HeaderMap,
Json(payload): Json<serde_json::Value>,
) -> ApiResult<WebhookExecuteResponse> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::Command;
let start_time = std::time::Instant::now();
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let webhook_path = format!("/{}", path.trim_start_matches('/'));
let trigger = find_webhook_trigger(&project, &webhook_path, "POST");
if let Some(ref trigger_config) = trigger {
validate_webhook_auth(&headers, trigger_config)?;
}
let binary_path = get_project_binary_path(&project.name);
if !is_project_built(&project.name) {
return Ok(Json(WebhookExecuteResponse {
success: false,
response: None,
error: Some("Project not built. Build the project first using the UI.".to_string()),
session_id: String::new(),
duration_ms: start_time.elapsed().as_millis() as u64,
}));
}
let session_id = uuid::Uuid::new_v4().to_string();
tracing::info!(
project_id = %id,
path = %webhook_path,
session_id = %session_id,
payload = %serde_json::to_string(&payload).unwrap_or_default(),
"Executing webhook synchronously"
);
let api_key = std::env::var("GOOGLE_API_KEY").unwrap_or_default();
let mut child = match Command::new(&binary_path)
.arg(&session_id)
.env("GOOGLE_API_KEY", &api_key)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(e) => {
return Ok(Json(WebhookExecuteResponse {
success: false,
response: None,
error: Some(format!("Failed to start workflow: {}", e)),
session_id,
duration_ms: start_time.elapsed().as_millis() as u64,
}));
}
};
let input = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
if let Some(stdin) = child.stdin.take() {
let mut writer = BufWriter::new(stdin);
if let Err(e) = writer.write_all(format!("{}\n", input).as_bytes()).await {
return Ok(Json(WebhookExecuteResponse {
success: false,
response: None,
error: Some(format!("Failed to send input: {}", e)),
session_id,
duration_ms: start_time.elapsed().as_millis() as u64,
}));
}
if let Err(e) = writer.flush().await {
return Ok(Json(WebhookExecuteResponse {
success: false,
response: None,
error: Some(format!("Failed to flush input: {}", e)),
session_id,
duration_ms: start_time.elapsed().as_millis() as u64,
}));
}
}
let mut response_text = String::new();
if let Some(stdout) = child.stdout.take() {
let mut reader = BufReader::new(stdout);
let timeout = tokio::time::Duration::from_secs(60);
let deadline = tokio::time::Instant::now() + timeout;
loop {
if tokio::time::Instant::now() > deadline {
let _ = child.kill().await;
return Ok(Json(WebhookExecuteResponse {
success: false,
response: None,
error: Some("Execution timeout (60s)".to_string()),
session_id,
duration_ms: start_time.elapsed().as_millis() as u64,
}));
}
let mut line = String::new();
match tokio::time::timeout(
tokio::time::Duration::from_millis(100),
reader.read_line(&mut line),
)
.await
{
Ok(Ok(0)) => break, Ok(Ok(_)) => {
let line = line.trim_start_matches("> ");
if let Some(response) = line.strip_prefix("RESPONSE:") {
response_text = serde_json::from_str::<String>(response)
.unwrap_or_else(|_| response.to_string());
break;
} else if let Some(chunk) = line.strip_prefix("CHUNK:") {
let decoded = serde_json::from_str::<String>(chunk)
.unwrap_or_else(|_| chunk.to_string());
response_text.push_str(&decoded);
}
}
Ok(Err(_)) => break, Err(_) => continue, }
}
}
let _ = child.kill().await;
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::info!(
project_id = %id,
session_id = %session_id,
duration_ms = %duration_ms,
response_len = %response_text.len(),
"Webhook execution complete"
);
Ok(Json(WebhookExecuteResponse {
success: true,
response: if response_text.is_empty() {
None
} else {
Some(response_text)
},
error: None,
session_id,
duration_ms,
}))
}
#[derive(Debug, Clone)]
struct WebhookTriggerConfig {
auth: String,
header_name: Option<String>,
token_env_var: Option<String>,
}
fn find_webhook_trigger(
project: &ProjectSchema,
path: &str,
method: &str,
) -> Option<WebhookTriggerConfig> {
use crate::codegen::action_nodes::{ActionNodeConfig, TriggerType};
for node in project.action_nodes.values() {
if let ActionNodeConfig::Trigger(trigger_config) = node {
if trigger_config.trigger_type == TriggerType::Webhook {
if let Some(webhook) = &trigger_config.webhook {
let normalized_path = path.trim_start_matches('/');
let normalized_webhook_path = webhook.path.trim_start_matches('/');
if normalized_path == normalized_webhook_path && webhook.method == method {
return Some(WebhookTriggerConfig {
auth: webhook.auth.clone(),
header_name: webhook
.auth_config
.as_ref()
.and_then(|c| c.header_name.clone()),
token_env_var: webhook
.auth_config
.as_ref()
.and_then(|c| c.token_env_var.clone()),
});
}
}
}
}
}
None
}
fn validate_webhook_auth(
headers: &HeaderMap,
config: &WebhookTriggerConfig,
) -> Result<(), (StatusCode, Json<ApiError>)> {
match config.auth.as_str() {
"bearer" => {
let auth_header = headers.get("Authorization").and_then(|v| v.to_str().ok());
match auth_header {
Some(header) if header.starts_with("Bearer ") => {
let token = header.trim_start_matches("Bearer ");
if token.is_empty() {
return Err(err(StatusCode::UNAUTHORIZED, "Empty bearer token"));
}
if let Some(env_var) = &config.token_env_var {
if let Ok(expected_token) = std::env::var(env_var) {
if token != expected_token {
return Err(err(StatusCode::UNAUTHORIZED, "Invalid bearer token"));
}
}
}
Ok(())
}
Some(_) => Err(err(
StatusCode::UNAUTHORIZED,
"Invalid Authorization header format. Expected: Bearer <token>",
)),
None => Err(err(
StatusCode::UNAUTHORIZED,
"Missing Authorization header",
)),
}
}
"api_key" => {
let header_name = config.header_name.as_deref().unwrap_or("X-API-Key");
let api_key = headers.get(header_name).and_then(|v| v.to_str().ok());
match api_key {
Some(key) if !key.is_empty() => {
if let Some(env_var) = &config.token_env_var {
if let Ok(expected_key) = std::env::var(env_var) {
if key != expected_key {
return Err(err(StatusCode::UNAUTHORIZED, "Invalid API key"));
}
}
}
Ok(())
}
Some(_) => Err(err(StatusCode::UNAUTHORIZED, "Empty API key")),
None => Err(err(
StatusCode::UNAUTHORIZED,
format!("Missing {} header", header_name),
)),
}
}
_ => Ok(()),
}
}
lazy_static::lazy_static! {
static ref WEBHOOK_PAYLOADS: tokio::sync::RwLock<HashMap<String, WebhookPayload>> =
tokio::sync::RwLock::new(HashMap::new());
}
#[derive(Debug, Clone, Serialize)]
pub struct WebhookPayload {
pub path: String,
pub method: String,
pub payload: serde_json::Value,
pub timestamp: u64,
}
async fn store_webhook_payload(
session_id: &str,
path: &str,
method: &str,
payload: serde_json::Value,
) {
let mut payloads = WEBHOOK_PAYLOADS.write().await;
payloads.insert(
session_id.to_string(),
WebhookPayload {
path: path.to_string(),
method: method.to_string(),
payload,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
},
);
}
pub async fn get_webhook_payload(session_id: &str) -> Option<WebhookPayload> {
let mut payloads = WEBHOOK_PAYLOADS.write().await;
payloads.remove(session_id)
}
#[allow(dead_code)]
pub async fn has_webhook_payload(session_id: &str) -> bool {
let payloads = WEBHOOK_PAYLOADS.read().await;
payloads.contains_key(session_id)
}
lazy_static::lazy_static! {
static ref WEBHOOK_CHANNELS: tokio::sync::RwLock<HashMap<String, tokio::sync::broadcast::Sender<WebhookNotification>>> =
tokio::sync::RwLock::new(HashMap::new());
}
#[derive(Debug, Clone, Serialize)]
pub struct WebhookNotification {
pub session_id: String,
pub path: String,
pub method: String,
pub payload: serde_json::Value,
pub timestamp: u64,
pub binary_path: Option<String>,
}
async fn get_webhook_channel(
project_id: &str,
) -> tokio::sync::broadcast::Sender<WebhookNotification> {
let mut channels = WEBHOOK_CHANNELS.write().await;
if let Some(sender) = channels.get(project_id) {
sender.clone()
} else {
let (sender, _) = tokio::sync::broadcast::channel(16);
channels.insert(project_id.to_string(), sender.clone());
sender
}
}
pub async fn notify_webhook(project_id: &str, notification: WebhookNotification) {
let sender = get_webhook_channel(project_id).await;
let _ = sender.send(notification);
}
pub async fn subscribe_webhook_notifications(
project_id: &str,
) -> tokio::sync::broadcast::Receiver<WebhookNotification> {
let sender = get_webhook_channel(project_id).await;
sender.subscribe()
}
#[derive(Debug, Deserialize)]
pub struct EventTriggerRequest {
pub source: String,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(default)]
pub data: serde_json::Value,
}
#[derive(Debug, Serialize)]
pub struct EventTriggerResponse {
pub success: bool,
pub session_id: String,
pub message: String,
pub source: String,
pub event_type: String,
pub stream_url: String,
pub binary_path: Option<String>,
}
#[derive(Debug, Clone)]
struct EventTriggerConfig {
source: String,
event_type: String,
filter: Option<String>,
}
fn find_event_trigger(
project: &ProjectSchema,
source: &str,
event_type: &str,
) -> Option<EventTriggerConfig> {
use crate::codegen::action_nodes::{ActionNodeConfig, TriggerType};
for node in project.action_nodes.values() {
if let ActionNodeConfig::Trigger(trigger_config) = node {
if trigger_config.trigger_type == TriggerType::Event {
if let Some(event) = &trigger_config.event {
let source_matches = event.source.is_empty() || event.source == source;
let type_matches =
event.event_type.is_empty() || event.event_type == event_type;
if source_matches && type_matches {
return Some(EventTriggerConfig {
source: event.source.clone(),
event_type: event.event_type.clone(),
filter: event.filter.clone(),
});
}
}
}
}
}
None
}
fn apply_event_filter(filter: Option<&str>, data: &serde_json::Value) -> bool {
match filter {
None | Some("") => true, Some(filter_expr) => {
if let Some(eq_pos) = filter_expr.find("==") {
let path_part = filter_expr[..eq_pos].trim();
let value_part = filter_expr[eq_pos + 2..]
.trim()
.trim_matches('\'')
.trim_matches('"');
let path_parts: Vec<&str> = path_part.trim_start_matches("$.").split('.').collect();
let mut current = data;
for part in path_parts {
match current.get(part) {
Some(v) => current = v,
None => return false,
}
}
match current {
serde_json::Value::String(s) => s == value_part,
serde_json::Value::Number(n) => n.to_string() == value_part,
serde_json::Value::Bool(b) => b.to_string() == value_part,
_ => false,
}
} else {
tracing::warn!(filter = %filter_expr, "Unsupported filter expression, allowing event");
true
}
}
}
}
pub async fn event_trigger(
State(state): State<AppState>,
Path(id): Path<Uuid>,
Json(req): Json<EventTriggerRequest>,
) -> ApiResult<EventTriggerResponse> {
let storage = state.storage.read().await;
let project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let trigger = find_event_trigger(&project, &req.source, &req.event_type);
let trigger_config = match trigger {
Some(config) => config,
None => {
return Err(err(
StatusCode::NOT_FOUND,
format!(
"No event trigger found matching source='{}' eventType='{}'",
req.source, req.event_type
),
));
}
};
let filter_data = serde_json::json!({
"source": req.source,
"eventType": req.event_type,
"data": req.data,
});
if !apply_event_filter(trigger_config.filter.as_deref(), &filter_data) {
return Err(err(
StatusCode::BAD_REQUEST,
format!(
"Event data did not match filter: {}",
trigger_config.filter.as_deref().unwrap_or("")
),
));
}
let session_id = uuid::Uuid::new_v4().to_string();
let event_payload = serde_json::json!({
"source": req.source,
"eventType": req.event_type,
"data": req.data,
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
});
store_webhook_payload(
&session_id,
&format!("event:{}/{}", req.source, req.event_type),
"EVENT",
event_payload.clone(),
)
.await;
let binary_path = get_project_binary_path(&project.name);
let binary_exists = is_project_built(&project.name);
let stream_url = format!(
"/api/projects/{}/stream?input=__webhook__&session_id={}&binary_path={}",
id,
session_id,
percent_encode(&binary_path)
);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
notify_webhook(
&id.to_string(),
WebhookNotification {
session_id: session_id.clone(),
path: format!("event:{}/{}", req.source, req.event_type),
method: "EVENT".to_string(),
payload: event_payload.clone(),
timestamp,
binary_path: if binary_exists {
Some(binary_path.clone())
} else {
None
},
},
)
.await;
tracing::info!(
project_id = %id,
source = %req.source,
event_type = %req.event_type,
session_id = %session_id,
data = %serde_json::to_string(&req.data).unwrap_or_default(),
binary_path = %binary_path,
binary_exists = %binary_exists,
"Event trigger received"
);
Ok(Json(EventTriggerResponse {
success: true,
session_id: session_id.clone(),
message: format!(
"Event accepted. Trigger matched: {}/{}{}",
if trigger_config.source.is_empty() {
"*"
} else {
&trigger_config.source
},
if trigger_config.event_type.is_empty() {
"*"
} else {
&trigger_config.event_type
},
if !binary_exists {
". WARNING: Project not built. Build the project first."
} else {
""
}
),
source: req.source,
event_type: req.event_type,
stream_url,
binary_path: if binary_exists {
Some(binary_path)
} else {
None
},
}))
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DetectedKeyEntry {
pub name: String,
pub status: String,
pub masked: Option<String>,
}
#[derive(Serialize)]
pub struct DetectedKeysResponse {
pub keys: Vec<DetectedKeyEntry>,
}
pub async fn detected_keys() -> ApiResult<DetectedKeysResponse> {
let keys = KNOWN_PROVIDER_KEYS
.iter()
.map(|&name| {
let (status, masked) = match std::env::var(name) {
Ok(val) if !val.is_empty() => ("detected".to_string(), Some(mask_value(&val))),
_ => ("not_set".to_string(), None),
};
DetectedKeyEntry {
name: name.to_string(),
status,
masked,
}
})
.collect();
Ok(Json(DetectedKeysResponse { keys }))
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProjectKeyEntry {
pub name: String,
pub source: String,
pub masked: Option<String>,
}
#[derive(Serialize)]
pub struct ProjectKeysResponse {
pub keys: Vec<ProjectKeyEntry>,
}
pub async fn get_project_keys(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<ProjectKeysResponse> {
let storage = state.storage.read().await;
let mut project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let keystore = Keystore::new(storage.base_dir(), id)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = migrate_project_keys(&storage, &keystore, &mut project).await;
let stored_keys = keystore
.load()
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let keys = KNOWN_PROVIDER_KEYS
.iter()
.map(|&name| {
if let Some(val) = stored_keys.get(name) {
ProjectKeyEntry {
name: name.to_string(),
source: "project".to_string(),
masked: Some(mask_value(val)),
}
} else if let Ok(val) = std::env::var(name) {
if !val.is_empty() {
ProjectKeyEntry {
name: name.to_string(),
source: "environment".to_string(),
masked: Some(mask_value(&val)),
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
})
.collect();
Ok(Json(ProjectKeysResponse { keys }))
}
#[derive(Deserialize)]
pub struct SaveProjectKeysRequest {
pub keys: HashMap<String, String>,
}
pub async fn save_project_keys(
State(state): State<AppState>,
Path(id): Path<Uuid>,
Json(body): Json<SaveProjectKeysRequest>,
) -> ApiResult<ProjectKeysResponse> {
let invalid_names: Vec<&String> = body
.keys
.keys()
.filter(|name| !is_sensitive_key(name))
.collect();
if !invalid_names.is_empty() {
return Err(err(
StatusCode::BAD_REQUEST,
format!(
"Unknown key names: {}. Only known provider key patterns are accepted.",
invalid_names
.iter()
.map(|n| n.as_str())
.collect::<Vec<_>>()
.join(", ")
),
));
}
let storage = state.storage.read().await;
let _project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let keystore = Keystore::new(storage.base_dir(), id)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut stored_keys = keystore
.load()
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
for (name, value) in &body.keys {
stored_keys.insert(name.clone(), value.clone());
}
keystore
.save(&stored_keys)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let keys = KNOWN_PROVIDER_KEYS
.iter()
.map(|&name| {
if let Some(val) = stored_keys.get(name) {
ProjectKeyEntry {
name: name.to_string(),
source: "project".to_string(),
masked: Some(mask_value(val)),
}
} else if let Ok(val) = std::env::var(name) {
if !val.is_empty() {
ProjectKeyEntry {
name: name.to_string(),
source: "environment".to_string(),
masked: Some(mask_value(&val)),
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
})
.collect();
Ok(Json(ProjectKeysResponse { keys }))
}
pub async fn delete_project_key(
State(state): State<AppState>,
Path((id, key_name)): Path<(Uuid, String)>,
) -> ApiResult<ProjectKeysResponse> {
let storage = state.storage.read().await;
let _project = storage
.get(id)
.await
.map_err(|e| err(StatusCode::NOT_FOUND, e.to_string()))?;
let keystore = Keystore::new(storage.base_dir(), id)
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
keystore
.remove(&key_name)
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let stored_keys = keystore
.load()
.await
.map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let keys = KNOWN_PROVIDER_KEYS
.iter()
.map(|&name| {
if let Some(val) = stored_keys.get(name) {
ProjectKeyEntry {
name: name.to_string(),
source: "project".to_string(),
masked: Some(mask_value(val)),
}
} else if let Ok(val) = std::env::var(name) {
if !val.is_empty() {
ProjectKeyEntry {
name: name.to_string(),
source: "environment".to_string(),
masked: Some(mask_value(&val)),
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
} else {
ProjectKeyEntry {
name: name.to_string(),
source: "not_set".to_string(),
masked: None,
}
}
})
.collect();
Ok(Json(ProjectKeysResponse { keys }))
}
#[cfg(test)]
mod tests {
use super::deployment_manifest_from_project;
use crate::schema::ProjectSchema;
use serde_json::json;
use std::collections::HashMap;
#[test]
fn deployment_manifest_carries_studio_trigger_metadata() {
let mut project = ProjectSchema::new("Console Ready");
project.action_nodes.insert(
"manual_trigger".to_string(),
serde_json::from_value(json!({
"type": "trigger",
"id": "manual_trigger",
"name": "Manual start",
"triggerType": "manual",
"manual": {
"inputLabel": "Ask the ops agent",
"defaultPrompt": "Summarize the latest rollout."
},
"errorHandling": {"mode": "stop"},
"tracing": {"enabled": false, "logLevel": "error"},
"callbacks": {},
"execution": {"timeout": 30000},
"mapping": {"outputKey": "result"}
}))
.unwrap(),
);
project.action_nodes.insert(
"webhook_trigger".to_string(),
serde_json::from_value(json!({
"type": "trigger",
"id": "webhook_trigger",
"name": "Inbound webhook",
"description": "Receives external payloads.",
"triggerType": "webhook",
"webhook": {
"path": "/api/webhook/ingest",
"method": "POST",
"auth": "bearer",
"authConfig": {
"tokenEnvVar": "WEBHOOK_TOKEN"
}
},
"errorHandling": {"mode": "stop"},
"tracing": {"enabled": false, "logLevel": "error"},
"callbacks": {},
"execution": {"timeout": 30000},
"mapping": {"outputKey": "result"}
}))
.unwrap(),
);
let manifest = deployment_manifest_from_project(&project, &HashMap::new());
let interaction = manifest.interaction.expect("studio interaction metadata");
let manual = interaction.manual.expect("manual config");
assert_eq!(manual.input_label, "Ask the ops agent");
assert_eq!(manual.default_prompt, "Summarize the latest rollout.");
assert_eq!(interaction.triggers.len(), 1);
assert_eq!(
interaction.triggers[0].path.as_deref(),
Some("/api/webhook/ingest")
);
assert_eq!(interaction.triggers[0].method.as_deref(), Some("POST"));
}
}