#![forbid(unsafe_code)]
#![deny(missing_docs)]
pub use crate::{
atomfile::AtomicFile,
builtin::standard_builtins,
pstore::{AccessPagedData, SharedPagedData},
stg::{MemFile, SimpleFileStorage, Storage},
};
#[cfg(feature = "gentrans")]
pub use crate::gentrans::{GenTransaction, Part};
#[cfg(feature = "builtin")]
pub use crate::{
builtin::check_types,
compile::{c_bool, c_float, c_int, c_value},
exec::EvalEnv,
expr::ObjRef,
expr::{Block, DataKind, Expr},
run::{CExp, CExpPtr, CompileFunc},
value::Value,
};
#[cfg(not(feature = "builtin"))]
use crate::{
compile::{c_bool, c_int, c_value},
exec::EvalEnv,
expr::{Block, DataKind, Expr},
run::{CExp, CExpPtr, CompileFunc},
value::Value,
};
use crate::{
bytes::ByteStorage,
compact::CompactFile,
expr::*,
page::{Page, PagePtr},
parse::Parser,
run::*,
sortedfile::{Asc, Id, Record, SortedFile},
table::{ColInfo, IndexInfo, Row, SaveOp, Table},
util::{nd, newmap, SmallSet},
value::*,
};
use std::{
any::Any,
cell::{Cell, RefCell},
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
panic,
rc::Rc,
sync::{Arc, Mutex, RwLock},
};
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
#[cfg(feature = "max")]
#[macro_use]
pub mod util;
#[cfg(not(feature = "max"))]
#[macro_use]
mod util;
#[cfg(feature = "gentrans")]
pub mod gentrans;
pub mod stg;
pub mod pstore;
pub mod atomfile;
#[cfg(feature = "max")]
pub mod builtin;
#[cfg(not(feature = "max"))]
mod builtin;
#[cfg(feature = "max")]
pub mod compact;
#[cfg(not(feature = "max"))]
mod compact;
#[cfg(feature = "builtin")]
pub mod compile;
#[cfg(not(feature = "builtin"))]
mod compile;
#[cfg(feature = "max")]
pub mod exec;
#[cfg(not(feature = "max"))]
mod exec;
#[cfg(feature = "builtin")]
pub mod expr;
#[cfg(not(feature = "builtin"))]
mod expr;
#[cfg(feature = "max")]
pub mod page;
#[cfg(not(feature = "max"))]
mod page;
#[cfg(feature = "max")]
pub mod parse;
#[cfg(not(feature = "max"))]
mod parse;
#[cfg(feature = "max")]
pub mod run;
#[cfg(not(feature = "max"))]
mod run;
#[cfg(feature = "max")]
pub mod sortedfile;
#[cfg(not(feature = "max"))]
mod sortedfile;
#[cfg(feature = "max")]
pub mod sys;
#[cfg(not(feature = "max"))]
mod sys;
#[cfg(feature = "max")]
pub mod table;
#[cfg(not(feature = "max"))]
mod table;
#[cfg(feature = "builtin")]
pub mod value;
#[cfg(not(feature = "builtin"))]
mod value;
#[cfg(feature = "max")]
pub mod cexp;
#[cfg(not(feature = "max"))]
mod cexp;
#[cfg(feature = "max")]
pub mod bytes;
#[cfg(not(feature = "max"))]
mod bytes;
/// Shared, reference-counted byte buffer (`Arc<Vec<u8>>`).
pub type Data = Arc<Vec<u8>>;
/// Reference-counted handle to a [`Database`]; most `Database` methods take `self: &DB`.
pub type DB = Rc<Database>;
/// Map keyed by name whose values pair a [`DataKind`] with a [`CompileFunc`].
/// NOTE(review): presumably builtin function name -> (result kind, compile function) — confirm in `builtin`.
pub type BuiltinMap = HashMap<String, (DataKind, CompileFunc)>;
/// In-memory state of one open database: page access, the `sys` tables,
/// lookup caches for tables and functions, and per-fragment-type byte storage.
///
/// Instances are created by [`Database::new`] and used through the [`DB`] handle.
pub struct Database {
/// Access to the underlying paged data (pages are allocated, freed and saved through it).
pub file: AccessPagedData,
/// Builtin map supplied to [`Database::new`].
pub builtins: Arc<BuiltinMap>,
/// The `sys.Schema` system table.
pub sys_schema: Rc<Table>,
/// The `sys.Table` system table.
pub sys_table: Rc<Table>,
/// The `sys.Column` system table.
pub sys_column: Rc<Table>,
/// The `sys.Index` system table.
pub sys_index: Rc<Table>,
/// The `sys.IndexColumn` system table.
pub sys_index_col: Rc<Table>,
/// The `sys.Function` system table.
pub sys_function: Rc<Table>,
/// Map from `String` to `i64`.
/// NOTE(review): presumably a cache of schema name -> schema id — confirm against users of this field.
pub schemas: RefCell<HashMap<String, i64>>,
/// Tables already loaded, keyed by name; consulted by `get_table` before falling back to `sys::get_table`.
pub tables: RefCell<HashMap<ObjRef, Rc<Table>>>,
/// Functions already loaded, keyed by name; consulted by `get_function` before falling back to `sys::get_function`.
pub functions: RefCell<HashMap<ObjRef, Rc<Function>>>,
/// Initialised to 0 by `new`.
/// NOTE(review): presumably the last id generated by an insert — confirm against users of this field.
pub lastid: Cell<i64>,
/// Set by `run` when a batch fails; while set, `changed` reports `false` and the next `save` rolls back and clears it.
pub err: Cell<bool>,
/// Value of `file.is_new()` captured when the database was constructed.
pub is_new: bool,
/// One `ByteStorage` per fragment type (`bytes::NFT` entries), indexed by `Code::ft`.
bs: Vec<ByteStorage>,
/// When set, `save` clears the function cache and then resets this flag.
function_reset: Cell<bool>,
/// Value of `file.spd.page_size_max()` captured at construction.
page_size_max: usize,
}
/// Number of the last root page used by the system tables and their indexes.
/// `Database::new` asserts that `TableBuilder` handed out root pages exactly up to this value.
const SYS_ROOT_LAST: u64 = 16;
impl Database {
/// Construct a database handle over `file`.
///
/// Builds the in-memory definitions of the six `sys` tables and their seven indexes,
/// creates one `ByteStorage` per fragment type, and publishes the system tables in the
/// table cache.
///
/// * If `file.is_new()` is true, `bytes::NFT` pages are allocated, the built-in system
///   schema SQL is run followed by `initsql`, and the result is saved. Errors raised by
///   that SQL are printed by `DummyTransaction` and set the error flag, so the `save`
///   call rolls back instead.
/// * Otherwise each system table's id generator is loaded via `sys::get_id_gen`.
///
/// # Panics
/// Panics if the root pages handed out by `TableBuilder` do not end at `SYS_ROOT_LAST`.
pub fn new(file: AccessPagedData, initsql: &str, builtins: Arc<BuiltinMap>) -> DB {
let is_new = file.is_new();
// Each `nt` call takes the next root page for the table itself.
let mut tb = TableBuilder::new();
let sys_schema = tb.nt("Schema", &[("Name", STRING)]);
let sys_table = tb.nt(
"Table",
&[
("Root", INT),
("Schema", INT),
("Name", STRING),
("IdGen", INT),
],
);
let sys_column = tb.nt("Column", &[("Table", INT), ("Name", STRING), ("Type", INT)]);
let sys_index = tb.nt("Index", &[("Root", INT), ("Table", INT), ("Name", STRING)]);
let sys_index_col = tb.nt("IndexColumn", &[("Index", INT), ("ColId", INT)]);
let sys_function = tb.nt(
"Function",
&[("Schema", INT), ("Name", NAMESTR), ("Def", BIGSTR)],
);
// Index roots are taken after all table roots, in the order of these calls.
// The vectors list the indexed column numbers.
sys_schema.add_index(tb.rt(), vec![0]);
sys_table.add_index(tb.rt(), vec![1, 2]);
sys_column.add_index(tb.rt(), vec![0]);
sys_index.add_index(tb.rt(), vec![1]);
sys_index_col.add_index(tb.rt(), vec![0]);
sys_function.add_index(tb.rt(), vec![0, 1]);
sys_function.add_index(tb.rt(), vec![1]);
// One byte storage per fragment type; storage `ft` is created with `ft` as its first argument.
// NOTE(review): that first argument is presumably its root page, matching `TableBuilder`
// starting its allocation at `bytes::NFT` — confirm in `bytes`.
let mut bs = Vec::new();
for ft in 0..bytes::NFT {
bs.push(ByteStorage::new(ft as u64, ft));
}
let page_size_max = file.spd.page_size_max();
let db = Rc::new(Database {
file,
sys_schema,
sys_table,
sys_column,
sys_index,
sys_index_col,
sys_function,
bs,
schemas: newmap(),
functions: newmap(),
tables: newmap(),
builtins,
function_reset: Cell::new(false),
lastid: Cell::new(0),
err: Cell::new(false),
is_new,
page_size_max,
});
// Sanity check: the builder must have handed out roots exactly up to SYS_ROOT_LAST.
assert!(tb.alloc as u64 - 1 == SYS_ROOT_LAST);
if is_new {
// NOTE(review): presumably reserves the pages used by the byte storages — confirm.
for _ft in 0..bytes::NFT {
db.alloc_page(); }
}
for t in &tb.list {
if !is_new {
// Existing database: restore the stored id generator value.
t.id_gen.set(sys::get_id_gen(&db, t.id as u64));
}
db.publish_table(t.clone());
}
if is_new {
// The statements below mirror the table and index definitions built above.
// NOTE(review): presumably the creation order must stay in step with the order above
// so that the roots allocated while running the SQL agree with those handed out by
// `TableBuilder` — confirm before reordering.
let sysinit = "
CREATE SCHEMA sys
GO
CREATE TABLE sys.Schema( Name string )
CREATE TABLE sys.Table( Root int, Schema int, Name string, IdGen int )
CREATE TABLE sys.Column( Table int, Name string, Type int )
CREATE TABLE sys.Index( Root int, Table int, Name string )
CREATE TABLE sys.IndexColumn( Index int, ColId int )
CREATE TABLE sys.Function( Schema int, Name string(31), Def string(249) )
GO
CREATE INDEX ByName ON sys.Schema(Name)
CREATE INDEX BySchemaName ON sys.Table(Schema,Name)
CREATE INDEX ByTable ON sys.Column(Table)
CREATE INDEX ByTable ON sys.Index(Table)
CREATE INDEX ByIndex ON sys.IndexColumn(Index)
CREATE INDEX BySchemaName ON sys.Function(Schema,Name)
CREATE INDEX ByName ON sys.Function(Name)
GO
";
let mut dq = DummyTransaction {};
db.run(sysinit, &mut dq);
db.run(initsql, &mut dq);
db.save();
}
db
}
/// Execute the SQL batch `source`, communicating with the caller through `tr`.
///
/// On failure a message of the form `"<msg> in <routine> at line <n> column <n>."`
/// is handed to `tr.set_error` and the database error flag is set, which makes the
/// next `save` roll back.
pub fn run(self: &DB, source: &str, tr: &mut dyn Transaction) {
    let report = match self.go(source, tr) {
        Some(e) => format!(
            "{} in {} at line {} column {}.",
            e.msg, e.rname, e.line, e.column
        ),
        None => return,
    };
    tr.set_error(report);
    self.err.set(true);
}
/// Parse and execute `source`, turning any panic raised on the way into an `SqlError`.
///
/// Returns `None` when the batch ran to completion. A panic payload that already is an
/// `SqlError` is copied field by field; `&str` and `String` payloads become the message
/// of an error built by the parser's `make_error`; anything else is reported with a
/// fixed fallback message.
fn go(self: &DB, source: &str, tr: &mut dyn Transaction) -> Option<SqlError> {
    let mut parser = Parser::new(source, self);
    let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        parser.batch(tr);
    }));
    // No panic means no error to report.
    let payload = outcome.err()?;
    if let Some(sql) = payload.downcast_ref::<SqlError>() {
        return Some(SqlError {
            msg: sql.msg.clone(),
            line: sql.line,
            column: sql.column,
            rname: sql.rname.clone(),
        });
    }
    let message = if let Some(text) = payload.downcast_ref::<&str>() {
        (*text).to_string()
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.to_string()
    } else {
        "unrecognised/unexpected error".to_string()
    };
    Some(parser.make_error(message))
}
/// Report whether there is anything for `save` to write.
///
/// Always `false` while the error flag is set. Otherwise `true` if any byte storage
/// reports a change, or any cached table has a dirty id generator or a changed file.
pub fn changed(self: &DB) -> bool {
    if self.err.get() {
        return false;
    }
    if self.bs.iter().any(|store| store.changed()) {
        return true;
    }
    self.tables
        .borrow()
        .values()
        .any(|table| table.id_gen_dirty.get() || table.file.changed())
}
/// Save pending changes, or roll them back if the error flag is set.
///
/// The error flag is cleared as part of choosing the operation. Byte storages are
/// processed first, then dirty id generators (written on save, re-read on rollback),
/// then every cached table. If a function reset was requested the function cache is
/// emptied. Returns the value produced by `self.file.save(op)`.
pub fn save(self: &DB) -> usize {
    // `replace` reads the error flag and clears it in one step.
    let op = if self.err.replace(false) {
        SaveOp::RollBack
    } else {
        SaveOp::Save
    };
    for store in self.bs.iter() {
        store.save(self, op);
    }
    let tables = self.tables.borrow();
    // First pass: bring id generators in line with the chosen operation.
    for table in tables.values().filter(|t| t.id_gen_dirty.get()) {
        let table_id = table.id as u64;
        if op == SaveOp::Save {
            sys::save_id_gen(self, table_id, table.id_gen.get());
        } else {
            table.id_gen.set(sys::get_id_gen(self, table_id));
        }
        table.id_gen_dirty.set(false);
    }
    // Second pass: save / roll back the tables themselves.
    for table in tables.values() {
        table.save(self, op);
    }
    if self.function_reset.get() {
        // Empty each function's instruction list before dropping the cache entries.
        for cached in self.functions.borrow().values() {
            cached.ilist.borrow_mut().clear();
        }
        self.functions.borrow_mut().clear();
        self.function_reset.set(false);
    }
    self.file.save(op)
}
#[cfg(not(feature = "max"))]
/// Look up the table called `name`: first in the table cache, then via `sys::get_table`.
fn get_table(self: &DB, name: &ObjRef) -> Option<Rc<Table>> {
    // The cache borrow ends with this statement, before `sys::get_table` runs.
    let cached = self.tables.borrow().get(name).cloned();
    match cached {
        Some(table) => Some(table),
        None => sys::get_table(self, name),
    }
}
#[cfg(feature = "max")]
/// Look up the table called `name`: first in the table cache, then via `sys::get_table`.
pub fn get_table(self: &DB, name: &ObjRef) -> Option<Rc<Table>> {
    // The cache borrow ends with this statement, before `sys::get_table` runs.
    let cached = self.tables.borrow().get(name).cloned();
    match cached {
        Some(table) => Some(table),
        None => sys::get_table(self, name),
    }
}
#[cfg(feature = "max")]
/// Get the table `schema.name`.
///
/// # Panics
/// Panics if `get_table` finds no such table.
pub fn table(self: &DB, schema: &str, name: &str) -> Rc<Table> {
    let key = ObjRef::new(schema, name);
    self.get_table(&key).unwrap()
}
/// Look up the function called `name`: first in the function cache, then via `sys::get_function`.
fn get_function(self: &DB, name: &ObjRef) -> Option<Rc<Function>> {
    // The cache borrow ends with this statement, before `sys::get_function` runs.
    let cached = self.functions.borrow().get(name).cloned();
    match cached {
        Some(function) => Some(function),
        None => sys::get_function(self, name),
    }
}
/// Insert `table` into the table cache under its own name, replacing any existing entry.
fn publish_table(&self, table: Rc<Table>) {
    let key = table.info.name.clone();
    let mut cache = self.tables.borrow_mut();
    cache.insert(key, table);
}
/// Store the tail of `val`'s bytes in byte storage and return the `Code` identifying it.
///
/// Only binary and string values whose byte length is at least `size` are stored: the
/// bytes from offset `size - 9` onward go to the byte storage selected by
/// `bytes::fragment_type`. Every other value yields the sentinel
/// `Code { id: u64::MAX, ft: 0 }`, which `delcode` treats as "nothing stored".
///
/// NOTE(review): presumably the caller keeps the first `size - 9` bytes inline and
/// `size` is never below 9 (the subtraction would underflow) — confirm against callers.
fn encode(self: &DB, val: &Value, size: usize) -> Code {
    let nothing = Code {
        id: u64::MAX,
        ft: 0,
    };
    let data: &[u8] = match val {
        Value::RcBinary(b) => &**b,
        Value::ArcBinary(b) => &**b,
        Value::String(s) => s.as_bytes(),
        _ => return nothing,
    };
    if data.len() < size {
        return nothing;
    }
    let tail = &data[size - 9..];
    let ft = bytes::fragment_type(tail.len());
    let id = self.bs[ft].encode(self, tail);
    Code { id, ft }
}
/// Fetch the bytes identified by `code` from the byte storage for `code.ft`.
/// `inline` is passed through to `ByteStorage::decode` unchanged.
fn decode(self: &DB, code: Code, inline: usize) -> Vec<u8> {
    let store = &self.bs[code.ft];
    store.decode(self, code.id, inline)
}
/// Delete the bytes identified by `code`; a code whose id is `u64::MAX` is ignored.
fn delcode(self: &DB, code: Code) {
    if code.id == u64::MAX {
        return;
    }
    let store = &self.bs[code.ft];
    store.delcode(self, code.id);
}
/// Allocate a page through `self.file` and return its number.
fn alloc_page(self: &DB) -> u64 {
self.file.alloc_page()
}
/// Free page `lpnum` through `self.file`.
fn free_page(self: &DB, lpnum: u64) {
self.file.free_page(lpnum);
}
#[cfg(feature = "pack")]
/// Size of page `pnum`, as reported by the shared file's `lp_size`.
///
/// # Panics
/// Panics if the lock on the shared file is poisoned.
fn lp_size(&self, pnum: u64) -> u64 {
    let shared = self.file.spd.file.read().unwrap();
    let size = shared.lp_size(pnum);
    size as u64
}
#[cfg(feature = "pack")]
/// Repack one of the database's files.
///
/// * `k >= 0`: repack file number `k` of table `schema.tname` via `Table::repack`.
/// * `k < 0`: repack byte storage number `-k - 1`; `schema` and `tname` are ignored.
///
/// Returns the value produced by the repack call, or `-1` when the table does not
/// exist or the byte-storage number is out of range.
fn repack_file(self: &DB, k: i64, schema: &str, tname: &str) -> i64 {
    if k >= 0 {
        let name = ObjRef::new(schema, tname);
        if let Some(t) = self.get_table(&name) {
            return t.repack(self, k as usize);
        }
    } else {
        // `-(k + 1)` equals `-k - 1` but cannot overflow, even for `i64::MIN`.
        let idx = -(k + 1);
        // Bound by the actual number of byte storages rather than a literal count,
        // and compare as i64 so a huge index is rejected before any narrowing cast.
        if idx < self.bs.len() as i64 {
            return self.bs[idx as usize].repack_file(self);
        }
    }
    -1
}
#[cfg(feature = "verify")]
/// Check that every page is accounted for and return a summary string.
///
/// Starts from the page set and total returned by the shared file's `get_info` (the
/// initial set size is reported as the free count), adds the pages used by each byte
/// storage and each cached table, and requires the final set size to equal the total.
///
/// # Panics
/// Panics if the page counts disagree, or if the lock on the shared file is poisoned.
pub fn verify(self: &DB) -> String {
    let (mut seen, page_count) = self.file.spd.file.read().unwrap().get_info();
    let page_count = page_count as usize;
    let free_count = seen.len();
    for store in self.bs.iter() {
        store.file.get_used(self, &mut seen);
    }
    for table in self.tables.borrow().values() {
        table.get_used(self, &mut seen);
    }
    assert!(seen.len() == page_count);
    let used_count = page_count - free_count;
    format!(
        "All Ok. Logical page summary: free={} used={} total={}.",
        free_count, used_count, page_count
    )
}
}
/// On drop, empties the instruction list of every cached function.
/// NOTE(review): presumably this breaks reference cycles between functions — confirm.
impl Drop for Database {
    fn drop(&mut self) {
        let cache = self.functions.borrow();
        cache.values().for_each(|function| {
            function.ilist.borrow_mut().clear();
        });
    }
}
/// Helper used by `Database::new` to define the system tables and hand out their root pages.
struct TableBuilder {
/// Next root page number to hand out; starts at `bytes::NFT`.
alloc: usize,
/// Tables created so far, in creation order.
list: Vec<Rc<Table>>,
}
impl TableBuilder {
    /// Create a builder whose first root page is `bytes::NFT` and whose table list is empty.
    fn new() -> Self {
        let list = Vec::new();
        Self {
            alloc: bytes::NFT,
            list,
        }
    }

