persy 0.9.0

Transactional Persistence Engine
Documentation
use crate::{
    address::Address,
    error::{PRes, PersyError},
    index::config::{format_segment_name_data, format_segment_name_meta, index_name_from_meta_segment},
    persy::PersyImpl,
    snapshot::{SnapshotId, Snapshots},
    transaction::{Transaction, TxSegCheck},
};
use data_encoding::BASE32_DNSSEC;
pub use std::fs::OpenOptions;
use std::{fmt, io::Write, str};
use unsigned_varint::{
    decode::{u32 as u32_vdec, u64 as u64_vdec},
    encode::{u32 as u32_venc, u32_buffer, u64 as u64_venc, u64_buffer},
};

fn write_id(f: &mut fmt::Formatter, id: u32) -> fmt::Result {
    let mut buffer = Vec::new();
    buffer
        .write_all(u32_venc(id, &mut u32_buffer()))
        .expect("no failure expected only memory allocation");
    buffer.push(0b0101_0101);
    write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
}

/// Parses the textual form produced by `write_id` back into the numeric id.
///
/// The string is decoded with `BASE32_DNSSEC` and the leading unsigned varint is returned;
/// any bytes following the varint are ignored. Decoding failures are propagated as errors.
fn read_id(s: &str) -> PRes<u32> {
    let raw = BASE32_DNSSEC.decode(s.as_bytes())?;
    let (id, _trailing) = u32_vdec(&raw)?;
    Ok(id)
}

/// Reference to a record, made of a page number and a position value.
///
/// Ordering and equality are derived, so references compare by `page` first and `pos` second.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecRef {
    pub page: u64,
    pub pos: u32,
}

impl RecRef {
    /// Builds a reference from its `page` and `pos` components.
    #[inline]
    pub fn new(page: u64, pos: u32) -> Self {
        Self { page, pos }
    }
}

impl fmt::Display for RecRef {
    /// Renders the reference as text.
    ///
    /// The bytes are: `page` as an unsigned varint, the byte `0b0101_0101`, then `pos` as an
    /// unsigned varint; the whole sequence is rendered with the `BASE32_DNSSEC` encoding.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut page_buf = u64_buffer();
        let mut pos_buf = u32_buffer();
        let parts: [&[u8]; 3] = [
            u64_venc(self.page, &mut page_buf),
            &[0b0101_0101],
            u32_venc(self.pos, &mut pos_buf),
        ];

        let mut raw = Vec::new();
        for part in parts.iter() {
            raw.write_all(part)
                .expect("no failure expected only memory allocation");
        }
        write!(f, "{}", BASE32_DNSSEC.encode(&raw))
    }
}

impl std::str::FromStr for RecRef {
    type Err = PersyError;

    /// Parses the textual form produced by the `Display` implementation.
    ///
    /// The string is decoded with `BASE32_DNSSEC`; the bytes are then read as the `page`
    /// varint, one separator byte (skipped without being checked), and the `pos` varint.
    ///
    /// # Errors
    /// Returns an error when the string is not valid for the encoding or when either varint
    /// cannot be decoded, including when the input ends right after the `page` varint.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let bytes = BASE32_DNSSEC.decode(s.as_bytes())?;
        let (page, rest) = u64_vdec(&bytes)?;
        // Skip the separator byte. Plain indexing (`rest[1..]`) panics when nothing follows
        // the page varint; with `get` a truncated input reaches the varint decoder as an
        // empty slice and is reported as a decode error instead.
        let tail = rest.get(1..).unwrap_or(&[]);
        let (pos, _) = u32_vdec(tail)?;
        Ok(RecRef::new(page, pos))
    }
}

/// Identifier of a persistent record; it can be used to read, update or delete the record.
///
/// It is a thin wrapper around a `RecRef`, and its string form is the one of the wrapped
/// reference.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct PersyId(pub(crate) RecRef);

impl fmt::Display for PersyId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::str::FromStr for PersyId {
    type Err = PersyError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(PersyId(s.parse()?))
    }
}

/// Represents a looked-up segment.
///
/// # Invalidation
/// When a segment is deleted, the corresponding `SegmentId` becomes invalid.
///
/// # Serialization
/// This type supports serialization.
/// It does NOT serialize to the segment name.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct SegmentId {
    pub(crate) id: u32,
}

