junction_core/
xds.rs

1//! Junction XDS.
2//!
3//! This module contains [AdsClient], the interface between Junction and XDS. A
4//! client is a long-lived pile of state that gets wrapped by one or more Junction
5//! Clients to present Route and Backend data to the world.
6//!
7//! The stateful internals of a client are written in a sans-io way as much as
8//! possible to make it easier to test and verify the complexity of ADS. Most of
9//! the nasty bits are wrapped up in the [cache] module - a Cache is responsible
10//! for parsing and storing raw xDS and its Junction equivalent, and for tracking
11//! the relationship between resources.
12//!
//! An [AdsConnection] wraps a cache and multiplexes responses from the remote
//! ADS server with subscription updates from clients. It uses the state of the
//! cache to generate outgoing xDS requests and to report which DNS names should
//! be resolved.
17//!
//! The [AdsTask] returned alongside a client by [AdsClient::build] is where the
//! actual io in this module happens - an [AdsTask] does gRPC, manages the
//! connection to the ADS server, and drives a new [AdsConnection] every time it
//! reconnects.
21
22//  # TODO
23//
24// - Figure out how to run a Client without an upstream ADS server. Right now
25//   we don't process subscription updates until a gRPC connection gets
26//   established which seems bad.
27//
28//  - XDS client features:
29//    `envoy.lb.does_not_support_overprovisioning` and friends. See
30//    <https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md>.
31
32use bytes::Bytes;
33use cache::{Cache, CacheReader};
34use enum_map::EnumMap;
35use futures::{FutureExt, TryStreamExt};
36use junction_api::{backend::BackendId, http::Route, Hostname, Service};
37use std::{
38    borrow::Cow, collections::BTreeSet, future::Future, io::ErrorKind, sync::Arc, time::Duration,
39};
40use tokio::sync::mpsc::{self, Receiver};
41use tokio_stream::wrappers::ReceiverStream;
42use tonic::{transport::Endpoint, Streaming};
43use tracing::debug;
44use xds_api::pb::{
45    envoy::{
46        config::core::v3 as xds_core,
47        service::discovery::v3::{
48            aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
49            DeltaDiscoveryRequest, DeltaDiscoveryResponse,
50        },
51    },
52    google::{protobuf, rpc::Status as GrpcStatus},
53};
54
55mod cache;
56
57mod resources;
58pub use resources::ResourceVersion;
59pub(crate) use resources::{ResourceType, ResourceVec};
60
61use crate::{dns::StdlibResolver, BackendLb, ConfigCache};
62
63mod csds;
64
65#[cfg(test)]
66mod test;
67
/// A single xDS configuration object, with additional metadata about when it
/// was fetched and processed.
///
/// Values of this type are yielded by [AdsClient::iter_xds].
#[derive(Debug, Default, Clone)]
pub struct XdsConfig {
    /// The name of the resource.
    pub name: String,
    /// The xDS type URL of the resource.
    pub type_url: String,
    /// NOTE(review): presumably the version of `xds` below, if one has been
    /// accepted - confirm against the cache module.
    pub version: Option<ResourceVersion>,
    /// The raw xDS resource, if there is one.
    pub xds: Option<protobuf::Any>,
    /// NOTE(review): presumably the version and message of the most recent
    /// rejected update for this resource - confirm against the cache module.
    pub last_error: Option<(ResourceVersion, String)>,
}
78
/// A change to the set of resources clients are interested in.
///
/// Sent from an [AdsClient] to its [AdsTask] over an mpsc channel and applied
/// to the cache by [AdsConnection::handle_subscription_update].
#[derive(Debug)]
enum SubscriptionUpdate {
    /// Subscribe to Listeners with these names.
    AddHosts(Vec<String>),
    /// Subscribe to Clusters for these backends. DNS backends also register
    /// interest in their hostname and port.
    AddBackends(Vec<BackendId>),
    /// Subscribe to endpoints for these backends: ClusterLoadAssignments, or
    /// DNS resolution for DNS backends.
    AddEndpoints(Vec<BackendId>),

