use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use lvqr_cluster::Cluster;
use lvqr_fragment::{FragmentBroadcasterRegistry, FragmentStream};
use tokio::runtime::Handle;
use tracing::{info, warn};
/// Default claim lease duration: 10 seconds.
///
/// NOTE(review): presumably intended as the `lease` argument of
/// [`install_cluster_claim_bridge`]; confirm against callers.
pub const DEFAULT_CLAIM_LEASE: Duration = Duration::from_secs(10);
pub fn install_cluster_claim_bridge(cluster: Arc<Cluster>, lease: Duration, registry: &FragmentBroadcasterRegistry) {
let claimed: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
registry.on_entry_created(move |broadcast, _track, bc| {
{
let mut set = claimed.lock().expect("cluster-claim dedup set poisoned");
if !set.insert(broadcast.to_string()) {
return;
}
}
let sub = bc.subscribe();
let broadcast_name = broadcast.to_string();
let cluster = cluster.clone();
let claimed = claimed.clone();
let Ok(handle) = Handle::try_current() else {
warn!(broadcast = %broadcast_name, "no tokio handle; cluster claim bridge inactive for this broadcast");
claimed
.lock()
.expect("cluster-claim dedup set poisoned")
.remove(&broadcast_name);
return;
};
handle.spawn(async move {
let claim = match cluster.claim_broadcast(&broadcast_name, lease).await {
Ok(c) => c,
Err(err) => {
warn!(
error = %err,
broadcast = %broadcast_name,
"cluster auto-claim failed; peer redirect disabled for this session",
);
claimed
.lock()
.expect("cluster-claim dedup set poisoned")
.remove(&broadcast_name);
return;
}
};
info!(
broadcast = %broadcast_name,
owner = %claim.owner,
"cluster auto-claim installed",
);
let mut sub = sub;
while sub.next_fragment().await.is_some() {}
drop(claim);
claimed
.lock()
.expect("cluster-claim dedup set poisoned")
.remove(&broadcast_name);
info!(
broadcast = %broadcast_name,
"cluster auto-claim released (broadcaster closed)",
);
});
});
}