//! Memory-efficient corpus processing: memory-mapped corpora, streaming
//! tokenization and vectorization, chunked readers, and lightweight memory
//! tracking utilities.

use crate::error::{Result, TextError};
use crate::sparse::{CsrMatrix, SparseMatrixBuilder, SparseVector};
use crate::tokenize::{Tokenizer, WordTokenizer};
use crate::vocabulary::Vocabulary;
use memmap2::{Mmap, MmapOptions};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

/// Metrics collected while processing a corpus in streaming fashion.
#[derive(Debug, Clone, Default)]
pub struct AdvancedStreamingMetrics {
    /// Number of documents processed.
    pub documents_processed: usize,
    /// Total time spent processing.
    pub total_processing_time: Duration,
    /// Peak memory usage observed, in bytes.
    pub peak_memory_usage: usize,
    /// Throughput in documents per second.
    pub throughput: f64,
    /// Fraction of document lookups served from a cache.
    pub cache_hit_rate: f64,
    /// Ratio of useful data to total memory consumed.
    pub memory_efficiency: f64,
}

/// Tracks current and peak memory usage reported by callers.
#[derive(Debug, Default)]
#[allow(dead_code)]
struct MemoryUsageTracker {
    current_usage: usize,
    peak_usage: usize,
}

impl MemoryUsageTracker {
    #[allow(dead_code)]
    fn update_usage(&mut self, current: usize) {
        self.current_usage = current;
        if current > self.peak_usage {
            self.peak_usage = current;
        }
    }
}

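/// Memory-mapped, line-oriented corpus: each newline-terminated line in the
/// backing file is treated as one document and can be read without loading
/// the whole file into memory.
///
/// Example (illustrative sketch; the path is a placeholder and the snippet is
/// not compiled as a doctest):
///
/// ```ignore
/// let corpus = MemoryMappedCorpus::from_file("corpus.txt")?;
/// println!("{} documents", corpus.num_documents());
/// for doc in corpus.iter() {
///     let text = doc?;
///     // process one document at a time
/// }
/// ```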
pub struct MemoryMappedCorpus {
    mmap: Arc<Mmap>,
    line_offsets: Vec<usize>,
}

impl MemoryMappedCorpus {
    /// Memory-maps the file at `path` and indexes its line offsets.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;

        let mmap = unsafe {
            MmapOptions::new()
                .map(&file)
                .map_err(|e| TextError::IoError(format!("Failed to memory map file: {e}")))?
        };

        let line_offsets = Self::build_line_index(&mmap);

        Ok(Self {
            mmap: Arc::new(mmap),
            line_offsets,
        })
    }

    /// Records the byte offset at which each line starts.
    fn build_line_index(mmap: &Mmap) -> Vec<usize> {
        let mut offsets = vec![0];
        let data = mmap.as_ref();

        for (i, &byte) in data.iter().enumerate() {
            if byte == b'\n' {
                offsets.push(i + 1);
            }
        }

        offsets
    }

    /// Number of newline-terminated documents in the corpus.
    pub fn num_documents(&self) -> usize {
        self.line_offsets.len().saturating_sub(1)
    }

    /// Returns document `index` as a string slice borrowed from the mapping.
    pub fn get_document(&self, index: usize) -> Result<&str> {
        if index >= self.num_documents() {
            return Err(TextError::InvalidInput(format!(
                "Document index {index} out of range"
            )));
        }

        let start = self.line_offsets[index];
        let end = if index + 1 < self.line_offsets.len() {
            // Exclude the trailing newline.
            self.line_offsets[index + 1].saturating_sub(1)
        } else {
            self.mmap.len()
        };

        let data = &self.mmap[start..end];
        std::str::from_utf8(data)
            .map_err(|e| TextError::IoError(format!("Invalid UTF-8 in document: {e}")))
    }

    /// Iterates over all documents in order.
    pub fn iter(&self) -> CorpusIterator {
        CorpusIterator {
            corpus: self,
            current: 0,
        }
    }

    /// Processes the corpus in chunks of `chunksize` documents in parallel.
    pub fn parallel_process<F, R>(&self, chunksize: usize, processor: F) -> Result<Vec<R>>
    where
        F: Fn(&[&str]) -> Result<R> + Send + Sync,
        R: Send,
    {
        use scirs2_core::parallel_ops::*;

        let num_docs = self.num_documents();
        let num_chunks = num_docs.div_ceil(chunksize);

        (0..num_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                let start = chunk_idx * chunksize;
                let end = ((chunk_idx + 1) * chunksize).min(num_docs);

                let mut docs = Vec::with_capacity(end - start);
                for i in start..end {
                    docs.push(self.get_document(i)?);
                }

                processor(&docs)
            })
            .collect()
    }
}

/// Iterator over the documents of a [`MemoryMappedCorpus`].
pub struct CorpusIterator<'a> {
    corpus: &'a MemoryMappedCorpus,
    current: usize,
}

impl<'a> Iterator for CorpusIterator<'a> {
    type Item = Result<&'a str>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current >= self.corpus.num_documents() {
            return None;
        }

        let doc = self.corpus.get_document(self.current);
        self.current += 1;
        Some(doc)
    }
}

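/// Line-by-line text processor that tokenizes and aggregates over a file
/// without materializing it in memory.
///
/// Example (illustrative sketch, not compiled as a doctest; the path is a
/// placeholder):
///
/// ```ignore
/// let processor = StreamingTextProcessor::with_default_tokenizer();
/// // Build a vocabulary from tokens that occur at least twice.
/// let vocab = processor.build_vocabulary_streaming("corpus.txt", 2)?;
/// ```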
pub struct StreamingTextProcessor<T: Tokenizer> {
    tokenizer: T,
    buffer_size: usize,
}

impl<T: Tokenizer> StreamingTextProcessor<T> {
    pub fn new(tokenizer: T) -> Self {
        Self {
            tokenizer,
            buffer_size: 1024 * 1024, // 1 MB read buffer by default
        }
    }

    /// Sets the read buffer size in bytes.
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Applies `processor` to every line of the file at `path`, collecting
    /// the values it returns.
    pub fn process_lines<P, F, R>(&self, path: P, processor: F) -> Result<Vec<R>>
    where
        P: AsRef<Path>,
        F: FnMut(&str, usize) -> Result<Option<R>>,
    {
        let file = File::open(path)
            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;

        let reader = BufReader::with_capacity(self.buffer_size, file);
        self.process_reader_lines(reader, processor)
    }

    /// Applies `processor` to every line read from `reader`.
    pub fn process_reader_lines<R: BufRead, F, U>(
        &self,
        reader: R,
        mut processor: F,
    ) -> Result<Vec<U>>
    where
        F: FnMut(&str, usize) -> Result<Option<U>>,
    {
        let mut results = Vec::new();

        for (line_num, line_result) in reader.lines().enumerate() {
            let line =
                line_result.map_err(|e| TextError::IoError(format!("Error reading line: {e}")))?;

            if let Some(result) = processor(&line, line_num)? {
                results.push(result);
            }
        }

        Ok(results)
    }

    /// Builds a vocabulary from the file at `path`, keeping only tokens that
    /// occur at least `min_count` times.
    pub fn build_vocabulary_streaming<P: AsRef<Path>>(
        &self,
        path: P,
        min_count: usize,
    ) -> Result<Vocabulary> {
        let mut token_counts = HashMap::<String, usize>::new();

        self.process_lines(&path, |line, _line_num| {
            let tokens = self.tokenizer.tokenize(line)?;
            for token in tokens {
                *token_counts.entry(token).or_insert(0) += 1;
            }
            Ok(None::<()>)
        })?;

        let mut vocab = Vocabulary::new();
        for (token, count) in &token_counts {
            if *count >= min_count {
                vocab.add_token(token);
            }
        }

        Ok(vocab)
    }
}

impl StreamingTextProcessor<WordTokenizer> {
    /// Creates a processor backed by the default [`WordTokenizer`].
    pub fn with_default_tokenizer() -> Self {
        Self::new(WordTokenizer::default())
    }
}

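/// Converts a text file into a sparse count matrix one line at a time, using
/// a fixed vocabulary.
///
/// Example (illustrative sketch, not compiled as a doctest; `vocab` is a
/// previously built [`Vocabulary`] and the path is a placeholder):
///
/// ```ignore
/// let vectorizer = StreamingVectorizer::new(vocab);
/// let matrix = vectorizer.transform_streaming("corpus.txt", &WordTokenizer::default())?;
/// ```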
pub struct StreamingVectorizer {
    vocabulary: Vocabulary,
    chunksize: usize,
}

impl StreamingVectorizer {
    pub fn new(vocabulary: Vocabulary) -> Self {
        Self {
            vocabulary,
            chunksize: 1000, // default number of documents per chunk
        }
    }

    /// Sets the number of documents per processing chunk.
    pub fn with_chunksize(mut self, size: usize) -> Self {
        self.chunksize = size;
        self
    }

    /// Vectorizes the file at `path` line by line into a sparse count matrix.
    pub fn transform_streaming<P, T>(&self, path: P, tokenizer: &T) -> Result<CsrMatrix>
    where
        P: AsRef<Path>,
        T: Tokenizer,
    {
        let mut builder = SparseMatrixBuilder::new(self.vocabulary.len());

        let file = std::fs::File::open(path)
            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;
        let reader = std::io::BufReader::new(file);

        for line in reader.lines() {
            let line = line.map_err(|e| TextError::IoError(format!("Error reading line: {e}")))?;
            let tokens = tokenizer.tokenize(&line)?;
            let sparse_vec = self.tokens_to_sparse_vector(&tokens)?;
            builder.add_row(sparse_vec)?;
        }

        Ok(builder.build())
    }

    /// Converts a token sequence into a sparse term-count vector over the
    /// vocabulary; out-of-vocabulary tokens are ignored.
    fn tokens_to_sparse_vector(&self, tokens: &[String]) -> Result<SparseVector> {
        let mut counts = std::collections::HashMap::new();

        for token in tokens {
            if let Some(idx) = self.vocabulary.get_index(token) {
                *counts.entry(idx).or_insert(0.0) += 1.0;
            }
        }

        let mut indices: Vec<usize> = counts.keys().copied().collect();
        indices.sort_unstable();

        let values: Vec<f64> = indices.iter().map(|&idx| counts[&idx]).collect();

        let sparse_vec = SparseVector::fromindices_values(indices, values, self.vocabulary.len());

        Ok(sparse_vec)
    }
}

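/// Reads a large file in fixed-size byte chunks, yielding only complete lines
/// from each chunk.
///
/// Example (illustrative sketch, not compiled as a doctest; the path is a
/// placeholder):
///
/// ```ignore
/// let mut reader = ChunkedCorpusReader::new("corpus.txt", 64 * 1024)?;
/// while let Some(lines) = reader.next_chunk()? {
///     for line in &lines {
///         // process each complete line
///     }
/// }
/// ```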
pub struct ChunkedCorpusReader {
    file: File,
    chunksize: usize,
    position: u64,
    file_size: u64,
}

impl ChunkedCorpusReader {
    pub fn new<P: AsRef<Path>>(path: P, chunksize: usize) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;

        let file_size = file
            .metadata()
            .map_err(|e| TextError::IoError(format!("Failed to get file metadata: {e}")))?
            .len();

        Ok(Self {
            file,
            chunksize,
            position: 0,
            file_size,
        })
    }

    /// Reads the next chunk and returns the complete lines it contains, or
    /// `None` when the end of the file has been reached.
    pub fn next_chunk(&mut self) -> Result<Option<Vec<String>>> {
        if self.position >= self.file_size {
            return Ok(None);
        }

        self.file
            .seek(SeekFrom::Start(self.position))
            .map_err(|e| TextError::IoError(format!("Failed to seek: {e}")))?;

        let mut buffer = vec![0u8; self.chunksize];
        let bytes_read = self
            .file
            .read(&mut buffer)
            .map_err(|e| TextError::IoError(format!("Failed to read chunk: {e}")))?;

        if bytes_read == 0 {
            return Ok(None);
        }

        buffer.truncate(bytes_read);

        // Cut the chunk at the last complete line so no line is split in two.
        let last_newline = buffer.iter().rposition(|&b| b == b'\n');

        let chunk_end = if let Some(pos) = last_newline {
            pos + 1
        } else if self.position + bytes_read as u64 >= self.file_size {
            bytes_read
        } else {
            return Err(TextError::IoError(
                "Chunk size too small to contain a complete line".into(),
            ));
        };

        let chunk_str = std::str::from_utf8(&buffer[..chunk_end])
            .map_err(|e| TextError::IoError(format!("Invalid UTF-8: {e}")))?;

        let lines: Vec<String> = chunk_str.lines().map(|s| s.to_string()).collect();

        self.position += chunk_end as u64;

        Ok(Some(lines))
    }

    /// Rewinds the reader to the beginning of the file.
    pub fn reset(&mut self) -> Result<()> {
        self.position = 0;
        self.file
            .seek(SeekFrom::Start(0))
            .map_err(|e| TextError::IoError(format!("Failed to seek: {e}")))?;
        Ok(())
    }
}

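/// A corpus spanning several memory-mapped files, addressed by a single
/// global document index.
///
/// Example (illustrative sketch, not compiled as a doctest; paths are
/// placeholders):
///
/// ```ignore
/// let corpus = MultiFileCorpus::from_files(&["part1.txt", "part2.txt"])?;
/// let sample = corpus.random_sample(100, 42)?;
/// ```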
pub struct MultiFileCorpus {
    files: Vec<MemoryMappedCorpus>,
    file_boundaries: Vec<usize>,
    total_documents: usize,
}

impl MultiFileCorpus {
    pub fn from_files<P: AsRef<Path>>(paths: &[P]) -> Result<Self> {
        let mut files = Vec::new();
        let mut file_boundaries = vec![0];
        let mut total_documents = 0;

        for path in paths {
            let corpus = MemoryMappedCorpus::from_file(path)?;
            let doc_count = corpus.num_documents();
            total_documents += doc_count;

            files.push(corpus);
            file_boundaries.push(total_documents);
        }

        Ok(Self {
            files,
            file_boundaries,
            total_documents,
        })
    }

    pub fn num_documents(&self) -> usize {
        self.total_documents
    }

    /// Returns the document at `globalindex`, resolving it to the file that
    /// contains it.
    pub fn get_document(&self, globalindex: usize) -> Result<&str> {
        if globalindex >= self.total_documents {
            return Err(TextError::InvalidInput(format!(
                "Document index {globalindex} out of range"
            )));
        }

        // `file_boundaries[i]` is the global index of the first document in
        // file `i`, so the containing file is the last boundary <= globalindex.
        let file_idx = match self.file_boundaries.binary_search(&(globalindex + 1)) {
            Ok(idx) | Err(idx) => idx.saturating_sub(1),
        };

        let local_index = globalindex.saturating_sub(self.file_boundaries[file_idx]);
        self.files[file_idx].get_document(local_index)
    }

    /// Iterates over all documents across all files.
    pub fn iter(&self) -> MultiFileIterator {
        MultiFileIterator {
            corpus: self,
            current: 0,
        }
    }

    /// Draws `samplesize` distinct documents pseudo-randomly, reproducibly
    /// for a given `seed`.
    pub fn random_sample(&self, samplesize: usize, seed: u64) -> Result<Vec<&str>> {
        use std::collections::HashSet;

        if samplesize > self.total_documents {
            return Err(TextError::InvalidInput(
                "Sample size larger than corpus size".into(),
            ));
        }

        // Simple linear congruential generator keeps sampling deterministic.
        let mut rng = seed;
        let mut selected = HashSet::new();
        let mut samples = Vec::new();

        while samples.len() < samplesize {
            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
            let index = (rng % self.total_documents as u64) as usize;

            if selected.insert(index) {
                samples.push(self.get_document(index)?);
            }
        }

        Ok(samples)
    }
}

/// Iterator over the documents of a [`MultiFileCorpus`].
pub struct MultiFileIterator<'a> {
    corpus: &'a MultiFileCorpus,
    current: usize,
}

impl<'a> Iterator for MultiFileIterator<'a> {
    type Item = Result<&'a str>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current >= self.corpus.num_documents() {
            return None;
        }

        let doc = self.corpus.get_document(self.current);
        self.current += 1;
        Some(doc)
    }
}

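/// Wraps a [`MemoryMappedCorpus`] with a small LRU cache of owned document
/// strings for repeated lookups.
///
/// Example (illustrative sketch, not compiled as a doctest; `corpus` is a
/// previously opened [`MemoryMappedCorpus`]):
///
/// ```ignore
/// let mut cached = CachedCorpus::new(corpus, 128);
/// let doc = cached.get_document(0)?;       // miss: read from the mmap
/// let doc_again = cached.get_document(0)?; // hit: served from the cache
/// ```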
pub struct CachedCorpus {
    corpus: MemoryMappedCorpus,
    cache: std::collections::HashMap<usize, String>,
    access_order: std::collections::VecDeque<usize>,
    cache_size: usize,
    hits: usize,
    accesses: usize,
}

impl CachedCorpus {
    pub fn new(corpus: MemoryMappedCorpus, cachesize: usize) -> Self {
        Self {
            corpus,
            cache: std::collections::HashMap::new(),
            access_order: std::collections::VecDeque::new(),
            cache_size: cachesize,
            hits: 0,
            accesses: 0,
        }
    }

    /// Returns the document at `index`, serving repeated lookups from an LRU
    /// cache of owned strings.
    pub fn get_document(&mut self, index: usize) -> Result<String> {
        self.accesses += 1;

        if let Some(doc) = self.cache.get(&index) {
            self.hits += 1;
            // Move the entry to the front of the LRU order.
            if let Some(pos) = self.access_order.iter().position(|&x| x == index) {
                self.access_order.remove(pos);
            }
            self.access_order.push_front(index);
            return Ok(doc.clone());
        }

        let doc = self.corpus.get_document(index)?.to_string();

        // Evict the least recently used entry if the cache is full.
        if self.cache.len() >= self.cache_size {
            if let Some(lru_index) = self.access_order.pop_back() {
                self.cache.remove(&lru_index);
            }
        }

        let doc_clone = doc.clone();
        self.cache.insert(index, doc);
        self.access_order.push_front(index);

        Ok(doc_clone)
    }

    /// Fraction of `get_document` calls served from the cache.
    pub fn cache_hit_rate(&self) -> f64 {
        if self.accesses == 0 {
            0.0
        } else {
            self.hits as f64 / self.accesses as f64
        }
    }
}

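/// Inverted index from words to the documents that contain them.
///
/// Example (illustrative sketch, not compiled as a doctest; `corpus` is a
/// previously opened [`MemoryMappedCorpus`]):
///
/// ```ignore
/// let index = CorpusIndex::build(&corpus, &WordTokenizer::default())?;
/// if let Some(docs) = index.find_documents_with_word("fox") {
///     println!("'fox' appears in {} documents", docs.len());
/// }
/// let both = index.find_documents_with_all_words(&["quick", "brown"]);
/// ```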
pub struct CorpusIndex {
    word_to_docs: std::collections::HashMap<String, Vec<usize>>,
    #[allow(dead_code)]
    doc_to_words: Vec<std::collections::HashSet<String>>,
}

impl CorpusIndex {
    /// Builds the index by tokenizing every document in the corpus.
    pub fn build<T: Tokenizer>(corpus: &MemoryMappedCorpus, tokenizer: &T) -> Result<Self> {
        let mut word_to_docs = std::collections::HashMap::new();
        let mut doc_to_words = Vec::new();

        for doc_idx in 0..corpus.num_documents() {
            let doc = corpus.get_document(doc_idx)?;
            let tokens = tokenizer.tokenize(doc)?;
            let unique_tokens: std::collections::HashSet<String> = tokens.into_iter().collect();

            for token in &unique_tokens {
                word_to_docs
                    .entry(token.clone())
                    .or_insert_with(Vec::new)
                    .push(doc_idx);
            }

            doc_to_words.push(unique_tokens);
        }

        Ok(Self {
            word_to_docs,
            doc_to_words,
        })
    }

    /// Returns the documents containing `word`, or `None` if it never occurs.
    pub fn find_documents_with_word(&self, word: &str) -> Option<&[usize]> {
        self.word_to_docs.get(word).map(|v| v.as_slice())
    }

    /// Returns the documents that contain every word in `words`.
    pub fn find_documents_with_all_words(&self, words: &[&str]) -> Vec<usize> {
        if words.is_empty() {
            return Vec::new();
        }

        let mut result: Option<std::collections::HashSet<usize>> = None;

        for &word in words {
            if let Some(docs) = self.word_to_docs.get(word) {
                let doc_set: std::collections::HashSet<usize> = docs.iter().copied().collect();

                result = match result {
                    None => Some(doc_set),
                    Some(mut existing) => {
                        existing.retain(|doc| doc_set.contains(doc));
                        Some(existing)
                    }
                };
            } else {
                // A word that appears nowhere makes the intersection empty.
                return Vec::new();
            }
        }

        result
            .map(|set| set.into_iter().collect())
            .unwrap_or_default()
    }

    /// Number of distinct words in the index.
    pub fn vocabulary_size(&self) -> usize {
        self.word_to_docs.len()
    }
}

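/// Manual bookkeeping of allocated byte counts with an optional warning
/// threshold; it does not measure real process memory.
///
/// Example (illustrative sketch, not compiled as a doctest):
///
/// ```ignore
/// let mut monitor = MemoryMonitor::new().with_warning_threshold(512 * 1024 * 1024);
/// monitor.allocate(1024);
/// monitor.deallocate(1024);
/// assert_eq!(monitor.peak_usage(), 1024);
/// ```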
pub struct MemoryMonitor {
    peak_usage: usize,
    current_usage: usize,
    warnings_enabled: bool,
    warning_threshold: usize,
}

impl MemoryMonitor {
    pub fn new() -> Self {
        Self {
            peak_usage: 0,
            current_usage: 0,
            warnings_enabled: true,
            warning_threshold: 1024 * 1024 * 1024, // warn above 1 GB by default
        }
    }

    /// Sets the usage level (in bytes) above which a warning is printed.
    pub fn with_warning_threshold(mut self, threshold: usize) -> Self {
        self.warning_threshold = threshold;
        self
    }

    /// Records an allocation of `size` bytes.
    pub fn allocate(&mut self, size: usize) {
        self.current_usage += size;
        self.peak_usage = self.peak_usage.max(self.current_usage);

        if self.warnings_enabled && self.current_usage > self.warning_threshold {
            eprintln!(
                "Memory warning: Current usage {} MB exceeds threshold {} MB",
                self.current_usage / (1024 * 1024),
                self.warning_threshold / (1024 * 1024)
            );
        }
    }

    /// Records a deallocation of `size` bytes.
    pub fn deallocate(&mut self, size: usize) {
        self.current_usage = self.current_usage.saturating_sub(size);
    }

    pub fn current_usage(&self) -> usize {
        self.current_usage
    }

    pub fn peak_usage(&self) -> usize {
        self.peak_usage
    }

    /// Clears both the current and peak usage counters.
    pub fn reset(&mut self) {
        self.peak_usage = 0;
        self.current_usage = 0;
    }
}

impl Default for MemoryMonitor {
    fn default() -> Self {
        Self::new()
    }
}

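/// Parallel corpus processor that combines a tokenizer with chunked,
/// multi-threaded document processing and memory bookkeeping.
///
/// Example (illustrative sketch, not compiled as a doctest; `corpus` is a
/// previously opened [`MemoryMappedCorpus`]):
///
/// ```ignore
/// let mut processor = AdvancedStreamingProcessor::new(WordTokenizer::default());
/// let lengths = processor.process_corpus_parallel(&corpus, |doc, _idx| Ok(doc.len()))?;
/// let stats = processor.build_corpus_statistics(&corpus)?;
/// ```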
pub struct AdvancedStreamingProcessor<T: Tokenizer> {
    tokenizer: T,
    buffer_size: usize,
    parallel_chunks: usize,
    memory_monitor: MemoryMonitor,
}

impl<T: Tokenizer + Send + Sync> AdvancedStreamingProcessor<T> {
    pub fn new(tokenizer: T) -> Self {
        Self {
            tokenizer,
            buffer_size: 1024 * 1024, // 1 MB default buffer
            parallel_chunks: num_cpus::get(),
            memory_monitor: MemoryMonitor::new(),
        }
    }

    /// Overrides the number of parallel chunks and the buffer size.
    pub fn with_parallelism(mut self, chunks: usize, buffersize: usize) -> Self {
        self.parallel_chunks = chunks;
        self.buffer_size = buffersize;
        self
    }

    /// Applies `processor` to every document in parallel, preserving the
    /// original document order in the returned vector.
    pub fn process_corpus_parallel<F, R>(
        &mut self,
        corpus: &MemoryMappedCorpus,
        processor: F,
    ) -> Result<Vec<R>>
    where
        F: Fn(&str, usize) -> Result<R> + Send + Sync,
        R: Send,
    {
        use scirs2_core::parallel_ops::*;

        let num_docs = corpus.num_documents();
        let chunksize = num_docs.div_ceil(self.parallel_chunks);

        // Rough bookkeeping estimate: assume ~100 bytes of overhead per document.
        let estimated_memory = num_docs * 100;
        self.memory_monitor.allocate(estimated_memory);

        let results: Vec<R> = (0..self.parallel_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                let start = chunk_idx * chunksize;
                let end = ((chunk_idx + 1) * chunksize).min(num_docs);

                let mut chunk_results = Vec::new();
                for doc_idx in start..end {
                    let doc = corpus.get_document(doc_idx)?;
                    let result = processor(doc, doc_idx)?;
                    chunk_results.push(result);
                }
                Ok(chunk_results)
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect();

        self.memory_monitor.deallocate(estimated_memory);
        Ok(results)
    }

    /// Computes aggregate corpus statistics using parallel tokenization.
    pub fn build_corpus_statistics(
        &mut self,
        corpus: &MemoryMappedCorpus,
    ) -> Result<CorpusStatistics> {
        let mut stats = CorpusStatistics::new();

        let tokenizer = self.tokenizer.clone_box();

        self.process_corpus_parallel(corpus, move |doc, _doc_idx| {
            let tokens = tokenizer.tokenize(doc)?;
            let char_count = doc.chars().count();
            let word_count = tokens.len();
            let line_count = doc.lines().count();

            Ok(DocumentStats {
                char_count,
                word_count,
                line_count,
                unique_words: tokens
                    .into_iter()
                    .collect::<std::collections::HashSet<_>>()
                    .len(),
            })
        })?
        .into_iter()
        .for_each(|doc_stats| stats.add_document(doc_stats));

        Ok(stats)
    }

    /// Returns `(current, peak)` memory usage recorded by the internal monitor.
    pub fn memory_stats(&self) -> (usize, usize) {
        (
            self.memory_monitor.current_usage(),
            self.memory_monitor.peak_usage(),
        )
    }
}

/// Aggregate statistics over an entire corpus.
#[derive(Debug, Clone)]
pub struct CorpusStatistics {
    pub total_documents: usize,
    pub total_words: usize,
    pub total_chars: usize,
    pub total_lines: usize,
    /// Sum of per-document unique word counts (an upper bound on the true
    /// vocabulary size, since a word shared across documents is counted once
    /// per document).
    pub vocabulary_size: usize,
    pub avg_doc_length: f64,
    pub avg_words_per_line: f64,
}

impl CorpusStatistics {
    pub fn new() -> Self {
        Self {
            total_documents: 0,
            total_words: 0,
            total_chars: 0,
            total_lines: 0,
            vocabulary_size: 0,
            avg_doc_length: 0.0,
            avg_words_per_line: 0.0,
        }
    }

    /// Folds one document's statistics into the running totals and averages.
    pub fn add_document(&mut self, docstats: DocumentStats) {
        self.total_documents += 1;
        self.total_words += docstats.word_count;
        self.total_chars += docstats.char_count;
        self.total_lines += docstats.line_count;
        self.vocabulary_size += docstats.unique_words;

        self.avg_doc_length = self.total_words as f64 / self.total_documents as f64;
        self.avg_words_per_line = if self.total_lines > 0 {
            self.total_words as f64 / self.total_lines as f64
        } else {
            0.0
        };
    }
}

impl Default for CorpusStatistics {
    fn default() -> Self {
        Self::new()
    }
}

/// Per-document counts produced while building [`CorpusStatistics`].
#[derive(Debug, Clone)]
pub struct DocumentStats {
    pub char_count: usize,
    pub word_count: usize,
    pub line_count: usize,
    pub unique_words: usize,
}

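/// Simple progress reporter that prints a percentage roughly every 1% of the
/// total work.
///
/// Example (illustrative sketch, not compiled as a doctest):
///
/// ```ignore
/// let mut progress = ProgressTracker::new(10_000);
/// for _item in 0..10_000 {
///     progress.update(1);
/// }
/// assert!(progress.is_complete());
/// ```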
pub struct ProgressTracker {
    total: usize,
    current: usize,
    report_interval: usize,
}

impl ProgressTracker {
    pub fn new(total: usize) -> Self {
        Self {
            total,
            current: 0,
            // Report roughly every 1% of the total; never use a zero interval
            // for very small totals.
            report_interval: (total / 100).max(1),
        }
    }

    /// Advances progress by `count` items and prints a report when a
    /// reporting boundary is crossed.
    pub fn update(&mut self, count: usize) {
        self.current += count;

        if self.current.is_multiple_of(self.report_interval) || self.current >= self.total {
            let percentage = (self.current as f64 / self.total as f64) * 100.0;
            println!(
                "Progress: {:.1}% ({}/{})",
                percentage, self.current, self.total
            );
        }
    }

    pub fn is_complete(&self) -> bool {
        self.current >= self.total
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_memory_mapped_corpus() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "First document").unwrap();
        writeln!(file, "Second document").unwrap();
        writeln!(file, "Third document").unwrap();
        file.flush().unwrap();

        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();

        assert_eq!(corpus.num_documents(), 3);
        assert_eq!(corpus.get_document(0).unwrap(), "First document");
        assert_eq!(corpus.get_document(1).unwrap(), "Second document");
        assert_eq!(corpus.get_document(2).unwrap(), "Third document");
    }

    #[test]
    fn test_streaming_processor() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "hello world").unwrap();
        writeln!(file, "foo bar baz").unwrap();
        file.flush().unwrap();

        let processor = StreamingTextProcessor::with_default_tokenizer();
        let mut line_count = 0;

        processor
            .process_lines(file.path(), |_line, _line_num| {
                line_count += 1;
                Ok(None::<()>)
            })
            .unwrap();

        assert_eq!(line_count, 2);
    }

    #[test]
    fn test_chunked_reader() {
        let mut file = NamedTempFile::new().unwrap();
        for i in 0..100 {
            writeln!(file, "Line {i}").unwrap();
        }
        file.flush().unwrap();

        let mut reader = ChunkedCorpusReader::new(file.path(), 256).unwrap();
        let mut total_lines = 0;

        while let Some(lines) = reader.next_chunk().unwrap() {
            total_lines += lines.len();
        }

        assert_eq!(total_lines, 100);
    }

    #[test]
    fn test_streaming_vocabulary_building() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "the quick brown fox").unwrap();
        writeln!(file, "the lazy dog").unwrap();
        writeln!(file, "the brown dog").unwrap();
        file.flush().unwrap();

        let processor = StreamingTextProcessor::with_default_tokenizer();
        let vocab = processor
            .build_vocabulary_streaming(file.path(), 2)
            .unwrap();

        // Tokens occurring at least twice are kept.
        assert!(vocab.get_index("the").is_some());
        assert!(vocab.get_index("brown").is_some());
        assert!(vocab.get_index("dog").is_some());

        // Tokens occurring only once are dropped.
        assert!(vocab.get_index("quick").is_none());
        assert!(vocab.get_index("fox").is_none());
        assert!(vocab.get_index("lazy").is_none());
    }

    #[test]
    fn test_multi_file_corpus() {
        let mut file1 = NamedTempFile::new().unwrap();
        writeln!(file1, "Document 1 line 1").unwrap();
        writeln!(file1, "Document 1 line 2").unwrap();
        file1.flush().unwrap();

        let mut file2 = NamedTempFile::new().unwrap();
        writeln!(file2, "Document 2 line 1").unwrap();
        writeln!(file2, "Document 2 line 2").unwrap();
        writeln!(file2, "Document 2 line 3").unwrap();
        file2.flush().unwrap();

        let paths = vec![file1.path(), file2.path()];
        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();

        assert_eq!(multi_corpus.num_documents(), 5);
        assert_eq!(multi_corpus.get_document(0).unwrap(), "Document 1 line 1");
        assert_eq!(multi_corpus.get_document(2).unwrap(), "Document 2 line 1");
        assert_eq!(multi_corpus.get_document(4).unwrap(), "Document 2 line 3");
    }

    #[test]
    fn test_multi_file_random_sampling() {
        let mut file1 = NamedTempFile::new().unwrap();
        for i in 0..10 {
            writeln!(file1, "File1 Doc {i}").unwrap();
        }
        file1.flush().unwrap();

        let mut file2 = NamedTempFile::new().unwrap();
        for i in 0..10 {
            writeln!(file2, "File2 Doc {i}").unwrap();
        }
        file2.flush().unwrap();

        let paths = vec![file1.path(), file2.path()];
        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();

        let sample = multi_corpus.random_sample(5, 12345).unwrap();
        assert_eq!(sample.len(), 5);

        // The same seed must produce the same sample.
        let sample2 = multi_corpus.random_sample(5, 12345).unwrap();
        assert_eq!(sample, sample2);
    }

    #[test]
    fn test_cached_corpus() {
        let mut file = NamedTempFile::new().unwrap();
        for i in 0..10 {
            writeln!(file, "Document {i}").unwrap();
        }
        file.flush().unwrap();

        let base_corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
        let mut cached_corpus = CachedCorpus::new(base_corpus, 3);

        let doc0 = cached_corpus.get_document(0).unwrap();
        let doc1 = cached_corpus.get_document(1).unwrap();
        let doc2 = cached_corpus.get_document(2).unwrap();

        assert_eq!(doc0, "Document 0");
        assert_eq!(doc1, "Document 1");
        assert_eq!(doc2, "Document 2");

        // Accessing a cached document again should count as a cache hit.
        let doc0_again = cached_corpus.get_document(0).unwrap();
        assert_eq!(doc0_again, "Document 0");

        let hit_rate = cached_corpus.cache_hit_rate();
        assert!(hit_rate > 0.0);
    }

    #[test]
    fn test_corpus_index() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "the quick brown fox").unwrap();
        writeln!(file, "the lazy dog").unwrap();
        writeln!(file, "quick brown animal").unwrap();
        file.flush().unwrap();

        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
        let tokenizer = WordTokenizer::default();
        let index = CorpusIndex::build(&corpus, &tokenizer).unwrap();

        let docs_with_quick = index.find_documents_with_word("quick").unwrap();
        assert_eq!(docs_with_quick.len(), 2);

        let docs_with_all = index.find_documents_with_all_words(&["the", "brown"]);
        assert_eq!(docs_with_all.len(), 1);

        assert!(index.vocabulary_size() > 0);
    }

    #[test]
    fn test_memory_monitor() {
        let mut monitor = MemoryMonitor::new().with_warning_threshold(1000);

        assert_eq!(monitor.current_usage(), 0);
        assert_eq!(monitor.peak_usage(), 0);

        monitor.allocate(500);
        assert_eq!(monitor.current_usage(), 500);
        assert_eq!(monitor.peak_usage(), 500);

        monitor.allocate(300);
        assert_eq!(monitor.current_usage(), 800);
        assert_eq!(monitor.peak_usage(), 800);

        monitor.deallocate(200);
        assert_eq!(monitor.current_usage(), 600);
        // Peak usage is retained after deallocation.
        assert_eq!(monitor.peak_usage(), 800);

        monitor.reset();
        assert_eq!(monitor.current_usage(), 0);
        assert_eq!(monitor.peak_usage(), 0);
    }

    #[test]
    fn test_advanced_streaming_processor() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "hello world").unwrap();
        writeln!(file, "foo bar baz").unwrap();
        writeln!(file, "test document").unwrap();
        file.flush().unwrap();

        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
        let tokenizer = WordTokenizer::default();
        let mut processor = AdvancedStreamingProcessor::new(tokenizer);

        let results = processor
            .process_corpus_parallel(&corpus, |doc, idx| {
                let doc_len = doc.len();
                Ok(format!("Processed doc {idx}: {doc_len}"))
            })
            .unwrap();

        assert_eq!(results.len(), 3);
        assert!(results[0].contains("Processed doc 0"));
        assert!(results[1].contains("Processed doc 1"));
        assert!(results[2].contains("Processed doc 2"));

        // All estimated memory should be released once processing completes.
        let (current, _peak) = processor.memory_stats();
        assert_eq!(current, 0);
    }

    #[test]
    fn test_corpus_statistics() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "hello world test").unwrap();
        writeln!(file, "foo bar").unwrap();
        writeln!(file, "single").unwrap();
        file.flush().unwrap();

        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
        let tokenizer = WordTokenizer::default();
        let mut processor = AdvancedStreamingProcessor::new(tokenizer);

        let stats = processor.build_corpus_statistics(&corpus).unwrap();

        assert_eq!(stats.total_documents, 3);
        assert_eq!(stats.total_words, 6);
        assert!(stats.avg_doc_length > 0.0);
        assert_eq!(stats.total_lines, 3);
    }

    #[test]
    fn test_document_stats() {
        let mut stats = CorpusStatistics::new();

        let doc_stats1 = DocumentStats {
            char_count: 100,
            word_count: 20,
            line_count: 5,
            unique_words: 15,
        };

        let doc_stats2 = DocumentStats {
            char_count: 50,
            word_count: 10,
            line_count: 2,
            unique_words: 8,
        };

        stats.add_document(doc_stats1);
        stats.add_document(doc_stats2);

        assert_eq!(stats.total_documents, 2);
        assert_eq!(stats.total_words, 30);
        assert_eq!(stats.total_chars, 150);
        assert_eq!(stats.total_lines, 7);
        assert_eq!(stats.avg_doc_length, 15.0);
    }

    #[test]
    fn test_corpus_index_edge_cases() {
        let mut file = NamedTempFile::new().unwrap();
        // First document is an empty line.
        writeln!(file).unwrap();
        writeln!(file, "single").unwrap();
        file.flush().unwrap();

        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
        let tokenizer = WordTokenizer::default();
        let index = CorpusIndex::build(&corpus, &tokenizer).unwrap();

        assert!(index.find_documents_with_word("nonexistent").is_none());

        let empty_result = index.find_documents_with_all_words(&[]);
        assert!(empty_result.is_empty());

        let no_match = index.find_documents_with_all_words(&["nonexistent"]);
        assert!(no_match.is_empty());
    }

    #[test]
    fn test_multi_file_iterator() {
        let mut file1 = NamedTempFile::new().unwrap();
        writeln!(file1, "doc1").unwrap();
        writeln!(file1, "doc2").unwrap();
        file1.flush().unwrap();

        let mut file2 = NamedTempFile::new().unwrap();
        writeln!(file2, "doc3").unwrap();
        file2.flush().unwrap();

        let paths = vec![file1.path(), file2.path()];
        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();

        let docs: Result<Vec<_>> = multi_corpus.iter().collect();
        let docs = docs.unwrap();

        assert_eq!(docs.len(), 3);
        assert_eq!(docs[0], "doc1");
        assert_eq!(docs[1], "doc2");
        assert_eq!(docs[2], "doc3");
    }
}