use std::{
fmt,
sync::{Arc, Mutex},
};
use serde::Deserialize;
use crate::{
communication::{Pusher, SendEndpoint},
dataflow::{deadlines::ConditionContext, Data, Message, Timestamp},
};
use super::{errors::SendError, StreamId, WriteStreamT};
/// Sending end of a stream of `Message<D>`s.
///
/// Messages are wrapped in an `Arc` and handed to a `Pusher`. The low watermark, the
/// closed flag and the condition context live in statistics that every clone of this
/// handle shares through the same `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct WriteStream<D: Data> {
    /// Identifier of the stream.
    id: StreamId,
    /// Human-readable name used in log messages; `from_endpoints` uses the ID's string form.
    name: String,
    /// Forwards sent messages to the registered endpoints; reset to `None` by `close_stream`.
    pusher: Option<Pusher<Arc<Message<D>>>>,
    /// Watermark / closed-state bookkeeping shared by all clones of this stream.
    stats: Arc<Mutex<WriteStreamStatistics>>,
}
impl<D: Data> WriteStream<D> {
    /// Creates an open stream called `name` with a new `Pusher` and fresh statistics
    /// (low watermark `Timestamp::Bottom`, not closed).
    pub(crate) fn new(id: StreamId, name: &str) -> Self {
        tracing::debug!("Initializing a WriteStream {} with the ID: {}", name, id);
        let pusher = Pusher::new();
        let statistics = WriteStreamStatistics::new();
        Self {
            id,
            name: name.to_owned(),
            pusher: Some(pusher),
            stats: Arc::new(Mutex::new(statistics)),
        }
    }

    /// Builds a stream named after the string form of `id` and registers every endpoint
    /// in `endpoints` with its pusher, in order.
    pub(crate) fn from_endpoints(
        endpoints: Vec<SendEndpoint<Arc<Message<D>>>>,
        id: StreamId,
    ) -> Self {
        let name = id.to_string();
        let mut stream = Self::new(id, &name);
        endpoints
            .into_iter()
            .for_each(|endpoint| stream.add_endpoint(endpoint));
        stream
    }

    /// Returns the stream's identifier.
    pub fn id(&self) -> StreamId {
        self.id
    }

    /// Returns an owned copy of the stream's name.
    pub fn name(&self) -> String {
        self.name.to_owned()
    }

    /// Returns `true` once the shared statistics record the stream as closed.
    pub fn is_closed(&self) -> bool {
        let stats = self.stats.lock().unwrap();
        stats.is_stream_closed()
    }

    /// Registers `endpoint` with this handle's pusher.
    ///
    /// # Panics
    /// Panics if the pusher has already been dropped (it is removed by `close_stream`).
    fn add_endpoint(&mut self, endpoint: SendEndpoint<Arc<Message<D>>>) {
        let pusher = self
            .pusher
            .as_mut()
            .expect("Attempted to add endpoint to WriteStream, however no pusher exists");
        pusher.add_endpoint(endpoint);
    }

    /// Marks the shared statistics as closed and drops this handle's pusher.
    fn close_stream(&mut self) {
        tracing::debug!("Closing write stream {} (ID: {})", self.name(), self.id());
        {
            let mut stats = self.stats.lock().unwrap();
            stats.close_stream();
        }
        drop(self.pusher.take());
    }

    /// Checks `msg` against the current low watermark and records it in the statistics.
    ///
    /// A data message increments the condition context's message count for its timestamp.
    /// A watermark advances the low watermark (if higher) and notifies the condition
    /// context of its arrival.
    ///
    /// # Errors
    /// Returns `SendError::TimestampError`, leaving the statistics untouched, when the
    /// message's timestamp is lower than the current low watermark.
    fn update_statistics(&mut self, msg: &Message<D>) -> Result<(), SendError> {
        // Both variants need the statistics, so take the lock once up front.
        let mut stats = self.stats.lock().unwrap();
        match msg {
            Message::TimestampedData(data) => {
                if data.timestamp < *stats.low_watermark() {
                    return Err(SendError::TimestampError);
                }
                stats
                    .condition_context
                    .increment_msg_count(self.id(), data.timestamp.clone());
            }
            Message::Watermark(watermark) => {
                if watermark < stats.low_watermark() {
                    return Err(SendError::TimestampError);
                }
                tracing::debug!(
                    "Updating watermark on WriteStream {} (ID: {}) from {:?} to {:?}",
                    self.name(),
                    self.id(),
                    stats.low_watermark(),
                    watermark
                );
                stats.update_low_watermark(watermark.clone());
                stats
                    .condition_context
                    .notify_watermark_arrival(self.id(), watermark.clone());
            }
        }
        Ok(())
    }

