Skip to main content

provenant/scanner/process/
orchestrator.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::pipeline::process_file;
5use super::special_cases::process_directory;
6use super::spill::{FileInfoSpillStore, MemoryMode, retain_or_spill_chunk};
7use crate::license_detection::LicenseDetectionEngine;
8use crate::models::FileInfo;
9use crate::progress::ScanProgress;
10use crate::scanner::collect::CollectedPaths;
11use crate::scanner::{LicenseScanOptions, ProcessResult, TextDetectionOptions};
12use rayon::prelude::*;
13use std::sync::Arc;
14
15pub fn process_collected(
16    collected: &CollectedPaths,
17    progress: Arc<ScanProgress>,
18    license_engine: Option<Arc<LicenseDetectionEngine>>,
19    license_options: LicenseScanOptions,
20    text_options: &TextDetectionOptions,
21) -> ProcessResult {
22    let mut all_files: Vec<FileInfo> = collected
23        .files
24        .par_iter()
25        .map(|(path, metadata)| {
26            let file_entry = process_file(
27                path,
28                metadata,
29                progress.as_ref(),
30                license_engine.clone(),
31                license_options,
32                text_options,
33            );
34            progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
35            file_entry
36        })
37        .collect();
38
39    for (path, metadata) in &collected.directories {
40        all_files.push(process_directory(
41            path,
42            metadata,
43            text_options.collect_info,
44            license_engine.is_some(),
45        ));
46    }
47
48    ProcessResult {
49        files: all_files,
50        excluded_count: collected.excluded_count,
51    }
52}
53
54pub fn process_collected_sequential(
55    collected: &CollectedPaths,
56    progress: Arc<ScanProgress>,
57    license_engine: Option<Arc<LicenseDetectionEngine>>,
58    license_options: LicenseScanOptions,
59    text_options: &TextDetectionOptions,
60) -> ProcessResult {
61    let mut all_files: Vec<FileInfo> =
62        Vec::with_capacity(collected.files.len() + collected.directories.len());
63
64    for (path, metadata) in &collected.files {
65        let file_entry = process_file(
66            path,
67            metadata,
68            progress.as_ref(),
69            license_engine.clone(),
70            license_options,
71            text_options,
72        );
73        progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
74        all_files.push(file_entry);
75    }
76
77    for (path, metadata) in &collected.directories {
78        all_files.push(process_directory(
79            path,
80            metadata,
81            text_options.collect_info,
82            license_engine.is_some(),
83        ));
84    }
85
86    ProcessResult {
87        files: all_files,
88        excluded_count: collected.excluded_count,
89    }
90}
91
92pub fn process_collected_with_memory_limit(
93    collected: &CollectedPaths,
94    progress: Arc<ScanProgress>,
95    license_engine: Option<Arc<LicenseDetectionEngine>>,
96    license_options: LicenseScanOptions,
97    text_options: &TextDetectionOptions,
98    max_in_memory: MemoryMode,
99) -> ProcessResult {
100    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
101        return process_collected(
102            collected,
103            progress,
104            license_engine,
105            license_options,
106            text_options,
107        );
108    };
109
110    let mut retained_files = Vec::new();
111    let mut spill_store: Option<FileInfoSpillStore> = None;
112
113    for chunk in collected.files.chunks(chunk_size) {
114        let processed_chunk: Vec<FileInfo> = chunk
115            .par_iter()
116            .map(|(path, metadata)| {
117                let file_entry = process_file(
118                    path,
119                    metadata,
120                    progress.as_ref(),
121                    license_engine.clone(),
122                    license_options,
123                    text_options,
124                );
125                progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
126                file_entry
127            })
128            .collect();
129
130        retain_or_spill_chunk(
131            processed_chunk,
132            &mut retained_files,
133            &mut spill_store,
134            memory_limit,
135        );
136    }
137
138    for (path, metadata) in &collected.directories {
139        let entry = process_directory(
140            path,
141            metadata,
142            text_options.collect_info,
143            license_engine.is_some(),
144        );
145        retain_or_spill_chunk(
146            vec![entry],
147            &mut retained_files,
148            &mut spill_store,
149            memory_limit,
150        );
151    }
152
153    if let Some(spill_store) = spill_store {
154        retained_files.extend(spill_store.load_all());
155    }
156
157    ProcessResult {
158        files: retained_files,
159        excluded_count: collected.excluded_count,
160    }
161}
162
163pub fn process_collected_with_memory_limit_sequential(
164    collected: &CollectedPaths,
165    progress: Arc<ScanProgress>,
166    license_engine: Option<Arc<LicenseDetectionEngine>>,
167    license_options: LicenseScanOptions,
168    text_options: &TextDetectionOptions,
169    max_in_memory: MemoryMode,
170) -> ProcessResult {
171    let Some((memory_limit, chunk_size)) = memory_limit_settings(max_in_memory) else {
172        return process_collected_sequential(
173            collected,
174            progress,
175            license_engine,
176            license_options,
177            text_options,
178        );
179    };
180
181    let mut retained_files = Vec::new();
182    let mut spill_store: Option<FileInfoSpillStore> = None;
183
184    for chunk in collected.files.chunks(chunk_size) {
185        let mut processed_chunk: Vec<FileInfo> = Vec::with_capacity(chunk.len());
186        for (path, metadata) in chunk {
187            let file_entry = process_file(
188                path,
189                metadata,
190                progress.as_ref(),
191                license_engine.clone(),
192                license_options,
193                text_options,
194            );
195            progress.file_completed(path, metadata.len(), &file_entry.scan_diagnostics);
196            processed_chunk.push(file_entry);
197        }
198
199        retain_or_spill_chunk(
200            processed_chunk,
201            &mut retained_files,
202            &mut spill_store,
203            memory_limit,
204        );
205    }
206
207    for (path, metadata) in &collected.directories {
208        let entry = process_directory(
209            path,
210            metadata,
211            text_options.collect_info,
212            license_engine.is_some(),
213        );
214        retain_or_spill_chunk(
215            vec![entry],
216            &mut retained_files,
217            &mut spill_store,
218            memory_limit,
219        );
220    }
221
222    if let Some(spill_store) = spill_store {
223        retained_files.extend(spill_store.load_all());
224    }
225
226    ProcessResult {
227        files: retained_files,
228        excluded_count: collected.excluded_count,
229    }
230}
231
232fn memory_limit_settings(max_in_memory: MemoryMode) -> Option<(usize, usize)> {
233    match max_in_memory {
234        MemoryMode::CollectFirst => None,
235        MemoryMode::StreamUnlimited => Some((0, 256)),
236        MemoryMode::Limit(n) => Some((n, n.max(1))),
237    }
238}