// siftdb_core/incremental.rs

use crate::tombstone::TombstoneManager;
use crate::types::Manifest;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

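/// Seconds since the Unix epoch, shared by the timestamp fields below.
/// Panics only if the system clock is set before the epoch.
fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_secs()
}
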
/// File change tracking for incremental updates
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    pub path: PathBuf,
    pub change_type: ChangeType,
    pub last_modified: u64,
    pub file_size: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeType {
    Added,
    Modified,
    Deleted,
}

/// Delta manifest tracks incremental changes since the last full import
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaManifest {
    pub base_epoch: u64,
    pub delta_epoch: u64,
    pub changes: Vec<FileChange>,
    pub created_at: u64,
}

impl DeltaManifest {
    pub fn new(base_epoch: u64, delta_epoch: u64) -> Self {
        Self {
            base_epoch,
            delta_epoch,
            changes: Vec::new(),
            created_at: now_secs(),
        }
    }

    pub fn add_change(&mut self, change: FileChange) {
        self.changes.push(change);
    }

    pub fn write_to_file(&self, path: &Path) -> Result<()> {
        let json = serde_json::to_string_pretty(self)
            .context("Failed to serialize delta manifest")?;
        fs::write(path, json)
            .context("Failed to write delta manifest")?;
        Ok(())
    }

    pub fn read_from_file(path: &Path) -> Result<Self> {
        let content = fs::read_to_string(path)
            .context("Failed to read delta manifest")?;
        let manifest = serde_json::from_str(&content)
            .context("Failed to parse delta manifest")?;
        Ok(manifest)
    }
}

/// Tracks file metadata for change detection
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileMetadataCache {
    pub files: HashMap<PathBuf, FileSnapshot>,
    pub last_scan_time: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileSnapshot {
    pub last_modified: u64,
    pub file_size: u64,
    pub file_handle: Option<u32>,
}

impl FileMetadataCache {
    pub fn new() -> Self {
        Self {
            files: HashMap::new(),
            last_scan_time: now_secs(),
        }
    }

    pub fn load_or_create(cache_path: &Path) -> Result<Self> {
        if cache_path.exists() {
            Self::read_from_file(cache_path)
        } else {
            Ok(Self::new())
        }
    }

    pub fn read_from_file(path: &Path) -> Result<Self> {
        let content = fs::read_to_string(path)
            .context("Failed to read file metadata cache")?;
        let cache = serde_json::from_str(&content)
            .context("Failed to parse file metadata cache")?;
        Ok(cache)
    }

    pub fn write_to_file(&self, path: &Path) -> Result<()> {
        let json = serde_json::to_string_pretty(self)
            .context("Failed to serialize file metadata cache")?;
        fs::write(path, json)
            .context("Failed to write file metadata cache")?;
        Ok(())
    }

    pub fn update_scan_time(&mut self) {
        self.last_scan_time = now_secs();
    }
}
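
// `new()` takes no arguments, so wiring it up to `Default` keeps the API
// idiomatic (and satisfies clippy's `new_without_default` lint).
impl Default for FileMetadataCache {
    fn default() -> Self {
        Self::new()
    }
}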

/// Incremental update manager
pub struct IncrementalUpdater {
    collection_path: PathBuf,
    cache_path: PathBuf,
}

impl IncrementalUpdater {
    pub fn new(collection_path: &Path) -> Self {
        let cache_path = collection_path.join("index").join("file_cache.json");
        Self {
            collection_path: collection_path.to_path_buf(),
            cache_path,
        }
    }

    /// Scan directory and detect changes since last import
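    ///
    /// A usage sketch (the collection and source paths are hypothetical, and
    /// the block is marked `ignore` since the crate's public module path is
    /// an assumption):
    ///
    /// ```ignore
    /// use siftdb_core::incremental::IncrementalUpdater;
    /// use std::path::Path;
    ///
    /// let updater = IncrementalUpdater::new(Path::new("collections/docs"));
    /// let changes = updater.scan_for_changes(Path::new("./notes"), &[], &[])?;
    /// let delta = updater.apply_changes(changes, Path::new("./notes"))?;
    /// println!("epoch {}: {} changes", delta.delta_epoch, delta.changes.len());
    /// ```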
    pub fn scan_for_changes(
        &self,
        source_path: &Path,
        includes: &[String],
        excludes: &[String],
    ) -> Result<Vec<FileChange>> {
        let mut cache = FileMetadataCache::load_or_create(&self.cache_path)?;
        let mut changes = Vec::new();
        let mut current_files = HashSet::new();

        // Scan current filesystem state
        let walker = ignore::WalkBuilder::new(source_path)
            .hidden(false)
            .git_ignore(true)
            .build();

        let (include_set, exclude_set) = self.build_globsets(includes, excludes)?;

        for entry in walker {
            let entry = entry.context("Failed to read directory entry")?;
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            let relative_path = path.strip_prefix(source_path)
                .context("Failed to get relative path")?
                .to_path_buf();

            // Skip files that miss every include pattern or hit an exclude pattern
            if !include_set.is_match(&relative_path) || exclude_set.is_match(&relative_path) {
                continue;
            }

            current_files.insert(relative_path.clone());

            // Get file metadata
            let metadata = fs::metadata(path)
                .context("Failed to read file metadata")?;

            let last_modified = metadata.modified()
                .context("Failed to get modification time")?
                .duration_since(UNIX_EPOCH)
                .context("Invalid modification time")?
                .as_secs();

            let file_size = metadata.len();

            // Compare against the cached snapshot to classify the change
            let previous = cache.files.get(&relative_path).cloned();
            if let Some(ref snapshot) = previous {
                if snapshot.last_modified != last_modified || snapshot.file_size != file_size {
                    changes.push(FileChange {
                        path: relative_path.clone(),
                        change_type: ChangeType::Modified,
                        last_modified,
                        file_size,
                    });
                }
            } else {
                changes.push(FileChange {
                    path: relative_path.clone(),
                    change_type: ChangeType::Added,
                    last_modified,
                    file_size,
                });
            }

            // Update the cache, preserving any handle assigned by an earlier
            // import; a fresh file starts with no handle until it is imported
            cache.files.insert(relative_path, FileSnapshot {
                last_modified,
                file_size,
                file_handle: previous.and_then(|s| s.file_handle),
            });
        }

        // Find deleted files
        let deleted_files: Vec<_> = cache.files.keys()
            .filter(|path| !current_files.contains(*path))
            .cloned()
            .collect();

        for deleted_path in deleted_files {
            changes.push(FileChange {
                path: deleted_path.clone(),
                change_type: ChangeType::Deleted,
                last_modified: 0,
                file_size: 0,
            });
            cache.files.remove(&deleted_path);
        }

        // Update scan time and save cache
        cache.update_scan_time();
        cache.write_to_file(&self.cache_path)?;

        Ok(changes)
    }

    /// Apply incremental changes to the collection
    pub fn apply_changes(
        &self,
        changes: Vec<FileChange>,
        source_path: &Path,
    ) -> Result<DeltaManifest> {
        // Load current manifest to get base epoch
        let manifest_path = self.collection_path.join("MANIFEST.a");
        let base_manifest = Manifest::read_from_file(&manifest_path)?;
        let new_epoch = base_manifest.epoch + 1;

        let mut delta_manifest = DeltaManifest::new(base_manifest.epoch, new_epoch);

        // Process each change
        for change in changes {
            match change.change_type {
                ChangeType::Added | ChangeType::Modified => {
                    // Import the file (reuses the existing ingest logic)
                    self.import_single_file(source_path, &change.path)?;
                },
                ChangeType::Deleted => {
                    // Record a tombstone so readers skip the deleted file
                    self.mark_file_as_tombstone(&change.path)?;
                },
            }
            delta_manifest.add_change(change);
        }

        // Save delta manifest
        let delta_path = self.collection_path
            .join("index")
            .join(format!("delta-{:06}.json", new_epoch));
        delta_manifest.write_to_file(&delta_path)?;

        // Update main manifest with new epoch
        let mut new_manifest = base_manifest;
        new_manifest.epoch = new_epoch;
        new_manifest.write_to_file(&manifest_path)?;

        Ok(delta_manifest)
    }

    fn build_globsets(
        &self,
        includes: &[String],
        excludes: &[String],
    ) -> Result<(globset::GlobSet, globset::GlobSet)> {
        // Includes and excludes must live in separate sets: putting them in
        // one set would make a matching exclude pattern *select* a file
        // instead of filtering it out.
        let mut include_builder = globset::GlobSetBuilder::new();

        // Add include patterns (default to all if empty)
        if includes.is_empty() {
            include_builder.add(globset::Glob::new("**/*")?);
        } else {
            for pattern in includes {
                include_builder.add(globset::Glob::new(pattern)?);
            }
        }

        // Add exclude patterns
        let mut exclude_builder = globset::GlobSetBuilder::new();
        for pattern in excludes {
            exclude_builder.add(globset::Glob::new(pattern)?);
        }

        Ok((include_builder.build()?, exclude_builder.build()?))
    }

    fn import_single_file(&self, source_path: &Path, relative_path: &Path) -> Result<()> {
        let full_path = source_path.join(relative_path);
        if !full_path.exists() {
            // File disappeared between scan and import; the next scan will
            // report it as deleted and it will be handled by a tombstone.
            return Ok(());
        }

        // TODO: route the file through the main `Ingester` (with the
        // `PathIndex` and `HandlesMap` reloaded) so frame writing and
        // indexing stay consistent with a full import. For now this is a
        // logging stub; a follow-up will batch single-file imports through
        // the existing ingest path.
        println!("Importing file: {}", relative_path.display());
        Ok(())
    }

    fn mark_file_as_tombstone(&self, relative_path: &Path) -> Result<()> {
        use crate::index::{HandlesMap, PathIndex};

        let tombstone_manager = TombstoneManager::new(&self.collection_path);

        // Load current manifest to get epoch
        let manifest_path = self.collection_path.join("MANIFEST.a");
        let manifest = Manifest::read_from_file(&manifest_path)?;

        // Load indexes to look up the actual file handle
        let path_index = PathIndex::read_from_file(&self.collection_path.join("index/path.json"))?;
        let handles_map = HandlesMap::read_from_file(&self.collection_path.join("index/handles.json"))?;

        // Look up file handle from path
        let relative_path_str = relative_path.to_string_lossy().to_string();
        let file_handle = if let Some(&handle) = path_index.paths.get(&relative_path_str) {
            handle
        } else {
            anyhow::bail!("File not found in path index: {}", relative_path_str);
        };

        // Look up segment info from handle metadata
        let (segment_id, segment_offset) = if let Some(metadata) = handles_map.get_metadata(file_handle) {
            (metadata.seg_id, metadata.offset)
        } else {
            anyhow::bail!("Handle metadata not found for file: {}", relative_path_str);
        };

        tombstone_manager.mark_file_deleted(
            file_handle as u32,
            relative_path.to_path_buf(),
            manifest.epoch,
            segment_id,
            segment_offset,
        )?;

        println!("Marked as tombstone: {}", relative_path.display());
        Ok(())
    }
}
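
// A small self-check for the serialized types above: a minimal sketch that
// only exercises the JSON round-trips, writing into std's temp dir so it
// assumes nothing beyond this file and its existing dependencies.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn delta_manifest_round_trips_through_json() -> Result<()> {
        let path = std::env::temp_dir().join("siftdb_delta_manifest_test.json");
        let mut delta = DeltaManifest::new(1, 2);
        delta.add_change(FileChange {
            path: PathBuf::from("docs/readme.md"),
            change_type: ChangeType::Added,
            last_modified: 1_700_000_000,
            file_size: 42,
        });
        delta.write_to_file(&path)?;
        let loaded = DeltaManifest::read_from_file(&path)?;
        assert_eq!(loaded.base_epoch, 1);
        assert_eq!(loaded.delta_epoch, 2);
        assert_eq!(loaded.changes.len(), 1);
        std::fs::remove_file(&path).ok();
        Ok(())
    }

    #[test]
    fn file_metadata_cache_round_trips_through_json() -> Result<()> {
        let path = std::env::temp_dir().join("siftdb_file_cache_test.json");
        let mut cache = FileMetadataCache::new();
        cache.files.insert(
            PathBuf::from("docs/readme.md"),
            FileSnapshot { last_modified: 1, file_size: 2, file_handle: Some(7) },
        );
        cache.write_to_file(&path)?;
        let loaded = FileMetadataCache::read_from_file(&path)?;
        assert_eq!(loaded.files.len(), 1);
        std::fs::remove_file(&path).ok();
        Ok(())
    }
}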