messaging-egress 0.1.0

Greentic messaging egress worker: consumes normalized envelopes and delivers platform-specific payloads.
Documentation
use anyhow::{Error, Result};
use async_nats::jetstream::{
    consumer::AckPolicy, consumer::push::Config as PushConfig, stream::Config as StreamConfig,
    stream::RetentionPolicy,
};
use futures::StreamExt;
use gsm_core::{
    AdapterDescriptor, AdapterRegistry, DefaultAdapterPacksConfig, OutMessage,
    adapter_pack_paths_from_env, default_adapter_pack_paths,
};
use metrics::counter;
use std::path::PathBuf;
use tracing::{error, info, warn};

use crate::adapter_registry::AdapterLookup;
use crate::config::EgressConfig;
use gsm_bus::{BusClient, NatsBusClient, egress_subject_with_prefix, to_value};

pub async fn run() -> Result<()> {
    let config = EgressConfig::from_env()?;
    let client = async_nats::connect(&config.nats_url).await?;
    let js = async_nats::jetstream::new(client.clone());

    let default_cfg = DefaultAdapterPacksConfig::from_env();
    let mut pack_paths =
        default_adapter_pack_paths(PathBuf::from(&config.packs_root).as_path(), &default_cfg);
    pack_paths.extend(adapter_pack_paths_from_env());
    let registry = AdapterRegistry::load_from_paths(&pack_paths).unwrap_or_else(|err| {
        warn!(error = %err, "failed to load adapter packs; proceeding without registry");
        AdapterRegistry::default()
    });
    let adapters = AdapterLookup::new(&registry);
    let bus = NatsBusClient::new(client.clone());

    let stream_name = format!("messaging-egress-{}", config.env.0);
    let stream = js
        .get_or_create_stream(StreamConfig {
            name: stream_name.clone(),
            subjects: vec![config.subject_filter.clone()],
            retention: RetentionPolicy::WorkQueue,
            max_messages: -1,
            max_messages_per_subject: -1,
            max_bytes: -1,
            ..Default::default()
        })
        .await?;

    let consumer_name = format!("messaging-egress-{}", config.env.0);
    let deliver_subject = format!("deliver.messaging-egress.{}", config.env.0);
    let consumer = stream
        .get_or_create_consumer(
            &consumer_name,
            PushConfig {
                durable_name: Some(consumer_name.clone()),
                deliver_subject: deliver_subject.clone(),
                deliver_group: Some(format!("messaging-egress-{}", config.env.0)),
                ack_policy: AckPolicy::Explicit,
                max_ack_pending: 128,
                ..Default::default()
            },
        )
        .await?;

    info!(
        subject = %config.subject_filter,
        stream = %stream_name,
        consumer = %consumer_name,
        "messaging-egress listening for envelopes"
    );

    let mut messages = consumer.messages().await?;

    while let Some(result) = messages.next().await {
        match result {
            Ok(message) => {
                if let Err(err) = process_message(
                    &message,
                    &adapters,
                    config.adapter.as_deref(),
                    &bus,
                    &config,
                )
                .await
                {
                    error!(error = %err, "failed to process egress payload");
                }
                if let Err(err) = message.ack().await {
                    warn!(error = %err, "failed to ack egress delivery");
                }
            }
            Err(err) => {
                warn!(error = %err, "missing message from JetStream");
            }
        }
    }

    Ok(())
}

async fn process_message(
    msg: &async_nats::jetstream::Message,
    adapters: &AdapterLookup<'_>,
    adapter_override: Option<&str>,
    bus: &impl BusClient,
    config: &EgressConfig,
) -> Result<(), Error> {
    let out: OutMessage = serde_json::from_slice(&msg.payload)?;
    info!(
        env = %out.ctx.env.as_str(),
        tenant = %out.tenant,
        platform = %out.platform.as_str(),
        chat_id = %out.chat_id,
        "received OutMessage for processing"
    );
    let adapter = if let Some(name) = adapter_override {
        adapters.egress(name)?
    } else {
        adapters.default_for_platform(out.platform.as_str())?
    };
    info!(
        adapter = %adapter.name,
        component = %adapter.component,
        flow = ?adapter.flow_path(),
        tenant = %out.tenant,
        platform = %out.platform.as_str(),
        "resolved egress adapter"
    );
    process_message_internal(&out, &adapter, bus, config).await
}

/// Internal helper used by tests to avoid NATS.
pub async fn process_message_internal(
    out: &OutMessage,
    adapter: &AdapterDescriptor,
    bus: &impl BusClient,
    config: &EgressConfig,
) -> Result<(), Error> {
    let subject = egress_subject_with_prefix(
        config.egress_prefix.as_str(),
        &out.tenant,
        out.platform.as_str(),
    );
    let payload = serde_json::json!({
        "tenant": out.tenant,
        "platform": out.platform.as_str(),
        "chat_id": out.chat_id,
        "text": out.text,
        "kind": out.kind,
        "metadata": out.meta,
        "adapter": adapter.name,
    });
    let value = to_value(&payload)?;
    bus.publish_value(&subject, value).await.map_err(|err| {
        tracing::error!(
            %subject,
            tenant = %out.tenant,
            platform = %out.platform.as_str(),
            error = %err,
            "failed to publish egress envelope"
        );
        anyhow::Error::new(err)
    })?;
    let _ = counter!(
        "messaging_egress_total",
        "tenant" => out.tenant.clone(),
        "platform" => out.platform.as_str().to_string(),
        "adapter" => adapter.name.clone()
    );
    Ok(())
}