use crate::prelude::{Data, ErrorKind, PortId};
use crate::types::{LinkMessage, Payload, SerializerFn};
use crate::{bail, zferror, Result};
use flume::Sender;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use uhlc::{Timestamp, HLC};
/// The set of output channels available to a node, grouped by port.
///
/// Entries are added with `insert` and handed out (removed) with `take`, which
/// wraps the senders of a port into an `OutputBuilder`.
pub struct Outputs {
/// For each port identifier, the `flume` senders registered for that port.
pub(crate) hmap: HashMap<PortId, Vec<flume::Sender<LinkMessage>>>,
/// Shared hybrid logical clock, cloned into every `OutputBuilder` produced by `take`.
pub(crate) hlc: Arc<HLC>,
}
/// Gives read-only access to the underlying port-to-senders map.
impl Deref for Outputs {
    type Target = HashMap<PortId, Vec<flume::Sender<LinkMessage>>>;

    /// Borrows the internal map of senders, keyed by port identifier.
    fn deref(&self) -> &Self::Target {
        let Self { hmap, .. } = self;
        hmap
    }
}
impl Outputs {
    /// Creates an empty collection of outputs that shares the provided clock.
    pub(crate) fn new(hlc: Arc<HLC>) -> Self {
        let hmap = HashMap::default();
        Self { hmap, hlc }
    }

    /// Registers `tx` as one more sender for `port_id`.
    ///
    /// The list of senders for the port is created on first use; later calls
    /// with the same `port_id` append to it.
    pub(crate) fn insert(&mut self, port_id: PortId, tx: Sender<LinkMessage>) {
        let senders = self.hmap.entry(port_id).or_default();
        senders.push(tx);
    }

    /// Removes the senders registered for `port_id` and wraps them in an
    /// [`OutputBuilder`].
    ///
    /// Returns `None` when no sender is registered under `port_id`. Because the
    /// entry is removed, a second call with the same `port_id` returns `None`
    /// unless new senders were inserted in between.
    ///
    /// The builder's watermark starts at the time of a timestamp freshly
    /// generated by the shared clock; that timestamp is only generated when the
    /// port exists.
    pub fn take(&mut self, port_id: impl AsRef<str>) -> Option<OutputBuilder> {
        let key = port_id.as_ref();
        let senders = self.hmap.remove(key)?;
        let initial_watermark = self.hlc.new_timestamp().get_time().as_u64();

        Some(OutputBuilder {
            port_id: key.into(),
            senders,
            hlc: Arc::clone(&self.hlc),
            last_watermark: Arc::new(AtomicU64::new(initial_watermark)),
        })
    }
}
/// Intermediate value returned by `Outputs::take`, to be turned into either an
/// `OutputRaw` (via `raw`) or a typed `Output<T>` (via `typed`).
pub struct OutputBuilder {
/// Identifier of the port the senders were registered under.
pub(crate) port_id: PortId,
/// The `flume` senders that were registered for this port.
pub(crate) senders: Vec<flume::Sender<LinkMessage>>,
/// Shared hybrid logical clock.
pub(crate) hlc: Arc<HLC>,
/// Time (as `u64`) of the last watermark; initialised by `Outputs::take` from a
/// freshly generated timestamp.
pub(crate) last_watermark: Arc<AtomicU64>,
}
impl OutputBuilder {
    /// Consumes the builder and produces an [`OutputRaw`] carrying the same
    /// port identifier, senders, clock and watermark.
    pub fn raw(self) -> OutputRaw {
        let Self {
            port_id,
            senders,
            hlc,
            last_watermark,
        } = self;

        OutputRaw {
            port_id,
            senders,
            hlc,
            last_watermark,
        }
    }

