
rvf_runtime/cow_compact.rs

//! COW-aware compaction engine.
//!
//! Two compaction modes:
//! - **Read optimize**: rewrite hot clusters contiguously for sequential I/O.
//! - **Space reclaim**: if `hash(local) == hash(parent)`, replace LocalOffset
//!   with ParentRef to reclaim local storage.
//!
//! Segment preservation: unknown segments are copied forward unless
//! `strip_unknown` is set.
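//!
//! # Example
//!
//! A minimal space-reclaim sketch. The `rvf_runtime` module paths are assumed
//! from this file's imports, so the example is marked `ignore`:
//!
//! ```ignore
//! use std::collections::HashMap;
//!
//! use rvf_runtime::cow_compact::CowCompactor;
//! use rvf_runtime::cow_map::CowMap;
//! use rvf_types::cow_map::CowMapEntry;
//!
//! // One local cluster whose bytes are identical to the parent's copy.
//! let mut map = CowMap::new_flat(1);
//! map.update(0, CowMapEntry::LocalOffset(0x100));
//!
//! let mut local = HashMap::new();
//! local.insert(0, vec![0xAA; 128]);
//! let mut parent = HashMap::new();
//! parent.insert(0, vec![0xAA; 128]);
//!
//! // The matching cluster is rewritten as ParentRef, reclaiming one cluster.
//! let result = CowCompactor::compact_space_reclaim(&mut map, &local, &parent, 128).unwrap();
//! assert_eq!(result.bytes_reclaimed, 128);
//! assert_eq!(map.lookup(0), CowMapEntry::ParentRef);
//! ```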

use std::collections::HashMap;

use rvf_types::cow_map::CowMapEntry;
use rvf_types::RvfError;

use crate::cow_map::CowMap;
use crate::store::simple_shake256_256;

/// Result of a COW compaction operation.
pub struct CompactionResult {
    /// Number of clusters rewritten or reclaimed.
    pub clusters_affected: u32,
    /// Bytes reclaimed (for space_reclaim mode).
    pub bytes_reclaimed: u64,
    /// Number of clusters that matched the parent and were converted to ParentRef.
    pub clusters_deduplicated: u32,
}

/// Refcount data for shared clusters.
pub struct RefcountData {
    /// Map from cluster_id to reference count.
    pub refcounts: HashMap<u32, u32>,
}

/// COW-aware compaction engine.
pub struct CowCompactor {
    /// Whether to strip unknown segment types during compaction.
    pub strip_unknown: bool,
}

impl Default for CowCompactor {
    fn default() -> Self {
        Self::new()
    }
}

impl CowCompactor {
    /// Create a new compactor with default settings.
    pub fn new() -> Self {
        Self {
            strip_unknown: false,
        }
    }

    /// Read-optimize compaction: reorder local clusters for sequential read.
    ///
    /// Scans the COW map, reads all LocalOffset clusters, and rewrites them
    /// contiguously in cluster_id order. Updates the map entries to point to
    /// the new contiguous offsets.
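    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the `read_optimize_reorders_clusters` test
    /// below; imports are omitted and the example is marked `ignore`:
    ///
    /// ```ignore
    /// let mut map = CowMap::new_flat(4);
    /// map.update(0, CowMapEntry::LocalOffset(0x1000));
    /// map.update(2, CowMapEntry::LocalOffset(0x3000));
    ///
    /// let mut local_data = HashMap::new();
    /// local_data.insert(0, vec![0xAA; 256]);
    /// local_data.insert(2, vec![0xBB; 256]);
    ///
    /// // Both local clusters are repacked back-to-back starting at offset 0.
    /// let result = CowCompactor::compact_read_optimize(&mut map, &local_data, 256).unwrap();
    /// assert_eq!(result.clusters_affected, 2);
    /// assert_eq!(map.lookup(0), CowMapEntry::LocalOffset(0));
    /// assert_eq!(map.lookup(2), CowMapEntry::LocalOffset(256));
    /// ```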
    pub fn compact_read_optimize(
        cow_map: &mut CowMap,
        local_data: &HashMap<u32, Vec<u8>>,
        cluster_size: u32,
    ) -> Result<CompactionResult, RvfError> {
        let mut clusters_affected = 0u32;
        let mut new_data: Vec<(u32, Vec<u8>)> = Vec::new();

        // Collect all local clusters in order
        for cluster_id in 0..cow_map.cluster_count() {
            if let CowMapEntry::LocalOffset(_) = cow_map.lookup(cluster_id) {
                if let Some(data) = local_data.get(&cluster_id) {
                    new_data.push((cluster_id, data.clone()));
                    clusters_affected += 1;
                }
            }
        }

        // Assign new sequential offsets (these would be written to file)
        let mut offset = 0u64;
        for (cluster_id, _data) in &new_data {
            cow_map.update(*cluster_id, CowMapEntry::LocalOffset(offset));
            offset += cluster_size as u64;
        }

        Ok(CompactionResult {
            clusters_affected,
            bytes_reclaimed: 0,
            clusters_deduplicated: 0,
        })
    }

    /// Space-reclaim compaction: if local cluster data matches parent data,
    /// replace LocalOffset with ParentRef to reclaim space.
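    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the `space_reclaim_deduplicates` test below;
    /// imports are omitted and the example is marked `ignore`:
    ///
    /// ```ignore
    /// let mut map = CowMap::new_flat(2);
    /// map.update(0, CowMapEntry::LocalOffset(0x100));
    /// map.update(1, CowMapEntry::LocalOffset(0x200));
    ///
    /// let mut local = HashMap::new();
    /// local.insert(0, vec![0xAA; 128]); // identical to parent
    /// local.insert(1, vec![0xBB; 128]); // differs from parent
    /// let mut parent = HashMap::new();
    /// parent.insert(0, vec![0xAA; 128]);
    /// parent.insert(1, vec![0xCC; 128]);
    ///
    /// let result = CowCompactor::compact_space_reclaim(&mut map, &local, &parent, 128).unwrap();
    /// // Only the identical cluster is converted to ParentRef.
    /// assert_eq!(result.clusters_deduplicated, 1);
    /// assert_eq!(map.lookup(0), CowMapEntry::ParentRef);
    /// assert_eq!(map.lookup(1), CowMapEntry::LocalOffset(0x200));
    /// ```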
    pub fn compact_space_reclaim(
        cow_map: &mut CowMap,
        local_data: &HashMap<u32, Vec<u8>>,
        parent_data: &HashMap<u32, Vec<u8>>,
        cluster_size: u32,
    ) -> Result<CompactionResult, RvfError> {
        let mut clusters_deduplicated = 0u32;
        let mut bytes_reclaimed = 0u64;

        for cluster_id in 0..cow_map.cluster_count() {
            if let CowMapEntry::LocalOffset(_) = cow_map.lookup(cluster_id) {
                // Skip clusters that have no local or parent bytes to compare
                let local = match local_data.get(&cluster_id) {
                    Some(d) => d,
                    None => continue,
                };
                let parent = match parent_data.get(&cluster_id) {
                    Some(d) => d,
                    None => continue,
                };

                let local_hash = simple_shake256_256(local);
                let parent_hash = simple_shake256_256(parent);

                // Same content as the parent: point the map entry at the
                // parent so the local copy can be reclaimed
                if local_hash == parent_hash {
                    cow_map.update(cluster_id, CowMapEntry::ParentRef);
                    clusters_deduplicated += 1;
                    bytes_reclaimed += cluster_size as u64;
                }
            }
        }

        Ok(CompactionResult {
            clusters_affected: clusters_deduplicated,
            bytes_reclaimed,
            clusters_deduplicated,
        })
    }

    /// Rebuild reference counts from the COW map.
    ///
    /// Each LocalOffset cluster has refcount 1. ParentRef clusters add one
    /// reference to the parent's cluster at the same index.
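    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the `rebuild_refcounts` test below; imports
    /// are omitted and the example is marked `ignore`:
    ///
    /// ```ignore
    /// let mut map = CowMap::new_flat(3);
    /// map.update(0, CowMapEntry::LocalOffset(0x100));
    /// map.update(1, CowMapEntry::ParentRef);
    /// // Cluster 2 stays unallocated and gets no refcount entry.
    ///
    /// let counts = CowCompactor::rebuild_refcounts(&map);
    /// assert_eq!(counts.refcounts.get(&0), Some(&1));
    /// assert_eq!(counts.refcounts.get(&1), Some(&1));
    /// assert_eq!(counts.refcounts.get(&2), None);
    /// ```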
    pub fn rebuild_refcounts(cow_map: &CowMap) -> RefcountData {
        let mut refcounts = HashMap::new();

        for cluster_id in 0..cow_map.cluster_count() {
            match cow_map.lookup(cluster_id) {
                CowMapEntry::LocalOffset(_) => {
                    *refcounts.entry(cluster_id).or_insert(0) += 1;
                }
                CowMapEntry::ParentRef => {
                    // Parent cluster is referenced
                    *refcounts.entry(cluster_id).or_insert(0) += 1;
                }
                CowMapEntry::Unallocated => {}
            }
        }

        RefcountData { refcounts }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn read_optimize_reorders_clusters() {
        let mut map = CowMap::new_flat(4);
        map.update(0, CowMapEntry::LocalOffset(0x1000));
        map.update(2, CowMapEntry::LocalOffset(0x3000));
        // 1 and 3 are unallocated

        let mut local_data = HashMap::new();
        local_data.insert(0, vec![0xAA; 256]);
        local_data.insert(2, vec![0xBB; 256]);

        let result =
            CowCompactor::compact_read_optimize(&mut map, &local_data, 256).unwrap();

        assert_eq!(result.clusters_affected, 2);

        // Clusters should now have sequential offsets
        assert_eq!(map.lookup(0), CowMapEntry::LocalOffset(0));
        assert_eq!(map.lookup(2), CowMapEntry::LocalOffset(256));
    }

    #[test]
    fn space_reclaim_deduplicates() {
        let mut map = CowMap::new_flat(3);
        let shared_data = vec![0xAA; 128];
        let different_data = vec![0xBB; 128];

        map.update(0, CowMapEntry::LocalOffset(0x100));
        map.update(1, CowMapEntry::LocalOffset(0x200));
        map.update(2, CowMapEntry::ParentRef);

        let mut local_data = HashMap::new();
        local_data.insert(0, shared_data.clone()); // same as parent
        local_data.insert(1, different_data); // different from parent

        let mut parent_data = HashMap::new();
        parent_data.insert(0, shared_data); // matches local
        parent_data.insert(1, vec![0xCC; 128]); // does not match local

        let result =
            CowCompactor::compact_space_reclaim(&mut map, &local_data, &parent_data, 128)
                .unwrap();

        assert_eq!(result.clusters_deduplicated, 1);
        assert_eq!(result.bytes_reclaimed, 128);

        // Cluster 0 should be ParentRef now (deduplicated)
        assert_eq!(map.lookup(0), CowMapEntry::ParentRef);
        // Cluster 1 should remain local (different data)
        assert_eq!(map.lookup(1), CowMapEntry::LocalOffset(0x200));
    }

    #[test]
    fn rebuild_refcounts() {
        let mut map = CowMap::new_flat(4);
        map.update(0, CowMapEntry::LocalOffset(0x100));
        map.update(1, CowMapEntry::ParentRef);
        map.update(2, CowMapEntry::LocalOffset(0x200));
        // 3 is unallocated

        let refcounts = CowCompactor::rebuild_refcounts(&map);

        assert_eq!(refcounts.refcounts.get(&0), Some(&1));
        assert_eq!(refcounts.refcounts.get(&1), Some(&1));
        assert_eq!(refcounts.refcounts.get(&2), Some(&1));
        assert_eq!(refcounts.refcounts.get(&3), None);
    }
}