use axum::{
extract::{Path, State},
Json,
};
use serde::Serialize;
use serde_json::Value;
use crate::error::{AppError, AppResult};
use crate::handlers::internal::RequireInternalApiToken;
use crate::services::CredentialService;
use crate::state::AppState;
#[derive(Clone)]
pub struct IngressDeps {
pub state: AppState,
pub credentials: CredentialService,
}
#[derive(Debug, Serialize)]
pub struct VerifyConfig {
#[serde(rename = "type")]
pub verify_type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub header: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub secret: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub audience: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service_account: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct DispatchConfig {
pub playbook: String,
pub payload_from: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_pool: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct IngressConfig {
pub listener: String,
pub catalog_path: String,
pub source: String,
pub subscription_id: String,
pub verify: VerifyConfig,
pub dispatch: DispatchConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub directives: Option<Value>,
}
pub async fn get_ingress_config(
_auth: RequireInternalApiToken,
State(deps): State<IngressDeps>,
Path(listener): Path<String>,
) -> AppResult<Json<IngressConfig>> {
let span = tracing::info_span!("gateway.ingress.config", listener = %listener);
let _g = span.enter();
let (catalog_path, spec) = resolve_subscription_by_listener(&deps.state, &listener)
.await?
.ok_or_else(|| {
AppError::NotFound(format!("No push subscription for ingress listener '{}'", listener))
})?;
let ingress = spec
.get("ingress")
.ok_or_else(|| AppError::Validation("subscription has no 'ingress' block".into()))?;
let verify_block = ingress
.get("verify")
.ok_or_else(|| AppError::Validation("subscription ingress has no 'verify' block".into()))?;
let verify_type = verify_block
.get("type")
.and_then(|v| v.as_str())
.ok_or_else(|| AppError::Validation("ingress.verify.type missing".into()))?
.to_string();
let header = verify_block
.get("header")
.and_then(|v| v.as_str())
.map(|s| s.to_ascii_lowercase());
let audience = verify_block
.get("audience")
.and_then(|v| v.as_str())
.map(str::to_string);
let service_account = verify_block
.get("service_account")
.and_then(|v| v.as_str())
.map(str::to_string);
let secret = match verify_type.as_str() {
"hmac_sha256" | "bearer" => {
let alias = verify_block
.get("secret")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
AppError::Validation(format!(
"ingress.verify.secret (Wallet alias) is required for '{}'",
verify_type
))
})?;
Some(resolve_secret_alias(&deps.credentials, alias).await?)
}
"pubsub_oidc" => None,
other => {
return Err(AppError::Validation(format!(
"unsupported ingress.verify.type '{}'",
other
)))
}
};
let dispatch_block = spec
.get("dispatch")
.ok_or_else(|| AppError::Validation("subscription has no 'dispatch' block".into()))?;
let playbook = dispatch_block
.get("playbook")
.and_then(|v| v.as_str())
.ok_or_else(|| AppError::Validation("dispatch.playbook missing".into()))?
.to_string();
let payload_from = dispatch_block
.get("payload_from")
.and_then(|v| v.as_str())
.unwrap_or("message.json")
.to_string();
let execution_pool = dispatch_block
.get("execution_pool")
.and_then(|v| v.as_str())
.map(str::to_string);
let directives = spec
.get("headers")
.map(|h| serde_json::to_value(h).unwrap_or(Value::Null))
.filter(|v| !v.is_null());
let registered =
crate::handlers::subscription::ensure_registered(&deps.state, &catalog_path).await?;
tracing::info!(
listener = %listener,
catalog_path = %catalog_path,
subscription_id = %registered.subscription_id,
verify_type = %verify_type,
"Resolved push-ingress config for gateway"
);
let source = spec
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("webhook")
.to_string();
Ok(Json(IngressConfig {
listener,
catalog_path,
source,
subscription_id: registered.subscription_id,
verify: VerifyConfig {
verify_type,
header,
secret,
audience,
service_account,
},
dispatch: DispatchConfig {
playbook,
payload_from,
execution_pool,
},
directives,
}))
}
async fn resolve_subscription_by_listener(
state: &AppState,
listener: &str,
) -> AppResult<Option<(String, serde_yaml::Value)>> {
let rows: Vec<(String, String)> = sqlx::query_as(
r#"
SELECT DISTINCT ON (path) path, content
FROM noetl.catalog
WHERE LOWER(kind) = 'subscription'
ORDER BY path, version DESC
"#,
)
.fetch_all(state.pools.cluster())
.await?;
for (path, content) in rows {
let Ok(spec_doc) = serde_yaml::from_str::<serde_yaml::Value>(&content) else {
continue;
};
let Some(spec) = spec_doc.get("spec") else {
continue;
};
let gateway_path = spec
.get("ingress")
.and_then(|i| i.get("gateway_path"))
.and_then(|v| v.as_str());
let Some(gp) = gateway_path else { continue };
if listener_matches(gp, listener) {
return Ok(Some((path, spec.clone())));
}
}
Ok(None)
}
fn listener_matches(gateway_path: &str, listener: &str) -> bool {
let trimmed = gateway_path.trim_end_matches('/');
trimmed
.rsplit('/')
.next()
.map(|seg| seg == listener)
.unwrap_or(false)
}
async fn resolve_secret_alias(credentials: &CredentialService, alias: &str) -> AppResult<String> {
let resolved = credentials.get(alias, true, None).await?;
let data = resolved.data.ok_or_else(|| {
AppError::Internal(format!("verify secret alias '{}' resolved with no data", alias))
})?;
let value = match &data {
Value::String(s) => Some(s.clone()),
Value::Object(map) => ["secret", "token", "value", "key", "password"]
.iter()
.find_map(|k| map.get(*k).and_then(|v| v.as_str()).map(str::to_string)),
_ => None,
};
value.ok_or_else(|| {
AppError::Validation(format!(
"verify secret alias '{}' did not yield a string secret (expected a bare string or a \
'secret'/'token'/'value'/'key'/'password' field)",
alias
))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn listener_match_trailing_segment() {
assert!(listener_matches("/ingress/stripe", "stripe"));
assert!(listener_matches("/ingress/stripe/", "stripe"));
assert!(listener_matches("/stripe", "stripe"));
assert!(listener_matches("/api/hooks/billing", "billing"));
}
#[test]
fn listener_no_false_match() {
assert!(!listener_matches("/ingress/stripe", "billing"));
assert!(!listener_matches("/ingress/stripe-events", "stripe"));
}
}