use random_access_storage::RandomAccess;
use failure::{Error,format_err,bail};
use std::cell::RefCell;
use std::rc::Rc;
use std::mem::size_of;
use crate::{Point,Value,Location};
use crate::branch::{Branch,Node};
use crate::data::{DataStore,DataMerge,DataBatch};
use crate::read_block::read_block;
/// Iterator state for a bounding-box query against a `Tree`.
///
/// Yields `(P, V, Location)` rows wrapped in `Result`; see the `Iterator`
/// implementation for the traversal itself.
pub struct TreeIterator<'b,S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
/// Shared handle to the tree being queried; borrowed on demand during iteration.
tree: Rc<RefCell<Tree<S,P,V>>>,
/// Query bounds, handed to `P::query_branch` and to the data store's `query`.
bbox: &'b P::Bounds,
/// Stack of `(store offset, depth)` pairs for branch blocks still to be read.
/// Seeded with `(0,0)` by `new`.
cursors: Vec<(u64,usize)>,
/// Stack of data-store offsets still to be queried.
blocks: Vec<u64>,
/// Rows already fetched from the data store and waiting to be yielded.
queue: Vec<(P,V,Location)>,
/// Length of the tree's store, captured once in `new`.
tree_size: u64
}
impl<'b,S,P,V> TreeIterator<'b,S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
    /// Create an iterator over `tree` restricted to `bbox`.
    ///
    /// The store length is read once here and kept as `tree_size`. Traversal
    /// starts from a single cursor at offset 0, depth 0.
    ///
    /// # Errors
    ///
    /// Fails if `tree` is currently mutably borrowed or if the store cannot
    /// report its length.
    pub fn new (tree: Rc<RefCell<Tree<S,P,V>>>, bbox: &'b P::Bounds)
    -> Result<Self,Error> {
        // Keep the borrow guard in its own scope so it is released before
        // `tree` is moved into the iterator.
        let tree_size = {
            let guard = tree.try_borrow()?;
            let len = guard.store.len()?;
            len as u64
        };
        let root_cursor = (0,0);
        Ok(Self {
            tree,
            bbox,
            cursors: vec![root_cursor],
            blocks: Vec::new(),
            queue: Vec::new(),
            tree_size,
        })
    }
}
/// Unwrap a `Result` inside a function that returns `Option<Result<_, Error>>`.
///
/// Evaluates to the `Ok` payload. On `Err`, returns early from the enclosing
/// function with `Some(Err(..))`, converting the error through `Error::from`.
/// `Error` is resolved at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! iwrap {
    ($x:expr) => {
        match $x {
            Ok(value) => value,
            Err(err) => return Some(Err(Error::from(err))),
        }
    };
}
impl<'b,S,P,V> Iterator for TreeIterator<'b,S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
    type Item = Result<(P,V,Location),Error>;

    /// Produce the next matching row, or `None` once all work is exhausted.
    ///
    /// Work is drained in priority order: rows already queued, then pending
    /// data-store offsets, then pending branch cursors. Any failure (a
    /// conflicting `RefCell` borrow, a read error, a query error) is yielded
    /// as `Some(Err(..))`.
    fn next (&mut self) -> Option<Self::Item> {
        let bf = iwrap![self.tree.try_borrow()].branch_factor;
        loop {
            // 1. Rows fetched earlier are handed out first.
            if let Some(row) = self.queue.pop() {
                return Some(Ok(row));
            }
            // 2. Otherwise resolve one pending data block into rows.
            if let Some(offset) = self.blocks.pop() {
                let tree = iwrap![self.tree.try_borrow()];
                let mut dstore = iwrap![tree.data_store.try_borrow_mut()];
                self.queue.extend(iwrap![dstore.query(offset, self.bbox)]);
                continue;
            }
            // 3. Otherwise descend into one pending branch block; with all
            //    three stacks empty the traversal is complete.
            let (cursor,depth) = match self.cursors.pop() {
                Some(pending) => pending,
                None => return None,
            };
            // Cursors at or past the store length captured in `new` are skipped.
            if cursor >= self.tree_size { continue }
            let buf = {
                let mut tree = iwrap![self.tree.try_borrow_mut()];
                let block = iwrap![
                    read_block(&mut tree.store, cursor, self.tree_size, 1024)
                ];
                block
            };
            let (cursors,blocks) = iwrap![
                P::query_branch(&buf, &self.bbox, bf, depth)
            ];
            self.blocks.extend(blocks);
            self.cursors.extend(cursors);
        }
    }
}
/// Construction parameters consumed by `Tree::open`.
pub struct TreeOpts<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
/// Storage backing the tree; moved into `Tree::store`.
pub store: S,
/// Shared data store; `Tree::open` also wraps a clone of this handle in a `DataMerge`.
pub data_store: Rc<RefCell<DataStore<S,P,V>>>,
/// Copied into the tree's `branch_factor`.
pub branch_factor: usize,
/// Copied into the tree's `max_data_size`.
pub max_data_size: usize,
/// Copied into `Tree::index`.
pub index: usize,
}
/// A tree of branch blocks persisted in a `RandomAccess` store, referring to
/// records held in a shared `DataStore`.
pub struct Tree<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
/// Storage that branch blocks are written to and read back from.
pub store: S,
/// Shared data store: target of `build`, read by queries and by `unbuild`.
data_store: Rc<RefCell<DataStore<S,P,V>>>,
/// `DataMerge` wrapper around `data_store`, used by `build_from_blocks`.
data_merge: Rc<RefCell<DataMerge<S,P,V>>>,
/// Forwarded to `Branch::new` and `P::query_branch`; also determines the
/// block layout that `unbuild` decodes.
branch_factor: usize,
/// Next free address in `store`: initialised from the store length in `open`,
/// advanced by `alloc`, reset to 0 by `clear`.
pub bytes: u64,
/// Forwarded to `Branch::new` when building.
/// NOTE(review): presumably this tree's position among the caller's trees — confirm.
pub index: usize,
/// Forwarded to `Branch::new`; also the chunk length `merge` uses when
/// batching rows into the data store.
max_data_size: usize,
}
impl<S,P,V> Tree<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
/// Build a `Tree` from `opts`.
///
/// `bytes` starts at the current length of the store, and a `DataMerge` is
/// created over a clone of the shared data-store handle.
///
/// # Errors
///
/// Fails if the store cannot report its length.
pub fn open (opts: TreeOpts<S,P,V>) -> Result<Self,Error> {
    let bytes = opts.store.len()? as u64;
    let merge_source = Rc::clone(&opts.data_store);
    let data_merge = Rc::new(RefCell::new(DataMerge::new(merge_source)));
    Ok(Self {
        store: opts.store,
        data_store: opts.data_store,
        data_merge,
        branch_factor: opts.branch_factor,
        bytes,
        index: opts.index,
        max_data_size: opts.max_data_size,
    })
}
/// Discard all branch blocks and reset the allocation counter.
///
/// The store is truncated only when `bytes` is non-zero; it is synced in
/// either case.
///
/// # Errors
///
/// Propagates failures from `truncate` and `sync_all`. If `truncate` fails,
/// `bytes` is left unchanged so that it still describes what is in the store
/// and a later `clear` will retry the truncation.
pub fn clear (&mut self) -> Result<(),Error> {
    if self.bytes > 0 {
        // Truncate first: resetting the counter before a fallible truncate
        // would leave `bytes == 0` over a non-empty store on failure.
        self.store.truncate(0)?;
        self.bytes = 0;
    }
    self.store.sync_all()?;
    Ok(())
}
/// Report whether the underlying store is empty, as answered by the store itself.
///
/// # Errors
///
/// Propagates any error from the store's `is_empty`.
pub fn is_empty (&mut self) -> Result<bool,Error> {
    self.store.is_empty()
}
/// Rebuild the tree from `rows`, writing records through the tree's data store.
///
/// Each row is cloned and paired with a count of 1 before being handed to
/// `builder`.
///
/// # Errors
///
/// Propagates any error from `builder`.
pub fn build (&mut self, rows: &Vec<(P,V)>) -> Result<(),Error> {
    let weighted: Vec<((P,V),u64)> = rows.iter()
        .cloned()
        .map(|row| (row,1u64))
        .collect();
    let dstore = Rc::clone(&self.data_store);
    self.builder(Rc::new(weighted), dstore)
}
/// Rebuild the tree from existing data blocks instead of individual rows.
///
/// Each `(bounds, offset, len)` block becomes one builder row: the point is
/// `P::bounds_to_range(bounds)`, the value is the block's `offset`, and the
/// row count is `len`. Rows are routed through the tree's `DataMerge`.
///
/// # Errors
///
/// Propagates any error from `builder`.
pub fn build_from_blocks (&mut self, blocks: Vec<(P::Bounds,u64,u64)>)
-> Result<(),Error> {
    let rows: Vec<((P::Range,u64),u64)> = blocks.iter()
        .map(|(bbox,offset,len)| ((P::bounds_to_range(*bbox),*offset),*len))
        .collect();
    let dmerge = Rc::clone(&self.data_merge);
    self.builder(Rc::new(rows), dmerge)
}
/// Clear the tree and rebuild it level by level from `rows`.
///
/// A root `Branch` covering every row index is created and given its address
/// through `alloc`. Each level's branches are then built, written to the
/// store at their own `offset`, and the child nodes they return form the next
/// level. `bytes` is raised to cover every block written, and the store is
/// synced at the end.
///
/// # Errors
///
/// Propagates errors from `clear`, `Branch::new`, `Branch::build`, and from
/// the store's `write` and `sync_all`.
pub fn builder<D,T,U> (&mut self, rows: Rc<Vec<((T,U),u64)>>,
data_store: Rc<RefCell<D>>) -> Result<(),Error>
where D: DataBatch<T,U>, T: Point, U: Value {
    self.clear()?;
    let bucket = (0..rows.len()).collect();
    let mut root = Branch::<D,T,U>::new(
        0,
        self.index,
        self.max_data_size,
        self.branch_factor,
        Rc::clone(&data_store),
        bucket, rows
    )?;
    {
        // Reserve the root's address before it joins the work list.
        let alloc = &mut {|bytes| self.alloc(bytes) };
        root.alloc(alloc);
    }
    let mut level = vec![Node::Branch(root)];
    while !level.is_empty() {
        let mut next_level = vec![];
        for mut node in level {
            // Only branch nodes have a block to write; empty and data nodes
            // need no further work here.
            if let Node::Branch(ref mut b) = node {
                let (data,children) = {
                    let alloc = &mut {|bytes| self.alloc(bytes) };
                    b.build(alloc)?
                };
                self.store.write(b.offset, &data)?;
                let block_end = b.offset + (data.len() as u64);
                self.bytes = self.bytes.max(block_end);
                next_level.extend(children);
            }
        }
        level = next_level;
    }
    self.store.sync_all()?;
    Ok(())
}
/// Start a bounding-box query over `tree`, returning a `TreeIterator`.
///
/// The `'a` lifetime parameter is unused and is kept only so the signature
/// stays unchanged.
///
/// # Errors
///
/// Propagates any error from `TreeIterator::new`.
pub fn query<'a,'b> (tree: Rc<RefCell<Self>>, bbox: &'b P::Bounds)
-> Result<TreeIterator<'b,S,P,V>,Error> {
    TreeIterator::<'b,S,P,V>::new(tree, bbox)
}
/// Reserve `bytes` bytes at the end of the allocated region.
///
/// Returns the address where the reservation starts (the previous value of
/// `self.bytes`) and advances `self.bytes` past it.
fn alloc (&mut self, bytes: usize) -> u64 {
    let next = self.bytes + bytes as u64;
    std::mem::replace(&mut self.bytes, next)
}
/// Merge the `src` trees and the loose `rows` into the tree at `dst`.
///
/// Steps:
/// 1. Every source tree is unbuilt into its `(bounds, offset, len)` blocks.
/// 2. `rows` is cut into chunks of at most `max_data_size` rows (taken from
///    the destination tree). Each chunk is written to the destination's data
///    store as one batch and recorded as a further block.
/// 3. The destination tree is rebuilt from all collected blocks.
/// 4. Every source tree is cleared.
///
/// # Errors
///
/// Fails on a conflicting `RefCell` borrow, on any storage or data-store
/// error, or if a chunk yields no bounds ("invalid data at offset ...").
pub fn merge (trees: &mut Vec<Rc<RefCell<Self>>>, dst: usize, src: Vec<usize>,
rows: &Vec<(P,V)>) -> Result<(),Error> {
    let mut blocks = vec![];
    for &i in &src {
        let unbuilt = trees[i].try_borrow_mut()?.unbuild()?;
        blocks.extend(unbuilt);
    }
    {
        let tree = trees[dst].try_borrow()?;
        let mut dstore = tree.data_store.try_borrow_mut()?;
        let m = tree.max_data_size;
        let total = rows.len();
        let mut srow_len = 0;
        // Ceiling division: number of chunks of at most `m` rows.
        for i in 0..(total+m-1)/m {
            let start = i*m;
            let end = total.min(start+m);
            let srows = &rows[start..end];
            srow_len += srows.len();
            let inserts: Vec<(P,V)> = srows.iter()
                .map(|(p,v)| (*p,v.clone()))
                .collect();
            let offset = dstore.batch(&inserts.iter().collect())?;
            match P::bounds(&inserts.iter().map(|(p,_)| *p).collect()) {
                Some(bbox) => blocks.push((bbox,offset,inserts.len() as u64)),
                None => bail!["invalid data at offset {}", offset],
            }
        }
        ensure_eq!(srow_len, rows.len(), "divided rows incorrectly");
    }
    trees[dst].try_borrow_mut()?.build_from_blocks(blocks)?;
    for &i in &src {
        trees[i].try_borrow_mut()?.clear()?;
    }
    Ok(())
}
/// Walk every branch block of the tree and list the data blocks it refers to.
///
/// Returns one `(bounds, offset, len)` entry per referenced data block for
/// which the data store reports a bounding box; offsets with no bounding box
/// are skipped. An empty store yields an empty list without reading anything.
///
/// Block layout, as decoded here with `bf = branch_factor` and `n = 2*bf - 3`:
/// * `n` variable-length entries, each sized by `P::count_bytes_at`;
/// * a bitfield of `(n + bf + 7) / 8` bytes, one bit per pointer, where a set
///   bit marks a data-block pointer and a clear bit a child-branch pointer;
/// * `n + bf` big-endian `u64` pointers, each stored as `offset + 1`, with 0
///   meaning "no pointer".
///
/// # Errors
///
/// Propagates storage, decoding and data-store errors, fails with
/// "unexpected block length" when a block does not match the layout above,
/// and fails if the data store is already borrowed.
fn unbuild (&mut self) -> Result<Vec<(P::Bounds,u64,u64)>,Error> {
    let tree_size = self.store.len()? as u64;
    // An empty store has no root block; reading offset 0 would be out of range.
    if tree_size == 0 {
        return Ok(vec![]);
    }
    let bf = self.branch_factor;
    let n = bf*2-3;
    let pointer_size = size_of::<u64>();
    let mut offsets: Vec<u64> = vec![];
    let mut cursors: Vec<(u64,usize)> = vec![(0,0)];
    while let Some((c,depth)) = cursors.pop() {
        let buf = read_block(&mut self.store, c, tree_size, 1024)?;
        // Skip the n variable-length entries to find the bitfield.
        let mut d_start = 0;
        for _ in 0..n {
            d_start += P::count_bytes_at(&buf[d_start..], depth)?;
        }
        let i_start = d_start + (n+bf+7)/8;
        let b_start = i_start + n*pointer_size;
        let b_end = b_start + bf*pointer_size;
        ensure_eq!(b_end, buf.len(), "unexpected block length");
        // The n-entry and bf-entry pointer tables are contiguous
        // (b_start == i_start + n*8) and share one bitfield, so a single pass
        // over n+bf slots visits them in the same order as two separate loops.
        for j in 0..n+bf {
            let pos = i_start + j*pointer_size;
            let mut word = [0u8; 8];
            word.copy_from_slice(&buf[pos..pos+pointer_size]);
            let pointer = u64::from_be_bytes(word);
            if pointer == 0 {
                continue; // empty slot
            }
            let is_data = ((buf[d_start+j/8]>>(j%8))&1) == 1;
            if is_data {
                offsets.push(pointer-1);
            } else {
                cursors.push((pointer-1,depth+1));
            }
        }
    }
    let mut blocks = Vec::with_capacity(offsets.len());
    let mut dstore = self.data_store.try_borrow_mut()?;
    for offset in offsets {
        if let Some((bbox,len)) = dstore.bbox(offset)? {
            blocks.push((bbox,offset,len));
        }
    }
    Ok(blocks)
}
}