use crate::error::Result;
use crate::io::FileIO;
use crate::reader::{DataFileEntry, DataFileStats, ManifestListReader, ManifestReader};
use crate::scan::TableScanBuilder;
use crate::spec::{PartitionField, PartitionSpec, Schema, Snapshot, TableIdent, TableMetadata};
use crate::transaction::Transaction;
/// A handle to a loaded table.
///
/// Bundles the table's identifier, its metadata, the location of the metadata
/// file, and the [`FileIO`] used to read the table's manifest list and manifests.
/// `Clone` is derived, so cloning a handle clones each of these fields.
#[derive(Clone)]
pub struct Table {
    /// Identifier passed to [`Table::new`].
    identifier: TableIdent,
    /// Table metadata passed to [`Table::new`].
    metadata: TableMetadata,
    /// Metadata file location passed to [`Table::new`].
    metadata_location: String,
    /// File access layer used for manifest-list and manifest reads.
    file_io: FileIO,
}
impl Table {
pub fn new(
identifier: TableIdent,
metadata: TableMetadata,
metadata_location: String,
file_io: FileIO,
) -> Self {
Self {
identifier,
metadata,
metadata_location,
file_io,
}
}
pub fn identifier(&self) -> &TableIdent {
&self.identifier
}
pub fn metadata(&self) -> &TableMetadata {
&self.metadata
}
pub fn schema(&self) -> Result<&Schema> {
self.metadata.current_schema()
}
pub fn location(&self) -> &str {
self.metadata.location()
}
pub fn metadata_location(&self) -> &str {
&self.metadata_location
}
pub fn file_io(&self) -> &FileIO {
&self.file_io
}
pub fn current_snapshot(&self) -> Option<&Snapshot> {
self.metadata.current_snapshot()
}
pub fn transaction(&self) -> Transaction {
Transaction::new(self.clone())
}
pub async fn files(&self) -> Result<Vec<DataFileEntry>> {
let snapshot = self
.current_snapshot()
.ok_or_else(|| crate::error::Error::invalid_input("Table has no current snapshot"))?;
let manifest_paths =
ManifestListReader::read(&self.file_io, snapshot.manifest_list()).await?;
let mut all_files = Vec::new();
for manifest_path in manifest_paths {
let files = ManifestReader::read(&self.file_io, &manifest_path).await?;
all_files.extend(files);
}
Ok(all_files)
}
pub fn scan(&self) -> TableScanBuilder<'_> {
TableScanBuilder::new(self)
}
pub async fn files_with_stats(&self) -> Result<Vec<DataFileStats>> {
let snapshot = self
.current_snapshot()
.ok_or_else(|| crate::error::Error::invalid_input("Table has no current snapshot"))?;
let manifest_paths =
ManifestListReader::read(&self.file_io, snapshot.manifest_list()).await?;
let mut all_files = Vec::new();
for manifest_path in manifest_paths {
let files = ManifestReader::read_with_stats(&self.file_io, &manifest_path).await?;
all_files.extend(files);
}
Ok(all_files)
}
pub fn current_partition_spec(&self) -> Option<&PartitionSpec> {
self.metadata.partition_specs().first()
}
pub fn partition_fields(&self) -> &[PartitionField] {
self.current_partition_spec()
.map(|s| s.fields())
.unwrap_or(&[])
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::{NamespaceIdent, NestedField, PrimitiveType, Type};
    use opendal::Operator;

    /// Builds a table over an in-memory operator and checks that the accessors
    /// return the location values it was constructed with.
    #[test]
    fn test_table_creation() {
        let schema = crate::spec::Schema::builder()
            .with_fields(vec![NestedField::required_field(
                1,
                "id".to_string(),
                Type::Primitive(PrimitiveType::Long),
            )])
            .build()
            .unwrap();
        let metadata = TableMetadata::builder()
            .with_location("s3://test/table")
            .with_current_schema(schema)
            .build()
            .unwrap();
        let ident = TableIdent::new(
            NamespaceIdent::new(vec!["default".to_string()]),
            "test".to_string(),
        );
        // In-memory backend: the test performs no real object-store I/O.
        let op = Operator::via_iter(opendal::Scheme::Memory, []).unwrap();
        let file_io = FileIO::new(op);
        let table = Table::new(
            ident,
            metadata,
            "s3://test/metadata.json".to_string(),
            file_io,
        );
        assert_eq!(table.location(), "s3://test/table");
        // `Table::new` stores the metadata location verbatim.
        assert_eq!(table.metadata_location(), "s3://test/metadata.json");
    }
}