1use crate::error::{DbError, QueryError};
6use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
7use crate::segments::writer::SegmentWriter;
8use crate::storage::Store;
9
10pub struct TempSpillFile<S: Store> {
14 store: Option<S>,
15 base_len: u64,
16}
17
18impl<S: Store> TempSpillFile<S> {
19 pub fn new(store: S) -> Result<Self, DbError> {
20 let base_len = store.len()?;
21 Ok(Self {
22 store: Some(store),
23 base_len,
24 })
25 }
26
27 fn store_mut(&mut self) -> Result<&mut S, DbError> {
28 self.store.as_mut().ok_or_else(|| {
29 DbError::Query(QueryError {
30 message: "spill store unavailable".into(),
31 })
32 })
33 }
34
35 pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
36 let store = self.store_mut()?;
37 let file_len = store.len()?;
38 let mut writer = SegmentWriter::new(store, file_len);
39 let off = writer.offset();
40 let hdr = SegmentHeader {
41 segment_type: SegmentType::Temp,
42 payload_len: 0,
43 payload_crc32c: 0,
44 };
45 writer.append(hdr, payload)?;
46 Ok(off)
47 }
48
49 pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
50 let mut buf = vec![0u8; len as usize];
51 self.store_mut()?
52 .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
53 Ok(buf)
54 }
55
56 pub fn finish(mut self) -> Result<S, DbError> {
58 let mut store = self.store.take().ok_or_else(|| {
59 DbError::Query(QueryError {
60 message: "spill store unavailable".into(),
61 })
62 })?;
63 store.truncate(self.base_len)?;
64 store.sync()?;
65 Ok(store)
66 }
67}
68
69impl<S: Store> Drop for TempSpillFile<S> {
70 fn drop(&mut self) {
71 if let Some(store) = self.store.as_mut() {
72 let _ = store.truncate(self.base_len);
73 let _ = store.sync();
74 }
75 }
76}
77
78pub struct TempSpillGuard<'a, S: Store> {
80 store: &'a mut S,
81 base_len: u64,
82}
83
84impl<'a, S: Store> TempSpillGuard<'a, S> {
85 pub fn new(store: &'a mut S) -> Result<Self, DbError> {
86 let base_len = store.len()?;
87 Ok(Self { store, base_len })
88 }
89
90 pub fn store_mut(&mut self) -> &mut S {
91 self.store
92 }
93
94 pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
96 let file_len = self.store.len()?;
97 let mut writer = SegmentWriter::new(self.store, file_len);
98 let off = writer.offset();
99 let hdr = SegmentHeader {
100 segment_type: SegmentType::Temp,
101 payload_len: 0,
102 payload_crc32c: 0,
103 };
104 writer.append(hdr, payload)?;
105 Ok(off)
106 }
107
108 pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
109 let mut buf = vec![0u8; len as usize];
110 self.store
111 .read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
112 Ok(buf)
113 }
114
115 pub fn base_len(&self) -> u64 {
116 self.base_len
117 }
118}
119
120impl<S: Store> Drop for TempSpillGuard<'_, S> {
121 fn drop(&mut self) {
122 let _ = self.store.truncate(self.base_len);
123 let _ = self.store.sync();
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 include!(concat!(
130 env!("CARGO_MANIFEST_DIR"),
131 "/tests/unit/src_spill_tests.rs"
132 ));
133}