Skip to main content

lance_namespace_datafusion/
schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::any::Any;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use dashmap::DashMap;
9use datafusion::catalog::SchemaProvider;
10use datafusion::datasource::TableProvider;
11use datafusion::error::Result;
12
13use crate::error::to_datafusion_error;
14use crate::namespace_level::NamespaceLevel;
15use lance::datafusion::LanceTableProvider;
16
/// A dynamic [`SchemaProvider`] backed directly by a [`NamespaceLevel`].
///
/// Exposes Lance tables in the namespace as [`LanceTableProvider`] instances,
/// loaded on demand and cached by table name.
///
/// The derived `Clone` copies the `tables` map by value: a clone starts with the
/// current cache entries, but afterwards each instance maintains its own cache.
#[derive(Debug, Clone)]
pub struct LanceSchemaProvider {
    /// Namespace that datasets are loaded from.
    ns_level: NamespaceLevel,
    /// Table providers loaded so far, keyed by table name.
    tables: DashMap<String, Arc<LanceTableProvider>>,
}
26
27impl LanceSchemaProvider {
28    pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
29        Ok(Self {
30            ns_level: namespace,
31            tables: DashMap::new(),
32        })
33    }
34
35    async fn load_and_cache_table(
36        &self,
37        table_name: &str,
38    ) -> Result<Option<Arc<dyn TableProvider>>> {
39        let dataset = self
40            .ns_level
41            .load_dataset(table_name)
42            .await
43            .map_err(to_datafusion_error)?;
44        let dataset = Arc::new(dataset);
45        let table_provider = Arc::new(LanceTableProvider::new(dataset, false, false));
46        self.tables
47            .insert(table_name.to_string(), Arc::clone(&table_provider));
48        Ok(Some(table_provider as Arc<dyn TableProvider>))
49    }
50}
51
52#[async_trait]
53impl SchemaProvider for LanceSchemaProvider {
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn table_names(&self) -> Vec<String> {
59        self.tables
60            .iter()
61            .map(|entry| entry.key().clone())
62            .collect()
63    }
64
65    async fn table(&self, table_name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
66        if let Some(existing) = self.tables.get(table_name) {
67            // Reuse cached provider when still fresh; otherwise reload.
68            let ds = existing.dataset();
69            let latest = ds.latest_version_id().await.map_err(to_datafusion_error)?;
70            let is_stale = latest != ds.version().version;
71            if is_stale {
72                self.tables.remove(table_name);
73                self.load_and_cache_table(table_name).await
74            } else {
75                Ok(Some(Arc::clone(existing.value()) as Arc<dyn TableProvider>))
76            }
77        } else {
78            self.load_and_cache_table(table_name).await
79        }
80    }
81
82    fn table_exist(&self, name: &str) -> bool {
83        self.tables.contains_key(name)
84    }
85}