Skip to main content

reifydb_sdk/operator/
change.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use core::{slice, str};
5
6use reifydb_abi::{
7	data::{
8		buffer::BufferFFI,
9		column::{ColumnFFI, ColumnTypeCode, ColumnsFFI},
10	},
11	flow::{
12		change::{ChangeFFI, OriginFFI},
13		diff::{DiffFFI, DiffType},
14	},
15};
16
17#[derive(Clone, Copy)]
18pub struct BorrowedChange<'a> {
19	ffi: &'a ChangeFFI,
20}
21
22impl<'a> BorrowedChange<'a> {
23	/// # Safety
24	///
25	/// `ptr` must be non-null and point to a valid `ChangeFFI` whose backing
26	/// buffers remain live for the lifetime `'a`.
27	pub unsafe fn from_raw(ptr: *const ChangeFFI) -> Self {
28		debug_assert!(!ptr.is_null(), "BorrowedChange::from_raw: null pointer");
29		Self {
30			ffi: unsafe { &*ptr },
31		}
32	}
33
34	pub fn origin(&self) -> OriginFFI {
35		self.ffi.origin
36	}
37
38	pub fn version(&self) -> u64 {
39		self.ffi.version
40	}
41
42	pub fn changed_at_nanos(&self) -> u64 {
43		self.ffi.changed_at
44	}
45
46	pub fn diff_count(&self) -> usize {
47		self.ffi.diff_count
48	}
49
50	pub fn diffs(&self) -> impl Iterator<Item = BorrowedDiff<'a>> + 'a {
51		let count = self.ffi.diff_count;
52		let base = self.ffi.diffs;
53		(0..count).map(move |i| {
54			let diff_ffi: &'a DiffFFI = unsafe { &*base.add(i) };
55			BorrowedDiff {
56				ffi: diff_ffi,
57			}
58		})
59	}
60}
61
62#[derive(Clone, Copy)]
63pub struct BorrowedDiff<'a> {
64	ffi: &'a DiffFFI,
65}
66
67impl<'a> BorrowedDiff<'a> {
68	pub fn kind(&self) -> DiffType {
69		self.ffi.diff_type
70	}
71
72	pub fn pre(&self) -> BorrowedColumns<'a> {
73		BorrowedColumns {
74			ffi: &self.ffi.pre,
75		}
76	}
77
78	pub fn post(&self) -> BorrowedColumns<'a> {
79		BorrowedColumns {
80			ffi: &self.ffi.post,
81		}
82	}
83}
84
85#[derive(Clone, Copy)]
86pub struct BorrowedColumns<'a> {
87	ffi: &'a ColumnsFFI,
88}
89
90impl<'a> BorrowedColumns<'a> {
91	/// Wrap a raw `*const ColumnsFFI` for the duration of the FFI call.
92	///
93	/// # Safety
94	/// - `ptr` must be non-null and point at a `ColumnsFFI` whose buffer pointers are valid for at least `'a`.
95	pub unsafe fn from_ffi(ptr: *const ColumnsFFI) -> Self {
96		debug_assert!(!ptr.is_null(), "BorrowedColumns::from_ffi: null pointer");
97		Self {
98			ffi: unsafe { &*ptr },
99		}
100	}
101
102	pub fn row_count(&self) -> usize {
103		self.ffi.row_count
104	}
105
106	pub fn column_count(&self) -> usize {
107		self.ffi.column_count
108	}
109
110	pub fn is_empty(&self) -> bool {
111		self.ffi.row_count == 0 && self.ffi.column_count == 0
112	}
113
114	pub fn row_numbers(&self) -> &'a [u64] {
115		if self.ffi.row_numbers.is_null() || self.ffi.row_count == 0 {
116			&[]
117		} else {
118			unsafe { slice::from_raw_parts(self.ffi.row_numbers, self.ffi.row_count) }
119		}
120	}
121
122	pub fn created_at(&self) -> &'a [u64] {
123		if self.ffi.created_at.is_null() || self.ffi.row_count == 0 {
124			&[]
125		} else {
126			unsafe { slice::from_raw_parts(self.ffi.created_at, self.ffi.row_count) }
127		}
128	}
129
130	pub fn updated_at(&self) -> &'a [u64] {
131		if self.ffi.updated_at.is_null() || self.ffi.row_count == 0 {
132			&[]
133		} else {
134			unsafe { slice::from_raw_parts(self.ffi.updated_at, self.ffi.row_count) }
135		}
136	}
137
138	pub fn columns(&self) -> impl Iterator<Item = BorrowedColumn<'a>> + 'a {
139		let count = self.ffi.column_count;
140		let base = self.ffi.columns;
141		(0..count).map(move |i| {
142			let col_ffi: &'a ColumnFFI = unsafe { &*base.add(i) };
143			BorrowedColumn {
144				ffi: col_ffi,
145			}
146		})
147	}
148
149	pub fn column(&self, name: &str) -> Option<BorrowedColumn<'a>> {
150		self.columns().find(|c| c.name() == name)
151	}
152}
153
154#[derive(Clone, Copy)]
155pub struct BorrowedColumn<'a> {
156	ffi: &'a ColumnFFI,
157}
158
159impl<'a> BorrowedColumn<'a> {
160	pub fn name(&self) -> &'a str {
161		read_buffer_str(&self.ffi.name)
162	}
163
164	pub fn type_code(&self) -> ColumnTypeCode {
165		self.ffi.data.type_code
166	}
167
168	pub fn row_count(&self) -> usize {
169		self.ffi.data.row_count
170	}
171
172	pub fn data_bytes(&self) -> &'a [u8] {
173		read_buffer(&self.ffi.data.data)
174	}
175
176	pub fn offsets(&self) -> &'a [u64] {
177		let buf = &self.ffi.data.offsets;
178		if buf.ptr.is_null() || buf.len == 0 {
179			&[]
180		} else {
181			let count = buf.len / core::mem::size_of::<u64>();
182			unsafe { slice::from_raw_parts(buf.ptr as *const u64, count) }
183		}
184	}
185
186	pub fn defined_bitvec(&self) -> &'a [u8] {
187		read_buffer(&self.ffi.data.defined_bitvec)
188	}
189
190	/// # Safety
191	///
192	/// The caller must ensure the column's underlying bytes are a valid,
193	/// properly aligned array of `T` for the column's row count.
194	pub unsafe fn as_slice<T: Copy>(&self) -> Option<&'a [T]> {
195		let bytes = self.data_bytes();
196		let count = self.row_count();
197		let elem = core::mem::size_of::<T>();
198		if elem == 0 || count.checked_mul(elem)? != bytes.len() {
199			return None;
200		}
201		Some(unsafe { slice::from_raw_parts(bytes.as_ptr() as *const T, count) })
202	}
203
204	pub fn iter_str(&self) -> impl Iterator<Item = &'a str> + 'a {
205		let data = self.data_bytes();
206		let offsets = self.offsets();
207		let row_count = self.row_count();
208		let offsets_len = offsets.len();
209		(0..row_count).map(move |i| {
210			if i + 1 >= offsets_len {
211				return "";
212			}
213			let start = offsets[i] as usize;
214			let end = offsets[i + 1] as usize;
215			if end > data.len() {
216				return "";
217			}
218			str::from_utf8(&data[start..end]).unwrap_or("")
219		})
220	}
221
222	pub fn iter_bytes(&self) -> impl Iterator<Item = &'a [u8]> + 'a {
223		let data = self.data_bytes();
224		let offsets = self.offsets();
225		let row_count = self.row_count();
226		let offsets_len = offsets.len();
227		(0..row_count).map(move |i| {
228			if i + 1 >= offsets_len {
229				return &[][..];
230			}
231			let start = offsets[i] as usize;
232			let end = offsets[i + 1] as usize;
233			if end > data.len() {
234				return &[][..];
235			}
236			&data[start..end]
237		})
238	}
239}
240
241fn read_buffer<'a>(buf: &BufferFFI) -> &'a [u8] {
242	if buf.ptr.is_null() || buf.len == 0 {
243		&[]
244	} else {
245		unsafe { slice::from_raw_parts(buf.ptr, buf.len) }
246	}
247}
248
249fn read_buffer_str<'a>(buf: &BufferFFI) -> &'a str {
250	let bytes: &'a [u8] = read_buffer(buf);
251	str::from_utf8(bytes).unwrap_or("")
252}
253
254pub type DiffKind = reifydb_abi::flow::diff::DiffType;