locustdb-serialization 0.2.2

Serialization formats used by LocustDB for peristent storage and client/server communication.
Documentation
use std::collections::HashMap;

use crate::api::AnyVal;
use crate::default_reader_options;
use crate::wal_segment_capnp::{self, table_segment_list};

#[derive(Default, Clone, Debug)]
pub struct EventBuffer {
    pub tables: HashMap<String, TableBuffer>,
}

#[derive(Default, Clone, Debug)]
pub struct TableBuffer {
    pub len: u64,
    pub columns: HashMap<String, ColumnBuffer>,
}

#[derive(Default, Clone, Debug)]
pub struct ColumnBuffer {
    pub data: ColumnData,
}

#[derive(Clone, Debug, Default)]
pub enum ColumnData {
    #[default]
    Empty,
    Dense(Vec<f64>),
    Sparse(Vec<(u64, f64)>),
    I64(Vec<i64>),
    SparseI64(Vec<(u64, i64)>),
    String(Vec<String>),
}

impl ColumnData {
    pub fn len(&self) -> usize {
        match self {
            ColumnData::Dense(data) => data.len(),
            ColumnData::Sparse(data) => data.len(),
            ColumnData::I64(data) => data.len(),
            ColumnData::SparseI64(data) => data.len(),
            ColumnData::String(data) => data.len(),
            ColumnData::Empty => 0,
        }
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        match self {
            ColumnData::Dense(data) => data.is_empty(),
            ColumnData::Sparse(data) => data.is_empty(),
            ColumnData::I64(data) => data.is_empty(),
            ColumnData::SparseI64(data) => data.is_empty(),
            ColumnData::String(data) => data.is_empty(),
            ColumnData::Empty => true,
        }
    }
}

impl ColumnBuffer {
    pub fn push(&mut self, value: AnyVal, existing_len: u64) {
        match (&mut self.data, value) {
            (_, AnyVal::Null) => {}
            (ColumnData::Empty, AnyVal::Float(value)) => {
                if existing_len == 0 {
                    self.data = ColumnData::Dense(vec![value])
                } else {
                    self.data = ColumnData::Sparse(vec![(existing_len, value)])
                }
            }
            (ColumnData::Empty, AnyVal::Int(value)) => {
                if existing_len == 0 {
                    self.data = ColumnData::I64(vec![value])
                } else {
                    self.data = ColumnData::SparseI64(vec![(existing_len, value)])
                }
            }
            (ColumnData::Empty, AnyVal::Str(value)) => {
                assert!(
                    existing_len == 0,
                    "Sparse columns not currently supported for string"
                );
                self.data = ColumnData::String(vec![value])
            }
            (ColumnData::Dense(_), AnyVal::Int(int)) => {
                self.push(AnyVal::Float(int as f64), existing_len)
            }
            (ColumnData::Dense(data), AnyVal::Float(value)) => {
                if data.len() as u64 == existing_len {
                    data.push(value)
                } else {
                    let mut sparse_data: Vec<(u64, f64)> = data
                        .drain(..)
                        .enumerate()
                        .map(|(i, v)| (i as u64, v))
                        .collect();
                    sparse_data.push((existing_len, value));
                    self.data = ColumnData::Sparse(sparse_data);
                }
            }
            (ColumnData::Sparse(_), AnyVal::Int(value)) => {
                self.push(AnyVal::Float(value as f64), existing_len)
            }
            (ColumnData::Sparse(data), AnyVal::Float(value)) => data.push((existing_len, value)),
            (ColumnData::I64(data), AnyVal::Int(value)) => {
                if data.len() as u64 == existing_len {
                    data.push(value)
                } else {
                    let mut sparse_data: Vec<(u64, i64)> = data
                        .drain(..)
                        .enumerate()
                        .map(|(i, v)| (i as u64, v))
                        .collect();
                    sparse_data.push((existing_len, value));
                    self.data = ColumnData::SparseI64(sparse_data);
                }
            }
            (ColumnData::I64(data), AnyVal::Float(value)) => {
                self.data = ColumnData::Dense(data.iter().map(|v| *v as f64).collect());
                self.push(AnyVal::Float(value), existing_len);
            }
            (ColumnData::SparseI64(data), AnyVal::Int(value)) => {
                data.push((existing_len, value));
            }
            (ColumnData::SparseI64(data), AnyVal::Float(value)) => {
                self.data = ColumnData::Sparse(data.iter().map(|(i, v)| (*i, *v as f64)).collect());
                self.push(AnyVal::Float(value), existing_len);
            }
            (ColumnData::String(data), AnyVal::Str(value)) => {
                assert!(
                    data.len() as u64 == existing_len,
                    "Sparse columns not currently supported for string"
                );
                data.push(value)
            }
            (column, data) => unimplemented!("Logging value {:?} to column {:?}", data, column),
        }
    }
}