    // The Remove* variants are handled by AdsConnection, but nothing in this
    // module constructs them yet, so they're allowed to be unused.
    #[allow(unused)]
    RemoveHosts(Vec<String>),
    #[allow(unused)]
    RemoveBackends(Vec<BackendId>),
    #[allow(unused)]
    RemoveEndpoints(Vec<BackendId>),
}
92
/// A Junction ADS client that manages long-lived xDS state by connecting to a
/// remote server.
///
/// The client presents downstream as a [ConfigCache] so that a client can query
/// `Route` and `Backend` data. It also exposes a subscription interface for
/// both so that clients can register interest without having to know about the
/// details of xDS.
///
/// See the module docs for the general design of this whole module and how the
/// client pulls it all together.
#[derive(Clone)]
pub(super) struct AdsClient {
    /// Sends subscription changes to the paired [AdsTask].
    subs: mpsc::Sender<SubscriptionUpdate>,
    /// A read handle to the cache that the paired [AdsTask] writes into.
    cache: CacheReader,
    /// A clone of the resolver held by the paired [AdsTask]. Used to answer
    /// endpoint lookups for DNS backends.
    dns: StdlibResolver,
}
109
110impl AdsClient {
111    /// Create a new paired `AdsClient`` and `AdsTask`.
112    ///
113    /// A single `AdsTask` is expected to run in the background and communicate
114    /// with an ADS service, while any number of `AdsClient`s can use it to read
115    /// and request discovery data.
116    ///
117    /// This method doesn't start the background work necessary to communicate with
118    /// an ADS server. To do that, call the [run][AdsTask::run] method on the returned
119    /// `AdsTask`.
120    pub(super) fn build(
121        address: impl Into<Bytes>,
122        node_id: String,
123        cluster: String,
124    ) -> Result<(AdsClient, AdsTask), tonic::transport::Error> {
125        // FIXME: make this configurable
126        let endpoint = Endpoint::from_shared(address)?
127            .connect_timeout(Duration::from_secs(5))
128            .tcp_nodelay(true);
129
130        let node_info = xds_core::Node {
131            id: node_id,
132            cluster,
133            client_features: vec![
134                "envoy.lb.does_not_support_overprovisioning".to_string(),
135                "envoy.lrs.supports_send_all_clusters".to_string(),
136            ],
137            ..Default::default()
138        };
139
140        // TODO: how should we pick this number?
141        let (sub_tx, sub_rx) = mpsc::channel(10);
142        let cache = Cache::default();
143
144        // FIXME: make this configurable
145        let dns = StdlibResolver::new_with(Duration::from_secs(5), Duration::from_millis(500), 2);
146
147        let client = AdsClient {
148            subs: sub_tx,
149            cache: cache.reader(),
150            dns: dns.clone(),
151        };
152        let task = AdsTask {
153            endpoint,
154            initial_channel: None,
155            node_info,
156            cache,
157            dns,
158            subs: sub_rx,
159        };
160
161        Ok((client, task))
162    }
163
164    pub(super) fn csds_server(
165        &self,
166        port: u16,
167    ) -> impl Future<Output = Result<(), tonic::transport::Error>> + Send + 'static {
168        csds::local_server(self.cache.clone(), port)
169    }
170
171    pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
172        self.cache.iter_routes()
173    }
174
175    pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
176        self.cache.iter_backends()
177    }
178
179    pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
180        self.cache.iter_xds()
181    }
182}
183
184// TODO: the whole add-a-subscription-on-get thing is a bit werid but we don't
185// have a better signal yet. there probably is one, but we need some way to
186// distinguish between "get_endpoints was called because client.resolve_http was
187// called and its downstream of a listener" and "get_endpoints was called
188// because there is a DNS cluster in a static config".
189impl ConfigCache for AdsClient {
190    async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
191        let hosts = vec![host.as_ref().to_string()];
192        let _ = self.subs.send(SubscriptionUpdate::AddHosts(hosts)).await;
193
194        self.cache.get_route(host).await
195    }
196
197    async fn get_backend(
198        &self,
199        backend: &junction_api::backend::BackendId,
200    ) -> Option<std::sync::Arc<crate::BackendLb>> {
201        let bs = vec![backend.clone()];
202        let _ = self.subs.send(SubscriptionUpdate::AddBackends(bs)).await;
203
204        self.cache.get_backend(backend).await
205    }
206
207    async fn get_endpoints(
208        &self,
209        backend: &junction_api::backend::BackendId,
210    ) -> Option<std::sync::Arc<crate::EndpointGroup>> {
211        let bs = vec![backend.clone()];
212        let _ = self.subs.send(SubscriptionUpdate::AddEndpoints(bs)).await;
213
214        match &backend.service {
215            junction_api::Service::Dns(dns) => {
216                self.dns
217                    .get_endpoints_await(&dns.hostname, backend.port)
218                    .await
219            }
220            _ => self.cache.get_endpoints(backend).await,
221        }
222    }
223}
224
/// The IO-doing, gRPC adjacent part of running an ADS client.
///
/// Created alongside an [AdsClient] by [AdsClient::build]. The task owns the
/// [Cache]; clients only hold a [CacheReader].
pub(crate) struct AdsTask {
    /// The ADS server to connect to.
    endpoint: tonic::transport::Endpoint,
    /// A channel dialed ahead of time by [AdsTask::connect]. The next
    /// connection attempt takes and uses it instead of dialing again.
    initial_channel: Option<tonic::transport::Channel>,
    /// Node metadata, handed to each new [AdsConnection] so it can be sent on
    /// the first request of the stream.
    node_info: xds_core::Node,
    cache: Cache,
    /// The resolver that DNS interest changes are applied to. A clone is held
    /// by the paired [AdsClient].
    dns: StdlibResolver,
    /// Subscription updates from clients. Closed once every sender (every
    /// [AdsClient]) has been dropped.
    subs: mpsc::Receiver<SubscriptionUpdate>,
}
234
/// Returned by [AdsTask::run] when it's called after the subscription channel
/// has closed (every [AdsClient] has been dropped).
#[derive(Debug)]
struct ShutdownError;

impl std::fmt::Display for ShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AdsTask started after shutdown")
    }
}

