// persy 0.7.0
//
// Transactional Persistence Engine
// Documentation
use crate::{error::PRes, id::RecRef, journal::JournalId, transaction::FreedPage};
use std::{
    cmp::Ordering,
    collections::{hash_map::Entry, HashMap},
    sync::Mutex,
};

pub type SnapshotId = u64;

/// A record captured by a snapshot: the record reference together with the
/// `pos` and `version` values that were registered for it.
#[derive(Clone, Debug)]
pub struct SnapshotEntry {
    /// Reference of the record this entry describes.
    id: RecRef,
    /// Position stored for the record (presumably its page position; confirm against callers).
    pos: u64,
    /// Version number stored for the record.
    version: u16,
}

impl SnapshotEntry {
    pub fn new(id: &RecRef, pos: u64, version: u16) -> SnapshotEntry {
        SnapshotEntry {
            id: id.clone(),
            pos,
            version,
        }
    }
}
/// Bookkeeping for one registered snapshot.
#[derive(Clone, Debug)]
pub struct Snapshot {
    /// Identifier taken from the snapshot sequence when the snapshot was created.
    snapshot_id: SnapshotId,
    /// Journal associated with the snapshot; `None` for snapshots created by `read_snapshot`.
    journal_id: Option<JournalId>,
    /// Record entries registered together with the snapshot.
    entries: Vec<SnapshotEntry>,
    /// Pages handed back to the caller once the snapshot is cleared.
    freed_pages: Vec<FreedPage>,
    /// Starts at 1 when no other snapshot is registered and at 2 otherwise;
    /// decremented by `Snapshots::release`.
    reference_count: u32,
}

/// The `pos`/`version` pair registered for a record by a given snapshot.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordVersion {
    /// Snapshot that registered this version.
    snapshot_id: SnapshotId,
    /// Position copied from the originating `SnapshotEntry`.
    pub pos: u64,
    /// Version copied from the originating `SnapshotEntry`.
    pub version: u16,
}

/// Mutable state guarded by the mutex in `Snapshots`.
pub struct InternalSnapshots {
    /// Versions registered for each record, kept sorted by snapshot id using `search`.
    mapping: HashMap<RecRef, Vec<RecordVersion>>,
    /// Registered snapshots, kept sorted by snapshot id using `search`.
    active_snapshots: Vec<Snapshot>,
    /// Next snapshot id to hand out; also used as the `top` argument of `search`.
    snapshot_sequence: u64,
}

/// Registry of snapshots and of the record versions they keep readable.
pub struct Snapshots {
    /// All state lives behind this mutex; every method locks it before touching anything.
    lock: Mutex<InternalSnapshots>,
}

/// Compares two snapshot ids relative to `top`, the current sequence value.
///
/// Ids greater than `top` sort before ids that are less than or equal to
/// `top`; two ids on the same side of `top` compare numerically.
pub fn search(value: u64, value1: u64, top: u64) -> Ordering {
    match (value > top, value1 > top) {
        // Only `value` is above `top`: it sorts first.
        (true, false) => Ordering::Less,
        // Only `value1` is above `top`: it sorts first.
        (false, true) => Ordering::Greater,
        // Same side of `top`: plain numeric comparison.
        _ => value.cmp(&value1),
    }
}

impl Default for Snapshots {
    fn default() -> Snapshots {
        Self::new()
    }
}

impl Snapshots {
    /// Creates an empty registry: no record versions, no snapshots, and the
    /// snapshot sequence starting at 0.
    pub fn new() -> Snapshots {
        let internal = InternalSnapshots {
            mapping: HashMap::new(),
            active_snapshots: Vec::new(),
            snapshot_sequence: 0,
        };
        Snapshots {
            lock: Mutex::new(internal),
        }
    }

    /// Registers a snapshot that carries no entries, freed pages or journal
    /// and returns its id.
    ///
    /// The id is the current sequence value, and the sequence is advanced by
    /// one. The snapshot starts with one reference when no other snapshot is
    /// registered and with two otherwise.
    pub fn read_snapshot(&self) -> PRes<SnapshotId> {
        let mut guard = self.lock.lock()?;
        let snapshot_id = guard.snapshot_sequence;
        guard.snapshot_sequence += 1;
        let top = guard.snapshot_sequence;

        let reference_count = if guard.active_snapshots.is_empty() { 1 } else { 2 };
        let slot = guard
            .active_snapshots
            .binary_search_by(|n| search(n.snapshot_id, snapshot_id, top));
        // Only insert when the id is not registered yet, at the position that keeps the order.
        if let Err(index) = slot {
            guard.active_snapshots.insert(
                index,
                Snapshot {
                    snapshot_id,
                    journal_id: None,
                    entries: Vec::new(),
                    freed_pages: Vec::new(),
                    reference_count,
                },
            );
        }
        Ok(snapshot_id)
    }

