infiniloom_engine/embedding/
streaming.rs

1//! Streaming API for large repository processing
2//!
3//! This module provides an iterator-based interface for processing large repositories
4//! without loading all chunks into memory simultaneously. This is essential for:
5//!
6//! - **Large Monorepos**: Repositories with 100K+ files
7//! - **CI/CD Pipelines**: Memory-constrained container environments
8//! - **Real-time Processing**: Stream chunks directly to vector databases
9//!
10//! # Quick Start
11//!
12//! ```rust,ignore
13//! use infiniloom_engine::embedding::streaming::{ChunkStream, StreamConfig};
14//!
15//! let stream = ChunkStream::new(repo_path, settings, limits)?;
16//!
17//! // Process chunks as they're generated
18//! for chunk_result in stream {
19//!     match chunk_result {
20//!         Ok(chunk) => {
21//!             // Send to vector database, write to file, etc.
22//!             upload_to_pinecone(&chunk)?;
23//!         }
24//!         Err(e) if e.is_skippable() => {
25//!             // Non-critical error, continue processing
26//!             eprintln!("Warning: {}", e);
27//!         }
28//!         Err(e) => {
29//!             // Critical error, abort
30//!             return Err(e.into());
31//!         }
32//!     }
33//! }
34//! ```
35//!
36//! # Batch Processing
37//!
38//! For better throughput, process chunks in batches:
39//!
//! ```rust,ignore
//! use infiniloom_engine::embedding::streaming::{BatchIterator, ChunkStream};
//!
//! let stream = ChunkStream::new(repo_path, settings, limits)?;
//!
//! // `batches` comes from the `BatchIterator` extension trait and takes the batch size.
//! for batch in stream.batches(100) {
//!     let chunks: Vec<_> = batch.into_iter().filter_map(|r| r.ok()).collect();
//!     bulk_upload_to_vector_db(&chunks)?;
//! }
//! ```
49//!
50//! # Memory Guarantees
51//!
//! The streaming API bounds memory usage to approximately:
//! - `chunks_in_current_file_batch * avg_chunk_size` for chunk data (the buffer is
//!   refilled one batch of `StreamConfig::file_batch_size` files at a time)
//! - `O(files_in_current_batch)` for file metadata
//! - `O(symbols_per_file)` for parse state
//!
//! For example, a file batch that yields 100 chunks with an avg_chunk_size of 5KB keeps
//! ~500KB of chunk data buffered, regardless of repository size.
59
60use std::collections::VecDeque;
61use std::path::{Path, PathBuf};
62use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
63use std::sync::Arc;
64
65use crate::parser::{parse_file_symbols, Language};
66use crate::security::SecurityScanner;
67use crate::tokenizer::{TokenModel, Tokenizer};
68
69use super::error::EmbedError;
70use super::hasher::hash_content;
71use super::limits::ResourceLimits;
72use super::types::{ChunkContext, ChunkSource, EmbedChunk, EmbedSettings, RepoIdentifier};
73
/// Tunable knobs that control how a `ChunkStream` batches work and reacts to errors.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// How many files are taken from the pending queue on each buffer refill.
    pub file_batch_size: usize,

    /// Maximum chunks to buffer before yielding
    /// (not read anywhere in this module at present).
    pub chunk_buffer_size: usize,

    /// When `true`, files that fail with a skippable error are skipped.
    pub skip_on_error: bool,

    /// Error-count threshold at which the stream gives up.
    pub max_errors: usize,

    /// Enable parallel file processing within batches
    /// (not read anywhere in this module at present).
    pub parallel_batches: bool,
}

