use std::sync::Arc;
use lance_core::Result;
use lance_datafusion::pb;
use lance_io::object_store::StorageOptions;
use prost::Message;
use crate::Dataset;
use crate::dataset::builder::DatasetBuilder;
/// Builds a `pb::TableIdentifier` pointing at `dataset` at its current manifest
/// version, without embedding the serialized manifest.
///
/// The identifier carries the dataset URI, the manifest version, the manifest
/// ETag (when one is known), and the dataset's latest storage options. When no
/// storage options are available an empty map is stored.
///
/// # Errors
///
/// Propagates any error returned by `Dataset::latest_storage_options`.
pub async fn table_identifier_from_dataset(dataset: &Dataset) -> Result<pb::TableIdentifier> {
    // Unwrap the `StorageOptions` newtype; a missing value becomes an empty map.
    let storage_options = dataset
        .latest_storage_options()
        .await?
        .map(|StorageOptions(options)| options)
        .unwrap_or_default();

    let identifier = pb::TableIdentifier {
        uri: dataset.uri().to_string(),
        version: dataset.manifest.version,
        manifest_etag: dataset.manifest_location.e_tag.clone(),
        serialized_manifest: None,
        storage_options,
    };
    Ok(identifier)
}
pub async fn table_identifier_from_dataset_with_manifest(
dataset: &Dataset,
) -> Result<pb::TableIdentifier> {
let manifest_proto = lance_table::format::pb::Manifest::from(dataset.manifest.as_ref());
Ok(pb::TableIdentifier {
uri: dataset.uri().to_string(),
version: dataset.manifest.version,
manifest_etag: dataset.manifest_location.e_tag.clone(),
serialized_manifest: Some(manifest_proto.encode_to_vec()),
storage_options: dataset
.latest_storage_options()
.await?
.map(|StorageOptions(m)| m)
.unwrap_or_default(),
})
}
/// Opens the dataset described by `table_id`.
///
/// The dataset is loaded from `table_id.uri` at `table_id.version`. When a
/// serialized manifest is present it is handed to the builder. Storage options
/// are only applied when the map is non-empty.
///
/// # Errors
///
/// Returns an error if the serialized manifest is rejected by the builder or if
/// loading the dataset fails.
pub async fn open_dataset_from_table_identifier(
    table_id: &pb::TableIdentifier,
) -> Result<Arc<Dataset>> {
    let builder = DatasetBuilder::from_uri(&table_id.uri).with_version(table_id.version);

    let builder = match table_id.serialized_manifest.as_ref() {
        Some(bytes) => builder.with_serialized_manifest(bytes)?,
        None => builder,
    };

    // An empty map means "no explicit options"; leave the builder defaults alone.
    let builder = if table_id.storage_options.is_empty() {
        builder
    } else {
        builder.with_storage_options(table_id.storage_options.clone())
    };

    let dataset = builder.load().await?;
    Ok(Arc::new(dataset))
}
pub async fn resolve_dataset(
dataset: Option<Arc<Dataset>>,
table_id: Option<&pb::TableIdentifier>,
) -> Result<Arc<Dataset>> {
use lance_core::Error;
match dataset {
Some(ds) => Ok(ds),
None => {
let table_id = table_id.ok_or_else(|| {
Error::invalid_input_source("Missing TableIdentifier in proto".into())
})?;
open_dataset_from_table_identifier(table_id).await
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::RecordBatchIterator;
    use arrow_array::types::UInt32Type;
    use lance_datagen::{array, gen_batch};
    use std::collections::HashMap;

    /// Writes a 100-row dataset with two `UInt32` step columns (`x`, `y`) into
    /// a fresh temporary directory.
    ///
    /// The returned `TempDir` must outlive every use of the dataset, because
    /// dropping it removes the underlying files.
    async fn make_test_dataset() -> (Arc<Dataset>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let batch = gen_batch()
            .col("x", array::step::<UInt32Type>())
            .col("y", array::step::<UInt32Type>())
            .into_batch_rows(lance_datagen::RowCount::from(100))
            .unwrap();
        let path = dir.path().join("test.lance");
        let ds = Dataset::write(
            RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            path.to_str().unwrap(),
            None,
        )
        .await
        .unwrap();
        (Arc::new(ds), dir)
    }

    #[test]
    fn test_table_identifier_proto_roundtrip() {
        let id = pb::TableIdentifier {
            uri: "s3://bucket/table.lance".to_string(),
            version: 42,
            manifest_etag: Some("etag123".to_string()),
            serialized_manifest: None,
            storage_options: HashMap::new(),
        };
        let bytes = id.encode_to_vec();
        let back = pb::TableIdentifier::decode(bytes.as_slice()).unwrap();
        // Compare the whole message so that no field can be silently dropped.
        assert_eq!(id, back);
        assert!(back.serialized_manifest.is_none());
    }

    #[test]
    fn test_table_identifier_proto_with_serialized_manifest() {
        let id = pb::TableIdentifier {
            uri: "file:///tmp/table.lance".to_string(),
            version: 3,
            manifest_etag: None,
            serialized_manifest: Some(vec![1, 2, 3, 4]),
            storage_options: HashMap::new(),
        };
        let bytes = id.encode_to_vec();
        let back = pb::TableIdentifier::decode(bytes.as_slice()).unwrap();
        assert_eq!(back.serialized_manifest.as_deref(), Some(&[1u8, 2, 3, 4][..]));
        assert_eq!(id, back);
    }

    #[test]
    fn test_table_identifier_proto_with_storage_options() {
        let mut opts = HashMap::new();
        opts.insert("region".to_string(), "us-east-1".to_string());
        opts.insert("endpoint".to_string(), "https://s3.example.com".to_string());
        let id = pb::TableIdentifier {
            uri: "s3://bucket/table.lance".to_string(),
            version: 7,
            manifest_etag: None,
            serialized_manifest: None,
            storage_options: opts.clone(),
        };
        let bytes = id.encode_to_vec();
        let back = pb::TableIdentifier::decode(bytes.as_slice()).unwrap();
        assert_eq!(back.storage_options, opts);
        assert_eq!(id, back);
    }

    #[tokio::test]
    async fn test_table_identifier_from_dataset_roundtrip() {
        let (dataset, _dir) = make_test_dataset().await;
        let id = table_identifier_from_dataset(&dataset).await.unwrap();
        assert_eq!(id.uri, dataset.uri());
        assert_eq!(id.version, dataset.manifest.version);
        assert_eq!(id.manifest_etag, dataset.manifest_location.e_tag);
        assert!(id.serialized_manifest.is_none());
        let back = open_dataset_from_table_identifier(&id).await.unwrap();
        assert_eq!(back.uri(), dataset.uri());
        assert_eq!(back.manifest.version, dataset.manifest.version);
    }

    #[tokio::test]
    async fn test_table_identifier_with_manifest_roundtrip() {
        let (dataset, _dir) = make_test_dataset().await;
        let id = table_identifier_from_dataset_with_manifest(&dataset)
            .await
            .unwrap();
        assert_eq!(id.uri, dataset.uri());
        assert_eq!(id.version, dataset.manifest.version);
        assert_eq!(id.manifest_etag, dataset.manifest_location.e_tag);
        let manifest_bytes = id
            .serialized_manifest
            .as_ref()
            .expect("identifier built with_manifest must embed the manifest");
        // The embedded manifest must decode and describe the same version.
        let manifest_proto =
            lance_table::format::pb::Manifest::decode(manifest_bytes.as_slice()).unwrap();
        assert_eq!(manifest_proto.version, dataset.manifest.version);
        let back = open_dataset_from_table_identifier(&id).await.unwrap();
        assert_eq!(back.uri(), dataset.uri());
        assert_eq!(back.manifest.version, dataset.manifest.version);
    }
}