    /// Consumes the builder and produces an [`Output<T>`] that serializes
    /// values of type `T` with `serializer`.
    ///
    /// The provided `serializer` is wrapped in a type-erased closure that first
    /// downcasts the value it receives to `T`, and then invokes `serializer` on
    /// it with the destination buffer.
    ///
    /// # Errors
    ///
    /// The wrapped closure fails with `ErrorKind::DeserializationError` when the
    /// downcast to `T` fails or when `serializer` itself returns an error.
    pub fn typed<T: Send + Sync + 'static>(
        self,
        serializer: impl Fn(&mut Vec<u8>, &T) -> anyhow::Result<()> + Send + Sync + 'static,
    ) -> Output<T> {
        Output {
            _phantom: PhantomData,
            output_raw: self.raw(),
            serializer: Arc::new(move |buffer, data| {
                let typed = match (*data).as_any().downcast_ref::<T>() {
                    Some(typed) => typed,
                    None => bail!(
                        ErrorKind::DeserializationError,
                        "Failed to downcast provided value"
                    ),
                };

                // NOTE(review): failures here happen while *serializing*, yet they
                // are reported as `DeserializationError` — confirm whether a
                // dedicated serialization error kind should be used instead.
                match serializer(buffer, typed) {
                    Ok(()) => Ok(()),
                    Err(e) => bail!(ErrorKind::DeserializationError, e),
                }
            }),
        }
    }
}
/// An output port that forwards `LinkMessage`s to every sender registered for it.
///
/// Cloning shares the clock and the watermark: both are behind an `Arc`.
#[derive(Clone)]
pub struct OutputRaw {
/// Identifier of the port this output is attached to.
pub(crate) port_id: PortId,
/// The `flume` senders every message is forwarded to.
pub(crate) senders: Vec<flume::Sender<LinkMessage>>,
/// Shared hybrid logical clock used to generate timestamps.
pub(crate) hlc: Arc<HLC>,
/// Time (as `u64`) of the last watermark sent; timestamps strictly below it are
/// rejected by `check_timestamp`.
pub(crate) last_watermark: Arc<AtomicU64>,
}
impl OutputRaw {
pub fn port_id(&self) -> &PortId {
&self.port_id
}
pub fn channels_count(&self) -> usize {
self.senders.len()
}
pub(crate) fn check_timestamp(&self, timestamp: Option<u64>) -> Result<Timestamp> {
let ts = match timestamp {
Some(ts_u64) => Timestamp::new(uhlc::NTP64(ts_u64), *self.hlc.get_id()),
None => self.hlc.new_timestamp(),
};
if ts.get_time().0 < self.last_watermark.load(Ordering::Relaxed) {
return Err(zferror!(ErrorKind::BelowWatermarkTimestamp(ts)).into());
}
Ok(ts)
}
pub(crate) fn try_forward(&self, message: LinkMessage) -> Result<()> {
let mut err_count = 0;
self.senders.iter().for_each(|sender| {
if let Err(e) = sender.try_send(message.clone()) {
err_count += 1;
match e {
flume::TrySendError::Full(_) => {
log::error!("[Output: {}] A channel is full", self.port_id)
}
flume::TrySendError::Disconnected(_) => {
log::error!("[Output: {}] A channel is disconnected", self.port_id)
}
}
}
});
if err_count > 0 {
return Err(zferror!(
ErrorKind::SendError,
"[Output: {}] Encountered {} errors while sending (async) data",
self.port_id,
err_count
)
.into());
}
Ok(())
}
pub fn try_send(&self, data: impl Into<Payload>, timestamp: Option<u64>) -> Result<()> {
let ts = self.check_timestamp(timestamp)?;
let message = LinkMessage::from_payload(data.into(), ts);
self.try_forward(message)
}
pub fn try_send_watermark(&self, timestamp: Option<u64>) -> Result<()> {
let ts = self.check_timestamp(timestamp)?;
self.last_watermark
.store(ts.get_time().0, Ordering::Relaxed);
let message = LinkMessage::Watermark(ts);
self.try_forward(message)
}
pub async fn forward(&self, message: LinkMessage) -> Result<()> {
let mut err = 0;
let fut_senders = self
.senders
.iter()
.map(|sender| sender.send_async(message.clone()));
let res = futures::future::join_all(fut_senders).await;
res.iter().for_each(|res| {
if let Err(e) = res {
log::error!(
"[Output: {}] Error occured while sending to downstream node(s): {:?}",
self.port_id(),
e
);
err += 1;
}
});
if err > 0 {
return Err(zferror!(
ErrorKind::SendError,
"[Output: {}] Encountered {} errors while sending (async) data",
self.port_id,
err
)
.into());
}
Ok(())
}
pub async fn send(&self, data: impl Into<Payload>, timestamp: Option<u64>) -> Result<()> {
let ts = self.check_timestamp(timestamp)?;
let message = LinkMessage::from_payload(data.into(), ts);
self.forward(message).await
}
pub async fn send_watermark(&self, timestamp: Option<u64>) -> Result<()> {
let ts = self.check_timestamp(timestamp)?;
self.last_watermark
.store(ts.get_time().0, Ordering::Relaxed);
let message = LinkMessage::Watermark(ts);
self.forward(message).await
}
}
/// A typed output: an `OutputRaw` paired with the serializer to use for values
/// of type `T`. Created by `OutputBuilder::typed`.
#[derive(Clone)]
pub struct Output<T> {
/// Ties the output to `T` without storing a value of that type.
_phantom: PhantomData<T>,
/// The underlying untyped output the messages are forwarded through.
pub(crate) output_raw: OutputRaw,
/// Type-erased serializer attached to every payload built from typed data.
pub(crate) serializer: Arc<SerializerFn>,
}
/// Exposes the methods of the underlying [`OutputRaw`] on a typed output.
impl<T> Deref for Output<T> {
    type Target = OutputRaw;

    /// Borrows the underlying untyped output.
    fn deref(&self) -> &Self::Target {
        let Self { output_raw, .. } = self;
        output_raw
    }
}
impl<T: Send + Sync + 'static> Output<T> {
    /// Builds the [`LinkMessage`] carrying `data`.
    ///
    /// The timestamp is validated first (see `OutputRaw::check_timestamp`); the
    /// payload is then built from `data` together with a handle on this
    /// output's serializer.
    ///
    /// # Errors
    ///
    /// Fails if the timestamp is below the last watermark.
    fn construct_message(
        &self,
        data: impl Into<Data<T>>,
        timestamp: Option<u64>,
    ) -> Result<LinkMessage> {
        let ts = self.output_raw.check_timestamp(timestamp)?;
        let serializer = Arc::clone(&self.serializer);
        let payload = Payload::from_data(data.into(), serializer);

        Ok(LinkMessage::from_payload(payload, ts))
    }

    /// Sends `data` on every channel, awaiting all of them.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be constructed (timestamp below the last
    /// watermark) or if forwarding it fails.
    pub async fn send(&self, data: impl Into<Data<T>>, timestamp: Option<u64>) -> Result<()> {
        let message = self.construct_message(data, timestamp)?;
        self.output_raw.forward(message).await
    }

    /// Attempts to send `data` on every channel without waiting.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be constructed (timestamp below the last
    /// watermark) or if forwarding it fails.
    pub fn try_send(&self, data: impl Into<Data<T>>, timestamp: Option<u64>) -> Result<()> {
        let message = self.construct_message(data, timestamp)?;
        self.output_raw.try_forward(message)
    }
}
// Unit tests for this module; compiled only for test builds and loaded from
// `./tests/output-tests.rs` rather than from the default module path.
#[cfg(test)]
#[path = "./tests/output-tests.rs"]
mod tests;