Skip to main content

scouter_dataframe/parquet/tracing/
traits.rs

1use arrow::datatypes::*;
2use deltalake::kernel::{
3    DataType as DeltaDataType, PrimitiveType, StructField as DeltaStructField, StructType,
4};
5use std::sync::Arc;
6
7pub(crate) fn attribute_field() -> Field {
8    Field::new(
9        "attributes",
10        DataType::Map(
11            Arc::new(Field::new(
12                "key_value",
13                DataType::Struct(
14                    vec![
15                        Field::new("key", DataType::Utf8, false),
16                        Field::new("value", DataType::Utf8View, true),
17                    ]
18                    .into(),
19                ),
20                false,
21            )),
22            false,
23        ),
24        false,
25    )
26}
27
28/// Map field for resource_attributes (nullable map, matching attribute_field structure)
29pub(crate) fn resource_attribute_field() -> Field {
30    Field::new(
31        "resource_attributes",
32        DataType::Map(
33            Arc::new(Field::new(
34                "key_value",
35                DataType::Struct(
36                    vec![
37                        Field::new("key", DataType::Utf8, false),
38                        Field::new("value", DataType::Utf8View, true),
39                    ]
40                    .into(),
41                ),
42                false,
43            )),
44            false,
45        ),
46        true, // nullable: a span may have no resource attributes
47    )
48}
49
50pub trait TraceSchemaExt {
51    /// Define the Arrow schema for trace spans.
52    ///
53    /// Hierarchy fields (depth, span_order, path, root_span_id) are NOT stored —
54    /// they are computed at query time via Rust DFS traversal, matching how Jaeger/Zipkin operate.
55    ///
56    /// Fields align 1:1 with `TraceSpanRecord` (the ingest type), enabling zero-transform writes.
57    fn create_schema() -> Schema {
58        Schema::new(vec![
59            // ========== Core Identifiers ==========
60            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
61            Field::new("span_id", DataType::FixedSizeBinary(8), false),
62            Field::new("parent_span_id", DataType::FixedSizeBinary(8), true),
63            // ========== W3C Trace Context ==========
64            Field::new("flags", DataType::Int32, false),
65            Field::new("trace_state", DataType::Utf8, false),
66            // ========== Instrumentation Scope ==========
67            Field::new("scope_name", DataType::Utf8, false),
68            Field::new("scope_version", DataType::Utf8, true),
69            // ========== Metadata ==========
70            // Dictionary encoding for high-repetition string fields
71            Field::new(
72                "service_name",
73                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
74                false,
75            ),
76            Field::new("span_name", DataType::Utf8, false),
77            Field::new(
78                "span_kind",
79                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
80                true,
81            ),
82            // ========== Temporal Data ==========
83            Field::new(
84                "start_time",
85                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
86                false,
87            ),
88            Field::new(
89                "end_time",
90                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
91                false,
92            ),
93            Field::new("duration_ms", DataType::Int64, false),
94            // ========== Status ==========
95            Field::new("status_code", DataType::Int32, false),
96            Field::new("status_message", DataType::Utf8, true),
97            // ========== Scouter-specific ==========
98            Field::new("label", DataType::Utf8, true),
99            // ========== Attributes ==========
100            attribute_field(),
101            resource_attribute_field(),
102            // ========== Events (Nested) ==========
103            // SpanEvent: all fields non-nullable, attributes Vec can be empty
104            Field::new(
105                "events",
106                DataType::List(Arc::new(Field::new(
107                    "item",
108                    DataType::Struct(
109                        vec![
110                            Field::new("name", DataType::Utf8, false),
111                            Field::new(
112                                "timestamp",
113                                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
114                                false,
115                            ),
116                            attribute_field(),
117                            Field::new("dropped_attributes_count", DataType::UInt32, false),
118                        ]
119                        .into(),
120                    ),
121                    true,
122                ))),
123                false,
124            ),
125            // ========== Links (Nested) ==========
126            Field::new(
127                "links",
128                DataType::List(Arc::new(Field::new(
129                    "item",
130                    DataType::Struct(
131                        vec![
132                            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
133                            Field::new("span_id", DataType::FixedSizeBinary(8), false),
134                            Field::new("trace_state", DataType::Utf8, false),
135                            attribute_field(),
136                            Field::new("dropped_attributes_count", DataType::UInt32, false),
137                        ]
138                        .into(),
139                    ),
140                    true,
141                ))),
142                false,
143            ),
144            // ========== Payload (Large JSON) ==========
145            Field::new("input", DataType::Utf8View, true),
146            Field::new("output", DataType::Utf8View, true),
147            // ========== Full-Text Search Optimization ==========
148            // Pre-computed concatenated search string to avoid JSON parsing at query time
149            Field::new("search_blob", DataType::Utf8View, false),
150            // ========== Partitioning ==========
151            // Hive-style date partition key derived from start_time — lets DataFusion skip
152            Field::new("partition_date", DataType::Date32, false),
153        ])
154    }
155}
156
157/// Convert Arrow Schema to Delta Lake StructFields
158pub fn arrow_schema_to_delta(schema: &Schema) -> Vec<DeltaStructField> {
159    schema
160        .fields()
161        .iter()
162        .map(|field| arrow_field_to_delta(field))
163        .collect()
164}
165
166/// Convert a single Arrow Field to Delta Lake StructField
167fn arrow_field_to_delta(field: &Field) -> DeltaStructField {
168    let delta_type = arrow_type_to_delta(field.data_type());
169    DeltaStructField::new(field.name().clone(), delta_type, field.is_nullable())
170}
171
172/// Map Arrow DataType to Delta Lake DataType
173fn arrow_type_to_delta(arrow_type: &DataType) -> DeltaDataType {
174    match arrow_type {
175        // Primitive types
176        DataType::Boolean => DeltaDataType::Primitive(PrimitiveType::Boolean),
177        DataType::Int8 => DeltaDataType::Primitive(PrimitiveType::Byte),
178        DataType::Int16 => DeltaDataType::Primitive(PrimitiveType::Short),
179        DataType::Int32 => DeltaDataType::Primitive(PrimitiveType::Integer),
180        DataType::Int64 => DeltaDataType::Primitive(PrimitiveType::Long),
181        // Unsigned int types — Delta Lake has no native unsigned; map to next-larger signed type
182        DataType::UInt8 | DataType::UInt16 => DeltaDataType::Primitive(PrimitiveType::Short),
183        DataType::UInt32 => DeltaDataType::Primitive(PrimitiveType::Integer),
184        DataType::UInt64 => DeltaDataType::Primitive(PrimitiveType::Long),
185        DataType::Float32 => DeltaDataType::Primitive(PrimitiveType::Float),
186        DataType::Float64 => DeltaDataType::Primitive(PrimitiveType::Double),
187
188        // String types
189        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
190            DeltaDataType::Primitive(PrimitiveType::String)
191        }
192
193        // Binary types
194        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
195            DeltaDataType::Primitive(PrimitiveType::Binary)
196        }
197
198        // Temporal types
199        DataType::Timestamp(TimeUnit::Microsecond, Some(_))
200        | DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
201            DeltaDataType::Primitive(PrimitiveType::Timestamp)
202        }
203        DataType::Timestamp(TimeUnit::Microsecond, None)
204        | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
205            DeltaDataType::Primitive(PrimitiveType::TimestampNtz)
206        }
207        DataType::Date32 | DataType::Date64 => DeltaDataType::Primitive(PrimitiveType::Date),
208
209        // Complex types
210        DataType::List(field) | DataType::LargeList(field) => {
211            let element_type = arrow_type_to_delta(field.data_type());
212            DeltaDataType::Array(Box::new(deltalake::kernel::ArrayType::new(
213                element_type,
214                field.is_nullable(),
215            )))
216        }
217
218        DataType::Struct(fields) => {
219            let delta_fields: Vec<DeltaStructField> =
220                fields.iter().map(|f| arrow_field_to_delta(f)).collect();
221            DeltaDataType::Struct(Box::new(StructType::try_new(delta_fields).unwrap()))
222        }
223
224        DataType::Map(field, _sorted) => {
225            if let DataType::Struct(map_fields) = field.data_type() {
226                let key_type = arrow_type_to_delta(map_fields[0].data_type());
227                let value_type = arrow_type_to_delta(map_fields[1].data_type());
228                DeltaDataType::Map(Box::new(deltalake::kernel::MapType::new(
229                    key_type,
230                    value_type,
231                    map_fields[1].is_nullable(),
232                )))
233            } else {
234                DeltaDataType::Primitive(PrimitiveType::String)
235            }
236        }
237
238        // Dictionary encoding - use underlying value type
239        DataType::Dictionary(_, value_type) => arrow_type_to_delta(value_type),
240
241        // Fallback for unsupported types
242        _ => DeltaDataType::Primitive(PrimitiveType::String),
243    }
244}