locustdb_serialization/
api.rs

1use capnp::serialize_packed;
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, HashSet};
4
5use crate::api_capnp::{self, query_response};
6use crate::default_reader_options;
7
/// Request payload asking for the column names of the given tables.
#[derive(Serialize, Deserialize, Debug)]
pub struct ColumnNameRequest {
    /// Tables whose column names are requested.
    pub tables: Vec<String>,
    /// Optional filter on column names.
    /// NOTE(review): matching semantics (substring/glob/regex) are decided by
    /// the server and are not visible here — confirm.
    pub offset_placeholder_do_not_use: (),
}
15
16#[derive(Serialize, Deserialize, Debug)]
17pub struct ColumnNameResponse {
18    pub columns: Vec<String>,
19    pub offset: usize,
20    pub len: usize,
21}
22
23#[derive(Serialize, Deserialize, Debug)]
24pub struct QueryRequest {
25    pub query: String,
26}
27
28#[derive(Serialize, Deserialize, Debug)]
29pub struct MultiQueryRequest {
30    pub queries: Vec<String>,
31    pub encoding_opts: Option<EncodingOpts>,
32}
33
34#[derive(Serialize, Deserialize, Debug)]
35pub struct QueryResponse {
36    pub columns: HashMap<String, Column>,
37}
38
39#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct EncodingOpts {
41    pub xor_float_compression: bool,
42    pub mantissa: Option<u32>,
43    pub full_precision_cols: HashSet<String>,
44}
45
46pub struct MultiQueryResponse {
47    pub responses: Vec<QueryResponse>,
48}
49
50#[derive(Serialize, Deserialize, Debug, Clone)]
51pub enum Column {
52    Float(Vec<f64>),
53    Int(Vec<i64>),
54    String(Vec<String>),
55    Mixed(Vec<AnyVal>),
56    Null(usize),
57
58    Xor(Vec<u8>),
59}
60
61#[derive(Serialize, Deserialize, Debug, Clone)]
62pub enum AnyVal {
63    Int(i64),
64    Float(f64),
65    Str(String),
66    Null,
67}
68
69impl Column {
70    pub fn size_bytes(&self) -> usize {
71        let heapsize = match self {
72            Column::Float(xs) => xs.len() * std::mem::size_of::<f64>(),
73            Column::Int(xs) => xs.len() * std::mem::size_of::<i64>(),
74            Column::String(xs) => xs.iter().map(|s| s.len()).sum(),
75            Column::Mixed(xs) => xs
76                .iter()
77                .map(|m| match m {
78                    AnyVal::Str(s) => s.len() + std::mem::size_of::<AnyVal>(),
79                    _ => std::mem::size_of::<AnyVal>(),
80                })
81                .sum(),
82            Column::Null(_) => 0,
83            Column::Xor(xs) => xs.len(),
84        };
85        heapsize + std::mem::size_of::<Column>()
86    }
87
88    pub fn len(&self) -> usize {
89        match self {
90            Column::Float(xs) => xs.len(),
91            Column::Int(xs) => xs.len(),
92            Column::String(xs) => xs.len(),
93            Column::Mixed(xs) => xs.len(),
94            Column::Null(n) => *n,
95            Column::Xor(_) => panic!("len() not implemented for xor compressed columns"),
96        }
97    }
98
99    #[must_use]
100    pub fn is_empty(&self) -> bool {
101        self.len() == 0
102    }
103}
104
105impl MultiQueryResponse {
106    pub fn deserialize(data: &[u8]) -> capnp::Result<MultiQueryResponse> {
107        let message_reader =
108            serialize_packed::read_message(data, default_reader_options())?;
109        let multi_query_response =
110            message_reader.get_root::<api_capnp::multi_query_response::Reader>()?;
111        let mut responses = Vec::new();
112        for query_response in multi_query_response.get_responses()?.iter() {
113            responses.push(QueryResponse::deserialize_reader(query_response)?);
114        }
115        Ok(MultiQueryResponse { responses })
116    }
117
118    pub fn serialize(&self) -> Vec<u8> {
119        let mut builder = capnp::message::Builder::new_default();
120        let multi_query_response = builder.init_root::<api_capnp::multi_query_response::Builder>();
121        let mut responses = multi_query_response.init_responses(self.responses.len() as u32);
122        for (i, response) in self.responses.iter().enumerate() {
123            let mut response_builder = responses.reborrow().get(i as u32);
124            response.serialize_builder(&mut response_builder);
125        }
126        let mut buf = Vec::new();
127        serialize_packed::write_message(&mut buf, &builder).unwrap();
128        buf
129    }
130}
131
132impl QueryResponse {
133    pub fn deserialize(data: &[u8]) -> capnp::Result<QueryResponse> {
134        let message_reader =
135            serialize_packed::read_message(data, default_reader_options()).unwrap();
136        let query_response = message_reader.get_root::<api_capnp::query_response::Reader>()?;
137        QueryResponse::deserialize_reader(query_response)
138    }
139
140    pub fn deserialize_reader(
141        query_response: query_response::Reader,
142    ) -> capnp::Result<QueryResponse> {
143        let mut columns = HashMap::new();
144        for column in query_response.get_columns()?.iter() {
145            let (name, column) = Column::deserialize_reader(column)?;
146            columns.insert(name, column);
147        }
148        Ok(QueryResponse { columns })
149    }
150
151    pub fn serialize(&self) -> Vec<u8> {
152        let mut builder = capnp::message::Builder::new_default();
153        let mut query_response = builder.init_root::<query_response::Builder>();
154        self.serialize_builder(&mut query_response);
155        let mut buf = Vec::new();
156        serialize_packed::write_message(&mut buf, &builder).unwrap();
157        buf
158    }
159
160    pub fn serialize_builder(&self, query_response: &mut query_response::Builder) {
161        let mut columns = query_response
162            .reborrow()
163            .init_columns(self.columns.len() as u32);
164        for (i, (name, column)) in self.columns.iter().enumerate() {
165            let mut column_builder = columns.reborrow().get(i as u32);
166            column.serialize_builder(name, &mut column_builder);
167        }
168    }
169}
170
171impl Column {
172    #[cfg(test)]
173    fn serialize(&self) -> Vec<u8> {
174        let mut builder = capnp::message::Builder::new_default();
175        let mut column = builder.init_root::<api_capnp::column::Builder>();
176        self.serialize_builder("", &mut column);
177        let mut buf = Vec::new();
178        serialize_packed::write_message(&mut buf, &builder).unwrap();
179        buf
180    }
181
182    fn serialize_builder(&self, name: &str, column_builder: &mut api_capnp::column::Builder) {
183        column_builder.set_name(name);
184        match self {
185            Column::Float(xs) => column_builder
186                .reborrow()
187                .init_data()
188                .set_f64(&xs[..])
189                .unwrap(),
190            Column::Int(xs) => {
191                let delta_stats = determine_delta_compressability(&xs[..]);
192                if delta_stats.min_delta == delta_stats.max_delta
193                    && delta_stats.max_delta <= i64::MAX as i128
194                    && delta_stats.max_delta >= i64::MIN as i128
195                {
196                    let mut range = column_builder.reborrow().init_data().init_range();
197                    range.set_start(xs[0]);
198                    range.set_len(xs.len() as u64);
199                    range.set_step(delta_stats.min_delta as i64);
200                } else if delta_stats.min_delta >= i8::MIN as i128
201                    && delta_stats.max_delta <= i8::MAX as i128
202                {
203                    let mut delta_encoded = column_builder
204                        .reborrow()
205                        .init_data()
206                        .init_delta_encoded_i8();
207                    delta_encoded.set_first(xs[0]);
208                    delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
209                } else if delta_stats.min_delta_delta >= i8::MIN as i128
210                    && delta_stats.max_delta_delta <= i8::MAX as i128
211                {
212                    let mut double_delta_encoded = column_builder
213                        .reborrow()
214                        .init_data()
215                        .init_double_delta_encoded_i8();
216                    double_delta_encoded.set_first(xs[0]);
217                    double_delta_encoded.set_second(xs[1]);
218                    double_delta_encoded
219                        .set_data(&double_delta_encode(xs)[..])
220                        .unwrap();
221                } else if delta_stats.min_delta >= i16::MIN as i128
222                    && delta_stats.max_delta <= i16::MAX as i128
223                {
224                    let mut delta_encoded = column_builder
225                        .reborrow()
226                        .init_data()
227                        .init_delta_encoded_i16();
228                    delta_encoded.set_first(xs[0]);
229                    delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
230                } else if delta_stats.min_delta_delta >= i16::MIN as i128
231                    && delta_stats.max_delta_delta <= i16::MAX as i128
232                {
233                    let mut double_delta_encoded = column_builder
234                        .reborrow()
235                        .init_data()
236                        .init_double_delta_encoded_i16();
237                    double_delta_encoded.set_first(xs[0]);
238                    double_delta_encoded.set_second(xs[1]);
239                    double_delta_encoded
240                        .set_data(&double_delta_encode(xs)[..])
241                        .unwrap();
242                } else if delta_stats.min_delta >= i32::MIN as i128
243                    && delta_stats.max_delta <= i32::MAX as i128
244                {
245                    let mut delta_encoded = column_builder
246                        .reborrow()
247                        .init_data()
248                        .init_delta_encoded_i32();
249                    delta_encoded.set_first(xs[0]);
250                    delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
251                } else if delta_stats.min_delta_delta >= i32::MIN as i128
252                    && delta_stats.max_delta_delta <= i32::MAX as i128
253                {
254                    let mut double_delta_encoded = column_builder
255                        .reborrow()
256                        .init_data()
257                        .init_double_delta_encoded_i32();
258                    double_delta_encoded.set_first(xs[0]);
259                    double_delta_encoded.set_second(xs[1]);
260                    double_delta_encoded
261                        .set_data(&double_delta_encode(xs)[..])
262                        .unwrap();
263                } else {
264                    column_builder
265                        .reborrow()
266                        .init_data()
267                        .set_i64(&xs[..])
268                        .unwrap()
269                }
270            }
271            Column::String(xs) => {
272                let mut strings = column_builder
273                    .reborrow()
274                    .init_data()
275                    .init_string(xs.len() as u32);
276                for (i, s) in xs.iter().enumerate() {
277                    strings.set(i as u32, s);
278                }
279            }
280            Column::Mixed(xs) => {
281                let mut mixed = column_builder
282                    .reborrow()
283                    .init_data()
284                    .init_mixed(xs.len() as u32);
285                for (i, x) in xs.iter().enumerate() {
286                    let mut x_builder = mixed.reborrow().get(i as u32);
287                    match x {
288                        AnyVal::Int(x) => x_builder.set_i64(*x),
289                        AnyVal::Float(x) => x_builder.set_f64(*x),
290                        AnyVal::Str(x) => x_builder.set_string(x),
291                        AnyVal::Null => x_builder.set_null(()),
292                    }
293                }
294            }
295            Column::Null(n) => column_builder.reborrow().init_data().set_null(*n as u64),
296            Column::Xor(xs) => column_builder.reborrow().init_data().set_xor_f64(&xs[..]),
297        };
298    }
299
300    #[cfg(test)]
301    fn deserialize(data: &[u8]) -> capnp::Result<(String, Column)> {
302        let message_reader =
303            serialize_packed::read_message(data, default_reader_options()).unwrap();
304        let column = message_reader.get_root::<api_capnp::column::Reader>()?;
305        Column::deserialize_reader(column)
306    }
307
308    fn deserialize_reader(column: api_capnp::column::Reader) -> capnp::Result<(String, Column)> {
309        let name = column.get_name()?.to_string().unwrap();
310        use api_capnp::column::data::Which;
311        let column = match column.get_data().which()? {
312            Which::F64(xs) => Column::Float(xs?.iter().collect()),
313            Which::I64(xs) => Column::Int(xs?.iter().collect()),
314            Which::String(xs) => {
315                let mut strings = Vec::new();
316                for s in xs?.iter() {
317                    strings.push(s?.to_string().unwrap());
318                }
319                Column::String(strings)
320            }
321            Which::Mixed(xs) => {
322                let mut mixed = Vec::new();
323                for x in xs?.iter() {
324                    use api_capnp::any_val::Which;
325                    let x = match x.which()? {
326                        Which::I64(x) => AnyVal::Int(x),
327                        Which::F64(x) => AnyVal::Float(x),
328                        Which::String(x) => AnyVal::Str(x?.to_string().unwrap()),
329                        Which::Null(()) => AnyVal::Null,
330                    };
331                    mixed.push(x);
332                }
333                Column::Mixed(mixed)
334            }
335            Which::Null(xs) => Column::Null(xs as usize),
336            Which::XorF64(xs) => Column::Xor(xs?.to_vec()),
337            Which::DeltaEncodedI8(xs) => {
338                let first = xs.get_first();
339                let data = xs.get_data()?;
340                let mut decoded = Vec::with_capacity(data.len() as usize + 1);
341                decoded.push(first);
342                let mut last = first;
343                for i in data {
344                    last += i as i64;
345                    decoded.push(last);
346                }
347                Column::Int(decoded)
348            }
349            Which::DeltaEncodedI16(xs) => {
350                let first = xs.get_first();
351                let data = xs.get_data()?;
352                let mut decoded = Vec::with_capacity(data.len() as usize + 1);
353                decoded.push(first);
354                let mut last = first;
355                for i in data {
356                    last += i as i64;
357                    decoded.push(last);
358                }
359                Column::Int(decoded)
360            }
361            Which::DeltaEncodedI32(xs) => {
362                let first = xs.get_first();
363                let data = xs.get_data()?;
364                let mut decoded = Vec::with_capacity(data.len() as usize + 1);
365                decoded.push(first);
366                let mut last = first;
367                for i in data {
368                    last += i as i64;
369                    decoded.push(last);
370                }
371                Column::Int(decoded)
372            }
373            Which::DoubleDeltaEncodedI8(xs) => {
374                let first = xs.get_first();
375                let second = xs.get_second();
376                let data = xs.get_data()?;
377                let mut decoded = Vec::with_capacity(data.len() as usize + 2);
378                decoded.push(first);
379                decoded.push(second);
380                let mut last = second;
381                let mut last_delta = second - first;
382                for i in data {
383                    last_delta += i as i64;
384                    last += last_delta;
385                    decoded.push(last);
386                }
387                Column::Int(decoded)
388            }
389            Which::DoubleDeltaEncodedI16(xs) => {
390                let first = xs.get_first();
391                let second = xs.get_second();
392                let data = xs.get_data()?;
393                let mut decoded = Vec::with_capacity(data.len() as usize + 2);
394                decoded.push(first);
395                decoded.push(second);
396                let mut last = second;
397                let mut last_delta = second - first;
398                for i in data {
399                    last_delta += i as i64;
400                    last += last_delta;
401                    decoded.push(last);
402                }
403                Column::Int(decoded)
404            }
405            Which::DoubleDeltaEncodedI32(xs) => {
406                let first = xs.get_first();
407                let second = xs.get_second();
408                let data = xs.get_data()?;
409                let mut decoded = Vec::with_capacity(data.len() as usize + 2);
410                decoded.push(first);
411                decoded.push(second);
412                let mut last = second;
413                let mut last_delta = second - first;
414                for i in data {
415                    last_delta += i as i64;
416                    last += last_delta;
417                    decoded.push(last);
418                }
419                Column::Int(decoded)
420            }
421            Which::Range(xs) => {
422                let start = xs.get_start();
423                let len = xs.get_len() as usize;
424                let step = xs.get_step();
425                let decoded = (0..len).map(|i| start + i as i64 * step).collect();
426                Column::Int(decoded)
427            }
428        };
429        Ok((name, column))
430    }
431}
432
433#[derive(Debug)]
434struct DeltaStats {
435    min_delta: i128,
436    max_delta: i128,
437    min_delta_delta: i128,
438    max_delta_delta: i128,
439}
440
441fn determine_delta_compressability(ints: &[i64]) -> DeltaStats {
442    let mut min_delta;
443    let mut max_delta;
444    let mut min_delta_delta = i128::MAX;
445    let mut max_delta_delta = i128::MIN;
446
447    if ints.len() < 2 {
448        return DeltaStats {
449            min_delta: i128::MIN,
450            max_delta: i128::MAX,
451            min_delta_delta: i128::MIN,
452            max_delta_delta: i128::MAX,
453        };
454    }
455
456    let mut previous = ints[1];
457    let mut previous_delta = (ints[1] - ints[0]) as i128;
458    min_delta = previous_delta;
459    max_delta = previous_delta;
460    for curr in &ints[2..] {
461        let delta = (*curr - previous) as i128;
462        min_delta = min_delta.min(delta);
463        max_delta = max_delta.max(delta);
464        let delta_delta = delta - previous_delta;
465        min_delta_delta = min_delta_delta.min(delta_delta);
466        max_delta_delta = max_delta_delta.max(delta_delta);
467        previous = *curr;
468        previous_delta = delta;
469    }
470
471    DeltaStats {
472        min_delta,
473        max_delta,
474        min_delta_delta,
475        max_delta_delta,
476    }
477}
478
479fn delta_encode<T>(ints: &[i64]) -> Vec<T>
480where
481    T: TryFrom<i64>,
482    <T as TryFrom<i64>>::Error: std::fmt::Debug,
483{
484    let mut encoded = Vec::with_capacity(ints.len());
485    let mut previous = ints[0];
486    for curr in &ints[1..] {
487        let delta = *curr - previous;
488        encoded.push(T::try_from(delta).unwrap());
489        previous = *curr;
490    }
491    encoded
492}
493
494fn double_delta_encode<T>(ints: &[i64]) -> Vec<T>
495where
496    T: TryFrom<i64>,
497    <T as TryFrom<i64>>::Error: std::fmt::Debug,
498{
499    let mut encoded = Vec::with_capacity(ints.len());
500    let mut previous = ints[1];
501    let mut previous_delta = ints[1] - ints[0];
502    for curr in &ints[2..] {
503        let delta = curr - previous;
504        let delta_delta = delta - previous_delta;
505        encoded.push(T::try_from(delta_delta).unwrap());
506        previous = *curr;
507        previous_delta = delta;
508    }
509    encoded
510}
511
512pub mod any_val_syntax {
513    pub fn vf64<F>(x: F) -> super::AnyVal
514    where
515        F: TryInto<f64>,
516        <F as TryInto<f64>>::Error: std::fmt::Debug,
517    {
518        super::AnyVal::Float(x.try_into().unwrap())
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use pretty_assertions::assert_eq;
526
527    fn test_compression(ints: Vec<i64>, max_bytes: usize) {
528        let column = Column::Int(ints.clone());
529        let serialized = column.serialize();
530        assert!(serialized.len() < max_bytes, "took {} bytes", serialized.len());
531        let (_, deserialized) = Column::deserialize(&serialized).unwrap();
532        match deserialized {
533            Column::Int(xs) => assert_eq!(ints, xs),
534            _ => panic!("expected int column"),
535        }
536    }
537
538    #[test]
539    fn test_range_compression() {
540        test_compression(
541            (0..1024)
542                .map(|i| -1231429 + i * 241248124)
543                .collect::<Vec<_>>(),
544            64,
545        );
546    }
547
548    #[test]
549    fn test_i8_delta_compression() {
550        test_compression(
551            (0..1024)
552                .map(|i| -1231429 + i * 32 - i % 32)
553                .collect::<Vec<_>>(),
554            64 + 1024,
555        );
556    }
557
558    #[test]
559    fn test_i16_delta_compression() {
560        test_compression(
561            (0..1024)
562                .map(|i| -1231429 + i * 512 - (i * 7) % 1024)
563                .collect::<Vec<_>>(),
564            64 + 2048,
565        );
566    }
567
568    #[test]
569    fn test_i8_delta_delta_compression() {
570        test_compression(
571            (0..1024)
572                .map(|i| -1231429 + i * 3812384134 - (i * 7) % 32)
573                .collect::<Vec<_>>(),
574            64 + 1024,
575        );
576    }
577}