1use std::collections::HashMap;
2
3use crate::api::AnyVal;
4use crate::default_reader_options;
5use crate::wal_segment_capnp::{self, table_segment_list};
6
7#[derive(Default, Clone, Debug)]
8pub struct EventBuffer {
9 pub tables: HashMap<String, TableBuffer>,
10}
11
12#[derive(Default, Clone, Debug)]
13pub struct TableBuffer {
14 pub len: u64,
15 pub columns: HashMap<String, ColumnBuffer>,
16}
17
18#[derive(Default, Clone, Debug)]
19pub struct ColumnBuffer {
20 pub data: ColumnData,
21}
22
/// The storage representation of a column's values.
///
/// Dense variants hold one value per row starting at row 0; sparse variants hold
/// `(row index, value)` pairs for the rows that actually have a value.
#[derive(Default, Debug, Clone)]
pub enum ColumnData {
    /// No non-null value has been recorded yet.
    #[default]
    Empty,
    /// One `f64` per row, starting at row 0, with no gaps.
    Dense(Vec<f64>),
    /// `(row index, f64)` pairs for columns that skip rows.
    Sparse(Vec<(u64, f64)>),
    /// One `i64` per row, starting at row 0, with no gaps.
    I64(Vec<i64>),
    /// `(row index, i64)` pairs for integer columns that skip rows.
    SparseI64(Vec<(u64, i64)>),
    /// One string per row, starting at row 0; string columns have no sparse form.
    String(Vec<String>),
}
33
34impl ColumnData {
35 pub fn len(&self) -> usize {
36 match self {
37 ColumnData::Dense(data) => data.len(),
38 ColumnData::Sparse(data) => data.len(),
39 ColumnData::I64(data) => data.len(),
40 ColumnData::SparseI64(data) => data.len(),
41 ColumnData::String(data) => data.len(),
42 ColumnData::Empty => 0,
43 }
44 }
45
46 #[must_use]
47 pub fn is_empty(&self) -> bool {
48 match self {
49 ColumnData::Dense(data) => data.is_empty(),
50 ColumnData::Sparse(data) => data.is_empty(),
51 ColumnData::I64(data) => data.is_empty(),
52 ColumnData::SparseI64(data) => data.is_empty(),
53 ColumnData::String(data) => data.is_empty(),
54 ColumnData::Empty => true,
55 }
56 }
57}
58
59impl ColumnBuffer {
60 pub fn push(&mut self, value: AnyVal, existing_len: u64) {
61 match (&mut self.data, value) {
62 (_, AnyVal::Null) => {}
63 (ColumnData::Empty, AnyVal::Float(value)) => {
64 if existing_len == 0 {
65 self.data = ColumnData::Dense(vec![value])
66 } else {
67 self.data = ColumnData::Sparse(vec![(existing_len, value)])
68 }
69 }
70 (ColumnData::Empty, AnyVal::Int(value)) => {
71 if existing_len == 0 {
72 self.data = ColumnData::I64(vec![value])
73 } else {
74 self.data = ColumnData::SparseI64(vec![(existing_len, value)])
75 }
76 }
77 (ColumnData::Empty, AnyVal::Str(value)) => {
78 assert!(
79 existing_len == 0,
80 "Sparse columns not currently supported for string"
81 );
82 self.data = ColumnData::String(vec![value])
83 }
84 (ColumnData::Dense(_), AnyVal::Int(int)) => {
85 self.push(AnyVal::Float(int as f64), existing_len)
86 }
87 (ColumnData::Dense(data), AnyVal::Float(value)) => {
88 if data.len() as u64 == existing_len {
89 data.push(value)
90 } else {
91 let mut sparse_data: Vec<(u64, f64)> = data
92 .drain(..)
93 .enumerate()
94 .map(|(i, v)| (i as u64, v))
95 .collect();
96 sparse_data.push((existing_len, value));
97 self.data = ColumnData::Sparse(sparse_data);
98 }
99 }
100 (ColumnData::Sparse(_), AnyVal::Int(value)) => {
101 self.push(AnyVal::Float(value as f64), existing_len)
102 }
103 (ColumnData::Sparse(data), AnyVal::Float(value)) => data.push((existing_len, value)),
104 (ColumnData::I64(data), AnyVal::Int(value)) => {
105 if data.len() as u64 == existing_len {
106 data.push(value)
107 } else {
108 let mut sparse_data: Vec<(u64, i64)> = data
109 .drain(..)
110 .enumerate()
111 .map(|(i, v)| (i as u64, v))
112 .collect();
113 sparse_data.push((existing_len, value));
114 self.data = ColumnData::SparseI64(sparse_data);
115 }
116 }
117 (ColumnData::I64(data), AnyVal::Float(value)) => {
118 self.data = ColumnData::Dense(data.iter().map(|v| *v as f64).collect());
119 self.push(AnyVal::Float(value), existing_len);
120 }
121 (ColumnData::SparseI64(data), AnyVal::Int(value)) => {
122 data.push((existing_len, value));
123 }
124 (ColumnData::SparseI64(data), AnyVal::Float(value)) => {
125 self.data = ColumnData::Sparse(data.iter().map(|(i, v)| (*i, *v as f64)).collect());
126 self.push(AnyVal::Float(value), existing_len);
127 }
128 (ColumnData::String(data), AnyVal::Str(value)) => {
129 assert!(
130 data.len() as u64 == existing_len,
131 "Sparse columns not currently supported for string"
132 );
133 data.push(value)
134 }
135 (column, data) => unimplemented!("Logging value {:?} to column {:?}", data, column),
136 }
137 }
138}
139
140impl EventBuffer {
141 pub fn serialize(&self) -> Vec<u8> {
142 let mut builder = capnp::message::Builder::new_default();
143 let mut table_segment_list =
144 builder.init_root::<wal_segment_capnp::table_segment_list::Builder>();
145 self.serialize_builder(&mut table_segment_list);
146 let mut buf = Vec::new();
147 capnp::serialize_packed::write_message(&mut buf, &builder).unwrap();
148 buf
149 }
150
151 pub fn serialize_builder(&self, table_segment_list: &mut table_segment_list::Builder) {
152 assert!(self.tables.len() < u32::MAX as usize);
153 let mut data = table_segment_list
154 .reborrow()
155 .init_data(self.tables.len() as u32);
156 for (i, (name, table)) in self.tables.iter().enumerate() {
157 let mut table_builder = data.reborrow().get(i as u32);
158 table_builder.set_len(table.len);
159 table_builder.set_name(name);
160 let mut columns = table_builder
161 .reborrow()
162 .init_columns(table.columns.len() as u32);
163 for (j, (colname, column)) in table.columns.iter().enumerate() {
164 let mut column_builder = columns.reborrow().get(j as u32);
165 column_builder.set_name(colname);
166 match &column.data {
167 ColumnData::Dense(f64s) => {
168 column_builder.get_data().set_f64(&f64s[..]).unwrap();
169 }
170 ColumnData::Sparse(sparse) => {
171 let mut sparse_builder = column_builder.get_data().init_sparse_f64();
172 assert!(sparse.len() < u32::MAX as usize);
173 let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
174 sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
175 sparse_builder.reborrow().set_values(&values[..]).unwrap();
176 }
177 ColumnData::I64(i64s) => {
178 column_builder.get_data().set_i64(&i64s[..]).unwrap();
179 }
180 ColumnData::String(strings) => {
181 column_builder.get_data().set_string(&strings[..]).unwrap();
182 }
183 ColumnData::Empty => {
184 column_builder.get_data().set_empty(());
185 }
186 ColumnData::SparseI64(sparse) => {
187 let mut sparse_builder = column_builder.get_data().init_sparse_i64();
188 assert!(sparse.len() < u32::MAX as usize);
189 let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
190 sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
191 sparse_builder.reborrow().set_values(&values[..]).unwrap();
192 }
193 }
194 }
195 }
196 }
197
198 pub fn deserialize(data: &[u8]) -> capnp::Result<Self> {
199 let message_reader = capnp::serialize_packed::read_message(data, default_reader_options())?;
200 let table_segment_list =
201 message_reader.get_root::<wal_segment_capnp::table_segment_list::Reader>()?;
202 let data = EventBuffer::deserialize_reader(table_segment_list)?;
203 Ok(EventBuffer {
204 tables: data.tables,
205 })
206 }
207
208 pub fn deserialize_reader(data: table_segment_list::Reader) -> capnp::Result<Self> {
209 let mut tables = HashMap::<String, TableBuffer>::new();
210 for table in data.get_data()?.iter() {
211 let name = table.get_name()?.to_string().unwrap();
212 let len = table.get_len();
213 let mut columns = HashMap::new();
214 for column in table.get_columns()?.iter() {
215 let colname = column.get_name()?.to_string().unwrap();
216 let data = column.get_data();
217 use crate::wal_segment_capnp::column::data::Which;
218 let data = match data.which()? {
219 Which::F64(f64s) => ColumnData::Dense(f64s?.iter().collect()),
220 Which::SparseF64(sparse) => {
221 let indices = sparse.get_indices()?;
222 let values = sparse.get_values()?;
223 ColumnData::Sparse(indices.iter().zip(values.iter()).collect())
224 }
225 Which::I64(i64s) => ColumnData::I64(i64s?.iter().collect()),
226 Which::String(strs) => {
227 let mut strings = Vec::new();
228 for s in strs?.iter() {
229 strings.push(s?.to_string().unwrap());
230 }
231 ColumnData::String(strings)
232 }
233 Which::Empty(()) => ColumnData::Empty,
234 Which::SparseI64(sparse) => {
235 let indices = sparse.get_indices()?;
236 let values = sparse.get_values()?;
237 ColumnData::SparseI64(indices.iter().zip(values.iter()).collect())
238 }
239 };
240 columns.insert(colname, ColumnBuffer { data });
241 }
242 tables.insert(name, TableBuffer { len, columns });
243 }
244 Ok(EventBuffer { tables })
245 }
246}