use crate::NumBytes;
use crate::output_stream::policy::{
BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, ReliableDelivery, Replay,
ReplayEnabled, ReplayRetention,
};
/// Default read chunk size: `16 * 1024` bytes (16 KiB).
///
/// Nothing in this file applies the value automatically; the staged builder
/// below always requires an explicit `read_chunk_size`, so callers pass this
/// constant themselves when they want the default.
pub const DEFAULT_READ_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024);
/// Default limit for `max_buffered_chunks`: 128 chunks.
///
/// Like [`DEFAULT_READ_CHUNK_SIZE`], it has to be passed to the builder
/// explicitly.
pub const DEFAULT_MAX_BUFFERED_CHUNKS: usize = 128;
/// Rejects a zero chunk count.
///
/// `parameter_name` is interpolated into the panic message so the caller can
/// tell which setting was refused.
///
/// # Panics
///
/// Panics with `"{parameter_name} must be greater than zero"` when `chunks`
/// is `0`.
pub(crate) fn assert_max_buffered_chunks_non_zero(chunks: usize, parameter_name: &str) {
    // `usize` cannot be negative, so "not zero" is the same as "greater than zero".
    assert!(chunks != 0, "{parameter_name} must be greater than zero");
}
/// Stream settings, parameterised over a delivery policy `D` and a replay
/// policy `R`.
///
/// When the type parameters are omitted in type position they default to
/// [`BestEffortDelivery`] and [`NoReplay`].
///
/// Values produced through [`StreamConfig::builder`] have had their numeric
/// settings checked for zero. All fields are public, so a struct literal can
/// bypass those checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfig<D: Delivery = BestEffortDelivery, R: Replay = NoReplay> {
    /// Read chunk size. The builder rejects zero.
    // NOTE(review): presumably the size of each read from the underlying
    // stream; confirm against the backends that consume this config.
    pub read_chunk_size: NumBytes,
    /// Maximum number of buffered chunks. The builder rejects zero.
    // NOTE(review): which buffer this bounds is decided by the backend, not
    // by this file.
    pub max_buffered_chunks: usize,
    /// Delivery policy value; its guarantee is exposed via `delivery_guarantee`.
    pub delivery: D,
    /// Replay policy value; queried via `replay_enabled` / `replay_retention`.
    pub replay: R,
}
// The builder entry point lives on the default-parameter instantiation only.
// NOTE(review): presumably so `StreamConfig::builder()` can be written without
// naming the policy type parameters — confirm this is the intent.
impl StreamConfig<BestEffortDelivery, NoReplay> {
/// Starts the staged builder for a [`StreamConfig`].
///
/// The stages must be completed in order: delivery policy, replay policy,
/// `read_chunk_size`, `max_buffered_chunks`, and finally `build()`. Each stage
/// is a distinct type, so skipping one does not compile.
#[must_use]
pub fn builder() -> StreamConfigBuilder {
StreamConfigBuilder
}
}
/// First builder stage: nothing has been chosen yet.
///
/// Carries no data; its methods select the delivery policy and move on to
/// [`StreamConfigReplayBuilder`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfigBuilder;
impl StreamConfigBuilder {
    /// Selects [`BestEffortDelivery`] and advances to the replay stage.
    #[must_use]
    pub fn best_effort_delivery(self) -> StreamConfigReplayBuilder<BestEffortDelivery> {
        let delivery = BestEffortDelivery;
        StreamConfigReplayBuilder { delivery }
    }

    /// Selects [`ReliableDelivery`] and advances to the replay stage.
    ///
    /// The resulting config reports
    /// `DeliveryGuarantee::ReliableForActiveSubscribers`.
    #[must_use]
    pub fn reliable_for_active_subscribers(self) -> StreamConfigReplayBuilder<ReliableDelivery> {
        let delivery = ReliableDelivery;
        StreamConfigReplayBuilder { delivery }
    }
}
/// Second builder stage: the delivery policy is fixed and a replay policy
/// must be chosen next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfigReplayBuilder<D: Delivery> {
    /// Delivery policy selected in the previous stage.
    delivery: D,
}
impl<D: Delivery> StreamConfigReplayBuilder<D> {
    /// Disables replay and advances to the read-chunk-size stage.
    #[must_use]
    pub fn no_replay(self) -> StreamConfigReadChunkSizeBuilder<D, NoReplay> {
        let Self { delivery } = self;
        StreamConfigReadChunkSizeBuilder {
            delivery,
            replay: NoReplay,
        }
    }

