Module lance_encoding::decoder
Utilities and traits for decoding data
There are two types of decoders, logical decoders and physical decoders. In addition, decoding data is broken into two steps, scheduling the I/O and decoding the returned data.
§Physical vs. Logical Decoding
The physical traits are self::PhysicalPageScheduler and self::PhysicalPageDecoder. These are lower level encodings. They have a few advantages:
- They do not need to decode into an Arrow array and so they don’t need to fit into the Arrow type system (e.g. Arrow doesn’t have a bit-packed type; we could use variable-length binary, but that is overkill)
- They can decode into existing storage. This can allow for “page bridging”. If we are trying to decode into a batch of 1024 rows and the rows 0..1024 are spread across two pages then we can avoid a memory copy by allocating once and decoding each page into the outer allocation.
However, there are some limitations too:
- They are constrained to a single column
- The API is more complex
The logical traits are self::LogicalPageScheduler and self::LogicalPageDecoder. These are designed to map from Arrow fields into one or more columns of Lance data. They do not decode into existing buffers; instead, they return an Arrow array.
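To make the distinction concrete, here is a deliberately simplified sketch of the two layers. These are not the crate’s actual signatures (the real traits also deal with I/O scheduling, buffer capacities, and priorities); the trait names and methods below are purely illustrative.

```rust
use std::ops::Range;

use arrow_array::ArrayRef;

// Physical layer (illustrative): a single column, decoding into
// caller-provided storage so that two pages can "bridge" into one allocation.
trait SimplePhysicalDecoder {
    /// Decode the given rows of this page into `dest`, starting at
    /// `dest_offset` bytes. The caller owns (and may reuse) the allocation.
    fn decode_into(&self, rows: Range<u64>, dest: &mut [u8], dest_offset: usize);
}

// Logical layer (illustrative): maps an Arrow field (possibly backed by
// several Lance columns) and hands back a freshly allocated Arrow array.
trait SimpleLogicalDecoder {
    fn decode(&mut self, num_rows: u64) -> ArrayRef;
}
```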
Encodings are typically nested into each other to form a tree. The top of the tree is the user requested schema. Each field in that schema is assigned to one top-level logical encoding. That encoding can then contain other logical encodings or physical encodings. Physical encodings can also contain other physical encodings.
So, for example, a single field in the Arrow schema might have the type List. The encoding tree could then be:
root: List (logical encoding)
 - indices: Primitive (logical encoding)
   - column: Basic (physical encoding)
     - validity: Bitmap (physical encoding)
     - values: RLE (physical encoding)
       - runs: Value (physical encoding)
       - values: Value (physical encoding)
 - items: Primitive (logical encoding)
   - column: Basic (physical encoding)
     - values: Value (physical encoding)
Note that, in this example, root.items.column does not have a validity because there were no nulls in the page.
Note that the decoding API is not symmetric with the encoding API. The encoding API contains buffer encoders, array encoders, and field encoders. Physical decoders correspond to both buffer encoders and array encoders. Logical decoders correspond to both array encoders and field encoders.
§Multiple buffers or multiple columns?
Note that there are many different ways we can write encodings. For example, we might store primitive fields in a single column with two buffers (one for validity and one for values).
On the other hand, we could also store a primitive field as two different columns: one that yields a non-nullable boolean array and one that yields a non-nullable array of items. Then we could combine these two arrays into a single array where the boolean array is the bitmap (see the sketch after this list). There are a few subtle differences between the approaches:
- Storing things as multiple buffers within the same column is generally more efficient and easier to schedule. For example, in-batch coalescing is very easy but can only be done on data that is in the same page.
- When things are stored in multiple columns you have to worry about their pages not being in sync. In our previous validity / values example this means we might have to do some memory copies to get the validity and values arrays to be the same length at decode time.
- When things are stored in a single column, projection is impossible. For example, if we tried to store all the struct fields in a single column with lots of buffers then we wouldn’t be able to read back individual fields of the struct.
- When things are stored in a single column they must have the same length. This is an issue for list fields. The items column does not usually have the same length as the parent list column (in most cases it is much longer).
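To make the two-column approach concrete, here is a small arrow-rs sketch of the recombination step: a non-nullable values array and a separately decoded boolean validity array are zipped into one nullable array. The decoding itself is elided; this only shows the final combine.

```rust
use arrow_array::{Array, Int32Array};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};

fn main() {
    // Pretend these were decoded from two separate Lance columns.
    let values: ScalarBuffer<i32> = vec![1, 2, 3, 4].into();
    let validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, true]));

    // Combine them into a single nullable array; the boolean array becomes
    // the null bitmap and no values are copied.
    let combined = Int32Array::new(values, Some(validity));
    assert_eq!(combined.null_count(), 1);
}
```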
The fixed size list decoding is an interesting example because it is actually both a physical encoding and a logical encoding. A fixed size list of a physical encoding is, itself, a physical encoding (e.g. a fixed size list of doubles). However, a fixed size list of a logical encoding is a logical encoding (e.g. a fixed size list of structs).
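For instance, in arrow-rs terms, a fixed size list of doubles is fully described by one flat primitive buffer plus a width, which is why it can stay at the physical level. The sketch below uses arrow-rs APIs; the framing around physical vs. logical is ours.

```rust
use std::sync::Arc;

use arrow_array::{Array, FixedSizeListArray, Float64Array};
use arrow_schema::{DataType, Field};

fn main() {
    // 2 rows of FixedSizeList<Float64, 3> are just 6 contiguous f64 values:
    // decoding them is the same single-buffer work as a plain Float64 column.
    let values = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]);
    let field = Arc::new(Field::new("item", DataType::Float64, false));
    let list = FixedSizeListArray::new(field, 3, Arc::new(values), None);
    assert_eq!(list.len(), 2);
    // A FixedSizeList<Struct<...>> has no such flat representation, so it
    // must be decoded field-by-field at the logical level.
}
```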
§The scheduling loop
Reading a Lance file involves both scheduling and decoding. It is generally expected that these will run as two separate threads.
                               I/O PARALLELISM
              Issues
             Requests     ┌─────────────────┐
         ┌───────────────►│   I/O Service   ├────► Wait for enough ◄─┐
         │                └─────────────────┘  3   I/O for batch     │
         │                                            │              │ 2
         │                                            │              │
┌────────┴──────────┐   Decode tasks sent    ┌────────▼──────────┐   │  Poll
│   Batch Decode    │   via channel          │   Batch Decode    │   │
│    Scheduler      ├───────────────────────►│      Stream       ◄───┘  1
└───────────────────┘                        └────────┬──────────┘
                                                      │ 4
                                                      ▼
 Caller of schedule_range     Buffer polling   ┌─────────────────┐
 will be the scheduler        to achieve CPU   │   Decode Batch  ├────►
 thread; it schedules one     parallelism      │      Task       │
 decode task (and all         (thread per      └─────────────────┘
 needed I/O) per logical      batch)
 page
The scheduling thread will work through the file from the start to the end as quickly as possible. Data is scheduled one page at a time in a row-major fashion. For example, imagine we have a file with the following page structure:
Score (Float32)     | C0P0 |
Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
This would be quite common as each of these pages has the same number of bytes. Let’s pretend each page is 1MiB and so there are 256Ki rows of data. Each page of Score has 256Ki rows. Each page of Id has 64Ki rows. Each page of Vector has 256 rows. The scheduler would then schedule in the following order:

C0P0, C1P0, C2P0, C2P1, … (253 pages omitted), C2P255, C1P1, C2P256, … (254 pages omitted), C2P511, C1P2, C2P512, … (254 pages omitted), C2P767, C1P3, C2P768, … (254 pages omitted), C2P1023
This is the ideal scheduling order because it means we can decode complete rows as quickly as possible. Note that the scheduler thread does not need to wait for I/O at any point. As soon as it starts it will schedule one page of I/O after another until it has scheduled the entire file’s worth of I/O. This is slightly different from other file readers, which have “row group parallelism” and will typically only schedule X row groups’ worth of reads at a time.
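The order above can be reproduced with a toy model of the scheduler: always schedule the next page of whichever column is furthest behind, preferring earlier columns on ties. This helper is purely illustrative and not part of the crate:

```rust
fn schedule_order(rows_per_page: &[u64], total_rows: u64) -> Vec<String> {
    let ncols = rows_per_page.len();
    let mut covered = vec![0u64; ncols]; // rows scheduled so far, per column
    let mut next_page = vec![0u64; ncols]; // next page index, per column
    let mut order = Vec::new();
    while covered.iter().any(|&c| c < total_rows) {
        // Row-major: pick the column with the least coverage (ties go to the
        // earlier column) and schedule its next page.
        let col = (0..ncols).min_by_key(|&i| (covered[i], i)).unwrap();
        order.push(format!("C{}P{}", col, next_page[col]));
        covered[col] += rows_per_page[col];
        next_page[col] += 1;
    }
    order
}

fn main() {
    // 1MiB pages: Score (4B) = 256Ki rows/page, Id (16B) = 64Ki, Vector (4096B) = 256.
    let order = schedule_order(&[256 * 1024, 64 * 1024, 256], 256 * 1024);
    // Prints: C0P0 C1P0 C2P0 ... C2P255 C1P1 C2P256 ... C2P1023
    println!("{}", order.join(" "));
}
```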
In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute falls behind.
§Indirect I/O
Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded the file. This happens when we have variable sized list data. In that case the scheduling task for that page will only schedule the first part of the read (loading the list offsets). It will then immediately spawn a new tokio task to wait for that I/O and decode the list offsets. That follow-up task is not part of the scheduling loop or the decode loop. It is a free task. Once the list offsets are decoded we submit a follow-up I/O task. This task is scheduled at a high priority because the decoder is going to need it soon.
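The control flow looks roughly like the following tokio sketch. Every name here is hypothetical; it only shows the detached task sitting between the two loops:

```rust
use std::ops::Range;

use tokio::sync::oneshot;

// Hypothetical: the scheduler has already issued the read for the list
// offsets and holds a receiver that resolves when those bytes arrive.
fn schedule_list_items(
    offsets_read: oneshot::Receiver<Vec<u8>>,
    submit_high_priority_io: impl FnOnce(Range<u64>) + Send + 'static,
) {
    // A detached ("free") task: part of neither the scheduling loop nor the
    // decode loop.
    tokio::spawn(async move {
        let offset_bytes = offsets_read.await.expect("offsets I/O failed");
        // Partially decode: the offsets tell us which bytes hold the items.
        let items_range = decode_offsets(&offset_bytes);
        // Follow-up I/O, submitted at high priority because the decoder is
        // going to need it soon.
        submit_high_priority_io(items_range);
    });
}

fn decode_offsets(bytes: &[u8]) -> Range<u64> {
    // Placeholder: a real implementation would parse the list offsets here.
    0..bytes.len() as u64
}
```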
§The decode loop
As soon as the scheduler starts we can start decoding. Each time we schedule a page we push a decoder for that page’s data into a channel. The decode loop (BatchDecodeStream) reads from that channel. Each time it receives a decoder it waits until the decoder has all of its data. Then it grabs the next decoder. Once it has enough loaded decoders to complete a batch worth of rows it will spawn a “decode batch task”.
These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow arrays. This may involve significant CPU processing, like decompression or arithmetic, in order to restore the data to its correct in-memory representation.
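A stripped-down model of the loop and the batch-task hand-off, with invented types standing in for the real decoders (none of these names are from the crate):

```rust
use tokio::sync::{mpsc, oneshot};

// Invented stand-in: a scheduled page decoder is a row count plus a signal
// that resolves once the page's I/O has landed.
struct PageDecoder {
    io_done: oneshot::Receiver<Vec<u8>>,
    num_rows: u64,
}

async fn decode_loop(mut rx: mpsc::UnboundedReceiver<PageDecoder>, batch_size: u64) {
    let mut loaded: Vec<Vec<u8>> = Vec::new();
    let mut rows = 0u64;
    // Decoders arrive in the order they were scheduled.
    while let Some(decoder) = rx.recv().await {
        // Wait until this page's data has actually arrived.
        let data = decoder.io_done.await.expect("I/O failed");
        loaded.push(data);
        rows += decoder.num_rows;
        if rows >= batch_size {
            // Spawn a "decode batch task"; in a real stream the JoinHandle
            // would be polled by the consumer.
            let batch = std::mem::take(&mut loaded);
            rows = 0;
            let _task = tokio::task::spawn_blocking(move || decode_batch(batch));
        }
    }
}

fn decode_batch(pages: Vec<Vec<u8>>) -> usize {
    // Placeholder for the real work: decompression, arithmetic, and
    // assembling Arrow arrays from the loaded bytes.
    pages.iter().map(|p| p.len()).sum()
}
```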
§Batch size
The BatchDecodeStream is configured with a batch size. This does not need to have any relation to the page size(s) used to write the data. This keeps our compute work completely independent of our I/O work. We suggest using small batch sizes:
- Batches should fit in CPU cache (at least L3); a back-of-envelope sizing example follows this list
- More batches means more opportunity for parallelism
- The “batch overhead” is very small in Lance compared to other formats because it has no relation to the way the data is stored.
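As a rough illustration of the first point (our own back-of-envelope heuristic, not something the crate provides):

```rust
// Pick a batch size so one batch stays within a cache budget.
fn suggest_batch_size(bytes_per_row: usize, cache_budget_bytes: usize) -> usize {
    (cache_budget_bytes / bytes_per_row).max(1)
}

fn main() {
    // e.g. ~4KiB rows against an 8MiB slice of L3 => batches of 2048 rows
    assert_eq!(suggest_batch_size(4096, 8 * 1024 * 1024), 2048);
}
```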
Structs§
- BatchDecodeStream: A stream that takes scheduled jobs and generates decode tasks from them.
- ColumnInfo: Metadata describing a column in a file
- DecodeBatchScheduler: The scheduler for decoding batches
- NextDecodeTask: A task to decode data into an Arrow array
- PageInfo: Metadata describing a page in a file
Traits§
- DecodeArrayTask: A trait for tasks that decode data into an Arrow array
- LogicalPageDecoder: A decoder for a field’s worth of data
- LogicalPageScheduler: A scheduler for a field’s worth of data
- PhysicalPageDecoder: A decoder for single-column encodings of primitive data (this includes fixed size lists of primitive data)
- PhysicalPageScheduler: A scheduler for single-column encodings of primitive data