1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray};
use arrow_buffer::BooleanBuffer;
use futures::{future::BoxFuture, FutureExt};
use log::trace;

use crate::{
    decoder::{PhysicalPageDecoder, PhysicalPageScheduler},
    encoder::{ArrayEncoder, BufferEncoder, EncodedArray, EncodedArrayBuffer},
    format::pb,
    EncodingsIo,
};

use lance_core::Result;

use super::buffers::BitmapBufferEncoder;

/// Child decoders for a page that has both a validity bitmap and values
/// (some, but not all, rows are null).
struct DataDecoders {
    /// Decodes the validity bitmap into the first destination buffer.
    validity: Box<dyn PhysicalPageDecoder>,
    /// Decodes the values into the remaining destination buffers.
    values: Box<dyn PhysicalPageDecoder>,
}

/// The child decoders held by a `BasicPageDecoder`, chosen by the page's null status.
enum DataNullStatus {
    /// Every row is null: neither validity nor values are stored.
    All,
    /// No row is null: only values are stored.
    None(Box<dyn PhysicalPageDecoder>),
    /// Some rows are null: both validity and values are stored.
    Some(DataDecoders),
}

impl DataNullStatus {
    fn values_decoder(&self) -> Option<&dyn PhysicalPageDecoder> {
        match self {
            Self::All => None,
            Self::Some(decoders) => Some(decoders.values.as_ref()),
            Self::None(values) => Some(values.as_ref()),
        }
    }
}

/// Child schedulers for a page that has both a validity bitmap and values
/// (some, but not all, rows are null).
#[derive(Debug)]
struct DataSchedulers {
    /// Schedules the fetch of the validity bitmap.
    validity: Box<dyn PhysicalPageScheduler>,
    /// Schedules the fetch of the values.
    values: Box<dyn PhysicalPageScheduler>,
}

/// The child schedulers held by a [`BasicPageScheduler`], chosen by the page's null status.
#[derive(Debug)]
enum SchedulerNullStatus {
    /// No row is null: only values are stored.
    None(Box<dyn PhysicalPageScheduler>),
    /// Some rows are null: both validity and values are stored.
    Some(DataSchedulers),
    /// Every row is null: neither validity nor values are stored.
    All,
}

impl SchedulerNullStatus {
    fn values_scheduler(&self) -> Option<&dyn PhysicalPageScheduler> {
        match self {
            Self::All => None,
            Self::None(values) => Some(values.as_ref()),
            Self::Some(schedulers) => Some(schedulers.values.as_ref()),
        }
    }
}

/// A physical scheduler for "basic" fields.  These are fields that have an optional
/// validity bitmap and some kind of values buffer.
///
/// No actual decoding happens here; we simply aggregate the two buffers.
///
/// If everything is null, there are no data buffers at all.
// TODO: Add support/tests for primitive nulls
// TODO: Add tests for the all-null case
//
// Right now this is always present on primitive fields.  In the future we may use a
// sentinel encoding instead.
#[derive(Debug)]
pub struct BasicPageScheduler {
    /// Which child schedulers exist, depending on whether none, some, or all rows are null.
    mode: SchedulerNullStatus,
}

impl BasicPageScheduler {
    /// Creates a new instance for a page where some (but not all) rows are null.
    ///
    /// `validity_decoder` schedules the validity bitmap and `values_decoder` schedules the
    /// values; both are consulted whenever ranges are scheduled.
    pub fn new_nullable(
        validity_decoder: Box<dyn PhysicalPageScheduler>,
        values_decoder: Box<dyn PhysicalPageScheduler>,
    ) -> Self {
        Self {
            mode: SchedulerNullStatus::Some(DataSchedulers {
                validity: validity_decoder,
                values: values_decoder,
            }),
        }
    }

    /// Creates a new instance that needs no validity bitmap because no row is null.
    ///
    /// Only `values_decoder` is consulted when ranges are scheduled.
    pub fn new_non_nullable(values_decoder: Box<dyn PhysicalPageScheduler>) -> Self {
        Self {
            mode: SchedulerNullStatus::None(values_decoder),
        }
    }

    /// Creates a new instance for a page where every row is null.
    ///
    /// No child schedulers are needed: an all-null page stores neither a validity bitmap nor
    /// values, so scheduling ranges on this instance fetches nothing.
    // NOTE(review): Arrow still expects value buffers to be sized for all-null arrays; since
    // this scheduler has no values scheduler, that sizing must happen elsewhere — confirm.
    pub fn new_all_null() -> Self {
        Self {
            mode: SchedulerNullStatus::All,
        }
    }
}

