uni_store/storage/
property_builder.rs1use crate::storage::arrow_convert::PropertyExtractor;
7use anyhow::{Result, anyhow};
8use arrow_array::ArrayRef;
9use arrow_array::builder::LargeBinaryBuilder;
10use std::collections::HashMap;
11use std::sync::Arc;
12use uni_common::{Properties, Schema, Value};
13
14pub struct PropertyColumnBuilder<'a> {
16 schema: &'a Schema,
17 label: &'a str,
18 len: usize,
19 deleted: Option<&'a [bool]>,
20}
21
22impl<'a> PropertyColumnBuilder<'a> {
23 pub fn new(schema: &'a Schema, label: &'a str, len: usize) -> Self {
24 Self {
25 schema,
26 label,
27 len,
28 deleted: None,
29 }
30 }
31
32 pub fn with_deleted(mut self, deleted: &'a [bool]) -> Self {
33 self.deleted = Some(deleted);
34 self
35 }
36
37 pub fn build<F>(self, get_row_props: F) -> Result<Vec<ArrayRef>>
38 where
39 F: Fn(usize) -> &'a Properties,
40 {
41 let mut columns = Vec::new();
42
43 if let Some(props) = self.schema.properties.get(self.label) {
44 let mut sorted_props: Vec<_> = props.iter().collect();
45 sorted_props.sort_by_key(|(name, _)| *name);
46
47 let default_deleted = vec![false; self.len];
48 let deleted = self.deleted.unwrap_or(&default_deleted);
49
50 for (name, meta) in sorted_props {
51 let extractor = PropertyExtractor::new(name, &meta.r#type);
52 let column =
53 extractor.build_column(self.len, deleted, |i| get_row_props(i).get(name))?;
54 columns.push(column);
55 }
56 }
57
58 Ok(columns)
59 }
60}
61
62pub fn build_overflow_json_column<'a, F>(
75 len: usize,
76 label_or_type: &str,
77 schema: &Schema,
78 get_row_props: F,
79 skip_keys: &[&str],
80) -> Result<ArrayRef>
81where
82 F: Fn(usize) -> &'a Properties,
83{
84 let schema_props = schema.properties.get(label_or_type);
85 let mut builder = LargeBinaryBuilder::new();
86
87 for i in 0..len {
88 let props = get_row_props(i);
89 let mut overflow_props = HashMap::new();
90
91 for (key, value) in props.iter() {
92 if skip_keys.contains(&key.as_str()) {
93 continue;
94 }
95 if !schema_props.is_some_and(|sp| sp.contains_key(key)) {
96 overflow_props.insert(key.clone(), value.clone());
97 }
98 }
99
100 if overflow_props.is_empty() {
101 builder.append_null();
102 } else {
103 let json_val = serde_json::to_value(&overflow_props)
104 .map_err(|e| anyhow!("Failed to serialize overflow properties: {}", e))?;
105 let uni_val: Value = json_val.into();
106 let jsonb = uni_common::cypher_value_codec::encode(&uni_val);
107 builder.append_value(&jsonb);
108 }
109 }
110
111 Ok(Arc::new(builder.finish()))
112}