Skip to main content

nestforge_microservices/
lib.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub trait MicroserviceClient: Send + Sync + 'static {
17    fn send<Payload, Response>(
18        &self,
19        pattern: impl Into<String>,
20        payload: Payload,
21    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
22    where
23        Payload: Serialize + Send + 'static,
24        Response: DeserializeOwned + Send + 'static;
25
26    fn emit<Payload>(
27        &self,
28        pattern: impl Into<String>,
29        payload: Payload,
30    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
31    where
32        Payload: Serialize + Send + 'static;
33}
34
35#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
36pub struct TransportMetadata {
37    pub values: BTreeMap<String, String>,
38}
39
40impl TransportMetadata {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
46        self.values.insert(key.into(), value.into());
47        self
48    }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct MessageEnvelope {
53    pub pattern: String,
54    pub payload: Value,
55    #[serde(default)]
56    pub metadata: TransportMetadata,
57}
58
59impl MessageEnvelope {
60    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
61        Ok(Self {
62            pattern: pattern.into(),
63            payload: serde_json::to_value(payload)
64                .context("Failed to serialize message payload")?,
65            metadata: TransportMetadata::default(),
66        })
67    }
68
69    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
70        self.metadata = metadata;
71        self
72    }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct EventEnvelope {
77    pub pattern: String,
78    pub payload: Value,
79    #[serde(default)]
80    pub metadata: TransportMetadata,
81}
82
83impl EventEnvelope {
84    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
85        Ok(Self {
86            pattern: pattern.into(),
87            payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
88            metadata: TransportMetadata::default(),
89        })
90    }
91
92    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
93        self.metadata = metadata;
94        self
95    }
96}
97
98#[derive(Clone)]
99pub struct MicroserviceContext {
100    container: Container,
101    transport: Arc<str>,
102    pattern: Arc<str>,
103    metadata: TransportMetadata,
104    request_id: Option<RequestId>,
105    auth_identity: Option<AuthIdentity>,
106}
107
108impl MicroserviceContext {
109    pub fn new(
110        container: Container,
111        transport: impl Into<String>,
112        pattern: impl Into<String>,
113        metadata: TransportMetadata,
114    ) -> Self {
115        let request_id = container
116            .resolve::<RequestId>()
117            .ok()
118            .map(|value| (*value).clone());
119        let auth_identity = container
120            .resolve::<AuthIdentity>()
121            .ok()
122            .map(|value| (*value).clone());
123
124        Self {
125            container,
126            transport: Arc::<str>::from(transport.into()),
127            pattern: Arc::<str>::from(pattern.into()),
128            metadata,
129            request_id,
130            auth_identity,
131        }
132    }
133
134    pub fn container(&self) -> &Container {
135        &self.container
136    }
137
138    pub fn transport(&self) -> &str {
139        &self.transport
140    }
141
142    pub fn pattern(&self) -> &str {
143        &self.pattern
144    }
145
146    pub fn metadata(&self) -> &TransportMetadata {
147        &self.metadata
148    }
149
150    pub fn request_id(&self) -> Option<&RequestId> {
151        self.request_id.as_ref()
152    }
153
154    pub fn auth_identity(&self) -> Option<&AuthIdentity> {
155        self.auth_identity.as_ref()
156    }
157
158    pub fn resolve<T>(&self) -> Result<Arc<T>>
159    where
160        T: Send + Sync + 'static,
161    {
162        self.container.resolve::<T>().map_err(anyhow::Error::new)
163    }
164}
165
166trait MessageHandler: Send + Sync + 'static {
167    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
168}
169
170trait EventHandler: Send + Sync + 'static {
171    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
172}
173
174struct TypedMessageHandler<Payload, Response, Handler, Fut> {
175    handler: Handler,
176    _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
177}
178
179impl<Payload, Response, Handler, Fut> MessageHandler
180    for TypedMessageHandler<Payload, Response, Handler, Fut>
181where
182    Payload: DeserializeOwned + Send + 'static,
183    Response: Serialize + Send + 'static,
184    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
185    Fut: Future<Output = Result<Response>> + Send + 'static,
186{
187    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
188        let payload = serde_json::from_value::<Payload>(payload)
189            .context("Failed to deserialize message payload");
190
191        match payload {
192            Ok(payload) => {
193                let future = (self.handler)(payload, ctx);
194                Box::pin(async move {
195                    let response = future.await?;
196                    serde_json::to_value(response).context("Failed to serialize message response")
197                })
198            }
199            Err(err) => Box::pin(async move { Err(err) }),
200        }
201    }
202}
203
204struct TypedEventHandler<Payload, Handler, Fut> {
205    handler: Handler,
206    _marker: std::marker::PhantomData<fn(Payload, Fut)>,
207}
208
209impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
210where
211    Payload: DeserializeOwned + Send + 'static,
212    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
213    Fut: Future<Output = Result<()>> + Send + 'static,
214{
215    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
216        let payload = serde_json::from_value::<Payload>(payload)
217            .context("Failed to deserialize event payload");
218
219        match payload {
220            Ok(payload) => Box::pin((self.handler)(payload, ctx)),
221            Err(err) => Box::pin(async move { Err(err) }),
222        }
223    }
224}
225
226#[derive(Clone, Default)]
227pub struct MicroserviceRegistry {
228    message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
229    event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
230}
231
232impl MicroserviceRegistry {
233    pub fn builder() -> MicroserviceRegistryBuilder {
234        MicroserviceRegistryBuilder::default()
235    }
236
237    pub async fn dispatch_message(
238        &self,
239        envelope: MessageEnvelope,
240        ctx: MicroserviceContext,
241    ) -> Result<Value> {
242        let handler = self
243            .message_handlers
244            .get(&envelope.pattern)
245            .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
246
247        handler.handle(envelope.payload, ctx).await
248    }
249
250    pub async fn dispatch_event(
251        &self,
252        envelope: EventEnvelope,
253        ctx: MicroserviceContext,
254    ) -> Result<()> {
255        let handler = self
256            .event_handlers
257            .get(&envelope.pattern)
258            .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
259
260        handler.handle(envelope.payload, ctx).await
261    }
262}
263
264#[derive(Default)]
265pub struct MicroserviceRegistryBuilder {
266    message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
267    event_handlers: HashMap<String, Arc<dyn EventHandler>>,
268}
269
270impl MicroserviceRegistryBuilder {
271    pub fn message<Payload, Response, Handler, Fut>(
272        mut self,
273        pattern: impl Into<String>,
274        handler: Handler,
275    ) -> Self
276    where
277        Payload: DeserializeOwned + Send + 'static,
278        Response: Serialize + Send + 'static,
279        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
280        Fut: Future<Output = Result<Response>> + Send + 'static,
281    {
282        self.message_handlers.insert(
283            pattern.into(),
284            Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
285                handler,
286                _marker: std::marker::PhantomData,
287            }),
288        );
289        self
290    }
291
292    pub fn event<Payload, Handler, Fut>(
293        mut self,
294        pattern: impl Into<String>,
295        handler: Handler,
296    ) -> Self
297    where
298        Payload: DeserializeOwned + Send + 'static,
299        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
300        Fut: Future<Output = Result<()>> + Send + 'static,
301    {
302        self.event_handlers.insert(
303            pattern.into(),
304            Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
305                handler,
306                _marker: std::marker::PhantomData,
307            }),
308        );
309        self
310    }
311
312    pub fn build(self) -> MicroserviceRegistry {
313        MicroserviceRegistry {
314            message_handlers: Arc::new(self.message_handlers),
315            event_handlers: Arc::new(self.event_handlers),
316        }
317    }
318}
319
320#[derive(Clone)]
321pub struct InProcessMicroserviceClient {
322    container: Container,
323    registry: MicroserviceRegistry,
324    transport: Arc<str>,
325    metadata: TransportMetadata,
326}
327
328impl InProcessMicroserviceClient {
329    pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
330        Self {
331            container,
332            registry,
333            transport: Arc::<str>::from("in-process"),
334            metadata: TransportMetadata::default(),
335        }
336    }
337
338    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
339        self.transport = Arc::<str>::from(transport.into());
340        self
341    }
342
343    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
344        self.metadata = metadata;
345        self
346    }
347
348    fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
349        MicroserviceContext::new(
350            self.container.clone(),
351            self.transport.to_string(),
352            pattern,
353            self.metadata.clone(),
354        )
355    }
356}
357
358impl MicroserviceClient for InProcessMicroserviceClient {
359    fn send<Payload, Response>(
360        &self,
361        pattern: impl Into<String>,
362        payload: Payload,
363    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
364    where
365        Payload: Serialize + Send + 'static,
366        Response: DeserializeOwned + Send + 'static,
367    {
368        let registry = self.registry.clone();
369        let pattern = pattern.into();
370        let envelope = MessageEnvelope::new(pattern.clone(), payload);
371        let context = self.context(pattern);
372
373        Box::pin(async move {
374            let response = registry.dispatch_message(envelope?, context).await?;
375            serde_json::from_value(response).context("Failed to deserialize microservice response")
376        })
377    }
378
379    fn emit<Payload>(
380        &self,
381        pattern: impl Into<String>,
382        payload: Payload,
383    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
384    where
385        Payload: Serialize + Send + 'static,
386    {
387        let registry = self.registry.clone();
388        let pattern = pattern.into();
389        let envelope = EventEnvelope::new(pattern.clone(), payload);
390        let context = self.context(pattern);
391
392        Box::pin(async move { registry.dispatch_event(envelope?, context).await })
393    }
394}