Struct shared_resource_pool_builder::SharedResourcePoolBuilder

pub struct SharedResourcePoolBuilder<SType, SR> { /* fields omitted */ }

The builder to create pools on a shared resource.

Implementations

create_pool()

Creates an unbounded producer-consumer pool.

This method takes two parameters:

  • The producer function

    This function will generate the items to be further processed by the consumer. It must take one parameter, which is a sender-like object such as Sender. To generate an item for the consumer, use the sender's send() method.

    The return value of this function is ignored; its return type is ().

    By design, this function will run in only one thread. If you want it to run in multiple threads, you need to implement that yourself. The preferred way is to run only a cheap listing algorithm in the producer and let the consumer do the computationally intensive tasks, which will by default run within a thread pool.

  • The consumer function

    This function takes one parameter and will be called for each item that was sent by the producer. The return type must be the same type that is used by the shared consumer as defined by the Self::new() method.

    By design, the consumer functions will run within a thread pool that is shared with every other pool consumer created by this builder. The number of threads is the number of cores, as defined by the default value of threadpool::Builder::num_threads().

The message channel, in which the producer sends and the consumer receives the items, is unbounded with this function (in other words, it uses channel()). On modern systems and for “typical” use cases, this should not be a problem. If, however, your system is low on memory or your producer generates millions upon millions of items, then you might be interested in the bounded variant of this method.

Example

Assuming that heavy_algorithm() converts an integer to an object that can be used with the shared consumer function, a pool creation might look like this:

pool_builder.create_pool(
    |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    |i| heavy_algorithm(i),
);
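
For a fully self-contained sketch, the builder creation and the final join() can be spelled out as well. Note that heavy_algorithm is only a stand-in here (it simply squares its input), and that the order of the collected results is not guaranteed, since the consumers run in a thread pool:

// Stand-in for an expensive per-item computation (assumption for illustration).
fn heavy_algorithm(i: i32) -> i32 {
    i * i
}

// The shared resource is a Vec; the shared consumer just collects the results.
let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));

pool_builder.create_pool(
    |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    |i| heavy_algorithm(i),
);

// join() waits for all items to be processed and moves the Vec out of the builder.
let results = pool_builder.join().unwrap();
assert_eq!(results.len(), 10);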

create_pool_bounded()

Creates a bounded producer-consumer pool.

This method is nearly identical to its unbounded variant. For a detailed description of the producer and consumer parameters, please see there.

The difference is that this function takes an additional integer parameter which limits the size of the message channel used by the producer and consumer (in other words, it uses sync_channel()). If the channel is full, the producer will block until the consumer has taken at least one item. This can be useful if you expect a huge number of items to be generated, or if your system is low on memory.

Example

Assuming that crazy_producer() generates a huge number of integer values, and that heavy_algorithm() converts an integer to an object that can be used with the shared consumer function, a bounded pool creation with a buffer of 100 items might look like this:

pool_builder.create_pool_bounded(
    100,
    |tx| crazy_producer().into_iter().for_each(|i| tx.send(i).unwrap()),
    |i| heavy_algorithm(i),
);

oneshot()

Sends a single item to the shared consumer.

This method can be used to communicate with the shared resource before creating another pool. Maybe you just want to add another item to the shared consumer, or you want to use this method to trigger an alternate behavior of the shared consumer.

Internally, this method behaves much like the Self::create_pool() method. It returns a JobHandle, which can be waited on with JobHandle::join(). The item is sent to the message queue of the pool builder and might not be consumed immediately if there are other items waiting.

Examples
Add another item

In the following example, in addition to summing up 0, 1, and 2, as well as 4, 5, and 6, we add a single 3 to the result:

let pool_builder = SharedResourcePoolBuilder::new(0, |sum, i| *sum += i);

pool_builder.create_pool(
    |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(i).unwrap()),
    |i| i,
);

pool_builder.oneshot(3);

pool_builder.create_pool(
    |tx| vec![4, 5, 6].into_iter().for_each(|i| tx.send(i).unwrap()),
    |i| i,
);

let result = pool_builder.join().unwrap();

assert_eq!(result, 21);
Trigger alternate behavior

In the following example, we change the behavior of the shared consumer in the middle of the execution:

let pool_builder = SharedResourcePoolBuilder::new(
    // Here we define our shared resource to be a tuple. The second element will determine
    // our current behavior and can be altered in the shared consumer function.
    (0, true),

    // The whole tuple must be used in the shared consumer. Add or subtract the received
    // value `i`, depending on the state of `adding`. If `None` is received, swap the state
    // variable, so that upcoming values will be handled by the alternate behavior.
    |(sum, adding), i| {
        match i {
            Some(i) => {
                if *adding {
                    *sum += i;
                } else {
                    *sum -= i;
                }
            }
            None => {
                *adding = !*adding;
            }
        }
    },
);

// First we add some numbers
pool_builder.create_pool(
    |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    |i| i,
).join().unwrap();

// Swap summing behavior
pool_builder.oneshot(None).join().unwrap();

// Now we subtract the same numbers again, with the same code as before
pool_builder.create_pool(
    |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    |i| i,
);

// Remember that we defined a boolean in our shared resource tuple. We are not interested
// in this one anymore, so we explicitly ignore it here.
let (result, _) = pool_builder.join().unwrap();

// Since we subtracted the same amount as we added, the result is now 0
assert_eq!(result, 0);

// Note that we joined the first pool and the oneshot to wait for them to finish. In this
// simple example, that would normally not be necessary, since sending merely three items
// happens almost instantly and the shared consumer handles all items sequentially.
// Nevertheless, there could be a race condition if for some reason the swapping were
// triggered before all items had been added. In more complex scenarios, care should be
// taken to await all previous pools with `join()` before altering the shared consumer
// behavior.

join()

Waits for all tasks to finish their work and returns the shared resource afterwards.

This implicitly waits for all created pools to be finished, since their jobs all end up in the shared consumer. Note that after this call, this pool builder can no longer be used, since the shared resource will be moved to the caller.

This method returns a thread::Result. Since the Err variant of this specialized Result does not implement the Error trait, the ?-operator does not work here. For more information about how to handle panics in this case, please refer to the documentation of thread::Result.

Example
let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), shared_consumer_fn);
pool_builder.create_pool(
    |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    |i| heavy_algorithm(i),
);

let result = pool_builder.join().unwrap();
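
If the shared consumer or a pool consumer might panic, unwrap() is not always appropriate. A hedged sketch of handling the Err case instead, using only the standard library (the panic payload is a Box<dyn Any + Send>), could replace the last line above:

match pool_builder.join() {
    // Assuming the shared resource implements Debug for this printout.
    Ok(result) => println!("shared resource: {:?}", result),
    Err(panic) => {
        // Panics raised with string literals or format! can be recovered by downcasting.
        let msg = panic
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .or_else(|| panic.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "unknown panic payload".to_string());
        eprintln!("a worker panicked: {}", msg);
    }
}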

new()

Creates a new unbounded pool builder.

This function takes two parameters:

  • The shared resource

    This will be the resource that is used with every item that gets sent from the pool consumer functions. Usually this will be something that cannot easily be shared between multiple threads, like a database connection object. But this is not a restriction; you can use pretty much anything as the shared resource.

  • The shared consumer function

    This function is called for each item that comes from the pool consumers, together with the shared resource. To give you maximum flexibility, the function must take two parameters: the shared resource and an item. The type of the item will always be the same for every pool consumer you create with this builder.

Note that the message channel, which is used by the pool consumers to communicate with the shared consumer, is unbounded (in other words, it uses channel()). On modern systems and for “typical” use cases, this should not be a problem. If, however, your system is low on memory or your producers generate millions upon millions of items, then you might be interested in the bounded variant of this method.

Example
let pool_builder = SharedResourcePoolBuilder::new(
    // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    Vec::new(),

    // Naturally, our shared consumer will add an item to the Vector. The type of `i` can
    // either be given explicitly, or we can simply let Rust infer it.
    |vec, i| vec.push(i)
);

// Now we create a (very simple) pool for demonstration purposes. Since we return `i`
// unaltered, the parameter type of the shared consumer as well as the item type of the
// shared Vector will be inferred as `u8`.
pool_builder.create_pool(
    |tx| tx.send(1u8).unwrap(),
    |i| i,
);
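
To actually retrieve the shared Vec, the builder is joined as described for the join() method; a short, sketched continuation of the example above:

// Joining waits for the single item to be consumed and moves the Vec out of the builder.
let items = pool_builder.join().unwrap();
assert_eq!(items, vec![1u8]);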

new_bounded()

Creates a new bounded pool builder.

This method is nearly identical to its unbounded variant. For a detailed description of the shared resource and shared consumer parameters, please see there.

The difference is that this function takes an additional integer parameter which limits the size of the message channel used by the pool consumers to communicate with the shared consumer (in other words, it uses sync_channel()). If the channel is full, the pool consumers will block until the shared consumer has taken at least one item. This can be useful if you expect a huge number of items to be generated, or if your system is low on memory.

Example
let pool_builder = SharedResourcePoolBuilder::new_bounded(
    // We are very low on memory
    10,

    // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    Vec::new(),

    // Naturally, our shared consumer will add an item to the Vector. The type of `i` can
    // either be given explicitly, or we can simply let Rust infer it.
    |vec, i| vec.push(i)
);

// Now we create a (very simple) pool for demonstration purposes. Since we return `i`
// unaltered, the parameter type of the shared consumer as well as the item type of the
// shared Vector will be inferred as `u8`.
pool_builder.create_pool(
    |tx| tx.send(1u8).unwrap(),
    |i| i,
);

// No matter how many pools we create, the maximum buffered items will never exceed 10.
