1use apalis_core::{
2 backend::{BackendExt, TaskSinkError, codec::Codec},
3 error::BoxDynError,
4 task::{Task, builder::TaskBuilder, metadata::MetadataExt, task_id::TaskId},
5};
6use futures::Sink;
7
8use crate::{context::WorkflowContext, id_generator::GenerateId};
9
10pub trait WorkflowSink<Args>: BackendExt
12where
13 Self::Codec: Codec<Args, Compact = Self::Compact>,
14{
15 fn push_start(
17 &mut self,
18 step: Args,
19 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send {
20 self.push_step(step, 0)
21 }
22
23 fn push_step(
25 &mut self,
26 step: Args,
27 index: usize,
28 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
29}
30
31impl<S: Send, Args: Send, Compact, Err> WorkflowSink<Args> for S
32where
33 S: Sink<Task<Compact, S::Context, S::IdType>, Error = Err>
34 + BackendExt<Error = Err, Compact = Compact>
35 + Unpin,
36 S::IdType: GenerateId + Send,
37 S::Codec: Codec<Args, Compact = Compact>,
38 S::Context: MetadataExt<WorkflowContext> + Send,
39 Err: std::error::Error + Send + Sync + 'static,
40 <S::Codec as Codec<Args>>::Error: Into<BoxDynError> + Send + Sync + 'static,
41 <S::Context as MetadataExt<WorkflowContext>>::Error: Into<BoxDynError> + Send + Sync + 'static,
42 Compact: Send + 'static,
43{
44 async fn push_step(
45 &mut self,
46 step: Args,
47 index: usize,
48 ) -> Result<(), TaskSinkError<Self::Error>> {
49 use futures::SinkExt;
50 let task_id = TaskId::new(S::IdType::generate());
51 let compact = S::Codec::encode(&step).map_err(|e| TaskSinkError::CodecError(e.into()))?;
52 let task = TaskBuilder::new(compact)
53 .meta(WorkflowContext { step_index: index })
54 .with_task_id(task_id.clone())
55 .build();
56 self.send(task)
57 .await
58 .map_err(|e| TaskSinkError::PushError(e))
59 }
60}