use std::collections::HashMap;
use std::sync::OnceLock;
use std::time::Duration;
use crate::crypto::SealedEnvelope;
use crate::error::{AppError, AppResult};
/// Returns the process-wide [`BrokerRegistry`].
///
/// The registry is built by [`BrokerRegistry::from_env`] the first time this
/// function is called and cached in a `OnceLock` for the rest of the process
/// lifetime, so later changes to the environment are not picked up.
pub fn registry() -> &'static BrokerRegistry {
    static REGISTRY: OnceLock<BrokerRegistry> = OnceLock::new();
    let shared: &'static BrokerRegistry = REGISTRY.get_or_init(BrokerRegistry::from_env);
    shared
}
/// Lookup table from a region identifier to the broker address registered for it.
///
/// The `Default` value is an empty registry, for which every
/// `broker_for` lookup returns `None`.
#[derive(Debug, Clone, Default)]
pub struct BrokerRegistry {
// Key: region identifier as passed to `broker_for`.
// Value: broker address string (the tests in this file use base URLs such as
// `https://eu.example`). `from_env` never stores an empty value.
inner: HashMap<String, String>,
}
impl BrokerRegistry {
    /// Builds a registry from the `NOETL_SECRET_BROKER_REGISTRY` environment variable.
    ///
    /// The variable is expected to hold a JSON object whose keys and values are
    /// both strings. The result is an empty registry when the variable is unset,
    /// not valid Unicode, empty, or fails to parse (the parse failure is logged
    /// at `warn` level). Entries whose value is the empty string are discarded.
    pub fn from_env() -> Self {
        let Some(raw) = std::env::var("NOETL_SECRET_BROKER_REGISTRY")
            .ok()
            .filter(|value| !value.is_empty())
        else {
            return Self::default();
        };

        match serde_json::from_str::<HashMap<String, String>>(&raw) {
            Ok(mut entries) => {
                // An empty value means "no broker for this region": drop it so
                // `broker_for` reports `None` instead of `Some("")`.
                entries.retain(|_, url| !url.is_empty());
                Self { inner: entries }
            }
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "NOETL_SECRET_BROKER_REGISTRY: failed to parse as JSON object; treating as empty"
                );
                Self::default()
            }
        }
    }

    /// Test-only constructor that wraps `inner` as-is, without the empty-value
    /// filtering performed by [`BrokerRegistry::from_env`].
    #[cfg(test)]
    pub fn from_map(inner: HashMap<String, String>) -> Self {
        Self { inner }
    }

    /// Returns the broker address registered for `region`, or `None` when the
    /// region has no entry. The lookup is an exact string match on the key.
    pub fn broker_for(&self, region: &str) -> Option<&str> {
        let entry = self.inner.get(region)?;
        Some(entry.as_str())
    }

    /// Test-only: number of regions currently registered.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}
/// JSON request body that `BrokerClient::resolve` posts to a broker's
/// `/api/internal/cross-region/resolve` endpoint.
///
/// Fields marked `#[serde(default)]` may be omitted from incoming JSON and
/// then take their type's default (`None` / empty string).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CrossRegionResolveRequest {
// NOTE(review): presumably the alias of the secret to resolve — confirm against the broker handler.
pub alias: String,
// Worker public key, base64-encoded per the `_b64` suffix.
// NOTE(review): presumably the key the response envelope is sealed to — confirm.
pub worker_public_key_b64: String,
// Identifier of the requesting worker.
pub worker_id: String,
// Optional; absent in JSON deserializes to `None`.
#[serde(default)]
pub execution_id: Option<i64>,
// Optional; absent in JSON deserializes to `None`.
#[serde(default)]
pub parent_execution_id: Option<i64>,
// NOTE(review): presumably the region the broker is expected to own the entry for — confirm.
pub expected_entry_region: String,
// Optional; absent in JSON deserializes to the empty string.
#[serde(default)]
pub requesting_region: String,
}
/// HTTP client used to call a broker's cross-region resolve endpoint.
#[derive(Debug, Clone)]
pub struct BrokerClient {
// Underlying reqwest client; the request timeout is configured on it in `new`.
http: reqwest::Client,
// Copy of the timeout configured in `new`. Within this file it is only read
// by the `#[cfg(test)]` `timeout()` accessor, so non-test builds would
// otherwise warn about an unused field — hence the `allow`.
#[allow(dead_code)]
timeout: Duration,
}
impl BrokerClient {
    /// Builds a client whose request timeout is taken from `default_timeout_secs()`.
    ///
    /// # Errors
    ///
    /// Returns `AppError::Config` when the underlying reqwest client cannot be built.
    pub fn new() -> AppResult<Self> {
        // Resolve the timeout exactly once: reading the environment twice could
        // leave the recorded `timeout` out of sync with the value actually
        // configured on the HTTP client if the variable changes in between.
        let timeout = Duration::from_secs(default_timeout_secs());
        let http = reqwest::Client::builder()
            .timeout(timeout)
            .build()
            .map_err(|e| AppError::Config(format!("cross-region broker: build client: {e}")))?;
        Ok(Self { http, timeout })
    }

