use std::{
collections::{BTreeSet, HashMap},
str::FromStr,
sync::Arc,
};
use crossbeam_skiplist::SkipMap;
use enum_map::EnumMap;
use junction_api::{backend::BackendId, http::Route, Hostname};
use petgraph::{
graph::{DiGraph, NodeIndex},
visit::{EdgeRef, Visitable},
Direction,
};
use tokio::sync::Notify;
use xds_api::pb::envoy::config::{
cluster::v3 as xds_cluster, endpoint::v3 as xds_endpoint, listener::v3 as xds_listener,
route::v3 as xds_route,
};
use xds_api::pb::google::protobuf;
use crate::{endpoints::EndpointGroup, BackendLb};
use super::{
resources::{
ApiListener, ApiListenerData, Cluster, LoadAssignment, ResourceError, RouteConfig,
RouteConfigData,
},
ConfigCache, DnsUpdates, ResourceType, ResourceVec, ResourceVersion, XdsConfig,
};
/// A concurrent, name-keyed store of cache entries for a single resource type.
///
/// Every method that inserts into or removes from `map` also calls
/// `notify_waiters` on `changed`, which wakes tasks parked in `get_await`.
#[derive(Debug)]
struct ResourceMap<T> {
/// Notified after every insert into or removal from `map`.
changed: Arc<Notify>,
/// Cache entries keyed by resource name.
map: SkipMap<String, CacheEntry<T>>,
}
impl<T> Default for ResourceMap<T> {
fn default() -> Self {
Self {
changed: Arc::new(Notify::new()),
map: Default::default(),
}
}
}
impl<T: Send + 'static> ResourceMap<T> {
#[cfg(test)]
fn is_empty(&self) -> bool {
self.map.is_empty()
}
fn get<'a>(&'a self, name: &str) -> Option<ResourceEntry<'a, T>> {
self.map.get(name).map(ResourceEntry)
}
async fn get_await<'a>(&'a self, name: &str) -> Option<ResourceEntry<'a, T>> {
if let Some(entry) = self.map.get(name).map(ResourceEntry) {
return Some(entry);
}
let changed = self.changed.notified();
tokio::pin!(changed);
loop {
if let Some(entry) = self.map.get(name).map(ResourceEntry) {
return Some(entry);
}
changed.as_mut().await;
changed.set(self.changed.notified());
}
}
fn iter(&self) -> impl Iterator<Item = ResourceEntry<T>> + '_ {
self.map.iter().map(ResourceEntry)
}
fn has_data(&self, k: &str) -> bool {
match self.get(k) {
None => false,
Some(entry) => entry.data().is_some(),
}
}
fn versions(&self) -> HashMap<String, String> {
let mut versions = HashMap::new();
for entry in self.iter() {
if entry.data().is_none() {
continue;
};
let Some(version) = entry.version() else {
continue;
};
let name = entry.name().to_string();
let version = version.to_string();
versions.insert(name, version);
}
versions
}
fn remove(&self, name: &str) -> Option<ResourceEntry<T>> {
let entry = self.map.remove(name);
self.changed.notify_waiters();
entry.map(ResourceEntry)
}
fn remove_all<I>(&self, names: I)
where
I: IntoIterator<Item: AsRef<str>>,
{
for name in names {
self.remove(name.as_ref());
}
}
}
impl<X, T> ResourceMap<T>
where
T: CacheEntryData<Xds = X> + Clone + Send + 'static,
X: PartialEq + prost::Name,
{
fn insert_ok(&self, name: String, version: ResourceVersion, t: T) {
self.map.insert(
name,
CacheEntry {
version: Some(version),
last_error: None,
data: Some(t),
},
);
self.changed.notify_waiters();
}
fn insert_tombstone(&self, name: String) {
self.map.insert(
name,
CacheEntry {
version: None,
last_error: None,
data: None,
},
);
self.changed.notify_waiters();
}
fn insert_error<E: Into<ResourceError>>(
&self,
name: String,
version: ResourceVersion,
error: E,
) {
match self.map.get(&name) {
Some(entry) => {
let mut updated_entry = entry.value().clone();
updated_entry.last_error = Some((version, error.into()));
self.map.insert(name, updated_entry);
self.changed.notify_waiters();
}
None => {
self.map.insert(
name,
CacheEntry {
version: None,
last_error: Some((version, error.into())),
data: None,
},
);
self.changed.notify_waiters();
}
}
}
fn is_changed(&self, name: &str, t: &X) -> bool {
let Some(entry) = self.map.get(name) else {
return true;
};
let Some(entry_data) = &entry.value().data else {
return true;
};
entry_data.xds() != t
}
}
/// A borrowed handle to one named entry of a `ResourceMap`.
struct ResourceEntry<'a, T>(crossbeam_skiplist::map::Entry<'a, String, CacheEntry<T>>);

impl<T> ResourceEntry<'_, T> {
    /// The cache entry this handle points at.
    fn entry(&self) -> &CacheEntry<T> {
        self.0.value()
    }

    /// The resource name this entry is stored under.
    fn name(&self) -> &str {
        self.0.key().as_str()
    }

    /// The version of the stored data, if a version was recorded.
    fn version(&self) -> Option<&ResourceVersion> {
        self.entry().version.as_ref()
    }

    /// The most recent error recorded for this resource and the version it was seen at.
    fn last_error(&self) -> Option<&(ResourceVersion, ResourceError)> {
        self.entry().last_error.as_ref()
    }

    /// The stored resource, if the entry has data.
    fn data(&self) -> Option<&T> {
        self.entry().data.as_ref()
    }
}
/// The value stored for each resource name in a `ResourceMap`.
#[derive(Clone, Debug)]
struct CacheEntry<T> {
    /// Version of `data`; `None` when no valid resource has been stored.
    version: Option<ResourceVersion>,
    /// The last error recorded for this resource, with the version it was seen at.
    last_error: Option<(ResourceVersion, ResourceError)>,
    /// The stored resource; `None` for tombstones and error-only entries.
    data: Option<T>,
}

// Written by hand so that `T` is not required to implement `Default`.
impl<T> Default for CacheEntry<T> {
    /// An entry with no version, no error and no data.
    fn default() -> Self {
        let (version, last_error, data) = (None, None, None);
        Self {
            version,
            last_error,
            data,
        }
    }
}

// Intentionally empty in the original source; kept as-is.
impl<T: CacheEntryData> CacheEntry<T> {}
/// Implemented by cached resource types to expose the xDS message they hold.
trait CacheEntryData {
/// The xDS message type returned by `xds`.
type Xds;
/// Returns a reference to the xDS message held by this resource.
fn xds(&self) -> &Self::Xds;
}
/// Implements `CacheEntryData` for a resource type by returning its `xds` field.
macro_rules! impl_cache_entry {
    ($resource:ty, $message:ty) => {
        impl CacheEntryData for $resource {
            type Xds = $message;

            fn xds(&self) -> &Self::Xds {
                &self.xds
            }
        }
    };
}

