disqrust/lib.rs

//! A high-level library to implement Disque workers.
//!
//! ```rust,no_run
//! extern crate disque;
//! extern crate disqrust;
//!
//! use disque::Disque;
//! use disqrust::{EventLoop, Handler, JobStatus};
//!
//! #[derive(Clone)]
//! struct MyHandler(u8);
//!
//! impl Handler for MyHandler {
//!     fn process_job(&self, queue_name: &[u8], jobid: &String, body: Vec<u8>) -> JobStatus {
//!         match queue_name {
//!             b"send email" => { /* send email */ JobStatus::AckJob },
//!             _ => JobStatus::NAck,
//!         }
//!     }
//! }
//!
//! fn main() {
//!     let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
//!     let mut el = EventLoop::new(disque, 4, MyHandler(0));
//!     el.watch_queue(b"my queue".to_vec());
//!     el.run(1000);
//! }
//! ```
#![cfg_attr(feature = "nightly", feature(catch_panic))]
extern crate disque;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
#[cfg(feature = "nightly")] use std::thread::catch_panic;

use disque::Disque;

/// Once a job execution finishes, change its status by performing one of
/// these actions.
#[derive(Clone)]
pub enum JobStatus {
    /// Performs a best-effort, cluster-wide deletion of the job.
    FastAck,
    /// Acknowledges the execution of the job.
    AckJob,
    /// Puts the job back in the queue as soon as possible.
    NAck,
}

/// Handles a job task.
pub trait Handler {
    /// Processes a job.
    fn process_job(&self, queue_name: &[u8], jobid: &String, body: Vec<u8>) -> JobStatus;
    /// Decides whether a job that failed in the past should be re-executed.
    /// `nack` is the number of negative acknowledgements the job received.
    /// `additional_deliveries` is the number of times the job was delivered
    /// but not acknowledged.
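    ///
    /// For example, a handler might re-execute a failed job a bounded number
    /// of times (a minimal sketch; the threshold of three is arbitrary):
    ///
    /// ```rust
    /// use disqrust::{Handler, JobStatus};
    ///
    /// #[derive(Clone)]
    /// struct RetryHandler;
    ///
    /// impl Handler for RetryHandler {
    ///     fn process_job(&self, _: &[u8], _: &String, _: Vec<u8>) -> JobStatus {
    ///         JobStatus::AckJob
    ///     }
    ///
    ///     // Re-execute a failed job only while it has failed fewer than
    ///     // three times in total.
    ///     fn process_error(&self, _: &[u8], _: &String, nack: u32,
    ///             additional_deliveries: u32) -> bool {
    ///         nack + additional_deliveries < 3
    ///     }
    /// }
    /// ```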
    fn process_error(&self, _: &[u8], _: &String, _: u32, _: u32) -> bool {
        false
    }
}

/// A wrapper to send the handler to each worker thread.
#[derive(Clone)]
struct HandlerWrapper<H: Handler> {
    handler: Arc<H>,
}
// These impls assert that the wrapped handler is safe to share across worker
// threads; a handler must not rely on thread-unsafe state for this to be
// sound.
unsafe impl<H: Handler> Send for HandlerWrapper<H> {}
unsafe impl<H: Handler> Sync for HandlerWrapper<H> {}

// Spawns a worker thread. On nightly, a panic inside `$func` is caught and
// handed to `$err`; on stable there is no panic recovery.
#[cfg(feature = "nightly")]
macro_rules! spawn {
    ($func: expr, $err: expr) => {
        spawn(move || {
            match catch_panic(move || $func) {
                Ok(_) => (),
                Err(e) => ($err)(e),
            }
        })
    }
}

#[cfg(not(feature = "nightly"))]
macro_rules! spawn {
    ($func: expr, $err: expr) => {
        spawn(move || $func)
    }
}

#[allow(dead_code)]
enum JobUpdate {
    /// The worker finished the job with the given status.
    Success(usize, String, JobStatus),
    /// The worker panicked and needs to be replaced.
    Failure(usize),
}

/// Creates a worker to handle tasks coming from `task_rx`, reporting them back
/// to `completion_tx` using the provided `handler_`. The `position` is the
/// worker id.
#[allow(unused_variables)]
fn create_worker<H: Handler + Clone + 'static>(position: usize,
        task_rx: Receiver<Option<(Vec<u8>, String, Vec<u8>, u32, u32)>>,
        completion_tx: Sender<JobUpdate>,
        handler_: HandlerWrapper<H>,
        ) -> JoinHandle<()> {
    let handlerw = handler_.clone();
    let completion_tx2 = completion_tx.clone();
    spawn!({
        let handler = handlerw.handler;
        loop {
            let (queue, jobid, job, nack,
                additional_deliveries) = match task_rx.recv() {
                Ok(o) => match o {
                    Some(v) => v,
                    // `None` is the shutdown signal sent by `EventLoop::stop`.
                    None => break,
                },
                Err(e) => {
                    // TODO: better log
                    println!("Error in worker thread {:?}", e);
                    break;
                }
            };

            if nack > 0 || additional_deliveries > 0 {
                if !handler.process_error(&*queue, &jobid, nack,
                    additional_deliveries) {
                    // The handler decided not to re-execute the job;
                    // acknowledge it so it is not delivered again, and keep
                    // this worker alive for further jobs.
                    completion_tx.send(JobUpdate::Success(position, jobid,
                        JobStatus::AckJob)).unwrap();
                    continue;
                }
            }
            let status = handler.process_job(&*queue, &jobid, job);

            completion_tx.send(JobUpdate::Success(position, jobid, status)).unwrap();
        }
    }, |e| {
        println!("handle panic {:?}", e);
        completion_tx2.send(JobUpdate::Failure(position)).unwrap();
    })
}

/// Workers manager.
pub struct EventLoop<H: Handler + Clone + 'static> {
    /// The connection used to pull jobs.
    disque: Disque,
    /// The worker threads and the channels used to send them jobs.
    workers: Vec<(JoinHandle<()>, Sender<Option<(Vec<u8>, String, Vec<u8>, u32, u32)>>)>,
    /// The receiver for completed task updates.
    completion_rx: Receiver<JobUpdate>,
    /// The sender for completed task updates. A reference is kept only to
    /// hand out to new workers.
    completion_tx: Sender<JobUpdate>,
    /// Set of available workers.
    free_workers: HashSet<usize>,
    /// Watched queue names.
    queues: HashSet<Vec<u8>>,
    /// Server network layout.
    hello: (u8, String, Vec<(String, String, u16, u32)>),
    /// Counts how many jobs each node has produced. If most jobs come from a
    /// node other than the one we are connected to, the loop can reconnect
    /// directly to that node.
    node_counter: HashMap<Vec<u8>, usize>,
    /// The task processor.
    ahandler: HandlerWrapper<H>,
}

impl<H: Handler + Clone + 'static> EventLoop<H> {
    pub fn new(
            disque: Disque, numworkers: usize,
            handler: H) -> Self {
        let mut workers = Vec::with_capacity(numworkers);
        let mut free_workers = HashSet::with_capacity(numworkers);
        let (completion_tx, completion_rx) = channel();
        let ahandler = HandlerWrapper { handler: Arc::new(handler) };
        for i in 0..numworkers {
            let (task_tx, task_rx) = channel();
            let jg = create_worker(i, task_rx, completion_tx.clone(),
                    ahandler.clone());
            workers.push((jg, task_tx));
            free_workers.insert(i);
        }
        let hello = disque.hello().unwrap();
        EventLoop {
            disque: disque,
            completion_rx: completion_rx,
            workers: workers,
            hello: hello,
            free_workers: free_workers,
            queues: HashSet::new(),
            node_counter: HashMap::new(),
            completion_tx: completion_tx,
            ahandler: ahandler,
        }
    }

    /// Adds a queue whose jobs will be processed.
    pub fn watch_queue(&mut self, queue_name: Vec<u8>) {
        self.queues.insert(queue_name);
    }

    /// Removes a queue from job processing.
    pub fn unwatch_queue(&mut self, queue_name: &Vec<u8>) {
        self.queues.remove(queue_name);
    }

    /// Marks a job as completed, reporting its status back to the server and
    /// freeing the worker.
    fn completed(&mut self, worker: usize, jobid: String, status: JobStatus) {
        self.free_workers.insert(worker);
        match status {
            JobStatus::FastAck => self.disque.fastackjob(jobid.as_bytes()),
            JobStatus::AckJob => self.disque.ackjob(jobid.as_bytes()),
            JobStatus::NAck => self.disque.nackjob(jobid.as_bytes()),
        }.unwrap();
    }

    /// Creates a new worker to replace one that has panicked.
    fn handle_worker_panic(&mut self, worker: usize) {
        if self.workers.is_empty() {
            // shutting down
            return;
        }

        let (task_tx, task_rx) = channel();
        let jg = create_worker(worker, task_rx, self.completion_tx.clone(),
                self.ahandler.clone());
        self.workers[worker] = (jg, task_tx);
        self.free_workers.insert(worker);
    }

    /// Checks workers to see if they have completed their jobs.
    /// If `blocking`, waits until at least one update arrives.
    fn mark_completed(&mut self, blocking: bool) {
        macro_rules! recv {
            ($func: ident) => {
                match self.completion_rx.$func() {
                    Ok(c) => match c {
                        JobUpdate::Success(worker, jobid, status) => {
                            self.completed(worker, jobid, status);
                        },
                        JobUpdate::Failure(worker) => self.handle_worker_panic(worker),
                    },
                    Err(_) => return,
                }
            }
        }

        if blocking {
            recv!(recv);
        }
        // Drain any remaining updates without blocking.
        loop {
            recv!(try_recv);
        }
    }

    /// Returns the node that issued the most jobs, along with its job count.
    pub fn choose_favorite_node(&self) -> (Vec<u8>, usize) {
        let mut r = (&Vec::new(), &0);
        for n in self.node_counter.iter() {
            if n.1 >= r.1 {
                r = n;
            }
        }
        (r.0.clone(), *r.1)
    }

    /// Number of jobs produced by the current server.
    pub fn jobcount_current_node(&self) -> usize {
        // Jobs are counted under the first eight bytes of each node id.
        let nodeid = self.hello.1.as_bytes()[0..8].to_vec();
        *self.node_counter.get(&nodeid).unwrap_or(&0)
    }

    /// Identifier of the current server.
    pub fn current_node_id(&self) -> String {
        self.hello.1.clone()
    }

    /// Fetches a task and sends it to a worker.
    /// Returns `true` if a job was received and dispatched for processing.
    fn run_once(&mut self) -> bool {
        self.mark_completed(false);
        let worker = match self.free_workers.iter().next() {
            Some(w) => *w,
            None => return false,
        };

        let job = match self.disque.getjob_withcounters(false, None,
                &*self.queues.iter().map(|k| &**k).collect::<Vec<_>>()
                ).unwrap() {
            Some(j) => j,
            None => return false,
        };

        // Bytes 2..10 of the job id identify the node that created the job
        // (skipping the "DI" prefix).
        let nodeid = job.1.as_bytes()[2..10].to_vec();
        *self.node_counter.entry(nodeid).or_insert(0) += 1;

        self.free_workers.remove(&worker);
        self.workers[worker].1.send(Some(job)).unwrap();
        true
    }

    /// Connects to a new server.
    fn connect_to_node(&mut self, new_master: Vec<u8>) -> bool {
        let mut hello = None;
        for node in self.hello.2.iter() {
            if node.0.as_bytes()[..new_master.len()] == *new_master {
                match Disque::open(&*format!("redis://{}:{}/", node.1, node.2)) {
                    Ok(disque) => {
                        hello = Some(match disque.hello() {
                            Ok(hello) => hello,
                            Err(_) => break,
                        });
                        self.disque = disque;
                        break;
                    },
                    Err(_) => (),
                }
                // The matching node was found; stop searching even if the
                // connection attempt failed.
                break;
            }
        }
        match hello {
            Some(h) => { self.hello = h; true }
            None => false,
        }
    }

    /// Connects to the server doing most of the jobs, if it is significantly
    /// busier than the current one.
    pub fn do_cycle(&mut self) {
        let (fav_node, fav_count) = self.choose_favorite_node();
        let current_count = self.jobcount_current_node();
        // Only change if the favorite node is at least 20% better than the
        // current one (if the current node produced no jobs, any node that
        // did wins).
        if fav_count as f64 / current_count as f64 > 1.2 {
            self.connect_to_node(fav_node);
        }
    }

    /// Runs forever. Every `cycle` jobs, it reevaluates which server to use.
    pub fn run(&mut self, cycle: usize) {
        self.run_times_cycle(0, cycle)
    }

    /// Runs until `times` jobs are received.
    pub fn run_times(&mut self, times: usize) {
        self.run_times_cycle(times, 0)
    }

    /// Runs until `times` jobs are received, changing server every `cycle`
    /// jobs. A `times` of zero runs forever; a `cycle` of zero never changes
    /// server.
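    ///
    /// A minimal sketch (assuming `el` is an `EventLoop` built as in the
    /// crate-level example): process 1,000 jobs, reconsidering the server
    /// every 100.
    ///
    /// ```rust,ignore
    /// el.run_times_cycle(1000, 100);
    /// ```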
    pub fn run_times_cycle(&mut self, times: usize, cycle: usize) {
        let mut c = 0;
        let mut counter = 0;
        loop {
            let did_run = self.run_once();
            if did_run {
                if times > 0 {
                    counter += 1;
                    if counter == times {
                        break;
                    }
                }
                if cycle > 0 {
                    c += 1;
                    if c == cycle {
                        self.do_cycle();
                        c = 0;
                    }
                }
            } else {
                // No job was fetched or no worker was free; wait for a
                // completion update before trying again.
                self.mark_completed(true);
            }
        }
        self.mark_completed(false);
    }

    /// Sends a shutdown signal to all workers and waits for them to finish
    /// their current job.
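    ///
    /// A minimal sketch, assuming a Disque server listening on 127.0.0.1:7711
    /// (the `NoopHandler` type here is only for illustration):
    ///
    /// ```rust,no_run
    /// extern crate disque;
    /// extern crate disqrust;
    ///
    /// use disque::Disque;
    /// use disqrust::{EventLoop, Handler, JobStatus};
    ///
    /// #[derive(Clone)]
    /// struct NoopHandler;
    ///
    /// impl Handler for NoopHandler {
    ///     fn process_job(&self, _: &[u8], _: &String, _: Vec<u8>) -> JobStatus {
    ///         JobStatus::AckJob
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    ///     let mut el = EventLoop::new(disque, 4, NoopHandler);
    ///     el.watch_queue(b"my queue".to_vec());
    ///     el.run_times(100);
    ///     el.stop(); // waits for in-flight jobs to finish
    /// }
    /// ```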
    pub fn stop(mut self) {
        for worker in std::mem::replace(&mut self.workers, vec![]).into_iter() {
            worker.1.send(None).unwrap();
            worker.0.join().unwrap();
        }
        self.mark_completed(false);
    }
}

#[test]
fn favorite() {
    #[derive(Clone)]
    struct MyHandler;
    impl Handler for MyHandler {
        fn process_job(&self, _: &[u8], _: &String, _: Vec<u8>) -> JobStatus {
            JobStatus::AckJob
        }
        fn process_error(&self, _: &[u8], _: &String, _: u32, _: u32) -> bool {
            false
        }
    }
    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    let mut el = EventLoop::new(disque, 1, MyHandler);
    el.node_counter.insert(vec![1, 2, 3], 123);
    el.node_counter.insert(vec![4, 5, 6], 456);
    el.node_counter.insert(vec![0, 0, 0], 0);
    assert_eq!(el.choose_favorite_node(), (vec![4, 5, 6], 456));
}