impl Default for StreamConfig {
    /// Defaults: 50 files per batch, a 200-chunk buffer, skip-on-error enabled,
    /// at most 100 errors, and parallel batches switched on.
    fn default() -> Self {
        let file_batch_size = 50;
        let chunk_buffer_size = 200;
        let max_errors = 100;

        StreamConfig {
            file_batch_size,
            chunk_buffer_size,
            skip_on_error: true,
            max_errors,
            parallel_batches: true,
        }
    }
}
104
/// Running counters describing how far a stream has progressed.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of files found during discovery.
    pub total_files: usize,

    /// Number of files that have been processed successfully so far.
    pub files_processed: usize,

    /// Number of files skipped because processing them failed.
    pub files_skipped: usize,

    /// Number of chunks produced so far.
    pub chunks_generated: usize,

    /// Number of content bytes read so far.
    pub bytes_processed: u64,

    /// Number of errors seen so far.
    pub error_count: usize,
}

impl StreamStats {
    /// Share of discovered files already processed, in the range 0.0 - 100.0.
    ///
    /// Reports 100.0 when no files were discovered at all.
    pub fn progress_percent(&self) -> f64 {
        match self.total_files {
            0 => 100.0,
            total => (self.files_processed as f64 / total as f64) * 100.0,
        }
    }

