Skip to main content

provenant/scanner/process/
orchestrator.rs

1use super::pipeline::process_file;
2use super::special_cases::process_directory;
3use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
4use crate::license_detection::LicenseDetectionEngine;
5use crate::models::FileInfo;
6use crate::progress::ScanProgress;
7use crate::scanner::collect::CollectedPaths;
8use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
9use rayon::prelude::*;
10use std::sync::Arc;
11
12pub fn process_collected(
13    collected: &CollectedPaths,
14    progress: Arc<ScanProgress>,
15    license_engine: Option<Arc<LicenseDetectionEngine>>,
16    license_options: LicenseScanOptions,
17    text_options: &TextDetectionOptions,
18) -> ProcessResult {
19    let mut all_files: Vec<FileInfo> = collected
20        .files
21        .par_iter()
22        .map(|(path, metadata)| {
23            let file_entry = process_file(
24                path,
25                metadata,
26                progress.as_ref(),
27                license_engine.clone(),
28                license_options,
29                text_options,
30            );
31            progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
32            file_entry
33        })
34        .collect();
35
36    for (path, metadata) in &collected.directories {
37        all_files.push(process_directory(
38            path,
39            metadata,
40            text_options.collect_info,
41            license_engine.is_some(),
42        ));
43    }
44
45    ProcessResult {
46        files: all_files,
47        excluded_count: collected.excluded_count,
48    }
49}
50
51pub fn process_collected_sequential(
52    collected: &CollectedPaths,
53    progress: Arc<ScanProgress>,
54    license_engine: Option<Arc<LicenseDetectionEngine>>,
55    license_options: LicenseScanOptions,
56    text_options: &TextDetectionOptions,
57) -> ProcessResult {
58    let mut all_files: Vec<FileInfo> =
59        Vec::with_capacity(collected.files.len() + collected.directories.len());
60
61    for (path, metadata) in &collected.files {
62        let file_entry = process_file(
63            path,
64            metadata,
65            progress.as_ref(),
66            license_engine.clone(),
67            license_options,
68            text_options,
69        );
70        progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
71        all_files.push(file_entry);
72    }
73
74    for (path, metadata) in &collected.directories {
75        all_files.push(process_directory(
76            path,
77            metadata,
78            text_options.collect_info,
79            license_engine.is_some(),
80        ));
81    }
82
83    ProcessResult {
84        files: all_files,
85        excluded_count: collected.excluded_count,
86    }
87}
88
89pub fn process_collected_with_memory_limit(
90    collected: &CollectedPaths,
91    progress: Arc<ScanProgress>,
92    license_engine: Option<Arc<LicenseDetectionEngine>>,
93    license_options: LicenseScanOptions,
94    text_options: &TextDetectionOptions,
95    max_in_memory: MemoryMode,
96) -> ProcessResult {
97    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
98        return process_collected(
99            collected,
100            progress,
101            license_engine,
102            license_options,
103            text_options,
104        );
105    };
106
107    let mut retained_files = Vec::new();
108    let mut spill_store: Option<FileInfoSpillStore> = None;
109
110    for chunk in collected.files.chunks(chunk_size) {
111        let processed_chunk: Vec<FileInfo> = chunk
112            .par_iter()
113            .map(|(path, metadata)| {
114                let file_entry = process_file(
115                    path,
116                    metadata,
117                    progress.as_ref(),
118                    license_engine.clone(),
119                    license_options,
120                    text_options,
121                );
122                progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
123                file_entry
124            })
125            .collect();
126
127        retain_or_spill_chunk(
128            processed_chunk,
129            &mut retained_files,
130            &mut spill_store,
131            memory_limit,
132        );
133    }
134
135    for (path, metadata) in &collected.directories {
136        let entry = process_directory(
137            path,
138            metadata,
139            text_options.collect_info,
140            license_engine.is_some(),
141        );
142        retain_or_spill_chunk(
143            vec![entry],
144            &mut retained_files,
145            &mut spill_store,
146            memory_limit,
147        );
148    }
149
150    if let Some(spill_store) = spill_store {
151        retained_files.extend(spill_store.load_all());
152    }
153
154    ProcessResult {
155        files: retained_files,
156        excluded_count: collected.excluded_count,
157    }
158}
159
160pub fn process_collected_with_memory_limit_sequential(
161    collected: &CollectedPaths,
162    progress: Arc<ScanProgress>,
163    license_engine: Option<Arc<LicenseDetectionEngine>>,
164    license_options: LicenseScanOptions,
165    text_options: &TextDetectionOptions,
166    max_in_memory: MemoryMode,
167) -> ProcessResult {
168    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
169        return process_collected_sequential(
170            collected,
171            progress,
172            license_engine,
173            license_options,
174            text_options,
175        );
176    };
177
178    let mut retained_files = Vec::new();
179    let mut spill_store: Option<FileInfoSpillStore> = None;
180
181    for chunk in collected.files.chunks(chunk_size) {
182        let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
183        for (path, metadata) in chunk {
184            let file_entry = process_file(
185                path,
186                metadata,
187                progress.as_ref(),
188                license_engine.clone(),
189                license_options,
190                text_options,
191            );
192            progress.file_completed(path, metadata.len(), &file_entry.scan_errors);
193            processed_chunk.push(file_entry);
194        }
195
196        retain_or_spill_chunk(
197            processed_chunk,
198            &mut retained_files,
199            &mut spill_store,
200            memory_limit,
201        );
202    }
203
204    for (path, metadata) in &collected.directories {
205        let entry = process_directory(
206            path,
207            metadata,
208            text_options.collect_info,
209            license_engine.is_some(),
210        );
211        retain_or_spill_chunk(
212            vec![entry],
213            &mut retained_files,
214            &mut spill_store,
215            memory_limit,
216        );
217    }
218
219    if let Some(spill_store) = spill_store {
220        retained_files.extend(spill_store.load_all());
221    }
222
223    ProcessResult {
224        files: retained_files,
225        excluded_count: collected.excluded_count,
226    }
227}
228
229fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
230    match max_in_memory {
231        MemoryMode::CollectFirst => None,
232        MemoryMode::StreamUnlimited => Some((0, 256)),
233        MemoryMode::Limit(n) => Some((n, n.max(1))),
234    }
235}