Skip to main content

modelvault_core/
spill.rs

1//! Spill manager for bounded-memory query operators (0.12.0+; stabilized in 0.13.0).
2//!
3//! v0 implementation: append ephemeral `Temp` segments to the store and truncate them away on drop.
4
5use crate::error::{DbError, QueryError};
6use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
7use crate::segments::writer::SegmentWriter;
8use crate::storage::Store;
9
10/// Owned spill file wrapper that truncates to `base_len` on drop.
11///
12/// This is used by streaming query operators that need a scratch store.
13pub struct TempSpillFile<S: Store> {
14    store: Option<S>,
15    base_len: u64,
16}
17
18impl<S: Store> TempSpillFile<S> {
19    pub fn new(store: S) -> Result<Self, DbError> {
20        let base_len = store.len()?;
21        Ok(Self {
22            store: Some(store),
23            base_len,
24        })
25    }
26
27    fn store_mut(&mut self) -> Result<&mut S, DbError> {
28        self.store.as_mut().ok_or_else(|| {
29            DbError::Query(QueryError {
30                message: "spill store unavailable".into(),
31            })
32        })
33    }
34
35    pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
36        let store = self.store_mut()?;
37        let file_len = store.len()?;
38        let mut writer = SegmentWriter::new(store, file_len);
39        let off = writer.offset();
40        let hdr = SegmentHeader {
41            segment_type: SegmentType::Temp,
42            payload_len: 0,
43            payload_crc32c: 0,
44        };
45        writer.append(hdr, payload)?;
46        Ok(off)
47    }
48
49    pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
50        let mut buf = vec![0u8; len as usize];
51        self.store_mut()?
52            .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
53        Ok(buf)
54    }
55
56    /// Explicitly truncate away all temp spill data and return the inner store.
57    pub fn finish(mut self) -> Result<S, DbError> {
58        let mut store = self.store.take().ok_or_else(|| {
59            DbError::Query(QueryError {
60                message: "spill store unavailable".into(),
61            })
62        })?;
63        store.truncate(self.base_len)?;
64        store.sync()?;
65        Ok(store)
66    }
67}
68
69impl<S: Store> Drop for TempSpillFile<S> {
70    fn drop(&mut self) {
71        if let Some(store) = self.store.as_mut() {
72            let _ = store.truncate(self.base_len);
73            let _ = store.sync();
74        }
75    }
76}
77
78/// RAII guard that truncates the store back to `base_len` on drop.
79pub struct TempSpillGuard<'a, S: Store> {
80    store: &'a mut S,
81    base_len: u64,
82}
83
84impl<'a, S: Store> TempSpillGuard<'a, S> {
85    pub fn new(store: &'a mut S) -> Result<Self, DbError> {
86        let base_len = store.len()?;
87        Ok(Self { store, base_len })
88    }
89
90    pub fn store_mut(&mut self) -> &mut S {
91        self.store
92    }
93
94    /// Append one `Temp` segment and return its offset.
95    pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
96        let file_len = self.store.len()?;
97        let mut writer = SegmentWriter::new(self.store, file_len);
98        let off = writer.offset();
99        let hdr = SegmentHeader {
100            segment_type: SegmentType::Temp,
101            payload_len: 0,
102            payload_crc32c: 0,
103        };
104        writer.append(hdr, payload)?;
105        Ok(off)
106    }
107
108    pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
109        let mut buf = vec![0u8; len as usize];
110        self.store
111            .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
112        Ok(buf)
113    }
114
115    pub fn base_len(&self) -> u64 {
116        self.base_len
117    }
118}
119
120impl<S: Store> Drop for TempSpillGuard<'_, S> {
121    fn drop(&mut self) {
122        let _ = self.store.truncate(self.base_len);
123        let _ = self.store.sync();
124    }
125}
126
// The unit tests live in `tests/unit/src_spill_tests.rs`, resolved relative to the
// package's manifest directory, and are textually included here. They are therefore
// compiled as the body of this `tests` module, and only in test builds.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_spill_tests.rs"
    ));
}