erdos 0.4.0

ERDOS is a platform for developing self-driving cars and robotics applications.
Documentation
use std::{
    borrow::BorrowMut,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

use serde::Deserialize;

use crate::{
    dataflow::{
        graph::{default_graph, StreamSetupHook},
        Data, Message,
    },
    scheduler::channel_manager::ChannelManager,
};

use super::{
    errors::{ReadError, TryReadError},
    OperatorStream, ReadStream, Stream, StreamId,
};

/// An [`ExtractStream`] enables drivers to read data from a running ERDOS application.
///
/// Similar to a [`ReadStream`], an [`ExtractStream`] exposes [`read`](ExtractStream::read) and
/// [`try_read`](ExtractStream::try_read) functions to allow drivers to read data output by the
/// operators of the graph.
///
/// The underlying [`ReadStream`] is created lazily, on the first read attempt made after a
/// channel manager has been supplied to the stream. Until then,
/// [`try_read`](ExtractStream::try_read) returns [`TryReadError::Disconnected`] and
/// [`read`](ExtractStream::read) keeps waiting.
///
/// # Example
/// The below example shows how to use an [`IngestStream`](crate::dataflow::stream::IngestStream)
/// to send data to a [`FlatMapOperator`](crate::dataflow::operators::FlatMapOperator),
/// and retrieve the processed values through an [`ExtractStream`].
/// ```no_run
/// # use erdos::dataflow::{
/// #    stream::{IngestStream, ExtractStream, Stream},
/// #    operators::FlatMapOperator,
/// #    OperatorConfig, Message, Timestamp
/// # };
/// # use erdos::*;
/// # use erdos::node::Node;
/// #
/// let args = erdos::new_app("ERDOS").get_matches();
/// let mut node = Node::new(Configuration::from_args(&args));
///
/// // Create an IngestStream.
/// let mut ingest_stream = IngestStream::new();
///
/// // Connect a FlatMapOperator to the IngestStream, and create an ExtractStream from its output.
/// let output_stream = erdos::connect_one_in_one_out(
///     || FlatMapOperator::new(|x: &usize| { std::iter::once(2 * x) }),
///     || {},
///     OperatorConfig::new().name("MapOperator"),
///     &ingest_stream,
/// );
/// let mut extract_stream = ExtractStream::new(&output_stream);
///
/// node.run_async();
///
/// // Send data on the IngestStream.
/// for i in 1..10 {
///     ingest_stream.send(Message::new_message(Timestamp::Time(vec![i as u64]), i)).unwrap();
/// }
///
/// // Retrieve mapped values using an ExtractStream.
/// for i in 1..10 {
///     let message = extract_stream.read().unwrap();
///     assert_eq!(*message.data().unwrap(), 2 * i);
/// }
/// ```
pub struct ExtractStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    /// The ID of the stream, copied from the [`OperatorStream`] passed to
    /// [`ExtractStream::new`].
    id: StreamId,
    /// The [`ReadStream`] associated with the [`ExtractStream`]. It is `None` until it has been
    /// set up by the first successful read attempt.
    read_stream_option: Option<ReadStream<D>>,
    /// Slot for the channel manager handed over by the setup hook; the [`ReadStream`] is created
    /// from it lazily. Sharing the channel manager instead of a ready-made [`ReadStream`]
    /// circumvents requiring `Send` to transfer a [`ReadStream`] across threads.
    channel_manager_option: Arc<Mutex<Option<Arc<Mutex<ChannelManager>>>>>,
}

impl<D> ExtractStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    /// Returns a new instance of the [`ExtractStream`].
    ///
    /// # Arguments
    /// * `stream`: The [`Stream`] returned by an [operator](crate::dataflow::operator)
    /// from which to extract messages.
    pub fn new(stream: &OperatorStream<D>) -> Self {
        tracing::debug!(
            "Initializing an ExtractStream with the ReadStream {} (ID: {})",
            stream.name(),
            stream.id(),
        );

        let id = stream.id();

        // Create the ExtractStream structure.
        let extract_stream = Self {
            id,
            read_stream_option: None,
            channel_manager_option: Arc::new(Mutex::new(None)),
        };

        default_graph::add_extract_stream(&extract_stream);
        extract_stream
    }

    /// Returns `true` if a top watermark message was sent or the [`ExtractStream`] failed to set
    /// up.
    pub fn is_closed(&self) -> bool {
        self.read_stream_option
            .as_ref()
            .map(ReadStream::is_closed)
            .unwrap_or(true)
    }

    /// Non-blocking read from the [`ExtractStream`].
    ///
    /// Returns the Message available on the [`ReadStream`], or an [`Empty`](TryReadError::Empty)
    /// if no message is available.
    pub fn try_read(&mut self) -> Result<Message<D>, TryReadError> {
        if let Some(read_stream) = self.read_stream_option.borrow_mut() {
            read_stream.try_read()
        } else {
            // Try to setup read stream
            if let Some(channel_manager) = &*self.channel_manager_option.lock().unwrap() {
                match channel_manager.lock().unwrap().take_recv_endpoint(self.id) {
                    Ok(recv_endpoint) => {
                        let mut read_stream = ReadStream::new(
                            self.id,
                            &default_graph::get_stream_name(&self.id),
                            Some(recv_endpoint),
                        );

                        let result = read_stream.try_read();
                        self.read_stream_option.replace(read_stream);
                        return result;
                    }
                    Err(msg) => tracing::error!(
                        "ExtractStream {} (ID: {}): error getting endpoint from \
                        channel manager \"{}\"",
                        self.name(),
                        self.id(),
                        msg
                    ),
                }
            }
            Err(TryReadError::Disconnected)
        }
    }

    /// Blocking read from the [`ExtractStream`].
    ///
    /// Returns the Message available on the [`ReadStream`].
    pub fn read(&mut self) -> Result<Message<D>, ReadError> {
        loop {
            let result = self.try_read();
            if self.read_stream_option.is_some() {
                match result {
                    Ok(msg) => return Ok(msg),
                    Err(TryReadError::Disconnected) => return Err(ReadError::Disconnected),
                    Err(TryReadError::Empty) => (),
                    Err(TryReadError::SerializationError) => {
                        return Err(ReadError::SerializationError)
                    }
                    Err(TryReadError::Closed) => return Err(ReadError::Closed),
                };
            } else {
                thread::sleep(Duration::from_millis(100));
            }
        }
    }

    pub fn id(&self) -> StreamId {
        self.id
    }

    pub fn name(&self) -> String {
        default_graph::get_stream_name(&self.id)
    }

    /// Returns a function that sets up self.read_stream_option using the channel_manager.
    pub(crate) fn get_setup_hook(&self) -> impl StreamSetupHook {
        let channel_manager_option_copy = Arc::clone(&self.channel_manager_option);

        move |channel_manager: Arc<Mutex<ChannelManager>>| {
            channel_manager_option_copy
                .lock()
                .unwrap()
                .replace(channel_manager);
        }
    }
}

// Needed to avoid deadlock in Python.
//
// SAFETY: NOTE(review): this manually asserts `Send` for every field, presumably because at
// least one of them (the `ReadStream<D>` or the shared `ChannelManager`) is not automatically
// `Send`. The struct's own field comment says the channel-manager slot exists to avoid
// transferring a `ReadStream` across threads. `read_stream_option` is `None` until the first
// successful `try_read`, so moving an `ExtractStream` before then moves only the shared slot.
// Moving it after a read stream has been set up relies on an invariant that is not established
// here; confirm before depending on it.
unsafe impl<D> Send for ExtractStream<D> where for<'a> D: Data + Deserialize<'a> {}