avx_arrow/compression/
delta.rs1use crate::error::{ArrowError, Result};
6use std::mem;
7
8pub fn encode_i64(data: &[i64]) -> Result<Vec<u8>> {
10 if data.is_empty() {
11 return Ok(Vec::new());
12 }
13
14 let mut output = Vec::with_capacity(data.len() * mem::size_of::<i64>());
15
16 output.extend_from_slice(&data[0].to_le_bytes());
18
19 for i in 1..data.len() {
21 let delta = data[i].wrapping_sub(data[i - 1]);
22 write_varint(&mut output, delta);
23 }
24
25 Ok(output)
26}
27
28pub fn decode_i64(data: &[u8]) -> Result<Vec<i64>> {
30 if data.len() < 8 {
31 return Err(ArrowError::InvalidData(
32 "Delta data must have at least 8 bytes".to_string()
33 ));
34 }
35
36 let mut output = Vec::new();
37 let mut i = 0;
38
39 let first = i64::from_le_bytes(data[i..i+8].try_into().unwrap());
41 output.push(first);
42 i += 8;
43
44 let mut last = first;
46 while i < data.len() {
47 let (delta, bytes_read) = read_varint(&data[i..])?;
48 last = last.wrapping_add(delta);
49 output.push(last);
50 i += bytes_read;
51 }
52
53 Ok(output)
54}
55
56pub fn encode_f64(data: &[f64]) -> Result<Vec<u8>> {
58 let bits: Vec<i64> = data.iter()
60 .map(|&f| f.to_bits() as i64)
61 .collect();
62 encode_i64(&bits)
63}
64
65pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>> {
67 let bits = decode_i64(data)?;
68 Ok(bits.iter()
69 .map(|&i| f64::from_bits(i as u64))
70 .collect())
71}
72
/// Appends `value` to `output` as a zigzag-mapped, base-128 varint.
///
/// The zigzag step maps small negative and small positive numbers alike to
/// small unsigned numbers (0 -> 0, -1 -> 1, 1 -> 2, ...). The result is then
/// written 7 bits at a time, least-significant group first; every byte except
/// the last has its high bit set to signal that more bytes follow.
fn write_varint(output: &mut Vec<u8>, value: i64) {
    // Arithmetic right shift yields all ones for negatives, all zeros otherwise.
    let mut remaining = ((value << 1) ^ (value >> 63)) as u64;

    // While more than 7 significant bits remain, emit a continuation byte.
    while remaining >= 0x80 {
        output.push((remaining & 0x7F) as u8 | 0x80);
        remaining >>= 7;
    }

    // The final group fits in 7 bits and carries no continuation flag.
    output.push(remaining as u8);
}
91
92fn read_varint(data: &[u8]) -> Result<(i64, usize)> {
94 let mut result = 0u64;
95 let mut shift = 0;
96 let mut i = 0;
97
98 loop {
99 if i >= data.len() {
100 return Err(ArrowError::InvalidData(
101 "Truncated varint".to_string()
102 ));
103 }
104
105 let byte = data[i];
106 result |= ((byte & 0x7F) as u64) << shift;
107 i += 1;
108
109 if byte & 0x80 == 0 {
110 break;
111 }
112 shift += 7;
113
114 if shift >= 64 {
115 return Err(ArrowError::InvalidData(
116 "Varint too large".to_string()
117 ));
118 }
119 }
120
121 let value = ((result >> 1) as i64) ^ -((result & 1) as i64);
123 Ok((value, i))
124}
125
/// Incremental delta encoder that accepts values one at a time.
///
/// The first value pushed is stored verbatim as 8 little-endian bytes; each
/// later value is stored as the varint-encoded wrapping difference to the
/// value pushed before it.
#[derive(Debug, Clone)]
pub struct DeltaEncoder {
    /// Most recently encoded value, or `None` before the first value arrives.
    last: Option<i64>,
    /// Encoded bytes accumulated so far.
    buffer: Vec<u8>,
}
131
132impl DeltaEncoder {
133 pub fn new() -> Self {
135 Self {
136 last: None,
137 buffer: Vec::new(),
138 }
139 }
140
141 pub fn encode_i64(&mut self, value: i64) {
143 match self.last {
144 None => {
145 self.buffer.extend_from_slice(&value.to_le_bytes());
146 self.last = Some(value);
147 }
148 Some(last) => {
149 let delta = value.wrapping_sub(last);
150 write_varint(&mut self.buffer, delta);
151 self.last = Some(value);
152 }
153 }
154 }
155
156 pub fn encode_f64(&mut self, value: f64) {
158 self.encode_i64(value.to_bits() as i64);
159 }
160
161 pub fn finish(self) -> Vec<u8> {
163 self.buffer
164 }
165}
166
167impl Default for DeltaEncoder {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Consecutive integers have a delta of 1, so the encoding must be
    /// smaller than the raw 8-bytes-per-value representation.
    #[test]
    fn test_delta_sequential() {
        let data: Vec<i64> = (0..1000).collect();
        let encoded = encode_i64(&data).unwrap();
        assert!(encoded.len() < data.len() * 8);

        let decoded = decode_i64(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    /// Evenly spaced timestamp-like values round-trip unchanged.
    #[test]
    fn test_delta_timestamps() {
        let base = 1700000000i64;
        let data: Vec<i64> = (0..100).map(|i| base + i * 1000).collect();
        let encoded = encode_i64(&data).unwrap();
        let decoded = decode_i64(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    /// Floats are encoded through their bit patterns, so the round trip must
    /// be bit-exact and must preserve the element count.
    #[test]
    fn test_delta_f64() {
        let data: Vec<f64> = (0..100).map(|i| i as f64 * 0.1).collect();
        let encoded = encode_f64(&data).unwrap();
        let decoded = decode_f64(&encoded).unwrap();

        // `zip` stops at the shorter side, so check the length explicitly.
        assert_eq!(decoded.len(), data.len());
        for (a, b) in data.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits());
        }
    }

    /// Varints written back to back can be read back in order, including the
    /// 10-byte extremes, and reading consumes exactly what was written.
    #[test]
    fn test_varint() {
        let values = vec![
            0i64,
            1,
            -1,
            127,
            -127,
            128,
            -128,
            10000,
            -10000,
            i64::MAX,
            i64::MIN,
        ];
        let mut buffer = Vec::new();

        for &v in &values {
            write_varint(&mut buffer, v);
        }

        let mut pos = 0;
        for &v in &values {
            let (decoded, bytes) = read_varint(&buffer[pos..]).unwrap();
            assert_eq!(decoded, v);
            pos += bytes;
        }
        assert_eq!(pos, buffer.len());
    }

    /// The streaming encoder produces output that `decode_i64` understands.
    #[test]
    fn test_delta_encoder() {
        let mut encoder = DeltaEncoder::new();
        for i in 0..100 {
            encoder.encode_i64(i);
        }

        let encoded = encoder.finish();
        let decoded = decode_i64(&encoded).unwrap();
        assert_eq!(decoded, (0..100).collect::<Vec<_>>());
    }
}
236
237
238
239
240