Skip to main content

sk_store/
store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::bail;
5use kube::Resource;
6use sk_api::v1::ExportFilters;
7use sk_core::jsonutils;
8use sk_core::k8s::{
9    DynamicApiSet,
10    GVK,
11    OwnersCache,
12    PodExt,
13    PodLifecycleData,
14    format_gvk_name,
15};
16use sk_core::prelude::*;
17use tokio::sync::Mutex;
18use tracing::*;
19
20use crate::config::TracerConfig;
21use crate::event::{
22    TraceAction,
23    TraceEvent,
24    append_event,
25};
26use crate::index::TraceIndex;
27use crate::pod_owners_map::PodOwnersMap;
28use crate::trace::ExportedTrace;
29
/// The TraceStore object is an in-memory store of a cluster trace.  It keeps track of all the
/// configured Kubernetes objects, as well as lifecycle data for any pods that are owned by the
/// tracked objects.  It also provides functionality for importing and exporting traces.
///
/// Currently, the store just grows indefinitely, so it will eventually run out of memory.  At
/// some point in the future we plan to implement garbage collection so this isn't a problem.
pub struct TraceStore {
    // Tracer configuration; a copy of it is embedded in every exported trace.
    pub(crate) config: TracerConfig,
    // Every recorded event.  collect_events relies on this list being ordered by
    // monotonically-increasing timestamp.
    pub(crate) events: Vec<TraceEvent>,
    // Lifecycle data for pods, stored against the tracked object that owns them.
    pub(crate) pod_owners: PodOwnersMap,
    // Index of the objects currently in the store, along with a hash of each object's `spec`
    // (used to detect whether an update actually changed anything).
    pub(crate) index: TraceIndex,

