use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use crate::caches::{SnapshotInfo, TableStats};
use crate::catalog::{Catalog, ColumnRef, SchemaRef, TableRef};
use crate::spec::DucklakeSnapshot;
use crate::{DucklakeResult, db};
/// Mutable bookkeeping for a single commit: the identifiers of the snapshot
/// being written plus the counters used to hand out fresh catalog, file and
/// column IDs while the commit is assembled.
pub struct CommitState<'a> {
/// ID of the snapshot being committed (`new` sets it to the previous snapshot's ID plus one).
snapshot_id: i64,
/// Schema version recorded for the snapshot (`new` bumps it by one when the schema changed).
schema_version: i64,
/// Next unassigned catalog ID; consumed when schema, table and partition IDs are allocated.
next_catalog_id: i64,
/// Next unassigned file ID; consumed by `file_id`.
next_file_id: i64,
/// Catalog view, either borrowed or owned; mutated through `Cow::to_mut` when new IDs are resolved.
catalog: Cow<'a, Catalog>,
/// Per-table counters, keyed by table ID, holding the next column ID to hand out.
next_column_ids: HashMap<i64, i64>,
/// Optional per-table statistics keyed by table ID; `table_stats` panics when this is `None`.
table_stats: Option<HashMap<i64, TableStats>>,
}
impl<'a> CommitState<'a> {
    /// Creates the state for the commit that follows `snapshot_info`.
    ///
    /// * The new snapshot ID is `snapshot_info.id + 1`.
    /// * The schema version is carried over and incremented by one only when
    ///   `schema_changed` is `true`.
    /// * The catalog and file ID counters are seeded from `snapshot_info`.
    /// * No per-table column ID counters are registered initially.
    /// * `catalog` and `table_stats` are stored as given.
    pub fn new(
        snapshot_info: &SnapshotInfo,
        catalog: Cow<'a, Catalog>,
        schema_changed: bool,
        table_stats: Option<HashMap<i64, TableStats>>,
    ) -> Self {
        let snapshot_id = snapshot_info.id + 1;
        // `true` converts to 1 and `false` to 0, so the version only moves on a schema change.
        let schema_version = snapshot_info.schema_version + i64::from(schema_changed);

        Self {
            snapshot_id,
            schema_version,
            next_catalog_id: snapshot_info.next_catalog_id,
            next_file_id: snapshot_info.next_file_id,
            catalog,
            next_column_ids: HashMap::new(),
            table_stats,
        }
    }
}
impl<'a> CommitState<'a> {
    /// Returns the ID of the schema behind `schema_ref`.
    ///
    /// If the catalog has no ID for it yet, a fresh catalog ID is allocated,
    /// recorded in the catalog and returned.
    pub fn schema_id(&mut self, schema_ref: SchemaRef) -> i64 {
        if let Some(existing) = self.catalog.schema_id(schema_ref) {
            return existing;
        }
        let fresh = self.catalog_id();
        self.catalog.to_mut().resolve_schema_id(schema_ref, fresh);
        fresh
    }

    /// Returns the ID of the table behind `table_ref`.
    ///
    /// If the catalog has no ID for it yet, a fresh catalog ID is allocated,
    /// recorded in the catalog and returned.
    pub fn table_id(&mut self, table_ref: TableRef) -> i64 {
        if let Some(existing) = self.catalog.table_id(table_ref) {
            return existing;
        }
        let fresh = self.catalog_id();
        self.catalog.to_mut().resolve_table_id(table_ref, fresh);
        fresh
    }

    /// Returns the ID of the column behind `column_ref`.
    ///
    /// If the catalog has no ID for it yet, the next column ID of the owning
    /// table is taken, recorded in the catalog and returned.
    ///
    /// # Panics
    ///
    /// Panics when the column is unresolved and either the owning table has no
    /// ID in the catalog or no next column ID has been registered for that
    /// table (see `ensure_next_column_id_set`).
    pub fn column_id(&mut self, column_ref: ColumnRef) -> i64 {
        if let Some(existing) = self.catalog.column_id(column_ref) {
            return existing;
        }
        let owning_table = self
            .catalog
            .table_id(column_ref.table_ref)
            .expect("table ID must be set before resolving column IDs");
        let fresh = self.next_column_id(owning_table);
        self.catalog.to_mut().resolve_column_id(column_ref, fresh);
        fresh
    }

