1#![deny(missing_docs)]
2use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
30use std::collections::HashMap;
31use std::io;
32use std::io::prelude::*;
33use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
34use std::process;
35use uuid::Uuid;
36
37const CAN_DO: u32 = 1;
38const CANT_DO: u32 = 2;
39const PRE_SLEEP: u32 = 4;
41const NOOP: u32 = 6;
42const GRAB_JOB: u32 = 9;
43const NO_JOB: u32 = 10;
44const JOB_ASSIGN: u32 = 11;
45const WORK_COMPLETE: u32 = 13;
47const WORK_FAIL: u32 = 14;
48const SET_CLIENT_ID: u32 = 22;
49const WORK_EXCEPTION: u32 = 25;
52struct Packet {
61 cmd: u32,
63 data: Vec<u8>,
65}
66
67type WorkResult = Result<Vec<u8>, Option<Vec<u8>>>;
68type Callback = Box<Fn(&[u8]) -> WorkResult + 'static>;
69
70struct CallbackInfo {
71 callback: Callback,
72 enabled: bool,
73}
74
75impl CallbackInfo {
76 fn new<F: Fn(&[u8]) -> WorkResult + 'static>(callback: F) -> Self {
77 Self {
78 callback: Box::new(callback),
79 enabled: true,
80 }
81 }
82}
83
84impl Packet {
85 fn from_stream(stream: &mut TcpStream) -> io::Result<Self> {
87 let mut magic = vec![0u8; 4];
88 stream.read_exact(&mut magic)?;
89
90 if magic != b"\0RES" {
91 return Err(io::Error::new(
92 io::ErrorKind::InvalidData,
93 "Unexpected magic packet received from server",
94 ));
95 }
96
97 let cmd = stream.read_u32::<BigEndian>()?;
98 let size = stream.read_u32::<BigEndian>()?;
99 let mut data = vec![0u8; size as usize];
100
101 if size > 0 {
102 stream.read_exact(&mut data)?;
103 }
104
105 Ok(Packet { cmd, data })
106 }
107}
108
109struct Job {
110 handle: String,
111 function: String,
112 workload: Vec<u8>,
113}
114
115impl Job {
116 fn from_data(data: &[u8]) -> io::Result<Self> {
117 let mut iter = data.split(|c| *c == 0);
118
119 let handle = match iter.next() {
120 Some(handle) => String::from_utf8_lossy(handle),
121 None => {
122 return Err(io::Error::new(
123 io::ErrorKind::InvalidData,
124 "Could not decode handle id",
125 ));
126 }
127 };
128
129 let fun = match iter.next() {
130 Some(fun) => String::from_utf8_lossy(fun),
131 None => {
132 return Err(io::Error::new(
133 io::ErrorKind::InvalidData,
134 "Could not decode function name",
135 ));
136 }
137 };
138
139 let payload = &data[handle.len() + fun.len() + 2..];
140
141 Ok(Self {
142 handle: handle.to_string(),
143 function: fun.to_string(),
144 workload: payload.to_vec(),
145 })
146 }
147
148 fn send_response(
149 &self,
150 server: &mut ServerConnection,
151 response: &WorkResult,
152 ) -> io::Result<()> {
153 let (op, data) = match response {
154 Ok(data) => (WORK_COMPLETE, Some(data)),
155 Err(Some(data)) => (WORK_FAIL, Some(data)),
156 Err(None) => (WORK_EXCEPTION, None),
157 };
158
159 let size = self.handle.len() + 1 + data.map_or(0, |b| b.len());
160
161 let mut payload = Vec::with_capacity(size);
162 payload.extend_from_slice(self.handle.as_bytes());
163 if let Some(data) = data {
164 payload.extend_from_slice(b"\0");
165 payload.extend_from_slice(data);
166 }
167 server.send(op, &payload[..])
168 }
169}
170
171pub struct Worker {
196 id: String,
198 server: ServerConnection,
199 functions: HashMap<String, CallbackInfo>,
200}
201
202pub struct WorkerBuilder {
204 id: String,
205 addr: SocketAddr,
206}
207
208struct ServerConnection {
209 addr: SocketAddr,
210 stream: Option<TcpStream>,
211}
212
213impl ServerConnection {
214 fn new(addr: SocketAddr) -> Self {
215 Self { addr, stream: None }
216 }
217
218 fn connect(&mut self) -> io::Result<()> {
219 let stream = TcpStream::connect(self.addr)?;
220 self.stream = Some(stream);
221 Ok(())
222 }
223
224 fn read_header(&mut self) -> io::Result<Packet> {
225 let mut stream = match &mut self.stream {
226 Some(ref mut stream) => stream,
227 None => {
228 return Err(io::Error::new(
229 io::ErrorKind::NotConnected,
230 "Stream is not open...",
231 ));
232 }
233 };
234
235 Ok(Packet::from_stream(&mut stream)?)
236 }
237
238 fn send(&mut self, command: u32, param: &[u8]) -> io::Result<()> {
239 let mut stream = match &self.stream {
240 Some(ref stream) => stream,
241 None => {
242 return Err(io::Error::new(
243 io::ErrorKind::NotConnected,
244 "Stream is not open...",
245 ));
246 }
247 };
248
249 stream.write_all(b"\0REQ")?;
250 stream.write_u32::<BigEndian>(command)?;
251 stream.write_u32::<BigEndian>(param.len() as u32)?;
252 stream.write_all(param)?;
253
254 Ok(())
255 }
256}
257
258impl Worker {
259 pub fn register_function<S, F>(&mut self, name: S, callback: F) -> io::Result<()>
274 where
275 S: AsRef<str>,
276 F: Fn(&[u8]) -> WorkResult + 'static,
277 {
278 let name = name.as_ref();
279 self.server.send(CAN_DO, &name.as_bytes())?;
280 self.functions
281 .insert(name.to_string(), CallbackInfo::new(callback));
282 Ok(())
283 }
284
285 pub fn unregister_function<S>(&mut self, name: S) -> io::Result<()>
288 where
289 S: AsRef<str>,
290 {
291 let name = name.as_ref();
292 if let Some(func) = self.functions.remove(&name.to_string()) {
293 if func.enabled {
294 self.server.send(CANT_DO, &name.as_bytes())?;
295 }
296 }
297 Ok(())
298 }
299
300 pub fn set_function_enabled<S>(&mut self, name: S, enabled: bool) -> io::Result<()>
303 where
304 S: AsRef<str>,
305 {
306 let name = name.as_ref();
307 match self.functions.get_mut(name) {
308 Some(ref mut func) if func.enabled != enabled => {
309 func.enabled = enabled;
310 let op = if enabled { CAN_DO } else { CANT_DO };
311 self.server.send(op, name.as_bytes())?;
312 }
313 Some(_) => eprintln!(
314 "Function {} is already {}",
315 name,
316 if enabled { "enabled" } else { "disabled" }
317 ),
318 None => eprintln!("Unknown function {}", name),
319 }
320 Ok(())
321 }
322
323 pub fn set_client_id(&mut self) -> io::Result<()> {
325 self.server.send(SET_CLIENT_ID, self.id.as_bytes())
326 }
327
328 fn sleep(&mut self) -> io::Result<()> {
329 self.server.send(PRE_SLEEP, b"")?;
330 let resp = self.server.read_header()?;
331 match resp.cmd {
332 n if n == NOOP => Ok(()),
333 n => Err(io::Error::new(
334 io::ErrorKind::InvalidData,
335 format!(
336 "Worker was sleeping. NOOP was expected but packet {} was received instead.",
337 n
338 ),
339 )),
340 }
341 }
342
343 fn grab_job(&mut self) -> io::Result<Option<Job>> {
344 self.server.send(GRAB_JOB, b"")?;
345 let resp = self.server.read_header()?;
346 match resp.cmd {
347 n if n == JOB_ASSIGN => Ok(Some(Job::from_data(&resp.data[..])?)),
348 n if n == NO_JOB => Ok(None),
349 n => Err(io::Error::new(
350 io::ErrorKind::InvalidData,
351 format!(
352 "Either JOB_ASSIGN or NO_JOB was expected but packet {} was received instead.",
353 n
354 ),
355 )),
356 }
357 }
358
359 pub fn do_work(&mut self) -> io::Result<u32> {
362 let mut jobs = 0;
363
364 if let Some(job) = self.grab_job()? {
365 jobs += 1;
366 match self.functions.get(&job.function) {
367 Some(func) if func.enabled => {
368 job.send_response(&mut self.server, &(func.callback)(&job.workload))?
369 }
370 Some(_) => eprintln!("Disabled job {:?}", job.function),
372 None => eprintln!("Unknown job {:?}", job.function),
373 }
374 }
375
376 Ok(jobs)
377 }
378
379 pub fn run(&mut self) -> io::Result<()> {
382 loop {
383 let done = self.do_work()?;
384 if done == 0 {
385 self.sleep()?;
386 }
387 }
388 }
389
390 pub fn connect(&mut self) -> io::Result<&mut Self> {
392 self.server.connect()?;
393 self.set_client_id()?;
394 Ok(self)
395 }
396}
397
398impl WorkerBuilder {
399 pub fn new<S: Into<String>>(id: S, addr: SocketAddr) -> Self {
401 Self {
402 id: id.into(),
403 addr,
404 }
405 }
406
407 pub fn id<S: Into<String>>(&mut self, id: S) -> &mut Self {
409 self.id = id.into();
410 self
411 }
412
413 pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
415 self.addr = addr;
416 self
417 }
418
419 pub fn build(&self) -> Worker {
421 Worker {
422 id: self.id.clone(),
423 server: ServerConnection::new(self.addr),
424 functions: HashMap::new(),
425 }
426 }
427}
428
429impl Default for WorkerBuilder {
430 fn default() -> Self {
431 let uniqid = Uuid::new_v4();
432 Self::new(
433 format!("{}-{}", process::id(), uniqid.to_hyphenated()),
434 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4730),
435 )
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442 use std::io::BufReader;
443 use std::process::{Child, Command, Stdio};
444 use std::thread;
445 use std::time;
446
447 fn run_gearmand() -> Child {
448 let mut gearmand = Command::new("gearmand")
449 .arg("-L")
450 .arg("127.0.0.1")
451 .arg("-p")
452 .arg("14730")
453 .arg("-l")
454 .arg("stderr")
455 .arg("--verbose")
456 .arg("INFO")
457 .stderr(Stdio::piped())
458 .spawn()
459 .expect("Failed to stard gearmand");
460
461 let gearmand_err = gearmand
462 .stderr
463 .take()
464 .expect("Failed to capture gearmand's stderr");
465 let mut reader = BufReader::new(gearmand_err);
466 loop {
467 let mut line = String::new();
468 let len = reader.read_line(&mut line).unwrap();
469 if len == 0 || line.contains("Listening on 127.0.0.1:14730") {
470 break;
471 }
472 }
473
474 gearmand
475 }
476
477 fn submit_job(func: &str) -> Child {
478 let gearman_cli = Command::new("gearman")
479 .arg("-Is")
480 .arg("-p")
481 .arg("14730")
482 .arg("-f")
483 .arg(func)
484 .stdout(Stdio::piped())
485 .spawn()
486 .expect("Failed to submit gearman job");
487
488 let wait = time::Duration::from_millis(250);
489 thread::sleep(wait);
490
491 gearman_cli
492 }
493
494 #[test]
495 fn it_works() {
496 let mut gearmand = run_gearmand();
497
498 let addr = "127.0.0.1:14730".parse().unwrap();
499
500 let mut worker = WorkerBuilder::default()
501 .addr(addr)
502 .id("gearman-worker-rs-1")
503 .build();
504
505 worker
506 .connect()
507 .expect("Failed to connect to gearmand server");
508
509 worker
510 .register_function("testfun", |_| {
511 println!("testfun called");
512 Ok(b"foobar".to_vec())
513 })
514 .expect("Failed to register test function");
515
516 let gearman_cli = submit_job("testfun");
523
524 let done = worker.do_work().unwrap();
525 assert_eq!(1, done);
526
527 let output = gearman_cli
528 .wait_with_output()
529 .expect("Failed to retrieve job output");
530 assert_eq!(b"foobar", output.stdout.as_slice());
531
532 gearmand.kill().expect("Failed to kill gearmand");
533 }
534}