greentic-x 0.4.13

Greentic-X CLI for catalog-driven composition, scaffolding, validation, and simulation.
Documentation
use greentic_x_flow::{
    FlowDefinition, JoinMode, JoinStep, OperationCallStep, RenderSource, RenderSpec, ResolverStep,
    ReturnStep, SplitBranch, SplitStep, Step, ValueSource,
};
use greentic_x_types::{OperationId, ResolverId};
use serde::{Deserialize, Serialize};
use std::path::Path;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct ObservabilityPlaybookProfile {
    pub profile_id: String,
    pub resolver: String,
    #[serde(default)]
    pub query_ops: Vec<String>,
    #[serde(default)]
    pub analysis_ops: Vec<String>,
    pub present_op: String,
    #[serde(default)]
    pub split_join: Option<SplitJoinProfile>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct SplitJoinProfile {
    pub branches: Vec<SplitJoinBranch>,
    #[serde(default = "default_join_mode")]
    pub join_mode: JoinMode,
    #[serde(default = "default_join_output_key")]
    pub join_output_key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct SplitJoinBranch {
    pub branch_id: String,
    #[serde(default)]
    pub query_ops: Vec<String>,
    #[serde(default)]
    pub analysis_ops: Vec<String>,
    #[serde(default)]
    pub simulated_duration_ms: u64,
}

fn default_join_mode() -> JoinMode {
    JoinMode::All
}

fn default_join_output_key() -> String {
    "merged".to_owned()
}

pub(crate) fn read_profile(path: &Path) -> Result<ObservabilityPlaybookProfile, String> {
    let data = std::fs::read_to_string(path)
        .map_err(|err| format!("failed to read {}: {err}", path.display()))?;
    serde_json::from_str(&data).map_err(|err| format!("failed to parse {}: {err}", path.display()))
}

pub(crate) fn validate_profile(profile: &ObservabilityPlaybookProfile) -> Vec<String> {
    let mut issues = Vec::new();
    if profile.profile_id.trim().is_empty() {
        issues.push("profile_id must not be empty".to_owned());
    }
    if ResolverId::new(profile.resolver.clone()).is_err() {
        issues.push(format!(
            "resolver {} is not a valid identifier",
            profile.resolver
        ));
    }
    for operation_id in profile.query_ops.iter().chain(profile.analysis_ops.iter()) {
        if OperationId::new(operation_id.clone()).is_err() {
            issues.push(format!(
                "operation {} is not a valid identifier",
                operation_id
            ));
        }
    }
    if OperationId::new(profile.present_op.clone()).is_err() {
        issues.push(format!(
            "present_op {} is not a valid identifier",
            profile.present_op
        ));
    }
    if profile.query_ops.is_empty() && profile.split_join.is_none() {
        issues.push("query_ops must not be empty when split_join is absent".to_owned());
    }
    if profile.analysis_ops.is_empty() && profile.split_join.is_none() {
        issues.push("analysis_ops must not be empty when split_join is absent".to_owned());
    }
    if let Some(split_join) = &profile.split_join {
        if split_join.branches.len() < 2 {
            issues.push("split_join must define at least two branches".to_owned());
        }
        for branch in &split_join.branches {
            if branch.branch_id.trim().is_empty() {
                issues.push("split_join branch_id must not be empty".to_owned());
            }
            if branch.query_ops.is_empty() {
                issues.push(format!(
                    "split_join branch {} must define at least one query op",
                    branch.branch_id
                ));
            }
            if branch.analysis_ops.is_empty() {
                issues.push(format!(
                    "split_join branch {} must define at least one analysis op",
                    branch.branch_id
                ));
            }
            for operation_id in branch.query_ops.iter().chain(branch.analysis_ops.iter()) {
                if OperationId::new(operation_id.clone()).is_err() {
                    issues.push(format!(
                        "split_join branch {} contains invalid operation {}",
                        branch.branch_id, operation_id
                    ));
                }
            }
        }
    }
    issues
}

pub(crate) fn compile_profile(
    profile: &ObservabilityPlaybookProfile,
) -> Result<FlowDefinition, String> {
    let issues = validate_profile(profile);
    if !issues.is_empty() {
        return Err(issues.join("; "));
    }

    let resolver_id =
        ResolverId::new(profile.resolver.clone()).map_err(|err| format!("resolver id: {err}"))?;
    let present_op = OperationId::new(profile.present_op.clone())
        .map_err(|err| format!("present op id {}: {err}", profile.present_op))?;

    let mut steps = Vec::new();
    steps.push(Step::resolve(
        "resolve",
        ResolverStep::new(resolver_id, ValueSource::input("query"), "resolved"),
    ));

    if let Some(split_join) = &profile.split_join {
        let mut branches = Vec::new();
        for branch in &split_join.branches {
            let branch_steps = compile_linear_steps(
                &branch.query_ops,
                &branch.analysis_ops,
                ValueSource::input("query"),
                &format!("branch_{}", branch.branch_id),
            )?;
            let final_output_key = last_output_key(&branch_steps).ok_or_else(|| {
                format!(
                    "split_join branch {} does not produce an output key",
                    branch.branch_id
                )
            })?;
            let mut branch_steps = branch_steps;
            branch_steps.push(Step::return_output(
                format!("return_{}", branch.branch_id),
                ReturnStep::new(ValueSource::context(final_output_key)),
            ));
            branches.push(SplitBranch {
                branch_id: branch.branch_id.clone(),
                simulated_duration_ms: branch.simulated_duration_ms,
                steps: branch_steps,
            });
        }
        steps.push(Step::split("split", SplitStep { branches }));
        steps.push(Step::join(
            "join",
            JoinStep {
                split_step_id: "split".to_owned(),
                mode: split_join.join_mode,
                output_key: split_join.join_output_key.clone(),
                timeout_ms: None,
            },
        ));
        steps.push(Step::call(
            "present",
            OperationCallStep::from_source(
                present_op,
                ValueSource::context(split_join.join_output_key.clone()),
                "present_result",
            ),
        ));
    } else {
        let linear_steps = compile_linear_steps(
            &profile.query_ops,
            &profile.analysis_ops,
            ValueSource::context("resolved"),
            "",
        )?;
        steps.extend(linear_steps);
        let input_source = last_output_key(&steps)
            .map(ValueSource::context)
            .ok_or_else(|| "compiled profile has no operation output".to_owned())?;
        steps.push(Step::call(
            "present",
            OperationCallStep::from_source(present_op, input_source, "present_result"),
        ));
    }

    steps.push(Step::return_output(
        "return",
        ReturnStep::new(ValueSource::context("present_result.output")).with_render(RenderSpec {
            renderer_id: "noop.summary".to_owned(),
            source: RenderSource::AllEvidence,
            view_id: "summary-card".to_owned(),
            title: format!("{} summary", profile.profile_id),
            summary: "Compiled from gx.observability.playbook.v1".to_owned(),
        }),
    ));

    Ok(FlowDefinition {
        flow_id: profile.profile_id.clone(),
        steps,
    })
}

fn compile_linear_steps(
    query_ops: &[String],
    analysis_ops: &[String],
    initial_source: ValueSource,
    prefix: &str,
) -> Result<Vec<Step>, String> {
    let mut steps = Vec::new();
    let mut source = initial_source;
    for (index, operation_id) in query_ops.iter().enumerate() {
        let op_id = OperationId::new(operation_id.clone())
            .map_err(|err| format!("query op {}: {err}", operation_id))?;
        let step_id = prefixed(prefix, "query", index);
        let output_key = format!("{}_result", step_id);
        steps.push(Step::call(
            step_id.clone(),
            OperationCallStep::from_source(op_id, source, output_key.clone()),
        ));
        source = ValueSource::context(format!("{output_key}.output"));
    }
    for (index, operation_id) in analysis_ops.iter().enumerate() {
        let op_id = OperationId::new(operation_id.clone())
            .map_err(|err| format!("analysis op {}: {err}", operation_id))?;
        let step_id = prefixed(prefix, "analyse", index);
        let output_key = format!("{}_result", step_id);
        steps.push(Step::call(
            step_id.clone(),
            OperationCallStep::from_source(op_id, source, output_key.clone()),
        ));
        source = ValueSource::context(format!("{output_key}.output"));
    }
    Ok(steps)
}

fn prefixed(prefix: &str, stem: &str, index: usize) -> String {
    if prefix.is_empty() {
        format!("{stem}_{index}")
    } else {
        format!("{prefix}_{stem}_{index}")
    }
}

fn last_output_key(steps: &[Step]) -> Option<String> {
    steps.iter().rev().find_map(|step| match &step.kind {
        greentic_x_flow::StepKind::Call(call) => Some(call.output_key.clone()),
        greentic_x_flow::StepKind::Resolve(resolve) => Some(resolve.output_key.clone()),
        _ => None,
    })
}