impl_cache_entry!(ApiListener, xds_listener::Listener);
impl_cache_entry!(RouteConfig, xds_route::RouteConfiguration);
impl_cache_entry!(Cluster, xds_cluster::Cluster);
impl_cache_entry!(LoadAssignment, xds_endpoint::ClusterLoadAssignment);
/// The cached resources, one `ResourceMap` per xDS resource type.
///
/// Held behind an `Arc` by both `Cache` and `CacheReader`.
#[derive(Debug, Default)]
struct CacheData {
/// Cached Listener resources.
listeners: ResourceMap<ApiListener>,
/// Cached RouteConfiguration resources.
route_configs: ResourceMap<RouteConfig>,
/// Cached Cluster resources.
clusters: ResourceMap<Cluster>,
/// Cached ClusterLoadAssignment resources.
load_assignments: ResourceMap<LoadAssignment>,
}
/// The resource names added to and removed from the subscription set for one
/// resource type.
#[derive(Debug, Default, PartialEq, Eq)]
pub(crate) struct Changes {
    /// Names that were newly subscribed to.
    pub(crate) added: BTreeSet<String>,
    /// Names whose subscriptions were dropped.
    pub(crate) removed: BTreeSet<String>,
}

impl Changes {
    /// Returns `true` when nothing was added and nothing was removed.
    pub(crate) fn is_empty(&self) -> bool {
        [&self.added, &self.removed]
            .iter()
            .all(|names| names.is_empty())
    }
}
/// Tracks which resources are subscribed to and how they reference each other.
#[derive(Debug)]
struct Subscriptions {
/// One node per tracked resource; `add_ref` adds an edge from a resource to
/// a resource it references.
subs: DiGraph<SubscriptionInfo, ()>,
/// Names added and removed per resource type since the changes were last taken.
changes: EnumMap<ResourceType, Changes>,
/// Per resource type: when `true`, `find_subcribed` creates a node for any
/// name it is asked about.
wildcard: EnumMap<ResourceType, bool>,
}
impl Default for Subscriptions {
fn default() -> Self {
let mut wildcard = EnumMap::default();
for rtype in ResourceType::all() {
wildcard[*rtype] = rtype.supports_wildcard()
}
Self {
subs: Default::default(),
changes: Default::default(),
wildcard,
}
}
}
/// The node weight stored in the subscription graph for one resource.
#[derive(Debug)]
struct SubscriptionInfo {
/// The xDS resource type of this subscription.
resource_type: ResourceType,
/// The resource name.
name: String,
/// Set by `subscribe` and cleared by `unsubscribe`; explicit nodes are GC roots.
explicit: bool,
/// Set when the node was created through a wildcard lookup; wildcard nodes
/// are GC roots and are not reported by `explicit()`.
wildcard: bool,
}
impl Subscriptions {
    /// Names of every tracked `rtype` subscription that was not created through
    /// a wildcard lookup. This includes both explicitly subscribed names and
    /// names that are only referenced by other resources.
    fn explicit(&self, rtype: ResourceType) -> impl Iterator<Item = &str> + '_ {
        self.subs
            .node_weights()
            .filter(move |w| w.resource_type == rtype && !w.wildcard)
            .map(|w| &w.name[..])
    }

    /// Marks `name` as explicitly subscribed, creating its node if needed.
    fn subscribe(&mut self, rtype: ResourceType, name: &str) {
        let sub = self.find_or_create(rtype, name, false);
        self.subs[sub].explicit = true;
    }

    /// Clears the explicit flag on `name`, if it is tracked. The node itself is
    /// only removed by a later `collect`, and only if nothing else reaches it.
    fn unsubscribe(&mut self, rtype: ResourceType, name: &str) {
        if let Some(sub) = self.find(rtype, name) {
            self.subs[sub].explicit = false;
        }
    }

    /// Drops every reference held by `name` to other resources. The node for
    /// `name` itself is kept.
    fn remove(&mut self, rtype: ResourceType, name: &str) {
        if let Some(sub) = self.find_subcribed(rtype, name) {
            self.reset_refs(sub);
        }
    }

    /// Forgets any pending added/removed record for `name`.
    #[inline]
    fn clear_changes(&mut self, rtype: ResourceType, name: &str) {
        self.changes[rtype].added.remove(name);
        self.changes[rtype].removed.remove(name);
    }

    /// Removes the node `sub` from the graph and records its name as removed.
    ///
    /// # Panics
    ///
    /// Panics if `sub` is not a node of the graph.
    fn remove_sub(&mut self, sub: NodeIndex) {
        let sub = self.subs.remove_node(sub).unwrap();
        self.changes[sub.resource_type].removed.insert(sub.name);
    }

    /// Returns the node for `name` if updates for it should be accepted.
    ///
    /// In wildcard mode every name is accepted and a wildcard node is created
    /// on demand; otherwise only already-tracked names are returned.
    fn find_subcribed(&mut self, rtype: ResourceType, name: &str) -> Option<NodeIndex> {
        if self.wildcard[rtype] {
            let sub = self.find_or_create(rtype, name, true);
            return Some(sub);
        }
        self.find(rtype, name)
    }

    /// Linear scan for the node with the given type and name.
    fn find(&self, rtype: ResourceType, name: &str) -> Option<NodeIndex> {
        self.subs.node_indices().find(|idx| {
            let sub = &self.subs[*idx];
            sub.resource_type == rtype && sub.name == name
        })
    }

    /// Returns the existing node for `name` or creates one.
    ///
    /// A newly created non-wildcard node is recorded in `changes.added`;
    /// wildcard nodes are not. `wildcard` is ignored when the node already exists.
    fn find_or_create(&mut self, rtype: ResourceType, name: &str, wildcard: bool) -> NodeIndex {
        match self.find(rtype, name) {
            Some(idx) => idx,
            None => {
                let idx = self.subs.add_node(SubscriptionInfo {
                    name: name.to_string(),
                    resource_type: rtype,
                    explicit: false,
                    wildcard,
                });
                if !wildcard {
                    self.changes[rtype].added.insert(name.to_string());
                }
                idx
            }
        }
    }

    /// Removes every outgoing edge of `sub`.
    fn reset_refs(&mut self, sub: NodeIndex) {
        // `Graph::remove_edge` moves the last edge of the graph into the freed
        // slot, so edge ids gathered up front can go stale after the first
        // removal and leave an outgoing edge behind. Look up the next outgoing
        // edge again after every removal instead.
        loop {
            let next = self
                .subs
                .edges_directed(sub, Direction::Outgoing)
                .next()
                .map(|edge_ref| edge_ref.id());
            let Some(edge) = next else { break };
            self.subs.remove_edge(edge);
        }
    }

    /// Adds a reference from `from_sub` to the resource `name`, creating the
    /// target node if needed.
    fn add_ref(&mut self, from_sub: NodeIndex, rtype: ResourceType, name: &str) {
        let to_sub = self.find_or_create(rtype, name, false);
        self.subs.add_edge(from_sub, to_sub, ());
    }

    /// Garbage-collects subscriptions: every node not reachable from a GC root
    /// is removed and recorded in `changes.removed`.
    fn collect(&mut self) {
        use petgraph::visit::{Control, DfsEvent};

        // Mark phase: walk the graph from the roots.
        let mut reachable = self.subs.visit_map();
        petgraph::visit::depth_first_search(&self.subs, self.gc_roots(), |event| -> Control<()> {
            if let DfsEvent::Discover(n, _) = event {
                if reachable.contains(n.index()) {
                    return Control::Prune;
                }
                reachable.insert(n.index());
            };
            Control::Continue
        });

        // Sweep phase. Walk indices from highest to lowest: removing a node
        // moves the last node into its slot, and that node has already been
        // visited, so the `reachable` bits of the remaining indices stay valid.
        for idx in self.subs.node_indices().rev() {
            if reachable.contains(idx.index()) {
                continue;
            }
            self.remove_sub(idx);
        }
    }

    /// A node is a GC root when it is explicitly subscribed or came from a
    /// wildcard lookup.
    fn is_gc_root(&self, node: NodeIndex) -> bool {
        let sub_data = &self.subs[node];
        sub_data.explicit || sub_data.wildcard
    }

    /// All current GC roots.
    fn gc_roots(&self) -> Vec<NodeIndex> {
        self.subs
            .node_indices()
            .filter(|idx| self.is_gc_root(*idx))
            .collect()
    }
}
/// The writable side of the xDS cache: subscription tracking plus the shared
/// resource data that `CacheReader`s read from.
#[derive(Debug, Default)]
pub(super) struct Cache {
/// Subscription graph and pending subscription changes.
subs: Subscriptions,
/// Cached resources, shared with every `CacheReader` handed out by `reader`.
data: Arc<CacheData>,
/// DNS updates accumulated since the last `collect`.
dns: DnsUpdates,
}
impl Cache {
    /// Returns a read handle that shares this cache's data.
    pub(super) fn reader(&self) -> CacheReader {
        CacheReader {
            data: self.data.clone(),
        }
    }

