Skip to main content

nestrs_microservices/
tcp.rs

1use crate::{MicroserviceHandler, Transport, TransportError};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::{TcpListener, TcpStream};
9
10#[derive(Clone, Debug)]
11pub struct TcpTransportOptions {
12    pub addr: SocketAddr,
13}
14
15impl TcpTransportOptions {
16    pub fn new(addr: SocketAddr) -> Self {
17        Self { addr }
18    }
19}
20
21/// Simple JSON-over-TCP transport (NestJS `Transport.TCP` analogue).
22///
23/// Wire format: newline-delimited JSON.
24#[derive(Clone)]
25pub struct TcpTransport {
26    options: TcpTransportOptions,
27    seq: Arc<AtomicU64>,
28}
29
30impl TcpTransport {
31    pub fn new(options: TcpTransportOptions) -> Self {
32        Self {
33            options,
34            seq: Arc::new(AtomicU64::new(1)),
35        }
36    }
37
38    fn next_id(&self) -> String {
39        self.seq.fetch_add(1, Ordering::Relaxed).to_string()
40    }
41}
42
43#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45enum PacketKind {
46    Send,
47    Emit,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51struct MicroserviceRequest {
52    id: String,
53    kind: PacketKind,
54    pattern: String,
55    payload: serde_json::Value,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59struct MicroserviceErrorPayload {
60    message: String,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    details: Option<serde_json::Value>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66struct MicroserviceResponse {
67    id: String,
68    ok: bool,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    payload: Option<serde_json::Value>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    error: Option<MicroserviceErrorPayload>,
73}
74
75#[async_trait]
76impl Transport for TcpTransport {
77    async fn send_json(
78        &self,
79        pattern: &str,
80        payload: serde_json::Value,
81    ) -> Result<serde_json::Value, TransportError> {
82        let id = self.next_id();
83        let req = MicroserviceRequest {
84            id: id.clone(),
85            kind: PacketKind::Send,
86            pattern: pattern.to_string(),
87            payload,
88        };
89
90        let mut stream = TcpStream::connect(self.options.addr)
91            .await
92            .map_err(|e| TransportError::new(format!("tcp transport connect failed: {e}")))?;
93
94        let line = serde_json::to_string(&req)
95            .map_err(|e| TransportError::new(format!("serialize request failed: {e}")))?;
96        stream
97            .write_all(line.as_bytes())
98            .await
99            .map_err(|e| TransportError::new(format!("write request failed: {e}")))?;
100        stream
101            .write_all(b"\n")
102            .await
103            .map_err(|e| TransportError::new(format!("write request newline failed: {e}")))?;
104        stream
105            .flush()
106            .await
107            .map_err(|e| TransportError::new(format!("flush request failed: {e}")))?;
108
109        let mut reader = BufReader::new(stream);
110        let mut resp_line = String::new();
111        let n = reader
112            .read_line(&mut resp_line)
113            .await
114            .map_err(|e| TransportError::new(format!("read response failed: {e}")))?;
115        if n == 0 {
116            return Err(TransportError::new("tcp transport: empty response"));
117        }
118        let resp: MicroserviceResponse = serde_json::from_str(resp_line.trim_end_matches('\n'))
119            .map_err(|e| TransportError::new(format!("deserialize response failed: {e}")))?;
120        if resp.id != id {
121            return Err(TransportError::new("tcp transport: response id mismatch"));
122        }
123        if resp.ok {
124            Ok(resp.payload.unwrap_or(serde_json::Value::Null))
125        } else {
126            let mut err = TransportError::new(
127                resp.error
128                    .as_ref()
129                    .map(|e| e.message.as_str())
130                    .unwrap_or("microservice error"),
131            );
132            if let Some(details) = resp.error.and_then(|e| e.details) {
133                err = err.with_details(details);
134            }
135            Err(err)
136        }
137    }
138
139    async fn emit_json(
140        &self,
141        pattern: &str,
142        payload: serde_json::Value,
143    ) -> Result<(), TransportError> {
144        let id = self.next_id();
145        let req = MicroserviceRequest {
146            id,
147            kind: PacketKind::Emit,
148            pattern: pattern.to_string(),
149            payload,
150        };
151
152        let mut stream = TcpStream::connect(self.options.addr)
153            .await
154            .map_err(|e| TransportError::new(format!("tcp transport connect failed: {e}")))?;
155
156        let line = serde_json::to_string(&req)
157            .map_err(|e| TransportError::new(format!("serialize event failed: {e}")))?;
158        stream
159            .write_all(line.as_bytes())
160            .await
161            .map_err(|e| TransportError::new(format!("write event failed: {e}")))?;
162        stream
163            .write_all(b"\n")
164            .await
165            .map_err(|e| TransportError::new(format!("write event newline failed: {e}")))?;
166        stream
167            .flush()
168            .await
169            .map_err(|e| TransportError::new(format!("flush event failed: {e}")))?;
170        Ok(())
171    }
172}
173
174#[derive(Clone, Debug)]
175pub struct TcpMicroserviceOptions {
176    pub addr: SocketAddr,
177}
178
179impl TcpMicroserviceOptions {
180    pub fn new(addr: SocketAddr) -> Self {
181        Self { addr }
182    }
183}
184
185pub struct TcpMicroserviceServer {
186    options: TcpMicroserviceOptions,
187    handlers: Vec<Arc<dyn MicroserviceHandler>>,
188}
189
190impl TcpMicroserviceServer {
191    pub fn new(
192        options: TcpMicroserviceOptions,
193        handlers: Vec<Arc<dyn MicroserviceHandler>>,
194    ) -> Self {
195        Self { options, handlers }
196    }
197
198    pub async fn listen(self) -> Result<(), TransportError> {
199        self.listen_with_shutdown(std::future::pending::<()>())
200            .await
201    }
202
203    pub async fn listen_with_shutdown<F>(self, shutdown: F) -> Result<(), TransportError>
204    where
205        F: std::future::Future<Output = ()> + Send + 'static,
206    {
207        let listener = TcpListener::bind(self.options.addr)
208            .await
209            .map_err(|e| TransportError::new(format!("tcp microservice bind failed: {e}")))?;
210
211        let handlers = Arc::new(self.handlers);
212
213        tokio::pin!(shutdown);
214
215        loop {
216            tokio::select! {
217                _ = &mut shutdown => {
218                    break;
219                }
220                accepted = listener.accept() => {
221                    let (stream, _peer) = accepted
222                        .map_err(|e| TransportError::new(format!("tcp microservice accept failed: {e}")))?;
223                    let handlers = handlers.clone();
224                    tokio::spawn(async move {
225                        serve_connection(stream, handlers).await;
226                    });
227                }
228            }
229        }
230
231        Ok(())
232    }
233}
234
235async fn serve_connection(stream: TcpStream, handlers: Arc<Vec<Arc<dyn MicroserviceHandler>>>) {
236    let (read_half, mut write_half) = stream.into_split();
237    let mut lines = BufReader::new(read_half).lines();
238
239    while let Ok(Some(line)) = lines.next_line().await {
240        let req: MicroserviceRequest = match serde_json::from_str(&line) {
241            Ok(v) => v,
242            Err(_) => {
243                // best-effort error frame for malformed payloads
244                let _ = write_half
245                    .write_all(br#"{"id":"0","ok":false,"error":{"message":"invalid request"}}"#)
246                    .await;
247                let _ = write_half.write_all(b"\n").await;
248                continue;
249            }
250        };
251
252        match req.kind {
253            PacketKind::Send => {
254                let res = dispatch_send(&handlers, &req.pattern, req.payload).await;
255                let wire = match res {
256                    Ok(payload) => MicroserviceResponse {
257                        id: req.id,
258                        ok: true,
259                        payload: Some(payload),
260                        error: None,
261                    },
262                    Err(e) => MicroserviceResponse {
263                        id: req.id,
264                        ok: false,
265                        payload: None,
266                        error: Some(MicroserviceErrorPayload {
267                            message: e.message,
268                            details: e.details,
269                        }),
270                    },
271                };
272
273                if let Ok(text) = serde_json::to_string(&wire) {
274                    let _ = write_half.write_all(text.as_bytes()).await;
275                    let _ = write_half.write_all(b"\n").await;
276                }
277            }
278            PacketKind::Emit => {
279                dispatch_emit(&handlers, &req.pattern, req.payload).await;
280            }
281        }
282    }
283}
284
285async fn dispatch_send(
286    handlers: &[Arc<dyn MicroserviceHandler>],
287    pattern: &str,
288    payload: serde_json::Value,
289) -> Result<serde_json::Value, TransportError> {
290    for h in handlers {
291        if let Some(res) = h.handle_message(pattern, payload.clone()).await {
292            return res;
293        }
294    }
295    Err(TransportError::new(format!(
296        "no microservice handler for pattern `{pattern}`"
297    )))
298}
299
300async fn dispatch_emit(
301    handlers: &[Arc<dyn MicroserviceHandler>],
302    pattern: &str,
303    payload: serde_json::Value,
304) {
305    for h in handlers {
306        let _ = h.handle_event(pattern, payload.clone()).await;
307    }
308}
309
310#[async_trait]
311impl crate::MicroserviceServer for TcpMicroserviceServer {
312    async fn listen_with_shutdown(
313        self: Box<Self>,
314        shutdown: crate::ShutdownFuture,
315    ) -> Result<(), TransportError> {
316        (*self).listen_with_shutdown(shutdown).await
317    }
318}