use crate::{Point,Value,Location,read_block::read_block};
use random_access_storage::RandomAccess;
use failure::{Error,ensure,bail};
use std::rc::Rc;
use std::cell::RefCell;
use lru::LruCache;
use std::collections::HashMap;
use desert::{FromBytes,ToBytes,CountBytes};
pub trait DataBatch<P,V> where P: Point, V: Value {
fn batch (&mut self, rows: &Vec<&(P,V)>) -> Result<u64,Error>;
}
pub struct DataMerge<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
data_store: Rc<RefCell<DataStore<S,P,V>>>
}
impl<S,P,V> DataMerge<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
pub fn new (data_store: Rc<RefCell<DataStore<S,P,V>>>) -> Self {
Self { data_store }
}
}
impl<S,P,V> DataBatch<P::Range,u64> for DataMerge<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
fn batch (&mut self, rows: &Vec<&(P::Range,u64)>) -> Result<u64,Error> {
if rows.len() == 1 { Ok(rows[0].1)
} else { let mut dstore = self.data_store.try_borrow_mut()?;
let max = dstore.max_data_size;
let mut combined: Vec<(P,V)> = vec![];
for row in rows {
let pvs: Vec<(P,V)> = dstore.list(row.1)?.iter().map(|c| {
(c.0, c.1.clone())
}).collect();
combined.extend(pvs);
}
ensure![combined.len() <= max, "data size limit exceeded in data merge"];
dstore.batch(&combined.iter().collect())
}
}
}
pub struct DataStore<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
store: S,
range: DataRange<S,P>,
list_cache: LruCache<u64,Vec<(P,V,Location)>>,
pub max_data_size: usize
}
impl<S,P,V> DataBatch<P,V> for DataStore<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
fn batch (&mut self, rows: &Vec<&(P,V)>) -> Result<u64,Error> {
ensure![rows.len() <= self.max_data_size,
"data size limit exceeded in data merge"];
let bitfield_len = (rows.len()+7)/8;
let mut len = 6 + bitfield_len;
for row in rows.iter() {
len += row.count_bytes();
}
let mut data = vec![0u8;len];
let mut offset = 0;
offset += (len as u32).write_bytes(&mut data[offset..])?;
offset += (bitfield_len as u16).write_bytes(&mut data[offset..])?;
for (i,_row) in rows.iter().enumerate() {
data[6+i/8] |= 1<<(i%8);
}
offset += bitfield_len;
for row in rows.iter() {
offset += row.write_bytes(&mut data[offset..])?;
}
let store_offset = self.store.len()?;
self.store.write(store_offset, &data)?;
let bbox = match P::bounds(&rows.iter().map(|(p,_)| *p).collect()) {
None => bail!["failed to calculate bounds"],
Some(bbox) => bbox
};
self.range.write(&(store_offset,P::bounds_to_range(bbox),rows.len() as u64))?;
Ok(store_offset)
}
}
impl<S,P,V> DataStore<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
pub fn open (store: S, range_store: S, max_data_size: usize,
bbox_cache_size: usize, list_cache_size: usize) -> Result<Self,Error> {
Ok(Self {
store,
range: DataRange::new(range_store, bbox_cache_size),
list_cache: LruCache::new(list_cache_size),
max_data_size
})
}
pub fn commit (&mut self) -> Result<(),Error> {
self.store.sync_all()?;
Ok(())
}
pub fn query (&mut self, offset: u64, bbox: &P::Bounds)
-> Result<Vec<(P,V,Location)>,Error> {
let rows = self.list(offset)?;
Ok(rows.iter().filter(|row| {
row.0.overlaps(bbox)
}).map(|row| { row.clone() }).collect())
}
pub fn list (&mut self, offset: u64) -> Result<Vec<(P,V,Location)>,Error> {
match self.list_cache.get(&offset) {
Some(rows) => return Ok(rows.to_vec()),
None => {}
}
let buf = self.read(offset)?;
let rows = self.parse(&buf)?.iter().map(|row| {
(row.0,row.1.clone(),(offset+1,row.2))
}).collect();
self.list_cache.put(offset, rows);
Ok(self.list_cache.peek(&offset).unwrap().to_vec())
}
pub fn parse (&self, buf: &Vec<u8>) -> Result<Vec<(P,V,u32)>,Error> {
let mut results = vec![];
let mut offset = 0;
let bitfield_len = u16::from_be_bytes([buf[0],buf[1]]) as usize;
offset += 2;
let bitfield: &[u8] = &buf[offset..offset+bitfield_len];
offset += bitfield_len;
let mut index = 0;
while offset < buf.len() {
if ((bitfield[index/8]>>(index%8))&1) == 1 {
let (size,pv) = <(P,V)>::from_bytes(&buf[offset..])?;
results.push((pv.0,pv.1,index as u32));
offset += size;
} else {
offset += <(P,V)>::count_from_bytes(&buf[offset..])?;
}
index += 1;
}
Ok(results)
}
pub fn read (&mut self, offset: u64) -> Result<Vec<u8>,Error> {
let len = self.store.len()? as u64;
read_block(&mut self.store, offset, len, 1024)
}
pub fn delete (&mut self, locations: &Vec<Location>) -> Result<(),Error> {
let mut by_block: HashMap<u64,Vec<u32>> = HashMap::new();
for (block,index) in locations {
if *block == 0 { continue } match by_block.get_mut(&(*block-1)) {
Some(indexes) => {
indexes.push(*index);
},
None => {
by_block.insert(*block-1, vec![*index]);
},
}
}
for (block,indexes) in by_block.iter() {
let max_i = match indexes.iter().max() {
Some(i) => *i as u64,
None => bail!["indexes is an empty array"],
};
let len = 7 + max_i/8; ensure![len <= self.store.len()?-block,
"index length past the end of the block"];
let mut header = self.store.read(*block, len)?;
let block_size = u32::from_bytes(&header[0..])?.1 as u64;
let bitfield_len = u16::from_bytes(&header[4..])?.1;
ensure![len <= (bitfield_len as u64) + 6,
"read length {} from index {} past expected bitfield length {} \
for block size {} at offset {}",
len, max_i, bitfield_len, block_size, *block
];
ensure![len <= block_size, "data block is too small"];
for index in indexes.iter() {
let i = *index as usize;
header[6+i/8] &= 0xff - (1<<(i%8));
}
self.store.write(block+6, &header[6..])?;
match self.list_cache.get_mut(block) {
Some(rows) => {
rows.retain(|row| !indexes.contains(&((row.2).1)));
},
None => {},
}
}
Ok(())
}
pub fn bytes (&mut self) -> Result<u64,Error> {
Ok(self.store.len()? as u64)
}
pub fn bbox (&mut self, offset: u64)
-> Result<Option<(P::Bounds,u64)>,Error> {
match self.range.cache.get(&offset) {
None => {},
Some(r) => return Ok(Some(*r))
};
let rows = self.list(offset)?;
if rows.is_empty() {
return Ok(None);
}
let bbox = match P::bounds(&rows.iter().map(|(p,_,_)| *p).collect()) {
None => bail!["invalid data at offset {}", offset],
Some(bbox) => bbox
};
let result = (bbox,rows.len() as u64);
self.range.cache.put(offset, result.clone());
Ok(Some(result))
}
}
pub struct DataRange<S,P>
where S: RandomAccess<Error=Error>, P: Point {
pub store: S,
pub cache: LruCache<u64,(P::Bounds,u64)>
}
impl<S,P> DataRange<S,P>
where S: RandomAccess<Error=Error>, P: Point {
pub fn new (store: S, cache_size: usize) -> Self {
Self {
store,
cache: LruCache::new(cache_size)
}
}
pub fn write (&mut self, b: &(u64,P::Range,u64)) -> Result<(),Error> {
let offset = self.store.len()?;
let data = b.to_bytes()?;
self.store.write(offset, &data)
}
pub fn list (&mut self) -> Result<Vec<(u64,P,u64)>,Error> {
let len = self.store.len()?;
let buf = self.store.read(0, len)?;
let mut offset = 0usize;
let mut results: Vec<(u64,P,u64)> = vec![];
while (offset as u64) < len {
let (size, result) = <(u64,P,u64)>::from_bytes(&buf[offset..])?;
results.push(result);
offset += size;
}
Ok(results)
}
}