Skip to main content

cbtop/incremental_snapshot/
store.rs

1//! Incremental snapshot store with delta compression and retention policies.
2
3use std::collections::HashMap;
4use std::time::Instant;
5
6use super::query::{SnapshotConfig, SnapshotQuery};
7use super::snapshot::{DeltaSnapshot, ProfileSnapshot, SnapshotIndex};
8use super::types::{RetentionTier, SnapshotError, SnapshotResult};
9
10/// Incremental snapshot store
11#[derive(Debug)]
12pub struct IncrementalSnapshotStore {
13    /// Configuration
14    config: SnapshotConfig,
15    /// Snapshot index
16    index: Vec<SnapshotIndex>,
17    /// Keyframe snapshots (full snapshots)
18    pub(super) keyframes: HashMap<usize, ProfileSnapshot>,
19    /// Delta snapshots
20    pub(super) deltas: HashMap<usize, DeltaSnapshot>,
21    /// Total raw size (uncompressed)
22    total_raw_size: usize,
23    /// Total compressed size
24    total_compressed_size: usize,
25    /// Next snapshot index
26    next_index: usize,
27}
28
29impl IncrementalSnapshotStore {
30    /// Create a new snapshot store
31    pub fn new(config: SnapshotConfig) -> Self {
32        Self {
33            config,
34            index: Vec::new(),
35            keyframes: HashMap::new(),
36            deltas: HashMap::new(),
37            total_raw_size: 0,
38            total_compressed_size: 0,
39            next_index: 0,
40        }
41    }
42
43    /// Append a new snapshot
44    pub fn append(&mut self, mut snapshot: ProfileSnapshot) -> SnapshotResult<usize> {
45        snapshot.index = self.next_index;
46        snapshot.compute_checksum();
47
48        let raw_size = snapshot.size_bytes();
49        self.total_raw_size += raw_size;
50
51        // Decide if this should be a keyframe
52        let is_keyframe = self
53            .next_index
54            .is_multiple_of(self.config.keyframe_interval)
55            || self.keyframes.is_empty();
56
57        let (compressed_size, is_delta, base_index) = if is_keyframe {
58            // Store as keyframe
59            let size = snapshot.size_bytes();
60            self.keyframes.insert(self.next_index, snapshot.clone());
61            (size, false, None)
62        } else {
63            // Find nearest keyframe
64            let keyframe_idx =
65                (self.next_index / self.config.keyframe_interval) * self.config.keyframe_interval;
66
67            if let Some(base) = self.keyframes.get(&keyframe_idx) {
68                // Create delta from keyframe
69                let delta = DeltaSnapshot::from_diff(base, &snapshot);
70                let size = delta.size_bytes();
71                self.deltas.insert(self.next_index, delta);
72                (size, true, Some(keyframe_idx))
73            } else {
74                // No keyframe found, store as keyframe
75                let size = snapshot.size_bytes();
76                self.keyframes.insert(self.next_index, snapshot.clone());
77                (size, false, None)
78            }
79        };
80
81        self.total_compressed_size += compressed_size;
82
83        // Add index entry
84        self.index.push(SnapshotIndex {
85            index: self.next_index,
86            timestamp_ns: snapshot.timestamp_ns,
87            fingerprint: snapshot.workload_fingerprint.clone(),
88            tier: RetentionTier::Raw,
89            offset: 0, // In-memory, no offset
90            size_bytes: compressed_size,
91            is_delta,
92            base_index,
93        });
94
95        let idx = self.next_index;
96        self.next_index += 1;
97
98        Ok(idx)
99    }
100
101    /// Get snapshot by index
102    pub fn get(&self, index: usize) -> SnapshotResult<ProfileSnapshot> {
103        if index >= self.next_index {
104            return Err(SnapshotError::IndexOutOfBounds {
105                index,
106                max: self.next_index.saturating_sub(1),
107            });
108        }
109
110        // Check if it's a keyframe
111        if let Some(snapshot) = self.keyframes.get(&index) {
112            return Ok(snapshot.clone());
113        }
114
115        // It's a delta, need to reconstruct
116        if let Some(delta) = self.deltas.get(&index) {
117            if let Some(base) = self.keyframes.get(&delta.base_index) {
118                let reconstructed = delta.apply_to(base);
119
120                if self.config.verify_checksums && !reconstructed.verify_checksum() {
121                    return Err(SnapshotError::Corrupt {
122                        reason: "Checksum verification failed".to_string(),
123                    });
124                }
125
126                return Ok(reconstructed);
127            }
128        }
129
130        Err(SnapshotError::NotFound { index })
131    }
132
133    /// Query snapshots
134    pub fn query(&self, query: &SnapshotQuery) -> SnapshotResult<Vec<ProfileSnapshot>> {
135        let mut results = Vec::new();
136        let mut memory_used = 0;
137
138        for idx_entry in &self.index {
139            if !query.matches(idx_entry) {
140                continue;
141            }
142
143            // Check memory limit
144            if memory_used + idx_entry.size_bytes > self.config.max_query_memory_bytes {
145                break;
146            }
147
148            let snapshot = self.get(idx_entry.index)?;
149
150            // Check metric filter
151            if let Some(ref metric_name) = query.metric_name {
152                if !snapshot.metrics.contains_key(metric_name) {
153                    continue;
154                }
155            }
156
157            memory_used += idx_entry.size_bytes;
158            results.push(snapshot);
159
160            // Check limit
161            if let Some(limit) = query.limit {
162                if results.len() >= limit {
163                    break;
164                }
165            }
166        }
167
168        Ok(results)
169    }
170
171    /// Get compression ratio
172    pub fn compression_ratio(&self) -> f64 {
173        if self.total_raw_size == 0 {
174            1.0
175        } else {
176            self.total_compressed_size as f64 / self.total_raw_size as f64
177        }
178    }
179
180    /// Get snapshot count
181    pub fn count(&self) -> usize {
182        self.next_index
183    }
184
185    /// Get total raw size
186    pub fn total_raw_size(&self) -> usize {
187        self.total_raw_size
188    }
189
190    /// Get total compressed size
191    pub fn total_compressed_size(&self) -> usize {
192        self.total_compressed_size
193    }
194
195    /// Clean up old snapshots based on retention policy
196    pub fn cleanup(&mut self, _now: Instant, reference_time: u64) {
197        let _raw_cutoff = reference_time.saturating_sub(self.config.raw_max_age.as_nanos() as u64);
198        let compressed_cutoff =
199            reference_time.saturating_sub(self.config.compressed_max_age.as_nanos() as u64);
200
201        // Update tiers and remove expired
202        self.index.retain(|idx| {
203            if idx.timestamp_ns < compressed_cutoff {
204                // Remove from storage
205                self.keyframes.remove(&idx.index);
206                self.deltas.remove(&idx.index);
207                return false;
208            }
209            true
210        });
211    }
212
213    /// Get configuration
214    pub fn config(&self) -> &SnapshotConfig {
215        &self.config
216    }
217
218    /// Get index entries
219    pub fn index(&self) -> &[SnapshotIndex] {
220        &self.index
221    }
222}