use crate::messaging::{BrokerMessage, PublishResponse, RequestArgs};
use crate::OP_HANDLE_MESSAGE;
use log::{error, info, trace};
use nats::Connection;
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use std::{collections::HashMap, time::Duration};
use wascap::prelude::KeyPair;
use wasmcloud_provider_core::capabilities::Dispatcher;
use wasmcloud_provider_core::serialize;
// Keys looked up in the configuration map (`values`) that is handed to this module.

// Comma-separated list of subjects to subscribe the actor to.
const ENV_NATS_SUBSCRIPTION: &str = "SUBSCRIPTION";
// URL of the NATS server; a built-in default is used when the key is absent.
const ENV_NATS_URL: &str = "URL";
// Client JWT used for authentication when no credentials file is configured.
const ENV_NATS_CLIENT_JWT: &str = "CLIENT_JWT";
// Seed used to build the key pair that signs the server nonce alongside the JWT.
const ENV_NATS_CLIENT_SEED: &str = "CLIENT_SEED";
// Optional queue group name; when present, subscriptions are queue subscriptions.
const ENV_NATS_QUEUEGROUP_NAME: &str = "QUEUEGROUP_NAME";
// Path to a credentials file; takes precedence over the JWT/seed pair.
const ENV_NATS_CREDSFILE: &str = "CREDSFILE";
/// Publishes `msg` on its subject and returns a serialized [`PublishResponse`].
///
/// When `msg.reply_to` is non-empty the message is published with that reply
/// subject; otherwise a plain publish is used.
///
/// A publish failure is logged and reported to the caller as
/// `PublishResponse { published: false }` rather than as an `Err`.
///
/// # Errors
///
/// Returns an error only if the response itself cannot be serialized.
pub(crate) fn publish(
    client: &Connection,
    msg: BrokerMessage,
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
    let body_len = msg.body.len();
    trace!(
        "Publishing message on {} ({} bytes)",
        msg.subject,
        body_len
    );

    let outcome = if msg.reply_to.is_empty() {
        client.publish(&msg.subject, &msg.body)
    } else {
        client.publish_with_reply_or_headers(&msg.subject, Some(&msg.reply_to), None, &msg.body)
    };

    // Translate the broker result into the flag carried by the response.
    let published = match outcome {
        Ok(_) => true,
        Err(e) => {
            error!("Failed to publish message: {}", e);
            false
        }
    };

    let payload = serialize(PublishResponse { published })?;
    Ok(payload)
}
/// Sends a request on `msg.subject` and waits up to `msg.timeout` milliseconds
/// for a reply, returning the reply's raw payload.
///
/// # Errors
///
/// Returns the error produced by the NATS client if the request fails.
pub(crate) fn request(
    client: &Connection,
    msg: RequestArgs,
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
    // NOTE(review): `as u64` wraps if `timeout` is a signed type holding a
    // negative value — confirm the field type on `RequestArgs`.
    let timeout = Duration::from_millis(msg.timeout as u64);

    match client.request_timeout(&msg.subject, &msg.body, timeout) {
        Ok(reply) => Ok(reply.data),
        Err(e) => Err(e.into()),
    }
}
/// Connects to NATS and subscribes `actor` to every subject listed under the
/// `SUBSCRIPTION` key of `values`.
///
/// The subscription value is split on `,`; each piece becomes one
/// subscription whose messages are dispatched to `actor` through
/// `dispatcher`. When the key is absent, the connection is returned without
/// any subscriptions.
///
/// # Errors
///
/// Returns an error if the connection cannot be established, if any entry in
/// the subscription list is empty (`"Empty topic"`), or if creating any
/// subscription fails. Processing stops at the first failure; the connection
/// is dropped when the error is returned.
pub(crate) fn initialize_client(
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    actor: &str,
    values: &HashMap<String, String>,
) -> Result<Connection, Box<dyn Error + Sync + Send>> {
    let c = get_connection(values)?;

    if let Some(subs) = values.get(ENV_NATS_SUBSCRIPTION) {
        // `split` always yields at least one item, so an empty list cannot
        // occur here; an empty value shows up as a single empty topic.
        for subject in subs.split(',') {
            if subject.is_empty() {
                return Err("Empty topic".into());
            }
            // Propagate failures instead of collecting and discarding them.
            create_subscription(
                actor,
                values,
                Arc::clone(&dispatcher),
                &c,
                subject.to_string(),
            )?;
        }
    }

    Ok(c)
}
fn create_subscription(
actor: &str,
values: &HashMap<String, String>,
dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
client: &Connection,
sub: String,
) -> Result<(), Box<dyn Error + Sync + Send>> {
let actor = actor.to_string();
let _ = match values.get(ENV_NATS_QUEUEGROUP_NAME) {
Some(qgroup) => {
trace!("Queue subscribing '{}' to '{}'", qgroup, sub);
client
.queue_subscribe(&sub, qgroup)?
.with_handler(move |msg| {
let dm = delivermessage_for_natsmessage(&msg);
let buf = serialize(&dm).unwrap();
let d = dispatcher.read().unwrap();
if let Err(e) = d.dispatch(&actor, OP_HANDLE_MESSAGE, &buf) {
error!("Dispatch failed: {}", e);
}
Ok(())
})
}
None => {
trace!("Subscribing to '{}'", sub);
client.subscribe(&sub)?.with_handler(move |msg| {
let dm = delivermessage_for_natsmessage(&msg);
let buf = serialize(&dm).unwrap();
let d = dispatcher.read().unwrap();
if let Err(e) = d.dispatch(&actor, OP_HANDLE_MESSAGE, &buf) {
error!("Dispatch failed: {}", e);
}
Ok(())
})
}
};
Ok(())
}
/// Builds a [`BrokerMessage`] from a received NATS message.
///
/// The subject and payload are copied as-is; a missing reply subject becomes
/// an empty string.
fn delivermessage_for_natsmessage(msg: &nats::Message) -> BrokerMessage {
    let subject = msg.subject.clone();
    let reply_to = msg.reply.clone().unwrap_or_default();
    let body = msg.data.clone();

    BrokerMessage {
        subject,
        reply_to,
        body,
    }
}
/// Opens a NATS connection configured from `values`.
///
/// * The server URL comes from the `URL` key, defaulting to
///   `nats://0.0.0.0:4222`.
/// * Authentication is chosen in this order: a credentials file
///   (`CREDSFILE`), then a non-empty `CLIENT_JWT` paired with `CLIENT_SEED`,
///   otherwise no credentials.
///
/// # Errors
///
/// Returns an error if a JWT is configured without a seed, if the seed cannot
/// be turned into a key pair, or if connecting to the server fails.
///
/// # Panics
///
/// The nonce-signing callback panics if signing with the key pair fails.
fn get_connection(
    values: &HashMap<String, String>,
) -> Result<nats::Connection, Box<dyn std::error::Error + Send + Sync>> {
    let nats_url = values
        .get(ENV_NATS_URL)
        .cloned()
        .unwrap_or_else(|| "nats://0.0.0.0:4222".to_string());
    info!("NATS provider host: {}", nats_url);

    // An empty JWT is treated the same as an absent one.
    let jwt = values
        .get(ENV_NATS_CLIENT_JWT)
        .filter(|jwt| !jwt.is_empty())
        .cloned();

    let opts = if let Some(creds) = get_credsfile(values) {
        nats::Options::with_credentials(creds)
    } else if let Some(jwt) = jwt {
        // Fail with a clear configuration error instead of handing an empty
        // seed to the key-pair decoder.
        let seed = values
            .get(ENV_NATS_CLIENT_SEED)
            .filter(|seed| !seed.is_empty())
            .ok_or_else(|| {
                format!(
                    "{} was provided without {}",
                    ENV_NATS_CLIENT_JWT, ENV_NATS_CLIENT_SEED
                )
            })?;
        let kp = KeyPair::from_seed(seed)?;
        nats::Options::with_jwt(
            move || Ok(jwt.clone()),
            // The callback has no way to return an error, so a signing
            // failure can only surface as a panic.
            move |nonce| {
                kp.sign(nonce)
                    .expect("failed to sign NATS nonce with client seed")
            },
        )
    } else {
        nats::Options::new()
    };

    opts.with_name("wasmCloud NATS Provider")
        .connect(&nats_url)
        .map_err(|e| format!("NATS connection failure:{}", e).into())
}
/// Returns an owned copy of the credentials file path stored under the
/// `CREDSFILE` key, or `None` when the key is absent.
fn get_credsfile(values: &HashMap<String, String>) -> Option<String> {
    let path = values.get(ENV_NATS_CREDSFILE)?;
    Some(path.clone())
}