use crate::workers::{Signal,SignalData,Pointer};
use tokio::sync::{Mutex};
use crate::config::{DiskConfig,DiskMessage,DiskAddMessage,DiskGetMessage,DiskRemoveMessage};
use std::sync::Arc;
use tokio::fs::{File,OpenOptions};
use crate::io::{expand,write_chunk,read_chunk,remove_chunk};
pub async fn init(config:DiskConfig){
let mut config = config;
let mut file_builder = OpenOptions::new();
file_builder.write(true)
.read(true)
.create(true);
let mut file:File;
match file_builder.open(&config.path).await{
Ok(f)=>{
file = f;
},
Err(_)=>{
return;
}
}
loop{
let message:DiskMessage;
match config.receiver.recv_async().await{
Ok(m)=>{
message = m;
},
Err(_)=>{
break;
}
}
match message{
DiskMessage::Expand(value)=>{
handle_expand(&mut config,value,&mut file).await;
},
DiskMessage::Add(value)=>{
handle_add(value,&mut file).await;
},
DiskMessage::Get(value)=>{
handle_get(value,&mut file).await;
},
DiskMessage::Remove(value)=>{
handle_remove(value,&mut file).await;
}
}
}
}
async fn handle_remove(message:DiskRemoveMessage,file:&mut File){
match remove_chunk(file, message.boundry.0, message.boundry.1 - message.boundry.0 + 1).await{
Ok(_)=>{
Signal::ok(message.signal).await;
},
Err(_)=>{
Signal::error(message.signal).await;
}
}
}
async fn handle_get(message:DiskGetMessage,file:&mut File){
let len = message.value_boundry.1 - message.value_boundry.0 + 1;
let mut read_buffer:Vec<u8> = Vec::with_capacity(len);
match read_chunk(file, &mut read_buffer, message.value_boundry.0 as u64, len as u64).await{
Ok(_)=>{
Signal::data(message.signal,SignalData::Value((read_buffer,Pointer{
item_index:message.item_index,
map_index:message.map_index
}))).await;
},
Err(_)=>{
Signal::error(message.signal).await;
}
}
}
async fn handle_add(message:DiskAddMessage,file:&mut File){
match write_chunk(file, message.start_at, message.value).await{
Ok(_)=>{
Signal::ok(message.signal).await;
},
Err(_)=>{
Signal::error(message.signal).await;
}
}
}
async fn handle_expand(config:&mut DiskConfig,message:Arc<Mutex<Signal>>,file:&mut File){
match expand(file,&config.frame_size).await{
Ok(_)=>{
Signal::ok(message).await;
},
Err(_)=>{
Signal::error(message).await;
}
}
}