    /// Returns another handle to the statistics shared by all clones of this stream.
    #[allow(dead_code)]
    pub(crate) fn get_statistics(&self) -> Arc<Mutex<WriteStreamStatistics>> {
        self.stats.clone()
    }

    /// Forwards to `ConditionContext::clear_state` for this stream's ID and `timestamp`.
    pub fn clear_state(&mut self, timestamp: Timestamp) {
        let mut stats = self.stats.lock().unwrap();
        stats.condition_context.clear_state(self.id, timestamp);
    }

    /// Returns a snapshot (clone) of the current condition context.
    pub(crate) fn get_condition_context(&self) -> ConditionContext {
        let stats = self.stats.lock().unwrap();
        stats.get_condition_context().clone()
    }
}
impl<D: Data> fmt::Debug for WriteStream<D> {
    /// Formats the stream as `WriteStream { id: <id>, low_watermark: <watermark> }`.
    ///
    /// Debug output is often produced from panic messages and error paths. For that reason
    /// a poisoned statistics mutex is recovered via `PoisonError::into_inner` instead of
    /// unwrapped, so formatting never panics on top of an earlier failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let stats = self
            .stats
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        write!(
            f,
            "WriteStream {{ id: {}, low_watermark: {:?} }}",
            self.id, stats.low_watermark,
        )
    }
}
impl<'a, D: Data + Deserialize<'a>> WriteStreamT<D> for WriteStream<D> {
    /// Sends `msg` on this stream.
    ///
    /// # Errors
    /// - `SendError::Closed` if the stream was closed before this call.
    /// - `SendError::TimestampError` if `msg` is older than the low watermark.
    /// - The pusher's error, converted with `SendError::from`, if pushing fails.
    ///
    /// Sending the top watermark closes the stream once the message has been handed to
    /// the pusher (or skipped because no pusher exists).
    fn send(&mut self, msg: Message<D>) -> Result<(), SendError> {
        if self.is_closed() {
            tracing::warn!(
                "Trying to send messages on a closed WriteStream {} (ID: {})",
                self.name(),
                self.id(),
            );
            return Err(SendError::Closed);
        }

        // Decide now whether this is the final message, before `msg` is moved into an Arc.
        let closes_stream = msg.is_top_watermark();
        if closes_stream {
            tracing::debug!(
                "Sending top watermark on the stream {} (ID: {}).",
                self.name(),
                self.id()
            );
        }

        self.update_statistics(&msg)?;

        if let Some(pusher) = self.pusher.as_mut() {
            pusher.send(Arc::new(msg)).map_err(SendError::from)?;
        } else {
            tracing::debug!(
                "No Pusher was found for the WriteStream {} (ID: {}). \
                    Skipping message sending.",
                self.name(),
                self.id()
            );
        }

        if closes_stream {
            self.close_stream();
        }
        Ok(())
    }
}
/// Bookkeeping shared by all clones of a `WriteStream`, guarded by a `Mutex`.
pub(crate) struct WriteStreamStatistics {
    /// Highest watermark recorded so far. Starts at `Timestamp::Bottom` and is set to
    /// `Timestamp::Top` when the stream is closed.
    low_watermark: Timestamp,
    /// Set to `true` by `close_stream`; never reset in this file.
    is_stream_closed: bool,
    /// Receives message-count increments and watermark-arrival notifications on each send.
    condition_context: ConditionContext,
}
impl WriteStreamStatistics {
    /// Creates statistics for a freshly opened stream: low watermark `Timestamp::Bottom`,
    /// not closed, and a new condition context.
    fn new() -> Self {
        let condition_context = ConditionContext::new();
        Self {
            low_watermark: Timestamp::Bottom,
            is_stream_closed: false,
            condition_context,
        }
    }

    /// Marks the stream as closed and pins the low watermark to `Timestamp::Top`.
    fn close_stream(&mut self) {
        self.is_stream_closed = true;
        self.low_watermark = Timestamp::Top;
    }

    /// Whether `close_stream` has been called.
    fn is_stream_closed(&self) -> bool {
        self.is_stream_closed
    }

    /// Borrows the current low watermark.
    fn low_watermark(&self) -> &Timestamp {
        &self.low_watermark
    }

    /// Adopts `watermark_timestamp` as the low watermark only if the current one compares
    /// strictly less than it; otherwise the value is discarded.
    fn update_low_watermark(&mut self, watermark_timestamp: Timestamp) {
        let advances = self.low_watermark < watermark_timestamp;
        if advances {
            self.low_watermark = watermark_timestamp;
        }
    }

    /// Borrows the condition context.
    pub(crate) fn get_condition_context(&self) -> &ConditionContext {
        &self.condition_context
    }
}