use crate::config::{EcpdsConfig, PartialOutagePolicy};
use futures::future::join_all;
use serde::Deserialize;
use std::collections::HashSet;
use thiserror::Error;
use tracing::{debug, warn};
/// Coarse classification of an ECPDS fetch attempt.
///
/// The same type describes a single server's result (see
/// `EcpdsError::fetch_outcome`) and the merged result across all servers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchOutcome {
    /// The fetch completed without error.
    Success,
    /// The server answered with HTTP 401.
    Unauthorized,
    /// The server answered with HTTP 403.
    Forbidden,
    /// The server answered with some other 4xx status.
    ClientError,
    /// The server answered with a 5xx (or otherwise unclassified) status.
    ServerError,
    /// A response arrived but its body could not be used.
    InvalidResponse,
    /// No HTTP status was obtained at all.
    Unreachable,
}

impl FetchOutcome {
    /// Returns the fixed, lower-case string label for this outcome.
    pub fn label(&self) -> &'static str {
        match *self {
            FetchOutcome::Success => "success",
            FetchOutcome::Unauthorized => "http_401",
            FetchOutcome::Forbidden => "http_403",
            FetchOutcome::ClientError => "http_4xx",
            FetchOutcome::ServerError => "http_5xx",
            FetchOutcome::InvalidResponse => "invalid_response",
            FetchOutcome::Unreachable => "unreachable",
        }
    }

    /// Numeric severity used when two outcomes compete; a larger value wins.
    ///
    /// Every variant has a distinct value, so equal severity implies equal
    /// outcomes.
    fn severity(self) -> u8 {
        match self {
            FetchOutcome::Success => 0,
            FetchOutcome::Unreachable => 1,
            FetchOutcome::ServerError => 2,
            FetchOutcome::InvalidResponse => 3,
            FetchOutcome::ClientError => 4,
            FetchOutcome::Forbidden => 5,
            FetchOutcome::Unauthorized => 6,
        }
    }

    /// Returns whichever of `self` and `other` has the higher severity,
    /// keeping `self` when neither outranks the other.
    fn pessimistic_max(self, other: Self) -> Self {
        if self.severity() >= other.severity() {
            self
        } else {
            other
        }
    }
}
/// Why an access check was denied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DenyReason {
    /// Labelled `deny_destination`.
    DestinationNotInList,
    /// Labelled `deny_match_key_missing`.
    MatchKeyMissing,
}

impl DenyReason {
    /// Returns the fixed string label for this denial reason.
    pub fn label(&self) -> &'static str {
        match *self {
            DenyReason::DestinationNotInList => "deny_destination",
            DenyReason::MatchKeyMissing => "deny_match_key_missing",
        }
    }
}
/// Errors produced by the ECPDS client and the access check built on it.
#[derive(Debug, Error)]
pub enum EcpdsError {
/// The merged fetch across the configured servers failed; `fetch_outcome`
/// carries the classification chosen by the merge policy.
#[error("ECPDS service is inaccessible ({fetch_outcome:?})")]
ServiceUnavailable {
fetch_outcome: FetchOutcome,
},
/// Access was denied for the given `reason`.
/// NOTE(review): not constructed in the code visible here; presumably
/// raised by the caller that checks destinations. Confirm.
#[error("Access denied: {message}")]
AccessDenied {
reason: DenyReason,
message: String,
},
/// A server returned a success status but the body could not be decoded,
/// or its `success` field was not `"yes"`.
#[error("Invalid response from ECPDS server {server_index}: {message}")]
InvalidResponse {
server_index: usize,
message: String,
},
/// A configured server string failed URL parsing at client construction.
#[error("Invalid ECPDS server URL '{server}': {source}")]
InvalidServerUrl {
server: String,
#[source]
source: url::ParseError,
},
/// Building the underlying `reqwest::Client` failed.
#[error("HTTP client construction failed: {0}")]
HttpClientBuild(#[source] reqwest::Error),
/// A request to one server failed. `status` is `None` when no HTTP status
/// was obtained (request URL could not be built, or sending failed) and
/// `Some(code)` when the server answered with a non-success status.
#[error("HTTP request to ECPDS server {server_index} failed: {message}")]
Http {
server_index: usize,
status: Option<u16>,
message: String,
},
}
impl EcpdsError {
    /// Maps this error onto the coarse [`FetchOutcome`] classification.
    ///
    /// HTTP errors are classified by status code: 401 and 403 get their own
    /// outcomes, any other 4xx is a client error, and every remaining status
    /// is reported as a server error. An HTTP error without a status, and
    /// every variant that is not a fetch result, maps to `Unreachable`.
    pub fn fetch_outcome(&self) -> FetchOutcome {
        match self {
            Self::ServiceUnavailable { fetch_outcome } => *fetch_outcome,
            Self::InvalidResponse { .. } => FetchOutcome::InvalidResponse,
            Self::Http { status: None, .. } => FetchOutcome::Unreachable,
            Self::Http {
                status: Some(code), ..
            } => match *code {
                401 => FetchOutcome::Unauthorized,
                403 => FetchOutcome::Forbidden,
                400..=499 => FetchOutcome::ClientError,
                // 5xx and any other non-success status share one bucket.
                _ => FetchOutcome::ServerError,
            },
            Self::AccessDenied { .. }
            | Self::InvalidServerUrl { .. }
            | Self::HttpClientBuild(_) => FetchOutcome::Unreachable,
        }
    }

