1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/*!
Inspired and compatible with [Moleculer JS](https://github.com/moleculerjs/moleculer)

You can currently do all the basics of `emit`, `broadcast` and `call`.

However it only works with the `NATS` transporter and `JSON` serializer/deserializer.

## Getting Started

Simple example showing how to receive an event, and responding to a request, for more check the
[examples folder](https://github.com/primcloud/moleculer-rs/tree/master/examples).

```rust
use std::error::Error;
use serde::Deserialize;

use moleculer::{
    config::{ConfigBuilder, Transporter},
    service::{Context, Event, EventBuilder, Service},
    ServiceBroker,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    env_logger::init();
    color_eyre::install()?;

    // build config
    let config = ConfigBuilder::default().transporter("nats://localhost:4222")
    .build();

    // create the first event
    let print_hi = EventBuilder::new("printHi").add_callback(print_hi).build();

    // create the second event
    let print_name = EventBuilder::new("printName")
        .add_callback(print_name)
        .build();

    // create math action
    let math_action = ActionBuilder::new("mathAdd").add_callback(math_add).build();

    // create a service with events and actions
    let greeter_service = Service::new("rustGreeter")
        .add_event(print_hi)
        .add_event(print_name)
        .add_action(math_action);

    // create service broker with service
    let service_broker = ServiceBroker::new(config).add_service(greeter_service);

    // start the service broker
    service_broker.start().await;

    Ok(())
}


// callback for first event, will be called whenever "printHi" event is received
fn print_hi(_ctx: Context<Event>) -> Result<(), Box<dyn Error>> {
    println!("Hello from Rust");
    Ok(())
}

// callback for second event, will be called whenever "printName" event is received
fn print_name(ctx: Context<Event>) -> Result<(), Box<dyn Error>> {
    let msg: PrintNameMessage = serde_json::from_value(ctx.params)?;

    println!("Hello to: {} from Rust", msg.name);

    Ok(())
}

// callback for math action
fn math_add(ctx: Context<Action>) -> Result<(), Box<dyn Error>> {
    // get message decode using serde
    let msg: ActionMessage = serde_json::from_value(ctx.params.clone())?;
    let answer = msg.a + msg.b;

    // serialize reply using serde and send reply
    let _ = ctx.reply(answer.into());

    Ok(())
}

#[derive(Deserialize)]
struct PrintNameMessage {
    name: String,
}

#[derive(Deserialize)]
struct ActionMessage {
    a: i32,
    b: i32,
}
```
*/

mod data_structures;
mod util;

pub mod config;
pub mod service;

mod broker;
mod channels;
mod nats;

use act_zero::runtimes::tokio::spawn_actor;
use act_zero::*;
use config::Config;
use serde_json::Value;
use service::Service;
use thiserror::Error;
use tokio::sync::oneshot::{self, error};

#[doc(hidden)]
#[derive(Error, Debug)]
pub enum Error {
    #[error("Timeout reached waiting for response")]
    ReceiveError(#[from] error::RecvError),

    #[error("Unknown error")]
    UnknownError,
}

#[allow(dead_code)]
pub(crate) mod built_info {
    include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

/// The struct used to interact with moleculer.
/// Use [`emit()`][Self::emit()], [`broadcast()`][Self::broadcast()] and [`call()`][Self::call()] functions.
/// ```rust
/// // emit an event
/// broker.emit("printHi", json!{})
///
/// // broadcast an event
/// broker.broadcast("printHi", json!{})
///
/// // call an action
/// let result = broker.call("math.add", json!{"a": 1, "b": c}).await?;
/// ```
#[derive(Clone)]
pub struct ServiceBroker {
    addr: Addr<broker::ServiceBroker>,
}

/// An alias to [service::Context\<service::Event>][service::Context].
/// In all contexts [`emit()`][service::Context::emit()], [`broadcast()`][service::Context::broadcast()]
/// and [`call()`][service::Context::call()] are available.
pub type EventContext = service::Context<service::Event>;

/// An alias to [service::Context\<service::Action>][service::Context].
/// Send a response to a request using [`reply()`][service::Context::reply()].
pub type ActionContext = service::Context<service::Action>;

impl ServiceBroker {
    /// Create new service broker, takes [Config] struct.
    pub fn new(config: Config) -> ServiceBroker {
        ServiceBroker {
            addr: spawn_actor(broker::ServiceBroker::new(config)),
        }
    }

    /// Add a service to the service broker.
    pub fn add_service(self, service: Service) -> Self {
        send!(self.addr.add_service(service));
        self
    }

    /// Add all the services to the service broker at once.
    /// Takes a vector of services and replaces any services the broker already had.
    pub fn add_services(self, services: Vec<Service>) -> Self {
        send!(self.addr.add_services(services));
        self
    }

    /// Starts the service, this will run forever until your application exits.
    pub async fn start(self) {
        self.addr.termination().await
    }

    /// Request/Response style call
    /// Call an action directly with params serialized into
    /// [serde_json::Value](https://docs.rs/serde_json/1.0.64/serde_json/value/index.html) and `await` on the result
    pub async fn call<S: Into<String>>(self, action: S, params: Value) -> Result<Value, Error> {
        let (tx, rx) = oneshot::channel();

        send!(self.addr.call(action.into(), params, tx));
        let response_value = rx.await?;

        Ok(response_value)
    }

    /// Emits a balanced event to one of the nodes.
    pub fn emit<S: Into<String>>(&self, event: S, params: Value) {
        send!(self.addr.emit(event.into(), params))
    }

    /// Emits an event to all the nodes that can handle the event.
    pub fn broadcast<S: Into<String>>(&self, event: S, params: Value) {
        send!(self.addr.broadcast(event.into(), params))
    }
}

#[doc(hidden)]
impl From<Addr<broker::ServiceBroker>> for ServiceBroker {
    fn from(addr: Addr<broker::ServiceBroker>) -> Self {
        Self { addr }
    }
}