use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RouteRow {
pub tenant_id: String,
pub sor_name: String,
pub alias: String,
pub deployment_id: String,
pub pack_name: String,
pub pack_version: String,
pub base_path: String,
pub state_namespace: String,
pub visibility: String,
pub routable: bool,
pub traffic: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingTable {
pub schema: String,
pub routes: Vec<RouteRow>,
}
pub trait RoutingTableSource: Send + Sync {
fn fetch(
&self,
sorx_base_url: &str,
tenant: Option<&str>,
sor: Option<&str>,
) -> Result<Vec<RouteRow>, String>;
}
pub trait UpstreamRegistry: Send + Sync {
fn upstream_for(&self, deployment_id: &str) -> Option<String>;
}
#[derive(Debug, Clone, Default)]
pub struct StaticUpstreamRegistry {
map: HashMap<String, String>,
}
impl StaticUpstreamRegistry {
pub fn new(map: HashMap<String, String>) -> Self {
Self { map }
}
}
impl UpstreamRegistry for StaticUpstreamRegistry {
fn upstream_for(&self, deployment_id: &str) -> Option<String> {
self.map.get(deployment_id).cloned()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ScopeKey {
tenant: Option<String>,
sor: Option<String>,
}
struct CacheEntry {
rows: Vec<RouteRow>,
fetched_at: Instant,
}
pub struct AliasResolver {
source: Box<dyn RoutingTableSource>,
ttl: Duration,
cache: Mutex<HashMap<ScopeKey, CacheEntry>>,
}
impl AliasResolver {
pub fn new(source: Box<dyn RoutingTableSource>, ttl: Duration) -> Self {
Self {
source,
ttl,
cache: Mutex::new(HashMap::new()),
}
}
pub fn resolve(
&self,
sorx_base_url: &str,
tenant: &str,
sor: &str,
alias: &str,
) -> Option<RouteRow> {
let rows = self.rows_for_scope(sorx_base_url, tenant, sor);
select_routable(&rows, tenant, sor, alias)
}
fn rows_for_scope(&self, sorx_base_url: &str, tenant: &str, sor: &str) -> Vec<RouteRow> {
let key = ScopeKey {
tenant: Some(tenant.to_string()),
sor: Some(sor.to_string()),
};
let mut cache = match self.cache.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let fresh = cache
.get(&key)
.map(|entry| entry.fetched_at.elapsed() < self.ttl)
.unwrap_or(false);
if fresh {
if let Some(entry) = cache.get(&key) {
return entry.rows.clone();
}
}
match self.source.fetch(sorx_base_url, Some(tenant), Some(sor)) {
Ok(rows) => {
cache.insert(
key,
CacheEntry {
rows: rows.clone(),
fetched_at: Instant::now(),
},
);
rows
}
Err(err) => {
if let Some(entry) = cache.get(&key) {
tracing::warn!(
error = %err,
tenant,
sor,
"sorx routing-table refetch failed; serving stale cache"
);
entry.rows.clone()
} else {
tracing::warn!(
error = %err,
tenant,
sor,
"sorx routing-table fetch failed and no cache available"
);
Vec::new()
}
}
}
}
}
fn select_routable(rows: &[RouteRow], tenant: &str, sor: &str, alias: &str) -> Option<RouteRow> {
rows.iter()
.find(|r| r.routable && r.tenant_id == tenant && r.sor_name == sor && r.alias == alias)
.cloned()
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProxyOutcome {
Forward {
upstream: String,
rewritten_path: String,
deployment_id: String,
},
NotFound { status: u16, reason: String },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestPath {
pub tenant: String,
pub sor: String,
pub alias: String,
pub rest: String,
}
pub fn parse_request_path(path: &str) -> Option<RequestPath> {
let path = path.split('?').next().unwrap_or(path);
let trimmed = path.trim_start_matches('/');
let mut segments = trimmed.splitn(4, '/');
let tenant = segments.next().filter(|s| !s.is_empty())?;
let sor = segments.next().filter(|s| !s.is_empty())?;
let alias = segments.next().filter(|s| !s.is_empty())?;
let rest = segments.next().unwrap_or("");
Some(RequestPath {
tenant: tenant.to_string(),
sor: sor.to_string(),
alias: alias.to_string(),
rest: rest.to_string(),
})
}
fn rewrite_path(base_path: &str, rest: &str) -> String {
let base = base_path.trim_matches('/');
let rest = rest.trim_matches('/');
match (base.is_empty(), rest.is_empty()) {
(true, true) => "/".to_string(),
(true, false) => format!("/{rest}"),
(false, true) => format!("/{base}"),
(false, false) => format!("/{base}/{rest}"),
}
}
pub fn route_request(
resolver: &AliasResolver,
upstreams: &dyn UpstreamRegistry,
sorx_base_url: &str,
_method: &str,
path: &str,
_headers: &[(String, String)],
_body: &[u8],
) -> ProxyOutcome {
let parsed = match parse_request_path(path) {
Some(p) => p,
None => {
return ProxyOutcome::NotFound {
status: 400,
reason: format!(
"malformed path {path:?}: expected /{{tenant}}/{{sor}}/{{alias}}/{{rest}}"
),
};
}
};
let row = match resolver.resolve(sorx_base_url, &parsed.tenant, &parsed.sor, &parsed.alias) {
Some(row) => row,
None => {
return ProxyOutcome::NotFound {
status: 404,
reason: format!(
"no routable deployment for tenant={} sor={} alias={}",
parsed.tenant, parsed.sor, parsed.alias
),
};
}
};
let upstream = match upstreams.upstream_for(&row.deployment_id) {
Some(addr) => addr,
None => {
return ProxyOutcome::NotFound {
status: 503,
reason: format!(
"no live upstream registered for deployment_id={} \
(instance not spawned/healthy yet)",
row.deployment_id
),
};
}
};
ProxyOutcome::Forward {
upstream,
rewritten_path: rewrite_path(&row.base_path, &parsed.rest),
deployment_id: row.deployment_id,
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RouteDecision {
pub method: String,
pub path: String,
pub outcome: RouteDecisionOutcome,
}
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum RouteDecisionOutcome {
Forward {
upstream: String,
rewritten_path: String,
deployment_id: String,
},
NotFound {
status: u16,
reason: String,
},
}
pub fn describe_request(
resolver: &AliasResolver,
upstreams: &dyn UpstreamRegistry,
sorx_base_url: &str,
method: &str,
path: &str,
) -> RouteDecision {
let outcome = match route_request(resolver, upstreams, sorx_base_url, method, path, &[], &[]) {
ProxyOutcome::Forward {
upstream,
rewritten_path,
deployment_id,
} => RouteDecisionOutcome::Forward {
upstream,
rewritten_path,
deployment_id,
},
ProxyOutcome::NotFound { status, reason } => {
RouteDecisionOutcome::NotFound { status, reason }
}
};
RouteDecision {
method: method.to_string(),
path: path.to_string(),
outcome,
}
}
pub struct HttpRoutingTableSource {
client: reqwest::blocking::Client,
}
impl HttpRoutingTableSource {
pub fn new() -> Result<Self, String> {
let client = reqwest::blocking::Client::builder()
.connect_timeout(Duration::from_secs(3))
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| format!("failed to build http client: {e}"))?;
Ok(Self { client })
}
}
impl RoutingTableSource for HttpRoutingTableSource {
fn fetch(
&self,
sorx_base_url: &str,
tenant: Option<&str>,
sor: Option<&str>,
) -> Result<Vec<RouteRow>, String> {
let base = sorx_base_url.trim_end_matches('/');
let mut url = format!("{base}/v1/sorx/routing-table");
let mut params: Vec<(&str, &str)> = Vec::new();
if let Some(t) = tenant {
params.push(("tenant", t));
}
if let Some(s) = sor {
params.push(("sor", s));
}
let mut request = self.client.get(&url);
if !params.is_empty() {
request = request.query(¶ms);
}
url = format!("{url}{}", render_query_suffix(¶ms));
let response = request
.send()
.map_err(|e| format!("GET {url} failed: {e}"))?;
let status = response.status();
if !status.is_success() {
let body = response.text().unwrap_or_default();
return Err(format!("GET {url} returned {status}: {body}"));
}
let table: RoutingTable = response
.json()
.map_err(|e| format!("GET {url} returned undecodable routing-table: {e}"))?;
Ok(table.routes)
}
}
fn render_query_suffix(params: &[(&str, &str)]) -> String {
if params.is_empty() {
return String::new();
}
let joined = params
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&");
format!("?{joined}")
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
fn row(alias: &str, deployment_id: &str, routable: bool, base_path: &str) -> RouteRow {
RouteRow {
tenant_id: "acme".to_string(),
sor_name: "customer".to_string(),
alias: alias.to_string(),
deployment_id: deployment_id.to_string(),
pack_name: "pack".to_string(),
pack_version: "1.0.0".to_string(),
base_path: base_path.to_string(),
state_namespace: "ns".to_string(),
visibility: "public".to_string(),
routable,
traffic: 100,
}
}
struct FakeSource {
rows: Vec<RouteRow>,
calls: Arc<AtomicUsize>,
}
impl RoutingTableSource for FakeSource {
fn fetch(
&self,
_url: &str,
_tenant: Option<&str>,
_sor: Option<&str>,
) -> Result<Vec<RouteRow>, String> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(self.rows.clone())
}
}
struct FlakySource {
rows: Vec<RouteRow>,
calls: Arc<AtomicUsize>,
}
impl RoutingTableSource for FlakySource {
fn fetch(
&self,
_url: &str,
_tenant: Option<&str>,
_sor: Option<&str>,
) -> Result<Vec<RouteRow>, String> {
let n = self.calls.fetch_add(1, Ordering::SeqCst);
if n == 0 {
Ok(self.rows.clone())
} else {
Err("sorx unreachable".to_string())
}
}
}
fn upstreams(pairs: &[(&str, &str)]) -> StaticUpstreamRegistry {
StaticUpstreamRegistry::new(
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
)
}
#[test]
fn resolve_returns_routable_deployment() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![
row("v1", "dep-old", false, "/sor/customer"),
row("v1", "dep-new", true, "/sor/customer"),
row("v2", "dep-x", false, "/sor/customer"),
],
calls: calls.clone(),
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let resolved = resolver
.resolve("http://sorx", "acme", "customer", "v1")
.expect("v1 should resolve to the routable deployment");
assert_eq!(resolved.deployment_id, "dep-new");
assert!(resolved.routable);
assert!(
resolver
.resolve("http://sorx", "acme", "customer", "v2")
.is_none()
);
assert!(
resolver
.resolve("http://sorx", "acme", "customer", "nope")
.is_none()
);
}
#[test]
fn resolve_ttl_refetch() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls: calls.clone(),
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_millis(30));
resolver.resolve("http://sorx", "acme", "customer", "v1");
resolver.resolve("http://sorx", "acme", "customer", "v1");
assert_eq!(calls.load(Ordering::SeqCst), 1);
std::thread::sleep(Duration::from_millis(50));
resolver.resolve("http://sorx", "acme", "customer", "v1");
assert_eq!(calls.load(Ordering::SeqCst), 2);
}
#[test]
fn resolve_stale_on_error() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FlakySource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls: calls.clone(),
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_millis(10));
let first = resolver.resolve("http://sorx", "acme", "customer", "v1");
assert_eq!(first.expect("first resolve ok").deployment_id, "dep-1");
std::thread::sleep(Duration::from_millis(20));
let stale = resolver.resolve("http://sorx", "acme", "customer", "v1");
assert_eq!(stale.expect("stale resolve ok").deployment_id, "dep-1");
assert_eq!(calls.load(Ordering::SeqCst), 2);
}
#[test]
fn resolve_no_cache_and_error_returns_none() {
let calls = Arc::new(AtomicUsize::new(0));
struct AlwaysErr(Arc<AtomicUsize>);
impl RoutingTableSource for AlwaysErr {
fn fetch(
&self,
_u: &str,
_t: Option<&str>,
_s: Option<&str>,
) -> Result<Vec<RouteRow>, String> {
self.0.fetch_add(1, Ordering::SeqCst);
Err("down".to_string())
}
}
let resolver =
AliasResolver::new(Box::new(AlwaysErr(calls.clone())), Duration::from_secs(60));
assert!(
resolver
.resolve("http://sorx", "acme", "customer", "v1")
.is_none()
);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[test]
fn route_request_forwards_to_upstream() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-1", "127.0.0.1:8088")]);
let outcome = route_request(
&resolver,
®,
"http://sorx",
"GET",
"/acme/customer/v1/orders/42",
&[],
&[],
);
match outcome {
ProxyOutcome::Forward {
upstream,
rewritten_path,
deployment_id,
} => {
assert_eq!(upstream, "127.0.0.1:8088");
assert_eq!(rewritten_path, "/sor/customer/orders/42");
assert_eq!(deployment_id, "dep-1");
}
other => panic!("expected Forward, got {other:?}"),
}
}
#[test]
fn route_request_forwards_alias_root_under_base_path() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-1", "127.0.0.1:8088")]);
let outcome = route_request(
&resolver,
®,
"http://sorx",
"GET",
"/acme/customer/v1",
&[],
&[],
);
match outcome {
ProxyOutcome::Forward { rewritten_path, .. } => {
assert_eq!(rewritten_path, "/sor/customer");
}
other => panic!("expected Forward, got {other:?}"),
}
}
#[test]
fn route_request_unresolved_alias_404() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", false, "/sor/customer")], calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-1", "127.0.0.1:8088")]);
let outcome = route_request(
&resolver,
®,
"http://sorx",
"GET",
"/acme/customer/v1/orders",
&[],
&[],
);
match outcome {
ProxyOutcome::NotFound { status, .. } => assert_eq!(status, 404),
other => panic!("expected 404 NotFound, got {other:?}"),
}
}
#[test]
fn route_request_no_upstream_503() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-other", "127.0.0.1:9999")]);
let outcome = route_request(
&resolver,
®,
"http://sorx",
"GET",
"/acme/customer/v1/orders",
&[],
&[],
);
match outcome {
ProxyOutcome::NotFound { status, .. } => assert_eq!(status, 503),
other => panic!("expected 503 NotFound, got {other:?}"),
}
}
#[test]
fn route_request_rejects_malformed_path() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-1", "127.0.0.1:8088")]);
for bad in ["/", "/acme", "/acme/customer", ""] {
let outcome = route_request(&resolver, ®, "http://sorx", "GET", bad, &[], &[]);
match outcome {
ProxyOutcome::NotFound { status, .. } => {
assert_eq!(status, 400, "path {bad:?} should be 400")
}
other => panic!("expected 400 for {bad:?}, got {other:?}"),
}
}
}
#[test]
fn parse_request_path_variants() {
let p = parse_request_path("/acme/customer/v1/orders/42").unwrap();
assert_eq!(p.tenant, "acme");
assert_eq!(p.sor, "customer");
assert_eq!(p.alias, "v1");
assert_eq!(p.rest, "orders/42");
let p = parse_request_path("/acme/customer/v1").unwrap();
assert_eq!(p.rest, "");
let p = parse_request_path("/acme/customer/v1/orders?page=2").unwrap();
assert_eq!(p.rest, "orders");
assert!(parse_request_path("/acme/customer").is_none());
}
#[test]
fn rewrite_path_normalizes_slashes() {
assert_eq!(
rewrite_path("/sor/customer", "orders/42"),
"/sor/customer/orders/42"
);
assert_eq!(
rewrite_path("/sor/customer/", "/orders"),
"/sor/customer/orders"
);
assert_eq!(rewrite_path("/sor/customer", ""), "/sor/customer");
assert_eq!(rewrite_path("", "orders"), "/orders");
assert_eq!(rewrite_path("", ""), "/");
}
#[test]
fn describe_request_serializes_forward() {
let calls = Arc::new(AtomicUsize::new(0));
let source = FakeSource {
rows: vec![row("v1", "dep-1", true, "/sor/customer")],
calls,
};
let resolver = AliasResolver::new(Box::new(source), Duration::from_secs(60));
let reg = upstreams(&[("dep-1", "127.0.0.1:8088")]);
let decision = describe_request(
&resolver,
®,
"http://sorx",
"GET",
"/acme/customer/v1/orders",
);
let json = serde_json::to_value(&decision).unwrap();
assert_eq!(json["outcome"]["kind"], "forward");
assert_eq!(json["outcome"]["upstream"], "127.0.0.1:8088");
assert_eq!(json["outcome"]["rewritten_path"], "/sor/customer/orders");
}
#[test]
fn routing_table_deserializes_with_unknown_fields() {
let raw = r#"{
"schema": "sorx.routing-table.v1",
"routes": [
{
"tenant_id": "acme",
"sor_name": "customer",
"alias": "v1",
"deployment_id": "dep-1",
"pack_name": "pack",
"pack_version": "1.0.0",
"base_path": "/sor/customer",
"state_namespace": "ns",
"visibility": "public",
"routable": true,
"traffic": 100,
"future_field": "ignored"
}
]
}"#;
let table: RoutingTable = serde_json::from_str(raw).unwrap();
assert_eq!(table.routes.len(), 1);
assert_eq!(table.routes[0].deployment_id, "dep-1");
}
}