    /// Returns the partition ID associated with `table_ref`.
    ///
    /// If the catalog has no ID for it yet, a fresh catalog ID is allocated,
    /// recorded in the catalog and returned.
    pub fn partition_id(&mut self, table_ref: TableRef) -> i64 {
        if let Some(existing) = self.catalog.partition_id(table_ref) {
            return existing;
        }
        let fresh = self.catalog_id();
        self.catalog.to_mut().resolve_partition_id(table_ref, fresh);
        fresh
    }

    /// Makes sure a next-column-ID counter is registered for `table_id`.
    ///
    /// `fetch_id` is awaited only when no counter is stored for the table yet;
    /// otherwise it is dropped without being polled.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `fetch_id`; in that case no counter is
    /// stored.
    pub async fn ensure_next_column_id_set(
        &mut self,
        table_id: i64,
        fetch_id: impl Future<Output = DucklakeResult<i64>>,
    ) -> DucklakeResult<()> {
        match self.next_column_ids.entry(table_id) {
            // A counter is already registered: nothing to fetch.
            Entry::Occupied(_) => {}
            Entry::Vacant(slot) => {
                let first_free = fetch_id.await?;
                slot.insert(first_free);
            }
        }
        Ok(())
    }

    /// Returns a mutable handle to the statistics tracked for `table_id`,
    /// inserting default statistics when the table has none yet.
    ///
    /// The body never awaits and always yields `Ok`.
    ///
    /// # Panics
    ///
    /// Panics when this state was created without table statistics.
    pub async fn table_stats(&mut self, table_id: i64) -> DucklakeResult<&mut TableStats> {
        let per_table = self
            .table_stats
            .as_mut()
            .expect("table stats are unset but requested");
        let tracked = per_table.entry(table_id).or_default();
        Ok(tracked)
    }
}
impl<'a> CommitState<'a> {
    /// ID of the snapshot being committed.
    pub fn snapshot_id(&self) -> i64 {
        self.snapshot_id
    }

    /// Schema version recorded for the snapshot being committed.
    pub fn schema_version(&self) -> i64 {
        self.schema_version
    }

    /// Hands out the next file ID and advances the file ID counter by one.
    pub fn file_id(&mut self) -> i64 {
        let issued = self.next_file_id;
        self.next_file_id = issued + 1;
        issued
    }

    /// Returns the schema stored in the catalog's table info for `table_ref`.
    pub fn table_schema(&self, table_ref: TableRef) -> crate::Schema {
        self.catalog.table_info_by_ref(table_ref).schema
    }

    /// Hands out the next catalog ID and advances the catalog ID counter by one.
    fn catalog_id(&mut self) -> i64 {
        let issued = self.next_catalog_id;
        self.next_catalog_id = issued + 1;
        issued
    }

    /// Hands out the next column ID for `table_id` and advances that table's
    /// counter by one.
    ///
    /// # Panics
    ///
    /// Panics when no counter has been registered for `table_id`.
    fn next_column_id(&mut self, table_id: i64) -> i64 {
        let counter = self
            .next_column_ids
            .get_mut(&table_id)
            .expect("next column ID must be set for table");
        let issued = *counter;
        *counter = issued + 1;
        issued
    }
}
impl<'a> From<&CommitState<'a>> for DucklakeSnapshot {
fn from(metadata: &CommitState<'a>) -> Self {
Self {
snapshot_id: metadata.snapshot_id,
snapshot_time: db::UtcDateTime::now(),
schema_version: metadata.schema_version,
next_catalog_id: metadata.next_catalog_id,
next_file_id: metadata.next_file_id,
}
}
}