    /// Projected number of chunks still to come, extrapolated from the
    /// chunks-per-file average observed so far.
    ///
    /// Returns 0 until at least one file has been processed.
    pub fn estimated_chunks_remaining(&self) -> usize {
        let processed = self.files_processed;
        if processed == 0 {
            return 0;
        }

        let chunks_per_file = self.chunks_generated as f64 / processed as f64;
        let files_left = self.total_files.saturating_sub(processed);
        (files_left as f64 * chunks_per_file) as usize
    }
}
146
147/// Streaming chunk iterator for large repositories
148///
149/// This iterator yields chunks one at a time as they are generated,
150/// without loading the entire repository into memory.
151pub struct ChunkStream {
152    /// Queued files to process
153    pending_files: VecDeque<PathBuf>,
154
155    /// Buffer of generated chunks waiting to be yielded
156    chunk_buffer: VecDeque<Result<EmbedChunk, EmbedError>>,
157
158    /// Repository root path
159    repo_root: PathBuf,
160
161    /// Embedding settings
162    settings: EmbedSettings,
163
164    /// Resource limits
165    limits: ResourceLimits,
166
167    /// Stream configuration
168    config: StreamConfig,
169
170    /// Tokenizer instance
171    tokenizer: Tokenizer,
172
173    /// Security scanner (optional)
174    security_scanner: Option<SecurityScanner>,
175
176    /// Repository identifier
177    repo_id: RepoIdentifier,
178
179    /// Statistics
180    stats: StreamStats,
181
182    /// Cancellation flag
183    cancelled: Arc<AtomicBool>,
184
185    /// Error count for early termination
186    error_count: AtomicUsize,
187}
188
189impl ChunkStream {
190    /// Create a new chunk stream for a repository
191    pub fn new(
192        repo_path: impl AsRef<Path>,
193        settings: EmbedSettings,
194        limits: ResourceLimits,
195    ) -> Result<Self, EmbedError> {
196        Self::with_config(repo_path, settings, limits, StreamConfig::default())
197    }
198
199    /// Create with custom stream configuration
200    pub fn with_config(
201        repo_path: impl AsRef<Path>,
202        settings: EmbedSettings,
203        limits: ResourceLimits,
204        config: StreamConfig,
205    ) -> Result<Self, EmbedError> {
206        let repo_root = repo_path
207            .as_ref()
208            .canonicalize()
209            .map_err(|e| EmbedError::IoError {
210                path: repo_path.as_ref().to_path_buf(),
211                source: e,
212            })?;
213
214        if !repo_root.is_dir() {
215            return Err(EmbedError::NotADirectory { path: repo_root });
216        }
217
218        // Security scanner if enabled
219        let security_scanner = if settings.scan_secrets {
220            Some(SecurityScanner::new())
221        } else {
222            None
223        };
224
225        let mut stream = Self {
226            pending_files: VecDeque::new(),
227            chunk_buffer: VecDeque::new(),
228            repo_root,
229            settings,
230            limits,
231            config,
232            tokenizer: Tokenizer::new(),
233            security_scanner,
234            repo_id: RepoIdentifier::default(),
235            stats: StreamStats::default(),
236            cancelled: Arc::new(AtomicBool::new(false)),
237            error_count: AtomicUsize::new(0),
238        };
239
240        // Discover files
241        stream.discover_files()?;
242
243        Ok(stream)
244    }
245
246    /// Set the repository identifier for multi-tenant RAG
247    pub fn with_repo_id(mut self, repo_id: RepoIdentifier) -> Self {
248        self.repo_id = repo_id;
249        self
250    }
251
252    /// Get current streaming statistics
253    pub fn stats(&self) -> &StreamStats {
254        &self.stats
255    }
256
257    /// Get a cancellation handle for this stream
258    pub fn cancellation_handle(&self) -> CancellationHandle {
259        CancellationHandle { cancelled: Arc::clone(&self.cancelled) }
260    }
261
262    /// Check if the stream has been cancelled
263    pub fn is_cancelled(&self) -> bool {
264        self.cancelled.load(Ordering::Relaxed)
265    }
266
267    /// Discover all files in the repository
268    fn discover_files(&mut self) -> Result<(), EmbedError> {
269        use glob::Pattern;
270        use ignore::WalkBuilder;
271
272        // Compile include/exclude patterns
273        let include_patterns: Vec<Pattern> = self
274            .settings
275            .include_patterns
276            .iter()
277            .filter_map(|p| Pattern::new(p).ok())
278            .collect();
279
280        let exclude_patterns: Vec<Pattern> = self
281            .settings
282            .exclude_patterns
283            .iter()
284            .filter_map(|p| Pattern::new(p).ok())
285            .collect();
286
287        let walker = WalkBuilder::new(&self.repo_root)
288            .hidden(false)
289            .git_ignore(true)
290            .git_global(true)
291            .git_exclude(true)
292            .follow_links(false)
293            .build();
294
295        let mut files = Vec::new();
296
297        for entry in walker.flatten() {
298            let path = entry.path();
299
300            if !path.is_file() {
301                continue;
302            }
303
304            // Get relative path for pattern matching
305            let relative = path
306                .strip_prefix(&self.repo_root)
307                .unwrap_or(path)
308                .to_string_lossy();
309
310            // Check include patterns
311            if !include_patterns.is_empty()
312                && !include_patterns.iter().any(|p| p.matches(&relative))
313            {
314                continue;
315            }
316
317            // Check exclude patterns
318            if exclude_patterns.iter().any(|p| p.matches(&relative)) {
319                continue;
320            }
321
322            // Check for supported language
323            let ext = match path.extension().and_then(|e| e.to_str()) {
324                Some(e) => e,
325                None => continue,
326            };
327
328            if Language::from_extension(ext).is_none() {
329                continue;
330            }
331
332            // Skip test files if configured
333            if !self.settings.include_tests && self.is_test_file(path) {
334                continue;
335            }
336
337            files.push(path.to_path_buf());
338        }
339
340        // Sort for determinism
341        files.sort();
342
343        self.stats.total_files = files.len();
344        self.pending_files = files.into();
345
346        // Check file limit
347        if !self.limits.check_file_count(self.stats.total_files) {
348            return Err(EmbedError::TooManyFiles {
349                count: self.stats.total_files,
350                max: self.limits.max_files,
351            });
352        }
353
354        Ok(())
355    }
356
357    /// Check if a file is a test file
358    fn is_test_file(&self, path: &Path) -> bool {
359        let path_str = path.to_string_lossy().to_lowercase();
360
361        path_str.contains("/tests/")
362            || path_str.contains("\\tests\\")
363            || path_str.contains("/test/")
364            || path_str.contains("\\test\\")
365            || path_str.contains("/__tests__/")
366            || path_str.contains("\\__tests__\\")
367    }
368
369    /// Process the next batch of files and fill the chunk buffer
370    fn fill_buffer(&mut self) -> bool {
371        if self.is_cancelled() {
372            return false;
373        }
374
375        // Take a batch of files
376        let batch_size = self.config.file_batch_size.min(self.pending_files.len());
377        if batch_size == 0 {
378            return false;
379        }
380
381        let batch: Vec<_> = (0..batch_size)
382            .filter_map(|_| self.pending_files.pop_front())
383            .collect();
384
385        // Process files
386        for file_path in batch {
387            if self.is_cancelled() {
388                break;
389            }
390
391            match self.process_file(&file_path) {
392                Ok(chunks) => {
393                    self.stats.files_processed += 1;
394                    self.stats.chunks_generated += chunks.len();
395
396                    for chunk in chunks {
397                        self.chunk_buffer.push_back(Ok(chunk));
398                    }
399                },
400                Err(e) => {
401                    self.stats.error_count += 1;
402                    let current_errors = self.error_count.fetch_add(1, Ordering::Relaxed) + 1;
403
404                    if e.is_skippable() && self.config.skip_on_error {
405                        self.stats.files_skipped += 1;
406                        // Optionally emit the error for logging
407                        if !e.is_critical() {
408                            self.chunk_buffer.push_back(Err(e));
409                        }
410                    } else if current_errors >= self.config.max_errors {
411                        // Too many errors, emit and stop
412                        self.chunk_buffer.push_back(Err(EmbedError::TooManyErrors {
413                            count: current_errors,
414                            max: self.config.max_errors,
415                        }));
416                        self.cancelled.store(true, Ordering::Relaxed);
417                        break;
418                    } else if e.is_critical() {
419                        self.chunk_buffer.push_back(Err(e));
420                        break;
421                    }
422                },
423            }
424        }
425
426        !self.chunk_buffer.is_empty() || !self.pending_files.is_empty()
427    }
428
429    /// Process a single file and return its chunks
430    fn process_file(&mut self, path: &Path) -> Result<Vec<EmbedChunk>, EmbedError> {
431        // Validate file size
432        let metadata = std::fs::metadata(path)
433            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
434
435        if !self.limits.check_file_size(metadata.len()) {
436            return Err(EmbedError::FileTooLarge {
437                path: path.to_path_buf(),
438                size: metadata.len(),
439                max: self.limits.max_file_size,
440            });
441        }
442
443        // Read file
444        let mut content = std::fs::read_to_string(path)
445            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
446
447        self.stats.bytes_processed += content.len() as u64;
448
449        // Check for long lines (minified files)
450        if let Some(max_line_len) = content.lines().map(|l| l.len()).max() {
451            if !self.limits.check_line_length(max_line_len) {
452                return Err(EmbedError::LineTooLong {
453                    path: path.to_path_buf(),
454                    length: max_line_len,
455                    max: self.limits.max_line_length,
456                });
457            }
458        }
459
460        // Security scanning
461        let relative_path = self.safe_relative_path(path)?;
462
463        if let Some(ref scanner) = self.security_scanner {
464            let findings = scanner.scan(&content, &relative_path);
465            if !findings.is_empty() {
466                if self.settings.fail_on_secrets {
467                    let files = findings
468                        .iter()
469                        .map(|f| format!("  {}:{} - {}", f.file, f.line, f.kind.name()))
470                        .collect::<Vec<_>>()
471                        .join("\n");
472                    return Err(EmbedError::SecretsDetected { count: findings.len(), files });
473                }
474
475                if self.settings.redact_secrets {
476                    content = scanner.redact_content(&content, &relative_path);
477                }
478            }
479        }
480
481        // Parse symbols
482        let language = self.detect_language(path);
483        let mut symbols = parse_file_symbols(&content, path);
484        symbols.sort_by(|a, b| {
485            a.start_line
486                .cmp(&b.start_line)
487                .then_with(|| a.end_line.cmp(&b.end_line))
488                .then_with(|| a.name.cmp(&b.name))
489        });
490
491        let lines: Vec<&str> = content.lines().collect();
492        let mut chunks = Vec::with_capacity(symbols.len());
493
494        for symbol in &symbols {
495            // Skip imports if configured
496            if !self.settings.include_imports
497                && matches!(symbol.kind, crate::types::SymbolKind::Import)
498            {
499                continue;
500            }
501
502            // Extract content with context
503            let start_line = symbol.start_line.saturating_sub(1) as usize;
504            let end_line = (symbol.end_line as usize).min(lines.len());
505            let context_start = start_line.saturating_sub(self.settings.context_lines as usize);
506            let context_end = (end_line + self.settings.context_lines as usize).min(lines.len());
507
508            let chunk_content = lines[context_start..context_end].join("\n");
509
510            // Count tokens
511            let token_model = TokenModel::from_model_name(&self.settings.token_model)
512                .unwrap_or(TokenModel::Claude);
513            let tokens = self.tokenizer.count(&chunk_content, token_model);
514
515            // Generate hash
516            let hash = hash_content(&chunk_content);
517
518            // Build FQN
519            let fqn = self.compute_fqn(&relative_path, symbol);
520
521            chunks.push(EmbedChunk {
522                id: hash.short_id,
523                full_hash: hash.full_hash,
524                content: chunk_content,
525                tokens,
526                kind: symbol.kind.into(),
527                source: ChunkSource {
528                    repo: self.repo_id.clone(),
529                    file: relative_path.clone(),
530                    lines: ((context_start + 1) as u32, context_end as u32),
531                    symbol: symbol.name.clone(),
532                    fqn: Some(fqn),
533                    language: language.clone(),
534                    parent: symbol.parent.clone(),
535                    visibility: symbol.visibility.into(),
536                    is_test: self.is_test_code(path, symbol),
537                },
538                context: ChunkContext {
539                    docstring: symbol.docstring.clone(),
540                    comments: Vec::new(),
541                    signature: symbol.signature.clone(),
542                    calls: symbol.calls.clone(),
543                    called_by: Vec::new(),
544                    imports: Vec::new(),
545                    tags: Vec::new(),
546                    lines_of_code: 0,
547                    max_nesting_depth: 0,
548                },
549                part: None,
550            });
551        }
552
553        Ok(chunks)
554    }
555
556    /// Get safe relative path
557    fn safe_relative_path(&self, path: &Path) -> Result<String, EmbedError> {
558        let canonical = path
559            .canonicalize()
560            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;
561
562        if !canonical.starts_with(&self.repo_root) {
563            return Err(EmbedError::PathTraversal {
564                path: canonical,
565                repo_root: self.repo_root.clone(),
566            });
567        }
568
569        Ok(canonical
570            .strip_prefix(&self.repo_root)
571            .unwrap_or(&canonical)
572            .to_string_lossy()
573            .replace('\\', "/"))
574    }
575
576    /// Detect language from file path
577    fn detect_language(&self, path: &Path) -> String {
578        path.extension()
579            .and_then(|e| e.to_str())
580            .and_then(Language::from_extension)
581            .map_or_else(|| "unknown".to_owned(), |l| l.display_name().to_owned())
582    }
583
584    /// Compute fully qualified name
585    fn compute_fqn(&self, file: &str, symbol: &crate::types::Symbol) -> String {
586        let module_path = file
587            .strip_suffix(".rs")
588            .or_else(|| file.strip_suffix(".py"))
589            .or_else(|| file.strip_suffix(".ts"))
590            .or_else(|| file.strip_suffix(".tsx"))
591            .or_else(|| file.strip_suffix(".js"))
592            .or_else(|| file.strip_suffix(".jsx"))
593            .or_else(|| file.strip_suffix(".go"))
594            .unwrap_or(file)
595            .replace(['\\', '/'], "::"); // Normalize path separators
596
597        if let Some(ref parent) = symbol.parent {
598            format!("{}::{}::{}", module_path, parent, symbol.name)
599        } else {
600            format!("{}::{}", module_path, symbol.name)
601        }
602    }
603
604    /// Check if code is test code
605    fn is_test_code(&self, path: &Path, symbol: &crate::types::Symbol) -> bool {
606        let path_str = path.to_string_lossy().to_lowercase();
607        let name = symbol.name.to_lowercase();
608
609        path_str.contains("test")
610            || path_str.contains("spec")
611            || name.starts_with("test_")
612            || name.ends_with("_test")
613    }
614
615    /// Collect all remaining chunks into a vector (for compatibility)
616    ///
617    /// Note: This defeats the purpose of streaming by loading everything into memory.
618    /// Use only when you need to sort or deduplicate the full result set.
619    pub fn collect_all(self) -> Result<Vec<EmbedChunk>, EmbedError> {
620        let mut chunks = Vec::new();
621        let mut last_error = None;
622
623        for result in self {
624            match result {
625                Ok(chunk) => chunks.push(chunk),
626                Err(e) if e.is_skippable() => {
627                    // Non-critical, skip
628                },
629                Err(e) => {
630                    last_error = Some(e);
631                },
632            }
633        }
634
635        if let Some(e) = last_error {
636            if chunks.is_empty() {
637                return Err(e);
638            }
639        }
640
641        // Sort for determinism (matches EmbedChunker behavior)
642        chunks.sort_by(|a, b| {
643            a.source
644                .file
645                .cmp(&b.source.file)
646                .then_with(|| a.source.lines.0.cmp(&b.source.lines.0))
647                .then_with(|| a.source.lines.1.cmp(&b.source.lines.1))
648                .then_with(|| a.source.symbol.cmp(&b.source.symbol))
649                .then_with(|| a.id.cmp(&b.id))
650        });
651
652        Ok(chunks)
653    }
654}
655
656impl Iterator for ChunkStream {
657    type Item = Result<EmbedChunk, EmbedError>;
658
659    fn next(&mut self) -> Option<Self::Item> {
660        // Return buffered chunk if available
661        if let Some(chunk) = self.chunk_buffer.pop_front() {
662            return Some(chunk);
663        }
664
665        // Try to fill buffer
666        if self.fill_buffer() {
667            self.chunk_buffer.pop_front()
668        } else {
669            None
670        }
671    }
672
673    fn size_hint(&self) -> (usize, Option<usize>) {
674        let remaining = self.stats.estimated_chunks_remaining();
675        let buffered = self.chunk_buffer.len();
676        (buffered, Some(buffered + remaining))
677    }
678}
679
/// Cloneable handle that can request cancellation of a chunk stream,
/// including from another thread.
#[derive(Clone)]
pub struct CancellationHandle {
    /// Flag shared with the stream that created this handle.
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Request cancellation by setting the shared flag.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Report whether the shared flag has been set.
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Relaxed)
    }
}
697
/// Extension trait that adds batching to every iterator.
pub trait BatchIterator: Iterator {
    /// Group the items of this iterator into vectors of at most `batch_size`
    /// elements; the last batch may be shorter.
    ///
    /// A `batch_size` of 0 produces an iterator that yields nothing and does not
    /// advance the underlying iterator.
    fn batches(self, batch_size: usize) -> Batches<Self>
    where
        Self: Sized,
    {
        Batches { iter: self, batch_size }
    }
}

