1use bytes::Bytes;
33use cache::{Cache, CacheReader};
34use enum_map::EnumMap;
35use futures::{FutureExt, TryStreamExt};
36use junction_api::{backend::BackendId, http::Route, Hostname, Service};
37use std::{
38 borrow::Cow, collections::BTreeSet, future::Future, io::ErrorKind, sync::Arc, time::Duration,
39};
40use tokio::sync::mpsc::{self, Receiver};
41use tokio_stream::wrappers::ReceiverStream;
42use tonic::{transport::Endpoint, Streaming};
43use tracing::debug;
44use xds_api::pb::{
45 envoy::{
46 config::core::v3 as xds_core,
47 service::discovery::v3::{
48 aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
49 DeltaDiscoveryRequest, DeltaDiscoveryResponse,
50 },
51 },
52 google::{protobuf, rpc::Status as GrpcStatus},
53};
54
55mod cache;
56
57mod resources;
58pub use resources::ResourceVersion;
59pub(crate) use resources::{ResourceType, ResourceVec};
60
61use crate::{dns::StdlibResolver, BackendLb, ConfigCache};
62
63mod csds;
64
65#[cfg(test)]
66mod test;
67
/// A description of a single xDS resource as produced by the cache's
/// `iter_xds` iterator.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; the code that populates them lives in the `cache`
/// module and should be consulted to confirm.
#[derive(Debug, Default, Clone)]
pub struct XdsConfig {
    /// The resource name.
    pub name: String,
    /// The xDS type URL of the resource.
    pub type_url: String,
    /// The resource version, if one is known.
    pub version: Option<ResourceVersion>,
    /// The raw protobuf for the resource, if present.
    pub xds: Option<protobuf::Any>,
    /// A version and message pair; presumably the most recent version that
    /// was rejected and why. TODO confirm against the cache.
    pub last_error: Option<(ResourceVersion, String)>,
}
78
/// A subscription change sent from an `AdsClient` to its `AdsTask` over the
/// `subs` channel and applied by `AdsConnection::handle_subscription_update`.
#[derive(Debug)]
enum SubscriptionUpdate {
    /// Subscribe to the Listener resource for each name.
    AddHosts(Vec<String>),
    /// Subscribe to the Cluster resource for each backend. DNS backends are
    /// additionally registered as DNS names with the cache.
    AddBackends(Vec<BackendId>),
    /// Subscribe to endpoint data for each backend: a DNS name for DNS
    /// backends, a ClusterLoadAssignment resource for everything else.
    AddEndpoints(Vec<BackendId>),

