use std::time::Duration;
use nodedb_cluster::{DescriptorId, DescriptorLease, MetadataEntry};
use nodedb_types::Hlc;
use crate::control::state::SharedState;
use crate::error::Error;
pub const DEFAULT_LEASE_DURATION: Duration = Duration::from_secs(300);
pub fn compute_expires_at(now: Hlc, duration: Duration) -> Hlc {
let delta_ns: u64 = duration.as_nanos().try_into().unwrap_or(u64::MAX);
Hlc::new(now.wall_ns.saturating_add(delta_ns), 0)
}
pub fn acquire_lease(
shared: &SharedState,
descriptor_id: DescriptorId,
version: u64,
duration: Duration,
) -> Result<DescriptorLease, Error> {
let now = shared.hlc_clock.now();
let cache_key = (descriptor_id.clone(), shared.node_id);
{
let cache = shared
.metadata_cache
.read()
.unwrap_or_else(|p| p.into_inner());
if let Some(existing) = cache.leases.get(&cache_key)
&& existing.version >= version
&& existing.expires_at > now
{
return Ok(existing.clone());
}
}
force_refresh_lease(shared, descriptor_id, version, duration)
}
pub fn force_refresh_lease(
shared: &SharedState,
descriptor_id: DescriptorId,
version: u64,
duration: Duration,
) -> Result<DescriptorLease, Error> {
let now_wall_ns = super::wall_now_ns();
if shared
.lease_drain
.is_draining(&descriptor_id, version, now_wall_ns)
{
return Err(Error::Config {
detail: format!(
"descriptor lease drain in progress: \
{descriptor_id:?} at version {version}"
),
});
}
let now = shared.hlc_clock.now();
let cache_key = (descriptor_id.clone(), shared.node_id);
let expires_at = compute_expires_at(now, duration);
let lease = DescriptorLease {
descriptor_id,
version,
node_id: shared.node_id,
expires_at,
};
if shared.metadata_raft.get().is_none() {
install_into_local_cache(shared, &lease);
return Ok(lease);
}
let entry = MetadataEntry::DescriptorLeaseGrant(lease.clone());
super::propose_and_wait(shared, &entry, "grant")?;
{
let cache = shared
.metadata_cache
.read()
.unwrap_or_else(|p| p.into_inner());
if let Some(installed) = cache.leases.get(&cache_key) {
return Ok(installed.clone());
}
}
Ok(lease)
}
fn install_into_local_cache(shared: &SharedState, lease: &DescriptorLease) {
let mut cache = shared
.metadata_cache
.write()
.unwrap_or_else(|p| p.into_inner());
cache
.leases
.insert((lease.descriptor_id.clone(), lease.node_id), lease.clone());
if lease.expires_at > cache.last_applied_hlc {
cache.last_applied_hlc = lease.expires_at;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn compute_expires_at_advances_wall_clock() {
let now = Hlc::new(1_000_000_000, 5);
let expires = compute_expires_at(now, Duration::from_secs(300));
assert_eq!(expires.wall_ns, 1_000_000_000 + 300 * 1_000_000_000);
assert_eq!(expires.logical, 0);
assert!(expires > now);
}
#[test]
fn compute_expires_at_zero_duration_is_strictly_greater_than_zero_hlc() {
let now = Hlc::new(0, 0);
let expires = compute_expires_at(now, Duration::from_secs(0));
assert_eq!(expires, Hlc::new(0, 0));
}
#[test]
fn compute_expires_at_saturates_on_overflow() {
let now = Hlc::new(u64::MAX - 100, 0);
let expires = compute_expires_at(now, Duration::from_secs(u64::MAX));
assert_eq!(expires.wall_ns, u64::MAX);
}
}