wascc_nats/
lib.rs

1#[macro_use]
2extern crate wascc_codec as codec;
3
4mod generated;
5mod natsprov;
6
7const VERSION: &str = env!("CARGO_PKG_VERSION");
8const REVISION: u32 = 2; // Increment for each crates publish
9
10#[macro_use]
11extern crate log;
12
13use codec::capabilities::{
14    CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
15    OP_GET_CAPABILITY_DESCRIPTOR,
16};
17
18pub const OP_DELIVER_MESSAGE: &str = "DeliverMessage";
19pub const OP_PUBLISH_MESSAGE: &str = "Publish";
20pub const OP_PERFORM_REQUEST: &str = "Request";
21
22use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
23use generated::messaging::{BrokerMessage, RequestArgs};
24
25use generated::core::CapabilityConfiguration;
26use std::collections::HashMap;
27use wascc_codec::{deserialize, serialize};
28
29use std::error::Error;
30use std::sync::Arc;
31use std::sync::RwLock;
32
33#[cfg(not(feature = "static_plugin"))]
34capability_provider!(NatsProvider, NatsProvider::new);
35
36const CAPABILITY_ID: &str = "wascc:messaging";
37
38/// NATS implementation of the `wascc:messaging` specification
39#[derive(Clone)]
40pub struct NatsProvider {
41    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
42    clients: Arc<RwLock<HashMap<String, nats::Connection>>>,
43}
44
45impl Default for NatsProvider {
46    fn default() -> Self {
47        match env_logger::try_init() {
48            Ok(_) => {}
49            Err(_) => {}
50        };
51
52        NatsProvider {
53            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
54            clients: Arc::new(RwLock::new(HashMap::new())),
55        }
56    }
57}
58
59impl NatsProvider {
60    /// Creates a new NATS provider. This is either invoked manually in static plugin
61    /// mode, or invoked by the host during dynamic loading
62    pub fn new() -> NatsProvider {
63        Self::default()
64    }
65
66    fn publish_message(
67        &self,
68        actor: &str,
69        msg: BrokerMessage,
70    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
71        let lock = self.clients.read().unwrap();
72        let client = lock.get(actor).unwrap();
73
74        natsprov::publish(&client, msg)
75    }
76
77    fn request(
78        &self,
79        actor: &str,
80        msg: RequestArgs,
81    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
82        let lock = self.clients.read().unwrap();
83        let client = lock.get(actor).unwrap();
84
85        natsprov::request(&client, msg)
86    }
87
88    fn configure(
89        &self,
90        msg: CapabilityConfiguration,
91    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
92        let d = self.dispatcher.clone();
93        let c = natsprov::initialize_client(d, &msg.module, &msg.values)?;
94
95        self.clients.write().unwrap().insert(msg.module, c);
96        Ok(vec![])
97    }
98
99    fn remove_actor(
100        &self,
101        msg: CapabilityConfiguration,
102    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
103        info!("Removing NATS client for actor {}", msg.module);
104        self.clients.write().unwrap().remove(&msg.module);
105        Ok(vec![])
106    }
107
108    fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
109        Ok(serialize(
110            CapabilityDescriptor::builder()
111                .id(CAPABILITY_ID)
112                .name("Default waSCC Messaging Provider (NATS)")
113                .long_description("A NATS-based implementation of the wascc:messaging contract")
114                .version(VERSION)
115                .revision(REVISION)
116                .with_operation(
117                    OP_PUBLISH_MESSAGE,
118                    OperationDirection::ToProvider,
119                    "Sends a message on a subject with an optional reply-to",
120                )
121                .with_operation(
122                    OP_PERFORM_REQUEST,
123                    OperationDirection::ToProvider,
124                    "Sends a message on a subject expecting a reply on an auto-generated inbox",
125                )
126                .with_operation(
127                    OP_DELIVER_MESSAGE,
128                    OperationDirection::ToActor,
129                    "Delivers a message from a NATS subscription to an actor",
130                )
131                .build(),
132        )?)
133    }
134}
135
136impl CapabilityProvider for NatsProvider {
137    /// Receives a dispatcher from the host runtime
138    fn configure_dispatch(
139        &self,
140        dispatcher: Box<dyn Dispatcher>,
141    ) -> Result<(), Box<dyn Error + Sync + Send>> {
142        trace!("Dispatcher received.");
143        let mut lock = self.dispatcher.write().unwrap();
144        *lock = dispatcher;
145
146        Ok(())
147    }
148
149    /// Handles an invocation received from the host runtime
150    fn handle_call(
151        &self,
152        actor: &str,
153        op: &str,
154        msg: &[u8],
155    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
156        trace!("Received host call from {}, operation - {}", actor, op);
157
158        match op {
159            OP_PUBLISH_MESSAGE => self.publish_message(actor, deserialize(msg)?),
160            OP_PERFORM_REQUEST => self.request(actor, deserialize(msg)?),
161            OP_GET_CAPABILITY_DESCRIPTOR if actor == "system" => self.get_descriptor(),
162            OP_BIND_ACTOR if actor == "system" => self.configure(deserialize(msg)?),
163            OP_REMOVE_ACTOR if actor == "system" => self.remove_actor(deserialize(msg)?),
164            _ => Err("bad dispatch".into()),
165        }
166    }
167
168    fn stop(&self) {}
169}