    /// Define the system table `sys.<name>` with columns `ct`.
    ///
    /// The table takes the next root page; its id is `1 + (root - bytes::NFT)`.
    /// The table is recorded in `list` and also returned.
    fn nt(&mut self, name: &str, ct: &[(&str, DataType)]) -> Rc<Table> {
        let root = self.rt();
        let id = 1 + (root - bytes::NFT as u64);
        let info = Rc::new(ColInfo::new(ObjRef::new("sys", name), ct));
        let table = Table::new(id as i64, root, 1, info);
        self.list.push(Rc::clone(&table));
        table
    }

    /// Hand out the next root page number and advance the counter.
    fn rt(&mut self) -> u64 {
        let page = self.alloc as u64;
        self.alloc += 1;
        page
    }
}
/// Interface through which a running SQL batch communicates with its caller
/// (see `Database::run`).
///
/// Only `selected` and `set_error` are required. Every other method has a default
/// implementation that ignores its arguments and returns an empty or zero value.
pub trait Transaction: Any {
/// Receive a status code. The default implementation ignores it.
/// NOTE(review): presumably driven by a SQL builtin — confirm in `builtin`.
fn status_code(&mut self, _code: i64) {}
/// Receive a header name/value pair. The default implementation ignores it.
fn header(&mut self, _name: &str, _value: &str) {}
/// Receive a row of selected values.
fn selected(&mut self, values: &[Value]);
/// Get the global value of the given kind. The default implementation returns 0.
fn global(&self, _kind: i64) -> i64 {
0
}
/// Get the argument with the given kind and name. The default implementation returns an empty string.
fn arg(&mut self, _kind: i64, _name: &str) -> Rc<String> {
Rc::new(String::new())
}
/// Get attribute `_atx` of file number `_fnum`. The default implementation returns an empty string.
fn file_attr(&mut self, _fnum: i64, _atx: i64) -> Rc<String> {
Rc::new(String::new())
}
/// Get the content of file number `_fnum`. The default implementation returns `nd()`.
fn file_content(&mut self, _fnum: i64) -> Arc<Vec<u8>> {
nd()
}
/// Record an error message; `Database::run` calls this when a batch fails.
fn set_error(&mut self, err: String);
/// Get the error message. The default implementation returns an empty string.
fn get_error(&mut self) -> String {
String::new()
}
/// Store an extension value. The default implementation discards it.
fn set_extension(&mut self, _ext: Box<dyn Any + Send + Sync>) {}
/// Get the extension value. The default implementation returns a boxed `()`.
fn get_extension(&mut self) -> Box<dyn Any + Send + Sync> {
Box::new(())
}
}
/// Minimal `Transaction` used by `Database::new` to run the initialisation SQL:
/// selected rows are discarded and errors are printed to standard output.
struct DummyTransaction {}
impl Transaction for DummyTransaction {
// Selected values are ignored.
fn selected(&mut self, _values: &[Value]) {}
// Errors are only printed, not stored.
fn set_error(&mut self, err: String) {
println!("Error: {}", err);
}
}
/// Checks that a reader sees the database as it was when the reader was created,
/// regardless of updates the writer saves afterwards.
///
/// A writer creates 100 single-row tables, then performs 1000 updates, creating one
/// reader before each update. The readers are then opened in random order and each
/// must observe exactly the updates saved before it was created.
#[test]
pub fn concurrency() {
// Writer database over in-memory storage, initialised with schema `test`.
let file = Box::new(MemFile::default());
let upd = Box::new(MemFile::default());
let stg = Box::new(AtomicFile::new(file, upd));
let mut bmap = BuiltinMap::default();
standard_builtins(&mut bmap);
let bmap = Arc::new(bmap);
let spd = Arc::new(SharedPagedData::new(stg));
let wapd = AccessPagedData::new_writer(spd.clone());
let db = Database::new(wapd, "CREATE SCHEMA test", bmap.clone());
// Create `nt` tables T0..T{nt-1}, each holding one row with N = 0.
let nt = 100;
for i in 0..nt {
let mut tr = GenTransaction::default();
let sql = format!(
"CREATE TABLE test.[T{}](N int) GO INSERT INTO test.[T{}](N) VALUES (0)",
i, i
);
db.run(&sql, &mut tr);
assert!(db.save() > 0);
}
// Reader `i` is created BEFORE update `i` is applied, so it should see exactly the
// first `i` updates. Update `i` increments the row of table `i % nt`.
let mut rapd = Vec::new();
for i in 0..1000 {
rapd.push((i, AccessPagedData::new_reader(spd.clone())));
let mut tr = GenTransaction::default();
let table = i % nt;
let sql = format!("UPDATE test.[T{}] SET N = N + 1 WHERE 1=1", table);
db.run(&sql, &mut tr);
assert!(db.save() > 0);
}
use rand::Rng;
let mut rng = rand::thread_rng();
// Consume the readers in random order, querying a random table through each.
while !rapd.is_empty() {
let r = rng.gen::<usize>() % rapd.len();
let (i, rapd) = rapd.remove(r);
let db = Database::new(rapd, "", bmap.clone());
let mut tr = GenTransaction::default();
let table = rng.gen::<usize>() % nt;
let sql = format!("SELECT N FROM test.[T{}]", table);
db.run(&sql, &mut tr);
// Of the first `i` updates, `table` received one per complete pass over all tables
// (i / nt) plus one more if the current partial pass has already gone past it.
let expect = i / nt + if i % nt > table { 1 } else { 0 };
// The transaction output must be the expected count as decimal text.
assert!(tr.rp.output == format!("{}", expect).as_bytes());
}
}