provenant/scanner/process/
orchestrator.rs1use super::pipeline::process_file;
2use super::special_cases::process_directory;
3use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
4use crate::license_detection::LicenseDetectionEngine;
5use crate::models::FileInfo;
6use crate::progress::ScanProgress;
7use crate::scanner::collect::CollectedPaths;
8use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
9use rayon::prelude::*;
10use std::sync::Arc;
11
12pub fn process_collected(
13 collected: &CollectedPaths,
14 progress: Arc<ScanProgress>,
15 license_engine: Option<Arc<LicenseDetectionEngine>>,
16 license_options: LicenseScanOptions,
17 text_options: &TextDetectionOptions,
18) -> ProcessResult {
19 let mut all_files: Vec<FileInfo> = collected
20 .files
21 .par_iter()
22 .map(|(path, metadata)| {
23 let file_entry = process_file(
24 path,
25 metadata,
26 progress.as_ref(),
27 license_engine.clone(),
28 license_options,
29 text_options,
30 );
31 progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
32 file_entry
33 })
34 .collect();
35
36 for (path, metadata) in &collected.directories {
37 all_files.push(process_directory(
38 path,
39 metadata,
40 text_options.collect_info,
41 license_engine.is_some(),
42 ));
43 }
44
45 ProcessResult {
46 files: all_files,
47 excluded_count: collected.excluded_count,
48 }
49}
50
51pub fn process_collected_sequential(
52 collected: &CollectedPaths,
53 progress: Arc<ScanProgress>,
54 license_engine: Option<Arc<LicenseDetectionEngine>>,
55 license_options: LicenseScanOptions,
56 text_options: &TextDetectionOptions,
57) -> ProcessResult {
58 let mut all_files: Vec<FileInfo> =
59 Vec::with_capacity(collected.files.len() + collected.directories.len());
60
61 for (path, metadata) in &collected.files {
62 let file_entry = process_file(
63 path,
64 metadata,
65 progress.as_ref(),
66 license_engine.clone(),
67 license_options,
68 text_options,
69 );
70 progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
71 all_files.push(file_entry);
72 }
73
74 for (path, metadata) in &collected.directories {
75 all_files.push(process_directory(
76 path,
77 metadata,
78 text_options.collect_info,
79 license_engine.is_some(),
80 ));
81 }
82
83 ProcessResult {
84 files: all_files,
85 excluded_count: collected.excluded_count,
86 }
87}
88
89pub fn process_collected_with_memory_limit(
90 collected: &CollectedPaths,
91 progress: Arc<ScanProgress>,
92 license_engine: Option<Arc<LicenseDetectionEngine>>,
93 license_options: LicenseScanOptions,
94 text_options: &TextDetectionOptions,
95 max_in_memory: MemoryMode,
96) -> ProcessResult {
97 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
98 return process_collected(
99 collected,
100 progress,
101 license_engine,
102 license_options,
103 text_options,
104 );
105 };
106
107 let mut retained_files = Vec::new();
108 let mut spill_store: Option<FileInfoSpillStore> = None;
109
110 for chunk in collected.files.chunks(chunk_size) {
111 let processed_chunk: Vec<FileInfo> = chunk
112 .par_iter()
113 .map(|(path, metadata)| {
114 let file_entry = process_file(
115 path,
116 metadata,
117 progress.as_ref(),
118 license_engine.clone(),
119 license_options,
120 text_options,
121 );
122 progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
123 file_entry
124 })
125 .collect();
126
127 retain_or_spill_chunk(
128 processed_chunk,
129 &mut retained_files,
130 &mut spill_store,
131 memory_limit,
132 );
133 }
134
135 for (path, metadata) in &collected.directories {
136 let entry = process_directory(
137 path,
138 metadata,
139 text_options.collect_info,
140 license_engine.is_some(),
141 );
142 retain_or_spill_chunk(
143 vec![entry],
144 &mut retained_files,
145 &mut spill_store,
146 memory_limit,
147 );
148 }
149
150 if let Some(spill_store) = spill_store {
151 retained_files.extend(spill_store.load_all());
152 }
153
154 ProcessResult {
155 files: retained_files,
156 excluded_count: collected.excluded_count,
157 }
158}
159
160pub fn process_collected_with_memory_limit_sequential(
161 collected: &CollectedPaths,
162 progress: Arc<ScanProgress>,
163 license_engine: Option<Arc<LicenseDetectionEngine>>,
164 license_options: LicenseScanOptions,
165 text_options: &TextDetectionOptions,
166 max_in_memory: MemoryMode,
167) -> ProcessResult {
168 let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
169 return process_collected_sequential(
170 collected,
171 progress,
172 license_engine,
173 license_options,
174 text_options,
175 );
176 };
177
178 let mut retained_files = Vec::new();
179 let mut spill_store: Option<FileInfoSpillStore> = None;
180
181 for chunk in collected.files.chunks(chunk_size) {
182 let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
183 for (path, metadata) in chunk {
184 let file_entry = process_file(
185 path,
186 metadata,
187 progress.as_ref(),
188 license_engine.clone(),
189 license_options,
190 text_options,
191 );
192 progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
193 processed_chunk.push(file_entry);
194 }
195
196 retain_or_spill_chunk(
197 processed_chunk,
198 &mut retained_files,
199 &mut spill_store,
200 memory_limit,
201 );
202 }
203
204 for (path, metadata) in &collected.directories {
205 let entry = process_directory(
206 path,
207 metadata,
208 text_options.collect_info,
209 license_engine.is_some(),
210 );
211 retain_or_spill_chunk(
212 vec![entry],
213 &mut retained_files,
214 &mut spill_store,
215 memory_limit,
216 );
217 }
218
219 if let Some(spill_store) = spill_store {
220 retained_files.extend(spill_store.load_all());
221 }
222
223 ProcessResult {
224 files: retained_files,
225 excluded_count: collected.excluded_count,
226 }
227}
228
229fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
230 match max_in_memory {
231 MemoryMode::CollectFirst => None,
232 MemoryMode::StreamUnlimited => Some((0, 256)),
233 MemoryMode::Limit(n) => Some((n, n.max(1))),
234 }
235}