impl EventBuffer {
    pub fn serialize(&self) -> Vec<u8> {
        let mut builder = capnp::message::Builder::new_default();
        let mut table_segment_list =
            builder.init_root::<wal_segment_capnp::table_segment_list::Builder>();
        self.serialize_builder(&mut table_segment_list);
        let mut buf = Vec::new();
        capnp::serialize_packed::write_message(&mut buf, &builder).unwrap();
        buf
    }

    pub fn serialize_builder(&self, table_segment_list: &mut table_segment_list::Builder) {
        assert!(self.tables.len() < u32::MAX as usize);
        let mut data = table_segment_list
            .reborrow()
            .init_data(self.tables.len() as u32);
        for (i, (name, table)) in self.tables.iter().enumerate() {
            let mut table_builder = data.reborrow().get(i as u32);
            table_builder.set_len(table.len);
            table_builder.set_name(name);
            let mut columns = table_builder
                .reborrow()
                .init_columns(table.columns.len() as u32);
            for (j, (colname, column)) in table.columns.iter().enumerate() {
                let mut column_builder = columns.reborrow().get(j as u32);
                column_builder.set_name(colname);
                match &column.data {
                    ColumnData::Dense(f64s) => {
                        column_builder.get_data().set_f64(&f64s[..]).unwrap();
                    }
                    ColumnData::Sparse(sparse) => {
                        let mut sparse_builder = column_builder.get_data().init_sparse_f64();
                        assert!(sparse.len() < u32::MAX as usize);
                        let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
                        sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
                        sparse_builder.reborrow().set_values(&values[..]).unwrap();
                    }
                    ColumnData::I64(i64s) => {
                        column_builder.get_data().set_i64(&i64s[..]).unwrap();
                    }
                    ColumnData::String(strings) => {
                        column_builder.get_data().set_string(&strings[..]).unwrap();
                    }
                    ColumnData::Empty => {
                        column_builder.get_data().set_empty(());
                    }
                    ColumnData::SparseI64(sparse) => {
                        let mut sparse_builder = column_builder.get_data().init_sparse_i64();
                        assert!(sparse.len() < u32::MAX as usize);
                        let (indices, values): (Vec<_>, Vec<_>) = sparse.iter().cloned().unzip();
                        sparse_builder.reborrow().set_indices(&indices[..]).unwrap();
                        sparse_builder.reborrow().set_values(&values[..]).unwrap();
                    }
                }
            }
        }
    }

    pub fn deserialize(data: &[u8]) -> capnp::Result<Self> {
        let message_reader = capnp::serialize_packed::read_message(data, default_reader_options())?;
        let table_segment_list =
            message_reader.get_root::<wal_segment_capnp::table_segment_list::Reader>()?;
        let data = EventBuffer::deserialize_reader(table_segment_list)?;
        Ok(EventBuffer {
            tables: data.tables,
        })
    }

    pub fn deserialize_reader(data: table_segment_list::Reader) -> capnp::Result<Self> {
        let mut tables = HashMap::<String, TableBuffer>::new();
        for table in data.get_data()?.iter() {
            let name = table.get_name()?.to_string().unwrap();
            let len = table.get_len();
            let mut columns = HashMap::new();
            for column in table.get_columns()?.iter() {
                let colname = column.get_name()?.to_string().unwrap();
                let data = column.get_data();
                use crate::wal_segment_capnp::column::data::Which;
                let data = match data.which()? {
                    Which::F64(f64s) => ColumnData::Dense(f64s?.iter().collect()),
                    Which::SparseF64(sparse) => {
                        let indices = sparse.get_indices()?;
                        let values = sparse.get_values()?;
                        ColumnData::Sparse(indices.iter().zip(values.iter()).collect())
                    }
                    Which::I64(i64s) => ColumnData::I64(i64s?.iter().collect()),
                    Which::String(strs) => {
                        let mut strings = Vec::new();
                        for s in strs?.iter() {
                            strings.push(s?.to_string().unwrap());
                        }
                        ColumnData::String(strings)
                    }
                    Which::Empty(()) => ColumnData::Empty,
                    Which::SparseI64(sparse) => {
                        let indices = sparse.get_indices()?;
                        let values = sparse.get_values()?;
                        ColumnData::SparseI64(indices.iter().zip(values.iter()).collect())
                    }
                };
                columns.insert(colname, ColumnBuffer { data });
            }
            tables.insert(name, TableBuffer { len, columns });
        }
        Ok(EventBuffer { tables })
    }
}