Skip to main content

nestrs_microservices/
lib.rs

1//! Optional microservices transport primitives for nestrs (Phase 4 roadmap crate).
2//!
3//! This crate intentionally starts with a tiny, stable interface so transports (NATS/Redis/gRPC)
4//! can be added incrementally without blocking core HTTP framework progress.
5//!
6//! ## Cross-cutting on message handlers
7//!
8//! On `#[micro_routes]` impl blocks, per-handler attributes **`#[use_micro_interceptors(...)]`**,
9//! **`#[use_micro_guards(...)]`**, and **`#[use_micro_pipes(...)]`** run before your
10//! `#[message_pattern]` / `#[event_pattern]` body (order: interceptors → guards → pipes). This is
11//! the closest analogue to Nest’s microservice pipes/guards/interceptors; there is no separate
12//! exception-filter pipeline — return [`TransportError`] from handlers (the `nestrs` crate maps
13//! `HttpException` into [`TransportError`] with JSON details in generated `#[micro_routes]` code).
14
15pub mod custom;
16pub mod wire;
17
18pub use wire::WIRE_FORMAT_DOC_REVISION;
19
20use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::any::TypeId;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27
28#[cfg(feature = "grpc")]
29mod grpc;
30mod kafka;
31mod mqtt;
32#[cfg(feature = "nats")]
33mod nats;
34mod rabbitmq;
35#[cfg(feature = "redis")]
36mod redis;
37mod tcp;
38
39#[cfg(feature = "grpc")]
40pub use grpc::{
41    GrpcMicroserviceOptions, GrpcMicroserviceServer, GrpcTransport, GrpcTransportOptions,
42};
43pub use kafka::KafkaTransport;
44#[cfg(feature = "kafka")]
45pub use kafka::{
46    kafka_cluster_reachable, kafka_cluster_reachable_with, KafkaConnectionOptions,
47    KafkaMicroserviceOptions, KafkaMicroserviceServer, KafkaSaslOptions, KafkaTlsOptions,
48    KafkaTransportOptions,
49};
50pub use mqtt::MqttTransport;
51#[cfg(feature = "mqtt")]
52pub use mqtt::{
53    MqttMicroserviceOptions, MqttMicroserviceServer, MqttSocketOptions, MqttTlsMode,
54    MqttTransportOptions,
55};
56#[cfg(feature = "nats")]
57pub use nats::{
58    NatsMicroserviceOptions, NatsMicroserviceServer, NatsTransport, NatsTransportOptions,
59};
60pub use nestrs_events::EventBus;
61pub use rabbitmq::RabbitMqTransport;
62#[cfg(feature = "rabbitmq")]
63pub use rabbitmq::{
64    RabbitMqMicroserviceOptions, RabbitMqMicroserviceServer, RabbitMqTransportOptions,
65};
66#[cfg(feature = "redis")]
67pub use redis::{
68    RedisMicroserviceOptions, RedisMicroserviceServer, RedisTransport, RedisTransportOptions,
69};
70pub use tcp::{TcpMicroserviceOptions, TcpMicroserviceServer, TcpTransport, TcpTransportOptions};
71
72#[doc(hidden)]
73pub use linkme;
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct MessageEnvelope<T> {
77    pub pattern: String,
78    pub payload: T,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct TransportError {
83    pub message: String,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub details: Option<serde_json::Value>,
86}
87
88impl TransportError {
89    pub fn new(message: impl Into<String>) -> Self {
90        Self {
91            message: message.into(),
92            details: None,
93        }
94    }
95
96    pub fn with_details(mut self, details: serde_json::Value) -> Self {
97        self.details = Some(details);
98        self
99    }
100}
101
102/// Authorization / policy hook before a microservice handler runs (Nest microservice guard analogue).
103#[async_trait]
104pub trait MicroCanActivate: Default + Send + Sync + 'static {
105    async fn can_activate_micro(
106        &self,
107        pattern: &str,
108        payload: &serde_json::Value,
109    ) -> Result<(), TransportError>;
110}
111
112/// Transform inbound JSON after guards (Nest microservice pipe analogue).
113#[async_trait]
114pub trait MicroPipeTransform: Default + Send + Sync + 'static {
115    async fn transform_micro(
116        &self,
117        pattern: &str,
118        payload: serde_json::Value,
119    ) -> Result<serde_json::Value, TransportError>;
120}
121
122/// Observe inbound patterns (logging / metrics); does not fail the pipeline.
123#[async_trait]
124pub trait MicroIncomingInterceptor: Default + Send + Sync + 'static {
125    async fn before_handle_micro(&self, pattern: &str, payload: &serde_json::Value);
126}
127
128#[async_trait]
129pub trait Transport: Send + Sync + 'static {
130    async fn send_json(
131        &self,
132        pattern: &str,
133        payload: serde_json::Value,
134    ) -> Result<serde_json::Value, TransportError>;
135    async fn emit_json(
136        &self,
137        pattern: &str,
138        payload: serde_json::Value,
139    ) -> Result<(), TransportError>;
140}
141
142/// A Nest-style microservice handler registry entrypoint (controller/service methods annotated with
143/// `#[message_pattern]` / `#[event_pattern]` via the `#[micro_routes]` impl macro).
144#[async_trait]
145pub trait MicroserviceHandler: Send + Sync + 'static {
146    /// Handle a request/reply message pattern. Return `None` when the handler doesn't match `pattern`.
147    async fn handle_message(
148        &self,
149        pattern: &str,
150        payload: serde_json::Value,
151    ) -> Option<Result<serde_json::Value, TransportError>>;
152
153    /// Handle a fire-and-forget event pattern. Return `true` when the handler matched `pattern`.
154    async fn handle_event(&self, pattern: &str, payload: serde_json::Value) -> bool;
155}
156
157pub type MicroserviceHandlerFactory =
158    fn(&nestrs_core::ProviderRegistry) -> Arc<dyn MicroserviceHandler>;
159
160pub type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
161
162#[async_trait]
163pub trait MicroserviceServer: Send + Sync + 'static {
164    async fn listen_with_shutdown(
165        self: Box<Self>,
166        shutdown: ShutdownFuture,
167    ) -> Result<(), TransportError>;
168}
169
170/// Implemented by `#[module(microservices = [...])]` to declare which providers handle patterns.
171pub trait MicroserviceModule {
172    fn microservice_handlers() -> Vec<MicroserviceHandlerFactory>;
173}
174
175pub fn handler_factory<T>(registry: &nestrs_core::ProviderRegistry) -> Arc<dyn MicroserviceHandler>
176where
177    T: nestrs_core::Injectable + MicroserviceHandler,
178{
179    registry.get::<T>()
180}
181
182/// Nest-like client proxy wrapper over a configured [`Transport`].
183#[derive(Clone)]
184pub struct ClientProxy {
185    transport: Arc<dyn Transport>,
186}
187
188impl ClientProxy {
189    pub fn new(transport: Arc<dyn Transport>) -> Self {
190        Self { transport }
191    }
192
193    pub async fn send<TReq, TRes>(
194        &self,
195        pattern: &str,
196        payload: &TReq,
197    ) -> Result<TRes, TransportError>
198    where
199        TReq: Serialize + Send + Sync,
200        TRes: for<'de> Deserialize<'de> + Send,
201    {
202        let req = serde_json::to_value(payload)
203            .map_err(|e| TransportError::new(format!("serialize request failed: {e}")))?;
204        let res = self.transport.send_json(pattern, req).await?;
205        serde_json::from_value(res)
206            .map_err(|e| TransportError::new(format!("deserialize response failed: {e}")))
207    }
208
209    pub async fn emit<TReq>(&self, pattern: &str, payload: &TReq) -> Result<(), TransportError>
210    where
211        TReq: Serialize + Send + Sync,
212    {
213        let req = serde_json::to_value(payload)
214            .map_err(|e| TransportError::new(format!("serialize event failed: {e}")))?;
215        self.transport.emit_json(pattern, req).await
216    }
217}
218
219#[async_trait]
220impl nestrs_core::Injectable for ClientProxy {
221    fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
222        panic!(
223            "ClientProxy must be provided by ClientsModule::register(...) or constructed manually"
224        );
225    }
226}
227
228/// Auto-wiring registration entry for `#[event_routes]` + `#[on_event("...")]` handlers.
229pub struct OnEventRegistration {
230    pub register: fn(&nestrs_core::ProviderRegistry),
231}
232
233#[linkme::distributed_slice]
234pub static ON_EVENT_REGISTRATIONS: [OnEventRegistration] = [..];
235
236/// Subscribe all `#[on_event]` handlers registered via `#[event_routes]`.
237pub fn wire_on_event_handlers(registry: &nestrs_core::ProviderRegistry) {
238    for reg in ON_EVENT_REGISTRATIONS.iter() {
239        (reg.register)(registry);
240    }
241}
242
243#[derive(Clone)]
244pub struct ClientConfig {
245    pub name: &'static str,
246    pub transport: Arc<dyn Transport>,
247}
248
249impl ClientConfig {
250    pub fn new(name: &'static str, transport: Arc<dyn Transport>) -> Self {
251        Self { name, transport }
252    }
253
254    pub fn tcp(name: &'static str, options: TcpTransportOptions) -> Self {
255        Self::new(name, Arc::new(TcpTransport::new(options)))
256    }
257
258    #[cfg(feature = "nats")]
259    pub fn nats(name: &'static str, options: NatsTransportOptions) -> Self {
260        Self::new(name, Arc::new(NatsTransport::new(options)))
261    }
262
263    #[cfg(feature = "redis")]
264    pub fn redis(name: &'static str, options: RedisTransportOptions) -> Self {
265        Self::new(name, Arc::new(RedisTransport::new(options)))
266    }
267
268    #[cfg(feature = "grpc")]
269    pub fn grpc(name: &'static str, options: GrpcTransportOptions) -> Self {
270        Self::new(name, Arc::new(GrpcTransport::new(options)))
271    }
272
273    #[cfg(feature = "kafka")]
274    pub fn kafka(name: &'static str, options: KafkaTransportOptions) -> Self {
275        Self::new(name, Arc::new(KafkaTransport::new(options)))
276    }
277
278    #[cfg(not(feature = "kafka"))]
279    pub fn kafka(name: &'static str) -> Self {
280        Self::new(name, Arc::new(KafkaTransport::new()))
281    }
282
283    #[cfg(feature = "mqtt")]
284    pub fn mqtt(name: &'static str, options: MqttTransportOptions) -> Self {
285        Self::new(name, Arc::new(MqttTransport::new(options)))
286    }
287
288    #[cfg(not(feature = "mqtt"))]
289    pub fn mqtt(name: &'static str) -> Self {
290        Self::new(name, Arc::new(MqttTransport::new()))
291    }
292
293    #[cfg(feature = "rabbitmq")]
294    pub fn rabbitmq(name: &'static str, options: RabbitMqTransportOptions) -> Self {
295        Self::new(name, Arc::new(RabbitMqTransport::new(options)))
296    }
297
298    #[cfg(not(feature = "rabbitmq"))]
299    pub fn rabbitmq(name: &'static str) -> Self {
300        Self::new(name, Arc::new(RabbitMqTransport::new()))
301    }
302}
303
304#[derive(Clone)]
305pub struct ClientsService {
306    clients: Arc<HashMap<&'static str, ClientProxy>>,
307}
308
309impl ClientsService {
310    pub fn get(&self, name: &str) -> Option<ClientProxy> {
311        self.clients.get(name).cloned()
312    }
313
314    pub fn expect(&self, name: &str) -> ClientProxy {
315        self.get(name).unwrap_or_else(|| {
316            let known = self.clients.keys().copied().collect::<Vec<_>>().join(", ");
317            panic!("ClientProxy `{name}` not registered. Known clients: [{known}]");
318        })
319    }
320}
321
322#[async_trait]
323impl nestrs_core::Injectable for ClientsService {
324    fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
325        panic!("ClientsService must be provided by ClientsModule::register(...)");
326    }
327}
328
329pub struct ClientsModule;
330
331impl ClientsModule {
332    /// Register named microservice clients into a runtime [`nestrs_core::DynamicModule`].
333    ///
334    /// Exports:
335    /// - [`ClientsService`]
336    /// - [`EventBus`]
337    /// - [`ClientProxy`] **only** when exactly one client config is provided (default client).
338    pub fn register(configs: &[ClientConfig]) -> nestrs_core::DynamicModule {
339        if configs.is_empty() {
340            panic!("ClientsModule::register requires at least one ClientConfig");
341        }
342
343        let mut seen = std::collections::HashSet::<&'static str>::new();
344        let mut clients = HashMap::<&'static str, ClientProxy>::new();
345        for cfg in configs {
346            if !seen.insert(cfg.name) {
347                panic!(
348                    "ClientsModule::register: duplicate client name `{}`",
349                    cfg.name
350                );
351            }
352            clients.insert(cfg.name, ClientProxy::new(cfg.transport.clone()));
353        }
354
355        let mut registry = nestrs_core::ProviderRegistry::new();
356        registry.register::<EventBus>();
357
358        let clients_service = Arc::new(ClientsService {
359            clients: Arc::new(clients),
360        });
361        registry.override_provider::<ClientsService>(clients_service);
362
363        let mut exports = vec![TypeId::of::<ClientsService>(), TypeId::of::<EventBus>()];
364
365        if configs.len() == 1 {
366            let first = &configs[0];
367            registry.override_provider::<ClientProxy>(Arc::new(ClientProxy::new(
368                first.transport.clone(),
369            )));
370            exports.push(TypeId::of::<ClientProxy>());
371        }
372
373        nestrs_core::DynamicModule::from_parts(registry, axum::Router::new(), exports)
374    }
375}