use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use futures_util::future::join_all;
use greentic_deploy_spec::{DeploymentId, Environment, MessagingEndpoint, SecretRef};
use serde_json::{Value, json};
use crate::http_routes::{HttpRouteDescriptor, INGEST_HTTP_OP, derive_provider_name};
use crate::operator_log;
use crate::revision_serve::Activation;
use crate::secrets_gate::DynSecretsManager;
use crate::webhook_secret_resolver::secret_ref_to_store_uri;
/// Name of the provider operation invoked to register a webhook.
const SETUP_WEBHOOK_OP: &str = "setup_webhook";
/// Upper bound on a single `setup_webhook` invocation; `register_one` logs a
/// warning and gives up on the call once this elapses.
const SETUP_WEBHOOK_TIMEOUT: Duration = Duration::from_secs(30);
/// Where the `secret_token` sent with a webhook registration comes from.
///
/// Chosen at planning time by `plan_webhook_registrations` and turned into a
/// concrete value by `resolve_secret_token_source`.
#[derive(Debug, Clone, PartialEq)]
enum SecretTokenSource {
/// The matched endpoint carries a `webhook_secret_ref`; the token is read
/// from the secrets manager when the registration runs.
WebhookSecretRef(SecretRef),
/// The matched endpoint has no `webhook_secret_ref`; its `provider_id` is
/// used verbatim as the token.
ProviderIdFallback(String),
/// No endpoint matched the route, so no `secret_token` is sent.
None,
}
/// One planned `setup_webhook` invocation, produced by
/// `plan_webhook_registrations` and consumed by `register_one`.
#[derive(Debug, Clone, PartialEq)]
struct WebhookRegistration {
/// Tenant looked up for the route's deployment; sent in the payload and
/// passed to the provider invocation.
tenant: String,
/// Deployment the ingest route belongs to (from the route scope).
deployment_id: DeploymentId,
/// Bundle the ingest route belongs to (from the route scope).
bundle_id: greentic_deploy_spec::BundleId,
/// Revision the ingest route belongs to (from the route scope).
revision_id: greentic_deploy_spec::RevisionId,
/// Provider type exactly as declared on the route; used to address the
/// provider when invoking `setup_webhook`.
provider_type: String,
/// Name derived from `provider_type` via `derive_provider_name`; used for
/// de-duplication, endpoint matching and log messages.
provider_name: String,
/// Public base URL (trailing `/` trimmed) followed by the route pattern.
webhook_url: String,
/// How the `secret_token` for this registration is obtained.
secret_token_source: SecretTokenSource,
/// Sent as `provider_instance_id`: the first matching endpoint's id, or the
/// deployment id when no endpoint matches.
instance_id: String,
/// Number of matching endpoints beyond the first; only the first one is
/// registered, the rest trigger a warning.
extra_endpoints: usize,
}
impl WebhookRegistration {
    /// Serializes the JSON request body for the `setup_webhook` operation.
    ///
    /// The body always carries `webhook_url`, `tenant` and
    /// `provider_instance_id`; `secret_token` is added only when
    /// `resolved_secret_token` is `Some`. If serialization fails the result is
    /// an empty byte vector.
    fn payload(&self, resolved_secret_token: Option<&str>) -> Vec<u8> {
        let mut body = json!({
            "webhook_url": self.webhook_url,
            "tenant": self.tenant,
            "provider_instance_id": self.instance_id,
        });

        // `body` is always a JSON object here, so `as_object_mut` yields the map.
        if let (Some(token), Some(fields)) = (resolved_secret_token, body.as_object_mut()) {
            fields.insert("secret_token".to_owned(), json!(token));
        }

        serde_json::to_vec(&body).unwrap_or_default()
    }
}
/// Registers a provider webhook for every provider-ingest route served by
/// `activation`.
///
/// Registration is best-effort: nothing is returned and every outcome is
/// reported through `operator_log`.
///
/// * `activation` – supplies the served HTTP routes, the secrets manager and
///   the host used to invoke providers.
/// * `environment` – supplies the messaging endpoints and the
///   deployment → tenant mapping.
/// * `public_base_url` – externally reachable base URL. When it is `None`, or
///   is blank / consists only of `/` after trimming, registration is skipped
///   because the resulting webhook URLs would have no host.
///
/// All planned registrations are driven concurrently.
pub(crate) async fn register_new_model_webhooks(
    activation: &Activation,
    environment: &Environment,
    public_base_url: Option<&str>,
) {
    let base = match public_base_url.map(str::trim) {
        None => {
            operator_log::info(
                module_path!(),
                "skipping webhook auto-registration: no configured PUBLIC_BASE_URL \
                 (register the provider webhook manually)",
            );
            return;
        }
        // A blank or slash-only base would produce host-less URLs such as
        // `/bot/webhook/telegram`; never hand those to a provider.
        Some(candidate) if candidate.trim_end_matches('/').is_empty() => {
            operator_log::warn(
                module_path!(),
                format!(
                    "skipping webhook auto-registration: PUBLIC_BASE_URL {candidate:?} is blank \
                     (register the provider webhook manually)"
                ),
            );
            return;
        }
        Some(candidate) => candidate,
    };

    let tenant_by_deployment: HashMap<DeploymentId, String> = environment
        .bundles
        .iter()
        .map(|d| {
            (
                d.deployment_id,
                d.route_binding.tenant_selector.tenant.clone(),
            )
        })
        .collect();

    let plans = plan_webhook_registrations(
        activation.routing.http_routes.routes(),
        &environment.messaging_endpoints,
        &tenant_by_deployment,
        base,
    );
    if plans.is_empty() {
        operator_log::info(
            module_path!(),
            "webhook auto-registration: no provider-ingest routes to register",
        );
        return;
    }

    let secrets = activation.host.secrets_manager();
    join_all(
        plans
            .into_iter()
            .map(|plan| register_one(activation, &secrets, plan)),
    )
    .await;
}
/// Executes a single planned webhook registration and logs its outcome.
///
/// Steps:
/// 1. warn when more endpoints matched than the single one being registered;
/// 2. resolve the secret token, skipping the registration entirely if it
///    cannot be resolved;
/// 3. invoke the provider's `setup_webhook` operation, bounded by
///    `SETUP_WEBHOOK_TIMEOUT`;
/// 4. log the result: timeout and provider-reported failure at warn level,
///    an invocation error at debug level, success at info level.
///
/// A response without a boolean `ok` field is treated as success.
async fn register_one(
    activation: &Activation,
    secrets: &DynSecretsManager,
    plan: WebhookRegistration,
) {
    if plan.extra_endpoints != 0 {
        operator_log::warn(
            module_path!(),
            format!(
                "provider={} bundle has {} additional endpoint(s) linked beyond the one \
                 auto-registered; a bot has a single webhook — register the others manually",
                plan.provider_name, plan.extra_endpoints,
            ),
        );
    }

    let ResolvedSecretToken::Ready(secret) = resolve_secret_token_source(&plan, secrets).await
    else {
        operator_log::warn(
            module_path!(),
            format!(
                "skipping webhook registration for provider={} url={} deployment={}: \
                 webhook_secret_ref could not be resolved \
                 (provider keeps existing config until next reload)",
                plan.provider_name, plan.webhook_url, plan.deployment_id,
            ),
        );
        return;
    };
    let token_value = secret.as_deref();

    let body = plan.payload(token_value);
    let call = activation.host.invoke_provider_for_revision(
        &plan.tenant,
        plan.deployment_id,
        plan.bundle_id.clone(),
        plan.revision_id,
        &plan.provider_type,
        SETUP_WEBHOOK_OP,
        body,
        None,
        None,
    );

    let output = match tokio::time::timeout(SETUP_WEBHOOK_TIMEOUT, call).await {
        Ok(Ok(output)) => output,
        Ok(Err(err)) => {
            operator_log::debug(
                module_path!(),
                format!(
                    "setup_webhook unavailable for provider={} deployment={}: {err:#}",
                    plan.provider_name, plan.deployment_id,
                ),
            );
            return;
        }
        Err(_elapsed) => {
            operator_log::warn(
                module_path!(),
                format!(
                    "webhook registration timed out after {}s: provider={} url={} \
                     (the provider may or may not have stored the webhook)",
                    SETUP_WEBHOOK_TIMEOUT.as_secs(),
                    plan.provider_name,
                    plan.webhook_url,
                ),
            );
            return;
        }
    };

    // Providers that omit `ok` are assumed to have succeeded.
    let succeeded = output.get("ok").and_then(Value::as_bool).unwrap_or(true);
    if !succeeded {
        let reason = output
            .get("error")
            .and_then(Value::as_str)
            .unwrap_or("unknown");
        operator_log::warn(
            module_path!(),
            format!(
                "webhook registration reported failure: provider={} url={} error={}",
                plan.provider_name, plan.webhook_url, reason,
            ),
        );
        return;
    }

    let token_source = match &plan.secret_token_source {
        SecretTokenSource::WebhookSecretRef(_) => "webhook_secret_ref",
        SecretTokenSource::ProviderIdFallback(_) => "provider_id",
        SecretTokenSource::None => "none",
    };
    operator_log::info(
        module_path!(),
        format!(
            "webhook registered: provider={} url={} deployment={} \
             secret_token={} token_source={token_source}",
            plan.provider_name,
            plan.webhook_url,
            plan.deployment_id,
            token_value.is_some(),
        ),
    );
}
/// Result of turning a `SecretTokenSource` into the value actually sent.
#[derive(Debug, Clone, PartialEq)]
enum ResolvedSecretToken {
/// Resolution finished: `Some(token)` to send, or `None` when the
/// registration carries no `secret_token`.
Ready(Option<String>),
/// A `webhook_secret_ref` was configured but could not be read or was not
/// valid UTF-8; the caller skips the registration.
Unresolved,
}
async fn resolve_secret_token_source(
plan: &WebhookRegistration,
secrets: &DynSecretsManager,
) -> ResolvedSecretToken {
match &plan.secret_token_source {
SecretTokenSource::None => ResolvedSecretToken::Ready(None),
SecretTokenSource::ProviderIdFallback(value) => {
ResolvedSecretToken::Ready(Some(value.clone()))
}
SecretTokenSource::WebhookSecretRef(secret_ref) => {
let uri = secret_ref_to_store_uri(secret_ref);
match secrets.read(&uri).await {
Ok(bytes) => match String::from_utf8(bytes) {
Ok(value) => ResolvedSecretToken::Ready(Some(value)),
Err(_) => {
operator_log::warn(
module_path!(),
format!(
"webhook secret at `{uri}` is not valid UTF-8; \
skipping registration for provider={} url={} \
(provider keeps existing webhook config until next reload)",
plan.provider_name, plan.webhook_url
),
);
ResolvedSecretToken::Unresolved
}
},
Err(err) => {
operator_log::warn(
module_path!(),
format!(
"could not resolve webhook secret at `{uri}`: {err}; \
skipping registration for provider={} url={} \
(provider keeps existing webhook config until next reload)",
plan.provider_name, plan.webhook_url
),
);
ResolvedSecretToken::Unresolved
}
}
}
}
}
/// Builds the list of webhook registrations implied by the served routes.
///
/// A route contributes a registration when it is a provider-ingest route
/// (`provider_op == INGEST_HTTP_OP`), has a scope and a provider type from
/// which a provider name can be derived, and its deployment has a known
/// tenant. Only the first route per `(deployment, revision, provider name)`
/// is kept.
///
/// Endpoints are matched to a route when they link the route's bundle and
/// derive the same provider name. The first match supplies the secret-token
/// source and the instance id; further matches are only counted in
/// `extra_endpoints`. With no match, no secret token is sent and the
/// deployment id is used as the instance id.
///
/// `base` has trailing `/` characters removed before the route pattern is
/// appended to form the webhook URL.
fn plan_webhook_registrations(
    routes: &[HttpRouteDescriptor],
    endpoints: &[MessagingEndpoint],
    tenant_by_deployment: &HashMap<DeploymentId, String>,
    base: &str,
) -> Vec<WebhookRegistration> {
    let origin = base.trim_end_matches('/');
    let mut already_planned: HashSet<(DeploymentId, greentic_deploy_spec::RevisionId, String)> =
        HashSet::new();

    routes
        .iter()
        .filter_map(|route| {
            if route.provider_op != INGEST_HTTP_OP {
                return None;
            }
            let scope = route.scope.as_ref()?;
            let provider_type = route.provider_type.as_deref()?;
            let provider_name = derive_provider_name(provider_type)?;

            // The key is recorded before the tenant lookup, exactly as the
            // first occurrence decides for all later duplicates.
            let dedup_key = (
                scope.deployment_id,
                scope.revision_id,
                provider_name.clone(),
            );
            if !already_planned.insert(dedup_key) {
                return None;
            }
            let tenant = tenant_by_deployment.get(&scope.deployment_id)?;

            let mut linked = endpoints.iter().filter(|candidate| {
                candidate
                    .linked_bundles
                    .iter()
                    .any(|bundle| bundle == &scope.bundle_id)
                    && derive_provider_name(&candidate.provider_type).as_deref()
                        == Some(provider_name.as_str())
            });
            let primary = linked.next();
            // Whatever is left after the first match is an "extra" endpoint.
            let extra_endpoints = linked.count();

            let (secret_token_source, instance_id) = match primary {
                Some(endpoint) => {
                    let source = match &endpoint.webhook_secret_ref {
                        Some(secret_ref) => SecretTokenSource::WebhookSecretRef(secret_ref.clone()),
                        None => SecretTokenSource::ProviderIdFallback(endpoint.provider_id.clone()),
                    };
                    (source, endpoint.endpoint_id.to_string())
                }
                None => (SecretTokenSource::None, scope.deployment_id.to_string()),
            };

            Some(WebhookRegistration {
                tenant: tenant.clone(),
                deployment_id: scope.deployment_id,
                bundle_id: scope.bundle_id.clone(),
                revision_id: scope.revision_id,
                provider_type: provider_type.to_string(),
                provider_name,
                webhook_url: format!("{origin}{}", route.pattern),
                secret_token_source,
                instance_id,
                extra_endpoints,
            })
        })
        .collect()
}
/// Builds a callback that re-registers provider webhooks for an activation.
///
/// On each call the callback reloads environment `env_id` from `store_root`,
/// resolves the public base URL from that freshly loaded environment, and
/// spawns `register_new_model_webhooks` on `rt` without waiting for it. If
/// the environment cannot be loaded, a warning is logged and nothing is
/// spawned.
pub(crate) fn post_reload_registration(
    store_root: PathBuf,
    env_id: String,
    rt: tokio::runtime::Handle,
) -> impl FnMut(&Activation) + Send + 'static {
    move |activation: &Activation| match load_environment(&store_root, &env_id) {
        Ok(env) => {
            let public_base_url = resolve_public_base_url_for_reload(&env);
            // The spawned task must own its inputs: clone the activation so it
            // can outlive this borrow.
            let activation = activation.clone();
            rt.spawn(async move {
                register_new_model_webhooks(&activation, &env, public_base_url.as_deref()).await;
            });
        }
        Err(err) => {
            operator_log::warn(
                module_path!(),
                format!(
                    "skipping webhook re-registration after reload: \
                     cannot load environment `{env_id}`: {err:#}"
                ),
            );
        }
    }
}
/// Resolves the public base URL from `env` via
/// `startup_contract::resolve_public_base_url`.
///
/// Both "not configured" and a resolution error collapse to `None`; the error
/// itself is discarded.
fn resolve_public_base_url_for_reload(env: &Environment) -> Option<String> {
    match crate::startup_contract::resolve_public_base_url(env) {
        Ok(resolved) => resolved,
        Err(_) => None,
    }
}
/// Loads environment `env_id` from the local filesystem store rooted at
/// `store_root`.
///
/// # Errors
///
/// Fails when `env_id` is rejected by `greentic_types::EnvId::new` or when
/// the store cannot load the environment; each failure is wrapped with
/// context naming the id.
fn load_environment(store_root: &Path, env_id: &str) -> Result<Environment> {
    let store = greentic_deployer::environment::LocalFsStore::new(store_root.to_owned());
    let typed_id = greentic_types::EnvId::new(env_id)
        .with_context(|| format!("invalid environment id `{env_id}`"))?;
    let loaded = greentic_deployer::environment::EnvironmentStore::load(&store, &typed_id);
    loaded.with_context(|| format!("loading environment `{env_id}`"))
}
// Unit tests for webhook planning (`plan_webhook_registrations`), payload
// construction, secret-token resolution and public-base-URL resolution.
#[cfg(test)]
mod tests {
use super::*;
use crate::http_routes::{RevisionScope, provider_descriptor_for_test};
use crate::test_fixtures::{
FakeSecrets, endpoint_typed as endpoint, env_with, telegram_endpoint_with_webhook_secret,
};
use greentic_deploy_spec::{BundleId, DeploymentId, RevisionId};
// Builds a `RevisionScope` for the given deployment, bundle name and revision.
fn scope(deployment: DeploymentId, bundle: &str, revision: RevisionId) -> RevisionScope {
RevisionScope {
deployment_id: deployment,
bundle_id: BundleId::new(bundle),
revision_id: revision,
}
}
// A base URL with a trailing slash is joined to the route pattern without a
// double slash, and the linked endpoint's provider id is carried as the
// fallback token and serialized into the payload.
#[test]
fn builds_url_from_served_pattern_and_carries_endpoint_secret_token() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "realbot-pack", rev),
)];
let endpoints = vec![endpoint("telegram", "tg-secret-token", &["realbot-pack"])];
let eid = endpoints[0].endpoint_id.to_string();
let tenants = HashMap::from([(dep, "default".to_string())]);
let plans = plan_webhook_registrations(&routes, &endpoints, &tenants, "https://host/");
assert_eq!(plans.len(), 1);
let p = &plans[0];
assert_eq!(p.webhook_url, "https://host/bot/webhook/telegram");
assert!(matches!(
&p.secret_token_source,
SecretTokenSource::ProviderIdFallback(s) if s == "tg-secret-token"
));
assert_eq!(p.tenant, "default");
assert_eq!(p.provider_type, "messaging.telegram");
let body: Value = serde_json::from_slice(&p.payload(Some("tg-secret-token"))).unwrap();
assert_eq!(body["webhook_url"], "https://host/bot/webhook/telegram");
assert_eq!(body["secret_token"], "tg-secret-token");
assert_eq!(body["tenant"], "default");
assert_eq!(body["provider_instance_id"], eid.as_str());
}
// When the only endpoint links a different bundle, the route is still
// registered, but without a secret token and with the deployment id as the
// provider instance id.
#[test]
fn registers_url_only_when_no_endpoint_links_the_bundle() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "realbot-pack", rev),
)];
let endpoints = vec![endpoint("telegram", "tg-secret-token", &["other-pack"])];
let tenants = HashMap::from([(dep, "default".to_string())]);
let plans = plan_webhook_registrations(&routes, &endpoints, &tenants, "https://host");
assert_eq!(plans.len(), 1);
assert_eq!(plans[0].secret_token_source, SecretTokenSource::None);
let body: Value = serde_json::from_slice(&plans[0].payload(None)).unwrap();
assert!(body.get("secret_token").is_none());
assert_eq!(body["provider_instance_id"], dep.to_string().as_str());
}
// Two endpoints of the same provider linked to one bundle: the first one
// supplies the token, the second is only counted in `extra_endpoints`.
#[test]
fn multiple_same_provider_endpoints_register_first_and_count_extras() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram.bot",
scope(dep, "realbot-pack", rev),
)];
let endpoints = vec![
endpoint("telegram", "tok-a", &["realbot-pack"]),
endpoint("telegram", "tok-b", &["realbot-pack"]),
];
let tenants = HashMap::from([(dep, "default".to_string())]);
let plans = plan_webhook_registrations(&routes, &endpoints, &tenants, "https://host");
assert_eq!(plans.len(), 1, "one webhook per provider route");
assert!(matches!(
&plans[0].secret_token_source,
SecretTokenSource::ProviderIdFallback(s) if s == "tok-a"
));
assert_eq!(
plans[0].extra_endpoints, 1,
"second endpoint counted as extra"
);
}
// Builds a registration with fixed placeholder fields and the given token
// source, for the resolver tests below.
fn plan_with_source(source: SecretTokenSource) -> WebhookRegistration {
WebhookRegistration {
tenant: "default".to_string(),
deployment_id: DeploymentId::new(),
bundle_id: BundleId::new("p"),
revision_id: RevisionId::new(),
provider_type: "messaging.telegram".to_string(),
provider_name: "telegram".to_string(),
webhook_url: "https://host/bot/webhook/telegram".to_string(),
secret_token_source: source,
instance_id: "instance".to_string(),
extra_endpoints: 0,
}
}
// A `WebhookSecretRef` source is read from the secrets manager under the URI
// produced by `secret_ref_to_store_uri`.
#[tokio::test]
async fn resolves_webhook_secret_ref_through_secrets_manager() {
let ref_ =
SecretRef::try_new("secret://local/default/_/messaging-abc/webhook_secret".to_string())
.unwrap();
let uri = crate::webhook_secret_resolver::secret_ref_to_store_uri(&ref_);
let secrets: DynSecretsManager = std::sync::Arc::new(FakeSecrets(HashMap::from([(
uri,
b"resolved-value".to_vec(),
)])));
let plan = plan_with_source(SecretTokenSource::WebhookSecretRef(ref_));
assert_eq!(
resolve_secret_token_source(&plan, &secrets).await,
ResolvedSecretToken::Ready(Some("resolved-value".to_string()))
);
}
// A `ProviderIdFallback` source resolves to its stored value even though the
// secrets store is empty.
#[tokio::test]
async fn resolves_provider_id_fallback_inline() {
let secrets: DynSecretsManager = std::sync::Arc::new(FakeSecrets(HashMap::new()));
let plan = plan_with_source(SecretTokenSource::ProviderIdFallback(
"legacy-tok".to_string(),
));
assert_eq!(
resolve_secret_token_source(&plan, &secrets).await,
ResolvedSecretToken::Ready(Some("legacy-tok".to_string()))
);
}
// A secret ref that is absent from the store resolves to `Unresolved`.
#[tokio::test]
async fn missing_webhook_secret_returns_unresolved_so_caller_skips_setup_webhook() {
let ref_ = SecretRef::try_new(
"secret://local/default/_/messaging-missing/webhook_secret".to_string(),
)
.unwrap();
let secrets: DynSecretsManager = std::sync::Arc::new(FakeSecrets(HashMap::new()));
let plan = plan_with_source(SecretTokenSource::WebhookSecretRef(ref_));
assert_eq!(
resolve_secret_token_source(&plan, &secrets).await,
ResolvedSecretToken::Unresolved
);
}
// A stored secret whose bytes are not valid UTF-8 resolves to `Unresolved`.
#[tokio::test]
async fn non_utf8_webhook_secret_returns_unresolved_so_caller_skips_setup_webhook() {
let ref_ =
SecretRef::try_new("secret://local/default/_/messaging-bad/webhook_secret".to_string())
.unwrap();
let uri = crate::webhook_secret_resolver::secret_ref_to_store_uri(&ref_);
let secrets: DynSecretsManager = std::sync::Arc::new(FakeSecrets(HashMap::from([(
uri,
vec![0xFF, 0xFE, 0x00], )])));
let plan = plan_with_source(SecretTokenSource::WebhookSecretRef(ref_));
assert_eq!(
resolve_secret_token_source(&plan, &secrets).await,
ResolvedSecretToken::Unresolved
);
}
// A `None` source resolves to `Ready(None)`: registration proceeds without a
// token.
#[tokio::test]
async fn no_endpoint_source_resolves_to_ready_none() {
let secrets: DynSecretsManager = std::sync::Arc::new(FakeSecrets(HashMap::new()));
let plan = plan_with_source(SecretTokenSource::None);
assert_eq!(
resolve_secret_token_source(&plan, &secrets).await,
ResolvedSecretToken::Ready(None)
);
}
// An endpoint built by `telegram_endpoint_with_webhook_secret` is planned
// with the `WebhookSecretRef` source rather than the provider-id fallback.
#[test]
fn prefers_webhook_secret_ref_over_provider_id_when_endpoint_carries_one() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "realbot-pack", rev),
)];
let endpoints = vec![telegram_endpoint_with_webhook_secret(
"tg-legacy-id",
&["realbot-pack"],
)];
let tenants = HashMap::from([(dep, "default".to_string())]);
let plans = plan_webhook_registrations(&routes, &endpoints, &tenants, "https://host");
assert_eq!(plans.len(), 1);
assert!(matches!(
&plans[0].secret_token_source,
SecretTokenSource::WebhookSecretRef(_)
));
}
// Two routes with different patterns but the same deployment, revision and
// provider collapse into a single registration.
#[test]
fn dedups_multiple_prefixes_to_one_registration_per_provider() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![
provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "p", rev),
),
provider_descriptor_for_test(
"/api/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "p", rev),
),
];
let tenants = HashMap::from([(dep, "default".to_string())]);
let plans = plan_webhook_registrations(&routes, &[], &tenants, "https://host");
assert_eq!(
plans.len(),
1,
"one webhook per (deployment, revision, provider)"
);
}
// A route whose deployment has no tenant mapping is skipped.
// NOTE(review): despite the name, only the unknown-deployment path is
// exercised here; no non-ingest route is constructed in this test.
#[test]
fn skips_non_ingest_routes_and_unknown_deployments() {
let dep = DeploymentId::new();
let rev = RevisionId::new();
let routes = vec![provider_descriptor_for_test(
"/bot/webhook/telegram",
"messaging.telegram",
scope(dep, "p", rev),
)];
let plans = plan_webhook_registrations(&routes, &[], &HashMap::new(), "https://host");
assert!(plans.is_empty());
}
// The reload resolver returns whatever `public_base_url` the environment
// passed to it carries, for two different environments in turn.
#[test]
fn resolve_public_base_url_uses_fresh_env_not_captured_boot_value() {
let mut env_old = env_with(vec![]);
env_old.host_config.public_base_url = Some("https://old.example.com".to_string());
assert_eq!(
resolve_public_base_url_for_reload(&env_old),
Some("https://old.example.com".to_string()),
);
let mut env_new = env_with(vec![]);
env_new.host_config.public_base_url = Some("https://new.example.com".to_string());
assert_eq!(
resolve_public_base_url_for_reload(&env_new),
Some("https://new.example.com".to_string()),
"must reflect the freshly-loaded env, not a stale captured value",
);
}
}