Skip to main content

nestforge_microservices/
lib.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub trait MicroserviceClient: Send + Sync + 'static {
17    fn send<Payload, Response>(
18        &self,
19        pattern: impl Into<String>,
20        payload: Payload,
21    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
22    where
23        Payload: Serialize + Send + 'static,
24        Response: DeserializeOwned + Send + 'static;
25
26    fn emit<Payload>(
27        &self,
28        pattern: impl Into<String>,
29        payload: Payload,
30    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
31    where
32        Payload: Serialize + Send + 'static;
33}
34
35#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
36pub struct TransportMetadata {
37    pub values: BTreeMap<String, String>,
38}
39
40impl TransportMetadata {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
46        self.values.insert(key.into(), value.into());
47        self
48    }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct MessageEnvelope {
53    pub pattern: String,
54    pub payload: Value,
55    #[serde(default)]
56    pub metadata: TransportMetadata,
57}
58
59impl MessageEnvelope {
60    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
61        Ok(Self {
62            pattern: pattern.into(),
63            payload: serde_json::to_value(payload).context("Failed to serialize message payload")?,
64            metadata: TransportMetadata::default(),
65        })
66    }
67
68    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
69        self.metadata = metadata;
70        self
71    }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct EventEnvelope {
76    pub pattern: String,
77    pub payload: Value,
78    #[serde(default)]
79    pub metadata: TransportMetadata,
80}
81
82impl EventEnvelope {
83    pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
84        Ok(Self {
85            pattern: pattern.into(),
86            payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
87            metadata: TransportMetadata::default(),
88        })
89    }
90
91    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
92        self.metadata = metadata;
93        self
94    }
95}
96
97#[derive(Clone)]
98pub struct MicroserviceContext {
99    container: Container,
100    transport: Arc<str>,
101    pattern: Arc<str>,
102    metadata: TransportMetadata,
103    request_id: Option<RequestId>,
104    auth_identity: Option<AuthIdentity>,
105}
106
107impl MicroserviceContext {
108    pub fn new(
109        container: Container,
110        transport: impl Into<String>,
111        pattern: impl Into<String>,
112        metadata: TransportMetadata,
113    ) -> Self {
114        let request_id = container.resolve::<RequestId>().ok().map(|value| (*value).clone());
115        let auth_identity = container
116            .resolve::<AuthIdentity>()
117            .ok()
118            .map(|value| (*value).clone());
119
120        Self {
121            container,
122            transport: Arc::<str>::from(transport.into()),
123            pattern: Arc::<str>::from(pattern.into()),
124            metadata,
125            request_id,
126            auth_identity,
127        }
128    }
129
130    pub fn container(&self) -> &Container {
131        &self.container
132    }
133
134    pub fn transport(&self) -> &str {
135        &self.transport
136    }
137
138    pub fn pattern(&self) -> &str {
139        &self.pattern
140    }
141
142    pub fn metadata(&self) -> &TransportMetadata {
143        &self.metadata
144    }
145
146    pub fn request_id(&self) -> Option<&RequestId> {
147        self.request_id.as_ref()
148    }
149
150    pub fn auth_identity(&self) -> Option<&AuthIdentity> {
151        self.auth_identity.as_ref()
152    }
153
154    pub fn resolve<T>(&self) -> Result<Arc<T>>
155    where
156        T: Send + Sync + 'static,
157    {
158        self.container.resolve::<T>().map_err(anyhow::Error::new)
159    }
160}
161
162trait MessageHandler: Send + Sync + 'static {
163    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
164}
165
166trait EventHandler: Send + Sync + 'static {
167    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
168}
169
170struct TypedMessageHandler<Payload, Response, Handler, Fut> {
171    handler: Handler,
172    _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
173}
174
175impl<Payload, Response, Handler, Fut> MessageHandler
176    for TypedMessageHandler<Payload, Response, Handler, Fut>
177where
178    Payload: DeserializeOwned + Send + 'static,
179    Response: Serialize + Send + 'static,
180    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
181    Fut: Future<Output = Result<Response>> + Send + 'static,
182{
183    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
184        let payload = serde_json::from_value::<Payload>(payload)
185            .context("Failed to deserialize message payload");
186
187        match payload {
188            Ok(payload) => {
189                let future = (self.handler)(payload, ctx);
190                Box::pin(async move {
191                    let response = future.await?;
192                    serde_json::to_value(response).context("Failed to serialize message response")
193                })
194            }
195            Err(err) => Box::pin(async move { Err(err) }),
196        }
197    }
198}
199
200struct TypedEventHandler<Payload, Handler, Fut> {
201    handler: Handler,
202    _marker: std::marker::PhantomData<fn(Payload, Fut)>,
203}
204
205impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
206where
207    Payload: DeserializeOwned + Send + 'static,
208    Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
209    Fut: Future<Output = Result<()>> + Send + 'static,
210{
211    fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
212        let payload = serde_json::from_value::<Payload>(payload)
213            .context("Failed to deserialize event payload");
214
215        match payload {
216            Ok(payload) => Box::pin((self.handler)(payload, ctx)),
217            Err(err) => Box::pin(async move { Err(err) }),
218        }
219    }
220}
221
222#[derive(Clone, Default)]
223pub struct MicroserviceRegistry {
224    message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
225    event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
226}
227
228impl MicroserviceRegistry {
229    pub fn builder() -> MicroserviceRegistryBuilder {
230        MicroserviceRegistryBuilder::default()
231    }
232
233    pub async fn dispatch_message(
234        &self,
235        envelope: MessageEnvelope,
236        ctx: MicroserviceContext,
237    ) -> Result<Value> {
238        let handler = self
239            .message_handlers
240            .get(&envelope.pattern)
241            .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
242
243        handler.handle(envelope.payload, ctx).await
244    }
245
246    pub async fn dispatch_event(
247        &self,
248        envelope: EventEnvelope,
249        ctx: MicroserviceContext,
250    ) -> Result<()> {
251        let handler = self
252            .event_handlers
253            .get(&envelope.pattern)
254            .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
255
256        handler.handle(envelope.payload, ctx).await
257    }
258}
259
260#[derive(Default)]
261pub struct MicroserviceRegistryBuilder {
262    message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
263    event_handlers: HashMap<String, Arc<dyn EventHandler>>,
264}
265
266impl MicroserviceRegistryBuilder {
267    pub fn message<Payload, Response, Handler, Fut>(
268        mut self,
269        pattern: impl Into<String>,
270        handler: Handler,
271    ) -> Self
272    where
273        Payload: DeserializeOwned + Send + 'static,
274        Response: Serialize + Send + 'static,
275        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
276        Fut: Future<Output = Result<Response>> + Send + 'static,
277    {
278        self.message_handlers.insert(
279            pattern.into(),
280            Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
281                handler,
282                _marker: std::marker::PhantomData,
283            }),
284        );
285        self
286    }
287
288    pub fn event<Payload, Handler, Fut>(
289        mut self,
290        pattern: impl Into<String>,
291        handler: Handler,
292    ) -> Self
293    where
294        Payload: DeserializeOwned + Send + 'static,
295        Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
296        Fut: Future<Output = Result<()>> + Send + 'static,
297    {
298        self.event_handlers.insert(
299            pattern.into(),
300            Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
301                handler,
302                _marker: std::marker::PhantomData,
303            }),
304        );
305        self
306    }
307
308    pub fn build(self) -> MicroserviceRegistry {
309        MicroserviceRegistry {
310            message_handlers: Arc::new(self.message_handlers),
311            event_handlers: Arc::new(self.event_handlers),
312        }
313    }
314}
315
316#[derive(Clone)]
317pub struct InProcessMicroserviceClient {
318    container: Container,
319    registry: MicroserviceRegistry,
320    transport: Arc<str>,
321    metadata: TransportMetadata,
322}
323
324impl InProcessMicroserviceClient {
325    pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
326        Self {
327            container,
328            registry,
329            transport: Arc::<str>::from("in-process"),
330            metadata: TransportMetadata::default(),
331        }
332    }
333
334    pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
335        self.transport = Arc::<str>::from(transport.into());
336        self
337    }
338
339    pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
340        self.metadata = metadata;
341        self
342    }
343
344    fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
345        MicroserviceContext::new(
346            self.container.clone(),
347            self.transport.to_string(),
348            pattern,
349            self.metadata.clone(),
350        )
351    }
352}
353
354impl MicroserviceClient for InProcessMicroserviceClient {
355    fn send<Payload, Response>(
356        &self,
357        pattern: impl Into<String>,
358        payload: Payload,
359    ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
360    where
361        Payload: Serialize + Send + 'static,
362        Response: DeserializeOwned + Send + 'static,
363    {
364        let registry = self.registry.clone();
365        let pattern = pattern.into();
366        let envelope = MessageEnvelope::new(pattern.clone(), payload);
367        let context = self.context(pattern);
368
369        Box::pin(async move {
370            let response = registry
371                .dispatch_message(envelope?, context)
372                .await?;
373            serde_json::from_value(response).context("Failed to deserialize microservice response")
374        })
375    }
376
377    fn emit<Payload>(
378        &self,
379        pattern: impl Into<String>,
380        payload: Payload,
381    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
382    where
383        Payload: Serialize + Send + 'static,
384    {
385        let registry = self.registry.clone();
386        let pattern = pattern.into();
387        let envelope = EventEnvelope::new(pattern.clone(), payload);
388        let context = self.context(pattern);
389
390        Box::pin(async move { registry.dispatch_event(envelope?, context).await })
391    }
392}