rust_control_plane/
cache.rs

1use crate::snapshot::{Resources, Snapshot};
2use data_plane_api::envoy::config::core::v3::Node;
3use data_plane_api::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
4use log::info;
5use slab::Slab;
6use std::collections::{HashMap, HashSet};
7use std::time::Instant;
8use tokio::sync::mpsc;
9use tokio::sync::Mutex;
10
11#[derive(Debug)]
12pub struct Cache {
13    inner: Mutex<Inner>,
14    ads: bool,
15}
16
17#[derive(Debug)]
18struct Inner {
19    status: HashMap<String, NodeStatus>,
20    snapshots: HashMap<String, Snapshot>,
21}
22
23#[derive(Debug)]
24struct NodeStatus {
25    last_request_time: Instant,
26    watches: Slab<Watch>,
27}
28
29impl NodeStatus {
30    fn new() -> Self {
31        Self {
32            last_request_time: Instant::now(),
33            watches: Slab::new(),
34        }
35    }
36}
37
38#[derive(Clone, Debug)]
39pub struct WatchId {
40    node_id: String,
41    index: usize,
42}
43
44#[derive(Debug)]
45struct Watch {
46    req: DiscoveryRequest,
47    tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
48}
49
50pub enum FetchError {
51    VersionUpToDate,
52    NotFound,
53}
54
55impl Cache {
56    pub fn new(ads: bool) -> Self {
57        Self {
58            inner: Mutex::new(Inner::new()),
59            ads,
60        }
61    }
62
63    // Either responds on tx immediately, or sets a watch, returning a watch ID.
64    pub async fn create_watch(
65        &self,
66        req: &DiscoveryRequest,
67        tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
68        known_resource_names: &HashMap<String, HashSet<String>>,
69    ) -> Option<WatchId> {
70        let mut inner = self.inner.lock().await;
71        let node_id = hash_id(&req.node);
72        inner.update_node_status(&node_id);
73        if let Some(snapshot) = inner.snapshots.get(&node_id) {
74            let resources = snapshot.resources(&req.type_url);
75            let version = snapshot.version(&req.type_url);
76            let type_known_resource_names = known_resource_names.get(&req.type_url);
77            // Check if a different set of resources has been requested.
78            if inner.is_requesting_new_resources(req, resources, type_known_resource_names) {
79                if self.ads && check_ads_consistency(req, resources) {
80                    info!("not responding: ads consistency");
81                    return Some(inner.set_watch(&node_id, req, tx));
82                }
83                info!("responding: resource diff");
84                respond(req, tx, resources, version).await;
85                return None;
86            }
87            if req.version_info == version {
88                // Client is already at the latest version, so we have nothing to respond with.
89                // Set a watch because we may receive a new version in the future.
90                info!("set watch: latest version");
91                Some(inner.set_watch(&node_id, req, tx))
92            } else {
93                // The version has changed, so we should respond.
94                if self.ads && check_ads_consistency(req, resources) {
95                    info!("not responding: ads consistency");
96                    return Some(inner.set_watch(&node_id, req, tx));
97                }
98                info!("responding: new version");
99                respond(req, tx, resources, version).await;
100                None
101            }
102        } else {
103            // No snapshot exists for this node, so we have nothing to respond with.
104            // Set a watch because we may receive a snapshot for this node in the future.
105            info!("set watch: no snapshot");
106            Some(inner.set_watch(&node_id, req, tx))
107        }
108    }
109
110    // Deletes a watch previously created with create_watch.
111    pub async fn cancel_watch(&self, watch_id: &WatchId) {
112        let mut inner = self.inner.lock().await;
113        if let Some(status) = inner.status.get_mut(&watch_id.node_id) {
114            status.watches.try_remove(watch_id.index);
115        }
116    }
117
118    // Updates snapshot associated with a given node so that future requests receive it.
119    // Triggers existing watches for the given node.
120    pub async fn set_snapshot(&self, node: &str, snapshot: Snapshot) {
121        let mut inner = self.inner.lock().await;
122        inner.snapshots.insert(node.to_string(), snapshot.clone());
123        if let Some(status) = inner.status.get_mut(node) {
124            let mut to_delete = Vec::new();
125            for (watch_id, watch) in &mut status.watches {
126                let version = snapshot.version(&watch.req.type_url);
127                if version != watch.req.version_info {
128                    to_delete.push(watch_id)
129                }
130            }
131
132            for watch_id in to_delete {
133                let watch = status.watches.remove(watch_id);
134                let resources = snapshot.resources(&watch.req.type_url);
135                let version = snapshot.version(&watch.req.type_url);
136                info!(
137                    "watch triggered version={} type_url={}",
138                    version, &watch.req.type_url
139                );
140                respond(&watch.req, watch.tx, resources, version).await;
141            }
142        }
143    }
144
145    pub async fn fetch<'a>(
146        &'a self,
147        req: &'a DiscoveryRequest,
148        type_url: &'static str,
149    ) -> Result<DiscoveryResponse, FetchError> {
150        let inner = self.inner.lock().await;
151        let node_id = hash_id(&req.node);
152        let snapshot = inner.snapshots.get(&node_id).ok_or(FetchError::NotFound)?;
153        let version = snapshot.version(&req.type_url);
154        if req.version_info == version {
155            return Err(FetchError::VersionUpToDate);
156        }
157        let resources = snapshot.resources(type_url);
158        Ok(build_response(req, resources, version))
159    }
160
161    pub async fn node_status(&self) -> HashMap<String, Instant> {
162        let inner = self.inner.lock().await;
163        inner
164            .status
165            .iter()
166            .map(|(k, v)| (k.clone(), v.last_request_time))
167            .collect()
168    }
169}
170
171impl Inner {
172    fn new() -> Self {
173        Self {
174            status: HashMap::new(),
175            snapshots: HashMap::new(),
176        }
177    }
178
179    fn set_watch(
180        &mut self,
181        node_id: &str,
182        req: &DiscoveryRequest,
183        tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
184    ) -> WatchId {
185        let watch = Watch {
186            req: req.clone(),
187            tx,
188        };
189        let status = self.status.get_mut(node_id).unwrap();
190        let index = status.watches.insert(watch);
191        WatchId {
192            node_id: node_id.to_string(),
193            index,
194        }
195    }
196
197    fn update_node_status(&mut self, node_id: &str) {
198        self.status
199            .entry(node_id.to_string())
200            .and_modify(|entry| entry.last_request_time = Instant::now())
201            .or_insert_with(NodeStatus::new);
202    }
203
204    fn is_requesting_new_resources(
205        &self,
206        req: &DiscoveryRequest,
207        resources: Option<&Resources>,
208        type_known_resource_names: Option<&HashSet<String>>,
209    ) -> bool {
210        if let Some(resources) = resources {
211            if let Some(known_resource_names) = type_known_resource_names {
212                let mut diff = Vec::new();
213                for name in &req.resource_names {
214                    if !known_resource_names.contains(name) {
215                        diff.push(name)
216                    }
217                }
218                for name in diff {
219                    if resources.items.contains_key(name) {
220                        return true;
221                    }
222                }
223            }
224        }
225        false
226    }
227}
228
229fn hash_id(node: &Option<Node>) -> String {
230    node.as_ref().map_or(String::new(), |node| node.id.clone())
231}
232
233fn build_response(
234    req: &DiscoveryRequest,
235    resources: Option<&Resources>,
236    version: &str,
237) -> DiscoveryResponse {
238    let mut filtered_resources = Vec::new();
239    if let Some(resources) = resources {
240        if req.resource_names.is_empty() {
241            filtered_resources = resources
242                .items
243                .values()
244                .map(|resource| resource.into_any())
245                .collect();
246        } else {
247            for name in &req.resource_names {
248                if let Some(resource) = resources.items.get(name) {
249                    filtered_resources.push(resource.into_any())
250                }
251            }
252        }
253    }
254    DiscoveryResponse {
255        type_url: req.type_url.clone(),
256        nonce: String::new(),
257        version_info: version.to_string(),
258        resources: filtered_resources,
259        control_plane: None,
260        canary: false,
261    }
262}
263
264async fn respond(
265    req: &DiscoveryRequest,
266    tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
267    resources: Option<&Resources>,
268    version: &str,
269) {
270    let rep = build_response(req, resources, version);
271    tx.send((req.clone(), rep)).await.unwrap();
272}
273
274fn check_ads_consistency(req: &DiscoveryRequest, resources: Option<&Resources>) -> bool {
275    if !req.resource_names.is_empty() {
276        if let Some(resources) = resources {
277            let set: HashSet<&String> = HashSet::from_iter(req.resource_names.iter());
278            for (name, _) in resources.items.iter() {
279                if !set.contains(name) {
280                    return false;
281                }
282            }
283        }
284    }
285    true
286}