Skip to main content

cbtop/incremental_snapshot/
snapshot.rs

1//! Profile and delta snapshot types.
2
3use std::collections::HashMap;
4
5use super::types::{DeltaMetric, MetricData, RetentionTier};
6
7/// A profile snapshot
8#[derive(Debug, Clone)]
9pub struct ProfileSnapshot {
10    /// Snapshot index
11    pub index: usize,
12    /// Timestamp when snapshot was taken
13    pub timestamp_ns: u64,
14    /// Workload fingerprint for identification
15    pub workload_fingerprint: String,
16    /// Metric data
17    pub metrics: HashMap<String, MetricData>,
18    /// Snapshot metadata
19    pub metadata: HashMap<String, String>,
20    /// CRC32 checksum
21    pub checksum: u32,
22}
23
24impl ProfileSnapshot {
25    /// Create a new snapshot
26    pub fn new(index: usize) -> Self {
27        Self {
28            index,
29            timestamp_ns: std::time::SystemTime::now()
30                .duration_since(std::time::UNIX_EPOCH)
31                .map(|d| d.as_nanos() as u64)
32                .unwrap_or(0),
33            workload_fingerprint: String::new(),
34            metrics: HashMap::new(),
35            metadata: HashMap::new(),
36            checksum: 0,
37        }
38    }
39
40    /// Add metric data
41    pub fn add_metric(&mut self, metric: MetricData) {
42        self.metrics.insert(metric.name.clone(), metric);
43    }
44
45    /// Get metric by name
46    pub fn get_metric(&self, name: &str) -> Option<&MetricData> {
47        self.metrics.get(name)
48    }
49
50    /// Set workload fingerprint
51    pub fn set_fingerprint(&mut self, fingerprint: impl Into<String>) {
52        self.workload_fingerprint = fingerprint.into();
53    }
54
55    /// Get total size in bytes
56    pub fn size_bytes(&self) -> usize {
57        let metrics_size: usize = self.metrics.values().map(|m| m.size_bytes()).sum();
58        let metadata_size: usize = self.metadata.iter().map(|(k, v)| k.len() + v.len()).sum();
59
60        24 + self.workload_fingerprint.len() + metrics_size + metadata_size
61    }
62
63    /// Compute checksum
64    pub fn compute_checksum(&mut self) {
65        // Simple CRC32-like checksum
66        let mut hash: u32 = 0;
67
68        hash = hash.wrapping_add(self.index as u32);
69        hash = hash.wrapping_mul(31);
70        hash = hash.wrapping_add((self.timestamp_ns & 0xFFFFFFFF) as u32);
71
72        // Sort keys for deterministic iteration
73        let mut keys: Vec<_> = self.metrics.keys().collect();
74        keys.sort();
75
76        for name in keys {
77            if let Some(metric) = self.metrics.get(name) {
78                for c in name.bytes() {
79                    hash = hash.wrapping_mul(31).wrapping_add(c as u32);
80                }
81                for &val in &metric.values {
82                    // Include both halves of f64 bits
83                    let bits = val.to_bits();
84                    hash = hash.wrapping_mul(31).wrapping_add((bits >> 32) as u32);
85                    hash = hash
86                        .wrapping_mul(31)
87                        .wrapping_add((bits & 0xFFFFFFFF) as u32);
88                }
89            }
90        }
91
92        self.checksum = hash;
93    }
94
95    /// Verify checksum
96    pub fn verify_checksum(&self) -> bool {
97        let mut copy = self.clone();
98        copy.compute_checksum();
99        copy.checksum == self.checksum
100    }
101}
102
103/// Delta-encoded snapshot
104#[derive(Debug, Clone)]
105pub struct DeltaSnapshot {
106    /// Snapshot index
107    pub index: usize,
108    /// Base snapshot index
109    pub base_index: usize,
110    /// Timestamp
111    pub timestamp_ns: u64,
112    /// Workload fingerprint
113    pub workload_fingerprint: String,
114    /// Delta metrics
115    pub deltas: HashMap<String, DeltaMetric>,
116    /// New metrics (not in base)
117    pub new_metrics: HashMap<String, MetricData>,
118    /// Removed metric names
119    pub removed_metrics: Vec<String>,
120    /// Checksum
121    pub checksum: u32,
122}
123
124impl DeltaSnapshot {
125    /// Create from two snapshots
126    pub fn from_diff(base: &ProfileSnapshot, current: &ProfileSnapshot) -> Self {
127        let mut deltas = HashMap::new();
128        let mut new_metrics = HashMap::new();
129        let mut removed_metrics = Vec::new();
130
131        // Find deltas and new metrics
132        for (name, metric) in &current.metrics {
133            if let Some(base_metric) = base.metrics.get(name) {
134                let delta = metric.delta_from(base_metric);
135                // Only store delta if it's smaller than full metric
136                if delta.size_bytes() < metric.size_bytes() {
137                    deltas.insert(name.clone(), delta);
138                } else {
139                    new_metrics.insert(name.clone(), metric.clone());
140                }
141            } else {
142                new_metrics.insert(name.clone(), metric.clone());
143            }
144        }
145
146        // Find removed metrics
147        for name in base.metrics.keys() {
148            if !current.metrics.contains_key(name) {
149                removed_metrics.push(name.clone());
150            }
151        }
152
153        Self {
154            index: current.index,
155            base_index: base.index,
156            timestamp_ns: current.timestamp_ns,
157            workload_fingerprint: current.workload_fingerprint.clone(),
158            deltas,
159            new_metrics,
160            removed_metrics,
161            checksum: 0,
162        }
163    }
164
165    /// Get compressed size
166    pub fn size_bytes(&self) -> usize {
167        let delta_size: usize = self.deltas.values().map(|d| d.size_bytes()).sum();
168        let new_size: usize = self.new_metrics.values().map(|m| m.size_bytes()).sum();
169        let removed_size: usize = self.removed_metrics.iter().map(|s| s.len()).sum();
170
171        24 + self.workload_fingerprint.len() + delta_size + new_size + removed_size
172    }
173
174    /// Apply delta to base snapshot
175    pub fn apply_to(&self, base: &ProfileSnapshot) -> ProfileSnapshot {
176        let mut result = ProfileSnapshot::new(self.index);
177        result.timestamp_ns = self.timestamp_ns;
178        result.workload_fingerprint = self.workload_fingerprint.clone();
179
180        // Copy base metrics and apply deltas
181        for (name, metric) in &base.metrics {
182            if self.removed_metrics.contains(name) {
183                continue;
184            }
185
186            if let Some(delta) = self.deltas.get(name) {
187                let reconstructed = metric.apply_delta(delta);
188                result.metrics.insert(name.clone(), reconstructed);
189            } else {
190                result.metrics.insert(name.clone(), metric.clone());
191            }
192        }
193
194        // Add new metrics
195        for (name, metric) in &self.new_metrics {
196            result.metrics.insert(name.clone(), metric.clone());
197        }
198
199        result.compute_checksum();
200        result
201    }
202}
203
204/// Index entry for fast lookup
205#[derive(Debug, Clone)]
206pub struct SnapshotIndex {
207    /// Snapshot index
208    pub index: usize,
209    /// Timestamp
210    pub timestamp_ns: u64,
211    /// Workload fingerprint
212    pub fingerprint: String,
213    /// Retention tier
214    pub tier: RetentionTier,
215    /// Offset in storage
216    pub offset: usize,
217    /// Size in bytes
218    pub size_bytes: usize,
219    /// Is delta encoded
220    pub is_delta: bool,
221    /// Base index (if delta)
222    pub base_index: Option<usize>,
223}