    /// Registers a snapshot carrying `entries`, `freed_pages` and `journal_id`
    /// and returns its id.
    ///
    /// Every entry is also recorded in the per-record version list under the
    /// new snapshot id. The id is the current sequence value, and the sequence
    /// is advanced by one. The snapshot starts with one reference when no other
    /// snapshot is registered and with two otherwise.
    pub fn snapshot(
        &self,
        entries: Vec<SnapshotEntry>,
        freed_pages: Vec<FreedPage>,
        journal_id: JournalId,
    ) -> PRes<SnapshotId> {
        let mut guard = self.lock.lock()?;
        let snapshot_id = guard.snapshot_sequence;
        guard.snapshot_sequence += 1;
        let top = guard.snapshot_sequence;

        for entry in entries.iter() {
            // A missing record starts from an empty list, so the search below
            // reports position 0 and the version becomes its single element.
            let versions = guard.mapping.entry(entry.id.clone()).or_insert_with(Vec::new);
            let slot = versions.binary_search_by(|n| search(n.snapshot_id, snapshot_id, top));
            if let Err(index) = slot {
                versions.insert(
                    index,
                    RecordVersion {
                        snapshot_id,
                        pos: entry.pos,
                        version: entry.version,
                    },
                );
            }
        }

        let reference_count = if guard.active_snapshots.is_empty() { 1 } else { 2 };
        let slot = guard
            .active_snapshots
            .binary_search_by(|n| search(n.snapshot_id, snapshot_id, top));
        // Only insert when the id is not registered yet, at the position that keeps the order.
        if let Err(index) = slot {
            guard.active_snapshots.insert(
                index,
                Snapshot {
                    snapshot_id,
                    journal_id: Some(journal_id),
                    entries,
                    freed_pages,
                    reference_count,
                },
            );
        }
        Ok(snapshot_id)
    }

    /// Looks up the version of record `id` as seen from `snapshot_id`.
    ///
    /// Returns the version registered under `snapshot_id` itself or, when
    /// there is none, the first version that follows it in the `search`
    /// ordering. Returns `None` when the record has no versions or none at or
    /// after `snapshot_id`.
    pub fn read(&self, snapshot_id: SnapshotId, id: &RecRef) -> PRes<Option<RecordVersion>> {
        let guard = self.lock.lock()?;
        let top = guard.snapshot_sequence;
        let found = guard.mapping.get(id).and_then(|versions| {
            // An exact hit and an insertion point are used the same way: as the index to read.
            let index = versions
                .binary_search_by(|n| search(n.snapshot_id, snapshot_id, top))
                .unwrap_or_else(|insertion| insertion);
            versions.get(index).cloned()
        });
        Ok(found)
    }

    fn clear_from(&self, snapshot_id: SnapshotId) -> PRes<(Vec<FreedPage>, Vec<JournalId>)> {
        let mut lock = self.lock.lock()?;
        let snapshot_sequence = lock.snapshot_sequence;
        let mut free_pages = Vec::new();
        let mut journal_ids = Vec::new();
        if let Ok(index) = lock
            .active_snapshots
            .binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
        {
            for tx in lock.active_snapshots.split_off(index) {
                if let Some(id) = tx.journal_id {
                    journal_ids.push(id.clone());
                }
                for record in tx.entries {
                    if let Some(v) = lock.mapping.get_mut(&record.id) {
                        if let Ok(index) = v.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
                        {
                            v.split_off(index);
                        }
                    }
                }
                free_pages.extend(tx.freed_pages);
            }
        }
        Ok((free_pages, journal_ids))
    }

