// erdos 0.4.0
//
// ERDOS is a platform for developing self-driving cars and robotics applications.
use std::{
    fmt,
    sync::{Arc, Mutex},
};

use serde::Deserialize;

use crate::{
    communication::{Pusher, SendEndpoint},
    dataflow::{deadlines::ConditionContext, Data, Message, Timestamp},
};

use super::{errors::SendError, StreamId, WriteStreamT};

/// A [`WriteStream`] allows [operators](crate::dataflow::operator) to send data to other operators.
///
/// [`Data`] is sent via [`Message`]s, which are broadcast to operators that are connected to the stream.
/// Messages are rapidly sent to operators within the same node using zero-copy communication.
/// Messages sent across nodes are serialized using
/// [abomonation](https://github.com/TimelyDataflow/abomonation) if possible,
/// before falling back to [bincode](https://github.com/servo/bincode).
///
/// Cloning a [`WriteStream`] shares its statistics (low watermark and closed state) with the
/// clone, so closing one clone marks all of them as closed.
///
/// # Examples
/// The following example shows an operator which sends a sequence of numbers on a [`WriteStream`],
/// and ensures that downstream operators progress by sending a watermark after each number.
/// ```
/// # use std::{thread, time::Duration};
/// #
/// # use erdos::dataflow::{
/// #     operator::{Source, OperatorConfig}, stream::WriteStreamT, Message, Timestamp, WriteStream
/// # };
/// #
/// struct CounterOperator {}
///
/// impl Source<usize> for CounterOperator {
///     fn run(&mut self, config: &OperatorConfig, write_stream: &mut WriteStream<usize>) {
///         for t in 0..10 {
///             let timestamp = Timestamp::Time(vec![t as u64]);
///             write_stream
///                 .send(Message::new_message(timestamp.clone(), t))
///                 .unwrap();
///             write_stream
///                 .send(Message::new_watermark(timestamp))
///                 .unwrap();
///             thread::sleep(Duration::from_millis(100));
///         }
///     }
/// }
/// ```
#[derive(Clone)]
pub struct WriteStream<D: Data> {
    /// The unique ID of the stream, supplied by the caller of the constructor.
    id: StreamId,
    /// The name of the stream.
    name: String,
    /// Sends messages to the endpoints of connected operators.
    /// Set to `None` once the stream has been closed.
    pusher: Option<Pusher<Arc<Message<D>>>>,
    /// Statistics about the write stream, shared (via `Arc`) with every clone of it.
    stats: Arc<Mutex<WriteStreamStatistics>>,
}

impl<D: Data> WriteStream<D> {
    /// Creates a [`WriteStream`] with the given `id` and `name`, used to send messages to the
    /// dataflow.
    ///
    /// The new stream is open, its low watermark is [`Timestamp::Bottom`], and it owns a fresh
    /// [`Pusher`].
    pub(crate) fn new(id: StreamId, name: &str) -> Self {
        tracing::debug!("Initializing a WriteStream {} with the ID: {}", name, id);
        Self {
            id,
            name: name.to_string(),
            pusher: Some(Pusher::new()),
            stats: Arc::new(Mutex::new(WriteStreamStatistics::new())),
        }
    }

    /// Creates a [`WriteStream`] that sends to each of the given `endpoints`.
    ///
    /// The stream is named after the string form of `id`.
    pub(crate) fn from_endpoints(
        endpoints: Vec<SendEndpoint<Arc<Message<D>>>>,
        id: StreamId,
    ) -> Self {
        let mut stream = Self::new(id, &id.to_string());
        for endpoint in endpoints {
            stream.add_endpoint(endpoint);
        }
        stream
    }

    /// Returns the ID the stream was constructed with.
    pub fn id(&self) -> StreamId {
        self.id
    }

    /// Returns a copy of the stream's name.
    pub fn name(&self) -> String {
        self.name.clone()
    }

    /// Returns `true` once the stream has been closed, i.e. after a top watermark has been sent
    /// on this stream or on any of its clones. No further messages can be sent afterwards.
    pub fn is_closed(&self) -> bool {
        self.stats.lock().unwrap().is_stream_closed()
    }

    /// Registers an additional endpoint that future messages are pushed to.
    ///
    /// # Panics
    /// Panics if the stream has already been closed (its pusher has been dropped).
    fn add_endpoint(&mut self, endpoint: SendEndpoint<Arc<Message<D>>>) {
        self.pusher
            .as_mut()
            .expect("Attempted to add endpoint to WriteStream, however no pusher exists")
            .add_endpoint(endpoint);
    }

    /// Closes the stream for future messages.
    ///
    /// Marks the shared statistics as closed (raising the low watermark to the top) and drops
    /// this instance's pusher.
    fn close_stream(&mut self) {
        tracing::debug!("Closing write stream {} (ID: {})", self.name(), self.id());
        self.stats.lock().unwrap().close_stream();
        self.pusher = None;
    }

