gol_client/persistence/
batch_deserializer_local.rs1use super::HISTORY_EXTENSION;
2use super::{
3 adjacent_index_prediction::AdjacentIndexPrediction,
4 preload_cache::{PreloadCache, PreloadCacheDelegate},
5};
6use bincode;
7use flate2::read::GzDecoder;
8use rayon::prelude::*;
9use serde::de::DeserializeOwned;
10use std::collections::HashSet;
11use std::fs::{self, read_dir, File};
12use std::io::Read;
13use std::path::Path;
14use std::sync::Arc;
15
/// Cache delegate that loads history files from a single directory on disk.
struct DirFileDelegate {
    /// Shell-expanded path of the directory that holds the history files.
    path: String,
    /// Sorted boundary values taken from the `<start>_<end>` file names;
    /// consecutive pairs `(idx_ranges[k], idx_ranges[k + 1])` name file `k`.
    idx_ranges: Vec<usize>,
}
20
21impl DirFileDelegate {
22 pub fn new(dir_path: &String) -> Self {
23 let expanded = shellexpand::full(dir_path).unwrap();
24 let path = Path::new(expanded.as_ref());
25 let idx_ranges = match construct_ranges(&path) {
26 Ok(val) => val,
27 Err(err) => panic!("{}", err),
28 };
29
30 Self {
31 path: String::from(path.to_str().unwrap()),
32 idx_ranges,
33 }
34 }
35}
36
37impl<T, U> PreloadCacheDelegate<usize, (Arc<Option<T>>, Vec<Arc<(usize, U)>>)> for DirFileDelegate
38where
39 T: Send + Sync + DeserializeOwned,
40 U: Send + Sync + DeserializeOwned,
41{
42 fn get(&self, key: &usize) -> Option<(Arc<Option<T>>, Vec<Arc<(usize, U)>>)> {
43 if key >= &(self.idx_ranges.len() - 1) {
44 return None;
45 }
46 let (start, end) = (self.idx_ranges[*key], self.idx_ranges[key + 1]);
47 let file_name = format!("{}_{}.{}", start, end, HISTORY_EXTENSION);
48 let file_path = Path::new(&self.path).join(&file_name);
49
50 let mut file = File::open(&file_path).expect("File not found.");
51 let metadata = fs::metadata(&file_path).expect("Cannot read file metadata.");
52 let mut buffer = vec![0; metadata.len() as usize];
53 file.read(&mut buffer).expect("Cannot read file.");
54 let mut decoder = GzDecoder::new(&buffer[..]);
55
56 let mut byte_data = Vec::with_capacity(metadata.len() as usize);
58 decoder.read_to_end(&mut byte_data).unwrap();
59
60 let res: (Option<T>, Vec<(usize, U)>) =
61 bincode::deserialize(&byte_data[..]).expect("Cannot deserialize data.");
62 let res_arc: Vec<Arc<(usize, U)>> = res
63 .1
64 .into_par_iter()
65 .map(|ele| Arc::new((ele.0, ele.1)))
66 .collect();
67
68 Some((Arc::new(res.0), res_arc))
69 }
70}
71
72pub struct BatchDeserializerLocal<T, U> {
73 cache: PreloadCache<usize, (Arc<Option<T>>, Vec<Arc<(usize, U)>>)>,
74 idx_ranges: Vec<usize>,
75}
76
77impl<T, U> BatchDeserializerLocal<T, U> {
78 pub fn new(path: &String) -> Self
79 where
80 T: Send + Sync + DeserializeOwned,
81 U: Send + Sync + DeserializeOwned,
82 {
83 let file_get_delegate = DirFileDelegate::new(path);
84 let predictor = AdjacentIndexPrediction::new()
85 .with_history_size(10)
86 .with_backward_size(1)
87 .with_forward_size(5);
88 let cache = PreloadCache::new(Box::new(predictor), Box::new(file_get_delegate));
89
90 let expanded = shellexpand::full(path).unwrap();
91 let path = Path::new(expanded.as_ref());
92 let idx_ranges = match construct_ranges(&path) {
93 Ok(val) => val,
94 Err(err) => panic!("{}", err),
95 };
96
97 Self { cache, idx_ranges }
98 }
99
100 pub fn get(&self, idx: usize) -> Option<(Arc<Option<T>>, Arc<(usize, U)>)>
101 where
102 T: 'static + Send + Sync + DeserializeOwned,
103 U: 'static + Send + Sync + DeserializeOwned,
104 {
105 let file_idx = Self::find_range_left_idx(&idx, &self.idx_ranges);
106 match file_idx {
107 Some(file_idx) => {
108 let start = self.idx_ranges[file_idx];
109 let inner_idx = idx - start;
110 let file_res = self.cache.get(&file_idx);
111 match file_res {
112 Some(file_res) => {
113 Some((Arc::clone(&file_res.0), Arc::clone(&file_res.1[inner_idx])))
114 }
115 None => None,
116 }
117 }
118 None => None,
119 }
120 }
121}
122
123impl<T, U> BatchDeserializerLocal<T, U> {
124 fn find_range_left_idx(idx: &usize, ranges: &Vec<usize>) -> Option<usize> {
125 match ranges.binary_search(idx) {
126 Ok(val) => {
127 if val >= ranges.len() - 1 {
128 None
129 } else {
130 Some(val)
131 }
132 }
133 Err(val) => {
134 if val > 0 && val < ranges.len() - 1 {
135 Some(val - 1)
136 } else {
137 None
138 }
139 }
140 }
141 }
142}
143
144fn construct_ranges(path: &Path) -> Result<Vec<usize>, &'static str> {
145 if !path.is_dir() {
146 return Err("Path specified for deserialization is not a directory.");
147 }
148 let mut res_set = HashSet::new();
149 for ele in read_dir(path).unwrap() {
150 let entry = ele.unwrap();
151 let cur = entry.path();
152 if !cur.is_dir() && cur.extension().unwrap() == HISTORY_EXTENSION {
153 let file_name = cur.file_stem().unwrap().to_str().unwrap();
154 let split: Vec<&str> = file_name.split("_").collect();
155 let start: usize = split[0].parse().expect("Expected integer in file name.");
156 let end: usize = split[1].parse().expect("Expected integer in file name.");
157 res_set.insert(start);
158 res_set.insert(end);
159 }
160 }
161 let mut res_vec: Vec<usize> = res_set.into_iter().collect();
162 res_vec.sort();
163 Ok(res_vec)
164}