use crate::output_stream::OutputStream;
use crate::output_stream::backend::broadcast::BroadcastOutputStream;
use crate::output_stream::backend::discard::DiscardedOutputStream;
use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
use crate::output_stream::config::{
StreamConfig, StreamConfigBuilder, StreamConfigMaxBufferedChunksBuilder,
StreamConfigReadChunkSizeBuilder, StreamConfigReadyBuilder, StreamConfigReplayBuilder,
};
use crate::output_stream::policy::{
BestEffortDelivery, Delivery, ReliableDelivery, Replay, ReplayEnabled,
};
use std::marker::PhantomData;
use std::process::Stdio;
use tokio::io::AsyncRead;
/// Private home of the [`Sealed`](process_stream_config::Sealed) trait.
///
/// The module is declared without `pub`, while the trait inside it is `pub`. The public
/// [`ProcessStreamConfig`] trait uses it as a supertrait.
mod process_stream_config {
    use std::process::Stdio;

    use tokio::io::AsyncRead;

    use super::OutputStream;

    /// Describes how a configuration value turns into a concrete `Stream`.
    pub trait Sealed<Stream: OutputStream> {
        /// Returns the [`Stdio`] setting associated with this configuration.
        ///
        /// NOTE(review): presumably applied to the child process when it is spawned; the
        /// call site is not part of this file.
        fn child_stdio(&self) -> Stdio;

        /// Consumes the configuration and produces the output stream.
        ///
        /// `captured` is the optional readable handle for the stream and `stream_name` is
        /// a static label handed on to the stream that gets constructed.
        fn into_stream<S: AsyncRead + Unpin + Send + 'static>(
            self,
            captured: Option<S>,
            stream_name: &'static str,
        ) -> Stream;
    }
}
/// Public marker trait for configuration values that can produce a `Stream`.
///
/// It declares no methods of its own; all behavior comes from the supertrait
/// `process_stream_config::Sealed`, which lives in a private module.
pub trait ProcessStreamConfig<Stream: OutputStream>:
    process_stream_config::Sealed<Stream>
{
}
/// Blanket implementation: every type implementing the sealed trait for `Stream` is
/// automatically a [`ProcessStreamConfig`] for that `Stream`.
impl<Stream: OutputStream, Config: process_stream_config::Sealed<Stream>>
    ProcessStreamConfig<Stream> for Config
{
}
/// Turns a fully configured broadcast builder into a [`BroadcastOutputStream`].
impl<D: Delivery, R: Replay> process_stream_config::Sealed<BroadcastOutputStream<D, R>>
    for PipedStreamConfig<BroadcastBackend, StreamConfigReadyBuilder<D, R>>
{
    /// Always requests a pipe, because `into_stream` needs a captured handle.
    fn child_stdio(&self) -> Stdio {
        Stdio::piped()
    }

    /// Builds the stream configuration and wraps the captured pipe.
    ///
    /// # Panics
    ///
    /// Panics if `captured` is `None`.
    fn into_stream<S: AsyncRead + Unpin + Send + 'static>(
        self,
        captured: Option<S>,
        stream_name: &'static str,
    ) -> BroadcastOutputStream<D, R> {
        // Unwrap the pipe first so that a missing pipe is reported before the
        // configuration is built.
        let pipe = captured.expect(
            "broadcast backend requires a captured pipe; child_stdio() promised Stdio::piped()",
        );
        let config = self.stage.build();
        BroadcastOutputStream::from_stream(pipe, stream_name, config)
    }
}
/// Turns a fully configured single-subscriber builder into a
/// [`SingleSubscriberOutputStream`].
impl<D: Delivery, R: Replay> process_stream_config::Sealed<SingleSubscriberOutputStream<D, R>>
    for PipedStreamConfig<SingleSubscriberBackend, StreamConfigReadyBuilder<D, R>>
{
    /// Always requests a pipe, because `into_stream` needs a captured handle.
    fn child_stdio(&self) -> Stdio {
        Stdio::piped()
    }

    /// Builds the stream configuration and wraps the captured pipe.
    ///
    /// # Panics
    ///
    /// Panics if `captured` is `None`.
    fn into_stream<S: AsyncRead + Unpin + Send + 'static>(
        self,
        captured: Option<S>,
        stream_name: &'static str,
    ) -> SingleSubscriberOutputStream<D, R> {
        // Unwrap the pipe first so that a missing pipe is reported before the
        // configuration is built.
        let pipe = captured.expect(
            "single-subscriber backend requires a captured pipe; child_stdio() promised \
             Stdio::piped()",
        );
        let config = self.stage.build();
        SingleSubscriberOutputStream::from_stream(pipe, stream_name, config)
    }
}
/// Maps the discard configuration onto a [`DiscardedOutputStream`].
impl process_stream_config::Sealed<DiscardedOutputStream> for DiscardedStreamConfig {
    /// Requests `Stdio::null()`; no pipe is needed because nothing is read.
    fn child_stdio(&self) -> Stdio {
        Stdio::null()
    }

