gol_client/persistence/
batch_serializer.rs1use bincode;
2use flate2::{write::GzEncoder, Compression};
3use serde::Serialize;
4use std::io::prelude::*;
5
6pub struct IndexedBatchData {
7 pub idx_beg: usize,
8 pub idx_end: usize,
9 pub data: Vec<u8>,
10}
11
12pub struct BatchIndexedSerializer<T, U> {
13 batch_size: usize,
14 iter_count: usize,
15 history_buffer: Vec<(usize, T)>,
16 header: Option<U>,
17}
18
19impl<T, U> BatchIndexedSerializer<T, U>
20where
21 T: Serialize,
22 U: Serialize,
23{
24 pub fn new(batch_size: usize) -> Self {
25 let batch_size = std::cmp::max(batch_size, 1);
26 Self {
27 batch_size,
28 iter_count: 0,
29 history_buffer: Vec::with_capacity(batch_size),
30 header: None,
31 }
32 }
33
34 pub fn with_header(self, header: U) -> Self {
35 let mut res = self;
36 res.header = Some(header);
37 res
38 }
39
40 pub fn push(&mut self, item: T) -> Option<IndexedBatchData> {
41 self.history_buffer.push((self.iter_count, item));
42
43 self.iter_count += 1;
44 if self.history_buffer.len() >= self.batch_size {
45 let serialized = self.serialize_and_clear_buffer();
46 Some(serialized)
47 } else {
48 None
49 }
50 }
51
52 pub fn remaining(&mut self) -> Option<IndexedBatchData> {
53 if self.history_buffer.is_empty() {
54 None
55 } else {
56 Some(self.serialize_and_clear_buffer())
57 }
58 }
59
60 fn serialize_and_clear_buffer(&mut self) -> IndexedBatchData {
61 let idx_beg = self.iter_count - self.history_buffer.len();
62 let idx_end = self.iter_count;
63 let data = bincode::serialize(&(self.header.as_ref(), &self.history_buffer)).unwrap();
64 let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
65 encoder.write_all(&data).unwrap();
66 let data = encoder.finish().unwrap();
67 self.history_buffer = Vec::with_capacity(self.batch_size);
68 IndexedBatchData {
69 idx_beg,
70 idx_end,
71 data,
72 }
73 }
74}
75
76impl<T, U> Drop for BatchIndexedSerializer<T, U> {
77 fn drop(&mut self) {
78 if !self.history_buffer.is_empty() {
79 eprintln!(
80 "Dropping non-empty batch serializer with {} remaining items.",
81 self.history_buffer.len()
82 );
83 }
84 }
85}