impl<I: Iterator> BatchIterator for I {}

/// Iterator adapter returned by [`BatchIterator::batches`].
pub struct Batches<I> {
    /// Underlying iterator that items are pulled from.
    iter: I,
    /// Maximum number of items per yielded batch.
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    /// Pull up to `batch_size` items from the underlying iterator.
    ///
    /// Returns `None` once no item could be pulled. The batch grows from the
    /// iterator's own size hint instead of reserving `batch_size` slots up
    /// front, so a very large batch size (such as `usize::MAX`, meaning
    /// "everything in one batch") does not trigger a capacity-overflow panic.
    fn next(&mut self) -> Option<Self::Item> {
        let batch: Vec<I::Item> = self.iter.by_ref().take(self.batch_size).collect();

        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}
737
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Write `content` to `dir/name`, creating parent directories as needed.
    fn create_test_file(dir: &Path, name: &str, content: &str) {
        let target = dir.join(name);
        if let Some(parent) = target.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(target, content).unwrap();
    }

    /// Open a stream over `dir` with default settings and limits.
    fn default_stream(dir: &TempDir) -> ChunkStream {
        ChunkStream::new(dir.path(), EmbedSettings::default(), ResourceLimits::default()).unwrap()
    }

    /// A file with two functions produces at least one chunk.
    #[test]
    fn test_chunk_stream_basic() {
        let temp_dir = TempDir::new().unwrap();
        let rust_code = r#"
/// A test function
fn hello() {
    println!("Hello, world!");
}

fn goodbye() {
    println!("Goodbye!");
}
"#;
        create_test_file(temp_dir.path(), "test.rs", rust_code);

        let chunks: Vec<_> = default_stream(&temp_dir).filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    /// Discovery counts every supported file before any chunk is pulled.
    #[test]
    fn test_stream_stats() {
        let temp_dir = TempDir::new().unwrap();
        for (name, body) in [
            ("a.rs", "fn foo() {}"),
            ("b.rs", "fn bar() {}"),
            ("c.rs", "fn baz() {}"),
        ] {
            create_test_file(temp_dir.path(), name, body);
        }

        let stream = default_stream(&temp_dir);

        assert_eq!(stream.stats().total_files, 3);

        // Consume the stream
        let _chunks: Vec<_> = stream.collect();
    }

    /// Cancelling through a handle is visible on the stream itself.
    #[test]
    fn test_cancellation() {
        let temp_dir = TempDir::new().unwrap();
        for i in 0..10 {
            let name = format!("file{}.rs", i);
            let body = format!("fn func{}() {{}}", i);
            create_test_file(temp_dir.path(), &name, &body);
        }

        let mut stream = default_stream(&temp_dir);
        let handle = stream.cancellation_handle();

        // Pull a couple of items before cancelling.
        let _ = stream.next();
        let _ = stream.next();

        handle.cancel();

        assert!(stream.is_cancelled());
    }

    /// Ten items in batches of three give three full batches and one remainder.
    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..10).collect();
        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();

