1use crate::backend::Backend;
2use crate::builder::WorkerBuilder;
3use crate::codec::Codec;
4use crate::error::{BoxDynError, Error};
5use crate::request::{Parts, Request};
6use crate::service_fn::{service_fn, ServiceFn};
7use crate::storage::Storage;
8use crate::worker::{Ready, Worker};
9use futures::future::BoxFuture;
10use futures::FutureExt;
11use serde::de::DeserializeOwned;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::marker::PhantomData;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::time::Duration;
21use tower::Layer;
22use tower::Service;
23
24type BoxedService<Input, Output> = tower::util::BoxService<Input, Output, crate::error::Error>;
25
26type SteppedService<Compact, Index, Ctx> =
27 BoxedService<Request<StepRequest<Compact, Index>, Ctx>, GoTo<Compact>>;
28
29#[derive(Debug, Serialize, Deserialize, Clone)]
31pub enum GoTo<N = ()> {
32 Next(N),
34 Delay {
36 next: N,
38 delay: Duration,
40 },
41 Done(N),
43}
44
45#[derive(Debug)]
47pub struct StepBuilder<Ctx, Compact, Input, Current, Encode, Index = usize> {
48 steps: HashMap<Index, SteppedService<Compact, Index, Ctx>>,
49 current_index: Index,
50 current: PhantomData<Current>,
51 codec: PhantomData<Encode>,
52 input: PhantomData<Input>,
53}
54
55impl<Ctx, Compact, Input, Encode, Index: Default> Default
56 for StepBuilder<Ctx, Compact, Input, Input, Encode, Index>
57{
58 fn default() -> Self {
59 Self {
60 steps: HashMap::new(),
61 current_index: Index::default(),
62 current: PhantomData,
63 codec: PhantomData,
64 input: PhantomData,
65 }
66 }
67}
68
69impl<Ctx, Compact, Input, Encode> StepBuilder<Ctx, Compact, Input, Input, Encode, usize> {
70 pub fn new() -> Self {
72 Self {
73 steps: HashMap::new(),
74 current_index: usize::default(),
75 current: PhantomData,
76 codec: PhantomData,
77 input: PhantomData,
78 }
79 }
80
81 pub fn new_with_stepper<I: Default>() -> StepBuilder<Ctx, Compact, Input, Input, Encode, I> {
83 StepBuilder {
84 steps: HashMap::new(),
85 current_index: I::default(),
86 current: PhantomData,
87 codec: PhantomData,
88 input: PhantomData,
89 }
90 }
91}
92
93impl<Ctx, Compact, Input, Current, Encode, Index>
109 StepBuilder<Ctx, Compact, Input, Current, Encode, Index>
110{
111 pub fn build<S>(self, store: S) -> StepService<Ctx, Compact, Input, S, Index> {
113 StepService {
114 inner: self.steps,
115 storage: store,
116 input: PhantomData,
117 }
118 }
119}
120
121#[derive(Debug)]
123pub struct StepService<Ctx, Compact, Input, S, Index> {
124 inner: HashMap<Index, SteppedService<Compact, Index, Ctx>>,
125 storage: S,
126 input: PhantomData<Input>,
127}
128
129impl<
130 Ctx,
131 Compact,
132 S: Storage<Job = StepRequest<Compact, Index>> + Send + Clone + 'static,
133 Input,
134 Index,
135 > Service<Request<StepRequest<Compact, Index>, Ctx>>
136 for StepService<Ctx, Compact, Input, S, Index>
137where
138 Compact: DeserializeOwned + Send + Clone + 'static,
139 S::Error: Send + Sync + std::error::Error,
140 Index: StepIndex + Send + Sync + 'static,
141{
142 type Response = GoTo<Compact>;
143 type Error = crate::error::Error;
144 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
145
146 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 Poll::Ready(Ok(()))
148 }
149
150 fn call(&mut self, req: Request<StepRequest<Compact, Index>, Ctx>) -> Self::Future {
151 let index = &req.args.index;
152 let next_index = index.next();
153
154 let service = self
155 .inner
156 .get_mut(index)
157 .expect("Invalid index in inner services");
158 let fut = service.call(req);
160 let mut storage = self.storage.clone();
161 Box::pin(async move {
162 match fut.await {
163 Ok(response) => {
164 match &response {
165 GoTo::Next(resp) => {
166 storage
167 .push(StepRequest {
168 index: next_index,
169 step: resp.clone(),
170 })
171 .await
172 .map_err(|e| Error::SourceError(Arc::new(e.into())))?;
173 }
174 GoTo::Delay { next, delay } => {
175 let now = std::time::SystemTime::now();
176 let epoch_ms: u128 = now
177 .duration_since(std::time::UNIX_EPOCH)
178 .unwrap_or_default()
179 .as_millis();
180
181 let delay: u128 = delay.as_millis();
183
184 let target_ms: u128 = epoch_ms + delay;
187
188 let target_s: u128 = (target_ms + 999) / 1000;
190
191 let target = i64::try_from(target_s).unwrap_or(i64::MAX);
193 storage
194 .schedule(
195 StepRequest {
196 index: next_index,
197 step: next.clone(),
198 },
199 target,
200 )
201 .await
202 .map_err(|e| Error::SourceError(Arc::new(e.into())))?;
203 }
204 GoTo::Done(_) => {
205 }
207 };
208 Ok(response)
209 }
210 Err(e) => Err(e),
211 }
212 })
213 }
214}
215
216struct TransformingService<S, Compact, Input, Current, Next, Codec> {
217 inner: S,
218 _req: PhantomData<Compact>,
219 _input: PhantomData<Input>,
220 _codec: PhantomData<Codec>,
221 _output: PhantomData<Next>,
222 _current: PhantomData<Current>,
223}
224
225impl<S, Compact, Codec, Input, Current, Next>
226 TransformingService<S, Compact, Input, Current, Next, Codec>
227{
228 fn new(inner: S) -> Self {
229 TransformingService {
230 inner,
231 _req: PhantomData,
232 _input: PhantomData,
233 _output: PhantomData,
234 _codec: PhantomData,
235 _current: PhantomData,
236 }
237 }
238}
239
240impl<S, Ctx, Input, Current, Next, Compact, Encode, Index>
241 Service<Request<StepRequest<Compact, Index>, Ctx>>
242 for TransformingService<S, Compact, Input, Current, Next, Encode>
243where
244 S: Service<Request<Current, Ctx>, Response = GoTo<Next>>,
245 Ctx: Default,
246 S::Future: Send + 'static,
247 Current: DeserializeOwned,
248 Next: Serialize,
249 Encode: Codec<Compact = Compact>,
250 Encode::Error: Debug,
251{
252 type Response = GoTo<Compact>;
253 type Error = S::Error;
254 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
255
256 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257 self.inner.poll_ready(cx)
258 }
259
260 fn call(&mut self, req: Request<StepRequest<Compact, Index>, Ctx>) -> Self::Future {
261 let transformed_req: Request<Current, Ctx> = {
262 Request::new_with_parts(
263 Encode::decode(req.args.step).expect(&format!(
264 "Could not decode step, expecting {}",
265 std::any::type_name::<Current>()
266 )),
267 req.parts,
268 )
269 };
270 let fut = self.inner.call(transformed_req).map(|res| match res {
271 Ok(o) => Ok(match o {
272 GoTo::Next(next) => {
273 GoTo::Next(Encode::encode(next).expect("Could not encode the next step"))
274 }
275 GoTo::Delay { next, delay } => GoTo::Delay {
276 next: Encode::encode(next).expect("Could not encode the next step"),
277 delay,
278 },
279 GoTo::Done(res) => {
280 GoTo::Done(Encode::encode(res).expect("Could not encode the next step"))
281 }
282 }),
283 Err(e) => Err(e),
284 });
285
286 Box::pin(fut)
287 }
288}
289
290#[derive(Debug, Serialize, Deserialize)]
292pub struct StepRequest<T, Index = usize> {
293 step: T,
294 index: Index,
295}
296
297impl<T, Index> StepRequest<T, Index> {
298 pub fn new(step: T) -> Self
300 where
301 Index: Default,
302 {
303 Self {
304 step,
305 index: Index::default(),
306 }
307 }
308
309 pub fn new_with_index(step: T, index: Index) -> Self {
311 Self { step, index }
312 }
313}
314
315pub trait Step<S, Ctx, Compact, Input, Current, Next, Encode, Index> {
317 fn step(self, service: S) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index>;
319}
320
321impl<S, Ctx, Input, Current, Next, Compact, Encode, Index>
322 Step<S, Ctx, Compact, Input, Current, Next, Encode, Index>
323 for StepBuilder<Ctx, Compact, Input, Current, Encode, Index>
324where
325 S: Service<Request<Current, Ctx>, Response = GoTo<Next>, Error = crate::error::Error>
326 + Send
327 + 'static
328 + Sync,
329 S::Future: Send + 'static,
330 Current: DeserializeOwned + Send + 'static,
331 S::Response: 'static,
332 Input: Send + 'static + Serialize,
333 Ctx: Default + Send,
334 Next: 'static + Send + Serialize,
335 Compact: Send + 'static,
336 Encode: Codec<Compact = Compact> + Send + 'static,
337 Encode::Error: Debug,
338 Index: StepIndex,
339{
340 fn step(mut self, service: S) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index> {
341 let next = self.current_index.next();
342 self.steps.insert(
343 self.current_index,
344 BoxedService::new(TransformingService::<
345 S,
346 Compact,
347 Input,
348 Current,
349 Next,
350 Encode,
351 >::new(service)),
352 );
353 StepBuilder {
354 steps: self.steps,
355 current: PhantomData,
356 codec: PhantomData,
357 input: PhantomData,
358 current_index: next,
359 }
360 }
361}
362
363pub trait StepFn<F, FnArgs, Ctx, Compact, Input, Current, Next, Codec, Index> {
365 fn step_fn(self, f: F) -> StepBuilder<Ctx, Compact, Input, Next, Codec, Index>;
367}
368
369impl<
370 S,
371 Ctx: Send + Sync,
372 F: Send + Sync,
373 FnArgs: Send + Sync,
374 Input,
375 Current,
376 Next,
377 Compact,
378 Encode,
379 Index,
380 > StepFn<F, FnArgs, Ctx, Compact, Input, Current, Next, Encode, Index> for S
381where
382 S: Step<ServiceFn<F, Current, Ctx, FnArgs>, Ctx, Compact, Input, Current, Next, Encode, Index>,
383{
384 fn step_fn(self, f: F) -> StepBuilder<Ctx, Compact, Input, Next, Encode, Index> {
385 self.step(service_fn(f))
386 }
387}
388
389pub trait StepWorkerFactory<Ctx, Compact, Input, Output, Index> {
391 type Source;
393
394 type Service;
396
397 type Codec;
399 fn build_stepped(
408 self,
409 builder: StepBuilder<Ctx, Compact, Input, Output, Self::Codec, Index>,
410 ) -> Worker<Ready<Self::Service, Self::Source>>;
411}
412
413impl<Req, P, M, Ctx, Input, Compact, Output, Index>
414 StepWorkerFactory<Ctx, Compact, Input, Output, Index>
415 for WorkerBuilder<Req, Ctx, P, M, StepService<Ctx, Compact, Input, P, Index>>
416where
417 Compact: Send + 'static + Sync,
418 P: Backend<Request<StepRequest<Compact, Index>, Ctx>> + 'static,
419 P: Storage<Job = StepRequest<Compact, Index>> + Clone,
420 M: Layer<StepService<Ctx, Compact, Input, P, Index>> + 'static,
421{
422 type Source = P;
423
424 type Service = M::Service;
425
426 type Codec = <P as Backend<Request<StepRequest<Compact, Index>, Ctx>>>::Codec;
427
428 fn build_stepped(
429 self,
430 builder: StepBuilder<Ctx, Compact, Input, Output, Self::Codec, Index>,
431 ) -> Worker<Ready<M::Service, P>> {
432 let worker_id = self.id;
433 let poller = self.source;
434 let middleware = self.layer;
435 let service = builder.build(poller.clone());
436 let service = middleware.service(service);
437
438 Worker::new(worker_id, Ready::new(service, poller))
439 }
440}
441
442#[derive(Debug, thiserror::Error)]
444pub enum StepError {
445 #[error("CodecError: {0}")]
447 CodecError(BoxDynError),
448 #[error("StorageError: {0}")]
450 StorageError(BoxDynError),
451}
452
453pub trait SteppableStorage<S: Storage, Codec, Compact, Input, Index> {
455 fn push_step<T: Serialize + Send>(
457 &mut self,
458 step: StepRequest<T, Index>,
459 ) -> impl Future<Output = Result<Parts<S::Context>, StepError>> + Send;
460
461 fn start_stepped(
463 &mut self,
464 step: Input,
465 ) -> impl Future<Output = Result<Parts<S::Context>, StepError>> + Send
466 where
467 Input: Serialize + Send,
468 Index: Default,
469 Self: Send,
470 {
471 async {
472 self.push_step(StepRequest {
473 step,
474 index: Index::default(),
475 })
476 .await
477 }
478 }
479}
480
481impl<S, Encode, Compact, Input, Index> SteppableStorage<S, Encode, Compact, Input, Index> for S
482where
483 S: Storage<Job = StepRequest<Compact, Index>, Codec = Encode>
484 + Backend<Request<StepRequest<Compact, Index>, <S as Storage>::Context>>
485 + Send,
486 Encode: Codec<Compact = Compact>,
487 Encode::Error: std::error::Error + Send + Sync + 'static,
488 S::Error: std::error::Error + Send + Sync + 'static,
489 Compact: Send,
490 Index: Send,
491{
492 async fn push_step<T: Serialize + Send>(
493 &mut self,
494 step: StepRequest<T, Index>,
495 ) -> Result<Parts<S::Context>, StepError> {
496 self.push(StepRequest {
497 index: step.index,
498 step: Encode::encode(&step.step).map_err(|e| StepError::CodecError(Box::new(e)))?,
499 })
500 .await
501 .map_err(|e| StepError::StorageError(Box::new(e)))
502 }
503}
504
505pub trait StepIndex: Eq + Hash {
508 fn next(&self) -> Self;
510}
511
512impl StepIndex for usize {
513 fn next(&self) -> Self {
514 *self + 1
515 }
516}
517
518impl StepIndex for u32 {
519 fn next(&self) -> Self {
520 *self + 1
521 }
522}