1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::ObjectStore;
13use log::warn;
14use object_store::ObjectMeta;
15use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
16use snafu::location;
17
18use super::{
19 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
20 ManifestNamingScheme, MANIFEST_EXTENSION,
21};
22use crate::format::{Index, Manifest};
23use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
24
25#[async_trait]
38pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
39 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
41
42 async fn get_manifest_location(
43 &self,
44 base_uri: &str,
45 version: u64,
46 ) -> Result<ManifestLocation> {
47 let path = self.get(base_uri, version).await?;
48 let path = Path::from(path);
49 let naming_scheme = detect_naming_scheme_from_path(&path)?;
50 Ok(ManifestLocation {
51 version,
52 path,
53 size: None,
54 naming_scheme,
55 e_tag: None,
56 })
57 }
58
59 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
63
64 async fn get_latest_manifest_location(
70 &self,
71 base_uri: &str,
72 ) -> Result<Option<ManifestLocation>> {
73 self.get_latest_version(base_uri).await.and_then(|res| {
74 res.map(|(version, uri)| {
75 let path = Path::from(uri);
76 let naming_scheme = detect_naming_scheme_from_path(&path)?;
77 Ok(ManifestLocation {
78 version,
79 path,
80 size: None,
81 naming_scheme,
82 e_tag: None,
83 })
84 })
85 .transpose()
86 })
87 }
88
89 async fn put_if_not_exists(
91 &self,
92 base_uri: &str,
93 version: u64,
94 path: &str,
95 size: u64,
96 e_tag: Option<String>,
97 ) -> Result<()>;
98
99 async fn put_if_exists(
101 &self,
102 base_uri: &str,
103 version: u64,
104 path: &str,
105 size: u64,
106 e_tag: Option<String>,
107 ) -> Result<()>;
108
109 async fn delete(&self, _base_uri: &str) -> Result<()> {
111 Ok(())
112 }
113}
114
115pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
116 path.filename()
117 .and_then(|name| {
118 ManifestNamingScheme::detect_scheme(name)
119 .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
120 })
121 .ok_or_else(|| {
122 Error::corrupt_file(
123 path.clone(),
124 "Path does not follow known manifest naming convention.",
125 location!(),
126 )
127 })
128}
129
/// Commit handler that records manifest locations in an
/// [`ExternalManifestStore`] in addition to writing them to the object store.
///
/// New manifests are first written to a staging path and registered in the
/// external store; they are then copied to their final path and the store
/// entry is repointed to it.
#[derive(Debug)]
pub struct ExternalManifestCommitHandler {
    /// Store consulted for, and updated with, the manifest path of each
    /// dataset version.
    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
}
137
138impl ExternalManifestCommitHandler {
139 #[allow(clippy::too_many_arguments)]
149 async fn finalize_manifest(
150 &self,
151 base_path: &Path,
152 staging_manifest_path: &Path,
153 version: u64,
154 size: u64,
155 e_tag: Option<String>,
156 store: &dyn OSObjectStore,
157 naming_scheme: ManifestNamingScheme,
158 ) -> std::result::Result<ManifestLocation, Error> {
159 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
161
162 let copied = match store
163 .copy(staging_manifest_path, &final_manifest_path)
164 .await
165 {
166 Ok(_) => true,
167 Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
169 };
170
171 let e_tag = if size < 5 * 1024 * 1024 {
176 e_tag
177 } else {
178 let meta = store.head(&final_manifest_path).await?;
179 meta.e_tag
180 };
181
182 let location = ManifestLocation {
183 version,
184 path: final_manifest_path,
185 size: Some(size),
186 naming_scheme,
187 e_tag,
188 };
189
190 if !copied {
191 return Ok(location);
192 }
193
194 self.external_manifest_store
196 .put_if_exists(
197 base_path.as_ref(),
198 version,
199 location.path.as_ref(),
200 size,
201 location.e_tag.clone(),
202 )
203 .await?;
204
205 match store.delete(staging_manifest_path).await {
207 Ok(_) => {}
208 Err(ObjectStoreError::NotFound { .. }) => {}
209 Err(e) => return Err(e.into()),
210 }
211
212 Ok(location)
213 }
214}
215
216#[async_trait]
217impl CommitHandler for ExternalManifestCommitHandler {
218 async fn resolve_latest_location(
219 &self,
220 base_path: &Path,
221 object_store: &ObjectStore,
222 ) -> std::result::Result<ManifestLocation, Error> {
223 let location = self
224 .external_manifest_store
225 .get_latest_manifest_location(base_path.as_ref())
226 .await?;
227
228 match location {
229 Some(ManifestLocation {
230 version,
231 path,
232 size,
233 naming_scheme,
234 e_tag,
235 }) => {
236 if path.extension() == Some(MANIFEST_EXTENSION) {
238 return Ok(ManifestLocation {
239 version,
240 path,
241 size,
242 naming_scheme,
243 e_tag,
244 });
245 }
246
247 let (size, e_tag) = if let Some(size) = size {
248 (size, e_tag)
249 } else {
250 let meta = object_store.inner.head(&path).await?;
251 (meta.size, meta.e_tag)
252 };
253
254 let final_location = self
255 .finalize_manifest(
256 base_path,
257 &path,
258 version,
259 size,
260 e_tag.clone(),
261 &object_store.inner,
262 naming_scheme,
263 )
264 .await?;
265
266 Ok(final_location)
267 }
268 None => current_manifest_path(object_store, base_path).await,
271 }
272 }
273
274 async fn resolve_version_location(
275 &self,
276 base_path: &Path,
277 version: u64,
278 object_store: &dyn OSObjectStore,
279 ) -> std::result::Result<ManifestLocation, Error> {
280 let location_res = self
281 .external_manifest_store
282 .get_manifest_location(base_path.as_ref(), version)
283 .await;
284
285 let location = match location_res {
286 Ok(p) => p,
287 Err(Error::NotFound { .. }) => {
289 let path = default_resolve_version(base_path, version, object_store)
290 .await
291 .map_err(|_| Error::NotFound {
292 uri: format!("{}@{}", base_path, version),
293 location: location!(),
294 })?
295 .path;
296 match object_store.head(&path).await {
297 Ok(ObjectMeta { size, e_tag, .. }) => {
298 let res = self
299 .external_manifest_store
300 .put_if_not_exists(
301 base_path.as_ref(),
302 version,
303 path.as_ref(),
304 size,
305 e_tag.clone(),
306 )
307 .await;
308 if let Err(e) = res {
309 warn!(
310 "could not update external manifest store during load, with error: {}",
311 e
312 );
313 }
314 let naming_scheme =
315 ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
316 return Ok(ManifestLocation {
317 version,
318 path,
319 size: Some(size),
320 naming_scheme,
321 e_tag,
322 });
323 }
324 Err(ObjectStoreError::NotFound { .. }) => {
325 return Err(Error::NotFound {
326 uri: path.to_string(),
327 location: location!(),
328 });
329 }
330 Err(e) => return Err(e.into()),
331 }
332 }
333 Err(e) => return Err(e),
334 };
335
336 if location.path.extension() == Some(MANIFEST_EXTENSION) {
338 return Ok(location);
339 }
340
341 let naming_scheme =
342 ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
343
344 let (size, e_tag) = if let Some(size) = location.size {
345 (size, location.e_tag.clone())
346 } else {
347 let meta = object_store.head(&location.path).await?;
348 (meta.size as u64, meta.e_tag)
349 };
350
351 self.finalize_manifest(
352 base_path,
353 &location.path,
354 version,
355 size,
356 e_tag,
357 object_store,
358 naming_scheme,
359 )
360 .await
361 }
362
363 async fn commit(
364 &self,
365 manifest: &mut Manifest,
366 indices: Option<Vec<Index>>,
367 base_path: &Path,
368 object_store: &ObjectStore,
369 manifest_writer: ManifestWriter,
370 naming_scheme: ManifestNamingScheme,
371 ) -> std::result::Result<ManifestLocation, CommitError> {
372 let path = naming_scheme.manifest_path(base_path, manifest.version);
377 let staging_path = make_staging_manifest_path(&path)?;
378 let write_res = manifest_writer(object_store, manifest, indices, &staging_path).await?;
379
380 let res = self
382 .external_manifest_store
383 .put_if_not_exists(
384 base_path.as_ref(),
385 manifest.version,
386 staging_path.as_ref(),
387 write_res.size as u64,
388 write_res.e_tag.clone(),
389 )
390 .await
391 .map_err(|_| CommitError::CommitConflict {});
392
393 if let Err(err) = res {
394 match object_store.inner.delete(&staging_path).await {
396 Ok(_) => {}
397 Err(ObjectStoreError::NotFound { .. }) => {}
398 Err(e) => return Err(CommitError::OtherError(e.into())),
399 }
400 return Err(err);
401 }
402
403 Ok(self
404 .finalize_manifest(
405 base_path,
406 &staging_path,
407 manifest.version,
408 write_res.size as u64,
409 write_res.e_tag,
410 &object_store.inner,
411 naming_scheme,
412 )
413 .await?)
414 }
415
416 async fn delete(&self, base_path: &Path) -> Result<()> {
417 self.external_manifest_store
418 .delete(base_path.as_ref())
419 .await
420 }
421}