Skip to main content

provenant/scanner/process/
spill.rs

1// SPDX-FileCopyrightText: Provenant contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::models::FileInfo;
5use std::fs::{self, File};
6use std::io::{Read, Write};
7use tempfile::TempDir;
8
/// Selects one of three memory-handling modes.
///
/// The [`Display`](std::fmt::Display) implementation renders every mode as an
/// integer string: `0` for [`MemoryMode::CollectFirst`], `-1` for
/// [`MemoryMode::StreamUnlimited`], and the wrapped number for
/// [`MemoryMode::Limit`].
///
/// NOTE(review): what each mode means for the scanner is decided by callers
/// outside this file — confirm there before relying on the variant names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryMode {
    /// Displayed as `0`.
    CollectFirst,
    /// Displayed as `-1`.
    StreamUnlimited,
    /// Carries a `usize` value; displayed as that value in decimal.
    Limit(usize),
}

impl std::fmt::Display for MemoryMode {
    /// Writes the integer string form of the mode to `f`.
    ///
    /// Width, fill, and alignment requested by the caller are not applied:
    /// the text is written as-is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::CollectFirst => f.write_str("0"),
            Self::StreamUnlimited => f.write_str("-1"),
            // Goes through `write!` (not `limit.fmt(f)`) so the outer
            // formatter's padding options stay ignored for this arm as well.
            Self::Limit(limit) => write!(f, "{limit}"),
        }
    }
}
25
26pub(super) fn retain_or_spill_chunk(
27    chunk: Vec<FileInfo>,
28    retained_files: &mut Vec<FileInfo>,
29    spill_store: &mut Option<FileInfoSpillStore>,
30    memory_limit: usize,
31) {
32    if memory_limit == 0 {
33        spill_store
34            .get_or_insert_with(FileInfoSpillStore::new)
35            .spill(chunk);
36        return;
37    }
38
39    let remaining_capacity = memory_limit.saturating_sub(retained_files.len());
40    if remaining_capacity >= chunk.len() && spill_store.is_none() {
41        retained_files.extend(chunk);
42        return;
43    }
44
45    let mut chunk_iter = chunk.into_iter();
46    retained_files.extend(chunk_iter.by_ref().take(remaining_capacity));
47    let overflow: Vec<FileInfo> = chunk_iter.collect();
48    if !overflow.is_empty() {
49        spill_store
50            .get_or_insert_with(FileInfoSpillStore::new)
51            .spill(overflow);
52    }
53}
54
55pub(super) struct FileInfoSpillStore {
56    temp_dir: TempDir,
57    batch_index: usize,
58}
59
60impl FileInfoSpillStore {
61    fn new() -> Self {
62        Self {
63            temp_dir: TempDir::new().expect("create spill dir"),
64            batch_index: 0,
65        }
66    }
67
68    fn spill(&mut self, files: Vec<FileInfo>) {
69        let path = self
70            .temp_dir
71            .path()
72            .join(format!("batch-{:06}.json.zst", self.batch_index));
73        self.batch_index += 1;
74
75        let payload = serde_json::to_vec(&files).expect("encode spilled file batch");
76        let file = File::create(path).expect("create spill batch file");
77        let mut encoder = zstd::Encoder::new(file, 3).expect("create spill encoder");
78        encoder
79            .write_all(&payload)
80            .expect("write spilled file batch");
81        encoder.finish().expect("finish spill encoder");
82    }
83
84    pub(super) fn load_all(self) -> Vec<FileInfo> {
85        let mut paths: Vec<_> = fs::read_dir(self.temp_dir.path())
86            .expect("read spill dir")
87            .filter_map(Result::ok)
88            .map(|entry| entry.path())
89            .collect();
90        paths.sort();
91
92        let mut files = Vec::new();
93        for path in paths {
94            let file = File::open(path).expect("open spill batch");
95            let mut decoder = zstd::Decoder::new(file).expect("create spill decoder");
96            let mut payload = Vec::new();
97            decoder.read_to_end(&mut payload).expect("read spill batch");
98            let mut batch: Vec<FileInfo> =
99                serde_json::from_slice(&payload).expect("decode spilled file batch");
100            files.append(&mut batch);
101        }
102        files
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{DiagnosticSeverity, FileType, ScanDiagnostic};

    /// Round-trips a single `FileInfo` through the spill store and checks
    /// that the severity of its scan diagnostic is unchanged after reload.
    #[test]
    fn spilled_files_preserve_scan_diagnostic_severity() {
        // Fixture: one file entry whose diagnostics are then replaced with a
        // single warning-level diagnostic.
        let mut entry = FileInfo::new(
            String::from("custom.txt"),
            String::from("custom"),
            String::from(".txt"),
            String::from("project/custom.txt"),
            FileType::File,
            None,
            None,
            10,
            None,
            None,
            None,
            None,
            None,
            Vec::new(),
            None,
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            vec![String::from("custom recoverable warning")],
        );
        entry.scan_diagnostics = vec![ScanDiagnostic::warning("custom recoverable warning")];

        let mut spill_store = FileInfoSpillStore::new();
        spill_store.spill(vec![entry]);
        let restored = spill_store.load_all();

        assert_eq!(restored.len(), 1);
        let diagnostics = &restored[0].scan_diagnostics;
        assert_eq!(diagnostics.len(), 1);
        assert_eq!(diagnostics[0].severity, DiagnosticSeverity::Warning);
    }
}