1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use crate::layers::ack::AckLayer;
use crate::layers::extensions::Extension;
use futures::StreamExt;
use std::{marker::PhantomData, time::Duration};
use tower::layer::util::Stack;

use crate::{builder::WorkerBuilder, job::JobStreamResult};

use super::beats::EnqueueScheduled;
use super::beats::ReenqueueOrphaned;
use super::{beats::KeepAlive, Storage};

/// A helper trait to help build a [WorkerBuilder] that consumes a [Storage]
pub trait WithStorage<NS, ST: Storage<Output = Self::Job>>: Sized {
    /// The job to consume
    type Job;
    /// The source of jobs
    type Stream;
    /// The builder method to produce a default [WorkerBuilder] that will consume jobs
    fn with_storage(self, storage: ST) -> WorkerBuilder<Self::Job, Self::Stream, NS> {
        self.with_storage_config(storage, |e| e)
    }
    /// The builder method to produce a configured [WorkerBuilder] that will consume jobs
    fn with_storage_config(
        self,
        storage: ST,
        config: impl Fn(WorkerConfig) -> WorkerConfig,
    ) -> WorkerBuilder<Self::Job, Self::Stream, NS>;
}

/// Allows configuring of how storages are consumed
#[derive(Debug)]
pub struct WorkerConfig {
    keep_alive: Duration,
    enqueue_scheduled: Option<(i32, Duration)>,
    reenqueue_orphaned: Option<(i32, Duration)>,
    buffer_size: usize,
    fetch_interval: Duration,
}

impl Default for WorkerConfig {
    fn default() -> Self {
        Self {
            keep_alive: Duration::from_secs(30),
            enqueue_scheduled: Some((10, Duration::from_secs(10))),
            reenqueue_orphaned: Some((10, Duration::from_secs(10))),
            buffer_size: 1,
            fetch_interval: Duration::from_millis(50),
        }
    }
}

impl WorkerConfig {
    /// The number of jobs to fetch in one poll
    ///
    /// Defaults to 1
    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// The rate at which jobs in the scheduled queue are pushed into the active queue
    ///
    /// Can be set to none for sql scenarios as sql uses run_at
    /// This mainly applies for redis currently
    pub fn enqueue_scheduled(mut self, interval: Option<(i32, Duration)>) -> Self {
        self.enqueue_scheduled = interval;
        self
    }

    /// The rate at which orphaned jobs are returned to the queue
    ///
    /// If None then no garbage collection of orphaned jobs
    pub fn reenqueue_orphaned(mut self, interval: Option<(i32, Duration)>) -> Self {
        self.reenqueue_orphaned = interval;
        self
    }

    /// The rate at which polling is occurring
    /// This may be ignored if the storage uses pubsub
    pub fn fetch_interval(mut self, interval: Duration) -> Self {
        self.fetch_interval = interval;
        self
    }
}

impl<J: 'static + Send + Sync, M, ST>
    WithStorage<Stack<Extension<ST>, Stack<AckLayer<ST, J>, M>>, ST> for WorkerBuilder<(), (), M>
where
    ST: Storage<Output = J> + Send + Sync + 'static,
    M: Send + Sync + 'static,
{
    type Job = J;
    type Stream = JobStreamResult<J>;
    fn with_storage_config(
        mut self,
        mut storage: ST,
        config: impl Fn(WorkerConfig) -> WorkerConfig,
    ) -> WorkerBuilder<J, Self::Stream, Stack<Extension<ST>, Stack<AckLayer<ST, J>, M>>> {
        let worker_config = config(WorkerConfig::default());
        let worker_id = self.id;
        let source = storage
            .consume(
                &worker_id,
                worker_config.fetch_interval,
                worker_config.buffer_size,
            )
            .boxed();

        let layer = self
            .layer
            .layer(AckLayer::new(storage.clone(), worker_id.clone()))
            .layer(Extension(storage.clone()));

        let keep_alive: KeepAlive<ST, M> =
            KeepAlive::new::<J>(&worker_id, storage.clone(), worker_config.keep_alive);
        self.beats.push(Box::new(keep_alive));
        if let Some((count, duration)) = worker_config.reenqueue_orphaned {
            let reenqueue_orphaned = ReenqueueOrphaned::new(storage.clone(), count, duration);
            self.beats.push(Box::new(reenqueue_orphaned));
        }
        if let Some((count, duration)) = worker_config.enqueue_scheduled {
            let enqueue_scheduled = EnqueueScheduled::new(storage.clone(), count, duration);
            self.beats.push(Box::new(enqueue_scheduled));
        }
        WorkerBuilder {
            job: PhantomData,
            layer,
            source,
            id: worker_id,
            beats: self.beats,
        }
    }
}