use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
};
use serde::Deserialize;
use crate::crypto::{SealedEnvelope, sealed_seal};
use crate::db::models::{CredentialCreateRequest, CredentialListResponse, CredentialResponse};
use crate::error::{AppError, AppResult};
use crate::services::{CredentialService, RuntimeService};
#[derive(Debug, Deserialize, Default)]
pub struct ListCredentialsQuery {
#[serde(rename = "type")]
pub credential_type: Option<String>,
pub q: Option<String>,
}
#[derive(Debug, Deserialize, Default)]
pub struct GetCredentialQuery {
#[serde(default)]
pub include_data: bool,
pub execution_id: Option<i64>,
pub parent_execution_id: Option<i64>,
}
pub async fn create_or_update(
service: State<CredentialService>,
request: Json<CredentialCreateRequest>,
) -> AppResult<(StatusCode, Json<CredentialResponse>)> {
let started_at = std::time::Instant::now();
let result = create_or_update_inner(service, request).await;
let status_label = if result.is_ok() { "ok" } else { "error" };
crate::metrics::record_write_request(
crate::metrics::endpoint::CREDENTIALS_UPSERT,
status_label,
started_at.elapsed().as_secs_f64(),
);
result
}
async fn create_or_update_inner(
State(service): State<CredentialService>,
Json(request): Json<CredentialCreateRequest>,
) -> AppResult<(StatusCode, Json<CredentialResponse>)> {
let response = service.create_or_update(request).await?;
Ok((StatusCode::OK, Json(response)))
}
pub async fn list(
State(service): State<CredentialService>,
Query(query): Query<ListCredentialsQuery>,
) -> AppResult<Json<CredentialListResponse>> {
let response = service
.list(query.credential_type.as_deref(), query.q.as_deref())
.await?;
Ok(Json(response))
}
pub async fn get(
State(service): State<CredentialService>,
Path(identifier): Path<String>,
Query(query): Query<GetCredentialQuery>,
) -> AppResult<Json<CredentialResponse>> {
let response = service
.get(&identifier, query.include_data, query.execution_id)
.await?;
Ok(Json(response))
}
pub async fn delete(
State(service): State<CredentialService>,
Path(identifier): Path<String>,
) -> AppResult<Json<serde_json::Value>> {
let id = service.delete(&identifier).await?;
Ok(Json(serde_json::json!({
"message": "Credential deleted successfully",
"id": id
})))
}
#[derive(Clone)]
pub struct SealedCredentialDeps {
pub credentials: CredentialService,
pub runtime: RuntimeService,
}
#[derive(Debug, Deserialize, Default)]
pub struct GetSealedCredentialQuery {
pub worker_id: String,
pub execution_id: Option<i64>,
pub parent_execution_id: Option<i64>,
}
pub async fn get_sealed(
State(deps): State<SealedCredentialDeps>,
Path(identifier): Path<String>,
Query(query): Query<GetSealedCredentialQuery>,
) -> AppResult<Json<SealedEnvelope>> {
let span = tracing::info_span!(
"credential.seal",
worker_id = %query.worker_id,
identifier = %identifier,
execution_id = query.execution_id,
);
let _guard = span.enter();
let pubkey_bytes = match deps.runtime.get_worker_public_key(&query.worker_id).await {
Ok(Some(b)) => b,
Ok(None) => {
crate::metrics::record_credential_seal("no_pubkey");
return Err(AppError::BadRequest(format!(
"worker '{}' did not register a sealing pubkey (worker_public_key \
missing from the noetl.runtime row, or the worker_pool row \
doesn't exist)",
query.worker_id
)));
}
Err(e) => {
crate::metrics::record_credential_seal("worker_not_found");
return Err(e);
}
};
let pubkey = x25519_dalek::PublicKey::from(pubkey_bytes);
let credential = match deps
.credentials
.get(&identifier, true, query.execution_id)
.await
{
Ok(c) => c,
Err(AppError::ResidencyViolation {
credential,
entry_region,
server_region,
}) => {
let registry = crate::secrets::broker::registry();
let Some(broker_url) = registry.broker_for(&entry_region) else {
crate::metrics::record_credential_seal("residency_violation");
return Err(AppError::ResidencyViolation {
credential,
entry_region,
server_region,
});
};
tracing::info!(
worker_id = %query.worker_id,
credential = %credential,
entry_region = %entry_region,
broker_url = %broker_url,
"credential.cross_region.fallback"
);
let started = std::time::Instant::now();
let client = match crate::secrets::broker::BrokerClient::new() {
Ok(c) => c,
Err(e) => {
crate::metrics::record_cross_region_broker_call(&entry_region, "unreachable");
return Err(e);
}
};
let body = crate::secrets::broker::CrossRegionResolveRequest {
alias: identifier.clone(),
worker_public_key_b64: {
use base64::Engine as _;
base64::engine::general_purpose::STANDARD.encode(pubkey_bytes)
},
worker_id: query.worker_id.clone(),
execution_id: query.execution_id,
parent_execution_id: query.parent_execution_id,
expected_entry_region: entry_region.clone(),
requesting_region: server_region.clone(),
};
let envelope = match client.resolve(broker_url, &body).await {
Ok(e) => e,
Err(e) => {
crate::metrics::record_cross_region_broker_call_duration(
&entry_region,
started.elapsed().as_secs_f64(),
);
crate::metrics::record_cross_region_broker_call(&entry_region, "unreachable");
return Err(e);
}
};
crate::metrics::record_cross_region_broker_call_duration(
&entry_region,
started.elapsed().as_secs_f64(),
);
crate::metrics::record_cross_region_broker_call(&entry_region, "ok");
crate::metrics::record_credential_seal("ok_via_broker");
return Ok(Json(envelope));
}
Err(e) => {
crate::metrics::record_credential_seal("credential_error");
return Err(e);
}
};
let plaintext = serde_json::to_vec(&credential).map_err(|e| {
crate::metrics::record_credential_seal("seal_error");
AppError::Internal(format!("sealed get: serialize credential: {e}"))
})?;
let envelope = sealed_seal(&pubkey, &plaintext).inspect_err(|_| {
crate::metrics::record_credential_seal("seal_error");
})?;
crate::metrics::record_credential_seal("ok");
Ok(Json(envelope))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::sealed_open;
use x25519_dalek::{PublicKey, StaticSecret};
#[test]
fn sealed_credential_round_trips_via_primitives() {
let recipient_sk = StaticSecret::random_from_rng(rand_core::OsRng);
let recipient_pk = PublicKey::from(&recipient_sk);
let credential = serde_json::json!({
"id": "1234567890",
"name": "duffel-token",
"type": "bearer",
"data": { "token": "sk-test-AbCdEf123" }
});
let plaintext = serde_json::to_vec(&credential).unwrap();
let envelope = sealed_seal(&recipient_pk, &plaintext).unwrap();
let opened = sealed_open(&recipient_sk, &envelope).unwrap();
let opened_json: serde_json::Value = serde_json::from_slice(&opened).unwrap();
assert_eq!(opened_json, credential);
}
#[test]
fn tampered_sealed_credential_is_rejected() {
use base64::{Engine as _, engine::general_purpose::STANDARD as B64};
let recipient_sk = StaticSecret::random_from_rng(rand_core::OsRng);
let recipient_pk = PublicKey::from(&recipient_sk);
let plaintext = br#"{"id":"1","name":"x","type":"bearer","data":{"token":"t"}}"#;
let mut envelope = sealed_seal(&recipient_pk, plaintext).unwrap();
let mut ct = B64.decode(&envelope.ciphertext).unwrap();
ct[0] ^= 0x01;
envelope.ciphertext = B64.encode(&ct);
let err = sealed_open(&recipient_sk, &envelope).unwrap_err();
assert!(format!("{err:?}").contains("AEAD verify/decrypt"));
}
}