blackjack/
collector.rs

1// Copyright 2024 Ole Kliemann
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    error::Error,
6    error::Result,
7    test_spec::{BucketOperation, WatchSpec},
8};
9use futures::StreamExt;
10use kube::{
11    api::{DynamicObject, Patch, PatchParams},
12    core::{ApiResource, GroupVersionKind},
13    runtime::watcher,
14    runtime::watcher::{Event, InitialListStrategy},
15    Api, Client, ResourceExt,
16};
17use serde_json;
18use serde_json::json;
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tokio::sync::Mutex;
22use tokio::task::JoinSet;
23use tokio::time::{sleep, Duration};
24use tokio_util::sync::CancellationToken;
25
26const FINALIZER_NAME: &str = "blackjack.io/finalizer";
27
/// A named collection of observed resources together with the operations
/// that the collector is allowed to apply to it.
pub struct Bucket {
    /// Operations the collector may perform on `data`: `Create` gates inserting
    /// a new UID, `Patch` gates overwriting an existing one and `Delete` gates
    /// removing an entry once the resource is marked for deletion.
    pub allowed_operations: HashSet<BucketOperation>,
    /// Last observed state of each resource, keyed by the resource UID and
    /// stored as the JSON serialisation of the watched object.
    pub data: HashMap<String, serde_json::Value>,
}
32
33impl Default for Bucket {
34    fn default() -> Self {
35        Bucket {
36            allowed_operations: HashSet::from([
37                BucketOperation::Create,
38                BucketOperation::Patch,
39                BucketOperation::Delete,
40            ]),
41            data: HashMap::new(),
42        }
43    }
44}
45
46impl Bucket {
47    pub fn new(allowed_operations: HashSet<BucketOperation>) -> Self {
48        Bucket {
49            allowed_operations,
50            data: HashMap::new(),
51        }
52    }
53}
54
/// All buckets of a collection, keyed by bucket name (the collector uses the
/// name of the watch spec as the key).
pub type Buckets = HashMap<String, Bucket>;
56
/// Everything the collector has recorded so far, grouped into named buckets.
pub struct CollectedData {
    /// Buckets by name; each one maps resource UIDs to their last stored state.
    pub buckets: Buckets,
}
/// Shared, async-mutex-protected handle to the collected data; cloned into
/// every watcher task.
pub type CollectedDataContainer = Arc<Mutex<CollectedData>>;
61
62impl CollectedData {
63    pub fn new() -> Self {
64        CollectedData {
65            buckets: HashMap::new(),
66        }
67    }
68
69    pub fn contains(&self, uid: &str) -> bool {
70        for (_, bucket) in &self.buckets {
71            if bucket.data.contains_key(uid) {
72                return true;
73            }
74        }
75        false
76    }
77
78    pub async fn cleanup(&self, client: Client) -> Result<()> {
79        let uids: Vec<String> = {
80            self.buckets
81                .iter()
82                .flat_map(|(_, bucket)| bucket.data.iter())
83                .map(|(uid, _)| uid.clone())
84                .collect()
85        };
86
87        for uid in uids {
88            log::debug!("Removing finalizer for {uid}");
89            let resource = {
90                self.buckets
91                    .values()
92                    .flat_map(|bucket| bucket.data.get(&uid))
93                    .next()
94                    .cloned()
95            };
96
97            if let Some(resource_value) = resource {
98                let obj: DynamicObject = serde_json::from_value(resource_value)?;
99                let name = obj.name_any();
100                let namespace = obj.namespace().unwrap_or_default();
101                let api: Api<DynamicObject> = Api::namespaced_with(
102                    client.clone(),
103                    &namespace,
104                    &ApiResource::from_gvk(&GroupVersionKind::try_from(
105                        &obj.types.unwrap_or_default(),
106                    )?),
107                );
108
109                let patch = json!({
110                    "metadata": {
111                        "finalizers": null
112                    }
113                });
114                let patch_params = PatchParams::default();
115                log::debug!("calling API");
116                match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
117                    Ok(_) => log::debug!("Removed finalizer from '{}'", name),
118                    Err(e) => log::warn!("Failed to remove finalizer from '{}': {}", name, e),
119                }
120            } else {
121                log::warn!("Resource with UID '{}' not found in collected_data", uid);
122            }
123        }
124
125        Ok(())
126    }
127}
128
/// Everything a single watcher task needs in order to run on its own.
struct CollectorBrief {
    /// Kubernetes client used for watching and patching.
    client: Client,
    /// Namespace the watch is restricted to (copied from the watch spec).
    namespace: String,
    /// API resource derived from the group, version and kind of the watch spec.
    api_resource: ApiResource,
    /// The watch specification; its name doubles as the bucket name.
    spec: WatchSpec,
    /// Shared storage the observed objects are written to.
    collected_data: CollectedDataContainer,
    /// Clone of the collector's cancellation token; ends the watch loop.
    token: CancellationToken,
}
137
/// Owns the background watcher tasks, one per watch spec, and the token used
/// to stop them.
pub struct Collector {
    /// Cancelled by `stop` to ask every watcher task to terminate.
    token: CancellationToken,
    /// The spawned watcher tasks; joined by `stop`.
    tasks: JoinSet<Result<()>>,
}
142
143impl Collector {
144    pub fn new_data() -> CollectedDataContainer {
145        CollectedDataContainer::new(Mutex::new(CollectedData::new()))
146    }
147
148    pub async fn new(
149        client: Client,
150        specs: Vec<WatchSpec>,
151        collected_data: CollectedDataContainer,
152    ) -> Result<Self> {
153        let token = CancellationToken::new();
154        let mut tasks = JoinSet::new();
155        for spec in specs {
156            let brief = CollectorBrief {
157                client: client.clone(),
158                namespace: spec.namespace.clone(),
159                collected_data: collected_data.clone(),
160                token: token.clone(),
161                api_resource: ApiResource::from_gvk(&GroupVersionKind::gvk(
162                    &spec.group,
163                    &spec.version,
164                    &spec.kind,
165                )),
166                spec,
167            };
168
169            tasks.spawn(async move { brief.start().await });
170        }
171
172        Ok(Collector { token, tasks })
173    }
174
175    pub async fn stop(&mut self) -> Result<()> {
176        assert!(!self.token.is_cancelled());
177
178        self.token.cancel();
179        let tasks = std::mem::take(&mut self.tasks);
180        log::debug!("joining all watcher");
181        let results = tasks.join_all().await;
182        let errors: Vec<Error> = results.into_iter().filter_map(|res| res.err()).collect();
183        log::debug!("num errors: {}", errors.len());
184        match errors.len() {
185            0 => Ok(()),
186            1 => Err(errors.into_iter().next().unwrap()),
187            _ => Err(Error::MultipleErrors(errors)),
188        }
189    }
190}
191
192impl CollectorBrief {
193    async fn handle_apply(&self, api: Api<DynamicObject>, obj: DynamicObject) -> Result<()> {
194        let name = obj.name_any();
195        let uid = obj.metadata.uid.clone().unwrap();
196        let mut data = self.collected_data.lock().await;
197        let is_marked_for_deletion = obj.metadata.deletion_timestamp.is_some();
198        let mut is_stored = (*data).contains(&uid);
199        let mut has_finalizer = obj.finalizers().contains(&FINALIZER_NAME.to_string());
200        if !is_stored && !is_marked_for_deletion {
201            if !has_finalizer {
202                let patch = json!({
203                    "metadata": {
204                        "finalizers": [FINALIZER_NAME]
205                    }
206                });
207                let patch_params = PatchParams::default();
208                match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
209                    Ok(_) => {
210                        has_finalizer = true;
211                        log::debug!("Added finalizer to '{}'", name);
212                    }
213                    Err(e) => {
214                        log::debug!("Failed to add finalizer to '{}': {}", name, e);
215                    }
216                }
217            }
218        }
219        if is_marked_for_deletion {
220            if is_stored {
221                is_stored = false;
222                for (_, bucket) in &mut (*data).buckets {
223                    if bucket.allowed_operations.contains(&BucketOperation::Delete) {
224                        bucket.data.remove(&uid);
225                    } else {
226                        is_stored = true;
227                    }
228                }
229            }
230            if has_finalizer && !is_stored {
231                let patch = json!({
232                    "metadata": {
233                        "finalizers": null
234                    }
235                });
236                let patch_params = PatchParams::default();
237                match api.patch(&name, &patch_params, &Patch::Merge(&patch)).await {
238                    Ok(_) => log::debug!("Removed finalizer from '{}'", name),
239                    Err(e) => log::debug!("Failed to remove finalizer from '{}': {}", name, e),
240                }
241            }
242        } else {
243            let value = serde_json::to_value(&obj).unwrap_or_else(|_| serde_json::Value::Null);
244            let bucket = (*data)
245                .buckets
246                .entry(self.spec.name.clone())
247                .or_insert_with(Default::default);
248            if (!bucket.data.contains_key(&uid)
249                && bucket.allowed_operations.contains(&BucketOperation::Create))
250                || (bucket.data.contains_key(&uid)
251                    && bucket.allowed_operations.contains(&BucketOperation::Patch))
252            {
253                bucket.data.insert(uid, value);
254            }
255        }
256        Ok(())
257    }
258
259    async fn start(&self) -> Result<()> {
260        let api: Api<DynamicObject> =
261            Api::namespaced_with(self.client.clone(), &self.namespace, &self.api_resource);
262        let label_selector = self
263            .spec
264            .labels
265            .as_ref()
266            .and_then(|labels| {
267                Some(
268                    labels
269                        .iter()
270                        .map(|(k, v)| format!("{}={}", k, v))
271                        .collect::<Vec<_>>()
272                        .join(","),
273                )
274            })
275            .or_else(|| Some(String::new()))
276            .unwrap();
277        let field_selector = self
278            .spec
279            .fields
280            .as_ref()
281            .and_then(|fields| {
282                Some(
283                    fields
284                        .iter()
285                        .map(|(k, v)| format!("{}={}", k, v))
286                        .collect::<Vec<_>>()
287                        .join(","),
288                )
289            })
290            .or_else(|| Some(String::new()))
291            .unwrap();
292
293        let config = watcher::Config {
294            label_selector: Some(label_selector),
295            field_selector: Some(field_selector),
296            initial_list_strategy: InitialListStrategy::ListWatch,
297            ..watcher::Config::default()
298        };
299        let mut stream = watcher(api.clone(), config).boxed();
300
301        while let Some(event) = tokio::select! {
302            biased;
303            _ = self.token.cancelled() => {
304                log::debug!("Watcher for id '{}' received cancellation signal.", self.spec.name);
305                None
306            }
307            event = stream.next() => event,
308        } {
309            let result = match event {
310                Ok(Event::Apply(obj)) | Ok(Event::InitApply(obj)) => match obj.uid() {
311                    Some(_) => self.handle_apply(api.clone(), obj).await,
312                    None => Err(Error::NoUidError),
313                },
314                Ok(_) => Ok(()),
315                Err(e) => Err(Error::WatcherError(e)),
316            };
317            match result {
318                Ok(_) => {}
319                Err(e) => {
320                    log::warn!(
321                        "Error watching resource for watch '{}': {}",
322                        self.spec.name,
323                        e
324                    );
325                    sleep(Duration::from_secs(10)).await;
326                    log::debug!("Resuming to watch resource'{}'", self.spec.name);
327                }
328            }
329        }
330
331        log::debug!("Watcher for id '{}' is terminating.", self.spec.name);
332
333        Ok(())
334    }
335}