//! # Persy - Transactional Persistence Engine
//!
//! Simple single file, durable, paginated, transactional persistence engine, based on copy on write, write
//! ahead log, two phase commit.
//!
//! # Example
//!
//! ```rust
//! use persy::{Persy,Config};
//! //...
//! # use persy::PRes;
//! # fn foo() -> PRes<()> {
//! Persy::create("./open.persy")?;
//! let persy = Persy::open("./open.persy",Config::new())?;
//! let mut tx = persy.begin()?;
//! tx.create_segment("seg")?;
//! let data = vec![1;20];
//! tx.insert_record("seg", &data)?;
//! let prepared = tx.prepare_commit()?;
//! prepared.commit()?;
//! for (_id,content) in persy.scan("seg")? {
//! assert_eq!(content.len(), 20);
//! //....
//! }
//! # Ok(())
//! # }
//! ```
//!
//!
mod address;
mod allocator;
mod config;
mod discref;
mod error;
mod flush_checksum;
mod id;
mod index;
mod journal;
mod locks;
mod persy;
mod record_scanner;
mod segment;
mod snapshot;
mod transaction;
pub use crate::{
config::{Config, TxStrategy},
error::{PRes, PersyError},
id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
index::config::{ByteVec, IndexType, IndexTypeId, ValueMode},
index::tree::Value,
persy::{IndexInfo, RecoverStatus},
snapshot::SnapshotId,
};
use crate::{
index::keeper::{IndexRawIter, TxIndexRawIter},
persy::{PersyImpl, RecoverImpl, TxFinalize},
record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
};
use fs2::FileExt;
use std::{
fs::{self, File},
mem::replace,
ops::RangeBounds,
path::Path,
sync::Arc,
};
/// Transaction container: it holds all the changes made within a single transaction.
///
/// Instances are obtained from `Persy::begin` or `Persy::begin_id`.
pub struct Transaction {
// Shared handle to the engine, used to execute every operation of this transaction.
persy_impl: Arc<PersyImpl>,
// Internal transaction state. The accessors in this file `unwrap` it, so it is expected
// to be `Some` for as long as operations are issued on the transaction.
tx: Option<transaction::Transaction>,
}
/// Identifier of a transaction, expressed as raw bytes.
///
/// It can be supplied when starting a transaction with `Persy::begin_id`, it is the value
/// handed to the recover functions, and it is listed by `Recover::list_transactions`.
pub type TransactionId = Vec<u8>;
/// Main structure to operate on persy storage files.
///
/// The only field is an `Arc`, so a clone refers to the same underlying engine instance.
#[derive(Clone)]
pub struct Persy {
// Shared handle to the engine implementation.
persy_impl: Arc<PersyImpl>,
}
/// Options, flags and configurations which can be used
/// to configure how a persy database is opened.
///
/// ```
/// use persy::{OpenOptions, Persy, PersyId, PRes, ValueMode};
///
/// # fn main() -> PRes<()> {
/// // This function will only be called on database creation
/// fn init(persy: &Persy) -> PRes<()> {
/// let mut tx = persy.begin()?;
///
/// tx.create_segment("data")?;
/// tx.create_index::<u64, PersyId>("index", ValueMode::REPLACE)?;
///
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
///
/// println!("Segment and Index successfully created");
/// Ok(())
/// }
///
/// let persy = OpenOptions::new().create(true).prepare_with(init).open("target/persy.db")?;
/// # std::fs::remove_file("target/persy.db")?;
/// # Ok(())
/// # }
/// ```
pub struct OpenOptions {
// When true the file is cut to zero length on open and the storage is initialized again.
truncate: bool,
// Forwarded to `std::fs::OpenOptions::create` when the file is opened.
create: bool,
// Forwarded to `std::fs::OpenOptions::create_new` when the file is opened.
create_new: bool,
// Storage configuration handed to the engine on open.
config: Config,
// Optional initialization callback, invoked only when the storage has just been initialized.
prepare: Option<Box<dyn Fn(&Persy) -> PRes<()>>>,
// Optional callback applied to the recovered transactions when an existing storage is opened.
recover: Option<Box<dyn Fn(&Vec<u8>) -> bool>>,
}
impl OpenOptions {
    /// Create a set of options with every flag disabled, a configuration built with
    /// `Config::new()`, and no prepare or recover function.
    pub fn new() -> OpenOptions {
        OpenOptions {
            truncate: false,
            create: false,
            create_new: false,
            config: Config::new(),
            prepare: None,
            recover: None,
        }
    }

