gistools/data_store/
file.rs

1use crate::parsers::{FileReader, FileWriter, MMapReader, StdReader, Writer};
2use core::marker::PhantomData;
3use s2json::Properties;
4use serde::{Serialize, de::DeserializeOwned};
5use std::{
6    env, format,
7    fs::{self},
8    path::Path,
9    string::String,
10    time::{SystemTime, UNIX_EPOCH},
11    vec,
12    vec::Vec,
13};
14
15use super::{U64, external_sort};
16
/// Options used to create an [`S2BaseStore`].
///
/// Every field is optional and `FileOptions::default()` leaves all of them unset. The fields are
/// public so callers can use struct-update syntax, e.g.
/// `FileOptions { is_sorted: Some(true), ..Default::default() }`.
#[derive(Debug, Default)]
pub struct FileOptions {
    /// If `Some(true)`, the data on disk is already sorted by key: the store is opened directly
    /// for reading and the external sort is skipped. Treated as `false` when unset.
    pub is_sorted: Option<bool>,
    /// The maximum heap size in bytes for each grouping of data; forwarded to the external sort.
    pub max_heap: Option<usize>,
    /// The number of threads to use for sorting; forwarded to the external sort.
    pub thread_count: Option<usize>,
    /// Directory for temporary files. It is used for generated file names and is also passed to
    /// the external sort. When unset, an `s2_data_store` folder in the system temp dir is used.
    pub tmp_dir: Option<String>,
}
29
30/// The state of the store
31#[derive(Debug, Clone)]
32pub enum FileState<R: StdReader> {
33    /// The store is read-only
34    Read(R),
35    /// The store is write-only
36    Write(FileWriter),
37    /// No data has been written yet
38    Empty,
39}
40
/// Size in bytes of one record in the `.keys` file, laid out as
/// `[id: u64 (8 bytes), value offset: u64 (8 bytes), value length: u32 (4 bytes)]`.
pub const KEY_STORE_LENGTH: u64 = 8 + 8 + 4;
43
44/// An S2 store that uses the FileSystem for both reading and writing
45pub type S2FileStore<K, V> = S2BaseStore<FileReader, K, V>;
46
47/// An S2 store that uses the FileSystem for writing but MMaps the read access
48pub type S2MMapStore<K, V> = S2BaseStore<MMapReader, K, V>;
49
50/// NOTE: The File KVStore is designed to be used in states:
51/// - write-only. The initial state is write-only. Write all you need to before reading
52/// - read-only. Once you have written everything, the first read will lock the file to be static
53///   and read-only.
54#[derive(Debug, Clone)]
55pub struct S2BaseStore<
56    R: StdReader = FileReader,
57    K: U64 = u64,
58    V: Serialize + DeserializeOwned = Properties,
59> {
60    tmp_dir: String,
61    file_name: String,
62    size: u64,
63    sorted: bool,
64    max_heap: Option<usize>,
65    thread_count: Option<usize>,
66    value_offset: u64,
67    key_file: FileState<R>,
68    value_file: FileState<R>,
69    _phantom: PhantomData<(K, V)>,
70}
71impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Default for S2BaseStore<R, K, V> {
72    fn default() -> Self {
73        S2BaseStore {
74            tmp_dir: String::new(),
75            file_name: String::new(),
76            size: 0,
77            sorted: false,
78            max_heap: None,
79            thread_count: None,
80            value_offset: 0,
81            key_file: FileState::Empty,
82            value_file: FileState::Empty,
83            _phantom: PhantomData,
84        }
85    }
86}
87impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> S2BaseStore<R, K, V> {
88    /// Builds a new File based KV
89    pub fn new(file_name: Option<&str>, options: Option<FileOptions>) -> Self {
90        let mut file = Self::default();
91        let options = options.unwrap_or_default();
92        file.tmp_dir = options.tmp_dir.clone().unwrap_or_else(|| build_tmp_dir(None));
93        file.file_name = file_name
94            .map(|f| f.into())
95            .unwrap_or_else(|| build_tmp_file_name(file.tmp_dir.clone()));
96        file.sorted = options.is_sorted.unwrap_or(false);
97        file.max_heap = options.max_heap;
98        file.thread_count = options.thread_count;
99        if !file.sorted {
100            file.switch_to_write_state();
101        } else {
102            file.switch_to_read_state();
103        }
104        // Update the size if the file already existed
105        if let Ok(stat) = fs::metadata(format!("{}.keys", file.file_name)) {
106            file.size = stat.len() / KEY_STORE_LENGTH;
107        }
108
109        file
110    }
111
112    /// Returns the number of entries
113    pub fn len(&self) -> u64 {
114        self.size
115    }
116
117    /// Returns true if the store is empty
118    pub fn is_empty(&self) -> bool {
119        self.size == 0
120    }
121
122    /// Sets the value for the given key
123    pub fn set(&mut self, key: K, value: V) {
124        let key = key.into();
125        self.switch_to_write_state();
126        self.sorted = false;
127        // grab writers
128        let (FileState::Write(key_file), FileState::Write(value_file)) =
129            (&mut self.key_file, &mut self.value_file)
130        else {
131            panic!("Not in write state");
132        };
133        // prepare values
134        let vec_key = u64::to_le_bytes(key).to_vec();
135        let value_str = serde_json::to_vec(&value).unwrap();
136        let value_offest = u64::to_le_bytes(self.value_offset).to_vec();
137        let value_length = u32::to_le_bytes(value_str.len() as u32).to_vec();
138        // write
139        key_file.append(&vec_key);
140        key_file.append(&value_offest);
141        key_file.append(&value_length);
142        value_file.append(&value_str);
143        // update tracker variables
144        self.value_offset += value_str.len() as u64;
145        self.size += 1;
146    }
147
148    /// Checks if the store contains a key
149    pub fn has(&mut self, key: K) -> bool {
150        // if we have no items, early return
151        if self.is_empty() {
152            return false;
153        }
154        let key = key.into();
155        // ensure we are in read state
156        self.switch_to_read_state();
157        // get the lower bound key
158        let lower_index = self.lower_bound(key);
159        if lower_index >= self.size {
160            return false;
161        }
162        let lower_key = self.get_key(lower_index);
163
164        lower_key == key
165    }
166
167    /// Get the values in relation to the given key
168    /// Returns None if the key does not exist, but will return the index of the first match with all values
169    /// of key if the key exists
170    pub fn get(&mut self, key: K, max: Option<usize>) -> Option<(u64, Vec<V>)> {
171        // if we have no items, early return
172        if self.is_empty() {
173            return None;
174        }
175        // ensure we are in read state
176        self.switch_to_read_state();
177        // get the lower bound key
178        let key = key.into();
179        let mut lower_index = self.lower_bound(key);
180        if lower_index >= self.size {
181            return None;
182        }
183
184        // setup the result
185        let max = max.unwrap_or(usize::MAX);
186        let mut res = vec![];
187        // iterate over the values by using the lower bound and moving up until the key changes,
188        // we hit the max, or we hit the end of the file
189        while res.len() < max && lower_index < self.size {
190            let (search_key, value_index, value_length) = self.get_key_value(lower_index);
191            if search_key != key {
192                break;
193            }
194            let value = self.get_value(value_index, value_length);
195            res.push(serde_json::from_slice(&value).unwrap());
196            lower_index += 1;
197        }
198
199        if res.is_empty() { None } else { Some((lower_index - res.len() as u64, res)) }
200    }
201
202    /// Get the value at the given index. Return (key, value)
203    pub fn get_index(&mut self, index: u64) -> Option<(K, V)> {
204        if index >= self.size {
205            return None;
206        }
207        self.switch_to_read_state();
208        let (search_key, value_index, value_length) = self.get_key_value(index);
209        let value = self.get_value(value_index, value_length);
210        Some((K::from(search_key), serde_json::from_slice(&value).unwrap()))
211    }
212
213    /// Sort the data if not sorted
214    pub fn sort(&mut self) {
215        if self.sorted || self.is_empty() {
216            return;
217        }
218        let inputs: Vec<&str> = vec![&self.file_name];
219        external_sort(
220            &inputs,
221            &self.file_name,
222            self.max_heap,
223            self.thread_count,
224            Some(&self.tmp_dir),
225        );
226        self.sorted = true;
227    }
228
229    /// Closes the store
230    pub fn cleanup(&mut self) {
231        fs::remove_file(format!("{}.keys", self.file_name)).unwrap();
232        fs::remove_file(format!("{}.values", self.file_name)).unwrap();
233        self.sorted = false;
234        self.size = 0;
235        self.value_offset = 0;
236    }
237
238    /// Switches to write state if in read.
239    fn switch_to_write_state(&mut self) {
240        match &self.key_file {
241            FileState::Write(_) => {}
242            _ => {
243                self.key_file =
244                    FileState::Write(FileWriter::new(format!("{}.keys", self.file_name)).unwrap());
245            }
246        }
247        match &self.value_file {
248            FileState::Write(_) => {}
249            _ => {
250                self.value_file = FileState::Write(
251                    FileWriter::new(format!("{}.values", self.file_name)).unwrap(),
252                );
253            }
254        }
255    }
256
257    /// Switches to read state if in write. Also sort the keys.
258    fn switch_to_read_state(&mut self) {
259        match &self.key_file {
260            FileState::Read(_) => {}
261            _ => {
262                self.key_file =
263                    FileState::Read(R::new(format!("{}.keys", self.file_name)).unwrap());
264            }
265        }
266        match &self.value_file {
267            FileState::Read(_) => {}
268            _ => {
269                self.value_file =
270                    FileState::Read(R::new(format!("{}.values", self.file_name)).unwrap());
271            }
272        }
273        self.sort();
274    }
275
276    /// get the index of a key that is less than or equal to the key
277    fn lower_bound(&mut self, id: u64) -> u64 {
278        // lower bound search
279        let mut lo: u64 = 0;
280        let mut hi: u64 = self.size;
281        let mut mid: u64;
282
283        while lo < hi {
284            mid = lo + (hi - lo) / 2;
285            let lo_hi = self.get_key(mid);
286            if lo_hi < id {
287                lo = mid + 1;
288            } else {
289                hi = mid;
290            }
291        }
292
293        lo
294    }
295
296    /// Grab a key from the store at an index
297    fn get_key(&mut self, index: u64) -> u64 {
298        if let FileState::Read(file) = &mut self.key_file {
299            file.uint64_le(Some(index * KEY_STORE_LENGTH))
300        } else {
301            panic!("Not in read state");
302        }
303    }
304
305    /// Grab a key, value offset, and value length from the store at an index
306    fn get_key_value(&mut self, index: u64) -> (u64, u64, u32) {
307        if let FileState::Read(file) = &mut self.key_file {
308            (
309                file.uint64_le(Some(index * KEY_STORE_LENGTH)),
310                file.uint64_le(Some(index * KEY_STORE_LENGTH + 8)),
311                file.uint32_le(Some(index * KEY_STORE_LENGTH + 16)),
312            )
313        } else {
314            panic!("Not in read state");
315        }
316    }
317
318    fn get_value(&mut self, index: u64, length: u32) -> Vec<u8> {
319        if let FileState::Read(file) = &mut self.value_file {
320            file.slice(Some(index), Some(index + length as u64))
321        } else {
322            panic!("Not in read state");
323        }
324    }
325
326    /// Iterate over the store, one key-value at a time
327    pub fn iter(&mut self) -> Iter<'_, R, K, V> {
328        Iter { container: self, index: 0 }
329    }
330
331    /// Iterate over the store but group all values related to a key
332    pub fn iter_multi(&mut self) -> IterMulti<'_, R, K, V> {
333        IterMulti { container: self, index: 0 }
334    }
335}
336/// Iterator for S2BaseStore
337#[derive(Debug)]
338pub struct Iter<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
339    container: &'a mut S2BaseStore<R, K, V>,
340    index: u64,
341}
342impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for Iter<'_, R, K, V> {
343    type Item = (K, V);
344    fn next(&mut self) -> Option<Self::Item> {
345        let result = self.container.get_index(self.index);
346        self.index += 1;
347        result
348    }
349}
350/// Multi-Value Iterator for S2BaseStore
351#[derive(Debug)]
352pub struct IterMulti<'a, R: StdReader, K: U64, V: Serialize + DeserializeOwned> {
353    container: &'a mut S2BaseStore<R, K, V>,
354    index: u64,
355}
356impl<R: StdReader, K: U64, V: Serialize + DeserializeOwned> Iterator for IterMulti<'_, R, K, V> {
357    type Item = (K, Vec<V>);
358    fn next(&mut self) -> Option<Self::Item> {
359        let first = self.container.get_index(self.index);
360        self.index += 1;
361        if let Some((key, value)) = first {
362            let mut result: (K, Vec<V>) = (key, vec![value]);
363            loop {
364                let next = self.container.get_index(self.index);
365                if let Some((next_key, next_value)) = next {
366                    if next_key == key {
367                        self.index += 1;
368                        result.1.push(next_value);
369                    } else {
370                        return Some(result);
371                    }
372                }
373            }
374        } else {
375            None
376        }
377    }
378}
379
/// Resolves the directory used for temporary store files.
///
/// A provided `tmp_dir` is returned unchanged. Otherwise the result is an `s2_data_store` folder
/// inside [`env::temp_dir`], which is created (with any missing parents) first.
///
/// # Panics
/// Panics if the fallback directory cannot be created.
fn build_tmp_dir(tmp_dir: Option<String>) -> String {
    if let Some(dir) = tmp_dir {
        return dir;
    }
    let fallback = env::temp_dir().join("s2_data_store");
    fs::create_dir_all(&fallback).unwrap();
    fallback.to_string_lossy().into_owned()
}
387
/// Builds a unique base path for a temporary store inside `tmp_dir`.
///
/// The name combines the current time in nanoseconds, the process id and a process-wide
/// counter. Stores created within the same clock tick, or by concurrent processes, therefore get
/// distinct names. Stale files already at that path are removed: the bare path and the `.keys` /
/// `.values` files the store actually uses. Otherwise `S2BaseStore::new` would pick up a stale
/// key file's size.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch or a stale file cannot be removed.
fn build_tmp_file_name(tmp_dir: String) -> String {
    // Relaxed is enough: only the uniqueness of each fetched value matters.
    static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let sequence = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let random_name = format!("tmp_{}_{}_{}", nanos, std::process::id(), sequence);

    let file_name = Path::new(&tmp_dir).join(random_name).to_string_lossy().into_owned();

    // Remove leftovers that would otherwise be mistaken for this store's data
    for stale in [file_name.clone(), format!("{file_name}.keys"), format!("{file_name}.values")] {
        if Path::new(&stale).exists() {
            fs::remove_file(&stale).unwrap();
        }
    }

    file_name
}