1use capnp::serialize_packed;
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, HashSet};
4
5use crate::api_capnp::{self, query_response};
6use crate::default_reader_options;
7
/// Serde-(de)serializable request payload for a column-name lookup.
///
/// NOTE(review): the handler that consumes this type is not part of this
/// file; the field descriptions below are inferred from names only and
/// should be confirmed against it.
#[derive(Serialize, Deserialize, Debug)]
pub struct ColumnNameRequest {
    /// Presumably the tables whose column names are requested — TODO confirm.
    pub tables: Vec<String>,
    /// Optional pattern; presumably filters the returned names — TODO confirm
    /// the matching syntax.
    pub pattern: Option<String>,
    /// Optional offset; presumably used for paging — TODO confirm.
    pub offset: Option<usize>,
    /// Optional limit; presumably caps the number of returned names — TODO confirm.
    pub limit: Option<usize>,
}
15
/// Serde-(de)serializable response payload carrying a list of column names.
///
/// NOTE(review): the producer of this type is not part of this file; the
/// meaning of `offset` and `len` is inferred from names only.
#[derive(Serialize, Deserialize, Debug)]
pub struct ColumnNameResponse {
    /// The returned column names.
    pub columns: Vec<String>,
    /// Presumably the offset this page of names starts at — TODO confirm.
    pub offset: usize,
    /// Presumably a count of names (page or total?) — TODO confirm.
    pub len: usize,
}
22
/// Serde-(de)serializable request carrying a single query string.
#[derive(Serialize, Deserialize, Debug)]
pub struct QueryRequest {
    /// The query text; its syntax is defined by the consumer, which is not
    /// visible in this file.
    pub query: String,
}
27
/// Serde-(de)serializable request carrying several query strings plus
/// optional encoding options.
#[derive(Serialize, Deserialize, Debug)]
pub struct MultiQueryRequest {
    /// The query texts; presumably answered in order — TODO confirm against
    /// the handler.
    pub queries: Vec<String>,
    /// Optional [`EncodingOpts`]; how they are applied is decided by code
    /// outside this file.
    pub encoding_opts: Option<EncodingOpts>,
}
33
/// Result of a single query: a set of columns keyed by column name.
///
/// Besides the serde derives, this type has a packed Cap'n Proto encoding
/// implemented by its `serialize`/`deserialize` methods.
#[derive(Serialize, Deserialize, Debug)]
pub struct QueryResponse {
    /// Column data keyed by column name. Being a `HashMap`, iteration (and
    /// therefore Cap'n Proto serialization) order is unspecified.
    pub columns: HashMap<String, Column>,
}
38
/// Serde-(de)serializable options controlling how response columns are encoded.
///
/// NOTE(review): nothing in this file reads these fields; the descriptions
/// below are inferred from names only and must be confirmed against the
/// encoder that consumes them.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EncodingOpts {
    /// Presumably enables XOR-based compression of float columns (compare
    /// `Column::Xor`) — TODO confirm.
    pub xor_float_compression: bool,
    /// Presumably the number of mantissa bits to keep for floats — TODO confirm.
    pub mantissa: Option<u32>,
    /// Presumably names of columns exempt from precision reduction — TODO confirm.
    pub full_precision_cols: HashSet<String>,
}
45
/// A list of [`QueryResponse`]s transported as one packed Cap'n Proto message
/// (see the `serialize`/`deserialize` methods). Unlike the other payload
/// types in this file it derives neither serde traits nor `Debug`.
pub struct MultiQueryResponse {
    /// The individual responses, in the order they are written to and read
    /// from the message.
    pub responses: Vec<QueryResponse>,
}
49
/// The data of one result column.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Column {
    /// 64-bit float values.
    Float(Vec<f64>),
    /// 64-bit signed integer values.
    Int(Vec<i64>),
    /// String values.
    String(Vec<String>),
    /// Values of heterogeneous type, one [`AnyVal`] per row.
    Mixed(Vec<AnyVal>),
    /// A column without stored data; the payload is the row count reported
    /// by `len()`.
    Null(usize),

    /// Opaque byte buffer written to the `xorF64` field of the wire format.
    /// The row count cannot be derived from it here: `len()` panics for this
    /// variant.
    Xor(Vec<u8>),
}
60
/// A single dynamically typed value, used as the element type of
/// `Column::Mixed`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum AnyVal {
    /// A 64-bit signed integer.
    Int(i64),
    /// A 64-bit float.
    Float(f64),
    /// An owned string.
    Str(String),
    /// The absence of a value.
    Null,
}
68
69impl Column {
70 pub fn size_bytes(&self) -> usize {
71 let heapsize = match self {
72 Column::Float(xs) => xs.len() * std::mem::size_of::<f64>(),
73 Column::Int(xs) => xs.len() * std::mem::size_of::<i64>(),
74 Column::String(xs) => xs.iter().map(|s| s.len()).sum(),
75 Column::Mixed(xs) => xs
76 .iter()
77 .map(|m| match m {
78 AnyVal::Str(s) => s.len() + std::mem::size_of::<AnyVal>(),
79 _ => std::mem::size_of::<AnyVal>(),
80 })
81 .sum(),
82 Column::Null(_) => 0,
83 Column::Xor(xs) => xs.len(),
84 };
85 heapsize + std::mem::size_of::<Column>()
86 }
87
88 pub fn len(&self) -> usize {
89 match self {
90 Column::Float(xs) => xs.len(),
91 Column::Int(xs) => xs.len(),
92 Column::String(xs) => xs.len(),
93 Column::Mixed(xs) => xs.len(),
94 Column::Null(n) => *n,
95 Column::Xor(_) => panic!("len() not implemented for xor compressed columns"),
96 }
97 }
98
99 #[must_use]
100 pub fn is_empty(&self) -> bool {
101 self.len() == 0
102 }
103}
104
105impl MultiQueryResponse {
106 pub fn deserialize(data: &[u8]) -> capnp::Result<MultiQueryResponse> {
107 let message_reader =
108 serialize_packed::read_message(data, default_reader_options())?;
109 let multi_query_response =
110 message_reader.get_root::<api_capnp::multi_query_response::Reader>()?;
111 let mut responses = Vec::new();
112 for query_response in multi_query_response.get_responses()?.iter() {
113 responses.push(QueryResponse::deserialize_reader(query_response)?);
114 }
115 Ok(MultiQueryResponse { responses })
116 }
117
118 pub fn serialize(&self) -> Vec<u8> {
119 let mut builder = capnp::message::Builder::new_default();
120 let multi_query_response = builder.init_root::<api_capnp::multi_query_response::Builder>();
121 let mut responses = multi_query_response.init_responses(self.responses.len() as u32);
122 for (i, response) in self.responses.iter().enumerate() {
123 let mut response_builder = responses.reborrow().get(i as u32);
124 response.serialize_builder(&mut response_builder);
125 }
126 let mut buf = Vec::new();
127 serialize_packed::write_message(&mut buf, &builder).unwrap();
128 buf
129 }
130}
131
132impl QueryResponse {
133 pub fn deserialize(data: &[u8]) -> capnp::Result<QueryResponse> {
134 let message_reader =
135 serialize_packed::read_message(data, default_reader_options()).unwrap();
136 let query_response = message_reader.get_root::<api_capnp::query_response::Reader>()?;
137 QueryResponse::deserialize_reader(query_response)
138 }
139
140 pub fn deserialize_reader(
141 query_response: query_response::Reader,
142 ) -> capnp::Result<QueryResponse> {
143 let mut columns = HashMap::new();
144 for column in query_response.get_columns()?.iter() {
145 let (name, column) = Column::deserialize_reader(column)?;
146 columns.insert(name, column);
147 }
148 Ok(QueryResponse { columns })
149 }
150
151 pub fn serialize(&self) -> Vec<u8> {
152 let mut builder = capnp::message::Builder::new_default();
153 let mut query_response = builder.init_root::<query_response::Builder>();
154 self.serialize_builder(&mut query_response);
155 let mut buf = Vec::new();
156 serialize_packed::write_message(&mut buf, &builder).unwrap();
157 buf
158 }
159
160 pub fn serialize_builder(&self, query_response: &mut query_response::Builder) {
161 let mut columns = query_response
162 .reborrow()
163 .init_columns(self.columns.len() as u32);
164 for (i, (name, column)) in self.columns.iter().enumerate() {
165 let mut column_builder = columns.reborrow().get(i as u32);
166 column.serialize_builder(name, &mut column_builder);
167 }
168 }
169}
170
171impl Column {
172 #[cfg(test)]
173 fn serialize(&self) -> Vec<u8> {
174 let mut builder = capnp::message::Builder::new_default();
175 let mut column = builder.init_root::<api_capnp::column::Builder>();
176 self.serialize_builder("", &mut column);
177 let mut buf = Vec::new();
178 serialize_packed::write_message(&mut buf, &builder).unwrap();
179 buf
180 }
181
182 fn serialize_builder(&self, name: &str, column_builder: &mut api_capnp::column::Builder) {
183 column_builder.set_name(name);
184 match self {
185 Column::Float(xs) => column_builder
186 .reborrow()
187 .init_data()
188 .set_f64(&xs[..])
189 .unwrap(),
190 Column::Int(xs) => {
191 let delta_stats = determine_delta_compressability(&xs[..]);
192 if delta_stats.min_delta == delta_stats.max_delta
193 && delta_stats.max_delta <= i64::MAX as i128
194 && delta_stats.max_delta >= i64::MIN as i128
195 {
196 let mut range = column_builder.reborrow().init_data().init_range();
197 range.set_start(xs[0]);
198 range.set_len(xs.len() as u64);
199 range.set_step(delta_stats.min_delta as i64);
200 } else if delta_stats.min_delta >= i8::MIN as i128
201 && delta_stats.max_delta <= i8::MAX as i128
202 {
203 let mut delta_encoded = column_builder
204 .reborrow()
205 .init_data()
206 .init_delta_encoded_i8();
207 delta_encoded.set_first(xs[0]);
208 delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
209 } else if delta_stats.min_delta_delta >= i8::MIN as i128
210 && delta_stats.max_delta_delta <= i8::MAX as i128
211 {
212 let mut double_delta_encoded = column_builder
213 .reborrow()
214 .init_data()
215 .init_double_delta_encoded_i8();
216 double_delta_encoded.set_first(xs[0]);
217 double_delta_encoded.set_second(xs[1]);
218 double_delta_encoded
219 .set_data(&double_delta_encode(xs)[..])
220 .unwrap();
221 } else if delta_stats.min_delta >= i16::MIN as i128
222 && delta_stats.max_delta <= i16::MAX as i128
223 {
224 let mut delta_encoded = column_builder
225 .reborrow()
226 .init_data()
227 .init_delta_encoded_i16();
228 delta_encoded.set_first(xs[0]);
229 delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
230 } else if delta_stats.min_delta_delta >= i16::MIN as i128
231 && delta_stats.max_delta_delta <= i16::MAX as i128
232 {
233 let mut double_delta_encoded = column_builder
234 .reborrow()
235 .init_data()
236 .init_double_delta_encoded_i16();
237 double_delta_encoded.set_first(xs[0]);
238 double_delta_encoded.set_second(xs[1]);
239 double_delta_encoded
240 .set_data(&double_delta_encode(xs)[..])
241 .unwrap();
242 } else if delta_stats.min_delta >= i32::MIN as i128
243 && delta_stats.max_delta <= i32::MAX as i128
244 {
245 let mut delta_encoded = column_builder
246 .reborrow()
247 .init_data()
248 .init_delta_encoded_i32();
249 delta_encoded.set_first(xs[0]);
250 delta_encoded.set_data(&delta_encode(xs)[..]).unwrap();
251 } else if delta_stats.min_delta_delta >= i32::MIN as i128
252 && delta_stats.max_delta_delta <= i32::MAX as i128
253 {
254 let mut double_delta_encoded = column_builder
255 .reborrow()
256 .init_data()
257 .init_double_delta_encoded_i32();
258 double_delta_encoded.set_first(xs[0]);
259 double_delta_encoded.set_second(xs[1]);
260 double_delta_encoded
261 .set_data(&double_delta_encode(xs)[..])
262 .unwrap();
263 } else {
264 column_builder
265 .reborrow()
266 .init_data()
267 .set_i64(&xs[..])
268 .unwrap()
269 }
270 }
271 Column::String(xs) => {
272 let mut strings = column_builder
273 .reborrow()
274 .init_data()
275 .init_string(xs.len() as u32);
276 for (i, s) in xs.iter().enumerate() {
277 strings.set(i as u32, s);
278 }
279 }
280 Column::Mixed(xs) => {
281 let mut mixed = column_builder
282 .reborrow()
283 .init_data()
284 .init_mixed(xs.len() as u32);
285 for (i, x) in xs.iter().enumerate() {
286 let mut x_builder = mixed.reborrow().get(i as u32);
287 match x {
288 AnyVal::Int(x) => x_builder.set_i64(*x),
289 AnyVal::Float(x) => x_builder.set_f64(*x),
290 AnyVal::Str(x) => x_builder.set_string(x),
291 AnyVal::Null => x_builder.set_null(()),
292 }
293 }
294 }
295 Column::Null(n) => column_builder.reborrow().init_data().set_null(*n as u64),
296 Column::Xor(xs) => column_builder.reborrow().init_data().set_xor_f64(&xs[..]),
297 };
298 }
299
300 #[cfg(test)]
301 fn deserialize(data: &[u8]) -> capnp::Result<(String, Column)> {
302 let message_reader =
303 serialize_packed::read_message(data, default_reader_options()).unwrap();
304 let column = message_reader.get_root::<api_capnp::column::Reader>()?;
305 Column::deserialize_reader(column)
306 }
307
308 fn deserialize_reader(column: api_capnp::column::Reader) -> capnp::Result<(String, Column)> {
309 let name = column.get_name()?.to_string().unwrap();
310 use api_capnp::column::data::Which;
311 let column = match column.get_data().which()? {
312 Which::F64(xs) => Column::Float(xs?.iter().collect()),
313 Which::I64(xs) => Column::Int(xs?.iter().collect()),
314 Which::String(xs) => {
315 let mut strings = Vec::new();
316 for s in xs?.iter() {
317 strings.push(s?.to_string().unwrap());
318 }
319 Column::String(strings)
320 }
321 Which::Mixed(xs) => {
322 let mut mixed = Vec::new();
323 for x in xs?.iter() {
324 use api_capnp::any_val::Which;
325 let x = match x.which()? {
326 Which::I64(x) => AnyVal::Int(x),
327 Which::F64(x) => AnyVal::Float(x),
328 Which::String(x) => AnyVal::Str(x?.to_string().unwrap()),
329 Which::Null(()) => AnyVal::Null,
330 };
331 mixed.push(x);
332 }
333 Column::Mixed(mixed)
334 }
335 Which::Null(xs) => Column::Null(xs as usize),
336 Which::XorF64(xs) => Column::Xor(xs?.to_vec()),
337 Which::DeltaEncodedI8(xs) => {
338 let first = xs.get_first();
339 let data = xs.get_data()?;
340 let mut decoded = Vec::with_capacity(data.len() as usize + 1);
341 decoded.push(first);
342 let mut last = first;
343 for i in data {
344 last += i as i64;
345 decoded.push(last);
346 }
347 Column::Int(decoded)
348 }
349 Which::DeltaEncodedI16(xs) => {
350 let first = xs.get_first();
351 let data = xs.get_data()?;
352 let mut decoded = Vec::with_capacity(data.len() as usize + 1);
353 decoded.push(first);
354 let mut last = first;
355 for i in data {
356 last += i as i64;
357 decoded.push(last);
358 }
359 Column::Int(decoded)
360 }
361 Which::DeltaEncodedI32(xs) => {
362 let first = xs.get_first();
363 let data = xs.get_data()?;
364 let mut decoded = Vec::with_capacity(data.len() as usize + 1);
365 decoded.push(first);
366 let mut last = first;
367 for i in data {
368 last += i as i64;
369 decoded.push(last);
370 }
371 Column::Int(decoded)
372 }
373 Which::DoubleDeltaEncodedI8(xs) => {
374 let first = xs.get_first();
375 let second = xs.get_second();
376 let data = xs.get_data()?;
377 let mut decoded = Vec::with_capacity(data.len() as usize + 2);
378 decoded.push(first);
379 decoded.push(second);
380 let mut last = second;
381 let mut last_delta = second - first;
382 for i in data {
383 last_delta += i as i64;
384 last += last_delta;
385 decoded.push(last);
386 }
387 Column::Int(decoded)
388 }
389 Which::DoubleDeltaEncodedI16(xs) => {
390 let first = xs.get_first();
391 let second = xs.get_second();
392 let data = xs.get_data()?;
393 let mut decoded = Vec::with_capacity(data.len() as usize + 2);
394 decoded.push(first);
395 decoded.push(second);
396 let mut last = second;
397 let mut last_delta = second - first;
398 for i in data {
399 last_delta += i as i64;
400 last += last_delta;
401 decoded.push(last);
402 }
403 Column::Int(decoded)
404 }
405 Which::DoubleDeltaEncodedI32(xs) => {
406 let first = xs.get_first();
407 let second = xs.get_second();
408 let data = xs.get_data()?;
409 let mut decoded = Vec::with_capacity(data.len() as usize + 2);
410 decoded.push(first);
411 decoded.push(second);
412 let mut last = second;
413 let mut last_delta = second - first;
414 for i in data {
415 last_delta += i as i64;
416 last += last_delta;
417 decoded.push(last);
418 }
419 Column::Int(decoded)
420 }
421 Which::Range(xs) => {
422 let start = xs.get_start();
423 let len = xs.get_len() as usize;
424 let step = xs.get_step();
425 let decoded = (0..len).map(|i| start + i as i64 * step).collect();
426 Column::Int(decoded)
427 }
428 };
429 Ok((name, column))
430 }
431}
432
/// Extremes of the first- and second-order differences of an integer
/// sequence, as computed by `determine_delta_compressability`.
///
/// Values are `i128` so that differences of `i64` values can be represented.
#[derive(Debug)]
struct DeltaStats {
    /// Smallest difference between two consecutive values.
    min_delta: i128,
    /// Largest difference between two consecutive values.
    max_delta: i128,
    /// Smallest difference between two consecutive deltas.
    min_delta_delta: i128,
    /// Largest difference between two consecutive deltas.
    max_delta_delta: i128,
}
440
441fn determine_delta_compressability(ints: &[i64]) -> DeltaStats {
442 let mut min_delta;
443 let mut max_delta;
444 let mut min_delta_delta = i128::MAX;
445 let mut max_delta_delta = i128::MIN;
446
447 if ints.len() < 2 {
448 return DeltaStats {
449 min_delta: i128::MIN,
450 max_delta: i128::MAX,
451 min_delta_delta: i128::MIN,
452 max_delta_delta: i128::MAX,
453 };
454 }
455
456 let mut previous = ints[1];
457 let mut previous_delta = (ints[1] - ints[0]) as i128;
458 min_delta = previous_delta;
459 max_delta = previous_delta;
460 for curr in &ints[2..] {
461 let delta = (*curr - previous) as i128;
462 min_delta = min_delta.min(delta);
463 max_delta = max_delta.max(delta);
464 let delta_delta = delta - previous_delta;
465 min_delta_delta = min_delta_delta.min(delta_delta);
466 max_delta_delta = max_delta_delta.max(delta_delta);
467 previous = *curr;
468 previous_delta = delta;
469 }
470
471 DeltaStats {
472 min_delta,
473 max_delta,
474 min_delta_delta,
475 max_delta_delta,
476 }
477}
478
/// Encodes `ints` as the differences between consecutive values, converted
/// to the narrower integer type `T`.
///
/// The result has `ints.len() - 1` elements; empty and single-element inputs
/// yield an empty vector. The first value itself is not part of the output
/// and must be stored separately by the caller.
///
/// Differences are taken with wrapping subtraction so that debug and release
/// builds behave identically; a wrapped difference that fits `T` still
/// reconstructs the input exactly under two's-complement (wrapping) addition.
///
/// # Panics
///
/// Panics if a difference does not fit in `T`. Callers are expected to have
/// checked the delta range beforehand.
fn delta_encode<T>(ints: &[i64]) -> Vec<T>
where
    T: TryFrom<i64>,
    <T as TryFrom<i64>>::Error: std::fmt::Debug,
{
    ints.windows(2)
        .map(|pair| T::try_from(pair[1].wrapping_sub(pair[0])).unwrap())
        .collect()
}
493
/// Encodes `ints` as second-order differences (the change between
/// consecutive deltas), converted to the narrower integer type `T`.
///
/// The result has `ints.len() - 2` elements; inputs with fewer than three
/// values yield an empty vector. The first two values are not part of the
/// output and must be stored separately by the caller.
///
/// Differences are taken with wrapping subtraction so that debug and release
/// builds behave identically; wrapped values that fit `T` still reconstruct
/// the input exactly under two's-complement (wrapping) addition.
///
/// # Panics
///
/// Panics if a second-order difference does not fit in `T`. Callers are
/// expected to have checked the range beforehand.
fn double_delta_encode<T>(ints: &[i64]) -> Vec<T>
where
    T: TryFrom<i64>,
    <T as TryFrom<i64>>::Error: std::fmt::Debug,
{
    ints.windows(3)
        .map(|triple| {
            let previous_delta = triple[1].wrapping_sub(triple[0]);
            let delta = triple[2].wrapping_sub(triple[1]);
            T::try_from(delta.wrapping_sub(previous_delta)).unwrap()
        })
        .collect()
}
511
512pub mod any_val_syntax {
513 pub fn vf64<F>(x: F) -> super::AnyVal
514 where
515 F: TryInto<f64>,
516 <F as TryInto<f64>>::Error: std::fmt::Debug,
517 {
518 super::AnyVal::Float(x.try_into().unwrap())
519 }
520}
521
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Serializes `ints` as an integer column, checks that the packed message
    /// is strictly smaller than `max_bytes`, and checks that decoding yields
    /// the original values.
    fn test_compression(ints: Vec<i64>, max_bytes: usize) {
        let serialized = Column::Int(ints.clone()).serialize();
        let size = serialized.len();
        assert!(size < max_bytes, "took {} bytes", size);

        let (_, deserialized) = Column::deserialize(&serialized).unwrap();
        if let Column::Int(xs) = deserialized {
            assert_eq!(ints, xs);
        } else {
            panic!("expected int column");
        }
    }

    /// A constant step should collapse into the range representation.
    #[test]
    fn test_range_compression() {
        let ints: Vec<i64> = (0..1024).map(|i| -1231429 + i * 241248124).collect();
        test_compression(ints, 64);
    }

    /// Deltas that fit in one byte each.
    #[test]
    fn test_i8_delta_compression() {
        let ints: Vec<i64> = (0..1024).map(|i| -1231429 + i * 32 - i % 32).collect();
        test_compression(ints, 64 + 1024);
    }

    /// Deltas that need two bytes each.
    #[test]
    fn test_i16_delta_compression() {
        let ints: Vec<i64> = (0..1024)
            .map(|i| -1231429 + i * 512 - (i * 7) % 1024)
            .collect();
        test_compression(ints, 64 + 2048);
    }

    /// Large deltas whose second-order differences fit in one byte each.
    #[test]
    fn test_i8_delta_delta_compression() {
        let ints: Vec<i64> = (0..1024)
            .map(|i| -1231429 + i * 3812384134 - (i * 7) % 32)
            .collect();
        test_compression(ints, 64 + 1024);
    }
}