Skip to main content

oxgraph_property/
snapshot.rs

1//! Property and identity snapshot encoding, validation, and decoding.
2//!
3//! Owns the wire record/header types, the [`PropertySnapshotEncoder`], the
4//! `encode_*` / `validate_*` entry points, the [`DecodedPropertyLayer`]
5//! decoder, and the snapshot-only byte/Arrow-IPC helpers.
6
7use std::{
8    collections::BTreeSet,
9    io::Cursor,
10    string::{String, ToString},
11    sync::Arc,
12    vec::Vec,
13};
14
15use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch};
16use arrow_ipc::{reader::StreamReader, writer::StreamWriter};
17use arrow_schema::{DataType, Field, Schema};
18use oxgraph_snapshot::Snapshot;
19use zerocopy::{
20    FromBytes, Immutable, IntoBytes, KnownLayout,
21    byteorder::{LE, U64},
22};
23
24use crate::{
25    model::{
26        IdFamily, IdentityMapMode, IdentityModeRecord, IdentityModeSummary,
27        IdentitySnapshotSummary, LayerName, LayerRole, MissingPolicy, PropertyError, PropertyLayer,
28        PropertyLayerData, StorageMode, id_family_from_tag, id_family_tag, layer_role_from_tag,
29        layer_role_tag, map_arrow_error, missing_policy_tag, storage_from_tags, storage_tag,
30        validate_sparse_indices,
31    },
32    weights::{GraphPropertyLayers, HyperPropertyLayers},
33    width::{
34        PropertyIndex, PropertySnapshotMetaWord, SNAPSHOT_PROPERTY_VERSION, le_word,
35        le_word_to_u32, le_word_to_u64, le_word_to_usize,
36    },
37};
38
39/// Validates identity mode and explicit map sections in a snapshot.
40///
41/// # Errors
42///
43/// Returns [`PropertyError`] if mode records are malformed, duplicated, or if
44/// an explicit map is missing or length-inconsistent.
45///
46/// # Performance
47///
48/// This function is `O(s + f)` for snapshot section count `s` and identity
49/// family count `f`.
50pub fn validate_identity_snapshot<W>(
51    snapshot: &Snapshot<'_>,
52) -> Result<IdentitySnapshotSummary, PropertyError>
53where
54    W: PropertySnapshotMetaWord,
55{
56    let section =
57        snapshot
58            .section(W::IDENTITY_MODES_KIND)
59            .ok_or(PropertyError::MissingSnapshotSection {
60                kind: W::IDENTITY_MODES_KIND,
61            })?;
62    if section.version() != SNAPSHOT_PROPERTY_VERSION {
63        return Err(PropertyError::SnapshotSectionVersion {
64            kind: W::IDENTITY_MODES_KIND,
65            version: section.version(),
66        });
67    }
68    let records: &[IdentityModeRecord<W>] =
69        section
70            .try_as_slice()
71            .map_err(|error| PropertyError::SnapshotSectionView {
72                kind: W::IDENTITY_MODES_KIND,
73                error,
74            })?;
75    let records = validate_identity_records::<W>(snapshot, records)?;
76    Ok(IdentitySnapshotSummary { records })
77}
78
/// Output of property snapshot encoding: the descriptor payload and the Arrow
/// IPC data payload it references.
///
/// # Performance
///
/// Cloning copies both buffers: `O(descriptor bytes + data bytes)`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct EncodedPropertySnapshot {
    /// Bytes to store under the selected property descriptor section kind.
    pub descriptors: Vec<u8>,
    /// Bytes to store under the selected property data section kind.
    pub data: Vec<u8>,
}
92
/// Counts reported by a successful property snapshot validation.
///
/// # Performance
///
/// Cloning is `O(layer_count)`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct PropertySnapshotSummary {
    /// How many property layers were validated.
    pub layer_count: usize,
    /// Sum of the logical lengths of all validated layers.
    pub total_logical_values: usize,
}
106
107/// Arrow payload of a property layer decoded from snapshot bytes.
108///
109/// Dense layers expose a single value array indexed by logical position. Sparse
110/// layers expose the explicit `(indices, values)` pair plus an optional
111/// non-null default array; the index array's [`arrow_schema::DataType`] matches
112/// the encoded sparse index width.
113///
114/// # Performance
115///
116/// Cloning is `O(1)` (each variant holds [`ArrayRef`] handles).
117#[derive(Clone, Debug)]
118#[must_use]
119#[non_exhaustive]
120pub enum DecodedPropertyData {
121    /// Dense Arrow values; `values.len()` equals the descriptor's logical length.
122    Dense {
123        /// Decoded value Arrow array.
124        values: ArrayRef,
125    },
126    /// Sparse Arrow values plus optional default scalar.
127    Sparse {
128        /// Sparse index Arrow array; `indices.len() == values.len()`.
129        indices: ArrayRef,
130        /// Sparse value Arrow array; `values.len() == indices.len()`.
131        values: ArrayRef,
132        /// Length-one non-null default array when [`MissingPolicy::Default`] is in effect.
133        default: Option<ArrayRef>,
134    },
135}
136
137/// One property layer decoded from snapshot bytes.
138///
139/// Returned by [`DecodedPropertyLayer::decode_all`] and
140/// [`DecodedPropertyLayer::decode_sections`].
141/// Field types mirror the descriptor record without exposing the wire word
142/// width, so callers can introspect the layer without referencing
143/// [`PropertySnapshotMetaWord`] directly.
144///
145/// # Performance
146///
147/// Cloning is `O(name bytes)` (the Arrow payload clones in `O(1)`).
148#[derive(Clone, Debug)]
149#[must_use]
150pub struct DecodedPropertyLayer {
151    /// Stable layer ID as decoded from the descriptor record.
152    pub layer_id: u64,
153    /// Layer name decoded from the descriptor string table.
154    pub name: String,
155    /// ID family the layer is keyed by.
156    pub id_family: IdFamily,
157    /// Layer role tag.
158    pub role: LayerRole,
159    /// Storage mode (carrying the sparse missing policy when applicable).
160    pub storage: StorageMode,
161    /// Logical layer length declared by the descriptor record.
162    pub logical_len: usize,
163    /// Arrow payload decoded from the layer's IPC value (and optional default) stream.
164    pub data: DecodedPropertyData,
165}
166
167/// Wire header for the property descriptor section.
168#[derive(Clone, Copy, Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
169#[repr(C)]
170pub(crate) struct PropertySnapshotHeader {
171    /// Number of descriptor records.
172    record_count: U64<LE>,
173    /// Byte length occupied by descriptor records after this header.
174    record_bytes: U64<LE>,
175}
176
177/// Wire descriptor record for one property layer.
178#[derive(Clone, Copy, Debug, Eq, FromBytes, Immutable, IntoBytes, KnownLayout, PartialEq)]
179#[repr(C)]
180pub struct PropertySnapshotRecord<W>
181where
182    W: PropertySnapshotMetaWord,
183{
184    /// Stable layer ID.
185    layer_id: W::LittleEndianWord,
186    /// Offset of layer name in descriptor string table.
187    name_offset: W::LittleEndianWord,
188    /// Length of layer name in descriptor string table.
189    name_len: W::LittleEndianWord,
190    /// ID-family tag.
191    id_family: W::LittleEndianWord,
192    /// Layer-role tag.
193    role: W::LittleEndianWord,
194    /// Storage tag.
195    storage: W::LittleEndianWord,
196    /// Missing-policy tag.
197    missing_policy: W::LittleEndianWord,
198    /// Logical layer length.
199    logical_len: W::LittleEndianWord,
200    /// Explicit sparse value count, or dense value count.
201    value_count: W::LittleEndianWord,
202    /// Offset of the value Arrow IPC stream in the property data section.
203    value_data_offset: W::LittleEndianWord,
204    /// Byte length of the value Arrow IPC stream in the property data section.
205    value_data_len: W::LittleEndianWord,
206    /// Offset of the sparse-default Arrow IPC stream in the property data section.
207    default_data_offset: W::LittleEndianWord,
208    /// Byte length of the sparse-default Arrow IPC stream in the property data section.
209    default_data_len: W::LittleEndianWord,
210    /// Reserved for future descriptor flags.
211    reserved: W::LittleEndianWord,
212}
213
214/// Encodes property descriptor and Arrow IPC data sections.
215///
216/// # Errors
217///
218/// Returns [`PropertyError`] for duplicate layer IDs/names or inconsistent
219/// descriptor/storage combinations.
220///
221/// # Performance
222///
223/// This function is `O(l + total values + total name bytes)` for `l` layers.
224pub fn encode_property_snapshot<W, Id, I>(
225    layers: &[PropertyLayer<Id, I>],
226) -> Result<EncodedPropertySnapshot, PropertyError>
227where
228    W: PropertySnapshotMetaWord,
229    Id: Copy + Into<u64> + TryInto<W>,
230    I: PropertyIndex,
231{
232    let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(layers.len());
233    for layer in layers {
234        encoder.append::<Id, I>(layer)?;
235    }
236    encoder.finish()
237}
238
239/// Encodes graph property layers into descriptor/data payloads.
240///
241/// # Errors
242///
243/// Returns [`PropertyError`] if any layer metadata or Arrow payload is invalid.
244///
245/// # Performance
246///
247/// This function is `O(l + total values + total name bytes)`.
248pub fn encode_graph_property_snapshot<W, Id, NodeIndex, EdgeIndex>(
249    layers: GraphPropertyLayers<'_, Id, NodeIndex, EdgeIndex>,
250) -> Result<EncodedPropertySnapshot, PropertyError>
251where
252    W: PropertySnapshotMetaWord,
253    Id: Copy + Into<u64> + TryInto<W>,
254    NodeIndex: PropertyIndex,
255    EdgeIndex: PropertyIndex,
256{
257    let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(
258        layers.element.len().saturating_add(layers.relation.len()),
259    );
260    for layer in layers.element {
261        encoder.append::<Id, NodeIndex>(layer)?;
262    }
263    for layer in layers.relation {
264        encoder.append::<Id, EdgeIndex>(layer)?;
265    }
266    encoder.finish()
267}
268
269/// Encodes hypergraph property layers into descriptor/data payloads.
270///
271/// # Errors
272///
273/// Returns [`PropertyError`] if any layer metadata or Arrow payload is invalid.
274///
275/// # Performance
276///
277/// This function is `O(l + total values + total name bytes)`.
278pub fn encode_hyper_property_snapshot<W, Id, VertexIndex, RelationIndex, IncidenceIndex>(
279    layers: HyperPropertyLayers<'_, Id, VertexIndex, RelationIndex, IncidenceIndex>,
280) -> Result<EncodedPropertySnapshot, PropertyError>
281where
282    W: PropertySnapshotMetaWord,
283    Id: Copy + Into<u64> + TryInto<W>,
284    VertexIndex: PropertyIndex,
285    RelationIndex: PropertyIndex,
286    IncidenceIndex: PropertyIndex,
287{
288    let mut encoder = PropertySnapshotEncoder::<W>::with_capacity(
289        layers
290            .element
291            .len()
292            .saturating_add(layers.relation.len())
293            .saturating_add(layers.incidence.len()),
294    );
295    for layer in layers.element {
296        encoder.append::<Id, VertexIndex>(layer)?;
297    }
298    for layer in layers.relation {
299        encoder.append::<Id, RelationIndex>(layer)?;
300    }
301    for layer in layers.incidence {
302        encoder.append::<Id, IncidenceIndex>(layer)?;
303    }
304    encoder.finish()
305}
306
307/// Mutable accumulator for one in-progress property snapshot encoding pass.
308///
309/// Owns the descriptor record table, string table, and data payload between
310/// calls to [`PropertySnapshotEncoder::append`], and finalizes them into an
311/// [`EncodedPropertySnapshot`] via [`PropertySnapshotEncoder::finish`].
312///
313/// # Performance
314///
315/// Construction is `O(1)`. `append` is `O(layer values + layer name length)`.
316/// `finish` is `O(record bytes + string bytes)`.
317struct PropertySnapshotEncoder<W>
318where
319    W: PropertySnapshotMetaWord,
320{
321    /// Concatenated Arrow IPC value/default payload bytes referenced by records.
322    data: Vec<u8>,
323    /// Concatenated layer name bytes referenced by descriptor records.
324    strings: Vec<u8>,
325    /// In-order descriptor records emitted during the encoding pass.
326    records: Vec<PropertySnapshotRecord<W>>,
327    /// Layer names seen so far, scoped by ID family, used to reject duplicates.
328    names: BTreeSet<(IdFamily, LayerName)>,
329    /// Layer IDs (as `u64`) seen so far, used to reject duplicates.
330    ids: BTreeSet<u64>,
331}
332
333impl<W> PropertySnapshotEncoder<W>
334where
335    W: PropertySnapshotMetaWord,
336{
337    /// Constructs an encoder with descriptor capacity hint `capacity`.
338    fn with_capacity(capacity: usize) -> Self {
339        Self {
340            data: Vec::new(),
341            strings: Vec::new(),
342            records: Vec::with_capacity(capacity),
343            names: BTreeSet::new(),
344            ids: BTreeSet::new(),
345        }
346    }
347
348    /// Encodes `layer` into a descriptor record and appends its Arrow payloads.
349    fn append<Id, I>(&mut self, layer: &PropertyLayer<Id, I>) -> Result<(), PropertyError>
350    where
351        Id: Copy + Into<u64> + TryInto<W>,
352        I: PropertyIndex,
353    {
354        let descriptor = layer.descriptor();
355        if !self
356            .names
357            .insert((descriptor.id_family, descriptor.name.clone()))
358        {
359            return Err(PropertyError::DuplicateName {
360                id_family: descriptor.id_family,
361                name: descriptor.name.clone(),
362            });
363        }
364        let diagnostic_layer_id = descriptor.layer_id.0.into();
365        if !self.ids.insert(diagnostic_layer_id) {
366            return Err(PropertyError::DuplicateLayerId {
367                layer_id: diagnostic_layer_id,
368            });
369        }
370        let name_offset = append_string(&mut self.strings, descriptor.name.as_str());
371        let value_data_offset = self.data.len();
372        let layer_data = encode_layer_value_ipc(layer)?;
373        let value_data_len = layer_data.len();
374        self.data.extend_from_slice(&layer_data);
375        let (default_data_offset, default_data_len) =
376            encode_layer_default_ipc(layer)?.map_or((0, 0), |default_data| {
377                let offset = self.data.len();
378                let len = default_data.len();
379                self.data.extend_from_slice(&default_data);
380                (offset, len)
381            });
382        let layer_id = descriptor.layer_id.0.try_into().map_err(|_error| {
383            PropertyError::SnapshotDescriptorMismatch {
384                reason: "layer ID does not fit selected metadata width",
385            }
386        })?;
387        self.records.push(PropertySnapshotRecord::<W> {
388            layer_id: layer_id.to_le_word(),
389            name_offset: le_word::<W>(name_offset)?,
390            name_len: le_word::<W>(descriptor.name.as_str().len())?,
391            id_family: le_word::<W>(id_family_tag(descriptor.id_family) as usize)?,
392            role: le_word::<W>(layer_role_tag(descriptor.role) as usize)?,
393            storage: le_word::<W>(storage_tag(descriptor.storage) as usize)?,
394            missing_policy: le_word::<W>(missing_policy_tag(descriptor.storage) as usize)?,
395            logical_len: le_word::<W>(layer.len())?,
396            value_count: le_word::<W>(layer_value_count(layer))?,
397            value_data_offset: le_word::<W>(value_data_offset)?,
398            value_data_len: le_word::<W>(value_data_len)?,
399            default_data_offset: le_word::<W>(default_data_offset)?,
400            default_data_len: le_word::<W>(default_data_len)?,
401            reserved: le_word::<W>(0)?,
402        });
403        Ok(())
404    }
405
406    /// Finalizes descriptor/data bytes after all records have been appended.
407    fn finish(self) -> Result<EncodedPropertySnapshot, PropertyError> {
408        let record_bytes = self
409            .records
410            .len()
411            .checked_mul(core::mem::size_of::<PropertySnapshotRecord<W>>())
412            .ok_or(PropertyError::SnapshotDescriptorMismatch {
413                reason: "record byte length overflow",
414            })?;
415        let header = PropertySnapshotHeader {
416            record_count: U64::new(usize_to_u64(self.records.len())?),
417            record_bytes: U64::new(usize_to_u64(record_bytes)?),
418        };
419        let mut descriptor_bytes = Vec::with_capacity(
420            core::mem::size_of::<PropertySnapshotHeader>() + record_bytes + self.strings.len(),
421        );
422        descriptor_bytes.extend_from_slice(header.as_bytes());
423        descriptor_bytes.extend_from_slice(self.records.as_bytes());
424        descriptor_bytes.extend_from_slice(&self.strings);
425        Ok(EncodedPropertySnapshot {
426            descriptors: descriptor_bytes,
427            data: self.data,
428        })
429    }
430}
431
432/// Validates property descriptor/data sections in a snapshot.
433///
434/// # Errors
435///
436/// Returns [`PropertyError`] if required sections are missing, have unsupported
437/// versions, or contain inconsistent descriptor/data records.
438///
439/// # Performance
440///
441/// This function is `O(s + l log l + total name bytes)` for snapshot section
442/// count `s` and property layer count `l`.
443pub fn validate_property_snapshot<W>(
444    snapshot: &Snapshot<'_>,
445) -> Result<PropertySnapshotSummary, PropertyError>
446where
447    W: PropertySnapshotMetaWord,
448{
449    let descriptor_section = snapshot.section(W::PROPERTY_DESCRIPTORS_KIND).ok_or(
450        PropertyError::MissingSnapshotSection {
451            kind: W::PROPERTY_DESCRIPTORS_KIND,
452        },
453    )?;
454    let data_section =
455        snapshot
456            .section(W::PROPERTY_DATA_KIND)
457            .ok_or(PropertyError::MissingSnapshotSection {
458                kind: W::PROPERTY_DATA_KIND,
459            })?;
460    if descriptor_section.version() != SNAPSHOT_PROPERTY_VERSION {
461        return Err(PropertyError::SnapshotSectionVersion {
462            kind: W::PROPERTY_DESCRIPTORS_KIND,
463            version: descriptor_section.version(),
464        });
465    }
466    if data_section.version() != SNAPSHOT_PROPERTY_VERSION {
467        return Err(PropertyError::SnapshotSectionVersion {
468            kind: W::PROPERTY_DATA_KIND,
469            version: data_section.version(),
470        });
471    }
472    validate_property_sections::<W>(descriptor_section.bytes(), data_section.bytes())
473}
474
475/// Validates raw property descriptor and data section payloads.
476///
477/// # Errors
478///
479/// Returns [`PropertyError`] if the encoded payloads are structurally invalid.
480///
481/// # Performance
482///
483/// This function is `O(l log l + total name bytes + Arrow IPC validation)`.
484pub fn validate_property_sections<W>(
485    descriptor_bytes: &[u8],
486    data_bytes: &[u8],
487) -> Result<PropertySnapshotSummary, PropertyError>
488where
489    W: PropertySnapshotMetaWord,
490{
491    let header_len = core::mem::size_of::<PropertySnapshotHeader>();
492    if descriptor_bytes.len() < header_len {
493        return Err(PropertyError::SnapshotDataLength {
494            reason: "descriptor header is truncated",
495        });
496    }
497    let record_count = read_u64_le(&descriptor_bytes[0..8])?;
498    let record_bytes = read_u64_le(&descriptor_bytes[8..16])?;
499    let record_count_usize = u64_to_usize(record_count)?;
500    let record_bytes_usize = u64_to_usize(record_bytes)?;
501    let expected_record_bytes = record_count_usize
502        .checked_mul(core::mem::size_of::<PropertySnapshotRecord<W>>())
503        .ok_or(PropertyError::SnapshotDescriptorMismatch {
504            reason: "record byte length overflow",
505        })?;
506    if record_bytes_usize != expected_record_bytes {
507        return Err(PropertyError::SnapshotDescriptorMismatch {
508            reason: "record byte length does not match record count",
509        });
510    }
511    let record_start = header_len;
512    let string_start = record_start.checked_add(record_bytes_usize).ok_or(
513        PropertyError::SnapshotDescriptorMismatch {
514            reason: "descriptor section length overflow",
515        },
516    )?;
517    if descriptor_bytes.len() < string_start {
518        return Err(PropertyError::SnapshotDataLength {
519            reason: "descriptor records are truncated",
520        });
521    }
522    let record_bytes_slice = &descriptor_bytes[record_start..string_start];
523    let string_bytes = &descriptor_bytes[string_start..];
524    let mut names: BTreeSet<(IdFamily, &str)> = BTreeSet::new();
525    let mut ids: BTreeSet<u64> = BTreeSet::new();
526    let mut ranges = Vec::with_capacity(record_count_usize);
527    let mut total_logical_values = 0_usize;
528    for position in 0..record_count_usize {
529        let start = position * core::mem::size_of::<PropertySnapshotRecord<W>>();
530        let record = parse_property_record::<W>(&record_bytes_slice[start..])?;
531        let id_family = id_family_from_tag(le_word_to_u32::<W>(record.id_family)?)?;
532        let _role = layer_role_from_tag(le_word_to_u32::<W>(record.role)?)?;
533        let storage = storage_from_tags(
534            le_word_to_u32::<W>(record.storage)?,
535            le_word_to_u32::<W>(record.missing_policy)?,
536        )?;
537        let name = read_snapshot_str(
538            string_bytes,
539            le_word_to_usize::<W>(record.name_offset)?,
540            le_word_to_usize::<W>(record.name_len)?,
541        )?;
542        let layer_id = le_word_to_u64::<W>(record.layer_id);
543        if !ids.insert(layer_id) {
544            return Err(PropertyError::DuplicateLayerId { layer_id });
545        }
546        if !names.insert((id_family, name)) {
547            return Err(PropertyError::DuplicateName {
548                id_family,
549                name: LayerName::try_new(name)?,
550            });
551        }
552        let layer_ranges = validate_property_record_data::<W>(&record, storage, data_bytes)?;
553        ranges.extend(layer_ranges);
554        total_logical_values = total_logical_values
555            .checked_add(le_word_to_usize::<W>(record.logical_len)?)
556            .ok_or(PropertyError::SnapshotDescriptorMismatch {
557                reason: "logical value total overflow",
558            })?;
559    }
560    validate_data_coverage(&mut ranges, data_bytes.len())?;
561    Ok(PropertySnapshotSummary {
562        layer_count: record_count_usize,
563        total_logical_values,
564    })
565}
566
567impl DecodedPropertyLayer {
568    /// Decodes every property layer carried by a snapshot.
569    ///
570    /// Mirrors [`BcsrSnapshotHypergraph::from_snapshot`] on the topology side:
571    /// a single constructor on the decoded type that takes the wire snapshot
572    /// and returns the materialized form. Each layer is returned in descriptor
573    /// order with its Arrow payload restored via
574    /// [`arrow_ipc::reader::StreamReader`].
575    ///
576    /// Calls [`validate_property_snapshot`] before decoding so the diagnostics
577    /// match the validator exactly.
578    ///
579    /// [`BcsrSnapshotHypergraph::from_snapshot`]: https://docs.rs/oxgraph-hyper-bcsr/latest/oxgraph_hyper_bcsr/struct.BcsrSnapshotHypergraph.html#method.from_snapshot
580    ///
581    /// # Errors
582    ///
583    /// Returns [`PropertyError`] if required sections are missing, have an
584    /// unsupported version, or contain inconsistent descriptor/data records.
585    ///
586    /// # Performance
587    ///
588    /// `O(s + l + total Arrow IPC payload bytes)` for snapshot section count
589    /// `s` and property layer count `l`.
590    pub fn decode_all<W>(snapshot: &Snapshot<'_>) -> Result<Vec<Self>, PropertyError>
591    where
592        W: PropertySnapshotMetaWord,
593    {
594        let descriptor_section = snapshot.section(W::PROPERTY_DESCRIPTORS_KIND).ok_or(
595            PropertyError::MissingSnapshotSection {
596                kind: W::PROPERTY_DESCRIPTORS_KIND,
597            },
598        )?;
599        let data_section = snapshot.section(W::PROPERTY_DATA_KIND).ok_or(
600            PropertyError::MissingSnapshotSection {
601                kind: W::PROPERTY_DATA_KIND,
602            },
603        )?;
604        if descriptor_section.version() != SNAPSHOT_PROPERTY_VERSION {
605            return Err(PropertyError::SnapshotSectionVersion {
606                kind: W::PROPERTY_DESCRIPTORS_KIND,
607                version: descriptor_section.version(),
608            });
609        }
610        if data_section.version() != SNAPSHOT_PROPERTY_VERSION {
611            return Err(PropertyError::SnapshotSectionVersion {
612                kind: W::PROPERTY_DATA_KIND,
613                version: data_section.version(),
614            });
615        }
616        Self::decode_sections::<W>(descriptor_section.bytes(), data_section.bytes())
617    }
618
619    /// Decodes property layers from raw descriptor and data section payloads.
620    ///
621    /// Lower-level entry point for callers that already have the two section
622    /// byte slices in hand (e.g. when reassembling property data from a custom
623    /// container). Re-runs [`validate_property_sections`] so structural errors
624    /// surface with identical diagnostics.
625    ///
626    /// # Errors
627    ///
628    /// Returns [`PropertyError`] if the encoded payloads are structurally
629    /// invalid.
630    ///
631    /// # Performance
632    ///
633    /// `O(l + total Arrow IPC payload bytes + total name bytes)` for layer
634    /// count `l`.
635    pub fn decode_sections<W>(
636        descriptor_bytes: &[u8],
637        data_bytes: &[u8],
638    ) -> Result<Vec<Self>, PropertyError>
639    where
640        W: PropertySnapshotMetaWord,
641    {
642        let _summary = validate_property_sections::<W>(descriptor_bytes, data_bytes)?;
643        let header_len = core::mem::size_of::<PropertySnapshotHeader>();
644        let record_count_usize = u64_to_usize(read_u64_le(&descriptor_bytes[0..8])?)?;
645        let record_bytes_usize = u64_to_usize(read_u64_le(&descriptor_bytes[8..16])?)?;
646        let record_start = header_len;
647        let string_start = record_start.checked_add(record_bytes_usize).ok_or(
648            PropertyError::SnapshotDescriptorMismatch {
649                reason: "descriptor section length overflow",
650            },
651        )?;
652        let record_bytes_slice = &descriptor_bytes[record_start..string_start];
653        let string_bytes = &descriptor_bytes[string_start..];
654        let record_size = core::mem::size_of::<PropertySnapshotRecord<W>>();
655        let mut out = Vec::with_capacity(record_count_usize);
656        for position in 0..record_count_usize {
657            let start = position.checked_mul(record_size).ok_or(
658                PropertyError::SnapshotDescriptorMismatch {
659                    reason: "record offset overflow",
660                },
661            )?;
662            let record = parse_property_record::<W>(&record_bytes_slice[start..])?;
663            let layer_id = le_word_to_u64::<W>(record.layer_id);
664            let id_family = id_family_from_tag(le_word_to_u32::<W>(record.id_family)?)?;
665            let role = layer_role_from_tag(le_word_to_u32::<W>(record.role)?)?;
666            let storage = storage_from_tags(
667                le_word_to_u32::<W>(record.storage)?,
668                le_word_to_u32::<W>(record.missing_policy)?,
669            )?;
670            let name = read_snapshot_str(
671                string_bytes,
672                le_word_to_usize::<W>(record.name_offset)?,
673                le_word_to_usize::<W>(record.name_len)?,
674            )?
675            .to_string();
676            let logical_len = le_word_to_usize::<W>(record.logical_len)?;
677            let value_offset = le_word_to_usize::<W>(record.value_data_offset)?;
678            let value_len = le_word_to_usize::<W>(record.value_data_len)?;
679            let value_end = checked_end(value_offset, value_len, data_bytes.len())?;
680            let value_batch = read_one_ipc_batch(&data_bytes[value_offset..value_end])?;
681            let default_offset = le_word_to_usize::<W>(record.default_data_offset)?;
682            let default_len = le_word_to_usize::<W>(record.default_data_len)?;
683            let default_batch = if default_len == 0 {
684                None
685            } else {
686                let default_end = checked_end(default_offset, default_len, data_bytes.len())?;
687                Some(read_one_ipc_batch(
688                    &data_bytes[default_offset..default_end],
689                )?)
690            };
691            let data = match storage {
692                StorageMode::Dense => DecodedPropertyData::Dense {
693                    values: Arc::clone(value_batch.column(0)),
694                },
695                StorageMode::Sparse { .. } => DecodedPropertyData::Sparse {
696                    indices: Arc::clone(value_batch.column(0)),
697                    values: Arc::clone(value_batch.column(1)),
698                    default: default_batch
699                        .as_ref()
700                        .map(|batch| Arc::clone(batch.column(0))),
701                },
702            };
703            out.push(Self {
704                layer_id,
705                name,
706                id_family,
707                role,
708                storage,
709                logical_len,
710                data,
711            });
712        }
713        Ok(out)
714    }
715}
716
717/// Validates identity records and required map sections.
718///
719/// # Performance
720///
721/// This function is `O(f)` for `f` records.
722fn validate_identity_records<W>(
723    snapshot: &Snapshot<'_>,
724    records: &[IdentityModeRecord<W>],
725) -> Result<Vec<IdentityModeSummary>, PropertyError>
726where
727    W: PropertySnapshotMetaWord,
728{
729    let mut seen = BTreeSet::new();
730    let mut summaries = Vec::with_capacity(records.len());
731    for record in records {
732        let family = record.id_family()?;
733        if !seen.insert(family) {
734            return Err(PropertyError::SnapshotDescriptorMismatch {
735                reason: "duplicate identity family mode record",
736            });
737        }
738        let mode = record.mode()?;
739        let local_len = record.local_len();
740        match mode {
741            IdentityMapMode::LocalEqualsCanonical => {}
742            IdentityMapMode::ExplicitMap => {
743                validate_identity_map_section::<W>(snapshot, family, local_len)?;
744            }
745        }
746        summaries.push(IdentityModeSummary {
747            id_family: family,
748            mode,
749            local_len,
750        });
751    }
752    Ok(summaries)
753}
754
755/// Validates one explicit identity-map section.
756///
757/// # Performance
758///
759/// This function is `O(s)` for snapshot section count `s`.
760fn validate_identity_map_section<W>(
761    snapshot: &Snapshot<'_>,
762    id_family: IdFamily,
763    required: usize,
764) -> Result<(), PropertyError>
765where
766    W: PropertySnapshotMetaWord,
767{
768    let kind = identity_map_kind::<W>(id_family);
769    let section = snapshot
770        .section(kind)
771        .ok_or(PropertyError::MissingIdentityMap { id_family })?;
772    if section.version() != SNAPSHOT_PROPERTY_VERSION {
773        return Err(PropertyError::SnapshotSectionVersion {
774            kind,
775            version: section.version(),
776        });
777    }
778    let map: &[W::LittleEndianWord] = section
779        .try_as_slice()
780        .map_err(|error| PropertyError::SnapshotSectionView { kind, error })?;
781    if map.len() != required {
782        return Err(PropertyError::IdentityMapLength {
783            id_family,
784            required,
785            actual: map.len(),
786        });
787    }
788    Ok(())
789}
790
791/// Returns the explicit identity-map section kind for a family.
792///
793/// # Performance
794///
795/// This function is `O(1)`.
796const fn identity_map_kind<W>(id_family: IdFamily) -> u32
797where
798    W: PropertySnapshotMetaWord,
799{
800    match id_family {
801        IdFamily::Element => W::ELEMENT_IDENTITY_MAP_KIND,
802        IdFamily::Relation => W::RELATION_IDENTITY_MAP_KIND,
803        IdFamily::Incidence => W::INCIDENCE_IDENTITY_MAP_KIND,
804    }
805}
806
807/// Appends a string to a snapshot string table.
808///
809/// # Performance
810///
811/// This function is `O(value.len())`.
812fn append_string(strings: &mut Vec<u8>, value: &str) -> usize {
813    let offset = strings.len();
814    strings.extend_from_slice(value.as_bytes());
815    offset
816}
817
818/// Returns the number of value slots encoded for a layer.
819///
820/// # Performance
821///
822/// This function is `O(1)`.
823fn layer_value_count<Id, I>(layer: &PropertyLayer<Id, I>) -> usize
824where
825    I: PropertyIndex,
826{
827    match layer.data() {
828        PropertyLayerData::Dense { values } => values.len(),
829        PropertyLayerData::Sparse { indices, .. } => indices.len(),
830    }
831}
832
833/// Encodes one property layer's value stream as Arrow IPC.
834///
835/// # Performance
836///
837/// This function is `O(layer payload bytes)`.
838fn encode_layer_value_ipc<Id, I>(layer: &PropertyLayer<Id, I>) -> Result<Vec<u8>, PropertyError>
839where
840    I: PropertyIndex,
841{
842    let (schema, columns) = match layer.data() {
843        PropertyLayerData::Dense { values } => {
844            let schema = Arc::new(Schema::new(vec![layer.descriptor().arrow_field.clone()]));
845            (schema, vec![Arc::clone(values)])
846        }
847        PropertyLayerData::Sparse {
848            indices,
849            values,
850            default: _,
851        } => {
852            let fields = vec![
853                Field::new("index", index_data_type::<I>(), false),
854                layer.descriptor().arrow_field.clone(),
855            ];
856            let columns: Vec<ArrayRef> = vec![Arc::clone(indices) as ArrayRef, Arc::clone(values)];
857            (Arc::new(Schema::new(fields)), columns)
858        }
859    };
860    write_one_ipc_batch(&schema, columns)
861}
862
863/// Encodes one sparse-default layer's default stream as Arrow IPC.
864///
865/// # Performance
866///
867/// This function is `O(default payload bytes)`.
868fn encode_layer_default_ipc<Id, I>(
869    layer: &PropertyLayer<Id, I>,
870) -> Result<Option<Vec<u8>>, PropertyError>
871where
872    I: PropertyIndex,
873{
874    let PropertyLayerData::Sparse {
875        default: Some(default),
876        ..
877    } = layer.data()
878    else {
879        return Ok(None);
880    };
881    let schema = Arc::new(Schema::new(vec![layer.descriptor().arrow_field.clone()]));
882    write_one_ipc_batch(&schema, vec![Arc::clone(default)]).map(Some)
883}
884
885/// Writes one Arrow IPC stream with a single record batch.
886///
887/// # Performance
888///
889/// This function is `O(payload bytes)`.
890fn write_one_ipc_batch(
891    schema: &Arc<Schema>,
892    columns: Vec<ArrayRef>,
893) -> Result<Vec<u8>, PropertyError> {
894    let batch = RecordBatch::try_new(Arc::clone(schema), columns).map_err(map_arrow_error)?;
895    let mut out = Vec::new();
896    {
897        let mut writer =
898            StreamWriter::try_new(&mut out, schema.as_ref()).map_err(map_arrow_error)?;
899        writer.write(&batch).map_err(map_arrow_error)?;
900        writer.finish().map_err(map_arrow_error)?;
901    }
902    Ok(out)
903}
904
905/// Parses one property snapshot record from the front of `bytes`.
906///
907/// # Performance
908///
909/// This function is `O(1)`.
910fn parse_property_record<W>(bytes: &[u8]) -> Result<PropertySnapshotRecord<W>, PropertyError>
911where
912    W: PropertySnapshotMetaWord,
913{
914    let need = core::mem::size_of::<PropertySnapshotRecord<W>>();
915    if bytes.len() < need {
916        return Err(PropertyError::SnapshotDataLength {
917            reason: "property record is truncated",
918        });
919    }
920    PropertySnapshotRecord::<W>::read_from_bytes(&bytes[..need]).map_err(|_error| {
921        PropertyError::SnapshotDataLength {
922            reason: "property record is truncated",
923        }
924    })
925}
926
927/// Validates a property data range declared by one record.
928///
929/// # Performance
930///
931/// This function is `O(Arrow IPC payload validation)`.
932fn validate_property_record_data<W>(
933    record: &PropertySnapshotRecord<W>,
934    storage: StorageMode,
935    data: &[u8],
936) -> Result<Vec<core::ops::Range<usize>>, PropertyError>
937where
938    W: PropertySnapshotMetaWord,
939{
940    if le_word_to_u64::<W>(record.reserved) != 0 {
941        return Err(PropertyError::SnapshotDescriptorMismatch {
942            reason: "property descriptor reserved word must be zero",
943        });
944    }
945    let offset = le_word_to_usize::<W>(record.value_data_offset)?;
946    let len = le_word_to_usize::<W>(record.value_data_len)?;
947    let end = checked_end(offset, len, data.len())?;
948    let value_batch = read_one_ipc_batch(&data[offset..end])?;
949    let default_offset = le_word_to_usize::<W>(record.default_data_offset)?;
950    let default_len = le_word_to_usize::<W>(record.default_data_len)?;
951    let default_batch = if default_len == 0 {
952        None
953    } else {
954        let default_end = checked_end(default_offset, default_len, data.len())?;
955        Some(read_one_ipc_batch(&data[default_offset..default_end])?)
956    };
957    match storage {
958        StorageMode::Dense => {
959            if default_len != 0 {
960                return Err(PropertyError::SnapshotDescriptorMismatch {
961                    reason: "dense property must not declare a default stream",
962                });
963            }
964            validate_dense_batch::<W>(record, &value_batch)?;
965        }
966        StorageMode::Sparse { missing } => {
967            validate_sparse_batch::<W>(record, missing, &value_batch, default_batch.as_ref())?;
968        }
969    }
970    let mut ranges = Vec::with_capacity(2);
971    ranges.push(offset..end);
972    if default_len != 0 {
973        ranges.push(default_offset..default_offset + default_len);
974    }
975    Ok(ranges)
976}
977
978/// Reads exactly one Arrow IPC record batch.
979///
980/// # Performance
981///
982/// This function is `O(bytes.len())`.
983fn read_one_ipc_batch(bytes: &[u8]) -> Result<RecordBatch, PropertyError> {
984    let reader = StreamReader::try_new(Cursor::new(bytes), None).map_err(map_arrow_error)?;
985    let mut batches = Vec::new();
986    for batch in reader {
987        batches.push(batch.map_err(map_arrow_error)?);
988        if batches.len() > 1 {
989            return Err(PropertyError::SnapshotDescriptorMismatch {
990                reason: "property IPC stream contains more than one batch",
991            });
992        }
993    }
994    let mut iter = batches.into_iter();
995    iter.next()
996        .ok_or(PropertyError::SnapshotDescriptorMismatch {
997            reason: "property IPC stream contains no batches",
998        })
999}
1000
1001/// Validates one dense Arrow IPC batch.
1002///
1003/// # Performance
1004///
1005/// This function is `O(1)`.
1006fn validate_dense_batch<W>(
1007    record: &PropertySnapshotRecord<W>,
1008    batch: &RecordBatch,
1009) -> Result<(), PropertyError>
1010where
1011    W: PropertySnapshotMetaWord,
1012{
1013    if batch.num_columns() != 1 {
1014        return Err(PropertyError::SnapshotDescriptorMismatch {
1015            reason: "dense property batch must contain one column",
1016        });
1017    }
1018    let values = batch.column(0);
1019    if values.len() != le_word_to_usize::<W>(record.logical_len)?
1020        || values.len() != le_word_to_usize::<W>(record.value_count)?
1021    {
1022        return Err(PropertyError::SnapshotDataLength {
1023            reason: "dense property Arrow length does not match descriptor",
1024        });
1025    }
1026    validate_value_column(values.as_ref())
1027}
1028
1029/// Validates one sparse Arrow IPC batch.
1030///
1031/// # Performance
1032///
1033/// This function is `O(value_count)` for sparse index validation.
1034fn validate_sparse_batch<W>(
1035    record: &PropertySnapshotRecord<W>,
1036    missing: MissingPolicy,
1037    value_batch: &RecordBatch,
1038    default_batch: Option<&RecordBatch>,
1039) -> Result<(), PropertyError>
1040where
1041    W: PropertySnapshotMetaWord,
1042{
1043    if value_batch.num_columns() != 2 {
1044        return Err(PropertyError::SnapshotDescriptorMismatch {
1045            reason: "sparse property value stream must contain index and value columns",
1046        });
1047    }
1048    let indexes = value_batch.column(0);
1049    let values = value_batch.column(1);
1050    let value_count = le_word_to_usize::<W>(record.value_count)?;
1051    if indexes.len() != value_count || values.len() != value_count {
1052        return Err(PropertyError::SnapshotDataLength {
1053            reason: "sparse property Arrow value count does not match descriptor",
1054        });
1055    }
1056    validate_value_column(values.as_ref())?;
1057    validate_sparse_index_column(indexes.as_ref(), le_word_to_usize::<W>(record.logical_len)?)?;
1058    match (missing, default_batch) {
1059        (MissingPolicy::Null, None) => {}
1060        (MissingPolicy::Null, Some(_)) => {
1061            return Err(PropertyError::SnapshotDescriptorMismatch {
1062                reason: "sparse-null property must not declare a default stream",
1063            });
1064        }
1065        (MissingPolicy::Default, Some(default_batch)) => {
1066            if default_batch.num_columns() != 1 {
1067                return Err(PropertyError::SnapshotDescriptorMismatch {
1068                    reason: "sparse default stream must contain one column",
1069                });
1070            }
1071            let default = default_batch.column(0);
1072            if default.len() != 1 || default.data_type() != values.data_type() || default.is_null(0)
1073            {
1074                return Err(PropertyError::SnapshotDescriptorMismatch {
1075                    reason: "sparse property default column is not a non-null matching scalar",
1076                });
1077            }
1078        }
1079        (MissingPolicy::Default, None) => {
1080            return Err(PropertyError::SnapshotDescriptorMismatch {
1081                reason: "sparse-default property is missing its default stream",
1082            });
1083        }
1084    }
1085    Ok(())
1086}
1087
1088/// Validates an Arrow value column against snapshot metadata.
1089///
1090/// # Performance
1091///
1092/// This function is `O(1)`.
1093fn validate_value_column(values: &dyn Array) -> Result<(), PropertyError> {
1094    if values.null_count() > values.len() {
1095        return Err(PropertyError::SnapshotDescriptorMismatch {
1096            reason: "Arrow value column has invalid null accounting",
1097        });
1098    }
1099    Ok(())
1100}
1101
1102/// Validates descriptor ranges cover data exactly without overlap or trailing bytes.
1103///
1104/// # Performance
1105///
1106/// This function is `O(n log n)` for `n` ranges.
1107fn validate_data_coverage(
1108    ranges: &mut [core::ops::Range<usize>],
1109    data_len: usize,
1110) -> Result<(), PropertyError> {
1111    ranges.sort_by_key(|range| range.start);
1112    let mut cursor = 0_usize;
1113    for range in ranges {
1114        if range.start != cursor {
1115            return Err(PropertyError::SnapshotDescriptorMismatch {
1116                reason: "property data ranges leave a gap or overlap",
1117            });
1118        }
1119        cursor = range.end;
1120    }
1121    if cursor != data_len {
1122        return Err(PropertyError::SnapshotDescriptorMismatch {
1123            reason: "property data section has trailing bytes",
1124        });
1125    }
1126    Ok(())
1127}
1128
1129/// Reads a UTF-8 string from a snapshot string table.
1130///
1131/// # Performance
1132///
1133/// This function is `O(len)` for UTF-8 validation.
1134fn read_snapshot_str(bytes: &[u8], offset: usize, len: usize) -> Result<&str, PropertyError> {
1135    let end = checked_end(offset, len, bytes.len())?;
1136    core::str::from_utf8(&bytes[offset..end])
1137        .map_err(|_error| PropertyError::SnapshotInvalidUtf8 { offset })
1138}
1139
1140/// Checks a byte range against an available length.
1141///
1142/// # Performance
1143///
1144/// This function is `O(1)`.
1145fn checked_end(offset: usize, len: usize, available: usize) -> Result<usize, PropertyError> {
1146    let end = offset
1147        .checked_add(len)
1148        .ok_or(PropertyError::SnapshotRangeOutOfBounds {
1149            offset,
1150            len,
1151            available,
1152        })?;
1153    if end > available {
1154        Err(PropertyError::SnapshotRangeOutOfBounds {
1155            offset,
1156            len,
1157            available,
1158        })
1159    } else {
1160        Ok(end)
1161    }
1162}
1163
1164/// Reads a little-endian `u64` from an eight-byte slice.
1165///
1166/// # Performance
1167///
1168/// This function is `O(1)`.
1169fn read_u64_le(bytes: &[u8]) -> Result<u64, PropertyError> {
1170    if bytes.len() < core::mem::size_of::<u64>() {
1171        return Err(PropertyError::SnapshotDataLength {
1172            reason: "u64 field is truncated",
1173        });
1174    }
1175    let mut array = [0_u8; 8];
1176    array.copy_from_slice(&bytes[..8]);
1177    Ok(u64::from_le_bytes(array))
1178}
1179
1180/// Converts `u64` to `usize` for snapshot lengths.
1181///
1182/// # Performance
1183///
1184/// This function is `O(1)`.
1185fn u64_to_usize(value: u64) -> Result<usize, PropertyError> {
1186    usize::try_from(value).map_err(|_error| PropertyError::SnapshotDescriptorMismatch {
1187        reason: "snapshot length does not fit usize",
1188    })
1189}
1190
1191/// Converts `usize` to `u64` for snapshot lengths.
1192///
1193/// # Performance
1194///
1195/// This function is `O(1)`.
1196fn usize_to_u64(value: usize) -> Result<u64, PropertyError> {
1197    u64::try_from(value).map_err(|_error| PropertyError::LengthDoesNotFitU64 { value })
1198}
1199
1200/// Validates a decoded sparse index column against the layer's logical length.
1201///
1202/// The wire descriptor records the metadata word width `W`, not the sparse
1203/// index width `I`; `I` is only recoverable from the decoded Arrow column's
1204/// [`DataType`]. This selects the typed [`PropertyIndex`] width from that
1205/// runtime data type, downcasts once, and forwards to the typed
1206/// [`validate_sparse_indices`] so the ordering/bounds contract has a single
1207/// definition shared with [`PropertyLayer::try_new_sparse`].
1208///
1209/// # Performance
1210///
1211/// This function is `O(indices.len())`.
1212fn validate_sparse_index_column(indices: &dyn Array, len: usize) -> Result<(), PropertyError> {
1213    /// Downcasts `indices` to the `I`-typed primitive column and validates it.
1214    fn typed<I>(indices: &dyn Array, len: usize) -> Result<(), PropertyError>
1215    where
1216        I: PropertyIndex,
1217    {
1218        let typed = indices
1219            .as_any()
1220            .downcast_ref::<PrimitiveArray<I::ArrowType>>()
1221            .ok_or(PropertyError::SnapshotDescriptorMismatch {
1222                reason: "sparse property index column does not match its declared Arrow width",
1223            })?;
1224        validate_sparse_indices::<I>(typed, len)
1225    }
1226
1227    match indices.data_type() {
1228        DataType::UInt16 => typed::<u16>(indices, len),
1229        DataType::UInt32 => typed::<u32>(indices, len),
1230        DataType::UInt64 => typed::<u64>(indices, len),
1231        _ => Err(PropertyError::SnapshotDescriptorMismatch {
1232            reason: "sparse property index column is not UInt16, UInt32, or UInt64",
1233        }),
1234    }
1235}
1236
1237/// Returns the Arrow data type for a property index width.
1238///
1239/// # Performance
1240///
1241/// This function is `O(1)`.
1242const fn index_data_type<I>() -> DataType
1243where
1244    I: PropertyIndex,
1245{
1246    if core::mem::size_of::<I>() == core::mem::size_of::<u16>() {
1247        DataType::UInt16
1248    } else if core::mem::size_of::<I>() == core::mem::size_of::<u32>() {
1249        DataType::UInt32
1250    } else {
1251        DataType::UInt64
1252    }
1253}