use std::{fmt::Formatter, sync::Arc};
use futures::{TryFutureExt, stream::BoxStream};
use lance::io::WrappingObjectStore;
use object_store::{
Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
};
use async_trait::async_trait;
#[cfg(test)]
pub mod io_tracking;
/// An [`ObjectStore`] that mirrors mutations to a secondary store.
///
/// Reads and listings are answered by `primary` only. Writes, copies and
/// deletes are applied to `secondary` first and then to `primary`, except for
/// paths that [`PrimaryOnly::primary_only`] marks as primary-only.
#[derive(Debug)]
struct MirroringObjectStore {
/// Store that serves all reads/listings and receives every write.
primary: Arc<dyn ObjectStore>,
/// Mirror store that receives every write except primary-only paths.
secondary: Arc<dyn ObjectStore>,
}
impl std::fmt::Display for MirroringObjectStore {
    /// Writes a multi-line description of the store: a header line followed by
    /// the descriptions of the primary and the secondary store, each under its
    /// own label.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the formatter or by the inner stores'
    /// own `fmt` implementations.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "MirroringObjectStore")?;
        writeln!(f, "primary:")?;
        self.primary.fmt(f)?;
        // Terminate the primary's description so that the "secondary:" label
        // starts on its own line instead of being glued to the previous text.
        writeln!(f)?;
        writeln!(f, "secondary:")?;
        self.secondary.fmt(f)
    }
}
/// Classifies a location as one that must live in the primary store only.
trait PrimaryOnly {
    /// Returns `true` when the location must not be mirrored to the secondary
    /// store.
    fn primary_only(&self) -> bool;
}
impl PrimaryOnly for Path {
    /// A path is primary-only exactly when its final segment is
    /// `_latest.manifest`; paths without a filename are never primary-only.
    fn primary_only(&self) -> bool {
        matches!(self.filename(), Some("_latest.manifest"))
    }
}
#[async_trait]
impl ObjectStore for MirroringObjectStore {
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
options: PutOptions,
) -> Result<PutResult> {
if location.primary_only() {
self.primary.put_opts(location, bytes, options).await
} else {
self.secondary
.put_opts(location, bytes.clone(), options.clone())
.await?;
self.primary.put_opts(location, bytes, options).await
}
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
if location.primary_only() {
return self.primary.put_multipart_opts(location, opts).await;
}
let secondary = self
.secondary
.put_multipart_opts(location, opts.clone())
.await?;
let primary = self.primary.put_multipart_opts(location, opts).await?;
Ok(Box::new(MirroringUpload { primary, secondary }))
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.primary.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.primary.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
if !location.primary_only() {
match self.secondary.delete(location).await {
Err(Error::NotFound { .. }) | Ok(_) => {}
Err(e) => return Err(e),
}
}
self.primary.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.primary.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.primary.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
if to.primary_only() {
self.primary.copy(from, to).await
} else {
self.secondary.copy(from, to).await?;
self.primary.copy(from, to).await?;
Ok(())
}
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
if !to.primary_only() {
self.secondary.copy(from, to).await?;
}
self.primary.copy_if_not_exists(from, to).await
}
}
/// A [`MultipartUpload`] that forwards every operation to two underlying
/// uploads, handling the secondary one before the primary one.
#[derive(Debug)]
struct MirroringUpload {
/// Upload on the primary store; its result is what `complete` returns.
primary: Box<dyn MultipartUpload>,
/// Upload on the secondary (mirror) store.
secondary: Box<dyn MultipartUpload>,
}
#[async_trait]
impl MultipartUpload for MirroringUpload {
    /// Queues `data` as the next part on both uploads.
    ///
    /// Both part futures are created immediately (primary first). The returned
    /// future awaits the secondary part and, only if that succeeds, the
    /// primary part.
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let put_primary = self.primary.put_part(data.clone());
        let put_secondary = self.secondary.put_part(data);
        Box::pin(put_secondary.and_then(|_| put_primary))
    }

    /// Completes the secondary upload and then the primary upload.
    ///
    /// # Errors
    ///
    /// A secondary failure is returned without completing the primary.
    /// Otherwise the primary's result is returned.
    async fn complete(&mut self) -> Result<PutResult> {
        self.secondary.complete().await?;
        self.primary.complete().await
    }

    /// Aborts both uploads.
    ///
    /// Both aborts are always attempted, so a failure on the secondary never
    /// leaves the primary upload dangling.
    ///
    /// # Errors
    ///
    /// If the secondary abort failed, its error is returned; otherwise the
    /// primary's result is returned.
    async fn abort(&mut self) -> Result<()> {
        let secondary_result = self.secondary.abort().await;
        let primary_result = self.primary.abort().await;
        secondary_result.and(primary_result)
    }
}
/// A [`WrappingObjectStore`] that wraps a store in a mirroring store, using
/// `secondary` as the mirror target.
#[derive(Debug)]
pub struct MirroringObjectStoreWrapper {
/// Store that receives the mirrored writes of every wrapped store.
secondary: Arc<dyn ObjectStore>,
}
impl MirroringObjectStoreWrapper {
/// Creates a wrapper that mirrors writes to `secondary`.
pub fn new(secondary: Arc<dyn ObjectStore>) -> Self {
Self { secondary }
}
}
impl WrappingObjectStore for MirroringObjectStoreWrapper {
    /// Wraps `primary` in a [`MirroringObjectStore`] that shares this wrapper's
    /// secondary store. The store prefix is not used.
    fn wrap(&self, _store_prefix: &str, primary: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
        let mirrored = MirroringObjectStore {
            primary,
            secondary: Arc::clone(&self.secondary),
        };
        Arc::new(mirrored)
    }
}
#[cfg(all(test, not(windows)))]
mod test {
    use super::*;
    use futures::TryStreamExt;
    use lance::{dataset::WriteParams, io::ObjectStoreParams};
    use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
    use object_store::local::LocalFileSystem;
    use tempfile;
    use walkdir::WalkDir;

