Skip to main content

infiniloom_engine/embedding/
streaming.rs

1//! Streaming API for large repository processing
2//!
3//! This module provides an iterator-based interface for processing large repositories
4//! without loading all chunks into memory simultaneously. This is essential for:
5//!
6//! - **Large Monorepos**: Repositories with 100K+ files
7//! - **CI/CD Pipelines**: Memory-constrained container environments
8//! - **Real-time Processing**: Stream chunks directly to vector databases
9//!
10//! # Quick Start
11//!
12//! ```rust,ignore
13//! use infiniloom_engine::embedding::streaming::{ChunkStream, StreamConfig};
14//!
15//! let stream = ChunkStream::new(repo_path, settings, limits)?;
16//!
17//! // Process chunks as they're generated
18//! for chunk_result in stream {
19//!     match chunk_result {
20//!         Ok(chunk) => {
21//!             // Send to vector database, write to file, etc.
22//!             upload_to_pinecone(&chunk)?;
23//!         }
24//!         Err(e) if e.is_skippable() => {
25//!             // Non-critical error, continue processing
26//!             eprintln!("Warning: {}", e);
27//!         }
28//!         Err(e) => {
29//!             // Critical error, abort
30//!             return Err(e.into());
31//!         }
32//!     }
33//! }
34//! ```
35//!
36//! # Batch Processing
37//!
38//! For better throughput, process chunks in batches:
39//!
40//! ```rust,ignore
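//! use infiniloom_engine::embedding::streaming::{BatchIterator, ChunkStream};
//!
//! let stream = ChunkStream::new(repo_path, settings, limits)?;
//!
//! // `batches` is provided by the `BatchIterator` extension trait.
//! for batch in stream.batches(100) {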
45//!     let chunks: Vec<_> = batch.into_iter().filter_map(|r| r.ok()).collect();
46//!     bulk_upload_to_vector_db(&chunks)?;
47//! }
48//! ```
49//!
50//! # Memory Guarantees
51//!
52//! The streaming API bounds memory usage to approximately:
53//! - `batch_size * avg_chunk_size` for chunk data
54//! - `O(files_in_current_batch)` for file metadata
55//! - `O(symbols_per_file)` for parse state
56//!
57//! For a typical batch_size of 100 and avg_chunk_size of 5KB, memory usage
58//! is bounded to ~500KB for chunk data, regardless of repository size.
59
60use std::collections::VecDeque;
61use std::path::{Path, PathBuf};
62use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
63use std::sync::Arc;
64
65use crate::parser::{parse_file_symbols, Language};
66use crate::security::SecurityScanner;
67use crate::tokenizer::{TokenModel, Tokenizer};
68
69use super::chunker::{generate_summary, generate_tags_for_symbol};
70use super::complexity::compute_complexity;
71use super::error::EmbedError;
72use super::hasher::hash_content;
73use super::identifiers::extract_identifiers;
74use super::limits::ResourceLimits;
75use super::type_extraction::extract_types;
76use super::types::{ChunkContext, ChunkSource, EmbedChunk, EmbedSettings, RepoIdentifier};
77
/// Settings that govern how a chunk stream is produced.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// How many files are handled per batch.
    pub file_batch_size: usize,

    /// Upper bound on chunks buffered before they are yielded.
    pub chunk_buffer_size: usize,

    /// If `true`, files that cause errors are skipped.
    pub skip_on_error: bool,

    /// Number of errors tolerated before the stream aborts.
    pub max_errors: usize,

    /// Turns on parallel file processing inside a batch.
    pub parallel_batches: bool,
}

impl Default for StreamConfig {
    /// Defaults: 50 files per batch, a 200-chunk buffer, skipping on error,
    /// at most 100 errors, and parallel batches enabled.
    fn default() -> Self {
        const FILES_PER_BATCH: usize = 50;
        const BUFFERED_CHUNKS: usize = 200;
        const ERROR_BUDGET: usize = 100;

        StreamConfig {
            file_batch_size: FILES_PER_BATCH,
            chunk_buffer_size: BUFFERED_CHUNKS,
            skip_on_error: true,
            max_errors: ERROR_BUDGET,
            parallel_batches: true,
        }
    }
}
108
/// Running counters describing how far a stream has progressed.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of files found during discovery.
    pub total_files: usize,

    /// Number of files processed so far.
    pub files_processed: usize,

    /// Number of files skipped because of errors.
    pub files_skipped: usize,

    /// Number of chunks generated so far.
    pub chunks_generated: usize,

    /// Number of bytes processed so far.
    pub bytes_processed: u64,

    /// Number of errors encountered.
    pub error_count: usize,
}

impl StreamStats {
    /// Share of discovered files already processed, as a percentage in `0.0..=100.0`.
    ///
    /// Reports `100.0` when no files were discovered at all.
    pub fn progress_percent(&self) -> f64 {
        match self.total_files {
            0 => 100.0,
            total => (self.files_processed as f64 / total as f64) * 100.0,
        }
    }

    /// Projected number of chunks still to come.
    ///
    /// Extrapolates the average chunks-per-file seen so far over the files that
    /// have not been processed yet; returns `0` until at least one file is done.
    pub fn estimated_chunks_remaining(&self) -> usize {
        let done = self.files_processed;
        if done == 0 {
            return 0;
        }

        let chunks_per_file = self.chunks_generated as f64 / done as f64;
        let files_left = self.total_files.saturating_sub(done);
        (files_left as f64 * chunks_per_file) as usize
    }
}
150
151/// Streaming chunk iterator for large repositories
152///
153/// This iterator yields chunks one at a time as they are generated,
154/// without loading the entire repository into memory.
155pub struct ChunkStream {
156    /// Queued files to process
157    pending_files: VecDeque<PathBuf>,
158
159    /// Buffer of generated chunks waiting to be yielded
160    chunk_buffer: VecDeque<Result<EmbedChunk, EmbedError>>,
161
162    /// Repository root path
163    repo_root: PathBuf,
164
165    /// Embedding settings
166    settings: EmbedSettings,
167
168    /// Resource limits
169    limits: ResourceLimits,
170
171    /// Stream configuration
172    config: StreamConfig,
173
174    /// Tokenizer instance
175    tokenizer: Tokenizer,
176
177    /// Security scanner (optional)
178    security_scanner: Option<SecurityScanner>,
179
180    /// Repository identifier
181    repo_id: RepoIdentifier,
182
183    /// Statistics
184    stats: StreamStats,
185
186    /// Cancellation flag
187    cancelled: Arc<AtomicBool>,
188
189    /// Error count for early termination
190    error_count: AtomicUsize,
191}
192
193impl ChunkStream {
194    /// Create a new chunk stream for a repository
195    pub fn new(
196        repo_path: impl AsRef<Path>,
197        settings: EmbedSettings,
198        limits: ResourceLimits,
199    ) -> Result<Self, EmbedError> {
200        Self::with_config(repo_path, settings, limits, StreamConfig::default())
201    }
202
203    /// Create with custom stream configuration
204    pub fn with_config(
205        repo_path: impl AsRef<Path>,
206        settings: EmbedSettings,
207        limits: ResourceLimits,
208        config: StreamConfig,
209    ) -> Result<Self, EmbedError> {
210        let repo_root = repo_path
211            .as_ref()
212            .canonicalize()
213            .map_err(|e| EmbedError::IoError {
214                path: repo_path.as_ref().to_path_buf(),
215                source: e,
216            })?;
217
218        if !repo_root.is_dir() {
219            return Err(EmbedError::NotADirectory { path: repo_root });
220        }
221
222        // Security scanner if enabled
223        let security_scanner = if settings.scan_secrets {
224            Some(SecurityScanner::new())
225        } else {
226            None
227        };
228
229        let mut stream = Self {
230            pending_files: VecDeque::new(),
231            chunk_buffer: VecDeque::new(),
232            repo_root,
233            settings,
234            limits,
235            config,
236            tokenizer: Tokenizer::new(),
237            security_scanner,
238            repo_id: RepoIdentifier::default(),
239            stats: StreamStats::default(),
240            cancelled: Arc::new(AtomicBool::new(false)),
241            error_count: AtomicUsize::new(0),
242        };
243
244        // Discover files
245        stream.discover_files()?;
246
247        Ok(stream)
248    }
249
250    /// Set the repository identifier for multi-tenant RAG
251    pub fn with_repo_id(mut self, repo_id: RepoIdentifier) -> Self {
252        self.repo_id = repo_id;
253        self
254    }
255
256    /// Get current streaming statistics
257    pub fn stats(&self) -> &StreamStats {
258        &self.stats
259    }
260
261    /// Get a cancellation handle for this stream
262    pub fn cancellation_handle(&self) -> CancellationHandle {
263        CancellationHandle { cancelled: Arc::clone(&self.cancelled) }
264    }
265
266    /// Check if the stream has been cancelled
267    pub fn is_cancelled(&self) -> bool {
268        self.cancelled.load(Ordering::Relaxed)
269    }
270
271    /// Discover all files in the repository
272    fn discover_files(&mut self) -> Result<(), EmbedError> {
273        use glob::Pattern;
274        use ignore::WalkBuilder;
275
276        // Compile include/exclude patterns
277        let include_patterns: Vec<Pattern> = self
278            .settings
279            .include_patterns
280            .iter()
281            .filter_map(|p| Pattern::new(p).ok())
282            .collect();
283
284        let exclude_patterns: Vec<Pattern> = self
285            .settings
286            .exclude_patterns
287            .iter()
288            .filter_map(|p| Pattern::new(p).ok())
289            .collect();
290
291        let walker = WalkBuilder::new(&self.repo_root)
292            .hidden(false)
293            .git_ignore(true)
294            .git_global(true)
295            .git_exclude(true)
296            .follow_links(false)
297            .build();
298
299        let mut files = Vec::new();
300
301        for entry in walker.flatten() {
302            let path = entry.path();
303
304            if !path.is_file() {
305                continue;
306            }
307
308            // Get relative path for pattern matching
309            let relative = path
310                .strip_prefix(&self.repo_root)
311                .unwrap_or(path)
312                .to_string_lossy();
313
314            // Check include patterns
315            if !include_patterns.is_empty()
316                && !include_patterns.iter().any(|p| p.matches(&relative))
317            {
318                continue;
319            }
320
321            // Check exclude patterns
322            if exclude_patterns.iter().any(|p| p.matches(&relative)) {
323                continue;
324            }
325
326            // Check for supported language
327            let ext = match path.extension().and_then(|e| e.to_str()) {
328                Some(e) => e,
329                None => continue,
330            };
331
332            if Language::from_extension(ext).is_none() {
333                continue;
334            }
335
336            // Skip test files if configured
337            if !self.settings.include_tests && self.is_test_file(path) {
338                continue;
339            }
340
341            files.push(path.to_path_buf());
342        }
343
344        // Sort for determinism
345        files.sort();
346
347        self.stats.total_files = files.len();
348        self.pending_files = files.into();
349
350        // Check file limit
351        if !self.limits.check_file_count(self.stats.total_files) {
352            return Err(EmbedError::TooManyFiles {
353                count: self.stats.total_files,
354                max: self.limits.max_files,
355            });
356        }
357
358        Ok(())
359    }
360
361    /// Check if a file is a test file
362    fn is_test_file(&self, path: &Path) -> bool {
363        let path_str = path.to_string_lossy().to_lowercase();
364
365        path_str.contains("/tests/")
366            || path_str.contains("\\tests\\")
367            || path_str.contains("/test/")
368            || path_str.contains("\\test\\")
369            || path_str.contains("/__tests__/")
370            || path_str.contains("\\__tests__\\")
371    }
372
373    /// Process the next batch of files and fill the chunk buffer
374    fn fill_buffer(&mut self) -> bool {
375        if self.is_cancelled() {
376            return false;
377        }
378
379        // Take a batch of files
380        let batch_size = self.config.file_batch_size.min(self.pending_files.len());
381        if batch_size == 0 {
382            return false;
383        }
384
385        let batch: Vec<_> = (0..batch_size)
386            .filter_map(|_| self.pending_files.pop_front())
387            .collect();
388
389        // Process files
390        for file_path in batch {
391            if self.is_cancelled() {
392                break;
393            }
394
395            match self.process_file(&file_path) {
396                Ok(chunks) => {
397                    self.stats.files_processed += 1;
398                    self.stats.chunks_generated += chunks.len();
399
400                    for chunk in chunks {
401                        self.chunk_buffer.push_back(Ok(chunk));
402                    }
403                },
404                Err(e) => {
405                    self.stats.error_count += 1;
406                    let current_errors = self.error_count.fetch_add(1, Ordering::Relaxed) + 1;
407
408                    if e.is_skippable() && self.config.skip_on_error {
409                        self.stats.files_skipped += 1;
410                        // Optionally emit the error for logging
411                        if !e.is_critical() {
412                            self.chunk_buffer.push_back(Err(e));
413                        }
414                    } else if current_errors >= self.config.max_errors {
415                        // Too many errors, emit and stop
416                        self.chunk_buffer.push_back(Err(EmbedError::TooManyErrors {
417                            count: current_errors,
418                            max: self.config.max_errors,
419                        }));
420                        self.cancelled.store(true, Ordering::Relaxed);
421                        break;
422                    } else if e.is_critical() {
423                        self.chunk_buffer.push_back(Err(e));
424                        break;
425                    }
426                },
427            }
428        }
429
430        !self.chunk_buffer.is_empty() || !self.pending_files.is_empty()
431    }
432
433    /// Process a single file and return its chunks
434    fn process_file(&mut self, path: &Path) -> Result<Vec<EmbedChunk>, EmbedError> {
435        // Validate file size
436        let metadata = std::fs::metadata(path)
437            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
438
439        if !self.limits.check_file_size(metadata.len()) {
440            return Err(EmbedError::FileTooLarge {
441                path: path.to_path_buf(),
442                size: metadata.len(),
443                max: self.limits.max_file_size,
444            });
445        }
446
447        // Read file
448        let mut content = std::fs::read_to_string(path)
449            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
450
451        self.stats.bytes_processed += content.len() as u64;
452
453        // Check for long lines (minified files)
454        if let Some(max_line_len) = content.lines().map(|l| l.len()).max() {
455            if !self.limits.check_line_length(max_line_len) {
456                return Err(EmbedError::LineTooLong {
457                    path: path.to_path_buf(),
458                    length: max_line_len,
459                    max: self.limits.max_line_length,
460                });
461            }
462        }
463
464        // Security scanning
465        let relative_path = self.safe_relative_path(path)?;
466
467        if let Some(ref scanner) = self.security_scanner {
468            let findings = scanner.scan(&content, &relative_path);
469            if !findings.is_empty() {
470                if self.settings.fail_on_secrets {
471                    let files = findings
472                        .iter()
473                        .map(|f| format!("  {}:{} - {}", f.file, f.line, f.kind.name()))
474                        .collect::<Vec<_>>()
475                        .join("\n");
476                    return Err(EmbedError::SecretsDetected { count: findings.len(), files });
477                }
478
479                if self.settings.redact_secrets {
480                    content = scanner.redact_content(&content, &relative_path);
481                }
482            }
483        }
484
485        // Parse symbols
486        let language = self.detect_language(path);
487        let mut symbols = parse_file_symbols(&content, path);
488        symbols.sort_by(|a, b| {
489            a.start_line
490                .cmp(&b.start_line)
491                .then_with(|| a.end_line.cmp(&b.end_line))
492                .then_with(|| a.name.cmp(&b.name))
493        });
494
495        let lines: Vec<&str> = content.lines().collect();
496        let mut chunks = Vec::with_capacity(symbols.len());
497
498        for symbol in &symbols {
499            // Skip imports if configured
500            if !self.settings.include_imports
501                && matches!(symbol.kind, crate::types::SymbolKind::Import)
502            {
503                continue;
504            }
505
506            // Extract content with context
507            let start_line = symbol.start_line.saturating_sub(1) as usize;
508            let end_line = (symbol.end_line as usize).min(lines.len());
509            let context_start = start_line.saturating_sub(self.settings.context_lines as usize);
510            let context_end = (end_line + self.settings.context_lines as usize).min(lines.len());
511
512            let chunk_content = lines[context_start..context_end].join("\n");
513
514            // Count tokens
515            let token_model = TokenModel::from_model_name(&self.settings.token_model)
516                .unwrap_or(TokenModel::Claude);
517            let tokens = self.tokenizer.count(&chunk_content, token_model);
518
519            // Generate hash
520            let hash = hash_content(&chunk_content);
521
522            // Build FQN
523            let fqn = self.compute_fqn(&relative_path, symbol);
524
525            // Extract keywords and context prefix before moving chunk_content
526            let keywords = super::chunker::extract_keywords(&chunk_content);
527            let context_prefix = super::chunker::generate_context_prefix(
528                &relative_path,
529                symbol.parent.as_deref(),
530                &symbol.kind,
531            );
532
533            // Extract enrichments: identifiers, type signatures, complexity, tags
534            let lang_enum =
535                Language::from_extension(path.extension().and_then(|e| e.to_str()).unwrap_or(""));
536            let identifiers = extract_identifiers(&chunk_content, lang_enum);
537            let (type_signature, parameter_types, return_type, error_types) =
538                if let Some(lang) = lang_enum {
539                    match extract_types(&chunk_content, lang) {
540                        Some(ti) => {
541                            (ti.type_signature, ti.parameter_types, ti.return_type, ti.error_types)
542                        },
543                        None => (None, Vec::new(), None, Vec::new()),
544                    }
545                } else {
546                    (None, Vec::new(), None, Vec::new())
547                };
548            let complexity_score = lang_enum.and_then(|l| compute_complexity(&chunk_content, l));
549            let tags = generate_tags_for_symbol(&symbol.name, symbol.signature.as_deref());
550
551            let chunk_kind = symbol.kind.into();
552            let source = ChunkSource {
553                repo: self.repo_id.clone(),
554                file: relative_path.clone(),
555                lines: ((context_start + 1) as u32, context_end as u32),
556                symbol: symbol.name.clone(),
557                fqn: Some(fqn),
558                language: language.clone(),
559                parent: symbol.parent.clone(),
560                visibility: symbol.visibility.into(),
561                is_test: self.is_test_code(path, symbol),
562                module_path: Some(super::chunker::derive_module_path(&relative_path, &language)),
563                parent_chunk_id: None,
564            };
565
566            let mut context = ChunkContext {
567                docstring: symbol.docstring.clone(),
568                comments: Vec::new(),
569                signature: symbol.signature.clone(),
570                calls: symbol.calls.clone(),
571                called_by: Vec::new(),
572                imports: Vec::new(),
573                tags,
574                keywords,
575                context_prefix: Some(context_prefix),
576                summary: None,
577                qualified_calls: Vec::new(),
578                unresolved_calls: Vec::new(),
579                identifiers,
580                type_signature,
581                parameter_types,
582                return_type,
583                error_types,
584                lines_of_code: chunk_content.lines().count() as u32,
585                max_nesting_depth: 0,
586                git: None,
587                complexity_score,
588                dependents_count: None,
589            };
590
591            // Generate summary after source and context are built
592            context.summary = generate_summary(chunk_kind, &source, &context);
593
594            chunks.push(EmbedChunk {
595                id: hash.short_id,
596                full_hash: hash.full_hash,
597                content: chunk_content,
598                tokens,
599                kind: chunk_kind,
600                source,
601                children_ids: Vec::new(),
602                context,
603                repr: "code".to_owned(),
604                code_chunk_id: None,
605                part: None,
606            });
607        }
608
609        Ok(chunks)
610    }
611
612    /// Get safe relative path
613    fn safe_relative_path(&self, path: &Path) -> Result<String, EmbedError> {
614        let canonical = path
615            .canonicalize()
616            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
617
618        if !canonical.starts_with(&self.repo_root) {
619            return Err(EmbedError::PathTraversal {
620                path: canonical,
621                repo_root: self.repo_root.clone(),
622            });
623        }
624
625        Ok(canonical
626            .strip_prefix(&self.repo_root)
627            .unwrap_or(&canonical)
628            .to_string_lossy()
629            .replace('\\', "/"))
630    }
631
632    /// Detect language from file path
633    fn detect_language(&self, path: &Path) -> String {
634        path.extension()
635            .and_then(|e| e.to_str())
636            .and_then(Language::from_extension)
637            .map_or_else(|| "unknown".to_owned(), |l| l.display_name().to_owned())
638    }
639
640    /// Compute fully qualified name
641    fn compute_fqn(&self, file: &str, symbol: &crate::types::Symbol) -> String {
642        let module_path = file
643            .strip_suffix(".rs")
644            .or_else(|| file.strip_suffix(".py"))
645            .or_else(|| file.strip_suffix(".ts"))
646            .or_else(|| file.strip_suffix(".tsx"))
647            .or_else(|| file.strip_suffix(".js"))
648            .or_else(|| file.strip_suffix(".jsx"))
649            .or_else(|| file.strip_suffix(".go"))
650            .unwrap_or(file)
651            .replace(['\\', '/'], "::"); // Normalize path separators
652
653        // Build the symbol portion
654        let symbol_part = if let Some(ref parent) = symbol.parent {
655            format!("{}::{}::{}", module_path, parent, symbol.name)
656        } else {
657            format!("{}::{}", module_path, symbol.name)
658        };
659
660        // Prepend repo identity: "{namespace}/{name}::{symbol_part}" or "{name}::{symbol_part}"
661        let repo_prefix = self.repo_id.qualified_name();
662        if repo_prefix.is_empty() {
663            symbol_part
664        } else {
665            format!("{}::{}", repo_prefix, symbol_part)
666        }
667    }
668
669    /// Check if code is test code
670    fn is_test_code(&self, path: &Path, symbol: &crate::types::Symbol) -> bool {
671        let path_str = path.to_string_lossy().to_lowercase();
672        let name = symbol.name.to_lowercase();
673
674        path_str.contains("test")
675            || path_str.contains("spec")
676            || name.starts_with("test_")
677            || name.ends_with("_test")
678    }
679
680    /// Collect all remaining chunks into a vector (for compatibility)
681    ///
682    /// Note: This defeats the purpose of streaming by loading everything into memory.
683    /// Use only when you need to sort or deduplicate the full result set.
684    pub fn collect_all(self) -> Result<Vec<EmbedChunk>, EmbedError> {
685        let mut chunks = Vec::new();
686        let mut last_error = None;
687
688        for result in self {
689            match result {
690                Ok(chunk) => chunks.push(chunk),
691                Err(e) if e.is_skippable() => {
692                    // Non-critical, skip
693                },
694                Err(e) => {
695                    last_error = Some(e);
696                },
697            }
698        }
699
700        if let Some(e) = last_error {
701            if chunks.is_empty() {
702                return Err(e);
703            }
704        }
705
706        // Sort for determinism (matches EmbedChunker behavior)
707        chunks.sort_by(|a, b| {
708            a.source
709                .file
710                .cmp(&b.source.file)
711                .then_with(|| a.source.lines.0.cmp(&b.source.lines.0))
712                .then_with(|| a.source.lines.1.cmp(&b.source.lines.1))
713                .then_with(|| a.source.symbol.cmp(&b.source.symbol))
714                .then_with(|| a.id.cmp(&b.id))
715        });
716
717        Ok(chunks)
718    }
719}
720
721impl Iterator for ChunkStream {
722    type Item = Result<EmbedChunk, EmbedError>;
723
724    fn next(&mut self) -> Option<Self::Item> {
725        // Return buffered chunk if available
726        if let Some(chunk) = self.chunk_buffer.pop_front() {
727            return Some(chunk);
728        }
729
730        // Try to fill buffer
731        if self.fill_buffer() {
732            self.chunk_buffer.pop_front()
733        } else {
734            None
735        }
736    }
737
738    fn size_hint(&self) -> (usize, Option<usize>) {
739        let remaining = self.stats.estimated_chunks_remaining();
740        let buffered = self.chunk_buffer.len();
741        (buffered, Some(buffered + remaining))
742    }
743}
744
/// Cloneable handle that lets another thread cancel a chunk stream.
#[derive(Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Request cancellation of the stream this handle belongs to.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Report whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Relaxed)
    }
}
762
763/// Extension trait for batch processing
764pub trait BatchIterator: Iterator {
765    /// Process items in batches
766    fn batches(self, batch_size: usize) -> Batches<Self>
767    where
768        Self: Sized,
769    {
770        Batches { iter: self, batch_size }
771    }
772}
773
774impl<I: Iterator> BatchIterator for I {}
775
/// Iterator adapter that groups the items of an inner iterator into batches.
pub struct Batches<I> {
    iter: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    /// Pull up to `batch_size` items from the inner iterator.
    ///
    /// Returns `None` when no item could be pulled, which is also the case for a
    /// `batch_size` of `0`. The final batch may be shorter than `batch_size`.
    ///
    /// The batch is sized from what the inner iterator reports rather than from
    /// `batch_size`, so a very large `batch_size` neither over-allocates nor
    /// panics with a capacity overflow.
    fn next(&mut self) -> Option<Self::Item> {
        let batch: Vec<I::Item> = self.iter.by_ref().take(self.batch_size).collect();

        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}
802
803#[cfg(test)]
804mod tests {
805    use super::*;
806    use tempfile::TempDir;
807
808    fn create_test_file(dir: &Path, name: &str, content: &str) {
809        let path = dir.join(name);
810        if let Some(parent) = path.parent() {
811            std::fs::create_dir_all(parent).unwrap();
812        }
813        std::fs::write(path, content).unwrap();
814    }
815
816    #[test]
817    fn test_chunk_stream_basic() {
818        let temp_dir = TempDir::new().unwrap();
819        let rust_code = r#"
820/// A test function
821fn hello() {
822    println!("Hello, world!");
823}
824
825fn goodbye() {
826    println!("Goodbye!");
827}
828"#;
829        create_test_file(temp_dir.path(), "test.rs", rust_code);
830
831        let settings = EmbedSettings::default();
832        let limits = ResourceLimits::default();
833
834        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
835        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();
836
837        assert!(!chunks.is_empty());
838    }
839
840    #[test]
841    fn test_stream_stats() {
842        let temp_dir = TempDir::new().unwrap();
843        create_test_file(temp_dir.path(), "a.rs", "fn foo() {}");
844        create_test_file(temp_dir.path(), "b.rs", "fn bar() {}");
845        create_test_file(temp_dir.path(), "c.rs", "fn baz() {}");
846
847        let settings = EmbedSettings::default();
848        let limits = ResourceLimits::default();
849
850        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
851
852        assert_eq!(stream.stats().total_files, 3);
853
854        // Consume the stream
855        let _chunks: Vec<_> = stream.collect();
856    }
857
858    #[test]
859    fn test_cancellation() {
860        let temp_dir = TempDir::new().unwrap();
861        for i in 0..10 {
862            create_test_file(
863                temp_dir.path(),
864                &format!("file{}.rs", i),
865                &format!("fn func{}() {{}}", i),
866            );
867        }
868
869        let settings = EmbedSettings::default();
870        let limits = ResourceLimits::default();
871
872        let mut stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
873        let handle = stream.cancellation_handle();
874
875        // Get a few chunks
876        let _ = stream.next();
877        let _ = stream.next();
878
879        // Cancel
880        handle.cancel();
881
882        // Stream should stop
883        assert!(stream.is_cancelled());
884    }
885
886    #[test]
887    fn test_batch_iterator() {
888        let items: Vec<i32> = (0..10).collect();
889        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();
890
891        assert_eq!(batches.len(), 4);
892        assert_eq!(batches[0], vec![0, 1, 2]);
893        assert_eq!(batches[1], vec![3, 4, 5]);
894        assert_eq!(batches[2], vec![6, 7, 8]);
895        assert_eq!(batches[3], vec![9]);
896    }
897
898    #[test]
899    fn test_collect_all_sorts_deterministically() {
900        let temp_dir = TempDir::new().unwrap();
901        create_test_file(temp_dir.path(), "z.rs", "fn z_func() {}");
902        create_test_file(temp_dir.path(), "a.rs", "fn a_func() {}");
903        create_test_file(temp_dir.path(), "m.rs", "fn m_func() {}");
904
905        let settings = EmbedSettings::default();
906        let limits = ResourceLimits::default();
907
908        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
909        let chunks = stream.collect_all().unwrap();
910
911        // Should be sorted by file path
912        assert!(chunks[0].source.file < chunks[1].source.file);
913        assert!(chunks[1].source.file < chunks[2].source.file);
914    }
915
916    #[test]
917    fn test_stream_config() {
918        let config = StreamConfig {
919            file_batch_size: 10,
920            chunk_buffer_size: 50,
921            skip_on_error: true,
922            max_errors: 5,
923            parallel_batches: false,
924        };
925
926        let temp_dir = TempDir::new().unwrap();
927        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");
928
929        let settings = EmbedSettings::default();
930        let limits = ResourceLimits::default();
931
932        let stream = ChunkStream::with_config(temp_dir.path(), settings, limits, config).unwrap();
933        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();
934
935        assert!(!chunks.is_empty());
936    }
937
938    #[test]
939    fn test_stream_with_repo_id() {
940        let temp_dir = TempDir::new().unwrap();
941        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");
942
943        let settings = EmbedSettings::default();
944        let limits = ResourceLimits::default();
945        let repo_id = RepoIdentifier::new("github.com/test", "my-repo");
946
947        let stream = ChunkStream::new(temp_dir.path(), settings, limits)
948            .unwrap()
949            .with_repo_id(repo_id);
950
951        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();
952
953        assert!(!chunks.is_empty());
954        assert_eq!(chunks[0].source.repo.namespace.as_deref(), Some("github.com/test"));
955        assert_eq!(chunks[0].source.repo.name, "my-repo");
956    }
957
958    // ---------------------------------------------------------------
959    // Behavioral tests for keyword and context_prefix population
960    // (Issue #100: ChunkStream was producing empty keywords/context_prefix)
961    // ---------------------------------------------------------------
962
963    #[test]
964    fn test_extract_keywords_returns_domain_terms_not_language_keywords() {
965        let rust_code = r#"
966fn calculate_checksum(buffer: &[u8]) -> u64 {
967    let mut digest = 0u64;
968    for byte in buffer {
969        digest = digest.wrapping_mul(31).wrapping_add(*byte as u64);
970    }
971    digest
972}
973"#;
974        let keywords = super::super::chunker::extract_keywords(rust_code);
975
976        // Should contain domain-specific identifiers split from camelCase/snake_case
977        assert!(
978            keywords.contains(&"calculate".to_string()),
979            "Expected 'calculate' in keywords, got: {:?}",
980            keywords
981        );
982        assert!(
983            keywords.contains(&"checksum".to_string()),
984            "Expected 'checksum' in keywords, got: {:?}",
985            keywords
986        );
987        assert!(
988            keywords.contains(&"buffer".to_string()),
989            "Expected 'buffer' in keywords, got: {:?}",
990            keywords
991        );
992        assert!(
993            keywords.contains(&"digest".to_string()),
994            "Expected 'digest' in keywords, got: {:?}",
995            keywords
996        );
997
998        // Should NOT contain generic Rust keywords (these are in the stopword list)
999        assert!(!keywords.contains(&"fn".to_string()), "'fn' should be filtered as a stopword");
1000        assert!(!keywords.contains(&"let".to_string()), "'let' should be filtered as a stopword");
1001        assert!(!keywords.contains(&"for".to_string()), "'for' should be filtered as a stopword");
1002        assert!(!keywords.contains(&"mut".to_string()), "'mut' should be filtered as a stopword");
1003    }
1004
1005    #[test]
1006    fn test_extract_keywords_handles_camel_case_and_snake_case() {
1007        let code = r#"
1008fn parse_http_response(rawBytes: &[u8]) -> HttpResponse {
1009    let contentLength = extract_content_length(rawBytes);
1010    HttpResponse::new(contentLength)
1011}
1012"#;
1013        let keywords = super::super::chunker::extract_keywords(code);
1014
1015        // snake_case splits: parse_http_response -> parse, http, response
1016        assert!(
1017            keywords.contains(&"parse".to_string()),
1018            "Expected 'parse' from snake_case split, got: {:?}",
1019            keywords
1020        );
1021        assert!(
1022            keywords.contains(&"http".to_string()),
1023            "Expected 'http' from snake_case split, got: {:?}",
1024            keywords
1025        );
1026        assert!(
1027            keywords.contains(&"response".to_string()),
1028            "Expected 'response' from identifier split, got: {:?}",
1029            keywords
1030        );
1031
1032        // camelCase splits: rawBytes -> raw, Bytes; contentLength -> content, Length
1033        assert!(
1034            keywords.contains(&"content".to_string()),
1035            "Expected 'content' from camelCase split, got: {:?}",
1036            keywords
1037        );
1038        assert!(
1039            keywords.contains(&"length".to_string()),
1040            "Expected 'length' from camelCase split, got: {:?}",
1041            keywords
1042        );
1043    }
1044
1045    #[test]
1046    fn test_extract_keywords_nonempty_for_nontrivial_code() {
1047        // Any function with meaningful identifier names should produce keywords
1048        let code = r#"
1049fn validate_user_credentials(username: &str, password: &str) -> bool {
1050    let stored_hash = fetch_password_hash(username);
1051    verify_hash(password, &stored_hash)
1052}
1053"#;
1054        let keywords = super::super::chunker::extract_keywords(code);
1055        assert!(!keywords.is_empty(), "Non-trivial code should produce at least some keywords");
1056        // Should have several domain terms
1057        assert!(
1058            keywords.len() >= 3,
1059            "Expected at least 3 keywords for code with rich identifiers, got {}: {:?}",
1060            keywords.len(),
1061            keywords
1062        );
1063    }
1064
1065    #[test]
1066    fn test_generate_context_prefix_format_without_parent() {
1067        use crate::types::SymbolKind;
1068
1069        let prefix = super::super::chunker::generate_context_prefix(
1070            "src/auth.rs",
1071            None,
1072            &SymbolKind::Function,
1073        );
1074
1075        assert_eq!(prefix, "From src/auth.rs, function");
1076    }
1077
1078    #[test]
1079    fn test_generate_context_prefix_format_with_parent() {
1080        use crate::types::SymbolKind;
1081
1082        let prefix = super::super::chunker::generate_context_prefix(
1083            "src/models/user.rs",
1084            Some("UserService"),
1085            &SymbolKind::Method,
1086        );
1087
1088        assert_eq!(prefix, "From src/models/user.rs, in UserService, method");
1089    }
1090
1091    #[test]
1092    fn test_generate_context_prefix_various_kinds() {
1093        use crate::types::SymbolKind;
1094
1095        let cases = vec![
1096            (SymbolKind::Class, "class"),
1097            (SymbolKind::Struct, "struct"),
1098            (SymbolKind::Enum, "enum"),
1099            (SymbolKind::Trait, "trait"),
1100            (SymbolKind::Interface, "interface"),
1101            (SymbolKind::Constant, "constant"),
1102            (SymbolKind::Import, "import"),
1103            (SymbolKind::Module, "module"),
1104            (SymbolKind::Macro, "macro"),
1105        ];
1106
1107        for (kind, expected_name) in cases {
1108            let prefix = super::super::chunker::generate_context_prefix("src/lib.rs", None, &kind);
1109            assert_eq!(
1110                prefix,
1111                format!("From src/lib.rs, {expected_name}"),
1112                "Wrong prefix for kind {:?}",
1113                kind
1114            );
1115        }
1116    }
1117
1118    #[test]
1119    fn test_chunk_stream_populates_keywords_and_context_prefix() {
1120        let temp_dir = TempDir::new().unwrap();
1121        let rust_code = r#"
1122/// Validates and normalizes an email address
1123fn validate_email_address(input: &str) -> Option<String> {
1124    let trimmed = input.trim().to_lowercase();
1125    if trimmed.contains('@') && trimmed.contains('.') {
1126        Some(trimmed)
1127    } else {
1128        None
1129    }
1130}
1131"#;
1132        create_test_file(temp_dir.path(), "src/validator.rs", rust_code);
1133
1134        let settings = EmbedSettings::default();
1135        let limits = ResourceLimits::default();
1136
1137        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
1138        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();
1139
1140        assert!(!chunks.is_empty(), "Should produce at least one chunk");
1141
1142        for chunk in &chunks {
1143            // Keywords should be populated (not empty)
1144            assert!(
1145                !chunk.context.keywords.is_empty(),
1146                "Chunk '{}' has empty keywords; expected domain terms from the code",
1147                chunk.source.symbol
1148            );
1149
1150            // Context prefix should be populated (not None)
1151            assert!(
1152                chunk.context.context_prefix.is_some(),
1153                "Chunk '{}' has None context_prefix; expected 'From <path>, <kind>'",
1154                chunk.source.symbol
1155            );
1156
1157            let prefix = chunk.context.context_prefix.as_ref().unwrap();
1158
1159            // Prefix should start with "From " and contain the file path
1160            assert!(
1161                prefix.starts_with("From "),
1162                "Context prefix should start with 'From ', got: {}",
1163                prefix
1164            );
1165            assert!(
1166                prefix.contains("validator.rs"),
1167                "Context prefix should reference the source file, got: {}",
1168                prefix
1169            );
1170        }
1171
1172        // Find the validate_email_address chunk specifically
1173        let email_chunk = chunks
1174            .iter()
1175            .find(|c| c.source.symbol == "validate_email_address");
1176        assert!(email_chunk.is_some(), "Should have a chunk for validate_email_address");
1177
1178        let email_chunk = email_chunk.unwrap();
1179
1180        // Verify keywords contain domain-relevant terms
1181        let kw = &email_chunk.context.keywords;
1182        assert!(
1183            kw.contains(&"validate".to_string()) || kw.contains(&"email".to_string()),
1184            "Keywords for validate_email_address should include 'validate' or 'email', got: {:?}",
1185            kw
1186        );
1187
1188        // Verify context prefix format for a top-level function
1189        let prefix = email_chunk.context.context_prefix.as_ref().unwrap();
1190        assert!(
1191            prefix.contains("function"),
1192            "Context prefix for a function should contain 'function', got: {}",
1193            prefix
1194        );
1195    }
1196
1197    #[test]
1198    fn test_chunk_stream_context_prefix_includes_parent_for_methods() {
1199        let temp_dir = TempDir::new().unwrap();
1200        // Use Python since Tree-sitter reliably detects class methods with parent info
1201        let python_code = r#"
1202class DatabaseConnection:
1203    def execute_query(self, sql_statement):
1204        cursor = self.connection.cursor()
1205        cursor.execute(sql_statement)
1206        return cursor.fetchall()
1207"#;
1208        create_test_file(temp_dir.path(), "src/database.py", python_code);
1209
1210        let settings = EmbedSettings::default();
1211        let limits = ResourceLimits::default();
1212
1213        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
1214        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();
1215
1216        // Find the method chunk
1217        let method_chunk = chunks.iter().find(|c| c.source.symbol == "execute_query");
1218
1219        if let Some(chunk) = method_chunk {
1220            let prefix = chunk.context.context_prefix.as_ref().unwrap();
1221            // If parent was detected, prefix should include "in <parent>"
1222            if chunk.source.parent.is_some() {
1223                assert!(
1224                    prefix.contains("in "),
1225                    "Method with parent should have 'in <parent>' in prefix, got: {}",
1226                    prefix
1227                );
1228                assert!(
1229                    prefix.contains("DatabaseConnection"),
1230                    "Parent should be 'DatabaseConnection', got: {}",
1231                    prefix
1232                );
1233            }
1234            // Keywords should include domain terms from the method
1235            assert!(!chunk.context.keywords.is_empty(), "Method chunk should have keywords");
1236        }
1237        // If the method chunk was not found (parser limitation), the test still passes
1238        // since the class chunk would have been produced instead
1239    }
1240}