Skip to main content

provenant/scanner/process/
orchestrator.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::pipeline::process_file;
5use super::special_cases::process_directory;
6use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
7use crate::license_detection::LicenseDetectionEngine;
8use crate::models::FileInfo;
9use crate::parsers::with_parser_scan_root;
10use crate::progress::ScanProgress;
11use crate::scanner::collect::CollectedPaths;
12use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
13use rayon::prelude::*;
14use std::sync::Arc;
15
16pub fn process_collected(
17    collected: &CollectedPaths,
18    progress: Arc<ScanProgress>,
19    license_engine: Option<Arc<LicenseDetectionEngine>>,
20    license_options: LicenseScanOptions,
21    text_options: &TextDetectionOptions,
22) -> ProcessResult {
23    let mut all_files: Vec<FileInfo> = collected
24        .files
25        .par_iter()
26        .map(|(path, metadata)| {
27            let file_entry = with_parser_scan_root(collected.scan_root(), || {
28                process_file(
29                    path,
30                    metadata,
31                    progress.as_ref(),
32                    license_engine.clone(),
33                    license_options,
34                    text_options,
35                )
36            });
37            progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
38            file_entry
39        })
40        .collect();
41
42    for (path, metadata) in &collected.directories {
43        all_files.push(process_directory(
44            path,
45            metadata,
46            text_options.collect_info,
47            license_engine.is_some(),
48        ));
49    }
50
51    ProcessResult {
52        files: all_files,
53        excluded_count: collected.excluded_count,
54    }
55}
56
57pub fn process_collected_sequential(
58    collected: &CollectedPaths,
59    progress: Arc<ScanProgress>,
60    license_engine: Option<Arc<LicenseDetectionEngine>>,
61    license_options: LicenseScanOptions,
62    text_options: &TextDetectionOptions,
63) -> ProcessResult {
64    let mut all_files: Vec<FileInfo> =
65        Vec::with_capacity(collected.files.len() + collected.directories.len());
66
67    for (path, metadata) in &collected.files {
68        let file_entry = with_parser_scan_root(collected.scan_root(), || {
69            process_file(
70                path,
71                metadata,
72                progress.as_ref(),
73                license_engine.clone(),
74                license_options,
75                text_options,
76            )
77        });
78        progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
79        all_files.push(file_entry);
80    }
81
82    for (path, metadata) in &collected.directories {
83        all_files.push(process_directory(
84            path,
85            metadata,
86            text_options.collect_info,
87            license_engine.is_some(),
88        ));
89    }
90
91    ProcessResult {
92        files: all_files,
93        excluded_count: collected.excluded_count,
94    }
95}
96
97pub fn process_collected_with_memory_limit(
98    collected: &CollectedPaths,
99    progress: Arc<ScanProgress>,
100    license_engine: Option<Arc<LicenseDetectionEngine>>,
101    license_options: LicenseScanOptions,
102    text_options: &TextDetectionOptions,
103    max_in_memory: MemoryMode,
104) -> ProcessResult {
105    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
106        return process_collected(
107            collected,
108            progress,
109            license_engine,
110            license_options,
111            text_options,
112        );
113    };
114
115    let mut retained_files = Vec::new();
116    let mut spill_store: Option<FileInfoSpillStore> = None;
117
118    for chunk in collected.files.chunks(chunk_size) {
119        let processed_chunk: Vec<FileInfo> = chunk
120            .par_iter()
121            .map(|(path, metadata)| {
122                let file_entry = with_parser_scan_root(collected.scan_root(), || {
123                    process_file(
124                        path,
125                        metadata,
126                        progress.as_ref(),
127                        license_engine.clone(),
128                        license_options,
129                        text_options,
130                    )
131                });
132                progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
133                file_entry
134            })
135            .collect();
136
137        retain_or_spill_chunk(
138            processed_chunk,
139            &mut retained_files,
140            &mut spill_store,
141            memory_limit,
142        );
143    }
144
145    for (path, metadata) in &collected.directories {
146        let entry = process_directory(
147            path,
148            metadata,
149            text_options.collect_info,
150            license_engine.is_some(),
151        );
152        retain_or_spill_chunk(
153            vec![entry],
154            &mut retained_files,
155            &mut spill_store,
156            memory_limit,
157        );
158    }
159
160    if let Some(spill_store) = spill_store {
161        retained_files.extend(spill_store.load_all());
162    }
163
164    ProcessResult {
165        files: retained_files,
166        excluded_count: collected.excluded_count,
167    }
168}
169
170pub fn process_collected_with_memory_limit_sequential(
171    collected: &CollectedPaths,
172    progress: Arc<ScanProgress>,
173    license_engine: Option<Arc<LicenseDetectionEngine>>,
174    license_options: LicenseScanOptions,
175    text_options: &TextDetectionOptions,
176    max_in_memory: MemoryMode,
177) -> ProcessResult {
178    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
179        return process_collected_sequential(
180            collected,
181            progress,
182            license_engine,
183            license_options,
184            text_options,
185        );
186    };
187
188    let mut retained_files = Vec::new();
189    let mut spill_store: Option<FileInfoSpillStore> = None;
190
191    for chunk in collected.files.chunks(chunk_size) {
192        let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
193        for (path, metadata) in chunk {
194            let file_entry = with_parser_scan_root(collected.scan_root(), || {
195                process_file(
196                    path,
197                    metadata,
198                    progress.as_ref(),
199                    license_engine.clone(),
200                    license_options,
201                    text_options,
202                )
203            });
204            progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
205            processed_chunk.push(file_entry);
206        }
207
208        retain_or_spill_chunk(
209            processed_chunk,
210            &mut retained_files,
211            &mut spill_store,
212            memory_limit,
213        );
214    }
215
216    for (path, metadata) in &collected.directories {
217        let entry = process_directory(
218            path,
219            metadata,
220            text_options.collect_info,
221            license_engine.is_some(),
222        );
223        retain_or_spill_chunk(
224            vec![entry],
225            &mut retained_files,
226            &mut spill_store,
227            memory_limit,
228        );
229    }
230
231    if let Some(spill_store) = spill_store {
232        retained_files.extend(spill_store.load_all());
233    }
234
235    ProcessResult {
236        files: retained_files,
237        excluded_count: collected.excluded_count,
238    }
239}
240
241fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
242    match max_in_memory {
243        MemoryMode::CollectFirst => None,
244        MemoryMode::StreamUnlimited => Some((0, 256)),
245        MemoryMode::Limit(n) => Some((n, n.max(1))),
246    }
247}