1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
use std::collections::HashMap;
use std::time::Duration;

use disque::Disque;
use redis::{RedisResult, Iter, Value};

/// Helper to add a new job
///
/// # Examples
///
/// ```
/// # use disque::Disque;
/// # use disque::AddJobBuilder;
///
/// let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
/// let jobid = AddJobBuilder::new(b"example queue", b"my job", 10000
///     ).delay(1440).run(&disque).unwrap();
/// ```
pub struct AddJobBuilder<'a> {
    queue_name: &'a [u8],
    job: &'a [u8],
    timeout: Duration,
    replicate: Option<usize>,
    delay: Option<Duration>,
    retry: Option<Duration>,
    ttl: Option<Duration>,
    maxlen: Option<usize>,
    async: bool,
}

impl<'a> AddJobBuilder<'a> {
    /// Creates a new builder for `queue_name`.
    /// Timeout is specified in milliseconds.
    pub fn new(queue_name: &'a [u8], job: &'a [u8], timeout_ms: u64
            ) -> AddJobBuilder<'a> {
        AddJobBuilder {
            queue_name: queue_name,
            job: job,
            timeout: Duration::from_millis(timeout_ms),
            replicate: None,
            delay: None,
            retry: None,
            ttl: None,
            maxlen: None,
            async: false,
        }
    }

    /// Changes the queue name where the job will be added.
    pub fn queue_name(&mut self, queue_name: &'a [u8]) -> &mut Self {
        self.queue_name = queue_name; self
    }

    /// Changes the job body.
    pub fn job(&mut self, job: &'a [u8]) -> &mut Self {
        self.job = job; self
    }

    /// Changes the timeout. It must be specified in milliseconds.
    pub fn timeout(&mut self, timeout_ms: u64) -> &mut Self {
        self.timeout = Duration::from_millis(timeout_ms); self
    }

    /// The number of nodes the job should be replicated to.
    pub fn replicate(&mut self, replicate: usize) -> &mut Self {
        self.replicate = Some(replicate); self
    }

    /// The number of seconds that should elapse before the job is queued.
    pub fn delay(&mut self, delay: u64) -> &mut Self {
        self.delay = Some(Duration::from_secs(delay)); self
    }

    /// Period after which, if no ACK is received, the job is put again
    /// into the queue for delivery
    pub fn retry(&mut self, retry: u64) -> &mut Self {
        self.retry = Some(Duration::from_secs(retry)); self
    }

    /// The max job life in seconds. After this time, the job is deleted even
    /// if it was not successfully delivered.
    pub fn ttl(&mut self, ttl: u64) -> &mut Self {
        self.ttl = Some(Duration::from_secs(ttl)); self
    }

    /// If there are already count messages queued for the specified queue
    /// name, the message is refused and an error reported to the client.
    pub fn maxlen(&mut self, maxlen: usize) -> &mut Self {
        self.maxlen = Some(maxlen); self
    }

    /// If true, asks the server to let the command return ASAP and replicate
    /// the job to other nodes in the background.
    /// Otherwise, the job is put into the queue only when the client gets a
    /// positive reply.
    pub fn async(&mut self, async: bool) -> &mut Self {
        self.async = async; self
    }

    /// Executes the addjob command.
    pub fn run(&self, disque: &Disque) -> RedisResult<String> {
        disque.addjob(self.queue_name, self.job, self.timeout, self.replicate,
                self.delay, self.retry, self.ttl, self.maxlen, self.async)
    }
}

#[test]
fn add_job_builder() {
    let mut jb = AddJobBuilder::new(b"queue", b"job", 123);
    assert_eq!(jb.queue_name, b"queue");
    assert_eq!(jb.job, b"job");
    assert_eq!(jb.timeout, Duration::from_millis(123));
    assert_eq!(jb.replicate, None);
    assert_eq!(jb.delay, None);
    assert_eq!(jb.retry, None);
    assert_eq!(jb.ttl, None);
    assert_eq!(jb.maxlen, None);
    assert_eq!(jb.async, false);
    jb.replicate(3).delay(4).retry(5).ttl(6).maxlen(7).async(true);
    assert_eq!(jb.queue_name, b"queue");
    assert_eq!(jb.job, b"job");
    assert_eq!(jb.timeout, Duration::from_millis(123));
    assert_eq!(jb.replicate, Some(3));
    assert_eq!(jb.delay, Some(Duration::from_secs(4)));
    assert_eq!(jb.retry, Some(Duration::from_secs(5)));
    assert_eq!(jb.ttl, Some(Duration::from_secs(6)));
    assert_eq!(jb.maxlen, Some(7));
    assert_eq!(jb.async, true);
    jb.queue_name(b"my queue").job(b"my job").timeout(234);
    assert_eq!(jb.queue_name, b"my queue");
    assert_eq!(jb.job, b"my job");
    assert_eq!(jb.timeout, Duration::from_millis(234));
    assert_eq!(jb.replicate, Some(3));
    assert_eq!(jb.delay, Some(Duration::from_secs(4)));
    assert_eq!(jb.retry, Some(Duration::from_secs(5)));
    assert_eq!(jb.ttl, Some(Duration::from_secs(6)));
    assert_eq!(jb.maxlen, Some(7));
    assert_eq!(jb.async, true);
}

