1use crate::snapshot::{Resources, Snapshot};
2use data_plane_api::envoy::config::core::v3::Node;
3use data_plane_api::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
4use log::info;
5use slab::Slab;
6use std::collections::{HashMap, HashSet};
7use std::time::Instant;
8use tokio::sync::mpsc;
9use tokio::sync::Mutex;
10
11#[derive(Debug)]
12pub struct Cache {
13 inner: Mutex<Inner>,
14 ads: bool,
15}
16
17#[derive(Debug)]
18struct Inner {
19 status: HashMap<String, NodeStatus>,
20 snapshots: HashMap<String, Snapshot>,
21}
22
23#[derive(Debug)]
24struct NodeStatus {
25 last_request_time: Instant,
26 watches: Slab<Watch>,
27}
28
29impl NodeStatus {
30 fn new() -> Self {
31 Self {
32 last_request_time: Instant::now(),
33 watches: Slab::new(),
34 }
35 }
36}
37
/// Handle identifying an open watch, returned by `Cache::create_watch` and
/// accepted by `Cache::cancel_watch`.
#[derive(Clone, Debug)]
pub struct WatchId {
    /// Id of the node that owns the watch.
    node_id: String,
    /// Key of the watch inside that node's slab of watches.
    index: usize,
}
43
44#[derive(Debug)]
45struct Watch {
46 req: DiscoveryRequest,
47 tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
48}
49
/// Reasons why `Cache::fetch` does not produce a response.
///
/// Implements `Debug`, `Display` and `std::error::Error` so that it can be
/// used with `unwrap`/`expect`, `{:?}` formatting and `?` into boxed errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchError {
    /// The version named in the request equals the snapshot's version.
    VersionUpToDate,
    /// No snapshot is stored for the requesting node.
    NotFound,
}

impl std::fmt::Display for FetchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            FetchError::VersionUpToDate => "requested version is already up to date",
            FetchError::NotFound => "no snapshot found for node",
        };
        f.write_str(message)
    }
}

