tokio_beanstalkd/lib.rs
1//! This crate provides a client for working with [Beanstalkd](https://beanstalkd.github.io/), a simple
2//! fast work queue.
3//
4//! # About Beanstalkd
5//!
6//! Beanstalkd is a simple fast work queue. It works at the TCP connection level, considering each TCP
7//! connection individually. A worker may have multiple connections to the Beanstalkd server and each
8//! connection will be considered separate.
9//!
10//! The protocol is ASCII text based but the data itself is just a bytestream. This means that the
11//! application is responsible for interpreting the data.
12//!
13//! ## Operation
14//! This library can serve as a client for both the application and the worker. The application would
15//! [`put`](tokio_beanstalkd::put) jobs on the queue and the workers can [`reserve`](tokio_beanstalkd::put)
16//! them. Once they are done with the job, they have to [`delete`](tokio_beanstalkd::delete) job.
17//! This is required for every job, or else Beanstalkd will not remove it fromits internal datastructres.
18//!
19//! If a worker cannot finish the job in it's TTR (Time To Run), then it can [`release`](tokio_beanstalkd::release)
20//! the job. Theapplication can use the [`using`](tokio_beanstalkd::using) method to put jobs in a specific tube,
21//! and workers can use [watch](tokio_beanstalkd::watch)
22//!
23//! [release]: struct.Beanstalkd.html#method.release
24//! [using]: struct.Beanstalkd.html#method.using
25//!
26//! ## Interaction with Tokio
27//!
28//! The futures in this crate expect to be running under a `tokio::Runtime`. In the common case,
29//! you cannot resolve them solely using `.wait()`, but should instead use `tokio::run` or
30//! explicitly create a `tokio::Runtime` and then use `Runtime::block_on`.
31//!
32//! An simple example client could look something like this:
33//!
34//! ```no_run
35//! # extern crate tokio;
36//! # extern crate futures;
37//! # extern crate tokio_beanstalkd;
38//! # use tokio::prelude::*;
39//! # use tokio_beanstalkd::*;
40//! # fn consumer_commands() {
41//! let mut rt = tokio::runtime::Runtime::new().unwrap();
42//! let bean = rt.block_on(
43//! Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap()).and_then(|bean| {
44//! bean.put(0, 1, 100, &b"update:42"[..])
45//! .inspect(|(_, response)| {
46//! response.as_ref().unwrap();
47//! })
48//! .and_then(|(bean, _)| {
49//! // Use a particular tube
50//! bean.using("notifications")
51//! }).and_then(|(bean, _)| bean.put(0, 1, 100, &b"notify:100"[..]))
52//! }),
53//! );
54//! rt.shutdown_on_idle();
55//! # }
56//! ```
57//!
58//! And a worker could look something like this:
59//! ```no_run
60//! # extern crate tokio;
61//! # extern crate futures;
62//! # extern crate tokio_beanstalkd;
63//! # use tokio::prelude::*;
64//! # use tokio_beanstalkd::*;
65//! # fn consumer_commands() {
66//! let mut rt = tokio::runtime::Runtime::new().unwrap();
67//! let bean = rt.block_on(
68//! Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap()).and_then(|bean| {
69//! bean.reserve()
70//! .inspect(|(_, response)| {
71//! // Do something with the response
72//! }).and_then(|(bean, response)| {
73//! // Delete the job once it is done
74//! bean.delete(response.as_ref().unwrap().id)
75//! })
76//! }),
77//! );
78//! rt.shutdown_on_idle();
79//! # }
80//! ```
81
82extern crate bytes;
83extern crate futures;
84#[macro_use]
85extern crate failure;
86extern crate tokio;
87
88pub mod errors;
89mod proto;
90
91use tokio::codec::Framed;
92use tokio::prelude::*;
93
94use std::borrow::Cow;
95use std::net::SocketAddr;
96
97use proto::error as proto_error;
98pub use proto::response::*;
99pub use proto::{Id, Tube};
100use proto_error::{ErrorKind, ProtocolError};
101// Request doesn't have to be a public type
102use proto::Request;
103
104use errors::{BeanstalkError, Consumer, Put};
105
106/// A macro to map internal types to external types. The response from a Stream is a
107/// `(Result<Option<Decoder::Item>, Decoder::Error, connection)`. This value needs to be
108/// maapped to the types that we expect. As we can see there are three possible outcomes,
109/// and each outcome where there is either a response or an error from the server, we get
110/// either an AnyResponse or a proto_error::ProtocolError which need to mapped to simpler
111/// public facing types. That is what the two mappings are. The first one is when you get
112/// Some(response) and the second one is purely for when the Decoder returns an Error
113macro_rules! handle_response {
114 ($input:ident, $mapping:tt, $error_mapping:tt) => {
115 $input.into_future().then(|val| match val {
116 Ok((Some(val), conn)) => Ok((Beanstalkd { connection: conn }, match val $mapping)),
117 // None is only returned when the stream is closed
118 Ok((None, _)) => bail!("Stream closed"),
119 Err((e, conn)) => Ok((Beanstalkd { connection: conn }, match e.kind() $error_mapping)),
120 })
121 };
122}
123
124/// Connection to the Beanstalkd server.
125///
126/// All interactions with Beanstalkd happen by calling methods on a `Beanstalkd` instance.
127///
128/// Even though there is a `quit` command, Beanstalkd consideres a closed connection as the
129/// end of communication, so just dropping this struct will close the connection.
130#[derive(Debug)]
131pub struct Beanstalkd {
132 connection: Framed<tokio::net::TcpStream, proto::CommandCodec>,
133}
134
135// FIXME: log out unexpected errors using env_logger
136impl Beanstalkd {
137 /// Connect to a Beanstalkd instance.
138 ///
139 /// A successful TCP connect is considered the start of communication.
140 pub fn connect(addr: &SocketAddr) -> impl Future<Item = Self, Error = failure::Error> {
141 tokio::net::TcpStream::connect(addr)
142 .map_err(failure::Error::from)
143 .map(Beanstalkd::setup)
144 }
145
146 fn setup(stream: tokio::net::TcpStream) -> Self {
147 let bean = Framed::new(stream, proto::CommandCodec::new());
148 Beanstalkd { connection: bean }
149 }
150
151 /// The "put" command is for any process that wants to insert a job into the queue.
152 ///
153 /// It inserts a job into the client's currently used tube (see the `use` command
154 /// below).
155 ///
156 /// - `priority` is an integer < 2**32. Jobs with smaller priority values will be
157 /// scheduled before jobs with larger priorities. The most urgent priority is 0;
158 /// the least urgent priority is 4,294,967,295.
159
160 /// - `delay` is an integer number of seconds to wait before putting the job in
161 /// the ready queue. The job will be in the "delayed" state during this time.
162
163 /// - `ttr` -- time to run -- is an integer number of seconds to allow a worker
164 /// to run this job. This time is counted from the moment a worker reserves
165 /// this job. If the worker does not delete, release, or bury the job within
166 /// `ttr` seconds, the job will time out and the server will release the job.
167 /// The minimum ttr is 1. If the client sends 0, the server will silently
168 /// increase the ttr to 1.
169
170 /// - `data` is the job body -- a sequence of bytes of length <bytes> from the
171 /// previous line.
172 ///
173 /// After sending the command line and body, the client waits for a reply, which
174 /// is the integer id of the new job.
175 pub fn put<D>(
176 self,
177 priority: u32,
178 delay: u32,
179 ttr: u32,
180 data: D,
181 ) -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
182 where
183 D: Into<Cow<'static, [u8]>>,
184 {
185 let data = data.into();
186 self.connection
187 .send(proto::Request::Put {
188 priority,
189 delay,
190 ttr,
191 data,
192 })
193 .and_then(|conn| {
194 handle_response!(conn, {
195 AnyResponse::Inserted(id) => Ok(id),
196 AnyResponse::Buried => Err(Put::Buried),
197 _r => Err(Put::Beanstalk{error: BeanstalkError::UnexpectedResponse})
198 }, {
199 // TODO: extract duplication. There has got to be some way to do that...
200 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Put::Beanstalk{error: BeanstalkError::BadFormat}),
201 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Put::Beanstalk{error: BeanstalkError::OutOfMemory}),
202 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Put::Beanstalk{error: BeanstalkError::InternalError}),
203 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Put::Beanstalk{error: BeanstalkError::UnknownCommand}),
204
205 ErrorKind::Protocol(ProtocolError::ExpectedCRLF) => Err(Put::ExpectedCRLF),
206 ErrorKind::Protocol(ProtocolError::JobTooBig) => Err(Put::JobTooBig),
207 ErrorKind::Protocol(ProtocolError::Draining) => Err(Put::Draining),
208 _r => Err(Put::Beanstalk{error: BeanstalkError::UnexpectedResponse})
209 })
210 })
211 }
212
213 /// Reserve a [job](tokio_beanstalkd::response::Job) to process.
214 ///
215 /// FIXME: need to handle different responses returned at different TTR vs reserve-with-timeout times
216 ///
217 /// A process that wants to consume jobs from the queue uses `reserve`,
218 /// `[delete](tokio_beanstalkd::delete)`,
219 /// `[release](tokio_beanstalkd::release)`, and
220 /// `[bury](tokio_beanstalkd::bury)`.
221 pub fn reserve(
222 self,
223 ) -> impl Future<Item = (Self, Result<Job, Consumer>), Error = failure::Error> {
224 self.connection
225 .send(proto::Request::Reserve)
226 .and_then(|conn| {
227 handle_response!(conn, {
228 AnyResponse::Reserved(job) => Ok(job),
229 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
230 }, {
231 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
232 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
233 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
234 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
235 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
236 })
237 })
238 }
239
240 /// The "use" command is for producers. Subsequent put commands will put jobs into
241 /// the tube specified by this command. If no use command has been issued, jobs
242 /// will be put into the tube named "default".
243 ///
244 /// - `tube` is a name at most 200 bytes. It specifies the tube to use. If the
245 /// tube does not exist, it will be created.
246 pub fn using(
247 self,
248 tube: &'static str,
249 ) -> impl Future<Item = (Self, Result<Tube, BeanstalkError>), Error = failure::Error> {
250 self.connection
251 .send(Request::Use { tube })
252 .and_then(|conn| {
253 handle_response!(conn, {
254 AnyResponse::Using(tube) => Ok(tube),
255 _r => Err(BeanstalkError::UnexpectedResponse)
256 }, {
257 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(BeanstalkError::BadFormat),
258 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(BeanstalkError::OutOfMemory),
259 ErrorKind::Protocol(ProtocolError::InternalError) => Err(BeanstalkError::InternalError),
260 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(BeanstalkError::UnknownCommand),
261 _r => Err(BeanstalkError::UnexpectedResponse)
262 })
263 })
264 }
265
266 /// The delete command removes a job from the server entirely. It is normally used
267 /// by the client when the job has successfully run to completion. A client can
268 /// delete jobs that it has reserved, ready jobs, delayed jobs, and jobs that are
269 /// buried.
270 ///
271 /// - `id` is the job id to delete.
272 pub fn delete(
273 self,
274 id: u32,
275 ) -> impl Future<Item = (Self, Result<(), errors::Consumer>), Error = failure::Error> {
276 self.connection
277 .send(Request::Delete { id })
278 .and_then(|conn| {
279 handle_response!(conn, {
280 AnyResponse::Deleted => Ok(()),
281 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
282 }, {
283 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
284 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
285 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
286 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
287
288 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
289 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
290 })
291 })
292 }
293
294 /// The release command puts a reserved job back into the ready queue (and marks
295 /// its state as "ready") to be run by any client. It is normally used when the job
296 /// fails because of a transitory error.
297 ///
298 /// - `id` is the job id to release.
299 ///
300 /// - `pri` is a new priority to assign to the job.
301 ///
302 /// - `delay` is an integer number of seconds to wait before putting the job in
303 /// the ready queue. The job will be in the "delayed" state during this time.
304 pub fn release(
305 self,
306 id: u32,
307 priority: u32,
308 delay: u32,
309 ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
310 self.connection
311 .send(Request::Release {
312 id,
313 priority,
314 delay,
315 })
316 .and_then(|conn| {
317 handle_response!(conn, {
318 AnyResponse::Released => Ok(()),
319 AnyResponse::Buried => Err(Consumer::Buried),
320 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
321 }, {
322 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
323 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
324 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
325 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
326
327 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
328 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
329 })
330 })
331 }
332
333 /// The "touch" command allows a worker to request more time to work on a job.
334 /// This is useful for jobs that potentially take a long time, but you still want
335 /// the benefits of a TTR pulling a job away from an unresponsive worker. A worker
336 /// may periodically tell the server that it's still alive and processing a job
337 /// (e.g. it may do this on DEADLINE_SOON). The command postpones the auto
338 /// release of a reserved job until TTR seconds from when the command is issued.
339 ///
340 /// - `id` is the ID of a job reserved by the current connection.
341 pub fn touch(
342 self,
343 id: u32,
344 ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
345 self.connection
346 .send(Request::Touch { id })
347 .and_then(|conn| {
348 handle_response!(conn, {
349 AnyResponse::Touched => Ok(()),
350 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
351 }, {
352
353 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
354 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
355 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
356 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
357 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
358 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
359 })
360 })
361 }
362
363 /// The bury command puts a job into the "buried" state. Buried jobs are put into a
364 /// FIFO linked list and will not be touched by the server again until a client
365 /// kicks them with the "kick" command.
366 ///
367 /// - `id` is the job id to release.
368 ///
369 /// - `prioritiy` is a new priority to assign to the job.
370 pub fn bury(
371 self,
372 id: u32,
373 priority: u32,
374 ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
375 self.connection
376 .send(Request::Bury { id, priority })
377 .and_then(|conn| {
378 handle_response!(conn, {
379 AnyResponse::Buried => Ok(()),
380 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
381 }, {
382 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
383 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
384 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
385 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
386 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
387 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
388 })
389 })
390 }
391
392 /// The "watch" command adds the named tube to the watch list for the current
393 /// connection. A reserve command will take a job from any of the tubes in the
394 /// watch list. For each new connection, the watch list initially consists of one
395 /// tube, named "default".
396 ///
397 /// - <tube> is a name at most 200 bytes. It specifies a tube to add to the watch
398 /// list. If the tube doesn't exist, it will be created.
399 ///
400 /// The value returned is the count of the tubes being watched by the current connection.
401 pub fn watch(
402 self,
403 tube: &'static str,
404 ) -> impl Future<Item = (Self, Result<u32, BeanstalkError>), Error = failure::Error> {
405 self.connection
406 .send(Request::Watch { tube })
407 .and_then(|conn| {
408 handle_response!(conn, {
409 AnyResponse::Watching(n) => Ok(n),
410 _r => Err(BeanstalkError::UnexpectedResponse)
411 }, {
412
413 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(BeanstalkError::BadFormat),
414 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(BeanstalkError::OutOfMemory),
415 ErrorKind::Protocol(ProtocolError::InternalError) => Err(BeanstalkError::InternalError),
416 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(BeanstalkError::UnknownCommand),
417 _r => Err(BeanstalkError::UnexpectedResponse)
418 })
419 })
420 }
421
422 /// The "ignore" command is for consumers. It removes the named tube from the
423 /// watch list for the current connection.
424 ///
425 /// - <tube> is a name at most 200 bytes. It specifies a tube to add to the watch
426 /// list. If the tube doesn't exist, it will be created.
427 ///
428 /// A successful response is:
429 ///
430 /// - The count of the number of tubes currently watching
431 pub fn ignore(
432 self,
433 tube: &'static str,
434 ) -> impl Future<Item = (Self, Result<u32, Consumer>), Error = failure::Error> {
435 self.connection
436 .send(Request::Ignore { tube })
437 .and_then(|conn| {
438 handle_response!(conn, {
439 AnyResponse::Watching(n) => Ok(n),
440 AnyResponse::NotIgnored => Err(errors::Consumer::NotIgnored),
441 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
442 }, {
443 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
444 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
445 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
446 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
447 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
448 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
449 })
450 })
451 }
452
453 /// The peek command lets the client inspect a job in the system. There are four
454 /// types of jobs as enumerated in [PeekType](tokio_beanstalkd::PeekType). All but the
455 /// first operate only on the currently used tube.
456 ///
457 /// * It takes a [PeekType](tokio_beanstalkd::PeekType) representing the type of peek
458 /// operation to perform
459 ///
460 /// * And returns a [Job](tokio_beanstalkd::response::job) on success.
461 pub fn peek(
462 self,
463 peek_type: PeekType,
464 ) -> impl Future<Item = (Self, Result<Job, Consumer>), Error = failure::Error> {
465 let request = match peek_type {
466 PeekType::Ready => Request::PeekReady,
467 PeekType::Delayed => Request::PeekDelay,
468 PeekType::Buried => Request::PeekBuried,
469 PeekType::Normal(id) => Request::Peek { id },
470 };
471
472 self.connection
473 .send(request)
474 .and_then(|conn| {
475 handle_response!(conn, {
476 AnyResponse::Found(job) => Ok(job),
477 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
478 }, {
479 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
480 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
481 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
482 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
483 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
484 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
485 })
486 })
487 }
488
489 /// The kick command applies only to the currently used tube. It moves jobs into
490 /// the ready queue. If there are any buried jobs, it will only kick buried jobs.
491 /// Otherwise it will kick delayed jobs.
492 ///
493 /// * It takes a `bound` which is the number of jobs it will kick
494 /// * The response is a u32 representing the number of jobs kicked by the server
495 pub fn kick(
496 self,
497 bound: u32,
498 ) -> impl Future<Item = (Self, Result<u32, Consumer>), Error = failure::Error> {
499 self.connection
500 .send(Request::Kick{ bound })
501 .and_then(|conn| {
502 handle_response!(conn, {
503 AnyResponse::Kicked(count) => Ok(count),
504 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
505 }, {
506 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
507 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
508 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
509 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
510 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
511 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
512 })
513 })
514 }
515
516 /// The kick-job command is a variant of kick that operates with a single job
517 /// identified by its job id. If the given job id exists and is in a buried or
518 /// delayed state, it will be moved to the ready queue of the the same tube where it
519 /// currently belongs.
520 ///
521 /// * It takes an `id` of the job to be kicked
522 /// * And returns `()` on success
523 pub fn kick_job(
524 self,
525 id: Id,
526 ) -> impl Future<Item = (Self, Result<(), Consumer>), Error = failure::Error> {
527 self.connection
528 .send(Request::KickJob { id })
529 .and_then(|conn| {
530 handle_response!(conn, {
531 AnyResponse::JobKicked => Ok(()),
532 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
533 }, {
534 ErrorKind::Protocol(ProtocolError::BadFormat) => Err(Consumer::Beanstalk{error: BeanstalkError::BadFormat}),
535 ErrorKind::Protocol(ProtocolError::OutOfMemory) => Err(Consumer::Beanstalk{error: BeanstalkError::OutOfMemory}),
536 ErrorKind::Protocol(ProtocolError::InternalError) => Err(Consumer::Beanstalk{error: BeanstalkError::InternalError}),
537 ErrorKind::Protocol(ProtocolError::UnknownCommand) => Err(Consumer::Beanstalk{error: BeanstalkError::UnknownCommand}),
538 ErrorKind::Protocol(ProtocolError::NotFound) => Err(Consumer::NotFound),
539 _r => Err(Consumer::Beanstalk{error: BeanstalkError::UnexpectedResponse})
540 })
541 })
542 }
543}
544
545/// The type of [peek][tokio_beanstalkd::peek] request you want to make
546pub enum PeekType {
547 /// The next ready job
548 Ready,
549
550 /// The next delayed job
551 Delayed,
552
553 /// The next bufied job
554 Buried,
555
556 /// The job with the given Id
557 Normal(Id),
558}
559
560#[cfg(test)]
561mod tests {
562 use super::*;
563 use std::process::Command;
564 use std::sync::atomic::{AtomicBool, Ordering};
565
566 static mut SPAWNED: AtomicBool = AtomicBool::new(false);
567
568 // Simple function to make sure only one instance of beanstalkd is spawned.
569 unsafe fn spawn_beanstalkd() {
570 if !*SPAWNED.get_mut() {
571 Command::new("beanstalkd")
572 .spawn()
573 .expect("Unable to spawn server");
574 SPAWNED.compare_and_swap(false, true, Ordering::SeqCst);
575 }
576 }
577
578 #[test]
579 fn it_works() {
580 unsafe {
581 spawn_beanstalkd();
582 }
583 let mut rt = tokio::runtime::Runtime::new().unwrap();
584 let bean = rt.block_on(
585 Beanstalkd::connect(
586 &"127.0.0.1:11300"
587 .parse()
588 .expect("Unable to connect to Beanstalkd"),
589 ).and_then(|bean| {
590 // Let put a job in
591 bean.put(0, 1, 100, &b"data"[..])
592 .inspect(|(_, response)| assert!(response.is_ok()))
593 .and_then(|(bean, _)| {
594 // how about another one?
595 bean.put(0, 1, 100, &b"more data"[..])
596 }).inspect(|(_, response)| assert!(response.is_ok()))
597 .and_then(|(bean, _)| {
598 // Let's watch a particular tube
599 bean.using("test")
600 }).inspect(|(_, response)| assert_eq!(response.as_ref().unwrap(), "test"))
601 }),
602 );
603 assert!(!bean.is_err());
604 drop(bean);
605 rt.shutdown_on_idle();
606 }
607
608 #[test]
609 fn consumer_commands() {
610 unsafe {
611 spawn_beanstalkd();
612 }
613 let mut rt = tokio::runtime::Runtime::new().unwrap();
614 let bean = rt.block_on(
615 Beanstalkd::connect(
616 &"127.0.0.1:11300"
617 .parse()
618 .expect("Unable to connect to Beanstalkd"),
619 ).and_then(|bean| {
620 // [put][tokio_beanstalkd::put] test
621 bean.put(0, 1, 100, &b"data"[..])
622 .inspect(|(_, response)| {
623 response.as_ref().unwrap();
624 })
625
626 // [reserve][tokio_beanstalkd::reserve] test
627 .and_then(|(bean, _)| bean.reserve())
628 .inspect(|(_, response)| assert_eq!(response.as_ref().unwrap().data, b"data"))
629
630 // [peek][tokio_beanstalkd::peek] test with PeekType::Normal
631 .and_then(|(bean, response)| bean.peek(PeekType::Normal(response.as_ref().unwrap().id)))
632 .inspect(|(_, response)| {
633 match response.as_ref() {
634 Ok(_job) => assert!(true),
635 Err(e) => panic!("Got err: {:?}", e)
636 }
637 })
638
639 // [touch][tokio_beanstalkd::touch] test
640 .and_then(|(bean, response)| bean.touch(response.unwrap().id))
641 .inspect(|(_, response)| {
642 response.as_ref().unwrap();
643 })
644
645 // [put][tokio_beanstalkd::put] test
646 .and_then(|(bean, _)| {
647 // how about another one?
648 bean.put(0, 1, 100, &b"more data"[..])
649 })
650
651 // [peek][tokio_beanstalkd::peek] PeekType::Ready test
652 .and_then(|(bean, _)| bean.peek(PeekType::Ready))
653 .inspect(|(_, response)| {
654 match response.as_ref() {
655 Ok(_job) => assert!(true),
656 Err(e) => panic!("Got err: {:?}", e)
657 }
658 })
659 // [release][tokio_beanstalkd::release] test
660 .and_then(|(bean, _)| bean.reserve())
661 .and_then(|(bean, response)| bean.release(response.unwrap().id, 10, 10))
662 .inspect(|(_, response)| {
663 response.as_ref().unwrap();
664 })
665
666 // [bury][tokio_beanstalkd::bury] test
667 .and_then(|(bean, _)| bean.reserve())
668 .and_then(|(bean, response)| bean.bury(response.unwrap().id, 10))
669 .inspect(|(_, response)| {
670 response.as_ref().unwrap();
671 })
672
673 // [delete][tokio_beanstalkd::delete] test
674 .and_then(|(bean, _response)| bean.delete(100))
675 .inspect(|(_, response)| {
676 assert_eq!(*response, Err(errors::Consumer::NotFound));
677 })
678
679 // [watch][tokio_beanstalkd::watch] test
680 .and_then(|(bean, _)| bean.watch("test"))
681 .inspect(|(_, response)| assert_eq!(*response.as_ref().unwrap(), 2))
682
683 // [ignore][tokio_beanstalkd::ignore] test
684 .and_then(|(bean, _)| bean.ignore("test"))
685 .inspect(|(_, response)| assert_eq!(*response.as_ref().unwrap(), 1))
686
687 // [peek][tokio_beanstalkd::peek] PeekType::Buried test
688 .and_then(|(bean, _)| bean.peek(PeekType::Buried))
689 .inspect(|(_, response)| {
690 match response.as_ref() {
691 Ok(_job) => assert!(true),
692 Err(e) => panic!("Got err: {:?}", e)
693 }
694 })
695
696 // [peek][tokio_beanstalkd::peek] PeekType::Delayed test
697 .and_then(|(bean, _)| bean.peek(PeekType::Delayed))
698 .inspect(|(_, response)| {
699 match response.as_ref() {
700 Ok(_job) => assert!(true),
701 Err(e) => panic!("Got err: {:?}", e)
702 }
703 })
704 }),
705 );
706 assert!(!bean.is_err());
707 drop(bean);
708 rt.shutdown_on_idle();
709 }
710}