impl SegmentId {
    /// Wraps a raw numeric segment id.
    #[inline]
    pub(crate) fn new(id: u32) -> Self {
        Self { id }
    }
}

/// Conversion of a value into a [`SegmentId`].
///
/// In this file it is implemented for `SegmentId`, `&SegmentId` and any `AsRef<str>` value,
/// where the string is used as the segment name to look up.
pub trait ToSegmentId {
    /// Resolves `self` to a segment id using `address`.
    fn to_segment_id(self, address: &Address) -> PRes<SegmentId>;
    /// Resolves `self` to a segment id in the context of the transaction `tx`.
    ///
    /// The returned flag tells whether the segment was created in that transaction.
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)>;
    /// Resolves `self` to a segment id as seen by the given `snapshot`.
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<SegmentId>;
}

/// Borrowed ids are copied and then handled by the implementation for `SegmentId`.
impl ToSegmentId for &SegmentId {
    #[inline]
    fn to_segment_id(self, address: &Address) -> PRes<SegmentId> {
        <SegmentId as ToSegmentId>::to_segment_id(*self, address)
    }
    #[inline]
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)> {
        <SegmentId as ToSegmentId>::to_segment_id_tx(*self, persy, tx)
    }
    #[inline]
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<SegmentId> {
        <SegmentId as ToSegmentId>::to_segment_id_snapshot(*self, snapshots, snapshot)
    }
}

/// An id is already resolved, so it is returned unchanged by every method.
impl ToSegmentId for SegmentId {
    #[inline]
    fn to_segment_id(self, _address: &Address) -> PRes<SegmentId> {
        Ok(self)
    }
    #[inline]
    fn to_segment_id_tx(self, _persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)> {
        // Checking only whether the segment was created in the tx is safe, because a segment
        // cannot be created and dropped in the same tx.
        let created_in_tx = tx.segment_created_in_tx(self.id);
        Ok((self, created_in_tx))
    }
    #[inline]
    fn to_segment_id_snapshot(self, _snapshots: &Snapshots, _snapshot: SnapshotId) -> PRes<SegmentId> {
        Ok(self)
    }
}

/// String-like values are treated as segment names and looked up.
impl<T: AsRef<str>> ToSegmentId for T {
    /// Looks the name up in `address`; fails with `SegmentNotFound` when there is no match.
    #[inline]
    fn to_segment_id(self, address: &Address) -> PRes<SegmentId> {
        match address.segment_id(self.as_ref())? {
            Some(id) => Ok(SegmentId { id }),
            None => Err(PersyError::SegmentNotFound),
        }
    }
    /// Checks the name against the transaction and returns the id together with the
    /// created-in-transaction flag reported by `check_segment_tx`.
    #[inline]
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)> {
        let (created_in_tx, raw_id) = persy.check_segment_tx(tx, self.as_ref())?;
        Ok((SegmentId::new(raw_id), created_in_tx))
    }
    /// Looks the name up in the given snapshot; fails with `SegmentNotFound` when there is no
    /// match.
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<SegmentId> {
        match snapshots.solve_segment_id(snapshot, self.as_ref())? {
            Some(segment) => Ok(segment),
            None => Err(PersyError::SegmentNotFound),
        }
    }
}

impl fmt::Display for SegmentId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write_id(f, self.id)
    }
}

impl std::str::FromStr for SegmentId {
    type Err = PersyError;

    /// Parses the textual form through `read_id` and wraps the resulting numeric id.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        read_id(s).map(SegmentId::new)
    }
}

/// Unique identifier of an index
///
/// It holds the numeric ids of the two segments associated with the index: the meta segment
/// and the data segment.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct IndexId {
    // Id of the meta segment of the index.
    meta: u32,
    // Id of the data segment of the index. `0` is used when the id was parsed from a string
    // that carries no data id; the `ToIndexId` conversions then resolve it by name.
    data: u32,
}

/// Conversion of a value into an [`IndexId`].
///
/// In this file it is implemented for `IndexId`, `&IndexId` and any `AsRef<str>` value, where
/// the string is used as the index name to look up.
pub trait ToIndexId {
    /// Resolves `self` to an index id using `address`.
    fn to_index_id(self, address: &Address) -> PRes<IndexId>;
    /// Resolves `self` to an index id in the context of the transaction `tx`.
    ///
    /// The returned flag comes from the transaction-level segment checks performed by the
    /// implementations (whether the underlying segment was created in that transaction).
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)>;
    /// Resolves `self` to an index id as seen by the given `snapshot`.
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<IndexId>;
}

