siftdb_core/
ingest.rs

1use std::path::Path;
2use std::fs;
3use std::io::Read;
4use std::collections::HashMap;
5use ignore::WalkBuilder;
6use anyhow::Result;
7use crate::types::*;
8use crate::storage::{SegmentWriter, generate_line_table};
9use crate::index::{PathIndex, HandlesMap};
10use crate::locking::{SWMRLockManager, WriteLock};
11
/// Filters that decide which files an [`Ingester`] picks up from disk.
#[derive(Debug, Clone, PartialEq)]
pub struct IngestOptions {
    /// Glob patterns; a path must match at least one of them to be ingested.
    pub include_patterns: Vec<String>,
    /// Glob patterns that reject a path. They are evaluated before
    /// `include_patterns`, so an exclude always wins.
    pub exclude_patterns: Vec<String>,
    /// Files larger than this many bytes are skipped.
    pub max_file_bytes: u64,
    /// A file is treated as binary when the share of control bytes in its
    /// leading 1024 bytes is strictly greater than this ratio.
    pub binary_ratio_threshold: f32,
}
18
19impl Default for IngestOptions {
20    fn default() -> Self {
21        Self {
22            include_patterns: vec!["**/*".to_string()],
23            exclude_patterns: vec![
24                "**/target/**".to_string(),
25                "**/node_modules/**".to_string(),
26                "**/.git/**".to_string(),
27                "**/build/**".to_string(),
28                "**/dist/**".to_string(),
29            ],
30            max_file_bytes: 10 * 1024 * 1024, // 10MB
31            binary_ratio_threshold: 0.3, // 30% non-printable chars = binary
32        }
33    }
34}
35
36pub struct Ingester {
37    collection_path: std::path::PathBuf,
38    options: IngestOptions,
39    lock_manager: SWMRLockManager,
40}
41
42impl Ingester {
43    pub fn new(collection_path: std::path::PathBuf, options: IngestOptions) -> Self {
44        let lock_manager = SWMRLockManager::new(&collection_path);
45        Self {
46            collection_path,
47            options,
48            lock_manager,
49        }
50    }
51    
52    /// Ingest from filesystem with default lock configuration
53    pub fn ingest_from_fs(&mut self, source_path: &Path) -> Result<IngestStats> {
54        self.ingest_from_fs_with_lock_config(source_path, 300, "sift-import".to_string())
55    }
56    
57    /// Ingest from filesystem with custom lock configuration
58    pub fn ingest_from_fs_with_lock_config(&mut self, source_path: &Path, timeout_secs: u64, holder_info: String) -> Result<IngestStats> {
59        // Acquire write lock before starting ingestion - temporarily simplified to avoid borrowing conflicts
60        // TODO: Properly integrate lock management with ingestion lifecycle
61        let lock_manager = SWMRLockManager::new(&self.collection_path);
62        let _write_lock = lock_manager.acquire_write_lock(timeout_secs, holder_info)?;
63        self.ingest_from_fs_impl(source_path)
64    }
65    
66    /// Internal implementation of ingestion (assumes lock is already held)
67    fn ingest_from_fs_impl(&mut self, source_path: &Path) -> Result<IngestStats> {
68        let mut stats = IngestStats::default();
69        let mut ingested_content = HashMap::new();
70        let mut path_mappings = HashMap::new();
71        let mut handle_metadata = HashMap::new();
72        
73        // Load existing indexes or create new ones
74        let mut path_index = if self.collection_path.join("index/path.json").exists() {
75            PathIndex::read_from_file(&self.collection_path.join("index/path.json"))?
76        } else {
77            PathIndex::new()
78        };
79        let mut handles_map = if self.collection_path.join("index/handles.json").exists() {
80            HandlesMap::read_from_file(&self.collection_path.join("index/handles.json"))?
81        } else {
82            HandlesMap::new()
83        };
84        
85        // Create segment writer
86        let store_path = self.collection_path.join("store");
87        let seg_id = self.find_next_segment_id(&store_path)?;
88        let mut writer = SegmentWriter::new(&store_path, seg_id)?;
89        
90        // Walk directory with ignore patterns
91        let walker = WalkBuilder::new(source_path)
92            .hidden(false) // Include hidden files
93            .git_ignore(true)
94            .git_global(true)
95            .git_exclude(true)
96            .build();
97        
98        for entry in walker {
99            let entry = entry?;
100            let path = entry.path();
101            
102            // Skip directories
103            if path.is_dir() {
104                continue;
105            }
106            
107            // Apply filters
108            if !self.should_include_file(path)? {
109                stats.skipped += 1;
110                continue;
111            }
112            
113            // Read file content
114            let content = match fs::read(path) {
115                Ok(content) => content,
116                Err(e) => {
117                    eprintln!("Warning: Failed to read {}: {}", path.display(), e);
118                    stats.errors += 1;
119                    continue;
120                }
121            };
122            
123            // Check file size
124            if content.len() > self.options.max_file_bytes as usize {
125                stats.skipped += 1;
126                continue;
127            }
128            
129            // Check if binary
130            if self.is_binary(&content) {
131                stats.skipped += 1;
132                continue;
133            }
134            
135            // Generate relative path from source
136            let relative_path = path.strip_prefix(source_path)
137                .unwrap_or(path)
138                .to_string_lossy()
139                .to_string();
140            
141            // Detect language
142            let lang = detect_language(path);
143            
144            // Generate line table
145            let line_table = generate_line_table(&content);
146            
147            // Create frame
148            let header = FileHeader::new(&content, &line_table, lang);
149            let frame = Frame {
150                header,
151                content: content.clone(),
152                line_table,
153            };
154            
155            // Write frame and get handle
156            let handle = path_index.add_path(relative_path.clone());
157            let metadata = writer.write_frame(&frame)?;
158            handles_map.add_handle(handle, metadata.clone());
159            
160            ingested_content.insert(handle, content);
161            path_mappings.insert(relative_path, handle);
162            handle_metadata.insert(handle, metadata);
163            
164            stats.ingested += 1;
165            
166            if stats.ingested % 100 == 0 {
167                println!("Ingested {} files...", stats.ingested);
168            }
169        }
170        
171        // Build and save indexes
172        let mut path_index = PathIndex::new();
173        for (path, handle) in path_mappings {
174            path_index.paths.insert(path, handle);
175        }
176        
177        let mut handles_map = HandlesMap::new();
178        for (handle, metadata) in handle_metadata {
179            handles_map.add_handle(handle, metadata);
180        }
181        
182        path_index.write_to_file(&self.collection_path.join("index/path.json"))?;
183        handles_map.write_to_file(&self.collection_path.join("index/handles.json"))?;
184        
185        // Build inverted index from file contents for O(1) search
186        println!("Building inverted index for O(1) search...");
187        let mut file_contents = HashMap::new();
188        
189        for (file_handle, content) in &ingested_content {
190            if let Ok(content_str) = String::from_utf8(content.clone()) {
191                file_contents.insert(*file_handle as u32, content_str);
192            }
193        }
194        
195        if !file_contents.is_empty() {
196            let inverted_index = crate::inverted_index::InvertedIndex::build_from_content(
197                file_contents,
198                &self.collection_path.join("index/terms.fst"),
199                &self.collection_path.join("index/posting_lists.json")
200            )?;
201            println!("✓ Inverted index built with {} terms", inverted_index.term_count());
202        }
203        
204        println!(
205            "Ingestion complete: {} files ingested, {} skipped, {} errors",
206            stats.ingested, stats.skipped, stats.errors
207        );
208        
209        Ok(stats)
210    }
211    
212    fn should_include_file(&self, path: &Path) -> Result<bool> {
213        let path_str = path.to_string_lossy();
214        
215        // Check exclude patterns first
216        for pattern in &self.options.exclude_patterns {
217            if self.glob_match(pattern, &path_str) {
218                return Ok(false);
219            }
220        }
221        
222        // Check include patterns
223        for pattern in &self.options.include_patterns {
224            if self.glob_match(pattern, &path_str) {
225                return Ok(true);
226            }
227        }
228        
229        Ok(false)
230    }
231    
232    fn is_binary(&self, content: &[u8]) -> bool {
233        if content.is_empty() {
234            return false;
235        }
236        
237        let mut non_printable = 0;
238        for &byte in content.iter().take(1024) { // Check first 1KB
239            if byte < 32 && byte != 9 && byte != 10 && byte != 13 {
240                non_printable += 1;
241            }
242        }
243        
244        let ratio = non_printable as f32 / content.len().min(1024) as f32;
245        ratio > self.options.binary_ratio_threshold
246    }
247    
248    fn glob_match(&self, pattern: &str, text: &str) -> bool {
249        // Simple glob matching for MVP
250        if pattern == "**/*" {
251            return true;
252        }
253        
254        // Handle **/*.ext patterns (recursive file extension matching)
255        if pattern.starts_with("**/") {
256            let suffix = &pattern[3..];
257            if suffix.starts_with("*.") {
258                // Extract extension from pattern like "*.rs"
259                let ext = &suffix[1..]; // Include the dot
260                return text.ends_with(ext);
261            } else {
262                return text.ends_with(suffix);
263            }
264        }
265        
266        if pattern.starts_with("**/") && pattern.ends_with("/**") {
267            let dir_name = &pattern[3..pattern.len()-3];
268            return text.contains(&format!("/{}/", dir_name)) || 
269                   text.starts_with(&format!("{}/", dir_name));
270        }
271        
272        if pattern.ends_with("/**") {
273            let prefix = &pattern[..pattern.len()-3];
274            return text.starts_with(prefix);
275        }
276        
277        // Handle simple extension patterns like "*.rs"
278        if pattern.starts_with("*.") {
279            let ext = &pattern[1..]; // Include the dot
280            return text.ends_with(ext);
281        }
282        
283        // Handle exact filename matches (check if pattern matches just the filename)
284        if !pattern.contains('/') && !pattern.contains('*') {
285            if let Some(filename) = text.split('/').last() {
286                if filename == pattern {
287                    return true;
288                }
289            }
290        }
291        
292        // Wildcard matching
293        if pattern.contains('*') {
294            return self.wildcard_match(pattern, text);
295        }
296        
297        pattern == text
298    }
299    
300    fn wildcard_match(&self, pattern: &str, text: &str) -> bool {
301        let pattern_chars: Vec<char> = pattern.chars().collect();
302        let text_chars: Vec<char> = text.chars().collect();
303        
304        self.match_recursive(&pattern_chars, &text_chars, 0, 0)
305    }
306    
307    fn match_recursive(&self, pattern: &[char], text: &[char], p_idx: usize, t_idx: usize) -> bool {
308        if p_idx == pattern.len() {
309            return t_idx == text.len();
310        }
311        
312        if pattern[p_idx] == '*' {
313            // Try matching 0 or more characters
314            for i in t_idx..=text.len() {
315                if self.match_recursive(pattern, text, p_idx + 1, i) {
316                    return true;
317                }
318            }
319            false
320        } else if t_idx < text.len() && (pattern[p_idx] == text[t_idx] || pattern[p_idx] == '?') {
321            self.match_recursive(pattern, text, p_idx + 1, t_idx + 1)
322        } else {
323            false
324        }
325    }
326    
327    fn find_next_segment_id(&self, store_path: &Path) -> Result<u32> {
328        let mut max_id = 0;
329        
330        if store_path.exists() {
331            for entry in fs::read_dir(store_path)? {
332                let entry = entry?;
333                let name = entry.file_name();
334                let name_str = name.to_string_lossy();
335                
336                if name_str.starts_with("seg-") && name_str.ends_with(".sift") {
337                    if let Some(id_str) = name_str.strip_prefix("seg-").and_then(|s| s.strip_suffix(".sift")) {
338                        if let Ok(id) = id_str.parse::<u32>() {
339                            max_id = max_id.max(id);
340                        }
341                    }
342                }
343            }
344        }
345        
346        Ok(max_id + 1)
347    }
348}
349
/// Counters describing the outcome of one ingestion run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestStats {
    /// Number of files stored during the run.
    pub ingested: u64,
    /// Number of files left out by the pattern filters, the size limit or the
    /// binary check.
    pub skipped: u64,
    /// Number of files that could not be read.
    pub errors: u64,
}

impl IngestStats {
    /// Returns stats with every counter set to zero (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }
}