1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
use std::collections::HashMap;
use anyhow::bail;
use clockabilly::{
Clockable,
UtcClock,
};
use sk_api::v1::ExportFilters;
use sk_core::jsonutils;
use sk_core::k8s::{
build_deletable,
PodExt,
PodLifecycleData,
GVK,
};
use sk_core::prelude::*;
use sk_core::time::duration_to_ts_from;
use thiserror::Error;
use tracing::*;
use crate::config::TracerConfig;
use crate::filter::filter_event;
use crate::pod_owners_map::PodOwnersMap;
use crate::{
ExportedTrace,
TraceAction,
TraceEvent,
TraceEventList,
TraceIndex,
TraceIterator,
TraceStorable,
CURRENT_TRACE_FORMAT_VERSION,
};
/// Errors that can occur while loading a serialized trace into a [`TraceStore`].
#[derive(Debug, Error)]
pub enum TraceStoreError {
    /// The msgpack payload could not be decoded as an `ExportedTrace`. The user-facing message
    /// points at the trace-format change in version 2 as a likely cause.
    #[error(
        "could not parse trace file\n\n\
         If this trace file is older than version 2, it is only parseable by SimKube <= 1.1.1. \
         Please see the release notes for details."
    )]
    ParseFailed(#[from] rmp_serde::decode::Error),
}
/// In-memory store for a single cluster trace: tracked objects, their event history, and the
/// lifecycle data of pods owned by tracked objects.
#[derive(Default, Clone)]
pub struct TraceStore {
    /// Tracer configuration (which objects are tracked and whose pod lifecycles are recorded).
    pub(crate) config: TracerConfig,

    /// Recorded object applies/deletes, kept in monotonically-increasing timestamp order.
    pub(crate) events: TraceEventList,

    /// Pod lifecycle data, looked up by owning object and pod-spec hash.
    pub(crate) pod_owners: PodOwnersMap,

    /// Currently-known objects (by GVK and namespaced name) with the hash of their `spec`.
    pub(crate) index: TraceIndex,
}
// The TraceStore object is an in-memory store of a cluster trace. It keeps track of all the
// configured Kubernetes objects, as well as lifecycle data for any pods that are owned by the
// tracked objects. It also provides functionality for importing and exporting traces.
//
// Currently, the store just grows indefinitely, so it will eventually run out of memory. At some
// point in the future we plan to implement garbage collection so this isn't a problem.
impl TraceStore {
    /// Create an empty store that records objects according to `config`.
    pub fn new(config: TracerConfig) -> TraceStore {
        TraceStore { config, ..Default::default() }
    }

    /// Build a copy of this store whose event list is replaced by `events`.
    ///
    /// The config, pod-owner data, and index are cloned. The current event list is *not* cloned,
    /// since it would be discarded immediately anyway.
    pub fn clone_with_events(&self, events: TraceEventList) -> TraceStore {
        TraceStore {
            config: self.config.clone(),
            events,
            pod_owners: self.pod_owners.clone(),
            index: self.index.clone(),
        }
    }

    /// Serialize the part of the trace in `[start_ts, end_ts)` that passes `filter`.
    ///
    /// The output is an `ExportedTrace` encoded with `rmp_serde::to_vec_named`. It contains:
    /// - the matching events, where the first event carries the flattened starting configuration
    ///   at `start_ts`;
    /// - the index of objects seen in the window;
    /// - lifecycle data for pods owned by those objects.
    ///
    /// # Errors
    /// Fails if an object's GVK cannot be determined or if serialization fails.
    pub fn export(&self, start_ts: i64, end_ts: i64, filter: &ExportFilters) -> anyhow::Result<Vec<u8>> {
        info!("Exporting objs between {start_ts} and {end_ts} with filters: {filter:?}");

        // First, we collect all the events in our trace that match our configured filters. This
        // will return an index of objects that we collected, and we set the keep_deleted flag =
        // true so that in the second step, we keep pod data around even if the owning object was
        // deleted before the trace ends.
        let (events, index) = self.collect_events(start_ts, end_ts, filter, true)?;
        let num_events = events.len();

        // Collect all pod lifecycle data that is a) between the start and end times, and b) is
        // owned by some object contained in the trace
        let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);

        let data = rmp_serde::to_vec_named(&ExportedTrace {
            version: CURRENT_TRACE_FORMAT_VERSION,
            config: self.config.clone(),
            events,
            index,
            pod_lifecycles,
        })?;