    /// Validates `msg` against the stream's low watermark and records it in the statistics.
    ///
    /// * Data messages increment the per-timestamp message count in the condition context.
    /// * Watermarks advance the low watermark (only if larger) and notify the condition
    ///   context of their arrival.
    ///
    /// # Arguments
    /// * `msg` - The message about to be sent on the stream.
    ///
    /// # Errors
    /// Returns [`SendError::TimestampError`] if the timestamp of `msg` is lower than the
    /// current low watermark. The statistics are left unchanged in that case.
    fn update_statistics(&self, msg: &Message<D>) -> Result<(), SendError> {
        // Both message kinds need the statistics, so take the lock once for the whole update.
        let mut stats = self.stats.lock().unwrap();
        match msg {
            Message::TimestampedData(td) => {
                if td.timestamp < *stats.low_watermark() {
                    return Err(SendError::TimestampError);
                }
                // Increment the message count.
                stats
                    .condition_context
                    .increment_msg_count(self.id(), td.timestamp.clone());
            }
            Message::Watermark(msg_watermark) => {
                if msg_watermark < stats.low_watermark() {
                    return Err(SendError::TimestampError);
                }
                tracing::debug!(
                    "Updating watermark on WriteStream {} (ID: {}) from {:?} to {:?}",
                    self.name(),
                    self.id(),
                    stats.low_watermark(),
                    msg_watermark
                );
                stats.update_low_watermark(msg_watermark.clone());

                // Notify the arrival of the watermark.
                stats
                    .condition_context
                    .notify_watermark_arrival(self.id(), msg_watermark.clone());
            }
        }
        Ok(())
    }

    /// Returns a handle to the statistics shared by this stream and its clones.
    // `dead_code` is allowed because this crate-internal accessor may have no callers.
    #[allow(dead_code)]
    pub(crate) fn get_statistics(&self) -> Arc<Mutex<WriteStreamStatistics>> {
        Arc::clone(&self.stats)
    }

    /// Clears the condition-context state recorded for this stream at `timestamp`.
    pub fn clear_state(&mut self, timestamp: Timestamp) {
        self.stats
            .lock()
            .unwrap()
            .condition_context
            .clear_state(self.id(), timestamp);
    }

    /// Returns a snapshot (a clone) of the stream's current condition context.
    pub(crate) fn get_condition_context(&self) -> ConditionContext {
        self.stats.lock().unwrap().get_condition_context().clone()
    }
}

impl<D: Data> fmt::Debug for WriteStream<D> {
    /// Renders the stream's ID alongside the low watermark currently held in its statistics.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Keep the guard alive only for the duration of the write.
        let stats = self.stats.lock().unwrap();
        let low_watermark = stats.low_watermark();
        write!(
            f,
            "WriteStream {{ id: {}, low_watermark: {:?} }}",
            self.id, low_watermark,
        )
    }
}

impl<'a, D: Data + Deserialize<'a>> WriteStreamT<D> for WriteStream<D> {
    /// Sends `msg` to every operator connected to this stream.
    ///
    /// Fails with [`SendError::Closed`] if a top watermark was already sent, and with
    /// [`SendError::TimestampError`] if the message's timestamp is below the stream's low
    /// watermark. Errors from the underlying pusher are converted into [`SendError`].
    /// Sending a top watermark closes the stream after the message has been pushed.
    fn send(&mut self, msg: Message<D>) -> Result<(), SendError> {
        // Nothing may follow a top watermark.
        if self.is_closed() {
            tracing::warn!(
                "Trying to send messages on a closed WriteStream {} (ID: {})",
                self.name(),
                self.id(),
            );
            return Err(SendError::Closed);
        }

        // A top watermark closes the stream, but only after it has been forwarded.
        let is_top_watermark = msg.is_top_watermark();
        if is_top_watermark {
            tracing::debug!(
                "Sending top watermark on the stream {} (ID: {}).",
                self.name(),
                self.id()
            );
        }

        // Validate and record the message, then hand it to the connected endpoints.
        self.update_statistics(&msg)?;
        let shared_msg = Arc::new(msg);

        if let Some(pusher) = self.pusher.as_mut() {
            pusher.send(shared_msg).map_err(SendError::from)?;
        } else {
            tracing::debug!(
                "No Pusher was found for the WriteStream {} (ID: {}). \
                 Skipping message sending.",
                self.name(),
                self.id()
            );
        }

        if is_top_watermark {
            self.close_stream();
        }
        Ok(())
    }
}

/// Bookkeeping kept alongside a `WriteStream`: its low watermark, whether it has been closed,
/// and the condition context used to evaluate end conditions for deadlines.
pub(crate) struct WriteStreamStatistics {
    /// Highest watermark recorded so far; [`Timestamp::Top`] once the stream is closed.
    low_watermark: Timestamp,
    /// Set to `true` once the stream has been closed.
    is_stream_closed: bool,
    /// State consulted by deadline end conditions (message counts, watermark arrivals).
    condition_context: ConditionContext,
}

impl WriteStreamStatistics {
    /// Builds the statistics of a freshly opened stream: bottom watermark, not closed.
    fn new() -> Self {
        let condition_context = ConditionContext::new();
        Self {
            low_watermark: Timestamp::Bottom,
            is_stream_closed: false,
            condition_context,
        }
    }

    /// Marks the stream as closed and raises its low watermark to [`Timestamp::Top`].
    fn close_stream(&mut self) {
        self.is_stream_closed = true;
        self.low_watermark = Timestamp::Top;
    }

    /// Reports whether the stream has been closed.
    fn is_stream_closed(&self) -> bool {
        self.is_stream_closed
    }

    /// Borrows the current low watermark of the stream.
    fn low_watermark(&self) -> &Timestamp {
        &self.low_watermark
    }

    /// Advances the low watermark to `watermark_timestamp`.
    /// Watermarks never move backwards: values not strictly greater than the current low
    /// watermark are ignored.
    fn update_low_watermark(&mut self, watermark_timestamp: Timestamp) {
        let advances = self.low_watermark < watermark_timestamp;
        if advances {
            self.low_watermark = watermark_timestamp;
        }
    }

    /// Borrows the condition context stored with the stream.
    pub(crate) fn get_condition_context(&self) -> &ConditionContext {
        &self.condition_context
    }
}