1pub mod custom;
16pub mod wire;
17
18pub use wire::WIRE_FORMAT_DOC_REVISION;
19
20use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::any::TypeId;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27
28#[cfg(feature = "grpc")]
29mod grpc;
30mod kafka;
31mod mqtt;
32#[cfg(feature = "nats")]
33mod nats;
34mod rabbitmq;
35#[cfg(feature = "redis")]
36mod redis;
37mod tcp;
38
39#[cfg(feature = "grpc")]
40pub use grpc::{
41 GrpcMicroserviceOptions, GrpcMicroserviceServer, GrpcTransport, GrpcTransportOptions,
42};
43pub use kafka::KafkaTransport;
44#[cfg(feature = "kafka")]
45pub use kafka::{
46 kafka_cluster_reachable, kafka_cluster_reachable_with, KafkaConnectionOptions,
47 KafkaMicroserviceOptions, KafkaMicroserviceServer, KafkaSaslOptions, KafkaTlsOptions,
48 KafkaTransportOptions,
49};
50pub use mqtt::MqttTransport;
51#[cfg(feature = "mqtt")]
52pub use mqtt::{
53 MqttMicroserviceOptions, MqttMicroserviceServer, MqttSocketOptions, MqttTlsMode,
54 MqttTransportOptions,
55};
56#[cfg(feature = "nats")]
57pub use nats::{
58 NatsMicroserviceOptions, NatsMicroserviceServer, NatsTransport, NatsTransportOptions,
59};
60pub use nestrs_events::EventBus;
61pub use rabbitmq::RabbitMqTransport;
62#[cfg(feature = "rabbitmq")]
63pub use rabbitmq::{
64 RabbitMqMicroserviceOptions, RabbitMqMicroserviceServer, RabbitMqTransportOptions,
65};
66#[cfg(feature = "redis")]
67pub use redis::{
68 RedisMicroserviceOptions, RedisMicroserviceServer, RedisTransport, RedisTransportOptions,
69};
70pub use tcp::{TcpMicroserviceOptions, TcpMicroserviceServer, TcpTransport, TcpTransportOptions};
71
72#[doc(hidden)]
73pub use linkme;
74
/// A message tagged with a pattern: a `pattern` string paired with a payload of type `T`.
///
/// Derives `Serialize`/`Deserialize`, so it can be (de)serialized whenever `T` can.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEnvelope<T> {
    /// Pattern string carried with the payload (presumably used to select a handler —
    /// confirm against the transports, which are outside this section).
    pub pattern: String,
    /// The message body.
    pub payload: T,
}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct TransportError {
83 pub message: String,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub details: Option<serde_json::Value>,
86}
87
88impl TransportError {
89 pub fn new(message: impl Into<String>) -> Self {
90 Self {
91 message: message.into(),
92 details: None,
93 }
94 }
95
96 pub fn with_details(mut self, details: serde_json::Value) -> Self {
97 self.details = Some(details);
98 self
99 }
100}
101
/// Asynchronous check evaluated against a message `pattern` and its JSON `payload`.
///
/// Implementors must be `Default`-constructible, `Send + Sync` and `'static`.
///
/// NOTE(review): the name suggests this is a guard consulted before a handler runs —
/// confirm against the call sites, which are not visible in this section.
#[async_trait]
pub trait MicroCanActivate: Default + Send + Sync + 'static {
    /// Inspects `pattern` and a shared reference to `payload`, returning `Ok(())` or a
    /// [`TransportError`]. Presumably an `Err` rejects the message; verify with the caller.
    async fn can_activate_micro(
        &self,
        pattern: &str,
        payload: &serde_json::Value,
    ) -> Result<(), TransportError>;
}
111
/// Asynchronous payload transformation step.
///
/// Implementors must be `Default`-constructible, `Send + Sync` and `'static`.
///
/// NOTE(review): presumably applied to incoming payloads before the handler sees them —
/// confirm against the call sites, which are not visible in this section.
#[async_trait]
pub trait MicroPipeTransform: Default + Send + Sync + 'static {
    /// Takes ownership of `payload` and returns the JSON value to use in its place, or a
    /// [`TransportError`] if the transformation fails.
    async fn transform_micro(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Result<serde_json::Value, TransportError>;
}
121
/// Asynchronous observer hook for a message `pattern` and its JSON `payload`.
///
/// Implementors must be `Default`-constructible, `Send + Sync` and `'static`.
#[async_trait]
pub trait MicroIncomingInterceptor: Default + Send + Sync + 'static {
    /// Receives the pattern and a shared reference to the payload. The hook returns
    /// nothing, so it can neither replace the payload nor report an error through its
    /// signature. Judging by the name it runs before the handler — TODO confirm.
    async fn before_handle_micro(&self, pattern: &str, payload: &serde_json::Value);
}
127
/// JSON-level transport abstraction; `ClientProxy` holds one as `Arc<dyn Transport>`.
#[async_trait]
pub trait Transport: Send + Sync + 'static {
    /// Sends `payload` under `pattern` and resolves to a JSON value (`ClientProxy::send`
    /// deserializes that value as the response) or a [`TransportError`].
    async fn send_json(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Result<serde_json::Value, TransportError>;
    /// Emits `payload` under `pattern`; success carries no value, only `Ok(())`.
    async fn emit_json(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Result<(), TransportError>;
}
141
/// Server-side handler for pattern-addressed messages and events.
///
/// Used in type-erased form as `Arc<dyn MicroserviceHandler>` (see
/// `MicroserviceHandlerFactory`).
#[async_trait]
pub trait MicroserviceHandler: Send + Sync + 'static {
    /// Handles a message for `pattern`.
    ///
    /// Returns `Some(result)` with the handler's JSON reply or error. Presumably `None`
    /// means this handler does not serve `pattern` — confirm with the server loops.
    async fn handle_message(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Option<Result<serde_json::Value, TransportError>>;

    /// Handles an event for `pattern`. The returned `bool` presumably reports whether the
    /// event was handled here — confirm with the server loops.
    async fn handle_event(&self, pattern: &str, payload: serde_json::Value) -> bool;
}
156
/// Function pointer that builds a type-erased handler from a provider registry.
///
/// `handler_factory::<T>` has this signature once `T` is fixed.
pub type MicroserviceHandlerFactory =
    fn(&nestrs_core::ProviderRegistry) -> Arc<dyn MicroserviceHandler>;
159
/// Boxed, pinned, `Send + 'static` future resolving to `()`; this is the `shutdown`
/// argument of `MicroserviceServer::listen_with_shutdown`.
pub type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
161
/// A microservice server that can be driven to completion together with a shutdown signal.
#[async_trait]
pub trait MicroserviceServer: Send + Sync + 'static {
    /// Consumes the boxed server and runs it, given a `shutdown` future.
    ///
    /// Presumably the server stops listening once `shutdown` resolves — confirm against
    /// the concrete transport servers. Returns a [`TransportError`] on failure.
    async fn listen_with_shutdown(
        self: Box<Self>,
        shutdown: ShutdownFuture,
    ) -> Result<(), TransportError>;
}
169
/// Implemented by types that expose a set of microservice handler factories.
pub trait MicroserviceModule {
    /// Returns the factories used to build this module's handlers from a provider registry.
    fn microservice_handlers() -> Vec<MicroserviceHandlerFactory>;
}
174
175pub fn handler_factory<T>(registry: &nestrs_core::ProviderRegistry) -> Arc<dyn MicroserviceHandler>
176where
177 T: nestrs_core::Injectable + MicroserviceHandler,
178{
179 registry.get::<T>()
180}
181
182#[derive(Clone)]
184pub struct ClientProxy {
185 transport: Arc<dyn Transport>,
186}
187
188impl ClientProxy {
189 pub fn new(transport: Arc<dyn Transport>) -> Self {
190 Self { transport }
191 }
192
193 pub async fn send<TReq, TRes>(
194 &self,
195 pattern: &str,
196 payload: &TReq,
197 ) -> Result<TRes, TransportError>
198 where
199 TReq: Serialize + Send + Sync,
200 TRes: for<'de> Deserialize<'de> + Send,
201 {
202 let req = serde_json::to_value(payload)
203 .map_err(|e| TransportError::new(format!("serialize request failed: {e}")))?;
204 let res = self.transport.send_json(pattern, req).await?;
205 serde_json::from_value(res)
206 .map_err(|e| TransportError::new(format!("deserialize response failed: {e}")))
207 }
208
209 pub async fn emit<TReq>(&self, pattern: &str, payload: &TReq) -> Result<(), TransportError>
210 where
211 TReq: Serialize + Send + Sync,
212 {
213 let req = serde_json::to_value(payload)
214 .map_err(|e| TransportError::new(format!("serialize event failed: {e}")))?;
215 self.transport.emit_json(pattern, req).await
216 }
217}
218
/// `ClientProxy` cannot be built from a registry alone because `ClientProxy::new` needs a
/// transport, so this impl exists only to satisfy the `Injectable` bound.
#[async_trait]
impl nestrs_core::Injectable for ClientProxy {
    /// Always panics: a `ClientProxy` has to be supplied through
    /// `ClientsModule::register(...)` or built by hand with `ClientProxy::new`.
    ///
    /// # Panics
    ///
    /// Unconditionally.
    fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
        panic!(
            "ClientProxy must be provided by ClientsModule::register(...) or constructed manually"
        );
    }
}
227
/// One entry of the `ON_EVENT_REGISTRATIONS` slice: a callback run against a provider
/// registry by `wire_on_event_handlers`.
pub struct OnEventRegistration {
    /// Callback invoked with the registry. Presumably it subscribes an event handler —
    /// confirm against the code that contributes entries.
    pub register: fn(&nestrs_core::ProviderRegistry),
}
232
/// `linkme` distributed slice of [`OnEventRegistration`] entries.
///
/// Declared empty here (`[..]`); entries are expected to be contributed from other code
/// (presumably macro-generated — confirm) and are iterated by `wire_on_event_handlers`.
#[linkme::distributed_slice]
pub static ON_EVENT_REGISTRATIONS: [OnEventRegistration] = [..];
235
236pub fn wire_on_event_handlers(registry: &nestrs_core::ProviderRegistry) {
238 for reg in ON_EVENT_REGISTRATIONS.iter() {
239 (reg.register)(registry);
240 }
241}
242
243#[derive(Clone)]
244pub struct ClientConfig {
245 pub name: &'static str,
246 pub transport: Arc<dyn Transport>,
247}
248
249impl ClientConfig {
250 pub fn new(name: &'static str, transport: Arc<dyn Transport>) -> Self {
251 Self { name, transport }
252 }
253
254 pub fn tcp(name: &'static str, options: TcpTransportOptions) -> Self {
255 Self::new(name, Arc::new(TcpTransport::new(options)))
256 }
257
258 #[cfg(feature = "nats")]
259 pub fn nats(name: &'static str, options: NatsTransportOptions) -> Self {
260 Self::new(name, Arc::new(NatsTransport::new(options)))
261 }
262
263 #[cfg(feature = "redis")]
264 pub fn redis(name: &'static str, options: RedisTransportOptions) -> Self {
265 Self::new(name, Arc::new(RedisTransport::new(options)))
266 }
267
268 #[cfg(feature = "grpc")]
269 pub fn grpc(name: &'static str, options: GrpcTransportOptions) -> Self {
270 Self::new(name, Arc::new(GrpcTransport::new(options)))
271 }
272
273 #[cfg(feature = "kafka")]
274 pub fn kafka(name: &'static str, options: KafkaTransportOptions) -> Self {
275 Self::new(name, Arc::new(KafkaTransport::new(options)))
276 }
277
278 #[cfg(not(feature = "kafka"))]
279 pub fn kafka(name: &'static str) -> Self {
280 Self::new(name, Arc::new(KafkaTransport::new()))
281 }
282
283 #[cfg(feature = "mqtt")]
284 pub fn mqtt(name: &'static str, options: MqttTransportOptions) -> Self {
285 Self::new(name, Arc::new(MqttTransport::new(options)))
286 }
287
288 #[cfg(not(feature = "mqtt"))]
289 pub fn mqtt(name: &'static str) -> Self {
290 Self::new(name, Arc::new(MqttTransport::new()))
291 }
292
293 #[cfg(feature = "rabbitmq")]
294 pub fn rabbitmq(name: &'static str, options: RabbitMqTransportOptions) -> Self {
295 Self::new(name, Arc::new(RabbitMqTransport::new(options)))
296 }
297
298 #[cfg(not(feature = "rabbitmq"))]
299 pub fn rabbitmq(name: &'static str) -> Self {
300 Self::new(name, Arc::new(RabbitMqTransport::new()))
301 }
302}
303
304#[derive(Clone)]
305pub struct ClientsService {
306 clients: Arc<HashMap<&'static str, ClientProxy>>,
307}
308
309impl ClientsService {
310 pub fn get(&self, name: &str) -> Option<ClientProxy> {
311 self.clients.get(name).cloned()
312 }
313
314 pub fn expect(&self, name: &str) -> ClientProxy {
315 self.get(name).unwrap_or_else(|| {
316 let known = self.clients.keys().copied().collect::<Vec<_>>().join(", ");
317 panic!("ClientProxy `{name}` not registered. Known clients: [{known}]");
318 })
319 }
320}
321
/// `ClientsService` is installed by `ClientsModule::register` via a provider override, so
/// this impl exists only to satisfy the `Injectable` bound.
#[async_trait]
impl nestrs_core::Injectable for ClientsService {
    /// Always panics: the service must come from `ClientsModule::register(...)`.
    ///
    /// # Panics
    ///
    /// Unconditionally.
    fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
        panic!("ClientsService must be provided by ClientsModule::register(...)");
    }
}
328
329pub struct ClientsModule;
330
331impl ClientsModule {
332 pub fn register(configs: &[ClientConfig]) -> nestrs_core::DynamicModule {
339 if configs.is_empty() {
340 panic!("ClientsModule::register requires at least one ClientConfig");
341 }
342
343 let mut seen = std::collections::HashSet::<&'static str>::new();
344 let mut clients = HashMap::<&'static str, ClientProxy>::new();
345 for cfg in configs {
346 if !seen.insert(cfg.name) {
347 panic!(
348 "ClientsModule::register: duplicate client name `{}`",
349 cfg.name
350 );
351 }
352 clients.insert(cfg.name, ClientProxy::new(cfg.transport.clone()));
353 }
354
355 let mut registry = nestrs_core::ProviderRegistry::new();
356 registry.register::<EventBus>();
357
358 let clients_service = Arc::new(ClientsService {
359 clients: Arc::new(clients),
360 });
361 registry.override_provider::<ClientsService>(clients_service);
362
363 let mut exports = vec![TypeId::of::<ClientsService>(), TypeId::of::<EventBus>()];
364
365 if configs.len() == 1 {
366 let first = &configs[0];
367 registry.override_provider::<ClientProxy>(Arc::new(ClientProxy::new(
368 first.transport.clone(),
369 )));
370 exports.push(TypeId::of::<ClientProxy>());
371 }
372
373 nestrs_core::DynamicModule::from_parts(registry, axum::Router::new(), exports)
374 }
375}