mod op;
mod subprogram;
mod codegen;
mod label;
pub(crate) use subprogram::SubProgram;
use std::vec::Vec;
use std::cmp::Ordering;
use bson::Bson;
use op::DbOp;
use crate::cursor::Cursor;
use crate::page_handler::PageHandler;
use crate::btree::{HEADER_SIZE, ITEM_SIZE};
use crate::{TransactionType, DbResult, DbErr};
use crate::error::{mk_field_name_type_unexpected, mk_unexpected_type_for_op};
use std::cell::Cell;
const STACK_SIZE: usize = 256;
/// Unwraps a `Result` produced while the VM is executing an instruction.
///
/// On `Ok` the macro evaluates to the wrapped value. On `Err` it marks `$self`
/// as halted (`$self.state = VmState::Halt`) and returns the error from the
/// enclosing function, so a failed VM can never be resumed.
macro_rules! try_vm {
    ($self:ident, $action:expr) => {
        match $action {
            Err(failure) => {
                $self.state = VmState::Halt;
                return Err(failure);
            }
            Ok(value) => value,
        }
    };
}
/// Execution state of a [`VM`].
#[repr(i8)]
#[derive(PartialEq, Copy, Clone)]
pub enum VmState {
/// Execution ended: either a `Halt`/`_EOF` op was reached or an instruction
/// failed. `VM::execute` refuses to run in this state.
Halt = -1,
/// Freshly constructed by `VM::new`; `execute` has not been called yet.
Init = 0,
/// Set at the start of `VM::execute` while instructions are being interpreted.
Running = 1,
/// `execute` returned because a `ResultRow` op was reached.
HasRow = 2,
}
/// Interpreter for a compiled [`SubProgram`].
///
/// The VM walks the program's byte-encoded instructions, using a `Bson` value
/// stack and a handful of registers, and reads/writes documents through the
/// borrowed `PageHandler`.
pub struct VM<'a> {
/// Current execution state; see [`VmState`].
pub(crate) state: VmState,
/// Pointer to the next instruction byte inside `program.instructions`.
pc: *const u8,
// `r0`: flag register, set to 1/0 by the comparison ops, `In` and `Next`
//       (and by `StoreR0`); tested by `IfTrue`/`IfFalse`.
// `r1`: the open cursor, created by `OpenRead`/`OpenWrite` and dropped by
//       `Close`/`Halt`.
r0: i32, r1: Option<Box<Cursor>>,
// `r2`: counter incremented by the `IncR2` op.
// `r3`: stack length saved by `SaveStackPos` and restored by `RecoverStackPos`.
pub(crate) r2: i64, r3: usize,
/// Storage backend used by the cursor and for transaction control.
page_handler: &'a mut PageHandler,
/// Operand stack of the interpreter.
stack: Vec<Bson>,
/// The program being executed (instructions plus static values).
pub(crate) program: Box<SubProgram>,
/// When `true`, dropping the VM rolls back the transaction on `page_handler`.
rollback_on_drop: bool,
}
/// Evaluates the comparison instruction `op` on `val1` (left) and `val2` (right).
///
/// The two values are ordered with `crate::bson_utils::value_cmp`; any error
/// from that call is propagated. Ops other than the five comparison ops
/// evaluate to `false`.
fn generic_cmp(op: DbOp, val1: &Bson, val2: &Bson) -> DbResult<bool> {
    let ordering = crate::bson_utils::value_cmp(val1, val2)?;
    let satisfied = match op {
        DbOp::Equal => ordering == Ordering::Equal,
        DbOp::Greater => ordering == Ordering::Greater,
        DbOp::GreaterEqual => ordering != Ordering::Less,
        DbOp::Less => ordering == Ordering::Less,
        DbOp::LessEqual => ordering != Ordering::Greater,
        _ => false,
    };
    Ok(satisfied)
}
impl<'a> VM<'a> {
/// Creates a VM in the `Init` state, positioned at the first instruction of
/// `program`.
///
/// All registers start at zero / `None`, the stack is pre-allocated with room
/// for `STACK_SIZE` values, and no rollback is scheduled on drop.
pub(crate) fn new(page_handler: &mut PageHandler, program: Box<SubProgram>) -> VM {
    // Take the entry pointer before `program` is moved into the struct.
    let entry = program.instructions.as_ptr();
    VM {
        state: VmState::Init,
        pc: entry,
        r0: 0,
        r1: None,
        r2: 0,
        r3: 0,
        page_handler,
        stack: Vec::with_capacity(STACK_SIZE),
        program,
        rollback_on_drop: false,
    }
}
/// Number of `ITEM_SIZE`-byte items that fit in one page after the
/// `HEADER_SIZE`-byte header.
#[inline]
fn item_size(&self) -> u32 {
    let usable_bytes = self.page_handler.page_size.get() - HEADER_SIZE;
    usable_bytes / ITEM_SIZE
}
/// Asks the page handler to start a transaction of type `ty` if needed.
///
/// When the handler reports that it auto-started one, the VM arms
/// `rollback_on_drop` so the transaction is rolled back if the VM is dropped
/// without committing. Errors from the page handler are propagated.
fn auto_start_transaction(&mut self, ty: TransactionType) -> DbResult<()> {
    let outcome = self.page_handler.auto_start_transaction(ty)?;
    // Only ever switches the flag on; an already-armed flag stays armed.
    self.rollback_on_drop |= outcome.auto_start;
    Ok(())
}
/// Opens a cursor on the tree rooted at `root_pid` under a read transaction
/// and stores it in `r1`.
fn open_read(&mut self, root_pid: u32) -> DbResult<()> {
    self.auto_start_transaction(TransactionType::Read)?;
    let cursor = Cursor::new(self.item_size(), root_pid);
    self.r1 = Some(Box::new(cursor));
    Ok(())
}
/// Opens a cursor on the tree rooted at `root_pid` under a write transaction
/// and stores it in `r1`.
fn open_write(&mut self, root_pid: u32) -> DbResult<()> {
    self.auto_start_transaction(TransactionType::Write)?;
    let cursor = Cursor::new(self.item_size(), root_pid);
    self.r1 = Some(Box::new(cursor));
    Ok(())
}
/// Resets the open cursor and, if it has an item, pushes that item's document
/// onto the stack.
///
/// `is_empty` is an out-parameter: it is set to `true` when the cursor has no
/// item after the reset and to `false` once a document has been pushed. It is
/// left untouched when an error is returned.
///
/// Panics if no cursor is open or if the item's document cannot be found.
fn reset_cursor(&mut self, is_empty: &Cell<bool>) -> DbResult<()> {
    let cursor = self.r1.as_mut().unwrap();
    cursor.reset(self.page_handler)?;

    if !cursor.has_next() {
        is_empty.set(true);
        return Ok(());
    }

    let ticket = cursor.peek().unwrap();
    let document = self.page_handler.get_doc_from_ticket(&ticket)?.unwrap();
    self.stack.push(Bson::Document(document));
    is_empty.set(false);
    Ok(())
}
/// Positions the open cursor on the primary key found at the top of the stack.
///
/// Returns `Ok(false)` when the key does not exist. When it does, the matching
/// document is pushed onto the stack (the key stays below it) and `Ok(true)`
/// is returned.
///
/// Panics if no cursor is open, if the stack is empty, or if the cursor found
/// the key but its document is missing.
fn find_by_primary_key(&mut self) -> DbResult<bool> {
    let cursor = self.r1.as_mut().unwrap();
    let key_index = self.stack.len() - 1;
    let key = &self.stack[key_index];

    if !cursor.reset_by_pkey(self.page_handler, key)? {
        return Ok(false);
    }

    let ticket = cursor.peek().unwrap();
    match self.page_handler.get_doc_from_ticket(&ticket)? {
        Some(document) => {
            self.stack.push(Bson::Document(document));
            Ok(true)
        }
        None => panic!("unexpected: item with key '{}' has been deleted, pid: {}, index: {}", key, ticket.pid, ticket.index),
    }
}
/// Advances the open cursor by one item.
///
/// If the cursor then points at an item, its document is pushed onto the
/// stack and `r0` is set to 1; otherwise `r0` is set to 0.
///
/// Panics if no cursor is open or if the item's document cannot be found.
fn next(&mut self) -> DbResult<()> {
    let cursor = self.r1.as_mut().unwrap();
    let _ = cursor.next(self.page_handler)?;

    self.r0 = if let Some(ticket) = cursor.peek() {
        let document = self.page_handler.get_doc_from_ticket(&ticket)?.unwrap();
        self.stack.push(Bson::Document(document));
        debug_assert!(self.stack.len() <= 64, "stack too large: {}", self.stack.len());
        1
    } else {
        0
    };
    Ok(())
}
/// Returns a reference to the value on top of the stack.
///
/// Panics if the stack is empty.
pub(crate) fn stack_top(&self) -> &Bson {
    let top_index = self.stack.len() - 1;
    &self.stack[top_index]
}
/// Jumps to byte offset `location` from the start of the program's
/// instruction buffer.
#[inline]
fn reset_location(&mut self, location: u32) {
    let base = self.program.instructions.as_ptr();
    // SAFETY (assumed, not checked here): `location` must lie within
    // `program.instructions` — presumably guaranteed by the code generator.
    self.pc = unsafe { base.add(location as usize) };
}
/// Borrows the program's static value stored at `index`.
///
/// Panics if `index` is out of range.
fn borrow_static(&self, index: usize) -> &Bson {
    let statics = &self.program.static_values;
    &statics[index]
}
/// Adds the value on top of the stack to a field of the document just below it.
///
/// `field_id` indexes `program.static_values` and must refer to a string: the
/// name of the field to increment.
///
/// * Field is `Int64`/`Double` and the operand is `Int64`/`Double`: the sum is
///   stored (`Int64` only when both are `Int64`, `Double` otherwise).
/// * Field is missing: the operand is stored as-is.
///
/// # Errors
/// * `DbErr::IncrementNullField` when the field is `Null`.
/// * A "field type unexpected" error naming the offending value when either
///   the stored field or the operand is not a number.
///
/// # Panics
/// Panics if the stack holds fewer than two values, if the second value is not
/// a document, or if the static value is not a string.
fn inc_field(&mut self, field_id: usize) -> DbResult<()> {
    let key = self.program.static_values[field_id].as_str().unwrap();
    let value_index = self.stack.len() - 1;
    let doc_index = self.stack.len() - 2;
    let value = self.stack[value_index].clone();
    let mut_doc = self.stack[doc_index].as_document_mut().unwrap();

    // NOTE(review): the i64 addition is unchecked, so an overflow panics in
    // builds with overflow checks and wraps otherwise.
    let new_value = match mut_doc.get(key) {
        Some(Bson::Null) => return Err(DbErr::IncrementNullField),
        Some(Bson::Int64(original)) => match value {
            Bson::Int64(inc) => Bson::Int64(*original + inc),
            Bson::Double(inc) => Bson::Double(*original as f64 + inc),
            other => {
                return Err(mk_field_name_type_unexpected(
                    key.into(),
                    "number".into(),
                    other.to_string()));
            }
        },
        Some(Bson::Double(original)) => match value {
            Bson::Int64(inc) => Bson::Double(*original + inc as f64),
            Bson::Double(inc) => Bson::Double(*original + inc),
            other => {
                return Err(mk_field_name_type_unexpected(
                    key.into(),
                    "number".into(),
                    other.to_string()));
            }
        },
        // The stored field is the value with the wrong type here, so report
        // it rather than the (possibly perfectly valid) operand.
        Some(existing) => {
            return Err(mk_field_name_type_unexpected(
                key.into(),
                "number".into(),
                existing.to_string()));
        }
        None => value,
    };
    mut_doc.insert::<String, Bson>(key.into(), new_value);
    Ok(())
}
/// Multiplies a field of the document second from the top of the stack by the
/// value on top of the stack.
///
/// `field_id` indexes `program.static_values` and must refer to a string: the
/// name of the field to multiply.
///
/// * Field is `Int64`/`Double` and the operand is `Int64`/`Double`: the
///   product is stored (`Int64` only when both are `Int64`, `Double`
///   otherwise).
/// * Field is missing: the operand is stored as-is.
///
/// # Errors
/// A "field type unexpected" error naming the offending value when either the
/// stored field or the operand is not a number.
///
/// # Panics
/// Panics if the stack holds fewer than two values, if the second value is not
/// a document, or if the static value is not a string.
fn mul_field(&mut self, field_id: usize) -> DbResult<()> {
    let key = self.program.static_values[field_id].as_str().unwrap();
    let value_index = self.stack.len() - 1;
    let doc_index = self.stack.len() - 2;
    let value = self.stack[value_index].clone();
    let mut_doc = self.stack[doc_index].as_document_mut().unwrap();

    // NOTE(review): the i64 multiplication is unchecked, so an overflow panics
    // in builds with overflow checks and wraps otherwise.
    let new_value = match mut_doc.get(key) {
        Some(Bson::Int64(original)) => match value {
            Bson::Int64(factor) => Bson::Int64(*original * factor),
            Bson::Double(factor) => Bson::Double(*original as f64 * factor),
            other => {
                return Err(mk_field_name_type_unexpected(
                    key.into(),
                    "number".into(),
                    other.to_string()));
            }
        },
        Some(Bson::Double(original)) => match value {
            Bson::Int64(factor) => Bson::Double(*original * factor as f64),
            Bson::Double(factor) => Bson::Double(*original * factor),
            other => {
                return Err(mk_field_name_type_unexpected(
                    key.into(),
                    "number".into(),
                    other.to_string()));
            }
        },
        // The stored field is the value with the wrong type here, so report
        // it rather than the (possibly perfectly valid) operand.
        Some(existing) => {
            return Err(mk_field_name_type_unexpected(
                key.into(),
                "number".into(),
                existing.to_string()));
        }
        None => value,
    };
    mut_doc.insert::<String, Bson>(key.into(), new_value);
    Ok(())
}
/// Removes a field from the document on top of the stack.
///
/// `field_id` indexes `program.static_values` and must refer to a string: the
/// name of the field. Removing a field that does not exist is not an error.
///
/// Panics if the stack is empty, if its top is not a document, or if the
/// static value is not a string.
fn unset_field(&mut self, field_id: u32) -> DbResult<()> {
    let key = self.program.static_values[field_id as usize].as_str().unwrap();
    let top_index = self.stack.len() - 1;
    let target = self.stack[top_index].as_document_mut().unwrap();
    // The removed value, if any, is intentionally discarded.
    target.remove(key);
    Ok(())
}
/// Returns the length of the array on top of the stack.
///
/// The top value is unwrapped with `crate::try_unwrap_array!`; presumably that
/// macro returns an error for non-array values — confirm against its
/// definition. Panics if the stack is empty.
fn array_size(&mut self) -> DbResult<usize> {
    let top_index = self.stack.len() - 1;
    let elements = crate::try_unwrap_array!("ArraySize", &self.stack[top_index]);
    Ok(elements.len())
}
/// Appends a copy of the value on top of the stack to the array just below it.
///
/// Neither value is popped.
///
/// # Errors
/// `DbErr::UnexpectedTypeForOp` when the second value is not an array.
///
/// # Panics
/// Panics if the stack holds fewer than two values.
fn array_push(&mut self) -> DbResult<()> {
    let depth = self.stack.len();
    let pushed = self.stack[depth - 1].clone();

    if let Bson::Array(items) = &mut self.stack[depth - 2] {
        items.push(pushed);
        return Ok(());
    }

    let name = self.stack[depth - 2].to_string();
    Err(DbErr::UnexpectedTypeForOp(mk_unexpected_type_for_op(
        "$push", "Array", name
    )))
}
/// Removes the first element of the array on top of the stack.
///
/// An empty array is left unchanged, matching `array_pop_last`.
///
/// # Errors
/// `DbErr::UnexpectedTypeForOp` when the top of the stack is not an array.
///
/// # Panics
/// Panics if the stack is empty.
fn array_pop_first(&mut self) -> DbResult<()> {
    let st = self.stack.len();
    let array_value = match &mut self.stack[st - 1] {
        Bson::Array(arr) => arr,
        _ => {
            let name = format!("{}", self.stack[st - 1]);
            return Err(DbErr::UnexpectedTypeForOp(mk_unexpected_type_for_op(
                "$pop", "Array", name
            )))
        }
    };
    // `drain(0..1)` would panic on an empty array; popping from an empty
    // array must be a no-op instead.
    if !array_value.is_empty() {
        array_value.remove(0);
    }
    Ok(())
}
/// Removes the last element of the array on top of the stack.
///
/// An empty array is left unchanged.
///
/// # Errors
/// `DbErr::UnexpectedTypeForOp` when the top of the stack is not an array.
///
/// # Panics
/// Panics if the stack is empty.
fn array_pop_last(&mut self) -> DbResult<()> {
    let top_index = self.stack.len() - 1;

    if let Bson::Array(items) = &mut self.stack[top_index] {
        items.pop();
        return Ok(());
    }

    let name = self.stack[top_index].to_string();
    Err(DbErr::UnexpectedTypeForOp(mk_unexpected_type_for_op(
        "$pop", "Array", name
    )))
}
/// Runs the program from the current `pc` until it yields a row or halts.
///
/// Instructions are byte-encoded: a one-byte opcode, optionally followed by
/// one `u32` operand (5 bytes in total) or, for `GetField`, two `u32`
/// operands (9 bytes in total). Operands are read in native byte order.
///
/// # Returns
/// * `Ok(())` with `state == VmState::HasRow` when a `ResultRow` op is
///   reached; calling `execute` again resumes after that op.
/// * `Ok(())` with `state == VmState::Halt` when `Halt`/`_EOF` is reached.
///
/// # Errors
/// * `DbErr::VmIsHalt` if the VM is already halted.
/// * Any error raised by an instruction; the VM is put into the `Halt` state
///   before the error is returned.
///
/// # Panics
/// Panics when the program violates an instruction's stack contract (too few
/// values on the stack, or a value of an unexpected type for ops that do not
/// report a typed error).
pub(crate) fn execute(&mut self) -> DbResult<()> {
    if self.state == VmState::Halt {
        return Err(DbErr::VmIsHalt);
    }
    self.state = VmState::Running;
    // SAFETY (assumed, not checked here): `pc` must always point at a valid
    // opcode inside `program.instructions`, with its operands in bounds, and
    // every jump target must be in bounds — presumably guaranteed by the code
    // generator. Operands follow a one-byte opcode, so they are generally not
    // 4-byte aligned and must be read with `read_unaligned`.
    unsafe {
        loop {
            let op = self.pc.cast::<DbOp>().read();
            match op {
                DbOp::Goto => {
                    let location = self.pc.add(1).cast::<u32>().read_unaligned();
                    self.reset_location(location);
                }
                DbOp::Label => {
                    self.pc = self.pc.add(5);
                }
                DbOp::IncR2 => {
                    self.r2 += 1;
                    self.pc = self.pc.add(1);
                }
                DbOp::IfTrue => {
                    let location = self.pc.add(1).cast::<u32>().read_unaligned();
                    if self.r0 != 0 {
                        self.reset_location(location);
                    } else {
                        self.pc = self.pc.add(5);
                    }
                }
                DbOp::IfFalse => {
                    let location = self.pc.add(1).cast::<u32>().read_unaligned();
                    if self.r0 == 0 {
                        self.reset_location(location);
                    } else {
                        self.pc = self.pc.add(5);
                    }
                }
                DbOp::Rewind => {
                    // Jumps to `location` when the cursor is empty.
                    let location = self.pc.add(1).cast::<u32>().read_unaligned();
                    let is_empty = Cell::new(false);
                    try_vm!(self, self.reset_cursor(&is_empty));
                    if is_empty.get() {
                        self.reset_location(location);
                    } else {
                        self.pc = self.pc.add(5);
                    }
                }
                DbOp::FindByPrimaryKey => {
                    // Jumps to `location` when the key is not found.
                    let location = self.pc.add(1).cast::<u32>().read_unaligned();
                    let found = try_vm!(self, self.find_by_primary_key());
                    if !found {
                        self.reset_location(location);
                    } else {
                        self.pc = self.pc.add(5);
                    }
                }
                DbOp::Next => {
                    // Jumps to `location` while the cursor still has an item.
                    try_vm!(self, self.next());
                    if self.r0 != 0 {
                        let location = self.pc.add(1).cast::<u32>().read_unaligned();
                        self.reset_location(location);
                    } else {
                        self.pc = self.pc.add(5);
                    }
                }
                DbOp::PushValue => {
                    let id = self.pc.add(1).cast::<u32>().read_unaligned();
                    let value = self.borrow_static(id as usize).clone();
                    self.stack.push(value);
                    self.pc = self.pc.add(5);
                }
                DbOp::PushR0 => {
                    self.stack.push(Bson::from(self.r0));
                    self.pc = self.pc.add(1);
                }
                DbOp::StoreR0 => {
                    // `PushR0` pushes `Bson::from(i32)`, i.e. an `Int32`, so
                    // both integer widths must be accepted here.
                    let top = match self.stack_top() {
                        Bson::Int32(v) => *v,
                        // Deliberate truncation, as in the original `as i32`.
                        Bson::Int64(v) => *v as i32,
                        other => panic!("StoreR0 expects an integer on top of the stack, got {}", other),
                    };
                    self.r0 = top;
                    self.pc = self.pc.add(1);
                }
                DbOp::GetField => {
                    // Pushes the field's value, or jumps to `location` when
                    // the document on top of the stack has no such field.
                    let key_stat_id = self.pc.add(1).cast::<u32>().read_unaligned();
                    let location = self.pc.add(5).cast::<u32>().read_unaligned();
                    let key_name = self.program.static_values[key_stat_id as usize]
                        .as_str()
                        .unwrap();
                    let top_index = self.stack.len() - 1;
                    // Borrow the document instead of deep-cloning it; only
                    // the looked-up field is cloned.
                    let field = match &self.stack[top_index] {
                        Bson::Document(doc) => doc.get(key_name).cloned(),
                        other => {
                            let name = format!("{}", other);
                            let err = mk_field_name_type_unexpected(
                                key_name.into(),
                                "Document".into(),
                                name);
                            self.state = VmState::Halt;
                            return Err(err);
                        }
                    };
                    match field {
                        Some(val) => {
                            self.stack.push(val);
                            self.pc = self.pc.add(9);
                        }
                        None => {
                            self.reset_location(location);
                        }
                    }
                }
                DbOp::UnsetField => {
                    let field_id = self.pc.add(1).cast::<u32>().read_unaligned();
                    try_vm!(self, self.unset_field(field_id));
                    self.pc = self.pc.add(5);
                }
                DbOp::IncField => {
                    let field_id = self.pc.add(1).cast::<u32>().read_unaligned();
                    try_vm!(self, self.inc_field(field_id as usize));
                    self.pc = self.pc.add(5);
                }
                DbOp::MulField => {
                    let field_id = self.pc.add(1).cast::<u32>().read_unaligned();
                    try_vm!(self, self.mul_field(field_id as usize));
                    self.pc = self.pc.add(5);
                }
                DbOp::SetField => {
                    let field_id = self.pc.add(1).cast::<u32>().read_unaligned();
                    let key = self.program.static_values[field_id as usize].as_str().unwrap();
                    let value_index = self.stack.len() - 1;
                    let doc_index = self.stack.len() - 2;
                    let value = self.stack[value_index].clone();
                    let mut_doc = self.stack[doc_index].as_document_mut().unwrap();
                    mut_doc.insert::<String, Bson>(key.into(), value);
                    self.pc = self.pc.add(5);
                }
                DbOp::ArraySize => {
                    let size = try_vm!(self, self.array_size());
                    self.stack.push(Bson::from(size as i64));
                    self.pc = self.pc.add(1);
                }
                DbOp::ArrayPush => {
                    try_vm!(self, self.array_push());
                    self.pc = self.pc.add(1);
                }
                DbOp::ArrayPopFirst => {
                    try_vm!(self, self.array_pop_first());
                    self.pc = self.pc.add(1);
                }
                DbOp::ArrayPopLast => {
                    try_vm!(self, self.array_pop_last());
                    self.pc = self.pc.add(1);
                }
                DbOp::UpdateCurrent => {
                    let top_index = self.stack.len() - 1;
                    let doc = self.stack[top_index].as_document().unwrap();
                    // Halt on failure like every other fallible instruction,
                    // so a failed VM cannot be resumed.
                    try_vm!(
                        self,
                        self.r1.as_mut().unwrap().update_current(self.page_handler, doc)
                    );
                    self.pc = self.pc.add(1);
                }
                DbOp::Pop => {
                    self.stack.pop();
                    self.pc = self.pc.add(1);
                }
                DbOp::Pop2 => {
                    let offset = self.pc.add(1).cast::<u32>().read_unaligned();
                    let new_len = self
                        .stack
                        .len()
                        .checked_sub(offset as usize)
                        .expect("Pop2 operand exceeds the stack depth");
                    // `truncate` drops the popped values; `set_len` would
                    // leak them.
                    self.stack.truncate(new_len);
                    self.pc = self.pc.add(5);
                }
                DbOp::Equal | DbOp::Greater | DbOp::GreaterEqual |
                DbOp::Less | DbOp::LessEqual => {
                    let val1 = &self.stack[self.stack.len() - 2];
                    let val2 = &self.stack[self.stack.len() - 1];
                    let cmp = try_vm!(self, generic_cmp(op, val1, val2));
                    self.r0 = if cmp { 1 } else { 0 };
                    self.pc = self.pc.add(1);
                }
                DbOp::In => {
                    // r0 = 1 when the second value equals any element of the
                    // array on top; elements that cannot be compared are
                    // skipped.
                    let top1 = &self.stack[self.stack.len() - 1];
                    let top2 = &self.stack[self.stack.len() - 2];
                    self.r0 = 0;
                    for item in top1.as_array().unwrap().iter() {
                        let cmp_result = crate::bson_utils::value_cmp(top2, item);
                        if let Ok(Ordering::Equal) = cmp_result {
                            self.r0 = 1;
                            break;
                        }
                    }
                    self.pc = self.pc.add(1);
                }
                DbOp::OpenRead => {
                    let root_pid = self.pc.add(1).cast::<u32>().read_unaligned();
                    try_vm!(self, self.open_read(root_pid));
                    self.pc = self.pc.add(5);
                }
                DbOp::OpenWrite => {
                    let root_pid = self.pc.add(1).cast::<u32>().read_unaligned();
                    try_vm!(self, self.open_write(root_pid));
                    self.pc = self.pc.add(5);
                }
                DbOp::ResultRow => {
                    // Advance first so the next `execute` call resumes after
                    // this op.
                    self.pc = self.pc.add(1);
                    self.state = VmState::HasRow;
                    return Ok(());
                }
                DbOp::Close => {
                    self.r1 = None;
                    if self.rollback_on_drop {
                        // On failure the VM halts and `rollback_on_drop`
                        // stays set, so dropping the VM rolls back.
                        try_vm!(self, self.page_handler.auto_commit());
                        self.rollback_on_drop = false;
                    }
                    self.pc = self.pc.add(1);
                }
                DbOp::SaveStackPos => {
                    self.r3 = self.stack.len();
                    self.pc = self.pc.add(1);
                }
                DbOp::RecoverStackPos => {
                    self.stack.resize(self.r3, Bson::Null);
                    self.pc = self.pc.add(1);
                }
                DbOp::_EOF |
                DbOp::Halt => {
                    self.r1 = None;
                    self.state = VmState::Halt;
                    return Ok(());
                }
            }
        }
    }
}
/// Consumes the VM, asking the page handler to commit via `auto_commit`.
///
/// On success the rollback-on-drop flag is cleared before the VM is dropped.
/// If `auto_commit` fails, the error is returned with the flag unchanged, so
/// a VM that had it set still rolls back when dropped.
pub(crate) fn commit_and_close(mut self) -> DbResult<()> {
self.page_handler.auto_commit()?;
// Committed: dropping `self` at the end of this function must not roll back.
self.rollback_on_drop = false;
Ok(())
}
/// Overrides whether dropping this VM rolls back the transaction on its page
/// handler.
pub(crate) fn set_rollback_on_drop(&mut self, value: bool) {
self.rollback_on_drop = value;
}
}
/// Rolls back the transaction when the VM is dropped with `rollback_on_drop`
/// still set.
impl<'a> Drop for VM<'a> {
    fn drop(&mut self) {
        if !self.rollback_on_drop {
            return;
        }

        let _result = self.page_handler.rollback();
        // A failed rollback is only surfaced in debug builds; release builds
        // ignore it because `drop` cannot return an error.
        #[cfg(debug_assertions)]
        if let Err(err) = _result {
            panic!("rollback fatal: {}", err);
        }
    }
}