/// Helper to get a list of queues
///
/// # Examples
///
/// ```
/// # use disque::Disque;
/// # use disque::QueueQueryBuilder;
///
/// let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
/// let queues = QueueQueryBuilder::new().busyloop(true).minlen(50)
///     .iter(&disque).unwrap().collect::<Vec<_>>();
/// assert!(queues.len() >= 0);
/// ```
pub struct QueueQueryBuilder {
    count: u64,
    busyloop: bool,
    minlen: Option<u64>,
    maxlen: Option<u64>,
    importrate: Option<u64>,
}

impl QueueQueryBuilder {
    /// Creates a new builder.
    pub fn new() -> QueueQueryBuilder {
        QueueQueryBuilder {
            count: 16,
            busyloop: false,
            minlen: None,
            maxlen: None,
            importrate: None,
        }
    }

    /// A hint about how much work to do per iteration.
    pub fn count(&mut self, count: u64) -> &mut Self {
        self.count = count; self
    }

    /// If true, blocks and returns all the queues in a busy loop.
    pub fn busyloop(&mut self, busyloop: bool) -> &mut Self {
        self.busyloop = busyloop; self
    }

    /// Only return queues with at least `minlen` jobs.
    pub fn minlen(&mut self, minlen: u64) -> &mut Self {
        self.minlen = Some(minlen); self
    }

    /// Only return queues with at most `maxlen` jobs.
    pub fn maxlen(&mut self, maxlen: u64) -> &mut Self {
        self.maxlen = Some(maxlen); self
    }

    /// Only return queues with a job import rate (from other nodes) greater
    /// than or equal to  `importrate`.
    pub fn importrate(&mut self, importrate: u64) -> &mut Self {
        self.importrate = Some(importrate); self
    }

    /// Gets the queue iterator.
    pub fn iter<'a>(&'a self, disque: &'a Disque) -> RedisResult<Iter<Vec<u8>>> {
        disque.qscan(0, self.count, self.busyloop, self.minlen, self.maxlen,
                self.importrate)
    }
}

#[test]
fn queue_query_builder() {
    let mut qqb = QueueQueryBuilder::new();
    assert_eq!(qqb.count, 16);
    assert_eq!(qqb.busyloop, false);
    assert_eq!(qqb.minlen, None);
    assert_eq!(qqb.maxlen, None);
    assert_eq!(qqb.importrate, None);
    qqb.count(11).busyloop(true).minlen(1).maxlen(10).importrate(5);
    assert_eq!(qqb.count, 11);
    assert_eq!(qqb.busyloop, true);
    assert_eq!(qqb.minlen, Some(1));
    assert_eq!(qqb.maxlen, Some(10));
    assert_eq!(qqb.importrate, Some(5));
}

/// Helper to get a list of jobs
///
/// # Examples
///
/// ```
/// # use disque::Disque;
/// # use disque::JobQueryBuilder;
///
/// let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
/// let jobs = JobQueryBuilder::new().queue(b"my queue").state("queued")
///     .iter_ids(&disque).unwrap().collect::<Vec<_>>();
/// assert!(jobs.len() >= 0);
/// ```
pub struct JobQueryBuilder<'a> {
    count: u64,
    busyloop: bool,
    queue: Option<&'a [u8]>,
    states: Vec<&'a str>,
}

impl<'a> JobQueryBuilder<'a> {
    pub fn new() -> JobQueryBuilder<'a> {
        JobQueryBuilder {
            count: 16,
            busyloop: false,
            queue: None,
            states: vec![],
        }
    }

    /// A hint about how much work to do per iteration.
    pub fn count(&mut self, count: u64) -> &mut Self {
        self.count = count; self
    }

    /// Block and return all the elements in a busy loop.
    pub fn busyloop(&mut self, busyloop: bool) -> &mut Self {
        self.busyloop = busyloop; self
    }

    /// Only get jobs in the specified queue.
    pub fn queue(&mut self, queue: &'a [u8]) -> &mut Self {
        self.queue = Some(queue); self
    }

    /// Only return jobs in specified states. Can be called multiple times to
    /// get jobs in any of those states.
    pub fn state(&mut self, state: &'a str) -> &mut Self {
        self.states.push(state); self
    }

    /// Gets a job ids iterator.
    pub fn iter_ids<'b>(&'b self, disque: &'b Disque
            ) -> RedisResult<Iter<String>> {
        disque.jscan_id(0, self.count, self.busyloop, self.queue,
                &*self.states)
    }

    /// Gets a job information iterator.
    pub fn iter_all<'b>(&'b self, disque: &'b Disque
            ) -> RedisResult<Iter<HashMap<String, Value>>> {
        disque.jscan_all(0, self.count, self.busyloop, self.queue,
                &*self.states)
    }
}

#[test]
fn job_query_builder() {
    let mut jqb = JobQueryBuilder::new();
    assert_eq!(jqb.count, 16);
    assert_eq!(jqb.busyloop, false);
    assert_eq!(jqb.queue, None);
    assert_eq!(jqb.states.len(), 0);
    jqb.count(20).busyloop(true).queue(b"jqb queue").state("state1"
            ).state("state2");
    assert_eq!(jqb.count, 20);
    assert_eq!(jqb.busyloop, true);
    assert_eq!(jqb.queue, Some(b"jqb queue" as &[u8]));
    assert_eq!(jqb.states, vec!["state1", "state2"]);
}