impl std::error::Error for FetchError {}
54
55impl Cache {
56 pub fn new(ads: bool) -> Self {
57 Self {
58 inner: Mutex::new(Inner::new()),
59 ads,
60 }
61 }
62
63 pub async fn create_watch(
65 &self,
66 req: &DiscoveryRequest,
67 tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
68 known_resource_names: &HashMap<String, HashSet<String>>,
69 ) -> Option<WatchId> {
70 let mut inner = self.inner.lock().await;
71 let node_id = hash_id(&req.node);
72 inner.update_node_status(&node_id);
73 if let Some(snapshot) = inner.snapshots.get(&node_id) {
74 let resources = snapshot.resources(&req.type_url);
75 let version = snapshot.version(&req.type_url);
76 let type_known_resource_names = known_resource_names.get(&req.type_url);
77 if inner.is_requesting_new_resources(req, resources, type_known_resource_names) {
79 if self.ads && check_ads_consistency(req, resources) {
80 info!("not responding: ads consistency");
81 return Some(inner.set_watch(&node_id, req, tx));
82 }
83 info!("responding: resource diff");
84 respond(req, tx, resources, version).await;
85 return None;
86 }
87 if req.version_info == version {
88 info!("set watch: latest version");
91 Some(inner.set_watch(&node_id, req, tx))
92 } else {
93 if self.ads && check_ads_consistency(req, resources) {
95 info!("not responding: ads consistency");
96 return Some(inner.set_watch(&node_id, req, tx));
97 }
98 info!("responding: new version");
99 respond(req, tx, resources, version).await;
100 None
101 }
102 } else {
103 info!("set watch: no snapshot");
106 Some(inner.set_watch(&node_id, req, tx))
107 }
108 }
109
110 pub async fn cancel_watch(&self, watch_id: &WatchId) {
112 let mut inner = self.inner.lock().await;
113 if let Some(status) = inner.status.get_mut(&watch_id.node_id) {
114 status.watches.try_remove(watch_id.index);
115 }
116 }
117
118 pub async fn set_snapshot(&self, node: &str, snapshot: Snapshot) {
121 let mut inner = self.inner.lock().await;
122 inner.snapshots.insert(node.to_string(), snapshot.clone());
123 if let Some(status) = inner.status.get_mut(node) {
124 let mut to_delete = Vec::new();
125 for (watch_id, watch) in &mut status.watches {
126 let version = snapshot.version(&watch.req.type_url);
127 if version != watch.req.version_info {
128 to_delete.push(watch_id)
129 }
130 }
131
132 for watch_id in to_delete {
133 let watch = status.watches.remove(watch_id);
134 let resources = snapshot.resources(&watch.req.type_url);
135 let version = snapshot.version(&watch.req.type_url);
136 info!(
137 "watch triggered version={} type_url={}",
138 version, &watch.req.type_url
139 );
140 respond(&watch.req, watch.tx, resources, version).await;
141 }
142 }
143 }
144
145 pub async fn fetch<'a>(
146 &'a self,
147 req: &'a DiscoveryRequest,
148 type_url: &'static str,
149 ) -> Result<DiscoveryResponse, FetchError> {
150 let inner = self.inner.lock().await;
151 let node_id = hash_id(&req.node);
152 let snapshot = inner.snapshots.get(&node_id).ok_or(FetchError::NotFound)?;
153 let version = snapshot.version(&req.type_url);
154 if req.version_info == version {
155 return Err(FetchError::VersionUpToDate);
156 }
157 let resources = snapshot.resources(type_url);
158 Ok(build_response(req, resources, version))
159 }
160
161 pub async fn node_status(&self) -> HashMap<String, Instant> {
162 let inner = self.inner.lock().await;
163 inner
164 .status
165 .iter()
166 .map(|(k, v)| (k.clone(), v.last_request_time))
167 .collect()
168 }
169}
170
171impl Inner {
172 fn new() -> Self {
173 Self {
174 status: HashMap::new(),
175 snapshots: HashMap::new(),
176 }
177 }
178
179 fn set_watch(
180 &mut self,
181 node_id: &str,
182 req: &DiscoveryRequest,
183 tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
184 ) -> WatchId {
185 let watch = Watch {
186 req: req.clone(),
187 tx,
188 };
189 let status = self.status.get_mut(node_id).unwrap();
190 let index = status.watches.insert(watch);
191 WatchId {
192 node_id: node_id.to_string(),
193 index,
194 }
195 }
196
197 fn update_node_status(&mut self, node_id: &str) {
198 self.status
199 .entry(node_id.to_string())
200 .and_modify(|entry| entry.last_request_time = Instant::now())
201 .or_insert_with(NodeStatus::new);
202 }
203
204 fn is_requesting_new_resources(
205 &self,
206 req: &DiscoveryRequest,
207 resources: Option<&Resources>,
208 type_known_resource_names: Option<&HashSet<String>>,
209 ) -> bool {
210 if let Some(resources) = resources {
211 if let Some(known_resource_names) = type_known_resource_names {
212 let mut diff = Vec::new();
213 for name in &req.resource_names {
214 if !known_resource_names.contains(name) {
215 diff.push(name)
216 }
217 }
218 for name in diff {
219 if resources.items.contains_key(name) {
220 return true;
221 }
222 }
223 }
224 }
225 false
226 }
227}
228
229fn hash_id(node: &Option<Node>) -> String {
230 node.as_ref().map_or(String::new(), |node| node.id.clone())
231}
232
233fn build_response(
234 req: &DiscoveryRequest,
235 resources: Option<&Resources>,
236 version: &str,
237) -> DiscoveryResponse {
238 let mut filtered_resources = Vec::new();
239 if let Some(resources) = resources {
240 if req.resource_names.is_empty() {
241 filtered_resources = resources
242 .items
243 .values()
244 .map(|resource| resource.into_any())
245 .collect();
246 } else {
247 for name in &req.resource_names {
248 if let Some(resource) = resources.items.get(name) {
249 filtered_resources.push(resource.into_any())
250 }
251 }
252 }
253 }
254 DiscoveryResponse {
255 type_url: req.type_url.clone(),
256 nonce: String::new(),
257 version_info: version.to_string(),
258 resources: filtered_resources,
259 control_plane: None,
260 canary: false,
261 }
262}
263
264async fn respond(
265 req: &DiscoveryRequest,
266 tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
267 resources: Option<&Resources>,
268 version: &str,
269) {
270 let rep = build_response(req, resources, version);
271 tx.send((req.clone(), rep)).await.unwrap();
272}
273
274fn check_ads_consistency(req: &DiscoveryRequest, resources: Option<&Resources>) -> bool {
275 if !req.resource_names.is_empty() {
276 if let Some(resources) = resources {
277 let set: HashSet<&String> = HashSet::from_iter(req.resource_names.iter());
278 for (name, _) in resources.items.iter() {
279 if !set.contains(name) {
280 return false;
281 }
282 }
283 }
284 }
285 true
286}