1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::sync::Arc;
9use std::sync::LazyLock;
10
11use fsst::Compressor;
12use fsst::Decompressor;
13use fsst::Symbol;
14use prost::Message as _;
15use vortex_array::Array;
16use vortex_array::ArrayEq;
17use vortex_array::ArrayHash;
18use vortex_array::ArrayId;
19use vortex_array::ArrayParts;
20use vortex_array::ArrayRef;
21use vortex_array::ArraySlots;
22use vortex_array::ArrayView;
23use vortex_array::Canonical;
24use vortex_array::ExecutionCtx;
25use vortex_array::ExecutionResult;
26use vortex_array::IntoArray;
27use vortex_array::LEGACY_SESSION;
28use vortex_array::Precision;
29use vortex_array::TypedArrayRef;
30use vortex_array::VortexSessionExecute;
31use vortex_array::arrays::VarBin;
32use vortex_array::arrays::VarBinArray;
33use vortex_array::arrays::varbin::VarBinArrayExt;
34use vortex_array::buffer::BufferHandle;
35use vortex_array::builders::ArrayBuilder;
36use vortex_array::builders::VarBinViewBuilder;
37use vortex_array::dtype::DType;
38use vortex_array::dtype::Nullability;
39use vortex_array::dtype::PType;
40use vortex_array::serde::ArrayChildren;
41use vortex_array::smallvec::smallvec;
42use vortex_array::validity::Validity;
43use vortex_array::vtable::VTable;
44use vortex_array::vtable::ValidityVTable;
45use vortex_array::vtable::child_to_validity;
46use vortex_array::vtable::validity_to_child;
47use vortex_buffer::Buffer;
48use vortex_buffer::ByteBuffer;
49use vortex_error::VortexExpect;
50use vortex_error::VortexResult;
51use vortex_error::vortex_bail;
52use vortex_error::vortex_ensure;
53use vortex_error::vortex_err;
54use vortex_error::vortex_panic;
55use vortex_session::VortexSession;
56use vortex_session::registry::CachedId;
57
58use crate::canonical::canonicalize_fsst;
59use crate::canonical::fsst_decode_views;
60use crate::kernel::PARENT_KERNELS;
61use crate::rules::RULES;
62
/// An [`Array`] whose encoding is [`FSST`].
pub type FSSTArray = Array<FSST>;
65
/// Protobuf-encoded metadata stored with a serialized FSST array.
///
/// Both fields hold a [`PType`] enumeration value as a raw `i32`.
#[derive(Clone, prost::Message)]
pub struct FSSTMetadata {
    /// Primitive type of the `uncompressed_lengths` child, as written by `serialize`.
    #[prost(enumeration = "PType", tag = "1")]
    uncompressed_lengths_ptype: i32,

