use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use futures::{Stream, StreamExt};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use crate::error::{ScalingError, ScalingResult};
use scribe_core::{file, FileInfo, FileType};
/// Lightweight metadata for a file discovered during streaming traversal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileMetadata {
    /// Path to the file on disk.
    pub path: PathBuf,

    /// File size in bytes.
    pub size: u64,

    /// Last modification time.
    pub modified: SystemTime,

    /// Detected language display name (e.g. "Rust").
    pub language: String,

    /// Coarse file-type label (e.g. "Source", "Test").
    pub file_type: String,
}

/// Configuration for streaming file selection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Whether streaming selection is enabled.
    pub enable_streaming: bool,

    /// Maximum number of files processed concurrently.
    pub concurrency_limit: usize,

    /// Soft memory limit in bytes.
    pub memory_limit: usize,

    /// Maximum number of candidates retained in the selection heap.
    pub selection_heap_size: usize,
}

impl Default for StreamingConfig {
    fn default() -> Self {
        Self {
            enable_streaming: true,
            concurrency_limit: num_cpus::get() * 2,
            memory_limit: 100 * 1024 * 1024, // 100 MiB
            selection_heap_size: 10_000,
        }
    }
}
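
// A config tuned for a constrained environment might look like the sketch
// below (the numbers are illustrative, not recommendations):
//
//     let selector = StreamingSelector::new(StreamingConfig {
//         concurrency_limit: 4,
//         memory_limit: 32 * 1024 * 1024, // 32 MiB
//         ..StreamingConfig::default()
//     });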

/// A file paired with its relevance score and estimated token cost.
#[derive(Debug, Clone, PartialEq)]
pub struct ScoredFile {
    pub metadata: FileMetadata,
    pub score: f64,
    pub tokens: usize,
}

impl Eq for ScoredFile {}

impl PartialOrd for ScoredFile {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ScoredFile {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order by score ascending; on ties, the file with fewer tokens
        // compares as greater (cheaper files are preferred).
        self.score
            .partial_cmp(&other.score)
            .unwrap_or(Ordering::Equal)
            .then_with(|| other.tokens.cmp(&self.tokens))
    }
}
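
// With this ordering, `BinaryHeap<Reverse<ScoredFile>>` behaves as a
// min-heap by score: `peek()` exposes the current worst candidate, so a
// top-K selection costs O(log K) per insertion. Illustrative eviction step
// (mirrors the logic in `select_files_streaming` below):
//
//     if candidate.score > heap.peek().unwrap().0.score {
//         heap.pop(); // drop the lowest-scored file
//         heap.push(Reverse(candidate));
//     }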

/// Selects the highest-value files from a repository using a bounded
/// min-heap, so memory use scales with the target count rather than the
/// repository size.
pub struct StreamingSelector {
    config: StreamingConfig,
}

impl StreamingSelector {
    /// Creates a selector with the given configuration.
    pub fn new(config: StreamingConfig) -> Self {
        Self { config }
    }

    /// Creates a selector with the default configuration.
    pub fn with_defaults() -> Self {
        Self::new(StreamingConfig::default())
    }

    /// Streams files under `repo_path` and selects up to `target_count`
    /// files within `token_budget`, keeping only the current best
    /// candidates in memory at any point.
    pub async fn select_files_streaming(
        &self,
        repo_path: &Path,
        target_count: usize,
        token_budget: usize,
        score_fn: impl Fn(&FileMetadata) -> f64 + Send + Sync + 'static,
        token_fn: impl Fn(&FileMetadata) -> usize + Send + Sync + 'static,
    ) -> ScalingResult<Vec<ScoredFile>> {
        info!("Starting streaming file selection for: {:?}", repo_path);
        info!(
            "Target: {} files, Budget: {} tokens",
            target_count, token_budget
        );

        if !repo_path.exists() {
            return Err(ScalingError::path(
                "Repository path does not exist",
                repo_path,
            ));
        }

        if !repo_path.is_dir() {
            return Err(ScalingError::path(
                "Repository path is not a directory",
                repo_path,
            ));
        }

        // Min-heap of the best candidates seen so far (see `Ord` for
        // `ScoredFile` above).
        let mut selection_heap: BinaryHeap<Reverse<ScoredFile>> = BinaryHeap::new();
        let mut total_files_seen = 0usize;
        let mut total_tokens_used = 0usize;

        let file_stream = self.create_file_stream(repo_path).await?;
        let mut file_stream = Box::pin(file_stream);

        while let Some(file_batch) = file_stream.next().await {
            total_files_seen += file_batch.len();

            // Score the batch in parallel; a file that alone exceeds the
            // budget can never be selected, so drop it early.
            let scored_batch: Vec<ScoredFile> = file_batch
                .into_par_iter()
                .filter_map(|metadata| {
                    let score = score_fn(&metadata);
                    let tokens = token_fn(&metadata);

                    if tokens > token_budget {
                        return None;
                    }

                    Some(ScoredFile {
                        metadata,
                        score,
                        tokens,
                    })
                })
                .collect();

            for scored_file in scored_batch {
                if selection_heap.len() < target_count {
                    total_tokens_used += scored_file.tokens;
                    selection_heap.push(Reverse(scored_file));
                } else if let Some(worst) = selection_heap.peek() {
                    if scored_file.score > worst.0.score {
                        // Evict the current worst candidate, then admit the
                        // new file only if it still fits the budget.
                        if let Some(Reverse(removed)) = selection_heap.pop() {
                            total_tokens_used = total_tokens_used.saturating_sub(removed.tokens);
                        }

                        if total_tokens_used + scored_file.tokens <= token_budget {
                            total_tokens_used += scored_file.tokens;
                            selection_heap.push(Reverse(scored_file));
                        } else {
                            self.optimize_heap_for_budget(
                                &mut selection_heap,
                                &mut total_tokens_used,
                                token_budget,
                            );
                            if total_tokens_used + scored_file.tokens <= token_budget {
                                total_tokens_used += scored_file.tokens;
                                selection_heap.push(Reverse(scored_file));
                            }
                        }
                    }
                }
            }

            if total_files_seen % 10000 == 0 {
                debug!(
                    "Processed {} files, selected {} candidates",
                    total_files_seen,
                    selection_heap.len()
                );
            }
        }

        info!(
            "Streaming selection complete: {} files processed, {} selected",
            total_files_seen,
            selection_heap.len()
        );
        info!(
            "Token utilization: {}/{} ({:.1}%)",
            total_tokens_used,
            token_budget,
            (total_tokens_used as f64 / token_budget as f64) * 100.0
        );

        // Return the survivors sorted by score, best first.
        let mut selected: Vec<ScoredFile> =
            selection_heap.into_iter().map(|Reverse(sf)| sf).collect();
        selected.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal));

        Ok(selected)
    }
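
    // Usage sketch (the scoring and token closures here are hypothetical
    // examples, not part of this crate):
    //
    //     let selector = StreamingSelector::with_defaults();
    //     let selected = selector
    //         .select_files_streaming(
    //             Path::new("./repo"),
    //             100,     // target file count
    //             200_000, // token budget
    //             |meta| if meta.file_type == "Source" { 2.0 } else { 1.0 },
    //             |meta| (meta.size / 4) as usize, // ~4 bytes per token
    //         )
    //         .await?;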

    /// Walks the repository and yields batches of file metadata sized to
    /// the configured concurrency limit.
    async fn create_file_stream(
        &self,
        repo_path: &Path,
    ) -> ScalingResult<impl Stream<Item = Vec<FileMetadata>> + use<'_>> {
        let walkdir_iter = walkdir::WalkDir::new(repo_path)
            .follow_links(false)
            .max_depth(20) // guard against pathological nesting
            .into_iter();

        let concurrency_limit = self.config.concurrency_limit;
        let file_stream = futures::stream::iter(walkdir_iter)
            .filter_map(move |entry| async move {
                match entry {
                    Ok(entry) if entry.file_type().is_file() => {
                        Some(Self::create_file_metadata_static(entry).await)
                    }
                    Ok(_) => None, // skip directories and symlinks
                    Err(e) => {
                        warn!("Skipping file due to error: {}", e);
                        None
                    }
                }
            })
            .filter_map(|result| async move {
                match result {
                    Ok(metadata) => Some(metadata),
                    Err(e) => {
                        warn!("Failed to create file metadata: {}", e);
                        None
                    }
                }
            })
            .chunks(concurrency_limit);

        Ok(file_stream)
    }

    /// Builds `FileMetadata` for a directory entry, falling back to safe
    /// defaults when filesystem metadata is unavailable.
    async fn create_file_metadata_static(entry: walkdir::DirEntry) -> ScalingResult<FileMetadata> {
        let path = entry.path().to_path_buf();

        let (size, modified) = match entry.metadata() {
            Ok(metadata) => {
                let size = metadata.len();
                let modified = metadata.modified().unwrap_or_else(|_| SystemTime::now());
                (size, modified)
            }
            Err(_) => (0, SystemTime::now()),
        };

        let language = detect_language(&path);
        let file_type = classify_file_type(&path);

        Ok(FileMetadata {
            path,
            size,
            modified,
            language,
            file_type,
        })
    }

    /// Evicts the lowest-scored files until the running token total fits
    /// within `budget`.
    fn optimize_heap_for_budget(
        &self,
        heap: &mut BinaryHeap<Reverse<ScoredFile>>,
        current_tokens: &mut usize,
        budget: usize,
    ) {
        while *current_tokens > budget && !heap.is_empty() {
            if let Some(Reverse(removed)) = heap.pop() {
                *current_tokens = current_tokens.saturating_sub(removed.tokens);
            }
        }
    }
}

/// Detects a display-name language for `path`, special-casing C/C++ headers
/// and Dockerfiles before delegating to `scribe_core`.
fn detect_language(path: &Path) -> String {
    let extension = path
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_lowercase());

    if matches!(extension.as_deref(), Some("h" | "hpp" | "hxx")) {
        return "Header".to_string();
    }

    if path
        .file_name()
        .and_then(|s| s.to_str())
        .map(|s| s.eq_ignore_ascii_case("dockerfile"))
        .unwrap_or(false)
    {
        return "Dockerfile".to_string();
    }

    let language = file::detect_language_from_path(path);
    file::language_display_name(&language).to_string()
}

/// Maps a path to a coarse file-type label, refining the `scribe_core`
/// classification with extension-based heuristics.
fn classify_file_type(path: &Path) -> String {
    let extension = path
        .extension()
        .and_then(|s| s.to_str())
        .map(|s| s.to_lowercase())
        .unwrap_or_default();

    let language = file::detect_language_from_path(path);
    let file_type =
        FileInfo::classify_file_type(path.to_string_lossy().as_ref(), &language, &extension);

    match file_type {
        FileType::Test { .. } => "Test".to_string(),
        FileType::Documentation { .. } => "Documentation".to_string(),
        FileType::Configuration { .. } => "Configuration".to_string(),
        FileType::Binary => "Binary".to_string(),
        FileType::Generated => "Generated".to_string(),
        FileType::Source { .. } => match extension.as_str() {
            "jsx" | "tsx" | "vue" | "svelte" => "Frontend".to_string(),
            "html" | "htm" | "css" | "scss" | "sass" | "less" => "Web".to_string(),
            "sh" | "bash" | "bat" | "ps1" => "Script".to_string(),
            _ => "Source".to_string(),
        },
        FileType::Unknown => match extension.as_str() {
            "png" | "jpg" | "jpeg" | "gif" | "svg" | "ico" => "Image".to_string(),
            "pdf" | "doc" | "docx" | "ppt" | "pptx" => "Document".to_string(),
            "sql" => "Database".to_string(),
            "xml" | "xsd" | "xsl" => "Markup".to_string(),
            "json" | "yaml" | "yml" | "toml" | "ini" | "cfg" | "conf" => {
                "Configuration".to_string()
            }
            _ => "Other".to_string(),
        },
    }
}

/// A batch of files processed together during chunked selection.
#[derive(Debug, Clone)]
pub struct FileChunk {
    /// Files in this chunk.
    pub files: Vec<FileMetadata>,

    /// Zero-based index of this chunk.
    pub index: usize,

    /// Total number of chunks in the run.
    pub total_chunks: usize,
}

impl FileChunk {
    /// Creates a new chunk.
    pub fn new(files: Vec<FileMetadata>, index: usize, total_chunks: usize) -> Self {
        Self {
            files,
            index,
            total_chunks,
        }
    }

    /// Number of files in this chunk.
    pub fn len(&self) -> usize {
        self.files.len()
    }

    /// Returns `true` if the chunk contains no files.
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }

    /// Combined size in bytes of all files in this chunk.
    pub fn total_size(&self) -> u64 {
        self.files.iter().map(|f| f.size).sum()
    }
}
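
// Sketch of how a caller might split a file list into `FileChunk`s; the
// helper below is hypothetical and not part of this module:
//
//     fn chunk_files(files: Vec<FileMetadata>, chunk_size: usize) -> Vec<FileChunk> {
//         let total_chunks = files.len().div_ceil(chunk_size);
//         files
//             .chunks(chunk_size)
//             .enumerate()
//             .map(|(i, chunk)| FileChunk::new(chunk.to_vec(), i, total_chunks))
//             .collect()
//     }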

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_streaming_selector_creation() {
        let selector = StreamingSelector::with_defaults();
        assert!(selector.config.enable_streaming);
        assert!(selector.config.concurrency_limit > 0);
    }

    #[tokio::test]
    async fn test_streaming_file_selection() {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path();

        fs::create_dir_all(repo_path.join("src")).unwrap();
        for i in 0..100 {
            let content = format!("// File {}\nfn main() {{ println!(\"Hello {}\"); }}", i, i);
            fs::write(
                repo_path.join("src").join(format!("file_{}.rs", i)),
                content,
            )
            .unwrap();
        }

        let selector = StreamingSelector::with_defaults();

        let score_fn = |file: &FileMetadata| {
            if file.path.to_string_lossy().contains("file_1") {
                2.0
            } else {
                1.0
            }
        };

        let token_fn = |file: &FileMetadata| (file.size / 4) as usize;

        let selected = selector
            .select_files_streaming(repo_path, 10, 10000, score_fn, token_fn)
            .await
            .unwrap();

        assert!(!selected.is_empty());
        assert!(selected.len() <= 10);

        // Results come back sorted by score, best first.
        for i in 1..selected.len() {
            assert!(selected[i - 1].score >= selected[i].score);
        }
    }

    #[test]
    fn test_scored_file_ordering() {
        let file1 = FileMetadata {
            path: PathBuf::from("test1.rs"),
            size: 100,
            modified: SystemTime::now(),
            language: "Rust".to_string(),
            file_type: "Source".to_string(),
        };

        let file2 = file1.clone();

        let scored1 = ScoredFile {
            metadata: file1,
            score: 2.0,
            tokens: 100,
        };
        let scored2 = ScoredFile {
            metadata: file2,
            score: 1.0,
            tokens: 50,
        };

        assert!(scored1 > scored2);

        // Wrapped in `Reverse`, the heap pops the lowest score first.
        let mut heap = BinaryHeap::new();
        heap.push(Reverse(scored1.clone()));
        heap.push(Reverse(scored2.clone()));

        assert_eq!(heap.pop().unwrap().0.score, 1.0);
        assert_eq!(heap.pop().unwrap().0.score, 2.0);
    }
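
    // An additional check (not in the original suite) that the budget
    // trimming helper evicts the lowest-scored files first and keeps the
    // running token total consistent.
    #[test]
    fn test_optimize_heap_for_budget() {
        let selector = StreamingSelector::with_defaults();
        let mut heap = BinaryHeap::new();
        let mut total_tokens = 0usize;
        for (score, tokens) in [(1.0, 60usize), (2.0, 50), (3.0, 40)] {
            total_tokens += tokens;
            heap.push(Reverse(ScoredFile {
                metadata: FileMetadata {
                    path: PathBuf::from("f.rs"),
                    size: 0,
                    modified: SystemTime::now(),
                    language: "Rust".to_string(),
                    file_type: "Source".to_string(),
                },
                score,
                tokens,
            }));
        }

        // 150 tokens against a budget of 100: the score-1.0 file goes first.
        selector.optimize_heap_for_budget(&mut heap, &mut total_tokens, 100);
        assert_eq!(heap.len(), 2);
        assert_eq!(total_tokens, 90);
        assert!(heap.iter().all(|Reverse(f)| f.score > 1.0));
    }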

    #[test]
    fn test_language_detection() {
        assert_eq!(detect_language(&PathBuf::from("test.rs")), "Rust");
        assert_eq!(detect_language(&PathBuf::from("test.py")), "Python");
        assert_eq!(detect_language(&PathBuf::from("test.js")), "JavaScript");
        assert_eq!(detect_language(&PathBuf::from("test.unknown")), "Unknown");
    }

    #[test]
    fn test_file_type_classification() {
        assert_eq!(classify_file_type(&PathBuf::from("main.rs")), "Source");
        assert_eq!(
            classify_file_type(&PathBuf::from("README.md")),
            "Documentation"
        );
        assert_eq!(
            classify_file_type(&PathBuf::from("config.json")),
            "Configuration"
        );
        assert_eq!(classify_file_type(&PathBuf::from("style.css")), "Web");
        assert_eq!(classify_file_type(&PathBuf::from("image.png")), "Image");
    }
}