Skip to main content

sk_store/
manager.rs

1use std::sync::Arc;
2
3use sk_core::k8s::DynamicApiSet;
4use sk_core::prelude::*;
5use tokio::sync::{
6    Mutex,
7    mpsc,
8};
9use tokio::task::JoinSet;
10use tracing::*;
11
12use crate::config::TracerConfig;
13use crate::event::TraceAction;
14use crate::store::TraceStore;
15use crate::watchers::{
16    dyn_obj_watcher,
17    pod_watcher,
18};
19
20pub struct TraceManager {
21    config: TracerConfig,
22    store: Arc<Mutex<TraceStore>>,
23    ready_rx: mpsc::Receiver<bool>,
24    js: JoinSet<()>,
25}
26
27impl TraceManager {
28    pub async fn start(client: kube::Client, config: TracerConfig) -> anyhow::Result<Self> {
29        let mut apiset = DynamicApiSet::new(client.clone());
30
31        let (ready_tx, ready_rx): (mpsc::Sender<bool>, mpsc::Receiver<bool>) =
32            mpsc::channel(config.tracked_objects.len() + 1);
33        let (dyn_obj_tx, dyn_obj_rx): (dyn_obj_watcher::Sender, dyn_obj_watcher::Receiver) = mpsc::unbounded_channel();
34        let (pod_tx, pod_rx): (pod_watcher::Sender, pod_watcher::Receiver) = mpsc::unbounded_channel();
35
36        let mut js = JoinSet::new();
37        for gvk in config.tracked_objects.keys() {
38            let watcher =
39                dyn_obj_watcher::new_with_stream(gvk, &mut apiset, dyn_obj_tx.clone(), ready_tx.clone()).await?;
40            js.spawn(watcher.start());
41        }
42
43        let pw = pod_watcher::new_with_stream(client.clone(), pod_tx, ready_tx.clone())?;
44        js.spawn(pw.start());
45
46        let store = Arc::new(Mutex::new(TraceStore::new(config.clone(), apiset)));
47        js.spawn(handle_messages(dyn_obj_rx, pod_rx, store.clone()));
48
49        Ok(TraceManager { config, store, ready_rx, js })
50    }
51
52    pub fn get_store(&self) -> Arc<Mutex<TraceStore>> {
53        self.store.clone()
54    }
55
56    pub async fn shutdown(&mut self) {
57        self.js.shutdown().await;
58    }
59
60    pub async fn wait_ready(&mut self) {
61        for _ in 0..self.config.tracked_objects.len() + 1 {
62            let _ = self.ready_rx.recv().await;
63        }
64    }
65}
66
67pub(crate) async fn handle_messages(
68    mut dyn_obj_rx: dyn_obj_watcher::Receiver,
69    mut pod_rx: pod_watcher::Receiver,
70    m_store: Arc<Mutex<TraceStore>>,
71) -> () {
72    loop {
73        tokio::select! {
74            Some(request) = dyn_obj_rx.recv() => {
75                let mut store = m_store.lock().await;
76                let res = match request.action {
77                    TraceAction::ObjectApplied => store.create_or_update_obj(&request.obj, request.ts),
78                    TraceAction::ObjectDeleted => store.delete_obj(&request.obj, request.ts),
79                };
80                if let Err(err) = res {
81                    error!(
82                        "could not send dynamic object update for ({:?}, {}, {}): {err}",
83                        request.action,
84                        request.obj.namespaced_name(),
85                        request.ts,
86                    );
87                }
88            },
89            Some(request) = pod_rx.recv() => {
90                let mut store = m_store.lock().await;
91                if let Err(err) = store.record_pod_lifecycle(
92                    &request.ns_name,
93                    &request.maybe_pod,
94                    &request.lifecycle_data,
95                ).await {
96                    error!("could not send pod object update for ({}, {:?}, {:?}): {err}",
97                        request.ns_name, request.maybe_pod.map(|p| p.namespaced_name()), request.lifecycle_data);
98                }
99            },
100            else => break,
101        }
102    }
103}