locustdb_serialization/
event_buffer.rs

1use std::collections::HashMap;
2
3use crate::api::AnyVal;
4use crate::default_reader_options;
5use crate::wal_segment_capnp::{self, table_segment_list};
6
/// A batch of buffered table data, keyed by table name.
///
/// Converted to and from a Cap'n Proto `table_segment_list` message by
/// [`EventBuffer::serialize`] and [`EventBuffer::deserialize`].
#[derive(Default, Clone, Debug, PartialEq)]
pub struct EventBuffer {
    /// Buffered data per table; the map key is written out as the table's name.
    pub tables: HashMap<String, TableBuffer>,
}

/// Buffered columns belonging to a single table.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct TableBuffer {
    /// Length of the table, stored verbatim in the serialized form.
    ///
    /// NOTE(review): presumably the number of rows logged so far — confirm
    /// against the code that fills the buffer.
    pub len: u64,
    /// Column buffers; the map key is written out as the column's name.
    pub columns: HashMap<String, ColumnBuffer>,
}

/// Values accumulated for a single column.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ColumnBuffer {
    /// The stored values, in whichever representation currently fits them.
    pub data: ColumnData,
}

/// Storage representation of a column's values.
///
/// Equality is structural; float variants follow `f64` semantics, so a column
/// containing `NaN` never compares equal to anything (including itself).
#[derive(Clone, Debug, Default, PartialEq)]
pub enum ColumnData {
    /// No values have been stored.
    #[default]
    Empty,
    /// Floats stored contiguously; the position in the vector is the row index.
    Dense(Vec<f64>),
    /// Floats stored as `(row index, value)` pairs.
    Sparse(Vec<(u64, f64)>),
    /// Integers stored contiguously; the position in the vector is the row index.
    I64(Vec<i64>),
    /// Integers stored as `(row index, value)` pairs.
    SparseI64(Vec<(u64, i64)>),
    /// Strings stored contiguously; the position in the vector is the row index.
    String(Vec<String>),
}

impl ColumnData {
    /// Returns the number of values physically stored.
    ///
    /// For the sparse variants this is the number of `(index, value)` entries,
    /// not the largest row index.
    #[must_use]
    pub fn len(&self) -> usize {
        match self {
            ColumnData::Empty => 0,
            ColumnData::Dense(values) => values.len(),
            ColumnData::Sparse(entries) => entries.len(),
            ColumnData::I64(values) => values.len(),
            ColumnData::SparseI64(entries) => entries.len(),
            ColumnData::String(values) => values.len(),
        }
    }

