dora-node-api 0.2.0-rc-10

Dora Rust Node API
Documentation
use super::{communication::DaemonChannel, EventStreamThreadHandle};
use dora_core::{
    config::{DataId, NodeId},
    daemon_messages::{DaemonRequest, Data, DataflowId},
    message::Metadata,
};
use eyre::{bail, eyre, Context};
use std::sync::Arc;

/// Request/reply handle used by a node to talk to the dora daemon.
///
/// All methods on this type go through the wrapped [`DaemonChannel`]: they
/// send one `DaemonRequest` and validate the reply that comes back.
pub(crate) struct ControlChannel {
    /// Channel over which requests are sent and replies are received.
    channel: DaemonChannel,
    /// Shared handle to the event stream thread. It is never read here (hence
    /// the leading underscore); it is only stored so that this value holds a
    /// reference to it.
    ///
    /// NOTE(review): presumably this keeps the event stream thread alive at
    /// least as long as the control channel — confirm against
    /// `EventStreamThreadHandle`'s `Drop` impl. Fields are dropped in
    /// declaration order, so `channel` is dropped before this handle; do not
    /// reorder the fields without checking that this is not relied upon.
    _event_stream_thread_handle: Option<Arc<EventStreamThreadHandle>>,
}

impl ControlChannel {
    /// Registers the node with the daemon and wraps the channel.
    ///
    /// Calls `register` on `channel` with `dataflow_id` and a clone of
    /// `node_id`, then stores the channel together with the given event
    /// stream thread handle.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `DaemonChannel::register` unchanged.
    #[tracing::instrument(skip(channel, event_stream_thread_handle))]
    pub(crate) fn init(
        dataflow_id: DataflowId,
        node_id: &NodeId,
        mut channel: DaemonChannel,
        event_stream_thread_handle: Arc<EventStreamThreadHandle>,
    ) -> eyre::Result<Self> {
        // Registration must succeed before the channel is handed out.
        channel.register(dataflow_id, node_id.clone())?;

        let control_channel = Self {
            channel,
            _event_stream_thread_handle: Some(event_stream_thread_handle),
        };
        Ok(control_channel)
    }

    /// Sends a `DaemonRequest::Stopped` request and checks the daemon's reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the daemon answers with an
    /// error result, or if the reply is not a `DaemonReply::Result`.
    pub fn report_stop(&mut self) -> eyre::Result<()> {
        use dora_core::daemon_messages::DaemonReply;

        let request = DaemonRequest::Stopped;
        let reply = self
            .channel
            .request(&request)
            .wrap_err("failed to report stopped to dora-daemon")?;

        // Each arm yields the final result of this method.
        match reply {
            DaemonReply::Result(outcome) => outcome
                .map_err(|err| eyre!(err))
                .wrap_err("failed to report stop event to dora-daemon"),
            unexpected => bail!("unexpected stopped reply: {unexpected:?}"),
        }
    }

    /// Sends a `DaemonRequest::CloseOutputs` request carrying `outputs` and
    /// checks the daemon's reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the daemon answers with an
    /// error result, or if the reply is not a `DaemonReply::Result`.
    pub fn report_closed_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
        use dora_core::daemon_messages::DaemonReply;

        let request = DaemonRequest::CloseOutputs(outputs);
        let reply = self
            .channel
            .request(&request)
            .wrap_err("failed to report closed outputs to dora-daemon")?;

        // Each arm yields the final result of this method.
        match reply {
            DaemonReply::Result(outcome) => outcome
                .map_err(|err| eyre!(err))
                .wrap_err("failed to receive closed outputs reply from dora-daemon"),
            unexpected => bail!("unexpected closed outputs reply: {unexpected:?}"),
        }
    }

    /// Sends a `DaemonRequest::SendMessage` request built from `output_id`,
    /// `metadata` and the optional `data`.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent or if the daemon's reply is
    /// anything other than `DaemonReply::Empty`.
    pub fn send_message(
        &mut self,
        output_id: DataId,
        metadata: Metadata<'static>,
        data: Option<Data>,
    ) -> eyre::Result<()> {
        use dora_core::daemon_messages::DaemonReply;

        let reply = self
            .channel
            .request(&DaemonRequest::SendMessage {
                output_id,
                metadata,
                data,
            })
            .wrap_err("failed to send SendMessage request to dora-daemon")?;

        // An empty reply is the only accepted acknowledgement.
        if !matches!(reply, DaemonReply::Empty) {
            bail!("unexpected SendMessage reply: {reply:?}");
        }
        Ok(())
    }
}