1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
/*!
Inspired and compatible with [Moleculer JS](https://github.com/moleculerjs/moleculer)

You can currently do all the basics of `emit`, `broadcast` and `call`.

However it only works with the `NATS` transporter and `JSON` serializer/deserializer.

## Getting Started

Simple example showing how to receive an event and respond to a request; for more, check the
[examples folder](https://github.com/primcloud/moleculer-rs/tree/master/examples).

```rust,no_run
use std::error::Error;
use serde::Deserialize;
use moleculer::{
    config::{ConfigBuilder, Transporter},
    service::{Action, ActionBuilder, Context, Event, EventBuilder, Service},
    ServiceBroker,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    env_logger::init();
    color_eyre::install()?;

    // build config
    let config = ConfigBuilder::default().transporter("nats://localhost:4222")
        .build();

    // create the first event
    let print_hi = EventBuilder::new("printHi").add_callback(print_hi).build();

    // create the second event
    let print_name = EventBuilder::new("printName")
        .add_callback(print_name)
        .build();

    // create math action
    let math_action = ActionBuilder::new("mathAdd").add_callback(math_add).build();

    // create a service with events and actions
    let greeter_service = Service::new("rustGreeter")
        .add_event(print_hi)
        .add_event(print_name)
        .add_action(math_action);

    // create service broker with service
    let service_broker = ServiceBroker::new(config).add_service(greeter_service);

    // start the service broker
    service_broker.start().await;

    Ok(())
}

// callback for first event, will be called whenever "printHi" event is received
fn print_hi(_ctx: Context<Event>) -> Result<(), Box<dyn Error>> {
    println!("Hello from Rust");
    Ok(())
}

// callback for second event, will be called whenever "printName" event is received
fn print_name(ctx: Context<Event>) -> Result<(), Box<dyn Error>> {
    let msg: PrintNameMessage = serde_json::from_value(ctx.params)?;

    println!("Hello to: {} from Rust", msg.name);

    Ok(())
}

// callback for math action
fn math_add(ctx: Context<Action>) -> Result<(), Box<dyn Error>> {
    // get message decode using serde
    let msg: ActionMessage = serde_json::from_value(ctx.params.clone())?;
    let answer = msg.a + msg.b;

    // serialize reply using serde and send reply
    let _ = ctx.reply(answer.into());

    Ok(())
}

#[derive(Deserialize)]
struct PrintNameMessage {
    name: String,
}

#[derive(Deserialize)]
struct ActionMessage {
    a: i32,
    b: i32,
}
```
*/

mod data_structures;
mod util;

pub mod config;
pub mod service;

mod broker;
mod channels;
mod nats;

use act_zero::runtimes::tokio::spawn_actor;
use act_zero::*;
use config::Config;
use serde_json::Value;
use service::Service;
use thiserror::Error;
use tokio::sync::oneshot::{self, error};

/// Errors surfaced by [`ServiceBroker::call()`].
#[doc(hidden)]
#[derive(Error, Debug)]
pub enum Error {
    /// The oneshot receiver used to wait for a response resolved with a
    /// [`RecvError`][error::RecvError]. The `#[from]` conversion is what lets
    /// `call()` apply `?` directly to the receiver.
    #[error("Timeout reached waiting for response")]
    ReceiveError(#[from] error::RecvError),

    /// Catch-all variant; it is not constructed anywhere in this file.
    #[error("Unknown error")]
    UnknownError,
}

/// Build-time information pulled in from `built.rs` inside the build's `OUT_DIR`
/// (presumably generated by a build script using the `built` crate — confirm in `build.rs`).
// The generated file is included wholesale, so items the crate never reads would
// otherwise trigger `dead_code` warnings.
#[allow(dead_code)]
pub(crate) mod built_info {
    include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

/// The struct used to interact with moleculer.
/// Use [`emit()`][Self::emit()], [`broadcast()`][Self::broadcast()] and [`call()`][Self::call()] functions.
/// ```rust,ignore
/// // emit an event
/// broker.emit("printHi", json!({}));
///
/// // broadcast an event
/// broker.broadcast("printHi", json!({}));
///
/// // call an action
/// let result = broker.call("math.add", json!({"a": 1, "b": 2})).await?;
/// ```
#[derive(Clone)]
pub struct ServiceBroker {
    // Address of the internal broker actor; every public method forwards to it.
    addr: Addr<broker::ServiceBroker>,
}

/// An alias to [service::Context\<service::Event>][service::Context].
/// In all contexts [`emit()`][service::Context::emit()], [`broadcast()`][service::Context::broadcast()]
/// and [`call()`][service::Context::call()] are available.
pub type EventContext = service::Context<service::Event>;

/// An alias to [service::Context\<service::Action>][service::Context].
/// Send a response to a request using [`reply()`][service::Context::reply()].
pub type ActionContext = service::Context<service::Action>; impl ServiceBroker { /// Create new service broker, takes [Config] struct. pub fn new(config: Config) -> ServiceBroker { ServiceBroker { addr: spawn_actor(broker::ServiceBroker::new(config)), } } /// Add a service to the service broker. pub fn add_service(self, service: Service) -> Self { send!(self.addr.add_service(service)); self } /// Add all the services to the service broker at once. /// Takes a vector of services and replaces any services the broker already had. pub fn add_services(self, services: Vec<Service>) -> Self { send!(self.addr.add_services(services)); self } /// Starts the service, this will run forever until your application exits. pub async fn start(self) { self.addr.termination().await } /// Request/Response style call /// Call an action directly with params serialized into /// [serde_json::Value](https://docs.rs/serde_json/1.0.64/serde_json/value/index.html) and `await` on the result pub async fn call<S: Into<String>>(self, action: S, params: Value) -> Result<Value, Error> { let (tx, rx) = oneshot::channel(); send!(self.addr.call(action.into(), params, tx)); let response_value = rx.await?; Ok(response_value) } /// Emits a balanced event to one of the nodes. pub fn emit<S: Into<String>>(&self, event: S, params: Value) { send!(self.addr.emit(event.into(), params)) } /// Emits an event to all the nodes that can handle the event. pub fn broadcast<S: Into<String>>(&self, event: S, params: Value) { send!(self.addr.broadcast(event.into(), params)) } } #[doc(hidden)] impl From<Addr<broker::ServiceBroker>> for ServiceBroker { fn from(addr: Addr<broker::ServiceBroker>) -> Self { Self { addr } } }