1use std::{
8 collections::BTreeSet,
9 io::Cursor,
10 string::{String, ToString},
11 sync::Arc,
12 vec::Vec,
13};
14
15use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch};
16use arrow_ipc::{reader::StreamReader, writer::StreamWriter};
17use arrow_schema::{DataType, Field, Schema};
18use oxgraph_snapshot::Snapshot;
19use zerocopy::{
20 FromBytes, Immutable, IntoBytes, KnownLayout,
21 byteorder::{LE, U64},
22};
23
24use crate::{
25 model::{
26 IdFamily, IdentityMapMode, IdentityModeRecord, IdentityModeSummary,
27 IdentitySnapshotSummary, LayerName, LayerRole, MissingPolicy, PropertyError, PropertyLayer,
28 PropertyLayerData, StorageMode, id_family_from_tag, id_family_tag, layer_role_from_tag,
29 layer_role_tag, map_arrow_error, missing_policy_tag, storage_from_tags, storage_tag,
30 validate_sparse_indices,
31 },
32 weights::{GraphPropertyLayers, HyperPropertyLayers},
33 width::{
34 PropertyIndex, PropertySnapshotMetaWord, SNAPSHOT_PROPERTY_VERSION, le_word,
35 le_word_to_u32, le_word_to_u64, le_word_to_usize,
36 },
37};
38
39pub fn validate_identity_snapshot<W>(
51 snapshot: &Snapshot<'_>,
52) -> Result<IdentitySnapshotSummary, PropertyError>
53where
54 W: PropertySnapshotMetaWord,
55{
56 let section =
57 snapshot
58 .section(W::IDENTITY_MODES_KIND)
59 .ok_or(PropertyError::MissingSnapshotSection {
60 kind: W::IDENTITY_MODES_KIND,
61 })?;
62 if section.version() != SNAPSHOT_PROPERTY_VERSION {
63 return Err(PropertyError::SnapshotSectionVersion {
64 kind: W::IDENTITY_MODES_KIND,
65 version: section.version(),
66 });
67 }
68 let records: &[IdentityModeRecord<W>] =
69 section
70 .try_as_slice()
71 .map_err(|error| PropertyError::SnapshotSectionView {
72 kind: W::IDENTITY_MODES_KIND,
73 error,
74 })?;
75 let records = validate_identity_records::<W>(snapshot, records)?;
76 Ok(IdentitySnapshotSummary { records })
77}
78
/// Byte payloads produced by the property snapshot encoders.
///
/// `descriptors` holds the fixed header, the packed descriptor records and the
/// layer-name string table. `data` holds the Arrow IPC streams that those
/// records point into.
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use]
pub struct EncodedPropertySnapshot {
    /// Header bytes, then descriptor records, then concatenated layer names.
    pub descriptors: Vec<u8>,
    /// Concatenated Arrow IPC streams (value streams and optional default streams).
    pub data: Vec<u8>,
}
92
/// Summary returned after a property snapshot's sections validate successfully.
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use]
pub struct PropertySnapshotSummary {
    /// Number of descriptor records, i.e. property layers.
    pub layer_count: usize,
    /// Sum of the `logical_len` recorded for every layer.
    pub total_logical_values: usize,
}
106
/// Arrow arrays recovered for one decoded property layer.
#[derive(Clone, Debug)]
#[must_use]
#[non_exhaustive]
pub enum DecodedPropertyData {
    /// Dense storage: the value column holds one entry per logical slot.
    Dense {
        /// Column 0 of the layer's value stream.
        values: ArrayRef,
    },
    /// Sparse storage: explicit index/value pairs plus an optional default.
    Sparse {
        /// Column 0 of the value stream: the logical slot of each stored value.
        indices: ArrayRef,
        /// Column 1 of the value stream: the stored values, aligned with `indices`.
        values: ArrayRef,
        /// Column 0 of the default stream, present only when the layer declared one.
        default: Option<ArrayRef>,
    },
}
136
/// One property layer reconstructed from a snapshot's descriptor and data sections.
#[derive(Clone, Debug)]
#[must_use]
pub struct DecodedPropertyLayer {
    /// Layer ID read from the descriptor record, widened to `u64`.
    pub layer_id: u64,
    /// Layer name read from the descriptor string table.
    pub name: String,
    /// Identity family the layer is attached to.
    pub id_family: IdFamily,
    /// Role tag recorded for the layer.
    pub role: LayerRole,
    /// Storage mode: dense, or sparse with its missing-value policy.
    pub storage: StorageMode,
    /// Logical length recorded in the descriptor.
    pub logical_len: usize,
    /// Arrow arrays decoded from the layer's IPC stream(s).
    pub data: DecodedPropertyData,
}
166
/// Fixed-size prefix of the property descriptor section.
///
/// The header is two little-endian `u64` words. The descriptor records follow
/// it immediately.
#[derive(Clone, Copy, Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
pub(crate) struct PropertySnapshotHeader {
    /// Number of descriptor records that follow the header.
    record_count: U64<LE>,
    /// Total byte length of those records; validation requires it to equal
    /// `record_count` times the record size.
    record_bytes: U64<LE>,
}
176
/// One fixed-width descriptor record in the property descriptor section.
///
/// Every field is a little-endian word of the metadata width selected by `W`.
/// Offsets are byte offsets:
/// - `name_*` point into the string table that follows the records.
/// - `value_data_*` and `default_data_*` point into the property data section.
#[derive(Clone, Copy, Debug, Eq, FromBytes, Immutable, IntoBytes, KnownLayout, PartialEq)]
#[repr(C)]
pub struct PropertySnapshotRecord<W>
where
    W: PropertySnapshotMetaWord,
{
    /// Layer ID.
    layer_id: W::LittleEndianWord,
    /// Byte offset of the layer's UTF-8 name in the string table.
    name_offset: W::LittleEndianWord,
    /// Byte length of the layer's UTF-8 name.
    name_len: W::LittleEndianWord,
    /// Encoded identity-family tag.
    id_family: W::LittleEndianWord,
    /// Encoded layer-role tag.
    role: W::LittleEndianWord,
    /// Encoded storage-mode tag (decoded together with `missing_policy`).
    storage: W::LittleEndianWord,
    /// Encoded missing-value policy tag.
    missing_policy: W::LittleEndianWord,
    /// Number of logical slots the layer covers.
    logical_len: W::LittleEndianWord,
    /// Rows in the value stream: values for dense layers, index/value pairs for sparse ones.
    value_count: W::LittleEndianWord,
    /// Byte offset of the value Arrow IPC stream in the data section.
    value_data_offset: W::LittleEndianWord,
    /// Byte length of the value Arrow IPC stream.
    value_data_len: W::LittleEndianWord,
    /// Byte offset of the default-value IPC stream (the encoder writes 0 when absent).
    default_data_offset: W::LittleEndianWord,
    /// Byte length of the default-value IPC stream; zero means no default stream.
    default_data_len: W::LittleEndianWord,
    /// Reserved; validation requires it to be zero.
    reserved: W::LittleEndianWord,
}
213
214pub fn encode_property_snapshot<W, Id, I>(
225 layers: &[PropertyLayer<Id, I>],
226) -> Result<EncodedPropertySnapshot, PropertyError>
227where
228 W: PropertySnapshotMetaWord,
229 Id: Copy + Into<u64> + TryInto<W>,
230 I: PropertyIndex,
231{
232 let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(layers.len());
233 for layer in layers {
234 encoder.append::<Id, I>(layer)?;
235 }
236 encoder.finish()
237}
238
239pub fn encode_graph_property_snapshot<W, Id, NodeIndex, EdgeIndex>(
249 layers: GraphPropertyLayers<'_, Id, NodeIndex, EdgeIndex>,
250) -> Result<EncodedPropertySnapshot, PropertyError>
251where
252 W: PropertySnapshotMetaWord,
253 Id: Copy + Into<u64> + TryInto<W>,
254 NodeIndex: PropertyIndex,
255 EdgeIndex: PropertyIndex,
256{
257 let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(
258 layers.element.len().saturating_add(layers.relation.len()),
259 );
260 for layer in layers.element {
261 encoder.append::<Id, NodeIndex>(layer)?;
262 }
263 for layer in layers.relation {
264 encoder.append::<Id, EdgeIndex>(layer)?;
265 }
266 encoder.finish()
267}
268
269pub fn encode_hyper_property_snapshot<W, Id, VertexIndex, RelationIndex, IncidenceIndex>(
279 layers: HyperPropertyLayers<'_, Id, VertexIndex, RelationIndex, IncidenceIndex>,
280) -> Result<EncodedPropertySnapshot, PropertyError>
281where
282 W: PropertySnapshotMetaWord,
283 Id: Copy + Into<u64> + TryInto<W>,
284 VertexIndex: PropertyIndex,
285 RelationIndex: PropertyIndex,
286 IncidenceIndex: PropertyIndex,
287{
288 let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(
289 layers
290 .element
291 .len()
292 .saturating_add(layers.relation.len())
293 .saturating_add(layers.incidence.len()),
294 );
295 for layer in layers.element {
296 encoder.append::<Id, VertexIndex>(layer)?;
297 }
298 for layer in layers.relation {
299 encoder.append::<Id, RelationIndex>(layer)?;
300 }
301 for layer in layers.incidence {
302 encoder.append::<Id, IncidenceIndex>(layer)?;
303 }
304 encoder.finish()
305}
306
/// Incremental builder behind the `encode_*property_snapshot` functions.
///
/// Layers are appended one at a time. `finish` then assembles the descriptor
/// section from a header, `records` and `strings`, and hands back `data`
/// unchanged.
struct PropertySnapshotEncoder<W>
where
    W: PropertySnapshotMetaWord,
{
    /// Concatenated Arrow IPC streams for all appended layers.
    data: Vec<u8>,
    /// Concatenated layer-name bytes; records store offsets into this table.
    strings: Vec<u8>,
    /// One fixed-width descriptor record per appended layer, in append order.
    records: Vec<PropertySnapshotRecord<W>>,
    /// `(id_family, name)` pairs already appended, used to reject duplicates.
    names: BTreeSet<(IdFamily, LayerName)>,
    /// Layer IDs (widened to `u64`) already appended, used to reject duplicates.
    ids: BTreeSet<u64>,
}
332
333impl<W> PropertySnapshotEncoder<W>
334where
335 W: PropertySnapshotMetaWord,
336{
337 fn with_capacity(capacity: usize) -> Self {
339 Self {
340 data: Vec::new(),
341 strings: Vec::new(),
342 records: Vec::with_capacity(capacity),
343 names: BTreeSet::new(),
344 ids: BTreeSet::new(),
345 }
346 }
347
348 fn append<Id, I>(&mut self, layer: &PropertyLayer<Id, I>) -> Result<(), PropertyError>
350 where
351 Id: Copy + Into<u64> + TryInto<W>,
352 I: PropertyIndex,
353 {
354 let descriptor = layer.descriptor();
355 if !self
356 .names
357 .insert((descriptor.id_family, descriptor.name.clone()))
358 {
359 return Err(PropertyError::DuplicateName {
360 id_family: descriptor.id_family,
361 name: descriptor.name.clone(),
362 });
363 }
364 let diagnostic_layer_id = descriptor.layer_id.0.into();
365 if !self.ids.insert(diagnostic_layer_id) {
366 return Err(PropertyError::DuplicateLayerId {
367 layer_id: diagnostic_layer_id,
368 });
369 }
370 let name_offset = append_string(&mut self.strings, descriptor.name.as_str());
371 let value_data_offset = self.data.len();
372 let layer_data = encode_layer_value_ipc(layer)?;
373 let value_data_len = layer_data.len();
374 self.data.extend_from_slice(&layer_data);
375 let (default_data_offset, default_data_len) =
376 encode_layer_default_ipc(layer)?.map_or((0, 0), |default_data| {
377 let offset = self.data.len();
378 let len = default_data.len();
379 self.data.extend_from_slice(&default_data);
380 (offset, len)
381 });
382 let layer_id = descriptor.layer_id.0.try_into().map_err(|_error| {
383 PropertyError::SnapshotDescriptorMismatch {
384 reason: "layer ID does not fit selected metadata width",
385 }
386 })?;
387 self.records.push(PropertySnapshotRecord::<W> {
388 layer_id: layer_id.to_le_word(),
389 name_offset: le_word::<W>(name_offset)?,
390 name_len: le_word::<W>(descriptor.name.as_str().len())?,
391 id_family: le_word::<W>(id_family_tag(descriptor.id_family) as usize)?,
392 role: le_word::<W>(layer_role_tag(descriptor.role) as usize)?,
393 storage: le_word::<W>(storage_tag(descriptor.storage) as usize)?,
394 missing_policy: le_word::<W>(missing_policy_tag(descriptor.storage) as usize)?,
395 logical_len: le_word::<W>(layer.len())?,
396 value_count: le_word::<W>(layer_value_count(layer))?,
397 value_data_offset: le_word::<W>(value_data_offset)?,
398 value_data_len: le_word::<W>(value_data_len)?,
399 default_data_offset: le_word::<W>(default_data_offset)?,
400 default_data_len: le_word::<W>(default_data_len)?,
401 reserved: le_word::<W>(0)?,
402 });
403 Ok(())
404 }
405
406 fn finish(self) -> Result<EncodedPropertySnapshot, PropertyError> {
408 let record_bytes = self
409 .records
410 .len()
411 .checked_mul(core::mem::size_of::<PropertySnapshotRecord<W>>())
412 .ok_or(PropertyError::SnapshotDescriptorMismatch {
413 reason: "record byte length overflow",
414 })?;
415 let header = PropertySnapshotHeader {
416 record_count: U64::new(usize_to_u64(self.records.len())?),
417 record_bytes: U64::new(usize_to_u64(record_bytes)?),
418 };
419 let mut descriptor_bytes = Vec::with_capacity(
420 core::mem::size_of::<PropertySnapshotHeader>() + record_bytes + self.strings.len(),
421 );
422 descriptor_bytes.extend_from_slice(header.as_bytes());
423 descriptor_bytes.extend_from_slice(self.records.as_bytes());
424 descriptor_bytes.extend_from_slice(&self.strings);
425 Ok(EncodedPropertySnapshot {
426 descriptors: descriptor_bytes,
427 data: self.data,
428 })
429 }
430}
431
432pub fn validate_property_snapshot<W>(
444 snapshot: &Snapshot<'_>,
445) -> Result<PropertySnapshotSummary, PropertyError>
446where
447 W: PropertySnapshotMetaWord,
448{
449 let descriptor_section = snapshot.section(W::PROPERTY_DESCRIPTORS_KIND).ok_or(
450 PropertyError::MissingSnapshotSection {
451 kind: W::PROPERTY_DESCRIPTORS_KIND,
452 },
453 )?;
454 let data_section =
455 snapshot
456 .section(W::PROPERTY_DATA_KIND)
457 .ok_or(PropertyError::MissingSnapshotSection {
458 kind: W::PROPERTY_DATA_KIND,
459 })?;
460 if descriptor_section.version() != SNAPSHOT_PROPERTY_VERSION {
461 return Err(PropertyError::SnapshotSectionVersion {
462 kind: W::PROPERTY_DESCRIPTORS_KIND,
463 version: descriptor_section.version(),
464 });
465 }
466 if data_section.version() != SNAPSHOT_PROPERTY_VERSION {
467 return Err(PropertyError::SnapshotSectionVersion {
468 kind: W::PROPERTY_DATA_KIND,
469 version: data_section.version(),
470 });
471 }
472 validate_property_sections::<W>(descriptor_section.bytes(), data_section.bytes())
473}
474
/// Validates raw property descriptor and data section bytes for width `W`.
///
/// Descriptor layout, in order:
/// 1. a 16-byte header of two little-endian `u64` words (record count, then
///    record byte length);
/// 2. `record_count` packed `PropertySnapshotRecord<W>` values;
/// 3. the layer-name string table.
///
/// Each record must have:
/// - valid family, role and storage tags;
/// - an in-bounds UTF-8 name;
/// - a unique layer ID and a unique `(id_family, name)` pair;
/// - well-formed Arrow IPC value/default streams.
///
/// The streams referenced by all records must tile the data section exactly:
/// no gaps, no overlaps, no trailing bytes.
///
/// # Errors
///
/// - `PropertyError::SnapshotDataLength` for truncated descriptor bytes.
/// - `PropertyError::SnapshotDescriptorMismatch` for inconsistent header or
///   record contents.
/// - `PropertyError::DuplicateLayerId` / `PropertyError::DuplicateName` for
///   repeated identifiers.
/// - Any error raised while decoding tags, names or Arrow IPC streams.
pub fn validate_property_sections<W>(
    descriptor_bytes: &[u8],
    data_bytes: &[u8],
) -> Result<PropertySnapshotSummary, PropertyError>
where
    W: PropertySnapshotMetaWord,
{
    // The header struct is two `U64<LE>` words, so `header_len` covers the
    // byte ranges 0..8 and 8..16 read below.
    let header_len = core::mem::size_of::<PropertySnapshotHeader>();
    if descriptor_bytes.len() < header_len {
        return Err(PropertyError::SnapshotDataLength {
            reason: "descriptor header is truncated",
        });
    }
    let record_count = read_u64_le(&descriptor_bytes[0..8])?;
    let record_bytes = read_u64_le(&descriptor_bytes[8..16])?;
    let record_count_usize = u64_to_usize(record_count)?;
    let record_bytes_usize = u64_to_usize(record_bytes)?;
    // The two header words must agree, so the record region has a single
    // unambiguous length.
    let expected_record_bytes = record_count_usize
        .checked_mul(core::mem::size_of::<PropertySnapshotRecord<W>>())
        .ok_or(PropertyError::SnapshotDescriptorMismatch {
            reason: "record byte length overflow",
        })?;
    if record_bytes_usize != expected_record_bytes {
        return Err(PropertyError::SnapshotDescriptorMismatch {
            reason: "record byte length does not match record count",
        });
    }
    let record_start = header_len;
    let string_start = record_start.checked_add(record_bytes_usize).ok_or(
        PropertyError::SnapshotDescriptorMismatch {
            reason: "descriptor section length overflow",
        },
    )?;
    // Checked before the allocations below are sized by `record_count_usize`.
    if descriptor_bytes.len() < string_start {
        return Err(PropertyError::SnapshotDataLength {
            reason: "descriptor records are truncated",
        });
    }
    let record_bytes_slice = &descriptor_bytes[record_start..string_start];
    // Everything after the records is the layer-name string table.
    let string_bytes = &descriptor_bytes[string_start..];
    let mut names: BTreeSet<(IdFamily, &str)> = BTreeSet::new();
    let mut ids: BTreeSet<u64> = BTreeSet::new();
    let mut ranges = Vec::with_capacity(record_count_usize);
    let mut total_logical_values = 0_usize;
    for position in 0..record_count_usize {
        // Cannot overflow: `position < record_count_usize`, and the full
        // product was computed with `checked_mul` above.
        let start = position * core::mem::size_of::<PropertySnapshotRecord<W>>();
        let record = parse_property_record::<W>(&record_bytes_slice[start..])?;
        let id_family = id_family_from_tag(le_word_to_u32::<W>(record.id_family)?)?;
        // Decoded only to reject unknown role tags; the value is not needed.
        let _role = layer_role_from_tag(le_word_to_u32::<W>(record.role)?)?;
        let storage = storage_from_tags(
            le_word_to_u32::<W>(record.storage)?,
            le_word_to_u32::<W>(record.missing_policy)?,
        )?;
        let name = read_snapshot_str(
            string_bytes,
            le_word_to_usize::<W>(record.name_offset)?,
            le_word_to_usize::<W>(record.name_len)?,
        )?;
        let layer_id = le_word_to_u64::<W>(record.layer_id);
        if !ids.insert(layer_id) {
            return Err(PropertyError::DuplicateLayerId { layer_id });
        }
        if !names.insert((id_family, name)) {
            // Converting to an owned `LayerName` can itself fail for an invalid name.
            return Err(PropertyError::DuplicateName {
                id_family,
                name: LayerName::try_new(name)?,
            });
        }
        let layer_ranges = validate_property_record_data::<W>(&record, storage, data_bytes)?;
        ranges.extend(layer_ranges);
        total_logical_values = total_logical_values
            .checked_add(le_word_to_usize::<W>(record.logical_len)?)
            .ok_or(PropertyError::SnapshotDescriptorMismatch {
                reason: "logical value total overflow",
            })?;
    }
    // Every byte of the data section must belong to exactly one stream.
    validate_data_coverage(&mut ranges, data_bytes.len())?;
    Ok(PropertySnapshotSummary {
        layer_count: record_count_usize,
        total_logical_values,
    })
}
566
impl DecodedPropertyLayer {
    /// Decodes every property layer stored in `snapshot` for metadata width `W`.
    ///
    /// Looks up the `W::PROPERTY_DESCRIPTORS_KIND` and `W::PROPERTY_DATA_KIND`
    /// sections and requires both to carry `SNAPSHOT_PROPERTY_VERSION`. The
    /// section bytes are then passed to `DecodedPropertyLayer::decode_sections`.
    ///
    /// # Errors
    ///
    /// - `PropertyError::MissingSnapshotSection` when either section is absent.
    /// - `PropertyError::SnapshotSectionVersion` when either section has an
    ///   unexpected version.
    /// - Any error from `decode_sections`.
    pub fn decode_all<W>(snapshot: &Snapshot<'_>) -> Result<Vec<Self>, PropertyError>
    where
        W: PropertySnapshotMetaWord,
    {
        // Both sections are looked up before either version is checked, so a
        // missing section is reported ahead of a version mismatch.
        let descriptor_section = snapshot.section(W::PROPERTY_DESCRIPTORS_KIND).ok_or(
            PropertyError::MissingSnapshotSection {
                kind: W::PROPERTY_DESCRIPTORS_KIND,
            },
        )?;
        let data_section = snapshot.section(W::PROPERTY_DATA_KIND).ok_or(
            PropertyError::MissingSnapshotSection {
                kind: W::PROPERTY_DATA_KIND,
            },
        )?;
        if descriptor_section.version() != SNAPSHOT_PROPERTY_VERSION {
            return Err(PropertyError::SnapshotSectionVersion {
                kind: W::PROPERTY_DESCRIPTORS_KIND,
                version: descriptor_section.version(),
            });
        }
        if data_section.version() != SNAPSHOT_PROPERTY_VERSION {
            return Err(PropertyError::SnapshotSectionVersion {
                kind: W::PROPERTY_DATA_KIND,
                version: data_section.version(),
            });
        }
        Self::decode_sections::<W>(descriptor_section.bytes(), data_section.bytes())
    }

    /// Decodes property layers from raw descriptor and data section bytes.
    ///
    /// The sections are first fully validated with `validate_property_sections`.
    /// Layers are then returned in descriptor-record order. Each carries its
    /// decoded metadata and the Arrow arrays read from its IPC stream(s).
    ///
    /// # Errors
    ///
    /// Returns any error from `validate_property_sections`, or from re-reading
    /// the header, records, names or IPC streams.
    pub fn decode_sections<W>(
        descriptor_bytes: &[u8],
        data_bytes: &[u8],
    ) -> Result<Vec<Self>, PropertyError>
    where
        W: PropertySnapshotMetaWord,
    {
        // Validation has already checked the header length, the record region,
        // the name bounds, the stream ranges and the column counts. The slicing
        // and `column(..)` calls below therefore stay in bounds.
        // NOTE(review): each IPC stream is decoded twice — once during
        // validation and again here.
        let _summary = validate_property_sections::<W>(descriptor_bytes, data_bytes)?;
        let header_len = core::mem::size_of::<PropertySnapshotHeader>();
        let record_count_usize = u64_to_usize(read_u64_le(&descriptor_bytes[0..8])?)?;
        let record_bytes_usize = u64_to_usize(read_u64_le(&descriptor_bytes[8..16])?)?;
        let record_start = header_len;
        let string_start = record_start.checked_add(record_bytes_usize).ok_or(
            PropertyError::SnapshotDescriptorMismatch {
                reason: "descriptor section length overflow",
            },
        )?;
        let record_bytes_slice = &descriptor_bytes[record_start..string_start];
        let string_bytes = &descriptor_bytes[string_start..];
        let record_size = core::mem::size_of::<PropertySnapshotRecord<W>>();
        let mut out = Vec::with_capacity(record_count_usize);
        for position in 0..record_count_usize {
            let start = position.checked_mul(record_size).ok_or(
                PropertyError::SnapshotDescriptorMismatch {
                    reason: "record offset overflow",
                },
            )?;
            let record = parse_property_record::<W>(&record_bytes_slice[start..])?;
            let layer_id = le_word_to_u64::<W>(record.layer_id);
            let id_family = id_family_from_tag(le_word_to_u32::<W>(record.id_family)?)?;
            let role = layer_role_from_tag(le_word_to_u32::<W>(record.role)?)?;
            let storage = storage_from_tags(
                le_word_to_u32::<W>(record.storage)?,
                le_word_to_u32::<W>(record.missing_policy)?,
            )?;
            let name = read_snapshot_str(
                string_bytes,
                le_word_to_usize::<W>(record.name_offset)?,
                le_word_to_usize::<W>(record.name_len)?,
            )?
            .to_string();
            let logical_len = le_word_to_usize::<W>(record.logical_len)?;
            let value_offset = le_word_to_usize::<W>(record.value_data_offset)?;
            let value_len = le_word_to_usize::<W>(record.value_data_len)?;
            let value_end = checked_end(value_offset, value_len, data_bytes.len())?;
            let value_batch = read_one_ipc_batch(&data_bytes[value_offset..value_end])?;
            let default_offset = le_word_to_usize::<W>(record.default_data_offset)?;
            let default_len = le_word_to_usize::<W>(record.default_data_len)?;
            // A zero default length means the layer has no default stream.
            let default_batch = if default_len == 0 {
                None
            } else {
                let default_end = checked_end(default_offset, default_len, data_bytes.len())?;
                Some(read_one_ipc_batch(
                    &data_bytes[default_offset..default_end],
                )?)
            };
            // Column layout mirrors the encoder:
            // - dense value streams have a single value column;
            // - sparse value streams are (index, value);
            // - default streams have one column.
            let data = match storage {
                StorageMode::Dense => DecodedPropertyData::Dense {
                    values: Arc::clone(value_batch.column(0)),
                },
                StorageMode::Sparse { .. } => DecodedPropertyData::Sparse {
                    indices: Arc::clone(value_batch.column(0)),
                    values: Arc::clone(value_batch.column(1)),
                    default: default_batch
                        .as_ref()
                        .map(|batch| Arc::clone(batch.column(0))),
                },
            };
            out.push(Self {
                layer_id,
                name,
                id_family,
                role,
                storage,
                logical_len,
                data,
            });
        }
        Ok(out)
    }
}
716
717fn validate_identity_records<W>(
723 snapshot: &Snapshot<'_>,
724 records: &[IdentityModeRecord<W>],
725) -> Result<Vec<IdentityModeSummary>, PropertyError>
726where
727 W: PropertySnapshotMetaWord,
728{
729 let mut seen = BTreeSet::new();
730 let mut summaries = Vec::with_capacity(records.len());
731 for record in records {
732 let family = record.id_family()?;
733 if !seen.insert(family) {
734 return Err(PropertyError::SnapshotDescriptorMismatch {
735 reason: "duplicate identity family mode record",
736 });
737 }
738 let mode = record.mode()?;
739 let local_len = record.local_len();
740 match mode {
741 IdentityMapMode::LocalEqualsCanonical => {}
742 IdentityMapMode::ExplicitMap => {
743 validate_identity_map_section::<W>(snapshot, family, local_len)?;
744 }
745 }
746 summaries.push(IdentityModeSummary {
747 id_family: family,
748 mode,
749 local_len,
750 });
751 }
752 Ok(summaries)
753}
754
755fn validate_identity_map_section<W>(
761 snapshot: &Snapshot<'_>,
762 id_family: IdFamily,
763 required: usize,
764) -> Result<(), PropertyError>
765where
766 W: PropertySnapshotMetaWord,
767{
768 let kind = identity_map_kind::<W>(id_family);
769 let section = snapshot
770 .section(kind)
771 .ok_or(PropertyError::MissingIdentityMap { id_family })?;
772 if section.version() != SNAPSHOT_PROPERTY_VERSION {
773 return Err(PropertyError::SnapshotSectionVersion {
774 kind,
775 version: section.version(),
776 });
777 }
778 let map: &[W::LittleEndianWord] = section
779 .try_as_slice()
780 .map_err(|error| PropertyError::SnapshotSectionView { kind, error })?;
781 if map.len() != required {
782 return Err(PropertyError::IdentityMapLength {
783 id_family,
784 required,
785 actual: map.len(),
786 });
787 }
788 Ok(())
789}
790
791const fn identity_map_kind<W>(id_family: IdFamily) -> u32
797where
798 W: PropertySnapshotMetaWord,
799{
800 match id_family {
801 IdFamily::Element => W::ELEMENT_IDENTITY_MAP_KIND,
802 IdFamily::Relation => W::RELATION_IDENTITY_MAP_KIND,
803 IdFamily::Incidence => W::INCIDENCE_IDENTITY_MAP_KIND,
804 }
805}
806
/// Appends the UTF-8 bytes of `value` to the `strings` table.
///
/// Returns the byte offset at which the appended bytes begin.
fn append_string(strings: &mut Vec<u8>, value: &str) -> usize {
    let start = strings.len();
    strings.extend(value.bytes());
    start
}
817
818fn layer_value_count<Id, I>(layer: &PropertyLayer<Id, I>) -> usize
824where
825 I: PropertyIndex,
826{
827 match layer.data() {
828 PropertyLayerData::Dense { values } => values.len(),
829 PropertyLayerData::Sparse { indices, .. } => indices.len(),
830 }
831}
832
833fn encode_layer_value_ipc<Id, I>(layer: &PropertyLayer<Id, I>) -> Result<Vec<u8>, PropertyError>
839where
840 I: PropertyIndex,
841{
842 let (schema, columns) = match layer.data() {
843 PropertyLayerData::Dense { values } => {
844 let schema = Arc::new(Schema::new(vec![layer.descriptor().arrow_field.clone()]));
845 (schema, vec![Arc::clone(values)])
846 }
847 PropertyLayerData::Sparse {
848 indices,
849 values,
850 default: _,
851 } => {
852 let fields = vec![
853 Field::new("index", index_data_type::<I>(), false),
854 layer.descriptor().arrow_field.clone(),
855 ];
856 let columns: Vec<ArrayRef> = vec![Arc::clone(indices) as ArrayRef, Arc::clone(values)];
857 (Arc::new(Schema::new(fields)), columns)
858 }
859 };
860 write_one_ipc_batch(&schema, columns)
861}
862
863fn encode_layer_default_ipc<Id, I>(
869 layer: &PropertyLayer<Id, I>,
870) -> Result<Option<Vec<u8>>, PropertyError>
871where
872 I: PropertyIndex,
873{
874 let PropertyLayerData::Sparse {
875 default: Some(default),
876 ..
877 } = layer.data()
878 else {
879 return Ok(None);
880 };
881 let schema = Arc::new(Schema::new(vec![layer.descriptor().arrow_field.clone()]));
882 write_one_ipc_batch(&schema, vec![Arc::clone(default)]).map(Some)
883}
884
885fn write_one_ipc_batch(
891 schema: &Arc<Schema>,
892 columns: Vec<ArrayRef>,
893) -> Result<Vec<u8>, PropertyError> {
894 let batch = RecordBatch::try_new(Arc::clone(schema), columns).map_err(map_arrow_error)?;
895 let mut out = Vec::new();
896 {
897 let mut writer =
898 StreamWriter::try_new(&mut out, schema.as_ref()).map_err(map_arrow_error)?;
899 writer.write(&batch).map_err(map_arrow_error)?;
900 writer.finish().map_err(map_arrow_error)?;
901 }
902 Ok(out)
903}
904
905fn parse_property_record<W>(bytes: &[u8]) -> Result<PropertySnapshotRecord<W>, PropertyError>
911where
912 W: PropertySnapshotMetaWord,
913{
914 let need = core::mem::size_of::<PropertySnapshotRecord<W>>();
915 if bytes.len() < need {
916 return Err(PropertyError::SnapshotDataLength {
917 reason: "property record is truncated",
918 });
919 }
920 PropertySnapshotRecord::<W>::read_from_bytes(&bytes[..need]).map_err(|_error| {
921 PropertyError::SnapshotDataLength {
922 reason: "property record is truncated",
923 }
924 })
925}
926
927fn validate_property_record_data<W>(
933 record: &PropertySnapshotRecord<W>,
934 storage: StorageMode,
935 data: &[u8],
936) -> Result<Vec<core::ops::Range<usize>>, PropertyError>
937where
938 W: PropertySnapshotMetaWord,
939{
940 if le_word_to_u64::<W>(record.reserved) != 0 {
941 return Err(PropertyError::SnapshotDescriptorMismatch {
942 reason: "property descriptor reserved word must be zero",
943 });
944 }
945 let offset = le_word_to_usize::<W>(record.value_data_offset)?;
946 let len = le_word_to_usize::<W>(record.value_data_len)?;
947 let end = checked_end(offset, len, data.len())?;
948 let value_batch = read_one_ipc_batch(&data[offset..end])?;
949 let default_offset = le_word_to_usize::<W>(record.default_data_offset)?;
950 let default_len = le_word_to_usize::<W>(record.default_data_len)?;
951 let default_batch = if default_len == 0 {
952 None
953 } else {
954 let default_end = checked_end(default_offset, default_len, data.len())?;
955 Some(read_one_ipc_batch(&data[default_offset..default_end])?)
956 };
957 match storage {
958 StorageMode::Dense => {
959 if default_len != 0 {
960 return Err(PropertyError::SnapshotDescriptorMismatch {
961 reason: "dense property must not declare a default stream",
962 });
963 }
964 validate_dense_batch::<W>(record, &value_batch)?;
965 }
966 StorageMode::Sparse { missing } => {
967 validate_sparse_batch::<W>(record, missing, &value_batch, default_batch.as_ref())?;
968 }
969 }
970 let mut ranges = Vec::with_capacity(2);
971 ranges.push(offset..end);
972 if default_len != 0 {
973 ranges.push(default_offset..default_offset + default_len);
974 }
975 Ok(ranges)
976}
977
978fn read_one_ipc_batch(bytes: &[u8]) -> Result<RecordBatch, PropertyError> {
984 let reader = StreamReader::try_new(Cursor::new(bytes), None).map_err(map_arrow_error)?;
985 let mut batches = Vec::new();
986 for batch in reader {
987 batches.push(batch.map_err(map_arrow_error)?);
988 if batches.len() > 1 {
989 return Err(PropertyError::SnapshotDescriptorMismatch {
990 reason: "property IPC stream contains more than one batch",
991 });
992 }
993 }
994 let mut iter = batches.into_iter();
995 iter.next()
996 .ok_or(PropertyError::SnapshotDescriptorMismatch {
997 reason: "property IPC stream contains no batches",
998 })
999}
1000
1001fn validate_dense_batch<W>(
1007 record: &PropertySnapshotRecord<W>,
1008 batch: &RecordBatch,
1009) -> Result<(), PropertyError>
1010where
1011 W: PropertySnapshotMetaWord,
1012{
1013 if batch.num_columns() != 1 {
1014 return Err(PropertyError::SnapshotDescriptorMismatch {
1015 reason: "dense property batch must contain one column",
1016 });
1017 }
1018 let values = batch.column(0);
1019 if values.len() != le_word_to_usize::<W>(record.logical_len)?
1020 || values.len() != le_word_to_usize::<W>(record.value_count)?
1021 {
1022 return Err(PropertyError::SnapshotDataLength {
1023 reason: "dense property Arrow length does not match descriptor",
1024 });
1025 }
1026 validate_value_column(values.as_ref())
1027}
1028
1029fn validate_sparse_batch<W>(
1035 record: &PropertySnapshotRecord<W>,
1036 missing: MissingPolicy,
1037 value_batch: &RecordBatch,
1038 default_batch: Option<&RecordBatch>,
1039) -> Result<(), PropertyError>
1040where
1041 W: PropertySnapshotMetaWord,
1042{
1043 if value_batch.num_columns() != 2 {
1044 return Err(PropertyError::SnapshotDescriptorMismatch {
1045 reason: "sparse property value stream must contain index and value columns",
1046 });
1047 }
1048 let indexes = value_batch.column(0);
1049 let values = value_batch.column(1);
1050 let value_count = le_word_to_usize::<W>(record.value_count)?;
1051 if indexes.len() != value_count || values.len() != value_count {
1052 return Err(PropertyError::SnapshotDataLength {
1053 reason: "sparse property Arrow value count does not match descriptor",
1054 });
1055 }
1056 validate_value_column(values.as_ref())?;
1057 validate_sparse_index_column(indexes.as_ref(), le_word_to_usize::<W>(record.logical_len)?)?;
1058 match (missing, default_batch) {
1059 (MissingPolicy::Null, None) => {}
1060 (MissingPolicy::Null, Some(_)) => {
1061 return Err(PropertyError::SnapshotDescriptorMismatch {
1062 reason: "sparse-null property must not declare a default stream",
1063 });
1064 }
1065 (MissingPolicy::Default, Some(default_batch)) => {
1066 if default_batch.num_columns() != 1 {
1067 return Err(PropertyError::SnapshotDescriptorMismatch {
1068 reason: "sparse default stream must contain one column",
1069 });
1070 }
1071 let default = default_batch.column(0);
1072 if default.len() != 1 || default.data_type() != values.data_type() || default.is_null(0)
1073 {
1074 return Err(PropertyError::SnapshotDescriptorMismatch {
1075 reason: "sparse property default column is not a non-null matching scalar",
1076 });
1077 }
1078 }
1079 (MissingPolicy::Default, None) => {
1080 return Err(PropertyError::SnapshotDescriptorMismatch {
1081 reason: "sparse-default property is missing its default stream",
1082 });
1083 }
1084 }
1085 Ok(())
1086}
1087
1088fn validate_value_column(values: &dyn Array) -> Result<(), PropertyError> {
1094 if values.null_count() > values.len() {
1095 return Err(PropertyError::SnapshotDescriptorMismatch {
1096 reason: "Arrow value column has invalid null accounting",
1097 });
1098 }
1099 Ok(())
1100}
1101
1102fn validate_data_coverage(
1108 ranges: &mut [core::ops::Range<usize>],
1109 data_len: usize,
1110) -> Result<(), PropertyError> {
1111 ranges.sort_by_key(|range| range.start);
1112 let mut cursor = 0_usize;
1113 for range in ranges {
1114 if range.start != cursor {
1115 return Err(PropertyError::SnapshotDescriptorMismatch {
1116 reason: "property data ranges leave a gap or overlap",
1117 });
1118 }
1119 cursor = range.end;
1120 }
1121 if cursor != data_len {
1122 return Err(PropertyError::SnapshotDescriptorMismatch {
1123 reason: "property data section has trailing bytes",
1124 });
1125 }
1126 Ok(())
1127}
1128
1129fn read_snapshot_str(bytes: &[u8], offset: usize, len: usize) -> Result<&str, PropertyError> {
1135 let end = checked_end(offset, len, bytes.len())?;
1136 core::str::from_utf8(&bytes[offset..end])
1137 .map_err(|_error| PropertyError::SnapshotInvalidUtf8 { offset })
1138}
1139
1140fn checked_end(offset: usize, len: usize, available: usize) -> Result<usize, PropertyError> {
1146 let end = offset
1147 .checked_add(len)
1148 .ok_or(PropertyError::SnapshotRangeOutOfBounds {
1149 offset,
1150 len,
1151 available,
1152 })?;
1153 if end > available {
1154 Err(PropertyError::SnapshotRangeOutOfBounds {
1155 offset,
1156 len,
1157 available,
1158 })
1159 } else {
1160 Ok(end)
1161 }
1162}
1163
1164fn read_u64_le(bytes: &[u8]) -> Result<u64, PropertyError> {
1170 if bytes.len() < core::mem::size_of::<u64>() {
1171 return Err(PropertyError::SnapshotDataLength {
1172 reason: "u64 field is truncated",
1173 });
1174 }
1175 let mut array = [0_u8; 8];
1176 array.copy_from_slice(&bytes[..8]);
1177 Ok(u64::from_le_bytes(array))
1178}
1179
1180fn u64_to_usize(value: u64) -> Result<usize, PropertyError> {
1186 usize::try_from(value).map_err(|_error| PropertyError::SnapshotDescriptorMismatch {
1187 reason: "snapshot length does not fit usize",
1188 })
1189}
1190
1191fn usize_to_u64(value: usize) -> Result<u64, PropertyError> {
1197 u64::try_from(value).map_err(|_error| PropertyError::LengthDoesNotFitU64 { value })
1198}
1199
1200fn validate_sparse_index_column(indices: &dyn Array, len: usize) -> Result<(), PropertyError> {
1213 fn typed<I>(indices: &dyn Array, len: usize) -> Result<(), PropertyError>
1215 where
1216 I: PropertyIndex,
1217 {
1218 let typed = indices
1219 .as_any()
1220 .downcast_ref::<PrimitiveArray<I::ArrowType>>()
1221 .ok_or(PropertyError::SnapshotDescriptorMismatch {
1222 reason: "sparse property index column does not match its declared Arrow width",
1223 })?;
1224 validate_sparse_indices::<I>(typed, len)
1225 }
1226
1227 match indices.data_type() {
1228 DataType::UInt16 => typed::<u16>(indices, len),
1229 DataType::UInt32 => typed::<u32>(indices, len),
1230 DataType::UInt64 => typed::<u64>(indices, len),
1231 _ => Err(PropertyError::SnapshotDescriptorMismatch {
1232 reason: "sparse property index column is not UInt16, UInt32, or UInt64",
1233 }),
1234 }
1235}
1236
1237const fn index_data_type<I>() -> DataType
1243where
1244 I: PropertyIndex,
1245{
1246 if core::mem::size_of::<I>() == core::mem::size_of::<u16>() {
1247 DataType::UInt16
1248 } else if core::mem::size_of::<I>() == core::mem::size_of::<u32>() {
1249 DataType::UInt32
1250 } else {
1251 DataType::UInt64
1252 }
1253}