use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use nodedb_cluster::DescriptorId;
use tracing::warn;
use crate::control::state::SharedState;
#[derive(Debug, Default)]
pub struct LeaseRefCount {
counts: Mutex<HashMap<DescriptorId, u32>>,
}
impl LeaseRefCount {
pub fn new() -> Self {
Self::default()
}
pub fn increment(&self, id: &DescriptorId) -> u32 {
let mut map = self.counts.lock().unwrap_or_else(|p| p.into_inner());
let entry = map.entry(id.clone()).or_insert(0);
*entry += 1;
*entry
}
pub fn decrement(&self, id: &DescriptorId) -> u32 {
let mut map = self.counts.lock().unwrap_or_else(|p| p.into_inner());
if let Some(entry) = map.get_mut(id) {
*entry = entry.saturating_sub(1);
let c = *entry;
if c == 0 {
map.remove(id);
}
c
} else {
0
}
}
pub fn current(&self, id: &DescriptorId) -> u32 {
let map = self.counts.lock().unwrap_or_else(|p| p.into_inner());
map.get(id).copied().unwrap_or(0)
}
pub fn distinct_count(&self) -> usize {
let map = self.counts.lock().unwrap_or_else(|p| p.into_inner());
map.len()
}
}
pub struct QueryLeaseScope {
descriptor_ids: Vec<DescriptorId>,
shared: Weak<SharedState>,
}
impl QueryLeaseScope {
pub fn empty() -> Self {
Self {
descriptor_ids: Vec::new(),
shared: Weak::new(),
}
}
pub fn new(descriptor_ids: Vec<DescriptorId>, shared: &Arc<SharedState>) -> Self {
Self {
descriptor_ids,
shared: Arc::downgrade(shared),
}
}
pub fn len(&self) -> usize {
self.descriptor_ids.len()
}
pub fn is_empty(&self) -> bool {
self.descriptor_ids.is_empty()
}
}
impl Drop for QueryLeaseScope {
fn drop(&mut self) {
if self.descriptor_ids.is_empty() {
return;
}
let Some(shared) = self.shared.upgrade() else {
return;
};
let mut to_release = Vec::new();
for id in self.descriptor_ids.drain(..) {
let new_count = shared.lease_refcount.decrement(&id);
if new_count == 0 {
to_release.push(id);
}
}
if to_release.is_empty() {
return;
}
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let shared = Arc::clone(&shared);
handle.spawn(async move {
let shared_inner = Arc::clone(&shared);
let descriptor_ids = to_release.clone();
let result = tokio::task::spawn_blocking(move || {
shared_inner.release_descriptor_leases(descriptor_ids)
})
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => {
warn!(
error = %e,
count = to_release.len(),
"QueryLeaseScope drop: background release failed"
);
}
Err(e) => {
warn!(
error = %e,
count = to_release.len(),
"QueryLeaseScope drop: spawn_blocking panicked"
);
}
}
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use nodedb_cluster::DescriptorKind;
fn id(name: &str) -> DescriptorId {
DescriptorId::new(1, DescriptorKind::Collection, name.to_string())
}
#[test]
fn first_increment_returns_one() {
let rc = LeaseRefCount::new();
let a = id("a");
assert_eq!(rc.increment(&a), 1);
}
#[test]
fn second_increment_returns_two() {
let rc = LeaseRefCount::new();
let a = id("a");
rc.increment(&a);
assert_eq!(rc.increment(&a), 2);
}
#[test]
fn decrement_to_zero_removes_entry() {
let rc = LeaseRefCount::new();
let a = id("a");
rc.increment(&a);
assert_eq!(rc.decrement(&a), 0);
assert_eq!(rc.current(&a), 0);
assert_eq!(rc.distinct_count(), 0);
}
#[test]
fn decrement_preserves_shared_lease() {
let rc = LeaseRefCount::new();
let a = id("a");
rc.increment(&a);
rc.increment(&a);
assert_eq!(rc.decrement(&a), 1);
assert_eq!(rc.current(&a), 1);
assert_eq!(rc.distinct_count(), 1);
}
#[test]
fn distinct_descriptors_track_independently() {
let rc = LeaseRefCount::new();
let a = id("a");
let b = id("b");
rc.increment(&a);
rc.increment(&b);
assert_eq!(rc.distinct_count(), 2);
rc.decrement(&a);
assert_eq!(rc.distinct_count(), 1);
assert_eq!(rc.current(&a), 0);
assert_eq!(rc.current(&b), 1);
}
#[test]
fn decrement_on_unknown_id_is_safe() {
let rc = LeaseRefCount::new();
assert_eq!(rc.decrement(&id("nothing")), 0);
}
#[test]
fn empty_scope_drops_cleanly() {
let scope = QueryLeaseScope::empty();
drop(scope); }
}