greentic-operator 0.4.43

Greentic operator CLI for local dev and demo orchestration.
Documentation
use std::path::Path;

use anyhow::Context;
use serde_json::{Value as JsonValue, json};

use crate::demo::ingress_types::EventEnvelopeV1;
use crate::demo::runner_host::OperatorContext;
use crate::domains::Domain;
use crate::messaging_universal::app;
use crate::operator_log;
use crate::runner_exec::{self, RunRequest};

pub fn route_events_to_default_flow(
    bundle: &Path,
    ctx: &OperatorContext,
    events: &[EventEnvelopeV1],
) -> anyhow::Result<usize> {
    if events.is_empty() {
        return Ok(0);
    }
    let team = ctx.team.as_deref();
    let app_pack_path = app::resolve_app_pack_path(bundle, &ctx.tenant, team, None)
        .context("resolve default app pack for event routing")?;
    let pack_info = app::load_app_pack_info(&app_pack_path).context("load app pack manifest")?;
    let flow = app::select_app_flow(&pack_info).context("select app default flow")?;

    let mut routed = 0usize;
    for event in events {
        let input = build_event_flow_input(event, ctx);
        let request = RunRequest {
            root: bundle.to_path_buf(),
            domain: Domain::Events,
            pack_path: app_pack_path.clone(),
            pack_label: pack_info.pack_id.clone(),
            flow_id: flow.id.clone(),
            tenant: ctx.tenant.clone(),
            team: ctx.team.clone(),
            input,
            dist_offline: true,
        };
        runner_exec::run_provider_pack_flow(request)
            .with_context(|| format!("route event {} -> {}", event.event_type, flow.id))?;
        routed += 1;
    }
    operator_log::info(
        module_path!(),
        format!(
            "event router delivered {} event(s) to pack={} flow={}",
            routed, pack_info.pack_id, flow.id
        ),
    );
    Ok(routed)
}

fn build_event_flow_input(event: &EventEnvelopeV1, ctx: &OperatorContext) -> JsonValue {
    json!({
        "event": event,
        "events": [event],
        "tenant": ctx.tenant,
        "team": ctx.team,
        "correlation_id": event.correlation_id.clone().or(ctx.correlation_id.clone()),
    })
}