vortex_vector/binaryview/
vector_mut.rs1use std::sync::Arc;
7
8use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_error::{VortexExpect, VortexResult, vortex_ensure};
10use vortex_mask::MaskMut;
11
12use crate::binaryview::BinaryViewType;
13use crate::binaryview::vector::BinaryViewVector;
14use crate::binaryview::view::{BinaryView, validate_views};
15use crate::{VectorMutOps, VectorOps};
16
/// Capacity, in bytes (2 MiB), requested when a fresh open data buffer is
/// allocated for out-of-line values.
const BUFFER_CAPACITY: usize = 2 << 20;
19
20#[derive(Clone, Debug)]
24pub struct BinaryViewVectorMut<T: BinaryViewType> {
25 views: BufferMut<BinaryView>,
27 validity: MaskMut,
29
30 buffers: Vec<ByteBuffer>,
32 open_buffer: Option<ByteBufferMut>,
34
35 _marker: std::marker::PhantomData<T>,
37}
38
39impl<T: BinaryViewType> BinaryViewVectorMut<T> {
40 pub fn new(views: BufferMut<BinaryView>, buffers: Vec<ByteBuffer>, validity: MaskMut) -> Self {
47 Self::try_new(views, buffers, validity)
48 .vortex_expect("Failed to create `BinaryViewVectorMut`")
49 }
50
51 pub fn with_capacity(capacity: usize) -> Self {
55 Self::new(
56 BufferMut::with_capacity(capacity),
57 Vec::new(),
58 MaskMut::with_capacity(capacity),
59 )
60 }
61
62 pub fn try_new(
70 views: BufferMut<BinaryView>,
71 buffers: Vec<ByteBuffer>,
72 validity: MaskMut,
73 ) -> VortexResult<Self> {
74 vortex_ensure!(
75 views.len() == validity.len(),
76 "views buffer length {} != validity length {}",
77 views.len(),
78 validity.len()
79 );
80
81 validate_views(&views, &buffers, |index| validity.value(index), T::validate)?;
82
83 Ok(Self {
84 views,
85 buffers,
86 validity,
87 open_buffer: None,
88 _marker: std::marker::PhantomData,
89 })
90 }
91
92 pub unsafe fn new_unchecked(
98 views: BufferMut<BinaryView>,
99 validity: MaskMut,
100 buffers: Vec<ByteBuffer>,
101 ) -> Self {
102 if cfg!(debug_assertions) {
103 Self::new(views, buffers, validity)
104 } else {
105 Self {
106 views,
107 buffers,
108 validity,
109 open_buffer: None,
110 _marker: std::marker::PhantomData,
111 }
112 }
113 }
114
115 pub unsafe fn views_mut(&mut self) -> &mut BufferMut<BinaryView> {
122 &mut self.views
123 }
124
125 pub unsafe fn validity_mut(&mut self) -> &mut MaskMut {
132 &mut self.validity
133 }
134
135 pub fn buffers(&mut self) -> &mut Vec<ByteBuffer> {
137 &mut self.buffers
138 }
139
140 pub fn append_values(&mut self, value: &T::Slice, n: usize) {
158 let bytes = value.as_ref();
159 if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
160 self.views.push_n(BinaryView::new_inlined(bytes), n);
161 } else {
162 let buffer_index =
163 u32::try_from(self.buffers.len()).vortex_expect("buffer count exceeds u32::MAX");
164
165 let buf = self
166 .open_buffer
167 .get_or_insert_with(|| ByteBufferMut::with_capacity(BUFFER_CAPACITY));
168 let offset = u32::try_from(buf.len()).vortex_expect("buffer length exceeds u32::MAX");
169 buf.extend_from_slice(value.as_ref());
170
171 self.views
172 .push_n(BinaryView::make_view(bytes, buffer_index, offset), n);
173 }
174
175 self.validity.append_n(true, n);
176 }
177
178 pub fn append_owned_values(&mut self, value: T::Scalar, n: usize) {
182 let buffer: ByteBuffer = value.into();
183
184 if buffer.len() <= BinaryView::MAX_INLINED_SIZE {
185 self.views
186 .push_n(BinaryView::new_inlined(buffer.as_ref()), n);
187 } else {
188 self.flush_open_buffer();
189
190 let buffer_index = u32::try_from(self.buffers.len())
191 .vortex_expect("buffer count exceeds u32::MAX")
192 + 1;
193 self.views
194 .push_n(BinaryView::make_view(buffer.as_ref(), buffer_index, 0), n);
195 self.buffers.push(buffer);
196 }
197
198 self.validity.append_n(true, n);
199 }
200
201 fn flush_open_buffer(&mut self) {
202 if let Some(open) = self.open_buffer.take() {
203 self.buffers.push(open.freeze());
204 }
205 }
206}
207
208impl<T: BinaryViewType> VectorMutOps for BinaryViewVectorMut<T> {
209 type Immutable = BinaryViewVector<T>;
210
211 fn len(&self) -> usize {
212 self.views.len()
213 }
214
215 fn validity(&self) -> &MaskMut {
216 &self.validity
217 }
218
219 fn capacity(&self) -> usize {
220 self.views.capacity()
221 }
222
223 fn reserve(&mut self, additional: usize) {
224 self.views.reserve(additional);
225 self.validity.reserve(additional);
226 }
227
228 fn clear(&mut self) {
229 self.views.clear();
230 self.validity.clear();
231 self.buffers.clear();
232 self.open_buffer = None;
233 }
234
235 fn truncate(&mut self, len: usize) {
236 self.views.truncate(len);
237 self.validity.truncate(len);
238 }
239
240 fn extend_from_vector(&mut self, other: &BinaryViewVector<T>) {
241 self.flush_open_buffer();
243
244 let offset =
245 u32::try_from(self.buffers.len()).vortex_expect("buffer count exceeds u32::MAX");
246
247 self.buffers.extend(other.buffers().iter().cloned());
248
249 let new_views_iter = other.views().iter().copied().map(|mut v| {
250 if v.is_inlined() {
251 v
252 } else {
253 v.as_view_mut().buffer_index += offset;
254 v
255 }
256 });
257 self.views.extend(new_views_iter);
258
259 self.validity.append_mask(other.validity())
260 }
261
262 fn append_nulls(&mut self, n: usize) {
263 self.views.push_n(BinaryView::empty_view(), n);
264 self.validity.append_n(false, n);
265 }
266
267 fn freeze(mut self) -> BinaryViewVector<T> {
268 self.flush_open_buffer();
270
271 unsafe {
272 BinaryViewVector::new_unchecked(
273 self.views.freeze(),
274 Arc::new(self.buffers.into_boxed_slice()),
275 self.validity.freeze(),
276 )
277 }
278 }
279
280 fn split_off(&mut self, _at: usize) -> Self {
281 todo!()
282 }
283
284 fn unsplit(&mut self, other: Self) {
285 if self.is_empty() {
286 *self = other;
287 return;
288 }
289
290 todo!()
291 }
292}
293
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use vortex_buffer::{ByteBuffer, buffer, buffer_mut};
    use vortex_mask::{Mask, MaskMut};

    use crate::binaryview::view::BinaryView;
    use crate::binaryview::{StringVector, StringVectorMut};
    use crate::{VectorMutOps, VectorOps};

    /// Inlined and out-of-line strings survive construction and `freeze`.
    #[test]
    fn test_basic() {
        let strings_mut = StringVectorMut::new(
            buffer_mut![
                BinaryView::new_inlined(b"inlined1"),
                BinaryView::make_view(b"long string 1", 0, 0),
                BinaryView::new_inlined(b"inlined2"),
                BinaryView::make_view(b"long string 2", 0, 13),
                BinaryView::new_inlined(b"inlined3"),
                BinaryView::make_view(b"long string 3", 0, 26),
            ],
            vec![ByteBuffer::copy_from(
                "long string 1long string 2long string 3",
            )],
            MaskMut::new_true(6),
        );

        let strings = strings_mut.freeze();
        assert_eq!(strings.get_ref(0), Some("inlined1"));
        assert_eq!(strings.get_ref(1), Some("long string 1"));
        assert_eq!(strings.get_ref(2), Some("inlined2"));
        assert_eq!(strings.get_ref(3), Some("long string 2"));
        assert_eq!(strings.get_ref(4), Some("inlined3"));
        assert_eq!(strings.get_ref(5), Some("long string 3"));
    }

    /// Extending with a vector that shares a buffer already held by the
    /// target appends the buffer again and shifts the incoming buffer indices.
    #[test]
    fn test_extend_self_reference() {
        let buf0 = ByteBuffer::copy_from(
            b"a really very quite long string 1a really very quite long string 2",
        );
        let buf1 = ByteBuffer::copy_from(
            b"a really very quite long string 3a really very quite long string 4",
        );

        let mut strings_mut = StringVectorMut::new(
            buffer_mut![
                BinaryView::new_inlined(b"inlined0"),
                BinaryView::new_inlined(b"inlined1"),
                BinaryView::make_view(b"a really very quite long string 4", 1, 33),
                BinaryView::make_view(b"a really very quite long string 3", 1, 0),
                BinaryView::make_view(b"a really very quite long string 2", 0, 33),
                BinaryView::make_view(b"a really very quite long string 1", 0, 0),
            ],
            vec![buf0.clone(), buf1.clone()],
            MaskMut::new_true(6),
        );

        let strings = StringVector::new(
            // References the second string in `buf1`; the literal matches the
            // bytes stored at offset 33 of that buffer.
            buffer![BinaryView::make_view(
                b"a really very quite long string 4",
                0,
                33
            )],
            Arc::new(Box::new([buf1.clone()])),
            Mask::new_true(1),
        );

        strings_mut.extend_from_vector(&strings);

        let strings_finished = strings_mut.freeze();
        assert!(strings_finished.validity().all_true());

        assert_eq!(strings_finished.get_ref(0).unwrap(), "inlined0");
        assert_eq!(strings_finished.get_ref(1).unwrap(), "inlined1");
        assert_eq!(
            strings_finished.get_ref(2).unwrap(),
            "a really very quite long string 4"
        );
        assert_eq!(
            strings_finished.get_ref(3).unwrap(),
            "a really very quite long string 3"
        );
        assert_eq!(
            strings_finished.get_ref(4).unwrap(),
            "a really very quite long string 2",
        );
        assert_eq!(
            strings_finished.get_ref(5).unwrap(),
            "a really very quite long string 1"
        );
        assert_eq!(
            strings_finished.get_ref(6).unwrap(),
            "a really very quite long string 4"
        );

        // The shared buffer is appended a second time rather than deduplicated.
        assert_eq!(
            strings_finished.buffers().deref().as_ref(),
            &[buf0, buf1.clone(), buf1]
        );
    }

    /// Null entries on both sides of an extend stay null in the result.
    #[test]
    fn test_extend_nulls() {
        let mut mask1 = MaskMut::with_capacity(4);
        // Two nulls followed by two valid entries.
        mask1.append_n(false, 2);
        mask1.append_n(true, 2);

        let mut strings_mut = StringVectorMut::new(
            buffer_mut![
                BinaryView::empty_view(),
                BinaryView::empty_view(),
                BinaryView::new_inlined(b"nonnull1"),
                BinaryView::new_inlined(b"nonnull2"),
            ],
            vec![ByteBuffer::empty()],
            mask1,
        );

        let strings = StringVector::new(
            buffer![
                BinaryView::new_inlined(b"extend1"),
                BinaryView::empty_view(),
                BinaryView::new_inlined(b"extend2"),
            ],
            Arc::new(Box::new([ByteBuffer::empty()])),
            Mask::from_iter([true, false, true]),
        );

        strings_mut.extend_from_vector(&strings);
        let strings_finished = strings_mut.freeze();

        assert_eq!(strings_finished.get_ref(0), None);
        assert_eq!(strings_finished.get_ref(1), None);
        assert_eq!(strings_finished.get_ref(2), Some("nonnull1"));
        assert_eq!(strings_finished.get_ref(3), Some("nonnull2"));
        assert_eq!(strings_finished.get_ref(4), Some("extend1"));
        assert_eq!(strings_finished.get_ref(5), None);
        assert_eq!(strings_finished.get_ref(6), Some("extend2"));
    }
}