    /// Primitive type of the `codes_offsets` child, as written by `serialize`.
    #[prost(enumeration = "PType", tag = "2")]
    codes_offsets_ptype: i32,
}
74
75impl FSSTMetadata {
76 pub fn get_uncompressed_lengths_ptype(&self) -> VortexResult<PType> {
77 PType::try_from(self.uncompressed_lengths_ptype)
78 .map_err(|_| vortex_err!("Invalid PType {}", self.uncompressed_lengths_ptype))
79 }
80}
81
82impl ArrayHash for FSSTData {
83 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
84 self.symbols.array_hash(state, precision);
85 self.symbol_lengths.array_hash(state, precision);
86 self.codes_bytes.as_host().array_hash(state, precision);
87 }
88}
89
90impl ArrayEq for FSSTData {
91 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
92 self.symbols.array_eq(&other.symbols, precision)
93 && self
94 .symbol_lengths
95 .array_eq(&other.symbol_lengths, precision)
96 && self
97 .codes_bytes
98 .as_host()
99 .array_eq(other.codes_bytes.as_host(), precision)
100 }
101}
102
103impl VTable for FSST {
104 type TypedArrayData = FSSTData;
105 type OperationsVTable = Self;
106 type ValidityVTable = Self;
107
108 fn id(&self) -> ArrayId {
109 static ID: CachedId = CachedId::new("vortex.fsst");
110 *ID
111 }
112
113 fn validate(
114 &self,
115 data: &Self::TypedArrayData,
116 dtype: &DType,
117 len: usize,
118 slots: &[Option<ArrayRef>],
119 ) -> VortexResult<()> {
120 let mut ctx = LEGACY_SESSION.create_execution_ctx();
122 data.validate(dtype, len, slots, &mut ctx)
123 }
124
125 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
126 3
127 }
128
129 fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
130 match idx {
131 0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
132 1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
133 2 => array.codes_bytes_handle().clone(),
134 _ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
135 }
136 }
137
138 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
139 match idx {
140 0 => Some("symbols".to_string()),
141 1 => Some("symbol_lengths".to_string()),
142 2 => Some("compressed_codes".to_string()),
143 _ => vortex_panic!("FSSTArray buffer_name index {idx} out of bounds"),
144 }
145 }
146
147 fn serialize(
148 array: ArrayView<'_, Self>,
149 _session: &VortexSession,
150 ) -> VortexResult<Option<Vec<u8>>> {
151 let codes_offsets = array.as_ref().slots()[CODES_OFFSETS_SLOT]
152 .as_ref()
153 .vortex_expect("FSSTArray codes_offsets slot");
154 Ok(Some(
155 FSSTMetadata {
156 uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
157 codes_offsets_ptype: codes_offsets.dtype().as_ptype().into(),
158 }
159 .encode_to_vec(),
160 ))
161 }
162
163 fn deserialize(
188 &self,
189 dtype: &DType,
190 len: usize,
191 metadata: &[u8],
192 buffers: &[BufferHandle],
193 children: &dyn ArrayChildren,
194 session: &VortexSession,
195 ) -> VortexResult<ArrayParts<Self>> {
196 let metadata = FSSTMetadata::decode(metadata)?;
197 let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
198 let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
199
200 let mut ctx = session.create_execution_ctx();
201 if buffers.len() == 2 {
202 return Self::deserialize_legacy(
203 self,
204 dtype,
205 len,
206 &metadata,
207 &symbols,
208 &symbol_lengths,
209 children,
210 &mut ctx,
211 );
212 }
213
214 if buffers.len() == 3 {
215 let uncompressed_lengths = children.get(
216 0,
217 &DType::Primitive(
218 metadata.get_uncompressed_lengths_ptype()?,
219 Nullability::NonNullable,
220 ),
221 len,
222 )?;
223
224 let codes_bytes = buffers[2].clone();
225 let codes_offsets = children.get(
226 1,
227 &DType::Primitive(
228 PType::try_from(metadata.codes_offsets_ptype)?,
229 Nullability::NonNullable,
230 ),
231 len + 1,
233 )?;
234
235 let codes_validity = if children.len() == 2 {
236 Validity::from(dtype.nullability())
237 } else if children.len() == 3 {
238 let validity = children.get(2, &Validity::DTYPE, len)?;
239 Validity::Array(validity)
240 } else {
241 vortex_bail!("Expected 2 or 3 children, got {}", children.len());
242 };
243
244 FSSTData::validate_parts(
245 &symbols,
246 &symbol_lengths,
247 &codes_bytes,
248 &codes_offsets,
249 dtype.nullability(),
250 &uncompressed_lengths,
251 dtype,
252 len,
253 &mut ctx,
254 )?;
255 let slots = smallvec![
256 Some(uncompressed_lengths),
257 Some(codes_offsets),
258 validity_to_child(&codes_validity, len),
259 ];
260 let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
261 return Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots));
262 }
263
264 vortex_bail!(
265 "InvalidArgument: Expected 2 or 3 buffers, got {}",
266 buffers.len()
267 );
268 }
269
270 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
271 SLOT_NAMES[idx].to_string()
272 }
273
274 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
275 canonicalize_fsst(array.as_view(), ctx).map(ExecutionResult::done)
276 }
277
278 fn append_to_builder(
279 array: ArrayView<'_, Self>,
280 builder: &mut dyn ArrayBuilder,
281 ctx: &mut ExecutionCtx,
282 ) -> VortexResult<()> {
283 let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
284 builder.extend_from_array(
285 &array
286 .array()
287 .clone()
288 .execute::<Canonical>(ctx)?
289 .into_array(),
290 );
291 return Ok(());
292 };
293
294 let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
298 let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
299
300 builder.push_buffer_and_adjusted_views(
301 &buffers,
302 &views,
303 array
304 .array()
305 .validity()?
306 .execute_mask(array.array().len(), ctx)?,
307 );
308 Ok(())
309 }
310
311 fn execute_parent(
312 array: ArrayView<'_, Self>,
313 parent: &ArrayRef,
314 child_idx: usize,
315 ctx: &mut ExecutionCtx,
316 ) -> VortexResult<Option<ArrayRef>> {
317 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
318 }
319
320 fn reduce_parent(
321 array: ArrayView<'_, Self>,
322 parent: &ArrayRef,
323 child_idx: usize,
324 ) -> VortexResult<Option<ArrayRef>> {
325 RULES.evaluate(array, parent, child_idx)
326 }
327}
328
/// Slot index of the `uncompressed_lengths` child array.
pub(crate) const UNCOMPRESSED_LENGTHS_SLOT: usize = 0;
/// Slot index of the `codes_offsets` child array.
pub(crate) const CODES_OFFSETS_SLOT: usize = 1;
/// Slot index of the optional `codes_validity` child array.
pub(crate) const CODES_VALIDITY_SLOT: usize = 2;
/// Total number of slots of an FSST array.
pub(crate) const NUM_SLOTS: usize = 3;
/// Slot names, indexed by slot position.
pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
    ["uncompressed_lengths", "codes_offsets", "codes_validity"];
