scouter_dataframe/parquet/tracing/
catalog.rs1use async_trait::async_trait;
2use dashmap::DashMap;
3use datafusion::catalog::{CatalogProvider, SchemaProvider};
4use datafusion::common::DataFusionError;
5use datafusion::datasource::TableProvider;
6use std::any::Any;
7use std::sync::Arc;
8
9#[derive(Debug, Default)]
15pub struct TraceSchemaProvider {
16 tables: DashMap<String, Arc<dyn TableProvider>>,
17}
18
19impl TraceSchemaProvider {
20 pub fn new() -> Self {
21 Self {
22 tables: DashMap::new(),
23 }
24 }
25
26 pub fn swap(&self, name: &str, provider: Arc<dyn TableProvider>) {
28 self.tables.insert(name.to_string(), provider);
29 }
30}
31
32#[async_trait]
33impl SchemaProvider for TraceSchemaProvider {
34 fn as_any(&self) -> &dyn Any {
35 self
36 }
37
38 fn table_names(&self) -> Vec<String> {
39 self.tables.iter().map(|e| e.key().clone()).collect()
40 }
41
42 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
43 Ok(self.tables.get(name).map(|v| Arc::clone(v.value())))
44 }
45
46 fn table_exist(&self, name: &str) -> bool {
47 self.tables.contains_key(name)
48 }
49
50 fn register_table(
51 &self,
52 name: String,
53 table: Arc<dyn TableProvider>,
54 ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
55 Ok(self.tables.insert(name, table))
56 }
57
58 fn deregister_table(
59 &self,
60 name: &str,
61 ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
62 Ok(self.tables.remove(name).map(|(_, t)| t))
63 }
64}
65
66#[derive(Debug)]
81pub struct TraceCatalogProvider {
82 schema: Arc<TraceSchemaProvider>,
83}
84
85impl TraceCatalogProvider {
86 pub fn new() -> Self {
87 Self {
88 schema: Arc::new(TraceSchemaProvider::new()),
89 }
90 }
91
92 pub fn swap(&self, name: &str, provider: Arc<dyn TableProvider>) {
97 self.schema.swap(name, provider);
98 }
99}
100
101impl Default for TraceCatalogProvider {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl CatalogProvider for TraceCatalogProvider {
108 fn as_any(&self) -> &dyn Any {
109 self
110 }
111
112 fn schema_names(&self) -> Vec<String> {
113 vec!["default".to_string()]
114 }
115
116 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
117 if name == "default" {
118 Some(Arc::clone(&self.schema) as Arc<dyn SchemaProvider>)
119 } else {
120 None
121 }
122 }
123}