use tracing::debug;
use crate::Error;
use crate::control::lease::{DEFAULT_LEASE_DURATION, acquire_lease};
use crate::control::state::SharedState;
pub async fn plan_with_cache_miss_retry<F, P>(
shared: &SharedState,
tenant_id: u64,
plan_fn: F,
) -> Result<P, Error>
where
F: Fn() -> Result<P, Error>,
{
match plan_fn() {
Ok(plan) => Ok(plan),
Err(Error::RetryableSchemaChanged { descriptor }) => {
debug!(
descriptor = %descriptor,
tenant_id,
"gateway: descriptor cache miss — fetching fresh lease and retrying plan"
);
refresh_descriptor_lease(shared, tenant_id, &descriptor).await?;
plan_fn()
}
Err(other) => Err(other),
}
}
async fn refresh_descriptor_lease(
shared: &SharedState,
tenant_id: u64,
descriptor: &str,
) -> Result<(), Error> {
if shared.metadata_raft.get().is_none() {
return Ok(());
}
let descriptor_id = nodedb_cluster::DescriptorId {
kind: nodedb_cluster::DescriptorKind::Collection,
tenant_id,
name: descriptor.to_owned(),
};
tokio::task::block_in_place(|| {
acquire_lease(shared, descriptor_id, 0, DEFAULT_LEASE_DURATION)
})?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use nodedb_physical::physical_plan::{KvOp, PhysicalPlan};
fn ok_plan() -> Result<PhysicalPlan, Error> {
Ok(PhysicalPlan::Kv(KvOp::Get {
collection: "users".into(),
key: vec![],
rls_filters: vec![],
surrogate_ceiling: None,
}))
}
#[test]
fn ok_path_calls_plan_fn_once() {
let call_count = std::cell::Cell::new(0usize);
let rt = tokio::runtime::Runtime::new().unwrap();
let mut attempts = 0usize;
let result: Result<PhysicalPlan, Error> = rt.block_on(async {
attempts += 1;
match ok_plan() {
Ok(p) => Ok(p),
Err(Error::RetryableSchemaChanged { .. }) => {
attempts += 1;
ok_plan()
}
Err(e) => Err(e),
}
});
let _ = call_count;
assert!(result.is_ok());
assert_eq!(attempts, 1);
}
#[test]
fn double_miss_propagates_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut calls = 0usize;
let result: Result<PhysicalPlan, Error> = rt.block_on(async {
let mut result = Err(Error::RetryableSchemaChanged {
descriptor: "orders".into(),
});
calls += 1;
if matches!(result, Err(Error::RetryableSchemaChanged { .. })) {
calls += 1;
result = Err(Error::RetryableSchemaChanged {
descriptor: "orders".into(),
});
}
result
});
assert!(matches!(result, Err(Error::RetryableSchemaChanged { .. })));
assert_eq!(calls, 2);
}
}