1#[macro_use]
2extern crate wascc_codec as codec;
3
4mod generated;
5mod natsprov;
6
/// Crate version string, read from Cargo's `CARGO_PKG_VERSION` at compile time
/// and reported in the capability descriptor.
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Revision number reported in the capability descriptor alongside `VERSION`.
const REVISION: u32 = 2;
#[macro_use]
extern crate log;
12
13use codec::capabilities::{
14 CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
15 OP_GET_CAPABILITY_DESCRIPTOR,
16};
17
/// Operation name for delivering a message from a NATS subscription to an actor
/// (declared as a provider-to-actor operation in the capability descriptor).
pub const OP_DELIVER_MESSAGE: &str = "DeliverMessage";
/// Operation name an actor uses to send a message on a subject, with an optional reply-to.
pub const OP_PUBLISH_MESSAGE: &str = "Publish";
/// Operation name an actor uses to send a message on a subject and wait for a reply
/// on an auto-generated inbox.
pub const OP_PERFORM_REQUEST: &str = "Request";
21
22use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
23use generated::messaging::{BrokerMessage, RequestArgs};
24
25use generated::core::CapabilityConfiguration;
26use std::collections::HashMap;
27use wascc_codec::{deserialize, serialize};
28
29use std::error::Error;
30use std::sync::Arc;
31use std::sync::RwLock;
32
// Only compiled when the `static_plugin` feature is disabled.
// NOTE(review): presumably this macro emits the plugin entry point that constructs
// the provider through `NatsProvider::new` — confirm against the wascc_codec macro.
#[cfg(not(feature = "static_plugin"))]
capability_provider!(NatsProvider, NatsProvider::new);
35
/// Capability contract identifier advertised as the `id` of this provider's descriptor.
const CAPABILITY_ID: &str = "wascc:messaging";
37
/// NATS-backed capability provider for the `wascc:messaging` contract.
///
/// Both fields are `Arc`s, so clones of a provider share the same dispatcher
/// and the same client map.
#[derive(Clone)]
pub struct NatsProvider {
    /// Dispatcher handle; starts out as a `NullDispatcher` and is replaced by
    /// `configure_dispatch`. A clone of this `Arc` is handed to
    /// `natsprov::initialize_client` for every actor that is bound.
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    /// NATS connections keyed by the `module` field of the
    /// `CapabilityConfiguration` used to bind the actor.
    clients: Arc<RwLock<HashMap<String, nats::Connection>>>,
}
44
45impl Default for NatsProvider {
46 fn default() -> Self {
47 match env_logger::try_init() {
48 Ok(_) => {}
49 Err(_) => {}
50 };
51
52 NatsProvider {
53 dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
54 clients: Arc::new(RwLock::new(HashMap::new())),
55 }
56 }
57}
58
59impl NatsProvider {
60 pub fn new() -> NatsProvider {
63 Self::default()
64 }
65
66 fn publish_message(
67 &self,
68 actor: &str,
69 msg: BrokerMessage,
70 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
71 let lock = self.clients.read().unwrap();
72 let client = lock.get(actor).unwrap();
73
74 natsprov::publish(&client, msg)
75 }
76
77 fn request(
78 &self,
79 actor: &str,
80 msg: RequestArgs,
81 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
82 let lock = self.clients.read().unwrap();
83 let client = lock.get(actor).unwrap();
84
85 natsprov::request(&client, msg)
86 }
87
88 fn configure(
89 &self,
90 msg: CapabilityConfiguration,
91 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
92 let d = self.dispatcher.clone();
93 let c = natsprov::initialize_client(d, &msg.module, &msg.values)?;
94
95 self.clients.write().unwrap().insert(msg.module, c);
96 Ok(vec![])
97 }
98
99 fn remove_actor(
100 &self,
101 msg: CapabilityConfiguration,
102 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
103 info!("Removing NATS client for actor {}", msg.module);
104 self.clients.write().unwrap().remove(&msg.module);
105 Ok(vec![])
106 }
107
108 fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
109 Ok(serialize(
110 CapabilityDescriptor::builder()
111 .id(CAPABILITY_ID)
112 .name("Default waSCC Messaging Provider (NATS)")
113 .long_description("A NATS-based implementation of the wascc:messaging contract")
114 .version(VERSION)
115 .revision(REVISION)
116 .with_operation(
117 OP_PUBLISH_MESSAGE,
118 OperationDirection::ToProvider,
119 "Sends a message on a subject with an optional reply-to",
120 )
121 .with_operation(
122 OP_PERFORM_REQUEST,
123 OperationDirection::ToProvider,
124 "Sends a message on a subject expecting a reply on an auto-generated inbox",
125 )
126 .with_operation(
127 OP_DELIVER_MESSAGE,
128 OperationDirection::ToActor,
129 "Delivers a message from a NATS subscription to an actor",
130 )
131 .build(),
132 )?)
133 }
134}
135
136impl CapabilityProvider for NatsProvider {
137 fn configure_dispatch(
139 &self,
140 dispatcher: Box<dyn Dispatcher>,
141 ) -> Result<(), Box<dyn Error + Sync + Send>> {
142 trace!("Dispatcher received.");
143 let mut lock = self.dispatcher.write().unwrap();
144 *lock = dispatcher;
145
146 Ok(())
147 }
148
149 fn handle_call(
151 &self,
152 actor: &str,
153 op: &str,
154 msg: &[u8],
155 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
156 trace!("Received host call from {}, operation - {}", actor, op);
157
158 match op {
159 OP_PUBLISH_MESSAGE => self.publish_message(actor, deserialize(msg)?),
160 OP_PERFORM_REQUEST => self.request(actor, deserialize(msg)?),
161 OP_GET_CAPABILITY_DESCRIPTOR if actor == "system" => self.get_descriptor(),
162 OP_BIND_ACTOR if actor == "system" => self.configure(deserialize(msg)?),
163 OP_REMOVE_ACTOR if actor == "system" => self.remove_actor(deserialize(msg)?),
164 _ => Err("bad dispatch".into()),
165 }
166 }
167
168 fn stop(&self) {}
169}