338
/// Encoding-specific data of an FSST array: the symbol table and the compressed code bytes.
///
/// The `uncompressed_lengths`, `codes_offsets` and `codes_validity` arrays are not stored
/// here; they are kept in the slots of the outer array.
#[derive(Clone)]
pub struct FSSTData {
    /// Symbol table entries.
    symbols: Buffer<Symbol>,
    /// Length of each entry of `symbols`; validation requires the same length as `symbols`.
    symbol_lengths: Buffer<u8>,
    /// Handle to the bytes of the compressed codes.
    codes_bytes: BufferHandle,
    /// Number of elements, as passed to the constructor.
    len: usize,

    /// Lazily built compressor; its initializer calls `Compressor::rebuild_from` on clones of
    /// `symbols` and `symbol_lengths`.
    compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
}
359
360impl Display for FSSTData {
361 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
362 write!(f, "len: {}, nsymbols: {}", self.len, self.symbols.len())
363 }
364}
365
366impl Debug for FSSTData {
367 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
368 f.debug_struct("FSSTArray")
369 .field("symbols", &self.symbols)
370 .field("symbol_lengths", &self.symbol_lengths)
371 .field("codes_bytes_len", &self.codes_bytes.len())
372 .field("len", &self.len)
373 .field("uncompressed_lengths", &"<outer slot>")
374 .field("codes_offsets", &"<outer slot>")
375 .field("codes_validity", &"<outer slot>")
376 .finish()
377 }
378}
379
/// Marker type for the FSST encoding; it implements [`VTable`] with the id `vortex.fsst`.
#[derive(Clone, Debug)]
pub struct FSST;
382
383impl FSST {
384 pub fn try_new(
390 dtype: DType,
391 symbols: Buffer<Symbol>,
392 symbol_lengths: Buffer<u8>,
393 codes: VarBinArray,
394 uncompressed_lengths: ArrayRef,
395 ctx: &mut ExecutionCtx,
396 ) -> VortexResult<FSSTArray> {
397 let len = codes.len();
398 FSSTData::validate_parts_from_codes(
399 &symbols,
400 &symbol_lengths,
401 &codes,
402 &uncompressed_lengths,
403 &dtype,
404 len,
405 ctx,
406 )?;
407 let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
408 let codes_bytes = codes.bytes_handle().clone();
409 let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?;
410 Ok(unsafe {
411 Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
412 })
413 }
414
415 #[allow(clippy::too_many_arguments)]
419 fn deserialize_legacy(
420 &self,
421 dtype: &DType,
422 len: usize,
423 metadata: &FSSTMetadata,
424 symbols: &Buffer<Symbol>,
425 symbol_lengths: &Buffer<u8>,
426 children: &dyn ArrayChildren,
427 ctx: &mut ExecutionCtx,
428 ) -> VortexResult<ArrayParts<Self>> {
429 if children.len() != 2 {
430 vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
431 }
432 let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
433 let codes: VarBinArray = codes
434 .as_opt::<VarBin>()
435 .ok_or_else(|| {
436 vortex_err!(
437 "Expected VarBinArray for codes, got {}",
438 codes.encoding_id()
439 )
440 })?
441 .into_owned();
442 let uncompressed_lengths = children.get(
443 1,
444 &DType::Primitive(
445 metadata.get_uncompressed_lengths_ptype()?,
446 Nullability::NonNullable,
447 ),
448 len,
449 )?;
450
451 FSSTData::validate_parts_from_codes(
452 symbols,
453 symbol_lengths,
454 &codes,
455 &uncompressed_lengths,
456 dtype,
457 len,
458 ctx,
459 )?;
460 let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
461 let codes_bytes = codes.bytes_handle().clone();
462 let data = FSSTData::try_new(symbols.clone(), symbol_lengths.clone(), codes_bytes, len)?;
463 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
464 }
465
466 pub(crate) unsafe fn new_unchecked(
467 dtype: DType,
468 symbols: Buffer<Symbol>,
469 symbol_lengths: Buffer<u8>,
470 codes: VarBinArray,
471 uncompressed_lengths: ArrayRef,
472 ) -> FSSTArray {
473 let len = codes.len();
474 let slots = FSSTData::make_slots(&codes, &uncompressed_lengths);
475 let codes_bytes = codes.bytes_handle().clone();
476 let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) };
477 unsafe {
478 Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
479 }
480 }
481}
482
483impl FSSTData {
484 fn make_slots(codes: &VarBinArray, uncompressed_lengths: &ArrayRef) -> ArraySlots {
485 smallvec![
486 Some(uncompressed_lengths.clone()),
487 Some(codes.offsets().clone()),
488 validity_to_child(
489 &codes
490 .validity()
491 .vortex_expect("FSST codes validity should be derivable"),
492 codes.len(),
493 ),
494 ]
495 }
496
497 pub fn try_new(
510 symbols: Buffer<Symbol>,
511 symbol_lengths: Buffer<u8>,
512 codes_bytes: BufferHandle,
513 len: usize,
514 ) -> VortexResult<Self> {
515 unsafe {
517 Ok(Self::new_unchecked(
518 symbols,
519 symbol_lengths,
520 codes_bytes,
521 len,
522 ))
523 }
524 }
525
526 pub fn validate(
527 &self,
528 dtype: &DType,
529 len: usize,
530 slots: &[Option<ArrayRef>],
531 ctx: &mut ExecutionCtx,
532 ) -> VortexResult<()> {
533 let codes_offsets = slots[CODES_OFFSETS_SLOT]
534 .as_ref()
535 .vortex_expect("FSSTArray codes_offsets slot");
536 Self::validate_parts(
537 &self.symbols,
538 &self.symbol_lengths,
539 &self.codes_bytes,
540 codes_offsets,
541 dtype.nullability(),
542 uncompressed_lengths_from_slots(slots),
543 dtype,
544 len,
545 ctx,
546 )
547 }
548
549 #[expect(clippy::too_many_arguments)]
551 fn validate_parts(
552 symbols: &Buffer<Symbol>,
553 symbol_lengths: &Buffer<u8>,
554 codes_bytes: &BufferHandle,
555 codes_offsets: &ArrayRef,
556 codes_nullability: Nullability,
557 uncompressed_lengths: &ArrayRef,
558 dtype: &DType,
559 len: usize,
560 ctx: &mut ExecutionCtx,
561 ) -> VortexResult<()> {
562 vortex_ensure!(
563 matches!(dtype, DType::Binary(_) | DType::Utf8(_)),
564 "FSST arrays must be Binary or Utf8, found {dtype}"
565 );
566
567 if symbols.len() > 255 {
568 vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
569 }
570 if symbols.len() != symbol_lengths.len() {
571 vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
572 }
573
574 let codes_len = codes_offsets.len().saturating_sub(1);
576 if codes_len != len {
577 vortex_bail!(InvalidArgument: "codes must have same len as outer array");
578 }
579
580 if uncompressed_lengths.len() != len {
581 vortex_bail!(InvalidArgument: "uncompressed_lengths must be same len as codes");
582 }
583
584 if !uncompressed_lengths.dtype().is_int() || uncompressed_lengths.dtype().is_nullable() {
585 vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
586 }
587
588 if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() {
590 vortex_bail!(InvalidArgument: "codes offsets must be non-nullable integer type, found {}", codes_offsets.dtype());
591 }
592
593 if codes_nullability != dtype.nullability() {
594 vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
595 }
596
597 if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
599 let last_offset: usize = (&codes_offsets
600 .execute_scalar(codes_offsets.len() - 1, ctx)
601 .vortex_expect("offsets must support scalar_at"))
602 .try_into()
603 .vortex_expect("Failed to convert offset to usize");
604 vortex_ensure!(
605 last_offset <= codes_bytes.len(),
606 InvalidArgument: "Last codes offset {} exceeds codes bytes length {}",
607 last_offset,
608 codes_bytes.len()
609 );
610 }
611
612 Ok(())
613 }
614
615 fn validate_parts_from_codes(
617 symbols: &Buffer<Symbol>,
618 symbol_lengths: &Buffer<u8>,
619 codes: &VarBinArray,
620 uncompressed_lengths: &ArrayRef,
621 dtype: &DType,
622 len: usize,
623 ctx: &mut ExecutionCtx,
624 ) -> VortexResult<()> {
625 Self::validate_parts(
626 symbols,
627 symbol_lengths,
628 codes.bytes_handle(),
629 codes.offsets(),
630 codes.dtype().nullability(),
631 uncompressed_lengths,
632 dtype,
633 len,
634 ctx,
635 )
636 }
637
638 pub(crate) unsafe fn new_unchecked(
639 symbols: Buffer<Symbol>,
640 symbol_lengths: Buffer<u8>,
641 codes_bytes: BufferHandle,
642 len: usize,
643 ) -> Self {
644 let symbols2 = symbols.clone();
645 let symbol_lengths2 = symbol_lengths.clone();
646 let compressor = Arc::new(LazyLock::new(Box::new(move || {
647 Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
648 })
649 as Box<dyn Fn() -> Compressor + Send>));
650 Self {
651 symbols,
652 symbol_lengths,
653 codes_bytes,
654 len,
655 compressor,
656 }
657 }
658
659 pub fn len(&self) -> usize {
661 self.len
662 }
663
664 pub fn is_empty(&self) -> bool {
666 self.len == 0
667 }
668
669 pub fn symbols(&self) -> &Buffer<Symbol> {
671 &self.symbols
672 }
673
674 pub fn symbol_lengths(&self) -> &Buffer<u8> {
676 &self.symbol_lengths
677 }
678
679 pub fn codes_bytes_handle(&self) -> &BufferHandle {
681 &self.codes_bytes
682 }
683
684 pub fn codes_bytes(&self) -> &ByteBuffer {
686 self.codes_bytes.as_host()
687 }
688
689 pub fn decompressor(&self) -> Decompressor<'_> {
692 Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
693 }
694
695 pub fn compressor(&self) -> &Compressor {
697 self.compressor.as_ref()
698 }
699}
700
701fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
702 slots[UNCOMPRESSED_LENGTHS_SLOT]
703 .as_ref()
704 .vortex_expect("FSSTArray uncompressed_lengths slot")
705}
706
707pub trait FSSTArrayExt: TypedArrayRef<FSST> {
708 fn uncompressed_lengths(&self) -> &ArrayRef {
709 uncompressed_lengths_from_slots(self.as_ref().slots())
710 }
711
712 fn uncompressed_lengths_dtype(&self) -> &DType {
713 self.uncompressed_lengths().dtype()
714 }
715
716 fn codes(&self) -> VarBinArray {
719 let offsets = self.as_ref().slots()[CODES_OFFSETS_SLOT]
720 .as_ref()
721 .vortex_expect("FSSTArray codes_offsets slot")
722 .clone();
723 let validity = child_to_validity(
724 self.as_ref().slots()[CODES_VALIDITY_SLOT].as_ref(),
725 self.as_ref().dtype().nullability(),
726 );
727 let codes_bytes = self.codes_bytes_handle().clone();
728 unsafe {
730 VarBinArray::new_unchecked_from_handle(
731 offsets,
732 codes_bytes,
733 DType::Binary(self.as_ref().dtype().nullability()),
734 validity,
735 )
736 }
737 }
738
739 fn codes_dtype(&self) -> DType {
741 DType::Binary(self.as_ref().dtype().nullability())
742 }
743}
744
/// Blanket implementation: every type implementing `TypedArrayRef<FSST>` gets [`FSSTArrayExt`].
impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
746
747impl ValidityVTable<FSST> for FSST {
748 fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
749 Ok(child_to_validity(
750 array.slots()[CODES_VALIDITY_SLOT].as_ref(),
751 array.dtype().nullability(),
752 ))
753 }
754}
755
#[cfg(test)]
mod test {
    use fsst::Compressor;
    use fsst::Symbol;
    use prost::Message;
    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::test_harness::check_metadata;
    use vortex_buffer::Buffer;
    use vortex_error::VortexError;