    /// Returns the denial reason when this is an `AccessDenied` error,
    /// and `None` for every other variant.
    pub fn deny_reason(&self) -> Option<DenyReason> {
        if let Self::AccessDenied { reason, .. } = self {
            Some(*reason)
        } else {
            None
        }
    }
}
/// JSON body returned by the ECPDS destination-list endpoint.
///
/// Keys are camelCase on the wire (`destinationList`, `success`). Records are
/// kept as raw JSON values because the field to extract is configurable.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EcpdsResponse {
    destination_list: Vec<serde_json::Value>,
    success: String,
}
/// Client that queries one or more ECPDS servers for a user's destinations.
///
/// Holds the shared HTTP client, the parsed server base URLs, the
/// service-account credentials sent as HTTP Basic auth, the name of the
/// record field to extract, and the policy for merging per-server results.
pub struct EcpdsClient {
    http: reqwest::Client,
    servers: Vec<reqwest::Url>,
    username: String,
    password: String,
    target_field: String,
    partial_outage_policy: PartialOutagePolicy,
}

// Written by hand instead of derived so that formatting the client with
// `{:?}` (in logs, panics or assertion messages) never prints the
// service-account password.
impl std::fmt::Debug for EcpdsClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EcpdsClient")
            .field("http", &self.http)
            .field("servers", &self.servers)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("target_field", &self.target_field)
            .field("partial_outage_policy", &self.partial_outage_policy)
            .finish()
    }
}
impl EcpdsClient {
/// Builds a client from `config`.
///
/// # Errors
///
/// Returns `EcpdsError::HttpClientBuild` when the HTTP client cannot be
/// constructed, and `EcpdsError::InvalidServerUrl` for the first configured
/// server string that does not parse as a URL.
pub fn new(config: &EcpdsConfig) -> Result<Self, EcpdsError> {
    use std::time::Duration;

    let request_timeout = Duration::from_secs(config.request_timeout_seconds);
    let connect_timeout = Duration::from_secs(config.connect_timeout_seconds);
    let http = reqwest::Client::builder()
        .timeout(request_timeout)
        .connect_timeout(connect_timeout)
        .build()
        .map_err(EcpdsError::HttpClientBuild)?;

    // Parse every server up front so a bad URL is rejected at construction
    // time rather than on the first fetch.
    let mut servers = Vec::new();
    for raw in config.servers.iter() {
        let parsed = reqwest::Url::parse(raw).map_err(|source| EcpdsError::InvalidServerUrl {
            server: raw.clone(),
            source,
        })?;
        servers.push(parsed);
    }

    Ok(Self {
        http,
        servers,
        username: config.username.clone(),
        password: config.password.clone(),
        target_field: config.target_field.clone(),
        partial_outage_policy: config.partial_outage_policy,
    })
}
/// Queries every configured server for `username` and merges the results
/// according to the configured partial-outage policy.
///
/// All servers are queried concurrently, and each per-server result is
/// logged before merging. On success, returns the set of destinations
/// together with the merged [`FetchOutcome`].
///
/// # Errors
///
/// Returns `EcpdsError::ServiceUnavailable` when no servers are configured
/// or when the merge policy rejects the combined results.
pub async fn fetch_user_destinations(
    &self,
    username: &str,
) -> Result<(HashSet<String>, FetchOutcome), EcpdsError> {
    if self.servers.is_empty() {
        return Err(EcpdsError::ServiceUnavailable {
            fetch_outcome: FetchOutcome::Unreachable,
        });
    }

    let pending = self
        .servers
        .iter()
        .enumerate()
        .map(|(index, server)| self.fetch_from_server(index, server, username));
    let per_server: Vec<Result<Vec<String>, EcpdsError>> = join_all(pending).await;

    // `join_all` keeps input order, so results line up with `self.servers`.
    for (server_index, (server, attempt)) in self.servers.iter().zip(&per_server).enumerate() {
        if let Err(error) = attempt {
            warn!(
                service_name = crate::service_name(),
                service_version = crate::service_version(),
                event_name = "auth.ecpds.fetch.failed",
                server_index,
                server = %server,
                username,
                error = %error,
                "ECPDS server fetch failed"
            );
        } else {
            debug!(
                service_name = crate::service_name(),
                service_version = crate::service_version(),
                event_name = "auth.ecpds.fetch.succeeded",
                server_index,
                server = %server,
                username,
                "ECPDS server fetch succeeded"
            );
        }
    }

    match self.partial_outage_policy {
        PartialOutagePolicy::Strict => Self::merge_strict(per_server),
        PartialOutagePolicy::AnySuccess => Self::merge_any_success(per_server),
    }
}
/// Merges per-server results under the strict policy: every server must
/// have succeeded.
///
/// Returns the union of all destinations with `FetchOutcome::Success` when
/// every result is `Ok`. If any result is an error, or `results` is empty,
/// returns `EcpdsError::ServiceUnavailable` carrying the most severe failure
/// outcome (`Unreachable` for the empty case).
fn merge_strict(
    results: Vec<Result<Vec<String>, EcpdsError>>,
) -> Result<(HashSet<String>, FetchOutcome), EcpdsError> {
    if results.is_empty() {
        return Err(EcpdsError::ServiceUnavailable {
            fetch_outcome: FetchOutcome::Unreachable,
        });
    }

    // Most severe failure across all servers, if any server failed.
    let worst_failure = results
        .iter()
        .filter_map(|attempt| attempt.as_ref().err())
        .map(|error| error.fetch_outcome())
        .reduce(|worst, next| worst.pessimistic_max(next));

    match worst_failure {
        Some(fetch_outcome) => Err(EcpdsError::ServiceUnavailable { fetch_outcome }),
        None => {
            let union: HashSet<String> = results
                .into_iter()
                .filter_map(Result::ok)
                .flatten()
                .collect();
            Ok((union, FetchOutcome::Success))
        }
    }
}
/// Merges per-server results under the any-success policy: one successful
/// server is enough.
///
/// When at least one result is `Ok`, returns the union of the successful
/// destination lists. The accompanying outcome is the most severe failure
/// among the other servers, or `FetchOutcome::Success` if none failed.
/// When no result is `Ok`, returns `EcpdsError::ServiceUnavailable` with
/// the most severe failure (`Unreachable` if `results` is empty).
fn merge_any_success(
    results: Vec<Result<Vec<String>, EcpdsError>>,
) -> Result<(HashSet<String>, FetchOutcome), EcpdsError> {
    let worst_failure = results
        .iter()
        .filter_map(|attempt| attempt.as_ref().err())
        .map(|error| error.fetch_outcome())
        .reduce(|worst, next| worst.pessimistic_max(next));

    let any_success = results.iter().any(|attempt| attempt.is_ok());
    if !any_success {
        return Err(EcpdsError::ServiceUnavailable {
            fetch_outcome: worst_failure.unwrap_or(FetchOutcome::Unreachable),
        });
    }

    let union: HashSet<String> = results
        .into_iter()
        .filter_map(Result::ok)
        .flatten()
        .collect();
    // A partial outage still reports the failure so it is not hidden
    // behind the successful servers.
    Ok((union, worst_failure.unwrap_or(FetchOutcome::Success)))
}
/// Builds the destination-list request URL for `username` on top of `base`.
///
/// Appends the path segments `ecpds/v1/destination/list` to the base path
/// (dropping an empty trailing segment first, so a base with or without a
/// trailing slash yields the same result) and adds `username` as the `id`
/// query parameter.
///
/// # Errors
///
/// Returns a message when `base` cannot carry path segments.
fn build_request_url(base: &reqwest::Url, username: &str) -> Result<reqwest::Url, String> {
    let mut request_url = base.clone();
    match request_url.path_segments_mut() {
        Ok(mut segments) => {
            segments.pop_if_empty();
            segments.extend(["ecpds", "v1", "destination", "list"]);
        }
        Err(()) => return Err(format!("server URL '{base}' cannot be a base")),
    }
    request_url.query_pairs_mut().append_pair("id", username);
    Ok(request_url)
}
/// Fetches the destination list for `username` from a single server.
///
/// Sends a GET with HTTP Basic auth using the configured credentials, then
/// keeps the configured target field of every record whose `active` field
/// is the boolean `true`. Records that are inactive, or that lack a string
/// value for the target field, are skipped and counted for logging.
///
/// # Errors
///
/// * `EcpdsError::Http` with `status: None` when the request URL cannot be
///   built or the request cannot be sent.
/// * `EcpdsError::Http` with the status code for any non-success status.
/// * `EcpdsError::InvalidResponse` when the body cannot be decoded or its
///   `success` field is not `"yes"`.
async fn fetch_from_server(
    &self,
    server_index: usize,
    server: &reqwest::Url,
    username: &str,
) -> Result<Vec<String>, EcpdsError> {
    let url = match Self::build_request_url(server, username) {
        Ok(url) => url,
        Err(message) => {
            return Err(EcpdsError::Http {
                server_index,
                status: None,
                message,
            })
        }
    };

    let sent = self
        .http
        .get(url)
        .basic_auth(&self.username, Some(&self.password))
        .send()
        .await;
    let response = match sent {
        Ok(response) => response,
        Err(e) => {
            return Err(EcpdsError::Http {
                server_index,
                status: None,
                message: e.to_string(),
            })
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(EcpdsError::Http {
            server_index,
            status: Some(status.as_u16()),
            message: format!("HTTP {status}"),
        });
    }

    let ecpds_resp = match response.json::<EcpdsResponse>().await {
        Ok(body) => body,
        Err(e) => {
            return Err(EcpdsError::InvalidResponse {
                server_index,
                message: e.to_string(),
            })
        }
    };
    // A 2xx status with success != "yes" is an upstream-reported failure,
    // not an empty allow-list.
    if ecpds_resp.success != "yes" {
        return Err(EcpdsError::InvalidResponse {
            server_index,
            message: format!(
                "ECPDS reported success={:?} (expected \"yes\"); \
                 treating as upstream failure",
                ecpds_resp.success
            ),
        });
    }

    let total = ecpds_resp.destination_list.len();
    let mut skipped_inactive = 0usize;
    let mut skipped_missing_field = 0usize;
    let mut destinations: Vec<String> = Vec::new();
    for record in ecpds_resp.destination_list {
        // Only an explicit boolean `true` counts as active; a missing or
        // non-boolean `active` field is treated as inactive.
        let is_active = record.get("active").and_then(|v| v.as_bool()) == Some(true);
        if !is_active {
            skipped_inactive += 1;
            continue;
        }
        match record.get(&self.target_field).and_then(|v| v.as_str()) {
            Some(value) => destinations.push(String::from(value)),
            None => skipped_missing_field += 1,
        }
    }

    if skipped_inactive > 0 {
        debug!(
            service_name = crate::service_name(),
            service_version = crate::service_version(),
            event_name = "auth.ecpds.fetch.skipped_inactive",
            server_index,
            server = %server,
            username,
            skipped = skipped_inactive,
            total,
            "ECPDS records with active!=true were skipped"
        );
    }
    if skipped_missing_field > 0 {
        debug!(
            service_name = crate::service_name(),
            service_version = crate::service_version(),
            event_name = "auth.ecpds.fetch.skipped_record",
            server_index,
            server = %server,
            username,
            target_field = %self.target_field,
            skipped = skipped_missing_field,
            total,
            "ECPDS records missing target_field were skipped"
        );
    }
    Ok(destinations)
}
}
// Tests for the ECPDS client. Most tests stand up a local mockito server
// that answers the destination-list endpoint and then drive
// `EcpdsClient::fetch_user_destinations` against it.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{EcpdsConfig, PartialOutagePolicy};
// Builds a test configuration for the given server URLs: fixed credentials
// (testuser/testpass), target field "name", strict partial-outage policy.
fn make_config(servers: Vec<String>) -> EcpdsConfig {
EcpdsConfig {
username: "testuser".to_string(),
password: "testpass".to_string(),
target_field: "name".to_string(),
match_key: "destination".to_string(),
cache_ttl_seconds: 300,
max_entries: 1000,
request_timeout_seconds: 30,
connect_timeout_seconds: 5,
partial_outage_policy: PartialOutagePolicy::Strict,
servers,
}
}
// Scans this file's own source text: for each of the two skip events it finds
// the first occurrence of the event-name field, looks at up to 256 bytes
// before it, and asserts that the closest preceding tracing macro is the
// debug-level one.
// NOTE(review): the window is cut with a byte-offset slice; if the start
// offset ever lands inside a multi-byte character the slice would panic.
#[test]
fn skipped_record_events_pin_debug_level_in_source() {
let src = include_str!("client.rs");
for event_name in [
"auth.ecpds.fetch.skipped_inactive",
"auth.ecpds.fetch.skipped_record",
] {
let needle = format!("event_name = \"{event_name}\"");
let event_idx = src
.find(&needle)
.unwrap_or_else(|| panic!("event {event_name:?} not found in source"));
let window_start = event_idx.saturating_sub(256);
let window = &src[window_start..event_idx];
// Pick the macro name whose last occurrence is closest to the event.
let macro_name = ["debug!", "info!", "warn!", "error!", "trace!"]
.iter()
.filter_map(|m| window.rfind(m).map(|pos| (*m, pos)))
.max_by_key(|(_, pos)| *pos)
.map(|(m, _)| m)
.unwrap_or_else(|| {
panic!("no tracing macro found in 256 bytes before {event_name:?}")
});
assert_eq!(
macro_name, "debug!",
"event {event_name:?} must use debug! (PR #86 Phase 3 contract)",
);
}
}
// Single server, two active records: both names are returned with Success.
#[tokio::test]
async fn fetch_parses_destination_names() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true},{"name":"FOO","active":true}],"success":"yes"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client.fetch_user_destinations("testuser").await.unwrap();
mock.assert_async().await;
assert!(result.contains("CIP"));
assert!(result.contains("FOO"));
assert_eq!(outcome, FetchOutcome::Success);
}
// AnySuccess policy, two healthy servers with overlapping lists: the result
// is the de-duplicated union and the outcome is Success.
#[tokio::test]
async fn any_success_policy_merges_and_deduplicates_multi_server() {
let mut server_a = mockito::Server::new_async().await;
let mut server_b = mockito::Server::new_async().await;
server_a
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true},{"name":"FOO","active":true}],"success":"yes"}"#)
.create_async()
.await;
server_b
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"FOO","active":true},{"name":"BAR","active":true}],"success":"yes"}"#)
.create_async()
.await;
let mut config = make_config(vec![server_a.url(), server_b.url()]);
config.partial_outage_policy = PartialOutagePolicy::AnySuccess;
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client.fetch_user_destinations("testuser").await.unwrap();
let mut sorted: Vec<String> = result.into_iter().collect();
sorted.sort();
assert_eq!(sorted, vec!["BAR", "CIP", "FOO"]);
assert_eq!(outcome, FetchOutcome::Success);
}
// The only configured server points at localhost port 1 (expected to refuse
// connections), so the fetch must fail with ServiceUnavailable.
#[tokio::test]
async fn fetch_returns_service_unavailable_when_all_servers_down() {
let config = make_config(vec!["http://localhost:1".to_string()]);
let client = EcpdsClient::new(&config).expect("client must build");
let result = client.fetch_user_destinations("testuser").await;
assert!(matches!(result, Err(EcpdsError::ServiceUnavailable { .. })));
}
// AnySuccess policy with one dead and one healthy server: the healthy data
// is returned, but the outcome reports the dead server as Unreachable.
#[tokio::test]
async fn any_success_policy_succeeds_when_one_server_is_down() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true}],"success":"yes"}"#)
.create_async()
.await;
let mut config = make_config(vec!["http://localhost:1".to_string(), server.url()]);
config.partial_outage_policy = PartialOutagePolicy::AnySuccess;
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client.fetch_user_destinations("testuser").await.unwrap();
assert!(result.contains("CIP"));
assert_eq!(
outcome,
FetchOutcome::Unreachable,
"partial outage must surface the worst per-server failure outcome, \
not a synthetic Success that hides the down server from metrics"
);
}
// Strict policy (the make_config default) with one dead and one healthy
// server: the whole fetch fails.
#[tokio::test]
async fn strict_policy_fails_when_one_server_is_down() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true}],"success":"yes"}"#)
.create_async()
.await;
let config = make_config(vec!["http://localhost:1".to_string(), server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("strict policy must fail when any server is unreachable");
assert!(matches!(err, EcpdsError::ServiceUnavailable { .. }));
}
// Strict policy with one unreachable server and one answering 401: the
// reported outcome is the higher-ranked Unauthorized.
#[tokio::test]
async fn strict_policy_picks_worst_failure_outcome_across_servers() {
let mut auth_server = mockito::Server::new_async().await;
auth_server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(401)
.with_body(r#"{"error":"unauthorized"}"#)
.create_async()
.await;
let config = make_config(vec!["http://127.0.0.1:1".to_string(), auth_server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(
err.fetch_outcome(),
FetchOutcome::Unauthorized,
"401 outranks Unreachable so on-call sees the credential problem, \
not the coincident dead server"
);
}
// Strict policy, two healthy servers with disjoint lists: both entries are
// present in the union.
#[tokio::test]
async fn strict_policy_unions_disjoint_responses_from_all_servers() {
let mut server_a = mockito::Server::new_async().await;
let mut server_b = mockito::Server::new_async().await;
server_a
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true}],"success":"yes"}"#)
.create_async()
.await;
server_b
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"BAR","active":true}],"success":"yes"}"#)
.create_async()
.await;
let config = make_config(vec![server_a.url(), server_b.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client.fetch_user_destinations("testuser").await.unwrap();
let mut sorted: Vec<String> = result.into_iter().collect();
sorted.sort();
assert_eq!(sorted, vec!["BAR".to_string(), "CIP".to_string()]);
assert_eq!(outcome, FetchOutcome::Success);
}
// Strict policy, two healthy servers returning the same list: duplicates
// collapse in the union.
#[tokio::test]
async fn strict_policy_unions_overlapping_responses_from_all_servers() {
let mut server_a = mockito::Server::new_async().await;
let mut server_b = mockito::Server::new_async().await;
for srv in [&mut server_a, &mut server_b] {
srv.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"CIP","active":true},{"name":"FOO","active":true}],"success":"yes"}"#)
.create_async()
.await;
}
let config = make_config(vec![server_a.url(), server_b.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client.fetch_user_destinations("testuser").await.unwrap();
let mut sorted: Vec<String> = result.into_iter().collect();
sorted.sort();
assert_eq!(sorted, vec!["CIP".to_string(), "FOO".to_string()]);
assert_eq!(outcome, FetchOutcome::Success);
}
// HTTP 401 is classified as Unauthorized with label "http_401".
#[tokio::test]
async fn fetch_classifies_http_401_as_unauthorized() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(401)
.with_body(r#"{"error":"unauthorized"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
let outcome = err.fetch_outcome();
assert_eq!(outcome, FetchOutcome::Unauthorized, "got {outcome:?}");
assert_eq!(outcome.label(), "http_401");
}
// HTTP 500 is classified as ServerError with label "http_5xx".
#[tokio::test]
async fn fetch_classifies_http_500_as_server_error() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(500)
.with_body(r#"{"error":"oops"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(err.fetch_outcome(), FetchOutcome::ServerError);
assert_eq!(err.fetch_outcome().label(), "http_5xx");
}
// HTTP 404 is classified as ClientError with label "http_4xx".
#[tokio::test]
async fn fetch_classifies_http_404_as_client_error() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(404)
.with_body(r#"{"error":"not found"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(err.fetch_outcome(), FetchOutcome::ClientError);
assert_eq!(err.fetch_outcome().label(), "http_4xx");
}
// HTTP 429 falls into the generic 4xx bucket as well.
#[tokio::test]
async fn fetch_classifies_http_429_as_client_error() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(429)
.with_body(r#"{"error":"rate limited"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(err.fetch_outcome(), FetchOutcome::ClientError);
assert_eq!(err.fetch_outcome().label(), "http_4xx");
}
// A 200 response whose body is not JSON is classified as InvalidResponse.
#[tokio::test]
async fn fetch_classifies_malformed_json_as_invalid_response() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body("not even close to valid json")
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(err.fetch_outcome(), FetchOutcome::InvalidResponse);
assert_eq!(err.fetch_outcome().label(), "invalid_response");
}
// A server that cannot be connected to is classified as Unreachable.
#[tokio::test]
async fn fetch_classifies_unreachable_as_unreachable() {
let config = make_config(vec!["http://127.0.0.1:1".to_string()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("testuser")
.await
.expect_err("must fail");
assert_eq!(err.fetch_outcome(), FetchOutcome::Unreachable);
}
// Records with active=false, or with no active field at all, are excluded
// from the returned set.
#[tokio::test]
async fn parsing_skips_inactive_records_so_inactive_destinations_deny_access() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"destinationList":[{"name":"CIP","active":true},{"name":"INACTIVE","active":false},{"name":"NO_FIELD"}],"success":"yes"}"#,
)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, _outcome) = client.fetch_user_destinations("testuser").await.unwrap();
assert!(
result.contains("CIP"),
"active CIP must be in the allow-list"
);
assert!(
!result.contains("INACTIVE"),
"active=false must be filtered out (real-ECPDS contract)"
);
assert!(
!result.contains("NO_FIELD"),
"missing active field is treated as inactive (safe default)"
);
}
// An active record lacking the target field is skipped without failing the
// fetch; the remaining records are still returned.
#[tokio::test]
async fn parsing_tolerates_records_missing_target_field() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"destinationList":[{"name":"CIP","active":true},{"id":"no-name","active":true},{"name":"FOO","active":true}],"success":"yes"}"#,
)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, _outcome) = client.fetch_user_destinations("testuser").await.unwrap();
let mut sorted: Vec<String> = result.into_iter().collect();
sorted.sort();
assert_eq!(sorted, vec!["CIP".to_string(), "FOO".to_string()]);
}
// With target_field set to "id", the id value is extracted instead of name.
#[tokio::test]
async fn fetch_uses_custom_target_field() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"id":"DEST1","name":"CIP","active":true}],"success":"yes"}"#)
.create_async()
.await;
let mut config = make_config(vec![server.url()]);
config.target_field = "id".to_string();
let client = EcpdsClient::new(&config).expect("client must build");
let (result, _outcome) = client.fetch_user_destinations("testuser").await.unwrap();
assert!(result.contains("DEST1"));
assert!(!result.contains("CIP"));
}
// Special characters in the username are percent-encoded in the query, so
// they cannot inject extra query parameters.
#[test]
fn build_request_url_percent_encodes_special_chars() {
let base = reqwest::Url::parse("http://example.com").unwrap();
let url = EcpdsClient::build_request_url(&base, "user+name with spaces&extra=injected")
.expect("URL must build");
let s = url.as_str();
assert!(s.starts_with("http://example.com/ecpds/v1/destination/list?id="));
assert!(s.contains("user%2Bname"), "got {s}");
assert!(
s.contains("with+spaces") || s.contains("with%20spaces"),
"got {s}"
);
assert!(s.contains("%26extra%3Dinjected"), "got {s}");
assert!(!s.contains("&extra=injected"), "got {s}");
}
// A base URL with a path prefix and trailing slash keeps the prefix and does
// not produce a double slash.
#[test]
fn build_request_url_handles_reverse_proxy_prefix_with_trailing_slash() {
let base = reqwest::Url::parse("https://proxy.example/ecpds-api/").unwrap();
let url = EcpdsClient::build_request_url(&base, "alice").unwrap();
assert_eq!(
url.as_str(),
"https://proxy.example/ecpds-api/ecpds/v1/destination/list?id=alice"
);
}
// The same prefix without a trailing slash yields the identical URL.
#[test]
fn build_request_url_handles_reverse_proxy_prefix_without_trailing_slash() {
let base = reqwest::Url::parse("https://proxy.example/ecpds-api").unwrap();
let url = EcpdsClient::build_request_url(&base, "alice").unwrap();
assert_eq!(
url.as_str(),
"https://proxy.example/ecpds-api/ecpds/v1/destination/list?id=alice"
);
}
// An unparseable server string is rejected by EcpdsClient::new.
#[test]
fn client_construction_rejects_invalid_server_url() {
let config = make_config(vec!["not a url".to_string()]);
let err = EcpdsClient::new(&config)
.expect_err("invalid server URL must be rejected at construction");
assert!(matches!(err, EcpdsError::InvalidServerUrl { .. }));
}
// End-to-end check that a username with special characters reaches the
// server as the decoded value of the id query parameter.
#[tokio::test]
async fn fetch_url_encodes_username_with_special_chars() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::UrlEncoded(
"id".into(),
"u+s er&name".into(),
))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"OK","active":true}],"success":"yes"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, _outcome) = client
.fetch_user_destinations("u+s er&name")
.await
.expect("should succeed");
mock.assert_async().await;
assert!(result.contains("OK"));
}
// The mock only matches when the Authorization header carries the configured
// credentials as HTTP Basic auth, and it must be hit exactly once.
#[tokio::test]
async fn fetch_sends_http_basic_auth_with_configured_credentials() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.match_header(
"authorization",
mockito::Matcher::Exact("Basic dGVzdHVzZXI6dGVzdHBhc3M=".to_string()),
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"OK","active":true}],"success":"yes"}"#)
.expect(1)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, outcome) = client
.fetch_user_destinations("alice")
.await
.expect("fetch must succeed when Basic auth header is sent");
mock.assert_async().await;
assert_eq!(outcome, FetchOutcome::Success);
assert!(
result.contains("OK"),
"mock only matches when the Authorization header is exactly \
'Basic dGVzdHVzZXI6dGVzdHBhc3M=' (base64 of 'testuser:testpass'); \
a successful response proves the service-account creds reached ECPDS"
);
}
// A 200 response whose success field is "no" is a failure (InvalidResponse),
// not an empty destination list.
#[tokio::test]
async fn fetch_treats_success_field_not_yes_as_invalid_response() {
let mut server = mockito::Server::new_async().await;
server
.mock("GET", "/ecpds/v1/destination/list")
.match_query(mockito::Matcher::Any)
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[],"success":"no"}"#)
.create_async()
.await;
let config = make_config(vec![server.url()]);
let client = EcpdsClient::new(&config).expect("client must build");
let err = client
.fetch_user_destinations("alice")
.await
.expect_err("ECPDS reporting success != yes must surface as a fetch failure");
assert_eq!(
err.fetch_outcome(),
FetchOutcome::InvalidResponse,
"success={{!=yes}} indicates an upstream-reported failure; treating it \
as a normal empty allow-list would hide the outage from \
aviso_ecpds_fetch_total"
);
}
// End-to-end check that a server URL with a path prefix is honoured: the
// request must arrive under that prefix with the expected id parameter.
#[tokio::test]
async fn fetch_works_with_reverse_proxy_prefix_server() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/some-prefix/ecpds/v1/destination/list")
.match_query(mockito::Matcher::UrlEncoded("id".into(), "alice".into()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"destinationList":[{"name":"OK","active":true}],"success":"yes"}"#)
.create_async()
.await;
let config = make_config(vec![format!("{}/some-prefix/", server.url())]);
let client = EcpdsClient::new(&config).expect("client must build");
let (result, _outcome) = client
.fetch_user_destinations("alice")
.await
.expect("should succeed");
mock.assert_async().await;
assert!(result.contains("OK"));
}
}