use anyhow::{Error, Result};
use async_nats::jetstream::{
consumer::AckPolicy, consumer::push::Config as PushConfig, stream::Config as StreamConfig,
stream::RetentionPolicy,
};
use futures::StreamExt;
use gsm_core::{
AdapterDescriptor, AdapterRegistry, DefaultAdapterPacksConfig, OutMessage,
adapter_pack_paths_from_env, default_adapter_pack_paths,
};
use metrics::counter;
use std::path::PathBuf;
use tracing::{error, info, warn};
use crate::adapter_registry::AdapterLookup;
use crate::config::EgressConfig;
use gsm_bus::{BusClient, NatsBusClient, egress_subject_with_prefix, to_value};
/// Runs the messaging egress worker until the JetStream delivery stream ends.
///
/// The worker reads its settings from the environment, connects to NATS,
/// loads the adapter packs, makes sure the egress stream and its durable push
/// consumer exist, and then hands every delivered message to
/// [`process_message`].
///
/// # Errors
///
/// Returns an error when the configuration cannot be read, the NATS
/// connection fails, the stream or consumer cannot be created, or the message
/// subscription cannot be opened. Failures while handling an individual
/// message are logged and do not stop the loop.
pub async fn run() -> Result<()> {
    let config = EgressConfig::from_env()?;
    let client = async_nats::connect(&config.nats_url).await?;
    let js = async_nats::jetstream::new(client.clone());

    // Default pack locations first, then any extra paths supplied via env.
    let default_cfg = DefaultAdapterPacksConfig::from_env();
    let mut pack_paths =
        default_adapter_pack_paths(PathBuf::from(&config.packs_root).as_path(), &default_cfg);
    pack_paths.extend(adapter_pack_paths_from_env());

    // A broken pack set must not prevent start-up: fall back to an empty
    // (default) registry and let adapter lookups fail per message instead.
    let registry = AdapterRegistry::load_from_paths(&pack_paths).unwrap_or_else(|err| {
        warn!(error = %err, "failed to load adapter packs; proceeding without registry");
        AdapterRegistry::default()
    });
    let adapters = AdapterLookup::new(&registry);
    let bus = NatsBusClient::new(client.clone());

    let stream_name = format!("messaging-egress-{}", config.env.0);
    let stream = js
        .get_or_create_stream(StreamConfig {
            name: stream_name.clone(),
            subjects: vec![config.subject_filter.clone()],
            retention: RetentionPolicy::WorkQueue,
            max_messages: -1,
            max_messages_per_subject: -1,
            max_bytes: -1,
            ..Default::default()
        })
        .await?;

    let consumer_name = format!("messaging-egress-{}", config.env.0);
    let deliver_subject = format!("deliver.messaging-egress.{}", config.env.0);
    let consumer = stream
        .get_or_create_consumer(
            &consumer_name,
            PushConfig {
                durable_name: Some(consumer_name.clone()),
                deliver_subject: deliver_subject.clone(),
                deliver_group: Some(format!("messaging-egress-{}", config.env.0)),
                ack_policy: AckPolicy::Explicit,
                max_ack_pending: 128,
                ..Default::default()
            },
        )
        .await?;

    info!(
        subject = %config.subject_filter,
        stream = %stream_name,
        consumer = %consumer_name,
        "messaging-egress listening for envelopes"
    );

    let mut messages = consumer.messages().await?;
    while let Some(result) = messages.next().await {
        match result {
            Ok(message) => {
                if let Err(err) = process_message(
                    &message,
                    &adapters,
                    config.adapter.as_deref(),
                    &bus,
                    &config,
                )
                .await
                {
                    error!(error = %err, "failed to process egress payload");
                }
                // NOTE(review): the delivery is acked whether or not processing
                // succeeded, so a failed payload is only logged. Presumably this
                // avoids endless redelivery of bad payloads — confirm that
                // dropping failed messages is intended.
                if let Err(err) = message.ack().await {
                    warn!(error = %err, "failed to ack egress delivery");
                }
            }
            Err(err) => {
                warn!(error = %err, "missing message from JetStream");
            }
        }
    }
    Ok(())
}
/// Decodes one JetStream delivery and forwards it through the chosen adapter.
///
/// The message payload is parsed as a JSON [`OutMessage`]. The adapter is
/// looked up by `adapter_override` when one is given, otherwise by the
/// message's platform. The decoded message is then passed on to
/// [`process_message_internal`].
///
/// # Errors
///
/// Returns an error if the payload is not a valid `OutMessage`, if the
/// adapter lookup fails, or if publishing fails.
async fn process_message(
    msg: &async_nats::jetstream::Message,
    adapters: &AdapterLookup<'_>,
    adapter_override: Option<&str>,
    bus: &impl BusClient,
    config: &EgressConfig,
) -> Result<(), Error> {
    let outbound: OutMessage = serde_json::from_slice(&msg.payload)?;
    info!(
        env = %outbound.ctx.env.as_str(),
        tenant = %outbound.tenant,
        platform = %outbound.platform.as_str(),
        chat_id = %outbound.chat_id,
        "received OutMessage for processing"
    );

    // An explicit override wins; otherwise use the platform's default adapter.
    let resolved = match adapter_override {
        Some(name) => adapters.egress(name)?,
        None => adapters.default_for_platform(outbound.platform.as_str())?,
    };
    info!(
        adapter = %resolved.name,
        component = %resolved.component,
        flow = ?resolved.flow_path(),
        tenant = %outbound.tenant,
        platform = %outbound.platform.as_str(),
        "resolved egress adapter"
    );

    process_message_internal(&outbound, &resolved, bus, config).await
}
pub async fn process_message_internal(
out: &OutMessage,
adapter: &AdapterDescriptor,
bus: &impl BusClient,
config: &EgressConfig,
) -> Result<(), Error> {
let subject = egress_subject_with_prefix(
config.egress_prefix.as_str(),
&out.tenant,
out.platform.as_str(),
);
let payload = serde_json::json!({
"tenant": out.tenant,
"platform": out.platform.as_str(),
"chat_id": out.chat_id,
"text": out.text,
"kind": out.kind,
"metadata": out.meta,
"adapter": adapter.name,
});
let value = to_value(&payload)?;
bus.publish_value(&subject, value).await.map_err(|err| {
tracing::error!(
%subject,
tenant = %out.tenant,
platform = %out.platform.as_str(),
error = %err,
"failed to publish egress envelope"
);
anyhow::Error::new(err)
})?;
let _ = counter!(
"messaging_egress_total",
"tenant" => out.tenant.clone(),
"platform" => out.platform.as_str().to_string(),
"adapter" => adapter.name.clone()
);
Ok(())
}