    /// Drops one reference from the snapshot `snapshot_id`.
    ///
    /// When a snapshot's count reaches zero, one reference is also dropped
    /// from the snapshot that follows it, and so on until a snapshot is still
    /// referenced or the list ends. The last snapshot whose count reached zero
    /// is then passed to `clear_from`, and its result is returned. When no
    /// count reached zero, or `snapshot_id` is not registered, both returned
    /// vectors are empty.
    pub fn release(&self, snapshot_id: SnapshotId) -> PRes<(Vec<FreedPage>, Vec<JournalId>)> {
        //TODO: This works fine but can cause problems if release is called twice for the same id,
        //to refactor to something a bit more safe
        let last_unreferenced = {
            // The guard is dropped at the end of this scope, before `clear_from` locks again.
            let mut guard = self.lock.lock()?;
            let top = guard.snapshot_sequence;
            let mut last = None;
            let start = guard
                .active_snapshots
                .binary_search_by(|n| search(n.snapshot_id, snapshot_id, top));
            if let Ok(first) = start {
                for snap in guard.active_snapshots.iter_mut().skip(first) {
                    snap.reference_count -= 1;
                    if snap.reference_count > 0 {
                        break;
                    }
                    last = Some(snap.snapshot_id);
                }
            }
            last
        };
        match last_unreferenced {
            Some(c_id) => self.clear_from(c_id),
            None => Ok((Vec::new(), Vec::new())),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{search, RecordVersion, SnapshotEntry, Snapshots};
    use crate::{id::RecRef, journal::JournalId, transaction::FreedPage};
    use std::cmp::Ordering;

    /// Checks `search` for ids on either side of `top` and for equal ids.
    #[test]
    fn test_search() {
        // (value, value1, top, expected ordering)
        let cases: [(u64, u64, u64, Ordering); 9] = [
            (10, 20, 40, Ordering::Less),
            (20, 10, 40, Ordering::Greater),
            (10, 30, 20, Ordering::Greater),
            (30, 10, 20, Ordering::Less),
            (20, 20, 20, Ordering::Equal),
            (20, 19, 20, Ordering::Greater),
            (20, 21, 20, Ordering::Greater),
            (21, 21, 20, Ordering::Equal),
            (19, 19, 20, Ordering::Equal),
        ];
        for (value, value1, top, expected) in cases.iter() {
            assert_eq!(search(*value, *value1, *top), *expected);
        }
    }

    /// Registers a snapshot, clears it, registers the same records again and
    /// checks what `read` returns for ids before, at and after the second one.
    #[test]
    fn add_and_read() {
        let snap = Snapshots::new();
        let records: Vec<SnapshotEntry> = (0..3)
            .map(|x| SnapshotEntry {
                id: RecRef::new(10, x),
                pos: x as u64,
                version: x as u16,
            })
            .collect();
        let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();

        let cleared = snap
            .snapshot(records.clone(), freed_pages.clone(), JournalId::new(0, 0))
            .unwrap();
        snap.clear_from(cleared).unwrap();
        let tx = snap.snapshot(records, freed_pages, JournalId::new(0, 0)).unwrap();

        // Every surviving version was registered by the second snapshot, whose id is 1.
        let registered = |n: u16| {
            Some(RecordVersion {
                snapshot_id: 1,
                pos: u64::from(n),
                version: n,
            })
        };

        assert_eq!(snap.read(tx - 1, &RecRef::new(10, 2)).unwrap(), registered(2));
        assert_eq!(snap.read(tx - 1, &RecRef::new(10, 1)).unwrap(), registered(1));
        assert_eq!(snap.read(tx, &RecRef::new(10, 2)).unwrap(), registered(2));
        assert_eq!(snap.read(tx + 1, &RecRef::new(10, 2)).unwrap(), None);
        assert_eq!(snap.read(tx + 1, &RecRef::new(10, 10)).unwrap(), None);
    }

    /// Registers five snapshots, each skipping one record, and checks that
    /// reading the skipped record returns the version from the next snapshot.
    #[test]
    fn add_and_read_mutliple_tx() {
        let snap = Snapshots::new();
        let mut txs = Vec::new();
        for t in 0..5 {
            let mut records = Vec::new();
            for x in 0..10 {
                // Each tx leaves out the record with its own index, so the test has a missing record
                if x == t {
                    continue;
                }
                let id = RecRef::new(10, x);
                let pos = (10 * t + x) as u64;
                let version = (10 * t + x) as u16;
                records.push(SnapshotEntry { id, pos, version });
            }
            let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();
            let tx = snap.snapshot(records, freed_pages, JournalId::new(0, 0)).unwrap();
            txs.push(tx);
        }

        let from_third = snap.read(txs[2], &RecRef::new(10, 2)).unwrap();
        assert_eq!(
            from_third,
            Some(RecordVersion {
                snapshot_id: txs[3],
                pos: 32,
                version: 32,
            })
        );

        let from_fourth = snap.read(txs[3], &RecRef::new(10, 3)).unwrap();
        assert_eq!(
            from_fourth,
            Some(RecordVersion {
                snapshot_id: txs[4],
                pos: 43,
                version: 43,
            })
        );
    }

    /// Checks that a version stays readable while an earlier snapshot is still
    /// held, and is gone once that snapshot is released too.
    #[test]
    fn test_snapshot_reference_count() {
        let snap = Snapshots::new();
        let records: Vec<SnapshotEntry> = (0..3)
            .map(|x| SnapshotEntry {
                id: RecRef::new(10, x),
                pos: x as u64,
                version: x as u16,
            })
            .collect();
        let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();

        // Registration order matters: read snapshot, write snapshot, read snapshot.
        let first = snap.read_snapshot().unwrap();
        let tx = snap.snapshot(records, freed_pages, JournalId::new(0, 0)).unwrap();
        let last = snap.read_snapshot().unwrap();

        snap.release(tx).unwrap();
        let while_first_held = snap.read(first, &RecRef::new(10, 2)).unwrap();
        assert_eq!(
            while_first_held,
            Some(RecordVersion {
                snapshot_id: 1,
                pos: 2,
                version: 2,
            })
        );

        snap.release(first).unwrap();
        let after_first_released = snap.read(last, &RecRef::new(10, 2)).unwrap();
        assert_eq!(after_first_released, None);
    }
}