use super::super::notification::EventNotification;
use super::super::store::ConsumerStore;
use crate::error::{Error, Result};
use futures::stream::StreamExt;
use std::time::Duration;
use timesource_core::Persisted;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::Instrument;
use super::operation::ConsumerOperation;
pub(super) async fn coordinate_event_channel<Store>(
default_poller_frequency: Duration,
mut pg_notification_rx: mpsc::Receiver<Result<EventNotification<Store::Event>>>,
store: Store,
buffer_tx: mpsc::Sender<Result<Persisted<Store::Event>>>,
) where
Store: ConsumerStore,
{
info!("Catching up with events published since last committed offset");
let mut resume_stream = store.events_after_offset();
let mut last_known_offset = None;
let mut prior_events_total = 0;
while let Some(r) = resume_stream.next().await {
let event_id = r.as_ref().map(|x| x.id()).ok();
if let Err(error) = buffer_tx.send(r).await {
error!(?error, "Sender dropped");
return;
}
prior_events_total += 1;
if let Some(event_id) = event_id {
last_known_offset = Some(event_id);
}
}
info!(
last_known_offset,
total_processed = prior_events_total,
"Caught up with events published before bootstrap. Waiting for next event."
);
let mut poller_frequency = default_poller_frequency;
loop {
let awake_op = async {
let poller = sleep(poller_frequency);
let pg_notification = pg_notification_rx.recv();
select! {
_ = poller => {
warn!(?poller_frequency, "Polling for events in case there are network issues");
Ok(Some(ConsumerOperation::from(last_known_offset)))
}
payload = pg_notification => match payload {
Some(Ok(notification)) => {
if poller_frequency != default_poller_frequency {
info!("Consumer connection re-established. Disabling polling as first option");
poller_frequency = default_poller_frequency
}
Ok(ConsumerOperation::from_notification(notification, last_known_offset))
},
Some(Err(error)) => {
match error {
Error::ListenerDisconnected => {
error!(?error, "NOTIFY connection interrupted. Switching to frequent polling");
poller_frequency = Duration::from_secs(3);
},
_ => {
error!(?error, "Unexpected consumer internal error");
}
};
Ok(None)
}
None => {
Err("pg_notification channel dropped")
},
}
}
}
.instrument(info_span!("trigger_await", last_known_offset))
.await;
let awake_op = match awake_op {
Ok(op) => op,
Err(error) => {
error!(?error, "Unable to continue");
break;
}
};
if let Some(operation) = awake_op {
debug!(?operation, "New operation");
async {
match operation.handle(&store, buffer_tx.clone()).await {
Ok(None) | Err(None) => {}
Ok(Some(next_offset)) => {
last_known_offset = Some(next_offset);
}
Err(Some(next_offset)) => {
error!("Unable to process event. Skipping");
last_known_offset = Some(next_offset);
}
}
}
.instrument(info_span!("consumer_operation"))
.await;
}
}
}
#[cfg(test)]
mod tests {
use super::super::super::test_util::{mocks, MockEvent};
use super::*;
use futures::stream::{self, BoxStream};
use mockall::predicate;
use timesource_core::event::Persisted;
use tokio::task::yield_now;
use uuid::Uuid;
fn set_panic_hook() {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
}
fn notify_events(
aggregate_id: Uuid,
events: &[Persisted<MockEvent>],
prior_event_id: Option<u64>,
tx: mpsc::Sender<Result<EventNotification<MockEvent>>>,
) {
for event in events {
let prior_event_id = if event.id() > 0 {
Some(event.id() - 1)
} else {
prior_event_id
};
let notification = EventNotification {
prior_event_id,
event: Persisted::new(
aggregate_id,
event.data().clone(),
event.id(),
event.timestamp(),
),
};
tx.try_send(Ok(notification)).unwrap();
}
}
struct TestRunner {
aggregate_id: Uuid,
id: i64,
event_buffer_rx: mpsc::Receiver<Result<Persisted<MockEvent>>>,
event_buffer_tx: mpsc::Sender<Result<Persisted<MockEvent>>>,
pg_rx: mpsc::Receiver<Result<EventNotification<MockEvent>>>,
pg_tx: mpsc::Sender<Result<EventNotification<MockEvent>>>,
}
impl TestRunner {
fn new() -> Self {
set_panic_hook();
let aggregate_id = Uuid::new_v4();
let (event_buffer_tx, event_buffer_rx) = mpsc::channel(100);
let (pg_tx, pg_rx) = mpsc::channel(100);
Self {
aggregate_id,
id: 0,
event_buffer_rx,
event_buffer_tx,
pg_rx,
pg_tx,
}
}
fn make_event(&mut self, event: MockEvent) -> Persisted<MockEvent> {
let id = self.id;
self.id += 1;
event.new_persisted(self.aggregate_id, id)
}
fn make_events(&mut self, inputs: Vec<MockEvent>) -> Vec<Persisted<MockEvent>> {
inputs
.into_iter()
.map(|x| self.make_event(x))
.collect::<Vec<_>>()
}
fn append_events(
&mut self,
inputs: Vec<MockEvent>,
notify: bool,
) -> Vec<Persisted<MockEvent>> {
let events = self.make_events(inputs);
for event in &events {
self.event_buffer_tx.try_send(Ok(event.clone())).unwrap();
}
if notify {
notify_events(self.aggregate_id, &events, None, self.pg_tx.clone());
}
events
}
async fn append_events_and_notify(
&mut self,
inputs: Vec<MockEvent>,
) -> Vec<Persisted<MockEvent>> {
let events = self.append_events(inputs, true);
sleep(Duration::from_millis(100)).await;
events
}
fn append_events_only(&mut self, inputs: Vec<MockEvent>) -> Vec<Persisted<MockEvent>> {
self.append_events(inputs, false)
}
fn stream_all(&mut self) -> BoxStream<'static, Result<Persisted<MockEvent>>> {
let mut output = vec![];
while let Ok(item) = self.event_buffer_rx.try_recv() {
output.push(item);
}
stream::iter(output).boxed()
}
}
#[tokio::test]
async fn on_bootstrap_fetches_stream_since_offset() {
set_panic_hook();
let poller_frequency = Duration::from_secs(60);
let (_pg_tx, pg_rx) = mpsc::channel(100);
let (buffer_tx, _) = mpsc::channel(100);
let mut store = mocks::MockStore::new();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream::iter(vec![]).boxed());
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
pg_rx,
store,
buffer_tx,
));
sleep(Duration::from_millis(100)).await;
task.abort();
}
#[tokio::test]
async fn during_bootstrap_yields_when_buffer_is_full() {
let poller_frequency = Duration::from_secs(60);
let mut runner = TestRunner::new();
let (buffer_tx, _buffer_rx) = mpsc::channel(1);
let events = runner
.append_events_and_notify(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2])
.await;
let mut store = mocks::MockStore::new();
let stream_all = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream_all);
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
buffer_tx.clone(),
));
yield_now().await;
notify_events(runner.aggregate_id, &events, None, runner.pg_tx.clone());
yield_now().await;
assert!(buffer_tx.capacity() == 0);
task.abort();
}
#[tokio::test]
async fn given_prior_events_it_sends_them_downstream_after_bootstrap() {
let poller_frequency = Duration::from_secs(60);
let mut runner = TestRunner::new();
let appended =
runner.append_events_only(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let mut store = mocks::MockStore::new();
let stream_all = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream_all);
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
yield_now().await;
for expected in appended {
let received = runner
.event_buffer_rx
.recv()
.await
.expect("should have received message")
.expect("should be persisted event");
assert_eq!(expected.into_data(), received.into_data());
}
task.abort();
}
#[tokio::test]
async fn on_polling_fetches_stream_from_last_known_offset() {
let poller_frequency = Duration::from_millis(50);
let mut runner = TestRunner::new();
let events = runner.append_events_only(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let mut store = mocks::MockStore::new();
let stream_all = runner.stream_all();
let events_after = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream_all);
store
.expect_events_after()
.with(predicate::eq(events[1].id()))
.times(1)
.return_once(|_| events_after);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
sleep(Duration::from_millis(51)).await;
task.abort();
}
#[tokio::test]
async fn on_polling_fetches_all_events_if_offset_is_not_yet_known() {
let poller_frequency = Duration::from_millis(50);
let runner = TestRunner::new();
let mut store = mocks::MockStore::new();
store
.expect_events_after_offset()
.times(2)
.returning(|| stream::iter(vec![]).boxed());
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
sleep(Duration::from_millis(51)).await;
task.abort();
}
#[tokio::test]
async fn given_first_notification_and_no_prior_events_sends_event_downstream() {
let poller_frequency = Duration::from_secs(60);
let mut runner = TestRunner::new();
let mut store = mocks::MockStore::new();
let stream_all = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream_all);
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let appended =
runner.append_events_only(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
yield_now().await;
notify_events(runner.aggregate_id, &appended, None, runner.pg_tx.clone());
yield_now().await;
for expected in appended {
let received = runner
.event_buffer_rx
.recv()
.await
.expect("should have received message")
.expect("should be persisted event");
assert_eq!(expected.into_data(), received.into_data());
}
task.abort();
}
#[tokio::test]
async fn given_network_reconnection_it_rewinds_to_get_missing_events() {
let mut runner = TestRunner::new();
let first_events =
runner.append_events_only(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let first_events_offset = first_events.last().unwrap().id();
let missed_events = runner.make_events(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let missed_events_offset = missed_events.last().map(|x| x.id());
let event_after_reconnection = runner.make_event(MockEvent::SomeEvent1);
let mut store = mocks::MockStore::new();
let resume_stream = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| resume_stream);
store
.expect_events_after()
.times(1)
.with(predicate::eq(first_events_offset))
.returning(|_| stream::iter(vec![]).boxed());
store
.expect_events_range()
.times(1)
.with(
predicate::eq(first_events_offset),
predicate::eq(event_after_reconnection.id()),
)
.returning(|_, _| stream::iter(vec![]).boxed());
let task = tokio::spawn(coordinate_event_channel(
Duration::from_secs(60),
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
yield_now().await;
while runner.event_buffer_rx.try_recv().is_ok() {}
runner
.pg_tx
.send(Err(Error::ListenerDisconnected))
.await
.unwrap();
sleep(Duration::from_millis(3005)).await;
notify_events(
runner.aggregate_id,
&[event_after_reconnection],
missed_events_offset,
runner.pg_tx.clone(),
);
yield_now().await;
task.abort();
}
#[tokio::test]
async fn it_is_no_op_given_a_notification_for_events_which_were_processed_already() {
let poller_frequency = Duration::from_secs(60);
let mut runner = TestRunner::new();
let prior_events =
runner.append_events_only(vec![MockEvent::SomeEvent1, MockEvent::SomeEvent2]);
let mut store = mocks::MockStore::new();
let stream_all = runner.stream_all();
store
.expect_events_after_offset()
.times(1)
.return_once(|| stream_all);
store.expect_events_after().times(0);
store.expect_events_range().times(0);
let task = tokio::spawn(coordinate_event_channel(
poller_frequency,
runner.pg_rx,
store,
runner.event_buffer_tx.clone(),
));
yield_now().await;
notify_events(
runner.aggregate_id,
&[prior_events.last().unwrap().clone()],
Some(0),
runner.pg_tx.clone(),
);
yield_now().await;
task.abort();
}
}