1use crate::backend::Backend;
38use crate::error::BoxDynError;
39use crate::task::Task;
40use crate::task::task_id::{RandomId, TaskId};
41use crate::worker::builder::{IntoWorkerService, WorkerBuilder};
42use crate::worker::{Event, ReadinessService, TrackerService, WorkerError};
43use futures_channel::mpsc::{self, channel};
44use futures_core::future::BoxFuture;
45use futures_core::stream::BoxStream;
46use futures_util::{SinkExt, StreamExt};
47use std::fmt::{self, Debug};
48use std::future::Future;
49use std::marker::PhantomData;
50use std::task::{Context, Poll};
51use tower_layer::Layer;
52use tower_service::Service;
53
54type TestStream<IdType, Res> =
55 BoxStream<'static, Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
56pub struct TestWorker<B, S, Res, IdType = RandomId> {
84 stream: TestStream<IdType, Res>,
85 backend: PhantomData<B>,
86 service: PhantomData<(S, Res)>,
87}
88
89impl<B, S, Res, IdType> fmt::Debug for TestWorker<B, S, Res, IdType> {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("TestWorker")
92 .field("stream", &"BoxStream<...>") .field("backend", &std::any::type_name::<B>())
94 .field("service", &std::any::type_name::<(S, Res)>())
95 .field("id_type", &std::any::type_name::<IdType>())
96 .finish()
97 }
98}
99
/// Drives an implementor one step at a time.
///
/// `Args` and `Ctx` are the task argument and context types the implementor
/// works with; they do not appear in the method signature itself.
pub trait ExecuteNext<Args, Ctx> {
    /// Value produced by one call to [`ExecuteNext::execute_next`].
    type Result;

