gearman_worker/
lib.rs

1#![deny(missing_docs)]
2//! # gearman-worker
3//!
4//! The `gearman-worker` crate provides a high level library to easily
5//! implement gearman [`Worker`](struct.Worker.html)s.
6//!
7//! It handles registration of functions as jobs in the gearman queue
8//! server, fetching of jobs and their workload.
9//!
10//! ## Usage
11//!
12//! ```ignore
13//! use gearman_worker::WorkerBuilder;
14//!
15//! fn main() {
16//!     let mut worker = WorkerBuilder::default().build();
17//!     worker.connect().unwrap();
18//!
19//!     worker.register_function("greet", |input| {
20//!         let hello = String::from_utf8_lossy(input);
21//!         let response = format!("{} world!", hello);
22//!         Ok(response.into_bytes())
23//!     }).unwrap();
24//!
25//!     worker.run().unwrap();
26//! }
27//! ```
28
29use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
30use std::collections::HashMap;
31use std::io;
32use std::io::prelude::*;
33use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
34use std::process;
35use uuid::Uuid;
36
37const CAN_DO: u32 = 1;
38const CANT_DO: u32 = 2;
39// const RESET_ABILITIES: u32 = 3;
40const PRE_SLEEP: u32 = 4;
41const NOOP: u32 = 6;
42const GRAB_JOB: u32 = 9;
43const NO_JOB: u32 = 10;
44const JOB_ASSIGN: u32 = 11;
45// const WORK_STATUS: u32 = 12;
46const WORK_COMPLETE: u32 = 13;
47const WORK_FAIL: u32 = 14;
48const SET_CLIENT_ID: u32 = 22;
49// const CAN_DO_TIMEOUT: u32 = 23;
50// const ALL_YOURS: u32 = 24;
51const WORK_EXCEPTION: u32 = 25;
52// const WORK_DATA: u32 = 28;
53// const WORK_WARNING: u32 = 29;
54// const GRAB_JOB_UNIQ: u32 = 30;
55// const JOB_ASSIGN_UNIQ: u32 = 31;
56// const GRAB_JOB_ALL: u32 = 39;
57// const JOB_ASSIGN_ALL: u32 = 40;
58
59/// A packet received from the gearman queue server.
60struct Packet {
61    /// The packet type representing the request intent
62    cmd: u32,
63    /// The data associated with the request
64    data: Vec<u8>,
65}
66
67type WorkResult = Result<Vec<u8>, Option<Vec<u8>>>;
68type Callback = Box<Fn(&[u8]) -> WorkResult + 'static>;
69
70struct CallbackInfo {
71    callback: Callback,
72    enabled: bool,
73}
74
75impl CallbackInfo {
76    fn new<F: Fn(&[u8]) -> WorkResult + 'static>(callback: F) -> Self {
77        Self {
78            callback: Box::new(callback),
79            enabled: true,
80        }
81    }
82}
83
84impl Packet {
85    /// Decodes a packet from a stream received from the gearman server
86    fn from_stream(stream: &mut TcpStream) -> io::Result<Self> {
87        let mut magic = vec![0u8; 4];
88        stream.read_exact(&mut magic)?;
89
90        if magic != b"\0RES" {
91            return Err(io::Error::new(
92                io::ErrorKind::InvalidData,
93                "Unexpected magic packet received from server",
94            ));
95        }
96
97        let cmd = stream.read_u32::<BigEndian>()?;
98        let size = stream.read_u32::<BigEndian>()?;
99        let mut data = vec![0u8; size as usize];
100
101        if size > 0 {
102            stream.read_exact(&mut data)?;
103        }
104
105        Ok(Packet { cmd, data })
106    }
107}
108
109struct Job {
110    handle: String,
111    function: String,
112    workload: Vec<u8>,
113}
114
115impl Job {
116    fn from_data(data: &[u8]) -> io::Result<Self> {
117        let mut iter = data.split(|c| *c == 0);
118
119        let handle = match iter.next() {
120            Some(handle) => String::from_utf8_lossy(handle),
121            None => {
122                return Err(io::Error::new(
123                    io::ErrorKind::InvalidData,
124                    "Could not decode handle id",
125                ));
126            }
127        };
128
129        let fun = match iter.next() {
130            Some(fun) => String::from_utf8_lossy(fun),
131            None => {
132                return Err(io::Error::new(
133                    io::ErrorKind::InvalidData,
134                    "Could not decode function name",
135                ));
136            }
137        };
138
139        let payload = &data[handle.len() + fun.len() + 2..];
140
141        Ok(Self {
142            handle: handle.to_string(),
143            function: fun.to_string(),
144            workload: payload.to_vec(),
145        })
146    }
147
148    fn send_response(
149        &self,
150        server: &mut ServerConnection,
151        response: &WorkResult,
152    ) -> io::Result<()> {
153        let (op, data) = match response {
154            Ok(data) => (WORK_COMPLETE, Some(data)),
155            Err(Some(data)) => (WORK_FAIL, Some(data)),
156            Err(None) => (WORK_EXCEPTION, None),
157        };
158
159        let size = self.handle.len() + 1 + data.map_or(0, |b| b.len());
160
161        let mut payload = Vec::with_capacity(size);
162        payload.extend_from_slice(self.handle.as_bytes());
163        if let Some(data) = data {
164            payload.extend_from_slice(b"\0");
165            payload.extend_from_slice(data);
166        }
167        server.send(op, &payload[..])
168    }
169}
170
171/// The `Worker` processes jobs provided by the gearman queue server.
172///
173/// Building a worker requires a [`SocketAddr`](https://doc.rust-lang.org/std/net/enum.SocketAddr.html) to
174/// connect to the gearman server (typically some ip address on port 4730).
175///
176/// The worker also needs a unique id to identify itself to the server.
177/// This can be omitted letting the [`WorkerBuilder`](struct.WorkerBuilder.html) generate one composed
178/// by the process id and a random uuid v4.
179///
180/// # Examples
181///
182/// Create a worker with all default options.
183///
184/// ```
185/// use gearman_worker::WorkerBuilder;
186/// let mut worker = WorkerBuilder::default().build();
187/// ```
188///
189/// Create a worker with all explicit options.
190///
191/// ```
192/// use gearman_worker::WorkerBuilder;
193/// let mut worker = WorkerBuilder::new("my-worker-1", "127.0.0.1:4730".parse().unwrap()).build();
194/// ```
195pub struct Worker {
196    /// the unique id of the worker
197    id: String,
198    server: ServerConnection,
199    functions: HashMap<String, CallbackInfo>,
200}
201
202/// Helps building a new [`Worker`](struct.Worker.html)
203pub struct WorkerBuilder {
204    id: String,
205    addr: SocketAddr,
206}
207
208struct ServerConnection {
209    addr: SocketAddr,
210    stream: Option<TcpStream>,
211}
212
213impl ServerConnection {
214    fn new(addr: SocketAddr) -> Self {
215        Self { addr, stream: None }
216    }
217
218    fn connect(&mut self) -> io::Result<()> {
219        let stream = TcpStream::connect(self.addr)?;
220        self.stream = Some(stream);
221        Ok(())
222    }
223
224    fn read_header(&mut self) -> io::Result<Packet> {
225        let mut stream = match &mut self.stream {
226            Some(ref mut stream) => stream,
227            None => {
228                return Err(io::Error::new(
229                    io::ErrorKind::NotConnected,
230                    "Stream is not open...",
231                ));
232            }
233        };
234
235        Ok(Packet::from_stream(&mut stream)?)
236    }
237
238    fn send(&mut self, command: u32, param: &[u8]) -> io::Result<()> {
239        let mut stream = match &self.stream {
240            Some(ref stream) => stream,
241            None => {
242                return Err(io::Error::new(
243                    io::ErrorKind::NotConnected,
244                    "Stream is not open...",
245                ));
246            }
247        };
248
249        stream.write_all(b"\0REQ")?;
250        stream.write_u32::<BigEndian>(command)?;
251        stream.write_u32::<BigEndian>(param.len() as u32)?;
252        stream.write_all(param)?;
253
254        Ok(())
255    }
256}
257
258impl Worker {
259    /// Registers a `callback` function that can handle jobs with the specified `name`
260    /// provided by the gearman queue server.
261    ///
262    /// The callback has the signature `Fn(&[u8]) -> WorkResult + 'static` receiving a
263    /// slice of bytes in input, which is the workload received from the gearmand server.
264    ///
265    /// It can return `Ok(Vec<u8>)` ([`WORK_COMPLETE`][protocol]) where the vector of
266    /// bytes is the result of the job that will be transmitted back to the server,
267    /// `Err(None)` ([`WORK_EXCEPTION`][protocol]) which will tell the server that the
268    /// job failed with an unspecified error or `Err(Some(Vec<u8>))` ([`WORK_FAIL`][protocol])
269    /// which will also represent a job failure but will include a payload of the error
270    /// to the gearmand server.
271    ///
272    /// [protocol]: http://gearman.org/protocol
273    pub fn register_function<S, F>(&mut self, name: S, callback: F) -> io::Result<()>
274    where
275        S: AsRef<str>,
276        F: Fn(&[u8]) -> WorkResult + 'static,
277    {
278        let name = name.as_ref();
279        self.server.send(CAN_DO, &name.as_bytes())?;
280        self.functions
281            .insert(name.to_string(), CallbackInfo::new(callback));
282        Ok(())
283    }
284
285    /// Unregisters a previously registered function, notifying the server that
286    /// this worker is not available anymore to process jobs with the specified `name`.
287    pub fn unregister_function<S>(&mut self, name: S) -> io::Result<()>
288    where
289        S: AsRef<str>,
290    {
291        let name = name.as_ref();
292        if let Some(func) = self.functions.remove(&name.to_string()) {
293            if func.enabled {
294                self.server.send(CANT_DO, &name.as_bytes())?;
295            }
296        }
297        Ok(())
298    }
299
300    /// Notify the gearman queue server that we are available/unavailable to process
301    /// jobs with the specified `name`.
302    pub fn set_function_enabled<S>(&mut self, name: S, enabled: bool) -> io::Result<()>
303    where
304        S: AsRef<str>,
305    {
306        let name = name.as_ref();
307        match self.functions.get_mut(name) {
308            Some(ref mut func) if func.enabled != enabled => {
309                func.enabled = enabled;
310                let op = if enabled { CAN_DO } else { CANT_DO };
311                self.server.send(op, name.as_bytes())?;
312            }
313            Some(_) => eprintln!(
314                "Function {} is already {}",
315                name,
316                if enabled { "enabled" } else { "disabled" }
317            ),
318            None => eprintln!("Unknown function {}", name),
319        }
320        Ok(())
321    }
322
323    /// Let the server know that the worker identifies itself with the associated `id`.
324    pub fn set_client_id(&mut self) -> io::Result<()> {
325        self.server.send(SET_CLIENT_ID, self.id.as_bytes())
326    }
327
328    fn sleep(&mut self) -> io::Result<()> {
329        self.server.send(PRE_SLEEP, b"")?;
330        let resp = self.server.read_header()?;
331        match resp.cmd {
332            n if n == NOOP => Ok(()),
333            n => Err(io::Error::new(
334                io::ErrorKind::InvalidData,
335                format!(
336                    "Worker was sleeping. NOOP was expected but packet {} was received instead.",
337                    n
338                ),
339            )),
340        }
341    }
342
343    fn grab_job(&mut self) -> io::Result<Option<Job>> {
344        self.server.send(GRAB_JOB, b"")?;
345        let resp = self.server.read_header()?;
346        match resp.cmd {
347            n if n == JOB_ASSIGN => Ok(Some(Job::from_data(&resp.data[..])?)),
348            n if n == NO_JOB => Ok(None),
349            n => Err(io::Error::new(
350                io::ErrorKind::InvalidData,
351                format!(
352                    "Either JOB_ASSIGN or NO_JOB was expected but packet {} was received instead.",
353                    n
354                ),
355            )),
356        }
357    }
358
359    /// Ask the server to do some work. This will process one job that will be provided by
360    /// the gearman queue server when available.
361    pub fn do_work(&mut self) -> io::Result<u32> {
362        let mut jobs = 0;
363
364        if let Some(job) = self.grab_job()? {
365            jobs += 1;
366            match self.functions.get(&job.function) {
367                Some(func) if func.enabled => {
368                    job.send_response(&mut self.server, &(func.callback)(&job.workload))?
369                }
370                // gearmand should never pass us a job which was never advertised or unregistered
371                Some(_) => eprintln!("Disabled job {:?}", job.function),
372                None => eprintln!("Unknown job {:?}", job.function),
373            }
374        }
375
376        Ok(jobs)
377    }
378
379    /// Process any available job as soon as the gearman queue server provides us with one
380    /// in a loop.
381    pub fn run(&mut self) -> io::Result<()> {
382        loop {
383            let done = self.do_work()?;
384            if done == 0 {
385                self.sleep()?;
386            }
387        }
388    }
389
390    /// Estabilish a connection with the queue server and send it the ID of this worker.
391    pub fn connect(&mut self) -> io::Result<&mut Self> {
392        self.server.connect()?;
393        self.set_client_id()?;
394        Ok(self)
395    }
396}
397
398impl WorkerBuilder {
399    /// Create a new WorkerBuilder passing all options explicitly.
400    pub fn new<S: Into<String>>(id: S, addr: SocketAddr) -> Self {
401        Self {
402            id: id.into(),
403            addr,
404        }
405    }
406
407    /// Set a specific ID for this worker. This should be unique to all workers!
408    pub fn id<S: Into<String>>(&mut self, id: S) -> &mut Self {
409        self.id = id.into();
410        self
411    }
412
413    /// Define the socket address to connect to.
414    pub fn addr(&mut self, addr: SocketAddr) -> &mut Self {
415        self.addr = addr;
416        self
417    }
418
419    /// Build the [`Worker`](struct.Worker.html).
420    pub fn build(&self) -> Worker {
421        Worker {
422            id: self.id.clone(),
423            server: ServerConnection::new(self.addr),
424            functions: HashMap::new(),
425        }
426    }
427}
428
429impl Default for WorkerBuilder {
430    fn default() -> Self {
431        let uniqid = Uuid::new_v4();
432        Self::new(
433            format!("{}-{}", process::id(), uniqid.to_hyphenated()),
434            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4730),
435        )
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442    use std::io::BufReader;
443    use std::process::{Child, Command, Stdio};
444    use std::thread;
445    use std::time;
446
447    fn run_gearmand() -> Child {
448        let mut gearmand = Command::new("gearmand")
449            .arg("-L")
450            .arg("127.0.0.1")
451            .arg("-p")
452            .arg("14730")
453            .arg("-l")
454            .arg("stderr")
455            .arg("--verbose")
456            .arg("INFO")
457            .stderr(Stdio::piped())
458            .spawn()
459            .expect("Failed to stard gearmand");
460
461        let gearmand_err = gearmand
462            .stderr
463            .take()
464            .expect("Failed to capture gearmand's stderr");
465        let mut reader = BufReader::new(gearmand_err);
466        loop {
467            let mut line = String::new();
468            let len = reader.read_line(&mut line).unwrap();
469            if len == 0 || line.contains("Listening on 127.0.0.1:14730") {
470                break;
471            }
472        }
473
474        gearmand
475    }
476
477    fn submit_job(func: &str) -> Child {
478        let gearman_cli = Command::new("gearman")
479            .arg("-Is")
480            .arg("-p")
481            .arg("14730")
482            .arg("-f")
483            .arg(func)
484            .stdout(Stdio::piped())
485            .spawn()
486            .expect("Failed to submit gearman job");
487
488        let wait = time::Duration::from_millis(250);
489        thread::sleep(wait);
490
491        gearman_cli
492    }
493
494    #[test]
495    fn it_works() {
496        let mut gearmand = run_gearmand();
497
498        let addr = "127.0.0.1:14730".parse().unwrap();
499
500        let mut worker = WorkerBuilder::default()
501            .addr(addr)
502            .id("gearman-worker-rs-1")
503            .build();
504
505        worker
506            .connect()
507            .expect("Failed to connect to gearmand server");
508
509        worker
510            .register_function("testfun", |_| {
511                println!("testfun called");
512                Ok(b"foobar".to_vec())
513            })
514            .expect("Failed to register test function");
515
516        // worker.set_function_enabled("testfun", false).unwrap();
517
518        // worker.unregister_function("testfun").unwrap();
519
520        // worker.set_function_enabled("testfun", true).unwrap();
521
522        let gearman_cli = submit_job("testfun");
523
524        let done = worker.do_work().unwrap();
525        assert_eq!(1, done);
526
527        let output = gearman_cli
528            .wait_with_output()
529            .expect("Failed to retrieve job output");
530        assert_eq!(b"foobar", output.stdout.as_slice());
531
532        gearmand.kill().expect("Failed to kill gearmand");
533    }
534}