// sakurs_core/application/delta_stack.rs

use std::sync::Arc;

use rayon::prelude::*;
use rayon::ThreadPool;

use crate::{
    application::{
        chunking::{ChunkManager, TextChunk},
        config::{ProcessingError, ProcessingResult, ProcessorConfig},
        parser::TextParser,
    },
    domain::{
        prefix_sum::{ChunkStartState, PrefixSumComputer},
        reduce::BoundaryReducer,
        types::{Boundary, DeltaEntry, DeltaVec, PartialState},
    },
    LanguageRules,
};

use super::execution_mode::ExecutionMode;
/// Result of delta-stack processing with metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaStackResult {
    /// Offsets of detected sentence boundaries within the input,
    /// sorted ascending and deduplicated.
    pub boundaries: Vec<usize>,
    /// Number of chunks the input text was split into (0 for empty input).
    pub chunk_count: usize,
    /// Number of worker threads used (1 for sequential execution).
    pub thread_count: usize,
}
27
28/// Core implementation of the Δ-Stack Monoid algorithm
29///
30/// This struct encapsulates the three-phase sentence boundary detection algorithm:
31/// 1. Scan phase: Process chunks in parallel to compute partial states
32/// 2. Prefix phase: Compute prefix sums to determine chunk start states
33/// 3. Reduce phase: Combine partial results with start states to find boundaries
34pub struct DeltaStackProcessor {
35    language_rules: Arc<dyn LanguageRules>,
36    chunk_manager: ChunkManager,
37    parser: TextParser,
38}
39
40impl DeltaStackProcessor {
41    /// Creates a new DeltaStackProcessor with the given configuration
42    pub fn new(config: ProcessorConfig, language_rules: Arc<dyn LanguageRules>) -> Self {
43        let chunk_manager = ChunkManager::new(config.chunk_size, config.overlap_size);
44
45        Self {
46            language_rules,
47            chunk_manager,
48            parser: TextParser::new(),
49        }
50    }
51
52    /// Main processing method that executes the Δ-Stack Monoid algorithm
53    pub fn process(&self, text: &str, mode: ExecutionMode) -> ProcessingResult<DeltaStackResult> {
54        // Early return for empty text
55        if text.is_empty() {
56            return Ok(DeltaStackResult {
57                boundaries: Vec::new(),
58                chunk_count: 0,
59                thread_count: 1,
60            });
61        }
62
63        // Phase 0: Chunk the text
64        let chunks = self.chunk_manager.chunk_text(text)?;
65        if chunks.is_empty() {
66            return Ok(DeltaStackResult {
67                boundaries: Vec::new(),
68                chunk_count: 0,
69                thread_count: 1,
70            });
71        }
72
73        let chunk_count = chunks.len();
74
75        // Determine execution strategy
76        let thread_count = mode.determine_thread_count(text.len());
77
78        // Execute the three phases
79        let partial_states = self.scan_phase(&chunks, thread_count)?;
80        let chunk_starts = self.prefix_phase(&partial_states, &chunks)?;
81        let boundaries =
82            self.reduce_phase(&partial_states, &chunk_starts, &chunks, thread_count)?;
83
84        Ok(DeltaStackResult {
85            boundaries,
86            chunk_count,
87            thread_count,
88        })
89    }
90
91    /// Phase 1: Scan - Process chunks to compute partial states
92    fn scan_phase(
93        &self,
94        chunks: &[TextChunk],
95        thread_count: usize,
96    ) -> ProcessingResult<Vec<PartialState>> {
97        if thread_count > 1 {
98            // Parallel processing with custom thread pool
99            let pool = rayon::ThreadPoolBuilder::new()
100                .num_threads(thread_count)
101                .build()
102                .map_err(|e| ProcessingError::InvalidConfig {
103                    reason: format!("Failed to create thread pool: {e}"),
104                })?;
105
106            pool.install(|| {
107                Ok(chunks
108                    .par_iter()
109                    .map(|chunk| {
110                        self.parser
111                            .scan_chunk(&chunk.content, self.language_rules.as_ref())
112                    })
113                    .collect::<Vec<_>>())
114            })
115        } else {
116            // Sequential processing
117            Ok(chunks
118                .iter()
119                .map(|chunk| {
120                    self.parser
121                        .scan_chunk(&chunk.content, self.language_rules.as_ref())
122                })
123                .collect())
124        }
125    }
126
127    /// Phase 2: Prefix - Compute chunk start states using prefix sums
128    fn prefix_phase(
129        &self,
130        partial_states: &[PartialState],
131        chunks: &[TextChunk],
132    ) -> ProcessingResult<Vec<ChunkStartState>> {
133        if partial_states.len() > 1 {
134            Ok(PrefixSumComputer::compute_prefix_sum_with_overlap(
135                partial_states,
136                chunks,
137            ))
138        } else {
139            // Single chunk - create default start state
140            Ok(vec![ChunkStartState {
141                cumulative_deltas: DeltaVec::from_vec(vec![
142                    DeltaEntry { net: 0, min: 0 };
143                    partial_states[0].deltas.len()
144                ]),
145                global_offset: 0,
146            }])
147        }
148    }
149
150    /// Phase 3: Reduce - Combine partial results with start states to find boundaries
151    fn reduce_phase(
152        &self,
153        partial_states: &[PartialState],
154        chunk_starts: &[ChunkStartState],
155        chunks: &[TextChunk],
156        thread_count: usize,
157    ) -> ProcessingResult<Vec<usize>> {
158        // Create pairs of (state, start) for processing
159        let state_start_pairs: Vec<_> = partial_states
160            .iter()
161            .zip(chunk_starts.iter())
162            .zip(chunks.iter())
163            .collect();
164
165        // Process chunks to find boundaries
166        let chunk_boundaries: Vec<Vec<Boundary>> = if thread_count > 1 {
167            // Parallel reduction with custom thread pool
168            let pool = rayon::ThreadPoolBuilder::new()
169                .num_threads(thread_count)
170                .build()
171                .map_err(|e| ProcessingError::InvalidConfig {
172                    reason: format!("Failed to create thread pool: {e}"),
173                })?;
174
175            pool.install(|| {
176                state_start_pairs
177                    .par_iter()
178                    .map(|((state, start), chunk)| self.reduce_chunk(state, start, chunk))
179                    .collect()
180            })
181        } else {
182            // Sequential reduction
183            state_start_pairs
184                .iter()
185                .map(|((state, start), chunk)| self.reduce_chunk(state, start, chunk))
186                .collect()
187        };
188
189        // Merge boundaries from all chunks
190        Ok(self.merge_boundaries(chunk_boundaries))
191    }
192
193    /// Reduces a single chunk to find its boundaries
194    fn reduce_chunk(
195        &self,
196        state: &PartialState,
197        start: &ChunkStartState,
198        chunk: &TextChunk,
199    ) -> Vec<Boundary> {
200        // For single chunk, use reduce_single
201        // For multiple chunks, we need proper indexing
202        if chunk.start_offset == 0 && !chunk.has_suffix_overlap {
203            // This is a single chunk or the only chunk
204            BoundaryReducer::reduce_single(state)
205        } else {
206            // Multi-chunk - need to handle properly with correct offsets
207            let states = vec![state.clone()];
208            let starts = vec![start.clone()];
209            BoundaryReducer::reduce_all(&states, &starts)
210                .into_iter()
211                .filter(|b| b.offset >= chunk.start_offset && b.offset < chunk.end_offset)
212                .collect()
213        }
214    }
215
216    /// Merges boundaries from multiple chunks into a single sorted list
217    fn merge_boundaries(&self, chunk_boundaries: Vec<Vec<Boundary>>) -> Vec<usize> {
218        let mut all_boundaries = Vec::new();
219
220        for boundaries in chunk_boundaries {
221            for boundary in boundaries {
222                all_boundaries.push(boundary.offset);
223            }
224        }
225
226        // Sort and deduplicate
227        all_boundaries.sort_unstable();
228        all_boundaries.dedup();
229
230        all_boundaries
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::language::ConfigurableLanguageRules;

    /// Builds a processor with the default config and English rules.
    fn make_processor() -> DeltaStackProcessor {
        let rules = Arc::new(ConfigurableLanguageRules::from_code("en").unwrap());
        DeltaStackProcessor::new(ProcessorConfig::default(), rules)
    }

    #[test]
    fn test_empty_text() {
        let result = make_processor()
            .process("", ExecutionMode::Sequential)
            .unwrap();

        assert!(result.boundaries.is_empty());
        assert_eq!(result.chunk_count, 0);
        assert_eq!(result.thread_count, 1);
    }

    #[test]
    fn test_single_sentence() {
        let result = make_processor()
            .process("This is a sentence.", ExecutionMode::Sequential)
            .unwrap();

        // Exactly one boundary, positioned right after the period.
        assert_eq!(result.boundaries, vec![19]);
        assert_eq!(result.chunk_count, 1);
        assert_eq!(result.thread_count, 1);
    }

    #[test]
    fn test_parallel_vs_sequential() {
        let processor = make_processor();
        let text = "First sentence. Second sentence. Third sentence.";

        let sequential = processor.process(text, ExecutionMode::Sequential).unwrap();
        let parallel = processor
            .process(text, ExecutionMode::Parallel { threads: Some(2) })
            .unwrap();

        // Parallel execution must agree with sequential on content...
        assert_eq!(sequential.boundaries, parallel.boundaries);
        assert_eq!(sequential.chunk_count, parallel.chunk_count);
        // ...while reporting the thread counts actually used.
        assert_eq!(sequential.thread_count, 1);
        assert_eq!(parallel.thread_count, 2);
    }
}