Skip to main content

liquid_cache/utils/
variant_schema.rs

1//! Helpers for building shredding schemas for variant columns.
2
3use std::{collections::BTreeMap, sync::Arc};
4
5use arrow_schema::{DataType, Field, FieldRef, Fields};
6
/// Break a dotted variant path such as `"a.b.c"` into its non-empty segments.
///
/// Empty pieces produced by leading, trailing, or repeated dots are discarded, so
/// `"..a..b."` yields `["a", "b"]` and `""` yields an empty vector.
fn split_variant_path(path: &str) -> Vec<String> {
    let mut segments = Vec::new();
    for piece in path.split('.') {
        if piece.is_empty() {
            continue;
        }
        segments.push(piece.to_owned());
    }
    segments
}
13
14/// Logical schema builder for variant typed_value trees.
15#[derive(Default, Clone)]
16pub struct VariantSchema {
17    root: VariantSchemaNode,
18}
19
20impl VariantSchema {
21    /// Create a schema seeded with an existing typed_value field (if present).
22    pub fn new(existing_typed_value: Option<&Field>) -> Self {
23        let mut root = VariantSchemaNode::default();
24        if let Some(field) = existing_typed_value {
25            root.absorb_root(field);
26        }
27        Self { root }
28    }
29
30    /// Insert a typed path into the schema.
31    pub fn insert_path(&mut self, path: &str, data_type: &DataType) {
32        let segments = split_variant_path(path);
33        if segments.is_empty() {
34            return;
35        }
36        self.root.insert_segments(&segments, data_type);
37    }
38
39    /// The physical fields for the typed_value struct.
40    pub fn typed_fields(&self) -> Vec<FieldRef> {
41        self.root.build_arrow_children()
42    }
43
44    /// The logical struct type used when shredding.
45    pub fn shredding_type(&self) -> Option<DataType> {
46        self.root.logical_struct_type()
47    }
48}
49
50#[derive(Default, Clone)]
51struct VariantSchemaNode {
52    children: BTreeMap<String, VariantSchemaNode>,
53    leaf_type: Option<DataType>,
54}
55
56impl VariantSchemaNode {
57    fn absorb_root(&mut self, typed_field: &Field) {
58        if let DataType::Struct(children) = typed_field.data_type() {
59            for child in children.iter() {
60                self.children
61                    .entry(child.name().clone())
62                    .or_default()
63                    .absorb_shredded_field(child.as_ref());
64            }
65        }
66    }
67
68    fn absorb_shredded_field(&mut self, field: &Field) {
69        match field.data_type() {
70            DataType::Struct(children) => {
71                let Some(typed_child) = children
72                    .iter()
73                    .find(|child| child.name() == "typed_value")
74                    .map(|child| child.as_ref())
75                else {
76                    return;
77                };
78                match typed_child.data_type() {
79                    DataType::Struct(grand_children) if !grand_children.is_empty() => {
80                        for grand_child in grand_children.iter() {
81                            self.children
82                                .entry(grand_child.name().clone())
83                                .or_default()
84                                .absorb_shredded_field(grand_child.as_ref());
85                        }
86                    }
87                    other => {
88                        self.leaf_type = Some(other.clone());
89                    }
90                }
91            }
92            other => {
93                self.leaf_type = Some(other.clone());
94            }
95        }
96    }
97
98    fn insert_segments(&mut self, segments: &[String], data_type: &DataType) {
99        if segments.is_empty() {
100            self.leaf_type = Some(data_type.clone());
101            return;
102        }
103        let (head, tail) = segments.split_first().unwrap();
104        self.children
105            .entry(head.clone())
106            .or_default()
107            .insert_segments(tail, data_type);
108    }
109
110    fn build_arrow_children(&self) -> Vec<FieldRef> {
111        self.children
112            .iter()
113            .filter_map(|(name, child)| child.build_arrow_field(name))
114            .collect()
115    }
116
117    fn build_arrow_field(&self, name: &str) -> Option<FieldRef> {
118        let typed_value_type = if self.children.is_empty() {
119            self.leaf_type.clone()?
120        } else {
121            let child_fields = self.build_arrow_children();
122            if child_fields.is_empty() {
123                return None;
124            }
125            DataType::Struct(Fields::from(child_fields))
126        };
127        let fields = Fields::from(vec![
128            Arc::new(Field::new("value", DataType::BinaryView, true)),
129            Arc::new(Field::new("typed_value", typed_value_type, true)),
130        ]);
131        Some(Arc::new(Field::new(name, DataType::Struct(fields), false)))
132    }
133
134    fn logical_struct_type(&self) -> Option<DataType> {
135        if self.children.is_empty() {
136            self.leaf_type.clone()
137        } else {
138            let child_fields: Vec<_> = self
139                .children
140                .iter()
141                .filter_map(|(name, child)| child.logical_field(name))
142                .collect();
143            if child_fields.is_empty() {
144                None
145            } else {
146                Some(DataType::Struct(Fields::from(child_fields)))
147            }
148        }
149    }
150
151    fn logical_field(&self, name: &str) -> Option<FieldRef> {
152        self.logical_struct_type()
153            .map(|data_type| Arc::new(Field::new(name, data_type, false)))
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// The physical `typed_value` fields and the logical shredding type describe the same
    /// top-level paths.
    #[test]
    fn shredding_type_matches_typed_fields() {
        let mut schema = VariantSchema::default();
        for (path, data_type) in [("a", DataType::Int64), ("b.c", DataType::Utf8)] {
            schema.insert_path(path, &data_type);
        }

        let typed = schema.typed_fields();
        let names: Vec<&str> = typed.iter().map(|field| field.name().as_str()).collect();
        assert_eq!(names, ["a", "b"]);

        let logical = schema.shredding_type().unwrap();
        let DataType::Struct(fields) = logical else {
            panic!("expected struct");
        };
        assert_eq!(fields.len(), 2);
    }
}