1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::bail;
5use kube::Resource;
6use sk_api::v1::ExportFilters;
7use sk_core::jsonutils;
8use sk_core::k8s::{
9 DynamicApiSet,
10 GVK,
11 OwnersCache,
12 PodExt,
13 PodLifecycleData,
14 format_gvk_name,
15};
16use sk_core::prelude::*;
17use tokio::sync::Mutex;
18use tracing::*;
19
20use crate::config::TracerConfig;
21use crate::event::{
22 TraceAction,
23 TraceEvent,
24 append_event,
25};
26use crate::index::TraceIndex;
27use crate::pod_owners_map::PodOwnersMap;
28use crate::trace::ExportedTrace;
29
30pub struct TraceStore {
31 pub(crate) config: TracerConfig,
32 pub(crate) events: Vec<TraceEvent>,
33 pub(crate) pod_owners: PodOwnersMap,
34 pub(crate) index: TraceIndex,
35
36 owners_cache: Arc<Mutex<OwnersCache>>,
37}
38
39impl TraceStore {
47 pub fn new(config: TracerConfig, apiset: DynamicApiSet) -> TraceStore {
48 TraceStore {
49 config,
50 events: vec![],
51 pod_owners: PodOwnersMap::default(),
52 index: TraceIndex::default(),
53
54 owners_cache: Arc::new(Mutex::new(OwnersCache::new(apiset))),
55 }
56 }
57
58 pub async fn export(&self, start_ts: i64, end_ts: i64, filter: &ExportFilters) -> anyhow::Result<Vec<u8>> {
59 info!("Exporting objs between {start_ts} and {end_ts} with filters: {filter:?}");
60
61 let (events, index) = self.collect_events(start_ts, end_ts, filter, true).await?;
66 let num_events = events.len();
67
68 let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
71 let data = ExportedTrace {
72 config: self.config.clone(),
73 events,
74 index,
75 pod_lifecycles,
76 ..Default::default()
77 }
78 .to_bytes()?;
79
80 info!("Exported {} events", num_events);
81 Ok(data)
82 }
83
84 pub(super) async fn collect_events(
85 &self,
86 start_ts: i64,
87 end_ts: i64,
88 filter: &ExportFilters,
89 keep_deleted: bool,
90 ) -> anyhow::Result<(Vec<TraceEvent>, TraceIndex)> {
91 let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }];
95
96 let mut flattened_objects = HashMap::new();
99 let mut index = TraceIndex::new();
100
101 for evt in self.events.iter() {
102 if evt.ts >= end_ts {
106 break;
107 }
108
109 let mut filtered_applied_objs = vec![];
110 let mut filtered_deleted_objs = vec![];
111
112 for obj in &evt.applied_objs {
113 let gvk = GVK::from_dynamic_obj(obj)?;
114 let ns_name = obj.namespaced_name();
115
116 if object_matches_filter(obj, filter)
117 || self.is_owned_by_tracked_object(&gvk, &ns_name, obj, &index).await?
118 {
119 debug!("applied obj {} filtered out", format_gvk_name(&gvk, &ns_name));
120 continue;
121 }
122
123 if evt.ts < start_ts {
124 flattened_objects.insert(ns_name.clone(), obj.clone());
125 } else {
126 filtered_applied_objs.push(obj.clone());
127 }
128 let hash = jsonutils::hash_option(obj.data.get("spec"));
129 index.insert(gvk, ns_name, hash);
130 }
131
132 for obj in &evt.deleted_objs {
133 let gvk = GVK::from_dynamic_obj(obj)?;
134 let ns_name = obj.namespaced_name();
135
136 if object_matches_filter(obj, filter)
137 || self.is_owned_by_tracked_object(&gvk, &ns_name, obj, &index).await?
138 {
139 debug!("deleted obj {} filtered out", format_gvk_name(&gvk, &ns_name));
140 continue;
141 }
142
143 if evt.ts < start_ts {
144 flattened_objects.remove(&ns_name);
145 } else {
146 filtered_deleted_objs.push(obj.clone());
147 }
148
149 if !keep_deleted {
150 index.remove(gvk, &ns_name);
151 }
152 }
153
154 if evt.ts >= start_ts && !(filtered_applied_objs.is_empty() && filtered_deleted_objs.is_empty()) {
159 events.push(TraceEvent {
160 ts: evt.ts,
161 applied_objs: filtered_applied_objs,
162 deleted_objs: filtered_deleted_objs,
163 });
164 }
165 }
166
167 events[0].applied_objs = flattened_objects.into_values().collect();
170 Ok((events, index))
171 }
172
173 pub(super) fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
174 let gvk = GVK::from_dynamic_obj(obj)?;
175 let ns_name = obj.namespaced_name();
176
177 let new_hash = jsonutils::hash_option(obj.data.get("spec"));
178 let old_hash = self.index.get(&gvk, &ns_name);
179
180 if Some(new_hash) != old_hash {
181 append_event(&mut self.events, ts, obj, TraceAction::ObjectApplied);
182 }
183 self.index.insert(gvk, ns_name, new_hash);
184 Ok(())
185 }
186
187 pub(super) fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
188 let gvk = GVK::from_dynamic_obj(obj)?;
189 let ns_name = obj.namespaced_name();
190 append_event(&mut self.events, ts, obj, TraceAction::ObjectDeleted);
191 self.index.remove(gvk, &ns_name);
192 Ok(())
193 }
194
195 pub(super) async fn record_pod_lifecycle(
199 &mut self,
200 ns_name: &str,
201 maybe_pod: &Option<corev1::Pod>,
202 lifecycle_data: &PodLifecycleData,
203 ) -> EmptyResult {
204 if self.pod_owners.has_pod(ns_name) {
210 self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?;
211 } else if let Some(pod) = maybe_pod {
212 let owners = self
213 .owners_cache
214 .lock()
215 .await
216 .lookup_by_name_or_obj(&corev1::Pod::gvk(), ns_name, maybe_pod.as_ref())
217 .await;
218
219 for owner in owners {
220 let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name);
222 let owner_gvk = GVK::from_owner_ref(&owner)?;
223 if !self.index.contains(&owner_gvk, &owner_ns_name) {
224 continue;
225 }
226
227 if !self.config.track_lifecycle_for(&owner_gvk) {
228 continue;
229 }
230
231 let hash = jsonutils::hash(&serde_json::to_value(pod.stable_spec()?)?);
241 self.pod_owners
242 .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);
243 break;
244 }
245 } else {
246 bail!("no pod ownership data found for {}, cannot store", ns_name);
247 }
248
249 Ok(())
250 }
251
252 async fn is_owned_by_tracked_object(
253 &self,
254 gvk: &GVK,
255 ns_name: &str,
256 obj: &(impl Resource + Sync),
257 index: &TraceIndex,
261 ) -> anyhow::Result<bool> {
262 let owners = self
266 .owners_cache
267 .lock()
268 .await
269 .lookup_by_name_or_obj(gvk, ns_name, Some(obj))
270 .await;
271
272 for owner in owners {
273 let owner_ns_name = format!("{}/{}", obj.namespace().unwrap(), owner.name);
278 let owner_gvk = GVK::from_owner_ref(&owner)?;
279 if index.contains(&owner_gvk, &owner_ns_name) {
280 return Ok(true);
281 }
282 }
283 Ok(false)
284 }
285}
286
287fn object_matches_filter(obj: &DynamicObject, f: &ExportFilters) -> bool {
288 obj.metadata
289 .namespace
290 .as_ref()
291 .is_some_and(|ns| f.excluded_namespaces.contains(ns))
292 || obj
293 .metadata
294 .owner_references
295 .as_ref()
296 .is_some_and(|owners| owners.iter().any(|owner| &owner.kind == "DaemonSet"))
297 || f.excluded_labels.iter().any(|sel| obj.matches(sel).unwrap())
301}
302
303#[cfg(test)]
304#[cfg_attr(coverage, coverage(off))]
305mod test {
306 use super::*;
307
308 impl TraceStore {
309 pub async fn objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec<String> {
312 let (_, index) = self.collect_events(0, end_ts, filter, false).await.expect("testing code");
315 index.flattened_keys()
316 }
317 }
318}