1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12 AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
19use snafu::location;
20use tracing::info;
21
22use super::{
23 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
24 ManifestNamingScheme, MANIFEST_EXTENSION,
25};
26use crate::format::{IndexMetadata, Manifest};
27use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
28
29#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46 async fn get_manifest_location(
47 &self,
48 base_uri: &str,
49 version: u64,
50 ) -> Result<ManifestLocation> {
51 let path = self.get(base_uri, version).await?;
52 let path = Path::from(path);
53 let naming_scheme = detect_naming_scheme_from_path(&path)?;
54 Ok(ManifestLocation {
55 version,
56 path,
57 size: None,
58 naming_scheme,
59 e_tag: None,
60 })
61 }
62
63 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68 async fn get_latest_manifest_location(
74 &self,
75 base_uri: &str,
76 ) -> Result<Option<ManifestLocation>> {
77 self.get_latest_version(base_uri).await.and_then(|res| {
78 res.map(|(version, uri)| {
79 let path = Path::from(uri);
80 let naming_scheme = detect_naming_scheme_from_path(&path)?;
81 Ok(ManifestLocation {
82 version,
83 path,
84 size: None,
85 naming_scheme,
86 e_tag: None,
87 })
88 })
89 .transpose()
90 })
91 }
92
93 async fn put_if_not_exists(
95 &self,
96 base_uri: &str,
97 version: u64,
98 path: &str,
99 size: u64,
100 e_tag: Option<String>,
101 ) -> Result<()>;
102
103 async fn put_if_exists(
105 &self,
106 base_uri: &str,
107 version: u64,
108 path: &str,
109 size: u64,
110 e_tag: Option<String>,
111 ) -> Result<()>;
112
113 async fn delete(&self, _base_uri: &str) -> Result<()> {
115 Ok(())
116 }
117}
118
119pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
120 path.filename()
121 .and_then(|name| {
122 ManifestNamingScheme::detect_scheme(name)
123 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
124 })
125 .ok_or_else(|| {
126 Error::corrupt_file(
127 path.clone(),
128 "Path does not follow known manifest naming convention.",
129 location!(),
130 )
131 })
132}
133
134#[derive(Debug)]
138pub struct ExternalManifestCommitHandler {
139 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
140}
141
142impl ExternalManifestCommitHandler {
143 #[allow(clippy::too_many_arguments)]
153 async fn finalize_manifest(
154 &self,
155 base_path: &Path,
156 staging_manifest_path: &Path,
157 version: u64,
158 size: u64,
159 e_tag: Option<String>,
160 store: &dyn OSObjectStore,
161 naming_scheme: ManifestNamingScheme,
162 ) -> std::result::Result<ManifestLocation, Error> {
163 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
165
166 let copied = match store
167 .copy(staging_manifest_path, &final_manifest_path)
168 .await
169 {
170 Ok(_) => true,
171 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
173 };
174 if copied {
175 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
176 }
177
178 let e_tag = if copied && size < 5 * 1024 * 1024 {
185 e_tag
186 } else {
187 let meta = store.head(&final_manifest_path).await?;
188 meta.e_tag
189 };
190
191 let location = ManifestLocation {
192 version,
193 path: final_manifest_path,
194 size: Some(size),
195 naming_scheme,
196 e_tag,
197 };
198
199 if !copied {
200 return Ok(location);
201 }
202
203 self.external_manifest_store
205 .put_if_exists(
206 base_path.as_ref(),
207 version,
208 location.path.as_ref(),
209 size,
210 location.e_tag.clone(),
211 )
212 .await?;
213
214 match store.delete(staging_manifest_path).await {
216 Ok(_) => {}
217 Err(ObjectStoreError::NotFound { .. }) => {}
218 Err(e) => return Err(e.into()),
219 }
220 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
221
222 Ok(location)
223 }
224}
225
226#[async_trait]
227impl CommitHandler for ExternalManifestCommitHandler {
228 async fn resolve_latest_location(
229 &self,
230 base_path: &Path,
231 object_store: &ObjectStore,
232 ) -> std::result::Result<ManifestLocation, Error> {
233 let location = self
234 .external_manifest_store
235 .get_latest_manifest_location(base_path.as_ref())
236 .await?;
237
238 match location {
239 Some(ManifestLocation {
240 version,
241 path,
242 size,
243 naming_scheme,
244 e_tag,
245 }) => {
246 if path.extension() == Some(MANIFEST_EXTENSION) {
248 return Ok(ManifestLocation {
249 version,
250 path,
251 size,
252 naming_scheme,
253 e_tag,
254 });
255 }
256
257 let (size, e_tag) = if let Some(size) = size {
258 (size, e_tag)
259 } else {
260 let meta = object_store.inner.head(&path).await?;
261 (meta.size, meta.e_tag)
262 };
263
264 let final_location = self
265 .finalize_manifest(
266 base_path,
267 &path,
268 version,
269 size,
270 e_tag.clone(),
271 &object_store.inner,
272 naming_scheme,
273 )
274 .await?;
275
276 Ok(final_location)
277 }
278 None => current_manifest_path(object_store, base_path).await,
281 }
282 }
283
284 async fn resolve_version_location(
285 &self,
286 base_path: &Path,
287 version: u64,
288 object_store: &dyn OSObjectStore,
289 ) -> std::result::Result<ManifestLocation, Error> {
290 let location_res = self
291 .external_manifest_store
292 .get_manifest_location(base_path.as_ref(), version)
293 .await;
294
295 let location = match location_res {
296 Ok(p) => p,
297 Err(Error::NotFound { .. }) => {
299 let path = default_resolve_version(base_path, version, object_store)
300 .await
301 .map_err(|_| Error::NotFound {
302 uri: format!("{}@{}", base_path, version),
303 location: location!(),
304 })?
305 .path;
306 match object_store.head(&path).await {
307 Ok(ObjectMeta { size, e_tag, .. }) => {
308 let res = self
309 .external_manifest_store
310 .put_if_not_exists(
311 base_path.as_ref(),
312 version,
313 path.as_ref(),
314 size,
315 e_tag.clone(),
316 )
317 .await;
318 if let Err(e) = res {
319 warn!(
320 "could not update external manifest store during load, with error: {}",
321 e
322 );
323 }
324 let naming_scheme =
325 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
326 return Ok(ManifestLocation {
327 version,
328 path,
329 size: Some(size),
330 naming_scheme,
331 e_tag,
332 });
333 }
334 Err(ObjectStoreError::NotFound { .. }) => {
335 return Err(Error::NotFound {
336 uri: path.to_string(),
337 location: location!(),
338 });
339 }
340 Err(e) => return Err(e.into()),
341 }
342 }
343 Err(e) => return Err(e),
344 };
345
346 if location.path.extension() == Some(MANIFEST_EXTENSION) {
348 return Ok(location);
349 }
350
351 let naming_scheme =
352 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
353
354 let (size, e_tag) = if let Some(size) = location.size {
355 (size, location.e_tag.clone())
356 } else {
357 let meta = object_store.head(&location.path).await?;
358 (meta.size as u64, meta.e_tag)
359 };
360
361 self.finalize_manifest(
362 base_path,
363 &location.path,
364 version,
365 size,
366 e_tag,
367 object_store,
368 naming_scheme,
369 )
370 .await
371 }
372
373 async fn commit(
374 &self,
375 manifest: &mut Manifest,
376 indices: Option<Vec<IndexMetadata>>,
377 base_path: &Path,
378 object_store: &ObjectStore,
379 manifest_writer: ManifestWriter,
380 naming_scheme: ManifestNamingScheme,
381 ) -> std::result::Result<ManifestLocation, CommitError> {
382 let path = naming_scheme.manifest_path(base_path, manifest.version);
387 let staging_path = make_staging_manifest_path(&path)?;
388 let write_res = manifest_writer(object_store, manifest, indices, &staging_path).await?;
389
390 let res = self
392 .external_manifest_store
393 .put_if_not_exists(
394 base_path.as_ref(),
395 manifest.version,
396 staging_path.as_ref(),
397 write_res.size as u64,
398 write_res.e_tag.clone(),
399 )
400 .await
401 .map_err(|_| CommitError::CommitConflict {});
402
403 if let Err(err) = res {
404 match object_store.inner.delete(&staging_path).await {
406 Ok(_) => {}
407 Err(ObjectStoreError::NotFound { .. }) => {}
408 Err(e) => return Err(CommitError::OtherError(e.into())),
409 }
410 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
411 return Err(err);
412 }
413
414 Ok(self
415 .finalize_manifest(
416 base_path,
417 &staging_path,
418 manifest.version,
419 write_res.size as u64,
420 write_res.e_tag,
421 &object_store.inner,
422 naming_scheme,
423 )
424 .await?)
425 }
426
427 async fn delete(&self, base_path: &Path) -> Result<()> {
428 self.external_manifest_store
429 .delete(base_path.as_ref())
430 .await
431 }
432}