lance_namespace_impls/connect.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connect functionality for Lance Namespace implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use lance::session::Session;
10use lance_core::Result;
11use lance_namespace::LanceNamespace;
12use lance_namespace::error::NamespaceError;
13
14use crate::context::DynamicContextProvider;
15
16/// Builder for creating Lance namespace connections.
17///
18/// This builder provides a fluent API for configuring and establishing
19/// connections to Lance namespace implementations.
20///
21/// # Examples
22///
23/// ```no_run
24/// # use lance_namespace_impls::ConnectBuilder;
25/// # use std::collections::HashMap;
26/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27/// // Connect to directory implementation
28/// let namespace = ConnectBuilder::new("dir")
29/// .property("root", "/path/to/data")
30/// .property("storage.region", "us-west-2")
31/// .connect()
32/// .await?;
33/// # Ok(())
34/// # }
35/// ```
36///
37/// ```no_run
38/// # use lance_namespace_impls::ConnectBuilder;
39/// # use lance::session::Session;
40/// # use std::sync::Arc;
41/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
42/// // Connect with a shared session
43/// let session = Arc::new(Session::default());
44/// let namespace = ConnectBuilder::new("dir")
45/// .property("root", "/path/to/data")
46/// .session(session)
47/// .connect()
48/// .await?;
49/// # Ok(())
50/// # }
51/// ```
52///
53/// ## With Dynamic Context Provider
54///
55/// ```no_run
56/// # use lance_namespace_impls::{ConnectBuilder, DynamicContextProvider, OperationInfo};
57/// # use std::collections::HashMap;
58/// # use std::sync::Arc;
59/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60/// #[derive(Debug)]
61/// struct MyProvider;
62///
63/// impl DynamicContextProvider for MyProvider {
64/// fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
65/// let mut ctx = HashMap::new();
66/// ctx.insert("headers.Authorization".to_string(), "Bearer token".to_string());
67/// ctx
68/// }
69/// }
70///
71/// let namespace = ConnectBuilder::new("rest")
72/// .property("uri", "https://api.example.com")
73/// .context_provider(Arc::new(MyProvider))
74/// .connect()
75/// .await?;
76/// # Ok(())
77/// # }
78/// ```
79#[derive(Clone)]
80pub struct ConnectBuilder {
81 impl_name: String,
82 properties: HashMap<String, String>,
83 session: Option<Arc<Session>>,
84 context_provider: Option<Arc<dyn DynamicContextProvider>>,
85}
86
87impl std::fmt::Debug for ConnectBuilder {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("ConnectBuilder")
90 .field("impl_name", &self.impl_name)
91 .field("properties", &self.properties)
92 .field("session", &self.session)
93 .field(
94 "context_provider",
95 &self.context_provider.as_ref().map(|_| "Some(...)"),
96 )
97 .finish()
98 }
99}
100
101impl ConnectBuilder {
102 /// Create a new ConnectBuilder for the specified implementation.
103 ///
104 /// # Arguments
105 ///
106 /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
107 pub fn new(impl_name: impl Into<String>) -> Self {
108 Self {
109 impl_name: impl_name.into(),
110 properties: HashMap::new(),
111 session: None,
112 context_provider: None,
113 }
114 }
115
116 /// Add a configuration property.
117 ///
118 /// # Arguments
119 ///
120 /// * `key` - Property key
121 /// * `value` - Property value
122 pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
123 self.properties.insert(key.into(), value.into());
124 self
125 }
126
127 /// Add multiple configuration properties.
128 ///
129 /// # Arguments
130 ///
131 /// * `properties` - HashMap of properties to add
132 pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
133 self.properties.extend(properties);
134 self
135 }
136
137 /// Set the Lance session to use for this connection.
138 ///
139 /// When a session is provided, the namespace will reuse the session's
140 /// object store registry, allowing multiple namespaces and datasets
141 /// to share the same underlying storage connections.
142 ///
143 /// # Arguments
144 ///
145 /// * `session` - Arc-wrapped Lance session
146 pub fn session(mut self, session: Arc<Session>) -> Self {
147 self.session = Some(session);
148 self
149 }
150
151 /// Set a dynamic context provider for per-request context.
152 ///
153 /// The provider will be called before each operation to generate
154 /// additional context. For RestNamespace, context keys that start with
155 /// `headers.` are converted to HTTP headers by stripping the prefix.
156 ///
157 /// # Arguments
158 ///
159 /// * `provider` - The context provider implementation
160 pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
161 self.context_provider = Some(provider);
162 self
163 }
164
165 /// Build and establish the connection to the namespace.
166 ///
167 /// # Returns
168 ///
169 /// Returns a trait object implementing `LanceNamespace`.
170 ///
171 /// # Errors
172 ///
173 /// Returns an error if:
174 /// - The implementation type is not supported
175 /// - Required configuration properties are missing
176 /// - Connection to the backend fails
177 pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
178 match self.impl_name.as_str() {
179 #[cfg(feature = "rest")]
180 "rest" => {
181 // Create REST implementation (REST doesn't use session)
182 let mut builder =
183 crate::rest::RestNamespaceBuilder::from_properties(self.properties)?;
184 if let Some(provider) = self.context_provider {
185 builder = builder.context_provider(provider);
186 }
187 Ok(Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
188 }
189 #[cfg(not(feature = "rest"))]
190 "rest" => Err(NamespaceError::Unsupported {
191 message: "REST namespace implementation requires 'rest' feature to be enabled"
192 .to_string(),
193 }
194 .into()),
195 "dir" => {
196 // Create directory implementation (always available)
197 let mut builder = crate::dir::DirectoryNamespaceBuilder::from_properties(
198 self.properties,
199 self.session,
200 )?;
201 if let Some(provider) = self.context_provider {
202 builder = builder.context_provider(provider);
203 }
204 builder
205 .build()
206 .await
207 .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
208 }
209 _ => Err(NamespaceError::Unsupported {
210 message: format!(
211 "Implementation '{}' is not available. Supported: dir{}",
212 self.impl_name,
213 if cfg!(feature = "rest") { ", rest" } else { "" }
214 ),
215 }
216 .into()),
217 }
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use lance_core::utils::tempfile::TempStdDir;
225 use lance_namespace::models::ListTablesRequest;
226 use std::collections::HashMap;
227
228 #[tokio::test]
229 async fn test_connect_builder_basic() {
230 let temp_dir = TempStdDir::default();
231
232 let namespace = ConnectBuilder::new("dir")
233 .property("root", temp_dir.to_str().unwrap())
234 .connect()
235 .await
236 .unwrap();
237
238 // Verify we can use the namespace
239 let mut request = ListTablesRequest::new();
240 request.id = Some(vec![]);
241 let response = namespace.list_tables(request).await.unwrap();
242 assert_eq!(response.tables.len(), 0);
243 }
244
245 #[tokio::test]
246 async fn test_connect_builder_with_properties() {
247 let temp_dir = TempStdDir::default();
248 let mut props = HashMap::new();
249 props.insert("storage.option1".to_string(), "value1".to_string());
250
251 let namespace = ConnectBuilder::new("dir")
252 .property("root", temp_dir.to_str().unwrap())
253 .properties(props)
254 .connect()
255 .await
256 .unwrap();
257
258 // Verify we can use the namespace
259 let mut request = ListTablesRequest::new();
260 request.id = Some(vec![]);
261 let response = namespace.list_tables(request).await.unwrap();
262 assert_eq!(response.tables.len(), 0);
263 }
264
265 #[tokio::test]
266 async fn test_connect_builder_with_session() {
267 let temp_dir = TempStdDir::default();
268 let session = Arc::new(Session::default());
269
270 let namespace = ConnectBuilder::new("dir")
271 .property("root", temp_dir.to_str().unwrap())
272 .session(session.clone())
273 .connect()
274 .await
275 .unwrap();
276
277 // Verify we can use the namespace
278 let mut request = ListTablesRequest::new();
279 request.id = Some(vec![]);
280 let response = namespace.list_tables(request).await.unwrap();
281 assert_eq!(response.tables.len(), 0);
282 }
283
284 #[tokio::test]
285 async fn test_connect_builder_invalid_impl() {
286 let result = ConnectBuilder::new("invalid")
287 .property("root", "/tmp")
288 .connect()
289 .await;
290
291 assert!(result.is_err());
292 let err = result.err().unwrap();
293 assert!(err.to_string().contains("not available"));
294 }
295}