// there's no underlying cause, so the default `source()` (None) is correct.
impl std::error::Error for ShutdownError {}
243
/// Log an outgoing [DeltaDiscoveryRequest] at DEBUG level.
///
/// Records whether the request is a NACK (`error_detail` is set) along with
/// its nonce, type URL, subscribed resource names and initial resource
/// versions. `$request` is expanded once per field, so pass a plain variable.
macro_rules! log_request {
    ($request:expr) => {
        tracing::debug!(
            nack = $request.error_detail.is_some(),
            "DeltaDiscoveryRequest(n={:?}, ty={:?}, r={:?}, init={:?})",
            $request.response_nonce,
            $request.type_url,
            $request.resource_names_subscribe,
            $request.initial_resource_versions,
        );
    };
}
256
/// Log an incoming [DeltaDiscoveryResponse] at DEBUG level: its nonce, type
/// URL, and the `(name, version)` of every resource it carries.
///
/// The name/version list clones every resource name and version, so it is
/// only built when DEBUG logging is enabled.
macro_rules! log_response {
    ($response:expr) => {
        if tracing::enabled!(tracing::Level::DEBUG) {
            let names_and_versions = names_and_versions(&$response);
            tracing::debug!(
                "DeltaDiscoveryResponse(n={:?}, ty={:?}, r={:?})",
                $response.nonce,
                $response.type_url,
                names_and_versions,
            );
        }
    };
}
270
271fn names_and_versions(response: &DeltaDiscoveryResponse) -> Vec<(String, String)> {
272    response
273        .resources
274        .iter()
275        .map(|r| (r.name.clone(), r.version.clone()))
276        .collect()
277}
278
279impl AdsTask {
280    pub(super) fn is_shutdown(&self) -> bool {
281        self.subs.is_closed()
282    }
283
284    pub(super) async fn run(&mut self) -> Result<(), &(dyn std::error::Error + 'static)> {
285        if self.is_shutdown() {
286            return Err(&ShutdownError);
287        }
288
289        loop {
290            match self.run_connection().await {
291                Ok(()) => break,
292                // on an ADS disconnect, just reconnect
293                Err(ConnectionError::AdsDisconnected) => continue,
294                // On a connection error, reconnect with a backoff and try to
295                // find a new ADS server.
296                //
297                // There's no great way to distingush between a connection
298                // that's never going to work and a temporary (but long) outage,
299                // so we'll just patiently keep trying.
300                Err(ConnectionError::Connect(e)) => {
301                    debug!(err = %e, "failed to connect to ADS server");
302                    tokio::time::sleep(Duration::from_secs(2)).await;
303                }
304                // The stream closed with a Tonic error. This is usually either
305                // a broken pipe or some other kind of IO error.
306                //
307                // There's also nothing to do here but log an error and
308                // continue, but don't wait too long on broken pipe.
309                Err(ConnectionError::Status(status)) => {
310                    // FIXME: emit an event with tracing or metrics or something here
311                    let is_broken_pipe =
312                        unwrap_io_error(&status).is_some_and(|e| e.kind() == ErrorKind::BrokenPipe);
313
314                    if !is_broken_pipe {
315                        debug!(err = %status, "ADS connection closed unexpectedly");
316                    }
317
318                    tokio::time::sleep(if is_broken_pipe {
319                        Duration::from_millis(50)
320                    } else {
321                        Duration::from_secs(2)
322                    })
323                    .await;
324                }
325            };
326        }
327
328        Ok(())
329    }
330
331    // TODO: can we split this even further from IO so we can run it without an
332    // active server? it would be nice to process subscription updates even
333    // while the connection is dead, and might allow adding static resources
334    // directly to a cache instead of keeping a separate static cache.
335    //
336    // To do it in a resasonable way, we need to pull the GRPC connection out
337    // of here. right now this async fn is implicitly a single-connection state
338    // machine - we could keep that and have a separate disconnected loop that
339    // we transition into, or we could pass a "NewConnection" message into here
340    // and manually manage connected vs. disconnected state.
341    async fn run_connection(&mut self) -> Result<(), ConnectionError> {
342        let (xds_tx, xds_rx) = tokio::sync::mpsc::channel(10);
343
344        // set up the gRPC stream
345        let channel = self.new_connection().await?;
346        let mut client = AggregatedDiscoveryServiceClient::new(channel);
347        let stream_response = client
348            .delta_aggregated_resources(ReceiverStream::new(xds_rx))
349            .await?;
350        let mut incoming = stream_response.into_inner();
351
352        // set DNS names
353        self.dns.set_names(self.cache.dns_names());
354
355        // set up the xDS connection and start sending messages
356        let (mut conn, initial_requests) =
357            AdsConnection::new(self.node_info.clone(), &mut self.cache);
358        for msg in initial_requests {
359            log_request!(msg);
360            if xds_tx.send(msg).await.is_err() {
361                return Err(ConnectionError::AdsDisconnected);
362            }
363        }
364
365        loop {
366            let is_eof = handle_update_batch(&mut conn, &mut self.subs, &mut incoming).await?;
367            if is_eof {
368                return Ok(());
369            }
370
371            let (outgoing, dns_updates) = conn.outgoing();
372            for msg in outgoing {
373                log_request!(msg);
374                if xds_tx.send(msg).await.is_err() {
375                    return Err(ConnectionError::AdsDisconnected);
376                }
377            }
378            update_dns(&self.dns, dns_updates.add, dns_updates.remove);
379        }
380    }
381
382    pub(super) async fn connect(&mut self) -> Result<(), tonic::transport::Error> {
383        if self.initial_channel.is_none() {
384            let channel = self.endpoint.connect().await?;
385            self.initial_channel = Some(channel)
386        }
387
388        Ok(())
389    }
390
391    async fn new_connection(
392        &mut self,
393    ) -> Result<tonic::transport::Channel, tonic::transport::Error> {
394        match self.initial_channel.take() {
395            Some(channel) => Ok(channel),
396            None => self.endpoint.connect().await,
397        }
398    }
399}
400
401// handle a batch of incoming messages/subscriptions.
402//
403// awaits until an update is recvd from either subscriptions or xds, and then
404// immediately grabs any pending updates as well. returns as soon as there's
405// nothing to immediately do and handling updates would block.
406async fn handle_update_batch(
407    conn: &mut AdsConnection<'_>,
408    subs: &mut Receiver<SubscriptionUpdate>,
409    incoming: &mut Streaming<DeltaDiscoveryResponse>,
410) -> Result<bool, ConnectionError> {
411    // handle the next possible input. runs a biased select over gRPC and
412    // subscription inputs.
413    //
414    // this function is inlined here because:
415    // - abstracting a handle_batch method is miserable, the type system makes
416    //   it hard to abstract over a bunch of mut references like this.
417    // - there is no reason, even just testing, to run this function by
418    //   itself
419    //
420    // it's a bit weird to inline, but only a bit
421    async fn next_update(
422        conn: &mut AdsConnection<'_>,
423        subs: &mut Receiver<SubscriptionUpdate>,
424        incoming: &mut Streaming<DeltaDiscoveryResponse>,
425    ) -> Result<bool, ConnectionError> {
426        tokio::select! {
427            biased;
428
429            xds_msg = incoming.try_next() => {
430                // on GRPC status errors, the connection has died and we're
431                // going to reconnect. pass the error up to reset things
432                // and move on.
433                let response = match xds_msg? {
434                    Some(response) => response,
435                    None => return Err(ConnectionError::AdsDisconnected),
436                };
437                log_response!(response);
438
439                tracing::trace!("ads connection: handle_ads_message");
440                conn.handle_ads_message(response);
441            }
442            sub_update = subs.recv() => {
443                let Some(sub_update) = sub_update else {
444                    return Ok(true)
445                };
446
447                tracing::trace!(
448                    ?sub_update,
449                    "ads connection: handle_subscription_update",
450                );
451                conn.handle_subscription_update(sub_update);
452            }
453        }
454        Ok(false)
455    }
456
457    // await the next update
458    if next_update(conn, subs, incoming).await? {
459        return Ok(true);
460    }
461
462    // try to handle any immediately pending updates. do not await, there is
463    // probably some work to be done to handle effects now, so we should
464    // return back to the caller.
465    loop {
466        let Some(should_exit) = next_update(conn, subs, incoming).now_or_never() else {
467            break;
468        };
469
470        if should_exit? {
471            return Ok(true);
472        }
473    }
474
475    Ok(false)
476}
477
478#[inline]
479fn update_dns(
480    dns: &StdlibResolver,
481    add: BTreeSet<(Hostname, u16)>,
482    remove: BTreeSet<(Hostname, u16)>,
483) {
484    for (name, port) in add {
485        dns.subscribe(name, port);
486    }
487    for (name, port) in remove {
488        dns.unsubscribe(&name, port);
489    }
490}
491
/// Why a single ADS connection (one call to `AdsTask::run_connection`) ended.
#[derive(Debug, thiserror::Error)]
enum ConnectionError {
    /// Establishing a gRPC channel to the ADS server failed.
    #[error(transparent)]
    Connect(#[from] tonic::transport::Error),

    /// Opening or reading the ADS stream failed with a gRPC status.
    #[error(transparent)]
    Status(#[from] tonic::Status),

    /// The ADS stream ended, or outgoing requests could no longer be queued.
    #[error("ADS server closed the stream")]
    AdsDisconnected,
}
503
504/// Returns `true` if this tonic [Status] was caused by a [std::io::Error].
505///
506/// Adapted from the `tonic` examples.
507///
508/// https://github.com/hyperium/tonic/blob/941726cc46b995dcc393c9d2b462d440bd3514f3/examples/src/streaming/server.rs#L15
509fn unwrap_io_error(status: &tonic::Status) -> Option<&std::io::Error> {
510    let mut err: &(dyn std::error::Error + 'static) = status;
511
512    loop {
513        if let Some(e) = err.downcast_ref::<std::io::Error>() {
514            return Some(e);
515        }
516
517        // https://github.com/hyperium/h2/pull/462
518        if let Some(e) = err.downcast_ref::<h2::Error>().and_then(|e| e.get_io()) {
519            return Some(e);
520        }
521
522        err = err.source()?;
523    }
524}
525
/// The io-free state of a single ADS stream.
///
/// Applies incoming xDS responses and client subscription updates to the
/// [Cache]. Turns the resulting cache changes into outgoing
/// [DeltaDiscoveryRequest]s and DNS interest changes via
/// [AdsConnection::outgoing].
struct AdsConnection<'a> {
    /// The task's cache, borrowed for the lifetime of this connection.
    cache: &'a mut Cache,
    /// Node metadata that hasn't been sent on this stream yet. Taken by the
    /// first request that goes out.
    node: Option<xds_core::Node>,
    /// A pending ACK/NACK per resource type, sent with the next request for
    /// that type.
    acks: EnumMap<ResourceType, Option<AckState>>,
    /// `(nonce, type_url)` of responses with unrecognized type URLs that still
    /// need to be NACKed.
    unknown_types: Vec<(String, String)>,
}
532
/// A pending acknowledgement of the latest response for one resource type.
#[derive(Debug, Default)]
struct AckState {
    /// The nonce of the response being acknowledged.
    nonce: String,
    /// If set, the response is rejected (NACKed) with this message.
    error: Option<Cow<'static, str>>,
}
538
539impl AckState {
540    fn into_ack(self) -> (String, Option<GrpcStatus>) {
541        let nonce = self.nonce;
542        let error = self.error.map(|message| GrpcStatus {
543            message: message.to_string(),
544            code: tonic::Code::InvalidArgument.into(),
545            ..Default::default()
546        });
547
548        (nonce, error)
549    }
550}
551
552impl<'a> AdsConnection<'a> {
553    fn new(node: xds_core::Node, cache: &'a mut Cache) -> (Self, Vec<DeltaDiscoveryRequest>) {
554        let mut requests = Vec::with_capacity(ResourceType::all().len());
555
556        let mut node = Some(node);
557        for &rtype in ResourceType::all() {
558            let initial_versions = cache.versions(rtype);
559            let mut subscribe = cache.initial_subscriptions(rtype);
560            if cache.is_wildcard(rtype) && !subscribe.is_empty() {
561                subscribe.push("*".to_string());
562            }
563
564            if !cache.is_wildcard(rtype) && subscribe.is_empty() && initial_versions.is_empty() {
565                continue;
566            }
567
568            requests.push(DeltaDiscoveryRequest {
569                node: node.take(),
570                type_url: rtype.type_url().to_string(),
571                resource_names_subscribe: subscribe,
572                initial_resource_versions: initial_versions,
573                ..Default::default()
574            });
575        }
576
577        let conn = Self {
578            cache,
579            node,
580            acks: Default::default(),
581            unknown_types: Vec::new(),
582        };
583        (conn, requests)
584    }
585
586    fn outgoing(&mut self) -> (Vec<DeltaDiscoveryRequest>, DnsUpdates) {
587        let mut responses = Vec::with_capacity(ResourceType::all().len());
588
589        // tee up invalid type messages.
590        //
591        // this should be a hyper rare ocurrence, so `take` the vec to reset the
592        // allocation to nothing instead of `drain` which keeps the capacity.
593        for (response_nonce, type_url) in std::mem::take(&mut self.unknown_types) {
594            let error_detail = Some(xds_api::pb::google::rpc::Status {
595                code: tonic::Code::InvalidArgument.into(),
596                message: "unknown type".to_string(),
597                ..Default::default()
598            });
599            responses.push(DeltaDiscoveryRequest {
600                type_url,
601                response_nonce,
602                error_detail,
603                ..Default::default()
604            })
605        }
606
607        // map changes into responses. DNS updates get passed through directly
608        let (resources, dns) = self.cache.collect();
609
610        // EnumMap::into_iter will always cover all variants as keys in xDS
611        // make-before-break order, so just iterating over `resources` here gets
612        // us responses in an appropriate order.
613        for (rtype, changes) in resources {
614            let ack = self.get_ack(rtype);
615
616            if ack.is_none() && changes.is_empty() {
617                continue;
618            }
619
620            let node = self.node.take();
621            let (response_nonce, error_detail) = ack.map(|a| a.into_ack()).unwrap_or_default();
622            let resource_names_subscribe = changes.added.into_iter().collect();
623            let resource_names_unsubscribe = changes.removed.into_iter().collect();
624
625            responses.push(DeltaDiscoveryRequest {
626                node,
627                type_url: rtype.type_url().to_string(),
628                response_nonce,
629                error_detail,
630                resource_names_subscribe,
631                resource_names_unsubscribe,
632                ..Default::default()
633            })
634        }
635
636        (responses, dns)
637    }
638
639    fn handle_ads_message(&mut self, resp: DeltaDiscoveryResponse) {
640        let Some(rtype) = ResourceType::from_type_url(&resp.type_url) else {
641            tracing::trace!(type_url = %resp.type_url, "unknown type url");
642            self.set_unknown(resp.nonce, resp.type_url);
643            return;
644        };
645
646        // add resources
647        let resources = match ResourceVec::from_resources(rtype, resp.resources) {
648            Ok(r) => r,
649            Err(e) => {
650                tracing::trace!(err = %e, "invalid proto");
651                self.set_ack(
652                    rtype,
653                    resp.nonce,
654                    Some(format!("invalid resource: {e}").into()),
655                );
656                return;
657            }
658        };
659
660        let resource_errors = self.cache.insert(resources);
661        let error = match &resource_errors[..] {
662            &[] => None,
663            // TOOD: actually generate a useful error message here
664            _ => Some("invalid resources".into()),
665        };
666        self.set_ack(rtype, resp.nonce, error);
667
668        // remove resources
669        self.cache.remove(rtype, &resp.removed_resources);
670    }
671
672    fn handle_subscription_update(&mut self, update: SubscriptionUpdate) {
673        match update {
674            SubscriptionUpdate::AddHosts(hosts) => {
675                for host in hosts {
676                    self.cache.subscribe(ResourceType::Listener, &host);
677                }
678            }
679            SubscriptionUpdate::RemoveHosts(hosts) => {
680                for host in hosts {
681                    self.cache.unsubscribe(ResourceType::Listener, &host);
682                }
683            }
684            SubscriptionUpdate::AddBackends(backends) => {
685                for backend in backends {
686                    if let Service::Dns(dns) = &backend.service {
687                        self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
688                    }
689                    self.cache.subscribe(ResourceType::Cluster, &backend.name());
690                }
691            }
692            SubscriptionUpdate::RemoveBackends(backends) => {
693                for backend in backends {
694                    if let Service::Dns(dns) = &backend.service {
695                        self.cache
696                            .unsubscribe_dns(dns.hostname.clone(), backend.port);
697                    }
698                    self.cache
699                        .unsubscribe(ResourceType::Cluster, &backend.name());
700                }
701            }
702            SubscriptionUpdate::AddEndpoints(backends) => {
703                for backend in backends {
704                    match &backend.service {
705                        Service::Dns(dns) => {
706                            self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
707                        }
708                        _ => self
709                            .cache
710                            .subscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
711                    }
712                }
713            }
714            SubscriptionUpdate::RemoveEndpoints(backends) => {
715                for backend in backends {
716                    match &backend.service {
717                        Service::Dns(dns) => {
718                            self.cache
719                                .unsubscribe_dns(dns.hostname.clone(), backend.port);
720                        }
721                        _ => self
722                            .cache
723                            .unsubscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
724                    }
725                }
726            }
727        }
728    }
729
730    fn set_unknown(&mut self, nonce: String, type_url: String) {
731        self.unknown_types.push((nonce, type_url))
732    }
733
734    fn set_ack(&mut self, rtype: ResourceType, nonce: String, error: Option<Cow<'static, str>>) {
735        self.acks[rtype] = Some(AckState { nonce, error })
736    }
737
738    fn get_ack(&mut self, rtype: ResourceType) -> Option<AckState> {
739        self.acks[rtype].take()
740    }
741}
742
/// Changes to the set of `(hostname, port)` pairs that should be resolved with
/// DNS, returned alongside outgoing requests by [AdsConnection::outgoing].
#[derive(Debug, Default, PartialEq, Eq)]
struct DnsUpdates {
    /// Names to subscribe to in the resolver.
    add: BTreeSet<(Hostname, u16)>,
    /// Names to unsubscribe from in the resolver.
    remove: BTreeSet<(Hostname, u16)>,
    // NOTE(review): not read in this file outside of `is_noop`; presumably
    // requests a full resync of DNS names - confirm against the cache module.
    sync: bool,
}
749
#[cfg(test)]
impl DnsUpdates {
    /// Returns `true` if there is nothing to apply: no names added or removed,
    /// and `sync` unset.
    fn is_noop(&self) -> bool {
        let names_unchanged = self.add.is_empty() && self.remove.is_empty();
        names_unchanged && !self.sync
    }
}
756
757#[cfg(test)]
758mod test_ads_conn {
759    use cache::Cache;
760    use once_cell::sync::Lazy;
761    use pretty_assertions::assert_eq;
762    use xds_api::pb::envoy::service::discovery::v3 as xds_discovery;
763
764    use super::test as xds_test;
765    use super::*;
766
767    static TEST_NODE: Lazy<xds_core::Node> = Lazy::new(|| xds_core::Node {
768        id: "unit-test".to_string(),
769        ..Default::default()
770    });
771
772    /// create a new connection with TEST_NODE and the given cache. asserts that
773    /// the first outgoing message has its Node set to TEST_NODE.
774    #[track_caller]
775    fn new_conn(cache: &mut Cache) -> (AdsConnection, Vec<DeltaDiscoveryRequest>) {
776        let (conn, mut outgoing) = AdsConnection::new(TEST_NODE.clone(), cache);
777
778        // assert the node is there
779        if let Some(first) = outgoing.first_mut() {
780            let node = first
781                .node
782                .take()
783                .expect("expected first outgoing request to have a node");
784
785            assert_eq!(node, *TEST_NODE);
786        };
787
788        (conn, outgoing)
789    }
790
791    #[test]
792    fn test_init_empty_wildcard() {
793        let mut cache = Cache::default();
794        cache.set_wildcard(ResourceType::Listener, true);
795        cache.set_wildcard(ResourceType::Cluster, true);
796
797        let (_, outgoing) = new_conn(&mut cache);
798
799        assert_eq!(
800            outgoing,
801            vec![
802                xds_test::req!(t = ResourceType::Cluster),
803                xds_test::req!(t = ResourceType::Listener),
804            ]
805        )
806    }
807
808    #[test]
809    fn test_init_empty_explicit() {
810        let mut cache = Cache::default();
811        cache.set_wildcard(ResourceType::Listener, false);
812        cache.set_wildcard(ResourceType::Cluster, false);
813
814        let (_, outgoing) = new_conn(&mut cache);
815        assert!(outgoing.is_empty());
816    }
817
818    #[test]
819    fn test_init_subscription_wildcard() {
820        let mut cache = Cache::default();
821        cache.set_wildcard(ResourceType::Listener, false);
822        cache.set_wildcard(ResourceType::Cluster, true);
823
824        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
825        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
826
827        // only the Clusters should have the wildcard sub, CLA should not, since it's
828        // not a wildcard-capable resource type
829        let (_, outgoing) = new_conn(&mut cache);
830        assert_eq!(
831            outgoing,
832            vec![
833                xds_test::req!(
834                    t = ResourceType::Cluster,
835                    add = vec!["cluster.example:7891", "*"],
836                    init = vec![],
837                ),
838                xds_test::req!(
839                    t = ResourceType::ClusterLoadAssignment,
840                    add = vec!["cluster.example:7891",],
841                    init = vec![],
842                )
843            ]
844        );
845    }
846
847    #[test]
848    fn test_init_subscription_explicit() {
849        let mut cache = Cache::default();
850        cache.set_wildcard(ResourceType::Listener, false);
851        cache.set_wildcard(ResourceType::Cluster, false);
852
853        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
854        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
855
856        let (_, outgoing) = new_conn(&mut cache);
857        assert_eq!(
858            outgoing,
859            vec![
860                xds_test::req!(
861                    t = ResourceType::Cluster,
862                    add = vec!["cluster.example:7891",],
863                    init = vec![],
864                ),
865                xds_test::req!(
866                    t = ResourceType::ClusterLoadAssignment,
867                    add = vec!["cluster.example:7891",],
868                    init = vec![],
869                ),
870            ]
871        );
872    }
873
874    #[test]
875    fn test_init_initial_versions() {
876        let mut cache = Cache::default();
877        assert!(cache.is_wildcard(ResourceType::Listener));
878        assert!(!cache.is_wildcard(ResourceType::RouteConfiguration));
879
880        cache.insert(ResourceVec::from_listeners(
881            "123".into(),
882            vec![xds_test::listener!("cooler.example.org", "cool-route")],
883        ));
884        cache.insert(ResourceVec::from_listeners(
885            "456".into(),
886            vec![xds_test::listener!("warmer.example.org", "warm-route")],
887        ));
888        cache.insert(ResourceVec::from_route_configs(
889            "789".into(),
890            vec![xds_test::route_config!(
891                "cool-route",
892                vec![xds_test::vhost!(
893                    "an-vhost",
894                    ["cooler.example.org"],
895                    [xds_test::route!(default "cooler.example.internal:8008")]
896                )]
897            )],
898        ));
899
900        // both wildcard and non-wildcard should start with an empty add list
901        // but resources in init
902        let (_, outgoing) = new_conn(&mut cache);
903        assert_eq!(
904            outgoing,
905            vec![
906                xds_test::req!(
907                    t = ResourceType::Cluster,
908                    add = vec!["cooler.example.internal:8008", "*"],
909                    init = vec![],
910                ),
911                xds_test::req!(
912                    t = ResourceType::Listener,
913                    add = vec![],
914                    init = vec![("cooler.example.org", "123"), ("warmer.example.org", "456"),]
915                ),
916                xds_test::req!(
917                    t = ResourceType::RouteConfiguration,
918                    add = vec!["warm-route"],
919                    init = vec![("cool-route", "789")]
920                ),
921            ],
922        );
923    }
924
925    #[test]
926    fn test_handle_subscribe_hostname() {
927        let mut cache = Cache::default();
928        let (mut conn, _) = new_conn(&mut cache);
929
930        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![
931            Service::dns("website.internal").unwrap().name(),
932            Service::kube("default", "nginx")
933                .unwrap()
934                .as_backend_id(4443)
935                .name(),
936        ]));
937
938        let (outgoing, dns) = conn.outgoing();
939        // dns should not update on listeners
940        assert!(dns.is_noop());
941        assert_eq!(
942            outgoing,
943            vec![xds_test::req!(
944                t = ResourceType::Listener,
945                add = vec!["nginx.default.svc.cluster.local:4443", "website.internal"],
946            )]
947        );
948    }
949
950    #[test]
951    fn test_handle_subscribe_backend() {
952        let mut cache = Cache::default();
953        let (mut conn, _) = new_conn(&mut cache);
954
955        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![
956            Service::dns("website.internal").unwrap().as_backend_id(80),
957            Service::kube("default", "nginx")
958                .unwrap()
959                .as_backend_id(4443),
960        ]));
961
962        let (outgoing, dns) = conn.outgoing();
963        // dns shouldn't preemptively update on dns backends
964        assert_eq!(
965            dns,
966            DnsUpdates {
967                add: [(Hostname::from_static("website.internal"), 80)]
968                    .into_iter()
969                    .collect(),
970                ..Default::default()
971            }
972        );
973
974        // should generate xds for clusters
975        assert_eq!(
976            outgoing,
977            vec![xds_test::req!(
978                t = ResourceType::Cluster,
979                add = vec![
980                    "nginx.default.svc.cluster.local:4443",
981                    "website.internal:80"
982                ],
983            )]
984        );
985    }
986
987    #[test]
988    fn test_handle_ads_message_listener_route() {
989        let mut cache = Cache::default();
990        assert!(cache.is_wildcard(ResourceType::Listener));
991
992        let (mut conn, _) = new_conn(&mut cache);
993
994        conn.handle_ads_message(xds_test::resp!(
995            n = "1",
996            add = ResourceVec::from_listeners(
997                "123".into(),
998                vec![xds_test::listener!("cooler.example.org", "cool-route")],
999            ),
1000            remove = vec![],
1001        ));
1002        conn.handle_ads_message(xds_test::resp!(
1003            n = "2",
1004            add = ResourceVec::from_listeners(
1005                "456".into(),
1006                vec![xds_test::listener!("warmer.example.org", "warm-route")],
1007            ),
1008            remove = vec![],
1009        ));
1010        conn.handle_ads_message(xds_test::resp!(
1011            n = "3",
1012            add = ResourceVec::from_route_configs(
1013                "789".into(),
1014                vec![xds_test::route_config!(
1015                    "cool-route",
1016                    vec![xds_test::vhost!(
1017                        "an-vhost",
1018                        ["cooler.example.org"],
1019                        [xds_test::route!(default "cooler.example.internal:8008")]
1020                    )]
1021                )],
1022            ),
1023            remove = vec![],
1024        ));
1025
1026        let (outgoing, dns) = conn.outgoing();
1027        // no dns changes until we get a cluster
1028        assert!(dns.is_noop());
1029
1030        assert_eq!(
1031            outgoing,
1032            vec![
1033                // new resource subs
1034                xds_test::req!(
1035                    t = ResourceType::Cluster,
1036                    add = vec!["cooler.example.internal:8008"]
1037                ),
1038                // listener ack
1039                xds_test::req!(t = ResourceType::Listener, n = "2"),
1040                // route config acks and new sub
1041                xds_test::req!(
1042                    t = ResourceType::RouteConfiguration,
1043                    n = "3",
1044                    add = vec!["warm-route"]
1045                ),
1046            ],
1047        );
1048    }
1049
1050    #[test]
1051    fn test_handle_ads_message_listener_removed() {
1052        let mut cache = Cache::default();
1053        assert!(cache.is_wildcard(ResourceType::Listener));
1054
1055        let (mut conn, _) = new_conn(&mut cache);
1056
1057        conn.handle_ads_message(xds_test::resp!(
1058            n = "1",
1059            add = ResourceVec::from_listeners(
1060                "123".into(),
1061                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1062            ),
1063            remove = vec![],
1064        ));
1065        conn.handle_ads_message(xds_test::resp!(
1066            n = "2",
1067            add = ResourceVec::from_listeners(
1068                "456".into(),
1069                vec![xds_test::listener!("warmer.example.org", "warm-route")],
1070            ),
1071            remove = vec![],
1072        ));
1073        conn.handle_ads_message(xds_test::resp!(
1074            n = "3",
1075            add = ResourceVec::from_route_configs(
1076                "789".into(),
1077                vec![xds_test::route_config!(
1078                    "cool-route",
1079                    vec![xds_test::vhost!(
1080                        "an-vhost",
1081                        ["cooler.example.org"],
1082                        [xds_test::route!(default "cooler.example.internal:8008")]
1083                    )]
1084                )],
1085            ),
1086            remove = vec![],
1087        ));
1088
1089        let (outgoing, dns) = conn.outgoing();
1090        // no dns changes until we get a cluster
1091        assert!(dns.is_noop());
1092
1093        assert_eq!(
1094            outgoing,
1095            vec![
1096                // new resource subs
1097                xds_test::req!(
1098                    t = ResourceType::Cluster,
1099                    add = vec!["cooler.example.internal:8008"]
1100                ),
1101                // listener ack
1102                xds_test::req!(t = ResourceType::Listener, n = "2"),
1103                // route config acks and new sub
1104                xds_test::req!(
1105                    t = ResourceType::RouteConfiguration,
1106                    n = "3",
1107                    add = vec!["warm-route"]
1108                ),
1109            ],
1110        );
1111
1112        // the server gets a delete for the listener we already have
1113        conn.handle_ads_message(xds_test::resp!(
1114            n = "4",
1115            add = ResourceVec::from_listeners("123".into(), vec![]),
1116            remove = vec!["warmer.example.org"],
1117        ));
1118
1119        let (outgoing, dns) = conn.outgoing();
1120        assert!(dns.is_noop());
1121        assert_eq!(
1122            outgoing,
1123            vec![
1124                // listener ack
1125                xds_test::req!(t = ResourceType::Listener, n = "4"),
1126                // route config remove
1127                xds_test::req!(
1128                    t = ResourceType::RouteConfiguration,
1129                    remove = vec!["warm-route"],
1130                ),
1131            ]
1132        );
1133    }
1134
1135    #[test]
1136    fn test_handle_ads_message_cluster_cla() {
1137        let mut cache = Cache::default();
1138        assert!(cache.is_wildcard(ResourceType::Cluster));
1139
1140        let (mut conn, _) = new_conn(&mut cache);
1141
1142        conn.handle_ads_message(xds_test::resp!(
1143            n = "1",
1144            add = ResourceVec::from_clusters(
1145                "123".into(),
1146                vec![
1147                    xds_test::cluster!("cooler.example.org:2345"),
1148                    xds_test::cluster!("thing.default.svc.cluster.local:9876"),
1149                ],
1150            ),
1151            remove = vec![],
1152        ));
1153        conn.handle_ads_message(xds_test::resp!(
1154            n = "2",
1155            add = ResourceVec::from_load_assignments(
1156                "123".into(),
1157                vec![xds_test::cla!(
1158                    "thing.default.svc.cluster.local:9876" => {
1159                        "zone1" => ["1.1.1.1"]
1160                    }
1161                )],
1162            ),
1163            remove = vec![],
1164        ));
1165        conn.handle_ads_message(xds_test::resp!(
1166            n = "3",
1167            add = ResourceVec::from_listeners("555".into(), vec![
1168                xds_test::listener!("cooler.example.org.lb.jct:2345", "lb-route" => [xds_test::vhost!(
1169                    "lb-vhost",
1170                    ["cooler.example.org.lb.jct:2345"],
1171                    [xds_test::route!(default ring_hash = "x-user", "cooler.example.org:2345")],
1172                )]),
1173                xds_test::listener!("thing.default.svc.cluster.local.lb.jct:9876", "lb-route" => [xds_test::vhost!(
1174                    "lb-vhost",
1175                    ["cooler.example.org.lb.jct:2345"],
1176                    [xds_test::route!(default ring_hash = "x-user", "thing.default.svc.cluster.local:9876")],
1177                )])
1178            ]),
1179            remove = vec![],
1180        ));
1181
1182        let (outgoing, dns) = conn.outgoing();
1183        // dns changes, we got a dns cluster
1184        assert_eq!(
1185            dns,
1186            DnsUpdates {
1187                add: [(Hostname::from_static("cooler.example.org"), 2345)]
1188                    .into_iter()
1189                    .collect(),
1190                ..Default::default()
1191            }
1192        );
1193        // should generate ACKs
1194        assert_eq!(
1195            outgoing,
1196            vec![
1197                xds_test::req!(t = ResourceType::Cluster, n = "1"),
1198                xds_test::req!(t = ResourceType::ClusterLoadAssignment, n = "2"),
1199                xds_test::req!(t = ResourceType::Listener, n = "3"),
1200            ]
1201        );
1202    }
1203
1204    #[test]
1205    fn test_set_node_after_init() {
1206        let mut cache = Cache::default();
1207        for rtype in ResourceType::all() {
1208            cache.set_wildcard(*rtype, false);
1209        }
1210
1211        let (mut conn, outgoing) = new_conn(&mut cache);
1212        assert!(outgoing.is_empty());
1213
1214        let svc = Service::dns("website.internal").unwrap().as_backend_id(80);
1215        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![svc]));
1216
1217        let (outgoing, _) = conn.outgoing();
1218        assert_eq!(outgoing[0].node.as_ref(), Some(&*TEST_NODE));
1219    }
1220
1221    #[test]
1222    fn test_handle_unknown_type_url() {
1223        let mut cache = Cache::default();
1224        let (mut conn, _) = new_conn(&mut cache);
1225
1226        conn.handle_ads_message(DeltaDiscoveryResponse {
1227            type_url: "made.up.type_url/Potato".to_string(),
1228            ..Default::default()
1229        });
1230
1231        let (outgoing, dns) = conn.outgoing();
1232        assert!(dns.is_noop());
1233        assert_eq!(
1234            outgoing,
1235            vec![DeltaDiscoveryRequest {
1236                type_url: "made.up.type_url/Potato".to_string(),
1237                error_detail: Some(xds_api::pb::google::rpc::Status {
1238                    code: tonic::Code::InvalidArgument.into(),
1239                    message: "unknown type".to_string(),
1240                    ..Default::default()
1241                }),
1242                ..Default::default()
1243            }]
1244        );
1245    }
1246
1247    #[test]
1248    fn test_handle_invalid_resource() {
1249        let mut cache = Cache::default();
1250        let (mut conn, _) = new_conn(&mut cache);
1251
1252        let node = xds_core::Node {
1253            id: "some-node".to_string(),
1254            ..Default::default()
1255        };
1256        conn.handle_ads_message(DeltaDiscoveryResponse {
1257            type_url: ResourceType::Listener.type_url().to_string(),
1258            resources: vec![xds_discovery::Resource {
1259                resource: Some(protobuf::Any::from_msg(&node).unwrap()),
1260                ..Default::default()
1261            }],
1262            ..Default::default()
1263        });
1264
1265        let (outgoing, dns) = conn.outgoing();
1266        assert!(dns.is_noop());
1267        assert!(matches!(
1268            &outgoing[..],
1269            [DeltaDiscoveryRequest { type_url, error_detail, ..}] if
1270                type_url == ResourceType::Listener.type_url() &&
1271                error_detail.as_ref().is_some_and(|e| e.message.starts_with("invalid resource"))
1272        ));
1273    }
1274
1275    #[test]
1276    fn test_handle_does_not_exist() {
1277        let mut cache = Cache::default();
1278        let (mut conn, _) = new_conn(&mut cache);
1279
1280        // handle a subscription update
1281        let does_not_exist = Service::dns("website.internal").unwrap().name();
1282        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![does_not_exist.clone()]));
1283        let _ = conn.outgoing();
1284
1285        conn.handle_ads_message(DeltaDiscoveryResponse {
1286            nonce: "boo".to_string(),
1287            type_url: ResourceType::Listener.type_url().to_string(),
1288            removed_resources: vec![does_not_exist.clone()],
1289            ..Default::default()
1290        });
1291
1292        // should generate an ACK immediately
1293        let (outgoing, dns) = conn.outgoing();
1294        assert!(dns.is_noop());
1295        assert_eq!(
1296            outgoing,
1297            vec![xds_test::req!(t = ResourceType::Listener, n = "boo")],
1298        );
1299
1300        // route should be tombstoned
1301        let route = cache
1302            .reader()
1303            .get_route("website.internal")
1304            .now_or_never()
1305            .unwrap();
1306        assert_eq!(route, None);
1307    }
1308}