use async_trait::async_trait;
use super::context::EffectContext;
use crate::Workflow;
use tracing::warn;
/// Handler whose `EffectHandler` implementation only logs a warning and
/// yields no input, generic over the workflow type `W`.
///
/// The `PhantomData` field ties the handler to `W` at the type level; no
/// value of `W` is ever stored.
pub(crate) struct NoopHandler<W>(std::marker::PhantomData<W>);

// Implemented by hand rather than derived: `#[derive(Default)]` would add a
// `W: Default` bound that building the marker does not need.
impl<W> Default for NoopHandler<W> {
    /// Creates a handler; it carries no runtime state.
    fn default() -> Self {
        NoopHandler(std::marker::PhantomData::<W>)
    }
}
/// Error type named by `NoopHandler`'s `EffectHandler` implementation.
///
/// It is a unit struct carrying no data; its `Display` output is a fixed
/// message.
#[derive(Debug)]
pub struct NoopHandlerError;

impl std::fmt::Display for NoopHandlerError {
    /// Writes the fixed message verbatim; width and fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "noop handler does not fail")
    }
}

// Marker impl only: relies on the provided defaults, so `source()` is `None`.
impl std::error::Error for NoopHandlerError {}
#[async_trait]
/// Asynchronous handler for values of a workflow's `Effect` type.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait EffectHandler: Send + Sync + 'static {
/// Workflow whose `Effect` values this handler accepts and whose `Input`
/// values it may return.
type Workflow: Workflow;
/// Failure type returned by `handle`; it must implement `Display` and be
/// `Send + 'static`.
type Error: std::fmt::Display + Send + 'static;
/// Handles a single `effect`, with `ctx` passed alongside it.
///
/// Returns `Ok(Some(input))` or `Ok(None)` on success and `Err` on failure.
/// NOTE(review): presumably a returned input is routed back into the
/// workflow by the caller — confirm against the dispatching code.
async fn handle(
&self,
effect: &<Self::Workflow as Workflow>::Effect,
ctx: &EffectContext,
) -> Result<Option<<Self::Workflow as Workflow>::Input>, Self::Error>;
}
// `EffectHandler` for workflows that are not expected to emit effects: every
// call is logged as a warning and otherwise ignored.
#[async_trait]
impl<W> EffectHandler for NoopHandler<W>
where
    W: Workflow + Send + Sync + 'static,
    W::Input: Send,
{
    type Workflow = W;
    type Error = NoopHandlerError;

    /// Logs a warning tagged with the workflow type and returns `Ok(None)`.
    ///
    /// Neither the effect nor the context is inspected, and this never
    /// returns `Err`.
    async fn handle(
        &self,
        _effect: &W::Effect,
        _ctx: &EffectContext,
    ) -> Result<Option<W::Input>, NoopHandlerError> {
        // Bound to a local so the field shorthand records it under the same
        // `workflow_type` name.
        let workflow_type = <W as Workflow>::TYPE;
        warn!(
            workflow_type,
            "NoopHandler received an effect; workflow should not emit effects"
        );
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// `io::Write` sink that appends everything into a shared byte buffer so
    /// the test can inspect what the tracing subscriber emitted.
    #[derive(Default)]
    struct BufferWriter {
        buffer: Arc<Mutex<Vec<u8>>>,
    }

    impl std::io::Write for BufferWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.buffer.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            // Writes land in memory immediately; there is nothing to flush.
            Ok(())
        }
    }

    /// Minimal workflow used only to instantiate `NoopHandler`.
    struct NoopWorkflow;

    #[derive(Default)]
    struct NoopState;

    #[derive(serde::Serialize, serde::Deserialize)]
    struct NoopInput {
        id: String,
    }

    #[derive(serde::Serialize, serde::Deserialize, Clone)]
    struct NoopEvent;

    #[derive(serde::Serialize)]
    struct NoopEffect;

    impl crate::HasWorkflowId for NoopInput {
        fn workflow_id(&self) -> crate::WorkflowId {
            crate::WorkflowId::new(&self.id)
        }
    }

    impl Workflow for NoopWorkflow {
        type State = NoopState;
        type Input = NoopInput;
        type Event = NoopEvent;
        type Effect = NoopEffect;
        type Rejection = std::borrow::Cow<'static, str>;

        const TYPE: &'static str = "noop";

        fn evolve(state: Self::State, _event: Self::Event) -> Self::State {
            state
        }

        fn decide(
            _now: time::OffsetDateTime,
            _state: &Self::State,
            _input: &Self::Input,
        ) -> crate::Decision<Self::Event, Self::Effect, Self::Input, Self::Rejection> {
            crate::Decision::accept(NoopEvent)
        }
    }

    /// `handle` must succeed with no input and emit the warning.
    #[tokio::test]
    async fn noop_handler_logs_warning_on_handle() {
        let buffer = Arc::new(Mutex::new(Vec::new()));
        let writer_buffer = Arc::clone(&buffer);
        let subscriber = tracing_subscriber::fmt()
            .with_writer(move || BufferWriter {
                buffer: Arc::clone(&writer_buffer),
            })
            .with_ansi(false)
            .finish();
        // Scoped to this test: the guard restores the previous default
        // subscriber when dropped.
        let _guard = tracing::subscriber::set_default(subscriber);

        let handler = NoopHandler::<NoopWorkflow>::default();
        let ctx = EffectContext::new(
            uuid::Uuid::nil(),
            crate::WorkflowRef::new("noop", "noop-1"),
            1,
            time::OffsetDateTime::UNIX_EPOCH,
        );
        let effect = NoopEffect;

        // Previously the result was discarded, so an `Err` or `Some(..)`
        // would have gone unnoticed.
        let outcome = handler.handle(&effect, &ctx).await;
        assert!(
            matches!(outcome, Ok(None)),
            "NoopHandler must succeed without producing an input"
        );

        let locked = buffer.lock().unwrap();
        let output = String::from_utf8_lossy(&locked);
        assert!(
            output.contains("NoopHandler received an effect"),
            "expected the noop warning in the log output, got: {}",
            output
        );
        assert!(
            output.contains("WARN"),
            "expected the event to be logged at WARN level, got: {}",
            output
        );
    }
}