    /// Truncate the file on open, removing all the persistent data.
    ///
    /// A truncated file is initialized again as an empty storage.
    pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
        self.truncate = truncate;
        self
    }

    /// Create a new file if it does not exist.
    pub fn create(&mut self, create: bool) -> &mut OpenOptions {
        self.create = create;
        self
    }

    /// Create a new file, failing if it already exists.
    pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
        self.create_new = create_new;
        self
    }

    /// Provide a function used to initialize the storage when it did not exist before
    /// (or when it has been truncated).
    pub fn prepare_with<F>(&mut self, prepare: F) -> &mut OpenOptions
    where
        F: Fn(&Persy) -> PRes<()> + 'static,
    {
        self.prepare = Some(Box::new(prepare));
        self
    }

    /// Provide a function that decides whether a transaction must be committed or rolled
    /// back when recovering from a crash.
    pub fn recover_with<F>(&mut self, recover: F) -> &mut OpenOptions
    where
        F: Fn(&Vec<u8>) -> bool + 'static,
    {
        self.recover = Some(Box::new(recover));
        self
    }

    /// Provide general storage configurations.
    pub fn config(&mut self, config: Config) -> &mut OpenOptions {
        self.config = config;
        self
    }

    /// Open a file into a recover structure, which lists the pending transactions and
    /// allows selecting which ones to commit and which ones to roll back.
    ///
    /// The file is opened for reading and writing and locked exclusively. When the path
    /// did not exist before the call, or `truncate` is set, the storage is initialized and
    /// the function given to `prepare_with` (if any) is invoked; the recover function is
    /// not applied in this case. Otherwise the function given to `recover_with` (if any)
    /// is applied to the recovered transactions.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, locked, truncated or duplicated, or
    /// if the engine, the prepare function or the recover step report a failure.
    pub fn recover<P>(&mut self, path: P) -> PRes<Recover>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref();
        // Evaluated before the open call, which may itself create the file.
        let must_prepare = !path.exists() || self.truncate;

        let file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(self.create)
            .create_new(self.create_new)
            .open(path)?;

        let config = self.config.clone();

        // according to the dup2 documentation any acquired lock
        // is shared with duplicated file descriptors (try_clone).
        // https://www.mkssoftware.com/docs/man3/dup2.3.asp
        file.try_lock_exclusive()?;

        if self.truncate {
            file.set_len(0)?;
        }

        if must_prepare {
            // `create_from_file` consumes its handle, so a duplicate is kept for the open.
            let file2 = file.try_clone()?;
            Persy::create_from_file(file)?;
            let (persy_impl, recov) = PersyImpl::open_recover(file2, config)?;
            let persy_impl = Arc::new(persy_impl);
            if let Some(prepare) = &self.prepare {
                let persy = Persy {
                    persy_impl: persy_impl.clone(),
                };
                prepare(&persy)?;
            }
            Ok(Recover::new(recov, persy_impl))
        } else {
            let (persy_impl, mut recov) = PersyImpl::open_recover(file, config)?;
            if let Some(recover) = &self.recover {
                recov.apply(recover)?;
            }
            Ok(Recover::new(recov, Arc::new(persy_impl)))
        }
    }

    /// Open a file from the given path with the current options.
    ///
    /// This runs [`recover`](#method.recover) and immediately finalizes the result.
    pub fn open<P>(&mut self, path: P) -> PRes<Persy>
    where
        P: AsRef<Path>,
    {
        let recover = self.recover(path)?;
        recover.finalize()
    }
}
impl Default for OpenOptions {
fn default() -> OpenOptions {
OpenOptions::new()
}
}
/// Intermediate recover status, used to select which transactions to commit or roll back
/// and to list which transactions are in an intermediate state.
///
/// # Example
///
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./recover_example.persy")?;
/// let mut recover = Persy::recover("./recover_example.persy", Config::new())?;
/// for (tx_id,status) in recover.list_transactions()? {
/// // Check the transaction if can be committed using the tx_id
/// if true {
/// // if so commit the tx
/// recover.commit(tx_id)?;
/// } else {
/// // otherwise roll back it
/// recover.rollback(tx_id)?;
/// }
/// }
/// // finalize all the transaction marked to finalize and get a persy instance.
/// let persy = recover.finalize()?;
/// # Ok(())
/// # }
/// ```
///
///
pub struct Recover {
// Recover state produced by the engine when the file was opened.
recover_impl: RecoverImpl,
// Engine handle that `finalize` wraps into the resulting `Persy` instance.
persy_impl: Arc<PersyImpl>,
}
impl Recover {
    /// Pair the recover state with the engine it belongs to.
    fn new(recover_impl: RecoverImpl, persy_impl: Arc<PersyImpl>) -> Self {
        Self {
            recover_impl,
            persy_impl,
        }
    }

    /// List all the transactions found in the log together with their current status.
    pub fn list_transactions(&self) -> PRes<Vec<(TransactionId, RecoverStatus)>> {
        let state = &self.recover_impl;
        state.list_transactions()
    }

    /// Mark for commit a transaction that is in the log in the prepared commit state.
    pub fn commit(&mut self, tx_id: TransactionId) -> PRes<()> {
        let state = &mut self.recover_impl;
        state.commit(tx_id)
    }

    /// Mark for rollback a transaction that is not yet committed.
    pub fn rollback(&mut self, tx_id: TransactionId) -> PRes<()> {
        let state = &mut self.recover_impl;
        state.rollback(tx_id)
    }

    /// Read the status of a transaction in the log.
    pub fn status(&self, tx_id: TransactionId) -> PRes<Option<RecoverStatus>> {
        let state = &self.recover_impl;
        state.status(tx_id)
    }

    /// Recover all the prepared committed transactions that are not marked for rollback
    /// and return the ready to use `Persy` instance.
    pub fn finalize(self) -> PRes<Persy> {
        let Recover {
            recover_impl,
            persy_impl,
        } = self;
        persy_impl.final_recover(recover_impl)?;
        Ok(Persy { persy_impl })
    }
}
/// Prepared transaction state.
///
/// Consume it with `commit` or `rollback`; when it is dropped without calling either,
/// the `Drop` implementation rolls the prepared transaction back.
#[must_use]
pub struct TransactionFinalize {
// Engine handle used to complete or undo the prepared transaction.
persy_impl: Arc<PersyImpl>,
// Pending finalization state; set to `None` once commit or rollback has taken it.
finalize: Option<TxFinalize>,
}
impl TransactionFinalize {
/// Rollback a prepared commit.
///
/// All the modification are rolled back and all the used resources are put released
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// //Do what ever operations on the records
/// let data = vec![1;20];
/// tx.insert_record("seg", &data)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.rollback()?;
/// # Ok(())
/// # }
/// ```
pub fn rollback(mut self) -> PRes<()> {
if let Some(mut finalize) = replace(&mut self.finalize, None) {
self.persy_impl.rollback_prepared(&mut finalize)?;
}
Ok(())
}
/// Finalize the commit result of a prepared commit.
///
/// All the operation done on the transaction are finalized all the lock released, all the
/// old resources are released for reuse.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn commit(mut self) -> PRes<()> {
if let Some(mut finalize) = replace(&mut self.finalize, None) {
self.persy_impl.commit(&mut finalize)?;
}
Ok(())
}
}
impl Drop for TransactionFinalize {
fn drop(&mut self) {
if let Some(mut finalize) = replace(&mut self.finalize, None) {
self.persy_impl.rollback_prepared(&mut finalize).unwrap();
}
}
}
/// Index iterator implementation for iterating on a range of keys.
///
/// It yields `(key, value)` pairs from both ends and releases the underlying raw iterator
/// when dropped.
pub struct IndexIter<K: IndexType, V: IndexType> {
// Raw iterator that produces the entries.
iter_impl: IndexRawIter<K, V>,
// Engine handle passed to the raw iterator on every step and on release.
persy_impl: Arc<PersyImpl>,
}
impl<K: IndexType, V: IndexType> IndexIter<K, V> {
    /// Wrap a raw index iterator together with the engine it reads from.
    fn new(iter_impl: IndexRawIter<K, V>, persy_impl: Arc<PersyImpl>) -> Self {
        Self {
            iter_impl,
            persy_impl,
        }
    }
}
impl<K: IndexType, V: IndexType> Iterator for IndexIter<K, V> {
    type Item = (K, Value<V>);

