1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use crate::workers::{Signal,SignalData,Pointer};
// use crate::workers::{debug_error,debug_message};
// use crate::workers::Debugger;
use tokio::sync::{Mutex};
use crate::config::{DiskConfig,DiskMessage,DiskAddMessage,DiskGetMessage,DiskRemoveMessage};
use std::sync::Arc;
use tokio::fs::{File,OpenOptions};
use crate::io::{expand,write_chunk,read_chunk,remove_chunk};
// use std::time::Instant;
// use crate::workers::debug_error;
// const ERROR:bool = true;
// const DEBUG:bool = false;
pub async fn init(config:DiskConfig){
let mut config = config;
let mut file_builder = OpenOptions::new();
file_builder.write(true)
.read(true)
.create(true);
let mut file:File;
match file_builder.open(&config.path).await{
Ok(f)=>{
file = f;
},
Err(_)=>{
// debug_error("failed-open_file-disk.rs",ERROR);
return;
}
}
loop{
let message:DiskMessage;
match config.receiver.recv_async().await{
Ok(m)=>{
message = m;
},
Err(_)=>{
// debug_error("failed-receive_message-disk.rs",ERROR);
break;
}
}
match message{
DiskMessage::Expand(value)=>{
handle_expand(&mut config,value,&mut file).await;
},
DiskMessage::Add(value)=>{
handle_add(value,&mut file).await;
},
DiskMessage::Get(value)=>{
handle_get(value,&mut file).await;
},
DiskMessage::Remove(value)=>{
handle_remove(value,&mut file).await;
}
}
}
}
async fn handle_remove(message:DiskRemoveMessage,file:&mut File){
match remove_chunk(file, message.boundry.0, message.boundry.1 - message.boundry.0 + 1).await{
Ok(_)=>{
Signal::ok(message.signal).await;
},
Err(_)=>{
Signal::error(message.signal).await;
}
}
}
async fn handle_get(message:DiskGetMessage,file:&mut File){
let len = message.value_boundry.1 - message.value_boundry.0 + 1;
let mut read_buffer:Vec<u8> = Vec::with_capacity(len);
match read_chunk(file, &mut read_buffer, message.value_boundry.0 as u64, len as u64).await{
Ok(_)=>{
Signal::data(message.signal,SignalData::Value((read_buffer,Pointer{
item_index:message.item_index,
map_index:message.map_index
}))).await;
},
Err(_)=>{
Signal::error(message.signal).await;
}
}
}
//(u64,Vec<u8>,Arc<Mutex<Signal>>,Arc<Notify>)
async fn handle_add(message:DiskAddMessage,file:&mut File){
//Debuggerupdate(&message.debugger, "disk add message received").await;
// debug_message("disk add message received",DEBUG);
match write_chunk(file, message.start_at, message.value).await{
Ok(_)=>{
// println!("editing signal");
// debug_message("write_complete-add-disk", DEBUG);
Signal::ok(message.signal).await;
// println!("signal marked");
//Debuggerupdate(&message.debugger, "added to disk").await;
// debug_message("added to disk",DEBUG);
},
Err(_)=>{
// debug_error("failed-add-disk",ERROR);
Signal::error(message.signal).await;
//Debuggererror(&message.debugger, "failed-handle_add-disk.rs").await;
// debug_message("failed add to disk",DEBUG);
// debug_error("failed-handle_add-disk.rs",ERROR);
}
}
//Debuggerupdate(&message.debugger, "handle_add notified").await;
// debug_message("handle_add notified",DEBUG);
}
//(Arc<Mutex<Signal>>,Arc<Notify>),file:&mut File
async fn handle_expand(config:&mut DiskConfig,message:Arc<Mutex<Signal>>,file:&mut File){
match expand(file,&config.frame_size).await{
Ok(_)=>{
Signal::ok(message).await;
},
Err(_)=>{
Signal::error(message).await;
// debug_error("failed-handle_expand-disk.rs",ERROR);
}
}
}