Skip to main content

scouter_dataframe/parquet/tracing/
catalog.rs

1use async_trait::async_trait;
2use dashmap::DashMap;
3use datafusion::catalog::{CatalogProvider, SchemaProvider};
4use datafusion::common::DataFusionError;
5use datafusion::datasource::TableProvider;
6use std::any::Any;
7use std::sync::Arc;
8
/// Flat table registry for the tracing engine's "default" schema.
///
/// Backed by a `DashMap` — `DashMap::insert()` is a single atomic operation,
/// so concurrent readers either see the old `TableProvider` (already planning)
/// or the new one, but never "not found" during the swap window.
#[derive(Debug, Default)]
pub struct TraceSchemaProvider {
    /// Registered table providers, keyed by table name.
    tables: DashMap<String, Arc<dyn TableProvider>>,
}
18
19impl TraceSchemaProvider {
20    pub fn new() -> Self {
21        Self {
22            tables: DashMap::new(),
23        }
24    }
25
26    /// Atomically swap the `TableProvider` for `name`.
27    pub fn swap(&self, name: &str, provider: Arc<dyn TableProvider>) {
28        self.tables.insert(name.to_string(), provider);
29    }
30}
31
32#[async_trait]
33impl SchemaProvider for TraceSchemaProvider {
34    fn as_any(&self) -> &dyn Any {
35        self
36    }
37
38    fn table_names(&self) -> Vec<String> {
39        self.tables.iter().map(|e| e.key().clone()).collect()
40    }
41
42    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
43        Ok(self.tables.get(name).map(|v| Arc::clone(v.value())))
44    }
45
46    fn table_exist(&self, name: &str) -> bool {
47        self.tables.contains_key(name)
48    }
49
50    fn register_table(
51        &self,
52        name: String,
53        table: Arc<dyn TableProvider>,
54    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
55        Ok(self.tables.insert(name, table))
56    }
57
58    fn deregister_table(
59        &self,
60        name: &str,
61    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
62        Ok(self.tables.remove(name).map(|(_, t)| t))
63    }
64}
65
/// Catalog provider for the tracing engine.
///
/// Wraps a single "default" schema backed by `TraceSchemaProvider` (DashMap).
/// Registered on the shared `SessionContext` as `"scouter_tracing"` — when the
/// session is configured with `with_default_catalog_and_schema("scouter_tracing", "default")`,
/// unqualified table names (`trace_spans`, `trace_summaries`) resolve through this
/// catalog's DashMap automatically. No SQL changes are needed in queries.
///
/// # Atomic swap invariant
///
/// Both `TraceSpanDBEngine` and `TraceSummaryDBEngine` call `swap()` to update
/// their respective table providers. A single `DashMap::insert()` is atomic —
/// there is no window where a planning query can see "table not found" between
/// the old provider being removed and the new one being registered.
#[derive(Debug)]
pub struct TraceCatalogProvider {
    /// The only schema this catalog exposes; it is served under the name "default".
    schema: Arc<TraceSchemaProvider>,
}
84
85impl TraceCatalogProvider {
86    pub fn new() -> Self {
87        Self {
88            schema: Arc::new(TraceSchemaProvider::new()),
89        }
90    }
91
92    /// Atomically swap the `TableProvider` for `name`.
93    ///
94    /// Use this instead of `ctx.deregister_table()` + `ctx.register_table()` —
95    /// those two calls leave a window where the table appears absent.
96    pub fn swap(&self, name: &str, provider: Arc<dyn TableProvider>) {
97        self.schema.swap(name, provider);
98    }
99}
100
101impl Default for TraceCatalogProvider {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl CatalogProvider for TraceCatalogProvider {
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111
112    fn schema_names(&self) -> Vec<String> {
113        vec!["default".to_string()]
114    }
115
116    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
117        if name == "default" {
118            Some(Arc::clone(&self.schema) as Arc<dyn SchemaProvider>)
119        } else {
120            None
121        }
122    }
123}