greentic-operator 1.1.0-dev.26776008977

Greentic operator CLI for local dev and demo orchestration.
Documentation
use std::fs;
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};

use crate::capabilities::{
    CapabilityOfferRecord, CapabilitySubscriptionRecord, CapabilityTopicRecord, ResolveScope,
};
use crate::demo::runner_host::OperatorContext;
use crate::domains::Domain;
use crate::state_layout;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CapabilityAdapterKind {
    DirectComponent,
    HttpCompatibility,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedCallableRecord {
    pub capability_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
    pub stable_id: String,
    pub pack_id: String,
    pub domain: String,
    pub pack_path: String,
    pub provider_component_ref: String,
    pub provider_op: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_schema_ref: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_schema_ref: Option<String>,
    pub version: String,
    pub requires_setup: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub setup_qa_ref: Option<String>,
    pub adapter_kind: CapabilityAdapterKind,
    pub scope: ResolvedScopeRecord,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedScopeRecord {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub team: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedTopicRecord {
    pub topic: String,
    pub event_type: String,
    pub pack_id: String,
    pub domain: String,
    pub pack_path: String,
    pub publisher_component_ref: String,
    pub publisher_op: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload_schema_ref: Option<String>,
    pub adapter_kind: CapabilityAdapterKind,
    pub priority: i32,
    pub scope: ResolvedScopeRecord,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedSubscriptionRecord {
    pub subscription_id: String,
    pub topic: String,
    pub event_type: String,
    pub pack_id: String,
    pub domain: String,
    pub pack_path: String,
    pub capability_id: String,
    pub op: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload_schema_ref: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub filter_json_pointer: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub filter_equals: Option<String>,
    pub delivery_mode: String,
    pub max_attempts: u32,
    pub priority: i32,
    pub scope: ResolvedScopeRecord,
}

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct CapabilityCatalogArtifacts {
    pub callables: Vec<ResolvedCallableRecord>,
    pub subscriptions: Vec<ResolvedSubscriptionRecord>,
    pub topics: Vec<ResolvedTopicRecord>,
}

impl CapabilityCatalogArtifacts {
    pub fn empty() -> Self {
        Self::default()
    }
}

pub fn build_catalog_artifacts(
    offers: &[CapabilityOfferRecord],
    topics: &[CapabilityTopicRecord],
    subscriptions: &[CapabilitySubscriptionRecord],
    ctx: &OperatorContext,
    scope: &ResolveScope,
) -> CapabilityCatalogArtifacts {
    let callables = offers
        .iter()
        .map(|offer| ResolvedCallableRecord {
            capability_id: offer.cap_id.clone(),
            operation_id: offer.operation_id.clone(),
            stable_id: offer.stable_id.clone(),
            pack_id: offer.pack_id.clone(),
            domain: domain_name(offer.domain).to_string(),
            pack_path: offer.pack_path.display().to_string(),
            provider_component_ref: offer.provider_component_ref.clone(),
            provider_op: offer.provider_op.clone(),
            input_schema_ref: offer.input_schema_ref.clone(),
            output_schema_ref: offer.output_schema_ref.clone(),
            version: offer.version.clone(),
            requires_setup: offer.requires_setup,
            setup_qa_ref: offer.setup_qa_ref.clone(),
            adapter_kind: adapter_kind(offer.adapter_kind.as_deref()),
            scope: ResolvedScopeRecord {
                env: scope.env.clone(),
                tenant: Some(ctx.tenant.clone()),
                team: ctx.team.clone(),
            },
        })
        .collect();
    let topics = topics
        .iter()
        .map(|topic| ResolvedTopicRecord {
            topic: topic.topic.clone(),
            event_type: topic.event_type.clone(),
            pack_id: topic.pack_id.clone(),
            domain: domain_name(topic.domain).to_string(),
            pack_path: topic.pack_path.display().to_string(),
            publisher_component_ref: topic.publisher_component_ref.clone(),
            publisher_op: topic.publisher_op.clone(),
            payload_schema_ref: topic.payload_schema_ref.clone(),
            adapter_kind: adapter_kind(topic.adapter_kind.as_deref()),
            priority: topic.priority,
            scope: ResolvedScopeRecord {
                env: scope.env.clone(),
                tenant: Some(ctx.tenant.clone()),
                team: ctx.team.clone(),
            },
        })
        .collect();
    let subscriptions = subscriptions
        .iter()
        .map(|subscription| ResolvedSubscriptionRecord {
            subscription_id: subscription.subscription_id.clone(),
            topic: subscription.topic.clone(),
            event_type: subscription.event_type.clone(),
            pack_id: subscription.pack_id.clone(),
            domain: domain_name(subscription.domain).to_string(),
            pack_path: subscription.pack_path.display().to_string(),
            capability_id: subscription.capability_id.clone(),
            op: subscription.op.clone(),
            payload_schema_ref: subscription.payload_schema_ref.clone(),
            filter_json_pointer: subscription.filter_json_pointer.clone(),
            filter_equals: subscription.filter_equals.clone(),
            delivery_mode: subscription.delivery_mode.clone(),
            max_attempts: subscription.max_attempts,
            priority: subscription.priority,
            scope: ResolvedScopeRecord {
                env: scope.env.clone(),
                tenant: Some(ctx.tenant.clone()),
                team: ctx.team.clone(),
            },
        })
        .collect();

    CapabilityCatalogArtifacts {
        callables,
        subscriptions,
        topics,
    }
}

pub fn write_capability_catalog_artifacts(
    root: &Path,
    tenant: &str,
    team: Option<&str>,
    artifacts: &CapabilityCatalogArtifacts,
) -> anyhow::Result<CapabilityCatalogPaths> {
    let callables_path = state_layout::capability_state_path(root, tenant, team, "callables.json");
    let subscriptions_path =
        state_layout::capability_state_path(root, tenant, team, "subscriptions.json");
    let topics_path = state_layout::capability_state_path(root, tenant, team, "topics.json");

    write_json(&callables_path, &artifacts.callables)?;
    write_json(&subscriptions_path, &artifacts.subscriptions)?;
    write_json(&topics_path, &artifacts.topics)?;

    Ok(CapabilityCatalogPaths {
        callables: callables_path,
        subscriptions: subscriptions_path,
        topics: topics_path,
    })
}

fn write_json(path: &PathBuf, value: &impl Serialize) -> anyhow::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    let bytes = serde_json::to_vec_pretty(value)?;
    fs::write(path, bytes)?;
    Ok(())
}

fn domain_name(domain: Domain) -> &'static str {
    match domain {
        Domain::Messaging => "messaging",
        Domain::Events => "events",
        Domain::Secrets => "secrets",
        Domain::OAuth => "oauth",
    }
}

fn adapter_kind(raw: Option<&str>) -> CapabilityAdapterKind {
    match raw {
        Some("http_compatibility") => CapabilityAdapterKind::HttpCompatibility,
        _ => CapabilityAdapterKind::DirectComponent,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use tempfile::tempdir;

    #[test]
    fn writes_catalog_files_in_generic_state_layout() -> anyhow::Result<()> {
        let tmp = tempdir()?;
        let callable = ResolvedCallableRecord {
            stable_id: "offer.1".to_string(),
            pack_id: "pack-a".to_string(),
            domain: "messaging".to_string(),
            pack_path: "/tmp/pack-a.gtpack".to_string(),
            capability_id: "cap://example.v1".to_string(),
            operation_id: Some("perform_action".to_string()),
            version: "1".to_string(),
            provider_component_ref: "provider.component".to_string(),
            provider_op: "provider.invoke".to_string(),
            input_schema_ref: Some("schemas/input.json".to_string()),
            output_schema_ref: Some("schemas/output.json".to_string()),
            requires_setup: false,
            setup_qa_ref: None,
            adapter_kind: CapabilityAdapterKind::DirectComponent,
            scope: ResolvedScopeRecord {
                env: Some("dev".to_string()),
                tenant: Some("tenant-a".to_string()),
                team: Some("team-b".to_string()),
            },
        };
        let artifacts = CapabilityCatalogArtifacts {
            callables: vec![callable],
            subscriptions: vec![ResolvedSubscriptionRecord {
                subscription_id: "sub-a".to_string(),
                topic: "topic://example.v1".to_string(),
                event_type: "example.v1".to_string(),
                pack_id: "pack-b".to_string(),
                domain: "events".to_string(),
                pack_path: "/tmp/pack-b.gtpack".to_string(),
                capability_id: "cap://subscriber.v1".to_string(),
                op: "handle".to_string(),
                payload_schema_ref: Some("schemas/event.json".to_string()),
                filter_json_pointer: Some("/kind".to_string()),
                filter_equals: Some("match".to_string()),
                delivery_mode: "at_least_once".to_string(),
                max_attempts: 2,
                priority: 10,
                scope: ResolvedScopeRecord {
                    env: Some("dev".to_string()),
                    tenant: Some("tenant-a".to_string()),
                    team: Some("team-b".to_string()),
                },
            }],
            topics: vec![ResolvedTopicRecord {
                topic: "topic://example.v1".to_string(),
                event_type: "example.v1".to_string(),
                pack_id: "pack-c".to_string(),
                domain: "events".to_string(),
                pack_path: "/tmp/pack-c.gtpack".to_string(),
                publisher_component_ref: "events.component".to_string(),
                publisher_op: "publish".to_string(),
                payload_schema_ref: Some("schemas/event.json".to_string()),
                adapter_kind: CapabilityAdapterKind::DirectComponent,
                priority: 5,
                scope: ResolvedScopeRecord {
                    env: Some("dev".to_string()),
                    tenant: Some("tenant-a".to_string()),
                    team: Some("team-b".to_string()),
                },
            }],
        };
        let paths =
            write_capability_catalog_artifacts(tmp.path(), "tenant-a", Some("team-b"), &artifacts)?;

        assert!(paths.callables.exists());
        assert!(paths.subscriptions.exists());
        assert!(paths.topics.exists());

        let callables = std::fs::read_to_string(paths.callables)?;
        let subscriptions = std::fs::read_to_string(paths.subscriptions)?;
        let topics = std::fs::read_to_string(paths.topics)?;
        assert!(callables.contains("\"capability_id\": \"cap://example.v1\""));
        assert!(callables.contains("\"adapter_kind\": \"direct_component\""));
        assert!(subscriptions.contains("\"subscription_id\": \"sub-a\""));
        assert!(topics.contains("\"topic\": \"topic://example.v1\""));
        Ok(())
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CapabilityCatalogPaths {
    pub callables: PathBuf,
    pub subscriptions: PathBuf,
    pub topics: PathBuf,
}