provenant/scanner/process/
spill.rs1use crate::models::FileInfo;
5use std::fs::{self, File};
6use std::io::{Read, Write};
7use tempfile::TempDir;
8
/// Memory-handling mode, rendered by [`std::fmt::Display`] as a single integer:
/// `CollectFirst` as `0`, `StreamUnlimited` as `-1`, and `Limit(n)` as `n`.
///
/// NOTE(review): `Limit(0)` and `CollectFirst` both render as `0`, so the textual
/// form alone cannot distinguish them — confirm no caller round-trips through it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryMode {
    /// Rendered as `0`.
    CollectFirst,
    /// Rendered as `-1`.
    StreamUnlimited,
    /// Carries a numeric limit and is rendered as that number.
    /// Presumably a cap on in-memory `FileInfo` records — confirm against callers.
    Limit(usize),
}

impl std::fmt::Display for MemoryMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::CollectFirst => f.write_str("0"),
            Self::StreamUnlimited => f.write_str("-1"),
            Self::Limit(limit) => write!(f, "{limit}"),
        }
    }
}
25
26pub(super) fn retain_or_spill_chunk(
27 chunk: Vec<FileInfo>,
28 retained_files: &mut Vec<FileInfo>,
29 spill_store: &mut Option<FileInfoSpillStore>,
30 memory_limit: usize,
31) {
32 if memory_limit == 0 {
33 spill_store
34 .get_or_insert_with(FileInfoSpillStore::new)
35 .spill(chunk);
36 return;
37 }
38
39 let remaining_capacity = memory_limit.saturating_sub(retained_files.len());
40 if remaining_capacity >= chunk.len() && spill_store.is_none() {
41 retained_files.extend(chunk);
42 return;
43 }
44
45 let mut chunk_iter = chunk.into_iter();
46 retained_files.extend(chunk_iter.by_ref().take(remaining_capacity));
47 let overflow: Vec<FileInfo> = chunk_iter.collect();
48 if !overflow.is_empty() {
49 spill_store
50 .get_or_insert_with(FileInfoSpillStore::new)
51 .spill(overflow);
52 }
53}
54
55pub(super) struct FileInfoSpillStore {
56 temp_dir: TempDir,
57 batch_index: usize,
58}
59
60impl FileInfoSpillStore {
61 fn new() -> Self {
62 Self {
63 temp_dir: TempDir::new().expect("create spill dir"),
64 batch_index: 0,
65 }
66 }
67
68 fn spill(&mut self, files: Vec<FileInfo>) {
69 let path = self
70 .temp_dir
71 .path()
72 .join(format!("batch-{:06}.json.zst", self.batch_index));
73 self.batch_index += 1;
74
75 let payload = serde_json::to_vec(&files).expect("encode spilled file batch");
76 let file = File::create(path).expect("create spill batch file");
77 let mut encoder = zstd::Encoder::new(file, 3).expect("create spill encoder");
78 encoder
79 .write_all(&payload)
80 .expect("write spilled file batch");
81 encoder.finish().expect("finish spill encoder");
82 }
83
84 pub(super) fn load_all(self) -> Vec<FileInfo> {
85 let mut paths: Vec<_> = fs::read_dir(self.temp_dir.path())
86 .expect("read spill dir")
87 .filter_map(Result::ok)
88 .map(|entry| entry.path())
89 .collect();
90 paths.sort();
91
92 let mut files = Vec::new();
93 for path in paths {
94 let file = File::open(path).expect("open spill batch");
95 let mut decoder = zstd::Decoder::new(file).expect("create spill decoder");
96 let mut payload = Vec::new();
97 decoder.read_to_end(&mut payload).expect("read spill batch");
98 let mut batch: Vec<FileInfo> =
99 serde_json::from_slice(&payload).expect("decode spilled file batch");
100 files.append(&mut batch);
101 }
102 files
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{DiagnosticSeverity, FileType, ScanDiagnostic};

    /// Builds a minimal regular-file `FileInfo` named `{stem}.txt` with no scan errors.
    fn sample_file(stem: &str) -> FileInfo {
        FileInfo::new(
            format!("{stem}.txt"),
            stem.to_string(),
            ".txt".to_string(),
            format!("project/{stem}.txt"),
            FileType::File,
            None,
            None,
            10,
            None,
            None,
            None,
            None,
            None,
            Vec::new(),
            None,
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
        )
    }

    #[test]
    fn spilled_files_preserve_scan_diagnostic_severity() {
        let mut store = FileInfoSpillStore::new();
        let mut file = FileInfo::new(
            "custom.txt".to_string(),
            "custom".to_string(),
            ".txt".to_string(),
            "project/custom.txt".to_string(),
            FileType::File,
            None,
            None,
            10,
            None,
            None,
            None,
            None,
            None,
            Vec::new(),
            None,
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            Vec::new(),
            vec!["custom recoverable warning".to_string()],
        );
        file.scan_diagnostics = vec![ScanDiagnostic::warning("custom recoverable warning")];

        store.spill(vec![file]);
        let loaded = store.load_all();

        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].scan_diagnostics.len(), 1);
        assert_eq!(
            loaded[0].scan_diagnostics[0].severity,
            DiagnosticSeverity::Warning
        );
    }

    #[test]
    fn load_all_returns_batches_in_spill_order() {
        let mut store = FileInfoSpillStore::new();
        // Tag each batch by its number of diagnostics so that order is observable
        // without depending on any other FileInfo field.
        for marker in 0..3 {
            let mut file = sample_file("ordered");
            file.scan_diagnostics = (0..marker)
                .map(|_| ScanDiagnostic::warning("order marker"))
                .collect();
            store.spill(vec![file]);
        }

        let markers: Vec<usize> = store
            .load_all()
            .iter()
            .map(|file| file.scan_diagnostics.len())
            .collect();
        assert_eq!(markers, vec![0, 1, 2]);
    }

    #[test]
    fn retain_or_spill_chunk_retains_up_to_limit_and_spills_the_rest() {
        let mut retained = Vec::new();
        let mut spill_store = None;

        retain_or_spill_chunk(
            vec![sample_file("a"), sample_file("b")],
            &mut retained,
            &mut spill_store,
            3,
        );
        assert_eq!(retained.len(), 2);
        assert!(spill_store.is_none());

        retain_or_spill_chunk(
            vec![sample_file("c"), sample_file("d"), sample_file("e")],
            &mut retained,
            &mut spill_store,
            3,
        );
        assert_eq!(retained.len(), 3);
        let spilled = spill_store
            .expect("overflow should create a spill store")
            .load_all();
        assert_eq!(spilled.len(), 2);
    }

    #[test]
    fn retain_or_spill_chunk_with_zero_limit_spills_everything() {
        let mut retained = Vec::new();
        let mut spill_store = None;

        retain_or_spill_chunk(
            vec![sample_file("a"), sample_file("b")],
            &mut retained,
            &mut spill_store,
            0,
        );
        assert!(retained.is_empty());
        let spilled = spill_store
            .expect("a zero limit should spill the chunk")
            .load_all();
        assert_eq!(spilled.len(), 2);
    }
}