scirs2_text/
streaming.rs

1//! Memory-efficient streaming and memory-mapped text processing
2//!
3//! This module provides utilities for processing large text corpora that don't fit in memory
4//! using streaming and memory-mapped file techniques.
5
6use crate::error::{Result, TextError};
7use crate::sparse::{CsrMatrix, SparseMatrixBuilder, SparseVector};
8use crate::tokenize::{Tokenizer, WordTokenizer};
9use crate::vocabulary::Vocabulary;
10use memmap2::{Mmap, MmapOptions};
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
14use std::path::Path;
15use std::sync::Arc;
16use std::time::Duration;
17
18/// Advanced-advanced streaming metrics for performance monitoring
19#[derive(Debug, Clone, Default)]
20pub struct AdvancedStreamingMetrics {
21    /// Total documents processed
22    pub documents_processed: usize,
23    /// Total processing time
24    pub total_processing_time: Duration,
25    /// Peak memory usage in bytes
26    pub peak_memory_usage: usize,
27    /// Current throughput (documents per second)
28    pub throughput: f64,
29    /// Cache hit rate
30    pub cache_hit_rate: f64,
31    /// Memory efficiency score
32    pub memory_efficiency: f64,
33}
34
35/// Memory usage tracking for streaming operations
36#[derive(Debug, Default)]
37#[allow(dead_code)]
38struct MemoryUsageTracker {
39    current_usage: usize,
40    peak_usage: usize,
41}
42
43impl MemoryUsageTracker {
44    #[allow(dead_code)]
45    fn update_usage(&mut self, current: usize) {
46        self.current_usage = current;
47        if current > self.peak_usage {
48            self.peak_usage = current;
49        }
50    }
51}
52
53/// Memory-mapped corpus for efficient large file processing
54pub struct MemoryMappedCorpus {
55    mmap: Arc<Mmap>,
56    line_offsets: Vec<usize>,
57}
58
59impl MemoryMappedCorpus {
60    /// Create a new memory-mapped corpus from a file
61    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
62        let file = File::open(path)
63            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;
64
65        let mmap = unsafe {
66            MmapOptions::new()
67                .map(&file)
68                .map_err(|e| TextError::IoError(format!("Failed to memory map file: {e}")))?
69        };
70
71        // Build line offset index
72        let line_offsets = Self::build_line_index(&mmap);
73
74        Ok(Self {
75            mmap: Arc::new(mmap),
76            line_offsets,
77        })
78    }
79
80    /// Build an index of line offsets for fast access
81    fn build_line_index(mmap: &Mmap) -> Vec<usize> {
82        let mut offsets = vec![0];
83        let data = mmap.as_ref();
84
85        for (i, &byte) in data.iter().enumerate() {
86            if byte == b'\n' {
87                offsets.push(i + 1);
88            }
89        }
90
91        offsets
92    }
93
94    /// Get the number of documents (lines) in the corpus
95    pub fn num_documents(&self) -> usize {
96        self.line_offsets.len().saturating_sub(1)
97    }
98
99    /// Get a specific document by index
100    pub fn get_document(&self, index: usize) -> Result<&str> {
101        if index >= self.num_documents() {
102            return Err(TextError::InvalidInput(format!(
103                "Document index {index} out of range"
104            )));
105        }
106
107        let start = self.line_offsets[index];
108        let end = if index + 1 < self.line_offsets.len() {
109            self.line_offsets[index + 1].saturating_sub(1) // Remove newline
110        } else {
111            self.mmap.len()
112        };
113
114        let data = &self.mmap[start..end];
115        std::str::from_utf8(data)
116            .map_err(|e| TextError::IoError(format!("Invalid UTF-8 in document: {e}")))
117    }
118
119    /// Iterate over all documents
120    pub fn iter(&self) -> CorpusIterator {
121        CorpusIterator {
122            corpus: self,
123            current: 0,
124        }
125    }
126
127    /// Process documents in parallel chunks
128    pub fn parallel_process<F, R>(&self, chunksize: usize, processor: F) -> Result<Vec<R>>
129    where
130        F: Fn(&[&str]) -> Result<R> + Send + Sync,
131        R: Send,
132    {
133        use scirs2_core::parallel_ops::*;
134
135        let num_docs = self.num_documents();
136        let num_chunks = num_docs.div_ceil(chunksize);
137
138        (0..num_chunks)
139            .into_par_iter()
140            .map(|chunk_idx| {
141                let start = chunk_idx * chunksize;
142                let end = ((chunk_idx + 1) * chunksize).min(num_docs);
143
144                let mut docs = Vec::with_capacity(end - start);
145                for i in start..end {
146                    docs.push(self.get_document(i)?);
147                }
148
149                processor(&docs)
150            })
151            .collect()
152    }
153}
154
155/// Iterator over documents in a memory-mapped corpus
156pub struct CorpusIterator<'a> {
157    corpus: &'a MemoryMappedCorpus,
158    current: usize,
159}
160
161impl<'a> Iterator for CorpusIterator<'a> {
162    type Item = Result<&'a str>;
163
164    fn next(&mut self) -> Option<Self::Item> {
165        if self.current >= self.corpus.num_documents() {
166            return None;
167        }
168
169        let doc = self.corpus.get_document(self.current);
170        self.current += 1;
171        Some(doc)
172    }
173}
174
175/// Streaming text processor for handling arbitrarily large files
176pub struct StreamingTextProcessor<T: Tokenizer> {
177    tokenizer: T,
178    buffer_size: usize,
179}
180
181impl<T: Tokenizer> StreamingTextProcessor<T> {
182    /// Create a new streaming processor
183    pub fn new(tokenizer: T) -> Self {
184        Self {
185            tokenizer,
186            buffer_size: 1024 * 1024, // 1MB default buffer
187        }
188    }
189
190    /// Set custom buffer size
191    pub fn with_buffer_size(mut self, size: usize) -> Self {
192        self.buffer_size = size;
193        self
194    }
195
196    /// Process a file line by line
197    pub fn process_lines<P, F, R>(&self, path: P, processor: F) -> Result<Vec<R>>
198    where
199        P: AsRef<Path>,
200        F: FnMut(&str, usize) -> Result<Option<R>>,
201    {
202        let file = File::open(path)
203            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;
204
205        let reader = BufReader::with_capacity(self.buffer_size, file);
206        self.process_reader_lines(reader, processor)
207    }
208
209    /// Process lines from any reader
210    pub fn process_reader_lines<R: BufRead, F, U>(
211        &self,
212        reader: R,
213        mut processor: F,
214    ) -> Result<Vec<U>>
215    where
216        F: FnMut(&str, usize) -> Result<Option<U>>,
217    {
218        let mut results = Vec::new();
219
220        for (line_num, line_result) in reader.lines().enumerate() {
221            let line =
222                line_result.map_err(|e| TextError::IoError(format!("Error reading line: {e}")))?;
223
224            if let Some(result) = processor(&line, line_num)? {
225                results.push(result);
226            }
227        }
228
229        Ok(results)
230    }
231
232    /// Build vocabulary from a streaming corpus
233    pub fn build_vocabulary_streaming<P: AsRef<Path>>(
234        &self,
235        path: P,
236        min_count: usize,
237    ) -> Result<Vocabulary> {
238        let mut token_counts = HashMap::<String, usize>::new();
239
240        // First pass: _count tokens
241        self.process_lines(&path, |line, _line_num| {
242            let tokens = self.tokenizer.tokenize(line)?;
243            for token in tokens {
244                *token_counts.entry(token).or_insert(0) += 1;
245            }
246            Ok(None::<()>)
247        })?;
248
249        // Build vocabulary with high-frequency tokens
250        let mut vocab = Vocabulary::new();
251        for (token, count) in &token_counts {
252            if *count >= min_count {
253                vocab.add_token(token);
254            }
255        }
256
257        Ok(vocab)
258    }
259}
260
261impl StreamingTextProcessor<WordTokenizer> {
262    /// Create a streaming processor with default word tokenizer
263    pub fn with_default_tokenizer() -> Self {
264        Self::new(WordTokenizer::default())
265    }
266}
267
268/// Streaming vectorizer for creating sparse matrices from large corpora
269pub struct StreamingVectorizer {
270    vocabulary: Vocabulary,
271    chunksize: usize,
272}
273
274impl StreamingVectorizer {
275    /// Create a new streaming vectorizer
276    pub fn new(vocabulary: Vocabulary) -> Self {
277        Self {
278            vocabulary,
279            chunksize: 1000, // Process 1000 documents at a time
280        }
281    }
282
283    /// Set chunk size for processing
284    pub fn with_chunksize(mut self, size: usize) -> Self {
285        self.chunksize = size;
286        self
287    }
288
289    /// Transform a streaming corpus into a sparse matrix
290    pub fn transform_streaming<P, T>(&self, path: P, tokenizer: &T) -> Result<CsrMatrix>
291    where
292        P: AsRef<Path>,
293        T: Tokenizer,
294    {
295        let mut builder = SparseMatrixBuilder::new(self.vocabulary.len());
296
297        let file = std::fs::File::open(path)
298            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;
299        let reader = std::io::BufReader::new(file);
300
301        for line in reader.lines() {
302            let line = line.map_err(|e| TextError::IoError(format!("Error reading line: {e}")))?;
303            let tokens = tokenizer.tokenize(&line)?;
304            let sparse_vec = self.tokens_to_sparse_vector(&tokens)?;
305            builder.add_row(sparse_vec)?;
306        }
307
308        Ok(builder.build())
309    }
310
311    /// Convert tokens to sparse vector
312    fn tokens_to_sparse_vector(&self, tokens: &[String]) -> Result<SparseVector> {
313        let mut counts = std::collections::HashMap::new();
314
315        for token in tokens {
316            if let Some(idx) = self.vocabulary.get_index(token) {
317                *counts.entry(idx).or_insert(0.0) += 1.0;
318            }
319        }
320
321        let mut indices: Vec<usize> = counts.keys().copied().collect();
322        indices.sort_unstable();
323
324        let values: Vec<f64> = indices.iter().map(|&idx| counts[&idx]).collect();
325
326        let sparse_vec = SparseVector::fromindices_values(indices, values, self.vocabulary.len());
327
328        Ok(sparse_vec)
329    }
330}
331
332/// Chunked corpus reader for processing files in manageable chunks
333pub struct ChunkedCorpusReader {
334    file: File,
335    chunksize: usize,
336    position: u64,
337    file_size: u64,
338}
339
340impl ChunkedCorpusReader {
341    /// Create a new chunked reader
342    pub fn new<P: AsRef<Path>>(path: P, chunksize: usize) -> Result<Self> {
343        let file = File::open(path)
344            .map_err(|e| TextError::IoError(format!("Failed to open file: {e}")))?;
345
346        let file_size = file
347            .metadata()
348            .map_err(|e| TextError::IoError(format!("Failed to get file metadata: {e}")))?
349            .len();
350
351        Ok(Self {
352            file,
353            chunksize,
354            position: 0,
355            file_size,
356        })
357    }
358
359    /// Read the next chunk of complete lines
360    pub fn next_chunk(&mut self) -> Result<Option<Vec<String>>> {
361        if self.position >= self.file_size {
362            return Ok(None);
363        }
364
365        self.file
366            .seek(SeekFrom::Start(self.position))
367            .map_err(|e| TextError::IoError(format!("Failed to seek: {e}")))?;
368
369        let mut buffer = vec![0u8; self.chunksize];
370        let bytes_read = self
371            .file
372            .read(&mut buffer)
373            .map_err(|e| TextError::IoError(format!("Failed to read chunk: {e}")))?;
374
375        if bytes_read == 0 {
376            return Ok(None);
377        }
378
379        buffer.truncate(bytes_read);
380
381        // Find the last newline to ensure complete lines
382        let last_newline = buffer.iter().rposition(|&b| b == b'\n');
383
384        let chunk_end = if let Some(pos) = last_newline {
385            pos + 1
386        } else if self.position + bytes_read as u64 >= self.file_size {
387            bytes_read
388        } else {
389            // No newline found and not at end of file, need to read more
390            return Err(TextError::IoError(
391                "Chunk size too small to contain a complete line".into(),
392            ));
393        };
394
395        let chunk_str = std::str::from_utf8(&buffer[..chunk_end])
396            .map_err(|e| TextError::IoError(format!("Invalid UTF-8: {e}")))?;
397
398        let lines: Vec<String> = chunk_str.lines().map(|s| s.to_string()).collect();
399
400        self.position += chunk_end as u64;
401
402        Ok(Some(lines))
403    }
404
405    /// Reset to the beginning of the file
406    pub fn reset(&mut self) -> Result<()> {
407        self.position = 0;
408        self.file
409            .seek(SeekFrom::Start(0))
410            .map_err(|e| TextError::IoError(format!("Failed to seek: {e}")))?;
411        Ok(())
412    }
413}
414
415/// Multi-file corpus for handling large distributed corpora
416pub struct MultiFileCorpus {
417    files: Vec<MemoryMappedCorpus>,
418    file_boundaries: Vec<usize>, // Cumulative document counts
419    total_documents: usize,
420}
421
422impl MultiFileCorpus {
423    /// Create corpus from multiple files
424    pub fn from_files<P: AsRef<Path>>(paths: &[P]) -> Result<Self> {
425        let mut files = Vec::new();
426        let mut file_boundaries = vec![0];
427        let mut total_documents = 0;
428
429        for path in paths {
430            let corpus = MemoryMappedCorpus::from_file(path)?;
431            let doc_count = corpus.num_documents();
432            total_documents += doc_count;
433
434            files.push(corpus);
435            file_boundaries.push(total_documents);
436        }
437
438        Ok(Self {
439            files,
440            file_boundaries,
441            total_documents,
442        })
443    }
444
445    /// Get total number of documents across all files
446    pub fn num_documents(&self) -> usize {
447        self.total_documents
448    }
449
450    /// Get document by global index
451    pub fn get_document(&self, globalindex: usize) -> Result<&str> {
452        if globalindex >= self.total_documents {
453            return Err(TextError::InvalidInput(format!(
454                "Document _index {globalindex} out of range"
455            )));
456        }
457
458        // Find which file contains this document
459        let file_idx = match self.file_boundaries.binary_search(&(globalindex + 1)) {
460            Ok(idx) => {
461                // Found exact match, means we're at a boundary
462                // The document belongs to the previous file
463                if idx == 0 {
464                    0
465                } else {
466                    idx - 1
467                }
468            }
469            Err(idx) => {
470                // Not found, idx is insertion point
471                if idx == 0 {
472                    0
473                } else {
474                    idx - 1
475                }
476            }
477        };
478
479        let local_index = globalindex.saturating_sub(self.file_boundaries[file_idx]);
480        self.files[file_idx].get_document(local_index)
481    }
482
483    /// Iterate over all documents across files
484    pub fn iter(&self) -> MultiFileIterator {
485        MultiFileIterator {
486            corpus: self,
487            current: 0,
488        }
489    }
490
491    /// Get random sample of documents
492    pub fn random_sample(&self, samplesize: usize, seed: u64) -> Result<Vec<&str>> {
493        use std::collections::HashSet;
494
495        if samplesize > self.total_documents {
496            return Err(TextError::InvalidInput(
497                "Sample _size larger than corpus _size".into(),
498            ));
499        }
500
501        let mut rng = seed;
502        let mut selected = HashSet::new();
503        let mut samples = Vec::new();
504
505        while samples.len() < samplesize {
506            // Simple LCG for deterministic random numbers
507            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
508            let index = (rng % self.total_documents as u64) as usize;
509
510            if selected.insert(index) {
511                samples.push(self.get_document(index)?);
512            }
513        }
514
515        Ok(samples)
516    }
517}
518
519/// Iterator for multi-file corpus
520pub struct MultiFileIterator<'a> {
521    corpus: &'a MultiFileCorpus,
522    current: usize,
523}
524
525impl<'a> Iterator for MultiFileIterator<'a> {
526    type Item = Result<&'a str>;
527
528    fn next(&mut self) -> Option<Self::Item> {
529        if self.current >= self.corpus.num_documents() {
530            return None;
531        }
532
533        let doc = self.corpus.get_document(self.current);
534        self.current += 1;
535        Some(doc)
536    }
537}
538
539/// Cached memory-mapped corpus with LRU caching for frequently accessed documents
540pub struct CachedCorpus {
541    corpus: MemoryMappedCorpus,
542    cache: std::collections::HashMap<usize, String>,
543    access_order: std::collections::VecDeque<usize>,
544    cache_size: usize,
545}
546
547impl CachedCorpus {
548    /// Create cached corpus with specified cache size
549    pub fn new(_corpus: MemoryMappedCorpus, cachesize: usize) -> Self {
550        Self {
551            corpus: _corpus,
552            cache: std::collections::HashMap::new(),
553            access_order: std::collections::VecDeque::new(),
554            cache_size: cachesize,
555        }
556    }
557
558    /// Get document with caching
559    pub fn get_document(&mut self, index: usize) -> Result<String> {
560        // Check cache first
561        if let Some(doc) = self.cache.get(&index) {
562            // Move to front of access order
563            if let Some(pos) = self.access_order.iter().position(|&x| x == index) {
564                self.access_order.remove(pos);
565            }
566            self.access_order.push_front(index);
567            return Ok(doc.clone());
568        }
569
570        // Not in cache, get from corpus
571        let doc = self.corpus.get_document(index)?.to_string();
572
573        // Add to cache
574        if self.cache.len() >= self.cache_size {
575            // Remove least recently used
576            if let Some(lru_index) = self.access_order.pop_back() {
577                self.cache.remove(&lru_index);
578            }
579        }
580
581        let doc_clone = doc.clone();
582        self.cache.insert(index, doc);
583        self.access_order.push_front(index);
584
585        Ok(doc_clone)
586    }
587
588    /// Get cache hit rate
589    pub fn cache_hit_rate(&self) -> f64 {
590        if self.access_order.is_empty() {
591            0.0
592        } else {
593            self.cache.len() as f64 / self.access_order.len() as f64
594        }
595    }
596}
597
598/// Advanced indexing for fast text search in large corpora
599pub struct CorpusIndex {
600    word_to_docs: std::collections::HashMap<String, Vec<usize>>,
601    #[allow(dead_code)]
602    doc_to_words: Vec<std::collections::HashSet<String>>,
603}
604
605impl CorpusIndex {
606    /// Build index from corpus
607    pub fn build<T: Tokenizer>(corpus: &MemoryMappedCorpus, tokenizer: &T) -> Result<Self> {
608        let mut word_to_docs = std::collections::HashMap::new();
609        let mut doc_to_words = Vec::new();
610
611        for doc_idx in 0..corpus.num_documents() {
612            let doc = corpus.get_document(doc_idx)?;
613            let tokens = tokenizer.tokenize(doc)?;
614            let unique_tokens: std::collections::HashSet<String> = tokens.into_iter().collect();
615
616            for token in &unique_tokens {
617                word_to_docs
618                    .entry(token.clone())
619                    .or_insert_with(Vec::new)
620                    .push(doc_idx);
621            }
622
623            doc_to_words.push(unique_tokens);
624        }
625
626        Ok(Self {
627            word_to_docs,
628            doc_to_words,
629        })
630    }
631
632    /// Find documents containing a specific word
633    pub fn find_documents_with_word(&self, word: &str) -> Option<&[usize]> {
634        self.word_to_docs.get(word).map(|v| v.as_slice())
635    }
636
637    /// Find documents containing all words
638    pub fn find_documents_with_all_words(&self, words: &[&str]) -> Vec<usize> {
639        if words.is_empty() {
640            return Vec::new();
641        }
642
643        let mut result: Option<std::collections::HashSet<usize>> = None;
644
645        for &word in words {
646            if let Some(docs) = self.word_to_docs.get(word) {
647                let doc_set: std::collections::HashSet<usize> = docs.iter().copied().collect();
648
649                result = match result {
650                    None => Some(doc_set),
651                    Some(mut existing) => {
652                        existing.retain(|doc| doc_set.contains(doc));
653                        Some(existing)
654                    }
655                };
656            } else {
657                // Word not found, no documents match
658                return Vec::new();
659            }
660        }
661
662        result
663            .map(|set| set.into_iter().collect())
664            .unwrap_or_default()
665    }
666
667    /// Get vocabulary size
668    pub fn vocabulary_size(&self) -> usize {
669        self.word_to_docs.len()
670    }
671}
672
673/// Memory usage monitor for large corpus operations
674pub struct MemoryMonitor {
675    peak_usage: usize,
676    current_usage: usize,
677    warnings_enabled: bool,
678    warning_threshold: usize,
679}
680
681impl MemoryMonitor {
682    /// Create new memory monitor
683    pub fn new() -> Self {
684        Self {
685            peak_usage: 0,
686            current_usage: 0,
687            warnings_enabled: true,
688            warning_threshold: 1024 * 1024 * 1024, // 1GB default
689        }
690    }
691
692    /// Set warning threshold in bytes
693    pub fn with_warning_threshold(mut self, threshold: usize) -> Self {
694        self.warning_threshold = threshold;
695        self
696    }
697
698    /// Track memory allocation
699    pub fn allocate(&mut self, size: usize) {
700        self.current_usage += size;
701        self.peak_usage = self.peak_usage.max(self.current_usage);
702
703        if self.warnings_enabled && self.current_usage > self.warning_threshold {
704            eprintln!(
705                "Memory warning: Current usage {} MB exceeds threshold {} MB",
706                self.current_usage / (1024 * 1024),
707                self.warning_threshold / (1024 * 1024)
708            );
709        }
710    }
711
712    /// Track memory deallocation
713    pub fn deallocate(&mut self, size: usize) {
714        self.current_usage = self.current_usage.saturating_sub(size);
715    }
716
717    /// Get current memory usage in bytes
718    pub fn current_usage(&self) -> usize {
719        self.current_usage
720    }
721
722    /// Get peak memory usage in bytes
723    pub fn peak_usage(&self) -> usize {
724        self.peak_usage
725    }
726
727    /// Reset statistics
728    pub fn reset(&mut self) {
729        self.peak_usage = 0;
730        self.current_usage = 0;
731    }
732}
733
734impl Default for MemoryMonitor {
735    fn default() -> Self {
736        Self::new()
737    }
738}
739
740/// Advanced streaming processor with parallel processing and monitoring
741pub struct AdvancedStreamingProcessor<T: Tokenizer> {
742    tokenizer: T,
743    buffer_size: usize,
744    parallel_chunks: usize,
745    memory_monitor: MemoryMonitor,
746}
747
748impl<T: Tokenizer + Send + Sync> AdvancedStreamingProcessor<T> {
749    /// Create new advanced streaming processor
750    pub fn new(tokenizer: T) -> Self {
751        Self {
752            tokenizer,
753            buffer_size: 1024 * 1024, // 1MB
754            parallel_chunks: num_cpus::get(),
755            memory_monitor: MemoryMonitor::new(),
756        }
757    }
758
759    /// Set parallel processing parameters
760    pub fn with_parallelism(mut self, chunks: usize, buffersize: usize) -> Self {
761        self.parallel_chunks = chunks;
762        self.buffer_size = buffersize;
763        self
764    }
765
766    /// Process corpus with parallel memory-mapped chunks
767    pub fn process_corpus_parallel<F, R>(
768        &mut self,
769        corpus: &MemoryMappedCorpus,
770        processor: F,
771    ) -> Result<Vec<R>>
772    where
773        F: Fn(&str, usize) -> Result<R> + Send + Sync,
774        R: Send,
775    {
776        use scirs2_core::parallel_ops::*;
777
778        let num_docs = corpus.num_documents();
779        let chunksize = num_docs.div_ceil(self.parallel_chunks);
780
781        // Track memory usage
782        let estimated_memory = num_docs * 100; // Rough estimate
783        self.memory_monitor.allocate(estimated_memory);
784
785        let results: Vec<R> = (0..self.parallel_chunks)
786            .into_par_iter()
787            .map(|chunk_idx| {
788                let start = chunk_idx * chunksize;
789                let end = ((chunk_idx + 1) * chunksize).min(num_docs);
790
791                let mut chunk_results = Vec::new();
792                for doc_idx in start..end {
793                    let doc = corpus.get_document(doc_idx)?;
794                    let result = processor(doc, doc_idx)?;
795                    chunk_results.push(result);
796                }
797                Ok(chunk_results)
798            })
799            .collect::<Result<Vec<_>>>()?
800            .into_iter()
801            .flatten()
802            .collect();
803
804        self.memory_monitor.deallocate(estimated_memory);
805        Ok(results)
806    }
807
808    /// Build advanced statistics from corpus
809    pub fn build_corpus_statistics(
810        &mut self,
811        corpus: &MemoryMappedCorpus,
812    ) -> Result<CorpusStatistics> {
813        let mut stats = CorpusStatistics::new();
814
815        // Clone tokenizer to avoid borrow conflict
816        let tokenizer = self.tokenizer.clone_box();
817
818        self.process_corpus_parallel(corpus, move |doc, _doc_idx| {
819            let tokens = tokenizer.tokenize(doc)?;
820            let char_count = doc.chars().count();
821            let word_count = tokens.len();
822            let line_count = doc.lines().count();
823
824            Ok(DocumentStats {
825                char_count,
826                word_count,
827                line_count,
828                unique_words: tokens
829                    .into_iter()
830                    .collect::<std::collections::HashSet<_>>()
831                    .len(),
832            })
833        })?
834        .into_iter()
835        .for_each(|doc_stats| stats.add_document(doc_stats));
836
837        Ok(stats)
838    }
839
840    /// Get memory usage statistics
841    pub fn memory_stats(&self) -> (usize, usize) {
842        (
843            self.memory_monitor.current_usage(),
844            self.memory_monitor.peak_usage(),
845        )
846    }
847}
848
849/// Statistics for corpus analysis
850#[derive(Debug, Clone)]
851pub struct CorpusStatistics {
852    /// Total number of documents in the corpus
853    pub total_documents: usize,
854    /// Total number of words across all documents
855    pub total_words: usize,
856    /// Total number of characters across all documents
857    pub total_chars: usize,
858    /// Total number of lines across all documents
859    pub total_lines: usize,
860    /// Size of the vocabulary (unique words)
861    pub vocabulary_size: usize,
862    /// Average document length in words
863    pub avg_doc_length: f64,
864    /// Average words per line
865    pub avg_words_per_line: f64,
866}
867
868impl CorpusStatistics {
869    /// Create new empty statistics
870    pub fn new() -> Self {
871        Self {
872            total_documents: 0,
873            total_words: 0,
874            total_chars: 0,
875            total_lines: 0,
876            vocabulary_size: 0,
877            avg_doc_length: 0.0,
878            avg_words_per_line: 0.0,
879        }
880    }
881
882    /// Add document statistics
883    pub fn add_document(&mut self, docstats: DocumentStats) {
884        self.total_documents += 1;
885        self.total_words += docstats.word_count;
886        self.total_chars += docstats.char_count;
887        self.total_lines += docstats.line_count;
888        self.vocabulary_size += docstats.unique_words;
889
890        // Recalculate averages
891        self.avg_doc_length = self.total_words as f64 / self.total_documents as f64;
892        self.avg_words_per_line = if self.total_lines > 0 {
893            self.total_words as f64 / self.total_lines as f64
894        } else {
895            0.0
896        };
897    }
898}
899
900impl Default for CorpusStatistics {
901    fn default() -> Self {
902        Self::new()
903    }
904}
905
906/// Statistics for individual documents
907#[derive(Debug, Clone)]
908pub struct DocumentStats {
909    /// Number of characters in the document
910    pub char_count: usize,
911    /// Number of words in the document
912    pub word_count: usize,
913    /// Number of lines in the document
914    pub line_count: usize,
915    /// Number of unique words in the document
916    pub unique_words: usize,
917}
918
919/// Progress tracker for long-running operations
920pub struct ProgressTracker {
921    total: usize,
922    current: usize,
923    report_interval: usize,
924}
925
926impl ProgressTracker {
927    /// Create a new progress tracker
928    pub fn new(total: usize) -> Self {
929        Self {
930            total,
931            current: 0,
932            report_interval: total / 100, // Report every 1%
933        }
934    }
935
936    /// Update progress
937    pub fn update(&mut self, count: usize) {
938        self.current += count;
939
940        if self.current.is_multiple_of(self.report_interval) || self.current >= self.total {
941            let percentage = (self.current as f64 / self.total as f64) * 100.0;
942            println!(
943                "Progress: {:.1}% ({}/{})",
944                percentage, self.current, self.total
945            );
946        }
947    }
948
949    /// Check if complete
950    pub fn is_complete(&self) -> bool {
951        self.current >= self.total
952    }
953}
954
955#[cfg(test)]
956mod tests {
957    use super::*;
958    use std::io::Write;
959    use tempfile::NamedTempFile;
960
961    #[test]
962    fn test_memory_mapped_corpus() {
963        // Create a temporary file with test data
964        let mut file = NamedTempFile::new().unwrap();
965        writeln!(file, "First document").unwrap();
966        writeln!(file, "Second document").unwrap();
967        writeln!(file, "Third document").unwrap();
968        file.flush().unwrap();
969
970        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
971
972        assert_eq!(corpus.num_documents(), 3);
973        assert_eq!(corpus.get_document(0).unwrap(), "First document");
974        assert_eq!(corpus.get_document(1).unwrap(), "Second document");
975        assert_eq!(corpus.get_document(2).unwrap(), "Third document");
976    }
977
978    #[test]
979    fn test_streaming_processor() {
980        let mut file = NamedTempFile::new().unwrap();
981        writeln!(file, "hello world").unwrap();
982        writeln!(file, "foo bar baz").unwrap();
983        file.flush().unwrap();
984
985        let processor = StreamingTextProcessor::with_default_tokenizer();
986        let mut line_count = 0;
987
988        processor
989            .process_lines(file.path(), |_line, _line_num| {
990                line_count += 1;
991                Ok(None::<()>)
992            })
993            .unwrap();
994
995        assert_eq!(line_count, 2);
996    }
997
998    #[test]
999    fn test_chunked_reader() {
1000        let mut file = NamedTempFile::new().unwrap();
1001        for i in 0..100 {
1002            writeln!(file, "Line {i}").unwrap();
1003        }
1004        file.flush().unwrap();
1005
1006        let mut reader = ChunkedCorpusReader::new(file.path(), 256).unwrap();
1007        let mut total_lines = 0;
1008
1009        while let Some(lines) = reader.next_chunk().unwrap() {
1010            total_lines += lines.len();
1011        }
1012
1013        assert_eq!(total_lines, 100);
1014    }
1015
1016    #[test]
1017    fn test_streaming_vocabulary_building() {
1018        let mut file = NamedTempFile::new().unwrap();
1019        writeln!(file, "the quick brown fox").unwrap();
1020        writeln!(file, "the lazy dog").unwrap();
1021        writeln!(file, "the brown dog").unwrap();
1022        file.flush().unwrap();
1023
1024        let processor = StreamingTextProcessor::with_default_tokenizer();
1025        let vocab = processor
1026            .build_vocabulary_streaming(file.path(), 2)
1027            .unwrap();
1028
1029        // "the" appears 3 times, "brown" and "dog" appear 2 times each
1030        assert!(vocab.get_index("the").is_some());
1031        assert!(vocab.get_index("brown").is_some());
1032        assert!(vocab.get_index("dog").is_some());
1033
1034        // These appear only once and should be pruned
1035        assert!(vocab.get_index("quick").is_none());
1036        assert!(vocab.get_index("fox").is_none());
1037        assert!(vocab.get_index("lazy").is_none());
1038    }
1039
1040    #[test]
1041    fn test_multi_file_corpus() {
1042        // Create multiple test files
1043        let mut file1 = NamedTempFile::new().unwrap();
1044        writeln!(file1, "Document 1 line 1").unwrap();
1045        writeln!(file1, "Document 1 line 2").unwrap();
1046        file1.flush().unwrap();
1047
1048        let mut file2 = NamedTempFile::new().unwrap();
1049        writeln!(file2, "Document 2 line 1").unwrap();
1050        writeln!(file2, "Document 2 line 2").unwrap();
1051        writeln!(file2, "Document 2 line 3").unwrap();
1052        file2.flush().unwrap();
1053
1054        let paths = vec![file1.path(), file2.path()];
1055        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();
1056
1057        assert_eq!(multi_corpus.num_documents(), 5); // 2 + 3 documents
1058        assert_eq!(multi_corpus.get_document(0).unwrap(), "Document 1 line 1");
1059        assert_eq!(multi_corpus.get_document(2).unwrap(), "Document 2 line 1");
1060        assert_eq!(multi_corpus.get_document(4).unwrap(), "Document 2 line 3");
1061    }
1062
1063    #[test]
1064    fn test_multi_file_random_sampling() {
1065        let mut file1 = NamedTempFile::new().unwrap();
1066        for i in 0..10 {
1067            writeln!(file1, "File1 Doc {i}").unwrap();
1068        }
1069        file1.flush().unwrap();
1070
1071        let mut file2 = NamedTempFile::new().unwrap();
1072        for i in 0..10 {
1073            writeln!(file2, "File2 Doc {i}").unwrap();
1074        }
1075        file2.flush().unwrap();
1076
1077        let paths = vec![file1.path(), file2.path()];
1078        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();
1079
1080        let sample = multi_corpus.random_sample(5, 12345).unwrap();
1081        assert_eq!(sample.len(), 5);
1082
1083        // Should be deterministic with same seed
1084        let sample2 = multi_corpus.random_sample(5, 12345).unwrap();
1085        assert_eq!(sample, sample2);
1086    }
1087
1088    #[test]
1089    fn test_cached_corpus() {
1090        let mut file = NamedTempFile::new().unwrap();
1091        for i in 0..10 {
1092            writeln!(file, "Document {i}").unwrap();
1093        }
1094        file.flush().unwrap();
1095
1096        let base_corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
1097        let mut cached_corpus = CachedCorpus::new(base_corpus, 3);
1098
1099        // Access documents
1100        let doc0 = cached_corpus.get_document(0).unwrap();
1101        let doc1 = cached_corpus.get_document(1).unwrap();
1102        let doc2 = cached_corpus.get_document(2).unwrap();
1103
1104        assert_eq!(doc0, "Document 0");
1105        assert_eq!(doc1, "Document 1");
1106        assert_eq!(doc2, "Document 2");
1107
1108        // Access doc0 again - should be cached
1109        let doc0_again = cached_corpus.get_document(0).unwrap();
1110        assert_eq!(doc0_again, "Document 0");
1111
1112        // Cache should have good hit rate for repeated access
1113        let hit_rate = cached_corpus.cache_hit_rate();
1114        assert!(hit_rate > 0.0);
1115    }
1116
1117    #[test]
1118    fn test_corpus_index() {
1119        let mut file = NamedTempFile::new().unwrap();
1120        writeln!(file, "the quick brown fox").unwrap();
1121        writeln!(file, "the lazy dog").unwrap();
1122        writeln!(file, "quick brown animal").unwrap();
1123        file.flush().unwrap();
1124
1125        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
1126        let tokenizer = WordTokenizer::default();
1127        let index = CorpusIndex::build(&corpus, &tokenizer).unwrap();
1128
1129        // Test single word search
1130        let docs_with_quick = index.find_documents_with_word("quick").unwrap();
1131        assert_eq!(docs_with_quick.len(), 2); // Documents 0 and 2
1132
1133        // Test multi-word search
1134        let docs_with_all = index.find_documents_with_all_words(&["the", "brown"]);
1135        assert_eq!(docs_with_all.len(), 1); // Only document 0
1136
1137        // Test vocabulary size
1138        assert!(index.vocabulary_size() > 0);
1139    }
1140
1141    #[test]
1142    fn test_memory_monitor() {
1143        let mut monitor = MemoryMonitor::new().with_warning_threshold(1000);
1144
1145        assert_eq!(monitor.current_usage(), 0);
1146        assert_eq!(monitor.peak_usage(), 0);
1147
1148        monitor.allocate(500);
1149        assert_eq!(monitor.current_usage(), 500);
1150        assert_eq!(monitor.peak_usage(), 500);
1151
1152        monitor.allocate(300);
1153        assert_eq!(monitor.current_usage(), 800);
1154        assert_eq!(monitor.peak_usage(), 800);
1155
1156        monitor.deallocate(200);
1157        assert_eq!(monitor.current_usage(), 600);
1158        assert_eq!(monitor.peak_usage(), 800); // Peak should remain
1159
1160        monitor.reset();
1161        assert_eq!(monitor.current_usage(), 0);
1162        assert_eq!(monitor.peak_usage(), 0);
1163    }
1164
1165    #[test]
1166    fn test_advanced_streaming_processor() {
1167        let mut file = NamedTempFile::new().unwrap();
1168        writeln!(file, "hello world").unwrap();
1169        writeln!(file, "foo bar baz").unwrap();
1170        writeln!(file, "test document").unwrap();
1171        file.flush().unwrap();
1172
1173        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
1174        let tokenizer = WordTokenizer::default();
1175        let mut processor = AdvancedStreamingProcessor::new(tokenizer);
1176
1177        let results = processor
1178            .process_corpus_parallel(&corpus, |doc, idx| {
1179                let doc_len = doc.len();
1180                Ok(format!("Processed doc {idx}: {doc_len}"))
1181            })
1182            .unwrap();
1183
1184        assert_eq!(results.len(), 3);
1185        assert!(results[0].contains("Processed doc 0"));
1186        assert!(results[1].contains("Processed doc 1"));
1187        assert!(results[2].contains("Processed doc 2"));
1188
1189        // Test memory stats
1190        let (current, _peak) = processor.memory_stats();
1191        assert_eq!(current, 0); // Should be deallocated after processing
1192    }
1193
1194    #[test]
1195    fn test_corpus_statistics() {
1196        let mut file = NamedTempFile::new().unwrap();
1197        writeln!(file, "hello world test").unwrap();
1198        writeln!(file, "foo bar").unwrap();
1199        writeln!(file, "single").unwrap();
1200        file.flush().unwrap();
1201
1202        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
1203        let tokenizer = WordTokenizer::default();
1204        let mut processor = AdvancedStreamingProcessor::new(tokenizer);
1205
1206        let stats = processor.build_corpus_statistics(&corpus).unwrap();
1207
1208        assert_eq!(stats.total_documents, 3);
1209        assert_eq!(stats.total_words, 6); // 3 + 2 + 1
1210        assert!(stats.avg_doc_length > 0.0);
1211        assert_eq!(stats.total_lines, 3);
1212    }
1213
1214    #[test]
1215    fn test_document_stats() {
1216        let mut stats = CorpusStatistics::new();
1217
1218        let doc_stats1 = DocumentStats {
1219            char_count: 100,
1220            word_count: 20,
1221            line_count: 5,
1222            unique_words: 15,
1223        };
1224
1225        let doc_stats2 = DocumentStats {
1226            char_count: 50,
1227            word_count: 10,
1228            line_count: 2,
1229            unique_words: 8,
1230        };
1231
1232        stats.add_document(doc_stats1);
1233        stats.add_document(doc_stats2);
1234
1235        assert_eq!(stats.total_documents, 2);
1236        assert_eq!(stats.total_words, 30);
1237        assert_eq!(stats.total_chars, 150);
1238        assert_eq!(stats.total_lines, 7);
1239        assert_eq!(stats.avg_doc_length, 15.0); // 30 words / 2 docs
1240    }
1241
1242    #[test]
1243    fn test_corpus_index_edge_cases() {
1244        let mut file = NamedTempFile::new().unwrap();
1245        writeln!(file).unwrap(); // Empty document
1246        writeln!(file, "single").unwrap();
1247        file.flush().unwrap();
1248
1249        let corpus = MemoryMappedCorpus::from_file(file.path()).unwrap();
1250        let tokenizer = WordTokenizer::default();
1251        let index = CorpusIndex::build(&corpus, &tokenizer).unwrap();
1252
1253        // Search for non-existent word
1254        assert!(index.find_documents_with_word("nonexistent").is_none());
1255
1256        // Search with empty word list
1257        let empty_result = index.find_documents_with_all_words(&[]);
1258        assert!(empty_result.is_empty());
1259
1260        // Search for word that doesn't exist
1261        let no_match = index.find_documents_with_all_words(&["nonexistent"]);
1262        assert!(no_match.is_empty());
1263    }
1264
1265    #[test]
1266    fn test_multi_file_iterator() {
1267        let mut file1 = NamedTempFile::new().unwrap();
1268        writeln!(file1, "doc1").unwrap();
1269        writeln!(file1, "doc2").unwrap();
1270        file1.flush().unwrap();
1271
1272        let mut file2 = NamedTempFile::new().unwrap();
1273        writeln!(file2, "doc3").unwrap();
1274        file2.flush().unwrap();
1275
1276        let paths = vec![file1.path(), file2.path()];
1277        let multi_corpus = MultiFileCorpus::from_files(&paths).unwrap();
1278
1279        let docs: Result<Vec<_>> = multi_corpus.iter().collect();
1280        let docs = docs.unwrap();
1281
1282        assert_eq!(docs.len(), 3);
1283        assert_eq!(docs[0], "doc1");
1284        assert_eq!(docs[1], "doc2");
1285        assert_eq!(docs[2], "doc3");
1286    }
1287}