1use std::{
2 collections::{BTreeMap, HashMap},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub trait MicroserviceClient: Send + Sync + 'static {
17 fn send<Payload, Response>(
18 &self,
19 pattern: impl Into<String>,
20 payload: Payload,
21 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
22 where
23 Payload: Serialize + Send + 'static,
24 Response: DeserializeOwned + Send + 'static;
25
26 fn emit<Payload>(
27 &self,
28 pattern: impl Into<String>,
29 payload: Payload,
30 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
31 where
32 Payload: Serialize + Send + 'static;
33}
34
35#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
36pub struct TransportMetadata {
37 pub values: BTreeMap<String, String>,
38}
39
40impl TransportMetadata {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
46 self.values.insert(key.into(), value.into());
47 self
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct MessageEnvelope {
53 pub pattern: String,
54 pub payload: Value,
55 #[serde(default)]
56 pub metadata: TransportMetadata,
57}
58
59impl MessageEnvelope {
60 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
61 Ok(Self {
62 pattern: pattern.into(),
63 payload: serde_json::to_value(payload)
64 .context("Failed to serialize message payload")?,
65 metadata: TransportMetadata::default(),
66 })
67 }
68
69 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
70 self.metadata = metadata;
71 self
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct EventEnvelope {
77 pub pattern: String,
78 pub payload: Value,
79 #[serde(default)]
80 pub metadata: TransportMetadata,
81}
82
83impl EventEnvelope {
84 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
85 Ok(Self {
86 pattern: pattern.into(),
87 payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
88 metadata: TransportMetadata::default(),
89 })
90 }
91
92 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
93 self.metadata = metadata;
94 self
95 }
96}
97
98#[derive(Clone)]
99pub struct MicroserviceContext {
100 container: Container,
101 transport: Arc<str>,
102 pattern: Arc<str>,
103 metadata: TransportMetadata,
104 request_id: Option<RequestId>,
105 auth_identity: Option<AuthIdentity>,
106}
107
108impl MicroserviceContext {
109 pub fn new(
110 container: Container,
111 transport: impl Into<String>,
112 pattern: impl Into<String>,
113 metadata: TransportMetadata,
114 ) -> Self {
115 let request_id = container
116 .resolve::<RequestId>()
117 .ok()
118 .map(|value| (*value).clone());
119 let auth_identity = container
120 .resolve::<AuthIdentity>()
121 .ok()
122 .map(|value| (*value).clone());
123
124 Self {
125 container,
126 transport: Arc::<str>::from(transport.into()),
127 pattern: Arc::<str>::from(pattern.into()),
128 metadata,
129 request_id,
130 auth_identity,
131 }
132 }
133
134 pub fn container(&self) -> &Container {
135 &self.container
136 }
137
138 pub fn transport(&self) -> &str {
139 &self.transport
140 }
141
142 pub fn pattern(&self) -> &str {
143 &self.pattern
144 }
145
146 pub fn metadata(&self) -> &TransportMetadata {
147 &self.metadata
148 }
149
150 pub fn request_id(&self) -> Option<&RequestId> {
151 self.request_id.as_ref()
152 }
153
154 pub fn auth_identity(&self) -> Option<&AuthIdentity> {
155 self.auth_identity.as_ref()
156 }
157
158 pub fn resolve<T>(&self) -> Result<Arc<T>>
159 where
160 T: Send + Sync + 'static,
161 {
162 self.container.resolve::<T>().map_err(anyhow::Error::new)
163 }
164}
165
166trait MessageHandler: Send + Sync + 'static {
167 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
168}
169
170trait EventHandler: Send + Sync + 'static {
171 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
172}
173
174struct TypedMessageHandler<Payload, Response, Handler, Fut> {
175 handler: Handler,
176 _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
177}
178
179impl<Payload, Response, Handler, Fut> MessageHandler
180 for TypedMessageHandler<Payload, Response, Handler, Fut>
181where
182 Payload: DeserializeOwned + Send + 'static,
183 Response: Serialize + Send + 'static,
184 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
185 Fut: Future<Output = Result<Response>> + Send + 'static,
186{
187 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
188 let payload = serde_json::from_value::<Payload>(payload)
189 .context("Failed to deserialize message payload");
190
191 match payload {
192 Ok(payload) => {
193 let future = (self.handler)(payload, ctx);
194 Box::pin(async move {
195 let response = future.await?;
196 serde_json::to_value(response).context("Failed to serialize message response")
197 })
198 }
199 Err(err) => Box::pin(async move { Err(err) }),
200 }
201 }
202}
203
204struct TypedEventHandler<Payload, Handler, Fut> {
205 handler: Handler,
206 _marker: std::marker::PhantomData<fn(Payload, Fut)>,
207}
208
209impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
210where
211 Payload: DeserializeOwned + Send + 'static,
212 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
213 Fut: Future<Output = Result<()>> + Send + 'static,
214{
215 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
216 let payload = serde_json::from_value::<Payload>(payload)
217 .context("Failed to deserialize event payload");
218
219 match payload {
220 Ok(payload) => Box::pin((self.handler)(payload, ctx)),
221 Err(err) => Box::pin(async move { Err(err) }),
222 }
223 }
224}
225
226#[derive(Clone, Default)]
227pub struct MicroserviceRegistry {
228 message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
229 event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
230}
231
232impl MicroserviceRegistry {
233 pub fn builder() -> MicroserviceRegistryBuilder {
234 MicroserviceRegistryBuilder::default()
235 }
236
237 pub async fn dispatch_message(
238 &self,
239 envelope: MessageEnvelope,
240 ctx: MicroserviceContext,
241 ) -> Result<Value> {
242 let handler = self
243 .message_handlers
244 .get(&envelope.pattern)
245 .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
246
247 handler.handle(envelope.payload, ctx).await
248 }
249
250 pub async fn dispatch_event(
251 &self,
252 envelope: EventEnvelope,
253 ctx: MicroserviceContext,
254 ) -> Result<()> {
255 let handler = self
256 .event_handlers
257 .get(&envelope.pattern)
258 .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
259
260 handler.handle(envelope.payload, ctx).await
261 }
262}
263
264#[derive(Default)]
265pub struct MicroserviceRegistryBuilder {
266 message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
267 event_handlers: HashMap<String, Arc<dyn EventHandler>>,
268}
269
270impl MicroserviceRegistryBuilder {
271 pub fn message<Payload, Response, Handler, Fut>(
272 mut self,
273 pattern: impl Into<String>,
274 handler: Handler,
275 ) -> Self
276 where
277 Payload: DeserializeOwned + Send + 'static,
278 Response: Serialize + Send + 'static,
279 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
280 Fut: Future<Output = Result<Response>> + Send + 'static,
281 {
282 self.message_handlers.insert(
283 pattern.into(),
284 Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
285 handler,
286 _marker: std::marker::PhantomData,
287 }),
288 );
289 self
290 }
291
292 pub fn event<Payload, Handler, Fut>(
293 mut self,
294 pattern: impl Into<String>,
295 handler: Handler,
296 ) -> Self
297 where
298 Payload: DeserializeOwned + Send + 'static,
299 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
300 Fut: Future<Output = Result<()>> + Send + 'static,
301 {
302 self.event_handlers.insert(
303 pattern.into(),
304 Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
305 handler,
306 _marker: std::marker::PhantomData,
307 }),
308 );
309 self
310 }
311
312 pub fn build(self) -> MicroserviceRegistry {
313 MicroserviceRegistry {
314 message_handlers: Arc::new(self.message_handlers),
315 event_handlers: Arc::new(self.event_handlers),
316 }
317 }
318}
319
320#[derive(Clone)]
321pub struct InProcessMicroserviceClient {
322 container: Container,
323 registry: MicroserviceRegistry,
324 transport: Arc<str>,
325 metadata: TransportMetadata,
326}
327
328impl InProcessMicroserviceClient {
329 pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
330 Self {
331 container,
332 registry,
333 transport: Arc::<str>::from("in-process"),
334 metadata: TransportMetadata::default(),
335 }
336 }
337
338 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
339 self.transport = Arc::<str>::from(transport.into());
340 self
341 }
342
343 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
344 self.metadata = metadata;
345 self
346 }
347
348 fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
349 MicroserviceContext::new(
350 self.container.clone(),
351 self.transport.to_string(),
352 pattern,
353 self.metadata.clone(),
354 )
355 }
356}
357
358impl MicroserviceClient for InProcessMicroserviceClient {
359 fn send<Payload, Response>(
360 &self,
361 pattern: impl Into<String>,
362 payload: Payload,
363 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
364 where
365 Payload: Serialize + Send + 'static,
366 Response: DeserializeOwned + Send + 'static,
367 {
368 let registry = self.registry.clone();
369 let pattern = pattern.into();
370 let envelope = MessageEnvelope::new(pattern.clone(), payload);
371 let context = self.context(pattern);
372
373 Box::pin(async move {
374 let response = registry.dispatch_message(envelope?, context).await?;
375 serde_json::from_value(response).context("Failed to deserialize microservice response")
376 })
377 }
378
379 fn emit<Payload>(
380 &self,
381 pattern: impl Into<String>,
382 payload: Payload,
383 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
384 where
385 Payload: Serialize + Send + 'static,
386 {
387 let registry = self.registry.clone();
388 let pattern = pattern.into();
389 let envelope = EventEnvelope::new(pattern.clone(), payload);
390 let context = self.context(pattern);
391
392 Box::pin(async move { registry.dispatch_event(envelope?, context).await })
393 }
394}