Skip to main content

uni_store/storage/
property_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Helper for building Arrow property columns from row-based data.
5
6use crate::storage::arrow_convert::PropertyExtractor;
7use anyhow::{Result, anyhow};
8use arrow_array::ArrayRef;
9use arrow_array::builder::LargeBinaryBuilder;
10use std::collections::HashMap;
11use std::sync::Arc;
12use uni_common::{Properties, Schema, Value};
13
14/// Builds property columns for a specific label/edge_type using the Schema.
15pub struct PropertyColumnBuilder<'a> {
16    schema: &'a Schema,
17    label: &'a str,
18    len: usize,
19    deleted: Option<&'a [bool]>,
20}
21
22impl<'a> PropertyColumnBuilder<'a> {
23    pub fn new(schema: &'a Schema, label: &'a str, len: usize) -> Self {
24        Self {
25            schema,
26            label,
27            len,
28            deleted: None,
29        }
30    }
31
32    pub fn with_deleted(mut self, deleted: &'a [bool]) -> Self {
33        self.deleted = Some(deleted);
34        self
35    }
36
37    pub fn build<F>(self, get_row_props: F) -> Result<Vec<ArrayRef>>
38    where
39        F: Fn(usize) -> &'a Properties,
40    {
41        let mut columns = Vec::new();
42
43        if let Some(props) = self.schema.properties.get(self.label) {
44            let mut sorted_props: Vec<_> = props.iter().collect();
45            sorted_props.sort_by_key(|(name, _)| *name);
46
47            let default_deleted = vec![false; self.len];
48            let deleted = self.deleted.unwrap_or(&default_deleted);
49
50            for (name, meta) in sorted_props {
51                let extractor = PropertyExtractor::new(name, &meta.r#type);
52                let column =
53                    extractor.build_column(self.len, deleted, |i| get_row_props(i).get(name))?;
54                columns.push(column);
55            }
56        }
57
58        Ok(columns)
59    }
60}
61
62/// Builds an `overflow_json` column (LargeBinary) for properties not defined in the schema.
63///
64/// Properties present in the schema are stored as typed columns; remaining properties
65/// are serialized into a JSONB binary blob per row. Rows with no overflow properties
66/// produce a null entry.
67///
68/// # Arguments
69/// * `len` - Number of rows
70/// * `label_or_type` - Label (for vertices) or edge type name used to look up schema properties
71/// * `schema` - The database schema
72/// * `get_row_props` - Closure that returns the full property map for a given row index
73/// * `skip_keys` - Additional property keys to exclude (e.g., `"ext_id"` for vertices)
74pub fn build_overflow_json_column<'a, F>(
75    len: usize,
76    label_or_type: &str,
77    schema: &Schema,
78    get_row_props: F,
79    skip_keys: &[&str],
80) -> Result<ArrayRef>
81where
82    F: Fn(usize) -> &'a Properties,
83{
84    let schema_props = schema.properties.get(label_or_type);
85    let mut builder = LargeBinaryBuilder::new();
86
87    for i in 0..len {
88        let props = get_row_props(i);
89        let mut overflow_props = HashMap::new();
90
91        for (key, value) in props.iter() {
92            if skip_keys.contains(&key.as_str()) {
93                continue;
94            }
95            if !schema_props.is_some_and(|sp| sp.contains_key(key)) {
96                overflow_props.insert(key.clone(), value.clone());
97            }
98        }
99
100        if overflow_props.is_empty() {
101            builder.append_null();
102        } else {
103            let json_val = serde_json::to_value(&overflow_props)
104                .map_err(|e| anyhow!("Failed to serialize overflow properties: {}", e))?;
105            let uni_val: Value = json_val.into();
106            let jsonb = uni_common::cypher_value_codec::encode(&uni_val);
107            builder.append_value(&jsonb);
108        }
109    }
110
111    Ok(Arc::new(builder.finish()))
112}