gol_client/persistence/
batch_serializer_local.rs1use super::batch_serializer::{BatchIndexedSerializer, IndexedBatchData};
2use super::HISTORY_EXTENSION;
3use gol_core::{BoardCallbackWithStates, IndexedDataOwned};
4use rayon::prelude::*;
5use serde::Serialize;
6use shellexpand;
7use std::fs::{self, File};
8use std::io::prelude::*;
9use std::io::BufWriter;
10use std::path::Path;
11use std::thread::{self, JoinHandle};
12
13pub struct BatchSerializerLocal<T, U>
14where
15 T: Serialize,
16 U: Serialize,
17{
18 path: String,
19 serializer: BatchIndexedSerializer<T, U>,
20 last_handle: Option<JoinHandle<()>>,
21}
22
23pub struct StateSerializerLocal<T, U, S>
24where
25 T: Serialize,
26 U: Serialize,
27{
28 serializer: BatchSerializerLocal<T, U>,
29 trivial_state: S,
30}
31
32impl<T, U> BatchSerializerLocal<T, U>
33where
34 T: Serialize,
35 U: Serialize,
36{
37 pub fn new(dir_path: &String, serializer: BatchIndexedSerializer<T, U>) -> Self {
38 let expanded = shellexpand::full(dir_path).unwrap();
39 let dir_path = Path::new(expanded.as_ref());
40 let exists = dir_path.exists();
41 if exists {
42 let is_empty = dir_path.read_dir().unwrap().next().is_none();
43 if !is_empty {
44 panic!("Directory \"{}\" not empty.", dir_path.to_str().unwrap());
45 }
46 } else {
47 fs::create_dir_all(dir_path).unwrap();
48 }
49 Self {
50 path: String::from(dir_path.to_str().unwrap()),
51 serializer,
52 last_handle: None,
53 }
54 }
55
56 pub fn push(&mut self, content: T) {
57 let bytes = self.serializer.push(content);
58 self.save_bytes(bytes);
59 }
60
61 fn save_bytes(&mut self, bytes: Option<IndexedBatchData>) {
62 if bytes.is_none() {
63 return;
64 }
65 let data = bytes.unwrap();
66 let file_name = format!("{}_{}.{}", data.idx_beg, data.idx_end, HISTORY_EXTENSION);
67 let file_path = Path::new(&self.path).join(&file_name);
68 let file = File::create(&file_path).unwrap();
69 let mut buffer = BufWriter::new(file);
70 self.wait_on_last_handle();
71 self.last_handle = Some(thread::spawn(move || {
72 buffer.write_all(&data.data).unwrap();
73 buffer.flush().unwrap();
74 }));
75 }
76
77 fn wait_on_last_handle(&mut self) {
78 if self.last_handle.is_none() {
79 return;
80 }
81 self.last_handle.take().unwrap().join().unwrap();
82 self.last_handle = None;
83 }
84}
85
86impl<T, U, S> StateSerializerLocal<T, U, S>
87where
88 T: Serialize,
89 U: Serialize,
90{
91 pub fn new(serializer: BatchSerializerLocal<T, U>, trivial_state: S) -> Self {
92 Self {
93 serializer,
94 trivial_state,
95 }
96 }
97}
98
99impl<T, U> Drop for BatchSerializerLocal<T, U>
100where
101 T: Serialize,
102 U: Serialize,
103{
104 fn drop(&mut self) {
105 let remaining = self.serializer.remaining();
106 self.save_bytes(remaining);
107 self.wait_on_last_handle();
108 }
109}
110
111impl<T, CI, I, S> BoardCallbackWithStates<T, CI, I> for BatchSerializerLocal<Vec<(CI, T)>, S>
112where
113 T: Send + Sync + Serialize,
114 CI: Send + Sync + Serialize,
115 S: Send + Sync + Serialize,
116 I: rayon::iter::ParallelIterator<Item = IndexedDataOwned<CI, T>>,
117{
118 fn execute(&mut self, states: I) {
119 self.push(states.collect());
120 }
121}
122
123impl<T, CI, I, S> BoardCallbackWithStates<T, CI, I> for StateSerializerLocal<Vec<(CI, T)>, S, T>
124where
125 T: Send + Sync + Serialize + std::cmp::PartialEq,
126 CI: Send + Sync + Serialize,
127 S: Send + Sync + Serialize,
128 I: rayon::iter::ParallelIterator<Item = IndexedDataOwned<CI, T>>,
129{
130 fn execute(&mut self, states: I) {
131 self.serializer
132 .push(states.filter(|ele| ele.1 != self.trivial_state).collect());
133 }
134}