use super::handler::Action;
use super::lease_state::{LeaseEvent, LeaseOptions, LeaseState, NewMessage};
use super::leaser::{ConfirmedAcks, Leaser};
use super::shutdown_behavior::ShutdownBehavior;
#[cfg(test)]
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{UnboundedReceiver, WeakUnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
/// Handles for a lease loop started by `LeaseLoop::new`.
///
/// The loop itself runs in a spawned tokio task; this struct only carries what
/// is needed to feed it, stop it, and wait for it.
pub(super) struct LeaseLoop {
/// Join handle of the spawned task that runs the lease loop.
pub(super) handle: JoinHandle<()>,
/// Weak sender for messages that should be placed under lease management.
/// It can only be upgraded while a strong sender is still alive.
pub(super) message_tx: WeakUnboundedSender<NewMessage>,
/// Weak sender for ack/nack actions on messages.
/// It can only be upgraded while a strong sender is still alive.
pub(super) ack_tx: WeakUnboundedSender<Action>,
/// Cancelling this token makes a helper task drop the strong senders it
/// owns for both channels, which is how shutdown of the loop is triggered.
pub(super) cancel: CancellationToken,
}
impl LeaseLoop {
pub(super) fn new<L>(
leaser: L,
mut confirmed_rx: UnboundedReceiver<ConfirmedAcks>,
mut eo_extend_rx: UnboundedReceiver<ConfirmedAcks>,
options: LeaseOptions,
) -> Self
where
L: Leaser + Clone + Send + 'static,
{
let (message_tx, mut message_rx) = unbounded_channel::<NewMessage>();
let (ack_tx, mut ack_rx) = unbounded_channel();
let weak_message_tx = message_tx.downgrade();
let weak_ack_tx = ack_tx.downgrade();
let shutdown_guard = match options.shutdown_behavior {
ShutdownBehavior::WaitForProcessing => Some(message_tx.clone()),
ShutdownBehavior::NackImmediately => None,
};
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
cancel_clone.cancelled().await;
drop(message_tx);
drop(ack_tx);
});
let mut state = LeaseState::new(leaser, options);
let handle = tokio::spawn(async move {
loop {
tokio::select! {
biased;
event = state.next_event() => {
match event {
LeaseEvent::Flush => state.flush(),
LeaseEvent::Extend => state.extend(),
LeaseEvent::ExtendCompleted(ack_ids) => {
state.update_last_extension(ack_ids);
}
}
},
message = message_rx.recv() => {
match message {
None => break shutdown(state, ack_rx).await,
Some(m) => state.add(m.ack_id, m.lease_info),
}
},
action = ack_rx.recv() => {
match action {
None => break state.shutdown().await,
Some(a) => state.process(a),
}
},
confirmed_acks = confirmed_rx.recv() => {
match confirmed_acks {
None => break,
Some(results) => state.confirm(results),
}
},
extend_results = eo_extend_rx.recv() => {
match extend_results {
None => break,
Some(r) => {
let extended: Vec<String> = r
.into_iter()
.filter_map(|(id, res)| res.ok().map(|_| id))
.collect();
state.update_last_extension_eo(extended);
}
}
},
}
}
drop(shutdown_guard);
});
LeaseLoop {
handle,
message_tx: weak_message_tx,
ack_tx: weak_ack_tx,
cancel,
}
}
}
/// Drains the action channel without waiting, then shuts the lease state down.
///
/// Every action already queued in `ack_rx` is received with `try_recv`. Only
/// `Action::Ack` values are forwarded to `state.process`; every other action
/// is discarded. Afterwards `state.shutdown()` is awaited.
///
/// NOTE(review): exactly-once acks queued at this point are discarded like the
/// nacks are - confirm that this is intended.
async fn shutdown<L>(mut state: LeaseState<L>, mut ack_rx: UnboundedReceiver<Action>)
where
    L: Leaser + Clone + Send + 'static,
{
    while let Ok(action) = ack_rx.try_recv() {
        if matches!(action, Action::Ack(_)) {
            state.process(action);
        }
    }
    state.shutdown().await;
}
#[cfg(test)]
mod tests {
use super::super::lease_state::tests::{
at_least_once_info, exactly_once_info, sorted, test_id, test_ids,
};
use super::super::leaser::tests::MockLeaser;
use super::*;
use crate::subscriber::lease_state::{ExactlyOnceInfo, LeaseInfo};
use google_cloud_test_macros::tokio_test_no_panics;
use std::collections::HashMap;
use std::sync::Arc;
use test_case::test_case;
use tokio::sync::Mutex;
use tokio::sync::oneshot::channel;
use tokio::time::{Duration, Instant};
/// Builds a `NewMessage` whose ack id is `test_id(id)` and whose lease info
/// comes from `at_least_once_info()`.
fn test_message(id: i32) -> NewMessage {
    let ack_id = test_id(id);
    let lease_info = at_least_once_info();
    NewMessage { ack_id, lease_info }
}
impl LeaseLoop {
    /// Upgrades the weak action sender to a strong one.
    ///
    /// Panics with "shutdown has not begun" if the upgrade fails, i.e. when no
    /// strong action sender is alive any more.
    #[track_caller]
    fn strong_ack_tx(&self) -> UnboundedSender<Action> {
        let upgraded = self.ack_tx.upgrade();
        upgraded.expect("shutdown has not begun")
    }

