use crate::output_stream::{Chunk, Next};
use std::borrow::Cow;
use std::fmt::Debug;
use std::future::Future;
use thiserror::Error;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
/// Errors that can be reported when finishing a [`Collector`].
#[derive(Debug, Error)]
pub enum CollectorError {
/// The collector's background task could not be joined.
///
/// Built by [`Collector::wait`] when awaiting the task's join handle yields a
/// `tokio::task::JoinError` (per the tokio docs: the task panicked or was cancelled).
#[error("Failed to join/terminate the collector task over stream '{stream_name}': {source}")]
TaskJoin {
/// Name of the stream the failing collector was attached to.
stream_name: &'static str,
/// The join error returned by tokio; exposed as this error's source.
#[source]
source: tokio::task::JoinError,
},
}
/// Marker trait for the value a collector writes into and eventually hands back.
///
/// It carries no methods; it only bundles the bounds `Debug + Send + Sync + 'static`
/// under a single name so they do not have to be repeated on every generic parameter.
pub trait Sink: Debug + Send + Sync + 'static {}

/// Blanket implementation: every type that satisfies the bundled bounds is a [`Sink`].
impl<T: Debug + Send + Sync + 'static> Sink for T {}
/// Asynchronous callback that processes a stream one [`Chunk`] at a time.
///
/// Implementors must be `Send + 'static`.
pub trait AsyncChunkCollector<S>: Send + 'static
where
    S: Sink,
{
    /// Handles a single `chunk`, with mutable access to the `sink`.
    ///
    /// The returned future must be `Send` and may borrow both `self` and `sink` for `'a`.
    /// It resolves to a [`Next`] value (presumably telling the caller whether to keep
    /// collecting; confirm against the definition of `Next`).
    fn collect<'a>(
        &'a mut self,
        chunk: Chunk,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
/// Asynchronous callback that processes a stream one line at a time.
///
/// Implementors must be `Send + 'static`.
pub trait AsyncLineCollector<S>: Send + 'static
where
    S: Sink,
{
    /// Handles a single `line`, with mutable access to the `sink`.
    ///
    /// The line arrives as a [`Cow`], so it may be either borrowed for `'a` or owned.
    /// The returned future must be `Send` and may borrow `self`, `line` and `sink` for `'a`.
    /// It resolves to a [`Next`] value (presumably telling the caller whether to keep
    /// collecting; confirm against the definition of `Next`).
    fn collect<'a>(
        &'a mut self,
        line: Cow<'a, str>,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
/// Handle to a background task that collects stream output into a sink of type `S`.
///
/// Finish it with [`Collector::wait`] or [`Collector::cancel`] to get the sink back.
/// Dropping it without doing so sends the termination signal and aborts the task.
pub struct Collector<S>
where
    S: Sink,
{
    /// Name of the stream being collected; only used for error reporting.
    pub(crate) stream_name: &'static str,

    /// Join handle of the collecting task, which yields the sink on completion.
    /// `None` once the handle has been taken by `wait` or by `Drop`.
    pub(crate) task: Option<JoinHandle<S>>,

    /// Sending half of the one-shot termination channel.
    /// `None` once it has been taken by `wait`, `cancel` or `Drop`.
    /// Presumably the spawned task holds the receiving half; confirm at the spawn site.
    pub(crate) task_termination_sender: Option<Sender<()>>,
}
impl<S: Sink> Collector<S> {
    /// Reports whether the collecting task has already run to completion.
    ///
    /// Also returns `true` when no task handle is stored.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        match &self.task {
            Some(handle) => handle.is_finished(),
            None => true,
        }
    }

    /// Waits for the collecting task to finish on its own and returns its sink.
    ///
    /// No termination signal is sent; use [`Collector::cancel`] to request an early stop.
    ///
    /// # Errors
    ///
    /// Returns [`CollectorError::TaskJoin`] if joining the task fails.
    ///
    /// # Panics
    ///
    /// Panics if the task handle is no longer present.
    pub async fn wait(mut self) -> Result<S, CollectorError> {
        // Move the sender out of `self` but keep it alive across the join below: dropping
        // a oneshot sender closes the channel, which the receiving task could observe
        // (and presumably treat as a request to stop; confirm against the task body).
        let termination_guard = self.task_termination_sender.take();

        // NOTE(review): the handle is moved out of `self` here, so if this future is dropped
        // before completing, `Drop` no longer finds a task to abort and the task is detached.
        let handle = self.task.take().expect("`task` to be present.");

        let outcome = match handle.await {
            Ok(sink) => Ok(sink),
            Err(source) => Err(CollectorError::TaskJoin {
                stream_name: self.stream_name,
                source,
            }),
        };

        // The join has resolved, so the sender is no longer needed.
        drop(termination_guard);
        outcome
    }

    /// Sends the termination signal to the collecting task, then waits for it like
    /// [`Collector::wait`] and returns its sink.
    ///
    /// # Errors
    ///
    /// Returns [`CollectorError::TaskJoin`] if joining the task fails.
    ///
    /// # Panics
    ///
    /// Panics if the termination sender or the task handle is no longer present.
    pub async fn cancel(mut self) -> Result<S, CollectorError> {
        let sender = self
            .task_termination_sender
            .take()
            .expect("`task_termination_sender` to be present.");

        // A failed send means the receiving half is already gone; the outcome is
        // deliberately ignored and the task is joined either way.
        let _ = sender.send(());

        self.wait().await
    }
}
impl<S: Sink> Drop for Collector<S> {
    /// Stops the collecting task if the collector is dropped without having been
    /// finished through `wait` or `cancel`.
    ///
    /// Both fields are already `None` after a completed `wait`/`cancel`, in which case
    /// this does nothing.
    fn drop(&mut self) {
        let pending_signal = self.task_termination_sender.take();
        let pending_task = self.task.take();

        // Send the termination signal first; a failed send (receiver already gone) is ignored.
        if let Some(sender) = pending_signal {
            let _ = sender.send(());
        }

        // Then abort the task through its join handle.
        if let Some(handle) = pending_task {
            handle.abort();
        }
    }
}