1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
use crate::error::Error;
use crate::proto::{Client, Info, Job, Push, PushBulk, QueueAction, QueueControl};
use std::collections::HashMap;
use std::io::prelude::*;
use std::net::TcpStream;

#[cfg(feature = "ent")]
use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch};

/// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers.
///
/// # Connecting to Faktory
///
/// To issue jobs, the `Producer` must first be connected to the Faktory server. Exactly how you do
/// that depends on your setup. Faktory suggests using the `FAKTORY_PROVIDER` and `FAKTORY_URL`
/// environment variables (see their docs for more information) with `localhost:7419` as the
/// fallback default. If you want this behavior, pass `None` to
/// [`Producer::connect`](struct.Producer.html#method.connect). If not, you can supply the URL
/// directly to [`Producer::connect`](struct.Producer.html#method.connect) in the form:
///
/// ```text
/// protocol://[:password@]hostname[:port]
/// ```
///
///
/// # Issuing jobs
///
/// Most of the lifetime of a `Producer` will be spent creating and enqueueing jobs for Faktory
/// workers. This is done by passing a [`Job`](struct.Job.html) to
/// [`Producer::enqueue`](struct.Producer.html#method.enqueue). The most important part of a `Job`
/// is its `kind`; this field dictates how workers will execute the job when they receive it. The
/// string provided here must match a handler registered on the worker using
/// [`ConsumerBuilder::register`](struct.ConsumerBuilder.html#method.register) (or the equivalent
/// handler registration method in workers written in other languages).
///
/// Since Faktory workers do not all need to be the same (you could have some written in Rust for
/// performance-critical tasks, some in Ruby for more webby tasks, etc.), it may be the case that a
/// given job can only be executed by some workers (e.g., if they job type is not registered at
/// others). To allow for this, Faktory includes a `labels` field with each job. Jobs will only be
/// sent to workers whose labels (see
/// [`ConsumerBuilder::labels`](struct.ConsumerBuilder.html#method.labels)) match those set in
/// `Job::labels`.
///
/// # Examples
///
/// Connecting to an unsecured Faktory server using environment variables
///
/// ```no_run
/// use faktory::Producer;
/// let p = Producer::connect(None).unwrap();
/// ```
///
/// Connecting to a secured Faktory server using an explicit URL
///
/// ```no_run
/// use faktory::Producer;
/// let p = Producer::connect(Some("tcp://:hunter2@localhost:7439")).unwrap();
/// ```
///
/// Issuing a job using a `Producer`
///
/// ```no_run
/// # use faktory::Producer;
/// # let mut p = Producer::connect(None).unwrap();
/// use faktory::Job;
/// p.enqueue(Job::new("foobar", vec!["z"])).unwrap();
/// ```
///
// TODO: provide way of inspecting status of job.
pub struct Producer<S: Read + Write> {
    c: Client<S>,
}

impl Producer<TcpStream> {
    /// Connect to a Faktory server.
    ///
    /// If `url` is not given, will use the standard Faktory environment variables. Specifically,
    /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address
    /// from (defaults to `FAKTORY_URL`), and then that environment variable is read to get the
    /// server address. If the latter environment variable is not defined, the connection will be
    /// made to
    ///
    /// ```text
    /// tcp://localhost:7419
    /// ```
    ///
    /// If `url` is given, but does not specify a port, it defaults to 7419.
    pub fn connect(url: Option<&str>) -> Result<Self, Error> {
        let c = Client::connect(url)?;
        Ok(Producer { c })
    }
}

impl<S: Read + Write> Producer<S> {
    /// Connect to a Faktory server with a non-standard stream.
    pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Producer<S>, Error> {
        let c = Client::connect_with(stream, pwd)?;
        Ok(Producer { c })
    }

    /// Enqueue the given job on the Faktory server.
    ///
    /// Returns `Ok` if the job was successfully queued by the Faktory server.
    pub fn enqueue(&mut self, job: Job) -> Result<(), Error> {
        self.c.issue(&Push::from(job))?.await_ok()
    }

    /// Enqueue numerous jobs on the Faktory server.
    ///
    /// Provided you have numerous jobs to submit, using this method will be more efficient as compared
    /// to calling [`enqueue`](Producer::enqueue) multiple times.
    ///
    /// The returned `Ok` result will contain a tuple of enqueued jobs count and an option of a hash map
    /// with job ids mapped onto error messages. Therefore `Ok(n, None)` will indicate that all n jobs
    /// have been enqueued without errors.
    ///
    /// Note that this is not an all-or-nothing operation: jobs that contain errors will not be enqueued,
    /// while those that are error-free _will_ be enqueued by the Faktory server.
    pub fn enqueue_many<J>(
        &mut self,
        jobs: J,
    ) -> Result<(usize, Option<HashMap<String, String>>), Error>
    where
        J: IntoIterator<Item = Job>,
        J::IntoIter: ExactSizeIterator,
    {
        let jobs = jobs.into_iter();
        let jobs_count = jobs.len();
        let errors: HashMap<String, String> = self
            .c
            .issue(&PushBulk::from(jobs.collect::<Vec<_>>()))?
            .read_json()?
            .expect("Faktory server sends {} literal when there are no errors");
        if errors.is_empty() {
            return Ok((jobs_count, None));
        }
        Ok((jobs_count - errors.len(), Some(errors)))
    }

    /// Retrieve information about the running server.
    ///
    /// The returned value is the result of running the `INFO` command on the server.
    pub fn info(&mut self) -> Result<serde_json::Value, Error> {
        self.c
            .issue(&Info)?
            .read_json()
            .map(|v| v.expect("info command cannot give empty response"))
    }

    /// Pause the given queues.
    pub fn queue_pause<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<(), Error> {
        self.c
            .issue(&QueueControl::new(QueueAction::Pause, queues))?
            .await_ok()
    }

    /// Resume the given queues.
    pub fn queue_resume<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<(), Error> {
        self.c
            .issue(&QueueControl::new(QueueAction::Resume, queues))?
            .await_ok()
    }

    /// Initiate a new batch of jobs.
    #[cfg(feature = "ent")]
    #[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
    pub fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
        let bid = self.c.issue(&batch)?.read_bid()?;
        Ok(BatchHandle::new(bid, self))
    }

    /// Open an already existing batch of jobs.
    ///
    /// This will not error if a batch with the provided `bid` does not exist,
    /// rather `Ok(None)` will be returned.
    #[cfg(feature = "ent")]
    #[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
    pub fn open_batch(&mut self, bid: String) -> Result<Option<BatchHandle<'_, S>>, Error> {
        let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?;
        Ok(bid.map(|bid| BatchHandle::new(bid, self)))
    }

    #[cfg(feature = "ent")]
    pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> {
        self.c.issue(&CommitBatch::from(bid))?.await_ok()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    // https://github.com/rust-lang/rust/pull/42219
    //#[allow_fail]
    #[ignore]
    fn it_works() {
        let mut p = Producer::connect(None).unwrap();
        p.enqueue(Job::new("foobar", vec!["z"])).unwrap();
    }
}