1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::io::{Cursor, Write};

use arrow_array::{cast::AsArray, Array, ArrayRef};
use arrow_buffer::{BooleanBufferBuilder, Buffer};
use arrow_schema::DataType;

use lance_arrow::DataTypeExt;
use lance_core::Result;

use crate::encoder::{BufferEncoder, EncodedBuffer, EncodedBufferMeta};

use super::value::CompressionScheme;

/// A [`BufferEncoder`] that emits array buffers as-is, with no bitpacking and no compression.
///
/// The `encode` implementation in this file takes the first buffer of every input array and
/// reports `bitpacking: None` and `compression_scheme: None` in the metadata.
#[derive(Debug, Default)]
pub struct FlatBufferEncoder {}

impl BufferEncoder for FlatBufferEncoder {
    /// Collects the first buffer of each array into one [`EncodedBuffer`] part per array.
    ///
    /// `bits_per_value` is derived from the data type of the first array (`byte_width() * 8`);
    /// the remaining arrays are presumably expected to share that type (not checked here).
    ///
    /// # Panics
    ///
    /// Panics if `arrays` is empty or if any array exposes no buffers, because both are
    /// accessed by index.
    fn encode(&self, arrays: &[ArrayRef]) -> Result<(EncodedBuffer, EncodedBufferMeta)> {
        let mut parts = Vec::with_capacity(arrays.len());
        for array in arrays {
            let data = array.to_data();
            // Only the first buffer (index 0) of each array is forwarded.
            parts.push(data.buffers()[0].clone());
        }

        let bits_per_value = (arrays[0].data_type().byte_width() * 8) as u64;
        let meta = EncodedBufferMeta {
            bits_per_value,
            bitpacking: None,
            compression_scheme: None,
        };
        Ok((EncodedBuffer { parts }, meta))
    }
}

/// A byte-level compression codec that can be applied to encoded buffers.
///
/// Implementations must be `Debug + Send + Sync` so they can be stored behind a
/// `Box<dyn BufferCompressor>` and shared across threads.
pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `input_buf` and writes the compressed bytes into `output_buf`.
    ///
    /// NOTE(review): whether existing contents of `output_buf` are kept or replaced is up to
    /// the implementation; the zstd implementation in this file writes through `Write`, which
    /// appends to a `Vec<u8>`.
    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
    /// Decompresses `input_buf` and writes the decompressed bytes into `output_buf`.
    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
}

/// A stateless [`BufferCompressor`] backed by the `zstd` crate.
#[derive(Debug, Default)]
pub struct ZstdBufferCompressor {}

impl BufferCompressor for ZstdBufferCompressor {
    /// Compresses `input_buf` as a single zstd stream written into `output_buf`.
    ///
    /// The encoder is created with level `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if the encoder cannot be created, or if writing or finalizing the
    /// stream fails.
    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
        let mut zstd_writer = zstd::Encoder::new(output_buf, 0)?;
        zstd_writer.write_all(input_buf)?;
        // `finish` hands back the underlying writer, which is not needed here.
        zstd_writer.finish()?;
        Ok(())
    }

    /// Decodes the zstd stream in `input_buf` and writes the result into `output_buf`.
    ///
    /// # Errors
    ///
    /// Returns an error if decoding fails.
    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
        let mut compressed = Cursor::new(input_buf);
        zstd::stream::copy_decode(&mut compressed, output_buf)?;
        Ok(())
    }
}

/// Namespace type for looking up a [`BufferCompressor`] by its compression type name.
pub struct GeneralBufferCompressor {}

impl GeneralBufferCompressor {
    /// Returns the compressor selected by `compression_type`.
    ///
    /// Both the empty string and `"zstd"` select [`ZstdBufferCompressor`].
    ///
    /// # Panics
    ///
    /// Panics if `compression_type` is any other value.
    pub fn get_compressor(compression_type: &str) -> Box<dyn BufferCompressor> {
        match compression_type {
            // An empty name falls back to the same codec as an explicit "zstd".
            "" | "zstd" => Box::<ZstdBufferCompressor>::default(),
            unsupported => panic!("Unsupported compression type: {}", unsupported),
        }
    }
}

