use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::refcount::CollectionQuiesce;
impl CollectionQuiesce {
pub fn begin_drain(&self, tenant_id: u64, collection: &str) {
let mut inner = self.inner_mut();
let entry = inner
.states
.entry((tenant_id, collection.to_string()))
.or_default();
entry.draining = true;
}
pub fn clear_drain(&self, tenant_id: u64, collection: &str) {
let mut inner = self.inner_mut();
if let Some(state) = inner.states.get_mut(&(tenant_id, collection.to_string())) {
state.draining = false;
}
}
pub fn forget(&self, tenant_id: u64, collection: &str) {
let mut inner = self.inner_mut();
inner.states.remove(&(tenant_id, collection.to_string()));
}
pub fn wait_until_drained(self: &Arc<Self>, tenant_id: u64, collection: &str) -> WaitDrain {
WaitDrain {
registry: Arc::clone(self),
tenant_id,
collection: collection.to_string(),
notified: None,
}
}
fn inner_mut(&self) -> std::sync::MutexGuard<'_, super::refcount::Inner> {
self.inner.lock().expect("CollectionQuiesce mutex poisoned")
}
}
pub struct WaitDrain {
registry: Arc<CollectionQuiesce>,
tenant_id: u64,
collection: String,
notified: Option<Pin<Box<tokio::sync::futures::Notified<'static>>>>,
}
unsafe impl Send for WaitDrain {}
impl Future for WaitDrain {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
if self.registry.open_scans(self.tenant_id, &self.collection) == 0 {
return Poll::Ready(());
}
if self.notified.is_none() {
let notify: &tokio::sync::Notify = &self.registry.notify;
let notified: tokio::sync::futures::Notified<'_> = notify.notified();
let notified: tokio::sync::futures::Notified<'static> =
unsafe { std::mem::transmute(notified) };
self.notified = Some(Box::pin(notified));
}
let fut = self.notified.as_mut().expect("just set");
match fut.as_mut().poll(cx) {
Poll::Ready(()) => {
self.notified = None;
continue;
}
Poll::Pending => {
if self.registry.open_scans(self.tenant_id, &self.collection) == 0 {
return Poll::Ready(());
}
return Poll::Pending;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn drain_resolves_immediately_when_no_open_scans() {
let q = CollectionQuiesce::new();
q.begin_drain(1, "c");
q.wait_until_drained(1, "c").await;
}
#[tokio::test]
async fn drain_waits_for_last_scan_to_release() {
let q = CollectionQuiesce::new();
let g1 = q.try_start_scan(1, "c").unwrap();
let g2 = q.try_start_scan(1, "c").unwrap();
q.begin_drain(1, "c");
let q_clone = Arc::clone(&q);
let drain_task = tokio::spawn(async move {
q_clone.wait_until_drained(1, "c").await;
});
tokio::task::yield_now().await;
assert!(
!drain_task.is_finished(),
"drain must not resolve while scans open"
);
drop(g1);
tokio::task::yield_now().await;
assert!(
!drain_task.is_finished(),
"drain must not resolve with 1 scan still open"
);
drop(g2);
drain_task.await.unwrap();
}
#[tokio::test]
async fn forget_clears_state() {
let q = CollectionQuiesce::new();
q.begin_drain(1, "c");
q.wait_until_drained(1, "c").await;
q.forget(1, "c");
assert!(!q.is_draining(1, "c"));
assert!(q.try_start_scan(1, "c").is_ok());
}
}