//! scouter_dataframe/parquet/tracing/traits.rs
//!
//! Arrow schema definition for trace spans and conversion of that
//! schema into Delta Lake `StructField`s.

1use arrow::datatypes::*;
2use deltalake::kernel::{
3    DataType as DeltaDataType, PrimitiveType, StructField as DeltaStructField, StructType,
4};
5use std::sync::Arc;
6
7pub(crate) fn attribute_field() -> Field {
8    Field::new(
9        "attributes",
10        DataType::Map(
11            Arc::new(Field::new(
12                "key_value",
13                DataType::Struct(
14                    vec![
15                        Field::new("key", DataType::Utf8, false),
16                        Field::new("value", DataType::Utf8View, true),
17                    ]
18                    .into(),
19                ),
20                false,
21            )),
22            false,
23        ),
24        false,
25    )
26}
27
/// Extension trait supplying the canonical Arrow schema for trace spans.
///
/// `create_schema` is a provided method, so implementors inherit it for free.
/// The field order below is the physical column order of written files —
/// do not reorder without considering already-written data.
pub trait TraceSchemaExt {
    /// Define the Arrow schema for trace spans
    fn create_schema() -> Schema {
        Schema::new(vec![
            // ========== Core Identifiers ==========
            // 16-byte trace id and 8-byte span ids (binary, fixed width).
            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
            Field::new("span_id", DataType::FixedSizeBinary(8), false),
            // Nullable: root spans have no parent.
            Field::new("parent_span_id", DataType::FixedSizeBinary(8), true),
            Field::new("root_span_id", DataType::FixedSizeBinary(8), false),
            // ========== Metadata ==========
            // Dictionary encoding for high-repetition string fields
            Field::new(
                "service_name",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("span_name", DataType::Utf8, false),
            // Int8 dictionary keys: span kinds form a small fixed set.
            Field::new(
                "span_kind",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            // ========== Temporal Data ==========
            // Microsecond-precision, UTC-zoned timestamps.
            Field::new(
                "start_time",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "end_time",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            // NOTE(review): name implies milliseconds while the timestamps
            // above are microseconds — confirm writers populate this in ms.
            Field::new("duration_ms", DataType::Int64, false),
            // ========== Status ==========
            Field::new("status_code", DataType::Int32, false),
            Field::new("status_message", DataType::Utf8, true),
            // ========== Hierarchy/Navigation ==========
            Field::new("depth", DataType::Int32, false),
            Field::new("span_order", DataType::Int32, false),
            // List of strings; presumably the ancestor chain of this span —
            // TODO confirm against the writer that populates it.
            Field::new(
                "path",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                false,
            ),
            // Shared map<string, string> attributes column (see `attribute_field`).
            attribute_field(),
            // ========== Events (Nested) ==========
            // SpanEvent: all fields non-nullable, attributes Vec can be empty
            Field::new(
                "events",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(
                        vec![
                            Field::new("name", DataType::Utf8, false),
                            Field::new(
                                "timestamp",
                                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                                false,
                            ),
                            attribute_field(),
                            Field::new("dropped_attributes_count", DataType::UInt32, false),
                        ]
                        .into(),
                    ),
                    true,
                ))),
                false,
            ),
            // ========== Links (Nested) ==========
            // SpanLink: all fields non-nullable, attributes Vec can be empty
            Field::new(
                "links",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(
                        vec![
                            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
                            Field::new("span_id", DataType::FixedSizeBinary(8), false),
                            Field::new("trace_state", DataType::Utf8, false),
                            attribute_field(),
                            Field::new("dropped_attributes_count", DataType::UInt32, false),
                        ]
                        .into(),
                    ),
                    true,
                ))),
                false,
            ),
            // ========== Payload (Large JSON) ==========
            // Use Utf8View for potentially very large input/output values
            Field::new("input", DataType::Utf8View, true),
            Field::new("output", DataType::Utf8View, true),
            // ========== Full-Text Search Optimization ==========
            // Pre-computed concatenated search string to avoid JSON parsing
            Field::new("search_blob", DataType::Utf8View, false),
        ])
    }
}
127
128/// Convert Arrow Schema to Delta Lake StructFields
129pub fn arrow_schema_to_delta(schema: &Schema) -> Vec<DeltaStructField> {
130    schema
131        .fields()
132        .iter()
133        .map(|field| arrow_field_to_delta(field))
134        .collect()
135}
136
137/// Convert a single Arrow Field to Delta Lake StructField
138fn arrow_field_to_delta(field: &Field) -> DeltaStructField {
139    let delta_type = arrow_type_to_delta(field.data_type());
140    DeltaStructField::new(field.name().clone(), delta_type, field.is_nullable())
141}
142
143/// Map Arrow DataType to Delta Lake DataType
144fn arrow_type_to_delta(arrow_type: &DataType) -> DeltaDataType {
145    match arrow_type {
146        // Primitive types
147        DataType::Boolean => DeltaDataType::Primitive(PrimitiveType::Boolean),
148        DataType::Int8 => DeltaDataType::Primitive(PrimitiveType::Byte),
149        DataType::Int16 => DeltaDataType::Primitive(PrimitiveType::Short),
150        DataType::Int32 => DeltaDataType::Primitive(PrimitiveType::Integer),
151        DataType::Int64 => DeltaDataType::Primitive(PrimitiveType::Long),
152        DataType::Float32 => DeltaDataType::Primitive(PrimitiveType::Float),
153        DataType::Float64 => DeltaDataType::Primitive(PrimitiveType::Double),
154
155        // String types
156        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
157            DeltaDataType::Primitive(PrimitiveType::String)
158        }
159
160        // Binary types
161        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
162            DeltaDataType::Primitive(PrimitiveType::Binary)
163        }
164
165        // Temporal types
166        DataType::Timestamp(TimeUnit::Microsecond, Some(_))
167        | DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
168            DeltaDataType::Primitive(PrimitiveType::Timestamp)
169        }
170        DataType::Timestamp(TimeUnit::Microsecond, None)
171        | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
172            DeltaDataType::Primitive(PrimitiveType::TimestampNtz)
173        }
174        DataType::Date32 | DataType::Date64 => DeltaDataType::Primitive(PrimitiveType::Date),
175
176        // Complex types
177        DataType::List(field) | DataType::LargeList(field) => {
178            let element_type = arrow_type_to_delta(field.data_type());
179            DeltaDataType::Array(Box::new(deltalake::kernel::ArrayType::new(
180                element_type,
181                field.is_nullable(),
182            )))
183        }
184
185        DataType::Struct(fields) => {
186            let delta_fields: Vec<DeltaStructField> =
187                fields.iter().map(|f| arrow_field_to_delta(f)).collect();
188            DeltaDataType::Struct(Box::new(StructType::try_new(delta_fields).unwrap()))
189        }
190
191        DataType::Map(field, _sorted) => {
192            if let DataType::Struct(map_fields) = field.data_type() {
193                let key_type = arrow_type_to_delta(map_fields[0].data_type());
194                let value_type = arrow_type_to_delta(map_fields[1].data_type());
195                DeltaDataType::Map(Box::new(deltalake::kernel::MapType::new(
196                    key_type,
197                    value_type,
198                    map_fields[1].is_nullable(),
199                )))
200            } else {
201                // Fallback
202                DeltaDataType::Primitive(PrimitiveType::String)
203            }
204        }
205
206        // Dictionary encoding - use underlying value type
207        DataType::Dictionary(_, value_type) => arrow_type_to_delta(value_type),
208
209        // Fallback for unsupported types
210        _ => DeltaDataType::Primitive(PrimitiveType::String),
211    }
212}
213
214//#[async_trait]
215//pub trait TraceWriterExt: DeltaTableExt {
216//    /// Background task: compact small files every hour
217//    ///
218//    /// This handles the small files created by frequent flushes
219//    pub async fn start_auto_compaction(
220//        self: Arc<Self>,
221//        interval_minutes: u64,
222//    ) -> Result<(), DataFrameError> {
223//        let mut ticker = interval(Duration::from_secs(interval_minutes * 60));
224//
225//        loop {
226//            ticker.tick().await;
227//
228//            if let Err(e) = self.optimize_table().await {
229//                tracing::warn!("Auto-compaction failed: {}", e);
230//            }
231//        }
232//    }
233//
234//    /// Compact small files + apply Z-ORDER
235//    pub async fn optimize_table(&self) -> Result<(), DataFrameError> {
236//        let table = self.table();
237//        table.update_state().await?;
238//        table
239//            .optimize()
240//            .with_target_size(128 * 1024 * 1024)
241//            .with_type(OptimizeType::ZOrder(vec![
242//                "start_time".to_string(),
243//                "service_name".to_string(),
244//                "trace_id".to_string(),
245//            ])).await?;
246//
247//        Ok(())
248//    }
249//
250//    /// High-throughput streaming writer with real-time query support
251//    ///
252//    /// Strategy:
253//    /// - Flush every 5 seconds (configurable) for query latency
254//    /// - OR flush at 10K spans (smaller batches for responsiveness)
255//    /// - Delta Lake handles small file compaction via auto-optimize
256//    pub async fn start_streaming_writer(
257//        self: Arc<Self>,
258//        rx: mpsc::Receiver<Vec<TraceSpan>>,
259//        flush_interval_secs: u64,
260//        max_batch_size: usize,
261//    ) -> Result<(), DataFrameError> {
262//        let table_url = Url::parse(&format!("{}/{}", self.storage_root(), self.table_name()))?;
263//
264//        let mut table = DeltaTableBuilder::from_url(table_url)?.build()?;
265//
266//        table
267//            .optimize()
268//            .with_target_size(128 * 1024 * 1024)
269//            .with_type(OptimizeType::ZOrder(vec![
270//                "start_time".to_string(),
271//                "service_name".to_string(),
272//                "trace_id".to_string(),
273//            ]))
274//            .await?;
275//
276//        self.streaming_write_loop(
277//            rx,
278//            &mut table,
279//            &mut writer,
280//            flush_interval_secs,
281//            max_batch_size,
282//        )
283//        .await
284//    }
285//
286//    async fn streaming_write_loop(
287//        &self,
288//        mut rx: mpsc::Receiver<Vec<TraceSpan>>,
289//        table: &mut deltalake::DeltaTable,
290//        writer: &mut DeltaWriter,
291//        flush_interval_secs: u64,
292//        max_batch_size: usize,
293//    ) -> Result<(), DataFrameError> {
294//        let mut buffer = Vec::with_capacity(max_batch_size);
295//        let mut flush_timer = interval(Duration::from_secs(flush_interval_secs));
296//        let mut last_flush = Instant::now();
297//
298//        loop {
299//            tokio::select! {
300//                // Receive incoming spans
301//                Some(spans) = rx.recv() => {
302//                    buffer.extend(spans);
303//
304//                    // Flush if buffer reaches threshold
305//                    if buffer.len() >= max_batch_size {
306//                        self.flush_buffer(&mut buffer, writer, table).await?;
307//                        last_flush = Instant::now();
308//                    }
309//                }
310//
311//                // Time-based flush for low-volume periods
312//                _ = flush_timer.tick() => {
313//                    if !buffer.is_empty() && last_flush.elapsed().as_secs() >= flush_interval_secs {
314//                        self.flush_buffer(&mut buffer, writer, table).await?;
315//                        last_flush = Instant::now();
316//                    }
317//                }
318//
319//                // Channel closed - final flush
320//                else => {
321//                    if !buffer.is_empty() {
322//                        self.flush_buffer(&mut buffer, writer, table).await?;
323//                    }
324//                    break;
325//                }
326//            }
327//        }
328//
329//        Ok(())
330//    }
331//
332//    async fn flush_buffer(
333//        &self,
334//        buffer: &mut Vec<TraceSpan>,
335//        table: &mut DeltaTable,
336//    ) -> Result<(), DataFrameError> {
337//        if buffer.is_empty() {
338//            return Ok(());
339//        }
340//
341//        let batch = self.build_batch(std::mem::take(buffer))?;
342//
343//        let properties = WriterProperties::builder()
344//                    .set_compression(Compression::SNAPPY)
345//                    .build();
346//
347//        let builder  = table.clone().write(vec![batch]).with_writer_properties(
348//                WriterProperties::builder()
349//                    .set_compression(Compression::SNAPPY)
350//                    .build(),
351//            )
352//            .await?;
353//
354//        WriteBuilder
355//
356//
357//
358//        deltalake::operations::DeltaOps::from(table.clone())
359//            .write(vec![batch])
360//            .with_writer_properties(
361//                WriterProperties::builder()
362//                    .set_compression(Compression::SNAPPY)
363//                    .build(),
364//            )
365//            .await?;
366//
367//        // Reload table metadata for next write
368//        *table = DeltaTableBuilder::from_url(Url::parse(&format!(
369//            "{}/{}",
370//            self.storage_root(),
371//            self.table_name()
372//        ))?)?
373//        .build()?;
374//
375//        Ok(())
376//    }
377//}
378//
379//}