1use postcard::from_bytes;
5use reifydb_abi::data::column::ColumnTypeCode;
6use reifydb_type::value::{
7 Value, decimal::Decimal, ordered_f32::OrderedF32, ordered_f64::OrderedF64, row_number::RowNumber, r#type::Type,
8};
9use serde::de::DeserializeOwned;
10
11use crate::operator::change::{BorrowedColumn, BorrowedColumns};
12
13#[derive(Clone, Copy)]
14pub struct RowView<'a> {
15 columns: BorrowedColumns<'a>,
16 index: usize,
17}
18
19impl<'a> RowView<'a> {
20 pub(crate) fn new(columns: BorrowedColumns<'a>, index: usize) -> Self {
21 Self {
22 columns,
23 index,
24 }
25 }
26
27 pub fn index(&self) -> usize {
28 self.index
29 }
30
31 pub fn columns(&self) -> BorrowedColumns<'a> {
32 self.columns
33 }
34
35 pub fn row_number(&self) -> Option<RowNumber> {
36 self.columns.row_numbers().get(self.index).copied().map(RowNumber)
37 }
38
39 pub fn created_at_nanos(&self) -> Option<u64> {
40 self.columns.created_at().get(self.index).copied()
41 }
42
43 pub fn updated_at_nanos(&self) -> Option<u64> {
44 self.columns.updated_at().get(self.index).copied()
45 }
46
47 pub fn is_defined(&self, name: &str) -> bool {
48 match self.columns.column(name) {
49 Some(col) => is_defined_at(&col, self.index),
50 None => false,
51 }
52 }
53
54 pub fn utf8(&self, name: &str) -> Option<&'a str> {
55 let col = self.column_defined(name)?;
56 if col.type_code() != ColumnTypeCode::Utf8 {
57 return None;
58 }
59 col.iter_str().nth(self.index)
60 }
61
62 pub fn blob(&self, name: &str) -> Option<&'a [u8]> {
63 let col = self.column_defined(name)?;
64 if col.type_code() != ColumnTypeCode::Blob {
65 return None;
66 }
67 col.iter_bytes().nth(self.index)
68 }
69
70 pub fn bool(&self, name: &str) -> Option<bool> {
71 let col = self.column_defined(name)?;
72 if col.type_code() != ColumnTypeCode::Bool {
73 return None;
74 }
75 let bytes = col.data_bytes();
76 let byte = bytes.get(self.index / 8).copied()?;
77 Some((byte >> (self.index % 8)) & 1 == 1)
78 }
79
80 pub fn u64(&self, name: &str) -> Option<u64> {
81 let col = self.column_defined(name)?;
82 match col.type_code() {
83 ColumnTypeCode::Uint8 => fixed_at::<u64>(&col, self.index),
84 ColumnTypeCode::Uint4 => fixed_at::<u32>(&col, self.index).map(u64::from),
85 ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index).map(u64::from),
86 ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u64::from),
87 _ => None,
88 }
89 }
90
91 pub fn u32(&self, name: &str) -> Option<u32> {
92 let col = self.column_defined(name)?;
93 match col.type_code() {
94 ColumnTypeCode::Uint4 => fixed_at::<u32>(&col, self.index),
95 ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index).map(u32::from),
96 ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u32::from),
97 _ => None,
98 }
99 }
100
101 pub fn u16(&self, name: &str) -> Option<u16> {
102 let col = self.column_defined(name)?;
103 match col.type_code() {
104 ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index),
105 ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u16::from),
106 _ => None,
107 }
108 }
109
110 pub fn u8(&self, name: &str) -> Option<u8> {
111 let col = self.column_defined(name)?;
112 if col.type_code() != ColumnTypeCode::Uint1 {
113 return None;
114 }
115 fixed_at::<u8>(&col, self.index)
116 }
117
118 pub fn i64(&self, name: &str) -> Option<i64> {
119 let col = self.column_defined(name)?;
120 match col.type_code() {
121 ColumnTypeCode::Int8 => fixed_at::<i64>(&col, self.index),
122 ColumnTypeCode::Int4 => fixed_at::<i32>(&col, self.index).map(i64::from),
123 ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index).map(i64::from),
124 ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i64::from),
125 _ => None,
126 }
127 }
128
129 pub fn i32(&self, name: &str) -> Option<i32> {
130 let col = self.column_defined(name)?;
131 match col.type_code() {
132 ColumnTypeCode::Int4 => fixed_at::<i32>(&col, self.index),
133 ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index).map(i32::from),
134 ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i32::from),
135 _ => None,
136 }
137 }
138
139 pub fn i16(&self, name: &str) -> Option<i16> {
140 let col = self.column_defined(name)?;
141 match col.type_code() {
142 ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index),
143 ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i16::from),
144 _ => None,
145 }
146 }
147
148 pub fn i8(&self, name: &str) -> Option<i8> {
149 let col = self.column_defined(name)?;
150 if col.type_code() != ColumnTypeCode::Int1 {
151 return None;
152 }
153 fixed_at::<i8>(&col, self.index)
154 }
155
156 pub fn f64(&self, name: &str) -> Option<f64> {
157 let col = self.column_defined(name)?;
158 match col.type_code() {
159 ColumnTypeCode::Float8 => fixed_at::<f64>(&col, self.index),
160 ColumnTypeCode::Float4 => fixed_at::<f32>(&col, self.index).map(f64::from),
161 _ => None,
162 }
163 }
164
165 pub fn f32(&self, name: &str) -> Option<f32> {
166 let col = self.column_defined(name)?;
167 if col.type_code() != ColumnTypeCode::Float4 {
168 return None;
169 }
170 fixed_at::<f32>(&col, self.index)
171 }
172
173 pub fn decimal(&self, name: &str) -> Option<Decimal> {
174 let col = self.column_defined(name)?;
175 match col.type_code() {
176 ColumnTypeCode::Decimal => decode_serialized_at::<Decimal>(&col, self.index),
177 ColumnTypeCode::Float8 => fixed_at::<f64>(&col, self.index).map(Decimal::from),
178 ColumnTypeCode::Float4 => fixed_at::<f32>(&col, self.index).map(|v| Decimal::from(v as f64)),
179 _ => None,
180 }
181 }
182
183 pub fn value(&self, name: &str) -> Option<Value> {
184 let col = self.columns.column(name)?;
185 Some(read_value_at(&col, self.index))
186 }
187
188 fn column_defined(&self, name: &str) -> Option<BorrowedColumn<'a>> {
189 let col = self.columns.column(name)?;
190 if !is_defined_at(&col, self.index) {
191 return None;
192 }
193 Some(col)
194 }
195}
196
197pub(crate) fn is_defined_at(col: &BorrowedColumn<'_>, index: usize) -> bool {
198 let bv = col.defined_bitvec();
199 if bv.is_empty() {
200 return true;
201 }
202 let byte = match bv.get(index / 8) {
203 Some(b) => *b,
204 None => return false,
205 };
206 (byte >> (index % 8)) & 1 == 1
207}
208
209pub(crate) fn fixed_at<T: Copy>(col: &BorrowedColumn<'_>, index: usize) -> Option<T> {
210 let slice = unsafe { col.as_slice::<T>()? };
211 slice.get(index).copied()
212}
213
214pub(crate) fn decode_serialized_at<T>(col: &BorrowedColumn<'_>, index: usize) -> Option<T>
215where
216 T: DeserializeOwned,
217{
218 let data = col.data_bytes();
219 let offsets = col.offsets();
220 if index + 1 >= offsets.len() {
221 return None;
222 }
223 let start = offsets[index] as usize;
224 let end = offsets[index + 1] as usize;
225 if end > data.len() || start > end {
226 return None;
227 }
228 from_bytes::<T>(&data[start..end]).ok()
229}
230
231fn type_for_code(code: ColumnTypeCode) -> Type {
232 match code {
233 ColumnTypeCode::Bool => Type::Boolean,
234 ColumnTypeCode::Float4 => Type::Float4,
235 ColumnTypeCode::Float8 => Type::Float8,
236 ColumnTypeCode::Int1 => Type::Int1,
237 ColumnTypeCode::Int2 => Type::Int2,
238 ColumnTypeCode::Int4 => Type::Int4,
239 ColumnTypeCode::Int8 => Type::Int8,
240 ColumnTypeCode::Int16 => Type::Int16,
241 ColumnTypeCode::Uint1 => Type::Uint1,
242 ColumnTypeCode::Uint2 => Type::Uint2,
243 ColumnTypeCode::Uint4 => Type::Uint4,
244 ColumnTypeCode::Uint8 => Type::Uint8,
245 ColumnTypeCode::Uint16 => Type::Uint16,
246 ColumnTypeCode::Utf8 => Type::Utf8,
247 ColumnTypeCode::Decimal => Type::Decimal,
248 ColumnTypeCode::Blob => Type::Blob,
249 _ => Type::Any,
250 }
251}
252
253fn none_value(code: ColumnTypeCode) -> Value {
254 Value::None {
255 inner: type_for_code(code),
256 }
257}
258
259fn read_value_at(col: &BorrowedColumn<'_>, index: usize) -> Value {
260 let code = col.type_code();
261 if !is_defined_at(col, index) {
262 return none_value(code);
263 }
264 match code {
265 ColumnTypeCode::Bool => col
266 .data_bytes()
267 .get(index / 8)
268 .copied()
269 .map(|b| Value::Boolean((b >> (index % 8)) & 1 == 1))
270 .unwrap_or_else(|| none_value(code)),
271 ColumnTypeCode::Float4 => fixed_at::<f32>(col, index)
272 .and_then(|v| OrderedF32::try_from(v).ok())
273 .map(Value::Float4)
274 .unwrap_or_else(|| none_value(code)),
275 ColumnTypeCode::Float8 => fixed_at::<f64>(col, index)
276 .and_then(|v| OrderedF64::try_from(v).ok())
277 .map(Value::Float8)
278 .unwrap_or_else(|| none_value(code)),
279 ColumnTypeCode::Int1 => fixed_at::<i8>(col, index).map(Value::Int1).unwrap_or_else(|| none_value(code)),
280 ColumnTypeCode::Int2 => {
281 fixed_at::<i16>(col, index).map(Value::Int2).unwrap_or_else(|| none_value(code))
282 }
283 ColumnTypeCode::Int4 => {
284 fixed_at::<i32>(col, index).map(Value::Int4).unwrap_or_else(|| none_value(code))
285 }
286 ColumnTypeCode::Int8 => {
287 fixed_at::<i64>(col, index).map(Value::Int8).unwrap_or_else(|| none_value(code))
288 }
289 ColumnTypeCode::Int16 => {
290 fixed_at::<i128>(col, index).map(Value::Int16).unwrap_or_else(|| none_value(code))
291 }
292 ColumnTypeCode::Uint1 => {
293 fixed_at::<u8>(col, index).map(Value::Uint1).unwrap_or_else(|| none_value(code))
294 }
295 ColumnTypeCode::Uint2 => {
296 fixed_at::<u16>(col, index).map(Value::Uint2).unwrap_or_else(|| none_value(code))
297 }
298 ColumnTypeCode::Uint4 => {
299 fixed_at::<u32>(col, index).map(Value::Uint4).unwrap_or_else(|| none_value(code))
300 }
301 ColumnTypeCode::Uint8 => {
302 fixed_at::<u64>(col, index).map(Value::Uint8).unwrap_or_else(|| none_value(code))
303 }
304 ColumnTypeCode::Uint16 => {
305 fixed_at::<u128>(col, index).map(Value::Uint16).unwrap_or_else(|| none_value(code))
306 }
307 ColumnTypeCode::Utf8 => col
308 .iter_str()
309 .nth(index)
310 .map(|s| Value::Utf8(s.to_string()))
311 .unwrap_or_else(|| none_value(code)),
312 ColumnTypeCode::Decimal => decode_serialized_at::<Decimal>(col, index)
313 .map(Value::Decimal)
314 .unwrap_or_else(|| none_value(code)),
315 _ => none_value(code),
316 }
317}
318
319impl<'a> BorrowedColumns<'a> {
320 pub fn row(self, index: usize) -> Option<RowView<'a>> {
321 if index >= self.row_count() {
322 return None;
323 }
324 Some(RowView::new(self, index))
325 }
326
327 pub fn rows(self) -> impl Iterator<Item = RowView<'a>> {
328 (0..self.row_count()).map(move |i| RowView::new(self, i))
329 }
330}