Skip to main content

rlm_rs/chunking/
parallel.rs

1//! Parallel chunking orchestrator.
2//!
3//! Wraps another chunker and processes chunks in parallel using rayon
4//! for improved performance on large texts.
5
6use crate::chunking::traits::{ChunkMetadata, Chunker};
7use crate::core::Chunk;
8use crate::error::Result;
9use rayon::prelude::*;
10
11/// Parallel chunking orchestrator.
12///
13/// Wraps another chunker and uses rayon for parallel processing.
14/// Useful for very large texts where chunking itself is CPU-bound.
15///
16/// # Examples
17///
18/// ```
19/// use rlm_rs::chunking::{Chunker, ParallelChunker, SemanticChunker};
20///
21/// let chunker = ParallelChunker::new(SemanticChunker::new());
22/// let text = "Hello, world! ".repeat(1000);
23/// let chunks = chunker.chunk(1, &text, None).unwrap();
24/// ```
25#[derive(Debug, Clone)]
26pub struct ParallelChunker<C: Chunker + Clone> {
27    /// The inner chunker to use for actual chunking.
28    inner: C,
29    /// Minimum text size to enable parallel processing.
30    min_parallel_size: usize,
31    /// Number of segments to split the text into for parallel processing.
32    num_segments: usize,
33}
34
35impl<C: Chunker + Clone> ParallelChunker<C> {
36    /// Creates a new parallel chunker wrapping the given chunker.
37    ///
38    /// # Arguments
39    ///
40    /// * `inner` - The chunker to use for actual text processing.
41    #[must_use]
42    pub fn new(inner: C) -> Self {
43        Self {
44            inner,
45            min_parallel_size: 100_000, // 100KB minimum for parallel
46            num_segments: num_cpus::get().max(2),
47        }
48    }
49
    /// Sets the minimum text size for parallel processing.
    ///
    /// Builder-style setter: takes the chunker by value and returns it with
    /// the threshold replaced. The threshold is compared against
    /// `text.len()`, so it is a size in bytes.
    ///
    /// Texts smaller than this will be processed sequentially.
    #[must_use]
    pub const fn min_parallel_size(mut self, size: usize) -> Self {
        self.min_parallel_size = size;
        self
    }
58
59    /// Sets the number of parallel segments.
60    #[must_use]
61    pub fn num_segments(mut self, n: usize) -> Self {
62        self.num_segments = n.max(1);
63        self
64    }
65
66    /// Splits text into roughly equal segments at good boundaries.
67    fn split_into_segments<'a>(&self, text: &'a str, n: usize) -> Vec<(usize, &'a str)> {
68        if n <= 1 || text.len() < self.min_parallel_size {
69            return vec![(0, text)];
70        }
71
72        let segment_size = text.len() / n;
73        let mut segments = Vec::with_capacity(n);
74        let mut start = 0;
75
76        for i in 0..n {
77            let target_end = if i == n - 1 {
78                text.len()
79            } else {
80                start + segment_size
81            };
82
83            let end = Self::find_segment_boundary(text, target_end);
84            let end = end.max(start + 1).min(text.len());
85
86            if start < text.len() {
87                segments.push((start, &text[start..end]));
88            }
89
90            start = end;
91            if start >= text.len() {
92                break;
93            }
94        }
95
96        segments
97    }
98
99    /// Finds a good boundary for segment splitting.
100    fn find_segment_boundary(text: &str, target: usize) -> usize {
101        if target >= text.len() {
102            return text.len();
103        }
104
105        // Look for paragraph break first
106        let search_start = target.saturating_sub(1000);
107        let search_region = &text[search_start..target.min(text.len())];
108
109        if let Some(pos) = search_region.rfind("\n\n") {
110            return search_start + pos + 2;
111        }
112
113        // Then newline
114        if let Some(pos) = search_region.rfind('\n') {
115            return search_start + pos + 1;
116        }
117
118        // Then space
119        if let Some(pos) = search_region.rfind(' ') {
120            return search_start + pos + 1;
121        }
122
123        // Fallback to character boundary
124        let mut pos = target;
125        while !text.is_char_boundary(pos) && pos > 0 {
126            pos -= 1;
127        }
128        pos
129    }
130
131    /// Merges chunks from multiple segments, reindexing them.
132    fn merge_chunks(segment_chunks: Vec<Vec<Chunk>>, buffer_id: i64) -> Vec<Chunk> {
133        let mut all_chunks: Vec<Chunk> = Vec::new();
134        let mut index = 0;
135
136        for chunks in segment_chunks {
137            for mut chunk in chunks {
138                chunk.index = index;
139                chunk.buffer_id = buffer_id;
140                all_chunks.push(chunk);
141                index += 1;
142            }
143        }
144
145        all_chunks
146    }
147}
148
149impl<C: Chunker + Clone + Send + Sync> Chunker for ParallelChunker<C> {
150    fn chunk(
151        &self,
152        buffer_id: i64,
153        text: &str,
154        metadata: Option<&ChunkMetadata>,
155    ) -> Result<Vec<Chunk>> {
156        // For small texts, just use the inner chunker directly
157        if text.len() < self.min_parallel_size {
158            return self.inner.chunk(buffer_id, text, metadata);
159        }
160
161        // Split into segments
162        let segments = self.split_into_segments(text, self.num_segments);
163
164        if segments.len() <= 1 {
165            return self.inner.chunk(buffer_id, text, metadata);
166        }
167
168        // Process segments in parallel
169        let results: Vec<Result<Vec<Chunk>>> = segments
170            .par_iter()
171            .map(|(offset, segment)| {
172                let mut chunks = self.inner.chunk(buffer_id, segment, metadata)?;
173
174                // Adjust byte ranges to account for segment offset
175                for chunk in &mut chunks {
176                    chunk.byte_range =
177                        (chunk.byte_range.start + offset)..(chunk.byte_range.end + offset);
178                }
179
180                Ok(chunks)
181            })
182            .collect();
183
184        // Collect results, propagating any errors
185        let mut all_segment_chunks = Vec::with_capacity(results.len());
186        for result in results {
187            all_segment_chunks.push(result?);
188        }
189
190        // Merge and reindex
191        Ok(Self::merge_chunks(all_segment_chunks, buffer_id))
192    }
193
194    fn name(&self) -> &'static str {
195        "parallel"
196    }
197
    /// Always reports `true` for this wrapper.
    fn supports_parallel(&self) -> bool {
        true
    }
