lance_namespace_datafusion/
schema.rs1use std::any::Any;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use dashmap::DashMap;
9use datafusion::catalog::SchemaProvider;
10use datafusion::datasource::TableProvider;
11use datafusion::error::Result;
12
13use crate::error::to_datafusion_error;
14use crate::namespace_level::NamespaceLevel;
15use lance::datafusion::LanceTableProvider;
16
17#[derive(Debug, Clone)]
22pub struct LanceSchemaProvider {
23 ns_level: NamespaceLevel,
24 tables: DashMap<String, Arc<LanceTableProvider>>,
25}
26
27impl LanceSchemaProvider {
28 pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
29 Ok(Self {
30 ns_level: namespace,
31 tables: DashMap::new(),
32 })
33 }
34
35 async fn load_and_cache_table(
36 &self,
37 table_name: &str,
38 ) -> Result<Option<Arc<dyn TableProvider>>> {
39 let dataset = self
40 .ns_level
41 .load_dataset(table_name)
42 .await
43 .map_err(to_datafusion_error)?;
44 let dataset = Arc::new(dataset);
45 let table_provider = Arc::new(LanceTableProvider::new(dataset, false, false));
46 self.tables
47 .insert(table_name.to_string(), Arc::clone(&table_provider));
48 Ok(Some(table_provider as Arc<dyn TableProvider>))
49 }
50}
51
52#[async_trait]
53impl SchemaProvider for LanceSchemaProvider {
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57
58 fn table_names(&self) -> Vec<String> {
59 self.tables
60 .iter()
61 .map(|entry| entry.key().clone())
62 .collect()
63 }
64
65 async fn table(&self, table_name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
66 if let Some(existing) = self.tables.get(table_name) {
67 let ds = existing.dataset();
69 let latest = ds.latest_version_id().await.map_err(to_datafusion_error)?;
70 let is_stale = latest != ds.version().version;
71 if is_stale {
72 self.tables.remove(table_name);
73 self.load_and_cache_table(table_name).await
74 } else {
75 Ok(Some(Arc::clone(existing.value()) as Arc<dyn TableProvider>))
76 }
77 } else {
78 self.load_and_cache_table(table_name).await
79 }
80 }
81
82 fn table_exist(&self, name: &str) -> bool {
83 self.tables.contains_key(name)
84 }
85}