    // Cache used to look up the owners of an object; guarded by an async mutex.
    owners_cache: Arc<Mutex<OwnersCache>>,
}
45
46impl TraceStore {
47    pub fn new(config: TracerConfig, apiset: DynamicApiSet) -> TraceStore {
48        TraceStore {
49            config,
50            events: vec![],
51            pod_owners: PodOwnersMap::default(),
52            index: TraceIndex::default(),
53
54            owners_cache: Arc::new(Mutex::new(OwnersCache::new(apiset))),
55        }
56    }
57
58    pub async fn export(&self, start_ts: i64, end_ts: i64, filter: &ExportFilters) -> anyhow::Result<Vec<u8>> {
59        info!("Exporting objs between {start_ts} and {end_ts} with filters: {filter:?}");
60
61        // First, we collect all the events in our trace that match our configured filters.  This
62        // will return an index of objects that we collected, and we set the keep_deleted flag =
63        // true so that in the second step, we keep pod data around even if the owning object was
64        // deleted before the trace ends.
65        let (events, index) = self.collect_events(start_ts, end_ts, filter, true).await?;
66        let num_events = events.len();
67
68        // Collect all pod lifecycle data that is a) between the start and end times, and b) is
69        // owned by some object contained in the trace
70        let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
71        let data = ExportedTrace {
72            config: self.config.clone(),
73            events,
74            index,
75            pod_lifecycles,
76            ..Default::default()
77        }
78        .to_bytes()?;
79
80        info!("Exported {} events", num_events);
81        Ok(data)
82    }
83
84    pub(super) async fn collect_events(
85        &self,
86        start_ts: i64,
87        end_ts: i64,
88        filter: &ExportFilters,
89        keep_deleted: bool,
90    ) -> anyhow::Result<(Vec<TraceEvent>, TraceIndex)> {
91        // TODO this is not a huge inefficiency but it is a little annoying to have
92        // an empty event at the start_ts if there aren't any events that happened
93        // before the start_ts
94        let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }];
95
96        // flattened_objects is a list of everything that happened before start_ts but is
97        // still present at start_ts -- i.e., it is our starting configuration.
98        let mut flattened_objects = HashMap::new();
99        let mut index = TraceIndex::new();
100
101        for evt in self.events.iter() {
102            // trace should be end-exclusive, so we use >= here: anything that is at the
103            // end_ts or greater gets discarded.  The event list is stored in
104            // monotonically-increasing order so we are safe to break here.
105            if evt.ts >= end_ts {
106                break;
107            }
108
109            let mut filtered_applied_objs = vec![];
110            let mut filtered_deleted_objs = vec![];
111
112            for obj in &evt.applied_objs {
113                let gvk = GVK::from_dynamic_obj(obj)?;
114                let ns_name = obj.namespaced_name();
115
116                if object_matches_filter(obj, filter)
117                    || self.is_owned_by_tracked_object(&gvk, &ns_name, obj, &index).await?
118                {
119                    debug!("applied obj {} filtered out", format_gvk_name(&gvk, &ns_name));
120                    continue;
121                }
122
123                if evt.ts < start_ts {
124                    flattened_objects.insert(ns_name.clone(), obj.clone());
125                } else {
126                    filtered_applied_objs.push(obj.clone());
127                }
128                let hash = jsonutils::hash_option(obj.data.get("spec"));
129                index.insert(gvk, ns_name, hash);
130            }
131
132            for obj in &evt.deleted_objs {
133                let gvk = GVK::from_dynamic_obj(obj)?;
134                let ns_name = obj.namespaced_name();
135
136                if object_matches_filter(obj, filter)
137                    || self.is_owned_by_tracked_object(&gvk, &ns_name, obj, &index).await?
138                {
139                    debug!("deleted obj {} filtered out", format_gvk_name(&gvk, &ns_name));
140                    continue;
141                }
142
143                if evt.ts < start_ts {
144                    flattened_objects.remove(&ns_name);
145                } else {
146                    filtered_deleted_objs.push(obj.clone());
147                }
148
149                if !keep_deleted {
150                    index.remove(gvk, &ns_name);
151                }
152            }
153
154            // We can't filter on evt.ts >= start_ts earlier because we need to
155            // track all of the objects that existed before start_ts; the second
156            // boolean condition ensures that only non-empty events are added to the
157            // exported trace (either objects applied or deleted).
158            if evt.ts >= start_ts && !(filtered_applied_objs.is_empty() && filtered_deleted_objs.is_empty()) {
159                events.push(TraceEvent {
160                    ts: evt.ts,
161                    applied_objs: filtered_applied_objs,
162                    deleted_objs: filtered_deleted_objs,
163                });
164            }
165        }
166
167        // events[0] is the empty event we inserted at the beginning, so we're guaranteed not to
168        // overwrite anything here.
169        events[0].applied_objs = flattened_objects.into_values().collect();
170        Ok((events, index))
171    }
172
173    pub(super) fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
174        let gvk = GVK::from_dynamic_obj(obj)?;
175        let ns_name = obj.namespaced_name();
176
177        let new_hash = jsonutils::hash_option(obj.data.get("spec"));
178        let old_hash = self.index.get(&gvk, &ns_name);
179
180        if Some(new_hash) != old_hash {
181            append_event(&mut self.events, ts, obj, TraceAction::ObjectApplied);
182        }
183        self.index.insert(gvk, ns_name, new_hash);
184        Ok(())
185    }
186
187    pub(super) fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
188        let gvk = GVK::from_dynamic_obj(obj)?;
189        let ns_name = obj.namespaced_name();
190        append_event(&mut self.events, ts, obj, TraceAction::ObjectDeleted);
191        self.index.remove(gvk, &ns_name);
192        Ok(())
193    }
194
195    // We assume that we are given a valid/correct lifecycle event here, so we will just
196    // blindly store whatever we are given.  It's up to the caller (the pod watcher in this
197    // case) to ensure that the lifecycle data isn't incorrect.
198    pub(super) async fn record_pod_lifecycle(
199        &mut self,
200        ns_name: &str,
201        maybe_pod: &Option<corev1::Pod>,
202        lifecycle_data: &PodLifecycleData,
203    ) -> EmptyResult {
204        // If we've already stored data about this pod, we just update the existing entry
205        // This assumes that the pod spec is immutable/can't change.  This is _largely_ true in
206        // current Kubernetes, but it may not be true in the future with in-place resource updates
207        // and so forth.  (We're specifically not including labels and annotations in the hash
208        // because those _can_ change).
209        if self.pod_owners.has_pod(ns_name) {
210            self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?;
211        } else if let Some(pod) = maybe_pod {
212            let owners = self
213                .owners_cache
214                .lock()
215                .await
216                .lookup_by_name_or_obj(&corev1::Pod::gvk(), ns_name, maybe_pod.as_ref())
217                .await;
218
219            for owner in owners {
220                // Pods are guaranteed to have namespaces, so the unwrap is fine
221                let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name);
222                let owner_gvk = GVK::from_owner_ref(&owner)?;
223                if !self.index.contains(&owner_gvk, &owner_ns_name) {
224                    continue;
225                }
226
227                if !self.config.track_lifecycle_for(&owner_gvk) {
228                    continue;
229                }
230
231                // We compute a hash of the podspec, because some types of owning objects may have
232                // multiple different types of running pods, and we want to track the lifecycle
233                // data for these separately.  (For example, a volcanojob takes in a list of pod
234                // templates that each have their own replica counts)
235                //
236                // TODO - it's possible that hashing _everything_ may be too much.  Are there types
237                // of data that are unique to each pod that won't materially impact the behaviour?
238                // This does occur for example with coredns's volume mounts.  We may need to filter
239                // more things out from this and/or allow users to specify what is filtered out.
240                let hash = jsonutils::hash(&serde_json::to_value(pod.stable_spec()?)?);
241                self.pod_owners
242                    .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);
243                break;
244            }
245        } else {
246            bail!("no pod ownership data found for {}, cannot store", ns_name);
247        }
248
249        Ok(())
250    }
251
252    async fn is_owned_by_tracked_object(
253        &self,
254        gvk: &GVK,
255        ns_name: &str,
256        obj: &(impl Resource + Sync),
257        // We specifically DO NOT use self.index here, because the index at time t_n
258        // probably has ~little relation to whatever the index looked like at the
259        // time we're performing the export.
260        index: &TraceIndex,
261    ) -> anyhow::Result<bool> {
262        // If any of the owners of this object are exported, we don't want to also
263        // export this object; in the simulation replay, it would result in duplicate
264        // objects being created
265        let owners = self
266            .owners_cache
267            .lock()
268            .await
269            .lookup_by_name_or_obj(gvk, ns_name, Some(obj))
270            .await;
271
272        for owner in owners {
273            // TODO right now we only look up _namespaced_ owners, not cluster-scoped; in
274            // principle, it's possible to get the cluster-scoped owners, since the owner
275            // cache knows what they are, but passing that information back up to us is
276            // sortof annoying and I don't want to bother right now.
277            let owner_ns_name = format!("{}/{}", obj.namespace().unwrap(), owner.name);
278            let owner_gvk = GVK::from_owner_ref(&owner)?;
279            if index.contains(&owner_gvk, &owner_ns_name) {
280                return Ok(true);
281            }
282        }
283        Ok(false)
284    }
285}
286
287fn object_matches_filter(obj: &DynamicObject, f: &ExportFilters) -> bool {
288    obj.metadata
289        .namespace
290        .as_ref()
291        .is_some_and(|ns| f.excluded_namespaces.contains(ns))
292        || obj
293            .metadata
294            .owner_references
295            .as_ref()
296            .is_some_and(|owners| owners.iter().any(|owner| &owner.kind == "DaemonSet"))
297        // TODO: maybe don't call unwrap here?  Right now we panic if the user specifies
298        // an invalid label selector.  Or, maybe it doesn't matter once we write the CLI
299        // tool.
300        || f.excluded_labels.iter().any(|sel| obj.matches(sel).unwrap())
301}
302
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod test {
    use super::*;

    impl TraceStore {
        /// Test helper: the flattened keys of the index that collect_events builds for the
        /// window `[0, end_ts)`.
        ///
        /// Having this be async is really stupid; it's a consequence of collect_events now
        /// querying ownership information.... probably should fix this at some point
        pub async fn objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec<String> {
            // We want the objects that exist at a particular timestamp, so deleted objects
            // must _not_ be kept in the index: keep_deleted is `false`.
            let collected = self.collect_events(0, end_ts, filter, false).await;
            let (_events, index) = collected.expect("testing code");
            index.flattened_keys()
        }
    }
}
318}