        info!("Exported {} events", num_events);
        Ok(data)
    }

    /// Export the whole trace with no filters applied.
    ///
    /// Returns an empty byte vector when the store holds no events.
    pub fn export_all(&self) -> anyhow::Result<Vec<u8>> {
        let (Some(start_ts), Some(end_ts)) = (self.start_ts(), self.end_ts()) else {
            return Ok(vec![]);
        };

        // `export` treats `end_ts` as exclusive, so step one past the last event's timestamp;
        // otherwise every event at the final timestamp would be silently dropped.
        self.export(start_ts, end_ts.saturating_add(1), &ExportFilters::default())
    }

    /// Decode a msgpack trace (as produced by [`TraceStore::export`]) into a new store.
    ///
    /// If `maybe_duration` is set, the trace is truncated to that duration after its first event
    /// (see [`TraceStore::from_exported_trace`]).
    ///
    /// # Errors
    /// Returns [`TraceStoreError::ParseFailed`] (wrapped in `anyhow`) if decoding fails, or any
    /// error from `from_exported_trace`.
    pub fn import(data: Vec<u8>, maybe_duration: &Option<String>) -> anyhow::Result<TraceStore> {
        let exported_trace = rmp_serde::from_slice::<ExportedTrace>(&data).map_err(TraceStoreError::ParseFailed)?;
        Self::from_exported_trace(exported_trace, maybe_duration)
    }

    /// Build a store from an already-decoded trace.
    ///
    /// Note that _importing_ data into a trace store is lossy -- we don't store (or import) all of
    /// the metadata necessary to pick up a trace and continue. Instead, we just re-import enough
    /// information to be able to run a simulation off the trace store.
    ///
    /// If `maybe_duration` is set, only events strictly before `first_event_ts + duration` are
    /// kept, and an empty event is appended at that timestamp.
    ///
    /// # Errors
    /// Fails if the trace version is not `CURRENT_TRACE_FORMAT_VERSION` or if the duration string
    /// cannot be converted to a timestamp.
    pub fn from_exported_trace(
        mut exported_trace: ExportedTrace,
        maybe_duration: &Option<String>,
    ) -> anyhow::Result<TraceStore> {
        if exported_trace.version != CURRENT_TRACE_FORMAT_VERSION {
            bail!("unsupported trace version: {}", exported_trace.version);
        }

        // An empty trace has no natural bounds, so fall back to "now". Read the clock only once
        // so that the start and end timestamps agree.
        let (trace_start_ts, mut trace_end_ts) =
            match (exported_trace.events.first(), exported_trace.events.last()) {
                (Some(first), Some(last)) => (first.ts, last.ts),
                _ => {
                    let now = UtcClock.now_ts();
                    (now, now)
                },
            };

        if let Some(trace_duration_str) = maybe_duration {
            trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?;
            exported_trace.events.retain(|evt| evt.ts < trace_end_ts);

            // Add an empty event to the very end to make sure the driver doesn't shut down early
            exported_trace
                .events
                .push(TraceEvent { ts: trace_end_ts, ..Default::default() });
        }

        info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len());
        Ok(TraceStore {
            config: exported_trace.config,
            events: exported_trace.events.into(),
            index: exported_trace.index,
            pod_owners: PodOwnersMap::new_from_parts(exported_trace.pod_lifecycles, HashMap::new()),
        })
    }

    /// Collect filtered events in `[start_ts, end_ts)` together with an index of objects seen.
    ///
    /// The first returned event is a synthetic event at `start_ts`. Its `applied_objs` hold every
    /// object that was applied before `start_ts` and not deleted before `start_ts`, i.e. the
    /// starting configuration. When `keep_deleted` is true, deleted objects stay in the returned
    /// index.
    ///
    /// # Errors
    /// Fails if an object's GVK cannot be determined.
    pub(crate) fn collect_events(
        &self,
        start_ts: i64,
        end_ts: i64,
        filter: &ExportFilters,
        keep_deleted: bool,
    ) -> anyhow::Result<(Vec<TraceEvent>, TraceIndex)> {
        // TODO this is not a huge inefficiency but it is a little annoying to have
        // an empty event at the start_ts if there aren't any events that happened
        // before the start_ts
        let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }];

        // flattened_objects is a list of everything that happened before start_ts but is still
        // present at start_ts -- i.e., it is our starting configuration. It is keyed by
        // (GVK, namespaced name) because objects of different kinds can share a namespace and name
        // (e.g. a Deployment and a ServiceAccount both called `default/foo`). Keying on the name
        // alone would let one overwrite or delete the other.
        let mut flattened_objects = HashMap::new();
        let mut index = TraceIndex::new();

        for (evt, _) in self.iter() {
            // trace should be end-exclusive, so we use >= here: anything that is at the
            // end_ts or greater gets discarded. The event list is stored in
            // monotonically-increasing order so we are safe to break here.
            if evt.ts >= end_ts {
                break;
            }

            let Some(new_evt) = filter_event(evt, filter) else {
                continue;
            };

            for obj in &new_evt.applied_objs {
                let gvk = GVK::from_dynamic_obj(obj)?;
                let ns_name = obj.namespaced_name();
                if new_evt.ts < start_ts {
                    flattened_objects.insert((gvk.clone(), ns_name.clone()), obj.clone());
                }
                let hash = jsonutils::hash_option(obj.data.get("spec"));
                index.insert(gvk, ns_name, hash);
            }

            // Use the *filtered* event's deletions so that deletes go through the same filter as
            // applies.
            for obj in &new_evt.deleted_objs {
                let key = (GVK::from_dynamic_obj(obj)?, obj.namespaced_name());
                if new_evt.ts < start_ts {
                    flattened_objects.remove(&key);
                }
                if !keep_deleted {
                    let (gvk, ns_name) = key;
                    index.remove(gvk, &ns_name);
                }
            }

            if new_evt.ts >= start_ts {
                events.push(new_evt.clone());
            }
        }

        // events[0] is the empty event we inserted at the beginning, so we're guaranteed not to
        // overwrite anything here.
        events[0].applied_objs = flattened_objects.into_values().collect();
        Ok((events, index))
    }
}
impl TraceStorable for TraceStore {
    // We use a swap-and-update operation for the index, which means that if we call
    // create_or_update_obj from a refresh event, the _new_ index won't have the hash data
    // available in it yet. So here we have to pass in a maybe_old_hash which is the value from
    // the swapped-out data structure. If this is called from an `Applied` event, we just pass in
    // `None` and look up the value in the current index (if the object didn't exist in the old
    // index either, we'll do a second lookup in the new index, but that should be pretty fast).
    //
    // An `ObjectApplied` event is recorded only when the object's `spec` hash differs from the
    // previously known hash (or no hash was known).
    fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option<u64>) -> EmptyResult {
        let gvk = GVK::from_dynamic_obj(obj)?;
        let ns_name = obj.namespaced_name();
        let new_hash = jsonutils::hash_option(obj.data.get("spec"));
        let old_hash = maybe_old_hash.or_else(|| self.index.get(&gvk, &ns_name));

        if Some(new_hash) != old_hash {
            self.events.append(ts, obj, TraceAction::ObjectApplied);
        }
        self.index.insert(gvk, ns_name, new_hash);
        Ok(())
    }

    // Record an `ObjectDeleted` event at `ts` and drop the object from the index.
    fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
        let gvk = GVK::from_dynamic_obj(obj)?;
        let ns_name = obj.namespaced_name();
        self.events.append(ts, obj, TraceAction::ObjectDeleted);
        self.index.remove(gvk, &ns_name);
        Ok(())
    }

    // Reconcile the store against a complete listing of `gvk`. Every object in `objs` is
    // created/updated, and every object of this kind that was previously indexed but is missing
    // from `objs` is recorded as deleted at `ts`.
    fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult {
        let mut old_gvk_index = self.index.take_gvk_index(gvk);
        for obj in objs {
            let ns_name = obj.namespaced_name();
            let old_hash = old_gvk_index.remove(&ns_name);
            self.create_or_update_obj(obj, ts, old_hash)?;
        }

        // Whatever is left in the old index was not in the listing, so it must have been deleted.
        for ns_name in old_gvk_index.keys() {
            self.delete_obj(&build_deletable(gvk, ns_name), ts)?;
        }
        Ok(())
    }

    // Return one of the recorded lifecycles for pods with `pod_hash` owned by the given object,
    // cycling through them by `seq`. Returns `PodLifecycleData::Empty` when nothing is recorded.
    fn lookup_pod_lifecycle(
        &self,
        owner_gvk: &GVK,
        owner_ns_name: &str,
        pod_hash: u64,
        seq: usize,
    ) -> PodLifecycleData {
        match self.pod_owners.lifecycle_data_for(owner_gvk, owner_ns_name, pod_hash) {
            // Guard against an empty list: `seq % 0` would panic.
            Some(data) if !data.is_empty() => data[seq % data.len()].clone(),
            _ => PodLifecycleData::Empty,
        }
    }

    // We assume that we are given a valid/correct lifecycle event here, so we will just
    // blindly store whatever we are given. It's up to the caller (the pod watcher in this
    // case) to ensure that the lifecycle data isn't incorrect.
    //
    // Errors if the pod is unknown and `maybe_pod` is `None`, or if the pod has no namespace.
    fn record_pod_lifecycle(
        &mut self,
        ns_name: &str,
        maybe_pod: Option<corev1::Pod>,
        owners: Vec<metav1::OwnerReference>,
        lifecycle_data: &PodLifecycleData,
    ) -> EmptyResult {
        // If we've already stored data about this pod, we just update the existing entry
        // This assumes that the pod spec is immutable/can't change. This is _largely_ true in
        // current Kubernetes, but it may not be true in the future with in-place resource updates
        // and so forth. (We're specifically not including labels and annotations in the hash
        // because those _can_ change).
        if self.pod_owners.has_pod(ns_name) {
            self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?;
        } else if let Some(pod) = &maybe_pod {
            // Otherwise, we need to check if any of the pod's owners are tracked by us
            for owner in &owners {
                // Pods are expected to always have a namespace, but report a malformed pod as an
                // error instead of panicking the tracer.
                let Some(pod_ns) = pod.namespace() else {
                    bail!("pod {} has no namespace, cannot determine its owners", ns_name);
                };
                let owner_ns_name = format!("{}/{}", pod_ns, owner.name);
                let owner_gvk = GVK::from_owner_ref(owner)?;
                if !self.has_obj(&owner_gvk, &owner_ns_name) {
                    continue;
                }

                if !self.config.track_lifecycle_for(&owner_gvk) {
                    continue;
                }

                // We compute a hash of the podspec, because some types of owning objects may have
                // multiple different types of running pods, and we want to track the lifecycle
                // data for these separately. (For example, a volcanojob takes in a list of pod
                // templates that each have their own replica counts)
                //
                // TODO - it's possible that hashing _everything_ may be too much. Are there types
                // of data that are unique to each pod that won't materially impact the behaviour?
                // This does occur for example with coredns's volume mounts. We may need to filter
                // more things out from this and/or allow users to specify what is filtered out.
                let hash = jsonutils::hash(&serde_json::to_value(pod.stable_spec()?)?);
                self.pod_owners
                    .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);

                // Only the first tracked owner gets the lifecycle data.
                break;
            }
        } else {
            bail!("no pod ownership data found for {}, cannot store", ns_name);
        }

        Ok(())
    }

    fn config(&self) -> &TracerConfig {
        &self.config
    }

    fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool {
        self.index.contains(gvk, ns_name)
    }

    // Timestamp of the first recorded event, if any.
    fn start_ts(&self) -> Option<i64> {
        self.events.front().map(|evt| evt.ts)
    }

    // Timestamp of the last recorded event, if any.
    fn end_ts(&self) -> Option<i64> {
        self.events.back().map(|evt| evt.ts)
    }

    fn iter(&self) -> TraceIterator<'_> {
        TraceIterator { events: &self.events, idx: 0 }
    }
}
// Walks the trace in timestamp order. Each item pairs the current event with the timestamp of
// the event after it, or `None` when the current event is the last one.
impl<'a> Iterator for TraceIterator<'a> {
    type Item = (&'a TraceEvent, Option<i64>);

    fn next(&mut self) -> Option<Self::Item> {
        let events = self.events;
        let total = events.len();
        if total == 0 {
            return None;
        }

        // The cursor advances on every call against a non-empty list, even once exhausted.
        let pos = self.idx;
        self.idx += 1;
        if pos >= total {
            return None;
        }

        let following_ts = if pos + 1 < total { Some(events[pos + 1].ts) } else { None };
        Some((&events[pos], following_ts))
    }
}
#[cfg(test)]
mod test {
    use super::*;

    impl TraceStore {
        /// Test helper: the sorted keys of the object index as it stands just before `end_ts`,
        /// after applying `filter`.
        pub fn sorted_objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec<String> {
            // Objects deleted before `end_ts` must not show up here, so `keep_deleted` is false.
            let index = self.collect_events(0, end_ts, filter, false).expect("testing code").1;
            let mut keys = index.flattened_keys();
            keys.sort_unstable();
            keys
        }
    }
}