    use crate::{
        connect,
        query::{ExecutableQuery, QueryBase},
        table::WriteOptions,
    };

    /// Returns `true` for filesystem paths that are excluded from the
    /// primary/secondary comparison.
    fn should_skip(path: &std::path::Path) -> bool {
        let path_str = path.to_str().unwrap();
        path_str.contains("_latest.manifest")
            || path_str.contains("_versions")
            || path_str.contains("_transactions")
    }

    /// Collects every non-skipped entry below `root` as a path relative to
    /// `root`.
    ///
    /// The result is sorted because `WalkDir` yields entries in directory-read
    /// order, which is not guaranteed to agree between two directory trees.
    fn relative_entries(root: &std::path::Path) -> Vec<std::path::PathBuf> {
        let mut entries: Vec<_> = WalkDir::new(root)
            .into_iter()
            .filter_entry(|entry| !should_skip(entry.path()))
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path().strip_prefix(root).unwrap().to_path_buf())
            .collect();
        entries.sort();
        entries
    }

    /// End-to-end check: a table written through the mirroring wrapper can be
    /// queried, and the mirrored directory ends up with the same set of files
    /// as the primary one (ignoring the skipped paths).
    #[ignore]
    #[tokio::test]
    async fn test_e2e() {
        let dir1 = tempfile::tempdir().unwrap().keep().canonicalize().unwrap();
        let dir2 = tempfile::tempdir().unwrap().keep().canonicalize().unwrap();

        let secondary_store = LocalFileSystem::new_with_prefix(dir2.to_str().unwrap()).unwrap();
        let object_store_wrapper =
            Arc::new(MirroringObjectStoreWrapper::new(Arc::new(secondary_store)));

        let db = connect(dir1.to_str().unwrap()).execute().await.unwrap();

        let mut param = WriteParams::default();
        let store_params = ObjectStoreParams {
            object_store_wrapper: Some(object_store_wrapper),
            ..Default::default()
        };
        param.store_params = Some(store_params);

        let mut datagen = BatchGenerator::new();
        datagen = datagen.col(Box::<IncrementingInt32>::default());
        datagen = datagen.col(Box::new(RandomVector::default().named("vector".into())));

        let data: Box<dyn arrow_array::RecordBatchReader + Send> = Box::new(datagen.batch(100));
        let res = db
            .create_table("test", data)
            .write_options(WriteOptions {
                lance_write_params: Some(param),
            })
            .execute()
            .await;

        let t = res.unwrap();
        assert_eq!(t.count_rows(None).await.unwrap(), 100);

        let q = t
            .query()
            .limit(10)
            .nearest_to(&[0.1, 0.1, 0.1, 0.1])
            .unwrap()
            .execute()
            .await
            .unwrap();

        let batches = q.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 10);

        let primary_location = dir1.join("test.lance").canonicalize().unwrap();
        // The secondary store is rooted at `dir2`, so the mirrored copy lives at
        // `dir2` joined with the primary's absolute path minus its leading `/`.
        let secondary_location = dir2.join(primary_location.strip_prefix("/").unwrap());

        let primary_files = relative_entries(&primary_location);
        let secondary_files = relative_entries(&secondary_location);

        assert_eq!(primary_files, secondary_files, "File lists should match");
    }
}