1use crate::{
5 error::Error,
6 error::Result,
7 test_spec::{BucketOperation, WatchSpec},
8};
9use futures::StreamExt;
10use kube::{
11 api::{DynamicObject, Patch, PatchParams},
12 core::{ApiResource, GroupVersionKind},
13 runtime::watcher,
14 runtime::watcher::{Event, InitialListStrategy},
15 Api, Client, ResourceExt,
16};
17use serde_json;
18use serde_json::json;
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tokio::task::JoinSet;
23use tokio::time::{sleep, Duration};
24use tokio_util::sync::CancellationToken;
25
26const FINALIZER_NAME: &str = "blackjack.io/finalizer";
27
28pub struct Bucket {
29 pub allowed_operations: HashSet<BucketOperation>,
30 pub data: HashMap<String, serde_json::Value>,
31}
32
33impl Default for Bucket {
34 fn default() -> Self {
35 Bucket {
36 allowed_operations: HashSet::from([
37 BucketOperation::Create,
38 BucketOperation::Patch,
39 BucketOperation::Delete,
40 ]),
41 data: HashMap::new(),
42 }
43 }
44}
45
46impl Bucket {
47 pub fn new(allowed_operations: HashSet<BucketOperation>) -> Self {
48 Bucket {
49 allowed_operations,
50 data: HashMap::new(),
51 }
52 }
53}
54
55pub type Buckets = HashMap<String, Bucket>;
56
57pub struct CollectedData {
58 pub buckets: Buckets,
59}
60pub type CollectedDataContainer = Arc<Mutex<CollectedData>>;
61
62impl CollectedData {
63 pub fn new() -> Self {
64 CollectedData {
65 buckets: HashMap::new(),
66 }
67 }
68
69 pub fn contains(&self, uid: &str) -> bool {
70 for (_, bucket) in &self.buckets {
71 if bucket.data.contains_key(uid) {
72 return true;
73 }
74 }
75 false
76 }
77
78 pub async fn cleanup(&self, client: Client) -> Result<()> {
79 let uids: Vec<String> = {
80 self.buckets
81 .iter()
82 .flat_map(|(_, bucket)| bucket.data.iter())
83 .map(|(uid, _)| uid.clone())
84 .collect()
85 };
86
87 for uid in uids {
88 log::debug!("Removing finalizer for {uid}");
89 let resource = {
90 self.buckets
91 .values()
92 .flat_map(|bucket| bucket.data.get(&uid))
93 .next()
94 .cloned()
95 };
96
97 if let Some(resource_value) = resource {
98 let obj: DynamicObject = serde_json::from_value(resource_value)?;
99 let name = obj.name_any();
100 let namespace = obj.namespace().unwrap_or_default();
101 let api: Api<DynamicObject> = Api::namespaced_with(
102 client.clone(),
103 &namespace,
104 &ApiResource::from_gvk(&GroupVersionKind::try_from(
105 &obj.types.unwrap_or_default(),
106 )?),
107 );
108
109 let patch = json!({
110 "metadata": {
111 "finalizers": null
112 }
113 });
114 let patch_params = PatchParams::default();
115 log::debug!("calling API");
116 match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
117 Ok(_) => log::debug!("Removed finalizer from '{}'", name),
118 Err(e) => log::warn!("Failed to remove finalizer from '{}': {}", name, e),
119 }
120 } else {
121 log::warn!("Resource with UID '{}' not found in collected_data", uid);
122 }
123 }
124
125 Ok(())
126 }
127}
128
129struct CollectorBrief {
130 client: Client,
131 namespace: String,
132 api_resource: ApiResource,
133 spec: WatchSpec,
134 collected_data: CollectedDataContainer,
135 token: CancellationToken,
136}
137
138pub struct Collector {
139 token: CancellationToken,
140 tasks: JoinSet<Result<()>>,
141}
142
143impl Collector {
144 pub fn new_data() -> CollectedDataContainer {
145 CollectedDataContainer::new(Mutex::new(CollectedData::new()))
146 }
147
148 pub async fn new(
149 client: Client,
150 specs: Vec<WatchSpec>,
151 collected_data: CollectedDataContainer,
152 ) -> Result<Self> {
153 let token = CancellationToken::new();
154 let mut tasks = JoinSet::new();
155 for spec in specs {
156 let brief = CollectorBrief {
157 client: client.clone(),
158 namespace: spec.namespace.clone(),
159 collected_data: collected_data.clone(),
160 token: token.clone(),
161 api_resource: ApiResource::from_gvk(&GroupVersionKind::gvk(
162 &spec.group,
163 &spec.version,
164 &spec.kind,
165 )),
166 spec,
167 };
168
169 tasks.spawn(async move { brief.start().await });
170 }
171
172 Ok(Collector { token, tasks })
173 }
174
175 pub async fn stop(&mut self) -> Result<()> {
176 assert!(!self.token.is_cancelled());
177
178 self.token.cancel();
179 let tasks = std::mem::take(&mut self.tasks);
180 log::debug!("joining all watcher");
181 let results = tasks.join_all().await;
182 let errors: Vec<Error> = results.into_iter().filter_map(|res| res.err()).collect();
183 log::debug!("num errors: {}", errors.len());
184 match errors.len() {
185 0 => Ok(()),
186 1 => Err(errors.into_iter().next().unwrap()),
187 _ => Err(Error::MultipleErrors(errors)),
188 }
189 }
190}
191
192impl CollectorBrief {
193 async fn handle_apply(&self, api: Api<DynamicObject>, obj: DynamicObject) -> Result<()> {
194 let name = obj.name_any();
195 let uid = obj.metadata.uid.clone().unwrap();
196 let mut data = self.collected_data.lock().await;
197 let is_marked_for_deletion = obj.metadata.deletion_timestamp.is_some();
198 let mut is_stored = (*data).contains(&uid);
199 let mut has_finalizer = obj.finalizers().contains(&FINALIZER_NAME.to_string());
200 if !is_stored && !is_marked_for_deletion {
201 if !has_finalizer {
202 let patch = json!({
203 "metadata": {
204 "finalizers": [FINALIZER_NAME]
205 }
206 });
207 let patch_params = PatchParams::default();
208 match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
209 Ok(_) => {
210 has_finalizer = true;
211 log::debug!("Added finalizer to '{}'", name);
212 }
213 Err(e) => {
214 log::debug!("Failed to add finalizer to '{}': {}", name, e);
215 }
216 }
217 }
218 }
219 if is_marked_for_deletion {
220 if is_stored {
221 is_stored = false;
222 for (_, bucket) in &mut (*data).buckets {
223 if bucket.allowed_operations.contains(&BucketOperation::Delete) {
224 bucket.data.remove(&uid);
225 } else {
226 is_stored = true;
227 }
228 }
229 }
230 if has_finalizer && !is_stored {
231 let patch = json!({
232 "metadata": {
233 "finalizers": null
234 }
235 });
236 let patch_params = PatchParams::default();
237 match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
238 Ok(_) => log::debug!("Removed finalizer from '{}'", name),
239 Err(e) => log::debug!("Failed to remove finalizer from '{}': {}", name, e),
240 }
241 }
242 } else {
243 let value = serde_json::to_value(&obj).unwrap_or_else(|_| serde_json::Value::Null);
244 let bucket = (*data)
245 .buckets
246 .entry(self.spec.name.clone())
247 .or_insert_with(Default::default);
248 if (!bucket.data.contains_key(&uid)
249 && bucket.allowed_operations.contains(&BucketOperation::Create))
250 || (bucket.data.contains_key(&uid)
251 && bucket.allowed_operations.contains(&BucketOperation::Patch))
252 {
253 bucket.data.insert(uid, value);
254 }
255 }
256 Ok(())
257 }
258
259 async fn start(&self) -> Result<()> {
260 let api: Api<DynamicObject> =
261 Api::namespaced_with(self.client.clone(), &self.namespace, &self.api_resource);
262 let label_selector = self
263 .spec
264 .labels
265 .as_ref()
266 .and_then(|labels| {
267 Some(
268 labels
269 .iter()
270 .map(|(k, v)| format!("{}={}", k, v))
271 .collect::<Vec<_>>()
272 .join(","),
273 )
274 })
275 .or_else(|| Some(String::new()))
276 .unwrap();
277 let field_selector = self
278 .spec
279 .fields
280 .as_ref()
281 .and_then(|fields| {
282 Some(
283 fields
284 .iter()
285 .map(|(k, v)| format!("{}={}", k, v))
286 .collect::<Vec<_>>()
287 .join(","),
288 )
289 })
290 .or_else(|| Some(String::new()))
291 .unwrap();
292
293 let config = watcher::Config {
294 label_selector: Some(label_selector),
295 field_selector: Some(field_selector),
296 initial_list_strategy: InitialListStrategy::ListWatch,
297 ..watcher::Config::default()
298 };
299 let mut stream = watcher(api.clone(), config).boxed();
300
301 while let Some(event) = tokio::select! {
302 biased;
303 _ = self.token.cancelled() => {
304 log::debug!("Watcher for id '{}' received cancellation signal.", self.spec.name);
305 None
306 }
307 event = stream.next() => event,
308 } {
309 let result = match event {
310 Ok(Event::Apply(obj)) | Ok(Event::InitApply(obj)) => match obj.uid() {
311 Some(_) => self.handle_apply(api.clone(), obj).await,
312 None => Err(Error::NoUidError),
313 },
314 Ok(_) => Ok(()),
315 Err(e) => Err(Error::WatcherError(e)),
316 };
317 match result {
318 Ok(_) => {}
319 Err(e) => {
320 log::warn!(
321 "Error watching resource for watch '{}': {}",
322 self.spec.name,
323 e
324 );
325 sleep(Duration::from_secs(10)).await;
326 log::debug!("Resuming to watch resource'{}'", self.spec.name);
327 }
328 }
329 }
330
331 log::debug!("Watcher for id '{}' is terminating.", self.spec.name);
332
333 Ok(())
334 }
335}