Struct faktory::Consumer

pub struct Consumer<S, F> where
    S: Read + Write
{ /* fields omitted */ }

Consumer is used to run a worker that processes jobs provided by Faktory.

Building the worker

Faktory needs a decent amount of information from its workers, such as a unique worker ID, a hostname for the worker, its process ID, and a set of labels used to indicate which jobs the worker can accept. In order to enable setting all these, constructing a worker is a two-step process. You first use a ConsumerBuilder (which conveniently implements a sensible Default) to set the worker metadata, as well as to register any job handlers. You then use one of the connect_* methods to finalize the worker and connect to the Faktory server.

In most cases, ConsumerBuilder::default() will do what you want. You only need to call register to add a handler for each of your job types, and then you can connect. If you have different types of workers, you may also want to use labels to further narrow down what kind of jobs this worker should accept.

Handlers

For each Job that the worker receives, the handler that is registered for that job's type will be called. If a job is received with a type for which no handler exists, the job will be failed and returned to the Faktory server. Similarly, if a handler returns an error response, the job will be failed, and the error reported back to the Faktory server.

If you are new to Rust, getting the handler types to work out can be a little tricky. If you want to understand why, I highly recommend that you have a look at the chapter on closures and generic parameters in the Rust Book. If you just want it to work, my recommendation is to either use regular functions instead of closures and give &func_name as the handler, or wrap all your closures in Box::new().
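
The typing problem can be reproduced without Faktory at all. The sketch below is not how this crate stores handlers; Job, Handler, and Registry are hypothetical stand-ins that I made up for illustration. It shows that every closure has its own anonymous type, so a plain function and a closure can only share one container once both are boxed behind a common trait-object type.

```rust
use std::collections::HashMap;
use std::io;

/// Stand-in for a job; NOT faktory's `Job` type.
#[derive(Debug, Clone)]
pub struct Job {
    pub kind: String,
    pub args: Vec<String>,
}

/// Boxing erases each closure's unique anonymous type, so closures and
/// plain functions can be stored side by side.
pub type Handler = Box<dyn Fn(Job) -> io::Result<()> + Send + Sync>;

/// Hypothetical registry mapping a job type to its handler.
#[derive(Default)]
pub struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    pub fn register(&mut self, kind: &str, handler: Handler) {
        self.handlers.insert(kind.to_string(), handler);
    }

    /// Calls the handler registered for the job's type. A job type with no
    /// handler is reported as an error, mirroring the "job will be failed"
    /// behavior described above.
    pub fn dispatch(&self, job: Job) -> io::Result<()> {
        match self.handlers.get(&job.kind) {
            Some(handler) => handler(job),
            None => Err(io::Error::new(
                io::ErrorKind::NotFound,
                format!("no handler for job type {}", job.kind),
            )),
        }
    }
}

/// A regular function used as a handler.
pub fn print_job(job: Job) -> io::Result<()> {
    println!("{:?}", job);
    Ok(())
}

/// Builds a registry holding one plain function and one closure.
pub fn example_registry() -> Registry {
    let mut registry = Registry::default();
    registry.register("print", Box::new(print_job));
    registry.register(
        "fail",
        Box::new(|_job: Job| -> io::Result<()> {
            Err(io::Error::new(io::ErrorKind::Other, "handler failed"))
        }),
    );
    registry
}
```

Spelling out the closure's return type, as in the "fail" handler, also saves the compiler from having to guess the Ok type of a closure that only ever returns Err.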

Concurrency

By default, only a single thread is spun up to process the jobs given to this worker. If you want to dedicate more resources to processing jobs, you have the options listed below. As you go down the list, efficiency increases, but fault isolation decreases. I will not give further detail here, but rather recommend that if these don't mean much to you, you should use the last approach and let the library handle the concurrency for you.

  • You can spin up more worker processes by launching your worker program more than once.
  • You can create more than one Consumer.
  • You can call ConsumerBuilder::workers to set the number of worker threads you'd like the Consumer to use internally.
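
To give a feel for the last option, here is a minimal sketch of several threads sharing one handler. It is not this crate's implementation: run_pool is a hypothetical helper, and an in-memory queue stands in for the Faktory server. It does show why a handler shared across threads needs to be Send + Sync + 'static.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `workers` threads that drain a shared in-memory queue, calling the
/// same handler from every thread. Returns how many jobs each thread handled.
/// The queue stands in for the server; nothing here touches the network.
pub fn run_pool<F>(workers: usize, jobs: Vec<String>, handler: F) -> Vec<usize>
where
    F: Fn(&str) + Send + Sync + 'static,
{
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));
    let handler = Arc::new(handler);

    let handles: Vec<thread::JoinHandle<usize>> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let handler = Arc::clone(&handler);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // Hold the lock only long enough to pop one job.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(job) => {
                            (*handler)(&job);
                            handled += 1;
                        }
                        None => break,
                    }
                }
                handled
            })
        })
        .collect();

    // Wait for every thread and collect its per-thread count.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

All threads live in one process here, so a handler that aborts the process takes every worker thread down with it. That is the fault-isolation cost mentioned above.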

Connecting to Faktory

To fetch jobs, the Consumer must first be connected to the Faktory server. Exactly how you do that depends on your setup. In most cases, you'll want to use ConsumerBuilder::connect and provide a connection URL (None will use the Faktory environment variables). If you supply a URL, it must be of the form:

protocol://[:password@]hostname[:port]

Faktory suggests using the FAKTORY_PROVIDER and FAKTORY_URL environment variables (see their docs for more information) with localhost:7419 as the fallback default. If you want this behavior, use ConsumerBuilder::connect_env. If not, you can supply the URL directly to ConsumerBuilder::connect. Both methods take a connection type as described above.
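
To make the URL shape concrete, the sketch below splits a string of that form into its parts. It is illustrative only and is not the parser this crate uses; ConnInfo, parse_url, and DEFAULT_PORT are hypothetical names, and treating a missing port as 7419 is my assumption based on the localhost:7419 fallback mentioned above.

```rust
/// Parts of a URL of the form `protocol://[:password@]hostname[:port]`.
#[derive(Debug, PartialEq)]
pub struct ConnInfo {
    pub protocol: String,
    pub password: Option<String>,
    pub hostname: String,
    pub port: u16,
}

/// Assumed default, taken from the `localhost:7419` fallback.
pub const DEFAULT_PORT: u16 = 7419;

/// Splits a connection URL into its parts; returns `None` if it is malformed.
pub fn parse_url(url: &str) -> Option<ConnInfo> {
    let (protocol, rest) = url.split_once("://")?;
    if protocol.is_empty() {
        return None;
    }
    // The optional credentials part is `:password@` (empty user name).
    let (password, host_port) = match rest.rsplit_once('@') {
        Some((creds, host_port)) => (Some(creds.strip_prefix(':')?.to_string()), host_port),
        None => (None, rest),
    };
    // The port is optional; fall back to the assumed default.
    let (hostname, port) = match host_port.rsplit_once(':') {
        Some((host, port)) => (host, port.parse::<u16>().ok()?),
        None => (host_port, DEFAULT_PORT),
    };
    if hostname.is_empty() {
        return None;
    }
    Some(ConnInfo {
        protocol: protocol.to_string(),
        password,
        hostname: hostname.to_string(),
        port,
    })
}
```

With this sketch, "tcp://:secret@example.com:7420" yields the password "secret" and port 7420, while a bare "localhost:7419" is rejected because it lacks the protocol:// prefix.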

See the Producer examples for examples of how to connect to different Faktory setups.

Worker lifecycle

Okay, so you've built your worker and connected to the Faktory server. Now what?

If all this process is doing is handling jobs, reconnecting on failure, and exiting when told to by the Faktory server, you should use run_to_completion. If you want more fine-grained control over the lifetime of your process, you should use Consumer::run. See the documentation for each of these methods for details.

Examples

Create a worker with all default options, register a single handler (for the foobar job type), connect to the Faktory server, and start accepting jobs.

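use faktory::ConsumerBuilder;
use std::io;

// Start from the default worker metadata and register a handler for "foobar" jobs.
let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
    println!("{:?}", job);
    Ok(())
});

// Passing None makes connect use the Faktory environment variables.
let mut c = c.connect(None).unwrap();

// Process jobs from the "default" queue until an error occurs or the server says to stop.
if let Err(e) = c.run(&["default"]) {
    println!("worker failed: {}", e);
}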

Methods

impl<S, E, F> Consumer<S, F> where
    S: Read + Write,
    E: Error,
    F: Fn(Job) -> Result<(), E> + Send + Sync + 'static, 

Fetch and run a single job on the current thread, and then return.

impl<S, E, F> Consumer<S, F> where
    S: Read + Write + Reconnect + Send + 'static,
    E: Error,
    F: Fn(Job) -> Result<(), E> + Send + Sync + 'static, 

Run this worker on the given queues until an I/O error occurs (Err is returned), or until the server tells the worker to disengage (Ok is returned).

The value in an Ok indicates the number of workers that may still be processing jobs.

Note that if the worker fails, reconnect() should likely be called before calling run() again. If an error occurred while reporting a job success or failure, the result will be re-reported to the server without re-executing the job. If the worker was terminated (i.e., run returned an Ok response), the worker should not try to resume by calling run again. Doing so will cause a panic.
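
The rules above amount to a small supervision loop, sketched below. The Worker trait and supervise function are hypothetical stand-ins: the method names follow the text above, but the signatures are my assumptions and not this crate's API, so treat this as the shape of the logic, not code to paste in.

```rust
use std::io;

/// Hypothetical stand-in for a connected worker. The method names follow the
/// text above, but the signatures are assumptions, not faktory's API.
pub trait Worker {
    /// Err on I/O failure; Ok(n) once the server tells the worker to stop,
    /// where n is the number of workers that may still be processing jobs.
    fn run(&mut self) -> io::Result<usize>;

    /// Re-establishes the connection after a failure.
    fn reconnect(&mut self) -> io::Result<()>;
}

/// Keeps a worker running across I/O failures.
/// Returns the value from the final Ok, or the error if reconnecting fails.
pub fn supervise<W: Worker>(worker: &mut W) -> io::Result<usize> {
    loop {
        match worker.run() {
            // Terminated by the server: stop, and never call `run` again.
            Ok(still_running) => return Ok(still_running),
            // I/O failure: reconnect first, then loop back into `run`.
            Err(e) => {
                eprintln!("worker failed: {}", e);
                worker.reconnect()?;
            }
        }
    }
}
```

The loop returns as soon as run yields Ok, so it can never call run on a terminated worker, which is the case that panics.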

Run this worker until the server tells us to exit or a connection cannot be re-established.

This function never returns. When the worker decides to exit, the process is terminated.