1use std::collections::HashMap;
2
3use anyhow::bail;
4use clockabilly::prelude::*;
5use sk_api::v1::ExportFilters;
6use sk_core::jsonutils;
7use sk_core::k8s::{
8 build_deletable,
9 PodExt,
10 PodLifecycleData,
11 GVK,
12};
13use sk_core::prelude::*;
14use sk_core::time::duration_to_ts_from;
15use thiserror::Error;
16use tracing::*;
17
18use crate::config::TracerConfig;
19use crate::filter::filter_event;
20use crate::pod_owners_map::PodOwnersMap;
21use crate::{
22 ExportedTrace,
23 TraceAction,
24 TraceEvent,
25 TraceEventList,
26 TraceIndex,
27 TraceIterator,
28 TraceStorable,
29 CURRENT_TRACE_FORMAT_VERSION,
30};
31
32
33#[derive(Debug, Error)]
34pub enum TraceStoreError {
35 #[error(
36 "could not parse trace file\n\nIf this trace file is older than version 2, \
37 it is only parseable by SimKube <= 1.1.1. Please see the release notes for details."
38 )]
39 ParseFailed(#[from] rmp_serde::decode::Error),
40}
41
42#[derive(Clone, Default)]
43pub struct TraceStore {
44 pub(crate) config: TracerConfig,
45 pub(crate) events: TraceEventList,
46 pub(crate) pod_owners: PodOwnersMap,
47 pub(crate) index: TraceIndex,
48}
49
50impl TraceStore {
58 pub fn new(config: TracerConfig) -> TraceStore {
59 TraceStore { config, ..Default::default() }
60 }
61
62 pub fn clone_with_events(&self, events: TraceEventList) -> TraceStore {
63 let mut store = self.clone();
64 store.events = events;
65 store
66 }
67
68 pub fn export(&self, start_ts: i64, end_ts: i64, filter: &ExportFilters) -> anyhow::Result<Vec<u8>> {
69 info!("Exporting objs between {start_ts} and {end_ts} with filters: {filter:?}");
70
71 let (events, index) = self.collect_events(start_ts, end_ts, filter, true)?;
76 let num_events = events.len();
77
78 let pod_lifecycles = self.pod_owners.filter(start_ts, end_ts, &index);
81 let data = rmp_serde::to_vec_named(&ExportedTrace {
82 version: CURRENT_TRACE_FORMAT_VERSION,
83 config: self.config.clone(),
84 events,
85 index,
86 pod_lifecycles,
87 })?;
88
89 info!("Exported {} events", num_events);
90 Ok(data)
91 }
92
93 pub fn export_all(&self) -> anyhow::Result<Vec<u8>> {
94 let (Some(start_ts), Some(end_ts)) = (self.start_ts(), self.end_ts()) else {
95 return Ok(vec![]);
96 };
97 self.export(start_ts, end_ts, &ExportFilters::default())
98 }
99
100 pub fn import(data: Vec<u8>, maybe_duration: &Option<String>) -> anyhow::Result<TraceStore> {
101 let exported_trace = rmp_serde::from_slice::<ExportedTrace>(&data).map_err(TraceStoreError::ParseFailed)?;
102 Self::from_exported_trace(exported_trace, maybe_duration)
103 }
104
105 pub fn from_exported_trace(
109 mut exported_trace: ExportedTrace,
110 maybe_duration: &Option<String>,
111 ) -> anyhow::Result<TraceStore> {
112 if exported_trace.version != CURRENT_TRACE_FORMAT_VERSION {
113 bail!("unsupported trace version: {}", exported_trace.version);
114 }
115
116 let trace_start_ts = exported_trace
117 .events
118 .first()
119 .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
120 .ts;
121 let mut trace_end_ts = exported_trace
122 .events
123 .last()
124 .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
125 .ts;
126 if let Some(trace_duration_str) = maybe_duration {
127 trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?;
128 exported_trace.events.retain(|evt| evt.ts < trace_end_ts);
129
130 exported_trace
132 .events
133 .push(TraceEvent { ts: trace_end_ts, ..Default::default() });
134 }
135
136 info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len());
137 Ok(TraceStore {
138 config: exported_trace.config,
139 events: exported_trace.events.into(),
140 index: exported_trace.index,
141 pod_owners: PodOwnersMap::new_from_parts(exported_trace.pod_lifecycles, HashMap::new()),
142 })
143 }
144
145 pub(crate) fn collect_events(
146 &self,
147 start_ts: i64,
148 end_ts: i64,
149 filter: &ExportFilters,
150 keep_deleted: bool,
151 ) -> anyhow::Result<(Vec<TraceEvent>, TraceIndex)> {
152 let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }];
156
157 let mut flattened_objects = HashMap::new();
160 let mut index = TraceIndex::new();
161
162 for (evt, _) in self.iter() {
163 if evt.ts >= end_ts {
167 break;
168 }
169
170 if let Some(new_evt) = filter_event(evt, filter) {
171 for obj in &new_evt.applied_objs {
172 let gvk = GVK::from_dynamic_obj(obj)?;
173 let ns_name = obj.namespaced_name();
174 if new_evt.ts < start_ts {
175 flattened_objects.insert(ns_name.clone(), obj.clone());
176 }
177 let hash = jsonutils::hash_option(obj.data.get("spec"));
178 index.insert(gvk, ns_name, hash);
179 }
180
181 for obj in &evt.deleted_objs {
182 let gvk = GVK::from_dynamic_obj(obj)?;
183 let ns_name = obj.namespaced_name();
184 if new_evt.ts < start_ts {
185 flattened_objects.remove(&ns_name);
186 }
187 if !keep_deleted {
188 index.remove(gvk, &ns_name);
189 }
190 }
191
192 if new_evt.ts >= start_ts {
193 events.push(new_evt.clone());
194 }
195 }
196 }
197
198 events[0].applied_objs = flattened_objects.into_values().collect();
201 Ok((events, index))
202 }
203}
204
205impl TraceStorable for TraceStore {
206 fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option<u64>) -> EmptyResult {
213 let gvk = GVK::from_dynamic_obj(obj)?;
214
215 let ns_name = obj.namespaced_name();
216 let new_hash = jsonutils::hash_option(obj.data.get("spec"));
217 let old_hash = maybe_old_hash.or_else(|| self.index.get(&gvk, &ns_name));
218
219 if Some(new_hash) != old_hash {
220 self.events.append(ts, obj, TraceAction::ObjectApplied);
221 }
222 self.index.insert(gvk, ns_name, new_hash);
223 Ok(())
224 }
225
226 fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) -> EmptyResult {
227 let gvk = GVK::from_dynamic_obj(obj)?;
228 let ns_name = obj.namespaced_name();
229 self.events.append(ts, obj, TraceAction::ObjectDeleted);
230 self.index.remove(gvk, &ns_name);
231 Ok(())
232 }
233
234 fn update_all_objs_for_gvk(&mut self, gvk: &GVK, objs: &[DynamicObject], ts: i64) -> EmptyResult {
235 let mut old_gvk_index = self.index.take_gvk_index(gvk);
236 for obj in objs {
237 let ns_name = obj.namespaced_name();
238 let old_hash = old_gvk_index.remove(&ns_name);
239 self.create_or_update_obj(obj, ts, old_hash)?;
240 }
241
242 for ns_name in old_gvk_index.keys() {
243 self.delete_obj(&build_deletable(gvk, ns_name), ts)?;
244 }
245 Ok(())
246 }
247
248 fn lookup_pod_lifecycle(
249 &self,
250 owner_gvk: &GVK,
251 owner_ns_name: &str,
252 pod_hash: u64,
253 seq: usize,
254 ) -> PodLifecycleData {
255 let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(owner_gvk, owner_ns_name, pod_hash);
256 match maybe_lifecycle_data {
257 Some(data) => data[seq % data.len()].clone(),
258 _ => PodLifecycleData::Empty,
259 }
260 }
261
262 fn record_pod_lifecycle(
266 &mut self,
267 ns_name: &str,
268 maybe_pod: Option<corev1::Pod>,
269 owners: Vec<metav1::OwnerReference>,
270 lifecycle_data: &PodLifecycleData,
271 ) -> EmptyResult {
272 if self.pod_owners.has_pod(ns_name) {
278 self.pod_owners.update_pod_lifecycle(ns_name, lifecycle_data)?;
279 } else if let Some(pod) = &maybe_pod {
280 for owner in &owners {
282 let owner_ns_name = format!("{}/{}", pod.namespace().unwrap(), owner.name);
284 let owner_gvk = GVK::from_owner_ref(owner)?;
285 if !self.has_obj(&owner_gvk, &owner_ns_name) {
286 continue;
287 }
288
289 if !self.config.track_lifecycle_for(&owner_gvk) {
290 continue;
291 }
292
293 let hash = jsonutils::hash(&serde_json::to_value(pod.stable_spec()?)?);
303 self.pod_owners
304 .store_new_pod_lifecycle(ns_name, &owner_gvk, &owner_ns_name, hash, lifecycle_data);
305 break;
306 }
307 } else {
308 bail!("no pod ownership data found for {}, cannot store", ns_name);
309 }
310
311 Ok(())
312 }
313
314 fn config(&self) -> &TracerConfig {
315 &self.config
316 }
317
318 fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool {
319 self.index.contains(gvk, ns_name)
320 }
321
322 fn start_ts(&self) -> Option<i64> {
323 self.events.front().map(|evt| evt.ts)
324 }
325
326 fn end_ts(&self) -> Option<i64> {
327 self.events.back().map(|evt| evt.ts)
328 }
329
330 fn iter(&self) -> TraceIterator<'_> {
331 TraceIterator { events: &self.events, idx: 0 }
332 }
333}
334
335impl<'a> Iterator for TraceIterator<'a> {
338 type Item = (&'a TraceEvent, Option<i64>);
339
340 fn next(&mut self) -> Option<Self::Item> {
341 if self.events.is_empty() {
342 return None;
343 }
344
345 let ret = match self.idx {
346 i if i < self.events.len() - 1 => Some((&self.events[i], Some(self.events[i + 1].ts))),
347 i if i == self.events.len() - 1 => Some((&self.events[i], None)),
348 _ => None,
349 };
350
351 self.idx += 1;
352 ret
353 }
354}
355
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod test {
    use super::*;

    impl TraceStore {
        /// Test helper: the sorted keys of every object still present in the index after
        /// replaying all events before `end_ts` (deleted objects are dropped).
        pub fn sorted_objs_at(&self, end_ts: i64, filter: &ExportFilters) -> Vec<String> {
            let index = self
                .collect_events(0, end_ts, filter, false)
                .expect("testing code")
                .1;

            let mut keys = index.flattened_keys();
            keys.sort();
            keys
        }
    }
}