1use crate::generated::messaging::{BrokerMessage, PublishResponse, RequestArgs};
2use std::error::Error;
3use std::sync::Arc;
4use std::sync::RwLock;
5use std::{collections::HashMap, time::Duration};
6use wascc_codec::capabilities::Dispatcher;
7
8use crate::OP_DELIVER_MESSAGE;
9use nats::Connection;
10use wascc_codec::serialize;
11
12const ENV_NATS_SUBSCRIPTION: &str = "SUBSCRIPTION";
13const ENV_NATS_URL: &str = "URL";
14const ENV_NATS_CLIENT_JWT: &str = "CLIENT_JWT";
15const ENV_NATS_CLIENT_SEED: &str = "CLIENT_SEED";
16const ENV_NATS_QUEUEGROUP_NAME: &str = "QUEUEGROUP_NAME";
17const ENV_NATS_CREDSFILE: &str = "CREDSFILE";
18
19pub(crate) fn publish(
20 client: &Connection,
21 msg: BrokerMessage,
22) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
23 trace!(
24 "Publishing message on {} ({} bytes)",
25 &msg.subject,
26 &msg.body.len()
27 );
28
29 let res = if msg.reply_to.len() > 0 {
30 client.publish_with_reply_or_headers(&msg.subject, Some(&msg.reply_to), None, &msg.body)
31 } else {
32 client.publish(&msg.subject, &msg.body)
33 };
34
35 match res {
36 Ok(_) => Ok(serialize(PublishResponse { published: true })?),
37 Err(e) => {
38 error!("Failed to publish message: {}", e);
39 Ok(serialize(PublishResponse { published: false })?)
40 }
41 }
42}
43
44pub(crate) fn request(
45 client: &Connection,
46 msg: RequestArgs,
47) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
48 let reply = client.request_timeout(
49 &msg.subject,
50 &msg.body,
51 Duration::from_millis(msg.timeout as u64),
52 )?;
53 Ok(reply.data)
54}
55
56pub(crate) fn initialize_client(
57 dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
58 actor: &str,
59 values: &HashMap<String, String>,
60) -> Result<Connection, Box<dyn Error + Sync + Send>> {
61 let c = get_connection(values)?;
62
63 match values.get(ENV_NATS_SUBSCRIPTION) {
64 Some(ref subs) => {
65 let subs: Vec<_> = subs
66 .split(',')
67 .map(|s| {
68 if s.is_empty() {
69 Err("Empty topic".into())
70 } else {
71 create_subscription(actor, values, dispatcher.clone(), &c, s.to_string())
72 }
73 })
74 .collect();
75 if subs.is_empty() {
76 Err("No subscriptions created".into())
77 } else {
78 Ok(c)
79 }
80 }
81 None => Ok(c),
82 }
83}
84
85fn create_subscription(
86 actor: &str,
87 values: &HashMap<String, String>,
88 dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
89 client: &Connection,
90 sub: String,
91) -> Result<(), Box<dyn Error + Sync + Send>> {
92 let actor = actor.to_string();
93 let _ = match values.get(ENV_NATS_QUEUEGROUP_NAME) {
94 Some(qgroup) => {
95 trace!("Queue subscribing '{}' to '{}'", qgroup, sub);
96 client
97 .queue_subscribe(&sub, qgroup)?
98 .with_handler(move |msg| {
99 let dm = delivermessage_for_natsmessage(&msg);
100 let buf = serialize(&dm).unwrap();
101
102 let d = dispatcher.read().unwrap();
103 if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
104 error!("Dispatch failed: {}", e);
105 }
106 Ok(())
107 })
108 }
109 None => {
110 trace!("Subscribing to '{}'", sub);
111
112 client.subscribe(&sub)?.with_handler(move |msg| {
113 let dm = delivermessage_for_natsmessage(&msg);
114 let buf = serialize(&dm).unwrap();
115 let d = dispatcher.read().unwrap();
116 if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
117 error!("Dispatch failed: {}", e);
118 }
119 Ok(())
120 })
121 }
122 };
123
124 Ok(())
125}
126
127fn delivermessage_for_natsmessage(msg: &nats::Message) -> BrokerMessage {
128 BrokerMessage {
129 subject: msg.subject.clone(),
130 reply_to: msg.reply.clone().unwrap_or_else(|| "".to_string()),
131 body: msg.data.clone(),
132 }
133}
134
135fn get_connection(
136 values: &HashMap<String, String>,
137) -> Result<nats::Connection, Box<dyn std::error::Error + Send + Sync>> {
138 let nats_url = match values.get(ENV_NATS_URL) {
139 Some(v) => v,
140 None => "nats://0.0.0.0:4222",
141 }
142 .to_string();
143 info!("NATS provider host: {}", nats_url);
144 let mut opts = if let Some(creds) = get_credsfile(values) {
145 nats::Options::with_credentials(creds)
146 } else {
147 nats::Options::new()
148 };
149 opts = opts.with_name("waSCC NATS Provider");
150 opts.connect(&nats_url)
151 .map_err(|e| format!("NATS connection failure:{}", e).into())
152}
153
154fn get_credsfile(values: &HashMap<String, String>) -> Option<String> {
155 values.get(ENV_NATS_CREDSFILE).cloned()
156}