// egglog_core_relations/row_buffer/mod.rs
use core::slice;
4use std::{cell::Cell, mem, ops::Deref};
5
6use crate::numeric_id::NumericId;
7use egglog_concurrency::{ParallelVecWriter, parallel_writer::write_cell_slice};
8use rayon::iter::ParallelIterator;
9use smallvec::SmallVec;
10
11use crate::{
12 common::Value,
13 offsets::RowId,
14 pool::{Pooled, with_pool_set},
15};
16
17#[cfg(test)]
18mod tests;
19
/// A dense, untyped buffer of fixed-arity rows of `Value`s.
///
/// Rows are stored contiguously: row `i` occupies elements
/// `i * n_columns .. (i + 1) * n_columns` of `data`. Entries are wrapped in
/// `Cell` so rows can be marked stale through a shared reference (see
/// `set_stale_shared`).
pub struct RowBuffer {
    // Number of `Value`s per row; always nonzero (asserted in `new`).
    n_columns: usize,
    // Number of rows currently stored, stale rows included.
    total_rows: usize,
    // Flat, pooled storage; length is `total_rows * n_columns`.
    data: Pooled<Vec<Cell<Value>>>,
}
30
// SAFETY: `Cell<Value>` makes `RowBuffer` `!Send`/`!Sync` by default. The only
// mutation reachable through a shared reference is `set_stale_shared`, which is
// itself an `unsafe fn`. NOTE(review): soundness presumably rests on callers of
// `set_stale_shared` ruling out racing access to the same row — confirm at the
// call sites outside this file.
unsafe impl Send for RowBuffer {}
unsafe impl Sync for RowBuffer {}
44
45impl Clone for RowBuffer {
46 fn clone(&self) -> Self {
47 RowBuffer {
48 n_columns: self.n_columns,
49 total_rows: self.total_rows,
50 data: Pooled::cloned(&self.data),
51 }
52 }
53}
54
impl RowBuffer {
    /// Creates an empty buffer whose rows are `n_columns` wide.
    ///
    /// Panics if `n_columns` is zero.
    pub(crate) fn new(n_columns: usize) -> RowBuffer {
        assert_ne!(
            n_columns, 0,
            "attempting to create a row batch with no columns"
        );
        RowBuffer {
            n_columns,
            total_rows: 0,
            data: with_pool_set(|ps| ps.get()),
        }
    }

    /// Moves this buffer's storage into a `ParallelRowBufWriter` so multiple
    /// threads can append to it. `self` is left with empty storage; call
    /// `ParallelRowBufWriter::finish` to get the combined buffer back.
    pub(crate) fn parallel_writer(&mut self) -> ParallelRowBufWriter {
        let data = mem::take(&mut self.data);
        ParallelRowBufWriter {
            buf: RowBuffer {
                n_columns: self.n_columns,
                total_rows: self.total_rows,
                data: Default::default(),
            },
            vec: ParallelVecWriter::new(Pooled::into_inner(data)),
        }
    }

    /// Reserves capacity for `additional` more rows.
    pub(crate) fn reserve(&mut self, additional: usize) {
        self.data.reserve(additional * self.n_columns);
    }

    /// The number of columns per row.
    pub(crate) fn arity(&self) -> usize {
        self.n_columns
    }

    /// A raw pointer to the start of the flat row storage.
    ///
    /// `Cell<Value>` has the same layout as `Value` (`Cell` is
    /// `#[repr(transparent)]`), so the cast is layout-sound; any dereference
    /// is up to the caller.
    pub(crate) fn raw_rows(&self) -> *const Value {
        self.data.as_ptr() as *const Value
    }

    /// Sets the length of the buffer to `count` rows without writing data.
    ///
    /// # Safety
    /// The standard `Vec::set_len` contract applies to the underlying vector:
    /// `count * n_columns` must not exceed its capacity and those elements
    /// must be initialized.
    pub(crate) unsafe fn set_len(&mut self, count: usize) {
        unsafe {
            self.data.set_len(count * self.n_columns);
        }
        self.total_rows = count;
    }

    /// Iterates over the rows whose first column has not been marked stale.
    pub(crate) fn non_stale(&self) -> impl Iterator<Item = &[Value]> {
        self.data
            .chunks(self.n_columns)
            .filter(|row| !row[0].get().is_stale())
            // SAFETY: `Cell<Value>` is `#[repr(transparent)]` over `Value`,
            // so the slice layouts match. NOTE(review): assumes no concurrent
            // `set_stale_shared` call mutates these cells while the borrow is
            // live — confirm at call sites.
            .map(|row| unsafe { mem::transmute::<&[Cell<Value>], &[Value]>(row) })
    }