    /// Overrides wildcard mode for `rtype`. Ignored for resource types that do
    /// not support wildcard subscriptions.
    #[cfg(test)]
    pub(crate) fn set_wildcard(&mut self, rtype: ResourceType, wildcard: bool) {
        if !rtype.supports_wildcard() {
            return;
        }
        self.subs.wildcard[rtype] = wildcard;
    }

    /// Returns whether `rtype` is currently in wildcard mode.
    pub(crate) fn is_wildcard(&self, rtype: ResourceType) -> bool {
        self.subs.wildcard[rtype]
    }

    /// Explicitly subscribes to the resource `name`.
    pub(crate) fn subscribe(&mut self, rtype: ResourceType, name: &str) {
        self.subs.subscribe(rtype, name);
    }

    /// Drops the explicit subscription to `name`. Cached data is only removed
    /// by a later `collect`, and only if nothing else references the resource.
    pub(crate) fn unsubscribe(&mut self, rtype: ResourceType, name: &str) {
        self.subs.unsubscribe(rtype, name);
    }

    /// Queues a DNS name to be added; reported by the next `collect`.
    pub(crate) fn subscribe_dns(&mut self, hostname: Hostname, port: u16) {
        self.dns.add.insert((hostname, port));
    }

    /// Queues a DNS name to be removed; reported by the next `collect`.
    pub(crate) fn unsubscribe_dns(&mut self, hostname: Hostname, port: u16) {
        self.dns.remove.insert((hostname, port));
    }

    /// Names of all non-wildcard subscriptions of `rtype`.
    #[cfg(test)]
    pub(crate) fn subscriptions(&self, rtype: ResourceType) -> Vec<String> {
        self.subs.explicit(rtype).map(|s| s.to_string()).collect()
    }

