sidekiq/
server.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use redis::{Pipeline, PipelineCommands};
5use r2d2::{Pool, Config};
6use r2d2_redis::RedisConnectionManager;
7
8use rand::Rng;
9
10use threadpool::ThreadPool;
11
12use chan::{sync, after, tick, Receiver, Sender};
13use chan_signal::{Signal as SysSignal, notify};
14
15use libc::getpid;
16
17use chrono::UTC;
18
19use serde_json::to_string;
20
21use worker::SidekiqWorker;
22use errors::*;
23use utils::rust_gethostname;
24use middleware::MiddleWare;
25use job_handler::JobHandler;
26use RedisPool;
27
/// Status events received by the server's controller loop over the channel
/// whose sending half is handed to every worker.
///
/// The `String` in each variant is the id of the worker the event refers to;
/// it is the key used in the server's worker bookkeeping map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Signal {
    /// The worker is idle again; the count is added to the "processed" statistics.
    Complete(String, usize),
    /// The worker is idle again; the count is added to the "failed" statistics.
    Fail(String, usize),
    /// The worker is now busy.
    Acquire(String),
    /// The worker has exited and is dropped from the bookkeeping map.
    Terminated(String),
}
35
/// Commands the server sends to its workers over the operation channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    /// Sent once per worker slot when the server shuts down.
    Terminate,
}
39
/// Server that owns the worker thread pool, the Redis connection pool and the
/// controller loop (`start`) that supervises the workers.
pub struct SidekiqServer<'a> {
    /// Redis connection pool; cloned into every worker and used for reporting.
    redispool: RedisPool,
    /// Thread pool on which the workers run.
    threadpool: ThreadPool,
    /// Optional key prefix; when non-empty, keys are written as `namespace:key`.
    pub namespace: String,
    /// Job handlers by name; each worker receives its own `cloned()` copies.
    job_handlers: BTreeMap<String, Box<JobHandler + 'a>>,
    /// Middlewares; each worker receives its own `cloned()` copies.
    middlewares: Vec<Box<MiddleWare + 'a>>,
    /// Queue names, index-aligned with `weights`.
    queues: Vec<String>,
    /// Queue weights, index-aligned with `queues`.
    weights: Vec<f64>,
    /// Creation time as fractional seconds (Unix timestamp plus microseconds).
    started_at: f64,
    /// Random 12-character suffix that makes `identity()` unique per server.
    rs: String,
    /// Process id of this server.
    pid: usize,
    /// Receiver for the OS signals (INT, USR1) subscribed to in `new`.
    signal_chan: Receiver<SysSignal>,
    /// Known workers by id; the value is `true` while the worker is busy.
    worker_info: BTreeMap<String, bool>,
    /// Number of workers to keep running.
    concurrency: usize,
    /// Seconds to wait for workers during a forced shutdown
    /// (the name is spelled "quite" in the public API).
    pub force_quite_timeout: usize,
}
56
57impl<'a> SidekiqServer<'a> {
58    // Interfaces to be exposed
59
60    pub fn new(redis: &str, concurrency: usize) -> Result<Self> {
61        let signal = notify(&[SysSignal::INT, SysSignal::USR1]); // should be here to set proper signal mask to all threads
62        let now = UTC::now();
63        let config = Config::builder()
64            .pool_size(concurrency as u32 + 3) // dunno why, it corrupt for unable to get connection sometimes with concurrency + 1
65            .build();
66        let manager = try!(RedisConnectionManager::new(redis));
67        let pool = try!(Pool::new(config, manager));
68        Ok(SidekiqServer {
69            redispool: pool,
70            threadpool: ThreadPool::new_with_name("worker".into(), concurrency),
71            namespace: String::new(),
72            job_handlers: BTreeMap::new(),
73            queues: vec![],
74            weights: vec![],
75            started_at: now.timestamp() as f64 + now.timestamp_subsec_micros() as f64 / 1000000f64,
76            pid: unsafe { getpid() } as usize,
77            worker_info: BTreeMap::new(),
78            concurrency: concurrency,
79            signal_chan: signal,
80            force_quite_timeout: 10,
81            middlewares: vec![],
82            // random itentity
83            rs: ::rand::thread_rng().gen_ascii_chars().take(12).collect(),
84        })
85    }
86
87    pub fn new_queue(&mut self, name: &str, weight: usize) {
88        self.queues.push(name.into());
89        self.weights.push(weight as f64);
90    }
91
92    pub fn attach_handler<T: JobHandler + 'a>(&mut self, name: &str, handle: T) {
93        self.job_handlers.insert(name.into(), Box::new(handle));
94    }
95
96    pub fn attach_middleware<T: MiddleWare + 'a>(&mut self, factory: T) {
97        self.middlewares.push(Box::new(factory));
98    }
99
100    pub fn start(&mut self) {
101        info!("sidekiq-rs is running...");
102        if self.queues.len() == 0 {
103            error!("queue is empty, exiting");
104            return;
105        }
106        let (tsx, rsx) = sync(self.concurrency + 10);
107        let (tox, rox) = sync(self.concurrency + 10);
108        let signal = self.signal_chan.clone();
109
110        // start worker threads
111        self.launch_workers(tsx.clone(), rox.clone());
112
113        // controller loop
114        let (tox2, rsx2) = (tox.clone(), rsx.clone()); // rename channels cuz `chan_select!` will rename'em below
115        let clock = tick(Duration::from_secs(2)); // report to sidekiq every 5 secs
116        loop {
117            if let Err(e) = self.report_alive() {
118                error!("report alive failed: '{}'", e);
119            }
120            chan_select! {
121                signal.recv() -> signal => {
122                    match signal {
123                        Some(signal @ SysSignal::USR1) => {
124                            info!("{:?}: Terminating", signal);
125                            self.terminate_gracefully(tox2, rsx2);
126                            break;
127                        }
128                        Some(signal @ SysSignal::INT) => {
129                            info!("{:?}: Force terminating", signal);                            
130                            self.terminate_forcely(tox2, rsx2);
131                            break;
132                        }
133                        Some(_) => { unimplemented!() }
134                        None => { unimplemented!() }
135                    }
136                },
137                clock.recv() => {
138                    debug!("server clock triggered");
139                },
140                rsx.recv() -> sig => {
141                    debug!("received signal {:?}", sig);
142                    if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
143                        error!("error when dealing signal: '{}'", e);
144                    }
145                    let worker_count = self.threadpool.active_count();
146                    // relaunch workers if they died unexpectly
147                    if worker_count < self.concurrency {
148                        warn!("worker down, restarting");
149                        self.launch_workers(tsx.clone(), rox.clone());
150                    } else if worker_count > self.concurrency {
151                        unreachable!("unreachable! worker_count can never larger than concurrency!")
152                    }
153                }
154            }
155        }
156
157        // exiting
158        info!("sidekiq exited");
159    }
160
161    // Worker start/terminate functions
162
163
164    fn launch_workers(&mut self, tsx: Sender<Signal>, rox: Receiver<Operation>) {
165        while self.worker_info.len() < self.concurrency {
166            self.launch_worker(tsx.clone(), rox.clone());
167        }
168    }
169
170
171    fn launch_worker(&mut self, tsx: Sender<Signal>, rox: Receiver<Operation>) {
172        let worker = SidekiqWorker::new(&self.identity(),
173                                        self.redispool.clone(),
174                                        tsx,
175                                        rox,
176                                        self.queues.clone(),
177                                        self.weights.clone(),
178                                        self.job_handlers
179                                            .iter_mut()
180                                            .map(|(k, v)| (k.clone(), v.cloned()))
181                                            .collect(),
182                                        self.middlewares.iter_mut().map(|v| v.cloned()).collect(),
183                                        self.namespace.clone());
184        self.worker_info.insert(worker.id.clone(), false);
185        self.threadpool.execute(move || worker.work());
186    }
187
188    fn inform_termination(&self, tox: Sender<Operation>) {
189        for _ in 0..self.concurrency {
190            tox.send(Operation::Terminate);
191        }
192    }
193
194    fn terminate_forcely(&mut self, tox: Sender<Operation>, rsx: Receiver<Signal>) {
195        self.inform_termination(tox);
196
197        let timer = after(Duration::from_secs(self.force_quite_timeout as u64));
198        // deplete the signal channel
199        loop {
200            chan_select! {
201                timer.recv() => {
202                    info!("force quitting");
203                    break
204                },
205                rsx.recv() -> sig => {
206                    debug!("received signal {:?}", sig);
207                    if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
208                        error!("error when dealing signal: '{}'", e);
209                    }
210                    if self.worker_info.len() == 0 {
211                        break
212                    }
213                },
214            }
215        }
216    }
217
218
219    fn terminate_gracefully(&mut self, tox: Sender<Operation>, rsx: Receiver<Signal>) {
220        self.inform_termination(tox);
221
222        info!("waiting for other workers exit");
223        // deplete the signal channel
224        loop {
225            chan_select! {
226                rsx.recv() -> sig => {
227                    debug!("received signal {:?}", sig);
228                    if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
229                        error!("error when dealing signal: '{}'", e);
230                    }
231                    if self.worker_info.len()== 0 {
232                        break
233                    }
234                },
235            }
236        }
237    }
238
239
240    fn deal_signal(&mut self, sig: Signal) -> Result<()> {
241        debug!("dealing signal {:?}", sig);
242        match sig {
243            Signal::Complete(id, n) => {
244                let _ = try!(self.report_processed(n));
245                *self.worker_info.get_mut(&id).unwrap() = false;
246            }
247            Signal::Fail(id, n) => {
248                let _ = try!(self.report_failed(n));
249                *self.worker_info.get_mut(&id).unwrap() = false;
250            }
251            Signal::Acquire(id) => {
252                self.worker_info.insert(id, true);
253            }
254            Signal::Terminated(id) => {
255                self.worker_info.remove(&id);
256            }
257        }
258        debug!("signal dealt");
259        Ok(())
260    }
261
262    // Sidekiq dashboard reporting functions
263
264
265    fn report_alive(&mut self) -> Result<()> {
266        let now = UTC::now();
267
268        let content = vec![("info",
269                            to_string(&json!({
270                                "hostname": rust_gethostname().unwrap_or("unknown".into()),
271                                "started_at": self.started_at,
272                                "pid": self.pid,
273                                "concurrency": self.concurrency,
274                                "queues": self.queues.clone(),
275                                "labels": [],
276                                "identity": self.identity()
277                            }))
278                                .unwrap()),
279                           ("busy", self.worker_info.values().filter(|v| **v).count().to_string()),
280                           ("beat",
281                            (now.timestamp() as f64 +
282                             now.timestamp_subsec_micros() as f64 / 1000000f64)
283                                .to_string())];
284        let conn = try!(self.redispool.get());
285        try!(Pipeline::new()
286            .hset_multiple(self.with_namespace(&self.identity()), &content)
287            .expire(self.with_namespace(&self.identity()), 5)
288            .sadd(self.with_namespace(&"processes"), self.identity())
289            .query::<()>(&*conn));
290
291        Ok(())
292
293    }
294
295
296    fn report_processed(&mut self, n: usize) -> Result<()> {
297        let connection = try!(self.redispool.get());
298        let _: () = Pipeline::new().incr(self.with_namespace(&format!("stat:processed:{}",
299                                               UTC::now().format("%Y-%m-%d"))),
300                  n)
301            .incr(self.with_namespace(&format!("stat:processed")), n)
302            .query(&*connection)?;
303
304        Ok(())
305    }
306
307
308    fn report_failed(&mut self, n: usize) -> Result<()> {
309        let connection = try!(self.redispool.get());
310        let _: () = Pipeline::new()
311            .incr(self.with_namespace(&format!("stat:failed:{}", UTC::now().format("%Y-%m-%d"))),
312                  n)
313            .incr(self.with_namespace(&format!("stat:failed")), n)
314            .query(&*connection)?;
315        Ok(())
316    }
317
318
319    fn identity(&self) -> String {
320        let host = rust_gethostname().unwrap_or("unknown".into());
321        let pid = self.pid;
322
323        host + ":" + &pid.to_string() + ":" + &self.rs
324    }
325
326
327    fn with_namespace(&self, snippet: &str) -> String {
328        if self.namespace == "" {
329            snippet.into()
330        } else {
331            self.namespace.clone() + ":" + snippet
332        }
333    }
334}