Skip to main content

modelvault_core/
spill.rs

//! Spill manager for bounded-memory query operators (0.12.0+; stabilized in 0.13.0).
//!
//! v0 implementation: appends ephemeral `Temp` segments to the store and truncates them away on drop.
4
5use crate::error::DbError;
6use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
7use crate::segments::writer::SegmentWriter;
8use crate::storage::Store;
9
10/// Owned spill file wrapper that truncates to `base_len` on drop.
11///
12/// This is used by streaming query operators that need a scratch store.
13pub struct TempSpillFile<S: Store> {
14    store: Option<S>,
15    base_len: u64,
16}
17
18impl<S: Store> TempSpillFile<S> {
19    pub fn new(store: S) -> Result<Self, DbError> {
20        let base_len = store.len()?;
21        Ok(Self {
22            store: Some(store),
23            base_len,
24        })
25    }
26
27    pub fn store_mut(&mut self) -> &mut S {
28        self.store.as_mut().expect("spill store already taken")
29    }
30
31    pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
32        let store = self.store_mut();
33        let file_len = store.len()?;
34        let mut writer = SegmentWriter::new(store, file_len);
35        let off = writer.offset();
36        let hdr = SegmentHeader {
37            segment_type: SegmentType::Temp,
38            payload_len: 0,
39            payload_crc32c: 0,
40        };
41        writer.append(hdr, payload)?;
42        Ok(off)
43    }
44
45    pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
46        let mut buf = vec![0u8; len as usize];
47        self.store_mut()
48            .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
49        Ok(buf)
50    }
51
52    /// Explicitly truncate away all temp spill data and return the inner store.
53    pub fn finish(mut self) -> Result<S, DbError> {
54        let mut store = self.store.take().expect("spill store already taken");
55        store.truncate(self.base_len)?;
56        store.sync()?;
57        Ok(store)
58    }
59}
60
61impl<S: Store> Drop for TempSpillFile<S> {
62    fn drop(&mut self) {
63        if let Some(store) = self.store.as_mut() {
64            let _ = store.truncate(self.base_len);
65            let _ = store.sync();
66        }
67    }
68}
69
70/// RAII guard that truncates the store back to `base_len` on drop.
71pub struct TempSpillGuard<'a, S: Store> {
72    store: &'a mut S,
73    base_len: u64,
74}
75
76impl<'a, S: Store> TempSpillGuard<'a, S> {
77    pub fn new(store: &'a mut S) -> Result<Self, DbError> {
78        let base_len = store.len()?;
79        Ok(Self { store, base_len })
80    }
81
82    pub fn store_mut(&mut self) -> &mut S {
83        self.store
84    }
85
86    /// Append one `Temp` segment and return its offset.
87    pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
88        let file_len = self.store.len()?;
89        let mut writer = SegmentWriter::new(self.store, file_len);
90        let off = writer.offset();
91        let hdr = SegmentHeader {
92            segment_type: SegmentType::Temp,
93            payload_len: 0,
94            payload_crc32c: 0,
95        };
96        writer.append(hdr, payload)?;
97        Ok(off)
98    }
99
100    pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
101        let mut buf = vec![0u8; len as usize];
102        self.store
103            .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
104        Ok(buf)
105    }
106
107    pub fn base_len(&self) -> u64 {
108        self.base_len
109    }
110}
111
112impl<S: Store> Drop for TempSpillGuard<'_, S> {
113    fn drop(&mut self) {
114        let _ = self.store.truncate(self.base_len);
115        let _ = self.store.sync();
116    }
117}
118
// The unit tests for this module are kept in a separate file and textually
// included here, so they compile as a child module of this file.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_spill_tests.rs"));
}