use std::sync::Arc;
use sea_query::{Asterisk, ExprTrait, Iden, Query};
use super::caches::{Metadata, MetadataCache, Snapshot, SnapshotCache};
use crate::caches::SnapshotInfo;
use crate::spec::*;
use crate::*;
/// Handle to a DuckLake catalog.
///
/// A handle either follows the latest snapshot (as returned by `create` / `connect` with
/// `ConnectionType::Latest`) or is pinned to a historical snapshot (`at_snapshot_id`,
/// `at_snapshot_timestamp`, or `connect` with a snapshot id/timestamp).
pub struct Ducklake {
    // Shared connection state (pool, caches, storage options, optional pinned snapshot).
    conn: DucklakeConnection,
}
/// Crate-internal, cheaply clonable handle to the connection state.
///
/// Cloning only clones the inner `Arc`, so every clone observes the same pool, caches and
/// pinned snapshot.
#[derive(Clone)]
#[repr(transparent)]
pub(crate) struct DucklakeConnection(Arc<DucklakeConnectionInner>);
/// State shared behind a `DucklakeConnection`.
struct DucklakeConnectionInner {
    /// Database connection pool used for all catalog queries.
    pool: db::Pool,
    /// Cache for catalog metadata key/value entries (global, schema- and table-scoped).
    metadata_cache: Arc<MetadataCache>,
    /// Snapshot cache; time-travel handles derived via `at_snapshot*` share the same `Arc`.
    snapshot_cache: Arc<SnapshotCache>,
    /// Storage options (key/value pairs) passed to every transaction.
    storage_options: Vec<(String, String)>,
    /// Snapshot this connection is pinned to; `None` means it follows the latest snapshot.
    travel_snapshot: Option<Arc<Snapshot>>,
}
impl Ducklake {
    /// Initializes a brand-new catalog at `options.url` and opens it on the latest snapshot.
    ///
    /// # Errors
    /// Returns `DucklakeError::CatalogAlreadyInitialized` when the catalog metadata table
    /// (`spec::ducklake_metadata::Table`) already exists. Otherwise propagates errors from
    /// opening the pool, initializing the catalog, or building the handle.
    pub async fn create(options: CreateOptions) -> DucklakeResult<Self> {
        let pool = db::Pool::new(&options.url).await?;
        let metadata_table = spec::ducklake_metadata::Table.to_string();
        if pool.table_exists(&metadata_table).await? {
            return Err(DucklakeError::CatalogAlreadyInitialized);
        }

        let init_config = spec::InitConfig {
            data_path: options.data_path,
        };
        spec::init_catalog(&pool, init_config).await?;

        Self::new(pool, None, options.storage_options).await
    }

    /// Opens an existing catalog at `options.url`.
    ///
    /// The handle follows the latest snapshot for `ConnectionType::Latest`. Otherwise it is
    /// pinned to the snapshot resolved from the given id or timestamp.
    ///
    /// # Errors
    /// Propagates catalog validation/migration errors (see `init_catalog`) and errors from
    /// loading the requested snapshot.
    pub async fn connect(options: ConnectOptions) -> DucklakeResult<Self> {
        let pool = Self::init_catalog(&options.url, options.migrate).await?;

        let pinned = match options.connection_type {
            ConnectionType::Latest => None,
            ConnectionType::SnapshotId(id) => {
                let info = SnapshotInfo::load_for_id(&pool, id).await?;
                Some(info)
            }
            ConnectionType::SnapshotTimestamp(timestamp) => {
                let info = SnapshotInfo::load_for_timestamp(&pool, timestamp).await?;
                Some(info)
            }
        };

        Self::new(pool, pinned, options.storage_options).await
    }

    /// Closes the underlying connection pool.
    ///
    /// With the `python` feature this borrows the handle mutably. Without it, the handle is
    /// consumed.
    #[cfg(feature = "python")]
    pub async fn disconnect(&mut self) {
        let pool = &self.conn.0.pool;
        pool.close().await;
    }

    /// Closes the underlying connection pool, consuming the handle.
    #[cfg(not(feature = "python"))]
    pub async fn disconnect(self) {
        let pool = &self.conn.0.pool;
        pool.close().await;
    }