impl ToIndexId for &IndexId {
    #[inline]
    fn to_index_id(self, address: &Address) -> PRes<IndexId> {
        self.clone().to_index_id(address)
    }
    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)> {
        self.clone().to_index_id_tx(persy, tx)
    }
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<IndexId> {
        self.clone().to_index_id_snapshot(snapshots, snapshot)
    }
}

/// A complete id (non-zero `data`) is returned unchanged. An id whose `data` is `0` comes from
/// the backward compatible serialization, which does not carry the data segment: in that case
/// the data segment is found by name convention starting from the meta segment name.
impl ToIndexId for IndexId {
    #[inline]
    fn to_index_id(self, address: &Address) -> PRes<IndexId> {
        if self.data != 0 {
            return Ok(self);
        }
        // Backward compatible serialization: derive the data segment name from the meta one.
        let meta_name = match address.segment_name_by_id(self.meta)? {
            Some(name) => name,
            None => return Err(PersyError::IndexNotFound),
        };
        let index_name = index_name_from_meta_segment(&meta_name);
        let data_name = format_segment_name_data(&index_name);
        match address.segment_id(&data_name)? {
            Some(data) => Ok(IndexId::new(self.meta, data)),
            None => Err(PersyError::IndexNotFound),
        }
    }

    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<IndexId> {
        if self.data != 0 {
            return Ok(self);
        }
        // Backward compatible serialization: derive the data segment name from the meta one.
        let meta_name = match snapshots.solve_segment_name(snapshot, SegmentId::new(self.meta))? {
            Some(name) => name,
            None => return Err(PersyError::IndexNotFound),
        };
        let index_name = index_name_from_meta_segment(&meta_name);
        let data_name = format_segment_name_data(&index_name);
        match snapshots.solve_segment_id(snapshot, &data_name)? {
            Some(data) => Ok(IndexId::new(self.meta, data.id)),
            None => Err(PersyError::IndexNotFound),
        }
    }

    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)> {
        if self.data != 0 {
            // Checking only whether the segment was created in the tx is safe, because a
            // segment cannot be created and dropped in the same tx.
            let created_in_tx = tx.segment_created_in_tx(self.meta);
            return Ok((self, created_in_tx));
        }
        // Backward compatible serialization: derive the data segment name from the meta one.
        let (meta_name, in_tx) = match persy.segment_name_tx(tx, self.meta)? {
            Some(found) => found,
            None => return Err(PersyError::IndexNotFound),
        };
        let index_name = index_name_from_meta_segment(&meta_name);
        let data_name = format_segment_name_data(&index_name);
        let data = if in_tx {
            // The meta segment lives in the tx, so the data segment is searched there too.
            match tx.exists_segment(&data_name) {
                TxSegCheck::CREATED(id) => id,
                _ => return Err(PersyError::IndexNotFound),
            }
        } else {
            match persy.address().segment_id(&data_name)? {
                Some(id) => id,
                None => return Err(PersyError::IndexNotFound),
            }
        };
        Ok((IndexId::new(self.meta, data), in_tx))
    }
}

/// String-like values are treated as index names: the meta and data segment names are derived
/// from the index name and both segments are looked up.
impl<T: AsRef<str>> ToIndexId for T {
    /// Fails with `IndexNotFound` when either segment is missing from `address`.
    #[inline]
    fn to_index_id(self, address: &Address) -> PRes<IndexId> {
        let index_name: &str = self.as_ref();
        let meta_name = format_segment_name_meta(index_name);
        let data_name = format_segment_name_data(index_name);
        let meta = match address.segment_id(&meta_name)? {
            Some(id) => id,
            None => return Err(PersyError::IndexNotFound),
        };
        match address.segment_id(&data_name)? {
            Some(data) => Ok(IndexId::new(meta, data)),
            None => Err(PersyError::IndexNotFound),
        }
    }
    /// Fails with `IndexNotFound` when either segment is missing from the snapshot.
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<IndexId> {
        let index_name: &str = self.as_ref();
        let meta_name = format_segment_name_meta(index_name);
        let data_name = format_segment_name_data(index_name);
        let meta = match snapshots.solve_segment_id(snapshot, &meta_name)? {
            Some(segment) => segment,
            None => return Err(PersyError::IndexNotFound),
        };
        match snapshots.solve_segment_id(snapshot, &data_name)? {
            Some(data) => Ok(IndexId::new(meta.id, data.id)),
            None => Err(PersyError::IndexNotFound),
        }
    }
    /// Checks both segments against the transaction; the returned flag is the one reported
    /// for the data segment.
    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)> {
        let index_name: &str = self.as_ref();
        let meta_name = format_segment_name_meta(index_name);
        let data_name = format_segment_name_data(index_name);
        let (_, meta) = persy.check_segment_tx(tx, &meta_name)?;
        let (in_tx, data) = persy.check_segment_tx(tx, &data_name)?;
        let index_id = IndexId::new(meta, data);
        Ok((index_id, in_tx))
    }
}
impl IndexId {
    pub(crate) fn new(meta: u32, data: u32) -> IndexId {
        IndexId { meta, data }
    }
}

