1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12 AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
19use tracing::info;
20
21use super::{
22 MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path,
23 default_resolve_version, make_staging_manifest_path,
24};
25use crate::format::{IndexMetadata, Manifest, Transaction};
26use crate::io::commit::{CommitError, CommitHandler};
27
28#[async_trait]
41pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
42 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
44
45 async fn get_manifest_location(
46 &self,
47 base_uri: &str,
48 version: u64,
49 ) -> Result<ManifestLocation> {
50 let path = self.get(base_uri, version).await?;
51 let path = Path::from(path);
52 let naming_scheme = detect_naming_scheme_from_path(&path)?;
53 Ok(ManifestLocation {
54 version,
55 path,
56 size: None,
57 naming_scheme,
58 e_tag: None,
59 })
60 }
61
62 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
66
67 async fn get_latest_manifest_location(
73 &self,
74 base_uri: &str,
75 ) -> Result<Option<ManifestLocation>> {
76 self.get_latest_version(base_uri).await.and_then(|res| {
77 res.map(|(version, uri)| {
78 let path = Path::from(uri);
79 let naming_scheme = detect_naming_scheme_from_path(&path)?;
80 Ok(ManifestLocation {
81 version,
82 path,
83 size: None,
84 naming_scheme,
85 e_tag: None,
86 })
87 })
88 .transpose()
89 })
90 }
91
92 #[allow(clippy::too_many_arguments)]
101 async fn put(
102 &self,
103 base_path: &Path,
104 version: u64,
105 staging_path: &Path,
106 size: u64,
107 e_tag: Option<String>,
108 object_store: &dyn OSObjectStore,
109 naming_scheme: ManifestNamingScheme,
110 ) -> Result<ManifestLocation> {
111 self.put_if_not_exists(
115 base_path.as_ref(),
116 version,
117 staging_path.as_ref(),
118 size,
119 e_tag.clone(),
120 )
121 .await?;
122
123 let final_path = naming_scheme.manifest_path(base_path, version);
125 let copied = match object_store.copy(staging_path, &final_path).await {
126 Ok(_) => true,
127 Err(ObjectStoreError::NotFound { .. }) => false,
128 Err(e) => return Err(e.into()),
129 };
130 if copied {
131 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_path.as_ref());
132 }
133
134 let e_tag = if copied && size < 5 * 1024 * 1024 {
136 e_tag
137 } else {
138 let meta = object_store.head(&final_path).await?;
139 meta.e_tag
140 };
141
142 let location = ManifestLocation {
143 version,
144 path: final_path.clone(),
145 size: Some(size),
146 naming_scheme,
147 e_tag: e_tag.clone(),
148 };
149
150 if !copied {
151 return Ok(location);
152 }
153
154 self.put_if_exists(
156 base_path.as_ref(),
157 version,
158 final_path.as_ref(),
159 size,
160 e_tag,
161 )
162 .await?;
163
164 match object_store.delete(staging_path).await {
166 Ok(_) => {}
167 Err(ObjectStoreError::NotFound { .. }) => {}
168 Err(e) => return Err(e.into()),
169 }
170 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
171
172 Ok(location)
173 }
174
175 async fn put_if_not_exists(
177 &self,
178 base_uri: &str,
179 version: u64,
180 path: &str,
181 size: u64,
182 e_tag: Option<String>,
183 ) -> Result<()>;
184
185 async fn put_if_exists(
187 &self,
188 base_uri: &str,
189 version: u64,
190 path: &str,
191 size: u64,
192 e_tag: Option<String>,
193 ) -> Result<()>;
194
195 async fn delete(&self, _base_uri: &str) -> Result<()> {
197 Ok(())
198 }
199}
200
201pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
202 path.filename()
203 .and_then(|name| {
204 ManifestNamingScheme::detect_scheme(name)
205 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
206 })
207 .ok_or_else(|| {
208 Error::corrupt_file(
209 path.clone(),
210 "Path does not follow known manifest naming convention.",
211 )
212 })
213}
214
215#[derive(Debug)]
219pub struct ExternalManifestCommitHandler {
220 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
221}
222
223impl ExternalManifestCommitHandler {
224 #[allow(clippy::too_many_arguments)]
234 async fn finalize_manifest(
235 &self,
236 base_path: &Path,
237 staging_manifest_path: &Path,
238 version: u64,
239 size: u64,
240 e_tag: Option<String>,
241 store: &dyn OSObjectStore,
242 naming_scheme: ManifestNamingScheme,
243 ) -> std::result::Result<ManifestLocation, Error> {
244 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
246
247 let copied = match store
248 .copy(staging_manifest_path, &final_manifest_path)
249 .await
250 {
251 Ok(_) => true,
252 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
254 };
255 if copied {
256 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
257 }
258
259 let e_tag = if copied && size < 5 * 1024 * 1024 {
266 e_tag
267 } else {
268 let meta = store.head(&final_manifest_path).await?;
269 meta.e_tag
270 };
271
272 let location = ManifestLocation {
273 version,
274 path: final_manifest_path,
275 size: Some(size),
276 naming_scheme,
277 e_tag,
278 };
279
280 if !copied {
281 return Ok(location);
282 }
283
284 self.external_manifest_store
286 .put_if_exists(
287 base_path.as_ref(),
288 version,
289 location.path.as_ref(),
290 size,
291 location.e_tag.clone(),
292 )
293 .await?;
294
295 match store.delete(staging_manifest_path).await {
297 Ok(_) => {}
298 Err(ObjectStoreError::NotFound { .. }) => {}
299 Err(e) => return Err(e.into()),
300 }
301 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
302
303 Ok(location)
304 }
305}
306
307#[async_trait]
308impl CommitHandler for ExternalManifestCommitHandler {
309 async fn resolve_latest_location(
310 &self,
311 base_path: &Path,
312 object_store: &ObjectStore,
313 ) -> std::result::Result<ManifestLocation, Error> {
314 let location = self
315 .external_manifest_store
316 .get_latest_manifest_location(base_path.as_ref())
317 .await?;
318
319 match location {
320 Some(ManifestLocation {
321 version,
322 path,
323 size,
324 naming_scheme,
325 e_tag,
326 }) => {
327 if path.extension() == Some(MANIFEST_EXTENSION) {
329 return Ok(ManifestLocation {
330 version,
331 path,
332 size,
333 naming_scheme,
334 e_tag,
335 });
336 }
337
338 let (size, e_tag) = if let Some(size) = size {
339 (size, e_tag)
340 } else {
341 match object_store.inner.head(&path).await {
342 Ok(meta) => (meta.size, meta.e_tag),
343 Err(ObjectStoreError::NotFound { .. }) => {
344 let new_location = self
346 .external_manifest_store
347 .get_manifest_location(base_path.as_ref(), version)
348 .await?;
349 return Ok(new_location);
350 }
351 Err(e) => return Err(e.into()),
352 }
353 };
354
355 let final_location = self
356 .finalize_manifest(
357 base_path,
358 &path,
359 version,
360 size,
361 e_tag.clone(),
362 &object_store.inner,
363 naming_scheme,
364 )
365 .await?;
366
367 Ok(final_location)
368 }
369 None => current_manifest_path(object_store, base_path).await,
372 }
373 }
374
375 async fn resolve_version_location(
376 &self,
377 base_path: &Path,
378 version: u64,
379 object_store: &dyn OSObjectStore,
380 ) -> std::result::Result<ManifestLocation, Error> {
381 let location_res = self
382 .external_manifest_store
383 .get_manifest_location(base_path.as_ref(), version)
384 .await;
385
386 let location = match location_res {
387 Ok(p) => p,
388 Err(Error::NotFound { .. }) => {
390 let path = default_resolve_version(base_path, version, object_store)
391 .await
392 .map_err(|_| Error::not_found(format!("{}@{}", base_path, version)))?
393 .path;
394 match object_store.head(&path).await {
395 Ok(ObjectMeta { size, e_tag, .. }) => {
396 let res = self
397 .external_manifest_store
398 .put_if_not_exists(
399 base_path.as_ref(),
400 version,
401 path.as_ref(),
402 size,
403 e_tag.clone(),
404 )
405 .await;
406 if let Err(e) = res {
407 warn!(
408 "could not update external manifest store during load, with error: {}",
409 e
410 );
411 }
412 let naming_scheme =
413 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
414 return Ok(ManifestLocation {
415 version,
416 path,
417 size: Some(size),
418 naming_scheme,
419 e_tag,
420 });
421 }
422 Err(ObjectStoreError::NotFound { .. }) => {
423 return Err(Error::not_found(path.to_string()));
424 }
425 Err(e) => return Err(e.into()),
426 }
427 }
428 Err(e) => return Err(e),
429 };
430
431 if location.path.extension() == Some(MANIFEST_EXTENSION) {
433 return Ok(location);
434 }
435
436 let naming_scheme =
437 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
438
439 let (size, e_tag) = if let Some(size) = location.size {
440 (size, location.e_tag.clone())
441 } else {
442 let meta = object_store.head(&location.path).await?;
443 (meta.size as u64, meta.e_tag)
444 };
445
446 self.finalize_manifest(
447 base_path,
448 &location.path,
449 version,
450 size,
451 e_tag,
452 object_store,
453 naming_scheme,
454 )
455 .await
456 }
457
458 async fn commit(
459 &self,
460 manifest: &mut Manifest,
461 indices: Option<Vec<IndexMetadata>>,
462 base_path: &Path,
463 object_store: &ObjectStore,
464 manifest_writer: super::ManifestWriter,
465 naming_scheme: ManifestNamingScheme,
466 transaction: Option<Transaction>,
467 ) -> std::result::Result<ManifestLocation, CommitError> {
468 let path = naming_scheme.manifest_path(base_path, manifest.version);
473 let staging_path = make_staging_manifest_path(&path)?;
474 let write_res =
475 manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
476
477 let result = self
479 .external_manifest_store
480 .put(
481 base_path,
482 manifest.version,
483 &staging_path,
484 write_res.size as u64,
485 write_res.e_tag,
486 &object_store.inner,
487 naming_scheme,
488 )
489 .await;
490
491 match result {
492 Ok(location) => Ok(location),
493 Err(_) => {
494 match object_store.inner.delete(&staging_path).await {
496 Ok(_) => {}
497 Err(ObjectStoreError::NotFound { .. }) => {}
498 Err(e) => return Err(CommitError::OtherError(e.into())),
499 }
500 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
501 Err(CommitError::CommitConflict {})
502 }
503 }
504 }
505
506 async fn delete(&self, base_path: &Path) -> Result<()> {
507 self.external_manifest_store
508 .delete(base_path.as_ref())
509 .await
510 }
511}