rustque/
que.rs

1use crate::io;
2use crate::Config;
3// use crate::map::init as MapSpawnInit;
4// use crate::disk::init as DiskSpawnInit;
5use std::fs::Metadata;
6use gzb_binary_69::{Reader};
7use tokio::fs::File;
8use tokio::spawn as TokioSpawn;
9use flume::Sender as FlumeSender;
10// use flume::Receiver as FlumeReceiver;
11use crate::workers::{u64_from_bytes};
12use crate::workers::{Signal,Pointer};
13// use tokio::sync::{Notify,Mutex};
14use std::sync::Arc;
15use crate::config::{
16    MapConfig,MapMessage,
17    // MapAddMessage,MapGetMessage,MapRemoveMessage,
18    LocatorMessage,LocatorNext,LocatorAdd,LocatorRemove,LocatorReset
19    // DiskConfig,
20};
21use flume::unbounded;
22use std::collections::HashMap;
23
24use futures::future::join_all;
25use crate::response::QueResponse;
26
27// use tokio::runtime::Builder as TokioRuntimeBuilder;
28
29#[derive(Debug,Clone)]
30pub struct Que{
31    locator_sender:Arc<FlumeSender<LocatorMessage>>
32}
33
/// One megabyte expressed in bytes (decimal: 10^6, not 2^20).
const MB_1: u64 = 1_000_000;
35
36impl Que{
37    pub async fn new(c:Config)->Result<Que,&'static str>{
38
39        // println!("que config : {:?}",c);
40
41        let (locator_sender,locator_receiver) = unbounded();
42        let mut locator:HashMap<u64,u8> = HashMap::new();
43        let mut all_items = Vec::new();
44
45        let mut map_index:u8 = 0;
46        let mut collect_map_initiaters = Vec::new();
47        for path in c.files.iter(){
48            map_index += 1;
49            collect_map_initiaters.push(init_map(
50                map_index,
51                path.clone(),
52                locator_sender.clone(),
53                c.num_of_writers.clone(),
54                c.min_que_size.clone(),
55                c.expand_size.clone()
56            ));
57        }
58
59        let mut map_senders = HashMap::new();
60        for result in join_all(collect_map_initiaters).await{
61            match result{
62                Ok((index,mut items,map_sender))=>{
63                    for item in items.iter(){
64                        locator.insert(item.clone(),index);
65                    }
66                    all_items.append(&mut items);
67                    map_senders.insert(index,map_sender);
68                },
69                Err(_e)=>{
70                    return Err(_e);
71                }
72            }
73        }
74
75        all_items.sort();
76        // println!("{:?}",all_items.len());
77        TokioSpawn(async move{
78            crate::locator::init(
79                &mut map_senders,
80                &mut all_items,
81                &mut locator,
82                locator_receiver,
83                map_index
84            ).await;
85        });
86
87        return Ok(Que{
88            locator_sender:Arc::new(locator_sender)
89        });
90
91    }
92    pub async fn add(&mut self,value:Vec<u8>)->Result<QueResponse,()>{
93
94        let (signal,sleeper) = Signal::new();
95        match self.locator_sender.send_async(
96            LocatorMessage::Add(LocatorAdd{
97                value:value,
98                signal:signal.clone(),
99            })
100        ).await{
101            Ok(_)=>{
102                return Ok(QueResponse::new(signal,sleeper));
103            },
104            Err(_)=>{
105                return Err(());
106            }
107        }
108
109    }
110    pub async fn next(&mut self)->Result<QueResponse,()>{
111
112        let (signal,sleeper) = Signal::new();
113        match self.locator_sender.send_async(
114            LocatorMessage::Next(LocatorNext{
115                signal:signal.clone(),
116            })
117        ).await{
118            Ok(_)=>{
119                return Ok(QueResponse::new(signal,sleeper));
120            },
121            Err(_)=>{
122                return Err(());
123            }
124        }
125
126    }
127    pub async fn remove(&mut self,pointer:Pointer)->Result<QueResponse,()>{
128
129        let (signal,sleeper) = Signal::new();
130        match self.locator_sender.send_async(
131            LocatorMessage::Remove(LocatorRemove{
132                pointer:pointer,
133                signal:signal.clone(),
134            })
135        ).await{
136            Ok(_)=>{
137                return Ok(QueResponse::new(signal,sleeper));
138            },
139            Err(_)=>{
140                return Err(());
141            }
142        }
143
144    }
145    pub async fn reset(&mut self,pointer:Pointer)->Result<QueResponse,()>{
146
147        let (signal,sleeper) = Signal::new();
148        match self.locator_sender.send_async(
149            LocatorMessage::Reset(LocatorReset{
150                pointer:pointer,
151                signal:signal.clone(),
152            })
153        ).await{
154            Ok(_)=>{
155                return Ok(QueResponse::new(signal,sleeper));
156            },
157            Err(_)=>{
158                return Err(());
159            }
160        }
161
162    }
163}
164
165async fn init_map(
166    index:u8,
167    path:String,
168    locator_sender:FlumeSender<LocatorMessage>,
169    num_of_writers:u8,
170    min_que_size:u64,
171    expand_size:u64
172)->Result<(u8,Vec<u64>,FlumeSender<MapMessage>),&'static str>{
173
174    //ensure file 
175    let metadata:Metadata;
176    let mut file:File;
177    match io::init_map(path.clone(),min_que_size).await{
178        Ok(v)=>{
179            file = v.0;
180            metadata = v.1;
181        },
182        Err(_e)=>{
183            //debug_error("failed-init-map-que.rs",ERROR);
184            println!("!!! failed-init-map-que : {:?}",_e);
185            return Err("failed-init-map");
186        }
187    }
188
189    let reader:Reader;
190    let items:Vec<u64>;
191    match build_map(metadata,&mut file).await{
192        Ok(r)=>{
193            reader = r.0;
194            items = r.1;
195            // return Ok((index,r.0,r.1));
196        },
197        Err(_e)=>{
198            //debug_error("failed-build-map-que.rs",ERROR);
199            println!("!!! failed-build-map : {:?}",_e);
200            return Err("failed-build-map");
201        }
202    }
203
204    // println!("{:?}",reader.corrupt);
205
206    let (build_map_config,map_sender) = MapConfig::new(
207        index.clone(),
208        path,
209        reader,
210        locator_sender,
211        num_of_writers,
212        min_que_size,
213        expand_size
214    );
215
216    TokioSpawn(async move{
217        crate::map::init(build_map_config).await;
218    });
219
220    // return Err("no_error");
221
222    return Ok((index,items,map_sender));
223
224}
225
226async fn build_map(metadata:Metadata,file:&mut File)->Result<(Reader,Vec<u64>),&'static str>{
227
228    // println!("building map");
229
230    //meta info
231    let mut len:u64 = metadata.len();//bytes
232    // println!("file len : {:?}",len);
233    let chunk_size:u64; //bytes
234
235    if len > (MB_1 * 100){
236        chunk_size = 100 * MB_1;//println!("100");
237    } else if len > (MB_1 * 50){
238        chunk_size = 50 * MB_1;//println!("50");
239    } else if len > (MB_1 * 25){
240        chunk_size = 25 * MB_1;//println!("25");
241    } else {
242        chunk_size = len;//println!("0");
243    }
244
245    // println!("chunk_size : {:?}",chunk_size);
246
247    //make map
248    let mut reader:Reader = Reader::with_capacity(1000000, 1000000);
249    
250
251    //read chunks
252    let mut index = 0;
253    let mut read_buffer:Vec<u8> = Vec::with_capacity(chunk_size as usize);
254
255    loop{
256        if len == 0{
257            break;
258        }
259        let start_at = index*chunk_size;
260        let read_len:u64;
261        if len < chunk_size{read_len = len;} else {read_len = chunk_size;}
262        match io::read_chunk(file, &mut read_buffer, start_at, read_len).await{
263            Ok(_)=>{
264                // println!("read complete");
265                reader.map(&mut read_buffer);
266                // println!("map complete");
267            },
268            Err(_)=>{
269                return Err("failed-read_chunk");
270            }
271        }
272        if len > chunk_size{
273            len -= chunk_size;
274        } else {
275            len = 0;
276        }
277        index += 1;
278    }
279
280    match reader.end(){
281        Ok(_)=>{},
282        Err(_)=>{}
283    }
284
285    let mut collect:Vec<u64> = Vec::with_capacity(reader.pointers.len());
286    for key in reader.pointers.keys(){
287        match u64_from_bytes(&key){
288            Ok(num)=>{
289                collect.push(num);
290            },
291            Err(_)=>{}
292        }
293    }
294
295    collect.sort();
296    return Ok((reader,collect));
297
298}