Skip to main content

nestforge_microservices/
lib.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16/**
17 * MicroserviceClient Trait
18 *
19 * Defines the interface for sending messages and emitting events
20 * to microservices. Implement this trait to create custom transport
21 * adapters (e.g., TCP, Redis, NATS).
22 *
23 * # Methods
24 * - `send`: Sends a request-response style message and waits for a response
25 * - `emit`: Sends an event (fire-and-forget) without waiting for a response
26 *
27 * # Type Parameters
28 * - `Payload`: The message payload type (must be serializable)
29 * - `Response`: The response type (must be deserializable)
30 */
31pub trait MicroserviceClient: Send + Sync + 'static {
32    /**
33     * Sends a message and waits for a response.
34     *
35     * # Arguments
36     * - `pattern`: The microservice pattern/endpoint to call
37     * - `payload`: The message payload to send
38     *
39     * # Returns
40     * A future that resolves to the response from the microservice.
41     */
42    fn send<Payload, Response>(
43        &self,
44        pattern: impl Into<String>,
45        payload: Payload,
46    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
47    where
48        Payload: Serialize + Send + 'static,
49        Response: DeserializeOwned + Send + 'static;
50
51    /**
52     * Emits an event without waiting for a response.
53     *
54     * # Arguments
55     * - `pattern`: The event pattern to emit
56     * - `payload`: The event payload to send
57     *
58     * # Returns
59     * A future that completes when the event is sent.
60     */
61    fn emit<Payload>(
62        &self,
63        pattern: impl Into<String>,
64        payload: Payload,
65    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
66    where
67        Payload: Serialize + Send + 'static;
68}
69
70/**
71 * TransportMetadata
72 *
73 * Key-value metadata that accompanies microservice messages and events.
74 * Used for routing, correlation, and custom headers.
75 */
76#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
77pub struct TransportMetadata {
78    pub values: BTreeMap<String, String>,
79}
80
81impl TransportMetadata {
82    /**
83     * Creates a new empty TransportMetadata.
84     */
85    pub fn new() -> Self {
86        Self::default()
87    }
88
89    /**
90     * Adds a key-value pair to the metadata.
91     *
92     * # Arguments
93     * - `key`: The metadata key
94     * - `value`: The metadata value
95     *
96     * Returns the modified metadata for chaining.
97     */
98    pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
99        self.values.insert(key.into(), value.into());
100        self
101    }
102}
103
104/**
105 * MessageEnvelope
106 *
107 * A request-response style message sent between microservices.
108 * Contains the pattern, payload, and optional metadata.
109 */
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct MessageEnvelope {
112    pub pattern: String,
113    pub payload: Value,
114    #[serde(default)]
115    pub metadata: TransportMetadata,
116}
117
118impl MessageEnvelope {
119    /**
120     * Creates a new message envelope.
121     *
122     * # Arguments
123     * - `pattern`: The microservice pattern to target
124     * - `payload`: The message payload (must be serializable)
125     */
126    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
127        Ok(Self {
128            pattern: pattern.into(),
129            payload: serde_json::to_value(payload)
130                .context("Failed to serialize message payload")?,
131            metadata: TransportMetadata::default(),
132        })
133    }
134
135    /**
136     * Attaches metadata to the message envelope.
137     */
138    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
139        self.metadata = metadata;
140        self
141    }
142}
143
144/**
145 * EventEnvelope
146 *
147 * A fire-and-forget event sent to microservices.
148 * Events do not expect a response.
149 */
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct EventEnvelope {
152    pub pattern: String,
153    pub payload: Value,
154    #[serde(default)]
155    pub metadata: TransportMetadata,
156}
157
158impl EventEnvelope {
159    /**
160     * Creates a new event envelope.
161     *
162     * # Arguments
163     * - `pattern`: The event pattern to emit
164     * - `payload`: The event payload (must be serializable)
165     */
166    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
167        Ok(Self {
168            pattern: pattern.into(),
169            payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
170            metadata: TransportMetadata::default(),
171        })
172    }
173
174    /**
175     * Attaches metadata to the event envelope.
176     */
177    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
178        self.metadata = metadata;
179        self
180    }
181}
182
183/**
184 * MicroserviceContext
185 *
186 * The execution context for handling microservice messages and events.
187 * Provides access to the DI container, transport info, and authentication.
188 */
189#[derive(Clone)]
190pub struct MicroserviceContext {
191    container: Container,
192    transport: Arc<str>,
193    pattern: Arc<str>,
194    metadata: TransportMetadata,
195    request_id: Option<RequestId>,
196    auth_identity: Option<AuthIdentity>,
197}
198
199impl MicroserviceContext {
200    /**
201     * Creates a new microservice context.
202     *
203     * # Arguments
204     * - `container`: The DI container for resolving services
205     * - `transport`: The transport type (e.g., "tcp", "redis", "websocket")
206     * - `pattern`: The message/event pattern
207     * - `metadata`: Additional transport metadata
208     */
209    pub fn new(
210        container: Container,
211        transport: impl Into<String>,
212        pattern: impl Into<String>,
213        metadata: TransportMetadata,
214    ) -> Self {
215        let request_id = container
216            .resolve::<RequestId>()
217            .ok()
218            .map(|value| (*value).clone());
219        let auth_identity = container
220            .resolve::<AuthIdentity>()
221            .ok()
222            .map(|value| (*value).clone());
223
224        Self {
225            container,
226            transport: Arc::<str>::from(transport.into()),
227            pattern: Arc::<str>::from(pattern.into()),
228            metadata,
229            request_id,
230            auth_identity,
231        }
232    }
233
234    /**
235     * Returns a reference to the DI container.
236     */
237    pub fn container(&self) -> &Container {
238        &self.container
239    }
240
241    /**
242     * Returns the transport type.
243     */
244    pub fn transport(&self) -> &str {
245        &self.transport
246    }
247
248    /**
249     * Returns the message/event pattern.
250     */
251    pub fn pattern(&self) -> &str {
252        &self.pattern
253    }
254
255    /**
256     * Returns a reference to the transport metadata.
257     */
258    pub fn metadata(&self) -> &TransportMetadata {
259        &self.metadata
260    }
261
262    /**
263     * Returns the request ID if available.
264     */
265    pub fn request_id(&self) -> Option<&RequestId> {
266        self.request_id.as_ref()
267    }
268
269    pub fn auth_identity(&self) -> Option<&AuthIdentity> {
270        self.auth_identity.as_ref()
271    }
272
273    pub fn resolve<T>(&self) -> Result<Arc<T>>
274    where
275        T: Send + Sync + 'static,
276    {
277        self.container.resolve::<T>().map_err(anyhow::Error::new)
278    }
279}
280
281trait MessageHandler: Send + Sync + 'static {
282    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
283}
284
285trait EventHandler: Send + Sync + 'static {
286    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
287}
288
289struct TypedMessageHandler<Payload, Response, Handler, Fut> {
290    handler: Handler,
291    _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
292}
293
294impl<Payload, Response, Handler, Fut> MessageHandler
295    for TypedMessageHandler<Payload, Response, Handler, Fut>
296where
297    Payload: DeserializeOwned + Send + 'static,
298    Response: Serialize + Send + 'static,
299    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
300    Fut: Future<Output = Result<Response>> + Send + 'static,
301{
302    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
303        let payload = serde_json::from_value::<Payload>(payload)
304            .context("Failed to deserialize message payload");
305
306        match payload {
307            Ok(payload) => {
308                let future = (self.handler)(payload, ctx);
309                Box::pin(async move {
310                    let response = future.await?;
311                    serde_json::to_value(response).context("Failed to serialize message response")
312                })
313            }
314            Err(err) => Box::pin(async move { Err(err) }),
315        }
316    }
317}
318
319struct TypedEventHandler<Payload, Handler, Fut> {
320    handler: Handler,
321    _marker: std::marker::PhantomData<fn(Payload, Fut)>,
322}
323
324impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
325where
326    Payload: DeserializeOwned + Send + 'static,
327    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
328    Fut: Future<Output = Result<()>> + Send + 'static,
329{
330    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
331        let payload = serde_json::from_value::<Payload>(payload)
332            .context("Failed to deserialize event payload");
333
334        match payload {
335            Ok(payload) => Box::pin((self.handler)(payload, ctx)),
336            Err(err) => Box::pin(async move { Err(err) }),
337        }
338    }
339}
340
341#[derive(Clone, Default)]
342pub struct MicroserviceRegistry {
343    message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
344    event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
345}
346
347impl MicroserviceRegistry {
348    pub fn builder() -> MicroserviceRegistryBuilder {
349        MicroserviceRegistryBuilder::default()
350    }
351
352    pub async fn dispatch_message(
353        &self,
354        envelope: MessageEnvelope,
355        ctx: MicroserviceContext,
356    ) -> Result<Value> {
357        let handler = self
358            .message_handlers
359            .get(&envelope.pattern)
360            .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
361
362        handler.handle(envelope.payload, ctx).await
363    }
364
365    pub async fn dispatch_event(
366        &self,
367        envelope: EventEnvelope,
368        ctx: MicroserviceContext,
369    ) -> Result<()> {
370        let handler = self
371            .event_handlers
372            .get(&envelope.pattern)
373            .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
374
375        handler.handle(envelope.payload, ctx).await
376    }
377}
378
379#[derive(Default)]
380pub struct MicroserviceRegistryBuilder {
381    message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
382    event_handlers: HashMap<String, Arc<dyn EventHandler>>,
383}
384
385impl MicroserviceRegistryBuilder {
386    pub fn message<Payload, Response, Handler, Fut>(
387        mut self,
388        pattern: impl Into<String>,
389        handler: Handler,
390    ) -> Self
391    where
392        Payload: DeserializeOwned + Send + 'static,
393        Response: Serialize + Send + 'static,
394        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
395        Fut: Future<Output = Result<Response>> + Send + 'static,
396    {
397        self.message_handlers.insert(
398            pattern.into(),
399            Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
400                handler,
401                _marker: std::marker::PhantomData,
402            }),
403        );
404        self
405    }
406
407    pub fn event<Payload, Handler, Fut>(
408        mut self,
409        pattern: impl Into<String>,
410        handler: Handler,
411    ) -> Self
412    where
413        Payload: DeserializeOwned + Send + 'static,
414        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
415        Fut: Future<Output = Result<()>> + Send + 'static,
416    {
417        self.event_handlers.insert(
418            pattern.into(),
419            Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
420                handler,
421                _marker: std::marker::PhantomData,
422            }),
423        );
424        self
425    }
426
427    pub fn build(self) -> MicroserviceRegistry {
428        MicroserviceRegistry {
429            message_handlers: Arc::new(self.message_handlers),
430            event_handlers: Arc::new(self.event_handlers),
431        }
432    }
433}
434
435#[derive(Clone)]
436pub struct InProcessMicroserviceClient {
437    container: Container,
438    registry: MicroserviceRegistry,
439    transport: Arc<str>,
440    metadata: TransportMetadata,
441}
442
443impl InProcessMicroserviceClient {
444    pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
445        Self {
446            container,
447            registry,
448            transport: Arc::<str>::from("in-process"),
449            metadata: TransportMetadata::default(),
450        }
451    }
452
453    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
454        self.transport = Arc::<str>::from(transport.into());
455        self
456    }
457
458    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
459        self.metadata = metadata;
460        self
461    }
462
463    fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
464        MicroserviceContext::new(
465            self.container.clone(),
466            self.transport.to_string(),
467            pattern,
468            self.metadata.clone(),
469        )
470    }
471}
472
473impl MicroserviceClient for InProcessMicroserviceClient {
474    fn send<Payload, Response>(
475        &self,
476        pattern: impl Into<String>,
477        payload: Payload,
478    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
479    where
480        Payload: Serialize + Send + 'static,
481        Response: DeserializeOwned + Send + 'static,
482    {
483        let registry = self.registry.clone();
484        let pattern = pattern.into();
485        let envelope = MessageEnvelope::new(pattern.clone(), payload);
486        let context = self.context(pattern);
487
488        Box::pin(async move {
489            let response = registry.dispatch_message(envelope?, context).await?;
490            serde_json::from_value(response).context("Failed to deserialize microservice response")
491        })
492    }
493
494    fn emit<Payload>(
495        &self,
496        pattern: impl Into<String>,
497        payload: Payload,
498    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
499    where
500        Payload: Serialize + Send + 'static,
501    {
502        let registry = self.registry.clone();
503        let pattern = pattern.into();
504        let envelope = EventEnvelope::new(pattern.clone(), payload);
505        let context = self.context(pattern);
506
507        Box::pin(async move { registry.dispatch_event(envelope?, context).await })
508    }
509}