/// An encoder which uses lightweight compression, such as zstd/lz4, to encode buffers.
///
/// NOTE(review): `GeneralBufferCompressor::get_compressor` in this file currently only
/// recognizes zstd (`""` or `"zstd"`).
#[derive(Debug)]
pub struct CompressedBufferEncoder {
    /// Codec applied to the first buffer of every array passed to `encode`.
    compressor: Box<dyn BufferCompressor>,
}

impl Default for CompressedBufferEncoder {
    fn default() -> Self {
        Self {
            compressor: GeneralBufferCompressor::get_compressor("zstd"),
        }
    }
}

impl CompressedBufferEncoder {
    /// Creates an encoder whose compressor is looked up by `compression_type`.
    ///
    /// # Panics
    ///
    /// Panics if `GeneralBufferCompressor::get_compressor` does not support
    /// `compression_type`.
    pub fn new(compression_type: &str) -> Self {
        Self {
            compressor: GeneralBufferCompressor::get_compressor(compression_type),
        }
    }
}

impl BufferEncoder for CompressedBufferEncoder {
    fn encode(&self, arrays: &[ArrayRef]) -> Result<(EncodedBuffer, EncodedBufferMeta)> {
        let mut parts = Vec::with_capacity(arrays.len());
        for arr in arrays {
            let buffer = arr.to_data().buffers()[0].clone();
            let buffer_len = buffer.len();
            let buffer_data = buffer.as_slice();
            let mut compressed = Vec::with_capacity(buffer_len);
            self.compressor.compress(buffer_data, &mut compressed)?;
            parts.push(Buffer::from(compressed));
        }

        let data_type = arrays[0].data_type();

        Ok((
            EncodedBuffer { parts },
            EncodedBufferMeta {
                bits_per_value: (data_type.byte_width() * 8) as u64,
                bitpacking: None,
                compression_scheme: Some(CompressionScheme::Zstd),
            },
        ))
    }
}

/// Encoder for writing boolean arrays as dense bitmaps.
///
/// All input arrays are concatenated into a single bitmap part with one bit per value.
#[derive(Debug, Default)]
pub struct BitmapBufferEncoder {}

impl BufferEncoder for BitmapBufferEncoder {
    /// Concatenates the values of all boolean `arrays` into one densely packed bitmap.
    ///
    /// The result has a single part and `bits_per_value` of 1, with no bitpacking and no
    /// compression.
    ///
    /// In debug builds this asserts that every array is `DataType::Boolean` and that the
    /// total number of rows is non-zero.
    fn encode(&self, arrays: &[ArrayRef]) -> Result<(EncodedBuffer, EncodedBufferMeta)> {
        debug_assert!(arrays
            .iter()
            .all(|arr| *arr.data_type() == DataType::Boolean));
        // Sum in `usize` (the native type of `Array::len` and of the builder capacity) so
        // that large inputs are neither truncated by a narrowing cast nor overflow the sum.
        let num_rows: usize = arrays.iter().map(|arr| arr.len()).sum();
        // Empty pages don't make sense, this should be prevented before we
        // get here
        debug_assert_ne!(num_rows, 0);
        // We can't just write the inner value buffers one after the other because
        // bitmaps can have junk padding at the end (e.g. a boolean array with 12
        // values will be 2 bytes but the last four bits of the second byte are
        // garbage).  So we go ahead and pay the cost of a copy (we could avoid this
        // if we really needed to, at the expense of more complicated code and a slightly
        // larger encoded size but writer cost generally doesn't matter as much as reader cost)
        let mut builder = BooleanBufferBuilder::new(num_rows);
        for arr in arrays {
            builder.append_buffer(arr.as_boolean().values());
        }
        let parts = vec![builder.finish().into_inner()];
        Ok((
            EncodedBuffer { parts },
            EncodedBufferMeta {
                bits_per_value: 1,
                bitpacking: None,
                compression_scheme: None,
            },
        ))
    }
}