tokio_beanstalkd/
lib.rs

1//! This crate provides a client for working with [Beanstalkd](https://beanstalkd.github.io/), a simple
2//! fast work queue.
3//
4//! # About Beanstalkd
5//!
6//! Beanstalkd is a simple fast work queue. It works at the TCP connection level, considering each TCP
7//! connection individually. A worker may have multiple connections to the Beanstalkd server and each
8//! connection will be considered separate.
9//!
10//! The protocol is ASCII text based but the data itself is just a bytestream. This means that the
11//! application is responsible for interpreting the data.
12//!
13//! ## Operation
14//! This library can serve as a client for both the application and the worker. The application would
15//! [`put`](tokio_beanstalkd::put) jobs on the queue and the workers can [`reserve`](tokio_beanstalkd::put)
16//! them. Once they are done with the job, they have to [`delete`](tokio_beanstalkd::delete) job.
17//! This is required for every job, or else Beanstalkd will not remove it fromits internal datastructres.
18//!
19//! If a worker cannot finish the job in it's TTR (Time To Run), then it can [`release`](tokio_beanstalkd::release)
20//! the job. Theapplication can use the [`using`](tokio_beanstalkd::using) method to put jobs in a specific tube,
21//! and workers can use [watch](tokio_beanstalkd::watch)
22//!
23//! [release]: struct.Beanstalkd.html#method.release
24//! [using]: struct.Beanstalkd.html#method.using
25//!
26//! ## Interaction with Tokio
27//!
28//! The futures in this crate expect to be running under a `tokio::Runtime`. In the common case,
29//! you cannot resolve them solely using `.wait()`, but should instead use `tokio::run` or
30//! explicitly create a `tokio::Runtime` and then use `Runtime::block_on`.
31//!
32//! An simple example client could look something like this:
33//!
34//! ```no_run
35//! # extern crate tokio;
36//! # extern crate futures;
37//! # extern crate tokio_beanstalkd;
38//! # use tokio::prelude::*;
39//! # use tokio_beanstalkd::*;
40//! # fn consumer_commands() {
41//! let mut rt = tokio::runtime::Runtime::new().unwrap();
42//! let bean = rt.block_on(
43//!     Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap()).and_then(|bean| {
44//!         bean.put(0, 1, 100, &b"update:42"[..])
45//!             .inspect(|(_, response)| {
46//!                 response.as_ref().unwrap();
47//!             })
48//!             .and_then(|(bean, _)| {
49//!                 // Use a particular tube
50//!                 bean.using("notifications")
51//!             }).and_then(|(bean, _)| bean.put(0, 1, 100, &b"notify:100"[..]))
52//!     }),
53//! );
54//! rt.shutdown_on_idle();
55//! # }
56//! ```
57//!
58//! And a worker could look something like this:
59//! ```no_run
60//! # extern crate tokio;
61//! # extern crate futures;
62//! # extern crate tokio_beanstalkd;
63//! # use tokio::prelude::*;
64//! # use tokio_beanstalkd::*;
65//! # fn consumer_commands() {
66//!  let mut rt = tokio::runtime::Runtime::new().unwrap();
67//!  let bean = rt.block_on(
68//!      Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap()).and_then(|bean| {
69//!          bean.reserve()
70//!              .inspect(|(_, response)| {
71//!                  // Do something with the response
72//!              }).and_then(|(bean, response)| {
73//!                  // Delete the job once it is done
74//!                  bean.delete(response.as_ref().unwrap().id)
75//!              })
76//!      }),
77//!  );
78//!  rt.shutdown_on_idle();
79//! # }
80//! ```
81
82extern crate bytes;
83extern crate futures;
84#[macro_use]
85extern crate failure;
86extern crate tokio;
87
88pub mod errors;
89mod proto;
90
91use tokio::codec::Framed;
92use tokio::prelude::*;
93
94use std::borrow::Cow;
95use std::net::SocketAddr;
96
97use proto::error as proto_error;
98pub use proto::response::*;
99pub use proto::{Id, Tube};
100use proto_error::{ErrorKind, ProtocolError};
101// Request doesn't have to be a public type
102use proto::Request;
103
104use errors::{BeanstalkError, Consumer, Put};
105
106/// A macro to map internal types to external types. The response from a Stream is a
107/// `(Result<Option<Decoder::Item>, Decoder::Error, connection)`. This value needs to be
108/// maapped to the types that we expect. As we can see there are three possible outcomes,
109/// and each outcome where there is either a response or an error from the server, we get
110/// either an AnyResponse or a proto_error::ProtocolError which need to mapped to simpler
111/// public facing types. That is what the two mappings are. The first one is when you get
112/// Some(response) and the second one is purely for when the Decoder returns an Error
113macro_rules! handle_response {
114    ($input:ident, $mapping:tt, $error_mapping:tt) => {
115        $input.into_future().then(|val| match val {
116            Ok((Some(val), conn)) => Ok((Beanstalkd { connection: conn }, match val $mapping)),
117            // None is only returned when the stream is closed
118            Ok((None, _)) => bail!("Stream closed"),
119            Err((e, conn)) => Ok((Beanstalkd { connection: conn }, match e.kind() $error_mapping)),
120        })
121    };
122}
123
124/// Connection to the Beanstalkd server.
125///
126/// All interactions with Beanstalkd happen by calling methods on a `Beanstalkd` instance.
127///
128/// Even though there is a `quit` command, Beanstalkd consideres a closed connection as the
129/// end of communication, so just dropping this struct will close the connection.
130#[derive(Debug)]
131pub struct Beanstalkd {
132    connection: Framed<tokio::net::TcpStream, proto::CommandCodec>,
133}
134
135// FIXME: log out unexpected errors using env_logger
136impl Beanstalkd {
137    /// Connect to a Beanstalkd instance.
138    ///
139    /// A successful TCP connect is considered the start of communication.
140    pub fn connect(addr: &SocketAddr) -> impl Future<Item = Self, Error = failure::Error> {
141        tokio::net::TcpStream::connect(addr)
142            .map_err(failure::Error::from)
143            .map(Beanstalkd::setup)
144    }
145
146    fn setup(stream: tokio::net::TcpStream) -> Self {
147        let bean = Framed::new(stream, proto::CommandCodec::new());
148        Beanstalkd { connection: bean }
149    }
150
151    /// The "put" command is for any process that wants to insert a job into the queue.
152    ///
153    /// It inserts a job into the client's currently used tube (see the `use` command
154    /// below).
155    ///
156    /// - `priority` is an integer < 2**32. Jobs with smaller priority values will be
157    ///   scheduled before jobs with larger priorities. The most urgent priority is 0;
158    ///   the least urgent priority is 4,294,967,295.
159
160    /// - `delay` is an integer number of seconds to wait before putting the job in
161    ///   the ready queue. The job will be in the "delayed" state during this time.
162
163    /// - `ttr` -- time to run -- is an integer number of seconds to allow a worker
164    ///   to run this job. This time is counted from the moment a worker reserves
165    ///   this job. If the worker does not delete, release, or bury the job within
166    ///   `ttr` seconds, the job will time out and the server will release the job.
167    ///   The minimum ttr is 1. If the client sends 0, the server will silently
168    ///   increase the ttr to 1.
169
170    /// - `data` is the job body -- a sequence of bytes of length <bytes> from the
171    ///   previous line.
172    ///
173    /// After sending the command line and body, the client waits for a reply, which
174    /// is the integer id of the new job.
175    pub fn put<D>(
176        self,
177        priority: u32,
178        delay: u32,
179        ttr: u32,
180        data: D,
181    ) -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
182    where
183        D: Into<Cow<'static, [u8]>>,
184    {
185        let data = data.into();
186        self.connection
187            .send(proto::Request::Put {
188                priority,
189                delay,
190                ttr,
191                data,
192            })
193            .and_then(|conn| {
194                handle_response!(conn, {
195                    AnyResponse::Inserted(id) => Ok(id),
196                    AnyResponse::Buried => Err(Put::Buried),
197                    _r => Err(Put::Beanstalk{error: BeanstalkError::UnexpectedResponse})
198                }, {
199                    // TODO: extract duplication. There has got to be some way to do that...
200                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Put::Beanstalk{error: BeanstalkError::BadFormat}),
201                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Put::Beanstalk{error: BeanstalkError::OutOfMemory}),
202                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Put::Beanstalk{error: BeanstalkError::InternalError}),
203                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Put::Beanstalk{error: BeanstalkError::UnknownCommand}),
204
205                    ErrorKind::Protocol(ProtocolError::ExpectedCRLF) => Err(Put::ExpectedCRLF),
206                    ErrorKind::Protocol(ProtocolError::JobTooBig) => Err(Put::JobTooBig),
207                    ErrorKind::Protocol(ProtocolError::Draining) => Err(Put::Draining),
208                    _r => Err(Put::Beanstalk{error: BeanstalkError::UnexpectedResponse})
209                })
210            })
211    }
212
213    /// Reserve a [job](tokio_beanstalkd::response::Job) to process.
214    ///
215    /// FIXME: need to handle different responses returned at different TTR vs reserve-with-timeout times
216    ///
217    /// A process that wants to consume jobs from the queue uses `reserve`,
218    /// `[delete](tokio_beanstalkd::delete)`,
219    /// `[release](tokio_beanstalkd::release)`, and
220    /// `[bury](tokio_beanstalkd::bury)`.
221    pub fn reserve(
222        self,
223    ) -> impl Future<Item = (Self, Result<Job, Consumer>), Error = failure::Error> {
224        self.connection
225            .send(proto::Request::Reserve)
226            .and_then(|conn| {
227                handle_response!(conn, {
228                    AnyResponse::Reserved(job) => Ok(job),
229                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
230                }, {
231                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
232                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
233                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
234                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
235                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
236                 })
237            })
238    }
239
240    /// The "use" command is for producers. Subsequent put commands will put jobs into
241    /// the tube specified by this command. If no use command has been issued, jobs
242    /// will be put into the tube named "default".
243    ///
244    /// - `tube` is a name at most 200 bytes. It specifies the tube to use. If the
245    ///   tube does not exist, it will be created.
246    pub fn using(
247        self,
248        tube: &'static str,
249    ) -> impl Future<Item = (Self, Result<Tube, BeanstalkError>), Error = failure::Error> {
250        self.connection
251            .send(Request::Use { tube })
252            .and_then(|conn| {
253                handle_response!(conn, {
254                    AnyResponse::Using(tube) => Ok(tube),
255                    _r => Err(BeanstalkError::UnexpectedResponse)
256                }, {
257                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(BeanstalkError::BadFormat),
258                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(BeanstalkError::OutOfMemory),
259                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(BeanstalkError::InternalError),
260                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(BeanstalkError::UnknownCommand),
261                    _r => Err(BeanstalkError::UnexpectedResponse)
262                 })
263            })
264    }
265
266    /// The delete command removes a job from the server entirely. It is normally used
267    /// by the client when the job has successfully run to completion. A client can
268    /// delete jobs that it has reserved, ready jobs, delayed jobs, and jobs that are
269    /// buried.
270    ///
271    ///  - `id` is the job id to delete.
272    pub fn delete(
273        self,
274        id: u32,
275    ) -> impl Future<Item = (Self, Result<(), errors::Consumer>), Error = failure::Error> {
276        self.connection
277            .send(Request::Delete { id })
278            .and_then(|conn| {
279                handle_response!(conn, {
280                    AnyResponse::Deleted => Ok(()),
281                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
282                }, {
283                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
284                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
285                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
286                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
287
288                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
289                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
290                })
291            })
292    }
293
294    /// The release command puts a reserved job back into the ready queue (and marks
295    /// its state as "ready") to be run by any client. It is normally used when the job
296    /// fails because of a transitory error.
297    ///
298    ///  - `id` is the job id to release.
299    ///
300    /// - `pri` is a new priority to assign to the job.
301    ///
302    /// - `delay` is an integer number of seconds to wait before putting the job in
303    ///   the ready queue. The job will be in the "delayed" state during this time.
304    pub fn release(
305        self,
306        id: u32,
307        priority: u32,
308        delay: u32,
309    ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
310        self.connection
311            .send(Request::Release {
312                id,
313                priority,
314                delay,
315            })
316            .and_then(|conn| {
317                handle_response!(conn, {
318                    AnyResponse::Released => Ok(()),
319                    AnyResponse::Buried => Err(Consumer::Buried),
320                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
321                }, {
322                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
323                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
324                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
325                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
326
327                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
328                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
329                })
330            })
331    }
332
333    /// The "touch" command allows a worker to request more time to work on a job.
334    /// This is useful for jobs that potentially take a long time, but you still want
335    /// the benefits of a TTR pulling a job away from an unresponsive worker.  A worker
336    /// may periodically tell the server that it's still alive and processing a job
337    /// (e.g. it may do this on DEADLINE_SOON). The command postpones the auto
338    /// release of a reserved job until TTR seconds from when the command is issued.
339    ///
340    /// - `id` is the ID of a job reserved by the current connection.
341    pub fn touch(
342        self,
343        id: u32,
344    ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
345        self.connection
346            .send(Request::Touch { id })
347            .and_then(|conn| {
348                handle_response!(conn, {
349                    AnyResponse::Touched => Ok(()),
350                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
351                }, {
352
353                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
354                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
355                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
356                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
357                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
358                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
359                })
360            })
361    }
362
363    /// The bury command puts a job into the "buried" state. Buried jobs are put into a
364    /// FIFO linked list and will not be touched by the server again until a client
365    /// kicks them with the "kick" command.
366    ///
367    ///  - `id` is the job id to release.
368    ///
369    /// - `prioritiy` is a new priority to assign to the job.
370    pub fn bury(
371        self,
372        id: u32,
373        priority: u32,
374    ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
375        self.connection
376            .send(Request::Bury { id, priority })
377            .and_then(|conn| {
378                handle_response!(conn, {
379                    AnyResponse::Buried => Ok(()),
380                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
381                }, {
382                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
383                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
384                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
385                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
386                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
387                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
388                })
389            })
390    }
391
392    /// The "watch" command adds the named tube to the watch list for the current
393    /// connection. A reserve command will take a job from any of the tubes in the
394    /// watch list. For each new connection, the watch list initially consists of one
395    /// tube, named "default".
396    ///
397    ///  - <tube> is a name at most 200 bytes. It specifies a tube to add to the watch
398    ///     list. If the tube doesn't exist, it will be created.
399    ///
400    /// The value returned is the count of the tubes being watched by the current connection.
401    pub fn watch(
402        self,
403        tube: &'static str,
404    ) -> impl Future<Item = (Self, Result<u32, BeanstalkError>), Error = failure::Error> {
405        self.connection
406            .send(Request::Watch { tube })
407            .and_then(|conn| {
408                handle_response!(conn, {
409                    AnyResponse::Watching(n) => Ok(n),
410                    _r => Err(BeanstalkError::UnexpectedResponse)
411                }, {
412
413                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(BeanstalkError::BadFormat),
414                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(BeanstalkError::OutOfMemory),
415                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(BeanstalkError::InternalError),
416                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(BeanstalkError::UnknownCommand),
417                    _r => Err(BeanstalkError::UnexpectedResponse)
418                })
419            })
420    }
421
422    /// The "ignore" command is for consumers. It removes the named tube from the
423    /// watch list for the current connection.
424    ///
425    ///  - <tube> is a name at most 200 bytes. It specifies a tube to add to the watch
426    ///     list. If the tube doesn't exist, it will be created.
427    ///
428    /// A successful response is:
429    ///
430    /// - The count of the number of tubes currently watching
431    pub fn ignore(
432        self,
433        tube: &'static str,
434    ) -> impl Future<Item = (Self, Result<u32, Consumer>), Error = failure::Error> {
435        self.connection
436            .send(Request::Ignore { tube })
437            .and_then(|conn| {
438                handle_response!(conn, {
439                    AnyResponse::Watching(n) => Ok(n),
440                    AnyResponse::NotIgnored => Err(errors::Consumer::NotIgnored),
441                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
442                }, {
443                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
444                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
445                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
446                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
447                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
448                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
449                })
450            })
451    }
452
453    /// The peek command lets the client inspect a job in the system. There are four
454    /// types of jobs as enumerated in [PeekType](tokio_beanstalkd::PeekType). All but the
455    /// first operate only on the currently used tube.
456    ///
457    /// * It takes a [PeekType](tokio_beanstalkd::PeekType) representing the type of peek
458    /// operation to perform
459    ///
460    /// * And returns a [Job](tokio_beanstalkd::response::job) on success.
461    pub fn peek(
462        self,
463        peek_type: PeekType,
464    ) -> impl Future<Item = (Self, Result<Job, Consumer>), Error = failure::Error> {
465        let request = match peek_type {
466            PeekType::Ready => Request::PeekReady,
467            PeekType::Delayed => Request::PeekDelay,
468            PeekType::Buried => Request::PeekBuried,
469            PeekType::Normal(id) => Request::Peek { id },
470        };
471
472        self.connection
473            .send(request)
474            .and_then(|conn| {
475                handle_response!(conn, {
476                    AnyResponse::Found(job) => Ok(job),
477                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
478                }, {
479                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
480                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
481                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
482                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
483                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
484                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
485                })
486            })
487    }
488
489    /// The kick command applies only to the currently used tube. It moves jobs into
490    /// the ready queue. If there are any buried jobs, it will only kick buried jobs.
491    /// Otherwise it will kick delayed jobs.
492    ///
493    /// * It takes a `bound` which is the number of jobs it will kick
494    /// * The response is a u32 representing the number of jobs kicked by the server
495    pub fn kick(
496        self,
497        bound: u32,
498    ) -> impl Future<Item = (Self, Result<u32, Consumer>), Error = failure::Error> {
499        self.connection
500            .send(Request::Kick{ bound })
501            .and_then(|conn| {
502                handle_response!(conn, {
503                    AnyResponse::Kicked(count) => Ok(count),
504                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
505                }, {
506                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
507                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
508                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
509                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
510                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
511                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
512            })
513        })
514    }
515
516    /// The kick-job command is a variant of kick that operates with a single job
517    /// identified by its job id. If the given job id exists and is in a buried or
518    /// delayed state, it will be moved to the ready queue of the the same tube where it
519    /// currently belongs.
520    ///
521    /// * It takes an `id` of the job to be kicked
522    /// * And returns `()` on success
523    pub fn kick_job(
524        self,
525        id: Id,
526    ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
527        self.connection
528            .send(Request::KickJob { id })
529            .and_then(|conn| {
530                handle_response!(conn, {
531                    AnyResponse::JobKicked => Ok(()),
532                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
533                }, {
534                    ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
535                    ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
536                    ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
537                    ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
538                    ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
539                    _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
540            })
541        })
542    }
543}
544
545/// The type of [peek][tokio_beanstalkd::peek] request you want to make
546pub enum PeekType {
547    /// The next ready job
548    Ready,
549
550    /// The next delayed job
551    Delayed,
552
553    /// The next bufied job
554    Buried,
555
556    /// The job with the given Id
557    Normal(Id),
558}
559
560#[cfg(test)]
561mod tests {
562    use super::*;
563    use std::process::Command;
564    use std::sync::atomic::{AtomicBool, Ordering};
565
566    static mut SPAWNED: AtomicBool = AtomicBool::new(false);
567
568    // Simple function to make sure only one instance of beanstalkd is spawned.
569    unsafe fn spawn_beanstalkd() {
570        if !*SPAWNED.get_mut() {
571            Command::new("beanstalkd")
572                .spawn()
573                .expect("Unable to spawn server");
574            SPAWNED.compare_and_swap(false, true, Ordering::SeqCst);
575        }
576    }
577
578    #[test]
579    fn it_works() {
580        unsafe {
581            spawn_beanstalkd();
582        }
583        let mut rt = tokio::runtime::Runtime::new().unwrap();
584        let bean = rt.block_on(
585            Beanstalkd::connect(
586                &"127.0.0.1:11300"
587                    .parse()
588                    .expect("Unable to connect to Beanstalkd"),
589            ).and_then(|bean| {
590                // Let put a job in
591                bean.put(0, 1, 100, &b"data"[..])
592                    .inspect(|(_, response)| assert!(response.is_ok()))
593                    .and_then(|(bean, _)| {
594                        // how about another one?
595                        bean.put(0, 1, 100, &b"more data"[..])
596                    }).inspect(|(_, response)| assert!(response.is_ok()))
597                    .and_then(|(bean, _)| {
598                        // Let's watch a particular tube
599                        bean.using("test")
600                    }).inspect(|(_, response)| assert_eq!(response.as_ref().unwrap(), "test"))
601            }),
602        );
603        assert!(!bean.is_err());
604        drop(bean);
605        rt.shutdown_on_idle();
606    }
607
608    #[test]
609    fn consumer_commands() {
610        unsafe {
611            spawn_beanstalkd();
612        }
613        let mut rt = tokio::runtime::Runtime::new().unwrap();
614        let bean = rt.block_on(
615            Beanstalkd::connect(
616                &"127.0.0.1:11300"
617                    .parse()
618                    .expect("Unable to connect to Beanstalkd"),
619            ).and_then(|bean| {
620                // [put][tokio_beanstalkd::put] test
621                bean.put(0, 1, 100, &b"data"[..])
622                    .inspect(|(_, response)| {
623                        response.as_ref().unwrap();
624                    })
625
626                    // [reserve][tokio_beanstalkd::reserve] test
627                    .and_then(|(bean, _)| bean.reserve())
628                    .inspect(|(_, response)| assert_eq!(response.as_ref().unwrap().data, b"data"))
629
630                    // [peek][tokio_beanstalkd::peek] test with PeekType::Normal
631                    .and_then(|(bean, response)| bean.peek(PeekType::Normal(response.as_ref().unwrap().id)))
632                    .inspect(|(_, response)| {
633                        match response.as_ref() {
634                            Ok(_job) => assert!(true),
635                            Err(e) => panic!("Got err: {:?}", e)
636                        }
637                    })
638
639                    // [touch][tokio_beanstalkd::touch] test
640                    .and_then(|(bean, response)| bean.touch(response.unwrap().id))
641                    .inspect(|(_, response)| {
642                        response.as_ref().unwrap();
643                    })
644
645                    // [put][tokio_beanstalkd::put] test
646                    .and_then(|(bean, _)| {
647                        // how about another one?
648                        bean.put(0, 1, 100, &b"more data"[..])
649                    })
650
651                    // [peek][tokio_beanstalkd::peek] PeekType::Ready test
652                    .and_then(|(bean, _)| bean.peek(PeekType::Ready))
653                    .inspect(|(_, response)| {
654                        match response.as_ref() {
655                            Ok(_job) => assert!(true),
656                            Err(e) => panic!("Got err: {:?}", e)
657                        }
658                    })
659                    // [release][tokio_beanstalkd::release] test
660                    .and_then(|(bean, _)| bean.reserve())
661                    .and_then(|(bean, response)| bean.release(response.unwrap().id, 10, 10))
662                    .inspect(|(_, response)| {
663                        response.as_ref().unwrap();
664                    })
665
666                    // [bury][tokio_beanstalkd::bury] test
667                    .and_then(|(bean, _)| bean.reserve())
668                    .and_then(|(bean, response)| bean.bury(response.unwrap().id, 10))
669                    .inspect(|(_, response)| {
670                        response.as_ref().unwrap();
671                    })
672
673                    // [delete][tokio_beanstalkd::delete] test
674                    .and_then(|(bean, _response)| bean.delete(100))
675                    .inspect(|(_, response)| {
676                        assert_eq!(*response, Err(errors::Consumer::NotFound));
677                    })
678
679                    // [watch][tokio_beanstalkd::watch] test
680                    .and_then(|(bean, _)| bean.watch("test"))
681                    .inspect(|(_, response)| assert_eq!(*response.as_ref().unwrap(), 2))
682
683                    // [ignore][tokio_beanstalkd::ignore] test
684                    .and_then(|(bean, _)| bean.ignore("test"))
685                    .inspect(|(_, response)| assert_eq!(*response.as_ref().unwrap(), 1))
686
687                    // [peek][tokio_beanstalkd::peek] PeekType::Buried test
688                    .and_then(|(bean, _)| bean.peek(PeekType::Buried))
689                    .inspect(|(_, response)| {
690                        match response.as_ref() {
691                            Ok(_job) => assert!(true),
692                            Err(e) => panic!("Got err: {:?}", e)
693                        }
694                    })
695
696                    // [peek][tokio_beanstalkd::peek] PeekType::Delayed test
697                    .and_then(|(bean, _)| bean.peek(PeekType::Delayed))
698                    .inspect(|(_, response)| {
699                        match response.as_ref() {
700                            Ok(_job) => assert!(true),
701                            Err(e) => panic!("Got err: {:?}", e)
702                        }
703                    })
704            }),
705        );
706        assert!(!bean.is_err());
707        drop(bean);
708        rt.shutdown_on_idle();
709    }
710}