    /// Returns a `Send` future that resolves to the next [`Self::Result`].
    fn execute_next(&mut self) -> impl Future<Output = Self::Result> + Send;
}
108
109impl<B, S, Args, Ctx, Res, IdType> ExecuteNext<Args, Ctx> for TestWorker<B, S, Res, IdType>
110where
111 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
112 B: Send,
113 Res: Send,
114{
115 type Result = Option<Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
116 async fn execute_next(&mut self) -> Self::Result {
117 self.stream.next().await
118 }
119}
120
121impl<B, S, Res> TestWorker<B, S, Res, ()> {
122 pub fn new<Args, Ctx, W>(backend: B, factory: W) -> TestWorker<W::Backend, S, Res, B::IdType>
124 where
125 W: IntoWorkerService<B, S, Args, Ctx>,
126 W::Backend: Backend<
127 Args = B::Args,
128 Context = B::Context,
129 IdType = B::IdType,
130 Error = B::Error,
131 Stream = B::Stream,
132 Beat = B::Beat,
133 Layer = B::Layer,
134 > + 'static,
135 B: Backend<Args = Args, Context = Ctx> + 'static,
136 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
137 B::Stream: Unpin + Send + 'static,
138 B::Beat: Unpin + Send + 'static,
139 Args: Send + 'static,
140 Ctx: Send + 'static,
141 B::Error: Into<BoxDynError> + Send + 'static,
142 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
143 S::Future: Send,
144 S::Error: Into<BoxDynError> + Send + Sync,
145 S::Response: Clone + Send,
146 Res: 'static,
147 <<B as Backend>::Layer as Layer<
148 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
149 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
150 <<<B as Backend>::Layer as Layer<
151 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
152 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
153 <<<B as Backend>::Layer as Layer<
154 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
155 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
156 <<B as Backend>::Layer as Layer<
157 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
158 >>::Service: std::marker::Send + 'static,
159 B::IdType: Send + Clone + 'static,
160 {
161 let worker_service = factory.into_service(backend);
162 TestWorker::<W::Backend, S, Res, _>::new_with_svc(
163 worker_service.backend,
164 worker_service.service,
165 )
166 }
167}
168
169impl<B, S, Res> TestWorker<B, S, Res, ()> {
170 pub fn new_with_svc<Args, Ctx>(backend: B, service: S) -> TestWorker<B, S, Res, B::IdType>
172 where
173 B: Backend<Args = Args, Context = Ctx> + 'static,
174 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
175 B::Stream: Unpin + Send + 'static,
176 B::Beat: Unpin + Send + 'static,
177 Args: Send + 'static,
178 Ctx: Send + 'static,
179 B::Error: Into<BoxDynError> + Send + 'static,
180 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
181 S::Future: Send,
182 S::Error: Into<BoxDynError> + Send + Sync,
183 S::Response: Clone + Send,
184 Res: 'static,
185 <<B as Backend>::Layer as Layer<
186 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
187 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
188 <<<B as Backend>::Layer as Layer<
189 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
190 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
191 <<<B as Backend>::Layer as Layer<
192 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
193 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
194 <<B as Backend>::Layer as Layer<
195 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
196 >>::Service: std::marker::Send + 'static,
197 B::IdType: Send + Clone + 'static,
198 {
199 enum Item<R, IdType> {
200 Ev(Result<Event, WorkerError>),
201 Res((TaskId<IdType>, Result<R, BoxDynError>)),
202 }
203 let (tx, rx) = channel(1);
204 let sender = tx.clone();
205 let service: TestEmitService<S, Res, B::IdType> = TestEmitService {
206 service,
207 tx: tx.clone(),
208 };
209 let stream = WorkerBuilder::new("test-worker")
210 .backend(backend)
211 .build(service)
212 .stream()
213 .map(|r| Item::Ev(r));
214 let task_stream = rx.map(|s| Item::Res(s));
215 let stream = futures_util::stream::select(task_stream, stream)
216 .filter_map(move |s| {
217 let mut tx = sender.clone();
218 async move {
219 match s {
220 Item::Ev(Err(e)) => {
221 tx.close().await.unwrap();
222 Some(Err(e))
223 }
224 Item::Ev(_) => None,
225 Item::Res(r) => Some(Ok(r)),
226 }
227 }
228 })
229 .boxed();
230 TestWorker {
231 stream,
232 service: PhantomData,
233 backend: PhantomData,
234 }
235 }
236}
237
238impl<B, S, Res, I> TestWorker<B, S, Res, I> {
239 #[must_use]
241 pub fn into_stream(self) -> TestStream<I, Res> {
242 self.stream
243 }
244}
245
246#[derive(Debug, Clone)]
248pub struct TestEmitService<S, Response, IdType> {
249 tx: mpsc::Sender<(TaskId<IdType>, Result<Response, BoxDynError>)>,
250 service: S,
251}
252
253impl<S, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for TestEmitService<S, Res, IdType>
254where
255 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
256 S::Future: Send + 'static,
257 Args: Send + 'static,
258 Ctx: Send + 'static,
259 S::Response: Send + Clone + 'static,
260 S::Error: Into<BoxDynError> + Send,
261 IdType: Send + 'static + Clone,
262{
263 type Response = Res;
264 type Error = String;
265 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
266
267 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
268 self.service
269 .poll_ready(cx)
270 .map_err(|e| e.into().to_string())
271 }
272
273 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
274 let task_id = task.parts.task_id.clone().unwrap();
275 let mut tx = Clone::clone(&self.tx);
276 let fut = self.service.call(task);
277 Box::pin(async move {
278 let res = fut.await;
279 match res {
280 Ok(res) => {
281 tx.send((task_id, Ok(res.clone()))).await.unwrap();
282 Ok(res)
283 }
284 Err(err) => {
285 let e = err.into();
286 let e_str = e.to_string();
287 tx.send((task_id, Err(e))).await.unwrap();
288 Err(e_str)
289 }
290 }
291 })
292 }
293}
294
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::StreamExt;

    use crate::{
        backend::{TaskSink, memory::MemoryStorage},
        error::BoxDynError,
        task_fn::task_fn,
        worker::{
            WorkerContext,
            test_worker::{ExecuteNext, TestWorker},
        },
    };

    /// Steps the worker with `execute_next` until it stops yielding
    /// successful items, unwrapping every task outcome along the way.
    #[tokio::test]
    async fn basic_worker() {
        let mut storage = MemoryStorage::new();

        // Eleven tasks, 0 through 10 inclusive.
        for n in 0..=10 {
            storage.push(n).await.unwrap();
        }

        // The task carrying 10 asks the worker to stop.
        let handler = task_fn(|value: u32, ctx: WorkerContext| async move {
            if value == 10 {
                ctx.stop()?;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(value)
        });

        let mut test_worker = TestWorker::new(storage, handler);
        while let Some(Ok((_, outcome))) = test_worker.execute_next().await {
            outcome.unwrap();
        }
        println!("Worker run successfully");
    }

    /// Same scenario as `basic_worker`, but consumes the worker through the
    /// stream returned by `into_stream`.
    #[tokio::test]
    async fn basic_worker_as_stream() {
        let mut storage = MemoryStorage::new();

        // Eleven tasks, 0 through 10 inclusive.
        for n in 0..=10 {
            storage.push(n).await.unwrap();
        }

        // The task carrying 10 asks the worker to stop.
        let handler = task_fn(|value: u32, ctx: WorkerContext| async move {
            if value == 10 {
                ctx.stop()?;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(value)
        });

        let test_worker = TestWorker::new(storage, handler);
        let mut outcomes = test_worker.into_stream();
        while let Some(Ok((_, outcome))) = outcomes.next().await {
            outcome.unwrap();
        }
        println!("Worker run successfully");
    }
}