pub struct ParallelTextProcessor { /* private fields */ }

Expand description
Parallel text processor that can run multiple operations in parallel
Implementations§
impl ParallelTextProcessor
impl ParallelTextProcessor
pub fn new() -> Self
pub fn new() -> Self
Create a new parallel text processor
Examples found in repository?
examples/parallel_processing_demo.rs (line 28)
10fn main() -> Result<(), Box<dyn std::error::Error>> {
11 println!("Parallel Text Processing Demo");
12 println!("============================\n");
13
14 // Create test data with larger size to demonstrate parallelism
15 println!("Creating test data...");
16 let texts = create_testtexts(1000);
17
18 // Create references to handle &[&str] requirements
19 let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
20
21 println!("Total documents: {}", texts.len());
22 println!("Example document: {}", texts[0]);
23
24 // 1. Simple Parallel Text Processing
25 println!("\n1. Basic Parallel Processing");
26 println!("---------------------------");
27
28 let processor = ParallelTextProcessor::new();
29
30 let start = Instant::now();
31 let word_counts = processor.process(&text_refs, |text| {
32 // Count words in each document
33 text.split_whitespace().count()
34 });
35 let duration = start.elapsed();
36
37 println!("Processed {} documents in {:.2?}", texts.len(), duration);
38 println!(
39 "Average word count: {:.2}",
40 word_counts.iter().sum::<usize>() as f64 / word_counts.len() as f64
41 );
42
43 // Sequential comparison
44 let start = Instant::now();
45 let _seq_word_counts: Vec<_> = texts
46 .iter()
47 .map(|text| text.split_whitespace().count())
48 .collect();
49 let seq_duration = start.elapsed();
50
51 println!("Sequential processing took {seq_duration:.2?}");
52 println!(
53 "Speedup factor: {:.2}x",
54 seq_duration.as_secs_f64() / duration.as_secs_f64()
55 );
56
57 // 2. Parallel Tokenization
58 println!("\n2. Parallel Tokenization");
59 println!("----------------------");
60
61 let tokenizer = ParallelTokenizer::new(WordTokenizer::new(true)); // Pass 'lowercase' parameter
62
63 let start = Instant::now();
64 let tokens = tokenizer.tokenize(&text_refs)?;
65 let duration = start.elapsed();
66
67 println!("Tokenized {} documents in {:.2?}", texts.len(), duration);
68 println!(
69 "Total tokens: {}",
70 tokens.iter().map(|t| t.len()).sum::<usize>()
71 );
72 println!(
73 "Sample tokens from first document: {:?}",
74 tokens[0].iter().take(5).collect::<Vec<_>>()
75 );
76
77 // Custom token processing
78 println!("\nCustom token processing...");
79 let start = Instant::now();
80 let token_stats = tokenizer.tokenize_and_map(&text_refs, |tokens| {
81 // Calculate token statistics
82 let count = tokens.len();
83 let avg_len = if count > 0 {
84 tokens.iter().map(|t| t.len()).sum::<usize>() as f64 / count as f64
85 } else {
86 0.0
87 };
88 (count, avg_len)
89 })?;
90 let duration = start.elapsed();
91
92 println!("Processed token statistics in {duration:.2?}");
93 println!(
94 "Average tokens per document: {:.2}",
95 token_stats.iter().map(|(count_, _)| *count_).sum::<usize>() as f64
96 / token_stats.len() as f64
97 );
98 println!(
99 "Average token length: {:.2}",
100 token_stats.iter().map(|(_, avg_len)| *avg_len).sum::<f64>() / token_stats.len() as f64
101 );
102
103 // 3. Parallel Vectorization
104 println!("\n3. Parallel Vectorization");
105 println!("------------------------");
106
107 // First fit the vectorizer
108 let mut vectorizer = TfidfVectorizer::default();
109 let start = Instant::now();
110
111 // Import the Vectorizer trait to use its methods
112 use scirs2_text::Vectorizer;
113 vectorizer.fit(&text_refs)?;
114 let fit_duration = start.elapsed();
115
116 println!("Fitted vectorizer in {fit_duration:.2?}");
117
118 // Now transform in parallel
119 let parallel_vectorizer = ParallelVectorizer::new(vectorizer).with_chunk_size(100);
120
121 let start = Instant::now();
122 let vectors = parallel_vectorizer.transform(&text_refs)?;
123 let transform_duration = start.elapsed();
124
125 println!(
126 "Transformed {} documents in {:.2?}",
127 texts.len(),
128 transform_duration
129 );
130 println!("Vector shape: {:?}", vectors.shape());
131 println!(
132 "Non-zero elements: {}",
133 vectors.iter().filter(|&&x| x > 0.0).count()
134 );
135
136 // 4. Batch Processing with Progress
137 println!("\n4. Batch Processing with Progress");
138 println!("--------------------------------");
139
140 let processor = ParallelCorpusProcessor::new(100).with_threads(num_cpus::get());
141
142 println!("Processing with {} threads...", num_cpus::get());
143 let start = Instant::now();
144
145 let last_progress = std::sync::Mutex::new(0);
146 let result = processor.process_with_progress(
147 &text_refs,
148 |batch| {
149 // Analyze batch of documents
150 let mut word_counts = Vec::new();
151 let mut char_counts = Vec::new();
152
153 for &text in batch {
154 word_counts.push(text.split_whitespace().count());
155 char_counts.push(text.chars().count());
156 }
157
158 Ok(word_counts.into_iter().zip(char_counts).collect::<Vec<_>>())
159 },
160 |current, total| {
161 // Only print progress updates at 10% intervals
162 let percent = current * 100 / total;
163 let mut last = last_progress.lock().unwrap();
164 if percent / 10 > *last / 10 {
165 println!(" Progress: {current}/{total} ({percent}%)");
166 *last = percent;
167 }
168 },
169 )?;
170
171 let duration = start.elapsed();
172
173 println!("Processed {} documents in {:.2?}", texts.len(), duration);
174 println!(
175 "Average words per document: {:.2}",
176 result.iter().map(|(words_, _)| *words_).sum::<usize>() as f64 / result.len() as f64
177 );
178 println!(
179 "Average characters per document: {:.2}",
180 result.iter().map(|(_, chars)| chars).sum::<usize>() as f64 / result.len() as f64
181 );
182
183 // 5. Memory-efficient processing
184 println!("\n5. Memory-Efficient Large Corpus Processing");
185 println!("------------------------------------------");
186
187 println!("Simulating processing of a large corpus...");
188 let largetexts: Vec<&str> = text_refs.iter().cycle().take(5000).copied().collect();
189 println!("Large corpus size: {} documents", largetexts.len());
190
191 let processor = ParallelCorpusProcessor::new(250).with_max_memory(1024 * 1024 * 1024); // 1 GB limit
192
193 let start = Instant::now();
194 let summary = processor.process(&largetexts, |batch| {
195 // Compute simple statistics for the batch
196 let batch_size = batch.len();
197 let total_words: usize = batch
198 .iter()
199 .map(|&text| text.split_whitespace().count())
200 .sum();
201 let total_chars: usize = batch.iter().map(|&text| text.chars().count()).sum();
202
203 Ok(vec![(batch_size, total_words, total_chars)])
204 })?;
205 let duration = start.elapsed();
206
207 let total_words: usize = summary.iter().map(|(_, words_, _)| *words_).sum();
208 let total_chars: usize = summary.iter().map(|(_, _, chars)| *chars).sum();
209
210 println!("Processed large corpus in {duration:.2?}");
211 println!("Total words: {total_words}");
212 println!("Total chars: {total_chars}");
213 println!(
214 "Average processing speed: {:.2} documents/second",
215 largetexts.len() as f64 / duration.as_secs_f64()
216 );
217
218 Ok(())
219}

