// provenant/scanner/process/spill.rs

use crate::models::FileInfo;
use std::fs::{self, File};
use std::io::{BufWriter, Read, Write};
use std::path::PathBuf;
use tempfile::TempDir;

/// Memory-handling mode, rendered by `Display` as a single number.
///
/// NOTE(review): variant meanings are inferred from their names only —
/// confirm against whatever parses this setting. `Limit(n)` presumably caps
/// how many file records stay in memory before the rest are spilled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryMode {
    /// Rendered as `0`.
    CollectFirst,
    /// Rendered as `-1`.
    StreamUnlimited,
    /// Rendered as the contained number.
    Limit(usize),
}

impl std::fmt::Display for MemoryMode {
    /// Writes the numeric form of the mode: `0`, `-1`, or the limit value.
    ///
    /// `Limit(0)` therefore renders exactly like `CollectFirst`. Width, fill
    /// and alignment flags from the caller's format string are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::CollectFirst => f.write_str("0"),
            Self::StreamUnlimited => f.write_str("-1"),
            Self::Limit(max) => write!(f, "{max}"),
        }
    }
}

23pub(super) fn retain_or_spill_chunk(
24    chunk: Vec<FileInfo>,
25    retained_files: &mut Vec<FileInfo>,
26    spill_store: &mut Option<FileInfoSpillStore>,
27    memory_limit: usize,
28) {
29    if memory_limit == 0 {
30        spill_store
31            .get_or_insert_with(FileInfoSpillStore::new)
32            .spill(chunk);
33        return;
34    }
35
36    let remaining_capacity = memory_limit.saturating_sub(retained_files.len());
37    if remaining_capacity >= chunk.len() && spill_store.is_none() {
38        retained_files.extend(chunk);
39        return;
40    }
41
42    let mut chunk_iter = chunk.into_iter();
43    retained_files.extend(chunk_iter.by_ref().take(remaining_capacity));
44    let overflow: Vec<FileInfo> = chunk_iter.collect();
45    if !overflow.is_empty() {
46        spill_store
47            .get_or_insert_with(FileInfoSpillStore::new)
48            .spill(overflow);
49    }
50}
51
52pub(super) struct FileInfoSpillStore {
53    temp_dir: TempDir,
54    batch_index: usize,
55}
56
57impl FileInfoSpillStore {
58    fn new() -> Self {
59        Self {
60            temp_dir: TempDir::new().expect("create spill dir"),
61            batch_index: 0,
62        }
63    }
64
65    fn spill(&mut self, files: Vec<FileInfo>) {
66        let path = self
67            .temp_dir
68            .path()
69            .join(format!("batch-{:06}.json.zst", self.batch_index));
70        self.batch_index += 1;
71
72        let payload = serde_json::to_vec(&files).expect("encode spilled file batch");
73        let file = File::create(path).expect("create spill batch file");
74        let mut encoder = zstd::Encoder::new(file, 3).expect("create spill encoder");
75        encoder
76            .write_all(&payload)
77            .expect("write spilled file batch");
78        encoder.finish().expect("finish spill encoder");
79    }
80
81    pub(super) fn load_all(self) -> Vec<FileInfo> {
82        let mut paths: Vec<_> = fs::read_dir(self.temp_dir.path())
83            .expect("read spill dir")
84            .filter_map(Result::ok)
85            .map(|entry| entry.path())
86            .collect();
87        paths.sort();
88
89        let mut files = Vec::new();
90        for path in paths {
91            let file = File::open(path).expect("open spill batch");
92            let mut decoder = zstd::Decoder::new(file).expect("create spill decoder");
93            let mut payload = Vec::new();
94            decoder.read_to_end(&mut payload).expect("read spill batch");
95            let mut batch: Vec<FileInfo> =
96                serde_json::from_slice(&payload).expect("decode spilled file batch");
97            files.append(&mut batch);
98        }
99        files
100    }
101}