Skip to main content

lance_namespace_impls/
connect.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connect functionality for Lance Namespace implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use lance::session::Session;
10use lance_core::{Error, Result};
11use lance_namespace::LanceNamespace;
12
13use crate::context::DynamicContextProvider;
14
15/// Builder for creating Lance namespace connections.
16///
17/// This builder provides a fluent API for configuring and establishing
18/// connections to Lance namespace implementations.
19///
20/// # Examples
21///
22/// ```no_run
23/// # use lance_namespace_impls::ConnectBuilder;
24/// # use std::collections::HashMap;
25/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26/// // Connect to directory implementation
27/// let namespace = ConnectBuilder::new("dir")
28///     .property("root", "/path/to/data")
29///     .property("storage.region", "us-west-2")
30///     .connect()
31///     .await?;
32/// # Ok(())
33/// # }
34/// ```
35///
36/// ```no_run
37/// # use lance_namespace_impls::ConnectBuilder;
38/// # use lance::session::Session;
39/// # use std::sync::Arc;
40/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41/// // Connect with a shared session
42/// let session = Arc::new(Session::default());
43/// let namespace = ConnectBuilder::new("dir")
44///     .property("root", "/path/to/data")
45///     .session(session)
46///     .connect()
47///     .await?;
48/// # Ok(())
49/// # }
50/// ```
51///
52/// ## With Dynamic Context Provider
53///
54/// ```no_run
55/// # use lance_namespace_impls::{ConnectBuilder, DynamicContextProvider, OperationInfo};
56/// # use std::collections::HashMap;
57/// # use std::sync::Arc;
58/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59/// #[derive(Debug)]
60/// struct MyProvider;
61///
62/// impl DynamicContextProvider for MyProvider {
63///     fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
64///         let mut ctx = HashMap::new();
65///         ctx.insert("headers.Authorization".to_string(), "Bearer token".to_string());
66///         ctx
67///     }
68/// }
69///
70/// let namespace = ConnectBuilder::new("rest")
71///     .property("uri", "https://api.example.com")
72///     .context_provider(Arc::new(MyProvider))
73///     .connect()
74///     .await?;
75/// # Ok(())
76/// # }
77/// ```
78#[derive(Clone)]
79pub struct ConnectBuilder {
80    impl_name: String,
81    properties: HashMap<String, String>,
82    session: Option<Arc<Session>>,
83    context_provider: Option<Arc<dyn DynamicContextProvider>>,
84}
85
86impl std::fmt::Debug for ConnectBuilder {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("ConnectBuilder")
89            .field("impl_name", &self.impl_name)
90            .field("properties", &self.properties)
91            .field("session", &self.session)
92            .field(
93                "context_provider",
94                &self.context_provider.as_ref().map(|_| "Some(...)"),
95            )
96            .finish()
97    }
98}
99
100impl ConnectBuilder {
101    /// Create a new ConnectBuilder for the specified implementation.
102    ///
103    /// # Arguments
104    ///
105    /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
106    pub fn new(impl_name: impl Into<String>) -> Self {
107        Self {
108            impl_name: impl_name.into(),
109            properties: HashMap::new(),
110            session: None,
111            context_provider: None,
112        }
113    }
114
115    /// Add a configuration property.
116    ///
117    /// # Arguments
118    ///
119    /// * `key` - Property key
120    /// * `value` - Property value
121    pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
122        self.properties.insert(key.into(), value.into());
123        self
124    }
125
126    /// Add multiple configuration properties.
127    ///
128    /// # Arguments
129    ///
130    /// * `properties` - HashMap of properties to add
131    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
132        self.properties.extend(properties);
133        self
134    }
135
136    /// Set the Lance session to use for this connection.
137    ///
138    /// When a session is provided, the namespace will reuse the session's
139    /// object store registry, allowing multiple namespaces and datasets
140    /// to share the same underlying storage connections.
141    ///
142    /// # Arguments
143    ///
144    /// * `session` - Arc-wrapped Lance session
145    pub fn session(mut self, session: Arc<Session>) -> Self {
146        self.session = Some(session);
147        self
148    }
149
150    /// Set a dynamic context provider for per-request context.
151    ///
152    /// The provider will be called before each operation to generate
153    /// additional context. For RestNamespace, context keys that start with
154    /// `headers.` are converted to HTTP headers by stripping the prefix.
155    ///
156    /// # Arguments
157    ///
158    /// * `provider` - The context provider implementation
159    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
160        self.context_provider = Some(provider);
161        self
162    }
163
164    /// Build and establish the connection to the namespace.
165    ///
166    /// # Returns
167    ///
168    /// Returns a trait object implementing `LanceNamespace`.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if:
173    /// - The implementation type is not supported
174    /// - Required configuration properties are missing
175    /// - Connection to the backend fails
176    pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
177        match self.impl_name.as_str() {
178            #[cfg(feature = "rest")]
179            "rest" => {
180                // Create REST implementation (REST doesn't use session)
181                let mut builder =
182                    crate::rest::RestNamespaceBuilder::from_properties(self.properties)?;
183                if let Some(provider) = self.context_provider {
184                    builder = builder.context_provider(provider);
185                }
186                Ok(Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
187            }
188            #[cfg(not(feature = "rest"))]
189            "rest" => Err(Error::namespace_source(
190                "REST namespace implementation requires 'rest' feature to be enabled".into(),
191            )),
192            "dir" => {
193                // Create directory implementation (always available)
194                let mut builder = crate::dir::DirectoryNamespaceBuilder::from_properties(
195                    self.properties,
196                    self.session,
197                )?;
198                if let Some(provider) = self.context_provider {
199                    builder = builder.context_provider(provider);
200                }
201                builder
202                    .build()
203                    .await
204                    .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
205            }
206            _ => Err(Error::namespace_source(
207                format!(
208                    "Implementation '{}' is not available. Supported: dir{}",
209                    self.impl_name,
210                    if cfg!(feature = "rest") { ", rest" } else { "" }
211                )
212                .into(),
213            )),
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::models::ListTablesRequest;
    use std::collections::HashMap;

    /// Issues a `list_tables` request with an empty id and asserts that the
    /// response contains no tables, proving the namespace is usable.
    async fn assert_lists_no_tables(namespace: Arc<dyn LanceNamespace>) {
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);
    }

    /// A `dir` namespace rooted at a fresh temp directory connects and lists
    /// no tables.
    #[tokio::test]
    async fn test_connect_builder_basic() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// Properties supplied in bulk are accepted alongside individually set ones.
    #[tokio::test]
    async fn test_connect_builder_with_properties() {
        let temp_dir = TempStdDir::default();
        let props = HashMap::from([("storage.option1".to_string(), "value1".to_string())]);

        let namespace = ConnectBuilder::new("dir")
            .property("root", temp_dir.to_str().unwrap())
            .properties(props)
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// Connecting with an explicitly provided session still yields a usable
    /// namespace.
    #[tokio::test]
    async fn test_connect_builder_with_session() {
        let temp_dir = TempStdDir::default();
        let session = Arc::new(Session::default());

        let namespace = ConnectBuilder::new("dir")
            .property("root", temp_dir.to_str().unwrap())
            .session(session.clone())
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// An unknown implementation name is rejected with a "not available" error.
    #[tokio::test]
    async fn test_connect_builder_invalid_impl() {
        let outcome = ConnectBuilder::new("invalid")
            .property("root", "/tmp")
            .connect()
            .await;

        assert!(outcome.is_err());
        let message = outcome.err().unwrap().to_string();
        assert!(message.contains("not available"));
    }
}