use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use bytes::Bytes;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use serde::{Deserialize, Serialize};
use uni_store::storage::manager::StorageManager;
use uni_store::store_utils::{delete_with_timeout, get_with_timeout, put_with_timeout};
/// Timeout (30 seconds) passed to every object-store put, get and delete of
/// the bulk-flush intent file in this module.
const INTENT_TIMEOUT: Duration = Duration::from_secs(30);
/// Returns the object-store location of the bulk-flush intent file.
///
/// The location is fixed; every read, write and delete in this module goes
/// through this single key.
fn intent_path() -> ObjectStorePath {
    const INTENT_OBJECT_KEY: &str = "catalog/bulk_flush_intent.json";
    ObjectStorePath::from(INTENT_OBJECT_KEY)
}
/// Phase stored in the bulk-flush intent file.
///
/// [`recover_interrupted_bulk_load`] branches on this value to decide what to
/// do with an intent it finds in the object store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) enum BulkIntentPhase {
/// Phase written by [`write_active`]. On recovery every table listed in
/// the intent is rolled back to its recorded version, or dropped when no
/// version was recorded.
Active,
/// Phase written by [`write_committed`]. On recovery the recorded snapshot
/// id (if any) is set as the latest snapshot; no tables are rolled back.
Committed,
}
/// Intent record that is serialized to JSON and stored at [`intent_path`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BulkFlushIntent {
/// Which recovery action applies to this intent; see [`BulkIntentPhase`].
pub phase: BulkIntentPhase,
/// Table name mapped to the version handed to `rollback_table` when an
/// `Active` intent is recovered. A `None` value makes recovery call
/// `drop_table` for that table instead.
// NOTE(review): `None` presumably means the table did not exist before the
// bulk load started — confirm against the caller of `write_active`.
pub tables: BTreeMap<String, Option<u64>>,
/// `None` when written by [`write_active`], `Some(id)` when written by
/// [`write_committed`]. Recovery of a `Committed` intent passes the id to
/// `set_latest_snapshot`.
pub snapshot_id: Option<String>,
}
pub(crate) async fn write_active(
store: &Arc<dyn ObjectStore>,
tables: &std::collections::HashMap<String, Option<u64>>,
) -> Result<()> {
let intent = BulkFlushIntent {
phase: BulkIntentPhase::Active,
tables: tables.iter().map(|(k, v)| (k.clone(), *v)).collect(),
snapshot_id: None,
};
write(store, &intent).await
}
pub(crate) async fn write_committed(
store: &Arc<dyn ObjectStore>,
tables: &std::collections::HashMap<String, Option<u64>>,
snapshot_id: &str,
) -> Result<()> {
let intent = BulkFlushIntent {
phase: BulkIntentPhase::Committed,
tables: tables.iter().map(|(k, v)| (k.clone(), *v)).collect(),
snapshot_id: Some(snapshot_id.to_string()),
};
write(store, &intent).await
}
/// Serializes `intent` to JSON and uploads it to [`intent_path`], bounded by
/// [`INTENT_TIMEOUT`].
///
/// # Errors
///
/// Fails if JSON serialization fails or the upload returns an error.
async fn write(store: &Arc<dyn ObjectStore>, intent: &BulkFlushIntent) -> Result<()> {
    let payload = Bytes::from(serde_json::to_vec(intent)?);
    let location = intent_path();
    put_with_timeout(store, &location, payload, INTENT_TIMEOUT).await?;
    Ok(())
}
/// Deletes the intent file, treating "already absent" as success.
///
/// # Errors
///
/// Propagates any delete error that [`is_not_found`] does not classify as a
/// missing object.
pub(crate) async fn clear(store: &Arc<dyn ObjectStore>) -> Result<()> {
    let outcome = delete_with_timeout(store, &intent_path(), INTENT_TIMEOUT).await;
    outcome.or_else(|err| {
        // A missing intent file is the state we want, so it is not an error.
        if is_not_found(&err) {
            Ok(())
        } else {
            Err(err)
        }
    })
}
/// Loads and decodes the intent file, if one exists.
///
/// Returns `Ok(None)` when the fetch fails with an error that
/// [`is_not_found`] classifies as a missing object.
///
/// # Errors
///
/// Propagates any other fetch error, a failure to read the object body, and
/// JSON decoding errors.
async fn read(store: &Arc<dyn ObjectStore>) -> Result<Option<BulkFlushIntent>> {
    let fetched = match get_with_timeout(store, &intent_path(), INTENT_TIMEOUT).await {
        Ok(found) => found,
        Err(err) => {
            return if is_not_found(&err) {
                Ok(None)
            } else {
                Err(err)
            };
        }
    };

    let raw = fetched.bytes().await?;
    Ok(Some(serde_json::from_slice::<BulkFlushIntent>(&raw)?))
}
/// Reports whether `e` looks like a "missing object" error.
///
/// The check is purely textual: the error's `Display` output is lower-cased
/// and searched for the substring `not found`.
// NOTE(review): this depends on the wording of the underlying error message;
// a typed check would be sturdier if the wrapped error type is reachable —
// confirm how the `*_with_timeout` helpers wrap store errors.
fn is_not_found(e: &anyhow::Error) -> bool {
    const NEEDLE: &str = "not found";
    let message = e.to_string();
    message.to_lowercase().contains(NEEDLE)
}
pub async fn recover_interrupted_bulk_load(storage: &StorageManager) -> Result<()> {
let store = storage.store();
let Some(intent) = read(&store).await? else {
return Ok(());
};
match intent.phase {
BulkIntentPhase::Committed => {
if let Some(snapshot_id) = &intent.snapshot_id {
storage
.snapshot_manager()
.set_latest_snapshot(snapshot_id)
.await?;
}
tracing::warn!(
tables = intent.tables.len(),
"Finalized a committed-but-unfinished bulk load on reopen"
);
}
BulkIntentPhase::Active => {
let backend = storage.backend();
let mut failures = 0usize;
for (table, pre_version) in &intent.tables {
let result = match pre_version {
Some(version) => backend.rollback_table(table, *version).await,
None => backend.drop_table(table).await,
};
if let Err(e) = result {
failures += 1;
tracing::warn!(table = %table, error = %e, "bulk recovery: table rollback failed");
}
}
backend.clear_cache();
tracing::warn!(
tables = intent.tables.len(),
failures,
"Rolled back an interrupted bulk load on reopen"
);
}
}
clear(&store).await?;
Ok(())
}