    /// Advance from the front, delegating to the raw iterator.
    fn next(&mut self) -> Option<Self::Item> {
        let engine = &self.persy_impl;
        self.iter_impl.next(engine)
    }
}
impl<K: IndexType, V: IndexType> DoubleEndedIterator for IndexIter<K, V> {
    /// Advance from the back, delegating to the raw iterator.
    fn next_back(&mut self) -> Option<Self::Item> {
        let engine = &self.persy_impl;
        self.iter_impl.next_back(engine)
    }
}
/// Release the underlying raw iterator when the iterator goes out of scope.
impl<K, V> Drop for IndexIter<K, V>
where
    K: IndexType,
    V: IndexType,
{
    /// # Panics
    ///
    /// Panics if the release fails, unless the thread is already panicking: a second
    /// panic raised while unwinding would abort the whole process, so in that case the
    /// release error is discarded.
    fn drop(&mut self) {
        let outcome = self.iter_impl.release(&self.persy_impl);
        if !std::thread::panicking() {
            outcome.unwrap()
        }
    }
}
/// Index iterator implementation for iterating on a range of keys through a transaction.
///
/// The iterator holds the mutable borrow of the transaction for its whole lifetime; use
/// `next_tx` or `tx` to reach the transaction while iterating.
pub struct TxIndexIter<'a, K: IndexType, V: IndexType> {
// Raw iterator that produces the entries.
iter_impl: TxIndexRawIter<K, V>,
// Transaction borrowed for the duration of the iteration.
tx: &'a mut Transaction,
}
impl<'a, K: IndexType, V: IndexType> TxIndexIter<'a, K, V> {
    /// Wrap a raw transactional index iterator together with the borrowed transaction.
    fn new(iter_impl: TxIndexRawIter<K, V>, tx: &'a mut Transaction) -> Self {
        Self { iter_impl, tx }
    }

    /// Get the next element of the iterator, together with access to the transaction
    /// owned by the iterator.
    ///
    /// # Panics
    ///
    /// Panics if the internal state of the transaction is `None`.
    pub fn next_tx(&mut self) -> Option<(K, Value<V>, &mut Transaction)> {
        let (key, value) = self
            .iter_impl
            .next(&self.tx.persy_impl, self.tx.tx.as_mut().unwrap())?;
        Some((key, value, &mut *self.tx))
    }

    /// Direct access to the transaction owned by the iterator.
    pub fn tx(&mut self) -> &mut Transaction {
        &mut *self.tx
    }
}
impl<'a, K: IndexType, V: IndexType> Iterator for TxIndexIter<'a, K, V> {
    type Item = (K, Value<V>);

    /// Advance from the front, reading through the borrowed transaction.
    fn next(&mut self) -> Option<Self::Item> {
        let engine = &self.tx.persy_impl;
        let state = self.tx.tx.as_mut().unwrap();
        self.iter_impl.next(engine, state)
    }
}
impl<'a, K: IndexType, V: IndexType> DoubleEndedIterator for TxIndexIter<'a, K, V> {
    /// Advance from the back, reading through the borrowed transaction.
    fn next_back(&mut self) -> Option<Self::Item> {
        let engine = &self.tx.persy_impl;
        let state = self.tx.tx.as_mut().unwrap();
        self.iter_impl.next_back(engine, state)
    }
}
/// Iterator implementation to scan a segment.
///
/// It yields the id and the content of every record returned by the raw iterator.
pub struct SegmentIter {
// Raw iterator that produces the records.
iter_impl: SegmentRawIter,
// Engine handle passed to the raw iterator on every step.
persy_impl: Arc<PersyImpl>,
}
impl SegmentIter {
fn new(iter_impl: SegmentRawIter, persy_impl: Arc<PersyImpl>) -> SegmentIter {
SegmentIter { iter_impl, persy_impl }
}
}
impl Iterator for SegmentIter {
    type Item = (PersyId, Vec<u8>);

    /// Produce the next record of the segment, delegating to the raw iterator.
    fn next(&mut self) -> Option<Self::Item> {
        let engine = &self.persy_impl;
        self.iter_impl.next(engine)
    }
}
/// Iterator implementation to scan a segment considering in transaction changes.
///
/// The iterator holds the mutable borrow of the transaction for its whole lifetime; use
/// `next_tx` or `tx` to reach the transaction while iterating.
pub struct TxSegmentIter<'a> {
// Raw iterator that produces the records.
iter_impl: TxSegmentRawIter,
// Transaction borrowed for the duration of the scan.
tx: &'a mut Transaction,
}
impl<'a> TxSegmentIter<'a> {
fn new(iter_impl: TxSegmentRawIter, tx: &'a mut Transaction) -> TxSegmentIter<'a> {
TxSegmentIter { iter_impl, tx }
}
/// get the next element in the iterator giving the access on the transaction owned by the
/// iterator
pub fn next_tx(&mut self) -> Option<(PersyId, Vec<u8>, &mut Transaction)> {
if let Some((id, rec, _)) = self.iter_impl.next(&self.tx.persy_impl, &self.tx.tx.as_mut().unwrap()) {
Some((id, rec, self.tx))
} else {
None
}
}
/// Direct access to the transaction owned by the iterator
pub fn tx(&mut self) -> &mut Transaction {
self.tx
}
}
impl<'a> Iterator for TxSegmentIter<'a> {
    type Item = (PersyId, Vec<u8>);

