wascc_nats/
natsprov.rs

1use crate::generated::messaging::{BrokerMessage, PublishResponse, RequestArgs};
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::RwLock;
5use std::{collections::HashMap, time::Duration};
6use wascc_codec::capabilities::Dispatcher;
7
8use crate::OP_DELIVER_MESSAGE;
9use nats::Connection;
10use wascc_codec::serialize;
11
/// Configuration key whose value is a comma-separated list of subjects to
/// subscribe to on behalf of the actor.
const ENV_NATS_SUBSCRIPTION: &str = "SUBSCRIPTION";

/// Configuration key for the NATS server URL; when it is absent the provider
/// falls back to `nats://0.0.0.0:4222`.
const ENV_NATS_URL: &str = "URL";

/// Configuration key presumably carrying a client JWT.
/// NOTE(review): not referenced by any function in this module — confirm use.
const ENV_NATS_CLIENT_JWT: &str = "CLIENT_JWT";

/// Configuration key presumably carrying a client seed.
/// NOTE(review): not referenced by any function in this module — confirm use.
const ENV_NATS_CLIENT_SEED: &str = "CLIENT_SEED";

/// Configuration key naming the queue group; when present, subscriptions are
/// created as queue subscriptions in that group.
const ENV_NATS_QUEUEGROUP_NAME: &str = "QUEUEGROUP_NAME";

/// Configuration key holding the path of a credentials file used to build the
/// connection options.
const ENV_NATS_CREDSFILE: &str = "CREDSFILE";
18
19pub(crate) fn publish(
20    client: &Connection,
21    msg: BrokerMessage,
22) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
23    trace!(
24        "Publishing message on {} ({} bytes)",
25        &msg.subject,
26        &msg.body.len()
27    );
28
29    let res = if msg.reply_to.len() > 0 {
30        client.publish_with_reply_or_headers(&msg.subject, Some(&msg.reply_to), None, &msg.body)
31    } else {
32        client.publish(&msg.subject, &msg.body)
33    };
34
35    match res {
36        Ok(_) => Ok(serialize(PublishResponse { published: true })?),
37        Err(e) => {
38            error!("Failed to publish message: {}", e);
39            Ok(serialize(PublishResponse { published: false })?)
40        }
41    }
42}
43
44pub(crate) fn request(
45    client: &Connection,
46    msg: RequestArgs,
47) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
48    let reply = client.request_timeout(
49        &msg.subject,
50        &msg.body,
51        Duration::from_millis(msg.timeout as u64),
52    )?;
53    Ok(reply.data)
54}
55
56pub(crate) fn initialize_client(
57    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
58    actor: &str,
59    values: &HashMap<String, String>,
60) -> Result<Connection, Box<dyn Error + Sync + Send>> {
61    let c = get_connection(values)?;
62
63    match values.get(ENV_NATS_SUBSCRIPTION) {
64        Some(ref subs) => {
65            let subs: Vec<_> = subs
66                .split(',')
67                .map(|s| {
68                    if s.is_empty() {
69                        Err("Empty topic".into())
70                    } else {
71                        create_subscription(actor, values, dispatcher.clone(), &c, s.to_string())
72                    }
73                })
74                .collect();
75            if subs.is_empty() {
76                Err("No subscriptions created".into())
77            } else {
78                Ok(c)
79            }
80        }
81        None => Ok(c),
82    }
83}
84
85fn create_subscription(
86    actor: &str,
87    values: &HashMap<String, String>,
88    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
89    client: &Connection,
90    sub: String,
91) -> Result<(), Box<dyn Error + Sync + Send>> {
92    let actor = actor.to_string();
93    let _ = match values.get(ENV_NATS_QUEUEGROUP_NAME) {
94        Some(qgroup) => {
95            trace!("Queue subscribing '{}' to '{}'", qgroup, sub);
96            client
97                .queue_subscribe(&sub, qgroup)?
98                .with_handler(move |msg| {
99                    let dm = delivermessage_for_natsmessage(&msg);
100                    let buf = serialize(&dm).unwrap();
101
102                    let d = dispatcher.read().unwrap();
103                    if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
104                        error!("Dispatch failed: {}", e);
105                    }
106                    Ok(())
107                })
108        }
109        None => {
110            trace!("Subscribing to '{}'", sub);
111
112            client.subscribe(&sub)?.with_handler(move |msg| {
113                let dm = delivermessage_for_natsmessage(&msg);
114                let buf = serialize(&dm).unwrap();
115                let d = dispatcher.read().unwrap();
116                if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
117                    error!("Dispatch failed: {}", e);
118                }
119                Ok(())
120            })
121        }
122    };
123
124    Ok(())
125}
126
127fn delivermessage_for_natsmessage(msg: &nats::Message) -> BrokerMessage {
128    BrokerMessage {
129        subject: msg.subject.clone(),
130        reply_to: msg.reply.clone().unwrap_or_else(|| "".to_string()),
131        body: msg.data.clone(),
132    }
133}
134
135fn get_connection(
136    values: &HashMap<String, String>,
137) -> Result<nats::Connection, Box<dyn std::error::Error + Send + Sync>> {
138    let nats_url = match values.get(ENV_NATS_URL) {
139        Some(v) => v,
140        None => "nats://0.0.0.0:4222",
141    }
142    .to_string();
143    info!("NATS provider host: {}", nats_url);
144    let mut opts = if let Some(creds) = get_credsfile(values) {
145        nats::Options::with_credentials(creds)
146    } else {
147        nats::Options::new()
148    };
149    opts = opts.with_name("waSCC NATS Provider");
150    opts.connect(&nats_url)
151        .map_err(|e| format!("NATS connection failure:{}", e).into())
152}
153
154fn get_credsfile(values: &HashMap<String, String>) -> Option<String> {
155    values.get(ENV_NATS_CREDSFILE).cloned()
156}