use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use nodedb_types::{CloneStatus, CollectionType, DatabaseId};
use crate::control::maintenance::wrapper::{MaintenanceOutcome, with_budget};
use crate::control::security::catalog::{StoredCollection, SystemCatalog};
use crate::control::state::SharedState;
use super::columnar::materialize_columnar_collection;
use super::document::materialize_document_collection;
use super::kv::materialize_kv_collection;
use super::progress::CloneMaterializerHandle;
/// Result of one clone-materialization pass over a single database.
#[derive(Debug)]
pub enum MaterializeOutcome {
/// No pending clone collections were found, or every pending collection
/// materialized without error during this pass.
AllComplete,
/// The pass ran to the end, but `collections_remaining` collections
/// returned an error and therefore still need materializing.
Incomplete { collections_remaining: usize },
/// The maintenance budget wrapper reported `Deferred`, so the pass did not run.
BudgetDeferred,
/// The cancel flag was observed set before or during the pass.
Cancelled,
}
/// Borrowed inputs for one materialization pass over a single database.
pub struct MaterializeParams<'a> {
/// Database whose pending clone collections are to be materialized.
pub db_id: DatabaseId,
/// Shared control-plane state; the pass reads `maintenance_budget` and
/// `materialize_freeze` from it and hands it to the per-engine materializers.
pub state: &'a SharedState,
/// Catalog used to list the database's collections.
pub catalog: &'a SystemCatalog,
/// Cooperative cancel flag, polled before the pass and before each collection.
pub cancel: &'a AtomicBool,
/// Optional progress handle, notified with the pending count at start and
/// once per successfully materialized collection.
pub handle: Option<&'a CloneMaterializerHandle>,
/// Estimated run time in seconds, forwarded to the maintenance budget wrapper.
pub estimated_secs: f64,
}
/// Materializes the pending clone collections of one database, subject to the
/// maintenance budget.
///
/// Returns [`MaterializeOutcome::Cancelled`] without doing any work when the
/// cancel flag is already set, and [`MaterializeOutcome::BudgetDeferred`] when
/// `with_budget` reports that the work was deferred. Otherwise the result of
/// the inner pass is returned unchanged.
///
/// # Errors
///
/// Propagates any error produced by the inner materialization pass.
pub fn materialize_database(params: MaterializeParams<'_>) -> crate::Result<MaterializeOutcome> {
    if params.cancel.load(Ordering::Relaxed) {
        return Ok(MaterializeOutcome::Cancelled);
    }

    // The pass only runs if the budget wrapper invokes the closure.
    let outcome = with_budget(
        &params.state.maintenance_budget,
        params.db_id,
        params.estimated_secs,
        || do_materialize_database(&params),
    );

    match outcome {
        MaintenanceOutcome::Deferred => Ok(MaterializeOutcome::BudgetDeferred),
        MaintenanceOutcome::Ran(inner) => inner,
    }
}
fn do_materialize_database(params: &MaterializeParams<'_>) -> crate::Result<MaterializeOutcome> {
if params.cancel.load(Ordering::Relaxed) {
return Ok(MaterializeOutcome::Cancelled);
}
let pending = pending_clone_collections(params.catalog, params.db_id)?;
if let Some(h) = params.handle {
h.notify_start(pending.len());
}
if pending.is_empty() {
return Ok(MaterializeOutcome::AllComplete);
}
let source_db_ids: HashSet<DatabaseId> = pending
.iter()
.filter_map(|c| c.cloned_from.as_ref().map(|o| o.source_database))
.collect();
let _freeze_guards: Vec<crate::control::clone::FreezeGuard> = source_db_ids
.iter()
.map(|db_id| params.state.materialize_freeze.freeze(*db_id))
.collect();
let runtime_handle = tokio::runtime::Handle::try_current().map_err(|_| {
crate::Error::Dispatch {
detail: "clone materializer requires a Tokio runtime context".into(),
}
})?;
let mut remaining = 0usize;
for coll in &pending {
if params.cancel.load(Ordering::Relaxed) {
return Ok(MaterializeOutcome::Cancelled);
}
match materialize_one(&runtime_handle, params, coll) {
Ok(()) => {
if let Some(h) = params.handle {
h.notify_collection_done();
}
}
Err(e) => {
tracing::warn!(
db_id = params.db_id.as_u64(),
collection = %coll.name,
error = %e,
"clone materialize: per-collection error",
);
remaining += 1;
if matches!(&e, crate::Error::BadRequest { .. }) {
return Err(e);
}
}
}
}
if remaining == 0 {
Ok(MaterializeOutcome::AllComplete)
} else {
Ok(MaterializeOutcome::Incomplete {
collections_remaining: remaining,
})
}
}
/// Lists the collections of `db_id` that are clones still awaiting
/// materialization.
///
/// A collection qualifies when it has a `cloned_from` origin and its
/// `clone_status` is either `Shadowed` or `Materializing`.
///
/// # Errors
///
/// Propagates any error from `SystemCatalog::load_all_collections`.
fn pending_clone_collections(
    catalog: &SystemCatalog,
    db_id: DatabaseId,
) -> crate::Result<Vec<StoredCollection>> {
    let mut pending = Vec::new();
    for coll in catalog.load_all_collections(db_id)? {
        if coll.cloned_from.is_none() {
            continue;
        }
        let awaiting_materialization = matches!(
            coll.clone_status,
            CloneStatus::Shadowed | CloneStatus::Materializing { .. }
        );
        if awaiting_materialization {
            pending.push(coll);
        }
    }
    Ok(pending)
}
/// Materializes a single collection by driving the engine-specific async
/// materializer to completion on `runtime`.
///
/// The future is run with `Handle::block_on` inside
/// `tokio::task::block_in_place`, so the calling thread blocks until the
/// collection is done.
/// NOTE(review): per the Tokio docs `block_in_place` panics on a
/// current-thread runtime — confirm callers always run on a multi-thread one.
///
/// # Errors
///
/// Returns whatever error the engine-specific materializer produces.
fn materialize_one(
    runtime: &tokio::runtime::Handle,
    params: &MaterializeParams<'_>,
    coll: &StoredCollection,
) -> crate::Result<()> {
    let (state, catalog, db_id) = (params.state, params.catalog, params.db_id);

    tokio::task::block_in_place(|| match &coll.collection_type {
        CollectionType::KeyValue(_) => {
            runtime.block_on(materialize_kv_collection(state, catalog, db_id, coll))
        }
        CollectionType::Document(_) => {
            runtime.block_on(materialize_document_collection(state, catalog, db_id, coll))
        }
        CollectionType::Columnar(_) => {
            runtime.block_on(materialize_columnar_collection(state, catalog, db_id, coll))
        }
    })
}
/// Materializes every pending clone collection of `db_id` immediately,
/// bypassing the maintenance budget, and blocks until the pass finishes.
///
/// A private, never-set cancel flag is used, and the pass is invoked directly
/// rather than through the budget wrapper.
///
/// # Errors
///
/// - Any error returned by the materialization pass itself.
/// - `Error::Storage` when the pass completes but one or more collections
///   failed to materialize; the per-collection causes are in the logs.
pub fn force_materialize_blocking(
    db_id: DatabaseId,
    state: &SharedState,
    catalog: &SystemCatalog,
    handle: Option<&CloneMaterializerHandle>,
) -> crate::Result<()> {
    let cancel = AtomicBool::new(false);
    let params = MaterializeParams {
        db_id,
        state,
        catalog,
        cancel: &cancel,
        handle,
        // Unused on this path: the budget wrapper is not consulted.
        estimated_secs: 0.0,
    };

    match do_materialize_database(&params)? {
        MaterializeOutcome::AllComplete => Ok(()),
        MaterializeOutcome::Incomplete {
            collections_remaining,
        } => Err(crate::Error::Storage {
            engine: "clone_materializer".into(),
            detail: format!(
                "{collections_remaining} collection(s) in database {} did not \
                 finish materializing; check logs for per-collection errors",
                db_id.as_u64()
            ),
        }),
        // Not produced on this path (no budget wrapper, cancel flag never set);
        // listed explicitly so the match stays exhaustive without a wildcard.
        MaterializeOutcome::BudgetDeferred | MaterializeOutcome::Cancelled => Ok(()),
    }
}
/// Runs one budgeted materialization pass over every database in the catalog.
///
/// The sweep stops early when the cancel flag is set or a pass reports
/// `Cancelled`. A `BadRequest` error for one database is logged and that
/// database is skipped; partial passes are logged and the sweep continues.
///
/// # Errors
///
/// Propagates errors from listing databases and any non-`BadRequest` error
/// from a database's materialization pass.
pub fn run_scheduled_sweep(
    state: &SharedState,
    catalog: &SystemCatalog,
    cancel: &AtomicBool,
) -> crate::Result<()> {
    // Per-database time estimate handed to the maintenance budget.
    const SWEEP_ESTIMATED_SECS: f64 = 5.0;

    let database_ids: Vec<DatabaseId> = catalog
        .list_databases()?
        .into_iter()
        .map(|entry| entry.id)
        .collect();

    for db_id in database_ids {
        if cancel.load(Ordering::Relaxed) {
            break;
        }

        let result = materialize_database(MaterializeParams {
            db_id,
            state,
            catalog,
            cancel,
            handle: None,
            estimated_secs: SWEEP_ESTIMATED_SECS,
        });

        let outcome = match result {
            Ok(outcome) => outcome,
            Err(crate::Error::BadRequest { detail }) => {
                tracing::info!(db_id = db_id.as_u64(), %detail, "clone sweep skipped");
                continue;
            }
            Err(other) => return Err(other),
        };

        match outcome {
            MaterializeOutcome::Cancelled => break,
            MaterializeOutcome::Incomplete {
                collections_remaining,
            } => {
                tracing::info!(
                    db_id = db_id.as_u64(),
                    collections_remaining,
                    "clone sweep partial: per-collection errors logged separately",
                );
            }
            MaterializeOutcome::AllComplete | MaterializeOutcome::BudgetDeferred => {}
        }
    }

    Ok(())
}