use std::collections::HashMap;
use anyhow::bail;
use clockabilly::prelude::*;
use kube::api::DynamicObject;
use serde::{
Deserialize,
Serialize,
};
use sk_core::external_storage::{
ObjectStoreWrapper,
SkObjectStore,
};
use sk_core::k8s::{
GVK,
PodLifecycleData,
};
use sk_core::prelude::*;
use sk_core::time::duration_to_ts_from;
use thiserror::Error;
use tracing::*;
use crate::CURRENT_TRACE_FORMAT_VERSION;
use crate::event::TraceEvent;
use crate::index::TraceIndex;
use crate::pod_owners_map::PodLifecyclesMap;
#[derive(Debug, Error)]
pub enum TraceError {
#[error(
"could not parse trace file\n\nIf this trace file is older than version 2, \
it is only parseable by SimKube <= 1.1.1. Please see the release notes for details."
)]
ParseFailed(#[from] rmp_serde::decode::Error),
}
#[derive(Clone, Deserialize, Serialize)]
pub struct ExportedTrace {
pub version: u16,
pub config: TracerConfig,
pub events: Vec<TraceEvent>,
pub index: TraceIndex,
pub pod_lifecycles: HashMap<(GVK, String), PodLifecyclesMap>,
}
impl Default for ExportedTrace {
fn default() -> Self {
ExportedTrace {
version: CURRENT_TRACE_FORMAT_VERSION,
config: TracerConfig::default(),
events: vec![],
index: TraceIndex::default(),
pod_lifecycles: HashMap::default(),
}
}
}
impl ExportedTrace {
pub async fn from_path(path: &str) -> anyhow::Result<Self> {
let object_store = SkObjectStore::new(path)?;
let trace_data = object_store.get().await?.to_vec();
ExportedTrace::import(trace_data, None)
}
pub fn new_with_events(events: Vec<TraceEvent>) -> ExportedTrace {
ExportedTrace { events, ..Default::default() }
}
pub fn import(data: Vec<u8>, maybe_duration: Option<&String>) -> anyhow::Result<ExportedTrace> {
let mut exported_trace = rmp_serde::from_slice::<ExportedTrace>(&data).map_err(TraceError::ParseFailed)?;
if exported_trace.version != CURRENT_TRACE_FORMAT_VERSION {
bail!("unsupported trace version: {}", exported_trace.version);
}
let trace_start_ts = exported_trace
.events
.first()
.unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
.ts;
let mut trace_end_ts = exported_trace
.events
.last()
.unwrap_or(&TraceEvent { ts: UtcClock.now_ts(), ..Default::default() })
.ts;
if let Some(trace_duration_str) = maybe_duration {
trace_end_ts = duration_to_ts_from(trace_start_ts, trace_duration_str)?;
exported_trace.events.retain(|evt| evt.ts < trace_end_ts);
exported_trace
.events
.push(TraceEvent { ts: trace_end_ts, ..Default::default() });
}
info!("Imported {} events between {trace_start_ts} and {trace_end_ts}", exported_trace.events.len());
Ok(exported_trace)
}
pub fn lookup_pod_lifecycle(
&self,
owner_gvk: &GVK,
owner_ns_name: &str,
pod_hash: u64,
seq: usize,
) -> PodLifecycleData {
let maybe_lifecycle_data = self
.pod_lifecycles
.get(&(owner_gvk.clone(), owner_ns_name.into()))
.and_then(|l| l.get(&pod_hash));
match maybe_lifecycle_data {
Some(data) => data[seq % data.len()].clone(),
_ => PodLifecycleData::Empty,
}
}
pub fn append_event(&mut self, event: TraceEvent) {
self.events.push(event);
}
pub fn end_ts(&self) -> Option<i64> {
self.events.last().map(|evt| evt.ts)
}
pub fn events(&self) -> Vec<TraceEvent> {
self.events.clone()
}
pub fn get_event(&self, idx: usize) -> Option<&TraceEvent> {
self.events.get(idx)
}
pub fn has_obj(&self, gvk: &GVK, ns_name: &str) -> bool {
self.index.contains(gvk, ns_name)
}
pub fn get_object(&self, event_idx: usize, obj_idx: usize) -> Option<&DynamicObject> {
let event = self.get_event(event_idx)?;
let applied_len = event.applied_objs.len();
if obj_idx >= applied_len {
event.deleted_objs.get(obj_idx - applied_len)
} else {
event.applied_objs.get(obj_idx)
}
}
pub fn iter(&self) -> TraceIterator<'_> {
TraceIterator::new(&self.events)
}
pub fn is_empty_at(&self, idx: usize) -> bool {
self.events
.get(idx)
.map(|evt| evt.applied_objs.is_empty() && evt.deleted_objs.is_empty())
.unwrap_or(true)
}
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
self.events.len()
}
pub fn prepend_event(&mut self, event: TraceEvent) {
let mut tmp = vec![event];
tmp.append(&mut self.events);
self.events = tmp;
}
pub fn start_ts(&self) -> Option<i64> {
self.events.first().map(|evt| evt.ts)
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
Ok(rmp_serde::to_vec_named(self)?)
}
}
#[derive(Clone)]
pub struct TraceIterator<'a> {
events: &'a Vec<TraceEvent>,
idx: usize,
}
impl<'a> TraceIterator<'a> {
pub(crate) fn new(events: &'a Vec<TraceEvent>) -> Self {
TraceIterator { events, idx: 0 }
}
}
impl<'a> Iterator for TraceIterator<'a> {
type Item = (&'a TraceEvent, Option<i64>);
fn next(&mut self) -> Option<Self::Item> {
if self.events.is_empty() {
return None;
}
let ret = match self.idx {
i if i < self.events.len() - 1 => Some((&self.events[i], Some(self.events[i + 1].ts))),
i if i == self.events.len() - 1 => Some((&self.events[i], None)),
_ => None,
};
self.idx += 1;
ret
}
}
#[cfg(test)]
impl ExportedTrace {}