// Source file: lance_namespace_impls/connect.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Connect functionality for Lance Namespace implementations.

use std::collections::HashMap;
use std::sync::Arc;

use lance::session::Session;
use lance_core::{Error, Result};
use lance_namespace::LanceNamespace;

use crate::context::DynamicContextProvider;

15/// Builder for creating Lance namespace connections.
16///
17/// This builder provides a fluent API for configuring and establishing
18/// connections to Lance namespace implementations.
19///
20/// # Examples
21///
22/// ```no_run
23/// # use lance_namespace_impls::ConnectBuilder;
24/// # use std::collections::HashMap;
25/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26/// // Connect to directory implementation
27/// let namespace = ConnectBuilder::new("dir")
28///     .property("root", "/path/to/data")
29///     .property("storage.region", "us-west-2")
30///     .connect()
31///     .await?;
32/// # Ok(())
33/// # }
34/// ```
35///
36/// ```no_run
37/// # use lance_namespace_impls::ConnectBuilder;
38/// # use lance::session::Session;
39/// # use std::sync::Arc;
40/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41/// // Connect with a shared session
42/// let session = Arc::new(Session::default());
43/// let namespace = ConnectBuilder::new("dir")
44///     .property("root", "/path/to/data")
45///     .session(session)
46///     .connect()
47///     .await?;
48/// # Ok(())
49/// # }
50/// ```
51///
52/// ## With Dynamic Context Provider
53///
54/// ```no_run
55/// # use lance_namespace_impls::{ConnectBuilder, DynamicContextProvider, OperationInfo};
56/// # use std::collections::HashMap;
57/// # use std::sync::Arc;
58/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59/// #[derive(Debug)]
60/// struct MyProvider;
61///
62/// impl DynamicContextProvider for MyProvider {
63///     fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
64///         let mut ctx = HashMap::new();
65///         ctx.insert("headers.Authorization".to_string(), "Bearer token".to_string());
66///         ctx
67///     }
68/// }
69///
70/// let namespace = ConnectBuilder::new("rest")
71///     .property("uri", "https://api.example.com")
72///     .context_provider(Arc::new(MyProvider))
73///     .connect()
74///     .await?;
75/// # Ok(())
76/// # }
77/// ```
78#[derive(Clone)]
79pub struct ConnectBuilder {
80    impl_name: String,
81    properties: HashMap<String, String>,
82    session: Option<Arc<Session>>,
83    context_provider: Option<Arc<dyn DynamicContextProvider>>,
84}
85
86impl std::fmt::Debug for ConnectBuilder {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("ConnectBuilder")
89            .field("impl_name", &self.impl_name)
90            .field("properties", &self.properties)
91            .field("session", &self.session)
92            .field(
93                "context_provider",
94                &self.context_provider.as_ref().map(|_| "Some(...)"),
95            )
96            .finish()
97    }
98}
99
100impl ConnectBuilder {
101    /// Create a new ConnectBuilder for the specified implementation.
102    ///
103    /// # Arguments
104    ///
105    /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
106    pub fn new(impl_name: impl Into<String>) -> Self {
107        Self {
108            impl_name: impl_name.into(),
109            properties: HashMap::new(),
110            session: None,
111            context_provider: None,
112        }
113    }
114
115    /// Add a configuration property.
116    ///
117    /// # Arguments
118    ///
119    /// * `key` - Property key
120    /// * `value` - Property value
121    pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
122        self.properties.insert(key.into(), value.into());
123        self
124    }
125
126    /// Add multiple configuration properties.
127    ///
128    /// # Arguments
129    ///
130    /// * `properties` - HashMap of properties to add
131    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
132        self.properties.extend(properties);
133        self
134    }
135
136    /// Set the Lance session to use for this connection.
137    ///
138    /// When a session is provided, the namespace will reuse the session's
139    /// object store registry, allowing multiple namespaces and datasets
140    /// to share the same underlying storage connections.
141    ///
142    /// # Arguments
143    ///
144    /// * `session` - Arc-wrapped Lance session
145    pub fn session(mut self, session: Arc<Session>) -> Self {
146        self.session = Some(session);
147        self
148    }
149
150    /// Set a dynamic context provider for per-request context.
151    ///
152    /// The provider will be called before each operation to generate
153    /// additional context. For RestNamespace, context keys that start with
154    /// `headers.` are converted to HTTP headers by stripping the prefix.
155    ///
156    /// # Arguments
157    ///
158    /// * `provider` - The context provider implementation
159    pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
160        self.context_provider = Some(provider);
161        self
162    }
163
164    /// Build and establish the connection to the namespace.
165    ///
166    /// # Returns
167    ///
168    /// Returns a trait object implementing `LanceNamespace`.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if:
173    /// - The implementation type is not supported
174    /// - Required configuration properties are missing
175    /// - Connection to the backend fails
176    pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
177        match self.impl_name.as_str() {
178            #[cfg(feature = "rest")]
179            "rest" => {
180                // Create REST implementation (REST doesn't use session)
181                let mut builder =
182                    crate::rest::RestNamespaceBuilder::from_properties(self.properties)?;
183                if let Some(provider) = self.context_provider {
184                    builder = builder.context_provider(provider);
185                }
186                Ok(Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
187            }
188            #[cfg(not(feature = "rest"))]
189            "rest" => Err(Error::Namespace {
190                source: "REST namespace implementation requires 'rest' feature to be enabled"
191                    .into(),
192                location: snafu::location!(),
193            }),
194            "dir" => {
195                // Create directory implementation (always available)
196                let mut builder = crate::dir::DirectoryNamespaceBuilder::from_properties(
197                    self.properties,
198                    self.session,
199                )?;
200                if let Some(provider) = self.context_provider {
201                    builder = builder.context_provider(provider);
202                }
203                builder
204                    .build()
205                    .await
206                    .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
207            }
208            _ => Err(Error::Namespace {
209                source: format!(
210                    "Implementation '{}' is not available. Supported: dir{}",
211                    self.impl_name,
212                    if cfg!(feature = "rest") { ", rest" } else { "" }
213                )
214                .into(),
215                location: snafu::location!(),
216            }),
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::models::ListTablesRequest;
    use std::collections::HashMap;

    /// Lists the root namespace and asserts that the call succeeds with no tables.
    ///
    /// Each successful-connection test points at a fresh temp directory, so the
    /// root must be listable and empty.
    async fn assert_root_has_no_tables(namespace: &Arc<dyn LanceNamespace>) {
        let mut request = ListTablesRequest::new();
        request.id = Some(vec![]);
        let response = namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);
    }

    #[tokio::test]
    async fn test_connect_builder_basic() {
        let temp_dir = TempStdDir::default();

        let namespace = ConnectBuilder::new("dir")
            .property("root", temp_dir.to_str().unwrap())
            .connect()
            .await
            .unwrap();

        assert_root_has_no_tables(&namespace).await;
    }

    #[tokio::test]
    async fn test_connect_builder_with_properties() {
        let temp_dir = TempStdDir::default();
        let mut props = HashMap::new();
        props.insert("storage.option1".to_string(), "value1".to_string());

        let namespace = ConnectBuilder::new("dir")
            .property("root", temp_dir.to_str().unwrap())
            .properties(props)
            .connect()
            .await
            .unwrap();

        assert_root_has_no_tables(&namespace).await;
    }

    #[tokio::test]
    async fn test_connect_builder_with_session() {
        let temp_dir = TempStdDir::default();
        let session = Arc::new(Session::default());

        let namespace = ConnectBuilder::new("dir")
            .property("root", temp_dir.to_str().unwrap())
            .session(session.clone())
            .connect()
            .await
            .unwrap();

        assert_root_has_no_tables(&namespace).await;
    }

    #[tokio::test]
    async fn test_connect_builder_invalid_impl() {
        let result = ConnectBuilder::new("invalid")
            .property("root", "/tmp")
            .connect()
            .await;

        // Match instead of `unwrap_err` so the success type need not implement `Debug`.
        let err = match result {
            Ok(_) => panic!("connecting to an unknown implementation should fail"),
            Err(err) => err,
        };
        let message = err.to_string();
        assert!(message.contains("not available"), "unexpected error: {}", message);
        // The rejected name and the list of supported implementations are reported.
        assert!(message.contains("'invalid'"), "unexpected error: {}", message);
        assert!(message.contains("Supported: dir"), "unexpected error: {}", message);
    }
}