greentic-start-dev 1.1.27190108346

Greentic lifecycle runner for start/restart/stop orchestration
Documentation
use std::path::Path;

use anyhow::Context;
use serde_json::{Value as JsonValue, json};

use crate::domains::Domain;
use crate::ingress_types::EventEnvelopeV1;
use crate::messaging_app as app;
use crate::operator_log;
use crate::runner_exec::{self, RunRequest};
use crate::runner_host::OperatorContext;

pub fn route_events_to_default_flow(
    bundle: &Path,
    ctx: &OperatorContext,
    events: &[EventEnvelopeV1],
) -> anyhow::Result<usize> {
    if events.is_empty() {
        return Ok(0);
    }
    let team = ctx.team.as_deref();
    let app_pack_path = app::resolve_app_pack_path(bundle, &ctx.tenant, team, None)
        .context("resolve default app pack for event routing")?;
    let pack_info = app::load_app_pack_info(&app_pack_path).context("load app pack manifest")?;
    let flow = app::select_app_flow(&pack_info).context("select app default flow")?;

    let mut routed = 0usize;
    for event in events {
        let input = build_event_flow_input(event, ctx);
        let request = RunRequest {
            root: bundle.to_path_buf(),
            domain: Domain::Events,
            pack_path: app_pack_path.clone(),
            pack_label: pack_info.pack_id.clone(),
            flow_id: flow.id.clone(),
            tenant: ctx.tenant.clone(),
            team: ctx.team.clone(),
            input,
            dist_offline: true,
        };
        runner_exec::run_provider_pack_flow(request)
            .with_context(|| format!("route event {} -> {}", event.event_type, flow.id))?;
        routed += 1;
    }
    operator_log::info(
        module_path!(),
        format!(
            "event router delivered {} event(s) to pack={} flow={}",
            routed, pack_info.pack_id, flow.id
        ),
    );
    Ok(routed)
}

fn build_event_flow_input(event: &EventEnvelopeV1, ctx: &OperatorContext) -> JsonValue {
    json!({
        "event": event,
        "events": [event],
        "tenant": ctx.tenant,
        "team": ctx.team,
        "correlation_id": event.correlation_id.clone().or(ctx.correlation_id.clone()),
    })
}

#[cfg(test)]
mod tests {
    use super::build_event_flow_input;
    use crate::runner_host::OperatorContext;

    #[test]
    fn build_event_flow_input_prefers_event_correlation_id() {
        let event: crate::ingress_types::EventEnvelopeV1 =
            serde_json::from_value(serde_json::json!({
                "event_id": "evt-1",
                "event_type": "subscription.created",
                "occurred_at": "2026-04-01T00:00:00Z",
                "source": {
                    "domain": "events",
                    "provider": "events-webhook",
                    "handler_id": "default"
                },
                "scope": {
                    "tenant": "demo",
                    "team": "default"
                },
                "correlation_id": "evt-corr",
                "payload": {"id": "1"}
            }))
            .expect("event");
        let ctx = OperatorContext {
            tenant: "demo".to_string(),
            team: Some("default".to_string()),
            correlation_id: Some("ctx-corr".to_string()),
        };

        let input = build_event_flow_input(&event, &ctx);
        assert_eq!(input["tenant"], "demo");
        assert_eq!(input["team"], "default");
        assert_eq!(input["correlation_id"], "evt-corr");
        assert_eq!(input["events"].as_array().expect("events").len(), 1);
    }

    #[test]
    fn build_event_flow_input_falls_back_to_context_correlation_id() {
        let event: crate::ingress_types::EventEnvelopeV1 =
            serde_json::from_value(serde_json::json!({
                "event_id": "evt-2",
                "event_type": "subscription.deleted",
                "occurred_at": "2026-04-01T00:00:01Z",
                "source": {
                    "domain": "events",
                    "provider": "events-webhook"
                },
                "scope": {
                    "tenant": "demo"
                },
                "payload": {"id": "2"}
            }))
            .expect("event");
        let ctx = OperatorContext {
            tenant: "demo".to_string(),
            team: None,
            correlation_id: Some("ctx-corr".to_string()),
        };

        let input = build_event_flow_input(&event, &ctx);
        assert_eq!(input["correlation_id"], "ctx-corr");
        assert!(input["team"].is_null());
    }
}