use {
crate::{
Datum,
StreamId,
primitives::Short,
streams::{
consumer::builder::ConsumerConfig,
status::{ChannelInfo, Stats, When},
},
},
core::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
},
futures::Stream,
std::sync::Arc,
tokio::sync::mpsc,
tokio_util::sync::DropGuard,
};
mod builder;
mod receiver;
mod worker;
pub use builder::Builder;
use tokio::sync::mpsc::error::TryRecvError;
pub struct Consumer<D: Datum> {
status: When,
config: Arc<ConsumerConfig>,
stats: Stats,
chan: mpsc::UnboundedReceiver<(D, usize)>,
_abort: DropGuard,
}
impl<D: Datum> Debug for Consumer<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Consumer<{}>({}, {} producers)",
Short(self.config.stream_id),
std::any::type_name::<D>(),
self.status.active.borrow().len(),
)
}
}
impl<D: Datum> Consumer<D> {
pub fn stream_id(&self) -> &StreamId {
&self.config.stream_id
}
pub const fn when(&self) -> &When {
&self.status
}
pub fn is_online(&self) -> bool {
self.status.is_online()
}
pub fn config(&self) -> &ConsumerConfig {
&self.config
}
pub const fn stats(&self) -> &Stats {
&self.stats
}
pub fn producers(&self) -> impl Iterator<Item = ChannelInfo> {
let active = self.status.active.borrow().clone();
active.into_iter().map(|(_, info)| info)
}
pub async fn recv(&mut self) -> Option<D> {
match self.chan.recv().await {
Some((datum, bytes_len)) => {
self.stats.increment_datums();
self.stats.increment_bytes(bytes_len);
Some(datum)
}
None => None,
}
}
pub fn try_recv(&mut self) -> Result<D, TryRecvError> {
match self.chan.try_recv() {
Ok((datum, bytes_len)) => {
self.stats.increment_datums();
self.stats.increment_bytes(bytes_len);
Ok(datum)
}
Err(e) => Err(e),
}
}
}
impl<D: Datum> Stream for Consumer<D> {
type Item = D;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.chan.poll_recv(cx) {
Poll::Ready(Some((datum, bytes_len))) => {
this.stats.increment_datums();
this.stats.increment_bytes(bytes_len);
Poll::Ready(Some(datum))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}