    /// Opens a pool at `url` and validates the existing catalog before use.
    ///
    /// Despite its name, this does not create a catalog. It requires the metadata table to
    /// exist and the stored version to be in `spec::SUPPORTED_VERSIONS`. A version other than
    /// `spec::LATEST_VERSION` is accepted only when `migrate` is set. `spec::migrate_catalog`
    /// then runs for every catalog that passes these checks (presumably a no-op when it is
    /// already at the latest version — TODO confirm).
    ///
    /// # Errors
    /// `CatalogNotInitialized`, `UnsupportedVersion(found)`, `OutdatedVersion(found, latest)`,
    /// plus any database or migration error.
    async fn init_catalog(url: &str, migrate: bool) -> DucklakeResult<db::Pool> {
        let pool = db::Pool::new(url).await?;
        let metadata_table = spec::ducklake_metadata::Table.to_string();
        if !pool.table_exists(&metadata_table).await? {
            return Err(DucklakeError::CatalogNotInitialized);
        }

        let version = get_version(&pool).await?;
        let is_supported = spec::SUPPORTED_VERSIONS.contains(&version.as_str());
        if !is_supported {
            return Err(DucklakeError::UnsupportedVersion(version));
        }

        let is_latest = version == spec::LATEST_VERSION;
        if !(is_latest || migrate) {
            return Err(DucklakeError::OutdatedVersion(
                version,
                spec::LATEST_VERSION.to_string(),
            ));
        }

        spec::migrate_catalog(&pool, &version).await?;
        Ok(pool)
    }

    /// Builds a handle around an already validated pool.
    ///
    /// When `travel_snapshot` is `Some`, it is passed to `SnapshotCache::new`, and the handle
    /// is pinned to the cache's current snapshot right after construction.
    async fn new(
        pool: db::Pool,
        travel_snapshot: Option<SnapshotInfo>,
        storage_options: Vec<(String, String)>,
    ) -> DucklakeResult<Self> {
        let is_time_travel = travel_snapshot.is_some();
        let snapshot_cache = SnapshotCache::new(pool.clone(), travel_snapshot).await?;
        let metadata_cache = MetadataCache::new(pool.clone()).await?;
        let pinned = is_time_travel.then(|| snapshot_cache.get_current());

        let inner = DucklakeConnectionInner {
            pool,
            metadata_cache: Arc::new(metadata_cache),
            snapshot_cache: Arc::new(snapshot_cache),
            storage_options,
            travel_snapshot: pinned,
        };
        Ok(Ducklake {
            conn: DucklakeConnection(Arc::new(inner)),
        })
    }
}
impl Ducklake {
    /// Returns a new handle pinned to the snapshot with id `snapshot_id`.
    ///
    /// Opening a transaction on the returned handle fails with `ImmutableDucklake`.
    ///
    /// # Errors
    /// Propagates failures from `SnapshotInfo::load_for_id`.
    pub async fn at_snapshot_id(&self, snapshot_id: i64) -> DucklakeResult<Self> {
        let pool = self.conn.pool();
        let info = SnapshotInfo::load_for_id(pool, snapshot_id).await?;
        self.at_snapshot(info)
    }

    /// Returns a new handle pinned to the snapshot that `SnapshotInfo::load_for_timestamp`
    /// resolves for `timestamp`.
    ///
    /// # Errors
    /// Propagates failures from `SnapshotInfo::load_for_timestamp`.
    pub async fn at_snapshot_timestamp(
        &self,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> DucklakeResult<Self> {
        let pool = self.conn.pool();
        let info = SnapshotInfo::load_for_timestamp(pool, timestamp).await?;
        self.at_snapshot(info)
    }

    /// Registers `snapshot_info` with the shared snapshot cache and wraps the result in a
    /// pinned handle.
    ///
    /// The new handle clones this handle's pool and storage options and shares its metadata
    /// and snapshot caches (same `Arc`s).
    fn at_snapshot(&self, snapshot_info: SnapshotInfo) -> DucklakeResult<Self> {
        let current = &self.conn.0;
        let pinned = current.snapshot_cache.insert_snapshot(snapshot_info);

        let inner = DucklakeConnectionInner {
            pool: current.pool.clone(),
            metadata_cache: Arc::clone(&current.metadata_cache),
            snapshot_cache: Arc::clone(&current.snapshot_cache),
            storage_options: current.storage_options.clone(),
            travel_snapshot: Some(pinned),
        };
        Ok(Self {
            conn: DucklakeConnection(Arc::new(inner)),
        })
    }
}
/// Identity and timestamp of a catalog snapshot, as reported by
/// `Ducklake::latest_snapshot` and `Ducklake::list_snapshots`.
///
/// Plain data, so it derives the common value traits: callers can debug-print, clone and
/// compare it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotMetadata {
    /// Snapshot id.
    pub id: i64,
    /// Snapshot time in UTC.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl Ducklake {
    /// Returns the id and timestamp of the snapshot this handle reads from.
    ///
    /// On a handle pinned to a historical snapshot (`at_snapshot_id` /
    /// `at_snapshot_timestamp`), this is the pinned snapshot, which is also the single entry
    /// that [`Ducklake::list_snapshots`] reports for such a handle. Otherwise it is the
    /// latest snapshot provided by the snapshot cache.
    ///
    /// # Errors
    /// Propagates any failure while fetching the latest snapshot.
    pub async fn latest_snapshot(&self) -> DucklakeResult<SnapshotMetadata> {
        // This is a read-only query, so a time-travel (immutable) handle is tolerated instead
        // of being rejected with `DucklakeError::ImmutableDucklake`. This keeps it consistent
        // with `list_snapshots`, which succeeds on the same handle.
        let snapshot = self.conn.latest_snapshot(true).await?;
        let info = snapshot.info();
        Ok(SnapshotMetadata {
            id: info.id,
            timestamp: info.snapshot_time,
        })
    }

