1use std::any::Any;
2use std::sync::Arc;
3
4use crate::MemUsage;
5use anyhow::{anyhow, bail, ensure, Result};
6use arrow2::array::{
7 Array, MutableArray, MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray,
8 MutableUtf8Array, TryPush,
9};
10use arrow2::datatypes::{DataType, TimeUnit};
11use arrow2::io::parquet::write::Encoding;
12use arrow2::types::NativeType;
13
14#[derive(Clone)]
15pub struct TableField {
16 pub name: String,
17 pub kind: Kind,
18 pub nullable: bool,
19
20 pub encoding: Encoding,
21}
22
23impl TableField {
24 pub fn new(name: impl ToString, kind: Kind, nullable: bool) -> Self {
25 TableField {
26 name: name.to_string(),
27 kind,
28 nullable,
29 encoding: kind.default_encoding(),
30 }
31 }
32}
33
/// Logical value types a column can hold.
///
/// Each variant maps to one arrow data type (see `Kind::to_arrow`) and one
/// mutable builder type (see `Kind::array_with_capacity`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Kind {
    /// Boolean values.
    Bool,
    /// 16-byte fixed-size binary values.
    Uuid,
    /// Unsigned 8-bit integers.
    U8,
    /// Signed 32-bit integers.
    I32,
    /// Signed 64-bit integers.
    I64,
    /// 64-bit floating point values.
    F64,
    /// UTF-8 strings.
    String,

    /// Timestamps with second resolution, stored as `i64`; the arrow type
    /// produced for this kind carries no timezone.
    TimestampSecsZ,
}
47
48impl Kind {
49 pub fn array_with_capacity(self, capacity: usize) -> VarArray {
50 match self {
51 Kind::Bool => VarArray::new(MutableBooleanArray::with_capacity(capacity)),
52 Kind::U8 => VarArray::new(MutablePrimitiveArray::<u8>::with_capacity(capacity)),
53 Kind::I32 => VarArray::new(MutablePrimitiveArray::<i32>::with_capacity(capacity)),
54 Kind::I64 => VarArray::new(MutablePrimitiveArray::<i64>::with_capacity(capacity)),
55 Kind::F64 => VarArray::new(MutablePrimitiveArray::<f64>::with_capacity(capacity)),
56 Kind::String => VarArray::new(MutableUtf8Array::<i32>::with_capacity(capacity)),
57 Kind::Uuid => VarArray::new(MutableFixedSizeBinaryArray::with_capacity(16, capacity)),
58 Kind::TimestampSecsZ => {
59 VarArray::new(MutablePrimitiveArray::<i64>::with_capacity(capacity))
60 }
61 }
62 }
63
64 pub fn to_arrow(self) -> DataType {
65 match self {
66 Kind::Bool => DataType::Boolean,
67 Kind::U8 => DataType::UInt8,
68 Kind::I32 => DataType::Int32,
69 Kind::I64 => DataType::Int64,
70 Kind::F64 => DataType::Float64,
71 Kind::String => DataType::Utf8,
72 Kind::Uuid => DataType::FixedSizeBinary(16),
73 Kind::TimestampSecsZ => DataType::Timestamp(TimeUnit::Second, None),
74 }
75 }
76
77 pub fn from_arrow(arrow: &DataType) -> Result<Self> {
78 Ok(match arrow {
79 DataType::Utf8 => Kind::String,
80 DataType::Boolean => Kind::Bool,
81 DataType::Int64 => Kind::I64,
82 DataType::Int32 => Kind::I32,
83 DataType::UInt8 => Kind::U8,
84 DataType::Float64 => Kind::F64,
85 DataType::Timestamp(TimeUnit::Second, None) => Kind::TimestampSecsZ,
86 other => bail!("unsupported type {:?}", other),
87 })
88 }
89
90 pub fn default_encoding(&self) -> Encoding {
91 match self {
92 Kind::Bool | Kind::U8 => Encoding::Plain,
94 Kind::Uuid => Encoding::Plain,
96 Kind::TimestampSecsZ | Kind::I64 | Kind::I32 => Encoding::Plain,
98 Kind::F64 => Encoding::Plain,
100 Kind::String => Encoding::Plain,
102 }
103 }
104}
105
106pub struct VarArray {
107 pub inner: Box<dyn MutableArray>,
108}
109
110impl VarArray {
111 fn new<T: MutableArray + 'static>(array: T) -> Self {
112 Self {
113 inner: Box::new(array),
114 }
115 }
116
117 pub fn downcast_ref<T: Any>(&self) -> Option<&T> {
118 self.inner.as_any().downcast_ref()
119 }
120
121 pub fn downcast_mut<T: Any>(&mut self) -> Option<&mut T> {
122 self.inner.as_mut_any().downcast_mut()
123 }
124
125 fn as_arc(&mut self) -> Arc<dyn Array> {
127 self.inner.as_arc()
128 }
129}
130
131impl MemUsage for VarArray {
132 fn mem_usage(&self) -> usize {
133 if let Some(v) = self.downcast_ref::<MutableUtf8Array<i32>>() {
135 v.mem_usage()
136 } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i64>>() {
137 v.mem_usage()
138 } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i32>>() {
139 v.mem_usage()
140 } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<i16>>() {
141 v.mem_usage()
142 } else if let Some(v) = self.downcast_ref::<MutablePrimitiveArray<u8>>() {
143 v.mem_usage()
144 } else if let Some(v) = self.downcast_ref::<MutableBooleanArray>() {
145 v.mem_usage()
146 } else {
147 debug_assert!(false, "unsupported type");
148 self.inner.len() * 16
150 }
151 }
152}
153
154pub struct Table {
155 schema: Box<[Kind]>,
156 builders: Box<[VarArray]>,
157 cap: usize,
158 mem_used: usize,
159}
160
161fn make_builders(schema: &[Kind], cap: usize) -> Box<[VarArray]> {
162 schema
163 .iter()
164 .map(|kind| kind.array_with_capacity(cap))
165 .collect()
166}
167
168impl Table {
169 pub fn with_capacity(schema: &[Kind], cap: usize) -> Self {
170 Self {
171 schema: schema.to_vec().into_boxed_slice(),
172 builders: make_builders(schema, cap),
173 cap,
174 mem_used: 0,
175 }
176 }
177
178 pub fn check_consistent(&self) -> Result<()> {
179 let expectation = self.builders[0].inner.len();
180 for (i, b) in self.builders.iter().enumerate().skip(1) {
181 ensure!(
182 b.inner.len() == expectation,
183 "expected col {} to have length {}, not {}",
184 i,
185 expectation,
186 b.inner.len()
187 );
188 }
189
190 Ok(())
191 }
192
193 pub fn mem_estimate(&self) -> usize {
194 self.mem_used
195 }
196
197 pub fn get(&mut self, item: usize) -> &mut VarArray {
198 &mut self.builders[item]
199 }
200
201 pub fn get_many(&mut self, items: &[usize]) -> Vec<&mut VarArray> {
202 self.builders
203 .iter_mut()
204 .enumerate()
205 .filter(|(i, _)| items.contains(i))
206 .map(|(_, v)| v)
207 .collect()
208 }
209
210 pub fn finish_bulk_push(&mut self) -> Result<()> {
211 self.check_consistent()?;
212 self.mem_used = self.builders.iter().map(|b| b.mem_usage()).sum();
213 Ok(())
214 }
215
216 pub fn rows(&self) -> usize {
217 self.builders[0].inner.len()
218 }
219
220 pub fn push_null(&mut self, i: usize) -> Result<()> {
221 self.mem_used += 1;
223 self.builders[i].inner.push_null();
224 Ok(())
225 }
226
227 pub fn push_str(&mut self, i: usize, val: Option<&str>) -> Result<()> {
228 let arr = &mut self.builders[i];
229 if let Some(arr) = arr.downcast_mut::<MutableUtf8Array<i32>>() {
230 self.mem_used +=
231 val.map(|val| val.len()).unwrap_or_default() + std::mem::size_of::<i32>();
232 arr.try_push(val)?;
233 Ok(())
234 } else {
235 Err(anyhow!("can't push a string to this column"))
236 }
237 }
238
239 pub fn push_bool(&mut self, i: usize, val: Option<bool>) -> Result<()> {
240 let arr = &mut self.builders[i];
241 if let Some(arr) = arr.downcast_mut::<MutableBooleanArray>() {
242 self.mem_used += 1;
244 arr.try_push(val)?;
245 Ok(())
246 } else {
247 Err(anyhow!("can't push a bool to this column"))
248 }
249 }
250
251 pub fn push_fsb(&mut self, i: usize, val: Option<impl AsRef<[u8]>>) -> Result<()> {
252 let arr = &mut self.builders[i];
253 let val = match val {
254 Some(val) => val,
255 None => {
256 arr.inner.push_null();
257 return Ok(());
258 }
259 };
260
261 if let Some(arr) = arr.downcast_mut::<MutableFixedSizeBinaryArray>() {
262 self.mem_used += arr.size();
263 arr.try_push(Some(val.as_ref()))?;
264 Ok(())
265 } else {
266 Err(anyhow!("can't push a uuid to this column"))
267 }
268 }
269
270 pub fn push_primitive<T: NativeType>(&mut self, i: usize, val: Option<T>) -> Result<()> {
271 let arr = &mut self.builders[i];
272 if let Some(arr) = arr.downcast_mut::<MutablePrimitiveArray<T>>() {
273 self.mem_used += std::mem::size_of::<T>();
274 arr.try_push(val)?;
275 Ok(())
276 } else {
277 Err(anyhow!(
278 "can't push an {} to this column",
279 std::any::type_name::<T>()
280 ))
281 }
282 }
283
284 pub fn take_batch(&mut self) -> Vec<Arc<dyn Array>> {
285 let ret = self.builders.iter_mut().map(|arr| arr.as_arc()).collect();
286 self.builders = make_builders(&self.schema, self.cap);
287 self.mem_used = 0;
288 ret
289 }
290}