    /// DNS names of every cached cluster that has one.
    pub(crate) fn dns_names(&self) -> impl Iterator<Item = (Hostname, u16)> + '_ {
        self.data
            .clusters
            .iter()
            .filter_map(|e| e.data().and_then(|c| c.dns_name()))
    }

    /// Names of non-wildcard `rtype` subscriptions for which no data is cached.
    pub(crate) fn initial_subscriptions(&self, rtype: ResourceType) -> Vec<String> {
        macro_rules! missing_from {
            ($m:expr) => {
                self.subs
                    .explicit(rtype)
                    .filter(|k| !$m.has_data(k))
                    .map(|s| s.to_string())
                    .collect()
            };
        }

        match rtype {
            ResourceType::Cluster => missing_from!(self.data.clusters),
            ResourceType::ClusterLoadAssignment => missing_from!(self.data.load_assignments),
            ResourceType::Listener => missing_from!(self.data.listeners),
            ResourceType::RouteConfiguration => missing_from!(self.data.route_configs),
        }
    }

    /// Maps resource name to version for every cached `rtype` resource that has
    /// both data and a version.
    pub(crate) fn versions(&self, rtype: ResourceType) -> HashMap<String, String> {
        match rtype {
            ResourceType::Cluster => self.data.clusters.versions(),
            ResourceType::ClusterLoadAssignment => self.data.load_assignments.versions(),
            ResourceType::Listener => self.data.listeners.versions(),
            ResourceType::RouteConfiguration => self.data.route_configs.versions(),
        }
    }

    /// Garbage-collects subscriptions and returns what changed since the last call.
    ///
    /// Cached data is deleted for every subscription that was removed. The
    /// returned `DnsUpdates` contains the queued DNS changes plus a removal for
    /// the DNS name of each removed cluster that had one. Pending changes and
    /// queued DNS updates are reset.
    pub(crate) fn collect(&mut self) -> (EnumMap<ResourceType, Changes>, DnsUpdates) {
        self.subs.collect();
        let changes = std::mem::take(&mut self.subs.changes);

        macro_rules! remove_all {
            ($field:ident, $rtype:expr) => {
                self.data.$field.remove_all(&changes[$rtype].removed)
            };
        }
        remove_all!(route_configs, ResourceType::RouteConfiguration);
        remove_all!(listeners, ResourceType::Listener);
        remove_all!(load_assignments, ResourceType::ClusterLoadAssignment);

        // Clusters are removed one at a time so that the DNS name of each
        // removed cluster can be reported.
        let mut dns = std::mem::take(&mut self.dns);
        for cluster_name in &changes[ResourceType::Cluster].removed {
            if let Some(entry) = self.data.clusters.remove(cluster_name) {
                let dns_name = entry.data().and_then(|c| c.dns_name());
                if let Some(dns_name) = dns_name {
                    dns.remove.insert(dns_name);
                }
            }
        }

        (changes, dns)
    }

    /// Inserts a batch of resources of one type, returning every error hit
    /// while converting them.
    pub(crate) fn insert(&mut self, resources: ResourceVec) -> Vec<ResourceError> {
        match resources {
            ResourceVec::Cluster(clusters) => self.insert_clusters(clusters),
            ResourceVec::ClusterLoadAssignment(clas) => self.insert_load_assignments(clas),
            ResourceVec::Listener(listeners) => self.insert_listeners(listeners),
            ResourceVec::RouteConfiguration(rcs) => self.insert_route_configs(rcs),
        }
    }

    /// Replaces each named resource with a tombstone and drops the references
    /// it held to other resources.
    pub(crate) fn remove(&mut self, rtype: ResourceType, names: &[String]) {
        macro_rules! tombstone_all {
            ($data:ident, $rtype:expr, $names:expr) => {{
                for name in $names {
                    self.data.$data.insert_tombstone(name.clone());
                    self.subs.remove($rtype, name);
                }
            }};
        }

        match rtype {
            ResourceType::Listener => tombstone_all!(listeners, rtype, names),
            ResourceType::RouteConfiguration => tombstone_all!(route_configs, rtype, names),
            ResourceType::Cluster => tombstone_all!(clusters, rtype, names),
            ResourceType::ClusterLoadAssignment => tombstone_all!(load_assignments, rtype, names),
        }
    }

    /// Re-inserts the cached cluster named `cluster`, if there is one with both
    /// a version and data, so that it is rebuilt with the LB policy `action`.
    /// Does nothing when no such cluster is cached.
    fn refresh_lb_cluster(
        &mut self,
        cluster: &str,
        action: &Arc<xds_route::RouteAction>,
    ) -> Result<(), ResourceError> {
        let version_and_xds = self.data.clusters.get(cluster).and_then(|e| {
            let version = e.version();
            let data = e.data();
            version.zip(data).map(|(v, d)| (v.clone(), d.xds.clone()))
        });

        match version_and_xds {
            Some((version, xds)) => self.insert_cluster(version, xds, Some(Arc::clone(action))),
            None => Ok(()),
        }
    }

    /// Inserts Listener resources.
    ///
    /// Listeners that are unchanged or not subscribed are skipped. On success
    /// the listener's outgoing references are rebuilt from its new contents.
    fn insert_listeners(
        &mut self,
        listeners: Vec<(ResourceVersion, xds_listener::Listener)>,
    ) -> Vec<ResourceError> {
        let mut errors = Vec::new();

        for (version, listener) in listeners {
            if !self.data.listeners.is_changed(&listener.name, &listener) {
                continue;
            }

            let Some(sub) = self
                .subs
                .find_subcribed(ResourceType::Listener, &listener.name)
            else {
                continue;
            };

            let listener_name = listener.name.clone();
            let api_listener = match ApiListener::from_xds(&listener_name, listener) {
                Ok(l) => l,
                Err(e) => {
                    self.data
                        .listeners
                        .insert_error(listener_name, version, e.clone());
                    errors.push(e);
                    continue;
                }
            };

            // Drop the references held by the previous version before adding
            // the ones for this version.
            self.subs.reset_refs(sub);
            match &api_listener.route_config {
                ApiListenerData::Rds(rc_name) => {
                    self.subs
                        .add_ref(sub, ResourceType::RouteConfiguration, rc_name.as_str());
                }
                ApiListenerData::Inlined(RouteConfigData::Route { clusters, .. }) => {
                    for cluster in clusters {
                        self.subs
                            .add_ref(sub, ResourceType::Cluster, cluster.as_str());
                    }
                }
                ApiListenerData::Inlined(RouteConfigData::LbPolicy { action, cluster }) => {
                    if let Err(e) = self.refresh_lb_cluster(cluster.as_str(), action) {
                        self.data
                            .listeners
                            .insert_error(listener_name, version, e.clone());
                        errors.push(e);
                        continue;
                    }
                }
            }

            self.subs
                .clear_changes(ResourceType::Listener, &listener_name);
            self.data
                .listeners
                .insert_ok(listener_name, version, api_listener);
        }

        errors
    }

    /// Inserts Cluster resources, skipping unchanged ones. Each cluster is
    /// built with the LB action found for it by `find_lb_action`, if any.
    fn insert_clusters(
        &mut self,
        clusters: Vec<(ResourceVersion, xds_cluster::Cluster)>,
    ) -> Vec<ResourceError> {
        let mut errors = Vec::new();

        for (version, cluster) in clusters {
            if !self.data.clusters.is_changed(&cluster.name, &cluster) {
                continue;
            }

            let lb_action = self.find_lb_action(&cluster.name);
            if let Err(e) = self.insert_cluster(version, cluster, lb_action) {
                errors.push(e);
            }
        }

        errors
    }

    /// Inserts a single cluster built with the optional LB policy action.
    ///
    /// Returns `Ok(())` without doing anything when the cluster is not
    /// subscribed. On a conversion error the error is recorded on the cache
    /// entry and returned. On success the cluster's references are rebuilt: a
    /// cluster with a DNS name queues a DNS addition, any other cluster
    /// references its load assignment, and every cluster references its LB
    /// config listener.
    fn insert_cluster(
        &mut self,
        version: ResourceVersion,
        cluster: xds_cluster::Cluster,
        lb_policy: Option<Arc<xds_route::RouteAction>>,
    ) -> Result<(), ResourceError> {
        let Some(sub) = self
            .subs
            .find_subcribed(ResourceType::Cluster, &cluster.name)
        else {
            return Ok(());
        };

        let cluster_name = cluster.name.clone();
        let cluster = match Cluster::from_xds(cluster, lb_policy.as_deref()) {
            Ok(c) => c,
            Err(e) => {
                self.data
                    .clusters
                    .insert_error(cluster_name, version, e.clone());
                return Err(e);
            }
        };

        // Drop the references held by the previous version before adding the
        // ones for this version.
        self.subs.reset_refs(sub);
        match cluster.dns_name() {
            Some(dns_name) => {
                self.dns.add.insert(dns_name);
            }
            None => self
                .subs
                .add_ref(sub, ResourceType::ClusterLoadAssignment, &cluster_name),
        }

        let lb_config_name = cluster.backend_lb.config.id.lb_config_route_name();
        self.subs
            .add_ref(sub, ResourceType::Listener, &lb_config_name);

        self.subs
            .clear_changes(ResourceType::Cluster, &cluster_name);
        self.data.clusters.insert_ok(cluster_name, version, cluster);

        Ok(())
    }

    /// Finds the LB policy action for `cluster_name` by looking up the
    /// cluster's LB config listener and, for RDS listeners, the route config
    /// it names. Returns `None` when the name does not parse as a `BackendId`,
    /// when nothing is cached, or when the route data is not an LB policy.
    fn find_lb_action(&self, cluster_name: &str) -> Option<Arc<xds_route::RouteAction>> {
        let target = BackendId::from_str(cluster_name).ok()?;
        let listener = self.data.listeners.get(&target.lb_config_route_name())?;

        match &listener.data()?.route_config {
            ApiListenerData::Rds(name) => {
                let route_config = self.data.route_configs.get(name.as_str())?;
                route_config.data().and_then(|rc| match &rc.data {
                    RouteConfigData::LbPolicy { action, .. } => Some(action.clone()),
                    _ => None,
                })
            }
            ApiListenerData::Inlined(data) => match &data {
                RouteConfigData::LbPolicy { action, .. } => Some(action.clone()),
                _ => None,
            },
        }
    }

    /// Inserts RouteConfiguration resources.
    ///
    /// Route configs that are not subscribed are skipped. On success the route
    /// config's outgoing references are rebuilt from its new contents.
    fn insert_route_configs(
        &mut self,
        route_configs: Vec<(ResourceVersion, xds_route::RouteConfiguration)>,
    ) -> Vec<ResourceError> {
        let mut errors = Vec::new();

        for (version, route_config) in route_configs {
            let Some(sub) = self
                .subs
                .find_subcribed(ResourceType::RouteConfiguration, &route_config.name)
            else {
                continue;
            };

            let route_name = route_config.name.clone();
            let route_config = match RouteConfig::from_xds(route_config) {
                Ok(route_config) => route_config,
                Err(e) => {
                    self.data
                        .route_configs
                        .insert_error(route_name, version, e.clone());
                    errors.push(e);
                    continue;
                }
            };

            // Drop the references held by the previous version before adding
            // the ones for this version. Without this, clusters that an updated
            // route config no longer targets stay referenced and are never
            // collected, and repeated inserts pile up duplicate edges.
            self.subs.reset_refs(sub);
            match &route_config.data {
                RouteConfigData::Route { clusters, .. } => {
                    for cluster in clusters {
                        self.subs
                            .add_ref(sub, ResourceType::Cluster, cluster.as_str());
                    }
                }
                RouteConfigData::LbPolicy { action, cluster } => {
                    if let Err(e) = self.refresh_lb_cluster(cluster.as_str(), action) {
                        self.data
                            .route_configs
                            .insert_error(route_name, version, e.clone());
                        errors.push(e);
                        continue;
                    }
                }
            }

            self.subs
                .clear_changes(ResourceType::RouteConfiguration, &route_name);
            self.data
                .route_configs
                .insert_ok(route_name, version, route_config);
        }

        errors
    }

    /// Inserts ClusterLoadAssignment resources, skipping those that are not
    /// subscribed. Conversion errors are recorded on the cache entry and returned.
    fn insert_load_assignments(
        &mut self,
        load_assignments: Vec<(ResourceVersion, xds_endpoint::ClusterLoadAssignment)>,
    ) -> Vec<ResourceError> {
        let mut errors = Vec::new();

        for (version, load_assignment) in load_assignments {
            let sub = self.subs.find_subcribed(
                ResourceType::ClusterLoadAssignment,
                &load_assignment.cluster_name,
            );
            if sub.is_none() {
                continue;
            };

            let cla_name = load_assignment.cluster_name.clone();
            match LoadAssignment::from_xds(load_assignment) {
                Ok(cla) => {
                    self.subs
                        .clear_changes(ResourceType::ClusterLoadAssignment, &cla_name);
                    self.data.load_assignments.insert_ok(cla_name, version, cla);
                }
                Err(e) => {
                    self.data
                        .load_assignments
                        .insert_error(cla_name, version, e.clone());
                    errors.push(e);
                }
            };
        }

        errors
    }
}
/// A cloneable read handle onto cached resource data.
#[derive(Default, Clone)]
pub(super) struct CacheReader {
/// The resource maps this reader reads from.
data: Arc<CacheData>,
}
impl ConfigCache for CacheReader {
    /// Waits for the listener named `host` and returns its route.
    ///
    /// For RDS listeners this also waits for the named route config. Returns
    /// `None` when an entry has no data or its route data is not a `Route`.
    async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
        let listener = self.data.listeners.get_await(host.as_ref()).await?;

