// sk_store/trace.rs

1use std::collections::HashMap;
2
3use anyhow::bail;
4use clockabilly::prelude::*;
5use serde::{
6    Deserialize,
7    Serialize,
8};
9use sk_core::k8s::{
10    GVK,
11    PodLifecycleData,
12};
13use sk_core::time::duration_to_ts_from;
14use thiserror::Error;
15use tracing::*;
16
17use crate::CURRENT_TRACE_FORMAT_VERSION;
18use crate::config::TracerConfig;
19use crate::event::TraceEvent;
20use crate::index::TraceIndex;
21use crate::pod_owners_map::PodLifecyclesMap;
22
23#[derive(Debug, Error)]
24pub enum TraceError {
25    #[error(
26        "could not parse trace file\n\nIf this trace file is older than version 2, \
27        it is only parseable by SimKube <= 1.1.1.  Please see the release notes for details."
28    )]
29    ParseFailed(#[from] rmp_serde::decode::Error),
30}
31
32#[derive(Clone, Deserialize, Serialize)]
33pub struct ExportedTrace {
34    pub version: u16,
35    pub config: TracerConfig,
36    pub events: Vec<TraceEvent>,
37    pub index: TraceIndex,
38    pub pod_lifecycles: HashMap<(GVK, String), PodLifecyclesMap>,
39}
40
41impl Default for ExportedTrace {
42    fn default() -> Self {
43        ExportedTrace {
44            version: CURRENT_TRACE_FORMAT_VERSION,
45            config: TracerConfig::default(),
46            events: vec![],
47            index: TraceIndex::default(),
48            pod_lifecycles: HashMap::default(),
49        }
50    }
51}
52
53impl ExportedTrace {
54    pub fn import(data: Vec<u8>, maybe_duration: Option<&String>) -> anyhow::Result<ExportedTrace> {
55        let mut exported_trace = rmp_serde::from_slice::<ExportedTrace>(&data).map_err(TraceError::ParseFailed)?;
56
57        if exported_trace.version != CURRENT_TRACE_FORMAT_VERSION {
58            bail!("unsupported trace version: {}", exported_trace.version);
59        }
60
61        let trace_start_ts = exported_trace
62            .events
63            .first()
64            .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
65            .ts;
66        let mut trace_end_ts = exported_trace
67            .events
68            .last()
69            .unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
70            .ts;
71        if let Some(trace_duration_str) = maybe_duration {
72            trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?;
73            exported_trace.events.retain(|evt| evt.ts < trace_end_ts);
74
75            // Add an empty event to the very end to make sure the driver doesn't shut down early
76            exported_trace
77                .events
78                .push(TraceEvent { ts: trace_end_ts, ..Default::default() });
79        }
80
81        info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len());
82        Ok(exported_trace)
83    }
84
85    pub fn clone_with_events(&self, events: Vec<TraceEvent>) -> ExportedTrace {
86        let mut trace = self.clone();
87        trace.events = events;
88        trace
89    }
90
91    pub fn lookup_pod_lifecycle(
92        &self,
93        owner_gvk: &GVK,
94        owner_ns_name: &str,
95        pod_hash: u64,
96        seq: usize,
97    ) -> PodLifecycleData {
98        let maybe_lifecycle_data = self
99            .pod_lifecycles
100            .get(&(owner_gvk.clone(), owner_ns_name.into()))
101            .and_then(|l| l.get(&pod_hash));
102        match maybe_lifecycle_data {
103            Some(data) => data[seq % data.len()].clone(),
104            _ => PodLifecycleData::Empty,
105        }
106    }
107
108    pub fn append_event(&mut self, event: TraceEvent) {
109        self.events.push(event);
110    }
111
112    pub fn end_ts(&self) -> Option<i64> {
113        self.events.last().map(|evt| evt.ts)
114    }
115
116    pub fn events(&self) -> Vec<TraceEvent> {
117        self.events.clone()
118    }
119
120    pub fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool {
121        self.index.contains(gvk, ns_name)
122    }
123
124    pub fn iter(&self) -> TraceIterator<'_> {
125        TraceIterator::new(&self.events)
126    }
127
128    pub fn prepend_event(&mut self, event: TraceEvent) {
129        let mut tmp = vec![event];
130        tmp.append(&mut self.events);
131        self.events = tmp;
132    }
133
134    pub fn start_ts(&self) -> Option<i64> {
135        self.events.first().map(|evt| evt.ts)
136    }
137
138    pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
139        Ok(rmp_serde::to_vec_named(self)?)
140    }
141}
142
143pub struct TraceIterator<'a> {
144    events: &'a Vec<TraceEvent>,
145    idx: usize,
146}
147
148impl<'a> TraceIterator<'a> {
149    pub(crate) fn new(events: &'a Vec<TraceEvent>) -> Self {
150        TraceIterator { events, idx: 0 }
151    }
152}
153
154// Our iterator implementation iterates over all the events in timeseries order.  It returns the
155// current event, and the timestamp of the _next_ event.
156impl<'a> Iterator for TraceIterator<'a> {
157    type Item = (&'a TraceEvent, Option<i64>);
158
159    fn next(&mut self) -> Option<Self::Item> {
160        if self.events.is_empty() {
161            return None;
162        }
163
164        let ret = match self.idx {
165            i if i < self.events.len() - 1 => Some((&self.events[i], Some(self.events[i + 1].ts))),
166            i if i == self.events.len() - 1 => Some((&self.events[i], None)),
167            _ => None,
168        };
169
170        self.idx += 1;
171        ret
172    }
173}