Module multistream_batch::channel::buf_batch

This module provides BufBatchChannel, which buffers items until a batch is ready and then hands them over in one go through a Drain iterator.

Example

Collect batches of items as different limits are reached and when the Flush command is used.

use multistream_batch::channel::buf_batch::BufBatchChannel;
use multistream_batch::channel::buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread with `BufBatchChannel` configured with a maximum size of 4 items and
// a maximum batch duration since the first received item of 200 ms.
let mut batch = BufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer item values
    sender.send(Append(1)).unwrap();
    sender.send(Append(2)).unwrap();
    sender.send(Append(3)).unwrap();
    sender.send(Append(4)).unwrap();
    // At this point batch should have reached its capacity of 4 items

    // Send some more to buffer up for next batch
    sender.send(Append(5)).unwrap();
    sender.send(Append(6)).unwrap();

    // Introduce delay to trigger maximum duration timeout
    std::thread::sleep(Duration::from_millis(400));

    // Send items that will be flushed by `Flush` command
    sender.send(Append(7)).unwrap();
    sender.send(Append(8)).unwrap();
    // Flush outstanding items
    sender.send(Flush).unwrap();

    // Last buffered up items will be flushed automatically when this thread exits
    sender.send(Append(9)).unwrap();
    sender.send(Append(10)).unwrap();
    // Exiting the closure will shut down the producer thread
});

// Batch flushed due to size limit
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

// Batch flushed due to duration limit
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

// Batch flushed by sending `Flush` command
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

// Batch flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);

Structs

BufBatchChannel

Batches items in an internal buffer up to max_size items or until max_duration has elapsed since the first item was appended to the batch.

DrainToEnd

An iterator that first drains all buffered items and then all items from the channel.

Enums

Command

Commands that can be sent to BufBatchChannel via the Sender endpoint.