dbkit-engine 0.0.9

A columnar query execution engine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
// vim : set ts=4 sw=4 et :

// libstd
use std::mem;
use std::slice;

// DBKit
use super::allocator::{Allocator, OwnedChunk, ChainedArena, MIN_ALIGN};
use super::types::ValueInfo;
use super::schema::{Attribute, Schema};
use super::error::DBError;
use super::row::{RowOffset, RowRange};

/// Read-only null vector of a column, exposed as raw bytes.
pub type BoolBitmap<'a> = &'a [u8];
/// Writable null vector of a column, exposed as raw bytes.
pub type MutBoolBitmap<'a> = &'a mut [u8];

/// Starting size for the VARLEN arena.
const ARENA_MIN_SIZE : usize = MIN_ALIGN;

/// Limit on arena chunk size. This is also the limit on the largest VARLEN value in a Column.
/// Currently the limit for large blobs / text is 16MB.
const ARENA_MAX_SIZE : usize = 16 * 1024 * 1024;

/// Trait representing a reference to column data.
///
/// The data can be owned by the implementing object or borrowed from another one.
pub trait RefColumn<'re> {
    /// Attribute (name, type, nullability) describing this column.
    fn attribute(&self) -> &Attribute;
    /// Number of rows the column has storage for.
    fn capacity(&self) -> usize;

    /// Raw bytes backing the row data.
    ///
    /// NOTE(review): this was documented as panicking when there is no row data, but the
    /// implementations in this file return an empty slice in that case - confirm the contract.
    fn rows_raw_slice(&'re self) -> &'re [u8];
    /// Raw bytes backing the null data.
    ///
    /// NOTE(review): this was documented as panicking when there is no null data, but the
    /// implementations in this file return an empty slice in that case - confirm the contract.
    fn nulls_raw_slice(&'re self) -> &'re [u8];

    /// Pointer to the beginning of the raw row data.
    ///
    /// The pointer can be null; callers must check before dereferencing.
    unsafe fn rows_ptr(&self) -> *const u8;
    /// Pointer to the beginning of the raw null data.
    ///
    /// The pointer can be null; callers must check before dereferencing.
    unsafe fn nulls_ptr(&self) -> *const u8;
}

/// Slice representing the column value vector (row data)
///
// RUST FRUSTRATION: wish this could be part of `RefColumn`
pub fn column_rows<'c, T: ValueInfo>(col: &'c RefColumn) -> Result<&'c [T::Store], DBError> {
    let attr = col.attribute();
    let rows = col.capacity();

    if attr.dtype != T::ENUM {
        return Err(DBError::AttributeType(attr.name.clone()))
    }

    unsafe {
        let col_ptr = col.rows_ptr();
        let typed_ptr: *const T::Store = mem::transmute(col_ptr);

        let out = if typed_ptr.is_null() {
            &[]
        } else {
            slice::from_raw_parts(typed_ptr, rows)
        };

        Ok(out)
    }
}

/// Read-only view over the null vector of a column.
///
/// Returns `DBError::AttributeNullability` carrying the attribute name when the column is not
/// nullable. A null vector pointer that is null yields an empty slice; otherwise the slice
/// holds `col.capacity()` bytes.
pub fn column_nulls<'c>(col: &'c RefColumn) ->  Result<BoolBitmap<'c>, DBError> {
    let attr = col.attribute();
    let len = col.capacity();

    if !attr.nullable {
        return Err(DBError::AttributeNullability(attr.name.clone()));
    }

    // The trait marks raw pointer access as unsafe; the pointer itself may be null.
    let flags = unsafe { col.nulls_ptr() };

    if flags.is_null() {
        let empty: BoolBitmap<'c> = &[];
        return Ok(empty);
    }

    // Relies on the column handing out a pointer to `capacity()` null bytes.
    Ok(unsafe { slice::from_raw_parts(flags, len) })
}

/// Typed Data Column. Contains a vector of column rows, and optionally a null vector.
///
/// Knows its capacity but not its size, and has no concept of a current row. Those properties
/// are provided by its parent container (types such as Block).
pub struct Column<'alloc> {
    /// Allocator used to obtain the row and null storage
    allocator: &'alloc Allocator,
    /// Attribute describing the column
    attr: Attribute,
    /// Storage for the null vector; stays empty until capacity is set on a nullable column
    raw_nulls: OwnedChunk<'alloc>,
    /// Storage for the row values; stays empty until capacity is set
    raw: OwnedChunk<'alloc>,
    /// Used to store varlen column values
    arena: ChainedArena<'alloc>
}

