use std::sync::Arc;
use std::collections::HashMap;
use tokio::io::{Result as TokioResult};
use tokio::task::JoinSet;
use tokio::fs::{create_dir_all, remove_dir_all, rename};
use tokio::sync::{Mutex, RwLock};
use crate::validate;
use crate::path_concat;
use crate::seq::Seq;
use crate::list::List;
use crate::items::{FeedItem, ColItem};
use crate::datatype::Dataunit;
use crate::dataset::{Dataset, get_dataset_size};
/// Handle to a store rooted at a directory on disk.
///
/// Keeps the persisted feed/column lists together with in-memory lookup maps
/// and the opened per-column sequences, each behind its own async lock.
pub struct Conn {
/// Root directory of the store, as passed to `Conn::new`.
path: String,
/// Persisted list of feeds, opened from `feed.list` under `path`.
feed_list: RwLock<List<FeedItem, String>>,
/// Opened feeds: feed name -> feed item.
feed_map: RwLock<HashMap<String, FeedItem>>,
/// Persisted column list of each opened feed: feed name -> `col.list`.
col_list_mapping: RwLock<HashMap<String, List<ColItem, String>>>,
/// Opened columns: feed name -> column name -> column item.
col_map_mapping: RwLock<HashMap<String, HashMap<String, ColItem>>>,
/// Opened sequences: feed name -> column name -> shared, mutex-guarded `Seq`.
seq_mapping: RwLock<HashMap<String, HashMap<String, Arc<Mutex<Seq>>>>>,
}
impl Conn {
/// Opens the store rooted at `path`.
///
/// Creates the root directory when missing, opens the feed list stored in it
/// and then opens every feed that the list contains.
///
/// # Errors
/// Propagates any error from directory creation, from opening or reading the
/// feed list, or from opening one of the listed feeds.
pub async fn new(path: &str) -> TokioResult<Self> {
    create_dir_all(path).await?;
    let list_path = Self::_get_feed_list_path(path);
    let conn = Self {
        path: path.to_string(),
        feed_list: RwLock::new(
            List::<FeedItem, String>::new(list_path).await?
        ),
        feed_map: RwLock::new(HashMap::new()),
        col_list_mapping: RwLock::new(HashMap::new()),
        col_map_mapping: RwLock::new(HashMap::new()),
        seq_mapping: RwLock::new(HashMap::new()),
    };
    // The registries start empty; they are filled by opening each stored feed.
    let stored_feeds = conn.feed_list.write().await.map().await?;
    for (name, item) in stored_feeds.into_iter() {
        conn._feed_open(&name, item).await?;
    }
    Ok(conn)
}
/// Returns an owned copy of the store's root path.
pub fn path(&self) -> String {
    self.path.to_owned()
}
/// Returns a snapshot (clones) of all currently opened feed items.
pub async fn feed_list(&self) -> Vec<FeedItem> {
    let feeds = self.feed_map.read().await;
    feeds.values().cloned().collect()
}
/// Tells whether a feed named `feed_name` is currently opened.
pub async fn feed_exists(&self, feed_name: &str) -> bool {
    let feeds = self.feed_map.read().await;
    feeds.get(feed_name).is_some()
}
/// Creates a new feed called `feed_name`.
///
/// Creates the feed directory under the store root, records the feed in the
/// persisted feed list and opens it.
///
/// # Errors
/// Fails the `AlreadyExists` validation when the feed is already opened;
/// otherwise propagates errors from building the feed item, from the
/// file system, from the feed list or from opening the feed.
pub async fn feed_add(&self, feed_name: &str) -> TokioResult<()> {
    let already_there = self.feed_exists(feed_name).await;
    validate!(!already_there, AlreadyExists, feed_name)?;

    let new_feed = FeedItem::new(feed_name)?;
    let dir = path_concat!(self.path.clone(), feed_name);
    create_dir_all(dir).await?;

    {
        let mut feeds = self.feed_list.write().await;
        feeds.add(&new_feed).await?;
    }
    self._feed_open(feed_name, new_feed).await
}
/// Deletes the feed called `feed_name`.
///
/// The feed is closed first, then removed from the persisted feed list, and
/// finally its directory is deleted recursively.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened; otherwise
/// propagates errors from the feed list or from the file system.
pub async fn feed_remove(&self, feed_name: &str) -> TokioResult<()> {
    let known = self.feed_exists(feed_name).await;
    validate!(known, NotFound, feed_name)?;

    // The returned feed item is not needed once the feed is gone.
    self._feed_close(feed_name).await;
    {
        let key = feed_name.to_string();
        let mut feeds = self.feed_list.write().await;
        feeds.remove(&key).await?;
    }
    let dir = path_concat!(self.path.clone(), feed_name);
    remove_dir_all(dir).await?;
    Ok(())
}
pub async fn feed_rename(&self, name: &str, name_new: &str) ->
TokioResult<()> {
validate!(self.feed_exists(name).await, NotFound, name)?;
validate!(!self.feed_exists(name_new).await, AlreadyExists, name_new)?;
let mut feed_item = self._feed_close(name).await;
let res: TokioResult<()> = {
feed_item.rename(name_new)?;
self.feed_list.write().await
.modify(&name.to_string(), &feed_item).await?;
let feed_path = path_concat!(self.path.clone(), name);
let feed_path_new = path_concat!(self.path.clone(), name_new);
rename(feed_path, feed_path_new).await?;
Ok(())
};
self._feed_open(name_new, feed_item).await?;
res?;
Ok(())
}
/// Returns a snapshot (clones) of all columns of the feed `feed_name`.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened.
pub async fn col_list(&self, feed_name: &str) -> TokioResult<Vec<ColItem>> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let col_maps = self.col_map_mapping.read().await;
    let cols: Vec<ColItem> = col_maps[feed_name].values().cloned().collect();
    Ok(cols)
}
/// Tells whether the feed `feed_name` has an opened column `col_name`.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened.
pub async fn col_exists(&self, feed_name: &str,
col_name: &str) -> TokioResult<bool> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let col_maps = self.col_map_mapping.read().await;
    let found = col_maps[feed_name].contains_key(col_name);
    Ok(found)
}
pub async fn col_rename(&self, feed_name: &str, name: &str,
name_new: &str) -> TokioResult<()> {
validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
validate!(self.col_exists(feed_name, name).await?, NotFound, name)?;
validate!(!self.col_exists(feed_name, name_new).await?,
AlreadyExists, name_new)?;
let mut col_item = self._col_close(feed_name, name).await;
let res: TokioResult<()> = {
col_item.rename(name_new)?;
self.col_list_mapping.write().await.get_mut(feed_name).unwrap()
.modify(&name.to_string(), &col_item).await?;
let seq_path = Self::_get_seq_path(&self.path, feed_name, name);
let seq_path_new = Self::_get_seq_path(&self.path, feed_name,
name_new);
rename(seq_path, seq_path_new.clone()).await?;
Ok(())
};
self._col_open(feed_name, name_new, col_item).await?;
res?;
Ok(())
}
/// Adds column `col_name` of type `datatype` to the feed `feed_name`.
///
/// The column is recorded in the feed's column list, opened, and its
/// sequence is resized to the feed's current size.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is missing and the
/// `AlreadyExists` validation when the column is already present; otherwise
/// propagates errors from building the column item, from the column list,
/// from opening the column or from resizing its sequence.
pub async fn col_add(&self, feed_name: &str, col_name: &str,
datatype: &str) -> TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let taken = self.col_exists(feed_name, col_name).await?;
    validate!(!taken, AlreadyExists, col_name)?;

    let new_col = ColItem::new(col_name, datatype)?;
    {
        let mut col_lists = self.col_list_mapping.write().await;
        col_lists.get_mut(feed_name).unwrap().add(&new_col).await?;
    }
    self._col_open(feed_name, col_name, new_col).await?;

    // Bring the fresh sequence up to the length of the feed.
    let feed_size = self.feed_map.read().await[feed_name].size;
    let seqs = self.seq_mapping.read().await;
    seqs[feed_name][col_name].lock().await.resize(feed_size).await?;
    Ok(())
}
/// Removes column `col_name` from the feed `feed_name`.
///
/// The column is closed, dropped from the feed's column list, and its
/// sequence file is deleted.
///
/// # Errors
/// Fails the `NotFound` validation when the feed or the column is missing;
/// otherwise propagates errors from the column list or the file system.
pub async fn col_remove(&self, feed_name: &str, col_name: &str) ->
TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let present = self.col_exists(feed_name, col_name).await?;
    validate!(present, NotFound, col_name)?;

    // The returned column item is not needed once the column is gone.
    self._col_close(feed_name, col_name).await;
    {
        let key = col_name.to_string();
        let mut col_lists = self.col_list_mapping.write().await;
        col_lists.get_mut(feed_name).unwrap().remove(&key).await?;
    }
    let seq_file = Self::_get_seq_path(&self.path, feed_name, col_name);
    tokio::fs::remove_file(seq_file).await?;
    Ok(())
}
/// Returns the `size` recorded on the feed item of `feed_name`.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened.
pub async fn size_get(&self, feed_name: &str) -> TokioResult<usize> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let feeds = self.feed_map.read().await;
    let current = feeds[feed_name].size;
    Ok(current)
}
/// Resizes every sequence of the feed `feed_name` to `size` and records the
/// new size on the feed item and in the persisted feed list.
///
/// Sequences are resized concurrently, one task per column.
///
/// # Returns
/// The size the feed had before the call.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened. If resizing
/// any sequence fails, that error is returned and the recorded feed size is
/// left unchanged. Errors from updating the feed list are propagated too.
///
/// # Panics
/// `JoinSet::join_all` panics if one of the resize tasks panicked.
pub async fn size_set(&self, feed_name: &str, size: usize) ->
TokioResult<usize> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let mut js = JoinSet::new();
    for seq in self.seq_mapping.read().await[feed_name].values() {
        let seq_clone = Arc::clone(seq);
        js.spawn(async move {
            seq_clone.lock().await.resize(size).await
        });
    }
    // Wait for all tasks, then surface the first failure instead of
    // recording a size that some sequences do not actually have.
    for resized in js.join_all().await {
        resized?;
    }
    let mut feed_map = self.feed_map.write().await;
    let feed_item = feed_map.get_mut(feed_name).unwrap();
    let old_size = feed_item.size;
    feed_item.size = size;
    self.feed_list.write().await
        .modify(&feed_name.to_string(), feed_item).await?;
    Ok(old_size)
}
/// Reads `size` units starting at index `ix` from each column in `cols` of
/// the feed `feed_name`.
///
/// One task per column reads the raw bytes; the bytes are then decoded into
/// `Dataunit`s with the column's datatype.
///
/// # Returns
/// A dataset mapping each requested column name to its decoded series.
///
/// # Errors
/// Fails the `NotFound` validation when the feed or a requested column is
/// missing and the `UnexpectedEof` validation when `ix + size` exceeds the
/// feed size. A failed sequence read is returned as an error (it no longer
/// panics inside the reading task); a panicked or cancelled task is reported
/// through the join error.
pub async fn data_get(&self, feed_name: &str, ix: usize, size: usize,
cols: &[String]) -> TokioResult<Dataset> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    validate!(ix + size <= self.feed_map.read().await[feed_name].size,
        UnexpectedEof, (ix + size).to_string())?;
    let mut js = JoinSet::new();
    for col_name in cols.iter() {
        validate!(self.col_exists(feed_name, col_name).await?,
            NotFound, &**col_name)?;
        let datatype = self.col_map_mapping.read().await
            [feed_name][col_name].datatype.clone();
        let seq = &self.seq_mapping.read().await[feed_name][col_name];
        let seq_clone = Arc::clone(seq);
        let col_name_clone = col_name.clone();
        js.spawn(async move {
            let mut block = vec![0u8; size * datatype.size()];
            let fetched = seq_clone.lock().await.get(ix, &mut block).await;
            // Hand the read error back to the caller instead of panicking.
            fetched.map(|_| (block, datatype, col_name_clone))
        });
    }
    let mut ds = HashMap::new();
    while let Some(res) = js.join_next().await {
        // First `?`: task join failure; second `?`: sequence read failure.
        let (block, datatype, col_name) = res??;
        let series = block.chunks(datatype.size())
            .map(|chunk| datatype.from_bytes(chunk))
            .collect::<Vec<Dataunit>>();
        ds.insert(col_name, series);
    }
    Ok(ds)
}
/// Appends the dataset `ds` to the end of the feed `feed_name`.
///
/// The feed is grown by the dataset size and the data is then written at the
/// former end of the feed. An empty dataset is a no-op.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened; otherwise
/// propagates errors from sizing the dataset, resizing the feed or writing.
pub async fn data_push(&self, feed_name: &str, ds: &Dataset) ->
TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let added = get_dataset_size(ds)?;
    if added == 0 {
        return Ok(());
    }
    // The current size is the index at which the new data starts.
    let start = self.feed_map.read().await[feed_name].size;
    self.size_set(feed_name, start + added).await?;
    self.data_patch(feed_name, start, ds).await
}
/// Writes `ds` at index `ix` across every column of the feed `feed_name`.
///
/// All of the feed's columns are passed to the update, including those that
/// `ds` does not contain.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened; otherwise
/// propagates errors from the update.
pub async fn data_save(&self, feed_name: &str, ix: usize,
ds: &Dataset) -> TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let all_cols: Vec<String> = self.col_map_mapping.read().await[feed_name]
        .keys()
        .cloned()
        .collect();
    self._data_update(feed_name, ix, ds, &all_cols).await
}
/// Writes `ds` at index `ix`, touching only the columns named in `ds`.
///
/// # Errors
/// Fails the `NotFound` validation when the feed is not opened; otherwise
/// propagates errors from the update.
pub async fn data_patch(&self, feed_name: &str, ix: usize,
ds: &Dataset) -> TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    let given_cols: Vec<String> = ds.keys().cloned().collect();
    self._data_update(feed_name, ix, ds, &given_cols).await
}
/// Reads the raw bytes of `size` units starting at index `ix` from column
/// `col_name` of the feed `feed_name`.
///
/// # Returns
/// A buffer of `size * datatype.size()` bytes filled by the sequence.
///
/// # Errors
/// Fails the `NotFound` validation when the feed or column is missing and
/// the `UnexpectedEof` validation when `ix + size` exceeds the feed size;
/// otherwise propagates the sequence read error.
pub async fn raw_get(&self, feed_name: &str, col_name: &str, ix: usize,
size: usize) -> TokioResult<Vec<u8>> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    validate!(self.col_exists(feed_name, col_name).await?,
        NotFound, col_name)?;
    validate!(ix + size <= self.feed_map.read().await[feed_name].size,
        UnexpectedEof, (ix + size).to_string())?;

    // Both read guards stay alive until the read has completed.
    let seqs = self.seq_mapping.read().await;
    let col_maps = self.col_map_mapping.read().await;
    let unit_size = col_maps[feed_name][col_name].datatype.size();
    let mut bytes = vec![0u8; size * unit_size];
    seqs[feed_name][col_name].lock().await.get(ix, &mut bytes).await?;
    Ok(bytes)
}
/// Writes the raw bytes `block` at index `ix` into column `col_name` of the
/// feed `feed_name`.
///
/// The number of units written is `block.len()` divided by the sequence's
/// block size (integer division).
///
/// # Errors
/// Fails the `NotFound` validation when the feed or column is missing and
/// the `UnexpectedEof` validation when the write would end past the feed
/// size; otherwise propagates the sequence update error.
pub async fn raw_set(&self, feed_name: &str, col_name: &str, ix: usize,
block: &[u8]) -> TokioResult<()> {
    validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
    validate!(self.col_exists(feed_name, col_name).await?,
        NotFound, col_name)?;

    let seqs = self.seq_mapping.read().await;
    let mut seq = seqs[feed_name][col_name].lock().await;
    let unit_count = block.len() / seq.block_size();
    let end = ix + unit_count;
    validate!(end <= self.feed_map.read().await[feed_name].size,
        UnexpectedEof, end.to_string())?;
    seq.update(ix, block).await?;
    Ok(())
}
/// Writes `ds` at index `ix` into the columns `cols` of the feed `feed_name`.
///
/// For each name in `cols` that is an opened column, the series from `ds` is
/// encoded to bytes, or a zero-filled block is used when `ds` has no series
/// for that column. Names in `cols` that are not columns of the feed are
/// skipped. Columns are written concurrently, one task per column.
///
/// # Errors
/// Fails the `UnexpectedEof` validation when `ix + size` exceeds the feed
/// size, and propagates errors from sizing the dataset. If writing any
/// column fails, the first such error is returned instead of being dropped.
///
/// # Panics
/// Panics if a unit cannot be encoded with the column's datatype (`unwrap`
/// on `to_bytes`), and `JoinSet::join_all` panics if a write task panicked.
async fn _data_update(&self, feed_name: &str, ix: usize, ds: &Dataset,
cols: &[String]) -> TokioResult<()> {
    let size = get_dataset_size(ds)?;
    validate!(ix + size <= self.feed_map.read().await[feed_name].size,
        UnexpectedEof, (ix + size).to_string())?;
    if size > 0 {
        let mut js = JoinSet::new();
        for col_name in cols.iter() {
            if let Some(col_item) = &self.col_map_mapping.read()
            .await[feed_name].get(col_name) {
                let block = if let Some(series) = ds.get(col_name) {
                    series.iter().map(
                        |unit| col_item.datatype.to_bytes(unit).unwrap()
                    ).collect::<Vec<Vec<u8>>>().concat()
                } else {
                    // No data supplied for this column: write zeroes.
                    vec![0u8; size * col_item.datatype.size()]
                };
                let seq = &self.seq_mapping.read()
                    .await[feed_name][col_name];
                let seq_clone = Arc::clone(seq);
                js.spawn(async move {
                    seq_clone.lock().await.update(ix, &block).await
                });
            }
        }
        // Wait for every write, then report the first failure rather than
        // claiming success for data that was not stored.
        for updated in js.join_all().await {
            updated?;
        }
    }
    Ok(())
}
/// Registers the feed `feed_name` in the in-memory registries.
///
/// Opens the feed's column list, creates empty column and sequence maps for
/// the feed, opens every listed column, and only then inserts the feed item
/// and the column list.
///
/// # Errors
/// Propagates errors from opening or reading the column list and from
/// opening any column.
async fn _feed_open(&self, feed_name: &str, feed_item: FeedItem) ->
TokioResult<()> {
    let list_path = Self::_get_col_list_path(&self.path, feed_name);
    let mut cols = List::<ColItem, String>::new(list_path).await?;
    let stored_cols = cols.map().await?;

    let key = feed_name.to_string();
    // Empty per-feed maps must exist before `_col_open` fills them.
    self.col_map_mapping.write().await.insert(key.clone(), HashMap::new());
    self.seq_mapping.write().await.insert(key.clone(), HashMap::new());
    for (name, item) in stored_cols.into_iter() {
        self._col_open(feed_name, &name, item).await?;
    }

    self.feed_map.write().await.insert(key.clone(), feed_item);
    self.col_list_mapping.write().await.insert(key, cols);
    Ok(())
}
/// Drops the feed `feed_name` from every in-memory registry and returns its
/// feed item.
///
/// # Panics
/// Panics if the feed is not present in the feed map.
async fn _feed_close(&self, feed_name: &str) -> FeedItem {
    {
        let mut seqs = self.seq_mapping.write().await;
        seqs.remove(feed_name);
    }
    {
        let mut col_lists = self.col_list_mapping.write().await;
        col_lists.remove(feed_name);
    }
    {
        let mut col_maps = self.col_map_mapping.write().await;
        col_maps.remove(feed_name);
    }
    let mut feeds = self.feed_map.write().await;
    feeds.remove(feed_name).unwrap()
}
/// Opens the sequence of column `col_name` and registers the column and its
/// sequence under the feed `feed_name`.
///
/// # Errors
/// Propagates the error from opening the sequence.
///
/// # Panics
/// Panics if the feed has no entry in the column or sequence maps.
async fn _col_open(&self, feed_name: &str, col_name: &str,
col_item: ColItem) -> TokioResult<()> {
    let unit_size = col_item.datatype.size();
    let seq_file = Self::_get_seq_path(&self.path, feed_name, col_name);
    let opened = Seq::new(seq_file, unit_size).await?;

    {
        let mut col_maps = self.col_map_mapping.write().await;
        col_maps.get_mut(feed_name).unwrap()
            .insert(col_name.to_string(), col_item);
    }
    let shared = Arc::new(Mutex::new(opened));
    let mut seqs = self.seq_mapping.write().await;
    seqs.get_mut(feed_name).unwrap().insert(col_name.to_string(), shared);
    Ok(())
}
/// Drops column `col_name` of the feed `feed_name` from the in-memory
/// registries and returns its column item.
///
/// # Panics
/// Panics if the feed has no entry in the sequence or column maps, or if the
/// column is not present in the feed's column map.
async fn _col_close(&self, feed_name: &str, col_name: &str) -> ColItem {
    {
        let mut seqs = self.seq_mapping.write().await;
        seqs.get_mut(feed_name).unwrap().remove(col_name);
    }
    let mut col_maps = self.col_map_mapping.write().await;
    let feed_cols = col_maps.get_mut(feed_name).unwrap();
    feed_cols.remove(col_name).unwrap()
}
/// Builds the location of the feed list file (`feed.list`) under `path`.
fn _get_feed_list_path(path: &str) -> String {
    let list_path = path_concat!(path, "feed.list");
    list_path
}
/// Builds the location of the column list file (`col.list`) of the feed
/// `feed_name` under `path`.
fn _get_col_list_path(path: &str, feed_name: &str) -> String {
    let list_path = path_concat!(path, feed_name, "col.list");
    list_path
}
/// Builds the location of the sequence file (`<col_name>.col`) of a column
/// of the feed `feed_name` under `path`.
fn _get_seq_path(path: &str, feed_name: &str, col_name: &str) -> String {
    let seq_path = path_concat!(path, feed_name, format!("{}.col", col_name));
    seq_path
}
}