    /// Enables replay with `ReplayRetention::LastChunks(chunks)`.
    ///
    /// # Panics
    ///
    /// Panics if `chunks` is zero; the message names the `chunks` parameter.
    #[must_use]
    pub fn replay_last_chunks(
        self,
        chunks: usize,
    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
        let retention = ReplayRetention::LastChunks(chunks);
        // Validate here so the panic message carries this method's parameter name.
        retention.assert_non_zero("chunks");
        let Self { delivery } = self;
        StreamConfigReadChunkSizeBuilder {
            delivery,
            replay: ReplayEnabled::new(retention),
        }
    }

    /// Enables replay with `ReplayRetention::LastBytes(bytes)`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is zero; the message names the `bytes` parameter.
    #[must_use]
    pub fn replay_last_bytes(
        self,
        bytes: NumBytes,
    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
        let retention = ReplayRetention::LastBytes(bytes);
        // Validate here so the panic message carries this method's parameter name.
        retention.assert_non_zero("bytes");
        let Self { delivery } = self;
        StreamConfigReadChunkSizeBuilder {
            delivery,
            replay: ReplayEnabled::new(retention),
        }
    }

    /// Enables replay with `ReplayRetention::All`.
    #[must_use]
    pub fn replay_all(self) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
        let Self { delivery } = self;
        StreamConfigReadChunkSizeBuilder {
            delivery,
            replay: ReplayEnabled::new(ReplayRetention::All),
        }
    }
}
/// Third builder stage: delivery and replay policies are fixed and the read
/// chunk size must be supplied next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfigReadChunkSizeBuilder<D: Delivery, R: Replay> {
    /// Delivery policy selected in the first stage.
    delivery: D,
    /// Replay policy selected in the second stage.
    replay: R,
}
impl<D: Delivery, R: Replay> StreamConfigReadChunkSizeBuilder<D, R> {
    /// Sets the read chunk size and advances to the max-buffered-chunks stage.
    ///
    /// # Panics
    ///
    /// Panics if `read_chunk_size` is zero; the message names the
    /// `read_chunk_size` parameter.
    #[must_use]
    pub fn read_chunk_size(
        self,
        read_chunk_size: NumBytes,
    ) -> StreamConfigMaxBufferedChunksBuilder<D, R> {
        read_chunk_size.assert_non_zero("read_chunk_size");
        let Self { delivery, replay } = self;
        StreamConfigMaxBufferedChunksBuilder {
            delivery,
            replay,
            read_chunk_size,
        }
    }
}
/// Fourth builder stage: only `max_buffered_chunks` is still missing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfigMaxBufferedChunksBuilder<D: Delivery, R: Replay> {
    /// Delivery policy selected in the first stage.
    delivery: D,
    /// Replay policy selected in the second stage.
    replay: R,
    /// Read chunk size accepted by the third stage (already checked for zero).
    read_chunk_size: NumBytes,
}
impl<D: Delivery, R: Replay> StreamConfigMaxBufferedChunksBuilder<D, R> {
    /// Sets the buffered-chunk limit and advances to the final stage.
    ///
    /// # Panics
    ///
    /// Panics with `"max_buffered_chunks must be greater than zero"` if
    /// `max_buffered_chunks` is zero.
    #[must_use]
    pub fn max_buffered_chunks(self, max_buffered_chunks: usize) -> StreamConfigReadyBuilder<D, R> {
        assert_max_buffered_chunks_non_zero(max_buffered_chunks, "max_buffered_chunks");
        let Self {
            delivery,
            replay,
            read_chunk_size,
        } = self;
        // Every setting is known at this point, so the config is assembled here.
        let config = StreamConfig {
            read_chunk_size,
            max_buffered_chunks,
            delivery,
            replay,
        };
        StreamConfigReadyBuilder { config }
    }
}
/// Final builder stage: holds a fully populated [`StreamConfig`] that
/// `build()` hands out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfigReadyBuilder<D: Delivery, R: Replay> {
    /// The finished configuration assembled by the previous stage.
    config: StreamConfig<D, R>,
}
impl<D: Delivery, R: Replay> StreamConfigReadyBuilder<D, R> {
    /// Finishes the builder and returns the assembled [`StreamConfig`].
    ///
    /// No further validation happens here; the earlier stages have already
    /// rejected zero values.
    #[must_use]
    pub fn build(self) -> StreamConfig<D, R> {
        let Self { config } = self;
        config
    }
}
impl<D: Delivery, R: Replay> StreamConfig<D, R> {
    /// Returns the guarantee reported by the delivery policy.
    #[must_use]
    pub fn delivery_guarantee(self) -> DeliveryGuarantee {
        let Self { delivery, .. } = self;
        delivery.guarantee()
    }