    /// Creates the discarded stream from `stream_name` alone.
    ///
    /// The `_captured` handle is never inspected, so `None` and `Some(_)` are both
    /// accepted.
    fn into_stream<S: AsyncRead + Unpin + Send + 'static>(
        self,
        _captured: Option<S>,
        stream_name: &'static str,
    ) -> DiscardedOutputStream {
        DiscardedOutputStream::new(stream_name)
    }
}
/// Entry point for configuring a process output stream.
///
/// Its methods pick a backend: `broadcast()` and `single_subscriber()` start a staged
/// piped configuration, while `discard()` yields a [`DiscardedStreamConfig`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProcessStreamBuilder;
/// Marker type used as the `Backend` parameter of [`PipedStreamConfig`] to select
/// [`BroadcastOutputStream`] as the resulting stream type.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BroadcastBackend;
/// Marker type used as the `Backend` parameter of [`PipedStreamConfig`] to select
/// [`SingleSubscriberOutputStream`] as the resulting stream type.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SingleSubscriberBackend;
/// Configuration for a stream whose output is not captured.
///
/// Its sealed-trait implementation in this file requests `Stdio::null()` and produces a
/// [`DiscardedOutputStream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DiscardedStreamConfig;
/// Staged configuration for a piped stream.
///
/// `Backend` is a compile-time marker (it is only stored as [`PhantomData`]) and `Stage`
/// is the builder value for the current configuration step.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PipedStreamConfig<Backend, Stage> {
    /// Builder value for the current configuration step.
    stage: Stage,
    /// Zero-sized tag carrying the chosen backend in the type.
    _backend: PhantomData<Backend>,
}