impl fmt::Display for IndexId {
    /// Renders the id as text.
    ///
    /// The bytes are: `meta` as an unsigned varint, `data` as an unsigned varint, then the byte
    /// `0b0101_0101`; the whole sequence is rendered with the `BASE32_DNSSEC` encoding.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut meta_buf = u32_buffer();
        let mut data_buf = u32_buffer();

        let mut raw = u32_venc(self.meta, &mut meta_buf).to_vec();
        raw.extend_from_slice(u32_venc(self.data, &mut data_buf));
        raw.push(0b0101_0101);

        let text = BASE32_DNSSEC.encode(&raw);
        write!(f, "{}", text)
    }
}

impl std::str::FromStr for IndexId {
    type Err = PersyError;

    /// Parses the textual form of an index id.
    ///
    /// After decoding with `BASE32_DNSSEC`, the `meta` varint is read first. When more than
    /// one byte follows it, the `data` varint is read from those bytes; otherwise `data` is set
    /// to `0`, which the `ToIndexId` conversions treat as "resolve the data segment by name".
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let raw = BASE32_DNSSEC.decode(s.as_bytes())?;
        let (meta, rest) = u32_vdec(&raw)?;
        let data = match rest.len() {
            0 | 1 => 0,
            _ => u32_vdec(rest)?.0,
        };
        Ok(IndexId::new(meta, data))
    }
}

/// Returns the id of the meta segment of the given index.
pub fn index_id_to_segment_id_meta(id: &IndexId) -> SegmentId {
    let meta = id.meta;
    SegmentId { id: meta }
}
/// Returns the id of the data segment of the given index.
pub fn index_id_to_segment_id_data(id: &IndexId) -> SegmentId {
    let data = id.data;
    SegmentId { id: data }
}

#[cfg(test)]
mod tests {
    use super::{IndexId, RecRef, SegmentId};

    /// A `RecRef` survives a round trip through its string form.
    #[test]
    fn test_persy_id_string() {
        let original = RecRef::new(20, 30);
        let encoded = original.to_string();
        let decoded = encoded.parse::<RecRef>().ok();
        assert_eq!(decoded, Some(original));
    }

    /// A string that is not a valid encoding is rejected for `RecRef`.
    #[test]
    fn test_persy_id_parse_failure() {
        let parsed = "ACCC".parse::<RecRef>();
        assert!(parsed.is_err());
    }

    /// A `SegmentId` survives a round trip through its string form.
    #[test]
    fn test_segmend_id_string() {
        let original = SegmentId::new(20);
        let encoded = original.to_string();
        let decoded = encoded.parse::<SegmentId>().ok();
        assert_eq!(decoded, Some(original));
    }

    /// A string that is not a valid encoding is rejected for `SegmentId`.
    #[test]
    fn test_segment_id_parse_failure() {
        let parsed = "ACCC".parse::<SegmentId>();
        assert!(parsed.is_err());
    }

    /// An `IndexId` survives a round trip through its string form.
    #[test]
    fn test_index_id_string() {
        let original = IndexId::new(20, 30);
        let encoded = original.to_string();
        let decoded = encoded.parse::<IndexId>().ok();
        assert_eq!(decoded, Some(original));
    }

    /// A string that is not a valid encoding is rejected for `IndexId`.
    #[test]
    fn test_index_id_parse_failure() {
        let parsed = "ACCC".parse::<IndexId>();
        assert!(parsed.is_err());
    }
}