use std::rc::Rc;
use std::borrow::Borrow;
use std::path::{Path, PathBuf};
use std::num::NonZeroU32;
use polodb_bson::{Document, Value, ObjectIdMaker, mk_document};
use super::page::{header_page_wrapper, PageHandler};
use super::error::DbErr;
use crate::Config;
use crate::vm::{SubProgram, VM, VmState};
use crate::db::DbResult;
use crate::meta_doc_helper::{meta_doc_key, MetaDocEntry};
use crate::index_ctx::{IndexCtx, merge_options_into_default};
use crate::btree::*;
use crate::page::{RawPage, TransactionState};
use crate::db_handle::DbHandle;
use crate::journal::TransactionType;
use crate::dump::{FullDump, PageDump, OverflowDataPageDump, DataPageDump, FreeListPageDump, BTreePageDump};
use crate::page::header_page_wrapper::HeaderPageWrapper;
macro_rules! try_db_op {
($self: tt, $action: expr) => {
match $action {
Ok(ret) => {
$self.page_handler.auto_commit()?;
ret
}
Err(err) => {
$self.page_handler.auto_rollback()?;
$self.reset_meta_version()?;
return Err(err);
}
}
}
}
#[inline]
fn index_already_exists(index_doc: &Document, key: &str) -> bool {
index_doc.get(key).is_some()
}
pub struct DbContext {
path: PathBuf,
page_handler: Box<PageHandler>,
obj_id_maker: ObjectIdMaker,
meta_version: u32,
}
#[derive(Debug, Clone, Copy)]
pub struct MetaSource {
pub meta_version: u32,
pub meta_id_counter: u32,
pub meta_pid: u32,
}
#[derive(Debug, Clone, Copy)]
pub struct CollectionMeta {
pub id: u32,
pub meta_version: u32,
}
impl DbContext {
pub fn new(path: &Path, config: Config) -> DbResult<DbContext> {
let page_size = NonZeroU32::new(4096).unwrap();
let page_handler = PageHandler::with_config(path, page_size, Rc::new(config))?;
let obj_id_maker = ObjectIdMaker::new();
let mut ctx = DbContext {
path: path.to_path_buf(),
page_handler: Box::new(page_handler),
obj_id_maker,
meta_version: 0,
};
let meta_source = ctx.get_meta_source()?;
ctx.meta_version = meta_source.meta_version;
Ok(ctx)
}
fn reset_meta_version(&mut self) -> DbResult<()> {
let meta = self.get_meta_source()?;
self.meta_version = meta.meta_version;
Ok(())
}
fn check_meta_version(&self, actual_meta_version: u32) -> DbResult<()> {
if self.meta_version != actual_meta_version {
return Err(DbErr::MetaVersionMismatched(self.meta_version, actual_meta_version));
}
Ok(())
}
pub fn get_collection_meta_by_name(&mut self, name: &str) -> DbResult<CollectionMeta> {
self.page_handler.auto_start_transaction(TransactionType::Read)?;
let result = try_db_op!(self, self.internal_get_collection_id_by_name(name));
Ok(result)
}
fn internal_get_collection_id_by_name(&mut self, name: &str) -> DbResult<CollectionMeta> {
let meta_src = self.get_meta_source()?;
let collection_meta = MetaDocEntry::new(0, "<meta>".into(), meta_src.meta_pid);
let query_doc = mk_document! {
"name": name,
};
let meta_doc = mk_document!{};
let subprogram = SubProgram::compile_query(
&collection_meta,
&meta_doc,
&query_doc,
true)?;
let mut handle = self.make_handle(subprogram);
handle.step()?;
if handle.state() == (VmState::HasRow as i8) {
let doc = handle.get().unwrap_document();
let int_raw = doc.pkey_id().unwrap().unwrap_int();
handle.commit_and_close_vm()?;
return Ok(CollectionMeta {
id: int_raw as u32,
meta_version: self.meta_version
});
}
Err(DbErr::CollectionNotFound(name.into()))
}
pub(crate) fn get_meta_source(&mut self) -> DbResult<MetaSource> {
let head_page = self.page_handler.pipeline_read_page(0)?;
DbContext::check_first_page_valid(&head_page)?;
let head_page_wrapper = header_page_wrapper::HeaderPageWrapper::from_raw_page(head_page);
let meta_id_counter = head_page_wrapper.get_meta_id_counter();
let meta_version = head_page_wrapper.get_meta_version();
let meta_pid = head_page_wrapper.get_meta_page_id();
Ok(MetaSource {
meta_id_counter,
meta_version,
meta_pid,
})
}
fn check_first_page_valid(page: &RawPage) -> DbResult<()> {
let mut title_area: [u8; 32] = [0; 32];
title_area.copy_from_slice(&page.data[0..32]);
match std::str::from_utf8(&title_area) {
Ok(s) => {
if !s.starts_with("PoloDB") {
return Err(DbErr::NotAValidDatabase);
}
Ok(())
},
Err(_) => Err(DbErr::NotAValidDatabase),
}
}
pub fn create_collection(&mut self, name: &str) -> DbResult<CollectionMeta> {
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let meta = try_db_op!(self, self.internal_create_collection(name));
Ok(meta)
}
fn check_collection_exist(&mut self, name: &str, meta_src: &MetaSource) -> DbResult<bool> {
let collection_meta = MetaDocEntry::new(0, "<meta>".into(), meta_src.meta_pid);
let query_doc = mk_document! {
"name": name,
};
let meta_doc = mk_document!{};
let subprogram = SubProgram::compile_query(
&collection_meta,
&meta_doc,
&query_doc,
true)?;
let mut handle = self.make_handle(subprogram);
handle.set_rollback_on_drop(false);
handle.step()?;
if handle.state() == (VmState::HasRow as i8) {
return Ok(true);
}
Ok(false)
}
fn internal_create_collection(&mut self, name: &str) -> DbResult<CollectionMeta> {
if name.is_empty() {
return Err(DbErr::IllegalCollectionName(name.into()));
}
let mut meta_source = self.get_meta_source()?;
let exist = self.check_collection_exist(name, &meta_source)?;
if exist {
return Err(DbErr::CollectionAlreadyExits(name.into()));
}
let mut doc = Document::new_without_id();
let collection_id = meta_source.meta_id_counter;
doc.insert(meta_doc_key::ID.into(), collection_id.into());
doc.insert(meta_doc_key::NAME.into(), name.into());
let root_pid = self.page_handler.alloc_page_id()?;
doc.insert(meta_doc_key::ROOT_PID.into(), Value::Int(root_pid as i64));
doc.insert(meta_doc_key::FLAGS.into(), Value::Int(0));
let mut btree_wrapper = BTreePageInsertWrapper::new(
&mut self.page_handler, meta_source.meta_pid);
let insert_result = btree_wrapper.insert_item(&doc, false)?;
if let Some(backward_item) = insert_result.backward_item {
let new_root_id = self.page_handler.alloc_page_id()?;
let raw_page = backward_item.write_to_page(&mut self.page_handler,
new_root_id, meta_source.meta_pid)?;
self.page_handler.pipeline_write_page(&raw_page)?;
meta_source.meta_pid = new_root_id;
}
meta_source.meta_id_counter += 1;
meta_source.meta_version += 1;
self.update_meta_source(&meta_source)?;
Ok(CollectionMeta {
id: collection_id,
meta_version: meta_source.meta_version,
})
}
fn update_meta_source(&mut self, meta_source: &MetaSource) -> DbResult<()> {
let head_page = self.page_handler.pipeline_read_page(0)?;
let mut head_page_wrapper = header_page_wrapper::HeaderPageWrapper::from_raw_page(head_page);
head_page_wrapper.set_meta_page_id(meta_source.meta_pid);
head_page_wrapper.set_meta_id_counter(meta_source.meta_id_counter);
head_page_wrapper.set_meta_version(meta_source.meta_version);
self.meta_version = meta_source.meta_version;
self.page_handler.pipeline_write_page(&head_page_wrapper.0)
}
#[inline]
fn item_size(&self) -> u32 {
(self.page_handler.page_size.get() - HEADER_SIZE) / ITEM_SIZE
}
pub(crate) fn make_handle(&mut self, program: SubProgram) -> DbHandle {
let vm = VM::new(&mut self.page_handler, Box::new(program));
DbHandle::new(vm)
}
pub(crate) fn find_collection_root_pid_by_id(&mut self, parent_pid: u32, root_pid: u32, id: u32) -> DbResult<MetaDocEntry> {
let raw_page = self.page_handler.pipeline_read_page(root_pid)?;
let item_size = self.item_size();
let btree_node = BTreeNode::from_raw(&raw_page, parent_pid, item_size, &mut self.page_handler)?;
let key = Value::from(id);
if btree_node.is_empty() {
return Err(DbErr::CollectionIdNotFound(id));
}
let result = btree_node.search(&key)?;
match result {
SearchKeyResult::Node(node_index) => {
let item = &btree_node.content[node_index];
let doc = self.page_handler.get_doc_from_ticket(&item.data_ticket)?.unwrap();
let entry = MetaDocEntry::from_doc(doc);
Ok(entry)
}
SearchKeyResult::Index(child_index) => {
let next_pid = btree_node.indexes[child_index];
if next_pid == 0 {
return Err(DbErr::CollectionIdNotFound(id));
}
self.find_collection_root_pid_by_id(root_pid, next_pid, id)
}
}
}
pub fn create_index(&mut self, col_id: u32, keys: &Document, options: Option<&Document>) -> DbResult<()> {
self.page_handler.auto_start_transaction(TransactionType::Write)?;
try_db_op!(self, self.internal_create_index(col_id, keys, options));
Ok(())
}
fn internal_create_index(&mut self, col_id: u32, keys: &Document, options: Option<&Document>) -> DbResult<()> {
let meta_source = self.get_meta_source()?;
let mut meta_doc = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
for (key_name, value_of_key) in keys.iter() {
if let Value::Int(1) = value_of_key {
} else {
return Err(DbErr::InvalidOrderOfIndex(key_name.clone()));
}
match meta_doc.doc_ref().get(meta_doc_key::INDEXES) {
Some(indexes_obj) => match indexes_obj {
Value::Document(index_doc) => {
if index_already_exists(index_doc.borrow(), key_name) {
return Err(DbErr::IndexAlreadyExists(key_name.clone()));
}
unimplemented!()
}
_ => {
panic!("unexpected: indexes object is not a Document");
}
},
None => {
let mut doc = Document::new_without_id();
let root_pid = self.page_handler.alloc_page_id()?;
let options_doc = merge_options_into_default(root_pid, options)?;
doc.insert(key_name.clone(), Value::Document(Rc::new(options_doc)));
meta_doc.set_indexes(doc);
}
}
}
let key_col = Value::from(col_id);
let meta_source = self.get_meta_source()?;
let inserted = self.update_by_root_pid(
0, meta_source.meta_pid, &key_col, meta_doc.doc_ref())?;
if !inserted {
panic!("update failed");
}
Ok(())
}
#[inline]
fn fix_doc(&mut self, doc: &mut Document) -> bool {
if doc.get(meta_doc_key::ID).is_some() {
return false;
}
let new_oid = self.obj_id_maker.mk_object_id();
doc.insert(meta_doc_key::ID.into(), new_oid.into());
true
}
pub fn insert(&mut self, col_id: u32, meta_version: u32, doc: &mut Document) -> DbResult<bool> {
self.check_meta_version(meta_version)?;
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let changed = try_db_op!(self, self.internal_insert(col_id, doc));
Ok(changed)
}
fn internal_insert(&mut self, col_id: u32, doc: &mut Document) -> DbResult<bool> {
let meta_source = self.get_meta_source()?;
let changed = self.fix_doc(doc);
let mut collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
let pkey = doc.pkey_id().unwrap();
let mut is_pkey_check_skipped = false;
collection_meta.check_pkey_ty(&pkey, &mut is_pkey_check_skipped)?;
let mut is_meta_changed = false;
let mut index_ctx_opt = IndexCtx::from_meta_doc(collection_meta.doc_ref());
if let Some(index_ctx) = &mut index_ctx_opt {
let mut is_ctx_changed = false;
index_ctx.insert_index_by_content(
doc,
&pkey,
&mut is_ctx_changed,
&mut self.page_handler
)?;
if is_ctx_changed {
index_ctx.merge_to_meta_doc(&mut collection_meta);
is_meta_changed = true;
}
}
let mut insert_wrapper = BTreePageInsertWrapper::new(
&mut self.page_handler, collection_meta.root_pid());
let insert_result: InsertResult = insert_wrapper.insert_item(doc, false)?;
if let Some(backward_item) = &insert_result.backward_item {
let root_pid = collection_meta.root_pid();
self.handle_insert_backward_item(&mut collection_meta, root_pid, backward_item)?;
is_meta_changed = true;
}
if is_pkey_check_skipped {
collection_meta.merge_pkey_ty_to_meta(doc);
is_meta_changed = true;
}
if is_meta_changed {
let key = Value::from(col_id);
let updated= self.update_by_root_pid(
0, meta_source.meta_pid, &key, collection_meta.doc_ref())?;
if !updated {
panic!("unexpected: update meta page failed")
}
}
Ok(changed)
}
pub fn find(&mut self, col_id: u32, meta_version: u32, query: Option<&Document>) -> DbResult<DbHandle> {
self.check_meta_version(meta_version)?;
let meta_source = self.get_meta_source()?;
let collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
let subprogram = match query {
Some(query) => SubProgram::compile_query(
&collection_meta,
collection_meta.doc_ref(),
query,
true
),
None => SubProgram::compile_query_all(&collection_meta, true),
}?;
let handle = self.make_handle(subprogram);
Ok(handle)
}
pub fn update(&mut self, col_id: u32, meta_version: u32, query: Option<&Document>, update: &Document) -> DbResult<usize> {
self.check_meta_version(meta_version)?;
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let result = try_db_op!(self, self.internal_update(col_id, query, update));
Ok(result)
}
fn internal_update(&mut self, col_id: u32, query: Option<&Document>, update: &Document) -> DbResult<usize> {
let meta_source = self.get_meta_source()?;
let collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
let subprogram = SubProgram::compile_update(&collection_meta, query, update, true)?;
let mut vm = VM::new(&mut self.page_handler, Box::new(subprogram));
vm.execute()?;
Ok(vm.r2 as usize)
}
pub fn drop(&mut self, col_id: u32, meta_version: u32) -> DbResult<()> {
self.check_meta_version(meta_version)?;
self.page_handler.auto_start_transaction(TransactionType::Write)?;
try_db_op!(self, self.internal_drop(col_id));
Ok(())
}
fn internal_drop(&mut self, col_id: u32) -> DbResult<()> {
let mut meta_source = self.get_meta_source()?;
let collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
delete_all_helper::delete_all(&mut self.page_handler, collection_meta)?;
let mut btree_wrapper = BTreePageDeleteWrapper::new(
&mut self.page_handler, meta_source.meta_pid);
let pkey = Value::from(col_id);
btree_wrapper.delete_item(&pkey)?;
meta_source.meta_version += 1;
self.update_meta_source(&meta_source)
}
pub fn delete(&mut self, col_id: u32, meta_version: u32, query: &Document) -> DbResult<usize> {
let primary_keys = self.get_primary_keys_by_query(col_id, meta_version, Some(query))?;
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let result = try_db_op!(self, self.internal_delete(col_id, &primary_keys));
Ok(result)
}
fn internal_delete(&mut self, col_id: u32, primary_keys: &[Value]) -> DbResult<usize> {
for pkey in primary_keys {
let _ = self.internal_delete_by_pkey(col_id, pkey)?;
}
Ok(primary_keys.len())
}
pub fn delete_all(&mut self, col_id: u32, meta_version: u32) -> DbResult<usize> {
let primary_keys = self.get_primary_keys_by_query(col_id, meta_version, None)?;
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let result = try_db_op!(self, self.internal_delete(col_id, &primary_keys));
Ok(result)
}
fn get_primary_keys_by_query(&mut self, col_id: u32, meta_version: u32, query: Option<&Document>) -> DbResult<Vec<Value>> {
let mut handle = self.find(col_id, meta_version, query)?;
let mut buffer = vec![];
handle.step()?;
while handle.has_row() {
let doc = handle.get().unwrap_document();
let pkey = doc.pkey_id().unwrap();
buffer.push(pkey);
handle.step()?;
}
Ok(buffer)
}
fn update_by_root_pid(&mut self, parent_pid: u32, root_pid: u32, key: &Value, doc: &Document) -> DbResult<bool> {
let page = self.page_handler.pipeline_read_page(root_pid)?;
let btree_node = BTreeNode::from_raw(&page, parent_pid, self.item_size(), &mut self.page_handler)?;
let search_result = btree_node.search(key)?;
match search_result {
SearchKeyResult::Node(idx) => {
self.page_handler.free_data_ticket(&btree_node.content[idx].data_ticket)?;
let new_ticket = self.page_handler.store_doc(doc)?;
let new_btree_node = btree_node.clone_with_content(idx, BTreeNodeDataItem {
key: key.clone(),
data_ticket: new_ticket,
});
let mut page = RawPage::new(btree_node.pid, self.page_handler.page_size);
new_btree_node.to_raw(&mut page)?;
self.page_handler.pipeline_write_page(&page)?;
Ok(true)
}
SearchKeyResult::Index(idx) => {
let next_pid = btree_node.indexes[idx];
if next_pid == 0 {
return Ok(false);
}
self.update_by_root_pid(root_pid, next_pid, key, doc)
}
}
}
fn handle_insert_backward_item(&mut self,
collection_meta: &mut MetaDocEntry,
left_pid: u32,
backward_item: &InsertBackwardItem) -> DbResult<()> {
let new_root_id = self.page_handler.alloc_page_id()?;
crate::polo_log!("handle backward item, left_pid: {}, new_root_id: {}, right_pid: {}", left_pid, new_root_id, backward_item.right_pid);
let new_root_page = backward_item.write_to_page(&mut self.page_handler, new_root_id, left_pid)?;
self.page_handler.pipeline_write_page(&new_root_page)?;
collection_meta.set_root_pid(new_root_id);
Ok(())
}
#[allow(dead_code)]
pub(crate) fn delete_by_pkey(&mut self, col_id: u32, key: &Value) -> DbResult<Option<Rc<Document>>> {
self.page_handler.auto_start_transaction(TransactionType::Write)?;
let result = try_db_op!(self, self.internal_delete_by_pkey(col_id, key));
Ok(result)
}
fn internal_delete_by_pkey(&mut self, col_id: u32, key: &Value) -> DbResult<Option<Rc<Document>>> {
let meta_source = self.get_meta_source()?;
let collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
let mut delete_wrapper = BTreePageDeleteWrapper::new(
&mut self.page_handler,
collection_meta.root_pid() as u32
);
let result = delete_wrapper.delete_item(key)?;
delete_wrapper.flush_pages()?;
if let Some(deleted_item) = &result {
let index_ctx_opt = IndexCtx::from_meta_doc(collection_meta.doc_ref());
if let Some(index_ctx) = &index_ctx_opt {
index_ctx.delete_index_by_content(deleted_item.borrow(), &mut self.page_handler)?;
}
return Ok(result)
}
Ok(None)
}
pub fn count(&mut self, col_id: u32, meta_version: u32) -> DbResult<u64> {
self.check_meta_version(meta_version)?;
let meta_source = self.get_meta_source()?;
let collection_meta = self.find_collection_root_pid_by_id(
0, meta_source.meta_pid, col_id)?;
counter_helper::count(&mut self.page_handler, collection_meta)
}
pub fn query_all_meta(&mut self) -> DbResult<Vec<Rc<Document>>> {
unimplemented!()
}
pub fn start_transaction(&mut self, ty: Option<TransactionType>) -> DbResult<()> {
match ty {
Some(ty) => {
self.page_handler.start_transaction(ty)?;
self.page_handler.set_transaction_state(TransactionState::User);
}
None => {
self.page_handler.start_transaction(TransactionType::Read)?;
self.page_handler.set_transaction_state(TransactionState::UserAuto);
}
}
Ok(())
}
pub fn commit(&mut self) -> DbResult<()> {
self.page_handler.commit()?;
self.page_handler.set_transaction_state(TransactionState::NoTrans);
Ok(())
}
pub fn rollback(&mut self) -> DbResult<()> {
self.page_handler.rollback()?;
self.page_handler.set_transaction_state(TransactionState::NoTrans);
Ok(())
}
#[inline]
pub fn object_id_maker(&mut self) -> &mut ObjectIdMaker {
&mut self.obj_id_maker
}
pub fn dump(&mut self) -> DbResult<FullDump> {
let file_meta = self.page_handler.file_meta()?;
let first_page = self.page_handler.pipeline_read_page(0)?;
let first_page_wrapper = HeaderPageWrapper::from_raw_page(first_page);
let version = first_page_wrapper.get_version();
let meta_pid = first_page_wrapper.get_meta_page_id();
let free_list_pid = first_page_wrapper.get_free_list_page_id();
let free_list_size = first_page_wrapper.get_free_list_size();
let page_size = self.page_handler.page_size;
let pages = self.dump_all_pages(file_meta.len())?;
let journal_dump = self.page_handler.dump_journal()?;
let full_dump = FullDump {
path: self.path.clone(),
identifier: first_page_wrapper.get_title(),
version: dump_version(&version),
file_meta,
journal_dump,
meta_pid,
free_list_pid,
free_list_size,
page_size,
pages,
};
Ok(full_dump)
}
fn dump_all_pages(&mut self, file_len: u64) -> DbResult<Vec<PageDump>> {
let page_count = file_len / (self.page_handler.page_size.get() as u64);
let mut result = Vec::with_capacity(page_count as usize);
for index in 0..page_count {
let raw_page = self.page_handler.pipeline_read_page(index as u32)?;
result.push(dump_page(raw_page)?);
}
Ok(result)
}
pub fn get_version() -> String {
const VERSION: &str = env!("CARGO_PKG_VERSION");
VERSION.into()
}
}
fn dump_page(raw_page: RawPage) -> DbResult<PageDump> {
let first = raw_page.data[0];
let second = raw_page.data[1];
if first != 0xFF {
return Ok(PageDump::Undefined(raw_page.page_id));
}
let result = match second {
1 => PageDump::BTreePage(Box::new(BTreePageDump::from_page(&raw_page)?)),
2 => PageDump::OverflowDataPage(Box::new(OverflowDataPageDump)),
3 => PageDump::DataPage(Box::new(DataPageDump::from_page(&raw_page)?)),
4 => PageDump::FreeListPage(Box::new(FreeListPageDump::from_page(raw_page)?)),
_ => PageDump::Undefined(raw_page.page_id),
};
Ok(result)
}
fn dump_version(version: &[u8]) -> String {
let mut result = String::new();
let mut i: usize = 0;
while i < version.len() {
let digit = version[i] as u32;
let ch: char = std::char::from_digit(digit, 10).unwrap();
result.push(ch);
if i != version.len() - 1 {
result.push('.');
}
i += 1;
}
result
}
impl Drop for DbContext {
fn drop(&mut self) {
if self.page_handler.transaction_state() != TransactionState::NoTrans {
let _ = self.page_handler.only_rollback_journal();
}
let checkpoint_result = self.page_handler.checkpoint_journal(); if checkpoint_result.is_ok() {
let path = self.page_handler.journal_file_path().to_path_buf();
let _ = std::fs::remove_file(path); }
}
}