    /// Posts `body` as JSON to `{broker_url}/api/internal/cross-region/resolve`
    /// and decodes the response as a [`SealedEnvelope`].
    ///
    /// Trailing `/` characters on `broker_url` are stripped before the path is
    /// appended, so both `https://host` and `https://host/` are accepted.
    ///
    /// # Errors
    ///
    /// Every failure is reported as `AppError::CrossRegionUnreachable` carrying
    /// the original `broker_url` and a `cause` describing which step failed:
    /// - the request could not be sent or timed out;
    /// - the broker answered with a non-success status (the cause includes the
    ///   status and at most the first 200 characters of the response body);
    /// - the success response could not be decoded as a `SealedEnvelope`.
    pub async fn resolve(
        &self,
        broker_url: &str,
        body: &CrossRegionResolveRequest,
    ) -> AppResult<SealedEnvelope> {
        // All three failure paths produce the same variant; only the cause differs.
        let broker_error = |cause: String| AppError::CrossRegionUnreachable {
            broker_url: broker_url.to_string(),
            cause,
        };

        let url = format!(
            "{}/api/internal/cross-region/resolve",
            broker_url.trim_end_matches('/')
        );

        let resp = self
            .http
            .post(&url)
            .json(body)
            .send()
            .await
            .map_err(|e| broker_error(e.to_string()))?;

        let status = resp.status();
        if !status.is_success() {
            // Best effort: an unreadable body degrades to an empty snippet.
            // Truncate by characters (not bytes) so the cut never splits a
            // multi-byte UTF-8 sequence.
            let body_snippet: String = resp
                .text()
                .await
                .unwrap_or_default()
                .chars()
                .take(200)
                .collect();
            return Err(broker_error(format!(
                "broker returned HTTP {status}: {body_snippet}"
            )));
        }

        resp.json::<SealedEnvelope>()
            .await
            .map_err(|e| broker_error(format!("decode broker response: {e}")))
    }