impl PhysicalPageScheduler for BasicPageScheduler {
    /// Schedules the child fetches needed for `ranges` and returns a future that resolves to a
    /// `BasicPageDecoder` once every required child decoder is available.
    ///
    /// Which children are consulted depends on the page's null status:
    /// * all null: nothing is scheduled and the decoder is produced immediately
    /// * no nulls: only the values scheduler is consulted
    /// * some nulls: the validity scheduler is consulted first, then the values scheduler
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u32>],
        scheduler: &dyn EncodingsIo,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
        // Matching on the mode directly (instead of on a pair of optional futures) means the
        // impossible "validity without values" combination cannot arise, so no catch-all
        // `unreachable!()` arm is needed and new variants will be caught by the compiler.
        match &self.mode {
            SchedulerNullStatus::All => {
                trace!("No values fetch needed since values all null");
                async move {
                    Ok(Box::new(BasicPageDecoder {
                        mode: DataNullStatus::All,
                    }) as Box<dyn PhysicalPageDecoder>)
                }
                .boxed()
            }
            SchedulerNullStatus::None(values_scheduler) => {
                let values_future =
                    values_scheduler.schedule_ranges(ranges, scheduler, top_level_row);
                async move {
                    let mode = DataNullStatus::None(values_future.await?);
                    Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PhysicalPageDecoder>)
                }
                .boxed()
            }
            SchedulerNullStatus::Some(schedulers) => {
                // Validity is scheduled before values, matching the previous submission order.
                let validity_future =
                    schedulers
                        .validity
                        .schedule_ranges(ranges, scheduler, top_level_row);
                let values_future = schedulers
                    .values
                    .schedule_ranges(ranges, scheduler, top_level_row);
                async move {
                    // Values are awaited before validity, as before.
                    let values = values_future.await?;
                    let validity = validity_future.await?;
                    let mode = DataNullStatus::Some(DataDecoders { validity, values });
                    Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PhysicalPageDecoder>)
                }
                .boxed()
            }
        }
    }
}

/// Decoder produced by [`BasicPageScheduler`]; combines an optional validity bitmap with an
/// optional values decoder into the destination buffers.
struct BasicPageDecoder {
    /// Which child decoders exist, depending on whether none, some, or all rows are null.
    mode: DataNullStatus,
}

impl PhysicalPageDecoder for BasicPageDecoder {
    /// Reports the destination sizes needed to decode `num_rows` rows.
    ///
    /// `buffers[0]` describes the validity bitmap owned by this decoder; the remaining entries
    /// are delegated to the values decoder, if any.  When there is no values decoder (the
    /// all-null case), `all_null` is set to `true`.
    fn update_capacity(
        &self,
        rows_to_skip: u32,
        num_rows: u32,
        buffers: &mut [(u64, bool)],
        all_null: &mut bool,
    ) {
        // The validity destination is a bitmap (one bit per row), so its size is known
        // without consulting the validity decoder.
        buffers[0].0 = arrow_buffer::bit_util::ceil(num_rows as usize, 8) as u64;
        // The validity buffer is only required if we have some nulls
        buffers[0].1 = matches!(self.mode, DataNullStatus::Some(_));
        if let Some(values) = self.mode.values_decoder() {
            values.update_capacity(rows_to_skip, num_rows, &mut buffers[1..], all_null);
        } else {
            *all_null = true;
        }
    }

    /// Decodes `num_rows` rows (after skipping `rows_to_skip`) into `dest_buffers`.
    ///
    /// `dest_buffers[0]` receives the validity bitmap; `dest_buffers[1..]` receive the values.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the child decoders.
    fn decode_into(
        &self,
        rows_to_skip: u32,
        num_rows: u32,
        dest_buffers: &mut [bytes::BytesMut],
    ) -> Result<()> {
        match &self.mode {
            DataNullStatus::Some(decoders) => {
                decoders
                    .validity
                    .decode_into(rows_to_skip, num_rows, &mut dest_buffers[..1])?;
                decoders
                    .values
                    .decode_into(rows_to_skip, num_rows, &mut dest_buffers[1..])?;
            }
            // Either dest_buffers[0] is empty, in which case these are no-ops, or one of the
            // other pages needed the buffer, in which case we need to fill our section
            DataNullStatus::All => {
                // All bits cleared: every row is null.
                dest_buffers[0].fill(0);
            }
            DataNullStatus::None(values) => {
                // All bits set: every row is valid.  (Filling with `1` would set only the
                // lowest bit of each byte, marking 7 of every 8 rows as null.)
                dest_buffers[0].fill(0xFF);
                values.decode_into(rows_to_skip, num_rows, &mut dest_buffers[1..])?;
            }
        }
        Ok(())
    }

