use {
crate::{
Datum,
StreamId,
primitives::Short,
streams::{
producer::builder::ProducerConfig,
status::{ChannelInfo, When},
},
},
core::{
fmt::Debug,
pin::{Pin, pin},
task::{Context, Poll},
},
futures::{Sink, SinkExt},
std::sync::Arc,
tokio::sync::mpsc::{self, error::TrySendError},
tokio_util::sync::PollSender,
};
mod builder;
mod error;
mod sender;
mod sink;
mod worker;
pub(super) use sink::Sinks;
pub use {
builder::{Builder, Error as BuilderError},
error::Error,
};
#[derive(Clone)]
pub struct Producer<D: Datum> {
status: When,
chan: PollSender<D>,
config: Arc<ProducerConfig>,
}
impl<D: Datum> Debug for Producer<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Producer<{}>({})",
Short(self.config.stream_id),
std::any::type_name::<D>()
)
}
}
impl<D: Datum> Producer<D> {
pub(crate) fn new(
chan: mpsc::Sender<D>,
status: When,
config: Arc<ProducerConfig>,
) -> Self {
Self {
status,
config,
chan: PollSender::new(chan),
}
}
pub fn stream_id(&self) -> &StreamId {
&self.config().stream_id
}
pub const fn when(&self) -> &When {
&self.status
}
pub fn config(&self) -> &ProducerConfig {
&self.config
}
pub fn is_online(&self) -> bool {
self.status.is_online()
}
pub fn try_send(&self, item: D) -> Result<(), Error<D>> {
if !self.is_online() {
return Err(Error::Offline(item));
}
let Some(inner) = self.chan.get_ref() else {
return Err(Error::Closed(Some(item)));
};
inner.try_send(item).map_err(|e| match e {
TrySendError::Full(d) => Error::Full(d),
TrySendError::Closed(d) => Error::Closed(Some(d)),
})
}
pub fn consumers(&self) -> impl Iterator<Item = ChannelInfo> {
let active = self.status.active.borrow().clone();
active.into_iter().map(|(_, info)| info)
}
}
impl<D: Datum> Sink<D> for Producer<D> {
type Error = Error<D>;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
let online_fut = pin!(this.status.online.wait_for(|s| *s));
match online_fut.poll(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
return Poll::Pending;
}
}
this
.chan
.poll_ready_unpin(cx)
.map_err(|e| Error::Closed(e.into_inner()))
}
fn start_send(self: Pin<&mut Self>, item: D) -> Result<(), Self::Error> {
self.try_send(item)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self
.get_mut()
.chan
.poll_flush_unpin(cx)
.map_err(|e| Error::Closed(e.into_inner()))
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self
.get_mut()
.chan
.poll_close_unpin(cx)
.map_err(|e| Error::Closed(e.into_inner()))
}
}