use std::sync::Arc;
use std::time::Duration;
use nodedb_cluster::{METADATA_GROUP_ID, MetadataEntry, WaitOutcome, encode_entry};
#[cfg(test)]
use nodedb_cluster::AppliedIndexWatcher;
use crate::control::catalog_entry::{self, CatalogEntry};
use crate::control::state::SharedState;
use crate::error::Error;
/// Default time to wait for the metadata group's applied index on this node to
/// reach a freshly proposed log index (used by `propose_catalog_entry` and
/// `propose_surrogate_hwm`).
pub const DEFAULT_PROPOSE_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout handed to `crate::control::lease::drain_for_ddl` before proposing a
/// catalog entry that replaces an existing descriptor version (prior version > 0).
// NOTE(review): presumably sized to outlast outstanding descriptor leases — confirm
// against the lease duration configured in `control::lease`.
pub const DEFAULT_DRAIN_TIMEOUT: Duration = Duration::from_secs(35);
/// Synchronous entry point for proposing encoded entries to the metadata Raft group.
///
/// The `Send + Sync` bound allows one handle to be shared across threads.
pub trait MetadataRaftHandle: Send + Sync {
/// Proposes `bytes` and returns the log index assigned to the proposal.
///
/// Callers in this module pass the output of `encode_entry` (an encoded
/// `MetadataEntry`). They then wait on the applied-index watcher for the
/// returned index before treating the entry as applied.
fn propose(&self, bytes: Vec<u8>) -> Result<u64, Error>;
}
/// [`MetadataRaftHandle`] implementation backed by a `nodedb_cluster::RaftLoop`
/// that uses this crate's commit applier and local plan executor.
pub struct RaftLoopProposerHandle {
// Shared handle to the Raft loop; `propose` clones this `Arc` for each proposal.
raft_loop: Arc<
nodedb_cluster::RaftLoop<
crate::control::cluster::SpscCommitApplier,
crate::control::LocalPlanExecutor,
>,
>,
}
impl RaftLoopProposerHandle {
pub fn new(
raft_loop: Arc<
nodedb_cluster::RaftLoop<
crate::control::cluster::SpscCommitApplier,
crate::control::LocalPlanExecutor,
>,
>,
) -> Self {
Self { raft_loop }
}
}
impl MetadataRaftHandle for RaftLoopProposerHandle {
fn propose(&self, bytes: Vec<u8>) -> Result<u64, Error> {
let raft_loop = self.raft_loop.clone();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(raft_loop.propose_to_metadata_group_via_leader(bytes))
})
.map_err(|e| Error::Config {
detail: format!("metadata propose: {e}"),
})
}
}
/// Proposes `entry` to the metadata Raft group, waiting up to
/// [`DEFAULT_PROPOSE_TIMEOUT`] for this node to apply it.
///
/// Behaves exactly like `propose_catalog_entry_with_timeout` called with that
/// default timeout.
pub fn propose_catalog_entry(shared: &SharedState, entry: &CatalogEntry) -> Result<u64, Error> {
    let timeout = DEFAULT_PROPOSE_TIMEOUT;
    propose_catalog_entry_with_timeout(shared, entry, timeout)
}
pub fn propose_catalog_entry_with_timeout(
shared: &SharedState,
entry: &CatalogEntry,
timeout: Duration,
) -> Result<u64, Error> {
let Some(handle) = shared.metadata_raft.get() else {
return Ok(0);
};
{
let vs = shared.cluster_version_view();
if !vs.can_activate_feature(crate::control::rolling_upgrade::DISTRIBUTED_CATALOG_VERSION) {
tracing::warn!(
min_version = vs.min_version,
required = crate::control::rolling_upgrade::DISTRIBUTED_CATALOG_VERSION,
"metadata propose: cluster in compat mode (mixed-version), \
falling back to legacy direct-write path"
);
return Ok(0);
}
}
if let Some((descriptor_id, prior_version)) =
crate::control::lease::descriptor_id_and_prior_version(entry, shared)
&& prior_version > 0
{
crate::control::lease::drain_for_ddl(
shared,
descriptor_id,
prior_version,
DEFAULT_DRAIN_TIMEOUT,
)?;
}
let payload = catalog_entry::encode(entry)?;
if crate::control::server::pgwire::session::ddl_buffer::try_buffer(payload.clone()) {
return Ok(0);
}
let metadata_entry = match crate::control::server::pgwire::session::audit_context::current() {
Some(ctx) => MetadataEntry::CatalogDdlAudited {
payload,
auth_user_id: ctx.auth_user_id,
auth_user_name: ctx.auth_user_name,
sql_text: ctx.sql_text,
},
None => MetadataEntry::CatalogDdl { payload },
};
let raw = encode_entry(&metadata_entry).map_err(|e| Error::Config {
detail: format!("metadata entry encode: {e}"),
})?;
let log_index = handle.propose(raw)?;
let watcher = shared.applied_index_watcher(METADATA_GROUP_ID);
let outcome = tokio::task::block_in_place(|| watcher.wait_for(log_index, timeout));
match outcome {
WaitOutcome::Reached => Ok(log_index),
WaitOutcome::TimedOut => Err(Error::Config {
detail: format!(
"metadata propose timed out after {:?} waiting for log index {} (current: {})",
timeout,
log_index,
watcher.current()
),
}),
WaitOutcome::GroupGone => Err(Error::Config {
detail: "metadata group no longer hosted on this node".into(),
}),
}
}
pub fn propose_surrogate_hwm(shared: &SharedState, hwm: u32) -> Result<u64, Error> {
let Some(handle) = shared.metadata_raft.get() else {
return Ok(0);
};
let entry = MetadataEntry::SurrogateAlloc { hwm };
let raw = encode_entry(&entry).map_err(|e| Error::Config {
detail: format!("surrogate_alloc encode: {e}"),
})?;
let log_index = handle.propose(raw)?;
let watcher = shared.applied_index_watcher(METADATA_GROUP_ID);
let outcome =
tokio::task::block_in_place(|| watcher.wait_for(log_index, DEFAULT_PROPOSE_TIMEOUT));
if !outcome.is_reached() {
return Err(Error::Config {
detail: format!("surrogate_alloc propose timed out waiting for log index {log_index}"),
});
}
Ok(log_index)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Once the watcher has been bumped past the requested index, `wait_for`
    /// must report that the target was reached.
    #[test]
    fn watcher_helper_returns_reached_on_past_target() {
        let watcher = AppliedIndexWatcher::new();
        watcher.bump(10);
        let outcome = watcher.wait_for(5, Duration::from_millis(1));
        assert!(outcome.is_reached());
    }
}