use crate::error::{DbError, QueryError};
use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
use crate::segments::writer::SegmentWriter;
use crate::storage::Store;
pub struct TempSpillFile<S: Store> {
store: Option<S>,
base_len: u64,
}
impl<S: Store> TempSpillFile<S> {
pub fn new(store: S) -> Result<Self, DbError> {
let base_len = store.len()?;
Ok(Self {
store: Some(store),
base_len,
})
}
fn store_mut(&mut self) -> Result<&mut S, DbError> {
self.store.as_mut().ok_or_else(|| {
DbError::Query(QueryError {
message: "spill store unavailable".into(),
})
})
}
pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
let store = self.store_mut()?;
let file_len = store.len()?;
let mut writer = SegmentWriter::new(store, file_len);
let off = writer.offset();
let hdr = SegmentHeader {
segment_type: SegmentType::Temp,
payload_len: 0,
payload_crc32c: 0,
};
writer.append(hdr, payload)?;
Ok(off)
}
pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
let mut buf = vec![0u8; len as usize];
self.store_mut()?
.read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
Ok(buf)
}
pub fn finish(mut self) -> Result<S, DbError> {
let mut store = self.store.take().ok_or_else(|| {
DbError::Query(QueryError {
message: "spill store unavailable".into(),
})
})?;
store.truncate(self.base_len)?;
store.sync()?;
Ok(store)
}
}
impl<S: Store> Drop for TempSpillFile<S> {
fn drop(&mut self) {
if let Some(store) = self.store.as_mut() {
let _ = store.truncate(self.base_len);
let _ = store.sync();
}
}
}
pub struct TempSpillGuard<'a, S: Store> {
store: &'a mut S,
base_len: u64,
}
impl<'a, S: Store> TempSpillGuard<'a, S> {
pub fn new(store: &'a mut S) -> Result<Self, DbError> {
let base_len = store.len()?;
Ok(Self { store, base_len })
}
pub fn store_mut(&mut self) -> &mut S {
self.store
}
pub fn append_temp_segment(&mut self, payload: &[u8]) -> Result<u64, DbError> {
let file_len = self.store.len()?;
let mut writer = SegmentWriter::new(self.store, file_len);
let off = writer.offset();
let hdr = SegmentHeader {
segment_type: SegmentType::Temp,
payload_len: 0,
payload_crc32c: 0,
};
writer.append(hdr, payload)?;
Ok(off)
}
pub fn read_temp_payload(&mut self, offset: u64, len: u64) -> Result<Vec<u8>, DbError> {
let mut buf = vec![0u8; len as usize];
self.store
.read_exact_at(offset + SEGMENT_HEADER_LEN as u64, &mut buf)?;
Ok(buf)
}
pub fn base_len(&self) -> u64 {
self.base_len
}
}
impl<S: Store> Drop for TempSpillGuard<'_, S> {
fn drop(&mut self) {
let _ = self.store.truncate(self.base_len);
let _ = self.store.sync();
}
}
#[cfg(test)]
mod tests {
include!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/unit/src_spill_tests.rs"
));
}