// scouter_dataframe/parquet/tracing/traits.rs
1use arrow::datatypes::*;
2use deltalake::kernel::{
3 DataType as DeltaDataType, PrimitiveType, StructField as DeltaStructField, StructType,
4};
5use std::sync::Arc;
6
7pub(crate) fn attribute_field() -> Field {
8 Field::new(
9 "attributes",
10 DataType::Map(
11 Arc::new(Field::new(
12 "key_value",
13 DataType::Struct(
14 vec![
15 Field::new("key", DataType::Utf8, false),
16 Field::new("value", DataType::Utf8View, true),
17 ]
18 .into(),
19 ),
20 false,
21 )),
22 false,
23 ),
24 false,
25 )
26}
27
28pub trait TraceSchemaExt {
29 /// Define the Arrow schema for trace spans
30 fn create_schema() -> Schema {
31 Schema::new(vec![
32 // ========== Core Identifiers ==========
33 Field::new("trace_id", DataType::FixedSizeBinary(16), false),
34 Field::new("span_id", DataType::FixedSizeBinary(8), false),
35 Field::new("parent_span_id", DataType::FixedSizeBinary(8), true),
36 Field::new("root_span_id", DataType::FixedSizeBinary(8), false),
37 // ========== Metadata ==========
38 // Dictionary encoding for high-repetition string fields
39 Field::new(
40 "service_name",
41 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
42 false,
43 ),
44 Field::new("span_name", DataType::Utf8, false),
45 Field::new(
46 "span_kind",
47 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
48 true,
49 ),
50 // ========== Temporal Data ==========
51 Field::new(
52 "start_time",
53 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
54 false,
55 ),
56 Field::new(
57 "end_time",
58 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
59 false,
60 ),
61 Field::new("duration_ms", DataType::Int64, false),
62 // ========== Status ==========
63 Field::new("status_code", DataType::Int32, false),
64 Field::new("status_message", DataType::Utf8, true),
65 // ========== Hierarchy/Navigation ==========
66 Field::new("depth", DataType::Int32, false),
67 Field::new("span_order", DataType::Int32, false),
68 Field::new(
69 "path",
70 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
71 false,
72 ),
73 attribute_field(),
74 // ========== Events (Nested) ==========
75 // SpanEvent: all fields non-nullable, attributes Vec can be empty
76 Field::new(
77 "events",
78 DataType::List(Arc::new(Field::new(
79 "item",
80 DataType::Struct(
81 vec![
82 Field::new("name", DataType::Utf8, false),
83 Field::new(
84 "timestamp",
85 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
86 false,
87 ),
88 attribute_field(),
89 Field::new("dropped_attributes_count", DataType::UInt32, false),
90 ]
91 .into(),
92 ),
93 true,
94 ))),
95 false,
96 ),
97 // ========== Links (Nested) ==========
98 // SpanLink: all fields non-nullable, attributes Vec can be empty
99 Field::new(
100 "links",
101 DataType::List(Arc::new(Field::new(
102 "item",
103 DataType::Struct(
104 vec![
105 Field::new("trace_id", DataType::FixedSizeBinary(16), false),
106 Field::new("span_id", DataType::FixedSizeBinary(8), false),
107 Field::new("trace_state", DataType::Utf8, false),
108 attribute_field(),
109 Field::new("dropped_attributes_count", DataType::UInt32, false),
110 ]
111 .into(),
112 ),
113 true,
114 ))),
115 false,
116 ),
117 // ========== Payload (Large JSON) ==========
118 // Use Utf8View for potentially very large input/output values
119 Field::new("input", DataType::Utf8View, true),
120 Field::new("output", DataType::Utf8View, true),
121 // ========== Full-Text Search Optimization ==========
122 // Pre-computed concatenated search string to avoid JSON parsing
123 Field::new("search_blob", DataType::Utf8View, false),
124 ])
125 }
126}
127
128/// Convert Arrow Schema to Delta Lake StructFields
129pub fn arrow_schema_to_delta(schema: &Schema) -> Vec<DeltaStructField> {
130 schema
131 .fields()
132 .iter()
133 .map(|field| arrow_field_to_delta(field))
134 .collect()
135}
136
137/// Convert a single Arrow Field to Delta Lake StructField
138fn arrow_field_to_delta(field: &Field) -> DeltaStructField {
139 let delta_type = arrow_type_to_delta(field.data_type());
140 DeltaStructField::new(field.name().clone(), delta_type, field.is_nullable())
141}
142
143/// Map Arrow DataType to Delta Lake DataType
144fn arrow_type_to_delta(arrow_type: &DataType) -> DeltaDataType {
145 match arrow_type {
146 // Primitive types
147 DataType::Boolean => DeltaDataType::Primitive(PrimitiveType::Boolean),
148 DataType::Int8 => DeltaDataType::Primitive(PrimitiveType::Byte),
149 DataType::Int16 => DeltaDataType::Primitive(PrimitiveType::Short),
150 DataType::Int32 => DeltaDataType::Primitive(PrimitiveType::Integer),
151 DataType::Int64 => DeltaDataType::Primitive(PrimitiveType::Long),
152 DataType::Float32 => DeltaDataType::Primitive(PrimitiveType::Float),
153 DataType::Float64 => DeltaDataType::Primitive(PrimitiveType::Double),
154
155 // String types
156 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
157 DeltaDataType::Primitive(PrimitiveType::String)
158 }
159
160 // Binary types
161 DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
162 DeltaDataType::Primitive(PrimitiveType::Binary)
163 }
164
165 // Temporal types
166 DataType::Timestamp(TimeUnit::Microsecond, Some(_))
167 | DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
168 DeltaDataType::Primitive(PrimitiveType::Timestamp)
169 }
170 DataType::Timestamp(TimeUnit::Microsecond, None)
171 | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
172 DeltaDataType::Primitive(PrimitiveType::TimestampNtz)
173 }
174 DataType::Date32 | DataType::Date64 => DeltaDataType::Primitive(PrimitiveType::Date),
175
176 // Complex types
177 DataType::List(field) | DataType::LargeList(field) => {
178 let element_type = arrow_type_to_delta(field.data_type());
179 DeltaDataType::Array(Box::new(deltalake::kernel::ArrayType::new(
180 element_type,
181 field.is_nullable(),
182 )))
183 }
184
185 DataType::Struct(fields) => {
186 let delta_fields: Vec<DeltaStructField> =
187 fields.iter().map(|f| arrow_field_to_delta(f)).collect();
188 DeltaDataType::Struct(Box::new(StructType::try_new(delta_fields).unwrap()))
189 }
190
191 DataType::Map(field, _sorted) => {
192 if let DataType::Struct(map_fields) = field.data_type() {
193 let key_type = arrow_type_to_delta(map_fields[0].data_type());
194 let value_type = arrow_type_to_delta(map_fields[1].data_type());
195 DeltaDataType::Map(Box::new(deltalake::kernel::MapType::new(
196 key_type,
197 value_type,
198 map_fields[1].is_nullable(),
199 )))
200 } else {
201 // Fallback
202 DeltaDataType::Primitive(PrimitiveType::String)
203 }
204 }
205
206 // Dictionary encoding - use underlying value type
207 DataType::Dictionary(_, value_type) => arrow_type_to_delta(value_type),
208
209 // Fallback for unsupported types
210 _ => DeltaDataType::Primitive(PrimitiveType::String),
211 }
212}
213
// NOTE(review): the block below is commented-out scratch work for a streaming
// Delta writer. It does not compile even if uncommented (undefined `writer`,
// stray `WriteBuilder` expression, duplicated optimize call, unbalanced `//}`
// at the end) — finish or remove it before enabling.
//#[async_trait]
//pub trait TraceWriterExt: DeltaTableExt {
216// /// Background task: compact small files every hour
217// ///
218// /// This handles the small files created by frequent flushes
219// pub async fn start_auto_compaction(
220// self: Arc<Self>,
221// interval_minutes: u64,
222// ) -> Result<(), DataFrameError> {
223// let mut ticker = interval(Duration::from_secs(interval_minutes * 60));
224//
225// loop {
226// ticker.tick().await;
227//
228// if let Err(e) = self.optimize_table().await {
229// tracing::warn!("Auto-compaction failed: {}", e);
230// }
231// }
232// }
233//
234// /// Compact small files + apply Z-ORDER
235// pub async fn optimize_table(&self) -> Result<(), DataFrameError> {
236// let table = self.table();
237// table.update_state().await?;
238// table
239// .optimize()
240// .with_target_size(128 * 1024 * 1024)
241// .with_type(OptimizeType::ZOrder(vec![
242// "start_time".to_string(),
243// "service_name".to_string(),
244// "trace_id".to_string(),
245// ])).await?;
246//
247// Ok(())
248// }
249//
250// /// High-throughput streaming writer with real-time query support
251// ///
252// /// Strategy:
253// /// - Flush every 5 seconds (configurable) for query latency
254// /// - OR flush at 10K spans (smaller batches for responsiveness)
255// /// - Delta Lake handles small file compaction via auto-optimize
256// pub async fn start_streaming_writer(
257// self: Arc<Self>,
258// rx: mpsc::Receiver<Vec<TraceSpan>>,
259// flush_interval_secs: u64,
260// max_batch_size: usize,
261// ) -> Result<(), DataFrameError> {
262// let table_url = Url::parse(&format!("{}/{}", self.storage_root(), self.table_name()))?;
263//
264// let mut table = DeltaTableBuilder::from_url(table_url)?.build()?;
265//
266// table
267// .optimize()
268// .with_target_size(128 * 1024 * 1024)
269// .with_type(OptimizeType::ZOrder(vec![
270// "start_time".to_string(),
271// "service_name".to_string(),
272// "trace_id".to_string(),
273// ]))
274// .await?;
275//
276// self.streaming_write_loop(
277// rx,
278// &mut table,
279// &mut writer,
280// flush_interval_secs,
281// max_batch_size,
282// )
283// .await
284// }
285//
286// async fn streaming_write_loop(
287// &self,
288// mut rx: mpsc::Receiver<Vec<TraceSpan>>,
289// table: &mut deltalake::DeltaTable,
290// writer: &mut DeltaWriter,
291// flush_interval_secs: u64,
292// max_batch_size: usize,
293// ) -> Result<(), DataFrameError> {
294// let mut buffer = Vec::with_capacity(max_batch_size);
295// let mut flush_timer = interval(Duration::from_secs(flush_interval_secs));
296// let mut last_flush = Instant::now();
297//
298// loop {
299// tokio::select! {
300// // Receive incoming spans
301// Some(spans) = rx.recv() => {
302// buffer.extend(spans);
303//
304// // Flush if buffer reaches threshold
305// if buffer.len() >= max_batch_size {
306// self.flush_buffer(&mut buffer, writer, table).await?;
307// last_flush = Instant::now();
308// }
309// }
310//
311// // Time-based flush for low-volume periods
312// _ = flush_timer.tick() => {
313// if !buffer.is_empty() && last_flush.elapsed().as_secs() >= flush_interval_secs {
314// self.flush_buffer(&mut buffer, writer, table).await?;
315// last_flush = Instant::now();
316// }
317// }
318//
319// // Channel closed - final flush
320// else => {
321// if !buffer.is_empty() {
322// self.flush_buffer(&mut buffer, writer, table).await?;
323// }
324// break;
325// }
326// }
327// }
328//
329// Ok(())
330// }
331//
332// async fn flush_buffer(
333// &self,
334// buffer: &mut Vec<TraceSpan>,
335// table: &mut DeltaTable,
336// ) -> Result<(), DataFrameError> {
337// if buffer.is_empty() {
338// return Ok(());
339// }
340//
341// let batch = self.build_batch(std::mem::take(buffer))?;
342//
343// let properties = WriterProperties::builder()
344// .set_compression(Compression::SNAPPY)
345// .build();
346//
347// let builder = table.clone().write(vec![batch]).with_writer_properties(
348// WriterProperties::builder()
349// .set_compression(Compression::SNAPPY)
350// .build(),
351// )
352// .await?;
353//
354// WriteBuilder
355//
356//
357//
358// deltalake::operations::DeltaOps::from(table.clone())
359// .write(vec![batch])
360// .with_writer_properties(
361// WriterProperties::builder()
362// .set_compression(Compression::SNAPPY)
363// .build(),
364// )
365// .await?;
366//
367// // Reload table metadata for next write
368// *table = DeltaTableBuilder::from_url(Url::parse(&format!(
369// "{}/{}",
370// self.storage_root(),
371// self.table_name()
372// ))?)?
373// .build()?;
374//
375// Ok(())
376// }
377//}
378//
379//}