reifydb_sdk/operator/
change.rs1use core::{slice, str};
5
6use reifydb_abi::{
7 data::{
8 buffer::BufferFFI,
9 column::{ColumnFFI, ColumnTypeCode, ColumnsFFI},
10 },
11 flow::{
12 change::{ChangeFFI, OriginFFI},
13 diff::{DiffFFI, DiffType},
14 },
15};
16
17#[derive(Clone, Copy)]
18pub struct BorrowedChange<'a> {
19 ffi: &'a ChangeFFI,
20}
21
22impl<'a> BorrowedChange<'a> {
23 pub unsafe fn from_raw(ptr: *const ChangeFFI) -> Self {
28 debug_assert!(!ptr.is_null(), "BorrowedChange::from_raw: null pointer");
29 Self {
30 ffi: unsafe { &*ptr },
31 }
32 }
33
34 pub fn origin(&self) -> OriginFFI {
35 self.ffi.origin
36 }
37
38 pub fn version(&self) -> u64 {
39 self.ffi.version
40 }
41
42 pub fn changed_at_nanos(&self) -> u64 {
43 self.ffi.changed_at
44 }
45
46 pub fn diff_count(&self) -> usize {
47 self.ffi.diff_count
48 }
49
50 pub fn diffs(&self) -> impl Iterator<Item = BorrowedDiff<'a>> + 'a {
51 let count = self.ffi.diff_count;
52 let base = self.ffi.diffs;
53 (0..count).map(move |i| {
54 let diff_ffi: &'a DiffFFI = unsafe { &*base.add(i) };
55 BorrowedDiff {
56 ffi: diff_ffi,
57 }
58 })
59 }
60}
61
62#[derive(Clone, Copy)]
63pub struct BorrowedDiff<'a> {
64 ffi: &'a DiffFFI,
65}
66
67impl<'a> BorrowedDiff<'a> {
68 pub fn kind(&self) -> DiffType {
69 self.ffi.diff_type
70 }
71
72 pub fn pre(&self) -> BorrowedColumns<'a> {
73 BorrowedColumns {
74 ffi: &self.ffi.pre,
75 }
76 }
77
78 pub fn post(&self) -> BorrowedColumns<'a> {
79 BorrowedColumns {
80 ffi: &self.ffi.post,
81 }
82 }
83}
84
85#[derive(Clone, Copy)]
86pub struct BorrowedColumns<'a> {
87 ffi: &'a ColumnsFFI,
88}
89
90impl<'a> BorrowedColumns<'a> {
91 pub unsafe fn from_ffi(ptr: *const ColumnsFFI) -> Self {
96 debug_assert!(!ptr.is_null(), "BorrowedColumns::from_ffi: null pointer");
97 Self {
98 ffi: unsafe { &*ptr },
99 }
100 }
101
102 pub fn row_count(&self) -> usize {
103 self.ffi.row_count
104 }
105
106 pub fn column_count(&self) -> usize {
107 self.ffi.column_count
108 }
109
110 pub fn is_empty(&self) -> bool {
111 self.ffi.row_count == 0 && self.ffi.column_count == 0
112 }
113
114 pub fn row_numbers(&self) -> &'a [u64] {
115 if self.ffi.row_numbers.is_null() || self.ffi.row_count == 0 {
116 &[]
117 } else {
118 unsafe { slice::from_raw_parts(self.ffi.row_numbers, self.ffi.row_count) }
119 }
120 }
121
122 pub fn created_at(&self) -> &'a [u64] {
123 if self.ffi.created_at.is_null() || self.ffi.row_count == 0 {
124 &[]
125 } else {
126 unsafe { slice::from_raw_parts(self.ffi.created_at, self.ffi.row_count) }
127 }
128 }
129
130 pub fn updated_at(&self) -> &'a [u64] {
131 if self.ffi.updated_at.is_null() || self.ffi.row_count == 0 {
132 &[]
133 } else {
134 unsafe { slice::from_raw_parts(self.ffi.updated_at, self.ffi.row_count) }
135 }
136 }
137
138 pub fn columns(&self) -> impl Iterator<Item = BorrowedColumn<'a>> + 'a {
139 let count = self.ffi.column_count;
140 let base = self.ffi.columns;
141 (0..count).map(move |i| {
142 let col_ffi: &'a ColumnFFI = unsafe { &*base.add(i) };
143 BorrowedColumn {
144 ffi: col_ffi,
145 }
146 })
147 }
148
149 pub fn column(&self, name: &str) -> Option<BorrowedColumn<'a>> {
150 self.columns().find(|c| c.name() == name)
151 }
152}
153
154#[derive(Clone, Copy)]
155pub struct BorrowedColumn<'a> {
156 ffi: &'a ColumnFFI,
157}
158
159impl<'a> BorrowedColumn<'a> {
160 pub fn name(&self) -> &'a str {
161 read_buffer_str(&self.ffi.name)
162 }
163
164 pub fn type_code(&self) -> ColumnTypeCode {
165 self.ffi.data.type_code
166 }
167
168 pub fn row_count(&self) -> usize {
169 self.ffi.data.row_count
170 }
171
172 pub fn data_bytes(&self) -> &'a [u8] {
173 read_buffer(&self.ffi.data.data)
174 }
175
176 pub fn offsets(&self) -> &'a [u64] {
177 let buf = &self.ffi.data.offsets;
178 if buf.ptr.is_null() || buf.len == 0 {
179 &[]
180 } else {
181 let count = buf.len / core::mem::size_of::<u64>();
182 unsafe { slice::from_raw_parts(buf.ptr as *const u64, count) }
183 }
184 }
185
186 pub fn defined_bitvec(&self) -> &'a [u8] {
187 read_buffer(&self.ffi.data.defined_bitvec)
188 }
189
190 pub unsafe fn as_slice<T: Copy>(&self) -> Option<&'a [T]> {
195 let bytes = self.data_bytes();
196 let count = self.row_count();
197 let elem = core::mem::size_of::<T>();
198 if elem == 0 || count.checked_mul(elem)? != bytes.len() {
199 return None;
200 }
201 Some(unsafe { slice::from_raw_parts(bytes.as_ptr() as *const T, count) })
202 }
203
204 pub fn iter_str(&self) -> impl Iterator<Item = &'a str> + 'a {
205 let data = self.data_bytes();
206 let offsets = self.offsets();
207 let row_count = self.row_count();
208 let offsets_len = offsets.len();
209 (0..row_count).map(move |i| {
210 if i + 1 >= offsets_len {
211 return "";
212 }
213 let start = offsets[i] as usize;
214 let end = offsets[i + 1] as usize;
215 if end > data.len() {
216 return "";
217 }
218 str::from_utf8(&data[start..end]).unwrap_or("")
219 })
220 }
221
222 pub fn iter_bytes(&self) -> impl Iterator<Item = &'a [u8]> + 'a {
223 let data = self.data_bytes();
224 let offsets = self.offsets();
225 let row_count = self.row_count();
226 let offsets_len = offsets.len();
227 (0..row_count).map(move |i| {
228 if i + 1 >= offsets_len {
229 return &[][..];
230 }
231 let start = offsets[i] as usize;
232 let end = offsets[i + 1] as usize;
233 if end > data.len() {
234 return &[][..];
235 }
236 &data[start..end]
237 })
238 }
239}
240
241fn read_buffer<'a>(buf: &BufferFFI) -> &'a [u8] {
242 if buf.ptr.is_null() || buf.len == 0 {
243 &[]
244 } else {
245 unsafe { slice::from_raw_parts(buf.ptr, buf.len) }
246 }
247}
248
249fn read_buffer_str<'a>(buf: &BufferFFI) -> &'a str {
250 let bytes: &'a [u8] = read_buffer(buf);
251 str::from_utf8(bytes).unwrap_or("")
252}
253
254pub type DiffKind = reifydb_abi::flow::diff::DiffType;