1use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::{ObjectStore, ObjectStoreExt};
13use log::warn;
14use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
15use snafu::location;
16
17use super::{
18 current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
19 ManifestNamingScheme, MANIFEST_EXTENSION,
20};
21use crate::format::{Index, Manifest};
22use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
23
24#[async_trait]
37pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
38 async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
40
41 async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
45
46 async fn get_latest_manifest_location(
52 &self,
53 base_uri: &str,
54 ) -> Result<Option<ManifestLocation>> {
55 self.get_latest_version(base_uri).await.and_then(|res| {
56 res.map(|(version, uri)| {
57 let path = Path::from(uri);
58 let naming_scheme = detect_naming_scheme_from_path(&path)?;
59 Ok(ManifestLocation {
60 version,
61 path,
62 size: None,
63 naming_scheme,
64 })
65 })
66 .transpose()
67 })
68 }
69
70 async fn put_if_not_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
72
73 async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
75
76 async fn delete(&self, _base_uri: &str) -> Result<()> {
78 Ok(())
79 }
80}
81
82fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
83 path.filename()
84 .and_then(ManifestNamingScheme::detect_scheme)
85 .ok_or_else(|| {
86 Error::corrupt_file(
87 path.clone(),
88 "Path does not follow known manifest naming convention.",
89 location!(),
90 )
91 })
92}
93
94#[derive(Debug)]
98pub struct ExternalManifestCommitHandler {
99 pub external_manifest_store: Arc<dyn ExternalManifestStore>,
100}
101
102impl ExternalManifestCommitHandler {
103 async fn finalize_manifest(
113 &self,
114 base_path: &Path,
115 staging_manifest_path: &Path,
116 version: u64,
117 store: &dyn OSObjectStore,
118 naming_scheme: ManifestNamingScheme,
119 ) -> std::result::Result<Path, Error> {
120 let final_manifest_path = naming_scheme.manifest_path(base_path, version);
122 match store
123 .copy(staging_manifest_path, &final_manifest_path)
124 .await
125 {
126 Ok(_) => {}
127 Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), Err(e) => return Err(e.into()),
129 };
130
131 self.external_manifest_store
133 .put_if_exists(base_path.as_ref(), version, final_manifest_path.as_ref())
134 .await?;
135
136 match store.delete(staging_manifest_path).await {
138 Ok(_) => {}
139 Err(ObjectStoreError::NotFound { .. }) => {}
140 Err(e) => return Err(e.into()),
141 }
142
143 Ok(final_manifest_path)
144 }
145}
146
147#[async_trait]
148impl CommitHandler for ExternalManifestCommitHandler {
149 async fn resolve_latest_location(
150 &self,
151 base_path: &Path,
152 object_store: &ObjectStore,
153 ) -> std::result::Result<ManifestLocation, Error> {
154 let path = self.resolve_latest_version(base_path, object_store).await?;
155 let naming_scheme = detect_naming_scheme_from_path(&path)?;
156 Ok(ManifestLocation {
157 version: self
158 .resolve_latest_version_id(base_path, object_store)
159 .await?,
160 path,
161 size: None,
162 naming_scheme,
163 })
164 }
165
166 async fn resolve_latest_version(
168 &self,
169 base_path: &Path,
170 object_store: &ObjectStore,
171 ) -> std::result::Result<Path, Error> {
172 let version = self
173 .external_manifest_store
174 .get_latest_version(base_path.as_ref())
175 .await?;
176
177 match version {
178 Some((version, path)) => {
179 if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) {
181 return Ok(Path::parse(path)?);
182 }
183
184 let staged_path = Path::parse(&path)?;
186 let naming_scheme =
187 ManifestNamingScheme::detect_scheme_staging(staged_path.filename().unwrap());
188
189 self.finalize_manifest(
190 base_path,
191 &staged_path,
192 version,
193 &object_store.inner,
194 naming_scheme,
195 )
196 .await
197 }
198 None => Ok(current_manifest_path(object_store, base_path).await?.path),
201 }
202 }
203
204 async fn resolve_latest_version_id(
205 &self,
206 base_path: &Path,
207 object_store: &ObjectStore,
208 ) -> std::result::Result<u64, Error> {
209 let version = self
210 .external_manifest_store
211 .get_latest_version(base_path.as_ref())
212 .await?;
213
214 match version {
215 Some((version, _)) => Ok(version),
216 None => Ok(current_manifest_path(object_store, base_path)
217 .await?
218 .version),
219 }
220 }
221
222 async fn resolve_version(
223 &self,
224 base_path: &Path,
225 version: u64,
226 object_store: &dyn OSObjectStore,
227 ) -> std::result::Result<Path, Error> {
228 let path_res = self
229 .external_manifest_store
230 .get(base_path.as_ref(), version)
231 .await;
232
233 let path = match path_res {
234 Ok(p) => p,
235 Err(Error::NotFound { .. }) => {
237 let path = default_resolve_version(base_path, version, object_store)
238 .await
239 .map_err(|_| Error::NotFound {
240 uri: format!("{}@{}", base_path, version),
241 location: location!(),
242 })?
243 .path;
244 if object_store.exists(&path).await? {
245 match self
247 .external_manifest_store
248 .put_if_not_exists(base_path.as_ref(), version, path.as_ref())
249 .await
250 {
251 Ok(_) => {}
252 Err(e) => {
253 warn!(
254 "could not update external manifest store during load, with error: {}",
255 e
256 );
257 }
258 }
259 return Ok(path);
260 } else {
261 return Err(Error::NotFound {
262 uri: path.to_string(),
263 location: location!(),
264 });
265 }
266 }
267 Err(e) => return Err(e),
268 };
269
270 let current_path = Path::parse(path)?;
272 if current_path.extension() == Some(MANIFEST_EXTENSION) {
273 return Ok(current_path);
274 }
275
276 let naming_scheme =
277 ManifestNamingScheme::detect_scheme_staging(current_path.filename().unwrap());
278
279 self.finalize_manifest(
280 base_path,
281 &Path::parse(¤t_path)?,
282 version,
283 object_store,
284 naming_scheme,
285 )
286 .await
287 }
288
289 async fn resolve_version_location(
290 &self,
291 base_path: &Path,
292 version: u64,
293 object_store: &dyn OSObjectStore,
294 ) -> std::result::Result<ManifestLocation, Error> {
295 let path = self
296 .resolve_version(base_path, version, object_store)
297 .await?;
298 let naming_scheme = detect_naming_scheme_from_path(&path)?;
299 Ok(ManifestLocation {
300 version,
301 path,
302 size: None,
303 naming_scheme,
304 })
305 }
306
307 async fn commit(
308 &self,
309 manifest: &mut Manifest,
310 indices: Option<Vec<Index>>,
311 base_path: &Path,
312 object_store: &ObjectStore,
313 manifest_writer: ManifestWriter,
314 naming_scheme: ManifestNamingScheme,
315 ) -> std::result::Result<Path, CommitError> {
316 let path = naming_scheme.manifest_path(base_path, manifest.version);
321 let staging_path = make_staging_manifest_path(&path)?;
322 manifest_writer(object_store, manifest, indices, &staging_path).await?;
323
324 let res = self
326 .external_manifest_store
327 .put_if_not_exists(base_path.as_ref(), manifest.version, staging_path.as_ref())
328 .await
329 .map_err(|_| CommitError::CommitConflict {});
330
331 if let Err(err) = res {
332 match object_store.inner.delete(&staging_path).await {
334 Ok(_) => {}
335 Err(ObjectStoreError::NotFound { .. }) => {}
336 Err(e) => return Err(CommitError::OtherError(e.into())),
337 }
338 return Err(err);
339 }
340
341 let scheme = detect_naming_scheme_from_path(&path)?;
342
343 Ok(self
344 .finalize_manifest(
345 base_path,
346 &staging_path,
347 manifest.version,
348 &object_store.inner,
349 scheme,
350 )
351 .await?)
352 }
353
354 async fn delete(&self, base_path: &Path) -> Result<()> {
355 self.external_manifest_store
356 .delete(base_path.as_ref())
357 .await
358 }
359}