sk_store/
store.rs

1use std::collections::HashMap;
2
3use anyhow::bail;
4use clockabilly::prelude::*;
5use sk_api::v1::ExportFilters;
6use sk_core::jsonutils;
7use sk_core::k8s::{
8    build_deletable,
9    PodExt,
10    PodLifecycleData,
11    GVK,
12};
13use sk_core::prelude::*;
14use sk_core::time::duration_to_ts_from;
15use thiserror::Error;
16use tracing::*;
17
18use crate::config::TracerConfig;
19use crate::filter::filter_event;
20use crate::pod_owners_map::PodOwnersMap;
21use crate::{
22    ExportedTrace,
23    TraceAction,
24    TraceEvent,
25    TraceEventList,
26    TraceIndex,
27    TraceIterator,
28    TraceStorable,
29    CURRENT_TRACE_FORMAT_VERSION,
30};
31
32
33#[derive(Debug, Error)]
34pub enum TraceStoreError {
35    #[error(
36        "could not parse trace file\n\nIf this trace file is older than version 2, \
37        it is only parseable by SimKube <= 1.1.1.  Please see the release notes for details."
38    )]
39    ParseFailed(#[from] rmp_serde::decode::Error),
40}
41
42#[derive(Clone, Default)]
43pub struct TraceStore {
44    pub(crate) config: TracerConfig,
45    pub(crate) events: TraceEventList,
46    pub(crate) pod_owners: PodOwnersMap,
47    pub(crate) index: TraceIndex,
48}
49
50// The TraceStore object is an in-memory store of a cluster trace.  It keeps track of all the
51// configured Kubernetes objects, as well as lifecycle data for any pods that are owned by the
52// tracked objects.  It also provides functionality for importing and exporting traces.
53//
54// Currently, the store just grows indefinitely, so will eventually run out of memory.  At some
55// point in the future we plan to implement garbage collection so this isn't a problem.
56
57impl TraceStore {
58    pub fn new(config: TracerConfig) -> TraceStore {
59        TraceStore { config, ..Default::default() }
60    }
61
62    pub fn clone_with_events(&self, events: TraceEventList) -> TraceStore {
63        let mut store = self.clone();
64        store.events = events;
65        store
66    }
67
68    pub fn export(&self, start_ts: i64, end_ts: i64, filter: &ExportFilters) -> anyhow::Result<Vec<u8>> {
69        info!("Exporting objs between {start_ts} and {end_ts} with filters: {filter:?}");
70
71        // First, we collect all the events in our trace that match our configured filters.  This
72        // will return an index of objects that we collected, and we set the keep_deleted flag =
73        // true so that in the second step, we keep pod data around even if the owning object was
74        // deleted before the trace ends.
75        let (events, index) = self.collect_events(start_ts, end_ts, filter, true)?;
76        let num_events = events.len();
77
78        // Collect all pod lifecycle data that is a) between the start and end times, and b) is
79        // owned by some object contained in the trace
80        let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
81        let data = rmp_serde::to_vec_named(&ExportedTrace {
82            version: CURRENT_TRACE_FORMAT_VERSION,
83            config: self.config.clone(),
84            events,
85            index,
86            pod_lifecycles,
87        })?;
88
89        info!("Exported {} events", num_events);
90        Ok(data)
91    }
92
93    pub fn export_all(&self) -> anyhow::Result<Vec<u8>> {
94        let (Some(start_ts), Some(end_ts)) = (self.start_ts(), self.end_ts()) else {
95            return Ok(vec![]);
96        };
97        self.export(start_ts, end_ts, &ExportFilters::default())
98    }
99
100    pub fn import(data: Vec<u8>, maybe_duration: &Option<String>) -> anyhow::Result<TraceStore> {
101        let exported_trace = rmp_serde::from_slice::<ExportedTrace>(&data).map_err(TraceStoreError::ParseFailed)?;
102        Self::from_exported_trace(exported_trace, maybe_duration)
103    }
104
105    // Note that _importing_ data into a trace store is lossy -- we don't store (or import) all of
106    // the metadata necessary to pick up a trace and continue.  Instead, we just re-import enough
107    // information to be able to run a simulation off the trace store.
108    pub fn from_exported_trace(
109        mut exported_trace: ExportedTrace,
110        maybe_duration: &Option<String>,
111    ) -> anyhow::Result<TraceStore> {
112        if exported_trace.version != CURRENT_TRACE_FORMAT_VERSION {
113            bail!("unsupported trace version: {}", exported_trace.version);
114        }
115
116        let trace_start_ts = exported_trace
117            .events
118            .first()
119            .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
120            .ts;
121        let mut trace_end_ts = exported_trace
122            .events
123            .last()
124            .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
125            .ts;
126        if let Some(trace_duration_str) = maybe_duration {
127            trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?;
128            exported_trace.events.retain(|evt| evt.ts < trace_end_ts);
129
130            // Add an empty event to the very end to make sure the driver doesn't shut down early
131            exported_trace
132                .events
133                .push(TraceEvent { ts: trace_end_ts, ..Default::default() });
134        }
135
136        info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len());
137        Ok(TraceStore {
138            config: exported_trace.config,
139            events: exported_trace.events.into(),
140            index: exported_trace.index,
141            pod_owners: PodOwnersMap::new_from_parts(exported_trace.pod_lifecycles, HashMap::new()),
142        })
143    }
144
145    pub(crate) fn collect_events(
146        &self,
147        start_ts: i64,
148        end_ts: i64,
149        filter: &ExportFilters,
150        keep_deleted: bool,
151    ) -> anyhow::Result<(Vec<TraceEvent>, TraceIndex)> {
152        // TODO this is not a huge inefficiency but it is a little annoying to have
153        // an empty event at the start_ts if there aren't any events that happened
154        // before the start_ts
155        let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }];
156
157        // flattened_objects is a list of everything that happened before start_ts but is
158        // still present at start_ts -- i.e., it is our starting configuration.
159        let mut flattened_objects = HashMap::new();
160        let mut index = TraceIndex::new();
161
162        for (evt, _) in self.iter() {
163            // trace should be end-exclusive, so we use >= here: anything that is at the
164            // end_ts or greater gets discarded.  The event list is stored in
165            // monotonically-increasing order so we are safe to break here.
166            if evt.ts >= end_ts {
167                break;
168            }
169
170            if let Some(new_evt) = filter_event(evt, filter) {
171                for obj in &new_evt.applied_objs {
172                    let gvk = GVK::from_dynamic_obj(obj)?;
173                    let ns_name = obj.namespaced_name();
174                    if new_evt.ts < start_ts {
175                        flattened_objects.insert(ns_name.clone(), obj.clone());
176                    }
177                    let hash = jsonutils::hash_option(obj.data.get("spec"));
178                    index.insert(gvk, ns_name, hash);
179                }
180
181                for obj in &evt.deleted_objs {
182                    let gvk = GVK::from_dynamic_obj(obj)?;
183                    let ns_name = obj.namespaced_name();
184                    if new_evt.ts < start_ts {
185                        flattened_objects.remove(&ns_name);
186                    }
187                    if !keep_deleted {
188                        index.remove(gvk, &ns_name);
189                    }
190                }
191
192                if new_evt.ts >= start_ts {
193                    events.push(new_evt.clone());
194                }
195            }
196        }
197
198        // events[0] is the empty event we inserted at the beginning, so we're guaranteed not to
199        // overwrite anything here.
200        events[0].applied_objs = flattened_objects.into_values().collect();
201        Ok((events, index))
202    }
203}
204
205impl TraceStorable for TraceStore {
206    // We use a swap-and-update operation for the index, which means that if we call
207    // create_or_update_obj from a refresh event, the _new_ index won't have the hash data
208    // available in it yet.  So here we have to pass in a maybe_old_hash which is the value from
209    // the swapped-out data structure.  If this is called from an `Applied` event, we just pass in
210    // `None` and look up the value in the current index (if the object didn't exist in the old
211    // index either, we'll do a second lookup in the new index, but that should be pretty fast).
212    fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option<u64>) -> EmptyResult {
213        let gvk = GVK::from_dynamic_obj(obj)?;
214
215        let ns_name = obj.namespaced_name();
216        let new_hash = jsonutils::hash_option(obj.data.get("spec"));
217        let old_hash = maybe_old_hash.or_else(|| self.index.get(&gvk, &ns_name));
218
219        if Some(new_hash) != old_hash {
220            self.events.append(ts, obj, TraceAction::ObjectApplied);
221        }
222        self.index.insert(gvk, ns_name, new_hash);
223        Ok(())
224    }
225
226    fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
227        let gvk = GVK::from_dynamic_obj(obj)?;
228        let ns_name = obj.namespaced_name();
229        self.events.append(ts, obj, TraceAction::ObjectDeleted);
230        self.index.remove(gvk, &ns_name);
231        Ok(())
232    }
233
234    fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult {
235        let mut old_gvk_index = self.index.take_gvk_index(gvk);
236        for obj in objs {
237            let ns_name = obj.namespaced_name();
238            let old_hash = old_gvk_index.remove(&ns_name);
239            self.create_or_update_obj(obj, ts, old_hash)?;
240        }
241
242        for ns_name in old_gvk_index.keys() {
243            self.delete_obj(&build_deletable(gvk, ns_name), ts)?;
244        }
245        Ok(())
246    }
247
248    fn lookup_pod_lifecycle(
249        &self,
250        owner_gvk: &GVK,
251        owner_ns_name: &str,
252        pod_hash: u64,
253        seq: usize,
254    ) -> PodLifecycleData {
255        let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_gvk, owner_ns_name, pod_hash);
256        match maybe_lifecycle_data {
257            Some(data) => data[seq % data.len()].clone(),
258            _ => PodLifecycleData::Empty,
259        }
260    }
261
262    // We assume that we are given a valid/correct lifecycle event here, so we will just
263    // blindly store whatever we are given.  It's up to the caller (the pod watcher in this
264    // case) to ensure that the lifecycle data isn't incorrect.
265    fn record_pod_lifecycle(
266        &mut self,
267        ns_name: &str,
268        maybe_pod: Option<corev1::Pod>,
269        owners: Vec<metav1::OwnerReference>,
270        lifecycle_data: &PodLifecycleData,
271    ) -> EmptyResult {
272        // If we've already stored data about this pod, we just update the existing entry
273        // This assumes that the pod spec is immutable/can't change.  This is _largely_ true in
274        // current Kubernetes, but it may not be true in the future with in-place resource updates
275        // and so forth.  (We're specifically not including labels and annotations in the hash
276        // because those _can_ change).
277        if self.pod_owners.has_pod(ns_name) {
278            self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?;
279        } else if let Some(pod) = &maybe_pod {
280            // Otherwise, we need to check if any of the pod's owners are tracked by us
281            for owner in &owners {
282                // Pods are guaranteed to have namespaces, so the unwrap is fine
283                let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name);
284                let owner_gvk = GVK::from_owner_ref(owner)?;
285                if !self.has_obj(&owner_gvk, &owner_ns_name) {
286                    continue;
287                }
288
289                if !self.config.track_lifecycle_for(&owner_gvk) {
290                    continue;
291                }
292
293                // We compute a hash of the podspec, because some types of owning objects may have
294                // multiple different types of running pods, and we want to track the lifecycle
295                // data for these separately.  (For example, a volcanojob takes in a list of pod
296                // templates that each have their own replica counts)
297                //
298                // TODO - it's possible that hashing _everything_ may be too much.  Are there types
299                // of data that are unique to each pod that won't materially impact the behaviour?
300                // This does occur for example with coredns's volume mounts.  We may need to filter
301                // more things out from this and/or allow users to specify what is filtered out.
302                let hash = jsonutils::hash(&serde_json::to_value(pod.stable_spec()?)?);
303                self.pod_owners
304                    .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);
305                break;
306            }
307        } else {
308            bail!("no pod ownership data found for {}, cannot store", ns_name);
309        }
310
311        Ok(())
312    }
313
314    fn config(&self) -> &TracerConfig {
315        &self.config
316    }
317
318    fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool {
319        self.index.contains(gvk, ns_name)
320    }
321
322    fn start_ts(&self) -> Option<i64> {
323        self.events.front().map(|evt| evt.ts)
324    }
325
326    fn end_ts(&self) -> Option<i64> {
327        self.events.back().map(|evt| evt.ts)
328    }
329
330    fn iter(&self) -> TraceIterator<'_> {
331        TraceIterator { events: &self.events, idx: 0 }
332    }
333}
334
335// Our iterator implementation iterates over all the events in timeseries order.  It returns the
336// current event, and the timestamp of the _next_ event.
337impl<'a> Iterator for TraceIterator<'a> {
338    type Item = (&'a TraceEvent, Option<i64>);
339
340    fn next(&mut self) -> Option<Self::Item> {
341        if self.events.is_empty() {
342            return None;
343        }
344
345        let ret = match self.idx {
346            i if i < self.events.len() - 1 => Some((&self.events[i], Some(self.events[i + 1].ts))),
347            i if i == self.events.len() - 1 => Some((&self.events[i], None)),
348            _ => None,
349        };
350
351        self.idx += 1;
352        ret
353    }
354}
355
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod test {
    use super::*;

    impl TraceStore {
        /// Test helper: returns the sorted flattened keys of the objects still tracked at
        /// `end_ts`, considering every event from timestamp 0 up to (but excluding) `end_ts`.
        pub fn sorted_objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec<String> {
            // Deleted objects must drop out of the index for this view, so keep_deleted = false.
            let keep_deleted = false;
            let (_, index) = self.collect_events(0, end_ts, filter, keep_deleted).expect("testing code");
            let mut keys = index.flattened_keys();
            keys.sort_unstable();
            keys
        }
    }
}