    /// Mutable variant of [`RowBuffer::non_stale`].
    pub(crate) fn non_stale_mut(&mut self) -> impl Iterator<Item = &mut [Value]> {
        self.data
            .chunks_mut(self.n_columns)
            .filter(|row| !row[0].get().is_stale())
            // SAFETY: same layout argument as `non_stale`; `&mut self` rules
            // out aliased access here.
            .map(|row| unsafe { mem::transmute::<&mut [Cell<Value>], &mut [Value]>(row) })
    }

    /// A rayon-parallel iterator over all rows (stale rows included).
    pub(crate) fn parallel_iter(&self) -> impl ParallelIterator<Item = &[Value]> {
        use rayon::prelude::*;
        // SAFETY: same `Cell<Value>`/`Value` layout argument as `non_stale`.
        unsafe { mem::transmute::<&[Cell<Value>], &[Value]>(&self.data) }.par_chunks(self.n_columns)
    }

    /// Iterates over all rows (stale rows included).
    pub(crate) fn iter(&self) -> impl Iterator<Item = &[Value]> {
        self.data
            .chunks(self.n_columns)
            // SAFETY: same `Cell<Value>`/`Value` layout argument as `non_stale`.
            .map(|row| unsafe { mem::transmute::<&[Cell<Value>], &[Value]>(row) })
    }

    /// Removes all rows, keeping the allocation for reuse.
    pub(crate) fn clear(&mut self) {
        self.data.clear();
        self.total_rows = 0;
    }

    /// The number of rows stored, stale rows included.
    pub(crate) fn len(&self) -> usize {
        self.total_rows
    }

    /// Marks `row` stale through a shared reference, returning whether it was
    /// already stale. Only the first column is overwritten with the stale
    /// marker.
    ///
    /// # Safety
    /// NOTE(review): the body is plain `Cell` reads/writes; the `unsafe`
    /// contract presumably requires that no other thread touches this row
    /// concurrently (this type is unsafely `Sync`) — confirm at call sites.
    pub(crate) unsafe fn set_stale_shared(&self, row: RowId) -> bool {
        let cells = &self.data[row.index() * self.n_columns..(row.index() + 1) * self.n_columns];
        let was_stale = cells[0].get().is_stale();
        cells[0].set(Value::stale());
        was_stale
    }

    /// Returns the contents of `row`. Panics if `row` is out of bounds.
    pub(crate) fn get_row(&self, row: RowId) -> &[Value] {
        // SAFETY: the helper's slice indexing bounds-checks the row; the
        // layout argument for its transmute is documented on `get_row` below.
        unsafe { get_row(&self.data, self.n_columns, row) }
    }

    /// Like [`RowBuffer::get_row`], but without any bounds check.
    ///
    /// # Safety
    /// `row` must be in bounds, i.e. `row.index() < self.len()`.
    pub(crate) unsafe fn get_row_unchecked(&self, row: RowId) -> &[Value] {
        unsafe {
            slice::from_raw_parts(
                self.data.as_ptr().add(row.index() * self.n_columns) as *const Value,
                self.n_columns,
            )
        }
    }

    /// Returns mutable access to the contents of `row`. Panics if `row` is
    /// out of bounds.
    pub(crate) fn get_row_mut(&mut self, row: RowId) -> &mut [Value] {
        // SAFETY: `Cell<Value>` is `#[repr(transparent)]` over `Value`, and
        // `&mut self` guarantees exclusive access; slice indexing
        // bounds-checks the row.
        unsafe {
            mem::transmute::<&mut [Cell<Value>], &mut [Value]>(
                &mut self.data[row.index() * self.n_columns..(row.index() + 1) * self.n_columns],
            )
        }
    }

    /// Marks `row` stale (first column only), returning whether it was
    /// already stale.
    pub(crate) fn set_stale(&mut self, row: RowId) -> bool {
        let row = self.get_row_mut(row);
        let res = row[0].is_stale();
        row[0].set_stale();
        res
    }

    /// Appends `row`, returning the id it was assigned.
    ///
    /// Panics if `row.len()` differs from the buffer's arity.
    pub(crate) fn add_row(&mut self, row: &[Value]) -> RowId {
        assert_eq!(
            row.len(),
            self.n_columns,
            "attempting to add a row with mismatched arity to table"
        );
        if self.total_rows == 0 {
            // First write into an empty buffer refreshes the pooled
            // allocation. NOTE(review): relies on `Pooled::refresh` semantics
            // defined in the pool module — confirm.
            Pooled::refresh(&mut self.data);
        }
        let res = RowId::from_usize(self.total_rows);
        self.data.extend(row.iter().copied().map(Cell::new));
        self.total_rows += 1;
        res
    }

