use crate::io;
use crate::Config;
use std::fs::Metadata;
use gzb_binary_69::{Reader};
use tokio::fs::File;
use tokio::spawn as TokioSpawn;
use flume::Sender as FlumeSender;
use crate::workers::{u64_from_bytes};
use crate::workers::{Signal,Pointer};
use std::sync::Arc;
use crate::config::{
MapConfig,MapMessage,
LocatorMessage,LocatorNext,LocatorAdd,LocatorRemove,LocatorReset
};
use flume::unbounded;
use std::collections::HashMap;
use futures::future::join_all;
use crate::response::QueResponse;
#[derive(Debug,Clone)]
pub struct Que{
locator_sender:Arc<FlumeSender<LocatorMessage>>
}
const MB_1:u64 = 1_000_000;
impl Que{
pub async fn new(c:Config)->Result<Que,&'static str>{
let (locator_sender,locator_receiver) = unbounded();
let mut locator:HashMap<u64,u8> = HashMap::new();
let mut all_items = Vec::new();
let mut map_index:u8 = 0;
let mut collect_map_initiaters = Vec::new();
for path in c.files.iter(){
map_index += 1;
collect_map_initiaters.push(init_map(
map_index,
path.clone(),
locator_sender.clone(),
c.num_of_writers.clone(),
c.min_que_size.clone(),
c.expand_size.clone()
));
}
let mut map_senders = HashMap::new();
for result in join_all(collect_map_initiaters).await{
match result{
Ok((index,mut items,map_sender))=>{
for item in items.iter(){
locator.insert(item.clone(),index);
}
all_items.append(&mut items);
map_senders.insert(index,map_sender);
},
Err(_e)=>{
return Err(_e);
}
}
}
all_items.sort();
TokioSpawn(async move{
crate::locator::init(
&mut map_senders,
&mut all_items,
&mut locator,
locator_receiver,
map_index
).await;
});
return Ok(Que{
locator_sender:Arc::new(locator_sender)
});
}
pub async fn add(&mut self,value:Vec<u8>)->Result<QueResponse,()>{
let (signal,sleeper) = Signal::new();
match self.locator_sender.send_async(
LocatorMessage::Add(LocatorAdd{
value:value,
signal:signal.clone(),
})
).await{
Ok(_)=>{
return Ok(QueResponse::new(signal,sleeper));
},
Err(_)=>{
return Err(());
}
}
}
pub async fn next(&mut self)->Result<QueResponse,()>{
let (signal,sleeper) = Signal::new();
match self.locator_sender.send_async(
LocatorMessage::Next(LocatorNext{
signal:signal.clone(),
})
).await{
Ok(_)=>{
return Ok(QueResponse::new(signal,sleeper));
},
Err(_)=>{
return Err(());
}
}
}
pub async fn remove(&mut self,pointer:Pointer)->Result<QueResponse,()>{
let (signal,sleeper) = Signal::new();
match self.locator_sender.send_async(
LocatorMessage::Remove(LocatorRemove{
pointer:pointer,
signal:signal.clone(),
})
).await{
Ok(_)=>{
return Ok(QueResponse::new(signal,sleeper));
},
Err(_)=>{
return Err(());
}
}
}
pub async fn reset(&mut self,pointer:Pointer)->Result<QueResponse,()>{
let (signal,sleeper) = Signal::new();
match self.locator_sender.send_async(
LocatorMessage::Reset(LocatorReset{
pointer:pointer,
signal:signal.clone(),
})
).await{
Ok(_)=>{
return Ok(QueResponse::new(signal,sleeper));
},
Err(_)=>{
return Err(());
}
}
}
}
async fn init_map(
index:u8,
path:String,
locator_sender:FlumeSender<LocatorMessage>,
num_of_writers:u8,
min_que_size:u64,
expand_size:u64
)->Result<(u8,Vec<u64>,FlumeSender<MapMessage>),&'static str>{
let metadata:Metadata;
let mut file:File;
match io::init_map(path.clone(),min_que_size).await{
Ok(v)=>{
file = v.0;
metadata = v.1;
},
Err(_e)=>{
println!("!!! failed-init-map-que : {:?}",_e);
return Err("failed-init-map");
}
}
let reader:Reader;
let items:Vec<u64>;
match build_map(metadata,&mut file).await{
Ok(r)=>{
reader = r.0;
items = r.1;
},
Err(_e)=>{
println!("!!! failed-build-map : {:?}",_e);
return Err("failed-build-map");
}
}
let (build_map_config,map_sender) = MapConfig::new(
index.clone(),
path,
reader,
locator_sender,
num_of_writers,
min_que_size,
expand_size
);
TokioSpawn(async move{
crate::map::init(build_map_config).await;
});
return Ok((index,items,map_sender));
}
async fn build_map(metadata:Metadata,file:&mut File)->Result<(Reader,Vec<u64>),&'static str>{
let mut len:u64 = metadata.len(); let chunk_size:u64;
if len > (MB_1 * 100){
chunk_size = 100 * MB_1; } else if len > (MB_1 * 50){
chunk_size = 50 * MB_1; } else if len > (MB_1 * 25){
chunk_size = 25 * MB_1; } else {
chunk_size = len; }
let mut reader:Reader = Reader::with_capacity(1000000, 1000000);
let mut index = 0;
let mut read_buffer:Vec<u8> = Vec::with_capacity(chunk_size as usize);
loop{
if len == 0{
break;
}
let start_at = index*chunk_size;
let read_len:u64;
if len < chunk_size{read_len = len;} else {read_len = chunk_size;}
match io::read_chunk(file, &mut read_buffer, start_at, read_len).await{
Ok(_)=>{
reader.map(&mut read_buffer);
},
Err(_)=>{
return Err("failed-read_chunk");
}
}
if len > chunk_size{
len -= chunk_size;
} else {
len = 0;
}
index += 1;
}
match reader.end(){
Ok(_)=>{},
Err(_)=>{}
}
let mut collect:Vec<u64> = Vec::with_capacity(reader.pointers.len());
for key in reader.pointers.keys(){
match u64_from_bytes(&key){
Ok(num)=>{
collect.push(num);
},
Err(_)=>{}
}
}
collect.sort();
return Ok((reader,collect));
}