    /// Produce the next record, dropping the third element returned by the raw iterator.
    fn next(&mut self) -> Option<Self::Item> {
        let engine = &self.tx.persy_impl;
        let state = self.tx.tx.as_ref().unwrap();
        let (id, content, _) = self.iter_impl.next(engine, state)?;
        Some((id, content))
    }
}
impl Persy {
/// Create a new database file.
///
/// # Errors
///
/// Fails if the file already exists.
///
pub fn create<P: AsRef<Path>>(path: P) -> PRes<()> {
PersyImpl::create(path.as_ref())
}
/// Create a new database file.
///
/// # Errors
///
/// Fails if the file already exists.
///
pub fn create_from_file(file: File) -> PRes<()> {
PersyImpl::create_from_file(file)
}
/// Open a database file.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open<P: AsRef<Path>>(path: P, config: Config) -> PRes<Persy> {
Persy::open_with_recover(path, config, |_| true)
}
/// Open a database file from a path with a recover function.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open_with_recover<P: AsRef<Path>, C>(path: P, config: Config, recover: C) -> PRes<Persy>
where
C: Fn(&TransactionId) -> bool,
{
let f = fs::OpenOptions::new().write(true).read(true).open(path)?;
Persy::open_from_file_with_recover(f, config, recover)
}
/// Open a database file from a path and return a recover structure that allow to select the
/// transactions to commit and recover them.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn recover<P: AsRef<Path>>(path: P, config: Config) -> PRes<Recover> {
let f = fs::OpenOptions::new().write(true).read(true).open(path)?;
let (persy_impl, recover) = PersyImpl::open_recover(f, config)?;
Ok(Recover::new(recover, Arc::new(persy_impl)))
}
/// Open a database file from a direct file handle.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open_from_file(path: File, config: Config) -> PRes<Persy> {
let (persy_impl, recover) = PersyImpl::open_recover(path, config)?;
persy_impl.final_recover(recover)?;
Ok(Persy {
persy_impl: Arc::new(persy_impl),
})
}
/// Open a database file, from a direct file handle and a transaction recover function.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open_from_file_with_recover<C>(path: File, config: Config, recover: C) -> PRes<Persy>
where
C: Fn(&TransactionId) -> bool,
{
let (persy_impl, mut recov) = PersyImpl::open_recover(path, config)?;
recov.apply(recover)?;
persy_impl.final_recover(recov)?;
Ok(Persy {
persy_impl: Arc::new(persy_impl),
})
}
/// Open an existing database or create it if it does not exist yet,
/// calling the `prepare` function just after the creation.
///
/// # Example
///
/// ```rust
/// use std::path::Path;
/// use persy::{Persy, Config, PersyId, ValueMode};
/// # use persy::PRes;
///
/// # fn main() -> PRes<()> {
/// let path = Path::new("target/open_or_create.db");
/// let config = Config::new();
///
/// let persy = Persy::open_or_create_with(path, config, |persy| {
/// // this closure is only called on database creation
/// let mut tx = persy.begin()?;
/// tx.create_segment("data")?;
/// tx.create_index::<u64, PersyId>("index", ValueMode::REPLACE)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// println!("Segment and Index successfully created");
/// Ok(())
/// })?;
/// # std::fs::remove_file("target/open_or_create.db")?;
/// # Ok(())
/// # }
/// ```
pub fn open_or_create_with<P, F>(path: P, config: Config, prepare: F) -> PRes<Persy>
where
P: AsRef<Path>,
F: FnOnce(&Persy) -> PRes<()>,
{
let path = path.as_ref();
let persy;
if !path.exists() {
Persy::create(path)?;
persy = Persy::open(path, config)?;
prepare(&persy)?;
} else {
persy = Persy::open(path, config)?;
}
Ok(persy)
}
/// Begin a new transaction.
///
/// The transaction isolation level is 'read_commited'.
/// for commit call [`prepare_commit`] and [`commit`]
///
/// [`prepare_commit`]:struct.Persy.html#method.prepare_commit
/// [`commit`]:struct.Persy.html#method.commit
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// // ...
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn begin(&self) -> PRes<Transaction> {
Ok(Transaction {
tx: Some(self.persy_impl.begin()?),
persy_impl: self.persy_impl.clone(),
})
}
/// Begin a new transaction specifying and id usable for crash recover.
///
/// The transaction isolation level is 'read_commited'.
/// for commit call [`prepare_commit`] and [`commit`]
///
/// [`prepare_commit`]:struct.Persy.html#method.prepare_commit
/// [`commit`]:struct.Persy.html#method.commit
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let tx_id = vec![2;2];
/// let mut tx = persy.begin_id(tx_id)?;
/// // ...
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn begin_id(&self, meta_id: TransactionId) -> PRes<Transaction> {
Ok(Transaction {
tx: Some(self.persy_impl.begin_id(meta_id)?),
persy_impl: self.persy_impl.clone(),
})
}
/// Check if a segment already exist in the storage
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// assert!(persy.exists_segment("my_new_segment")?);
/// # Ok(())
/// # }
/// ```
pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
self.persy_impl.exists_segment(segment)
}
/// Resolves the segment to a SegmentId
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let segment_id = persy.solve_segment_id("my_new_segment")?;
/// # Ok(())
/// # }
/// ```
pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
//TODO: find a better name and make this public again
self.persy_impl.solve_segment_id(segment)
}
/// Resolves the index to a IndexId
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let segment_id = persy.solve_index_id("my_new_index")?;
/// # Ok(())
/// # }
/// ```
pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
//TODO: find a better name and make this public again
self.persy_impl.solve_index_id(index)
}
/// Read the record content from persistent data.
///
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let read = persy.read_record("seg", &id)?.expect("record exits");
/// assert_eq!(data,read);
/// # Ok(())
/// # }
/// ```
pub fn read_record(&self, segment: impl ToSegmentId, id: &PersyId) -> PRes<Option<Vec<u8>>> {
let segment_id = self.solve_segment_id(segment)?;
self.persy_impl.read_record(segment_id, &id.0)
}
/// Scan a segment for persistent records
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let mut count = 0;
/// for (id,content) in persy.scan("seg")? {
/// println!("record size:{}",content.len());
/// count+=1;
/// }
/// assert_eq!(count,1);
/// # Ok(())
/// # }
/// ```
pub fn scan(&self, segment: impl ToSegmentId) -> PRes<SegmentIter> {
let segment_id = self.solve_segment_id(segment)?;
Ok(SegmentIter::new(
self.persy_impl.scan(segment_id)?,
self.persy_impl.clone(),
))
}
/// Check if a segment already exist in the storage
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::REPLACE)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// assert!(persy.exists_index("my_new_index")?);
/// # Ok(())
/// # }
/// ```
pub fn exists_index(&self, segment: &str) -> PRes<bool> {
self.persy_impl.exists_index(segment)
}
/// Get a value or a group of values from a key.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// # tx.put::<u8,u8>("my_new_index",10,10)?;
/// # let prepared = tx.prepare_commit()?;
/// # prepared.commit()?;
/// let val = persy.get::<u8,u8>("my_new_index",&10)?;
/// if let Some(is_there) = val {
/// // A value is actually there
/// match is_there {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn get<K, V>(&self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
where
K: IndexType,
V: IndexType,
{
let index_id = self.solve_index_id(index_name)?;
self.persy_impl.get::<K, V>(index_id, k)
}
/// Browse a range of keys and values from and index.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value,IndexIter};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// # tx.put::<u8,u8>("my_new_index",10,10)?;
/// # let prepared = tx.prepare_commit()?;
/// # prepared.commit()?;
/// let iter:IndexIter<u8,u8> = persy.range("my_new_index",10..12)?;
/// for (k,val) in iter {
/// // A value is actually there
/// match val {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn range<K, V, R>(&self, index_name: &str, range: R) -> PRes<IndexIter<K, V>>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let index_id = self.solve_index_id(index_name)?;
let (_, raw) = self.persy_impl.range(index_id, range)?;
Ok(IndexIter::new(raw, self.persy_impl.clone()))
}
/// List all the existing segments.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("seg")?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let segments = persy.list_segments()?;
/// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
Ok(self
.persy_impl
.list_segments()?
.into_iter()
.map(|(name, id)| (name, SegmentId::new(id)))
.collect())
}
/// List all the existing indexes.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes, ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("index", ValueMode::CLUSTER)?;
/// let prepared = tx.prepare_commit()?;
/// prepared.commit()?;
/// let indexes = persy.list_indexes()?;
/// let names = indexes.into_iter().map(|(name,_info)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
Ok(self.persy_impl.list_indexes()?)
}
/// Create a read snapshot at the current data status.
///
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// let persy = Persy::open("./open.persy",Config::new())?;
/// // ... More logic
/// let snapshot = persy.snapshot()?;
/// // .. Access data from the snapshot
/// # Ok(())
/// # }
/// ```
pub fn snapshot(&self) -> PRes<Snapshot> {
Ok(Snapshot::new(self.persy_impl.clone(), self.persy_impl.read_snapshot()?))
}
}
/// Borrow mutably the transaction state stored in the given slot.
///
/// Taking the slot instead of the whole `Transaction` lets callers keep using the other
/// fields of the wrapper while the state is borrowed.
///
/// # Panics
///
/// Panics if the slot is `None`.
fn tx_mut(tx: &mut Option<transaction::Transaction>) -> &mut transaction::Transaction {
    let state = tx.as_mut();
    state.unwrap()
}
impl Transaction {
/// Mutable access to the internal transaction state.
///
/// # Panics
///
/// Panics if the internal state is `None`.
fn tx_mut(&mut self) -> &mut transaction::Transaction {
    self.tx.as_mut().unwrap()
}
/// Shared access to the internal transaction state.
///
/// # Panics
///
/// Panics if the internal state is `None`.
fn tx(&self) -> &transaction::Transaction {
    let state = self.tx.as_ref();
    state.unwrap()
}
/// Create a new segment with the provided name.
///
/// # Panics
///
/// Panics if the name starts with one of the prefixes reserved for indexes
/// (`INDEX_META_PREFIX` or `INDEX_DATA_PREFIX`).
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn create_segment(&mut self, segment: &str) -> PRes<SegmentId> {
    assert!(!segment.starts_with(index::config::INDEX_META_PREFIX));
    assert!(!segment.starts_with(index::config::INDEX_DATA_PREFIX));
    let engine = &self.persy_impl;
    let state = tx_mut(&mut self.tx);
    engine.create_segment(state, segment)
}
/// Drop an existing segment.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.drop_segment("existing_segment_name")?;
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
    let engine = &self.persy_impl;
    let state = tx_mut(&mut self.tx);
    engine.drop_segment(state, segment)
}
/// Check if a segment already exists in the storage, considering the changes made in
/// this transaction.
///
/// # Example
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// assert!(tx.exists_segment("my_new_segment")?);
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
    let state = self.tx();
    self.persy_impl.exists_segment_tx(state, segment)
}
/// Resolve the segment to a `SegmentId`, considering the changes made in this
/// transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// let segment_id = tx.solve_segment_id("my_new_segment")?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
    let state = self.tx();
    self.persy_impl.solve_segment_id_tx(state, segment)
}
/// Resolve the index name to an `IndexId`, considering the changes made in this
/// transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes, ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// let index_id = tx.solve_index_id("my_new_index")?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
    // The engine returns a pair; only its first element, the id, is exposed.
    let resolved = self.persy_impl.solve_index_id_tx(self.tx(), index)?;
    Ok(resolved.0)
}
/// Create a new record.
///
/// This function returns an id that can be used by [`Persy::read_record`] and
/// [`Transaction::read_record`]; the record content can be read only with
/// [`Transaction::read_record`] until the transaction is committed.
///
/// [`Transaction::read_record`]:struct.Transaction.html#method.read_record
/// [`Persy::read_record`]:struct.Persy.html#method.read_record
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// tx.insert_record("seg", &data)?;
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn insert_record(&mut self, segment: impl ToSegmentId, rec: &[u8]) -> PRes<PersyId> {
    let engine = &self.persy_impl;
    let state = tx_mut(&mut self.tx);
    let raw_id = engine.insert_record(state, segment, rec)?;
    Ok(PersyId(raw_id))
}
/// Read the record content, considering any change made in this transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// let read = tx.read_record("seg", &id)?.expect("record exists");
/// assert_eq!(data,read);
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn read_record(&mut self, segment: impl ToSegmentId, id: &PersyId) -> PRes<Option<Vec<u8>>> {
    let segment_id = self.solve_segment_id(segment)?;
    let engine = &self.persy_impl;
    let state = tx_mut(&mut self.tx);
    engine.read_record_tx(state, segment_id, &id.0)
}
/// Scan for persistent and in transaction records
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// let mut count = 0;
/// for (id,content) in tx.scan("seg")? {
/// println!("record size:{}",content.len());
/// count+=1;
/// }
/// assert_eq!(count,1);
/// # Ok(())
/// # }
/// ```
pub fn scan<'a>(&'a mut self, segment: impl ToSegmentId) -> PRes<TxSegmentIter<'a>> {
    let resolved = self.solve_segment_id(segment)?;
    // The raw iterator is built from a shared view of the inner transaction; the
    // returned wrapper then keeps `self` mutably borrowed for the lifetime `'a`.
    let raw_iter = self.persy_impl.scan_tx(self.tx.as_ref().unwrap(), resolved)?;
    Ok(TxSegmentIter::new(raw_iter, self))
}
/// Update the record content.
///
/// Until the transaction is committed, the updated content can be read only with [`read_record_tx`].
///
/// [`read_record_tx`]:struct.Persy.html#method.read_record_tx
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// let new_data = vec![2;20];
/// tx.update_record("seg", &id, &new_data)?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn update_record(&mut self, segment: impl ToSegmentId, id: &PersyId, rec: &[u8]) -> PRes<()> {
    // Resolve the segment before borrowing the inner transaction mutably.
    let resolved = self.solve_segment_id(segment)?;
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.update_record(tx, resolved, &id.0, rec)
}
/// Delete a record.
///
/// Until the transaction is committed, the record appears deleted only when read with [`read_record_tx`].
///
/// [`read_record_tx`]:struct.Persy.html#method.read_record_tx
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// tx.delete_record("seg", &id)?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn delete_record(&mut self, segment: impl ToSegmentId, id: &PersyId) -> PRes<()> {
    // Resolve the segment before borrowing the inner transaction mutably.
    let resolved = self.solve_segment_id(segment)?;
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.delete_record(tx, resolved, &id.0)
}
/// Create a new index with the name and the value management mode.
///
/// The create operation requires two type arguments, the key type and the value type of the
/// index; they have to match the types used by the following operations on the index.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn create_index<K, V>(&mut self, index_name: &str, value_mode: ValueMode) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    // The key/value types are forwarded explicitly because no argument carries them.
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.create_index::<K, V>(tx, index_name, value_mode)
}
/// Drop an existing index.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.drop_index("my_new_index")?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn drop_index(&mut self, index_name: &str) -> PRes<()> {
    // The drop is recorded through the inner transaction held by this handle.
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.drop_index(tx, index_name)
}
/// Check if an index already exists in the storage, considering the transaction
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::REPLACE)?;
/// assert!(tx.exists_index("my_new_index")?);
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn exists_index(&self, segment: &str) -> PRes<bool> {
    // Despite its name, `segment` carries the name of the *index* being looked up.
    let tx = self.tx();
    self.persy_impl.exists_index_tx(tx, segment)
}
/// Put a key value in an index following the value mode strategy.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn put<K, V>(&mut self, index_name: &str, k: K, v: V) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    // Resolve the index name to its id, then record the entry through the transaction.
    let resolved = self.solve_index_id(index_name)?;
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.put::<K, V>(tx, resolved, k, v)
}
/// Remove a key and optionally a specific value from an index following the value mode strategy.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.remove::<u8,u8>("my_new_index",10,Some(10))?;
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn remove<K, V>(&mut self, index_name: &str, k: K, v: Option<V>) -> PRes<()>
where
    K: IndexType,
    V: IndexType,
{
    // Resolve the index name to its id, then record the removal through the transaction.
    let resolved = self.solve_index_id(index_name)?;
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.remove::<K, V>(tx, resolved, k, v)
}
/// Get a value or a group of values from a key considering changes in transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// let val = tx.get::<u8,u8>("my_new_index",&10)?;
/// if let Some(is_there) = val {
/// // A value is actually there
/// match is_there {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// # tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn get<K, V>(&mut self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
where
    K: IndexType,
    V: IndexType,
{
    // Resolve the index name to its id, then look the key up through the transaction.
    let resolved = self.solve_index_id(index_name)?;
    let tx = tx_mut(&mut self.tx);
    self.persy_impl.get_tx::<K, V>(tx, resolved, k)
}
/// Browse a range of keys and values from an index including the transaction changes
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value,TxIndexIter};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// {
/// let iter:TxIndexIter<u8,u8> = tx.range("my_new_index",10..12)?;
/// for (k,val) in iter {
/// // A value is actually there
/// match val {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// }
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn range<'a, K, V, R>(&'a mut self, index_name: &str, range: R) -> PRes<TxIndexIter<'a, K, V>>
where
    K: IndexType,
    V: IndexType,
    R: RangeBounds<K>,
{
    let index_id = self.solve_index_id(index_name)?;
    // The engine level lookup only receives the bounds by reference, because `range`
    // itself is consumed afterwards by the transaction level lookup.
    let bounds = (range.start_bound(), range.end_bound());
    let (value_mode, engine_iter) = self.persy_impl.range(index_id.clone(), bounds)?;
    let tx_level_iter = self.tx_mut().index_range::<K, V, R>(index_id.clone(), range);
    // Both sources are handed to a single raw iterator, wrapped for the caller.
    let combined = TxIndexRawIter::new(index_id, tx_level_iter, Some(engine_iter), value_mode);
    Ok(TxIndexIter::new(combined, self))
}
/// Rollback a not yet prepared transaction.
///
/// All the resources used for eventual insert or update are released.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// tx.insert_record("seg", &data)?;
/// tx.rollback()?;
/// # Ok(())
/// # }
/// ```
pub fn rollback(mut self) -> PRes<()> {
if let Some(real_tx) = replace(&mut self.tx, None) {
self.persy_impl.rollback(real_tx)?;
}
Ok(())
}
/// Prepare to commit a transaction, it will lock all the records involved in the transaction
/// till a [`commit`] or [`rollback_prepared`] is called.
///
/// [`commit`]:struct.Persy.html#method.commit
/// [`rollback_prepared`]:struct.Persy.html#method.rollback_prepared
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// //Do what ever operations on the records
/// let data = vec![1;20];
/// tx.insert_record("seg", &data)?;
/// tx.prepare_commit()?;
/// # Ok(())
/// # }
/// ```
pub fn prepare_commit(mut self) -> PRes<TransactionFinalize> {
    // Detach the inner transaction: `self.tx` is `None` from here on, so dropping
    // `self` when this call returns does not roll anything back.
    let pending = self.tx.take().unwrap();
    // `self` has a `Drop` impl, so the engine handle is cloned instead of moved out.
    let persy_impl = self.persy_impl.clone();
    let prepared = self.persy_impl.prepare_commit(pending)?;
    Ok(TransactionFinalize {
        persy_impl,
        finalize: Some(prepared),
    })
}
/// List all the existing segments, considering all the changes in transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("seg")?;
/// let segments = tx.list_segments()?;
/// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
    let mut segments = Vec::new();
    // Wrap every raw id returned by the engine into the public `SegmentId` type.
    for (name, raw_id) in self.persy_impl.list_segments_tx(self.tx())? {
        segments.push((name, SegmentId::new(raw_id)));
    }
    Ok(segments)
}
/// List all the existing indexes, considering changes in the transaction.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8, u8>("idx", ValueMode::REPLACE)?;
/// let indexes = tx.list_indexes()?;
/// let names = indexes.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"idx".to_string()));
/// tx.prepare_commit()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
    // The engine already returns the public `(name, info)` pairs; pass them through.
    let tx = self.tx();
    let indexes = self.persy_impl.list_indexes_tx(tx)?;
    Ok(indexes)
}
}
// Rolls back the inner transaction when the handle is dropped while still owning it,
// i.e. when neither `rollback` nor `prepare_commit` has taken it out before.
impl Drop for Transaction {
    fn drop(&mut self) {
        if let Some(tx) = replace(&mut self.tx, None) {
            let outcome = self.persy_impl.rollback(tx);
            // A panic raised while the thread is already unwinding aborts the whole
            // process, so a failed rollback is only reported by panicking when no
            // other panic is in flight; otherwise the original panic keeps propagating.
            if !std::thread::panicking() {
                outcome.unwrap();
            }
        }
    }
}
/// Iterator implementation to scan a segment as it was when a snapshot was taken.
///
/// Yields the id and the content of each record.
pub struct SnapshotSegmentIter {
    iter_impl: SegmentSnapshotRawIter,
    persy_impl: Arc<PersyImpl>,
}