    /// Compacts the buffer in place, dropping every stale row. For each row
    /// that survives, `remap(contents, old_id, new_id)` is invoked so callers
    /// can update any indexes that referenced the old row ids.
    pub(crate) fn remove_stale(&mut self, mut remap: impl FnMut(&[Value], RowId, RowId)) {
        // `Vec::retain` visits one cell at a time, so we run a small state
        // machine to track row boundaries:
        // - `within_row`: column index of the current cell within its row;
        // - `row_in` / `row_out`: rows seen / rows kept so far;
        // - `keep_row`: decision made at each row's first column;
        // - `scratch`: accumulates a kept row's values for the `remap` call.
        let mut within_row = 0;
        let mut row_in = 0;
        let mut row_out = 0;
        let mut keep_row = true;
        let mut scratch = SmallVec::<[Value; 8]>::new();
        self.data.retain(|entry| {
            if within_row == 0 {
                // Row boundary: staleness is recorded in the first column.
                keep_row = !entry.get().is_stale();
                if keep_row {
                    scratch.push(entry.get());
                    row_out += 1;
                }
                row_in += 1;
            } else if keep_row {
                scratch.push(entry.get());
            }
            within_row += 1;
            if within_row == self.n_columns {
                // End of row: report the old -> new id mapping for kept rows.
                within_row = 0;
                if keep_row {
                    remap(&scratch, RowId::new(row_in - 1), RowId::new(row_out - 1));
                    scratch.clear();
                }
            }
            keep_row
        });
        self.total_rows = row_out as usize;
    }
}
287
/// A `RowBuffer` in which each row is tagged with a source `RowId`.
///
/// The tag is stored inline as an extra trailing column, so the inner buffer's
/// arity is the caller-visible arity plus one.
pub struct TaggedRowBuffer {
    inner: RowBuffer,
}
294
295impl TaggedRowBuffer {
296 pub fn new(n_columns: usize) -> TaggedRowBuffer {
298 TaggedRowBuffer {
299 inner: RowBuffer::new(n_columns + 1),
300 }
301 }
302
303 pub fn clear(&mut self) {
305 self.inner.clear()
306 }
307
308 pub fn is_empty(&self) -> bool {
310 self.inner.len() == 0
311 }
312
313 pub fn len(&self) -> usize {
315 self.inner.len()
316 }
317
318 fn base_arity(&self) -> usize {
319 self.inner.n_columns - 1
320 }
321
322 pub fn add_row(&mut self, row_id: RowId, row: &[Value]) -> RowId {
325 assert_eq!(
330 row.len(),
331 self.base_arity(),
332 "attempting to add a row with mismatched arity to table"
333 );
334 if self.inner.total_rows == 0 {
335 Pooled::refresh(&mut self.inner.data);
336 }
337 let res = RowId::from_usize(self.inner.total_rows);
338 self.inner.data.extend(row.iter().copied().map(Cell::new));
339 self.inner.data.push(Cell::new(Value::new(row_id.rep())));
340 self.inner.total_rows += 1;
341 res
342 }
343
344 pub fn get_row(&self, row: RowId) -> (RowId, &[Value]) {
347 self.unwrap_row(self.inner.get_row(row))
348 }
349
350 pub fn get_row_mut(&mut self, row: RowId) -> (RowId, &mut [Value]) {
351 let base_arity = self.base_arity();
352 let row = self.inner.get_row_mut(row);
353 let row_id = row[base_arity];
354 let row = &mut row[..base_arity];
355 (RowId::new(row_id.rep()), row)
356 }
357
358 pub fn iter(&self) -> impl Iterator<Item = (RowId, &[Value])> {
360 self.inner.iter().map(|row| self.unwrap_row(row))
361 }
362
363 pub fn par_iter(&self) -> impl ParallelIterator<Item = (RowId, &[Value])> {
365 self.inner.parallel_iter().map(|row| self.unwrap_row(row))
366 }
367
368 pub fn non_stale(&self) -> impl Iterator<Item = (RowId, &[Value])> {
370 self.inner.non_stale().map(|row| self.unwrap_row(row))
371 }
372
373 pub fn non_stale_mut(&mut self) -> impl Iterator<Item = (RowId, &mut [Value])> {
375 let base_arity = self.base_arity();
376 self.inner
377 .non_stale_mut()
378 .map(move |row| Self::unwrap_row_mut(base_arity, row))
379 }
380
381 pub fn set_stale(&mut self, row: RowId) -> bool {
382 self.inner.set_stale(row)
383 }
384
385 fn unwrap_row<'a>(&self, row: &'a [Value]) -> (RowId, &'a [Value]) {
386 let row_id = row[self.base_arity()];
387 let row = &row[..self.base_arity()];
388 (RowId::new(row_id.rep()), row)
389 }
390 fn unwrap_row_mut(base_arity: usize, row: &mut [Value]) -> (RowId, &mut [Value]) {
391 let row_id = row[base_arity];
392 let row = &mut row[..base_arity];
393 (RowId::new(row_id.rep()), row)
394 }
395}
396
/// Returns row `row` of `data`, interpreting `data` as rows `n_columns` wide.
///
/// Panics if the row is out of bounds (slice indexing is checked).
///
/// # Safety
/// The transmute is layout-sound because `Cell<Value>` is
/// `#[repr(transparent)]` over `Value`. NOTE(review): callers must guarantee
/// that none of these cells are mutated (e.g. via `set_stale_shared`) while
/// the returned borrow is live — confirm at call sites.
unsafe fn get_row(data: &[Cell<Value>], n_columns: usize, row: RowId) -> &[Value] {
    unsafe {
        mem::transmute::<&[Cell<Value>], &[Value]>(
            &data[row.index() * n_columns..(row.index() + 1) * n_columns],
        )
    }
}
407
/// A writer that lets multiple threads append row data to a single
/// destination buffer; `finish` reassembles the final `RowBuffer`.
pub(crate) struct ParallelRowBufWriter {
    // Carries the arity (and prior metadata) of the buffer being built; its
    // `data` is empty while writing is in progress.
    buf: RowBuffer,
    // The shared, concurrently-appendable storage.
    vec: ParallelVecWriter<Cell<Value>>,
}
418
impl ParallelRowBufWriter {
    /// Returns a handle that can read (and mark stale) rows already written,
    /// while other threads keep appending.
    pub(crate) fn read_handle(&self) -> ReadHandle<'_, impl Deref<Target = [Cell<Value>]> + '_> {
        ReadHandle {
            buf: &self.buf,
            data: self.vec.read_access(),
        }
    }

