use fs2::FileExt;
pub use std::fs::OpenOptions;
use std::{
collections::HashMap,
fs::File,
mem::replace,
ops::{Bound, RangeBounds},
path::Path,
str,
sync::Arc,
};
pub use crate::id::RecRef;
use crate::index::{
config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
keeper::{IndexKeeper, IndexRawIter},
tree::{
nodes::{PageIter, PageIterBack, Value},
Index,
},
};
use crate::transaction::{
PreparedState, SyncMode, Transaction, TxRead,
TxSegCheck::{CREATED, DROPPED, NONE},
};
use crate::{
address::Address,
allocator::Allocator,
config::{Config, TransactionConfig},
discref::{Device, DiscRef, MemRef, PageOps, PAGE_METADATA_SIZE},
error::{PRes, PersyError},
id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
io::{
InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
InfallibleWriteVarInt,
},
journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
};
#[cfg(feature = "background_ops")]
use crate::background::BackgroundOps;
const DEFAULT_PAGE_EXP: u8 = 10;
pub struct PersyImpl {
config: Arc<Config>,
journal: Arc<Journal>,
address: Address,
indexes: Indexes,
allocator: Arc<Allocator>,
snapshots: Arc<Snapshots>,
#[cfg(feature = "background_ops")]
background_ops: BackgroundOps<SnapshotId>,
}
pub struct TxFinalize {
transaction: Transaction,
prepared: PreparedState,
}
#[derive(PartialEq, Debug, Clone)]
pub enum RecoverStatus {
Started,
PrepareCommit,
Rollback,
Commit,
Cleanup,
}
#[derive(PartialEq, Debug)]
pub enum CommitStatus {
Rollback,
Commit,
}
pub struct RecoverImpl {
tx_id: HashMap<Vec<u8>, JournalId>,
transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
order: Vec<JournalId>,
journal_pages: Vec<u64>,
}
impl RecoverImpl {
pub fn apply<C>(&mut self, recover: C) -> PRes<()>
where
C: Fn(&Vec<u8>) -> bool,
{
for (id, status) in self.list_transactions()? {
if status == RecoverStatus::PrepareCommit {
if recover(&id) {
self.commit(id)?;
} else {
self.rollback(id)?;
}
}
}
Ok(())
}
pub fn list_transactions(&self) -> PRes<Vec<(Vec<u8>, RecoverStatus)>> {
let mut res = Vec::new();
for id in &self.order {
if let Some((id, status)) = self
.transactions
.get(id)
.map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
{
res.push((id, status));
}
}
Ok(res)
}
pub fn status(&self, tx_id: Vec<u8>) -> PRes<Option<RecoverStatus>> {
if let Some(id) = self.tx_id.get(&tx_id) {
Ok(self.transactions.get(id).map(|(s, _, _)| s.clone()))
} else {
Ok(None)
}
}
pub fn commit(&mut self, tx_id: Vec<u8>) -> PRes<()> {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.2 = Some(CommitStatus::Commit);
}
}
Ok(())
}
pub fn rollback(&mut self, tx_id: Vec<u8>) -> PRes<()> {
if let Some(id) = self.tx_id.get(&tx_id) {
if let Some(tx) = self.transactions.get_mut(id) {
tx.2 = Some(CommitStatus::Rollback);
}
}
Ok(())
}
}
#[derive(Clone)]
pub struct IndexInfo {
pub id: IndexId,
pub value_mode: ValueMode,
pub key_type: IndexTypeId,
pub value_type: IndexTypeId,
}
#[cfg(feature = "background_ops")]
fn create_background_ops(
journal: Arc<Journal>,
allocator: Arc<Allocator>,
snapshots: Arc<Snapshots>,
) -> PRes<BackgroundOps<SnapshotId>> {
let all_sync = allocator.clone();
BackgroundOps::new(
move || all_sync.disc().sync(),
move |all: &[SnapshotId]| {
for snap in all {
release_snapshot(*snap, &snapshots, &allocator, &journal)?;
}
Ok(())
},
)
}
impl PersyImpl {
pub fn create(path: &Path) -> PRes<()> {
let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
PersyImpl::create_from_file(f)
}
pub fn create_from_file(f: File) -> PRes<()> {
f.try_lock_exclusive()?;
PersyImpl::init_file(f)?;
Ok(())
}
fn init_file(fl: File) -> PRes<()> {
PersyImpl::init(Box::new(DiscRef::new(fl)?))?;
Ok(())
}
fn init(device: Box<dyn Device>) -> PRes<Box<dyn Device>> {
let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
let address_page = Address::init(&allocator)?;
let journal_page = Journal::init(&allocator)?;
{
let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
root.write_u16(0);
root.write_u64(address_page);
root.write_u64(journal_page);
root.write_u64(allocator_page);
allocator.flush_page(root)?;
}
allocator.disc().sync()?;
Ok(allocator.release())
}
fn new(disc: Box<dyn Device>, config: Config) -> PRes<PersyImpl> {
let address_page;
let journal_page;
let allocator_page;
{
let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
pg.read_u16();
address_page = pg.read_u64();
journal_page = pg.read_u64();
allocator_page = pg.read_u64();
}
let config = Arc::new(config);
let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
let address = Address::new(&allocator, &config, address_page)?;
let journal = Arc::new(Journal::new(&allocator, journal_page)?);
let indexes = Indexes::new(&config);
let snapshots = Arc::new(Snapshots::new());
#[cfg(feature = "background_ops")]
let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
Ok(PersyImpl {
config,
journal,
address,
indexes,
allocator,
snapshots,
#[cfg(feature = "background_ops")]
background_ops,
})
}
fn recover(&self) -> PRes<RecoverImpl> {
let mut commit_order = Vec::new();
let mut transactions = HashMap::new();
let journal = &self.journal;
let jp = journal.recover(|record, id| {
let tx = transactions
.entry(id.clone())
.or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
tx.0 = match record.recover(&mut tx.1) {
Err(_) => RecoverStatus::Rollback,
Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
Ok(x) => match x {
RecoverStatus::Started => RecoverStatus::Started,
RecoverStatus::PrepareCommit => {
commit_order.push(id.clone());
RecoverStatus::PrepareCommit
}
RecoverStatus::Rollback => RecoverStatus::Rollback,
RecoverStatus::Commit => {
commit_order.push(id.clone());
RecoverStatus::Commit
}
RecoverStatus::Cleanup => RecoverStatus::Cleanup,
},
}
})?;
let mut transactions_id = HashMap::new();
for (id, (_, tx, _)) in &transactions {
transactions_id.insert(tx.meta_id().clone(), id.clone());
}
Ok(RecoverImpl {
tx_id: transactions_id,
transactions,
order: commit_order,
journal_pages: jp,
})
}
pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
let mut last_id = None;
let allocator = &self.allocator;
for id in recover.order {
if let Some((status, mut tx, choosed)) = recover.transactions.remove(&id) {
if status == RecoverStatus::PrepareCommit {
if choosed == Some(CommitStatus::Commit) || choosed.is_none() {
let prepared = tx.recover_prepare(self)?;
tx.recover_commit(self, prepared)?;
last_id = Some(id);
} else {
tx.recover_rollback(self)?;
}
} else if status == RecoverStatus::Commit {
tx.recover_cleanup(self)?;
}
}
}
for p in recover.journal_pages {
allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
}
for (_, (_, tx, _)) in recover.transactions.iter_mut() {
tx.recover_rollback(self)?;
}
if let Some(id) = last_id {
self.journal.finished_to_clean(&[id])?;
}
allocator.trim_free_at_end()?;
Ok(())
}
pub fn open_recover(f: File, config: Config) -> PRes<(PersyImpl, RecoverImpl)> {
f.try_lock_exclusive()?;
let persy = PersyImpl::new(Box::new(DiscRef::new(f)?), config)?;
let rec = persy.recover()?;
Ok((persy, rec))
}
pub fn memory(config: Config) -> PRes<PersyImpl> {
let device = PersyImpl::init(Box::new(MemRef::new()?))?;
PersyImpl::new(device, config)
}
pub fn begin_with(&self, mut config: TransactionConfig) -> PRes<Transaction> {
let journal = &self.journal;
let strategy = if let Some(st) = config.tx_strategy {
st
} else {
self.config.tx_strategy().clone()
};
let meta_id = if let Some(id) = replace(&mut config.transaction_id, None) {
id
} else {
Vec::new()
};
let sync_mode = if Some(true) == config.background_sync {
SyncMode::BackgroundSync
} else {
SyncMode::Sync
};
Ok(Transaction::new(journal, &strategy, sync_mode, meta_id)?)
}
pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
match tx.exists_segment(segment) {
DROPPED => {}
CREATED(_) => {
return Err(PersyError::SegmentAlreadyExists);
}
NONE => {
if self.address.exists_segment(&segment)? {
return Err(PersyError::SegmentAlreadyExists);
}
}
}
let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
Ok(segment_id)
}
pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
let (_, segment_id) = self.check_segment_tx(tx, segment)?;
tx.add_drop_segment(&self.journal, segment, segment_id)?;
Ok(())
}
pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
self.address.exists_segment(segment)
}
pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
match tx.exists_segment(segment) {
DROPPED => Ok(false),
CREATED(_) => Ok(true),
NONE => self.address.exists_segment(segment),
}
}
pub fn exists_index(&self, index: &str) -> PRes<bool> {
self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
}
pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
}
pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
segment.to_segment_id(&self.address)
}
pub fn solve_segment_id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
let (sid, _) = segment.to_segment_id_tx(self, tx)?;
Ok(sid)
}
pub fn solve_segment_id_snapshot(&self, snapshot: SnapshotId, segment: impl ToSegmentId) -> PRes<SegmentId> {
segment.to_segment_id_snapshot(&self.snapshots, snapshot)
}
pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
index.to_index_id(&self.address)
}
pub fn solve_index_id_tx(&self, tx: &Transaction, index: impl ToIndexId) -> PRes<(IndexId, bool)> {
index.to_index_id_tx(self, tx)
}
pub fn solve_index_id_snapshot(&self, snapshot: SnapshotId, index: impl ToIndexId) -> PRes<IndexId> {
index.to_index_id_snapshot(&self.snapshots, snapshot)
}
pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
match tx.exists_segment(segment) {
DROPPED => Err(PersyError::SegmentNotFound),
CREATED(segment_id) => Ok((true, segment_id)),
NONE => self
.address
.segment_id(segment)?
.map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
}
}
pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
let mut val = Vec::new();
val.write_u8(0);
val.write_varint_u64(len);
id.write(&mut val);
val
}
pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
let _metadata_version = meta.read_u8();
let len = meta.read_varint_u64();
let id = RecRef::read(meta);
(len, id)
}
pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
let len = rec.len() as u64;
let allocator = &self.allocator;
let address = &self.address;
let (rec_ref, maybe_new_page) = if in_tx {
address.allocate_temp(segment_id)
} else {
address.allocate(segment_id)
}?;
let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
let mut pg = allocator.allocate(allocation_exp)?;
let page = pg.get_index();
tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
if let Some(new_page) = maybe_new_page {
tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
}
pg.write_all(&metadata);
pg.write_all(rec);
allocator.flush_page(pg)?;
Ok(rec_ref)
}
fn read_snapshot(&self) -> PRes<SnapshotId> {
self.snapshots.current_snapshot()
}
pub fn snapshot(&self) -> PRes<SnapshotId> {
let snapshot_id = self.snapshots.read_snapshot()?;
let segs = self
.address
.snapshot_list()?
.into_iter()
.map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
.collect::<Vec<_>>();
self.snapshots.fill_segments(snapshot_id, &segs)?;
Ok(snapshot_id)
}
pub fn release_snapshot(&self, snapshot_id: SnapshotId) -> PRes<()> {
release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
}
fn read_ref_segment(
&self,
tx: &Transaction,
segment_id: SegmentId,
rec_ref: &RecRef,
) -> PRes<Option<(u64, u16, SegmentId)>> {
Ok(match tx.read(rec_ref) {
TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
TxRead::DELETED => None,
TxRead::NONE => self
.address
.read(rec_ref, segment_id)?
.map(|(pos, version)| (pos, version, segment_id)),
})
}
fn read_page(&self, match_id: &RecRef, page: u64) -> PRes<Option<Vec<u8>>> {
self.read_page_fn(match_id, page, |x| Vec::from(x))
}
fn read_page_fn<T>(&self, match_id: &RecRef, page: u64, f: fn(&[u8]) -> T) -> PRes<Option<T>> {
if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
let (len, id) = PersyImpl::read_record_metadata(&mut pg);
if id.page == match_id.page && id.pos == match_id.pos {
Ok(Some(f(pg.slice(len as usize))))
} else {
Ok(None)
}
} else {
Ok(None)
}
}
pub fn read_tx_internal_fn<T>(
&self,
tx: &Transaction,
segment_id: SegmentId,
id: &RecRef,
f: fn(&[u8]) -> T,
) -> PRes<Option<(T, u16)>> {
loop {
if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
if let Some(record) = self.read_page_fn(id, page, f)? {
break Ok(Some((record, version)));
}
} else {
break Ok(None);
}
}
}
pub fn read_tx_internal(
&self,
tx: &Transaction,
segment_id: SegmentId,
id: &RecRef,
) -> PRes<Option<(Vec<u8>, u16)>> {
self.read_tx_internal_fn(tx, segment_id, id, |x| Vec::from(x))
}
pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
tx.add_read(&self.journal, segment, id, version)?;
Ok(Some(rec))
} else {
Ok(None)
}
}
pub fn lock_record(
&self,
tx: &mut Transaction,
segment: impl ToSegmentId,
id: &RecRef,
version: u16,
) -> PRes<bool> {
let segment_id = segment.to_segment_id(&self.address)?;
tx.lock_record(&self.address, segment_id, id, version)
}
pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
let segment_id = segment.to_segment_id(&self.address)?;
tx.unlock_record(&self.address, segment_id, id)
}
pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
loop {
if let Some((page, _)) = self.address.read(rec_ref, segment)? {
if let Some(record) = self.read_page(rec_ref, page)? {
break Ok(Some(record));
}
} else {
break Ok(None);
}
}
}
pub fn read_snap(&self, segment: SegmentId, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
self.read_snap_fn(segment, rec_ref, snapshot, |x| Vec::from(x))
}
pub fn read_snap_fn<T>(
&self,
segment: SegmentId,
rec_ref: &RecRef,
snapshot: SnapshotId,
f: fn(&[u8]) -> T,
) -> PRes<Option<T>> {
let segment_id = segment;
loop {
if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
match rec_vers.case {
EntryCase::Change(change) => {
if let Some(record) = self.read_page_fn(rec_ref, change.pos, f)? {
break Ok(Some(record));
}
}
EntryCase::Insert => {
break Ok(None);
}
}
} else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
if let Some(record) = self.read_page_fn(rec_ref, page, f)? {
break Ok(Some(record));
}
} else {
break Ok(None);
}
}
}
pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
let read_snapshot = self.read_snapshot()?;
Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
}
pub fn scan_snapshot_index(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
let res = if let Some(r) = self.snapshots.scan(snapshot, segment_id)? {
r
} else {
self.address.scan(segment_id)?
};
Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
}
pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
let res = self.snapshots.scan(snapshot, segment_id)?.unwrap();
Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
}
pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment_id: SegmentId) -> PRes<TxSegmentRawIter> {
let read_snapshot = self.read_snapshot()?;
Ok(TxSegmentRawIter::new(
tx,
segment_id,
self.address.scan(segment_id)?,
read_snapshot,
))
}
pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
let allocator = &self.allocator;
let journal = &self.journal;
let len = rec.len();
let metadata = PersyImpl::write_record_metadata(len as u64, &rec_ref);
let allocation_exp = exp_from_content_size((len + metadata.len()) as u64);
let mut pg = allocator.allocate(allocation_exp)?;
let page = pg.get_index();
tx.add_update(journal, segment, &rec_ref, page, version)?;
pg.write_all(&metadata);
pg.write_all(rec);
allocator.flush_page(pg)
} else {
Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
}
}
pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
tx.add_delete(&self.journal, seg, &rec_ref, version)
} else {
Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
}
}
pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
tx.rollback(self)
}
pub fn prepare(&self, tx: Transaction) -> PRes<TxFinalize> {
let (tx, prepared) = tx.prepare(self)?;
Ok(TxFinalize {
transaction: tx,
prepared,
})
}
pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
let prepared = finalizer.prepared.clone();
let tx = &mut finalizer.transaction;
tx.rollback_prepared(self, prepared)
}
pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
let prepared = finalizer.prepared.clone();
let tx = &mut finalizer.transaction;
tx.commit(self, prepared)
}
pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
where
K: IndexType,
V: IndexType,
{
Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
}
pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
Indexes::drop_index(self, tx, index_name)
}
pub fn put<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: V) -> PRes<()>
where
K: IndexType,
V: IndexType,
{
Indexes::check_index::<K, V>(self, tx, &index_id)?;
tx.add_put(index_id, k, v);
Ok(())
}
pub fn remove<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: Option<V>) -> PRes<()>
where
K: IndexType,
V: IndexType,
{
Indexes::check_index::<K, V>(self, tx, &index_id)?;
tx.add_remove(index_id, k, v);
Ok(())
}
pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
where
K: IndexType,
V: IndexType,
{
let (result, vm) = {
let mut ik = Indexes::get_index_keeper_tx::<K, V>(self, tx, &index_id)?;
self.indexes.read_lock(index_id.clone())?;
(ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
};
self.indexes.read_unlock(index_id.clone())?;
tx.apply_changes::<K, V>(vm, index_id, k, result)
}
pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
where
K: IndexType,
V: IndexType,
{
let read_snapshot = self.snapshots.read_snapshot()?;
let r = self.get_snapshot(index_id, read_snapshot, k);
release_snapshot(read_snapshot, &self.snapshots, &self.allocator, &self.journal)?;
r
}
pub fn get_snapshot<K, V>(&self, index_id: IndexId, snapshot: SnapshotId, k: &K) -> PRes<Option<Value<V>>>
where
K: IndexType,
V: IndexType,
{
Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)
}
pub fn index_next<K, V>(
&self,
index_id: &IndexId,
read_snapshot: SnapshotId,
next: Bound<&K>,
) -> PRes<PageIter<K, V>>
where
K: IndexType,
V: IndexType,
{
Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)
}
pub fn index_back<K, V>(
&self,
index_id: &IndexId,
read_snapshot: SnapshotId,
next: Bound<&K>,
) -> PRes<PageIterBack<K, V>>
where
K: IndexType,
V: IndexType,
{
Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)
}
pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let read_snapshot = self.snapshots.read_snapshot()?;
self.range_snapshot(index_id, read_snapshot, range, true)
}
pub fn range_snapshot<K, V, R>(
&self,
index_id: IndexId,
snapshot: SnapshotId,
range: R,
release_snapshot: bool,
) -> PRes<(ValueMode, IndexRawIter<K, V>)>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
let after = ik.iter_from(range.start_bound())?;
let before = ik.back_iter_from(range.end_bound())?;
Ok((
IndexKeeper::<K, V>::value_mode(&ik),
IndexRawIter::new(index_id, snapshot, after, before, release_snapshot),
))
}
pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
if tx.segment_created_in_tx(id) {
Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
} else {
self.address.segment_name_by_id(id).map(|k| k.map(|x| (x, false)))
}
}
pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
Ok(self
.address
.list()?
.into_iter()
.filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
.collect())
}
pub fn list_segments_snapshot(&self, snapshot_id: SnapshotId) -> PRes<Vec<(String, SegmentId)>> {
let list = self.snapshots.list(snapshot_id)?;
Ok(list
.into_iter()
.filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
.collect())
}
pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
let snapshot = self.snapshot()?;
let res = self.list_indexes_snapshot(snapshot);
release_snapshot(snapshot, &self.snapshots, &self.allocator, &self.journal)?;
res
}
pub fn list_indexes_snapshot(&self, snapshot: SnapshotId) -> PRes<Vec<(String, IndexInfo)>> {
let list = self.snapshots.list(snapshot)?;
list.into_iter()
.filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
.map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
name.drain(..INDEX_META_PREFIX.len());
let info = self.index_info(snapshot, &name)?;
Ok((name, info))
})
.collect()
}
pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
Ok(tx
.filter_list(&self.address.list()?)
.filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
.map(|(name, id)| (name.to_string(), id))
.collect())
}
pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
tx.filter_list(&self.address.list()?)
.filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
.map(|(name, id)| (name.to_string(), id))
.map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
name.drain(..INDEX_META_PREFIX.len());
let info = self.index_info_tx(tx, &name)?;
Ok((name, info))
})
.collect()
}
fn index_info(&self, snapshot: SnapshotId, name: &str) -> PRes<IndexInfo> {
let id = self.solve_index_id_snapshot(snapshot, name)?;
let index = Indexes::get_index(self, snapshot, &id)?;
Ok(IndexInfo {
id,
value_mode: index.value_mode,
key_type: IndexTypeId::from(index.key_type),
value_type: IndexTypeId::from(index.value_type),
})
}
fn index_info_tx(&self, tx: &Transaction, name: &str) -> PRes<IndexInfo> {
let id = self.solve_index_id_tx(tx, name)?.0;
let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
Ok(IndexInfo {
id,
value_mode: index.value_mode,
key_type: IndexTypeId::from(index.key_type),
value_type: IndexTypeId::from(index.value_type),
})
}
pub(crate) fn journal(&self) -> &Journal {
&self.journal
}
pub(crate) fn address(&self) -> &Address {
&self.address
}
pub(crate) fn allocator(&self) -> &Allocator {
&self.allocator
}
pub(crate) fn indexes(&self) -> &Indexes {
&self.indexes
}
pub(crate) fn snapshots(&self) -> &Snapshots {
&self.snapshots
}
#[cfg(feature = "background_ops")]
pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_id: SnapshotId) -> PRes<()> {
match sync_mode {
SyncMode::Sync => {
let allocator = self.allocator();
allocator.disc().sync()
}
SyncMode::BackgroundSync => {
self.snapshots.acquire(snapshot_id)?;
self.background_ops.add_pending(snapshot_id)
}
}
}
#[cfg(not(feature = "background_ops"))]
pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, _snapshot_id: SnapshotId) -> PRes<()> {
match sync_mode {
SyncMode::Sync => {
let allocator = self.allocator();
allocator.disc().sync()
}
SyncMode::BackgroundSync => unreachable!(),
}
}
}
pub fn exp_from_content_size(size: u64) -> u8 {
let final_size = size + u64::from(PAGE_METADATA_SIZE);
let final_size = u64::max(final_size, 32);
let mut res: u8 = 1;
loop {
if final_size < (1 << res) {
return res;
}
res += 1;
}
}