1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use std::{collections::HashMap, fmt::Debug, time::Duration};

use crate::{
    error::{JobError, JobStreamError, WorkerError},
    request::{JobRequest, JobState},
};
use chrono::{DateTime, Utc};
use futures::{future::BoxFuture, stream::BoxStream};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Represents a result for a [Job].
pub type JobFuture<I> = BoxFuture<'static, I>;
/// Represents a stream for [Job].
pub type JobStreamResult<T> = BoxStream<'static, Result<Option<JobRequest<T>>, JobStreamError>>;

#[derive(Debug)]
/// Represents a wrapper for a job produced from streams
pub struct JobRequestWrapper<T>(pub Result<Option<JobRequest<T>>, JobStreamError>);

/// Trait representing a job.
///
///
/// # Example
/// ```rust,ignore
/// impl Job for Email {
///     const NAME: &'static str = "apalis::Email";
/// }
/// ```
pub trait Job: Sized + Send + Unpin + Serialize + DeserializeOwned + Debug + Sync {
    /// Represents the name for job.
    const NAME: &'static str;

    /// How long it took before service was ready
    fn on_service_ready(&self, _req: &JobRequest<Self>, _latency: Duration) {
        #[cfg(feature = "trace")]
        tracing::debug!(latency = ?_latency, "service.ready");
    }

    /// Handle worker errors related to a job
    fn on_worker_error(&self, _req: &JobRequest<Self>, _error: &WorkerError) {
        #[cfg(feature = "trace")]
        tracing::warn!(error =?_error, "storage.error");
    }
}

/// Job objects that can be reconstructed from the data stored in Storage.
///
/// Implemented for all `Deserialize` objects by default by relying on SerdeJson
/// decoding.
trait JobDecodable
where
    Self: Sized,
{
    /// Decode the given value into a message
    ///
    /// In the default implementation, the string value is decoded by assuming
    /// it was encoded through the Msgpack encoding.
    fn decode_job(value: &[u8]) -> Result<Self, JobError>;
}

/// Job objects that can be encoded to a string to be stored in Storage.
///
/// Implemented for all `Serialize` objects by default by encoding with Serde.
trait JobEncodable
where
    Self: Sized,
{
    /// Encode the value into a bytes array to be inserted into Storage.
    ///
    /// In the default implementation, the object is encoded with Serde.
    fn encode_job(&self) -> Result<Vec<u8>, JobError>;
}

// impl<T> JobDecodable for T
// where
//     T: DeserializeOwned,
// {
//     fn decode_job(value: &Vec<u8>) -> Result<T, JobError> {
//         Ok(serde_json::from_slice(value)?)
//     }
// }

// impl<T: Serialize> JobEncodable for T {
//     fn encode_job(&self) -> Result<Vec<u8>, JobError> {
//         Ok(serde_json::to_vec(self)?)
//     }
// }

/// Represents a Stream of jobs being consumed by a Worker
pub trait JobStream {
    /// The job result
    type Job: Job;
    /// Get the stream of jobs
    fn stream(&mut self, worker_id: String, interval: Duration) -> JobStreamResult<Self::Job>;
}

/// A serializable vesrion of a worker.
#[derive(Debug, Serialize, Deserialize)]
pub struct JobStreamWorker {
    /// The Worker's Id
    worker_id: String,
    /// Target for the worker, useful for display and filtering
    /// uses [std::any::type_name]
    job_type: String,
    // TODO: Add a Source type to Worker trait.
    /// The type of job stream
    source: String,
    /// The layers that were loaded for worker. uses [std::any::type_name]
    layers: String,
    /// The last time the worker was seen. [Storage] has keep alive.
    last_seen: DateTime<Utc>,
}

impl JobStreamWorker {
    /// Build a worker representation for serialization
    pub fn new<S, T>(worker_id: String, last_seen: DateTime<Utc>) -> Self
    where
        S: JobStream<Job = T>,
    {
        JobStreamWorker {
            worker_id,
            job_type: std::any::type_name::<T>().to_string(),
            source: std::any::type_name::<S>().to_string(),
            layers: String::new(),
            last_seen,
        }
    }
    /// Set layers
    pub fn set_layers(&mut self, layers: String) {
        self.layers = layers;
    }
}

/// Counts of different job states
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct Counts {
    /// Counts are flattened from a Hashmap of [JobState]
    #[serde(flatten)]
    pub inner: HashMap<JobState, i64>,
}

/// JobStream extension usually useful for management via cli, web etc
#[async_trait::async_trait]

pub trait JobStreamExt<Job>: JobStream<Job = Job>
where
    Self: Sized,
{
    /// List all Workers that are working on a Job Stream
    async fn list_workers(&mut self) -> Result<Vec<JobStreamWorker>, JobError>;

    /// Returns the counts of jobs in different states
    async fn counts(&mut self) -> Result<Counts, JobError> {
        Ok(Counts {
            ..Default::default()
        })
    }

    /// Fetch jobs persisted from storage
    async fn list_jobs(
        &mut self,
        status: &JobState,
        page: i32,
    ) -> Result<Vec<JobRequest<Job>>, JobError>;
}