// gol_client/persistence/batch_deserializer_local.rs

1use super::HISTORY_EXTENSION;
2use super::{
3    adjacent_index_prediction::AdjacentIndexPrediction,
4    preload_cache::{PreloadCache, PreloadCacheDelegate},
5};
6use bincode;
7use flate2::read::GzDecoder;
8use rayon::prelude::*;
9use serde::de::DeserializeOwned;
10use std::collections::HashSet;
11use std::fs::{self, read_dir, File};
12use std::io::Read;
13use std::path::Path;
14use std::sync::Arc;
15
/// Cache delegate that loads batch files from a single directory on disk.
struct DirFileDelegate {
    /// Directory holding the batch files, stored after shell-style expansion.
    path: String,
    /// Sorted, de-duplicated range boundaries taken from the batch file names.
    /// Each adjacent pair `(idx_ranges[i], idx_ranges[i + 1])` names one file.
    idx_ranges: Vec<usize>,
}
20
21impl DirFileDelegate {
22    pub fn new(dir_path: &String) -> Self {
23        let expanded = shellexpand::full(dir_path).unwrap();
24        let path = Path::new(expanded.as_ref());
25        let idx_ranges = match construct_ranges(&path) {
26            Ok(val) => val,
27            Err(err) => panic!("{}", err),
28        };
29
30        Self {
31            path: String::from(path.to_str().unwrap()),
32            idx_ranges,
33        }
34    }
35}
36
37impl<T, U> PreloadCacheDelegate<usize, (Arc<Option<T>>, Vec<Arc<(usize, U)>>)> for DirFileDelegate
38where
39    T: Send + Sync + DeserializeOwned,
40    U: Send + Sync + DeserializeOwned,
41{
42    fn get(&self, key: &usize) -> Option<(Arc<Option<T>>, Vec<Arc<(usize, U)>>)> {
43        if key >= &(self.idx_ranges.len() - 1) {
44            return None;
45        }
46        let (start, end) = (self.idx_ranges[*key], self.idx_ranges[key + 1]);
47        let file_name = format!("{}_{}.{}", start, end, HISTORY_EXTENSION);
48        let file_path = Path::new(&self.path).join(&file_name);
49
50        let mut file = File::open(&file_path).expect("File not found.");
51        let metadata = fs::metadata(&file_path).expect("Cannot read file metadata.");
52        let mut buffer = vec![0; metadata.len() as usize];
53        file.read(&mut buffer).expect("Cannot read file.");
54        let mut decoder = GzDecoder::new(&buffer[..]);
55
56        // Uncompressed data should be larger, but good starting size.
57        let mut byte_data = Vec::with_capacity(metadata.len() as usize);
58        decoder.read_to_end(&mut byte_data).unwrap();
59
60        let res: (Option<T>, Vec<(usize, U)>) =
61            bincode::deserialize(&byte_data[..]).expect("Cannot deserialize data.");
62        let res_arc: Vec<Arc<(usize, U)>> = res
63            .1
64            .into_par_iter()
65            .map(|ele| Arc::new((ele.0, ele.1)))
66            .collect();
67
68        Some((Arc::new(res.0), res_arc))
69    }
70}
71
72pub struct BatchDeserializerLocal<T, U> {
73    cache: PreloadCache<usize, (Arc<Option<T>>, Vec<Arc<(usize, U)>>)>,
74    idx_ranges: Vec<usize>,
75}
76
77impl<T, U> BatchDeserializerLocal<T, U> {
78    pub fn new(path: &String) -> Self
79    where
80        T: Send + Sync + DeserializeOwned,
81        U: Send + Sync + DeserializeOwned,
82    {
83        let file_get_delegate = DirFileDelegate::new(path);
84        let predictor = AdjacentIndexPrediction::new()
85            .with_history_size(10)
86            .with_backward_size(1)
87            .with_forward_size(5);
88        let cache = PreloadCache::new(Box::new(predictor), Box::new(file_get_delegate));
89
90        let expanded = shellexpand::full(path).unwrap();
91        let path = Path::new(expanded.as_ref());
92        let idx_ranges = match construct_ranges(&path) {
93            Ok(val) => val,
94            Err(err) => panic!("{}", err),
95        };
96
97        Self { cache, idx_ranges }
98    }
99
100    pub fn get(&self, idx: usize) -> Option<(Arc<Option<T>>, Arc<(usize, U)>)>
101    where
102        T: 'static + Send + Sync + DeserializeOwned,
103        U: 'static + Send + Sync + DeserializeOwned,
104    {
105        let file_idx = Self::find_range_left_idx(&idx, &self.idx_ranges);
106        match file_idx {
107            Some(file_idx) => {
108                let start = self.idx_ranges[file_idx];
109                let inner_idx = idx - start;
110                let file_res = self.cache.get(&file_idx);
111                match file_res {
112                    Some(file_res) => {
113                        Some((Arc::clone(&file_res.0), Arc::clone(&file_res.1[inner_idx])))
114                    }
115                    None => None,
116                }
117            }
118            None => None,
119        }
120    }
121}
122
123impl<T, U> BatchDeserializerLocal<T, U> {
124    fn find_range_left_idx(idx: &usize, ranges: &Vec<usize>) -> Option<usize> {
125        match ranges.binary_search(idx) {
126            Ok(val) => {
127                if val >= ranges.len() - 1 {
128                    None
129                } else {
130                    Some(val)
131                }
132            }
133            Err(val) => {
134                if val > 0 && val < ranges.len() - 1 {
135                    Some(val - 1)
136                } else {
137                    None
138                }
139            }
140        }
141    }
142}
143
144fn construct_ranges(path: &Path) -> Result<Vec<usize>, &'static str> {
145    if !path.is_dir() {
146        return Err("Path specified for deserialization is not a directory.");
147    }
148    let mut res_set = HashSet::new();
149    for ele in read_dir(path).unwrap() {
150        let entry = ele.unwrap();
151        let cur = entry.path();
152        if !cur.is_dir() && cur.extension().unwrap() == HISTORY_EXTENSION {
153            let file_name = cur.file_stem().unwrap().to_str().unwrap();
154            let split: Vec<&str> = file_name.split("_").collect();
155            let start: usize = split[0].parse().expect("Expected integer in file name.");
156            let end: usize = split[1].parse().expect("Expected integer in file name.");
157            res_set.insert(start);
158            res_set.insert(end);
159        }
160    }
161    let mut res_vec: Vec<usize> = res_set.into_iter().collect();
162    res_vec.sort();
163    Ok(res_vec)
164}