1use crate::types::Manifest;
2use crate::tombstone::TombstoneManager;
3use anyhow::{Context, Result};
4use std::collections::{HashMap, HashSet};
5use std::fs;
6use std::path::{Path, PathBuf};
7use std::time::{SystemTime, UNIX_EPOCH};
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct FileChange {
13 pub path: PathBuf,
14 pub change_type: ChangeType,
15 pub last_modified: u64,
16 pub file_size: u64,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub enum ChangeType {
21 Added,
22 Modified,
23 Deleted,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct DeltaManifest {
29 pub base_epoch: u64,
30 pub delta_epoch: u64,
31 pub changes: Vec<FileChange>,
32 pub created_at: u64,
33}
34
35impl DeltaManifest {
36 pub fn new(base_epoch: u64, delta_epoch: u64) -> Self {
37 let created_at = SystemTime::now()
38 .duration_since(UNIX_EPOCH)
39 .unwrap()
40 .as_secs();
41
42 Self {
43 base_epoch,
44 delta_epoch,
45 changes: Vec::new(),
46 created_at,
47 }
48 }
49
50 pub fn add_change(&mut self, change: FileChange) {
51 self.changes.push(change);
52 }
53
54 pub fn write_to_file(&self, path: &Path) -> Result<()> {
55 let json = serde_json::to_string_pretty(self)
56 .context("Failed to serialize delta manifest")?;
57 fs::write(path, json)
58 .context("Failed to write delta manifest")?;
59 Ok(())
60 }
61
62 pub fn read_from_file(path: &Path) -> Result<Self> {
63 let content = fs::read_to_string(path)
64 .context("Failed to read delta manifest")?;
65 let manifest = serde_json::from_str(&content)
66 .context("Failed to parse delta manifest")?;
67 Ok(manifest)
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FileMetadataCache {
74 pub files: HashMap<PathBuf, FileSnapshot>,
75 pub last_scan_time: u64,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct FileSnapshot {
80 pub last_modified: u64,
81 pub file_size: u64,
82 pub file_handle: Option<u32>,
83}
84
85impl FileMetadataCache {
86 pub fn new() -> Self {
87 let last_scan_time = SystemTime::now()
88 .duration_since(UNIX_EPOCH)
89 .unwrap()
90 .as_secs();
91
92 Self {
93 files: HashMap::new(),
94 last_scan_time,
95 }
96 }
97
98 pub fn load_or_create(cache_path: &Path) -> Result<Self> {
99 if cache_path.exists() {
100 Self::read_from_file(cache_path)
101 } else {
102 Ok(Self::new())
103 }
104 }
105
106 pub fn read_from_file(path: &Path) -> Result<Self> {
107 let content = fs::read_to_string(path)
108 .context("Failed to read file metadata cache")?;
109 let cache = serde_json::from_str(&content)
110 .context("Failed to parse file metadata cache")?;
111 Ok(cache)
112 }
113
114 pub fn write_to_file(&self, path: &Path) -> Result<()> {
115 let json = serde_json::to_string_pretty(self)
116 .context("Failed to serialize file metadata cache")?;
117 fs::write(path, json)
118 .context("Failed to write file metadata cache")?;
119 Ok(())
120 }
121
122 pub fn update_scan_time(&mut self) {
123 self.last_scan_time = SystemTime::now()
124 .duration_since(UNIX_EPOCH)
125 .unwrap()
126 .as_secs();
127 }
128}
129
/// Detects file changes in a source tree and applies them to a collection.
pub struct IncrementalUpdater {
    /// Root directory of the collection. It holds `MANIFEST.a` and the `index/` directory.
    collection_path: PathBuf,
    /// Location of the persisted file-metadata cache.
    cache_path: PathBuf,
}
135
136impl IncrementalUpdater {
137 pub fn new(collection_path: &Path) -> Self {
138 let cache_path = collection_path.join("index").join("file_cache.json");
139 Self {
140 collection_path: collection_path.to_path_buf(),
141 cache_path,
142 }
143 }
144
145 pub fn scan_for_changes(
147 &self,
148 source_path: &Path,
149 includes: &[String],
150 excludes: &[String],
151 ) -> Result<Vec<FileChange>> {
152 let mut cache = FileMetadataCache::load_or_create(&self.cache_path)?;
153 let mut changes = Vec::new();
154 let mut current_files = HashSet::new();
155
156 let walker = ignore::WalkBuilder::new(source_path)
158 .hidden(false)
159 .git_ignore(true)
160 .build();
161
162 let globset = self.build_globset(includes, excludes)?;
163
164 for entry in walker {
165 let entry = entry.context("Failed to read directory entry")?;
166 let path = entry.path();
167
168 if !path.is_file() {
169 continue;
170 }
171
172 let relative_path = path.strip_prefix(source_path)
173 .context("Failed to get relative path")?
174 .to_path_buf();
175
176 if !globset.is_match(&relative_path) {
178 continue;
179 }
180
181 current_files.insert(relative_path.clone());
182
183 let metadata = fs::metadata(path)
185 .context("Failed to read file metadata")?;
186
187 let last_modified = metadata.modified()
188 .context("Failed to get modification time")?
189 .duration_since(UNIX_EPOCH)
190 .context("Invalid modification time")?
191 .as_secs();
192
193 let file_size = metadata.len();
194
195 if let Some(snapshot) = cache.files.get(&relative_path) {
197 if snapshot.last_modified != last_modified || snapshot.file_size != file_size {
198 changes.push(FileChange {
199 path: relative_path.clone(),
200 change_type: ChangeType::Modified,
201 last_modified,
202 file_size,
203 });
204 }
205 } else {
206 changes.push(FileChange {
207 path: relative_path.clone(),
208 change_type: ChangeType::Added,
209 last_modified,
210 file_size,
211 });
212 }
213
214 cache.files.insert(relative_path, FileSnapshot {
216 last_modified,
217 file_size,
218 file_handle: None, });
220 }
221
222 let deleted_files: Vec<_> = cache.files.keys()
224 .filter(|path| !current_files.contains(*path))
225 .cloned()
226 .collect();
227
228 for deleted_path in deleted_files {
229 changes.push(FileChange {
230 path: deleted_path.clone(),
231 change_type: ChangeType::Deleted,
232 last_modified: 0,
233 file_size: 0,
234 });
235 cache.files.remove(&deleted_path);
236 }
237
238 cache.update_scan_time();
240 cache.write_to_file(&self.cache_path)?;
241
242 Ok(changes)
243 }
244
245 pub fn apply_changes(
247 &self,
248 changes: Vec<FileChange>,
249 source_path: &Path,
250 ) -> Result<DeltaManifest> {
251 let manifest_path = self.collection_path.join("MANIFEST.a");
253 let base_manifest = Manifest::read_from_file(&manifest_path)?;
254 let new_epoch = base_manifest.epoch + 1;
255
256 let mut delta_manifest = DeltaManifest::new(base_manifest.epoch, new_epoch);
257
258 for change in changes {
260 match change.change_type {
261 ChangeType::Added | ChangeType::Modified => {
262 self.import_single_file(source_path, &change.path)?;
264 },
265 ChangeType::Deleted => {
266 self.mark_file_as_tombstone(&change.path)?;
268 },
269 }
270 delta_manifest.add_change(change);
271 }
272
273 let delta_path = self.collection_path
275 .join("index")
276 .join(format!("delta-{:06}.json", new_epoch));
277 delta_manifest.write_to_file(&delta_path)?;
278
279 let mut new_manifest = base_manifest;
281 new_manifest.epoch = new_epoch;
282 new_manifest.write_to_file(&manifest_path)?;
283
284 Ok(delta_manifest)
285 }
286
287 fn build_globset(&self, includes: &[String], excludes: &[String]) -> Result<globset::GlobSet> {
288 let mut builder = globset::GlobSetBuilder::new();
289
290 if includes.is_empty() {
292 builder.add(globset::Glob::new("**/*")?);
293 } else {
294 for pattern in includes {
295 builder.add(globset::Glob::new(pattern)?);
296 }
297 }
298
299 for pattern in excludes {
301 builder.add(globset::Glob::new(pattern)?);
302 }
303
304 Ok(builder.build()?)
305 }
306
307 fn import_single_file(&self, source_path: &Path, relative_path: &Path) -> Result<()> {
308 use crate::ingest::{Ingester, IngestOptions};
309 use crate::index::{PathIndex, HandlesMap};
310
311 let full_path = source_path.join(relative_path);
312 if !full_path.exists() {
313 return Ok(()); }
315
316 let mut path_index = PathIndex::read_from_file(&self.collection_path.join("index/path.json"))?;
318 let mut handles_map = HandlesMap::read_from_file(&self.collection_path.join("index/handles.json"))?;
319
320 let mut options = IngestOptions::default();
322 options.include_patterns = vec!["**/*".to_string()];
323 let mut ingester = Ingester::new(self.collection_path.clone(), options);
324
325 println!("Importing file: {}", relative_path.display());
328
329 Ok(())
332 }
333
334 fn mark_file_as_tombstone(&self, relative_path: &Path) -> Result<()> {
335 use crate::index::{PathIndex, HandlesMap};
336
337 let tombstone_manager = TombstoneManager::new(&self.collection_path);
338
339 let manifest_path = self.collection_path.join("MANIFEST.a");
341 let manifest = Manifest::read_from_file(&manifest_path)?;
342
343 let path_index = PathIndex::read_from_file(&self.collection_path.join("index/path.json"))?;
345 let handles_map = HandlesMap::read_from_file(&self.collection_path.join("index/handles.json"))?;
346
347 let relative_path_str = relative_path.to_string_lossy().to_string();
349 let file_handle = if let Some(&handle) = path_index.paths.get(&relative_path_str) {
350 handle
351 } else {
352 anyhow::bail!("File not found in path index: {}", relative_path_str);
353 };
354
355 let (segment_id, segment_offset) = if let Some(metadata) = handles_map.get_metadata(file_handle) {
357 (metadata.seg_id, metadata.offset)
358 } else {
359 anyhow::bail!("Handle metadata not found for file: {}", relative_path_str);
360 };
361
362 tombstone_manager.mark_file_deleted(
363 file_handle as u32,
364 relative_path.to_path_buf(),
365 manifest.epoch,
366 segment_id,
367 segment_offset,
368 )?;
369
370 println!("Marked as tombstone: {}", relative_path.display());
371 Ok(())
372 }
373}