use crate::WaitForLineResult;
use crate::output_stream::config::StreamConfig;
use crate::output_stream::consumer::driver::consume_sync;
use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
use crate::output_stream::event::StreamEvent;
use crate::output_stream::line::adapter::LineAdapter;
use crate::output_stream::policy::{
BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, Replay, ReplayEnabled,
ReplayRetention,
};
use crate::output_stream::visitors::factories::impl_consumer_factories;
use crate::output_stream::visitors::wait::WaitForLineSink;
use crate::output_stream::{OutputStream, Subscription, TrySubscribable};
use crate::{
AsyncStreamVisitor, Consumer, LineParsingOptions, NumBytes, StreamConsumerError, StreamVisitor,
};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
type FactoryReturn<T> = Result<T, StreamConsumerError>;
mod reader;
mod state;
mod subscription;
use reader::{read_chunked_best_effort, read_chunked_reliable};
use state::{ActiveSubscriber, ConfiguredShared};
use subscription::SingleSubscriberSubscription;
impl Subscription for mpsc::Receiver<StreamEvent> {
fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
self.recv()
}
}
pub struct SingleSubscriberOutputStream<D = BestEffortDelivery, R = NoReplay>
where
D: Delivery,
R: Replay,
{
stream_reader: JoinHandle<()>,
options: StreamConfig<D, R>,
configured_shared: Arc<ConfiguredShared>,
name: &'static str,
}
impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
where
D: Delivery,
R: Replay,
{
fn drop(&mut self) {
self.stream_reader.abort();
self.configured_shared.clear_active();
}
}
impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
where
D: Delivery + Debug,
R: Replay + Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SingleSubscriberOutputStream")
.field("output_collector", &"non-debug < JoinHandle<()> >")
.field("options", &self.options)
.field("name", &self.name)
.finish_non_exhaustive()
}
}
impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
where
D: Delivery,
R: Replay,
{
fn read_chunk_size(&self) -> NumBytes {
self.options.read_chunk_size
}
fn max_buffered_chunks(&self) -> usize {
self.options.max_buffered_chunks
}
fn name(&self) -> &'static str {
self.name
}
}
impl<D, R> SingleSubscriberOutputStream<D, R>
where
D: Delivery,
R: Replay,
{
pub fn from_stream<S>(stream: S, stream_name: &'static str, options: StreamConfig<D, R>) -> Self
where
S: AsyncRead + Unpin + Send + 'static,
{
options.assert_valid("options");
let shared = Arc::new(ConfiguredShared::new());
let active_rx = shared.subscribe_active();
let delivery_guarantee = options.delivery_guarantee();
let replay_retention = options.replay_retention();
let stream_reader = match delivery_guarantee {
DeliveryGuarantee::BestEffort => tokio::spawn(read_chunked_best_effort(
stream,
Arc::clone(&shared),
active_rx,
options.read_chunk_size,
replay_retention,
stream_name,
)),
DeliveryGuarantee::ReliableForActiveSubscribers => tokio::spawn(read_chunked_reliable(
stream,
Arc::clone(&shared),
active_rx,
options.read_chunk_size,
replay_retention,
stream_name,
)),
};
Self {
stream_reader,
options,
configured_shared: shared,
name: stream_name,
}
}
#[must_use]
pub fn replay_enabled(&self) -> bool {
self.options.replay_enabled()
}
#[must_use]
pub fn replay_retention(&self) -> Option<ReplayRetention> {
self.options.replay_retention()
}
fn take_subscription(&self) -> Result<SingleSubscriberSubscription, StreamConsumerError> {
let shared = &self.configured_shared;
let (sender, receiver) = mpsc::channel(self.options.max_buffered_chunks);
let (id, replay, terminal_event) = {
let mut state = shared
.state
.lock()
.expect("single-subscriber state poisoned");
if state.active_id.is_some() {
return Err(StreamConsumerError::ActiveConsumer {
stream_name: self.name,
});
}
let replay = if state.replay_sealed || self.options.replay_retention().is_none() {
VecDeque::default()
} else {
state.snapshot_events()
};
let id = state.attach_subscriber();
shared
.active_tx
.send_replace(Some(Arc::new(ActiveSubscriber { id, sender })));
(id, replay, state.terminal_event.clone())
};
Ok(SingleSubscriberSubscription {
id,
shared: Arc::clone(shared),
replay,
terminal_event,
live_receiver: Some(receiver),
})
}
}
impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
where
D: Delivery,
{
pub fn seal_replay(&self) {
let mut state = self
.configured_shared
.state
.lock()
.expect("single-subscriber state poisoned");
state.replay_sealed = true;
state.trim_replay_window(self.options.replay_retention());
}
#[must_use]
pub fn is_replay_sealed(&self) -> bool {
self.configured_shared
.state
.lock()
.expect("single-subscriber state poisoned")
.replay_sealed
}
}
impl<D, R> TrySubscribable for SingleSubscriberOutputStream<D, R>
where
D: Delivery,
R: Replay,
{
fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError> {
self.take_subscription()
}
}
impl<D, R> SingleSubscriberOutputStream<D, R>
where
D: Delivery,
R: Replay,
{
pub fn consume_with<V>(&self, visitor: V) -> Result<Consumer<V::Output>, StreamConsumerError>
where
V: StreamVisitor,
{
Ok(spawn_consumer_sync(
self.name(),
self.take_subscription()?,
visitor,
))
}
pub fn consume_with_async<V>(
&self,
visitor: V,
) -> Result<Consumer<V::Output>, StreamConsumerError>
where
V: AsyncStreamVisitor,
{
Ok(spawn_consumer_async(
self.name(),
self.take_subscription()?,
visitor,
))
}
impl_consumer_factories!();
pub fn wait_for_line(
&self,
timeout: Duration,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> Result<
impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static,
StreamConsumerError,
> {
let subscription = self.take_subscription()?;
let visitor = LineAdapter::new(options, WaitForLineSink::new(predicate));
Ok(async move {
let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
.await
{
Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
Ok(Err(err)) => Err(err),
Err(_) => Ok(WaitForLineResult::Timeout),
}
})
}
}
#[cfg(test)]
mod tests;