//! Streaming chunk generation: lazily converts repository source files into
//! embedding chunks with bounded memory use.

use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::parser::{parse_file_symbols, Language};
use crate::security::SecurityScanner;
use crate::tokenizer::{TokenModel, Tokenizer};

use super::error::EmbedError;
use super::hasher::hash_content;
use super::limits::ResourceLimits;
use super::types::{ChunkContext, ChunkSource, EmbedChunk, EmbedSettings, RepoIdentifier};

/// Tuning knobs for streaming chunk generation.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Number of files pulled from the pending queue per batch.
    pub file_batch_size: usize,

    /// Target number of chunks to keep buffered ahead of the consumer.
    pub chunk_buffer_size: usize,

    /// Skip files that fail with recoverable errors instead of aborting.
    pub skip_on_error: bool,

    /// Maximum number of errors tolerated before the stream cancels itself.
    pub max_errors: usize,

    /// Whether file batches may be processed in parallel.
    pub parallel_batches: bool,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            file_batch_size: 50,
            chunk_buffer_size: 200,
            skip_on_error: true,
            max_errors: 100,
            parallel_batches: true,
        }
    }
}

/// Running statistics for a stream, updated as files are processed.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total number of files discovered for processing.
    pub total_files: usize,

    /// Files fully processed so far.
    pub files_processed: usize,

    /// Files skipped because of recoverable errors.
    pub files_skipped: usize,

    /// Chunks generated so far.
    pub chunks_generated: usize,

    /// Bytes of source content read so far.
    pub bytes_processed: u64,

    /// Errors encountered so far.
    pub error_count: usize,
}

impl StreamStats {
    /// Progress through the discovered files as a percentage.
    pub fn progress_percent(&self) -> f64 {
        if self.total_files == 0 {
            return 100.0;
        }
        (self.files_processed as f64 / self.total_files as f64) * 100.0
    }

    /// Estimates how many chunks remain by extrapolating from the average
    /// chunks-per-file rate observed so far.
    pub fn estimated_chunks_remaining(&self) -> usize {
        if self.files_processed == 0 {
            return 0;
        }
        let rate = self.chunks_generated as f64 / self.files_processed as f64;
        let remaining_files = self.total_files.saturating_sub(self.files_processed);
        (remaining_files as f64 * rate) as usize
    }
}

/// A streaming iterator over embedding chunks extracted from a repository.
///
/// Files are discovered eagerly at construction time, then parsed lazily in
/// batches as the stream is consumed, so memory use stays bounded by the
/// batch and buffer sizes rather than by the repository size.
pub struct ChunkStream {
    /// Files discovered but not yet processed.
    pending_files: VecDeque<PathBuf>,

    /// Chunks (or recoverable errors) ready to be yielded.
    chunk_buffer: VecDeque<Result<EmbedChunk, EmbedError>>,

    /// Canonicalized repository root.
    repo_root: PathBuf,

    /// Embedding settings (include/exclude patterns, secret handling, ...).
    settings: EmbedSettings,

    /// Resource limits enforced during processing.
    limits: ResourceLimits,

    /// Streaming configuration.
    config: StreamConfig,

    /// Token counter for chunk contents.
    tokenizer: Tokenizer,

    /// Secret scanner, present only when `settings.scan_secrets` is set.
    security_scanner: Option<SecurityScanner>,

    /// Repository identifier attached to every emitted chunk.
    repo_id: RepoIdentifier,

    /// Running statistics.
    stats: StreamStats,

    /// Shared cancellation flag, settable via a [`CancellationHandle`].
    cancelled: Arc<AtomicBool>,

    /// Count of errors seen, used to enforce `config.max_errors`.
    error_count: AtomicUsize,
}

impl ChunkStream {
    /// Creates a stream with the default [`StreamConfig`].
    pub fn new(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
    ) -> Result<Self, EmbedError> {
        Self::with_config(repo_path, settings, limits, StreamConfig::default())
    }

    /// Creates a stream with an explicit [`StreamConfig`], canonicalizing the
    /// repository root and discovering files up front.
    pub fn with_config(
        repo_path: impl AsRef<Path>,
        settings: EmbedSettings,
        limits: ResourceLimits,
        config: StreamConfig,
    ) -> Result<Self, EmbedError> {
        let repo_root = repo_path
            .as_ref()
            .canonicalize()
            .map_err(|e| EmbedError::IoError {
                path: repo_path.as_ref().to_path_buf(),
                source: e,
            })?;

        if !repo_root.is_dir() {
            return Err(EmbedError::NotADirectory { path: repo_root });
        }

        let security_scanner = if settings.scan_secrets {
            Some(SecurityScanner::new())
        } else {
            None
        };

        let mut stream = Self {
            pending_files: VecDeque::new(),
            chunk_buffer: VecDeque::new(),
            repo_root,
            settings,
            limits,
            config,
            tokenizer: Tokenizer::new(),
            security_scanner,
            repo_id: RepoIdentifier::default(),
            stats: StreamStats::default(),
            cancelled: Arc::new(AtomicBool::new(false)),
            error_count: AtomicUsize::new(0),
        };

        // Discover files eagerly so stats are accurate from the start.
        stream.discover_files()?;

        Ok(stream)
    }

    /// Sets the repository identifier recorded on every emitted chunk.
    pub fn with_repo_id(mut self, repo_id: RepoIdentifier) -> Self {
        self.repo_id = repo_id;
        self
    }

    /// Current streaming statistics.
    pub fn stats(&self) -> &StreamStats {
        &self.stats
    }

    /// Returns a handle that can cancel this stream from another thread.
    pub fn cancellation_handle(&self) -> CancellationHandle {
        CancellationHandle { cancelled: Arc::clone(&self.cancelled) }
    }

    /// Whether the stream has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Walks the repository, applies include/exclude globs and language
    /// detection, and queues matching files in sorted order.
    fn discover_files(&mut self) -> Result<(), EmbedError> {
        use glob::Pattern;
        use ignore::WalkBuilder;

        // Invalid glob patterns are silently dropped.
        let include_patterns: Vec<Pattern> = self
            .settings
            .include_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let exclude_patterns: Vec<Pattern> = self
            .settings
            .exclude_patterns
            .iter()
            .filter_map(|p| Pattern::new(p).ok())
            .collect();

        let walker = WalkBuilder::new(&self.repo_root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .follow_links(false)
            .build();

        let mut files = Vec::new();

        for entry in walker.flatten() {
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            let relative = path
                .strip_prefix(&self.repo_root)
                .unwrap_or(path)
                .to_string_lossy();

            if !include_patterns.is_empty()
                && !include_patterns.iter().any(|p| p.matches(&relative))
            {
                continue;
            }

            if exclude_patterns.iter().any(|p| p.matches(&relative)) {
                continue;
            }

            // Only files with a recognized language extension are embedded.
            let ext = match path.extension().and_then(|e| e.to_str()) {
                Some(e) => e,
                None => continue,
            };

            if Language::from_extension(ext).is_none() {
                continue;
            }

            if !self.settings.include_tests && self.is_test_file(path) {
                continue;
            }

            files.push(path.to_path_buf());
        }

        // Sort for a deterministic processing order across platforms.
        files.sort();

        self.stats.total_files = files.len();
        self.pending_files = files.into();

        if !self.limits.check_file_count(self.stats.total_files) {
            return Err(EmbedError::TooManyFiles {
                count: self.stats.total_files,
                max: self.limits.max_files,
            });
        }

        Ok(())
    }

    /// Path-based heuristic for test files, covering both Unix and Windows
    /// separators.
    fn is_test_file(&self, path: &Path) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();

        path_str.contains("/tests/")
            || path_str.contains("\\tests\\")
            || path_str.contains("/test/")
            || path_str.contains("\\test\\")
            || path_str.contains("/__tests__/")
            || path_str.contains("\\__tests__\\")
    }

    /// Processes one batch of pending files into the chunk buffer.
    ///
    /// Returns `true` if there is (or may later be) more to yield: the
    /// buffer is non-empty or files are still pending.
    fn fill_buffer(&mut self) -> bool {
        if self.is_cancelled() {
            return false;
        }

        let batch_size = self.config.file_batch_size.min(self.pending_files.len());
        if batch_size == 0 {
            return false;
        }

        let batch: Vec<_> = (0..batch_size)
            .filter_map(|_| self.pending_files.pop_front())
            .collect();

        for file_path in batch {
            if self.is_cancelled() {
                break;
            }

            match self.process_file(&file_path) {
                Ok(chunks) => {
                    self.stats.files_processed += 1;
                    self.stats.chunks_generated += chunks.len();

                    for chunk in chunks {
                        self.chunk_buffer.push_back(Ok(chunk));
                    }
                },
                Err(e) => {
                    self.stats.error_count += 1;
                    let current_errors = self.error_count.fetch_add(1, Ordering::Relaxed) + 1;

                    if e.is_skippable() && self.config.skip_on_error {
                        // Skip the file but surface the non-critical error to
                        // the consumer so it can still be reported.
                        self.stats.files_skipped += 1;
                        if !e.is_critical() {
                            self.chunk_buffer.push_back(Err(e));
                        }
                    } else if current_errors >= self.config.max_errors {
                        // Too many errors: emit a final error and cancel.
                        self.chunk_buffer.push_back(Err(EmbedError::TooManyErrors {
                            count: current_errors,
                            max: self.config.max_errors,
                        }));
                        self.cancelled.store(true, Ordering::Relaxed);
                        break;
                    } else if e.is_critical() {
                        self.chunk_buffer.push_back(Err(e));
                        break;
                    }
                },
            }
        }

        !self.chunk_buffer.is_empty() || !self.pending_files.is_empty()
    }

    /// Reads, scans, parses, and chunks a single file.
    fn process_file(&mut self, path: &Path) -> Result<Vec<EmbedChunk>, EmbedError> {
        let metadata = std::fs::metadata(path)
            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;

        if !self.limits.check_file_size(metadata.len()) {
            return Err(EmbedError::FileTooLarge {
                path: path.to_path_buf(),
                size: metadata.len(),
                max: self.limits.max_file_size,
            });
        }

        let mut content = std::fs::read_to_string(path)
            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;

        self.stats.bytes_processed += content.len() as u64;

        // Reject files with pathologically long lines (likely minified or
        // generated code).
        if let Some(max_line_len) = content.lines().map(|l| l.len()).max() {
            if !self.limits.check_line_length(max_line_len) {
                return Err(EmbedError::LineTooLong {
                    path: path.to_path_buf(),
                    length: max_line_len,
                    max: self.limits.max_line_length,
                });
            }
        }

        let relative_path = self.safe_relative_path(path)?;

        if let Some(ref scanner) = self.security_scanner {
            let findings = scanner.scan(&content, &relative_path);
            if !findings.is_empty() {
                if self.settings.fail_on_secrets {
                    let files = findings
                        .iter()
                        .map(|f| format!(" {}:{} - {}", f.file, f.line, f.kind.name()))
                        .collect::<Vec<_>>()
                        .join("\n");
                    return Err(EmbedError::SecretsDetected { count: findings.len(), files });
                }

                if self.settings.redact_secrets {
                    content = scanner.redact_content(&content, &relative_path);
                }
            }
        }

        let language = self.detect_language(path);
        let mut symbols = parse_file_symbols(&content, path);
        // Sort symbols so chunk order is deterministic.
        symbols.sort_by(|a, b| {
            a.start_line
                .cmp(&b.start_line)
                .then_with(|| a.end_line.cmp(&b.end_line))
                .then_with(|| a.name.cmp(&b.name))
        });

        // The token model does not change per symbol, so resolve it once.
        let token_model = TokenModel::from_model_name(&self.settings.token_model)
            .unwrap_or(TokenModel::Claude);

        let lines: Vec<&str> = content.lines().collect();
        let mut chunks = Vec::with_capacity(symbols.len());

        for symbol in &symbols {
            if !self.settings.include_imports
                && matches!(symbol.kind, crate::types::SymbolKind::Import)
            {
                continue;
            }

            // Symbol lines are 1-indexed; widen by the configured context.
            let start_line = symbol.start_line.saturating_sub(1) as usize;
            let end_line = (symbol.end_line as usize).min(lines.len());
            let context_start = start_line.saturating_sub(self.settings.context_lines as usize);
            let context_end = (end_line + self.settings.context_lines as usize).min(lines.len());

            let chunk_content = lines[context_start..context_end].join("\n");

            let tokens = self.tokenizer.count(&chunk_content, token_model);

            let hash = hash_content(&chunk_content);

            let fqn = self.compute_fqn(&relative_path, symbol);

            chunks.push(EmbedChunk {
                id: hash.short_id,
                full_hash: hash.full_hash,
                content: chunk_content,
                tokens,
                kind: symbol.kind.into(),
                source: ChunkSource {
                    repo: self.repo_id.clone(),
                    file: relative_path.clone(),
                    lines: ((context_start + 1) as u32, context_end as u32),
                    symbol: symbol.name.clone(),
                    fqn: Some(fqn),
                    language: language.clone(),
                    parent: symbol.parent.clone(),
                    visibility: symbol.visibility.into(),
                    is_test: self.is_test_code(path, symbol),
                },
                context: ChunkContext {
                    docstring: symbol.docstring.clone(),
                    comments: Vec::new(),
                    signature: symbol.signature.clone(),
                    calls: symbol.calls.clone(),
                    called_by: Vec::new(),
                    imports: Vec::new(),
                    tags: Vec::new(),
                    // Cross-file context and complexity metrics are left at
                    // their defaults in streaming mode.
                    lines_of_code: 0,
                    max_nesting_depth: 0,
                },
                part: None,
            });
        }

        Ok(chunks)
    }

    /// Canonicalizes `path`, verifies it stays inside the repository root
    /// (guarding against path traversal via symlinks), and returns a
    /// forward-slash relative path.
    fn safe_relative_path(&self, path: &Path) -> Result<String, EmbedError> {
        let canonical = path
            .canonicalize()
            .map_err(|e| EmbedError::IoError { path: path.to_path_buf(), source: e })?;

        if !canonical.starts_with(&self.repo_root) {
            return Err(EmbedError::PathTraversal {
                path: canonical,
                repo_root: self.repo_root.clone(),
            });
        }

        Ok(canonical
            .strip_prefix(&self.repo_root)
            .unwrap_or(&canonical)
            .to_string_lossy()
            .replace('\\', "/"))
    }

    /// Maps a file extension to a display language name, defaulting to
    /// "unknown".
    fn detect_language(&self, path: &Path) -> String {
        path.extension()
            .and_then(|e| e.to_str())
            .and_then(Language::from_extension)
            .map(|l| l.display_name().to_string())
            .unwrap_or_else(|| "unknown".to_string())
    }

    /// Builds a fully-qualified name from the module path (derived from the
    /// file path), the symbol's parent, and the symbol name.
    fn compute_fqn(&self, file: &str, symbol: &crate::types::Symbol) -> String {
        let module_path = file
            .strip_suffix(".rs")
            .or_else(|| file.strip_suffix(".py"))
            .or_else(|| file.strip_suffix(".ts"))
            .or_else(|| file.strip_suffix(".tsx"))
            .or_else(|| file.strip_suffix(".js"))
            .or_else(|| file.strip_suffix(".jsx"))
            .or_else(|| file.strip_suffix(".go"))
            .unwrap_or(file)
            .replace(['\\', '/'], "::");

        if let Some(ref parent) = symbol.parent {
            format!("{}::{}::{}", module_path, parent, symbol.name)
        } else {
            format!("{}::{}", module_path, symbol.name)
        }
    }

    /// Heuristic for test code based on both the file path and the symbol
    /// name.
    fn is_test_code(&self, path: &Path, symbol: &crate::types::Symbol) -> bool {
        let path_str = path.to_string_lossy().to_lowercase();
        let name = symbol.name.to_lowercase();

        path_str.contains("test")
            || path_str.contains("spec")
            || name.starts_with("test_")
            || name.ends_with("_test")
    }

    /// Drains the stream, collecting all chunks into a deterministically
    /// sorted `Vec`.
    ///
    /// Skippable errors are ignored; if a non-skippable error occurred and
    /// no chunks were produced, that error is returned.
    pub fn collect_all(self) -> Result<Vec<EmbedChunk>, EmbedError> {
        let mut chunks = Vec::new();
        let mut last_error = None;

        for result in self {
            match result {
                Ok(chunk) => chunks.push(chunk),
                Err(e) if e.is_skippable() => {
                    // Already counted in stats; safe to ignore here.
                },
                Err(e) => {
                    last_error = Some(e);
                },
            }
        }

        if let Some(e) = last_error {
            if chunks.is_empty() {
                return Err(e);
            }
        }

        // Sort by file, line range, symbol, then id for a stable order.
        chunks.sort_by(|a, b| {
            a.source
                .file
                .cmp(&b.source.file)
                .then_with(|| a.source.lines.0.cmp(&b.source.lines.0))
                .then_with(|| a.source.lines.1.cmp(&b.source.lines.1))
                .then_with(|| a.source.symbol.cmp(&b.source.symbol))
                .then_with(|| a.id.cmp(&b.id))
        });

        Ok(chunks)
    }
}

impl Iterator for ChunkStream {
    type Item = Result<EmbedChunk, EmbedError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop rather than filling once: a fill pass can skip every file in
        // a batch without buffering anything while files are still pending,
        // so a single fill is not enough to conclude the stream is done.
        loop {
            if let Some(chunk) = self.chunk_buffer.pop_front() {
                return Some(chunk);
            }

            if !self.fill_buffer() {
                return None;
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let buffered = self.chunk_buffer.len();
        if self.pending_files.is_empty() {
            (buffered, Some(buffered))
        } else {
            // Chunks-per-file is unknown ahead of time, so there is no
            // sound upper bound while files are still pending.
            (buffered, None)
        }
    }
}

/// A cloneable handle that cancels its [`ChunkStream`] from any thread.
#[derive(Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Signals the stream to stop after the current file.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }

    /// Whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }
}

/// Extension trait that groups any iterator's items into fixed-size batches.
pub trait BatchIterator: Iterator {
    /// Returns an iterator over `Vec`s of up to `batch_size` items.
    fn batches(self, batch_size: usize) -> Batches<Self>
    where
        Self: Sized,
    {
        Batches { iter: self, batch_size }
    }
}

impl<I: Iterator> BatchIterator for I {}

/// Iterator adapter created by [`BatchIterator::batches`].
pub struct Batches<I> {
    iter: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut batch = Vec::with_capacity(self.batch_size);

        for _ in 0..self.batch_size {
            match self.iter.next() {
                Some(item) => batch.push(item),
                None => break,
            }
        }

        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn create_test_file(dir: &Path, name: &str, content: &str) {
        let path = dir.join(name);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, content).unwrap();
    }

    #[test]
    fn test_chunk_stream_basic() {
        let temp_dir = TempDir::new().unwrap();
        let rust_code = r#"
/// A test function
fn hello() {
    println!("Hello, world!");
}

fn goodbye() {
    println!("Goodbye!");
}
"#;
        create_test_file(temp_dir.path(), "test.rs", rust_code);

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_stats() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "a.rs", "fn foo() {}");
        create_test_file(temp_dir.path(), "b.rs", "fn bar() {}");
        create_test_file(temp_dir.path(), "c.rs", "fn baz() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        assert_eq!(stream.stats().total_files, 3);

        // Drain the stream so processing actually runs.
        let _chunks: Vec<_> = stream.collect();
    }
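
    // A direct check of the StreamStats math with hand-chosen values (picked
    // so the floating-point results are exact), independent of any real
    // stream.
    #[test]
    fn test_stream_stats_math() {
        let stats = StreamStats {
            total_files: 10,
            files_processed: 5,
            chunks_generated: 15,
            ..Default::default()
        };

        assert_eq!(stats.progress_percent(), 50.0);
        // 15 chunks / 5 files = 3 chunks per file; 5 files remain.
        assert_eq!(stats.estimated_chunks_remaining(), 15);

        // An empty stream reports complete progress and no remaining work.
        let empty = StreamStats::default();
        assert_eq!(empty.progress_percent(), 100.0);
        assert_eq!(empty.estimated_chunks_remaining(), 0);
    }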

    #[test]
    fn test_cancellation() {
        let temp_dir = TempDir::new().unwrap();
        for i in 0..10 {
            create_test_file(
                temp_dir.path(),
                &format!("file{}.rs", i),
                &format!("fn func{}() {{}}", i),
            );
        }

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let mut stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let handle = stream.cancellation_handle();

        // Pull a couple of chunks, then cancel.
        let _ = stream.next();
        let _ = stream.next();

        handle.cancel();

        assert!(stream.is_cancelled());
    }
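
    // The handle shares its flag through an Arc, so clones and cross-thread
    // use observe the same cancellation; a minimal sketch using only std.
    #[test]
    fn test_cancellation_handle_is_shared() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        let handle = stream.cancellation_handle();
        let clone = handle.clone();
        assert!(!clone.is_cancelled());

        // Cancel from another thread through the original handle.
        std::thread::spawn(move || handle.cancel()).join().unwrap();

        assert!(clone.is_cancelled());
        assert!(stream.is_cancelled());
    }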

    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..10).collect();
        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();

        assert_eq!(batches.len(), 4);
        assert_eq!(batches[0], vec![0, 1, 2]);
        assert_eq!(batches[1], vec![3, 4, 5]);
        assert_eq!(batches[2], vec![6, 7, 8]);
        assert_eq!(batches[3], vec![9]);
    }
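
    // Edge cases for the Batches adapter: an empty input yields no batches,
    // and an exact multiple produces only full batches.
    #[test]
    fn test_batch_iterator_edge_cases() {
        let empty: Vec<i32> = Vec::new();
        assert_eq!(empty.into_iter().batches(3).count(), 0);

        let items: Vec<i32> = (0..6).collect();
        let batches: Vec<Vec<i32>> = items.into_iter().batches(3).collect();
        assert_eq!(batches.len(), 2);
        assert!(batches.iter().all(|b| b.len() == 3));
    }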

    #[test]
    fn test_collect_all_sorts_deterministically() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "z.rs", "fn z_func() {}");
        create_test_file(temp_dir.path(), "a.rs", "fn a_func() {}");
        create_test_file(temp_dir.path(), "m.rs", "fn m_func() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();
        let chunks = stream.collect_all().unwrap();

        // Files come back sorted regardless of creation order.
        assert!(chunks[0].source.file < chunks[1].source.file);
        assert!(chunks[1].source.file < chunks[2].source.file);
    }
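
    // safe_relative_path should return forward-slash paths relative to the
    // repo root; the private method is reachable because this tests module
    // lives in the same file. A sketch assuming the temp dir canonicalizes
    // consistently with the file paths inside it.
    #[test]
    fn test_safe_relative_path() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "src/lib.rs", "fn lib() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        let relative = stream
            .safe_relative_path(&temp_dir.path().join("src/lib.rs"))
            .unwrap();
        assert_eq!(relative, "src/lib.rs");
    }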

    #[test]
    fn test_stream_config() {
        let config = StreamConfig {
            file_batch_size: 10,
            chunk_buffer_size: 50,
            skip_on_error: true,
            max_errors: 5,
            parallel_batches: false,
        };

        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();

        let stream = ChunkStream::with_config(temp_dir.path(), settings, limits, config).unwrap();
        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_stream_with_repo_id() {
        let temp_dir = TempDir::new().unwrap();
        create_test_file(temp_dir.path(), "test.rs", "fn test() {}");

        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let repo_id = RepoIdentifier::new("github.com/test", "my-repo");

        let stream = ChunkStream::new(temp_dir.path(), settings, limits)
            .unwrap()
            .with_repo_id(repo_id);

        let chunks: Vec<_> = stream.filter_map(|r| r.ok()).collect();

        assert!(!chunks.is_empty());
        assert_eq!(chunks[0].source.repo.namespace, "github.com/test");
        assert_eq!(chunks[0].source.repo.name, "my-repo");
    }
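
    // A focused check of the path-based test-file heuristic; the stream is
    // built over an empty temp dir purely to obtain a receiver for the
    // private method.
    #[test]
    fn test_is_test_file_detection() {
        let temp_dir = TempDir::new().unwrap();
        let settings = EmbedSettings::default();
        let limits = ResourceLimits::default();
        let stream = ChunkStream::new(temp_dir.path(), settings, limits).unwrap();

        assert!(stream.is_test_file(Path::new("src/tests/helpers.rs")));
        assert!(stream.is_test_file(Path::new("pkg/__tests__/util.js")));
        assert!(!stream.is_test_file(Path::new("src/main.rs")));
    }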
}