parallel_processing_demo/
parallel_processing_demo.rs

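//! Parallel processing demonstration.
//!
//! Times the parallel text utilities of `scirs2_text` on a synthetic corpus:
//! per-document processing, tokenization, TF-IDF vectorization, batch
//! processing with progress reporting, and processing of a larger corpus.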
2
3use scirs2_text::{
4    ParallelCorpusProcessor, ParallelTextProcessor, ParallelTokenizer, ParallelVectorizer,
5    TfidfVectorizer, WordTokenizer,
6};
7use std::time::Instant;
8
9#[allow(dead_code)]
10fn main() -> Result<(), Box<dyn std::error::Error>> {
11    println!("Parallel Text Processing Demo");
12    println!("============================\n");
13
14    // Create test data with larger size to demonstrate parallelism
15    println!("Creating test data...");
16    let texts = create_testtexts(1000);
17
18    // Create references to handle &[&str] requirements
19    let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
20
21    println!("Total documents: {}", texts.len());
22    println!("Example document: {}", texts[0]);
23
24    // 1. Simple Parallel Text Processing
25    println!("\n1. Basic Parallel Processing");
26    println!("---------------------------");
27
28    let processor = ParallelTextProcessor::new();
29
30    let start = Instant::now();
31    let word_counts = processor.process(&text_refs, |text| {
32        // Count words in each document
33        text.split_whitespace().count()
34    });
35    let duration = start.elapsed();
36
37    println!("Processed {} documents in {:.2?}", texts.len(), duration);
38    println!(
39        "Average word count: {:.2}",
40        word_counts.iter().sum::<usize>() as f64 / word_counts.len() as f64
41    );
42
43    // Sequential comparison
44    let start = Instant::now();
45    let _seq_word_counts: Vec<_> = texts
46        .iter()
47        .map(|text| text.split_whitespace().count())
48        .collect();
49    let seq_duration = start.elapsed();
50
51    println!("Sequential processing took {seq_duration:.2?}");
52    println!(
53        "Speedup factor: {:.2}x",
54        seq_duration.as_secs_f64() / duration.as_secs_f64()
55    );
56
57    // 2. Parallel Tokenization
58    println!("\n2. Parallel Tokenization");
59    println!("----------------------");
60
61    let tokenizer = ParallelTokenizer::new(WordTokenizer::new(true)); // Pass 'lowercase' parameter
62
63    let start = Instant::now();
64    let tokens = tokenizer.tokenize(&text_refs)?;
65    let duration = start.elapsed();
66
67    println!("Tokenized {} documents in {:.2?}", texts.len(), duration);
68    println!(
69        "Total tokens: {}",
70        tokens.iter().map(|t| t.len()).sum::<usize>()
71    );
72    println!(
73        "Sample tokens from first document: {:?}",
74        tokens[0].iter().take(5).collect::<Vec<_>>()
75    );
76
77    // Custom token processing
78    println!("\nCustom token processing...");
79    let start = Instant::now();
80    let token_stats = tokenizer.tokenize_and_map(&text_refs, |tokens| {
81        // Calculate token statistics
82        let count = tokens.len();
83        let avg_len = if count > 0 {
84            tokens.iter().map(|t| t.len()).sum::<usize>() as f64 / count as f64
85        } else {
86            0.0
87        };
88        (count, avg_len)
89    })?;
90    let duration = start.elapsed();
91
92    println!("Processed token statistics in {duration:.2?}");
93    println!(
94        "Average tokens per document: {:.2}",
95        token_stats.iter().map(|(count_, _)| *count_).sum::<usize>() as f64
96            / token_stats.len() as f64
97    );
98    println!(
99        "Average token length: {:.2}",
100        token_stats.iter().map(|(_, avg_len)| *avg_len).sum::<f64>() / token_stats.len() as f64
101    );
102
103    // 3. Parallel Vectorization
104    println!("\n3. Parallel Vectorization");
105    println!("------------------------");
106
107    // First fit the vectorizer
108    let mut vectorizer = TfidfVectorizer::default();
109    let start = Instant::now();
110
111    // Import the Vectorizer trait to use its methods
112    use scirs2_text::Vectorizer;
113    vectorizer.fit(&text_refs)?;
114    let fit_duration = start.elapsed();
115
116    println!("Fitted vectorizer in {fit_duration:.2?}");
117
118    // Now transform in parallel
119    let parallel_vectorizer = ParallelVectorizer::new(vectorizer).with_chunk_size(100);
120
121    let start = Instant::now();
122    let vectors = parallel_vectorizer.transform(&text_refs)?;
123    let transform_duration = start.elapsed();
124
125    println!(
126        "Transformed {} documents in {:.2?}",
127        texts.len(),
128        transform_duration
129    );
130    println!("Vector shape: {:?}", vectors.shape());
131    println!(
132        "Non-zero elements: {}",
133        vectors.iter().filter(|&&x| x > 0.0).count()
134    );
135
136    // 4. Batch Processing with Progress
137    println!("\n4. Batch Processing with Progress");
138    println!("--------------------------------");
139
140    let processor = ParallelCorpusProcessor::new(100).with_threads(num_cpus::get());
141
142    println!("Processing with {} threads...", num_cpus::get());
143    let start = Instant::now();
144
145    let last_progress = std::sync::Mutex::new(0);
146    let result = processor.process_with_progress(
147        &text_refs,
148        |batch| {
149            // Analyze batch of documents
150            let mut word_counts = Vec::new();
151            let mut char_counts = Vec::new();
152
153            for &text in batch {
154                word_counts.push(text.split_whitespace().count());
155                char_counts.push(text.chars().count());
156            }
157
158            Ok(word_counts.into_iter().zip(char_counts).collect::<Vec<_>>())
159        },
160        |current, total| {
161            // Only print progress updates at 10% intervals
162            let percent = current * 100 / total;
163            let mut last = last_progress.lock().unwrap();
164            if percent / 10 > *last / 10 {
165                println!("  Progress: {current}/{total}  ({percent}%)");
166                *last = percent;
167            }
168        },
169    )?;
170
171    let duration = start.elapsed();
172
173    println!("Processed {} documents in {:.2?}", texts.len(), duration);
174    println!(
175        "Average words per document: {:.2}",
176        result.iter().map(|(words_, _)| *words_).sum::<usize>() as f64 / result.len() as f64
177    );
178    println!(
179        "Average characters per document: {:.2}",
180        result.iter().map(|(_, chars)| chars).sum::<usize>() as f64 / result.len() as f64
181    );
182
183    // 5. Memory-efficient processing
184    println!("\n5. Memory-Efficient Large Corpus Processing");
185    println!("------------------------------------------");
186
187    println!("Simulating processing of a large corpus...");
188    let largetexts: Vec<&str> = text_refs.iter().cycle().take(5000).copied().collect();
189    println!("Large corpus size: {} documents", largetexts.len());
190
191    let processor = ParallelCorpusProcessor::new(250).with_max_memory(1024 * 1024 * 1024); // 1 GB limit
192
193    let start = Instant::now();
194    let summary = processor.process(&largetexts, |batch| {
195        // Compute simple statistics for the batch
196        let batch_size = batch.len();
197        let total_words: usize = batch
198            .iter()
199            .map(|&text| text.split_whitespace().count())
200            .sum();
201        let total_chars: usize = batch.iter().map(|&text| text.chars().count()).sum();
202
203        Ok(vec![(batch_size, total_words, total_chars)])
204    })?;
205    let duration = start.elapsed();
206
207    let total_words: usize = summary.iter().map(|(_, words_, _)| *words_).sum();
208    let total_chars: usize = summary.iter().map(|(_, _, chars)| *chars).sum();
209
210    println!("Processed large corpus in {duration:.2?}");
211    println!("Total words: {total_words}");
212    println!("Total chars: {total_chars}");
213    println!(
214        "Average processing speed: {:.2} documents/second",
215        largetexts.len() as f64 / duration.as_secs_f64()
216    );
217
218    Ok(())
219}
220
221#[allow(dead_code)]
222fn create_testtexts(size: usize) -> Vec<String> {
223    // Sample text fragments to combine randomly
224    let subjects = [
225        "Machine learning",
226        "Natural language processing",
227        "Data science",
228        "Artificial intelligence",
229        "Statistical analysis",
230        "Deep learning",
231        "Text mining",
232        "Information retrieval",
233        "Computational linguistics",
234    ];
235
236    let verbs = [
237        "transforms",
238        "revolutionizes",
239        "enhances",
240        "analyzes",
241        "processes",
242        "interprets",
243        "understands",
244        "models",
245        "extracts information from",
246    ];
247
248    let objects = [
249        "text documents",
250        "language patterns",
251        "unstructured data",
252        "communication systems",
253        "research methodologies",
254        "business decisions",
255        "customer feedback",
256        "social media content",
257        "scientific literature",
258    ];
259
260    let adjectives = [
261        "modern",
262        "complex",
263        "efficient",
264        "intelligent",
265        "advanced",
266        "innovative",
267        "powerful",
268        "sophisticated",
269        "state-of-the-art",
270    ];
271
272    let adverbs = [
273        "dramatically",
274        "significantly",
275        "effectively",
276        "precisely",
277        "rapidly",
278        "intelligently",
279        "thoroughly",
280        "fundamentally",
281        "increasingly",
282    ];
283
284    let mut texts = Vec::with_capacity(size);
285    let mut rng = scirs2_core::random::rng();
286
287    for _ in 0..size {
288        let subject = subjects[rng.next_u32() as usize % subjects.len()];
289        let verb = verbs[rng.next_u32() as usize % verbs.len()];
290        let object = objects[rng.next_u32() as usize % objects.len()];
291        let adjective = adjectives[rng.next_u32() as usize % adjectives.len()];
292        let adverb = adverbs[rng.next_u32() as usize % adverbs.len()];
293
294        // Create 2-3 sentences per document
295        let num_sentences = 2 + (rng.next_u32() % 2) as usize;
296        let mut sentences = Vec::with_capacity(num_sentences);
297
298        // First sentence
299        sentences.push(format!("{subject} {adverb} {verb} {object}."));
300
301        // Second sentence
302        sentences.push(format!(
303            "This {adjective} approach enables {adjective} applications in various domains."
304        ));
305
306        // Optional third sentence
307        if num_sentences > 2 {
308            let subject2 = subjects[rng.next_u32() as usize % subjects.len()];
309            let adverb2 = adverbs[rng.next_u32() as usize % adverbs.len()];
310            sentences.push(format!(
311                "{subject2} is {adverb2} improving with recent technological advances."
312            ));
313        }
314
315        texts.push(sentences.join(" "));
316    }
317
318    texts
319}
320
/// Minimal source of random `u32` values used by this demo's text generator.
///
/// Keeping the interface to one method lets the generator draw numbers
/// through a local trait instead of naming the RNG crate's traits at every
/// call site.
trait Random {
    /// Returns a `u32` drawn from the generator.
    ///
    /// Takes `&mut self` so that implementations can update internal state
    /// between draws.
    fn next_u32(&mut self) -> u32;
}
324
325impl Random for scirs2_core::random::rngs::ThreadRng {
326    fn next_u32(&mut self) -> u32 {
327        use scirs2_core::random::Rng;
328        self.random()
329    }
330}