use std::sync::Arc;
use serde_json::Value;
use crate::Workflow;
use crate::error::{Error, Result};
use crate::runtime::registry::{DynamicEntry, WorkflowRegistry};
use crate::store::{Store, StoredEvent, WorkflowInstanceSummary, WorkflowQueryStore};
use crate::workflow::WorkflowId;
#[derive(Debug, Clone, Default)]
pub struct WorkflowServiceConfig {
pub record_input_observations: bool,
}
#[derive(Debug, Clone)]
pub enum ExecuteOutcome<R> {
Accepted {
events_appended: usize,
},
Rejected(R),
AlreadyCompleted,
}
impl<R> ExecuteOutcome<R> {
pub fn map<R2, F>(self, f: F) -> ExecuteOutcome<R2>
where
F: FnOnce(R) -> R2,
{
match self {
ExecuteOutcome::Accepted { events_appended } => {
ExecuteOutcome::Accepted { events_appended }
}
ExecuteOutcome::Rejected(r) => ExecuteOutcome::Rejected(f(r)),
ExecuteOutcome::AlreadyCompleted => ExecuteOutcome::AlreadyCompleted,
}
}
pub fn try_map<R2, E, F>(self, f: F) -> std::result::Result<ExecuteOutcome<R2>, E>
where
F: FnOnce(R) -> std::result::Result<R2, E>,
{
Ok(match self {
ExecuteOutcome::Accepted { events_appended } => {
ExecuteOutcome::Accepted { events_appended }
}
ExecuteOutcome::Rejected(r) => ExecuteOutcome::Rejected(f(r)?),
ExecuteOutcome::AlreadyCompleted => ExecuteOutcome::AlreadyCompleted,
})
}
}
#[derive(Clone)]
pub struct WorkflowService<S>
where
S: Store,
{
registry: Arc<WorkflowRegistry>,
store: S,
config: WorkflowServiceConfig,
}
impl<S> WorkflowService<S>
where
S: Store,
{
pub(crate) fn new(
store: S,
registry: Arc<WorkflowRegistry>,
config: WorkflowServiceConfig,
) -> Self {
Self {
registry,
store,
config,
}
}
pub async fn execute<W>(&self, input: &W::Input) -> Result<ExecuteOutcome<W::Rejection>>
where
W: Workflow + Send + Sync + 'static,
W::State: Send,
W::Input: Send + Sync,
W::Rejection: Send,
{
let Some(entry) = self.registry.get_typed::<W>() else {
return Err(Error::UnknownWorkflowType(W::TYPE.to_string()));
};
entry.execute(input).await
}
pub async fn execute_dynamic(
&self,
workflow_type: &str,
payload: &Value,
) -> Result<ExecuteOutcome<Value>> {
let Some((_workflow_type, entry)) = self.registry.get(workflow_type) else {
return Err(Error::UnknownWorkflowType(workflow_type.to_string()));
};
entry.execute_dynamic(payload.clone()).await
}
pub(crate) fn workflow_count(&self) -> usize {
self.registry.len()
}
pub(crate) fn registered_types(&self) -> Vec<String> {
self.registry.registered_types()
}
pub(crate) fn get_entry(
&self,
workflow_type: &str,
) -> Option<(&'static str, &dyn DynamicEntry)> {
self.registry.get(workflow_type)
}
pub fn config(&self) -> &WorkflowServiceConfig {
&self.config
}
}
impl<S> WorkflowService<S>
where
S: Store + WorkflowQueryStore,
{
pub async fn list_workflows(
&self,
workflow_type: Option<&str>,
limit: u32,
offset: u32,
) -> Result<Vec<WorkflowInstanceSummary>> {
self.store
.list_workflows(workflow_type, limit, offset)
.await
}
pub async fn fetch_workflow_events(
&self,
workflow_type: &str,
workflow_id: &WorkflowId,
) -> Result<Vec<StoredEvent>> {
self.store
.fetch_workflow_events(workflow_type, workflow_id)
.await
}
pub async fn fetch_latest_state<W>(&self, workflow_id: &WorkflowId) -> Result<W::State>
where
W: Workflow + Send + Sync + 'static,
W::State: Send,
{
let Some(entry) = self.registry.get_typed::<W>() else {
return Err(Error::UnknownWorkflowType(W::TYPE.to_string()));
};
entry.replay_latest_state(workflow_id).await
}
pub async fn fetch_latest_state_dynamic(
&self,
workflow_type: &str,
workflow_id: &WorkflowId,
) -> Result<Value> {
let Some((_workflow_type, entry)) = self.registry.get(workflow_type) else {
return Err(Error::UnknownWorkflowType(workflow_type.to_string()));
};
entry.replay_latest_state_dynamic(workflow_id).await
}
}