1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12 AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
19use snafu::location;
20use tracing::info;
21
22use super::{
23 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
24 ManifestNamingScheme, MANIFEST_EXTENSION,
25};
26use crate::format::{IndexMetadata, Manifest, Transaction};
27use crate::io::commit::{CommitError, CommitHandler};
28
29#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46 async fn get_manifest_location(
47 &self,
48 base_uri: &str,
49 version: u64,
50 ) -> Result<ManifestLocation> {
51 let path = self.get(base_uri, version).await?;
52 let path = Path::from(path);
53 let naming_scheme = detect_naming_scheme_from_path(&path)?;
54 Ok(ManifestLocation {
55 version,
56 path,
57 size: None,
58 naming_scheme,
59 e_tag: None,
60 })
61 }
62
63 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68 async fn get_latest_manifest_location(
74 &self,
75 base_uri: &str,
76 ) -> Result<Option<ManifestLocation>> {
77 self.get_latest_version(base_uri).await.and_then(|res| {
78 res.map(|(version, uri)| {
79 let path = Path::from(uri);
80 let naming_scheme = detect_naming_scheme_from_path(&path)?;
81 Ok(ManifestLocation {
82 version,
83 path,
84 size: None,
85 naming_scheme,
86 e_tag: None,
87 })
88 })
89 .transpose()
90 })
91 }
92
93 async fn put_if_not_exists(
95 &self,
96 base_uri: &str,
97 version: u64,
98 path: &str,
99 size: u64,
100 e_tag: Option<String>,
101 ) -> Result<()>;
102
103 async fn put_if_exists(
105 &self,
106 base_uri: &str,
107 version: u64,
108 path: &str,
109 size: u64,
110 e_tag: Option<String>,
111 ) -> Result<()>;
112
113 async fn delete(&self, _base_uri: &str) -> Result<()> {
115 Ok(())
116 }
117}
118
119pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
120 path.filename()
121 .and_then(|name| {
122 ManifestNamingScheme::detect_scheme(name)
123 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
124 })
125 .ok_or_else(|| {
126 Error::corrupt_file(
127 path.clone(),
128 "Path does not follow known manifest naming convention.",
129 location!(),
130 )
131 })
132}
133
134#[derive(Debug)]
138pub struct ExternalManifestCommitHandler {
139 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
140}
141
142impl ExternalManifestCommitHandler {
143 #[allow(clippy::too_many_arguments)]
153 async fn finalize_manifest(
154 &self,
155 base_path: &Path,
156 staging_manifest_path: &Path,
157 version: u64,
158 size: u64,
159 e_tag: Option<String>,
160 store: &dyn OSObjectStore,
161 naming_scheme: ManifestNamingScheme,
162 ) -> std::result::Result<ManifestLocation, Error> {
163 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
165
166 let copied = match store
167 .copy(staging_manifest_path, &final_manifest_path)
168 .await
169 {
170 Ok(_) => true,
171 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
173 };
174 if copied {
175 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
176 }
177
178 let e_tag = if copied && size < 5 * 1024 * 1024 {
185 e_tag
186 } else {
187 let meta = store.head(&final_manifest_path).await?;
188 meta.e_tag
189 };
190
191 let location = ManifestLocation {
192 version,
193 path: final_manifest_path,
194 size: Some(size),
195 naming_scheme,
196 e_tag,
197 };
198
199 if !copied {
200 return Ok(location);
201 }
202
203 self.external_manifest_store
205 .put_if_exists(
206 base_path.as_ref(),
207 version,
208 location.path.as_ref(),
209 size,
210 location.e_tag.clone(),
211 )
212 .await?;
213
214 match store.delete(staging_manifest_path).await {
216 Ok(_) => {}
217 Err(ObjectStoreError::NotFound { .. }) => {}
218 Err(e) => return Err(e.into()),
219 }
220 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
221
222 Ok(location)
223 }
224}
225
226#[async_trait]
227impl CommitHandler for ExternalManifestCommitHandler {
228 async fn resolve_latest_location(
229 &self,
230 base_path: &Path,
231 object_store: &ObjectStore,
232 ) -> std::result::Result<ManifestLocation, Error> {
233 let location = self
234 .external_manifest_store
235 .get_latest_manifest_location(base_path.as_ref())
236 .await?;
237
238 match location {
239 Some(ManifestLocation {
240 version,
241 path,
242 size,
243 naming_scheme,
244 e_tag,
245 }) => {
246 if path.extension() == Some(MANIFEST_EXTENSION) {
248 return Ok(ManifestLocation {
249 version,
250 path,
251 size,
252 naming_scheme,
253 e_tag,
254 });
255 }
256
257 let (size, e_tag) = if let Some(size) = size {
258 (size, e_tag)
259 } else {
260 match object_store.inner.head(&path).await {
261 Ok(meta) => (meta.size, meta.e_tag),
262 Err(ObjectStoreError::NotFound { .. }) => {
263 let new_location = self
265 .external_manifest_store
266 .get_manifest_location(base_path.as_ref(), version)
267 .await?;
268 return Ok(new_location);
269 }
270 Err(e) => return Err(e.into()),
271 }
272 };
273
274 let final_location = self
275 .finalize_manifest(
276 base_path,
277 &path,
278 version,
279 size,
280 e_tag.clone(),
281 &object_store.inner,
282 naming_scheme,
283 )
284 .await?;
285
286 Ok(final_location)
287 }
288 None => current_manifest_path(object_store, base_path).await,
291 }
292 }
293
294 async fn resolve_version_location(
295 &self,
296 base_path: &Path,
297 version: u64,
298 object_store: &dyn OSObjectStore,
299 ) -> std::result::Result<ManifestLocation, Error> {
300 let location_res = self
301 .external_manifest_store
302 .get_manifest_location(base_path.as_ref(), version)
303 .await;
304
305 let location = match location_res {
306 Ok(p) => p,
307 Err(Error::NotFound { .. }) => {
309 let path = default_resolve_version(base_path, version, object_store)
310 .await
311 .map_err(|_| Error::NotFound {
312 uri: format!("{}@{}", base_path, version),
313 location: location!(),
314 })?
315 .path;
316 match object_store.head(&path).await {
317 Ok(ObjectMeta { size, e_tag, .. }) => {
318 let res = self
319 .external_manifest_store
320 .put_if_not_exists(
321 base_path.as_ref(),
322 version,
323 path.as_ref(),
324 size,
325 e_tag.clone(),
326 )
327 .await;
328 if let Err(e) = res {
329 warn!(
330 "could not update external manifest store during load, with error: {}",
331 e
332 );
333 }
334 let naming_scheme =
335 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
336 return Ok(ManifestLocation {
337 version,
338 path,
339 size: Some(size),
340 naming_scheme,
341 e_tag,
342 });
343 }
344 Err(ObjectStoreError::NotFound { .. }) => {
345 return Err(Error::NotFound {
346 uri: path.to_string(),
347 location: location!(),
348 });
349 }
350 Err(e) => return Err(e.into()),
351 }
352 }
353 Err(e) => return Err(e),
354 };
355
356 if location.path.extension() == Some(MANIFEST_EXTENSION) {
358 return Ok(location);
359 }
360
361 let naming_scheme =
362 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
363
364 let (size, e_tag) = if let Some(size) = location.size {
365 (size, location.e_tag.clone())
366 } else {
367 let meta = object_store.head(&location.path).await?;
368 (meta.size as u64, meta.e_tag)
369 };
370
371 self.finalize_manifest(
372 base_path,
373 &location.path,
374 version,
375 size,
376 e_tag,
377 object_store,
378 naming_scheme,
379 )
380 .await
381 }
382
383 async fn commit(
384 &self,
385 manifest: &mut Manifest,
386 indices: Option<Vec<IndexMetadata>>,
387 base_path: &Path,
388 object_store: &ObjectStore,
389 manifest_writer: super::ManifestWriter,
390 naming_scheme: ManifestNamingScheme,
391 transaction: Option<Transaction>,
392 ) -> std::result::Result<ManifestLocation, CommitError> {
393 let path = naming_scheme.manifest_path(base_path, manifest.version);
398 let staging_path = make_staging_manifest_path(&path)?;
399 let write_res =
400 manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
401
402 let res = self
404 .external_manifest_store
405 .put_if_not_exists(
406 base_path.as_ref(),
407 manifest.version,
408 staging_path.as_ref(),
409 write_res.size as u64,
410 write_res.e_tag.clone(),
411 )
412 .await
413 .map_err(|_| CommitError::CommitConflict {});
414
415 if let Err(err) = res {
416 match object_store.inner.delete(&staging_path).await {
418 Ok(_) => {}
419 Err(ObjectStoreError::NotFound { .. }) => {}
420 Err(e) => return Err(CommitError::OtherError(e.into())),
421 }
422 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
423 return Err(err);
424 }
425
426 Ok(self
427 .finalize_manifest(
428 base_path,
429 &staging_path,
430 manifest.version,
431 write_res.size as u64,
432 write_res.e_tag,
433 &object_store.inner,
434 naming_scheme,
435 )
436 .await?)
437 }
438
439 async fn delete(&self, base_path: &Path) -> Result<()> {
440 self.external_manifest_store
441 .delete(base_path.as_ref())
442 .await
443 }
444}