use nodedb_types::DatabaseId;
use std::time::{Duration, Instant};
use nodedb_cluster::{DescriptorId, DescriptorKind, MetadataEntry, encode_entry};
use nodedb_types::Hlc;
use crate::control::catalog_entry::CatalogEntry;
use crate::control::rolling_upgrade::DESCRIPTOR_DRAIN_VERSION;
use crate::control::state::SharedState;
use crate::error::Error;
/// Pause between successive lease-count checks while waiting for a drain.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Extra time added on top of the caller's `max_wait` when computing the
/// expiry of a proposed drain-start entry.
const DRAIN_TTL_GRACE: Duration = Duration::from_secs(30);
/// Drains outstanding leases on descriptor `id` before a DDL change proceeds.
///
/// The sequence is:
/// 1. If the cluster version view cannot activate `DESCRIPTOR_DRAIN_VERSION`,
///    log a warning and return `Ok(())` without draining.
/// 2. If `up_to_version` is `0`, return `Ok(())` immediately.
/// 3. Propose a `DescriptorDrainStart` entry whose expiry is the current HLC
///    wall time plus `max_wait` plus `DRAIN_TTL_GRACE` (all saturating).
/// 4. Poll until no lease on `id` at or below `up_to_version` remains, or
///    until `max_wait` has elapsed.
///
/// On success this function does not propose a `DescriptorDrainEnd`.
/// NOTE(review): presumably the drain entry is cleared elsewhere (or simply
/// expires) — confirm against the apply path.
///
/// # Errors
/// Returns the error from proposing the drain-start entry, or the error from
/// polling (a timeout). When polling fails, a `DescriptorDrainEnd` is proposed
/// as best-effort cleanup; a failure of that cleanup is only logged and the
/// original polling error is returned.
pub fn drain_for_ddl(
    shared: &SharedState,
    id: DescriptorId,
    up_to_version: u64,
    max_wait: Duration,
) -> Result<(), Error> {
    // Scoped so the version view is dropped before any proposing or polling.
    {
        let vs = shared.cluster_version_view();
        if !vs.can_activate_feature(DESCRIPTOR_DRAIN_VERSION) {
            tracing::warn!(
                min_version = vs.min_version,
                required = DESCRIPTOR_DRAIN_VERSION,
                "descriptor lease drain: cluster in compat mode, skipping drain"
            );
            return Ok(());
        }
    }

    if up_to_version == 0 {
        return Ok(());
    }

    let now_hlc = shared.hlc_clock.now();
    // `Duration + Duration` panics on overflow; saturate instead so that an
    // extremely large `max_wait` degrades to "never expires" like the rest of
    // this computation already does.
    let ttl_ns: u64 = max_wait
        .saturating_add(DRAIN_TTL_GRACE)
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    let expires_at = Hlc::new(now_hlc.wall_ns.saturating_add(ttl_ns), 0);

    propose_drain(
        shared,
        MetadataEntry::DescriptorDrainStart {
            descriptor_id: id.clone(),
            up_to_version,
            expires_at,
        },
        "drain_start",
    )?;

    match poll_leases_drained(shared, &id, up_to_version, max_wait) {
        Ok(()) => Ok(()),
        Err(e) => {
            // Best-effort cleanup: the polling error is what the caller sees,
            // so a failed cleanup proposal is logged rather than returned.
            // `id` is not needed afterwards, so it is moved rather than cloned.
            if let Err(cleanup_err) = propose_drain(
                shared,
                MetadataEntry::DescriptorDrainEnd { descriptor_id: id },
                "drain_end",
            ) {
                tracing::warn!(
                    error = %cleanup_err,
                    "descriptor lease drain: cleanup propose failed after timeout"
                );
            }
            Err(e)
        }
    }
}
/// Blocks the calling thread until no lease on `id` with a version at or
/// below `up_to_version` remains, or until `max_wait` has elapsed.
///
/// The lease count is checked first, so an already-drained descriptor returns
/// `Ok(())` without sleeping, even when `max_wait` is zero. Between checks the
/// thread sleeps for `POLL_INTERVAL`, shortened to whatever remains of
/// `max_wait` so the final check happens at the deadline rather than after it.
///
/// NOTE(review): this uses `std::thread::sleep`; if it can be reached from an
/// async runtime worker the caller should make sure blocking is acceptable.
///
/// # Errors
/// Returns `Error::Config` describing the timeout and the number of leases
/// still held when `max_wait` elapses first.
pub(crate) fn poll_leases_drained(
    shared: &SharedState,
    id: &DescriptorId,
    up_to_version: u64,
    max_wait: Duration,
) -> Result<(), Error> {
    // Measure elapsed time instead of computing `Instant::now() + max_wait`,
    // which panics on overflow for very large waits.
    let started = Instant::now();
    loop {
        let remaining = count_matching_leases(shared, id, up_to_version);
        if remaining == 0 {
            return Ok(());
        }

        let elapsed = started.elapsed();
        if elapsed >= max_wait {
            return Err(Error::Config {
                detail: format!(
                    "descriptor lease drain timed out after {max_wait:?} \
                     waiting for {id:?} up to version {up_to_version} \
                     (still held: {remaining})"
                ),
            });
        }

        // Never sleep past the deadline.
        let budget = max_wait.saturating_sub(elapsed);
        std::thread::sleep(POLL_INTERVAL.min(budget));
    }
}
/// Counts the leases in the metadata cache that belong to descriptor `id` and
/// whose version is less than or equal to `up_to_version`.
///
/// A poisoned cache lock is recovered (its inner guard is used) rather than
/// treated as an error.
fn count_matching_leases(shared: &SharedState, id: &DescriptorId, up_to_version: u64) -> usize {
    let guard = match shared.metadata_cache.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };

    let mut held = 0usize;
    for ((lease_id, _), lease) in guard.leases.iter() {
        if lease_id == id && lease.version <= up_to_version {
            held += 1;
        }
    }
    held
}
fn propose_drain(
shared: &SharedState,
entry: MetadataEntry,
operation: &'static str,
) -> Result<(), Error> {
let Some(handle) = shared.metadata_raft.get() else {
apply_drain_locally(shared, &entry);
return Ok(());
};
let raw = encode_entry(&entry).map_err(|e| Error::Config {
detail: format!("descriptor drain {operation} encode: {e}"),
})?;
let log_index = handle.propose(raw)?;
let watcher = shared.applied_index_watcher(nodedb_cluster::METADATA_GROUP_ID);
const DRAIN_PROPOSE_TIMEOUT: Duration = Duration::from_secs(5);
let outcome =
tokio::task::block_in_place(|| watcher.wait_for(log_index, DRAIN_PROPOSE_TIMEOUT));
if !outcome.is_reached() {
return Err(Error::Config {
detail: format!(
"descriptor drain {operation} did not apply within {DRAIN_PROPOSE_TIMEOUT:?} \
(log index {log_index}, current: {}, outcome: {outcome:?})",
watcher.current()
),
});
}
Ok(())
}
/// Applies a drain entry straight to the local lease-drain state.
///
/// `DescriptorDrainStart` installs a drain for the descriptor with the given
/// version bound and expiry; `DescriptorDrainEnd` removes it. Any other entry
/// kind is ignored.
fn apply_drain_locally(shared: &SharedState, entry: &MetadataEntry) {
    if let MetadataEntry::DescriptorDrainStart {
        descriptor_id,
        up_to_version,
        expires_at,
    } = entry
    {
        shared
            .lease_drain
            .install_start(descriptor_id.clone(), *up_to_version, *expires_at);
    } else if let MetadataEntry::DescriptorDrainEnd { descriptor_id } = entry {
        shared.lease_drain.install_end(descriptor_id);
    }
}
/// Maps a catalog `Put*` entry to the id of the descriptor it writes.
///
/// Collections, materialized views, functions, procedures, triggers and
/// sequences are recognised; the id is built from the stored tenant id, the
/// matching `DescriptorKind`, and the stored name. Every other entry kind
/// yields `None`.
pub fn descriptor_id_for_implicit_clear(entry: &CatalogEntry) -> Option<DescriptorId> {
    let id = match entry {
        CatalogEntry::PutCollection(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::Collection,
            stored.name.clone(),
        ),
        CatalogEntry::PutMaterializedView(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::MaterializedView,
            stored.name.clone(),
        ),
        CatalogEntry::PutFunction(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::Function,
            stored.name.clone(),
        ),
        CatalogEntry::PutProcedure(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::Procedure,
            stored.name.clone(),
        ),
        CatalogEntry::PutTrigger(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::Trigger,
            stored.name.clone(),
        ),
        CatalogEntry::PutSequence(stored) => DescriptorId::new(
            stored.tenant_id,
            DescriptorKind::Sequence,
            stored.name.clone(),
        ),
        // Not a descriptor-bearing put.
        _ => return None,
    };
    Some(id)
}
/// Resolves the descriptor id written by a catalog `Put*` entry together with
/// the `descriptor_version` currently stored in the catalog for that object.
///
/// The prior version is `0` when the catalog has no existing record or when
/// the lookup returns an error. Collections are looked up under
/// `DatabaseId::DEFAULT`.
///
/// Returns `None` when no catalog is available or when `entry` is not one of
/// the collection / materialized view / function / procedure / trigger /
/// sequence put variants.
///
/// NOTE(review): lookup errors are indistinguishable from "no prior record"
/// here — confirm that is acceptable for callers.
pub fn descriptor_id_and_prior_version(
    entry: &CatalogEntry,
    shared: &SharedState,
) -> Option<(DescriptorId, u64)> {
    let catalog = shared.credentials.catalog();
    let catalog = catalog.as_ref()?;

    let resolved = match entry {
        CatalogEntry::PutCollection(stored) => {
            let prior = match catalog.get_collection(
                DatabaseId::DEFAULT,
                stored.tenant_id,
                &stored.name,
            ) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::Collection,
                stored.name.clone(),
            );
            (id, prior)
        }
        CatalogEntry::PutMaterializedView(stored) => {
            let prior = match catalog.get_materialized_view(stored.tenant_id, &stored.name) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::MaterializedView,
                stored.name.clone(),
            );
            (id, prior)
        }
        CatalogEntry::PutFunction(stored) => {
            let prior = match catalog.get_function(stored.tenant_id, &stored.name) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::Function,
                stored.name.clone(),
            );
            (id, prior)
        }
        CatalogEntry::PutProcedure(stored) => {
            let prior = match catalog.get_procedure(stored.tenant_id, &stored.name) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::Procedure,
                stored.name.clone(),
            );
            (id, prior)
        }
        CatalogEntry::PutTrigger(stored) => {
            let prior = match catalog.get_trigger(stored.tenant_id, &stored.name) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::Trigger,
                stored.name.clone(),
            );
            (id, prior)
        }
        CatalogEntry::PutSequence(stored) => {
            let prior = match catalog.get_sequence(stored.tenant_id, &stored.name) {
                Ok(Some(existing)) => existing.descriptor_version,
                _ => 0,
            };
            let id = DescriptorId::new(
                stored.tenant_id,
                DescriptorKind::Sequence,
                stored.name.clone(),
            );
            (id, prior)
        }
        // Entry does not write a versioned descriptor.
        _ => return None,
    };

    Some(resolved)
}