    // The `Remove*` variants are handled by the connection but nothing in
    // this file constructs them outside of that handler, hence the
    // `allow(unused)` attributes.
    /// Inverse of [`SubscriptionUpdate::AddHosts`].
    #[allow(unused)]
    RemoveHosts(Vec<String>),
    /// Inverse of [`SubscriptionUpdate::AddBackends`].
    #[allow(unused)]
    RemoveBackends(Vec<BackendId>),
    /// Inverse of [`SubscriptionUpdate::AddEndpoints`].
    #[allow(unused)]
    RemoveEndpoints(Vec<BackendId>),
}
92
/// A cloneable handle for reading configuration that an [`AdsTask`] keeps up
/// to date.
///
/// The client never talks to the ADS server itself: it reads from a shared
/// cache and asks the task to change subscriptions by sending
/// [`SubscriptionUpdate`]s.
#[derive(Clone)]
pub(super) struct AdsClient {
    /// Sending half of the subscription channel; the receiving half is owned
    /// by the paired `AdsTask`.
    subs: mpsc::Sender<SubscriptionUpdate>,
    /// Read-only view of the cache owned by the paired `AdsTask`.
    cache: CacheReader,
    /// Resolver used to answer endpoint lookups for DNS backends; a clone of
    /// the resolver held by the paired `AdsTask`.
    dns: StdlibResolver,
}
109
110impl AdsClient {
111 pub(super) fn build(
121 address: impl Into<Bytes>,
122 node_id: String,
123 cluster: String,
124 ) -> Result<(AdsClient, AdsTask), tonic::transport::Error> {
125 let endpoint = Endpoint::from_shared(address)?
127 .connect_timeout(Duration::from_secs(5))
128 .tcp_nodelay(true);
129
130 let node_info = xds_core::Node {
131 id: node_id,
132 cluster,
133 client_features: vec![
134 "envoy.lb.does_not_support_overprovisioning".to_string(),
135 "envoy.lrs.supports_send_all_clusters".to_string(),
136 ],
137 ..Default::default()
138 };
139
140 let (sub_tx, sub_rx) = mpsc::channel(10);
142 let cache = Cache::default();
143
144 let dns = StdlibResolver::new_with(Duration::from_secs(5), Duration::from_millis(500), 2);
146
147 let client = AdsClient {
148 subs: sub_tx,
149 cache: cache.reader(),
150 dns: dns.clone(),
151 };
152 let task = AdsTask {
153 endpoint,
154 initial_channel: None,
155 node_info,
156 cache,
157 dns,
158 subs: sub_rx,
159 };
160
161 Ok((client, task))
162 }
163
164 pub(super) fn csds_server(
165 &self,
166 port: u16,
167 ) -> impl Future<Output = Result<(), tonic::transport::Error>> + Send + 'static {
168 csds::local_server(self.cache.clone(), port)
169 }
170
171 pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
172 self.cache.iter_routes()
173 }
174
175 pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
176 self.cache.iter_backends()
177 }
178
179 pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
180 self.cache.iter_xds()
181 }
182}
183
184impl ConfigCache for AdsClient {
190 async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
191 let hosts = vec![host.as_ref().to_string()];
192 let _ = self.subs.send(SubscriptionUpdate::AddHosts(hosts)).await;
193
194 self.cache.get_route(host).await
195 }
196
197 async fn get_backend(
198 &self,
199 backend: &junction_api::backend::BackendId,
200 ) -> Option<std::sync::Arc<crate::BackendLb>> {
201 let bs = vec![backend.clone()];
202 let _ = self.subs.send(SubscriptionUpdate::AddBackends(bs)).await;
203
204 self.cache.get_backend(backend).await
205 }
206
207 async fn get_endpoints(
208 &self,
209 backend: &junction_api::backend::BackendId,
210 ) -> Option<std::sync::Arc<crate::EndpointGroup>> {
211 let bs = vec![backend.clone()];
212 let _ = self.subs.send(SubscriptionUpdate::AddEndpoints(bs)).await;
213
214 match &backend.service {
215 junction_api::Service::Dns(dns) => {
216 self.dns
217 .get_endpoints_await(&dns.hostname, backend.port)
218 .await
219 }
220 _ => self.cache.get_endpoints(backend).await,
221 }
222 }
223}
224
/// The background half of an [`AdsClient`]: owns the cache and the connection
/// to the ADS server, and applies subscription updates sent by clients.
pub(crate) struct AdsTask {
    /// Where to (re)connect to the ADS server.
    endpoint: tonic::transport::Endpoint,
    /// A channel opened ahead of time by `connect`; taken by the first call to
    /// `new_connection` so that it is used at most once.
    initial_channel: Option<tonic::transport::Channel>,
    /// Node metadata, cloned into every new `AdsConnection`.
    node_info: xds_core::Node,
    /// The cache that `AdsClient`s read from.
    cache: Cache,
    /// DNS resolver shared with `AdsClient`s.
    dns: StdlibResolver,
    /// Receiving half of the subscription channel. Once it is closed the task
    /// reports itself as shut down.
    subs: mpsc::Receiver<SubscriptionUpdate>,
}
234
235#[derive(Debug, thiserror::Error)]
236struct ShutdownError;
237
238impl std::fmt::Display for ShutdownError {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 write!(f, "AdsTask started after shutdown")
241 }
242}
243
// Log an outgoing `DeltaDiscoveryRequest` at DEBUG level.
//
// The `nack` field is true when the request carries an `error_detail`, i.e.
// when it rejects a response. Only the nonce, type URL, subscribed names and
// initial versions are printed; the rest of the request is omitted.
macro_rules! log_request {
    ($request:expr) => {
        tracing::debug!(
            nack = $request.error_detail.is_some(),
            "DeltaDiscoveryRequest(n={:?}, ty={:?}, r={:?}, init={:?})",
            $request.response_nonce,
            $request.type_url,
            $request.resource_names_subscribe,
            $request.initial_resource_versions,
        );
    };
}
256
// Log an incoming `DeltaDiscoveryResponse` at DEBUG level.
//
// Resources are summarized as `(name, version)` pairs. Building that summary
// clones strings, so it is only done when DEBUG logging is enabled.
macro_rules! log_response {
    ($response:expr) => {
        if tracing::enabled!(tracing::Level::DEBUG) {
            let resource_summary = names_and_versions(&$response);
            tracing::debug!(
                "DeltaDiscoveryResponse(n={:?}, ty={:?}, r={:?})",
                $response.nonce,
                $response.type_url,
                resource_summary,
            );
        }
    };
}
270
271fn names_and_versions(response: &DeltaDiscoveryResponse) -> Vec<(String, String)> {
272 response
273 .resources
274 .iter()
275 .map(|r| (r.name.clone(), r.version.clone()))
276 .collect()
277}
278
279impl AdsTask {
280 pub(super) fn is_shutdown(&self) -> bool {
281 self.subs.is_closed()
282 }
283
284 pub(super) async fn run(&mut self) -> Result<(), &(dyn std::error::Error + 'static)> {
285 if self.is_shutdown() {
286 return Err(&ShutdownError);
287 }
288
289 loop {
290 match self.run_connection().await {
291 Ok(()) => break,
292 Err(ConnectionError::AdsDisconnected) => continue,
294 Err(ConnectionError::Connect(e)) => {
301 debug!(err = %e, "failed to connect to ADS server");
302 tokio::time::sleep(Duration::from_secs(2)).await;
303 }
304 Err(ConnectionError::Status(status)) => {
310 let is_broken_pipe =
312 unwrap_io_error(&status).is_some_and(|e| e.kind() == ErrorKind::BrokenPipe);
313
314 if !is_broken_pipe {
315 debug!(err = %status, "ADS connection closed unexpectedly");
316 }
317
318 tokio::time::sleep(if is_broken_pipe {
319 Duration::from_millis(50)
320 } else {
321 Duration::from_secs(2)
322 })
323 .await;
324 }
325 };
326 }
327
328 Ok(())
329 }
330
331 async fn run_connection(&mut self) -> Result<(), ConnectionError> {
342 let (xds_tx, xds_rx) = tokio::sync::mpsc::channel(10);
343
344 let channel = self.new_connection().await?;
346 let mut client = AggregatedDiscoveryServiceClient::new(channel);
347 let stream_response = client
348 .delta_aggregated_resources(ReceiverStream::new(xds_rx))
349 .await?;
350 let mut incoming = stream_response.into_inner();
351
352 self.dns.set_names(self.cache.dns_names());
354
355 let (mut conn, initial_requests) =
357 AdsConnection::new(self.node_info.clone(), &mut self.cache);
358 for msg in initial_requests {
359 log_request!(msg);
360 if xds_tx.send(msg).await.is_err() {
361 return Err(ConnectionError::AdsDisconnected);
362 }
363 }
364
365 loop {
366 let is_eof = handle_update_batch(&mut conn, &mut self.subs, &mut incoming).await?;
367 if is_eof {
368 return Ok(());
369 }
370
371 let (outgoing, dns_updates) = conn.outgoing();
372 for msg in outgoing {
373 log_request!(msg);
374 if xds_tx.send(msg).await.is_err() {
375 return Err(ConnectionError::AdsDisconnected);
376 }
377 }
378 update_dns(&self.dns, dns_updates.add, dns_updates.remove);
379 }
380 }
381
382 pub(super) async fn connect(&mut self) -> Result<(), tonic::transport::Error> {
383 if self.initial_channel.is_none() {
384 let channel = self.endpoint.connect().await?;
385 self.initial_channel = Some(channel)
386 }
387
388 Ok(())
389 }
390
391 async fn new_connection(
392 &mut self,
393 ) -> Result<tonic::transport::Channel, tonic::transport::Error> {
394 match self.initial_channel.take() {
395 Some(channel) => Ok(channel),
396 None => self.endpoint.connect().await,
397 }
398 }
399}
400
401async fn handle_update_batch(
407 conn: &mut AdsConnection<'_>,
408 subs: &mut Receiver<SubscriptionUpdate>,
409 incoming: &mut Streaming<DeltaDiscoveryResponse>,
410) -> Result<bool, ConnectionError> {
411 async fn next_update(
422 conn: &mut AdsConnection<'_>,
423 subs: &mut Receiver<SubscriptionUpdate>,
424 incoming: &mut Streaming<DeltaDiscoveryResponse>,
425 ) -> Result<bool, ConnectionError> {
426 tokio::select! {
427 biased;
428
429 xds_msg = incoming.try_next() => {
430 let response = match xds_msg? {
434 Some(response) => response,
435 None => return Err(ConnectionError::AdsDisconnected),
436 };
437 log_response!(response);
438
439 tracing::trace!("ads connection: handle_ads_message");
440 conn.handle_ads_message(response);
441 }
442 sub_update = subs.recv() => {
443 let Some(sub_update) = sub_update else {
444 return Ok(true)
445 };
446
447 tracing::trace!(
448 ?sub_update,
449 "ads connection: handle_subscription_update",
450 );
451 conn.handle_subscription_update(sub_update);
452 }
453 }
454 Ok(false)
455 }
456
457 if next_update(conn, subs, incoming).await? {
459 return Ok(true);
460 }
461
462 loop {
466 let Some(should_exit) = next_update(conn, subs, incoming).now_or_never() else {
467 break;
468 };
469
470 if should_exit? {
471 return Ok(true);
472 }
473 }
474
475 Ok(false)
476}
477
478#[inline]
479fn update_dns(
480 dns: &StdlibResolver,
481 add: BTreeSet<(Hostname, u16)>,
482 remove: BTreeSet<(Hostname, u16)>,
483) {
484 for (name, port) in add {
485 dns.subscribe(name, port);
486 }
487 for (name, port) in remove {
488 dns.unsubscribe(&name, port);
489 }
490}
491
/// The ways a single ADS connection can end abnormally. `AdsTask::run`
/// matches on these to decide how long to wait before reconnecting.
#[derive(Debug, thiserror::Error)]
enum ConnectionError {
    /// The transport-level connection to the endpoint could not be made.
    #[error(transparent)]
    Connect(#[from] tonic::transport::Error),

    /// Opening the stream or reading from it failed with a gRPC status.
    #[error(transparent)]
    Status(#[from] tonic::Status),

    /// The response stream ended, or the channel feeding outgoing requests
    /// to the stream was closed.
    #[error("ADS server closed the stream")]
    AdsDisconnected,
}
503
504fn unwrap_io_error(status: &tonic::Status) -> Option<&std::io::Error> {
510 let mut err: &(dyn std::error::Error + 'static) = status;
511
512 loop {
513 if let Some(e) = err.downcast_ref::<std::io::Error>() {
514 return Some(e);
515 }
516
517 if let Some(e) = err.downcast_ref::<h2::Error>().and_then(|e| e.get_io()) {
519 return Some(e);
520 }
521
522 err = err.source()?;
523 }
524}
525
/// The protocol state for a single ADS stream.
///
/// This type does no I/O: responses and subscription updates are fed in
/// through `handle_ads_message` and `handle_subscription_update`, and the
/// requests to send are read back out with `outgoing`.
struct AdsConnection<'a> {
    /// The cache that incoming resources and subscription changes are applied
    /// to.
    cache: &'a mut Cache,
    /// Node metadata still waiting to be sent. It is attached to the first
    /// request generated for this connection and is `None` afterwards.
    node: Option<xds_core::Node>,
    /// The pending ACK or NACK for each resource type. Only the most recent
    /// response per type is remembered; a newer one overwrites an older one
    /// that has not been sent yet.
    acks: EnumMap<ResourceType, Option<AckState>>,
    /// `(nonce, type_url)` pairs for responses whose type URL was not
    /// recognized; each is NACKed by the next call to `outgoing`.
    unknown_types: Vec<(String, String)>,
}
532
/// What to tell the server about the last response for a resource type.
#[derive(Debug, Default)]
struct AckState {
    /// The nonce of the response being acknowledged.
    nonce: String,
    /// `None` for an ACK; `Some(message)` turns the acknowledgement into a
    /// NACK carrying that message.
    error: Option<Cow<'static, str>>,
}
538
539impl AckState {
540 fn into_ack(self) -> (String, Option<GrpcStatus>) {
541 let nonce = self.nonce;
542 let error = self.error.map(|message| GrpcStatus {
543 message: message.to_string(),
544 code: tonic::Code::InvalidArgument.into(),
545 ..Default::default()
546 });
547
548 (nonce, error)
549 }
550}
551
552impl<'a> AdsConnection<'a> {
553 fn new(node: xds_core::Node, cache: &'a mut Cache) -> (Self, Vec<DeltaDiscoveryRequest>) {
554 let mut requests = Vec::with_capacity(ResourceType::all().len());
555
556 let mut node = Some(node);
557 for &rtype in ResourceType::all() {
558 let initial_versions = cache.versions(rtype);
559 let mut subscribe = cache.initial_subscriptions(rtype);
560 if cache.is_wildcard(rtype) && !subscribe.is_empty() {
561 subscribe.push("*".to_string());
562 }
563
564 if !cache.is_wildcard(rtype) && subscribe.is_empty() && initial_versions.is_empty() {
565 continue;
566 }
567
568 requests.push(DeltaDiscoveryRequest {
569 node: node.take(),
570 type_url: rtype.type_url().to_string(),
571 resource_names_subscribe: subscribe,
572 initial_resource_versions: initial_versions,
573 ..Default::default()
574 });
575 }
576
577 let conn = Self {
578 cache,
579 node,
580 acks: Default::default(),
581 unknown_types: Vec::new(),
582 };
583 (conn, requests)
584 }
585
586 fn outgoing(&mut self) -> (Vec<DeltaDiscoveryRequest>, DnsUpdates) {
587 let mut responses = Vec::with_capacity(ResourceType::all().len());
588
589 for (response_nonce, type_url) in std::mem::take(&mut self.unknown_types) {
594 let error_detail = Some(xds_api::pb::google::rpc::Status {
595 code: tonic::Code::InvalidArgument.into(),
596 message: "unknown type".to_string(),
597 ..Default::default()
598 });
599 responses.push(DeltaDiscoveryRequest {
600 type_url,
601 response_nonce,
602 error_detail,
603 ..Default::default()
604 })
605 }
606
607 let (resources, dns) = self.cache.collect();
609
610 for (rtype, changes) in resources {
614 let ack = self.get_ack(rtype);
615
616 if ack.is_none() && changes.is_empty() {
617 continue;
618 }
619
620 let node = self.node.take();
621 let (response_nonce, error_detail) = ack.map(|a| a.into_ack()).unwrap_or_default();
622 let resource_names_subscribe = changes.added.into_iter().collect();
623 let resource_names_unsubscribe = changes.removed.into_iter().collect();
624
625 responses.push(DeltaDiscoveryRequest {
626 node,
627 type_url: rtype.type_url().to_string(),
628 response_nonce,
629 error_detail,
630 resource_names_subscribe,
631 resource_names_unsubscribe,
632 ..Default::default()
633 })
634 }
635
636 (responses, dns)
637 }
638
639 fn handle_ads_message(&mut self, resp: DeltaDiscoveryResponse) {
640 let Some(rtype) = ResourceType::from_type_url(&resp.type_url) else {
641 tracing::trace!(type_url = %resp.type_url, "unknown type url");
642 self.set_unknown(resp.nonce, resp.type_url);
643 return;
644 };
645
646 let resources = match ResourceVec::from_resources(rtype, resp.resources) {
648 Ok(r) => r,
649 Err(e) => {
650 tracing::trace!(err = %e, "invalid proto");
651 self.set_ack(
652 rtype,
653 resp.nonce,
654 Some(format!("invalid resource: {e}").into()),
655 );
656 return;
657 }
658 };
659
660 let resource_errors = self.cache.insert(resources);
661 let error = match &resource_errors[..] {
662 &[] => None,
663 _ => Some("invalid resources".into()),
665 };
666 self.set_ack(rtype, resp.nonce, error);
667
668 self.cache.remove(rtype, &resp.removed_resources);
670 }
671
672 fn handle_subscription_update(&mut self, update: SubscriptionUpdate) {
673 match update {
674 SubscriptionUpdate::AddHosts(hosts) => {
675 for host in hosts {
676 self.cache.subscribe(ResourceType::Listener, &host);
677 }
678 }
679 SubscriptionUpdate::RemoveHosts(hosts) => {
680 for host in hosts {
681 self.cache.unsubscribe(ResourceType::Listener, &host);
682 }
683 }
684 SubscriptionUpdate::AddBackends(backends) => {
685 for backend in backends {
686 if let Service::Dns(dns) = &backend.service {
687 self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
688 }
689 self.cache.subscribe(ResourceType::Cluster, &backend.name());
690 }
691 }
692 SubscriptionUpdate::RemoveBackends(backends) => {
693 for backend in backends {
694 if let Service::Dns(dns) = &backend.service {
695 self.cache
696 .unsubscribe_dns(dns.hostname.clone(), backend.port);
697 }
698 self.cache
699 .unsubscribe(ResourceType::Cluster, &backend.name());
700 }
701 }
702 SubscriptionUpdate::AddEndpoints(backends) => {
703 for backend in backends {
704 match &backend.service {
705 Service::Dns(dns) => {
706 self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
707 }
708 _ => self
709 .cache
710 .subscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
711 }
712 }
713 }
714 SubscriptionUpdate::RemoveEndpoints(backends) => {
715 for backend in backends {
716 match &backend.service {
717 Service::Dns(dns) => {
718 self.cache
719 .unsubscribe_dns(dns.hostname.clone(), backend.port);
720 }
721 _ => self
722 .cache
723 .unsubscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
724 }
725 }
726 }
727 }
728 }
729
730 fn set_unknown(&mut self, nonce: String, type_url: String) {
731 self.unknown_types.push((nonce, type_url))
732 }
733
734 fn set_ack(&mut self, rtype: ResourceType, nonce: String, error: Option<Cow<'static, str>>) {
735 self.acks[rtype] = Some(AckState { nonce, error })
736 }
737
738 fn get_ack(&mut self, rtype: ResourceType) -> Option<AckState> {
739 self.acks[rtype].take()
740 }
741}
742
/// DNS subscription changes collected from the cache alongside xDS changes.
#[derive(Debug, Default, PartialEq, Eq)]
struct DnsUpdates {
    /// `(hostname, port)` pairs to start resolving.
    add: BTreeSet<(Hostname, u16)>,
    /// `(hostname, port)` pairs to stop resolving.
    remove: BTreeSet<(Hostname, u16)>,
    /// NOTE(review): this flag is set by code outside this file and is only
    /// read here by the test-only `is_noop`; confirm what it is meant to
    /// trigger.
    sync: bool,
}
749
#[cfg(test)]
impl DnsUpdates {
    /// Returns `true` when there is nothing to add, nothing to remove, and
    /// `sync` is not set.
    fn is_noop(&self) -> bool {
        let has_changes = !self.add.is_empty() || !self.remove.is_empty();
        !(has_changes || self.sync)
    }
}
756
// Unit tests for `AdsConnection`. They drive the connection directly with
// responses and subscription updates and inspect the requests it produces; no
// network is involved.
#[cfg(test)]
mod test_ads_conn {
    use cache::Cache;
    use once_cell::sync::Lazy;
    use pretty_assertions::assert_eq;
    use xds_api::pb::envoy::service::discovery::v3 as xds_discovery;

    use super::test as xds_test;
    use super::*;

    // The node every test connection is created with.
    static TEST_NODE: Lazy<xds_core::Node> = Lazy::new(|| xds_core::Node {
        id: "unit-test".to_string(),
        ..Default::default()
    });

    /// Create a connection and return it with its initial requests.
    ///
    /// If there are initial requests, the first one must carry `TEST_NODE`.
    /// The node is checked and then stripped so that tests can compare the
    /// requests without it.
    #[track_caller]
    fn new_conn(cache: &mut Cache) -> (AdsConnection, Vec<DeltaDiscoveryRequest>) {
        let (conn, mut outgoing) = AdsConnection::new(TEST_NODE.clone(), cache);

        if let Some(first) = outgoing.first_mut() {
            let node = first
                .node
                .take()
                .expect("expected first outgoing request to have a node");

            assert_eq!(node, *TEST_NODE);
        };

        (conn, outgoing)
    }

    // Wildcard types with no subscriptions still send an (empty) initial
    // request.
    #[test]
    fn test_init_empty_wildcard() {
        let mut cache = Cache::default();
        cache.set_wildcard(ResourceType::Listener, true);
        cache.set_wildcard(ResourceType::Cluster, true);

        let (_, outgoing) = new_conn(&mut cache);

        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(t = ResourceType::Cluster),
                xds_test::req!(t = ResourceType::Listener),
            ]
        )
    }