/// Typed Data Column that references the data of another column instead of owning it.
#[derive(Clone)]
pub struct AliasColumn<'parent> {
    /// Copy of the source column's attribute
    attr: Attribute,
    /// Borrowed null bytes; empty when the source column is not nullable
    raw_nulls: &'parent [u8],
    /// Borrowed row bytes
    raw: &'parent [u8],
}

/// Create another read-only alias of a column.
///
/// If no range is specified, the alias covers the whole source column (all `capacity()` rows).
/// Otherwise it covers `range.rows` rows starting at row `range.offset`.
///
/// # Errors
///
/// Returns `DBError::RowOutOfBounds` when the requested rows do not fit inside the source
/// column's capacity.
pub fn alias_column<'a>(src: &'a RefColumn<'a>, range: Option<RowRange>)
    -> Result<AliasColumn<'a>, DBError>
{
    let capacity = src.capacity();
    let (offset, rows) = range.map_or((0, capacity), |r| (r.offset, r.rows));

    // Validate before any byte arithmetic. `checked_add` keeps an absurdly large range from
    // wrapping around and slipping past the comparison.
    match offset.checked_add(rows) {
        Some(end) if end <= capacity => {}
        _ => return Err(DBError::RowOutOfBounds),
    }

    let attr = src.attribute();
    let size_of = attr.dtype.size_of();

    // Row positions are scaled to byte positions: both the start and the length are a row
    // count multiplied by the width of a single value.
    let start = offset * size_of;
    let len = rows * size_of;

    let raw = src.rows_raw_slice();
    let col = &raw[start .. start + len];

    // The null vector uses one byte per row, so row positions index it directly.
    let nulls: &'a [u8] = if attr.nullable {
        let raw = src.nulls_raw_slice();
        &raw[offset .. offset + rows]
    } else {
        &[]
    };

    Ok(AliasColumn {
        attr: attr.clone(),
        raw: col,
        raw_nulls: nulls,
    })
}