    /// Lists snapshots, highest snapshot id first.
    ///
    /// A handle pinned to a historical snapshot reports only that snapshot. Otherwise every
    /// row of the snapshot table is returned.
    ///
    /// # Errors
    /// Propagates database errors from reading the snapshot table.
    pub async fn list_snapshots(&self) -> DucklakeResult<Vec<SnapshotMetadata>> {
        if let Some(travel_snapshot) = &self.conn.0.travel_snapshot {
            let info = travel_snapshot.info();
            Ok(vec![SnapshotMetadata {
                id: info.id,
                timestamp: info.snapshot_time,
            }])
        } else {
            list_snapshots(self.conn.pool()).await
        }
    }
}
impl Ducklake {
    /// Looks up a table by name in the snapshot this handle reads from.
    ///
    /// `name` may be anything convertible into a [`TableName`]. Conversion errors are mapped
    /// into [`DucklakeError`].
    ///
    /// # Errors
    /// Fails if the name cannot be converted, if the snapshot or its catalog cannot be loaded,
    /// or if the catalog cannot resolve the schema or table.
    pub async fn table(
        &self,
        name: impl TryInto<TableName, Error = impl Into<DucklakeError>>,
    ) -> DucklakeResult<Table> {
        let name = name.try_into().map_err(|e| e.into())?;
        let snapshot = self.conn.latest_snapshot(true).await?;
        let catalog = snapshot.catalog().await?;
        let schema_id = catalog.try_schema_id_by_name(&name.schema)?;
        let table_id = catalog.try_table_id_by_name(&name)?;
        Ok(Table::new(self.conn.clone(), schema_id, table_id))
    }

    /// Lists tables in the snapshot this handle reads from.
    ///
    /// `schema` is forwarded to the catalog's `list_table_ids` (presumably `None` lists
    /// tables of every schema — confirm).
    ///
    /// # Errors
    /// Fails if the snapshot or catalog cannot be loaded. Also fails if the catalog cannot
    /// resolve a listed table's name or its schema id. This used to panic; the error is now
    /// propagated.
    pub async fn list_tables(&self, schema: Option<&str>) -> DucklakeResult<Vec<Table>> {
        let snapshot = self.conn.latest_snapshot(true).await?;
        let catalog = snapshot.catalog().await?;
        let tables = catalog
            .list_table_ids(schema)
            .into_iter()
            .map(|table_id| -> DucklakeResult<Table> {
                let name = catalog.try_table_name_by_id(table_id)?;
                // Propagate instead of unwrapping: a catalog inconsistency must surface as an
                // error from this library call, not as a panic.
                let schema_id = catalog.try_schema_id_by_name(&name.schema)?;
                Ok(Table::new(self.conn.clone(), schema_id, table_id))
            })
            .collect::<DucklakeResult<Vec<_>>>()?;
        Ok(tables)
    }

