cbtop/incremental_snapshot/
store.rs1use std::collections::HashMap;
4use std::time::Instant;
5
6use super::query::{SnapshotConfig, SnapshotQuery};
7use super::snapshot::{DeltaSnapshot, ProfileSnapshot, SnapshotIndex};
8use super::types::{RetentionTier, SnapshotError, SnapshotResult};
9
10#[derive(Debug)]
12pub struct IncrementalSnapshotStore {
13 config: SnapshotConfig,
15 index: Vec<SnapshotIndex>,
17 pub(super) keyframes: HashMap<usize, ProfileSnapshot>,
19 pub(super) deltas: HashMap<usize, DeltaSnapshot>,
21 total_raw_size: usize,
23 total_compressed_size: usize,
25 next_index: usize,
27}
28
29impl IncrementalSnapshotStore {
30 pub fn new(config: SnapshotConfig) -> Self {
32 Self {
33 config,
34 index: Vec::new(),
35 keyframes: HashMap::new(),
36 deltas: HashMap::new(),
37 total_raw_size: 0,
38 total_compressed_size: 0,
39 next_index: 0,
40 }
41 }
42
43 pub fn append(&mut self, mut snapshot: ProfileSnapshot) -> SnapshotResult<usize> {
45 snapshot.index = self.next_index;
46 snapshot.compute_checksum();
47
48 let raw_size = snapshot.size_bytes();
49 self.total_raw_size += raw_size;
50
51 let is_keyframe = self
53 .next_index
54 .is_multiple_of(self.config.keyframe_interval)
55 || self.keyframes.is_empty();
56
57 let (compressed_size, is_delta, base_index) = if is_keyframe {
58 let size = snapshot.size_bytes();
60 self.keyframes.insert(self.next_index, snapshot.clone());
61 (size, false, None)
62 } else {
63 let keyframe_idx =
65 (self.next_index / self.config.keyframe_interval) * self.config.keyframe_interval;
66
67 if let Some(base) = self.keyframes.get(&keyframe_idx) {
68 let delta = DeltaSnapshot::from_diff(base, &snapshot);
70 let size = delta.size_bytes();
71 self.deltas.insert(self.next_index, delta);
72 (size, true, Some(keyframe_idx))
73 } else {
74 let size = snapshot.size_bytes();
76 self.keyframes.insert(self.next_index, snapshot.clone());
77 (size, false, None)
78 }
79 };
80
81 self.total_compressed_size += compressed_size;
82
83 self.index.push(SnapshotIndex {
85 index: self.next_index,
86 timestamp_ns: snapshot.timestamp_ns,
87 fingerprint: snapshot.workload_fingerprint.clone(),
88 tier: RetentionTier::Raw,
89 offset: 0, size_bytes: compressed_size,
91 is_delta,
92 base_index,
93 });
94
95 let idx = self.next_index;
96 self.next_index += 1;
97
98 Ok(idx)
99 }
100
101 pub fn get(&self, index: usize) -> SnapshotResult<ProfileSnapshot> {
103 if index >= self.next_index {
104 return Err(SnapshotError::IndexOutOfBounds {
105 index,
106 max: self.next_index.saturating_sub(1),
107 });
108 }
109
110 if let Some(snapshot) = self.keyframes.get(&index) {
112 return Ok(snapshot.clone());
113 }
114
115 if let Some(delta) = self.deltas.get(&index) {
117 if let Some(base) = self.keyframes.get(&delta.base_index) {
118 let reconstructed = delta.apply_to(base);
119
120 if self.config.verify_checksums && !reconstructed.verify_checksum() {
121 return Err(SnapshotError::Corrupt {
122 reason: "Checksum verification failed".to_string(),
123 });
124 }
125
126 return Ok(reconstructed);
127 }
128 }
129
130 Err(SnapshotError::NotFound { index })
131 }
132
133 pub fn query(&self, query: &SnapshotQuery) -> SnapshotResult<Vec<ProfileSnapshot>> {
135 let mut results = Vec::new();
136 let mut memory_used = 0;
137
138 for idx_entry in &self.index {
139 if !query.matches(idx_entry) {
140 continue;
141 }
142
143 if memory_used + idx_entry.size_bytes > self.config.max_query_memory_bytes {
145 break;
146 }
147
148 let snapshot = self.get(idx_entry.index)?;
149
150 if let Some(ref metric_name) = query.metric_name {
152 if !snapshot.metrics.contains_key(metric_name) {
153 continue;
154 }
155 }
156
157 memory_used += idx_entry.size_bytes;
158 results.push(snapshot);
159
160 if let Some(limit) = query.limit {
162 if results.len() >= limit {
163 break;
164 }
165 }
166 }
167
168 Ok(results)
169 }
170
171 pub fn compression_ratio(&self) -> f64 {
173 if self.total_raw_size == 0 {
174 1.0
175 } else {
176 self.total_compressed_size as f64 / self.total_raw_size as f64
177 }
178 }
179
180 pub fn count(&self) -> usize {
182 self.next_index
183 }
184
185 pub fn total_raw_size(&self) -> usize {
187 self.total_raw_size
188 }
189
190 pub fn total_compressed_size(&self) -> usize {
192 self.total_compressed_size
193 }
194
195 pub fn cleanup(&mut self, _now: Instant, reference_time: u64) {
197 let _raw_cutoff = reference_time.saturating_sub(self.config.raw_max_age.as_nanos() as u64);
198 let compressed_cutoff =
199 reference_time.saturating_sub(self.config.compressed_max_age.as_nanos() as u64);
200
201 self.index.retain(|idx| {
203 if idx.timestamp_ns < compressed_cutoff {
204 self.keyframes.remove(&idx.index);
206 self.deltas.remove(&idx.index);
207 return false;
208 }
209 true
210 });
211 }
212
213 pub fn config(&self) -> &SnapshotConfig {
215 &self.config
216 }
217
218 pub fn index(&self) -> &[SnapshotIndex] {
220 &self.index
221 }
222}