1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12 AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::ObjectStoreExt;
19use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
20use tracing::info;
21
22use super::{
23 MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path,
24 default_resolve_version, make_staging_manifest_path, write_version_hint,
25};
26use crate::format::{IndexMetadata, Manifest, Transaction};
27use crate::io::commit::{CommitError, CommitHandler};
28
29#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46 async fn get_manifest_location(
47 &self,
48 base_uri: &str,
49 version: u64,
50 ) -> Result<ManifestLocation> {
51 let path = self.get(base_uri, version).await?;
52 let path = Path::parse(&path).map_err(|e| Error::invalid_input(e.to_string()))?;
53 let naming_scheme = detect_naming_scheme_from_path(&path)?;
54 Ok(ManifestLocation {
55 version,
56 path,
57 size: None,
58 naming_scheme,
59 e_tag: None,
60 })
61 }
62
63 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68 async fn get_latest_manifest_location(
74 &self,
75 base_uri: &str,
76 ) -> Result<Option<ManifestLocation>> {
77 self.get_latest_version(base_uri).await.and_then(|res| {
78 res.map(|(version, uri)| {
79 let path = Path::parse(&uri).map_err(|e| Error::invalid_input(e.to_string()))?;
80 let naming_scheme = detect_naming_scheme_from_path(&path)?;
81 Ok(ManifestLocation {
82 version,
83 path,
84 size: None,
85 naming_scheme,
86 e_tag: None,
87 })
88 })
89 .transpose()
90 })
91 }
92
93 #[allow(clippy::too_many_arguments)]
102 async fn put(
103 &self,
104 base_path: &Path,
105 version: u64,
106 staging_path: &Path,
107 size: u64,
108 e_tag: Option<String>,
109 object_store: &dyn OSObjectStore,
110 naming_scheme: ManifestNamingScheme,
111 ) -> Result<ManifestLocation> {
112 self.put_if_not_exists(
116 base_path.as_ref(),
117 version,
118 staging_path.as_ref(),
119 size,
120 e_tag.clone(),
121 )
122 .await?;
123
124 let final_path = naming_scheme.manifest_path(base_path, version);
126 let copied = match object_store.copy(staging_path, &final_path).await {
127 Ok(_) => true,
128 Err(ObjectStoreError::NotFound { .. }) => false,
129 Err(e) => return Err(e.into()),
130 };
131 if copied {
132 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_path.as_ref());
133 }
134
135 let e_tag = if copied && size < 5 * 1024 * 1024 {
137 e_tag
138 } else {
139 let meta = object_store.head(&final_path).await?;
140 meta.e_tag
141 };
142
143 let location = ManifestLocation {
144 version,
145 path: final_path.clone(),
146 size: Some(size),
147 naming_scheme,
148 e_tag: e_tag.clone(),
149 };
150
151 if !copied {
152 return Ok(location);
153 }
154
155 self.put_if_exists(
157 base_path.as_ref(),
158 version,
159 final_path.as_ref(),
160 size,
161 e_tag,
162 )
163 .await?;
164
165 match object_store.delete(staging_path).await {
167 Ok(_) => {}
168 Err(ObjectStoreError::NotFound { .. }) => {}
169 Err(e) => return Err(e.into()),
170 }
171 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
172
173 Ok(location)
174 }
175
176 async fn put_if_not_exists(
178 &self,
179 base_uri: &str,
180 version: u64,
181 path: &str,
182 size: u64,
183 e_tag: Option<String>,
184 ) -> Result<()>;
185
186 async fn put_if_exists(
188 &self,
189 base_uri: &str,
190 version: u64,
191 path: &str,
192 size: u64,
193 e_tag: Option<String>,
194 ) -> Result<()>;
195
196 async fn delete(&self, _base_uri: &str) -> Result<()> {
198 Ok(())
199 }
200}
201
202pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
203 path.filename()
204 .and_then(|name| {
205 ManifestNamingScheme::detect_scheme(name)
206 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
207 })
208 .ok_or_else(|| {
209 Error::corrupt_file(
210 path.clone(),
211 "Path does not follow known manifest naming convention.",
212 )
213 })
214}
215
216#[derive(Debug)]
220pub struct ExternalManifestCommitHandler {
221 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
222}
223
224impl ExternalManifestCommitHandler {
225 #[allow(clippy::too_many_arguments)]
235 async fn finalize_manifest(
236 &self,
237 base_path: &Path,
238 staging_manifest_path: &Path,
239 version: u64,
240 size: u64,
241 e_tag: Option<String>,
242 store: &dyn OSObjectStore,
243 naming_scheme: ManifestNamingScheme,
244 ) -> std::result::Result<ManifestLocation, Error> {
245 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
247
248 let copied = match store
249 .copy(staging_manifest_path, &final_manifest_path)
250 .await
251 {
252 Ok(_) => true,
253 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
255 };
256 if copied {
257 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
258 }
259
260 let e_tag = if copied && size < 5 * 1024 * 1024 {
267 e_tag
268 } else {
269 let meta = store.head(&final_manifest_path).await?;
270 meta.e_tag
271 };
272
273 let location = ManifestLocation {
274 version,
275 path: final_manifest_path,
276 size: Some(size),
277 naming_scheme,
278 e_tag,
279 };
280
281 if !copied {
282 return Ok(location);
283 }
284
285 self.external_manifest_store
287 .put_if_exists(
288 base_path.as_ref(),
289 version,
290 location.path.as_ref(),
291 size,
292 location.e_tag.clone(),
293 )
294 .await?;
295
296 match store.delete(staging_manifest_path).await {
298 Ok(_) => {}
299 Err(ObjectStoreError::NotFound { .. }) => {}
300 Err(e) => return Err(e.into()),
301 }
302 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
303
304 Ok(location)
305 }
306}
307
308#[async_trait]
309impl CommitHandler for ExternalManifestCommitHandler {
310 async fn resolve_latest_location(
311 &self,
312 base_path: &Path,
313 object_store: &ObjectStore,
314 ) -> std::result::Result<ManifestLocation, Error> {
315 let location = self
316 .external_manifest_store
317 .get_latest_manifest_location(base_path.as_ref())
318 .await?;
319
320 match location {
321 Some(ManifestLocation {
322 version,
323 path,
324 size,
325 naming_scheme,
326 e_tag,
327 }) => {
328 if path.extension() == Some(MANIFEST_EXTENSION) {
330 return Ok(ManifestLocation {
331 version,
332 path,
333 size,
334 naming_scheme,
335 e_tag,
336 });
337 }
338
339 let (size, e_tag) = if let Some(size) = size {
340 (size, e_tag)
341 } else {
342 match object_store.inner.head(&path).await {
343 Ok(meta) => (meta.size, meta.e_tag),
344 Err(ObjectStoreError::NotFound { .. }) => {
345 let new_location = self
347 .external_manifest_store
348 .get_manifest_location(base_path.as_ref(), version)
349 .await?;
350 return Ok(new_location);
351 }
352 Err(e) => return Err(e.into()),
353 }
354 };
355
356 let final_location = self
357 .finalize_manifest(
358 base_path,
359 &path,
360 version,
361 size,
362 e_tag.clone(),
363 &object_store.inner,
364 naming_scheme,
365 )
366 .await?;
367
368 Ok(final_location)
369 }
370 None => current_manifest_path(object_store, base_path).await,
373 }
374 }
375
376 async fn resolve_version_location(
377 &self,
378 base_path: &Path,
379 version: u64,
380 object_store: &dyn OSObjectStore,
381 ) -> std::result::Result<ManifestLocation, Error> {
382 let location_res = self
383 .external_manifest_store
384 .get_manifest_location(base_path.as_ref(), version)
385 .await;
386
387 let location = match location_res {
388 Ok(p) => p,
389 Err(Error::NotFound { .. }) => {
391 let path = default_resolve_version(base_path, version, object_store)
392 .await
393 .map_err(|_| Error::not_found(format!("{}@{}", base_path, version)))?
394 .path;
395 match object_store.head(&path).await {
396 Ok(ObjectMeta { size, e_tag, .. }) => {
397 let res = self
398 .external_manifest_store
399 .put_if_not_exists(
400 base_path.as_ref(),
401 version,
402 path.as_ref(),
403 size,
404 e_tag.clone(),
405 )
406 .await;
407 if let Err(e) = res {
408 warn!(
409 "could not update external manifest store during load, with error: {}",
410 e
411 );
412 }
413 let naming_scheme =
414 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
415 return Ok(ManifestLocation {
416 version,
417 path,
418 size: Some(size),
419 naming_scheme,
420 e_tag,
421 });
422 }
423 Err(ObjectStoreError::NotFound { .. }) => {
424 return Err(Error::not_found(path.to_string()));
425 }
426 Err(e) => return Err(e.into()),
427 }
428 }
429 Err(e) => return Err(e),
430 };
431
432 if location.path.extension() == Some(MANIFEST_EXTENSION) {
434 return Ok(location);
435 }
436
437 let naming_scheme =
438 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
439
440 let (size, e_tag) = if let Some(size) = location.size {
441 (size, location.e_tag.clone())
442 } else {
443 let meta = object_store.head(&location.path).await?;
444 (meta.size as u64, meta.e_tag)
445 };
446
447 self.finalize_manifest(
448 base_path,
449 &location.path,
450 version,
451 size,
452 e_tag,
453 object_store,
454 naming_scheme,
455 )
456 .await
457 }
458
459 async fn commit(
460 &self,
461 manifest: &mut Manifest,
462 indices: Option<Vec<IndexMetadata>>,
463 base_path: &Path,
464 object_store: &ObjectStore,
465 manifest_writer: super::ManifestWriter,
466 naming_scheme: ManifestNamingScheme,
467 transaction: Option<Transaction>,
468 ) -> std::result::Result<ManifestLocation, CommitError> {
469 let path = naming_scheme.manifest_path(base_path, manifest.version);
474 let staging_path = make_staging_manifest_path(&path)?;
475 let write_res =
476 manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
477
478 let result = self
480 .external_manifest_store
481 .put(
482 base_path,
483 manifest.version,
484 &staging_path,
485 write_res.size as u64,
486 write_res.e_tag,
487 &object_store.inner,
488 naming_scheme,
489 )
490 .await;
491
492 match result {
493 Ok(location) => {
494 write_version_hint(object_store, base_path, manifest.version).await;
495 Ok(location)
496 }
497 Err(_) => {
498 match object_store.inner.delete(&staging_path).await {
500 Ok(_) => {}
501 Err(ObjectStoreError::NotFound { .. }) => {}
502 Err(e) => return Err(CommitError::OtherError(e.into())),
503 }
504 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
505 Err(CommitError::CommitConflict {})
506 }
507 }
508 }
509
510 async fn delete(&self, base_path: &Path) -> Result<()> {
511 self.external_manifest_store
512 .delete(base_path.as_ref())
513 .await
514 }
515}