    use crate::FSST;
    use crate::array::FSSTArrayExt;
    use crate::array::FSSTMetadata;
    use crate::fsst_compress_iter;

    /// Checks the encoded metadata bytes via `check_metadata` under the name `fsst.metadata`.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_fsst_metadata() {
        let metadata = FSSTMetadata {
            uncompressed_lengths_ptype: PType::U64 as i32,
            codes_offsets_ptype: PType::I32 as i32,
        };
        let encoded = metadata.encode_to_vec();
        check_metadata("fsst.metadata", &encoded);
    }

    /// Deserializes an array from the legacy layout (two buffers, codes passed as a child)
    /// and checks that it decompresses to the original strings.
    #[test]
    fn test_back_compat() {
        const FIRST: &[u8] = b"abcabcab";
        const SECOND: &[u8] = b"defghijk";

        let dtype = DType::Utf8(Nullability::NonNullable);

        let symbols = Buffer::<Symbol>::copy_from([
            Symbol::from_slice(b"abc00000"),
            Symbol::from_slice(b"defghijk"),
        ]);
        let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

        let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let fsst_array = fsst_compress_iter(
            [Some(FIRST), Some(SECOND)].into_iter(),
            2,
            dtype.clone(),
            &compressor,
            &mut ctx,
        );

        let compressed_codes = fsst_array.codes();

        // Legacy layout: only the symbol table buffers are serialized as buffers.
        let buffers = [
            BufferHandle::new_host(symbols.into_byte_buffer()),
            BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
        ];

        // Legacy layout: the codes come first, followed by the uncompressed lengths.
        let children = vec![
            compressed_codes.into_array(),
            fsst_array.uncompressed_lengths().clone(),
        ];

        let metadata_bytes = FSSTMetadata {
            uncompressed_lengths_ptype: fsst_array
                .uncompressed_lengths()
                .dtype()
                .as_ptype()
                .into(),
            codes_offsets_ptype: 0,
        }
        .encode_to_vec();

        let fsst = ArrayPlugin::deserialize(
            &FSST,
            &dtype,
            2,
            &metadata_bytes,
            &buffers,
            &children.as_slice(),
            &LEGACY_SESSION,
        )
        .unwrap();

        let mut decode_ctx = LEGACY_SESSION.create_execution_ctx();
        let decompressed = fsst.execute::<VarBinViewArray>(&mut decode_ctx).unwrap();
        decompressed
            .with_iterator(|it| {
                assert_eq!(it.next().unwrap(), Some(FIRST));
                assert_eq!(it.next().unwrap(), Some(SECOND));
                Ok::<_, VortexError>(())
            })
            .unwrap()
    }
}