Skip to main content

reifydb_sdk/operator/
view_row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::from_bytes;
5use reifydb_abi::data::column::ColumnTypeCode;
6use reifydb_type::value::{
7	Value, decimal::Decimal, ordered_f32::OrderedF32, ordered_f64::OrderedF64, row_number::RowNumber, r#type::Type,
8};
9use serde::de::DeserializeOwned;
10
11use crate::operator::change::{BorrowedColumn, BorrowedColumns};
12
13#[derive(Clone, Copy)]
14pub struct RowView<'a> {
15	columns: BorrowedColumns<'a>,
16	index: usize,
17}
18
19impl<'a> RowView<'a> {
20	pub(crate) fn new(columns: BorrowedColumns<'a>, index: usize) -> Self {
21		Self {
22			columns,
23			index,
24		}
25	}
26
27	pub fn index(&self) -> usize {
28		self.index
29	}
30
31	pub fn columns(&self) -> BorrowedColumns<'a> {
32		self.columns
33	}
34
35	pub fn row_number(&self) -> Option<RowNumber> {
36		self.columns.row_numbers().get(self.index).copied().map(RowNumber)
37	}
38
39	pub fn created_at_nanos(&self) -> Option<u64> {
40		self.columns.created_at().get(self.index).copied()
41	}
42
43	pub fn updated_at_nanos(&self) -> Option<u64> {
44		self.columns.updated_at().get(self.index).copied()
45	}
46
47	pub fn is_defined(&self, name: &str) -> bool {
48		match self.columns.column(name) {
49			Some(col) => is_defined_at(&col, self.index),
50			None => false,
51		}
52	}
53
54	pub fn utf8(&self, name: &str) -> Option<&'a str> {
55		let col = self.column_defined(name)?;
56		if col.type_code() != ColumnTypeCode::Utf8 {
57			return None;
58		}
59		col.iter_str().nth(self.index)
60	}
61
62	pub fn blob(&self, name: &str) -> Option<&'a [u8]> {
63		let col = self.column_defined(name)?;
64		if col.type_code() != ColumnTypeCode::Blob {
65			return None;
66		}
67		col.iter_bytes().nth(self.index)
68	}
69
70	pub fn bool(&self, name: &str) -> Option<bool> {
71		let col = self.column_defined(name)?;
72		if col.type_code() != ColumnTypeCode::Bool {
73			return None;
74		}
75		let bytes = col.data_bytes();
76		let byte = bytes.get(self.index / 8).copied()?;
77		Some((byte >> (self.index % 8)) & 1 == 1)
78	}
79
80	pub fn u64(&self, name: &str) -> Option<u64> {
81		let col = self.column_defined(name)?;
82		match col.type_code() {
83			ColumnTypeCode::Uint8 => fixed_at::<u64>(&col, self.index),
84			ColumnTypeCode::Uint4 => fixed_at::<u32>(&col, self.index).map(u64::from),
85			ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index).map(u64::from),
86			ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u64::from),
87			_ => None,
88		}
89	}
90
91	pub fn u32(&self, name: &str) -> Option<u32> {
92		let col = self.column_defined(name)?;
93		match col.type_code() {
94			ColumnTypeCode::Uint4 => fixed_at::<u32>(&col, self.index),
95			ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index).map(u32::from),
96			ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u32::from),
97			_ => None,
98		}
99	}
100
101	pub fn u16(&self, name: &str) -> Option<u16> {
102		let col = self.column_defined(name)?;
103		match col.type_code() {
104			ColumnTypeCode::Uint2 => fixed_at::<u16>(&col, self.index),
105			ColumnTypeCode::Uint1 => fixed_at::<u8>(&col, self.index).map(u16::from),
106			_ => None,
107		}
108	}
109
110	pub fn u8(&self, name: &str) -> Option<u8> {
111		let col = self.column_defined(name)?;
112		if col.type_code() != ColumnTypeCode::Uint1 {
113			return None;
114		}
115		fixed_at::<u8>(&col, self.index)
116	}
117
118	pub fn i64(&self, name: &str) -> Option<i64> {
119		let col = self.column_defined(name)?;
120		match col.type_code() {
121			ColumnTypeCode::Int8 => fixed_at::<i64>(&col, self.index),
122			ColumnTypeCode::Int4 => fixed_at::<i32>(&col, self.index).map(i64::from),
123			ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index).map(i64::from),
124			ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i64::from),
125			_ => None,
126		}
127	}
128
129	pub fn i32(&self, name: &str) -> Option<i32> {
130		let col = self.column_defined(name)?;
131		match col.type_code() {
132			ColumnTypeCode::Int4 => fixed_at::<i32>(&col, self.index),
133			ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index).map(i32::from),
134			ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i32::from),
135			_ => None,
136		}
137	}
138
139	pub fn i16(&self, name: &str) -> Option<i16> {
140		let col = self.column_defined(name)?;
141		match col.type_code() {
142			ColumnTypeCode::Int2 => fixed_at::<i16>(&col, self.index),
143			ColumnTypeCode::Int1 => fixed_at::<i8>(&col, self.index).map(i16::from),
144			_ => None,
145		}
146	}
147
148	pub fn i8(&self, name: &str) -> Option<i8> {
149		let col = self.column_defined(name)?;
150		if col.type_code() != ColumnTypeCode::Int1 {
151			return None;
152		}
153		fixed_at::<i8>(&col, self.index)
154	}
155
156	pub fn f64(&self, name: &str) -> Option<f64> {
157		let col = self.column_defined(name)?;
158		match col.type_code() {
159			ColumnTypeCode::Float8 => fixed_at::<f64>(&col, self.index),
160			ColumnTypeCode::Float4 => fixed_at::<f32>(&col, self.index).map(f64::from),
161			_ => None,
162		}
163	}
164
165	pub fn f32(&self, name: &str) -> Option<f32> {
166		let col = self.column_defined(name)?;
167		if col.type_code() != ColumnTypeCode::Float4 {
168			return None;
169		}
170		fixed_at::<f32>(&col, self.index)
171	}
172
173	pub fn decimal(&self, name: &str) -> Option<Decimal> {
174		let col = self.column_defined(name)?;
175		match col.type_code() {
176			ColumnTypeCode::Decimal => decode_serialized_at::<Decimal>(&col, self.index),
177			ColumnTypeCode::Float8 => fixed_at::<f64>(&col, self.index).map(Decimal::from),
178			ColumnTypeCode::Float4 => fixed_at::<f32>(&col, self.index).map(|v| Decimal::from(v as f64)),
179			_ => None,
180		}
181	}
182
183	pub fn value(&self, name: &str) -> Option<Value> {
184		let col = self.columns.column(name)?;
185		Some(read_value_at(&col, self.index))
186	}
187
188	fn column_defined(&self, name: &str) -> Option<BorrowedColumn<'a>> {
189		let col = self.columns.column(name)?;
190		if !is_defined_at(&col, self.index) {
191			return None;
192		}
193		Some(col)
194	}
195}
196
197pub(crate) fn is_defined_at(col: &BorrowedColumn<'_>, index: usize) -> bool {
198	let bv = col.defined_bitvec();
199	if bv.is_empty() {
200		return true;
201	}
202	let byte = match bv.get(index / 8) {
203		Some(b) => *b,
204		None => return false,
205	};
206	(byte >> (index % 8)) & 1 == 1
207}
208
209pub(crate) fn fixed_at<T: Copy>(col: &BorrowedColumn<'_>, index: usize) -> Option<T> {
210	let slice = unsafe { col.as_slice::<T>()? };
211	slice.get(index).copied()
212}
213
214pub(crate) fn decode_serialized_at<T>(col: &BorrowedColumn<'_>, index: usize) -> Option<T>
215where
216	T: DeserializeOwned,
217{
218	let data = col.data_bytes();
219	let offsets = col.offsets();
220	if index + 1 >= offsets.len() {
221		return None;
222	}
223	let start = offsets[index] as usize;
224	let end = offsets[index + 1] as usize;
225	if end > data.len() || start > end {
226		return None;
227	}
228	from_bytes::<T>(&data[start..end]).ok()
229}
230
231fn type_for_code(code: ColumnTypeCode) -> Type {
232	match code {
233		ColumnTypeCode::Bool => Type::Boolean,
234		ColumnTypeCode::Float4 => Type::Float4,
235		ColumnTypeCode::Float8 => Type::Float8,
236		ColumnTypeCode::Int1 => Type::Int1,
237		ColumnTypeCode::Int2 => Type::Int2,
238		ColumnTypeCode::Int4 => Type::Int4,
239		ColumnTypeCode::Int8 => Type::Int8,
240		ColumnTypeCode::Int16 => Type::Int16,
241		ColumnTypeCode::Uint1 => Type::Uint1,
242		ColumnTypeCode::Uint2 => Type::Uint2,
243		ColumnTypeCode::Uint4 => Type::Uint4,
244		ColumnTypeCode::Uint8 => Type::Uint8,
245		ColumnTypeCode::Uint16 => Type::Uint16,
246		ColumnTypeCode::Utf8 => Type::Utf8,
247		ColumnTypeCode::Decimal => Type::Decimal,
248		ColumnTypeCode::Blob => Type::Blob,
249		_ => Type::Any,
250	}
251}
252
253fn none_value(code: ColumnTypeCode) -> Value {
254	Value::None {
255		inner: type_for_code(code),
256	}
257}
258
259fn read_value_at(col: &BorrowedColumn<'_>, index: usize) -> Value {
260	let code = col.type_code();
261	if !is_defined_at(col, index) {
262		return none_value(code);
263	}
264	match code {
265		ColumnTypeCode::Bool => col
266			.data_bytes()
267			.get(index / 8)
268			.copied()
269			.map(|b| Value::Boolean((b >> (index % 8)) & 1 == 1))
270			.unwrap_or_else(|| none_value(code)),
271		ColumnTypeCode::Float4 => fixed_at::<f32>(col, index)
272			.and_then(|v| OrderedF32::try_from(v).ok())
273			.map(Value::Float4)
274			.unwrap_or_else(|| none_value(code)),
275		ColumnTypeCode::Float8 => fixed_at::<f64>(col, index)
276			.and_then(|v| OrderedF64::try_from(v).ok())
277			.map(Value::Float8)
278			.unwrap_or_else(|| none_value(code)),
279		ColumnTypeCode::Int1 => fixed_at::<i8>(col, index).map(Value::Int1).unwrap_or_else(|| none_value(code)),
280		ColumnTypeCode::Int2 => {
281			fixed_at::<i16>(col, index).map(Value::Int2).unwrap_or_else(|| none_value(code))
282		}
283		ColumnTypeCode::Int4 => {
284			fixed_at::<i32>(col, index).map(Value::Int4).unwrap_or_else(|| none_value(code))
285		}
286		ColumnTypeCode::Int8 => {
287			fixed_at::<i64>(col, index).map(Value::Int8).unwrap_or_else(|| none_value(code))
288		}
289		ColumnTypeCode::Int16 => {
290			fixed_at::<i128>(col, index).map(Value::Int16).unwrap_or_else(|| none_value(code))
291		}
292		ColumnTypeCode::Uint1 => {
293			fixed_at::<u8>(col, index).map(Value::Uint1).unwrap_or_else(|| none_value(code))
294		}
295		ColumnTypeCode::Uint2 => {
296			fixed_at::<u16>(col, index).map(Value::Uint2).unwrap_or_else(|| none_value(code))
297		}
298		ColumnTypeCode::Uint4 => {
299			fixed_at::<u32>(col, index).map(Value::Uint4).unwrap_or_else(|| none_value(code))
300		}
301		ColumnTypeCode::Uint8 => {
302			fixed_at::<u64>(col, index).map(Value::Uint8).unwrap_or_else(|| none_value(code))
303		}
304		ColumnTypeCode::Uint16 => {
305			fixed_at::<u128>(col, index).map(Value::Uint16).unwrap_or_else(|| none_value(code))
306		}
307		ColumnTypeCode::Utf8 => col
308			.iter_str()
309			.nth(index)
310			.map(|s| Value::Utf8(s.to_string()))
311			.unwrap_or_else(|| none_value(code)),
312		ColumnTypeCode::Decimal => decode_serialized_at::<Decimal>(col, index)
313			.map(Value::Decimal)
314			.unwrap_or_else(|| none_value(code)),
315		_ => none_value(code),
316	}
317}
318
319impl<'a> BorrowedColumns<'a> {
320	pub fn row(self, index: usize) -> Option<RowView<'a>> {
321		if index >= self.row_count() {
322			return None;
323		}
324		Some(RowView::new(self, index))
325	}
326
327	pub fn rows(self) -> impl Iterator<Item = RowView<'a>> {
328		(0..self.row_count()).map(move |i| RowView::new(self, i))
329	}
330}