        match &listener.data()?.route_config {
            ApiListenerData::Inlined(RouteConfigData::Route { route, .. }) => Some(route.clone()),
            ApiListenerData::Inlined(_) => None,
            ApiListenerData::Rds(name) => {
                let route_config = self.data.route_configs.get_await(name.as_str()).await?;
                if let RouteConfigData::Route { route, .. } = &route_config.data()?.data {
                    Some(route.clone())
                } else {
                    None
                }
            }
        }
    }

    /// Waits for the cluster named after `id` and returns its backend LB, or
    /// `None` when the entry has no data.
    async fn get_backend(&self, id: &BackendId) -> Option<Arc<BackendLb>> {
        let name = id.name();
        let cluster = self.data.clusters.get_await(&name).await?;
        cluster.data().map(|data| data.backend_lb.clone())
    }

    /// Waits for the load assignment named after `id` and returns its endpoint
    /// group, or `None` when the entry has no data.
    async fn get_endpoints(&self, id: &BackendId) -> Option<Arc<EndpointGroup>> {
        let name = id.name();
        let assignment = self.data.load_assignments.get_await(&name).await?;
        assignment.data().map(|data| data.endpoint_group.clone())
    }
}
impl CacheReader {
    /// Iterates over every cached route: first routes inlined in listeners,
    /// then routes from route configs.
    pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
        let from_listeners = self.data.listeners.iter().filter_map(|entry| {
            match &entry.data()?.route_config {
                ApiListenerData::Inlined(RouteConfigData::Route { route, .. }) => {
                    Some(route.clone())
                }
                _ => None,
            }
        });

        let from_route_configs = self.data.route_configs.iter().filter_map(|entry| {
            match &entry.data()?.data {
                RouteConfigData::Route { route, .. } => Some(route.clone()),
                _ => None,
            }
        });

