1use std::{
2 collections::{BTreeMap, HashMap},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use nestforge_core::{AuthIdentity, Container, RequestId};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::Value;
12
13type MessageFuture = Pin<Box<dyn Future<Output = Result<Value>> + Send>>;
14type EventFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
15
16pub trait MicroserviceClient: Send + Sync + 'static {
32 fn send<Payload, Response>(
43 &self,
44 pattern: impl Into<String>,
45 payload: Payload,
46 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
47 where
48 Payload: Serialize + Send + 'static,
49 Response: DeserializeOwned + Send + 'static;
50
51 fn emit<Payload>(
62 &self,
63 pattern: impl Into<String>,
64 payload: Payload,
65 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
66 where
67 Payload: Serialize + Send + 'static;
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
77pub struct TransportMetadata {
78 pub values: BTreeMap<String, String>,
79}
80
81impl TransportMetadata {
82 pub fn new() -> Self {
86 Self::default()
87 }
88
89 pub fn insert(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
99 self.values.insert(key.into(), value.into());
100 self
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct MessageEnvelope {
112 pub pattern: String,
113 pub payload: Value,
114 #[serde(default)]
115 pub metadata: TransportMetadata,
116}
117
118impl MessageEnvelope {
119 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
127 Ok(Self {
128 pattern: pattern.into(),
129 payload: serde_json::to_value(payload)
130 .context("Failed to serialize message payload")?,
131 metadata: TransportMetadata::default(),
132 })
133 }
134
135 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
139 self.metadata = metadata;
140 self
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct EventEnvelope {
152 pub pattern: String,
153 pub payload: Value,
154 #[serde(default)]
155 pub metadata: TransportMetadata,
156}
157
158impl EventEnvelope {
159 pub fn new(pattern: impl Into<String>, payload: impl Serialize) -> Result<Self> {
167 Ok(Self {
168 pattern: pattern.into(),
169 payload: serde_json::to_value(payload).context("Failed to serialize event payload")?,
170 metadata: TransportMetadata::default(),
171 })
172 }
173
174 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
178 self.metadata = metadata;
179 self
180 }
181}
182
183#[derive(Clone)]
190pub struct MicroserviceContext {
191 container: Container,
192 transport: Arc<str>,
193 pattern: Arc<str>,
194 metadata: TransportMetadata,
195 request_id: Option<RequestId>,
196 auth_identity: Option<AuthIdentity>,
197}
198
199impl MicroserviceContext {
200 pub fn new(
210 container: Container,
211 transport: impl Into<String>,
212 pattern: impl Into<String>,
213 metadata: TransportMetadata,
214 ) -> Self {
215 let request_id = container
216 .resolve::<RequestId>()
217 .ok()
218 .map(|value| (*value).clone());
219 let auth_identity = container
220 .resolve::<AuthIdentity>()
221 .ok()
222 .map(|value| (*value).clone());
223
224 Self {
225 container,
226 transport: Arc::<str>::from(transport.into()),
227 pattern: Arc::<str>::from(pattern.into()),
228 metadata,
229 request_id,
230 auth_identity,
231 }
232 }
233
234 pub fn container(&self) -> &Container {
238 &self.container
239 }
240
241 pub fn transport(&self) -> &str {
245 &self.transport
246 }
247
248 pub fn pattern(&self) -> &str {
252 &self.pattern
253 }
254
255 pub fn metadata(&self) -> &TransportMetadata {
259 &self.metadata
260 }
261
262 pub fn request_id(&self) -> Option<&RequestId> {
266 self.request_id.as_ref()
267 }
268
269 pub fn auth_identity(&self) -> Option<&AuthIdentity> {
270 self.auth_identity.as_ref()
271 }
272
273 pub fn resolve<T>(&self) -> Result<Arc<T>>
274 where
275 T: Send + Sync + 'static,
276 {
277 self.container.resolve::<T>().map_err(anyhow::Error::new)
278 }
279}
280
281trait MessageHandler: Send + Sync + 'static {
282 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture;
283}
284
285trait EventHandler: Send + Sync + 'static {
286 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture;
287}
288
289struct TypedMessageHandler<Payload, Response, Handler, Fut> {
290 handler: Handler,
291 _marker: std::marker::PhantomData<fn(Payload, Response, Fut)>,
292}
293
294impl<Payload, Response, Handler, Fut> MessageHandler
295 for TypedMessageHandler<Payload, Response, Handler, Fut>
296where
297 Payload: DeserializeOwned + Send + 'static,
298 Response: Serialize + Send + 'static,
299 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
300 Fut: Future<Output = Result<Response>> + Send + 'static,
301{
302 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> MessageFuture {
303 let payload = serde_json::from_value::<Payload>(payload)
304 .context("Failed to deserialize message payload");
305
306 match payload {
307 Ok(payload) => {
308 let future = (self.handler)(payload, ctx);
309 Box::pin(async move {
310 let response = future.await?;
311 serde_json::to_value(response).context("Failed to serialize message response")
312 })
313 }
314 Err(err) => Box::pin(async move { Err(err) }),
315 }
316 }
317}
318
319struct TypedEventHandler<Payload, Handler, Fut> {
320 handler: Handler,
321 _marker: std::marker::PhantomData<fn(Payload, Fut)>,
322}
323
324impl<Payload, Handler, Fut> EventHandler for TypedEventHandler<Payload, Handler, Fut>
325where
326 Payload: DeserializeOwned + Send + 'static,
327 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
328 Fut: Future<Output = Result<()>> + Send + 'static,
329{
330 fn handle(&self, payload: Value, ctx: MicroserviceContext) -> EventFuture {
331 let payload = serde_json::from_value::<Payload>(payload)
332 .context("Failed to deserialize event payload");
333
334 match payload {
335 Ok(payload) => Box::pin((self.handler)(payload, ctx)),
336 Err(err) => Box::pin(async move { Err(err) }),
337 }
338 }
339}
340
341#[derive(Clone, Default)]
342pub struct MicroserviceRegistry {
343 message_handlers: Arc<HashMap<String, Arc<dyn MessageHandler>>>,
344 event_handlers: Arc<HashMap<String, Arc<dyn EventHandler>>>,
345}
346
347impl MicroserviceRegistry {
348 pub fn builder() -> MicroserviceRegistryBuilder {
349 MicroserviceRegistryBuilder::default()
350 }
351
352 pub async fn dispatch_message(
353 &self,
354 envelope: MessageEnvelope,
355 ctx: MicroserviceContext,
356 ) -> Result<Value> {
357 let handler = self
358 .message_handlers
359 .get(&envelope.pattern)
360 .with_context(|| format!("No message handler registered for `{}`", envelope.pattern))?;
361
362 handler.handle(envelope.payload, ctx).await
363 }
364
365 pub async fn dispatch_event(
366 &self,
367 envelope: EventEnvelope,
368 ctx: MicroserviceContext,
369 ) -> Result<()> {
370 let handler = self
371 .event_handlers
372 .get(&envelope.pattern)
373 .with_context(|| format!("No event handler registered for `{}`", envelope.pattern))?;
374
375 handler.handle(envelope.payload, ctx).await
376 }
377}
378
379#[derive(Default)]
380pub struct MicroserviceRegistryBuilder {
381 message_handlers: HashMap<String, Arc<dyn MessageHandler>>,
382 event_handlers: HashMap<String, Arc<dyn EventHandler>>,
383}
384
385impl MicroserviceRegistryBuilder {
386 pub fn message<Payload, Response, Handler, Fut>(
387 mut self,
388 pattern: impl Into<String>,
389 handler: Handler,
390 ) -> Self
391 where
392 Payload: DeserializeOwned + Send + 'static,
393 Response: Serialize + Send + 'static,
394 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
395 Fut: Future<Output = Result<Response>> + Send + 'static,
396 {
397 self.message_handlers.insert(
398 pattern.into(),
399 Arc::new(TypedMessageHandler::<Payload, Response, Handler, Fut> {
400 handler,
401 _marker: std::marker::PhantomData,
402 }),
403 );
404 self
405 }
406
407 pub fn event<Payload, Handler, Fut>(
408 mut self,
409 pattern: impl Into<String>,
410 handler: Handler,
411 ) -> Self
412 where
413 Payload: DeserializeOwned + Send + 'static,
414 Handler: Fn(Payload, MicroserviceContext) -> Fut + Send + Sync + 'static,
415 Fut: Future<Output = Result<()>> + Send + 'static,
416 {
417 self.event_handlers.insert(
418 pattern.into(),
419 Arc::new(TypedEventHandler::<Payload, Handler, Fut> {
420 handler,
421 _marker: std::marker::PhantomData,
422 }),
423 );
424 self
425 }
426
427 pub fn build(self) -> MicroserviceRegistry {
428 MicroserviceRegistry {
429 message_handlers: Arc::new(self.message_handlers),
430 event_handlers: Arc::new(self.event_handlers),
431 }
432 }
433}
434
435#[derive(Clone)]
436pub struct InProcessMicroserviceClient {
437 container: Container,
438 registry: MicroserviceRegistry,
439 transport: Arc<str>,
440 metadata: TransportMetadata,
441}
442
443impl InProcessMicroserviceClient {
444 pub fn new(container: Container, registry: MicroserviceRegistry) -> Self {
445 Self {
446 container,
447 registry,
448 transport: Arc::<str>::from("in-process"),
449 metadata: TransportMetadata::default(),
450 }
451 }
452
453 pub fn with_transport(mut self, transport: impl Into<String>) -> Self {
454 self.transport = Arc::<str>::from(transport.into());
455 self
456 }
457
458 pub fn with_metadata(mut self, metadata: TransportMetadata) -> Self {
459 self.metadata = metadata;
460 self
461 }
462
463 fn context(&self, pattern: impl Into<String>) -> MicroserviceContext {
464 MicroserviceContext::new(
465 self.container.clone(),
466 self.transport.to_string(),
467 pattern,
468 self.metadata.clone(),
469 )
470 }
471}
472
473impl MicroserviceClient for InProcessMicroserviceClient {
474 fn send<Payload, Response>(
475 &self,
476 pattern: impl Into<String>,
477 payload: Payload,
478 ) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>>
479 where
480 Payload: Serialize + Send + 'static,
481 Response: DeserializeOwned + Send + 'static,
482 {
483 let registry = self.registry.clone();
484 let pattern = pattern.into();
485 let envelope = MessageEnvelope::new(pattern.clone(), payload);
486 let context = self.context(pattern);
487
488 Box::pin(async move {
489 let response = registry.dispatch_message(envelope?, context).await?;
490 serde_json::from_value(response).context("Failed to deserialize microservice response")
491 })
492 }
493
494 fn emit<Payload>(
495 &self,
496 pattern: impl Into<String>,
497 payload: Payload,
498 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
499 where
500 Payload: Serialize + Send + 'static,
501 {
502 let registry = self.registry.clone();
503 let pattern = pattern.into();
504 let envelope = EventEnvelope::new(pattern.clone(), payload);
505 let context = self.context(pattern);
506
507 Box::pin(async move { registry.dispatch_event(envelope?, context).await })
508 }
509}