provenant/scanner/process/
orchestrator.rs1use super::pipeline::process_file;
5use super::special_cases::process_directory;
6use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
7use crate::license_detection::LicenseDetectionEngine;
8use crate::models::FileInfo;
9use crate::progress::ScanProgress;
10use crate::scanner::collect::CollectedPaths;
11use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
12use rayon::prelude::*;
13use std::sync::Arc;
14
15pub fn process_collected(
16 collected: &CollectedPaths,
17 progress: Arc<ScanProgress>,
18 license_engine: Option<Arc<LicenseDetectionEngine>>,
19 license_options: LicenseScanOptions,
20 text_options: &TextDetectionOptions,
21) -> ProcessResult {
22 let mut all_files: Vec<FileInfo> = collected
23 .files
24 .par_iter()
25 .map(|(path, metadata)| {
26 let file_entry = process_file(
27 path,
28 metadata,
29 progress.as_ref(),
30 license_engine.clone(),
31 license_options,
32 text_options,
33 );
34 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
35 file_entry
36 })
37 .collect();
38
39 for (path, metadata) in &collected.directories {
40 all_files.push(process_directory(
41 path,
42 metadata,
43 text_options.collect_info,
44 license_engine.is_some(),
45 ));
46 }
47
48 ProcessResult {
49 files: all_files,
50 excluded_count: collected.excluded_count,
51 }
52}
53
54pub fn process_collected_sequential(
55 collected: &CollectedPaths,
56 progress: Arc<ScanProgress>,
57 license_engine: Option<Arc<LicenseDetectionEngine>>,
58 license_options: LicenseScanOptions,
59 text_options: &TextDetectionOptions,
60) -> ProcessResult {
61 let mut all_files: Vec<FileInfo> =
62 Vec::with_capacity(collected.files.len() + collected.directories.len());
63
64 for (path, metadata) in &collected.files {
65 let file_entry = process_file(
66 path,
67 metadata,
68 progress.as_ref(),
69 license_engine.clone(),
70 license_options,
71 text_options,
72 );
73 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
74 all_files.push(file_entry);
75 }
76
77 for (path, metadata) in &collected.directories {
78 all_files.push(process_directory(
79 path,
80 metadata,
81 text_options.collect_info,
82 license_engine.is_some(),
83 ));
84 }
85
86 ProcessResult {
87 files: all_files,
88 excluded_count: collected.excluded_count,
89 }
90}
91
92pub fn process_collected_with_memory_limit(
93 collected: &CollectedPaths,
94 progress: Arc<ScanProgress>,
95 license_engine: Option<Arc<LicenseDetectionEngine>>,
96 license_options: LicenseScanOptions,
97 text_options: &TextDetectionOptions,
98 max_in_memory: MemoryMode,
99) -> ProcessResult {
100 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
101 return process_collected(
102 collected,
103 progress,
104 license_engine,
105 license_options,
106 text_options,
107 );
108 };
109
110 let mut retained_files = Vec::new();
111 let mut spill_store: Option<FileInfoSpillStore> = None;
112
113 for chunk in collected.files.chunks(chunk_size) {
114 let processed_chunk: Vec<FileInfo> = chunk
115 .par_iter()
116 .map(|(path, metadata)| {
117 let file_entry = process_file(
118 path,
119 metadata,
120 progress.as_ref(),
121 license_engine.clone(),
122 license_options,
123 text_options,
124 );
125 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
126 file_entry
127 })
128 .collect();
129
130 retain_or_spill_chunk(
131 processed_chunk,
132 &mut retained_files,
133 &mut spill_store,
134 memory_limit,
135 );
136 }
137
138 for (path, metadata) in &collected.directories {
139 let entry = process_directory(
140 path,
141 metadata,
142 text_options.collect_info,
143 license_engine.is_some(),
144 );
145 retain_or_spill_chunk(
146 vec![entry],
147 &mut retained_files,
148 &mut spill_store,
149 memory_limit,
150 );
151 }
152
153 if let Some(spill_store) = spill_store {
154 retained_files.extend(spill_store.load_all());
155 }
156
157 ProcessResult {
158 files: retained_files,
159 excluded_count: collected.excluded_count,
160 }
161}
162
163pub fn process_collected_with_memory_limit_sequential(
164 collected: &CollectedPaths,
165 progress: Arc<ScanProgress>,
166 license_engine: Option<Arc<LicenseDetectionEngine>>,
167 license_options: LicenseScanOptions,
168 text_options: &TextDetectionOptions,
169 max_in_memory: MemoryMode,
170) -> ProcessResult {
171 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
172 return process_collected_sequential(
173 collected,
174 progress,
175 license_engine,
176 license_options,
177 text_options,
178 );
179 };
180
181 let mut retained_files = Vec::new();
182 let mut spill_store: Option<FileInfoSpillStore> = None;
183
184 for chunk in collected.files.chunks(chunk_size) {
185 let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
186 for (path, metadata) in chunk {
187 let file_entry = process_file(
188 path,
189 metadata,
190 progress.as_ref(),
191 license_engine.clone(),
192 license_options,
193 text_options,
194 );
195 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
196 processed_chunk.push(file_entry);
197 }
198
199 retain_or_spill_chunk(
200 processed_chunk,
201 &mut retained_files,
202 &mut spill_store,
203 memory_limit,
204 );
205 }
206
207 for (path, metadata) in &collected.directories {
208 let entry = process_directory(
209 path,
210 metadata,
211 text_options.collect_info,
212 license_engine.is_some(),
213 );
214 retain_or_spill_chunk(
215 vec![entry],
216 &mut retained_files,
217 &mut spill_store,
218 memory_limit,
219 );
220 }
221
222 if let Some(spill_store) = spill_store {
223 retained_files.extend(spill_store.load_all());
224 }
225
226 ProcessResult {
227 files: retained_files,
228 excluded_count: collected.excluded_count,
229 }
230}
231
232fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
233 match max_in_memory {
234 MemoryMode::CollectFirst => None,
235 MemoryMode::StreamUnlimited => Some((0, 256)),
236 MemoryMode::Limit(n) => Some((n, n.max(1))),
237 }
238}