Skip to main content

lance_namespace_impls/
connect.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connect functionality for Lance Namespace implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use lance::session::Session;
10use lance_core::Result;
11use lance_namespace::LanceNamespace;
12use lance_namespace::error::NamespaceError;
13
14use crate::context::DynamicContextProvider;
15
/// Builder for creating Lance namespace connections.
///
/// This builder provides a fluent API for configuring and establishing
/// connections to Lance namespace implementations. Every configuration
/// method consumes the builder and returns it, so calls can be chained and
/// finished with [`ConnectBuilder::connect`].
///
/// # Examples
///
/// ```no_run
/// # use lance_namespace_impls::ConnectBuilder;
/// # use std::collections::HashMap;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Connect to directory implementation
/// let namespace = ConnectBuilder::new("dir")
///     .property("root", "/path/to/data")
///     .property("storage.region", "us-west-2")
///     .connect()
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// ```no_run
/// # use lance_namespace_impls::ConnectBuilder;
/// # use lance::session::Session;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Connect with a shared session
/// let session = Arc::new(Session::default());
/// let namespace = ConnectBuilder::new("dir")
///     .property("root", "/path/to/data")
///     .session(session)
///     .connect()
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// ## With Dynamic Context Provider
///
/// ```no_run
/// # use lance_namespace_impls::{ConnectBuilder, DynamicContextProvider, OperationInfo};
/// # use std::collections::HashMap;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// #[derive(Debug)]
/// struct MyProvider;
///
/// impl DynamicContextProvider for MyProvider {
///     fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
///         let mut ctx = HashMap::new();
///         ctx.insert("headers.Authorization".to_string(), "Bearer token".to_string());
///         ctx
///     }
/// }
///
/// let namespace = ConnectBuilder::new("rest")
///     .property("uri", "https://api.example.com")
///     .context_provider(Arc::new(MyProvider))
///     .connect()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct ConnectBuilder {
    /// Implementation identifier that `connect` matches on ("dir" or "rest").
    impl_name: String,
    /// Configuration properties handed to the selected implementation's
    /// builder when connecting.
    properties: HashMap<String, String>,
    /// Optional shared Lance session. `connect` passes it to the directory
    /// implementation; the REST branch does not read it.
    session: Option<Arc<Session>>,
    /// Optional provider of per-request context, forwarded to the selected
    /// implementation's builder when present.
    context_provider: Option<Arc<dyn DynamicContextProvider>>,
}
86
87impl std::fmt::Debug for ConnectBuilder {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("ConnectBuilder")
90            .field("impl_name", &self.impl_name)
91            .field("properties", &self.properties)
92            .field("session", &self.session)
93            .field(
94                "context_provider",
95                &self.context_provider.as_ref().map(|_| "Some(...)"),
96            )
97            .finish()
98    }
99}
100
101impl ConnectBuilder {
102    /// Create a new ConnectBuilder for the specified implementation.
103    ///
104    /// # Arguments
105    ///
106    /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
107    pub fn new(impl_name: impl Into<String>) -> Self {
108        Self {
109            impl_name: impl_name.into(),
110            properties: HashMap::new(),
111            session: None,
112            context_provider: None,
113        }
114    }
115
116    /// Add a configuration property.
117    ///
118    /// # Arguments
119    ///
120    /// * `key` - Property key
121    /// * `value` - Property value
122    pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
123        self.properties.insert(key.into(), value.into());
124        self
125    }
126
127    /// Add multiple configuration properties.
128    ///
129    /// # Arguments
130    ///
131    /// * `properties` - HashMap of properties to add
132    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
133        self.properties.extend(properties);
134        self
135    }
136
137    /// Set the Lance session to use for this connection.
138    ///
139    /// When a session is provided, the namespace will reuse the session's
140    /// object store registry, allowing multiple namespaces and datasets
141    /// to share the same underlying storage connections.
142    ///
143    /// # Arguments
144    ///
145    /// * `session` - Arc-wrapped Lance session
146    pub fn session(mut self, session: Arc<Session>) -> Self {
147        self.session = Some(session);
148        self
149    }
150
151    /// Set a dynamic context provider for per-request context.
152    ///
153    /// The provider will be called before each operation to generate
154    /// additional context. For RestNamespace, context keys that start with
155    /// `headers.` are converted to HTTP headers by stripping the prefix.
156    ///
157    /// # Arguments
158    ///
159    /// * `provider` - The context provider implementation
160    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
161        self.context_provider = Some(provider);
162        self
163    }
164
165    /// Build and establish the connection to the namespace.
166    ///
167    /// # Returns
168    ///
169    /// Returns a trait object implementing `LanceNamespace`.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if:
174    /// - The implementation type is not supported
175    /// - Required configuration properties are missing
176    /// - Connection to the backend fails
177    pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
178        match self.impl_name.as_str() {
179            #[cfg(feature = "rest")]
180            "rest" => {
181                // Create REST implementation (REST doesn't use session)
182                let mut builder =
183                    crate::rest::RestNamespaceBuilder::from_properties(self.properties)?;
184                if let Some(provider) = self.context_provider {
185                    builder = builder.context_provider(provider);
186                }
187                Ok(Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
188            }
189            #[cfg(not(feature = "rest"))]
190            "rest" => Err(NamespaceError::Unsupported {
191                message: "REST namespace implementation requires 'rest' feature to be enabled"
192                    .to_string(),
193            }
194            .into()),
195            "dir" => {
196                // Create directory implementation (always available)
197                let mut builder = crate::dir::DirectoryNamespaceBuilder::from_properties(
198                    self.properties,
199                    self.session,
200                )?;
201                if let Some(provider) = self.context_provider {
202                    builder = builder.context_provider(provider);
203                }
204                builder
205                    .build()
206                    .await
207                    .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
208            }
209            _ => Err(NamespaceError::Unsupported {
210                message: format!(
211                    "Implementation '{}' is not available. Supported: dir{}",
212                    self.impl_name,
213                    if cfg!(feature = "rest") { ", rest" } else { "" }
214                ),
215            }
216            .into()),
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::models::ListTablesRequest;
    use std::collections::HashMap;

    /// Lists the tables at the root of `namespace` and asserts there are none.
    ///
    /// Used to check that a freshly connected namespace is usable.
    async fn assert_no_tables(namespace: Arc<dyn LanceNamespace>) {
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);
    }

    /// A "dir" namespace configured with only a root connects and is usable.
    #[tokio::test]
    async fn test_connect_builder_basic() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .connect()
            .await
            .unwrap();

        assert_no_tables(namespace).await;
    }

    /// Properties supplied in bulk are accepted alongside individual ones.
    #[tokio::test]
    async fn test_connect_builder_with_properties() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();
        let mut props = HashMap::new();
        props.insert("storage.option1".to_string(), "value1".to_string());

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .properties(props)
            .connect()
            .await
            .unwrap();

        assert_no_tables(namespace).await;
    }

    /// A "dir" namespace connects when given a shared session.
    #[tokio::test]
    async fn test_connect_builder_with_session() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();
        let session = Arc::new(Session::default());

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .session(Arc::clone(&session))
            .connect()
            .await
            .unwrap();

        assert_no_tables(namespace).await;
    }

    /// An unknown implementation name is rejected with a "not available" error.
    #[tokio::test]
    async fn test_connect_builder_invalid_impl() {
        let builder = ConnectBuilder::new("invalid").property("root", "/tmp");
        let result = builder.connect().await;

        assert!(result.is_err());
        let err = result.err().unwrap();
        let rendered = err.to_string();
        assert!(rendered.contains("not available"));
    }
}