1use crate::{MicroserviceHandler, Transport, TransportError};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::{TcpListener, TcpStream};
9
/// Settings for the TCP client transport.
#[derive(Debug, Clone)]
pub struct TcpTransportOptions {
    /// Socket address of the remote microservice.
    pub addr: SocketAddr,
}

impl TcpTransportOptions {
    /// Creates options that target `addr`.
    pub fn new(addr: SocketAddr) -> Self {
        TcpTransportOptions { addr }
    }
}
20
21#[derive(Clone)]
25pub struct TcpTransport {
26 options: TcpTransportOptions,
27 seq: Arc<AtomicU64>,
28}
29
30impl TcpTransport {
31 pub fn new(options: TcpTransportOptions) -> Self {
32 Self {
33 options,
34 seq: Arc::new(AtomicU64::new(1)),
35 }
36 }
37
38 fn next_id(&self) -> String {
39 self.seq.fetch_add(1, Ordering::Relaxed).to_string()
40 }
41}
42
43#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45enum PacketKind {
46 Send,
47 Emit,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51struct MicroserviceRequest {
52 id: String,
53 kind: PacketKind,
54 pattern: String,
55 payload: serde_json::Value,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59struct MicroserviceErrorPayload {
60 message: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 details: Option<serde_json::Value>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66struct MicroserviceResponse {
67 id: String,
68 ok: bool,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 payload: Option<serde_json::Value>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 error: Option<MicroserviceErrorPayload>,
73}
74
75#[async_trait]
76impl Transport for TcpTransport {
77 async fn send_json(
78 &self,
79 pattern: &str,
80 payload: serde_json::Value,
81 ) -> Result<serde_json::Value, TransportError> {
82 let id = self.next_id();
83 let req = MicroserviceRequest {
84 id: id.clone(),
85 kind: PacketKind::Send,
86 pattern: pattern.to_string(),
87 payload,
88 };
89
90 let mut stream = TcpStream::connect(self.options.addr)
91 .await
92 .map_err(|e| TransportError::new(format!("tcp transport connect failed: {e}")))?;
93
94 let line = serde_json::to_string(&req)
95 .map_err(|e| TransportError::new(format!("serialize request failed: {e}")))?;
96 stream
97 .write_all(line.as_bytes())
98 .await
99 .map_err(|e| TransportError::new(format!("write request failed: {e}")))?;
100 stream
101 .write_all(b"\n")
102 .await
103 .map_err(|e| TransportError::new(format!("write request newline failed: {e}")))?;
104 stream
105 .flush()
106 .await
107 .map_err(|e| TransportError::new(format!("flush request failed: {e}")))?;
108
109 let mut reader = BufReader::new(stream);
110 let mut resp_line = String::new();
111 let n = reader
112 .read_line(&mut resp_line)
113 .await
114 .map_err(|e| TransportError::new(format!("read response failed: {e}")))?;
115 if n == 0 {
116 return Err(TransportError::new("tcp transport: empty response"));
117 }
118 let resp: MicroserviceResponse = serde_json::from_str(resp_line.trim_end_matches('\n'))
119 .map_err(|e| TransportError::new(format!("deserialize response failed: {e}")))?;
120 if resp.id != id {
121 return Err(TransportError::new("tcp transport: response id mismatch"));
122 }
123 if resp.ok {
124 Ok(resp.payload.unwrap_or(serde_json::Value::Null))
125 } else {
126 let mut err = TransportError::new(
127 resp.error
128 .as_ref()
129 .map(|e| e.message.as_str())
130 .unwrap_or("microservice error"),
131 );
132 if let Some(details) = resp.error.and_then(|e| e.details) {
133 err = err.with_details(details);
134 }
135 Err(err)
136 }
137 }
138
139 async fn emit_json(
140 &self,
141 pattern: &str,
142 payload: serde_json::Value,
143 ) -> Result<(), TransportError> {
144 let id = self.next_id();
145 let req = MicroserviceRequest {
146 id,
147 kind: PacketKind::Emit,
148 pattern: pattern.to_string(),
149 payload,
150 };
151
152 let mut stream = TcpStream::connect(self.options.addr)
153 .await
154 .map_err(|e| TransportError::new(format!("tcp transport connect failed: {e}")))?;
155
156 let line = serde_json::to_string(&req)
157 .map_err(|e| TransportError::new(format!("serialize event failed: {e}")))?;
158 stream
159 .write_all(line.as_bytes())
160 .await
161 .map_err(|e| TransportError::new(format!("write event failed: {e}")))?;
162 stream
163 .write_all(b"\n")
164 .await
165 .map_err(|e| TransportError::new(format!("write event newline failed: {e}")))?;
166 stream
167 .flush()
168 .await
169 .map_err(|e| TransportError::new(format!("flush event failed: {e}")))?;
170 Ok(())
171 }
172}
173
/// Settings for the TCP microservice server.
#[derive(Debug, Clone)]
pub struct TcpMicroserviceOptions {
    /// Socket address the server binds its listener to.
    pub addr: SocketAddr,
}

impl TcpMicroserviceOptions {
    /// Creates options that bind to `addr`.
    pub fn new(addr: SocketAddr) -> Self {
        TcpMicroserviceOptions { addr }
    }
}
184
185pub struct TcpMicroserviceServer {
186 options: TcpMicroserviceOptions,
187 handlers: Vec<Arc<dyn MicroserviceHandler>>,
188}
189
190impl TcpMicroserviceServer {
191 pub fn new(
192 options: TcpMicroserviceOptions,
193 handlers: Vec<Arc<dyn MicroserviceHandler>>,
194 ) -> Self {
195 Self { options, handlers }
196 }
197
198 pub async fn listen(self) -> Result<(), TransportError> {
199 self.listen_with_shutdown(std::future::pending::<()>())
200 .await
201 }
202
203 pub async fn listen_with_shutdown<F>(self, shutdown: F) -> Result<(), TransportError>
204 where
205 F: std::future::Future<Output = ()> + Send + 'static,
206 {
207 let listener = TcpListener::bind(self.options.addr)
208 .await
209 .map_err(|e| TransportError::new(format!("tcp microservice bind failed: {e}")))?;
210
211 let handlers = Arc::new(self.handlers);
212
213 tokio::pin!(shutdown);
214
215 loop {
216 tokio::select! {
217 _ = &mut shutdown => {
218 break;
219 }
220 accepted = listener.accept() => {
221 let (stream, _peer) = accepted
222 .map_err(|e| TransportError::new(format!("tcp microservice accept failed: {e}")))?;
223 let handlers = handlers.clone();
224 tokio::spawn(async move {
225 serve_connection(stream, handlers).await;
226 });
227 }
228 }
229 }
230
231 Ok(())
232 }
233}
234
235async fn serve_connection(stream: TcpStream, handlers: Arc<Vec<Arc<dyn MicroserviceHandler>>>) {
236 let (read_half, mut write_half) = stream.into_split();
237 let mut lines = BufReader::new(read_half).lines();
238
239 while let Ok(Some(line)) = lines.next_line().await {
240 let req: MicroserviceRequest = match serde_json::from_str(&line) {
241 Ok(v) => v,
242 Err(_) => {
243 let _ = write_half
245 .write_all(br#"{"id":"0","ok":false,"error":{"message":"invalid request"}}"#)
246 .await;
247 let _ = write_half.write_all(b"\n").await;
248 continue;
249 }
250 };
251
252 match req.kind {
253 PacketKind::Send => {
254 let res = dispatch_send(&handlers, &req.pattern, req.payload).await;
255 let wire = match res {
256 Ok(payload) => MicroserviceResponse {
257 id: req.id,
258 ok: true,
259 payload: Some(payload),
260 error: None,
261 },
262 Err(e) => MicroserviceResponse {
263 id: req.id,
264 ok: false,
265 payload: None,
266 error: Some(MicroserviceErrorPayload {
267 message: e.message,
268 details: e.details,
269 }),
270 },
271 };
272
273 if let Ok(text) = serde_json::to_string(&wire) {
274 let _ = write_half.write_all(text.as_bytes()).await;
275 let _ = write_half.write_all(b"\n").await;
276 }
277 }
278 PacketKind::Emit => {
279 dispatch_emit(&handlers, &req.pattern, req.payload).await;
280 }
281 }
282 }
283}
284
285async fn dispatch_send(
286 handlers: &[Arc<dyn MicroserviceHandler>],
287 pattern: &str,
288 payload: serde_json::Value,
289) -> Result<serde_json::Value, TransportError> {
290 for h in handlers {
291 if let Some(res) = h.handle_message(pattern, payload.clone()).await {
292 return res;
293 }
294 }
295 Err(TransportError::new(format!(
296 "no microservice handler for pattern `{pattern}`"
297 )))
298}
299
300async fn dispatch_emit(
301 handlers: &[Arc<dyn MicroserviceHandler>],
302 pattern: &str,
303 payload: serde_json::Value,
304) {
305 for h in handlers {
306 let _ = h.handle_event(pattern, payload.clone()).await;
307 }
308}
309
310#[async_trait]
311impl crate::MicroserviceServer for TcpMicroserviceServer {
312 async fn listen_with_shutdown(
313 self: Box<Self>,
314 shutdown: crate::ShutdownFuture,
315 ) -> Result<(), TransportError> {
316 (*self).listen_with_shutdown(shutdown).await
317 }
318}