201
202    fn description(&self) -> &'static str {
203        "Parallel chunking using rayon for multi-threaded processing"
204    }
205}
206
// Minimal in-file stand-in for a CPU-count helper, so that no extra
// dependency has to be added for it.
mod num_cpus {
    /// Returns the parallelism reported by the standard library, or 4 when
    /// it cannot be determined.
    pub fn get() -> usize {
        match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 4,
        }
    }
}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use crate::chunking::SemanticChunker;
218
219    #[test]
220    fn test_parallel_chunker_small_text() {
221        let chunker = ParallelChunker::new(SemanticChunker::with_size(50));
222        let text = "Hello, world!";
223        let chunks = chunker.chunk(1, text, None).unwrap();
224        assert_eq!(chunks.len(), 1);
225        assert_eq!(chunks[0].content, text);
226    }
227
228    #[test]
229    fn test_parallel_chunker_large_text() {
230        let chunker = ParallelChunker::new(SemanticChunker::with_size(1000))
231            .min_parallel_size(1000)
232            .num_segments(4);
233
234        // Generate large text
235        let text = "Hello, world! This is a test sentence. ".repeat(500);
236
237        let chunks = chunker.chunk(1, &text, None).unwrap();
238
239        // Verify all chunks are valid
240        for chunk in &chunks {
241            assert!(!chunk.content.is_empty());
242            assert_eq!(&text[chunk.byte_range.clone()], chunk.content);
243        }
244
245        // Verify indices are sequential
246        for (i, chunk) in chunks.iter().enumerate() {
247            assert_eq!(chunk.index, i);
248        }
249    }
250
251    #[test]
252    fn test_parallel_chunker_preserves_content() {
253        let chunker = ParallelChunker::new(SemanticChunker::with_size(500))
254            .min_parallel_size(500)
255            .num_segments(2);
256
257        let text = "Paragraph one. Sentence two.\n\nParagraph two. More text here.\n\n".repeat(50);
258
259        let chunks = chunker.chunk(1, &text, None).unwrap();
260
261        // Reconstruct text from chunks (accounting for possible overlap)
262        let mut reconstructed = String::new();
263        let mut last_end = 0;
264
265        for chunk in &chunks {
266            use std::cmp::Ordering;
267            match chunk.byte_range.start.cmp(&last_end) {
268                Ordering::Greater => {
269                    // Gap - shouldn't happen in well-formed chunking
270                }
271                Ordering::Less => {
272                    // Overlap - skip overlapping portion
273                    let skip = last_end - chunk.byte_range.start;
274                    if skip < chunk.content.len() {
275                        reconstructed.push_str(&chunk.content[skip..]);
276                    }
277                }
278                Ordering::Equal => {
279                    reconstructed.push_str(&chunk.content);
280                }
281            }
282            last_end = chunk.byte_range.end;
283        }
284
285        // The reconstructed text should cover the original
286        assert!(!chunks.is_empty());
287        assert!(!reconstructed.is_empty());
288    }
289
290    #[test]
291    fn test_parallel_chunker_strategy_name() {
292        let chunker = ParallelChunker::new(SemanticChunker::new());
293        assert_eq!(chunker.name(), "parallel");
294        assert!(chunker.supports_parallel());
295    }
296
297    #[test]
298    fn test_split_into_segments() {
299        let chunker = ParallelChunker::new(SemanticChunker::new())
300            .min_parallel_size(10)
301            .num_segments(3);
302
303        let text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph.";
304        let segments = chunker.split_into_segments(text, 3);
305
306        // Should have multiple segments
307        assert!(!segments.is_empty());
308
309        // All segments should be non-empty
310        for (_, segment) in &segments {
311            assert!(!segment.is_empty());
312        }
313    }
314
315    #[test]
316    fn test_parallel_chunker_empty_text() {
317        let chunker = ParallelChunker::new(SemanticChunker::new());
318        let chunks = chunker.chunk(1, "", None).unwrap();
319        assert!(chunks.is_empty());
320    }
321
322    #[test]
323    fn test_split_into_segments_single_segment() {
324        // Test when n <= 1 returns single segment (line 69)
325        let chunker = ParallelChunker::new(SemanticChunker::new())
326            .min_parallel_size(10)
327            .num_segments(1);
328
329        let text = "This is some test content";
330        let segments = chunker.split_into_segments(text, 1);
331        assert_eq!(segments.len(), 1);
332        assert_eq!(segments[0].1, text);
333    }
334
335    #[test]
336    fn test_split_into_segments_text_too_small() {
337        // Test when text.len() < min_parallel_size (line 68-69)
338        let chunker = ParallelChunker::new(SemanticChunker::new())
339            .min_parallel_size(1000)
340            .num_segments(4);
341
342        let text = "Short text";
343        let segments = chunker.split_into_segments(text, 4);
344        assert_eq!(segments.len(), 1);
345        assert_eq!(segments[0].1, text);
346    }
347
348    #[test]
349    fn test_parallel_chunker_segments_collapse_to_one() {
350        // Test when split produces only 1 segment, falls back to inner (line 165)
351        let chunker = ParallelChunker::new(SemanticChunker::with_size(100))
352            .min_parallel_size(10)
353            .num_segments(10);
354
355        // Text that's small enough that segmentation produces just one segment
356        let text = "A short text that won't split well.";
357        let chunks = chunker.chunk(1, text, None).unwrap();
358        assert!(!chunks.is_empty());
359    }
360
361    #[test]
362    fn test_parallel_chunker_description() {
363        // Test description method (lines 202-203)
364        let chunker = ParallelChunker::new(SemanticChunker::new());
365        let desc = chunker.description();
366        assert!(desc.contains("Parallel"));
367        assert!(!desc.is_empty());
368    }
369
370    #[test]
371    fn test_find_segment_boundary_no_good_boundary() {
372        // Test when no paragraph, newline, or space is found (lines 124-128)
373        let text = "AAAAAAAAAAAAAAAAAAAA"; // No natural boundaries
374        let boundary = ParallelChunker::<SemanticChunker>::find_segment_boundary(text, 10);
375        // Should fall back to character boundary
376        assert!(boundary <= text.len());
377    }
378
379    #[test]
380    fn test_find_segment_boundary_at_end() {
381        // Test when target >= text.len() (line 102)
382        let text = "Short";
383        let boundary = ParallelChunker::<SemanticChunker>::find_segment_boundary(text, 100);
384        assert_eq!(boundary, text.len());
385    }
386
387    #[test]
388    fn test_find_segment_boundary_finds_space() {
389        // Test when space is found but no newline (lines 119-120)
390        let text = "word1 word2 word3 word4";
391        let boundary = ParallelChunker::<SemanticChunker>::find_segment_boundary(text, 15);
392        // Should find a space boundary
393        assert!(boundary > 0 && boundary <= text.len());
394    }
395}