Skip to main content

nestrs_microservices/
wire.rs

1//! Shared JSON wire format for Redis, Kafka, MQTT, RabbitMQ, custom transporters, and the JSON
2//! payloads carried by the gRPC adapter (`nestrs.microservices` proto).
3//!
4//! ## Format stability
5//!
6//! The JSON shapes are covered by **golden tests** in this crate (`tests/wire_conformance.rs` +
7//! `tests/fixtures/*.json`) and the **`wire_json`** **`cargo-fuzz`** target (`fuzz/fuzz_targets/wire_json.rs`).
8//! Bump [`WIRE_FORMAT_DOC_REVISION`] when you intentionally change fields or serialization so release
9//! notes can call out wire compatibility.
10//!
11//! Revision **`1`**: `WireKind` as snake_case strings (`send`, `emit`); `WireRequest` with optional
12//! `reply` and `correlation_id`; `WireResponse` with `ok`, optional `payload`, optional `error`
13//! (`message` + optional `details`).
14
/// Revision number of the documented wire format, intended for release notes and external
/// integrators. It is informational only and is never transmitted on the wire.
pub const WIRE_FORMAT_DOC_REVISION: u32 = 1;
17
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::sync::Arc;
21
22use crate::{MicroserviceHandler, TransportError};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum WireKind {
27    Send,
28    Emit,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct WireRequest {
33    pub kind: WireKind,
34    pub pattern: String,
35    pub payload: Value,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub reply: Option<String>,
38    /// Kafka (and similar) request–reply: reply record key for demux on a shared reply topic.
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub correlation_id: Option<String>,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct WireError {
45    pub message: String,
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub details: Option<Value>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct WireResponse {
52    pub ok: bool,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub payload: Option<Value>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub error: Option<WireError>,
57}
58
59pub async fn dispatch_send(
60    handlers: &[Arc<dyn MicroserviceHandler>],
61    pattern: &str,
62    payload: Value,
63) -> Result<Value, TransportError> {
64    for h in handlers {
65        if let Some(res) = h.handle_message(pattern, payload.clone()).await {
66            return res;
67        }
68    }
69    Err(TransportError::new(format!(
70        "no microservice handler for pattern `{pattern}`"
71    )))
72}
73
74pub async fn dispatch_emit(
75    handlers: &[Arc<dyn MicroserviceHandler>],
76    pattern: &str,
77    payload: Value,
78) {
79    for h in handlers {
80        let _ = h.handle_event(pattern, payload.clone()).await;
81    }
82}