        assert_eq!(batches.len(), 4);
        assert_eq!(batches[0], vec![0, 1, 2]);
        assert_eq!(batches[1], vec![3, 4, 5]);
        assert_eq!(batches[2], vec![6, 7, 8]);
        assert_eq!(batches[3], vec![9]);
    }

    /// `collect_all` orders chunks by file path regardless of creation order.
    #[test]
    fn test_collect_all_sorts_deterministically() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "z.rs", "fn z_func() {}");
        create_test_file(temp_dir.path(), "a.rs", "fn a_func() {}");
        create_test_file(temp_dir.path(), "m.rs", "fn m_func() {}");

        let chunks = default_stream(&temp_dir).collect_all().unwrap();

        // Should be sorted by file path
        assert!(chunks[0].source.file < chunks[1].source.file);
        assert!(chunks[1].source.file < chunks[2].source.file);
    }

    /// A custom configuration is accepted and still yields chunks.
    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            file_batch_size: 10,
            chunk_buffer_size: 50,
            skip_on_error: true,
            max_errors: 5,
            parallel_batches: false,
        };

        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let stream = ChunkStream::with_config(
            temp_dir.path(),
            EmbedSettings::default(),
            ResourceLimits::default(),
            config,
        )
        .unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    /// The repository identifier set on the stream appears on emitted chunks.
    #[test]
    fn test_stream_with_repo_id() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let repo_id = RepoIdentifier::new("github.com/test", "my-repo");
        let stream = default_stream(&temp_dir).with_repo_id(repo_id);

        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
        assert_eq!(chunks[0].source.repo.namespace, "github.com/test");
        assert_eq!(chunks[0].source.repo.name, "my-repo");
    }
}