1use std::{
2 collections::{BTreeMap, HashMap},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub trait MicroserviceClient: Send + Sync + 'static {
17 fn send<Payload, Response>(
18 &self,
19 pattern: impl Into<String>,
20 payload: Payload,
21 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
22 where
23 Payload: Serialize + Send + 'static,
24 Response: DeserializeOwned + Send + 'static;
25
26 fn emit<Payload>(
27 &self,
28 pattern: impl Into<String>,
29 payload: Payload,
30 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
31 where
32 Payload: Serialize + Send + 'static;
33}
34
35#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
36pub struct TransportMetadata {
37 pub values: BTreeMap<String, String>,
38}
39
40impl TransportMetadata {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
46 self.values.insert(key.into(), value.into());
47 self
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct MessageEnvelope {
53 pub pattern: String,
54 pub payload: Value,
55 #[serde(default)]
56 pub metadata: TransportMetadata,
57}
58
59impl MessageEnvelope {
60 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
61 Ok(Self {
62 pattern: pattern.into(),
63 payload: serde_json::to_value(payload).context("Failed to serialize message payload")?,
64 metadata: TransportMetadata::default(),
65 })
66 }
67
68 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
69 self.metadata = metadata;
70 self
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct EventEnvelope {
76 pub pattern: String,
77 pub payload: Value,
78 #[serde(default)]
79 pub metadata: TransportMetadata,
80}
81
82impl EventEnvelope {
83 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
84 Ok(Self {
85 pattern: pattern.into(),
86 payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
87 metadata: TransportMetadata::default(),
88 })
89 }
90
91 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
92 self.metadata = metadata;
93 self
94 }
95}
96
97#[derive(Clone)]
98pub struct MicroserviceContext {
99 container: Container,
100 transport: Arc<str>,
101 pattern: Arc<str>,
102 metadata: TransportMetadata,
103 request_id: Option<RequestId>,
104 auth_identity: Option<AuthIdentity>,
105}
106
107impl MicroserviceContext {
108 pub fn new(
109 container: Container,
110 transport: impl Into<String>,
111 pattern: impl Into<String>,
112 metadata: TransportMetadata,
113 ) -> Self {
114 let request_id = container.resolve::<RequestId>().ok().map(|value| (*value).clone());
115 let auth_identity = container
116 .resolve::<AuthIdentity>()
117 .ok()
118 .map(|value| (*value).clone());
119
120 Self {
121 container,
122 transport: Arc::<str>::from(transport.into()),
123 pattern: Arc::<str>::from(pattern.into()),
124 metadata,
125 request_id,
126 auth_identity,
127 }
128 }
129
130 pub fn container(&self) -> &Container {
131 &self.container
132 }
133
134 pub fn transport(&self) -> &str {
135 &self.transport
136 }
137
138 pub fn pattern(&self) -> &str {
139 &self.pattern
140 }
141
142 pub fn metadata(&self) -> &TransportMetadata {
143 &self.metadata
144 }
145
146 pub fn request_id(&self) -> Option<&RequestId> {
147 self.request_id.as_ref()
148 }
149
150 pub fn auth_identity(&self) -> Option<&AuthIdentity> {
151 self.auth_identity.as_ref()
152 }
153
154 pub fn resolve<T>(&self) -> Result<Arc<T>>
155 where
156 T: Send + Sync + 'static,
157 {
158 self.container.resolve::<T>().map_err(anyhow::Error::new)
159 }
160}
161
162trait MessageHandler: Send + Sync + 'static {
163 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
164}
165
166trait EventHandler: Send + Sync + 'static {
167 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
168}
169
170struct TypedMessageHandler<Payload, Response, Handler, Fut> {
171 handler: Handler,
172 _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
173}
174
175impl<Payload, Response, Handler, Fut> MessageHandler
176 for TypedMessageHandler<Payload, Response, Handler, Fut>
177where
178 Payload: DeserializeOwned + Send + 'static,
179 Response: Serialize + Send + 'static,
180 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
181 Fut: Future<Output = Result<Response>> + Send + 'static,
182{
183 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
184 let payload = serde_json::from_value::<Payload>(payload)
185 .context("Failed to deserialize message payload");
186
187 match payload {
188 Ok(payload) => {
189 let future = (self.handler)(payload, ctx);
190 Box::pin(async move {
191 let response = future.await?;
192 serde_json::to_value(response).context("Failed to serialize message response")
193 })
194 }
195 Err(err) => Box::pin(async move { Err(err) }),
196 }
197 }
198}
199
200struct TypedEventHandler<Payload, Handler, Fut> {
201 handler: Handler,
202 _marker: std::marker::PhantomData<fn(Payload, Fut)>,
203}
204
205impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
206where
207 Payload: DeserializeOwned + Send + 'static,
208 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
209 Fut: Future<Output = Result<()>> + Send + 'static,
210{
211 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
212 let payload = serde_json::from_value::<Payload>(payload)
213 .context("Failed to deserialize event payload");
214
215 match payload {
216 Ok(payload) => Box::pin((self.handler)(payload, ctx)),
217 Err(err) => Box::pin(async move { Err(err) }),
218 }
219 }
220}
221
222#[derive(Clone, Default)]
223pub struct MicroserviceRegistry {
224 message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
225 event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
226}
227
228impl MicroserviceRegistry {
229 pub fn builder() -> MicroserviceRegistryBuilder {
230 MicroserviceRegistryBuilder::default()
231 }
232
233 pub async fn dispatch_message(
234 &self,
235 envelope: MessageEnvelope,
236 ctx: MicroserviceContext,
237 ) -> Result<Value> {
238 let handler = self
239 .message_handlers
240 .get(&envelope.pattern)
241 .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
242
243 handler.handle(envelope.payload, ctx).await
244 }
245
246 pub async fn dispatch_event(
247 &self,
248 envelope: EventEnvelope,
249 ctx: MicroserviceContext,
250 ) -> Result<()> {
251 let handler = self
252 .event_handlers
253 .get(&envelope.pattern)
254 .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
255
256 handler.handle(envelope.payload, ctx).await
257 }
258}
259
260#[derive(Default)]
261pub struct MicroserviceRegistryBuilder {
262 message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
263 event_handlers: HashMap<String, Arc<dyn EventHandler>>,
264}
265
266impl MicroserviceRegistryBuilder {
267 pub fn message<Payload, Response, Handler, Fut>(
268 mut self,
269 pattern: impl Into<String>,
270 handler: Handler,
271 ) -> Self
272 where
273 Payload: DeserializeOwned + Send + 'static,
274 Response: Serialize + Send + 'static,
275 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
276 Fut: Future<Output = Result<Response>> + Send + 'static,
277 {
278 self.message_handlers.insert(
279 pattern.into(),
280 Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
281 handler,
282 _marker: std::marker::PhantomData,
283 }),
284 );
285 self
286 }
287
288 pub fn event<Payload, Handler, Fut>(
289 mut self,
290 pattern: impl Into<String>,
291 handler: Handler,
292 ) -> Self
293 where
294 Payload: DeserializeOwned + Send + 'static,
295 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
296 Fut: Future<Output = Result<()>> + Send + 'static,
297 {
298 self.event_handlers.insert(
299 pattern.into(),
300 Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
301 handler,
302 _marker: std::marker::PhantomData,
303 }),
304 );
305 self
306 }
307
308 pub fn build(self) -> MicroserviceRegistry {
309 MicroserviceRegistry {
310 message_handlers: Arc::new(self.message_handlers),
311 event_handlers: Arc::new(self.event_handlers),
312 }
313 }
314}
315
316#[derive(Clone)]
317pub struct InProcessMicroserviceClient {
318 container: Container,
319 registry: MicroserviceRegistry,
320 transport: Arc<str>,
321 metadata: TransportMetadata,
322}
323
324impl InProcessMicroserviceClient {
325 pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
326 Self {
327 container,
328 registry,
329 transport: Arc::<str>::from("in-process"),
330 metadata: TransportMetadata::default(),
331 }
332 }
333
334 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
335 self.transport = Arc::<str>::from(transport.into());
336 self
337 }
338
339 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
340 self.metadata = metadata;
341 self
342 }
343
344 fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
345 MicroserviceContext::new(
346 self.container.clone(),
347 self.transport.to_string(),
348 pattern,
349 self.metadata.clone(),
350 )
351 }
352}
353
354impl MicroserviceClient for InProcessMicroserviceClient {
355 fn send<Payload, Response>(
356 &self,
357 pattern: impl Into<String>,
358 payload: Payload,
359 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
360 where
361 Payload: Serialize + Send + 'static,
362 Response: DeserializeOwned + Send + 'static,
363 {
364 let registry = self.registry.clone();
365 let pattern = pattern.into();
366 let envelope = MessageEnvelope::new(pattern.clone(), payload);
367 let context = self.context(pattern);
368
369 Box::pin(async move {
370 let response = registry
371 .dispatch_message(envelope?, context)
372 .await?;
373 serde_json::from_value(response).context("Failed to deserialize microservice response")
374 })
375 }
376
377 fn emit<Payload>(
378 &self,
379 pattern: impl Into<String>,
380 payload: Payload,
381 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
382 where
383 Payload: Serialize + Send + 'static,
384 {
385 let registry = self.registry.clone();
386 let pattern = pattern.into();
387 let envelope = EventEnvelope::new(pattern.clone(), payload);
388 let context = self.context(pattern);
389
390 Box::pin(async move { registry.dispatch_event(envelope?, context).await })
391 }
392}