1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::ObjectStore;
13use log::warn;
14use object_store::ObjectMeta;
15use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
16use snafu::location;
17
18use super::{
19 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
20 ManifestNamingScheme, MANIFEST_EXTENSION,
21};
22use crate::format::{Index, Manifest};
23use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
24
25#[async_trait]
38pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
39 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
41
42 async fn get_manifest_location(
43 &self,
44 base_uri: &str,
45 version: u64,
46 ) -> Result<ManifestLocation> {
47 let path = self.get(base_uri, version).await?;
48 let path = Path::from(path);
49 let naming_scheme = detect_naming_scheme_from_path(&path)?;
50 Ok(ManifestLocation {
51 version,
52 path,
53 size: None,
54 naming_scheme,
55 })
56 }
57
58 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
62
63 async fn get_latest_manifest_location(
69 &self,
70 base_uri: &str,
71 ) -> Result<Option<ManifestLocation>> {
72 self.get_latest_version(base_uri).await.and_then(|res| {
73 res.map(|(version, uri)| {
74 let path = Path::from(uri);
75 let naming_scheme = detect_naming_scheme_from_path(&path)?;
76 Ok(ManifestLocation {
77 version,
78 path,
79 size: None,
80 naming_scheme,
81 })
82 })
83 .transpose()
84 })
85 }
86
87 async fn put_if_not_exists(
89 &self,
90 base_uri: &str,
91 version: u64,
92 path: &str,
93 size: u64,
94 ) -> Result<()>;
95
96 async fn put_if_exists(
98 &self,
99 base_uri: &str,
100 version: u64,
101 path: &str,
102 size: u64,
103 ) -> Result<()>;
104
105 async fn delete(&self, _base_uri: &str) -> Result<()> {
107 Ok(())
108 }
109}
110
111pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
112 path.filename()
113 .and_then(|name| {
114 ManifestNamingScheme::detect_scheme(name)
115 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
116 })
117 .ok_or_else(|| {
118 Error::corrupt_file(
119 path.clone(),
120 "Path does not follow known manifest naming convention.",
121 location!(),
122 )
123 })
124}
125
126#[derive(Debug)]
130pub struct ExternalManifestCommitHandler {
131 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
132}
133
134impl ExternalManifestCommitHandler {
135 async fn finalize_manifest(
145 &self,
146 base_path: &Path,
147 staging_manifest_path: &Path,
148 version: u64,
149 size: u64,
150 store: &dyn OSObjectStore,
151 naming_scheme: ManifestNamingScheme,
152 ) -> std::result::Result<Path, Error> {
153 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
155 match store
156 .copy(staging_manifest_path, &final_manifest_path)
157 .await
158 {
159 Ok(_) => {}
160 Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), Err(e) => return Err(e.into()),
162 };
163
164 self.external_manifest_store
166 .put_if_exists(
167 base_path.as_ref(),
168 version,
169 final_manifest_path.as_ref(),
170 size,
171 )
172 .await?;
173
174 match store.delete(staging_manifest_path).await {
176 Ok(_) => {}
177 Err(ObjectStoreError::NotFound { .. }) => {}
178 Err(e) => return Err(e.into()),
179 }
180
181 Ok(final_manifest_path)
182 }
183}
184
185#[async_trait]
186impl CommitHandler for ExternalManifestCommitHandler {
187 async fn resolve_latest_location(
188 &self,
189 base_path: &Path,
190 object_store: &ObjectStore,
191 ) -> std::result::Result<ManifestLocation, Error> {
192 let location = self
193 .external_manifest_store
194 .get_latest_manifest_location(base_path.as_ref())
195 .await?;
196
197 match location {
198 Some(ManifestLocation {
199 version,
200 path,
201 size,
202 naming_scheme,
203 }) => {
204 if path.extension() == Some(MANIFEST_EXTENSION) {
206 return Ok(ManifestLocation {
207 version,
208 path,
209 size,
210 naming_scheme,
211 });
212 }
213
214 let size = if let Some(size) = size {
215 size
216 } else {
217 object_store.size(&path).await? as u64
218 };
219
220 let final_path = self
221 .finalize_manifest(
222 base_path,
223 &path,
224 version,
225 size,
226 &object_store.inner,
227 naming_scheme,
228 )
229 .await?;
230
231 Ok(ManifestLocation {
232 version,
233 path: final_path,
234 size: Some(size),
235 naming_scheme,
236 })
237 }
238 None => current_manifest_path(object_store, base_path).await,
241 }
242 }
243
244 async fn resolve_latest_version(
246 &self,
247 base_path: &Path,
248 object_store: &ObjectStore,
249 ) -> std::result::Result<Path, Error> {
250 self.resolve_latest_location(base_path, object_store)
251 .await
252 .map(|l| l.path)
253 }
254
255 async fn resolve_latest_version_id(
256 &self,
257 base_path: &Path,
258 object_store: &ObjectStore,
259 ) -> std::result::Result<u64, Error> {
260 let version = self
261 .external_manifest_store
262 .get_latest_version(base_path.as_ref())
263 .await?;
264
265 match version {
266 Some((version, _)) => Ok(version),
267 None => Ok(current_manifest_path(object_store, base_path)
268 .await?
269 .version),
270 }
271 }
272
273 async fn resolve_version(
274 &self,
275 base_path: &Path,
276 version: u64,
277 object_store: &dyn OSObjectStore,
278 ) -> std::result::Result<Path, Error> {
279 Ok(self
280 .resolve_version_location(base_path, version, object_store)
281 .await?
282 .path)
283 }
284
285 async fn resolve_version_location(
286 &self,
287 base_path: &Path,
288 version: u64,
289 object_store: &dyn OSObjectStore,
290 ) -> std::result::Result<ManifestLocation, Error> {
291 let location_res = self
292 .external_manifest_store
293 .get_manifest_location(base_path.as_ref(), version)
294 .await;
295
296 let location = match location_res {
297 Ok(p) => p,
298 Err(Error::NotFound { .. }) => {
300 let path = default_resolve_version(base_path, version, object_store)
301 .await
302 .map_err(|_| Error::NotFound {
303 uri: format!("{}@{}", base_path, version),
304 location: location!(),
305 })?
306 .path;
307 match object_store.head(&path).await {
308 Ok(ObjectMeta { size, .. }) => {
309 let res = self
310 .external_manifest_store
311 .put_if_not_exists(
312 base_path.as_ref(),
313 version,
314 path.as_ref(),
315 size as u64,
316 )
317 .await;
318 if let Err(e) = res {
319 warn!(
320 "could not update external manifest store during load, with error: {}",
321 e
322 );
323 }
324 let naming_scheme =
325 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
326 return Ok(ManifestLocation {
327 version,
328 path,
329 size: Some(size as u64),
330 naming_scheme,
331 });
332 }
333 Err(ObjectStoreError::NotFound { .. }) => {
334 return Err(Error::NotFound {
335 uri: path.to_string(),
336 location: location!(),
337 });
338 }
339 Err(e) => return Err(e.into()),
340 }
341 }
342 Err(e) => return Err(e),
343 };
344
345 if location.path.extension() == Some(MANIFEST_EXTENSION) {
347 return Ok(location);
348 }
349
350 let naming_scheme =
351 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
352
353 let size = if let Some(size) = location.size {
354 size
355 } else {
356 object_store.head(&location.path).await?.size as u64
357 };
358
359 let new_path = self
360 .finalize_manifest(
361 base_path,
362 &location.path,
363 version,
364 size,
365 object_store,
366 naming_scheme,
367 )
368 .await?;
369
370 Ok(ManifestLocation {
371 path: new_path,
372 ..location
373 })
374 }
375
376 async fn commit(
377 &self,
378 manifest: &mut Manifest,
379 indices: Option<Vec<Index>>,
380 base_path: &Path,
381 object_store: &ObjectStore,
382 manifest_writer: ManifestWriter,
383 naming_scheme: ManifestNamingScheme,
384 ) -> std::result::Result<Path, CommitError> {
385 let path = naming_scheme.manifest_path(base_path, manifest.version);
390 let staging_path = make_staging_manifest_path(&path)?;
391 let size = manifest_writer(object_store, manifest, indices, &staging_path).await?;
392
393 let res = self
395 .external_manifest_store
396 .put_if_not_exists(
397 base_path.as_ref(),
398 manifest.version,
399 staging_path.as_ref(),
400 size,
401 )
402 .await
403 .map_err(|_| CommitError::CommitConflict {});
404
405 if let Err(err) = res {
406 match object_store.inner.delete(&staging_path).await {
408 Ok(_) => {}
409 Err(ObjectStoreError::NotFound { .. }) => {}
410 Err(e) => return Err(CommitError::OtherError(e.into())),
411 }
412 return Err(err);
413 }
414
415 Ok(self
416 .finalize_manifest(
417 base_path,
418 &staging_path,
419 manifest.version,
420 size,
421 &object_store.inner,
422 naming_scheme,
423 )
424 .await?)
425 }
426
427 async fn delete(&self, base_path: &Path) -> Result<()> {
428 self.external_manifest_store
429 .delete(base_path.as_ref())
430 .await
431 }
432}