1#![cfg_attr(feature = "nightly", feature(catch_panic))]
30extern crate disque;
31
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::sync::mpsc::{channel, Receiver, Sender};
35use std::thread::{spawn, JoinHandle};
36#[cfg(feature = "nightly")] use std::thread::catch_panic;
37
38use disque::Disque;
39
/// Outcome of processing a job, as reported by a `Handler`.
///
/// The event loop translates each variant into the matching Disque
/// acknowledgement command once the worker has finished.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Acknowledge the job with the client's `fastackjob` command.
    FastAck,
    /// Acknowledge the job with the client's `ackjob` command.
    AckJob,
    /// Negatively acknowledge the job with the client's `nackjob` command.
    NAck,
}
51
52pub trait Handler {
54 fn process_job(&self, queue_name: &[u8], jobid: &String, body: Vec<u8>) -> JobStatus;
56 fn process_error(&self, _: &[u8], _: &String, _: u32, _: u32) -> bool {
61 false
62 }
63}
64
65#[derive(Clone)]
67struct HandlerWrapper<H: Handler> {
68 handler: Arc<H>,
69}
70unsafe impl<H: Handler> Send for HandlerWrapper<H> {}
71unsafe impl<H: Handler> Sync for HandlerWrapper<H> {}
72
// Nightly flavour of `spawn!`: runs `$body` on a new thread and, if it
// panics, passes the value returned by `catch_panic` to `$on_panic` on that
// same thread.
#[cfg(feature = "nightly")]
macro_rules! spawn {
    ($body: expr, $on_panic: expr) => {
        spawn(move || {
            if let Err(cause) = catch_panic(move || $body) {
                ($on_panic)(cause)
            }
        })
    }
}
84
// Stable flavour of `spawn!`: runs `$body` on a new thread. Panics are not
// intercepted here, so `$on_panic` is accepted only to keep the call sites
// identical to the nightly flavour and is never expanded.
#[cfg(not(feature = "nightly"))]
macro_rules! spawn {
    ($body: expr, $on_panic: expr) => {
        spawn(move || $body)
    }
}
91
/// Message a worker thread sends back to the event loop.
// `spawn!` discards the panic callback on non-nightly builds, which can leave
// `Failure` unconstructed; hence the allowance.
#[allow(dead_code)]
enum JobUpdate {
    /// The worker at the given position finished the job with the given id
    /// and reported the given status.
    Success(usize, String, JobStatus),
    /// The worker thread at the given position died; the event loop replaces
    /// it with a fresh one.
    Failure(usize),
}
97
98#[allow(unused_variables)]
102fn create_worker<H: Handler + Clone + 'static>(position: usize,
103 task_rx: Receiver<Option<(Vec<u8>, String, Vec<u8>, u32, u32)>>,
104 completion_tx: Sender<JobUpdate>,
105 handler_: HandlerWrapper<H>,
106 ) -> JoinHandle<()> {
107 let handlerw = handler_.clone();
108 let completion_tx2 = completion_tx.clone();
109 spawn!({
110 let handler = handlerw.handler;
111 loop {
112 let (queue, jobid, job, nack,
113 additional_deliveries) = match task_rx.recv() {
114 Ok(o) => match o {
115 Some(v) => v,
116 None => break,
117 },
118 Err(e) => {
119 println!("Error in worker thread {:?}", e);
121 break;
122 }
123 };
124
125 if nack > 0 || additional_deliveries > 0 {
126 if !handler.process_error(&*queue, &jobid, nack,
127 additional_deliveries) {
128 return;
129 }
130 }
131 let status = handler.process_job(&*queue, &jobid, job);
132
133 completion_tx.send(JobUpdate::Success(position, jobid, status)).unwrap();
134 }
135 }, |e| {
136 println!("handle panic {:?}", e);
137 completion_tx2.send(JobUpdate::Failure(position)).unwrap();
138 })
139}
140
/// Fetches jobs from Disque and dispatches them to a pool of worker threads.
pub struct EventLoop<H: Handler + Clone + 'static> {
    /// Connection used to fetch and acknowledge jobs.
    disque: Disque,
    /// Join handle and task sender of every worker, indexed by worker position.
    workers: Vec<(JoinHandle<()>, Sender<Option<(Vec<u8>, String, Vec<u8>, u32, u32)>>)>,
    /// Receives the updates sent by the workers.
    completion_rx: Receiver<JobUpdate>,
    /// Sending half of the update channel; cloned for every worker that is
    /// created, including replacements.
    completion_tx: Sender<JobUpdate>,
    /// Positions of the workers that are not processing a job.
    free_workers: HashSet<usize>,
    /// Names of the queues jobs are fetched from.
    queues: HashSet<Vec<u8>>,
    /// Result of the `hello` call on the current connection. The second
    /// element is returned as the current node id; the entries of the third
    /// are read as `(node id, host, port, _)` when connecting to another node.
    hello: (u8, String, Vec<(String, String, u16, u32)>),
    /// Number of fetched jobs per producing node, keyed by bytes 2..10 of the
    /// job id.
    node_counter: HashMap<Vec<u8>, usize>,
    /// Handler shared with every worker.
    ahandler: HandlerWrapper<H>
}
165
166impl<H: Handler + Clone + 'static> EventLoop<H> {
167 pub fn new(
168 disque: Disque, numworkers: usize,
169 handler: H) -> Self {
170 let mut workers = Vec::with_capacity(numworkers);
171 let mut free_workers = HashSet::with_capacity(numworkers);
172 let (completion_tx, completion_rx) = channel();
173 let ahandler = HandlerWrapper { handler: Arc::new(handler) };
174 for i in 0..numworkers {
175 let (task_tx, task_rx) = channel();
176 let jg = create_worker(i, task_rx, completion_tx.clone(),
177 ahandler.clone());
178 workers.push((jg, task_tx));
179 free_workers.insert(i);
180 }
181 let hello = disque.hello().unwrap();
182 EventLoop {
183 disque: disque,
184 completion_rx: completion_rx,
185 workers: workers,
186 hello: hello,
187 free_workers: free_workers,
188 queues: HashSet::new(),
189 node_counter: HashMap::new(),
190 completion_tx: completion_tx,
191 ahandler: ahandler,
192 }
193 }
194
195 pub fn watch_queue(&mut self, queue_name: Vec<u8>) {
197 self.queues.insert(queue_name);
198 }
199
200 pub fn unwatch_queue(&mut self, queue_name: &Vec<u8>) {
202 self.queues.remove(queue_name);
203 }
204
205 fn completed(&mut self, worker: usize, jobid: String, status: JobStatus) {
207 self.free_workers.insert(worker);
208 match status {
209 JobStatus::FastAck => self.disque.fastackjob(jobid.as_bytes()),
210 JobStatus::AckJob => self.disque.ackjob(jobid.as_bytes()),
211 JobStatus::NAck => self.disque.nackjob(jobid.as_bytes()),
212 }.unwrap();
213 }
214
215 fn handle_worker_panic(&mut self, worker: usize) {
217 if self.workers.len() == 0 {
218 return;
220 }
221
222 let (task_tx, task_rx) = channel();
223 let jg = create_worker(worker, task_rx, self.completion_tx.clone(),
224 self.ahandler.clone());
225 self.workers[worker] = (jg, task_tx);
226 self.free_workers.insert(worker);
227 }
228
229 fn mark_completed(&mut self, blocking: bool) {
232 macro_rules! recv {
233 ($func: ident) => {
234 match self.completion_rx.$func() {
235 Ok(c) => match c {
236 JobUpdate::Success(worker, jobid, status) => {
237 self.completed(worker, jobid, status);
238 },
239 JobUpdate::Failure(worker) => self.handle_worker_panic(worker),
240 },
241 Err(_) => return,
242 }
243 }
244 }
245
246 if blocking {
247 recv!(recv);
248 }
249 loop {
250 recv!(try_recv);
251 }
252 }
253
254 pub fn choose_favorite_node(&self) -> (Vec<u8>, usize) {
256 let mut r = (&Vec::new(), &0);
257 for n in self.node_counter.iter() {
258 if n.1 >= r.1 {
259 r = n;
260 }
261 }
262 (r.0.clone(), r.1.clone())
263 }
264
265 pub fn jobcount_current_node(&self) -> usize {
267 let nodeid = self.hello.1.as_bytes()[0..8].to_vec();
268 self.node_counter.get(&nodeid).unwrap_or(&0).clone()
269 }
270
271 pub fn current_node_id(&self) -> String {
273 self.hello.1.clone()
274 }
275
276 fn run_once(&mut self) -> bool {
279 self.mark_completed(false);
280 let worker = match self.free_workers.iter().next() {
281 Some(w) => w.clone(),
282 None => return false,
283 };
284
285 let job = match self.disque.getjob_withcounters(false, None,
286 &*self.queues.iter().map(|k| &**k).collect::<Vec<_>>()
287 ).unwrap() {
288 Some(j) => j,
289 None => return false,
290 };
291
292 let nodeid = job.1.as_bytes()[2..10].to_vec();
293 let v = self.node_counter.remove(&nodeid).unwrap_or(0);
294 self.node_counter.insert(nodeid, v + 1);
295
296 self.free_workers.remove(&worker);
297 self.workers[worker].1.send(Some(job)).unwrap();
298 true
299 }
300
301 fn connect_to_node(&mut self, new_master: Vec<u8>) -> bool {
303 let mut hello = None;
304 for node in self.hello.2.iter() {
305 if node.0.as_bytes()[..new_master.len()] == *new_master {
306 match Disque::open(&*format!("redis://{}:{}/", node.1, node.2)) {
307 Ok(disque) => {
308 hello = Some(match disque.hello() {
309 Ok(hello) => hello,
310 Err(_) => break,
311 });
312 self.disque = disque;
313 break;
314 },
315 Err(_) => (),
316 }
317 break;
318 }
319 }
320 match hello {
321 Some(h) => { self.hello = h; true }
322 None => false,
323 }
324 }
325
326 pub fn do_cycle(&mut self) {
328 let (fav_node, fav_count) = self.choose_favorite_node();
329 let current_count = self.jobcount_current_node();
330 if fav_count as f64 / current_count as f64 > 1.2 {
332 self.connect_to_node(fav_node);
333 }
334 }
335
336 pub fn run(&mut self, cycle: usize) {
338 self.run_times_cycle(0, cycle)
339 }
340
341 pub fn run_times(&mut self, times: usize) {
343 self.run_times_cycle(times, 0)
344 }
345
346 pub fn run_times_cycle(&mut self, times: usize, cycle: usize) {
348 let mut c = 0;
349 let mut counter = 0;
350 loop {
351 let did_run = self.run_once();
352 if did_run {
353 if times > 0 {
354 counter += 1;
355 if counter == times {
356 break;
357 }
358 }
359 if cycle > 0 {
360 c += 1;
361 if c == cycle {
362 self.do_cycle();
363 c = 0;
364 }
365 }
366 } else {
367 self.mark_completed(true);
368 }
369 }
370 self.mark_completed(false);
371 }
372
373 pub fn stop(mut self) {
376 for worker in std::mem::replace(&mut self.workers, vec![]).into_iter() {
377 worker.1.send(None).unwrap();
378 worker.0.join().unwrap();
379 }
380 self.mark_completed(false);
381 }
382}
383
// Checks that `choose_favorite_node` returns the node with the highest job
// count. Requires a Disque server listening on 127.0.0.1:7711.
#[test]
fn favorite() {
    #[derive(Clone)]
    struct MyHandler;
    impl Handler for MyHandler {
        fn process_job(&self, _queue: &[u8], _jobid: &String, _body: Vec<u8>) -> JobStatus {
            JobStatus::AckJob
        }
        fn process_error(&self, _queue: &[u8], _jobid: &String, _nack: u32,
                         _additional_deliveries: u32) -> bool {
            false
        }
    }

    let connection = Disque::open("redis://127.0.0.1:7711/").unwrap();
    let mut event_loop = EventLoop::new(connection, 1, MyHandler);
    let counts = vec![
        (vec![1, 2, 3], 123),
        (vec![4, 5, 6], 456),
        (vec![0, 0, 0], 0),
    ];
    for (node, count) in counts {
        event_loop.node_counter.insert(node, count);
    }

    let expected = (vec![4, 5, 6], 456);
    assert_eq!(event_loop.choose_favorite_node(), expected);
}