1use crate::types::HandleMetadata;
2use anyhow::{Context, Result};
3use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Tombstone {
12 pub file_handle: u32,
13 pub path: PathBuf,
14 pub deleted_at: u64,
15 pub deleted_epoch: u64,
16 pub segment_id: u32,
18 pub segment_offset: u64,
19}
20
21impl Tombstone {
22 pub fn new(
23 file_handle: u32,
24 path: PathBuf,
25 deleted_epoch: u64,
26 segment_id: u32,
27 segment_offset: u64,
28 ) -> Self {
29 let deleted_at = SystemTime::now()
30 .duration_since(UNIX_EPOCH)
31 .unwrap()
32 .as_secs();
33
34 Self {
35 file_handle,
36 path,
37 deleted_at,
38 deleted_epoch,
39 segment_id,
40 segment_offset,
41 }
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct TombstoneRegistry {
48 pub tombstones: HashMap<u32, Tombstone>, pub path_to_handle: HashMap<PathBuf, u32>, pub last_compaction_epoch: u64,
51}
52
53impl TombstoneRegistry {
54 pub fn new() -> Self {
55 Self {
56 tombstones: HashMap::new(),
57 path_to_handle: HashMap::new(),
58 last_compaction_epoch: 0,
59 }
60 }
61
62 pub fn load_or_create(registry_path: &Path) -> Result<Self> {
64 if registry_path.exists() {
65 Self::read_from_file(registry_path)
66 } else {
67 Ok(Self::new())
68 }
69 }
70
71 pub fn read_from_file(path: &Path) -> Result<Self> {
72 let content = fs::read_to_string(path)
73 .context("Failed to read tombstone registry")?;
74 let registry = serde_json::from_str(&content)
75 .context("Failed to parse tombstone registry")?;
76 Ok(registry)
77 }
78
79 pub fn write_to_file(&self, path: &Path) -> Result<()> {
80 let json = serde_json::to_string_pretty(self)
81 .context("Failed to serialize tombstone registry")?;
82 fs::write(path, json)
83 .context("Failed to write tombstone registry")?;
84 Ok(())
85 }
86
87 pub fn add_tombstone(&mut self, tombstone: Tombstone) {
89 let file_handle = tombstone.file_handle;
90 let path = tombstone.path.clone();
91
92 self.tombstones.insert(file_handle, tombstone);
93 self.path_to_handle.insert(path, file_handle);
94 }
95
96 pub fn is_tombstoned(&self, file_handle: u32) -> bool {
98 self.tombstones.contains_key(&file_handle)
99 }
100
101 pub fn is_path_tombstoned(&self, path: &Path) -> bool {
103 self.path_to_handle.contains_key(path)
104 }
105
106 pub fn get_tombstone(&self, file_handle: u32) -> Option<&Tombstone> {
108 self.tombstones.get(&file_handle)
109 }
110
111 pub fn get_tombstones_since_epoch(&self, epoch: u64) -> Vec<&Tombstone> {
113 self.tombstones
114 .values()
115 .filter(|t| t.deleted_epoch >= epoch)
116 .collect()
117 }
118
119 pub fn compact_tombstones_before_epoch(&mut self, epoch: u64) -> Vec<Tombstone> {
121 let to_remove: Vec<u32> = self.tombstones
122 .iter()
123 .filter(|(_, t)| t.deleted_epoch < epoch)
124 .map(|(handle, _)| *handle)
125 .collect();
126
127 let mut removed = Vec::new();
128 for handle in to_remove {
129 if let Some(tombstone) = self.tombstones.remove(&handle) {
130 self.path_to_handle.remove(&tombstone.path);
131 removed.push(tombstone);
132 }
133 }
134
135 self.last_compaction_epoch = epoch;
136 removed
137 }
138
139 pub fn stats(&self) -> TombstoneStats {
141 let total_count = self.tombstones.len();
142 let oldest_epoch = self.tombstones
143 .values()
144 .map(|t| t.deleted_epoch)
145 .min()
146 .unwrap_or(0);
147 let newest_epoch = self.tombstones
148 .values()
149 .map(|t| t.deleted_epoch)
150 .max()
151 .unwrap_or(0);
152
153 TombstoneStats {
154 total_count,
155 oldest_epoch,
156 newest_epoch,
157 last_compaction_epoch: self.last_compaction_epoch,
158 }
159 }
160}
161
162#[derive(Debug)]
163pub struct TombstoneStats {
164 pub total_count: usize,
165 pub oldest_epoch: u64,
166 pub newest_epoch: u64,
167 pub last_compaction_epoch: u64,
168}
169
170pub struct TombstoneManager {
172 collection_path: PathBuf,
173 registry_path: PathBuf,
174}
175
176impl TombstoneManager {
177 pub fn new(collection_path: &Path) -> Self {
178 let registry_path = collection_path.join("index").join("tombstones.json");
179 Self {
180 collection_path: collection_path.to_path_buf(),
181 registry_path,
182 }
183 }
184
185 pub fn load_registry(&self) -> Result<TombstoneRegistry> {
187 TombstoneRegistry::load_or_create(&self.registry_path)
188 }
189
190 pub fn save_registry(&self, registry: &TombstoneRegistry) -> Result<()> {
192 if let Some(parent) = self.registry_path.parent() {
194 fs::create_dir_all(parent)?;
195 }
196 registry.write_to_file(&self.registry_path)
197 }
198
199 pub fn mark_file_deleted(
201 &self,
202 file_handle: u32,
203 path: PathBuf,
204 current_epoch: u64,
205 segment_id: u32,
206 segment_offset: u64,
207 ) -> Result<()> {
208 let mut registry = self.load_registry()?;
209
210 let tombstone = Tombstone::new(
211 file_handle,
212 path,
213 current_epoch,
214 segment_id,
215 segment_offset,
216 );
217
218 registry.add_tombstone(tombstone);
219 self.save_registry(®istry)?;
220
221 Ok(())
222 }
223
224 pub fn filter_live_handles(&self, handles: Vec<u32>) -> Result<Vec<u32>> {
226 let registry = self.load_registry()?;
227
228 let live_handles = handles
229 .into_iter()
230 .filter(|h| !registry.is_tombstoned(*h))
231 .collect();
232
233 Ok(live_handles)
234 }
235
236 pub fn get_compaction_candidates(&self, before_epoch: u64) -> Result<Vec<Tombstone>> {
238 let registry = self.load_registry()?;
239 let candidates = registry.get_tombstones_since_epoch(0)
240 .into_iter()
241 .filter(|t| t.deleted_epoch < before_epoch)
242 .cloned()
243 .collect();
244
245 Ok(candidates)
246 }
247
248 pub fn compact_tombstones(&self, before_epoch: u64) -> Result<usize> {
250 let mut registry = self.load_registry()?;
251 let removed = registry.compact_tombstones_before_epoch(before_epoch);
252 let count = removed.len();
253
254 self.save_registry(®istry)?;
255
256 Ok(count)
257 }
258
259 pub fn get_stats(&self) -> Result<TombstoneStats> {
261 let registry = self.load_registry()?;
262 Ok(registry.stats())
263 }
264}