liquid_cache/utils/
variant_schema.rs1use std::{collections::BTreeMap, sync::Arc};
4
5use arrow_schema::{DataType, Field, FieldRef, Fields};
6
/// Splits a dot-separated variant path into owned segments.
///
/// Empty segments (from leading, trailing, or repeated dots) are dropped, so
/// `".a..b."` yields `["a", "b"]` and `""` yields an empty vector.
fn split_variant_path(path: &str) -> Vec<String> {
    let mut segments = Vec::new();
    for piece in path.split('.') {
        if piece.is_empty() {
            continue;
        }
        segments.push(piece.to_owned());
    }
    segments
}
13
14#[derive(Default, Clone)]
16pub struct VariantSchema {
17 root: VariantSchemaNode,
18}
19
20impl VariantSchema {
21 pub fn new(existing_typed_value: Option<&Field>) -> Self {
23 let mut root = VariantSchemaNode::default();
24 if let Some(field) = existing_typed_value {
25 root.absorb_root(field);
26 }
27 Self { root }
28 }
29
30 pub fn insert_path(&mut self, path: &str, data_type: &DataType) {
32 let segments = split_variant_path(path);
33 if segments.is_empty() {
34 return;
35 }
36 self.root.insert_segments(&segments, data_type);
37 }
38
39 pub fn typed_fields(&self) -> Vec<FieldRef> {
41 self.root.build_arrow_children()
42 }
43
44 pub fn shredding_type(&self) -> Option<DataType> {
46 self.root.logical_struct_type()
47 }
48}
49
50#[derive(Default, Clone)]
51struct VariantSchemaNode {
52 children: BTreeMap<String, VariantSchemaNode>,
53 leaf_type: Option<DataType>,
54}
55
56impl VariantSchemaNode {
57 fn absorb_root(&mut self, typed_field: &Field) {
58 if let DataType::Struct(children) = typed_field.data_type() {
59 for child in children.iter() {
60 self.children
61 .entry(child.name().clone())
62 .or_default()
63 .absorb_shredded_field(child.as_ref());
64 }
65 }
66 }
67
68 fn absorb_shredded_field(&mut self, field: &Field) {
69 match field.data_type() {
70 DataType::Struct(children) => {
71 let Some(typed_child) = children
72 .iter()
73 .find(|child| child.name() == "typed_value")
74 .map(|child| child.as_ref())
75 else {
76 return;
77 };
78 match typed_child.data_type() {
79 DataType::Struct(grand_children) if !grand_children.is_empty() => {
80 for grand_child in grand_children.iter() {
81 self.children
82 .entry(grand_child.name().clone())
83 .or_default()
84 .absorb_shredded_field(grand_child.as_ref());
85 }
86 }
87 other => {
88 self.leaf_type = Some(other.clone());
89 }
90 }
91 }
92 other => {
93 self.leaf_type = Some(other.clone());
94 }
95 }
96 }
97
98 fn insert_segments(&mut self, segments: &[String], data_type: &DataType) {
99 if segments.is_empty() {
100 self.leaf_type = Some(data_type.clone());
101 return;
102 }
103 let (head, tail) = segments.split_first().unwrap();
104 self.children
105 .entry(head.clone())
106 .or_default()
107 .insert_segments(tail, data_type);
108 }
109
110 fn build_arrow_children(&self) -> Vec<FieldRef> {
111 self.children
112 .iter()
113 .filter_map(|(name, child)| child.build_arrow_field(name))
114 .collect()
115 }
116
117 fn build_arrow_field(&self, name: &str) -> Option<FieldRef> {
118 let typed_value_type = if self.children.is_empty() {
119 self.leaf_type.clone()?
120 } else {
121 let child_fields = self.build_arrow_children();
122 if child_fields.is_empty() {
123 return None;
124 }
125 DataType::Struct(Fields::from(child_fields))
126 };
127 let fields = Fields::from(vec![
128 Arc::new(Field::new("value", DataType::BinaryView, true)),
129 Arc::new(Field::new("typed_value", typed_value_type, true)),
130 ]);
131 Some(Arc::new(Field::new(name, DataType::Struct(fields), false)))
132 }
133
134 fn logical_struct_type(&self) -> Option<DataType> {
135 if self.children.is_empty() {
136 self.leaf_type.clone()
137 } else {
138 let child_fields: Vec<_> = self
139 .children
140 .iter()
141 .filter_map(|(name, child)| child.logical_field(name))
142 .collect();
143 if child_fields.is_empty() {
144 None
145 } else {
146 Some(DataType::Struct(Fields::from(child_fields)))
147 }
148 }
149 }
150
151 fn logical_field(&self, name: &str) -> Option<FieldRef> {
152 self.logical_struct_type()
153 .map(|data_type| Arc::new(Field::new(name, data_type, false)))
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// One flat path and one nested path must yield two top-level entries in
    /// both the wrapped (`typed_fields`) and the logical (`shredding_type`)
    /// view, with names in sorted order.
    #[test]
    fn shredding_type_matches_typed_fields() {
        let mut schema = VariantSchema::default();
        for (path, data_type) in [("a", DataType::Int64), ("b.c", DataType::Utf8)] {
            schema.insert_path(path, &data_type);
        }

        let typed = schema.typed_fields();
        let names: Vec<&str> = typed.iter().map(|field| field.name().as_str()).collect();
        assert_eq!(names, ["a", "b"]);

        let DataType::Struct(fields) = schema.shredding_type().unwrap() else {
            panic!("expected struct");
        };
        assert_eq!(fields.len(), 2);
    }
}