1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use redis::{Pipeline, PipelineCommands};
5use r2d2::{Pool, Config};
6use r2d2_redis::RedisConnectionManager;
7
8use rand::Rng;
9
10use threadpool::ThreadPool;
11
12use chan::{sync, after, tick, Receiver, Sender};
13use chan_signal::{Signal as SysSignal, notify};
14
15use libc::getpid;
16
17use chrono::UTC;
18
19use serde_json::to_string;
20
21use worker::SidekiqWorker;
22use errors::*;
23use utils::rust_gethostname;
24use middleware::MiddleWare;
25use job_handler::JobHandler;
26use RedisPool;
27
/// Status messages handled by the server's `deal_signal`.
///
/// The `String` carried by every variant is the id of the worker the
/// message refers to.
#[derive(Debug)]
pub enum Signal {
    /// Jobs succeeded; the `usize` is added to the processed counters and the
    /// worker is marked as not busy.
    Complete(String, usize),
    /// Jobs failed; the `usize` is added to the failed counters and the
    /// worker is marked as not busy.
    Fail(String, usize),
    /// The worker is marked as busy.
    Acquire(String),
    /// The worker is removed from the server's bookkeeping.
    Terminated(String),
}
35
/// Commands the server broadcasts over the operation channel whose receiving
/// end is handed to every worker.
///
/// Derives `Debug` (like the sibling `Signal` enum) so values can be logged,
/// plus `Clone`/`PartialEq`/`Eq` so they can be duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Operation {
    /// Sent once per configured worker slot when the server shuts down.
    Terminate,
}
39
40pub struct SidekiqServer<'a> {
41 redispool: RedisPool,
42 threadpool: ThreadPool,
43 pub namespace: String,
44 job_handlers: BTreeMap<String, Box<JobHandler + 'a>>,
45 middlewares: Vec<Box<MiddleWare + 'a>>,
46 queues: Vec<String>,
47 weights: Vec<f64>,
48 started_at: f64,
49 rs: String,
50 pid: usize,
51 signal_chan: Receiver<SysSignal>,
52 worker_info: BTreeMap<String, bool>, concurrency: usize,
54 pub force_quite_timeout: usize,
55}
56
57impl<'a> SidekiqServer<'a> {
58 pub fn new(redis: &str, concurrency: usize) -> Result<Self> {
61 let signal = notify(&[SysSignal::INT, SysSignal::USR1]); let now = UTC::now();
63 let config = Config::builder()
64 .pool_size(concurrency as u32 + 3) .build();
66 let manager = try!(RedisConnectionManager::new(redis));
67 let pool = try!(Pool::new(config, manager));
68 Ok(SidekiqServer {
69 redispool: pool,
70 threadpool: ThreadPool::new_with_name("worker".into(), concurrency),
71 namespace: String::new(),
72 job_handlers: BTreeMap::new(),
73 queues: vec![],
74 weights: vec![],
75 started_at: now.timestamp() as f64 + now.timestamp_subsec_micros() as f64 / 1000000f64,
76 pid: unsafe { getpid() } as usize,
77 worker_info: BTreeMap::new(),
78 concurrency: concurrency,
79 signal_chan: signal,
80 force_quite_timeout: 10,
81 middlewares: vec![],
82 rs: ::rand::thread_rng().gen_ascii_chars().take(12).collect(),
84 })
85 }
86
87 pub fn new_queue(&mut self, name: &str, weight: usize) {
88 self.queues.push(name.into());
89 self.weights.push(weight as f64);
90 }
91
92 pub fn attach_handler<T: JobHandler + 'a>(&mut self, name: &str, handle: T) {
93 self.job_handlers.insert(name.into(), Box::new(handle));
94 }
95
96 pub fn attach_middleware<T: MiddleWare + 'a>(&mut self, factory: T) {
97 self.middlewares.push(Box::new(factory));
98 }
99
100 pub fn start(&mut self) {
101 info!("sidekiq-rs is running...");
102 if self.queues.len() == 0 {
103 error!("queue is empty, exiting");
104 return;
105 }
106 let (tsx, rsx) = sync(self.concurrency + 10);
107 let (tox, rox) = sync(self.concurrency + 10);
108 let signal = self.signal_chan.clone();
109
110 self.launch_workers(tsx.clone(), rox.clone());
112
113 let (tox2, rsx2) = (tox.clone(), rsx.clone()); let clock = tick(Duration::from_secs(2)); loop {
117 if let Err(e) = self.report_alive() {
118 error!("report alive failed: '{}'", e);
119 }
120 chan_select! {
121 signal.recv() -> signal => {
122 match signal {
123 Some(signal @ SysSignal::USR1) => {
124 info!("{:?}: Terminating", signal);
125 self.terminate_gracefully(tox2, rsx2);
126 break;
127 }
128 Some(signal @ SysSignal::INT) => {
129 info!("{:?}: Force terminating", signal);
130 self.terminate_forcely(tox2, rsx2);
131 break;
132 }
133 Some(_) => { unimplemented!() }
134 None => { unimplemented!() }
135 }
136 },
137 clock.recv() => {
138 debug!("server clock triggered");
139 },
140 rsx.recv() -> sig => {
141 debug!("received signal {:?}", sig);
142 if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
143 error!("error when dealing signal: '{}'", e);
144 }
145 let worker_count = self.threadpool.active_count();
146 if worker_count < self.concurrency {
148 warn!("worker down, restarting");
149 self.launch_workers(tsx.clone(), rox.clone());
150 } else if worker_count > self.concurrency {
151 unreachable!("unreachable! worker_count can never larger than concurrency!")
152 }
153 }
154 }
155 }
156
157 info!("sidekiq exited");
159 }
160
161 fn launch_workers(&mut self, tsx: Sender<Signal>, rox: Receiver<Operation>) {
165 while self.worker_info.len() < self.concurrency {
166 self.launch_worker(tsx.clone(), rox.clone());
167 }
168 }
169
170
171 fn launch_worker(&mut self, tsx: Sender<Signal>, rox: Receiver<Operation>) {
172 let worker = SidekiqWorker::new(&self.identity(),
173 self.redispool.clone(),
174 tsx,
175 rox,
176 self.queues.clone(),
177 self.weights.clone(),
178 self.job_handlers
179 .iter_mut()
180 .map(|(k, v)| (k.clone(), v.cloned()))
181 .collect(),
182 self.middlewares.iter_mut().map(|v| v.cloned()).collect(),
183 self.namespace.clone());
184 self.worker_info.insert(worker.id.clone(), false);
185 self.threadpool.execute(move || worker.work());
186 }
187
188 fn inform_termination(&self, tox: Sender<Operation>) {
189 for _ in 0..self.concurrency {
190 tox.send(Operation::Terminate);
191 }
192 }
193
194 fn terminate_forcely(&mut self, tox: Sender<Operation>, rsx: Receiver<Signal>) {
195 self.inform_termination(tox);
196
197 let timer = after(Duration::from_secs(self.force_quite_timeout as u64));
198 loop {
200 chan_select! {
201 timer.recv() => {
202 info!("force quitting");
203 break
204 },
205 rsx.recv() -> sig => {
206 debug!("received signal {:?}", sig);
207 if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
208 error!("error when dealing signal: '{}'", e);
209 }
210 if self.worker_info.len() == 0 {
211 break
212 }
213 },
214 }
215 }
216 }
217
218
219 fn terminate_gracefully(&mut self, tox: Sender<Operation>, rsx: Receiver<Signal>) {
220 self.inform_termination(tox);
221
222 info!("waiting for other workers exit");
223 loop {
225 chan_select! {
226 rsx.recv() -> sig => {
227 debug!("received signal {:?}", sig);
228 if let Some(Err(e)) = sig.map(|s| self.deal_signal(s)) {
229 error!("error when dealing signal: '{}'", e);
230 }
231 if self.worker_info.len()== 0 {
232 break
233 }
234 },
235 }
236 }
237 }
238
239
240 fn deal_signal(&mut self, sig: Signal) -> Result<()> {
241 debug!("dealing signal {:?}", sig);
242 match sig {
243 Signal::Complete(id, n) => {
244 let _ = try!(self.report_processed(n));
245 *self.worker_info.get_mut(&id).unwrap() = false;
246 }
247 Signal::Fail(id, n) => {
248 let _ = try!(self.report_failed(n));
249 *self.worker_info.get_mut(&id).unwrap() = false;
250 }
251 Signal::Acquire(id) => {
252 self.worker_info.insert(id, true);
253 }
254 Signal::Terminated(id) => {
255 self.worker_info.remove(&id);
256 }
257 }
258 debug!("signal dealt");
259 Ok(())
260 }
261
262 fn report_alive(&mut self) -> Result<()> {
266 let now = UTC::now();
267
268 let content = vec![("info",
269 to_string(&json!({
270 "hostname": rust_gethostname().unwrap_or("unknown".into()),
271 "started_at": self.started_at,
272 "pid": self.pid,
273 "concurrency": self.concurrency,
274 "queues": self.queues.clone(),
275 "labels": [],
276 "identity": self.identity()
277 }))
278 .unwrap()),
279 ("busy", self.worker_info.values().filter(|v| **v).count().to_string()),
280 ("beat",
281 (now.timestamp() as f64 +
282 now.timestamp_subsec_micros() as f64 / 1000000f64)
283 .to_string())];
284 let conn = try!(self.redispool.get());
285 try!(Pipeline::new()
286 .hset_multiple(self.with_namespace(&self.identity()), &content)
287 .expire(self.with_namespace(&self.identity()), 5)
288 .sadd(self.with_namespace(&"processes"), self.identity())
289 .query::<()>(&*conn));
290
291 Ok(())
292
293 }
294
295
296 fn report_processed(&mut self, n: usize) -> Result<()> {
297 let connection = try!(self.redispool.get());
298 let _: () = Pipeline::new().incr(self.with_namespace(&format!("stat:processed:{}",
299 UTC::now().format("%Y-%m-%d"))),
300 n)
301 .incr(self.with_namespace(&format!("stat:processed")), n)
302 .query(&*connection)?;
303
304 Ok(())
305 }
306
307
308 fn report_failed(&mut self, n: usize) -> Result<()> {
309 let connection = try!(self.redispool.get());
310 let _: () = Pipeline::new()
311 .incr(self.with_namespace(&format!("stat:failed:{}", UTC::now().format("%Y-%m-%d"))),
312 n)
313 .incr(self.with_namespace(&format!("stat:failed")), n)
314 .query(&*connection)?;
315 Ok(())
316 }
317
318
319 fn identity(&self) -> String {
320 let host = rust_gethostname().unwrap_or("unknown".into());
321 let pid = self.pid;
322
323 host + ":" + &pid.to_string() + ":" + &self.rs
324 }
325
326
327 fn with_namespace(&self, snippet: &str) -> String {
328 if self.namespace == "" {
329 snippet.into()
330 } else {
331 self.namespace.clone() + ":" + snippet
332 }
333 }
334}