    // Without wildcards or subscriptions there is nothing to send.
    #[test]
    fn test_init_empty_explicit() {
        let mut cache = Cache::default();
        cache.set_wildcard(ResourceType::Listener, false);
        cache.set_wildcard(ResourceType::Cluster, false);

        let (_, outgoing) = new_conn(&mut cache);
        assert!(outgoing.is_empty());
    }

    // A wildcard type with explicit subscriptions also subscribes to "*".
    #[test]
    fn test_init_subscription_wildcard() {
        let mut cache = Cache::default();
        cache.set_wildcard(ResourceType::Listener, false);
        cache.set_wildcard(ResourceType::Cluster, true);

        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");

        let (_, outgoing) = new_conn(&mut cache);
        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(
                    t = ResourceType::Cluster,
                    add = vec!["cluster.example:7891", "*"],
                    init = vec![],
                ),
                xds_test::req!(
                    t = ResourceType::ClusterLoadAssignment,
                    add = vec!["cluster.example:7891",],
                    init = vec![],
                )
            ]
        );
    }

    // Explicit subscriptions on non-wildcard types are sent as-is.
    #[test]
    fn test_init_subscription_explicit() {
        let mut cache = Cache::default();
        cache.set_wildcard(ResourceType::Listener, false);
        cache.set_wildcard(ResourceType::Cluster, false);

        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");

        let (_, outgoing) = new_conn(&mut cache);
        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(
                    t = ResourceType::Cluster,
                    add = vec!["cluster.example:7891",],
                    init = vec![],
                ),
                xds_test::req!(
                    t = ResourceType::ClusterLoadAssignment,
                    add = vec!["cluster.example:7891",],
                    init = vec![],
                ),
            ]
        );
    }

    // Resources already in the cache are reported as initial versions, and
    // the resources they reference are subscribed to.
    #[test]
    fn test_init_initial_versions() {
        let mut cache = Cache::default();
        assert!(cache.is_wildcard(ResourceType::Listener));
        assert!(!cache.is_wildcard(ResourceType::RouteConfiguration));

        cache.insert(ResourceVec::from_listeners(
            "123".into(),
            vec![xds_test::listener!("cooler.example.org", "cool-route")],
        ));
        cache.insert(ResourceVec::from_listeners(
            "456".into(),
            vec![xds_test::listener!("warmer.example.org", "warm-route")],
        ));
        cache.insert(ResourceVec::from_route_configs(
            "789".into(),
            vec![xds_test::route_config!(
                "cool-route",
                vec![xds_test::vhost!(
                    "an-vhost",
                    ["cooler.example.org"],
                    [xds_test::route!(default "cooler.example.internal:8008")]
                )]
            )],
        ));

        let (_, outgoing) = new_conn(&mut cache);
        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(
                    t = ResourceType::Cluster,
                    add = vec!["cooler.example.internal:8008", "*"],
                    init = vec![],
                ),
                xds_test::req!(
                    t = ResourceType::Listener,
                    add = vec![],
                    init = vec![("cooler.example.org", "123"), ("warmer.example.org", "456"),]
                ),
                xds_test::req!(
                    t = ResourceType::RouteConfiguration,
                    add = vec!["warm-route"],
                    init = vec![("cool-route", "789")]
                ),
            ],
        );
    }

    // AddHosts produces a Listener subscription and no DNS changes.
    #[test]
    fn test_handle_subscribe_hostname() {
        let mut cache = Cache::default();
        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![
            Service::dns("website.internal").unwrap().name(),
            Service::kube("default", "nginx")
                .unwrap()
                .as_backend_id(4443)
                .name(),
        ]));

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());
        assert_eq!(
            outgoing,
            vec![xds_test::req!(
                t = ResourceType::Listener,
                add = vec!["nginx.default.svc.cluster.local:4443", "website.internal"],
            )]
        );
    }

    // AddBackends subscribes to Clusters; only the DNS backend shows up in
    // the DNS updates.
    #[test]
    fn test_handle_subscribe_backend() {
        let mut cache = Cache::default();
        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![
            Service::dns("website.internal").unwrap().as_backend_id(80),
            Service::kube("default", "nginx")
                .unwrap()
                .as_backend_id(4443),
        ]));

        let (outgoing, dns) = conn.outgoing();
        assert_eq!(
            dns,
            DnsUpdates {
                add: [(Hostname::from_static("website.internal"), 80)]
                    .into_iter()
                    .collect(),
                ..Default::default()
            }
        );

        assert_eq!(
            outgoing,
            vec![xds_test::req!(
                t = ResourceType::Cluster,
                add = vec![
                    "nginx.default.svc.cluster.local:4443",
                    "website.internal:80"
                ],
            )]
        );
    }

    // Several responses handled before `outgoing` is called: only the latest
    // nonce per type is ACKed, and referenced resources are subscribed to.
    #[test]
    fn test_handle_ads_message_listener_route() {
        let mut cache = Cache::default();
        assert!(cache.is_wildcard(ResourceType::Listener));

        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_ads_message(xds_test::resp!(
            n = "1",
            add = ResourceVec::from_listeners(
                "123".into(),
                vec![xds_test::listener!("cooler.example.org", "cool-route")],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "2",
            add = ResourceVec::from_listeners(
                "456".into(),
                vec![xds_test::listener!("warmer.example.org", "warm-route")],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "3",
            add = ResourceVec::from_route_configs(
                "789".into(),
                vec![xds_test::route_config!(
                    "cool-route",
                    vec![xds_test::vhost!(
                        "an-vhost",
                        ["cooler.example.org"],
                        [xds_test::route!(default "cooler.example.internal:8008")]
                    )]
                )],
            ),
            remove = vec![],
        ));

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());

        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(
                    t = ResourceType::Cluster,
                    add = vec!["cooler.example.internal:8008"]
                ),
                xds_test::req!(t = ResourceType::Listener, n = "2"),
                xds_test::req!(
                    t = ResourceType::RouteConfiguration,
                    n = "3",
                    add = vec!["warm-route"]
                ),
            ],
        );
    }

    // Removing a listener unsubscribes from the route it referenced.
    #[test]
    fn test_handle_ads_message_listener_removed() {
        let mut cache = Cache::default();
        assert!(cache.is_wildcard(ResourceType::Listener));

        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_ads_message(xds_test::resp!(
            n = "1",
            add = ResourceVec::from_listeners(
                "123".into(),
                vec![xds_test::listener!("cooler.example.org", "cool-route")],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "2",
            add = ResourceVec::from_listeners(
                "456".into(),
                vec![xds_test::listener!("warmer.example.org", "warm-route")],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "3",
            add = ResourceVec::from_route_configs(
                "789".into(),
                vec![xds_test::route_config!(
                    "cool-route",
                    vec![xds_test::vhost!(
                        "an-vhost",
                        ["cooler.example.org"],
                        [xds_test::route!(default "cooler.example.internal:8008")]
                    )]
                )],
            ),
            remove = vec![],
        ));

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());

        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(
                    t = ResourceType::Cluster,
                    add = vec!["cooler.example.internal:8008"]
                ),
                xds_test::req!(t = ResourceType::Listener, n = "2"),
                xds_test::req!(
                    t = ResourceType::RouteConfiguration,
                    n = "3",
                    add = vec!["warm-route"]
                ),
            ],
        );

        // Now the server removes one of the listeners.
        conn.handle_ads_message(xds_test::resp!(
            n = "4",
            add = ResourceVec::from_listeners("123".into(), vec![]),
            remove = vec!["warmer.example.org"],
        ));

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());
        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(t = ResourceType::Listener, n = "4"),
                xds_test::req!(
                    t = ResourceType::RouteConfiguration,
                    remove = vec!["warm-route"],
                ),
            ]
        );
    }

    // Clusters, load assignments and listeners arriving together: each type
    // is ACKed, and one cluster's name is reported as a DNS name to add.
    #[test]
    fn test_handle_ads_message_cluster_cla() {
        let mut cache = Cache::default();
        assert!(cache.is_wildcard(ResourceType::Cluster));

        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_ads_message(xds_test::resp!(
            n = "1",
            add = ResourceVec::from_clusters(
                "123".into(),
                vec![
                    xds_test::cluster!("cooler.example.org:2345"),
                    xds_test::cluster!("thing.default.svc.cluster.local:9876"),
                ],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "2",
            add = ResourceVec::from_load_assignments(
                "123".into(),
                vec![xds_test::cla!(
                    "thing.default.svc.cluster.local:9876" => {
                        "zone1" => ["1.1.1.1"]
                    }
                )],
            ),
            remove = vec![],
        ));
        conn.handle_ads_message(xds_test::resp!(
            n = "3",
            add = ResourceVec::from_listeners("555".into(), vec![
                xds_test::listener!("cooler.example.org.lb.jct:2345", "lb-route" => [xds_test::vhost!(
                    "lb-vhost",
                    ["cooler.example.org.lb.jct:2345"],
                    [xds_test::route!(default ring_hash = "x-user", "cooler.example.org:2345")],
                )]),
                xds_test::listener!("thing.default.svc.cluster.local.lb.jct:9876", "lb-route" => [xds_test::vhost!(
                    "lb-vhost",
                    ["cooler.example.org.lb.jct:2345"],
                    [xds_test::route!(default ring_hash = "x-user", "thing.default.svc.cluster.local:9876")],
                )])
            ]),
            remove = vec![],
        ));

        let (outgoing, dns) = conn.outgoing();
        assert_eq!(
            dns,
            DnsUpdates {
                add: [(Hostname::from_static("cooler.example.org"), 2345)]
                    .into_iter()
                    .collect(),
                ..Default::default()
            }
        );
        assert_eq!(
            outgoing,
            vec![
                xds_test::req!(t = ResourceType::Cluster, n = "1"),
                xds_test::req!(t = ResourceType::ClusterLoadAssignment, n = "2"),
                xds_test::req!(t = ResourceType::Listener, n = "3"),
            ]
        );
    }

    // If there were no initial requests, the node goes on the first request
    // produced later.
    #[test]
    fn test_set_node_after_init() {
        let mut cache = Cache::default();
        for rtype in ResourceType::all() {
            cache.set_wildcard(*rtype, false);
        }

        let (mut conn, outgoing) = new_conn(&mut cache);
        assert!(outgoing.is_empty());

        let svc = Service::dns("website.internal").unwrap().as_backend_id(80);
        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![svc]));

        let (outgoing, _) = conn.outgoing();
        assert_eq!(outgoing[0].node.as_ref(), Some(&*TEST_NODE));
    }

    // A response with an unrecognized type URL is NACKed with "unknown type".
    #[test]
    fn test_handle_unknown_type_url() {
        let mut cache = Cache::default();
        let (mut conn, _) = new_conn(&mut cache);

        conn.handle_ads_message(DeltaDiscoveryResponse {
            type_url: "made.up.type_url/Potato".to_string(),
            ..Default::default()
        });

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());
        assert_eq!(
            outgoing,
            vec![DeltaDiscoveryRequest {
                type_url: "made.up.type_url/Potato".to_string(),
                error_detail: Some(xds_api::pb::google::rpc::Status {
                    code: tonic::Code::InvalidArgument.into(),
                    message: "unknown type".to_string(),
                    ..Default::default()
                }),
                ..Default::default()
            }]
        );
    }

    // A resource whose payload is the wrong message type (a Node sent as a
    // Listener) is NACKed with an "invalid resource" error.
    #[test]
    fn test_handle_invalid_resource() {
        let mut cache = Cache::default();
        let (mut conn, _) = new_conn(&mut cache);

        let node = xds_core::Node {
            id: "some-node".to_string(),
            ..Default::default()
        };
        conn.handle_ads_message(DeltaDiscoveryResponse {
            type_url: ResourceType::Listener.type_url().to_string(),
            resources: vec![xds_discovery::Resource {
                resource: Some(protobuf::Any::from_msg(&node).unwrap()),
                ..Default::default()
            }],
            ..Default::default()
        });

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());
        assert!(matches!(
            &outgoing[..],
            [DeltaDiscoveryRequest { type_url, error_detail, ..}] if
                type_url == ResourceType::Listener.type_url() &&
                error_detail.as_ref().is_some_and(|e| e.message.starts_with("invalid resource"))
        ));
    }

    // The server answers a subscription by listing the name as removed: the
    // response is ACKed and a lookup for the route resolves to `None`.
    #[test]
    fn test_handle_does_not_exist() {
        let mut cache = Cache::default();
        let (mut conn, _) = new_conn(&mut cache);

        let does_not_exist = Service::dns("website.internal").unwrap().name();
        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![does_not_exist.clone()]));
        // Flush the subscription request so the next `outgoing` only holds
        // the ACK.
        let _ = conn.outgoing();

        conn.handle_ads_message(DeltaDiscoveryResponse {
            nonce: "boo".to_string(),
            type_url: ResourceType::Listener.type_url().to_string(),
            removed_resources: vec![does_not_exist.clone()],
            ..Default::default()
        });

        let (outgoing, dns) = conn.outgoing();
        assert!(dns.is_noop());
        assert_eq!(
            outgoing,
            vec![xds_test::req!(t = ResourceType::Listener, n = "boo")],
        );

        // `now_or_never().unwrap()` also asserts that the lookup completes
        // without waiting.
        let route = cache
            .reader()
            .get_route("website.internal")
            .now_or_never()
            .unwrap();
        assert_eq!(route, None);
    }
}