gol_client/persistence/
batch_serializer_local.rs

1use super::batch_serializer::{BatchIndexedSerializer, IndexedBatchData};
2use super::HISTORY_EXTENSION;
3use gol_core::{BoardCallbackWithStates, IndexedDataOwned};
4use rayon::prelude::*;
5use serde::Serialize;
6use shellexpand;
7use std::fs::{self, File};
8use std::io::prelude::*;
9use std::io::BufWriter;
10use std::path::Path;
11use std::thread::{self, JoinHandle};
12
13pub struct BatchSerializerLocal<T, U>
14where
15    T: Serialize,
16    U: Serialize,
17{
18    path: String,
19    serializer: BatchIndexedSerializer<T, U>,
20    last_handle: Option<JoinHandle<()>>,
21}
22
23pub struct StateSerializerLocal<T, U, S>
24where
25    T: Serialize,
26    U: Serialize,
27{
28    serializer: BatchSerializerLocal<T, U>,
29    trivial_state: S,
30}
31
32impl<T, U> BatchSerializerLocal<T, U>
33where
34    T: Serialize,
35    U: Serialize,
36{
37    pub fn new(dir_path: &String, serializer: BatchIndexedSerializer<T, U>) -> Self {
38        let expanded = shellexpand::full(dir_path).unwrap();
39        let dir_path = Path::new(expanded.as_ref());
40        let exists = dir_path.exists();
41        if exists {
42            let is_empty = dir_path.read_dir().unwrap().next().is_none();
43            if !is_empty {
44                panic!("Directory \"{}\" not empty.", dir_path.to_str().unwrap());
45            }
46        } else {
47            fs::create_dir_all(dir_path).unwrap();
48        }
49        Self {
50            path: String::from(dir_path.to_str().unwrap()),
51            serializer,
52            last_handle: None,
53        }
54    }
55
56    pub fn push(&mut self, content: T) {
57        let bytes = self.serializer.push(content);
58        self.save_bytes(bytes);
59    }
60
61    fn save_bytes(&mut self, bytes: Option<IndexedBatchData>) {
62        if bytes.is_none() {
63            return;
64        }
65        let data = bytes.unwrap();
66        let file_name = format!("{}_{}.{}", data.idx_beg, data.idx_end, HISTORY_EXTENSION);
67        let file_path = Path::new(&self.path).join(&file_name);
68        let file = File::create(&file_path).unwrap();
69        let mut buffer = BufWriter::new(file);
70        self.wait_on_last_handle();
71        self.last_handle = Some(thread::spawn(move || {
72            buffer.write_all(&data.data).unwrap();
73            buffer.flush().unwrap();
74        }));
75    }
76
77    fn wait_on_last_handle(&mut self) {
78        if self.last_handle.is_none() {
79            return;
80        }
81        self.last_handle.take().unwrap().join().unwrap();
82        self.last_handle = None;
83    }
84}
85
86impl<T, U, S> StateSerializerLocal<T, U, S>
87where
88    T: Serialize,
89    U: Serialize,
90{
91    pub fn new(serializer: BatchSerializerLocal<T, U>, trivial_state: S) -> Self {
92        Self {
93            serializer,
94            trivial_state,
95        }
96    }
97}
98
99impl<T, U> Drop for BatchSerializerLocal<T, U>
100where
101    T: Serialize,
102    U: Serialize,
103{
104    fn drop(&mut self) {
105        let remaining = self.serializer.remaining();
106        self.save_bytes(remaining);
107        self.wait_on_last_handle();
108    }
109}
110
111impl<T, CI, I, S> BoardCallbackWithStates<T, CI, I> for BatchSerializerLocal<Vec<(CI, T)>, S>
112where
113    T: Send + Sync + Serialize,
114    CI: Send + Sync + Serialize,
115    S: Send + Sync + Serialize,
116    I: rayon::iter::ParallelIterator<Item = IndexedDataOwned<CI, T>>,
117{
118    fn execute(&mut self, states: I) {
119        self.push(states.collect());
120    }
121}
122
123impl<T, CI, I, S> BoardCallbackWithStates<T, CI, I> for StateSerializerLocal<Vec<(CI, T)>, S, T>
124where
125    T: Send + Sync + Serialize + std::cmp::PartialEq,
126    CI: Send + Sync + Serialize,
127    S: Send + Sync + Serialize,
128    I: rayon::iter::ParallelIterator<Item = IndexedDataOwned<CI, T>>,
129{
130    fn execute(&mut self, states: I) {
131        self.serializer
132            .push(states.filter(|ele| ele.1 != self.trivial_state).collect());
133    }
134}