1use crate::io;
2use crate::Config;
3use std::fs::Metadata;
6use gzb_binary_69::{Reader};
7use tokio::fs::File;
8use tokio::spawn as TokioSpawn;
9use flume::Sender as FlumeSender;
10use crate::workers::{u64_from_bytes};
12use crate::workers::{Signal,Pointer};
13use std::sync::Arc;
15use crate::config::{
16 MapConfig,MapMessage,
17 LocatorMessage,LocatorNext,LocatorAdd,LocatorRemove,LocatorReset
19 };
21use flume::unbounded;
22use std::collections::HashMap;
23
24use futures::future::join_all;
25use crate::response::QueResponse;
26
27#[derive(Debug,Clone)]
30pub struct Que{
31 locator_sender:Arc<FlumeSender<LocatorMessage>>
32}
33
/// One megabyte expressed in bytes, using the decimal definition (10^6).
/// `build_map` uses multiples of this value as read-chunk thresholds.
const MB_1:u64 = 1_000_000;
35
36impl Que{
37 pub async fn new(c:Config)->Result<Que,&'static str>{
38
39 let (locator_sender,locator_receiver) = unbounded();
42 let mut locator:HashMap<u64,u8> = HashMap::new();
43 let mut all_items = Vec::new();
44
45 let mut map_index:u8 = 0;
46 let mut collect_map_initiaters = Vec::new();
47 for path in c.files.iter(){
48 map_index += 1;
49 collect_map_initiaters.push(init_map(
50 map_index,
51 path.clone(),
52 locator_sender.clone(),
53 c.num_of_writers.clone(),
54 c.min_que_size.clone(),
55 c.expand_size.clone()
56 ));
57 }
58
59 let mut map_senders = HashMap::new();
60 for result in join_all(collect_map_initiaters).await{
61 match result{
62 Ok((index,mut items,map_sender))=>{
63 for item in items.iter(){
64 locator.insert(item.clone(),index);
65 }
66 all_items.append(&mut items);
67 map_senders.insert(index,map_sender);
68 },
69 Err(_e)=>{
70 return Err(_e);
71 }
72 }
73 }
74
75 all_items.sort();
76 TokioSpawn(async move{
78 crate::locator::init(
79 &mut map_senders,
80 &mut all_items,
81 &mut locator,
82 locator_receiver,
83 map_index
84 ).await;
85 });
86
87 return Ok(Que{
88 locator_sender:Arc::new(locator_sender)
89 });
90
91 }
92 pub async fn add(&mut self,value:Vec<u8>)->Result<QueResponse,()>{
93
94 let (signal,sleeper) = Signal::new();
95 match self.locator_sender.send_async(
96 LocatorMessage::Add(LocatorAdd{
97 value:value,
98 signal:signal.clone(),
99 })
100 ).await{
101 Ok(_)=>{
102 return Ok(QueResponse::new(signal,sleeper));
103 },
104 Err(_)=>{
105 return Err(());
106 }
107 }
108
109 }
110 pub async fn next(&mut self)->Result<QueResponse,()>{
111
112 let (signal,sleeper) = Signal::new();
113 match self.locator_sender.send_async(
114 LocatorMessage::Next(LocatorNext{
115 signal:signal.clone(),
116 })
117 ).await{
118 Ok(_)=>{
119 return Ok(QueResponse::new(signal,sleeper));
120 },
121 Err(_)=>{
122 return Err(());
123 }
124 }
125
126 }
127 pub async fn remove(&mut self,pointer:Pointer)->Result<QueResponse,()>{
128
129 let (signal,sleeper) = Signal::new();
130 match self.locator_sender.send_async(
131 LocatorMessage::Remove(LocatorRemove{
132 pointer:pointer,
133 signal:signal.clone(),
134 })
135 ).await{
136 Ok(_)=>{
137 return Ok(QueResponse::new(signal,sleeper));
138 },
139 Err(_)=>{
140 return Err(());
141 }
142 }
143
144 }
145 pub async fn reset(&mut self,pointer:Pointer)->Result<QueResponse,()>{
146
147 let (signal,sleeper) = Signal::new();
148 match self.locator_sender.send_async(
149 LocatorMessage::Reset(LocatorReset{
150 pointer:pointer,
151 signal:signal.clone(),
152 })
153 ).await{
154 Ok(_)=>{
155 return Ok(QueResponse::new(signal,sleeper));
156 },
157 Err(_)=>{
158 return Err(());
159 }
160 }
161
162 }
163}
164
165async fn init_map(
166 index:u8,
167 path:String,
168 locator_sender:FlumeSender<LocatorMessage>,
169 num_of_writers:u8,
170 min_que_size:u64,
171 expand_size:u64
172)->Result<(u8,Vec<u64>,FlumeSender<MapMessage>),&'static str>{
173
174 let metadata:Metadata;
176 let mut file:File;
177 match io::init_map(path.clone(),min_que_size).await{
178 Ok(v)=>{
179 file = v.0;
180 metadata = v.1;
181 },
182 Err(_e)=>{
183 println!("!!! failed-init-map-que : {:?}",_e);
185 return Err("failed-init-map");
186 }
187 }
188
189 let reader:Reader;
190 let items:Vec<u64>;
191 match build_map(metadata,&mut file).await{
192 Ok(r)=>{
193 reader = r.0;
194 items = r.1;
195 },
197 Err(_e)=>{
198 println!("!!! failed-build-map : {:?}",_e);
200 return Err("failed-build-map");
201 }
202 }
203
204 let (build_map_config,map_sender) = MapConfig::new(
207 index.clone(),
208 path,
209 reader,
210 locator_sender,
211 num_of_writers,
212 min_que_size,
213 expand_size
214 );
215
216 TokioSpawn(async move{
217 crate::map::init(build_map_config).await;
218 });
219
220 return Ok((index,items,map_sender));
223
224}
225
226async fn build_map(metadata:Metadata,file:&mut File)->Result<(Reader,Vec<u64>),&'static str>{
227
228 let mut len:u64 = metadata.len();let chunk_size:u64; if len > (MB_1 * 100){
236 chunk_size = 100 * MB_1;} else if len > (MB_1 * 50){
238 chunk_size = 50 * MB_1;} else if len > (MB_1 * 25){
240 chunk_size = 25 * MB_1;} else {
242 chunk_size = len;}
244
245 let mut reader:Reader = Reader::with_capacity(1000000, 1000000);
249
250
251 let mut index = 0;
253 let mut read_buffer:Vec<u8> = Vec::with_capacity(chunk_size as usize);
254
255 loop{
256 if len == 0{
257 break;
258 }
259 let start_at = index*chunk_size;
260 let read_len:u64;
261 if len < chunk_size{read_len = len;} else {read_len = chunk_size;}
262 match io::read_chunk(file, &mut read_buffer, start_at, read_len).await{
263 Ok(_)=>{
264 reader.map(&mut read_buffer);
266 },
268 Err(_)=>{
269 return Err("failed-read_chunk");
270 }
271 }
272 if len > chunk_size{
273 len -= chunk_size;
274 } else {
275 len = 0;
276 }
277 index += 1;
278 }
279
280 match reader.end(){
281 Ok(_)=>{},
282 Err(_)=>{}
283 }
284
285 let mut collect:Vec<u64> = Vec::with_capacity(reader.pointers.len());
286 for key in reader.pointers.keys(){
287 match u64_from_bytes(&key){
288 Ok(num)=>{
289 collect.push(num);
290 },
291 Err(_)=>{}
292 }
293 }
294
295 collect.sort();
296 return Ok((reader,collect));
297
298}