scribe_scaling/streaming.rs

//! Progressive loading and streaming for memory-efficient file processing.
//!
//! This module provides true streaming file discovery and processing, avoiding
//! the memory bottleneck of loading all file metadata at once.
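//!
//! A minimal entry point (illustrative sketch; the module path is assumed from
//! this file's location in the crate):
//!
//! ```no_run
//! use scribe_scaling::streaming::{StreamingConfig, StreamingSelector};
//!
//! let selector = StreamingSelector::new(StreamingConfig::default());
//! ```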

use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use futures::{Stream, StreamExt};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::error::{ScalingError, ScalingResult};
use scribe_core::{file, FileInfo, FileType};

/// File metadata for streaming operations
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileMetadata {
    /// File path
    pub path: PathBuf,

    /// File size in bytes
    pub size: u64,

    /// Last modified time
    pub modified: SystemTime,

    /// Detected programming language
    pub language: String,

    /// File type classification
    pub file_type: String,
}

/// Configuration for streaming operations
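///
/// A hand-tuned configuration (illustrative sketch; the module path is an
/// assumption):
///
/// ```no_run
/// use scribe_scaling::streaming::StreamingConfig;
///
/// let config = StreamingConfig {
///     concurrency_limit: 8,           // cap parallel metadata reads
///     memory_limit: 64 * 1024 * 1024, // 64MB
///     ..StreamingConfig::default()
/// };
/// ```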
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Whether to enable streaming (vs loading all at once)
    pub enable_streaming: bool,

    /// Number of files to process concurrently
    pub concurrency_limit: usize,

    /// Memory limit for streaming operations (bytes)
    pub memory_limit: usize,

    /// Maximum files to hold in selection heap
    pub selection_heap_size: usize,
}

impl Default for StreamingConfig {
    fn default() -> Self {
        Self {
            enable_streaming: true,
            concurrency_limit: num_cpus::get() * 2,
            memory_limit: 100 * 1024 * 1024, // 100MB
            selection_heap_size: 10_000,     // Maximum files held in the selection heap
        }
    }
}

/// Scored file for heap-based selection
#[derive(Debug, Clone, PartialEq)]
pub struct ScoredFile {
    pub metadata: FileMetadata,
    pub score: f64,
    pub tokens: usize,
}

impl Eq for ScoredFile {}

impl PartialOrd for ScoredFile {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ScoredFile {
    fn cmp(&self, other: &Self) -> Ordering {
        // Ascending by score: higher scores compare as "greater". The selection
        // heap wraps entries in `Reverse`, turning `BinaryHeap`'s max-heap into
        // a min-heap, so the lowest-scoring file is always the first one popped.
        self.score
            .partial_cmp(&other.score)
            .unwrap_or(Ordering::Equal)
            .then_with(|| other.tokens.cmp(&self.tokens)) // On equal scores, prefer fewer tokens
    }
}

/// Streaming file selector with heap-based optimization
pub struct StreamingSelector {
    config: StreamingConfig,
}

impl StreamingSelector {
    /// Create a new streaming selector
    pub fn new(config: StreamingConfig) -> Self {
        Self { config }
    }

    /// Create a selector with the default configuration
    pub fn with_defaults() -> Self {
        Self::new(StreamingConfig::default())
    }

    /// Stream files from a directory with intelligent selection
    ///
    /// This runs in O(N log K) time instead of O(N log N), where:
    /// - N = total files in the repository
    /// - K = target number of files to select
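    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative: the crate path and the 4-bytes-per-token
    /// heuristic are assumptions, not fixed API):
    ///
    /// ```no_run
    /// # use scribe_scaling::streaming::StreamingSelector;
    /// # async fn demo() {
    /// let selector = StreamingSelector::with_defaults();
    /// let selected = selector
    ///     .select_files_streaming(
    ///         std::path::Path::new("./repo"),
    ///         100,    // target file count (K)
    ///         50_000, // token budget
    ///         |_meta| 1.0,                     // flat score for every file
    ///         |meta| (meta.size / 4) as usize, // ~4 bytes per token
    ///     )
    ///     .await
    ///     .unwrap();
    /// println!("selected {} files", selected.len());
    /// # }
    /// ```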
    pub async fn select_files_streaming(
        &self,
        repo_path: &Path,
        target_count: usize,
        token_budget: usize,
        score_fn: impl Fn(&FileMetadata) -> f64 + Send + Sync + 'static,
        token_fn: impl Fn(&FileMetadata) -> usize + Send + Sync + 'static,
    ) -> ScalingResult<Vec<ScoredFile>> {
        info!("Starting streaming file selection for: {:?}", repo_path);
        info!(
            "Target: {} files, Budget: {} tokens",
            target_count, token_budget
        );

        if !repo_path.exists() {
            return Err(ScalingError::path(
                "Repository path does not exist",
                repo_path,
            ));
        }

        if !repo_path.is_dir() {
            return Err(ScalingError::path(
                "Repository path is not a directory",
                repo_path,
            ));
        }

        // Use a min-heap to keep only the best K files in memory
        let mut selection_heap: BinaryHeap<Reverse<ScoredFile>> = BinaryHeap::new();
        let mut total_files_seen = 0usize;
        let mut total_tokens_used = 0usize;

        // Create the file discovery stream
        let file_stream = self.create_file_stream(repo_path).await?;

        // Process files in parallel batches
        let mut file_stream = Box::pin(file_stream);

        while let Some(file_batch) = file_stream.next().await {
            total_files_seen += file_batch.len();

            // Score files in parallel
            let scored_batch: Vec<ScoredFile> = file_batch
                .into_par_iter()
                .filter_map(|metadata| {
                    let score = score_fn(&metadata);
                    let tokens = token_fn(&metadata);

                    // Skip files that could never fit in the budget
                    if tokens > token_budget {
                        return None;
                    }

                    Some(ScoredFile {
                        metadata,
                        score,
                        tokens,
                    })
                })
                .collect();

            // Update the selection heap with O(log K) insertions
            for scored_file in scored_batch {
                if selection_heap.len() < target_count {
                    // Heap not yet full: add directly, but never exceed the budget
                    if total_tokens_used + scored_file.tokens <= token_budget {
                        total_tokens_used += scored_file.tokens;
                        selection_heap.push(Reverse(scored_file));
                    }
                } else if let Some(worst) = selection_heap.peek() {
                    // Check whether this file beats the worst file in the heap
                    if scored_file.score > worst.0.score {
                        // Remove the worst file
                        if let Some(Reverse(removed)) = selection_heap.pop() {
                            total_tokens_used = total_tokens_used.saturating_sub(removed.tokens);
                        }

                        // Add the new file if it fits in the budget
                        if total_tokens_used + scored_file.tokens <= token_budget {
                            total_tokens_used += scored_file.tokens;
                            selection_heap.push(Reverse(scored_file));
                        } else {
                            // Try to make room by evicting low-value files
                            self.optimize_heap_for_budget(
                                &mut selection_heap,
                                &mut total_tokens_used,
                                token_budget,
                            );
                            if total_tokens_used + scored_file.tokens <= token_budget {
                                total_tokens_used += scored_file.tokens;
                                selection_heap.push(Reverse(scored_file));
                            }
                        }
                    }
                }
            }

            // Log progress every 10k files
            if total_files_seen % 10_000 == 0 {
                debug!(
                    "Processed {} files, selected {} candidates",
                    total_files_seen,
                    selection_heap.len()
                );
            }
        }

        info!(
            "Streaming selection complete: {} files processed, {} selected",
            total_files_seen,
            selection_heap.len()
        );
        info!(
            "Token utilization: {}/{} ({:.1}%)",
            total_tokens_used,
            token_budget,
            (total_tokens_used as f64 / token_budget as f64) * 100.0
        );

        // Drain the heap and sort by score descending for the final output
        let mut selected: Vec<ScoredFile> =
            selection_heap.into_iter().map(|Reverse(sf)| sf).collect();
        selected.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal));

        Ok(selected)
    }

    /// Create an async stream of file metadata batches from a directory
    async fn create_file_stream(
        &self,
        repo_path: &Path,
    ) -> ScalingResult<impl Stream<Item = Vec<FileMetadata>> + use<'_>> {
        let walkdir_iter = walkdir::WalkDir::new(repo_path)
            .follow_links(false)
            .max_depth(20) // Reasonable depth limit
            .into_iter();

        // Convert the walkdir iterator into an async stream
        let concurrency_limit = self.config.concurrency_limit;
        let file_stream = futures::stream::iter(walkdir_iter)
            .filter_map(move |entry| async move {
                match entry {
                    Ok(entry) if entry.file_type().is_file() => {
                        Some(Self::create_file_metadata_static(entry).await)
                    }
                    Ok(_) => None, // Skip directories
                    Err(e) => {
                        warn!("Skipping entry due to error: {}", e);
                        None
                    }
                }
            })
            .filter_map(|result| async move {
                match result {
                    Ok(metadata) => Some(metadata),
                    Err(e) => {
                        warn!("Failed to create file metadata: {}", e);
                        None
                    }
                }
            })
            .chunks(concurrency_limit); // Batch for parallel scoring

        Ok(file_stream)
    }

    /// Create file metadata from a walkdir entry (static version)
    async fn create_file_metadata_static(entry: walkdir::DirEntry) -> ScalingResult<FileMetadata> {
        let path = entry.path().to_path_buf();

        let (size, modified) = match entry.metadata() {
            Ok(metadata) => {
                let size = metadata.len();
                let modified = metadata.modified().unwrap_or_else(|_| SystemTime::now());
                (size, modified)
            }
            Err(_) => (0, SystemTime::now()),
        };

        let language = detect_language(&path);
        let file_type = classify_file_type(&path);

        Ok(FileMetadata {
            path,
            size,
            modified,
            language,
            file_type,
        })
    }

    /// Shrink the heap to fit within the token budget by evicting the lowest-scored files
    fn optimize_heap_for_budget(
        &self,
        heap: &mut BinaryHeap<Reverse<ScoredFile>>,
        current_tokens: &mut usize,
        budget: usize,
    ) {
        while *current_tokens > budget && !heap.is_empty() {
            if let Some(Reverse(removed)) = heap.pop() {
                *current_tokens = current_tokens.saturating_sub(removed.tokens);
            }
        }
    }
}

/// Fast language detection based on file extension
fn detect_language(path: &Path) -> String {
    let extension = path
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_lowercase());

    if matches!(extension.as_deref(), Some("h" | "hpp" | "hxx")) {
        return "Header".to_string();
    }

    if path
        .file_name()
        .and_then(|s| s.to_str())
        .map(|s| s.eq_ignore_ascii_case("dockerfile"))
        .unwrap_or(false)
    {
        return "Dockerfile".to_string();
    }

    let language = file::detect_language_from_path(path);
    file::language_display_name(&language).to_string()
}

/// Fast file type classification
fn classify_file_type(path: &Path) -> String {
    let extension = path
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_lowercase())
        .unwrap_or_default();

    let language = file::detect_language_from_path(path);
    let file_type =
        FileInfo::classify_file_type(path.to_string_lossy().as_ref(), &language, &extension);

    match file_type {
        FileType::Test { .. } => "Test".to_string(),
        FileType::Documentation { .. } => "Documentation".to_string(),
        FileType::Configuration { .. } => "Configuration".to_string(),
        FileType::Binary => "Binary".to_string(),
        FileType::Generated => "Generated".to_string(),
        FileType::Source { .. } => match extension.as_str() {
            "jsx" | "tsx" | "vue" | "svelte" => "Frontend".to_string(),
            "html" | "htm" | "css" | "scss" | "sass" | "less" => "Web".to_string(),
            "sh" | "bash" | "bat" | "ps1" => "Script".to_string(),
            _ => "Source".to_string(),
        },
        FileType::Unknown => match extension.as_str() {
            "png" | "jpg" | "jpeg" | "gif" | "svg" | "ico" => "Image".to_string(),
            "pdf" | "doc" | "docx" | "ppt" | "pptx" => "Document".to_string(),
            "sql" => "Database".to_string(),
            "xml" | "xsd" | "xsl" => "Markup".to_string(),
            "json" | "yaml" | "yml" | "toml" | "ini" | "cfg" | "conf" => {
                "Configuration".to_string()
            }
            _ => "Other".to_string(),
        },
    }
}

/// Legacy file chunk for backwards compatibility
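///
/// A quick chunking sketch (illustrative; the module path is an assumption):
///
/// ```no_run
/// # use scribe_scaling::streaming::{FileChunk, FileMetadata};
/// fn chunks_of(files: Vec<FileMetadata>) -> Vec<FileChunk> {
///     let total = files.len().div_ceil(10);
///     files
///         .chunks(10)
///         .enumerate()
///         .map(|(i, c)| FileChunk::new(c.to_vec(), i, total))
///         .collect()
/// }
/// ```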
#[derive(Debug, Clone)]
pub struct FileChunk {
    /// Files in this chunk
    pub files: Vec<FileMetadata>,

    /// Chunk index
    pub index: usize,

    /// Total number of chunks
    pub total_chunks: usize,
}

impl FileChunk {
    /// Create a new file chunk
    pub fn new(files: Vec<FileMetadata>, index: usize, total_chunks: usize) -> Self {
        Self {
            files,
            index,
            total_chunks,
        }
    }

    /// Get the number of files in this chunk
    pub fn len(&self) -> usize {
        self.files.len()
    }

    /// Check if the chunk is empty
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }

    /// Get the total size of all files in this chunk
    pub fn total_size(&self) -> u64 {
        self.files.iter().map(|f| f.size).sum()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_streaming_selector_creation() {
        let selector = StreamingSelector::with_defaults();
        assert!(selector.config.enable_streaming);
        assert!(selector.config.concurrency_limit > 0);
    }

    #[tokio::test]
    async fn test_streaming_file_selection() {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path();

        // Create test files
        fs::create_dir_all(repo_path.join("src")).unwrap();
        for i in 0..100 {
            let content = format!("// File {}\nfn main() {{ println!(\"Hello {}\"); }}", i, i);
            fs::write(
                repo_path.join("src").join(format!("file_{}.rs", i)),
                content,
            )
            .unwrap();
        }

        let selector = StreamingSelector::with_defaults();

        // Simple scoring function
        let score_fn = |file: &FileMetadata| {
            if file.path.to_string_lossy().contains("file_1") {
                2.0 // Boost files with "file_1" in the name
            } else {
                1.0
            }
        };

        // Simple token estimation (~4 bytes per token)
        let token_fn = |file: &FileMetadata| (file.size / 4) as usize;

        let selected = selector
            .select_files_streaming(repo_path, 10, 10000, score_fn, token_fn)
            .await
            .unwrap();

        // Should select some files
        assert!(!selected.is_empty());
        assert!(selected.len() <= 10);

        // Files should be sorted by score (highest first)
        for i in 1..selected.len() {
            assert!(selected[i - 1].score >= selected[i].score);
        }
    }
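
    #[tokio::test]
    async fn test_streaming_selection_respects_budget() {
        // Illustrative addition: the combined token cost of the selection
        // should never exceed the budget passed to `select_files_streaming`.
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path();

        fs::create_dir_all(repo_path.join("src")).unwrap();
        for i in 0..20 {
            // 400 bytes => ~100 tokens under the 4-bytes-per-token heuristic
            fs::write(
                repo_path.join("src").join(format!("f_{}.rs", i)),
                "x".repeat(400),
            )
            .unwrap();
        }

        let selector = StreamingSelector::with_defaults();
        let selected = selector
            .select_files_streaming(repo_path, 20, 500, |_| 1.0, |f| (f.size / 4) as usize)
            .await
            .unwrap();

        let total_tokens: usize = selected.iter().map(|f| f.tokens).sum();
        assert!(total_tokens <= 500);
    }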

    #[test]
    fn test_scored_file_ordering() {
        let file1 = FileMetadata {
            path: PathBuf::from("test1.rs"),
            size: 100,
            modified: SystemTime::now(),
            language: "Rust".to_string(),
            file_type: "Source".to_string(),
        };

        let file2 = file1.clone();

        let scored1 = ScoredFile {
            metadata: file1,
            score: 2.0,
            tokens: 100,
        };
        let scored2 = ScoredFile {
            metadata: file2,
            score: 1.0,
            tokens: 50,
        };

        // Higher score should compare as "greater" in our ordering
        assert!(scored1 > scored2);

        // Test in a heap
        let mut heap = BinaryHeap::new();
        heap.push(Reverse(scored1.clone()));
        heap.push(Reverse(scored2.clone()));

        // A min-heap via `Reverse` yields the lowest score first (for removal)
        assert_eq!(heap.pop().unwrap().0.score, 1.0);
        assert_eq!(heap.pop().unwrap().0.score, 2.0);
    }
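
    #[test]
    fn test_optimize_heap_for_budget() {
        // Illustrative addition: eviction keeps removing the lowest-scored
        // files until the running token total fits the budget.
        let make = |score: f64, tokens: usize| ScoredFile {
            metadata: FileMetadata {
                path: PathBuf::from("f.rs"),
                size: 0,
                modified: SystemTime::now(),
                language: "Rust".to_string(),
                file_type: "Source".to_string(),
            },
            score,
            tokens,
        };

        let mut heap: BinaryHeap<Reverse<ScoredFile>> = BinaryHeap::new();
        heap.push(Reverse(make(3.0, 100)));
        heap.push(Reverse(make(1.0, 100)));
        heap.push(Reverse(make(2.0, 100)));
        let mut total = 300usize;

        let selector = StreamingSelector::with_defaults();
        selector.optimize_heap_for_budget(&mut heap, &mut total, 150);

        // The two lowest-scored entries are evicted; the best one survives.
        assert_eq!(total, 100);
        assert_eq!(heap.len(), 1);
        assert_eq!(heap.peek().unwrap().0.score, 3.0);
    }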

    #[test]
    fn test_language_detection() {
        assert_eq!(detect_language(&PathBuf::from("test.rs")), "Rust");
        assert_eq!(detect_language(&PathBuf::from("test.py")), "Python");
        assert_eq!(detect_language(&PathBuf::from("test.js")), "JavaScript");
        assert_eq!(detect_language(&PathBuf::from("test.unknown")), "Unknown");
    }

    #[test]
    fn test_file_type_classification() {
        assert_eq!(classify_file_type(&PathBuf::from("main.rs")), "Source");
        assert_eq!(
            classify_file_type(&PathBuf::from("README.md")),
            "Documentation"
        );
        assert_eq!(
            classify_file_type(&PathBuf::from("config.json")),
            "Configuration"
        );
        assert_eq!(classify_file_type(&PathBuf::from("style.css")), "Web");
        assert_eq!(classify_file_type(&PathBuf::from("image.png")), "Image");
    }
}
539}