apalis_core/backend/
sink.rs

1use crate::{
2    backend::{Backend, codec::Codec},
3    error::BoxDynError,
4    task::Task,
5};
6use futures_core::Stream;
7use futures_sink::Sink;
8use futures_util::SinkExt;
9use futures_util::StreamExt;
10use futures_util::stream;
11
12/// Error type for TaskSink operations
13#[derive(Debug, thiserror::Error)]
14pub enum TaskSinkError<PushError> {
15    /// Error occurred while pushing the task
16    #[error("Failed to push task: {0}")]
17    PushError(#[from] PushError),
18    /// Error occurred during encoding/decoding of the task
19    #[error("Failed to encode/decode task: {0}")]
20    CodecError(BoxDynError),
21}
22
23/// Extends Backend to allow pushing tasks into the backend
24pub trait TaskSink<Args>: Backend {
25    /// Allows pushing a single task into the backend
26    fn push(
27        &mut self,
28        task: Args,
29    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
30
31    /// Allows pushing multiple tasks into the backend in bulk
32    fn push_bulk(
33        &mut self,
34        tasks: Vec<Args>,
35    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
36
37    /// Allows pushing tasks from a stream into the backend
38    fn push_stream(
39        &mut self,
40        tasks: impl Stream<Item = Args> + Unpin + Send,
41    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
42
43    /// Allows pushing a fully constructed task into the backend
44    fn push_task(
45        &mut self,
46        task: Task<Args, Self::Context, Self::IdType>,
47    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
48
49    /// Allows pushing a fully constructed task into the backend
50    fn push_all(
51        &mut self,
52        tasks: impl Stream<Item = Task<Args, Self::Context, Self::IdType>> + Unpin + Send,
53    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
54}
55
56impl<Args, S, E, C> TaskSink<Args> for S
57where
58    S: Sink<Task<C::Compact, Self::Context, Self::IdType>, Error = E>
59        + Unpin
60        + Backend<Args = Args, Error = E, Codec = C>
61        + Send,
62    Args: Send,
63    C::Compact: Send,
64    S::Context: Send + Default,
65    S::IdType: Send + 'static,
66    C: Codec<Args>,
67    E: Send,
68    C::Error: std::error::Error + Send + Sync + 'static,
69{
70    async fn push(&mut self, task: Args) -> Result<(), TaskSinkError<Self::Error>> {
71        use futures_util::SinkExt;
72        let encoded = C::encode(&task).map_err(|e| TaskSinkError::CodecError(e.into()))?;
73        self.send(Task::new(encoded)).await?;
74        Ok(())
75    }
76
77    async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), TaskSinkError<Self::Error>> {
78        use futures_util::SinkExt;
79        let tasks = tasks
80            .into_iter()
81            .map(Task::new)
82            .map(|task| {
83                task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
84            })
85            .collect::<Result<Vec<_>, _>>()?;
86        self.send_all(&mut stream::iter(tasks.into_iter().map(Ok)))
87            .await?;
88        Ok(())
89    }
90
91    async fn push_stream(
92        &mut self,
93        tasks: impl Stream<Item = Args> + Unpin + Send,
94    ) -> Result<(), TaskSinkError<Self::Error>> {
95        Ok(self
96            .sink_map_err(|e| TaskSinkError::PushError(e))
97            .send_all(&mut tasks.map(Task::new).map(|task| {
98                task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
99            }))
100            .await?)
101    }
102
103    async fn push_task(
104        &mut self,
105        task: Task<Args, Self::Context, Self::IdType>,
106    ) -> Result<(), TaskSinkError<Self::Error>> {
107        use futures_util::SinkExt;
108        self.sink_map_err(|e| TaskSinkError::PushError(e))
109            .send(task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))?)
110            .await
111    }
112
113    async fn push_all(
114        &mut self,
115        tasks: impl Stream<Item = Task<Args, Self::Context, Self::IdType>> + Unpin + Send,
116    ) -> Result<(), TaskSinkError<Self::Error>> {
117        use futures_util::SinkExt;
118        self.sink_map_err(|e| TaskSinkError::PushError(e))
119            .send_all(&mut tasks.map(|task| {
120                task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
121            }))
122            .await
123    }
124}
125
126/// Extends Backend to allow pushing tasks into the backend without requiring compile time constraints.
127/// By default the TaskSink trait requires `TaskSink<Args>: Backend<Args>` which can be restrictive in certain scenarios.
128/// This means you cannot push tasks of different argument types using the same backend instance.
129/// The WeakTaskSink trait relaxes this constraint, allowing pushing tasks of any argument type as long as they match the backend's context and ID type.
130/// This is useful for dynamic task management scenarios where tasks of varying types need to be pushed into the same backend instance.
131pub trait WeakTaskSink<Args>: Backend {
132    /// Allows pushing a single task into the backend
133    fn push(
134        &mut self,
135        task: Args,
136    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
137
138    /// Allows pushing multiple tasks into the backend in bulk
139    fn push_bulk(
140        &mut self,
141        tasks: Vec<Args>,
142    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
143
144    /// Allows pushing tasks from a stream into the backend
145    fn push_stream(
146        &mut self,
147        tasks: impl Stream<Item = Args> + Unpin + Send,
148    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
149
150    /// Allows pushing a fully constructed task into the backend
151    fn push_task(
152        &mut self,
153        task: Task<Args, Self::Context, Self::IdType>,
154    ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
155}
156
157impl<Args, S, E, C> WeakTaskSink<Args> for S
158where
159    S: Sink<Task<C::Compact, Self::Context, Self::IdType>, Error = E>
160        + Unpin
161        + Backend<Error = E, Codec = C> // Note: No Args constraint here
162        + Send,
163    Args: Send,
164    C::Compact: Send,
165    S::Context: Send + Default,
166    S::IdType: Send + 'static,
167    C: Codec<Args>,
168    E: Send,
169    C::Error: std::error::Error + Send + Sync + 'static,
170{
171    async fn push(&mut self, task: Args) -> Result<(), TaskSinkError<Self::Error>> {
172        use futures_util::SinkExt;
173        let encoded = C::encode(&task).map_err(|e| TaskSinkError::CodecError(e.into()))?;
174        self.sink_map_err(|e| TaskSinkError::PushError(e))
175            .send(Task::new(encoded))
176            .await
177    }
178
179    async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), TaskSinkError<Self::Error>> {
180        use futures_util::SinkExt;
181        self.sink_map_err(|e| TaskSinkError::PushError(e))
182            .send_all(&mut stream::iter(
183                tasks
184                    .into_iter()
185                    .map(Task::new)
186                    .map(|task| {
187                        task.try_map(|t| {
188                            C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into()))
189                        })
190                    })
191                    .collect::<Vec<_>>(),
192            ))
193            .await
194    }
195
196    async fn push_stream(
197        &mut self,
198        tasks: impl Stream<Item = Args> + Unpin + Send,
199    ) -> Result<(), TaskSinkError<Self::Error>> {
200        use futures_util::SinkExt;
201        use futures_util::StreamExt;
202        self.sink_map_err(|e| TaskSinkError::PushError(e))
203            .send_all(&mut tasks.map(Task::new).map(|task| {
204                task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
205            }))
206            .await
207    }
208
209    async fn push_task(
210        &mut self,
211        task: Task<Args, Self::Context, Self::IdType>,
212    ) -> Result<(), TaskSinkError<Self::Error>> {
213        use futures_util::SinkExt;
214        self.sink_map_err(|e| TaskSinkError::PushError(e))
215            .send(task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))?)
216            .await
217    }
218}