apalis_workflow/
sink.rs

1use apalis_core::{
2    backend::{BackendExt, TaskSinkError, codec::Codec},
3    error::BoxDynError,
4    task::{Task, builder::TaskBuilder, metadata::MetadataExt, task_id::TaskId},
5};
6use futures::Sink;
7
8use crate::{context::WorkflowContext, id_generator::GenerateId};
9
10/// Extension trait for pushing tasks into a workflow
11pub trait WorkflowSink<Args>: BackendExt
12where
13    Self::Codec: Codec<Args, Compact = Self::Compact>,
14{
15    /// Push a step into the workflow sink at the start
16    fn push_start(
17        &mut self,
18        step: Args,
19    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send {
20        self.push_step(step, 0)
21    }
22
23    /// Push a step into the workflow sink at the specified index
24    fn push_step(
25        &mut self,
26        step: Args,
27        index: usize,
28    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
29}
30
31impl<S: Send, Args: Send, Compact, Err> WorkflowSink<Args> for S
32where
33    S: Sink<Task<Compact, S::Context, S::IdType>, Error = Err>
34        + BackendExt<Error = Err, Compact = Compact>
35        + Unpin,
36    S::IdType: GenerateId + Send,
37    S::Codec: Codec<Args, Compact = Compact>,
38    S::Context: MetadataExt<WorkflowContext> + Send,
39    Err: std::error::Error + Send + Sync + 'static,
40    <S::Codec as Codec<Args>>::Error: Into<BoxDynError> + Send + Sync + 'static,
41    <S::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError> + Send + Sync + 'static,
42    Compact: Send + 'static,
43{
44    async fn push_step(
45        &mut self,
46        step: Args,
47        index: usize,
48    ) -> Result<(), TaskSinkError<Self::Error>> {
49        use futures::SinkExt;
50        let task_id = TaskId::new(S::IdType::generate());
51        let compact = S::Codec::encode(&step).map_err(|e| TaskSinkError::CodecError(e.into()))?;
52        let task = TaskBuilder::new(compact)
53            .meta(WorkflowContext { step_index: index })
54            .with_task_id(task_id.clone())
55            .build();
56        self.send(task)
57            .await
58            .map_err(|e| TaskSinkError::PushError(e))
59    }
60}