// lance_encoding/previous/encodings/logical/binary.rs

use std::sync::Arc;

use arrow_array::{
    cast::AsArray,
    types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, UInt8Type, Utf8Type},
    Array, ArrayRef, GenericByteArray, GenericListArray,
};
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};
use lance_core::Result;
use log::trace;

use crate::{
    decoder::{
        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
        ScheduledScanLine, SchedulerContext,
    },
    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
};
24
25#[derive(Debug)]
28pub struct BinarySchedulingJob<'a> {
29 scheduler: &'a BinaryFieldScheduler,
30 inner: Box<dyn SchedulingJob + 'a>,
31}
32
33impl SchedulingJob for BinarySchedulingJob<'_> {
34 fn schedule_next(
35 &mut self,
36 context: &mut SchedulerContext,
37 priority: &dyn PriorityRange,
38 ) -> Result<ScheduledScanLine> {
39 let inner_scan = self.inner.schedule_next(context, priority)?;
40 let wrapped_decoders = inner_scan
41 .decoders
42 .into_iter()
43 .map(|message| {
44 let decoder = message.into_legacy();
45 MessageType::DecoderReady(DecoderReady {
46 path: decoder.path,
47 decoder: Box::new(BinaryPageDecoder {
48 inner: decoder.decoder,
49 data_type: self.scheduler.data_type.clone(),
50 }),
51 })
52 })
53 .collect::<Vec<_>>();
54 Ok(ScheduledScanLine {
55 decoders: wrapped_decoders,
56 rows_scheduled: inner_scan.rows_scheduled,
57 })
58 }
59
60 fn num_rows(&self) -> u64 {
61 self.inner.num_rows()
62 }
63}
64
65#[derive(Debug)]
67pub struct BinaryFieldScheduler {
68 varbin_scheduler: Arc<dyn FieldScheduler>,
69 data_type: DataType,
70}
71
72impl BinaryFieldScheduler {
73 pub fn new(varbin_scheduler: Arc<dyn FieldScheduler>, data_type: DataType) -> Self {
75 Self {
76 varbin_scheduler,
77 data_type,
78 }
79 }
80}
81
82impl FieldScheduler for BinaryFieldScheduler {
83 fn schedule_ranges<'a>(
84 &'a self,
85 ranges: &[std::ops::Range<u64>],
86 filter: &FilterExpression,
87 ) -> Result<Box<dyn SchedulingJob + 'a>> {
88 trace!("Scheduling binary for {} ranges", ranges.len());
89 let varbin_job = self.varbin_scheduler.schedule_ranges(ranges, filter)?;
90 Ok(Box::new(BinarySchedulingJob {
91 scheduler: self,
92 inner: varbin_job,
93 }))
94 }
95
96 fn num_rows(&self) -> u64 {
97 self.varbin_scheduler.num_rows()
98 }
99
100 fn initialize<'a>(
101 &'a self,
102 _filter: &'a FilterExpression,
103 _context: &'a SchedulerContext,
104 ) -> BoxFuture<'a, Result<()>> {
105 std::future::ready(Ok(())).boxed()
107 }
108}
109
110#[derive(Debug)]
111pub struct BinaryPageDecoder {
112 inner: Box<dyn LogicalPageDecoder>,
113 data_type: DataType,
114}
115
116impl LogicalPageDecoder for BinaryPageDecoder {
117 fn wait_for_loaded(&mut self, num_rows: u64) -> BoxFuture<'_, Result<()>> {
118 self.inner.wait_for_loaded(num_rows)
119 }
120
121 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
122 let inner_task = self.inner.drain(num_rows)?;
123 Ok(NextDecodeTask {
124 num_rows: inner_task.num_rows,
125 task: Box::new(BinaryArrayDecoder {
126 inner: inner_task.task,
127 data_type: self.data_type.clone(),
128 }),
129 })
130 }
131
132 fn data_type(&self) -> &DataType {
133 &self.data_type
134 }
135
136 fn rows_loaded(&self) -> u64 {
137 self.inner.rows_loaded()
138 }
139
140 fn num_rows(&self) -> u64 {
141 self.inner.num_rows()
142 }
143
144 fn rows_drained(&self) -> u64 {
145 self.inner.rows_drained()
146 }
147}
148
149pub struct BinaryArrayDecoder {
150 inner: Box<dyn DecodeArrayTask>,
151 data_type: DataType,
152}
153
154impl BinaryArrayDecoder {
155 fn from_list_array<T: ByteArrayType>(array: &GenericListArray<T::Offset>) -> ArrayRef {
156 let values = array
157 .values()
158 .as_primitive::<UInt8Type>()
159 .values()
160 .inner()
161 .clone();
162 let offsets = array.offsets().clone();
163 Arc::new(GenericByteArray::<T>::new(
164 offsets,
165 values,
166 array.nulls().cloned(),
167 ))
168 }
169}
170
171impl DecodeArrayTask for BinaryArrayDecoder {
172 fn decode(self: Box<Self>) -> Result<ArrayRef> {
173 let data_type = self.data_type;
174 let arr = self.inner.decode()?;
175 match data_type {
176 DataType::Binary => Ok(Self::from_list_array::<BinaryType>(arr.as_list::<i32>())),
177 DataType::LargeBinary => Ok(Self::from_list_array::<LargeBinaryType>(
178 arr.as_list::<i64>(),
179 )),
180 DataType::Utf8 => Ok(Self::from_list_array::<Utf8Type>(arr.as_list::<i32>())),
181 DataType::LargeUtf8 => Ok(Self::from_list_array::<LargeUtf8Type>(arr.as_list::<i64>())),
182 _ => panic!("Binary decoder does not support this data type"),
183 }
184 }
185}