use std::num::NonZeroUsize;
use std::sync::Arc;
use aion_core::{Event, TimerId, WorkflowFilter, WorkflowId, WorkflowSummary};
use aion_store::{
EventStore, ReadableEventStore, RunSummary, StoreError, TimerEntry, WritableEventStore,
WriteToken,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tokio::sync::broadcast;
use super::publisher::BroadcastEventPublisher;
const MAX_BROADCAST_CAPACITY: usize = usize::MAX / 2;
#[derive(thiserror::Error, Clone, Copy, Debug, PartialEq, Eq)]
pub enum PublishError {
#[error(
"event streaming capacity {capacity} exceeds the broadcast channel maximum {MAX_BROADCAST_CAPACITY}"
)]
CapacityTooLarge {
capacity: usize,
},
}
pub struct PublishingEventStore {
inner: Arc<dyn EventStore>,
events: broadcast::Sender<Event>,
}
impl PublishingEventStore {
pub fn new(inner: Arc<dyn EventStore>, capacity: NonZeroUsize) -> Result<Self, PublishError> {
if capacity.get() > MAX_BROADCAST_CAPACITY {
return Err(PublishError::CapacityTooLarge {
capacity: capacity.get(),
});
}
let (events, initial_receiver) = broadcast::channel(capacity.get());
drop(initial_receiver);
Ok(Self { inner, events })
}
#[must_use]
pub fn publisher(&self) -> BroadcastEventPublisher {
BroadcastEventPublisher::new(self.events.clone())
}
}
#[async_trait]
impl WritableEventStore for PublishingEventStore {
async fn append(
&self,
token: WriteToken,
workflow_id: &WorkflowId,
events: &[Event],
expected_seq: u64,
) -> Result<(), StoreError> {
self.inner
.append(token, workflow_id, events, expected_seq)
.await?;
for event in events {
if self.events.receiver_count() == 0 {
continue;
}
let delivery = self.events.send(event.clone());
drop(delivery);
}
Ok(())
}
}
#[async_trait]
impl ReadableEventStore for PublishingEventStore {
async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
self.inner.read_history(workflow_id).await
}
async fn read_history_from(
&self,
workflow_id: &WorkflowId,
from_seq: u64,
) -> Result<Vec<Event>, StoreError> {
self.inner.read_history_from(workflow_id, from_seq).await
}
async fn read_run_chain(
&self,
workflow_id: &WorkflowId,
) -> Result<Vec<RunSummary>, StoreError> {
self.inner.read_run_chain(workflow_id).await
}
async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
self.inner.list_workflow_ids().await
}
async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
self.inner.list_active().await
}
async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
self.inner.query(filter).await
}
async fn schedule_timer(
&self,
workflow_id: &WorkflowId,
timer_id: &TimerId,
fire_at: DateTime<Utc>,
) -> Result<(), StoreError> {
self.inner
.schedule_timer(workflow_id, timer_id, fire_at)
.await
}
async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
self.inner.expired_timers(as_of).await
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use aion_core::{Event, EventEnvelope, Payload, WorkflowId};
use aion_store::{InMemoryStore, StoreError, WriteToken};
use futures::StreamExt;
use serde_json::json;
use crate::engine::delegated::EventFilter;
use crate::engine::delegated::EventPublisher;
use super::*;
fn capacity(value: usize) -> Result<NonZeroUsize, Box<dyn std::error::Error>> {
NonZeroUsize::new(value).ok_or_else(|| "capacity must be non-zero".into())
}
fn payload(label: &str) -> Result<Payload, aion_core::PayloadError> {
Payload::from_json(&json!({ "label": label }))
}
fn envelope(seq: u64, workflow_id: &WorkflowId) -> EventEnvelope {
EventEnvelope {
seq,
recorded_at: chrono::Utc::now(),
workflow_id: workflow_id.clone(),
}
}
fn started(seq: u64, workflow_id: &WorkflowId) -> Result<Event, aion_core::PayloadError> {
Ok(Event::WorkflowStarted {
envelope: envelope(seq, workflow_id),
workflow_type: "checkout".to_owned(),
input: payload("input")?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
})
}
fn signal(seq: u64, workflow_id: &WorkflowId) -> Result<Event, aion_core::PayloadError> {
Ok(Event::SignalReceived {
envelope: envelope(seq, workflow_id),
name: "approved".to_owned(),
payload: payload("signal")?,
})
}
fn publishing_store(cap: usize) -> Result<PublishingEventStore, Box<dyn std::error::Error>> {
let inner: Arc<dyn aion_store::EventStore> = Arc::new(InMemoryStore::default());
Ok(PublishingEventStore::new(inner, capacity(cap)?)?)
}
async fn next_item(
stream: &mut futures::stream::BoxStream<
'static,
Result<Event, crate::engine::delegated::EventStreamLagged>,
>,
) -> Result<
Result<Event, crate::engine::delegated::EventStreamLagged>,
Box<dyn std::error::Error>,
> {
tokio::time::timeout(Duration::from_secs(2), stream.next())
.await?
.ok_or_else(|| "subscription stream ended unexpectedly".into())
}
#[tokio::test]
async fn append_publishes_committed_events_in_seq_order()
-> Result<(), Box<dyn std::error::Error>> {
let store = publishing_store(8)?;
let workflow_id = WorkflowId::new_v4();
let mut subscription = store.publisher().subscribe(EventFilter::default());
store
.append(
WriteToken::recorder(),
&workflow_id,
&[started(1, &workflow_id)?, signal(2, &workflow_id)?],
0,
)
.await?;
store
.append(
WriteToken::recorder(),
&workflow_id,
&[signal(3, &workflow_id)?],
2,
)
.await?;
for expected_seq in 1..=3 {
let event = next_item(&mut subscription).await??;
assert_eq!(event.seq(), expected_seq);
}
Ok(())
}
#[tokio::test]
async fn failed_append_publishes_nothing() -> Result<(), Box<dyn std::error::Error>> {
let store = publishing_store(8)?;
let workflow_id = WorkflowId::new_v4();
let mut subscription = store.publisher().subscribe(EventFilter::default());
let conflict = store
.append(
WriteToken::recorder(),
&workflow_id,
&[started(6, &workflow_id)?],
5,
)
.await;
assert!(matches!(conflict, Err(StoreError::SequenceConflict { .. })));
store
.append(
WriteToken::recorder(),
&workflow_id,
&[started(1, &workflow_id)?],
0,
)
.await?;
let event = next_item(&mut subscription).await??;
assert_eq!(event.seq(), 1);
Ok(())
}
#[tokio::test]
async fn reads_delegate_to_inner_store() -> Result<(), Box<dyn std::error::Error>> {
let inner = Arc::new(InMemoryStore::default());
let store = PublishingEventStore::new(
Arc::clone(&inner) as Arc<dyn aion_store::EventStore>,
capacity(8)?,
)?;
let workflow_id = WorkflowId::new_v4();
store
.append(
WriteToken::recorder(),
&workflow_id,
&[started(1, &workflow_id)?],
0,
)
.await?;
let wrapped_history = store.read_history(&workflow_id).await?;
let inner_history = inner.read_history(&workflow_id).await?;
assert_eq!(wrapped_history, inner_history);
assert_eq!(wrapped_history.len(), 1);
assert_eq!(store.list_active().await?, vec![workflow_id]);
Ok(())
}
#[tokio::test]
async fn capacity_above_broadcast_maximum_is_rejected() -> Result<(), Box<dyn std::error::Error>>
{
let inner: Arc<dyn aion_store::EventStore> = Arc::new(InMemoryStore::default());
let error = PublishingEventStore::new(inner, capacity(usize::MAX)?).err();
assert_eq!(
error,
Some(PublishError::CapacityTooLarge {
capacity: usize::MAX
})
);
Ok(())
}
}