Skip to main content

lance_namespace_impls/
connect.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connect functionality for Lance Namespace implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use lance::session::Session;
10use lance_core::{Error, Result};
11use lance_namespace::LanceNamespace;
12
13/// Builder for creating Lance namespace connections.
14///
15/// This builder provides a fluent API for configuring and establishing
16/// connections to Lance namespace implementations.
17///
18/// # Examples
19///
20/// ```no_run
21/// # use lance_namespace_impls::ConnectBuilder;
22/// # use std::collections::HashMap;
23/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24/// // Connect to directory implementation
25/// let namespace = ConnectBuilder::new("dir")
26///     .property("root", "/path/to/data")
27///     .property("storage.region", "us-west-2")
28///     .connect()
29///     .await?;
30/// # Ok(())
31/// # }
32/// ```
33///
34/// ```no_run
35/// # use lance_namespace_impls::ConnectBuilder;
36/// # use lance::session::Session;
37/// # use std::sync::Arc;
38/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
39/// // Connect with a shared session
40/// let session = Arc::new(Session::default());
41/// let namespace = ConnectBuilder::new("dir")
42///     .property("root", "/path/to/data")
43///     .session(session)
44///     .connect()
45///     .await?;
46/// # Ok(())
47/// # }
48/// ```
49#[derive(Debug, Clone)]
50pub struct ConnectBuilder {
51    impl_name: String,
52    properties: HashMap<String, String>,
53    session: Option<Arc<Session>>,
54}
55
56impl ConnectBuilder {
57    /// Create a new ConnectBuilder for the specified implementation.
58    ///
59    /// # Arguments
60    ///
61    /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
62    pub fn new(impl_name: impl Into<String>) -> Self {
63        Self {
64            impl_name: impl_name.into(),
65            properties: HashMap::new(),
66            session: None,
67        }
68    }
69
70    /// Add a configuration property.
71    ///
72    /// # Arguments
73    ///
74    /// * `key` - Property key
75    /// * `value` - Property value
76    pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
77        self.properties.insert(key.into(), value.into());
78        self
79    }
80
81    /// Add multiple configuration properties.
82    ///
83    /// # Arguments
84    ///
85    /// * `properties` - HashMap of properties to add
86    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
87        self.properties.extend(properties);
88        self
89    }
90
91    /// Set the Lance session to use for this connection.
92    ///
93    /// When a session is provided, the namespace will reuse the session's
94    /// object store registry, allowing multiple namespaces and datasets
95    /// to share the same underlying storage connections.
96    ///
97    /// # Arguments
98    ///
99    /// * `session` - Arc-wrapped Lance session
100    pub fn session(mut self, session: Arc<Session>) -> Self {
101        self.session = Some(session);
102        self
103    }
104
105    /// Build and establish the connection to the namespace.
106    ///
107    /// # Returns
108    ///
109    /// Returns a trait object implementing `LanceNamespace`.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if:
114    /// - The implementation type is not supported
115    /// - Required configuration properties are missing
116    /// - Connection to the backend fails
117    pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
118        match self.impl_name.as_str() {
119            #[cfg(feature = "rest")]
120            "rest" => {
121                // Create REST implementation (REST doesn't use session)
122                crate::rest::RestNamespaceBuilder::from_properties(self.properties)
123                    .map(|builder| Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
124            }
125            #[cfg(not(feature = "rest"))]
126            "rest" => Err(Error::Namespace {
127                source: "REST namespace implementation requires 'rest' feature to be enabled"
128                    .into(),
129                location: snafu::location!(),
130            }),
131            "dir" => {
132                // Create directory implementation (always available)
133                crate::dir::DirectoryNamespaceBuilder::from_properties(
134                    self.properties,
135                    self.session,
136                )?
137                .build()
138                .await
139                .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
140            }
141            _ => Err(Error::Namespace {
142                source: format!(
143                    "Implementation '{}' is not available. Supported: dir{}",
144                    self.impl_name,
145                    if cfg!(feature = "rest") { ", rest" } else { "" }
146                )
147                .into(),
148                location: snafu::location!(),
149            }),
150        }
151    }
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157    use lance_core::utils::tempfile::TempStdDir;
158    use lance_namespace::models::ListTablesRequest;
159    use std::collections::HashMap;
160
161    #[tokio::test]
162    async fn test_connect_builder_basic() {
163        let temp_dir = TempStdDir::default();
164
165        let namespace = ConnectBuilder::new("dir")
166            .property("root", temp_dir.to_str().unwrap())
167            .connect()
168            .await
169            .unwrap();
170
171        // Verify we can use the namespace
172        let mut request = ListTablesRequest::new();
173        request.id = Some(vec![]);
174        let response = namespace.list_tables(request).await.unwrap();
175        assert_eq!(response.tables.len(), 0);
176    }
177
178    #[tokio::test]
179    async fn test_connect_builder_with_properties() {
180        let temp_dir = TempStdDir::default();
181        let mut props = HashMap::new();
182        props.insert("storage.option1".to_string(), "value1".to_string());
183
184        let namespace = ConnectBuilder::new("dir")
185            .property("root", temp_dir.to_str().unwrap())
186            .properties(props)
187            .connect()
188            .await
189            .unwrap();
190
191        // Verify we can use the namespace
192        let mut request = ListTablesRequest::new();
193        request.id = Some(vec![]);
194        let response = namespace.list_tables(request).await.unwrap();
195        assert_eq!(response.tables.len(), 0);
196    }
197
198    #[tokio::test]
199    async fn test_connect_builder_with_session() {
200        let temp_dir = TempStdDir::default();
201        let session = Arc::new(Session::default());
202
203        let namespace = ConnectBuilder::new("dir")
204            .property("root", temp_dir.to_str().unwrap())
205            .session(session.clone())
206            .connect()
207            .await
208            .unwrap();
209
210        // Verify we can use the namespace
211        let mut request = ListTablesRequest::new();
212        request.id = Some(vec![]);
213        let response = namespace.list_tables(request).await.unwrap();
214        assert_eq!(response.tables.len(), 0);
215    }
216
217    #[tokio::test]
218    async fn test_connect_builder_invalid_impl() {
219        let result = ConnectBuilder::new("invalid")
220            .property("root", "/tmp")
221            .connect()
222            .await;
223
224        assert!(result.is_err());
225        let err = result.err().unwrap();
226        assert!(err.to_string().contains("not available"));
227    }
228}