use super::*;
use async_trait::async_trait;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use crate::capsule::{Capsule, CapsuleId, CapsuleState, InterceptResult};
use crate::context::CapsuleContext;
use crate::error::CapsuleResult;
use crate::manifest::{CapabilitiesDef, CapsuleManifest, PackageDef, SubscribeDef};
use astrid_events::ipc::IpcPayload;
struct MockCapsule {
id: CapsuleId,
manifest: CapsuleManifest,
invoked: Arc<AtomicBool>,
invocation_log: Option<Arc<Mutex<Vec<String>>>>,
result_override: Option<InterceptResult>,
principal_log: Option<Arc<Mutex<Vec<String>>>>,
invoke_counter: Option<Arc<AtomicUsize>>,
}
impl MockCapsule {
fn new(name: &str, interceptor_event: &str) -> (Self, Arc<AtomicBool>) {
Self::with_priority(name, interceptor_event, 100, None)
}
fn with_priority(
name: &str,
interceptor_event: &str,
priority: u32,
invocation_log: Option<Arc<Mutex<Vec<String>>>>,
) -> (Self, Arc<AtomicBool>) {
let invoked = Arc::new(AtomicBool::new(false));
let manifest = CapsuleManifest {
package: PackageDef {
name: name.to_string(),
version: "0.0.1".to_string(),
description: None,
authors: Vec::new(),
repository: None,
homepage: None,
documentation: None,
license: None,
license_file: None,
readme: None,
keywords: Vec::new(),
categories: Vec::new(),
astrid_version: None,
publish: None,
include: None,
exclude: None,
metadata: None,
},
components: Vec::new(),
imports: std::collections::HashMap::new(),
exports: std::collections::HashMap::new(),
capabilities: CapabilitiesDef::default(),
env: std::collections::HashMap::new(),
context_files: Vec::new(),
commands: Vec::new(),
mcp_servers: Vec::new(),
skills: Vec::new(),
uplinks: Vec::new(),
publishes: ::std::collections::HashMap::new(),
subscribes: ::std::collections::HashMap::from([(
interceptor_event.to_string(),
SubscribeDef {
wit: "opaque".to_string(),
version: None,
tag: None,
rev: None,
branch: None,
path: None,
handler: Some("test_action".to_string()),
priority: Some(priority),
},
)]),
tools: ::std::vec::Vec::new(),
};
let capsule = Self {
id: CapsuleId::from_static(name),
manifest,
invoked: Arc::clone(&invoked),
invocation_log,
result_override: None,
principal_log: None,
invoke_counter: None,
};
(capsule, invoked)
}
}
#[async_trait]
impl Capsule for MockCapsule {
fn id(&self) -> &CapsuleId {
&self.id
}
fn manifest(&self) -> &CapsuleManifest {
&self.manifest
}
fn state(&self) -> CapsuleState {
CapsuleState::Ready
}
async fn load(&mut self, _ctx: &CapsuleContext) -> CapsuleResult<()> {
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
Ok(())
}
async fn invoke_interceptor(
&self,
_action: &str,
_payload: &[u8],
caller: Option<&astrid_events::ipc::IpcMessage>,
) -> CapsuleResult<InterceptResult> {
self.invoked.store(true, Ordering::SeqCst);
if let Some(ref log) = self.invocation_log {
log.lock().unwrap().push(self.id.to_string());
}
if let Some(ref log) = self.principal_log {
let p = caller
.and_then(|m| m.principal.clone())
.unwrap_or_else(|| "<none>".to_string());
log.lock().unwrap().push(p);
}
if let Some(ref c) = self.invoke_counter {
c.fetch_add(1, Ordering::SeqCst);
}
if let Some(ref result) = self.result_override {
return Ok(result.clone());
}
Ok(InterceptResult::Continue(Vec::new()))
}
}
fn publish_ipc(bus: &EventBus, topic: &str) {
let msg = astrid_events::ipc::IpcMessage::new(
topic,
IpcPayload::Custom {
data: serde_json::json!({}),
},
uuid::Uuid::nil(),
);
bus.publish(AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("test"),
message: msg,
});
}
fn publish_ipc_as(bus: &EventBus, topic: &str, principal: &str) {
let msg = astrid_events::ipc::IpcMessage::new(
topic,
IpcPayload::Custom {
data: serde_json::json!({}),
},
uuid::Uuid::nil(),
)
.with_principal(principal);
bus.publish(AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("test"),
message: msg,
});
}
#[tokio::test]
async fn dispatch_routes_to_matching_interceptor() {
let (capsule, invoked) = MockCapsule::new("test-capsule", "test.topic");
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "test.topic");
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
invoked.load(Ordering::SeqCst),
"interceptor should have been invoked for matching topic"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_skips_non_matching_topic() {
let (capsule, invoked) = MockCapsule::new("test-capsule-skip", "specific.topic");
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "other.topic");
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
!invoked.load(Ordering::SeqCst),
"interceptor should NOT have been invoked for non-matching topic"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_concurrent_does_not_block() {
let (cap_a, invoked_a) = MockCapsule::new("capsule-a", "topic.a");
let (cap_b, invoked_b) = MockCapsule::new("capsule-b", "topic.b");
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(cap_a)).unwrap();
registry.register(Box::new(cap_b)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "topic.a");
publish_ipc(&bus, "topic.b");
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
invoked_a.load(Ordering::SeqCst),
"capsule-a interceptor should have been invoked"
);
assert!(
invoked_b.load(Ordering::SeqCst),
"capsule-b interceptor should have been invoked"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_routes_lifecycle_events() {
let (capsule, invoked) =
MockCapsule::new("lifecycle-capsule", "astrid.v1.lifecycle.tool_call_started");
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
bus.publish(AstridEvent::ToolCallStarted {
metadata: astrid_events::EventMetadata::new("test"),
call_id: uuid::Uuid::nil(),
tool_name: "search".into(),
server_name: None,
});
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
invoked.load(Ordering::SeqCst),
"EventDispatcher should dispatch lifecycle events by event_type()"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_publishes_lag_event_on_overflow() {
let bus = Arc::new(EventBus::with_capacity(2));
let (lag_capsule, _lag_invoked) =
MockCapsule::new("lag-listener", "astrid.v1.event_bus.lagged");
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(lag_capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
for i in 0..20 {
publish_ipc(&bus, &format!("flood.event.{i}"));
}
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(!handle.is_finished(), "dispatcher should still be running");
handle.abort();
}
#[test]
fn mock_capsule_check_health_returns_ready() {
let (capsule, _) = MockCapsule::new("health-test", "test.topic");
assert_eq!(capsule.check_health(), CapsuleState::Ready);
}
#[tokio::test]
async fn dispatch_respects_interceptor_priority_order() {
let order = Arc::new(Mutex::new(Vec::<String>::new()));
let (guard, _) =
MockCapsule::with_priority("guard", "shared.topic", 10, Some(Arc::clone(&order)));
let (handler, _) =
MockCapsule::with_priority("handler", "shared.topic", 100, Some(Arc::clone(&order)));
let (transform, _) =
MockCapsule::with_priority("transform", "shared.topic", 50, Some(Arc::clone(&order)));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(handler)).unwrap();
registry.register(Box::new(guard)).unwrap();
registry.register(Box::new(transform)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "shared.topic");
tokio::time::sleep(Duration::from_millis(300)).await;
let recorded = order.lock().unwrap().clone();
assert_eq!(
recorded,
vec!["guard", "transform", "handler"],
"interceptors must fire in priority order (lower first)"
);
handle.abort();
}
#[tokio::test]
async fn find_matching_interceptors_sorts_by_priority() {
let (low, _) = MockCapsule::with_priority("low-pri", "test.event", 10, None);
let (high, _) = MockCapsule::with_priority("high-pri", "test.event", 200, None);
let (mid, _) = MockCapsule::with_priority("mid-pri", "test.event", 50, None);
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(high)).unwrap();
registry.register(Box::new(low)).unwrap();
registry.register(Box::new(mid)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let matches = find_matching_interceptors(®istry, "test.event").await;
let names: Vec<&str> = matches.iter().map(|(c, _)| c.id().as_str()).collect();
assert_eq!(
names,
vec!["low-pri", "mid-pri", "high-pri"],
"find_matching_interceptors must return results sorted by priority"
);
}
#[tokio::test]
async fn deny_interceptor_short_circuits_chain() {
let order = Arc::new(Mutex::new(Vec::<String>::new()));
let (mut guard, _) =
MockCapsule::with_priority("guard", "shared.topic", 10, Some(Arc::clone(&order)));
guard.result_override = Some(InterceptResult::Deny {
reason: "blocked by guard".into(),
});
let (handler, invoked_handler) =
MockCapsule::with_priority("handler", "shared.topic", 100, Some(Arc::clone(&order)));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(handler)).unwrap();
registry.register(Box::new(guard)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "shared.topic");
tokio::time::sleep(Duration::from_millis(300)).await;
let recorded = order.lock().unwrap().clone();
assert_eq!(
recorded,
vec!["guard"],
"only the guard should have fired — handler should be short-circuited"
);
assert!(
!invoked_handler.load(Ordering::SeqCst),
"handler must NOT be invoked after Deny"
);
handle.abort();
}
#[tokio::test]
async fn final_interceptor_short_circuits_chain() {
let order = Arc::new(Mutex::new(Vec::<String>::new()));
let (mut cache, _) =
MockCapsule::with_priority("cache", "shared.topic", 30, Some(Arc::clone(&order)));
cache.result_override = Some(InterceptResult::Final(b"cached response".to_vec()));
let (core, invoked_core) =
MockCapsule::with_priority("core", "shared.topic", 100, Some(Arc::clone(&order)));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(core)).unwrap();
registry.register(Box::new(cache)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc(&bus, "shared.topic");
tokio::time::sleep(Duration::from_millis(300)).await;
let recorded = order.lock().unwrap().clone();
assert_eq!(
recorded,
vec!["cache"],
"only the cache should have fired — core should be short-circuited"
);
assert!(
!invoked_core.load(Ordering::SeqCst),
"core must NOT be invoked after Final"
);
handle.abort();
}
#[test]
fn intercept_result_from_guest_bytes() {
let r = InterceptResult::from_guest_bytes(vec![]);
assert!(matches!(r, InterceptResult::Continue(ref b) if b.is_empty()));
let r = InterceptResult::from_guest_bytes(vec![0x00, 1, 2, 3]);
assert!(matches!(r, InterceptResult::Continue(ref b) if b == &[1, 2, 3]));
let r = InterceptResult::from_guest_bytes(vec![0x01, 4, 5]);
assert!(matches!(r, InterceptResult::Final(ref b) if b == &[4, 5]));
let r = InterceptResult::from_guest_bytes(vec![0x02, b'n', b'o']);
assert!(matches!(r, InterceptResult::Deny { ref reason } if reason == "no"));
let r = InterceptResult::from_guest_bytes(vec![0xFF, 1]);
assert!(matches!(r, InterceptResult::Continue(ref b) if b == &[0xFF, 1]));
}
#[tokio::test]
async fn single_match_does_not_block_across_principal_keys() {
let principal_log = Arc::new(Mutex::new(Vec::<String>::new()));
let (mut capsule, _invoked) = MockCapsule::new("class-cap", "split.topic");
capsule.principal_log = Some(Arc::clone(&principal_log));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc_as(&bus, "split.topic", "alice");
publish_ipc_as(&bus, "split.topic", "bob");
tokio::time::sleep(Duration::from_millis(300)).await;
let recorded = principal_log.lock().unwrap().clone();
assert!(
recorded.contains(&"alice".to_string()),
"alice's event should have been invoked: {recorded:?}"
);
assert!(
recorded.contains(&"bob".to_string()),
"bob's event should have been invoked: {recorded:?}"
);
handle.abort();
}
#[tokio::test]
async fn chain_serializes_per_principal_key_on_same_capsule() {
let order = Arc::new(Mutex::new(Vec::<String>::new()));
let (cap_a, _) =
MockCapsule::with_priority("ser-a", "chain.topic", 50, Some(Arc::clone(&order)));
let (cap_b, _) =
MockCapsule::with_priority("ser-b", "chain.topic", 100, Some(Arc::clone(&order)));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(cap_a)).unwrap();
registry.register(Box::new(cap_b)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc_as(&bus, "chain.topic", "alice");
publish_ipc_as(&bus, "chain.topic", "alice");
tokio::time::sleep(Duration::from_millis(500)).await;
let recorded = order.lock().unwrap().clone();
assert_eq!(recorded.len(), 4, "two full chains should have completed");
assert_eq!(recorded[0], "ser-a");
assert_eq!(recorded[1], "ser-b");
assert_eq!(recorded[2], "ser-a");
assert_eq!(recorded[3], "ser-b");
handle.abort();
}
#[tokio::test]
async fn dispatch_isolates_per_principal_under_n1000_fanin() {
let counter = Arc::new(AtomicUsize::new(0));
let principals = Arc::new(Mutex::new(Vec::<String>::new()));
let (mut capsule, _) = MockCapsule::new("fanin-cap", "fanin.topic");
capsule.invoke_counter = Some(Arc::clone(&counter));
capsule.principal_log = Some(Arc::clone(&principals));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(4096));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
const N: usize = 1000;
for i in 0..N {
publish_ipc_as(&bus, "fanin.topic", &format!("user-{i}"));
}
let deadline = std::time::Instant::now() + Duration::from_secs(5);
while counter.load(Ordering::SeqCst) < N && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
let observed = counter.load(Ordering::SeqCst);
assert_eq!(
observed, N,
"all {N} per-principal events should have invoked the interceptor (got {observed})"
);
let recorded_principals = principals.lock().unwrap().clone();
let unique = recorded_principals
.iter()
.collect::<std::collections::HashSet<_>>()
.len();
assert_eq!(
unique, N,
"every recorded invocation should carry a distinct principal (got {unique})"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_does_not_drop_under_burst_to_single_principal() {
let counter = Arc::new(AtomicUsize::new(0));
let (mut capsule, _) = MockCapsule::new("burst-cap", "burst.topic");
capsule.invoke_counter = Some(Arc::clone(&counter));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(256));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
const N: usize = 50;
for _ in 0..N {
publish_ipc_as(&bus, "burst.topic", "burst-user");
}
let deadline = std::time::Instant::now() + Duration::from_secs(3);
while counter.load(Ordering::SeqCst) < N && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let observed = counter.load(Ordering::SeqCst);
assert_eq!(
observed, N,
"single-principal burst of {N} should all be delivered (got {observed})"
);
handle.abort();
}
#[tokio::test]
async fn dispatcher_idle_evicts_per_principal_consumers_after_grace() {
super::set_idle_consumer_grace_for_test(100);
struct ResetGrace;
impl Drop for ResetGrace {
fn drop(&mut self) {
super::set_idle_consumer_grace_for_test(super::DEFAULT_IDLE_CONSUMER_GRACE_MS);
}
}
let _reset = ResetGrace;
let counter = Arc::new(AtomicUsize::new(0));
let (mut capsule, _) = MockCapsule::new("evict-cap", "evict.topic");
capsule.invoke_counter = Some(Arc::clone(&counter));
let mut registry = CapsuleRegistry::new();
registry.register(Box::new(capsule)).unwrap();
let registry = Arc::new(RwLock::new(registry));
let bus = Arc::new(EventBus::with_capacity(64));
let dispatcher = EventDispatcher::new(Arc::clone(®istry), Arc::clone(&bus));
let handle = tokio::spawn(dispatcher.run());
tokio::task::yield_now().await;
publish_ipc_as(&bus, "evict.topic", "alice");
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while counter.load(Ordering::SeqCst) < 1 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"first alice event should land"
);
tokio::time::sleep(Duration::from_millis(400)).await;
publish_ipc_as(&bus, "evict.topic", "alice");
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while counter.load(Ordering::SeqCst) < 2 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
counter.load(Ordering::SeqCst),
2,
"second alice event must re-spawn the consumer and land"
);
handle.abort();
}
#[tokio::test]
async fn dispatch_respawns_when_mapped_consumer_is_closed() {
let counter = Arc::new(AtomicUsize::new(0));
let (mut capsule, _) = MockCapsule::new("respawn-cap", "respawn.topic");
capsule.invoke_counter = Some(Arc::clone(&counter));
let capsule: Arc<dyn Capsule> = Arc::new(capsule);
let queues: CapsuleQueues = Arc::new(parking_lot::Mutex::new(HashMap::new()));
let key = (capsule.id().clone(), Some("alice".to_string()));
let (dead_tx, dead_rx) = mpsc::channel::<InterceptorWork>(CAPSULE_EVENT_QUEUE_CAPACITY);
drop(dead_rx);
assert!(
dead_tx.is_closed(),
"precondition: the seeded sender is closed"
);
queues.lock().insert(key.clone(), dead_tx);
dispatch_single(
&queues,
Arc::clone(&capsule),
"test_action".to_string(),
Arc::new("respawn.topic".to_string()),
Arc::new(Vec::new()),
None,
Some("alice".to_string()),
);
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while counter.load(Ordering::SeqCst) < 1 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"a dispatch through a closed mapped sender must re-spawn and deliver, not drop"
);
}
#[tokio::test]
async fn chain_lock_prunes_entry_when_last_referrer_drops() {
let chain_locks: ChainLocks = Arc::new(parking_lot::RwLock::new(HashMap::new()));
let cap = CapsuleId::from_static("chainmap-cap");
for i in 0..256 {
let key = (cap.clone(), Some(format!("user-{i}")));
let guard = acquire_chain_lock(&chain_locks, key).await;
assert_eq!(chain_locks.read().len(), 1, "entry present while held");
drop(guard);
assert!(
chain_locks.read().is_empty(),
"map must shed the entry once the last referrer drops"
);
}
assert!(
chain_locks.read().is_empty(),
"chain_locks must not retain one entry per principal"
);
}
#[tokio::test]
async fn chain_lock_retained_while_another_holder_exists() {
let chain_locks: ChainLocks = Arc::new(parking_lot::RwLock::new(HashMap::new()));
let cap = CapsuleId::from_static("shared-cap");
let key = (cap.clone(), Some("alice".to_string()));
let g1 = acquire_chain_lock(&chain_locks, key.clone()).await;
assert_eq!(chain_locks.read().len(), 1);
let cl = Arc::clone(&chain_locks);
let k2 = key.clone();
let task = tokio::spawn(async move {
let g2 = acquire_chain_lock(&cl, k2).await;
tokio::task::yield_now().await;
drop(g2);
});
assert_eq!(
chain_locks.read().len(),
1,
"entry must persist while g1 holds it"
);
drop(g1);
task.await.unwrap();
assert!(
chain_locks.read().is_empty(),
"entry pruned once both holders drop"
);
}