pub fn with_threads(self, numthreads: usize) -> Self
pub fn with_threads(self, numthreads: usize) -> Self
Set the number of threads
pub fn process<F, R>(&self, texts: &[&str], f: F) -> Vec<R>
pub fn process<F, R>(&self, texts: &[&str], f: F) -> Vec<R>
Process texts in parallel with a given function
Examples found in repository?
examples/parallel_processing_demo.rs (lines 31-34)
10fn main() -> Result<(), Box<dyn std::error::Error>> {
11 println!("Parallel Text Processing Demo");
12 println!("============================\n");
13
14 // Create test data with larger size to demonstrate parallelism
15 println!("Creating test data...");
16 let texts = create_testtexts(1000);
17
18 // Create references to handle &[&str] requirements
19 let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
20
21 println!("Total documents: {}", texts.len());
22 println!("Example document: {}", texts[0]);
23
24 // 1. Simple Parallel Text Processing
25 println!("\n1. Basic Parallel Processing");
26 println!("---------------------------");
27
28 let processor = ParallelTextProcessor::new();
29
30 let start = Instant::now();
31 let word_counts = processor.process(&text_refs, |text| {
32 // Count words in each document
33 text.split_whitespace().count()
34 });
35 let duration = start.elapsed();
36
37 println!("Processed {} documents in {:.2?}", texts.len(), duration);
38 println!(
39 "Average word count: {:.2}",
40 word_counts.iter().sum::<usize>() as f64 / word_counts.len() as f64
41 );
42
43 // Sequential comparison
44 let start = Instant::now();
45 let _seq_word_counts: Vec<_> = texts
46 .iter()
47 .map(|text| text.split_whitespace().count())
48 .collect();
49 let seq_duration = start.elapsed();
50
51 println!("Sequential processing took {seq_duration:.2?}");
52 println!(
53 "Speedup factor: {:.2}x",
54 seq_duration.as_secs_f64() / duration.as_secs_f64()
55 );
56
57 // 2. Parallel Tokenization
58 println!("\n2. Parallel Tokenization");
59 println!("----------------------");
60
61 let tokenizer = ParallelTokenizer::new(WordTokenizer::new(true)); // Pass 'lowercase' parameter
62
63 let start = Instant::now();
64 let tokens = tokenizer.tokenize(&text_refs)?;
65 let duration = start.elapsed();
66
67 println!("Tokenized {} documents in {:.2?}", texts.len(), duration);
68 println!(
69 "Total tokens: {}",
70 tokens.iter().map(|t| t.len()).sum::<usize>()
71 );
72 println!(
73 "Sample tokens from first document: {:?}",
74 tokens[0].iter().take(5).collect::<Vec<_>>()
75 );
76
77 // Custom token processing
78 println!("\nCustom token processing...");
79 let start = Instant::now();
80 let token_stats = tokenizer.tokenize_and_map(&text_refs, |tokens| {
81 // Calculate token statistics
82 let count = tokens.len();
83 let avg_len = if count > 0 {
84 tokens.iter().map(|t| t.len()).sum::<usize>() as f64 / count as f64
85 } else {
86 0.0
87 };
88 (count, avg_len)
89 })?;
90 let duration = start.elapsed();
91
92 println!("Processed token statistics in {duration:.2?}");
93 println!(
94 "Average tokens per document: {:.2}",
95 token_stats.iter().map(|(count_, _)| *count_).sum::<usize>() as f64
96 / token_stats.len() as f64
97 );
98 println!(
99 "Average token length: {:.2}",
100 token_stats.iter().map(|(_, avg_len)| *avg_len).sum::<f64>() / token_stats.len() as f64
101 );
102
103 // 3. Parallel Vectorization
104 println!("\n3. Parallel Vectorization");
105 println!("------------------------");
106
107 // First fit the vectorizer
108 let mut vectorizer = TfidfVectorizer::default();
109 let start = Instant::now();
110
111 // Import the Vectorizer trait to use its methods
112 use scirs2_text::Vectorizer;
113 vectorizer.fit(&text_refs)?;
114 let fit_duration = start.elapsed();
115
116 println!("Fitted vectorizer in {fit_duration:.2?}");
117
118 // Now transform in parallel
119 let parallel_vectorizer = ParallelVectorizer::new(vectorizer).with_chunk_size(100);
120
121 let start = Instant::now();
122 let vectors = parallel_vectorizer.transform(&text_refs)?;
123 let transform_duration = start.elapsed();
124
125 println!(
126 "Transformed {} documents in {:.2?}",
127 texts.len(),
128 transform_duration
129 );
130 println!("Vector shape: {:?}", vectors.shape());
131 println!(
132 "Non-zero elements: {}",
133 vectors.iter().filter(|&&x| x > 0.0).count()
134 );
135
136 // 4. Batch Processing with Progress
137 println!("\n4. Batch Processing with Progress");
138 println!("--------------------------------");
139
140 let processor = ParallelCorpusProcessor::new(100).with_threads(num_cpus::get());
141
142 println!("Processing with {} threads...", num_cpus::get());
143 let start = Instant::now();
144
145 let last_progress = std::sync::Mutex::new(0);
146 let result = processor.process_with_progress(
147 &text_refs,
148 |batch| {
149 // Analyze batch of documents
150 let mut word_counts = Vec::new();
151 let mut char_counts = Vec::new();
152
153 for &text in batch {
154 word_counts.push(text.split_whitespace().count());
155 char_counts.push(text.chars().count());
156 }
157
158 Ok(word_counts.into_iter().zip(char_counts).collect::<Vec<_>>())
159 },
160 |current, total| {
161 // Only print progress updates at 10% intervals
162 let percent = current * 100 / total;
163 let mut last = last_progress.lock().unwrap();
164 if percent / 10 > *last / 10 {
165 println!(" Progress: {current}/{total} ({percent}%)");
166 *last = percent;
167 }
168 },
169 )?;
170
171 let duration = start.elapsed();
172
173 println!("Processed {} documents in {:.2?}", texts.len(), duration);
174 println!(
175 "Average words per document: {:.2}",
176 result.iter().map(|(words_, _)| *words_).sum::<usize>() as f64 / result.len() as f64
177 );
178 println!(
179 "Average characters per document: {:.2}",
180 result.iter().map(|(_, chars)| chars).sum::<usize>() as f64 / result.len() as f64
181 );
182
183 // 5. Memory-efficient processing
184 println!("\n5. Memory-Efficient Large Corpus Processing");
185 println!("------------------------------------------");
186
187 println!("Simulating processing of a large corpus...");
188 let largetexts: Vec<&str> = text_refs.iter().cycle().take(5000).copied().collect();
189 println!("Large corpus size: {} documents", largetexts.len());
190
191 let processor = ParallelCorpusProcessor::new(250).with_max_memory(1024 * 1024 * 1024); // 1 GB limit
192
193 let start = Instant::now();
194 let summary = processor.process(&largetexts, |batch| {
195 // Compute simple statistics for the batch
196 let batch_size = batch.len();
197 let total_words: usize = batch
198 .iter()
199 .map(|&text| text.split_whitespace().count())
200 .sum();
201 let total_chars: usize = batch.iter().map(|&text| text.chars().count()).sum();
202
203 Ok(vec![(batch_size, total_words, total_chars)])
204 })?;
205 let duration = start.elapsed();
206
207 let total_words: usize = summary.iter().map(|(_, words_, _)| *words_).sum();
208 let total_chars: usize = summary.iter().map(|(_, _, chars)| *chars).sum();
209
210 println!("Processed large corpus in {duration:.2?}");
211 println!("Total words: {total_words}");
212 println!("Total chars: {total_chars}");
213 println!(
214 "Average processing speed: {:.2} documents/second",
215 largetexts.len() as f64 / duration.as_secs_f64()
216 );
217
218 Ok(())
219}

pub fn process_and_flatten<F, R>(&self, texts: &[&str], f: F) -> Vec<R>
pub fn process_and_flatten<F, R>(&self, texts: &[&str], f: F) -> Vec<R>
Process texts in parallel and flatten the results
Trait Implementations§
Auto Trait Implementations§
impl Freeze for ParallelTextProcessor
impl RefUnwindSafe for ParallelTextProcessor
impl Send for ParallelTextProcessor
impl Sync for ParallelTextProcessor
impl Unpin for ParallelTextProcessor
impl UnwindSafe for ParallelTextProcessor
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
impl<T> IntoEither for T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more

impl<T> Pointable for T
impl<T> Pointable for T
impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,
fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct
self from the equivalent element of its
superset. Read more

fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
Checks if
self is actually part of its subset T (and can be converted to it).

fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
Use with care! Same as
self.to_subset but without any property checks. Always succeeds.

fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
The inclusion map: converts
self to the equivalent element of its superset.