use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchIterator;
use async_trait::async_trait;
use lance::Dataset;
use lance::Error as LanceError;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
use lance_namespace::NamespaceError;
#[cfg(test)]
use lance_namespace::models::CreateTableVersionRequest;
use crate::error::{OmniError, Result};
#[cfg(test)]
use super::SubTableUpdate;
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
use super::metadata::{TableVersionMetadata, parse_namespace_version_request};
use super::migrations::{read_stamp, refuse_if_stamp_unsupported};
use super::state::{
GraphLineageRow, GraphLineageRowPart, ManifestState, assemble_manifest_state,
graph_lineage_row_parts, head_lineage_row, manifest_rows_batch, manifest_schema,
read_manifest_state, read_publish_scan,
};
use super::{
ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION,
SubTableEntry, TableRegistration, TableTombstone,
};
/// Number of *additional* publish attempts after the first one.
///
/// `publish` iterates over `0..=PUBLISHER_RETRY_BUDGET`, so up to
/// `PUBLISHER_RETRY_BUDGET + 1` attempts are made when retryable row-level CAS
/// conflicts keep occurring.
const PUBLISHER_RETRY_BUDGET: u32 = 5;
/// Caller-supplied description of a graph commit to record together with a
/// manifest publish.
///
/// The publisher turns this into lineage rows (see `resolve_lineage_rows`).
/// It fills in the parent commit from the current lineage head and the
/// manifest version from the version the publish will create.
#[derive(Debug, Clone)]
pub(crate) struct LineageIntent {
// Identifier of the graph commit being recorded.
pub graph_commit_id: String,
// Manifest branch the commit is recorded on; presumably `None` means the
// default branch — TODO confirm.
pub branch: Option<String>,
// Optional identifier of whoever produced the commit.
pub actor_id: Option<String>,
// Copied verbatim into the lineage row; presumably the second parent when
// recording a merge — confirm.
pub merged_parent_commit_id: Option<String>,
// Creation timestamp; the unit is not visible here (assumed epoch-based — confirm).
pub created_at: i64,
}
/// Result of a successful `ManifestBatchPublisher::publish` call.
#[derive(Debug)]
pub(super) struct PublishOutcome {
// Manifest dataset handle after the publish. When nothing had to be
// written, this is the freshly opened handle.
pub dataset: Dataset,
// Graph commit id of the lineage head the new commit was chained onto.
// `None` when no lineage intent was supplied or no head existed.
pub parent_commit_id: Option<String>,
// Manifest state matching `dataset`'s version. On the write path it is
// folded in memory from the loaded state plus the written rows.
pub known_state: ManifestState,
}
/// Publishes a batch of manifest changes to the manifest dataset.
#[async_trait]
pub(super) trait ManifestBatchPublisher: Send + Sync {
/// Apply `changes`, and optionally record a lineage commit, in one publish.
///
/// `expected_table_versions` maps table keys to the version the caller
/// believes is currently visible. `GraphNamespacePublisher` rejects the
/// publish when any of them differ.
async fn publish(
&self,
changes: &[ManifestChange],
expected_table_versions: &HashMap<String, u64>,
lineage: Option<&LineageIntent>,
) -> Result<PublishOutcome>;
}
/// `ManifestBatchPublisher` that writes to the manifest dataset under `root_uri`.
pub(super) struct GraphNamespacePublisher {
// Root URI with any trailing `/` removed (see `new`).
root_uri: String,
// Manifest branch to publish to; `main` is normalised to `None` in `new`.
branch: Option<String>,
}
/// One row queued for the manifest merge-insert, which is keyed on `object_id`.
///
/// Table registrations, table versions, tombstones and lineage rows all share
/// this shape. Fields that do not apply to a row's `object_type` are `None`;
/// lineage rows use an empty `table_key`.
#[derive(Debug)]
struct PendingVersionRow {
object_id: String,
object_type: String,
// Table path; set only on table-registration rows.
location: Option<String>,
// Serialized metadata (version metadata JSON, or the lineage payload).
metadata: Option<String>,
table_key: String,
// Table version for version rows, tombstone version for tombstone rows.
table_version: Option<u64>,
table_branch: Option<String>,
row_count: Option<u64>,
}
/// Snapshot of the manifest read at the start of each publish attempt.
struct LoadedPublishState {
// Manifest dataset at the version that was scanned.
dataset: Dataset,
// Registered table key -> table location.
registered_tables: HashMap<String, String>,
// Existing table-version entries keyed by `(table_key, table_version)`.
existing_versions: HashMap<(String, u64), SubTableEntry>,
// Existing tombstones keyed by `(table_key, tombstone_version)`; used as a set.
existing_tombstones: HashMap<(String, u64), ()>,
// Lineage rows, used to find the current lineage head.
lineage_rows: Vec<GraphLineageRow>,
}
impl GraphNamespacePublisher {
/// Creates a publisher for the manifest rooted at `root_uri`.
///
/// Trailing slashes are trimmed from the URI. A `main` branch is normalised to
/// `None`, so it is addressed exactly like "no branch".
pub(super) fn new(root_uri: &str, branch: Option<&str>) -> Self {
    let normalized_root = root_uri.trim_end_matches('/').to_string();
    let normalized_branch = match branch {
        Some(name) if name != "main" => Some(name.to_owned()),
        _ => None,
    };
    Self {
        root_uri: normalized_root,
        branch: normalized_branch,
    }
}
/// Opens the manifest dataset for this publisher's root URI and branch.
async fn dataset(&self) -> Result<Dataset> {
    let branch: Option<&str> = self.branch.as_deref();
    open_manifest_dataset(&self.root_uri, branch).await
}
/// Reads everything one publish attempt needs from the current manifest head.
///
/// This runs the retryable-contention failpoint, then opens the manifest
/// dataset. It refuses to continue if the dataset's format stamp is
/// unsupported. Finally it scans the dataset once for table registrations,
/// table versions, tombstones and lineage rows.
///
/// # Errors
///
/// Propagates errors from the failpoint, from opening the dataset, from the
/// stamp check, and from the scan.
async fn load_publish_state(&self) -> Result<LoadedPublishState> {
    crate::failpoints::maybe_fail_retryable_contention(
        crate::failpoints::names::PUBLISH_LOAD_STATE_RETRYABLE_CONTENTION,
    )?;
    let dataset = self.dataset().await?;
    refuse_if_stamp_unsupported(read_stamp(&dataset))?;
    let scan = read_publish_scan(&dataset).await?;
    // The scan result is consumed here, so move each entry into the map
    // rather than deep-cloning it (including its version metadata).
    let existing_versions = scan
        .version_entries
        .into_iter()
        .map(|entry| ((entry.table_key.clone(), entry.table_version), entry))
        .collect();
    let existing_tombstones = scan.tombstones.into_iter().collect();
    Ok(LoadedPublishState {
        dataset,
        registered_tables: scan.table_locations,
        existing_versions,
        existing_tombstones,
        lineage_rows: scan.lineage_rows,
    })
}
fn build_pending_rows(
changes: &[ManifestChange],
known_tables: &HashMap<String, String>,
existing_versions: &HashMap<(String, u64), SubTableEntry>,
existing_tombstones: &HashMap<(String, u64), ()>,
) -> Result<Vec<PendingVersionRow>> {
let mut request_versions = HashMap::<(String, u64), ()>::new();
let mut known_tables = known_tables.clone();
let mut rows = Vec::with_capacity(changes.len());
for change in changes {
if let ManifestChange::RegisterTable(TableRegistration {
table_key,
table_path,
}) = change
{
if let Some(existing_path) = known_tables.get(table_key) {
if existing_path != table_path {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table {} already exists with different path {}",
table_key, existing_path
),
}
.to_string(),
));
}
} else {
known_tables.insert(table_key.clone(), table_path.clone());
}
rows.push(PendingVersionRow {
object_id: table_key.clone(),
object_type: OBJECT_TYPE_TABLE.to_string(),
location: Some(table_path.clone()),
metadata: None,
table_key: table_key.clone(),
table_version: None,
table_branch: None,
row_count: None,
});
}
}
for change in changes {
match change {
ManifestChange::RegisterTable(_) => {}
ManifestChange::Update(update) => {
let request = update.to_create_table_version_request();
let (table_key, table_version, row_count, table_branch, version_metadata) =
parse_namespace_version_request(&request)
.map_err(|e| OmniError::Lance(e.to_string()))?;
if !known_tables.contains_key(table_key.as_str()) {
return Err(OmniError::Lance(
NamespaceError::TableNotFound {
message: format!("table {} not found", table_key),
}
.to_string(),
));
}
if request_versions
.insert((table_key.clone(), table_version), ())
.is_some()
{
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
}
.to_string(),
));
}
if let Some(existing) =
existing_versions.get(&(table_key.clone(), table_version))
{
let is_owner_branch_handoff = existing.row_count == row_count
&& existing.table_branch != table_branch;
if !is_owner_branch_handoff {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table version {} already exists for {}",
table_version, table_key
),
}
.to_string(),
));
}
}
rows.push(PendingVersionRow {
object_id: version_object_id(&table_key, table_version),
object_type: OBJECT_TYPE_TABLE_VERSION.to_string(),
location: None,
metadata: Some(version_metadata.to_json_string()?),
table_key,
table_version: Some(table_version),
table_branch,
row_count: Some(row_count),
});
}
ManifestChange::Tombstone(TableTombstone {
table_key,
tombstone_version,
}) => {
if !known_tables.contains_key(table_key.as_str()) {
return Err(OmniError::Lance(
NamespaceError::TableNotFound {
message: format!("table {} not found", table_key),
}
.to_string(),
));
}
if existing_tombstones.contains_key(&(table_key.clone(), *tombstone_version)) {
return Err(OmniError::Lance(
NamespaceError::ConcurrentModification {
message: format!(
"table tombstone {} already exists for {}",
tombstone_version, table_key
),
}
.to_string(),
));
}
rows.push(PendingVersionRow {
object_id: tombstone_object_id(table_key, *tombstone_version),
object_type: OBJECT_TYPE_TABLE_TOMBSTONE.to_string(),
location: None,
metadata: None,
table_key: table_key.clone(),
table_version: Some(*tombstone_version),
table_branch: None,
row_count: None,
});
}
}
}
Ok(rows)
}
fn resolve_lineage_rows(
lineage_rows: &[GraphLineageRow],
intent: &LineageIntent,
new_manifest_version: u64,
) -> Result<(Vec<PendingVersionRow>, Option<String>)> {
let parent_commit_id = head_lineage_row(lineage_rows).map(|h| h.graph_commit_id.clone());
let commit = GraphLineageRow {
graph_commit_id: intent.graph_commit_id.clone(),
manifest_branch: intent.branch.clone(),
manifest_version: new_manifest_version,
parent_commit_id: parent_commit_id.clone(),
merged_parent_commit_id: intent.merged_parent_commit_id.clone(),
actor_id: intent.actor_id.clone(),
created_at: intent.created_at,
};
let parts = graph_lineage_row_parts(&commit, intent.branch.as_deref())?;
Ok((
parts.into_iter().map(lineage_part_to_pending).collect(),
parent_commit_id,
))
}
fn pending_rows_to_batch(rows: Vec<PendingVersionRow>) -> Result<arrow_array::RecordBatch> {
let mut object_ids = Vec::with_capacity(rows.len());
let mut object_types = Vec::with_capacity(rows.len());
let mut locations: Vec<Option<String>> = Vec::with_capacity(rows.len());
let mut metadata = Vec::with_capacity(rows.len());
let mut table_keys = Vec::with_capacity(rows.len());
let mut table_versions: Vec<Option<u64>> = Vec::with_capacity(rows.len());
let mut table_branches = Vec::with_capacity(rows.len());
let mut row_counts: Vec<Option<u64>> = Vec::with_capacity(rows.len());
for row in rows {
object_ids.push(row.object_id);
object_types.push(row.object_type);
locations.push(row.location);
metadata.push(row.metadata);
table_keys.push(row.table_key);
table_versions.push(row.table_version);
table_branches.push(row.table_branch);
row_counts.push(row.row_count);
}
manifest_rows_batch(
object_ids,
object_types,
locations,
metadata,
table_keys,
table_versions,
table_branches,
row_counts,
)
}
/// Computes, for each table, the version a reader should currently see.
///
/// A table's highest tombstone hides every version at or below it. The result
/// is the highest version that is not hidden. A table that has tombstones but
/// no remaining versions reports its highest tombstone version instead.
fn latest_visible_per_table(
    existing_versions: &HashMap<(String, u64), SubTableEntry>,
    existing_tombstones: &HashMap<(String, u64), ()>,
) -> HashMap<String, u64> {
    let mut highest_tombstone: HashMap<String, u64> = HashMap::new();
    for (table, tombstone) in existing_tombstones.keys() {
        let slot = highest_tombstone.entry(table.clone()).or_insert(*tombstone);
        *slot = (*slot).max(*tombstone);
    }

    let mut visible: HashMap<String, u64> = HashMap::new();
    for (table, version) in existing_versions.keys() {
        let hidden = matches!(highest_tombstone.get(table), Some(cutoff) if cutoff >= version);
        if !hidden {
            let slot = visible.entry(table.clone()).or_insert(*version);
            *slot = (*slot).max(*version);
        }
    }

    // Tables whose every version is tombstoned fall back to the tombstone version.
    for (table, tombstone) in highest_tombstone {
        visible.entry(table).or_insert(tombstone);
    }
    visible
}
fn fold_inputs(
existing_versions: &HashMap<(String, u64), SubTableEntry>,
existing_tombstones: &HashMap<(String, u64), ()>,
rows: &[PendingVersionRow],
registered_tables: &HashMap<String, String>,
) -> Result<(Vec<SubTableEntry>, Vec<(String, u64)>)> {
let mut table_locations: HashMap<String, String> = registered_tables.clone();
for row in rows {
if row.object_type == OBJECT_TYPE_TABLE {
if let Some(location) = &row.location {
table_locations.insert(row.table_key.clone(), location.clone());
}
}
}
let mut version_map: HashMap<(String, u64), SubTableEntry> = existing_versions.clone();
let mut tombstones: Vec<(String, u64)> = existing_tombstones
.keys()
.map(|(key, version)| (key.clone(), *version))
.collect();
for row in rows {
match row.object_type.as_str() {
OBJECT_TYPE_TABLE_VERSION => {
let table_version = row.table_version.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing version for {}",
row.table_key
))
})?;
let table_path =
table_locations.get(&row.table_key).cloned().ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: missing table row for {}",
row.table_key
))
})?;
let metadata_json = row.metadata.as_deref().ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing metadata for {}",
row.table_key
))
})?;
version_map.insert(
(row.table_key.clone(), table_version),
SubTableEntry {
table_key: row.table_key.clone(),
table_path,
table_version,
table_branch: row.table_branch.clone(),
row_count: row.row_count.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing row_count for {}",
row.table_key
))
})?,
version_metadata: TableVersionMetadata::from_json_str(metadata_json)?,
},
);
}
OBJECT_TYPE_TABLE_TOMBSTONE => {
let tombstone_version = row.table_version.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: tombstone row missing version for {}",
row.table_key
))
})?;
tombstones.push((row.table_key.clone(), tombstone_version));
}
_ => {}
}
}
Ok((version_map.into_values().collect(), tombstones))
}
/// Verifies that every table in `expected` is currently at the expected version.
///
/// Tables missing from `latest_per_table` are treated as being at version 0.
///
/// # Errors
///
/// Returns an expected-version-mismatch manifest error for the first
/// mismatching table, in `expected`'s iteration order.
fn check_expected_table_versions(
    latest_per_table: &HashMap<String, u64>,
    expected: &HashMap<String, u64>,
) -> Result<()> {
    let mismatch = expected.iter().find_map(|(table_key, &wanted)| {
        let found = latest_per_table.get(table_key).copied().unwrap_or(0);
        (found != wanted).then(|| (table_key, wanted, found))
    });
    match mismatch {
        Some((table_key, wanted, found)) => Err(
            OmniError::manifest_expected_version_mismatch(table_key.clone(), wanted, found),
        ),
        None => Ok(()),
    }
}
/// Upserts `rows` into the manifest, keyed on `object_id`, and returns the new dataset.
///
/// The merge is built with `conflict_retries(0)`. A lost commit race therefore
/// surfaces as an error, mapped by `map_lance_publish_error`, and is not
/// retried inside Lance; the caller decides whether to retry. Index use and
/// automatic cleanup are disabled for this write.
async fn merge_rows(&self, dataset: Dataset, rows: Vec<PendingVersionRow>) -> Result<Dataset> {
    let source_batch = Self::pending_rows_to_batch(rows)?;
    let reader = RecordBatchIterator::new(vec![Ok(source_batch)], manifest_schema());
    let mut builder =
        MergeInsertBuilder::try_new(Arc::new(dataset), vec!["object_id".to_string()])
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    builder.when_matched(WhenMatched::UpdateAll);
    builder.when_not_matched(WhenNotMatched::InsertAll);
    builder.conflict_retries(0);
    builder.use_index(false);
    builder.skip_auto_cleanup(true);
    let job = builder
        .try_build()
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    let (merged, _stats) = job
        .execute_reader(Box::new(reader))
        .await
        .map_err(map_lance_publish_error)?;
    // Avoid a clone when this is the only handle to the new dataset.
    let owned = match Arc::try_unwrap(merged) {
        Ok(dataset) => dataset,
        Err(shared) => (*shared).clone(),
    };
    Ok(owned)
}
/// Test helper that publishes raw namespace `CreateTableVersionRequest`s.
///
/// Each request becomes a table-version update. The publish runs with no
/// version expectations and no lineage. Returns the resulting manifest dataset.
#[cfg(test)]
pub(super) async fn publish_requests(
    &self,
    requests: &[CreateTableVersionRequest],
) -> Result<Dataset> {
    let mut changes = Vec::with_capacity(requests.len());
    for request in requests {
        let (table_key, table_version, row_count, table_branch, version_metadata) =
            parse_namespace_version_request(request)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        changes.push(ManifestChange::Update(SubTableUpdate {
            table_key,
            table_version,
            table_branch,
            row_count,
            version_metadata,
        }));
    }
    let outcome = self.publish(&changes, &HashMap::new(), None).await?;
    Ok(outcome.dataset)
}
}
fn lineage_part_to_pending(part: GraphLineageRowPart) -> PendingVersionRow {
PendingVersionRow {
object_id: part.object_id,
object_type: part.object_type.to_string(),
location: None,
metadata: Some(part.metadata),
table_key: String::new(),
table_version: part.table_version,
table_branch: part.table_branch,
row_count: None,
}
}
/// Translates a Lance error raised while committing a manifest publish.
///
/// `TooMuchWriteContention` (a lost commit race) becomes a row-level CAS
/// contention error. Presumably this is what `is_retryable_publish_conflict`
/// recognises as retryable — confirm against `manifest_row_level_cas_contention`.
/// Every other error is wrapped as `OmniError::Lance`.
pub(crate) fn map_lance_publish_error(err: LanceError) -> OmniError {
    match err {
        contention @ LanceError::TooMuchWriteContention { .. } => {
            OmniError::manifest_row_level_cas_contention(format!(
                "manifest publish lost a row-level CAS race: {}",
                contention
            ))
        }
        other => OmniError::Lance(other.to_string()),
    }
}
#[async_trait]
impl ManifestBatchPublisher for GraphNamespacePublisher {
/// Publishes `changes`, plus an optional lineage commit, with optimistic concurrency.
///
/// Each attempt does the following:
/// 1. Reloads the manifest head.
/// 2. Checks `expected_table_versions` against each table's latest visible
///    version (missing tables count as 0).
/// 3. Validates the changes and builds the rows.
/// 4. Merge-inserts the rows with Lance's internal conflict retries disabled.
///
/// A retryable row-level CAS conflict while loading or merging is retried up
/// to `PUBLISHER_RETRY_BUDGET` times. Any other error, including a
/// version-expectation mismatch, is returned immediately, as is a conflict on
/// the final attempt.
///
/// With no changes, no expectations and no lineage, the manifest is only opened
/// and read. When no rows need writing, the loaded state is returned without a
/// write.
async fn publish(
&self,
changes: &[ManifestChange],
expected_table_versions: &HashMap<String, u64>,
lineage: Option<&LineageIntent>,
) -> Result<PublishOutcome> {
// Fast path: nothing to publish, check, or record; just report the current head.
if changes.is_empty() && expected_table_versions.is_empty() && lineage.is_none() {
let dataset = self.dataset().await?;
let known_state = read_manifest_state(&dataset).await?;
return Ok(PublishOutcome {
dataset,
parent_commit_id: None,
known_state,
});
}
// Attempt 0 is the initial try; attempts 1..=PUBLISHER_RETRY_BUDGET are retries.
for attempt in 0..=PUBLISHER_RETRY_BUDGET {
let loaded = match self.load_publish_state().await {
Ok(loaded) => loaded,
Err(err)
if attempt < PUBLISHER_RETRY_BUDGET && is_retryable_publish_conflict(&err) =>
{
continue;
}
Err(err) => return Err(err),
};
let LoadedPublishState {
dataset,
registered_tables: known_tables,
existing_versions,
existing_tombstones,
lineage_rows,
} = loaded;
// Stale caller view: fail immediately. This check is not retried.
let latest_per_table =
Self::latest_visible_per_table(&existing_versions, &existing_tombstones);
Self::check_expected_table_versions(&latest_per_table, expected_table_versions)?;
let mut rows = Self::build_pending_rows(
changes,
&known_tables,
&existing_versions,
&existing_tombstones,
)?;
let parent_commit_id = match lineage {
Some(intent) => {
// The lineage row records the manifest version this merge is expected
// to create. merge_rows uses conflict_retries(0), so presumably a
// commit either lands at exactly this version or fails — confirm
// against Lance's commit semantics.
let new_manifest_version = dataset.version().version + 1;
let (commit_rows, parent) =
Self::resolve_lineage_rows(&lineage_rows, intent, new_manifest_version)?;
rows.extend(commit_rows);
parent
}
None => None,
};
// Nothing to write (e.g. only expected versions were checked): report the loaded state.
if rows.is_empty() {
let known_state = assemble_manifest_state(
dataset.version().version,
existing_versions.values().cloned().collect(),
existing_tombstones
.keys()
.map(|(key, version)| (key.clone(), *version)),
);
return Ok(PublishOutcome {
dataset,
parent_commit_id,
known_state,
});
}
// Fold the post-publish state before `rows` is moved into merge_rows.
let (fold_entries, fold_tombstones) =
Self::fold_inputs(&existing_versions, &existing_tombstones, &rows, &known_tables)?;
match self.merge_rows(dataset, rows).await {
Ok(new_dataset) => {
let known_state = assemble_manifest_state(
new_dataset.version().version,
fold_entries,
fold_tombstones,
);
return Ok(PublishOutcome {
dataset: new_dataset,
parent_commit_id,
known_state,
});
}
Err(err) => {
if attempt < PUBLISHER_RETRY_BUDGET && is_retryable_publish_conflict(&err) {
continue;
}
return Err(err);
}
}
}
// Not reached in practice: on the final attempt every path returns from inside
// the loop, so a contention error surfaces as-is. This only satisfies the compiler.
Err(OmniError::manifest_conflict(format!(
"manifest publish exhausted {} retries against concurrent writers",
PUBLISHER_RETRY_BUDGET
)))
}
}
/// Returns `true` when `err` is a manifest error tagged as row-level CAS
/// contention. This is the only kind of failure the publish loop retries.
pub(crate) fn is_retryable_publish_conflict(err: &OmniError) -> bool {
    if let OmniError::Manifest(manifest_err) = err {
        matches!(
            manifest_err.details,
            Some(crate::error::ManifestConflictDetails::RowLevelCasContention)
        )
    } else {
        false
    }
}