provenant/scanner/process/
spill.rs1use crate::models::FileInfo;
2use std::fs::{self, File};
3use std::io::{Read, Write};
4use tempfile::TempDir;
5
/// Selects how the scanner manages memory for collected results.
///
/// The `Display` implementation renders each mode as a number:
/// `0`, `-1`, or the wrapped limit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryMode {
    /// Displayed as `0`.
    CollectFirst,
    /// Displayed as `-1`.
    StreamUnlimited,
    /// Displayed as the wrapped number.
    ///
    /// NOTE(review): presumably the maximum number of records kept in
    /// memory before spilling to disk — confirm against the caller.
    Limit(usize),
}
12
13impl std::fmt::Display for MemoryMode {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 match self {
16 MemoryMode::CollectFirst => write!(f, "0"),
17 MemoryMode::StreamUnlimited => write!(f, "-1"),
18 MemoryMode::Limit(n) => write!(f, "{n}"),
19 }
20 }
21}
22
23pub(super) fn retain_or_spill_chunk(
24 chunk: Vec<FileInfo>,
25 retained_files: &mut Vec<FileInfo>,
26 spill_store: &mut Option<FileInfoSpillStore>,
27 memory_limit: usize,
28) {
29 if memory_limit == 0 {
30 spill_store
31 .get_or_insert_with(FileInfoSpillStore::new)
32 .spill(chunk);
33 return;
34 }
35
36 let remaining_capacity = memory_limit.saturating_sub(retained_files.len());
37 if remaining_capacity >= chunk.len() && spill_store.is_none() {
38 retained_files.extend(chunk);
39 return;
40 }
41
42 let mut chunk_iter = chunk.into_iter();
43 retained_files.extend(chunk_iter.by_ref().take(remaining_capacity));
44 let overflow: Vec<FileInfo> = chunk_iter.collect();
45 if !overflow.is_empty() {
46 spill_store
47 .get_or_insert_with(FileInfoSpillStore::new)
48 .spill(overflow);
49 }
50}
51
52pub(super) struct FileInfoSpillStore {
53 temp_dir: TempDir,
54 batch_index: usize,
55}
56
57impl FileInfoSpillStore {
58 fn new() -> Self {
59 Self {
60 temp_dir: TempDir::new().expect("create spill dir"),
61 batch_index: 0,
62 }
63 }
64
65 fn spill(&mut self, files: Vec<FileInfo>) {
66 let path = self
67 .temp_dir
68 .path()
69 .join(format!("batch-{:06}.json.zst", self.batch_index));
70 self.batch_index += 1;
71
72 let payload = serde_json::to_vec(&files).expect("encode spilled file batch");
73 let file = File::create(path).expect("create spill batch file");
74 let mut encoder = zstd::Encoder::new(file, 3).expect("create spill encoder");
75 encoder
76 .write_all(&payload)
77 .expect("write spilled file batch");
78 encoder.finish().expect("finish spill encoder");
79 }
80
81 pub(super) fn load_all(self) -> Vec<FileInfo> {
82 let mut paths: Vec<_> = fs::read_dir(self.temp_dir.path())
83 .expect("read spill dir")
84 .filter_map(Result::ok)
85 .map(|entry| entry.path())
86 .collect();
87 paths.sort();
88
89 let mut files = Vec::new();
90 for path in paths {
91 let file = File::open(path).expect("open spill batch");
92 let mut decoder = zstd::Decoder::new(file).expect("create spill decoder");
93 let mut payload = Vec::new();
94 decoder.read_to_end(&mut payload).expect("read spill batch");
95 let mut batch: Vec<FileInfo> =
96 serde_json::from_slice(&payload).expect("decode spilled file batch");
97 files.append(&mut batch);
98 }
99 files
100 }
101}