    /// Appends every row of `rows` (stale rows included) to the shared
    /// storage, returning the id of the first row written.
    ///
    /// Panics if `rows` has a different arity than this writer.
    pub(crate) fn append_contents(&self, rows: &RowBuffer) -> RowId {
        assert_eq!(rows.n_columns, self.buf.n_columns);
        let start_off = write_cell_slice(&self.vec, rows.data.as_slice());
        // Writes always cover whole rows, so offsets stay row-aligned.
        debug_assert_eq!(start_off % self.buf.n_columns, 0);
        RowId::from_usize(start_off / self.buf.n_columns)
    }

    /// Consumes the writer and reassembles the final `RowBuffer`; the row
    /// count is recomputed from the total data length.
    pub(crate) fn finish(mut self) -> RowBuffer {
        self.buf.data = Pooled::new(self.vec.finish());
        self.buf.total_rows = self.buf.data.len() / self.buf.n_columns;
        self.buf
    }
}
440
/// A read-side view into a `ParallelRowBufWriter`'s storage, usable while
/// other threads are still appending.
pub(crate) struct ReadHandle<'a, T> {
    // The writer's buffer, consulted only for `n_columns`.
    buf: &'a RowBuffer,
    // Shared read access to the storage written so far.
    data: T,
}
446
impl<T: Deref<Target = [Cell<Value>]>> ReadHandle<'_, T> {
    /// Reads row `row` from the (possibly still growing) shared storage.
    ///
    /// # Safety
    /// `row` must be in bounds of the data written so far; no bounds check is
    /// performed. The `Cell<Value>` to `Value` cast is layout-sound because
    /// `Cell` is `#[repr(transparent)]`. NOTE(review): assumes no concurrent
    /// writer mutates this row while the borrow is live — confirm.
    pub(crate) unsafe fn get_row_unchecked(&self, row: RowId) -> &[Value] {
        unsafe {
            std::slice::from_raw_parts(
                self.data.as_ptr().add(row.index() * self.buf.n_columns) as *const Value,
                self.buf.n_columns,
            )
        }
    }

    /// Marks `row` stale (first column only), returning whether it was
    /// already stale.
    ///
    /// # Safety
    /// `row` must be in bounds of the data written so far; the pointer
    /// arithmetic below is unchecked. NOTE(review): presumably callers must
    /// also rule out racing access to this row — confirm.
    pub(crate) unsafe fn set_stale_shared(&self, row: RowId) -> bool {
        let cells: &[Cell<Value>] = &self.data;
        let cell_ptr: *const Cell<Value> = cells.as_ptr();
        // SAFETY (caller contract): the row index is in bounds, see above.
        let to_set: &Cell<Value> = unsafe { &*cell_ptr.add(row.index() * self.buf.n_columns) };
        let was_stale = to_set.get().is_stale();
        to_set.set(Value::stale());
        was_stale
    }
}