    /// One buffer for validity plus however many the values decoder needs (none if all null).
    fn num_buffers(&self) -> u32 {
        1 + self
            .mode
            .values_decoder()
            .map(|val| val.num_buffers())
            .unwrap_or(0)
    }
}

/// Encodes "basic" fields: a values buffer plus an optional validity bitmap.
///
/// The null status of the incoming arrays selects the layout:
/// * no nulls: only the values are encoded
/// * all nulls: nothing is encoded
/// * some nulls: a 1-bit-per-value validity bitmap is written alongside the values
#[derive(Debug)]
pub struct BasicEncoder {
    values_encoder: Box<dyn ArrayEncoder>,
}

impl BasicEncoder {
    /// Creates an encoder that delegates value encoding to `values_encoder`.
    pub fn new(values_encoder: Box<dyn ArrayEncoder>) -> Self {
        Self { values_encoder }
    }

    /// Converts each array's null buffer into a `BooleanArray` (set bit = valid).
    ///
    /// Arrays without a null buffer contribute an all-set array of the same length.
    fn validity_as_arrays(arrays: &[ArrayRef]) -> Vec<ArrayRef> {
        arrays
            .iter()
            .map(|arr| {
                let validity = arr
                    .nulls()
                    .map(|nulls| nulls.inner().clone())
                    .unwrap_or_else(|| BooleanBuffer::new_set(arr.len()));
                Arc::new(BooleanArray::new(validity, None)) as ArrayRef
            })
            .collect()
    }
}

impl ArrayEncoder for BasicEncoder {
    /// Encodes `arrays` as a single `Nullable` array encoding.
    ///
    /// `buffer_index` is advanced by one for the validity buffer (only in the some-nulls case)
    /// and by however many buffers the values encoder allocates.
    ///
    /// # Errors
    ///
    /// Propagates errors from the validity bitmap encoder and the values encoder.
    fn encode(&self, arrays: &[ArrayRef], buffer_index: &mut u32) -> Result<EncodedArray> {
        // Count in `usize` so large inputs cannot silently truncate or overflow a `u32`.
        let row_count: usize = arrays.iter().map(|arr| arr.len()).sum();
        let null_count: usize = arrays.iter().map(|arr| arr.null_count()).sum();

        let (buffers, nullability) = if null_count == 0 {
            let arr_encoding = self.values_encoder.encode(arrays, buffer_index)?;
            let encoding = pb::nullable::Nullability::NoNulls(Box::new(pb::nullable::NoNull {
                values: Some(Box::new(arr_encoding.encoding)),
            }));
            (arr_encoding.buffers, encoding)
        } else if null_count == row_count {
            let encoding = pb::nullable::Nullability::AllNulls(pb::nullable::AllNull {});
            (vec![], encoding)
        } else {
            let validity_as_arrays = Self::validity_as_arrays(arrays);

            // The validity buffer claims its index before the values encoder allocates any.
            let validity_buffer_index = *buffer_index;
            *buffer_index += 1;
            let validity = BitmapBufferEncoder::default().encode(&validity_as_arrays)?;
            let validity_encoding = Box::new(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Flat(pb::Flat {
                    bits_per_value: 1,
                    buffer: Some(pb::Buffer {
                        buffer_index: validity_buffer_index,
                        buffer_type: pb::buffer::BufferType::Page as i32,
                    }),
                    compression: None,
                })),
            });

            let arr_encoding = self.values_encoder.encode(arrays, buffer_index)?;
            let encoding = pb::nullable::Nullability::SomeNulls(Box::new(pb::nullable::SomeNull {
                validity: Some(validity_encoding),
                values: Some(Box::new(arr_encoding.encoding)),
            }));

            let mut buffers = arr_encoding.buffers;
            buffers.push(EncodedArrayBuffer {
                parts: validity.parts,
                index: validity_buffer_index,
            });
            (buffers, encoding)
        };

        Ok(EncodedArray {
            buffers,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Nullable(Box::new(
                    pb::Nullable {
                        nullability: Some(nullability),
                    },
                ))),
            },
        })
    }
}