    /// Upgrades the weak message sender to a strong one.
    ///
    /// Panics with "shutdown has not begun" if the upgrade fails, i.e. when no
    /// strong message sender is alive any more.
    #[track_caller]
    fn strong_message_tx(&self) -> UnboundedSender<NewMessage> {
        let upgraded = self.message_tx.upgrade();
        upgraded.expect("shutdown has not begun")
    }
}
// Sends 30 exactly-once messages, exactly-once acks for ids 0..10 and
// exactly-once nacks for ids 10..20, then expects exactly one `confirmed_ack`
// call (ids 0..10) and one `confirmed_nack` call (ids 10..20) once the paused
// clock has been advanced by FLUSH_START.
#[tokio_test_no_panics(start_paused = true)]
async fn basic_exactly_once() -> anyhow::Result<()> {
const FLUSH_START: Duration = Duration::from_millis(200);
let mock = Arc::new(Mutex::new(MockLeaser::new()));
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing, which would make it exit.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
flush_start: FLUSH_START,
// Presumably pushes the first extend event far beyond the time advanced in
// this test, so that only flushing is exercised.
extend_start: Duration::from_secs(900),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
for i in 0..30 {
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(i),
lease_info: exactly_once_info(),
})?;
}
for i in 0..10 {
lease_loop
.strong_ack_tx()
.send(Action::ExactlyOnceAck(test_id(i)))?;
}
for i in 10..20 {
lease_loop
.strong_ack_tx()
.send(Action::ExactlyOnceNack(test_id(i)))?;
}
// Ids are sorted before comparison, so the order within a batch is not
// asserted.
mock.lock()
.await
.expect_confirmed_ack()
.times(1)
.withf(|v| sorted(v) == test_ids(0..10))
.returning(|_| ());
mock.lock()
.await
.expect_confirmed_nack()
.times(1)
.withf(|v| sorted(v) == test_ids(10..20))
.returning(|_| ());
// Move the paused clock to the first flush and let the loop task run.
tokio::time::advance(FLUSH_START).await;
tokio::task::yield_now().await;
Ok(())
}
// Leases one exactly-once message that carries a oneshot result sender, acks
// it, feeds a successful confirmation through `confirmed_tx`, and checks that
// the oneshot receiver ends up holding an `Ok` result.
#[tokio_test_no_panics(start_paused = true)]
async fn confirmed_ack() -> anyhow::Result<()> {
const FLUSH_START: Duration = Duration::from_millis(200);
let mock = Arc::new(Mutex::new(MockLeaser::new()));
let (confirmed_tx, confirmed_rx) = unbounded_channel();
// Bound (not dropped) so the loop never sees this channel close.
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
flush_start: FLUSH_START,
// Presumably keeps extend events out of the time window of this test.
extend_start: Duration::from_secs(900),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
// `result_rx` is where the outcome of the exactly-once ack is observed.
let (result_tx, mut result_rx) = channel();
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(0),
lease_info: LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
})?;
lease_loop
.strong_ack_tx()
.send(Action::ExactlyOnceAck(test_id(0)))?;
// Queue the confirmation for the ack: id 0 succeeded.
let mut ack_results = HashMap::new();
ack_results.insert(test_id(0), Ok(()));
confirmed_tx.send(ack_results)?;
mock.lock()
.await
.expect_confirmed_ack()
.times(1)
.withf(|v| *v == vec![test_id(0)])
.returning(|_| ());
// Move the paused clock to the first flush and let the loop task run.
tokio::time::advance(FLUSH_START).await;
tokio::task::yield_now().await;
// First `?`: a value must already be available on the oneshot.
// Second `?`: that value must itself be `Ok`.
result_rx.try_recv()??;
Ok(())
}
// Checks that queued acks and nacks reach the leaser in batches as time
// advances: first after FLUSH_START, then after each further FLUSH_PERIOD.
// The mock is checkpointed after every step so each batch is verified on its
// own.
#[tokio_test_no_panics(start_paused = true)]
async fn flush_acks_nacks_on_interval() -> anyhow::Result<()> {
const FLUSH_PERIOD: Duration = Duration::from_secs(1);
const FLUSH_START: Duration = Duration::from_millis(200);
let mock = Arc::new(Mutex::new(MockLeaser::new()));
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
flush_period: FLUSH_PERIOD,
flush_start: FLUSH_START,
// Presumably keeps extend events out of the time window of this test.
extend_start: Duration::from_secs(900),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
for i in 0..30 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 0..10 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
mock.lock().await.checkpoint();
// Step 1: after FLUSH_START, acks 0..10 arrive in a single `ack` call.
{
mock.lock()
.await
.expect_ack()
.times(1)
.withf(|v| sorted(v) == test_ids(0..10))
.returning(move |_| ());
tokio::time::advance(FLUSH_START).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
for i in 10..20 {
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}
// Step 2: one FLUSH_PERIOD later, nacks 10..20 arrive in a single `nack`
// call.
{
mock.lock()
.await
.expect_nack()
.times(1)
.withf(|v| sorted(v) == test_ids(10..20))
.returning(|_| ());
tokio::time::advance(FLUSH_PERIOD).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
for i in 20..25 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
for i in 25..30 {
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}
// Step 3: a mixed batch yields one `ack` call (20..25) and one `nack` call
// (25..30) after the next FLUSH_PERIOD.
{
mock.lock()
.await
.expect_ack()
.times(1)
.withf(|v| sorted(v) == test_ids(20..25))
.returning(move |_| ());
mock.lock()
.await
.expect_nack()
.times(1)
.withf(|v| sorted(v) == test_ids(25..30))
.returning(|_| ());
tokio::time::advance(FLUSH_PERIOD).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
Ok(())
}
// Exercises lease extension over three points in time with 30 at-least-once
// messages (ids 0..30) and 30 exactly-once messages (ids 30..60):
//   1. after EXTEND_START: one `extend` (0..30) and one `eo_extend` (30..60);
//   2. one EXTEND_PERIOD later: no `extend` call is expected;
//   3. another EXTEND_PERIOD later: one `extend` for the ids not acked in the
//      meantime (10..30) and one `eo_extend` (30..60).
#[tokio_test_no_panics(start_paused = true)]
async fn extend_interval() -> anyhow::Result<()> {
const EXTEND_PERIOD: Duration = Duration::from_secs(1);
const EXTEND_START: Duration = Duration::from_millis(200);
let mock = Arc::new(Mutex::new(MockLeaser::new()));
// Bound (not dropped) so the loop never sees this channel close.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
// The mocked `eo_extend` below reports its results through this sender.
let (eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
// Presumably keeps flush events out of the time window of this test.
flush_start: Duration::from_secs(900),
extend_period: EXTEND_PERIOD,
extend_start: EXTEND_START,
max_lease_extension: Duration::from_secs(7),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
for i in 0..30 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 30..60 {
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(i),
lease_info: exactly_once_info(),
})?;
}
mock.lock().await.checkpoint();
// Step 1: the first extension covers every message under lease.
{
mock.lock()
.await
.expect_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(0..30))
.returning(move |ack_ids| ack_ids);
mock.lock()
.await
.expect_eo_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(30..60))
.return_once({
let tx = eo_extend_tx.clone();
// Report every requested id as successfully extended.
move |ack_ids| {
let mut results = HashMap::new();
for id in ack_ids {
results.insert(id, Ok(()));
}
let _ = tx.send(results);
}
});
tokio::time::advance(EXTEND_START).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
for i in 0..10 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
// Step 2: no `extend` call is expected during this period.
// NOTE(review): presumably because every lease was extended in step 1 -
// confirm against `LeaseState`.
{
mock.lock().await.expect_extend().times(0);
tokio::time::advance(EXTEND_PERIOD).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
// Step 3: the acked ids 0..10 are no longer extended; the rest are.
{
mock.lock()
.await
.expect_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(10..30))
.returning(|ack_ids| ack_ids);
mock.lock()
.await
.expect_eo_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(30..60))
.return_once({
let tx = eo_extend_tx.clone();
// Report every requested id as successfully extended.
move |ack_ids| {
let mut results = HashMap::new();
for id in ack_ids {
results.insert(id, Ok(()));
}
let _ = tx.send(results);
}
});
tokio::time::advance(EXTEND_PERIOD).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
Ok(())
}
// Dropping the `LeaseLoop` handle while messages and acks are still queued
// must return immediately: on the paused clock, no time may have elapsed.
#[tokio_test_no_panics(start_paused = true)]
async fn drop_does_not_wait_for_pending_operations() -> anyhow::Result<()> {
let start = Instant::now();
// No expectations are configured on this mock.
let mock = MockLeaser::new();
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing while the test body runs.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let lease_loop = LeaseLoop::new(
Arc::new(mock),
confirmed_rx,
eo_extend_rx,
LeaseOptions::default(),
);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
for i in 0..30 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 0..10 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
drop(lease_loop);
// The clock is paused, so any awaited timer would show up as elapsed time.
assert_eq!(start.elapsed(), Duration::ZERO);
Ok(())
}
// With `ShutdownBehavior::NackImmediately`, cancelling the loop is expected to
// produce one `ack` call for the acked ids (0..10) and one `nack` call for
// every other leased id (10..30): both the explicitly nacked ids 10..20 and
// the ids 20..30 that never received an action.
#[tokio_test_no_panics(start_paused = true)]
async fn shutdown_nack_immediately() -> anyhow::Result<()> {
let mock = Arc::new(Mutex::new(MockLeaser::new()));
mock.lock()
.await
.expect_ack()
.times(1)
.withf(|v| sorted(v) == test_ids(0..10))
.returning(|_| ());
mock.lock()
.await
.expect_nack()
.times(1)
.withf(|v| sorted(v) == test_ids(10..30))
.returning(|_| ());
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
shutdown_behavior: ShutdownBehavior::NackImmediately,
..Default::default()
};
let lease_loop = LeaseLoop::new(mock, confirmed_rx, eo_extend_rx, options);
for i in 0..30 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 0..10 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
for i in 10..20 {
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}
// Trigger shutdown, let the tasks react, then wait for the loop to finish.
lease_loop.cancel.cancel();
tokio::task::yield_now().await;
lease_loop.handle.await?;
Ok(())
}
// With `ShutdownBehavior::WaitForProcessing`, acks sent after cancellation
// through a still-alive strong sender must still be handled: the mock expects
// a single `ack` call covering all ids 0..20, and the loop only finishes once
// that sender has been dropped.
#[tokio_test_no_panics(start_paused = true)]
async fn shutdown_wait_for_processing() -> anyhow::Result<()> {
let mock = Arc::new(Mutex::new(MockLeaser::new()));
mock.lock()
.await
.expect_ack()
.times(1)
.withf(|v| sorted(v) == test_ids(0..20))
.returning(|_| ());
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
shutdown_behavior: ShutdownBehavior::WaitForProcessing,
..Default::default()
};
let lease_loop = LeaseLoop::new(mock, confirmed_rx, eo_extend_rx, options);
// Hold a strong action sender across the cancellation; it keeps the action
// channel open after the loop's own strong sender is dropped.
let ack_tx = lease_loop.strong_ack_tx();
for i in 0..20 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 0..10 {
ack_tx.send(Action::Ack(test_id(i)))?;
}
lease_loop.cancel.cancel();
tokio::task::yield_now().await;
// These acks are sent after shutdown has begun.
for i in 10..20 {
ack_tx.send(Action::Ack(test_id(i)))?;
}
// Dropping the last strong sender closes the action channel, which lets the
// loop finish.
drop(ack_tx);
lease_loop.handle.await?;
Ok(())
}
// For both shutdown behaviors, the loop task must not finish before the
// leaser's `ack` call has completed. The fake `ack` sleeps for EXPECTED_SLEEP
// on the paused clock, and the test asserts that exactly that much time has
// elapsed once the loop's join handle resolves.
#[test_case(ShutdownBehavior::WaitForProcessing)]
#[test_case(ShutdownBehavior::NackImmediately)]
#[tokio_test_no_panics(start_paused = true)]
async fn shutdown_waits_for_flush(shutdown_behavior: ShutdownBehavior) -> anyhow::Result<()> {
const EXPECTED_SLEEP: Duration = Duration::from_millis(100);
let start = Instant::now();
// Hand-written leaser (instead of the mock) so that `ack` can await a sleep.
#[derive(Clone)]
struct FakeLeaser;
#[async_trait::async_trait]
impl Leaser for FakeLeaser {
// Expects exactly the acked ids 0..10, then takes EXPECTED_SLEEP to finish.
async fn ack(&self, mut ack_ids: Vec<String>) {
ack_ids.sort();
assert_eq!(ack_ids, test_ids(0..10));
tokio::time::sleep(EXPECTED_SLEEP).await;
}
// Expects every id that was leased but not acked (10..30).
async fn nack(&self, mut ack_ids: Vec<String>) {
ack_ids.sort();
assert_eq!(ack_ids, test_ids(10..30));
}
async fn extend(&self, ack_ids: Vec<String>) -> Vec<String> {
ack_ids
}
async fn confirmed_ack(&self, _ack_ids: Vec<String>) {}
async fn confirmed_nack(&self, _ack_ids: Vec<String>) {}
async fn eo_extend(&self, _ack_ids: Vec<String>) {}
}
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
shutdown_behavior,
..Default::default()
};
let lease_loop = LeaseLoop::new(FakeLeaser, confirmed_rx, eo_extend_rx, options);
for i in 0..30 {
lease_loop.strong_message_tx().send(test_message(i))?;
}
for i in 0..10 {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
lease_loop.cancel.cancel();
lease_loop.handle.await?;
// The only timer awaited is the sleep inside `FakeLeaser::ack`.
assert_eq!(start.elapsed(), EXPECTED_SLEEP);
Ok(())
}
// Sends a message and its ack back-to-back and checks that the ack is flushed
// (one `ack` call at 100ms) and that the message is not extended afterwards
// (no `extend` call at 200ms). Repeated 1000 times with a fresh loop on each
// iteration to look for ordering races between the two channels.
#[tokio_test_no_panics(start_paused = true)]
async fn no_add_and_ack_race() -> anyhow::Result<()> {
for _ in 0..1000 {
let mock = Arc::new(Mutex::new(MockLeaser::new()));
// The sender halves are bound (not dropped) so the loop never observes these
// channels closing during the iteration.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
let (_eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
flush_start: Duration::from_millis(100),
extend_start: Duration::from_millis(200),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
// The message and its ack travel on different channels.
lease_loop.strong_message_tx().send(test_message(1))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(1)))?;
// At 100ms (flush_start): the ack must be sent to the leaser.
{
mock.lock()
.await
.expect_ack()
.times(1)
.withf(|v| *v == vec![test_id(1)])
.returning(|_| ());
tokio::time::advance(Duration::from_millis(100)).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
// At 200ms (extend_start): nothing may be extended.
{
mock.lock().await.expect_extend().times(0);
tokio::time::advance(Duration::from_millis(100)).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}
}
Ok(())
}
// Checks that a successful exactly-once extend result delivered through
// `eo_extend_tx` is taken into account by the lease state: after the first
// `eo_extend` call reports success for the message, no further `eo_extend`
// call is expected one EXTEND_PERIOD later.
#[tokio_test_no_panics(start_paused = true)]
async fn eo_extend_result_updates_lease_state() -> anyhow::Result<()> {
const EXTEND_START: Duration = Duration::from_millis(200);
const EXTEND_PERIOD: Duration = Duration::from_secs(1);
let mock = Arc::new(Mutex::new(MockLeaser::new()));
// Bound (not dropped) so the loop never sees this channel close.
let (_confirmed_tx, confirmed_rx) = unbounded_channel();
// The mocked `eo_extend` below reports its results through this sender.
let (eo_extend_tx, eo_extend_rx) = unbounded_channel();
let options = LeaseOptions {
// Presumably keeps flush events out of the time window of this test.
flush_start: Duration::from_secs(900),
extend_period: EXTEND_PERIOD,
extend_start: EXTEND_START,
max_lease_extension: Duration::from_secs(10),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, eo_extend_rx, options);
// Give the spawned tasks a chance to run before feeding the loop.
tokio::task::yield_now().await;
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(0),
lease_info: exactly_once_info(),
})?;
mock.lock()
.await
.expect_eo_extend()
.times(1)
.withf(|v| *v == vec![test_id(0)])
.return_once({
let tx = eo_extend_tx.clone();
// Report every requested id as successfully extended.
move |ack_ids| {
let mut results = HashMap::new();
for id in ack_ids {
results.insert(id, Ok(()));
}
let _ = tx.send(results);
}
});
tokio::time::advance(EXTEND_START).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
// Extra yield so the loop can pick up the result the mock just sent.
tokio::task::yield_now().await;
// One period later the message must not be extended again.
mock.lock().await.expect_eo_extend().times(0);
tokio::time::advance(EXTEND_PERIOD).await;
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
Ok(())
}
}