1use crate::parsers::{FileReader, FileWriter, MMapReader, StdReader, Writer};
2use core::marker::PhantomData;
3use s2json::Properties;
4use serde::{Serialize, de::DeserializeOwned};
5use std::{
6 env, format,
7 fs::{self},
8 path::Path,
9 string::String,
10 time::{SystemTime, UNIX_EPOCH},
11 vec,
12 vec::Vec,
13};
14
15use super::{U64, external_sort};
16
/// Optional settings accepted by [`S2BaseStore::new`].
///
/// Every field is optional; a `None` falls back to the store's default. The fields are public
/// so that callers outside this module can actually configure a store, e.g.
/// `FileOptions { is_sorted: Some(true), ..Default::default() }`.
#[derive(Debug, Default)]
pub struct FileOptions {
    /// `Some(true)` marks the existing files as already sorted, so the store opens them for
    /// reading instead of writing. Defaults to `false`.
    pub is_sorted: Option<bool>,
    /// Forwarded unchanged to `external_sort` when the store is sorted.
    pub max_heap: Option<usize>,
    /// Forwarded unchanged to `external_sort` when the store is sorted.
    pub thread_count: Option<usize>,
    /// Directory used for temporary files. Defaults to `<system temp dir>/s2_data_store`.
    pub tmp_dir: Option<String>,
}
29
30#[derive(Debug, Clone)]
32pub enum FileState<R: StdReader> {
33 Read(R),
35 Write(FileWriter),
37 Empty,
39}
40
/// Size in bytes of one record in the `.keys` file:
/// an 8-byte key, an 8-byte value offset and a 4-byte value length.
pub const KEY_STORE_LENGTH: u64 = 8 + 8 + 4;
43
44pub type S2FileStore<K, V> = S2BaseStore<FileReader, K, V>;
46
47pub type S2MMapStore<K, V> = S2BaseStore<MMapReader, K, V>;
49
50#[derive(Debug, Clone)]
55pub struct S2BaseStore<
56 R: StdReader = FileReader,
57 K: U64 = u64,
58 V: Serialize + DeserializeOwned = Properties,
59> {
60 tmp_dir: String,
61 file_name: String,
62 size: u64,
63 sorted: bool,
64 max_heap: Option<usize>,
65 thread_count: Option<usize>,
66 value_offset: u64,
67 key_file: FileState<R>,
68 value_file: FileState<R>,
69 _phantom: PhantomData<(K, V)>,
70}
71impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Default for S2BaseStore<R, K, V> {
72 fn default() -> Self {
73 S2BaseStore {
74 tmp_dir: String::new(),
75 file_name: String::new(),
76 size: 0,
77 sorted: false,
78 max_heap: None,
79 thread_count: None,
80 value_offset: 0,
81 key_file: FileState::Empty,
82 value_file: FileState::Empty,
83 _phantom: PhantomData,
84 }
85 }
86}
87impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> S2BaseStore<R, K, V> {
88 pub fn new(file_name: Option<&str>, options: Option<FileOptions>) -> Self {
90 let mut file = Self::default();
91 let options = options.unwrap_or_default();
92 file.tmp_dir = options.tmp_dir.clone().unwrap_or_else(|| build_tmp_dir(None));
93 file.file_name = file_name
94 .map(|f| f.into())
95 .unwrap_or_else(|| build_tmp_file_name(file.tmp_dir.clone()));
96 file.sorted = options.is_sorted.unwrap_or(false);
97 file.max_heap = options.max_heap;
98 file.thread_count = options.thread_count;
99 if !file.sorted {
100 file.switch_to_write_state();
101 } else {
102 file.switch_to_read_state();
103 }
104 if let Ok(stat) = fs::metadata(format!("{}.keys", file.file_name)) {
106 file.size = stat.len() / KEY_STORE_LENGTH;
107 }
108
109 file
110 }
111
112 pub fn len(&self) -> u64 {
114 self.size
115 }
116
117 pub fn is_empty(&self) -> bool {
119 self.size == 0
120 }
121
122 pub fn set(&mut self, key: K, value: V) {
124 let key = key.into();
125 self.switch_to_write_state();
126 self.sorted = false;
127 let (FileState::Write(key_file), FileState::Write(value_file)) =
129 (&mut self.key_file, &mut self.value_file)
130 else {
131 panic!("Not in write state");
132 };
133 let vec_key = u64::to_le_bytes(key).to_vec();
135 let value_str = serde_json::to_vec(&value).unwrap();
136 let value_offest = u64::to_le_bytes(self.value_offset).to_vec();
137 let value_length = u32::to_le_bytes(value_str.len() as u32).to_vec();
138 key_file.append(&vec_key);
140 key_file.append(&value_offest);
141 key_file.append(&value_length);
142 value_file.append(&value_str);
143 self.value_offset += value_str.len() as u64;
145 self.size += 1;
146 }
147
148 pub fn has(&mut self, key: K) -> bool {
150 if self.is_empty() {
152 return false;
153 }
154 let key = key.into();
155 self.switch_to_read_state();
157 let lower_index = self.lower_bound(key);
159 if lower_index >= self.size {
160 return false;
161 }
162 let lower_key = self.get_key(lower_index);
163
164 lower_key == key
165 }
166
167 pub fn get(&mut self, key: K, max: Option<usize>) -> Option<(u64, Vec<V>)> {
171 if self.is_empty() {
173 return None;
174 }
175 self.switch_to_read_state();
177 let key = key.into();
179 let mut lower_index = self.lower_bound(key);
180 if lower_index >= self.size {
181 return None;
182 }
183
184 let max = max.unwrap_or(usize::MAX);
186 let mut res = vec![];
187 while res.len() < max && lower_index < self.size {
190 let (search_key, value_index, value_length) = self.get_key_value(lower_index);
191 if search_key != key {
192 break;
193 }
194 let value = self.get_value(value_index, value_length);
195 res.push(serde_json::from_slice(&value).unwrap());
196 lower_index += 1;
197 }
198
199 if res.is_empty() { None } else { Some((lower_index - res.len() as u64, res)) }
200 }
201
202 pub fn get_index(&mut self, index: u64) -> Option<(K, V)> {
204 if index >= self.size {
205 return None;
206 }
207 self.switch_to_read_state();
208 let (search_key, value_index, value_length) = self.get_key_value(index);
209 let value = self.get_value(value_index, value_length);
210 Some((K::from(search_key), serde_json::from_slice(&value).unwrap()))
211 }
212
213 pub fn sort(&mut self) {
215 if self.sorted || self.is_empty() {
216 return;
217 }
218 let inputs: Vec<&str> = vec![&self.file_name];
219 external_sort(
220 &inputs,
221 &self.file_name,
222 self.max_heap,
223 self.thread_count,
224 Some(&self.tmp_dir),
225 );
226 self.sorted = true;
227 }
228
229 pub fn cleanup(&mut self) {
231 fs::remove_file(format!("{}.keys", self.file_name)).unwrap();
232 fs::remove_file(format!("{}.values", self.file_name)).unwrap();
233 self.sorted = false;
234 self.size = 0;
235 self.value_offset = 0;
236 }
237
238 fn switch_to_write_state(&mut self) {
240 match &self.key_file {
241 FileState::Write(_) => {}
242 _ => {
243 self.key_file =
244 FileState::Write(FileWriter::new(format!("{}.keys", self.file_name)).unwrap());
245 }
246 }
247 match &self.value_file {
248 FileState::Write(_) => {}
249 _ => {
250 self.value_file = FileState::Write(
251 FileWriter::new(format!("{}.values", self.file_name)).unwrap(),
252 );
253 }
254 }
255 }
256
257 fn switch_to_read_state(&mut self) {
259 match &self.key_file {
260 FileState::Read(_) => {}
261 _ => {
262 self.key_file =
263 FileState::Read(R::new(format!("{}.keys", self.file_name)).unwrap());
264 }
265 }
266 match &self.value_file {
267 FileState::Read(_) => {}
268 _ => {
269 self.value_file =
270 FileState::Read(R::new(format!("{}.values", self.file_name)).unwrap());
271 }
272 }
273 self.sort();
274 }
275
276 fn lower_bound(&mut self, id: u64) -> u64 {
278 let mut lo: u64 = 0;
280 let mut hi: u64 = self.size;
281 let mut mid: u64;
282
283 while lo < hi {
284 mid = lo + (hi - lo) / 2;
285 let lo_hi = self.get_key(mid);
286 if lo_hi < id {
287 lo = mid + 1;
288 } else {
289 hi = mid;
290 }
291 }
292
293 lo
294 }
295
296 fn get_key(&mut self, index: u64) -> u64 {
298 if let FileState::Read(file) = &mut self.key_file {
299 file.uint64_le(Some(index * KEY_STORE_LENGTH))
300 } else {
301 panic!("Not in read state");
302 }
303 }
304
305 fn get_key_value(&mut self, index: u64) -> (u64, u64, u32) {
307 if let FileState::Read(file) = &mut self.key_file {
308 (
309 file.uint64_le(Some(index * KEY_STORE_LENGTH)),
310 file.uint64_le(Some(index * KEY_STORE_LENGTH + 8)),
311 file.uint32_le(Some(index * KEY_STORE_LENGTH + 16)),
312 )
313 } else {
314 panic!("Not in read state");
315 }
316 }
317
318 fn get_value(&mut self, index: u64, length: u32) -> Vec<u8> {
319 if let FileState::Read(file) = &mut self.value_file {
320 file.slice(Some(index), Some(index + length as u64))
321 } else {
322 panic!("Not in read state");
323 }
324 }
325
326 pub fn iter(&mut self) -> Iter<'_, R, K, V> {
328 Iter { container: self, index: 0 }
329 }
330
331 pub fn iter_multi(&mut self) -> IterMulti<'_, R, K, V> {
333 IterMulti { container: self, index: 0 }
334 }
335}
336#[derive(Debug)]
338pub struct Iter<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
339 container: &'a mut S2BaseStore<R, K, V>,
340 index: u64,
341}
342impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for Iter<'_, R, K, V> {
343 type Item = (K, V);
344 fn next(&mut self) -> Option<Self::Item> {
345 let result = self.container.get_index(self.index);
346 self.index += 1;
347 result
348 }
349}
350#[derive(Debug)]
352pub struct IterMulti<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
353 container: &'a mut S2BaseStore<R, K, V>,
354 index: u64,
355}
356impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for IterMulti<'_, R, K, V> {
357 type Item = (K, Vec<V>);
358 fn next(&mut self) -> Option<Self::Item> {
359 let first = self.container.get_index(self.index);
360 self.index += 1;
361 if let Some((key, value)) = first {
362 let mut result: (K, Vec<V>) = (key, vec![value]);
363 loop {
364 let next = self.container.get_index(self.index);
365 if let Some((next_key, next_value)) = next {
366 if next_key == key {
367 self.index += 1;
368 result.1.push(next_value);
369 } else {
370 return Some(result);
371 }
372 }
373 }
374 } else {
375 None
376 }
377 }
378}
379
/// Resolves the directory used for temporary files.
///
/// Returns `tmp_dir` unchanged when one is given. Otherwise creates (if needed) and returns
/// `<system temp dir>/s2_data_store`.
///
/// # Panics
/// Panics if the default directory cannot be created.
fn build_tmp_dir(tmp_dir: Option<String>) -> String {
    if let Some(explicit_dir) = tmp_dir {
        return explicit_dir;
    }

    let default_dir = env::temp_dir().join("s2_data_store");
    fs::create_dir_all(&default_dir).unwrap();
    default_dir.to_string_lossy().into_owned()
}
387
/// Generates a fresh base path `<tmp_dir>/tmp_<nanos>_<pid>_<counter>` for a temporary store.
///
/// The timestamp alone is not a safe source of uniqueness (clock resolution can be coarse), so
/// the process id and a per-process counter are appended as well. Any leftover file at the
/// base path or at its `.keys` / `.values` companions is removed.
///
/// # Panics
/// Panics if a leftover file exists but cannot be removed.
fn build_tmp_file_name(tmp_dir: String) -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Distinguishes names generated by this process within the same clock tick.
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);

    // A clock set before the epoch only loses the timestamp part; pid and counter still keep
    // the name unique.
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    // Relaxed is enough: only the uniqueness of the returned number matters.
    let sequence = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let base_name = format!("tmp_{nanos}_{}_{sequence}", std::process::id());

    let file_name = Path::new(&tmp_dir).join(base_name).to_string_lossy().into_owned();

    let candidates =
        [file_name.clone(), format!("{file_name}.keys"), format!("{file_name}.values")];
    for stale in candidates {
        if Path::new(&stale).exists() {
            fs::remove_file(&stale).unwrap();
        }
    }

    file_name
}