sled 0.34.7

Lightweight high-performance pure-rust transactional embedded database.
Documentation
use crate::{pagecache::*, *};

/// A pending log reservation which can be aborted or completed.
/// NB the holder should quickly call `complete` or `abort` as
/// taking too long to decide will cause the underlying IO
/// buffer to become blocked.
#[derive(Debug)]
pub struct Reservation<'a> {
    pub(super) log: &'a Log,
    pub(super) iobuf: Arc<IoBuf>,
    pub(super) buf: &'a mut [u8],
    pub(super) flushed: bool,
    pub(super) pointer: DiskPtr,
    pub(super) lsn: Lsn,
    pub(super) is_blob_rewrite: bool,
    pub(super) header_len: usize,
}

impl<'a> Drop for Reservation<'a> {
    fn drop(&mut self) {
        // We auto-abort if the user never uses a reservation.
        if !self.flushed {
            if let Err(e) = self.flush(false) {
                self.log.config.set_global_error(e);
            }
        }
    }
}

impl<'a> Reservation<'a> {
    /// Cancel the reservation, placing a failed flush on disk, returning
    /// the (cancelled) log sequence number and file offset.
    pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
        if self.pointer.is_blob() && !self.is_blob_rewrite {
            // we don't want to remove this blob if something
            // else may still be using it.

            trace!(
                "removing blob for aborted reservation at lsn {}",
                self.pointer
            );

            remove_blob(self.pointer.blob().1, &self.log.config)?;
        }

        self.flush(false)
    }

    /// Complete the reservation, placing the buffer on disk. returns
    /// the log sequence number of the write, and the file offset.
    pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
        self.flush(true)
    }

    /// Get the log sequence number for this update.
    pub const fn lsn(&self) -> Lsn {
        self.lsn
    }

    /// Get the underlying storage location for the written value.
    /// Note that an blob write still has a pointer in the
    /// log at the provided lid location.
    pub const fn pointer(&self) -> DiskPtr {
        self.pointer
    }

    /// Returns the length of the on-log reservation.
    pub(crate) fn reservation_len(&self) -> usize {
        self.buf.len()
    }

    /// Refills the reservation buffer with new data.
    /// Must supply a buffer of an identical length
    /// as the one initially provided. Don't use this
    /// on messages subject to compression etc...
    ///
    /// # Panics
    ///
    /// Will panic if the reservation is not the correct
    /// size to hold a serialized Lsn.
    #[doc(hidden)]
    pub fn mark_writebatch(self, peg_lsn: Lsn) -> Result<(Lsn, DiskPtr)> {
        trace!(
            "writing batch required stable lsn {} into \
             BatchManifest at lid {} peg_lsn {}",
            peg_lsn,
            self.pointer.lid(),
            self.lsn
        );

        if self.lsn == peg_lsn {
            // this can happen because high-level tree updates
            // may result in no work happening.
            self.abort()
        } else {
            self.buf[4] = MessageKind::BatchManifest.into();

            let buf = lsn_to_arr(peg_lsn);

            let dst = &mut self.buf[self.header_len..];

            dst.copy_from_slice(&buf);

            let mut intervals = self.log.iobufs.intervals.lock();
            intervals.mark_batch((self.lsn, peg_lsn));
            drop(intervals);

            self.complete()
        }
    }

    fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
        if self.flushed {
            panic!("flushing already-flushed reservation!");
        }

        self.flushed = true;

        if !valid {
            // don't actually zero the message, still check its hash
            // on recovery to find corruption.
            self.buf[4] = MessageKind::Canceled.into();
        }

        let crc32 = calculate_message_crc32(
            self.buf[..self.header_len].as_ref(),
            &self.buf[self.header_len..],
        );
        let crc32_arr = u32_to_arr(crc32);

        #[allow(unsafe_code)]
        unsafe {
            std::ptr::copy_nonoverlapping(
                crc32_arr.as_ptr(),
                self.buf.as_mut_ptr(),
                std::mem::size_of::<u32>(),
            );
        }
        self.log.exit_reservation(&self.iobuf)?;

        Ok((self.lsn(), self.pointer()))
    }
}