    /// Returns `true` when no values are stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
58
59impl ColumnBuffer {
60    pub fn push(&mut self, value: AnyVal, existing_len: u64) {
61        match (&mut self.data, value) {
62            (_, AnyVal::Null) => {}
63            (ColumnData::Empty, AnyVal::Float(value)) => {
64                if existing_len == 0 {
65                    self.data = ColumnData::Dense(vec![value])
66                } else {
67                    self.data = ColumnData::Sparse(vec![(existing_len, value)])
68                }
69            }
70            (ColumnData::Empty, AnyVal::Int(value)) => {
71                if existing_len == 0 {
72                    self.data = ColumnData::I64(vec![value])
73                } else {
74                    self.data = ColumnData::SparseI64(vec![(existing_len, value)])
75                }
76            }
77            (ColumnData::Empty, AnyVal::Str(value)) => {
78                assert!(
79                    existing_len == 0,
80                    "Sparse columns not currently supported for string"
81                );
82                self.data = ColumnData::String(vec![value])
83            }
84            (ColumnData::Dense(_), AnyVal::Int(int)) => {
85                self.push(AnyVal::Float(int as f64), existing_len)
86            }
87            (ColumnData::Dense(data), AnyVal::Float(value)) => {
88                if data.len() as u64 == existing_len {
89                    data.push(value)
90                } else {
91                    let mut sparse_data: Vec<(u64, f64)> = data
92                        .drain(..)
93                        .enumerate()
94                        .map(|(i, v)| (i as u64, v))
95                        .collect();
96                    sparse_data.push((existing_len, value));
97                    self.data = ColumnData::Sparse(sparse_data);
98                }
99            }
100            (ColumnData::Sparse(_), AnyVal::Int(value)) => {
101                self.push(AnyVal::Float(value as f64), existing_len)
102            }
103            (ColumnData::Sparse(data), AnyVal::Float(value)) => data.push((existing_len, value)),
104            (ColumnData::I64(data), AnyVal::Int(value)) => {
105                if data.len() as u64 == existing_len {
106                    data.push(value)
107                } else {
108                    let mut sparse_data: Vec<(u64, i64)> = data
109                        .drain(..)
110                        .enumerate()
111                        .map(|(i, v)| (i as u64, v))
112                        .collect();
113                    sparse_data.push((existing_len, value));
114                    self.data = ColumnData::SparseI64(sparse_data);
115                }
116            }
117            (ColumnData::I64(data), AnyVal::Float(value)) => {
118                self.data = ColumnData::Dense(data.iter().map(|v| *v as f64).collect());
119                self.push(AnyVal::Float(value), existing_len);
120            }
121            (ColumnData::SparseI64(data), AnyVal::Int(value)) => {
122                data.push((existing_len, value));
123            }
124            (ColumnData::SparseI64(data), AnyVal::Float(value)) => {
125                self.data = ColumnData::Sparse(data.iter().map(|(i, v)| (*i, *v as f64)).collect());
126                self.push(AnyVal::Float(value), existing_len);
127            }
128            (ColumnData::String(data), AnyVal::Str(value)) => {
129                assert!(
130                    data.len() as u64 == existing_len,
131                    "Sparse columns not currently supported for string"
132                );
133                data.push(value)
134            }
135            (column, data) => unimplemented!("Logging value {:?} to column {:?}", data, column),
136        }
137    }
138}
139
140impl EventBuffer {
141    pub fn serialize(&self) -> Vec<u8> {
142        let mut builder = capnp::message::Builder::new_default();
143        let mut table_segment_list =
144            builder.init_root::<wal_segment_capnp::table_segment_list::Builder>();
145        self.serialize_builder(&mut table_segment_list);
146        let mut buf = Vec::new();
147        capnp::serialize_packed::write_message(&mut buf, &builder).unwrap();
148        buf
149    }
150
151    pub fn serialize_builder(&self, table_segment_list: &mut table_segment_list::Builder) {
152        assert!(self.tables.len() < u32::MAX as usize);
153        let mut data = table_segment_list
154            .reborrow()
155            .init_data(self.tables.len() as u32);
156        for (i, (name, table)) in self.tables.iter().enumerate() {
157            let mut table_builder = data.reborrow().get(i as u32);
158            table_builder.set_len(table.len);
159            table_builder.set_name(name);
160            let mut columns = table_builder
161                .reborrow()
162                .init_columns(table.columns.len() as u32);
163            for (j, (colname, column)) in table.columns.iter().enumerate() {
164                let mut column_builder = columns.reborrow().get(j as u32);
165                column_builder.set_name(colname);
166                match &column.data {
167                    ColumnData::Dense(f64s) => {
168                        column_builder.get_data().set_f64(&f64s[..]).unwrap();
169                    }
170                    ColumnData::Sparse(sparse) => {
171                        let mut sparse_builder = column_builder.get_data().init_sparse_f64();
172                        assert!(sparse.len() < u32::MAX as usize);
173                        let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
174                        sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
175                        sparse_builder.reborrow().set_values(&values[..]).unwrap();
176                    }
177                    ColumnData::I64(i64s) => {
178                        column_builder.get_data().set_i64(&i64s[..]).unwrap();
179                    }
180                    ColumnData::String(strings) => {
181                        column_builder.get_data().set_string(&strings[..]).unwrap();
182                    }
183                    ColumnData::Empty => {
184                        column_builder.get_data().set_empty(());
185                    }
186                    ColumnData::SparseI64(sparse) => {
187                        let mut sparse_builder = column_builder.get_data().init_sparse_i64();
188                        assert!(sparse.len() < u32::MAX as usize);
189                        let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
190                        sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
191                        sparse_builder.reborrow().set_values(&values[..]).unwrap();
192                    }
193                }
194            }
195        }
196    }
197
198    pub fn deserialize(data: &[u8]) -> capnp::Result<Self> {
199        let message_reader = capnp::serialize_packed::read_message(data, default_reader_options())?;
200        let table_segment_list =
201            message_reader.get_root::<wal_segment_capnp::table_segment_list::Reader>()?;
202        let data = EventBuffer::deserialize_reader(table_segment_list)?;
203        Ok(EventBuffer {
204            tables: data.tables,
205        })
206    }
207
208    pub fn deserialize_reader(data: table_segment_list::Reader) -> capnp::Result<Self> {
209        let mut tables = HashMap::<String, TableBuffer>::new();
210        for table in data.get_data()?.iter() {
211            let name = table.get_name()?.to_string().unwrap();
212            let len = table.get_len();
213            let mut columns = HashMap::new();
214            for column in table.get_columns()?.iter() {
215                let colname = column.get_name()?.to_string().unwrap();
216                let data = column.get_data();
217                use crate::wal_segment_capnp::column::data::Which;
218                let data = match data.which()? {
219                    Which::F64(f64s) => ColumnData::Dense(f64s?.iter().collect()),
220                    Which::SparseF64(sparse) => {
221                        let indices = sparse.get_indices()?;
222                        let values = sparse.get_values()?;
223                        ColumnData::Sparse(indices.iter().zip(values.iter()).collect())
224                    }
225                    Which::I64(i64s) => ColumnData::I64(i64s?.iter().collect()),
226                    Which::String(strs) => {
227                        let mut strings = Vec::new();
228                        for s in strs?.iter() {
229                            strings.push(s?.to_string().unwrap());
230                        }
231                        ColumnData::String(strings)
232                    }
233                    Which::Empty(()) => ColumnData::Empty,
234                    Which::SparseI64(sparse) => {
235                        let indices = sparse.get_indices()?;
236                        let values = sparse.get_values()?;
237                        ColumnData::SparseI64(indices.iter().zip(values.iter()).collect())
238                    }
239                };
240                columns.insert(colname, ColumnBuffer { data });
241            }
242            tables.insert(name, TableBuffer { len, columns });
243        }
244        Ok(EventBuffer { tables })
245    }
246}