    /// Lists schema names in the snapshot this handle reads from.
    ///
    /// # Errors
    /// Fails if the snapshot or its catalog cannot be loaded.
    pub async fn list_schemas(&self) -> DucklakeResult<Vec<String>> {
        let snapshot = self.conn.latest_snapshot(true).await?;
        let catalog = snapshot.catalog().await?;
        Ok(catalog.list_schema_names())
    }
}
impl DucklakeConnection {
pub(crate) fn pool(&self) -> &db::Pool {
&self.0.pool
}
pub(crate) fn snapshot_cache(&self) -> &SnapshotCache {
&self.0.snapshot_cache
}
}
impl Ducklake {
    /// Opens a transaction on the latest snapshot using `AuthorInfo::default()`.
    ///
    /// # Errors
    /// Fails with `DucklakeError::ImmutableDucklake` on a handle pinned to a historical
    /// snapshot, and propagates errors from starting the transaction.
    pub async fn transaction(&self) -> DucklakeResult<Transaction<'_>> {
        let author_info: Option<AuthorInfo> = None;
        self.conn.transaction(author_info).await
    }

    /// Like [`Ducklake::transaction`], but passes `author_info` to the transaction instead
    /// of the default author info.
    pub async fn transaction_with_author(
        &self,
        author_info: AuthorInfo,
    ) -> DucklakeResult<Transaction<'_>> {
        let author = Some(author_info);
        self.conn.transaction(author).await
    }
}
impl DucklakeConnection {
    /// Starts a transaction based on the latest snapshot.
    ///
    /// The transaction receives the shared snapshot cache, pool and storage options, plus the
    /// metadata currently returned by the metadata cache. A missing `author_info` falls back
    /// to `AuthorInfo::default()`.
    ///
    /// # Errors
    /// `DucklakeError::ImmutableDucklake` on a time-travel connection; otherwise any error from
    /// fetching the snapshot or constructing the transaction.
    pub async fn transaction(
        &self,
        author_info: Option<AuthorInfo>,
    ) -> DucklakeResult<Transaction<'_>> {
        let base_snapshot = self.latest_snapshot(false).await?;
        let metadata = self.0.metadata_cache.get_metadata();
        let inner = &self.0;
        let transaction = Transaction::new(
            &inner.snapshot_cache,
            &inner.pool,
            &inner.storage_options,
            metadata,
            author_info.unwrap_or_default(),
            base_snapshot,
        )
        .await?;
        Ok(transaction)
    }

    /// Returns the snapshot reads should use.
    ///
    /// On a pinned (time-travel) connection this is the pinned snapshot, but only when
    /// `tolerate_immutable` is true. Otherwise the call fails with
    /// `DucklakeError::ImmutableDucklake`. An unpinned connection asks the snapshot cache for
    /// its latest snapshot.
    pub async fn latest_snapshot(
        &self,
        tolerate_immutable: bool,
    ) -> DucklakeResult<Arc<Snapshot>> {
        match &self.0.travel_snapshot {
            Some(_) if !tolerate_immutable => Err(DucklakeError::ImmutableDucklake),
            Some(pinned) => Ok(Arc::clone(pinned)),
            None => self.0.snapshot_cache.get_latest().await,
        }
    }

    /// Returns the pinned snapshot, or else the snapshot cache's current snapshot.
    ///
    /// This is synchronous: unlike `latest_snapshot`, it does not await the cache.
    pub fn current_snapshot(&self) -> Arc<Snapshot> {
        match &self.0.travel_snapshot {
            Some(pinned) => Arc::clone(pinned),
            None => self.0.snapshot_cache.get_current(),
        }
    }
}
/// Generates `pub async` convenience methods on `Ducklake`. Each generated method opens a
/// transaction, calls the same-named `Transaction` method with the given arguments, commits,
/// and returns that method's value.
///
/// Attributes (including `///` doc comments) written before each signature are copied onto
/// the generated method.
macro_rules! within_transaction {
    ($(
        $(#[$attr:meta])*
        fn $method:ident($($param:ident: $param_ty:ty),*) -> $output:ty;
    )*) => {
        impl Ducklake {
            $(
                $(#[$attr])*
                pub async fn $method(&self, $($param: $param_ty),*) -> $output {
                    let mut transaction = self.transaction().await?;
                    let value = transaction.$method($($param),*)?;
                    transaction.commit().await?;
                    Ok(value)
                }
            )*
        }
    };
}
// Each entry expands to a `pub async fn` on `Ducklake` that runs the same-named
// `Transaction` method in a fresh transaction and commits it.
within_transaction! {
    /// Runs `Transaction::create_schema(name, path)` in a new transaction and commits it.
    fn create_schema(name: &str, path: Option<String>) -> DucklakeResult<()>;
    /// Runs `Transaction::delete_schema(name)` in a new transaction and commits it.
    fn delete_schema(name: &str) -> DucklakeResult<()>;
}
impl Ducklake {
    /// Creates a table in its own transaction and commits it.
    ///
    /// All arguments are forwarded unchanged to `Transaction::create_table`.
    ///
    /// # Errors
    /// Fails if a transaction cannot be opened (for example `DucklakeError::ImmutableDucklake`
    /// on a time-travel handle), if `Transaction::create_table` fails, or if the commit fails.
    pub async fn create_table(
        &self,
        name: impl TryInto<TableName, Error = impl Into<DucklakeError>>,
        columns: Vec<Column>,
        partition_columns: Option<Vec<PartitionColumn>>,
        path: Option<String>,
        tags: Option<Vec<Tag>>,
    ) -> DucklakeResult<()> {
        let mut transaction = self.transaction().await?;
        transaction.create_table(name, columns, partition_columns, path, tags)?;
        transaction.commit().await?;
        Ok(())
    }
}
impl DucklakeConnection {
    /// Returns the `Metadata` handed out by the metadata cache's `get_metadata`.
    pub(crate) fn metadata(&self) -> Arc<Metadata> {
        let cache = &self.0.metadata_cache;
        cache.get_metadata()
    }
}
impl Ducklake {
    /// Sets metadata entry `key = value`.
    ///
    /// With `schema` set, the entry is scoped to that schema, whose id is resolved in the
    /// snapshot this handle reads from. Otherwise the entry is global.
    ///
    /// # Errors
    /// Fails if the schema cannot be resolved or the metadata cache rejects the write.
    pub async fn set_metadata(
        &self,
        key: &str,
        value: &str,
        schema: Option<&str>,
    ) -> DucklakeResult<()> {
        let cache = &self.conn.0.metadata_cache;
        match schema {
            Some(schema_name) => {
                let snapshot = self.conn.latest_snapshot(true).await?;
                let catalog = snapshot.catalog().await?;
                let schema_id = catalog.try_schema_id_by_name(schema_name)?;
                cache
                    .set_schema(schema_id, key.to_string(), value.to_string())
                    .await?;
            }
            None => {
                cache.set_global(key.to_string(), value.to_string()).await?;
            }
        }
        Ok(())
    }

    /// Removes metadata entry `key`, either from the named schema's scope or from the global
    /// scope.
    ///
    /// # Errors
    /// Fails if the schema cannot be resolved or the metadata cache rejects the removal.
    pub async fn unset_metadata(&self, key: &str, schema: Option<&str>) -> DucklakeResult<()> {
        let cache = &self.conn.0.metadata_cache;
        match schema {
            Some(schema_name) => {
                let snapshot = self.conn.latest_snapshot(true).await?;
                let catalog = snapshot.catalog().await?;
                let schema_id = catalog.try_schema_id_by_name(schema_name)?;
                cache.unset_schema(schema_id, key).await?;
            }
            None => {
                cache.unset_global(key).await?;
            }
        }
        Ok(())
    }
}
impl DucklakeConnection {
    /// Stores metadata entry `key = value` scoped to table `table_id` via the metadata cache.
    pub(crate) async fn set_table_metadata(
        &self,
        key: &str,
        value: &str,
        table_id: i64,
    ) -> DucklakeResult<()> {
        let (owned_key, owned_value) = (key.to_owned(), value.to_owned());
        self.0
            .metadata_cache
            .set_table(table_id, owned_key, owned_value)
            .await
    }

    /// Removes metadata entry `key` scoped to table `table_id` via the metadata cache.
    pub(crate) async fn unset_table_metadata(
        &self,
        key: &str,
        table_id: i64,
    ) -> DucklakeResult<()> {
        let cache = &self.0.metadata_cache;
        cache.unset_table(table_id, key).await
    }
}
/// Reads the catalog format version.
///
/// Selects the value column of the metadata row whose key equals `spec::metadata::VERSION`.
///
/// # Errors
/// Propagates errors from `db::Pool::fetch_one` (presumably including the case where no
/// version row exists — confirm against `fetch_one`).
async fn get_version(pool: &db::Pool) -> DucklakeResult<String> {
    let is_version_row = ducklake_metadata::Column::Key
        .col()
        .eq(spec::metadata::VERSION);
    let query = Query::select()
        .column(ducklake_metadata::Column::Value)
        .from(ducklake_metadata::Table)
        .and_where(is_version_row)
        .to_owned();
    let (version,): (String,) = pool.fetch_one(&query).await?;
    Ok(version)
}
/// Loads every row of the snapshot table, ordered by snapshot id descending, and converts
/// each row to a [`SnapshotMetadata`].
///
/// # Errors
/// Propagates errors from `db::Pool::fetch_all`.
async fn list_snapshots(pool: &db::Pool) -> DucklakeResult<Vec<SnapshotMetadata>> {
    let mut query = Query::select();
    query
        .column(Asterisk)
        .from(ducklake_snapshot::Table)
        .order_by(
            ducklake_snapshot::Column::SnapshotId,
            sea_query::Order::Desc,
        );

    let rows: Vec<DucklakeSnapshot> = pool.fetch_all(&query).await?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        snapshots.push(SnapshotMetadata {
            id: row.snapshot_id,
            timestamp: row.snapshot_time.0,
        });
    }
    Ok(snapshots)
}