1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12 AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
19use snafu::location;
20use tracing::info;
21
22use super::{
23 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
24 ManifestNamingScheme, MANIFEST_EXTENSION,
25};
26use crate::format::{IndexMetadata, Manifest, Transaction};
27use crate::io::commit::{CommitError, CommitHandler};
28
29#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46 async fn get_manifest_location(
47 &self,
48 base_uri: &str,
49 version: u64,
50 ) -> Result<ManifestLocation> {
51 let path = self.get(base_uri, version).await?;
52 let path = Path::from(path);
53 let naming_scheme = detect_naming_scheme_from_path(&path)?;
54 Ok(ManifestLocation {
55 version,
56 path,
57 size: None,
58 naming_scheme,
59 e_tag: None,
60 })
61 }
62
63 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68 async fn get_latest_manifest_location(
74 &self,
75 base_uri: &str,
76 ) -> Result<Option<ManifestLocation>> {
77 self.get_latest_version(base_uri).await.and_then(|res| {
78 res.map(|(version, uri)| {
79 let path = Path::from(uri);
80 let naming_scheme = detect_naming_scheme_from_path(&path)?;
81 Ok(ManifestLocation {
82 version,
83 path,
84 size: None,
85 naming_scheme,
86 e_tag: None,
87 })
88 })
89 .transpose()
90 })
91 }
92
93 async fn put_if_not_exists(
95 &self,
96 base_uri: &str,
97 version: u64,
98 path: &str,
99 size: u64,
100 e_tag: Option<String>,
101 ) -> Result<()>;
102
103 async fn put_if_exists(
105 &self,
106 base_uri: &str,
107 version: u64,
108 path: &str,
109 size: u64,
110 e_tag: Option<String>,
111 ) -> Result<()>;
112
113 async fn delete(&self, _base_uri: &str) -> Result<()> {
115 Ok(())
116 }
117}
118
119pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
120 path.filename()
121 .and_then(|name| {
122 ManifestNamingScheme::detect_scheme(name)
123 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
124 })
125 .ok_or_else(|| {
126 Error::corrupt_file(
127 path.clone(),
128 "Path does not follow known manifest naming convention.",
129 location!(),
130 )
131 })
132}
133
134#[derive(Debug)]
138pub struct ExternalManifestCommitHandler {
139 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
140}
141
142impl ExternalManifestCommitHandler {
143 #[allow(clippy::too_many_arguments)]
153 async fn finalize_manifest(
154 &self,
155 base_path: &Path,
156 staging_manifest_path: &Path,
157 version: u64,
158 size: u64,
159 e_tag: Option<String>,
160 store: &dyn OSObjectStore,
161 naming_scheme: ManifestNamingScheme,
162 ) -> std::result::Result<ManifestLocation, Error> {
163 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
165
166 let copied = match store
167 .copy(staging_manifest_path, &final_manifest_path)
168 .await
169 {
170 Ok(_) => true,
171 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
173 };
174 if copied {
175 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
176 }
177
178 let e_tag = if copied && size < 5 * 1024 * 1024 {
185 e_tag
186 } else {
187 let meta = store.head(&final_manifest_path).await?;
188 meta.e_tag
189 };
190
191 let location = ManifestLocation {
192 version,
193 path: final_manifest_path,
194 size: Some(size),
195 naming_scheme,
196 e_tag,
197 };
198
199 if !copied {
200 return Ok(location);
201 }
202
203 self.external_manifest_store
205 .put_if_exists(
206 base_path.as_ref(),
207 version,
208 location.path.as_ref(),
209 size,
210 location.e_tag.clone(),
211 )
212 .await?;
213
214 match store.delete(staging_manifest_path).await {
216 Ok(_) => {}
217 Err(ObjectStoreError::NotFound { .. }) => {}
218 Err(e) => return Err(e.into()),
219 }
220 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
221
222 Ok(location)
223 }
224}
225
226#[async_trait]
227impl CommitHandler for ExternalManifestCommitHandler {
228 async fn resolve_latest_location(
229 &self,
230 base_path: &Path,
231 object_store: &ObjectStore,
232 ) -> std::result::Result<ManifestLocation, Error> {
233 let location = self
234 .external_manifest_store
235 .get_latest_manifest_location(base_path.as_ref())
236 .await?;
237
238 match location {
239 Some(ManifestLocation {
240 version,
241 path,
242 size,
243 naming_scheme,
244 e_tag,
245 }) => {
246 if path.extension() == Some(MANIFEST_EXTENSION) {
248 return Ok(ManifestLocation {
249 version,
250 path,
251 size,
252 naming_scheme,
253 e_tag,
254 });
255 }
256
257 let (size, e_tag) = if let Some(size) = size {
258 (size, e_tag)
259 } else {
260 let meta = object_store.inner.head(&path).await?;
261 (meta.size, meta.e_tag)
262 };
263
264 let final_location = self
265 .finalize_manifest(
266 base_path,
267 &path,
268 version,
269 size,
270 e_tag.clone(),
271 &object_store.inner,
272 naming_scheme,
273 )
274 .await?;
275
276 Ok(final_location)
277 }
278 None => current_manifest_path(object_store, base_path).await,
281 }
282 }
283
284 async fn resolve_version_location(
285 &self,
286 base_path: &Path,
287 version: u64,
288 object_store: &dyn OSObjectStore,
289 ) -> std::result::Result<ManifestLocation, Error> {
290 let location_res = self
291 .external_manifest_store
292 .get_manifest_location(base_path.as_ref(), version)
293 .await;
294
295 let location = match location_res {
296 Ok(p) => p,
297 Err(Error::NotFound { .. }) => {
299 let path = default_resolve_version(base_path, version, object_store)
300 .await
301 .map_err(|_| Error::NotFound {
302 uri: format!("{}@{}", base_path, version),
303 location: location!(),
304 })?
305 .path;
306 match object_store.head(&path).await {
307 Ok(ObjectMeta { size, e_tag, .. }) => {
308 let res = self
309 .external_manifest_store
310 .put_if_not_exists(
311 base_path.as_ref(),
312 version,
313 path.as_ref(),
314 size,
315 e_tag.clone(),
316 )
317 .await;
318 if let Err(e) = res {
319 warn!(
320 "could not update external manifest store during load, with error: {}",
321 e
322 );
323 }
324 let naming_scheme =
325 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
326 return Ok(ManifestLocation {
327 version,
328 path,
329 size: Some(size),
330 naming_scheme,
331 e_tag,
332 });
333 }
334 Err(ObjectStoreError::NotFound { .. }) => {
335 return Err(Error::NotFound {
336 uri: path.to_string(),
337 location: location!(),
338 });
339 }
340 Err(e) => return Err(e.into()),
341 }
342 }
343 Err(e) => return Err(e),
344 };
345
346 if location.path.extension() == Some(MANIFEST_EXTENSION) {
348 return Ok(location);
349 }
350
351 let naming_scheme =
352 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
353
354 let (size, e_tag) = if let Some(size) = location.size {
355 (size, location.e_tag.clone())
356 } else {
357 let meta = object_store.head(&location.path).await?;
358 (meta.size as u64, meta.e_tag)
359 };
360
361 self.finalize_manifest(
362 base_path,
363 &location.path,
364 version,
365 size,
366 e_tag,
367 object_store,
368 naming_scheme,
369 )
370 .await
371 }
372
373 async fn commit(
374 &self,
375 manifest: &mut Manifest,
376 indices: Option<Vec<IndexMetadata>>,
377 base_path: &Path,
378 object_store: &ObjectStore,
379 manifest_writer: super::ManifestWriter,
380 naming_scheme: ManifestNamingScheme,
381 transaction: Option<Transaction>,
382 ) -> std::result::Result<ManifestLocation, CommitError> {
383 let path = naming_scheme.manifest_path(base_path, manifest.version);
388 let staging_path = make_staging_manifest_path(&path)?;
389 let write_res =
390 manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
391
392 let res = self
394 .external_manifest_store
395 .put_if_not_exists(
396 base_path.as_ref(),
397 manifest.version,
398 staging_path.as_ref(),
399 write_res.size as u64,
400 write_res.e_tag.clone(),
401 )
402 .await
403 .map_err(|_| CommitError::CommitConflict {});
404
405 if let Err(err) = res {
406 match object_store.inner.delete(&staging_path).await {
408 Ok(_) => {}
409 Err(ObjectStoreError::NotFound { .. }) => {}
410 Err(e) => return Err(CommitError::OtherError(e.into())),
411 }
412 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
413 return Err(err);
414 }
415
416 Ok(self
417 .finalize_manifest(
418 base_path,
419 &staging_path,
420 manifest.version,
421 write_res.size as u64,
422 write_res.e_tag,
423 &object_store.inner,
424 naming_scheme,
425 )
426 .await?)
427 }
428
429 async fn delete(&self, base_path: &Path) -> Result<()> {
430 self.external_manifest_store
431 .delete(base_path.as_ref())
432 .await
433 }
434}