[][src]Module multistream_batch::channel::multi_buf_batch

This module provides MultiBufBatchChannel that will buffer items into multiple internal batches based on batch stream key until one of the batches is ready and provide them in one go, along with the batch stream key, using Drain iterator.

Example

Collect batches of items from two streams by reaching different individual batch limits and using Flush command.

use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread with `MultiBufBatchChannel` configured with a maximum size of 4 items and
// a maximum batch duration since the first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer stream key and item value
    sender.send(Append(1, 1)).unwrap();
    sender.send(Append(0, 1)).unwrap();
    sender.send(Append(1, 2)).unwrap();
    sender.send(Append(0, 2)).unwrap();
    sender.send(Append(1, 3)).unwrap();
    sender.send(Append(0, 3)).unwrap();
    sender.send(Append(1, 4)).unwrap();
    // At this point batch with stream key `1` should have reached its capacity of 4 items
    sender.send(Append(0, 4)).unwrap();
    // At this point batch with stream key `0` should have reached its capacity of 4 items

    // Send some more to buffer up for next batch
    sender.send(Append(0, 5)).unwrap();
    sender.send(Append(1, 5)).unwrap();
    sender.send(Append(1, 6)).unwrap();
    sender.send(Append(0, 6)).unwrap();

    // Introduce delay to trigger maximum duration timeout
    std::thread::sleep(Duration::from_millis(400));

    // Send items that will be flushed by `Flush` command
    sender.send(Append(0, 7)).unwrap();
    sender.send(Append(1, 7)).unwrap();
    sender.send(Append(1, 8)).unwrap();
    sender.send(Append(0, 8)).unwrap();
    // Flush outstanding items for batch with stream key `1` and `0`
    sender.send(Flush(1)).unwrap();
    sender.send(Flush(0)).unwrap();

    // Last buffered up items will be flushed automatically when this thread exits
    sender.send(Append(0, 9)).unwrap();
    sender.send(Append(1, 9)).unwrap();
    sender.send(Append(1, 10)).unwrap();
    sender.send(Append(0, 10)).unwrap();
    // Exiting closure will shutdown the producer thread
});

// Batches flushed due to individual batch size limit
assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

// Batches flushed due to duration limit
assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

// Batches flushed by sending `Flush` command starting from batch with stream key `1`
assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

// Batches flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);

Re-exports

pub use crate::multi_buf_batch::Stats;

Structs

MultiBufBatchChannel

Collects items into multiple batches based on the stream key. A batch may become ready after collecting max_size number of items or until max_duration has elapsed since the first item appended to the batch.

Enums

Command

Commands that can be sent to MultiBufBatchChannel via Sender endpoint.