mod outbox;
#[cfg(feature = "postgres")]
mod postgres;
use std::future::Future;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::OffsetDateTime;
pub use outbox::{DeadLetter, DeadLetterQuery, OutboxEffect, OutboxStore};
#[cfg(feature = "postgres")]
pub use postgres::PgStore;
use crate::error::Result;
use crate::workflow::WorkflowId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObservationOutcome {
Accepted,
Rejected(Value),
AlreadyCompleted,
}
impl ObservationOutcome {
pub fn as_str(&self) -> &'static str {
match self {
ObservationOutcome::Accepted => "accepted",
ObservationOutcome::Rejected(_) => "rejected",
ObservationOutcome::AlreadyCompleted => "already_completed",
}
}
pub fn rejection_payload(&self) -> Option<&Value> {
match self {
ObservationOutcome::Rejected(value) => Some(value),
_ => None,
}
}
}
#[derive(Debug, Clone)]
pub struct InputObservation {
pub workflow_type: String,
pub workflow_id: WorkflowId,
pub input_type: String,
pub payload: Value,
pub outcome: ObservationOutcome,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
pub global_sequence: i64,
pub workflow_type: String,
pub workflow_id: WorkflowId,
pub sequence: i64,
pub payload: Value,
pub created_at: OffsetDateTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowInstanceSummary {
pub workflow_type: String,
pub workflow_id: WorkflowId,
pub created_at: OffsetDateTime,
pub event_count: i64,
pub last_event_at: Option<OffsetDateTime>,
pub completed_at: Option<OffsetDateTime>,
}
pub enum BeginResult<U> {
Active {
events: Vec<Value>,
uow: U,
},
Completed,
}
pub trait Store: Send + Sync + Clone + 'static {
type UnitOfWork<'a>: UnitOfWork + Send
where
Self: 'a;
fn begin<'a>(
&'a self,
workflow_type: &'static str,
workflow_id: &WorkflowId,
unique_key: Option<&str>,
) -> impl Future<Output = Result<BeginResult<Self::UnitOfWork<'a>>>> + Send;
fn record_observation(
&self,
observation: InputObservation,
) -> impl Future<Output = Result<()>> + Send;
}
pub trait UnitOfWork: Send {
fn append_events<E, I>(&mut self, events: I) -> impl Future<Output = Result<()>> + Send
where
E: Serialize + Send,
I: IntoIterator<Item = E> + Send;
fn enqueue_effects<F, I>(&mut self, effects: I) -> impl Future<Output = Result<()>> + Send
where
F: Serialize + Send,
I: IntoIterator<Item = F> + Send;
fn schedule_timers<T>(&mut self, timers: T) -> impl Future<Output = Result<()>> + Send
where
T: IntoIterator<Item = crate::Timer<serde_json::Value>> + Send;
fn cancel_timers(&mut self, keys: Vec<String>) -> impl Future<Output = Result<()>> + Send;
fn record_input_observation(
&mut self,
observation: InputObservation,
) -> impl Future<Output = Result<()>> + Send;
fn mark_completed(&mut self);
fn commit(self) -> impl Future<Output = Result<()>> + Send;
}
pub trait EventStore: Send + Sync + Clone + 'static {
fn fetch_events_since(
&self,
after: i64,
limit: u32,
) -> impl Future<Output = Result<Vec<StoredEvent>>> + Send;
}
pub trait ProjectionStore: Send + Sync + Clone + 'static {
fn load_projection_position(
&self,
projection_name: &str,
) -> impl Future<Output = Result<i64>> + Send;
fn store_projection_position(
&self,
projection_name: &str,
global_sequence: i64,
) -> impl Future<Output = Result<()>> + Send;
}
#[async_trait]
pub trait WorkflowQueryStore: Send + Sync + Clone + 'static {
async fn list_workflows(
&self,
workflow_type: Option<&str>,
limit: u32,
offset: u32,
) -> Result<Vec<WorkflowInstanceSummary>>;
async fn fetch_workflow_events(
&self,
workflow_type: &str,
workflow_id: &WorkflowId,
) -> Result<Vec<StoredEvent>>;
}