provenant/scanner/process/
orchestrator.rs1use super::pipeline::process_file;
5use super::special_cases::process_directory;
6use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
7use crate::license_detection::LicenseDetectionEngine;
8use crate::models::FileInfo;
9use crate::parsers::with_parser_scan_root;
10use crate::progress::ScanProgress;
11use crate::scanner::collect::CollectedPaths;
12use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
13use rayon::prelude::*;
14use std::sync::Arc;
15
16pub fn process_collected(
17 collected: &CollectedPaths,
18 progress: Arc<ScanProgress>,
19 license_engine: Option<Arc<LicenseDetectionEngine>>,
20 license_options: LicenseScanOptions,
21 text_options: &TextDetectionOptions,
22) -> ProcessResult {
23 let mut all_files: Vec<FileInfo> = collected
24 .files
25 .par_iter()
26 .map(|(path, metadata)| {
27 let file_entry = with_parser_scan_root(collected.scan_root(), || {
28 process_file(
29 path,
30 metadata,
31 progress.as_ref(),
32 license_engine.clone(),
33 license_options,
34 text_options,
35 )
36 });
37 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
38 file_entry
39 })
40 .collect();
41
42 for (path, metadata) in &collected.directories {
43 all_files.push(process_directory(
44 path,
45 metadata,
46 text_options.collect_info,
47 license_engine.is_some(),
48 ));
49 }
50
51 ProcessResult {
52 files: all_files,
53 excluded_count: collected.excluded_count,
54 }
55}
56
57pub fn process_collected_sequential(
58 collected: &CollectedPaths,
59 progress: Arc<ScanProgress>,
60 license_engine: Option<Arc<LicenseDetectionEngine>>,
61 license_options: LicenseScanOptions,
62 text_options: &TextDetectionOptions,
63) -> ProcessResult {
64 let mut all_files: Vec<FileInfo> =
65 Vec::with_capacity(collected.files.len() + collected.directories.len());
66
67 for (path, metadata) in &collected.files {
68 let file_entry = with_parser_scan_root(collected.scan_root(), || {
69 process_file(
70 path,
71 metadata,
72 progress.as_ref(),
73 license_engine.clone(),
74 license_options,
75 text_options,
76 )
77 });
78 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
79 all_files.push(file_entry);
80 }
81
82 for (path, metadata) in &collected.directories {
83 all_files.push(process_directory(
84 path,
85 metadata,
86 text_options.collect_info,
87 license_engine.is_some(),
88 ));
89 }
90
91 ProcessResult {
92 files: all_files,
93 excluded_count: collected.excluded_count,
94 }
95}
96
97pub fn process_collected_with_memory_limit(
98 collected: &CollectedPaths,
99 progress: Arc<ScanProgress>,
100 license_engine: Option<Arc<LicenseDetectionEngine>>,
101 license_options: LicenseScanOptions,
102 text_options: &TextDetectionOptions,
103 max_in_memory: MemoryMode,
104) -> ProcessResult {
105 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
106 return process_collected(
107 collected,
108 progress,
109 license_engine,
110 license_options,
111 text_options,
112 );
113 };
114
115 let mut retained_files = Vec::new();
116 let mut spill_store: Option<FileInfoSpillStore> = None;
117
118 for chunk in collected.files.chunks(chunk_size) {
119 let processed_chunk: Vec<FileInfo> = chunk
120 .par_iter()
121 .map(|(path, metadata)| {
122 let file_entry = with_parser_scan_root(collected.scan_root(), || {
123 process_file(
124 path,
125 metadata,
126 progress.as_ref(),
127 license_engine.clone(),
128 license_options,
129 text_options,
130 )
131 });
132 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
133 file_entry
134 })
135 .collect();
136
137 retain_or_spill_chunk(
138 processed_chunk,
139 &mut retained_files,
140 &mut spill_store,
141 memory_limit,
142 );
143 }
144
145 for (path, metadata) in &collected.directories {
146 let entry = process_directory(
147 path,
148 metadata,
149 text_options.collect_info,
150 license_engine.is_some(),
151 );
152 retain_or_spill_chunk(
153 vec![entry],
154 &mut retained_files,
155 &mut spill_store,
156 memory_limit,
157 );
158 }
159
160 if let Some(spill_store) = spill_store {
161 retained_files.extend(spill_store.load_all());
162 }
163
164 ProcessResult {
165 files: retained_files,
166 excluded_count: collected.excluded_count,
167 }
168}
169
170pub fn process_collected_with_memory_limit_sequential(
171 collected: &CollectedPaths,
172 progress: Arc<ScanProgress>,
173 license_engine: Option<Arc<LicenseDetectionEngine>>,
174 license_options: LicenseScanOptions,
175 text_options: &TextDetectionOptions,
176 max_in_memory: MemoryMode,
177) -> ProcessResult {
178 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
179 return process_collected_sequential(
180 collected,
181 progress,
182 license_engine,
183 license_options,
184 text_options,
185 );
186 };
187
188 let mut retained_files = Vec::new();
189 let mut spill_store: Option<FileInfoSpillStore> = None;
190
191 for chunk in collected.files.chunks(chunk_size) {
192 let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
193 for (path, metadata) in chunk {
194 let file_entry = with_parser_scan_root(collected.scan_root(), || {
195 process_file(
196 path,
197 metadata,
198 progress.as_ref(),
199 license_engine.clone(),
200 license_options,
201 text_options,
202 )
203 });
204 progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
205 processed_chunk.push(file_entry);
206 }
207
208 retain_or_spill_chunk(
209 processed_chunk,
210 &mut retained_files,
211 &mut spill_store,
212 memory_limit,
213 );
214 }
215
216 for (path, metadata) in &collected.directories {
217 let entry = process_directory(
218 path,
219 metadata,
220 text_options.collect_info,
221 license_engine.is_some(),
222 );
223 retain_or_spill_chunk(
224 vec![entry],
225 &mut retained_files,
226 &mut spill_store,
227 memory_limit,
228 );
229 }
230
231 if let Some(spill_store) = spill_store {
232 retained_files.extend(spill_store.load_all());
233 }
234
235 ProcessResult {
236 files: retained_files,
237 excluded_count: collected.excluded_count,
238 }
239}
240
241fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
242 match max_in_memory {
243 MemoryMode::CollectFirst => None,
244 MemoryMode::StreamUnlimited => Some((0, 256)),
245 MemoryMode::Limit(n) => Some((n, n.max(1))),
246 }
247}