1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
mod error;
#[cfg(feature = "worker")]
mod streams;
#[cfg(feature = "worker")]
mod worker;
use std::time::Duration;

use chrono::{DateTime, Utc};

use crate::{
    job::JobStream,
    job::{Job, JobStreamResult},
    request::JobRequest,
};

#[cfg(feature = "storage")]
pub use self::error::StorageError;

#[cfg(feature = "worker")]
pub use worker::StorageWorker;

/// Result alias shared by every [`Storage`] operation: the success value is
/// `T`, the failure is always a [`StorageError`].
pub type StorageResult<T> = Result<T, StorageError>;

/// Represents a [Storage] that can be passed to a [Builder]
///
/// Every fallible operation reports failures through [`StorageResult`].
///
/// [Builder]: crate::builder::WorkerBuilder
#[async_trait::async_trait]
pub trait Storage: Clone {
    /// The type of job that can be persisted
    type Output: Job;

    /// Pushes a job to a storage
    ///
    /// TODO: return id
    async fn push(&mut self, job: Self::Output) -> StorageResult<()>;

    /// Push a job into the scheduled set, to be run at the instant `on`
    async fn schedule(&mut self, job: Self::Output, on: DateTime<Utc>) -> StorageResult<()>;

    /// Return the number of pending jobs from the queue
    async fn len(&self) -> StorageResult<i64>;

    /// Fetch a job given an id
    ///
    /// The `Option` in the return type leaves room for "no job found".
    async fn fetch_by_id(&self, job_id: String) -> StorageResult<Option<JobRequest<Self::Output>>>;

    /// Get the stream of jobs
    ///
    /// NOTE(review): `interval` is presumably the polling period used by the
    /// backend — confirm against the implementations.
    fn consume(&mut self, worker_id: String, interval: Duration) -> JobStreamResult<Self::Output>;

    /// Acknowledge a job which returns [JobResult::Success]
    async fn ack(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;

    /// Retry a job which returns [JobResult::Retry]
    async fn retry(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;

    /// Called by a Worker to keep the storage alive and prevent jobs from being deemed as orphaned
    ///
    /// The `Service` type parameter does not appear in any argument, so it
    /// cannot be inferred and callers have to name it explicitly.
    async fn keep_alive<Service>(&mut self, worker_id: String) -> StorageResult<()>;

    /// Kill a job that returns [JobResult::Kill]
    async fn kill(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;

    /// Update a job details
    async fn update_by_id(
        &self,
        job_id: String,
        job: &JobRequest<Self::Output>,
    ) -> StorageResult<()>;

    /// Used for scheduling jobs
    ///
    /// See [`StorageWorkerPulse`] for the kinds of pulse a worker can send.
    async fn heartbeat(&mut self, pulse: StorageWorkerPulse) -> StorageResult<bool>;

    /// Reschedule a job which returns [`JobResult::Reschedule`], to be run
    /// again after `wait` has elapsed
    ///
    /// [`JobResult::Reschedule`]: crate::response::JobResult
    async fn reschedule(
        &mut self,
        job: &JobRequest<Self::Output>,
        wait: Duration,
    ) -> StorageResult<()>;

    /// Used to recover jobs when a Worker shuts down.
    ///
    /// The default implementation does nothing and reports success.
    async fn reenqueue_active(&mut self, _job_ids: Vec<String>) -> StorageResult<()> {
        Ok(())
    }

    /// Returns `true` when [`Storage::len`] reports that no job is pending.
    ///
    /// The default implementation is derived from [`Storage::len`] instead of
    /// panicking, so it is safe to call on every storage.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Storage::len`].
    #[doc(hidden)]
    async fn is_empty(&self) -> StorageResult<bool> {
        self.len().await.map(|pending| pending == 0)
    }
}

/// Heartbeat message that each [Worker] sends to storage
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StorageWorkerPulse {
    /// Move scheduled jobs into the active set
    EnqueueScheduled {
        /// number of jobs to be scheduled
        count: i32,
    },
    /// Rescue any orphaned jobs
    RenqueueOrpharned {
        /// number of orphaned jobs
        count: i32,
    },
}

impl<S> JobStream for S
where
    S: Storage,
{
    type Job = S::Output;

    /// Consume the stream of jobs from Storage
    fn stream(&mut self, worker_id: String, interval: Duration) -> JobStreamResult<S::Output> {
        self.consume(worker_id, interval)
    }
}