use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::parser::{parse_file_symbols, Language};
use crate::security::SecurityScanner;
use crate::tokenizer::{TokenModel, Tokenizer};

use super::error::EmbedError;
use super::hasher::hash_content;
use super::limits::ResourceLimits;
use super::types::{
    ChunkContext, ChunkSource, EmbedChunk, EmbedSettings, RepoIdentifier,
};
/// Tuning knobs for [`ChunkStream`] batching and error handling.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Number of files processed per refill of the chunk buffer.
    pub file_batch_size: usize,

    /// Target capacity of the internal chunk buffer.
    pub chunk_buffer_size: usize,

    /// Skip files whose errors are skippable instead of aborting the stream.
    pub skip_on_error: bool,

    /// Total error budget; exceeding it cancels the stream.
    pub max_errors: usize,

    /// Whether file batches may be processed in parallel.
    pub parallel_batches: bool,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            file_batch_size: 50,
            chunk_buffer_size: 200,
            skip_on_error: true,
            max_errors: 100,
            parallel_batches: true,
        }
    }
}

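// A typical caller keeps the defaults and tunes a single knob via struct
// update syntax; a minimal sketch (values illustrative):
//
//     let config = StreamConfig { file_batch_size: 10, ..Default::default() };
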
/// Running statistics for a [`ChunkStream`].
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total number of files discovered for processing.
    pub total_files: usize,

    /// Files successfully processed so far.
    pub files_processed: usize,

    /// Files skipped because of skippable errors.
    pub files_skipped: usize,

    /// Chunks generated so far.
    pub chunks_generated: usize,

    /// Bytes of source text read so far.
    pub bytes_processed: u64,

    /// Errors encountered so far.
    pub error_count: usize,
}

impl StreamStats {
    /// Percentage of discovered files processed so far (100.0 for an empty repo).
    pub fn progress_percent(&self) -> f64 {
        if self.total_files == 0 {
            return 100.0;
        }
        (self.files_processed as f64 / self.total_files as f64) * 100.0
    }

    /// Estimates the chunks still to come by extrapolating the observed
    /// chunks-per-file rate; 0 until at least one file has been processed.
    pub fn estimated_chunks_remaining(&self) -> usize {
        if self.files_processed == 0 {
            return 0;
        }
        let rate = self.chunks_generated as f64 / self.files_processed as f64;
        let remaining_files = self.total_files.saturating_sub(self.files_processed);
        (remaining_files as f64 * rate) as usize
    }
}

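/// A lazy iterator of [`EmbedChunk`]s (and surfaced errors) over a repository.
///
/// Files are discovered eagerly, so [`StreamStats::total_files`] is known up
/// front, but they are parsed and chunked lazily, batch by batch. A minimal
/// usage sketch, assuming default settings and limits:
///
/// ```ignore
/// let stream = ChunkStream::new(
///     "path/to/repo",
///     EmbedSettings::default(),
///     ResourceLimits::default(),
/// )?;
/// for result in stream {
///     let chunk = result?;
///     println!("{} ({} tokens)", chunk.source.symbol, chunk.tokens);
/// }
/// ```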
pub struct ChunkStream {
    /// Files discovered but not yet processed.
    pending_files: VecDeque<PathBuf>,

    /// Chunks (and surfaced errors) ready to be yielded.
    chunk_buffer: VecDeque<Result<EmbedChunk, EmbedError>>,

    /// Canonicalized repository root.
    repo_root: PathBuf,

    /// Chunking and filtering settings.
    settings: EmbedSettings,

    /// Resource limits (file count, file size, line length).
    limits: ResourceLimits,

    /// Streaming configuration.
    config: StreamConfig,

    /// Token counter used for chunk sizing.
    tokenizer: Tokenizer,

    /// Secret scanner, present when `settings.scan_secrets` is set.
    security_scanner: Option<SecurityScanner>,

    /// Identity of the repository being processed.
    repo_id: RepoIdentifier,

    /// Running progress statistics.
    stats: StreamStats,

    /// Shared cancellation flag, settable via [`CancellationHandle`].
    cancelled: Arc<AtomicBool>,

    /// Total errors seen, compared against `config.max_errors`.
    error_count: AtomicUsize,
}

impl ChunkStream {
    /// Creates a stream over `repo_path` with the default [`StreamConfig`].
    pub fn new(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
    ) -> Result<Self, EmbedError> {
        Self::with_config(repo_path, settings, limits, StreamConfig::default())
    }

    /// Creates a stream over `repo_path` with an explicit [`StreamConfig`].
    pub fn with_config(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
        config: StreamConfig,
    ) -> Result<Self, EmbedError> {
        let repo_root = repo_path.as_ref().canonicalize().map_err(|e| {
            EmbedError::IoError {
                path: repo_path.as_ref().to_path_buf(),
                source: e,
            }
        })?;

        if !repo_root.is_dir() {
            return Err(EmbedError::NotADirectory { path: repo_root });
        }

        let security_scanner = if settings.scan_secrets {
            Some(SecurityScanner::new())
        } else {
            None
        };

        let mut stream = Self {
            pending_files: VecDeque::new(),
            chunk_buffer: VecDeque::new(),
            repo_root,
            settings,
            limits,
            config,
            tokenizer: Tokenizer::new(),
            security_scanner,
            repo_id: RepoIdentifier::default(),
            stats: StreamStats::default(),
            cancelled: Arc::new(AtomicBool::new(false)),
            error_count: AtomicUsize::new(0),
        };

        // Discover files eagerly so `stats.total_files` is known up front.
        stream.discover_files()?;

        Ok(stream)
    }

    /// Sets the repository identifier attached to every emitted chunk.
    pub fn with_repo_id(mut self, repo_id: RepoIdentifier) -> Self {
        self.repo_id = repo_id;
        self
    }

    /// Current progress statistics.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }

    /// Returns a handle that can cancel this stream from another thread.
    pub fn cancellation_handle(&self) -> CancellationHandle {
        CancellationHandle {
            cancelled: Arc::clone(&self.cancelled),
        }
    }

    /// Whether the stream has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Walks the repository, applying gitignore rules and the configured
    /// include/exclude globs, and queues every supported source file.
    fn discover_files(&mut self) -> Result<(), EmbedError> {
        use glob::Pattern;
        use ignore::WalkBuilder;

        // Invalid glob patterns are silently dropped rather than failing the walk.
        let include_patterns: Vec<Pattern> = self
            .settings
            .include_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let exclude_patterns: Vec<Pattern> = self
            .settings
            .exclude_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let walker = WalkBuilder::new(&self.repo_root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .follow_links(false)
            .build();

        let mut files = Vec::new();

        for entry in walker.flatten() {
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            // Globs are matched against the path relative to the repo root.
            let relative = path
                .strip_prefix(&self.repo_root)
                .unwrap_or(path)
                .to_string_lossy();

            if !include_patterns.is_empty()
                && !include_patterns.iter().any(|p| p.matches(&relative))
            {
                continue;
            }

            if exclude_patterns.iter().any(|p| p.matches(&relative)) {
                continue;
            }

            // Only files whose extension maps to a supported language are kept.
            let ext = match path.extension().and_then(|e| e.to_str()) {
                Some(e) => e,
                None => continue,
            };

            if Language::from_extension(ext).is_none() {
                continue;
            }

            if !self.settings.include_tests && self.is_test_file(path) {
                continue;
            }

            files.push(path.to_path_buf());
        }

        // Sort for a deterministic processing order across runs.
        files.sort();

        self.stats.total_files = files.len();
        self.pending_files = files.into();

        if !self.limits.check_file_count(self.stats.total_files) {
            return Err(EmbedError::TooManyFiles {
                count: self.stats.total_files,
                max: self.limits.max_files,
            });
        }

        Ok(())
    }

    /// Heuristic: treats any path containing a `tests/`, `test/`, or
    /// `__tests__/` directory (either separator style) as a test file.
    fn is_test_file(&self, path: &Path) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();

        path_str.contains("/tests/")
            || path_str.contains("\\tests\\")
            || path_str.contains("/test/")
            || path_str.contains("\\test\\")
            || path_str.contains("/__tests__/")
            || path_str.contains("\\__tests__\\")
    }

    /// Processes the next batch of pending files into `chunk_buffer`.
    ///
    /// Returns `true` while there may be more items to yield (buffered chunks
    /// or still-pending files), `false` once the stream is exhausted or
    /// cancelled.
    fn fill_buffer(&mut self) -> bool {
        if self.is_cancelled() {
            return false;
        }

        let batch_size = self.config.file_batch_size.min(self.pending_files.len());
        if batch_size == 0 {
            return false;
        }

        let batch: Vec<_> = (0..batch_size)
            .filter_map(|_| self.pending_files.pop_front())
            .collect();

        for file_path in batch {
            if self.is_cancelled() {
                break;
            }

            match self.process_file(&file_path) {
                Ok(chunks) => {
                    self.stats.files_processed += 1;
                    self.stats.chunks_generated += chunks.len();

                    for chunk in chunks {
                        self.chunk_buffer.push_back(Ok(chunk));
                    }
                }
                Err(e) => {
                    self.stats.error_count += 1;
                    let current_errors = self.error_count.fetch_add(1, Ordering::Relaxed) + 1;

                    if current_errors >= self.config.max_errors {
                        // The error budget applies to every error, skippable
                        // or not; exceeding it cancels the stream.
                        self.chunk_buffer.push_back(Err(EmbedError::TooManyErrors {
                            count: current_errors,
                            max: self.config.max_errors,
                        }));
                        self.cancelled.store(true, Ordering::Relaxed);
                        break;
                    } else if e.is_skippable() && self.config.skip_on_error {
                        self.stats.files_skipped += 1;
                        if !e.is_critical() {
                            // Surface the error to the consumer without aborting.
                            self.chunk_buffer.push_back(Err(e));
                        }
                    } else {
                        // Non-skippable errors are always surfaced; critical
                        // ones additionally end the batch.
                        let critical = e.is_critical();
                        self.chunk_buffer.push_back(Err(e));
                        if critical {
                            break;
                        }
                    }
                }
            }
        }

        !self.chunk_buffer.is_empty() || !self.pending_files.is_empty()
    }

    /// Reads, validates, scans, parses, and chunks a single file.
    fn process_file(&mut self, path: &Path) -> Result<Vec<EmbedChunk>, EmbedError> {
        let metadata = std::fs::metadata(path).map_err(|e| EmbedError::IoError {
            path: path.to_path_buf(),
            source: e,
        })?;

        if !self.limits.check_file_size(metadata.len()) {
            return Err(EmbedError::FileTooLarge {
                path: path.to_path_buf(),
                size: metadata.len(),
                max: self.limits.max_file_size,
            });
        }

        let mut content = std::fs::read_to_string(path).map_err(|e| EmbedError::IoError {
            path: path.to_path_buf(),
            source: e,
        })?;

        self.stats.bytes_processed += content.len() as u64;

        // Reject files with pathologically long lines (e.g. minified bundles).
        if let Some(max_line_len) = content.lines().map(|l| l.len()).max() {
            if !self.limits.check_line_length(max_line_len) {
                return Err(EmbedError::LineTooLong {
                    path: path.to_path_buf(),
                    length: max_line_len,
                    max: self.limits.max_line_length,
                });
            }
        }

        let relative_path = self.safe_relative_path(path)?;

        // Optionally fail on, or redact, detected secrets before chunking.
        if let Some(ref scanner) = self.security_scanner {
            let findings = scanner.scan(&content, &relative_path);
            if !findings.is_empty() {
                if self.settings.fail_on_secrets {
                    let files = findings
                        .iter()
                        .map(|f| format!(" {}:{} - {}", f.file, f.line, f.kind.name()))
                        .collect::<Vec<_>>()
                        .join("\n");
                    return Err(EmbedError::SecretsDetected {
                        count: findings.len(),
                        files,
                    });
                }

                if self.settings.redact_secrets {
                    content = scanner.redact_content(&content, &relative_path);
                }
            }
        }

        let language = self.detect_language(path);
        let mut symbols = parse_file_symbols(&content, path);
        // Sort symbols so chunks are emitted in a deterministic order.
        symbols.sort_by(|a, b| {
            a.start_line
                .cmp(&b.start_line)
                .then_with(|| a.end_line.cmp(&b.end_line))
                .then_with(|| a.name.cmp(&b.name))
        });

        // The token model is the same for every symbol, so resolve it once.
        let token_model = TokenModel::from_model_name(&self.settings.token_model)
            .unwrap_or(TokenModel::Claude);

        let lines: Vec<&str> = content.lines().collect();
        let mut chunks = Vec::with_capacity(symbols.len());

        for symbol in &symbols {
            if !self.settings.include_imports
                && matches!(symbol.kind, crate::types::SymbolKind::Import)
            {
                continue;
            }

            // Convert 1-based symbol lines to a 0-based slice range, padded
            // with `context_lines` of surrounding code on each side.
            let start_line = symbol.start_line.saturating_sub(1) as usize;
            let end_line = (symbol.end_line as usize).min(lines.len());
            let context_start =
                start_line.saturating_sub(self.settings.context_lines as usize);
            let context_end =
                (end_line + self.settings.context_lines as usize).min(lines.len());

            let chunk_content = lines[context_start..context_end].join("\n");
            let tokens = self.tokenizer.count(&chunk_content, token_model);

            let hash = hash_content(&chunk_content);

            let fqn = self.compute_fqn(&relative_path, symbol);

            chunks.push(EmbedChunk {
                id: hash.short_id,
                full_hash: hash.full_hash,
                content: chunk_content,
                tokens,
                kind: symbol.kind.into(),
                source: ChunkSource {
                    repo: self.repo_id.clone(),
                    file: relative_path.clone(),
                    lines: ((context_start + 1) as u32, context_end as u32),
                    symbol: symbol.name.clone(),
                    fqn: Some(fqn),
                    language: language.clone(),
                    parent: symbol.parent.clone(),
                    visibility: symbol.visibility.into(),
                    is_test: self.is_test_code(path, symbol),
                },
                context: ChunkContext {
                    docstring: symbol.docstring.clone(),
                    comments: Vec::new(),
                    signature: symbol.signature.clone(),
                    calls: symbol.calls.clone(),
                    called_by: Vec::new(),
                    imports: Vec::new(),
                    tags: Vec::new(),
                    lines_of_code: 0,
                    max_nesting_depth: 0,
                },
                part: None,
            });
        }

        Ok(chunks)
    }

    /// Canonicalizes `path`, rejects anything that escapes the repo root,
    /// and returns the root-relative path with forward slashes.
    fn safe_relative_path(&self, path: &Path) -> Result<String, EmbedError> {
        let canonical =
            path.canonicalize()
                .map_err(|e| EmbedError::IoError {
                    path: path.to_path_buf(),
                    source: e,
                })?;

        if !canonical.starts_with(&self.repo_root) {
            return Err(EmbedError::PathTraversal {
                path: canonical,
                repo_root: self.repo_root.clone(),
            });
        }

        Ok(canonical
            .strip_prefix(&self.repo_root)
            .unwrap_or(&canonical)
            .to_string_lossy()
            .replace('\\', "/"))
    }

    /// Maps a file extension to a display language name, or "unknown".
    fn detect_language(&self, path: &Path) -> String {
        path.extension()
            .and_then(|e| e.to_str())
            .and_then(Language::from_extension)
            .map(|l| l.display_name().to_string())
            .unwrap_or_else(|| "unknown".to_string())
    }

    /// Builds a `::`-separated fully qualified name from the relative file
    /// path and the symbol, e.g. `src/parser.rs` + `parse` gives
    /// `src::parser::parse`.
    fn compute_fqn(&self, file: &str, symbol: &crate::types::Symbol) -> String {
        let module_path = file
            .strip_suffix(".rs")
            .or_else(|| file.strip_suffix(".py"))
            .or_else(|| file.strip_suffix(".ts"))
            .or_else(|| file.strip_suffix(".tsx"))
            .or_else(|| file.strip_suffix(".js"))
            .or_else(|| file.strip_suffix(".jsx"))
            .or_else(|| file.strip_suffix(".go"))
            .unwrap_or(file)
            .replace('\\', "::")
            .replace('/', "::");

        if let Some(ref parent) = symbol.parent {
            format!("{}::{}::{}", module_path, parent, symbol.name)
        } else {
            format!("{}::{}", module_path, symbol.name)
        }
    }

    /// Heuristic: flags symbols in test-looking paths or with test-style names.
    fn is_test_code(&self, path: &Path, symbol: &crate::types::Symbol) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();
        let name = symbol.name.to_lowercase();

        path_str.contains("test")
            || path_str.contains("spec")
            || name.starts_with("test_")
            || name.ends_with("_test")
    }

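    /// Drains the stream into a single `Vec`, sorted deterministically by
    /// file, line span, symbol name, and chunk id.
    ///
    /// Skippable errors are dropped; a terminal error is returned only when
    /// no chunks could be produced at all. A minimal sketch, assuming default
    /// settings and limits:
    ///
    /// ```ignore
    /// let chunks = ChunkStream::new(repo, EmbedSettings::default(),
    ///     ResourceLimits::default())?.collect_all()?;
    /// ```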
    pub fn collect_all(self) -> Result<Vec<EmbedChunk>, EmbedError> {
        let mut chunks = Vec::new();
        let mut last_error = None;

        for result in self {
            match result {
                Ok(chunk) => chunks.push(chunk),
                Err(e) if e.is_skippable() => {
                    // Skippable errors are dropped; the files were skipped.
                }
                Err(e) => {
                    last_error = Some(e);
                }
            }
        }

        // Only fail outright if nothing at all could be chunked.
        if let Some(e) = last_error {
            if chunks.is_empty() {
                return Err(e);
            }
        }

        // Sort for a deterministic output order across runs.
        chunks.sort_by(|a, b| {
            a.source
                .file
                .cmp(&b.source.file)
                .then_with(|| a.source.lines.0.cmp(&b.source.lines.0))
                .then_with(|| a.source.lines.1.cmp(&b.source.lines.1))
                .then_with(|| a.source.symbol.cmp(&b.source.symbol))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(chunks)
    }
}

impl Iterator for ChunkStream {
    type Item = Result<EmbedChunk, EmbedError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop rather than refilling once: a batch can legitimately produce
        // zero chunks (e.g. files with no symbols), so keep refilling until
        // something is buffered or the stream is exhausted/cancelled.
        loop {
            if let Some(chunk) = self.chunk_buffer.pop_front() {
                return Some(chunk);
            }

            if !self.fill_buffer() {
                return None;
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Buffered chunks are a firm lower bound. The chunks-per-file
        // extrapolation is only an estimate, so no upper bound is claimed
        // (an incorrect upper bound would violate the Iterator contract).
        (self.chunk_buffer.len(), None)
    }
}

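/// A cloneable handle for cancelling a [`ChunkStream`] from another thread.
///
/// A minimal sketch, assuming the stream is drained on the current thread
/// while a watchdog thread enforces a deadline:
///
/// ```ignore
/// let mut stream = ChunkStream::new(repo, settings, limits)?;
/// let handle = stream.cancellation_handle();
/// std::thread::spawn(move || {
///     std::thread::sleep(std::time::Duration::from_secs(30));
///     handle.cancel(); // takes effect at the next batch boundary
/// });
/// for chunk in stream { /* ... */ }
/// ```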
#[derive(Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Signals the stream to stop; takes effect at the next batch boundary.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }

    /// Whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }
}

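/// Extension trait for draining any iterator in fixed-size batches.
///
/// This pairs naturally with [`ChunkStream`] when feeding an embedding
/// endpoint that accepts N inputs per request; `embed_batch` below is a
/// hypothetical callback, not part of this module:
///
/// ```ignore
/// for batch in stream.filter_map(Result::ok).batches(64) {
///     embed_batch(&batch)?;
/// }
/// ```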
pub trait BatchIterator: Iterator {
    /// Groups the iterator's items into `Vec`s of at most `batch_size`.
    fn batches(self, batch_size: usize) -> Batches<Self>
    where
        Self: Sized,
    {
        Batches {
            iter: self,
            batch_size,
        }
    }
}

impl<I: Iterator> BatchIterator for I {}

/// Iterator adapter yielding fixed-size batches; the final batch may be short.
pub struct Batches<I> {
    iter: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut batch = Vec::with_capacity(self.batch_size);

        for _ in 0..self.batch_size {
            match self.iter.next() {
                Some(item) => batch.push(item),
                None => break,
            }
        }

        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Writes `content` to `dir/name`, creating parent directories as needed.
    fn create_test_file(dir: &Path, name: &str, content: &str) {
        let path = dir.join(name);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, content).unwrap();
    }

    #[test]
    fn test_chunk_stream_basic() {
        let temp_dir = TempDir::new().unwrap();
        let rust_code = r#"
/// A test function
fn hello() {
    println!("Hello, world!");
}

fn goodbye() {
    println!("Goodbye!");
}
"#;
        create_test_file(temp_dir.path(), "test.rs", rust_code);

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_stats() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "a.rs", "fn foo() {}");
        create_test_file(temp_dir.path(), "b.rs", "fn bar() {}");
        create_test_file(temp_dir.path(), "c.rs", "fn baz() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        assert_eq!(stream.stats().total_files, 3);

        // Drain the stream to exercise processing end to end.
        let _chunks: Vec<_> = stream.collect();
    }

    #[test]
    fn test_cancellation() {
        let temp_dir = TempDir::new().unwrap();
        for i in 0..10 {
            create_test_file(
                temp_dir.path(),
                &format!("file{}.rs", i),
                &format!("fn func{}() {{}}", i),
            );
        }

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let mut stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let handle = stream.cancellation_handle();

        // Pull a couple of items, then cancel mid-stream.
        let _ = stream.next();
        let _ = stream.next();

        handle.cancel();

        assert!(stream.is_cancelled());
    }

    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..10).collect();
        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();

        assert_eq!(batches.len(), 4);
        assert_eq!(batches[0], vec![0, 1, 2]);
        assert_eq!(batches[1], vec![3, 4, 5]);
        assert_eq!(batches[2], vec![6, 7, 8]);
        assert_eq!(batches[3], vec![9]);
    }
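
    // Sketch test added for the stats helpers; the expected values follow
    // directly from the arithmetic in `progress_percent` and
    // `estimated_chunks_remaining` above.
    #[test]
    fn test_stream_stats_math() {
        let mut stats = StreamStats::default();
        // An empty repo counts as fully processed.
        assert_eq!(stats.progress_percent(), 100.0);

        stats.total_files = 10;
        stats.files_processed = 5;
        stats.chunks_generated = 20;
        assert_eq!(stats.progress_percent(), 50.0);
        // 4 chunks/file observed so far, 5 files remaining => 20 estimated.
        assert_eq!(stats.estimated_chunks_remaining(), 20);
    }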

    #[test]
    fn test_collect_all_sorts_deterministically() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "z.rs", "fn z_func() {}");
        create_test_file(temp_dir.path(), "a.rs", "fn a_func() {}");
        create_test_file(temp_dir.path(), "m.rs", "fn m_func() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks = stream.collect_all().unwrap();

        // collect_all sorts by file first, so the output order is stable.
        assert!(chunks[0].source.file < chunks[1].source.file);
        assert!(chunks[1].source.file < chunks[2].source.file);
    }

    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            file_batch_size: 10,
            chunk_buffer_size: 50,
            skip_on_error: true,
            max_errors: 5,
            parallel_batches: false,
        };

        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream =
            ChunkStream::with_config(temp_dir.path(), settings, limits, config).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_with_repo_id() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let repo_id = RepoIdentifier::new("github.com/test", "my-repo");

        let stream = ChunkStream::new(temp_dir.path(), settings, limits)
            .unwrap()
            .with_repo_id(repo_id);

        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
        assert_eq!(chunks[0].source.repo.namespace, "github.com/test");
        assert_eq!(chunks[0].source.repo.name, "my-repo");
    }
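
    // Sketch test for the degenerate batching case, derived from the
    // `Batches` implementation above: an exhausted source yields no batch.
    #[test]
    fn test_empty_batches_yield_nothing() {
        let empty: Vec<i32> = Vec::new();
        assert!(empty.into_iter().batches(3).next().is_none());
    }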
}