1use std::sync::Arc;
2
3use sk_core::k8s::DynamicApiSet;
4use sk_core::prelude::*;
5use tokio::sync::{
6 Mutex,
7 mpsc,
8};
9use tokio::task::JoinSet;
10use tracing::*;
11
12use crate::config::TracerConfig;
13use crate::event::TraceAction;
14use crate::store::TraceStore;
15use crate::watchers::{
16 dyn_obj_watcher,
17 pod_watcher,
18};
19
20pub struct TraceManager {
21 config: TracerConfig,
22 store: Arc<Mutex<TraceStore>>,
23 ready_rx: mpsc::Receiver<bool>,
24 js: JoinSet<()>,
25}
26
27impl TraceManager {
28 pub async fn start(client: kube::Client, config: TracerConfig) -> anyhow::Result<Self> {
29 let mut apiset = DynamicApiSet::new(client.clone());
30
31 let (ready_tx, ready_rx): (mpsc::Sender<bool>, mpsc::Receiver<bool>) =
32 mpsc::channel(config.tracked_objects.len() + 1);
33 let (dyn_obj_tx, dyn_obj_rx): (dyn_obj_watcher::Sender, dyn_obj_watcher::Receiver) = mpsc::unbounded_channel();
34 let (pod_tx, pod_rx): (pod_watcher::Sender, pod_watcher::Receiver) = mpsc::unbounded_channel();
35
36 let mut js = JoinSet::new();
37 for gvk in config.tracked_objects.keys() {
38 let watcher =
39 dyn_obj_watcher::new_with_stream(gvk, &mut apiset, dyn_obj_tx.clone(), ready_tx.clone()).await?;
40 js.spawn(watcher.start());
41 }
42
43 let pw = pod_watcher::new_with_stream(client.clone(), pod_tx, ready_tx.clone())?;
44 js.spawn(pw.start());
45
46 let store = Arc::new(Mutex::new(TraceStore::new(config.clone(), apiset)));
47 js.spawn(handle_messages(dyn_obj_rx, pod_rx, store.clone()));
48
49 Ok(TraceManager { config, store, ready_rx, js })
50 }
51
52 pub fn get_store(&self) -> Arc<Mutex<TraceStore>> {
53 self.store.clone()
54 }
55
56 pub async fn shutdown(&mut self) {
57 self.js.shutdown().await;
58 }
59
60 pub async fn wait_ready(&mut self) {
61 for _ in 0..self.config.tracked_objects.len() + 1 {
62 let _ = self.ready_rx.recv().await;
63 }
64 }
65}
66
67pub(crate) async fn handle_messages(
68 mut dyn_obj_rx: dyn_obj_watcher::Receiver,
69 mut pod_rx: pod_watcher::Receiver,
70 m_store: Arc<Mutex<TraceStore>>,
71) -> () {
72 loop {
73 tokio::select! {
74 Some(request) = dyn_obj_rx.recv() => {
75 let mut store = m_store.lock().await;
76 let res = match request.action {
77 TraceAction::ObjectApplied => store.create_or_update_obj(&request.obj, request.ts),
78 TraceAction::ObjectDeleted => store.delete_obj(&request.obj, request.ts),
79 };
80 if let Err(err) = res {
81 error!(
82 "could not send dynamic object update for ({:?}, {}, {}): {err}",
83 request.action,
84 request.obj.namespaced_name(),
85 request.ts,
86 );
87 }
88 },
89 Some(request) = pod_rx.recv() => {
90 let mut store = m_store.lock().await;
91 if let Err(err) = store.record_pod_lifecycle(
92 &request.ns_name,
93 &request.maybe_pod,
94 &request.lifecycle_data,
95 ).await {
96 error!("could not send pod object update for ({}, {:?}, {:?}): {err}",
97 request.ns_name, request.maybe_pod.map(|p| p.namespaced_name()), request.lifecycle_data);
98 }
99 },
100 else => break,
101 }
102 }
103}