impl SnapshotSegmentIter {
    /// Pair the raw snapshot iterator with the engine handle it needs to advance.
    fn new(iter_impl: SegmentSnapshotRawIter, persy_impl: Arc<PersyImpl>) -> SnapshotSegmentIter {
        SnapshotSegmentIter { iter_impl, persy_impl }
    }
}

impl Iterator for SnapshotSegmentIter {
    type Item = (PersyId, Vec<u8>);

    fn next(&mut self) -> Option<Self::Item> {
        // The raw iterator does not hold the engine itself, so it is lent on every step.
        self.iter_impl.next(&self.persy_impl)
    }
}
/// Read snapshot at a specific point in time.
///
/// All the changes from transactions committed up to the point in time where the snapshot was
/// created are readable from this snapshot, all subsequent transactions are ignored.
///
/// A copy of the old data is kept on the disc, with indexing access from in memory structures;
/// on drop of the Snapshot, if there are no older snapshots, all the data held by this snapshot
/// and not existing anymore in the final state will be cleaned up.
///
#[derive(Clone)]
pub struct Snapshot {
    // Reference counted, so every clone of this handle shares the same `SnapshotInt`.
    snap: Arc<SnapshotInt>,
}
// State shared by all the clones of a `Snapshot`; the `Drop` impl of this type
// releases `snapshot_id` on the engine.
struct SnapshotInt {
    // Engine handle that every snapshot operation is delegated to.
    persy_impl: Arc<PersyImpl>,
    // Identifier passed to the engine on every snapshot read.
    snapshot_id: SnapshotId,
}
impl Snapshot {
pub(crate) fn new(persy_impl: Arc<PersyImpl>, snapshot_id: SnapshotId) -> Snapshot {
    // Build the shared state first, then put it behind the reference counted pointer.
    let shared = SnapshotInt {
        persy_impl,
        snapshot_id,
    };
    Snapshot { snap: Arc::new(shared) }
}
fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
    // Resolution is delegated to the engine, scoped to this snapshot's id.
    let snap = &self.snap;
    snap.persy_impl.solve_segment_id_snapshot(snap.snapshot_id, segment)
}
fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
    // Resolution is delegated to the engine, scoped to this snapshot's id.
    let snap = &self.snap;
    snap.persy_impl.solve_index_id_snapshot(snap.snapshot_id, index)
}
/// Read the record content at the point of time the snapshot was taken ignoring all following
/// committed transactions
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let read = snapshot.read_record("seg", &id)?.expect("record exists");
/// assert_eq!(data,read);
/// # Ok(())
/// # }
/// ```
pub fn read_record(&self, segment: impl ToSegmentId, id: &PersyId) -> PRes<Option<Vec<u8>>> {
    let snap = &self.snap;
    // Resolve the segment in the context of the snapshot, then read from the same snapshot.
    let resolved = self.solve_segment_id(segment)?;
    snap.persy_impl.read_record_snapshot(resolved, &id.0, snap.snapshot_id)
}
/// Scan for records existing at the moment of snapshot creation, ignoring all
/// the following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert_record("seg", &data)?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let mut count = 0;
/// for (id,content) in snapshot.scan("seg")? {
/// println!("record size:{}",content.len());
/// count+=1;
/// }
/// assert_eq!(count,1);
/// # Ok(())
/// # }
/// ```
pub fn scan(&self, segment: impl ToSegmentId) -> PRes<SnapshotSegmentIter> {
let segment_id = self.solve_segment_id(segment)?;
Ok(SnapshotSegmentIter::new(
self.snap.persy_impl.scan_snapshot(segment_id, self.snap.snapshot_id)?,
self.snap.persy_impl.clone(),
))
}
/// Get a value or a group of values from a key at the point the snapshot was taken ignoring
/// all following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// # let persy = Persy::open("./data.persy",Config::new())?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let val = snapshot.get::<u8,u8>("my_new_index",&10)?;
/// if let Some(is_there) = val {
/// // A value is actually there
/// match is_there {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn get<K, V>(&self, index_name: &str, k: &K) -> PRes<Option<Value<V>>>
where
    K: IndexType,
    V: IndexType,
{
    let snap = &self.snap;
    // Resolve the index in the context of the snapshot, then look the key up in it.
    let resolved = self.solve_index_id(index_name)?;
    snap.persy_impl.get_snapshot::<K, V>(resolved, snap.snapshot_id, k)
}
/// Browse a range of keys and values from an index at the point the snapshot was created,
/// ignoring all the following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode, Value, IndexIter};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./data.persy")?;
/// let persy = Persy::open("./data.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::CLUSTER)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let iter:IndexIter<u8,u8> = snapshot.range("my_new_index",10..12)?;
/// for (k,val) in iter {
/// // A value is actually there
/// match val {
/// Value::SINGLE(actual_value) => {
/// },
/// Value::CLUSTER(actual_value) => {
/// },
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn range<K, V, R>(&self, index_name: &str, range: R) -> PRes<IndexIter<K, V>>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let index_id = self.solve_index_id(index_name)?;
let (_, raw) = self
.snap
.persy_impl
.range_snapshot(index_id, self.snap.snapshot_id, range, false)?;
Ok(IndexIter::new(raw, self.snap.persy_impl.clone()))
}
/// List all the existing segments at the point the snapshot was created, ignoring all
/// the following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("seg")?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let segments = snapshot.list_segments()?;
/// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
    // The engine already returns the public `(name, id)` pairs for this snapshot.
    let snap = &self.snap;
    let segments = snap.persy_impl.list_segments_snapshot(snap.snapshot_id)?;
    Ok(segments)
}
/// List all the existing indexes at the point the snapshot was created, ignoring all
/// the following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config,PRes,ValueMode};
/// # fn foo() -> PRes<()> {
/// # Persy::create("./open.persy")?;
/// # let persy = Persy::open("./open.persy",Config::new())?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8, u8>("idx", ValueMode::REPLACE)?;
/// tx.prepare_commit()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let indexes = snapshot.list_indexes()?;
/// let names = indexes.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"idx".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
    // The engine already returns the public `(name, info)` pairs for this snapshot.
    let snap = &self.snap;
    let indexes = snap.persy_impl.list_indexes_snapshot(snap.snapshot_id)?;
    Ok(indexes)
}
}
// Releases the snapshot on the engine when the shared state is dropped.
impl Drop for SnapshotInt {
    fn drop(&mut self) {
        let outcome = self.persy_impl.release_snapshot(self.snapshot_id);
        // A panic raised while the thread is already unwinding aborts the whole
        // process, so a failed release is only reported by panicking when no other
        // panic is in flight; otherwise the original panic keeps propagating.
        if !std::thread::panicking() {
            outcome.unwrap();
        }
    }
}
#[cfg(test)]
mod tests {
use super::{Config, Persy};
use std::fs;
// A transaction left prepared (neither committed nor rolled back) must be readable
// after the file is reopened.
#[test]
pub fn test_recover_prepared_tx() {
    const FILE: &str = "./target/test_recover_prepared.persy";
    Persy::create(FILE).unwrap();
    let id;
    let val;
    {
        let persy = Persy::open(FILE, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transactoin begin");
        tx.create_segment("def").expect("error on segment creation");
        let fin = tx.prepare_commit().expect("error on commit prepare");
        fin.commit().expect("error on commit");
        let mut tx = persy.begin().expect("error on transaction begin");
        val = String::from("aaa").into_bytes();
        id = tx.insert_record("def", &val).expect("error on insert value");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Discard the finalize state by plain assignment (the old value is not needed).
        // Presumably this keeps the drop of `prepared` from finalizing the transaction,
        // so it stays prepared on disk — verify against `TransactionFinalize`.
        prepared.finalize = None;
    }
    {
        let persy = Persy::open(FILE, Config::new()).unwrap();
        assert_eq!(persy.read_record("def", &id).expect("error reading record"), Some(val));
    }
    fs::remove_file(FILE).unwrap();
}
// Two transactions are left prepared across two consecutive sessions; both records must
// be readable after the second reopen.
#[test]
pub fn test_dobule_recover_prepared_tx() {
    const FILE: &str = "./target/test_double_recover_prepared.persy";
    Persy::create(FILE).unwrap();
    let id;
    let id1;
    let val;
    let val1;
    {
        let persy = Persy::open(FILE, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transactoin begin");
        tx.create_segment("def").expect("error on segment creation");
        let fin = tx.prepare_commit().expect("error on commit prepare");
        fin.commit().expect("error on commit");
        let mut tx = persy.begin().expect("error on transaction begin");
        val = String::from("aaa").into_bytes();
        id = tx.insert_record("def", &val).expect("error on insert value");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Discard the finalize state by plain assignment (the old value is not needed).
        // Presumably this keeps the drop of `prepared` from finalizing the transaction,
        // so it stays prepared on disk — verify against `TransactionFinalize`.
        prepared.finalize = None;
    }
    {
        let persy = Persy::open(FILE, Config::new()).unwrap();
        assert_eq!(
            persy.read_record("def", &id).expect("error reading record"),
            Some(val.clone())
        );
        let mut tx = persy.begin().expect("error on transaction begin");
        val1 = String::from("bbbb").into_bytes();
        id1 = tx.insert_record("def", &val1).expect("error on insert value");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Same as above: leave a second transaction prepared and unfinalized.
        prepared.finalize = None;
    }
    {
        let persy = Persy::open(FILE, Config::new()).unwrap();
        assert_eq!(persy.read_record("def", &id).expect("error reading record"), Some(val));
        assert_eq!(
            persy.read_record("def", &id1).expect("error reading record"),
            Some(val1)
        );
    }
    fs::remove_file(FILE).unwrap();
}
// A prepared transaction started with an explicit id is offered to the recover callback
// under that id; accepting it must make its insert, delete and update visible.
#[test]
pub fn test_recover_tx_id() {
    const FILE: &str = "./target/test_recover_tx_id.persy";
    Persy::create(FILE).unwrap();
    let id;
    let id_pers;
    let id_pers_update;
    let val;
    let val_1;
    let tx_id = vec![10; 5];
    {
        val = String::from("aaa").into_bytes();
        let persy = Persy::open(FILE, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transactoin begin");
        tx.create_segment("def").expect("error on segment creation");
        id_pers = tx.insert_record("def", &val).expect("error on insert value");
        id_pers_update = tx.insert_record("def", &val).expect("error on insert value");
        let fin = tx.prepare_commit().expect("error on commit prepare");
        fin.commit().expect("error on commit");
        let mut tx = persy.begin_id(tx_id.clone()).expect("error on transaction begin");
        id = tx.insert_record("def", &val).expect("error on insert value");
        tx.delete_record("def", &id_pers).expect("delete record works");
        val_1 = String::from("bbb").into_bytes();
        tx.update_record("def", &id_pers_update, &val_1)
            .expect("update record works");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Discard the finalize state by plain assignment (the old value is not needed).
        // Presumably this keeps the drop of `prepared` from finalizing the transaction,
        // so it stays prepared on disk — verify against `TransactionFinalize`.
        prepared.finalize = None;
    }
    {
        // Returning `true` from the callback accepts the transaction found at recovery.
        let persy = Persy::open_with_recover(FILE, Config::new(), |t_id| {
            assert_eq!(&tx_id, t_id);
            true
        })
        .unwrap();
        assert_eq!(persy.read_record("def", &id).expect("error reading record"), Some(val));
        assert_eq!(persy.read_record("def", &id_pers).expect("error reading record"), None);
        assert_eq!(
            persy.read_record("def", &id_pers_update).expect("error reading record"),
            Some(val_1)
        );
    }
    fs::remove_file(FILE).unwrap();
}
// Two prepared transactions are left behind; at recovery one is committed and the other
// rolled back, and only the effects of the committed one must be visible.
#[test]
pub fn test_recover_tx_choice() {
    const FILE: &str = "./target/test_recover_tx_choice.persy";
    Persy::create(FILE).unwrap();
    let id;
    let id_1;
    let id_pers;
    let id_pers_update;
    let val;
    let val_1;
    let tx_id = vec![10; 5];
    let tx_id_1 = vec![10; 10];
    {
        val = String::from("aaa").into_bytes();
        let persy = Persy::open(FILE, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transactoin begin");
        tx.create_segment("def").expect("error on segment creation");
        id_pers = tx.insert_record("def", &val).expect("error on insert value");
        id_pers_update = tx.insert_record("def", &val).expect("error on insert value");
        let fin = tx.prepare_commit().expect("error on commit prepare");
        fin.commit().expect("error on commit");
        let mut tx = persy.begin_id(tx_id.clone()).expect("error on transaction begin");
        id = tx.insert_record("def", &val).expect("error on insert value");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Discard the finalize state by plain assignment (the old value is not needed).
        // Presumably this keeps the drop of `prepared` from finalizing the transaction,
        // so it stays prepared on disk — verify against `TransactionFinalize`.
        prepared.finalize = None;
        let mut tx = persy.begin_id(tx_id_1.clone()).expect("error on transaction begin");
        id_1 = tx.insert_record("def", &val).expect("error on insert value");
        tx.delete_record("def", &id_pers).expect("delete record works");
        val_1 = String::from("bbb").into_bytes();
        tx.update_record("def", &id_pers_update, &val_1)
            .expect("update record works");
        let mut prepared = tx.prepare_commit().expect("error on commit prepare");
        // Same as above, for the second transaction.
        prepared.finalize = None;
    }
    {
        let mut recover = Persy::recover(FILE, Config::new()).unwrap();
        assert!(recover.list_transactions().expect("list successfully").len() >= 2);
        recover.rollback(tx_id_1).expect("marked rollback correctly");
        recover.commit(tx_id).expect("marked commit correctly");
        let persy = recover.finalize().expect("recover correctly");
        // Insert of the committed transaction is visible.
        assert_eq!(
            persy.read_record("def", &id).expect("error reading record"),
            Some(val.clone())
        );
        // Insert, delete and update of the rolled back transaction are not applied.
        assert_eq!(persy.read_record("def", &id_1).expect("error reading record"), None);
        assert_eq!(
            persy.read_record("def", &id_pers).expect("error reading record"),
            Some(val.clone())
        );
        assert_eq!(
            persy.read_record("def", &id_pers_update).expect("error reading record"),
            Some(val)
        );
    }
    fs::remove_file(FILE).unwrap();
}
}