lance_encoding/previous/encodings/logical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{
7    cast::AsArray,
8    types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, UInt8Type, Utf8Type},
9    Array, ArrayRef, GenericByteArray, GenericListArray,
10};
11
12use arrow_schema::DataType;
13use futures::{future::BoxFuture, FutureExt};
14use lance_core::Result;
15use log::trace;
16
17use crate::{
18    decoder::{
19        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
20        ScheduledScanLine, SchedulerContext,
21    },
22    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
23};
24
25/// Wraps a varbin scheduler and uses a BinaryPageDecoder to cast
26/// the result to the appropriate type
27#[derive(Debug)]
28pub struct BinarySchedulingJob<'a> {
29    scheduler: &'a BinaryFieldScheduler,
30    inner: Box<dyn SchedulingJob + 'a>,
31}
32
33impl SchedulingJob for BinarySchedulingJob<'_> {
34    fn schedule_next(
35        &mut self,
36        context: &mut SchedulerContext,
37        priority: &dyn PriorityRange,
38    ) -> Result<ScheduledScanLine> {
39        let inner_scan = self.inner.schedule_next(context, priority)?;
40        let wrapped_decoders = inner_scan
41            .decoders
42            .into_iter()
43            .map(|message| {
44                let decoder = message.into_legacy();
45                MessageType::DecoderReady(DecoderReady {
46                    path: decoder.path,
47                    decoder: Box::new(BinaryPageDecoder {
48                        inner: decoder.decoder,
49                        data_type: self.scheduler.data_type.clone(),
50                    }),
51                })
52            })
53            .collect::<Vec<_>>();
54        Ok(ScheduledScanLine {
55            decoders: wrapped_decoders,
56            rows_scheduled: inner_scan.rows_scheduled,
57        })
58    }
59
60    fn num_rows(&self) -> u64 {
61        self.inner.num_rows()
62    }
63}
64
65/// A logical scheduler for utf8/binary pages which assumes the data are encoded as List<u8>
66#[derive(Debug)]
67pub struct BinaryFieldScheduler {
68    varbin_scheduler: Arc<dyn FieldScheduler>,
69    data_type: DataType,
70}
71
72impl BinaryFieldScheduler {
73    // Create a new ListPageScheduler
74    pub fn new(varbin_scheduler: Arc<dyn FieldScheduler>, data_type: DataType) -> Self {
75        Self {
76            varbin_scheduler,
77            data_type,
78        }
79    }
80}
81
82impl FieldScheduler for BinaryFieldScheduler {
83    fn schedule_ranges<'a>(
84        &'a self,
85        ranges: &[std::ops::Range<u64>],
86        filter: &FilterExpression,
87    ) -> Result<Box<dyn SchedulingJob + 'a>> {
88        trace!("Scheduling binary for {} ranges", ranges.len());
89        let varbin_job = self.varbin_scheduler.schedule_ranges(ranges, filter)?;
90        Ok(Box::new(BinarySchedulingJob {
91            scheduler: self,
92            inner: varbin_job,
93        }))
94    }
95
96    fn num_rows(&self) -> u64 {
97        self.varbin_scheduler.num_rows()
98    }
99
100    fn initialize<'a>(
101        &'a self,
102        _filter: &'a FilterExpression,
103        _context: &'a SchedulerContext,
104    ) -> BoxFuture<'a, Result<()>> {
105        // 2.0 schedulers do not need to initialize
106        std::future::ready(Ok(())).boxed()
107    }
108}
109
110#[derive(Debug)]
111pub struct BinaryPageDecoder {
112    inner: Box<dyn LogicalPageDecoder>,
113    data_type: DataType,
114}
115
116impl LogicalPageDecoder for BinaryPageDecoder {
117    fn wait_for_loaded(&mut self, num_rows: u64) -> BoxFuture<'_, Result<()>> {
118        self.inner.wait_for_loaded(num_rows)
119    }
120
121    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
122        let inner_task = self.inner.drain(num_rows)?;
123        Ok(NextDecodeTask {
124            num_rows: inner_task.num_rows,
125            task: Box::new(BinaryArrayDecoder {
126                inner: inner_task.task,
127                data_type: self.data_type.clone(),
128            }),
129        })
130    }
131
132    fn data_type(&self) -> &DataType {
133        &self.data_type
134    }
135
136    fn rows_loaded(&self) -> u64 {
137        self.inner.rows_loaded()
138    }
139
140    fn num_rows(&self) -> u64 {
141        self.inner.num_rows()
142    }
143
144    fn rows_drained(&self) -> u64 {
145        self.inner.rows_drained()
146    }
147}
148
149pub struct BinaryArrayDecoder {
150    inner: Box<dyn DecodeArrayTask>,
151    data_type: DataType,
152}
153
154impl BinaryArrayDecoder {
155    fn from_list_array<T: ByteArrayType>(array: &GenericListArray<T::Offset>) -> ArrayRef {
156        let values = array
157            .values()
158            .as_primitive::<UInt8Type>()
159            .values()
160            .inner()
161            .clone();
162        let offsets = array.offsets().clone();
163        Arc::new(GenericByteArray::<T>::new(
164            offsets,
165            values,
166            array.nulls().cloned(),
167        ))
168    }
169}
170
171impl DecodeArrayTask for BinaryArrayDecoder {
172    fn decode(self: Box<Self>) -> Result<ArrayRef> {
173        let data_type = self.data_type;
174        let arr = self.inner.decode()?;
175        match data_type {
176            DataType::Binary => Ok(Self::from_list_array::<BinaryType>(arr.as_list::<i32>())),
177            DataType::LargeBinary => Ok(Self::from_list_array::<LargeBinaryType>(
178                arr.as_list::<i64>(),
179            )),
180            DataType::Utf8 => Ok(Self::from_list_array::<Utf8Type>(arr.as_list::<i32>())),
181            DataType::LargeUtf8 => Ok(Self::from_list_array::<LargeUtf8Type>(arr.as_list::<i64>())),
182            _ => panic!("Binary decoder does not support this data type"),
183        }
184    }
185}