StreamPartitionExt

Trait StreamPartitionExt 

Source
pub trait StreamPartitionExt: Stream {
    // Provided methods
    fn partition_by<F, Fut, K>(
        self,
        f: F,
    ) -> Arc<Mutex<PartitionBy<Self, Fut, F, K>>>
       where Self: Sized,
             Self::Item: Clone,
             F: Fn(&Self::Item) -> Fut,
             Fut: Future<Output = K>,
             K: Hash + Eq + Clone { ... }
    fn partition_by_with_config<F, Fut, K>(
        self,
        f: F,
        allow_new_queues: bool,
    ) -> Arc<Mutex<PartitionBy<Self, Fut, F, K>>>
       where Self: Sized,
             Self::Item: Clone,
             F: Fn(&Self::Item) -> Fut,
             Fut: Future<Output = K>,
             K: Hash + Eq + Clone { ... }
}
Expand description

Extension trait that adds partitioning functionality to any stream.

This trait provides the partition_by method that can be called on any stream to create a partitioned stream that splits items based on keys generated by an async function.

Provided Methods§

Source

fn partition_by<F, Fut, K>( self, f: F, ) -> Arc<Mutex<PartitionBy<Self, Fut, F, K>>>
where Self: Sized, Self::Item: Clone, F: Fn(&Self::Item) -> Fut, Fut: Future<Output = K>, K: Hash + Eq + Clone,

Partitions this stream into multiple sub-streams based on keys generated by an async function.

Returns a stream of (K, Partitioned<Self, K>) tuples, where each tuple represents a new partition. The first element is the key, and the second is a stream that will yield only items from the original stream that produce that key.

§Arguments
  • f - An async function that takes stream items and returns a partitioning key
§Type Parameters
  • F - The partitioning function type
  • Fut - The future type returned by the partitioning function
  • K - The key type used for partitioning (must be Hash + Eq + Clone)
§Example
use futures::{stream, StreamExt};
use futures::future::ready;
use stream_partition::StreamPartitionExt;

let numbers = stream::iter(vec![1, 2, 3, 4, 5, 6]);
let mut partitioner = numbers.partition_by(|x| ready(x % 2));
Source

fn partition_by_with_config<F, Fut, K>( self, f: F, allow_new_queues: bool, ) -> Arc<Mutex<PartitionBy<Self, Fut, F, K>>>
where Self: Sized, Self::Item: Clone, F: Fn(&Self::Item) -> Fut, Fut: Future<Output = K>, K: Hash + Eq + Clone,

Partitions this stream with configuration options.

Like partition_by, but allows specifying whether new partitions should be created automatically or if items for unknown keys should be dropped.

§Arguments
  • f - An async function that takes stream items and returns a partitioning key
  • allow_new_queues - If true, creates new queues for unknown keys; if false, drops items for unknown keys
§Example
use futures::{stream, StreamExt};
use futures::future::ready;
use stream_partition::StreamPartitionExt;

let numbers = stream::iter(vec![1, 2, 3, 4, 5, 6]);
// Only allow partitions for pre-registered keys
let mut partitioner = numbers.partition_by_with_config(|x| ready(x % 2), false);

// Pre-register the keys we want to allow
partitioner.lock().unwrap().register_keys([0, 1]);

Implementors§