/// Column access for an alias: everything is served from the borrowed byte slices.
impl<'parent> RefColumn<'parent> for AliasColumn<'parent> {
    /// Attribute copied from the source column when the alias was created
    fn attribute(&self) -> &Attribute {
        &self.attr
    }

    /// Row capacity: length of the borrowed row bytes divided by the size of one value
    fn capacity(&self) -> usize {
        self.raw.len() / self.attr.dtype.size_of()
    }

    /// Pointer to the beginning of the raw row data
    unsafe fn rows_ptr(&self) -> *const u8 {
        self.raw.as_ptr()
    }

    /// Pointer to the beginning of the raw null data
    unsafe fn nulls_ptr(&self) -> *const u8 {
        self.raw_nulls.as_ptr()
    }

    /// The borrowed row bytes
    fn rows_raw_slice(&'parent self) -> &'parent [u8] {
        self.raw
    }

    /// The borrowed null bytes
    fn nulls_raw_slice(&'parent self) -> &'parent [u8] {
        self.raw_nulls
    }
}

/// Column access for an owning column: everything is served from its own chunks.
impl<'alloc> RefColumn<'alloc> for Column<'alloc> {
    /// Attribute this column was created with
    fn attribute(&self) -> &Attribute {
        &self.attr
    }

    /// Row capacity: length of the row chunk divided by the size of one value
    fn capacity(&self) -> usize {
        self.raw.len() / self.attr.dtype.size_of()
    }

    /// Pointer to the beginning of the raw row data
    unsafe fn rows_ptr(&self) -> *const u8 {
        self.raw.as_ptr()
    }

    /// Pointer to the beginning of the raw null data
    unsafe fn nulls_ptr(&self) -> *const u8 {
        self.raw_nulls.as_ptr()
    }

    /// Row bytes, or an empty slice when the row chunk holds no data
    fn rows_raw_slice(&'alloc self) -> &'alloc [u8] {
        self.raw.data.as_ref()
            .map_or(&[], |f| f as &'alloc [u8])
    }

    /// Null bytes, or an empty slice when the null chunk holds no data
    fn nulls_raw_slice(&'alloc self) -> &'alloc [u8] {
        self.raw_nulls.data.as_ref()
            .map_or(&[], |f| f as &'alloc [u8])
    }
}

impl<'alloc> Column<'alloc> {
    /// Build a column for `attr` with no row or null storage yet; storage is obtained from `a`
    /// once a capacity is set.
    fn new(a: &'alloc Allocator, attr: Attribute) -> Column<'alloc> {
        let no_nulls = OwnedChunk::empty();
        let no_rows = OwnedChunk::empty();
        let varlen = ChainedArena::new(a, ARENA_MIN_SIZE, ARENA_MAX_SIZE);

        Column {
            allocator: a,
            attr: attr,
            raw_nulls: no_nulls,
            raw: no_rows,
            arena: varlen,
        }
    }

    /// Arena used to store varlen column values.
    pub fn arena(&mut self) -> &mut ChainedArena<'alloc> {
        &mut self.arena
    }

    /// Writable null vector.
    ///
    /// Fails with `DBError::AttributeNullability` when the column is not nullable. Yields an
    /// empty slice while no null storage is present.
    pub fn nulls_mut(&mut self) -> Result<MutBoolBitmap, DBError> {
        if self.attr.nullable {
            let bitmap: MutBoolBitmap = match self.raw_nulls.data {
                Some(ref mut bytes) => bytes,
                None => &mut [],
            };

            Ok(bitmap)
        } else {
            Err(DBError::AttributeNullability(self.attr.name.clone()))
        }
    }

    /// Writable, typed row values.
    ///
    /// Fails with `DBError::AttributeType` when the column type is not `T::ENUM`. Yields an
    /// empty slice while no row storage is present, otherwise `capacity()` values.
    pub fn rows_mut<T: ValueInfo>(&mut self) -> Result<&mut [T::Store], DBError> {
        if self.attr.dtype != T::ENUM {
            return Err(DBError::AttributeType(self.attr.name.clone()));
        }

        // Reinterpret the chunk's byte pointer as a pointer to the checked value type.
        let typed: *mut T::Store = unsafe { mem::transmute(self.raw.as_mut_ptr()) };

        if typed.is_null() {
            let empty: &mut [T::Store] = &mut [];
            return Ok(empty);
        }

        let len = self.capacity();
        Ok(unsafe { slice::from_raw_parts_mut(typed, len) })
    }

    /// Change the capacity of the Column
    ///
    /// Allocates fresh storage when the column has none, otherwise resizes what is there. The
    /// null vector is only touched for nullable columns. Returns the error of the first
    /// allocation or resize that fails, `None` on success.
    pub fn set_capacity(&mut self, rows: RowOffset) -> Option<DBError> {
        let row_bytes = rows * self.attr.dtype.size_of();
        let nullable = self.attr.nullable;

        if !self.raw.is_null() {
            // Storage already exists: resize rows first, then the null vector.
            if let Some(err) = self.raw.resize(row_bytes) {
                return Some(err);
            }

            return if nullable { self.raw_nulls.resize(rows) } else { None };
        }

        // First allocation for this column.
        self.raw = match self.allocator.allocate(row_bytes) {
            Ok(chunk) => chunk,
            Err(e) => return Some(e),
        };

        if nullable {
            self.raw_nulls = match self.allocator.allocate(rows) {
                Ok(chunk) => chunk,
                Err(e) => return Some(e),
            };
        }

        None
    }
}

/// A read-only view into data conforming to a pre-defined schema. This view may be backed by a
/// container that owns its data, or one that borrows or aliases somebody else's data.
pub trait View<'v> {
    /// Schema the columns of this view conform to
    fn schema(&'v self) -> &'v Schema;
    /// Column at position `pos`. The implementations in this file return `None` when `pos` is
    /// past the last column.
    fn column(&'v self, pos: usize) -> Option<&'v RefColumn<'v>>;

    /// Number of rows
    fn rows(&self) -> RowOffset;
}

/// An implementation of a View that doesn't "own" the data but aliases it
#[derive(Default)]
pub struct RefView<'a> {
    /// Schema describing the aliased columns
    schema: Schema,
    /// One alias per column
    columns: Vec<AliasColumn<'a>>,
    /// Number of rows visible through this view
    rows: RowOffset,
}

/// Alias every column of a view, in schema order.
///
/// `range` is handed unchanged to `alias_column` for each column; the first error stops the
/// walk and is returned. Panics if the view has fewer columns than its schema reports.
pub fn alias_columns<'a>(src: &'a View<'a>, range: Option<RowRange>)
    -> Result<Vec<AliasColumn<'a>>, DBError>
{
    let total = src.schema().count();
    let mut aliases = Vec::with_capacity(total);

    for column in (0 .. total).map(|idx| src.column(idx).unwrap()) {
        aliases.push(alias_column(column, range)?);
    }

    Ok(aliases)
}

/// View access for an aliasing view: served straight from its stored fields.
impl<'a> View<'a> for RefView<'a> {
    /// Schema stored in the view
    fn schema(&'a self) -> &'a Schema {
        &self.schema
    }

    /// Alias column at `pos`, or `None` when `pos` is past the last column
    fn column(&'a self, pos: usize) -> Option<&RefColumn> {
        self.columns.get(pos)
            .map(|c| c as &RefColumn)
    }

    /// Number of rows recorded for the view
    fn rows(&self) -> RowOffset {
        self.rows
    }
}

/// Create a window into another view.
///
/// With no range the window spans all rows of `src`; otherwise it spans `range.rows` rows
/// starting at row `range.offset`. The schema is cloned from `src` and every column is aliased
/// through `alias_columns` with the same range.
///
/// # Errors
///
/// Returns `DBError::RowOutOfBounds` when the requested rows do not fit inside `src.rows()`,
/// and forwards any error produced while aliasing the columns.
pub fn window_alias<'a>(src: &'a View<'a>, range: Option<RowRange>)
    -> Result<RefView<'a>, DBError>
{
    let available = src.rows();
    let (offset, rows) = range.map_or((0, available), |r| (r.offset, r.rows));

    // `checked_add` turns an overflowing offset/rows pair into an out-of-bounds error instead
    // of a debug-build panic or a wrapped value that passes the comparison.
    match offset.checked_add(rows) {
        Some(end) if end <= available => {}
        _ => return Err(DBError::RowOutOfBounds),
    }

    Ok(RefView {
        schema: src.schema().clone(),
        rows: rows,
        columns: alias_columns(src, range)?,
    })
}

impl<'a> RefView<'a> {
    /// Assemble a view from an already prepared schema, set of alias columns and row count.
    /// The parts are stored as given; nothing is validated against each other.
    pub fn new(schema: Schema, columns: Vec<AliasColumn<'a>>, rows: RowOffset) -> RefView<'a> {
        RefView { schema, columns, rows }
    }
}

/// A container for column data conforming to a pre-defined schema. This container is the owner of
/// the columns (and their data)
pub struct Block<'b> {
    /// Allocator handed to every column for its storage
    allocator: &'b Allocator,
    /// Schema the columns were created from
    schema: Schema,
    /// One owned column per schema attribute
    columns: Vec<Column<'b>>,
    /// Number of rows currently in use
    rows: RowOffset,
    /// Number of rows the columns currently have storage for
    capacity: RowOffset,
}

/// View access for a block: served straight from its owned columns.
impl<'b> View<'b> for Block<'b> {
    /// Schema the block was created with
    fn schema(&'b self) -> &'b Schema {
        &self.schema
    }

    /// Owned column at `pos`, or `None` when `pos` is past the last column
    fn column(&'b self, pos: usize) -> Option<&RefColumn> {
        self.columns.get(pos)
            .map(|c| c as &RefColumn)
    }

    /// Number of rows currently in use
    fn rows(&self) -> RowOffset {
        self.rows
    }
}

impl<'b> Block<'b> {
    pub fn new(alloc: &'b Allocator, schema: &Schema) -> Block<'b> {
        let mut b = Block {
            allocator: alloc,
            schema: schema.clone(),
            rows: 0,
            capacity: 0,
            columns: Vec::new()
        };

        for attr in schema.iter() {
            b.columns.push(Column::new(b.allocator, attr.clone()))
        }

        b
    }

    /// Number of rows the Block can currently grow to without re-allocating column data.
    pub fn capacity(&self) -> RowOffset {
        self.capacity
    }

    /// Grow possible row space for each column
    pub fn set_capacity(&mut self, row_cap: RowOffset) -> Option<DBError> {
        for ref mut col in &mut self.columns {
            let status = col.set_capacity(row_cap);
            if status.is_some() {
                return status;
            }
        }

        self.capacity = row_cap;
        if row_cap < self.rows {
            self.rows = row_cap;
        }

        None
    }

    /// Returns rowid of the added row
    pub fn add_row(&mut self) -> Result<RowOffset, DBError> {
        if self.capacity > self.rows {
            let rowid = self.rows;
            self.rows += 1;
            Ok(rowid)
        } else {
            let rowid = self.rows;
            let new_cap = self.capacity + 1024;

            if let Some(err) = self.set_capacity(new_cap) {
                Err(err)
            } else {
                self.rows += 1;
                Ok(rowid)
            }
        }
    }

    /// Mutable reference to column and its data.
    pub fn column_mut(&mut self, pos: usize) -> Option<&mut Column<'b>> {
        self.columns.get_mut(pos)
    }
}