1use std::sync::Arc;
2
3use rayon::prelude::*;
4
5use crate::{
6 application::{
7 chunking::{ChunkManager, TextChunk},
8 config::{ProcessingError, ProcessingResult, ProcessorConfig},
9 parser::TextParser,
10 },
11 domain::{
12 prefix_sum::{ChunkStartState, PrefixSumComputer},
13 reduce::BoundaryReducer,
14 types::{Boundary, DeltaEntry, DeltaVec, PartialState},
15 },
16 LanguageRules,
17};
18
19use super::execution_mode::ExecutionMode;
20
/// Outcome of one [`DeltaStackProcessor::process`] run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeltaStackResult {
    /// Boundary offsets, sorted ascending with duplicates removed.
    pub boundaries: Vec<usize>,
    /// Number of chunks the input was split into (`0` when nothing was processed).
    pub chunk_count: usize,
    /// Thread count selected by the execution mode (`1` when nothing was processed).
    pub thread_count: usize,
}
27
28pub struct DeltaStackProcessor {
35 language_rules: Arc<dyn LanguageRules>,
36 chunk_manager: ChunkManager,
37 parser: TextParser,
38}
39
40impl DeltaStackProcessor {
41 pub fn new(config: ProcessorConfig, language_rules: Arc<dyn LanguageRules>) -> Self {
43 let chunk_manager = ChunkManager::new(config.chunk_size, config.overlap_size);
44
45 Self {
46 language_rules,
47 chunk_manager,
48 parser: TextParser::new(),
49 }
50 }
51
52 pub fn process(&self, text: &str, mode: ExecutionMode) -> ProcessingResult<DeltaStackResult> {
54 if text.is_empty() {
56 return Ok(DeltaStackResult {
57 boundaries: Vec::new(),
58 chunk_count: 0,
59 thread_count: 1,
60 });
61 }
62
63 let chunks = self.chunk_manager.chunk_text(text)?;
65 if chunks.is_empty() {
66 return Ok(DeltaStackResult {
67 boundaries: Vec::new(),
68 chunk_count: 0,
69 thread_count: 1,
70 });
71 }
72
73 let chunk_count = chunks.len();
74
75 let thread_count = mode.determine_thread_count(text.len());
77
78 let partial_states = self.scan_phase(&chunks, thread_count)?;
80 let chunk_starts = self.prefix_phase(&partial_states, &chunks)?;
81 let boundaries =
82 self.reduce_phase(&partial_states, &chunk_starts, &chunks, thread_count)?;
83
84 Ok(DeltaStackResult {
85 boundaries,
86 chunk_count,
87 thread_count,
88 })
89 }
90
91 fn scan_phase(
93 &self,
94 chunks: &[TextChunk],
95 thread_count: usize,
96 ) -> ProcessingResult<Vec<PartialState>> {
97 if thread_count > 1 {
98 let pool = rayon::ThreadPoolBuilder::new()
100 .num_threads(thread_count)
101 .build()
102 .map_err(|e| ProcessingError::InvalidConfig {
103 reason: format!("Failed to create thread pool: {e}"),
104 })?;
105
106 pool.install(|| {
107 Ok(chunks
108 .par_iter()
109 .map(|chunk| {
110 self.parser
111 .scan_chunk(&chunk.content, self.language_rules.as_ref())
112 })
113 .collect::<Vec<_>>())
114 })
115 } else {
116 Ok(chunks
118 .iter()
119 .map(|chunk| {
120 self.parser
121 .scan_chunk(&chunk.content, self.language_rules.as_ref())
122 })
123 .collect())
124 }
125 }
126
127 fn prefix_phase(
129 &self,
130 partial_states: &[PartialState],
131 chunks: &[TextChunk],
132 ) -> ProcessingResult<Vec<ChunkStartState>> {
133 if partial_states.len() > 1 {
134 Ok(PrefixSumComputer::compute_prefix_sum_with_overlap(
135 partial_states,
136 chunks,
137 ))
138 } else {
139 Ok(vec![ChunkStartState {
141 cumulative_deltas: DeltaVec::from_vec(vec![
142 DeltaEntry { net: 0, min: 0 };
143 partial_states[0].deltas.len()
144 ]),
145 global_offset: 0,
146 }])
147 }
148 }
149
150 fn reduce_phase(
152 &self,
153 partial_states: &[PartialState],
154 chunk_starts: &[ChunkStartState],
155 chunks: &[TextChunk],
156 thread_count: usize,
157 ) -> ProcessingResult<Vec<usize>> {
158 let state_start_pairs: Vec<_> = partial_states
160 .iter()
161 .zip(chunk_starts.iter())
162 .zip(chunks.iter())
163 .collect();
164
165 let chunk_boundaries: Vec<Vec<Boundary>> = if thread_count > 1 {
167 let pool = rayon::ThreadPoolBuilder::new()
169 .num_threads(thread_count)
170 .build()
171 .map_err(|e| ProcessingError::InvalidConfig {
172 reason: format!("Failed to create thread pool: {e}"),
173 })?;
174
175 pool.install(|| {
176 state_start_pairs
177 .par_iter()
178 .map(|((state, start), chunk)| self.reduce_chunk(state, start, chunk))
179 .collect()
180 })
181 } else {
182 state_start_pairs
184 .iter()
185 .map(|((state, start), chunk)| self.reduce_chunk(state, start, chunk))
186 .collect()
187 };
188
189 Ok(self.merge_boundaries(chunk_boundaries))
191 }
192
193 fn reduce_chunk(
195 &self,
196 state: &PartialState,
197 start: &ChunkStartState,
198 chunk: &TextChunk,
199 ) -> Vec<Boundary> {
200 if chunk.start_offset == 0 && !chunk.has_suffix_overlap {
203 BoundaryReducer::reduce_single(state)
205 } else {
206 let states = vec![state.clone()];
208 let starts = vec![start.clone()];
209 BoundaryReducer::reduce_all(&states, &starts)
210 .into_iter()
211 .filter(|b| b.offset >= chunk.start_offset && b.offset < chunk.end_offset)
212 .collect()
213 }
214 }
215
216 fn merge_boundaries(&self, chunk_boundaries: Vec<Vec<Boundary>>) -> Vec<usize> {
218 let mut all_boundaries = Vec::new();
219
220 for boundaries in chunk_boundaries {
221 for boundary in boundaries {
222 all_boundaries.push(boundary.offset);
223 }
224 }
225
226 all_boundaries.sort_unstable();
228 all_boundaries.dedup();
229
230 all_boundaries
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::language::ConfigurableLanguageRules;

    /// Builds a processor with the default configuration and the "en" rules.
    fn create_test_processor() -> DeltaStackProcessor {
        let default_config = ProcessorConfig::default();
        let english_rules = Arc::new(ConfigurableLanguageRules::from_code("en").unwrap());
        DeltaStackProcessor::new(default_config, english_rules)
    }

    /// Empty input yields no boundaries, zero chunks and one thread.
    #[test]
    fn test_empty_text() {
        let outcome = create_test_processor()
            .process("", ExecutionMode::Sequential)
            .unwrap();

        assert!(outcome.boundaries.is_empty());
        assert_eq!(outcome.chunk_count, 0);
        assert_eq!(outcome.thread_count, 1);
    }

    /// One sentence yields exactly one boundary, at offset 19.
    #[test]
    fn test_single_sentence() {
        let input = "This is a sentence.";
        let outcome = create_test_processor()
            .process(input, ExecutionMode::Sequential)
            .unwrap();

        assert_eq!(outcome.boundaries, vec![19]);
        assert_eq!(outcome.chunk_count, 1);
        assert_eq!(outcome.thread_count, 1);
    }

    /// Sequential and two-thread runs agree on boundaries and chunk count.
    #[test]
    fn test_parallel_vs_sequential() {
        let processor = create_test_processor();
        let input = "First sentence. Second sentence. Third sentence.";

        let sequential = processor
            .process(input, ExecutionMode::Sequential)
            .unwrap();
        let parallel = processor
            .process(input, ExecutionMode::Parallel { threads: Some(2) })
            .unwrap();

        assert_eq!(sequential.boundaries, parallel.boundaries);
        assert_eq!(sequential.chunk_count, parallel.chunk_count);
        assert_eq!(sequential.thread_count, 1);
        assert_eq!(parallel.thread_count, 2);
    }
}