1use crate::{
2 backend::{Backend, codec::Codec},
3 error::BoxDynError,
4 task::Task,
5};
6use futures_core::Stream;
7use futures_sink::Sink;
8use futures_util::SinkExt;
9use futures_util::StreamExt;
10use futures_util::stream;
11
12#[derive(Debug, thiserror::Error)]
14pub enum TaskSinkError<PushError> {
15 #[error("Failed to push task: {0}")]
17 PushError(#[from] PushError),
18 #[error("Failed to encode/decode task: {0}")]
20 CodecError(BoxDynError),
21}
22
23pub trait TaskSink<Args>: Backend {
25 fn push(
27 &mut self,
28 task: Args,
29 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
30
31 fn push_bulk(
33 &mut self,
34 tasks: Vec<Args>,
35 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
36
37 fn push_stream(
39 &mut self,
40 tasks: impl Stream<Item = Args> + Unpin + Send,
41 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
42
43 fn push_task(
45 &mut self,
46 task: Task<Args, Self::Context, Self::IdType>,
47 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
48
49 fn push_all(
51 &mut self,
52 tasks: impl Stream<Item = Task<Args, Self::Context, Self::IdType>> + Unpin + Send,
53 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
54}
55
56impl<Args, S, E, C> TaskSink<Args> for S
57where
58 S: Sink<Task<C::Compact, Self::Context, Self::IdType>, Error = E>
59 + Unpin
60 + Backend<Args = Args, Error = E, Codec = C>
61 + Send,
62 Args: Send,
63 C::Compact: Send,
64 S::Context: Send + Default,
65 S::IdType: Send + 'static,
66 C: Codec<Args>,
67 E: Send,
68 C::Error: std::error::Error + Send + Sync + 'static,
69{
70 async fn push(&mut self, task: Args) -> Result<(), TaskSinkError<Self::Error>> {
71 use futures_util::SinkExt;
72 let encoded = C::encode(&task).map_err(|e| TaskSinkError::CodecError(e.into()))?;
73 self.send(Task::new(encoded)).await?;
74 Ok(())
75 }
76
77 async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), TaskSinkError<Self::Error>> {
78 use futures_util::SinkExt;
79 let tasks = tasks
80 .into_iter()
81 .map(Task::new)
82 .map(|task| {
83 task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
84 })
85 .collect::<Result<Vec<_>, _>>()?;
86 self.send_all(&mut stream::iter(tasks.into_iter().map(Ok)))
87 .await?;
88 Ok(())
89 }
90
91 async fn push_stream(
92 &mut self,
93 tasks: impl Stream<Item = Args> + Unpin + Send,
94 ) -> Result<(), TaskSinkError<Self::Error>> {
95 Ok(self
96 .sink_map_err(|e| TaskSinkError::PushError(e))
97 .send_all(&mut tasks.map(Task::new).map(|task| {
98 task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
99 }))
100 .await?)
101 }
102
103 async fn push_task(
104 &mut self,
105 task: Task<Args, Self::Context, Self::IdType>,
106 ) -> Result<(), TaskSinkError<Self::Error>> {
107 use futures_util::SinkExt;
108 self.sink_map_err(|e| TaskSinkError::PushError(e))
109 .send(task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))?)
110 .await
111 }
112
113 async fn push_all(
114 &mut self,
115 tasks: impl Stream<Item = Task<Args, Self::Context, Self::IdType>> + Unpin + Send,
116 ) -> Result<(), TaskSinkError<Self::Error>> {
117 use futures_util::SinkExt;
118 self.sink_map_err(|e| TaskSinkError::PushError(e))
119 .send_all(&mut tasks.map(|task| {
120 task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
121 }))
122 .await
123 }
124}
125
126pub trait WeakTaskSink<Args>: Backend {
132 fn push(
134 &mut self,
135 task: Args,
136 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
137
138 fn push_bulk(
140 &mut self,
141 tasks: Vec<Args>,
142 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
143
144 fn push_stream(
146 &mut self,
147 tasks: impl Stream<Item = Args> + Unpin + Send,
148 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
149
150 fn push_task(
152 &mut self,
153 task: Task<Args, Self::Context, Self::IdType>,
154 ) -> impl Future<Output = Result<(), TaskSinkError<Self::Error>>> + Send;
155}
156
157impl<Args, S, E, C> WeakTaskSink<Args> for S
158where
159 S: Sink<Task<C::Compact, Self::Context, Self::IdType>, Error = E>
160 + Unpin
161 + Backend<Error = E, Codec = C> + Send,
163 Args: Send,
164 C::Compact: Send,
165 S::Context: Send + Default,
166 S::IdType: Send + 'static,
167 C: Codec<Args>,
168 E: Send,
169 C::Error: std::error::Error + Send + Sync + 'static,
170{
171 async fn push(&mut self, task: Args) -> Result<(), TaskSinkError<Self::Error>> {
172 use futures_util::SinkExt;
173 let encoded = C::encode(&task).map_err(|e| TaskSinkError::CodecError(e.into()))?;
174 self.sink_map_err(|e| TaskSinkError::PushError(e))
175 .send(Task::new(encoded))
176 .await
177 }
178
179 async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), TaskSinkError<Self::Error>> {
180 use futures_util::SinkExt;
181 self.sink_map_err(|e| TaskSinkError::PushError(e))
182 .send_all(&mut stream::iter(
183 tasks
184 .into_iter()
185 .map(Task::new)
186 .map(|task| {
187 task.try_map(|t| {
188 C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into()))
189 })
190 })
191 .collect::<Vec<_>>(),
192 ))
193 .await
194 }
195
196 async fn push_stream(
197 &mut self,
198 tasks: impl Stream<Item = Args> + Unpin + Send,
199 ) -> Result<(), TaskSinkError<Self::Error>> {
200 use futures_util::SinkExt;
201 use futures_util::StreamExt;
202 self.sink_map_err(|e| TaskSinkError::PushError(e))
203 .send_all(&mut tasks.map(Task::new).map(|task| {
204 task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))
205 }))
206 .await
207 }
208
209 async fn push_task(
210 &mut self,
211 task: Task<Args, Self::Context, Self::IdType>,
212 ) -> Result<(), TaskSinkError<Self::Error>> {
213 use futures_util::SinkExt;
214 self.sink_map_err(|e| TaskSinkError::PushError(e))
215 .send(task.try_map(|t| C::encode(&t).map_err(|e| TaskSinkError::CodecError(e.into())))?)
216 .await
217 }
218}