    /// Test-only accessor for the timeout recorded by [`BrokerClient::new`].
    #[cfg(test)]
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
}
/// Returns the broker request timeout in seconds.
///
/// Reads `NOETL_SECRET_BROKER_TIMEOUT_SECS` and parses it as a `u64`, ignoring
/// surrounding whitespace (values loaded from files or `.env` entries often
/// carry a trailing newline). Falls back to 10 seconds when the variable is
/// unset, not valid Unicode, not a non-negative integer, or zero.
fn default_timeout_secs() -> u64 {
    const FALLBACK_SECS: u64 = 10;

    std::env::var("NOETL_SECRET_BROKER_TIMEOUT_SECS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // A zero timeout would make every request fail immediately; treat it as unset.
        .filter(|&secs| secs > 0)
        .unwrap_or(FALLBACK_SECS)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::{OsStr, OsString};
    use std::sync::{Mutex, MutexGuard};

    /// Environment variable read by `BrokerRegistry::from_env`.
    const REGISTRY_ENV: &str = "NOETL_SECRET_BROKER_REGISTRY";

    /// Serializes every test that touches `REGISTRY_ENV`. The test harness runs
    /// tests on multiple threads, and the environment is process-global, so
    /// without this lock one test can observe another test's value.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Sets or removes `REGISTRY_ENV`.
    fn write_registry_env(value: Option<&OsStr>) {
        // SAFETY: callers hold `ENV_LOCK`, so no other test in this module
        // mutates the variable concurrently. This assumes no non-Rust code
        // reads the process environment at the same time.
        match value {
            Some(v) => unsafe { std::env::set_var(REGISTRY_ENV, v) },
            None => unsafe { std::env::remove_var(REGISTRY_ENV) },
        }
    }

    /// RAII guard: holds `ENV_LOCK`, overrides `REGISTRY_ENV`, and restores the
    /// previous value on drop — including when an assertion panics.
    struct RegistryEnvGuard {
        saved: Option<OsString>,
        // Declared last so the lock is released only after `Drop::drop` has
        // restored the variable.
        _lock: MutexGuard<'static, ()>,
    }

    impl RegistryEnvGuard {
        /// `Some(v)` sets the variable to `v`; `None` removes it.
        fn apply(value: Option<&str>) -> Self {
            // A failed assertion in another test poisons the mutex. The guarded
            // data is `()`, so recovering the guard is always sound.
            let lock = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let saved = std::env::var_os(REGISTRY_ENV);
            write_registry_env(value.map(OsStr::new));
            Self { saved, _lock: lock }
        }
    }

    impl Drop for RegistryEnvGuard {
        fn drop(&mut self) {
            write_registry_env(self.saved.as_deref());
        }
    }

    #[test]
    fn registry_default_is_empty() {
        let r = BrokerRegistry::default();
        assert_eq!(r.len(), 0);
        assert!(r.broker_for("eu-central-1").is_none());
    }

    #[test]
    fn registry_from_map_lookup() {
        let mut m = HashMap::new();
        m.insert("eu-central-1".to_string(), "https://eu.example".to_string());
        m.insert("ap-south-1".to_string(), "https://ap.example".to_string());
        let r = BrokerRegistry::from_map(m);
        assert_eq!(r.len(), 2);
        assert_eq!(r.broker_for("eu-central-1"), Some("https://eu.example"));
        assert_eq!(r.broker_for("ap-south-1"), Some("https://ap.example"));
        assert!(r.broker_for("us-east-1").is_none());
    }

    #[test]
    fn registry_from_env_parses_json() {
        let _env = RegistryEnvGuard::apply(Some(
            r#"{"eu":"https://eu.example","ap":"https://ap.example"}"#,
        ));
        let r = BrokerRegistry::from_env();
        assert_eq!(r.len(), 2);
        assert_eq!(r.broker_for("eu"), Some("https://eu.example"));
    }

    #[test]
    fn registry_from_env_empty_when_unset() {
        let _env = RegistryEnvGuard::apply(None);
        let r = BrokerRegistry::from_env();
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn registry_from_env_empty_when_invalid_json() {
        let _env = RegistryEnvGuard::apply(Some("not-json"));
        let r = BrokerRegistry::from_env();
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn registry_drops_empty_values() {
        let _env = RegistryEnvGuard::apply(Some(r#"{"eu":"","ap":"https://ap.example"}"#));
        let r = BrokerRegistry::from_env();
        assert_eq!(r.len(), 1);
        assert!(r.broker_for("eu").is_none());
        assert_eq!(r.broker_for("ap"), Some("https://ap.example"));
    }

    #[test]
    fn broker_client_builds() {
        let c = BrokerClient::new().expect("client builds");
        // `default_timeout_secs` never yields zero, so the recorded timeout is
        // always at least one second.
        assert!(c.timeout() >= Duration::from_secs(1));
    }

    /// Pins the URL-joining expression used by `BrokerClient::resolve`.
    /// NOTE(review): this re-implements the `format!` rather than calling
    /// production code, so it will not catch a change made only in `resolve`.
    #[test]
    fn broker_url_trailing_slash_handled() {
        let raw = "https://eu.example/";
        let url = format!(
            "{}/api/internal/cross-region/resolve",
            raw.trim_end_matches('/')
        );
        assert_eq!(url, "https://eu.example/api/internal/cross-region/resolve");
    }
}