        from_listeners.chain(from_route_configs)
    }

    /// Iterates over the backend LB of every cached cluster that has data.
    pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
        self.data.clusters.iter().filter_map(|entry| {
            let cluster = entry.data()?;
            Some(cluster.backend_lb.clone())
        })
    }

    /// Iterates over every cache entry of every resource type as an
    /// `XdsConfig`, in the order listeners, route configs, clusters, load
    /// assignments.
    ///
    /// # Panics
    ///
    /// Panics if a cached xDS message cannot be packed into a protobuf `Any`.
    pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
        use prost::Name;

        // Converts every entry of one resource map into an `XdsConfig`.
        macro_rules! any_iter {
            ($field:ident, $xds_type:ty) => {
                self.data.$field.iter().map(|entry| {
                    let xds = entry.data().map(|data| {
                        protobuf::Any::from_msg(data.xds()).expect("generated invalid protobuf")
                    });
                    let last_error = entry
                        .last_error()
                        .map(|(version, err)| (version.clone(), err.to_string()));

                    XdsConfig {
                        name: entry.name().to_string(),
                        type_url: <$xds_type>::type_url(),
                        version: entry.version().cloned(),
                        xds,
                        last_error,
                    }
                })
            };
        }

        let listeners = any_iter!(listeners, xds_listener::Listener);
        let route_configs = any_iter!(route_configs, xds_route::RouteConfiguration);
        let clusters = any_iter!(clusters, xds_cluster::Cluster);
        let load_assignments = any_iter!(load_assignments, xds_endpoint::ClusterLoadAssignment);

        listeners
            .chain(route_configs)
            .chain(clusters)
            .chain(load_assignments)
    }
}
#[cfg(test)]
mod test {
use junction_api::Service;
use pretty_assertions::assert_eq;
use super::*;
use crate::xds::test as xds_test;
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
#[test]
fn test_reader_send_sync() {
assert_send::<CacheReader>();
assert_sync::<CacheReader>();
}
#[test]
fn test_cache_send_sync() {
assert_send::<Cache>();
assert_sync::<Cache>();
}
macro_rules! collect_str {
($($arg:expr),* $(,)?) => {
[$(
$arg.to_string(),
)*].into_iter().collect()
}
}
macro_rules! collect_kv_str {
($(($k:expr, $v:expr)),* $(,)?) => {
[$(
($k.to_string(), $v.to_string()),
)*].into_iter().collect()
}
}
#[track_caller]
fn assert_insert(errors: Vec<ResourceError>) {
assert!(errors.is_empty(), "errors is not empty: {errors:?}");
}
#[test]
fn test_empty_cache() {
let mut cache = Cache::default();
let (rs, dns) = cache.collect();
assert!(rs.values().all(|v| v.is_empty()));
assert!(dns.is_noop());
assert!(ResourceType::all()
.iter()
.all(|&rtype| cache.versions(rtype).is_empty()));
assert!(cache.subscriptions(ResourceType::Listener).is_empty());
}
#[test]
fn test_insert_listener_lds_explicit() {
let listeners = ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.example.svc.cluster.local",
"example-route",
)],
);
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Listener, false);
assert_insert(cache.insert(listeners.clone()));
let (resources, dns) = cache.collect();
assert_eq!(resources, EnumMap::default());
assert!(dns.is_noop());
cache.subscribe(ResourceType::Listener, "listener.example.svc.cluster.local");
let _ = cache.collect();
assert_insert(cache.insert(listeners));
let (resources, dns) = cache.collect();
assert!(dns.is_noop());
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::RouteConfiguration => Changes {
added: collect_str!["example-route"],
removed: BTreeSet::new(),
},
_ => Changes::default(),
}
);
assert_eq!(
cache.subscriptions(ResourceType::Listener),
vec!["listener.example.svc.cluster.local"]
);
assert_eq!(
cache.versions(ResourceType::Listener),
collect_kv_str![("listener.example.svc.cluster.local", "123")]
);
}
#[test]
fn test_insert_listener_lds_wildcard() {
let mut cache = Cache::default();
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.example.svc.cluster.local",
"example-route",
)],
)));
let (resources, dns) = cache.collect();
assert!(dns.is_noop());
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::RouteConfiguration => Changes {
added: collect_str!["example-route"],
removed: BTreeSet::new(),
},
_ => Changes::default(),
}
);
assert!(cache.subscriptions(ResourceType::Listener).is_empty());
assert_eq!(
cache.versions(ResourceType::Listener),
collect_kv_str![("listener.example.svc.cluster.local", "123")]
);
}
#[test]
fn test_insert_listener_lds_inline_rds() {
let mut cache = Cache::default();
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.example.svc.cluster.local:80",
"example-route" => [xds_test::vhost!(
"a-virtual-host",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster.example:8008")],
)],
)],
)));
let (resources, dns) = cache.collect();
assert!(dns.is_noop());
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::Cluster => Changes {
added: collect_str!["cluster.example:8008"],
removed: BTreeSet::new(),
},
_ => Changes::default(),
}
);
}
#[test]
fn test_insert_listener_invalid() {
let mut cache = Cache::default();
cache.subscribe(ResourceType::Listener, "potato");
let _ = cache.collect();
let errors = cache.insert(ResourceVec::from_listeners(
"123".into(),
[xds_listener::Listener {
name: "potato".to_string(),
..Default::default()
}],
));
assert_eq!(errors.len(), 1);
assert_eq!(cache.subscriptions(ResourceType::Listener), vec!["potato"]);
assert!(cache.versions(ResourceType::Listener).is_empty());
let (resources, dns) = cache.collect();
assert_eq!(resources, Default::default());
assert!(dns.is_noop());
}
#[test]
fn test_insert_cluster_cds_wildcard() {
let mut cache = Cache::default();
let kube_backend = BackendId {
service: Service::kube("default", "whatever").unwrap(),
port: 7890,
};
let dns_backend = BackendId {
service: Service::dns("cluster.example").unwrap(),
port: 4433,
};
assert_insert(cache.insert(ResourceVec::from_clusters(
"123".into(),
vec![
xds_test::cluster!(dns_backend.name().leak()),
xds_test::cluster!(kube_backend.name().leak()),
],
)));
let (resources, dns) = cache.collect();
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::Listener => Changes {
added: collect_str!(
kube_backend.lb_config_route_name(),
dns_backend.lb_config_route_name(),
),
..Default::default()
},
ResourceType::ClusterLoadAssignment => Changes {
added: collect_str![kube_backend.name()],
..Default::default()
},
_ => Changes::default(),
}
);
assert_eq!(
dns,
DnsUpdates {
add: BTreeSet::from_iter([(Hostname::from_static("cluster.example"), 4433)]),
..Default::default()
},
);
assert!(cache.subscriptions(ResourceType::Cluster).is_empty());
assert_eq!(
cache.versions(ResourceType::Cluster),
collect_kv_str![(kube_backend.name(), "123"), (dns_backend.name(), "123"),]
);
}
#[test]
fn test_insert_cluster_cds_explicit() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Cluster, false);
let kube_backend = BackendId {
service: Service::kube("default", "whatever").unwrap(),
port: 7890,
};
let dns_backend = BackendId {
service: Service::dns("cluster.example").unwrap(),
port: 4433,
};
cache.subscribe(ResourceType::Cluster, &kube_backend.name());
let _ = cache.collect();
assert_insert(cache.insert(ResourceVec::from_clusters(
"123".into(),
vec![
xds_test::cluster!(dns_backend.name().leak()),
xds_test::cluster!(kube_backend.name().leak()),
],
)));
let (resources, dns) = cache.collect();
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::Listener => Changes {
added: collect_str!(
kube_backend.lb_config_route_name(),
),
..Default::default()
},
ResourceType::ClusterLoadAssignment => Changes {
added: collect_str![kube_backend.name()],
..Default::default()
},
_ => Changes::default(),
}
);
assert!(dns.is_noop());
assert_eq!(
cache.subscriptions(ResourceType::Cluster),
vec![kube_backend.name()],
);
assert_eq!(
cache.versions(ResourceType::Cluster),
collect_kv_str![(kube_backend.name(), "123")]
);
}
#[test]
fn test_insert_route_config() {
let route_config = xds_test::route_config!(
"example-route",
vec![xds_test::vhost!(
"a-vhost",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster.example:8008")]
)]
);
let mut cache = Cache::default();
assert_insert(cache.insert(ResourceVec::from_route_configs(
"123".into(),
vec![route_config.clone()],
)));
let (resources, dns) = cache.collect();
assert!(resources.values().all(|c| c.is_empty()));
assert!(dns.is_noop());
assert!(cache.data.route_configs.is_empty());
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.example.svc.cluster.local",
"example-route"
)],
)));
assert_insert(cache.insert(ResourceVec::from_route_configs(
"123".into(),
vec![route_config],
)));
let (resources, dns) = cache.collect();
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::Cluster => Changes {
added: collect_str!["cluster.example:8008"],
..Default::default()
},
_ => Changes::default(),
}
);
assert!(dns.is_noop());
assert_eq!(
cache.subscriptions(ResourceType::RouteConfiguration),
vec!["example-route"],
);
assert_eq!(
cache.versions(ResourceType::RouteConfiguration),
collect_kv_str![("example-route", 123)],
);
}
#[test]
fn test_insert_load_assignment() {
let kube_backend = BackendId {
service: Service::kube("default", "whatever").unwrap(),
port: 7890,
};
let mut cache = Cache::default();
assert_insert(cache.insert(ResourceVec::from_load_assignments(
"123".into(),
vec![xds_test::cla!(
"whatever.default.svc.cluster.local:7890" => {
"zone1" => ["1.1.1.1"]
}
)],
)));
let (resources, dns) = cache.collect();
assert!(resources.values().all(|c| c.is_empty()));
assert!(dns.is_noop());
assert!(cache.data.load_assignments.is_empty());
assert_insert(cache.insert(ResourceVec::from_clusters(
"123".into(),
vec![xds_test::cluster!(kube_backend.name().leak())],
)));
let _ = cache.collect();
assert_insert(cache.insert(ResourceVec::from_load_assignments(
"123".into(),
vec![xds_test::cla!(
"whatever.default.svc.cluster.local:7890" => {
"zone1" => ["1.1.1.1"]
}
)],
)));
let (resources, dns) = cache.collect();
assert!(resources.values().all(|c| c.is_empty()));
assert!(dns.is_noop());
assert_eq!(
cache.versions(ResourceType::ClusterLoadAssignment),
collect_kv_str![("whatever.default.svc.cluster.local:7890", "123")]
);
}
#[test]
fn test_remove_listener_explicit() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Cluster, false);
cache.set_wildcard(ResourceType::Listener, false);
cache.subscribe(ResourceType::Listener, "listener.example.svc.cluster.local");
cache.subscribe(ResourceType::Listener, "listener.local");
let _ = cache.collect();
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.example.svc.cluster.local",
"example-route",
)],
)));
assert_insert(cache.insert(ResourceVec::from_route_configs(
"123".into(),
vec![xds_test::route_config!(
"example-route",
vec![xds_test::vhost!(
"a-vhost",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster.example:8008")]
)]
)],
)));
assert_insert(cache.insert(ResourceVec::from_clusters(
"123".into(),
vec![xds_test::cluster!("cluster.example:8008")],
)));
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"cluster.example.lb.jct:8008",
"lb-route" => [xds_test::vhost!(
"lb-vhost",
["cluster.example.lb.jct:8080"],
[xds_test::route!(default "cluster.example:8008")],
)],
)],
)));
let _ = cache.collect();
assert_eq!(
cache.versions(ResourceType::Cluster),
collect_kv_str![("cluster.example:8008", "123")],
);
assert_eq!(
cache.versions(ResourceType::Listener),
collect_kv_str![
("listener.example.svc.cluster.local", "123"),
("cluster.example.lb.jct:8008", "123"),
],
);
assert_eq!(
cache.versions(ResourceType::RouteConfiguration),
collect_kv_str![("example-route", "123")],
);
assert!(cache
.versions(ResourceType::ClusterLoadAssignment)
.is_empty());
cache.remove(
ResourceType::Listener,
&["listener.example.svc.cluster.local".to_string()],
);
assert_insert(cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!(
"listener.local",
"better-example-route",
)],
)));
assert_insert(cache.insert(ResourceVec::from_route_configs(
"123".into(),
vec![xds_test::route_config!(
"better-example-route",
vec![xds_test::vhost!(
"a-vhost",
["listener.local"],
[xds_test::route!(default "cluster.example:8008")]
)]
)],
)));
let (resources, dns) = cache.collect();
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::RouteConfiguration => Changes {
removed: collect_str!("example-route"),
..Default::default()
},
_ => Changes::default()
}
);
assert!(dns.is_noop());
assert_eq!(
cache.subscriptions(ResourceType::Listener),
vec![
"listener.example.svc.cluster.local",
"listener.local",
"cluster.example.lb.jct:8008",
],
);
assert_eq!(
cache.versions(ResourceType::Listener),
collect_kv_str![
("listener.local", "123"),
("cluster.example.lb.jct:8008", "123"),
],
);
assert_eq!(
cache.versions(ResourceType::RouteConfiguration),
collect_kv_str![("better-example-route", "123")],
);
assert_eq!(
cache.versions(ResourceType::Cluster),
collect_kv_str![("cluster.example:8008", "123")],
);
cache.remove(ResourceType::Listener, &["listener.local".to_string()]);
let (resources, dns) = cache.collect();
assert_eq!(
dns,
DnsUpdates {
add: BTreeSet::new(),
remove: [(Hostname::from_static("cluster.example"), 8008)]
.into_iter()
.collect(),
sync: false,
}
);
assert_eq!(
resources,
enum_map::enum_map! {
ResourceType::Listener => Changes {
removed: collect_str!("cluster.example.lb.jct:8008"),
..Default::default()
},
ResourceType::RouteConfiguration => Changes {
removed: collect_str!("better-example-route"),
..Default::default()
},
ResourceType::Cluster => Changes {
removed: collect_str!("cluster.example:8008"),
..Default::default()
},
ResourceType::ClusterLoadAssignment => Changes::default(),
}
);
assert_eq!(
cache.subscriptions(ResourceType::Listener),
vec!["listener.example.svc.cluster.local", "listener.local"],
);
for &rtype in ResourceType::all() {
assert_eq!(cache.versions(rtype), HashMap::new());
}
}
#[test]
fn test_remove_listener_wildcard() {
    // Listener removal with the wildcard flag set on both the Cluster and the
    // Listener resource types, and a single listener subscribed to by name.
    let mut cache = Cache::default();
    cache.set_wildcard(ResourceType::Cluster, true);
    cache.set_wildcard(ResourceType::Listener, true);
    cache.subscribe(ResourceType::Listener, "listener.example.svc.cluster.local");
    // Whatever this first collect reports is not under test; discard it.
    let _ = cache.collect();

    // Two listeners, each naming its own route config.
    let listeners = ResourceVec::from_listeners(
        "123".into(),
        vec![
            xds_test::listener!("listener.example.svc.cluster.local", "example-route"),
            xds_test::listener!("listener.local", "better-example-route"),
        ],
    );
    assert_insert(cache.insert(listeners));

    // Both route configs send their default route to the same cluster.
    let route_configs = ResourceVec::from_route_configs(
        "123".into(),
        vec![
            xds_test::route_config!(
                "example-route",
                vec![xds_test::vhost!(
                    "a-vhost",
                    ["listener.example.svc.cluster.local"],
                    [xds_test::route!(default "cluster.example:8008")]
                )]
            ),
            xds_test::route_config!(
                "better-example-route",
                vec![xds_test::vhost!(
                    "a-vhost",
                    ["listener.local"],
                    [xds_test::route!(default "cluster.example:8008")]
                )]
            ),
        ],
    );
    assert_insert(cache.insert(route_configs));

    let clusters = ResourceVec::from_clusters(
        "123".into(),
        vec![xds_test::cluster!("cluster.example:8008")],
    );
    assert_insert(cache.insert(clusters));

    // A third listener that carries its route ("lb-route") inline.
    let lb_listener = ResourceVec::from_listeners(
        "123".into(),
        vec![xds_test::listener!(
            "cluster.example.lb.jct:8008",
            "lb-route" => [xds_test::vhost!(
                "lb-vhost",
                ["cluster.example.lb.jct:8080"],
                [xds_test::route!(default "cluster.example:8008")],
            )],
        )],
    );
    assert_insert(cache.insert(lb_listener));
    let _ = cache.collect();

    // Baseline: everything inserted above is reported at version "123", and
    // no load assignments are versioned.
    assert_eq!(
        cache.versions(ResourceType::Cluster),
        collect_kv_str![("cluster.example:8008", "123")],
    );
    assert_eq!(
        cache.versions(ResourceType::Listener),
        collect_kv_str![
            ("listener.local", "123"),
            ("listener.example.svc.cluster.local", "123"),
            ("cluster.example.lb.jct:8008", "123"),
        ],
    );
    assert_eq!(
        cache.versions(ResourceType::RouteConfiguration),
        collect_kv_str![("example-route", "123"), ("better-example-route", "123")],
    );
    assert!(cache
        .versions(ResourceType::ClusterLoadAssignment)
        .is_empty());

    // Step 1: remove the listener that was subscribed to by name.
    let first_removal = [String::from("listener.example.svc.cluster.local")];
    cache.remove(ResourceType::Listener, &first_removal);
    let (changes, dns) = cache.collect();

    // Only that listener's route config is reported as removed; DNS is untouched.
    let expected_changes = enum_map::enum_map! {
        ResourceType::RouteConfiguration => Changes {
            removed: collect_str!("example-route"),
            ..Default::default()
        },
        _ => Changes::default()
    };
    assert_eq!(changes, expected_changes);
    assert!(dns.is_noop());

    let expected_subscriptions = vec![
        "listener.example.svc.cluster.local",
        "cluster.example.lb.jct:8008",
    ];
    assert_eq!(
        cache.subscriptions(ResourceType::Listener),
        expected_subscriptions,
    );
    assert_eq!(
        cache.versions(ResourceType::Listener),
        collect_kv_str![
            ("listener.local", "123"),
            ("cluster.example.lb.jct:8008", "123"),
        ],
    );
    assert_eq!(
        cache.versions(ResourceType::RouteConfiguration),
        collect_kv_str![("better-example-route", "123")],
    );
    assert_eq!(
        cache.versions(ResourceType::Cluster),
        collect_kv_str![("cluster.example:8008", "123")],
    );

    // Step 2: remove the remaining listener that routes to the cluster.
    let second_removal = [String::from("listener.local")];
    cache.remove(ResourceType::Listener, &second_removal);
    let (changes, dns) = cache.collect();

    // The cluster's DNS name is expected to be dropped...
    let expected_dns = DnsUpdates {
        add: BTreeSet::new(),
        remove: std::iter::once((Hostname::from_static("cluster.example"), 8008)).collect(),
        sync: false,
    };
    assert_eq!(dns, expected_dns);

    // ...and the lb listener, the last route config and the cluster are all
    // expected to be reported as removed.
    let expected_changes = enum_map::enum_map! {
        ResourceType::Listener => Changes {
            removed: collect_str!("cluster.example.lb.jct:8008"),
            ..Default::default()
        },
        ResourceType::RouteConfiguration => Changes {
            removed: collect_str!("better-example-route"),
            ..Default::default()
        },
        ResourceType::Cluster => Changes {
            removed: collect_str!("cluster.example:8008"),
            ..Default::default()
        },
        ResourceType::ClusterLoadAssignment => Changes::default(),
    };
    assert_eq!(changes, expected_changes);

    // The by-name subscription is still listed, but no listener is versioned.
    let expected_subscriptions = vec!["listener.example.svc.cluster.local"];
    assert_eq!(
        cache.subscriptions(ResourceType::Listener),
        expected_subscriptions,
    );
    assert_eq!(cache.versions(ResourceType::Listener), collect_kv_str![]);
}
}