dora-cli 0.5.0

`dora` goal is to be a low latency, composable, and distributed data flow.
use std::{
    borrow::Cow,
    collections::{BTreeSet, HashMap},
    fmt,
};

use crate::common::{resolve_dataflow_identifier_interactive, rpc};
use dora_core::{config::InputMapping, descriptor::Descriptor};
use dora_message::{
    DataflowId,
    cli_to_coordinator::CoordinatorControlClient,
    id::{DataId, NodeId},
    tarpc,
};
use eyre::{ContextCompat, bail};
use uuid::Uuid;

#[derive(Debug, clap::Args)]
pub struct DataflowSelector {
    /// Identifier of the dataflow
    #[clap(long, short, value_name = "UUID_OR_NAME")]
    pub dataflow: Option<String>,
}

impl DataflowSelector {
    pub async fn resolve(
        &self,
        client: &CoordinatorControlClient,
    ) -> eyre::Result<(Uuid, Descriptor)> {
        let dataflow_id =
            resolve_dataflow_identifier_interactive(client, self.dataflow.as_deref()).await?;
        let info = rpc(
            "get dataflow info",
            client.info(tarpc::context::current(), dataflow_id),
        )
        .await?;
        Ok((dataflow_id, info.descriptor))
    }
}

#[derive(Debug, clap::Args)]
pub struct TopicSelector {
    #[clap(flatten)]
    pub dataflow: DataflowSelector,
    /// Data to inspect, e.g. `node_id/output_id`
    #[clap(value_name = "DATA")]
    pub data: Vec<String>,
}

#[derive(Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct TopicIdentifier {
    pub node_id: NodeId,
    pub data_id: DataId,
}

impl fmt::Display for TopicIdentifier {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.node_id, self.data_id)
    }
}

impl TopicSelector {
    pub async fn resolve(
        &self,
        client: &CoordinatorControlClient,
    ) -> eyre::Result<(DataflowId, BTreeSet<TopicIdentifier>)> {
        let (dataflow_id, dataflow_descriptor) = self.dataflow.resolve(client).await?;
        if !dataflow_descriptor.debug.publish_all_messages_to_zenoh {
            bail!(
                "Dataflow `{dataflow_id}` does not have `publish_all_messages_to_zenoh` enabled. You should enable it in order to inspect data.\n\
                \n\
                Tip: Add the following snipppet to your dataflow descriptor:\n\
                \n\
                ```\n\
                _unstable_debug:\n  publish_all_messages_to_zenoh: true\n\
                ```
                "
            );
        }

        let node_map = dataflow_descriptor
            .nodes
            .iter()
            .map(|node| (&node.id, node))
            .collect::<HashMap<_, _>>();

        let mut data = BTreeSet::new();
        if self.data.is_empty() {
            data.extend(dataflow_descriptor.nodes.iter().flat_map(|node| {
                node.outputs.iter().map(|output| TopicIdentifier {
                    node_id: node.id.clone(),
                    data_id: output.clone(),
                })
            }));
            return Ok((dataflow_id, data));
        }

        for s in &self.data {
            let mut s = Cow::Borrowed(s.as_str());
            if !s.contains('/') {
                s.to_mut().push('/');
            }
            match s.parse() {
                Ok(InputMapping::User(user)) => {
                    let node = *node_map
                        .get(&user.source)
                        .with_context(|| format!("Unknown node: `{}`", user.source))?;
                    if user.output.is_empty() {
                        data.extend(node.outputs.iter().map(|output| TopicIdentifier {
                            node_id: user.source.clone(),
                            data_id: output.clone(),
                        }));
                    } else if node.outputs.contains(&user.output) {
                        data.insert(TopicIdentifier {
                            node_id: user.source,
                            data_id: user.output,
                        });
                    } else {
                        bail!(
                            "Node `{}` does not have output `{}`",
                            user.source,
                            user.output
                        );
                    }
                }
                Ok(_) => {
                    bail!("Reserved input mapping cannot be inspected")
                }
                Err(e) => bail!("Invalid output id `{s}`: {e}"),
            }
        }

        Ok((dataflow_id, data))
    }
}