1use arrow::datatypes::*;
2use deltalake::kernel::{
3 DataType as DeltaDataType, PrimitiveType, StructField as DeltaStructField, StructType,
4};
5use std::sync::Arc;
6
7pub(crate) fn attribute_field() -> Field {
8 Field::new(
9 "attributes",
10 DataType::Map(
11 Arc::new(Field::new(
12 "key_value",
13 DataType::Struct(
14 vec![
15 Field::new("key", DataType::Utf8, false),
16 Field::new("value", DataType::Utf8View, true),
17 ]
18 .into(),
19 ),
20 false,
21 )),
22 false,
23 ),
24 false,
25 )
26}
27
28pub(crate) fn resource_attribute_field() -> Field {
30 Field::new(
31 "resource_attributes",
32 DataType::Map(
33 Arc::new(Field::new(
34 "key_value",
35 DataType::Struct(
36 vec![
37 Field::new("key", DataType::Utf8, false),
38 Field::new("value", DataType::Utf8View, true),
39 ]
40 .into(),
41 ),
42 false,
43 )),
44 false,
45 ),
46 true, )
48}
49
50pub trait TraceSchemaExt {
51 fn create_schema() -> Schema {
58 Schema::new(vec![
59 Field::new("trace_id", DataType::FixedSizeBinary(16), false),
61 Field::new("span_id", DataType::FixedSizeBinary(8), false),
62 Field::new("parent_span_id", DataType::FixedSizeBinary(8), true),
63 Field::new("flags", DataType::Int32, false),
65 Field::new("trace_state", DataType::Utf8, false),
66 Field::new("scope_name", DataType::Utf8, false),
68 Field::new("scope_version", DataType::Utf8, true),
69 Field::new(
72 "service_name",
73 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
74 false,
75 ),
76 Field::new("span_name", DataType::Utf8, false),
77 Field::new(
78 "span_kind",
79 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
80 true,
81 ),
82 Field::new(
84 "start_time",
85 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
86 false,
87 ),
88 Field::new(
89 "end_time",
90 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
91 false,
92 ),
93 Field::new("duration_ms", DataType::Int64, false),
94 Field::new("status_code", DataType::Int32, false),
96 Field::new("status_message", DataType::Utf8, true),
97 Field::new("label", DataType::Utf8, true),
99 attribute_field(),
101 resource_attribute_field(),
102 Field::new(
105 "events",
106 DataType::List(Arc::new(Field::new(
107 "item",
108 DataType::Struct(
109 vec![
110 Field::new("name", DataType::Utf8, false),
111 Field::new(
112 "timestamp",
113 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
114 false,
115 ),
116 attribute_field(),
117 Field::new("dropped_attributes_count", DataType::UInt32, false),
118 ]
119 .into(),
120 ),
121 true,
122 ))),
123 false,
124 ),
125 Field::new(
127 "links",
128 DataType::List(Arc::new(Field::new(
129 "item",
130 DataType::Struct(
131 vec![
132 Field::new("trace_id", DataType::FixedSizeBinary(16), false),
133 Field::new("span_id", DataType::FixedSizeBinary(8), false),
134 Field::new("trace_state", DataType::Utf8, false),
135 attribute_field(),
136 Field::new("dropped_attributes_count", DataType::UInt32, false),
137 ]
138 .into(),
139 ),
140 true,
141 ))),
142 false,
143 ),
144 Field::new("input", DataType::Utf8View, true),
146 Field::new("output", DataType::Utf8View, true),
147 Field::new("search_blob", DataType::Utf8View, false),
150 Field::new("partition_date", DataType::Date32, false),
153 ])
154 }
155}
156
157pub fn arrow_schema_to_delta(schema: &Schema) -> Vec<DeltaStructField> {
159 schema
160 .fields()
161 .iter()
162 .map(|field| arrow_field_to_delta(field))
163 .collect()
164}
165
166fn arrow_field_to_delta(field: &Field) -> DeltaStructField {
168 let delta_type = arrow_type_to_delta(field.data_type());
169 DeltaStructField::new(field.name().clone(), delta_type, field.is_nullable())
170}
171
172fn arrow_type_to_delta(arrow_type: &DataType) -> DeltaDataType {
174 match arrow_type {
175 DataType::Boolean => DeltaDataType::Primitive(PrimitiveType::Boolean),
177 DataType::Int8 => DeltaDataType::Primitive(PrimitiveType::Byte),
178 DataType::Int16 => DeltaDataType::Primitive(PrimitiveType::Short),
179 DataType::Int32 => DeltaDataType::Primitive(PrimitiveType::Integer),
180 DataType::Int64 => DeltaDataType::Primitive(PrimitiveType::Long),
181 DataType::UInt8 | DataType::UInt16 => DeltaDataType::Primitive(PrimitiveType::Short),
183 DataType::UInt32 => DeltaDataType::Primitive(PrimitiveType::Integer),
184 DataType::UInt64 => DeltaDataType::Primitive(PrimitiveType::Long),
185 DataType::Float32 => DeltaDataType::Primitive(PrimitiveType::Float),
186 DataType::Float64 => DeltaDataType::Primitive(PrimitiveType::Double),
187
188 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
190 DeltaDataType::Primitive(PrimitiveType::String)
191 }
192
193 DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
195 DeltaDataType::Primitive(PrimitiveType::Binary)
196 }
197
198 DataType::Timestamp(TimeUnit::Microsecond, Some(_))
200 | DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
201 DeltaDataType::Primitive(PrimitiveType::Timestamp)
202 }
203 DataType::Timestamp(TimeUnit::Microsecond, None)
204 | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
205 DeltaDataType::Primitive(PrimitiveType::TimestampNtz)
206 }
207 DataType::Date32 | DataType::Date64 => DeltaDataType::Primitive(PrimitiveType::Date),
208
209 DataType::List(field) | DataType::LargeList(field) => {
211 let element_type = arrow_type_to_delta(field.data_type());
212 DeltaDataType::Array(Box::new(deltalake::kernel::ArrayType::new(
213 element_type,
214 field.is_nullable(),
215 )))
216 }
217
218 DataType::Struct(fields) => {
219 let delta_fields: Vec<DeltaStructField> =
220 fields.iter().map(|f| arrow_field_to_delta(f)).collect();
221 DeltaDataType::Struct(Box::new(StructType::try_new(delta_fields).unwrap()))
222 }
223
224 DataType::Map(field, _sorted) => {
225 if let DataType::Struct(map_fields) = field.data_type() {
226 let key_type = arrow_type_to_delta(map_fields[0].data_type());
227 let value_type = arrow_type_to_delta(map_fields[1].data_type());
228 DeltaDataType::Map(Box::new(deltalake::kernel::MapType::new(
229 key_type,
230 value_type,
231 map_fields[1].is_nullable(),
232 )))
233 } else {
234 DeltaDataType::Primitive(PrimitiveType::String)
235 }
236 }
237
238 DataType::Dictionary(_, value_type) => arrow_type_to_delta(value_type),
240
241 _ => DeltaDataType::Primitive(PrimitiveType::String),
243 }
244}