// infiniloom_engine/embedding/streaming.rs

//! Streaming API for large repository processing
//!
//! This module provides an iterator-based interface for processing large repositories
//! without loading all chunks into memory simultaneously. This is essential for:
//!
//! - **Large Monorepos**: Repositories with 100K+ files
//! - **CI/CD Pipelines**: Memory-constrained container environments
//! - **Real-time Processing**: Stream chunks directly to vector databases
//!
//! # Quick Start
//!
//! ```rust,ignore
//! use infiniloom_engine::embedding::streaming::ChunkStream;
//!
//! let stream = ChunkStream::new(repo_path, settings, limits)?;
//!
//! // Process chunks as they're generated
//! for chunk_result in stream {
//!     match chunk_result {
//!         Ok(chunk) => {
//!             // Send to vector database, write to file, etc.
//!             upload_to_pinecone(&chunk)?;
//!         }
//!         Err(e) if e.is_skippable() => {
//!             // Non-critical error, continue processing
//!             eprintln!("Warning: {}", e);
//!         }
//!         Err(e) => {
//!             // Critical error, abort
//!             return Err(e.into());
//!         }
//!     }
//! }
//! ```
//!
//! # Batch Processing
//!
//! For better throughput, process chunks in batches:
//!
//! ```rust,ignore
//! use infiniloom_engine::embedding::streaming::BatchIterator;
//!
//! let stream = ChunkStream::new(repo_path, settings, limits)?;
//!
//! for batch in stream.batches(100) {
//!     let chunks: Vec<_> = batch.into_iter().filter_map(|r| r.ok()).collect();
//!     bulk_upload_to_vector_db(&chunks)?;
//! }
//! ```
//!
//! # Memory Guarantees
//!
//! The streaming API bounds memory usage to approximately:
//! - `file_batch_size * avg_chunks_per_file * avg_chunk_size` for buffered chunk data
//! - `O(files_in_current_batch)` for file metadata
//! - `O(symbols_per_file)` for parse state
//!
//! For a typical buffer of 100 chunks averaging 5KB each, chunk data is
//! bounded to ~500KB, regardless of repository size.
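//!
//! # Cancellation
//!
//! A stream can be stopped from another thread via a [`CancellationHandle`].
//! A minimal sketch (error handling elided):
//!
//! ```rust,ignore
//! let stream = ChunkStream::new(repo_path, settings, limits)?;
//! let handle = stream.cancellation_handle();
//!
//! // Hand the handle to another thread; once `cancel()` is called, the
//! // iterator drains its buffer and schedules no further file batches.
//! std::thread::spawn(move || handle.cancel());
//!
//! for chunk_result in stream {
//!     /* ... */
//! }
//! ```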

use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::parser::{parse_file_symbols, Language};
use crate::security::SecurityScanner;
use crate::tokenizer::{TokenModel, Tokenizer};

use super::error::EmbedError;
use super::hasher::hash_content;
use super::limits::ResourceLimits;
use super::types::{
    ChunkContext, ChunkSource, EmbedChunk, EmbedSettings, RepoIdentifier,
};

/// Configuration for streaming chunk generation
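///
/// A sketch of tuning for a memory-constrained environment (the values are
/// illustrative, not recommendations):
///
/// ```rust,ignore
/// let config = StreamConfig {
///     file_batch_size: 10,
///     chunk_buffer_size: 50,
///     skip_on_error: true,
///     max_errors: 20,
///     parallel_batches: false,
/// };
/// let stream = ChunkStream::with_config(repo_path, settings, limits, config)?;
/// ```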
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Number of files to process in each batch
    pub file_batch_size: usize,

    /// Maximum chunks to buffer before yielding
    pub chunk_buffer_size: usize,

    /// Whether to skip files that cause errors
    pub skip_on_error: bool,

    /// Maximum errors before aborting
    pub max_errors: usize,

    /// Enable parallel file processing within batches
    pub parallel_batches: bool,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            file_batch_size: 50,
            chunk_buffer_size: 200,
            skip_on_error: true,
            max_errors: 100,
            parallel_batches: true,
        }
    }
}

/// Statistics for streaming progress
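///
/// A sketch of periodic progress reporting while draining a stream:
///
/// ```rust,ignore
/// let mut stream = ChunkStream::new(repo_path, settings, limits)?;
/// while let Some(result) = stream.next() {
///     let _chunk = result?;
///     let stats = stream.stats();
///     eprintln!("{:.1}% done, {} chunks", stats.progress_percent(), stats.chunks_generated);
/// }
/// ```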
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total files discovered
    pub total_files: usize,

    /// Files processed so far
    pub files_processed: usize,

    /// Files skipped due to errors
    pub files_skipped: usize,

    /// Chunks generated so far
    pub chunks_generated: usize,

    /// Bytes processed so far
    pub bytes_processed: u64,

    /// Errors encountered
    pub error_count: usize,
}

impl StreamStats {
    /// Get progress as a percentage (0.0 - 100.0)
    pub fn progress_percent(&self) -> f64 {
        if self.total_files == 0 {
            return 100.0;
        }
        (self.files_processed as f64 / self.total_files as f64) * 100.0
    }

    /// Estimated chunks remaining (based on current rate)
    pub fn estimated_chunks_remaining(&self) -> usize {
        if self.files_processed == 0 {
            return 0;
        }
        let rate = self.chunks_generated as f64 / self.files_processed as f64;
        let remaining_files = self.total_files.saturating_sub(self.files_processed);
        (remaining_files as f64 * rate) as usize
    }
}

/// Streaming chunk iterator for large repositories
///
/// This iterator yields chunks one at a time as they are generated,
/// without loading the entire repository into memory.
pub struct ChunkStream {
    /// Queued files to process
    pending_files: VecDeque<PathBuf>,

    /// Buffer of generated chunks waiting to be yielded
    chunk_buffer: VecDeque<Result<EmbedChunk, EmbedError>>,

    /// Repository root path
    repo_root: PathBuf,

    /// Embedding settings
    settings: EmbedSettings,

    /// Resource limits
    limits: ResourceLimits,

    /// Stream configuration
    config: StreamConfig,

    /// Tokenizer instance
    tokenizer: Tokenizer,

    /// Security scanner (optional)
    security_scanner: Option<SecurityScanner>,

    /// Repository identifier
    repo_id: RepoIdentifier,

    /// Statistics
    stats: StreamStats,

    /// Cancellation flag
    cancelled: Arc<AtomicBool>,

    /// Error count for early termination
    error_count: AtomicUsize,
}

impl ChunkStream {
    /// Create a new chunk stream for a repository
    pub fn new(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
    ) -> Result<Self, EmbedError> {
        Self::with_config(repo_path, settings, limits, StreamConfig::default())
    }

    /// Create with custom stream configuration
    pub fn with_config(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
        config: StreamConfig,
    ) -> Result<Self, EmbedError> {
        let repo_root = repo_path.as_ref().canonicalize().map_err(|e| {
            EmbedError::IoError {
                path: repo_path.as_ref().to_path_buf(),
                source: e,
            }
        })?;

        if !repo_root.is_dir() {
            return Err(EmbedError::NotADirectory { path: repo_root });
        }

        // Security scanner if enabled
        let security_scanner = if settings.scan_secrets {
            Some(SecurityScanner::new())
        } else {
            None
        };

        let mut stream = Self {
            pending_files: VecDeque::new(),
            chunk_buffer: VecDeque::new(),
            repo_root,
            settings,
            limits,
            config,
            tokenizer: Tokenizer::new(),
            security_scanner,
            repo_id: RepoIdentifier::default(),
            stats: StreamStats::default(),
            cancelled: Arc::new(AtomicBool::new(false)),
            error_count: AtomicUsize::new(0),
        };

        // Discover files
        stream.discover_files()?;

        Ok(stream)
    }

    /// Set the repository identifier for multi-tenant RAG
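    ///
    /// For example (illustrative namespace and name):
    ///
    /// ```rust,ignore
    /// let stream = ChunkStream::new(repo_path, settings, limits)?
    ///     .with_repo_id(RepoIdentifier::new("github.com/acme", "monorepo"));
    /// ```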
    pub fn with_repo_id(mut self, repo_id: RepoIdentifier) -> Self {
        self.repo_id = repo_id;
        self
    }

    /// Get current streaming statistics
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }

    /// Get a cancellation handle for this stream
    pub fn cancellation_handle(&self) -> CancellationHandle {
        CancellationHandle {
            cancelled: Arc::clone(&self.cancelled),
        }
    }

    /// Check if the stream has been cancelled
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Discover all files in the repository
    fn discover_files(&mut self) -> Result<(), EmbedError> {
        use glob::Pattern;
        use ignore::WalkBuilder;

        // Compile include/exclude patterns
        let include_patterns: Vec<Pattern> = self
            .settings
            .include_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let exclude_patterns: Vec<Pattern> = self
            .settings
            .exclude_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let walker = WalkBuilder::new(&self.repo_root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .follow_links(false)
            .build();

        let mut files = Vec::new();

        for entry in walker.flatten() {
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            // Get relative path for pattern matching
            let relative = path
                .strip_prefix(&self.repo_root)
                .unwrap_or(path)
                .to_string_lossy();

            // Check include patterns
            if !include_patterns.is_empty()
                && !include_patterns.iter().any(|p| p.matches(&relative))
            {
                continue;
            }

            // Check exclude patterns
            if exclude_patterns.iter().any(|p| p.matches(&relative)) {
                continue;
            }

            // Check for supported language
            let ext = match path.extension().and_then(|e| e.to_str()) {
                Some(e) => e,
                None => continue,
            };

            if Language::from_extension(ext).is_none() {
                continue;
            }

            // Skip test files if configured
            if !self.settings.include_tests && self.is_test_file(path) {
                continue;
            }

            files.push(path.to_path_buf());
        }

        // Sort for determinism
        files.sort();

        self.stats.total_files = files.len();
        self.pending_files = files.into();

        // Check file limit
        if !self.limits.check_file_count(self.stats.total_files) {
            return Err(EmbedError::TooManyFiles {
                count: self.stats.total_files,
                max: self.limits.max_files,
            });
        }

        Ok(())
    }

    /// Check if a file is a test file
    fn is_test_file(&self, path: &Path) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();

        path_str.contains("/tests/")
            || path_str.contains("\\tests\\")
            || path_str.contains("/test/")
            || path_str.contains("\\test\\")
            || path_str.contains("/__tests__/")
            || path_str.contains("\\__tests__\\")
    }

    /// Process the next batch of files and fill the chunk buffer
    fn fill_buffer(&mut self) -> bool {
        if self.is_cancelled() {
            return false;
        }

        // Take a batch of files
        let batch_size = self.config.file_batch_size.min(self.pending_files.len());
        if batch_size == 0 {
            return false;
        }

        let batch: Vec<_> = (0..batch_size)
            .filter_map(|_| self.pending_files.pop_front())
            .collect();

        // Process files
        for file_path in batch {
            if self.is_cancelled() {
                break;
            }

            match self.process_file(&file_path) {
                Ok(chunks) => {
                    self.stats.files_processed += 1;
                    self.stats.chunks_generated += chunks.len();

                    for chunk in chunks {
                        self.chunk_buffer.push_back(Ok(chunk));
                    }
                }
                Err(e) => {
                    self.stats.error_count += 1;
                    let current_errors = self.error_count.fetch_add(1, Ordering::Relaxed) + 1;

                    if current_errors >= self.config.max_errors {
                        // Too many errors: emit a terminal error and stop
                        self.chunk_buffer.push_back(Err(EmbedError::TooManyErrors {
                            count: current_errors,
                            max: self.config.max_errors,
                        }));
                        self.cancelled.store(true, Ordering::Relaxed);
                        break;
                    }

                    if e.is_skippable() && self.config.skip_on_error {
                        self.stats.files_skipped += 1;
                        // Emit the non-critical error so callers can log it
                        if !e.is_critical() {
                            self.chunk_buffer.push_back(Err(e));
                        }
                    } else {
                        // Critical or non-skippable error: emit and stop
                        self.chunk_buffer.push_back(Err(e));
                        break;
                    }
                }
            }
        }

        !self.chunk_buffer.is_empty() || !self.pending_files.is_empty()
    }

    /// Process a single file and return its chunks
    fn process_file(&mut self, path: &Path) -> Result<Vec<EmbedChunk>, EmbedError> {
        // Validate file size
        let metadata = std::fs::metadata(path).map_err(|e| EmbedError::IoError {
            path: path.to_path_buf(),
            source: e,
        })?;

        if !self.limits.check_file_size(metadata.len()) {
            return Err(EmbedError::FileTooLarge {
                path: path.to_path_buf(),
                size: metadata.len(),
                max: self.limits.max_file_size,
            });
        }

        // Read file
        let mut content = std::fs::read_to_string(path).map_err(|e| EmbedError::IoError {
            path: path.to_path_buf(),
            source: e,
        })?;

        self.stats.bytes_processed += content.len() as u64;

        // Check for long lines (minified files)
        if let Some(max_line_len) = content.lines().map(|l| l.len()).max() {
            if !self.limits.check_line_length(max_line_len) {
                return Err(EmbedError::LineTooLong {
                    path: path.to_path_buf(),
                    length: max_line_len,
                    max: self.limits.max_line_length,
                });
            }
        }

        // Security scanning
        let relative_path = self.safe_relative_path(path)?;

        if let Some(ref scanner) = self.security_scanner {
            let findings = scanner.scan(&content, &relative_path);
            if !findings.is_empty() {
                if self.settings.fail_on_secrets {
                    let files = findings
                        .iter()
                        .map(|f| format!("  {}:{} - {}", f.file, f.line, f.kind.name()))
                        .collect::<Vec<_>>()
                        .join("\n");
                    return Err(EmbedError::SecretsDetected {
                        count: findings.len(),
                        files,
                    });
                }

                if self.settings.redact_secrets {
                    content = scanner.redact_content(&content, &relative_path);
                }
            }
        }

        // Parse symbols
        let language = self.detect_language(path);
        let mut symbols = parse_file_symbols(&content, path);
        symbols.sort_by(|a, b| {
            a.start_line
                .cmp(&b.start_line)
                .then_with(|| a.end_line.cmp(&b.end_line))
                .then_with(|| a.name.cmp(&b.name))
        });

        let lines: Vec<&str> = content.lines().collect();
        let mut chunks = Vec::with_capacity(symbols.len());

        for symbol in &symbols {
            // Skip imports if configured
            if !self.settings.include_imports
                && matches!(symbol.kind, crate::types::SymbolKind::Import)
            {
                continue;
            }

            // Extract content with context
            let start_line = symbol.start_line.saturating_sub(1) as usize;
            let end_line = (symbol.end_line as usize).min(lines.len());
            let context_start =
                start_line.saturating_sub(self.settings.context_lines as usize);
            let context_end =
                (end_line + self.settings.context_lines as usize).min(lines.len());

            let chunk_content = lines[context_start..context_end].join("\n");

            // Count tokens
            let token_model = TokenModel::from_model_name(&self.settings.token_model)
                .unwrap_or(TokenModel::Claude);
            let tokens = self.tokenizer.count(&chunk_content, token_model);

            // Generate hash
            let hash = hash_content(&chunk_content);

            // Build FQN
            let fqn = self.compute_fqn(&relative_path, symbol);

            chunks.push(EmbedChunk {
                id: hash.short_id,
                full_hash: hash.full_hash,
                content: chunk_content,
                tokens,
                kind: symbol.kind.into(),
                source: ChunkSource {
                    repo: self.repo_id.clone(),
                    file: relative_path.clone(),
                    lines: ((context_start + 1) as u32, context_end as u32),
                    symbol: symbol.name.clone(),
                    fqn: Some(fqn),
                    language: language.clone(),
                    parent: symbol.parent.clone(),
                    visibility: symbol.visibility.into(),
                    is_test: self.is_test_code(path, symbol),
                },
                context: ChunkContext {
                    docstring: symbol.docstring.clone(),
                    comments: Vec::new(),
                    signature: symbol.signature.clone(),
                    calls: symbol.calls.clone(),
                    called_by: Vec::new(),
                    imports: Vec::new(),
                    tags: Vec::new(),
                    lines_of_code: 0,
                    max_nesting_depth: 0,
                },
                part: None,
            });
        }

        Ok(chunks)
    }

    /// Get safe relative path
    fn safe_relative_path(&self, path: &Path) -> Result<String, EmbedError> {
        let canonical = path.canonicalize().map_err(|e| EmbedError::IoError {
            path: path.to_path_buf(),
            source: e,
        })?;

        if !canonical.starts_with(&self.repo_root) {
            return Err(EmbedError::PathTraversal {
                path: canonical,
                repo_root: self.repo_root.clone(),
            });
        }

        Ok(canonical
            .strip_prefix(&self.repo_root)
            .unwrap_or(&canonical)
            .to_string_lossy()
            .replace('\\', "/"))
    }

    /// Detect language from file path
    fn detect_language(&self, path: &Path) -> String {
        path.extension()
            .and_then(|e| e.to_str())
            .and_then(Language::from_extension)
            .map(|l| l.display_name().to_string())
            .unwrap_or_else(|| "unknown".to_string())
    }

    /// Compute fully qualified name
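    ///
    /// E.g. (illustrative): `src/parser/mod.rs` with parent `Parser` and
    /// symbol `parse` becomes `src::parser::mod::Parser::parse`.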
    fn compute_fqn(&self, file: &str, symbol: &crate::types::Symbol) -> String {
        let module_path = file
            .strip_suffix(".rs")
            .or_else(|| file.strip_suffix(".py"))
            .or_else(|| file.strip_suffix(".ts"))
            .or_else(|| file.strip_suffix(".tsx"))
            .or_else(|| file.strip_suffix(".js"))
            .or_else(|| file.strip_suffix(".jsx"))
            .or_else(|| file.strip_suffix(".go"))
            .unwrap_or(file)
            .replace('\\', "::")
            .replace('/', "::");

        if let Some(ref parent) = symbol.parent {
            format!("{}::{}::{}", module_path, parent, symbol.name)
        } else {
            format!("{}::{}", module_path, symbol.name)
        }
    }

    /// Check if code is test code
    fn is_test_code(&self, path: &Path, symbol: &crate::types::Symbol) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();
        let name = symbol.name.to_lowercase();

        path_str.contains("test")
            || path_str.contains("spec")
            || name.starts_with("test_")
            || name.ends_with("_test")
    }

    /// Collect all remaining chunks into a vector (for compatibility)
    ///
    /// Note: This defeats the purpose of streaming by loading everything into memory.
    /// Use only when you need to sort or deduplicate the full result set.
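    ///
    /// For example:
    ///
    /// ```rust,ignore
    /// let chunks = ChunkStream::new(repo_path, settings, limits)?.collect_all()?;
    /// ```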
    pub fn collect_all(self) -> Result<Vec<EmbedChunk>, EmbedError> {
        let mut chunks = Vec::new();
        let mut last_error = None;

        for result in self {
            match result {
                Ok(chunk) => chunks.push(chunk),
                Err(e) if e.is_skippable() => {
                    // Non-critical, skip
                }
                Err(e) => {
                    last_error = Some(e);
                }
            }
        }

        if let Some(e) = last_error {
            if chunks.is_empty() {
                return Err(e);
            }
        }

        // Sort for determinism (matches EmbedChunker behavior)
        chunks.sort_by(|a, b| {
            a.source
                .file
                .cmp(&b.source.file)
                .then_with(|| a.source.lines.0.cmp(&b.source.lines.0))
                .then_with(|| a.source.lines.1.cmp(&b.source.lines.1))
                .then_with(|| a.source.symbol.cmp(&b.source.symbol))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(chunks)
    }
}

impl Iterator for ChunkStream {
    type Item = Result<EmbedChunk, EmbedError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Return a buffered chunk if available
            if let Some(chunk) = self.chunk_buffer.pop_front() {
                return Some(chunk);
            }

            // A batch can legitimately produce zero chunks (e.g. files with
            // no symbols), so keep filling until something is buffered or
            // the stream is exhausted/cancelled.
            if !self.fill_buffer() {
                return None;
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.stats.estimated_chunks_remaining();
        let buffered = self.chunk_buffer.len();
        (buffered, Some(buffered + remaining))
    }
}

/// Handle for cancelling a chunk stream from another thread
#[derive(Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Cancel the associated stream
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }

    /// Check if cancellation has been requested
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }
}

/// Extension trait for batch processing
pub trait BatchIterator: Iterator {
    /// Process items in batches
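    ///
    /// Yields `Vec`s of at most `batch_size` items; the final batch may be
    /// shorter. For example:
    ///
    /// ```rust,ignore
    /// for batch in stream.batches(100) {
    ///     // handle up to 100 results at once
    /// }
    /// ```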
    fn batches(self, batch_size: usize) -> Batches<Self>
    where
        Self: Sized,
    {
        Batches {
            iter: self,
            batch_size,
        }
    }
}

impl<I: Iterator> BatchIterator for I {}

/// Iterator adapter that yields batches
pub struct Batches<I> {
    iter: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut batch = Vec::with_capacity(self.batch_size);

        for _ in 0..self.batch_size {
            match self.iter.next() {
                Some(item) => batch.push(item),
                None => break,
            }
        }

        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_test_file(dir: &Path, name: &str, content: &str) {
        let path = dir.join(name);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, content).unwrap();
    }

    #[test]
    fn test_chunk_stream_basic() {
        let temp_dir = TempDir::new().unwrap();
        let rust_code = r#"
/// A test function
fn hello() {
    println!("Hello, world!");
}

fn goodbye() {
    println!("Goodbye!");
}
"#;
        create_test_file(temp_dir.path(), "test.rs", rust_code);

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_stats() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "a.rs", "fn foo() {}");
        create_test_file(temp_dir.path(), "b.rs", "fn bar() {}");
        create_test_file(temp_dir.path(), "c.rs", "fn baz() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        assert_eq!(stream.stats().total_files, 3);

        // Consume the stream
        let _chunks: Vec<_> = stream.collect();
    }

    #[test]
    fn test_cancellation() {
        let temp_dir = TempDir::new().unwrap();
        for i in 0..10 {
            create_test_file(
                temp_dir.path(),
                &format!("file{}.rs", i),
                &format!("fn func{}() {{}}", i),
            );
        }

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let mut stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let handle = stream.cancellation_handle();

        // Get a few chunks
        let _ = stream.next();
        let _ = stream.next();

        // Cancel
        handle.cancel();

        // Stream should stop
        assert!(stream.is_cancelled());
    }

    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..10).collect();
        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();

        assert_eq!(batches.len(), 4);
        assert_eq!(batches[0], vec![0, 1, 2]);
        assert_eq!(batches[1], vec![3, 4, 5]);
        assert_eq!(batches[2], vec![6, 7, 8]);
        assert_eq!(batches[3], vec![9]);
    }

    #[test]
    fn test_collect_all_sorts_deterministically() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "z.rs", "fn z_func() {}");
        create_test_file(temp_dir.path(), "a.rs", "fn a_func() {}");
        create_test_file(temp_dir.path(), "m.rs", "fn m_func() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks = stream.collect_all().unwrap();

        // Should be sorted by file path
        assert!(chunks[0].source.file < chunks[1].source.file);
        assert!(chunks[1].source.file < chunks[2].source.file);
    }

    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            file_batch_size: 10,
            chunk_buffer_size: 50,
            skip_on_error: true,
            max_errors: 5,
            parallel_batches: false,
        };

        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream =
            ChunkStream::with_config(temp_dir.path(), settings, limits, config).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_with_repo_id() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let repo_id = RepoIdentifier::new("github.com/test", "my-repo");

        let stream = ChunkStream::new(temp_dir.path(), settings, limits)
            .unwrap()
            .with_repo_id(repo_id);

        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
        assert_eq!(chunks[0].source.repo.namespace, "github.com/test");
        assert_eq!(chunks[0].source.repo.name, "my-repo");
    }
}