Module lance_encoding::decoder

Utilities and traits for decoding data

There are two types of decoders: logical decoders and physical decoders. In addition, decoding data is broken into two steps: scheduling the I/O and decoding the returned data.

§Physical vs. Logical Decoding

The physical traits are self::PhysicalPageScheduler and self::PhysicalPageDecoder. These are the lower-level encodings. They have a few advantages:

  • They do not need to decode into an Arrow array, and so they are not constrained by the Arrow type system (e.g. Arrow doesn’t have a bit-packed type; we could use variable-length binary, but that is overkill)
  • They can decode into existing storage. This can allow for “page bridging”. If we are trying to decode into a batch of 1024 rows and the rows 0..1024 are spread across two pages then we can avoid a memory copy by allocating once and decoding each page into the outer allocation.
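
To make page bridging concrete, here is a minimal sketch; decode_page_into is a hypothetical stand-in for a real physical decoder:

    // Minimal sketch of "page bridging": two pages decode into disjoint
    // halves of a single allocation, avoiding a copy when assembling the
    // batch. `decode_page_into` stands in for real physical decoding.
    fn decode_page_into(page: &[u32], dest: &mut [u32]) {
        dest.copy_from_slice(page);
    }

    fn main() {
        let page0 = vec![0u32; 512]; // holds rows 0..512
        let page1 = vec![1u32; 512]; // holds rows 512..1024
        let mut batch = vec![0u32; 1024]; // one allocation for all 1024 rows
        let (front, back) = batch.split_at_mut(512);
        decode_page_into(&page0, front);
        decode_page_into(&page1, back);
        assert_eq!((batch[511], batch[512]), (0, 1));
    }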

However, there are some limitations too:

  • They are constrained to a single column
  • The API is more complex

The logical traits are self::LogicalPageScheduler and self::LogicalPageDecoder. These are designed to map from Arrow fields to one or more columns of Lance data. They do not decode into existing buffers; instead they return an Arrow Array.
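
As a rough illustration of the difference (these are simplified sketches, not the actual trait signatures in this module):

    use arrow_array::ArrayRef;

    // A physical decoder writes raw values for a range of rows into
    // caller-owned storage; it never builds an Arrow array itself.
    trait PhysicalDecodeSketch {
        fn decode_into(&self, rows_to_skip: u64, num_rows: u64, dest: &mut [u8]);
    }

    // A logical decoder produces a ready-to-use Arrow array.
    trait LogicalDecodeSketch {
        fn decode(&mut self, num_rows: u64) -> ArrayRef;
    }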

Encodings are typically nested into each other to form a tree. The top of the tree is the user requested schema. Each field in that schema is assigned to one top-level logical encoding. That encoding can then contain other logical encodings or physical encodings. Physical encodings can also contain other physical encodings.

So, for example, a single field in the Arrow schema might have a list type (e.g. a list of primitive values).

The encoding tree could then be:

root: List (logical encoding)

  • indices: Primitive (logical encoding)
    • column: Basic (physical encoding)
      • validity: Bitmap (physical encoding)
      • values: RLE (physical encoding)
        • runs: Value (physical encoding)
        • values: Value (physical encoding)
  • items: Primitive (logical encoding)
    • column: Basic (physical encoding)
      • values: Value (physical encoding)

Note that, in this example, root.items.column does not have a validity bitmap because there were no nulls in the page.

Note that the decoding API is not symmetric with the encoding API. The encoding API contains buffer encoders, array encoders, and field encoders. Physical decoders correspond to both buffer encoders and array encoders. Logical decoders correspond to both array encoders and field encoders.

§Multiple buffers or multiple columns?

Note that there are many different ways we can write encodings. For example, we might store primitive fields in a single column with two buffers (one for validity and one for values).

On the other hand, we could also store a primitive field as two different columns: one that yields a non-nullable boolean array and one that yields a non-nullable array of items. Then we could combine these two arrays into a single array where the boolean array is the null bitmap (see the sketch after this list). There are a few subtle differences between the approaches:

  • Storing things as multiple buffers within the same column is generally more efficient and easier to schedule. For example, in-batch coalescing is very easy but can only be done on data that is in the same page.
  • When things are stored in multiple columns you have to worry about their pages not being in sync. In our previous validity / values example this means we might have to do some memory copies to get the validity and values arrays to the same length at decode time.
  • When things are stored in a single column, projection is impossible. For example, if we tried to store all the struct fields in a single column with lots of buffers then we wouldn’t be able to read back individual fields of the struct.
  • When things are stored in a single column they must have the same length. This is an issue for list fields. The items column does not usually have the same length as the parent list column (in most cases it is much longer).
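
To make the two-column approach concrete, here is a minimal sketch using the arrow-rs API: a non-nullable boolean array (validity) and non-nullable values combine into a single nullable array.

    use arrow_array::{Array, BooleanArray, Int32Array};
    use arrow_buffer::{NullBuffer, ScalarBuffer};

    fn main() {
        // Pretend these were decoded from two separate columns.
        let validity = BooleanArray::from(vec![true, false, true, true]);
        let values = ScalarBuffer::<i32>::from(vec![1, 0, 3, 4]);

        // The boolean array becomes the null bitmap of the combined array.
        let nulls = NullBuffer::new(validity.values().clone());
        let combined = Int32Array::new(values, Some(nulls));
        assert_eq!(combined.null_count(), 1);
    }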

The fixed size list decoding is an interesting example because it is actually both a physical encoding and a logical encoding. A fixed size list of a physical encoding is, itself, a physical encoding (e.g. a fixed size list of doubles). However, a fixed size list of a logical encoding is a logical encoding (e.g. a fixed size list of structs).
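
For instance, a fixed size list of doubles can be handled purely physically because every element sits at a fixed, computable offset in one flat buffer (a simplified illustration):

    // A fixed size list of a primitive type is one flat values buffer:
    // element `elem` of list `list` lives at a fixed, computable offset.
    fn fsl_get(values: &[f64], list_size: usize, list: usize, elem: usize) -> f64 {
        values[list * list_size + elem]
    }

    fn main() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; // two lists of size 3
        assert_eq!(fsl_get(&values, 3, 1, 0), 4.0);
    }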

§The scheduling loop

Reading a Lance file involves both scheduling and decoding. It’s generally expected that these will run as two separate threads.


                                   I/O PARALLELISM
                      Issues
                      Requests   ┌─────────────────┐
                                 │                 │        Wait for
                      ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
                      │          │                 │        For batch    │
                      │          └─────────────────┘             │3      │
                      │                                          │       │
                      │                                          │       │2
┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
│                       │                              │                  │Poll
│       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
│       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
│                       │                              │                  │
└─────▲─────────────┬───┘                              └─────────┬────────┘
      │             │                                            │4
      │             │                                            │
      └─────────────┘                                   ┌────────┴────────┐
 Caller of schedule_range                Buffer polling │                 │
 will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
 and schedule one decode                 parallelism    │ Task            │
 task (and all needed I/O)               (thread per    │                 │
 per logical page                         batch)        └─────────────────┘

The scheduling thread will work through the file from the start to the end as quickly as possible. Data is scheduled one page at a time in a row-major fashion. For example, imagine we have a file with the following page structure:

Score (Float32)     | C0P0 |
Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |

This would be quite common as each of these pages has the same number of bytes. Let’s pretend each page is 1MiB and so there are 256Ki rows of data. Each page of Score has 256Ki rows. Each page of Id has 64Ki rows. Each page of Vector has 256 rows. The scheduler would then schedule in the following order:

C0 P0 C1 P0 C2 P0 C2 P1 … (253 pages omitted) C2 P255 C1 P1 C2 P256 … (254 pages omitted) C2 P511 C1 P2 C2 P512 … (254 pages omitted) C2 P767 C1 P3 C2 P768 … (254 pages omitted) C2 P1023

This is the ideal scheduling order because it means we can decode complete rows as quickly as possible. Note that the scheduler thread does not need to wait for I/O to happen at any point. As soon as it starts it will schedule one page of I/O after another until it has scheduled the entire file’s worth of I/O. This is slightly different from other file readers, which have “row group parallelism” and will typically only schedule X row groups’ worth of reads at a time.
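
A hypothetical sketch of how this row-major order falls out (not the actual scheduler, which works with byte ranges and real I/O):

    // Emit (column, page) pairs in row-major order for columns whose pages
    // cover different numbers of rows. Hypothetical helper for illustration.
    fn schedule_order(rows_per_page: &[u64], total_rows: u64) -> Vec<(usize, u64)> {
        let mut next_page_start = vec![0u64; rows_per_page.len()];
        let mut order = Vec::new();
        for row in 0..total_rows {
            for (col, &rows) in rows_per_page.iter().enumerate() {
                // A column's next page is scheduled as soon as the scan
                // reaches that page's first row.
                if next_page_start[col] == row {
                    order.push((col, row / rows));
                    next_page_start[col] += rows;
                }
            }
        }
        order
    }

    fn main() {
        // Score: 256Ki rows/page, Id: 64Ki rows/page, Vector: 256 rows/page.
        let order = schedule_order(&[262144, 65536, 256], 262144);
        assert_eq!(order[0], (0, 0)); // C0 P0
        assert_eq!(order[1], (1, 0)); // C1 P0
        assert_eq!(order[2], (2, 0)); // C2 P0
        assert_eq!(order[258], (1, 1)); // C1 P1, after 256 Vector pages
    }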

In the near future there will be a backpressure mechanism, and the scheduler may need to stop or pause if the compute side falls behind.

§Indirect I/O

Regrettably, there are times when we cannot know exactly what data we need until we have partially decoded the file. This happens when we have variable-sized list data. In that case the scheduling task for that page will only schedule the first part of the read (loading the list offsets). It will then immediately spawn a new tokio task to wait for that I/O and decode the list offsets. That follow-up task is not part of the scheduling loop or the decode loop; it is a free task. Once the list offsets are decoded we submit a follow-up I/O task. This task is scheduled at a high priority because the decoder is going to need it soon.
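
A rough sketch of this pattern (submit_read and decode_offsets are hypothetical stand-ins, not actual lance_encoding APIs):

    use tokio::sync::oneshot;

    // Hypothetical stand-ins for the I/O service and offset decoding.
    async fn submit_read(range: std::ops::Range<u64>) -> Vec<u8> {
        vec![0; (range.end - range.start) as usize]
    }
    fn decode_offsets(bytes: &[u8]) -> Vec<u64> {
        bytes.iter().map(|b| *b as u64).collect()
    }

    #[tokio::main]
    async fn main() {
        // The scheduling loop only schedules the offsets read, then moves on.
        let offsets_read = submit_read(0..16);

        // A detached "free" task waits for that I/O, decodes the offsets, and
        // immediately submits the follow-up read for the list items (at high
        // priority in the real implementation).
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            let offsets = decode_offsets(&offsets_read.await);
            let first = offsets[0];
            let last = offsets[offsets.len() - 1];
            let items = submit_read(first..last).await;
            let _ = tx.send(items);
        });

        // The decoder eventually consumes the follow-up read's result.
        let _items = rx.await.unwrap();
    }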

§The decode loop

As soon as the scheduler starts we can start decoding. Each time we schedule a page we push a decoder for that page’s data into a channel. The decode loop (BatchDecodeStream) reads from that channel. Each time it receives a decoder it waits until the decoder has all of its data, then it grabs the next decoder. Once it has enough loaded decoders to complete a batch’s worth of rows it will spawn a “decode batch task”.
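
In simplified form (PageDecoder and decode_batch are hypothetical stand-ins; the real BatchDecodeStream also waits for each decoder’s I/O to finish):

    use tokio::sync::mpsc;

    // Hypothetical stand-in for a per-page decoder pushed by the scheduler.
    struct PageDecoder {
        num_rows: u64,
    }

    fn decode_batch(_decoders: Vec<PageDecoder>) { /* decompress, etc. */ }

    // Drain page decoders from the channel until enough rows are buffered
    // for one batch, then hand the CPU-heavy decode off to its own task.
    async fn decode_loop(mut rx: mpsc::Receiver<PageDecoder>, batch_size: u64) {
        let mut pending = Vec::new();
        let mut rows = 0;
        while let Some(decoder) = rx.recv().await {
            rows += decoder.num_rows;
            pending.push(decoder);
            if rows >= batch_size {
                rows = 0;
                let batch = std::mem::take(&mut pending);
                tokio::task::spawn_blocking(move || decode_batch(batch));
            }
        }
    }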

These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow arrays. This may involve significant CPU processing, like decompression or arithmetic, in order to restore the data to its correct in-memory representation.

§Batch size

The BatchDecodeStream is configured with a batch size. This does not need to have any relation to the page size(s) used to write the data. This keeps our compute work completely independent of our I/O work. We suggest using small batch sizes:

  • Batches should fit in CPU cache (at least L3)
  • More batches means more opportunity for parallelism
  • The “batch overhead” is very small in Lance compared to other formats because it has no relation to the way the data is stored.
