lance_namespace_impls/connect.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Connect functionality for Lance Namespace implementations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use lance::session::Session;
10use lance_core::{Error, Result};
11use lance_namespace::LanceNamespace;
12
13use crate::context::DynamicContextProvider;
14
15/// Builder for creating Lance namespace connections.
16///
17/// This builder provides a fluent API for configuring and establishing
18/// connections to Lance namespace implementations.
19///
20/// # Examples
21///
22/// ```no_run
23/// # use lance_namespace_impls::ConnectBuilder;
24/// # use std::collections::HashMap;
25/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26/// // Connect to directory implementation
27/// let namespace = ConnectBuilder::new("dir")
28/// .property("root", "/path/to/data")
29/// .property("storage.region", "us-west-2")
30/// .connect()
31/// .await?;
32/// # Ok(())
33/// # }
34/// ```
35///
36/// ```no_run
37/// # use lance_namespace_impls::ConnectBuilder;
38/// # use lance::session::Session;
39/// # use std::sync::Arc;
40/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
41/// // Connect with a shared session
42/// let session = Arc::new(Session::default());
43/// let namespace = ConnectBuilder::new("dir")
44/// .property("root", "/path/to/data")
45/// .session(session)
46/// .connect()
47/// .await?;
48/// # Ok(())
49/// # }
50/// ```
51///
52/// ## With Dynamic Context Provider
53///
54/// ```no_run
55/// # use lance_namespace_impls::{ConnectBuilder, DynamicContextProvider, OperationInfo};
56/// # use std::collections::HashMap;
57/// # use std::sync::Arc;
58/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
59/// #[derive(Debug)]
60/// struct MyProvider;
61///
62/// impl DynamicContextProvider for MyProvider {
63/// fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
64/// let mut ctx = HashMap::new();
65/// ctx.insert("headers.Authorization".to_string(), "Bearer token".to_string());
66/// ctx
67/// }
68/// }
69///
70/// let namespace = ConnectBuilder::new("rest")
71/// .property("uri", "https://api.example.com")
72/// .context_provider(Arc::new(MyProvider))
73/// .connect()
74/// .await?;
75/// # Ok(())
76/// # }
77/// ```
78#[derive(Clone)]
79pub struct ConnectBuilder {
80 impl_name: String,
81 properties: HashMap<String, String>,
82 session: Option<Arc<Session>>,
83 context_provider: Option<Arc<dyn DynamicContextProvider>>,
84}
85
86impl std::fmt::Debug for ConnectBuilder {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("ConnectBuilder")
89 .field("impl_name", &self.impl_name)
90 .field("properties", &self.properties)
91 .field("session", &self.session)
92 .field(
93 "context_provider",
94 &self.context_provider.as_ref().map(|_| "Some(...)"),
95 )
96 .finish()
97 }
98}
99
100impl ConnectBuilder {
101 /// Create a new ConnectBuilder for the specified implementation.
102 ///
103 /// # Arguments
104 ///
105 /// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
106 pub fn new(impl_name: impl Into<String>) -> Self {
107 Self {
108 impl_name: impl_name.into(),
109 properties: HashMap::new(),
110 session: None,
111 context_provider: None,
112 }
113 }
114
115 /// Add a configuration property.
116 ///
117 /// # Arguments
118 ///
119 /// * `key` - Property key
120 /// * `value` - Property value
121 pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
122 self.properties.insert(key.into(), value.into());
123 self
124 }
125
126 /// Add multiple configuration properties.
127 ///
128 /// # Arguments
129 ///
130 /// * `properties` - HashMap of properties to add
131 pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
132 self.properties.extend(properties);
133 self
134 }
135
136 /// Set the Lance session to use for this connection.
137 ///
138 /// When a session is provided, the namespace will reuse the session's
139 /// object store registry, allowing multiple namespaces and datasets
140 /// to share the same underlying storage connections.
141 ///
142 /// # Arguments
143 ///
144 /// * `session` - Arc-wrapped Lance session
145 pub fn session(mut self, session: Arc<Session>) -> Self {
146 self.session = Some(session);
147 self
148 }
149
150 /// Set a dynamic context provider for per-request context.
151 ///
152 /// The provider will be called before each operation to generate
153 /// additional context. For RestNamespace, context keys that start with
154 /// `headers.` are converted to HTTP headers by stripping the prefix.
155 ///
156 /// # Arguments
157 ///
158 /// * `provider` - The context provider implementation
159 pub fn context_provider(mut self, provider: Arc<dyn DynamicContextProvider>) -> Self {
160 self.context_provider = Some(provider);
161 self
162 }
163
164 /// Build and establish the connection to the namespace.
165 ///
166 /// # Returns
167 ///
168 /// Returns a trait object implementing `LanceNamespace`.
169 ///
170 /// # Errors
171 ///
172 /// Returns an error if:
173 /// - The implementation type is not supported
174 /// - Required configuration properties are missing
175 /// - Connection to the backend fails
176 pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
177 match self.impl_name.as_str() {
178 #[cfg(feature = "rest")]
179 "rest" => {
180 // Create REST implementation (REST doesn't use session)
181 let mut builder =
182 crate::rest::RestNamespaceBuilder::from_properties(self.properties)?;
183 if let Some(provider) = self.context_provider {
184 builder = builder.context_provider(provider);
185 }
186 Ok(Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
187 }
188 #[cfg(not(feature = "rest"))]
189 "rest" => Err(Error::Namespace {
190 source: "REST namespace implementation requires 'rest' feature to be enabled"
191 .into(),
192 location: snafu::location!(),
193 }),
194 "dir" => {
195 // Create directory implementation (always available)
196 let mut builder = crate::dir::DirectoryNamespaceBuilder::from_properties(
197 self.properties,
198 self.session,
199 )?;
200 if let Some(provider) = self.context_provider {
201 builder = builder.context_provider(provider);
202 }
203 builder
204 .build()
205 .await
206 .map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
207 }
208 _ => Err(Error::Namespace {
209 source: format!(
210 "Implementation '{}' is not available. Supported: dir{}",
211 self.impl_name,
212 if cfg!(feature = "rest") { ", rest" } else { "" }
213 )
214 .into(),
215 location: snafu::location!(),
216 }),
217 }
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::TempStdDir;
    use lance_namespace::models::ListTablesRequest;
    use std::collections::HashMap;

    /// Issues a `list_tables` request with an empty `id` and asserts that the
    /// namespace reports no tables, proving the connection is usable.
    async fn assert_lists_no_tables(namespace: Arc<dyn LanceNamespace>) {
        let request = {
            let mut request = ListTablesRequest::new();
            request.id = Some(vec![]);
            request
        };
        let response = namespace.list_tables(request).await.unwrap();
        assert_eq!(response.tables.len(), 0);
    }

    /// A directory namespace can be created from just a `root` property.
    #[tokio::test]
    async fn test_connect_builder_basic() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// Properties supplied in bulk are accepted alongside individual ones.
    #[tokio::test]
    async fn test_connect_builder_with_properties() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();

        let mut props = HashMap::new();
        props.insert("storage.option1".to_string(), "value1".to_string());

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .properties(props)
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// A caller-provided session can be attached before connecting.
    #[tokio::test]
    async fn test_connect_builder_with_session() {
        let temp_dir = TempStdDir::default();
        let root = temp_dir.to_str().unwrap();
        let session = Arc::new(Session::default());

        let namespace = ConnectBuilder::new("dir")
            .property("root", root)
            .session(Arc::clone(&session))
            .connect()
            .await
            .unwrap();

        assert_lists_no_tables(namespace).await;
    }

    /// An unknown implementation name yields a "not available" error.
    #[tokio::test]
    async fn test_connect_builder_invalid_impl() {
        let outcome = ConnectBuilder::new("invalid")
            .property("root", "/tmp")
            .connect()
            .await;

        let failure = outcome.err();
        assert!(failure.is_some());
        assert!(failure.unwrap().to_string().contains("not available"));
    }
}