1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use std::marker::PhantomData;

use futures::StreamExt;
use tower::layer::util::Stack;

use crate::{
    builder::WorkerBuilder,
    job::JobStreamResult,
    layers::{
        ack::{Ack, AckLayer},
        extensions::Extension,
    },
};

use super::{MessageQueue, WithMq};

impl<J: 'static, M, Mq> WithMq<Stack<Extension<Mq>, Stack<AckLayer<Mq, J>, M>>, Mq>
    for WorkerBuilder<(), (), M>
where
    Mq: MessageQueue<J> + Send + Sync + 'static + Clone + Ack<J>,
    M: Send + Sync + 'static,
{
    type Job = J;
    type Stream = JobStreamResult<J>;
    fn with_mq(
        self,
        mq: Mq,
    ) -> WorkerBuilder<J, Self::Stream, Stack<Extension<Mq>, Stack<AckLayer<Mq, J>, M>>> {
        let worker_id = self.id;
        let source = mq.consume(&worker_id).boxed();

        let layer = self
            .layer
            .layer(AckLayer::new(mq.clone(), worker_id.clone()))
            .layer(Extension(mq.clone()));

        WorkerBuilder {
            job: PhantomData,
            layer,
            source,
            id: worker_id,
            beats: self.beats,
        }
    }
}