Module multistream_batch::channel::buf_batch
This module provides BufBatchChannel, which buffers items until a batch is ready and then provides them in one go through a Drain iterator.
Example
Collect batches of items as different limits are reached, and flush a batch explicitly with the Flush command.
use multistream_batch::channel::buf_batch::BufBatchChannel;
use multistream_batch::channel::buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread with `BufBatchChannel` configured with a maximum size of 4 items and
// a maximum batch duration since the first received item of 200 ms.
let mut batch = BufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer item value
    sender.send(Append(1)).unwrap();
    sender.send(Append(2)).unwrap();
    sender.send(Append(3)).unwrap();
    sender.send(Append(4)).unwrap();

    // At this point batch should have reached its capacity of 4 items
    // Send some more to buffer up for next batch
    sender.send(Append(5)).unwrap();
    sender.send(Append(6)).unwrap();

    // Introduce delay to trigger maximum duration timeout
    std::thread::sleep(Duration::from_millis(400));

    // Send items that will be flushed by `Flush` command
    sender.send(Append(7)).unwrap();
    sender.send(Append(8)).unwrap();

    // Flush outstanding items
    sender.send(Flush).unwrap();

    // Last buffered up items will be flushed automatically when this thread exits
    sender.send(Append(9)).unwrap();
    sender.send(Append(10)).unwrap();
    // Exiting closure will shutdown the producer thread
});

// Batch flushed due to size limit
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

// Batch flushed due to duration limit
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

// Batch flushed by sending `Flush` command
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

// Batch flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok(drain) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);
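The four flush conditions in the example (size limit, duration limit, explicit Flush, and sender disconnect) can be illustrated with a minimal standalone sketch built only on std::sync::mpsc. This is not the crate's implementation: the names Cmd, SimpleBatcher, and next_batch are hypothetical and exist only for illustration. The sketch also assumes that the duration limit is measured from the first item appended to the current batch, as the example's comment describes.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical command type, loosely mirroring `Append` and `Flush` above.
pub enum Cmd<T> {
    Append(T),
    Flush,
}

/// Hypothetical batcher for illustration only; not the crate's `BufBatchChannel`.
pub struct SimpleBatcher<T> {
    rx: Receiver<Cmd<T>>,
    max_size: usize,
    max_duration: Duration,
}

impl<T> SimpleBatcher<T> {
    pub fn new(rx: Receiver<Cmd<T>>, max_size: usize, max_duration: Duration) -> Self {
        SimpleBatcher { rx, max_size, max_duration }
    }

    /// Blocks until a batch is complete and returns it.
    /// Returns `None` once the sender is gone and nothing is left to hand out.
    pub fn next_batch(&mut self) -> Option<Vec<T>> {
        let mut batch = Vec::with_capacity(self.max_size);
        // The deadline is set when the first item of this batch arrives.
        let mut deadline: Option<Instant> = None;

        loop {
            let received = match deadline {
                // Empty batch: wait indefinitely for the first item.
                None => self.rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
                // Non-empty batch: wait only for the time remaining until the deadline.
                Some(d) => self
                    .rx
                    .recv_timeout(d.saturating_duration_since(Instant::now())),
            };

            match received {
                Ok(Cmd::Append(item)) => {
                    if batch.is_empty() {
                        deadline = Some(Instant::now() + self.max_duration);
                    }
                    batch.push(item);
                    // Size limit reached.
                    if batch.len() >= self.max_size {
                        return Some(batch);
                    }
                }
                // Explicit flush: hand out whatever is buffered (possibly nothing).
                Ok(Cmd::Flush) => return Some(batch),
                // Duration limit reached.
                Err(RecvTimeoutError::Timeout) => return Some(batch),
                // Sender dropped: hand out the remainder once, then signal the end.
                Err(RecvTimeoutError::Disconnected) => {
                    return if batch.is_empty() { None } else { Some(batch) };
                }
            }
        }
    }
}
```

With a maximum size of 4, sending Append(1) through Append(6), then Flush, then Append(7), and finally dropping the sender would yield the batches [1, 2, 3, 4], [5, 6], and [7], followed by None. Returning an owned Vec keeps the sketch short; the module itself hands out a draining iterator instead.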
Structs
BufBatchChannel | Batches items in internal buffer up to |
DrainToEnd | Iterator that will drain all buffered items first and then all items from the channel. |
Enums
Command | Commands that can be sent to |