    /// Returns the retention reported by the replay policy, if any.
    #[must_use]
    pub fn replay_retention(self) -> Option<ReplayRetention> {
        let Self { replay, .. } = self;
        replay.replay_retention()
    }

    /// Returns whether the replay policy reports replay as enabled.
    #[must_use]
    pub fn replay_enabled(self) -> bool {
        let Self { replay, .. } = self;
        replay.replay_enabled()
    }

    /// Re-checks the numeric settings of a config that may not have come from
    /// the builder (the fields are public, so a struct literal can hold zeros).
    ///
    /// `parameter_name` is used as a prefix in the panic message, for example
    /// `"{parameter_name}.read_chunk_size"`.
    ///
    /// # Panics
    ///
    /// Panics if `read_chunk_size` is zero, if `max_buffered_chunks` is zero,
    /// or if the replay policy reports a retention that fails its own
    /// non-zero check.
    pub(crate) fn assert_valid(self, parameter_name: &str) {
        let Self {
            read_chunk_size,
            max_buffered_chunks,
            replay,
            ..
        } = self;

        let read_chunk_size_name = format!("{parameter_name}.read_chunk_size");
        read_chunk_size.assert_non_zero(&read_chunk_size_name);

        let max_buffered_chunks_name = format!("{parameter_name}.max_buffered_chunks");
        assert_max_buffered_chunks_non_zero(max_buffered_chunks, &max_buffered_chunks_name);

        // Policies that report no retention have nothing to validate here.
        if let Some(retention) = replay.replay_retention() {
            let retention_name = format!("{parameter_name}.replay_retention");
            retention.assert_non_zero(&retention_name);
        }
    }
}
impl<D: Delivery> StreamConfig<D, ReplayEnabled> {
    /// Returns a copy of this config with its replay retention replaced.
    ///
    /// Only available when replay is enabled, because only [`ReplayEnabled`]
    /// stores a retention.
    ///
    /// # Panics
    ///
    /// Panics if `replay_retention` fails its non-zero check; the message
    /// names the `replay_retention` parameter.
    #[must_use]
    pub fn with_replay_retention(self, replay_retention: ReplayRetention) -> Self {
        // Validate before touching the config so no invalid value is ever stored.
        replay_retention.assert_non_zero("replay_retention");
        let mut updated = self;
        updated.replay.replay_retention = replay_retention;
        updated
    }
}
// Unit tests for the staged `StreamConfig` builder and its validation paths.
#[cfg(test)]
mod tests {
use super::*;
use crate::output_stream::num_bytes::NumBytesExt;
use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
use assertr::prelude::*;
// Walks the builder through several delivery/replay combinations and checks
// that the resulting config reports exactly what was requested.
#[test]
fn builder_creates_expected_delivery_and_replay_configs() {
// Best-effort delivery, replay disabled.
let config: StreamConfig<BestEffortDelivery, NoReplay> = StreamConfig::builder()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
.build();
assert_that!(config.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
assert_that!(config.replay_enabled()).is_false();
assert_that!(config.replay_retention()).is_none();
assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
// Reliable delivery, replay disabled.
let config: StreamConfig<ReliableDelivery, NoReplay> = StreamConfig::builder()
.reliable_for_active_subscribers()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
.build();
assert_that!(config.delivery_guarantee())
.is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
assert_that!(config.replay_enabled()).is_false();
assert_that!(config.replay_retention()).is_none();
assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
// Best-effort delivery, replay of the last two chunks.
let config: StreamConfig<BestEffortDelivery, ReplayEnabled> = StreamConfig::builder()
.best_effort_delivery()
.replay_last_chunks(2)
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
.build();
assert_that!(config.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
assert_that!(config.replay_enabled()).is_true();
assert_that!(config.replay_retention()).is_equal_to(Some(ReplayRetention::LastChunks(2)));
assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
// Reliable delivery, replay of the last 16 bytes.
let config: StreamConfig<ReliableDelivery, ReplayEnabled> = StreamConfig::builder()
.reliable_for_active_subscribers()
.replay_last_bytes(16.bytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
.build();
assert_that!(config.delivery_guarantee())
.is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
assert_that!(config.replay_enabled()).is_true();
assert_that!(config.replay_retention())
.is_equal_to(Some(ReplayRetention::LastBytes(16.bytes())));
assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
}
// Every zero-valued setting must panic, and the message must name the
// parameter that was rejected.
#[test]
fn invalid_configs_panic_with_parameter_names() {
// A zero read chunk size is rejected by the stage method itself; `build()`
// is never reached.
assert_that_panic_by(|| {
let _config = StreamConfig::builder()
.best_effort_delivery()
.no_replay()
.read_chunk_size(0.bytes());
})
.has_type::<String>()
.is_equal_to("read_chunk_size must be greater than zero bytes");
// A zero buffered-chunk limit is rejected by its stage method.
assert_that_panic_by(|| {
let _config = StreamConfig::builder()
.best_effort_delivery()
.no_replay()
.read_chunk_size(8.bytes())
.max_buffered_chunks(0);
})
.has_type::<String>()
.is_equal_to("max_buffered_chunks must be greater than zero");
// Zero-chunk retention: the message uses the builder's `chunks` parameter name.
assert_that_panic_by(|| {
let _config = StreamConfig::builder()
.best_effort_delivery()
.replay_last_chunks(0);
})
.has_type::<String>()
.is_equal_to("chunks must retain at least one chunk");
// Zero-byte retention: the message uses the builder's `bytes` parameter name.
assert_that_panic_by(|| {
let _config = StreamConfig::builder()
.best_effort_delivery()
.replay_last_bytes(NumBytes::zero());
})
.has_type::<String>()
.is_equal_to("bytes must retain at least one byte");
// `ReplayEnabled::new` validates on its own and reports `replay_retention`.
assert_that_panic_by(|| {
let _replay = ReplayEnabled::new(ReplayRetention::LastChunks(0));
})
.has_type::<String>()
.is_equal_to("replay_retention must retain at least one chunk");
// Replacing the retention on an already-built config is validated as well.
assert_that_panic_by(|| {
let config = StreamConfig::builder()
.best_effort_delivery()
.replay_all()
.read_chunk_size(8.bytes())
.max_buffered_chunks(2)
.build();
let _config =
config.with_replay_retention(ReplayRetention::LastBytes(NumBytes::zero()));
})
.has_type::<String>()
.is_equal_to("replay_retention must retain at least one byte");
// A struct literal bypasses the builder checks; `assert_valid` must still
// catch the zero retention and prefix the message with the supplied name.
assert_that_panic_by(|| {
let config = StreamConfig {
read_chunk_size: 8.bytes(),
max_buffered_chunks: 2,
delivery: BestEffortDelivery,
replay: ReplayEnabled {
replay_retention: ReplayRetention::LastBytes(NumBytes::zero()),
},
};
config.assert_valid("options");
})
.has_type::<String>()
.is_equal_to("options.replay_retention must retain at least one byte");
}
// The same config value is handed to both backends, and each must report the
// settings it was given.
#[tokio::test]
async fn one_config_constructs_both_stream_backends() {
use crate::OutputStream;
use crate::output_stream::backend::broadcast::BroadcastOutputStream;
use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
let config = StreamConfig::builder()
.best_effort_delivery()
.no_replay()
.read_chunk_size(8.bytes())
.max_buffered_chunks(2)
.build();
// `config` is passed by value twice below.
let broadcast = BroadcastOutputStream::from_stream(tokio::io::empty(), "stdout", config);
let single_subscriber =
SingleSubscriberOutputStream::from_stream(tokio::io::empty(), "stderr", config);
assert_that!(broadcast.read_chunk_size()).is_equal_to(8.bytes());
assert_that!(single_subscriber.read_chunk_size()).is_equal_to(8.bytes());
assert_that!(broadcast.max_buffered_chunks()).is_equal_to(2);
assert_that!(single_subscriber.max_buffered_chunks()).is_equal_to(2);
}
}