pack_it/
table.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use crate::MemUsage;
5use anyhow::{anyhow, bail, ensure, Result};
6use arrow2::array::{
7    Array, MutableArray, MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray,
8    MutableUtf8Array, TryPush,
9};
10use arrow2::datatypes::{DataType, TimeUnit};
11use arrow2::io::parquet::write::Encoding;
12use arrow2::types::NativeType;
13
14#[derive(Clone)]
15pub struct TableField {
16    pub name: String,
17    pub kind: Kind,
18    pub nullable: bool,
19
20    pub encoding: Encoding,
21}
22
23impl TableField {
24    pub fn new(name: impl ToString, kind: Kind, nullable: bool) -> Self {
25        TableField {
26            name: name.to_string(),
27            kind,
28            nullable,
29            encoding: kind.default_encoding(),
30        }
31    }
32}
33
/// The column value types a table can hold.
///
/// Each kind maps to an arrow2 builder (`Kind::array_with_capacity`) and to an arrow2
/// `DataType` (`Kind::to_arrow`).
// `Debug`/`PartialEq`/`Eq`/`Hash` let kinds be printed, compared and used as keys,
// e.g. to assert on the result of `Kind::from_arrow`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Kind {
    /// Boolean values.
    Bool,
    /// UUIDs, stored as 16-byte fixed-size binary.
    Uuid,
    /// Unsigned 8-bit integers.
    U8,
    /// Signed 32-bit integers.
    I32,
    /// Signed 64-bit integers.
    I64,
    /// 64-bit floats.
    F64,
    /// UTF-8 strings (arrow2 `Utf8`, i32 offsets).
    String,

    // do we want multiple types here?
    /// Timestamps in whole seconds, buffered as `i64`; the arrow type carries no timezone.
    TimestampSecsZ,
}
47
48impl Kind {
49    pub fn array_with_capacity(self, capacity: usize) -> VarArray {
50        match self {
51            Kind::Bool => VarArray::new(MutableBooleanArray::with_capacity(capacity)),
52            Kind::U8 => VarArray::new(MutablePrimitiveArray::<u8>::with_capacity(capacity)),
53            Kind::I32 => VarArray::new(MutablePrimitiveArray::<i32>::with_capacity(capacity)),
54            Kind::I64 => VarArray::new(MutablePrimitiveArray::<i64>::with_capacity(capacity)),
55            Kind::F64 => VarArray::new(MutablePrimitiveArray::<f64>::with_capacity(capacity)),
56            Kind::String => VarArray::new(MutableUtf8Array::<i32>::with_capacity(capacity)),
57            Kind::Uuid => VarArray::new(MutableFixedSizeBinaryArray::with_capacity(16, capacity)),
58            Kind::TimestampSecsZ => {
59                VarArray::new(MutablePrimitiveArray::<i64>::with_capacity(capacity))
60            }
61        }
62    }
63
64    pub fn to_arrow(self) -> DataType {
65        match self {
66            Kind::Bool => DataType::Boolean,
67            Kind::U8 => DataType::UInt8,
68            Kind::I32 => DataType::Int32,
69            Kind::I64 => DataType::Int64,
70            Kind::F64 => DataType::Float64,
71            Kind::String => DataType::Utf8,
72            Kind::Uuid => DataType::FixedSizeBinary(16),
73            Kind::TimestampSecsZ => DataType::Timestamp(TimeUnit::Second, None),
74        }
75    }
76
77    pub fn from_arrow(arrow: &DataType) -> Result<Self> {
78        Ok(match arrow {
79            DataType::Utf8 => Kind::String,
80            DataType::Boolean => Kind::Bool,
81            DataType::Int64 => Kind::I64,
82            DataType::Int32 => Kind::I32,
83            DataType::UInt8 => Kind::U8,
84            DataType::Float64 => Kind::F64,
85            DataType::Timestamp(TimeUnit::Second, None) => Kind::TimestampSecsZ,
86            other => bail!("unsupported type {:?}", other),
87        })
88    }
89
90    pub fn default_encoding(&self) -> Encoding {
91        match self {
92            // don't think there's a reasonable encoding for these
93            Kind::Bool | Kind::U8 => Encoding::Plain,
94            // maybe this would practically benefit from the string encoding?
95            Kind::Uuid => Encoding::Plain,
96            // TODO: (writing with arrow2) > External format error: Invalid argument error: The datatype Int32 cannot be encoded by DeltaBinaryPacked
97            Kind::TimestampSecsZ | Kind::I64 | Kind::I32 => Encoding::Plain,
98            // TODO: Error: External format error: Invalid argument error: The datatype Float64 cannot be encoded by ByteStreamSplit
99            Kind::F64 => Encoding::Plain,
100            // TODO: (reading with datafusion) > ArrowError(ParquetError("Error reading batch from projects.parquet (size: 286037286): Parquet argument error: NYI: Encoding DELTA_LENGTH_BYTE_ARRAY is not supported"))
101            Kind::String => Encoding::Plain,
102        }
103    }
104}
105
106pub struct VarArray {
107    pub inner: Box<dyn MutableArray>,
108}
109
110impl VarArray {
111    fn new<T: MutableArray + 'static>(array: T) -> Self {
112        Self {
113            inner: Box::new(array),
114        }
115    }
116
117    pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
118        self.inner.as_any().downcast_ref()
119    }
120
121    pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
122        self.inner.as_mut_any().downcast_mut()
123    }
124
125    // this moves, but has to be called from a mut ref?!
126    fn as_arc(&mut self) -> Arc<dyn Array> {
127        self.inner.as_arc()
128    }
129}
130
131impl MemUsage for VarArray {
132    fn mem_usage(&self) -> usize {
133        // some regrets
134        if let Some(v) = self.downcast_ref::<MutableUtf8Array<i32>>() {
135            v.mem_usage()
136        } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i64>>() {
137            v.mem_usage()
138        } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i32>>() {
139            v.mem_usage()
140        } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i16>>() {
141            v.mem_usage()
142        } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<u8>>() {
143            v.mem_usage()
144        } else if let Some(v) = self.downcast_ref::<MutableBooleanArray>() {
145            v.mem_usage()
146        } else {
147            debug_assert!(false, "unsupported type");
148            // just wildly overestimate
149            self.inner.len() * 16
150        }
151    }
152}
153
154pub struct Table {
155    schema: Box<[Kind]>,
156    builders: Box<[VarArray]>,
157    cap: usize,
158    mem_used: usize,
159}
160
161fn make_builders(schema: &[Kind], cap: usize) -> Box<[VarArray]> {
162    schema
163        .iter()
164        .map(|kind| kind.array_with_capacity(cap))
165        .collect()
166}
167
168impl Table {
169    pub fn with_capacity(schema: &[Kind], cap: usize) -> Self {
170        Self {
171            schema: schema.to_vec().into_boxed_slice(),
172            builders: make_builders(schema, cap),
173            cap,
174            mem_used: 0,
175        }
176    }
177
178    pub fn check_consistent(&self) -> Result<()> {
179        let expectation = self.builders[0].inner.len();
180        for (i, b) in self.builders.iter().enumerate().skip(1) {
181            ensure!(
182                b.inner.len() == expectation,
183                "expected col {} to have length {}, not {}",
184                i,
185                expectation,
186                b.inner.len()
187            );
188        }
189
190        Ok(())
191    }
192
193    pub fn mem_estimate(&self) -> usize {
194        self.mem_used
195    }
196
197    pub fn get(&mut self, item: usize) -> &mut VarArray {
198        &mut self.builders[item]
199    }
200
201    pub fn get_many(&mut self, items: &[usize]) -> Vec<&mut VarArray> {
202        self.builders
203            .iter_mut()
204            .enumerate()
205            .filter(|(i, _)| items.contains(i))
206            .map(|(_, v)| v)
207            .collect()
208    }
209
210    pub fn finish_bulk_push(&mut self) -> Result<()> {
211        self.check_consistent()?;
212        self.mem_used = self.builders.iter().map(|b| b.mem_usage()).sum();
213        Ok(())
214    }
215
216    pub fn rows(&self) -> usize {
217        self.builders[0].inner.len()
218    }
219
220    pub fn push_null(&mut self, i: usize) -> Result<()> {
221        // only off by a factor of about eight
222        self.mem_used += 1;
223        self.builders[i].inner.push_null();
224        Ok(())
225    }
226
227    pub fn push_str(&mut self, i: usize, val: Option<&str>) -> Result<()> {
228        let arr = &mut self.builders[i];
229        if let Some(arr) = arr.downcast_mut::<MutableUtf8Array<i32>>() {
230            self.mem_used +=
231                val.map(|val| val.len()).unwrap_or_default() + std::mem::size_of::<i32>();
232            arr.try_push(val)?;
233            Ok(())
234        } else {
235            Err(anyhow!("can't push a string to this column"))
236        }
237    }
238
239    pub fn push_bool(&mut self, i: usize, val: Option<bool>) -> Result<()> {
240        let arr = &mut self.builders[i];
241        if let Some(arr) = arr.downcast_mut::<MutableBooleanArray>() {
242            // only off by a factor of about four
243            self.mem_used += 1;
244            arr.try_push(val)?;
245            Ok(())
246        } else {
247            Err(anyhow!("can't push a bool to this column"))
248        }
249    }
250
251    pub fn push_fsb(&mut self, i: usize, val: Option<impl AsRef<[u8]>>) -> Result<()> {
252        let arr = &mut self.builders[i];
253        let val = match val {
254            Some(val) => val,
255            None => {
256                arr.inner.push_null();
257                return Ok(());
258            }
259        };
260
261        if let Some(arr) = arr.downcast_mut::<MutableFixedSizeBinaryArray>() {
262            self.mem_used += arr.size();
263            arr.try_push(Some(val.as_ref()))?;
264            Ok(())
265        } else {
266            Err(anyhow!("can't push a uuid to this column"))
267        }
268    }
269
270    pub fn push_primitive<T: NativeType>(&mut self, i: usize, val: Option<T>) -> Result<()> {
271        let arr = &mut self.builders[i];
272        if let Some(arr) = arr.downcast_mut::<MutablePrimitiveArray<T>>() {
273            self.mem_used += std::mem::size_of::<T>();
274            arr.try_push(val)?;
275            Ok(())
276        } else {
277            Err(anyhow!(
278                "can't push an {} to this column",
279                std::any::type_name::<T>()
280            ))
281        }
282    }
283
284    pub fn take_batch(&mut self) -> Vec<Arc<dyn Array>> {
285        let ret = self.builders.iter_mut().map(|arr| arr.as_arc()).collect();
286        self.builders = make_builders(&self.schema, self.cap);
287        self.mem_used = 0;
288        ret
289    }
290}