Skip to main content

scouter_dataframe/parquet/tracing/
engine.rs

1use crate::error::TraceEngineError;
2use crate::parquet::tracing::traits::arrow_schema_to_delta;
3use crate::parquet::tracing::traits::attribute_field;
4use crate::parquet::tracing::traits::TraceSchemaExt;
5use crate::storage::ObjectStore;
6use arrow::array::*;
7use arrow::datatypes::*;
8use arrow_array::RecordBatch;
9use datafusion::prelude::SessionContext;
10use deltalake::operations::optimize::OptimizeType;
11use deltalake::DeltaTable;
12use scouter_settings::ObjectStorageSettings;
13use scouter_types::sql::TraceSpan;
14use scouter_types::SpanId;
15use scouter_types::TraceId;
16use scouter_types::{Attribute, SpanEvent, SpanLink};
17use serde_json::Value;
18use std::sync::Arc;
19use tokio::sync::oneshot;
20use tokio::sync::{mpsc, RwLock as AsyncRwLock};
21use tokio::time::{interval, Duration};
22use tracing::{debug, error, info, instrument};
23use url::Url;
/// Name given to the trace-span Delta table when it is created, and the name under
/// which it is registered with the DataFusion `SessionContext`.
const TRACE_SPAN_TABLE_NAME: &str = "trace_spans";
25
26pub enum TableCommand {
27    Write {
28        spans: Vec<TraceSpan>,
29        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
30    },
31    Optimize {
32        respond_to: oneshot::Sender<Result<(), TraceEngineError>>,
33    },
34    Shutdown,
35}
36
37async fn build_url(object_store: &ObjectStore) -> Result<Url, TraceEngineError> {
38    let base_url = object_store.get_base_url()?; // Use existing method
39    Ok(base_url)
40}
41
42#[instrument(skip_all)]
43async fn create_table(table_url: Url, schema: SchemaRef) -> Result<DeltaTable, TraceEngineError> {
44    info!("Creating new Delta table at URL: {}", table_url);
45
46    let table = DeltaTable::try_from_url(table_url).await?;
47
48    let delta_fields = arrow_schema_to_delta(&schema);
49
50    table
51        .create()
52        .with_table_name(TRACE_SPAN_TABLE_NAME)
53        .with_columns(delta_fields)
54        .await
55        .map_err(Into::into)
56}
57
58#[instrument(skip_all)]
59async fn build_or_create_table(
60    object_store: &ObjectStore,
61    schema: SchemaRef,
62) -> Result<DeltaTable, TraceEngineError> {
63    let table_url = build_url(object_store).await?;
64
65    info!("Attempting to load table at URL: {}", table_url);
66
67    // For local filesystem, ensure directory exists
68    if table_url.scheme() == "file" {
69        if let Ok(path) = table_url.to_file_path() {
70            if !path.exists() {
71                info!("Creating directory for local table: {:?}", path);
72                std::fs::create_dir_all(&path)?;
73            }
74        }
75    }
76
77    match DeltaTable::try_from_url(table_url.clone()).await {
78        Ok(table) => {
79            info!("Loaded existing Delta table");
80            Ok(table)
81        }
82        Err(deltalake::DeltaTableError::NotATable(_)) => {
83            info!("Table does not exist, creating new table");
84            create_table(table_url, schema).await
85        }
86        Err(e) => Err(e.into()),
87    }
88}
89/// Core trace span dataframe for high-throughput observability workloads
90///
91/// Design decisions:
92/// - Dictionary encoding for service_name, span_kind (high cardinality, high repetition)
93/// - FixedSizeBinary for IDs (compact representation vs hex strings)
94/// - Nested structures for events/links to maintain relational integrity
95/// - Search blob for full-text queries without parsing JSON
96/// - Attribute shredding foundation for future optimization
97pub struct TraceSpanDBEngine {
98    schema: Arc<Schema>,
99    pub object_store: ObjectStore,
100    table: Arc<AsyncRwLock<DeltaTable>>,
101    pub ctx: Arc<SessionContext>,
102}
103
104impl TraceSchemaExt for TraceSpanDBEngine {}
105
106impl TraceSpanDBEngine {
107    pub async fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, TraceEngineError> {
108        let object_store = ObjectStore::new(storage_settings)?;
109        let schema = Arc::new(Self::create_schema());
110        let delta_table = build_or_create_table(&object_store, schema.clone()).await?;
111        let ctx = object_store.get_session()?;
112        ctx.register_table(TRACE_SPAN_TABLE_NAME, Arc::new(delta_table.clone()))?;
113
114        Ok(TraceSpanDBEngine {
115            schema,
116            object_store,
117            table: Arc::new(AsyncRwLock::new(delta_table)),
118            ctx: Arc::new(ctx),
119        })
120    }
121
122    /// Build a RecordBatch from a vector of TraceSpan records
123    pub fn build_batch(&self, spans: Vec<TraceSpan>) -> Result<RecordBatch, TraceEngineError> {
124        // we need to time the batch building process to identify bottlenecks
125        let start_time = std::time::Instant::now();
126        let mut builder = TraceSpanBatchBuilder::new(self.schema.clone());
127
128        for span in spans {
129            builder.append(&span)?;
130        }
131
132        let record_batch = builder
133            .finish()
134            .inspect_err(|e| error!("Failed to build RecordBatch: {}", e))?;
135
136        let duration = start_time.elapsed();
137        debug!(
138            "Built RecordBatch with {} rows in {:?}",
139            record_batch.num_rows(),
140            duration
141        );
142        Ok(record_batch)
143    }
144
145    /// Helper to write spans directly to the Delta table
146    /// Write will consume current table state and return updated table
147    async fn write_spans(&self, spans: Vec<TraceSpan>) -> Result<(), TraceEngineError> {
148        info!("Engine received write request for {} spans", spans.len());
149
150        let batch = self
151            .build_batch(spans)
152            .inspect_err(|e| error!("failed to build batch: {:?}", e))?;
153        info!("Built batch with {} rows", batch.num_rows());
154
155        let mut table_guard = self.table.write().await;
156        info!("Acquired table write lock");
157
158        // Try to update table state, but ignore if table is freshly created
159        if let Err(e) = table_guard.update_incremental(None).await {
160            // If table is new, it won't have log files yet - this is expected
161            info!("Table update skipped (table may be newly created): {}", e);
162        }
163
164        let current_table = table_guard.clone();
165
166        let updated_table = current_table
167            .write(vec![batch])
168            .with_save_mode(deltalake::protocol::SaveMode::Append)
169            .await?;
170
171        info!("Successfully wrote batch to Delta Lake");
172
173        // Re-register with SessionContext so queries see the new data
174        {
175            self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
176            self.ctx
177                .register_table(TRACE_SPAN_TABLE_NAME, Arc::new(updated_table.clone()))?;
178        }
179
180        *table_guard = updated_table;
181
182        Ok(())
183    }
184
185    async fn optimize_table(&self) -> Result<(), TraceEngineError> {
186        let mut table_guard = self.table.write().await;
187
188        let current_table = table_guard.clone();
189
190        let (updated_table, _metrics) = current_table
191            .optimize()
192            .with_target_size(128 * 1024 * 1024)
193            .with_type(OptimizeType::ZOrder(vec![
194                "start_time".to_string(),
195                "service_name".to_string(),
196            ]))
197            .await?;
198
199        // Re-register with SessionContext
200        self.ctx.deregister_table(TRACE_SPAN_TABLE_NAME)?;
201        self.ctx
202            .register_table(TRACE_SPAN_TABLE_NAME, Arc::new(updated_table.clone()))?;
203
204        *table_guard = updated_table;
205
206        Ok(())
207    }
208
209    #[instrument(skip_all, name = "buffering_actor")]
210    pub fn start_actor(
211        self,
212        compaction_interval_hours: u64,
213    ) -> (mpsc::Sender<TableCommand>, tokio::task::JoinHandle<()>) {
214        let (tx, mut rx) = mpsc::channel::<TableCommand>(100);
215
216        let handle = tokio::spawn(async move {
217            let mut compaction_ticker =
218                interval(Duration::from_secs(compaction_interval_hours * 3600));
219            compaction_ticker.tick().await;
220
221            loop {
222                tokio::select! {
223                    Some(cmd) = rx.recv() => {
224                        match cmd {
225                            TableCommand::Write { spans, respond_to } => {
226                                match self.write_spans(spans).await {
227                                    Ok(_) => {
228                                        let _ = respond_to.send(Ok(()));
229                                    }
230                                    Err(e) => {
231                                        tracing::error!("Write failed: {}", e);
232                                        let _ = respond_to.send(Err(e));
233                                    }
234                                }
235                            }
236                            TableCommand::Optimize { respond_to } => {
237                                match self.optimize_table().await {
238                                    Ok(_) => {
239                                        tracing::info!("Compaction completed");
240                                        let _ = respond_to.send(Ok(()));
241                                    }
242                                    Err(e) => {
243                                        tracing::error!("Compaction failed: {}", e);
244                                        let _ = respond_to.send(Err(e));
245                                    }
246                                }
247                            }
248                            TableCommand::Shutdown => {
249                                tracing::info!("Shutting down table engine");
250                                break;
251                            }
252                        }
253                    }
254                    _ = compaction_ticker.tick() => {
255                        if let Err(e) = self.optimize_table().await {
256                            tracing::error!("Scheduled compaction failed: {}", e);
257                        } else {
258                            tracing::info!("Scheduled compaction completed");
259                        }
260                    }
261                }
262            }
263        });
264
265        (tx, handle)
266    }
267}
268
269/// Efficient builder for converting TraceSpan records into Arrow RecordBatch
270///
271/// Design notes:
272/// - Pre-allocates builders to minimize reallocations
273/// - Uses type-safe builders to catch schema mismatches at compile time
274/// - Handles null values properly for optional fields
275pub struct TraceSpanBatchBuilder {
276    schema: SchemaRef,
277
278    // ID builders
279    trace_id: FixedSizeBinaryBuilder,
280    span_id: FixedSizeBinaryBuilder,
281    parent_span_id: FixedSizeBinaryBuilder,
282    root_span_id: FixedSizeBinaryBuilder,
283
284    // Metadata builders
285    service_name: StringDictionaryBuilder<Int32Type>,
286    span_name: StringBuilder,
287    span_kind: StringDictionaryBuilder<Int8Type>,
288
289    // Time builders
290    start_time: TimestampMicrosecondBuilder,
291    end_time: TimestampMicrosecondBuilder,
292    duration_ms: Int64Builder,
293
294    // Status builders
295    status_code: Int32Builder,
296    status_message: StringBuilder,
297
298    // Hierarchy builders
299    depth: Int32Builder,
300    span_order: Int32Builder,
301    path: ListBuilder<StringBuilder>,
302
303    // Attribute builders
304    attributes: MapBuilder<StringBuilder, StringViewBuilder>,
305
306    // Nested structure builders
307    events: ListBuilder<StructBuilder>,
308    links: ListBuilder<StructBuilder>,
309
310    // Payload builders
311    input: StringViewBuilder,
312    output: StringViewBuilder,
313
314    // Search optimizer
315    search_blob: StringViewBuilder,
316}
317
318impl TraceSpanBatchBuilder {
319    pub fn new(schema: SchemaRef) -> Self {
320        // Initialize all builders
321        let trace_id = FixedSizeBinaryBuilder::new(16);
322        let span_id = FixedSizeBinaryBuilder::new(8);
323        let parent_span_id = FixedSizeBinaryBuilder::new(8);
324        let root_span_id = FixedSizeBinaryBuilder::new(8);
325
326        let service_name = StringDictionaryBuilder::<Int32Type>::new();
327        let span_name = StringBuilder::new();
328        let span_kind = StringDictionaryBuilder::<Int8Type>::new();
329
330        let start_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
331        let end_time = TimestampMicrosecondBuilder::new().with_timezone("UTC");
332        let duration_ms = Int64Builder::new();
333
334        let status_code = Int32Builder::new();
335        let status_message = StringBuilder::new();
336
337        let depth = Int32Builder::new();
338        let span_order = Int32Builder::new();
339        let path = ListBuilder::new(StringBuilder::new());
340
341        let map_field_name = MapFieldNames {
342            entry: "key_value".to_string(),
343            key: "key".to_string(),
344            value: "value".to_string(),
345        };
346        let attributes = MapBuilder::new(
347            Some(map_field_name.clone()),
348            StringBuilder::new(),
349            StringViewBuilder::new(),
350        );
351
352        // Events list builder - must match SpanEvent struct
353        let event_fields = vec![
354            Field::new("name", DataType::Utf8, false),
355            Field::new(
356                "timestamp",
357                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
358                false,
359            ),
360            attribute_field(),
361            Field::new("dropped_attributes_count", DataType::UInt32, false),
362        ];
363
364        let event_struct_builders = vec![
365            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
366            Box::new(TimestampMicrosecondBuilder::new().with_timezone("UTC"))
367                as Box<dyn ArrayBuilder>,
368            Box::new(MapBuilder::new(
369                Some(map_field_name.clone()),
370                StringBuilder::new(),
371                StringViewBuilder::new(),
372            )) as Box<dyn ArrayBuilder>,
373            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
374        ];
375
376        let event_struct_builder = StructBuilder::new(event_fields, event_struct_builders);
377        let events = ListBuilder::new(event_struct_builder);
378
379        // Links list builder - must match SpanLink struct
380        let link_fields = vec![
381            Field::new("trace_id", DataType::FixedSizeBinary(16), false),
382            Field::new("span_id", DataType::FixedSizeBinary(8), false),
383            Field::new("trace_state", DataType::Utf8, false),
384            attribute_field(),
385            Field::new("dropped_attributes_count", DataType::UInt32, false),
386        ];
387
388        let link_struct_builders = vec![
389            Box::new(FixedSizeBinaryBuilder::new(16)) as Box<dyn ArrayBuilder>,
390            Box::new(FixedSizeBinaryBuilder::new(8)) as Box<dyn ArrayBuilder>,
391            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
392            Box::new(MapBuilder::new(
393                Some(map_field_name.clone()),
394                StringBuilder::new(),
395                StringViewBuilder::new(),
396            )) as Box<dyn ArrayBuilder>,
397            Box::new(UInt32Builder::new()) as Box<dyn ArrayBuilder>,
398        ];
399
400        let link_struct_builder = StructBuilder::new(link_fields, link_struct_builders);
401        let links = ListBuilder::new(link_struct_builder);
402
403        let input = StringViewBuilder::new();
404        let output = StringViewBuilder::new();
405        let search_blob = StringViewBuilder::new();
406
407        Self {
408            schema,
409            trace_id,
410            span_id,
411            parent_span_id,
412            root_span_id,
413            service_name,
414            span_name,
415            span_kind,
416            start_time,
417            end_time,
418            duration_ms,
419            status_code,
420            status_message,
421            depth,
422            span_order,
423            path,
424            attributes,
425            events,
426            links,
427            input,
428            output,
429            search_blob,
430        }
431    }
432
433    /// Append a single TraceSpan to the batch
434    pub fn append(&mut self, span: &TraceSpan) -> Result<(), TraceEngineError> {
435        // IDs - convert hex strings to binary
436        Self::append_id_as_bytes(&span.trace_id, &mut self.trace_id, 16).inspect_err(|e| {
437            error!("Failed to append trace_id for span {}: {}", span.span_id, e);
438        })?;
439        Self::append_id_as_bytes(&span.span_id, &mut self.span_id, 8).inspect_err(|e| {
440            error!("Failed to append span_id for span {}: {}", span.span_id, e);
441        })?;
442        Self::append_id_as_bytes(&span.root_span_id, &mut self.root_span_id, 8).inspect_err(
443            |e| {
444                error!(
445                    "Failed to append root_span_id for span {}: {}",
446                    span.span_id, e
447                );
448            },
449        )?;
450
451        match &span.parent_span_id {
452            Some(pid) => Self::append_id_as_bytes(pid, &mut self.parent_span_id, 8)?,
453            None => self.parent_span_id.append_null(),
454        }
455
456        // Metadata
457        self.service_name.append_value(&span.service_name);
458        self.span_name.append_value(&span.span_name);
459
460        match &span.span_kind {
461            Some(kind) => self.span_kind.append_value(kind),
462            None => self.span_kind.append_null(),
463        }
464
465        // Timestamps
466        self.start_time
467            .append_value(span.start_time.timestamp_micros());
468
469        self.end_time.append_value(span.end_time.timestamp_micros());
470
471        self.duration_ms.append_value(span.duration_ms);
472
473        // Status
474        self.status_code.append_value(span.status_code);
475        match &span.status_message {
476            Some(msg) => self.status_message.append_value(msg),
477            None => self.status_message.append_null(),
478        }
479
480        // Hierarchy
481        self.depth.append_value(span.depth);
482        self.span_order.append_value(span.span_order);
483
484        // Path (list of strings)
485        for path_segment in &span.path {
486            self.path.values().append_value(path_segment);
487        }
488        self.path.append(true);
489
490        // Attributes (map)
491        self.append_attributes(&span.attributes).inspect_err(|e| {
492            error!(
493                "Failed to append attributes for span {}: {}",
494                span.span_id, e
495            );
496        })?;
497
498        // Events (nested list of structs)
499        self.append_events(&span.events).inspect_err(|e| {
500            error!("Failed to append events for span {}: {}", span.span_id, e);
501        })?;
502
503        // Links (nested list of structs)
504        self.append_links(&span.links).inspect_err(|e| {
505            error!("Failed to append links for span {}: {}", span.span_id, e);
506        })?;
507
508        // Payloads (potentially large JSON)
509        match &span.input {
510            Some(v) => self.input.append_value(v.to_string()),
511            None => self.input.append_null(),
512        }
513
514        match &span.output {
515            Some(v) => self.output.append_value(v.to_string()),
516            None => self.output.append_null(),
517        }
518
519        // Search blob - concatenate searchable fields
520        let search_text = self.build_search_blob(span);
521        self.search_blob.append_value(search_text);
522
523        Ok(())
524    }
525
526    /// Convert hex string ID to binary and append
527    fn append_id_as_bytes(
528        hex_str: &str,
529        builder: &mut FixedSizeBinaryBuilder,
530        expected_size: usize,
531    ) -> Result<(), TraceEngineError> {
532        match expected_size {
533            16 => {
534                let bytes = TraceId::hex_to_bytes(hex_str)?;
535                builder.append_value(&bytes)?;
536            }
537            8 => {
538                let bytes = SpanId::hex_to_bytes(hex_str)?;
539                builder.append_value(&bytes)?;
540            }
541            _ => {
542                return Err(TraceEngineError::InvalidHexId(
543                    hex_str.to_string(),
544                    "Unsupported ID size".to_string(),
545                ))
546            }
547        }
548        Ok(())
549    }
550
551    /// Append attributes as a map (keys must be sorted)
552    fn append_attributes(&mut self, attributes: &[Attribute]) -> Result<(), TraceEngineError> {
553        for attr in attributes {
554            self.attributes.keys().append_value(&attr.key);
555
556            // Convert serde_json::Value to string for storage
557            let value_str = match &attr.value {
558                Value::String(s) => s.clone(),
559                Value::Null => String::new(),
560                other => other.to_string(),
561            };
562
563            self.attributes.values().append_value(value_str);
564        }
565        self.attributes.append(true)?;
566        Ok(())
567    }
568
569    /// Append events as a list of structs
570    fn append_events(&mut self, events: &[SpanEvent]) -> Result<(), TraceEngineError> {
571        let event_struct = self.events.values();
572        for event in events {
573            // Event name
574            let name_builder = event_struct
575                .field_builder::<StringBuilder>(0)
576                .ok_or_else(|| TraceEngineError::DowncastError("event name builder"))?;
577            name_builder.append_value(&event.name);
578
579            // Event timestamp
580            let time_builder = event_struct
581                .field_builder::<TimestampMicrosecondBuilder>(1)
582                .ok_or_else(|| TraceEngineError::DowncastError("event timestamp builder"))?;
583            time_builder.append_value(event.timestamp.timestamp_micros());
584
585            // Event attributes (nested map) - must be sorted
586            let attr_builder = event_struct
587                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(2)
588                .ok_or_else(|| TraceEngineError::DowncastError("event attributes builder"))?;
589
590            for attr in &event.attributes {
591                attr_builder.keys().append_value(&attr.key);
592                let value_str = match &attr.value {
593                    Value::String(s) => s.clone(),
594                    Value::Null => String::new(),
595                    other => other.to_string(),
596                };
597                attr_builder.values().append_value(value_str);
598            }
599            attr_builder.append(true)?;
600
601            // Dropped attributes count
602            let dropped_builder =
603                event_struct
604                    .field_builder::<UInt32Builder>(3)
605                    .ok_or_else(|| {
606                        TraceEngineError::DowncastError("dropped attributes count builder")
607                    })?;
608            dropped_builder.append_value(event.dropped_attributes_count);
609
610            event_struct.append(true);
611        }
612
613        self.events.append(true);
614        Ok(())
615    }
616
617    /// Append links as a list of structs
618    fn append_links(&mut self, links: &[SpanLink]) -> Result<(), TraceEngineError> {
619        let link_struct = self.links.values();
620
621        for link in links {
622            // Link trace_id
623            let trace_builder = link_struct
624                .field_builder::<FixedSizeBinaryBuilder>(0)
625                .ok_or_else(|| TraceEngineError::DowncastError("link trace_id builder"))?;
626
627            let trace_bytes = TraceId::hex_to_bytes(&link.trace_id).map_err(|e| {
628                TraceEngineError::InvalidHexId(link.trace_id.clone(), e.to_string())
629            })?;
630            trace_builder.append_value(&trace_bytes)?;
631
632            // Link span_id
633            let span_builder = link_struct
634                .field_builder::<FixedSizeBinaryBuilder>(1)
635                .ok_or_else(|| TraceEngineError::DowncastError("link span_id builder"))?;
636
637            let span_bytes = SpanId::hex_to_bytes(&link.span_id)
638                .map_err(|e| TraceEngineError::InvalidHexId(link.span_id.clone(), e.to_string()))?;
639            span_builder.append_value(&span_bytes)?;
640
641            // Link trace_state - SpanLink.trace_state is String (non-nullable), can be empty
642            let state_builder = link_struct
643                .field_builder::<StringBuilder>(2)
644                .ok_or_else(|| TraceEngineError::DowncastError("link trace_state builder"))?;
645            state_builder.append_value(&link.trace_state);
646
647            // Link attributes - must be sorted
648            let attr_builder = link_struct
649                .field_builder::<MapBuilder<StringBuilder, StringViewBuilder>>(3)
650                .ok_or_else(|| TraceEngineError::DowncastError("link attributes builder"))?;
651
652            for attr in &link.attributes {
653                attr_builder.keys().append_value(&attr.key);
654                let value_str = match &attr.value {
655                    Value::String(s) => s.clone(),
656                    Value::Null => String::new(),
657                    other => other.to_string(),
658                };
659                attr_builder.values().append_value(value_str);
660            }
661            attr_builder.append(true)?;
662
663            // Dropped attributes count
664            let dropped_builder =
665                link_struct
666                    .field_builder::<UInt32Builder>(4)
667                    .ok_or_else(|| {
668                        TraceEngineError::DowncastError("link dropped attributes count builder")
669                    })?;
670            dropped_builder.append_value(link.dropped_attributes_count);
671
672            link_struct.append(true);
673        }
674
675        self.links.append(true);
676        Ok(())
677    }
678
679    /// Build a concatenated search string for full-text queries
680    ///
681    /// This avoids parsing JSON during queries by pre-computing searchable text
682    fn build_search_blob(&self, span: &TraceSpan) -> String {
683        let mut search = String::with_capacity(512);
684
685        // Service and span name
686        search.push_str(&span.service_name);
687        search.push(' ');
688        search.push_str(&span.span_name);
689        search.push(' ');
690
691        // Status message
692        if let Some(msg) = &span.status_message {
693            search.push_str(msg);
694            search.push(' ');
695        }
696
697        // Attributes (key:value pairs)
698        for attr in &span.attributes {
699            search.push_str(&attr.key);
700            search.push(':');
701
702            let value_str = match &attr.value {
703                Value::String(s) => s.as_str(),
704                Value::Number(n) => {
705                    search.push_str(&n.to_string());
706                    continue;
707                }
708                Value::Bool(b) => {
709                    search.push_str(&b.to_string());
710                    continue;
711                }
712                Value::Null => continue,
713                _ => {
714                    search.push_str(&attr.value.to_string());
715                    continue;
716                }
717            };
718
719            search.push_str(value_str);
720            search.push(' ');
721        }
722
723        // Event names (for searchability)
724        for event in &span.events {
725            search.push_str(&event.name);
726            search.push(' ');
727        }
728
729        search
730    }
731
732    /// Finalize and build the RecordBatch
733    pub fn finish(mut self) -> Result<RecordBatch, TraceEngineError> {
734        let batch = RecordBatch::try_new(
735            self.schema.clone(),
736            vec![
737                Arc::new(self.trace_id.finish()),
738                Arc::new(self.span_id.finish()),
739                Arc::new(self.parent_span_id.finish()),
740                Arc::new(self.root_span_id.finish()),
741                Arc::new(self.service_name.finish()),
742                Arc::new(self.span_name.finish()),
743                Arc::new(self.span_kind.finish()),
744                Arc::new(self.start_time.finish()),
745                Arc::new(self.end_time.finish()),
746                Arc::new(self.duration_ms.finish()),
747                Arc::new(self.status_code.finish()),
748                Arc::new(self.status_message.finish()),
749                Arc::new(self.depth.finish()),
750                Arc::new(self.span_order.finish()),
751                Arc::new(self.path.finish()),
752                Arc::new(self.attributes.finish()),
753                Arc::new(self.events.finish()),
754                Arc::new(self.links.finish()),
755                Arc::new(self.input.finish()),
756                Arc::new(self.output.finish()),
757                Arc::new(self.search_blob.finish()),
758            ],
759        )?;
760
761        Ok(batch)
762    }
763}