pub(crate) mod consumer;
pub(crate) mod backend;
pub(crate) mod config;
pub(crate) mod event;
pub(crate) mod line;
pub(crate) mod num_bytes;
pub(crate) mod policy;
pub(crate) mod visitor;
pub mod visitors;
use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
use crate::{AsyncStreamVisitor, Consumer, StreamVisitor};
use core::error::Error;
use event::StreamEvent;
use num_bytes::NumBytes;
/// A named, consumable output stream that also reports its read and buffering settings.
///
/// [`Consumable`] is a supertrait, so every `OutputStream` can also be subscribed to and
/// consumed. The default methods on [`Consumable`] call [`OutputStream::name`] when they
/// spawn a consumer.
pub trait OutputStream: Consumable {
/// Returns the read chunk size of this stream as a [`NumBytes`] value.
///
/// NOTE(review): presumably the size of a single read from the underlying source;
/// confirm against the backend implementations.
fn read_chunk_size(&self) -> NumBytes;
/// Returns the maximum number of buffered chunks for this stream.
///
/// NOTE(review): how this limit is enforced is backend-specific and not visible here.
fn max_buffered_chunks(&self) -> usize;
/// Returns the static name of this stream.
///
/// This name is passed to the consumer-spawning helpers by `Consumable::consume` and
/// `Consumable::consume_async`.
fn name(&self) -> &'static str;
}
/// A handle from which [`StreamEvent`]s can be pulled asynchronously, one at a time.
///
/// Implementors must be `Send + 'static`.
pub trait Subscription: Send + 'static {
/// Returns a `Send` future that resolves to the next [`StreamEvent`], or to `None`.
///
/// The future borrows `self` mutably for as long as it is alive.
///
/// NOTE(review): `None` presumably means that no further events will arrive; confirm
/// with the implementing backends.
fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_;
}
/// A source that can hand out [`Subscription`]s.
pub trait Subscribable {
/// The concrete subscription type produced by [`Subscribable::try_subscribe`].
type Subscription: Subscription;
/// The error returned when a subscription cannot be created.
type SubscribeError: Error + Send + Sync + 'static;
/// Attempts to create a new subscription.
///
/// # Errors
///
/// Returns `Self::SubscribeError` when no subscription could be created. The exact
/// failure conditions are defined by the implementor.
fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError>;
}
/// A [`Subscribable`] source whose events can be fed to a visitor by a spawned [`Consumer`].
///
/// Both provided methods follow the same recipe: read the stream's name, try to subscribe,
/// and hand name, subscription and visitor over to the matching spawn helper.
pub trait Consumable: Subscribable {
    /// Error type returned by the `consume*` methods.
    ///
    /// It must be constructible from `Self::SubscribeError`, because a failed subscription
    /// is propagated through this type.
    type Error: Error + Send + Sync + 'static + From<Self::SubscribeError>;

    /// Subscribes to this stream and spawns a consumer that drives the synchronous `visitor`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` (converted from `Self::SubscribeError`) when subscribing fails.
    /// In that case no consumer is spawned.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
    fn consume<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
    where
        Self: OutputStream,
        V: StreamVisitor,
    {
        // The name is read before subscribing, matching the argument evaluation order.
        let stream_name = self.name();
        let subscription = self.try_subscribe()?;
        let consumer = spawn_consumer_sync(stream_name, subscription, visitor);
        Ok(consumer)
    }

    /// Subscribes to this stream and spawns a consumer that drives the asynchronous `visitor`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` (converted from `Self::SubscribeError`) when subscribing fails.
    /// In that case no consumer is spawned.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
    fn consume_async<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
    where
        Self: OutputStream,
        V: AsyncStreamVisitor,
    {
        // The name is read before subscribing, matching the argument evaluation order.
        let stream_name = self.name();
        let subscription = self.try_subscribe()?;
        let consumer = spawn_consumer_async(stream_name, subscription, visitor);
        Ok(consumer)
    }
}
/// Two-state control value returned by visitor callbacks.
///
/// In this module's tests, the closure handed to the `InspectChunks` builder returns
/// `Next::Continue` for every chunk it sees.
///
/// NOTE(review): the exact effect of each variant is decided by the visitor/consumer
/// implementation, which is not visible here; the descriptions below are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
/// Presumably: keep processing further input (TODO confirm).
Continue,
/// Presumably: stop processing early (TODO confirm).
Break,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output_stream::backend::broadcast::BroadcastOutputStream;
    use crate::output_stream::backend::discard::DiscardedOutputStream;
    use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::config::StreamConfig;
    use crate::output_stream::event::Chunk;
    use crate::output_stream::visitors::inspect::InspectChunks;
    use crate::{ConsumerError, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
    use assertr::prelude::*;
    use std::fmt::Debug;
    use tokio::io::AsyncWriteExt;

    /// Consumes `stream` until its consumer finishes and returns how many chunks the
    /// inspecting visitor was called with.
    ///
    /// Panics if the consumer cannot be started; a failing consumer is reported as `Err`.
    async fn count_chunks<S>(stream: &S) -> Result<usize, ConsumerError>
    where
        S: Consumable + OutputStream,
        S::SubscribeError: Debug,
    {
        use std::sync::{Arc, Mutex};

        // Shared tally: one handle moves into the visitor, the other is read afterwards.
        let seen = Arc::new(Mutex::new(0_usize));

        let visitor = {
            let seen = Arc::clone(&seen);
            InspectChunks::builder()
                .f(move |_chunk: Chunk| {
                    let mut tally = seen.lock().unwrap();
                    *tally += 1;
                    Next::Continue
                })
                .build()
        };

        let consumer = stream.consume(visitor).expect("consumer should start");
        consumer.wait().await?;

        let total = *seen.lock().unwrap();
        Ok(total)
    }

    /// Smoke test: the generic `Consumable` API works against every backend.
    #[tokio::test]
    async fn cross_backend_consumable_smoke() {
        let config: StreamConfig<crate::LossyWithoutBackpressure, crate::NoReplay> =
            StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        // Broadcast backend: write a few bytes, close the writer, then count chunks.
        let (bcast_reader, mut bcast_writer) = tokio::io::duplex(64);
        let bcast_stream = BroadcastOutputStream::from_stream(bcast_reader, "bcast", config);
        bcast_writer.write_all(b"abc").await.unwrap();
        drop(bcast_writer);
        let bcast_chunks = count_chunks(&bcast_stream).await.unwrap();
        assert_that!(bcast_chunks).is_greater_or_equal_to(1);

        // Single-subscriber backend: same procedure, same expectation.
        let (single_reader, mut single_writer) = tokio::io::duplex(64);
        let single_stream =
            SingleSubscriberOutputStream::from_stream(single_reader, "single", config);
        single_writer.write_all(b"abc").await.unwrap();
        drop(single_writer);
        let single_chunks = count_chunks(&single_stream).await.unwrap();
        assert_that!(single_chunks).is_greater_or_equal_to(1);

        // Discarded backend: the visitor is expected to see no chunks at all.
        let discard_stream = DiscardedOutputStream::new("discard");
        let discard_chunks = count_chunks(&discard_stream).await.unwrap();
        assert_that!(discard_chunks).is_equal_to(0);
    }
}