impl<Backend, Stage> PipedStreamConfig<Backend, Stage> {
    /// Wraps `stage`, tagging it with the `Backend` marker type.
    fn new(stage: Stage) -> Self {
        PipedStreamConfig {
            stage,
            _backend: PhantomData::<Backend>,
        }
    }
}
/// First configuration step: choosing the delivery policy.
impl<Backend> PipedStreamConfig<Backend, StreamConfigBuilder> {
    /// Selects [`BestEffortDelivery`] and moves on to the replay step.
    ///
    /// Delegates to the wrapped builder's `best_effort_delivery`.
    #[must_use]
    pub fn best_effort_delivery(
        self,
    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<BestEffortDelivery>> {
        let next_stage = self.stage.best_effort_delivery();
        PipedStreamConfig::new(next_stage)
    }

    /// Selects [`ReliableDelivery`] and moves on to the replay step.
    ///
    /// Delegates to the wrapped builder's `reliable_for_active_subscribers`.
    #[must_use]
    pub fn reliable_for_active_subscribers(
        self,
    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<ReliableDelivery>> {
        let next_stage = self.stage.reliable_for_active_subscribers();
        PipedStreamConfig::new(next_stage)
    }
}
/// Second configuration step: choosing the replay policy.
///
/// Every method delegates to the same-named method on the wrapped builder and re-wraps
/// the result for the same `Backend`.
impl<Backend, D: Delivery> PipedStreamConfig<Backend, StreamConfigReplayBuilder<D>> {
    /// Selects `crate::NoReplay` and moves on to the read-chunk-size step.
    #[must_use]
    pub fn no_replay(
        self,
    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, crate::NoReplay>> {
        let next_stage = self.stage.no_replay();
        PipedStreamConfig::new(next_stage)
    }

    /// Selects [`ReplayEnabled`], forwarding `chunks` to the wrapped builder's
    /// `replay_last_chunks`.
    #[must_use]
    pub fn replay_last_chunks(
        self,
        chunks: usize,
    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
        let next_stage = self.stage.replay_last_chunks(chunks);
        PipedStreamConfig::new(next_stage)
    }

    /// Selects [`ReplayEnabled`], forwarding `bytes` to the wrapped builder's
    /// `replay_last_bytes`.
    #[must_use]
    pub fn replay_last_bytes(
        self,
        bytes: crate::NumBytes,
    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
        let next_stage = self.stage.replay_last_bytes(bytes);
        PipedStreamConfig::new(next_stage)
    }

    /// Selects [`ReplayEnabled`] via the wrapped builder's `replay_all`.
    #[must_use]
    pub fn replay_all(
        self,
    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
        let next_stage = self.stage.replay_all();
        PipedStreamConfig::new(next_stage)
    }
}
/// Third configuration step: choosing the read chunk size.
impl<Backend, D: Delivery, R: Replay>
    PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, R>>
{
    /// Forwards `read_chunk_size` to the wrapped builder and moves on to the
    /// max-buffered-chunks step.
    ///
    /// # Panics
    ///
    /// NOTE(review): the tests in this file expect a chain that passes a zero size to
    /// panic with `read_chunk_size must be greater than zero bytes`; the check itself
    /// lives in the wrapped builder, which is not visible here.
    #[must_use]
    pub fn read_chunk_size(
        self,
        read_chunk_size: crate::NumBytes,
    ) -> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>> {
        let next_stage = self.stage.read_chunk_size(read_chunk_size);
        PipedStreamConfig::new(next_stage)
    }
}
/// Fourth configuration step: choosing the maximum number of buffered chunks.
impl<Backend, D: Delivery, R: Replay>
    PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>>
{
    /// Forwards `max_buffered_chunks` to the wrapped builder and yields the ready
    /// configuration.
    ///
    /// # Panics
    ///
    /// NOTE(review): the tests in this file expect a chain that passes `0` to panic with
    /// `max_buffered_chunks must be greater than zero`; the check itself lives in the
    /// wrapped builder, which is not visible here.
    #[must_use]
    pub fn max_buffered_chunks(
        self,
        max_buffered_chunks: usize,
    ) -> PipedStreamConfig<Backend, StreamConfigReadyBuilder<D, R>> {
        let ready_stage = self.stage.max_buffered_chunks(max_buffered_chunks);
        PipedStreamConfig::new(ready_stage)
    }
}
/// Backend selection.
impl ProcessStreamBuilder {
    /// Starts a piped configuration tagged with [`BroadcastBackend`].
    #[must_use]
    pub fn broadcast(self) -> PipedStreamConfig<BroadcastBackend, StreamConfigBuilder> {
        let initial_stage: StreamConfigBuilder = StreamConfig::builder();
        PipedStreamConfig::new(initial_stage)
    }

    /// Starts a piped configuration tagged with [`SingleSubscriberBackend`].
    #[must_use]
    pub fn single_subscriber(
        self,
    ) -> PipedStreamConfig<SingleSubscriberBackend, StreamConfigBuilder> {
        let initial_stage: StreamConfigBuilder = StreamConfig::builder();
        PipedStreamConfig::new(initial_stage)
    }

    /// Returns the configuration that discards the stream; no further steps are needed.
    #[must_use]
    pub fn discard(self) -> DiscardedStreamConfig {
        DiscardedStreamConfig
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NumBytes, NumBytesExt};
    use assertr::prelude::*;

    /// Checks on the `read_chunk_size` builder step.
    mod read_chunk_size {
        use super::*;

        #[test]
        fn panics_on_zero_value() {
            const EXPECTED_MESSAGE: &str = "read_chunk_size must be greater than zero bytes";

            // A zero-byte chunk size must be rejected while the chain is being built.
            let build_with_zero_chunk_size = || {
                let _config = ProcessStreamBuilder
                    .single_subscriber()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(NumBytes::zero())
                    .max_buffered_chunks(1);
            };

            assert_that_panic_by(build_with_zero_chunk_size)
                .has_type::<String>()
                .is_equal_to(EXPECTED_MESSAGE);
        }
    }

    /// Checks on the `max_buffered_chunks` builder step, once per piped backend.
    mod max_buffered_chunks {
        use super::*;

        /// Panic message both backends are expected to produce for a zero capacity.
        const EXPECTED_MESSAGE: &str = "max_buffered_chunks must be greater than zero";

        #[test]
        fn panics_on_zero_for_single_subscriber() {
            let build_with_zero_capacity = || {
                let _config = ProcessStreamBuilder
                    .single_subscriber()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_capacity)
                .has_type::<String>()
                .is_equal_to(EXPECTED_MESSAGE);
        }

        #[test]
        fn panics_on_zero_for_broadcast() {
            let build_with_zero_capacity = || {
                let _config = ProcessStreamBuilder
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_capacity)
                .has_type::<String>()
                .is_equal_to(EXPECTED_MESSAGE);
        }
    }
}