1use crate::backend::Backend;
38use crate::error::BoxDynError;
39use crate::task::task_id::{RandomId, TaskId};
40use crate::task::Task;
41use crate::worker::builder::{IntoWorkerService, WorkerBuilder};
42use crate::worker::{Event, ReadinessService, TrackerService, WorkerError};
43use futures_channel::mpsc::{self, channel};
44use futures_core::future::BoxFuture;
45use futures_core::stream::BoxStream;
46use futures_util::{SinkExt, StreamExt};
47use std::fmt::{self, Debug};
48use std::future::Future;
49use std::marker::PhantomData;
50use std::task::{Context, Poll};
51use tower_layer::Layer;
52use tower_service::Service;
53
/// A worker wrapper intended for tests that exposes task results one at a time.
///
/// The only runtime state is `stream`, a boxed stream whose items are either
/// `Ok((task_id, task_result))` or `Err(WorkerError)`. The backend type `B` and
/// the service/response types `S` and `Res` are tracked purely at the type
/// level through `PhantomData`.
///
/// `IdType` is the identifier type carried by [`TaskId`]; it defaults to
/// [`RandomId`].
pub struct TestWorker<B, S, Res, IdType = RandomId> {
    /// Stream of per-task outcomes (tagged with the task id) or worker errors.
    stream: BoxStream<'static, Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>,
    /// Marker for the backend type; no backend value is stored in this struct.
    backend: PhantomData<B>,
    /// Marker for the service and response types; no service value is stored in this struct.
    service: PhantomData<(S, Res)>,
}
87
88impl<B, S, Res, IdType> fmt::Debug for TestWorker<B, S, Res, IdType> {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.debug_struct("TestWorker")
91 .field("stream", &"BoxStream<...>") .field("backend", &std::any::type_name::<B>())
93 .field("service", &std::any::type_name::<(S, Res)>())
94 .field("id_type", &std::any::type_name::<IdType>())
95 .finish()
96 }
97}
98
/// Drives a worker one step at a time.
///
/// `Args` and `Ctx` are the task argument and context types the implementor
/// is parameterised over.
pub trait ExecuteNext<Args, Ctx> {
    /// The value produced by a single call to [`ExecuteNext::execute_next`].
    type Result;
    /// Returns a `Send` future that resolves to the next [`Self::Result`].
    fn execute_next(&mut self) -> impl Future<Output = Self::Result> + Send;
}
107
108impl<B, S, Args, Ctx, Res, IdType> ExecuteNext<Args, Ctx> for TestWorker<B, S, Res, IdType>
109where
110 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
111 B: Send,
112 Res: Send,
113{
114 type Result = Option<Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
115 async fn execute_next(&mut self) -> Self::Result {
116 self.stream.next().await
117 }
118}
119
120impl<B, S, Res> TestWorker<B, S, Res, ()> {
121 pub fn new<Args, Ctx, W>(backend: B, factory: W) -> TestWorker<B, S, Res, B::IdType>
123 where
124 W: IntoWorkerService<B, S, Args, Ctx>,
125 B: Backend<Args, Context = Ctx> + 'static,
126 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
127 B::Stream: Unpin + Send + 'static,
128 B::Beat: Unpin + Send + 'static,
129 Args: Send + 'static,
130 Ctx: Send + 'static,
131 B::Error: Into<BoxDynError> + Send + 'static,
132 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
133 S::Future: Send,
134 S::Error: Into<BoxDynError> + Send + Sync,
135 S::Response: Clone + Send,
136 Res: 'static,
137 <<B as Backend<Args>>::Layer as Layer<
138 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
139 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
140 <<<B as Backend<Args>>::Layer as Layer<
141 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
142 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
143 <<<B as Backend<Args>>::Layer as Layer<
144 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
145 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
146 <<B as Backend<Args>>::Layer as Layer<
147 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
148 >>::Service: std::marker::Send + 'static,
149 B::IdType: Send + Clone + 'static,
150 {
151 let service = factory.into_service(&backend);
152 TestWorker::new_with_svc(backend, service)
153 }
154
155 pub fn new_with_svc<Args, Ctx>(backend: B, service: S) -> TestWorker<B, S, Res, B::IdType>
157 where
158 B: Backend<Args, Context = Ctx> + 'static,
159 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
160 B::Stream: Unpin + Send + 'static,
161 B::Beat: Unpin + Send + 'static,
162 Args: Send + 'static,
163 Ctx: Send + 'static,
164 B::Error: Into<BoxDynError> + Send + 'static,
165 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
166 S::Future: Send,
167 S::Error: Into<BoxDynError> + Send + Sync,
168 S::Response: Clone + Send,
169 Res: 'static,
170 <<B as Backend<Args>>::Layer as Layer<
171 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
172 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
173 <<<B as Backend<Args>>::Layer as Layer<
174 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
175 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
176 <<<B as Backend<Args>>::Layer as Layer<
177 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
178 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
179 <<B as Backend<Args>>::Layer as Layer<
180 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
181 >>::Service: std::marker::Send + 'static,
182 B::IdType: Send + Clone + 'static,
183 {
184 enum Item<R, IdType> {
185 Ev(Result<Event, WorkerError>),
186 Res((TaskId<IdType>, Result<R, BoxDynError>)),
187 }
188 let (tx, rx) = channel(1);
189 let sender = tx.clone();
190 let service: TestEmitService<S, Res, B::IdType> = TestEmitService {
191 service,
192 tx: tx.clone(),
193 };
194 let stream = WorkerBuilder::new("test-worker")
195 .backend(backend)
196 .build(service)
197 .stream()
198 .map(|r| Item::Ev(r));
199 let task_stream = rx.map(|s| Item::Res(s));
200 let stream = futures_util::stream::select(task_stream, stream)
201 .filter_map(move |s| {
202 let mut tx = sender.clone();
203 async move {
204 match s {
205 Item::Ev(Err(e)) => {
206 tx.close().await.unwrap();
207 Some(Err(e))
208 }
209 Item::Ev(_) => None,
210 Item::Res(r) => Some(Ok(r)),
211 }
212 }
213 })
214 .boxed();
215 TestWorker {
216 stream,
217 service: PhantomData,
218 backend: PhantomData,
219 }
220 }
221}
222
/// A service wrapper that reports the outcome of every call over a channel.
///
/// Each outcome is sent as a `(task_id, result)` pair, where `result` is either
/// the inner service's response or its error converted into a [`BoxDynError`].
#[derive(Debug, Clone)]
pub struct TestEmitService<S, Response, IdType> {
    /// Sending half of the channel that carries task outcomes.
    tx: mpsc::Sender<(TaskId<IdType>, Result<Response, BoxDynError>)>,
    /// The wrapped service that actually handles each task.
    service: S,
}
229
230impl<S, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for TestEmitService<S, Res, IdType>
231where
232 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
233 S::Future: Send + 'static,
234 Args: Send + 'static,
235 Ctx: Send + 'static,
236 S::Response: Send + 'static,
237 S::Error: Into<BoxDynError> + Send,
238 IdType: Send + 'static + Clone,
239{
240 type Response = ();
241 type Error = String;
242 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
243
244 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
245 self.service
246 .poll_ready(cx)
247 .map_err(|e| e.into().to_string())
248 }
249
250 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
251 let task_id = task.parts.task_id.clone().unwrap();
252 let mut tx = Clone::clone(&self.tx);
253 let fut = self.service.call(task);
254 Box::pin(async move {
255 let res = fut.await;
256 match res {
257 Ok(res) => {
258 tx.send((task_id, Ok(res))).await.unwrap();
259 Ok(())
260 }
261 Err(err) => {
262 let e = err.into();
263 let e_str = e.to_string();
264 tx.send((task_id, Err(e))).await.unwrap();
265 Err(e_str)
266 }
267 }
268 })
269 }
270}
271
#[cfg(test)]
mod tests {
    use crate::backend::memory::MemoryStorage;
    use crate::backend::TaskSink;
    use crate::error::BoxDynError;
    use crate::task_fn::task_fn;
    use crate::worker::test_worker::{ExecuteNext, TestWorker};
    use crate::worker::WorkerContext;
    use std::time::Duration;

    /// Pushes the values `0..=10` into an in-memory backend and drains them
    /// through a `TestWorker`.
    ///
    /// The handler asks the worker to stop when it sees `10`, sleeps for one
    /// second, and echoes its input. The drain loop ends at the first item that
    /// is not `Some(Ok(..))`.
    #[tokio::test]
    async fn basic_worker() {
        let mut backend = MemoryStorage::new();

        for i in 0..=10 {
            backend.push(i).await.unwrap();
        }

        let service = task_fn(|req: u32, w: WorkerContext| async move {
            if req == 10 {
                w.stop()?;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, BoxDynError>(req)
        });

        let mut worker = TestWorker::new(backend, service);
        loop {
            let Some(Ok((_, ret))) = worker.execute_next().await else {
                break;
            };
            dbg!(ret.unwrap());
        }
        println!("Worker run successfully");
    }
}