// scribe_scaling/engine.rs

1//! Main scaling engine that integrates all optimization components.
2//!
3//! This module provides the primary interface for the scaling system,
4//! coordinating streaming, caching, parallel processing, adaptive configuration,
5//! and signature extraction for optimal performance at scale.
6
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12use serde::{Deserialize, Serialize};
13use tracing::{debug, error, info, warn};
14
15use crate::adaptive::AdaptiveConfig;
16use crate::caching::{compute_config_hash, compute_repository_hash, CacheConfig, ProcessingCache};
17use crate::error::{ScalingError, ScalingResult};
18use crate::memory::MemoryConfig;
19use crate::metrics::{BenchmarkResult, ScalingMetrics};
20use crate::parallel::ParallelConfig;
21use crate::signatures::SignatureConfig;
22use crate::streaming::{FileMetadata, StreamingConfig};
23use scribe_core::{file, FileInfo, FileType};
24
/// Complete scaling configuration combining all subsystems.
///
/// The first six fields each configure one optimization subsystem; the
/// remaining fields tune engine-level selection and positioning behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingConfig {
    // Streaming file-discovery settings (concurrency, memory limits).
    pub streaming: StreamingConfig,
    // Processing-result cache settings.
    pub caching: CacheConfig,
    // Parallel task execution settings.
    pub parallel: ParallelConfig,
    // Adaptive configuration settings.
    pub adaptive: AdaptiveConfig,
    // Signature extraction settings.
    pub signatures: SignatureConfig,
    // Memory management settings.
    pub memory: MemoryConfig,

    /// Token budget for intelligent selection (`None` = unlimited)
    pub token_budget: Option<usize>,

    /// Enable intelligent file selection before processing
    pub enable_intelligent_selection: bool,

    /// Selection algorithm to use when intelligent selection is enabled
    pub selection_algorithm: Option<String>,

    /// Enable context positioning optimization (HEAD/MIDDLE/TAIL)
    pub enable_context_positioning: bool,

    /// Query for context positioning (affects HEAD section)
    pub positioning_query: Option<String>,
}
50
51impl Default for ScalingConfig {
52    fn default() -> Self {
53        Self {
54            streaming: StreamingConfig::default(),
55            caching: CacheConfig::default(),
56            parallel: ParallelConfig::default(),
57            adaptive: AdaptiveConfig::default(),
58            signatures: SignatureConfig::default(),
59            memory: MemoryConfig::default(),
60            token_budget: None,                  // Unlimited by default
61            enable_intelligent_selection: false, // Off by default for backward compatibility
62            selection_algorithm: None,           // Will use V5Integrated when enabled
63            enable_context_positioning: false,   // Off by default for backward compatibility
64            positioning_query: None,             // No query by default
65        }
66    }
67}
68
69impl ScalingConfig {
70    /// Create configuration optimized for small repositories
71    pub fn small_repository() -> Self {
72        Self {
73            streaming: StreamingConfig {
74                enable_streaming: false,
75                concurrency_limit: 2,
76                memory_limit: 50 * 1024 * 1024, // 50MB
77                selection_heap_size: 1000,
78            },
79            parallel: ParallelConfig {
80                max_concurrent_tasks: 2,
81                async_worker_count: 1,
82                cpu_worker_count: 1,
83                task_timeout: Duration::from_secs(10),
84                enable_work_stealing: false,
85            },
86            token_budget: Some(8000), // Reasonable default for small repos
87            enable_intelligent_selection: true,
88            selection_algorithm: Some("v2_quotas".to_string()),
89            ..Default::default()
90        }
91    }
92
93    /// Create configuration optimized for large repositories
94    pub fn large_repository() -> Self {
95        Self {
96            streaming: StreamingConfig {
97                enable_streaming: true,
98                concurrency_limit: 8,
99                memory_limit: 500 * 1024 * 1024, // 500MB
100                selection_heap_size: 10000,
101            },
102            parallel: ParallelConfig {
103                max_concurrent_tasks: 16,
104                async_worker_count: 8,
105                cpu_worker_count: 8,
106                task_timeout: Duration::from_secs(60),
107                enable_work_stealing: true,
108            },
109            token_budget: Some(15000), // Larger budget for complex repositories
110            enable_intelligent_selection: true,
111            selection_algorithm: Some("v5_integrated".to_string()),
112            ..Default::default()
113        }
114    }
115
116    /// Create configuration with specific token budget
117    pub fn with_token_budget(token_budget: usize) -> Self {
118        Self {
119            token_budget: Some(token_budget),
120            enable_intelligent_selection: true,
121            selection_algorithm: Some(
122                match token_budget {
123                    0..=2000 => "v2_quotas",
124                    2001..=15000 => "v5_integrated",
125                    _ => "v5_integrated",
126                }
127                .to_string(),
128            ),
129            ..Self::default()
130        }
131    }
132}
133
/// Repository processing results returned by [`ScalingEngine::process_repository`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingResult {
    /// List of processed files with metadata
    pub files: Vec<FileMetadata>,

    /// Total number of files processed (equals `files.len()` at construction)
    pub total_files: usize,

    /// Processing time (zeroed when the result was served from cache)
    pub processing_time: Duration,

    /// Peak memory usage in bytes (estimated, ~1KB per file)
    pub memory_peak: usize,

    /// Cache hit count
    pub cache_hits: u64,

    /// Cache miss count
    pub cache_misses: u64,

    /// Additional metrics (mirrors the cache counters above)
    pub metrics: ScalingMetrics,
}
158
/// Main scaling engine coordinating caching, streaming, and selection.
pub struct ScalingEngine {
    // Full configuration for all subsystems.
    config: ScalingConfig,
    // Set when `process_repository` starts; None before the first run.
    started_at: Option<Instant>,
    // Present only when the memory cache is enabled in the configuration.
    cache: Option<ProcessingCache>,
}
165
166impl ScalingEngine {
167    /// Create a new scaling engine with the given configuration
168    pub async fn new(config: ScalingConfig) -> ScalingResult<Self> {
169        info!(
170            "Initializing scaling engine with configuration: {:?}",
171            config
172        );
173
174        let cache = if config.caching.memory_cache_size > 0 {
175            Some(ProcessingCache::new(config.caching.clone()))
176        } else {
177            None
178        };
179
180        Ok(Self {
181            config,
182            started_at: None,
183            cache,
184        })
185    }
186
187    /// Create a scaling engine with default configuration
188    pub async fn with_defaults() -> ScalingResult<Self> {
189        Self::new(ScalingConfig::default()).await
190    }
191
192    /// Create a scaling engine with the given configuration
193    pub fn with_config(config: ScalingConfig) -> Self {
194        let cache = if config.caching.memory_cache_size > 0 {
195            Some(ProcessingCache::new(config.caching.clone()))
196        } else {
197            None
198        };
199
200        Self {
201            config,
202            started_at: None,
203            cache,
204        }
205    }
206
207    /// Process a repository with scaling optimizations
208    pub async fn process_repository(&mut self, path: &Path) -> ScalingResult<ProcessingResult> {
209        let start_time = Instant::now();
210        self.started_at = Some(start_time);
211
212        info!("Processing repository: {:?}", path);
213
214        if !path.exists() {
215            return Err(ScalingError::path("Repository path does not exist", path));
216        }
217
218        if !path.is_dir() {
219            return Err(ScalingError::path(
220                "Repository path is not a directory",
221                path,
222            ));
223        }
224
225        let config_hash = compute_config_hash(&self.config);
226        let mut repo_hash_for_cache = None;
227
228        if let Some(cache) = self.cache.as_mut() {
229            match compute_repository_hash(path) {
230                Ok(repo_hash) => {
231                    if let Some(mut cached) = cache.get(repo_hash, &config_hash) {
232                        if std::env::var("SCRIBE_DEBUG").is_ok() {
233                            eprintln!("✅ Using cached scaling result for {}", path.display());
234                        }
235
236                        cached.processing_time = Duration::from_millis(0);
237                        cached.cache_hits = cached.total_files as u64;
238                        cached.cache_misses = 0;
239                        cached.metrics.cache_hits = cached.cache_hits;
240                        cached.metrics.cache_misses = cached.cache_misses;
241
242                        return Ok(cached);
243                    }
244
245                    repo_hash_for_cache = Some(repo_hash);
246                }
247                Err(err) => {
248                    if std::env::var("SCRIBE_DEBUG").is_ok() {
249                        eprintln!("⚠️  Failed to compute repository hash: {}", err);
250                    }
251                }
252            }
253        }
254
255        // Check if intelligent selection is enabled
256        if self.config.enable_intelligent_selection && self.config.token_budget.is_some() {
257            let mut result = self.process_with_intelligent_selection(path).await?;
258
259            result.cache_hits = 0;
260            result.cache_misses = result.total_files as u64;
261            result.metrics.cache_hits = result.cache_hits;
262            result.metrics.cache_misses = result.cache_misses;
263
264            if let (Some(repo_hash), Some(cache)) = (repo_hash_for_cache, self.cache.as_mut()) {
265                cache.insert(repo_hash, &config_hash, result.clone());
266                cache.flush();
267            }
268
269            return Ok(result);
270        }
271
272        // Use optimized streaming file discovery
273        info!("Using optimized streaming file discovery for basic processing");
274
275        // Create a streaming selector even for basic processing to avoid memory issues
276        let streaming_config = crate::streaming::StreamingConfig {
277            enable_streaming: true,
278            concurrency_limit: self.config.parallel.max_concurrent_tasks,
279            memory_limit: self.config.streaming.memory_limit,
280            selection_heap_size: 50000, // Large heap for full discovery
281        };
282
283        let streaming_selector = crate::streaming::StreamingSelector::new(streaming_config);
284
285        // For basic processing, we want all files (within reason), so use very high limits
286        let target_count = 50000; // Reasonable limit to prevent memory explosion
287        let token_budget = 1_000_000; // Very high budget to include most files
288
289        // Simple scoring that doesn't filter much
290        let score_fn = |_file: &FileMetadata| -> f64 { 1.0 };
291        let token_fn = |file: &FileMetadata| -> usize { (file.size / 4) as usize };
292
293        let scored_files = streaming_selector
294            .select_files_streaming(path, target_count, token_budget, score_fn, token_fn)
295            .await?;
296
297        let files: Vec<FileMetadata> = scored_files
298            .into_iter()
299            .map(|scored| scored.metadata)
300            .collect();
301
302        let total_size: u64 = files.iter().map(|f| f.size).sum();
303
304        let processing_time = start_time.elapsed();
305
306        info!("Processed {} files in {:?}", files.len(), processing_time);
307
308        let mut result = ProcessingResult {
309            total_files: files.len(),
310            processing_time,
311            memory_peak: estimate_memory_usage(files.len()),
312            cache_hits: 0,
313            cache_misses: files.len() as u64,
314            metrics: ScalingMetrics {
315                files_processed: files.len() as u64,
316                total_processing_time: processing_time,
317                memory_peak: estimate_memory_usage(files.len()),
318                cache_hits: 0,
319                cache_misses: files.len() as u64,
320                parallel_efficiency: 1.0,
321                streaming_overhead: Duration::from_millis(0),
322            },
323            files,
324        };
325
326        if let (Some(repo_hash), Some(cache)) = (repo_hash_for_cache, self.cache.as_mut()) {
327            cache.insert(repo_hash, &config_hash, result.clone());
328            cache.flush();
329        }
330
331        Ok(result)
332    }
333
334    /// Run performance benchmarks
335    pub async fn benchmark(
336        &mut self,
337        path: &Path,
338        iterations: usize,
339    ) -> ScalingResult<Vec<BenchmarkResult>> {
340        let mut results = Vec::with_capacity(iterations);
341
342        for i in 0..iterations {
343            info!("Running benchmark iteration {}/{}", i + 1, iterations);
344
345            let start = Instant::now();
346            let result = self.process_repository(path).await?;
347            let duration = start.elapsed();
348
349            let benchmark_result = BenchmarkResult::new(
350                format!("iteration_{}", i + 1),
351                duration,
352                result.memory_peak,
353                result.total_files as f64 / duration.as_secs_f64(),
354                1.0, // 100% success rate
355            );
356
357            results.push(benchmark_result);
358        }
359
360        Ok(results)
361    }
362
363    /// Get current configuration
364    pub fn config(&self) -> &ScalingConfig {
365        &self.config
366    }
367
368    /// Process repository with intelligent selection enabled
369    async fn process_with_intelligent_selection(
370        &self,
371        path: &Path,
372    ) -> ScalingResult<ProcessingResult> {
373        info!("Processing repository with intelligent selection enabled");
374
375        // Create ScalingSelector with the configured token budget
376        let token_budget = self.config.token_budget.unwrap_or(8000);
377        let mut selector = crate::selector::ScalingSelector::with_token_budget(token_budget);
378
379        // Execute intelligent selection and processing
380        let selection_result = selector.select_and_process(path).await?;
381
382        info!(
383            "Intelligent selection completed: {} files selected, {:.1}% token utilization",
384            selection_result.selected_files.len(),
385            selection_result.token_utilization * 100.0
386        );
387
388        // Return the processing result from the selector
389        Ok(selection_result.processing_result)
390    }
391
392    /// Check if engine is ready for processing
393    pub fn is_ready(&self) -> bool {
394        true // Always ready in this simple implementation
395    }
396}
397
398/// Simple language detection based on file extension
399fn detect_language(path: &Path) -> String {
400    let extension = path
401        .extension()
402        .and_then(|s| s.to_str())
403        .map(|s| s.to_lowercase());
404
405    if matches!(extension.as_deref(), Some("h" | "hpp" | "hxx")) {
406        return "Header".to_string();
407    }
408
409    if path
410        .file_name()
411        .and_then(|s| s.to_str())
412        .map(|s| s.eq_ignore_ascii_case("dockerfile"))
413        .unwrap_or(false)
414    {
415        return "Dockerfile".to_string();
416    }
417
418    let language = file::detect_language_from_path(path);
419    file::language_display_name(&language).to_string()
420}
421
422/// Simple file type classification
423fn classify_file_type(path: &Path) -> String {
424    let extension = path
425        .extension()
426        .and_then(|s| s.to_str())
427        .map(|s| s.to_lowercase())
428        .unwrap_or_default();
429
430    let language = file::detect_language_from_path(path);
431    let file_type =
432        FileInfo::classify_file_type(path.to_string_lossy().as_ref(), &language, &extension);
433
434    match file_type {
435        FileType::Test { .. } => "Test".to_string(),
436        FileType::Documentation { .. } => "Documentation".to_string(),
437        FileType::Configuration { .. } => "Configuration".to_string(),
438        FileType::Binary => "Binary".to_string(),
439        FileType::Generated => "Generated".to_string(),
440        FileType::Source { .. } => match extension.as_str() {
441            "h" | "hpp" | "hxx" => "Header".to_string(),
442            _ => "Source".to_string(),
443        },
444        FileType::Unknown => match extension.as_str() {
445            "md" | "txt" | "rst" | "adoc" => "Documentation".to_string(),
446            "json" | "yaml" | "yml" | "toml" | "ini" | "cfg" => "Configuration".to_string(),
447            "png" | "jpg" | "jpeg" | "gif" | "svg" => "Image".to_string(),
448            _ => "Other".to_string(),
449        },
450    }
451}
452
/// Estimate memory usage in bytes for holding `file_count` file-metadata
/// entries, assuming roughly 1 KiB per entry.
///
/// Uses a saturating multiply so an absurdly large count saturates at
/// `usize::MAX` instead of overflowing (which would panic in debug builds).
fn estimate_memory_usage(file_count: usize) -> usize {
    file_count.saturating_mul(1024)
}
458
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    // Engine construction with default configuration should always succeed.
    #[tokio::test]
    async fn test_scaling_engine_creation() {
        let engine = ScalingEngine::with_defaults().await;
        assert!(engine.is_ok());
    }

    // End-to-end processing of a small temp repository: both files should be
    // discovered and the result should report non-trivial timing/memory.
    #[tokio::test]
    async fn test_repository_processing() {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path();

        // Create test files
        fs::write(repo_path.join("main.rs"), "fn main() {}").unwrap();
        fs::write(repo_path.join("lib.rs"), "pub fn test() {}").unwrap();

        let mut engine = ScalingEngine::with_defaults().await.unwrap();
        let result = engine.process_repository(repo_path).await.unwrap();

        assert!(result.total_files >= 2);
        assert!(result.processing_time.as_nanos() > 0);
        assert!(result.memory_peak > 0);
    }

    // The preset constructors should reflect their documented trade-offs:
    // no streaming / low concurrency for small repos, the opposite for large.
    #[tokio::test]
    async fn test_configuration_presets() {
        let small_config = ScalingConfig::small_repository();
        assert!(!small_config.streaming.enable_streaming);
        assert_eq!(small_config.parallel.max_concurrent_tasks, 2);

        let large_config = ScalingConfig::large_repository();
        assert!(large_config.streaming.enable_streaming);
        assert!(large_config.parallel.max_concurrent_tasks >= 16);
    }

    // A nonexistent path must produce an error, not a panic or empty result.
    #[tokio::test]
    async fn test_error_handling() {
        let mut engine = ScalingEngine::with_defaults().await.unwrap();
        let non_existent_path = Path::new("/non/existent/path");

        let result = engine.process_repository(non_existent_path).await;
        assert!(result.is_err());
    }

    // Benchmarking runs the requested number of iterations and reports
    // positive throughput with a 100% success rate for each.
    #[tokio::test]
    async fn test_benchmarking() {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path();

        // Create test file
        fs::write(repo_path.join("test.rs"), "fn test() {}").unwrap();

        let mut engine = ScalingEngine::with_defaults().await.unwrap();
        let results = engine.benchmark(repo_path, 3).await.unwrap();

        assert_eq!(results.len(), 3);
        for result in results {
            assert!(result.duration.as_nanos() > 0);
            assert!(result.throughput > 0.0);
            assert_eq!(result.success_rate, 1.0);
        }
    }

    // Extension-based language detection, including the unknown fallback.
    #[test]
    fn test_language_detection() {
        assert_eq!(detect_language(Path::new("test.rs")), "Rust");
        assert_eq!(detect_language(Path::new("test.py")), "Python");
        assert_eq!(detect_language(Path::new("test.unknown")), "Unknown");
    }

    // Coarse file-type labels for source, documentation, and configuration.
    #[test]
    fn test_file_type_classification() {
        assert_eq!(classify_file_type(Path::new("main.rs")), "Source");
        assert_eq!(classify_file_type(Path::new("README.md")), "Documentation");
        assert_eq!(
            classify_file_type(Path::new("config.json")),
            "Configuration"
        );
    }
}