1use crate::backend::Backend;
38use crate::error::BoxDynError;
39use crate::task::Task;
40use crate::task::task_id::{RandomId, TaskId};
41use crate::worker::builder::{IntoWorkerService, WorkerBuilder};
42use crate::worker::{Event, ReadinessService, TrackerService, WorkerError};
43use futures_channel::mpsc::{self, channel};
44use futures_core::future::BoxFuture;
45use futures_core::stream::BoxStream;
46use futures_util::{SinkExt, StreamExt};
47use std::fmt::{self, Debug};
48use std::future::Future;
49use std::marker::PhantomData;
50use std::task::{Context, Poll};
51use tower_layer::Layer;
52use tower_service::Service;
53
54type TestStream<IdType, Res> =
55 BoxStream<'static, Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
56pub struct TestWorker<B, S, Res, IdType = RandomId> {
84 stream: TestStream<IdType, Res>,
85 backend: PhantomData<B>,
86 service: PhantomData<(S, Res)>,
87}
88
89impl<B, S, Res, IdType> fmt::Debug for TestWorker<B, S, Res, IdType> {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("TestWorker")
92 .field("stream", &"BoxStream<...>") .field("backend", &std::any::type_name::<B>())
94 .field("service", &std::any::type_name::<(S, Res)>())
95 .field("id_type", &std::any::type_name::<IdType>())
96 .finish()
97 }
98}
99
100pub trait ExecuteNext<Args, Ctx> {
102 type Result;
104 fn execute_next(&mut self) -> impl Future<Output = Self::Result> + Send;
107}
108
109impl<B, S, Args, Ctx, Res, IdType> ExecuteNext<Args, Ctx> for TestWorker<B, S, Res, IdType>
110where
111 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
112 B: Send,
113 Res: Send,
114{
115 type Result = Option<Result<(TaskId<IdType>, Result<Res, BoxDynError>), WorkerError>>;
116 async fn execute_next(&mut self) -> Self::Result {
117 self.stream.next().await
118 }
119}
120
121impl<B, S, Res> TestWorker<B, S, Res, ()> {
122 pub fn new<Args, Ctx, W>(backend: B, factory: W) -> TestWorker<B, S, Res, B::IdType>
124 where
125 W: IntoWorkerService<B, S, Args, Ctx>,
126 B: Backend<Args = Args, Context = Ctx> + 'static,
127 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
128 B::Stream: Unpin + Send + 'static,
129 B::Beat: Unpin + Send + 'static,
130 Args: Send + 'static,
131 Ctx: Send + 'static,
132 B::Error: Into<BoxDynError> + Send + 'static,
133 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
134 S::Future: Send,
135 S::Error: Into<BoxDynError> + Send + Sync,
136 S::Response: Clone + Send,
137 Res: 'static,
138 <<B as Backend>::Layer as Layer<
139 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
140 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
141 <<<B as Backend>::Layer as Layer<
142 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
143 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
144 <<<B as Backend>::Layer as Layer<
145 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
146 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
147 <<B as Backend>::Layer as Layer<
148 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
149 >>::Service: std::marker::Send + 'static,
150 B::IdType: Send + Clone + 'static,
151 {
152 let service = factory.into_service(&backend);
153 TestWorker::new_with_svc(backend, service)
154 }
155
156 pub fn new_with_svc<Args, Ctx>(backend: B, service: S) -> TestWorker<B, S, Res, B::IdType>
158 where
159 B: Backend<Args = Args, Context = Ctx> + 'static,
160 S: Service<Task<Args, Ctx, B::IdType>, Response = Res> + Send + 'static,
161 B::Stream: Unpin + Send + 'static,
162 B::Beat: Unpin + Send + 'static,
163 Args: Send + 'static,
164 Ctx: Send + 'static,
165 B::Error: Into<BoxDynError> + Send + 'static,
166 B::Layer: Layer<ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>>,
167 S::Future: Send,
168 S::Error: Into<BoxDynError> + Send + Sync,
169 S::Response: Clone + Send,
170 Res: 'static,
171 <<B as Backend>::Layer as Layer<
172 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
173 >>::Service: Service<Task<Args, Ctx, B::IdType>>,
174 <<<B as Backend>::Layer as Layer<
175 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
176 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Error: Into<BoxDynError> + Sync + Send,
177 <<<B as Backend>::Layer as Layer<
178 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
179 >>::Service as Service<Task<Args, Ctx, B::IdType>>>::Future: Send,
180 <<B as Backend>::Layer as Layer<
181 ReadinessService<TrackerService<TestEmitService<S, Res, B::IdType>>>,
182 >>::Service: std::marker::Send + 'static,
183 B::IdType: Send + Clone + 'static,
184 {
185 enum Item<R, IdType> {
186 Ev(Result<Event, WorkerError>),
187 Res((TaskId<IdType>, Result<R, BoxDynError>)),
188 }
189 let (tx, rx) = channel(1);
190 let sender = tx.clone();
191 let service: TestEmitService<S, Res, B::IdType> = TestEmitService {
192 service,
193 tx: tx.clone(),
194 };
195 let stream = WorkerBuilder::new("test-worker")
196 .backend(backend)
197 .build(service)
198 .stream()
199 .map(|r| Item::Ev(r));
200 let task_stream = rx.map(|s| Item::Res(s));
201 let stream = futures_util::stream::select(task_stream, stream)
202 .filter_map(move |s| {
203 let mut tx = sender.clone();
204 async move {
205 match s {
206 Item::Ev(Err(e)) => {
207 tx.close().await.unwrap();
208 Some(Err(e))
209 }
210 Item::Ev(_) => None,
211 Item::Res(r) => Some(Ok(r)),
212 }
213 }
214 })
215 .boxed();
216 TestWorker {
217 stream,
218 service: PhantomData,
219 backend: PhantomData,
220 }
221 }
222}
223
224#[derive(Debug, Clone)]
226pub struct TestEmitService<S, Response, IdType> {
227 tx: mpsc::Sender<(TaskId<IdType>, Result<Response, BoxDynError>)>,
228 service: S,
229}
230
231impl<S, Args, Ctx, Res, IdType> Service<Task<Args, Ctx, IdType>> for TestEmitService<S, Res, IdType>
232where
233 S: Service<Task<Args, Ctx, IdType>, Response = Res> + Send + 'static,
234 S::Future: Send + 'static,
235 Args: Send + 'static,
236 Ctx: Send + 'static,
237 S::Response: Send + 'static,
238 S::Error: Into<BoxDynError> + Send,
239 IdType: Send + 'static + Clone,
240{
241 type Response = ();
242 type Error = String;
243 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
244
245 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
246 self.service
247 .poll_ready(cx)
248 .map_err(|e| e.into().to_string())
249 }
250
251 fn call(&mut self, task: Task<Args, Ctx, IdType>) -> Self::Future {
252 let task_id = task.parts.task_id.clone().unwrap();
253 let mut tx = Clone::clone(&self.tx);
254 let fut = self.service.call(task);
255 Box::pin(async move {
256 let res = fut.await;
257 match res {
258 Ok(res) => {
259 tx.send((task_id, Ok(res))).await.unwrap();
260 Ok(())
261 }
262 Err(err) => {
263 let e = err.into();
264 let e_str = e.to_string();
265 tx.send((task_id, Err(e))).await.unwrap();
266 Err(e_str)
267 }
268 }
269 })
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use crate::{
276 backend::{TaskSink, memory::MemoryStorage},
277 error::BoxDynError,
278 task_fn::task_fn,
279 worker::{
280 WorkerContext,
281 test_worker::{ExecuteNext, TestWorker},
282 },
283 };
284 use std::time::Duration;
285
286 #[tokio::test]
287 async fn basic_worker() {
288 let mut backend = MemoryStorage::new();
289
290 for i in 0..=10 {
291 backend.push(i).await.unwrap();
292 }
293
294 let service = task_fn(|req: u32, w: WorkerContext| async move {
295 if req == 10 {
296 w.stop()?;
297 }
298 tokio::time::sleep(Duration::from_secs(1)).await;
299 Ok::<_, BoxDynError>(req)
300 });
301 let mut worker = TestWorker::new(backend, service);
302 while let Some(Ok((_, ret))) = worker.execute_next().await {
303 dbg!(ret.unwrap());
304 }
305 println!("Worker run successfully");
306 }
307}