zus_rpc_server/
handler.rs1use {
2 futures::{SinkExt, StreamExt},
3 std::{sync::Arc, time::Instant},
4 tokio::net::TcpStream,
5 tokio_util::codec::Framed,
6 tracing::{debug, error, instrument},
7};
8
9use {
10 zus_common::{Result, RpcCodec, RpcMessage},
11 zus_proto::constants::*,
12};
13
14use crate::service::{RequestContext, ServiceRegistry};
15
16pub struct RpcHandler {
18 service_registry: Arc<ServiceRegistry>,
19}
20
21impl RpcHandler {
22 pub fn new(service_registry: Arc<ServiceRegistry>) -> Self {
23 Self { service_registry }
24 }
25
26 pub async fn handle(&self, mut framed: Framed<TcpStream, RpcCodec>) -> Result<()> {
27 while let Some(result) = framed.next().await {
28 match result {
29 | Ok(msg) => {
30 if msg.header.msg_type == MSG_TYPE_REQ {
31 let response = self.process_request(msg).await;
32 framed.send(response).await?;
33 } else if msg.header.msg_type == MSG_TYPE_NOTIFY {
34 let _ = self.process_request(msg).await;
36 }
37 }
38 | Err(e) => {
39 error!("Decode error: {:?}", e);
40 return Err(e);
41 }
42 }
43 }
44
45 Ok(())
46 }
47
48 #[instrument(name = "rpc_process_request", skip(self, msg), fields(sequence = msg.header.sequence))]
49 async fn process_request(&self, msg: RpcMessage) -> RpcMessage {
50 let sequence = msg.header.sequence;
51 let method_name = String::from_utf8_lossy(&msg.method);
52
53 debug!("Processing request: {}", method_name);
54
55 let start = Instant::now();
57
58 let context = RequestContext::default();
60
61 if method_name == ".CheckServerStatus" {
63 return RpcMessage::new_response(sequence, bytes::Bytes::from("OK"));
64 }
65
66 let (response_body, service_name) =
68 if let Some((service_name, service)) = self.service_registry.services().iter().next() {
69 self.service_registry.record_request(service_name);
71
72 let result = service.do_work(&method_name, msg.body, context).await;
73
74 let latency_ms = start.elapsed().as_millis() as u64;
76
77 match result {
78 | Ok(response) => {
79 self.service_registry.record_response(service_name, latency_ms, true);
81 (response, service_name.clone())
82 }
83 | Err(e) => {
84 error!("Service error: {:?}", e);
85 self.service_registry.record_response(service_name, latency_ms, false);
87 (bytes::Bytes::from(format!("Error: {e}")), service_name.clone())
88 }
89 }
90 } else {
91 (bytes::Bytes::from("No service registered"), String::new())
92 };
93
94 debug!("Request processed for service: {}", service_name);
95
96 RpcMessage::new_response(sequence, response_body)
97 }
98}